Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

rules: {
// Best Practices
accessor-pairs: error,
accessor-pairs: off,
array-callback-return: error,
block-scoped-var: error,
class-methods-use-this: off,
Expand Down Expand Up @@ -66,7 +66,8 @@
no-octal-escape: error,
no-param-reassign: off,
no-proto: error,
no-redeclare: error,
no-redeclare: off,
"@typescript-eslint/no-redeclare": ["error"],
no-restricted-properties: error,
no-return-assign: error,
no-return-await: error,
Expand Down
228 changes: 195 additions & 33 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ export function setTaskScheduler(scheduler: TaskScheduler): void {
taskScheduler = scheduler;
}

// Returns a function that calls `fn` with `self` as `this` pointer. */
function bind<T extends Function>(fn: T, self?: object): T {
return self ? fn.bind(self) : fn;
}

/**
ID of the INIT state.
Expand Down Expand Up @@ -324,7 +320,7 @@ export class AsyncIterator<T> extends EventEmitter {
const items: T[] = [];
const limit = typeof options?.limit === 'number' ? options.limit : Infinity;

return limit <= 0 ? Promise.resolve(items) : new Promise<T[]>((resolve, reject) => {
return this.ended || limit <= 0 ? Promise.resolve(items) : new Promise<T[]>((resolve, reject) => {
// Collect and return all items up to the limit
const resolveItems = () => resolve(items);
const pushItem = (item: T) => {
Expand Down Expand Up @@ -868,7 +864,7 @@ export class MappingIterator<S, D = S> extends AsyncIterator<D> {
// Validates an AsyncIterator for use as a source within another AsyncIterator
function ensureSourceAvailable<S>(source?: AsyncIterator<S>, allowDestination = false) {
if (!source || !isFunction(source.read) || !isFunction(source.on))
throw new Error(`Invalid source: ${source}`);
throw new TypeError(`Invalid source: ${source}`);
if (!allowDestination && (source as any)._destination)
throw new Error('The source already has a destination');
return source as InternalSource<S>;
Expand Down Expand Up @@ -1175,8 +1171,7 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
@param {module:asynciterator.AsyncIterator} [options.source] The source this iterator generates items from
*/
constructor(source?: SourceExpression<S>,
options: TransformIteratorOptions<S> =
source as TransformIteratorOptions<S> || {}) {
options: TransformIteratorOptions<S> = source as TransformIteratorOptions<S> || {}) {
super(options);

// Shift parameters if needed
Expand Down Expand Up @@ -1208,6 +1203,10 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
}

set source(value: AsyncIterator<S> | undefined) {
// Do not change sources if the iterator is already done
if (this.done)
return;

// Validate and set source
const source = this._source = this._validateSource(value);
source._destination = this;
Expand Down Expand Up @@ -1789,6 +1788,10 @@ export class ClonedIterator<T> extends TransformIterator<T> {
}

set source(value: AsyncIterator<T> | undefined) {
// Do not change sources if the iterator is already done
if (this.done)
return;

// Validate and set the source
const source = this._source = this._validateSource(value);
// Create a history reader for the source if none already existed
Expand Down Expand Up @@ -1981,42 +1984,179 @@ class HistoryReader<T> {
}
}

/**
* An iterator that takes a variety of iterable objects as a source.
*/
export class WrappingIterator<T> extends AsyncIterator<T> {
protected _source: InternalSource<T> | null = null;

constructor(source?: MaybePromise<IterableSource<T>>) {
super();

// If promise, set up a temporary source and replace when ready
if (isPromise(source)) {
this._source = new AsyncIterator() as any;
source.then(value => {
this._source = null;
this.source = value;
}).catch(error => this.emit('error', error));
}
// Otherwise, set the source synchronously
else if (source) {
this.source = source;
}
}

set source(value: IterableSource<T>) {
let source: InternalSource<T> = value as any;
// Do not change sources if the iterator is already done
if (this.done)
return;
if (this._source !== null)
throw new Error('The source cannot be changed after it has been set');

// Process an iterable source
if (isIterable(source))
source = source[Symbol.iterator]() as any;
// Process an iterator source
if (isIterator<T>(source)) {
let iterator: Iterator<T> | null = source;
source = new EventEmitter() as any;
source.read = (): T | null => {
if (iterator !== null) {
// Skip any null values inside of the iterator
let next: IteratorResult<T>;
while (!(next = iterator.next()).done) {
if (next.value !== null)
return next.value;
}
// No remaining values, so stop iterating
iterator = null;
this.close();
}
return null;
};
}
// Process any other readable source
else {
source = ensureSourceAvailable(source);
}

// Set up event handling
source._destination = this;
source.on('end', destinationClose);
source.on('error', destinationEmitError);
source.on('readable', destinationSetReadable);

// Enable reading from source
this._source = source;
this.readable = source.readable !== false;
}

read(): T | null {
if (this._source !== null && this._source.readable !== false) {
const item = this._source.read();
if (item !== null)
return item;
this.readable = false;
}
return null;
}

protected _end(destroy: boolean = false) {
super._end(destroy);
// Clean up event handlers
if (this._source !== null) {
this._source.removeListener('end', destinationClose);
this._source.removeListener('error', destinationEmitError);
this._source.removeListener('readable', destinationSetReadable);
delete this._source._destination;
this._source = null;
}
}
}


/**
Creates an iterator that wraps around a given iterator or readable stream.
Use this to convert an iterator-like object into a full-featured AsyncIterator.
After this operation, only read the returned iterator instead of the given one.
@function
@param {module:asynciterator.AsyncIterator|Readable} [source] The source this iterator generates items from
@param [source] The source this iterator generates items from
@param {object} [options] Settings of the iterator
@returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator
*/
export function wrap<T>(source: EventEmitter | Promise<EventEmitter>, options?: TransformIteratorOptions<T>) {
return new TransformIterator<T>(source as AsyncIterator<T> | Promise<AsyncIterator<T>>, options);
export function wrap<T>(source: null | undefined): AsyncIterator<T>;
export function wrap<T>(source: MaybePromise<IterableSource<T>>): AsyncIterator<T>;
export function wrap<T>(source: MaybePromise<AsyncIterator<T>>,
options?: TransformIteratorOptions<T>): AsyncIterator<T>;
export function wrap<T>(source?: MaybePromise<IterableSource<T>> | null,
options?: TransformIteratorOptions<T>,): AsyncIterator<T> {
// TransformIterator if TransformIteratorOptions were specified
if (options)
return new TransformIterator<T>(source as MaybePromise<AsyncIterator<T>>, options);

// Empty iterator if no source specified
if (!source)
return empty();

// Unwrap promised sources
if (isPromise<T>(source))
return new WrappingIterator(source);

// Directly return any AsyncIterator
if (source instanceof AsyncIterator)
return source;

// Other iterable objects
if (Array.isArray(source))
return fromArray<T>(source);
if (isIterable(source) || isIterator(source) || isEventEmitter(source))
return new WrappingIterator<T>(source);

// Other types are unsupported
throw new TypeError(`Invalid source: ${source}`);
}

/**
Creates an empty iterator.
*/
export function empty<T>() {
export function empty<T>(): AsyncIterator<T> {
return new EmptyIterator<T>();
}

/**
Creates an iterator with a single item.
@param {object} item the item
*/
export function single<T>(item: T) {
export function single<T>(item: T): AsyncIterator<T> {
return new SingletonIterator<T>(item);
}

/**
Creates an iterator for the given array.
@param {Array} items the items
*/
export function fromArray<T>(items: Iterable<T>) {
export function fromArray<T>(items: Iterable<T>): AsyncIterator<T> {
return new ArrayIterator<T>(items);
}

/**
Creates an iterator for the given Iterator.
@param {Iterable} source the iterator
*/
export function fromIterator<T>(source: Iterable<T> | Iterator<T>): AsyncIterator<T> {
return new WrappingIterator<T>(source);
}

/**
Creates an iterator for the given Iterable.
@param {Iterable} source the iterable
*/
export function fromIterable<T>(source: Iterable<T> | Iterator<T>): AsyncIterator<T> {
return new WrappingIterator<T>(source);
}

/**
Creates an iterator containing all items from the given iterators.
@param {Array} items the items
Expand All @@ -2035,25 +2175,12 @@ export function range(start: number, end: number, step?: number) {
return new IntegerIterator({ start, end, step });
}

// Determines whether the given object is a function
function isFunction(object: any): object is Function {
return typeof object === 'function';
}

// Determines whether the given object is an EventEmitter
function isEventEmitter(object: any): object is EventEmitter {
return object && typeof object.on === 'function';
}

// Determines whether the given object is a promise
function isPromise<T>(object: any): object is Promise<T> {
return object && typeof object.then === 'function';
}

// Determines whether the given object is a source expression
function isSourceExpression<T>(object: any): object is SourceExpression<T> {
return object && (isEventEmitter(object) || isPromise(object) || isFunction(object));
}
export type IterableSource<T> =
T[] |
AsyncIterator<T> |
EventEmitter |
Iterator<T> |
Iterable<T>;

export interface SourcedIteratorOptions {
destroySource?: boolean;
Expand Down Expand Up @@ -2098,3 +2225,38 @@ type SourceExpression<T> =

type InternalSource<T> =
AsyncIterator<T> & { _destination?: AsyncIterator<any> };

// Returns a function that calls `fn` with `self` as `this` pointer. */
function bind<T extends Function>(fn: T, self?: object): T {
return self ? fn.bind(self) : fn;
}

// Determines whether the given object is a function
export function isFunction(object: any): object is Function {
return typeof object === 'function';
}

// Determines whether the given object is an EventEmitter
export function isEventEmitter(object: any): object is EventEmitter {
return isFunction(object?.on);
}

// Determines whether the given object is a promise
export function isPromise<T>(object: any): object is Promise<T> {
return isFunction(object?.then);
}

// Determines whether the given object is a source expression
export function isSourceExpression<T>(object: any): object is SourceExpression<T> {
return object && (isEventEmitter(object) || isPromise(object) || isFunction(object));
}

// Determines whether the given object supports the iterable protocol
export function isIterable<T>(object: { [key: string]: any }): object is Iterable<T> {
return object && (Symbol.iterator in object);
}

// Determines whether the given object supports the iterator protocol
export function isIterator<T>(object: { [key: string]: any }): object is Iterator<T> {
return isFunction(object?.next);
}
18 changes: 18 additions & 0 deletions test/ArrayIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
AsyncIterator,
ArrayIterator,
fromArray,
wrap,
} from '../dist/asynciterator.js';

import { EventEmitter } from 'events';
Expand Down Expand Up @@ -41,6 +42,23 @@ describe('ArrayIterator', () => {
instance.should.be.an.instanceof(EventEmitter);
});
});

describe('the result when called through `wrap`', () => {
let instance;
before(() => { instance = wrap([]); });

it('should be an ArrayIterator object', () => {
instance.should.be.an.instanceof(ArrayIterator);
});

it('should be an AsyncIterator object', () => {
instance.should.be.an.instanceof(AsyncIterator);
});

it('should be an EventEmitter object', () => {
instance.should.be.an.instanceof(EventEmitter);
});
});
});

describe('An ArrayIterator without arguments', () => {
Expand Down
Loading