Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion lib/std/threads/fixed_pool.c3
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ struct FixedThreadPool
Mutex mu;
QueueItem[] queue;
usz qindex;
usz qworking;
usz num_threads;
bitstruct : char {
bool initialized;
bool stop;
bool stop_now;
bool joining;
}
Thread[] pool;
ConditionVariable notify;
ConditionVariable collect;
}

struct QueueItem @private
Expand All @@ -47,6 +50,7 @@ fn void? FixedThreadPool.init(&self, usz threads, usz queue_size = 0)
};
self.mu.init()!;
self.notify.init()!;
self.collect.init()!;
foreach (&thread : self.pool)
{
thread.create(&process_work, self)!;
Expand All @@ -55,6 +59,22 @@ fn void? FixedThreadPool.init(&self, usz threads, usz queue_size = 0)
}
}

<*
Join all threads in the pool.
*>
fn void? FixedThreadPool.join(&self)
{
if (self.initialized)
{
self.mu.lock()!;
self.joining = true;
self.notify.broadcast()!;
self.collect.wait(&self.mu)!;
self.joining = false;
self.mu.unlock()!;
}
}

<*
Stop all the threads and cleanup the pool.
Any pending work will be dropped.
Expand Down Expand Up @@ -93,12 +113,15 @@ macro void? FixedThreadPool.@shutdown(&self, #stop) @private
self.notify.signal()!;
}
self.mu.destroy()!;
self.notify.destroy()!;
self.collect.destroy()!;
self.initialized = false;
while (self.qindex)
{
free_qitem(self.queue[--self.qindex]);
}
free(self.queue);
free(self.pool);
self.queue = {};
}
}
Expand All @@ -120,6 +143,7 @@ fn void? FixedThreadPool.push(&self, ThreadPoolFn func, args...)
}
self.queue[self.qindex] = { .func = func, .args = data };
self.qindex++;
self.qworking++;
defer catch
{
free_qitem(self.queue[--self.qindex]);
Expand All @@ -144,6 +168,7 @@ fn int process_work(void* self_arg) @private
// Wait for work.
while (self.qindex == 0)
{
if (self.joining && self.qworking == 0) self.collect.signal()!!;
if (self.stop)
{
// Shutdown requested.
Expand All @@ -164,8 +189,11 @@ fn int process_work(void* self_arg) @private
self.qindex--;
QueueItem item = self.queue[self.qindex];
self.mu.unlock()!!;
defer free_qitem(item);
item.func(item.args);
defer free_qitem(item);
self.mu.lock()!!;
self.qworking--;
self.mu.unlock()!!;
}
}

Expand Down
27 changes: 27 additions & 0 deletions lib/std/threads/pool.c3
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@ struct ThreadPool
Mutex mu;
QueueItem[SIZE] queue;
usz qindex;
usz qworking;
usz num_threads;
bitstruct : char
{
bool initialized;
bool stop;
bool stop_now;
bool joining;
}

Thread[SIZE] pool;
ConditionVariable notify;
ConditionVariable collect;
}

struct QueueItem @private
Expand All @@ -33,6 +36,7 @@ fn void? ThreadPool.init(&self)
*self = { .num_threads = SIZE, .initialized = true };
self.mu.init()!;
self.notify.init()!;
self.collect.init()!;
foreach (&thread : self.pool)
{
thread.create(&process_work, self)!;
Expand All @@ -41,6 +45,22 @@ fn void? ThreadPool.init(&self)
}
}

<*
Join all threads in the pool.
*>
fn void? ThreadPool.join(&self)
{
if (self.initialized)
{
self.mu.lock()!;
self.joining = true;
self.notify.broadcast()!;
self.collect.wait(&self.mu)!;
self.joining = false;
self.mu.unlock()!;
}
}

