Reactive Javascript

  • Hans Huang
  • 60 Minutes
  • January 17, 2019

From Wikipedia:

Reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.

Pull & Push System

As the action direction, the Pull and Push describe the communication initiative between data Producer and Consumer.

In the pull system the data consumer determines when to get data, producer just need to get data prepared (in cache or call-by-need).
In the push system the data producer determines when to send data, consumer just need to get ready to process data.

Observable

In Javascript ecosystem, the most commonly used library is RxJS (yes, the Reactive Extension family).
The Reactive Extension introduces a new data model named “Observable” to stands for push system.
Below is a comparison between Observable and other data models we familiar in javascript.

Name Evaluation Execution Return Value Count
Function Lazy Synchronous 1
Generator Lazy Synchronous 0 ~ Infinity
Promise Eager Asynchronous 1
Observable Lazy Synchronous 0 ~ Infinity

Here highlight the sync execution for Observable, because someone (e.g. Angular) think the Reactive Programming is asynchronous , as well as RxJS, but it’s not true. Similar like a function, it can start a async execution, but the function body itself is executed synchronized.

Below sample code can help demonstrate it:

1
2
3
4
5
6
7
8
9
10
11
12
const { Observable } = require('rxjs')

const observable = Observable.create(observer => {
console.log('Before send value')
observer.next(1)
observer.next(2)
console.log('After send value')
})

console.log('Before subscribe')
observable.subscribe(value => console.log(value))
console.log('After subscribe')

The output is printed synchronized:

1
2
3
4
5
6
Before subscribe
Before send value
1
2
After send value
After subscribe

From above demo code you may found Observable looks like an event emitter. But there is significant difference between them:
For event emitter, no matter how many the listeners(0 to infinity), it will always be executed, and only once.
For Observable, the execution depends on count of subscriptions: no subscription no execution, 100 subscription then 100 execution.

There will be 3 “generating value” printed for below demo code:

1
2
3
4
5
6
7
8
9
const { Observable } = require('rxjs')
const observable = Observable.create(observer => {
console.log('generating value')
observer.next(1)
})

observable.subscribe()
observable.subscribe()
observable.subscribe()

Life cycle

From pervious demo code, the data is directly send by observer, but the next() is not its only functionality.
See below definition of Observer, it can explicitly terminate data producing by calling complete() or calling error() method if there is error exception.

1
2
3
4
5
6
export interface Observer<T> {
closed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}

Corresponding for the subscriber can register 3 callbacks in subscribe() method:

1
2
3
4
5
export declare class Observable<T> implements Subscribable<T> {
//hide other class members

subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription;
}

You may found there is a Subscription object returned by the subscribe() method.
This means not only the Observer(data producer), but also the subscriber(data consumer) can terminate data flow.

1
2
3
4
5
6
7
export declare class Subscription implements SubscriptionLike {
//hide other class members

closed: boolean;

unsubscribe(): void;
}

Reminder: No matter the termination comes from Observer or Subscriber, the Observable may not completely disposed.

As below demo code, the “sending value” will be kept printing after subscription is closed:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const observable = Observable.create(observer => {
let id = setInterval(() => {
console.log('sending value')
observer.next(1)
}, 1000)

// return () => clearInterval(id)
})


const subscription = observable.subscribe(value => console.log(value))
setTimeout(() => {
subscription.unsubscribe()
console.log(`Subscription is closed: ${subscription.closed}`) // true
}, 3 * 1000)

As you assumed the solution is the commented line: return a resource dispose function when creating observable.

Subject

From above demo code you may found the Observable is unicast, every subscription has its own separate execution.
What if we want multiple subscriptions share a singleton execution(multicast) ? Then we can try “Subject”.
From this perspective, a Subject is more like a EventEmitter.

Subject class inherent Observable, so all observable functions & properties could be found inside.
But a new subscription won’t start a new execution, it will be added to internal subscription list.
It also can deliver value to subscriptions directly, since it wraps all members of Observer(closed, next(), error(), complete()).

So from usage perspective, a Subject could be Observable or a Observer, it depends on you need.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const { Subject, from } = require('rxjs')

const subject = new Subject()
subject.subscribe(value => console.log(`A: ${value}`))
subject.subscribe(value => console.log(`B: ${value}`))

// Below line: subject plays as a Observable
subject.next('Hello')

// Below 2 lines: subject plays as a Observer
const observable = from([1, 2, 3])
observable.subscribe(subject);

// Below line: subject plays as a Observable again, but it won't be effective
subject.next('World')

Below are the output, the “World” won’t be printed

1
2
3
4
5
6
7
8
A: Hello
B: Hello
A: 1
B: 1
A: 2
B: 2
A: 3
B: 3

With the multicast feature of a subject, an existing observable could be multicast as well, see te multicast operator.

Below are several handy Subject derivations provide by RX:

Operators

If you ever check some source codes which import Rx, you may be confuse with its variety operations (functions). Look twice to the operation, they are all pure function and provide functional programming style, regards data flow as list to transform, filter, consolidate…
Below are major categories for operations (we can refer with functional programming methods):

More documentation: