Skip to content

Commit bcdda8b

Browse files
authored
Connection robustness improvements (#813)
* Clear heartbeat and queue when disconnected * Give clearer error in case configure fails due to a lost connection. Used to throw 'Packet does not exist' * If the queue processing error is due to a lost connection, throw it instead of looping endlessly * In case we send a disconnection event we don't need to also throw * Catch heartbeat errors * Also handle invalid state errors * Handle socket timeouts * Log heartbeat failures * Make linter happy * Transform stream being a singleton prevented reconnection attempts * Adapt tests to not using singleton * Aborting already ends the connection
1 parent dcb44d2 commit bcdda8b

File tree

11 files changed

+124
-54
lines changed

11 files changed

+124
-54
lines changed

packages/core/src/meshDevice.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ export class MeshDevice {
6464
this.isConfigured = true;
6565
} else if (status === DeviceStatusEnum.DeviceConfiguring) {
6666
this.isConfigured = false;
67+
} else if (status === DeviceStatusEnum.DeviceDisconnected) {
68+
if (this._heartbeatIntervalId !== undefined) {
69+
clearInterval(this._heartbeatIntervalId);
70+
}
71+
this.complete();
6772
}
6873
});
6974

@@ -770,7 +775,14 @@ export class MeshDevice {
770775
},
771776
});
772777

773-
return this.sendRaw(toBinary(Protobuf.Mesh.ToRadioSchema, toRadio));
778+
return this.sendRaw(toBinary(Protobuf.Mesh.ToRadioSchema, toRadio)).catch(
779+
(e) => {
780+
if (this.deviceStatus === DeviceStatusEnum.DeviceDisconnected) {
781+
throw new Error("Device connection lost");
782+
}
783+
throw e;
784+
},
785+
);
774786
}
775787

776788
/**
@@ -797,7 +809,12 @@ export class MeshDevice {
797809
clearInterval(this._heartbeatIntervalId);
798810
}
799811
this._heartbeatIntervalId = setInterval(() => {
800-
this.heartbeat();
812+
this.heartbeat().catch((err) => {
813+
this.log.error(
814+
Emitter[Emitter.Ping],
815+
`⚠️ Unable to send heartbeat: ${err.message}`,
816+
);
817+
});
801818
}, interval);
802819
}
803820

packages/core/src/utils/queue.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,14 @@ export class Queue {
117117
await writer.write(item.data);
118118
item.sent = true;
119119
} catch (error) {
120+
if (
121+
error?.code === "ECONNRESET" ||
122+
error?.code === "ERR_INVALID_STATE"
123+
) {
124+
writer.releaseLock();
125+
this.lock = false;
126+
throw error;
127+
}
120128
console.error(`Error sending packet ${item.id}`, error);
121129
}
122130
}
Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
/**
22
* Pads packets with appropriate framing information before writing to the output stream.
33
*/
4-
export const toDeviceStream: TransformStream<Uint8Array, Uint8Array> =
5-
new TransformStream<Uint8Array, Uint8Array>({
6-
transform(chunk: Uint8Array, controller): void {
7-
const bufLen = chunk.length;
8-
const header = new Uint8Array([
9-
0x94,
10-
0xc3,
11-
(bufLen >> 8) & 0xff,
12-
bufLen & 0xff,
13-
]);
14-
controller.enqueue(new Uint8Array([...header, ...chunk]));
15-
},
16-
});
4+
export const toDeviceStream: () => TransformStream<Uint8Array, Uint8Array> =
5+
() => {
6+
return new TransformStream<Uint8Array, Uint8Array>({
7+
transform(chunk: Uint8Array, controller): void {
8+
const bufLen = chunk.length;
9+
const header = new Uint8Array([
10+
0x94,
11+
0xc3,
12+
(bufLen >> 8) & 0xff,
13+
bufLen & 0xff,
14+
]);
15+
controller.enqueue(new Uint8Array([...header, ...chunk]));
16+
},
17+
});
18+
};

packages/transport-deno/src/transport.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ export class TransportDeno implements Types.Transport {
1616

1717
constructor(connection: Deno.Conn) {
1818
this.connection = connection;
19-
Utils.toDeviceStream.readable.pipeTo(this.connection.writable);
19+
const toDeviceStream = Utils.toDeviceStream();
20+
toDeviceStream.readable.pipeTo(this.connection.writable);
2021

21-
this._toDevice = Utils.toDeviceStream.writable;
22+
this._toDevice = toDeviceStream.writable;
2223
this._fromDevice = this.connection.readable.pipeThrough(
2324
Utils.fromDeviceStream(),
2425
);

packages/transport-http/src/transport.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ export class TransportHTTP implements Types.Transport {
7676
Types.DeviceStatusEnum.DeviceDisconnected,
7777
this.isTimeoutOrAbort(error) ? "write-timeout" : "write-error",
7878
);
79+
return;
7980
}
8081
throw error;
8182
}
@@ -165,6 +166,7 @@ export class TransportHTTP implements Types.Transport {
165166
Types.DeviceStatusEnum.DeviceDisconnected,
166167
this.isTimeoutOrAbort(error) ? "write-timeout" : "write-error",
167168
);
169+
return;
168170
}
169171
throw error;
170172
}

