Pool

"Pool of Workers" design pattern

The act.Pool actor implements the low-level gen.ProcessBehavior interface and provides the "Pool of Workers" design pattern functionality. All asynchronous messages and synchronous requests sent to the pool process are redirected to worker processes in the pool.

To launch a process based on act.Pool, you need to embed it in your object. For example:

func factory_myPool() gen.ProcessBehavior {
    return &myPool{}
}

type myPool struct {
    act.Pool
}
...
node.Spawn(factory_myPool, gen.ProcessOptions{})

act.Pool uses the act.PoolBehavior interface to interact with your object:

type PoolBehavior interface {
	gen.ProcessBehavior
	
	Init(args ...any) (PoolOptions, error)

	HandleMessage(from gen.PID, message any) error
	HandleCall(from gen.PID, ref gen.Ref, request any) (any, error)
	Terminate(reason error)

	HandleInspect(from gen.PID) map[string]string
	HandleEvent(message gen.MessageEvent) error
}

The Init method is mandatory for implementation in act.PoolBehavior, while the other methods are optional. Similar to act.Actor, act.Pool has the embedded gen.Process interface, allowing you to use its methods directly from your object:

func (p *myPool) Init(args ...any) (act.PoolOptions, error) {
    var options act.PoolOptions
    p.Log().Info("starting pool process with name %s", p.Name())
    // ...
    // set pool options
    // ...
    return options, nil
}

func (p *myPool) Terminate(reason error) {
    p.Log().Info("pool process terminated with reason: %s", reason)
}
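
The split between a mandatory Init and optional callbacks can be pictured with plain Go embedding. The sketch below is only an illustration and makes no claim about how act.Pool is implemented internally: poolBehavior and basePool are hypothetical, trimmed-down stand-ins, written so the snippet compiles without the framework.

```go
package main

import "fmt"

// poolBehavior is a hypothetical, trimmed-down stand-in for act.PoolBehavior.
type poolBehavior interface {
	Init(args ...any) error
	HandleMessage(message any) error
	Terminate(reason error)
}

// basePool is a hypothetical stand-in for act.Pool: it supplies no-op
// defaults for the optional callbacks but deliberately has no Init.
type basePool struct{}

func (basePool) HandleMessage(message any) error { return nil }
func (basePool) Terminate(reason error)          {}

// myPool embeds basePool, so it only has to add Init to satisfy poolBehavior.
type myPool struct {
	basePool
	started bool
}

func (p *myPool) Init(args ...any) error {
	p.started = true
	return nil
}

// Compile-time check: removing Init above would make this line fail to build.
var _ poolBehavior = (*myPool)(nil)

func main() {
	var b poolBehavior = &myPool{}
	fmt.Println(b.Init(), b.HandleMessage("ping"))
}
```

Overriding an optional callback works the same way: declaring HandleMessage on myPool would shadow the embedded default.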

act.PoolOptions

With act.PoolOptions, you can configure various parameters for the pool, such as:

type PoolOptions struct {
	PoolSize          int64
	WorkerFactory     gen.ProcessFactory
	WorkerMailboxSize int64
	WorkerArgs        []any
}

  • PoolSize: Determines the number of worker processes to be launched when your pool process starts. If this option is not set, the default value of 3 is used.

  • WorkerFactory: A factory function that creates worker processes.

  • WorkerMailboxSize: Specifies the size of the mailbox for each worker process.

  • WorkerArgs: Arguments passed during the startup of worker processes.
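
As a rough illustration of how these options fit together, the sketch below fills in the fields and applies the default pool size. It uses local stand-in types (poolOptions, processFactory, myWorker) that mirror the struct above so that it runs without the framework. The helper effectivePoolSize is hypothetical, and treating a zero PoolSize as "not set" is an assumption.

```go
package main

import "fmt"

// Local stand-ins mirroring the fields of act.PoolOptions shown above.
type processBehavior interface{}
type processFactory func() processBehavior

type poolOptions struct {
	PoolSize          int64
	WorkerFactory     processFactory
	WorkerMailboxSize int64
	WorkerArgs        []any
}

// defaultPoolSize is the default stated in the text.
const defaultPoolSize = 3

// myWorker is a hypothetical worker type used only for this sketch.
type myWorker struct{}

func factoryMyWorker() processBehavior { return &myWorker{} }

// effectivePoolSize returns how many workers would be started.
// Assumption: a zero PoolSize means the option was not set.
func effectivePoolSize(o poolOptions) int64 {
	if o.PoolSize == 0 {
		return defaultPoolSize
	}
	return o.PoolSize
}

func main() {
	options := poolOptions{
		WorkerFactory:     factoryMyWorker,
		WorkerMailboxSize: 100,
		WorkerArgs:        []any{"config.yaml"},
	}
	fmt.Println("workers to start:", effectivePoolSize(options))

	options.PoolSize = 10
	fmt.Println("workers to start:", effectivePoolSize(options))
}
```

In a real pool, the same field values would be set on the act.PoolOptions value returned from Init.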

Load distribution

All asynchronous messages and synchronous requests received in the pool process's mailbox are automatically forwarded to worker processes. Workers are taken from a FIFO queue, which spreads the load evenly across them.

Message/Request Handling Algorithm:

  1. Select a worker from the FIFO queue.

  2. Forward the message/request.

  3. If errors like gen.ErrProcessUnknown or gen.ErrProcessTerminated occur, a new worker is started, and the message is sent to it.

  4. If gen.ErrMailboxFull occurs, the worker is requeued, and the next worker is selected.

  5. Upon successful forwarding, the worker returns to the end of the queue.

If no worker can accept the message (for example, every worker returns gen.ErrMailboxFull), the pool process ignores the message and logs an error, such as "no available worker process. ignored message from <PID>."
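
The forwarding steps above can be sketched as a small standalone simulation. This is not the framework's code: worker, pool, forward, and the error values are hypothetical stand-ins (errProcessTerminated and errMailboxFull play the role of the gen.Err* values), and each pass tries every queued worker at most once, which is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the gen.Err* values named in the text.
var (
	errProcessTerminated = errors.New("process terminated")
	errMailboxFull       = errors.New("mailbox full")
	errNoWorker          = errors.New("no available worker process")
)

// worker simulates a worker process with a bounded mailbox.
type worker struct {
	id      int
	mailbox []string
	limit   int
	dead    bool
}

func (w *worker) send(msg string) error {
	if w.dead {
		return errProcessTerminated
	}
	if len(w.mailbox) >= w.limit {
		return errMailboxFull
	}
	w.mailbox = append(w.mailbox, msg)
	return nil
}

// pool keeps its workers in a FIFO queue.
type pool struct {
	queue  []*worker
	nextID int
	limit  int
}

func newPool(size, mailboxLimit int) *pool {
	p := &pool{limit: mailboxLimit}
	for i := 0; i < size; i++ {
		p.queue = append(p.queue, p.spawn())
	}
	return p
}

func (p *pool) spawn() *worker {
	p.nextID++
	return &worker{id: p.nextID, limit: p.limit}
}

// forward returns the id of the worker that accepted msg.
func (p *pool) forward(msg string) (int, error) {
	for attempts := len(p.queue); attempts > 0; attempts-- {
		w := p.queue[0] // step 1: take the worker at the head of the queue
		p.queue = p.queue[1:]

		err := w.send(msg) // step 2: forward the message
		if errors.Is(err, errProcessTerminated) {
			w = p.spawn() // step 3: start a new worker and send to it
			err = w.send(msg)
		}

		p.queue = append(p.queue, w) // steps 4 and 5: worker goes to the end
		if err == nil {
			return w.id, nil
		}
		// mailbox full: loop on to the next worker
	}
	return 0, errNoWorker // every worker refused the message
}

func main() {
	p := newPool(3, 1) // three workers, one mailbox slot each
	for i := 0; i < 4; i++ {
		id, err := p.forward(fmt.Sprintf("msg-%d", i))
		fmt.Println(id, err)
	}
}
```

With three single-slot workers, the first three messages land on different workers in queue order, and the fourth finds every mailbox full, which corresponds to the "no available worker process" case.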

Forwarding to workers only happens for messages from the Main queue. To ensure a message is handled by the pool process itself, send it with a priority higher than gen.MessagePriorityNormal. The gen.Process interface provides SendWithPriority and CallWithPriority methods for sending messages or making synchronous requests with a specified priority.
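
The routing rule can be reduced to a single comparison. The sketch below is a simplified model, not framework code: messagePriority, priorityNormal, priorityHigh, and handler are hypothetical stand-ins, with priorityNormal playing the role of gen.MessagePriorityNormal.

```go
package main

import "fmt"

// messagePriority is a hypothetical stand-in for the framework's priority type.
type messagePriority int

const (
	priorityNormal messagePriority = iota // stand-in for gen.MessagePriorityNormal
	priorityHigh                          // stand-in for any higher priority level
)

// handler names who ends up processing a message that arrives at the pool.
func handler(p messagePriority) string {
	if p > priorityNormal {
		return "pool" // does not come from the Main queue, so it is not forwarded
	}
	return "worker" // Main-queue traffic is forwarded to a worker
}

func main() {
	fmt.Println(handler(priorityNormal), handler(priorityHigh))
}
```

In real code the sender chooses the priority through SendWithPriority or CallWithPriority, and the pool's own HandleMessage or HandleCall receives the result.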

Methods of act.Pool

To dynamically manage the number of worker processes, act.Pool provides the following methods:

  • AddWorkers: This method adds a specified number of worker processes to the pool. Upon successful execution, it returns the total number of worker processes in the pool after the addition.

  • RemoveWorkers: This method stops and removes a specified number of worker processes from the pool. Upon successful execution, it returns the total number of worker processes remaining in the pool after the removal.
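
The return values of both methods can be illustrated with a counter-only model. The sketch below is hypothetical: workerPool, addWorkers, and removeWorkers are stand-ins rather than the act.Pool methods themselves, and the argument validation is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// workerPool is a hypothetical stand-in that tracks only the worker count.
type workerPool struct{ workers int64 }

// addWorkers grows the pool by n and returns the new total.
func (p *workerPool) addWorkers(n int) (int64, error) {
	if n < 1 {
		return p.workers, errors.New("n must be positive") // assumed validation
	}
	p.workers += int64(n)
	return p.workers, nil
}

// removeWorkers shrinks the pool by n and returns the remaining total.
func (p *workerPool) removeWorkers(n int) (int64, error) {
	if n < 1 || int64(n) > p.workers {
		return p.workers, errors.New("invalid number of workers") // assumed validation
	}
	p.workers -= int64(n)
	return p.workers, nil
}

func main() {
	p := &workerPool{workers: 3}

	total, _ := p.addWorkers(2)
	fmt.Println("after add:", total)

	total, _ = p.removeWorkers(4)
	fmt.Println("after remove:", total)
}
```

Returning the resulting total lets the caller confirm the pool size without a separate query.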
