2

what I'm trying to achieve is this (with Angular 2/Typescript):

  • Observable A produces stream of events.

  • For each event of Observable A, make 8 different http calls. (8 switchmaps)

  • After all of the 8 requests return, do something (subscribe to zip of 8 switchmaps).

  • Repeat 8 requests for each event of Observable A (taken care of by switchmap and zip)

Code: (full code at https://plnkr.co/edit/44yqw0RYzC7v1TFACMx1)

let source = Observable
.interval(5000)
.take(100);

let requests = [];

for(let i=0; i<8;i++) {
  let request = source.switchMap(x=> http.get('https://jsonplaceholder.typicode.com/users/'+(i+1))).publish();
  request.subscribe(res => console.log(res.json()));
  requests.push(request);
}

Observable.zip(requests)
.subscribe(console.log("All requests completed"));

requests.forEach(r => r.connect());

The problem is my zip never gets called. I console.log'ged the subscription to each of the 8 switchmaps and I'm getting logs showing eight http calls return successfully each time there is an event in Observable/stream A. (also can see the 8 calls returning in the network tab of the debug tools)

But zip never emits anything.


If I try a different (less ideal) approach:

  • Subscribe to Observable A once (not switchmap)
  • Within subscription create 8 Observables for each http call, and subscribe to ForkJoin of the 8 Observables

Code: (full code at https://plnkr.co/edit/GqQde1Ae2licBjtL0jcj)

let source = Observable
.interval(5000)
.take(100);

 source.subscribe(x=> {
   console.log(x);
   let requests = [];

   for(let i=0; i<8;i++) {
     let request = http.get('https://jsonplaceholder.typicode.com/users/'+(i+1)).publish();
     request.subscribe(res => console.log(res.json()));
     requests.push(request);
   }

   Observable.forkJoin(requests)
   .subscribe(console.log("All requests completed"));

   requests.forEach(r => r.connect());

 });

This works. But with the obvious pitfall that I'm creating 8+1 nested observables/subscriptions each time Observable A emits.

(In both cases I'm using publish/connect to share/reuse subscriptions, but the problem exists even without it)

2
  • 1
    Show some code. Commented Jul 4, 2017 at 8:52
  • Added it @RobinDijkhof . Took some time to distil the main logic from my application
    – flak37
    Commented Jul 4, 2017 at 12:04

1 Answer 1

9

You first example would work if you call zip correctly with multiple arguments and pass a function to subscribe (not the result of console.log which is undefined). Demo.

Observable.zip(...requests) // <-- spread this 
    .subscribe(() => console.log("All requests completed")); // <-- pass a function

requests.forEach(r => r.connect());
6
  • 2
    This works! Thanks. But I'm curious why forkJoin, combineLatest etc work without spreading the argument. (And also why subscribe works directly with console.log() with them, without passing a lambda/arrow function) ?
    – flak37
    Commented Jul 4, 2017 at 13:00
  • 2
    @flak37 "why forkJoin, combineLatest etc work without spreading the argument" ad-hoc polymorphism. They have special handling when first arg is array. code Commented Jul 4, 2017 at 13:25
  • 1
    @flak37 "And also why subscribe works directly with console.log() with them" It was not actually working. Simple proof: plnkr.co/edit/k0XW99xu8WLdMY3ecOl9?p=preview Commented Jul 4, 2017 at 13:27
  • Oh thanks! Finally can you please explain why in the last plunk "All requests completed <-- not true" is getting printed on every subscribe? Is it simply undefined-like behaviour due to incorrectly passing the console.log function?
    – flak37
    Commented Jul 4, 2017 at 13:31
  • 1
    @flak37 The same reason why "'This line should come before ..." is getting printed as well: all this code is running inside onNext handler. Commented Jul 4, 2017 at 13:35

Not the answer you're looking for? Browse other questions tagged or ask your own question.