Skip to content

Adds support for handle_continue/2 to gen_stage #264

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 169 additions & 22 deletions lib/gen_stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -885,11 +885,18 @@ defmodule GenStage do

@callback init(args :: term) ::
{:producer, state}
| {:producer, state, {:continue, term} | :hibernate}
| {:producer, state, [producer_option]}
| {:producer, state, {:continue, term} | :hibernate, [producer_option]}
| {:producer_consumer, state}
| {:producer_consumer, state, {:continue, term} | :hibernate}
| {:producer_consumer, state, [producer_consumer_option]}
| {:producer_consumer, state, {:continue, term} | :hibernate,
[producer_consumer_option]}
| {:consumer, state}
| {:consumer, state, {:continue, term} | :hibernate}
| {:consumer, state, [consumer_option]}
| {:consumer, state, {:continue, term} | :hibernate, [consumer_option]}
| :ignore
| {:stop, reason :: any}
when state: any
Expand Down Expand Up @@ -925,6 +932,7 @@ defmodule GenStage do
@callback handle_demand(demand :: pos_integer, state :: term) ::
{:noreply, [event], new_state}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason, new_state}
when new_state: term, reason: term, event: term

Expand Down Expand Up @@ -1004,6 +1012,7 @@ defmodule GenStage do
) ::
{:noreply, [event], new_state}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason, new_state}
when event: term, new_state: term, reason: term

Expand All @@ -1017,6 +1026,7 @@ defmodule GenStage do
@callback handle_events(events :: [event], from, state :: term) ::
{:noreply, [event], new_state}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason, new_state}
when new_state: term, reason: term, event: term

Expand Down Expand Up @@ -1056,8 +1066,10 @@ defmodule GenStage do
@callback handle_call(request :: term, from :: GenServer.from(), state :: term) ::
{:reply, reply, [event], new_state}
| {:reply, reply, [event], new_state, :hibernate}
| {:reply, reply, [event], new_state, {:continue, term}}
| {:noreply, [event], new_state}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason, reply, new_state}
| {:stop, reason, new_state}
when reply: term, new_state: term, reason: term, event: term
Expand Down Expand Up @@ -1086,6 +1098,7 @@ defmodule GenStage do
@callback handle_cast(request :: term, state :: term) ::
{:noreply, [event], new_state}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason :: term, new_state}
when new_state: term, event: term

Expand All @@ -1103,6 +1116,27 @@ defmodule GenStage do
@callback handle_info(message :: term, state :: term) ::
{:noreply, [event], new_state}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason :: term, new_state}
when new_state: term, event: term

@doc """
Invoked to handle `continue` instructions.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Invoked to handle `continue` instructions.
Invoked to handle `:continue` instructions.


It is useful for performing work after initialization or for splitting the work
in a callback in multiple steps, updating the process state along the way.
Comment on lines +1126 to +1127
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might be a bit misleading. I think the biggest use of this is being able to emit events and then do something right after, being sure that no messages, calls, or anything else will be "injected" in between. Maybe we can rephrase this?

My suggestion:

Suggested change
It is useful for performing work after initialization or for splitting the work
in a callback in multiple steps, updating the process state along the way.
This callback can be used to perform work right after emitting events from
other callbacks. The "continue mechanism" makes sure that no messages,
calls, casts, or anything else will be handled between a callback emitting
a `:continue` tuple and the `c:handle_continue/2` callback being invoked.


Return values are the same as `c:handle_cast/2`.

This callback is optional. If one is not implemented, the server will fail
if a continue instruction is used.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if a continue instruction is used.
if a `:continue` instruction is used.


This callback is only supported on Erlang/OTP 21+.
"""
@callback handle_continue(continue :: term, state :: term) ::
{:noreply, [event], new_state}
| {:noreply, [event], new_state, :hibernate}
| {:noreply, [event], new_state, {:continue, term}}
| {:stop, reason :: term, new_state}
when new_state: term, event: term

