RXJS等待所有可观察者完成并返回结果

我正在尝试创建一个RX流,它将执行一个XHR调用异步列表,然后在进入下一个调用之前等待它们完成.

为了帮助解释这可以在普通的JS中这样写:

try {
    await* [
        ...requests.map(r => angularHttpService.get(`/foo/bar/${r}`))
    ];
} catch(e) { throw e }

// do something

这是我尝试的代码,但它是单独运行它们而不是等待它们全部完成才能继续. (这是一个NGRX效果流,所以它与香草rx略有不同).

mergeMap(
        () => this.requests, concatMap((resqests) => from(resqests))),
        (request) =>
            this.myAngularHttpService
                .get(`foo/bar/${request}`)
                .pipe(catchError(e => of(new HttpError(e))))
    ),
    switchMap(res => new DeleteSuccess())

最佳答案 您可以使用
forkJoin,它将从每个已完成的observable中发出最后一个发射值.以下是链接文档中的示例:

import { mergeMap } from 'rxjs/operators';
import { forkJoin } from 'rxjs/observable/forkJoin';
import { of } from 'rxjs/observable/of';

const myPromise = val =>
  new Promise(resolve =>
    setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
  );

const source = of([1, 2, 3, 4, 5]);
//emit array of all 5 results
const example = source.pipe(mergeMap(q => forkJoin(...q.map(myPromise))));
/*
  output:
  [
   "Promise Resolved: 1",
   "Promise Resolved: 2",
   "Promise Resolved: 3",
   "Promise Resolved: 4",
   "Promise Resolved: 5"
  ]
*/
const subscribe = example.subscribe(val => console.log(val));

彼得·B·史密斯也有这个很好的食谱,同样也使用forkJoin:

> Making chained API Calls using @ngrx/Effects

点赞