idct / symfony-nats-messenger
Symfony NATS (JetStream) Messenger Bridge
Requires
- php: ^8.2
- idct/php-nats-jetstream-client: ^1
- symfony/framework-bundle: ^7 || ^8
- symfony/messenger: ^7.2 || ^8.0
- symfony/uid: ^7.2 || ^8.0
Requires (Dev)
- behat/behat: ^3.29
- dg/bypass-finals: ^1.9
- phpstan/phpstan: ^2.1
- phpunit/phpunit: ^11
Suggests
- ext-igbinary: For improved serialization performance when using the default IgbinarySerializer
README
A Symfony Messenger transport integration for NATS JetStream, enabling reliable asynchronous messaging with persistent message streaming.
Features
- π High-Performance Messaging - Leverage NATS JetStream for fast, reliable message delivery
- π¦ Symfony Integration - Seamless integration with Symfony Messenger
- βοΈ Configurable Consumers - Support for multiple consumer strategies
- π Flexible Batching - Adjustable message batch sizes and timeouts
- π Authentication Support - Built-in support for NATS authentication
- π Stream Configuration - Configurable retention policies and replication
- π§ͺ Thoroughly Tested - 102 unit tests with ~96% code coverage
π This project looks for funding. Love my work? Support it! π
-
β Buy me a coffee: https://buymeacoffee.com/idct
-
π Sponsor: https://github.com/sponsors/ideaconnect
-
πͺ BTC: bc1qntms755swm3nplsjpllvx92u8wdzrvs474a0hr
-
π ETH: 0x08E27250c91540911eD27F161572aFA53Ca24C0a
-
β‘ TRX: TVXWaU4ScNV9RBYX5RqFmySuB4zF991QaE
-
π LTC: LN5ApP1Yhk4iU9Bo1tLU8eHX39zDzzyZxB
Requirements
System Requirements
- PHP: ^8.2
- Symfony: ^7.2 || ^8
- NATS Server: ^2.9 with JetStream enabled, ^2.12 for scheduled messages support.
Installation
composer require idct/symfony-nats-messenger
Operational note: This is the package installation command, not a runtime behavior covered by the library test suite. The installed package is exercised by the unit and functional tests documented below.
Development Setup
For contributors and development:
# Install dependencies composer install # Run static analysis and the default unit test suite after every modification composer test # Start NATS server for testing composer nats:start # Run unit tests with coverage composer test:unit # Set up functional tests composer test:functional:setup # Run functional tests composer test:functional # Stop NATS server composer nats:stop
Verification note: This is the repository verification workflow used for this package. The same command sequence is run when validating documentation and code changes in this repository.
Quick Start
1. Configure NATS Server
Ensure your NATS server has JetStream enabled:
nats-server -js
Operational note: Starting
nats-server -jsis an environment prerequisite rather than package behavior. Automated coverage begins once JetStream is available and the transport is exercised by the functional scenarios below.
2. Set Up Transport in Symfony
Add the NATS transport to your Symfony Messenger configuration:
# config/packages/messenger.yaml framework: messenger: transports: nats_transport: dsn: 'nats-jetstream://localhost:4222/my-stream/my-topic' options: consumer: 'my-consumer' batching: 5 max_batch_timeout: 1.0 routing: 'App\Message\MyAsyncMessage': nats_transport
Tested by:
testReadmeDsnExamplesParseSuccessfully[README: quick-start transport],testReadmeConfigurationOptionsAreAccepted,testReadmeBatchingExamplesAreAccepted,testReadmeTimeoutExamplesAreAccepted
3. Configure Custom Serializers (Optional)
By default, the transport uses igbinary serialization for high performance when the extension is available. If ext-igbinary is not installed, it falls back to Symfony's PhpSerializer and emits a notice. You can also customize this explicitly:
Using IgbinarySerializer (Default)
# config/packages/messenger.yaml framework: messenger: transports: nats_transport: dsn: 'nats-jetstream://localhost:4222/my-stream/my-topic' serializer: 'IDCT\NatsMessenger\Serializer\IgbinarySerializer' options: consumer: 'my-consumer'
Note: Custom serializer services are resolved by Symfony before transport creation. When no serializer service is provided, the transport instantiates its built-in default serializer and falls back to Symfony's PhpSerializer if ext-igbinary is unavailable.
For example:
igbinary_serializer: class: IDCT\NatsMessenger\Serializer\IgbinarySerializer
or:
IDCT\NatsMessenger\Serializer\IgbinarySerializer: ~
Tested by:
createTransport_UsesProvidedSerializer,serialize_WithValidEnvelope_ReturnsSerializedString,decode_WithValidEncodedEnvelope_ReturnsEnvelope,testConstructorWithoutIgbinaryDoesNotCrash
Creating Custom Serializers
You can create your own serializer by extending AbstractEnveloperSerializer:
use IDCT\NatsMessenger\Serializer\AbstractEnveloperSerializer; use Symfony\Component\Messenger\Envelope; class MyCustomSerializer extends AbstractEnveloperSerializer { protected function serialize(Envelope $envelope): string { // Your custom serialization logic return serialize($envelope); } protected function deserialize(string $data): mixed { // Your custom deserialization logic return unserialize($data); } }
Tested by:
readmeCustomSerializerExample_EncodeDecode_RoundTrips,readmeCustomSerializerExample_DecodeInvalidBody_ThrowsExceptionβ the exact code above is compiled and exercised viaReadmeExampleSerializerin the unit tests.
For reference implementations, see:
src/Serializer/IgbinarySerializer.php- Binary serializationsrc/Serializer/AbstractEnveloperSerializer.php- Base class
4. Send Messages
use App\Message\MyAsyncMessage; use Symfony\Component\Messenger\MessageBus; class MyController { public function __construct(private MessageBus $bus) {} public function send(): void { $this->bus->dispatch(new MyAsyncMessage('Hello NATS!')); } }
Tested by:
testSendPublishesEncodedBodyWithoutHeaders,testSendUsesRequestWithHeadersWhenHeadersArePresent, Behat scenarioComplete message flow - send, check stats, consume, verify
5. Handle Messages
use App\Message\MyAsyncMessage; use Symfony\Component\Messenger\Handler\MessageHandlerInterface; class MyAsyncMessageHandler implements MessageHandlerInterface { public function __invoke(MyAsyncMessage $message): void { echo "Processing: " . $message->getText(); } }
Tested by: Behat scenarios
Complete message flow - send, check stats, consume, verify,Send and consume messages with a custom consumer name, andHigh-volume message processing with file output verificationβ handlers are exercised through realmessenger:consumeruns.
6. Consume Messages
symfony console messenger:consume nats_transport
Tested by: Behat scenarios
Complete message flow - send, check stats, consume, verify,Send and consume messages with a custom consumer name, andPartial message consumption with multiple consumersβ the Behat context runsmessenger:consumeas a Symfony CLI process.
Configuration Guide
DSN Format
nats-jetstream://[user:password@]host:port/stream-name/topic-name
Tested by:
testBuildWithValidDsnReturnsConfiguration,testBuildWithoutPathThrowsException,testBuildWithoutTopicThrowsException,createTransport_WithValidDsn_ReturnsNatsTransportInstance
Examples:
# Default port (4222) nats-jetstream://localhost/my-stream/my-topic # Custom port nats-jetstream://localhost:5000/my-stream/my-topic # With authentication nats-jetstream://user:password@localhost:4222/my-stream/my-topic # With query parameters nats-jetstream://localhost/my-stream/my-topic?consumer=worker&batching=10 # TLS transport scheme nats-jetstream+tls://localhost:4222/my-stream/my-topic
Tested by:
testReadmeDsnExamplesParseSuccessfullyβ each DSN above is parsed through the configuration builder via a dedicated data provider case.
Configuration Options
framework: messenger: transports: nats_transport: dsn: 'nats-jetstream://localhost:4222/my-stream/my-topic' options: # Consumer Configuration consumer: 'my-consumer' # Consumer group name (default: 'client') # Performance Tuning batching: 5 # Messages per batch (default: 1) max_batch_timeout: 1.0 # Timeout in seconds for batch fetching (default: 1) connection_timeout: 1.0 # Socket I/O timeout in seconds (default: 1) # Stream Retention Policies stream_max_age: 86400 # Max message age in seconds (0 = unlimited, default: 0) stream_max_bytes: 1073741824 # Max storage size in bytes (null = unlimited) stream_max_messages: 1000000 # Max number of messages in the stream (null = unlimited) stream_max_messages_per_subject: 1000 # Max number of messages retained per subject (null = unlimited) # Storage Backend stream_storage: 'file' # Storage type: 'file' or 'memory' (default: 'file') # High Availability stream_replicas: 1 # Number of replicas (default: 1) # Failure Handling Strategy retry_handler: 'symfony' # symfony|nats (default: symfony) # symfony => TERM on failed/rejected message # nats => NAK on failed/rejected message # Scheduled / Delayed Messages (requires NATS >= 2.12) scheduled_messages: false # Enable scheduled message support (default: false) # When enabled, Symfony DelayStamp triggers NATS # scheduled message delivery via Nats-Schedule headers # TLS Configuration tls_required: false # Force TLS for NATS connection (default: false) tls_handshake_first: false # Use TLS-first handshake mode (default: false) tls_ca_file: null # Path to CA certificate file tls_cert_file: null # Path to client certificate file tls_key_file: null # Path to client private key tls_key_passphrase: null # Passphrase for encrypted private key tls_peer_name: null # Override TLS peer name for certificate validation tls_verify_peer: true # Verify TLS peer certificate (default: true) # Additional Authentication token: null # NATS token authentication username: null # Overrides DSN username if provided password: null # Overrides DSN password if provided jwt: null # JWT authentication value nkey: null # NKey public value
Tested by:
testReadmeConfigurationOptionsAreAccepted(all options above),testReadmeBatchingExamplesAreAccepted,testReadmeTimeoutExamplesAreAccepted,testReadmeStreamRetentionExamplesAreAccepted,testBuildWithTlsAndAuthOptionsPropagatesToNatsOptions
Retry Handler Behavior
retry_handler: symfony(default) sendsTERMwhen a message fails during transport decoding or is rejected.retry_handler: natssendsNAKwhen a message fails during transport decoding or is rejected.
Tested by:
testRejectUsesTermByDefault,testRejectUsesNakWhenRetryHandlerIsNats,testBuildUsesRetryHandlerFromQuery, Behat scenariosnats_nak.featureandnats_term.feature
Important: Consumer Strategies
This is critical to understand before setting up multiple transport instances:
β οΈ Strategy A: Same Consumer, Batching = 1
Use when: Multiple instances should cooperate on the same consumer
# All instances use the same consumer with batching=1 transports: nats_worker_1: dsn: 'nats-jetstream://localhost/my-stream/my-topic' options: consumer: 'shared-consumer' # Same consumer name batching: 1 # MUST be 1 for shared consumers nats_worker_2: dsn: 'nats-jetstream://localhost/my-stream/my-topic' options: consumer: 'shared-consumer' # Same consumer name batching: 1 # MUST be 1 for shared consumers
Why batching must be 1:
- With explicit acknowledge (ACK) mode, only messages that are explicitly acknowledged are considered processed
- Multiple instances sharing the same consumer need to ACK individually
- Batching > 1 with multiple instances causes delivery conflicts
- Each instance should fetch and ACK one message at a time
Benefits:
- Automatic load balancing across instances
- NATS handles message distribution
- Guaranteed single processing per message
Tested by:
testReadmeBatchingExamplesAreAccepted(batching=1), Behat scenarioPartial message consumption with multiple consumers
β Strategy B: Different Consumers, Any Batching
Use when: Each instance needs independent message processing (duplicates allowed)
# Each instance uses a different consumer transports: nats_worker_1: dsn: 'nats-jetstream://localhost/my-stream/my-topic' options: consumer: 'worker-1-consumer' # Unique consumer per instance batching: 10 # Can use any batching nats_worker_2: dsn: 'nats-jetstream://localhost/my-stream/my-topic' options: consumer: 'worker-2-consumer' # Unique consumer per instance batching: 10 # Can use any batching
Why this works:
- Each consumer maintains its own state
- All messages are delivered to all consumers independently
- Each instance can use higher batching for better throughput
- Duplicate processing is expected (fan-out pattern)
Use cases:
- Event broadcasting to multiple systems
- Multiple independent processors
- Audit logging / event replay
Tested by:
testReadmeBatchingExamplesAreAccepted(batching=10), Behat scenarioPartial message consumption with multiple consumers
Batching & Timeouts
Batching Explained
- Higher batching: Better throughput, slightly higher latency
- Lower batching: Lower latency, slightly reduced throughput
- Optimal batching: Depends on message size and processing time
options: batching: 1 # Fetch 1 message at a time (low latency) batching: 5 # Fetch 5 messages (balanced) batching: 20 # Fetch 20 messages (high throughput)
Tested by:
testReadmeBatchingExamplesAreAcceptedβ values 1, 5, 10, 20, 50 are all verified.
Batch Timeout
Controls how long to wait for a batch to fill:
options: batching: 10 max_batch_timeout: 0.5 # Wait max 0.5s for batch to fill # Returns early if timeout reached
Tested by:
testReadmeTimeoutExamplesAreAcceptedβ values 0.5, 1.0, 2.0 are verified. Behat scenariosnats_batching.feature.
Example scenarios:
- If you set
batching: 10andmax_batch_timeout: 0.5 - If 10 messages arrive quickly, all are fetched immediately
- If only 3 messages arrive in 0.5s, return those 3
Connection Timeout
Controls the socket-level I/O timeout for all NATS operations:
options: connection_timeout: 2.0 # Socket timeout in seconds
Tested by:
testReadmeTimeoutExamplesAreAccepted(1.0, 2.0, 3.0),testBuildWithConnectionTimeoutPropagatesMs
Purpose:
- Sets the timeout for socket read/write operations
- Affects all NATS communication (publish, subscribe, ack, etc.)
- Lower values fail faster on network issues
- Higher values tolerate slower networks
When to adjust:
- Increase for high-latency networks or geographically distant NATS servers
- Decrease for faster failure detection in local environments
- Default of 1 second works well for most local/regional deployments
- Don't wait forever for the batch to fill
Stream Configuration
Retention Policies
Control how long messages are kept in the stream:
options: # By age (24 hours) stream_max_age: 86400 # By total size (1GB) stream_max_bytes: 1073741824 # By total message count across the entire stream (NATS: max_msgs) stream_max_messages: 1000000 # By message count per individual subject (NATS: max_msgs_per_subject) stream_max_messages_per_subject: 1000 # Unlimited (default) stream_max_age: 0 stream_max_bytes: null stream_max_messages: null stream_max_messages_per_subject: null
Tested by:
testReadmeStreamRetentionExamplesAreAcceptedβ all retention options above are verified. Behat scenariosnats_stream_limits.feature.
Note:
stream_max_messageslimits the total number of messages stored in the stream (maps to NATSmax_msgs), whilestream_max_messages_per_subjectlimits messages retained per individual subject (maps to NATSmax_msgs_per_subject). The per-subject limit is especially useful with multi-subject streams to prevent one high-volume subject from dominating retention.
High Availability
options: # Single replica (no redundancy) stream_replicas: 1 # 3 replicas (recommended for production) stream_replicas: 3
Tested by:
testReadmeStreamRetentionExamplesAreAccepted(replicas 1 and 3),testSetupPassesConfiguredStreamOptions
Testing
Unit Tests
# Install dependencies composer install --dev # Run static analysis and the fast unit suite after every modification composer test # Run NATS composer nats:start # Run all unit tests with coverage (recommended) composer test:unit # Or run tests manually ./vendor/bin/phpunit
Verification note: This block documents the supported contributor workflow. The same
composer testandcomposer test:unitcommands are used to verify changes in this repository.
The target is to have at least 90% of code coverage.
What's tested:
- DSN parsing and validation
- Configuration option handling
- Authentication support
- Port configuration
- Error handling
- Interface compliance
Functional Tests
Functional tests require a running NATS server with JetStream enabled:
# Set up functional test dependencies composer test:functional:setup # Start NATS server in Docker composer nats:start # Run functional tests composer test:functional # Stop NATS server composer nats:stop
Verification note: This is the scripted functional test workflow used for the transport's end-to-end verification.
Manual approach:
# Set up NATS in Docker (optional) cd tests/nats docker-compose up -d # Run functional tests cd ../functional ./vendor/bin/behat features/ # Stop NATS cd ../nats docker-compose down
Operational note: This manual Docker/Behat flow mirrors the scripted functional commands above and is not asserted separately by the package tests.
What's tested:
- Message publishing
- Message consumption
- Message acknowledgment
- Consumer setup
- Stream persistence
See also: tests/functional/README.md
Advanced Usage
Multiple Transports
Set up multiple independent transports for different use cases:
framework: messenger: transports: # High-priority, low-latency messages nats_fast: dsn: 'nats-jetstream://localhost/fast-stream/fast-topic' options: consumer: 'fast-consumer' batching: 1 # Bulk processing, high throughput nats_bulk: dsn: 'nats-jetstream://localhost/bulk-stream/bulk-topic' options: consumer: 'bulk-consumer' batching: 50 # Audit logging nats_audit: dsn: 'nats-jetstream://localhost/audit-stream/audit-topic' options: consumer: 'audit-consumer' stream_max_age: 2592000 # 30 days stream_replicas: 3
Tested by:
testReadmeDsnExamplesParseSuccessfully[README: fast transport],testReadmeDsnExamplesParseSuccessfully[README: bulk transport],testReadmeDsnExamplesParseSuccessfully[README: audit transport],testReadmeAuditTransportOptionsAreAccepted,testReadmeBatchingExamplesAreAccepted
Multi-Subject Streams
Multiple transports can share the same NATS stream with different subjects. When messenger:setup-transports runs, each transport adds its subject to the existing stream rather than overwriting it:
framework: messenger: transports: # Both transports share the "events" stream nats_orders: dsn: 'nats-jetstream://localhost/events/orders' options: consumer: 'order-consumer' delay: 0.5 batching: 1 stream_max_age: 300 nats_payments: dsn: 'nats-jetstream://localhost/events/payments' options: consumer: 'payment-consumer' delay: 1 batching: 2
The events stream will have both orders and payments as subjects.
Tested by:
testReadmeDsnExamplesParseSuccessfully[README: multi-subject orders],testReadmeDsnExamplesParseSuccessfully[README: multi-subject payments],testReadmeMultiSubjectOptionsAreAccepted,testSetupUpdatesExistingStreamMergesSubjectsAndPreservesServerConfig, Behat scenarioSetup command merges subjects for transports sharing one stream
Note: When a stream already exists, setup reads the current JetStream configuration, merges in any new subjects, and then overlays the stream settings managed by this transport. Existing subjects are preserved, duplicate subjects are not added, and the existing storage backend is kept for already-created streams.
Setup on Initialization
Automatically create streams and consumers on first run:
framework: messenger: transports: nats_transport: dsn: 'nats-jetstream://localhost/my-stream/my-topic' options: consumer: 'my-consumer'
Then call setup command:
symfony console messenger:setup-transports nats_transport
Tested by:
testSetupCreatesStreamAndConsumer,testSetupPassesConfiguredStreamOptions,testSetupUpdatesExistingStreamMergesSubjectsAndPreservesServerConfig, Behat scenariosSetup NATS stream with max age configuration,Setup command handles existing streams gracefully, andCustom consumer name is registered in JetStream
Delayed / Scheduled Messages
Requires NATS Server >= 2.12 with JetStream enabled.
Enable scheduled_messages in the DSN to use Symfony's DelayStamp for deferred delivery:
framework: messenger: transports: nats_transport: dsn: 'nats-jetstream://localhost/my-stream/my-topic?scheduled_messages=true'
Then dispatch messages with a delay:
use Symfony\Component\Messenger\Stamp\DelayStamp; // Deliver after 30 seconds $bus->dispatch(new MyMessage(), [new DelayStamp(30000)]);
Tested by:
testSendWithDelayStampPublishesToDelayedSubjectWithScheduleHeaders,testReadmeScheduledMessagesDsnEnablesFeature, Behat scenarioDelayed messages are delivered after the scheduled time
When scheduled_messages is enabled and a DelayStamp is present:
- The message is published to a
{topic}.delayed.{uuid}subject withNats-ScheduleandNats-Schedule-Targetheaders - The stream is created with an additional
{topic}.delayed.>subject andallow_msg_schedulesenabled - NATS holds the message and delivers it to the original topic at the scheduled time
- The consumer processes it like any other message
When scheduled_messages is disabled (the default), any DelayStamp on the envelope is silently ignored and messages are published immediately.
This will:
- Create the stream with configured settings
- Create the consumer with explicit ACK policy
- Verify consumer creation
Stream Monitoring
View stream and consumer information:
# List streams nats stream list # View stream info nats stream info my-stream # List consumers nats consumer list my-stream # View consumer info nats consumer info my-stream my-consumer # View message count nats consumer info my-stream my-consumer --json | jq '.state.num_pending'
Operational note: These are NATS CLI inspection commands, so this package does not assert their exact textual output directly. The underlying stream, consumer, and message-count state is covered by Behat scenarios
Setup NATS stream with max age configuration,Custom consumer name is registered in JetStream,Complete message flow - send, check stats, consume, verify, and thegetMessageCount()unit tests.
Manual Message Operations
use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\Transport\TransportInterface; // Get message count $count = $transport->getMessageCount(); // Check if messages are pending if ($count > 0) { echo "Pending messages: $count"; }
Tested by:
testGetMessageCountReturnsConsumerPendingMessages,testGetMessageCountFallsBackToStreamState,testGetMessageCountReturnsZeroWhenLookupsFail,testGetMessageCountReturnsAckPendingWhenHigherThanPending
Troubleshooting
Connection Issues
Error: "Connection refused"
# Check NATS is running nats-server --js # Verify host and port nats-jetstream://localhost:4222/stream/topic
Tested by: Behat scenario
Setup command fails gracefully when NATS is unavailable
Error: "Stream not found"
# Run setup command to create stream
symfony console messenger:setup-transports nats_transport
Tested by:
testSetupCreatesStreamAndConsumer,testSetupUpdatesStreamWhenItAlreadyExists, Behat scenariosSetup NATS stream with max age configurationandSetup command handles existing streams gracefully
Message Processing Issues
Messages not being consumed
# Check consumer exists nats consumer list my-stream # View consumer status nats consumer info my-stream my-consumer # Check for errors in consumer nats consumer info my-stream my-consumer --json | jq '.state'
Operational note: These are manual diagnosis commands. The actual consume path is covered by Behat scenarios
Complete message flow - send, check stats, consume, verify,Send and consume messages with a custom consumer name, andPartial message consumption with multiple consumers.
Messages stuck in pending
# Check handler is not throwing exceptions # Verify handler implementation # Check application logs for errors
Operational note: This checklist is manual guidance. Pending-count behavior is covered by
testGetMessageCountReturnsConsumerPendingMessages,testGetMessageCountFallsBackToStreamState,testGetMessageCountReturnsAckPendingWhenHigherThanPending, and Behat scenarioComplete message flow - send, check stats, consume, verify.
Architecture
The bridge consists of two main components:
NatsTransportFactory
- Handles DSN scheme detection (
nats-jetstream://) - Creates
NatsTransportinstances - Validates configuration
NatsTransport
- Implements Symfony's
TransportInterface - Manages stream and consumer connections
- Handles message serialization (igbinary)
- Supports batching and explicit acknowledgment
Performance Tips
-
Choose appropriate batching
- Start with
batching: 5for balanced performance - Increase to 20+ for high throughput workloads
- Use 1 for strict low-latency requirements
- Start with
-
Set reasonable timeouts
max_batch_timeout: 0.5for responsive systemsmax_batch_timeout: 2.0for background jobsconnection_timeout: 1.0for local/regional deploymentsconnection_timeout: 3.0+for cross-region or high-latency networks
-
Use appropriate replicas
stream_replicas: 1for developmentstream_replicas: 3for production
-
Monitor performance
- Use
getMessageCount()to track queue depth - Monitor handler execution time
- Watch for stuck messages
- Use
Security Considerations
β οΈ Deserialization of Untrusted Data
The default IgbinarySerializer (and any serializer extending AbstractEnveloperSerializer) deserializes raw message payloads from NATS into PHP objects. PHP object unserialization is a well-known attack vector β a crafted payload can trigger arbitrary code execution via magic methods (__wakeup, __destruct, etc.).
If your NATS topics are not fully trusted (e.g. shared infrastructure, external publishers), you should:
- Implement a custom serializer that uses a safe format (JSON, Protobuf) instead of PHP object serialization
- Add message-level authentication (e.g. HMAC signatures) to verify publisher identity before deserializing
- Restrict NATS topic publish permissions via ACLs so only trusted services can publish
The type check (instanceof Envelope) happens after deserialization, which is too late to prevent exploitation.
Stream-Exists Detection During Setup
During setup(), the transport prefers explicit NATS conflict messages (for example "already in use" or "already exists") to detect a pre-existing stream. When NATS returns a generic HTTP 400, the transport now verifies whether the stream actually exists before switching to updateStream(). This avoids misclassifying unrelated bad-request errors as an existing-stream conflict.
If you experience unexpected behavior during stream setup, review the exact error returned by your NATS server version and confirm the stream can be queried via JetStream stream info APIs.
Publish Response Validation
When JetStream publish acknowledgements are received through the header-aware request path, the transport parses the response as JSON and throws an exception if JetStream reports an error or the response is not valid JSON. This makes proxy or protocol misconfiguration fail closed instead of silently accepting an invalid publish acknowledgement.
General Recommendations
-
Authentication
- Prefer environment variables or explicit options for credentials over hard-coded DSNs
- If you use credentials in a DSN, avoid logging the full DSN because it may expose secrets
- Store credentials in environment variables
- Never commit credentials to version control
-
Message Encryption
- Encrypt sensitive data before dispatching
- NATS can be configured with TLS for transit encryption
- Implement application-level encryption for sensitive payloads
-
Access Control
- Restrict stream/consumer creation to authorized users
- Use NATS access control lists (ACLs) for fine-grained permissions
- Audit stream operations
Contributing
Contributions are welcome! Please ensure:
- Every modification runs the relevant verification commands before it is considered done
- Minimum verification for PHP changes:
composer test - All tests pass:
composer test:unit - Code coverage remains above 90%
- New features include corresponding tests
- Documentation is updated
- Functional tests pass:
composer test:functional(if applicable) docs/TESTS.mdis kept up to date when tests are added, removed, or renamed- Each release has an entry in
docs/CHANGELOG.mdfollowing Keep a Changelog format - When a PR is merged or its features are adapted, a description is added to
docs/PRs/
Quick Development Workflow
# 1. Run static analysis and the default unit suite after each modification composer test # 2. Set up functional tests (first time only) composer test:functional:setup # 3. Start NATS for functional tests composer nats:start # 4. Run functional tests composer test:functional # 5. Clean up composer nats:stop
Verification note: This is the exact end-to-end verification sequence required for repository changes and the same sequence used for this task.
License
MIT License - see LICENSE file for details
Support
For issues, questions, or suggestions:
- Check the troubleshooting section
- Check existing issues on GitHub
- Create a new issue with detailed information
π Love the project? Support it! π
- πͺ BTC: bc1qntms755swm3nplsjpllvx92u8wdzrvs474a0hr
- π ETH: 0x08E27250c91540911eD27F161572aFA53Ca24C0a
- β‘ TRX: TVXWaU4ScNV9RBYX5RqFmySuB4zF991QaE
- π LTC: LN5ApP1Yhk4iU9Bo1tLU8eHX39zDzzyZxB
- β Buy me a coffee: https://buymeacoffee.com/idct
- π Sponsor: https://github.com/sponsors/ideaconnect