rx-java exemple de consommateur producteur avec support de contrepression dans le producteur


Exemple

Le TestProducer de cet exemple produit des objets Integer dans une plage donnée et les transmet à son Subscriber . Il étend la classe Flowable<Integer> . Pour un nouvel abonné, il crée un objet Subscription dont la méthode request(long) est utilisée pour créer et publier les valeurs Integer.

Il est important pour l' Subscription qui est transmis à l' subscriber que la méthode request() qui appelle onNext() sur l'abonné puisse être appelée de manière récursive dans cet appel onNext() . Pour empêcher un débordement de pile, l'implémentation affichée utilise le compteur outStandingRequests et l'indicateur isProducing .

class TestProducer extends Flowable<Integer> {
    static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    final int from, to;

    public TestProducer(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {

            /** the next value. */
            public int next = from;
            /** cancellation flag. */
            private volatile boolean cancelled = false;
            private volatile boolean isProducing = false;
            private AtomicLong outStandingRequests = new AtomicLong(0);

            @Override
            public void request(long n) {
                if (!cancelled) {

                    outStandingRequests.addAndGet(n);

                    // check if already fulfilling request to prevent call  between request() an subscriber .onNext()
                    if (isProducing) {
                        return;
                    }

                    // start producing
                    isProducing = true;

                    while (outStandingRequests.get() > 0) {
                        if (next > to) {
                            logger.info("producer finished");
                            subscriber.onComplete();
                            break;
                        }
                        subscriber.onNext(next++);
                        outStandingRequests.decrementAndGet();
                    }
                    isProducing = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

Le consommateur dans cet exemple étend DefaultSubscriber<Integer> et au démarrage et après avoir consommé un Integer demande le suivant. En consommant les valeurs Integer, il y a un petit délai, donc la contre-pression sera créée pour le producteur.

class TestConsumer extends DefaultSubscriber<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer i) {
        logger.info("consuming {}", i);
        if (0 == (i % 5)) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // can be ignored, just used for pausing
            }
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.error("error received", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("consumer finished");
    }
}

Dans la méthode principale suivante d'une classe de test, le producteur et le consommateur sont créés et connectés:

public static void main(String[] args) {
    try {
        final TestProducer testProducer = new TestProducer(1, 1_000);
        final TestConsumer testConsumer = new TestConsumer();

        testProducer
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .blockingSubscribe(testConsumer);

    } catch (Throwable t) {
        t.printStackTrace();
    }
}

Lors de l'exécution de l'exemple, le fichier journal indique que le consommateur s'exécute en continu, tandis que le producteur ne devient actif que lorsque le tampon Flowable interne de rxjava2 doit être rempli.