Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions big_tests/tests/mod_event_pusher_filter.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
-module(mod_event_pusher_filter).
-moduledoc """
An example filter and metadata provider for mod_event_pusher

It performs two actions:
1. Filters out any events other than 'user_status_event'.
2. Adds the following event metadata:
- timestamp in milliseconds (taken from mongoose_acc)
- number of user's active sessions
The metadata can be injected into the event published by the configured backends
(currently only 'rabbit' supports this).
""".

-behaviour(gen_mod).

%% gen_mod callbacks
-export([start/2, stop/1, hooks/1]).

%% hook handlers
-export([push_event/3]).

-include_lib("../../include/mod_event_pusher_events.hrl").

%% gen_mod callbacks

-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
start(_HostType, _Opts) ->
ok.

-spec stop(mongooseim:host_type()) -> ok.
stop(_HostType) ->
ok.

-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
hooks(HostType) ->
[{push_event, HostType, fun ?MODULE:push_event/3, #{}, 10}]. % needs to run before other handlers

%% hook handlers

-doc "For user status events, add timestamp and session count. Filter out other events.".
-spec push_event(mod_event_pusher:push_event_acc(), mod_event_pusher:push_event_params(),
gen_hook:extra()) -> gen_hook:hook_fn_ret(mod_event_pusher:push_event_acc()).
push_event(HookAcc = #{acc := Acc}, #{event := #user_status_event{jid = JID}}, _Extra) ->
#{metadata := Metadata} = HookAcc,
NewMetadata = Metadata#{timestamp => mongoose_acc:timestamp(Acc),
session_count => count_user_sessions(JID)},
{ok, HookAcc#{metadata := NewMetadata}};
push_event(HookAcc, _Params, _Extra) ->
{stop, HookAcc}.

%% helpers

-spec count_user_sessions(jid:jid()) -> non_neg_integer().
count_user_sessions(JID) ->
length(ejabberd_sm:get_user_present_resources(JID)).
50 changes: 36 additions & 14 deletions big_tests/tests/mod_event_pusher_rabbit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ all() ->
{group, only_presence_status_publish},
{group, chat_message_publish},
{group, group_chat_message_publish},
{group, instrumentation}
{group, instrumentation},
{group, filter_and_metadata}
].

groups() ->
Expand All @@ -61,7 +62,8 @@ groups() ->
{only_presence_status_publish, [], only_presence_status_publish_tests()},
{chat_message_publish, [], chat_message_publish_tests()},
{group_chat_message_publish, [], group_chat_message_publish_tests()},
{instrumentation, [], instrumentation_tests()}].
{instrumentation, [], instrumentation_tests()},
{filter_and_metadata, [], filter_and_metadata_tests()}].

pool_startup_tests() ->
[rabbit_pool_starts_with_default_config].
Expand Down Expand Up @@ -96,6 +98,10 @@ instrumentation_tests() ->
[connections_events_are_executed,
messages_published_events_are_executed].

filter_and_metadata_tests() ->
[messages_published_events_are_not_executed,
presence_messages_are_properly_formatted_with_metadata].

suite() ->
escalus:suite().

Expand All @@ -112,6 +118,7 @@ init_per_suite(Config) ->
start_rabbit_wpool(domain()),
{ok, _} = application:ensure_all_started(amqp_client),
muc_helper:load_muc(),
mongoose_helper:inject_module(mod_event_pusher_filter),
escalus:init_per_suite(Config);
false ->
{skip, "RabbitMQ server is not available on default port."}
Expand All @@ -132,6 +139,9 @@ init_per_group(GroupName, Config0) ->

