Skip to content

Commit a4aede4

Browse files
committed
Added concurrent-fiber throttle test.
1 parent 8860ff4 commit a4aede4

File tree

3 files changed

+36
-8
lines changed

3 files changed

+36
-8
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
"require": {
1212
"php": "^7.1",
1313
"amphp/amp": "^2",
14-
"async/throttle": "dev-join",
14+
"async/throttle": "^3",
1515
"psr/cache": "^1",
1616
"psr/container": "^1",
1717
"scriptfusion/retry": "^2.1",

src/Specification/AsyncImportSpecification.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
namespace ScriptFUSION\Porter\Specification;
55

6+
use ScriptFUSION\Async\Throttle\NullThrottle;
67
use ScriptFUSION\Async\Throttle\Throttle;
78
use ScriptFUSION\Porter\Connector\Recoverable\ExponentialAsyncDelayRecoverableExceptionHandler;
89
use ScriptFUSION\Porter\Connector\Recoverable\RecoverableExceptionHandler;
@@ -61,7 +62,7 @@ protected static function createDefaultRecoverableExceptionHandler(): Recoverabl
6162

6263
final public function getThrottle(): Throttle
6364
{
64-
return $this->throttle ?? $this->throttle = new Throttle();
65+
return $this->throttle ?? $this->throttle = new NullThrottle;
6566
}
6667

6768
final public function setThrottle(Throttle $throttle): self

test/Integration/PorterAsyncTest.php

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
use Amp\Iterator;
77
use Amp\Loop;
88
use Amp\Producer;
9+
use Amp\Promise;
10+
use ScriptFUSION\Async\Throttle\DualThrottle;
911
use ScriptFUSION\Porter\Collection\AsyncRecordCollection;
1012
use ScriptFUSION\Porter\Collection\CountableAsyncPorterRecords;
1113
use ScriptFUSION\Porter\Collection\CountableAsyncProviderRecords;
@@ -22,6 +24,7 @@
2224
use ScriptFUSION\Porter\Transform\AsyncTransformer;
2325
use ScriptFUSION\Porter\Transform\FilterTransformer;
2426
use ScriptFUSIONTest\MockFactory;
27+
use function Amp\call;
2528

2629
/**
2730
* @see Porter
@@ -200,20 +203,44 @@ public function testPorterAwareAsyncTransformer(): void
200203
}
201204

202205
/**
203-
* Tests that the throttle is invoked during fetch operations.
206+
* Tests that a working throttle implementation is invoked during fetch operations.
204207
*/
205208
public function testThrottle(): \Generator
206209
{
207-
$throttle = $this->specification->getThrottle();
208-
$throttle->setMaxPerSecond(1);
209-
210-
$start = microtime(true);
210+
$this->specification->setThrottle($throttle = new DualThrottle);
211+
$throttle->setMaxConcurrency(1);
211212

212213
$records = $this->porter->importAsync($this->specification);
213214
self::assertTrue($throttle->isThrottling());
214215

215216
yield $records->advance();
216217
self::assertFalse($throttle->isThrottling());
217-
self::assertGreaterThan(1, microtime(true) - $start);
218+
}
219+
220+
/**
221+
* Tests that a working throttle implementation can be called from multiple fibers queueing excess objects.
222+
*/
223+
public function testThrottleConcurrentFibers(): \Generator
224+
{
225+
$this->specification->setThrottle($throttle = new DualThrottle);
226+
$throttle->setMaxPerSecond(1);
227+
228+
$import = function (): Promise {
229+
return call(
230+
function (): \Generator {
231+
$records = $this->porter->importAsync($this->specification);
232+
233+
while (yield $records->advance()) {
234+
// Do nothing.
235+
}
236+
}
237+
);
238+
};
239+
240+
$start = microtime(true);
241+
242+
yield [$import(), $import(), $import()];
243+
244+
self::assertGreaterThan(3, microtime(true) - $start);
218245
}
219246
}

0 commit comments

Comments
 (0)