[C#] Rx.NET 筆記 (不定時更新)

最近在專案中開始使用 Rx.NET 了,剛開始有些不太適應,不過大致熟悉後,寫起來還是很過癮,不輸給使用 RxJS,以下做一些簡單的筆記,讓對 Rx.NET 有興趣的朋友能快速上手。

因為之前都是寫 RxJS 居多,也會有一些跟 RxJS 的比較,不過主軸還是以 Rx.NET 為主。

安裝 Rx.NET

v3.0 之後的套件名稱為 System.Reactive.*,從 NuGet 安裝只要搜尋 System.Reactive 即可,目前最新版本為 5.0.0。

  • 使用 .NET CLI
dotnet add package System.Reactive
  • 使用 Package Manager
Install-Package System.Reactive

Chain 非 Pipe

在 RxJS 中我們會用 pipe() 來串接多個 operators,對於前端來說這樣最大的好處是便於 tree shaking,而 Rx.NET 基本上沒 tree shaking 的困擾,同時也為了符合 LINQ 操作的感覺,所以都是用 chain 的方式,一個一個用 . 去接下一個 operator。

var subscription = source
    .Where(user => user.Age >= 18)
    .Select(data => data.Name)
    .Subscribe(data => { Console.WriteLine(data.Name) });

每一個 operator 都是 IObservable 的 extension method。

觀察者 Observer

呼叫 Subscribe 訂閱後,一樣是提供 3 個 callback function,依次是 OnNextOnErrorOnCpmplete;必須照順序撰寫,只有 OnNext 的時候其他可以省略不寫。

var subscription = source
    .Subscribe(
        data => { Console.WriteLine(data.Name); }, // OnNext callback
        error => { Console.WriteLine("Error"); }, // OnError callback
        () => { Console.WriteLine("Complete"); }
    );

取消訂閱

在 RxJS 我們會用 .unsibscribe() 來取消訂閱一個 Subscription,而在 Rx.NET 內名稱為 Dispose(),算是對齊 .NET 處理物件的習慣。

var subscription = source.Subscribe(Console.WriteLine);
subscription.Dispose();

建立 Observable

Observable.Return

對應到 RxJS 的 of

Observable
    .Return(1)
    .Subscribe(Console.WriteLine);

Observable.FromEventPattern

當需要整合別人寫的 Library 使用 event 時很有機會用到

private event EventHandler<int> MyEvent;

public void MyFunc()
{
    var source = Observable.FromEventPattern<int>(
        addHandler => MyEvent += addHandler,
        removeHandler => MyEvent -= removeHandler);

    var subscription = source
        .Select(data => data.EventArgs)
        .Filter(result => result > 0)
        .Subscribe(Console.WriteLine);

    subscription.Dispose();
}

Observable.Timer / Observable.Interval

名稱都跟 RxJS 使用一樣,但傳入的時間可以用 .NET 內建的 TimeSpan,可讀性好多了

Observable
    .Timer(TimeSpan.FromSeconds(1.5), TimeSpan.FromSeconds(0.5))
    .Take(5)
    .Subscribe(Console.WriteLine);

在 Rx.NET 內所有 Operators 只要設定時間參數都可以使用 .NET 內建的 TimeSpan

Observable
    .Return("Mike")
    .Delay(TimeSpan.FromSeconds(1))
    .Subscribe(Console.WriteLine);

Hot Observable - Subject 系列

這部分使用上基本跟 RxJS 習慣一樣

var subject = new Subject<int>();
var behaviorSubject = new BehaviorSubject<int>("test");
var replaySubject = new ReplaySubject<int>(1);

有一個很大的重點是一定要指定型別,因為 C# 是強型別語言,直接 new Subject() 不指定型別的話編譯是不會過的。

當有新的事件要送出時,使用 OnNextOnErrorOnCompleted

var subject = new Subject<int>();
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);

subject.OnError(new Exception("Error"));

subject.OnCompleted();

Cold Observable

使用 Observable.create 建立新的 cold observable

Observable
    .Create<int>(observer =>
    {
        observer.OnNext(1);
        observer.OnNext(2);
        observer.OnCompleted();
        return Disposable.Empty;
    });

最後要回傳一個「訂閱取消時的處理方法」,這裡回傳 Disposable.Empty 代表的是不處理 (說穿了,就是一個實作 Dispose() 方法但裡面什麼事情都沒做的物件)。