Expand Down Expand Up @@ -1139,6 +1173,7 @@ defmodule GenStage do
format_status: 2,
handle_call: 3,
handle_cast: 2,
handle_continue: 2,
handle_info: 2,
terminate: 2
]
Expand Down Expand Up @@ -1722,22 +1757,58 @@ defmodule GenStage do
def init({mod, args}) do
case mod.init(args) do
{:producer, state} ->
init_producer(mod, [], state)
init_producer(mod, [], state, nil)

{:producer, state, {:continue, _term} = continue} ->
init_producer(mod, [], state, continue)

{:producer, state, :hibernate} ->
Comment on lines +1762 to +1765
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would force these return values to be {:producer, state, [], {:continue, term} | :hibernate} to simplify the API. Thoughts? Same for the cases below with producer_consumer and consumer.

Copy link
Author

@hazardfn hazardfn Jan 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this incredibly long case statement is less than ideal but if I understand your suggestion correctly wouldn't this result in the GenStage not being initialized (validation/state setup) properly at all in the :hibernate case and in the {:continue, term} case initialization of the GenStage state would be forced upon the user...?

EDIT: Actually I think in the continue case it wouldn't work either because if I remember correctly user code only has access to the "inner" user state of the GenStage not the internals...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're OK with keeping the current logic as it is it might be cleaner to pull all this out into a pattern matched group of functions instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now you have a few extra cases because of the fact that you are allowing the third element of the tuple to be either the options, :hibernate, or {:continue, term}. What I am suggesting is that you don't allow it to be the options. The spec of the return would become something like {:producer, state :: term, options :: keyword, :hibernate | {:continue, term}}.

This way, you shave off a few case clauses by forcing users to use an empty list of options ([]) if they have no options to pass but still want to use :hibernate or {:continue, term}. Does it make more sense?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right I see, thanks for the clarification! I'll look into these changes tonight 👍

init_producer(mod, [], state, :hibernate)

{:producer, state, opts} when is_list(opts) ->
init_producer(mod, opts, state)
init_producer(mod, opts, state, nil)

{:producer, state, {:continue, _term} = continue, opts} when is_list(opts) ->
init_producer(mod, opts, state, continue)

{:producer, state, :hibernate, opts} when is_list(opts) ->
init_producer(mod, opts, state, :hibernate)

{:producer_consumer, state} ->
init_producer_consumer(mod, [], state)
init_producer_consumer(mod, [], state, nil)

{:producer_consumer, state, {:continue, _term} = continue} ->
init_producer_consumer(mod, [], state, continue)

{:producer_consumer, state, :hibernate} ->
init_producer_consumer(mod, [], state, :hibernate)

{:producer_consumer, state, opts} when is_list(opts) ->
init_producer_consumer(mod, opts, state)
init_producer_consumer(mod, opts, state, nil)

{:producer_consumer, state, {:continue, _term} = continue, opts} when is_list(opts) ->
init_producer_consumer(mod, opts, state, continue)

{:producer_consumer, state, :hibernate, opts} when is_list(opts) ->
init_producer_consumer(mod, opts, state, :hibernate)

{:consumer, state} ->
init_consumer(mod, [], state)
init_consumer(mod, [], state, nil)

{:consumer, state, {:continue, _term} = continue} ->
init_consumer(mod, [], state, continue)

{:consumer, state, :hibernate} ->
init_consumer(mod, [], state, :hibernate)

{:consumer, state, opts} when is_list(opts) ->
init_consumer(mod, opts, state)
init_consumer(mod, opts, state, nil)

{:consumer, state, {:continue, _term} = continue, opts} when is_list(opts) ->
init_consumer(mod, opts, state, continue)

{:consumer, state, :hibernate, opts} when is_list(opts) ->
init_consumer(mod, opts, state, :hibernate)

{:stop, _} = stop ->
stop
Expand All @@ -1750,7 +1821,7 @@ defmodule GenStage do
end
end

defp init_producer(mod, opts, state) do
defp init_producer(mod, opts, state, continue_or_hibernate) do
with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts),
{:ok, buffer_size, opts} <-
Utils.validate_integer(opts, :buffer_size, 10000, 0, :infinity, true),
Expand All @@ -1770,7 +1841,7 @@ defmodule GenStage do
dispatcher_state: dispatcher_state
}

