
Commit 7332f81

New low-level proxying C API (#15737)
Implement a new proxying API that is meant to be suitable both for proxying in syscall implementations and for proxying arbitrary user work. Since the system proxying queue is processed from so many locations, it is dangerous to use it for arbitrary work that might take a lock or otherwise block, and work sent to it has to be structured more like native signal handlers. To avoid this limitation, the new API allows users to create their own proxying queues that are processed only when the target thread returns to the JS event loop or when the user explicitly requests processing.

In contrast to the existing proxying API, this new API:

- Never drops tasks (except in the case of allocation failure). It grows the task queues as necessary instead.
- Does not attempt to dynamically type or dispatch queued functions, but rather uses statically typed function pointers that take `void*` arguments. This simplifies both the API and the implementation. Packing of varargs into dynamically typed function wrappers could easily be layered on top of this API.
- Is less redundant. There is only one way to proxy work synchronously or asynchronously to any thread.
- Is more general. It allows waiting for a task to be explicitly signaled as done in addition to waiting for the proxied function to return.
- Uses arrays instead of linked lists for better data locality.
- Has a more uniform naming convention.

A follow-up PR will reimplement the existing proxying API in terms of this new API.
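As an illustration of the API described above, here is a minimal sketch of proxying work from one thread to another. The sketch is not part of the commit: `send_work`, `print_message`, and the message strings are hypothetical user code, and it assumes the new `proxying.h` header added in this change is on the include path and that `target` is a live thread that either returns to the JS event loop or explicitly processes the queue.

#include <pthread.h>
#include <stdio.h>

#include "proxying.h"

// Hypothetical piece of user work to run on the target thread.
static void print_message(void* arg) {
  printf("%s\n", (const char*)arg);
}

// Hypothetical sender. `q` is a queue created earlier with
// em_proxying_queue_create(); `target` is the thread that will run the work.
// Both proxying calls return 0 only on allocation failure.
static int send_work(em_proxying_queue* q, pthread_t target) {
  // Fire-and-forget: returns as soon as the task has been enqueued and the
  // target thread has been notified.
  if (!emscripten_proxy_async(q, target, print_message, "hello (async)")) {
    return 0;
  }
  // Blocks until print_message has returned on the target thread.
  if (!emscripten_proxy_sync(q, target, print_message, "hello (sync)")) {
    return 0;
  }
  return 1;
}

The target thread runs these tasks either when it next returns to the JS event loop or when it calls emscripten_proxy_execute_queue itself; a queue created with em_proxying_queue_create should only be destroyed once no thread can still be executing tasks from it.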
1 parent e002f7f commit 7332f81

File tree

8 files changed: 810 additions, 0 deletions


src/library_pthread.js

Lines changed: 19 additions & 0 deletions
@@ -976,6 +976,25 @@ var LibraryPThread = {
      worker.postMessage({'cmd' : 'processThreadQueue'});
    }
    return 1;
  },

  _emscripten_notify_proxying_queue: function(targetThreadId, currThreadId, mainThreadId, queue) {
    if (targetThreadId == currThreadId) {
      setTimeout(function() { _emscripten_proxy_execute_queue(queue); });
    } else if (ENVIRONMENT_IS_PTHREAD) {
      postMessage({'targetThread' : targetThreadId, 'cmd' : 'processProxyingQueue', 'queue' : queue});
    } else {
      var pthread = PThread.pthreads[targetThreadId];
      var worker = pthread && pthread.worker;
      if (!worker) {
#if ASSERTIONS
        err('Cannot send message to thread with ID ' + targetThreadId + ', unknown thread ID!');
#endif
        return /*0*/;
      }
      worker.postMessage({'cmd' : 'processProxyingQueue', 'queue': queue});
    }
    return 1;
  }
};

src/worker.js

Lines changed: 4 additions & 0 deletions
@@ -285,6 +285,10 @@ self.onmessage = (e) => {
      if (Module['_pthread_self']()) { // If this thread is actually running?
        Module['_emscripten_current_thread_process_queued_calls']();
      }
    } else if (e.data.cmd === 'processProxyingQueue') {
      if (Module['_pthread_self']()) { // If this thread is actually running?
        Module['_emscripten_proxy_execute_queue'](e.data.queue);
      }
    } else {
      err('worker.js received unknown command ' + e.data.cmd);
      err(e.data);

system/lib/pthread/proxying.c

Lines changed: 334 additions & 0 deletions
@@ -0,0 +1,334 @@
/*
 * Copyright 2021 The Emscripten Authors. All rights reserved.
 * Emscripten is available under two separate licenses, the MIT license and the
 * University of Illinois/NCSA Open Source License. Both these licenses can be
 * found in the LICENSE file.
 */

#include <assert.h>
#include <emscripten/threading.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

#include "proxying.h"

#define TASK_QUEUE_INITIAL_CAPACITY 128

extern int _emscripten_notify_proxying_queue(pthread_t target_thread,
                                             pthread_t curr_thread,
                                             pthread_t main_thread,
                                             em_proxying_queue* queue);

typedef struct task {
  void (*func)(void*);
  void* arg;
} task;

// A task queue for a particular thread. The task_queues for different threads
// are collected into an array in em_proxying_queue below.
typedef struct task_queue {
  // The target thread for this task_queue.
  pthread_t thread;
  // Recursion guard. TODO: We disallow recursive processing because that's
  // what the old proxying API does, so it is safer to start with the same
  // behavior. Experiment with relaxing this restriction once the old API uses
  // these queues as well.
  int processing;
  // Ring buffer of tasks of size `capacity`. New tasks are enqueued at
  // `tail` and dequeued at `head`.
  task* tasks;
  int capacity;
  int head;
  int tail;
} task_queue;

static int task_queue_init(task_queue* tasks, pthread_t thread) {
  task* task_buffer = malloc(sizeof(task) * TASK_QUEUE_INITIAL_CAPACITY);
  if (task_buffer == NULL) {
    return 0;
  }
  *tasks = (task_queue){.thread = thread,
                        .processing = 0,
                        .tasks = task_buffer,
                        .capacity = TASK_QUEUE_INITIAL_CAPACITY,
                        .head = 0,
                        .tail = 0};
  return 1;
}

static void task_queue_deinit(task_queue* tasks) { free(tasks->tasks); }

// Not thread safe.
static int task_queue_is_empty(task_queue* tasks) {
  return tasks->head == tasks->tail;
}

// Not thread safe.
static int task_queue_full(task_queue* tasks) {
  return tasks->head == (tasks->tail + 1) % tasks->capacity;
}

// Not thread safe. Returns 1 on success and 0 on failure.
static int task_queue_grow(task_queue* tasks) {
  // Allocate a larger task queue.
  int new_capacity = tasks->capacity * 2;
  task* new_tasks = malloc(sizeof(task) * new_capacity);
  if (new_tasks == NULL) {
    return 0;
  }
  // Copy the tasks such that the head of the queue is at the beginning of the
  // buffer. There are two cases to handle: either the queue wraps around the
  // end of the old buffer or it does not.
  int queued_tasks;
  if (tasks->head <= tasks->tail) {
    // No wrap. Copy the tasks in one chunk.
    queued_tasks = tasks->tail - tasks->head;
    memcpy(new_tasks, &tasks->tasks[tasks->head], sizeof(task) * queued_tasks);
  } else {
    // Wrap. Copy `first_queued` tasks up to the end of the old buffer and
    // `last_queued` tasks at the beginning of the old buffer.
    int first_queued = tasks->capacity - tasks->head;
    int last_queued = tasks->tail;
    queued_tasks = first_queued + last_queued;
    memcpy(new_tasks, &tasks->tasks[tasks->head], sizeof(task) * first_queued);
    memcpy(new_tasks + first_queued, tasks->tasks, sizeof(task) * last_queued);
  }
  free(tasks->tasks);
  tasks->tasks = new_tasks;
  tasks->capacity = new_capacity;
  tasks->head = 0;
  tasks->tail = queued_tasks;
  return 1;
}

// Not thread safe. Returns 1 on success and 0 on failure.
static int task_queue_enqueue(task_queue* tasks, task t) {
  if (task_queue_full(tasks) && !task_queue_grow(tasks)) {
    return 0;
  }
  tasks->tasks[tasks->tail] = t;
  tasks->tail = (tasks->tail + 1) % tasks->capacity;
  return 1;
}

// Not thread safe. Assumes the queue is not empty.
static task task_queue_dequeue(task_queue* tasks) {
  task t = tasks->tasks[tasks->head];
  tasks->head = (tasks->head + 1) % tasks->capacity;
  return t;
}

struct em_proxying_queue {
  // Protects all accesses to all task_queues.
  pthread_mutex_t mutex;
  // `size` task queues stored in an array of size `capacity`.
  task_queue* task_queues;
  int size;
  int capacity;
};

static em_proxying_queue system_proxying_queue = {.mutex =
                                                    PTHREAD_MUTEX_INITIALIZER,
                                                  .task_queues = NULL,
                                                  .size = 0,
                                                  .capacity = 0};

em_proxying_queue* emscripten_proxy_get_system_queue(void) {
  return &system_proxying_queue;
}

em_proxying_queue* em_proxying_queue_create(void) {
  em_proxying_queue* q = malloc(sizeof(em_proxying_queue));
  if (q == NULL) {
    return NULL;
  }
  *q = (em_proxying_queue){.mutex = PTHREAD_MUTEX_INITIALIZER,
                           .task_queues = NULL,
                           .size = 0,
                           .capacity = 0};
  return q;
}

void em_proxying_queue_destroy(em_proxying_queue* q) {
  assert(q != NULL);
  assert(q != &system_proxying_queue && "cannot destroy system proxying queue");
  // No need to acquire the lock; no one should be racing with the destruction
  // of the queue.
  pthread_mutex_destroy(&q->mutex);
  for (int i = 0; i < q->size; i++) {
    task_queue_deinit(&q->task_queues[i]);
  }
  free(q->task_queues);
  free(q);
}

// Not thread safe. Returns -1 if there are no tasks for the thread.
static int get_tasks_index_for_thread(em_proxying_queue* q, pthread_t thread) {
  assert(q != NULL);
  for (int i = 0; i < q->size; i++) {
    if (pthread_equal(q->task_queues[i].thread, thread)) {
      return i;
    }
  }
  return -1;
}

// Not thread safe.
static task_queue* get_or_add_tasks_for_thread(em_proxying_queue* q,
                                               pthread_t thread) {
  int tasks_index = get_tasks_index_for_thread(q, thread);
  if (tasks_index != -1) {
    return &q->task_queues[tasks_index];
  }
  // There were no tasks for the thread; initialize a new task_queue. If there
  // are not enough queues, allocate more.
  if (q->size == q->capacity) {
    int new_capacity = q->capacity == 0 ? 1 : q->capacity * 2;
    task_queue* new_task_queues =
      realloc(q->task_queues, sizeof(task_queue) * new_capacity);
    if (new_task_queues == NULL) {
      return NULL;
    }
    q->task_queues = new_task_queues;
    q->capacity = new_capacity;
  }
  // Initialize the next available task queue.
  task_queue* tasks = &q->task_queues[q->size];
  if (!task_queue_init(tasks, thread)) {
    return NULL;
  }
  q->size++;
  return tasks;
}

// Exported for use in worker.js.
EMSCRIPTEN_KEEPALIVE
void emscripten_proxy_execute_queue(em_proxying_queue* q) {
  assert(q != NULL);
  pthread_mutex_lock(&q->mutex);
  int tasks_index = get_tasks_index_for_thread(q, pthread_self());
  task_queue* tasks = tasks_index == -1 ? NULL : &q->task_queues[tasks_index];
  if (tasks == NULL || tasks->processing) {
    // No tasks for this thread or they are already being processed.
    pthread_mutex_unlock(&q->mutex);
    return;
  }
  // Found the task queue; process the tasks.
  tasks->processing = 1;
  while (!task_queue_is_empty(tasks)) {
    task t = task_queue_dequeue(tasks);
    // Unlock while the task is running to allow more work to be queued in
    // parallel.
    pthread_mutex_unlock(&q->mutex);
    t.func(t.arg);
    pthread_mutex_lock(&q->mutex);
    // The tasks might have been reallocated, so recalculate the pointer.
    tasks = &q->task_queues[tasks_index];
  }
  tasks->processing = 0;
  pthread_mutex_unlock(&q->mutex);
}
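Because emscripten_proxy_execute_queue only runs the tasks queued for the calling thread, a target thread that never returns to the JS event loop can still service proxied work by draining its queue explicitly. A minimal sketch of such a thread entry point follows; it is not part of this commit, and `worker_main`, the `keep_running` flag, and the sleep standing in for real work are hypothetical.

#include <stdatomic.h>
#include <stddef.h>

#include <emscripten/threading.h>

#include "proxying.h"

// Hypothetical flag that another thread clears to stop the worker.
static _Atomic int keep_running = 1;

// Hypothetical thread entry point: never returns to the JS event loop, but
// periodically executes whatever has been proxied to it on the queue passed
// as the thread argument.
static void* worker_main(void* arg) {
  em_proxying_queue* q = (em_proxying_queue*)arg;
  while (atomic_load(&keep_running)) {
    emscripten_thread_sleep(10); // stand-in for a chunk of real work
    // Run the tasks other threads have proxied to this thread since the last
    // iteration; tasks targeted at other threads are left untouched.
    emscripten_proxy_execute_queue(q);
  }
  return NULL;
}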

int emscripten_proxy_async(em_proxying_queue* q,
                           pthread_t target_thread,
                           void (*func)(void*),
                           void* arg) {
  assert(q != NULL);
  pthread_mutex_lock(&q->mutex);
  task_queue* tasks = get_or_add_tasks_for_thread(q, target_thread);
  if (tasks == NULL) {
    goto failed;
  }
  int empty = task_queue_is_empty(tasks);
  if (!task_queue_enqueue(tasks, (task){func, arg})) {
    goto failed;
  }
  pthread_mutex_unlock(&q->mutex);
  // If the queue was previously empty, notify the target thread to process it.
  // Otherwise, the target thread was already notified when the existing work
  // was enqueued so we don't need to notify it again.
  if (empty) {
    _emscripten_notify_proxying_queue(
      target_thread, pthread_self(), emscripten_main_browser_thread_id(), q);
  }
  return 1;

failed:
  pthread_mutex_unlock(&q->mutex);
  return 0;
}

struct em_proxying_ctx {
  // The user-provided function and argument.
  void (*func)(em_proxying_ctx*, void*);
  void* arg;
  // Set `done` to 1 and signal the condition variable once the proxied task is
  // done.
  int done;
  pthread_mutex_t mutex;
  pthread_cond_t cond;
};

static void em_proxying_ctx_init(em_proxying_ctx* ctx,
                                 void (*func)(em_proxying_ctx*, void*),
                                 void* arg) {
  *ctx = (em_proxying_ctx){.func = func,
                           .arg = arg,
                           .done = 0,
                           .mutex = PTHREAD_MUTEX_INITIALIZER,
                           .cond = PTHREAD_COND_INITIALIZER};
}

static void em_proxying_ctx_deinit(em_proxying_ctx* ctx) {
  pthread_mutex_destroy(&ctx->mutex);
  pthread_cond_destroy(&ctx->cond);
}

void emscripten_proxy_finish(em_proxying_ctx* ctx) {
  pthread_mutex_lock(&ctx->mutex);
  ctx->done = 1;
  pthread_mutex_unlock(&ctx->mutex);
  pthread_cond_signal(&ctx->cond);
}

// Helper for wrapping the call with ctx as a `void (*)(void*)`.
static void call_with_ctx(void* p) {
  em_proxying_ctx* ctx = (em_proxying_ctx*)p;
  ctx->func(ctx, ctx->arg);
}

int emscripten_proxy_sync_with_ctx(em_proxying_queue* q,
                                   pthread_t target_thread,
                                   void (*func)(em_proxying_ctx*, void*),
                                   void* arg) {
  assert(!pthread_equal(target_thread, pthread_self()) &&
         "Cannot synchronously wait for work proxied to the current thread");
  em_proxying_ctx ctx;
  em_proxying_ctx_init(&ctx, func, arg);
  if (!emscripten_proxy_async(q, target_thread, call_with_ctx, &ctx)) {
    return 0;
  }
  pthread_mutex_lock(&ctx.mutex);
  while (!ctx.done) {
    pthread_cond_wait(&ctx.cond, &ctx.mutex);
  }
  pthread_mutex_unlock(&ctx.mutex);
  em_proxying_ctx_deinit(&ctx);
  return 1;
}

// Helper for signaling the end of the task after the user function returns.
static void call_then_finish(em_proxying_ctx* ctx, void* arg) {
  task* t = (task*)arg;
  t->func(t->arg);
  emscripten_proxy_finish(ctx);
}

int emscripten_proxy_sync(em_proxying_queue* q,
                          pthread_t target_thread,
                          void (*func)(void*),
                          void* arg) {
  task t = {func, arg};
  return emscripten_proxy_sync_with_ctx(q, target_thread, call_then_finish, &t);
}
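Finally, emscripten_proxy_sync_with_ctx covers the case where the proxied task is not complete when the proxied function returns. A minimal sketch follows, assuming the target thread returns to its JS event loop so that the deferred callback can run; it is not part of this commit, `start_async_work`, `finish_later`, and `wait_for_async_work` are hypothetical user code, and emscripten_async_call is the existing Emscripten API for scheduling a callback on the current thread's event loop.

#include <pthread.h>
#include <stddef.h>

#include <emscripten/emscripten.h>

#include "proxying.h"

// Hypothetical deferred callback: runs later on the target thread and only
// then marks the proxied task as done, releasing the waiting thread.
static void finish_later(void* arg) {
  emscripten_proxy_finish((em_proxying_ctx*)arg);
}

// Proxied onto the target thread. Returning from this function does not
// complete the task; completion is signaled explicitly by finish_later.
static void start_async_work(em_proxying_ctx* ctx, void* arg) {
  (void)arg;
  emscripten_async_call(finish_later, ctx, /*millis=*/100);
}

// Hypothetical caller on another thread: blocks until finish_later has
// signaled the context, not merely until start_async_work has returned.
static int wait_for_async_work(em_proxying_queue* q, pthread_t target) {
  return emscripten_proxy_sync_with_ctx(q, target, start_async_work, NULL);
}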

0 commit comments