RxJava2-Transformation operator study notes

These study notes mainly follow the blog post "Android RxJava: Detailed Explanation of Transformation Operators in Graphic and Text". The original post is richly illustrated; thanks to its author for sharing. The pictures referred to in these notes all come from the original post.


These notes mainly record the purpose and usage of four transformation operators, map(), flatMap(), concatMap() and buffer(), and briefly trace the source code of the map() method.

The environment used for these notes:

    implementation 'io.reactivex.rxjava2:rxjava:2.2.21'

Here is the text of the note:

RxJava transformation operator learning

Overview

The main function of transformation operators is:

  • Process (that is, transform) individual events in an event sequence, or the entire sequence, turning them into different events or a different event sequence

The specific schematic diagram is as follows:

The picture here comes from the original blogger's article; thanks to the blogger for sharing it.

map()

The map() operator passes each event emitted by the Observable through a specified function, transforming it into another event. In other words, an event emitted by the Observable can be converted into an event of any type. The original post illustrates this with an execution diagram of map().


The code demonstration is as follows:

    import io.reactivex.Observable
    import io.reactivex.functions.Function

    Observable.just(1, 2, 3, 4, 5)
        .map(object : Function<Int, String> {
            // Turn every emitted Int into a String
            override fun apply(t: Int): String {
                return "this is $t"
            }
        })
        .subscribe { Logs.e("transformWithMap: $it") }

In the code above, we first create an Observable with just(), which emits the events in turn, and then process every emitted item with map(). As you can see, map(Function<T, R>) takes a parameter of type Function<T, R>. This is an interface; we implement it with an anonymous object and override its apply(T): R method. Running the code above produces:

    2021-03-26 14:16:37.755 24371-24371/com.project.mystudyproject E/com.project.mystudyproject: transformWithMap: this is 1
    2021-03-26 14:16:37.755 24371-24371/com.project.mystudyproject E/com.project.mystudyproject: transformWithMap: this is 2
    2021-03-26 14:16:37.756 24371-24371/com.project.mystudyproject E/com.project.mystudyproject: transformWithMap: this is 3
    2021-03-26 14:16:37.756 24371-24371/com.project.mystudyproject E/com.project.mystudyproject: transformWithMap: this is 4
    2021-03-26 14:16:37.756 24371-24371/com.project.mystudyproject E/com.project.mystudyproject: transformWithMap: this is 5

The output also shows that every item we emitted has been given the "this is" prefix.

Below is a brief trace of the map() source code. The previous note already traced the source of Observable.create(ObservableOnSubscribe).subscribe(Observer), so we know that calling Observable.just() gives us an Observable. Here we start tracing directly from map().

    // The map() method in Observable
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

As you can see, this again executes RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)). From the previous note we already know that onAssembly() by default returns the argument passed to it, so calling map() gives us an ObservableMap object, shown below:

    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        final Function<? super T, ? extends U> function;

        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            super(source);
            this.function = function;
        }
        // ...other source code...
    }

    // Related field and constructor in the parent class AbstractObservableWithUpstream
    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

In the code above, the Observable created by Observable.just() is saved in the source field, and the Function we passed in is saved in the function field.

After that, the subscribe(Observer) method is executed. From the previous note we know that subscribe() eventually calls Observable.subscribeActual(); in this case that is ObservableMap.subscribeActual(), whose source code is:

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

This calls source.subscribe(new MapObserver<T, U>(t, function)), where source is the Observable returned by Observable.just() in the first step. Its concrete type is ObservableFromArray, so ObservableFromArray.subscribe() runs, and it ends up executing ObservableFromArray.subscribeActual(), whose source code is:

    @Override
    public void subscribeActual(Observer<? super T> observer) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
        observer.onSubscribe(d);
        if (d.fusionMode) {
            return;
        }
        d.run();
    }

As you can see, a FromArrayDisposable is first created from observer. This observer is the MapObserver created in ObservableMap.subscribeActual(). With that in mind, step into the FromArrayDisposable.run() method:

    void run() {
        T[] a = array;
        int n = a.length;
        for (int i = 0; i < n && !isDisposed(); i++) {
            T value = a[i];
            if (value == null) {
                downstream.onError(new NullPointerException("The element at index " + i + " is null"));
                return;
            }
            downstream.onNext(value);
        }
        if (!isDisposed()) {
            downstream.onComplete();
        }
    }

As you can see, this traverses the array and emits the arguments we passed to just() one by one by calling downstream.onNext(value). Here downstream is the MapObserver we passed in when creating the FromArrayDisposable. Below is the onNext() method of MapObserver:

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        if (sourceMode != NONE) {
            downstream.onNext(null);
            return;
        }
        U v;
        try {
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        downstream.onNext(v);
    }

This method first checks sourceMode != NONE, which is false in the normal case. It then calls the apply() method of the Function we passed in, handing it the array element of the current loop iteration so that our custom operation runs on it, and finally emits the result downstream.

The logic of this piece is a bit convoluted. Simply put:

  1. Observable.just() creates an ObservableFromArray, which stores the arguments we passed to just() as an array. We then call map(), which creates an ObservableMap. The ObservableMap uses two fields to keep the ObservableFromArray from the previous step and our Function.

  2. Next, subscribe(Observer) is called on the ObservableMap. This first creates a MapObserver, which holds the Function and our Observer, and then calls subscribe(MapObserver) on the ObservableFromArray stored inside the ObservableMap, passing the MapObserver in.

  3. Inside subscribe(MapObserver) of ObservableFromArray, a FromArrayDisposable(MapObserver, array) object is created; its arguments are the MapObserver we just created and the array built from the arguments of just(). Then run() of FromArrayDisposable is executed. It traverses the array and, for every value it reads, calls onNext() of MapObserver with that value. This is how the arguments of just() reach onNext() of MapObserver.

  4. Finally, onNext() of MapObserver passes the value it received to Function.apply(value), which runs the apply() method we overrode in the first step and produces the final data. It then calls onNext() of the Observer we passed to ObservableMap.subscribe(Observer) in the second step, delivering the data to our Observer.
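To make the four steps above easier to follow, here is a minimal sketch in plain Java that mimics the same chain without RxJava. The names SimpleObserver, SimpleSource, ArraySource and MapSource are made up for this sketch and are not RxJava classes. Only the onNext path is modeled; onSubscribe, disposal, fusion and error handling are left out on purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class MapChainSketch {

    /** Hypothetical stand-in for Observer: only onNext is modeled. */
    interface SimpleObserver<T> {
        void onNext(T value);
    }

    /** Hypothetical stand-in for ObservableSource. */
    interface SimpleSource<T> {
        void subscribe(SimpleObserver<? super T> observer);
    }

    /** Plays the role of ObservableFromArray: stores the items and emits them in order. */
    static final class ArraySource<T> implements SimpleSource<T> {
        private final T[] array;

        @SafeVarargs
        ArraySource(T... items) {
            this.array = items;
        }

        @Override
        public void subscribe(SimpleObserver<? super T> observer) {
            // Like FromArrayDisposable.run(): walk the array and push each value downstream.
            for (T value : array) {
                observer.onNext(value);
            }
        }
    }

    /** Plays the role of ObservableMap: keeps the upstream source and the mapper function. */
    static final class MapSource<T, R> implements SimpleSource<R> {
        private final SimpleSource<T> source;
        private final Function<? super T, ? extends R> mapper;

        MapSource(SimpleSource<T> source, Function<? super T, ? extends R> mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override
        public void subscribe(SimpleObserver<? super R> downstream) {
            // Like MapObserver: wrap the downstream observer, apply the mapper, forward the result.
            source.subscribe(value -> downstream.onNext(mapper.apply(value)));
        }
    }

    /** Builds the same chain as the map() demo and collects what the final observer receives. */
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        SimpleSource<Integer> source = new ArraySource<Integer>(1, 2, 3, 4, 5);
        SimpleSource<String> mapped = new MapSource<Integer, String>(source, t -> "this is " + t);
        mapped.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [this is 1, this is 2, this is 3, this is 4, this is 5]
    }
}
```

Nothing is emitted until subscribe() is called on the outermost source; the subscription travels upstream, and the values then travel back downstream through the wrapping observer.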

flatMap()

This operator splits the event sequence emitted by the Observable, converts each event separately, merges the results into a new event sequence, and finally emits that sequence. However, flatMap() does not guarantee the order of the resulting events.

The principle of this method is as follows:

  • Create an Observable object for each event in the event sequence
  • Put the new events converted from each original event into the corresponding Observable
  • Merge all of the newly created Observable objects into one new, combined Observable object
  • The new, combined Observable sends the merged event sequence to the Observer


The relevant code is demonstrated as follows:

    Observable.create<Int> {
        it.onNext(1)
        it.onNext(2)
        it.onNext(3)
    }
        .flatMap(object : Function<Int, Observable<String>> {
            override fun apply(t: Int): Observable<String> {
                val array = mutableListOf<String>()
                for (i in 0 until 3) {
                    array.add("$t--flatMap--> ${t * 10 + i}")
                }
                return Observable.fromIterable(array)
            }
        })
        .subscribe { Logs.e("transformWithFlatMap onNext: $it") }

The results of the above code running are as follows:

    2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 1--flatMap--> 10
    2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 1--flatMap--> 11
    2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 1--flatMap--> 12
    2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 2--flatMap--> 20
    2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 2--flatMap--> 21
    2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 2--flatMap--> 22
    2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 3--flatMap--> 30
    2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 3--flatMap--> 31
    2021-03-26 15:45:51.129 26971-26971/com.project.mystudyproject E/com.project.mystudyproject: transformWithFlatMap onNext: 3--flatMap--> 32

I tried creating the Observable in several different ways, and the final output always came out in order. However, the official documentation clearly states that the transformed elements may be interleaved, so the ordering still needs attention here.

concatMap()

This operator works much like flatMap(). The main difference is that the order of the new event sequence produced by splitting and re-merging is the same as the order of the original sequence emitted by the Observable.


The relevant code is demonstrated as follows:

    Observable.just(1, 2, 3)
        .concatMap(object : Function<Int, Observable<String>> {
            override fun apply(t: Int): Observable<String> {
                val list = mutableListOf<String>()
                for (i in t * 10 until t * 10 + 3) {
                    list.add("$t --concatMap--> $i")
                }
                return Observable.fromIterable(list)
            }
        })
        .subscribe { Logs.e("transformWithConcatMap onNext: $it") }

The results of the operation are as follows:

    2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 1 --concatMap--> 10
    2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 1 --concatMap--> 11
    2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 1 --concatMap--> 12
    2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 2 --concatMap--> 20
    2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 2 --concatMap--> 21
    2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 2 --concatMap--> 22
    2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 3 --concatMap--> 30
    2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 3 --concatMap--> 31
    2021-03-26 16:15:10.921 27535-27535/com.project.mystudyproject E/com.project.mystudyproject: transformWithConcatMap onNext: 3 --concatMap--> 32
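Both demos print their results in order because every inner Observable emits synchronously. The difference between flatMap() and concatMap() only shows up when the inner sources emit at different times. The toy model below, written in plain Java without RxJava, is only a sketch of that ordering rule and not how RxJava implements either operator. The class MergeOrderSketch, the Timed type and the delays in sample() are all invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class MergeOrderSketch {

    /** One inner emission: a value plus the delay (in ms) after which its inner source emits it. */
    static final class Timed {
        final long delayMs;
        final String value;

        Timed(long delayMs, String value) {
            this.delayMs = delayMs;
            this.value = value;
        }
    }

    static Timed at(long delayMs, String value) {
        return new Timed(delayMs, value);
    }

    /** flatMap-like ordering: all inner sources run at once, so values arrive ordered by delay. */
    static List<String> mergeOrder(List<List<Timed>> inners) {
        List<Timed> all = new ArrayList<>();
        for (List<Timed> inner : inners) {
            all.addAll(inner);
        }
        // List.sort is stable, so values with equal delays keep their source order.
        all.sort(Comparator.comparingLong((Timed t) -> t.delayMs));
        List<String> out = new ArrayList<>();
        for (Timed t : all) {
            out.add(t.value);
        }
        return out;
    }

    /** concatMap-like ordering: the next inner source starts only after the previous one finished. */
    static List<String> concatOrder(List<List<Timed>> inners) {
        List<String> out = new ArrayList<>();
        for (List<Timed> inner : inners) {
            for (Timed t : inner) {
                out.add(t.value);
            }
        }
        return out;
    }

    /** Invented sample: the inner source for event 2 emits its first values earlier than the one for event 1. */
    static List<List<Timed>> sample() {
        List<Timed> fromEvent1 = Arrays.asList(at(30, "10"), at(60, "11"), at(90, "12"));
        List<Timed> fromEvent2 = Arrays.asList(at(10, "20"), at(20, "21"), at(100, "22"));
        return Arrays.asList(fromEvent1, fromEvent2);
    }

    public static void main(String[] args) {
        System.out.println(mergeOrder(sample()));  // [20, 21, 10, 11, 12, 22]
        System.out.println(concatOrder(sample())); // [10, 11, 12, 20, 21, 22]
    }
}
```

When all delays are equal, as with the synchronous fromIterable() sources in the demos, both orderings coincide in this model, which is consistent with the ordered logs above.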

buffer()

This operator periodically takes a certain number of events from those the Observable is about to emit, puts them into a buffer, and then emits the buffer as a whole.


The code demonstration is as follows:

    Observable.just(1, 2, 3, 4, 5)
        .buffer(3, 2)
        .subscribe { Logs.e("transformWithBuffer onNext: $it") }

Parameter Description:

  • The first parameter, 3, is the buffer size: 3 items are taken at a time; if fewer than 3 remain, however many are left are taken
  • The second parameter, 2, is the step: the first buffer starts at position 0, the second at position 0+2, the third at position 0+2+2, and so on.

The results of this method are as follows:

    2021-03-26 16:50:53.794 30119-30119/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [1, 2, 3]
    2021-03-26 16:50:53.795 30119-30119/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [3, 4, 5]
    2021-03-26 16:50:53.795 30119-30119/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [5]

As you can see, the result matches the parameter description above.
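The count and step rule can be checked with a small plain-Java sketch that applies it to a finite list. BufferSketch and its buffer() helper are hypothetical names used only here; this is a model of the windowing rule described above, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BufferSketch {

    /**
     * Models buffer(count, skip) on a finite list: a new buffer starts every
     * `skip` positions and holds up to `count` items.
     */
    static <T> List<List<T>> buffer(List<T> items, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> buffers = new ArrayList<>();
        for (int start = 0; start < items.size(); start += skip) {
            // The last buffers may be shorter when fewer than `count` items remain.
            int end = Math.min(start + count, items.size());
            buffers.add(new ArrayList<>(items.subList(start, end)));
        }
        return buffers;
    }

    public static void main(String[] args) {
        System.out.println(buffer(Arrays.asList(1, 2, 3, 4, 5), 3, 2)); // [[1, 2, 3], [3, 4, 5], [5]]
    }
}
```

With a step smaller than the count the buffers overlap, as in the demo; with a step larger than the count, the items between two buffers are skipped.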

Another usage

Looking at the overloads of buffer(), we can see that it has other usages as well, as shown below:

    Observable.intervalRange(10, 100, 0, 1, TimeUnit.SECONDS)
        .buffer(20, 2, TimeUnit.SECONDS)
        .subscribe { Logs.e("transformWithBuffer onNext: $it") }

Parameter Description:

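  • The first parameter, 20, is the time span: each buffer collects the data emitted by the Observable for 20s before it is sent, so the first buffer arrives after 20s
  • The second parameter, 2, is the time step: a new buffer is started every 2s, so after the first one, a buffer holding the data emitted during the preceding 20s is sent every 2s
  • The third parameter is the time unit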
In the code above, we use intervalRange() to create an Observable that starts at 10 and emits one item every 1s. We then set up the buffer with buffer() and print the contents of each buffer. The final printed result is as follows:

    2021-03-26 17:03:12.562 30811-30864/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
    2021-03-26 17:03:14.556 30811-30864/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]
    2021-03-26 17:03:16.556 30811-30864/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33]
    2021-03-26 17:03:18.561 30811-30864/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36]
    2021-03-26 17:03:20.560 30811-30864/com.project.mystudyproject E/com.project.mystudyproject: transformWithBuffer onNext: [18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38]
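The overlapping time windows can also be modeled in plain Java. The sketch below assumes an idealized clock in which item number t (counting from 0) is emitted exactly at second t and a window covers the half-open range from its opening time up to, but not including, its opening time plus the time span. TimedBufferSketch and windows() are hypothetical names; real runs, like the log above, can differ by an item at the window edges because of scheduling jitter.

```java
import java.util.ArrayList;
import java.util.List;

final class TimedBufferSketch {

    /**
     * Idealized model of a time-based buffer with a span and a step (both in seconds).
     * The source emits first, first + 1, ... at seconds 0, 1, ... for `count` items.
     * Returns the contents of the first `buffers` windows.
     */
    static List<List<Long>> windows(long first, long count, long timespan, long timeskip, int buffers) {
        List<List<Long>> result = new ArrayList<>();
        for (int k = 0; k < buffers; k++) {
            long open = k * timeskip;     // a new window opens every `timeskip` seconds
            long close = open + timespan; // and is emitted `timespan` seconds later
            List<Long> buffer = new ArrayList<>();
            for (long t = open; t < close && t < count; t++) {
                buffer.add(first + t);
            }
            result.add(buffer);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same numbers as the demo: values start at 10, span 20s, step 2s.
        for (List<Long> window : windows(10, 100, 20, 2, 3)) {
            System.out.println(window.get(0) + " .. " + window.get(window.size() - 1)
                    + " (" + window.size() + " items)");
        }
    }
}
```

In this model the first three windows start at 10, 12 and 14 and hold 20 items each, which lines up with the second and third log entries above.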