Getting started with system.reactive

Remarks

This section provides an overview of what system.reactive is, and why a developer might want to use it.

It should also mention any large subjects within system.reactive, and link out to the related topics. Since the Documentation for system.reactive is new, you may need to create initial versions of those related topics.

Filtering the values of an observable

emails.Where(email => email.From == "John")
 

Get a running aggregation

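Suppose you have a hot observable and want to maintain a running aggregate over its values. For example, given an IObservable<StockTick>, you might want to track a running average as each tick arrives. You can use Scan for that: it emits the accumulated value after every element rather than waiting for the sequence to complete.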

var tradeVolume = stockTicks.Select(e => e.Price)
    .Scan(0.0m, (aggregated, newtick) => aggregated + newtick)
    .Select((aggregated, index) => aggregated / (index + 1));
 

Now you can simply subscribe to your trade volume, which is updated live each time a new tick arrives.

var subscription = tradeVolume.Subscribe(vol => Console.WriteLine("New trade volume is {0}", vol));
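To make the running average concrete, here is a minimal sketch that applies the same Scan and Select pipeline to three fixed prices. The price values and the names prices and averages are assumptions made for illustration; a real StockTick stream would replace the array.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical input: three prices standing in for a live tick stream.
var prices = new[] { 10m, 20m, 30m }.ToObservable();

var averages = new List<decimal>();

prices
    // Running sum: 10, 30, 60
    .Scan(0m, (sum, price) => sum + price)
    // Divide each running sum by the number of values seen so far.
    .Select((sum, index) => sum / (index + 1))
    .Subscribe(avg => averages.Add(avg));

Console.WriteLine(string.Join(", ", averages)); // 10, 15, 20
```

Because the array-backed observable is enumerated synchronously on subscription, averages is fully populated by the time Subscribe returns.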
 

Ignoring repeated values

There are two operators for filtering duplicates:

emails.Distinct(); // Never see the same value twice
emails.DistinctUntilChanged(); // Never see the same value twice in a row
 

You can also pass in a key selector, so that values are compared by the selected key:

emails.DistinctUntilChanged(email => email.Body.Length); // Never see two emails with the same body length in a row
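The difference between the two operators is easiest to see on a small fixed sequence. The following sketch uses an assumed integer sequence rather than emails, purely for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical input sequence with both adjacent and non-adjacent repeats.
var values = new[] { 1, 1, 2, 1, 3, 3 }.ToObservable();

var distinct = new List<int>();
var untilChanged = new List<int>();

// Distinct remembers every value it has seen so far.
values.Distinct().Subscribe(x => distinct.Add(x));

// DistinctUntilChanged only compares each value with the previous one.
values.DistinctUntilChanged().Subscribe(x => untilChanged.Add(x));

Console.WriteLine(string.Join(", ", distinct));     // 1, 2, 3
Console.WriteLine(string.Join(", ", untilChanged)); // 1, 2, 1, 3
```

Note that Distinct has to keep every value it has seen in memory, so on a long-running stream DistinctUntilChanged is often the safer choice.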
 

Installation or Setup

Reactive Extensions are published on both NuGet and MyGet.

Installing and using them is therefore the same as any other NuGet package:

 Install-Package System.Reactive
 

NB: package names changed between v2 and v3. See the README on GitHub for more information.

Breaking changes

The NuGet packages were renamed in the move from v2.x.x to v3.0.0 and later:

Rx-Main is now System.Reactive
Rx-Core is now System.Reactive.Core
Rx-Interfaces is now System.Reactive.Interfaces
Rx-Linq is now System.Reactive.Linq
Rx-PlatformServices is now System.Reactive.PlatformServices
Rx-Testing is now Microsoft.Reactive.Testing

Selecting a new value for each value in an observable

emails.Select(email => email.Body)
 

Sharing a single subscription (Publish + RefCount)

This code will subscribe to the emails observable twice:

emails.Where(email => email.From == "John").Subscribe(email => Console.WriteLine("A"));
emails.Where(email => email.From == "Mary").Subscribe(email => Console.WriteLine("B"));
 

To share a single subscription to emails, we can use Publish and RefCount instead:

var _emails = emails.Publish().RefCount();
_emails.Where(email => email.From == "John").Subscribe(email => Console.WriteLine("A"));
_emails.Where(email => email.From == "Mary").Subscribe(email => Console.WriteLine("B"));
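One way to check the effect is to count how often the source is subscribed to. The sketch below is an illustration under assumptions: feed is a hypothetical Subject standing in for a live email source, and emails is wrapped in Observable.Create only so that each subscription can be counted.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical live source standing in for the email feed.
var feed = new Subject<string>();

// Wrap the feed so that every subscription to `emails` is counted.
var subscriptionCount = 0;
var emails = Observable.Create<string>(observer =>
{
    subscriptionCount++;
    return feed.Subscribe(observer);
});

// Without sharing: each Subscribe call subscribes to the source again.
var a = emails.Subscribe(_ => { });
var b = emails.Subscribe(_ => { });
var unsharedCount = subscriptionCount; // 2
a.Dispose();
b.Dispose();

// With Publish + RefCount: the first subscriber connects to the source,
// and later subscribers reuse that one connection.
subscriptionCount = 0;
var shared = emails.Publish().RefCount();
var received = new List<string>();
var c = shared.Subscribe(x => received.Add("A:" + x));
var d = shared.Subscribe(x => received.Add("B:" + x));
var sharedCount = subscriptionCount; // 1

// Both subscribers still receive every value.
feed.OnNext("hello");
Console.WriteLine(string.Join(", ", received)); // A:hello, B:hello
```

RefCount disconnects from the source once the last subscriber has been disposed, so the shared subscription lives only as long as someone is listening.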
 

Sharing a single subscription (Publish)

Given an IObservable<Offer> of offers from merchants to buy or sell some type of item at a fixed price, we can match pairs of buyers and sellers as follows:

var sellers = offers.Where(offer => offer.IsSell).Select(offer => offer.Merchant);
var buyers = offers.Where(offer => offer.IsBuy).Select(offer => offer.Merchant);
var trades = Observable.Zip(sellers, buyers, (seller, buyer) => new Trade(seller, buyer));
 

The problem with this is that each subscription to trades will subscribe to offers twice. We can make sellers and buyers share a single subscription to offers by using Publish:

var trades = offers.Publish(_offers =>
{
    var sellers = _offers.Where(offer => offer.IsSell).Select(offer => offer.Merchant);
    var buyers = _offers.Where(offer => offer.IsBuy).Select(offer => offer.Merchant);
    return Observable.Zip(sellers, buyers, (seller, buyer) => new Trade(seller, buyer));
});
 

Subscribing to an observable (CancellationToken)

emails.Subscribe(email =>
    Console.WriteLine("Email from {0} to {1}", email.From, email.To),
    cancellationToken);
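This overload of Subscribe returns nothing; the subscription is ended by cancelling the token instead of disposing a handle. A minimal sketch, assuming a hypothetical Subject<string> in place of the email stream:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical source standing in for the email stream.
var emails = new Subject<string>();
var cts = new CancellationTokenSource();
var received = new List<string>();

// The subscription stays active until the token is cancelled.
emails.Subscribe(email => received.Add(email), cts.Token);

emails.OnNext("first");   // delivered
cts.Cancel();             // unsubscribes
emails.OnNext("second");  // no longer delivered

Console.WriteLine(string.Join(", ", received)); // first
```

This form is convenient when the subscription's lifetime is already governed by a CancellationToken, for example inside an async method that receives one.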
 

Subscribing/unsubscribing to an observable (IDisposable)

Subscribe returns an IDisposable:

IDisposable subscription = emails.Subscribe(email =>
    Console.WriteLine("Email from {0} to {1}", email.From, email.To));
 

When you are ready to unsubscribe, simply dispose the subscription:

subscription.Dispose();
 

Throttling a stream

Let's say you need to implement an automatic search box, but the search operation is somewhat costly, such as sending a web request or querying a database. You may want to limit the number of searches performed.

For example, suppose the user types "C# Reactive Extensions" into the search box. The following observable simulates that:

IObservable<string> TypingSearchText()
{
    return Observable.Create<string>(o =>
    {
        const string SearchText = "C# Reactive Extensions";
        
        var builder = new StringBuilder();
        foreach (var c in SearchText)
        {
            builder.Append(c);
            
            // notify that the search text has been changed
            o.OnNext(builder.ToString());

            // pause between each character to simulate actual typing
            Thread.Sleep(125);
            
            // pause longer after a space, as if thinking about the next word
            if (c == ' ')
                Thread.Sleep(1000);
        }
        
        o.OnCompleted();

        return () => { /* nothing to dispose here */ };
    });
}
 

Now, we don't want to perform the search every time the user presses a key. Instead, the search should run only when the user stops typing for longer than half a second:

TypingSearchText()
    // print the changes
    .Do(x => Console.WriteLine("Typing: " + x))
    
    // only pass on a value once no newer value has arrived for 500ms
    .Throttle(TimeSpan.FromMilliseconds(500))
    
    // some costly operation
    .Subscribe(x => Console.WriteLine("Searching: " + x));
 

Output:

Typing: C
Typing: C#
Typing: C# 
Searching: C# 
Typing: C# R
Typing: C# Re
...
Typing: C# Reactive
Typing: C# Reactive 
Searching: C# Reactive 
Typing: C# Reactive E
Typing: C# Reactive Ex
...
Typing: C# Reactive Extension
Typing: C# Reactive Extensions
Searching: C# Reactive Extensions
 

Using Rx in your project

Install the NuGet package System.Reactive, then add this using directive to access the Rx extension methods:

using System.Reactive.Linq;
 

Wrapping an async method as an observable

Given an async method like this:

Task<string> GetNameAsync(CancellationToken cancellationToken)
 

Wrap it as an IObservable<string> like this:

Observable.FromAsync(cancellationToken => GetNameAsync(cancellationToken))
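The resulting observable is deferred: the async method is not called until something subscribes, and it is called again for each new subscription. The sketch below illustrates this with a hypothetical GetNameAsync that returns a fixed name and counts its invocations; the counter and the return value are assumptions added for the demonstration.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

var calls = 0;

// Hypothetical stand-in for a real async operation.
Task<string> GetNameAsync(CancellationToken cancellationToken)
{
    calls++;
    return Task.FromResult("Alice");
}

var names = Observable.FromAsync(cancellationToken => GetNameAsync(cancellationToken));
var callsBeforeSubscribe = calls; // 0: nothing has run yet

// Awaiting an observable subscribes to it and yields its last value.
var first = await names;
var second = await names;
var callsAfter = calls; // 2: one invocation per subscription

Console.WriteLine("{0}, {1}, calls: {2}", first, second, callsAfter);
```

The CancellationToken passed to the method is signalled when the subscription is disposed, so unsubscribing early can cancel the underlying work.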
 

