A comprehensive guide for managing, caching, transforming API data, handling errors, slowing down, and aborting API requests using Rx.js.

Simple API Call

API calls in Angular are handled in services, which represent a layer between UI components and the backend. Assuming you have an API that retrieves User data, you’d generate a service and create method that invokes the API.

> ng g service users
@Injectable({
providedIn: 'root',
})
export class UsersService {

constructor(private http: HttpClient) {}

}

The services that talk to APIs inject the HttpClient class that opens a door to the outside. To get started, define a method getUserData() that will talk to the API.

interface IUserData {
id: number;
username: string;
email: string;
age: number;
}

@Injectable({
providedIn: 'root',
})
export class UsersService {

constructor(private http: HttpClient) {}

getUserData(id: number): Observable<IUserData> {
return this.http.get<IUserData>(`http://localhost:3000/api/users/${id}`)
}
}

The method getUserData() is public and is called outside the service to connect to the backend. Since HttpClient the works with Observables, the output getUserData() will be wrapped into an Observable, that you need to call subscribe on the method to extract the user data:

@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrl: './app.component.scss'
})
export class AppComponent implements OnInit {

constructor(private readonly usersService: UsersService) { }

ngOnInit() {

this.usersService.getUserData(1)
.subscribe((userData: IUserData) => {
console.log('userData :>> ', userData); // { id: 1, email: '...' }
})

}
}

Map

The simplest transformation to apply is to map the API response to get the desired output. For example, retrieve only username:

this.usersService.getUserData(1)
.pipe(
map((userData: IUserData) => userData.username)
)
.subscribe((username: string) => {
console.log('username :>> ', username); // 'Mirza'
})

Or retrieve several properties:

this.usersService.getUserData(1)
.pipe(
map((userData: IUserData) =>
({ username: userData.username, age: userData.age })
)
)
.subscribe((mappedData) => {
console.log('mappedData :>> ', mappedData);
// { username: 'Mirza', age: 30 }
})

Filter & Skip

Although this is a conditional operator, the filter is commonly used with map to filter out unwated data. For example, if you have an API that returns a list of Users:

@Injectable({
providedIn: 'root',
})
export class UsersService {

constructor(private http: HttpClient) {}

getAllUsers(): Observable<IUserData[]> {
return this.http.get<any>(`http://localhost:3000/api/users`)
}
}

You can use the filter operator to retrieve only specific users:

this.usersService.getAllUsers()
.pipe(
map((userData: IUserData) =>
({ username: userData.username, age: userData.age })
),
filter(user => user.age >= 18) // retrive users 18 years old or older
)
.subscribe((mappedData) => {
console.log('mappedData :>> ', mappedData);
// [{ username: 'Mirza', age: 30 }, ...]
})

Alternatively, you can use the skip operator to filter users without proper access to the pipeline.

this.usersService.getAllUsers()
.pipe(
skipWhile(user => user.role !== 'ADMIN')
map((userData: IUserData) =>
({ username: userData.username, age: userData.age })
),
)
.subscribe((mappedData) => {
console.log('mappedData :>> ', mappedData);
// [{ username: 'Mirza', age: 30 }, ...]
})

Concat Map & Merge Map

Often you find yourself in a situation where the result of one API needs to be used to invoke another.

getUserPost() {
this.getCurrentUser() // first API
.pipe(
concatMap((user) => this.service.getAllPosts(user) // second API
)
.subscribe((post?: Post) => {
this.myPost = post;
})
}

private this.service.getAllPosts(user): Observable<Post> {
return this.http.get<Post>(`http://localhost:3000/api/posts`)
.pipe(
map((posts: Posts[]) => posts.find(post => post.userId === user.id))
));
}

The concatMap operator will ensure that API calls occur sequentially. It waits for the previous observable to complete before processing the next API call.

But if the ordering is not revelant, you can swap the concatMap with mergeMap operator that handles Observables concurrently. In the example below, I retrieve posts for multiple users:

getUserPosts() {
this.getAllUsers() // first API
.pipe(
mergeMap(users => from(users)), // create an observable from users array
mergeMap(user => this.service.getAllPosts(user)) // second API for each user concurrently
)
.subscribe((post?: Post) => {
// handle each post
console.log(post);
});
}

