Skip to content
Merged
21 changes: 19 additions & 2 deletions packages/core/src/meshDevice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ export class MeshDevice {
this.isConfigured = true;
} else if (status === DeviceStatusEnum.DeviceConfiguring) {
this.isConfigured = false;
} else if (status === DeviceStatusEnum.DeviceDisconnected) {
if (this._heartbeatIntervalId !== undefined) {
clearInterval(this._heartbeatIntervalId);
}
this.complete();
}
});

Expand Down Expand Up @@ -770,7 +775,14 @@ export class MeshDevice {
},
});

return this.sendRaw(toBinary(Protobuf.Mesh.ToRadioSchema, toRadio));
return this.sendRaw(toBinary(Protobuf.Mesh.ToRadioSchema, toRadio)).catch(
(e) => {
if (this.deviceStatus === DeviceStatusEnum.DeviceDisconnected) {
throw new Error("Device connection lost");
}
throw e;
},
);
}

/**
Expand All @@ -797,7 +809,12 @@ export class MeshDevice {
clearInterval(this._heartbeatIntervalId);
}
this._heartbeatIntervalId = setInterval(() => {
this.heartbeat();
this.heartbeat().catch((err) => {
this.log.error(
Emitter[Emitter.Ping],
`⚠️ Unable to send heartbeat: ${err.message}`,
);
});
}, interval);
}

Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/utils/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ export class Queue {
await writer.write(item.data);
item.sent = true;
} catch (error) {
if (
error?.code === "ECONNRESET" ||
error?.code === "ERR_INVALID_STATE"
) {
writer.releaseLock();
this.lock = false;
throw error;
}
console.error(`Error sending packet ${item.id}`, error);
}
}
Expand Down
28 changes: 15 additions & 13 deletions packages/core/src/utils/transform/toDevice.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
/**
* Pads packets with appropriate framing information before writing to the output stream.
*/
export const toDeviceStream: TransformStream<Uint8Array, Uint8Array> =
new TransformStream<Uint8Array, Uint8Array>({
transform(chunk: Uint8Array, controller): void {
const bufLen = chunk.length;
const header = new Uint8Array([
0x94,
0xc3,
(bufLen >> 8) & 0xff,
bufLen & 0xff,
]);
controller.enqueue(new Uint8Array([...header, ...chunk]));
},
});
export const toDeviceStream: () => TransformStream<Uint8Array, Uint8Array> =
() => {
return new TransformStream<Uint8Array, Uint8Array>({
transform(chunk: Uint8Array, controller): void {
const bufLen = chunk.length;
const header = new Uint8Array([
0x94,
0xc3,
(bufLen >> 8) & 0xff,
bufLen & 0xff,
]);
controller.enqueue(new Uint8Array([...header, ...chunk]));
},
});
};
5 changes: 3 additions & 2 deletions packages/transport-deno/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ export class TransportDeno implements Types.Transport {

constructor(connection: Deno.Conn) {
this.connection = connection;
Utils.toDeviceStream.readable.pipeTo(this.connection.writable);
const toDeviceStream = Utils.toDeviceStream();
toDeviceStream.readable.pipeTo(this.connection.writable);

this._toDevice = Utils.toDeviceStream.writable;
this._toDevice = toDeviceStream.writable;
this._fromDevice = this.connection.readable.pipeThrough(
Utils.fromDeviceStream(),
);
Expand Down
2 changes: 2 additions & 0 deletions packages/transport-http/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export class TransportHTTP implements Types.Transport {
Types.DeviceStatusEnum.DeviceDisconnected,
this.isTimeoutOrAbort(error) ? "write-timeout" : "write-error",
);
return;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix the lint rule for this, to be explicit we should return undefined as that is what an empty return does

}
throw error;
}
Expand Down Expand Up @@ -165,6 +166,7 @@ export class TransportHTTP implements Types.Transport {
Types.DeviceStatusEnum.DeviceDisconnected,
this.isTimeoutOrAbort(error) ? "write-timeout" : "write-error",
);
return;
}
throw error;
}
Expand Down
14 changes: 8 additions & 6 deletions packages/transport-node-serial/src/transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ class FakeSerialPort extends Duplex {
}

