3030
3131#include " tscore/hugepages.h"
3232#include " tscore/Random.h"
33+ #include " iocore/eventsystem/Tasks.h"
34+
35+ #include < vector>
36+ #include < map>
37+ #include < atomic>
3338
3439#ifdef LOOP_CHECK_MODE
3540#define DIR_LOOP_THRESHOLD 1000
@@ -849,14 +854,55 @@ dir_lookaside_remove(const CacheKey *key, StripeSM *stripe)
849854 return ;
850855}
851856
852- // Cache Sync
853- //
857+ // Cache Dir Sync
858+
859+ static std::vector<std::unique_ptr<CacheSync>> cache_syncs;
854860
855861void
856862dir_sync_init ()
857863{
858- cacheDirSync = new CacheSync;
859- cacheDirSync->trigger = eventProcessor.schedule_in (cacheDirSync, HRTIME_SECONDS (cache_config_dir_sync_frequency));
864+ static std::atomic<bool > initialized{false };
865+ ink_release_assert (!initialized.exchange (true ));
866+
867+ // No stripes to sync
868+ if (gnstripes == 0 ) {
869+ return ;
870+ }
871+
872+ std::map<CacheDisk *, std::vector<int >> drive_stripe_map;
873+
874+ // Group stripes by disk
875+ for (int i = 0 ; i < gnstripes; i++) {
876+ drive_stripe_map[gstripes[i]->disk ].push_back (i);
877+ }
878+ // Any negative value means "all drives" (maximum parallelism)
879+ int num_tasks = (cache_config_dir_sync_parallel_tasks < 0 ) ?
880+ drive_stripe_map.size () :
881+ std::min (drive_stripe_map.size (), static_cast <size_t >(std::max (1 , cache_config_dir_sync_parallel_tasks)));
882+
883+ cache_syncs.resize (num_tasks);
884+ for (int i = 0 ; i < num_tasks; i++) {
885+ cache_syncs[i] = std::make_unique<CacheSync>();
886+ }
887+
888+ int task_idx = 0 ;
889+
890+ for (auto &[disk, indices] : drive_stripe_map) {
891+ int target_task = task_idx % num_tasks;
892+
893+ Dbg (dbg_ctl_cache_dir_sync, " Disk %s: %zu stripe(s) assigned to task %d" , disk->path , indices.size (), target_task);
894+ for (int stripe_idx : indices) {
895+ cache_syncs[target_task]->stripe_indices .push_back (stripe_idx);
896+ gstripes[stripe_idx]->cache_sync = cache_syncs[target_task].get ();
897+ }
898+ task_idx++;
899+ }
900+
901+ for (int i = 0 ; i < num_tasks; i++) {
902+ cache_syncs[i]->current_index = 0 ;
903+ cache_syncs[i]->trigger =
904+ eventProcessor.schedule_in (cache_syncs[i].get (), HRTIME_SECONDS (cache_config_dir_sync_frequency), ET_TASK);
905+ }
860906}
861907
862908void
@@ -909,6 +955,16 @@ void
909955sync_cache_dir_on_shutdown ()
910956{
911957 Dbg (dbg_ctl_cache_dir_sync, " sync started" );
958+
959+ // Cancel any active async sync tasks first
960+ for (auto &sync : cache_syncs) {
961+ if (sync && sync->trigger ) {
962+ sync->trigger ->cancel_action ();
963+ sync->trigger = nullptr ;
964+ }
965+ }
966+ cache_syncs.clear ();
967+
912968 EThread *t = reinterpret_cast <EThread *>(0xdeadbeef );
913969 for (int i = 0 ; i < gnstripes; i++) {
914970 gstripes[i]->shutdown (t);
@@ -917,34 +973,36 @@ sync_cache_dir_on_shutdown()
917973}
918974
919975int
920- CacheSync::mainEvent (int event, Event *e )
976+ CacheSync::mainEvent (int event, Event * /* e ATS_UNUSED */ )
921977{
922978 if (trigger) {
923979 trigger->cancel_action ();
924980 trigger = nullptr ;
925981 }
926982
927983Lrestart:
928- if (stripe_index >= gnstripes) {
929- stripe_index = 0 ;
984+ if (current_index >= static_cast <int >(stripe_indices.size ())) {
985+ current_index = 0 ;
986+ #if FREE_BUF_BETWEEN_CYCLES
987+ // Free buffer between sync cycles to avoid holding large amounts of memory
988+ // ToDo: If we decide needed, we should add a RecordsConfig here.
930989 if (buf) {
931990 if (buf_huge) {
932991 ats_free_hugepage (buf, buflen);
933992 } else {
934993 ats_free (buf);
935994 }
936- buflen = 0 ;
937995 buf = nullptr ;
996+ buflen = 0 ;
938997 buf_huge = false ;
939998 }
940- Dbg (dbg_ctl_cache_dir_sync, " sync done" );
941- if (event == EVENT_INTERVAL) {
942- trigger = e->ethread ->schedule_in (this , HRTIME_SECONDS (cache_config_dir_sync_frequency));
943- } else {
944- trigger = eventProcessor.schedule_in (this , HRTIME_SECONDS (cache_config_dir_sync_frequency));
945- }
999+ #endif
1000+ Dbg (dbg_ctl_cache_dir_sync, " sync cycle done" );
1001+ trigger = eventProcessor.schedule_in (this , HRTIME_SECONDS (cache_config_dir_sync_frequency), ET_TASK);
9461002 return EVENT_CONT;
9471003 }
1004+ stripe_index = stripe_indices[current_index];
1005+ current_index++;
9481006
9491007 StripeSM *stripe = gstripes[stripe_index]; // must be named "vol" to make STAT macros work.
9501008
@@ -1059,9 +1117,7 @@ CacheSync::mainEvent(int event, Event *e)
10591117 return EVENT_CONT;
10601118 }
10611119Ldone:
1062- // done
10631120 writepos = 0 ;
1064- ++stripe_index;
10651121 goto Lrestart;
10661122}
10671123
0 commit comments