leberknecht / amqp-rpc-transporter-bundle
This bundle adds RPC functionality over the AMQP protocol to the Symfony Messenger component
Type: symfony-bundle
Requires
- php: >=7.3
- ext-amqp: *
- symfony/framework-bundle: >=4.0
- symfony/messenger: >=4.3-dev
This package is auto-updated.
Last update: 2025-03-23 13:13:41 UTC
README
A workaround bundle to bring RPC functionality to the AMQP transporter of the Symfony messenger component
What it does
This bundle introduces the amqp-rpc transporter, which is identical to the normal amqp transporter except that it uses the reply_to and correlation_id headers. On receive, the content of the HandledStamp is published to the queue identified in the reply_to field. On send, a random queue name is generated; after publishing the original message, the transporter waits for a response on that queue and then adds a ResponseStamp with the result to the envelope.
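The round trip described above can be sketched without a broker. The following is a minimal in-memory illustration, not the bundle's code: InMemoryBroker, serveOne and rpcCall are hypothetical names, and the worker is pumped inline, whereas in practice it runs in a separate consumer process.

```php
<?php
declare(strict_types=1);

// In-memory stand-in for a broker (hypothetical; the bundle uses ext-amqp).
final class InMemoryBroker
{
    /** @var array<string, array> queue name => list of messages */
    private $queues = [];

    public function publish(string $queue, $body, ?string $replyTo = null, ?string $correlationId = null): void
    {
        $this->queues[$queue][] = ['body' => $body, 'reply_to' => $replyTo, 'correlation_id' => $correlationId];
    }

    /** Returns the oldest message on the queue, or null if it is empty. */
    public function get(string $queue): ?array
    {
        if (empty($this->queues[$queue])) {
            return null;
        }
        return array_shift($this->queues[$queue]);
    }
}

// Receiver side: run the handler and publish its result to the reply_to queue,
// echoing the request's correlation_id.
function serveOne(InMemoryBroker $broker, string $queue, callable $handler): void
{
    $request = $broker->get($queue);
    if ($request === null || $request['reply_to'] === null) {
        return;
    }
    $result = $handler($request['body']);
    $broker->publish($request['reply_to'], $result, null, $request['correlation_id']);
}

// Sender side: random reply queue, publish, then wait for the matching reply.
function rpcCall(InMemoryBroker $broker, string $queue, $body, callable $pumpWorker)
{
    $replyQueue = 'rpc.reply.' . bin2hex(random_bytes(8));
    $correlationId = bin2hex(random_bytes(8));
    $broker->publish($queue, $body, $replyQueue, $correlationId);

    $pumpWorker(); // stands in for the remote worker doing its job

    $reply = $broker->get($replyQueue);
    if ($reply === null || $reply['correlation_id'] !== $correlationId) {
        throw new RuntimeException('No matching reply received');
    }
    return $reply['body'];
}

$broker = new InMemoryBroker();
$result = rpcCall($broker, 'rpc_calls', 20, function () use ($broker) {
    serveOne($broker, 'rpc_calls', function ($n) {
        return $n + 22;
    });
});
echo $result, PHP_EOL; // prints 42
```

The correlation_id check matters less with one random queue per call, but it guards against stray messages and becomes essential as soon as a reply queue is reused.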
Installation
composer require leberknecht/amqp-rpc-transporter-bundle
Usage
    # messenger.yaml
    framework:
        messenger:
            transports:
                rpc_calls:
                    dsn: 'amqp-rpc://user:password@rabbit-mq-host/%2f/rpc_calls'
            routing:
                # Route your messages to the transports
                'App\Message\RpcCallMessage': rpc_calls
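To make the DSN easier to read, here is a small sketch that splits it into its parts with plain PHP. It assumes the bundle interprets the path the same way the stock amqp transport does (first segment is the URL-encoded vhost, second is the exchange name); that is an assumption based on the README's statement that the two transporters are otherwise identical.

```php
<?php
declare(strict_types=1);

$dsn = 'amqp-rpc://user:password@rabbit-mq-host/%2f/rpc_calls';

// parse_url() does not decode the path, so %2f is still encoded here.
$parts = parse_url($dsn);
$pathParts = explode('/', trim($parts['path'], '/'));

$vhost = urldecode($pathParts[0]); // "%2f" is the encoded default vhost "/"
$exchange = $pathParts[1];         // assumed to be the exchange name

printf("scheme=%s host=%s vhost=%s exchange=%s\n", $parts['scheme'], $parts['host'], $vhost, $exchange);
```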
Then use it like this:
    // send
    $rpcCallMessage = new RpcCallMessage();
    $envelope = $this->messageBus->dispatch($rpcCallMessage);
    /** @var ResponseStamp $response */
    $response = $envelope->last(ResponseStamp::class);
    $result = $response->getResult();
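Since Envelope::last() returns null when no stamp of the requested class is present, calling getResult() directly can fail if no response was attached. A defensive variant might look like the sketch below. FakeEnvelope and FakeResponseStamp are hypothetical stand-ins so the example runs without Symfony installed, and rpcResultOrFail is an illustrative helper, not part of the bundle.

```php
<?php
declare(strict_types=1);

// Stand-in for the bundle's ResponseStamp (hypothetical, for illustration only).
final class FakeResponseStamp
{
    private $result;

    public function __construct($result)
    {
        $this->result = $result;
    }

    public function getResult()
    {
        return $this->result;
    }
}

// Stand-in for Symfony's Envelope, mirroring only last().
final class FakeEnvelope
{
    private $stamps;

    public function __construct(array $stamps = [])
    {
        $this->stamps = $stamps;
    }

    /** Returns the last stamp of the given class, or null if there is none. */
    public function last(string $class)
    {
        $matches = array_filter($this->stamps, function ($stamp) use ($class) {
            return $stamp instanceof $class;
        });
        return $matches ? end($matches) : null;
    }
}

// Illustrative helper: fail loudly instead of calling getResult() on null.
function rpcResultOrFail($envelope, string $stampClass)
{
    $stamp = $envelope->last($stampClass);
    if ($stamp === null) {
        throw new RuntimeException('No RPC response stamp on the envelope');
    }
    return $stamp->getResult();
}

$ok = rpcResultOrFail(new FakeEnvelope([new FakeResponseStamp(42)]), FakeResponseStamp::class);

try {
    rpcResultOrFail(new FakeEnvelope(), FakeResponseStamp::class);
    $missingDetected = false;
} catch (RuntimeException $e) {
    $missingDetected = true;
}
```

In a real application the helper would receive the envelope returned by dispatch() and the bundle's ResponseStamp class name.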
To set the result from the handler, just return something:
    final public function __invoke(RpcCallMessage $message): int
    {
        [...]
        return 42;
    }
Remarks
This is a work in progress, a first-shot workaround. It would be much more elegant to override the messenger.transport.amqp.factory service and add rpc: true and rpc_queue_name options to the messenger config, so that the existing transporter is extended instead of a new one being introduced. Also note: in its current state, the bundle always generates an exclusive queue with a random name for the response. This is suboptimal for heavily loaded queues; see https://www.rabbitmq.com/tutorials/tutorial-six-python.html
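The alternative described in the linked RabbitMQ tutorial is one callback queue per client, with replies matched to requests by correlation_id. The sketch below illustrates that matching logic in memory only; SharedReplyQueueClient is a hypothetical class, not something this bundle provides, and the replies are simulated rather than consumed from a broker.

```php
<?php
declare(strict_types=1);

final class SharedReplyQueueClient
{
    /** @var string one reply queue name for the lifetime of the client */
    public $replyQueue;

    /** @var array<string, bool> correlation IDs still waiting for a reply */
    private $pending = [];

    public function __construct()
    {
        $this->replyQueue = 'rpc.reply.' . bin2hex(random_bytes(8));
    }

    /** Builds a request message; publishing it is left to the caller. */
    public function request($body): array
    {
        $correlationId = bin2hex(random_bytes(8));
        $this->pending[$correlationId] = true;

        return ['body' => $body, 'reply_to' => $this->replyQueue, 'correlation_id' => $correlationId];
    }

    /** Matches replies to pending requests; unknown correlation IDs are ignored. */
    public function collect(array $replies): array
    {
        $results = [];
        foreach ($replies as $reply) {
            $id = $reply['correlation_id'];
            if (isset($this->pending[$id])) {
                $results[$id] = $reply['body'];
                unset($this->pending[$id]);
            }
        }
        return $results;
    }
}

$client = new SharedReplyQueueClient();
$a = $client->request(1);
$b = $client->request(2);

// Simulated worker output: replies arrive out of order, plus one stray message.
$replies = [
    ['body' => $b['body'] * 10, 'correlation_id' => $b['correlation_id']],
    ['body' => 'stale', 'correlation_id' => 'unknown-id'],
    ['body' => $a['body'] * 10, 'correlation_id' => $a['correlation_id']],
];
$results = $client->collect($replies);
```

Both requests share the same reply_to value, so the broker only has to maintain one reply queue per client rather than one per call.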