Skip to content

Commit e50f3ac

Browse files
committed
Time-based since parameter for _changes
Use the new time-seq feature to stream changes from before a point in time. This can be used for backups or any case when then it helps to associate a range of sequence updates to a time interval. The time-seq exponential decaying interval rules apply: the further back in time, the less accurate the time intervals will be. The API change consists in making `since` accept a standard time value and streaming the changes started right before that time value based on the known time-seq intervals. The time format of the since parameter is YYYY-MM-DDTHH:MM:SSZ. It's valid as either an ISO 8601 or an RFC 3339 format. From API design point of view this feature can be regarded as an extension to the other `since` values like `now` or `0`. Implementation-wise the change is treated similarly how we treat the `now` special value: before the changes request starts, we translate the time value to a proper `since` sequence. After that, we continue on with that regular sequence as if nothing special happened. Consequently, the shape of the emitted result is exactly the same as any previous change sequences. This is an extra "plus" for consistency and compatibility. To get a feel for the feature, I created a small db and updated it every few hours during the day: `http get $DB/db/_time_seq` ``` { "00000000-ffffffff": { "[email protected]": [ ["2025-07-21T01:00:00Z", 15], ["2025-07-21T05:00:00Z", 2] ["2025-07-21T19:00:00Z", 9], ["2025-07-21T20:00:00Z", 5], ["2025-07-21T21:00:00Z", 70] ["2025-07-21T22:00:00Z", 10] ] } } ``` Change feed with `since=2025-07-21T22:00:00Z` will return documents changed since that last hour only: ``` % http get $DB/db/_changes'?since=2025-07-21T22:00:00Z' | jq -r '.results[].id' 101 102 103 104 105 106 107 108 109 110 ``` Even the somewhat obscure `since_seq` replication parameter should work, so we can replicate from a particular point in time: ``` % http post 'http://adm:pass@localhost:15984/_replicate' \ source:='"http://adm:pass@localhost:15984/db"' \ target:='"http://adm:pass@localhost:15984/tgt"' \ since_seq:='"2025-07-21T22:00:00Z"' { "history": [ { "bulk_get_attempts": 10, "bulk_get_docs": 10, "doc_write_failures": 0, "docs_read": 10, "docs_written": 10, "end_last_seq": "111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews", "end_time": "Mon, 21 Jul 2025 22:11:59 GMT", "missing_checked": 10, "missing_found": 10, "recorded_seq": "111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews", "session_id": "19252b97e34088aeaaa6cde6694a419f", "start_last_seq": "2025-07-21T22:00:00Z", "start_time": "Mon, 21 Jul 2025 22:11:55 GMT" } ], "ok": true, "replication_id_version": 4, "session_id": "19252b97e34088aeaaa6cde6694a419f", "source_last_seq": "111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews" } ``` The target db now has only documents written in that last hour: ``` % http $DB/tgt/_all_docs | jq -r '.rows[].id' 101 102 103 104 105 106 107 108 109 110 ```
1 parent a74b514 commit e50f3ac

File tree

3 files changed

+115
-14
lines changed

3 files changed

+115
-14
lines changed

src/chttpd/src/chttpd_db.erl

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -872,18 +872,17 @@ db_req(#httpd{path_parts = [_, <<"_purged_infos_limit">>]} = Req, _Db) ->
872872
send_method_not_allowed(Req, "GET,PUT");
873873
db_req(#httpd{method = 'GET', path_parts = [_, <<"_time_seq">>]} = Req, Db) ->
874874
Options = [{user_ctx, Req#httpd.user_ctx}],
875-
case fabric:get_time_seq(Db, Options) of
876-
{ok, #{} = RangeNodeToTSeq} ->
875+
case fabric:time_seq_histogram(Db, Options) of
876+
{ok, #{} = RangeNodeToHist} ->
877877
Props = maps:fold(
878878
fun([B, E], #{} = ByNode, Acc) ->
879879
Range = mem3_util:range_to_hex([B, E]),
880-
MapF = fun(_, TSeq) -> couch_time_seq:histogram(TSeq) end,
881-
[{Range, maps:map(MapF, ByNode)} | Acc]
880+
[{Range, ByNode} | Acc]
882881
end,
883882
[],
884-
RangeNodeToTSeq
883+
RangeNodeToHist
885884
),
886-
send_json(Req, {lists:sort(Props)});
885+
send_json(Req, {[{<<"time_seq">>, {lists:sort(Props)}}]});
887886
Error ->
888887
throw(Error)
889888
end;

src/chttpd/test/eunit/chttpd_changes_test.erl

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ changes_test_() ->
8383
?TDEF(t_selector_filter),
8484
?TDEF(t_design_filter),
8585
?TDEF(t_docs_id_filter),
86-
?TDEF(t_docs_id_filter_over_limit)
86+
?TDEF(t_docs_id_filter_over_limit),
87+
?TDEF(t_time_since)
8788
])
8889
}.
8990

@@ -109,7 +110,8 @@ changes_q8_test_() ->
109110
?TDEF(t_reverse_limit_one_q8),
110111
?TDEF(t_selector_filter),
111112
?TDEF(t_design_filter),
112-
?TDEF(t_docs_id_filter_q8)
113+
?TDEF(t_docs_id_filter_q8),
114+
?TDEF(t_time_since_q8)
113115
])
114116
}.
115117

@@ -493,6 +495,97 @@ t_docs_id_filter_over_limit({_, DbUrl}) ->
493495
Rows
494496
).
495497

