Deep dive into Rxjs
前段时间看函数式编程时,再次提到了 Rxjs。第一次接触 Rxjs 应该是去年刚进入工作,组内大佬做了一次 Rxjs 和 Websocket 的培训分享,并且后续只了解其基础概念,主要理解了 Rxjs 的订阅者模式,方便用于跨组件通信。Rxjs 中有大量操作符,可以轻松组合复杂异步代码,但在平时业务中所用到的 Rxjs 只是冰山一角。最近把官方文档重新看了一遍,并且看了些案例,做一个总结。
Review
Rxjs 中应该最基础的概念就是 Observable,一切都源于 Observable,在 JS 中有一个和 Observable 很相似的功能,Promise。两者都是用于处理异步操作的方式,但是又有一些明显的区别。下面从五个方面对其区别做出解释:
- 返回值:Promise 是一个单值容器,它只能返回一个值或一个错误。而 Observable 是一个多值容器,它可以连续地返回多个值。
- 执行时机:Promise 是立即执行的,一旦创建就会立即开始执行。而 Observable 是惰性执行的,只有在订阅时才会开始执行。
- 取消操作:Promise 无法取消,一旦创建就无法中途取消执行。而 Observable 可以通过调用 unsubscribe 方法来取消订阅和中止执行。
- 异步:Promise 一直是异步状态,但是 Observable 比较灵活,是否为异步得根据自己的函数来定。
- 组合操作:Observable 提供了丰富的操作符,可以对数据流进行各种转换和组合操作,例如过滤、映射、合并等。而 Promise 只提供了 then 和 catch 方法,相对较少的操作符。
export class Observable<T>
implements Subscribable<T>
{
// ...
subscribe(
observerOrNext?:
| Partial<Observer<T>>
| ((value: T) => void)
): Subscription;
// ...
}
export interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
对于基本的 Observable,可以用两种方法进行订阅,传入一个订阅者对象,或者直接传入 next 函数。
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
});
const observer = {
next: (x) => console.log("got value " + x),
};
observable.subscribe(observer);
observable.subscribe((x) =>
console.log("got value " + x)
);
但是它是被动的,只能通过发送数据给订阅者,无法主动推送数据,这时候就有了 Subject 的概念。
export class Subject<T>
extends Observable<T>
implements SubscriptionLike {}
通过定义其实很容易知道,Subject 是实现了其他内容的特殊 Observable,因此 Subject 也可发送数据给订阅者。
const subject = new Subject();
// 消费者 订阅 生产者subject
subject.subscribe((v) =>
console.log(`subject: ${v}`)
);
// 生产者subject 生产数据
subject.next(1);
// 消费者observer 订阅 生产者observable
observable.subscribe(observer);
// 消费者subject 订阅 生产者observable
observable.subscribe(subject);
subject: 1
got value 1
got value 2
got value 3
subject: 1
subject: 2
subject: 3
从上面的 Log 来看,subject 即可以作为生产者也可以作为消费者,并且 Observable 在创建时,数据流是冷启动的,只有当有订阅者订阅时,它才会开始发送数据。而 Subject 在创建时,数据流是热启动的,可以立即开始发送数据,以及其单播多播的区别也不再过多解释。