vendor/symfony/messenger/DependencyInjection/MessengerPass.php line 133

Open in your IDE?
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\DependencyInjection;
  11. use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
  12. use Symfony\Component\DependencyInjection\ChildDefinition;
  13. use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
  14. use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
  15. use Symfony\Component\DependencyInjection\ContainerBuilder;
  16. use Symfony\Component\DependencyInjection\Definition;
  17. use Symfony\Component\DependencyInjection\Exception\OutOfBoundsException;
  18. use Symfony\Component\DependencyInjection\Exception\RuntimeException;
  19. use Symfony\Component\DependencyInjection\Reference;
  20. use Symfony\Component\Messenger\Handler\HandlerDescriptor;
  21. use Symfony\Component\Messenger\Handler\HandlersLocator;
  22. use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
  23. use Symfony\Component\Messenger\TraceableMessageBus;
  24. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  25. /**
  26. * @author Samuel Roze <samuel.roze@gmail.com>
  27. */
  28. class MessengerPass implements CompilerPassInterface
  29. {
  30. private $handlerTag;
  31. private $busTag;
  32. private $receiverTag;
  33. public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $receiverTag = 'messenger.receiver')
  34. {
  35. if (0 < \func_num_args()) {
  36. trigger_deprecation('symfony/messenger', '5.3', 'Configuring "%s" is deprecated.', __CLASS__);
  37. }
  38. $this->handlerTag = $handlerTag;
  39. $this->busTag = $busTag;
  40. $this->receiverTag = $receiverTag;
  41. }
  42. /**
  43. * {@inheritdoc}
  44. */
  45. public function process(ContainerBuilder $container)
  46. {
  47. $busIds = [];
  48. foreach ($container->findTaggedServiceIds($this->busTag) as $busId => $tags) {
  49. $busIds[] = $busId;
  50. if ($container->hasParameter($busMiddlewareParameter = $busId.'.middleware')) {
  51. $this->registerBusMiddleware($container, $busId, $container->getParameter($busMiddlewareParameter));
  52. $container->getParameterBag()->remove($busMiddlewareParameter);
  53. }
  54. if ($container->hasDefinition('data_collector.messenger')) {
  55. $this->registerBusToCollector($container, $busId);
  56. }
  57. }
  58. if ($container->hasDefinition('messenger.receiver_locator')) {
  59. $this->registerReceivers($container, $busIds);
  60. }
  61. $this->registerHandlers($container, $busIds);
  62. }
  63. private function registerHandlers(ContainerBuilder $container, array $busIds)
  64. {
  65. $definitions = [];
  66. $handlersByBusAndMessage = [];
  67. $handlerToOriginalServiceIdMapping = [];
  68. foreach ($container->findTaggedServiceIds($this->handlerTag, true) as $serviceId => $tags) {
  69. foreach ($tags as $tag) {
  70. if (isset($tag['bus']) && !\in_array($tag['bus'], $busIds, true)) {
  71. throw new RuntimeException(sprintf('Invalid handler service "%s": bus "%s" specified on the tag "%s" does not exist (known ones are: "%s").', $serviceId, $tag['bus'], $this->handlerTag, implode('", "', $busIds)));
  72. }
  73. $className = $this->getServiceClass($container, $serviceId);
  74. $r = $container->getReflectionClass($className);
  75. if (null === $r) {
  76. throw new RuntimeException(sprintf('Invalid service "%s": class "%s" does not exist.', $serviceId, $className));
  77. }
  78. if (isset($tag['handles'])) {
  79. $handles = isset($tag['method']) ? [$tag['handles'] => $tag['method']] : [$tag['handles']];
  80. } else {
  81. $handles = $this->guessHandledClasses($r, $serviceId);
  82. }
  83. $message = null;
  84. $handlerBuses = (array) ($tag['bus'] ?? $busIds);
  85. foreach ($handles as $message => $options) {
  86. $buses = $handlerBuses;
  87. if (\is_int($message)) {
  88. if (\is_string($options)) {
  89. $message = $options;
  90. $options = [];
  91. } else {
  92. throw new RuntimeException(sprintf('The handler configuration needs to return an array of messages or an associated array of message and configuration. Found value of type "%s" at position "%d" for service "%s".', get_debug_type($options), $message, $serviceId));
  93. }
  94. }
  95. if (\is_string($options)) {
  96. $options = ['method' => $options];
  97. }
  98. $options += array_filter($tag);
  99. unset($options['handles']);
  100. $priority = $options['priority'] ?? 0;
  101. $method = $options['method'] ?? '__invoke';
  102. if (isset($options['bus'])) {
  103. if (!\in_array($options['bus'], $busIds)) {
  104. $messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));
  105. throw new RuntimeException(sprintf('Invalid configuration '.$messageLocation.' for message "%s": bus "%s" does not exist.', $message, $options['bus']));
  106. }
  107. $buses = [$options['bus']];
  108. }
  109. if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
  110. $messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));
  111. throw new RuntimeException(sprintf('Invalid handler service "%s": class or interface "%s" '.$messageLocation.' not found.', $serviceId, $message));
  112. }
  113. if (!$r->hasMethod($method)) {
  114. throw new RuntimeException(sprintf('Invalid handler service "%s": method "%s::%s()" does not exist.', $serviceId, $r->getName(), $method));
  115. }
  116. if ('__invoke' !== $method) {
  117. $wrapperDefinition = (new Definition('Closure'))->addArgument([new Reference($serviceId), $method])->setFactory('Closure::fromCallable');
  118. $definitions[$definitionId = '.messenger.method_on_object_wrapper.'.ContainerBuilder::hash($message.':'.$priority.':'.$serviceId.':'.$method)] = $wrapperDefinition;
  119. } else {
  120. $definitionId = $serviceId;
  121. }
  122. $handlerToOriginalServiceIdMapping[$definitionId] = $serviceId;
  123. foreach ($buses as $handlerBus) {
  124. $handlersByBusAndMessage[$handlerBus][$message][$priority][] = [$definitionId, $options];
  125. }
  126. }
  127. if (null === $message) {
  128. throw new RuntimeException(sprintf('Invalid handler service "%s": method "%s::getHandledMessages()" must return one or more messages.', $serviceId, $r->getName()));
  129. }
  130. }
  131. }
  132. foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
  133. foreach ($handlersByMessage as $message => $handlersByPriority) {
  134. krsort($handlersByPriority);
  135. $handlersByBusAndMessage[$bus][$message] = array_merge(...$handlersByPriority);
  136. }
  137. }
  138. $handlersLocatorMappingByBus = [];
  139. foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
  140. foreach ($handlersByMessage as $message => $handlers) {
  141. $handlerDescriptors = [];
  142. foreach ($handlers as $handler) {
  143. $definitions[$definitionId = '.messenger.handler_descriptor.'.ContainerBuilder::hash($bus.':'.$message.':'.$handler[0])] = (new Definition(HandlerDescriptor::class))->setArguments([new Reference($handler[0]), $handler[1]]);
  144. $handlerDescriptors[] = new Reference($definitionId);
  145. }
  146. $handlersLocatorMappingByBus[$bus][$message] = new IteratorArgument($handlerDescriptors);
  147. }
  148. }
  149. $container->addDefinitions($definitions);
  150. foreach ($busIds as $bus) {
  151. $container->register($locatorId = $bus.'.messenger.handlers_locator', HandlersLocator::class)
  152. ->setArgument(0, $handlersLocatorMappingByBus[$bus] ?? [])
  153. ;
  154. if ($container->has($handleMessageId = $bus.'.middleware.handle_message')) {
  155. $container->getDefinition($handleMessageId)
  156. ->replaceArgument(0, new Reference($locatorId))
  157. ;
  158. }
  159. }
  160. if ($container->hasDefinition('console.command.messenger_debug')) {
  161. $debugCommandMapping = $handlersByBusAndMessage;
  162. foreach ($busIds as $bus) {
  163. if (!isset($debugCommandMapping[$bus])) {
  164. $debugCommandMapping[$bus] = [];
  165. }
  166. foreach ($debugCommandMapping[$bus] as $message => $handlers) {
  167. foreach ($handlers as $key => $handler) {
  168. $debugCommandMapping[$bus][$message][$key][0] = $handlerToOriginalServiceIdMapping[$handler[0]];
  169. }
  170. }
  171. }
  172. $container->getDefinition('console.command.messenger_debug')->replaceArgument(0, $debugCommandMapping);
  173. }
  174. }
  175. private function guessHandledClasses(\ReflectionClass $handlerClass, string $serviceId): iterable
  176. {
  177. if ($handlerClass->implementsInterface(MessageSubscriberInterface::class)) {
  178. return $handlerClass->getName()::getHandledMessages();
  179. }
  180. try {
  181. $method = $handlerClass->getMethod('__invoke');
  182. } catch (\ReflectionException $e) {
  183. throw new RuntimeException(sprintf('Invalid handler service "%s": class "%s" must have an "__invoke()" method.', $serviceId, $handlerClass->getName()));
  184. }
  185. if (0 === $method->getNumberOfRequiredParameters()) {
  186. throw new RuntimeException(sprintf('Invalid handler service "%s": method "%s::__invoke()" requires at least one argument, first one being the message it handles.', $serviceId, $handlerClass->getName()));
  187. }
  188. $parameters = $method->getParameters();
  189. if (!$type = $parameters[0]->getType()) {
  190. throw new RuntimeException(sprintf('Invalid handler service "%s": argument "$%s" of method "%s::__invoke()" must have a type-hint corresponding to the message class it handles.', $serviceId, $parameters[0]->getName(), $handlerClass->getName()));
  191. }
  192. if ($type instanceof \ReflectionUnionType) {
  193. $types = [];
  194. $invalidTypes = [];
  195. foreach ($type->getTypes() as $type) {
  196. if (!$type->isBuiltin()) {
  197. $types[] = (string) $type;
  198. } else {
  199. $invalidTypes[] = (string) $type;
  200. }
  201. }
  202. if ($types) {
  203. return $types;
  204. }
  205. throw new RuntimeException(sprintf('Invalid handler service "%s": type-hint of argument "$%s" in method "%s::__invoke()" must be a class , "%s" given.', $serviceId, $parameters[0]->getName(), $handlerClass->getName(), implode('|', $invalidTypes)));
  206. }
  207. if ($type->isBuiltin()) {
  208. throw new RuntimeException(sprintf('Invalid handler service "%s": type-hint of argument "$%s" in method "%s::__invoke()" must be a class , "%s" given.', $serviceId, $parameters[0]->getName(), $handlerClass->getName(), $type instanceof \ReflectionNamedType ? $type->getName() : (string) $type));
  209. }
  210. return [$type->getName()];
  211. }
  212. private function registerReceivers(ContainerBuilder $container, array $busIds)
  213. {
  214. $receiverMapping = [];
  215. $failureTransportsMap = [];
  216. if ($container->hasDefinition('console.command.messenger_failed_messages_retry')) {
  217. $commandDefinition = $container->getDefinition('console.command.messenger_failed_messages_retry');
  218. $globalReceiverName = $commandDefinition->getArgument(0);
  219. if (null !== $globalReceiverName) {
  220. if ($container->hasAlias('messenger.failure_transports.default')) {
  221. $failureTransportsMap[$globalReceiverName] = new Reference('messenger.failure_transports.default');
  222. } else {
  223. $failureTransportsMap[$globalReceiverName] = new Reference('messenger.transport.'.$globalReceiverName);
  224. }
  225. }
  226. }
  227. foreach ($container->findTaggedServiceIds($this->receiverTag) as $id => $tags) {
  228. $receiverClass = $this->getServiceClass($container, $id);
  229. if (!is_subclass_of($receiverClass, ReceiverInterface::class)) {
  230. throw new RuntimeException(sprintf('Invalid receiver "%s": class "%s" must implement interface "%s".', $id, $receiverClass, ReceiverInterface::class));
  231. }
  232. $receiverMapping[$id] = new Reference($id);
  233. foreach ($tags as $tag) {
  234. if (isset($tag['alias'])) {
  235. $receiverMapping[$tag['alias']] = $receiverMapping[$id];
  236. if ($tag['is_failure_transport'] ?? false) {
  237. $failureTransportsMap[$tag['alias']] = $receiverMapping[$id];
  238. }
  239. }
  240. }
  241. }
  242. $receiverNames = [];
  243. foreach ($receiverMapping as $name => $reference) {
  244. $receiverNames[(string) $reference] = $name;
  245. }
  246. $buses = [];
  247. foreach ($busIds as $busId) {
  248. $buses[$busId] = new Reference($busId);
  249. }
  250. if ($hasRoutableMessageBus = $container->hasDefinition('messenger.routable_message_bus')) {
  251. $container->getDefinition('messenger.routable_message_bus')
  252. ->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses));
  253. }
  254. if ($container->hasDefinition('console.command.messenger_consume_messages')) {
  255. $consumeCommandDefinition = $container->getDefinition('console.command.messenger_consume_messages');
  256. if ($hasRoutableMessageBus) {
  257. $consumeCommandDefinition->replaceArgument(0, new Reference('messenger.routable_message_bus'));
  258. }
  259. $consumeCommandDefinition->replaceArgument(4, array_values($receiverNames));
  260. try {
  261. $consumeCommandDefinition->replaceArgument(6, $busIds);
  262. } catch (OutOfBoundsException $e) {
  263. // ignore to preserve compatibility with symfony/framework-bundle < 5.4
  264. }
  265. }
  266. if ($container->hasDefinition('console.command.messenger_setup_transports')) {
  267. $container->getDefinition('console.command.messenger_setup_transports')
  268. ->replaceArgument(1, array_values($receiverNames));
  269. }
  270. $container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
  271. $failureTransportsLocator = ServiceLocatorTagPass::register($container, $failureTransportsMap);
  272. $failedCommandIds = [
  273. 'console.command.messenger_failed_messages_retry',
  274. 'console.command.messenger_failed_messages_show',
  275. 'console.command.messenger_failed_messages_remove',
  276. ];
  277. foreach ($failedCommandIds as $failedCommandId) {
  278. if ($container->hasDefinition($failedCommandId)) {
  279. $definition = $container->getDefinition($failedCommandId);
  280. $definition->replaceArgument(1, $failureTransportsLocator);
  281. }
  282. }
  283. }
  284. private function registerBusToCollector(ContainerBuilder $container, string $busId)
  285. {
  286. $container->setDefinition(
  287. $tracedBusId = 'debug.traced.'.$busId,
  288. (new Definition(TraceableMessageBus::class, [new Reference($tracedBusId.'.inner')]))->setDecoratedService($busId)
  289. );
  290. $container->getDefinition('data_collector.messenger')->addMethodCall('registerBus', [$busId, new Reference($tracedBusId)]);
  291. }
  292. private function registerBusMiddleware(ContainerBuilder $container, string $busId, array $middlewareCollection)
  293. {
  294. $middlewareReferences = [];
  295. foreach ($middlewareCollection as $middlewareItem) {
  296. $id = $middlewareItem['id'];
  297. $arguments = $middlewareItem['arguments'] ?? [];
  298. if (!$container->has($messengerMiddlewareId = 'messenger.middleware.'.$id)) {
  299. $messengerMiddlewareId = $id;
  300. }
  301. if (!$container->has($messengerMiddlewareId)) {
  302. throw new RuntimeException(sprintf('Invalid middleware: service "%s" not found.', $id));
  303. }
  304. if ($container->findDefinition($messengerMiddlewareId)->isAbstract()) {
  305. $childDefinition = new ChildDefinition($messengerMiddlewareId);
  306. $childDefinition->setArguments($arguments);
  307. if (isset($middlewareReferences[$messengerMiddlewareId = $busId.'.middleware.'.$id])) {
  308. $messengerMiddlewareId .= '.'.ContainerBuilder::hash($arguments);
  309. }
  310. $container->setDefinition($messengerMiddlewareId, $childDefinition);
  311. } elseif ($arguments) {
  312. throw new RuntimeException(sprintf('Invalid middleware factory "%s": a middleware factory must be an abstract definition.', $id));
  313. }
  314. $middlewareReferences[$messengerMiddlewareId] = new Reference($messengerMiddlewareId);
  315. }
  316. $container->getDefinition($busId)->replaceArgument(0, new IteratorArgument(array_values($middlewareReferences)));
  317. }
  318. private function getServiceClass(ContainerBuilder $container, string $serviceId): string
  319. {
  320. while (true) {
  321. $definition = $container->findDefinition($serviceId);
  322. if (!$definition->getClass() && $definition instanceof ChildDefinition) {
  323. $serviceId = $definition->getParent();
  324. continue;
  325. }
  326. return $definition->getClass();
  327. }
  328. }
  329. }