Skip to content

Commit fd69434

Browse files
committed
WIP: Review.
1 parent ba64a35 commit fd69434

File tree

7 files changed

+742
-754
lines changed

7 files changed

+742
-754
lines changed

asynciterator.ts

Lines changed: 127 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export function setTaskScheduler(scheduler: TaskScheduler): void {
2626
}
2727

2828
/** Binds a function to an object */
29-
function bind(fn: Function, self: any) {
29+
function bind(fn: Function, self?: object) {
3030
return self ? fn.bind(self) : fn;
3131
}
3232

@@ -457,13 +457,12 @@ export class AsyncIterator<T> extends EventEmitter {
457457
Maps items from this iterator using the given function.
458458
After this operation, only read the returned iterator instead of the current one.
459459
@param {Function} map A mapping function to call on this iterator's (remaining) items.
460-
A `null` value indicates that nothing should be returned for a particular item..
460+
A `null` value indicates that nothing should be returned for a particular item.
461461
@param {object?} self The `this` pointer for the mapping function
462-
@param {boolean?} close Close the iterator after an item is mapped to null
463462
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
464463
*/
465464
map<D>(map: (item: T, it: AsyncIterator<any>) => D | null, self?: any): AsyncIterator<D> {
466-
return new MappingIterator<T, D>(this, [bind(map, self)]);
465+
return new MappingIterator<T, D>(this, bind(map, self));
467466
}
468467

469468
/**
@@ -785,9 +784,121 @@ export class IntegerIterator extends AsyncIterator<number> {
785784
}
786785
}
787786

787+
/**
788+
* A synchronous mapping function from one element to another.
789+
* The iterator performing the mapping is passed as a second argument.
790+
*/
791+
export type MapFunction<S, D = S, I extends AsyncIterator<D> = AsyncIterator<D>> =
792+
(item: S, iterator: I) => D | null;
793+
794+
/**
795+
An iterator that calls a synchronous mapping function
796+
on every item from its source iterator.
797+
@extends module:asynciterator.AsyncIterator
798+
*/
799+
export class MappingIterator<S, D = S> extends AsyncIterator<D> {
800+
protected _source: AsyncIterator<S>;
801+
private readonly _destroySource: boolean;
802+
private readonly _mappings: MapFunction<any, any, MappingIterator<S, D>>[];
803+
private readonly _mappingRoot: InternalSource<any>;
804+
805+
// This is wrong: readable should be set by listening to source events
806+
get readable() {
807+
return (this._state < CLOSED) && this._source.readable;
808+
}
809+
810+
/**
811+
* Applies the given mapping to the source iterator.
812+
*/
813+
constructor(
814+
source: AsyncIterator<S>,
815+
mapping?: MapFunction<S, D, MappingIterator<S, D>>,
816+
options?: SourcedIteratorOptions,
817+
);
818+
819+
/**
820+
* Applies the given list of mappings to the mapping root.
821+
*
822+
* This is an optimization for
823+
* root.map(f1).map(f2).map(f3)
824+
* where the combined mapping x => f3(f2(f1(x)))
825+
* is applied to root rather than to the intermediate sources.
826+
*/
827+
constructor(
828+
source: AsyncIterator<S>,
829+
mappings: MapFunction<any, any, MappingIterator<S, D>>[],
830+
mappingRoot: AsyncIterator<any>,
831+
options?: SourcedIteratorOptions,
832+
);
833+
834+
constructor(
835+
source: AsyncIterator<S>,
836+
mappings: MapFunction<S, D, MappingIterator<S, D>> |
837+
MapFunction<any, any, MappingIterator<S, D>>[] = [],
838+
mappingRoot?: AsyncIterator<any> | SourcedIteratorOptions,
839+
options: SourcedIteratorOptions = {},
840+
) {
841+
super();
842+
// Resolve optional parameters
843+
if (!isEventEmitter(mappingRoot)) {
844+
if (mappingRoot)
845+
options = mappingRoot;
846+
mappingRoot = source;
847+
}
848+
this._source = source;
849+
this._mappings = isFunction(mappings) ? [mappings] : mappings;
850+
this._mappingRoot = mappingRoot as InternalSource<any>;
851+
this._destroySource = options.destroySource !== false;
852+
853+
if (mappingRoot.done) {
854+
this.close();
855+
}
856+
else {
857+
_validateSource(mappingRoot);
858+
this._mappingRoot._destination = this;
859+
this._mappingRoot.on('end', destinationClose);
860+
this._mappingRoot.on('error', destinationEmitError);
861+
this._mappingRoot.on('readable', destinationEmitReadable);
862+
}
863+
}
864+
865+
read(): D | null {
866+
let mapped : any = null;
867+
while (mapped === null && (mapped = this._source.read()) !== null) {
868+
for (let i = 0; i < this._mappings.length; i++) {
869+
mapped = this._mappings[i](mapped, this);
870+
if (mapped === null)
871+
break;
872+
}
873+
}
874+
return mapped;
875+
}
876+
877+
map<K>(map: (item: D, it: AsyncIterator<any>) => K | null, self?: any): AsyncIterator<K> {
878+
return new MappingIterator<S, K>(this._source, [...this._mappings, bind(map, self)], this);
879+
}
880+
881+
public close() {
882+
if (this._destroySource)
883+
this._mappingRoot.destroy();
884+
super.close();
885+
}
886+
887+
/* Cleans up the source iterator and ends. */
888+
protected _end(destroy: boolean) {
889+
this._mappingRoot.removeListener('end', destinationClose);
890+
this._mappingRoot.removeListener('error', destinationEmitError);
891+
this._mappingRoot.removeListener('readable', destinationEmitReadable);
892+
delete this._mappingRoot._destination;
893+
if (this._destroySource)
894+
this._mappingRoot.destroy();
895+
super._end(destroy);
896+
}
897+
}
898+
788899

789900
/**
790-
A iterator that maintains an internal buffer of items.
901+
An iterator that maintains an internal buffer of items.
791902
This class serves as a base class for other iterators
792903
with a typically complex item generation process.
793904
@extends module:asynciterator.AsyncIterator
@@ -1252,9 +1363,15 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
12521363
}
12531364
}
12541365

1366+
function destinationEmitReadable<S>(this: InternalSource<S>) {
1367+
this._destination!.emit('readable');
1368+
}
12551369
function destinationEmitError<S>(this: InternalSource<S>, error: Error) {
12561370
this._destination!.emit('error', error);
12571371
}
1372+
function destinationClose<S>(this: InternalSource<S>) {
1373+
this._destination!.close();
1374+
}
12581375
function destinationCloseWhenDone<S>(this: InternalSource<S>) {
12591376
(this._destination as any)._closeWhenDone();
12601377
}
@@ -1263,91 +1380,6 @@ function destinationFillBuffer<S>(this: InternalSource<S>) {
12631380
(this._destination as any)._fillBuffer();
12641381
}
12651382

1266-
export class MappingIterator<T, D = T> extends AsyncIterator<D> {
1267-
private _destroySource: boolean;
1268-
1269-
get readable() {
1270-
return this.source.readable;
1271-
}
1272-
1273-
set readable(readable) {
1274-
this.source.readable = readable;
1275-
}
1276-
1277-
constructor(
1278-
protected source: AsyncIterator<T>,
1279-
private transforms: ((item: any, iterator: AsyncIterator<any>) => any)[],
1280-
private upstream: AsyncIterator<any> = source,
1281-
options: { destroySource?: boolean } = {}
1282-
) {
1283-
// Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing
1284-
// listeners to the original source
1285-
super();
1286-
this._destroySource = options.destroySource !== false;
1287-
if (upstream.done) {
1288-
this.close();
1289-
}
1290-
else {
1291-
_validateSource(upstream);
1292-
// @ts-ignore
1293-
upstream._destination = this;
1294-
upstream.on('end', onSourceEnd);
1295-
upstream.on('error', onSourceError);
1296-
upstream.on('readable', onSourceReadable);
1297-
}
1298-
}
1299-
1300-
read(): D | null {
1301-
const { source, transforms } = this;
1302-
let item, i;
1303-
while ((item = source.read()) !== null) {
1304-
i = transforms.length;
1305-
// Applies each of the transforms in sequence, and terminates
1306-
// early if a transform returns null
1307-
//
1308-
// Do not use a for-of loop here, it slows down transformations
1309-
// by approximately a factor of 2.
1310-
while (i-- >= 1 && (item = transforms[i](item, this)) !== null)
1311-
;
1312-
if (item !== null)
1313-
return item;
1314-
}
1315-
return null;
1316-
}
1317-
1318-
map<K>(map: (item: D, it: AsyncIterator<any>) => K | null, self?: any): AsyncIterator<K> {
1319-
return new MappingIterator<T, K>(this.source, [bind(map, self), ...this.transforms], this);
1320-
}
1321-
1322-
destroy(cause?: Error): void {
1323-
this.upstream.destroy(cause);
1324-
super.destroy(cause);
1325-
}
1326-
1327-
public close() {
1328-
this.upstream.removeListener('end', onSourceEnd);
1329-
this.upstream.removeListener('error', onSourceError);
1330-
this.upstream.removeListener('readable', onSourceReadable);
1331-
if (this._destroySource)
1332-
this.upstream.destroy();
1333-
scheduleTask(() => {
1334-
// @ts-ignore
1335-
delete this.upstream._destination;
1336-
delete this.source;
1337-
});
1338-
super.close();
1339-
}
1340-
}
1341-
1342-
function onSourceError<S>(this: InternalSource<S>, error: Error) {
1343-
this._destination.emit('error', error);
1344-
}
1345-
function onSourceEnd<S>(this: InternalSource<S>) {
1346-
this._destination.close();
1347-
}
1348-
function onSourceReadable<S>(this: InternalSource<S>) {
1349-
this._destination.emit('readable');
1350-
}
13511383

13521384
/**
13531385
An iterator that generates items based on a source iterator
@@ -2043,15 +2075,18 @@ function isSourceExpression<T>(object: any): object is SourceExpression<T> {
20432075
return object && (isEventEmitter(object) || isPromise(object) || isFunction(object));
20442076
}
20452077

2078+
export interface SourcedIteratorOptions {
2079+
destroySource?: boolean;
2080+
}
2081+
20462082
export interface BufferedIteratorOptions {
20472083
maxBufferSize?: number;
20482084
autoStart?: boolean;
20492085
}
20502086

2051-
export interface TransformIteratorOptions<S> extends BufferedIteratorOptions {
2087+
export interface TransformIteratorOptions<S> extends SourcedIteratorOptions, BufferedIteratorOptions {
20522088
source?: SourceExpression<S>;
20532089
optional?: boolean;
2054-
destroySource?: boolean;
20552090
}
20562091

20572092
export interface TransformOptions<S, D> extends TransformIteratorOptions<S> {

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
"scripts": {
2222
"build": "npm run build:clean && npm run build:module && npm run build:commonjs && npm run build:types",
2323
"build:clean": "rm -rf dist",
24-
"build:module": " tsc --module es2015 && mv dist/ts-out/*.js dist && npm run build:module:import",
25-
"build:module:import": " sed -i'.bak' -e 's/\\.\\/linkedlist/.\\/linkedlist.js/' -e 's/\\.\\/taskscheduler/.\\/taskscheduler.js/' dist/asynciterator.js && rm dist/*.bak",
24+
"build:module": "tsc && mv dist/ts-out/*.js dist && npm run build:module:import",
25+
"build:module:import": "sed -i'.bak' -e 's/\\.\\/linkedlist/.\\/linkedlist.js/' -e 's/\\.\\/taskscheduler/.\\/taskscheduler.js/' dist/asynciterator.js && rm dist/*.bak",
2626
"build:commonjs": "tsc --module commonjs && ./.change-extension cjs dist/ts-out/*.js && mv dist/ts-out/*.cjs dist && npm run build:commonjs:import",
2727
"build:commonjs:import": "sed -i'.bak' -e 's/\\.\\/linkedlist/.\\/linkedlist.cjs/' -e 's/\\.\\/taskscheduler/.\\/taskscheduler.cjs/' dist/asynciterator.cjs && rm dist/*.bak",
2828
"build:types": "tsc -d && rm dist/ts-out/*.js && mv dist/ts-out/*.d.ts dist",

test.js

Lines changed: 0 additions & 25 deletions
This file was deleted.

0 commit comments

Comments
 (0)