private this.service.getAllPosts(user): Observable<Post> {
return this.http.get<Post>(`http://localhost:3000/api/posts`)
.pipe(
map((posts: Posts[]) => posts.find(post => post.userId === user.id))
);
}

In scenarios involving multiple parallel requests, mergeMap can provide a significant performance boost.

SwitchMap

This operator is suitable for orchestrating requests as it comes with cancelling effect. In this example below, I’m using the switchMap operator map the input value to search API, but also to cancel ongoing search request whenever the text input changes.

fromEvent(document.getElementById('text-input'), 'input') // element, event
.pipe(
map((event) => (event.target as HTMLInputElement).value),
switchMap((searchText) => this.searchService.search(searchText)),
)
.subscribe(searchResponse => {
console.log(searchResponse);
});

On each emission (text-input change) the previous inner observable (this.searchService.search) is canceled and the new observable is subscribed.

Cancel Long-Running Requests

Assuming you have an ongoing request that takes long to complete

// Express.js backend server

app.get('/api/really-slow-api', (req, res) => {

setTimeout((() => {
res.send({ message: 'Hello World!' });
}), 5000) // wait for 5 seconds

});

You can make use of the timeout operator to terminate the request after waiting for a specified time:

// Angular frontend app

fetchData() {
return this.http.get<any>('localhost:3000/api/really-slow-api')
.pipe(
timeout(3000) // wait for 3 seconds
)
.subscribe();
}

If the timeout expires before the request finishes, the Observable will throw an exception that can be caught using the TimeoutError inside the catchError operator.

fetchData() {
return this.http.get<any>('localhost:3000/api/really-slow-api')
.pipe(
timeout(3000),
catchError((err: TimeoutError) => {
console.log('The request is cancelled due to timeout!');
return [];
})
)
.subscribe();
}

The Observable pipeline breaks when an error occurs. That’s why it’s very important to do proper error handling. In Rx.js, the error is handled using the catchError operator.

fetchData() {
return this.http.get<any>('localhost:3000/api/really-slow-api')
.pipe(
catchError(error => {
/* Handle Error */
});
)
.subscribe();
}

Assuming that you have a server in the back that returns an error message to the client:

// Express.js API

app.get('/api/broken-api', (req, res) => {
res.status(400).send({ yourCustomProperty: 'Something went wrong!' });
});

You can extract this message from the response object in catchError:

    this.service.brokenAPI()
.pipe(
catchError((err: HttpErrorResponse) => {
console.log('err :>> ', err.error.yourCustomProperty);
// err :>> Something went wrong!
return EMPTY;
})
)
.subscribe()

The catchError operator must return an Observable. However, it’s up to you to decide what the output will be.

Return Custom Error

Here I mapped the API error message and status code into my own response object.

    this.service.brokenAPI()
.pipe(
catchError((err: HttpErrorResponse) => {

const customErr = {
statusCode: err.status,
message: err.error.yourCustomProperty
}

return of(customErr);
})
)
.subscribe({
next: (data) => console.log(data),
complete: () => console.log('Completed!')
})

The output will be logged in the next function. The complete function will fire afterwards.

Return EMPTY Observable

The EMPTY keyword is an Observable that emits no items but terminates normally.

    this.service.brokenAPI()
.pipe(
catchError((err: HttpErrorResponse) => {
return EMPTY; // or return []
})
)
.subscribe({
next: (data) => console.log(data), // will not print
complete: () => console.log('Completed!')
})

In this instance, only the complete function will be called.

Return NEVER Observable

This is an Observable that emits no items and does not complete.

this.service.brokenAPI()
.pipe(
catchError((err: HttpErrorResponse) => {
return NEVER;
})
)
.subscribe({
next: (data) => console.log(data), // will not print
complete: () => console.log('Completed!') // will not print
})

Return Nothing

Simply return null from the error handler:

this.service.brokenAPI()
.pipe(
catchError((err: HttpErrorResponse) => {
return of(null);
})
)
.subscribe({
next: (data) => console.log(data), // null
complete: () => console.log('Completed!') // will be printed
})