498+
t_time_since({_, DbUrl}) ->
499+
% Far into the future, we should get nothing
500+
Params1 = "?since=3000-02-03T04:05:00Z",
501+
{Seq1, Pending1, Rows1} = changes(DbUrl, Params1),
502+
?assertEqual(8, Seq1),
503+
?assertEqual(0, Pending1),
504+
?assertEqual([], Rows1),
505+
506+
% Before the feature is released, should same as since=0
507+
Params2 = "?since=2025-01-01T04:05:00Z",
508+
Res2 = {Seq2, Pending2, Rows2} = changes(DbUrl, Params2),
509+
?assertEqual(8, Seq2),
510+
?assertEqual(0, Pending2),
511+
?assertEqual(
512+
[
513+
{6, {?DOC1, <<"2-c">>}, ?LEAFREV},
514+
{7, {?DOC3, <<"2-b">>}, ?DELETED},
515+
{8, {?DDOC2, <<"2-c">>}, ?LEAFREV}
516+
],
517+
Rows2
518+
),
519+
% Verify we're getting the same results as since=0
520+
?assertEqual(Res2, changes(DbUrl, "?since=0")),
521+
522+
% Invalid time
523+
Params3 = "?since=2025-01-01Txx:yy:00Z",
524+
{InvalCode, InvalRes} = reqraw(get, DbUrl ++ "/_changes" ++ Params3),
525+
?assertEqual(400, InvalCode),
526+
?assertMatch(
527+
#{
528+
<<"error">> := <<"bad_request">>,
529+
<<"reason">> := <<"invalid_time_format">>
530+
},
531+
json(InvalRes)
532+
),
533+
534+
% Check we can get _time_seq
535+
{TSeqCode, TSeqRes} = reqraw(get, DbUrl ++ "/_time_seq"),
536+
?assertEqual(200, TSeqCode),
537+
Year = integer_to_binary(element(1, date())),
538+
Node = atom_to_binary(config:node_name()),
539+
?assertMatch(
540+
#{
541+
<<"time_seq">> := #{
542+
<<"00000000-ffffffff">> := #{
543+
Node := [[<<Year:4/binary, _/binary>>, 8]]
544+
}
545+
}
546+
},
547+
json(TSeqRes)
548+
),
549+
550+
% Reset the time seq info
551+
{TSeqResetCode, TSeqResetRes} = reqraw(delete, DbUrl ++ "/_time_seq"),
552+
?assertEqual(200, TSeqResetCode),
553+
?assertMatch(#{<<"ok">> := true}, json(TSeqResetRes)),
554+
555+
{TSeqCodeVerify, TSeqResVerify} = reqraw(get, DbUrl ++ "/_time_seq"),
556+
?assertEqual(200, TSeqCodeVerify),
557+
?assertMatch(
558+
#{<<"time_seq">> := #{<<"00000000-ffffffff">> := #{Node := []}}},
559+
json(TSeqResVerify)
560+
),
561+
562+
% Changes feeds still work after a reset
563+
?assertEqual(Res2, changes(DbUrl, Params2)).
564+
565+
t_time_since_q8({_, DbUrl}) ->
566+
% Far into the future, we should get nothing
567+
Params1 = "?since=3000-02-03T04:05:00Z",
568+
{Seq1, Pending1, Rows1} = changes(DbUrl, Params1),
569+
?assertEqual(8, Seq1),
570+
?assertEqual(0, Pending1),
571+
?assertEqual([], Rows1),
572+
573+
% Before the feature is released, should get everything
574+
Params2 = "?since=2025-01-01T04:05:00Z",
575+
{Seq2, Pending2, Rows2} = changes(DbUrl, Params2),
576+
{Seqs, Revs, _Deleted} = lists:unzip3(Rows2),
577+
?assertEqual(8, Seq2),
578+
?assertEqual(0, Pending2),
579+
?assertEqual(
580+
[
581+
{?DDOC2, <<"2-c">>},
582+
{?DOC1, <<"2-c">>},
583+
{?DOC3, <<"2-b">>}
584+
],
585+
lists:sort(Revs)
586+
),
587+
?assertEqual(Seqs, lists:sort(Seqs)).
588+
496589
t_js_filter({_, DbUrl}) ->
497590
DDocId = "_design/filters",
498591
FilterFun = <<"function(doc, req) {return (doc._id == 'doc3')}">>,

src/fabric/src/fabric_view_changes.erl

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727

2828
-import(fabric_db_update_listener, [wait_db_updated/1]).
2929

30+
-define(RFC3339_TIME, [_, _, _, _, $-, _, _, $-, _, _, $T, _, _, $:, _, _, $:, _, _, $Z]).
31+
3032
go(DbName, Feed, Options, Callback, Acc0) when
3133
Feed == "continuous" orelse
3234
Feed == "longpoll" orelse Feed == "eventsource"
3335
->
3436
Args = make_changes_args(Options),
3537
Since = get_start_seq(DbName, Args),
36-
case validate_start_seq(DbName, Since) of
38+
case validate_start_seq(Since) of
3739
ok ->
3840
{ok, Acc} = Callback(start, Acc0),
3941
{Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
@@ -69,7 +71,7 @@ go(DbName, Feed, Options, Callback, Acc0) when
6971
go(DbName, "normal", Options, Callback, Acc0) ->
7072
Args = make_changes_args(Options),
7173
Since = get_start_seq(DbName, Args),
72-
case validate_start_seq(DbName, Since) of
74+
case validate_start_seq(Since) of
7375
ok ->
7476
{ok, Acc} = Callback(start, Acc0),
7577
{ok, Collector} = send_changes(
@@ -369,6 +371,11 @@ get_start_seq(DbName, #changes_args{dir = Dir, since = Since}) when
369371
->
370372
{ok, Info} = fabric:get_db_info(DbName),
371373
couch_util:get_value(update_seq, Info);
374+
get_start_seq(DbName, #changes_args{dir = fwd, since = ?RFC3339_TIME = Since}) ->
375+
case fabric:time_seq_since(DbName, Since) of
376+
{ok, SinceSeq} -> SinceSeq;
377+
{error, Error} -> {error, Error}
378+
end;
372379
get_start_seq(_DbName, #changes_args{dir = fwd, since = Since}) ->
373380
Since.
374381

@@ -728,11 +735,11 @@ make_split_seq({Num, Uuid, Node}, RepCount) when RepCount > 1 ->
728735
make_split_seq(Seq, _) ->
729736
Seq.
730737

731-
validate_start_seq(_DbName, 0) ->
738+
validate_start_seq(0) ->
732739
ok;
733-
validate_start_seq(_DbName, "0") ->
740+
validate_start_seq("0") ->
734741
ok;
735-
validate_start_seq(_DbName, Seq) when is_list(Seq) orelse is_binary(Seq) ->
742+
validate_start_seq(Seq) when is_list(Seq) orelse is_binary(Seq) ->
736743
try
737744
Opaque = unpack_seq_regex_match(Seq),
738745
unpack_seq_decode_term(Opaque),
@@ -741,7 +748,9 @@ validate_start_seq(_DbName, Seq) when is_list(Seq) orelse is_binary(Seq) ->
741748
_:_ ->
742749
Reason = <<"Malformed sequence supplied in 'since' parameter.">>,
743750
{error, {bad_request, Reason}}
744-
end.
751+
end;
752+
validate_start_seq({error, Error}) ->
753+
{error, {bad_request, Error}}.
745754

746755
get_changes_epoch() ->
747756
case application:get_env(fabric, changes_epoch) of

0 commit comments

Comments
 (0)