RxJS

字数 2026 · 2019-07-01

#javascript

Hello World

1
2
3
4
5
6
7
const { range } = require('rxjs');
const { map, filter } = require('rxjs/operators');

range(1, 200).pipe(
  filter(x => x % 2 === 1),
  map(x => x + x)
).subscribe(x => console.log(x));

Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe({
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
});
console.log('just after subscribe');

Observer

1
2
3
4
5
const observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

To use the Observer, provide it to the subscribe of an Observable:

1
observable.subscribe(observer);

When calling observable.subscribe with an Observer, the function subscribe in new Observable(function subscribe(subscriber) {...}) is run for that given subscriber.

The Subscription represents the ongoing execution, and has a minimal API which allows you to cancel that execution.

Operators

Operators are functions. There are two kinds of operators:

  • Pipeable Operators
  • Creation Operators

A Pipeable Operator is a function that takes an Observable as its input and returns another Observable. It is a pure operation: the previous Observable stays unmodified.


Creation Operators are the other kind of operator, which can be called as standalone functions to create a new Observable.


1
map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));

Piping

1
2
3
4
5
6
7
// op4()(op3()(op2()(op1()(obs))))
obs.pipe(
  op1(),
  op2(),
  op3(),
  op3(),
)

Higher-order Observables

concatAll mergeAll switchAll exhaust
concatMap mergeMap switchMap exhaustMap

Pipeable Operators

  • map
  • first

Creation Operators

  • delay
  • interval
  • timer
  • fromEvent
  • ajax
  • bindCallback
  • iif

range

1
range(1, 10).subscribe(x => console.log(x));

of

1
of<T>(...args: Array<T | SchedulerLike>): Observable<T>

from

1
from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T>

generate

1
2
// initialState, condition, iterate
generate(1, x => x < 10, x => x + 1).subscribe(x => console.log(x));

Join Creation

Emitting values of multiple source Observables.

  • concat
  • merge
  • partition
  • race
  • zip
  • combineLatest
  • forkJoin

Transformation

  • map
  • pluck
  • scan
  • pairwise
  • mapTo
  • partition
  • expand
concatMap mergeMap switchMap exhaustMap
concatMapTo mergeMapTo switchMapTo  
buffer bufferCount bufferTime bufferToggle bufferWhen
window windowCount windowTime windowToggle windowWhen

switchMap

Maps each value to an Observable, then flattens all of these inner Observables.

1
2
3
4
5
6
7
import { of } from 'rxjs';
import { switchMap } from 'rxjs/operators';
 
const switched = of(1, 2, 3).pipe(switchMap((x: number) => of(x, x ** 2, x ** 3)));
switched.subscribe(x => console.log(x));

// 1, 1, 1, 1, 4, 9, 1, 8, 27

window

It’s like buffer, but emits a nested Observable instead of an array.

Filtering

  • filter
  • distinct - distinctUntilChanged - distinctUntilKeyChanged
  • first
  • last
  • elementAt
  • take - takeLast - takeUntil - takeWhile
  • skip - skipLast - skipUntil - skipWhile
  • debounce - debounceTime
  • throttle - throttleTime
  • audit - auditTime
  • sample - sampleTime
  • ignoreElements
  • single

Join

  • combineAll
  • concatAll
  • exhaust
  • mergeAll
  • startWith

Multicasting

  • share

Error Handling

  • catchError
  • retry

Utility

  • tap
  • delay
  • timeout - timeoutWith
  • toArray

Conditional & Boolean

  • every
  • find
  • findIndex
  • isEmpty
  • defaultIfEmpty

Mathematical & Aggregate

  • count
  • min
  • max
  • reduce

Subject

  • Subject
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Scheduler

A scheduler controls when a subscription starts and when notifications are delivered.

Use the operator observeOn to specify the async scheduler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
 
const observable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
}).pipe(
  observeOn(asyncScheduler)
);
 
console.log('just before subscribe');
observable.subscribe(x => console.log(x));
console.log('just after subscribe');

ObservableInput

1
export type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;
1
2
3
4
interface ArrayLike<T> {
    readonly length: number;
    readonly [n: number]: T;
}
1
2
3
interface Iterable<T> {
    [Symbol.iterator](): Iterator<T>;
}
1
export type SubscribableOrPromise<T> = Subscribable<T> | Subscribable<never> | PromiseLike<T> | InteropObservable<T>;
1
2
3
4
5
export interface Subscribable<T> {
  subscribe(observer?: PartialObserver<T>): Unsubscribable;

  subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Unsubscribable;
}
1
2
3
4
5
6
7
8
9
interface PromiseLike<T> {
    /**
     * Attaches callbacks for the resolution and/or rejection of the Promise.
     * @param onfulfilled The callback to execute when the Promise is resolved.
     * @param onrejected The callback to execute when the Promise is rejected.
     * @returns A Promise for the completion of which ever callback is executed.
     */
    then<TResult1 = T, TResult2 = never>(onfulfilled?: ((value: T) => TResult1 | PromiseLike<TResult1>) | undefined | null, onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null): PromiseLike<TResult1 | TResult2>;
}
1
export type InteropObservable<T> = { [Symbol.observable]: () => Subscribable<T>; };