1

Topic: Re: RxJava: one generator stream, n processing streams

Hello, Tyomchik, you wrote:

> I do it like this: one stream generates batches of messages (List<Msg>), which then go into n streams of Msg, and then I subscribe. After waiting, I call subscription.unsubscribe. As a result, the very first stream keeps generating messages; they simply do not go any further. How do I unsubscribe correctly so that everything stops?

refCount

2

Re: Re: RxJava: one generator stream, n processing streams

Hello, Tyomchik, you wrote:

> I do it like this: one stream generates batches of messages (List<Msg>), which then go into n streams of Msg, and then I subscribe. After waiting, I call subscription.unsubscribe. As a result, the very first stream keeps generating messages; they simply do not go any further. How do I unsubscribe correctly so that everything stops?

On topic: here is an example, though there are many options. You can use refCount (but it is awful, because it is hard to control), or you can call subscribe only on the first one and not call unsubscribe on it. Here it is. This is Reactor, but it is all the same thing; just read Flux as Observable:

    // Uninterruptibles is from Guava; @Test and assertEquals are from JUnit.
    @Test
    public void connectableFluxDispose() throws Exception {
        CountDownLatch stoppedLatch = new CountDownLatch(1);
        Flux<String> stringFlux = Flux.create(emitter -> {
            // Generator thread: emits timestamps until it is interrupted.
            Thread thread = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    emitter.next(System.currentTimeMillis() + "");
                    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.NANOSECONDS);
                }
                stoppedLatch.countDown();
            });
            thread.start();
            // Interrupt the generator when the sink is disposed.
            emitter.onDispose(thread::interrupt);
        });
        ConnectableFlux<String> connectableFlux = stringFlux.publish();

        AtomicLong receivedEvents = new AtomicLong(0);
        CountDownLatch firstEventLatch = new CountDownLatch(10);
        Disposable subscribeDisposable = connectableFlux.subscribe(s -> {
            receivedEvents.incrementAndGet();
            firstEventLatch.countDown();
        });
        Disposable connectDisposable = connectableFlux.connect();
        firstEventLatch.await();

        // This only stops the first subscribe from receiving events.
        subscribeDisposable.dispose();
        long counterAfterFirstDispose = receivedEvents.get();

        CountDownLatch secondEventLatch = new CountDownLatch(10);
        subscribeDisposable = connectableFlux.subscribe(s -> secondEventLatch.countDown());
        secondEventLatch.await();
        subscribeDisposable.dispose();

        // Disposing the connection is what actually stops the generator.
        connectDisposable.dispose();
        stoppedLatch.await();

        // Assert that the first subscribe has received no events since subscribeDisposable.dispose().
        assertEquals(counterAfterFirstDispose, receivedEvents.get());
    }

Off topic: forget about RxJava. It is terrible, written by who knows whom and for who knows whom. In Java it is better to use https://projectreactor.io/; the quality there (performance and more correct backpressure) is better.
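For comparison with the connect/dispose approach above, here is a rough sketch of what refCount-style sharing amounts to: the generator starts with the first handler and is stopped when the last handler cancels. This is a hand-rolled illustration using only the JDK, not RxJava or Reactor code; the class name `RefCountedSource` and its methods are made up for this example, and the source is deliberately single-use (it does not restart after stopping, which a real refCount would do).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration of refCount-style sharing; NOT a library API.
final class RefCountedSource {
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();
    private final CountDownLatch stopped = new CountDownLatch(1);
    private Thread generator; // started lazily by the first subscriber

    // Registers a handler and returns a Runnable that cancels it.
    synchronized Runnable subscribe(Consumer<String> handler) {
        subscribers.add(handler);
        if (generator == null) {
            generator = new Thread(this::generate, "generator");
            generator.start();
        }
        return () -> unsubscribe(handler);
    }

    private synchronized void unsubscribe(Consumer<String> handler) {
        // remove() returns false on a repeated cancel, so cancelling twice is harmless.
        if (subscribers.remove(handler) && subscribers.isEmpty()) {
            generator.interrupt(); // last handler gone: stop the generator itself
        }
    }

    private void generate() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String msg = String.valueOf(System.currentTimeMillis());
                for (Consumer<String> s : subscribers) {
                    s.accept(msg);
                }
                Thread.sleep(1);
            }
        } catch (InterruptedException e) {
            // Interrupted while sleeping: fall through and stop.
        } finally {
            stopped.countDown();
        }
    }

    // Waits up to timeoutMillis for the generator to finish; true if it has stopped.
    boolean awaitStopped(long timeoutMillis) {
        try {
            return stopped.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        RefCountedSource source = new RefCountedSource();
        Runnable first = source.subscribe(m -> { });
        Runnable second = source.subscribe(m -> { });
        first.run();  // generator keeps running for the second handler
        second.run(); // no handlers left: generator is interrupted
        System.out.println("generator stopped: " + source.awaitStopped(5_000));
    }
}
```

The trade-off is the one mentioned above: with reference counting, the generator's lifetime is decided implicitly by whoever happens to cancel last, whereas holding the Disposable from connect() makes the stop explicit. Note also that in this sketch a handler may still receive one in-flight message right after it cancels.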

3

Re: Re: RxJava: one generator stream, n processing streams