Skip to content

Commit 7c397dd

Browse files
committed
delay file events until matching xorb is emitted
1 parent 2e27699 commit 7c397dd

File tree

1 file changed

+47
-13
lines changed

1 file changed

+47
-13
lines changed

packages/hub/src/utils/createXorbs.ts

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const MAX_XORB_CHUNKS = 8 * 1024;
1818
const INTERVAL_BETWEEN_REMOTE_DEDUP = 4_000_000; // 4MB
1919

2020
export async function* createXorbs(
21-
fileSources: AsyncGenerator<{ content: Blob; path: string; sha256: string }>,
21+
fileSources: AsyncGenerator<{ content: Blob; path: string }>,
2222
params: XetWriteTokenParams
2323
): AsyncGenerator<
2424
| {
@@ -36,7 +36,6 @@ export async function* createXorbs(
3636
event: "file";
3737
path: string;
3838
hash: string;
39-
sha256: string;
4039
representation: Array<{
4140
xorbId: number | string; // either xorb id (for local xorbs) or xorb hash (for remote xorbs)
4241
offset: number;
@@ -59,9 +58,30 @@ export async function* createXorbs(
5958
let xorbOffset = 0;
6059
let xorbChunks = Array<{ hash: string; length: number; offset: number }>();
6160
/**
62-
* path => 0..1 mapping
61+
* path => 0..1 mapping of the current xorb
62+
*
63+
* eg
64+
*
65+
* A => 1
66+
* B => 1
67+
* C => 0.345
68+
*
69+
* If the xorb contains the end of file A, B, and up to 34.5% of file C
6370
*/
64-
let fileProgress: Record<string, number> = {};
71+
let xorbFileProgress: Record<string, number> = {};
72+
73+
const pendingFileEvents: Array<{
74+
event: "file";
75+
path: string;
76+
hash: string;
77+
representation: Array<{
78+
xorbId: number | string;
79+
offset: number;
80+
endOffset: number;
81+
length: number;
82+
rangeHash: string;
83+
}>;
84+
}> = [];
6585

6686
const remoteXorbHashes: string[] = [""]; // starts at index 1 (to simplify implem a bit)
6787
let bytesSinceRemoteDedup = Infinity;
@@ -148,14 +168,20 @@ export async function* createXorbs(
148168
hash: chunkModule.compute_xorb_hash(xorbChunks),
149169
chunks: [...xorbChunks],
150170
id: xorbId,
151-
files: Object.entries(fileProgress).map(([path, progress]) => ({ path, progress })),
171+
files: Object.entries(xorbFileProgress).map(([path, progress]) => ({ path, progress })),
152172
};
153173
xorbId++;
154174
xorb = new Uint8Array(XORB_SIZE);
155175
chunkOffset = 0;
156176
chunkXorbId = xorbId;
177+
xorbFileProgress = {};
178+
179+
for (const event of pendingFileEvents) {
180+
yield event;
181+
}
182+
pendingFileEvents.length = 0;
183+
157184
xorbOffset = writeChunk(xorb, 0, chunkToCopy);
158-
fileProgress = {};
159185

160186
if (xorbOffset === 0) {
161187
throw new Error("Failed to write chunk into xorb");
@@ -202,21 +228,26 @@ export async function* createXorbs(
202228
}
203229
}
204230
xorbChunks.push({ hash: chunk.hash, length: chunk.length, offset: chunkOffset });
205-
fileProgress[fileSource.path] = processedBytes / fileSource.content.size;
231+
xorbFileProgress[fileSource.path] = processedBytes / fileSource.content.size;
206232
if (xorbChunks.length >= MAX_XORB_CHUNKS) {
207233
yield {
208234
event: "xorb" as const,
209235
xorb: xorb.subarray(0, xorbOffset),
210236
hash: chunkModule.compute_xorb_hash(xorbChunks),
211237
chunks: [...xorbChunks],
212238
id: xorbId,
213-
files: Object.entries(fileProgress).map(([path, progress]) => ({ path, progress })),
239+
files: Object.entries(xorbFileProgress).map(([path, progress]) => ({ path, progress })),
214240
};
215241
xorbId++;
216242
xorbOffset = 0;
217243
xorbChunks = [];
218-
fileProgress = {};
244+
xorbFileProgress = {};
219245
xorb = new Uint8Array(XORB_SIZE);
246+
247+
for (const event of pendingFileEvents) {
248+
yield event;
249+
}
250+
pendingFileEvents.length = 0;
220251
}
221252
}
222253
};
@@ -239,13 +270,12 @@ export async function* createXorbs(
239270
);
240271
}
241272

242-
yield {
273+
pendingFileEvents.push({
243274
event: "file" as const,
244275
path: fileSource.path,
245276
hash: chunkModule.compute_file_hash(fileChunks),
246277
representation: fileRepresentation,
247-
sha256: fileSource.sha256,
248-
};
278+
});
249279
}
250280

251281
if (xorbOffset > 0) {
@@ -255,9 +285,13 @@ export async function* createXorbs(
255285
hash: chunkModule.compute_xorb_hash(xorbChunks),
256286
chunks: [...xorbChunks],
257287
id: xorbId,
258-
files: Object.entries(fileProgress).map(([path, progress]) => ({ path, progress })),
288+
files: Object.entries(xorbFileProgress).map(([path, progress]) => ({ path, progress })),
259289
};
260290
}
291+
292+
for (const event of pendingFileEvents) {
293+
yield event;
294+
}
261295
} finally {
262296
chunker.free();
263297
// ^ is this really needed ?

0 commit comments

Comments
 (0)