1

Topic: [Rx] to Delay event under a condition

Is about such code: someObservable. Select (x => getY (x)); Y getY (X x) {if x. Value == X.ABC) return new Y (1); else return new Y (2);} in certain cases check x. Value == X.ABC ' needs to be made repeatedly after a while as Value can exchange. I.e. something like: Y getY (X x) {if x. Value == X.ABC) return new Y (1); else if x. SomethingElse == true) {Thread. Sleep (timeout); if x. Value == X.ABC) return new Y (1); else return new Y (2);}} How correctly to get rid from Thread. Sleep?

2

Re: [Rx] to Delay event under a condition

Hello, _NN _, you wrote: _NN> In certain cases check x. Value == X.ABC ' needs to be made repeatedly after a while as Value can exchange. I that still the welder, on storage - to expose getY as the second Observable + join. If does not approach, it is better to ask on stackoverflow, here  only . ganjustas on Rx regularly answers. If whom forgot -

3

Re: [Rx] to Delay event under a condition

Hello, Sinix, you wrote: S> Hello, _NN _, you wrote: _NN>> In certain cases check x. Value == X.ABC ' needs to be made repeatedly after a while as Value can exchange. S> I that still the welder, on storage - to expose getY as the second Observable + join. If does not approach, it is better to ask on stackoverflow, here  only . ganjustas on Rx regularly answers. If whom forgot -  And the order will be the same or it is necessary to throw off in a window and to sort just in case?

4

Re: [Rx] to Delay event under a condition

Hello, _NN _, you wrote: _NN> And the order will be the same or it is necessary to throw off in a window and to sort just in case? Like the same, but I can say lies.

5

Re: [Rx] to Delay event under a condition

Hello, Sinix, you wrote: S> Hello, _NN _, you wrote: _NN>> And the order will be the same or it is necessary to throw off in a window and to sort just in case? S> like the same, but I can say lies. I will try on SO, we look that prompt.

6

Re: [Rx] to Delay event under a condition

Hello, _NN _, you wrote: _NN> Is about such code: _NN> How correctly to get rid from Thread. Sleep? Rewrite all without Observable And so, it is necessary to replace Thread. Sleep on Task. Delay and instead of Select to send a data stream in TransformBlock with the minimum buffer.

7

Re: [Rx] to Delay event under a condition

Hello, _NN _, you wrote: _NN> And the order will be the same or it is necessary to throw off in a window and to sort just in case? Yours Thread. Sleep  a source. If queue does not frighten, it is possible simply Observable. Delay to use

8

Re: [Rx] to Delay event under a condition

Hello, TK, you wrote: TK> Hello, _NN _, you wrote: _NN>> Is about such code: _NN>> How correctly to get rid from Thread. Sleep? TK> Rewrite all without Observable Yes I think it is necessary most likely that worked, and then to think as beautifully to write. TK> and so, it is necessary to replace Thread. Sleep on Task. Delay and instead of Select to send a data stream in TransformBlock with the minimum buffer. I in TPL Dataflow am completely not strong. It is possible to describe schematically at least idea, and there look and in Rx it will be possible to solve?

9

Re: [Rx] to Delay event under a condition