<*
Stop all the threads and cleanup the pool.
Any pending work will be dropped.
Expand Down Expand Up @@ -79,6 +99,8 @@ macro void? ThreadPool.@shutdown(&self, #stop) @private
self.notify.signal()!;
}
self.mu.destroy()!;
self.notify.destroy()!;
self.collect.destroy()!;
self.initialized = false;
}
}
Expand All @@ -97,6 +119,7 @@ fn void? ThreadPool.push(&self, ThreadFn func, void* arg)
{
self.queue[self.qindex] = { .func = func, .arg = arg };
self.qindex++;
self.qworking++;
// Notify the threads that work is available.
self.notify.broadcast()!;
return;
Expand All @@ -113,6 +136,7 @@ fn int process_work(void* arg) @private
// Wait for work.
while (self.qindex == 0)
{
if (self.joining && self.qworking == 0) self.collect.broadcast()!!;
if (self.stop)
{
// Shutdown requested.
Expand All @@ -134,5 +158,8 @@ fn int process_work(void* arg) @private
QueueItem item = self.queue[self.qindex];
self.mu.unlock()!!;
item.func(item.arg);
self.mu.lock()!!;
self.qworking--;
self.mu.unlock()!!;
}
}
7 changes: 4 additions & 3 deletions releasenotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
- Casting a distinct type based on a pointer to an `any` would accidentally be permitted. #2575

### Stdlib changes
- Add `ThreadPool` join function to wait for all threads to finish in the pool without destroying the threads.

## 0.7.7 Change list

Expand All @@ -37,7 +38,7 @@
- Allow `..` ranges to use "a..a-1" in order to express zero length.
- Disallow aliasing of `@local` symbols with a higher visibility in the alias.
- Add `--max-macro-iterations` to set macro iteration limit.
- Improved generic inference in initializers #2541.
- Improved generic inference in initializers #2541.
- "Maybe-deref" subscripting `foo.[i] += 1` #2540.
- ABI change for vectors: store and pass them as arrays #2542.
- Add @simd and @align attributes to typedef #2543.
Expand Down Expand Up @@ -78,7 +79,7 @@
- Unify generic and regular module namespace.
- `env::PROJECT_VERSION` now returns the version in project.json.
- Comparing slices and arrays of user-defined types that implement == operator now works #2486.
- Add 'loop-vectorize', 'slp-vectorize', 'unroll-loops' and 'merge-functions' optimization flags #2491.
- Add 'loop-vectorize', 'slp-vectorize', 'unroll-loops' and 'merge-functions' optimization flags #2491.
- Add exec timings to -vv output #2490.
- Support #! as a comment on the first line only.
- Add `+++=` operator.
Expand Down Expand Up @@ -152,7 +153,7 @@
- `@assignable_to` is deprecated in favour of `$define`
- Add `linklib-dir` to c3l-libraries to place their linked libraries in. Defaults to `linked-libs`
- If the `os-arch` linked library doesn't exist, try with `os` for c3l libs.
- A file with an inferred module may not contain additional other modules.
- A file with an inferred module may not contain additional other modules.
- Update error message for missing body after if/for/etc #2289.
- `@is_const` is deprecated in favour of directly using `$defined`.
- `@is_lvalue(#value)` is deprecated in favour of directly using `$defined`.
Expand Down
22 changes: 22 additions & 0 deletions test/unit/stdlib/threads/pool.c3
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,32 @@ fn void push_stop() @test
}
}

fn void join() @test
{
@atomic_store(x, 0);
Pool pool;
pool.init()!!;
defer pool.stop_and_destroy()!!;
for (usz i = 0; i < 4; i++)
{
pool.push(&do_wait, (void*)i)!!;
}
pool.join()!!;
test::eq(x, 6);
}

int x;

fn int do_work(void* arg)
{
@atomic_store(x, @atomic_load(*(int*)arg));
return 0;
}

fn int do_wait(void* arg)
{
usz value = (iptr)arg;
for (usz i = 0; i < value; i++) thread::sleep(time::ms(50));
@atomic_store(x, @atomic_load(x) + (int)value);
return 0;
}