function stubCoreTransforms() {
const toDevice = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});
const toDevice = () =>
new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});

const fromDeviceFactory = () =>
new TransformStream<Uint8Array, Types.DeviceOutput>({
Expand All @@ -67,8 +68,9 @@ function stubCoreTransforms() {
});

// Utils.toDeviceStream is a getter
const transform = Utils.toDeviceStream;
vi.spyOn(Utils, "toDeviceStream", "get").mockReturnValue(
toDevice as unknown as typeof Utils.toDeviceStream,
toDevice as unknown as typeof transform,
);

vi.spyOn(Utils, "fromDeviceStream").mockImplementation(
Expand Down
5 changes: 3 additions & 2 deletions packages/transport-node-serial/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ export class TransportNodeSerial implements Types.Transport {
});

// Stream for data going FROM the application TO the Meshtastic device.
this._toDevice = Utils.toDeviceStream.writable;
const toDeviceTransform = Utils.toDeviceStream();
this._toDevice = toDeviceTransform.writable;

this.pipePromise = Utils.toDeviceStream.readable
this.pipePromise = toDeviceTransform.readable
.pipeTo(Writable.toWeb(port) as WritableStream<Uint8Array>, {
signal: controller.signal,
})
Expand Down
14 changes: 8 additions & 6 deletions packages/transport-node/src/transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ class FakeSocket extends Duplex {
}

function stubCoreTransforms() {
const toDevice = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});
const toDevice = () =>
new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});

const fromDeviceFactory = () =>
new TransformStream<Uint8Array, Types.DeviceOutput>({
Expand All @@ -67,8 +68,9 @@ function stubCoreTransforms() {
},
});

const transform = Utils.toDeviceStream;
vi.spyOn(Utils, "toDeviceStream", "get").mockReturnValue(
toDevice as unknown as typeof Utils.toDeviceStream,
toDevice as unknown as typeof transform,
);

