Skip to content

Commit d9ed36b

Browse files
committed
Parallel dir entry sync options
1 parent c7c04b1 commit d9ed36b

File tree

9 files changed

+146
-30
lines changed

9 files changed

+146
-30
lines changed

doc/admin-guide/files/records.yaml.en.rst

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2573,6 +2573,23 @@ Cache Control
25732573
:units: millisecond
25742574

25752575
How long to wait between each write cycle when syncing the cache directory to disk.
2576+
.. ts:cv:: CONFIG proxy.config.cache.dir.sync_parallel_tasks INT 1
2577+
:reloadable:
2578+
2579+
Number of parallel tasks to use for directory syncing. Each task syncs
2580+
directories for a different physical drive on ET_TASK threads.
2581+
2582+
======= ==================================================================
2583+
Value Description
2584+
======= ==================================================================
2585+
``-1`` Unlimited - one task per drive (maximum parallelism)
2586+
``1`` Sequential - one task for all drives (default, safe)
2587+
``N`` Parallel - up to N tasks (drives) sync concurrently
2588+
======= ==================================================================
2589+
2590+
Default is ``1`` (sequential). Set to ``-1`` for maximum parallelism on
2591+
high-end NVMe arrays, or to ``4-8`` for balanced performance on multi-drive
2592+
systems.
25762593

25772594
.. ts:cv:: CONFIG proxy.config.cache.limits.http.max_alts INT 5
25782595

