rxjava meetup presentation
TRANSCRIPT
Guillaume Valverde Software Developer [email protected] @drevlav
A year with RXJavaSeptember / 2016
Kick Application
Observable / Observer Pattern
Observable : emits event.
Observer : listens to events and acts accordingly.
Observable: emits data called a stream
Emits 0, 1, x or an infinite amount of items
Observable: emits data called a stream
Emits 0, 1, x or an infinite amount of items
Observable: emits data called a stream
Emits 0, 1, x or an infinite amount of items
Observable: emits data called a stream
Emits 0, 1, x or an infinite amount of items
Observable: emits data called a stream
Emits 0, 1, x or an infinite amount of items
Complete
Observable: emits data called a stream
Emits 0, 1, x or an infinite amount of items
Complete
Observable: emits data called a stream
Emits 0, 1, x or an infinite amount of items
Send an error
Observable: emits data called a stream
Emits 0, 1, x or an infinite amount of items
Send an error
OBSERVABLE CONTRACT
0 to N events
nothing | error | complete
Observable exemples:• Mouse clicks• Button Clicks• Network request • Network State• Content Provider
update• list, array• UI State
Everything is an event
Observable.just(“rxjava”)
“rxjava”
Observable.just(“rxjava”) .subscribe();
Observable.from(…)
1 2 3
Integer[] ids = {1, 2, 3};Observable.from(ids) .subscribe();
Using Retrofit
Network Request
Response
RxView.clicks(View view)
first click
RxView.clicks(sendButton) .subscribe(click -> { do something… })
Second click
Third click
who is listening?
who is listening?The subscriber
who is listening?The subscriber
who is listening?The subscriber
Integer[] ids = {1, 2, 3};Observable.from(ids) .subscribe();
who is listening?The subscriber
Integer[] ids = {1, 2, 3};Observable.from(ids) .subscribe();
Subscriber: Observer & subscription
public abstract class Subscriber<T> implements Observer<T>, Subscription {
…
}
Subscriber : Observer & Subscription
public interface Observer<T> { void onCompleted(); void onError(Throwable e); void onNext(T t);}
public interface Subscription { void unsubscribe(); boolean isUnsubscribed();}
Observable / Subscriber (Observer & Subscription)
String[] ids = {"1", "2", "3"};
Subscription subscription = Observable.from(ids) .subscribe(id -> Log.v(TAG, id), error -> Log.e(TAG, "error: " + error.getMessage()), () -> Log.v(TAG, "complete"));
Stream on steroids operators
Composition
Map
RXJava is Awesome
.map(word -> word.length())
6 2 7
String[] ids = {“RXJava”, “is”, “Awesome”};Observable.from(ids)
FlatMap1 2 3
FlatMap1 2 3
.Flatmap ( )xxx
FlatMap1 2 3
.Flatmap ( )xxx
11
FlatMap1 2 3
.Flatmap ( )xxx
11
22
FlatMap1 2 3
.Flatmap ( )xxx
11
22
33
FlatMap1 2 3
.Flatmap ( )xxx
11
22
33
.flatten
FlatMap1 2 3
.Flatmap ( )xxx
11
22
33
.flatten
FlatMap1 2 3
.Flatmap ( )xxx
11
22
33
.flatten
1
FlatMap1 2 3
.Flatmap ( )xxx
11
22
33
.flatten
1 2
FlatMap1 2 3
.Flatmap ( )xxx
11
22
33
.flatten
1 2 2
FlatMap1 2 3
.Flatmap ( )xxx
11
22
33
.flatten
1 2 2 1
FlatMap1 2 3
.Flatmap ( )xxx
11
22
33
.flatten
1 2 2 1 3
FlatMap1 2 3
.Flatmap ( )xxx
11
22
33
.flatten
1 2 2 1 3 3
FlatMap1 2 3
.Flatmap ( )xxx
11
22
33
.flatten
1 2 2 1 3 3
id1 id2 id3
.flatMap(id -> fetchArticle())
Article1 Article2 Article3
FlatMapList ids … Observable.from(ids)
Compose withObservables
Retrieve a list of repositories
• more than 3 letters• cancel previous search when new one• make a search when the user is done typing
Compose with ObservablesRxTextView.textChanges(editText) .observeOn(Schedulers.io()) .debounce(500, TimeUnit.MILLISECONDS) .filter(charSequence -> charSequence.length() > 3) .map(CharSequence::toString) .switchMap(searchString -> networkManager.search(searchString)) .observeOn(AndroidSchedulers.mainThread()) .subscribe(list -> searchRecyclerAdapter.refreshListRepo(list), error -> Logger.logOnNextError(TAG));
Type RxJava
Type RxJava
RxTextView.textChanges(editText)
Type RxJava
R Rx RxJ RxJa RxJav RxJava
RxTextView.textChanges(editText)
Type RxJava
R Rx RxJ RxJa RxJav RxJava
RxTextView.textChanges(editText)
.observeOn(Schedulers.io())
Type RxJava
R Rx RxJ RxJa RxJav RxJava
R Rx RxJ RxJa RxJav RxJava
RxTextView.textChanges(editText)
.observeOn(Schedulers.io())
Type RxJava
R Rx RxJ RxJa RxJav RxJava
R Rx RxJ RxJa RxJav RxJava
RxTextView.textChanges(editText)
.observeOn(Schedulers.io())
.debounce(500, TimeUnit.MILLISECONDS)
Type RxJava
R Rx RxJ RxJa RxJav RxJava
R Rx RxJ RxJa RxJav RxJava
RxTextView.textChanges(editText)
.observeOn(Schedulers.io())
.debounce(500, TimeUnit.MILLISECONDS)
RxJ RxJavaR
Type RxJava
R Rx RxJ RxJa RxJav RxJava
R Rx RxJ RxJa RxJav RxJava
RxTextView.textChanges(editText)
.observeOn(Schedulers.io())
.debounce(500, TimeUnit.MILLISECONDS)
.filter(charSequence -> charSequence.length() >= 3)
RxJ RxJavaR
Type RxJava
R Rx RxJ RxJa RxJav RxJava
R Rx RxJ RxJa RxJav RxJava
RxJ RxJava
RxTextView.textChanges(editText)
.observeOn(Schedulers.io())
.debounce(500, TimeUnit.MILLISECONDS)
.filter(charSequence -> charSequence.length() >= 3)
RxJ RxJavaR
RxJ RxJava
RxTextView.textChanges(editText) .observeOn(Schedulers.io()) .debounce(500, TimeUnit.MILLISECONDS) .filter(charSequence -> charSequence.length() > 3) .map(CharSequence::toString)
RxJ RxJava
RxTextView.textChanges(editText) .observeOn(Schedulers.io()) .debounce(500, TimeUnit.MILLISECONDS) .filter(charSequence -> charSequence.length() > 3) .map(CharSequence::toString)
.switchMap(searchString -> networkManager.search(searchString))
RxJ RxJava
RxTextView.textChanges(editText) .observeOn(Schedulers.io()) .debounce(500, TimeUnit.MILLISECONDS) .filter(charSequence -> charSequence.length() > 3) .map(CharSequence::toString)
.switchMap(searchString -> networkManager.search(searchString))
List<Repo>
RxJ RxJava
RxTextView.textChanges(editText) .observeOn(Schedulers.io()) .debounce(500, TimeUnit.MILLISECONDS) .filter(charSequence -> charSequence.length() > 3) .map(CharSequence::toString)
.switchMap(searchString -> networkManager.search(searchString)) List<Repo>
List<Repo>
RxJ RxJava
RxTextView.textChanges(editText) .observeOn(Schedulers.io()) .debounce(500, TimeUnit.MILLISECONDS) .filter(charSequence -> charSequence.length() > 3) .map(CharSequence::toString)
.switchMap(searchString -> networkManager.search(searchString)) List<Repo>
List<Repo>
List<Repo>
RxTextView.textChanges(editText) .observeOn(Schedulers.io()) .debounce(500, TimeUnit.MILLISECONDS) .filter(charSequence -> charSequence.length() > 3) .map(CharSequence::toString) .switchMap(searchString -> networkManager.search(searchString))
List<Repo>
RxTextView.textChanges(editText) .observeOn(Schedulers.io()) .debounce(500, TimeUnit.MILLISECONDS) .filter(charSequence -> charSequence.length() > 3) .map(CharSequence::toString) .switchMap(searchString -> networkManager.search(searchString))
List<Repo>
.observeOn(AndroidSchedulers.mainThread())
RxTextView.textChanges(editText) .observeOn(Schedulers.io()) .debounce(500, TimeUnit.MILLISECONDS) .filter(charSequence -> charSequence.length() > 3) .map(CharSequence::toString) .switchMap(searchString -> networkManager.search(searchString))
List<Repo>
.observeOn(AndroidSchedulers.mainThread())
List<Repo>
RxTextView.textChanges(editText) .observeOn(Schedulers.io()) .debounce(500, TimeUnit.MILLISECONDS) .filter(charSequence -> charSequence.length() > 3) .map(CharSequence::toString) .switchMap(searchString -> networkManager.search(searchString))
List<Repo>
.observeOn(AndroidSchedulers.mainThread())
List<Repo>
.subscribe(list -> searchRecyclerAdapter.refreshListRepo(list), error -> Logger.logOnNextError(TAG));
R Rx RxJ RxJa RxJav RxJava
R Rx RxJ RxJa RxJav RxJava
RxJ RxJava
RxJ RxJava
List<Repo>
List<Repo>
R
RXjava is synchronous by default
RXjava is synchronous by default
for(String word: sentence) { Log.v(TAG, word);}
RXjava is synchronous by default
Same as
for(String word: sentence) { Log.v(TAG, word);}
RXjava is synchronous by default
Same as
for(String word: sentence) { Log.v(TAG, word);}
Observable.from(sentence) .subscribe(word -> Log.v(TAG, word));
THREADING
Observable.from(ids) .flatMap(id -> fetchArticle()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(article -> actOnArticle(article));
THREADING
Observable.from(ids) .flatMap(id -> fetchArticle()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(article -> actOnArticle(article));
THREADING
Observable.from(ids) .flatMap(id -> fetchArticle()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(article -> actOnArticle(article));
ObserveOn / SubscribeOn
Observable.from(ids)
.flatMap(id -> fetchArticle())
.subscribeOn(… newThread)
.observeOn(… mainThread)
.subscribe(…)
Observable.from(ids) .flatMap(id -> fetchArticle()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(article -> actOnArticle(article));
ObserveOn / SubscribeOn
Observable.from(ids)
.flatMap(id -> fetchArticle())
.subscribeOn(… newThread)
.observeOn(… mainThread)
.subscribe(…) CurrentThread
Observable.from(ids) .flatMap(id -> fetchArticle()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(article -> actOnArticle(article));
ObserveOn / SubscribeOn
Observable.from(ids)
.flatMap(id -> fetchArticle())
.subscribeOn(… newThread)
.observeOn(… mainThread)
.subscribe(…)
Observable.from(ids) .flatMap(id -> fetchArticle()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(article -> actOnArticle(article));
ObserveOn / SubscribeOn
Observable.from(ids)
.flatMap(id -> fetchArticle())
.subscribeOn(… newThread)
.observeOn(… mainThread)
.subscribe(…)
ChangeThread,New Thread
Observable.from(ids) .flatMap(id -> fetchArticle()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(article -> actOnArticle(article));
ObserveOn / SubscribeOn
Observable.from(ids)
.flatMap(id -> fetchArticle())
.subscribeOn(… newThread)
.observeOn(… mainThread)
.subscribe(…)
Observable.from(ids) .flatMap(id -> fetchArticle()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(article -> actOnArticle(article));
ObserveOn / SubscribeOn
Observable.from(ids)
.flatMap(id -> fetchArticle())
.subscribeOn(… newThread)
.observeOn(… mainThread)
.subscribe(…)
emit ids
Observable.from(ids) .flatMap(id -> fetchArticle()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(article -> actOnArticle(article));
ObserveOn / SubscribeOn
Observable.from(ids)
.flatMap(id -> fetchArticle())
.subscribeOn(… newThread)
.observeOn(… mainThread)
.subscribe(…)
emit an Article
Observable.from(ids) .flatMap(id -> fetchArticle()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(article -> actOnArticle(article));
ObserveOn / SubscribeOn
Observable.from(ids)
.flatMap(id -> fetchArticle())
.subscribeOn(… newThread)
.observeOn(… mainThread)
.subscribe(…)
emit an Article
Observable.from(ids) .flatMap(id -> fetchArticle()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(article -> actOnArticle(article));
ObserveOn / SubscribeOn
Observable.from(ids)
.flatMap(id -> fetchArticle())
.subscribeOn(… newThread)
.observeOn(… mainThread)
.subscribe(…) emit an Article
Observable.from(ids) .flatMap(id -> fetchArticle()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(article -> actOnArticle(article));
ObserveOn / SubscribeOn
Observable.from(ids)
.flatMap(id -> fetchArticle())
.subscribeOn(… newThread)
.observeOn(… mainThread)
.subscribe(…) act on Article
Observable.from(ids) .flatMap(id -> fetchArticle()) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(article -> actOnArticle(article));
THREADING SubscribeOn / ObserveOn
• Only 1 SubscribeOn effective.
• Multiple ObserveOn can work.
Organise your codeArchitecture / View Model / Separation of concern
business LogicReturn Observable
Input :Source Observable
.subscribe()
Unittesting
Integrationtesting
Acceptance / smoketesting
ViewView Model
Architecture
Hold the business logicFor the View
display uiaccording to state of the data
Data Layer
Store
Store
Binding
Hold the logic for dataemits Pojos
ViewView Model
Architecture
Hold the business logicFor the View
display uiaccording to state of the data
Data Layer
Store
Store
Binding
Hold the logic for dataemits Pojos
ViewView Model
Architecture
Hold the business logicFor the View
display uiaccording to state of the data
Data Layer
Store
Store
Binding
Hold the logic for dataemits Pojos
ViewView Model
Architecture
Hold the business logicFor the View
display uiaccording to state of the data
Data Layer
Store
Store
Binding
Hold the logic for dataemits Pojos
ViewView Model
Architecture
Hold the business logicFor the View
display uiaccording to state of the data
Data Layer
Store
Store
Binding
Hold the logic for dataemits Pojos
ViewView Model
Architecture
Hold the business logicFor the View
display uiaccording to state of the data
Data Layer
Store
Store
Binding
Hold the logic for dataemits Pojos
updatetrigger
ViewView Model
Architecture
Hold the business logicFor the View
display uiaccording to state of the data
Data Layer
Store
Store
Binding
Hold the logic for dataemits Pojos
ViewView Model
Architecture
Hold the business logicFor the View
display uiaccording to state of the data
Data Layer
Store
Store
Binding
Hold the logic for dataemits Pojos
ViewView Model
Architecture
Hold the business logicFor the View
display uiaccording to state of the data
Data Layer
Store
Store
Binding
Hold the logic for dataemits Pojos
.observeOn(Schedulers.io())
.debounce(500, TimeUnit.MILLISECONDS)
.filter(charSequence -> charSequence.length() > 3)
.map(CharSequence::toString)
.switchMap(word -> dataLayer.fetchListRepo(word))
.observeOn(AndroidSchedulers.mainThread())
RxTextView.textChanges(editText)
.subscribe(list -> searchRecyclerAdapter.refreshListRepo(list), error -> Logger.logOnNextError(TAG));
.observeOn(Schedulers.io())
.debounce(500, TimeUnit.MILLISECONDS)
.filter(charSequence -> charSequence.length() > 3)
.map(CharSequence::toString)
.switchMap(word -> dataLayer.fetchListRepo(word))
.observeOn(AndroidSchedulers.mainThread())
Input :Source Observable
RxTextView.textChanges(editText)
.subscribe(list -> searchRecyclerAdapter.refreshListRepo(list), error -> Logger.logOnNextError(TAG));
dataLayer
UIView
dependencies:
.observeOn(Schedulers.io())
.debounce(500, TimeUnit.MILLISECONDS)
.filter(charSequence -> charSequence.length() > 3)
.map(CharSequence::toString)
.switchMap(word -> dataLayer.fetchListRepo(word))
.observeOn(AndroidSchedulers.mainThread())
ViewModel
Input :Source Observable
RxTextView.textChanges(editText)
.subscribe(list -> searchRecyclerAdapter.refreshListRepo(list), error -> Logger.logOnNextError(TAG));
dataLayer
UIView
dependencies:
.observeOn(Schedulers.io())
.debounce(500, TimeUnit.MILLISECONDS)
.filter(charSequence -> charSequence.length() > 3)
.map(CharSequence::toString)
.switchMap(word -> dataLayer.fetchListRepo(word))
.observeOn(AndroidSchedulers.mainThread())
ViewModel
Input :Source Observable
RxTextView.textChanges(editText)
.subscribe(list -> searchRecyclerAdapter.refreshListRepo(list), error -> Logger.logOnNextError(TAG));
dataLayer
UIView
dependencies:
.observeOn(Schedulers.io())
.debounce(500, TimeUnit.MILLISECONDS)
.filter(charSequence -> charSequence.length() > 3)
.map(CharSequence::toString)
.switchMap(word -> dataLayer.fetchListRepo(word))
.observeOn(AndroidSchedulers.mainThread())
ViewModel
Input :Source Observable
.subscribe(…)
RxTextView.textChanges(editText)
.subscribe(list -> searchRecyclerAdapter.refreshListRepo(list), error -> Logger.logOnNextError(TAG));
dataLayer
UIView
dependencies:
.observeOn(Schedulers.io())
.debounce(500, TimeUnit.MILLISECONDS)
.filter(charSequence -> charSequence.length() > 3)
.map(CharSequence::toString)
.switchMap(word -> dataLayer.fetchListRepo(word))
.observeOn(AndroidSchedulers.mainThread())
ViewModel
Input :Source Observable
.subscribe(…)
RxTextView.textChanges(editText)
.subscribe(list -> searchRecyclerAdapter.refreshListRepo(list), error -> Logger.logOnNextError(TAG));
dataLayer
UIView
dependencies:
.observeOn(Schedulers.io())
.debounce(500, TimeUnit.MILLISECONDS)
.filter(charSequence -> charSequence.length() > 3)
.map(CharSequence::toString)
.switchMap(word -> dataLayer.fetchListRepo(word))
.observeOn(AndroidSchedulers.mainThread())
ViewModel
Input :Source Observable
.subscribe(…)
RxTextView.textChanges(editText)
.subscribe(list -> searchRecyclerAdapter.refreshListRepo(list), error -> Logger.logOnNextError(TAG));
dataLayer
UIView
dependencies:
.observeOn(Schedulers.io())
.debounce(500, TimeUnit.MILLISECONDS)
.filter(charSequence -> charSequence.length() > 3)
.map(CharSequence::toString)
.switchMap(word -> dataLayer.fetchListRepo(word))
.observeOn(AndroidSchedulers.mainThread())
ViewModel
Input :Source Observable
.subscribe(…)
RxTextView.textChanges(editText)
.subscribe(list -> searchRecyclerAdapter.refreshListRepo(list), error -> Logger.logOnNextError(TAG));
dataLayer
UIView
dependencies:
ViewView Model
Architecture
Hold the business logicFor the View
display uiaccording to state of the data
Data Layer
Store
Store
Binding
Hold the logic for dataemits Pojos
public class ViewModel { private Observable<CharSequence> charSequenceObservable; private DataLayer dataLayer; ViewModel(Observable<CharSequence> charSequenceObservable, DataLayer dataLayer) { this.charSequenceObservable = charSequenceObservable; this.dataLayer = dataLayer; } Observable<List<GitHubRepository>> getListObservable() { return charSequenceObservable .observeOn(Schedulers.io()) .debounce(500, TimeUnit.MILLISECONDS) .filter(charSequence -> charSequence.length() > 3) .map(CharSequence::toString) .switchMap(word -> dataLayer.search(word)); }}
Thank you