refactor concurrent_imports support in wasmtime-wit-bindgen
#6
I had a few goals with this PR:

1. Improve the ergonomics of concurrent import bindings by supporting `async`/`await` sugar and allowing store access to be arbitrarily interspersed between `await` points, while still preventing references to the store from being held across `await` points.
2. Get rid of the `Data` associated types for `Host` traits and support `add_to_linker_get_host` where the `Host` impl is not the same as the `T` in `Store<T>`.
3. Allow creating, reading from, and writing to streams and futures (among other operations) without exposing `StoreContextMut` directly.

Unfortunately, after a day of intense type tetris I failed to achieve items 2 or 3, so this only covers item 1.

Regarding item 1: I've introduced a new `Accessor` type which wraps a `*mut dyn VMStore` and provides access to it only via a `with` method. That method accepts a synchronous closure which takes a `StoreContextMut<T>` parameter. The closure can do what it likes and return an arbitrary value as long as that result has a `'static` lifetime (i.e. does not borrow from the store). This ensures that the host function is able to access the store only between `await`s and not across them; we prohibit the latter because it would prevent other async-lowered imports from running concurrently. Finally, since the host function takes a `&mut Accessor<T>`, it is not possible for the reference to outlive the future returned by the host function, and since the `with` method takes `&mut self` it cannot be used recursively.

Regarding items 2 and 3: In order to read from or write to streams/futures, we need to be able to efficiently lift and lower their payload types, which requires that both the payload type (of which there could be several for a given world) and the `T` in `Store<T>` be in scope. I was unable to find a way to thread those types through the various layers of closures, futures, and generated code without adding unwanted `'static` bounds and/or breaking the blanket `impl`s used for forwarding calls from `&mut X` to `X`. Also, the usual tricks of using dyn objects or vtables could only be used ergonomically to erase one of the two types but not both. I'd love to revisit this later with the help of a Rust type wizard to see if there's a strategy I missed.

Signed-off-by: Joel Dice <[email protected]>
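To make the `with` pattern described above concrete, here is a minimal, std-only sketch. It is not the code from this PR: `HostState`, `YieldOnce`, `block_on`, `bump_twice` and `demo` are hypothetical names, and the accessor wraps a plain `*mut T` instead of `*mut dyn VMStore` and hands the closure a `&mut T` instead of a `StoreContextMut<T>`. It only illustrates the shape of the idea: store access happens inside a synchronous closure, so nothing borrowed from the store can survive an `await`.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the `T` in `Store<T>`.
#[derive(Default)]
pub struct HostState {
    pub counter: u32,
}

/// Simplified accessor: the store is reachable only through `with`.
pub struct Accessor<T> {
    store: *mut T,
}

impl<T> Accessor<T> {
    /// Safety: `store` must stay valid, and must not be accessed through any
    /// other path, for as long as the accessor is in use.
    pub unsafe fn new(store: *mut T) -> Self {
        Self { store }
    }

    /// Run a synchronous closure with access to the store. The result must be
    /// `'static`, so it cannot borrow from the store.
    pub fn with<R: 'static>(&mut self, f: impl FnOnce(&mut T) -> R) -> R {
        // SAFETY: upheld by the contract of `new`; `&mut self` prevents a
        // nested `with` call on the same accessor from inside the closure.
        f(unsafe { &mut *self.store })
    }
}

/// A future that is pending on its first poll and ready on its second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Host-function-shaped example: store access before and after an `await`,
/// with no store borrow held while suspended.
pub async fn bump_twice(accessor: &mut Accessor<HostState>) -> u32 {
    accessor.with(|state| state.counter += 1);
    YieldOnce(false).await;
    accessor.with(|state| {
        state.counter += 1;
        state.counter
    })
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor, only here to make the sketch runnable.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn demo() -> u32 {
    let mut state = HostState::default();
    let ptr: *mut HostState = &mut state;
    // SAFETY: `state` outlives the accessor and is not touched while it is used.
    let mut accessor = unsafe { Accessor::new(ptr) };
    block_on(bump_twice(&mut accessor))
}
```

In this sketch, trying to return `&mut state.counter` from the closure, or calling `accessor.with` again inside the closure, is rejected by the compiler, which mirrors the two restrictions described above.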
Thanks, this looks great. I've tested the changes by rebasing #1 on top of the refactor and I think it's a big improvement!
For example, here's what bind and connect look like now:
wasip3-prototyping/crates/wasi/src/p3/sockets/host/types/tcp.rs
Lines 67 to 153 in 7dbc8a9
```rust
async fn bind(
    store: &mut Accessor<Self::TcpSocketData>,
    socket: Resource<TcpSocket>,
    local_address: IpSocketAddress,
) -> wasmtime::Result<Result<(), ErrorCode>> {
    let local_address = SocketAddr::from(local_address);
    if !is_tcp_allowed(store)
        || !is_addr_allowed(store, local_address, SocketAddrUse::TcpBind).await
    {
        return Ok(Err(ErrorCode::AccessDenied));
    }
    store.with(|mut store| {
        let socket = get_socket(store.data_mut().table(), &socket)?;
        if !is_valid_unicast_address(local_address.ip(), socket.family) {
            return Ok(Err(ErrorCode::InvalidArgument));
        }
        match mem::replace(&mut socket.tcp_state, TcpState::Closed) {
            TcpState::Default(sock) => {
                if let Err(err) = bind(&sock, local_address) {
                    socket.tcp_state = TcpState::Default(sock);
                    Ok(Err(err))
                } else {
                    socket.tcp_state = TcpState::Bound(sock);
                    Ok(Ok(()))
                }
            }
            tcp_state => {
                socket.tcp_state = tcp_state;
                Ok(Err(ErrorCode::InvalidState))
            }
        }
    })
}

async fn connect(
    store: &mut Accessor<Self::TcpSocketData>,
    socket: Resource<TcpSocket>,
    remote_address: IpSocketAddress,
) -> wasmtime::Result<Result<(), ErrorCode>> {
    let remote_address = SocketAddr::from(remote_address);
    if !is_tcp_allowed(store)
        || !is_addr_allowed(store, remote_address, SocketAddrUse::TcpConnect).await
    {
        return Ok(Err(ErrorCode::AccessDenied));
    }
    let ip = remote_address.ip().to_canonical();
    if ip.is_unspecified() || remote_address.port() == 0 {
        return Ok(Err(ErrorCode::InvalidArgument));
    }
    match store.with(|mut store| {
        let socket = get_socket(store.data_mut().table(), &socket)?;
        if !is_valid_unicast_address(ip, socket.family) {
            return Ok(Err(ErrorCode::InvalidArgument));
        }
        match mem::replace(&mut socket.tcp_state, TcpState::Connecting) {
            TcpState::Default(sock) | TcpState::Bound(sock) => Ok(Ok(sock)),
            tcp_state => {
                socket.tcp_state = tcp_state;
                Ok(Err(ErrorCode::InvalidState))
            }
        }
    }) {
        Ok(Ok(sock)) => {
            let res = sock.connect(remote_address).await;
            store.with(|mut store| {
                let socket = get_socket(store.data_mut().table(), &socket)?;
                ensure!(
                    matches!(socket.tcp_state, TcpState::Connecting),
                    "corrupted socket state"
                );
                match res {
                    Ok(stream) => {
                        socket.tcp_state = TcpState::Connected(stream);
                        Ok(Ok(()))
                    }
                    Err(err) => {
                        socket.tcp_state = TcpState::Closed;
                        Ok(Err(err.into()))
                    }
                }
            })
        }
        Ok(Err(err)) => Ok(Err(err)),
        Err(err) => Err(err),
    }
}
```
I've attempted to implement `listen` using the "concurrent imports" as well; however, I've stumbled upon what appears to be another blocker. `listen` returns a stream of `tcp-socket` resources (https://github.com/WebAssembly/wasi-sockets/blob/47282579d904116762fd4086cb78b65aa2faf679/wit-0.3.0-draft/types.wit#L289), which means that the host impl needs to start a Tokio task and push a resource to the store on each successful accept. Alternatively, the (host) receiver side could handle pushing resources to the store, but either way it seems that we need to get access to the store asynchronously after the host binding returns:
wasip3-prototyping/crates/wasi/src/p3/sockets/host/types/tcp.rs
Lines 180 to 194 in 7dbc8a9
```rust
task: AbortOnDropJoinHandle::from(spawn({
    let listener = Arc::clone(&listener);
    async move {
        _ = tx;
        loop {
            match listener.accept().await {
                Ok((mut stream, addr)) => {
                    // TODO: find a way to create a socket resource
                    eprintln!("accepted TCP connection from {addr}");
                    if let Err(err) = stream.shutdown().await {
                        eprintln!(
                            "failed to shutdown accepted stream: {err:?}"
                        )
                    }
                }
```
Alternatively, perhaps something like "message passing" could work where we'd be able to send "commands" to the (Wasmtime) async executor, which would be able to operate on resources in the store without the actual implementations ever getting access to it. Maybe it's something worth discussing later today at the CM meeting?
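As a rough illustration of the "message passing" idea, here is a std-only sketch. Everything in it is hypothetical: `Table`, `Command`, `PushSocket` and `demo` are invented names, a plain thread stands in for the Tokio accept task, and a `String` peer address stands in for an accepted socket. The point is only that the background task never touches the store-side table; it sends commands, and whoever owns the table applies them and replies with the new handle.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical store-side resource table, keyed by handle.
#[derive(Default)]
pub struct Table {
    next: u32,
    sockets: HashMap<u32, String>,
}

/// Commands a background task may send instead of accessing the store.
pub enum Command {
    /// Register an accepted connection and reply with its handle.
    PushSocket {
        peer: String,
        reply: mpsc::Sender<u32>,
    },
}

impl Table {
    /// Applied only by the side that owns the table.
    fn apply(&mut self, cmd: Command) {
        match cmd {
            Command::PushSocket { peer, reply } => {
                let handle = self.next;
                self.next += 1;
                self.sockets.insert(handle, peer);
                // The sender may already be gone; that is not an error here.
                let _ = reply.send(handle);
            }
        }
    }
}

/// Returns the handles seen by the background task and the final table
/// contents, sorted by handle.
pub fn demo() -> (Vec<u32>, Vec<(u32, String)>) {
    let (tx, rx) = mpsc::channel::<Command>();

    // Stand-in for the accept loop: it has no access to `Table`.
    let worker = thread::spawn(move || {
        let mut handles = Vec::new();
        for peer in ["10.0.0.1:4000", "10.0.0.2:4001"] {
            let (reply_tx, reply_rx) = mpsc::channel();
            tx.send(Command::PushSocket {
                peer: peer.to_string(),
                reply: reply_tx,
            })
            .unwrap();
            handles.push(reply_rx.recv().unwrap());
        }
        handles
        // `tx` is dropped here, which ends the receive loop below.
    });

    // Stand-in for the executor that owns the store.
    let mut table = Table::default();
    for cmd in rx {
        table.apply(cmd);
    }

    let handles = worker.join().unwrap();
    let mut entries: Vec<_> = table.sockets.into_iter().collect();
    entries.sort();
    (handles, entries)
}
```

A design like this keeps all table mutations on one side of the channel, at the cost of a round trip per resource; whether that trade-off suits Wasmtime's executor is exactly the open question raised above.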
Yes, I like that idea. I was thinking something like a […]
Something like this, perhaps:

```rust
impl<T> Accessor<T> {
    // ...

    /// Spawn a background task which will receive an `&mut Accessor<T>` and run
    /// concurrently with any other tasks in progress for the current instance.
    pub fn spawn<F: Future<Output = Result<()>>>(&mut self, fun: impl FnOnce(&mut Self) -> F) {
        // ...
    }
}
```

I'll try implementing that today and test it with a […]
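For readers who want to see how a `spawn` along these lines could behave, here is a self-contained toy model. It deliberately differs from the signature proposed above: the spawned closure receives an owned `Accessor<T>` handle backed by `Rc<RefCell<..>>` rather than `&mut Self` over a raw store pointer, errors are `String`s, and `run`, `Inner`, `YieldOnce` and `demo` are hypothetical names. It is a sketch of the scheduling idea only, not of Wasmtime's implementation.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Task = Pin<Box<dyn Future<Output = Result<(), String>>>>;

/// State shared by all tasks of one (hypothetical) instance.
struct Inner<T> {
    data: T,
    spawned: Vec<Task>, // queued by `spawn`, adopted by `run`
}

/// Toy accessor: a shared handle replaces the raw store pointer.
pub struct Accessor<T>(Rc<RefCell<Inner<T>>>);

impl<T: 'static> Accessor<T> {
    pub fn with<R: 'static>(&mut self, f: impl FnOnce(&mut T) -> R) -> R {
        let mut inner = self.0.borrow_mut();
        f(&mut inner.data)
    }

    /// Queue a background task; it starts running on the next scheduler round.
    pub fn spawn<F>(&mut self, fun: impl FnOnce(Accessor<T>) -> F)
    where
        F: Future<Output = Result<(), String>> + 'static,
    {
        let task: Task = Box::pin(fun(Accessor(Rc::clone(&self.0))));
        self.0.borrow_mut().spawned.push(task);
    }
}

/// Pending on the first poll, ready on the second.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Round-robin scheduler: polls every task once per round until all finish.
/// Task errors are ignored in this sketch.
pub fn run<T: 'static, F>(data: T, root: impl FnOnce(Accessor<T>) -> F) -> T
where
    F: Future<Output = Result<(), String>> + 'static,
{
    let inner = Rc::new(RefCell::new(Inner { data, spawned: Vec::new() }));
    let mut tasks: Vec<Task> = Vec::new();
    tasks.push(Box::pin(root(Accessor(Rc::clone(&inner)))));
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
        tasks.append(&mut inner.borrow_mut().spawned);
    }
    match Rc::try_unwrap(inner) {
        Ok(cell) => cell.into_inner().data,
        Err(_) => unreachable!("a finished task leaked its accessor"),
    }
}

pub fn demo() -> Vec<&'static str> {
    run(Vec::new(), |mut accessor: Accessor<Vec<&'static str>>| async move {
        accessor.with(|log| log.push("root: start"));
        accessor.spawn(|mut bg| async move {
            bg.with(|log| log.push("background: start"));
            YieldOnce(false).await;
            bg.with(|log| log.push("background: done"));
            Ok::<(), String>(())
        });
        YieldOnce(false).await;
        YieldOnce(false).await;
        accessor.with(|log| log.push("root: done"));
        Ok::<(), String>(())
    })
}
```

In this toy model the background task keeps running after the point where it was spawned and interleaves with the root task, which is the property a `listen` implementation would need in order to push accepted sockets into the store after the host binding has returned.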