system.reactiveAan de slag met system.reactive


Opmerkingen

Deze sectie geeft een overzicht van wat system.reactive is en waarom een ontwikkelaar het misschien wil gebruiken.

Het moet ook alle grote onderwerpen binnen system.reactive vermelden en een link naar de gerelateerde onderwerpen bevatten. Aangezien de documentatie voor system.reactive nieuw is, moet u mogelijk eerste versies van die gerelateerde onderwerpen maken.

De waarden van een waarneembare filteren

emails.Where(email => email.From == "John")
 

Ontvang een lopende aggregatie

Stel dat je een hete waarnemer hebt waarvan je de telling graag zou willen houden. Het kan de IObservable<StockTick> en u wilt het gemiddelde handelsvolume bijhouden. Daar kunt u Scan voor gebruiken.

var tradeVolume = stockTicks.Select(e => e.Price)
    .Scan(0.0m, (aggregated, newtick) => aggregated + newtick)
    .Select((aggregated, index) => aggregated / (index + 1))
 

Nu kunt u zich eenvoudig abonneren op uw handelsvolume dat live wordt bijgewerkt bij ontvangst van elke nieuwe Tick.

var subscription = tradeVolume.Subscribe(vol => Console.WriteLine("New trade volume is {0}", vol);
 

Negerende waarden negeren

Er zijn twee operators voor het filteren van duplicaten:

emails.Distinct(); // Never see the same value twice
emails.DistinctUntilChanged(); // Never see the same value twice in a row
 

Je kunt ook een predikaat doorgeven:

emails.DistinctUntilChanged(x => x.Length); // Never see the same length email twice in a row
 

Installatie of instellingen

Reactive Extensions worden gepubliceerd op zowel NuGet als MyGet .

Het installeren en gebruiken ervan is daarom hetzelfde als elk ander NuGet-pakket:

 Install-Package System.Reactive
 

NB pakketnamen zijn gewijzigd tussen v2 en v3. Zie de README op Github voor meer info

Veranderingen doorbreken

De NuGet-pakketten hebben hun pakketnaam gewijzigd in de overgang van v2.xx naar> v3.0.0

Rx-Main is nu System.Reactive Rx-Core is nu System.Reactive.Core Rx-Interfaces is nu System.Reactive.Interfaces Rx-Linq is nu System.Reactive.Linq Rx-PlatformServices is nu System.Reactive.PlatformServices Rx- Testen is nu Microsoft.Reactive.Testing

Een nieuwe waarde selecteren voor elke waarde in een waarneembaar

emails.Select(email => email.Body)
 

Eén abonnement delen (Publiceren + RefCount)

Deze code zal zich abonneren op de emails twee keer kunnen worden bekeken:

emails.Where(email => email.From == "John").Subscribe(email => Console.WriteLine("A"));
emails.Where(email => email.From == "Mary").Subscribe(email => Console.WriteLine("B"));
 

Om een enkel abonnement op emails te delen, kunnen we in plaats daarvan Publish en RefCount gebruiken:

var _emails = emails.Publish().RefCount();
_emails.Where(email => email.From == "John").Subscribe(email => Console.WriteLine("A"));
_emails.Where(email => email.From == "Mary").Subscribe(email => Console.WriteLine("B"));
 

Eén abonnement delen (publiceren)

Gegeven een IObservable<Offer> van offers van verkopers om een bepaald type item tegen een vaste prijs te kopen of verkopen, kunnen we paren kopers en verkopers als volgt matchen:

var sellers = offers.Where(offer => offer.IsSell).Select(offer => offer.Merchant);
var buyers = offers.Where(offer => offer.IsBuy).Select(offer => offer.Merchant);
var trades = Observable.Zip(sellers, buyers, (seller, buyer) => new Trade(seller, buyer));
 

Het probleem hiermee is dat elk abonnement op trades tweemaal op offers wordt geabonneerd. We kunnen sellers en buyers een enkel abonnement op offers door Publish :

var trades = offers.Publish(_offers =>
{
    var sellers = _offers.Where(offer => offer.IsSell).Select(offer => offer.User);
    var buyers = _offers.Where(offer => offer.IsBuy).Select(offer => offer.User);
    return Observable.Zip(sellers, buyers, (seller, buyer) => new Trade(seller, buyer));
});
 

Abonneren op een waarneembaar (annuleringstoken)

emails.Subscribe(email =>
    Console.WriteLine("Email from {0} to {1}", email.From, email.To),
    cancellationToken);
 

In- / uitschrijven op een waarneembare (IDisposable)

Abonnement retourneert een IDisposable :

IDisposable subscription = emails.Subscribe(email =>
    Console.WriteLine("Email from {0} to {1}", email.From, email.To));
 

Wanneer u klaar bent om u uit te schrijven, verwijdert u eenvoudig het abonnement:

subscription.Dispose();
 

Een stroom smoren

Stel dat u een automatisch zoekvak moet implementeren, maar de zoekbewerking is enigszins duur, zoals het verzenden van een webverzoek of het openen van een database. Misschien wilt u de hoeveelheid zoekopdrachten beperken.

De gebruiker typt bijvoorbeeld "C # Reactive Extensions" in het zoekvak:

IObservable<string> TypingSearchText()
{
    return Observable.Create<string>(o =>
    {
        const string SearchText = "C# Reactive Extensions";
        
        var builder = new StringBuilder();
        foreach (var c in SearchText)
        {
            builder.Append(c);
            
            // notify that the search text has been changed
            o.OnNext(builder.ToString());

            // pause between each character to simulate actual typing
            Thread.Sleep(125);
            
            // spent some time to think about the next word to type
            if (c == ' ')
                Thread.Sleep(1000);
        }
        
        o.OnCompleted();

        return () => { /* nothing to dispose here */ };
    });
}
 

Nu willen we de zoekopdracht niet elke keer uitvoeren als de gebruiker op een toets drukt. In plaats daarvan wordt dit gedaan wanneer de gebruiker langer dan een halve seconde stopt met typen:

TypingSearchText()
    // print the changes
    .Do(x => Console.WriteLine("Typing: " + x))
    
    // ignore changes that happens within 500ms of each other
    .Throttle(TimeSpan.FromMilliseconds(500))
    
    // some costly operation
    .Subscribe(x => Console.WriteLine("Searching: " + x));
 

Uitgang:

Typing: C
Typing: C#
Typing: C# 
Searching: C# 
Typing: C# R
Typing: C# Re
...
Typing: C# Reactive
Typing: C# Reactive 
Searching: C# Reactive 
Typing: C# Reactive E
Typing: C# Reactive Ex
...
Typing: C# Reactive Extension
Typing: C# Reactive Extensions
Searching: C# Reactive Extensions
 

Rx gebruiken in uw project

Installeer de Nuget pakket System.Reactive , voeg dan deze met behulp van verklaring voor toegang tot het Rx extension methods:

using System.Reactive.Linq;
 

Een async-methode inpakken als waarneembaar

Gegeven een async methode zoals deze:

Task<string> GetNameAsync(CancellationToken cancellationToken)
 

Verpak het als een IObservable<string> als volgt:

Observable.FromAsync(cancellationToken => GetNameAsync(cancellationToken))