From 931d69f38138a8ad4690c423989baf7864dc52d9 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Thu, 7 Apr 2022 20:52:55 +0200 Subject: [PATCH 1/5] Add WrappingIterator for faster wrapping. --- .eslintrc | 7 +- asynciterator.ts | 199 +++++++++++++--- test/ArrayIterator-test.js | 18 ++ test/AsyncIterator-test.js | 24 ++ test/EmptyIterator-test.js | 18 ++ test/MappingIterator-test.js | 1 - test/TransformIterator-test.js | 2 +- test/WrappingIterator-test.js | 401 +++++++++++++++++++++++++++++++++ 8 files changed, 633 insertions(+), 37 deletions(-) create mode 100644 test/WrappingIterator-test.js diff --git a/.eslintrc b/.eslintrc index 45f5ccaa..b81247da 100644 --- a/.eslintrc +++ b/.eslintrc @@ -20,7 +20,7 @@ rules: { // Best Practices - accessor-pairs: error, + accessor-pairs: off, array-callback-return: error, block-scoped-var: error, class-methods-use-this: off, @@ -66,9 +66,10 @@ no-octal-escape: error, no-param-reassign: off, no-proto: error, - no-redeclare: error, + no-redeclare: off, + "@typescript-eslint/no-redeclare": ["error"], no-restricted-properties: error, - no-return-assign: error, + no-return-assign: off, no-return-await: error, no-script-url: error, no-self-assign: error, diff --git a/asynciterator.ts b/asynciterator.ts index 50a429ea..8e6b8d22 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -25,10 +25,6 @@ export function setTaskScheduler(scheduler: TaskScheduler): void { taskScheduler = scheduler; } -// Returns a function that calls `fn` with `self` as `this` pointer. */ -function bind(fn: T, self?: object): T { - return self ? fn.bind(self) : fn; -} /** ID of the INIT state. @@ -868,7 +864,7 @@ export class MappingIterator extends AsyncIterator { // Validates an AsyncIterator for use as a source within another AsyncIterator function ensureSourceAvailable(source?: AsyncIterator, allowDestination = false) { if (!source || !isFunction(source.read) || !isFunction(source.on)) - throw new Error(`Invalid source: ${source}`); + throw new TypeError(`Invalid source: ${source}`); if (!allowDestination && (source as any)._destination) throw new Error('The source already has a destination'); return source as InternalSource; @@ -1175,8 +1171,7 @@ export class TransformIterator extends BufferedIterator { @param {module:asynciterator.AsyncIterator} [options.source] The source this iterator generates items from */ constructor(source?: SourceExpression, - options: TransformIteratorOptions = - source as TransformIteratorOptions || {}) { + options: TransformIteratorOptions = source as TransformIteratorOptions || {}) { super(options); // Shift parameters if needed @@ -1981,23 +1976,125 @@ class HistoryReader { } } +/** + * An iterator that takes a variety of iterable objects as a source. + */ +export class WrappingIterator extends AsyncIterator { + protected _source: InternalSource | null = null; + + constructor(source: MaybePromise>) { + super(); + if (!isPromise(source)) + this.source = source as any; + else + source.then(s => this.source = s as any).catch(e => this.emit('error', e)); + } + + protected set source(source: InternalSource) { + // Process an iterable source + if (isIterable(source)) + source = source[Symbol.iterator]() as any; + // Process an iterator source + if (isIterator(source)) { + let iterator: Iterator | null = source; + source = new EventEmitter() as any; + source.read = (): T | null => { + if (iterator !== null) { + const item = iterator.next(); + if (!item.done) + return item.value; + // No remaining values, so stop iterating + iterator = null; + this.close(); + } + return null; + }; + } + // Process any other readable source + else { + source = ensureSourceAvailable(source); + } + + // Set up event handling + source._destination = this; + source.on('end', destinationClose); + source.on('error', destinationEmitError); + source.on('readable', destinationSetReadable); + + // Enable reading from source + this._source = source; + this.readable = true; + } + + read(): T | null { + if (this._source !== null) { + const item = this._source.read(); + if (item !== null) + return item; + this.readable = false; + } + return null; + } + + protected _end(destroy: boolean = false) { + super._end(destroy); + // Clean up event handlers + if (this._source !== null) { + this._source.removeListener('end', destinationClose); + this._source.removeListener('error', destinationEmitError); + this._source.removeListener('readable', destinationSetReadable); + delete this._source._destination; + this._source = null; + } + } +} + + /** Creates an iterator that wraps around a given iterator or readable stream. Use this to convert an iterator-like object into a full-featured AsyncIterator. After this operation, only read the returned iterator instead of the given one. @function - @param {module:asynciterator.AsyncIterator|Readable} [source] The source this iterator generates items from + @param [source] The source this iterator generates items from @param {object} [options] Settings of the iterator @returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator */ -export function wrap(source: EventEmitter | Promise, options?: TransformIteratorOptions) { - return new TransformIterator(source as AsyncIterator | Promise>, options); +export function wrap(source: null | undefined): AsyncIterator; +export function wrap(source: MaybePromise>): AsyncIterator; +export function wrap(source: MaybePromise>, + options?: TransformIteratorOptions): AsyncIterator; +export function wrap(source?: MaybePromise> | null, + options?: TransformIteratorOptions,): AsyncIterator { + // TransformIterator if TransformIteratorOptions were specified + if (options) + return new TransformIterator(source as MaybePromise>, options); + + // Empty iterator if no source specified + if (!source) + return empty(); + + // Unwrap promised sources + if (isPromise(source)) + return new WrappingIterator(source); + + // Directly return any AsyncIterator + if (source instanceof AsyncIterator) + return source; + + // Other iterable objects + if (Array.isArray(source)) + return fromArray(source); + if (isIterable(source) || isIterator(source) || isEventEmitter(source)) + return new WrappingIterator(source); + + // Other types are unsupported + throw new TypeError(`Invalid source: ${source}`); } /** Creates an empty iterator. */ -export function empty() { +export function empty(): AsyncIterator { return new EmptyIterator(); } @@ -2005,7 +2102,7 @@ export function empty() { Creates an iterator with a single item. @param {object} item the item */ -export function single(item: T) { +export function single(item: T): AsyncIterator { return new SingletonIterator(item); } @@ -2013,10 +2110,26 @@ export function single(item: T) { Creates an iterator for the given array. @param {Array} items the items */ -export function fromArray(items: Iterable) { +export function fromArray(items: Iterable): AsyncIterator { return new ArrayIterator(items); } +/** + Creates an iterator for the given Iterator. + @param {Iterable} source the iterator + */ +export function fromIterator(source: Iterable | Iterator): AsyncIterator { + return new WrappingIterator(source); +} + +/** + Creates an iterator for the given Iterable. + @param {Iterable} source the iterable + */ +export function fromIterable(source: Iterable | Iterator): AsyncIterator { + return new WrappingIterator(source); +} + /** Creates an iterator containing all items from the given iterators. @param {Array} items the items @@ -2035,25 +2148,12 @@ export function range(start: number, end: number, step?: number) { return new IntegerIterator({ start, end, step }); } -// Determines whether the given object is a function -function isFunction(object: any): object is Function { - return typeof object === 'function'; -} - -// Determines whether the given object is an EventEmitter -function isEventEmitter(object: any): object is EventEmitter { - return object && typeof object.on === 'function'; -} - -// Determines whether the given object is a promise -function isPromise(object: any): object is Promise { - return object && typeof object.then === 'function'; -} - -// Determines whether the given object is a source expression -function isSourceExpression(object: any): object is SourceExpression { - return object && (isEventEmitter(object) || isPromise(object) || isFunction(object)); -} +export type IterableSource = + T[] | + AsyncIterator | + EventEmitter | + Iterator | + Iterable; export interface SourcedIteratorOptions { destroySource?: boolean; @@ -2098,3 +2198,38 @@ type SourceExpression = type InternalSource = AsyncIterator & { _destination?: AsyncIterator }; + +// Returns a function that calls `fn` with `self` as `this` pointer. */ +function bind(fn: T, self?: object): T { + return self ? fn.bind(self) : fn; +} + +// Determines whether the given object is a function +export function isFunction(object: any): object is Function { + return typeof object === 'function'; +} + +// Determines whether the given object is an EventEmitter +export function isEventEmitter(object: any): object is EventEmitter { + return isFunction(object?.on); +} + +// Determines whether the given object is a promise +export function isPromise(object: any): object is Promise { + return isFunction(object?.then); +} + +// Determines whether the given object is a source expression +export function isSourceExpression(object: any): object is SourceExpression { + return object && (isEventEmitter(object) || isPromise(object) || isFunction(object)); +} + +// Determines whether the given object supports the iterable protocol +export function isIterable(object: { [key: string]: any }): object is Iterable { + return object && (Symbol.iterator in object); +} + +// Determines whether the given object supports the iterator protocol +export function isIterator(object: { [key: string]: any }): object is Iterator { + return isFunction(object?.next); +} diff --git a/test/ArrayIterator-test.js b/test/ArrayIterator-test.js index 773eb68f..f659ffc2 100644 --- a/test/ArrayIterator-test.js +++ b/test/ArrayIterator-test.js @@ -2,6 +2,7 @@ import { AsyncIterator, ArrayIterator, fromArray, + wrap, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -41,6 +42,23 @@ describe('ArrayIterator', () => { instance.should.be.an.instanceof(EventEmitter); }); }); + + describe('the result when called through `wrap`', () => { + let instance; + before(() => { instance = wrap([]); }); + + it('should be an ArrayIterator object', () => { + instance.should.be.an.instanceof(ArrayIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); }); describe('An ArrayIterator without arguments', () => { diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index 37a74d60..a062dd95 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -5,6 +5,8 @@ import { ENDED, DESTROYED, scheduleTask, + isPromise, + isIterator, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1308,3 +1310,25 @@ describe('AsyncIterator', () => { }); }); }); + +describe('Type-checking functions', () => { + describe('isPromise', () => { + it('returns false for null', () => { + expect(isPromise(null)).to.equal(false); + }); + + it('returns true for a Promise', () => { + expect(isPromise(Promise.resolve(0))).to.equal(true); + }); + }); + + describe('isIterator', () => { + it('returns false for null', () => { + expect(isIterator(null)).to.equal(false); + }); + + it('returns true for an iterator', () => { + expect(isIterator([][Symbol.iterator]())).to.equal(true); + }); + }); +}); diff --git a/test/EmptyIterator-test.js b/test/EmptyIterator-test.js index caf21ab4..9810f30a 100644 --- a/test/EmptyIterator-test.js +++ b/test/EmptyIterator-test.js @@ -2,6 +2,7 @@ import { AsyncIterator, EmptyIterator, empty, + wrap, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -41,6 +42,23 @@ describe('EmptyIterator', () => { instance.should.be.an.instanceof(EventEmitter); }); }); + + describe('the result when called through `.wrap`', () => { + let instance; + before(() => { instance = wrap(); }); + + it('should be an EmptyIterator object', () => { + instance.should.be.an.instanceof(EmptyIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); }); describe('An EmptyIterator without arguments', () => { diff --git a/test/MappingIterator-test.js b/test/MappingIterator-test.js index ae2e4820..7d8974f3 100644 --- a/test/MappingIterator-test.js +++ b/test/MappingIterator-test.js @@ -542,7 +542,6 @@ describe('MappingIterator', () => { describe('A chain of maps and filters', () => { for (const iteratorGen of [() => range(0, 2), () => fromArray([0, 1, 2]), () => wrap(range(0, 2))]) { - // eslint-disable-next-line no-loop-func describe(`with ${iteratorGen()}`, () => { let iterator; diff --git a/test/TransformIterator-test.js b/test/TransformIterator-test.js index e5d8116d..5ad99a0b 100644 --- a/test/TransformIterator-test.js +++ b/test/TransformIterator-test.js @@ -35,7 +35,7 @@ describe('TransformIterator', () => { describe('the result when called through `wrap`', () => { let instance; - before(() => { instance = wrap(); }); + before(() => { instance = wrap({}, {}); }); it('should be an TransformIterator object', () => { instance.should.be.an.instanceof(TransformIterator); diff --git a/test/WrappingIterator-test.js b/test/WrappingIterator-test.js new file mode 100644 index 00000000..1f5b3207 --- /dev/null +++ b/test/WrappingIterator-test.js @@ -0,0 +1,401 @@ +import { + AsyncIterator, + ArrayIterator, + WrappingIterator, + TransformIterator, + EmptyIterator, + IntegerIterator, + fromIterable, + fromIterator, + wrap, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; +import { Readable } from 'stream'; + +describe('WrappingIterator', () => { + describe('The WrappingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { instance = new WrappingIterator(new EmptyIterator()); }); + + it('should be an WrappingIterator object', () => { + instance.should.be.an.instanceof(WrappingIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('with an empty iterable', () => { + let iterator; + before(() => { iterator = fromIterable((function * () { /* empty */ })()); }); + + it('should be readable', () => { + expect(iterator.readable).to.be.true; + }); + + it('should end after the first invocation of read, which should return null', done => { + expect(iterator.once('end', done).read()).to.equal(null); + }); + + it('should not be readable anymore', () => { + expect(iterator.readable).to.equal(false); + }); + }); + + describe('with an iterable that emits one item', () => { + let iterator; + before(() => { iterator = fromIterable((function * () { yield 'first'; })()); }); + + it('should be readable', () => { + expect(iterator.readable).to.be.true; + }); + + it('should read the first item', () => { + expect(iterator.read()).to.equal('first'); + }); + + it('should end after the second invocation of read, which should return null', done => { + expect(iterator.on('end', done).read()).to.equal(null); + }); + + it('should not be readable anymore', () => { + expect(iterator.readable).to.be.false; + }); + }); + + describe('with an iterable that emits 10 items', () => { + let iterator; + beforeEach(() => { + iterator = new WrappingIterator((function * () { + for (let i = 0; i < 10; i += 1) + yield i; + })()); + }); + + it('should emit all items', async () => { + const arr = await iterator.toArray(); + expect(arr).to.deep.equal([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + }); + }); + + describe('with an iterator that emits one item', () => { + let iterator; + before(() => { + let done = false; + iterator = fromIterator({ + next: () => { + if (done) + return { done }; + done = true; + return { value: 'first' }; + }, + }); + }); + + it('should be readable', () => { + expect(iterator.readable).to.be.true; + }); + + it('should read the first item', () => { + expect(iterator.read()).to.equal('first'); + }); + + it('should end after the second invocation of read, which should return null', done => { + expect(iterator.on('end', done).read()).to.equal(null); + }); + + it('should not be readable anymore', () => { + expect(iterator.readable).to.be.false; + }); + }); + + describe('with an array source', () => { + let iterator, source; + before(() => { + source = [0, 1, 2, 3, 4]; + iterator = new WrappingIterator(source); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with an array source', () => { + let iterator, source; + before(() => { + source = Promise.resolve([0, 1, 2, 3, 4]); + iterator = new WrappingIterator(source); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with a stream.Readable source', () => { + let iterator, source; + before(() => { + source = Readable.from([0, 1, 2, 3, 4]); + iterator = new WrappingIterator(source); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with a promisified stream.Readable source', () => { + let iterator, source; + before(() => { + source = Promise.resolve(Readable.from([0, 1, 2, 3, 4])); + iterator = new WrappingIterator(source); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with an AsyncIterator source', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4]); + iterator = new WrappingIterator(source); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with a promisified AsyncIterator source', () => { + let iterator, source; + before(() => { + source = Promise.resolve(new ArrayIterator([0, 1, 2, 3, 4])); + iterator = new WrappingIterator(source); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with an EmptyIterator source that never emits a readable event', () => { + let iterator; + before(() => { + iterator = new WrappingIterator(new EmptyIterator()); + captureEvents(iterator, 'readable', 'end'); + }); + + it('should have emitted the end event', () => { + expect(iterator._eventCounts.end).to.equal(1); + }); + }); + + describe('with a promisified IntegerIterator', () => { + let iterator, source; + before(() => { + source = Promise.resolve(new IntegerIterator({ start: 0, step: 1, end: 4 })); + iterator = new WrappingIterator(source); + captureEvents(iterator, 'readable'); + }); + + it('should have emitted the readable event', () => { + expect(iterator._eventCounts.readable).to.be.gt(0); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with a buffering AsyncIterator source with autoStart: false', () => { + let iterator, source; + before(() => { + source = new TransformIterator(new ArrayIterator([0, 1, 2, 3, 4]), { autoStart: false }); + iterator = new WrappingIterator(source); + captureEvents(iterator, 'readable'); + }); + + it('should have emitted the readable event', () => { + expect(iterator._eventCounts.readable).to.be.gt(0); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with a buffering AsyncIterator source with autoStart: true', () => { + let iterator; + let source; + + before(() => { + source = new TransformIterator(new ArrayIterator([0, 1, 2, 3, 4]), { autoStart: true }); + iterator = new WrappingIterator(source); + captureEvents(iterator, 'readable'); + }); + + it('should have emitted the readable event', () => { + expect(iterator._eventCounts.readable).to.be.gt(0); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with a rejecting promise as a source', () => { + it('should emit the error', done => { + const error = new Error('some error'); + const iterator = new WrappingIterator(Promise.reject(error)); + iterator.once('error', e => { + expect(e).to.equal(error); + done(); + }); + }); + }); + + describe('with a stream.Readable source that emits an error', () => { + it('should relay the error downstream', done => { + const error = new Error('some error'); + const source = new Readable({ + read() { + return null; + }, + }); + const iterator = new WrappingIterator(source); + iterator.once('error', e => { + expect(e).to.equal(error); + done(); + }); + source.emit('error', error); + }); + }); +}); + +describe('wrap', () => { + describe('with a stream.Readable source', () => { + let iterator; + let source; + before(() => { + source = Readable.from([0, 1, 2, 3, 4]); + iterator = wrap(source); + }); + + it('should return an instance of WrappingIterator', async () => { + expect(iterator).to.be.instanceof(WrappingIterator); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with a promisified stream.Readable source', () => { + let iterator, source; + before(() => { + source = Promise.resolve(Readable.from([0, 1, 2, 3, 4])); + iterator = wrap(source); + }); + + it('should return an instance of WrappingIterator', async () => { + expect(iterator).to.be.instanceof(WrappingIterator); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with an AsyncIterator source', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4]); + iterator = wrap(source); + }); + + it('should return the source itself', () => { + expect(iterator).to.equal(source); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with a rejecting promise as a source', () => { + it('should return an instance of WrappingIterator', done => { + const err = new Error('some error'); + const iterator = wrap(Promise.reject(err)); + expect(iterator).to.be.instanceof(WrappingIterator); + iterator.once('error', _err => { + expect(_err).to.equal(err); + done(); + }); + }); + }); + + describe('with an invalid source', () => { + it('should throw an error', done => { + try { + wrap({}); + } + catch (err) { + expect(err.message).to.match(/^Invalid source/); + done(); + } + }); + }); + + describe('with an instance of Iterator as the source', () => { + let source; + let iterator; + before(() => { + source = (function * () { + for (let i = 0; i < 5; i += 1) + yield i; + }()); + iterator = wrap(source); + }); + + it('should return an instance of WrappingIterator', () => { + expect(iterator).to.be.instanceof(WrappingIterator); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with an instance of Iterable as the source', () => { + let source; + let iterator; + before(() => { + source = { + *[Symbol.iterator]() { + for (let i = 0; i < 5; i += 1) + yield i; + }, + }; + iterator = wrap(source); + }); + + it('should return an instance of WrappingIterator', () => { + expect(iterator).to.be.instanceof(WrappingIterator); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); +}); From 147e01732131ff4bb6ee3a25a7ae615cb4a3174f Mon Sep 17 00:00:00 2001 From: Ruben Verborgh Date: Sat, 30 Jul 2022 19:54:37 +0100 Subject: [PATCH 2/5] Guard against null values in iterators. --- asynciterator.ts | 9 ++++++--- test/WrappingIterator-test.js | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 8e6b8d22..3732c32f 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2000,9 +2000,12 @@ export class WrappingIterator extends AsyncIterator { source = new EventEmitter() as any; source.read = (): T | null => { if (iterator !== null) { - const item = iterator.next(); - if (!item.done) - return item.value; + // Skip any null values inside of the iterator + let next: IteratorResult; + while (!(next = iterator.next()).done) { + if (next.value !== null) + return next.value; + } // No remaining values, so stop iterating iterator = null; this.close(); diff --git a/test/WrappingIterator-test.js b/test/WrappingIterator-test.js index 1f5b3207..ab9db44b 100644 --- a/test/WrappingIterator-test.js +++ b/test/WrappingIterator-test.js @@ -86,6 +86,25 @@ describe('WrappingIterator', () => { }); }); + describe('with an iterable that emits null values', () => { + let iterator; + beforeEach(() => { + iterator = new WrappingIterator((function * () { + yield 0; + yield null; + yield null; + yield 1; + yield null; + yield null; + })()); + }); + + it('should skip null values', async () => { + const arr = await iterator.toArray(); + expect(arr).to.deep.equal([0, 1]); + }); + }); + describe('with an iterator that emits one item', () => { let iterator; before(() => { From 174d87cbef4d6415c4f2238d75c9d30de402a5de Mon Sep 17 00:00:00 2001 From: Ruben Verborgh Date: Sat, 30 Jul 2022 20:17:37 +0100 Subject: [PATCH 3/5] Do not change sources if the iterator is done. --- asynciterator.ts | 14 +++++++++++++- test/ClonedIterator-test.js | 4 ++++ test/WrappingIterator-test.js | 28 ++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/asynciterator.ts b/asynciterator.ts index 3732c32f..c349da87 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -320,7 +320,7 @@ export class AsyncIterator extends EventEmitter { const items: T[] = []; const limit = typeof options?.limit === 'number' ? options.limit : Infinity; - return limit <= 0 ? Promise.resolve(items) : new Promise((resolve, reject) => { + return this.ended || limit <= 0 ? Promise.resolve(items) : new Promise((resolve, reject) => { // Collect and return all items up to the limit const resolveItems = () => resolve(items); const pushItem = (item: T) => { @@ -1203,6 +1203,10 @@ export class TransformIterator extends BufferedIterator { } set source(value: AsyncIterator | undefined) { + // Do not change sources if the iterator is already done + if (this.done) + return; + // Validate and set source const source = this._source = this._validateSource(value); source._destination = this; @@ -1784,6 +1788,10 @@ export class ClonedIterator extends TransformIterator { } set source(value: AsyncIterator | undefined) { + // Do not change sources if the iterator is already done + if (this.done) + return; + // Validate and set the source const source = this._source = this._validateSource(value); // Create a history reader for the source if none already existed @@ -1991,6 +1999,10 @@ export class WrappingIterator extends AsyncIterator { } protected set source(source: InternalSource) { + // Do not change sources if the iterator is already done + if (this.done) + return; + // Process an iterable source if (isIterable(source)) source = source[Symbol.iterator]() as any; diff --git a/test/ClonedIterator-test.js b/test/ClonedIterator-test.js index 8b97ba41..f543728f 100644 --- a/test/ClonedIterator-test.js +++ b/test/ClonedIterator-test.js @@ -83,6 +83,10 @@ describe('ClonedIterator', () => { clone.close(); }); + it('should not do anything when a source is set', () => { + clone.source = {}; + }); + it('should have undefined as `source` property', () => { expect(clone.source).to.be.undefined; }); diff --git a/test/WrappingIterator-test.js b/test/WrappingIterator-test.js index ab9db44b..be35856d 100644 --- a/test/WrappingIterator-test.js +++ b/test/WrappingIterator-test.js @@ -148,7 +148,35 @@ describe('WrappingIterator', () => { }); }); + describe('with a promise that resolves after closing', () => { + let iterator; + before(() => { + let resolve; + iterator = new WrappingIterator(new Promise(r => { + resolve = r; + })); + iterator.close(); + resolve([1, 2, 3]); + }); + + it('should not emit any items', async () => { + expect(await iterator.toArray()).to.deep.equal([]); + }); + }); + describe('with an array source', () => { + let iterator, source; + before(() => { + source = [0, 1, 2, 3, 4]; + iterator = new WrappingIterator(source); + }); + + it('should emit all items', async () => { + expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('with a promisified array source', () => { let iterator, source; before(() => { source = Promise.resolve([0, 1, 2, 3, 4]); From 803326ce188473379885c899c39d3debb2c33f1f Mon Sep 17 00:00:00 2001 From: Ruben Verborgh Date: Sat, 30 Jul 2022 20:19:28 +0100 Subject: [PATCH 4/5] Allow setting the source later. --- .eslintrc | 2 +- asynciterator.ts | 24 ++++++-- test/WrappingIterator-test.js | 109 ++++++++++++++++++++++++++++++++-- 3 files changed, 122 insertions(+), 13 deletions(-) diff --git a/.eslintrc b/.eslintrc index b81247da..f812d652 100644 --- a/.eslintrc +++ b/.eslintrc @@ -69,7 +69,7 @@ no-redeclare: off, "@typescript-eslint/no-redeclare": ["error"], no-restricted-properties: error, - no-return-assign: off, + no-return-assign: error, no-return-await: error, no-script-url: error, no-self-assign: error, diff --git a/asynciterator.ts b/asynciterator.ts index c349da87..5bc0cae3 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1990,18 +1990,30 @@ class HistoryReader { export class WrappingIterator extends AsyncIterator { protected _source: InternalSource | null = null; - constructor(source: MaybePromise>) { + constructor(source?: MaybePromise>) { super(); - if (!isPromise(source)) - this.source = source as any; - else - source.then(s => this.source = s as any).catch(e => this.emit('error', e)); + + // If promise, set up a temporary source and replace when ready + if (isPromise(source)) { + this._source = new AsyncIterator() as any; + source.then(value => { + this._source = null; + this.source = value; + }).catch(error => this.emit('error', error)); + } + // Otherwise, set the source synchronously + else if (source) { + this.source = source; + } } - protected set source(source: InternalSource) { + set source(value: IterableSource) { + let source: InternalSource = value as any; // Do not change sources if the iterator is already done if (this.done) return; + if (this._source !== null) + throw new Error('The source cannot be changed after it has been set'); // Process an iterable source if (isIterable(source)) diff --git a/test/WrappingIterator-test.js b/test/WrappingIterator-test.js index be35856d..e3f88d52 100644 --- a/test/WrappingIterator-test.js +++ b/test/WrappingIterator-test.js @@ -16,19 +16,116 @@ import { Readable } from 'stream'; describe('WrappingIterator', () => { describe('The WrappingIterator function', () => { describe('the result when called with `new`', () => { - let instance; - before(() => { instance = new WrappingIterator(new EmptyIterator()); }); + let iterator; + before(() => { iterator = new WrappingIterator(); }); - it('should be an WrappingIterator object', () => { - instance.should.be.an.instanceof(WrappingIterator); + it('should be an AsyncWrapper object', () => { + iterator.should.be.an.instanceof(WrappingIterator); }); it('should be an AsyncIterator object', () => { - instance.should.be.an.instanceof(AsyncIterator); + iterator.should.be.an.instanceof(AsyncIterator); }); it('should be an EventEmitter object', () => { - instance.should.be.an.instanceof(EventEmitter); + iterator.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('constructed without a source', () => { + let iterator, source; + before(() => { + source = new AsyncIterator(); + source.read = () => 'item'; + iterator = new WrappingIterator(); + }); + + describe('before a source is set', () => { + it('should not be readable', () => { + expect(iterator.readable).to.be.false; + }); + + it('should not have ended', () => { + expect(iterator.ended).to.be.false; + }); + + it('should return null upon read', () => { + expect(iterator.read()).to.be.null; + }); + }); + + describe('after a source is set', () => { + before(() => { + iterator.source = new ArrayIterator([1, 2, 3]); + }); + + it('should be readable', () => { + expect(iterator.readable).to.be.true; + }); + + it('should not have ended', () => { + expect(iterator.ended).to.be.false; + }); + + it('should return the first item upon read', () => { + expect(iterator.read()).to.equal(1); + }); + + it('disallows setting another source', () => { + (() => { iterator.source = new EmptyIterator(); }) + .should.throw('The source cannot be changed after it has been set'); + }); + }); + }); + + describe('constructed with a promise to a source', () => { + let iterator, resolve; + before(() => { + iterator = new WrappingIterator(new Promise(r => { + resolve = r; + })); + }); + + describe('before the promise resolves', () => { + it('should not be readable', () => { + expect(iterator.readable).to.be.false; + }); + + it('should not have ended', () => { + expect(iterator.ended).to.be.false; + }); + + it('should return null upon read', () => { + expect(iterator.read()).to.be.null; + }); + + it('disallows setting another source', () => { + (() => { iterator.source = new EmptyIterator(); }) + .should.throw('The source cannot be changed after it has been set'); + }); + }); + + describe('after the promise resolves', () => { + before(() => { + resolve(new ArrayIterator([1, 2, 3])); + }); + + it('should be readable', () => { + expect(iterator.readable).to.be.true; + }); + + it('should not have ended', () => { + expect(iterator.ended).to.be.false; + }); + + it('should return the first item upon read', () => { + expect(iterator.read()).to.equal(1); + }); + + it('disallows setting another source', () => { + (() => { iterator.source = new EmptyIterator(); }) + .should.throw('The source cannot be changed after it has been set'); }); }); }); From 093382790881ab090f222d7e6a67ed38b53f5756 Mon Sep 17 00:00:00 2001 From: Ruben Verborgh Date: Sun, 31 Jul 2022 12:02:38 +0100 Subject: [PATCH 5/5] Only access source when readable. --- asynciterator.ts | 4 +-- test/WrappingIterator-test.js | 55 +++++++++++++++++++++++++++++++---- 2 files changed, 51 insertions(+), 8 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 5bc0cae3..102936d4 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -2050,11 +2050,11 @@ export class WrappingIterator extends AsyncIterator { // Enable reading from source this._source = source; - this.readable = true; + this.readable = source.readable !== false; } read(): T | null { - if (this._source !== null) { + if (this._source !== null && this._source.readable !== false) { const item = this._source.read(); if (item !== null) return item; diff --git a/test/WrappingIterator-test.js b/test/WrappingIterator-test.js index e3f88d52..50a34396 100644 --- a/test/WrappingIterator-test.js +++ b/test/WrappingIterator-test.js @@ -130,6 +130,47 @@ describe('WrappingIterator', () => { }); }); + describe('with an AsyncIterator as source', () => { + let iterator, source; + before(() => { + source = new AsyncIterator(); + source.read = () => 'item'; + iterator = new WrappingIterator(source); + }); + + describe('before the source becomes readable', () => { + it('should not be readable', () => { + expect(iterator.readable).to.be.false; + }); + + it('should not have ended', () => { + expect(iterator.ended).to.be.false; + }); + + it('should return null upon read', () => { + expect(iterator.read()).to.be.null; + }); + }); + + describe('after the source becomes readable', () => { + before(() => { + source.readable = true; + }); + + it('should not be readable', () => { + expect(iterator.readable).to.be.true; + }); + + it('should not have ended', () => { + expect(iterator.ended).to.be.false; + }); + + it('should return the item upon read', () => { + expect(iterator.read()).to.equal('item'); + }); + }); + }); + describe('with an empty iterable', () => { let iterator; before(() => { iterator = fromIterable((function * () { /* empty */ })()); }); @@ -138,12 +179,13 @@ describe('WrappingIterator', () => { expect(iterator.readable).to.be.true; }); - it('should end after the first invocation of read, which should return null', done => { - expect(iterator.once('end', done).read()).to.equal(null); + it('should end after reading null', done => { + iterator.on('end', done); + expect(iterator.read()).to.be.null; }); it('should not be readable anymore', () => { - expect(iterator.readable).to.equal(false); + expect(iterator.readable).to.be.false; }); }); @@ -309,7 +351,7 @@ describe('WrappingIterator', () => { }); }); - describe('with an AsyncIterator source', () => { + describe('with an ArrayIterator source', () => { let iterator, source; before(() => { source = new ArrayIterator([0, 1, 2, 3, 4]); @@ -321,7 +363,7 @@ describe('WrappingIterator', () => { }); }); - describe('with a promisified AsyncIterator source', () => { + describe('with a promisified ArrayIterator source', () => { let iterator, source; before(() => { source = Promise.resolve(new ArrayIterator([0, 1, 2, 3, 4])); @@ -383,8 +425,9 @@ describe('WrappingIterator', () => { let iterator; let source; - before(() => { + before(done => { source = new TransformIterator(new ArrayIterator([0, 1, 2, 3, 4]), { autoStart: true }); + source.once('readable', done); iterator = new WrappingIterator(source); captureEvents(iterator, 'readable'); });