system.reactiveDémarrer avec system.reactive


Remarques

Cette section fournit une vue d'ensemble de ce qu'est system.reactive et pourquoi un développeur peut vouloir l'utiliser.

Il devrait également mentionner tous les grands sujets dans system.reactive, et établir un lien avec les sujets connexes. La documentation de system.reactive étant nouvelle, vous devrez peut-être créer des versions initiales de ces rubriques connexes.

Filtrage des valeurs d'une observable

emails.Where(email => email.From == "John")
 

Obtenir une agrégation en cours d'exécution

Supposons que vous ayez une observable chaude pour laquelle vous aimeriez garder le compte. Il peut s'agir du IObservable<StockTick> et que vous souhaitez conserver le nombre moyen de transactions. Vous pouvez utiliser Scan pour cela.

var tradeVolume = stockTicks.Select(e => e.Price)
    .Scan(0.0m, (aggregated, newtick) => aggregated + newtick)
    .Select((aggregated, index) => aggregated / (index + 1))
 

Maintenant, vous pouvez simplement vous abonner à votre volume de transactions qui est mis à jour en direct à la réception de chaque nouveau tick.

var subscription = tradeVolume.Subscribe(vol => Console.WriteLine("New trade volume is {0}", vol);
 

Ignorer les valeurs répétées

Il y a deux opérateurs pour filtrer les doublons:

emails.Distinct(); // Never see the same value twice
emails.DistinctUntilChanged(); // Never see the same value twice in a row
 

Vous pouvez aussi passer un prédicat:

emails.DistinctUntilChanged(x => x.Length); // Never see the same length email twice in a row
 

Installation ou configuration

Les extensions réactives sont publiées sur NuGet et MyGet .

Leur installation et leur utilisation sont donc les mêmes que pour tout autre package NuGet:

 Install-Package System.Reactive
 

NB les noms de paquets ont changé entre v2 et v3. Voir le README sur Github pour plus d'informations

Briser les changements

Les packages NuGet ont changé leur nom de package lors du passage de v2.xx à> v3.0.0

Rx-Main est maintenant System.Reactive Rx-Core est maintenant System.Reactive.Core Rx-Interfaces est maintenant System.Reactive.Interfaces Rx-Linq est maintenant System.Reactive.Linq Rx-PlatformServices est maintenant System.Reactive.PlatformServices Rx- Les tests sont maintenant Microsoft.Reactive.Testing

Sélection d'une nouvelle valeur pour chaque valeur dans une observable

emails.Select(email => email.Body)
 

Partage d'un seul abonnement (Publier + RefCount)

Ce code sera abonné aux emails observables deux fois:

emails.Where(email => email.From == "John").Subscribe(email => Console.WriteLine("A"));
emails.Where(email => email.From == "Mary").Subscribe(email => Console.WriteLine("B"));
 

Pour partager un seul abonnement à des emails , nous pouvons utiliser Publish et RefCount place:

var _emails = emails.Publish().RefCount();
_emails.Where(email => email.From == "John").Subscribe(email => Console.WriteLine("A"));
_emails.Where(email => email.From == "Mary").Subscribe(email => Console.WriteLine("B"));
 

Partage d'un seul abonnement (Publier)

Étant donné que IObservable<Offer> d' offers de commerçants pour acheter ou vendre un type d'article à un prix fixe, nous pouvons faire correspondre les paires d'acheteurs et de vendeurs comme suit:

var sellers = offers.Where(offer => offer.IsSell).Select(offer => offer.Merchant);
var buyers = offers.Where(offer => offer.IsBuy).Select(offer => offer.Merchant);
var trades = Observable.Zip(sellers, buyers, (seller, buyer) => new Trade(seller, buyer));
 

Le problème, c'est que chaque abonnement à des trades s'abonnera aux offers deux fois. Nous pouvons faire en sorte que les sellers et les buyers partagent un abonnement unique aux offers en utilisant Publish :

var trades = offers.Publish(_offers =>
{
    var sellers = _offers.Where(offer => offer.IsSell).Select(offer => offer.User);
    var buyers = _offers.Where(offer => offer.IsBuy).Select(offer => offer.User);
    return Observable.Zip(sellers, buyers, (seller, buyer) => new Trade(seller, buyer));
});
 

S'abonner à une observable (CancellationToken)

emails.Subscribe(email =>
    Console.WriteLine("Email from {0} to {1}", email.From, email.To),
    cancellationToken);
 

Abonnement / désabonnement à une observable (IDisposable)

L'abonnement renvoie un IDisposable :

IDisposable subscription = emails.Subscribe(email =>
    Console.WriteLine("Email from {0} to {1}", email.From, email.To));
 

Lorsque vous êtes prêt à vous désabonner, il vous suffit de supprimer l’abonnement:

subscription.Dispose();
 

Limiter un flux

Disons que vous devez implémenter un champ de recherche automatique, mais l'opération de recherche est un peu coûteuse, comme l'envoi d'une requête Web ou la création d'une base de données. Vous voudrez peut-être limiter la quantité de recherche en cours.

Par exemple, l'utilisateur tape "Extensions réactives C #" dans la zone de recherche:

IObservable<string> TypingSearchText()
{
    return Observable.Create<string>(o =>
    {
        const string SearchText = "C# Reactive Extensions";
        
        var builder = new StringBuilder();
        foreach (var c in SearchText)
        {
            builder.Append(c);
            
            // notify that the search text has been changed
            o.OnNext(builder.ToString());

            // pause between each character to simulate actual typing
            Thread.Sleep(125);
            
            // spent some time to think about the next word to type
            if (c == ' ')
                Thread.Sleep(1000);
        }
        
        o.OnCompleted();

        return () => { /* nothing to dispose here */ };
    });
}
 

Maintenant, nous ne voulons pas effectuer la recherche chaque fois que l'utilisateur appuie sur une touche. Au lieu de cela, cela sera fait chaque fois que l'utilisateur arrête de taper plus d'une demi-seconde:

TypingSearchText()
    // print the changes
    .Do(x => Console.WriteLine("Typing: " + x))
    
    // ignore changes that happens within 500ms of each other
    .Throttle(TimeSpan.FromMilliseconds(500))
    
    // some costly operation
    .Subscribe(x => Console.WriteLine("Searching: " + x));
 

Sortie:

Typing: C
Typing: C#
Typing: C# 
Searching: C# 
Typing: C# R
Typing: C# Re
...
Typing: C# Reactive
Typing: C# Reactive 
Searching: C# Reactive 
Typing: C# Reactive E
Typing: C# Reactive Ex
...
Typing: C# Reactive Extension
Typing: C# Reactive Extensions
Searching: C# Reactive Extensions
 

Utiliser Rx dans votre projet

Installez le package NuGet System.Reactive , puis ajoutez cette instruction using pour accéder aux méthodes d'extension Rx:

using System.Reactive.Linq;
 

Envelopper une méthode asynchrone comme une observable

Étant donné une méthode async comme celle-ci:

Task<string> GetNameAsync(CancellationToken cancellationToken)
 

IObservable<string> comme une IObservable<string> comme ceci:

Observable.FromAsync(cancellationToken => GetNameAsync(cancellationToken))