manager
package
API reference for the manager package.
Imports (9)
Job
Job represents a unit of work dispatched through a Relay.
type Job struct
Fields
| Name | Type | Description |
|---|---|---|
| ID | string | |
| Queue | string | |
| Topic | string | |
| Payload | []byte | |
| CreatedAt | time.Time | |
| TryCount | int | |
Handler
Handler is the function signature for topic message handlers.
// Handler is the function signature for topic message handlers. T is the
// payload type: Register decodes each raw message into a T and passes it to
// the handler together with a context.
type Handler[T any] func(ctx context.Context, payload T) error
Broker
Broker is the messaging abstraction used by Relay to publish and receive messages.
type Broker interface
Relay
Relay routes typed messages between producers and consumers using a pluggable Broker.
type Relay struct
Methods
Start stores ctx so handlers can observe cancellation, subscribes all registered handlers to the Broker, and blocks until ctx is cancelled.
Parameters
Returns
func (*Relay) Start(ctx context.Context) error
{
r.ctxMu.Lock()
r.ctx = ctx
r.ctxMu.Unlock()
r.handlerMu.RLock()
handlers := make(map[string]func([]byte) error, len(r.handlers))
for topic, wrapperFn := range r.handlers {
handlers[topic] = wrapperFn.(func([]byte) error)
}
r.handlerMu.RUnlock()
for topic, userHandler := range handlers {
var subscribeFn func([]byte) error
if nr, ok := r.broker.(broker.NativeRetrier); (!ok || !nr.HandlesRetries()) && r.maxRetries > 0 {
subscribeFn = r.retryHandler(topic, userHandler)
} else {
topic := topic
subscribeFn = func(data []byte) error {
defer func() {
if rec := recover(); rec != nil {
fmt.Fprintf(os.Stderr, "panic in job %s: %v\n", topic, rec)
}
}()
return userHandler(data)
}
}
if err := r.broker.Subscribe(topic, subscribeFn); err != nil {
return err
}
}
<-ctx.Done()
return nil
}
Parameters
Returns
func (*Relay) retryHandler(topic string, userHandler func([]byte) error) func([]byte) error
{
return func(data []byte) error {
defer func() {
if rec := recover(); rec != nil {
fmt.Fprintf(os.Stderr, "panic in job %s: %v\n", topic, rec)
}
}()
job := Job{Topic: topic, Payload: data}
var lastErr error
for job.TryCount = 0; job.TryCount <= r.maxRetries; job.TryCount++ {
if job.TryCount > 0 {
time.Sleep(time.Duration(1<<uint(job.TryCount-1)) * 100 * time.Millisecond)
}
lastErr = userHandler(data)
if lastErr == nil {
return nil
}
}
return lastErr
}
}
Fields
| Name | Type | Description |
|---|---|---|
| broker | Broker | |
| handlers | map[string]any | |
| handlerMu | sync.RWMutex | |
| ctx | context.Context | |
| ctxMu | sync.RWMutex | |
| maxRetries | int | |
Uses
Option
Option configures a Relay instance.
// Option configures a Relay instance. Options are returned by helpers such as
// WithBroker and WithMaxRetries and are applied to the new Relay by New.
type Option options.Option[Relay]
New
New creates a Relay with optional options. By default it uses an in-process
MemoryBroker suitable for testing and single-process use.
Parameters
Returns
func New(opts ...Option) *Relay
{
r := &Relay{
broker: broker.NewMemoryBroker(),
handlers: make(map[string]any),
}
options.Apply(r, opts...)
return r
}
Example
r := manager.New()
manager.Register(r, "emails", func(ctx context.Context, msg Email) error {
return send(ctx, msg)
})
go r.Start(ctx)
manager.Enqueue(ctx, r, "emails", Email{To: "user@example.com"})
WithBroker
WithBroker configures the Relay to use b as its messaging backend.
func WithBroker(b Broker) Option
{
return func(r *Relay) {
r.broker = b
}
}
WithMaxRetries
WithMaxRetries sets the number of retry attempts for brokers that do not
handle retries natively (e.g. MemoryBroker). Retries use exponential backoff.
Parameters
Returns
func WithMaxRetries(n int) Option
{
return func(r *Relay) {
r.maxRetries = n
}
}
Uses
Register
Register subscribes fn to handle messages of type T on topic. The handler
receives the context passed to Start, allowing cancellation to propagate
into handler logic.
Parameters
func Register[T any](r *Relay, topic string, fn Handler[T])
{
wrapper := func(raw []byte) error {
r.ctxMu.RLock()
ctx := r.ctx
r.ctxMu.RUnlock()
if ctx == nil {
ctx = context.Background()
}
var payload T
if err := json.Unmarshal(raw, &payload); err != nil {
return fmt.Errorf("payload unmarshal failed: %w", err)
}
return fn(ctx, payload)
}
r.handlerMu.Lock()
r.handlers[topic] = wrapper
r.handlerMu.Unlock()
}
Enqueue
Enqueue serialises payload and publishes it to topic via the Relay’s Broker.
Parameters
Returns
func Enqueue[T any](ctx context.Context, r *Relay, topic string, payload T) error
{
data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("payload marshal failed: %w", err)
}
return r.broker.Publish(ctx, topic, data)
}
Example
err := manager.Enqueue(ctx, r, "orders", Order{ID: "abc", Amount: 9.99})
if err != nil {
log.Fatal(err)
}
TestHandlerReceivesStartContext
TestHandlerReceivesStartContext verifies that handlers receive the context
passed to Start, not context.Background(). When Start’s context is cancelled,
an in-flight handler should observe the cancellation.
Parameters
func TestHandlerReceivesStartContext(t *testing.T)
{
r := New()
var receivedCtx context.Context
var mu sync.Mutex
Register(r, "test-topic", func(ctx context.Context, payload string) error {
mu.Lock()
receivedCtx = ctx
mu.Unlock()
return nil
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done := make(chan error, 1)
go func() {
done <- r.Start(ctx)
}()
// Publish a message and wait for handler to run.
if err := Enqueue(ctx, r, "test-topic", "hello"); err != nil {
t.Fatalf("Enqueue failed: %v", err)
}
// Give the handler time to run.
time.Sleep(50 * time.Millisecond)
mu.Lock()
rc := receivedCtx
mu.Unlock()
if rc == nil {
t.Fatal("handler was not called")
}
if rc == context.Background() {
t.Fatal("handler received context.Background() instead of Start's context")
}
// Cancel Start's context and verify it is the same context the handler saw.
cancel()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("Start did not return after context cancel")
}
// The handler's context should also be done now.
select {
case <-rc.Done():
default:
t.Fatal("handler context is not done after Start's context was cancelled")
}
}
TestRegisterAfterStartDoesNotDeadlock
TestRegisterAfterStartDoesNotDeadlock verifies that calling Register after
Start does not block (i.e. handlerMu is not held for Start’s lifetime).
Parameters
func TestRegisterAfterStartDoesNotDeadlock(t *testing.T)
{
r := New()
Register(r, "initial-topic", func(ctx context.Context, payload string) error {
return nil
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
startDone := make(chan error, 1)
go func() {
startDone <- r.Start(ctx)
}()
// Give Start time to enter its subscribe loop and block on <-ctx.Done().
time.Sleep(50 * time.Millisecond)
// Register must not deadlock; complete within the timeout.
registered := make(chan struct{})
go func() {
Register(r, "late-topic", func(ctx context.Context, payload string) error {
return nil
})
close(registered)
}()
select {
case <-registered:
// success
case <-time.After(time.Second):
t.Fatal("Register called after Start deadlocked")
}
cancel()
select {
case <-startDone:
case <-time.After(time.Second):
t.Fatal("Start did not return after context cancel")
}
}