-
-
Notifications
You must be signed in to change notification settings - Fork 13
Open
Description
When the default bus is not a RoutableMessageBus as it is in Symfony, SyncTransportFactory causes a bug. The SyncTransport currently behaves so that it takes the middlewares from the default bus even though BusNameStamp sets a different bus.
I tried to submit a pull request, but there's a problem with the test because it requires a MessageBus instance. (https://github.com/contributte/messenger/blob/master/tests/Cases/DI/MessengerExtension.routing.phpt#L103)
Here is the fix for BusPass:
public function loadPassConfiguration(): void
{
$builder = $this->getContainerBuilder();
$config = $this->getConfig();
$defaultBus = null;
// Iterate all buses
foreach ($config->bus as $name => $busConfig) {
$middlewares = [];
$builder->addDefinition($this->prefix(sprintf('bus.%s.locator', $name)))
->setFactory(ContainerServiceHandlersLocator::class, [[]])
->setAutowired(false);
if ($busConfig->defaultMiddlewares === true) {
$middlewares[] = $builder->addDefinition($this->prefix(sprintf('bus.%s.middleware.busStampMiddleware', $name)))
->setFactory(AddBusNameStampMiddleware::class, [$name])
->setAutowired(false);
$middlewares[] = $builder->addDefinition($this->prefix(sprintf('bus.%s.middleware.dispatchAfterCurrentBusMiddleware', $name)))
->setFactory(DispatchAfterCurrentBusMiddleware::class)
->setAutowired(false);
$middlewares[] = $builder->addDefinition($this->prefix(sprintf('bus.%s.middleware.failedMessageProcessingMiddleware', $name)))
->setFactory(FailedMessageProcessingMiddleware::class)
->setAutowired(false);
}
// Custom middlewares
foreach ($busConfig->middlewares as $index => $middlewareConfig) {
$middlewares[] = $builder->addDefinition($this->prefix(sprintf('bus.%s.middleware.custom%sMiddleware', $name, $index)))
->setFactory($middlewareConfig)
->setAutowired(false);
}
if ($busConfig->defaultMiddlewares === true) {
$middlewares[] = $builder->addDefinition($this->prefix(sprintf('bus.%s.middleware.sendMiddleware', $name)))
->setFactory(SendMessageMiddleware::class, [$this->prefix('@routing.locator'), $this->prefix('@event.dispatcher'), $busConfig->allowNoSenders])
->setAutowired(false)
->addSetup('setLogger', [$this->prefix('@logger.logger')]);
$middlewares[] = $builder->addDefinition($this->prefix(sprintf('bus.%s.middleware.handleMiddleware', $name)))
->setFactory(HandleMessageMiddleware::class, [$this->prefix(sprintf('@bus.%s.locator', $name)), $busConfig->allowNoHandlers])
->setAutowired(false)
->addSetup('setLogger', [$this->prefix('@logger.logger')]);
}
// Register message bus
$builder->addDefinition($this->prefix(sprintf('bus.%s.bus', $name)))
->setFactory($busConfig->class ?? SymfonyMessageBus::class, [$middlewares])
->setAutowired(false)
->setTags([MessengerExtension::BUS_TAG => $name]);
// Register bus wrapper
if (isset($busConfig->wrapper) || isset(self::BUS_WRAPPERS[$name])) {
$builder->addDefinition($this->prefix(sprintf('bus.%s.wrapper', $name)))
->setFactory($busConfig->wrapper ?? self::BUS_WRAPPERS[$name], [$this->prefix(sprintf('@bus.%s.bus', $name))]);
}
// Register as fallback bus
$isDefaultBus = $busConfig->autowired ?? count($builder->findByTag(MessengerExtension::BUS_TAG)) === 0;
if ($isDefaultBus) {
$defaultBus = $this->prefix(sprintf('@bus.%s.bus', $name));
}
}
// Register bus container
$builder->addDefinition($this->prefix('bus.container'))
->setFactory(NetteContainer::class)
->setAutowired(false);
// Register routable bus
$builder->addDefinition($this->prefix('bus.routable'))
->setType(MessageBusInterface::class)
->setFactory(RoutableMessageBus::class, [$this->prefix('@bus.container'), $defaultBus])
->setAutowired(true);
// Register bus registry
$builder->addDefinition($this->prefix('busRegistry'))
->setFactory(BusRegistry::class, [$this->prefix('@bus.container')]);
}Metadata
Metadata
Assignees
Labels
No labels
Type
Projects
Status
Todo