diff --git a/big_tests/tests/mod_event_pusher_filter.erl b/big_tests/tests/mod_event_pusher_filter.erl new file mode 100644 index 0000000000..0014df1e61 --- /dev/null +++ b/big_tests/tests/mod_event_pusher_filter.erl @@ -0,0 +1,55 @@ +-module(mod_event_pusher_filter). +-moduledoc """ +An example filter and metadata provider for mod_event_pusher + +It performs two actions: +1. Filters out any events other than 'user_status_event'. +2. Adds the following event metadata: + - timestamp in milliseconds (taken from mongoose_acc) + - number of user's active sessions + The metadata can be injected into the event published by the configured backends + (currently only 'rabbit' supports this). +""". + +-behaviour(gen_mod). + +%% gen_mod callbacks +-export([start/2, stop/1, hooks/1]). + +%% hook handlers +-export([push_event/3]). + +-include_lib("../../include/mod_event_pusher_events.hrl"). + +%% gen_mod callbacks + +-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok. +start(_HostType, _Opts) -> + ok. + +-spec stop(mongooseim:host_type()) -> ok. +stop(_HostType) -> + ok. + +-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list(). +hooks(HostType) -> + [{push_event, HostType, fun ?MODULE:push_event/3, #{}, 10}]. % needs to run before other handlers + +%% hook handlers + +-doc "For user status events, add timestamp and session count. Filter out other events.". +-spec push_event(mod_event_pusher:push_event_acc(), mod_event_pusher:push_event_params(), + gen_hook:extra()) -> gen_hook:hook_fn_ret(mod_event_pusher:push_event_acc()). +push_event(HookAcc = #{acc := Acc}, #{event := #user_status_event{jid = JID}}, _Extra) -> + #{metadata := Metadata} = HookAcc, + NewMetadata = Metadata#{timestamp => mongoose_acc:timestamp(Acc), + session_count => count_user_sessions(JID)}, + {ok, HookAcc#{metadata := NewMetadata}}; +push_event(HookAcc, _Params, _Extra) -> + {stop, HookAcc}. + +%% helpers + +-spec count_user_sessions(jid:jid()) -> non_neg_integer(). +count_user_sessions(JID) -> + length(ejabberd_sm:get_user_present_resources(JID)). diff --git a/big_tests/tests/mod_event_pusher_rabbit_SUITE.erl b/big_tests/tests/mod_event_pusher_rabbit_SUITE.erl index 7a5a1fb43f..f0c1aab8b5 100644 --- a/big_tests/tests/mod_event_pusher_rabbit_SUITE.erl +++ b/big_tests/tests/mod_event_pusher_rabbit_SUITE.erl @@ -50,7 +50,8 @@ all() -> {group, only_presence_status_publish}, {group, chat_message_publish}, {group, group_chat_message_publish}, - {group, instrumentation} + {group, instrumentation}, + {group, filter_and_metadata} ]. groups() -> @@ -61,7 +62,8 @@ groups() -> {only_presence_status_publish, [], only_presence_status_publish_tests()}, {chat_message_publish, [], chat_message_publish_tests()}, {group_chat_message_publish, [], group_chat_message_publish_tests()}, - {instrumentation, [], instrumentation_tests()}]. + {instrumentation, [], instrumentation_tests()}, + {filter_and_metadata, [], filter_and_metadata_tests()}]. pool_startup_tests() -> [rabbit_pool_starts_with_default_config]. @@ -96,6 +98,10 @@ instrumentation_tests() -> [connections_events_are_executed, messages_published_events_are_executed]. +filter_and_metadata_tests() -> + [messages_published_events_are_not_executed, + presence_messages_are_properly_formatted_with_metadata]. + suite() -> escalus:suite(). @@ -112,6 +118,7 @@ init_per_suite(Config) -> start_rabbit_wpool(domain()), {ok, _} = application:ensure_all_started(amqp_client), muc_helper:load_muc(), + mongoose_helper:inject_module(mod_event_pusher_filter), escalus:init_per_suite(Config); false -> {skip, "RabbitMQ server is not available on default port."} @@ -132,6 +139,9 @@ init_per_group(GroupName, Config0) -> required_modules(pool_startup) -> [{mod_event_pusher, stopped}]; +required_modules(filter_and_metadata = GroupName) -> + [{mod_event_pusher_filter, #{}}, + {mod_event_pusher, #{rabbit => mod_event_pusher_rabbit_opts(GroupName)}}]; required_modules(GroupName) -> [{mod_event_pusher, #{rabbit => mod_event_pusher_rabbit_opts(GroupName)}}]. @@ -218,7 +228,7 @@ only_presence_exchange_is_created_on_module_startup(Config) -> ?assertNot(is_exchange_present(Connection, {?GROUP_CHAT_MSG_EXCHANGE, ExCustomType})). %%-------------------------------------------------------------------- -%% GROUP (only_)presence_status_publish +%% GROUP (only_)presence_status_publish, filter_and_metadata %%-------------------------------------------------------------------- connected_users_push_presence_events_when_change_status(Config) -> @@ -236,20 +246,31 @@ connected_users_push_presence_events_when_change_status(Config) -> end). presence_messages_are_properly_formatted(Config) -> - escalus:story( + escalus:fresh_story_with_config( + Config, [{bob, 1}], fun presence_messages_are_properly_formatted_story/2). + +presence_messages_are_properly_formatted_with_metadata(Config) -> + escalus:fresh_story_with_config( Config, [{bob, 1}], - fun(Bob) -> - %% GIVEN - BobJID = client_lower_short_jid(Bob), - BobFullJID = client_lower_full_jid(Bob), - listen_to_presence_events_from_rabbit([BobJID], Config), - %% WHEN user logout - escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)), - %% THEN receive message - ?assertMatch(#{<<"user_id">> := BobFullJID, <<"present">> := false}, - get_decoded_message_from_rabbit(BobJID)) + fun(_, Bob) -> + TS = rpc(mim(), erlang, system_time, [microsecond]), + DecodedMessage = presence_messages_are_properly_formatted_story(Config, Bob), + #{<<"timestamp">> := T, <<"session_count">> := 0} = DecodedMessage, + ?assert(is_integer(T) andalso T > TS) end). +presence_messages_are_properly_formatted_story(Config, Bob) -> + %% GIVEN + BobJID = client_lower_short_jid(Bob), + BobFullJID = client_lower_full_jid(Bob), + listen_to_presence_events_from_rabbit([BobJID], Config), + %% WHEN user logout + escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)), + %% THEN receive message + DecodedMessage = get_decoded_message_from_rabbit(BobJID), + ?assertMatch(#{<<"user_id">> := BobFullJID, <<"present">> := false}, DecodedMessage), + DecodedMessage. + messages_published_events_are_not_executed(Config) -> escalus:story( Config, [{bob, 1}, {alice, 1}], @@ -702,6 +723,7 @@ send_presence_stanza(User, NumOfMsgs) -> get_decoded_message_from_rabbit(RoutingKey) -> receive {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Msg}} -> + ct:log("Decoded rabbit message, rk=~p~nmessage:~ts", [RoutingKey, Msg]), jiffy:decode(Msg, [return_maps]) after 5000 -> ct:fail("Timeout when decoding message, rk=~p", [RoutingKey]) diff --git a/doc/migrations/6.4.0_6.5.0.md b/doc/migrations/6.4.0_6.5.0.md index a51d51e17e..50ae0622b8 100644 --- a/doc/migrations/6.4.0_6.5.0.md +++ b/doc/migrations/6.4.0_6.5.0.md @@ -32,6 +32,10 @@ Instead of always sending all notifications, it only enables the ones with a rel Make sure you have all the necessary sections present in the configuration file before upgrading. +### Custom backends for `mod_event_pusher` + +If you have a custom backend implemented for `mod_event_pusher`, you need to update it to handle the `push_event` hook instead of implementing the `mod_event_pusher` behaviour (which no longer exists). + ### Deprecation MSSQL backend is deprecated and will be removed in the next release. diff --git a/doc/modules/mod_event_pusher.md b/doc/modules/mod_event_pusher.md index 62e5183e17..254419fa9c 100644 --- a/doc/modules/mod_event_pusher.md +++ b/doc/modules/mod_event_pusher.md @@ -1,18 +1,10 @@ ## Module Description -This module is a generic interface for event-pushing backends. -It defines a single callback, `push_event/2` that forwards the event to all registered backends. -Each backend decides how and if to handle the event in its `push_event/2` implementation. - +This module is a generic interface for pushing **events** to the configured **backends**. +The events include presence updates and incoming/outgoing messages. Currently supported backends include [http], [push], [rabbit] and [sns]. Refer to their specific documentation to learn more about their functions and configuration options. -### How it works - -The events are standardized as records that can be found in the `mod_event_pusher_events.hrl` file. -Common events like user presence changes (offline and online), chat and groupchat messages (incoming -and outgoing) are already handled in the `mod_event_pusher_hook_translator` module, which is a proxy between various hooks and the `push_event/2` handler. - !!! Warning This module does not support [dynamic domains](../configuration/general.md#generalhost_types). @@ -46,6 +38,36 @@ The `[modules.mod_event_pusher]` section itself is omitted - this is allowed in [modules.mod_event_pusher.rabbit] ``` +## How it works + +The events are standardized as records that can be found in the `mod_event_pusher_events.hrl` file. +Common events like user presence changes (offline and online), chat and groupchat messages (incoming and outgoing) are handled in the `mod_event_pusher_hook_translator` module. +Each event has a corresponding [hook](../developers-guide/Hooks-and-handlers.md), e.g. `user_send_message` is run when a user sends a message. +`mod_event_pusher_hook_translator` has a handler function for each supported hook. + +Handling an event includes the following steps: + +1. The event hook is executed, and the corresponding handler function in `mod_event_pusher_hook_translator` is called. +1. The handler function calls `mod_event_pusher:push_event(Acc, Event)`. +1. `mod_event_pusher:push_event/2` runs another hook called `push_event`. +1. All configured backend modules have handlers for the `push_event` hook, and all these handlers are called. + +### Custom event processing + +By implementing your own module handling the `push_event` hook, you can: + +- Push the events to a new service, such as a message queue or a database. +- Filter the events by returning `{ok, ...}` to keep an event, or `{stop, ...}` to drop it. +- Add a map with **metadata** to the events. The keys need to be atoms, and the values need to be atoms, binaries or numbers. + +There is an example [mod_event_pusher_filter.erl](https://github.com/esl/MongooseIM/blob/master/big_tests/tests/mod_event_pusher_filter.erl) module, demonstrating how to filter the events and append additional metadata. + +!!! Note + Execution order of handlers depends on their priorities. In particular, filtering events or adding metadata needs to happend before pushing notifications to external services. The example handler has the priority value of 10, while backends have the priority of 50. + +!!! Warning + Currently only the [rabbit](mod_event_pusher_rabbit.md#additional-metadata) backend supports adding metadata to the published notifications. + [http]: ./mod_event_pusher_http.md [push]: ./mod_event_pusher_push.md [rabbit]: ./mod_event_pusher_rabbit.md diff --git a/doc/modules/mod_event_pusher_rabbit.md b/doc/modules/mod_event_pusher_rabbit.md index dbb67b2e19..1debdd5256 100644 --- a/doc/modules/mod_event_pusher_rabbit.md +++ b/doc/modules/mod_event_pusher_rabbit.md @@ -185,6 +185,10 @@ and for "received" events: } ``` +## Additional metadata + +If you decide to [customize the events](mod_event_pusher.md#event-customization) with additional metadata, the additional key-value pairs will be added directly to the JSON object. You can override existing properties, but it is counter-intuitive and thus not recommended. + ## Metrics The module provides some metrics related to RabbitMQ connections and messages diff --git a/src/event_pusher/mod_event_pusher.erl b/src/event_pusher/mod_event_pusher.erl index e64bdd8c50..e9af2ff01e 100644 --- a/src/event_pusher/mod_event_pusher.erl +++ b/src/event_pusher/mod_event_pusher.erl @@ -25,7 +25,10 @@ -type backend() :: http | push | rabbit | sns. -type event() :: #user_status_event{} | #chat_event{} | #unack_msg_event{}. --export_type([event/0]). +-type metadata() :: #{atom() => atom() | binary() | number()}. +-type push_event_params() :: #{event := event()}. +-type push_event_acc() :: #{acc := mongoose_acc:t(), metadata := metadata()}. +-export_type([event/0, metadata/0, push_event_acc/0, push_event_params/0]). -export([deps/2, start/2, stop/1, config_spec/0, push_event/2]). @@ -33,12 +36,6 @@ -ignore_xref([behaviour_info/1]). -%%-------------------------------------------------------------------- -%% Callbacks -%%-------------------------------------------------------------------- - --callback push_event(mongoose_acc:t(), event()) -> mongoose_acc:t(). - %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -46,9 +43,8 @@ %% @doc Pushes the event to each backend registered with the event_pusher. -spec push_event(mongoose_acc:t(), event()) -> mongoose_acc:t(). push_event(Acc, Event) -> - HostType = mongoose_acc:host_type(Acc), - Backends = maps:keys(gen_mod:get_loaded_module_opts(HostType, ?MODULE)), - lists:foldl(fun(B, Acc0) -> (backend_module(B)):push_event(Acc0, Event) end, Acc, Backends). + #{acc := NewAcc} = mongoose_hooks:push_event(Acc, Event), + NewAcc. %%-------------------------------------------------------------------- %% gen_mod API diff --git a/src/event_pusher/mod_event_pusher_http.erl b/src/event_pusher/mod_event_pusher_http.erl index 39493e9d50..91b33e63aa 100644 --- a/src/event_pusher/mod_event_pusher_http.erl +++ b/src/event_pusher/mod_event_pusher_http.erl @@ -11,7 +11,6 @@ -ignore_xref([behaviour_info/1]). -behaviour(gen_mod). --behaviour(mod_event_pusher). -behaviour(mongoose_module_metrics). -callback should_make_req(Acc :: mongoose_acc:t(), @@ -39,7 +38,10 @@ -include("jlib.hrl"). %% API --export([start/2, stop/1, config_spec/0, push_event/2, instrumentation/1]). +-export([start/2, stop/1, hooks/1, config_spec/0, instrumentation/1]). + +%% hook handlers +-export([push_event/3]). %% config spec callbacks -export([fix_path/1]). @@ -58,6 +60,10 @@ start(_HostType, _Opts) -> stop(_HostType) -> ok. +-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list(). +hooks(HostType) -> + [{push_event, HostType, fun ?MODULE:push_event/3, #{}, 50}]. + -spec config_spec() -> mongoose_config_spec:config_section(). config_spec() -> #section{items = #{<<"handlers">> => #list{items = handler_config_spec(), @@ -81,13 +87,16 @@ instrumentation(HostType) -> [{?SENT_METRIC, #{host_type => HostType}, #{metrics => #{count => spiral, response_time => histogram, failure_count => spiral}}}]. -push_event(Acc, #chat_event{direction = Dir, from = From, to = To, packet = Packet}) -> - HostType = mongoose_acc:host_type(Acc), +-spec push_event(mod_event_pusher:push_event_acc(), mod_event_pusher:push_event_params(), + gen_hook:extra()) -> {ok, mod_event_pusher:push_event_acc()}. +push_event(HookAcc, #{event := #chat_event{direction = Dir, from = From, to = To, packet = Packet}}, + #{host_type := HostType}) -> + #{acc := Acc} = HookAcc, lists:map(fun(Opts) -> push_event(Acc, Dir, From, To, Packet, Opts) end, gen_mod:get_module_opt(HostType, ?MODULE, handlers)), - Acc; -push_event(Acc, _Event) -> - Acc. + {ok, HookAcc}; +push_event(HookAcc, _Params, _Extra) -> + {ok, HookAcc}. push_event(Acc, Dir, From, To, Packet, Opts = #{callback_module := Mod}) -> Body = exml_query:path(Packet, [{element, <<"body">>}, cdata], <<>>), diff --git a/src/event_pusher/mod_event_pusher_push.erl b/src/event_pusher/mod_event_pusher_push.erl index 42e79281c6..f008be9a5e 100644 --- a/src/event_pusher/mod_event_pusher_push.erl +++ b/src/event_pusher/mod_event_pusher_push.erl @@ -12,7 +12,6 @@ -module(mod_event_pusher_push). -author('rafal.slota@erlang-solutions.com'). -behavior(gen_mod). --behavior(mod_event_pusher). -behaviour(mongoose_module_metrics). -xep([{xep, 357}, {version, "0.4.1"}]). @@ -29,13 +28,13 @@ %%-------------------------------------------------------------------- %% gen_mod behaviour --export([start/2, stop/1, config_spec/0]). +-export([start/2, stop/1, hooks/1, config_spec/0]). %% mongoose_module_metrics behaviour -export([config_metrics/1]). -%% mod_event_pusher behaviour --export([push_event/2]). +%% hook handlers +-export([push_event/3]). %% Hooks and IQ handlers -export([iq_handler/4, @@ -66,7 +65,6 @@ start(HostType, Opts) -> mod_event_pusher_push_backend:init(HostType, Opts), mod_event_pusher_push_plugin:init(HostType, Opts), init_iq_handlers(HostType, Opts), - gen_hook:add_handler(remove_user, HostType, fun ?MODULE:remove_user/3, #{}, 90), ok. start_pool(HostType, #{wpool := WpoolOpts}) -> @@ -80,14 +78,17 @@ init_iq_handlers(HostType, #{iqdisc := IQDisc}) -> -spec stop(mongooseim:host_type()) -> ok. stop(HostType) -> - gen_hook:delete_handler(remove_user, HostType, fun ?MODULE:remove_user/3, #{}, 90), - gen_iq_handler:remove_iq_handler(ejabberd_sm, HostType, ?NS_PUSH), gen_iq_handler:remove_iq_handler(ejabberd_local, HostType, ?NS_PUSH), mongoose_wpool:stop(generic, HostType, pusher_push), ok. +-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list(). +hooks(HostType) -> + [{remove_user, HostType, fun ?MODULE:remove_user/3, #{}, 90}, + {push_event, HostType, fun ?MODULE:push_event/3, #{}, 50}]. + -spec config_spec() -> mongoose_config_spec:config_section(). config_spec() -> VirtPubSubHost = #option{type = string, validate = subdomain_template, @@ -108,21 +109,6 @@ wpool_spec() -> Wpool = mongoose_config_spec:wpool(#{<<"strategy">> => available_worker}), Wpool#section{include = always}. -%%-------------------------------------------------------------------- -%% mod_event_pusher callbacks -%%-------------------------------------------------------------------- --spec push_event(mongoose_acc:t(), mod_event_pusher:event()) -> mongoose_acc:t(). -push_event(Acc, Event = #chat_event{direction = out, to = To, type = Type}) - when Type =:= groupchat; - Type =:= chat -> - BareRecipient = jid:to_bare(To), - do_push_event(Acc, Event, BareRecipient); -push_event(Acc, Event = #unack_msg_event{to = To}) -> - BareRecipient = jid:to_bare(To), - do_push_event(Acc, Event, BareRecipient); -push_event(Acc, _) -> - Acc. - %%-------------------------------------------------------------------- %% Hooks and IQ handlers %%-------------------------------------------------------------------- @@ -135,6 +121,23 @@ remove_user(Acc, #{jid := #jid{luser = LUser, lserver = LServer}}, _) -> mongoose_lib:log_if_backend_error(R, ?MODULE, ?LINE, {Acc, LUser, LServer}), {ok, Acc}. +-spec push_event(mod_event_pusher:push_event_acc(), mod_event_pusher:push_event_params(), + gen_hook:extra()) -> {ok, mod_event_pusher:push_event_acc()}. +push_event(HookAcc, #{event := Event = #chat_event{direction = out, to = To, type = Type}}, _Extra) + when Type =:= groupchat; + Type =:= chat -> + #{acc := Acc} = HookAcc, + BareRecipient = jid:to_bare(To), + NewAcc = do_push_event(Acc, Event, BareRecipient), + {ok, HookAcc#{acc := NewAcc}}; +push_event(HookAcc = #{acc := Acc}, #{event := Event = #unack_msg_event{to = To}}, _Extra) -> + BareRecipient = jid:to_bare(To), + #{acc := Acc} = HookAcc, + NewAcc = do_push_event(Acc, Event, BareRecipient), + {ok, HookAcc#{acc := NewAcc}}; +push_event(HookAcc, _Params, _Extra) -> + {ok, HookAcc}. + -spec iq_handler(From :: jid:jid(), To :: jid:jid(), Acc :: mongoose_acc:t(), IQ :: jlib:iq()) -> {mongoose_acc:t(), jlib:iq() | ignore}. diff --git a/src/event_pusher/mod_event_pusher_rabbit.erl b/src/event_pusher/mod_event_pusher_rabbit.erl index 24060a2be9..84331d968d 100644 --- a/src/event_pusher/mod_event_pusher_rabbit.erl +++ b/src/event_pusher/mod_event_pusher_rabbit.erl @@ -26,7 +26,6 @@ -behaviour(gen_mod). -behaviour(mongoose_module_metrics). --behaviour(mod_event_pusher). %%%=================================================================== %%% Definitions @@ -44,10 +43,10 @@ %%%=================================================================== %% MIM module callbacks --export([start/2, stop/1, config_spec/0]). +-export([start/2, stop/1, hooks/1, config_spec/0]). -%% API --export([push_event/2]). +%% hook handlers +-export([push_event/3]). %%%=================================================================== %%% Callbacks @@ -62,6 +61,10 @@ start(HostType, Opts) -> stop(_HostType) -> ok. +-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list(). +hooks(HostType) -> + [{push_event, HostType, fun ?MODULE:push_event/3, #{}, 50}]. + -spec config_spec() -> mongoose_config_spec:config_section(). config_spec() -> #section{items = #{<<"presence_exchange">> => exchange_spec(<<"presence">>), @@ -85,21 +88,16 @@ exchange_spec(Name) -> defaults = #{<<"name">> => Name, <<"type">> => <<"topic">>}}. --spec push_event(mongoose_acc:t(), mod_event_pusher:event()) -> mongoose_acc:t(). -push_event(Acc, Event) -> - case event_to_key(Event) of - {ok, ExchangeKey} -> - HostType = mongoose_acc:host_type(Acc), - case exchange_opts(HostType, ExchangeKey) of - {ok, ExchangeOpts} -> - handle_event(HostType, Event, ExchangeOpts), - Acc; - {error, not_found} -> - Acc - end; - skip -> - Acc - end. +-spec push_event(mod_event_pusher:push_event_acc(), mod_event_pusher:push_event_params(), + gen_hook:extra()) -> {ok, mod_event_pusher:push_event_acc()}. +push_event(HookAcc, #{event := Event}, #{host_type := HostType}) -> + maybe + {ok, ExchangeKey} ?= event_to_key(Event), + {ok, ExchangeOpts} ?= exchange_opts(HostType, ExchangeKey), + #{metadata := Metadata} = HookAcc, + handle_event(HostType, Event, Metadata, ExchangeOpts) + end, + {ok, HookAcc}. %%%=================================================================== %%% Internal functions @@ -116,13 +114,14 @@ create_exchange(HostType, #{name := ExName, type := Type}) -> call_rabbit_worker(HostType, {amqp_call, mongoose_amqp:exchange_declare(ExName, Type)}). -spec handle_event(mongooseim:host_type(), #user_status_event{} | #chat_event{}, - exchange_opts()) -> ok. -handle_event(HostType, Event, ExchangeOpts = #{name := ExchangeName}) -> + mod_event_pusher:metadata(), exchange_opts()) -> ok. +handle_event(HostType, Event, Metadata, ExchangeOpts = #{name := ExchangeName}) -> case message(Event) of Message = #{} -> + MessageJSON = iolist_to_binary(jiffy:encode(maps:merge(Message, Metadata))), RoutingKey = routing_key(Event, ExchangeOpts), PublishMethod = mongoose_amqp:basic_publish(ExchangeName, RoutingKey), - AMQPMessage = mongoose_amqp:message(iolist_to_binary(jiffy:encode(Message))), + AMQPMessage = mongoose_amqp:message(MessageJSON), cast_rabbit_worker(HostType, {amqp_publish, PublishMethod, AMQPMessage}); skip -> ok diff --git a/src/event_pusher/mod_event_pusher_sns.erl b/src/event_pusher/mod_event_pusher_sns.erl index 28d1e762e7..09a8fb2e4c 100644 --- a/src/event_pusher/mod_event_pusher_sns.erl +++ b/src/event_pusher/mod_event_pusher_sns.erl @@ -1,7 +1,6 @@ -module(mod_event_pusher_sns). -behaviour(gen_mod). --behaviour(mod_event_pusher). -include("mod_event_pusher_events.hrl"). -include("mongoose.hrl"). @@ -32,10 +31,13 @@ %%%=================================================================== %% MIM module callbacks --export([start/2, stop/1, config_spec/0]). +-export([start/2, stop/1, hooks/1, config_spec/0]). %% API --export([try_publish/5, push_event/2]). +-export([try_publish/5]). + +%% hook handlers +-export([push_event/3]). -ignore_xref([behaviour_info/1, try_publish/5]). @@ -48,8 +50,7 @@ start(HostType, Opts) -> start_pool(HostType, Opts), ok. --spec start_pool(mongooseim:host_type(), gen_mod:module_opts()) -> - term(). +-spec start_pool(mongooseim:host_type(), gen_mod:module_opts()) -> term(). start_pool(HostType, Opts) -> {ok, _} = mongoose_wpool:start(generic, HostType, pusher_sns, pool_opts(Opts)). @@ -67,6 +68,10 @@ stop(HostType) -> mongoose_wpool:stop(generic, HostType, pusher_sns), ok. +-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list(). +hooks(HostType) -> + [{push_event, HostType, fun ?MODULE:push_event/3, #{}, 50}]. + -spec config_spec() -> mongoose_config_spec:config_section(). config_spec() -> #section{ @@ -96,14 +101,18 @@ config_spec() -> } }. -push_event(Acc, #user_status_event{jid = UserJID, status = Status}) -> - user_presence_changed(mongoose_acc:host_type(Acc), UserJID, Status == online), - Acc; -push_event(Acc, #chat_event{direction = in, from = From, to = To, packet = Packet}) -> - handle_packet(mongoose_acc:host_type(Acc), From, To, Packet), - Acc; -push_event(Acc, _) -> - Acc. +-spec push_event(mod_event_pusher:push_event_acc(), mod_event_pusher:push_event_params(), + gen_hook:extra()) -> {ok, mod_event_pusher:push_event_acc()}. +push_event(HookAcc, #{event := #user_status_event{jid = UserJID, status = Status}}, + #{host_type := HostType}) -> + user_presence_changed(HostType, UserJID, Status == online), + {ok, HookAcc}; +push_event(HookAcc, #{event := #chat_event{direction = in, from = From, to = To, packet = Packet}}, + #{host_type := HostType}) -> + handle_packet(HostType, From, To, Packet), + {ok, HookAcc}; +push_event(HookAcc, _Params, _Extra) -> + {ok, HookAcc}. %%%=================================================================== %%% Internal functions diff --git a/src/gen_hook.erl b/src/gen_hook.erl index 06a664a884..bb2edfd7e2 100644 --- a/src/gen_hook.erl +++ b/src/gen_hook.erl @@ -21,7 +21,7 @@ %% exported for unit tests only -export([error_running_hook/5]). --ignore_xref([start_link/0, add_handlers/1, delete_handlers/1]). +-ignore_xref([start_link/0, delete_handler/5]). -include("safely.hrl"). -include("mongoose.hrl"). diff --git a/src/hooks/mongoose_hooks.erl b/src/hooks/mongoose_hooks.erl index 438d3fdccd..f232222cf1 100644 --- a/src/hooks/mongoose_hooks.erl +++ b/src/hooks/mongoose_hooks.erl @@ -144,6 +144,8 @@ node_cleanup/1, node_cleanup_for_host_type/2]). +-export([push_event/2]). + -ignore_xref([remove_domain/2]). -ignore_xref([mam_archive_sync/1, mam_muc_archive_sync/1]). @@ -1376,6 +1378,11 @@ mod_global_distrib_known_recipient(GlobalHost, From, To, LocalHost) -> mod_global_distrib_unknown_recipient(GlobalHost, Info) -> run_hook_for_host_type(mod_global_distrib_unknown_recipient, GlobalHost, Info, #{}). +%%% @doc The `push_event' hook is called when `mod_event_pusher' publishes an event. +-spec push_event(mongoose_acc:t(), mod_event_pusher:event()) -> mod_event_pusher:push_event_acc(). +push_event(Acc, Event) -> + HostType = mongoose_acc:host_type(Acc), + run_hook_for_host_type(push_event, HostType, #{acc => Acc, metadata => #{}}, #{event => Event}). %%%---------------------------------------------------------------------- %%% Internal functions diff --git a/test/event_pusher_sns_SUITE.erl b/test/event_pusher_sns_SUITE.erl index 8c2d4735e8..5285498b56 100644 --- a/test/event_pusher_sns_SUITE.erl +++ b/test/event_pusher_sns_SUITE.erl @@ -134,20 +134,23 @@ send_packet_callback(Config, Type, Body) -> Packet = message(Config, Type, Body), Sender = ?config(sender, Config), Recipient = ?config(recipient, Config), - mod_event_pusher_sns:push_event(mongoose_acc:new(?ACC_PARAMS), - #chat_event{type = chat, direction = in, - from = Sender, to = Recipient, - packet = Packet}). + push_event(#chat_event{type = chat, direction = in, + from = Sender, to = Recipient, + packet = Packet}). user_present_callback(Config) -> Jid = ?config(sender, Config), - mod_event_pusher_sns:push_event(mongoose_acc:new(?ACC_PARAMS), - #user_status_event{jid = Jid, status = online}). + push_event(#user_status_event{jid = Jid, status = online}). user_not_present_callback(Config) -> Jid = ?config(sender, Config), - mod_event_pusher_sns:push_event(mongoose_acc:new(?ACC_PARAMS), - #user_status_event{jid = Jid, status = offline}). + push_event(#user_status_event{jid = Jid, status = offline}). + +push_event(Event) -> + HookAcc = #{acc => mongoose_acc:new(?ACC_PARAMS), metadata => #{}}, + HookParams = #{event => Event}, + HookExtra = #{host_type => host_type()}, + mod_event_pusher_sns:push_event(HookAcc, HookParams, HookExtra). %% Helpers