packages/transport-node-serial/src/transport.test.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,12 @@ class FakeSerialPort extends Duplex {
5353
}
5454

5555
function stubCoreTransforms() {
56-
const toDevice = new TransformStream<Uint8Array, Uint8Array>({
57-
transform(chunk, controller) {
58-
controller.enqueue(chunk);
59-
},
60-
});
56+
const toDevice = () =>
57+
new TransformStream<Uint8Array, Uint8Array>({
58+
transform(chunk, controller) {
59+
controller.enqueue(chunk);
60+
},
61+
});
6162

6263
const fromDeviceFactory = () =>
6364
new TransformStream<Uint8Array, Types.DeviceOutput>({
@@ -67,8 +68,9 @@ function stubCoreTransforms() {
6768
});
6869

6970
// Utils.toDeviceStream is a getter
71+
const transform = Utils.toDeviceStream;
7072
vi.spyOn(Utils, "toDeviceStream", "get").mockReturnValue(
71-
toDevice as unknown as typeof Utils.toDeviceStream,
73+
toDevice as unknown as typeof transform,
7274
);
7375

7476
vi.spyOn(Utils, "fromDeviceStream").mockImplementation(

packages/transport-node-serial/src/transport.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,10 @@ export class TransportNodeSerial implements Types.Transport {
112112
});
113113

114114
// Stream for data going FROM the application TO the Meshtastic device.
115-
this._toDevice = Utils.toDeviceStream.writable;
115+
const toDeviceTransform = Utils.toDeviceStream();
116+
this._toDevice = toDeviceTransform.writable;
116117

117-
this.pipePromise = Utils.toDeviceStream.readable
118+
this.pipePromise = toDeviceTransform.readable
118119
.pipeTo(Writable.toWeb(port) as WritableStream<Uint8Array>, {
119120
signal: controller.signal,
120121
})

packages/transport-node/src/transport.test.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,12 @@ class FakeSocket extends Duplex {
5454
}
5555

5656
function stubCoreTransforms() {
57-
const toDevice = new TransformStream<Uint8Array, Uint8Array>({
58-
transform(chunk, controller) {
59-
controller.enqueue(chunk);
60-
},
61-
});
57+
const toDevice = () =>
58+
new TransformStream<Uint8Array, Uint8Array>({
59+
transform(chunk, controller) {
60+
controller.enqueue(chunk);
61+
},
62+
});
6263

6364
const fromDeviceFactory = () =>
6465
new TransformStream<Uint8Array, Types.DeviceOutput>({
@@ -67,8 +68,9 @@ function stubCoreTransforms() {
6768
},
6869
});
6970

71+
const transform = Utils.toDeviceStream;
7072
vi.spyOn(Utils, "toDeviceStream", "get").mockReturnValue(
71-
toDevice as unknown as typeof Utils.toDeviceStream,
73+
toDevice as unknown as typeof transform,
7274
);
7375

7476
vi.spyOn(Utils, "fromDeviceStream").mockImplementation(

packages/transport-node/src/transport.ts

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,36 @@ export class TransportNode implements Types.Transport {
2020
Types.DeviceStatusEnum.DeviceDisconnected;
2121

2222
private closingByUser = false;
23+
private errored = false;
2324

2425
/**
2526
* Creates and connects a new TransportNode instance.
2627
* @param hostname - The IP address or hostname of the Meshtastic device.
2728
* @param port - The port number for the TCP connection (defaults to 4403).
29+
* @param timeout - TCP socket timeout in milliseconds (defaults to 60000).
2830
* @returns A promise that resolves with a connected TransportNode instance.
2931
*/
30-
public static create(hostname: string, port = 4403): Promise<TransportNode> {
32+
public static create(
33+
hostname: string,
34+
port = 4403,
35+
timeout = 60000,
36+
): Promise<TransportNode> {
3137
return new Promise((resolve, reject) => {
3238
const socket = new Socket();
3339

3440
const onError = (err: Error) => {
3541
socket.destroy();
42+
socket.removeAllListeners();
3643
reject(err);
3744
};
3845

3946
socket.once("error", onError);
40-
41-
socket.connect(port, hostname, () => {
47+
socket.once("ready", () => {
4248
socket.removeListener("error", onError);
4349
resolve(new TransportNode(socket));
4450
});
51+
socket.setTimeout(timeout);
52+
socket.connect(port, hostname);
4553
});
4654
}
4755

@@ -52,8 +60,10 @@ export class TransportNode implements Types.Transport {
5260
constructor(connection: Socket) {
5361
this.socket = connection;
5462

55-
this.socket.on("error", (err) => {
56-
console.error("Socket connection error:", err);
63+
this.socket.on("error", () => {
64+
this.errored = true;
65+
this.socket?.removeAllListeners();
66+
this.socket?.destroy();
5767
if (!this.closingByUser) {
5868
this.emitStatus(
5969
Types.DeviceStatusEnum.DeviceDisconnected,
@@ -62,6 +72,24 @@ export class TransportNode implements Types.Transport {
6272
}
6373
});
6474

75+
this.socket.on("end", () => {
76+
if (this.closingByUser) {
77+
return; // suppress close-derived disconnect in user flow
78+
}
79+
this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "socket-end");
80+
this.socket?.removeAllListeners();
81+
this.socket?.destroy();
82+
});
83+
84+
this.socket.on("timeout", () => {
85+
this.emitStatus(
86+
Types.DeviceStatusEnum.DeviceDisconnected,
87+
"socket-timeout",
88+
);
89+
this.socket?.removeAllListeners();
90+
this.socket?.destroy();
91+
});
92+
6593
this.socket.on("close", () => {
6694
if (this.closingByUser) {
6795
return; // suppress close-derived disconnect in user flow
@@ -98,7 +126,7 @@ export class TransportNode implements Types.Transport {
98126
}
99127
ctrl.close();
100128
} catch (error) {
101-
if (this.closingByUser) {
129+
if (this.closingByUser || this.errored) {
102130
ctrl.close();
103131
} else {
104132
this.emitStatus(
@@ -120,18 +148,17 @@ export class TransportNode implements Types.Transport {
120148
});
121149

122150
// Stream for data going FROM the application TO the Meshtastic device.
123-
const toDeviceTransform = Utils.toDeviceStream;
151+
const toDeviceTransform = Utils.toDeviceStream();
124152
this._toDevice = toDeviceTransform.writable;
125153

126154
this.pipePromise = toDeviceTransform.readable
127155
.pipeTo(Writable.toWeb(connection) as WritableStream<Uint8Array>, {
128156
signal: abortController.signal,
129157
})
130158
.catch((err) => {
131-
if (abortController.signal.aborted) {
159+
if (abortController.signal.aborted || this.socket?.destroyed) {
132160
return;
133161
}
134-
console.error("Error piping data to socket:", err);
135162
const error = err instanceof Error ? err : new Error(String(err));
136163
this.socket?.destroy(error);
137164
});
@@ -160,11 +187,11 @@ export class TransportNode implements Types.Transport {
160187
if (this.pipePromise) {
161188
await this.pipePromise;
162189
}
163-
164190
this.socket?.destroy();
165191
} finally {
166192
this.socket = undefined;
167193
this.closingByUser = false;
194+
this.errored = false;
168195
}
169196
}
170197

@@ -173,9 +200,13 @@ export class TransportNode implements Types.Transport {
173200
return;
174201
}
175202
this.lastStatus = next;
176-
this.fromDeviceController?.enqueue({
177-
type: "status",
178-
data: { status: next, reason },
179-
});
203+
try {
204+
this.fromDeviceController?.enqueue({
205+
type: "status",
206+
data: { status: next, reason },
207+
});
208+
} catch (e) {
209+
console.error("Enqueue fail", e);
210+
}
180211
}
181212
}

packages/transport-web-serial/src/transport.test.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import { runTransportContract } from "../../../tests/utils/transportContract.ts"
44
import { TransportWebSerial } from "./transport.ts";
55

66
function stubCoreTransforms() {
7-
const toDevice = new TransformStream<Uint8Array, Uint8Array>({
8-
transform(chunk, controller) {
9-
controller.enqueue(chunk);
10-
},
11-
});
7+
const toDevice = () =>
8+
new TransformStream<Uint8Array, Uint8Array>({
9+
transform(chunk, controller) {
10+
controller.enqueue(chunk);
11+
},
12+
});
1213

1314
// maps raw bytes -> DeviceOutput.packet
1415
const fromDeviceFactory = () =>
@@ -18,9 +19,10 @@ function stubCoreTransforms() {
1819
},
1920
});
2021

22+
const transform = Utils.toDeviceStream;
2123
const restoreTo = vi
2224
.spyOn(Utils, "toDeviceStream", "get")
23-
.mockReturnValue(toDevice as unknown as typeof Utils.toDeviceStream);
25+
.mockReturnValue(toDevice as unknown as typeof transform);
2426

2527
const restoreFrom = vi
2628
.spyOn(Utils, "fromDeviceStream")

0 commit comments

Comments
 (0)