Skip to content

Commit def9c09

Browse files
jeswrRubenVerborgh
authored andcommitted
Add MappingIterator.
1 parent d036485 commit def9c09

File tree

7 files changed

+1218
-187
lines changed

7 files changed

+1218
-187
lines changed

asynciterator.ts

Lines changed: 117 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ export function setTaskScheduler(scheduler: TaskScheduler): void {
2525
taskScheduler = scheduler;
2626
}
2727

28+
// Returns a function that calls `fn` with `self` as `this` pointer. */
29+
function bind<T extends Function>(fn: T, self?: object): T {
30+
return self ? fn.bind(self) : fn;
31+
}
32+
2833
/**
2934
ID of the INIT state.
3035
An iterator is initializing if it is preparing main item generation.
@@ -161,7 +166,7 @@ export class AsyncIterator<T> extends EventEmitter {
161166
@param {object?} self The `this` pointer for the callback
162167
*/
163168
forEach(callback: (item: T) => void, self?: object) {
164-
this.on('data', self ? callback.bind(self) : callback);
169+
this.on('data', bind(callback, self));
165170
}
166171

167172
/**
@@ -455,8 +460,8 @@ export class AsyncIterator<T> extends EventEmitter {
455460
@param {object?} self The `this` pointer for the mapping function
456461
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
457462
*/
458-
map<D>(map: (item: T) => D, self?: any): AsyncIterator<D> {
459-
return this.transform({ map: self ? map.bind(self) : map });
463+
map<D>(map: MapFunction<T, D>, self?: any): AsyncIterator<D> {
464+
return new MappingIterator(this, bind(map, self));
460465
}
461466

462467
/**
@@ -469,7 +474,9 @@ export class AsyncIterator<T> extends EventEmitter {
469474
filter<K extends T>(filter: (item: T) => item is K, self?: any): AsyncIterator<K>;
470475
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T>;
471476
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T> {
472-
return this.transform({ filter: self ? filter.bind(self) : filter });
477+
return this.map(function (this: any, item: T) {
478+
return filter.call(self || this, item) ? item : null;
479+
});
473480
}
474481

475482
/**
@@ -510,7 +517,7 @@ export class AsyncIterator<T> extends EventEmitter {
510517
@returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items
511518
*/
512519
skip(offset: number): AsyncIterator<T> {
513-
return this.transform({ offset });
520+
return this.map(item => offset-- > 0 ? null : item);
514521
}
515522

516523
/**
@@ -777,9 +784,99 @@ export class IntegerIterator extends AsyncIterator<number> {
777784
}
778785
}
779786

787+
/**
788+
* A synchronous mapping function from one element to another.
789+
* A return value of `null` means that nothing should be emitted for a particular item.
790+
*/
791+
export type MapFunction<S, D = S> = (item: S) => D | null;
792+
793+
/** Function that maps an element to itself. */
794+
export function identity<S>(item: S): typeof item {
795+
return item;
796+
}
797+
798+
799+
/**
800+
An iterator that synchronously transforms every item from its source
801+
by applying a mapping function.
802+
@extends module:asynciterator.AsyncIterator
803+
*/
804+
export class MappingIterator<S, D = S> extends AsyncIterator<D> {
805+
protected readonly _map: MapFunction<S, D>;
806+
protected readonly _source: InternalSource<S>;
807+
protected readonly _destroySource: boolean;
808+
809+
/**
810+
* Applies the given mapping to the source iterator.
811+
*/
812+
constructor(
813+
source: AsyncIterator<S>,
814+
map: MapFunction<S, D> = identity as MapFunction<S, D>,
815+
options: SourcedIteratorOptions = {}
816+
) {
817+
super();
818+
this._map = map;
819+
this._source = ensureSourceAvailable(source);
820+
this._destroySource = options.destroySource !== false;
821+
822+
// Close if the source is already empty
823+
if (source.done) {
824+
this.close();
825+
}
826+
// Otherwise, wire up the source for reading
827+
else {
828+
this._source._destination = this;
829+
this._source.on('end', destinationClose);
830+
this._source.on('error', destinationEmitError);
831+
this._source.on('readable', destinationSetReadable);
832+
this.readable = this._source.readable;
833+
}
834+
}
835+
836+
/* Tries to read the next item from the iterator. */
837+
read(): D | null {
838+
if (!this.done) {
839+
// Try to read an item that maps to a non-null value
840+
if (this._source.readable) {
841+
let item: S | null, mapped: D | null;
842+
while ((item = this._source.read()) !== null) {
843+
if ((mapped = this._map(item)) !== null)
844+
return mapped;
845+
}
846+
}
847+
this.readable = false;
848+
849+
// Close this iterator if the source is empty
850+
if (this._source.done)
851+
this.close();
852+
}
853+
return null;
854+
}
855+
856+
/* Cleans up the source iterator and ends. */
857+
protected _end(destroy: boolean) {
858+
this._source.removeListener('end', destinationClose);
859+
this._source.removeListener('error', destinationEmitError);
860+
this._source.removeListener('readable', destinationSetReadable);
861+
delete this._source._destination;
862+
if (this._destroySource)
863+
this._source.destroy();
864+
super._end(destroy);
865+
}
866+
}
867+
868+
// Validates an AsyncIterator for use as a source within another AsyncIterator
869+
function ensureSourceAvailable<S>(source?: AsyncIterator<S>, allowDestination = false) {
870+
if (!source || !isFunction(source.read) || !isFunction(source.on))
871+
throw new Error(`Invalid source: ${source}`);
872+
if (!allowDestination && (source as any)._destination)
873+
throw new Error('The source already has a destination');
874+
return source as InternalSource<S>;
875+
}
876+
780877

781878
/**
782-
A iterator that maintains an internal buffer of items.
879+
An iterator that maintains an internal buffer of items.
783880
This class serves as a base class for other iterators
784881
with a typically complex item generation process.
785882
@extends module:asynciterator.AsyncIterator
@@ -797,7 +894,7 @@ export class BufferedIterator<T> extends AsyncIterator<T> {
797894
@param {integer} [options.maxBufferSize=4] The number of items to preload in the internal buffer
798895
@param {boolean} [options.autoStart=true] Whether buffering starts directly after construction
799896
*/
800-
constructor({ maxBufferSize = 4, autoStart = true } = {}) {
897+
constructor({ maxBufferSize = 4, autoStart = true }: BufferedIteratorOptions = {}) {
801898
super(INIT);
802899
this.maxBufferSize = maxBufferSize;
803900
taskScheduler(() => this._init(autoStart));
@@ -1150,14 +1247,10 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
11501247
@param {object} source The source to validate
11511248
@param {boolean} allowDestination Whether the source can already have a destination
11521249
*/
1153-
protected _validateSource(source?: AsyncIterator<S>, allowDestination = false) {
1250+
protected _validateSource(source?: AsyncIterator<S>, allowDestination = false): InternalSource<S> {
11541251
if (this._source || typeof this._createSource !== 'undefined')
11551252
throw new Error('The source cannot be changed after it has been set');
1156-
if (!source || !isFunction(source.read) || !isFunction(source.on))
1157-
throw new Error(`Invalid source: ${source}`);
1158-
if (!allowDestination && (source as any)._destination)
1159-
throw new Error('The source already has a destination');
1160-
return source as InternalSource<S>;
1253+
return ensureSourceAvailable(source, allowDestination);
11611254
}
11621255

11631256
/**
@@ -1240,9 +1333,15 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
12401333
}
12411334
}
12421335

1336+
function destinationSetReadable<S>(this: InternalSource<S>) {
1337+
this._destination!.readable = true;
1338+
}
12431339
function destinationEmitError<S>(this: InternalSource<S>, error: Error) {
12441340
this._destination!.emit('error', error);
12451341
}
1342+
function destinationClose<S>(this: InternalSource<S>) {
1343+
this._destination!.close();
1344+
}
12461345
function destinationCloseWhenDone<S>(this: InternalSource<S>) {
12471346
(this._destination as any)._closeWhenDone();
12481347
}
@@ -1956,15 +2055,18 @@ function isSourceExpression<T>(object: any): object is SourceExpression<T> {
19562055
return object && (isEventEmitter(object) || isPromise(object) || isFunction(object));
19572056
}
19582057

2058+
export interface SourcedIteratorOptions {
2059+
destroySource?: boolean;
2060+
}
2061+
19592062
export interface BufferedIteratorOptions {
19602063
maxBufferSize?: number;
19612064
autoStart?: boolean;
19622065
}
19632066

1964-
export interface TransformIteratorOptions<S> extends BufferedIteratorOptions {
2067+
export interface TransformIteratorOptions<S> extends SourcedIteratorOptions, BufferedIteratorOptions {
19652068
source?: SourceExpression<S>;
19662069
optional?: boolean;
1967-
destroySource?: boolean;
19682070
}
19692071

19702072
export interface TransformOptions<S, D> extends TransformIteratorOptions<S> {

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
"test:microtask": "npm run mocha",
3232
"test:immediate": "npm run mocha -- --require test/config/useSetImmediate.js",
3333
"mocha": "c8 mocha",
34-
"lint": "eslint asynciterator.ts test",
34+
"lint": "eslint asynciterator.ts test perf",
3535
"docs": "npm run build:module && npm run jsdoc",
3636
"jsdoc": "jsdoc -c jsdoc.json"
3737
},

perf/.eslintrc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
rules: {
3+
no-console: off,
4+
},
5+
}

perf/MappingIterator-perf.js

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { ArrayIterator, range } from '../dist/asynciterator.js';
2+
3+
function noop() {
4+
// empty function to drain an iterator
5+
}
6+
7+
async function perf(warmupIterator, iterator, description) {
8+
return new Promise(res => {
9+
const now = performance.now();
10+
iterator.on('data', noop);
11+
iterator.on('end', () => {
12+
console.log(description, performance.now() - now);
13+
res();
14+
});
15+
});
16+
}
17+
18+
function run(iterator) {
19+
return new Promise(res => {
20+
iterator.on('data', noop);
21+
iterator.on('end', () => {
22+
res();
23+
});
24+
});
25+
}
26+
27+
function baseIterator() {
28+
return new ArrayIterator(new Array(20_000_000).fill(true).map((_, i) => i));
29+
}
30+
31+
function createMapped(filter) {
32+
let iterator = baseIterator();
33+
for (let j = 0; j < 20; j++) {
34+
iterator = iterator.map(item => item);
35+
if (filter)
36+
iterator = iterator.filter(item => item % (j + 2) === 0);
37+
}
38+
return iterator;
39+
}
40+
41+
(async () => {
42+
await run(baseIterator()); // warm-up run
43+
44+
await perf(baseIterator(), createMapped(), '20,000,000 elems 20 maps\t\t\t\t\t');
45+
await perf(createMapped(true), createMapped(true), '20,000,000 elems 20 maps 20 filter\t\t\t');
46+
47+
const now = performance.now();
48+
for (let j = 0; j < 100_000; j++) {
49+
let it = range(1, 100);
50+
for (let k = 0; k < 5; k++)
51+
it = it.map(item => item);
52+
53+
await new Promise((resolve, reject) => {
54+
it.on('data', () => null);
55+
it.on('end', resolve);
56+
it.on('error', reject);
57+
});
58+
}
59+
console.log('100,000 iterators each with 5 maps and 100 elements\t', performance.now() - now);
60+
})();

test/.eslintrc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,3 @@
1919
callback-return: 0, // For testing incorrect usage
2020
},
2121
}
22-

0 commit comments

Comments
 (0)