David East Profile PictureTea Time!

tldr — Useful patterns for common problems and to help you learn RxJS along the way.

RxJS can feel like a lot when you’re getting started. There’s a lot of terminology: Observable, Operator, Publisher, Subscriber, Scheduler, Subject, BehaviorSubject. There’s a lot of operators! Over 90 of them. Zip, scan, merge, map, mergeMap, switchMap, switchMapTo, forkJoin, first, take, takeUntil. Reading all of this might just make you feel exhaust…ed.

But it doesn’t have to be that way. These aren’t chores! They’re tools! Tools for solving your event driven problems. You don’t have to learn each and every one. Instead let’s focus on two tips that will help you be a better RxJS developer.

The first tip is about remaining reactive (all credit goes to Kyle Banks for the name). The centerpiece of RxJS is the Observable. It’s a container that allows you react to values as they change over time. So whenever a new user logs in, we can react to this event. But this means the Observable is not directly the user, it only emits the user. And that... well that can be really confusing.

I remember when I first started with RxJS there was almost this desperation to get the value out of the Observable.

App.js
function App() {
  const btnAdd = document.querySelector("#btnAdd");
  const txtValue = document.querySelector("#txtValue");
  const user$ = getUserSomehow();
  const currentUser = null;
  user$.subscribe(user => {
    currentUser = user;
  });

  btnAdd.addEventListener("click", async clickEvent => {});
}

In this App, which don't worry about what framework it belongs to (it doesn't), I am getting the user back from the Observable and immediately assigning it to a local property. This way when a user creates a new task I can create a "Task" object and save it to my backend. You all know I always use Firebase for this, but I'm just trying to make it generic.

App.js
function App() {
  const btnAdd = document.querySelector("#btnAdd");
  const txtValue = document.querySelector("#txtValue");
  const user$ = getUserSomehow();
  const currentUser = null;
  user$.subscribe(user => {
    currentUser = user;
  });

  btnAdd.addEventListener("click", async clickEvent => {
    const taskName = txtValue.value;
    const newTask = { taskName, ...currentUser };
    const repsonse = await fetch("/tasks", {
      method: "POST",
      body: JSON.stringify(newTask)
    });
    console.log(response);
  });
}

This local value assignment creates a problem. The user observable is asynchronous, so it’s fickle to rely on the local property having a value at any time. I’m also creating unnecessary overhead, having two properties to represent one value. Not only is this messy, but immediately discharging the value out of the observable breaks reactivity. Keeping the value in the observable gives me more predictable behavior and maintains reactivity.

RxJS pushes new values that emit from the Observable and this is where we can keep our reactivity. Let’s reactively create a new task.

The first thing to do is to identify what in our code is reactive. What elements of this code respond to events?

  • The logged in user
  • The button click
  • The input from the textbox

App.js
function App() {
  const user$ = getUserSomehow();
  const click$ = fromEvent(btnAdd, "click");
  const textValue$ = fromEvent(txtValue, "input").pipe(
    pluck("target", "value")
  );
  // Get the latest text value when the button is clicked
  const clickWithText$ = click$.pipe(
    withLatestFrom(textValue$, (_, text) => text)
  );

  // ...
}

Now we have encapsulated the initial three reactive sources. Let's use these sources to create a task. The first step to remaining reactive is to use an operator to create the new object. In this case, the map operator.

App.js
function App() {
  const user$ = getUserSomehow();
  const click$ = fromEvent(btnAdd, "click");
  const textValue$ = fromEvent(txtValue, "input").pipe(
    pluck("target", "value")
  );
  const clickWithText$ = click$.pipe(
    withLatestFrom(textValue$, (_, text) => text)
  );

  const task$ = taskName => user$.pipe(map(user => ({ taskName, ...user })));
}

Every operator takes an observable in and returns a new observable out. So now we have a task observable. Now we just need to send it to the backend. We can keep the reactivity going.

App.js
function App() {
  const user$ = getUserSomehow();
  const click$ = fromEvent(btnAdd, "click");
  const textValue$ = fromEvent(txtValue, "input").pipe(
    pluck("target", "value")
  );
  const clickWithText$ = click$.pipe(
    withLatestFrom(textValue$, (_, text) => text)
  );

  const task$ = taskName => user$.pipe(map(user => ({ taskName, ...user })));

  const postTask = task =>
    fetch("/tasks", {
      method: "POST",
      body: JSON.stringify(task)
    });
}

