micronative / sqs
AWS SQS Transport (forked from and improving on enqueue/sqs)
2.0.1
2022-04-24 09:52 UTC
Requires
- php: ^7.1.3
- aws/aws-sdk-php: ~3.26
- enqueue/dsn: ^0.9
- queue-interop/queue-interop: ^0.7|^0.8
Requires (Dev)
- enqueue/test: 0.9.x-dev
- phpunit/phpunit: ~5.4.0
- queue-interop/queue-spec: ^0.6
- dev-master
- 2.0.1
- 2.0.0
- 1.1.2
- 1.1.1
- 1.1.0
- 1.0.3
- 1.0.2
- 1.0.1
- 1.0.0
- 0.9.x-dev
- 0.9.12
- 0.9.11
- 0.9.8
- 0.9.7
- 0.9.2
- 0.9.1
- 0.9.0
- 0.8.x-dev
- 0.8.42
- 0.8.41
- 0.8.40
- 0.8.39
- 0.8.38
- 0.8.37
- 0.8.36
- 0.8.34
- 0.8.32
- 0.8.24
- 0.8.23
- 0.8.21
- 0.8.18
- 0.8.16
- 0.8.12
- 0.8.11
- 0.8.9
- 0.8.3
- 0.8.0
- 0.7.15
- 0.7.8
- 0.7.6
- 0.7.5
- 0.7.4
- 0.7.0
- 0.6.2
- 0.6.1
- 0.6.0
- 0.5.2
- 0.5.0
- 0.4.9
- 0.4.5
- 0.4.4
- 0.4.3
- 0.4.2
- 0.4.1
- 0.4.0
- 0.3.8
- 0.3.7
- 0.3.6
- dev-amqp-add-basic-consume-support
This package is auto-updated.
Last update: 2024-11-12 11:42:17 UTC
README
Configuration
"require": { "micronative/sqs": "^2.0.0" }, "repositories": [ { "type": "vcs", "url": "https://github.com/micronative/sqs" } ],
Run
composer require micronative/sqs:2.0.0
Description
This project is a fork of enqueue/sqs that makes the following improvements:
- Move all classes to src
- Rename Tests to tests
- Move examples to tests
- Change namespace to Micronative\Sqs
SqsProducer->send():
/**
 * Sends a message to the SQS queue identified by the destination.
 *
 * Builds the argument array for the SQS client's sendMessage() call from the
 * message body, the optional delay, the FIFO deduplication/group ids, the
 * headers and the properties, then stores the returned MessageId on the message.
 *
 * @param Destination $destination must be an SqsDestination
 * @param Message     $message     must be an SqsMessage with a non-empty body
 *
 * @throws InvalidMessageException when the message body is null or an empty string
 * @throws \RuntimeException       when the SQS result contains no MessageId
 */
public function send(Destination $destination, Message $message): void
{
    // Type guards; presumably these throw when the instance check fails.
    InvalidDestinationException::assertDestinationInstanceOf($destination, SqsDestination::class);
    InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class);

    $body = $message->getBody();
    // Explicit check instead of empty(): empty('0') is true, which would
    // wrongly reject the valid non-empty body "0".
    if (null === $body || '' === $body) {
        throw new InvalidMessageException('The message body must be a non-empty string.');
    }

    $arguments = [
        '@region' => $destination->getRegion(),
        'MessageBody' => $body,
        'QueueUrl' => $this->context->getQueueUrl($destination),
    ];

    if (null !== $this->deliveryDelay) {
        // The delay is divided by 1000 to obtain seconds. The cast must wrap
        // the whole division: "(int) $x / 1000" casts first and can yield a
        // float (e.g. 1.5), while DelaySeconds has to be an integer.
        $arguments['DelaySeconds'] = (int) ($this->deliveryDelay / 1000);
    }

    // A per-message delay takes precedence over the producer-level delay.
    if ($message->getDelaySeconds()) {
        $arguments['DelaySeconds'] = $message->getDelaySeconds();
    }

    if ($message->getMessageDeduplicationId()) {
        $arguments['MessageDeduplicationId'] = $message->getMessageDeduplicationId();
    }

    if ($message->getMessageGroupId()) {
        $arguments['MessageGroupId'] = $message->getMessageGroupId();
    }

    if ($message->getHeaders()) {
        // Headers travel in a single "Headers" attribute, JSON-encoded as a
        // one-element list whose only item is the header map.
        $arguments['MessageAttributes']['Headers'] = [
            'DataType' => 'String',
            'StringValue' => json_encode([$message->getHeaders()]),
        ];
    }

    if ($message->getProperties()) {
        // Each property becomes its own String message attribute.
        foreach ($message->getProperties() as $name => $value) {
            $arguments['MessageAttributes'][$name] = ['DataType' => 'String', 'StringValue' => $value];
        }
    }

    $result = $this->context->getSqsClient()->sendMessage($arguments);

    if (!$result->hasKey('MessageId')) {
        throw new \RuntimeException('Message was not sent');
    }

    $message->setMessageId($result['MessageId']);
}
SqsConsumer->convertMessage():
/**
 * Converts one raw SQS message array into an SqsMessage.
 *
 * Copies the body, receipt handle, system attributes (including the FIFO
 * deduplication/group ids and the redelivery flag derived from
 * ApproximateReceiveCount), the message attributes and the message id.
 *
 * The "Headers" message attribute is JSON-decoded into the message headers;
 * every other message attribute is stored as a property.
 *
 * @param array $sqsMessage raw message; 'Body' and 'ReceiptHandle' are read unconditionally
 *
 * @return SqsMessage
 */
protected function convertMessage(array $sqsMessage): SqsMessage
{
    $message = $this->context->createMessage();

    $message->setBody($sqsMessage['Body']);
    $message->setReceiptHandle($sqsMessage['ReceiptHandle']);

    if (isset($sqsMessage['Attributes'])) {
        $attributes = $sqsMessage['Attributes'];
        $message->setAttributes($attributes);

        if (isset($attributes['MessageDeduplicationId'])) {
            $message->setMessageDeduplicationId($attributes['MessageDeduplicationId']);
        }

        if (isset($attributes['MessageGroupId'])) {
            $message->setMessageGroupId($attributes['MessageGroupId']);
        }

        // A receive count above one means the message was delivered before.
        if (isset($attributes['ApproximateReceiveCount'])) {
            $message->setRedelivered(((int) $attributes['ApproximateReceiveCount']) > 1);
        }
    }

    if (isset($sqsMessage['MessageAttributes'])) {
        foreach ($sqsMessage['MessageAttributes'] as $name => $attribute) {
            // Strict comparison on the string form: numeric attribute names
            // become integer array keys, and on PHP 7 "0 == 'Headers'" is true.
            if ('Headers' !== (string) $name) {
                $message->setProperty((string) $name, $attribute['StringValue']);

                continue;
            }

            $headers = json_decode($attribute['StringValue'], true);

            // The producer encodes headers as a one-element list wrapping the
            // header map; unwrap it so headers are not nested under key 0.
            if (is_array($headers) && [0] === array_keys($headers) && is_array($headers[0])) {
                $headers = $headers[0];
            }

            // json_decode() yields null (or a scalar) for malformed payloads;
            // only an array is a usable header map.
            if (is_array($headers)) {
                $message->setHeaders($headers);
            }
        }
    }

    if (isset($sqsMessage['MessageId'])) {
        $message->setMessageId($sqsMessage['MessageId']);
    }

    return $message;
}