From Wikipedia:
Reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.
Pull & Push System
Pull and push describe which side takes the initiative in the communication between a data producer and a data consumer.
In a pull system, the consumer decides when to get data; the producer only needs to have the data ready (cached or computed on demand).
In a push system, the producer decides when to send data; the consumer only needs to be ready to process it.
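The two directions can be sketched in plain JavaScript. This is only an illustration: `numbers` and `produce` are hypothetical producers invented for this example, with a generator standing in for a pull system and a callback-driven function standing in for a push system.

```javascript
// Pull: the consumer decides when to take the next value.
function* numbers() {            // hypothetical producer
  yield 1
  yield 2
  yield 3
}

const pulled = []
const iterator = numbers()
pulled.push(iterator.next().value) // the consumer asks for a value
pulled.push(iterator.next().value) // and asks again when it is ready

// Push: the producer decides when to deliver values.
function produce(onValue) {      // hypothetical producer
  onValue(1)
  onValue(2)
  onValue(3)
}

const pushed = []
produce(value => pushed.push(value)) // the consumer only supplies a handler

console.log(pulled) // [ 1, 2 ]
console.log(pushed) // [ 1, 2, 3 ]
```

In the pull half the third value is never produced because the consumer never asked for it; in the push half the consumer receives everything the producer chooses to send.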
Observable
In the JavaScript ecosystem, the most commonly used library is RxJS (yes, part of the Reactive Extensions family).
Reactive Extensions introduces a data model named “Observable” to represent a push system.
Below is a comparison between Observable and other data models familiar from JavaScript.
Name | Evaluation | Execution | Return Value Count |
---|---|---|---|
Function | Lazy | Synchronous | 1 |
Generator | Lazy | Synchronous | 0 ~ Infinity |
Promise | Eager | Asynchronous | 1 |
Observable | Lazy | Synchronous | 0 ~ Infinity |
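The first three rows of the table can be checked with plain JavaScript. The sketch below is only an illustration (the names `fn`, `gen` and `log` are made up for it): it records which bodies have run at each point, to show lazy versus eager evaluation and synchronous versus asynchronous delivery.

```javascript
const log = []

// Function: lazy, nothing runs until it is called.
const fn = () => { log.push('function body'); return 1 }

// Generator: lazy, the body runs only when next() is called.
function* gen() { log.push('generator body'); yield 1; yield 2 }
const it = gen()

// Promise: eager, the executor runs as soon as the promise is constructed.
const promise = new Promise(resolve => { log.push('promise executor'); resolve(1) })

const afterDefinitions = [...log] // only the promise has run so far

fn()       // synchronous: the result is available immediately
it.next()  // synchronous as well
promise.then(() => log.push('promise callback')) // asynchronous: runs in a later microtask

const afterCalls = [...log] // the then() callback has not run yet

console.log(afterDefinitions) // [ 'promise executor' ]
console.log(afterCalls)       // [ 'promise executor', 'function body', 'generator body' ]
```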
The synchronous execution of Observable is highlighted here because some (e.g. Angular) treat Reactive Programming, and RxJS with it, as asynchronous, but that is not true. Like a function, an Observable can start an asynchronous operation, but its body itself is executed synchronously.
The sample code below demonstrates this:
const { Observable } = require('rxjs')
The output is printed synchronously:
Before subscribe
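The same ordering can be modelled without the library. The sketch below uses a hypothetical `MiniObservable` class, a deliberately tiny stand-in and not RxJS's implementation, under the assumption that subscribing simply calls the producer function in place.

```javascript
// Hypothetical stand-in for illustration only; RxJS's Observable does far more.
class MiniObservable {
  constructor(producer) { this.producer = producer }
  // Lazy: the producer runs only here, and it runs synchronously.
  subscribe(observer) { return this.producer(observer) }
}

const output = []
const observable = new MiniObservable(observer => {
  output.push('generating value')
  observer.next(1)
  observer.next(2)
})

output.push('Before subscribe')
observable.subscribe({ next: value => output.push(`received ${value}`) })
output.push('After subscribe')

console.log(output.join('\n'))
```

Everything the producer does lands between "Before subscribe" and "After subscribe", because nothing in the producer defers work to a timer or a promise.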
From the demo code above you may find that an Observable looks like an event emitter, but there is a significant difference between them:
For an event emitter, the emitting code runs exactly once no matter how many listeners there are (0 to infinity).
For an Observable, execution depends on the number of subscriptions: no subscription means no execution, and 100 subscriptions mean 100 executions.
The demo code below prints “generating value” 3 times:
const { Observable } = require('rxjs')
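The difference can be sketched side by side using Node's built-in `events` module and a hypothetical `MiniObservable` stand-in (not RxJS's class). The counters `emitterRuns` and `observableRuns` are illustrative names that record how often each producing code path runs.

```javascript
const { EventEmitter } = require('events')

// Hypothetical stand-in for illustration only, not RxJS's class.
class MiniObservable {
  constructor(producer) { this.producer = producer }
  subscribe(next) { this.producer({ next }) }
}

let emitterRuns = 0
let observableRuns = 0
const received = []

// Event emitter: the emitting code runs once, however many listeners exist.
const emitter = new EventEmitter()
emitter.on('value', value => received.push(`listener 1: ${value}`))
emitter.on('value', value => received.push(`listener 2: ${value}`))
function emitValue() {
  emitterRuns++
  emitter.emit('value', 42)
}
emitValue()

// Observable: the producer runs once per subscription.
const observable = new MiniObservable(observer => {
  observableRuns++
  console.log('generating value')
  observer.next(42)
})
for (const name of ['A', 'B', 'C']) {
  observable.subscribe(value => received.push(`subscriber ${name}: ${value}`))
}

console.log(emitterRuns, observableRuns) // 1 3
```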
Life cycle
In the previous demo code the data is sent directly through the observer, but next() is not its only capability.
As the definition of Observer below shows, the producer can also explicitly terminate the data flow by calling complete(), or by calling error() when an error occurs.
export interface Observer<T> {
Correspondingly, the subscriber can register 3 callbacks in the subscribe() method:
export declare class Observable<T> implements Subscribable<T> {
You may notice that the subscribe() method returns a Subscription object.
This means that not only the data producer (through the observer) but also the subscriber (data consumer) can terminate the data flow.
export declare class Subscription implements SubscriptionLike {
Reminder: whichever side terminates the flow, the resources used by the Observable's execution are not necessarily released.
In the demo code below, “sending value” keeps being printed after the subscription is closed:
const observable = Observable.create(observer => {
As you may have guessed, the solution is the commented-out line: return a resource disposal function when creating the observable.
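The effect of returning a disposal function can be sketched without timers. Everything here is hypothetical and for illustration only: `MiniObservable` is a tiny stand-in rather than RxJS's class, and `ticker` is a manually driven source playing the role of a timer or socket.

```javascript
// Hypothetical stand-ins for illustration only, not RxJS's implementation.
class MiniObservable {
  constructor(producer) { this.producer = producer }
  subscribe(next) {
    let closed = false
    // The producer may return a teardown function that releases its resources.
    const teardown = this.producer({ next: value => { if (!closed) next(value) } })
    return {
      unsubscribe() {
        closed = true
        if (typeof teardown === 'function') teardown()
      }
    }
  }
}

// A manually driven tick source, standing in for setInterval or a socket.
const ticker = {
  listeners: new Set(),
  tick() { this.listeners.forEach(listener => listener()) }
}

const log = []
const makeObservable = withTeardown => new MiniObservable(observer => {
  const listener = () => { log.push('sending value'); observer.next(1) }
  ticker.listeners.add(listener)
  if (withTeardown) return () => ticker.listeners.delete(listener)
})

const leaky = makeObservable(false).subscribe(() => log.push('received'))
ticker.tick()            // sending value, received
leaky.unsubscribe()
ticker.tick()            // sending value: the producer is still running
const leakyLog = log.splice(0)
ticker.listeners.clear() // reset between the two demonstrations

const clean = makeObservable(true).subscribe(() => log.push('received'))
ticker.tick()            // sending value, received
clean.unsubscribe()      // runs the teardown, which removes the listener
ticker.tick()            // nothing happens
const cleanLog = log.splice(0)

console.log(leakyLog) // [ 'sending value', 'received', 'sending value' ]
console.log(cleanLog) // [ 'sending value', 'received' ]
```

Without the teardown the consumer stops receiving values, yet the producer keeps doing work; with it, unsubscribing also stops the producer.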
Subject
From the demo code above you may find that an Observable is unicast: every subscription has its own separate execution.
What if we want multiple subscriptions to share a single execution (multicast)? Then we can use a “Subject”.
From this perspective, a Subject is more like an EventEmitter.
The Subject class inherits from Observable, so all Observable methods and properties are available on it.
But a new subscription does not start a new execution; it is simply added to an internal list of subscriptions.
A Subject can also deliver values to its subscriptions directly, since it exposes all members of Observer (closed, next(), error(), complete()).
So from a usage perspective, a Subject can act as an Observable or as an Observer, depending on your needs.
const { Subject, from } = require('rxjs')
Below is the output; “World” is not printed:
A: Hello
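The multicast behaviour can be sketched with a hypothetical `MiniSubject` class, a stand-in for illustration rather than RxJS's Subject. It assumes that values sent after completion are dropped, which is one way a value such as “World” would never reach the subscribers.

```javascript
// Hypothetical stand-in for illustration only, not RxJS's Subject.
class MiniSubject {
  constructor() { this.observers = []; this.closed = false }
  // Observable side: a new subscription joins the list, no new execution starts.
  subscribe(next) { this.observers.push(next) }
  // Observer side: a value is delivered to every current subscription.
  next(value) { if (!this.closed) this.observers.forEach(next => next(value)) }
  complete() { this.closed = true; this.observers = [] }
}

const output = []
const subject = new MiniSubject()
subject.subscribe(value => output.push(`A: ${value}`))
subject.subscribe(value => output.push(`B: ${value}`))

subject.next('Hello')  // one call reaches both subscriptions
subject.complete()
subject.next('World')  // ignored: the subject has already completed

console.log(output) // [ 'A: Hello', 'B: Hello' ]
```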
With the multicast capability of a Subject, an existing observable can be multicast as well; see the multicast operator.
Below are several handy Subject variants provided by RxJS:
BehaviorSubject: remembers the latest (or initial) value and delivers it to every new subscription.
ReplaySubject: similar to BehaviorSubject, but it can remember multiple values and deliver them to new subscriptions.
AsyncSubject: don't be misled by the name, it has nothing to do with asynchronous execution; this subject delivers only the last value, once, when it completes.
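The three behaviours can be approximated in a few lines. The classes below are hypothetical stand-ins, not the RxJS classes: `MiniReplaySubject` models both the BehaviorSubject-like case (buffer of one plus an initial value) and the ReplaySubject-like case, and `MiniAsyncSubject` models delivery on completion.

```javascript
// Hypothetical stand-ins for illustration only, not the RxJS classes.
class MiniReplaySubject {
  constructor(bufferSize, initial = []) {
    this.bufferSize = bufferSize
    this.buffer = initial.slice(-bufferSize)
    this.observers = []
  }
  subscribe(next) {
    this.buffer.forEach(value => next(value)) // replay remembered values first
    this.observers.push(next)
  }
  next(value) {
    this.buffer = [...this.buffer, value].slice(-this.bufferSize)
    this.observers.forEach(next => next(value))
  }
}

// BehaviorSubject-like: remembers one value and starts with an initial one.
const behavior = new MiniReplaySubject(1, ['initial'])
// ReplaySubject-like: remembers the last two values, no initial value.
const replay = new MiniReplaySubject(2)

const early = [], lateBehavior = [], lateReplay = []
behavior.subscribe(v => early.push(v))        // receives 'initial' immediately
for (const v of ['a', 'b', 'c']) { behavior.next(v); replay.next(v) }
behavior.subscribe(v => lateBehavior.push(v)) // receives only the latest value
replay.subscribe(v => lateReplay.push(v))     // receives the last two values

// AsyncSubject-like: keeps only the last value and delivers it on completion.
class MiniAsyncSubject {
  constructor() { this.observers = []; this.hasValue = false }
  subscribe(next) { this.observers.push(next) }
  next(value) { this.last = value; this.hasValue = true }
  complete() { if (this.hasValue) this.observers.forEach(next => next(this.last)) }
}
const asyncLike = new MiniAsyncSubject()
const fromAsync = []
asyncLike.subscribe(v => fromAsync.push(v))
asyncLike.next(1)
asyncLike.next(2)    // nothing has been delivered yet
asyncLike.complete() // delivers the last value, once

console.log(early, lateBehavior, lateReplay, fromAsync)
```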
Operators
If you have ever read source code that imports Rx, you may have been confused by its wide variety of operators (functions). On closer inspection, they are all pure functions in a functional programming style that treat a data flow as a list to transform, filter, consolidate…
Below are the major categories of operators (compare them with the corresponding functional programming methods):
Creation
- Create, Defer: create an observable eagerly, or lazily at subscription time.
- From, Range, Repeat: create an observable that emits the provided values by iteration.
- Just, Start: create an observable that emits a single value, either provided directly or returned by a function.
- Interval, Timer: create an observable that emits values at a given time interval or after a timeout.
- Empty/Never/Throw: create an observable that emits no values and instead completes immediately, never terminates, or throws an error.
Transform
- Map (Select), FlatMap (SelectMany): transform the values from one or multiple observables and then emit them.
- Scan, Reduce: apply a function that accepts 2 parameters (previous/initial value, current value) to calculate a new value, then emit every intermediate result (Scan) or only the final one (Reduce).
- GroupBy, Window, Buffer: subdivide values and emit them by a specific group key, in fixed time periods, or by some other boundary selector.
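Treating the data flow as a list, the transform operators can be compared with plain array code. This is an analogy only: arrays are finite and synchronous, and `scan` below is a hypothetical helper written for this example, not the RxJS operator.

```javascript
const values = [1, 2, 3, 4]
const add = (previous, current) => previous + current

// Reduce-like: only the final accumulated value is produced.
const reduced = values.reduce(add, 0)

// Scan-like: every intermediate accumulation is produced (hypothetical helper).
function scan(list, accumulator, seed) {
  const results = []
  let state = seed
  for (const item of list) {
    state = accumulator(state, item)
    results.push(state)
  }
  return results
}
const scanned = scan(values, add, 0)

// Map-like and FlatMap-like transformations of the same list.
const mapped = values.map(value => value * 10)
const flatMapped = values.flatMap(value => [value, value])

console.log(reduced)    // 10
console.log(scanned)    // [ 1, 3, 6, 10 ]
console.log(mapped)     // [ 10, 20, 30, 40 ]
console.log(flatMapped) // [ 1, 1, 2, 2, 3, 3, 4, 4 ]
```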
Filter
- First, Last, ElementAt: emit a single value selected by a specific condition or position.
- Filter, Distinct, Sample, Debounce: emit the values that match a specific condition, or the latest item within a particular time interval or timespan.
- Skip, SkipLast, Take, TakeLast, IgnoreElements: ignore some items by index, or ignore all of them and emit only the termination.
- SkipWhile, TakeWhile: ignore or take items while they match a specific condition.
- SkipUntil, TakeUntil: ignore or take items until a second observable emits a value.
- Amb (like Match): given multiple observables, emit the values of whichever one emits first.
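The index-based and condition-based filters have close array analogues. The sketch below is an analogy on a finite list; `takeWhile` and `skipWhile` are hypothetical helpers written for this example, and the time-based operators (Sample, Debounce, SkipUntil, TakeUntil, Amb) have no array counterpart and are left out.

```javascript
const values = [1, 2, 3, 4, 5, 1]

// Filter-like and Distinct-like.
const odd = values.filter(value => value % 2 === 1)
const distinct = [...new Set(values)]

// Take/Skip-like: select by position.
const firstTwo = values.slice(0, 2)     // Take(2)
const withoutFirstTwo = values.slice(2) // Skip(2)
const lastTwo = values.slice(-2)        // TakeLast(2)

// TakeWhile/SkipWhile-like: the condition is checked only until it first fails.
function takeWhile(list, predicate) {
  const index = list.findIndex(value => !predicate(value))
  return index === -1 ? [...list] : list.slice(0, index)
}
function skipWhile(list, predicate) {
  const index = list.findIndex(value => !predicate(value))
  return index === -1 ? [] : list.slice(index)
}
const small = takeWhile(values, value => value < 3)
const rest = skipWhile(values, value => value < 3)

console.log(small) // [ 1, 2 ]
console.log(rest)  // [ 3, 4, 5, 1 ]
```

Note that the trailing `1` is kept by the SkipWhile-like helper even though it matches the condition: once the condition has failed, it is no longer consulted, which is what separates these operators from a plain filter.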
Combination
- CombineLatest/WithLatestFrom, Zip, Join: combine multiple observables through a callback function that receives their latest values, their values paired in sequence, or their values within a specific time window.
- Concat: emit the values from multiple observables one observable after another, without interleaving them.
- Merge, StartWith, switchAll: combine multiple observables and then emit their values as a single sequence.
Predication
- All, Contains: determine whether all items, or any item, meet specific criteria.
- SequenceEqual: determine whether two Observables emit the same sequence of items.
Math
Error Handler
- catchError: catch an error from the observable, then continue with a different observable or throw a new error.
- Retry: when the source observable emits an error, resubscribe to it in the hope that it completes without error.
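The two error-handling ideas can be sketched with ordinary functions. This is an analogy, not the RxJS operators: `source`, `retry` and `catchError` below are hypothetical helpers for this example, with "running the function again" standing in for resubscribing and a fallback value standing in for a replacement observable.

```javascript
let attempts = 0
// Hypothetical flaky source: fails twice, then succeeds.
function source() {
  attempts++
  if (attempts < 3) throw new Error(`attempt ${attempts} failed`)
  return 'ok'
}

// Retry-like: run the source again on failure, up to `count` extra times.
function retry(run, count) {
  for (let tries = 0; ; tries++) {
    try {
      return run()
    } catch (error) {
      if (tries >= count) throw error // retries exhausted: pass the error on
    }
  }
}

// catchError-like: replace a failure with a fallback result.
function catchError(run, fallback) {
  try {
    return run()
  } catch (error) {
    return fallback(error)
  }
}

const retried = retry(source, 2) // succeeds on the third attempt

attempts = 0
const recovered = catchError(
  () => retry(source, 1), // only one retry, so the second failure escapes
  error => `fallback after: ${error.message}`
)

console.log(retried)   // ok
console.log(recovered) // fallback after: attempt 2 failed
```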
Utility
More documentation: