javascript – 根据特定条件在RxJ中连接两个可观察的流

我有两个对象流,帐户和余额.

我需要根据id和account_id合并(加入)两个流

var accounts = Rx.Observable.from([
    { id: 1, name: 'account 1' },
    { id: 2, name: 'account 2' },
    { id: 3, name: 'account 3' },
]);

var balances = Rx.Observable.from([
    { account_id: 1, balance: 100 },
    { account_id: 2, balance: 200 },
    { account_id: 3, balance: 300 },
]);

预期结果:

var results = [
    { id: 1, name: 'account 1', balance: 100},
    { id: 2, name: 'account 2', balance: 200},
    { id: 3, name: 'account 3', balance: 300},
];

这对RxJs是否可行?

要清楚,我知道如何使用普通的js / lodash或类似的东西来做到这一点.在我的情况下,我从Angular Http模块获取这些流,所以我问我是否可以在这种情况下获得RxJs的好处

最佳答案 根据您的一条评论,您的示例是模拟来自Angular Http调用的流.

所以代替:

var accounts = Rx.Observable.from([
    { id: 1, name: 'account 1' },
    { id: 2, name: 'account 2' },
    { id: 3, name: 'account 3' },
]);

var balances = Rx.Observable.from([
    { account_id: 1, balance: 100 },
    { account_id: 2, balance: 200 },
    { account_id: 3, balance: 300 },
]);

我宁愿说它是:

var accounts = Rx.Observable.of([
    { id: 1, name: 'account 1' },
    { id: 2, name: 'account 2' },
    { id: 3, name: 'account 3' },
]);

var balances = Rx.Observable.of([
    { account_id: 1, balance: 100 },
    { account_id: 2, balance: 200 },
    { account_id: 3, balance: 300 },
]);

为什么:from将逐个发出每个项目,将发出整个数组,我猜你的http响应是整个数组.

那就是说,你可能想要达到的目标是:

const { Observable } = Rx;

// simulate HTTP requests
const accounts$= Rx.Observable.of([
  { id: 1, name: 'account 1' },
  { id: 2, name: 'account 2' },
  { id: 3, name: 'account 3' }
]);

const balances$= Rx.Observable.of([
  { account_id: 1, balance: 100 },
  { account_id: 2, balance: 200 },
  { account_id: 3, balance: 300 }
]);

// utils
const joinArrays = (accounts, balances) =>
  accounts
    .map(account => Object.assign({}, account, { balance: findBalanceByAccountId(balances, account.id).balance }));

const findBalanceByAccountId = (balances, id) =>
  balances.find(balance => balance.account_id === id) || { balance: 0 };

const print = (obj) => JSON.stringify(obj, null, 2)

// use forkJoin to start both observables at the same time and not wait between every request
Observable
  .forkJoin(accounts$, balances$)
  .map(([accounts, balances]) => joinArrays(accounts, balances))
  .do(rslt => console.log(print(rslt)))
  .subscribe();

输出:

[
  {
    "id": 1,
    "name": "account 1",
    "balance": 100
  },
  {
    "id": 2,
    "name": "account 2",
    "balance": 200
  },
  {
    "id": 3,
    "name": "account 3",
    "balance": 300
  }
]

这是一个有效的Plunkr:https://plnkr.co/edit/bc0YHrISu3FT45ftIFwz?p=preview

编辑1:
处理数组以构成结果可能不是最佳的性能,而不是返回一个数组,可能会尝试返回一个具有密钥的对象,即帐户的ID.这样你可以简单地删除findBalanceByAccountId函数并拥有一个更快的应用程序(这里只修改了代码):

const balances$= Rx.Observable.of({
  1: { account_id: 1, balance: 100 },
  2: { account_id: 2, balance: 200 },
  3: { account_id: 3, balance: 300 }
});

// utils
const joinArrays = (accounts, balances) =>
  accounts
    .map(account => Object.assign(
      {}, 
      account, 
      { balance: balances[account.id].balance }
    ));
点赞