Error Handling for each Observable

If you have multiple Observables in the pipeline, you can attach .pipe() function on each and handle errors individually:

    of(1, 2, 3)
.pipe(
concatMap(id => this.service.getHobbyData(id)
.pipe(
catchError(err => {
/* Handle Error for this API */
return of(err);
})
)
),
catchError(err => {
/* Handle Global Error */
return of(err);
})
)
.subscribe(console.log)

Handle Various Types of Exceptions

You may come across cases where you’re terminating long-running requests and handling various HTTP Status errors. In those cases, you can expand the catchError operator with each type of exception you want to handle:

// Also creating a custom Error class

class CustomError {
message!: string
}
catchError((err: HttpErrorResponse | TimeoutError | CustomError) => {

if (err instanceof CustomError) {
console.log(err.message);
return [];
}

if (err instanceof TimeoutError) {
console.log({ name: err.name, message: 'The request is cancelled due to timeout!' });
return [];
}

if (err instanceof HttpErrorResponse) {
if (err.status === 400) {
console.log({ statusCode: [err.status], message: 'Check your input!' })
return [];
}

console.log('Internal Server Error!')
}
return [];
})

Handle Different HTTP Status Codes

If you’re creating a web form where users can enter any data, send it to the backend, and receive the response, it’s a good practice to handle various error cases:

catchError((err: HttpErrorResponse) => {

// Bad Request
if (err.status === 400) {
console.log('Invalid data provided. Check your input!')
return [];
}

// Unauthorized / Forbidden
if (err.status === 401 || err.status === 403) {
console.log('It appears that you are not allowed to make this request!')
return [];
}

// Not Found
if (err.status === 404) {
console.log('Data not found!')
return [];
}

// In all other cases
console.log('Internal Server Error!')
return [];
})

Handle API Termination

This is another form of HTTP exception that occurs when the client cannot establish or has lost a connection with the server. In this instance, the property status on the HttpErrorResponse instance will have a value 0, which is not a valid HTTP Status Code.

catchError((err: HttpErrorResponse) => {

if (err.status === 0) {
console.log({ message: 'Unable to connect to the server!' })
return [];
}

console.log('Internal Server Error!')
return [];
})

Manual Error Throwing

If you ever run into a situation where you need to manually throw and then handle the error, Rx.js has you covered.

    of(1, 2, 3, 4, 5)
.pipe(
map(x => {
if (x === 3) {
throw new Error('Not your lucky day!')
}
return x;
}),
catchError((e: Error) => {
return of(e.message); // Not your lucky day!
})
)
.subscribe({
next: (data) => console.log(data),
complete: () => console.log('Completed!')
})

Here I’m using the JavaScript throw to create new Error with my custom error message. The message is read in the catchError, then passed down to be printed in the next function, followed by the complete function.

The Rx.js also has throwError operator that works just like the regular throw, but with the difference is that it returns an Observable.

of(1, 2, 3, 4, 5)
.pipe(
mergeMap(x => {
if (x === 3) {
// Observable<Error>
return throwError(() => new Error('Not your lucky day!'));
}
return of(x); // because mergeMap must return Observable
})
)
.subscribe();

Retry on Failure

The retry operator is used to retry failed requests.

this.service.brokenAPI()
.pipe(
retry(3),
catchError((err: HttpErrorResponse) => {
return EMPTY;
})
)
.subscribe()

In case of a failure, the API will be retried x number of times.
If the number of failed attempts exceeds the specified retry count (e.g. 3), the Observable will go into the catchError operator.
If the API respones successfully before the count runs out, the Observable will be successful.

When retrying failed requests be aware of the Idempotency. Requests like GET, DELETE, PUT are idempotent, because the backend data will not be harmed if you repeat GET, UPDATE or attempt to DELETE the same item. However, POST creates new items and therefore is not idempotent. Hence retries on POST should be avoided.

Schedule Retries

Rx.js also lets you schedule retries. For example, you can set for how long the retry should wait before invoking the API again:

    this.service.brokenAPI()
.pipe(
retry({
count: 5, // execute 5 times
delay: 500, // wait 500 ms between retries
resetOnSuccess: true
}),
)
.subscribe(data => {
console.log('data :>> ', data);
})
}

