I've followed this guide on how to cancel HTTP requests with the switchMap operator.

I have the following component in my Angular project:

import { Component, OnInit } from '@angular/core';
import { BehaviorSubject, Observable, forkJoin, skip, switchMap } from 'rxjs';
import { Todo } from 'src/app/model/todo.model';
import { User } from 'src/app/model/user.model';
import { TodoService } from 'src/app/service/todo.service';
import { UserService } from 'src/app/service/user.service';

@Component({
  selector: 'app-todo',
  templateUrl: './todo.component.html',
  styleUrls: ['./todo.component.scss']
})
export class TodoComponent implements OnInit{

  todos: Todo[] = [];
  users: User[] = [];

  fetchTodos$ = new BehaviorSubject<Observable<Todo[]>>(null);
  fetchUsers$ = new BehaviorSubject<Observable<User[]>>(null);

  public constructor(
    private todoService: TodoService,
    private userService: UserService
  ){}

  ngOnInit(): void {
    this.fetchTodos$.pipe(
      skip(1),
      switchMap(innerObservable => innerObservable)
    )
    .subscribe(todos => {
      this.todos = todos;
    });

    this.fetchUsers$.pipe(
      skip(1),
      switchMap(innerObservable => innerObservable)
    )
    .subscribe(users => {
      this.users = users;
    });

    this.loadData();
  }

  loadData(){
    const getAllTodosObservable = this.todoService.getAllTodos();
    this.fetchTodos$.next(getAllTodosObservable);

    const getAllUsersObservable = this.userService.getAllUsers();
    this.fetchUsers$.next(getAllUsersObservable);

    forkJoin([this.fetchTodos$, this.fetchUsers$])
    .subscribe(_ => {
      this.processTodos();
    });
  }

  //Only process todos when all todos and users have been loaded
  processTodos(){
    console.log("Processing todos...")
  }
}

The problem is that the forkJoin subscribe callback is never called, because the BehaviorSubjects never complete.

forkJoin([this.fetchTodos$, this.fetchUsers$])
    .subscribe(_ => {
      this.processTodos();
  });
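
To illustrate the behavior described above, here is a minimal sketch with plain numbers instead of HTTP observables. The names `a$`, `b$`, `results`, `countBeforeComplete` and `countAfterComplete` are made up for this example and are not part of the component. It only shows that forkJoin waits for every source to complete before it emits.

```typescript
import { BehaviorSubject, forkJoin } from 'rxjs';

// Hypothetical stand-ins for fetchTodos$ / fetchUsers$, holding plain numbers.
const a$ = new BehaviorSubject<number>(1);
const b$ = new BehaviorSubject<number>(2);

const results: number[][] = [];
forkJoin([a$, b$]).subscribe(values => results.push(values));

a$.next(10);
b$.next(20);
// forkJoin has not emitted yet: neither source has completed.
const countBeforeComplete = results.length;

a$.complete();
b$.complete();
// Once every source has completed, forkJoin emits the last value
// of each source exactly once and then completes itself.
const countAfterComplete = results.length;

console.log(countBeforeComplete, countAfterComplete, results);
```

Since a BehaviorSubject used as a reload trigger is meant to stay open, forkJoin on the subjects themselves has nothing to wait for.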

How would I achieve calling processTodos() when both requests are done without losing the functionality of HTTP cancellation?

I tried calling complete() on the BehaviorSubjects once the requests were done, like so:

ngOnInit(): void {
    this.fetchTodos$.pipe(
      skip(1),
      switchMap(innerObservable => innerObservable)
    )
    .subscribe(todos => {
      this.todos = todos;
      this.fetchTodos$.complete();
    });

    this.fetchUsers$.pipe(
      skip(1),
      switchMap(innerObservable => innerObservable)
    )
    .subscribe(users => {
      this.users = users;
      this.fetchUsers$.complete();
    });

    this.loadData();
  }

This does call the forkJoin subscribe callback, but only once: after complete(), the subjects no longer emit, so later reloads do nothing.
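
As a small sketch of why this works only once (the names `trigger$` and `received` are invented for this example): once a subject has completed, further next() calls are silently ignored.

```typescript
import { BehaviorSubject } from 'rxjs';

// Hypothetical trigger subject, standing in for fetchTodos$.
const trigger$ = new BehaviorSubject<number>(0);

const received: number[] = [];
trigger$.subscribe(value => received.push(value));

trigger$.next(1);     // delivered to the subscriber
trigger$.complete();  // the subject is now closed
trigger$.next(2);     // ignored: a completed subject cannot emit again

console.log(received);
```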

I've also tried using combineLatest instead of forkJoin, like this:

loadData(){
    const getAllTodosObservable = this.todoService.getAllTodos();
    this.fetchTodos$.next(getAllTodosObservable);

    const getAllUsersObservable = this.userService.getAllUsers();
    this.fetchUsers$.next(getAllUsersObservable);

    combineLatest([this.fetchTodos$, this.fetchUsers$])
    .subscribe(_ => {
      this.processTodos();
    });
  }

The problem with that is that processTodos() is called twice, because two values are emitted.
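
A minimal sketch of that emission pattern, assuming string payloads instead of the real observables (`todosTrigger$`, `usersTrigger$` and `seen` are hypothetical names): combineLatest emits as soon as every source has a value, and then again whenever any single source emits.

```typescript
import { BehaviorSubject, combineLatest } from 'rxjs';

// Hypothetical triggers standing in for fetchTodos$ / fetchUsers$.
const todosTrigger$ = new BehaviorSubject<string>('todos-0');
const usersTrigger$ = new BehaviorSubject<string>('users-0');

const seen: string[] = [];
combineLatest([todosTrigger$, usersTrigger$])
  .subscribe(([todos, users]) => seen.push(`${todos}|${users}`));

// A "reload" pushes one value into each subject...
todosTrigger$.next('todos-1');
usersTrigger$.next('users-1');

// ...and each push produces its own combined emission.
console.log(seen);
```

Here the callback runs once on subscribe and then twice for the reload, and the middle emission pairs new todos with stale users.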

The last thing I tried was to use forkJoin() with the HTTP observables directly instead of the BehaviorSubjects, as follows:

loadData(){
    const getAllTodosObservable = this.todoService.getAllTodos();
    this.fetchTodos$.next(getAllTodosObservable);

    const getAllUsersObservable = this.userService.getAllUsers();
    this.fetchUsers$.next(getAllUsersObservable);

    forkJoin([getAllTodosObservable, getAllUsersObservable])
    .subscribe(_ => {
      this.processTodos();
    });
  }

Here I have the problem that the HTTP requests are sent twice, because there are two subscriptions. Subscriptions in ngOnInit:

ngOnInit(): void {
    this.fetchTodos$.pipe(
      skip(1),
      switchMap(innerObservable => innerObservable)
    )
    .subscribe(todos => {
      this.todos = todos;
      this.fetchTodos$.complete();
    });

    this.fetchUsers$.pipe(
      skip(1),
      switchMap(innerObservable => innerObservable)
    )
    .subscribe(users => {
      this.users = users;
      this.fetchUsers$.complete();
    });

    this.loadData();
  }
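
The doubled requests come from the observables being cold: every subscription runs the source again. The sketch below shows this with a hypothetical `fakeRequest$` that counts how often it is started (it is not an HttpClient call). It also shows shareReplay(1) as one possible way to share a single execution. That operator is not used in the code above, and with its default settings it keeps the source subscription alive after subscribers leave, so it may work against the cancellation goal.

```typescript
import { Observable, shareReplay } from 'rxjs';

let requestCount = 0;

// Hypothetical stand-in for an HTTP call: each subscription "sends" a request.
const fakeRequest$ = new Observable<string[]>(subscriber => {
  requestCount++;
  subscriber.next(['todo A', 'todo B']);
  subscriber.complete();
});

// Two subscriptions to the cold observable trigger two executions.
fakeRequest$.subscribe();
fakeRequest$.subscribe();
const coldCount = requestCount;

// With shareReplay(1), the second subscriber gets the replayed result instead.
requestCount = 0;
const shared$ = fakeRequest$.pipe(shareReplay(1));
shared$.subscribe();
shared$.subscribe();
const sharedCount = requestCount;

console.log(coldCount, sharedCount);
```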

3 Answers

2

I left the original answer in place because it is still a useful answer to the question as it was originally posed.

Adding the requirement of using switchMap, the revised code is as follows:

export class TodoService {
  toDoUrl = `https://jsonplaceholder.typicode.com/todos`;
  UserUrl = `https://jsonplaceholder.typicode.com/users`;

  constructor(private httpClient: HttpClient) { }

  doLoading = new BehaviorSubject<boolean>(true);
  doLoading$ = this.doLoading.asObservable();

  allData$ = this.doLoading$.pipe(
    switchMap(() => this.httpClient.get<Todo[]>(this.toDoUrl).pipe(
      combineLatestWith(this.httpClient.get<User[]>(this.UserUrl))
    )),
    map(([todos, users]) => this.processTodos(todos, users))
  )

  processTodos(todos: Todo[], users: User[]) {
    // whatever code here
    console.log("processing");
    return { todos, users };
  }

  loadData() {
    this.doLoading.next(true);
  }

}

Here the code sets up one single BehaviorSubject to define when to do the loading. It then uses a switchMap to make the http get cancellable.
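
To make the cancellation visible, here is a small sketch that swaps the HTTP calls for a hypothetical `fakePendingRequest()` which never finishes and only logs when it is started and when it is torn down. The names `reload$`, `log` and `fakePendingRequest` are assumptions for this illustration. With a real HttpClient observable, the teardown is what aborts the in-flight request.

```typescript
import { Observable, Subject, switchMap } from 'rxjs';

const log: string[] = [];
let nextId = 0;

// Hypothetical stand-in for a pending HTTP request: it never emits on its own
// and records when it is started and when it is unsubscribed (cancelled).
function fakePendingRequest(): Observable<never> {
  const id = ++nextId;
  return new Observable<never>(() => {
    log.push(`start ${id}`);
    return () => {
      log.push(`cancel ${id}`);
    };
  });
}

const reload$ = new Subject<void>();
const sub = reload$.pipe(
  switchMap(() => fakePendingRequest())
).subscribe();

reload$.next();    // starts request 1
reload$.next();    // cancels request 1, starts request 2
sub.unsubscribe(); // cancels request 2

console.log(log);
```

Each new trigger value unsubscribes from the previous inner observable before subscribing to the next one, so only the most recent request can deliver a result.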

The component code then looks like this:

export class TodoComponent {
  todos: Todo[] = [];
  users: User[] = [];

  sub = this.todoService.allData$.subscribe(
    alldata => {
      this.todos = alldata.todos;
      this.users = alldata.users;
    });

  public constructor(
    private todoService: TodoService,
    private userService: UserService
  ) {}

  loadData() {
    this.todoService.loadData();
  }
}

The resulting Stackblitz is here: https://stackblitz.com/edit/angular-ivy-tnte6q

0

It seems you tried all combinations except for one.

It should work if you use combineLatest with the direct data streams (instead of the BehaviorSubjects).

Something like this:

  // Get all posts
  // Get all users
  postData$ = this.http.get<Post[]>(this.postUrl);
  userData$ = this.http.get<User[]>(this.userUrl);

  // Combine them.
  // Each of these will only emit once
  // The combineLatest won't emit until both streams have emitted.
  sub = combineLatest([
    this.postData$,
    this.userData$
  ]).subscribe(
    ([posts, users]) => this.processTodos(posts, users)
  )

  constructor(private http: HttpClient) {}

  processTodos(posts: Post[], users: User[]) {
    // whatever code here
    console.log("processing");
  }

I have an example here: https://stackblitz.com/edit/angular-posts-user-combinelatest-deborahk

3
  • Thanks for the quick response, I've tried this approach and it does fire the subscribe method but I lose the functionality of HTTP request cancellation. If I load the data several times in a short time period, all requests are being processed, I only need the most recent requests to be processed and previous ones to be cancelled. I have my example on here: stackblitz.com/edit/…
    – Unrated
    Commented Jul 15, 2023 at 10:56
  • In general, you could try piping something like .pipe(takeUntil(this.cancel$)) to the stream you want to interrupt, where cancel$ = new Subject<void>() and in the template <button (click)="cancel$.next()">. However, you'd want to either hide it in an inner observable (to prevent combineLatest from dying) or pipe it to each of the two requests. It can be tricky to keep the UI data display reliable after cancellation, because the data can get out of sync. Definitely too complex to fit into an SO comment :)
    – user776686
    Commented Jul 20, 2023 at 18:55
  • I provided a new answer that uses switchMap.
    – DeborahK
    Commented Jul 26, 2023 at 23:16
0

Instead of the two different BehaviorSubjects, you could use a single Subject and apply switchMap to its emissions, like below:

fetchTodosAndUsers$ = new Subject<void>();

ngOnInit(): void {
  this.fetchTodosAndUsers$.pipe(
    startWith(() => {}),
    switchMap(() => forkJoin([
      this.todoService.getAllTodos(),
      this.userService.getAllUsers()
    ])),
    map(([todos, users]) => {
      this.todos = todos;
      this.users = users;
      this.processTodos();
    })
   ).subscribe(); 
 }

loadData() {
  this.fetchTodosAndUsers$.next();
}

//Only process todos when all todos and users have been loaded
processTodos() {
  console.log('Processing todos...');
}

If you want to separate the two calls:

ngOnInit(): void {
  this.fetchTodo$.pipe(
    startWith(() => {}),
    switchMap(() => this.todoService.getAllTodos()),
    map((todos) => {
      this.todos = todos;
      this.processTodos();
    })
  ).subscribe();

  this.fetchUsers$.pipe(
    startWith(() => {}),
    switchMap(() => this.userService.getAllUsers()),
    map((users) => {
      this.users = users;
      this.processUsersOrWhatEver();
    })
  ).subscribe();
}

loadData() {
  this.fetchTodo$.next();
  this.fetchUsers$.next();
}
2
  • Thanks for the answer, how would this work if I want them to be decoupled. Right now I'm always stuck with fetching Todos and Users at one time. In future I might only want to call processTodos() if only the getAllTodos() request has been completed. My question is, how can I build this dynamically? Thanks in advance!
    – Unrated
    Commented Jul 20, 2023 at 15:06
  • @Unrated, the logic you described says that you want to call both services in the component's ngOnInit(), so they are coupled in that sense. Though you can use two subjects and call next() on both of them in loadData(), as in my updated answer.
    – fujy
    Commented Jul 20, 2023 at 15:37
