ssigwart / sqs-queue-processor
Library to process SQS queues.
v1.0.0
2024-01-01 18:22 UTC
Requires
- php: >=8.1.0
- aws/aws-sdk-php: ^3.0
- psr/log: ^3.0
- ssigwart/aws-high-availability-sqs: ^1.0
Requires (Dev)
- phpunit/phpunit: ^10.4
README
This library provides an SQS queue processor with the following features:
- Duplicate message handling.
- Delayed retry capability.
- Error handling options.
Installation
composer require ssigwart/sqs-queue-processor
Usage
Use the following to set up and run a queue processor.
$sqsQueueProcessor = new SqsQueueProcessor(/* Fill in parameters */);
$sqsQueueProcessor->processMessages();
Parameter: SqsQueueProcessorConfiguration
- This parameter specifies the maximum number of messages to return on a single SQS ReceiveMessage call (default 10), the visibility timeout to use for received messages (default 300), and the number of seconds to wait for a message (default 20).
- You can also control what gets logged when processing messages.
Parameter: SqsQueueProcessorTimingInterface
- This parameter will tell the queue processor when to stop processing messages using shouldStopProcessingMessages.
- To loop forever, have it always return false.
- You can also check the time and stop it after a given time.
- You can also have the same object implement this and SqsQueueProcessorMessageProviderInterface and have it stop after it has received an empty message list.
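As an illustration of the time-based option above, here is a minimal sketch of a timer that stops after a fixed run time. The class name TimeLimitedProcessingTimer and its constructor are hypothetical, and the no-argument, bool-returning signature of shouldStopProcessingMessages is an assumption. In real use the class would implement SqsQueueProcessorTimingInterface, so check the interface for the exact signature.

```php
<?php
/**
 * Hypothetical timing helper (not part of the library).
 * In real use this would implement SqsQueueProcessorTimingInterface;
 * the method signature below is assumed, not confirmed.
 */
final class TimeLimitedProcessingTimer
{
    /** Unix timestamp (with microseconds) after which processing should stop. */
    private float $deadline;

    /**
     * @param float      $maxRunSeconds How long to keep processing.
     * @param float|null $startTime     Start timestamp; defaults to now.
     */
    public function __construct(float $maxRunSeconds, ?float $startTime = null)
    {
        $this->deadline = ($startTime ?? microtime(true)) + $maxRunSeconds;
    }

    /** Returns true once the deadline has been reached. */
    public function shouldStopProcessingMessages(): bool
    {
        return microtime(true) >= $this->deadline;
    }
}

// Stop after roughly 55 seconds, e.g. for a worker started every minute.
$timer = new TimeLimitedProcessingTimer(55.0);
var_dump($timer->shouldStopProcessingMessages());
```

A fixed deadline like this suits workers launched on a schedule. A long-running daemon would simply return false every time.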
Parameter: SqsQueueProcessorCleanupInterface
- This parameter includes cleanup functions such as cleanUpAfterExceptionProcessingMessage. If using a transactional database, you may want to call ROLLBACK on the database.
Parameter: SqsQueueProcessorMessageProviderInterface
- This parameter is used to get messages, delete messages, and update the visibility timeout on messages.
- It is recommended to use AwsHighAvailabilitySqsMessageProvider, which implements this interface.
Parameter: SqsQueueProcessorMessageStatusInterface
- This parameter provides functions to determine if a message is being processed or has completed processing.
- Suggested ways to implement this interface are with Memcached, Redis, or a database.
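To make the idea of tracking message status concrete, here is a minimal in-memory sketch. The class InMemoryMessageStatusStore and all of its method names are hypothetical and are not taken from SqsQueueProcessorMessageStatusInterface. It only shows the kind of state such an implementation needs to keep.

```php
<?php
/**
 * Hypothetical in-memory status store (illustration only).
 * Method names are invented for this sketch and do not mirror
 * SqsQueueProcessorMessageStatusInterface.
 */
final class InMemoryMessageStatusStore
{
    /** @var array<string, string> Message ID => 'in_progress' or 'completed'. */
    private array $statuses = [];

    /** Claims a message. Returns false if it is already in progress or completed. */
    public function tryMarkInProgress(string $messageId): bool
    {
        if (isset($this->statuses[$messageId])) {
            return false;
        }
        $this->statuses[$messageId] = 'in_progress';
        return true;
    }

    /** Records that processing finished successfully. */
    public function markCompleted(string $messageId): void
    {
        $this->statuses[$messageId] = 'completed';
    }

    /** Releases an in-progress claim so the message can be retried later. */
    public function clearInProgress(string $messageId): void
    {
        if (($this->statuses[$messageId] ?? null) === 'in_progress') {
            unset($this->statuses[$messageId]);
        }
    }

    public function isInProgress(string $messageId): bool
    {
        return ($this->statuses[$messageId] ?? null) === 'in_progress';
    }

    public function isCompleted(string $messageId): bool
    {
        return ($this->statuses[$messageId] ?? null) === 'completed';
    }
}

$store = new InMemoryMessageStatusStore();
$store->tryMarkInProgress('msg-1'); // First delivery claims the message.
$store->tryMarkInProgress('msg-1'); // A duplicate delivery is rejected.
$store->markCompleted('msg-1');
```

An in-memory array only works within one process. With Memcached, Redis, or a database, the claim step would need an atomic "set if absent" operation so that two workers cannot claim the same message.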
Parameter: SqsQueueProcessorSingleMessageProcessorInterface
- This parameter is the core interface for processing the message.
- Your work to process the message should be in here.
- It should return a SqsQueueProcessorSingleMessageProcessorResult:
  - For a message that has completed processing successfully, return SqsQueueProcessorSingleMessageProcessorResult::newSuccessResult().
  - For a message that you want to delay processing on, return SqsQueueProcessorSingleMessageProcessorResult::newDelayedProcessingResult(...) with a new visibility timeout.
  - For a message that had an error while processing, return SqsQueueProcessorSingleMessageProcessorResult::newFailureResult(...), possibly with a new visibility timeout.
- It is recommended to catch any Throwable, do cleanup, and return a failure, but uncaught exceptions will be treated as a failure.
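The catch, clean up, and return pattern recommended above can be sketched as follows. The function processWithCleanup is hypothetical, and the strings 'success' and 'failure' are placeholders. A real processor would return SqsQueueProcessorSingleMessageProcessorResult objects at those points, but the factory arguments are not shown here, so they are left out rather than guessed.

```php
<?php
/**
 * Hypothetical helper: runs $work and reports the outcome as a placeholder string.
 * In a real processor the two return statements would return
 * SqsQueueProcessorSingleMessageProcessorResult objects instead.
 */
function processWithCleanup(callable $work, callable $cleanup): string
{
    try {
        $work();
        return 'success';
    } catch (\Throwable $e) {
        // \Throwable covers both \Exception and \Error (e.g. \TypeError).
        $cleanup($e);
        return 'failure';
    }
}

$rolledBack = false;
$outcome = processWithCleanup(
    function (): void {
        throw new \RuntimeException('Database write failed');
    },
    function (\Throwable $e) use (&$rolledBack): void {
        // A real cleanup might issue ROLLBACK on the database here.
        $rolledBack = true;
    }
);
```

Catching \Throwable rather than \Exception means engine errors such as \TypeError also trigger the cleanup path.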
Parameter: SqsQueueProcessorErrorReportingInterface
- This parameter provides error handling capabilities.
- Below are the suggested ways to handle different error types:
  - MSG_MARKED_AS_COMPLETED: Store rawSqsMessage in S3 and send an alert or create a task to review the message to be sure it was successfully processed. The S3 data can be used to requeue the message if needed.
  - MSG_MARKED_AS_IN_PROGRESS: Send an alert or create a task to review the message if a message has been marked as in progress for a while. It is possible that the in-progress flag failed to be cleared when message processing failed.
  - EXCEPTION_THROWN_HANDLING_MESSAGE: Send an alert or create a task to investigate the error.
  - FAILED_TO_MARK_AS_PROCESSED: Send an alert. This often won't be much of an issue. It becomes an issue if the message fails to be marked as processed and the SQS message is received a second time (e.g. if an SQS delete failed).
  - FAILED_TO_DELETE_MSG: Send an alert or create a task to delete the message.
  - FAILED_TO_CLEAR_MSG_IN_PROGRESS_FLAG: Send an alert or create a task to clear the flag so the message can be processed again later.
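One way to organize these suggestions in code is a single dispatch function, sketched below. The function suggestedActionForError and the action labels it returns are hypothetical. The string keys mirror the error type names listed above, but how the library actually exposes them (class constants, an enum, or something else) is not assumed.

```php
<?php
/**
 * Hypothetical mapping from an error type name to a follow-up action label.
 * Keys mirror the error type names in this README; the action labels are
 * invented for this sketch.
 */
function suggestedActionForError(string $errorType): string
{
    return match ($errorType) {
        'MSG_MARKED_AS_COMPLETED' => 'archive_raw_message_and_review',
        'MSG_MARKED_AS_IN_PROGRESS' => 'review_if_stuck',
        'EXCEPTION_THROWN_HANDLING_MESSAGE' => 'investigate_error',
        'FAILED_TO_MARK_AS_PROCESSED' => 'alert_only',
        'FAILED_TO_DELETE_MSG' => 'delete_message',
        'FAILED_TO_CLEAR_MSG_IN_PROGRESS_FLAG' => 'clear_in_progress_flag',
        // Unknown types fall back to a plain alert so nothing is silently dropped.
        default => 'alert_only',
    };
}

$action = suggestedActionForError('FAILED_TO_DELETE_MSG');
```

Keeping the mapping in one place makes it easier to review which error types page someone and which only create a task.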
Parameter: LoggerInterface
- This parameter is a \Psr\Log\LoggerInterface, so any implementation of that will do.
- All logged messages will automatically have the SQS message ID appended to aid in distinguishing log messages for different SQS messages.