|  | 
| 14 | 14 | 
 | 
| 15 | 15 | -module(dreyfus_fabric_cleanup). | 
| 16 | 16 | 
 | 
| 17 |  | --include("dreyfus.hrl"). | 
| 18 |  | --include_lib("mem3/include/mem3.hrl"). | 
| 19 |  | --include_lib("couch/include/couch_db.hrl"). | 
| 20 |  | - | 
| 21 |  | --export([go/1]). | 
|  | 17 | +-export([go/1, go_local/3]). | 
| 22 | 18 | 
 | 
| 23 | 19 | go(DbName) -> | 
| 24 |  | -    DesignDocs = | 
| 25 |  | -        case fabric:design_docs(DbName) of | 
| 26 |  | -            {ok, DDocs} when is_list(DDocs) -> | 
| 27 |  | -                DDocs; | 
| 28 |  | -            Else -> | 
| 29 |  | -                couch_log:debug("Invalid design docs: ~p~n", [Else]), | 
| 30 |  | -                [] | 
| 31 |  | -        end, | 
| 32 |  | -    ActiveSigs = lists:usort( | 
| 33 |  | -        lists:flatmap( | 
| 34 |  | -            fun active_sigs/1, | 
| 35 |  | -            [couch_doc:from_json_obj(DD) || DD <- DesignDocs] | 
| 36 |  | -        ) | 
| 37 |  | -    ), | 
| 38 |  | -    cleanup_local_purge_doc(DbName, ActiveSigs), | 
| 39 |  | -    clouseau_rpc:cleanup(DbName, ActiveSigs), | 
| 40 |  | -    ok. | 
|  | 20 | +    case fabric_util:get_design_doc_records(DbName) of | 
|  | 21 | +        {ok, DDocs} when is_list(DDocs) -> | 
|  | 22 | +            Sigs = dreyfus_util:get_signatures_from_ddocs(DbName, DDocs), | 
|  | 23 | +            Shards = mem3:shards(DbName), | 
|  | 24 | +            ByNode = maps:groups_from_list(fun mem3:node/1, fun mem3:name/1, Shards), | 
|  | 25 | +            Fun = fun(Node, Dbs, Acc) -> | 
|  | 26 | +                erpc:send_request(Node, ?MODULE, go_local, [DbName, Dbs, Sigs], Node, Acc) | 
|  | 27 | +            end, | 
|  | 28 | +            Reqs = maps:fold(Fun, erpc:reqids_new(), ByNode), | 
|  | 29 | +            recv(DbName, Reqs, fabric_util:abs_request_timeout()); | 
|  | 30 | +        Error -> | 
|  | 31 | +            couch_log:error("~p : error fetching ddocs db:~p ~p", [?MODULE, DbName, Error]), | 
|  | 32 | +            Error | 
|  | 33 | +    end. | 
| 41 | 34 | 
 | 