The postTask function expects a task and returns the fetch() request. Now we just need to combine all these reactive pieces together.

App.js
function App() {
  const user$ = getUserSomehow();
  const click$ = fromEvent(btnAdd, "click");
  const textValue$ = fromEvent(txtValue, "input").pipe(
    pluck("target", "value")
  );
  const clickWithText$ = click$.pipe(
    withLatestFrom(textValue$, (_, text) => text)
  );

  const task$ = taskName => user$.pipe(map(user => ({ taskName, ...user })));

  const postTask = task =>
    fetch("/tasks", {
      method: "POST",
      body: JSON.stringify(task)
    });

  const createTask$ = clickWithText$.pipe(
    switchMap(task$),
    switchMap(postTask),
    switchMap(response => response.json())
  );

  createTask$.subscribe(result => console.log(result));
}

Now this may look a little convoluted, but this code reacts to multiple asynchronous events and handle it in one callback. Remaining reactive is an important part of writing effective RxJS code. But another important piece is making sure your code is reusable.

Now this pattern is more about code cleanliness than anything else. So while it's simple, it's an important habit to maintain. What problem does custom creators solve?

The code we wrote before has a few loose pieces.

App.js
function App() {
  // 1
  const user$ = getUserSomehow();

  btnAdd.addEventListener("click", async clickEvent => {
    // 2
    const taskName = txtValue.value;
    // 3
    user$
      .pipe(
        // 4
        map(user => ({ taskName, ...user })),
        // 5
        mergeMap(task =>
          /* 6 */ fetch("/tasks", {
            method: "POST",
            body: JSON.stringify(task)
          })
        ),
        // 7
        take(1),
        // 8
        mergeMap(res => res.json())
      )
      .subscribe();
  });
}

Let's count the number of pieces. We have the user$ observable. The taskName. Beginning the .pipe. The map function to create the new task. The task itself. Remembering to flatten with mergeMap. The fetch call. The take(1) operator to trigger the unsubscribe. And lastly, the mergeMap of the JSON response. That's about 8 different aspects to get right. Imagine if this needed to be used elsewhere? That could get messy. That's one problem a Custom Creator solves.

I'll probably end up saying this a 100 times, every RxJS operator takes an observable in and returns a new observable out. It allows for composition and it's a major philosophy of the library. Custom creators are just functions that take in arguments and return a new observable.

index.js
function postTask$(user$, taskName) {
  return user$.pipe();
}

This is where you can do the guts of your work.

index.js
function postTask$(user$, taskName) {
  return user$.pipe(
    map(user => ({ taskName, ...user })),
    mergeMap(
      task =>
        fetch("/tasks", {
          method: "POST",
          body: JSON.stringify(task)
        }),
      take(1),
      mergeMap(res => res.json())
    )
  );
}

Usually I store this in a separate file. This makes it easier to test and reuse elsewhere. Then back in the component I can do my favorite thing: delete code.

index.js
function App() {
  const user$ = getUserSomehow();

  btnAdd.addEventListener("click", async clickEvent => {
    const taskName = txtValue.value;
    postTask$(user$, taskName).subscribe(response => {
      console.log("do something to confirm the save.");
    });
  });
}

Now the component code is more concise and focused. Was this pattern just simply "Hey, create helper functions so you don't write the same code all over the place?" Yes. Yes it was. But I'm sure you've experienced before how easy it can be to not follow this simple pattern and pay for it down the line. This concept isn't advanced, but having the discipline is. But on to the last pattern.

If there's one pattern name I have some faith in, it's this one. The reducer pattern is right out of the redux world. If you're familiar with redux, you'll pick this one right up.

We're going to switch gears for this example. Instead of a "tasks" app we're going to build a "counter" app. This is effectively the "Hello, World" of redux style apps.

Many times we have multiple "events" that affect the state of our application. In this basic "counter" app, think about a "incrementing", "decrementing", and "reseting". Each click will change the amount of tasks in the app's state.

index.js
const qs = selector => document.querySelector.bind(document)(selector);

function App() {
  const btnIncrement = qs("#btnIncrement");
  const btnDecrement = qs("#btnDecrement");
  const lblCount = qs("#lblCount");

  // Make sure the count returns as a number
  const count = () => parseInt(lblCount.textContent, 10);

  btnIncrement.addEventListener("click", click => {
    lblCount.textContent = count() + 1;
  });

  btnDecrement.addEventListener("click", click => {
    lblCount.textContent = count() - 1;
  });

  btnReset.addEventListener("click", click => {
    lblCount.textContent = 0;
  });
}

