Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 61 additions & 7 deletions src/backend/iocp.zig
Original file line number Diff line number Diff line change
Expand Up @@ -545,13 +545,47 @@ pub const Loop = struct {
.close => |v| .{ .result = .{ .close = windows.CloseHandle(v.fd) } },

.connect => |*v| action: {
const result = windows.ws2_32.connect(asSocket(v.socket), &v.addr.any, @as(i32, @intCast(v.addr.getOsSockLen())));
if (result != 0) {
const err = windows.ws2_32.WSAGetLastError();
break :action switch (err) {
else => .{ .result = .{ .connect = windows.unexpectedWSAError(err) } },
const as_socket = asSocket(v.socket);
// Associate our socket with loop's completion port.
self.associate_fd(completion.handle().?) catch unreachable;

// ConnectEx requires socket to be initially bound.
// https://github.com/tigerbeetle/tigerbeetle/blob/main/src/io/windows.zig#L467
{
const inaddr_any = std.mem.zeroes([4]u8);
const bind_addr = std.net.Address.initIp4(inaddr_any, 0);
// NOTE: This may return many other errors; we should extend `ConnectError` set.
posix.bind(as_socket, &bind_addr.any, bind_addr.getOsSockLen()) catch unreachable;
}

// Dynamically load the ConnectEx function.
const ConnectEx = windows.loadWinsockExtensionFunction(windows.LPFN_CONNECTEX, as_socket, windows.ws2_32.WSAID_CONNECTEX) catch |err| switch (err) {
error.OperationNotSupported => unreachable, // Something other than sockets has given.
error.FileDescriptorNotASocket => unreachable, // Must be preferred on a socket.
error.ShortRead => unreachable,
error.Unexpected => break :action .{ .result = .{ .connect = error.Unexpected } },
};

// Connect attempt.
var bytes_transferred: windows.DWORD = 0;
const result = ConnectEx(as_socket, &v.addr.any, v.addr.getOsSockLen(), null, 0, &bytes_transferred, &completion.overlapped);

// If ConnectEx returns `windows.TRUE`, it means operation completed immediately.
// Which is most of the time not the case; we should check it anyways though!
if (result == windows.FALSE) {
// NOTE: This may return many other errors; we should extend `ConnectError` set.
break :action switch (windows.ws2_32.WSAGetLastError()) {
.WSA_IO_PENDING, .WSAEWOULDBLOCK, .WSA_IO_INCOMPLETE => .{ .submitted = {} }, // Operation will be completed in the future.
else => |err| .{ .result = .{ .connect = windows.unexpectedWSAError(err) } },
};
}

// Surprisingly, we connected immediately.
// The following is necessary for various functions (shutdown, getsockopt, setsockopt, getsockname, getpeername) to work.
// https://learn.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex#remarks
// https://stackoverflow.com/questions/13598530/connectex-requires-the-socket-to-be-initially-bound-but-to-what
_ = windows.ws2_32.setsockopt(as_socket, windows.ws2_32.SOL.SOCKET, windows.ws2_32.SO.UPDATE_CONNECT_CONTEXT, null, 0);

break :action .{ .result = .{ .connect = {} } };
},

Expand Down Expand Up @@ -950,7 +984,7 @@ pub const Completion = struct {
/// Returns a handle for the current operation if it makes sense.
fn handle(self: Completion) ?windows.HANDLE {
return switch (self.op) {
inline .accept => |*v| v.socket,
inline .accept, .connect => |*v| v.socket,
inline .read, .pread, .write, .pwrite, .recv, .send, .recvfrom, .sendto => |*v| v.fd,
else => null,
};
Expand All @@ -960,7 +994,7 @@ pub const Completion = struct {
/// operation for the completion.
pub fn perform(self: *Completion) Result {
return switch (self.op) {
.noop, .close, .connect, .shutdown, .timer, .cancel => {
.noop, .close, .shutdown, .timer, .cancel => {
std.log.warn("perform op={s}", .{@tagName(self.op)});
unreachable;
},
Expand All @@ -985,6 +1019,26 @@ pub const Completion = struct {
return .{ .accept = self.op.accept.internal_accept_socket.? };
},

.connect => |*v| r: {
const as_socket = asSocket(v.socket);
var transferred: windows.DWORD = 0;
var flags: windows.DWORD = 0;
const result = windows.ws2_32.WSAGetOverlappedResult(as_socket, &self.overlapped, &transferred, windows.FALSE, &flags);

// Connected successfully.
if (result == windows.TRUE) {
// The following is necessary for various functions (shutdown, getsockopt, setsockopt, getsockname, getpeername) to work.
// https://learn.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex#remarks
// https://stackoverflow.com/questions/13598530/connectex-requires-the-socket-to-be-initially-bound-but-to-what
_ = windows.ws2_32.setsockopt(as_socket, windows.ws2_32.SOL.SOCKET, windows.ws2_32.SO.UPDATE_CONNECT_CONTEXT, null, 0);

break :r .{ .connect = {} };
}

// We got an error.
break :r .{ .connect = windows.unexpectedWSAError(windows.ws2_32.WSAGetLastError()) };
},

.read => |*v| {
var bytes_transferred: windows.DWORD = 0;
const result = windows.kernel32.GetOverlappedResult(v.fd, &self.overlapped, &bytes_transferred, windows.FALSE);
Expand Down
12 changes: 12 additions & 0 deletions src/windows.zig
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,21 @@ pub const QueryPerformanceFrequency = windows.QueryPerformanceFrequency;
pub const GetQueuedCompletionStatusEx = windows.GetQueuedCompletionStatusEx;
pub const PostQueuedCompletionStatus = windows.PostQueuedCompletionStatus;
pub const CreateIoCompletionPort = windows.CreateIoCompletionPort;
pub const loadWinsockExtensionFunction = windows.loadWinsockExtensionFunction;

pub extern "kernel32" fn DeleteFileW(lpFileName: [*:0]const u16) callconv(windows.WINAPI) windows.BOOL;

/// https://learn.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex
pub const LPFN_CONNECTEX = *const fn (
Socket: windows.ws2_32.SOCKET,
SockAddr: *const windows.ws2_32.sockaddr,
SockLen: posix.socklen_t,
SendBuf: ?*const anyopaque,
SendBufLen: windows.DWORD,
BytesSent: *windows.DWORD,
Overlapped: *windows.OVERLAPPED,
) callconv(.winapi) windows.BOOL;

pub const exp = struct {
pub const STATUS_PENDING = 0x00000103;
pub const STILL_ACTIVE = STATUS_PENDING;
Expand Down
Loading