Hello World
| const { range } = require('rxjs');
const { map, filter } = require('rxjs/operators');
range(1, 200).pipe(
  filter(x => x % 2 === 1),
  map(x => x + x)
).subscribe(x => console.log(x));
|
Observable
| import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});
console.log('just before subscribe');
observable.subscribe({
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
});
console.log('just after subscribe');
|
Observer
| const observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
|
To use the Observer, provide it to the subscribe of an Observable:
| observable.subscribe(observer);
|
When calling observable.subscribe with an Observer, the function subscribe in new Observable(function subscribe(subscriber) {...}) is run for that given subscriber.
The Subscription represents the ongoing execution, and has a minimal API which allows you to cancel that execution.
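To make this concrete, here is a toy model in TypeScript of what subscribe does. It is a simplified sketch and not RxJS's implementation: ToyObservable, ToyObserver, ToySubscription, and Producer are hypothetical names invented for this example. It shows that the function passed to the constructor runs once per subscriber, and that the returned object exposes only a way to cancel.

```typescript
// Toy model for illustration only; this is not how RxJS is implemented.
interface ToyObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

interface ToySubscription {
  unsubscribe: () => void;
}

type Producer<T> = (subscriber: ToyObserver<T>) => void;

class ToyObservable<T> {
  private readonly producer: Producer<T>;

  // Constructing the Observable only stores the producer; nothing runs yet.
  constructor(producer: Producer<T>) {
    this.producer = producer;
  }

  // Each call runs the producer again for that one subscriber.
  subscribe(observer: ToyObserver<T>): ToySubscription {
    let closed = false;
    const guarded: ToyObserver<T> = {
      next: (value) => { if (!closed) { observer.next(value); } },
      error: (err) => { if (!closed) { closed = true; observer.error(err); } },
      complete: () => { if (!closed) { closed = true; observer.complete(); } },
    };
    this.producer(guarded);
    // The subscription's only job here is to stop further deliveries.
    return { unsubscribe: () => { closed = true; } };
  }
}

let producerRuns = 0;
const received: string[] = [];

const toy = new ToyObservable<number>((subscriber) => {
  producerRuns += 1;
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  subscriber.next(3); // dropped: this subscriber has already completed
});

function makeObserver(label: string): ToyObserver<number> {
  return {
    next: (x) => { received.push(label + ' got ' + x); },
    error: (err) => { received.push(label + ' error ' + String(err)); },
    complete: () => { received.push(label + ' done'); },
  };
}

toy.subscribe(makeObserver('A'));
toy.subscribe(makeObserver('B'));

console.log(producerRuns); // 2
console.log(received);
```

Because this producer is synchronous, both subscribers have finished by the time subscribe returns; unsubscribe only matters when the producer keeps emitting later, for example from a timer.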
Operators
Operators are functions. There are two kinds of operators:
- Pipeable Operators
- Creation Operators
A Pipeable Operator is a function that takes an Observable as its input and returns another Observable. It is a pure operation: the previous Observable stays unmodified.
Creation Operators are the other kind of operator, which can be called as standalone functions to create a new Observable.
| map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));
|
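The "pure operation" claim can be illustrated without RxJS at all. In the sketch below an observable is modelled as nothing more than a function that takes a next callback. Source, fromValues, toyMap, and collect are hypothetical names for this example, and the model deliberately ignores errors, completion, and unsubscription.

```typescript
// Simplified model: an "observable" is just a function that feeds values to a callback.
type Source<T> = (next: (value: T) => void) => void;

// Hypothetical stand-in for a creation operator such as of().
function fromValues<T>(...values: T[]): Source<T> {
  return (next) => { values.forEach((v) => next(v)); };
}

// Hypothetical stand-in for map(): takes a projection and returns an operator.
// The operator takes one Source and returns a new Source; the input is left untouched.
function toyMap<T, R>(project: (value: T) => R): (source: Source<T>) => Source<R> {
  return (source) => (next) => { source((value) => next(project(value))); };
}

// Helper that subscribes and gathers everything the source emits.
function collect<T>(source: Source<T>): T[] {
  const out: T[] = [];
  source((v) => { out.push(v); });
  return out;
}

const numbers = fromValues(1, 2, 3);
const squares = toyMap((x: number) => x * x)(numbers);

console.log(collect(squares)); // [1, 4, 9]
console.log(collect(numbers)); // [1, 2, 3], the original source is unchanged
```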
Piping
| // op4()(op3()(op2()(op1()(obs))))
obs.pipe(
  op1(),
  op2(),
  op3(),
  op4(),
)
|
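As a rough illustration of why the piped form and the nested form are equivalent, pipe can be thought of as a left-to-right fold over unary functions. Unary, toyPipe, and label are hypothetical helpers for this sketch, and strings stand in for Observables so that the call order is visible. RxJS's own pipe is typed more precisely than this.

```typescript
type Unary<T> = (input: T) => T;

// Hypothetical sketch of pipe: apply each function to the result of the previous one.
function toyPipe<T>(...fns: Array<Unary<T>>): Unary<T> {
  return (input) => fns.reduce<T>((acc, fn) => fn(acc), input);
}

// Each "operator" wraps its input in its own name so the nesting can be read off.
const label = (name: string): Unary<string> => (input) => name + '(' + input + ')';

const piped = toyPipe(label('op1'), label('op2'), label('op3'), label('op4'))('obs');
const nested = label('op4')(label('op3')(label('op2')(label('op1')('obs'))));

console.log(piped); // op4(op3(op2(op1(obs))))
console.log(piped === nested); // true
```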
Higher-order Observables
concatAll
mergeAll
switchAll
exhaust
concatMap
mergeMap
switchMap
exhaustMap
Pipeable Operators
Creation Operators
defer
interval
timer
fromEvent
ajax
bindCallback
iif
range
| range(1, 10).subscribe(x => console.log(x));
|
of
| of<T>(...args: Array<T | SchedulerLike>): Observable<T>
|
from
| from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T>
|
generate
| // initialState, condition, iterate
generate(1, x => x < 10, x => x + 1).subscribe(x => console.log(x));
|
Join Creation
Join creation operators emit values from multiple source Observables.
concat
merge
partition
race
zip
combineLatest
forkJoin
Transformation
map
pluck
scan
pairwise
mapTo
partition
expand
concatMap
mergeMap
switchMap
exhaustMap
concatMapTo
mergeMapTo
switchMapTo
buffer
bufferCount
bufferTime
bufferToggle
bufferWhen
window
windowCount
windowTime
windowToggle
windowWhen
switchMap
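Maps each value to an Observable, then flattens these inner Observables by emitting only from the most recent one: when a new inner Observable arrives, the previous one is unsubscribed.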
| import { of } from 'rxjs';
import { switchMap } from 'rxjs/operators';
const switched = of(1, 2, 3).pipe(switchMap((x: number) => of(x, x ** 2, x ** 3)));
switched.subscribe(x => console.log(x));
// 1, 1, 1, 2, 4, 8, 3, 9, 27
|
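When the inner Observables are synchronous, each one emits all of its values before the next outer value arrives, so nothing is dropped. The switching behaviour only shows when a new outer value arrives while an earlier inner Observable is still active. The sketch below models that case in plain TypeScript. Source, manual, and toySwitchMap are hypothetical names, and the model ignores errors and completion, so treat it as an illustration of the idea rather than of RxJS internals.

```typescript
type Unsubscribe = () => void;
// Simplified model: subscribing means passing a callback and getting a cancel function back.
type Source<T> = (next: (value: T) => void) => Unsubscribe;

// Hypothetical helper: a source whose values are pushed by hand via emit().
function manual<T>(): { source: Source<T>; emit: (value: T) => void } {
  let listeners: Array<(value: T) => void> = [];
  const source: Source<T> = (next) => {
    listeners.push(next);
    return () => { listeners = listeners.filter((l) => l !== next); };
  };
  const emit = (value: T): void => { listeners.slice().forEach((l) => l(value)); };
  return { source, emit };
}

// Hypothetical sketch of switchMap: keep only the most recent inner subscription.
function toySwitchMap<T, R>(project: (value: T) => Source<R>): (outer: Source<T>) => Source<R> {
  return (outer) => (next) => {
    let cancelInner: Unsubscribe | undefined;
    const cancelOuter = outer((value) => {
      if (cancelInner) { cancelInner(); } // drop the previous inner source
      cancelInner = project(value)(next);
    });
    return () => {
      cancelOuter();
      if (cancelInner) { cancelInner(); }
    };
  };
}

const outer = manual<string>();
const innerA = manual<string>();
const innerB = manual<string>();
const seen: string[] = [];

const switched = toySwitchMap((key: string) => (key === 'a' ? innerA.source : innerB.source))(outer.source);
switched((value) => { seen.push(value); });

outer.emit('a');   // subscribes to innerA
innerA.emit('a1'); // delivered
outer.emit('b');   // switch: innerA is cancelled, innerB is subscribed
innerA.emit('a2'); // ignored, innerA is no longer listened to
innerB.emit('b1'); // delivered

console.log(seen); // ['a1', 'b1']
```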
window
It’s like buffer, but emits a nested Observable instead of an array.
Filtering
filter
distinct
- distinctUntilChanged
- distinctUntilKeyChanged
first
last
elementAt
take
- takeLast
- takeUntil
- takeWhile
skip
- skipLast
- skipUntil
- skipWhile
debounce
- debounceTime
throttle
- throttleTime
audit
- auditTime
sample
- sampleTime
ignoreElements
single
Join
combineAll
concatAll
exhaust
mergeAll
startWith
Multicasting
Error Handling
Utility
tap
delay
timeout
- timeoutWith
toArray
Conditional & Boolean
every
find
findIndex
isEmpty
defaultIfEmpty
Mathematical & Aggregate
Subject
- Subject
- BehaviorSubject
- ReplaySubject
- AsyncSubject
Scheduler
A scheduler controls when a subscription starts and when notifications are delivered.
Use the operator observeOn to specify the async scheduler:
| import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
const observable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
}).pipe(
  observeOn(asyncScheduler)
);
console.log('just before subscribe');
observable.subscribe(x => console.log(x));
console.log('just after subscribe');
|
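The effect of moving delivery onto an asynchronous scheduler can be imitated in plain TypeScript. In the sketch below Source and deliverLater are hypothetical. deliverLater uses setTimeout with a zero delay as a rough stand-in for observeOn(asyncScheduler) and is not how RxJS schedulers are implemented. The point is only the ordering: both log entries around the subscribe call are recorded before any value.

```typescript
// Simplified model: an "observable" is a function that feeds values to a callback.
type Source<T> = (next: (value: T) => void) => void;

// Emits three values synchronously, like the producer in the example above.
const syncSource: Source<number> = (next) => {
  next(1);
  next(2);
  next(3);
};

// Hypothetical stand-in for observeOn(asyncScheduler): re-deliver each value in a later task.
function deliverLater<T>(source: Source<T>): Source<T> {
  return (next) => {
    source((value) => { setTimeout(() => next(value), 0); });
  };
}

const log: string[] = [];

log.push('just before subscribe');
deliverLater(syncSource)((x) => { log.push('got value ' + x); });
log.push('just after subscribe');

// At this point no value has been delivered yet.
console.log(log.slice());

// Once the timers have fired, the values follow in their original order.
setTimeout(() => { console.log(log); }, 10);
```

Subscribing to syncSource directly, without deliverLater, would record the three values between the two log entries instead.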
| export type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;
|
| interface ArrayLike<T> {
  readonly length: number;
  readonly [n: number]: T;
}
|
| interface Iterable<T> {
  [Symbol.iterator](): Iterator<T>;
}
|
| export type SubscribableOrPromise<T> = Subscribable<T> | Subscribable<never> | PromiseLike<T> | InteropObservable<T>;
|
| export interface Subscribable<T> {
  subscribe(observer?: PartialObserver<T>): Unsubscribable;
  subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Unsubscribable;
}
|
| interface PromiseLike<T> {
  /**
   * Attaches callbacks for the resolution and/or rejection of the Promise.
   * @param onfulfilled The callback to execute when the Promise is resolved.
   * @param onrejected The callback to execute when the Promise is rejected.
   * @returns A Promise for the completion of which ever callback is executed.
   */
  then<TResult1 = T, TResult2 = never>(onfulfilled?: ((value: T) => TResult1 | PromiseLike<TResult1>) | undefined | null, onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null): PromiseLike<TResult1 | TResult2>;
}
|
| export type InteropObservable<T> = { [Symbol.observable]: () => Subscribable<T>; };
|
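As a hedged illustration of how these input shapes differ at runtime, the following sketch classifies a value by probing for the members named in the type definitions above. InputKind and describeInput are hypothetical and not part of RxJS. The sketch covers only the subscribable, Promise-like, and array-like cases, and reports anything else (including plain iterables) as other.

```typescript
type InputKind = 'subscribable' | 'promise-like' | 'array-like' | 'other';

// Hypothetical helper: probe for the members that the type definitions require.
function describeInput(input: unknown): InputKind {
  if (input === null || input === undefined) { return 'other'; }
  const candidate = input as { subscribe?: unknown; then?: unknown; length?: unknown };
  if (typeof candidate.subscribe === 'function') { return 'subscribable'; }
  if (typeof candidate.then === 'function') { return 'promise-like'; }
  // Functions also have a numeric length, so they are excluded explicitly.
  if (typeof input !== 'function' && typeof candidate.length === 'number') { return 'array-like'; }
  return 'other';
}

console.log(describeInput([10, 20, 30]));                   // array-like
console.log(describeInput('abc'));                          // array-like
console.log(describeInput({ then: () => undefined }));      // promise-like
console.log(describeInput({ subscribe: () => undefined })); // subscribable
console.log(describeInput(42));                             // other
```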