在 RxJS 內沒有強制規定要回傳這個處理方法,但 C# 是強型別語言,因此有設定回傳型別的方法就一定要回傳。

這個「訂閱取消時的處理方法」如果實際要處理,範例如下:

var source = Observable
    .Create<int>(observer =>
    {
        observer.OnNext(1);
        observer.OnNext(2);
        return () => { Console.WriteLine("Unsubscribe"); };
    });
    
var subscription = source
    .Subscribe(result => Console.WriteLine(result));

subscription.Dispose();

// 1
// 2
// Unsubscribe

幾個常用的 Operators

Select

就是 RxJS 的 map,在 Rx.NET 用 Select 是為了對應到 LINQ 的習慣。

source.Select(num => num + 1);

Where

RxJS 的 filter,一樣是對應到 LINQ 的習慣

source.Where(user => user.Age >= 18);

Do

RxJS 過去也叫做 do ,後來改名叫做 tap,而在 Rx.NET 內依然叫做 Do

source
    .Do(value => Console.WriteLine($"Current: {value}")
    .Subscribe();

跟 side effect 的操作都盡可能在 Do 裡面處理。

Select + Switch / Merge / Concat

Rx.NET 沒有內建 switchMapmergeMapconcatMap,必須先用 Select 轉換成目標 Observable,再使用 SwitchMergeConcat

Observable
    .Range(1, 3)
    // 先用 Select 換成另一個 Observable
    .Select(value => 
        Observable
            .Return(value)
            .Delay(TimeSpan.FromSeconds(value)))
    // 在用 Switch / Merge / Concat 等組合這些 Observable
    .Concat()
    .Subscribe(Console.WriteLine);

Rx.NET 沒有實作 exhaustMap,必須要自己寫 (參考寫法)。

First / FirstAsync

這個 Operator 本身沒有什麼特別的,但在撰寫 ASP.NET WebAPI 專案時卻有極大的幫助,由於 Observable 都是在 Subscribe 處理,而撰寫 ASP.NET WebAPI 時候是回傳 IActionResult 或是直接回傳結果,因此回傳一個 Observable<T> 是沒有用的。由於 Observable 本身有 stream 的概念,看起來似乎和 WebAPI 這種一次性回應不搭,但只要用 First 就可以轉成非 stream 的結果了。

public Student GetStudent(int id)
{
    return studentService
        .GetStudent(id) // 假設 GetStudent 回傳的是一個 Observable
        .First();
}

或是使用 IActionResult

public IActionResult GetStudent(int id)
{
    return StudentService
        .GetStudent(id)
        .Select(student => Ok(student))
        .First();
}

會使用到 Observable,通常都是用在非同步的處理上,因此通常使用 FirstAsyn 然後搭配 async/await 應該會是比較合理的

public async Task<IActionResult> GetStudentAsync(int id)
{
    return await StudentService
        .GetStudent(id)
        .Select(student => Ok(student))
        .FirstAsync();
}

使用 FirstFirstAsync 只是個範例,重點在把一個 stream 轉為非 stream 的物件,所以依照情境不同,使用 ToListAsync 或其他處理方式也是有可能的!

另外,如果規則夠明確,也可以乾脆寫個 Middleware 來處理,那麼就真的只要回傳 Observable 就可以囉!

自訂 Operator

Rx.NET 內所有的 Operator 都是擴充方法,擴充 IObservable 所以我們也只需要替 IObservable 撰寫擴充方法,並回傳一個新的 IObservable 即可。

public static IObservable<Student> FilterStudnetScore(this IObservable<Student> source, int passScore)
{
    return source.Where(student => student.Score >= passScore)
}

其他功能

ToObservable

所有 IEnumerable 型別的物件都可以使用 ToObservable 擴充方法,幫助我們直接把集合轉換成 Observable,類似 RxJS 的 fromfrom 可以接受更多來源。

(new List<int>() { 1, 3, 5 })
  .ToObservable()
  .Select(value => value + 1)
  .Subscribe(Console.WriteLine);

ToEnumerable

ToObservable 剛好相反,把一個 Observable 轉換成 IEnumerable 集合,類似 RxJS 的 toArray

Observable
    .Interval
    .ToEnumerable();

ToEvent / ToEventPattern

一樣跟其他 libraries 整合時有機會用到,將 Observable 轉換成 event。

幾個 Resources

如果您覺得我的文章有幫助,歡迎免費成為 LikeCoin 會員,幫我的文章拍手 5 次表示支持!