doc/developer-guide/cache-architecture/architecture.en.rst

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -963,6 +963,26 @@ own entry in the volume directory which are computationally chained (each
963963
table is accumulated in the earliest ``Doc`` which has the offsets of the first
964964
byte for each fragment.
965965

966+
Directory Synchronization
967+
-------------------------
968+
969+
The in-memory stripe directories are periodically synchronized to disk to ensure
970+
cache metadata persistence across restarts. This is handled by ``CacheSync``
971+
instances scheduled on ET_TASK threads.
972+
973+
By default, directory syncing is sequential (one task handles all stripes). The
974+
:ts:cv:`proxy.config.cache.dir.sync_parallel_tasks` configuration allows parallel
975+
syncing across multiple drives:
976+
977+
* ``1`` - Sequential (default): single task syncs all stripes
978+
* ``N`` - Parallel: up to N tasks sync concurrently
979+
* ``<0`` - Maximum: one task per physical drive
980+
981+
Stripes are distributed across sync tasks based on their physical drives, ensuring
982+
that I/O operations for different drives can proceed in parallel on separate ET_TASK
983+
threads. Each ``CacheSync`` instance maintains a list of stripe indices to sync and
984+
processes them sequentially within its task.
985+
966986
.. _evacuation-mechanics:
967987

968988
Evacuation Mechanics

src/iocore/cache/Cache.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ int cache_config_log_alternate_eviction = 0;
6464
int cache_config_dir_sync_frequency = 60;
6565
int cache_config_dir_sync_delay = 500;
6666
int cache_config_dir_sync_max_write = (2 * 1024 * 1024);
67+
int cache_config_dir_sync_parallel_tasks = 1;
6768
int cache_config_permit_pinning = 0;
6869
int cache_config_select_alternate = 1;
6970
int cache_config_max_doc_size = 0;
@@ -90,7 +91,6 @@ Cache *theCache = nullptr;
9091
std::vector<std::unique_ptr<CacheDisk>> gdisks;
9192
int gndisks = 0;
9293
Cache *caches[NUM_CACHE_FRAG_TYPES] = {nullptr};
93-
CacheSync *cacheDirSync = nullptr;
9494
Store theCacheStore;
9595
StripeSM **gstripes = nullptr;
9696
std::atomic<int> gnstripes = 0;
@@ -884,6 +884,9 @@ ink_cache_init(ts::ModuleVersion v)
884884

885885
cacheProcessor.wait_for_cache = RecGetRecordInt("proxy.config.http.wait_for_cache").value_or(0);
886886

887+
RecEstablishStaticConfigInt32(cache_config_dir_sync_parallel_tasks, "proxy.config.cache.dir.sync_parallel_tasks");
888+
Dbg(dbg_ctl_cache_init, "proxy.config.cache.dir.sync_parallel_tasks = %d", cache_config_dir_sync_parallel_tasks);
889+
887890
RecEstablishStaticConfigInt32(cache_config_persist_bad_disks, "proxy.config.cache.persist_bad_disks");
888891
Dbg(dbg_ctl_cache_init, "proxy.config.cache.persist_bad_disks = %d", cache_config_persist_bad_disks);
889892
if (cache_config_persist_bad_disks) {

src/iocore/cache/CacheDir.cc

Lines changed: 68 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@
3030

3131
#include "tscore/hugepages.h"
3232
#include "tscore/Random.h"
33+
#include "iocore/eventsystem/Tasks.h"
34+
35+
#include <vector>
36+
#include <map>
37+
#include <atomic>
3338

3439
#ifdef LOOP_CHECK_MODE
3540
#define DIR_LOOP_THRESHOLD 1000
@@ -849,14 +854,51 @@ dir_lookaside_remove(const CacheKey *key, StripeSM *stripe)
849854
return;
850855
}
851856

852-
// Cache Sync
853-
//
857+
// Cache Dir Sync
858+
859+
static std::vector<std::unique_ptr<CacheSync>> cache_syncs;
854860

855861
void
856862
dir_sync_init()
857863
{
858-
cacheDirSync = new CacheSync;
859-
cacheDirSync->trigger = eventProcessor.schedule_in(cacheDirSync, HRTIME_SECONDS(cache_config_dir_sync_frequency));
864+
static std::atomic<bool> initialized{false};
865+
ink_release_assert(!initialized.exchange(true)); // Should only be called once
866+
867+
std::map<CacheDisk *, std::vector<int>> drive_stripe_map;
868+
869+
// Group stripes by disk
870+
for (int i = 0; i < gnstripes; i++) {
871+
drive_stripe_map[gstripes[i]->disk].push_back(i);
872+
}
873+
874+
// Any negative value means "all drives" (maximum parallelism)
875+
int num_tasks = (cache_config_dir_sync_parallel_tasks < 0) ?
876+
drive_stripe_map.size() :
877+
std::min(drive_stripe_map.size(), static_cast<size_t>(std::max(1, cache_config_dir_sync_parallel_tasks)));
878+
879+
cache_syncs.resize(num_tasks);
880+
for (int i = 0; i < num_tasks; i++) {
881+
cache_syncs[i] = std::make_unique<CacheSync>();
882+
}
883+
884+
int task_idx = 0;
885+
886+
for (auto &[disk, indices] : drive_stripe_map) {
887+
int target_task = task_idx % num_tasks;
888+
889+
Dbg(dbg_ctl_cache_dir_sync, "Disk %s: %zu stripe(s) assigned to task %d", disk->path, indices.size(), target_task);
890+
for (int stripe_idx : indices) {
891+
cache_syncs[target_task]->stripe_indices.push_back(stripe_idx);
892+
gstripes[stripe_idx]->cache_sync = cache_syncs[target_task].get();
893+
}
894+
task_idx++;
895+
}
896+
897+
for (int i = 0; i < num_tasks; i++) {
898+
cache_syncs[i]->current_index = 0;
899+
cache_syncs[i]->trigger =
900+
eventProcessor.schedule_in(cache_syncs[i].get(), HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK);
901+
}
860902
}
861903

862904
void
@@ -909,6 +951,16 @@ void
909951
sync_cache_dir_on_shutdown()
910952
{
911953
Dbg(dbg_ctl_cache_dir_sync, "sync started");
954+
955+
// Cancel any active async sync tasks first
956+
for (auto &sync : cache_syncs) {
957+
if (sync && sync->trigger) {
958+
sync->trigger->cancel_action();
959+
sync->trigger = nullptr;
960+
}
961+
}
962+
cache_syncs.clear();
963+
912964
EThread *t = reinterpret_cast<EThread *>(0xdeadbeef);
913965
for (int i = 0; i < gnstripes; i++) {
914966
gstripes[i]->shutdown(t);
@@ -917,34 +969,36 @@ sync_cache_dir_on_shutdown()
917969
}
918970

919971
int
920-
CacheSync::mainEvent(int event, Event *e)
972+
CacheSync::mainEvent(int event, Event * /* e ATS_UNUSED */)
921973
{
922974
if (trigger) {
923975
trigger->cancel_action();
924976
trigger = nullptr;
925977
}
926978

927979
Lrestart:
928-
if (stripe_index >= gnstripes) {
929-
stripe_index = 0;
980+
if (current_index >= static_cast<int>(stripe_indices.size())) {
981+
current_index = 0;
982+
#if FREE_BUF_BETWEEN_CYCLES
983+
// Free buffer between sync cycles to avoid holding large amounts of memory
984+
// ToDo: If we decide needed, we should add a RecordsConfig here.
930985
if (buf) {
931986
if (buf_huge) {
932987
ats_free_hugepage(buf, buflen);
933988
} else {
934989
ats_free(buf);
935990
}
936-
buflen = 0;
937991
buf = nullptr;
992+
buflen = 0;
938993
buf_huge = false;
939994
}
940-
Dbg(dbg_ctl_cache_dir_sync, "sync done");
941-
if (event == EVENT_INTERVAL) {
942-
trigger = e->ethread->schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency));
943-
} else {
944-
trigger = eventProcessor.schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency));
945-
}
995+
#endif
996+
Dbg(dbg_ctl_cache_dir_sync, "sync cycle done");
997+
trigger = eventProcessor.schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK);
946998
return EVENT_CONT;
947999
}
1000+
stripe_index = stripe_indices[current_index];
1001+
current_index++;
9481002