Hello, _NN _, you wrote: TK>> And so, it is necessary to replace Thread. Sleep on Task. Delay and instead of Select to send a data stream in TransformBlock with the minimum buffer. _NN> I in TPL Dataflow am completely not strong. _NN> it is possible to describe schematically at least idea, and there look and in Rx it will be possible to solve? And what there to describe? TransformBlock accepts on an input a method of type Func <A, Task <B>> (yours getY) further about the following: return Observable. Using (() => someObservable. Subscribe (block. AsObserver ()), r => block. AsObservable ().Finally (() => r. Dispose ()).Finally (() => block. Complete ());

10

Re: [Rx] to Delay event under a condition

Hello, TK, you wrote: TK> Hello, _NN _, you wrote: TK>>> And so, it is necessary to replace Thread. Sleep on Task. Delay and instead of Select to send a data stream in TransformBlock with the minimum buffer. _NN>> I in TPL Dataflow am completely not strong. _NN>> it is possible to describe schematically at least idea, and there look and in Rx it will be possible to solve? TK> And what there to describe? TransformBlock accepts on an input a method of type Func <A, Task <B>> (yours getY) TK> further about the following: TK> return Observable. Using (() => someObservable. Subscribe (block. AsObserver ()), r => block. AsObservable ().Finally (() => r. Dispose ()).Finally (() => block. Complete ()); To me for.NET 3.5 I would Think I will make at first in the old manner with queue and a flow and when there will be at me a working code, we think over as to refine.

11

Re: [Rx] to Delay event under a condition

Hello, _NN _, you wrote: _NN> To me for.NET 3.5 _NN> I would Think I will make at first in the old manner with queue and a flow and when there will be at me a working code, we think over as to refine. What for there a flow? Should suffice and Observable. Defer with a semaphore.

12

Re: [Rx] to Delay event under a condition

Hello, TK, you wrote: TK> Hello, _NN _, you wrote: _NN>> To me for.NET 3.5 _NN>> I would Think I will make at first in the old manner with queue and a flow and when there will be at me a working code, we think over as to refine. TK> what for there a flow? Should suffice and Observable. Defer with a semaphore. Yes somehow never there were possibilities to use Defer so I somehow and did not think of it. It is possible to try.

13

Re: [Rx] to Delay event under a condition

Hello, TK, you wrote: TK> Hello, _NN _, you wrote: _NN>> To me for.NET 3.5 _NN>> I would Think I will make at first in the old manner with queue and a flow and when there will be at me a working code, we think over as to refine. TK> what for there a flow? Should suffice and Observable. Defer with a semaphore. By the way and to brake queue of output agents it not will if make a time delay inside through Delay?

14

Re: [Rx] to Delay event under a condition

Hello, _NN _, you wrote: TK>> What for there a flow? Should suffice and Observable. Defer with a semaphore. _NN> by the way and to brake queue of output agents it not will if make a time delay inside through Delay? Looking as the code to write. Is to change on: IObservable <Y> getY (X x) {if x. Value == X.ABC) return Observable. Return (new Y (1)); else if x. SomethingElse == true) {return Observable. Delay (Observable. Return (x), timeout).Select (xx = xx. Value == X.ABC? new Y (1): new Y (2));}} Or, it is possible Delay and not to use: IObservable <Y> getY (X x) {return Observable. Create <Y> (async (obs, token) => {if x. Value == X.ABC) {obs. OnNext (new Y (1));} else if x. SomethingElse == true) {await Task. Delay (timeout, token); if x. Value == X.ABC) {obs. OnNext (new Y (1));} else {obs. OnNext (new Y (2));} }} It is all a source of events all will not brake also will be strong  how to process result. It is possible to write someObservable. Select (x => getY (x)).Concat (), it is possible someObservable. Select (x => getY (x)).Merge () - with different consequences on result. To you which are necessary? That queue to brake it is necessary to thrust waiting of a semaphore someone more close to an input in getY () or not to stick if, its formation is not terrible...

15

Re: [Rx] to Delay event under a condition

16

Re: [Rx] to Delay event under a condition

Hello, _NN _, you wrote: TK>> It is all a source of events all will not brake also will be strong  how to process result. It is possible to write someObservable. Select (x => getY (x)).Concat (), it is possible someObservable. Select (x => getY (x)).Merge () - with different consequences on result. To you which are necessary? _NN> to me the main thing that there was an order. _NN> I.e. if event comes while I wait let waits aside and will be processed when its queue comes. _NN> thus it would be desirable to allow for other subscribers to work further while I wait. Not clearly... 1. The order gives Concat () but while there is a waiting queue from not processed events (well and with TimeOut there will be troubles) will be saved. Merge () such queue will not save but, the sequence of events will be forced down". 2. The subscriber it who? It yours or is the subscriber on someObservable? If someObservable that, they are not locked. If yours that see point 1 someObservable.S Select (x => getY (x)).Merge () |Concat () |MergeAndSort ().Where (y => y. IsValid).Scan (new {Current=abc, Prev=null}, (prev, cur) => new {Current=cur, Prev=prev. Current}).Select (makeResult)

17

Re: [Rx] to Delay event under a condition

