Looking for system.reactive Keywords? Try Ask4Keywords

system.reactiveErste Schritte mit system.reactive


Bemerkungen

In diesem Abschnitt erhalten Sie einen Überblick darüber, was system.reactive ist und warum ein Entwickler es möglicherweise verwenden möchte.

Es sollte auch alle großen Themen in system.reactive erwähnen und auf die verwandten Themen verweisen. Da die Dokumentation für system.reactive neu ist, müssen Sie möglicherweise erste Versionen dieser verwandten Themen erstellen.

Die Werte eines Beobachtbaren filtern

emails.Where(email => email.From == "John")
 

Holen Sie sich eine laufende Aggregation

Angenommen, Sie haben ein heißes Observable, für das Sie gerne zählen würden. Es könnte sich um den IObservable<StockTick> und Sie möchten das durchschnittliche Handelsvolumen zählen. Sie können dazu Scan .

var tradeVolume = stockTicks.Select(e => e.Price)
    .Scan(0.0m, (aggregated, newtick) => aggregated + newtick)
    .Select((aggregated, index) => aggregated / (index + 1))
 

Jetzt können Sie einfach Ihr Handelsvolumen abonnieren, das nach Erhalt jedes neuen Ticks live aktualisiert wird.

var subscription = tradeVolume.Subscribe(vol => Console.WriteLine("New trade volume is {0}", vol);
 

Wiederholte Werte werden ignoriert

Es gibt zwei Operatoren zum Filtern von Duplikaten:

emails.Distinct(); // Never see the same value twice
emails.DistinctUntilChanged(); // Never see the same value twice in a row
 

Sie können auch ein Prädikat übergeben:

emails.DistinctUntilChanged(x => x.Length); // Never see the same length email twice in a row
 

Installation oder Setup

Reactive Extensions werden auf NuGet und MyGet veröffentlicht .

Das Installieren und Verwenden derselben ist daher dasselbe wie jedes andere NuGet-Paket:

 Install-Package System.Reactive
 

NB-Paketnamen wurden zwischen Version 2 und Version 3 geändert. Weitere Informationen finden Sie in der Readme-Datei zu Github

Änderungen brechen

Die NuGet-Pakete haben ihre Paketnamen bei der Umstellung von v2.xx auf> v3.0.0 geändert

Rx-Main ist jetzt System.Reactive. Rx-Core ist jetzt System.Reactive.Core Rx-Interfaces ist jetzt System.Reactive.Interfaces. Testen ist jetzt Microsoft.Reactive.Testing

Auswählen eines neuen Wertes für jeden Wert in einem Beobachtbaren

emails.Select(email => email.Body)
 

Ein einzelnes Abonnement gemeinsam nutzen (Publizieren + RefCount)

Dieser Code wird die beobachtbaren emails zweimal abonnieren:

emails.Where(email => email.From == "John").Subscribe(email => Console.WriteLine("A"));
emails.Where(email => email.From == "Mary").Subscribe(email => Console.WriteLine("B"));
 

Um ein einzelnes Abonnement für emails RefCount , können wir stattdessen Publish und RefCount verwenden:

var _emails = emails.Publish().RefCount();
_emails.Where(email => email.From == "John").Subscribe(email => Console.WriteLine("A"));
_emails.Where(email => email.From == "Mary").Subscribe(email => Console.WriteLine("B"));
 

Ein einzelnes Abonnement teilen (Veröffentlichen)

Angesichts eines IObservable<Offer> von offers von Händlern, bestimmte Artikel zu einem Festpreis zu kaufen oder zu verkaufen, können wir Paare von Käufern und Verkäufern wie folgt zusammenstellen:

var sellers = offers.Where(offer => offer.IsSell).Select(offer => offer.Merchant);
var buyers = offers.Where(offer => offer.IsBuy).Select(offer => offer.Merchant);
var trades = Observable.Zip(sellers, buyers, (seller, buyer) => new Trade(seller, buyer));
 

Das Problem dabei ist , dass jedes Abonnement trades abonnieren , werden offers zweimal. Wir können sellers und buyers ein einzelnes Abonnement für offers indem Sie Publish :

var trades = offers.Publish(_offers =>
{
    var sellers = _offers.Where(offer => offer.IsSell).Select(offer => offer.User);
    var buyers = _offers.Where(offer => offer.IsBuy).Select(offer => offer.User);
    return Observable.Zip(sellers, buyers, (seller, buyer) => new Trade(seller, buyer));
});
 

Abonnieren eines Beobachtbaren (Annullierungstoken)

emails.Subscribe(email =>
    Console.WriteLine("Email from {0} to {1}", email.From, email.To),
    cancellationToken);
 

Abonnieren / Abbestellen eines Beobachtbaren (IDisposable)

Das Abonnement gibt ein IDisposable :

IDisposable subscription = emails.Subscribe(email =>
    Console.WriteLine("Email from {0} to {1}", email.From, email.To));
 

Wenn Sie zum Abbestellen bereit sind, entsorgen Sie einfach das Abonnement:

subscription.Dispose();
 

Drosseln eines Streams

Angenommen, Sie müssen ein automatisches Suchfeld implementieren, der Suchvorgang ist jedoch etwas kostspielig, beispielsweise das Senden einer Webanforderung oder das Auffinden einer Datenbank. Möglicherweise möchten Sie die Anzahl der durchgeführten Suchen begrenzen.

Der Benutzer gibt beispielsweise "C # Reactive Extensions" in das Suchfeld ein:

IObservable<string> TypingSearchText()
{
    return Observable.Create<string>(o =>
    {
        const string SearchText = "C# Reactive Extensions";
        
        var builder = new StringBuilder();
        foreach (var c in SearchText)
        {
            builder.Append(c);
            
            // notify that the search text has been changed
            o.OnNext(builder.ToString());

            // pause between each character to simulate actual typing
            Thread.Sleep(125);
            
            // spent some time to think about the next word to type
            if (c == ' ')
                Thread.Sleep(1000);
        }
        
        o.OnCompleted();

        return () => { /* nothing to dispose here */ };
    });
}
 

Nun möchten wir die Suche nicht jedes Mal durchführen, wenn der Benutzer eine Taste drückt. Stattdessen wird dies ausgeführt, wenn der Benutzer länger als eine halbe Sekunde nicht mehr tippt:

TypingSearchText()
    // print the changes
    .Do(x => Console.WriteLine("Typing: " + x))
    
    // ignore changes that happens within 500ms of each other
    .Throttle(TimeSpan.FromMilliseconds(500))
    
    // some costly operation
    .Subscribe(x => Console.WriteLine("Searching: " + x));
 

Ausgabe :

Typing: C
Typing: C#
Typing: C# 
Searching: C# 
Typing: C# R
Typing: C# Re
...
Typing: C# Reactive
Typing: C# Reactive 
Searching: C# Reactive 
Typing: C# Reactive E
Typing: C# Reactive Ex
...
Typing: C# Reactive Extension
Typing: C# Reactive Extensions
Searching: C# Reactive Extensions
 

Verwendung von Rx in Ihrem Projekt

Installieren Sie das NuGet-Paket System.Reactive und fügen Sie diese using-Anweisung hinzu, um auf die Rx-Erweiterungsmethoden zuzugreifen:

using System.Reactive.Linq;
 

Eine asynchrone Methode als beobachtbar einwickeln

Bei einer async Methode wie async :

Task<string> GetNameAsync(CancellationToken cancellationToken)
 

Wickeln Sie es als IObservable<string> wie IObservable<string> ein:

Observable.FromAsync(cancellationToken => GetNameAsync(cancellationToken))