manager API

manager

package

API reference for the manager package.

S
struct

Job

Job represents a unit of work dispatched through a Relay.

pkg/manager/manager.go:16-23
type Job struct

Fields

Name Type Description
ID string
Queue string
Topic string
Payload []byte
CreatedAt time.Time
TryCount int
T
type

Handler

Handler is the function signature for topic message handlers.

pkg/manager/manager.go:26-26
type Handler func(ctx context.Context, payload T) error
I
interface

Broker

Broker is the messaging abstraction used by Relay to publish and receive messages.

pkg/manager/manager.go:29-32
type Broker interface

Methods

Publish
Method

Parameters

topic string
payload []byte

Returns

error
func Publish(...)
Subscribe
Method

Parameters

topic string
handler func(payload []byte) error

Returns

error
func Subscribe(...)
S
struct

Relay

Relay routes typed messages between producers and consumers using a pluggable Broker.

pkg/manager/manager.go:35-42
type Relay struct

Methods

Start
Method

Start stores ctx so handlers can observe cancellation, subscribes all registered handlers to the Broker, and blocks until ctx is cancelled.

Parameters

Returns

error
func (*Relay) Start(ctx context.Context) error
{
	r.ctxMu.Lock()
	r.ctx = ctx
	r.ctxMu.Unlock()

	r.handlerMu.RLock()
	handlers := make(map[string]func([]byte) error, len(r.handlers))
	for topic, wrapperFn := range r.handlers {
		handlers[topic] = wrapperFn.(func([]byte) error)
	}
	r.handlerMu.RUnlock()

	for topic, userHandler := range handlers {
		var subscribeFn func([]byte) error
		if nr, ok := r.broker.(broker.NativeRetrier); (!ok || !nr.HandlesRetries()) && r.maxRetries > 0 {
			subscribeFn = r.retryHandler(topic, userHandler)
		} else {
			topic := topic
			subscribeFn = func(data []byte) error {
				defer func() {
					if rec := recover(); rec != nil {
						fmt.Fprintf(os.Stderr, "panic in job %s: %v\n", topic, rec)
					}
				}()
				return userHandler(data)
			}
		}

		if err := r.broker.Subscribe(topic, subscribeFn); err != nil {
			return err
		}
	}

	<-ctx.Done()
	return nil
}
retryHandler
Method

Parameters

topic string
userHandler func([]byte) error

Returns

func([]byte) error
func (*Relay) retryHandler(topic string, userHandler func([]byte) error) func([]byte) error
{
	return func(data []byte) error {
		defer func() {
			if rec := recover(); rec != nil {
				fmt.Fprintf(os.Stderr, "panic in job %s: %v\n", topic, rec)
			}
		}()

		job := Job{Topic: topic, Payload: data}
		var lastErr error
		for job.TryCount = 0; job.TryCount <= r.maxRetries; job.TryCount++ {
			if job.TryCount > 0 {
				time.Sleep(time.Duration(1<<uint(job.TryCount-1)) * 100 * time.Millisecond)
			}
			lastErr = userHandler(data)
			if lastErr == nil {
				return nil
			}
		}
		return lastErr
	}
}

Fields

Name Type Description
broker Broker
handlers map[string]any
handlerMu sync.RWMutex
ctx context.Context
ctxMu sync.RWMutex
maxRetries int
T
type

Option

Option configures a Relay instance.

pkg/manager/manager.go:45-45
type Option options.Option[Relay]
F
function

New

New creates a Relay with optional options. By default it uses an in-process
MemoryBroker suitable for testing and single-process use.

Parameters

opts
...Option

Returns

pkg/manager/manager.go:58-65
func New(opts ...Option) *Relay

{
	r := &Relay{
		broker:   broker.NewMemoryBroker(),
		handlers: make(map[string]any),
	}
	options.Apply(r, opts...)
	return r
}

Example

r := manager.New()
manager.Register(r, "emails", func(ctx context.Context, msg Email) error {
	return send(ctx, msg)
})
go r.Start(ctx)
manager.Enqueue(ctx, r, "emails", Email{To: "[email protected]"})
F
function

WithBroker

WithBroker configures the Relay to use b as its messaging backend.

Parameters

b

Returns

pkg/manager/manager.go:68-72
func WithBroker(b Broker) Option

{
	return func(r *Relay) {
		r.broker = b
	}
}
F
function

WithMaxRetries

