diff --git a/packages/filesystem-access/src/downloader.ts b/packages/filesystem-access/src/downloader.ts index cb03380..abd681c 100644 --- a/packages/filesystem-access/src/downloader.ts +++ b/packages/filesystem-access/src/downloader.ts @@ -3,37 +3,37 @@ import { FileMeta } from "./filemeta" export interface Downloader { readonly public: boolean - getLocation(): Promise - getEncryptionKey(): Promise + getLocation?(): Promise + getEncryptionKey?(): Promise - readonly cancelled: boolean - readonly errored: boolean - readonly started: boolean - readonly done: boolean - readonly paused: boolean + readonly cancelled?: boolean + readonly errored?: boolean + readonly started?: boolean + readonly done?: boolean + readonly paused?: boolean - readonly name: string + readonly name?: string - readonly size: number | undefined - readonly sizeOnFS: number | undefined + readonly size?: number | undefined + readonly sizeOnFS?: number | undefined - getDownloadUrl(): Promise - getMetadata(): Promise + getDownloadUrl?(): Promise + getMetadata?(): Promise - readonly output: ReadableStream | undefined + readonly output?: ReadableStream | undefined - readonly startTime: number | undefined - readonly endTime: number | undefined - readonly pauseDuration: number + readonly startTime?: number | undefined + readonly endTime?: number | undefined + readonly pauseDuration?: number _beforeDownload?: (d: this) => Promise _afterDownload?: (d: this) => Promise - pause(): Promise - unpause(): Promise + pause?(): Promise + unpause?(): Promise - start(): Promise | undefined> - finish(): Promise + start?(): Promise | undefined> + finish?(): Promise - cancel(): Promise + cancel?(): Promise } diff --git a/packages/filesystem-access/src/uploader.ts b/packages/filesystem-access/src/uploader.ts index 008e671..d96a229 100644 --- a/packages/filesystem-access/src/uploader.ts +++ b/packages/filesystem-access/src/uploader.ts @@ -3,36 +3,36 @@ import { FileMeta } from "./filemeta" export interface Uploader { readonly public: boolean - getLocation(): Promise - getEncryptionKey(): Promise + getLocation?(): Promise + getEncryptionKey?(): Promise - readonly cancelled: boolean - readonly errored: boolean - readonly started: boolean - readonly done: boolean - readonly paused: boolean + readonly cancelled?: boolean + readonly errored?: boolean + readonly started?: boolean + readonly done?: boolean + readonly paused?: boolean - readonly name: string - readonly path: string - readonly metadata: FileMeta + readonly name?: string + readonly path?: string + readonly metadata?: FileMeta - readonly size: number - readonly sizeOnFS: number + readonly size?: number + readonly sizeOnFS?: number - readonly output: TransformStream | undefined + readonly output?: TransformStream | undefined - readonly startTime: number | undefined - readonly endTime: number | undefined - readonly pauseDuration: number + readonly startTime?: number | undefined + readonly endTime?: number | undefined + readonly pauseDuration?: number _beforeUpload?: (u: this) => Promise _afterUpload?: (u: this) => Promise - pause(): Promise - unpause(): Promise + pause?(): Promise + unpause?(): Promise - start(): Promise | undefined> - finish(): Promise + start?(): Promise | undefined> + finish?(): Promise - cancel(): Promise + cancel?(): Promise } diff --git a/packages/opaque/src/download.ts b/packages/opaque/src/download.ts index 8e8606e..015081d 100755 --- a/packages/opaque/src/download.ts +++ b/packages/opaque/src/download.ts @@ -242,7 +242,7 @@ export class OpaqueDownload extends EventTarget implements Downloader, IDownload }) } - async getMetadata (): Promise { + async getMetadata (): Promise { return this._m.runExclusive(async () => { if (this._fileMeta) { return this._fileMeta diff --git a/packages/sia/fuse.ts b/packages/sia/fuse.ts new file mode 100755 index 0000000..da93033 --- /dev/null +++ b/packages/sia/fuse.ts @@ -0,0 +1,22 @@ +import { fusebox, pluginReplace } from "fuse-box" + +const fuse = fusebox({ + target: "browser", + entry: "src/index.ts", + webIndex: { + template: "src/index.html", + }, + devServer: true, + sourceMap: true, + hmr: false, + plugins: [ + pluginReplace(/node_modules\/bn\.js\/.*/, { + "require('buffer')": "require('" + require.resolve("./node_modules/buffer") + "')", + }), + pluginReplace(/node_modules\/readable-stream\/.*/, { + "require('util')": "require('" + require.resolve("./node_modules/util") + "')", + }), + ], +}) + +fuse.runDev() diff --git a/packages/sia/package.json b/packages/sia/package.json new file mode 100755 index 0000000..05306f6 --- /dev/null +++ b/packages/sia/package.json @@ -0,0 +1,37 @@ +{ + "name": "@opacity/sia", + "version": "0.0.1", + "description": "> TODO: description", + "author": "Khoa ", + "homepage": "https://github.com/opacity/ts-client-library#readme", + "main": "src/index.ts", + "directories": { + "lib": "src" + }, + "files": [ + "src" + ], + "repository": { + "type": "git", + "url": "git+https://github.com/opacity/ts-client-library.git" + }, + "bugs": { + "url": "https://github.com/opacity/ts-client-library/issues" + }, + "scripts": { + "start": "rm -rf ./.cache && rm -rf ./dist && npx ts-node --project ./config/tsconfig.dev.json fuse.ts", + "pretty": "prettierx --ignore-path ./config/.prettierxignore --config ./config/.prettierx.config.json --write .", + "test": "echo \"Error: run tests from root\" && exit 1" + }, + "dependencies": { + "@opacity/account-system": "^0.0.46", + "@opacity/filesystem-access": "^0.0.46", + "@opacity/middleware": "^0.0.46", + "@opacity/util": "^0.0.46", + "async-mutex": "^0.2.6", + "buffer": "^6.0.3", + "tslib": "^2.1.0", + "util": "^0.12.3", + "web-streams-polyfill": "^3.0.1" + } +} diff --git a/packages/sia/src/download.ts b/packages/sia/src/download.ts new file mode 100755 index 0000000..602bec7 --- /dev/null +++ b/packages/sia/src/download.ts @@ -0,0 +1,384 @@ +import { Mutex } from "async-mutex" + +import { blockSizeOnFS, numberOfBlocks, numberOfBlocksOnFS, sizeOnFS } from "@opacity/util/src/blocks" +import { blocksPerPart, numberOfPartsOnFS, partSizeOnFS } from "@opacity/util/src/parts" +import { bytesToHex } from "@opacity/util/src/hex" +import { CryptoMiddleware, NetworkMiddleware } from "@opacity/middleware" +import { Downloader } from "@opacity/filesystem-access/src/downloader" +import { + DownloadFinishedEvent, + DownloadMetadataEvent, + DownloadProgressEvent, + DownloadStartedEvent, + IDownloadEvents, +} from "@opacity/filesystem-access/src/events" +import { extractPromise } from "@opacity/util/src/promise" +import { FileMeta } from "@opacity/filesystem-access/src/filemeta" +import { + ISiaDownloadEvents, + SiaDownloadBlockFinishedEvent, + SiaDownloadBlockStartedEvent, + SiaDownloadPartFinishedEvent, + SiaDownloadPartStartedEvent, +} from "./events" +import { OQ } from "@opacity/util/src/oqueue" +import { + polyfillReadableStreamIfNeeded, + ReadableStream, + TransformStream, + WritableStream, +} from "@opacity/util/src/streams" +import { serializeEncrypted } from "@opacity/util/src/serializeEncrypted" +import { Uint8ArrayChunkStream } from "@opacity/util/src/streams" +import { AccountSystem, FileMetadata } from "@opacity/account-system/src" + +export type SiaDownloadConfig = { + storageNode: string + + crypto: CryptoMiddleware + net: NetworkMiddleware + + queueSize?: { + net?: number + decrypt?: number + } +} + +export type SiaDownloadArgs = { + config: SiaDownloadConfig + handle: Uint8Array + name: string + fileMeta: FileMetadata + accountSystem: AccountSystem +} + +export class SiaDownload extends EventTarget implements Downloader, IDownloadEvents, ISiaDownloadEvents { + readonly public = false + + config: SiaDownloadConfig + + _m = new Mutex() + + _location = extractPromise() + _encryptionKey = extractPromise() + + async getLocation (): Promise { + return this._location[0] + } + + async getEncryptionKey (): Promise { + return this._encryptionKey[0] + } + + _cancelled = false + _errored = false + _started = false + _done = false + + get cancelled () { + return this._cancelled + } + get errored () { + return this._errored + } + get started () { + return this._started + } + get done () { + return this._done + } + + _finished: Promise + _resolve: (value?: void) => void + _reject: (reason?: any) => void + + _name: string + + _fileMeta: FileMetadata + + get name () { + return this._name + } + + _size?: number + _sizeOnFS?: number + _numberOfBlocks?: number + _numberOfParts?: number + + get size () { + return this._size + } + get sizeOnFS () { + return this._sizeOnFS + } + + _metadata?: FileMeta + _accountSystem? : AccountSystem | any + + + _netQueue?: OQ + _decryptQueue?: OQ + + _output?: ReadableStream + + get output () { + return this._output + } + + _timestamps: { start?: number; end?: number; pauseDuration: number } = { + start: undefined, + end: undefined, + pauseDuration: 0, + } + + get startTime () { + return this._timestamps.start + } + get endTime () { + return this._timestamps.end + } + get pauseDuration () { + return this._timestamps.pauseDuration + } + + _beforeDownload?: (d: Downloader | any) => Promise + _afterDownload?: (d: Downloader | any) => Promise + + + constructor ({ config, handle, name, fileMeta, accountSystem }: SiaDownloadArgs) { + super() + + this.config = config + this.config.queueSize = this.config.queueSize || {} + this.config.queueSize.net = this.config.queueSize.net || 3 + this.config.queueSize.decrypt = this.config.queueSize.decrypt || blocksPerPart + + this._location[1](handle.slice(0, 32)) + this._encryptionKey[1](handle.slice(32)) + + this._name = name + + this._fileMeta = fileMeta + this._accountSystem = accountSystem + + const d = this + + const [finished, resolveFinished, rejectFinished] = extractPromise() + this._finished = finished + this._resolve = (val) => { + d._done = true + resolveFinished(val) + + this._timestamps.end = Date.now() + this.dispatchEvent( + new DownloadFinishedEvent({ + start: this._timestamps.start!, + end: this._timestamps.end, + duration: this._timestamps.end - this._timestamps.start! - this._timestamps.pauseDuration, + realDuration: this._timestamps.end - this._timestamps.start!, + }), + ) + } + this._reject = (err) => { + d._errored = true + + rejectFinished(err) + } + } + + async start (): Promise | undefined> { + if (this._cancelled || this._errored) { + return + } + + if (this._started) { + return this._output + } + + this._started = true + this._timestamps.start = Date.now() + + const ping = await this.config.net + .GET(this.config.storageNode + "", undefined, undefined, async (d) => + new TextDecoder("utf8").decode(await new Response(d).arrayBuffer()), + ) + .catch(this._reject) + + // server didn't respond + if (!ping) { + return + } + + const d = this + + if (this._beforeDownload) { + await this._beforeDownload(d) + } + + const fileHandle = this._fileMeta.private.handle + + const metadata = await this._accountSystem.getFileMetadataLocationByFileHandle(fileHandle) + if (!metadata) { + return + } + + d._size = metadata.size + d._sizeOnFS = sizeOnFS(metadata.size) + d._numberOfBlocks = 1 + d._numberOfParts = numberOfPartsOnFS(d._sizeOnFS) + + d.dispatchEvent( + new DownloadStartedEvent({ + time: this._timestamps.start, + }), + ) + + const netQueue = new OQ(this.config.queueSize!.net) + const decryptQueue = new OQ(this.config.queueSize!.decrypt) + + d._netQueue = netQueue + d._decryptQueue = decryptQueue + + let partIndex = 0 + + d._output = new ReadableStream({ + async pull (controller) { + if (d._cancelled || d._errored) { + return + } + + if (partIndex >= d._numberOfParts!) { + return + } + + netQueue.add( + partIndex, + async (partIndex) => { + if (d._cancelled || d._errored) { + return + } + + + d.dispatchEvent(new SiaDownloadPartStartedEvent({ index: partIndex })) + + const res = await d.config.net + .POST( + d.config.storageNode + "/api/v2/sia/download", + undefined, + JSON.stringify({ fileID: bytesToHex(await d.getLocation()) }), + async (b) => JSON.parse(new TextDecoder("utf8").decode(await new Response(b).arrayBuffer())).fileDownloadUrl, + ) + .catch(d._reject) + + + + if (!res || !res.data) { + return + } + + const fileData = res.data + + let l = 0 + fileData + .pipeThrough( + new TransformStream({ + // log progress + transform (chunk, controller) { + d.dispatchEvent(new SiaDownloadBlockStartedEvent({ index: 1 })) + controller.enqueue(chunk) + }, + }) as ReadableWritablePair, + ) + .pipeThrough(new Uint8ArrayChunkStream(d._sizeOnFS)) + .pipeTo( + new WritableStream({ + async write (part) { + decryptQueue.add( + 1, + async (blockIndex) => { + if (d._cancelled || d._errored) { + return + } + + const decrypted = await d.config.crypto + .decrypt(await d.getEncryptionKey(), part) + .catch(d._reject) + + if (!decrypted) { + return + } + + return decrypted + }, + async (decrypted, blockIndex) => { + if (!decrypted) { + return + } + + controller.enqueue(decrypted) + + d.dispatchEvent(new SiaDownloadBlockFinishedEvent({ index: blockIndex })) + d.dispatchEvent(new DownloadProgressEvent({ progress: blockIndex / d._numberOfBlocks! })) + }, + ) + }, + }) as WritableStream, + ) + + await decryptQueue.waitForCommit(1) + + d.dispatchEvent(new SiaDownloadPartFinishedEvent({ index: partIndex })) + }, + () => {}, + ) + }, + async start (controller) { + netQueue.add( + 1, + () => {}, + async () => { + netQueue.close() + }, + ) + + decryptQueue.add( + 1, + () => {}, + async () => { + decryptQueue.close() + }, + ) + + // the start function is blocking for pulls so this must not be awaited + Promise.all([netQueue.waitForClose(), decryptQueue.waitForClose()]).then(async () => { + if (d._afterDownload) { + await d._afterDownload(d).catch(d._reject) + } + + d._resolve() + controller.close() + }) + }, + cancel () { + d._cancelled = true + }, + }) as ReadableStream + + return d._output + } + + + async finish () { + return this._finished + } + + async cancel () { + this._cancelled = true + + if (this._output) { + this._output.cancel() + } + + this._reject() + } +} diff --git a/packages/sia/src/events.ts b/packages/sia/src/events.ts new file mode 100644 index 0000000..937ef7c --- /dev/null +++ b/packages/sia/src/events.ts @@ -0,0 +1,127 @@ +import { EventListenerOrEventListenerObject } from "@opacity/util/src/events" + +export enum SiaDownloadEvents { + BLOCK_START = "block-loaded", + BLOCK_FINISH = "block-finished", + PART_START = "part-loaded", + PART_FINISH = "part-finished", +} + +type SiaDownloadBlockStartedEventData = { index: number } +export class SiaDownloadBlockStartedEvent extends CustomEvent { + constructor (data: SiaDownloadBlockStartedEventData) { + super(SiaDownloadEvents.BLOCK_START, { detail: data }) + } +} +type SiaDownloadBlockFinishedEventData = { index: number } +export class SiaDownloadBlockFinishedEvent extends CustomEvent { + constructor (data: SiaDownloadBlockFinishedEventData) { + super(SiaDownloadEvents.BLOCK_FINISH, { detail: data }) + } +} + +type SiaDownloadPartStartedEventData = { index: number } +export class SiaDownloadPartStartedEvent extends CustomEvent { + constructor (data: SiaDownloadPartStartedEventData) { + super(SiaDownloadEvents.PART_START, { detail: data }) + } +} +type SiaDownloadPartFinishedEventData = { index: number } +export class SiaDownloadPartFinishedEvent extends CustomEvent { + constructor (data: SiaDownloadPartFinishedEventData) { + super(SiaDownloadEvents.PART_FINISH, { detail: data }) + } +} + +export interface ISiaDownloadEvents { + addEventListener( + type: SiaDownloadEvents, + listener: EventListener | EventListenerObject | null, + options?: boolean | AddEventListenerOptions | undefined, + ): void + + addEventListener( + type: SiaDownloadEvents.BLOCK_START, + listener: EventListenerOrEventListenerObject | null, + options?: boolean | AddEventListenerOptions | undefined, + ): void + addEventListener( + type: SiaDownloadEvents.BLOCK_FINISH, + listener: EventListenerOrEventListenerObject | null, + options?: boolean | AddEventListenerOptions | undefined, + ): void + + addEventListener( + type: SiaDownloadEvents.PART_START, + listener: EventListenerOrEventListenerObject | null, + options?: boolean | AddEventListenerOptions | undefined, + ): void + addEventListener( + type: SiaDownloadEvents.PART_FINISH, + listener: EventListenerOrEventListenerObject | null, + options?: boolean | AddEventListenerOptions | undefined, + ): void +} + +export enum SiaUploadEvents { + BLOCK_START = "block-loaded", + BLOCK_FINISH = "block-finished", + PART_START = "part-loaded", + PART_FINISH = "part-finished", +} + +type SiaUploadBlockStartedEventData = { index: number } +export class SiaUploadBlockStartedEvent extends CustomEvent { + constructor (data: SiaUploadBlockStartedEventData) { + super(SiaUploadEvents.BLOCK_START, { detail: data }) + } +} +type SiaUploadBlockFinishedEventData = { index: number } +export class SiaUploadBlockFinishedEvent extends CustomEvent { + constructor (data: SiaUploadBlockFinishedEventData) { + super(SiaUploadEvents.BLOCK_FINISH, { detail: data }) + } +} + +type SiaUploadPartStartedEventData = { index: number } +export class SiaUploadPartStartedEvent extends CustomEvent { + constructor (data: SiaUploadPartStartedEventData) { + super(SiaUploadEvents.PART_START, { detail: data }) + } +} +type SiaUploadPartFinishedEventData = { index: number } +export class SiaUploadPartFinishedEvent extends CustomEvent { + constructor (data: SiaUploadPartFinishedEventData) { + super(SiaUploadEvents.PART_FINISH, { detail: data }) + } +} + +export interface ISiaUploadEvents { + addEventListener( + type: SiaUploadEvents, + listener: EventListener | EventListenerObject | null, + options?: boolean | AddEventListenerOptions | undefined, + ): void + + addEventListener( + type: SiaUploadEvents.BLOCK_START, + listener: EventListenerOrEventListenerObject | null, + options?: boolean | AddEventListenerOptions | undefined, + ): void + addEventListener( + type: SiaUploadEvents.BLOCK_FINISH, + listener: EventListenerOrEventListenerObject | null, + options?: boolean | AddEventListenerOptions | undefined, + ): void + + addEventListener( + type: SiaUploadEvents.PART_START, + listener: EventListenerOrEventListenerObject | null, + options?: boolean | AddEventListenerOptions | undefined, + ): void + addEventListener( + type: SiaUploadEvents.PART_FINISH, + listener: EventListenerOrEventListenerObject | null, + options?: boolean | AddEventListenerOptions | undefined, + ): void +} diff --git a/packages/sia/src/index.html b/packages/sia/src/index.html new file mode 100755 index 0000000..06c4407 --- /dev/null +++ b/packages/sia/src/index.html @@ -0,0 +1,12 @@ + + + + Foo + $css + + + +
+ $bundles + + diff --git a/packages/sia/src/index.ts b/packages/sia/src/index.ts new file mode 100755 index 0000000..4dbf01e --- /dev/null +++ b/packages/sia/src/index.ts @@ -0,0 +1,17 @@ +export { Sia, SiaArgs, SiaConfig } from "./download" +export { SiaUpload, SiaUploadArgs, SiaUploadConfig } from "./upload" + +export { + ISiaDownloadEvents, + ISiaUploadEvents, + SiaDownloadBlockFinishedEvent, + SiaDownloadBlockStartedEvent, + SiaDownloadEvents, + SiaDownloadPartFinishedEvent, + SiaDownloadPartStartedEvent, + SiaUploadBlockFinishedEvent, + SiaUploadBlockStartedEvent, + SiaUploadEvents, + SiaUploadPartFinishedEvent, + SiaUploadPartStartedEvent, +} from "./events" diff --git a/packages/sia/src/upload.ts b/packages/sia/src/upload.ts new file mode 100755 index 0000000..cab75d1 --- /dev/null +++ b/packages/sia/src/upload.ts @@ -0,0 +1,379 @@ +import { Mutex } from "async-mutex"; + +import { numberOfBlocks, sizeOnFS } from "@opacity/util/src/blocks"; +import { bytesToHex } from "@opacity/util/src/hex"; +import { CryptoMiddleware, NetworkMiddleware } from "@opacity/middleware"; +import { extractPromise } from "@opacity/util/src/promise"; +import { FileMeta } from "@opacity/filesystem-access/src/filemeta"; +import { getPayload, getPayloadFD } from "@opacity/util/src/payload"; +import { ISiaUploadEvents, SiaUploadPartStartedEvent } from "./events"; +import { IUploadEvents, UploadFinishedEvent, UploadMetadataEvent, UploadStartedEvent } from "@opacity/filesystem-access/src/events"; +import { numberOfPartsOnFS } from "@opacity/util/src/parts"; +import { OQ } from "@opacity/util/src/oqueue"; +import { Retry } from "@opacity/util/src/retry"; +import { TransformStream, WritableStream, Uint8ArrayChunkStream } from "@opacity/util/src/streams"; +import { Uploader } from "@opacity/filesystem-access/src/uploader"; + +export type SiaUploadConfig = { + storageNode: string; + + crypto: CryptoMiddleware; + net: NetworkMiddleware; + + queueSize?: { + encrypt?: number; + net?: number; + }; +}; + +export type SiaUploadArgs = { + config: SiaUploadConfig; + path: string; + name: string; + meta: FileMeta; +}; + +type UploadInitPayload = { + fileHandle: string; + fileSizeInByte: number; +}; + +type UploadInitExtraPayload = { + metadata: Uint8Array; +}; + +type UploadPayload = { + fileHandle: string; +}; + +type UploadExtraPayload = { + fileData: Uint8Array; +}; + +type UploadStatusPayload = { + fileHandle: string; +}; + +export class SiaUpload extends EventTarget implements Uploader, IUploadEvents, ISiaUploadEvents { + readonly public = false; + + config: SiaUploadConfig; + + _m = new Mutex(); + + _location?: Uint8Array; + _encryptionKey?: Uint8Array; + + _locationExtractedPromise = extractPromise(); + _encryptionKeyExtractedPromise = extractPromise(); + + private async _generateKeys() { + if (this._location && this._encryptionKey) { + return; + } + + await this._m.runExclusive(async () => { + if (this._location && this._encryptionKey) { + return; + } + + this._location = await this.config.crypto.getRandomValues(32); + this._encryptionKey = await this.config.crypto.generateSymmetricKey(); + + this._locationExtractedPromise[1](this._location); + this._encryptionKeyExtractedPromise[1](this._encryptionKey); + }); + } + + async getLocation(): Promise { + await this._generateKeys(); + + return await this._locationExtractedPromise[0]; + } + + async getEncryptionKey(): Promise { + await this._generateKeys(); + + return await this._encryptionKeyExtractedPromise[0]; + } + + _cancelled = false; + _errored = false; + _started = false; + _done = false; + + get cancelled() { + return this._cancelled; + } + get errored() { + return this._errored; + } + get started() { + return this._started; + } + get done() { + return this._done; + } + + _finished: Promise; + _resolve: (value?: void) => void; + _reject: (reason?: any) => void; + + _size: number; + _sizeOnFS: number; + _numberOfBlocks: number; + _numberOfParts: number; + + get size() { + return this._size; + } + get sizeOnFS() { + return this._sizeOnFS; + } + + _name: string; + _path: string; + _metadata: FileMeta; + + get name() { + return this._name; + } + get path() { + return this._path; + } + get metadata() { + return this._metadata; + } + + _netQueue?: OQ; + _encryptQueue?: OQ; + + _output?: TransformStream; + + get output() { + return this._output; + } + + _timestamps: { start?: number; end?: number; pauseDuration: number } = { + start: undefined, + end: undefined, + pauseDuration: 0, + }; + + get startTime() { + return this._timestamps.start; + } + get endTime() { + return this._timestamps.end; + } + + _beforeUpload?: (u: Uploader | any) => Promise; + _afterUpload?: (u: Uploader | any) => Promise; + + constructor({ config, name, path, meta }: SiaUploadArgs) { + super(); + + this.config = config; + this.config.queueSize = this.config.queueSize || {}; + this.config.queueSize.encrypt = this.config.queueSize.encrypt || 3; + this.config.queueSize.net = this.config.queueSize.net || 1; + + this._name = name; + this._path = path; + this._metadata = meta; + + this._size = this._metadata.size; + this._sizeOnFS = sizeOnFS(this._size); + this._numberOfBlocks = numberOfBlocks(this._size); + this._numberOfParts = numberOfPartsOnFS(this._sizeOnFS); + const u = this; + + const [finished, resolveFinished, rejectFinished] = extractPromise(); + this._finished = finished; + this._resolve = (val) => { + u._done = true; + resolveFinished(val); + + this._timestamps.end = Date.now(); + this.dispatchEvent( + new UploadFinishedEvent({ + start: this._timestamps.start!, + end: this._timestamps.end, + duration: this._timestamps.end - this._timestamps.start! - this._timestamps.pauseDuration, + realDuration: this._timestamps.end - this._timestamps.start!, + }) + ); + }; + this._reject = (err) => { + u._errored = true; + + rejectFinished(err); + }; + } + + async start(): Promise | undefined> { + if (this._cancelled || this._errored) { + return; + } + + if (this._started) { + return this._output; + } + + this._started = true; + this._timestamps.start = Date.now(); + + const ping = await this.config.net + .GET(this.config.storageNode + "", undefined, undefined, async (d) => + new TextDecoder("utf8").decode(await new Response(d).arrayBuffer()) + ) + .catch(this._reject); + + // server didn't respond + if (!ping) { + return; + } + + this.dispatchEvent(new UploadMetadataEvent({ metadata: this._metadata })); + + const u = this; + + // if (this._beforeUpload) { + // await this._beforeUpload(u).catch(u._reject) + // } + + const encryptedMeta = await u.config.crypto.encrypt( + await u.getEncryptionKey(), + new TextEncoder().encode( + JSON.stringify({ + lastModified: u._metadata.lastModified, + size: u._metadata.size, + type: u._metadata.type, + } as FileMeta) + ) + ); + + const fd = await getPayloadFD({ + crypto: u.config.crypto, + payload: { + fileHandle: bytesToHex(await u.getLocation()), + fileSizeInByte: u._sizeOnFS, + }, + extraPayload: { + metadata: encryptedMeta, + }, + }); + + await u.config.net.POST(u.config.storageNode + "/api/v2/sia/init-upload", {}, fd).catch(u._reject); + + u.dispatchEvent( + new UploadStartedEvent({ + time: this._timestamps.start, + }) + ); + + const encryptQueue = new OQ(this.config.queueSize!.encrypt, Number.MAX_SAFE_INTEGER); + const netQueue = new OQ(this.config.queueSize!.net); + + u._encryptQueue = encryptQueue; + u._netQueue = netQueue; + let blockIndex = 0; + let partIndex = 0; + + const partCollector = new Uint8ArrayChunkStream( + u._sizeOnFS, // will be updated chunk stream based + new ByteLengthQueuingStrategy({ highWaterMark: this.config.queueSize!.net! * u._sizeOnFS + 1 }), + new ByteLengthQueuingStrategy({ highWaterMark: this.config.queueSize!.net! * u._sizeOnFS + 1 }) + ); + + u._output = new TransformStream( + { + transform(chunk, controller) { + controller.enqueue(chunk); + }, + }, + new ByteLengthQueuingStrategy({ highWaterMark: this.config.queueSize!.net! * u._sizeOnFS + 1 }) + ) as TransformStream; + + u._output.readable.pipeThrough(partCollector).pipeTo( + new WritableStream({ + async write(part) { + u.dispatchEvent(new SiaUploadPartStartedEvent({ index: partIndex })); + + const res = await new Retry( + async () => { + const fd = await getPayloadFD({ + crypto: u.config.crypto, + payload: { + fileHandle: bytesToHex(await u.getLocation()), + }, + extraPayload: { + fileData: part, + }, + }); + + return await u.config.net.POST(u.config.storageNode + "/api/v2/sia/upload", {}, fd); + }, + { + firstTimer: 500, + handler: (err) => { + console.warn(err); + + return false; + }, + } + ) + .start() + .catch(u._reject); + + if (!res) { + return; + } + }, + async close() { + await encryptQueue.waitForClose(); + }, + }) as WritableStream + ); + + netQueue.add( + u._numberOfParts, + () => {}, + async () => { + const data = await getPayload({ + crypto: u.config.crypto, + payload: { + fileHandle: bytesToHex(await u.getLocation()), + }, + }); + + const res = (await u.config.net + .POST(u.config.storageNode + "/api/v2/sia/upload-status", {}, JSON.stringify(data)) + .catch(u._reject)) as void; + + console.log(res) + + netQueue.close(); + } + ); + + Promise.all([encryptQueue.waitForClose(), netQueue.waitForClose()]).then(async () => { + if (this._afterUpload) { + await this._afterUpload(u).catch(u._reject); + } + + u._resolve(); + }); + + return u._output; + } + + async finish() { + return this._finished; + } + + async cancel() { + this._cancelled = true; + this._reject(); + } +}