From ab5a9e00fe994e2382c44bbef7c56714ef558539 Mon Sep 17 00:00:00 2001 From: Sander van den Bosch Date: Sat, 15 Nov 2025 02:00:39 +0100 Subject: [PATCH 1/3] add join for ThreadPool without destroying the threads --- lib/std/threads/fixed_pool.c3 | 34 ++++++++++++++++++++++++++++++++ lib/std/threads/pool.c3 | 34 ++++++++++++++++++++++++++++++++ releasenotes.md | 7 ++++--- test/unit/stdlib/threads/pool.c3 | 25 +++++++++++++++++++++++ 4 files changed, 97 insertions(+), 3 deletions(-) diff --git a/lib/std/threads/fixed_pool.c3 b/lib/std/threads/fixed_pool.c3 index 17b4f7cc1..b73d5406f 100644 --- a/lib/std/threads/fixed_pool.c3 +++ b/lib/std/threads/fixed_pool.c3 @@ -14,13 +14,16 @@ struct FixedThreadPool QueueItem[] queue; usz qindex; usz num_threads; + usz thread_counter; bitstruct : char { bool initialized; bool stop; bool stop_now; + bool joining; } Thread[] pool; ConditionVariable notify; + ConditionVariable collect; } struct QueueItem @private @@ -55,6 +58,31 @@ fn void? FixedThreadPool.init(&self, usz threads, usz queue_size = 0) } } +<* + Join all threads in the pool. +*> +fn void? FixedThreadPool.join(&self) +{ + if (self.initialized) + { + self.mu.lock()!; + self.thread_counter = 0; + self.joining = true; + self.notify.broadcast()!; + self.mu.unlock()!; + while(self.thread_counter != self.num_threads) + { + self.mu.lock()!; + self.notify.signal()!; + self.mu.unlock()!!; + } + self.mu.lock()!; + self.joining = false; + self.collect.broadcast()!; + self.mu.unlock()!; + } +} + <* Stop all the threads and cleanup the pool. Any pending work will be dropped. @@ -144,6 +172,12 @@ fn int process_work(void* self_arg) @private // Wait for work. while (self.qindex == 0) { + if (self.joining) + { + // Join requested + self.thread_counter++; + self.collect.wait(&self.mu)!!; + } if (self.stop) { // Shutdown requested. diff --git a/lib/std/threads/pool.c3 b/lib/std/threads/pool.c3 index ccec23741..bbd9d8d46 100644 --- a/lib/std/threads/pool.c3 +++ b/lib/std/threads/pool.c3 @@ -7,15 +7,18 @@ struct ThreadPool QueueItem[SIZE] queue; usz qindex; usz num_threads; + usz thread_counter; bitstruct : char { bool initialized; bool stop; bool stop_now; + bool joining; } Thread[SIZE] pool; ConditionVariable notify; + ConditionVariable collect; } struct QueueItem @private @@ -41,6 +44,31 @@ fn void? ThreadPool.init(&self) } } +<* + Join all threads in the pool. +*> +fn void? ThreadPool.join(&self) +{ + if (self.initialized) + { + self.mu.lock()!; + self.thread_counter = 0; + self.joining = true; + self.notify.broadcast()!; + self.mu.unlock()!; + while(self.thread_counter != self.num_threads) + { + self.mu.lock()!; + self.notify.signal()!; + self.mu.unlock()!!; + } + self.mu.lock()!; + self.joining = false; + self.collect.broadcast()!; + self.mu.unlock()!; + } +} + <* Stop all the threads and cleanup the pool. Any pending work will be dropped. @@ -113,6 +141,12 @@ fn int process_work(void* arg) @private // Wait for work. while (self.qindex == 0) { + if (self.joining) + { + // Join requested + self.thread_counter++; + self.collect.wait(&self.mu)!!; + } if (self.stop) { // Shutdown requested. diff --git a/releasenotes.md b/releasenotes.md index 166afab94..f5042e2da 100644 --- a/releasenotes.md +++ b/releasenotes.md @@ -26,6 +26,7 @@ - Casting a distinct type based on a pointer to an `any` would accidentally be permitted. #2575 ### Stdlib changes +- Add `ThreadPool` join function to wait for all threads to finish in the pool without destroying the threads. ## 0.7.7 Change list @@ -37,7 +38,7 @@ - Allow `..` ranges to use "a..a-1" in order to express zero length. - Disallow aliasing of `@local` symbols with a higher visibility in the alias. - Add `--max-macro-iterations` to set macro iteration limit. -- Improved generic inference in initializers #2541. +- Improved generic inference in initializers #2541. - "Maybe-deref" subscripting `foo.[i] += 1` #2540. - ABI change for vectors: store and pass them as arrays #2542. - Add @simd and @align attributes to typedef #2543. @@ -78,7 +79,7 @@ - Unify generic and regular module namespace. - `env::PROJECT_VERSION` now returns the version in project.json. - Comparing slices and arrays of user-defined types that implement == operator now works #2486. -- Add 'loop-vectorize', 'slp-vectorize', 'unroll-loops' and 'merge-functions' optimization flags #2491. +- Add 'loop-vectorize', 'slp-vectorize', 'unroll-loops' and 'merge-functions' optimization flags #2491. - Add exec timings to -vv output #2490. - Support #! as a comment on the first line only. - Add `+++=` operator. @@ -152,7 +153,7 @@ - `@assignable_to` is deprecated in favour of `$define` - Add `linklib-dir` to c3l-libraries to place their linked libraries in. Defaults to `linked-libs` - If the `os-arch` linked library doesn't exist, try with `os` for c3l libs. -- A file with an inferred module may not contain additional other modules. +- A file with an inferred module may not contain additional other modules. - Update error message for missing body after if/for/etc #2289. - `@is_const` is deprecated in favour of directly using `$defined`. - `@is_lvalue(#value)` is deprecated in favour of directly using `$defined`. diff --git a/test/unit/stdlib/threads/pool.c3 b/test/unit/stdlib/threads/pool.c3 index 82e67f7e8..e0547a88c 100644 --- a/test/unit/stdlib/threads/pool.c3 +++ b/test/unit/stdlib/threads/pool.c3 @@ -42,10 +42,35 @@ fn void push_stop() @test } } +fn void join() @test +{ + @atomic_store(x, 0); + Pool pool; + pool.init()!!; + defer pool.stop_and_destroy()!!; + for (usz i = 0; i < 10; i++) + { + pool.push(&do_wait, (void*)i)!!; + } + pool.join()!!; + test::eq(x, 45); +} + int x; fn int do_work(void* arg) { @atomic_store(x, @atomic_load(*(int*)arg)); return 0; +} + +fn int do_wait(void* arg) +{ + for (usz i = 0; i < (iptr)arg; i++) + { + thread::sleep(time::ms(10)); + } + io::printfn("%s", arg); + x += (int)(iptr)arg; + return 0; } \ No newline at end of file From 08559ca04d17a1a5e79f66153743bf2071619d44 Mon Sep 17 00:00:00 2001 From: Sander van den Bosch Date: Fri, 21 Nov 2025 19:49:48 +0100 Subject: [PATCH 2/3] Make the main Thread block waiting for the worker threads to finish instead of buzy looping and do proper initialization and freeing of all variables. --- lib/std/threads/fixed_pool.c3 | 30 ++++++++++++------------------ lib/std/threads/pool.c3 | 27 ++++++++++----------------- 2 files changed, 22 insertions(+), 35 deletions(-) diff --git a/lib/std/threads/fixed_pool.c3 b/lib/std/threads/fixed_pool.c3 index b73d5406f..1219631c3 100644 --- a/lib/std/threads/fixed_pool.c3 +++ b/lib/std/threads/fixed_pool.c3 @@ -13,8 +13,8 @@ struct FixedThreadPool Mutex mu; QueueItem[] queue; usz qindex; + usz qworking; usz num_threads; - usz thread_counter; bitstruct : char { bool initialized; bool stop; @@ -50,6 +50,7 @@ fn void? FixedThreadPool.init(&self, usz threads, usz queue_size = 0) }; self.mu.init()!; self.notify.init()!; + self.collect.init()!; foreach (&thread : self.pool) { thread.create(&process_work, self)!; @@ -66,19 +67,10 @@ fn void? FixedThreadPool.join(&self) if (self.initialized) { self.mu.lock()!; - self.thread_counter = 0; self.joining = true; self.notify.broadcast()!; - self.mu.unlock()!; - while(self.thread_counter != self.num_threads) - { - self.mu.lock()!; - self.notify.signal()!; - self.mu.unlock()!!; - } - self.mu.lock()!; + self.collect.wait(&self.mu)!; self.joining = false; - self.collect.broadcast()!; self.mu.unlock()!; } } @@ -121,12 +113,15 @@ macro void? FixedThreadPool.@shutdown(&self, #stop) @private self.notify.signal()!; } self.mu.destroy()!; + self.notify.destroy()!; + self.collect.destroy()!; self.initialized = false; while (self.qindex) { free_qitem(self.queue[--self.qindex]); } free(self.queue); + free(self.pool); self.queue = {}; } } @@ -148,6 +143,7 @@ fn void? FixedThreadPool.push(&self, ThreadPoolFn func, args...) } self.queue[self.qindex] = { .func = func, .args = data }; self.qindex++; + self.qworking++; defer catch { free_qitem(self.queue[--self.qindex]); @@ -172,12 +168,7 @@ fn int process_work(void* self_arg) @private // Wait for work. while (self.qindex == 0) { - if (self.joining) - { - // Join requested - self.thread_counter++; - self.collect.wait(&self.mu)!!; - } + if (self.joining && self.qworking == 0) self.collect.signal()!!; if (self.stop) { // Shutdown requested. @@ -198,8 +189,11 @@ fn int process_work(void* self_arg) @private self.qindex--; QueueItem item = self.queue[self.qindex]; self.mu.unlock()!!; - defer free_qitem(item); item.func(item.args); + defer free_qitem(item); + self.mu.lock()!!; + self.qworking--; + self.mu.unlock()!!; } } diff --git a/lib/std/threads/pool.c3 b/lib/std/threads/pool.c3 index bbd9d8d46..fcee4ab7c 100644 --- a/lib/std/threads/pool.c3 +++ b/lib/std/threads/pool.c3 @@ -6,8 +6,8 @@ struct ThreadPool Mutex mu; QueueItem[SIZE] queue; usz qindex; + usz qworking; usz num_threads; - usz thread_counter; bitstruct : char { bool initialized; @@ -36,6 +36,7 @@ fn void? ThreadPool.init(&self) *self = { .num_threads = SIZE, .initialized = true }; self.mu.init()!; self.notify.init()!; + self.collect.init()!; foreach (&thread : self.pool) { thread.create(&process_work, self)!; @@ -52,19 +53,10 @@ fn void? ThreadPool.join(&self) if (self.initialized) { self.mu.lock()!; - self.thread_counter = 0; self.joining = true; self.notify.broadcast()!; - self.mu.unlock()!; - while(self.thread_counter != self.num_threads) - { - self.mu.lock()!; - self.notify.signal()!; - self.mu.unlock()!!; - } - self.mu.lock()!; + self.collect.wait(&self.mu)!; self.joining = false; - self.collect.broadcast()!; self.mu.unlock()!; } } @@ -107,6 +99,8 @@ macro void? ThreadPool.@shutdown(&self, #stop) @private self.notify.signal()!; } self.mu.destroy()!; + self.notify.destroy()!; + self.collect.destroy()!; self.initialized = false; } } @@ -125,6 +119,7 @@ fn void? ThreadPool.push(&self, ThreadFn func, void* arg) { self.queue[self.qindex] = { .func = func, .arg = arg }; self.qindex++; + self.qworking++; // Notify the threads that work is available. self.notify.broadcast()!; return; @@ -141,12 +136,7 @@ fn int process_work(void* arg) @private // Wait for work. while (self.qindex == 0) { - if (self.joining) - { - // Join requested - self.thread_counter++; - self.collect.wait(&self.mu)!!; - } + if (self.joining && self.qworking == 0) self.collect.broadcast()!!; if (self.stop) { // Shutdown requested. @@ -168,5 +158,8 @@ fn int process_work(void* arg) @private QueueItem item = self.queue[self.qindex]; self.mu.unlock()!!; item.func(item.arg); + self.mu.lock()!!; + self.qworking--; + self.mu.unlock()!!; } } \ No newline at end of file From ae5ddaf760d68f24b8cdfabfb4b42e4ee4f7c7e5 Mon Sep 17 00:00:00 2001 From: Sander van den Bosch Date: Sun, 23 Nov 2025 02:08:25 +0100 Subject: [PATCH 3/3] Updated test to use `atomic_store` and take into account the maximum queue size of the threadpool. --- test/unit/stdlib/threads/pool.c3 | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/test/unit/stdlib/threads/pool.c3 b/test/unit/stdlib/threads/pool.c3 index e0547a88c..7c0b37ee7 100644 --- a/test/unit/stdlib/threads/pool.c3 +++ b/test/unit/stdlib/threads/pool.c3 @@ -48,12 +48,12 @@ fn void join() @test Pool pool; pool.init()!!; defer pool.stop_and_destroy()!!; - for (usz i = 0; i < 10; i++) + for (usz i = 0; i < 4; i++) { pool.push(&do_wait, (void*)i)!!; } pool.join()!!; - test::eq(x, 45); + test::eq(x, 6); } int x; @@ -66,11 +66,8 @@ fn int do_work(void* arg) fn int do_wait(void* arg) { - for (usz i = 0; i < (iptr)arg; i++) - { - thread::sleep(time::ms(10)); - } - io::printfn("%s", arg); - x += (int)(iptr)arg; + usz value = (iptr)arg; + for (usz i = 0; i < value; i++) thread::sleep(time::ms(50)); + @atomic_store(x, @atomic_load(x) + (int)value); return 0; } \ No newline at end of file