Skip to content

Commit 2a0c086

Browse files
committed
feat: handle resync & missing updates
1 parent 05dda36 commit 2a0c086

File tree

2 files changed

+44
-14
lines changed

2 files changed

+44
-14
lines changed

src/y-socket-io/client.js

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -340,19 +340,24 @@ export class SocketIOProvider extends Observable {
340340
)
341341
}
342342
if (resyncInterval > 0) {
343-
this.resyncInterval = setInterval(() => {
344-
if (this.socket.disconnected) return
345-
this.socket.emit(
346-
'sync-step-1',
347-
Y.encodeStateVector(this.doc),
348-
(/** @type {Uint8Array} */ update) => {
349-
Y.applyUpdate(this.doc, new Uint8Array(update), this)
350-
}
351-
)
352-
}, resyncInterval)
343+
this.resyncInterval = setInterval(() => this.resync(), resyncInterval)
353344
}
354345
}
355346

347+
/**
348+
* Resynchronize the document with the server by firing `sync-step-1`.
349+
*/
350+
resync () {
351+
if (this.socket.disconnected) return
352+
this.socket.emit(
353+
'sync-step-1',
354+
Y.encodeStateVector(this.doc),
355+
(/** @type {Uint8Array} */ update) => {
356+
Y.applyUpdate(this.doc, new Uint8Array(update), this)
357+
}
358+
)
359+
}
360+
356361
/**
357362
* Disconnect provider's socket
358363
* @type {() => void}
@@ -413,6 +418,11 @@ export class SocketIOProvider extends Observable {
413418
super.destroy()
414419
}
415420

421+
/**
422+
* @type {number}
423+
* @private
424+
*/
425+
_updateRetries = 0
416426
/**
417427
* This function is executed when the document is updated, if the instance that
418428
* emit the change is not this, it emit the changes by socket and broadcast channel.
@@ -421,9 +431,28 @@ export class SocketIOProvider extends Observable {
421431
* @param {SocketIOProvider} origin The SocketIOProvider instance that emits the change.
422432
* @readonly
423433
*/
424-
onUpdateDoc = (update, origin) => {
434+
onUpdateDoc = async (update, origin) => {
435+
if (this._updateRetries > 3) {
436+
this._updateRetries = 0
437+
this.disconnect()
438+
this.connect()
439+
return
440+
}
441+
425442
if (origin !== this) {
426-
this.socket.emit('sync-update', update)
443+
/** @type {boolean} */
444+
const ack = await Promise.race([
445+
new Promise((res) => this.socket.emit('sync-update', update, () => res(true))),
446+
new Promise((res) => setTimeout(() => res(false), 3000)),
447+
])
448+
if (!ack) {
449+
this._updateRetries++
450+
if (this.socket.disconnected) return
451+
this.onUpdateDoc(update, origin)
452+
return
453+
} else {
454+
this._updateRetries = 0
455+
}
427456
if (this.bcconnected) {
428457
bc.publish(
429458
this._broadcastChannel,

src/y-socket-io/y-socket-io.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,8 @@ export class YSocketIO {
330330

331331
/** @type {unknown} */
332332
let prevMsg = null
333-
socket.on('sync-update', (/** @type {ArrayBuffer} */ update) => {
334-
if (isDeepStrictEqual(update, prevMsg)) return
333+
socket.on('sync-update', (/** @type {ArrayBuffer} */ update, /** @type {() => void} */ ack) => {
334+
if (isDeepStrictEqual(update, prevMsg)) return ack()
335335
assert(this.client)
336336
const namespace = this.getNamespaceString(socket.nsp)
337337
const message = Buffer.from(update.slice(0, update.byteLength))
@@ -341,6 +341,7 @@ export class YSocketIO {
341341
'index',
342342
Buffer.from(this.toRedis('sync-update', message))
343343
)
344+
.then(() => ack())
344345
.catch(console.error)
345346
prevMsg = update
346347
})

0 commit comments

Comments
 (0)