RxJS
RxJS is a library for reactive programming using Observables. It makes it easier to compose asynchronous or callback-based code. It is part of the ReactiveX collection of open-source libraries (RxJava, RxSwift, Rx.NET, RxScala...). They all share a very similar API, which means transferring Rx skills from one language to another is very easy.
The ReactiveX Observable
model allows you to treat streams of asynchronous events with the same sort of simple, composable operations that you use for collections of data items like arrays, operations such as filter
, map
, flatMap
, reduce
and many more. It frees you from tangled webs of callbacks, and thereby makes your code more readable and less prone to bugs.
The library provides the Observable
type as well as utility functions to:
- convert existing code linked to async operations into observables
- iterating through the values in a stream
- mapping values to different types
- filtering streams
- catching errors
- composing multiple streams
This chapter will not go in depth about the concepts of Rx, you can refer to the official documentation to that purpose. However it will illustrate common situations encountered in Angular applications.
WARNING
This chapter is based on RxJS v7, the default version used by Angular 15.
The Observable
The previous chapter showed you the basic usage of Observables. Here is what we saw in it:
- Observables are returned by the
HttpClient
service methods. - Observables are only executed once subscribed to
- The subscribe method takes one object with three callbacks (next, error and complete) as a parameter.
First, let's illustrate the second and third points:
The Observable
fires 3 next notifications followed by a complete notification. Observables either stop emitting values because they error out or because they complete. The two events are mutually exclusive.
Observable creation
In an Angular app, you will rarely have to create observables yourself. Most of the time you will handle streams that the framework created for you such as handling http call results, listening to router events or listening to form events when using the ReactiveFormsModule
(the name of the module gives away its reactive nature). However, you may encounter situations where it may fall on you to create a stream. Here are a the main ways it could happen.
- interval (marble / documentation)
interval(1000)
.subscribe({ next: n => {
console.log(`It's been ${n + 1} seconds since subscribing!`)
} })
- promise transformation (marble / documentation)
const promise1 = new Promise((resolve, reject) => {
setTimeout(() => {
resolve('foo')
}, 2000)
})
from(promise1).subscribe({
next: message => console.log(`The delayed message is '${message}'`),
error: error => console.log(this.promiseMessage = 'There\'s been an error'),
complete: () => console.log('Completed')
})
- browser event transformation (documentation)
fromEvent(document, 'click').subscribe({ next: _ => console.log('Clicked!') })
The following Stackblitz let's you play around with those examples:
Filtering and mapping
Similar to the well known Array.prototype.map
function, the map
operator (marble / documentation) applies a projection to each value and emits that projection in the output Observable
.
Let's transform the previous example about the click event on the document so that it prints the coordinates of the click:
Pipe
pipe()
is a function used to compose operators such as map()
, filter()
, take()
... Operators are applied to the stream in the order they are passed to the pipe function
Similar to the Array.prototype.filter
function, the filter
operator (marble / documentation) filters items emitted by the source Observable by only emitting those that satisfy a specified predicate.
from([1, 2, 3, 4, 5, 6, 7, 8])
.pipe(filter(data => data % 2 === 0))
.subscribe({ next: data => console.log(data) })
This snippet will print:
2
4
6
8
Exercise: Using the previous Stackblitz about the map operation, only update the message for clicks made within the coordinates between 0-100 on the x and y axis.
Error handling
Like seen previously, the subscribe
method takes an object that has an error
callback. When the Observable
errors out, it is executed instead of the next
callback and the Observable
stops emitting.
this.userService.getUsers()
.subscribe({
next: users => console.log(`The following users exist in the system: ${users}`),
error: error => console.log(`An error occurred: ${error}`),
complete: () => console.log('Completed')
})
This behaviour is not always the desired one. RxJS provides a catchError
operator (documentation) to deal with the error in a "silent" way, meaning that it is the next
callback and not the error
one that is called.
Let's imagine you expect an array of users from the backend but it sends you back a 404 HTTP error, you can use catchError
to return an empty array instead, and keep throwing an error for other HTTP errors.
this.userService.getUsers()
.pipe(
catchError(error => {
if ((error as HttpErrorResponse).status === 404){
return of([])
}
return throwError(error)
})
)
.subscribe({
next: users => console.log(`The following users exist in the system: ${users}`),
error: error => console.log(`An error occurred: ${error}`),
complete: () => console.log('Completed')
})
Question: What will be printed to the console in case of a 404 error returned by the backend? In case of a 500?
Stream composition
Streams can be composed for many purposes. To study this notion in a simpler environment, we will only study it in the context of backend calls.
Having to chain backend calls is quite common. For example, the user has just edited a resource and you want your page to display its updated details. Some backend do send back the details of the updated resource in the body of the edit call response. However, some just send back a 200 or 204 HTTP response without a body. This means that the edit call and detail call need be chained to update the UI. RxJS provides several operators to chain events in a declarative manner. We will use the switchMap
operator (documentation / marble) in this case. You can try it in the Stackblitz below (click anywhere on the preview and see what happens in the console, click again and see how things change in the console).
Question: From this example, what do you learn on the way switchMap works? (Having a look at the marble diagram can help)
Let's adapt the above example to the context of chained backend calls:
Another useful operator to combine calls is exhaustMap
(documentation). While switchMap
cancels the subscription to the previous projected Observable, exhaustMap ignores new events as long as the previous projected Observable hasn't completed.
Don't nest subscribes
A very common pitfall with RxJS is to nest subscribes. RxJS provides plenty of operators so that you won't ever have to mix synchronous and asynchronous code. Why shouldn't you mix them ?
- it is spaghetti code that becomes hard to read and maintain as it doesn't benefit from the declarativeness of RxJS anymore,
- it makes it hard to compose observables,
- it causes memory leaks.
Most often it is done without realising. For instance, inside the next callback of a subscribe you call a method that has a subscribe. That is nesting subscribes.
Example of what you should NOT do:
Unsubscribing
For the moment we've seen how to subscribe to Observables. To avoid memory leaks with long-lived Observables, you should unsubscribe from them.
Let's reuse our previous routing example to illustrate how memory leaks can happen. An interval Observable is created in the ngOnInit method of the book details component. Navigate to the details of a book and watch the console. Then leave the page and come back. What happens in the console? What does it mean?
When should you unsubscribe? If you have no certainty the Observable
will complete or error out, you should manually unsubscribe from it. The HttpClient
always completes the Observable it returns after having received a response. So, theoretically, if you only encounter Observables from the HttpClient
, you do not have to take care of unsubscribing. In other cases, be safe and unsubscribe.
How to unsubscribe? There are two ways:
- The
subscribe
method returns aSubscription
object that can be disposed of by calling the unsubscribe method on it when desired, usually when the component it lives in is destroyed. - Using the
takeUntil
operator (marble / documentation) and aSubject
which is a special kind ofObservable
on which it is possible to call the next(), error() and complete() methods.
The second way is easier to maintain when your code base grows so it is the one you should favour using.
Let's fix the memory leak of the previous example. To demonstrate both techniques, the interval Observable has been added to the author details component as well:
The async pipe
Subscribing to an Observable
and saving the value in a property of the component is not the only way to display the values from the Observable
. Angular provides a pipe to which the Observable
can be passed directly.
export class AppComponent {
counter: Observable<number>
ngOnInit(): void {
this.counter = interval(1000)
}
}
<p>{{counter | async}}</p>
For objects, an alternative syntax exists to avoid repetitively using the async pipe to access each field:
<p>{{(user | async)?.firstName}}</p>
<p>{{(user | async)?.lastName}}</p>
<p>{{(user | async)?.age}}</p>
<!-- OR -->
<ng-container *ngIf="user | async as user">
<p>{{user.firstName}}</p>
<p>{{user.lastName}}</p>
<p>{{user.age}}</p>
</ng-container>
export class AppComponent {
user: Observable<User>
ngOnInit(): void {
this.user = interval(1000).map(_ => new User('John', 'Snow', 28))
}
}
interface User {
firstName: string
lastName: string
age: number
}
Since no subscription is made, unsubscribing is not necessary. The async pipe takes care of it for us.
Summary
Key Takeaways
- Unsubscribe or use the async pipe
- Never nest subscribes, find the right operators instead
Here is a table of the most commonly used operators.
Area | Operators |
---|---|
Creation | from, of, fromEvent, interval |
Filtering | filter, takeUntil, take, distinctUntilChanged |
Transformation | switchMap, exhaustMap, concatMap, mergeMap, map |
Combination | combineLatest, concat, merge, startWith, withLatestFrom, zip |
Utility | tap, finalize, catchError |
There also exists two Observable
constants: NEVER
(emits neither values nor errors nor the completion notification) and EMPTY
(emits no items and immediately emits a complete notification). EMPTY
is quite useful as a return value of the catchError
operator.
To help you decide which operator fits your use case, the RxJS documentation provides an operator decision tree. It also helps with just discovering the many operators RxJS provides.
Practical work
- In the
film-search.component.ts
file, stop subscribing to the search response and use an async pipe instead in the template. - Even though it is not strictly necessary in those cases, unsubscribe from the login and register calls in the
LoginFormComponent
using theSubject
technique.