Approaches to building a custom operator:
- direct call of a built-in operator
- pipe() composing built-in operators
- a function that returns a function returning an Observable:
/**
 * Builds an operator that lets through only the numbers lying in the
 * closed interval [start, end].
 *
 * @param start - Lower bound (inclusive).
 * @param end - Upper bound (inclusive).
 * @returns The built-in `filter` operator configured with the range check.
 */
function inRange(
  start: number,
  end: number
): MonoTypeOperatorFunction<number> {
  // Both bounds are inclusive.
  const isWithinBounds = (value: number): boolean =>
    start <= value && value <= end;
  return filter(isWithinBounds);
}
// ----
// Demo: keep only the values from 2 to 6 (inclusive).
const stream$ = of(1, 2, 3, 4, 5, 6, 7, 8).pipe(inRange(2, 6));
stream$.subscribe((value) => console.log(value));
// Output: 2 3 4 5 6
/**
 * Shorthand for a unary function type built on `UnaryFunction`.
 *
 * - `I`   — input element type.
 * - `O`   — output element type; defaults to `I`.
 * - `Obs` — selects which side is wrapped in `Observable`:
 *   `true` (the default) wraps both input and output, `1` wraps only the
 *   input, `2` wraps only the output; a type matching neither branch
 *   (e.g. `false`) leaves both sides unwrapped.
 *
 * Example: `Op<number>` gives
 * `UnaryFunction<Observable<number>, Observable<number>>`.
 */
type Op<I, O = I, Obs = true> = UnaryFunction<
Obs extends true | 1 ? Observable<I> : I,
Obs extends true | 2 ? Observable<O> : O
>;
/**
 * Builds an operator that keeps only even numbers.
 *
 * @returns The result of `pipe()` applied to a single `filter` operator.
 */
function even(): Op<number> {
  const isEven = (value: number): boolean => value % 2 === 0;
  return pipe(filter(isEven));
}
/**
 * Builds an operator that multiplies every emitted number by `x`.
 *
 * @param x - The factor applied to each value.
 * @returns The result of `pipe()` applied to a single `map` operator.
 */
function mulBy(x: number): Op<number> {
  const scale = (n: number): number => n * x;
  return pipe(map(scale));
}
// ----
// Demo: chain the two single-purpose operators inside one pipe().
const stream$ = of(1, 2, 3, 4, 5, 6, 7, 8).pipe(even(), mulBy(10));
stream$.subscribe((value) => console.log(value));
// Output: 20 40 60 80
/**
 * Builds an operator that keeps only even numbers and multiplies each of
 * them by `x`, composed from the built-in `filter` and `map` via `pipe()`.
 *
 * @param x - The factor applied to every even value.
 * @returns The composed operator produced by `pipe()`.
 */
function mulEvenBy(x: number) {
  // Named locals keep the predicate's argument distinct from the factor `x`.
  const isEven = (candidate: number): boolean => candidate % 2 === 0;
  const scaleByFactor = (n: number): number => n * x;
  return pipe(filter(isEven), map(scaleByFactor));
}
// ----
// Demo: the combined filter-and-multiply operator used on its own.
const stream$ = of(1, 2, 3, 4, 5, 6, 7, 8).pipe(mulEvenBy(10));
stream$.subscribe((value) => console.log(value));
// Output: 20 40 60 80
// Demo: the value returned by the standalone pipe() is passed straight to
// the observable's own pipe() as an operator.
const stream$ = of(1, 2, 3, 4, 5, 6, 7, 8).pipe(
  pipe(
    filter((candidate: number) => candidate % 2 === 0),
    map((evenValue: number) => evenValue * 10)
  )
);
stream$.subscribe((value) => console.log(value));
// Output: 20 40 60 80
/**
 * Builds an operator as a plain function: it takes the source observable
 * and returns the source piped through `filter` (even values only) and
 * `map` (multiply by `input`).
 *
 * @param input - The factor applied to every even value.
 * @returns A function from a number observable to the piped observable.
 */
function mulEvenBy(input: number) {
  const isEven = (candidate: number): boolean => candidate % 2 === 0;
  const scaleByInput = (evenValue: number): number => evenValue * input;
  return (source: Observable<number>) =>
    source.pipe(filter(isEven), map(scaleByInput));
}
// ----
// Demo: an operator written as a plain function plugs into pipe() the
// same way a built-in operator does.
const stream$ = of(1, 2, 3, 4, 5, 6, 7, 8).pipe(mulEvenBy(10));
stream$.subscribe((value) => console.log(value));
// Output: 20 40 60 80
/**
 * Builds an operator that adds up every number emitted by the source and
 * emits the total exactly once, when the source completes.
 *
 * The total starts at 0, so a source that completes without emitting
 * yields 0. A source error is forwarded as-is and no total is emitted.
 *
 * @returns A function mapping a number observable to an observable that
 *   emits the single total and then completes.
 */
function sum(): (source: Observable<number>) => Observable<number> {
  return (source) =>
    // The explicit <number> matters: without a type argument the result
    // is inferred as Observable<unknown> and downstream code loses the
    // element type.
    new Observable<number>((subscriber) => {
      let total = 0;
      const subscription = source.subscribe({
        next(val) {
          total += val;
        },
        error(err) {
          subscriber.error(err);
        },
        complete() {
          // Only now is the total known; emit it, then finish.
          subscriber.next(total);
          subscriber.complete();
        },
      });
      // Teardown: unsubscribing from the result unsubscribes from the source.
      return () => subscription.unsubscribe();
    });
}
// ----
// Demo: the whole sequence collapses into one value emitted on completion.
const stream$ = of(1, 2, 3, 4, 5, 6, 7, 8).pipe(sum());
stream$.subscribe((value) => console.log(value));
// Output: 36
/**
 * Builds an operator that re-emits source values up to and including the
 * first value strictly equal to `x`, then completes.
 *
 * If `x` never appears, the result completes when the source completes.
 * A source error is forwarded to the subscriber.
 *
 * @param x - The value that ends the stream (it is still emitted).
 * @returns A function mapping a number observable to the truncated one.
 */
function takeUntilX(
  x: number
): (source: Observable<number>) => Observable<number> {
  return (source) =>
    // Explicit <number> so the result is not inferred as Observable<unknown>.
    new Observable<number>((subscriber) => {
      const subscription = source.subscribe({
        next(val) {
          // Emit first so the matching value is included in the output.
          subscriber.next(val);
          if (val === x) subscriber.complete();
        },
        error(err) {
          // A subscriber is an object, not a function: forward through
          // its error() method.
          subscriber.error(err);
        },
        complete() {
          subscriber.complete();
        },
      });
      // Teardown: unsubscribing from the result unsubscribes from the source.
      return () => subscription.unsubscribe();
    });
}
// ----
// Demo: the stream ends right after the value 4 has been emitted.
const stream$ = of(1, 2, 3, 4, 5, 6, 7, 8).pipe(takeUntilX(4));
stream$.subscribe((value) => console.log(value));
// Output: 1 2 3 4