-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Rewrite the old proxying API in terms of the new API #15880
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 29 commits
6584c16
99f14f5
59a7bb3
faa6423
f241353
c3732d5
be9db82
5b6294e
e5ed1ba
8e17f91
be3de8f
f274a3c
d147a5a
64c1d42
4d5aa0e
b421b3c
2a21b12
82ea908
45f7b97
3b9cbd8
e9af913
fa0d198
54ae773
e4b05eb
28be33e
0fa13af
af9d2ce
d03c3da
9a89e69
f67a82e
18eebf5
67aa5ef
1db6681
72d7457
47fbd08
a5a151b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -263,9 +263,9 @@ var LibraryPThread = { | |
return; | ||
} | ||
|
||
if (cmd === 'processQueuedMainThreadWork') { | ||
if (cmd === 'processProxyingQueue') { | ||
// TODO: Must post message to main Emscripten thread in PROXY_TO_WORKER mode. | ||
_emscripten_main_thread_process_queued_calls(); | ||
_emscripten_proxy_execute_queue(d['queue']); | ||
} else if (cmd === 'spawnThread') { | ||
spawnThread(d); | ||
} else if (cmd === 'cleanupThread') { | ||
|
@@ -1037,15 +1037,11 @@ var LibraryPThread = { | |
return {{{ makeDynCall('ii', 'ptr') }}}(arg); | ||
}, | ||
|
||
// This function is called internally to notify target thread ID that it has messages it needs to | ||
// process in its message queue inside the Wasm heap. As a helper, the caller must also pass the | ||
// ID of the main browser thread to this function, to avoid needlessly ping-ponging between JS and | ||
// Wasm boundaries. | ||
_emscripten_notify_thread_queue: function(targetThreadId, mainThreadId) { | ||
if (targetThreadId == mainThreadId) { | ||
postMessage({'cmd' : 'processQueuedMainThreadWork'}); | ||
_emscripten_notify_proxying_queue: function(targetThreadId, currThreadId, mainThreadId, queue) { | ||
if (targetThreadId == currThreadId) { | ||
setTimeout(function() { _emscripten_proxy_execute_queue(queue); }); | ||
|
||
} else if (ENVIRONMENT_IS_PTHREAD) { | ||
postMessage({'targetThread': targetThreadId, 'cmd': 'processThreadQueue'}); | ||
postMessage({'targetThread' : targetThreadId, 'cmd' : 'processProxyingQueue', 'queue' : queue}); | ||
} else { | ||
var pthread = PThread.pthreads[targetThreadId]; | ||
var worker = pthread && pthread.worker; | ||
|
@@ -1055,7 +1051,7 @@ var LibraryPThread = { | |
#endif | ||
return /*0*/; | ||
} | ||
worker.postMessage({'cmd' : 'processThreadQueue'}); | ||
worker.postMessage({'cmd' : 'processProxyingQueue', 'queue': queue}); | ||
} | ||
return 1; | ||
}, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -285,9 +285,9 @@ self.onmessage = (e) => { | |
} | ||
} else if (e.data.target === 'setimmediate') { | ||
// no-op | ||
} else if (e.data.cmd === 'processThreadQueue') { | ||
} else if (e.data.cmd === 'processProxyingQueue') { | ||
if (Module['_pthread_self']()) { // If this thread is actually running? | ||
Module['_emscripten_current_thread_process_queued_calls'](); | ||
Module['_emscripten_proxy_execute_queue'](e.data.queue); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to pass the queue pointer around or can we just assume its the system queue here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we need to pass the queue pointer around because this is the same code path used for both the system queue and user queues. Actually, I see that this ended up duplicated somehow as well. Will fix. |
||
} | ||
} else if (e.data.cmd === 'processProxyingQueue') { | ||
if (Module['_pthread_self']()) { // If this thread is actually running? | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,7 @@ | |
#include <emscripten/stack.h> | ||
|
||
#include "threading_internal.h" | ||
#include "proxying.h" | ||
|
||
int emscripten_pthread_attr_gettransferredcanvases(const pthread_attr_t* a, const char** str) { | ||
*str = a->_a_transferredcanvases; | ||
|
@@ -177,7 +178,6 @@ void emscripten_async_waitable_close(em_queued_call* call) { | |
|
||
extern EMSCRIPTEN_RESULT _emscripten_set_offscreencanvas_size(const char *target, int width, int height); | ||
extern double emscripten_receive_on_main_thread_js(int functionIndex, int numCallArgs, double* args); | ||
extern int _emscripten_notify_thread_queue(pthread_t targetThreadId, pthread_t mainThreadId); | ||
|
||
static void _do_call(void* arg) { | ||
em_queued_call* q = (em_queued_call*)arg; | ||
|
@@ -321,60 +321,6 @@ static void _do_call(void* arg) { | |
} | ||
} | ||
|
||
#define CALL_QUEUE_SIZE 128 | ||
|
||
// Shared data synchronized by call_queue_lock. | ||
typedef struct CallQueueEntry { | ||
void (*func)(void*); | ||
void* arg; | ||
} CallQueueEntry; | ||
|
||
typedef struct CallQueue { | ||
void* target_thread; | ||
CallQueueEntry* call_queue; | ||
int call_queue_head; | ||
int call_queue_tail; | ||
struct CallQueue* next; | ||
} CallQueue; | ||
|
||
// Currently global to the queue, but this can be improved to be per-queue specific. (TODO: with | ||
// lockfree list operations on callQueue_head, or removing the list by moving this data to | ||
// pthread_t) | ||
static pthread_mutex_t call_queue_lock = PTHREAD_MUTEX_INITIALIZER; | ||
static CallQueue* callQueue_head = 0; | ||
|
||
// Not thread safe, call while having call_queue_lock obtained. | ||
static CallQueue* GetQueue(void* target) { | ||
assert(target); | ||
CallQueue* q = callQueue_head; | ||
while (q && q->target_thread != target) | ||
q = q->next; | ||
return q; | ||
} | ||
|
||
// Not thread safe, call while having call_queue_lock obtained. | ||
static CallQueue* GetOrAllocateQueue(void* target) { | ||
CallQueue* q = GetQueue(target); | ||
if (q) | ||
return q; | ||
|
||
q = (CallQueue*)malloc(sizeof(CallQueue)); | ||
q->target_thread = target; | ||
q->call_queue = 0; | ||
q->call_queue_head = 0; | ||
q->call_queue_tail = 0; | ||
q->next = 0; | ||
if (callQueue_head) { | ||
CallQueue* last = callQueue_head; | ||
while (last->next) | ||
last = last->next; | ||
last->next = q; | ||
} else { | ||
callQueue_head = q; | ||
} | ||
return q; | ||
} | ||
|
||
EMSCRIPTEN_RESULT emscripten_wait_for_call_v(em_queued_call* call, double timeoutMSecs) { | ||
int r; | ||
|
||
|
@@ -410,85 +356,43 @@ pthread_t emscripten_main_browser_thread_id() { | |
return &__main_pthread; | ||
} | ||
|
||
int _emscripten_do_dispatch_to_thread(pthread_t target_thread, em_queued_call* call) { | ||
assert(call); | ||
|
||
// #if PTHREADS_DEBUG // TODO: Create a debug version of pthreads library | ||
// EM_ASM_INT({dump('thread ' + _pthread_self() + ' (ENVIRONMENT_IS_WORKER: ' + | ||
//ENVIRONMENT_IS_WORKER + '), queueing call of function enum=' + $0 + '/ptr=' + $1 + ' on thread ' | ||
//+ $2 + '\n' + new Error().stack)}, call->functionEnum, call->functionPtr, target_thread); | ||
// #endif | ||
|
||
// Can't be a null pointer here, and can't be | ||
// EM_CALLBACK_THREAD_CONTEXT_MAIN_BROWSER_THREAD either. | ||
static pthread_t normalize_thread(pthread_t target_thread) { | ||
assert(target_thread); | ||
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_MAIN_BROWSER_THREAD) | ||
target_thread = emscripten_main_browser_thread_id(); | ||
|
||
// If we are the target recipient of this message, we can just call the operation directly. | ||
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_CALLING_THREAD || | ||
target_thread == pthread_self()) { | ||
_do_call(call); | ||
return 1; | ||
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_MAIN_BROWSER_THREAD) { | ||
return emscripten_main_browser_thread_id(); | ||
} | ||
|
||
// Add the operation to the call queue of the main runtime thread. | ||
pthread_mutex_lock(&call_queue_lock); | ||
CallQueue* q = GetOrAllocateQueue(target_thread); | ||
if (!q->call_queue) { | ||
// Shared data synchronized by call_queue_lock. | ||
q->call_queue = malloc(sizeof(CallQueueEntry) * CALL_QUEUE_SIZE); | ||
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_CALLING_THREAD) { | ||
return pthread_self(); | ||
} | ||
return target_thread; | ||
} | ||
|
||
int head = q->call_queue_head; | ||
int tail = q->call_queue_tail; | ||
int new_tail = (tail + 1) % CALL_QUEUE_SIZE; | ||
|
||
while (new_tail == head) { // Queue is full? | ||
pthread_mutex_unlock(&call_queue_lock); | ||
|
||
// If queue of the main browser thread is full, then we wait. (never drop messages for the main | ||
// browser thread) | ||
if (target_thread == emscripten_main_browser_thread_id()) { | ||
emscripten_futex_wait((void*)&q->call_queue_head, head, INFINITY); | ||
pthread_mutex_lock(&call_queue_lock); | ||
head = q->call_queue_head; | ||
tail = q->call_queue_tail; | ||
new_tail = (tail + 1) % CALL_QUEUE_SIZE; | ||
} else { | ||
// For the queues of other threads, just drop the message. | ||
// #if DEBUG TODO: a debug build of pthreads library? | ||
// EM_ASM(console.error('Pthread queue overflowed, dropping queued | ||
//message to thread. ' + new Error().stack)); | ||
// #endif | ||
em_queued_call_free(call); | ||
return 0; | ||
} | ||
// Execute `call` and return 1 only if already on the `target_thread`. Otherwise | ||
// return 0. | ||
static int maybe_call_on_current_thread(pthread_t target_thread, | ||
em_queued_call* call) { | ||
if (pthread_equal(target_thread, pthread_self())) { | ||
_do_call(call); | ||
return 1; | ||
} | ||
return 0; | ||
} | ||
|
||
q->call_queue[tail].func = _do_call; | ||
q->call_queue[tail].arg = call; | ||
|
||
// If the call queue was empty, the main runtime thread is likely idle in the browser event loop, | ||
// so send a message to it to ensure that it wakes up to start processing the command we have | ||
// posted. | ||
if (head == tail) { | ||
int success = _emscripten_notify_thread_queue(target_thread, emscripten_main_browser_thread_id()); | ||
// Failed to dispatch the thread, delete the crafted message. | ||
if (!success) { | ||
em_queued_call_free(call); | ||
pthread_mutex_unlock(&call_queue_lock); | ||
return 0; | ||
} | ||
// Execute or proxy `call`. Return 1 if the work was executed or otherwise | ||
// return 0. | ||
static int do_dispatch_to_thread(pthread_t target_thread, | ||
em_queued_call* call) { | ||
target_thread = normalize_thread(target_thread); | ||
if (maybe_call_on_current_thread(target_thread, call)) { | ||
return 1; | ||
} | ||
|
||
q->call_queue_tail = new_tail; | ||
pthread_mutex_unlock(&call_queue_lock); | ||
emscripten_proxy_async( | ||
emscripten_proxy_get_system_queue(), target_thread, _do_call, call); | ||
return 0; | ||
} | ||
|
||
void emscripten_async_run_in_main_thread(em_queued_call* call) { | ||
_emscripten_do_dispatch_to_thread(emscripten_main_browser_thread_id(), call); | ||
do_dispatch_to_thread(emscripten_main_browser_thread_id(), call); | ||
tlively marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
void emscripten_sync_run_in_main_thread(em_queued_call* call) { | ||
|
@@ -589,50 +493,7 @@ void* emscripten_sync_run_in_main_thread_7(int function, void* arg1, | |
} | ||
|
||
void emscripten_current_thread_process_queued_calls() { | ||
// #if PTHREADS_DEBUG == 2 | ||
// EM_ASM(console.error('thread ' + _pthread_self() + ': | ||
//emscripten_current_thread_process_queued_calls(), ' + new Error().stack)); | ||
// #endif | ||
|
||
static thread_local bool thread_is_processing_queued_calls = false; | ||
|
||
// It is possible that when processing a queued call, the control flow leads back to calling this | ||
// function in a nested fashion! Therefore this scenario must explicitly be detected, and | ||
// processing the queue must be avoided if we are nesting, or otherwise the same queued calls | ||
// would be processed again and again. | ||
if (thread_is_processing_queued_calls) | ||
return; | ||
// This must be before pthread_mutex_lock(), since pthread_mutex_lock() can call back to this | ||
// function. | ||
thread_is_processing_queued_calls = true; | ||
|
||
pthread_mutex_lock(&call_queue_lock); | ||
CallQueue* q = GetQueue(pthread_self()); | ||
if (!q) { | ||
pthread_mutex_unlock(&call_queue_lock); | ||
thread_is_processing_queued_calls = false; | ||
return; | ||
} | ||
|
||
int head = q->call_queue_head; | ||
int tail = q->call_queue_tail; | ||
while (head != tail) { | ||
// Assume that the call is heavy, so unlock access to the call queue while it is being | ||
// performed. | ||
pthread_mutex_unlock(&call_queue_lock); | ||
q->call_queue[head].func(q->call_queue[head].arg); | ||
pthread_mutex_lock(&call_queue_lock); | ||
|
||
head = (head + 1) % CALL_QUEUE_SIZE; | ||
q->call_queue_head = head; | ||
tail = q->call_queue_tail; | ||
} | ||
pthread_mutex_unlock(&call_queue_lock); | ||
|
||
// If the queue was full and we had waiters pending to get to put data to queue, wake them up. | ||
emscripten_futex_wake((void*)&q->call_queue_head, INT_MAX); | ||
|
||
thread_is_processing_queued_calls = false; | ||
emscripten_proxy_execute_queue(emscripten_proxy_get_system_queue()); | ||
} | ||
|
||
// At times when we disallow the main thread to process queued calls, this will | ||
|
@@ -644,7 +505,15 @@ void emscripten_main_thread_process_queued_calls() { | |
if (!_emscripten_allow_main_runtime_queued_calls) | ||
return; | ||
|
||
// Recursion guard to avoid infinite recursion when we arrive here from the | ||
// pthread calls inside `emscripten_proxy_execute_queue`. | ||
tlively marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
static bool processing = 0; | ||
tlively marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
if (processing) { | ||
return; | ||
} | ||
processing = 1; | ||
emscripten_current_thread_process_queued_calls(); | ||
processing = 0; | ||
} | ||
|
||
int emscripten_sync_run_in_main_runtime_thread_(EM_FUNC_SIGNATURE sig, void* func_ptr, ...) { | ||
|
@@ -733,17 +602,6 @@ em_queued_call* emscripten_async_waitable_run_in_main_runtime_thread_( | |
return q; | ||
} | ||
|
||
typedef struct DispatchToThreadArgs { | ||
pthread_t target_thread; | ||
em_queued_call* q; | ||
} DispatchToThreadArgs; | ||
|
||
static void dispatch_to_thread_helper(void* user_data) { | ||
DispatchToThreadArgs* args = (DispatchToThreadArgs*)user_data; | ||
_emscripten_do_dispatch_to_thread(args->target_thread, args->q); | ||
free(user_data); | ||
} | ||
|
||
int emscripten_dispatch_to_thread_args(pthread_t target_thread, | ||
EM_FUNC_SIGNATURE sig, | ||
void* func_ptr, | ||
|
@@ -761,7 +619,7 @@ int emscripten_dispatch_to_thread_args(pthread_t target_thread, | |
|
||
// `q` will not be used after it is called, so let the call clean it up. | ||
q->calleeDelete = 1; | ||
return _emscripten_do_dispatch_to_thread(target_thread, q); | ||
return do_dispatch_to_thread(target_thread, q); | ||
} | ||
|
||
int emscripten_dispatch_to_thread_(pthread_t target_thread, | ||
|
@@ -792,10 +650,7 @@ int emscripten_dispatch_to_thread_async_args(pthread_t target_thread, | |
q->calleeDelete = 1; | ||
|
||
// Schedule the call to run later on this thread. | ||
DispatchToThreadArgs* args = malloc(sizeof(DispatchToThreadArgs)); | ||
args->target_thread = target_thread; | ||
args->q = q; | ||
emscripten_set_timeout(dispatch_to_thread_helper, 0, args); | ||
emscripten_set_timeout(_do_call, 0, args); | ||
|
||
return 0; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we save a little of JS codesize here by passing zero args and having
_emscripten_proxy_execute_queue
assume that NULL == system queue? Would save the extra post message packing / unpacking too.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, potentially, but I would prefer to make it a clear-cut error for NULL to ever be passed to
emscripten_proxy_execute_queue
. Better for users to get an assertion failure (in debug builds) when they do that by accident than for the system queue to be executed.