{:ok, stage}
if continue_or_hibernate, do: {:ok, stage, continue_or_hibernate}, else: {:ok, stage}
else
{:error, message} -> {:stop, {:bad_opts, message}}
end
Expand All @@ -1792,7 +1863,7 @@ defmodule GenStage do
end
end

defp init_producer_consumer(mod, opts, state) do
defp init_producer_consumer(mod, opts, state, continue_or_hibernate) do
with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts),
{:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []),
{:ok, buffer_size, opts} <-
Expand All @@ -1811,22 +1882,68 @@ defmodule GenStage do
dispatcher_state: dispatcher_state
}

consumer_init_subscribe(subscribe_to, stage)
case handle_gen_server_init_args(continue_or_hibernate, stage) do
{:ok, stage} ->
consumer_init_subscribe(subscribe_to, stage)

{:ok, stage, args} ->
{:ok, stage} = consumer_init_subscribe(subscribe_to, stage)
{:ok, stage, args}

{:stop, _, _} = error ->
error
end
else
{:error, message} -> {:stop, {:bad_opts, message}}
end
end

defp init_consumer(mod, opts, state) do
defp init_consumer(mod, opts, state, continue_or_hibernate) do
with {:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []),
:ok <- Utils.validate_no_opts(opts) do
stage = %GenStage{mod: mod, state: state, type: :consumer}
consumer_init_subscribe(subscribe_to, stage)

case handle_gen_server_init_args(continue_or_hibernate, stage) do
{:ok, stage} ->
consumer_init_subscribe(subscribe_to, stage)

{:ok, stage, args} ->
{:ok, stage} = consumer_init_subscribe(subscribe_to, stage)
{:ok, stage, args}

{:stop, _, _} = error ->
error
end
else
{:error, message} -> {:stop, {:bad_opts, message}}
end
end

defp handle_gen_server_init_args({:continue, _term} = continue, stage) do
case handle_continue(continue, stage) do
{:noreply, stage} ->
{:ok, stage}

{:noreply, stage, :hibernate} ->
{:ok, stage, :hibernate}

{:noreply, stage, {:continue, _term} = continue} ->
{:ok, stage, continue}

{:stop, reason, stage} ->
{:stop, reason, stage}
end
end

defp handle_gen_server_init_args(:hibernate, stage), do: {:ok, stage, :hibernate}
defp handle_gen_server_init_args(nil, stage), do: {:ok, stage}

@doc false

def handle_continue(continue, %{state: state} = stage) do
noreply_callback(:handle_continue, [continue, state], stage)
end

@doc false

def handle_call({:"$info", msg}, _from, stage) do
Expand Down Expand Up @@ -1855,6 +1972,10 @@ defmodule GenStage do
stage = dispatch_events(events, length(events), %{stage | state: state})
{:reply, reply, stage, :hibernate}

{:reply, reply, events, state, {:continue, _term} = continue} ->
stage = dispatch_events(events, length(events), %{stage | state: state})
{:reply, reply, stage, continue}

{:stop, reason, reply, state} ->
{:stop, reason, reply, %{stage | state: state}}

Expand Down Expand Up @@ -1995,7 +2116,7 @@ defmodule GenStage do
case producers do
%{^ref => entry} ->
{batches, stage} = consumer_receive(from, entry, events, stage)
consumer_dispatch(batches, from, mod, state, stage, false)
consumer_dispatch(batches, from, mod, state, stage, nil)

_ ->
msg = {:"$gen_producer", {self(), ref}, {:cancel, :unknown_subscription}}
Expand Down Expand Up @@ -2122,6 +2243,14 @@ defmodule GenStage do
end
end

defp noreply_callback(:handle_continue, [continue, state], %{mod: mod} = stage) do
if function_exported?(mod, :handle_continue, 2) do
handle_noreply_callback(mod.handle_continue(continue, state), stage)
else
:error_handler.raise_undef_exception(mod, :handle_continue, [continue, state])
end
end

