Skip to content

Commit 0f3e85d

Browse files
authored
Merge pull request #545 from rabbitmq/expose-ra-counters
Expose ra counters, remove ra_metrics
2 parents e51e070 + 3a6bff4 commit 0f3e85d

19 files changed

+52
-80
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ ESCRIPT_EMU_ARGS = -noinput -setcookie ra_fifo_cli
1010

1111
dep_gen_batch_server = hex 0.8.9
1212
dep_aten = hex 0.6.0
13-
dep_seshat = hex 0.6.0
13+
dep_seshat = hex 1.0.0
1414
DEPS = aten gen_batch_server seshat
1515

1616
TEST_DEPS = proper meck eunit_formatters inet_tcp_proxy

README.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ which algorithms such as Raft significantly benefit from.
5353

5454
This library was primarily developed as the foundation of a replication layer for
5555
[quorum queues](https://rabbitmq.com/quorum-queues.html) in RabbitMQ, and today
56-
also powers [RabbitMQ streams](https://rabbitmq.com/streams.html) and [Khepri](https://github.com/rabbitmq/khepri).
56+
also powers [RabbitMQ streams](https://rabbitmq.com/streams.html) and [Khepri](https://github.com/rabbitmq/khepri).
5757

5858
The design it aims to replace uses
5959
a variant of [Chain Based Replication](https://www.cs.cornell.edu/home/rvr/papers/OSDI04.pdf)
@@ -405,9 +405,14 @@ is available in a separate repository.
405405
</tr>
406406
<tr>
407407
<td>metrics_key</td>
408-
<td>Metrics key. The key used to write metrics into the ra_metrics table.</td>
408+
<td>DEPRECATED: Metrics key. The key was used to write metrics into the ra_metrics table. Metrics are now in Seshat.</td>
409409
<td>Atom</td>
410410
</tr>
411+
<tr>
412+
<td>metrics_labels</td>
413+
<td>Metrics labels. Labels that are later returned with metrics for this server (eg. `#{vhost => ..., queue => ...}` for quorum queues).</td>
414+
<td>Map</td>
415+
</tr>
411416
<tr>
412417
<td>low_priority_commands_flush_size</td>
413418
<td>

rebar.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{deps, [
22
{gen_batch_server, "0.8.9"},
33
{aten, "0.6.0"},
4-
{seshat, "0.6.0"}
4+
{seshat, "1.0.0"}
55
]}.
66

77
{profiles,

rebar.lock

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
{"1.2.0",
22
[{<<"aten">>,{pkg,<<"aten">>,<<"0.6.0">>},0},
33
{<<"gen_batch_server">>,{pkg,<<"gen_batch_server">>,<<"0.8.9">>},0},
4-
{<<"seshat">>,{pkg,<<"seshat">>,<<"0.6.0">>},0}]}.
4+
{<<"seshat">>,{pkg,<<"seshat">>,<<"1.0.0">>},0}]}.
55
[
66
{pkg_hash,[
77
{<<"aten">>, <<"7A57B275A6DAF515AC3683FB9853E280B4D0DCDD74292FD66AC4A01C8694F8C7">>},
88
{<<"gen_batch_server">>, <<"1C6BC0F530BF8C17E8B4ACC20C2CC369FFA5BEE2B46DE01E21410745F24B1BC9">>},
9-
{<<"seshat">>, <<"3172EB1D7A2A4F66108CD6933A4E465AFF80F84AA90ED83F047B92F636123CCD">>}]},
9+
{<<"seshat">>, <<"7CFD882C6BD0DA4B8C91AA339351727FE982EF7260A7B4E5B2A3E6551BE30E52">>}]},
1010
{pkg_hash_ext,[
1111
{<<"aten">>, <<"5F39A164206AE3F211EF5880B1F7819415686436E3229D30B6A058564FBAA168">>},
1212
{<<"gen_batch_server">>, <<"C8581FE4A4B6BCCF91E53CE6A8C7E6C27C8C591BAB5408B160166463F5579C22">>},
13-
{<<"seshat">>, <<"7CEF700F92831DD7CAE6A6DD223CCC55AC88ECCE0631EE9AB0F2B5FB70E79B90">>}]}
13+
{<<"seshat">>, <<"95018806AB04E10563157532F01C523F202C50DEC0E72C6C999064CE829CFF24">>}]}
1414
].