If you don’t specify the count, the retry will call the API indefinitely:

this.service.brokenAPI()
.pipe(
retry({
delay: 500, // wait 500 ms between retries
resetOnSuccess: true
}),
)
.subscribe(data => {
console.log('data :>> ', data);
})
}

You can think of the side effects as other events you can trigger within the pipeline that will not effect the Observable. The easiest one is to put console logs between operations using the tap operator:

fromEvent(document.getElementById('text-input'), 'input') // element, event
.pipe(
tap(() => console.log('Subscribed to an event.')),
map((event) => (event.target as HTMLInputElement).value),
tap(() => console.log('Extracted the text from the input...')),
switchMap((searchText) => this.searchService.search(searchText)),
)
.subscribe(searchResponse => {
console.log(searchResponse);
});

The tap operator is executed in the same line where it’s called.
It can be used for dispatching actions or toggling a loader on UI. I like to combine the tap and finalize operators to display the loader in between the start and the end of the API request.

fromEvent(document.getElementById('text-input'), 'input') // element, event
.pipe(
map((event) => (event.target as HTMLInputElement).value),
tap(() => this.displayLoader = true), // before API call
switchMap((searchText) => this.searchService.search(searchText)
.pipe(
finalize(() => this.displayLoader = false) // after API call
)
)
)
.subscribe(searchResponse => {
console.log(searchResponse);
});

The finalize the operator is triggered when an Observable completes, successfully or not.

Now let’s learn how to slow down Observable emissions.

Delays

The simplest one is to put a delay operator in the Observable to pause the Observable for specified time. I like to use it to intentionally slow down quick APIs and leave time for loader to appear and disappear.

  getHobbyData(id: number): Observable<IHobbyData> {
return this.http.get<any>(`http://localhost:3000/api/hobbies/${id}`)
.pipe(
delay(250) // 250 ms
)
}

Delays can also be achieved using Rx.js Schedulers.

Debounce Time & Distinct Until Changed

Assuming that you have search functionality where users can search just by typing on the input. In the example below I’m doing just that, but there is one flaw.

fromEvent(document.getElementById('text-input'), 'input') // element, event
.pipe(
map((event) => (event.target as HTMLInputElement).value),
concatMap((searchText) => this.searchService.search(searchText)),
)
.subscribe(searchResponse => {
console.log(searchResponse);
});

The search service is called on each input (for each new letter). It’s highly advisable to slow down the Observable and the best way to do it is to use the debounceTime operator.

fromEvent(document.getElementById('text-input'), 'input') // element, event
.pipe(
map((event) => (event.target as HTMLInputElement).value),
debounceTime(1000), // wait for 1000ms
concatMap((searchText) => this.searchService.search(searchText)),
)
.subscribe(searchResponse => {
console.log(searchResponse);
});

The debounceTime an operator will not allow for the pipeline to proceed until the Observable emissions have stopped for specified time. Once that happens, the operator will emit everything as a single value.
This means that debounceTime will wait for the user to stop typing for x milliseconds and then emit the completed sentence.

fromEvent(document.getElementById('text-input'), 'input') // element, event
.pipe(
map((event) => (event.target as HTMLInputElement).value),
debounceTime(1000),
distinctUntilChanged(),
concatMap((searchText) => this.searchService.search(searchText)),
)
.subscribe(searchResponse => {
console.log(searchResponse);
});

The debounceTime operator is commonly used with distinctUntilChanged operator to prevent the Observable from emitting if the new value is the same as the previous.

API Throttling

Rx.js can be used on the backend to strategy for reduce the network traffic. To get started, set up a basic Express.js server:

> npm init -y
> npm i express rxjs
// app.js

const express = require('express');
const app = express();

app.listen(3000, () =>
console.log('server started on port 3000!'));

Now add a route that handles a GET request:

// app.js

const express = require('express');
const app = express();

app.get('/api/throttle', (req, res) => {
res.send({ message: 'Hello World!' });
});

app.listen(3000, () =>
console.log('server started on port 3000!'));

To peform a load test, install the Loadtest library globally. This library will be used to peform to test the /api/throttle route.