Now, I want to make something clear. If you're just building a "counter" app, then using RxJS or Redux is likely too much. The arguments for using a Redux system tend to be that your code is easily testable and can scale as more features are added. This example is just to highlight the concepts and provide a base for applying them for more complex situations.

But back to the pattern. The problem is making sure that these state changes are applied consistently across the app. The count is modified in three different places across the app. This can get messy as other properties are added to the app's state. The Redux solution to issue an intent to modify the app's state and then have a single location where it is modified.

This is where RxJS can simplify things. Click event listeners make great observables since they are a stream of clicks.

index.js
const qs = selector => document.querySelector.bind(document)(selector);

function App() {
  const btnIncrement = qs("#btnIncrement");
  const btnDecrement = qs("#btnDecrement");
  const lblCount = qs("#lblCount");

  // Make sure the count returns as a number
  const count = () => parseInt(lblCount.textContent, 10);

  const increment$ = fromEvent(btnIncrement, "click").pipe(
    map(click => ({ type: "INCREMENT" }))
  );

  // ... we'll change the others later
}

The increment$ observable takes a click event and turns it into an object that issues an intent to increment the counter state. And if we get clever, a creator will simplify all three actions.

index.js
function clickAction$(button, type) {
  return fromEvent(button, "click").pipe(map(click => ({ type })));
}

This creator takes in the button and the "type" of action to emit.

index.js
const qs = selector => document.querySelector.bind(document)(selector);

function App() {
  const btnIncrement = qs("#btnIncrement");
  const btnDecrement = qs("#btnDecrement");
  const lblCount = qs("#lblCount");

  // Make sure the count returns as a number
  const count = () => parseInt(lblCount.textContent, 10);

  const increment$ = clickAction$(btnIncrement, "INCREMENT");
  const decrement$ = clickAction$(btnDecrement, "DECREMENT");
  const reset$ = clickAction$(btnReset, "RESET");

  const actions = [increment$, decrement$, reset$];
}

Now we have all the power to emit all clicks as actions. The trick is now to merge them all together into one observable that emits for each and every type of click.

index.js
const actions = [increment$, decrement$, reset$];
const actions$ = merge(...actions);
actions$.subscribe(action => {
  console.log("Action Type: ", type);
});

In Redux all state changes are applied in a reducer function. It function arguments are the current state and the intent to modify state, the action.

index.js
function reduce(state, action) {
  switch (action.type) {
    case "INCREMENT":
      return state + 1;
    case "DECREMENT":
      return state - 1;
    case "RESET":
      return 0;
    default:
      return state;
  }
  return state;
}

RxJS has a really simple way of using this reduce method. The scan operator is like a reduce method that happens over time. Whenever a click occurs, we can dispatch that action and apply it to the reducer method with the scan operator.

index.js
actions$.pipe(scan((state, action) => reduce(state, action)));

We can even start with an initial piece of state using the startWith() operator.

index.js
const actions = [increment$, decrement$, reset$];
const actions$ = merge(...actions)
  // start the app state at zero
  .pipe(startWith(0));
actions$
  .pipe(scan((state, action) => reduce(state, action)))
  .subscribe(state => {
    console.log("After a click the count is ", state);
  });

Now let's put it all together.

index.js
const qs = selector => document.querySelector.bind(document)(selector);

function App() {
  const btnIncrement = qs("#btnIncrement");
  const btnDecrement = qs("#btnDecrement");
  const lblCount = qs("#lblCount");

  // Make sure the count returns as a number
  const count = () => parseInt(lblCount.textContent, 10);

  const increment$ = clickAction$(btnIncrement, "INCREMENT");
  const decrement$ = clickAction$(btnDecrement, "DECREMENT");
  const reset$ = clickAction$(btnReset, "RESET");

  const actions = [increment$, decrement$, reset$];

  const actions$ = merge(...actions).pipe(startWith(initialState));
  actions$
    .pipe(scan((acc, curr) => reduce(acc, curr)))
    .subscribe(state => (lblCount.textContent = state));
}

Just like that we’ve created an entire Redux style system by just using RxJS.

RxJS can feel overwhelming at times. There's dozens of operators and it feels like you have to know them all, but you don't. Operators are just tools. Learn the basics: map(), filter(), and mergeMap(). And then after that you can learn everything else with a quick google search.

The goal isn't to learn every single operator in RxJS. The goal is to know have a flexible library for creating your own solutions.