src/ra.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@
389389
"The last index of the log."},
390390
{last_written_index, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, counter,
391391
"The last fully written and fsynced index of the log."},
392-
{commit_latency, ?C_RA_SVR_METRIC_COMMIT_LATENCY, gauge,
392+
{commit_latency, ?C_RA_SVR_METRIC_COMMIT_LATENCY, {gauge, time_ms},
393393
"Approximate time taken from an entry being written to the log until it is committed."},
394394
{term, ?C_RA_SVR_METRIC_TERM, counter, "The current term."},
395395
{checkpoint_index, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, counter,

src/ra_bench.erl

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,8 @@ spawn_client(Parent, Leader, Num, DataSize, Pipe, Counter) ->
199199
end
200200
end).
201201

202-
print_metrics(Name) ->
202+
print_metrics(_Name) ->
203203
io:format("Node: ~w~n", [node()]),
204-
io:format("metrics ~p~n", [ets:lookup(ra_metrics, Name)]),
205204
io:format("counters ~p~n", [ra_counters:overview()]).
206205

207206

@@ -220,4 +219,3 @@ print_metrics(Name) ->
220219
% GzFile = Base ++ ".gz.*",
221220
% lg_callgrind:profile_many(GzFile, Base ++ ".out",#{}),
222221
% ok.
223-

src/ra_counters.erl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
-export([
1111
init/0,
1212
new/2,
13+
new/3,
1314
fetch/1,
1415
overview/0,
1516
overview/1,
@@ -32,6 +33,9 @@ init() ->
3233
new(Name, FieldsSpec) ->
3334
seshat:new(ra, Name, FieldsSpec).
3435

36+
new(Name, FieldsSpec, MetricLabels) ->
37+
seshat:new(ra, Name, FieldsSpec, MetricLabels).
38+
3539
-spec fetch(name()) -> undefined | counters:counters_ref().
3640
fetch(Name) ->
3741
seshat:fetch(ra, Name).
@@ -42,11 +46,11 @@ delete(Name) ->
4246

4347
-spec overview() -> #{name() => #{atom() => non_neg_integer()}}.
4448
overview() ->
45-
seshat:overview(ra).
49+
seshat:counters(ra).
4650

47-
-spec overview(name()) -> #{atom() => non_neg_integer()}.
51+
-spec overview(name()) -> #{atom() => non_neg_integer()} | undefined.
4852
overview(Name) ->
49-
seshat:overview(ra, Name).
53+
seshat:counters(ra, Name).
5054

5155
-spec counters(name(), [atom()]) ->
5256
#{atom() => non_neg_integer()} | undefined.

src/ra_log_segment_writer.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ init([#{data_dir := DataDir,
115115
name := SegWriterName,
116116
system := System} = Conf]) ->
117117
process_flag(trap_exit, true),
118-
CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS),
118+
CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS, #{ra_system => System, module => ?MODULE}),
119119
SegmentConf = maps:get(segment_conf, Conf, #{}),
120120
maybe_upgrade_segment_file_names(System, DataDir),
121121
{ok, #state{system = System,
@@ -554,4 +554,3 @@ maybe_upgrade_segment_file_names(System, DataDir) ->
554554
true ->
555555
ok
556556
end.
557-

src/ra_log_sup.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ init([#{data_dir := DataDir,
5656

5757

5858
make_wal_conf(#{data_dir := DataDir,
59-
name := _System,
59+
name := System,
6060
names := #{wal := WalName,
6161
segment_writer := SegWriterName} = Names} = Cfg) ->
6262
WalDir = case Cfg of
@@ -78,6 +78,7 @@ make_wal_conf(#{data_dir := DataDir,
7878
?MIN_BIN_VHEAP_SIZE),
7979
MinHeapSize = maps:get(wal_min_heap_size, Cfg, ?MIN_HEAP_SIZE),
8080
#{name => WalName,
81+
system => System,
8182
names => Names,
8283
dir => WalDir,
8384
segment_writer => SegWriterName,

src/ra_log_wal.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@
132132

133133
-type state() :: #state{}.
134134
-type wal_conf() :: #{name := atom(), %% the name to register the wal as
135+
system := atom(),
135136
names := ra_system:names(),
136137
dir := file:filename_all(),
137138
max_size_bytes => non_neg_integer(),
@@ -254,7 +255,7 @@ start_link(#{name := Name} = Config)
254255
-spec init(wal_conf()) ->
255256
{ok, state()} |
256257
{stop, wal_checksum_validation_failure} | {stop, term()}.
257-
init(#{dir := Dir} = Conf0) ->
258+
init(#{dir := Dir, system := System} = Conf0) ->
258259
#{max_size_bytes := MaxWalSize,
259260
max_entries := MaxEntries,
260261
recovery_chunk_size := RecoveryChunkSize,
@@ -278,7 +279,9 @@ init(#{dir := Dir} = Conf0) ->
278279
process_flag(message_queue_data, off_heap),
279280
process_flag(min_bin_vheap_size, MinBinVheapSize),
280281
process_flag(min_heap_size, MinHeapSize),
281-
CRef = ra_counters:new(WalName, ?COUNTER_FIELDS),
282+
CRef = ra_counters:new(WalName,
283+
?COUNTER_FIELDS,
284+
#{ra_system => System, module => ?MODULE}),
282285
Conf = #conf{dir = Dir,
283286
segment_writer = SegWriter,
284287
compute_checksums = ComputeChecksums,

0 commit comments

Comments
 (0)