jrmlhermitte
Contributor

Should we allow control messages to pass through a stream? I think something like this would be useful.

For example, here we send a clear message, which should reset the elements of the stream that hold state. Right now, only the following case is implemented:

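    from streamz import Stream
    from streamz.core import ClearMSG  # proposed in this PR

    # running sum
    def acc1(x1, x2):
        return x1 + x2

    s = Stream()
    cc = ClearMSG()

    sout = s.accumulate(acc1)
    Lout = sout.sink_to_list()

    s.emit(1)
    s.emit(2)
    s.emit(cc)  # resets the accumulator state; nothing is emitted downstream
    s.emit(3)
    assert Lout == [1, 3, 3]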

I'd find more of this useful, though. One could make these messages node-specific, like "CLEAR_ACC", or use some control stream with flags, like Clear(clear_acc=True,...).

Any thoughts? This is meant to open discussion. I find that more convenient than an issue in this case.
The various messages, or a message handler, could perhaps also be handled by dispatching, to allow for more convenient subclassing.

This was a quick thought. If it's of interest in the base class we can discuss adding more.
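
As a rough illustration of the flag-style idea above, here is a minimal sketch. It does not use streamz; `Clear`, its `clear_zip` flag, and `ToyAccumulate` are hypothetical names invented for this example, and only `clear_acc` comes from the comment itself.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Clear:
    """Hypothetical control message; each flag targets one kind of node."""
    clear_acc: bool = False  # reset accumulators
    clear_zip: bool = False  # hypothetical flag for zip-style buffers


class ToyAccumulate:
    """Stand-in for an accumulate node; not the streamz implementation."""

    _EMPTY = object()

    def __init__(self, func):
        self.func = func
        self.state = self._EMPTY
        self.out = []

    def update(self, x):
        if isinstance(x, Clear):
            if x.clear_acc:
                # Drop the accumulated state; emit nothing.
                self.state = self._EMPTY
            return
        if self.state is self._EMPTY:
            self.state = x
        else:
            self.state = self.func(self.state, x)
        self.out.append(self.state)


node = ToyAccumulate(lambda a, b: a + b)
for item in [1, 2, Clear(clear_zip=True), 3, Clear(clear_acc=True), 3]:
    node.update(item)
result = node.out
print(result)  # [1, 3, 6, 3]
```

The first `Clear` is ignored because its `clear_acc` flag is unset, so the sum keeps growing until the second one arrives.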

@codecov-io

codecov-io commented Oct 23, 2017

Codecov Report

Merging #111 into master will increase coverage by 0.05%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master     #111      +/-   ##
==========================================
+ Coverage   93.68%   93.74%   +0.05%     
==========================================
  Files          12       12              
  Lines        1377     1390      +13     
==========================================
+ Hits         1290     1303      +13     
  Misses         87       87

| Impacted Files | Coverage Δ |
| --- | --- |
| streamz/core.py | 95.65% <100%> (+0.11%) ⬆️ |

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update ab4ff2b...7d79052.

@mrocklin
Collaborator

Do other systems do this? If so then what are their semantics?

The alternative here would be to ask the user to implement this sort of logic within their operations, either in their acc1 function or as a separate stream operation.
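
A minimal sketch of that alternative, with the reset logic living entirely in the user's function. `itertools.accumulate` stands in for the stream here (like the example in the opening comment, it passes the first value through unchanged), and the `CLEAR` sentinel is a hypothetical name, not something streamz provides.

```python
from itertools import accumulate

CLEAR = object()  # hypothetical sentinel, not part of streamz


def acc1(total, x):
    """Running sum that treats CLEAR as 'forget everything so far'."""
    if x is CLEAR:
        return CLEAR  # mark the state as empty
    if total is CLEAR:
        return x      # first value after a clear restarts the sum
    return total + x


events = [1, 2, CLEAR, 3]
# The sentinel itself appears in the output, so it has to be filtered out,
# which plays the role of the "separate stream operation".
result = [v for v in accumulate(events, acc1) if v is not CLEAR]
print(result)  # [1, 3, 3]
```

This keeps the library untouched, but it only covers state that lives in user-supplied functions.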

clear_msg = ClearMSG()
ignore_msg = IgnoreMSG()

sout = s.accumulate(acc1)
Collaborator

What would happen if you had a map before the accumulate? Does map (or all other stream operations) also need to know what to do with ClearMSGs?

Member

I would imagine that all update methods will need a cutout for messages.

@CJ-Wright
Member

I'm not certain that we can ask the user to modify everything, as there are some cases where we buffer information without user input, for example zip_latest. In the case of zip_latest, you may decide that all the information in its buffer is no longer applicable and that you want it to go back into "buffer my main stream until my secondary stream emits" mode. A clear message could be helpful here.

@jrmlhermitte
Contributor Author

jrmlhermitte commented Oct 26, 2017

I will have to get back to you about this; sorry, I'm busy.
I spent a few minutes looking at ReactiveX.
They split emit into three different functions: onNext (regular emit), onError and onCompleted.
The way error handling works is that when an error is received, the node 'disconnects' (unsubscribes) from its parent stream and the error is passed through to the other elements in the stream. This effectively stops data from passing through the stream graph.

If you want to prevent this behaviour, you must insert a Retry node between the node that could return an error and the node that would receive it. Upon receiving an error, the Retry node does not pass it through; instead it resubscribes to its parent stream.

I don't like this method but I do like the splitting of emit into different methods. We could have emit dispatch the element to something like onNext, onCompleted, onClear and onError. All accumulators, zips etc would then have to implement their own onClear methods for example. The default for onClear would be onNext (emit).

Initial thoughts? This ties in with #86 as well I believe.
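
The dispatching idea could be sketched as below. This is a toy graph, not streamz code: `Node`, `Map`, `Accumulate`, `Sink`, `Clear`, and the `on_*` method names are all hypothetical. One assumption made here is that the default `on_clear` forwards the message unchanged, so stateless nodes such as a map need no special handling.

```python
class Clear:
    """Hypothetical marker for a clear request."""


class Node:
    def __init__(self):
        self.downstreams = []

    def connect(self, node):
        self.downstreams.append(node)
        return node  # allows chaining

    def emit(self, x):
        # Route by message type; ordinary data goes to on_next.
        if isinstance(x, Clear):
            return self.on_clear(x)
        if isinstance(x, Exception):
            return self.on_error(x)
        return self.on_next(x)

    def _forward(self, x):
        for node in self.downstreams:
            node.emit(x)

    def on_next(self, x):
        self._forward(x)

    def on_clear(self, msg):
        self._forward(msg)  # default: pass the message along untouched

    def on_error(self, err):
        self._forward(err)


class Map(Node):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def on_next(self, x):
        self._forward(self.func(x))


class Accumulate(Node):
    _EMPTY = object()

    def __init__(self, func):
        super().__init__()
        self.func = func
        self.state = self._EMPTY

    def on_next(self, x):
        if self.state is self._EMPTY:
            self.state = x
        else:
            self.state = self.func(self.state, x)
        self._forward(self.state)

    def on_clear(self, msg):
        self.state = self._EMPTY
        self._forward(msg)  # let nodes further down clear as well


class Sink(Node):
    def __init__(self):
        super().__init__()
        self.items = []

    def on_next(self, x):
        self.items.append(x)


source = Node()
sink = (source.connect(Map(lambda x: x * 10))
              .connect(Accumulate(lambda a, b: a + b))
              .connect(Sink()))
for item in [1, 2, Clear(), 3]:
    source.emit(item)
print(sink.items)  # [10, 30, 30]
```

Because the map only overrides `on_next`, the `Clear` message passes through it without being handed to the user's function.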

@jrmlhermitte
Contributor Author

jrmlhermitte commented Oct 26, 2017

(Just for extra reference, @CJ-Wright has already implemented something similar at the update level here. dispatch will dispatch to start, event, descriptor and stop methods, which intercept different types of messages/documents. Something like this could perhaps be considered for streamz.)

Member

@CJ-Wright CJ-Wright left a comment

We may want messages to have a call method which takes in the node? This way we could do things like timing execution.
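
One way to read this suggestion, sketched with hypothetical names (`TimedMSG`, `ToyMap`) and no streamz dependency: the message is callable, the node hands itself to the message, and the message decides how the node's work is run, here by timing it.

```python
import time


class TimedMSG:
    """Hypothetical message that carries a payload and times each node."""

    def __init__(self, payload):
        self.payload = payload
        self.timings = []  # list of (node name, seconds)

    def __call__(self, node):
        start = time.perf_counter()
        self.payload = node.func(self.payload)
        self.timings.append((node.name, time.perf_counter() - start))
        return self


class ToyMap:
    """Stand-in for a map node; not the streamz implementation."""

    def __init__(self, name, func):
        self.name = name
        self.func = func

    def update(self, x):
        if isinstance(x, TimedMSG):
            return x(self)   # let the message drive the node
        return self.func(x)  # ordinary data


msg = TimedMSG(3)
for node in [ToyMap("double", lambda v: v * 2), ToyMap("inc", lambda v: v + 1)]:
    msg = node.update(msg)
print(msg.payload)                     # 7
print([name for name, _ in msg.timings])  # ['double', 'inc']
```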

logger = logging.getLogger(__name__)


class ClearMSG:
Member

You may want a MSG base class so you can just check whether an item is an instance of that class.
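
A minimal sketch of that suggestion. The class names `ClearMSG` and `IgnoreMSG` appear in this PR, but the `MSG` base class and the bodies below are assumptions for illustration only.

```python
class MSG:
    """Hypothetical common base class for control messages."""


class ClearMSG(MSG):
    """Stand-in for the PR's clear message."""


class IgnoreMSG(MSG):
    """Stand-in for the PR's ignore message."""


def is_message(x):
    # A single isinstance check covers every current and future message type.
    return isinstance(x, MSG)


items = [1, ClearMSG(), "a", IgnoreMSG(), 2.5]
data_only = [x for x in items if not is_message(x)]
print(data_only)  # [1, 'a', 2.5]
```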

@martindurant
Member

Superseded by #249 and its discussion, I think.
