186 changes: 142 additions & 44 deletions src/fabric/src/fabric_doc_update.erl
@@ -22,7 +22,10 @@
doc_count,
w,
grouped_docs,
reply
reply,
update_options,
leaders = [],
started = []
}).
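%% Fields added in this change: update_options carries the update_docs
%% options so the RPC cast can be deferred to start_worker/3; leaders is
%% the list of refs of the per-range leader workers; started lists the
%% refs of workers whose RPC has already been cast.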

go(_, [], _) ->
@@ -33,25 +36,25 @@ go(DbName, AllDocs0, Opts) ->
validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
Options = lists:delete(all_or_nothing, Opts),
GroupedDocs = lists:map(
fun({#shard{name = Name, node = Node} = Shard, Docs}) ->
Docs1 = untag_docs(Docs),
Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs1, Options]}),
{Shard#shard{ref = Ref}, Docs}
fun({#shard{} = Shard, Docs}) ->
{Shard#shard{ref = make_ref()}, Docs}
end,
group_docs_by_shard(DbName, AllDocs)
),
{Workers, _} = lists:unzip(GroupedDocs),
RexiMon = fabric_util:create_monitors(Workers),
W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
Acc0 = #acc{
update_options = Options,
waiting_count = length(Workers),
doc_count = length(AllDocs),
w = list_to_integer(W),
grouped_docs = GroupedDocs,
reply = dict:new()
},
Timeout = fabric_util:request_timeout(),
try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
Acc1 = start_leaders(Acc0),
try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc1, infinity, Timeout) of
{ok, {Health, Results}} when
Health =:= ok; Health =:= accepted; Health =:= error
->
@@ -72,61 +75,78 @@ go(DbName, AllDocs0, Opts) ->
rexi_monitor:stop(RexiMon)
end.
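%% How the staged rollout reads in this diff: refs are minted up front with
%% make_ref/0 so rexi monitors cover every worker before any RPC goes out;
%% start_leaders/1 then casts update_docs to one leader copy per shard
%% range, and the remaining copies are only cast from start_followers/2
%% once that leader has replied or died. A conflict reported by the leader
%% can therefore stop the followers from ever being asked (see
%% remove_conflicts/3 below).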

handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, #acc{} = Acc0) ->
handle_message({rexi_DOWN, _, {_, NodeRef}, _}, Worker, #acc{} = Acc0) ->
#acc{grouped_docs = GroupedDocs} = Acc0,
NewGrpDocs = [X || {#shard{node = N}, _} = X <- GroupedDocs, N =/= NodeRef],
skip_message(Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs});
Acc1 = Acc0#acc{waiting_count = length(NewGrpDocs), grouped_docs = NewGrpDocs},
Acc2 = start_followers(Worker, Acc1),
skip_message(Acc2);
handle_message({rexi_EXIT, _}, Worker, #acc{} = Acc0) ->
#acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
Acc2 = start_followers(Worker, Acc1),
skip_message(Acc2);
handle_message({error, all_dbs_active}, Worker, #acc{} = Acc0) ->
% treat it like rexi_EXIT, in the hope that at least one copy will return successfully
#acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
Acc2 = start_followers(Worker, Acc1),
skip_message(Acc2);
handle_message(internal_server_error, Worker, #acc{} = Acc0) ->
% happens when we fail to load validation functions in an RPC worker
#acc{waiting_count = WC, grouped_docs = GrpDocs} = Acc0,
NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
skip_message(Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs});
Acc1 = Acc0#acc{waiting_count = WC - 1, grouped_docs = NewGrpDocs},
Acc2 = start_followers(Worker, Acc1),
skip_message(Acc2);
handle_message(attachment_chunk_received, _Worker, #acc{} = Acc0) ->
{ok, Acc0};
handle_message({ok, Replies}, Worker, #acc{} = Acc0) ->
#acc{
waiting_count = WaitingCount,
doc_count = DocCount,
w = W,
grouped_docs = GroupedDocs,
reply = DocReplyDict0
} = Acc0,
{value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
case {WaitingCount, dict:size(DocReplyDict)} of
{1, _} ->
% last message has arrived, we need to conclude things
{Health, W, Reply} = dict:fold(
fun force_reply/3,
{ok, W, []},
DocReplyDict
),
{stop, {Health, Reply}};
{_, DocCount} ->
% we've got at least one reply for each document, let's take a look
case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
continue ->
{ok, Acc0#acc{
waiting_count = WaitingCount - 1,
grouped_docs = NewGrpDocs,
reply = DocReplyDict
}};
{stop, W, FinalReplies} ->
{stop, {ok, FinalReplies}}
end;
_ ->
{ok, Acc0#acc{
waiting_count = WaitingCount - 1, grouped_docs = NewGrpDocs, reply = DocReplyDict
}}
{value, {_, Docs}, NewGrpDocs0} = lists:keytake(Worker, 1, GroupedDocs),
IsLeader = lists:member(Worker#shard.ref, Acc0#acc.leaders),
DocReplyDict = append_update_replies(Docs, Replies, W, IsLeader, DocReplyDict0),
Acc1 = Acc0#acc{grouped_docs = NewGrpDocs0, reply = DocReplyDict},
Acc2 = remove_conflicts(Docs, Replies, Acc1),
NewGrpDocs = Acc2#acc.grouped_docs,
case skip_message(Acc2) of
{stop, Msg} ->
{stop, Msg};
{ok, Acc3} ->
Acc4 = start_followers(Worker, Acc3),
case {Acc4#acc.waiting_count, dict:size(DocReplyDict)} of
{1, _} ->
% last message has arrived, we need to conclude things
{Health, W, Reply} = dict:fold(
fun force_reply/3,
{ok, W, []},
DocReplyDict
),
{stop, {Health, Reply}};
{_, DocCount} ->
% we've got at least one reply for each document, let's take a look
case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
continue ->
{ok, Acc4#acc{
waiting_count = Acc4#acc.waiting_count - 1,
grouped_docs = NewGrpDocs
}};
{stop, W, FinalReplies} ->
{stop, {ok, FinalReplies}}
end;
_ ->
{ok, Acc4#acc{
waiting_count = Acc4#acc.waiting_count - 1,
grouped_docs = NewGrpDocs
}}
end
end;
handle_message({missing_stub, Stub}, _, _) ->
throw({missing_stub, Stub});
@@ -318,13 +338,91 @@ group_docs_by_shard(DbName, Docs) ->
)
).

append_update_replies([], [], DocReplyDict) ->
%% use 'lowest' node that hosts this shard range as leader
is_leader(Worker, Workers) ->
Worker#shard.node ==
lists:min([W#shard.node || W <- Workers, W#shard.range == Worker#shard.range]).
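%% E.g. with copies of the same range on nodes 'n1@h', 'n2@h' and 'n3@h'
%% (hypothetical names), lists:min/1 yields 'n1@h', so the worker on
%% 'n1@h' is that range's leader.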

start_leaders(#acc{} = Acc0) ->
#acc{grouped_docs = GroupedDocs} = Acc0,
{Workers, _} = lists:unzip(GroupedDocs),
LeaderRefs = lists:foldl(
fun({Worker, Docs}, RefAcc) ->
case is_leader(Worker, Workers) of
true ->
start_worker(Worker, Docs, Acc0),
[Worker#shard.ref | RefAcc];
false ->
RefAcc
end
end,
[],
GroupedDocs
),
Acc0#acc{leaders = LeaderRefs, started = LeaderRefs}.

start_followers(#shard{} = Leader, #acc{} = Acc0) ->
Followers = [
{Worker, Docs}
|| {Worker, Docs} <- Acc0#acc.grouped_docs,
Worker#shard.range == Leader#shard.range,
not lists:member(Worker#shard.ref, Acc0#acc.started)
],
lists:foreach(
fun({Worker, Docs}) ->
start_worker(Worker, Docs, Acc0)
end,
Followers
),
Started = [Ref || {#shard{ref = Ref}, _Docs} <- Followers],
Acc0#acc{started = lists:append([Started, Acc0#acc.started])}.
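%% Filtering on #acc.started means a ref is cast at most once, even if
%% several events for the same range (say a leader reply and then a
%% rexi_DOWN from that node) each trigger start_followers/2.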

start_worker(#shard{ref = Ref} = Worker, Docs, #acc{} = Acc0) when is_reference(Ref) ->
#shard{name = Name, node = Node} = Worker,
#acc{update_options = UpdateOptions} = Acc0,
rexi:cast_ref(Ref, Node, {fabric_rpc, update_docs, [Name, untag_docs(Docs), UpdateOptions]}),
ok;
start_worker(#shard{ref = undefined}, _Docs, #acc{}) ->
% for unit tests below.
ok.
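%% Note: rexi:cast_ref as used in this diff sends the RPC under the
%% caller-supplied ref instead of minting its own, which is what lets
%% rexi_utils:recv/6 wait on refs created before any worker is started
%% (an inference from the call site, not a documented rexi contract).
%% The ref = undefined clause is a no-op for the unit tests noted above.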

append_update_replies([], [], _W, _IsLeader, DocReplyDict) ->
DocReplyDict;
append_update_replies([Doc | Rest], [], Dict0) ->
append_update_replies([Doc | Rest], [], W, IsLeader, Dict0) ->
% icky: with replicated_changes, only errors show up in the result
append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) ->
append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
append_update_replies(Rest, [], W, IsLeader, dict:append(Doc, noreply, Dict0));
append_update_replies([Doc | Rest1], [conflict | Rest2], W, true, Dict0) ->
%% fake conflict replies from followers as we won't ask them
append_update_replies(
Rest1, Rest2, W, true, dict:append_list(Doc, lists:duplicate(W, conflict), Dict0)
);
append_update_replies([Doc | Rest1], [Reply | Rest2], W, IsLeader, Dict0) ->
append_update_replies(Rest1, Rest2, W, IsLeader, dict:append(Doc, Reply, Dict0)).
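%% Worked example, assuming w = 2: when the leader answers conflict for a
%% doc, the clause above appends [conflict, conflict] for it, so the
%% quorum folds (force_reply/3, maybe_reply/3) see a full set of answers
%% for that doc even though its followers are never started.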

%% leader found a conflict, remove that doc from the other (follower) workers,
%% removing the worker entirely if no docs remain.
remove_conflicts([], [], #acc{} = Acc0) ->
Acc0;
remove_conflicts([Doc | DocRest], [conflict | ReplyRest], #acc{} = Acc0) ->
#acc{grouped_docs = GroupedDocs0} = Acc0,
Acc1 = lists:foldl(
fun({Worker, Docs}, #acc{} = FoldAcc) ->
case lists:delete(Doc, Docs) of
[] ->
% no docs left for this worker: drop it and stop waiting on it
FoldAcc#acc{waiting_count = FoldAcc#acc.waiting_count - 1};
Rest ->
FoldAcc#acc{grouped_docs = [{Worker, Rest} | FoldAcc#acc.grouped_docs]}
end
end,
Acc0#acc{grouped_docs = []},
GroupedDocs0
),
remove_conflicts(DocRest, ReplyRest, Acc1);
remove_conflicts([_Doc | DocRest], [_Reply | ReplyRest], #acc{} = Acc0) ->
remove_conflicts(DocRest, ReplyRest, Acc0);
remove_conflicts([_Doc | DocRest], [], #acc{} = Acc0) ->
remove_conflicts(DocRest, [], Acc0).
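%% Net effect: a doc the leader rejected as a conflict is dropped from its
%% follower workers before start_followers/2 runs, and the duplicated
%% conflict replies from append_update_replies/5 stand in for the answers
%% those followers would have given.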

skip_message(#acc{waiting_count = 0, w = W, reply = DocReplyDict}) ->
{Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict),