broker API

broker

package

API reference for the broker package.

I
interface

NativeRetrier

NativeRetrier is implemented by brokers that handle retries natively.
The manager skips its own retry loop for brokers that implement this interface.

pkg/broker/retrier.go:5-7
type NativeRetrier interface

Methods

Returns

bool
func HandlesRetries(...)
S
struct

MemoryBroker

pkg/broker/memory.go:8-11
type MemoryBroker struct

Methods

Publish
Method

Parameters

topic string
payload []byte

Returns

error
func (*MemoryBroker) Publish(ctx context.Context, topic string, payload []byte) error
{
	m.subMu.RLock()
	handlers := m.subs[topic]
	m.subMu.RUnlock()

	for _, h := range handlers {
		go func(fn func([]byte) error) {
			_ = fn(payload)
		}(h)
	}
	return nil
}
Subscribe
Method

Parameters

topic string
handler func([]byte) error

Returns

error
func (*MemoryBroker) Subscribe(topic string, handler func([]byte) error) error
{
	m.subMu.Lock()
	defer m.subMu.Unlock()
	m.subs[topic] = append(m.subs[topic], handler)
	return nil
}

Fields

Name Type Description
subs map[string][]func([]byte) error
subMu sync.RWMutex
F
function

NewMemoryBroker

Returns

pkg/broker/memory.go:13-17
func NewMemoryBroker() *MemoryBroker

{
	return &MemoryBroker{
		subs: make(map[string][]func([]byte) error),
	}
}
T
type

NATSBrokerOption

NATSBrokerOption is a functional option for NATSBroker.

pkg/broker/nats.go:17-17
type NATSBrokerOption options.Option[NATSBroker]
S
struct
Implements: NativeRetrier

NATSBroker

NATSBroker implements the Broker interface using NATS JetStream with
auto stream creation, durable competing consumers, and configurable delivery.

pkg/broker/nats.go:21-26
type NATSBroker struct

Methods

HandlesRetries returns true, indicating that NATSBroker manages delivery retries internally via JetStream re-delivery and consumer configuration.

Returns

bool
func (*NATSBroker) HandlesRetries() bool
{ return true }
Publish
Method

Publish publishes payload to the NATS JetStream subject.

Parameters

topic string
payload []byte

Returns

error
func (*NATSBroker) Publish(ctx context.Context, topic string, payload []byte) error
{
	_, err := b.js.Publish(ctx, topic, payload)
	return err
}
Subscribe
Method

Subscribe creates or updates a stream for the topic, creates a durable competing consumer, and starts consuming messages asynchronously.

Parameters

topic string
handler func([]byte) error

Returns

error
func (*NATSBroker) Subscribe(topic string, handler func([]byte) error) error
{
	ctx := context.Background()
	streamName := sanitizeStreamName(topic)

	_, err := b.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
		Name:     streamName,
		Subjects: []string{topic},
	})
	if err != nil {
		return fmt.Errorf("create stream: %w", err)
	}

	consumer, err := b.js.CreateOrUpdateConsumer(ctx, streamName, jetstream.ConsumerConfig{
		Durable:       b.group,
		FilterSubject: topic,
		AckPolicy:     jetstream.AckExplicitPolicy,
		AckWait:       b.ackWait,
		MaxDeliver:    b.maxDeliver,
	})
	if err != nil {
		return fmt.Errorf("create consumer: %w", err)
	}

	_, err = consumer.Consume(func(msg jetstream.Msg) {
		if err := handler(msg.Data()); err != nil {
			_ = msg.Nak()
			return
		}
		_ = msg.Ack()
	})
	return err
}

Fields

Name Type Description
js jetstream.JetStream
group string
maxDeliver int
ackWait time.Duration
F
function

NewNATSBroker

NewNATSBroker creates a new NATSBroker backed by NATS JetStream.

Parameters

opts
...NATSBrokerOption

Returns

pkg/broker/nats.go:29-38
func NewNATSBroker(js jetstream.JetStream, opts ...NATSBrokerOption) *NATSBroker

{
	b := &NATSBroker{
		js:         js,
		group:      "go-relay",
		maxDeliver: 3,
		ackWait:    30 * time.Second,
	}
	options.Apply(b, opts...)
	return b
}
F
function

WithNATSGroup