vi.spyOn(Utils, "fromDeviceStream").mockImplementation(
Expand Down
59 changes: 45 additions & 14 deletions packages/transport-node/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,36 @@ export class TransportNode implements Types.Transport {
Types.DeviceStatusEnum.DeviceDisconnected;

private closingByUser = false;
private errored = false;

/**
* Creates and connects a new TransportNode instance.
* @param hostname - The IP address or hostname of the Meshtastic device.
* @param port - The port number for the TCP connection (defaults to 4403).
* @param timeout - TCP socket timeout in milliseconds (defaults to 60000).
* @returns A promise that resolves with a connected TransportNode instance.
*/
public static create(hostname: string, port = 4403): Promise<TransportNode> {
public static create(
hostname: string,
port = 4403,
timeout = 60000,
): Promise<TransportNode> {
return new Promise((resolve, reject) => {
const socket = new Socket();

const onError = (err: Error) => {
socket.destroy();
socket.removeAllListeners();
reject(err);
};

socket.once("error", onError);

socket.connect(port, hostname, () => {
socket.once("ready", () => {
socket.removeListener("error", onError);
resolve(new TransportNode(socket));
});
socket.setTimeout(timeout);
socket.connect(port, hostname);
});
}

Expand All @@ -52,8 +60,10 @@ export class TransportNode implements Types.Transport {
constructor(connection: Socket) {
this.socket = connection;

this.socket.on("error", (err) => {
console.error("Socket connection error:", err);
this.socket.on("error", () => {
this.errored = true;
this.socket?.removeAllListeners();
this.socket?.destroy();
if (!this.closingByUser) {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
Expand All @@ -62,6 +72,24 @@ export class TransportNode implements Types.Transport {
}
});

this.socket.on("end", () => {
if (this.closingByUser) {
return; // suppress close-derived disconnect in user flow
}
this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "socket-end");
this.socket?.removeAllListeners();
this.socket?.destroy();
});

this.socket.on("timeout", () => {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"socket-timeout",
);
this.socket?.removeAllListeners();
this.socket?.destroy();
});

this.socket.on("close", () => {
if (this.closingByUser) {
return; // suppress close-derived disconnect in user flow
Expand Down Expand Up @@ -98,7 +126,7 @@ export class TransportNode implements Types.Transport {
}
ctrl.close();
} catch (error) {
if (this.closingByUser) {
if (this.closingByUser || this.errored) {
ctrl.close();
} else {
this.emitStatus(
Expand All @@ -120,18 +148,17 @@ export class TransportNode implements Types.Transport {
});

// Stream for data going FROM the application TO the Meshtastic device.
const toDeviceTransform = Utils.toDeviceStream;
const toDeviceTransform = Utils.toDeviceStream();
this._toDevice = toDeviceTransform.writable;

this.pipePromise = toDeviceTransform.readable
.pipeTo(Writable.toWeb(connection) as WritableStream<Uint8Array>, {
signal: abortController.signal,
})
.catch((err) => {
if (abortController.signal.aborted) {
if (abortController.signal.aborted || this.socket?.destroyed) {
return;
}
console.error("Error piping data to socket:", err);
const error = err instanceof Error ? err : new Error(String(err));
this.socket?.destroy(error);
});
Expand Down Expand Up @@ -160,11 +187,11 @@ export class TransportNode implements Types.Transport {
if (this.pipePromise) {
await this.pipePromise;
}

this.socket?.destroy();
} finally {
this.socket = undefined;
this.closingByUser = false;
this.errored = false;
}
}

Expand All @@ -173,9 +200,13 @@ export class TransportNode implements Types.Transport {
return;
}
this.lastStatus = next;
this.fromDeviceController?.enqueue({
type: "status",
data: { status: next, reason },
});
try {
this.fromDeviceController?.enqueue({
type: "status",
data: { status: next, reason },
});
} catch (e) {
console.error("Enqueue fail", e);
}
}
}
14 changes: 8 additions & 6 deletions packages/transport-web-serial/src/transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import { runTransportContract } from "../../../tests/utils/transportContract.ts"
import { TransportWebSerial } from "./transport.ts";

function stubCoreTransforms() {
const toDevice = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});
const toDevice = () =>
new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});

// maps raw bytes -> DeviceOutput.packet
const fromDeviceFactory = () =>
Expand All @@ -18,9 +19,10 @@ function stubCoreTransforms() {
},
});

const transform = Utils.toDeviceStream;
const restoreTo = vi
.spyOn(Utils, "toDeviceStream", "get")
.mockReturnValue(toDevice as unknown as typeof Utils.toDeviceStream);
.mockReturnValue(toDevice as unknown as typeof transform);

const restoreFrom = vi
.spyOn(Utils, "fromDeviceStream")
Expand Down
8 changes: 5 additions & 3 deletions packages/transport-web-serial/src/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ export class TransportWebSerial implements Types.Transport {
const abortController = this.abortController;

// Set up the pipe with abort signal for clean cancellation
this.pipePromise = Utils.toDeviceStream.readable
const toDeviceTransform = Utils.toDeviceStream();
this.pipePromise = toDeviceTransform.readable
.pipeTo(connection.writable, { signal: this.abortController.signal })
.catch((err) => {
// Ignore expected rejection when we cancel it via the AbortController.
Expand All @@ -73,7 +74,7 @@ export class TransportWebSerial implements Types.Transport {
);
});

this._toDevice = Utils.toDeviceStream.writable;
this._toDevice = toDeviceTransform.writable;

// Wrap + capture controller to inject status packets
this._fromDevice = new ReadableStream<Types.DeviceOutput>({
Expand Down Expand Up @@ -199,7 +200,8 @@ export class TransportWebSerial implements Types.Transport {
const abortController = this.abortController;

// Re-establish the pipe connection
this.pipePromise = Utils.toDeviceStream.readable
const toDeviceTransform = Utils.toDeviceStream();
this.pipePromise = toDeviceTransform.readable
.pipeTo(this.connection.writable, {
signal: this.abortController.signal,
})
Expand Down