> npm install -g loadtest

Before throttling

Run the server in one terminal (node app.js) and loadtest in another:

> loadtest -c 10 --rps 200 http://localhost:3000/api/throttle

This command sends exactly 200 requests per second with concurrency 10.
These are the results:

Requests: 125, requests per second: 25, mean latency: 2.6 ms
Requests: 125, requests per second: 25, mean latency: 3.3 ms
Requests: 125, requests per second: 25, mean latency: 3.4 ms
Requests: 125, requests per second: 25, mean latency: 1.9 ms
Requests: 125, requests per second: 25, mean latency: 3.5 ms
Requests: 125, requests per second: 25, mean latency: 3.4 ms
Requests: 125, requests per second: 25, mean latency: 3.1 ms
Requests: 125, requests per second: 25, mean latency: 2 ms
Requests: 250, requests per second: 25, mean latency: 3.2 ms
Requests: 250, requests per second: 25, mean latency: 1.7 ms

Target URL: http://localhost:3000/api/throttle
Max time (s): 10
Target rps: 200
Concurrent clients: 8
Running on cores: 8
Agent: none

Completed requests: 2000
Total errors: 0
Total time: 10.077 s
Mean latency: 2.8 ms
Effective rps: 198

Percentage of requests served within a certain time
50% 2 ms
90% 4 ms
95% 5 ms
99% 6 ms
100% 25 ms (longest request)

All requests (2000) have been completed.
Now let’s add Rx.js.
Start by wrapping the endpoint callback function into a Subject.

const { Subject, throttleTime } = require('rxjs');

const subject = new Subject();

app.get('/api/throttle', (req, res) => subject.next([req, res]));

subject
.subscribe(([req, res]) => {
res.send({ message: 'Hello World!' });
});

At this point, nothing has changed. We’ve simply created a route and delagated the Subject to handle the response.
Now let’s apply the throttleTime operator to the Subject.

app.get('/api/throttle', (req, res) => subject.next([req, res]));

subject
.pipe(
throttleTime(333, null, {
leading: true
})
)
.subscribe(([req, res]) => {
res.send({ message: 'Hello World!' });
});

What the throttleTime the operator allows the first value to pass, then blocks all requests for the specified time frame (333 ms), and then repeats.

After throttling

Running the test again:

> loadtest -c 10 --rps 200 http://localhost:3000/api/throttle
Requests: 1, requests per second: 0, mean latency: 3 ms
Requests: 1, requests per second: 0, mean latency: 32 ms
Requests: 3, requests per second: 1, mean latency: 3 ms
Requests: 1, requests per second: 0, mean latency: 3 ms
Requests: 3, requests per second: 1, mean latency: 2.3 ms
Requests: 1, requests per second: 0, mean latency: 4 ms
Requests: 3, requests per second: 1, mean latency: 3.3 ms
Requests: 2, requests per second: 0, mean latency: 3.5 ms
Requests: 4, requests per second: 0, mean latency: 2 ms
Requests: 7, requests per second: 1, mean latency: 1.8 ms
Requests: 4, requests per second: 0, mean latency: 1.5 ms

Target URL: http://localhost:3000/api/throttle
Max time (s): 10
Target rps: 200
Concurrent clients: 1970
Running on cores: 8
Agent: none

Completed requests: 30
Total errors: 0
Total time: 10.07 s
Mean latency: 3.4 ms
Effective rps: 3

Percentage of requests served within a certain time
50% 2 ms
90% 5 ms
95% 5 ms
99% 32 ms
100% 32 ms (longest request)

We can see that the request per second count has dropped significantly and fewer requests have been completed.

Learn how to set up Rx.js on the Express server yourself.

The race conditions occur when two or more asynchronous operations attempt to access the same resource at the same time and cause problems in the system.

RaceWith

This operator is used when you have multiple observables that compete against each other, and you’re only interested in the first one to emit a value. I added three different APIs in the service, each taking some time to complete:

  fetchReallySlowAPI() {
return this.http.get<any>('http://localhost:3000/api/really-slow-api')
}

fetchReallyFastAPI() {
return this.http.get<any>('http://localhost:3000/api/fast-api')
}