WithNATSGroup sets the durable consumer group name.

Parameters

group
string
pkg/broker/nats.go:41-45
func WithNATSGroup(group string) NATSBrokerOption

{
	return func(b *NATSBroker) {
		b.group = group
	}
}
F
function

WithMaxDeliver

WithMaxDeliver sets the maximum number of delivery attempts before NATS
moves the message to an advisory dead-letter subject.

Parameters

n
int
pkg/broker/nats.go:49-53
func WithMaxDeliver(n int) NATSBrokerOption

{
	return func(b *NATSBroker) {
		b.maxDeliver = n
	}
}
F
function

WithAckWait

WithAckWait sets the duration NATS waits for an ack before redelivering.

Parameters

pkg/broker/nats.go:56-60
func WithAckWait(d time.Duration) NATSBrokerOption

{
	return func(b *NATSBroker) {
		b.ackWait = d
	}
}
F
function

sanitizeStreamName

Parameters

topic
string

Returns

string
pkg/broker/nats.go:103-118
func sanitizeStreamName(topic string) string

{
	sanitized := strings.Map(func(r rune) rune {
		if unicode.IsLetter(r) || unicode.IsDigit(r) || r == '-' || r == '_' {
			return r
		}
		return '_'
	}, topic)

	// Prevent collisions by appending a hash suffix for topics with special chars
	if sanitized != topic {
		hash := sha256.Sum256([]byte(topic))
		suffix := hex.EncodeToString(hash[:])[:8]
		return sanitized + "_" + suffix
	}
	return sanitized
}
T
type

RedisBrokerOption

RedisBrokerOption is a functional option for RedisBroker.

pkg/broker/redis.go:14-14
type RedisBrokerOption options.Option[RedisBroker]
S
struct
Implements: NativeRetrier

RedisBroker

RedisBroker implements the Broker interface using Redis Streams with
consumer groups, dead-letter support, and automatic stale-message reclaim.

pkg/broker/redis.go:18-27
type RedisBroker struct

Methods

HandlesRetries returns true, indicating that RedisBroker manages delivery retries and dead-lettering internally via Redis Streams consumer groups.

Returns

bool
func (*RedisBroker) HandlesRetries() bool
{ return true }
Publish
Method

Publish writes payload to the Redis Stream for the given topic via XADD.

Parameters

topic string
payload []byte

Returns

error
func (*RedisBroker) Publish(ctx context.Context, topic string, payload []byte) error
{
	return r.client.XAdd(ctx, &redis.XAddArgs{
		Stream: topic,
		Values: map[string]any{"payload": payload},
	}).Err()
}
Subscribe
Method

Subscribe ensures the consumer group exists, then starts a goroutine that reads messages from the stream via XREADGROUP.

Parameters

topic string
handler func([]byte) error

Returns

error
func (*RedisBroker) Subscribe(topic string, handler func([]byte) error) error
{
	err := r.client.XGroupCreateMkStream(r.ctx, topic, r.group, "$").Err()
	if err != nil && !isGroupExistsErr(err) {
		return fmt.Errorf("create consumer group: %w", err)
	}
	go r.readLoop(topic, handler)
	return nil
}
readLoop
Method

Parameters

topic string
handler func([]byte) error
func (*RedisBroker) readLoop(topic string, handler func([]byte) error)
{
	dead := topic + ".dead"
	r.reclaimPending(topic, dead, handler)

	for {
		select {
		case <-r.ctx.Done():
			return
		default:
		}

		streams, err := r.client.XReadGroup(r.ctx, &redis.XReadGroupArgs{
			Group:    r.group,
			Consumer: r.consumer,
			Streams:  []string{topic, ">"},
			Count:    10,
			Block:    r.blockTimeout,
		}).Result()
		if err != nil {
			if r.ctx.Err() != nil {
				return
			}
			continue
		}

		for _, stream := range streams {
			for _, msg := range stream.Messages {
				r.processMessage(topic, dead, msg, handler)
			}
		}
	}
}

Parameters

