Add Debouncing #61

Open
joearasin opened this issue Sep 7, 2016 · 20 comments

Comments

@joearasin

A useful bit of functionality I threw together: given a time window and a uniqueness (key) function, emit only the latest value for each key.

It might make sense to generalize this as a "time-aware reduce", rather than a pure "debounce"

See #60

@viktorklang
Member

Isn't debouncing ensuring that a single signal is propagated per timeunit? (to remove bouncing, the tendency of mechanical switches to emit multiple signals when opened or closed?)

@ktoso
Member

ktoso commented Sep 8, 2016

debounce I always understood as:

a a a x x y y y z z z a a -> [ debounce ] -> a x y z a

Not sure if time by definition is part of it. The above could well be debounce(within = Duration.Inf)

@viktorklang
Member

@ktoso http://whatis.techtarget.com/definition/debouncing

What you talk about is: distinctConsecutive :-)

@viktorklang
Member

(distinctConsecutive is not time related, only order related) debounce is time related.

@ktoso
Member

ktoso commented Sep 8, 2016

Right I see. Better name for what I showed anyway. (Would like to have it too ;-))

@patriknw
Member

patriknw commented Sep 8, 2016

Scrubbing was the word we used for this thing when filtering market data streams.

@viktorklang
Member

Scrubbing to me means removing sensitive data :)

@viktorklang
Member

viktorklang commented Sep 8, 2016

/**
 * Emits elements which are passed in as long as they are not considered to be the same as the previous element emitted, according to the predicate function `p`.
 * `p` returns *true* if the parameters are considered to be the same and *false* otherwise.
 */
def distinctConsecutive(p: (T, T) => Boolean = _ equals _) = ???
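
A minimal sketch of the semantics described in that doc comment, written against a plain `Seq` rather than as a stream stage. The object name, the collection-based signature, and the curried parameter list are assumptions made for illustration; none of this is the proposed akka-stream-contrib API.

```scala
object DistinctConsecutiveSketch {
  // Keeps an element only if `same` says it differs from the previously
  // emitted element. `same` plays the role of `p` above and defaults to ==.
  def distinctConsecutive[T](in: Seq[T])(
      same: (T, T) => Boolean = (a: T, b: T) => a == b
  ): Vector[T] =
    in.foldLeft(Vector.empty[T]) { (out, next) =>
      if (out.nonEmpty && same(out.last, next)) out // repeat of last emitted: drop
      else out :+ next                               // first element or a change: emit
    }
}
```

With the default predicate, `DistinctConsecutiveSketch.distinctConsecutive("aaaxxyyyzzzaa".toSeq)()` yields `a x y z a`, matching the diagram earlier in the thread. No notion of time is involved, only order.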

@joearasin
Author

I have no clue what to call this. My use case: I needed something to stick on the end of DirectoryChanges. Imagine large files being written a chunk at a time. Each write emits an event, but what I want to know is when a file is done being written. Events for the same file are not necessarily adjacent if several processes are writing different files (e.g. I may get a stream of "File 1 changed" "File 2 changed" "File 1 changed" "File 1 changed" "File 2 changed"). What I want to see on the other end is "File 1 changed, File 2 changed", each emitted once that file hasn't changed for a specified amount of time.

@joearasin
Author

Anyway, it's a keyed reduce that requires things to be sufficiently stale before sending them downstream, with a reduction function hardcoded to be "take the last one". It would absolutely make sense to generalize that last bit to a parameter.
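
To make the "keyed reduce with staleness" idea concrete, here is a rough offline simulation over timestamped events. It is not the stage from the pull request: `Event`, `Emission`, `debounceByKey` and `quietMs` are hypothetical names, time is modelled as plain millisecond values, and anything still pending when the input ends is flushed as if the stream had completed.

```scala
object KeyedDebounceSketch {
  final case class Event[K, V](timeMs: Long, key: K, value: V)
  final case class Emission[K, V](atMs: Long, key: K, value: V)

  // For each key, emit the last value seen once the key has been quiet
  // for `quietMs`. The reduction is hardcoded to "take the last one".
  def debounceByKey[K, V](events: Seq[Event[K, V]], quietMs: Long): Vector[Emission[K, V]] = {
    def due(e: Event[K, V]): Long = e.timeMs + quietMs

    // Split pending entries into those that are stale at `now` (emitted,
    // ordered by due time) and those that are still fresh (kept).
    def flush(pending: Map[K, Event[K, V]], now: Long) = {
      val (stale, fresh) = pending.partition { case (_, e) => due(e) <= now }
      val out = stale.values.toVector.sortBy(e => due(e)).map(e => Emission(due(e), e.key, e.value))
      (fresh, out)
    }

    val start = (Map.empty[K, Event[K, V]], Vector.empty[Emission[K, V]])
    val (pending, emitted) = events.sortBy(_.timeMs).foldLeft(start) {
      case ((pend, out), ev) =>
        val (fresh, flushed) = flush(pend, ev.timeMs)
        (fresh.updated(ev.key, ev), out ++ flushed) // newest event replaces the old one
    }
    emitted ++ flush(pending, Long.MaxValue)._2 // end of input: flush what is left
  }
}
```

For the "File 1" / "File 2" interleaving described above, each file produces a single emission carrying its last event, stamped at the time of that event plus `quietMs`. Generalizing the reduction would mean replacing `fresh.updated(ev.key, ev)` with a merge of the old and new values.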

@viktorklang
Member

@joearasin Hmmm, isn't your desired behaviour possible to implement with the existing combinators?

@aruediger

> What you talk about is: distinctConsecutive :-)

a.k.a destutter 🙊

@viktorklang
Member

Let's not rehash that conversation, please. :)

Cheers,

@aruediger

> Isn't debouncing ensuring that a single signal is propagated per timeunit? (to remove bouncing, the tendency of mechanical switches to emit multiple signals when opened or closed?)

That would be throttling. Debounce only emits when a value hasn't changed in the specified time window.

@viktorklang
Member

@aruediger

I was referring to the deferred version of debounce. http://drupalmotion.com/article/debounce-and-throttle-visual-explanation has a nice overview of debounce/throttle and their variations.
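
A small sketch of the distinction being drawn here, using bare event timestamps. Both functions are illustrative assumptions: `throttle` is a leading-edge "at most one per interval" filter (not Akka Streams' rate-limiting `throttle` operator), and `debounce` is the deferred (trailing) variant that lets an event through only if nothing else arrives within the quiet window.

```scala
object DebounceVsThrottleSketch {
  // Leading-edge throttle: emit an event, then drop everything that
  // arrives less than `intervalMs` after the last emitted one.
  def throttle(times: Seq[Long], intervalMs: Long): Vector[Long] =
    times.sorted.foldLeft(Vector.empty[Long]) { (out, t) =>
      if (out.isEmpty || t - out.last >= intervalMs) out :+ t else out
    }

  // Deferred debounce: keep an event only if it is the last one, or if
  // the next event arrives at least `quietMs` later. The returned values
  // are the timestamps of the surviving events, not their emission times.
  def debounce(times: Seq[Long], quietMs: Long): Vector[Long] = {
    val sorted = times.sorted.toVector
    sorted.zipWithIndex.collect {
      case (t, i) if i == sorted.length - 1 || sorted(i + 1) - t >= quietMs => t
    }
  }
}
```

For events at 0, 10, 20, 30, 200 and 210 ms with a 100 ms window, `throttle` keeps 0 and 200 (the first event of each burst) while `debounce` keeps 30 and 210 (the last event of each burst).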

@joearasin
Author

joearasin commented Sep 9, 2016

@viktorklang I tried -- I couldn't get the results I wanted. If you have any insight, please let me know.

There are other approaches that might make the desired functionality possible, e.g. a partition/groupBy that completes an idle substream after nothing has been sent to it for a period of time. Customizable overflow behavior on groupBy would be useful as well. For example, rather than failing hard when I'm at max substreams, groupBy could send all excess elements to an "overflow" stream, which backpressures when full and could then be buffered and cycled back around to the input via a mergePreferred between the overflow and "new" elements.

What does groupBy do if a downstream partition is completed with "takeWithin" or failed with "idleTimeout"? Does a new "slot" open up, or will I eventually run out of substreams?

Would something like groupBy => idleTimeout => recover => reduce => merge work?

@drewhk
Member

drewhk commented Sep 9, 2016

I think these concepts should use proper Greek letters instead of names. I mean, what is wrong with Ξ.

@joearasin
Author

Hmm, after reading the docs, I think having groupBy either backpressure, buffer-then-backpressure, or send excess elements elsewhere is probably the most "reusable" way to get the desired effect.

@joearasin
Author

I tried to recreate this (or something like it) with groupBy:

    Flow[T]
      .groupBy(1000, keyFunction)
      .idleTimeout(duration)
      .recoverWithRetries(-1,  { case e: scala.concurrent.TimeoutException => Source.empty[T]})
      .reduce((prev, curr) => curr)
      .mergeSubstreams

This approach does not work, at the very least because groupBy won't recreate a Subflow for a key after its downstream has been closed.

So a test that looks like: https://github.com/akka/akka-stream-contrib/pull/60/files#diff-fbc9034f76805e1d77caa627418fbdfaR76 will fail.