defp noreply_callback(callback, args, %{mod: mod} = stage) do
handle_noreply_callback(apply(mod, callback, args), stage)
end
Expand All @@ -2136,6 +2265,10 @@ defmodule GenStage do
stage = dispatch_events(events, length(events), %{stage | state: state})
{:noreply, stage, :hibernate}

{:noreply, events, state, {:continue, _term} = continue} when is_list(events) ->
stage = dispatch_events(events, length(events), %{stage | state: state})
{:noreply, stage, continue}

{:stop, reason, state} ->
{:stop, reason, %{stage | state: state}}

Expand Down Expand Up @@ -2259,6 +2392,9 @@ defmodule GenStage do
# main module must know the consumer is no longer subscribed.
dispatcher_callback(:cancel, [{pid, ref}, dispatcher_state], stage)

{:noreply, %{dispatcher_state: dispatcher_state} = stage, _hibernate_or_continue} ->
dispatcher_callback(:cancel, [{pid, ref}, dispatcher_state], stage)

{:stop, _, _} = stop ->
stop
end
Expand Down Expand Up @@ -2459,17 +2595,22 @@ defmodule GenStage do
{[{events, 0}], stage}
end

defp consumer_dispatch([{batch, ask} | batches], from, mod, state, stage, _hibernate?) do
defp consumer_dispatch([{batch, ask} | batches], from, mod, state, stage, _gen_opts) do
case mod.handle_events(batch, from, state) do
{:noreply, events, state} when is_list(events) ->
stage = dispatch_events(events, length(events), stage)
ask(from, ask, [:noconnect])
consumer_dispatch(batches, from, mod, state, stage, false)
consumer_dispatch(batches, from, mod, state, stage, nil)

{:noreply, events, state, :hibernate} when is_list(events) ->
{:noreply, events, state, :hibernate} ->
stage = dispatch_events(events, length(events), stage)
ask(from, ask, [:noconnect])
consumer_dispatch(batches, from, mod, state, stage, true)
consumer_dispatch(batches, from, mod, state, stage, :hibernate)

{:noreply, events, state, {:continue, _} = continue} ->
stage = dispatch_events(events, length(events), stage)
ask(from, ask, [:noconnect])
consumer_dispatch(batches, from, mod, state, stage, continue)

{:stop, reason, state} ->
{:stop, reason, %{stage | state: state}}
Expand All @@ -2479,12 +2620,12 @@ defmodule GenStage do
end
end

defp consumer_dispatch([], _from, _mod, state, stage, false) do
defp consumer_dispatch([], _from, _mod, state, stage, nil) do
{:noreply, %{stage | state: state}}
end

defp consumer_dispatch([], _from, _mod, state, stage, true) do
{:noreply, %{stage | state: state}, :hibernate}
defp consumer_dispatch([], _from, _mod, state, stage, gen_opts) do
{:noreply, %{stage | state: state}, gen_opts}
end

defp consumer_subscribe({to, opts}, stage) when is_list(opts),
Expand Down Expand Up @@ -2613,11 +2754,11 @@ defmodule GenStage do
{producer_id, _, _} = entry
from = {producer_id, ref}
{batches, stage} = consumer_receive(from, entry, events, stage)
consumer_dispatch(batches, from, mod, state, stage, false)
consumer_dispatch(batches, from, mod, state, stage, nil)

%{} ->
# We queued but producer was removed
consumer_dispatch([{events, 0}], {:pid, ref}, mod, state, stage, false)
consumer_dispatch([{events, 0}], {:pid, ref}, mod, state, stage, nil)
end
end

Expand All @@ -2634,6 +2775,9 @@ defmodule GenStage do
{:noreply, stage, :hibernate} ->
take_pc_events(queue, counter, stage)

{:noreply, stage, {:continue, _term}} ->
take_pc_events(queue, counter, stage)

{:stop, _, _} = stop ->
stop
end
Expand All @@ -2646,6 +2790,9 @@ defmodule GenStage do
{:noreply, %{events: {queue, counter}} = stage, :hibernate} ->
take_pc_events(queue, counter, stage)

{:noreply, %{events: {queue, counter}} = stage, {:continue, _term}} ->
take_pc_events(queue, counter, stage)

{:stop, _, _} = stop ->
stop
end
Expand Down
Loading