required_modules(pool_startup) ->
[{mod_event_pusher, stopped}];
required_modules(filter_and_metadata = GroupName) ->
[{mod_event_pusher_filter, #{}},
{mod_event_pusher, #{rabbit => mod_event_pusher_rabbit_opts(GroupName)}}];
required_modules(GroupName) ->
[{mod_event_pusher, #{rabbit => mod_event_pusher_rabbit_opts(GroupName)}}].

Expand Down Expand Up @@ -218,7 +228,7 @@ only_presence_exchange_is_created_on_module_startup(Config) ->
?assertNot(is_exchange_present(Connection, {?GROUP_CHAT_MSG_EXCHANGE, ExCustomType})).

%%--------------------------------------------------------------------
%% GROUP (only_)presence_status_publish
%% GROUP (only_)presence_status_publish, filter_and_metadata
%%--------------------------------------------------------------------

connected_users_push_presence_events_when_change_status(Config) ->
Expand All @@ -236,20 +246,31 @@ connected_users_push_presence_events_when_change_status(Config) ->
end).

presence_messages_are_properly_formatted(Config) ->
escalus:story(
escalus:fresh_story_with_config(
Config, [{bob, 1}], fun presence_messages_are_properly_formatted_story/2).

presence_messages_are_properly_formatted_with_metadata(Config) ->
escalus:fresh_story_with_config(
Config, [{bob, 1}],
fun(Bob) ->
%% GIVEN
BobJID = client_lower_short_jid(Bob),
BobFullJID = client_lower_full_jid(Bob),
listen_to_presence_events_from_rabbit([BobJID], Config),
%% WHEN user logout
escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)),
%% THEN receive message
?assertMatch(#{<<"user_id">> := BobFullJID, <<"present">> := false},
get_decoded_message_from_rabbit(BobJID))
fun(_, Bob) ->
TS = rpc(mim(), erlang, system_time, [microsecond]),
DecodedMessage = presence_messages_are_properly_formatted_story(Config, Bob),
#{<<"timestamp">> := T, <<"session_count">> := 0} = DecodedMessage,
?assert(is_integer(T) andalso T > TS)
end).

presence_messages_are_properly_formatted_story(Config, Bob) ->
%% GIVEN
BobJID = client_lower_short_jid(Bob),
BobFullJID = client_lower_full_jid(Bob),
listen_to_presence_events_from_rabbit([BobJID], Config),
%% WHEN user logout
escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)),
%% THEN receive message
DecodedMessage = get_decoded_message_from_rabbit(BobJID),
?assertMatch(#{<<"user_id">> := BobFullJID, <<"present">> := false}, DecodedMessage),
DecodedMessage.

messages_published_events_are_not_executed(Config) ->
escalus:story(
Config, [{bob, 1}, {alice, 1}],
Expand Down Expand Up @@ -702,6 +723,7 @@ send_presence_stanza(User, NumOfMsgs) ->
get_decoded_message_from_rabbit(RoutingKey) ->
receive
{#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Msg}} ->
ct:log("Decoded rabbit message, rk=~p~nmessage:~ts", [RoutingKey, Msg]),
jiffy:decode(Msg, [return_maps])
after
5000 -> ct:fail("Timeout when decoding message, rk=~p", [RoutingKey])
Expand Down
4 changes: 4 additions & 0 deletions doc/migrations/6.4.0_6.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ Instead of always sending all notifications, it only enables the ones with a rel

Make sure you have all the necessary sections present in the configuration file before upgrading.

### Custom backends for `mod_event_pusher`

If you have a custom backend implemented for `mod_event_pusher`, you need to update it to handle the `push_event` hook instead of implementing the `mod_event_pusher` behaviour (which no longer exists).

### Deprecation

MSSQL backend is deprecated and will be removed in the next release.
42 changes: 32 additions & 10 deletions doc/modules/mod_event_pusher.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
## Module Description

This module is a generic interface for event-pushing backends.
It defines a single callback, `push_event/2` that forwards the event to all registered backends.
Each backend decides how and if to handle the event in its `push_event/2` implementation.

This module is a generic interface for pushing **events** to the configured **backends**.
The events include presence updates and incoming/outgoing messages.
Currently supported backends include [http], [push], [rabbit] and [sns].
Refer to their specific documentation to learn more about their functions and configuration options.

### How it works

The events are standardized as records that can be found in the `mod_event_pusher_events.hrl` file.
Common events like user presence changes (offline and online), chat and groupchat messages (incoming
and outgoing) are already handled in the `mod_event_pusher_hook_translator` module, which is a proxy between various hooks and the `push_event/2` handler.

!!! Warning
This module does not support [dynamic domains](../configuration/general.md#generalhost_types).

Expand Down Expand Up @@ -46,6 +38,36 @@ The `[modules.mod_event_pusher]` section itself is omitted - this is allowed in
[modules.mod_event_pusher.rabbit]
```

## How it works

The events are standardized as records that can be found in the `mod_event_pusher_events.hrl` file.
Common events like user presence changes (offline and online), chat and groupchat messages (incoming and outgoing) are handled in the `mod_event_pusher_hook_translator` module.
Each event has a corresponding [hook](../developers-guide/Hooks-and-handlers.md), e.g. `user_send_message` is run when a user sends a message.
`mod_event_pusher_hook_translator` has a handler function for each supported hook.

Handling an event includes the following steps:

1. The event hook is executed, and the corresponding handler function in `mod_event_pusher_hook_translator` is called.
1. The handler function calls `mod_event_pusher:push_event(Acc, Event)`.
1. `mod_event_pusher:push_event/2` runs another hook called `push_event`.
1. All configured backend modules have handlers for the `push_event` hook, and all these handlers are called.

### Custom event processing

By implementing your own module handling the `push_event` hook, you can:

- Push the events to a new service, such as a message queue or a database.
- Filter the events by returning `{ok, ...}` to keep an event, or `{stop, ...}` to drop it.
- Add a map with **metadata** to the events. The keys need to be atoms, and the values need to be atoms, binaries or numbers.

There is an example [mod_event_pusher_filter.erl](https://github.com/esl/MongooseIM/blob/master/big_tests/tests/mod_event_pusher_filter.erl) module, demonstrating how to filter the events and append additional metadata.

!!! Note
Execution order of handlers depends on their priorities. In particular, filtering events or adding metadata needs to happend before pushing notifications to external services. The example handler has the priority value of 10, while backends have the priority of 50.

!!! Warning
Currently only the [rabbit](mod_event_pusher_rabbit.md#additional-metadata) backend supports adding metadata to the published notifications.

[http]: ./mod_event_pusher_http.md
[push]: ./mod_event_pusher_push.md
[rabbit]: ./mod_event_pusher_rabbit.md
Expand Down
4 changes: 4 additions & 0 deletions doc/modules/mod_event_pusher_rabbit.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ and for "received" events:
}
```

## Additional metadata

If you decide to [customize the events](mod_event_pusher.md#event-customization) with additional metadata, the additional key-value pairs will be added directly to the JSON object. You can override existing properties, but it is counter-intuitive and thus not recommended.

## Metrics

The module provides some metrics related to RabbitMQ connections and messages
Expand Down
16 changes: 6 additions & 10 deletions src/event_pusher/mod_event_pusher.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,26 @@

-type backend() :: http | push | rabbit | sns.
-type event() :: #user_status_event{} | #chat_event{} | #unack_msg_event{}.
-export_type([event/0]).
-type metadata() :: #{atom() => atom() | binary() | number()}.
-type push_event_params() :: #{event := event()}.
-type push_event_acc() :: #{acc := mongoose_acc:t(), metadata := metadata()}.
-export_type([event/0, metadata/0, push_event_acc/0, push_event_params/0]).

-export([deps/2, start/2, stop/1, config_spec/0, push_event/2]).

-export([config_metrics/1]).

-ignore_xref([behaviour_info/1]).

%%--------------------------------------------------------------------
%% Callbacks
%%--------------------------------------------------------------------

-callback push_event(mongoose_acc:t(), event()) -> mongoose_acc:t().

%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------

%% @doc Pushes the event to each backend registered with the event_pusher.
-spec push_event(mongoose_acc:t(), event()) -> mongoose_acc:t().
push_event(Acc, Event) ->
HostType = mongoose_acc:host_type(Acc),
Backends = maps:keys(gen_mod:get_loaded_module_opts(HostType, ?MODULE)),
lists:foldl(fun(B, Acc0) -> (backend_module(B)):push_event(Acc0, Event) end, Acc, Backends).
#{acc := NewAcc} = mongoose_hooks:push_event(Acc, Event),
NewAcc.

%%--------------------------------------------------------------------
%% gen_mod API
Expand Down
23 changes: 16 additions & 7 deletions src/event_pusher/mod_event_pusher_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
-ignore_xref([behaviour_info/1]).

-behaviour(gen_mod).
-behaviour(mod_event_pusher).
-behaviour(mongoose_module_metrics).

-callback should_make_req(Acc :: mongoose_acc:t(),
Expand Down Expand Up @@ -39,7 +38,10 @@
-include("jlib.hrl").

%% API
-export([start/2, stop/1, config_spec/0, push_event/2, instrumentation/1]).
-export([start/2, stop/1, hooks/1, config_spec/0, instrumentation/1]).

%% hook handlers
-export([push_event/3]).

%% config spec callbacks
-export([fix_path/1]).
Expand All @@ -58,6 +60,10 @@ start(_HostType, _Opts) ->
stop(_HostType) ->
ok.

-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
hooks(HostType) ->
[{push_event, HostType, fun ?MODULE:push_event/3, #{}, 50}].

-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
#section{items = #{<<"handlers">> => #list{items = handler_config_spec(),
Expand All @@ -81,13 +87,16 @@ instrumentation(HostType) ->
[{?SENT_METRIC, #{host_type => HostType},
#{metrics => #{count => spiral, response_time => histogram, failure_count => spiral}}}].

push_event(Acc, #chat_event{direction = Dir, from = From, to = To, packet = Packet}) ->
HostType = mongoose_acc:host_type(Acc),
-spec push_event(mod_event_pusher:push_event_acc(), mod_event_pusher:push_event_params(),
gen_hook:extra()) -> {ok, mod_event_pusher:push_event_acc()}.
push_event(HookAcc, #{event := #chat_event{direction = Dir, from = From, to = To, packet = Packet}},
#{host_type := HostType}) ->
#{acc := Acc} = HookAcc,
lists:map(fun(Opts) -> push_event(Acc, Dir, From, To, Packet, Opts) end,
gen_mod:get_module_opt(HostType, ?MODULE, handlers)),
Acc;
push_event(Acc, _Event) ->
Acc.
{ok, HookAcc};
push_event(HookAcc, _Params, _Extra) ->
{ok, HookAcc}.

push_event(Acc, Dir, From, To, Packet, Opts = #{callback_module := Mod}) ->
Body = exml_query:path(Packet, [{element, <<"body">>}, cdata], <<>>),
Expand Down
Loading