mirror of
https://github.com/TecharoHQ/anubis.git
synced 2025-10-04 05:41:59 +08:00
* feat(store/bbolt): implement actor pattern Signed-off-by: Xe Iaso <me@xeiaso.net> * docs(internal/actorify): document package Signed-off-by: Xe Iaso <me@xeiaso.net> * Update metadata check-spelling run (pull_request) for Xe/actorify Signed-off-by: check-spelling-bot <check-spelling-bot@users.noreply.github.com> on-behalf-of: @check-spelling <check-spelling-bot@check-spelling.dev> --------- Signed-off-by: Xe Iaso <me@xeiaso.net> Signed-off-by: check-spelling-bot <check-spelling-bot@users.noreply.github.com>
107 lines
2.5 KiB
Go
107 lines
2.5 KiB
Go
// Package actorify lets you transform a parallel operation into a serialized
// operation via the Actor pattern[1].
//
// [1]: https://en.wikipedia.org/wiki/Actor_model
|
|
package actorify
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
)
|
|
|
|
// z returns the zero value of Z. It is used to build the value half of an
// (Output, error) return when only the error is meaningful.
func z[Z any]() Z {
	var zero Z
	return zero
}
|
|
|
|
var (
	// ErrActorDied is returned when the actor inbox or reply channel was closed.
	// Compare against it with errors.Is.
	ErrActorDied = errors.New("actorify: the actor inbox or reply channel was closed")
)
|
|
|
|
// Handler is the function type for the underlying logic the Actor should call.
// It receives a context and a single Input and returns an Output or an error.
type Handler[Input, Output any] func(ctx context.Context, input Input) (Output, error)
|
|
|
|
// Actor is a serializing wrapper that runs a function in a background goroutine.
|
|
// Whenever the Call method is invoked, a message is sent to the actor's inbox and then
|
|
// the callee waits for a response. Depending on how busy the actor is, this may take
|
|
// a moment.
|
|
type Actor[Input, Output any] struct {
|
|
handler Handler[Input, Output]
|
|
inbox chan *message[Input, Output]
|
|
}
|
|
|
|
type message[Input, Output any] struct {
|
|
ctx context.Context
|
|
arg Input
|
|
reply chan reply[Output]
|
|
}
|
|
|
|
// reply carries both return values of a Handler invocation over a channel.
type reply[Output any] struct {
	// output is the value returned by the handler.
	output Output
	// err is the error returned by the handler, if any.
	err error
}
|
|
|
|
// New constructs a new Actor and starts its background thread. Cancel the context and you cancel
|
|
// the Actor.
|
|
func New[Input, Output any](ctx context.Context, handler Handler[Input, Output]) *Actor[Input, Output] {
|
|
result := &Actor[Input, Output]{
|
|
handler: handler,
|
|
inbox: make(chan *message[Input, Output], 32),
|
|
}
|
|
|
|
go result.handle(ctx)
|
|
|
|
return result
|
|
}
|
|
|
|
func (a *Actor[Input, Output]) handle(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
close(a.inbox)
|
|
return
|
|
case msg, ok := <-a.inbox:
|
|
if !ok {
|
|
if msg.reply != nil {
|
|
close(msg.reply)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
result, err := a.handler(msg.ctx, msg.arg)
|
|
|
|
reply := reply[Output]{
|
|
output: result,
|
|
err: err,
|
|
}
|
|
|
|
msg.reply <- reply
|
|
}
|
|
}
|
|
}
|
|
|
|
// Call calls the Actor with a given Input and returns the handler's Output.
|
|
//
|
|
// This only works with unary functions by design. If you need to have more inputs, define
|
|
// a struct type to use as a container.
|
|
func (a *Actor[Input, Output]) Call(ctx context.Context, input Input) (Output, error) {
|
|
replyCh := make(chan reply[Output])
|
|
|
|
a.inbox <- &message[Input, Output]{
|
|
arg: input,
|
|
reply: replyCh,
|
|
}
|
|
|
|
select {
|
|
case reply, ok := <-replyCh:
|
|
if !ok {
|
|
return z[Output](), ErrActorDied
|
|
}
|
|
|
|
return reply.output, reply.err
|
|
case <-ctx.Done():
|
|
return z[Output](), context.Cause(ctx)
|
|
}
|
|
}
|