fetchThrottleAPI() {
return this.http.get<any>('http://localhost:3000/api/throttle')
}

Inside the component, I put these three against each other using the raceWith operator:

export class AppComponent implements OnInit {

ngOnInit(): void {
const fastAPI$ = this.service.fetchReallyFastAPI();
const slowAPI$ = this.service.fetchReallySlowAPI();
const throttleAPI$ = this.service.fetchThrottleAPI();

slowAPI$.pipe(
raceWith(fastAPI$, throttleAPI$)
)
.subscribe(console.log)

}
}

Whichever observable completes first, that API will be invoked and completed, while the rest will be canceled. You can verify this in the Dev Tools network tab:

Fork Join

This operator is best used when you have a group of observables and only care about the final emitted value of each (like Promise.all()). The forkJoin operator ensures that all APIs set in the pipeline complete and prints the response of each in the order they are set/invoked.

export class AppComponent implements OnInit {

ngOnInit(): void {
const fastAPI$ = this.service.fetchReallyFastAPI();
const slowAPI$ = this.service.fetchReallySlowAPI();
const throttleAPI$ = this.service.fetchThrottleAPI();

forkJoin([fastAPI$, slowAPI$, throttleAPI$])
.subscribe(console.log)

}
}

The output is an array containing final result of each API:

If any request fails, the rest of the requests will be canceled and the Observable will return an error.

Cache APIs

The Observables are designed to be cold in nature. Every time you subscribe, the Observable is recreated for each new subscriber.

What this means is if you’re Observable is invoking an API and there are five subscribers, the API will be invoked for each.

  ngOnInit(): void {
const fastAPI$ = this.service.fetchReallyFastAPI();

fastAPI$.subscribe(console.log);
fastAPI$.subscribe(console.log);
fastAPI$.subscribe(console.log);
fastAPI$.subscribe(console.log);
fastAPI$.subscribe(console.log);

}
}

To prevent this from happening, you can use the share operator that will cache the response between subscribers.

export class AppComponent implements OnInit {

ngOnInit(): void {
const fastAPI$ = this.service.fetchReallyFastAPI()
.pipe(
share()
)
;

fastAPI$.subscribe(console.log);
fastAPI$.subscribe(console.log);
fastAPI$.subscribe(console.log);
fastAPI$.subscribe(console.log);
fastAPI$.subscribe(console.log);

}
}

All five subscribers will get the same API response

But there is only one API call to the backend.

If your API is returning multiple responses (stream), you can swap the share() operator with shareReplay(x) which is used to specify how many responses (x) you want to share between subscribers.

The Observables are streams that won’t terminate unless you tell them to.

First & Take

Assuming you have a stream that returns multiple values over time, you can limit the output to produce only one value:

this.streamService.fetchData()
.pipe(
first() // Take only first value
)
.subscribe()

The Observable will emit once and immediately terminate itself.

Alternatively, you can use the take operator to take as many results as you desire.

this.streamService.fetchData()
.pipe(
take(5) // Take only first five values
)
.subscribe()

TakeUntil

A common practice is to unsubscribe (terminate) the Observable when the view (component) is no longer being used. This can be done using a Subject and ngOnDestroy hook in Angular.

export class AppComponent implements OnInit, OnDestroy {

postsData!: Posts[];
// Creating subject that will be used to unsubscribe
private unsubscribe$ = new Subject<void>();

ngOnDestroy() {
this.unsubscribe$.next();
// Subject will complete when the component is destroyed
this.unsubscribe$.complete();
}

}

The ngOnDestroy hook will be triggered when the component is destroyed, which will cause a Subject to fire and complete.

Don’t forget to put takeUntil the operator inside the Observable pipe. The takeUntil operator waits for a Subject to kick off. Once it’s called, the Observable pipeline will be terminated.

  ngOnInit() {
this.service.getAllPosts()
.pipe(
// Observable will remain active until this subject completes
takeUntil(this.unsubscribe$)
)
.subscribe(data => this.postsData = data)
}

So once the component is destroyed and the subject completes, does the Observable stream.

Wrapping Up

The Rx.js is a powerful utility library that allows manage APIs however we desire. This is just a small slice of everything it has to offer