RxJS Custom Operators

Thanks to its wide selection of operators, the RxJS library is rightfully considered an extremely powerful tool in a developer’s arsenal. I recently wrote a few operators of my own to make some common operator combinations easier to reuse. In this post, I want to introduce the concept of custom RxJS operators, with implementation examples.

Identity operator

An RxJS operator is just a function that takes an observable as input and returns a resulting observable. Writing a custom RxJS operator therefore comes down to writing a regular JavaScript (TypeScript) function. Let’s start with a basic identity operator, which simply mirrors the source observable:

import { interval, Observable } from "rxjs";
import { take } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function identity<T>(source$: Observable<T>): Observable<T> {
  return source$;
}

const results$ = source$.pipe(identity);

results$.subscribe(console.log);
  
// console output: 0, 1, 2
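Because an operator is only a function from one observable to another, it can also be written without any built-in operators. The sketch below is a hypothetical `double` operator (not part of RxJS) that builds its result with the `Observable` constructor and forwards each notification by hand. It uses the synchronous `of` source so the result can be inspected immediately.

```typescript
import { Observable, of } from "rxjs";

// Hypothetical operator for illustration: doubles every number in the stream.
function double(source$: Observable<number>): Observable<number> {
  // The new observable subscribes to the source only when it is subscribed to.
  return new Observable<number>(subscriber => {
    const subscription = source$.subscribe({
      next: v => subscriber.next(v * 2),
      error: err => subscriber.error(err),
      complete: () => subscriber.complete()
    });

    // Teardown: release the source when the consumer unsubscribes.
    return () => subscription.unsubscribe();
  });
}

const doubled: number[] = [];

of(1, 2, 3)
  .pipe(double)
  .subscribe(v => doubled.push(v));

console.log(doubled); // [2, 4, 6]
```

In practice, composing built-in operators is usually shorter and less error-prone than forwarding notifications manually.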

Next, we will write a custom operator with some elementary logic.

Logging operator

The following custom operator performs a side effect (logs values to the console) for each value in the source stream:

import { interval, Observable } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function log<T>(source$: Observable<T>): Observable<T> {
  return source$.pipe(tap(v => console.log(`log: ${v}`)));
}

const results$ = source$.pipe(log);

results$.subscribe(console.log);
  
// console output: log: 0, 0, log: 1, 1, log: 2, 2

The resulting stream is derived from source$ by applying built-in operators in its pipe method.

Operator Factory

In some scenarios, it is useful to provide context for a custom operator. To do this, you can define a function that returns an operator. Factory arguments are in the lexical scope of the operator:

import { interval, Observable } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function logWithTag<T>(tag: string): (source$: Observable<T>) => Observable<T> {
  return source$ =>
    source$.pipe(tap(v => console.log(`logWithTag(${tag}): ${v}`)));
}

const results$ = source$.pipe(logWithTag("RxJS"));

results$.subscribe(console.log);
  
// console output: logWithTag(RxJS): 0, 0, logWithTag(RxJS): 1, 1, logWithTag(RxJS): 2, 2

The return type can be written more concisely with the MonoTypeOperatorFunction type from the RxJS library. The operator definition can also be shortened by using the static pipe function:

import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function logWithTag<T>(tag: string): MonoTypeOperatorFunction<T> {
  return pipe(tap(v => console.log(`logWithTag(${tag}): ${v}`)));
}

const results$ = source$.pipe(logWithTag("RxJS"));

results$.subscribe(console.log);
  
// console output: logWithTag(RxJS): 0, 0, logWithTag(RxJS): 1, 1, logWithTag(RxJS): 2, 2
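MonoTypeOperatorFunction only fits operators whose output type matches their input type. When an operator changes the value type, the more general OperatorFunction type can describe it instead. The following is a minimal sketch with a hypothetical `nonEmptyLength` operator, invented here for illustration, that turns strings into their lengths.

```typescript
import { of, OperatorFunction, pipe } from "rxjs";
import { filter, map } from "rxjs/operators";

// Hypothetical operator: maps strings to their lengths, skipping empty strings.
function nonEmptyLength(): OperatorFunction<string, number> {
  return pipe(
    filter((s: string) => s.length > 0),
    map(s => s.length)
  );
}

const lengths: number[] = [];

of("rx", "", "operator")
  .pipe(nonEmptyLength())
  .subscribe(n => lengths.push(n));

console.log(lengths); // [2, 8]
```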

You can read other useful tips for RxJS here

Observer-unique lexical scope

The operator factory function is called only once, when the stream is defined. As a result, all observers share one lexical scope, so state such as the isFirst flag below is shared between subscriptions:

import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function tapOnce<T>(job: Function): MonoTypeOperatorFunction<T> {
  let isFirst = true;

  return pipe(
    tap(v => {
      if (!isFirst) {
        return;
      }

      job(v);
      isFirst = false;
    })
  );
}

const results$ = source$.pipe(tapOnce(() => console.log("First value emitted")));

results$.subscribe(console.log);
results$.subscribe(console.log);
  
// console output: First value emitted, 0, 0, 1, 1, 2, 2

To give each observer its own lexical scope, you can use the defer function:

import { defer, interval, MonoTypeOperatorFunction } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function tapOnceUnique<T>(job: Function): MonoTypeOperatorFunction<T> {
  return source$ =>
    defer(() => {
      let isFirst = true;

      return source$.pipe(
        tap(v => {
          if (!isFirst) {
            return;
          }

          job(v);
          isFirst = false;
        })
      );
    });
}

const results$ = source$.pipe(tapOnceUnique(() => console.log("First value emitted")));

results$.subscribe(console.log);
results$.subscribe(console.log);
  
// console output: First value emitted, 0, First value emitted, 0, 1, 1, 2, 2
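The same defer pattern applies to any per-subscription state, not only a boolean flag. As a rough sketch, the hypothetical `countPerSubscription` operator below (the name and the `report` callback are assumptions made for this example) keeps a counter that restarts for every subscriber. A synchronous `of` source is used so the effect is visible without waiting on timers.

```typescript
import { defer, MonoTypeOperatorFunction, of } from "rxjs";
import { tap } from "rxjs/operators";

// Hypothetical operator: reports how many values the current subscription has seen.
function countPerSubscription<T>(
  report: (count: number) => void
): MonoTypeOperatorFunction<T> {
  return source$ =>
    defer(() => {
      // The defer callback runs on every subscribe(), so each subscriber gets a fresh counter.
      let count = 0;

      return source$.pipe(tap(() => report(++count)));
    });
}

const counts: number[] = [];
const counted$ = of("a", "b").pipe(countPerSubscription(c => counts.push(c)));

counted$.subscribe();
counted$.subscribe();

console.log(counts); // [1, 2, 1, 2]
```

If the counter were declared directly in the factory body instead of inside defer, both subscriptions would share it and the reported counts would be 1, 2, 3, 4.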

Another way to solve the tapOnce problem is covered in one of my previous posts.

Practical examples

Operator firstTruthy:

import { MonoTypeOperatorFunction, of, pipe } from "rxjs";
import { first } from "rxjs/operators";

const source1$ = of(0, "", "foo", 69);

function firstTruthy<T>(): MonoTypeOperatorFunction<T> {
  return pipe(first(v => Boolean(v)));
}

const result1$ = source1$.pipe(firstTruthy());

result1$.subscribe(console.log);

// console output: foo
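One caveat: when the source completes without a matching value, the built-in first operator signals an EmptyError unless a default value is supplied as its second argument. A possible variant, sketched here under the hypothetical name `firstTruthyOr`, passes a fallback through to first so the stream emits that value instead of failing.

```typescript
import { MonoTypeOperatorFunction, of } from "rxjs";
import { first } from "rxjs/operators";

// Hypothetical variant of firstTruthy: emits `fallback` when no truthy value arrives.
function firstTruthyOr<T>(fallback: T): MonoTypeOperatorFunction<T> {
  return first((v: T) => Boolean(v), fallback);
}

const picked: Array<number | string> = [];

// A truthy value exists, so it wins and the fallback is ignored.
of(0, "", "foo", 69)
  .pipe(firstTruthyOr<number | string>("none"))
  .subscribe(v => picked.push(v));

// No truthy value: the fallback is emitted when the source completes.
of(0, "")
  .pipe(firstTruthyOr<number | string>("none"))
  .subscribe(v => picked.push(v));

console.log(picked); // ["foo", "none"]
```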

Operator evenMultiplied:

import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { filter, map, take } from "rxjs/operators";

const source2$ = interval(10).pipe(take(3));

function evenMultiplied(multiplier: number): MonoTypeOperatorFunction<number> {
  return pipe(
    filter(v => v % 2 === 0),
    map(v => v * multiplier)
  );
}

const result2$ = source2$.pipe(evenMultiplied(3));

result2$.subscribe(console.log);
  
// console output: 0, 6

Operator liveSearch:

import { ObservableInput, of, OperatorFunction, pipe } from "rxjs";
import { debounceTime, delay, distinctUntilChanged, switchMap } from "rxjs/operators";

const source3$ = of("politics", "sport");

type DataProducer<T> = (q: string) => ObservableInput<T>;

function liveSearch<R>(
  time: number,
  dataProducer: DataProducer<R>
): OperatorFunction<string, R> {
  return pipe(
    debounceTime(time),
    distinctUntilChanged(),
    switchMap(dataProducer)
  );
}

const newsProducer = (q: string) =>
  of(`Data fetched for ${q}`).pipe(delay(2000));

const result3$ = source3$.pipe(liveSearch(500, newsProducer));

result3$.subscribe(console.log);
  
// console output: Data fetched for sport

Conclusion

Typical combinations of RxJS operators can be moved into custom operators and reused when implementing similar functionality later. Using generics ensures that output values are correctly typed for the rest of the pipe sequence.

Live example: [see the original]

I hope you enjoyed my post and learned something new.


This translation was prepared as part of the course “JavaScript Developer. Professional”. If you are interested in learning more about the course, we invite you to the online Open Day, where the teacher will talk about the training format and program.
