Description
Hello there,
I'm working on a project where I have a pipeline (one readable stream as the source and multiple Transforms).
In normal circumstances, everything works fine. But when a huge amount of data enters the pipeline in rapid succession, things get strange (the problem appears with at least 4000 chunks, though I guess the exact number is irrelevant):
The pipeline hangs after processing a small portion of the data and stays stuck forever. It never resumes.
By adding logs and investigating the problem, I concluded that it is always the last Transform that hangs. To be sure, I removed the last Transform (so the former n-1 Transform became the last one in the pipeline), and then it was the new last Transform that hung.
I thought it was a memory problem, so I tried some workarounds:
- I increased the `highWaterMark` threshold. This didn't solve the issue, though it delayed it (the pipeline processes more data but still hangs at some point).
- I increased/decreased the `parallelism` option for each `Transform`, and this didn't solve it either.
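One hypothesis that would match "a bigger `highWaterMark` only delays the hang" is plain Node backpressure: if nothing reads from the final Transform, its readable buffer fills up to `highWaterMark` and the Transform stops being fed. The sketch below uses only Node's core `stream` module (not this library) and assumes the final Transform's output is never consumed; `countProcessedWithoutConsumer` is a hypothetical helper. It reports how many chunks reach the transform function before the stream stalls, for a few `highWaterMark` values.

```typescript
import { Readable, Transform, TransformCallback } from 'stream';

// Feed `total` object-mode chunks into a final Transform that nobody reads from,
// then report how many chunks its transform function actually received.
function countProcessedWithoutConsumer(highWaterMark: number, total = 1000): Promise<number> {
  return new Promise((resolve) => {
    let processed = 0;
    const source = Readable.from(Array.from({ length: total }, (_, i) => i));
    const last = new Transform({
      objectMode: true,
      highWaterMark, // used for both the writable and the readable buffer
      transform(chunk: number, _encoding: BufferEncoding, callback: TransformCallback) {
        processed++;
        callback(null, chunk); // pushes the chunk to the readable side, which is never read
      },
    });
    source.pipe(last); // no .pipe() or .on('data') on `last`: its output is never consumed
    // Give the streams time to settle into their stalled state, then report.
    setTimeout(() => resolve(processed), 200);
  });
}

async function main(): Promise<void> {
  for (const hwm of [4, 16, 64]) {
    const processed = await countProcessedWithoutConsumer(hwm);
    console.log(`highWaterMark=${hwm}: processed ${processed} of 1000 chunks`);
  }
}

main();
```

If this hypothesis applies, the processed count grows with `highWaterMark` but stays well below the total, which is the "more data, still hangs" behavior described above. It does not explain why `ordered: true` behaves differently.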
At that point, I decided to simplify my pipeline down to a skeleton model. It turned out that the `ordered` option causes the problem when set to `false`.
Valid workaround
I thought that maybe one of the Transforms was being corked at some point, so I added a `data` event listener to each one to uncork it manually if that was the case. It worked. But then, for no particular reason, I tried leaving the listener's callback empty (without trying to uncork), and it still works.
So, long story short: adding an empty `data` listener on the Transform fixes the hanging problem (code chunk below):
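```ts
return transform(myParallelism, { ordered: false }, async (data: Input, callback: TransformCallback) => {
  try {
    const result = await someAsyncOperation(data)
    callback(null, result)
  } catch (error) {
    // `error` is `unknown` under strict TS settings; TransformCallback expects an Error
    callback(error as Error)
  }
}).on('data', () => {
  // Intentionally empty: with this listener attached, the pipeline no longer hangs
})
```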
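A possible reason the empty listener helps, offered as an assumption rather than a diagnosis: in Node, attaching a `data` listener switches a readable stream into flowing mode, so the final Transform's output buffer is drained and never fills up. The sketch below uses only Node's core `stream` module (not this library) to compare that workaround with ending the chain in an explicit no-op `Writable` via `pipeline` from `stream/promises` (Node 15 or later). The helpers `build`, `withEmptyDataListener` and `withSink` are hypothetical.

```typescript
import { Readable, Transform, TransformCallback, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// Build a fresh source and final Transform; `counter.processed` tracks how many
// chunks reached the transform function.
function build(total: number) {
  const counter = { processed: 0 };
  const source = Readable.from(Array.from({ length: total }, (_, i) => i));
  const last = new Transform({
    objectMode: true,
    highWaterMark: 16,
    transform(chunk: number, _encoding: BufferEncoding, callback: TransformCallback) {
      counter.processed++;
      callback(null, chunk);
    },
  });
  return { counter, source, last };
}

// Variant 1: the workaround from this issue. The 'data' listener puts `last`
// into flowing mode, so its readable buffer keeps being emptied.
function withEmptyDataListener(total: number): Promise<number> {
  const { counter, source, last } = build(total);
  return new Promise((resolve, reject) => {
    last.on('data', () => {});
    last.on('end', () => resolve(counter.processed));
    last.on('error', reject);
    source.pipe(last);
  });
}

// Variant 2: end the chain in an explicit sink. This also drains `last`, and
// pipeline() propagates errors and resolves once everything has finished.
async function withSink(total: number): Promise<number> {
  const { counter, source, last } = build(total);
  const sink = new Writable({
    objectMode: true,
    write(_chunk: unknown, _encoding: BufferEncoding, callback: (error?: Error | null) => void) {
      callback(); // discard the result; real code would handle it here
    },
  });
  await pipeline(source, last, sink);
  return counter.processed;
}
```

Both variants should process every chunk. The sink variant avoids relying on a listener whose only purpose is a side effect, but whether either one addresses the `ordered: false` behavior in this library is not shown by this sketch.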
I'm not really sure, though, whether the real problem comes from this library or from Node's stream implementation.
Also, I don't have a minimal reproduction to show the problem, but I can work on one if needed.
Thank you so much 😁