9491003
StripeSM *stripe = gstripes[stripe_index]; // must be named "vol" to make STAT macros work.
9501004

@@ -1059,9 +1113,7 @@ CacheSync::mainEvent(int event, Event *e)
10591113
return EVENT_CONT;
10601114
}
10611115
Ldone:
1062-
// done
10631116
writepos = 0;
1064-
++stripe_index;
10651117
goto Lrestart;
10661118
}
10671119

src/iocore/cache/P_CacheDir.h

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "iocore/eventsystem/Continuation.h"
2929
#include "iocore/aio/AIO.h"
3030
#include "tscore/Version.h"
31+
#include "tscore/hugepages.h"
3132

3233
#include <cstdint>
3334
#include <ctime>
@@ -39,6 +40,7 @@ struct CacheVC;
3940
class CacheEvacuateDocVC;
4041

4142
// #define LOOP_CHECK_MODE 1
43+
// #define FREE_BUF_BETWEEN_CYCLES 1
4244

4345
/*
4446
Directory layout
@@ -246,10 +248,29 @@ struct CacheSync : public Continuation {
246248
AIOCallback io;
247249
Event *trigger = nullptr;
248250
ink_hrtime start_time = 0;
249-
int mainEvent(int event, Event *e);
250-
void aio_write(int fd, char *b, int n, off_t o);
251+
252+
std::vector<int> stripe_indices;
253+
int current_index{0};
254+
255+
int mainEvent(int event, Event *e);
256+
void aio_write(int fd, char *b, int n, off_t o);
251257

252258
CacheSync() : Continuation(new_ProxyMutex()) { SET_HANDLER(&CacheSync::mainEvent); }
259+
260+
~CacheSync()
261+
{
262+
if (trigger) {
263+
trigger->cancel_action();
264+
trigger = nullptr;
265+
}
266+
if (buf) {
267+
if (buf_huge) {
268+
ats_free_hugepage(buf, buflen);
269+
} else {
270+
ats_free(buf);
271+
}
272+
}
273+
}
253274
};
254275

255276
struct StripteHeaderFooter {

src/iocore/cache/P_CacheInternal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ extern CacheStatsBlock cache_rsb;
9696
extern int cache_config_dir_sync_frequency;
9797
extern int cache_config_dir_sync_delay;
9898
extern int cache_config_dir_sync_max_write;
99+
extern int cache_config_dir_sync_parallel_tasks;
99100
extern int cache_config_http_max_alts;
100101
extern int cache_config_log_alternate_eviction;
101102
extern int cache_config_permit_pinning;
@@ -140,7 +141,6 @@ struct CacheRemoveCont : public Continuation {
140141
// Global Data
141142
extern ClassAllocator<CacheVC> cacheVConnectionAllocator;
142143
extern ClassAllocator<CacheEvacuateDocVC> cacheEvacuateDocVConnectionAllocator;
143-
extern CacheSync *cacheDirSync;
144144
// Function Prototypes
145145
int cache_write(CacheVC *, CacheHTTPInfoVector *);
146146
int get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key);

src/iocore/cache/StripeSM.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -709,9 +709,9 @@ StripeSM::aggWriteDone(int event, Event *e)
709709
{
710710
cancel_trigger();
711711

712-
// ensure we have the cacheDirSync lock if we intend to call it later
712+
// ensure we have the cache_sync lock if we intend to call it later
713713
// retaking the current mutex recursively is a NOOP
714-
CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
714+
CACHE_TRY_LOCK(lock, dir_sync_waiting ? cache_sync->mutex : mutex, mutex->thread_holding);
715715
if (!lock.is_locked()) {
716716
eventProcessor.schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
717717
return EVENT_CONT;
@@ -759,7 +759,7 @@ StripeSM::aggWriteDone(int event, Event *e)
759759
}
760760
if (dir_sync_waiting) {
761761
dir_sync_waiting = false;
762-
cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr);
762+
cache_sync->handleEvent(EVENT_IMMEDIATE, nullptr);
763763
}
764764
if (this->_write_buffer.get_pending_writers().head || sync.head) {
765765
return aggWrite(event, e);

src/iocore/cache/StripeSM.h

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,14 @@ class StripeSM : public Continuation, public Stripe
8585

8686
StripeInitInfo *init_info = nullptr;
8787

88-
Cache *cache = nullptr;
89-
uint32_t last_sync_serial = 0;
90-
uint32_t last_write_serial = 0;
91-
bool recover_wrapped = false;
92-
bool dir_sync_waiting = false;
93-
bool dir_sync_in_progress = false;
94-
bool writing_end_marker = false;
88+
Cache *cache = nullptr;
89+
uint32_t last_sync_serial = 0;
90+
uint32_t last_write_serial = 0;
91+
bool recover_wrapped = false;
92+
bool dir_sync_waiting = false;
93+
bool dir_sync_in_progress = false;
94+
bool writing_end_marker = false;
95+
CacheSync *cache_sync = nullptr;
9596

9697
CacheKey first_fragment_key;
9798
int64_t first_fragment_offset = 0;

src/records/RecordsConfig.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,8 @@ static constexpr RecordElement RecordsConfig[] =
853853
,
854854
{RECT_CONFIG, "proxy.config.cache.dir.sync_max_write", RECD_INT, "2097152", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
855855
,
856+
{RECT_CONFIG, "proxy.config.cache.dir.sync_parallel_tasks", RECD_INT, "1", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
857+
,
856858
{RECT_CONFIG, "proxy.config.cache.hostdb.disable_reverse_lookup", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
857859
,
858860
{RECT_CONFIG, "proxy.config.cache.select_alternate", RECD_INT, "1", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}

0 commit comments

Comments
 (0)