Hello, TK, you wrote: TK> 1. The order gives Concat () but while there is a waiting queue from not processed events (well and with TimeOut there will be troubles) will be saved. Merge () such queue will not save but, the sequence of events will be forced down". Here that that is necessary. It would be desirable to make still small optimization not to wait a wide interval and to check short intervals and to quit on time outflow if it turned out nothing. We tell such example: using System; using System. Reactive. Linq; namespace RxTest {class Program {static void Main (string [] args) {var r = Observable. Range (0, 10); r. Select (x => x).Select (x => getY (x)).Concat ().Subscribe (x => Console. WriteLine (" a: "+ x)); Console. ReadKey ();} private static IObservable <int> getY (int i, int tries = 0) {Console. WriteLine ("In getY: i = {0}, tries = {1}", i, tries); if (i> 5) {int y; if (tryGetReal (i, out y)) {//No delay return Observable. Return (y);} else {if (tries <3) {//Let's Try again return Observable. Defer (() => getY (i, tries + 1)).Delay (TimeSpan. FromMilliseconds (100));} else {//No more tries, stop! return Observable. Empty <int> ();}}} else {return Observable. Empty <int> ();}} private static bool tryGetReal (int i, out int y) {if (i> 7) {y = i; return false;} else {y = i; return true;}}}} How much it is correct?

18

Re: [Rx] to Delay event under a condition

Hello, _NN _, you wrote: _NN> It would be desirable to make still small optimization not to wait a wide interval and to check short intervals and to quit on time outflow if it turned out nothing. _NN> we tell such example: Defer, recursive calls and out parameters? Most likely somewhere there is an ambush. If that is necessary simply n short repetitions: var query = from counter in Observable. Timer (TimeSpan. FromMsec (0), TimeSpan. FromMsec (100)).Take (repeatCount) let result = tryGetReal (x) where result. Success select result. Value; return query. Take (1); _NN> How much it is correct? Too it at you ... Probably that for your task it will be most easier to remove generally IObservable: async Task RunProcessing <T> (IObservable <T> source) {foreach (var sample in source. AsEnumerable ()) {var output = await ProcessItem (sample); Console. WriteLine ("a:" + output);} } Workers' and Peasants' foreach not only easier to understand/debug, but also will work most likely unlike faster.

19

Re: [Rx] to Delay event under a condition

Hello, TK, you wrote: TK> Hello, _NN _, you wrote: _NN>> It would be desirable to make still small optimization not to wait a wide interval and to check short intervals and to quit on time outflow if it turned out nothing. _NN>> we tell such example: TK> Defer, recursive calls and out parameters? Most likely somewhere there is an ambush. If that is necessary simply n short repetitions: Probably also is but works TK> TK> var query = from counter in Observable. Timer (TimeSpan. FromMsec (0), TimeSpan. FromMsec (100)).Take (repeatCount) TK> let result = tryGetReal (x) TK> where result. Success TK> select result. Value; TK> return query. Take (1); TK> with the timer I did not foresee Such variant. _NN>> how much it is correct? TK> too it at you ... Probably that for your task it will be most easier to remove generally IObservable: TK> TK> async Task RunProcessing <T> (IObservable <T> source) TK> {TK> foreach (var sample in source. AsEnumerable ()) TK> {TK> var output = await ProcessItem (sample); TK> Console. WriteLine ("a:" + output); TK>} TK>} TK> TK> Workers' and Peasants' foreach not only easier to understand/debug, but also will work most likely unlike faster. . At us.NET 3.5.

20

Re: [Rx] to Delay event under a condition

Hello, _NN _, you wrote: TK>> Workers' and Peasants' foreach not only easier to understand/debug, but also will work most likely unlike faster. _NN> Eh. At us.NET 3.5. https://www.nuget.org/packages/TaskParallelLibrary/ it does not work? Anyway, if number of your output agents  - make selected . At AsEnumerable the queue inside and the initial flow is not locked and at usage Thread. Sleep

21

Re: [Rx] to Delay event under a condition

Hello, TK, you wrote: TK> Hello, _NN _, you wrote: TK>>> Workers' and Peasants' foreach not only easier to understand/debug, but also will work most likely unlike faster. _NN>> Eh. At us.NET 3.5. TK> https://www.nuget.org/packages/TaskParallelLibrary/ it does not work? For support await it is necessary to do props still. TK> anyway if number of your output agents  - make selected . At AsEnumerable the queue inside and the initial flow is not locked and at usage Thread. Sleep I made through Observable. Timer as well as it is offered.