| 42 |  | -active_sigs(#doc{body = {Fields}} = Doc) -> | 
|  | 35 | +% erpc endpoint for go/1 and fabric_index_cleanup:cleanup_indexes/2 | 
|  | 36 | +% | 
|  | 37 | +go_local(DbName, Dbs, #{} = Sigs) -> | 
| 43 | 38 |     try | 
| 44 |  | -        {RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}), | 
| 45 |  | -        {IndexNames, _} = lists:unzip(RawIndexes), | 
| 46 |  | -        [ | 
| 47 |  | -            begin | 
| 48 |  | -                {ok, Index} = dreyfus_index:design_doc_to_index(Doc, IndexName), | 
| 49 |  | -                Index#index.sig | 
| 50 |  | -            end | 
| 51 |  | -         || IndexName <- IndexNames | 
| 52 |  | -        ] | 
|  | 39 | +        lists:foreach( | 
|  | 40 | +            fun(Db) -> | 
|  | 41 | +                Checkpoints = dreyfus_util:get_purge_checkpoints(Db), | 
|  | 42 | +                ok = couch_index_util:cleanup_purges(Db, Sigs, Checkpoints) | 
|  | 43 | +            end, | 
|  | 44 | +            Dbs | 
|  | 45 | +        ), | 
|  | 46 | +        clouseau_rpc:cleanup(DbName, Sigs), | 
|  | 47 | +        ok | 
| 53 | 48 |     catch | 
| 54 |  | -        error:{badmatch, _Error} -> | 
| 55 |  | -            [] | 
|  | 49 | +        error:database_does_not_exist -> | 
|  | 50 | +            ok | 
| 56 | 51 |     end. | 
| 57 | 52 | 
 | 
| 58 |  | -cleanup_local_purge_doc(DbName, ActiveSigs) -> | 
| 59 |  | -    {ok, BaseDir} = clouseau_rpc:get_root_dir(), | 
| 60 |  | -    DbNamePattern = <<DbName/binary, ".*">>, | 
| 61 |  | -    Pattern0 = filename:join([BaseDir, "shards", "*", DbNamePattern, "*"]), | 
| 62 |  | -    Pattern = binary_to_list(iolist_to_binary(Pattern0)), | 
| 63 |  | -    DirListStrs = filelib:wildcard(Pattern), | 
| 64 |  | -    DirList = [iolist_to_binary(DL) || DL <- DirListStrs], | 
| 65 |  | -    LocalShards = mem3:local_shards(DbName), | 
| 66 |  | -    ActiveDirs = lists:foldl( | 
| 67 |  | -        fun(LS, AccOuter) -> | 
| 68 |  | -            lists:foldl( | 
| 69 |  | -                fun(Sig, AccInner) -> | 
| 70 |  | -                    DirName = filename:join([BaseDir, LS#shard.name, Sig]), | 
| 71 |  | -                    [DirName | AccInner] | 
| 72 |  | -                end, | 
| 73 |  | -                AccOuter, | 
| 74 |  | -                ActiveSigs | 
| 75 |  | -            ) | 
| 76 |  | -        end, | 
| 77 |  | -        [], | 
| 78 |  | -        LocalShards | 
| 79 |  | -    ), | 
| 80 |  | - | 
| 81 |  | -    DeadDirs = DirList -- ActiveDirs, | 
| 82 |  | -    lists:foreach( | 
| 83 |  | -        fun(IdxDir) -> | 
| 84 |  | -            Sig = dreyfus_util:get_signature_from_idxdir(IdxDir), | 
| 85 |  | -            case Sig of | 
| 86 |  | -                undefined -> | 
| 87 |  | -                    ok; | 
| 88 |  | -                _ -> | 
| 89 |  | -                    DocId = dreyfus_util:get_local_purge_doc_id(Sig), | 
| 90 |  | -                    LocalShards = mem3:local_shards(DbName), | 
| 91 |  | -                    lists:foreach( | 
| 92 |  | -                        fun(LS) -> | 
| 93 |  | -                            ShardDbName = LS#shard.name, | 
| 94 |  | -                            {ok, ShardDb} = couch_db:open_int(ShardDbName, []), | 
| 95 |  | -                            case couch_db:open_doc(ShardDb, DocId, []) of | 
| 96 |  | -                                {ok, LocalPurgeDoc} -> | 
| 97 |  | -                                    couch_db:update_doc( | 
| 98 |  | -                                        ShardDb, | 
| 99 |  | -                                        LocalPurgeDoc#doc{deleted = true}, | 
| 100 |  | -                                        [?ADMIN_CTX] | 
| 101 |  | -                                    ); | 
| 102 |  | -                                {not_found, _} -> | 
| 103 |  | -                                    ok | 
| 104 |  | -                            end, | 
| 105 |  | -                            couch_db:close(ShardDb) | 
| 106 |  | -                        end, | 
| 107 |  | -                        LocalShards | 
| 108 |  | -                    ) | 
| 109 |  | -            end | 
| 110 |  | -        end, | 
| 111 |  | -        DeadDirs | 
| 112 |  | -    ). | 
|  | 53 | +recv(DbName, Reqs, Timeout) -> | 
|  | 54 | +    case erpc:receive_response(Reqs, Timeout, true) of | 
|  | 55 | +        {ok, _Lable, Reqs1} -> | 
|  | 56 | +            recv(DbName, Reqs1, Timeout); | 
|  | 57 | +        {Error, Label, Reqs1} -> | 
|  | 58 | +            ErrMsg = "~p : error cleaning dreyfus indexes db:~p req:~p error:~p", | 
|  | 59 | +            couch_log:error(ErrMsg, [?MODULE, DbName, Label, Error]), | 
|  | 60 | +            recv(DbName, Reqs1, Timeout); | 
|  | 61 | +        no_request -> | 
|  | 62 | +            ok | 
|  | 63 | +    end. | 
0 commit comments