topic string
dead string
msg redis.XMessage
handler func([]byte) error
func (*RedisBroker) processMessage(topic, dead string, msg redis.XMessage, handler func([]byte) error)
{
	payload := extractPayload(msg)

	if err := handler(payload); err == nil {
		r.client.XAck(r.ctx, topic, r.group, msg.ID)
		return
	}

	pending, err := r.client.XPendingExt(r.ctx, &redis.XPendingExtArgs{
		Stream:   topic,
		Group:    r.group,
		Start:    msg.ID,
		End:      msg.ID,
		Count:    1,
		Consumer: r.consumer,
	}).Result()
	if err != nil || len(pending) == 0 {
		return
	}

	if int(pending[0].RetryCount) >= r.maxRetries {
		r.client.XAdd(r.ctx, &redis.XAddArgs{
			Stream: dead,
			Values: map[string]any{"payload": payload, "original_id": msg.ID},
		})
		r.client.XAck(r.ctx, topic, r.group, msg.ID)
	}
}

Parameters

topic string
dead string
handler func([]byte) error
func (*RedisBroker) reclaimPending(topic, dead string, handler func([]byte) error)
{
	start := "0-0"
	for {
		msgs, next, err := r.client.XAutoClaim(r.ctx, &redis.XAutoClaimArgs{
			Stream:   topic,
			Group:    r.group,
			Consumer: r.consumer,
			MinIdle:  r.claimTimeout,
			Start:    start,
			Count:    10,
		}).Result()
		if err != nil {
			return
		}
		for _, msg := range msgs {
			r.processMessage(topic, dead, msg, handler)
		}
		if next == "0-0" || next == "" || len(msgs) < 10 {
			return
		}
		start = next
	}
}

Fields

Name Type Description
client *redis.Client
group string
consumer string
maxRetries int
claimTimeout time.Duration
blockTimeout time.Duration
ctx context.Context
cancel context.CancelFunc
F
function

NewRedisBroker

NewRedisBroker creates a new RedisBroker backed by Redis Streams.

Parameters

client
*redis.Client
opts
...RedisBrokerOption

Returns

pkg/broker/redis.go:30-45
func NewRedisBroker(client *redis.Client, opts ...RedisBrokerOption) *RedisBroker

{
	ctx, cancel := context.WithCancel(context.Background())
	hostname, _ := os.Hostname()
	b := &RedisBroker{
		client:       client,
		group:        "go-relay",
		consumer:     fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano()),
		maxRetries:   3,
		claimTimeout: 30 * time.Second,
		blockTimeout: 2 * time.Second,
		ctx:          ctx,
		cancel:       cancel,
	}
	options.Apply(b, opts...)
	return b
}
F
function

WithGroup

WithGroup sets the consumer group name.

Parameters

group
string
pkg/broker/redis.go:48-52
func WithGroup(group string) RedisBrokerOption

{
	return func(b *RedisBroker) {
		b.group = group
	}
}
F
function

WithRedisMaxRetries

WithRedisMaxRetries sets the maximum delivery attempts before moving a
message to the dead-letter stream.

Parameters

n
int
pkg/broker/redis.go:56-60
func WithRedisMaxRetries(n int) RedisBrokerOption

{
	return func(b *RedisBroker) {
		b.maxRetries = n
	}
}
F
function

WithClaimTimeout

WithClaimTimeout sets the idle duration after which pending messages are
reclaimed from dead workers on startup.

Parameters

pkg/broker/redis.go:64-68
func WithClaimTimeout(d time.Duration) RedisBrokerOption

{
	return func(b *RedisBroker) {
		b.claimTimeout = d
	}
}
F
function

WithBlockTimeout

WithBlockTimeout sets the blocking read duration for XREADGROUP.

Parameters

pkg/broker/redis.go:71-75
func WithBlockTimeout(d time.Duration) RedisBrokerOption

{
	return func(b *RedisBroker) {
		b.blockTimeout = d
	}
}
F
function

extractPayload

Parameters

msg
redis.XMessage

Returns

[]byte
pkg/broker/redis.go:182-191
func extractPayload(msg redis.XMessage) []byte

{
	raw := msg.Values["payload"]
	switch v := raw.(type) {
	case string:
		return []byte(v)
	case []byte:
		return v
	}
	return nil
}
F
function

isGroupExistsErr

Parameters

err
error

Returns

bool
pkg/broker/redis.go:193-195
func isGroupExistsErr(err error) bool

{
	return err != nil && err.Error() == "BUSYGROUP Consumer Group name already exists"
}