WithMaxRetries sets the number of retry attempts for brokers that do not
handle retries natively (e.g. MemoryBroker). Retries use exponential backoff.

Parameters

n
int

Returns

pkg/manager/manager.go:76-80
func WithMaxRetries(n int) Option

{
	return func(r *Relay) {
		r.maxRetries = n
	}
}
F
function

Register

Register subscribes fn to handle messages of type T on topic. The handler
receives the context passed to Start, allowing cancellation to propagate
into handler logic.

Parameters

r
topic
string
fn
Handler[T]
pkg/manager/manager.go:85-104
func Register[T any](r *Relay, topic string, fn Handler[T])

{
	wrapper := func(raw []byte) error {
		r.ctxMu.RLock()
		ctx := r.ctx
		r.ctxMu.RUnlock()
		if ctx == nil {
			ctx = context.Background()
		}

		var payload T
		if err := json.Unmarshal(raw, &payload); err != nil {
			return fmt.Errorf("payload unmarshal failed: %w", err)
		}
		return fn(ctx, payload)
	}

	r.handlerMu.Lock()
	r.handlers[topic] = wrapper
	r.handlerMu.Unlock()
}
F
function

Enqueue

Enqueue serialises payload and publishes it to topic via the Relay’s Broker.

Parameters

r
topic
string
payload
T

Returns

error
pkg/manager/manager.go:114-121
func Enqueue[T any](ctx context.Context, r *Relay, topic string, payload T) error

{
	data, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("payload marshal failed: %w", err)
	}

	return r.broker.Publish(ctx, topic, data)
}

Example

err := manager.Enqueue(ctx, r, "orders", Order{ID: "abc", Amount: 9.99})
if err != nil {
	log.Fatal(err)
}
F
function

TestHandlerReceivesStartContext

TestHandlerReceivesStartContext verifies that handlers receive the context
passed to Start, not context.Background(). When Start’s context is cancelled,
an in-flight handler should observe the cancellation.

Parameters

pkg/manager/manager_test.go:13-67
func TestHandlerReceivesStartContext(t *testing.T)

{
	r := New()

	var receivedCtx context.Context
	var mu sync.Mutex

	Register(r, "test-topic", func(ctx context.Context, payload string) error {
		mu.Lock()
		receivedCtx = ctx
		mu.Unlock()
		return nil
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done := make(chan error, 1)
	go func() {
		done <- r.Start(ctx)
	}()

	// Publish a message and wait for handler to run.
	if err := Enqueue(ctx, r, "test-topic", "hello"); err != nil {
		t.Fatalf("Enqueue failed: %v", err)
	}

	// Give the handler time to run.
	time.Sleep(50 * time.Millisecond)

	mu.Lock()
	rc := receivedCtx
	mu.Unlock()

	if rc == nil {
		t.Fatal("handler was not called")
	}
	if rc == context.Background() {
		t.Fatal("handler received context.Background() instead of Start's context")
	}

	// Cancel Start's context and verify it is the same context the handler saw.
	cancel()
	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("Start did not return after context cancel")
	}

	// The handler's context should also be done now.
	select {
	case <-rc.Done():
	default:
		t.Fatal("handler context is not done after Start's context was cancelled")
	}
}
F
function

TestRegisterAfterStartDoesNotDeadlock

TestRegisterAfterStartDoesNotDeadlock verifies that calling Register after
Start does not block (i.e. handlerMu is not held for Start’s lifetime).

Parameters

pkg/manager/manager_test.go:71-111
func TestRegisterAfterStartDoesNotDeadlock(t *testing.T)

{
	r := New()

	Register(r, "initial-topic", func(ctx context.Context, payload string) error {
		return nil
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	startDone := make(chan error, 1)
	go func() {
		startDone <- r.Start(ctx)
	}()

	// Give Start time to enter its subscribe loop and block on <-ctx.Done().
	time.Sleep(50 * time.Millisecond)

	// Register must not deadlock; complete within the timeout.
	registered := make(chan struct{})
	go func() {
		Register(r, "late-topic", func(ctx context.Context, payload string) error {
			return nil
		})
		close(registered)
	}()

	select {
	case <-registered:
		// success
	case <-time.After(time.Second):
		t.Fatal("Register called after Start deadlocked")
	}

	cancel()
	select {
	case <-startDone:
	case <-time.After(time.Second):
		t.Fatal("Start did not return after context cancel")
	}
}