system.reactivesystem.reactive入门


备注

本节概述了system.reactive是什么,以及开发人员可能想要使用它的原因。

它还应该提及system.reactive中的任何大型主题,并链接到相关主题。由于system.reactive的Documentation是新的,您可能需要创建这些相关主题的初始版本。

过滤可观察的值

emails.Where(email => email.From == "John")
 

获取正在运行的聚合

假设你有一个热门的观察者,你很想保留它的数量。它可能是IObservable<StockTick> ,您想要保持平均交易量的计数。您可以使用“ Scan ”。

var tradeVolume = stockTicks.Select(e => e.Price)
    .Scan(0.0m, (aggregated, newtick) => aggregated + newtick)
    .Select((aggregated, index) => aggregated / (index + 1))
 

现在您可以简单地订阅您的交易量,该交易量在收到每个新的Tick时实时更新。

var subscription = tradeVolume.Subscribe(vol => Console.WriteLine("New trade volume is {0}", vol);
 

忽略重复的值

有两个运算符用于过滤重复项:

emails.Distinct(); // Never see the same value twice
emails.DistinctUntilChanged(); // Never see the same value twice in a row
 

您还可以传入谓词:

emails.DistinctUntilChanged(x => x.Length); // Never see the same length email twice in a row
 

安装或设置

Reactive Extensions在NuGetMyGet上发布

因此,安装和使用它们与任何其他NuGet包相同:

 Install-Package System.Reactive
 

NB包名称在v2和v3之间更改。 有关详细信息,请参阅Github上的README

打破变化

在从v2.xx迁移到> v3.0.0时,NuGet包已更改其包命名

Rx-Main现在是System.Reactive Rx-Core现在是System.Reactive.Core Rx-Interfaces现在是System.Reactive.Interfaces Rx-Linq现在是System.Reactive.Linq Rx-PlatformServices现在是System.Reactive.PlatformServices Rx-现在测试是Microsoft.Reactive.Testing

为可观察的每个值选择一个新值

emails.Select(email => email.Body)
 

共享一个订阅(Publish + RefCount)

此代码将订阅两次可观​​察的emails

emails.Where(email => email.From == "John").Subscribe(email => Console.WriteLine("A"));
emails.Where(email => email.From == "Mary").Subscribe(email => Console.WriteLine("B"));
 

要共享单个订阅emails ,我们可以使用PublishRefCount

var _emails = emails.Publish().RefCount();
_emails.Where(email => email.From == "John").Subscribe(email => Console.WriteLine("A"));
_emails.Where(email => email.From == "Mary").Subscribe(email => Console.WriteLine("B"));
 

共享单个订阅(发布)

鉴于商家offersIObservable<Offer> 以固定价格购买或出售某种类型的商品,我们可以按如下方式匹配买家和卖家对:

var sellers = offers.Where(offer => offer.IsSell).Select(offer => offer.Merchant);
var buyers = offers.Where(offer => offer.IsBuy).Select(offer => offer.Merchant);
var trades = Observable.Zip(sellers, buyers, (seller, buyer) => new Trade(seller, buyer));
 

这里的问题是,每个订阅trades 将认购offers 的两倍。我们可以让sellersbuyers 分享单一订阅offers 使用Publish

var trades = offers.Publish(_offers =>
{
    var sellers = _offers.Where(offer => offer.IsSell).Select(offer => offer.User);
    var buyers = _offers.Where(offer => offer.IsBuy).Select(offer => offer.User);
    return Observable.Zip(sellers, buyers, (seller, buyer) => new Trade(seller, buyer));
});
 

订阅一个observable(CancellationToken)

emails.Subscribe(email =>
    Console.WriteLine("Email from {0} to {1}", email.From, email.To),
    cancellationToken);
 

订阅/取消订阅可观察(IDisposable)

订阅返回IDisposable

IDisposable subscription = emails.Subscribe(email =>
    Console.WriteLine("Email from {0} to {1}", email.From, email.To));
 

当您准备取消订阅时,只需处理订阅:

subscription.Dispose();
 

限制流

假设您需要实现自动搜索框,但搜索操作有点昂贵,例如发送Web请求或命中数据库。您可能希望限制正在执行的搜索量。

例如,用户在搜索框中键入“C#Reactive Extensions”:

IObservable<string> TypingSearchText()
{
    return Observable.Create<string>(o =>
    {
        const string SearchText = "C# Reactive Extensions";
        
        var builder = new StringBuilder();
        foreach (var c in SearchText)
        {
            builder.Append(c);
            
            // notify that the search text has been changed
            o.OnNext(builder.ToString());

            // pause between each character to simulate actual typing
            Thread.Sleep(125);
            
            // spent some time to think about the next word to type
            if (c == ' ')
                Thread.Sleep(1000);
        }
        
        o.OnCompleted();

        return () => { /* nothing to dispose here */ };
    });
}
 

现在,我们不希望每次用户按下键时都执行搜索。相反,只要用户停止输入的时间超过半秒,它就会完成:

TypingSearchText()
    // print the changes
    .Do(x => Console.WriteLine("Typing: " + x))
    
    // ignore changes that happens within 500ms of each other
    .Throttle(TimeSpan.FromMilliseconds(500))
    
    // some costly operation
    .Subscribe(x => Console.WriteLine("Searching: " + x));
 

输出:

Typing: C
Typing: C#
Typing: C# 
Searching: C# 
Typing: C# R
Typing: C# Re
...
Typing: C# Reactive
Typing: C# Reactive 
Searching: C# Reactive 
Typing: C# Reactive E
Typing: C# Reactive Ex
...
Typing: C# Reactive Extension
Typing: C# Reactive Extensions
Searching: C# Reactive Extensions
 

在项目中使用Rx

安装NuGet包System.Reactive ,然后添加此using语句以访问Rx扩展方法:

using System.Reactive.Linq;
 

将异步方法包装为observable

给定一个像这样的async 方法:

Task<string> GetNameAsync(CancellationToken cancellationToken)
 

将它包装为IObservable<string> 如下所示:

Observable.FromAsync(cancellationToken => GetNameAsync(cancellationToken))