broker
packageAPI reference for the broker
package.
Imports
(12)NativeRetrier
NativeRetrier is implemented by brokers that handle retries natively.
The manager skips its own retry loop for brokers that implement this interface.
type NativeRetrier interface
Methods
MemoryBroker
type MemoryBroker struct
Methods
Parameters
Returns
func (*MemoryBroker) Publish(ctx context.Context, topic string, payload []byte) error
{
m.subMu.RLock()
handlers := m.subs[topic]
m.subMu.RUnlock()
for _, h := range handlers {
go func(fn func([]byte) error) {
_ = fn(payload)
}(h)
}
return nil
}
Parameters
Returns
func (*MemoryBroker) Subscribe(topic string, handler func([]byte) error) error
{
m.subMu.Lock()
defer m.subMu.Unlock()
m.subs[topic] = append(m.subs[topic], handler)
return nil
}
Fields
| Name | Type | Description |
|---|---|---|
| subs | map[string][]func([]byte) error | |
| subMu | sync.RWMutex |
NewMemoryBroker
Returns
func NewMemoryBroker() *MemoryBroker
{
return &MemoryBroker{
subs: make(map[string][]func([]byte) error),
}
}
NATSBrokerOption
NATSBrokerOption is a functional option for NATSBroker.
type NATSBrokerOption options.Option[NATSBroker]
NATSBroker
NATSBroker implements the Broker interface using NATS JetStream with
auto stream creation, durable competing consumers, and configurable delivery.
type NATSBroker struct
Methods
HandlesRetries returns true, indicating that NATSBroker manages delivery retries internally via JetStream re-delivery and consumer configuration.
Returns
func (*NATSBroker) HandlesRetries() bool
{ return true }
Publish publishes payload to the NATS JetStream subject.
Parameters
Returns
func (*NATSBroker) Publish(ctx context.Context, topic string, payload []byte) error
{
_, err := b.js.Publish(ctx, topic, payload)
return err
}
Subscribe creates or updates a stream for the topic, creates a durable competing consumer, and starts consuming messages asynchronously.
Parameters
Returns
func (*NATSBroker) Subscribe(topic string, handler func([]byte) error) error
{
ctx := context.Background()
streamName := sanitizeStreamName(topic)
_, err := b.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: streamName,
Subjects: []string{topic},
})
if err != nil {
return fmt.Errorf("create stream: %w", err)
}
consumer, err := b.js.CreateOrUpdateConsumer(ctx, streamName, jetstream.ConsumerConfig{
Durable: b.group,
FilterSubject: topic,
AckPolicy: jetstream.AckExplicitPolicy,
AckWait: b.ackWait,
MaxDeliver: b.maxDeliver,
})
if err != nil {
return fmt.Errorf("create consumer: %w", err)
}
_, err = consumer.Consume(func(msg jetstream.Msg) {
if err := handler(msg.Data()); err != nil {
_ = msg.Nak()
return
}
_ = msg.Ack()
})
return err
}
Fields
| Name | Type | Description |
|---|---|---|
| js | jetstream.JetStream | |
| group | string | |
| maxDeliver | int | |
| ackWait | time.Duration |
NewNATSBroker
NewNATSBroker creates a new NATSBroker backed by NATS JetStream.
Parameters
Returns
func NewNATSBroker(js jetstream.JetStream, opts ...NATSBrokerOption) *NATSBroker
{
b := &NATSBroker{
js: js,
group: "go-relay",
maxDeliver: 3,
ackWait: 30 * time.Second,
}
options.Apply(b, opts...)
return b
}
WithNATSGroup
WithNATSGroup sets the durable consumer group name.
Parameters
Returns
func WithNATSGroup(group string) NATSBrokerOption
{
return func(b *NATSBroker) {
b.group = group
}
}
WithMaxDeliver
WithMaxDeliver sets the maximum number of delivery attempts before NATS
moves the message to an advisory dead-letter subject.
Parameters
Returns
func WithMaxDeliver(n int) NATSBrokerOption
{
return func(b *NATSBroker) {
b.maxDeliver = n
}
}
WithAckWait
WithAckWait sets the duration NATS waits for an ack before redelivering.
Parameters
Returns
func WithAckWait(d time.Duration) NATSBrokerOption
{
return func(b *NATSBroker) {
b.ackWait = d
}
}
sanitizeStreamName
Parameters
Returns
func sanitizeStreamName(topic string) string
{
sanitized := strings.Map(func(r rune) rune {
if unicode.IsLetter(r) || unicode.IsDigit(r) || r == '-' || r == '_' {
return r
}
return '_'
}, topic)
// Prevent collisions by appending a hash suffix for topics with special chars
if sanitized != topic {
hash := sha256.Sum256([]byte(topic))
suffix := hex.EncodeToString(hash[:])[:8]
return sanitized + "_" + suffix
}
return sanitized
}
RedisBrokerOption
RedisBrokerOption is a functional option for RedisBroker.
type RedisBrokerOption options.Option[RedisBroker]
RedisBroker
RedisBroker implements the Broker interface using Redis Streams with
consumer groups, dead-letter support, and automatic stale-message reclaim.
type RedisBroker struct
Methods
HandlesRetries returns true, indicating that RedisBroker manages delivery retries and dead-lettering internally via Redis Streams consumer groups.
Returns
func (*RedisBroker) HandlesRetries() bool
{ return true }
Publish writes payload to the Redis Stream for the given topic via XADD.
Parameters
Returns
func (*RedisBroker) Publish(ctx context.Context, topic string, payload []byte) error
{
return r.client.XAdd(ctx, &redis.XAddArgs{
Stream: topic,
Values: map[string]any{"payload": payload},
}).Err()
}
Subscribe ensures the consumer group exists, then starts a goroutine that reads messages from the stream via XREADGROUP.
Parameters
Returns
func (*RedisBroker) Subscribe(topic string, handler func([]byte) error) error
{
err := r.client.XGroupCreateMkStream(r.ctx, topic, r.group, "$").Err()
if err != nil && !isGroupExistsErr(err) {
return fmt.Errorf("create consumer group: %w", err)
}
go r.readLoop(topic, handler)
return nil
}
Parameters
func (*RedisBroker) readLoop(topic string, handler func([]byte) error)
{
dead := topic + ".dead"
r.reclaimPending(topic, dead, handler)
for {
select {
case <-r.ctx.Done():
return
default:
}
streams, err := r.client.XReadGroup(r.ctx, &redis.XReadGroupArgs{
Group: r.group,
Consumer: r.consumer,
Streams: []string{topic, ">"},
Count: 10,
Block: r.blockTimeout,
}).Result()
if err != nil {
if r.ctx.Err() != nil {
return
}
continue
}
for _, stream := range streams {
for _, msg := range stream.Messages {
r.processMessage(topic, dead, msg, handler)
}
}
}
}
Parameters
func (*RedisBroker) processMessage(topic, dead string, msg redis.XMessage, handler func([]byte) error)
{
payload := extractPayload(msg)
if err := handler(payload); err == nil {
r.client.XAck(r.ctx, topic, r.group, msg.ID)
return
}
pending, err := r.client.XPendingExt(r.ctx, &redis.XPendingExtArgs{
Stream: topic,
Group: r.group,
Start: msg.ID,
End: msg.ID,
Count: 1,
Consumer: r.consumer,
}).Result()
if err != nil || len(pending) == 0 {
return
}
if int(pending[0].RetryCount) >= r.maxRetries {
r.client.XAdd(r.ctx, &redis.XAddArgs{
Stream: dead,
Values: map[string]any{"payload": payload, "original_id": msg.ID},
})
r.client.XAck(r.ctx, topic, r.group, msg.ID)
}
}
Parameters
func (*RedisBroker) reclaimPending(topic, dead string, handler func([]byte) error)
{
start := "0-0"
for {
msgs, next, err := r.client.XAutoClaim(r.ctx, &redis.XAutoClaimArgs{
Stream: topic,
Group: r.group,
Consumer: r.consumer,
MinIdle: r.claimTimeout,
Start: start,
Count: 10,
}).Result()
if err != nil {
return
}
for _, msg := range msgs {
r.processMessage(topic, dead, msg, handler)
}
if next == "0-0" || next == "" || len(msgs) < 10 {
return
}
start = next
}
}
Fields
| Name | Type | Description |
|---|---|---|
| client | *redis.Client | |
| group | string | |
| consumer | string | |
| maxRetries | int | |
| claimTimeout | time.Duration | |
| blockTimeout | time.Duration | |
| ctx | context.Context | |
| cancel | context.CancelFunc |
NewRedisBroker
NewRedisBroker creates a new RedisBroker backed by Redis Streams.
Parameters
Returns
func NewRedisBroker(client *redis.Client, opts ...RedisBrokerOption) *RedisBroker
{
ctx, cancel := context.WithCancel(context.Background())
hostname, _ := os.Hostname()
b := &RedisBroker{
client: client,
group: "go-relay",
consumer: fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano()),
maxRetries: 3,
claimTimeout: 30 * time.Second,
blockTimeout: 2 * time.Second,
ctx: ctx,
cancel: cancel,
}
options.Apply(b, opts...)
return b
}
WithGroup
WithGroup sets the consumer group name.
Parameters
Returns
func WithGroup(group string) RedisBrokerOption
{
return func(b *RedisBroker) {
b.group = group
}
}
WithRedisMaxRetries
WithRedisMaxRetries sets the maximum delivery attempts before moving a
message to the dead-letter stream.
Parameters
Returns
func WithRedisMaxRetries(n int) RedisBrokerOption
{
return func(b *RedisBroker) {
b.maxRetries = n
}
}
WithClaimTimeout
WithClaimTimeout sets the idle duration after which pending messages are
reclaimed from dead workers on startup.
Parameters
Returns
func WithClaimTimeout(d time.Duration) RedisBrokerOption
{
return func(b *RedisBroker) {
b.claimTimeout = d
}
}
WithBlockTimeout
WithBlockTimeout sets the blocking read duration for XREADGROUP.
Parameters
Returns
func WithBlockTimeout(d time.Duration) RedisBrokerOption
{
return func(b *RedisBroker) {
b.blockTimeout = d
}
}
extractPayload
Parameters
Returns
func extractPayload(msg redis.XMessage) []byte
{
raw := msg.Values["payload"]
switch v := raw.(type) {
case string:
return []byte(v)
case []byte:
return v
}
return nil
}
isGroupExistsErr
Parameters
Returns
func isGroupExistsErr(err error) bool
{
return err != nil && err.Error() == "BUSYGROUP Consumer Group name already exists"
}