|
14 | 14 |
|
15 | 15 | -module(dreyfus_fabric_cleanup). |
16 | 16 |
|
17 | | --include("dreyfus.hrl"). |
18 | | --include_lib("mem3/include/mem3.hrl"). |
19 | | --include_lib("couch/include/couch_db.hrl"). |
20 | | - |
21 | | --export([go/1]). |
| 17 | +-export([go/1, go_local/3]). |
22 | 18 |
|
23 | 19 | go(DbName) -> |
24 | | - DesignDocs = |
25 | | - case fabric:design_docs(DbName) of |
26 | | - {ok, DDocs} when is_list(DDocs) -> |
27 | | - DDocs; |
28 | | - Else -> |
29 | | - couch_log:debug("Invalid design docs: ~p~n", [Else]), |
30 | | - [] |
31 | | - end, |
32 | | - ActiveSigs = lists:usort( |
33 | | - lists:flatmap( |
34 | | - fun active_sigs/1, |
35 | | - [couch_doc:from_json_obj(DD) || DD <- DesignDocs] |
36 | | - ) |
37 | | - ), |
38 | | - cleanup_local_purge_doc(DbName, ActiveSigs), |
39 | | - clouseau_rpc:cleanup(DbName, ActiveSigs), |
40 | | - ok. |
| 20 | + case fabric_util:get_design_doc_records(DbName) of |
| 21 | + {ok, DDocs} when is_list(DDocs) -> |
| 22 | + Sigs = dreyfus_util:get_signatures_from_ddocs(DbName, DDocs), |
| 23 | + Shards = mem3:shards(DbName), |
| 24 | + ByNode = maps:groups_from_list(fun mem3:node/1, fun mem3:name/1, Shards), |
| 25 | + Fun = fun(Node, Dbs, Acc) -> |
| 26 | + erpc:send_request(Node, ?MODULE, go_local, [DbName, Dbs, Sigs], Node, Acc) |
| 27 | + end, |
| 28 | + Reqs = maps:fold(Fun, erpc:reqids_new(), ByNode), |
| 29 | + recv(DbName, Reqs, fabric_util:abs_request_timeout()); |
| 30 | + Error -> |
| 31 | + couch_log:error("~p : error fetching ddocs db:~p ~p", [?MODULE, DbName, Error]), |
| 32 | + Error |
| 33 | + end. |
41 | 34 |
|
42 | | -active_sigs(#doc{body = {Fields}} = Doc) -> |
| 35 | +% erpc endpoint for go/1 and fabric_index_cleanup:cleanup_indexes/2 |
| 36 | +% |
| 37 | +go_local(DbName, Dbs, #{} = Sigs) -> |
43 | 38 | try |
44 | | - {RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}), |
45 | | - {IndexNames, _} = lists:unzip(RawIndexes), |
46 | | - [ |
47 | | - begin |
48 | | - {ok, Index} = dreyfus_index:design_doc_to_index(Doc, IndexName), |
49 | | - Index#index.sig |
50 | | - end |
51 | | - || IndexName <- IndexNames |
52 | | - ] |
| 39 | + lists:foreach( |
| 40 | + fun(Db) -> |
| 41 | + Checkpoints = dreyfus_util:get_purge_checkpoints(Db), |
| 42 | + ok = couch_index_util:cleanup_purges(Db, Sigs, Checkpoints) |
| 43 | + end, |
| 44 | + Dbs |
| 45 | + ), |
| 46 | + clouseau_rpc:cleanup(DbName, Sigs), |
| 47 | + ok |
53 | 48 | catch |
54 | | - error:{badmatch, _Error} -> |
55 | | - [] |
| 49 | + error:database_does_not_exist -> |
| 50 | + ok |
56 | 51 | end. |
57 | 52 |
|
58 | | -cleanup_local_purge_doc(DbName, ActiveSigs) -> |
59 | | - {ok, BaseDir} = clouseau_rpc:get_root_dir(), |
60 | | - DbNamePattern = <<DbName/binary, ".*">>, |
61 | | - Pattern0 = filename:join([BaseDir, "shards", "*", DbNamePattern, "*"]), |
62 | | - Pattern = binary_to_list(iolist_to_binary(Pattern0)), |
63 | | - DirListStrs = filelib:wildcard(Pattern), |
64 | | - DirList = [iolist_to_binary(DL) || DL <- DirListStrs], |
65 | | - LocalShards = mem3:local_shards(DbName), |
66 | | - ActiveDirs = lists:foldl( |
67 | | - fun(LS, AccOuter) -> |
68 | | - lists:foldl( |
69 | | - fun(Sig, AccInner) -> |
70 | | - DirName = filename:join([BaseDir, LS#shard.name, Sig]), |
71 | | - [DirName | AccInner] |
72 | | - end, |
73 | | - AccOuter, |
74 | | - ActiveSigs |
75 | | - ) |
76 | | - end, |
77 | | - [], |
78 | | - LocalShards |
79 | | - ), |
80 | | - |
81 | | - DeadDirs = DirList -- ActiveDirs, |
82 | | - lists:foreach( |
83 | | - fun(IdxDir) -> |
84 | | - Sig = dreyfus_util:get_signature_from_idxdir(IdxDir), |
85 | | - case Sig of |
86 | | - undefined -> |
87 | | - ok; |
88 | | - _ -> |
89 | | - DocId = dreyfus_util:get_local_purge_doc_id(Sig), |
90 | | - LocalShards = mem3:local_shards(DbName), |
91 | | - lists:foreach( |
92 | | - fun(LS) -> |
93 | | - ShardDbName = LS#shard.name, |
94 | | - {ok, ShardDb} = couch_db:open_int(ShardDbName, []), |
95 | | - case couch_db:open_doc(ShardDb, DocId, []) of |
96 | | - {ok, LocalPurgeDoc} -> |
97 | | - couch_db:update_doc( |
98 | | - ShardDb, |
99 | | - LocalPurgeDoc#doc{deleted = true}, |
100 | | - [?ADMIN_CTX] |
101 | | - ); |
102 | | - {not_found, _} -> |
103 | | - ok |
104 | | - end, |
105 | | - couch_db:close(ShardDb) |
106 | | - end, |
107 | | - LocalShards |
108 | | - ) |
109 | | - end |
110 | | - end, |
111 | | - DeadDirs |
112 | | - ). |
| 53 | +recv(DbName, Reqs, Timeout) -> |
| 54 | + case erpc:receive_response(Reqs, Timeout, true) of |
| 55 | + {ok, _Lable, Reqs1} -> |
| 56 | + recv(DbName, Reqs1, Timeout); |
| 57 | + {Error, Label, Reqs1} -> |
| 58 | + ErrMsg = "~p : error cleaning dreyfus indexes db:~p req:~p error:~p", |
| 59 | + couch_log:error(ErrMsg, [?MODULE, DbName, Label, Error]), |
| 60 | + recv(DbName, Reqs1, Timeout); |
| 61 | + no_request -> |
| 62 | + ok |
| 63 | + end. |
0 commit comments