Ergo Framework Documentation

Overview

Ergo Framework is an implementation of ideas, technologies, and design patterns from the Erlang world in the Go programming language. It is built on the actor model, network transparency, and a set of ready-to-use components for development. This makes it significantly easier to create complex and distributed solutions while maintaining a high level of reliability and performance.

Node

The management of starting and stopping processes (actors), as well as routing messages between them, is handled by the node. The node is also responsible for network communication. If a message is sent to a remote process, the node automatically establishes a network connection with the remote node where the process is running, thus ensuring network transparency.

Actors

Actors in Ergo Framework are lightweight processes that live on top of goroutines. Interaction between processes occurs through message passing. Each process has a mailbox with multiple queues to prioritize message handling. Actors can exchange asynchronous messages and also make synchronous requests to each other.

Networking

Each node has a built-in Service Discovery function. Typically, the first node launched on a host becomes the registrar for other nodes running on the same host. Upon registration, each node reports its name, the port number for incoming connections, and a set of additional parameters.

Network transparency in Ergo Framework is achieved through the ENP (Ergo Network Protocol) and EDF (Ergo Data Format). These are based on ideas from Erlang's network stack but are not compatible with it. Therefore, to communicate with Erlang nodes, an additional Erlang package must be used.

To maximize performance in network message exchange, Ergo Framework uses a pool of multiple TCP connections combined into a single logical network connection.

Performance

Ergo Framework demonstrates high performance in local message exchange due to the use of lock-free queues within the process mailbox and the efficient use of goroutines to handle the process itself. A goroutine is only activated when the process receives a message; otherwise, the process remains idle and does not consume CPU resources.
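
The on-demand activation described above can be illustrated with a small model. This is only a sketch under stated assumptions, not Ergo's implementation: it guards a slice with a mutex instead of using lock-free queues, and the names process, Send, run, and Wait are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// process is a hypothetical model of an actor whose goroutine exists
// only while there are messages to handle.
type process struct {
	mu      sync.Mutex
	queue   []string // pending messages (mutex-guarded stand-in for a lock-free queue)
	running bool     // true while a goroutine is draining the queue
	starts  int      // how many times a goroutine was launched
	handled []string // messages processed so far, in order
	wg      sync.WaitGroup
}

// Send enqueues a message and starts a goroutine only if none is running.
func (p *process) Send(msg string) {
	p.mu.Lock()
	p.queue = append(p.queue, msg)
	if p.running {
		p.mu.Unlock()
		return
	}
	p.running = true
	p.starts++
	p.wg.Add(1)
	p.mu.Unlock()
	go p.run()
}

// run drains the queue and exits as soon as the queue is empty.
func (p *process) run() {
	defer p.wg.Done()
	for {
		p.mu.Lock()
		if len(p.queue) == 0 {
			p.running = false
			p.mu.Unlock()
			return
		}
		msg := p.queue[0]
		p.queue = p.queue[1:]
		p.handled = append(p.handled, msg)
		p.mu.Unlock()
	}
}

// Wait blocks until the process is idle again.
func (p *process) Wait() { p.wg.Wait() }

func main() {
	p := &process{}
	p.Send("a")
	p.Send("b")
	p.Wait()
	fmt.Println(p.handled, "goroutine launches:", p.starts)
}
```

Between bursts of messages no goroutine exists for the process, so an idle process costs only the memory of its struct.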

You can evaluate the performance yourself using the benchmarks provided at https://github.com/ergo-services/benchmarks.

Requirements

In the development of Ergo Framework, we adhere to the concept of zero dependencies. All functionality is implemented using only the standard Go library. The only requirement for using Ergo Framework is the version of the Go language. Starting from version 3.0, Ergo Framework depends on Go 1.20 or higher.

Supervision Tree

process control

To create fault-tolerant applications, Ergo Framework introduces a process structuring model. The core idea of this model is to divide processes into two types:

  • worker

  • supervisor

Worker processes perform computations, while supervisors are responsible solely for managing worker processes.

In Ergo Framework, worker processes are actors based on act.Actor. Supervisors, on the other hand, are actors based on act.Supervisor. The role of act.Supervisor is to start child processes and restart them according to the chosen restart strategy. Several restart strategies are available for this purpose. A child process can be not only an actor based on act.Actor but also a supervisor based on act.Supervisor. This allows you to form a hierarchical structure for managing processes. Thanks to this approach, your solution becomes more reliable and fault-tolerant.

CertManager

Ergo Framework introduces gen.CertManager to simplify TLS certificate management. It enables live certificate updates without restarting the application and can be used across all network components like node acceptors, web, or TCP servers.

Use the function gen.CreateCertManager(cert tls.Certificate) to create a certificate manager. The gen.CertManager interface provides the following methods:

  • Update: Update the TLS certificate on the fly. All components using this CertManager will automatically receive the updated certificate.

  • GetCertificateFunc: Returns a closure to retrieve the TLS certificate.

  • GetCertificate: Returns the current TLS certificate.

Additionally, you can use the lib.GenerateSelfSignedCert function to generate a self-signed certificate.

import (
    ...
    "ergo.services/ergo"
    "ergo.services/ergo/gen"
    "ergo.services/ergo/lib"
    ...
)

var (
    Version = gen.Version{
        Name:    "DemoService",
        Release: "1.0",
    }
)

func main() {
    var options gen.NodeOptions
    ...
    // generate a self-signed certificate for this service
    cert, err := lib.GenerateSelfSignedCert(Version.String())
    if err != nil {
        panic(err)
    }
    // share one certificate manager across the node's network components
    options.CertManager = gen.CreateCertManager(cert)
    ...
    node, err := ergo.StartNode("node@localhost", options)
    if err != nil {
        panic(err)
    }

    node.Wait()
}
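
To show why a closure makes live certificate updates possible, here is a minimal standard-library sketch. It is not the gen.CertManager implementation: certHolder, placeholderCert, and certTag are hypothetical names, and the certificates are fake placeholders that carry only a tag.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"sync/atomic"
)

// certHolder is a hypothetical stand-in for a certificate manager:
// it stores the current certificate and lets it be swapped at runtime.
type certHolder struct {
	current atomic.Pointer[tls.Certificate]
}

func newCertHolder(cert tls.Certificate) *certHolder {
	h := &certHolder{}
	h.current.Store(&cert)
	return h
}

// Update replaces the certificate; later lookups see the new one.
func (h *certHolder) Update(cert tls.Certificate) {
	h.current.Store(&cert)
}

// GetCertificateFunc returns a closure suitable for tls.Config.GetCertificate.
func (h *certHolder) GetCertificateFunc() func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
	return func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
		return h.current.Load(), nil
	}
}

// placeholderCert builds a fake certificate that carries only a tag (demo only).
func placeholderCert(tag string) tls.Certificate {
	return tls.Certificate{Certificate: [][]byte{[]byte(tag)}}
}

// certTag reads the tag back from a placeholder certificate.
func certTag(c *tls.Certificate) string {
	return string(c.Certificate[0])
}

func main() {
	h := newCertHolder(placeholderCert("old"))
	cfg := &tls.Config{GetCertificate: h.GetCertificateFunc()}

	c, _ := cfg.GetCertificate(nil)
	fmt.Println(certTag(c)) // prints "old"

	h.Update(placeholderCert("new"))
	c, _ = cfg.GetCertificate(nil)
	fmt.Println(certTag(c)) // prints "new"
}
```

Because the tls.Config holds the closure rather than the certificate itself, every new handshake picks up whatever certificate is current at that moment.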

Applications

The additional application library for Ergo Framework contains packages with a narrow specialization or external dependencies since Ergo Framework adheres to the "zero dependencies" principle.

You can find the source code of these applications in the application library repository at https://github.com/ergo-services/application.

Loggers

An extra library of logger implementations that are not included in the standard Ergo Framework library. This library contains packages with a narrow specialization. It also includes packages that have external dependencies, as Ergo Framework adheres to a "zero dependency" policy.

Meta-Processes

An extra library of meta-process implementations not included in the standard Ergo Framework library. This library contains packages with a narrow specialization. It also includes packages with external dependencies, as Ergo Framework adheres to a "zero dependency" policy.

UDP

This meta-process allows you to launch a UDP server and handle incoming UDP packets as asynchronous messages of type meta.MessageUDP.

To create the meta-process, use the meta.CreateUDPServer function. This function takes meta.UDPServerOptions as an argument, which specifies the configuration options for the UDP server, such as the host, port, buffer size, and other relevant settings for managing UDP traffic:

type UDPServerOptions struct {
	Host       string
	Port       uint16
	Process    gen.Atom
	BufferSize int
	BufferPool *sync.Pool
}

  • Host, Port: Set the host name and port number for the UDP server.

  • Process: The name of the process that will handle meta.MessageUDP messages. If not specified, all messages will be sent to the parent process. You can also use an act.Pool process for distributing message handling.

  • BufferSize: Sets the buffer size created for incoming UDP messages. For each message, a new buffer of this size is allocated. This field is ignored if BufferPool is provided.

  • BufferPool: Defines a pool for message buffers. The Get method should return an allocated buffer of type []byte.
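
A buffer pool of the kind BufferPool expects can be built with sync.Pool from the standard library. The sketch below assumes only what the text states (Get must return an allocated []byte); newBufferPool and handlePacket are hypothetical helper names, and no UDP socket is involved.

```go
package main

import (
	"fmt"
	"sync"
)

// newBufferPool returns a pool whose Get yields an allocated []byte of the given size.
func newBufferPool(size int) *sync.Pool {
	return &sync.Pool{
		New: func() any {
			return make([]byte, size)
		},
	}
}

// handlePacket copies a payload into a pooled buffer, reads it back,
// and returns the buffer to the pool so that it can be reused.
func handlePacket(pool *sync.Pool, payload []byte) string {
	buf := pool.Get().([]byte)
	defer pool.Put(buf)
	n := copy(buf, payload) // a payload longer than the buffer is truncated
	return string(buf[:n])
}

func main() {
	pool := newBufferPool(2048)
	fmt.Println(handlePacket(pool, []byte("ping"))) // prints "ping"
}
```

A pool avoids allocating a new buffer for every packet, which is the cost that the BufferSize option alone would incur.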

If the meta-process is successfully created, the UDP server will start, and the function meta.CreateUDPServer will return gen.MetaBehavior.

Next, the created meta-process must be started. It will handle incoming UDP packets and forward them to the process for handling. To start the meta-process, use the SpawnMeta method of the gen.Process interface.

If an error occurs when starting the meta-process, the UDP server created by the meta.CreateUDPServer function must be stopped using the Terminate method of the gen.MetaBehavior interface.

Stopping the meta-process results in the shutdown of the UDP server and the closing of the socket.

Sending a UDP packet

To send a UDP packet, use the Send (or SendAlias) method from the gen.Process interface. The recipient should be the identifier gen.Alias of the UDP server meta-process. The message should be of type meta.MessageUDP. The meta.MessageUDP.ID field is ignored when sending (it is used only for incoming messages).

If a BufferPool was used in the UDP server options, the buffer in meta.MessageUDP.Data will be returned to the pool after the packet is sent.

For an example UDP server implementation, see the project demo at https://github.com/ergo-services/examples.

Registrars

An extra library of registrars or client implementations not included in the standard Ergo Framework library. This library contains packages with a narrow specialization. It also includes packages with external dependencies, as Ergo Framework follows a "zero dependency" policy.

Available Registrars

Saturn Client

A client library for the Saturn central registrar. Provides service discovery, configuration management, and real-time cluster event notifications through a centralized registrar service.

Features:

  • Centralized service discovery

  • Real-time event notifications

  • Configuration management

  • TLS security support

  • Token-based authentication

etcd Client

A client library for etcd, a distributed key-value store. Provides decentralized service discovery, hierarchical configuration management with type conversion, and automatic lease management.

Features:

  • Distributed service discovery

  • Hierarchical configuration with type conversion from strings ("int:123", "float:3.14")

  • Automatic lease management and cleanup

  • Real-time cluster change notifications

  • TLS/authentication support

  • Four-level configuration priority system

Choose Saturn for centralized management with a dedicated registrar service, or etcd for a distributed approach with built-in consensus and reliability guarantees.
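
The type conversion from strings mentioned in the etcd feature list ("int:123", "float:3.14") could work roughly as follows. This is a hypothetical sketch, not the etcd client's code: parseTyped is an invented name, and only the two prefixes shown in the text are handled.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseTyped converts strings such as "int:123" or "float:3.14" into typed
// values. Strings without a known prefix are returned unchanged.
func parseTyped(s string) (any, error) {
	prefix, rest, found := strings.Cut(s, ":")
	if !found {
		return s, nil
	}
	switch prefix {
	case "int":
		n, err := strconv.ParseInt(rest, 10, 64)
		if err != nil {
			return nil, err
		}
		return n, nil
	case "float":
		f, err := strconv.ParseFloat(rest, 64)
		if err != nil {
			return nil, err
		}
		return f, nil
	default:
		return s, nil
	}
}

func main() {
	for _, raw := range []string{"int:123", "float:3.14", "plain text"} {
		v, err := parseTyped(raw)
		fmt.Printf("%q -> %v (%T), err=%v\n", raw, v, v, err)
	}
}
```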

Observer

The Application Observer provides a convenient web interface to view node status, network activity, and running processes in a node built with Ergo Framework. Additionally, it allows you to inspect the internal state of processes or meta-processes. The application can also be used as a standalone tool, Observer. For more details, see the section Inspecting With Observer. You can add the Observer application to your node during startup by including it in the node's startup options:

import (
	"ergo.services/ergo"
	"ergo.services/application/observer"
	"ergo.services/ergo/gen"
)

func main() {
	opt := gen.NodeOptions{
		Applications: []gen.ApplicationBehavior{
			observer.CreateApp(observer.Options{}),
		},
	}
	node, err := ergo.StartNode("example@localhost", opt)
	if err != nil {
		panic(err)
	}
	node.Wait()
}

The function observer.CreateApp takes observer.Options as an argument, allowing you to configure the Observer application. You can set:

  • Port: The port number for the web server (default: 9911 if not specified).

  • Host: The interface name (default: localhost).

  • LogLevel: The logging level for the Observer application (useful for debugging). The default is gen.LogLevelInfo.

Network Protocols

The Ergo Framework allows nodes to run with various network stacks. You can replace the default network stack or add another one as an additional stack. For more information, refer to the Network Stack section.

This library contains implementations of network stacks that are not part of the standard Ergo Framework library.

Remote Spawn Process

The network stack in Ergo Framework includes the capability to spawn processes on remote nodes. Nodes can control access to this feature or disable it entirely by using the EnableRemoteSpawn flag in gen.NetworkFlags. By default, this feature is enabled.

Access Control

To manage the ability of remote nodes to spawn processes, the gen.Network interface provides two methods:

EnableSpawn(name gen.Atom, factory gen.ProcessFactory, nodes ...gen.Atom) error
DisableSpawn(name gen.Atom, nodes ...gen.Atom) error

The name argument specifies the name under which the process factory will be registered in the network stack. Remote nodes must use this name when requesting to spawn a process.

Using EnableSpawn with the nodes argument creates an access list, allowing only the specified remote nodes to spawn processes with the registered factory. Conversely, using DisableSpawn with the nodes argument removes the specified nodes from the access list. If the list becomes empty, access is open to all nodes. To fully disable access to the process factory, use DisableSpawn without the nodes argument.

Spawn

To spawn a process on a remote node, you need to use the gen.RemoteNode interface, which can be obtained using the GetNode or Node methods of the gen.Network interface. The gen.RemoteNode interface provides two methods:

Spawn(name gen.Atom, options gen.ProcessOptions, args ...any) (gen.PID, error)
SpawnRegister(register gen.Atom, name gen.Atom, options gen.ProcessOptions, args ...any) (gen.PID, error)

For the name argument, you must use the name under which the process factory is registered on the remote node.

Just like with local process spawning, the process started on the remote node will inherit certain parameters from its parent. In this case, the parent is the virtual identifier of the node that sent the spawn request. The process will also inherit the logging level.

To inherit environment variables, you need to enable the ExposeEnvRemoteSpawn option in gen.NodeOptions.Security when starting your node. The environment variable values must be encodable and decodable using the EDF format. If you are using a custom type as the value of an environment variable, that type must be registered (see the Network Transparency section).

Upon success, these methods return the process identifier (gen.PID) of the process that was successfully started on the remote node.

You can also spawn a process on a remote node using the methods provided by the gen.Process interface:

RemoteSpawn(node gen.Atom, name gen.Atom, options gen.ProcessOptions, args ...any) (gen.PID, error)
RemoteSpawnRegister(node gen.Atom, name gen.Atom, register gen.Atom, options gen.ProcessOptions, args ...any) (gen.PID, error)

When using these methods, the process started on the remote node will inherit parameters from the parent process, including the application name, logging level, and environment variables (if the ExposeEnvRemoteSpawn flag in gen.NodeOptions.Security was enabled).

Note that linking options in gen.ProcessOptions will be ignored when spawning processes remotely.
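
The access rules described under Access Control for remote spawning can be modeled in a few lines. This is a hypothetical model of the documented behavior of EnableSpawn and DisableSpawn for a single factory, not code from the framework: spawnACL and its methods are invented names, and cases the text does not cover (such as calling Enable again without nodes) simply do whatever this model happens to do.

```go
package main

import "fmt"

// spawnACL is a hypothetical model of the access rules for one registered factory.
type spawnACL struct {
	enabled bool
	allowed map[string]bool // an empty list means "open to all nodes"
}

// Enable registers the factory; any nodes given are added to the access list.
func (a *spawnACL) Enable(nodes ...string) {
	a.enabled = true
	if a.allowed == nil {
		a.allowed = map[string]bool{}
	}
	for _, n := range nodes {
		a.allowed[n] = true
	}
}

// Disable without nodes turns the factory off entirely; with nodes,
// it removes them from the access list.
func (a *spawnACL) Disable(nodes ...string) {
	if len(nodes) == 0 {
		a.enabled = false
		a.allowed = nil
		return
	}
	for _, n := range nodes {
		delete(a.allowed, n)
	}
}

// Allows reports whether the given remote node may spawn with this factory.
func (a *spawnACL) Allows(node string) bool {
	if !a.enabled {
		return false
	}
	return len(a.allowed) == 0 || a.allowed[node]
}

func main() {
	acl := &spawnACL{}
	acl.Enable("a@host", "b@host")
	fmt.Println(acl.Allows("a@host"), acl.Allows("c@host")) // true false

	acl.Disable("a@host", "b@host") // list is empty again: open to all
	fmt.Println(acl.Allows("c@host")) // true

	acl.Disable() // fully disabled
	fmt.Println(acl.Allows("a@host")) // false
}
```

Note the asymmetry: removing the last node from the list widens access to everyone, so fully closing a factory requires the call without nodes.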

Service Discovering

Registrar

The Service Discovering mechanism allows nodes to automatically find other nodes and determine connection parameters.

Each node has a Registrar that starts during the node's initialization and operates in either server or client mode. In Ergo Framework, the default Registrar implementation is available in the package ergo.services/ergo/net/registrar. If you are using Saturn, you need to use the corresponding client available in ergo.services/registrar/saturn. For communication with Erlang nodes, you should use the implementation from ergo.services/proto/erlang23/epmd.

The default Registrar, when running in server mode, opens a TCP socket on localhost:4499 to register other nodes on the host and a UDP socket on *:4499 to handle resolve requests from other nodes, including remote ones. If the Registrar fails to start in server mode, it switches to client mode and registers with the Registrar that is running in server mode on another node on the same host.

When a node starts, it establishes a TCP connection with the Registrar server, which remains open until the node terminates. During registration, your node communicates its name, information about all acceptors, and their parameters (port number, Handshake/Proto version, TLS flag) to the Registrar. If using Saturn, additional information about running applications on the node is also communicated (this functionality is not supported by the built-in Registrar).

When a node running the Registrar in server mode terminates, other nodes on the same host automatically attempt to switch to server mode. The node that first successfully opens the TCP socket on localhost:4499 will switch to server mode, while the remaining nodes on the host will continue to operate in client mode and automatically register with the new Registrar running in server mode.
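
The server-or-client decision comes down to whether the TCP socket can be opened. The sketch below shows that idea with the standard net package only. It is an illustration under assumptions, not the Registrar's code: startRegistrar is a hypothetical function, and the demo binds to an OS-assigned loopback port instead of 4499 so that it does not collide with a real node.

```go
package main

import (
	"fmt"
	"net"
)

// startRegistrar tries to take the server role by opening a TCP socket on addr.
// If the socket cannot be opened (for example, because another node already
// holds it), the caller falls back to the client role and gets a nil listener.
func startRegistrar(addr string) (string, net.Listener) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return "client", nil
	}
	return "server", ln
}

func main() {
	// Port 0 lets the OS pick a free port for this demo.
	first, ln := startRegistrar("127.0.0.1:0")
	if ln == nil {
		fmt.Println("could not open a socket, mode:", first)
		return
	}
	defer ln.Close()

	// A second "node" on the same host finds the socket taken.
	second, _ := startRegistrar(ln.Addr().String())
	fmt.Println(first, second)
}
```

When the server closes its listener, the next node that retries the same call wins the socket and takes over the server role.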

Node names must be unique within the same host. If a node attempts to register with a name that has already been registered by another node, the Registrar will return the error gen.ErrTaken.

When a node attempts to establish a network connection with another node, it first sends a resolve request (a UDP packet) to the Registrar, using the host name of the target node and port 4499. In response to this request, the Registrar returns a list of acceptors along with their parameters needed to establish the connection:

  • The port number where the acceptor is running.

  • The version of the Handshake protocol.

  • The version of the Proto protocol.

  • The TLS flag, which indicates whether TLS encryption is required for this connection.

Using these parameters, the node then establishes the network connection to the target node.

By default, the built-in network stack of Ergo Framework (ergo.services/ergo/net) is used for both incoming and outgoing connections. However, the node can work with multiple network stacks simultaneously (e.g., for compatibility with Erlang's network stack).

You can create multiple acceptors with different sets of parameters for handling incoming connections. For outgoing connections, you can manage the connection parameters using the Static Routes functionality, which allows you to control how connections are established with various nodes.

WebWorker

Handling HTTP requests

The act.WebWorker actor implements the low-level gen.ProcessBehavior interface and allows HTTP requests to be handled as asynchronous messages. It is designed to be used with a web server (refer to the Web section in meta-processes). To launch a process based on act.WebWorker, you need to embed it in your object and implement the required methods from the act.WebWorkerBehavior interface.

Example:

type MyWebWorker struct {
    act.WebWorker
}

func factoryMyWebWorker() gen.ProcessBehavior {
    return &MyWebWorker{}
}

To work with your object, act.WebWorker uses the act.WebWorkerBehavior interface. This interface defines a set of callback methods:

type WebWorkerBehavior interface {
	gen.ProcessBehavior

	// Init invoked on a spawn WebWorker for the initializing.
	Init(args ...any) error

	// HandleMessage invoked if WebWorker received a message sent with gen.Process.Send(...).
	// Non-nil value of the returning error will cause termination of this process.
	// To stop this process normally, return gen.TerminateReasonNormal
	// or any other for abnormal termination.
	HandleMessage(from gen.PID, message any) error

	// HandleCall invoked if WebWorker got a synchronous request made with gen.Process.Call(...).
	// Return nil as a result to handle this request asynchronously and
	// to provide the result later using the gen.Process.SendResponse(...) method.
	HandleCall(from gen.PID, ref gen.Ref, request any) (any, error)

	// Terminate invoked on a termination process
	Terminate(reason error)

	// HandleEvent invoked on an event message if this process got subscribed on
	// this event using gen.Process.LinkEvent or gen.Process.MonitorEvent
	HandleEvent(message gen.MessageEvent) error

	// HandleInspect invoked on the request made with gen.Process.Inspect(...)
	HandleInspect(from gen.PID, item ...string) map[string]string

	// HandleGet invoked on a GET request
	HandleGet(from gen.PID, writer http.ResponseWriter, request *http.Request) error
	// HandlePost invoked on a POST request
	HandlePost(from gen.PID, writer http.ResponseWriter, request *http.Request) error
	// HandlePut invoked on a PUT request
	HandlePut(from gen.PID, writer http.ResponseWriter, request *http.Request) error
	// HandlePatch invoked on a PATCH request
	HandlePatch(from gen.PID, writer http.ResponseWriter, request *http.Request) error
	// HandleDelete invoked on a DELETE request
	HandleDelete(from gen.PID, writer http.ResponseWriter, request *http.Request) error
	// HandleHead invoked on a HEAD request
	HandleHead(from gen.PID, writer http.ResponseWriter, request *http.Request) error
	// HandleOptions invoked on an OPTIONS request
	HandleOptions(from gen.PID, writer http.ResponseWriter, request *http.Request) error
}

All methods in the act.WebWorkerBehavior interface are optional for implementation.

It is most efficient to use act.WebWorker in combination with act.Pool for load balancing when handling HTTP requests:

//
// WebWorker
//
func factory_MyWebWorker() gen.ProcessBehavior {
	return &MyWebWorker{}
}

type MyWebWorker struct {
	act.WebWorker
}

// Handle GET requests.
func (w *MyWebWorker) HandleGet(from gen.PID, writer http.ResponseWriter, request *http.Request) error {
	w.Log().Info("got HTTP request %q", request.URL.Path)
	// write the status to the HTTP response writer, not to the worker itself
	writer.WriteHeader(http.StatusOK)
	return nil
}

//
// Pool of workers
//
type MyPool struct {
	act.Pool
}

func factory_MyPool() gen.ProcessBehavior {
	return &MyPool{}
}

// Init invoked on a spawn Pool for the initializing.
func (p *MyPool) Init(args ...any) (act.PoolOptions, error) {
	opts := act.PoolOptions{
		WorkerFactory: factory_MyWebWorker,
	}
	p.Log().Info("started process pool of MyWebWorker with %d workers", opts.PoolSize)
	return opts, nil
}

An example implementation of a web server using act.WebWorker and act.Pool for load distribution when handling HTTP requests can be found in the demo project in the repository at https://github.com/ergo-services/examples.
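
The idea of handing each HTTP request to one worker from a pool can be sketched with the standard library alone. Nothing here uses act.WebWorker or act.Pool: job, startWorkers, poolHandler, and serveOnce are hypothetical names, a channel plays the role of the mailbox, and answering unimplemented methods with 501 is an assumption of this sketch rather than documented framework behavior.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// job carries one HTTP request to a worker, similar in spirit to delivering
// the request as a message. done is closed once the response is written.
type job struct {
	w    http.ResponseWriter
	r    *http.Request
	done chan struct{}
}

// startWorkers launches n goroutines that take jobs from the shared channel.
func startWorkers(n int, jobs <-chan job) {
	for i := 0; i < n; i++ {
		go func() {
			for j := range jobs {
				handle(j.w, j.r)
				close(j.done)
			}
		}()
	}
}

// handle dispatches on the HTTP method; only GET is implemented here.
func handle(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		w.WriteHeader(http.StatusOK)
		fmt.Fprintf(w, "hello from %s", r.URL.Path)
	default:
		w.WriteHeader(http.StatusNotImplemented) // assumption of this sketch
	}
}

// poolHandler forwards each request to the workers and waits for completion.
func poolHandler(jobs chan<- job) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		j := job{w: w, r: r, done: make(chan struct{})}
		jobs <- j
		<-j.done
	})
}

// serveOnce runs a single in-memory request and returns status and body.
func serveOnce(method, path string) (int, string) {
	jobs := make(chan job)
	startWorkers(3, jobs)
	defer close(jobs) // lets the worker goroutines exit

	rec := httptest.NewRecorder()
	req := httptest.NewRequest(method, path, nil)
	poolHandler(jobs).ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := serveOnce(http.MethodGet, "/status")
	fmt.Println(code, body) // prints "200 hello from /status"
}
```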

Events

The events mechanism in Ergo Framework is built on top of the pub/sub subsystem. It allows any process to become an event producer, while other processes can subscribe to these events. This enables flexible event-driven architectures, where processes can publish and consume events across the system.

Producer

To register an event (gen.Event), you use the RegisterEvent method available in the gen.Process or gen.Node interface. When registering an event, you can configure the following parameters using the gen.EventOptions:

  • Notify: This flag controls whether the producer should be notified about the presence or absence of subscribers for the event. If notifications are enabled, the process will receive a gen.MessageEventStart message when the first subscriber appears and a gen.MessageEventStop message when the last subscriber unsubscribes. This allows the producer to generate events only when there are active subscribers. If the event is registered using the RegisterEvent method of the gen.Node interface, this field is ignored.

  • Buffer: This specifies how many of the most recent events should be stored in the event buffer. If this option is set to zero, buffering is disabled.
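
The Notify and Buffer options can be pictured with a small in-memory model. This is a hypothetical sketch of the described behavior, not the framework's pub/sub code: the event type and its methods are invented, and the "start" and "stop" strings stand in for gen.MessageEventStart and gen.MessageEventStop.

```go
package main

import "fmt"

// event is a hypothetical model of a registered event.
type event struct {
	bufSize     int      // how many recent messages to keep (0 disables buffering)
	buffer      []string // most recent messages, oldest first
	subscribers int
	notices     []string // notifications the producer would receive
}

// publish stores the message, keeping only the bufSize most recent ones.
func (e *event) publish(msg string) {
	if e.bufSize == 0 {
		return // buffering disabled
	}
	e.buffer = append(e.buffer, msg)
	if len(e.buffer) > e.bufSize {
		e.buffer = e.buffer[len(e.buffer)-e.bufSize:]
	}
}

// subscribe returns a copy of the buffered messages and records a "start"
// notice when the first subscriber appears.
func (e *event) subscribe() []string {
	e.subscribers++
	if e.subscribers == 1 {
		e.notices = append(e.notices, "start")
	}
	return append([]string(nil), e.buffer...)
}

// unsubscribe records a "stop" notice when the last subscriber leaves.
func (e *event) unsubscribe() {
	if e.subscribers == 0 {
		return
	}
	e.subscribers--
	if e.subscribers == 0 {
		e.notices = append(e.notices, "stop")
	}
}

func main() {
	e := &event{bufSize: 2}
	e.publish("a")
	e.publish("b")
	e.publish("c")

	recent := e.subscribe()
	e.unsubscribe()
	fmt.Println(recent, e.notices) // prints "[b c] [start stop]"
}
```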

The RegisterEvent function returns a token of type gen.Ref upon success. This token is used to generate events. Only the process that owns the event, or a process that has been delegated the token by the event owner, can produce events for that registration.

To generate events, the gen.Process interface provides the SendEvent method. This method accepts the following arguments:

  • name: The name of the registered event (gen.Atom).

  • token: The key obtained during the event registration (gen.Ref).

  • message: The event payload, which can be of any type.

It's important to note that the RegisterEvent method is not available to a process during its initialization state.

To generate events, the gen.Node interface provides the SendEvent method. This method is similar to the SendEvent method in the gen.Process interface but includes an additional parameter, gen.MessageOptions. This extra parameter allows for further customization of how the event message is sent, such as setting priority, compression, or other message-related options.

Consumer

To subscribe to a registered event, the gen.Process interface provides the following methods:

  • LinkEvent: This method creates a link to a gen.Event. The process will receive an exit signal if the producer process of the event terminates or if the event is unregistered. To remove the link, use the UnlinkEvent method. When the link is created, the method returns a list of the most recent events from the producer's buffer.

  • MonitorEvent: This method creates a monitor on a gen.Event. The process will receive a gen.MessageDownEvent if the producer process terminates or if the event is unregistered. If the event is unregistered, the Reason field in gen.MessageDownEvent will contain gen.ErrUnregistered. To remove the monitor, use the DemonitorEvent method.

Both methods accept an argument of type gen.Event. For an event registered on the local node, you only need to specify the Name field, and you can leave the Node field empty. This simplifies subscribing to local events while still providing flexibility for handling events from remote nodes.

type myActor struct {
    act.Actor
}
...
func (a *myActor) HandleMessage(from gen.PID, message any) error {
    ...
    // local event: only the Name field is required
    event := gen.Event{Name: "exampleEvent"}
    lastEvents, err := a.LinkEvent(event)
    ...
    // remote event: the Node field names the node where the event is registered
    event = gen.Event{Name: "remoteEvent", Node: "remoteNode"}
    lastEvents, err = a.LinkEvent(event)
    ...
    return nil
}

Upon successfully creating a link or monitor, the function returns a list of events (gen.MessageEvent) provided by the producer process from its message buffer. Each event contains the following information: the event name (gen.Event), the timestamp of when the event was generated (obtained using time.Now().UnixNano()), and the actual event value sent by the producer process. If the list of events is empty, it means that either the producer process has not yet generated any events or the producer registered the event with a zero-sized buffer.

For an example demonstrating the capabilities of the events mechanism, you can refer to the events project in the ergo-services/examples repository (https://github.com/ergo-services/examples).

Links And Monitors

Linking and Monitoring Mechanisms

These mechanisms allow processes to respond to termination events of targets with which links or monitors have been created. Such targets can include:

  • A process identifier (gen.PID)

  • A process name (gen.Atom or gen.ProcessID)

  • A process alias (gen.Alias)

  • An event (gen.Event)

  • A node name (gen.Atom, in the case of monitoring a node connection)

The key difference between linking and monitoring lies in the response to the termination event of the target. When a process creates a link with a target, and that target terminates, the linked process receives an exit signal, which generally causes it to terminate as well.

In contrast, when a process creates a monitor with a target, and the target terminates, the monitoring process receives a gen.MessageDown* message, which contains the reason for the target's termination. This notification allows the monitoring process to react accordingly without necessarily terminating itself.

Links

In Ergo Framework, links are created between processes to ensure that termination events of one process propagate to linked processes. The gen.Process interface provides the following methods for creating and managing links:

  • Link: This universal method allows you to create a link to a target, which can be a gen.Atom (for a local process), gen.PID, gen.ProcessID, or gen.Alias. The process will receive an exit signal upon the target process's termination. To remove the link, use the Unlink method with the same argument.

  • LinkPID: Creates a link to a process by its gen.PID. The process will receive an exit signal upon the target process's termination. To remove the link, use UnlinkPID or Unlink with the same argument.

  • LinkProcessID: Creates a link to a process by its gen.ProcessID. The process will receive an exit signal upon the target process's termination or when the associated name is unregistered. To remove the link, use UnlinkProcessID or Unlink.

  • LinkAlias: Creates a link to a process by its gen.Alias. The process will receive an exit signal upon the termination of the process that created the alias or upon the alias's deletion. To remove the link, use UnlinkAlias or Unlink.

  • LinkNode: Creates a link to a node connection using the node name (gen.Atom). The process will receive an exit signal if the connection to the target node is lost. To remove the link, use UnlinkNode.

  • LinkEvent: Creates a link to an event (gen.Event). The process will receive an exit signal when the producer process of the event terminates or when the gen.Event is unregistered. To remove the link, use UnlinkEvent. More information about this mechanism can be found in the Events section.

Exit signals are always delivered with the highest priority and placed in the Urgent queue of the process's mailbox.

In Erlang, the linking mechanism is bidirectional, meaning that if a process that created a link with a target process terminates, it will also cause the termination of the target process.

In contrast, in Ergo Framework, the linking mechanism is unidirectional. The termination of the process that created the link has no effect on the target process. Only the termination of the target process triggers an exit signal to the linked process. This design allows more flexibility and control, preventing unintended cascading failures in linked processes.
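
The one-way nature of links can be shown with a toy model. This sketch is hypothetical and simplified: registry, link, and terminate are invented names, and an exit signal is treated as always terminating the linked process, which the text describes only as the general case.

```go
package main

import "fmt"

// registry is a hypothetical model of one-way links between named processes.
type registry struct {
	alive map[string]bool
	links map[string][]string // target -> processes linked to it
}

func newRegistry(names ...string) *registry {
	r := &registry{alive: map[string]bool{}, links: map[string][]string{}}
	for _, n := range names {
		r.alive[n] = true
	}
	return r
}

// link makes `from` receive an exit signal when `target` terminates.
func (r *registry) link(from, target string) {
	r.links[target] = append(r.links[target], from)
}

// terminate stops the process and propagates the exit signal to every
// process linked to it. Nothing is sent in the opposite direction.
func (r *registry) terminate(name string) {
	if !r.alive[name] {
		return
	}
	r.alive[name] = false
	for _, linked := range r.links[name] {
		r.terminate(linked)
	}
}

func main() {
	r := newRegistry("worker", "watcher")
	r.link("watcher", "worker") // watcher created the link; worker is the target

	r.terminate("watcher")
	fmt.Println("worker alive after watcher exits:", r.alive["worker"]) // true

	r = newRegistry("worker", "watcher")
	r.link("watcher", "worker")
	r.terminate("worker")
	fmt.Println("watcher alive after worker exits:", r.alive["watcher"]) // false
}
```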

Monitors

Monitors allow a process to observe the lifecycle of a target process or event. The gen.Process interface provides several methods for creating monitors:

  • Monitor: A universal method for creating a monitor. The target can be a gen.Atom (for a local process), gen.PID, gen.ProcessID, or gen.Alias. The monitoring process will receive a gen.MessageDown* message upon the termination of the target process. To remove the monitor, use the Demonitor method with the same argument.

  • MonitorPID: Creates a monitor for a process by its gen.PID. The monitoring process will receive a gen.MessageDownPID message upon the target process's termination, with the reason for termination included in the Reason field. To remove the monitor, use DemonitorPID or Demonitor.

  • MonitorProcessID: Creates a monitor for a process by its gen.ProcessID. The monitoring process will receive a gen.MessageDownProcessID message upon the target process's termination, with the reason for termination in the Reason field. If the target process unregisters its name, the Reason field of the gen.MessageDownProcessID will contain gen.ErrUnregistered. To remove the monitor, use DemonitorProcessID or Demonitor.

  • MonitorAlias: Creates a monitor for a process by its gen.Alias. The monitoring process will receive a gen.MessageDownAlias message upon the termination of the process that created the alias or if the alias is deleted. In the case of alias deletion, the Reason field of the gen.MessageDownAlias will contain gen.ErrUnregistered. To remove the monitor, use DemonitorAlias or Demonitor.

  • MonitorNode: Creates a monitor for a node connection. The monitoring process will receive a gen.MessageDownNode message upon the disconnection of the node. To remove the monitor, use DemonitorNode.

  • MonitorEvent: Creates a monitor for an event (gen.Event). The monitoring process will receive a gen.MessageDownEvent message when the producer of the event terminates or if the event is unregistered. If the event is unregistered, the Reason field of the gen.MessageDownEvent will contain gen.ErrUnregistered. To remove the monitor, use DemonitorEvent. More details about this mechanism can be found in the Events section.

Messages of type gen.MessageDown* are delivered with high priority and placed in the System queue of the process's mailbox.

Remote targets

Thanks to network transparency, the linking and monitoring mechanisms in Ergo Framework can be used with targets on remote nodes.

When a process creates a link or monitor with a remote target (such as a process, alias, or event), if the connection to the node where the target resides is lost, the process will receive an exit signal or a gen.MessageDown* message, depending on the mechanism used. In this case, the reason for termination will be gen.ErrNoConnection. This allows processes to handle network disconnections and remote target failures seamlessly, as part of the same fault-tolerant mechanisms used for local processes.

It's important to remember that the methods for creating links and monitors are not available to a process during its initialization state (see the Process section for more details).

Static Routes

Managing outgoing connections

When creating an outgoing connection to a remote node, the node first checks the internal Static Routing Table for an existing route. If no static route is found for the specified remote node's name, the node uses the Registrar and sends a resolve request to obtain the necessary connection parameters (see the Service Discovery section for more information).

To define a static route for a specific node or group of nodes, use the AddRoute method of the gen.Network interface. This method lets you manually configure the parameters for outgoing connections, giving you more control over how the node connects to the specified nodes and bypassing the need to query the Registrar.

AddRoute(match string, route gen.NetworkRoute, weight int) error

  • match: defines the name of the remote node or a pattern to match multiple nodes. The node uses the MatchString method from Go's standard library regexp package to match the node names based on this pattern.

  • route: specifies the connection parameters to be used when establishing a connection to the matched node(s). This includes details such as the network protocol, port, TLS settings, and other connection options.

  • weight: defines the weight of the specified route. If multiple routes match the same node name in the routing table, the node will return a list of routes sorted by descending weight. The node will then use the route with the highest weight to establish the connection.

To check for the existence of a static route for a specific remote node name, use the Route method of the gen.Network interface. To remove a static route, use the RemoveRoute method of the gen.Network interface, specifying the match value that was used when adding the route.
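The lookup described above can be sketched with the standard library alone. The staticRoute type, the lookup function, and the addresses below are hypothetical and are not part of the gen.Network API; only the use of regexp.MatchString and the descending-weight ordering are taken from the description above.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
)

// staticRoute is a hypothetical stand-in for one routing table entry.
type staticRoute struct {
	match  string // node name or regular expression
	target string // placeholder for the connection parameters
	weight int
}

// lookup returns all routes whose pattern matches the node name,
// sorted by descending weight. Entries with invalid patterns are skipped.
func lookup(table []staticRoute, node string) []staticRoute {
	var found []staticRoute
	for _, r := range table {
		ok, err := regexp.MatchString(r.match, node)
		if err != nil || !ok {
			continue
		}
		found = append(found, r)
	}
	sort.SliceStable(found, func(i, j int) bool {
		return found[i].weight > found[j].weight
	})
	return found
}

func main() {
	table := []staticRoute{
		{match: "^node[0-9]+@example\\.com$", target: "10.0.0.1:11144", weight: 10},
		{match: "@example\\.com$", target: "10.0.0.2:11144", weight: 20},
	}
	// Both patterns match, so the route with the higher weight comes first.
	routes := lookup(table, "node1@example.com")
	if len(routes) > 0 {
		fmt.Println("connect via", routes[0].target)
	}
}
```

Because the patterns are regular expressions, a plain node name used as match also matches any longer name that contains it; anchoring with ^ and $ avoids accidental matches.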

gen.NetworkRoute

The route argument in the AddRoute method allows you to specify detailed connection parameters for establishing outgoing connections to remote nodes. This enables fine-grained control over how connections are created:

  • Resolver: specifies a particular Registrar to be used by the node for resolving the connection. This interface is obtained via the Resolver method of the gen.Registrar interface. This functionality is useful when a node is working with multiple network stacks. For example, see the Erlang section for managing clusters with different network stacks simultaneously.

  • Route: allows you to explicitly define the host name, port number, TLS mode, handshake version, and protocol version. If the Resolver parameter is explicitly set, the route parameters provided will override any route information returned by the Registrar.

  • Cookie: overrides the default Cookie value specified in gen.NetworkOptions for the given route, providing custom authentication or security settings specific to the connection.

  • Cert: specifies a CertManager to establish a TLS connection, ensuring that the appropriate certificates are used during the connection.

  • Flags: you can override specific flags for the given route. For example, you can explicitly disable certain mechanisms, such as preventing the remote node from spawning processes over the established connection.

  • AtomMapping: enables automatic substitution of gen.Atom values transmitted or received during the connection.

  • LogLevel: defines the logging level for the network stack within the established connection. This provides granular control over the verbosity of log messages for troubleshooting or monitoring network activity on a per-connection basis.

Remote Start Application

In addition to the ability to spawn processes on remote nodes, Ergo Framework also allows you to start applications remotely. Similar to process spawning, access to this functionality is controlled by the EnableRemoteApplicationStart flag in gen.NetworkFlags. By default, this flag is enabled.

Access Control

To allow an application to be started from a remote node, it must be registered in the network stack. The gen.Network interface provides the following methods for controlling this functionality:

EnableApplicationStart(name gen.Atom, nodes ...gen.Atom) error
DisableApplicationStart(name gen.Atom, nodes ...gen.Atom) error

The name argument represents the application's name, and the nodes argument controls which nodes are permitted to start the application remotely. This gives you fine-grained control over which nodes can initiate the remote startup of specific applications.

Using the EnableApplicationStart method with the nodes argument creates an access list of remote nodes that are permitted to start the specified application.

Using the DisableApplicationStart method with the nodes argument removes the specified nodes from the access list. If the access list becomes empty after removing nodes, access is opened to all remote nodes. To fully disable access to starting the application, use the DisableApplicationStart method without the nodes argument.

Start Application

To start an application on a remote node, use the ApplicationStart method of the gen.RemoteNode interface (which you can access via the GetNode or Node methods of the gen.Network interface). This interface also provides additional methods to control the application's startup mode:

ApplicationStartTemporary(name gen.Atom, options gen.ApplicationOptions) error
ApplicationStartTransient(name gen.Atom, options gen.ApplicationOptions) error
ApplicationStartPermanent(name gen.Atom, options gen.ApplicationOptions) error

When an application is started remotely, the Parent property of the application is assigned the name of the node that initiated the startup. You can retrieve information about the application and its status using the ApplicationInfo method of the gen.Node interface.

Generic Types

Data Types and Interfaces Used in Ergo Framework

Data types

gen.Atom

This type is an alias for the string type. It was introduced as a specialized string that allows differentiation between regular strings and those used for node names or process names. In the network stack, this type is also handled separately and is actively used in the Atom-cache and Atom-mapping mechanisms. When printed, values of this type are enclosed in single quotes.

fmt.Printf("%s", gen.Atom("hello"))
...
'hello'

gen.PID

This type is used as a process identifier. It is a structure containing several fields: the node name Node, a unique sequential number within the node ID, and a Creation field.

When printed as a string, this type is transformed into the following representation:

pid := gen.PID{Node:"t1node@localhost", ID:1001, Creation:1685523227}
fmt.Printf("%s", pid)
...
<90A29F11.0.1001>

The node name is encoded into a hash using the CRC32 algorithm.
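As a rough illustration of this encoding, the sketch below formats a CRC32 checksum of a node name as eight uppercase hex digits. The nodeHash helper is hypothetical, and the choice of the IEEE polynomial is an assumption; this section does not state which CRC32 variant the framework uses, so the printed value is not guaranteed to match the example above.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// nodeHash returns the CRC32 checksum of a node name as 8 uppercase hex
// digits. The IEEE polynomial is an assumption made for this sketch.
func nodeHash(node string) string {
	return fmt.Sprintf("%08X", crc32.ChecksumIEEE([]byte(node)))
}

func main() {
	fmt.Println(nodeHash("t1node@localhost"))
}
```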

gen.Ref

The node can generate unique identifiers. For this purpose, the gen.Node interface provides the MakeRef method. It returns a guaranteed unique value of type gen.Ref. These values are used as unique request identifiers in synchronous calls and as unique tokens when registering a gen.Event by a producer process.

When printed as a string, the value is represented as:

ref := gen.Ref{
    Node:"t1node@localhost", 
    Creation:1685524098, 
    ID:[3]uint32{0x1f4c2, 0x5d90, 0x0},
}
fmt.Printf("%s", ref)
...
Ref#<90A29F11.128194.23952.0>

gen.ProcessID

This type is used as a process identifier with an associated name. It is a structure containing two fields: the process name Name and the node name Node.

When printed as a string, the node name is transformed into a hash using the CRC32 algorithm, resulting in the following representation:

process := gen.ProcessID{Name:"example", Node:"t1node@localhost"}
fmt.Printf("%s", process)
...
<90A29F11.'example'>

gen.Alias

This type is an alias for the gen.Ref type. Values of type gen.Alias are used as temporary process identifiers, created using the CreateAlias method in the gen.Process interface, as well as identifiers for meta-processes.

When printed as a string, the representation is similar to gen.Ref, but with a different prefix:

alias := gen.Alias{
    Node:"t1node@localhost", 
    Creation:1685524098, 
    ID:[3]uint32{0x1f4c2, 0x5d90, 0x0},
}
fmt.Printf("%s", alias)
...
Alias#<90A29F11.128194.23952.0>

gen.Event

Values of this type are used when subscribing to events through the MonitorEvent and LinkEvent methods. This type is a structure similar to gen.ProcessID, containing two fields: Name and Node.

When printed as a string, the representation is similar to gen.ProcessID, but with an added prefix:

event := gen.Event{Name:"event1", Node:"t1node@localhost"}
fmt.Printf("%s", event)
...
Event#<90A29F11:'event1'>

gen.Env

This type is an alias for the string type. It is used for the names of environment variables for nodes and processes. In Ergo Framework, environment variable names are case-insensitive.

When printed as a string, the value of this type is converted to uppercase:

env := gen.Env("name1")
fmt.Printf("%s", env)
...
NAME1
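The printing behavior can be reproduced with a string-based type that implements fmt.Stringer. The Env type below is a local stand-in defined only for this sketch, not the framework's own definition; it shows one way uppercase formatting makes differently cased names compare equal.

```go
package main

import (
	"fmt"
	"strings"
)

// Env is a local stand-in for gen.Env, defined here only for illustration.
type Env string

// String returns the name in uppercase, so names that differ only in
// case have the same printed form.
func (e Env) String() string {
	return strings.ToUpper(string(e))
}

func main() {
	fmt.Printf("%s\n", Env("name1")) // prints NAME1
	fmt.Println(Env("Name1").String() == Env("NAME1").String())
}
```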

Interfaces

gen.Node

To start a node, use the function ergo.StartNode(...). If the node starts successfully, it returns the gen.Node interface. This interface provides a set of functions for interacting with the node. The full list of methods can be found in the reference documentation.

gen.Process

This type is an interface to a process object. In Ergo Framework, this interface is embedded in the actor act.Actor, so all methods of this interface become available to objects based on act.Actor (and its derivatives).

type myActor struct {
    act.Actor
}

func (a *myActor) Init(args ...any) error {
    // Sending a message to itself using methods PID and Send
    // that belong to the embedded gen.Process interface
    a.Send(a.PID(), "hello")
    return nil
}

The full list of available methods for the gen.Process interface can be found in the reference documentation.

gen.Network

You can access this interface using the Network method of the gen.Node interface. This interface provides a set of methods for managing the node's network stack. The full list of available methods for this interface can be found in the reference documentation.

gen.RemoteNode

This interface allows you to retrieve information about a remote node with which a connection has been established, as well as to spawn processes (using the Spawn and SpawnRegister methods) and start applications on it (using the ApplicationStart* methods). You can access this interface through the GetNode and Node methods of the gen.Network interface.

Rotate

This package implements the gen.LoggerBehavior interface and provides the capability to log messages to a file, with support for log file rotation at a specified interval.

Available options

  • Period Specifies the rotation period (minimum value: time.Minute)

  • TimeFormat Sets the format for the timestamp in log messages. You can choose any existing format (see time package) or define your own. By default, timestamps are in nanoseconds

  • IncludeBehavior includes the process behavior in the log

  • IncludeName includes the registered process name

  • ShortLevelName enables shortnames for the log levels

  • Path directory for the log files, default: ./log

  • Prefix defines the log files name prefix (<Path>/<Prefix>.YYYYMMDDHHMi.log[.gz])

  • Compress Enables gzip compression for log files

  • Depth Specifies the number of log files in rotation

Example

package main

import (
	"time"

	"ergo.services/ergo"
	"ergo.services/ergo/gen"
	"ergo.services/logger/rotate"
)

func main() {
	var options gen.NodeOptions
	ropt := rotate.Options{Period: time.Minute, Compress: false}
	rlog, err := rotate.CreateLogger(ropt)
	if err != nil {
		panic(err)
	}
	logger := gen.Logger{
		Name:   "rotate",
		Logger: rlog,
	}
	options.Log.Loggers = append(options.Log.Loggers, logger)

	node, err := ergo.StartNode("demo@localhost", options)
	if err != nil {
		panic(err)
	}
	node.Wait()
}

Cron

schedule tasks on a repetitive basis, such as daily, weekly, or monthly

Introduced in 3.1.0 (not yet released; available in the v310 branch)

Cron functionality is provided to enable periodic job execution in a node. Its implementation replicates the functionality of the Unix cron service and supports the crontab specification format.

To set up jobs at the node startup, use the gen.NodeOptions.Cron parameters. Each job is described using the gen.CronJob:

Parameters:

  • Name defines the name of the job. It must be unique among all cron jobs

  • Spec sets the scheduling parameters for the job in crontab format

  • Location allows you to set the time zone for the job's scheduling parameters. By default, the local time zone is used

  • Action defines what needs to be run

  • Fallback allows specifying a process to notify if the Action results in an error

The Action field has the interface type gen.CronAction. You can use the following ready-to-use implementations:

  • gen.CreateCronActionMessage(to any, priority gen.MessagePriority) – creates an Action that sends a gen.MessageCron message to the specified process to. The to argument can be one of the following: gen.Atom, gen.ProcessID, gen.PID, or gen.Alias, referring to either a local process or a process on a remote node.

  • gen.CreateCronActionSpawn(factory gen.ProcessFactory, options gen.CronActionSpawnOptions) – spawns a local process. The options argument specifies the startup parameters for the process.

  • gen.CreateCronActionRemoteSpawn(node gen.Atom, name gen.Atom, options gen.CronActionSpawnOptions) – spawns a process on a remote node. The mechanism for spawning processes on a remote node is described in the section.

When using CreateCronActionSpawn or CreateCronActionRemoteSpawn, the following environment variables will be added to the spawned processes:

  • gen.CronEnvNodeName

  • gen.CronEnvJobName

  • gen.CronEnvJobActionTime

Example

In the example below, every day at 21:07 (Shanghai time), a gen.MessageCron message will be sent to the local process registered as myweb1. If the message cannot be delivered, a gen.MessageCronFallback message will be sent to the local process named myweb2:

gen.Cron interface

You can also manage jobs using the gen.Cron interface. It provides the following methods for this:

Access to the gen.Cron interface can be obtained using the Cron method of the gen.Node interface.

Custom Action

You can also create your own Action. To do this, simply implement the gen.CronAction interface:

It is worth noting that the action_time argument is passed in the time zone of the job, i.e., the one specified in the gen.CronJob.Location field when the job was created.

The Info method of the interface is used when calling gen.Cron.Info() to retrieve summary information about Cron and its jobs.

Cron specification

  • * represents "all". For example, using * * * * * will run every minute. Using * * * * 1 will run every minute only on Monday.

  • - allows specifying a range of values. For example, 15 23 * * 1-3 will trigger the job from Monday to Wednesday at 23:15

  • , defines a sequence of values: 15,25,35 23 * * * – this will trigger the job every day at 23:15, 23:25, and 23:35

  • / used for step values: */5 3 * * * this will trigger the job every 5 minutes during the hour starting from 3:00 (3:00, 3:05 ... 3:55). It can also be used with ranges: 21-37/5 17 * * * - every day at 17:21, 17:26, 17:31 and 17:36

  • # available for use only in the day-of-week field in the format day-of-week#occurrence. It allows specifying constructs like 5#2, which refers to the second Friday of the month

  • L stands for "last". In the day-of-month field, it specifies the last day of the month. In the day-of-week field - allows specifying constructs such as "the last Friday" (5L) of a given month.

You can also use the following macro definitions:

  • @hourly - 1 * * * * every hour

  • @daily - 10 3 * * * every day at 3:10

  • @monthly - 20 4 1 * * on day 1 of the month at 4:20

  • @weekly - 30 5 * * 1 on Monday at 5:30

Examples

1 19 * * 1#1,7L run at 19:01 every month on the first Monday and last Sunday

15 15 10-15/3 * * run at 15:15 every month on the 10th and 13th

To view the schedule of your job, use the JobSchedule method of the gen.Cron interface. To view the schedule of all jobs, use the Schedule method of this interface.
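To see how the range, list, and step forms expand, here is a minimal stdlib-only sketch that turns a single numeric field into the values it matches. The expandField function is hypothetical and is not the framework's parser; it does not handle the # and L forms or the macros.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// expandField expands one numeric cron field (supporting *, ranges,
// lists and steps) into the ascending list of matching values within
// [lowest, highest].
func expandField(field string, lowest, highest int) ([]int, error) {
	set := make(map[int]bool)
	for _, part := range strings.Split(field, ",") {
		step := 1
		if i := strings.Index(part, "/"); i >= 0 {
			s, err := strconv.Atoi(part[i+1:])
			if err != nil || s <= 0 {
				return nil, fmt.Errorf("bad step in %q", part)
			}
			step, part = s, part[:i]
		}
		lo, hi := lowest, highest
		if part != "*" {
			bounds := strings.SplitN(part, "-", 2)
			v, err := strconv.Atoi(bounds[0])
			if err != nil {
				return nil, fmt.Errorf("bad value in %q", part)
			}
			lo, hi = v, v
			if len(bounds) == 2 {
				if hi, err = strconv.Atoi(bounds[1]); err != nil {
					return nil, fmt.Errorf("bad range in %q", part)
				}
			}
		}
		if lo < lowest || hi > highest || lo > hi {
			return nil, fmt.Errorf("%q is outside %d-%d", part, lowest, highest)
		}
		for v := lo; v <= hi; v += step {
			set[v] = true
		}
	}
	var out []int
	for v := lowest; v <= highest; v++ {
		if set[v] {
			out = append(out, v)
		}
	}
	return out, nil
}

func main() {
	minutes, _ := expandField("21-37/5", 0, 59)
	fmt.Println(minutes) // [21 26 31 36]
	days, _ := expandField("10-15/3", 1, 31)
	fmt.Println(days) // [10 13]
}
```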

DST (daylight saving time) and Time-adjustment support

This implementation takes time changes into account – if a time adjustment occurs at the time of the job's scheduled execution and the new time does not match the job's schedule, the job will not be executed.

For example, in a time zone that follows the European DST schedule, a job with the specification 0 2 * * * (run every day at 2:00) is skipped on March 30, 2025, because at 2:00 that day the clocks move forward by one hour. When DST ends on October 26, 2025, the clocks move back by one hour at 3:00, so 2:00 occurs twice, but the job in the example is executed only once.
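The skipped run can be checked with Go's time package. The sketch below is not the framework's scheduler; it only shows that a wall-clock time inside a DST gap does not exist. The wallClockExists helper is hypothetical, and Europe/Berlin is an assumed zone chosen because it follows the dates mentioned above.

```go
package main

import (
	"fmt"
	"time"
	_ "time/tzdata" // embed the zone database so LoadLocation works everywhere
)

// wallClockExists reports whether the given local date and time exists in
// the named time zone. time.Date normalizes a wall-clock time that falls
// into a DST gap, so the requested hour no longer round-trips.
func wallClockExists(zone string, year, month, day, hour, minute int) (bool, error) {
	loc, err := time.LoadLocation(zone)
	if err != nil {
		return false, err
	}
	t := time.Date(year, time.Month(month), day, hour, minute, 0, 0, loc)
	return t.Day() == day && t.Hour() == hour && t.Minute() == minute, nil
}

func main() {
	for _, day := range []int{29, 30} {
		ok, err := wallClockExists("Europe/Berlin", 2025, 3, day, 2, 0)
		if err != nil {
			panic(err)
		}
		fmt.Printf("2025-03-%d 02:00 exists: %v\n", day, ok)
	}
}
```

A scheduler that compares the current wall-clock time with the job specification never observes 2:00 on such a day, which is consistent with the job being skipped.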

Actor

An actor in Ergo Framework implements the low-level gen.ProcessBehavior interface. To launch a process based on act.Actor, you need to create an object with an embedded act.Actor and implement a factory function for it. For example:

act.Actor uses the act.ActorBehavior interface to interact with your object. This interface defines a set of callback methods:

All methods in the act.ActorBehavior interface are optional, allowing you to implement only the necessary callbacks in your object. Since act.Actor embeds the gen.Process interface, you can directly use its methods from within your actor object.

Example:

Process Initialization

Process initialization begins when act.Actor invokes the Init method of the act.ActorBehavior interface, passing the args provided during Spawn (or SpawnRegister). If Init completes successfully, the node registers the process, allowing it to receive messages or synchronous requests. If Init fails, Spawn (or SpawnRegister) returns the error. It's important to note that during initialization, the process is not yet registered with the node, limiting access to certain methods of the embedded gen.Process interface until the process is fully initialized.

Handling Asynchronous Messages and Synchronous Requests

The process mailbox contains several queues: Main, System, Urgent, and a special Log queue. In act.Actor, messages are processed in the following order: Urgent, System, Main, and then Log.
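The processing order can be pictured with a simplified model. The mailbox type below is a hypothetical stand-in built on plain slices, not the framework's mailbox; it assumes strict priority, meaning the queues are re-checked from Urgent downwards before every message.

```go
package main

import "fmt"

// mailbox is a simplified stand-in for a process mailbox with four queues.
type mailbox struct {
	urgent, system, main, log []string
}

// next pops the next message, checking the queues in the order
// Urgent, System, Main, Log. It returns false when all are empty.
func (m *mailbox) next() (string, bool) {
	for _, q := range []*[]string{&m.urgent, &m.system, &m.main, &m.log} {
		if len(*q) > 0 {
			msg := (*q)[0]
			*q = (*q)[1:]
			return msg, true
		}
	}
	return "", false
}

// drain returns all queued messages in processing order.
func drain(m *mailbox) []string {
	var out []string
	for msg, ok := m.next(); ok; msg, ok = m.next() {
		out = append(out, msg)
	}
	return out
}

func main() {
	m := &mailbox{
		main:   []string{"work-1", "work-2"},
		log:    []string{"log-1"},
		system: []string{"down-1"},
		urgent: []string{"urgent-1"},
	}
	fmt.Println(drain(m)) // [urgent-1 down-1 work-1 work-2 log-1]
}
```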

Asynchronous messages are sent using the Send method of the gen.Node or gen.Process interface. Upon receiving such a message, act.Actor calls the HandleMessage method of the act.ActorBehavior interface.

For synchronous requests, the HandleCall callback method is invoked in an actor based on act.Actor. The result returned by this method is sent as the response to the request.

A process can send asynchronous messages to itself, but attempting to make a synchronous request to itself will return the error gen.ErrTimeout.

act.Actor also allows handling synchronous requests asynchronously. This feature lets you delegate the processing of synchronous requests to other processes or delay the response. To do this, the HandleCall method should return nil, and you should use the SendResponse method of the gen.Process interface to send the response. You must provide the gen.PID of the requesting process and the gen.Ref reference of the request.

For debugging or monitoring the internal state, the Inspect method allows high-priority synchronous requests. In act.Actor, this triggers the HandleInspect method. This feature is widely used in the tool.

If the actor process is registered as a logger, the HandleLog method is called when log messages (gen.MessageLog) are received, generated using the gen.Log interface. If the actor process has subscribed to events using the LinkEvent or MonitorEvent methods of gen.Process, the HandleEvent method is called when an event message is received.

Process Termination

To stop a process, simply return a non-nil error value from the message-handling callback. After termination, the Terminate callback method will be called, with the reason argument being the returned error value.

Additionally, you can terminate a process using predefined constants like:

  • gen.TerminateReasonNormal for a normal (non-failure) termination.

  • gen.TerminateReasonKill when the process is forcibly stopped using the Kill method from the gen.Node interface.

In Ergo Framework, two additional termination reasons are defined:

  • gen.TerminateReasonPanic: Occurs when a panic happens during message processing.

  • gen.TerminateReasonShutdown: This termination reason can be sent by the parent process or the node when the node is stopped using the StopForce method of the gen.Node interface. It is not considered a failure.

At the moment the Terminate callback is invoked, the process has already been removed from the node. Therefore, most methods of the gen.Process interface will return the gen.ErrNotAllowed error.

SplitHandle

The SplitHandle option allows an actor to call the callback methods of the act.ActorBehavior interface based on the process identifier used to send a message or make a synchronous call to your process. You can enable this option using the SetSplitHandle(split bool) method defined in act.Actor. To check the current value of this option, use the SplitHandle method. Here is an example usage:

If your process has a registered associated name and receives a message using that name, act.Actor will call the HandleMessageName callback. The name argument will be the value used to send the message. For synchronous requests, the HandleCallName callback will be invoked.

If a process alias (gen.Alias) was used, the callbacks HandleMessageAlias and HandleCallAlias will be triggered for messages and synchronous requests, respectively. This enables customized handling based on the way the message or request was sent.

TrapExit - handling exit-signal

In Ergo Framework, an exit signal can be sent to your process by another process or node using the SendExit method from the gen.Process or gen.Node interfaces. It can also be generated by the node if your process had a link created via the Link* methods in the gen.Process interface. By default, when an actor receives an exit signal, it terminates and calls the Terminate callback with the reason specified in the exit signal.

To prevent your actor from terminating upon receiving an exit signal, you can enable the option to intercept such signals. This can be done using the SetTrapExit(trap bool) method in act.Actor. The default value is false, but you can enable it at any time, for example, during initialization:

When the TrapExit option is enabled, act.Actor converts exit signals into standard gen.MessageExit* messages:

  • gen.MessageExitPID: The source of the exit signal was a process.

  • gen.MessageExitProcessID: The source was gen.ProcessID (a link created using LinkProcessID).

  • gen.MessageExitAlias: The source was gen.Alias (link created using LinkAlias).

  • gen.MessageExitEvent: The source was gen.Event (link created using LinkEvent).

  • gen.MessageExitNode: The source was a network connection with a node (link created using LinkNode).

Scenarios where the exit signal cannot be intercepted:

  • Parent Process Sends Exit: When the parent process sends an exit signal via SendExit using the gen.Process interface, it cannot be intercepted.

  • Parent Process Terminates: When the parent process terminates and the actor is linked to it, either through a link created via gen.PID (using Link or LinkPID) or because the gen.ProcessOptions.LinkParent option was used when the child process was started, the resulting exit signal cannot be intercepted.

In these cases, the actor will terminate, and the Terminate callback will be called, even with TrapExit enabled. All other exit signals can be intercepted when TrapExit is active.

Port

Introduced in 3.1.0 (not yet released; available in the v310 branch)

This implementation of the meta-process allows running external programs and communicating with them using stdin and stdout. Communication can be performed in both text and binary formats.

To create a meta.Port, you need to use the meta.CreatePort function. It takes meta.PortOptions as an argument, which allows specifying the following options:

  • Cmd command to run

  • Args arguments for the command

  • Env environment variables for the program being executed

  • EnableEnvMeta adds environment variables of the meta-process

  • EnableEnvOS adds OS environment variables

  • Tag can help distinguish between multiple meta-processes running with the same Cmd/Args

  • Process The name of the process that will handle meta.MessagePort* messages. If not specified, all messages will be sent to the parent process. You can also use an act.Pool process for distributing message handling

  • SplitFuncStdout specifies a split function (used with the Scanner from the standard bufio package) for processing data from stdout in text mode. By default, data is split by lines

  • SplitFuncStderr specifies a split function for the data from stderr

  • Binary enables binary mode for the data from stdout. See Binary mode for more information

Below is an example of creating and running the external program example-prog using a meta-process meta.Port:

Binary mode

The implementation of meta.Port supports binary mode when working with stdin/stdout. To enable it, use the meta.PortOptions.Binary.Enable option and specify the binary format parameters:

  • ReadBufferSize sets the buffer size for reading. The default size is 8192

  • ReadBufferPool defines a pool for message buffers. The Get method should return an allocated buffer of type []byte

  • ReadChunk enables auto-chunking mode, which splits the incoming stream into chunks according to the specified parameters. For more details, see Auto chunking

  • WriteBufferKeepAlive enables the software keep-alive mechanism – a specified value will be automatically sent at the interval defined in the option WriteBufferKeepAlivePeriod

When binary mode is enabled, the SplitFuncStdout option is ignored. However, stderr is processed in text mode, and you can use the SplitFuncStderr option.

Auto chunking

This feature allows automatically chunking the incoming stream in binary mode. The chunks can be of fixed length – to achieve this, you need to specify the chunk size using the FixedLength option. For data with dynamic length, you must specify header parameters that will define the chunk length.

To enable the auto-chunking feature in binary mode, use the meta.PortOptions.Binary.ReadChunk.Enable option, and specify the parameters for automatic data chunking using the following options:

  • FixedLength allows you to set the length of chunks in the stdout. In this case, all Header* options will be ignored. Leave this field with a zero value to work with dynamically sized chunks

  • HeaderSize specifies the size of the header for dynamically sized chunks

  • HeaderLengthPosition specifies the position in the header where the length is stored

  • HeaderLengthSize specifies the size of the length in bytes (1, 2 or 4)

  • HeaderLengthIncludesHeader specifies whether the length value includes the header size

  • MaxLength allows you to set the maximum chunk length. If this value is exceeded, the meta-process will terminate, and the launched program will also be automatically stopped. Leave this value as zero if no length limits are required

For instance, suppose the incoming stream consists of dynamically sized chunks with a 7-byte header, and the length is encoded as a 32-bit number starting at byte position 3 (counting from 0). The length value includes the header size: L = 7 + len(payload)

In this case, the auto-chunking options would be as follows
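Independent of the concrete option struct, the framing rule from this example can be illustrated with a stdlib-only sketch. The constants mirror the option names listed above; the splitChunks and frame helpers are hypothetical, and big-endian byte order is an assumption, since the byte order is not stated here.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const (
	headerSize     = 7    // HeaderSize
	lengthPosition = 3    // HeaderLengthPosition
	lengthSize     = 4    // HeaderLengthSize
	includesHeader = true // HeaderLengthIncludesHeader
)

// splitChunks cuts complete chunks (header plus payload) from the front of
// buf and returns them together with the unconsumed remainder.
// Big-endian byte order is an assumption of this sketch.
func splitChunks(buf []byte) (chunks [][]byte, rest []byte, err error) {
	for len(buf) >= headerSize {
		l := int(binary.BigEndian.Uint32(buf[lengthPosition : lengthPosition+lengthSize]))
		if !includesHeader {
			l += headerSize
		}
		if l < headerSize {
			return chunks, buf, errors.New("length is smaller than the header")
		}
		if len(buf) < l {
			break // incomplete chunk, wait for more data
		}
		chunks = append(chunks, buf[:l])
		buf = buf[l:]
	}
	return chunks, buf, nil
}

// frame builds one chunk around payload using the same layout.
func frame(payload []byte) []byte {
	chunk := make([]byte, headerSize, headerSize+len(payload))
	binary.BigEndian.PutUint32(chunk[lengthPosition:], uint32(headerSize+len(payload)))
	return append(chunk, payload...)
}

func main() {
	stream := append(frame([]byte("hello")), frame([]byte("ergo!"))...)
	stream = append(stream, 0x01, 0x02) // start of an incomplete third chunk
	chunks, rest, err := splitChunks(stream)
	if err != nil {
		panic(err)
	}
	for _, c := range chunks {
		fmt.Printf("%d bytes, payload %q\n", len(c), c[headerSize:])
	}
	fmt.Println("bytes left in buffer:", len(rest))
}
```

With a 5-byte payload each chunk is 12 bytes long, which matches L = 7 + len(payload).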

You can find an example implementation in the example repository, in the port project. Use the -bin argument to enable the demonstration of binary mode operation:

available in v310 branch of example repo

Pool

"Pool of Workers" design pattern

The act.Pool actor implements the low-level gen.ProcessBehavior interface and provides the "Pool of Workers" design pattern functionality. All asynchronous messages and synchronous requests sent to the pool process are redirected to worker processes in the pool.

To launch a process based on act.Pool, you need to embed it in your object. For example:

act.Pool uses the act.PoolBehavior interface to interact with your object:

The Init method is mandatory for implementation in act.PoolBehavior, while the other methods are optional. Similar to act.Actor, act.Pool has the embedded gen.Process interface, allowing you to use its methods directly from your object:

act.PoolOptions

With act.PoolOptions, you can configure various parameters for the pool, such as:

  • PoolSize: Determines the number of worker processes to be launched when your pool process starts. If this option is not set, the default value of 3 is used.

  • WorkerFactory: A factory function that creates worker processes.

  • WorkerMailboxSize: Specifies the size of the mailbox for each worker process.

  • WorkerArgs: Arguments passed during the startup of worker processes.

Load distribution

All asynchronous messages and synchronous requests received in the pool process's mailbox are automatically forwarded to worker processes, distributing the load evenly. The load is distributed using a FIFO queue of worker processes.

Message/Request Handling Algorithm:

  1. Select a worker from the FIFO queue.

  2. Forward the message/request.

  3. If errors like gen.ErrProcessUnknown or gen.ErrProcessTerminated occur, a new worker is started, and the message is sent to it.

  4. If gen.ErrMailboxFull occurs, the worker is requeued, and the next worker is selected.

  5. Upon successful forwarding, the worker returns to the end of the queue.

If all workers are busy (e.g., due to gen.ErrMailboxFull), the pool process logs an error, such as "no available worker process. ignored message from <PID>."
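The steps above can be sketched without the framework. The worker and pool types below are hypothetical stand-ins, and the two error values only play the role of gen.ErrMailboxFull and gen.ErrProcessTerminated; this is one possible reading of the algorithm, not the act.Pool source.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errMailboxFull = errors.New("mailbox full") // stand-in for gen.ErrMailboxFull
	errTerminated  = errors.New("terminated")   // stand-in for gen.ErrProcessTerminated
)

// worker is a hypothetical stand-in for a worker process.
type worker struct {
	id      int
	mailbox []string
	limit   int
	dead    bool
}

func (w *worker) send(msg string) error {
	switch {
	case w.dead:
		return errTerminated
	case len(w.mailbox) >= w.limit:
		return errMailboxFull
	}
	w.mailbox = append(w.mailbox, msg)
	return nil
}

// pool keeps its workers in a FIFO queue.
type pool struct {
	queue  []*worker
	nextID int
	limit  int
}

func (p *pool) spawn() *worker {
	p.nextID++
	return &worker{id: p.nextID, limit: p.limit}
}

// forward tries each queued worker at most once and returns the id of the
// worker that accepted the message.
func (p *pool) forward(msg string) (int, error) {
	for i, n := 0, len(p.queue); i < n; i++ {
		w := p.queue[0]
		p.queue = p.queue[1:]
		err := w.send(msg)
		if errors.Is(err, errTerminated) {
			w = p.spawn() // replace the dead worker and send to the new one
			err = w.send(msg)
		}
		p.queue = append(p.queue, w) // the worker goes to the end of the queue
		if err == nil {
			return w.id, nil
		}
	}
	return 0, errors.New("no available worker process")
}

func main() {
	p := &pool{limit: 1}
	for i := 0; i < 3; i++ {
		p.queue = append(p.queue, p.spawn())
	}
	p.queue[1].dead = true
	for _, msg := range []string{"a", "b", "c", "d"} {
		id, err := p.forward(msg)
		fmt.Println(msg, id, err)
	}
}
```

In this run the workers never consume their mailboxes, so the fourth message finds every mailbox full and is rejected, which corresponds to the logged "no available worker process" case.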

Forwarding to workers only happens for messages from the Main queue. To ensure a message is handled by the pool process itself, send it with a priority higher than gen.MessagePriorityNormal. The gen.Process interface provides SendWithPriority and CallWithPriority methods for sending messages or making synchronous requests with a specified priority.

Methods of act.Pool

To dynamically manage the number of worker processes, act.Pool provides the following methods:

  • AddWorkers: This method adds a specified number of worker processes to the pool. Upon successful execution, it returns the total number of worker processes in the pool after the addition.

  • RemoveWorkers: This method stops and removes a specified number of worker processes from the pool. Upon successful execution, it returns the total number of worker processes remaining in the pool after the removal.

Web

To handle synchronous HTTP requests with the actor model in Ergo Framework, they must be converted into asynchronous messages. For this purpose, two meta-processes have been created: meta.WebServer and meta.WebHandler. The first is used to start an HTTP server, and the second transforms synchronous HTTP requests into messages that are sent to a worker process. A worker process can be created using act.WebWorker, which processes the incoming messages asynchronously.
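The idea of turning a synchronous HTTP request into a message can be sketched with the standard library alone. Everything below (webRequest, handler, worker, get) is hypothetical and uses channels in place of processes; it is not how meta.WebHandler or act.WebWorker are implemented, only an illustration of the request-to-message conversion with a timeout.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// webRequest is a hypothetical message describing one HTTP request.
type webRequest struct {
	method, path string
	reply        chan string // buffered, so a late worker never blocks
}

// handler converts each HTTP request into a message and waits for the
// worker's reply or for the timeout.
type handler struct {
	inbox   chan webRequest
	timeout time.Duration
}

func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	msg := webRequest{method: r.Method, path: r.URL.Path, reply: make(chan string, 1)}
	timeout := time.After(h.timeout)
	select {
	case h.inbox <- msg:
	case <-timeout:
		http.Error(w, "no worker available", http.StatusServiceUnavailable)
		return
	}
	select {
	case body := <-msg.reply:
		fmt.Fprint(w, body)
	case <-timeout:
		http.Error(w, "request timed out", http.StatusGatewayTimeout)
	}
}

// worker handles messages asynchronously, one at a time.
func worker(inbox <-chan webRequest) {
	for msg := range inbox {
		msg.reply <- "handled " + msg.method + " " + msg.path
	}
}

// get runs one GET request through the handler and returns status and body.
func get(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	h := handler{inbox: make(chan webRequest), timeout: 5 * time.Second}
	go worker(h.inbox)
	code, body := get(h, "/ping")
	fmt.Println(code, body) // 200 handled GET /ping
}
```

Only the handler writes to the http.ResponseWriter, so a worker that answers after the timeout cannot touch a response that has already been completed.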

Creating and Spawning meta.WebHandler

You can create a meta.WebHandler using the meta.CreateWebHandler function. This function creates an object that implements the gen.MetaProcessBehavior and http.Handler interfaces.

It takes meta.WebHandlerOptions as an argument:

  • Process: The name of the process to which transformed asynchronous meta.MessageWebRequest messages are sent. If not specified, these messages are sent to the parent meta-process.

  • RequestTimeout: The time allowed for the process to handle the message, with a default of 5 seconds.

After successful creation, you need to start the meta-process using SpawnMeta from the gen.Process interface.

Once the meta-process is running, it can be used as an HTTP request handler.
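The idea of turning a synchronous HTTP request into an asynchronous message can be illustrated with the standard library alone. This is a simplified sketch under stated assumptions, not the meta.WebHandler code: webRequest is a hypothetical analogue of meta.MessageWebRequest, a channel replaces the worker's mailbox, and asyncHandler, worker, and get are names invented for the example.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// webRequest is a hypothetical message: the request, the writer for the
// response, and a callback the worker invokes when it has finished.
type webRequest struct {
	Response http.ResponseWriter
	Request  *http.Request
	Done     func()
}

// asyncHandler implements http.Handler by handing each request to a worker.
type asyncHandler struct {
	mailbox chan webRequest
	timeout time.Duration
}

func (h *asyncHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	done := make(chan struct{})
	msg := webRequest{Response: w, Request: r, Done: func() { close(done) }}

	select {
	case h.mailbox <- msg:
	default:
		http.Error(w, "worker mailbox is full", http.StatusServiceUnavailable)
		return
	}

	// The HTTP goroutine stays blocked until the worker reports completion.
	select {
	case <-done:
	case <-time.After(h.timeout):
		// A real implementation must also keep a late worker from writing
		// to w after this point; the sketch does not guard against that.
		http.Error(w, "request timed out", http.StatusGatewayTimeout)
	}
}

// worker handles messages one by one, like an actor draining its mailbox.
func worker(mailbox <-chan webRequest) {
	for msg := range mailbox {
		fmt.Fprintf(msg.Response, "handled %s", msg.Request.URL.Path)
		msg.Done()
	}
}

// get runs one request through the handler in memory and returns status and body.
func get(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	h := &asyncHandler{mailbox: make(chan webRequest, 8), timeout: 5 * time.Second}
	go worker(h.mailbox)
	code, body := get(h, "/ping")
	fmt.Println(code, body)
}
```

The timeout plays the same role as RequestTimeout: it bounds how long the HTTP side waits for the worker.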

Creating and Spawning meta.WebServer

To create the meta.WebServer meta-process, use the meta.CreateWebServer function with the meta.WebServerOptions argument. These options allow you to set:

  • Host: The interface on which the port will be opened for handling HTTP requests.

  • Port: The port number.

  • CertManager: Enables TLS encryption for the HTTP server. You can use the node's CertManager to activate the node's certificate by using the CertManager() method of the gen.Node interface.

  • Handler: Specifies the HTTP request handler.

When the meta-process is created, the HTTP server starts. If the server fails to start, meta.CreateWebServer returns an error. After successful creation, start the meta-process using SpawnMeta(...) from the gen.Process interface.

Example:

An example can be found in the repository at https://github.com/ergo-services/examples, specifically in the demo project.

Network Transparency

The network transparency of Ergo Framework allows processes to exchange messages regardless of whether they are running locally or on remote nodes. The process identifier (gen.PID, gen.ProcessID, gen.Alias) includes the name of the node where the process is running—this information is used by the node to route the message.

If a message is being sent to a process on a remote node, the Service Discovery mechanism helps determine how to establish a network connection to that node. Once the connection is established, the node automatically encodes the message and sends it to the remote node, which then delivers it to the recipient process's mailbox.

The Ergo Data Format (EDF) is used for data transmission within Ergo Framework.

EDF - Ergo Framework Data Format

The Ergo Data Format (EDF) supports encoding and decoding of all scalar types in Golang, as well as specialized types from Ergo Framework (see Generic Types). To encode custom types, they must be registered using the edf.RegisterTypeOf function from the ergo.services/ergo/net/edf package. The registration of a custom type must occur on both the sender and receiver nodes.

If you're registering a data type that contains another custom type among its child elements, you must register the child type first before registering the parent type.

Example:

All child elements in the registered structures must be public. If you need to encode/decode structures with private fields, you will need to implement a custom encoder/decoder for that data type. To do this, you must implement the edf.Marshaler and edf.Unmarshaler interfaces.

These interfaces allow you to define how the custom type is marshaled (encoded) and unmarshaled (decoded), giving you control over the serialization and deserialization process for types with private fields or special requirements.

Example:

The length of a message encoded using the MarshalEDF method must not exceed 2^32 bytes.
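As an illustration of a custom encoder and decoder for a structure with private fields, here is a minimal sketch. It assumes only the method signatures used in this section (MarshalEDF(w io.Writer) error and UnmarshalEDF(b []byte) error). The marshaler and unmarshaler interfaces are declared locally so the code compiles without the framework, and the byte layout (8 bytes for a, then the raw bytes of b) and the encode helper are arbitrary choices made for the example.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// Local mirrors of the two method sets, declared here only for the sketch.
type marshaler interface{ MarshalEDF(w io.Writer) error }
type unmarshaler interface{ UnmarshalEDF(b []byte) error }

type myStruct struct {
	a int
	b string
}

// MarshalEDF writes a as 8 big-endian bytes, followed by the bytes of b.
func (m myStruct) MarshalEDF(w io.Writer) error {
	var head [8]byte
	binary.BigEndian.PutUint64(head[:], uint64(m.a))
	if _, err := w.Write(head[:]); err != nil {
		return err
	}
	_, err := io.WriteString(w, m.b)
	return err
}

// UnmarshalEDF restores both private fields from the layout written above.
func (m *myStruct) UnmarshalEDF(b []byte) error {
	if len(b) < 8 {
		return errors.New("myStruct: payload too short")
	}
	m.a = int(int64(binary.BigEndian.Uint64(b[:8])))
	m.b = string(b[8:])
	return nil
}

// Compile-time check that the pointer type satisfies the decoder side.
var _ unmarshaler = (*myStruct)(nil)

// encode is a hypothetical helper that collects the marshaled bytes.
func encode(m marshaler) ([]byte, error) {
	var buf bytes.Buffer
	if err := m.MarshalEDF(&buf); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	data, err := encode(myStruct{a: -7, b: "hello"})
	if err != nil {
		panic(err)
	}
	var out myStruct
	if err := out.UnmarshalEDF(data); err != nil {
		panic(err)
	}
	fmt.Println(len(data), out.a, out.b)
}
```

The decoder uses a pointer receiver because it has to modify the value it is called on, while the encoder can work on a copy.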

In Golang, the error type is an interface. To transmit error types over the network, they also need to be registered using the edf.RegisterError method. A set of standard errors in Ergo Framework (such as gen.Err*, gen.TerminateReason*) is automatically registered when the node starts.

Additionally, the following limits are imposed on specific types:

  • gen.Atom: Maximum length of 255 bytes

  • string: Maximum length of 65,535 bytes (2^16 - 1)

  • binary: Maximum length of 2^32 bytes

  • map/array/slice: Maximum number of elements is 2^32
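If values come from user input, it can be convenient to check them against these limits before sending. The helpers below are hypothetical (checkAtom and checkString are not part of the framework) and cover only the two smallest limits from the list. Note that the limits are expressed in bytes, which is what len reports for a Go string.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	maxAtomLen   = 255   // limit for gen.Atom, in bytes
	maxStringLen = 65535 // limit for string, in bytes
)

// checkAtom reports an error if s is too long to be sent as an atom.
func checkAtom(s string) error {
	if len(s) > maxAtomLen {
		return fmt.Errorf("atom is %d bytes, limit is %d", len(s), maxAtomLen)
	}
	return nil
}

// checkString reports an error if s is too long to be sent as a string.
func checkString(s string) error {
	if len(s) > maxStringLen {
		return fmt.Errorf("string is %d bytes, limit is %d", len(s), maxStringLen)
	}
	return nil
}

func main() {
	fmt.Println(checkAtom("demo@localhost"))

	// len counts bytes, not characters: 100 three-byte runes are 300 bytes.
	long := strings.Repeat("世", 100)
	fmt.Println(len(long), checkAtom(long))
}
```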

When establishing a connection between nodes, during the handshake phase, the nodes exchange dictionaries containing registered data types and registered errors. These dictionaries are used to reduce the volume of transmitted information during message exchanges, as identifiers are used instead of the names of registered types and errors.

If you register a data type or an error after the connection is established, sending data of that type will result in an error. To make these newly registered types available for network exchange, you will need to disconnect and re-establish the connection.
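The reason a late registration fails can be seen in a small model. This sketch is an assumption-laden illustration, not the EDF implementation: registry, connection, handshake, and typeID are hypothetical names, and assigning identifiers in sorted order is an arbitrary choice. What matters is that the dictionary is a snapshot taken at handshake time.

```go
package main

import (
	"fmt"
	"sort"
)

// registry is a toy set of registered type names.
type registry map[string]struct{}

func (r registry) register(name string) { r[name] = struct{}{} }

// connection holds the dictionary agreed upon during the handshake.
type connection struct {
	dict map[string]uint16
}

// handshake snapshots the registry: every name known right now gets a short ID.
func handshake(r registry) *connection {
	names := make([]string, 0, len(r))
	for n := range r {
		names = append(names, n)
	}
	sort.Strings(names)
	dict := make(map[string]uint16, len(names))
	for i, n := range names {
		dict[n] = uint16(i)
	}
	return &connection{dict: dict}
}

// typeID returns the ID that is sent on the wire instead of the type name.
func (c *connection) typeID(name string) (uint16, error) {
	id, ok := c.dict[name]
	if !ok {
		return 0, fmt.Errorf("type %q is not in this connection's dictionary", name)
	}
	return id, nil
}

func main() {
	r := registry{}
	r.register("MyBool")
	r.register("MyData")

	conn := handshake(r)
	r.register("Late") // registered after the handshake

	fmt.Println(conn.typeID("MyData"))
	fmt.Println(conn.typeID("Late"))

	// Reconnecting builds a fresh dictionary that includes the new type.
	_, err := handshake(r).typeID("Late")
	fmt.Println(err)
}
```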

The EDF format does not support pointers. The use of pointers should remain an internal optimization within your application and should not extend beyond it.

TCP

For working with TCP connections, two types of meta-processes are implemented in Ergo Framework:

  1. TCP Server Process: Responsible for creating a TCP server. It opens the socket and handles incoming connections.

  2. TCP Connection Handler: Manages established TCP connections, whether they are incoming or client-initiated.

Server

To create a TCP server meta-process, use the meta.CreateTCPServer function. It accepts meta.TCPServerOptions with the following parameters:

  • Host, Port: Specify the interface and port for incoming connections.

  • ProcessPool: Defines a list of worker processes to handle incoming connections. Each new connection is assigned a worker from the pool. If ProcessPool is not set, the parent process handles all connections. Using act.Pool in ProcessPool is not recommended to avoid packet mismanagement.

  • CertManager: Enables TLS encryption.

  • BufferSize/BufferPool: Manage incoming data buffers.

  • KeepAlivePeriod/InsecureSkipVerify: Handle TCP keep-alive and TLS certificate verification.

After creating the meta-process, you should start it with SpawnMeta from the gen.Process interface. Example:

If starting the meta-process fails for any reason, you need to free up the port specified in meta.TCPServerOptions.Port by using the Terminate method of the gen.MetaBehavior interface.

Upon an incoming connection, the TCP server meta-process creates and starts a new meta-process to handle that connection.

Connection

To create a client TCP connection, use the meta.CreateTCPConnection function. It initiates a TCP connection and, if successful, creates a meta-process to manage it.

The function takes meta.TCPConnectionOptions, similar to meta.TCPServerOptions, but instead of ProcessPool, it includes a Process field for specifying a process to handle TCP packets. If this field is not used, all data packets are sent to the parent process.

After a successful connection, start the meta-process with SpawnMeta. If the process fails, close the connection using the Terminate method of the gen.MetaBehavior interface.

Example:

TCP-connection handling

To handle a TCP connection, the meta-process communicates with either the parent or worker process based on the meta-process's launch options. Three types of messages are used:

  • meta.MessageTCPConnect: Sent when the meta-process starts, containing the connection ID (ID), RemoteAddr, and LocalAddr information.

  • meta.MessageTCPDisconnect: Sent when the TCP connection is disconnected, including cases where the meta-process is terminated.

  • meta.MessageTCP: Used for both receiving and sending data.

If the meta-process cannot send a message to the worker process, the connection is terminated, and the meta-process stops working.

Sending TCP-packet

To send a message to a TCP connection, use the Send (or SendAlias) method from the gen.Process interface. The recipient should be the gen.Alias of the meta-process managing the connection. The message must be of type meta.MessageTCP. Note that the meta.MessageTCP.ID field is ignored during sending (it is only used for incoming messages).

For a detailed example of a TCP server implementation, you can check out the demo project in the repository: https://github.com/ergo-services/examples.

func factory_myPool() gen.ProcessBehavior {
    return &myPool{}
}

type myPool struct {
    act.Pool
}
...
node.Spawn(factory_myPool, gen.ProcessOptions{})
type PoolBehavior interface {
	gen.ProcessBehavior
	
	Init(args ...any) (PoolOptions, error)

	HandleMessage(from gen.PID, message any) error
	HandleCall(from gen.PID, ref gen.Ref, request any) (any, error)
	Terminate(reason error)

	HandleInspect(from gen.PID) map[string]string
	HandleEvent(message gen.MessageEvent) error
}
func (p *myPool) Init(args ...any) (act.PoolOptions, error) {
    var options act.PoolOptions
    p.Log().Info("starting pool process with name %s", p.Name())
    // ...
    // set pool options
    // ...
    return options, nil
}

func (p *myPool) Terminate(reason error) {
    p.Log().Info("pool process terminated with reason: %s", reason)
}
type PoolOptions struct {
	PoolSize          int64
	WorkerFactory     gen.ProcessFactory
	WorkerMailboxSize int64
	WorkerArgs        []any
}
type CronJob struct {
	// Name job name
	Name gen.Atom
	// Spec time spec in "crontab" format
	Spec string
	// Location defines timezone
	Location *time.Location
	// Action
	Action gen.CronAction
	// Fallback
	Fallback gen.ProcessFallback
}
func main() {
	var options gen.NodeOptions
	// ...
	locationAsiaShanghai, _ := time.LoadLocation("Asia/Shanghai")
	options.Cron.Jobs = []gen.CronJob{
		gen.CronJob{Name: "job1",
			Spec:     "7 21 * * *",
			Action:   gen.CreateCronActionMessage(gen.Atom("myweb1"), 
							gen.MessagePriorityNormal),
			Location: locationAsiaShanghai,
			Fallback: gen.ProcessFallback{Enable: true, Name: "myweb2"},
		},
	}
	node, err := ergo.StartNode("cron@localhost", options)
	if err != nil {
		panic(err)
	}
	// ...
}
type Cron interface {
	// AddJob adds a new job
	AddJob(job gen.CronJob) error
	// RemoveJob removes new job
	RemoveJob(name gen.Atom) error
	// EnableJob allows you to enable previously disabled job
	EnableJob(name gen.Atom) error
	// DisableJob disables job
	DisableJob(name gen.Atom) error

	// Info returns information about the jobs, spool of jobs for the next run
	Info() gen.CronInfo
	// JobInfo returns information for the given job
	JobInfo(name gen.Atom) (gen.CronJobInfo, error)

	// Schedule returns a list of jobs planned to be run for the given period
	Schedule(since time.Time, duration time.Duration) []gen.CronSchedule
	// JobSchedule returns a list of scheduled run times for the given job and period.
	JobSchedule(job gen.Atom, since time.Time, duration time.Duration) ([]time.Time, error)
}
type CronAction interface {
	Do(job gen.Atom, node gen.Node, action_time time.Time) error
	Info() string
}
* * * * *
| | | | |                                                 allowed format
| | | | day-of-week (1–7) (Monday to Sunday)      *   d   d,d   d-d    dL    d#d
| | | month (1–12)                                *   d   d,d   d-d   */d
| | day-of-month (1–31)                           *   d   d,d   d-d   */d   d-d/d   L
| hour (0–23)                                     *   d   d,d   d-d   */d   d-d/d
minute (0–59)                                     *   d   d,d   d-d   */d   d-d/d
Cron service in Unix systems
Crontab specification
crontab format
Remote Spawn Process
type MyActor struct {
    act.Actor
    i int
}
func factoryMyActor() gen.ProcessBehavior {
    return &MyActor{}
}
type ActorBehavior interface {
	gen.ProcessBehavior
	
	// main callbacks
	Init(args ...any) error
	HandleMessage(from gen.PID, message any) error
	HandleCall(from gen.PID, ref gen.Ref, request any) (any, error)
	Terminate(reason error)
	
	// extended callbacks used if SplitHandle was enabled
	HandleMessageName(name gen.Atom, from gen.PID, message any) error
	HandleMessageAlias(alias gen.Alias, from gen.PID, message any) error
	HandleCallName(name gen.Atom, from gen.PID, ref gen.Ref, request any) (any, error)
	HandleCallAlias(alias gen.Alias, from gen.PID, ref gen.Ref, request any) (any, error)
	
	// specialized callbacks
	HandleInspect() map[string]string
	HandleLog(message gen.MessageLog) error
	HandleEvent(message gen.MessageEvent) error
	
}
func (a *MyActor) Init(args ...any) error {
    // get the gen.Log interface using Log method of embedded gen.Process interface
    a.Log().Info("starting process %s", a.PID())
    // initialize value
    a.i = 100
    // sending message to itself with methods Send and PID of embedded gen.Process
    a.Send(a.PID(), "hello")
    return nil
}

func (a *MyActor) HandleMessage(from gen.PID, message any) error {
    a.Log().Info("got message from %s: %s", from, message)
    ...
    // handling message
    ...
    return nil
}

func (a *MyActor) Terminate(reason error) {
    a.Log().Info("%s terminated with reason: %s", a.PID(), reason)
}
func (a *MyActor) Init(args ...any) error {
    a.SetSplitHandle(true)
    ...
    return nil
}
func (a *MyActor) Init(args ...any) error {
    a.SetTrapExit(true)
    ...
    return nil
}
Observer
import "ergo.services/ergo/net/edf"
...
type MyData struct {
    A int
    B string
    C MyBool
}

type MyBool bool
...

func init() {
    ...
    // type MyBool must be registered first
    edf.RegisterTypeOf(MyBool(false))
    edf.RegisterTypeOf(MyData{})
    ...
}
type myStruct struct{
    a int
    b string    
}

// implementation of edf.Marshaler interface
func (m myStruct) MarshalEDF(w io.Writer) error {
    var encoded []byte
    // do encode
    // report a failed write instead of silently dropping it
    _, err := w.Write(encoded)
    return err
}

// implementation of edf.Unmarshaler interface
func (m *myStruct) UnmarshalEDF(b []byte) error {
    // do decode
    return nil
}
type ActorPort struct {
	act.Actor
}

func (ap *ActorPort) Init(args ...any) error {
	var options meta.PortOptions

	options.Cmd = "example-prog"

	// create port
	metaport, err := meta.CreatePort(options)
	if err != nil {
		ap.Log().Error("unable to create Port: %s", err)
		return err
	}

	// spawn meta process
	id, err := ap.SpawnMeta(metaport, gen.MetaOptions{})
	if err != nil {
		ap.Log().Error("unable to spawn port meta-process: %s", err)
		return err
	}

	ap.Log().Info("started Port (iotxt) %s (meta-process: %s)", options.Cmd, id)
	return nil
}
  __ header _   ___ payload ___ 
 |           | |               |
 x x x L L L L . . . . . . . . .
...
var options meta.PortOptions
...
options.Binary.Enable = true
options.Binary.ReadChunk.Enable = true
options.Binary.ReadChunk.HeaderSize = 7
options.Binary.ReadChunk.HeaderLengthPosition = 3
options.Binary.ReadChunk.HeaderLengthSize = 4
options.Binary.ReadChunk.HeaderLengthIncludesHeader = true
Binary mode
https://github.com/ergo-services/examples

Saturn Client

This package implements the gen.Registrar interface and serves as a client library for the central registrar, Saturn. In addition to the primary Service Discovery function, it automatically notifies all connected nodes about cluster configuration changes.

To create a client, use the Create function from the saturn package. The function requires:

  • The hostname where the central registrar is running (default port: 4499, unless specified in saturn.Options)

  • A token for connecting to Saturn

  • A set of options (saturn.Options)

Then, set this client in the gen.NetworkOptions.Registrar option.

import (
     "ergo.services/ergo"
     "ergo.services/ergo/gen"

     "ergo.services/registrar/saturn"
)

func main() {
     var options gen.NodeOptions
     ...
     host := "localhost"
     token := "IwOBhgAEAGzPt"
     options.Network.Registrar = saturn.Create(host, token, saturn.Options{})
     ...
     node, err := ergo.StartNode("demo@localhost", options)
     ...
}

Using saturn.Options, you can specify:

  • Cluster - The cluster name for your node

  • Port - The port number for the central Saturn registrar

  • KeepAlive - The keep-alive parameter for the TCP connection with Saturn

  • InsecureSkipVerify - Option to ignore TLS certificate verification

When the node starts, it will register with the Saturn central registrar in the specified cluster.

Additionally, this library registers a gen.Event and generates messages based on events received from the central Saturn registrar within the specified cluster. This allows the node to stay informed of any updates or changes within the cluster, ensuring real-time event-driven communication and responsiveness to cluster configurations:

  • saturn.EventNodeJoined - Triggered when another node is registered in the same cluster.

  • saturn.EventNodeLeft - Triggered when a node disconnects from the central registrar.

  • saturn.EventApplicationLoaded - Triggered when an application is loaded on a remote node. Use ResolveApplication from the gen.Resolver interface to get application details.

  • saturn.EventApplicationStarted - Triggered when an application starts on a remote node.

  • saturn.EventApplicationStopping - Triggered when an application begins stopping on a remote node.

  • saturn.EventApplicationStopped - Triggered when an application is stopped on a remote node.

  • saturn.EventApplicationUnloaded - Triggered when an application is unloaded on a remote node.

  • saturn.EventConfigUpdate - Triggered when the node's configuration is updated.

To receive such messages, you need to subscribe to Saturn client events using the LinkEvent or MonitorEvent methods from the gen.Process interface. You can obtain the name of the registered event using the Event method from the gen.Registrar interface. This allows your node to listen for important cluster events like node joins, application starts, configuration updates, and more, ensuring real-time updates and handling of cluster changes.

type myActor struct {
    act.Actor
}

func (m *myActor) HandleMessage(from gen.PID, message any) error {
    // the receiver is m, so every call goes through m
    reg, e := m.Node().Network().Registrar()
    if e != nil {
        m.Log().Error("unable to get Registrar interface: %s", e)
        return nil
    }
    ev, e := reg.Event()
    if e != nil {
        m.Log().Error("Registrar has no registered Event: %s", e)
        return nil
    }

    m.MonitorEvent(ev)
    return nil
}

func (m *myActor) HandleEvent(event gen.MessageEvent) error {
    m.Log().Info("got event message: %v", event)
    return nil
}

Using the saturn.EventApplication* events and the Remote Start Application feature, you can dynamically manage the functionality of your cluster. The saturn.EventConfigUpdate events allow you to adjust the cluster configuration on the fly without restarting nodes, such as updating the cookie value for all nodes or refreshing the TLS certificate. Refer to the Saturn - Central Registrar section for more details.

You can also use the Config and ConfigItem methods from the gen.Registrar interface to retrieve configuration parameters from the registrar.

To get information about available applications in the cluster, use the ResolveApplication method from the gen.Resolver interface, which returns a list of gen.ApplicationRoute structures:

type ApplicationRoute struct {
	Node   Atom
	Name   Atom
	Weight int
	Mode   ApplicationMode
	State  ApplicationState
}
  • Name - The name of the application

  • Node - The name of the node where the application is loaded or running

  • Weight - The weight assigned to the application in gen.ApplicationSpec

  • Mode - The application's startup mode (gen.ApplicationModeTemporary, gen.ApplicationModePermanent, gen.ApplicationModeTransient)

  • State - The current state of the application (gen.ApplicationStateLoaded, gen.ApplicationStateRunning, gen.ApplicationStateStopping)

You can access the gen.Resolver interface using the Resolver method from the gen.Registrar interface.

func factory_MyWeb() gen.ProcessBehavior {
	return &MyWeb{}
}

type MyWeb struct {
	act.Actor
}

// Init invoked on a start this process.
func (w *MyWeb) Init(args ...any) error {
	// create an HTTP request multiplexer
	mux := http.NewServeMux()

	// create a root handler meta-process
	root := meta.CreateWebHandler(meta.WebHandlerOptions{
		// use process based on act.WebWorker 
		// and spawned with registered name "mywebworker"
		Worker: "mywebworker",
	})
	
	// spawn this meta-process
	rootid, err := w.SpawnMeta(root, gen.MetaOptions{})
	if err != nil {
		w.Log().Error("unable to spawn WebHandler meta-process: %s", err)
		return err
	}
	
	// add our meta-process as a handler of HTTP-requests to the mux
	// since it implements http.Handler interface
	mux.Handle("/", root)
	
	// you can also use your middleware function:
	// mux.Handle("/", middleware(root))
	
	w.Log().Info("started WebHandler to serve '/' (meta-process: %s)", rootid)

	// create and spawn web server meta-process
	// with the mux we created to handle HTTP-requests
	//
	// see below...
	...
	return nil
}
type MyWeb struct {
	act.Actor
}

// Init invoked on a start this process.
func (w *MyWeb) Init(args ...any) error {
	// create an HTTP request multiplexer
	mux := http.NewServeMux()
	
	// create and spawn your handler meta-processes
	// and add them to the mux
	//
	// see above
	...

	// create and spawn web server meta-process
	serverOptions := meta.WebServerOptions{
		Port: 9090,
		Host: "localhost",
		// use node's certificate if it was enabled there
		CertManager: w.Node().CertManager(),
		Handler:     mux,
	}

	webserver, err := meta.CreateWebServer(serverOptions)
	if err != nil {
		w.Log().Error("unable to create Web server meta-process: %s", err)
		return err
	}
	webserverid, err := w.SpawnMeta(webserver, gen.MetaOptions{})
	if err != nil {
		w.Log().Error("unable to spawn Web server meta-process: %s", err)
		// invoke Terminate to close listening socket
		webserver.Terminate(err)
		return err
	}

	w.Log().Info("started Web server %s: use http[s]://%s:%d/", 
			webserverid, 
			serverOptions.Host, 
			serverOptions.Port)
	return nil
}
https://github.com/ergo-services/examples
type MyTCP struct {
	act.Actor
}

func (t *MyTCP) Init(args ...any) error {
	// create meta-process with TCP server
	opt := meta.TCPServerOptions{
		Port: 17171,
	}
	// declare metatcp outside the if statement so it stays in scope below
	metatcp, err := meta.CreateTCPServer(opt)
	if err != nil {
		t.Log().Error("unable to create tcp server meta-process: %s", err)
		return err
	}
	
	// spawn meta-process
	if _, err := t.SpawnMeta(metatcp, gen.MetaOptions{}); err != nil {
		t.Log().Error("unable to spawn tcp server meta-process: %s", err)
		// invoke Terminate to close listening socket
		metatcp.Terminate(err)
		return err
	}
	
	return nil
}
type Client struct {
	act.Actor
}

func (c *Client) Init(args ...any) error {
	// create meta-process with tcp connection
	opt := meta.TCPConnectionOptions{
		Port: c.port,
	}
	connection, err := meta.CreateTCPConnection(opt)
	if err != nil {
		c.Log().Error("unable to create tcp connection: %s", err)
		return err
	}
	// start meta-process to serve TCP-connection
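	id, err := c.SpawnMeta(connection, gen.MetaOptions{})
	if err != nil {
		c.Log().Error("unable to spawn tcp connection meta process: %s", err)
		// invoke Terminate to close the connection
		connection.Terminate(err)
		return err
	}
	c.Log().Info("started TCP connection (meta-process: %s)", id)
	return nil
}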
https://github.com/ergo-services/examples

Node

What is a Node in Ergo Framework?

A Node is the core of the service you create using Ergo Framework. This core includes:

  • Process Management Subsystem: handles the starting/stopping of processes, and the registration of process names/aliases.

  • Message Routing Subsystem: manages the routing of asynchronous messages and synchronous requests between processes.

  • Pub/Sub Subsystem: powers the event, link, and monitor functionalities, enabling distributed event handling.

  • Network Stack: provides service discovery and network transparency, facilitating seamless communication between nodes.

  • Logging Subsystem

Starting a node

To start a node, the first step is to define its name. The name consists of two parts: <name>@<hostname>, where the hostname determines on which network interface the port for incoming connections will be opened.

The node's name must be unique on the host. This means that two nodes with the same name cannot be running on the same host.
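To make the <name>@<hostname> structure concrete, here is a small sketch that splits a node name into its two parts. splitNodeName is a hypothetical helper written for this illustration. It is not a framework function, and the framework's own validation rules may be stricter.

```go
package main

import (
	"fmt"
	"strings"
)

// splitNodeName separates "<name>@<hostname>" into its two parts.
// Both parts must be non-empty.
func splitNodeName(full string) (string, string, error) {
	name, host, ok := strings.Cut(full, "@")
	if !ok || name == "" || host == "" {
		return "", "", fmt.Errorf("invalid node name %q: want <name>@<hostname>", full)
	}
	return name, host, nil
}

func main() {
	for _, full := range []string{"demo@localhost", "demo"} {
		name, host, err := splitNodeName(full)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("name=%s host=%s\n", name, host)
	}
}
```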

Below is an example code for starting a node:

If the gen.NodeOptions.Applications option includes applications when starting the node, they will be automatically loaded and started. However, if any application fails to start, the ergo.StartNode(...) function will return an error, and the node will shut down. Upon a successful start, this function returns the gen.Node interface.

You can also specify environment variables for the node using the Env option in gen.NodeOptions. All processes started on this node will inherit these variables. The gen.Node interface provides the ability to manage environment variables through methods like EnvList(), SetEnv(...), and Env(...). However, changes to environment variables will only affect newly started processes. Environment variable names are case-insensitive.

Additionally, the gen.Node interface provides the following methods:

  • Starting/Stopping Processes: methods like Spawn(...), SpawnRegister(...) allow you to start processes, while Kill(...) and SendExit(...) are used to stop them.

  • Retrieving Process Information: use the ProcessInfo(...) method to get information about a process, and the MetaInfo(...) method to retrieve information about meta-processes.

  • Managing the Node's Network Stack: access the gen.Network interface through the Network() method to manage the node's network operations.

  • Getting Node Uptime: the Uptime() method provides the node's uptime in seconds.

  • General Node Information: use the Info() method to retrieve general information about the node.

  • Sending Asynchronous Messages: the Send(...) method allows you to send asynchronous messages to a process.

The full list of available methods for the gen.Node interface can be found in the reference documentation.

Process management

The mechanism for starting and stopping processes is provided by the node's core. Each process is assigned a unique identifier, gen.PID, which facilitates message routing.

The node also allows you to register a name associated with a process. You can register such a name at the time of process startup by using the SpawnRegister method and specifying the desired name. Alternatively, you can use the RegisterName method from the gen.Node or gen.Process interfaces to assign a name to an already running process.

A process can only have one associated name. If you want to change a process's name, you must first unregister the existing name using the UnregisterName method from the gen.Node or gen.Process interfaces. Upon success, this method returns the gen.PID of the process previously associated with that name.

A process can be terminated by the node by sending it an exit signal. This is done via the SendExit method in the gen.Node interface. If necessary, the node can also forcefully stop a process using the Kill method provided by the gen.Node interface.

Message routing

One of the key responsibilities of a node is message routing between processes. This routing is transparent to both the sender and the receiver processes. If the recipient process is located on a remote node, the node will automatically attempt to establish a network connection to the remote node and deliver the message to the recipient. This is the network transparency provided by the Ergo Framework: you don't need to worry about how to send the message or how to encode it, as the node handles all of this automatically and transparently for you.

Message routing is not limited to the gen.PID process identifier. A message can also be addressed using:

  • Local Process Name (gen.Atom): messages can be sent to a local process by referencing its registered name.

  • gen.Alias: this is a unique identifier that can be created by the process itself. It is often used for meta-processes or as a temporary process identifier. You can read more about this feature in the Process section.

  • gen.ProcessID: This is used for sending messages by the recipient process's name. The structure contains two fields—Name and Node. It is a convenient way to send a message to a process on a remote node when you do not know the gen.PID or gen.Alias of the remote process.

Through these flexible options, the Ergo Framework provides a robust and seamless message routing system, simplifying communication between processes whether they are local or remote.
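The way an address determines the destination node can be sketched with a type switch. The types below are simplified, hypothetical stand-ins for gen.Atom, gen.PID, gen.Alias, and gen.ProcessID (the real structures carry more fields), and destinationNode is a name invented for the example.

```go
package main

import "fmt"

// Simplified stand-ins for the framework's identifier types.
type atom string

type pid struct {
	Node atom
	ID   uint64
}

type alias struct {
	Node atom
	ID   uint64
}

type processID struct {
	Name atom
	Node atom
}

// destinationNode returns the node that owns the addressed process.
// A bare registered name always refers to a process on the local node;
// every other identifier carries the node name inside it.
func destinationNode(self atom, to any) (atom, error) {
	switch t := to.(type) {
	case atom:
		return self, nil
	case pid:
		return t.Node, nil
	case alias:
		return t.Node, nil
	case processID:
		return t.Node, nil
	}
	return "", fmt.Errorf("unsupported address type %T", to)
}

func main() {
	self := atom("node1@localhost")
	targets := []any{
		atom("logger"),
		pid{Node: self, ID: 1004},
		processID{Name: "logger", Node: "node2@localhost"},
	}
	for _, to := range targets {
		node, err := destinationNode(self, to)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("%v -> %s (remote: %t)\n", to, node, node != self)
	}
}
```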

Pub/Sub subsystem

The core of the node implements a publisher/subscriber mechanism, which serves as the foundation for process linking, monitoring, and connections with other nodes.

  • Monitoring Functionality: this allows any process to monitor other processes or nodes. In the event that a monitored process stops or the connection to a node is lost, the process that created the monitor will receive a gen.MessageDownPID or gen.MessageDownNode message, respectively.

  • Linking Functionality: similar to monitoring, linking differs in that when a linked process terminates or the connection to a node is lost, the process that created the link will receive an exit signal, causing it to stop as well.

  • Event System: the publisher/subscriber mechanism is also used in the events functionality. It allows any process to register its own event type and act as a producer of those events, while other processes can subscribe to those events.

Thanks to network transparency, the pub/sub subsystem works not only for local processes but also for remote ones.

You can read more about these features in the Links and Monitors section and the Events section.

Network stack

The process of sending a message to a remote node involves several steps:

  1. Connection Establishment: if a connection to the remote node has not yet been created, the node queries the registrar on the host where the target node (owner of the recipient gen.PID, gen.ProcessID, or gen.Alias) is running. The registrar returns the port number on which the target node accepts incoming connections. The connection to this node is then established. This connection remains active until one of the nodes explicitly closes it by calling Disconnect from the gen.RemoteNode interface.

  2. Message Encoding: the message is encoded into the binary EDF format.

  3. Data Compression: if compression was enabled for the sender process, the binary data is compressed.

  4. Message Transmission: the message is sent over the network using the ENP protocol.

  5. Message Decoding and Delivery: the remote node automatically decodes the received message and ensures its delivery to the recipient process's mailbox.

For more detailed information on network interactions, refer to the Network Stack section.
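Steps 2 through 5 form a pipeline that can be imitated with the standard library. The sketch below is only an analogy: encoding/gob stands in for EDF, gzip for the framework's compression, and a five-byte frame (one flag byte plus a four-byte length) for ENP. None of these reflect the actual wire format, and message, pack, and unpack are hypothetical names.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"encoding/gob"
	"fmt"
	"io"
)

type message struct {
	To   string
	Body string
}

// pack encodes the message, optionally compresses it, and adds a frame header.
func pack(m message, compress bool) ([]byte, error) {
	var payload bytes.Buffer
	if err := gob.NewEncoder(&payload).Encode(m); err != nil {
		return nil, err
	}
	data := payload.Bytes()
	if compress {
		var z bytes.Buffer
		zw := gzip.NewWriter(&z)
		if _, err := zw.Write(data); err != nil {
			return nil, err
		}
		if err := zw.Close(); err != nil {
			return nil, err
		}
		data = z.Bytes()
	}
	frame := make([]byte, 5, 5+len(data))
	if compress {
		frame[0] = 1 // flag: payload is compressed
	}
	binary.BigEndian.PutUint32(frame[1:5], uint32(len(data)))
	return append(frame, data...), nil
}

// unpack reverses pack on the receiving side.
func unpack(frame []byte) (message, error) {
	var m message
	if len(frame) < 5 {
		return m, io.ErrUnexpectedEOF
	}
	if uint32(len(frame)-5) != binary.BigEndian.Uint32(frame[1:5]) {
		return m, io.ErrUnexpectedEOF
	}
	var r io.Reader = bytes.NewReader(frame[5:])
	if frame[0] == 1 {
		zr, err := gzip.NewReader(r)
		if err != nil {
			return m, err
		}
		defer zr.Close()
		r = zr
	}
	err := gob.NewDecoder(r).Decode(&m)
	return m, err
}

func main() {
	frame, err := pack(message{To: "worker", Body: "hello"}, true)
	if err != nil {
		panic(err)
	}
	out, err := unpack(frame)
	fmt.Println(out, err)
}
```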

Node shutdown

You can stop a node using the Stop() or StopForce() methods from the gen.Node interface.

  • Stop: all processes will be sent an exit signal with the reason gen.TerminateReasonShutdown from the parent process, and the node will wait for all processes to terminate. Once all processes have stopped, the node's network stack and the node itself will be shut down.

  • StopForce: in the case of a forced shutdown, all processes will be terminated using the Kill method from the gen.Node interface, without waiting for their graceful shutdown.

If you call the Stop method of the gen.Node interface from a running process (for example, within the HandleMessage callback of your actor), this will create a deadlock. The process will remain in the running state and will be unable to terminate, while the Stop method will wait for all processes to stop before shutting down the node.

To avoid this issue, you should either invoke the Stop method in a separate goroutine or use the StopForce method of the gen.Node interface to stop the node from within a process.
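The deadlock and the goroutine workaround can be reproduced in a toy model. This is a sketch under stated assumptions: node, spawn, and stop are hypothetical simplifications built on sync.WaitGroup and do not mirror the gen.Node implementation. Only the shape of the problem is the same, a stop that waits for the very process that called it.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a toy model: it tracks running processes with a WaitGroup.
type node struct {
	wg      sync.WaitGroup
	once    sync.Once
	quit    chan struct{} // closed to ask processes to exit
	stopped chan struct{} // closed once every process has exited
}

func newNode() *node {
	return &node{quit: make(chan struct{}), stopped: make(chan struct{})}
}

// spawn starts a process that runs handle for each message until the node stops.
func (n *node) spawn(mailbox <-chan string, handle func(string)) {
	n.wg.Add(1)
	go func() {
		defer n.wg.Done()
		for {
			select {
			case <-n.quit:
				return
			case msg := <-mailbox:
				handle(msg)
			}
		}
	}()
}

// stop asks all processes to exit and blocks until they have done so.
func (n *node) stop() {
	n.once.Do(func() {
		close(n.quit)
		n.wg.Wait()
		close(n.stopped)
	})
}

func main() {
	n := newNode()
	mailbox := make(chan string, 1)

	n.spawn(mailbox, func(msg string) {
		if msg == "shutdown" {
			// Calling n.stop() directly here would block forever: stop waits
			// for this process, and this process waits for stop to return.
			go n.stop()
		}
	})

	mailbox <- "shutdown"
	<-n.stopped
	fmt.Println("node stopped")
}
```

Running stop in its own goroutine lets the handler return, so the process can observe the quit signal and exit, which in turn lets stop finish.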

import (
    "ergo.services/ergo"
    "ergo.services/ergo/gen"
)

func main() {
    // node options; the zero value keeps the defaults
    var opts gen.NodeOptions

    name := gen.Atom("example@localhost")

    // Start node. Returns gen.Node interface
    node, err := ergo.StartNode(name, opts)
    if err != nil {
        panic(err)
    }
    
    node.Wait()
}

Meta-Process

Meta-processes are designed to integrate synchronous objects into the asynchronous model of Ergo Framework. Although they share some similarities with regular processes, they have a different nature and operational characteristics:

  • Meta-Process Identifier: a meta-process has its own identifier (of type gen.Alias) and is associated with its parent process. This allows messages and synchronous requests to be routed to the meta-process.

  • Sending Messages: a meta-process can send asynchronous messages to other processes or meta-processes (including remote ones) using the Send method of the gen.MetaProcess interface. However, the sender always appears as the parent process's gen.PID.

  • Handling Messages: a meta-process can handle asynchronous messages from other processes or meta-processes via the HandleMessage callback method of the gen.MetaBehavior interface.

  • Handling Synchronous Requests: a meta-process can handle synchronous requests from other processes (including remote ones) using the HandleCall callback method of the gen.MetaBehavior interface.

  • Spawning Other Meta-Processes: a meta-process can spawn other meta-processes using the Spawn method of the gen.MetaProcess interface.

  • Linking and Monitoring: other processes (including remote ones) can create a link or monitor with the meta-process using the LinkAlias and MonitorAlias methods of the gen.Process interface, referencing the meta-process’s gen.Alias.

Despite these similarities with regular processes, meta-processes have distinct characteristics:

  • Concurrency: a meta-process runs two goroutines. The main goroutine starts when the meta-process is created and handles operations on the synchronous object; when it terminates, the meta-process terminates too. The auxiliary goroutine is launched to handle incoming messages from other processes or meta-processes, and it shuts down when there are no more messages in the meta-process’s mailbox. Both goroutines can therefore access the meta-process object’s data at the same time, so you must synchronize that access when implementing your own meta-process.

  • Parent Process Dependency: a meta-process can only be started by a process (or another meta-process) using the SpawnMeta method in the gen.Process interface (or the Spawn method in the gen.MetaProcess interface).

  • No Own Environment Variables: meta-processes do not have their own environment variables. The Env and EnvList methods of the gen.MetaProcess interface return the environment variables of the parent process.

  • Limitations: meta-processes cannot make synchronous requests, create links, or establish monitors themselves.

  • Termination with Parent: when the parent process terminates, the meta-process is also automatically terminated.

Meta-processes offer a way to integrate synchronous objects into the asynchronous system while maintaining compatibility with the process communication model. However, due to their concurrent nature and relationship with the parent process, they require careful handling in certain scenarios.
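Because the main and auxiliary goroutines share the meta-process object, any field touched by both needs synchronization. The following sketch shows one common way to do that with a mutex. It does not use the framework: connState and its methods are hypothetical, with one goroutine standing in for the Start loop and the other for the message handlers.

```go
package main

import (
	"fmt"
	"sync"
)

// connState is a hypothetical piece of meta-process state shared by both
// goroutines: the main goroutine counts the bytes it has read, and the
// auxiliary goroutine reports that counter on request.
type connState struct {
	mu        sync.Mutex
	bytesRead int
}

// addBytes is what the main (Start) goroutine would call after each read.
func (s *connState) addBytes(n int) {
	s.mu.Lock()
	s.bytesRead += n
	s.mu.Unlock()
}

// snapshot is what a handler on the auxiliary goroutine would call.
func (s *connState) snapshot() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.bytesRead
}

// simulate runs a reader and a handler concurrently and returns the final
// counter value.
func simulate(reads, chunk int) int {
	var s connState
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // stands in for the main goroutine
		defer wg.Done()
		for i := 0; i < reads; i++ {
			s.addBytes(chunk)
		}
	}()
	go func() { // stands in for the auxiliary goroutine
		defer wg.Done()
		for i := 0; i < reads; i++ {
			_ = s.snapshot()
		}
	}()
	wg.Wait()
	return s.snapshot()
}

func main() {
	fmt.Println("bytes read:", simulate(1000, 1024))
}
```

Without the mutex, the Go race detector (go run -race) would flag the concurrent read and write of bytesRead.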

Ready-to-Use Meta-Processes

Ergo Framework provides several ready-to-use implementations of meta-processes:

  • meta.TCP: allows you to launch a TCP server or create a TCP connection.

  • meta.UDP: for launching a UDP server.

  • meta.Web: for launching an HTTP server and creating an http.Handler based on a meta-process.

  • WebSocket Meta-Process: for working with WebSockets, a WebSocket meta-process is available. Since its implementation has a dependency on github.com/gorilla/websocket, it is provided as a separate package: ergo.services/meta/websocket.

These meta-processes offer pre-built solutions for integrating common networking and web functionality into your application.

Meta-process starting

To start a meta-process, the gen.Process interface provides the SpawnMeta method:

SpawnMeta(behavior gen.MetaBehavior, options gen.MetaOptions) (gen.Alias, error)

If a meta-process needs to spawn other meta-processes, the gen.MetaProcess interface implements the Spawn method:

Spawn(behavior gen.MetaBehavior, options gen.MetaOptions) (gen.Alias, error)

In the gen.MetaOptions options, you can configure:

  • MailboxSize: size of the meta-process’s mailbox for incoming messages.

  • SendPriority: priority for messages sent by the meta-process.

  • LogLevel: the logging level for the meta-process.

Upon successful startup, these methods return the meta-process identifier, gen.Alias. This identifier can be used to:

  • Send asynchronous messages via the SendAlias method in the gen.Process interface.

  • Make synchronous requests using the CallAlias method in the gen.Process interface.

  • Create links or monitors with the meta-process using the LinkAlias or MonitorAlias methods in the gen.Process interface.

Meta-process termination

To stop a meta-process, you can send it an exit signal using the SendExitMeta method of the gen.Process interface. When the meta-process receives this signal, it triggers the callback method Terminate from the gen.MetaBehavior interface.

Additionally, a meta-process will be terminated in the following cases:

  • The parent process has terminated.

  • The main goroutine of the meta-process has finished (i.e., the Start method of the gen.MetaBehavior interface has completed its work).

  • The HandleMessage (or HandleCall) callback method returned an error while processing an asynchronous message (or a synchronous request).

  • A panic occurred in any method of the gen.MetaBehavior interface.

Implementing Your Own Meta-Process

To create a custom meta-process in Ergo Framework, you need to implement the gen.MetaBehavior interface. This interface defines the behavior of the meta-process and includes the following key methods:

type MetaBehavior interface {
	// callback method for main goroutine
	Start(process MetaProcess) error
	
	// callback methods for the auxiliary goroutine
	HandleMessage(from PID, message any) error
	HandleCall(from PID, ref Ref, request any) (any, error)
	Terminate(reason error)
	HandleInspect(from PID) map[string]string
}
  • Start: this method is called when the meta-process starts. It is responsible for initializing the meta-process and running the main logic. The Start method runs in the main goroutine of the meta-process, and when it finishes, the meta-process is terminated.

  • HandleMessage: this callback is invoked to handle incoming asynchronous messages from other processes or meta-processes. It runs in the auxiliary goroutine that processes the meta-process’s mailbox.

  • HandleCall: this method processes synchronous requests (calls) from other processes or meta-processes. It is also part of the auxiliary goroutine that processes incoming messages.

  • Terminate: called when the meta-process receives an exit signal or when its parent process terminates. It is responsible for cleanup and finalizing the meta-process before it is removed.

Here is an example of a custom meta-process in Ergo Framework. It reads data from a network connection and sends it to its parent process:

func createMyMeta(conn net.Conn) gen.MetaBehavior {
    return &myMeta{
        conn: conn,
    }
}

type myMeta struct {
    gen.MetaProcess // Embedding the gen.MetaProcess interface
    conn net.Conn   // Network connection
}

func (mm *myMeta) Start(meta gen.MetaProcess) error {
    // Assign the meta-process instance to the embedded interface
    mm.MetaProcess = meta

    // Ensure that the socket is closed upon exit
    defer mm.conn.Close()

    // Read data from the socket in a loop
    for {
        buf := make([]byte, 1024)
        n, err := mm.conn.Read(buf)
        if err != nil {
            return err // Terminate the meta-process on error
        }
        // Send the data to the parent process
        mm.Send(mm.Parent(), buf[:n])
    }
}

In this example, we embed the gen.MetaProcess interface within the myMeta struct. This allows us to use the methods of gen.MetaProcess (like Send, Parent, and Log) inside the callback methods defined by gen.MetaBehavior.

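func (mm *myMeta) HandleMessage(from gen.PID, message any) error {
    // Log information about the parent process
    mm.Log().Info("my parent process is %s", mm.Parent())
    // Handle received messages as byte slice
    if data, ok := message.([]byte); ok {
        // Write the data back to the connection.
        // Returning the error terminates the meta-process
        _, err := mm.conn.Write(data)
        return err
    }
    // Log if the message is of an unknown type
    mm.Log().Info("got unknown message %#v", message)
    return nil
}

// HandleCall and HandleInspect are empty stubs here. They are needed only
// to satisfy the gen.MetaBehavior interface
func (mm *myMeta) HandleCall(from gen.PID, ref gen.Ref, request any) (any, error) {
    return nil, nil
}

func (mm *myMeta) HandleInspect(from gen.PID) map[string]string {
    return nil
}

func (mm *myMeta) Terminate(reason error) {
    // Close the connection when the meta-process is terminated
    mm.conn.Close()
}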

The HandleMessage method processes asynchronous messages sent to the meta-process. It checks the type of the message and takes appropriate actions, such as writing data to the connection. If the method returns an error, it causes the termination of the meta-process, triggering the Terminate method to handle the cleanup.

The Terminate method ensures that resources like the network connection are properly released when the meta-process ends. If the meta-process encounters an error in HandleMessage or HandleCall, or if any other termination condition occurs, the Terminate method will be called to finalize the shutdown process.

List of available methods in the gen.MetaProcess Interface:

type MetaProcess interface {
    ID() Alias           // Returns the ID (gen.Alias) of this meta-process
    Parent() PID         // Returns the PID of the parent process
    Send(to any, message any) error // Sends an asynchronous message to another process or meta-process
    Spawn(behavior MetaBehavior, options MetaOptions) (Alias, error) // Spawns a new meta-process
    Log() Log            // Provides the gen.Log interface for logging
}

Application

In Ergo Framework, an Application is a component that allows you to group a set of actors (and/or supervisors) and manage them as a single entity. The Application is responsible for starting each actor within the group and handling dependencies on other Applications if specified in the configuration. Additionally, the Application monitors the running processes and, depending on the mode chosen during startup, may stop all group members and itself if one of the processes terminates for any reason.

Create Application

To create an Application in Ergo Framework, the gen.ApplicationBehavior interface is provided:

type ApplicationBehavior interface {
	// Load invoked on loading application using method ApplicationLoad 
	// of gen.Node interface.
	Load(node Node, args ...any) (ApplicationSpec, error)
	// Start invoked once the application started
	Start(mode ApplicationMode)
	// Terminate invoked once the application stopped
	Terminate(reason error)
}

All methods of the gen.ApplicationBehavior interface must be implemented.

For example, if you want to combine an actor A1 and a supervisor S1 into a single application, the code for creating such an Application would look like this:

func createApp() gen.ApplicationBehavior {
	return &app{}
}

type app struct{}

func (a *app) Load(node gen.Node, args ...any) (gen.ApplicationSpec, error) {
	return gen.ApplicationSpec{
		Name: "my_app",
		Group: []gen.ApplicationMemberSpec{
			{
				Name:    "a1", // start with registered name
				Factory: factory_A1,
			},
			{
				// omitted field Name. will be started with no name
				Factory: factory_S1,
			},
		},
	}, nil

}
func (a *app) Start(mode gen.ApplicationMode) {} // nothing to do on start, so it stays empty
func (a *app) Terminate(reason error)         {}

...
myApp := createApp()
appName, err := node.ApplicationLoad(myApp)

Since an Application, by its nature, is not a process, its startup is divided into two stages: loading and starting.

To load the application, the gen.Node interface provides the method ApplicationLoad. You pass your implementation of the ApplicationBehavior interface to this method. The node will then call the Load callback method of the gen.ApplicationBehavior interface, which must return the application's specification as a gen.ApplicationSpec:

type ApplicationSpec struct {
	Name        Atom
	Description string
	Version     Version
	Depends     ApplicationDepends
	Env         map[Env]any
	Group       []ApplicationMemberSpec
	Mode        ApplicationMode
	Weight      int
	LogLevel    LogLevel
}

In the gen.ApplicationSpec structure, the Name field defines the name under which your application will be registered. This name must be unique within the node. However, the registry for application names and the registry for process names are separate. Although it is possible to run a process with a name identical to the application name, this could lead to confusion, so care should be taken when choosing names for both processes and applications. The Description field is optional.

The Group field contains a group of specifications for launching processes, defined by gen.ApplicationMemberSpec. If the Name field in gen.ApplicationMemberSpec is not empty, the process will be launched using the SpawnRegister method of the gen.Node interface. The processes in the group are launched sequentially, following the order specified in the Group field.

The Env field is used to set environment variables for all processes started within the application. Environment variables are inherited in the following order: Node > Application > Parent > Process.
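The inheritance chain can be pictured as layering maps from the most general level to the most specific one. The sketch below is only a model: mergeEnv is a hypothetical helper, plain string keys replace gen.Env, and the rule that a more specific level overrides a more general one is an assumption rather than something stated above.

```go
package main

import "fmt"

// mergeEnv layers environment maps in the order they are passed in.
// Assumption: a later (more specific) layer overrides an earlier one.
func mergeEnv(layers ...map[string]any) map[string]any {
	out := make(map[string]any)
	for _, layer := range layers {
		for key, value := range layer {
			out[key] = value
		}
	}
	return out
}

func main() {
	node := map[string]any{"region": "eu", "debug": false}
	app := map[string]any{"debug": true, "pool": 4}
	parent := map[string]any{"pool": 8}
	process := map[string]any{"region": "us"}

	// Node > Application > Parent > Process
	env := mergeEnv(node, app, parent, process)
	fmt.Println(env["region"], env["debug"], env["pool"])
}
```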

If your application depends on other applications or network services, you can specify these dependencies using the Depends field. These dependencies are taken into account when starting the application.

The Mode field defines the startup mode for the application and its behavior if one of the processes in the group stops. This value serves as the default mode when the application is started using the ApplicationStart(...) method of the gen.Node interface. However, the startup mode can be overridden using the methods ApplicationStartTransient, ApplicationStartTemporary, or ApplicationStartPermanent of the gen.Node interface.

Finally, the LogLevel field allows you to set the logging level for the entire group of processes. If this option is not specified, the node's logging level will be used. However, individual processes can override this setting by specifying it in the ApplicationMemberSpec.Options during the process startup specification.

Application Startup Modes

Ergo Framework provides three startup modes for applications, each determining how the application behaves when one of its processes stops:

  1. gen.ApplicationModeTemporary: This is the default startup mode if no specific mode is indicated in the application's specification (gen.ApplicationSpec). In this mode, the termination of any individual process (for any reason) does not lead to the shutdown of the entire application. The application will only stop when all the processes in the application group, as defined in the Group field of the gen.ApplicationSpec, have stopped. The application will always stop with the reason gen.TerminateReasonNormal.

  2. gen.ApplicationModeTransient: The application stops only when one of its processes terminates unexpectedly (with a reason other than gen.TerminateReasonNormal or gen.TerminateReasonShutdown). In such cases, all running processes will receive an exit signal with the reason gen.TerminateReasonShutdown. Once all processes have stopped, the application will terminate with the reason that caused the initial process failure. If all processes complete normally, the application will stop with the reason gen.TerminateReasonNormal.

  3. gen.ApplicationModePermanent: In this mode, the termination of any process in the application, for any reason, will trigger the shutdown of the entire application. All running processes will receive an exit signal with the reason gen.TerminateReasonShutdown. Once all processes have stopped, the application will terminate with the reason that caused the shutdown. If all processes complete without errors, the application will stop with the reason gen.TerminateReasonNormal.

Processes that are part of the application group can launch child processes. These child processes will inherit the application's attributes, but their termination does not affect the application's logic; their role is informational. You can determine whether a process belongs to an application using the Info method of the gen.Process interface or the ProcessInfo method of the gen.Node interface. Both methods return a gen.ProcessInfo structure, which contains the Application field indicating the process's application affiliation.
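The three startup modes reduce to one decision: does the termination of a single group member shut down the whole application? A minimal sketch of that rule follows. Mode, the reason values, and stopsApplication are local stand-ins for the framework's mode and termination-reason constants, not its actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// Mode is a local stand-in for the application startup mode.
type Mode int

const (
	ModeTemporary Mode = iota
	ModeTransient
	ModePermanent
)

// Local stand-ins for the normal and shutdown termination reasons.
var (
	ReasonNormal   = errors.New("normal")
	ReasonShutdown = errors.New("shutdown")
	errCrash       = errors.New("crash") // any other reason
)

// stopsApplication reports whether the termination of one group member
// with the given reason shuts down the whole application.
func stopsApplication(mode Mode, reason error) bool {
	switch mode {
	case ModePermanent:
		// any termination stops the application
		return true
	case ModeTransient:
		// only an unexpected termination stops the application
		return !errors.Is(reason, ReasonNormal) && !errors.Is(reason, ReasonShutdown)
	default:
		// temporary: the application stops only when all members are gone
		return false
	}
}

func main() {
	for _, mode := range []Mode{ModeTemporary, ModeTransient, ModePermanent} {
		fmt.Println(mode,
			stopsApplication(mode, ReasonNormal),
			stopsApplication(mode, errCrash))
	}
}
```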

Starting an Application

In Ergo Framework, the gen.Node interface provides several methods to start an application:

  • ApplicationStart: Starts the loaded application in the mode specified by gen.ApplicationSpec.Mode.

  • ApplicationStartTemporary: Starts the application in gen.ApplicationModeTemporary, regardless of the mode specified in the application's specification.

  • ApplicationStartTransient: Starts the application in gen.ApplicationModeTransient, regardless of the mode specified in the application's specification.

  • ApplicationStartPermanent: Starts the application in gen.ApplicationModePermanent, regardless of the mode specified in the application's specification.

Before starting the application's processes, the node checks the dependencies specified in gen.ApplicationSpec.Depends. If the application depends on other applications, the node will attempt to start them first. Therefore, every application listed as a dependency must already be loaded or running before you start the application. If the dependencies are not met, the method will return the error gen.ErrApplicationDepends.

Once all dependencies are satisfied, the node starts launching the processes according to the order specified in gen.ApplicationSpec.Group. If any process fails to start, all previously started processes will be forcibly stopped using the Kill method of the gen.Node interface, and the startup process will return an error.
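The start-in-order, roll-back-on-failure behaviour described above can be sketched in a few lines. This is an illustration only: member, startGroup, and runDemo are hypothetical, the kill callback stands in for the node's Kill method, and the order in which already started members are killed is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

var errBoom = errors.New("boom")

// member is a hypothetical group member with a start function.
type member struct {
	name  string
	start func() error
}

// startGroup launches members in order. On the first failure it kills
// every member that had already started and returns the error.
func startGroup(members []member, kill func(name string)) ([]string, error) {
	var started []string
	for _, m := range members {
		if err := m.start(); err != nil {
			for _, name := range started {
				kill(name)
			}
			return nil, fmt.Errorf("start %s: %w", m.name, err)
		}
		started = append(started, m.name)
	}
	return started, nil
}

// runDemo starts three members; the one named by failing returns an error.
func runDemo(failing string) (started, killed []string, err error) {
	ok := func() error { return nil }
	fail := func() error { return errBoom }
	members := []member{
		{name: "a1", start: ok},
		{name: "s1", start: ok},
		{name: "w1", start: ok},
	}
	for i := range members {
		if members[i].name == failing {
			members[i].start = fail
		}
	}
	started, err = startGroup(members, func(name string) {
		killed = append(killed, name)
	})
	return started, killed, err
}

func main() {
	started, killed, err := runDemo("w1")
	fmt.Println(started, killed, err)
}
```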

After all processes are successfully started, the Start callback method of the gen.ApplicationBehavior interface is called, and the application transitions to the running state.

You can retrieve information about the application using the ApplicationInfo method of the gen.Node interface. This method returns a gen.ApplicationInfo structure that contains summary information about the application and its current state.

Stopping an Application

To stop an application, the gen.Node interface provides the ApplicationStop method. When this method is called, the application sends an exit signal to all of its processes with the reason gen.TerminateReasonShutdown and waits for them to stop completely.

If the processes do not stop within 5 seconds, a timeout error will be returned. If the ApplicationStop method is called again while the application is still waiting for its processes to stop, it will return the error gen.ErrApplicationStopping.

For forcefully stopping the application's processes, the ApplicationStopForce method is available in the gen.Node interface. In this case, all of the application's processes will be forcibly terminated using the Kill method of the gen.Node interface.

Additionally, the gen.Node interface provides the ApplicationStopWithTimeout method, which allows you to specify a custom timeout period for the application to stop.

Once all of the application's processes have stopped, the node will call the Terminate callback method of the gen.ApplicationBehavior interface, completing the shutdown process.

WebSocket

This package implements functionality for working with WebSocket connections through two meta-processes: WebSocket Handler (for handling HTTP requests to create WebSocket connections) and WebSocket Connection (for managing the established WebSocket connection). This implementation depends on the external library https://github.com/gorilla/websocket, which is why it is placed in a separate package "ergo.services/meta/websocket". You can find the source code in the additional meta-process library repository at https://github.com/ergo-services/meta.

WebSocket Handler

To handle incoming WebSocket connections, use the CreateHandler function. This function returns an object implementing gen.MetaBehavior and http.Handler, which are needed for launching the meta-process and processing HTTP requests. It accepts websocket.HandlerOptions as its argument:

  • ProcessPool: Specifies a list of process names to handle WebSocket messages.

  • HandshakeTimeout: Limits the time for the upgrade process.

  • EnableCompression: Enables compression for WebSocket connections.

  • CheckOrigin: Allows setting a function to verify the request's origin.

After creation, start the meta-process with SpawnMeta. Each connection launches a new WebSocket Connection meta-process for handling.

WebSocket Connection

This package implements the functionality of a meta-process for handling a WebSocket connection. It also allows initiating a WebSocket connection. To initiate a connection, use the CreateConnection function, which takes websocket.ConnectionOptions as an argument:

type ConnectionOptions struct {
	Process           gen.Atom
	URL               url.URL
	HandshakeTimeout  time.Duration
	EnableCompression bool
}

This structure is similar to websocket.HandlerOptions but includes an additional field, URL, where the WebSocket server address must be specified. For example:

opt := websocket.ConnectionOptions{
	URL: url.URL{Scheme: "ws", Host: "localhost:8080", Path: "/ws"},
}

During the execution of the CreateConnection function, it attempts to establish a WebSocket connection with the specified server.

After creating the meta-process, you must launch it using the SpawnMeta method from the gen.Process interface. If an error occurs while launching the meta-process, you need to call the Terminate method from the gen.MetaBehavior interface to close the WebSocket connection created by CreateConnection:

func (a *MyActor) HandleMessage(from gen.PID, message any) error {
    // ...
    opt := websocket.ConnectionOptions{
        URL: url.URL{Scheme: "ws", Host: "localhost:8080", Path: "/ws"},
    }
    wsconn, err := websocket.CreateConnection(opt)
    if err != nil {
        a.Log().Error("unable to connect to %s: %s", opt.URL.String(), err)
        return nil
    }
    id, err := a.SpawnMeta(wsconn, gen.MetaOptions{})
    if err != nil {
        // unable to spawn meta-process. need to close the connection
        wsconn.Terminate(err)
        a.Log().Error("unable to spawn meta-process: %s", err)
        return nil
    }
    a.Log().Info("spawned meta-process %s to handle connection with %s", id, opt.URL.String())
    // ...
    return nil
}

WebSocket messages

When working with WebSocket connections, three types of messages are used:

  • websocket.MessageConnect - Sent to the process when the meta-process handling the WebSocket connection is started:

    type MessageConnect struct {
    	ID         gen.Alias
    	RemoteAddr net.Addr
    	LocalAddr  net.Addr
    }

    The ID field in websocket.MessageConnect contains the identifier of the meta-process handling the WebSocket connection.

  • websocket.MessageDisconnect - Sent to the process when a WebSocket connection is disconnected:

    type MessageDisconnect struct {
    	ID         gen.Alias
    }

    The ID field in websocket.MessageDisconnect contains the identifier of the meta-process that handled the WebSocket connection. After the connection is terminated, the meta-process finishes its work and sends this message during its termination phase, signaling that the WebSocket connection has been closed.

  • websocket.Message - Used for both receiving and sending WebSocket messages:

    type Message struct {
    	ID   gen.Alias
    	Type MessageType
    	Body []byte
    }

    ID - identifier of the meta-process handling the WebSocket connection. Body - the message's body. Type - type of WebSocket message, according to the WebSocket specification. Several message types are available:

    const (
    	MessageTypeText   MessageType = 1
    	MessageTypeBinary MessageType = 2
    	MessageTypeClose  MessageType = 8
    	MessageTypePing   MessageType = 9
    	MessageTypePong   MessageType = 10
    )

If the meta-process fails to send a message to the process, the WebSocket connection is terminated, and the meta-process itself also terminates.
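A process that receives these three messages typically dispatches on their type. The sketch below shows that pattern in isolation. It compiles without the framework because it declares local stand-ins that mirror the definitions above (Alias replaces gen.Alias, and the address fields are omitted). The hub type and its send callback are hypothetical; in an actor, send would be a call to Send or SendAlias.

```go
package main

import "fmt"

// Local stand-ins mirroring the message definitions above.
type Alias string
type MessageType int

const (
	MessageTypeText   MessageType = 1
	MessageTypeBinary MessageType = 2
)

type MessageConnect struct{ ID Alias }
type MessageDisconnect struct{ ID Alias }
type Message struct {
	ID   Alias
	Type MessageType
	Body []byte
}

// hub tracks open connections and echoes text messages back.
type hub struct {
	open map[Alias]bool
	send func(to Alias, m Message) // stand-in for sending to the meta-process
}

// handle plays the role of the actor's HandleMessage callback.
func (h *hub) handle(message any) {
	switch m := message.(type) {
	case MessageConnect:
		h.open[m.ID] = true
	case MessageDisconnect:
		delete(h.open, m.ID)
	case Message:
		if m.Type == MessageTypeText {
			// reply to the meta-process that delivered the message
			h.send(m.ID, Message{Type: MessageTypeText, Body: m.Body})
		}
	}
}

// demo feeds a short connection lifecycle through the hub.
func demo() (openAfter int, echoed []string) {
	h := &hub{open: make(map[Alias]bool)}
	h.send = func(to Alias, m Message) {
		echoed = append(echoed, string(to)+":"+string(m.Body))
	}
	h.handle(MessageConnect{ID: "c1"})
	h.handle(Message{ID: "c1", Type: MessageTypeText, Body: []byte("hi")})
	h.handle(Message{ID: "c1", Type: MessageTypeBinary, Body: []byte{0x01}})
	h.handle(MessageDisconnect{ID: "c1"})
	return len(h.open), echoed
}

func main() {
	open, echoed := demo()
	fmt.Println(open, echoed)
}
```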

Sending a WebSocket-message

To send a message over a WebSocket connection, use the Send (or SendAlias) method from the gen.Process interface. The recipient should be the gen.Alias of the meta-process handling the WebSocket connection.

The message must be of type websocket.Message. If the Type field is not explicitly set, the default is websocket.MessageTypeText. The websocket.Message.ID field is ignored when sending (it's used only for incoming messages).

For a complete implementation of a WebSocket server, see the websocket project in the repository https://github.com/ergo-services/examples.

Network Stack

The Ergo Framework's network stack implements network transparency using three components:

Registrar - Manages node registration, including applications and additional parameters to help nodes find each other and establish connections. See the Service Discovering section for more details.

EDF (Ergo Data Format) - A data format for network communication that automatically encodes and decodes messages when sent to remote nodes.

ENP (Ergo Network Protocol) - A protocol for network interaction, facilitating asynchronous message exchange, synchronous requests, and remote process/application initiation between nodes.

Network Options

Network stack parameters can be specified at node startup in gen.NodeOptions.Network. This field, of type gen.NetworkOptions, allows you to configure the following:

  • Mode: default is gen.NetworkModeEnabled. Use gen.NetworkModeHidden to disable incoming connections, while still allowing outgoing ones. Use gen.NetworkModeDisabled to completely disable networking.

  • Cookie: sets a secret password for access control, used for both incoming and outgoing connections.

  • MaxMessageSize: defines the maximum size of network messages.

  • Flags: specify features available to remote nodes, like process or application launching.

  • Registrar: by default, the node uses the built-in implementation (ergo.services/ergo/net/registrar). For central registrar Saturn or for Erlang clusters, use respective libraries (Saturn or EPMD).

  • Handshake: connection setup starts with remote node authentication (cookie verification) and protocol parameter exchange. Default uses the built-in implementation (ergo.services/ergo/net/handshake).

  • Proto: default protocol is Ergo Framework's ENP (ergo.services/ergo/net/proto), but other protocols like the Erlang stack's DIST can be used.

  • Acceptors: defines a set of acceptors for the node ([]gen.AcceptorOptions). Each acceptor can configure port, TLS encryption, message size limits, and a custom cookie different from gen.NetworkOptions.Cookie. You can also configure different Registrar/Handshake/Proto sets for each acceptor, enabling simultaneous use of multiple network stacks, such as Ergo Framework and Erlang.

Establishing a connection with a remote node

To connect to a remote node, simply send a message to a process running on that node. The node will automatically attempt to establish a network connection before sending the message.

You can also establish a connection explicitly by calling the GetNode method from the gen.Network interface. For example:

func main() {
    ...
    node, err := ergo.StartNode("node1@localhost", gen.NodeOptions{})
    ...
    remote, err := node.Network().GetNode("node2@localhost")
    ...
}

The GetNode method returns the gen.RemoteNode interface, which provides information about the remote node. Additionally, it allows you to launch a process or application on that remote node:

type RemoteNode interface {
	Name() Atom
	Uptime() int64
	ConnectionUptime() int64
	Version() Version
	Info() RemoteNodeInfo

	Spawn(name Atom, options ProcessOptions, args ...any) (PID, error)
	SpawnRegister(register Atom, name Atom, options ProcessOptions, args ...any) (PID, error)

	// ApplicationStart starts application on the remote node.
	// Starting mode is according to the defined in the gen.ApplicationSpec.Mode
	ApplicationStart(name Atom, options ApplicationOptions) error

	// ApplicationStartTemporary starts application on the remote node in temporary mode
	// overriding the value of gen.ApplicationSpec.Mode
	ApplicationStartTemporary(name Atom, options ApplicationOptions) error

	// ApplicationStartTransient starts application on the remote node in transient mode
	// overriding the value of gen.ApplicationSpec.Mode
	ApplicationStartTransient(name Atom, options ApplicationOptions) error

	// ApplicationStartPermanent starts application on the remote node in permanent mode
	// overriding the value of gen.ApplicationSpec.Mode
	ApplicationStartPermanent(name Atom, options ApplicationOptions) error
	
	Creation() int64
	Disconnect()
}

For more details about remote process and application spawning, refer to the Remote Spawn Process and Remote Start Application sections.

You can also use the Node method from the gen.Network interface. It returns a gen.RemoteNode if a connection to the remote node exists; otherwise, it returns gen.ErrNoConnection.
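The difference between the two lookups is that one only consults existing connections while the other connects on demand. The toy model below illustrates that contract without the framework: network, node, getNode, and errNoConnection are hypothetical stand-ins, and dial represents the whole discovery and handshake sequence.

```go
package main

import (
	"errors"
	"fmt"
)

var errNoConnection = errors.New("no connection")

// network is a hypothetical model: conns holds established connections,
// dial stands in for service discovery plus handshake.
type network struct {
	conns map[string]bool
	dial  func(name string) error
}

// node only looks up an existing connection.
func (n *network) node(name string) error {
	if !n.conns[name] {
		return errNoConnection
	}
	return nil
}

// getNode reuses an existing connection or establishes a new one.
func (n *network) getNode(name string) error {
	if n.conns[name] {
		return nil
	}
	if err := n.dial(name); err != nil {
		return err
	}
	n.conns[name] = true
	return nil
}

// demo looks up a node before and after connecting to it.
func demo() (before, connect, after error, dials int) {
	n := &network{conns: make(map[string]bool)}
	n.dial = func(string) error { dials++; return nil }
	before = n.node("node2@localhost")
	connect = n.getNode("node2@localhost")
	_ = n.getNode("node2@localhost") // reuses the connection, no second dial
	after = n.node("node2@localhost")
	return before, connect, after, dials
}

func main() {
	fmt.Println(demo())
}
```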

To establish a connection with a remote node using specific parameters, use the GetNodeWithRoute method from the gen.Network interface. This method allows for more customized network connection settings.

Network Stack Interfaces

This section explains the internal mechanisms by which a node interacts with the network stack. Understanding these can help you implement a custom network stack or components, such as a modified version of gen.NetworkHandshake for customized authentication.

For incoming connections, the node starts acceptors with specified parameters. For outgoing connections, it independently creates TCP connections using parameters from the registrar (see Service Discovering). After establishing the TCP connection, the node leverages several interfaces to operate the network stack:

gen.NetworkHandshake

At the first stage, the node works with the gen.NetworkHandshake interface. If a TCP connection was created by an acceptor, the Accept method is called, initiating the remote node's authentication and the exchange of parameters for the network protocol. If the TCP connection was initiated by the node itself, the Start method is used, which handles the authentication process and the exchange of network protocol parameters.

The Join method is used to combine multiple TCP connections. In the ENP protocol, by default, 3 TCP connections are created and merged into one virtual connection between nodes, which improves network protocol performance. You can set the size of the TCP connection pool using the PoolSize parameter in handshake.Options (ergo.services/ergo/net/handshake). The Erlang network stack's DIST protocol does not support this feature.

type NetworkHandshake interface {
	NetworkFlags() NetworkFlags
	// Start initiates handshake process.
	Start(NodeHandshake, net.Conn, HandshakeOptions) (HandshakeResult, error)
	// Join attempts to join new TCP-connection to the existing connection 
	// to remote node
	Join(NodeHandshake, net.Conn, id string, HandshakeOptions) ([]byte, error)
	// Accept initiates handshake process for the connection created by remote node.
	Accept(NodeHandshake, net.Conn, HandshakeOptions) (HandshakeResult, error)
	// Version
	Version() Version
}

gen.NetworkProto

After successfully completing the handshake procedure, the node transfers control of the TCP connection to the network protocol using the gen.NetworkProto interface. This occurs in two stages: first, the node calls the NewConnection method, which returns the gen.Connection interface for the created connection. The node then registers this connection in its internal routing mechanism with the remote node's name. If a connection with the same name already exists, the node automatically closes the TCP connection, as only one connection can exist between two nodes.

Upon successful registration, the node calls the Serve method. The completion of this method indicates the termination of the connection with the remote node, effectively closing the connection.

type NetworkProto interface {
	// NewConnection
	NewConnection(core Core, result HandshakeResult, log Log) (Connection, error)
	// Serve connection. Argument dial is the closure to create TCP connection with invoking
	// NetworkHandshake.Join inside to shortcut the handshake process
	Serve(conn Connection, dial NetworkDial) error
	// Version
	Version() Version
}

gen.Connection

The gen.Connection interface is used by the node for routing messages and requests to the remote node with which a connection has been established. If you are developing your own protocol, you will need to implement all the methods of this interface to handle communication between nodes.

type Connection interface {
	Node() RemoteNode

	// Methods for sending async message to the remote process
	SendPID(from PID, to PID, options MessageOptions, message any) error
	SendProcessID(from PID, to ProcessID, options MessageOptions, message any) error
	SendAlias(from PID, to Alias, options MessageOptions, message any) error

	SendEvent(from PID, options MessageOptions, message MessageEvent) error
	SendExit(from PID, to PID, reason error) error
	SendResponse(from PID, to PID, ref Ref, options MessageOptions, response any) error

	// target terminated
	SendTerminatePID(target PID, reason error) error
	SendTerminateProcessID(target ProcessID, reason error) error
	SendTerminateAlias(target Alias, reason error) error
	SendTerminateEvent(target Event, reason error) error

	// Methods for sending sync request to the remote process
	CallPID(ref Ref, from PID, to PID, options MessageOptions, message any) error
	CallProcessID(ref Ref, from PID, to ProcessID, options MessageOptions, message any) error
	CallAlias(ref Ref, from PID, to Alias, options MessageOptions, message any) error

	// Links
	LinkPID(pid PID, target PID) error
	UnlinkPID(pid PID, target PID) error

	LinkProcessID(pid PID, target ProcessID) error
	UnlinkProcessID(pid PID, target ProcessID) error

	LinkAlias(pid PID, target Alias) error
	UnlinkAlias(pid PID, target Alias) error

	LinkEvent(pid PID, target Event) ([]MessageEvent, error)
	UnlinkEvent(pid PID, target Event) error

	// Monitors
	MonitorPID(pid PID, target PID) error
	DemonitorPID(pid PID, target PID) error

	MonitorProcessID(pid PID, target ProcessID) error
	DemonitorProcessID(pid PID, target ProcessID) error

	MonitorAlias(pid PID, target Alias) error
	DemonitorAlias(pid PID, target Alias) error

	MonitorEvent(pid PID, target Event) ([]MessageEvent, error)
	DemonitorEvent(pid PID, target Event) error

	RemoteSpawn(name Atom, options ProcessOptionsExtra) (PID, error)

	Join(c net.Conn, id string, dial NetworkDial, tail []byte) error
	Terminate(reason error)
}

Supervisor

A supervisor in Ergo Framework is a specialized actor responsible for managing child processes. Its main tasks include starting child processes, monitoring their life cycles, and restarting them if necessary according to the chosen restart strategy.

act.Supervisor implements the low-level gen.ProcessBehavior interface. Similar to act.Actor, it has the embedded gen.Process interface. This means that an object based on act.Supervisor has access to all the methods provided by the gen.Process interface.

To create a supervisor actor, you simply embed act.Supervisor into your object. Additionally, you need to create a factory function for the supervisor. Here is an example:

To work with your object, act.Supervisor uses the act.SupervisorBehavior interface. This interface defines a set of callback methods for initialization, handling asynchronous messages, and handling synchronous calls:

The Init method is mandatory, while the other methods are optional. Init returns the supervisor specification act.SupervisorSpec, which includes the following options:

  • Type: The type of supervisor (act.SupervisorType).

  • Children: The list of child process specifications ([]act.SupervisorChildSpec). This parameter also determines the order of child process startups.

  • Restart: Defines the restart strategy for child processes.

  • EnableHandleChild: Enables the invocation of the callback methods HandleChildStart and HandleChildTerminate when child processes start and terminate.

  • DisableAutoShutdown: Prevents the supervisor from stopping when all child processes have terminated normally (with gen.TerminateReasonNormal or gen.TerminateReasonShutdown).

Here is an example implementation of the act.SupervisorBehavior interface for the supervisor MySupervisor with one child process MyActor:

Supervisor Types

The type of supervisor in the SupervisorSpec defines its behavior. In Ergo Framework, several supervisor types are implemented:

act.SupervisorTypeOneForOne

This is the default supervisor type. If the type is not specified in the SupervisorSpec, this one will be used. Upon startup, the supervisor sequentially starts child processes as specified in act.SupervisorSpec.Children. If an error occurs while starting the child processes, the supervisor process will terminate.

Each child process is launched with the associated name defined in the Name option of the child process specification act.SupervisorChildSpec. This means that only one process can be started for each child process specification. If this field is empty, the supervisor will use the Spawn method from the gen.Process interface. Otherwise, the SpawnRegister method will be used.

Stopping a child process triggers the restart strategy for that process. Depending on the restart strategy defined by act.SupervisorSpec.Restart.Strategy, the process will either be restarted or stopped. The restart strategy does not affect other processes.

The child process is restarted with the same set of arguments that were used in its previous startup.

The KeepOrder option in act.SupervisorSpec.Restart is ignored for this supervisor type. When a new child process specification is added via the AddChild method (provided by act.Supervisor), the supervisor automatically starts the new child process. When a child specification is disabled (using the DisableChild method), the supervisor stops the corresponding child process with the reason gen.TerminateReasonShutdown. The supervisor will automatically terminate if no child processes remain (having terminated with the reasons gen.TerminateReasonNormal or gen.TerminateReasonShutdown). You can enable the DisableAutoShutdown option in act.SupervisorSpec to prevent this.

act.SupervisorTypeOneForAll

When starting, this supervisor sequentially launches child processes according to the list of child process specifications. If an error occurs during the startup of child processes, the supervisor process terminates.

A child process is started with the associated name defined in the Name option of act.SupervisorChildSpec.

Stopping a child process in this supervisor triggers the restart strategy for all child processes according to act.SupervisorSpec.Restart.Strategy.

If the KeepOrder option is enabled, child processes are stopped in reverse order. Without KeepOrder, they are stopped simultaneously. After stopping, the supervisor restarts them in the specified order.

Each child process is restarted with the same arguments as in the previous run.

When a new child process specification is added (using AddChild), the supervisor automatically starts the child process. When disabling (DisableChild), the supervisor stops the corresponding child process with the reason gen.TerminateReasonShutdown.

The DisableAutoShutdown option allows the supervisor to continue running even if all child processes have stopped (terminated with gen.TerminateReasonNormal or gen.TerminateReasonShutdown).

act.SupervisorTypeRestForOne

When starting, this supervisor sequentially launches child processes according to the list of child process specifications. If an error occurs during the startup of child processes, the supervisor process terminates.

A child process is started with the associated name defined in the Name option of act.SupervisorChildSpec.

When a child process terminates, the restart strategy is activated. All child processes, from the last one (according to the order in act.SupervisorSpec.Children) back to the process that triggered the restart, are stopped.

With the KeepOrder option enabled, child processes are stopped sequentially in reverse order. If the option is disabled, child processes stop simultaneously.

After all child processes are stopped, the supervisor restarts them according to the order specified in act.SupervisorSpec.Children.

Each child process is restarted with the same set of arguments used in the previous startup.

Each child process is launched with the associated name defined in the Name option of the child process specification act.SupervisorChildSpec.
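The stop and restart order described for act.SupervisorTypeRestForOne can be sketched with a small standalone model. This is not framework code: the function restForOnePlan and the child names are hypothetical, and the sketch assumes that the terminated child itself is restarted together with the children that follow it.

```go
package main

import "fmt"

// restForOnePlan is a simplified, hypothetical model (not part of the framework).
// Given the ordered child names and the name of the child that terminated,
// it returns the children to stop (last to first, as with KeepOrder enabled)
// and the children to restart (in specification order).
func restForOnePlan(children []string, failed string) (stop, restart []string) {
	idx := -1
	for i, name := range children {
		if name == failed {
			idx = i
			break
		}
	}
	if idx < 0 {
		return nil, nil // unknown child: nothing to do
	}
	// The failed child has already terminated, so only the children after it
	// still need to be stopped, walking back from the end of the list.
	for i := len(children) - 1; i > idx; i-- {
		stop = append(stop, children[i])
	}
	// The failed child and everything after it are started again in order.
	restart = append(restart, children[idx:]...)
	return stop, restart
}

func main() {
	stop, restart := restForOnePlan([]string{"a", "b", "c", "d"}, "b")
	fmt.Println(stop, restart) // [d c] [b c d]
}
```

Children listed before the terminated one ("a" in this example) are left untouched, which is the difference from act.SupervisorTypeOneForAll.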

act.SupervisorTypeSimpleOneForOne

This type of supervisor is a simplified version of act.SupervisorTypeOneForOne. It does not start child processes on startup. Instead, child processes are started using the StartChild method.

As in act.SupervisorTypeOneForOne, the termination of a child process triggers the restart strategy for that process. Depending on the strategy defined in act.SupervisorSpec.Restart.Strategy, the process will either be restarted or stopped. The restart strategy does not affect other processes.

Child processes in this supervisor are launched without an associated name, and multiple processes can be started from one child process specification.

The KeepOrder option in act.SupervisorSpec.Restart is ignored for this supervisor type.

The DisableAutoShutdown option is also ignored, and stopping all child processes does not result in the termination of the supervisor process.

Child Process Specification

This specification describes the parameters for starting a child process. This is done using act.SupervisorChildSpec:

  • Name: The specification's name, used as the associated name for the process (except for act.SupervisorTypeSimpleOneForOne).

  • Factory, Options, Args: Parameters needed to launch a new process.

  • Significant: Determines the importance of the child process. The termination of a significant process may lead to the supervisor and all child processes stopping, depending on the restart strategy:

    • act.SupervisorStrategyTemporary: Termination of a significant child process leads to the termination of the supervisor and all children.

    • act.SupervisorStrategyTransient: Non-crash termination (e.g., gen.TerminateReasonNormal or gen.TerminateReasonShutdown) of a significant child process leads to the termination of the supervisor and all children, while a crash triggers the restart strategy.

    • act.SupervisorStrategyPermanent: The Significant field is ignored, and termination always triggers the restart strategy.

Additionally, the Significant field is ignored in supervisors of types act.SupervisorTypeOneForOne and act.SupervisorTypeSimpleOneForOne.
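The rules for a significant child can be summarized as a small decision function. The sketch below is a standalone model for illustration only: the type and constant names are hypothetical stand-ins for the act.SupervisorStrategy* values, and it applies only to supervisor types that honor the Significant field.

```go
package main

import "fmt"

// strategy mirrors the three restart strategies described above
// (hypothetical stand-ins for act.SupervisorStrategy*).
type strategy int

const (
	transient strategy = iota
	temporary
	permanent
)

// outcome is what the supervisor does after a significant child terminates.
type outcome string

const (
	stopSupervisor outcome = "stop supervisor and all children"
	applyRestart   outcome = "apply restart strategy"
)

// significantExit models the decision for a child marked as Significant.
// crashed is false for normal or shutdown termination.
func significantExit(s strategy, crashed bool) outcome {
	switch s {
	case temporary:
		return stopSupervisor
	case transient:
		if crashed {
			return applyRestart
		}
		return stopSupervisor
	default: // permanent: the Significant flag is ignored
		return applyRestart
	}
}

func main() {
	fmt.Println(significantExit(transient, false)) // stop supervisor and all children
	fmt.Println(significantExit(transient, true))  // apply restart strategy
}
```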

Restart Strategy

The Restart option in act.SupervisorSpec is of type act.SupervisorRestart and defines parameters for the restart strategy:

  • Strategy: Specifies the restart strategy type:

    • act.SupervisorStrategyTransient: A crash triggers a restart. If the process terminates normally (e.g., gen.TerminateReasonNormal or gen.TerminateReasonShutdown), the restart strategy is not activated. This is the default strategy.

    • act.SupervisorStrategyTemporary: No restart occurs, even in the event of a crash.

    • act.SupervisorStrategyPermanent: Any termination triggers a restart, ignoring DisableAutoShutdown.

  • Intensity and Period: Define the maximum number of restart activations allowed within a given period (in seconds). If this limit is exceeded, the supervisor terminates all child processes and itself, with the reason act.ErrSupervisorRestartsExceeded.

  • KeepOrder: Specifies the order in which child processes are stopped when the restart strategy is activated. If this option is enabled, processes will be stopped sequentially in reverse order. By default, this option is disabled, meaning that all child processes are stopped simultaneously. Processes are restarted only after all child processes have fully stopped (either all processes for act.SupervisorTypeOneForAll or part of the processes for act.SupervisorTypeRestForOne).
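To make the interaction of Intensity and Period concrete, here is a minimal sliding-window counter. It is a sketch under stated assumptions, not the framework's implementation: the restartLimiter type is hypothetical, timestamps are plain seconds, and the limit is treated as exceeded once more than Intensity restarts fall within the last Period seconds.

```go
package main

import "fmt"

// restartLimiter is a hypothetical model of the Intensity/Period options.
type restartLimiter struct {
	intensity int     // maximum restarts tolerated within one period
	period    int64   // window length in seconds
	restarts  []int64 // timestamps (seconds) of recent restarts
}

// allow records a restart at time now and reports whether the limit
// is still respected. A false result corresponds to the situation where
// the supervisor would terminate with act.ErrSupervisorRestartsExceeded.
func (l *restartLimiter) allow(now int64) bool {
	kept := l.restarts[:0]
	for _, t := range l.restarts {
		if now-t < l.period { // keep only restarts inside the window
			kept = append(kept, t)
		}
	}
	l.restarts = append(kept, now)
	return len(l.restarts) <= l.intensity
}

func main() {
	l := &restartLimiter{intensity: 2, period: 5}
	for _, now := range []int64{0, 1, 2, 10} {
		fmt.Println(now, l.allow(now))
	}
}
```

With Intensity 2 and Period 5, the third restart within two seconds exceeds the limit, while a restart at second 10 is accepted again because the earlier ones have left the window.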

Methods of act.Supervisor

During initialization, the supervisor defines the initial set of child processes, the supervisor type, and the restart strategy for the child processes. act.Supervisor provides several methods for managing child processes dynamically and obtaining information about running processes:

  • StartChild(name gen.Atom, args...): Starts a child process by its specification name. If arguments (args) are provided, they are updated in the specification for future restarts.

  • AddChild(child act.SupervisorChildSpec): Adds a child specification and automatically starts the process.

  • EnableChild(name gen.Atom): Enables and starts a previously disabled child process.

  • DisableChild(name gen.Atom): Disables the specification and stops the running process.

  • Children(): Returns a list of []act.SupervisorChild with data on specifications and running processes, sorted according to the child process specification list.

Logging

In Ergo Framework, a flexible logging system is implemented that allows multiple loggers to exist within a node. These loggers can be configured to receive specific logging levels.

For logging purposes, the gen.Node and gen.Process interfaces provide the Log method. This method returns the gen.Log interface, which is used for logging messages within the framework.

Using the gen.Log interface, you can manage the logging level not only for the node but also for each individual process (including meta-processes). There are six standard logging levels that can be set using the SetLevel method:

  • gen.LogLevelDebug

  • gen.LogLevelInfo

  • gen.LogLevelWarning

  • gen.LogLevelError

  • gen.LogLevelPanic

  • gen.LogLevelDisabled

Additionally, there are specialized logging levels:

  • gen.LogLevelDefault: This is the default logging level. When a node starts with this level, it defaults to gen.LogLevelInfo. For applications or processes, the logging level inherits from the node. Meta-processes inherit the logging level of their parent process.

  • gen.LogLevelTrace: This level is used exclusively for deep debugging. To avoid accidental activation, this level cannot be set through the SetLevel method of the gen.Log interface. Instead, it can only be enabled during the startup of a node or process through gen.NodeOptions.Log.Level or gen.ProcessOptions.LogLevel.

By default, the logging level for the node is set to gen.LogLevelInfo. Processes inherit the node's logging level at startup unless explicitly set in gen.ProcessOptions.
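The inheritance rules for gen.LogLevelDefault can be modeled in a few lines. The sketch below is standalone and illustrative: the level type and the effective function are hypothetical and only mirror the rules stated above (node default is info, processes inherit from the node, meta-processes inherit from their parent process).

```go
package main

import "fmt"

// level is a hypothetical stand-in for gen.LogLevel.
type level int

const (
	levelDefault level = iota
	levelTrace
	levelDebug
	levelInfo
	levelWarning
	levelError
	levelPanic
	levelDisabled
)

// effective resolves a configured level against the level of the parent
// scope: the default level means "inherit", anything else is used as is.
func effective(parent, configured level) level {
	if configured == levelDefault {
		return parent
	}
	return configured
}

func main() {
	// A node started with the default level falls back to info.
	node := effective(levelInfo, levelDefault)
	// A process without an explicit level inherits from the node.
	proc := effective(node, levelDefault)
	// A process with an explicit level keeps it.
	debugProc := effective(node, levelDebug)
	// A meta-process inherits from its parent process.
	meta := effective(debugProc, levelDefault)

	fmt.Println(node == levelInfo, proc == levelInfo, meta == levelDebug)
}
```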

The SetLogger method allows you to restrict logging to a specified logger. If no logger is specified, the log message will be delivered to all registered loggers. You can retrieve the list of registered loggers using the Loggers method of the gen.Node interface.

The gen.LogLevelDisabled level can be used to temporarily disable logging, either for the entire node or a specific process.

Furthermore, the gen.Node interface provides two methods for process-specific logging control:

  • ProcessLogLevel(pid gen.PID): Retrieves the current logging level of a specific process.

  • SetProcessLogLevel(pid gen.PID, level gen.LogLevel): Sets the desired logging level for a specific process.

Logger

By default, a node in Ergo Framework uses a standard logger, and its parameters can be configured during node startup via gen.NodeOptions.Log.DefaultLogger. You have the option to disable the default logger and use a process-based logger or integrate one (or more) loggers from an additional logging library.

If you wish to create your own logger, you simply need to implement the gen.LoggerBehavior interface. This interface defines the behavior of a logger, allowing you to customize the logging system according to your application's requirements. By implementing this interface, you can control how log messages are processed, formatted, and routed, providing flexibility beyond the default logging system.

To add a new logger, the gen.Node interface provides two methods:

  • LoggerAdd(name string, logger gen.LoggerBehavior, filter ...gen.LogLevel): This method allows you to add a logger (an object implementing the gen.LoggerBehavior interface) to the node's logging system.

  • LoggerAddPID(pid gen.PID, name string, filter ...gen.LogLevel): This method adds a process as a logger. In this case, the node sends log messages to the specified process. act.Actor provides the HandleLog callback method for handling such log messages.

Both methods accept a filter argument, which allows you to specify a set of log levels that the logger should handle. For example, if you want your logger to only receive messages at the gen.LogLevelPanic and gen.LogLevelInfo levels, you can list them in the filter:

If the filter argument is not specified, the default set of log levels, gen.DefaultLogFilter, will be used:

The logging methods of the gen.LoggerBehavior interface will only be invoked for messages that match the specified logging levels.
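The effect of the filter argument can be illustrated with a standalone model of a logger registry. This is only a sketch: the loggers type, its methods, and the string level names are hypothetical and do not reproduce the node's internal data structures.

```go
package main

import (
	"fmt"
	"sort"
)

// defaultFilter mirrors the levels listed for gen.DefaultLogFilter.
var defaultFilter = []string{"trace", "debug", "info", "warning", "error", "panic"}

// loggers maps a logger name to the set of levels it accepts.
type loggers map[string]map[string]bool

// add registers a logger; an empty filter falls back to defaultFilter.
func (l loggers) add(name string, filter ...string) {
	if len(filter) == 0 {
		filter = defaultFilter
	}
	set := make(map[string]bool, len(filter))
	for _, lvl := range filter {
		set[lvl] = true
	}
	l[name] = set
}

// receivers returns the sorted names of loggers that accept the level.
func (l loggers) receivers(lvl string) []string {
	var names []string
	for name, set := range l {
		if set[lvl] {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names
}

func main() {
	l := loggers{}
	l.add("all")                     // no filter: default set of levels
	l.add("alerts", "panic", "info") // only panic and info
	fmt.Println(l.receivers("info"))  // [alerts all]
	fmt.Println(l.receivers("debug")) // [all]
}
```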

Reusing a logger name is not allowed. If a logger with the same name already exists, the LoggerAdd or LoggerAddPID function will return the error gen.ErrTaken. To resolve this, you must first remove the existing logger from the system using the appropriate method: LoggerDelete or LoggerDeletePID from the gen.Node interface.

When a logger is removed from the system, the Terminate callback method of the gen.LoggerBehavior interface is called.

The methods of the gen.LoggerBehavior interface are called synchronously by the node or process. If you are implementing your own logger, it is important to keep this in mind and avoid blocking these calls with complex or time-consuming log message processing.

To mitigate this issue, consider using a process-based logger. In this case, all log messages are placed in the process's mailbox and processed asynchronously, ensuring that the gen.LoggerBehavior interface methods are not blocked by the logging logic. This approach allows for more efficient handling of log messages without impacting the overall performance of the node or process.

Default logger

When starting a node, you can specify logging parameters using the gen.NodeOptions.Log.DefaultLogger option. These parameters will be applied to the default logger, which outputs all log messages in text format to a specified output (by default, os.Stdout). With these options, you can configure:

  • Output: The interface used for log message output. To direct log messages to both standard output and a file simultaneously, you can use the io.MultiWriter function from Golang's standard library:

  • TimeFormat: Defines the format for the timestamp of log messages. You can use any existing format, such as time.DateTime (see the time package constants), or define your own custom format. By default, the timestamp is output in nanoseconds.

  • IncludeBehavior: Adds the behavior of the process or meta-process to the log message.

  • IncludeName: Adds the registered name of the process to the log message, if it exists.

  • Filter: Specifies the logging levels that the default logger will handle. Leave this field empty to use the default set of levels (gen.DefaultLogFilter).

  • Disable: Disables the default logger.

Here is an example of how the standard logger's output might look with these options applied:

The first field is the timestamp of the log message, the second field is the log level, and the third field is the source of the message. The sources can come from various interfaces:

  • gen.Node.Log(): Uses the node's name in the form of a CRC32 hash. Example: 9F35C982

  • gen.Process.Log(): The string representation of the process identifier gen.PID. Example: <9F35C982.0.1013>

  • gen.MetaProcess.Log(): The string representation of the meta-process identifier gen.Alias. Example: Alias#<9F35C982.123663.24065.0>

  • gen.Connection.Log(): String representations of the local and remote nodes. Example: 9F35C982-90A29F11

Process-logger

If you want to manage log message streams using the familiar actor model, any process in a running node can serve as a logger. In the act.Actor implementation, received log messages are passed as the message argument to the HandleLog callback method.

To add your actor to the node as a logger, use the LoggerAddPID method from the gen.Node interface:

Note that the LoggerAddPID method of the gen.Node interface is not available to a process during its initialization state, as the node has not yet recognized the process (see the Process section). Therefore, an actor cannot add itself as a logger within the Init callback method of the act.ActorBehavior interface.

When a logger process terminates, it will be automatically removed from the node's logging system.

Implementations of gen.LoggerBehavior

To extend the capabilities of the Ergo Framework, two additional loggers have been implemented:

  • colored: Enables colored highlighting for log messages in the console.

  • rotate: Allows logging to a file with support for log file rotation at specified intervals.

You can disable the default logger and add both of these loggers, enabling colored output in the console while also saving logs to a file with rotation. You can find an example of using them in the demo project.

type MySupervisor struct {
    act.Supervisor
}

func factoryMySupervisor() gen.ProcessBehavior {
    return &MySupervisor{}
}
type SupervisorBehavior interface {
	gen.ProcessBehavior

	// Init invoked on a spawn Supervisor process. 
	// This is a mandatory callback for the implementation
	Init(args ...any) (SupervisorSpec, error)

	// HandleChildStart invoked on a successful child process start if the option
	// EnableHandleChild was enabled in act.SupervisorSpec
	HandleChildStart(name gen.Atom, pid gen.PID) error

	// HandleChildTerminate invoked on a child process termination
	// if the option EnableHandleChild was enabled in act.SupervisorSpec
	HandleChildTerminate(name gen.Atom, pid gen.PID, reason error) error

	// HandleMessage invoked if Supervisor received a message sent with gen.Process.Send(...).
	// Non-nil value of the returning error will cause termination of this process.
	// To stop this process normally, return gen.TerminateReasonNormal or
	// gen.TerminateReasonShutdown. Any other - for abnormal termination.
	HandleMessage(from gen.PID, message any) error

	// HandleCall invoked if Supervisor got a synchronous request made with gen.Process.Call(...).
	// Return nil as a result to handle this request asynchronously and
	// to provide the result later using the gen.Process.SendResponse(...) method.
	HandleCall(from gen.PID, ref gen.Ref, request any) (any, error)

	// Terminate invoked on a termination supervisor process
	Terminate(reason error)

	// HandleInspect invoked on the request made with gen.Process.Inspect(...)
	HandleInspect(from gen.PID) map[string]string
}
//
// Child process implementation
//

func factoryMyActor() gen.ProcessBehavior {
    return &MyActor{}
}

type MyActor struct {
    act.Actor
}

func (a *MyActor) Init(args ...any) error {
    a.Log().Info("starting child process MyActor")
    return nil
}

//
// act.SupervisorBehavior implementation
//

func (s *MySupervisor) Init(args ...any) (act.SupervisorSpec, error) {
    s.Log().Info("initialize supervisor...")
    spec := act.SupervisorSpec{
        Children: []act.SupervisorChildSpec{
            {
                Name:    "myChild",
                Factory: factoryMyActor,
            },
        },
    }
    return spec, nil
}
...
pid, err := node.Spawn(factoryMySupervisor, gen.ProcessOptions{})
if err != nil {
    panic(err)
}
node.Log().Info("MySupervisor is started successfully with pid %s", pid)
...

type Log interface {
	Level() LogLevel
	SetLevel(level LogLevel) error

	Logger() string
	SetLogger(name string)

	Trace(format string, args ...any)
	Debug(format string, args ...any)
	Info(format string, args ...any)
	Warning(format string, args ...any)
	Error(format string, args ...any)
	Panic(format string, args ...any)
}
type LoggerBehavior interface {
	Log(message MessageLog)
	Terminate()
}
node.LoggerAdd("my logger", myLogger, gen.LogLevelPanic, gen.LogLevelInfo)
[]gen.LogLevel{
    LogLevelTrace,
    LogLevelDebug,
    LogLevelInfo,
    LogLevelWarning,
    LogLevelError,
    LogLevelPanic,
}
var options gen.NodeOptions
logFile, err := os.Create("out.log")
if err != nil {
    panic(err)
}
options.Log.DefaultLogger.Output = io.MultiWriter(os.Stdout, logFile)
TimeFormat: time.DateTime
IncludeBehavior: true
IncludeName: true
func (a *MyActor) Init(args ...any) error {
    ...
    a.Log().Info("starting my actor")
    return nil
}
...
pid, err := node.SpawnRegister("example", factoryMyActor, gen.ProcessOptions{})
node.Log().Info("started MyActor with PID %s", pid)
...
2024-07-31 07:53:57 [info] <6EE4478D.0.1017> 'example' main.MyActor: starting my actor
2024-07-31 07:53:57 [info] 6EE4478D: started MyActor with PID <6EE4478D.0.1017>
func factoryMyLogger() gen.ProcessBehavior {
    return &MyLogger{}
}

type MyLogger struct {
    act.Actor
}

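func (ml *MyLogger) HandleLog(message gen.MessageLog) error {
    // the type of the source tells where the log message originated
    switch message.Source.(type) {
    case gen.MessageLogNode:
        // handle message
    case gen.MessageLogProcess:
        // handle message
    case gen.MessageLogMeta:
        // handle message
    case gen.MessageLogNetwork:
        // handle message
    }
    ...
    return nil
}
...
pid, err := node.Spawn(factoryMyLogger, gen.ProcessOptions{})
if err != nil {
    panic(err)
}
// LoggerAddPID requires a logger name; "my process logger" is an arbitrary example
node.LoggerAddPID(pid, "my process logger")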

Process

What is a Process in Ergo Framework

In Ergo Framework, a process is a lightweight entity that operates on top of a goroutine and is based on the actor model. Each process has a mailbox for receiving incoming messages. The size of the mailbox is determined by the gen.ProcessOptions.MailboxSize parameter when the process is started. By default, this value is set to zero, which makes the mailbox unlimited in size. Inside the mailbox, there are several queues—Main, System, Urgent, and Log—that prioritize the processing of messages.

Any running process can send messages and make synchronous calls to other processes, including those running on remote nodes. Additionally, a process can spawn new processes, both locally and on remote nodes (see Remote Spawn Process).

Process Identifiers

Processes in Ergo Framework are created and run within the scope of a node. Each process is assigned a unique identifier called gen.PID when it is started. The gen.PID consists of several key components:

  • Node: name of the node on which the process is running.

  • ID: unique number that identifies the process within its node.

  • Creation: property that represents the "incarnation" of the process, meaning the specific instance of the node in which the process was created.

This combination of properties allows for:

  • Identification Across Nodes: node name is used for routing messages to remote nodes where the process resides. Once routed to the correct node, the process's unique sequential number is used for local message routing on the remote node.

  • Process Incarnation Handling: Creation property allows the system to distinguish between different instances of a node. This property has the same value for all processes within a node's lifecycle. If the node is restarted, the Creation value changes (it stores the timestamp of the node's startup).

If you attempt to send a message to a process with a gen.PID that belongs to a previous incarnation of the node, an error gen.ErrProcessIncarnation will be returned.

This mechanism ensures that processes are accurately identified and that message routing remains reliable, even across restarts of nodes.
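The role of the Creation value can be sketched with a small standalone model. Everything here is hypothetical and simplified for illustration: the pid and node types, the deliver method, and the error values are stand-ins, with errProcessIncarnation playing the role of gen.ErrProcessIncarnation.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the framework's errors.
var (
	errProcessIncarnation = errors.New("process incarnation mismatch")
	errProcessUnknown     = errors.New("unknown process")
)

// pid is a simplified model of gen.PID.
type pid struct {
	node     string
	id       uint64
	creation int64
}

// node is a simplified model of a running node instance.
type node struct {
	name     string
	creation int64 // startup timestamp of this node instance
	procs    map[uint64]bool
}

// deliver checks the incarnation first, then looks up the process by ID.
func (n *node) deliver(to pid) error {
	if to.creation != n.creation {
		return errProcessIncarnation
	}
	if !n.procs[to.id] {
		return errProcessUnknown
	}
	return nil
}

func main() {
	n := &node{name: "node1@localhost", creation: 1700000000, procs: map[uint64]bool{1013: true}}

	current := pid{node: n.name, id: 1013, creation: 1700000000}
	stale := pid{node: n.name, id: 1013, creation: 1600000000} // from before a restart

	fmt.Println(n.deliver(current)) // <nil>
	fmt.Println(n.deliver(stale))   // process incarnation mismatch
}
```

Because the ID alone may be reused after a restart, comparing Creation is what prevents a message addressed to an old process from reaching an unrelated new one.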

gen.ProcessID

A process can also be identified by a name associated with it. You can register such a name when starting a process using the SpawnRegister method of the gen.Node interface. Additionally, a process can register its name after it has started by calling the RegisterName method of the gen.Process interface.

If you want to change the associated name of a process, you first need to unregister the current name. This is because a process is limited to having only one associated name at a time. The UnregisterName method of the gen.Process interface allows you to remove the current name registration.

To send a message to a process by its registered name, you can use the SendProcessID method of the gen.Process interface. For making a synchronous request by name, the gen.Process interface provides the CallProcessID method.

These methods make it easy to send messages and interact with processes using their associated names, without needing to know their gen.PID, especially when dealing with remote or dynamically created processes.

type myActor struct {
    act.Actor
}
...
func (a *myActor) HandleMessage(from gen.PID, message any) error {
    ...
    remoteProcess := gen.ProcessID{Name:"p1", Node:"node2@localhost"}
    // sending async message to the remote process with associated name
    a.SendProcessID(remoteProcess, "hello")
    // making a sync request to the remote process with associated name
    result, err := a.CallProcessID(remoteProcess, "hi")
    ...
    // you can also use methods Send and Call
    a.Send(remoteProcess, "hello")
    result, err = a.Call(remoteProcess, "hi")
    ...
}

To send a message or make a synchronous request to a local process, you can use the universal methods Send and Call of the gen.Process interface, respectively:

 func (a *myActor) HandleMessage(from gen.PID, message any) error {
    ...
    localProcess := gen.Atom("local")
    a.Send(localProcess, "hello")
    result, err := a.Call(localProcess, "hi")
    ...
 }

gen.Alias

In Ergo Framework, processes have the ability to create temporary identifiers known as aliases. A process can generate an unlimited number of aliases using the CreateAlias method of the gen.Process interface. These aliases remain valid throughout the lifetime of the process or until the process explicitly removes them using the DeleteAlias method of the gen.Process interface.

Aliases provide additional flexibility by allowing a process to be referenced by multiple temporary identifiers, useful for handling specific tasks or interactions where a unique, temporary process reference is needed.

Process states

A process in Ergo Framework can be in one of the following states:

  • Init - This is the state of the process during its startup. At this stage, the process is not yet registered in the node, so only a limited set of methods from the gen.Process interface are available—such as modifying process properties (environment variables, message compression settings, setting priorities for outgoing messages). Attempts to use methods like RegisterName, RegisterEvent, CreateAlias, Call*, Link*, and Monitor* will return the error gen.ErrNotAllowed. However, methods like Send* and Spawn* remain available in this state.

  • Sleep - This state occurs when the process has no messages in its mailbox, and no goroutine is running for the process. The process is idle and consumes no CPU resources in this state.

  • Running - When a process receives a message in its mailbox, a goroutine is launched, and the appropriate callback is executed. Once the message has been processed and if there are no other messages in the mailbox, the process returns to the Sleep state, and the goroutine is terminated.

  • WaitResponse - The process transitions to this state from Running when it makes a synchronous request (Call*). The process waits for a response to the request, and once received, it transitions back to the Running state.

  • Zombie - This is an intermediate state that occurs when a process was in the Running state but the node forcibly stopped the process using the Kill method. In this state, the process cannot access any methods from the gen.Process interface and no longer receives new messages. After processing the current message and terminating the goroutine, the process transitions from Zombie to Terminated.

  • Terminated - This is the final state of the process before it is removed from the node. Messages are not delivered to a process in this state.

You can retrieve the current state of a process using the ProcessInfo method of the gen.Node interface. Alternatively, you can use the State method of the gen.Process interface to check the state directly from the process.

These states define the lifecycle of a process and determine what actions it can perform at any given moment in Ergo Framework.
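
The transitions described above can be summarized as a small lookup table. The following is a minimal sketch using only the standard library; the State type, the constants, and CanTransition are hypothetical names for illustration and are not part of the gen package. The Init -> Sleep transition is an assumption about what follows startup, and transitions into Terminated from states other than Zombie are left out because the text does not spell them out.

```go
package main

import "fmt"

// State is an illustrative model of the process states described above.
// It is not the framework's own type.
type State string

const (
	Init         State = "init"
	Sleep        State = "sleep"
	Running      State = "running"
	WaitResponse State = "wait response"
	Zombie       State = "zombie"
	Terminated   State = "terminated"
)

// allowed maps a state to the states that can follow it.
var allowed = map[State][]State{
	// Assumption: a started process with an empty mailbox goes idle.
	Init: {Sleep},
	// A message arrived in the mailbox.
	Sleep: {Running},
	// Mailbox drained, synchronous request made, or Kill was called.
	Running: {Sleep, WaitResponse, Zombie},
	// The response to the synchronous request was received.
	WaitResponse: {Running},
	// The current message was processed and the goroutine finished.
	Zombie: {Terminated},
}

// CanTransition reports whether the model allows moving from one state to another.
func CanTransition(from, to State) bool {
	for _, s := range allowed[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(Sleep, Running)) // true
	fmt.Println(CanTransition(Zombie, Running)) // false
}
```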

Environment Variables

Each process in Ergo Framework has its own set of environment variables, which can be retrieved using the EnvList() method of the gen.Process interface. To modify these variables, use the SetEnv(name gen.Env, value any) method; if value is nil, the environment variable with the specified name is removed. Environment variable names are case-insensitive:

type myActor struct {
    act.Actor
}

func (a *myActor) Init(args ...any) error {
    ...
    a.SetEnv("var", 1) // set variable
    a.SetEnv("Var", 2) // overwrite value, the name is case insensitive
    v, _ := a.Env("vAr") // return 2 as a value
    a.SetEnv("vaR", nil) // remove variable
    ...
}

The initialization of a process's environment variables occurs at the moment the process starts. The starting process first inherits the environment variables of the node, then the environment variables of the application and parent process, and finally, the environment variables specified in gen.ProcessOptions are added.
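
The inheritance order can be pictured as a layered merge. The sketch below is a standard-library model, not the framework's implementation: mergeEnv is a hypothetical helper, and it assumes that a later layer overwrites a value set by an earlier one and that names are normalized to upper case to get case-insensitive behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// mergeEnv applies the layers in the given order. A later layer overwrites
// values from an earlier one (an assumption of this sketch). Names are
// upper-cased so that lookups are case-insensitive.
func mergeEnv(layers ...map[string]any) map[string]any {
	result := make(map[string]any)
	for _, layer := range layers {
		for name, value := range layer {
			result[strings.ToUpper(name)] = value
		}
	}
	return result
}

func main() {
	nodeEnv := map[string]any{"region": "eu", "debug": false}
	appEnv := map[string]any{"Debug": true}
	parentEnv := map[string]any{"workers": 4}
	optionsEnv := map[string]any{"WORKERS": 8} // stands in for gen.ProcessOptions.Env

	// Order from the text: node, application, parent, process options.
	env := mergeEnv(nodeEnv, appEnv, parentEnv, optionsEnv)
	fmt.Println(env["REGION"], env["DEBUG"], env["WORKERS"]) // eu true 8
}
```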

Process starting

To launch a new process in Ergo Framework, there are two methods:

Spawn(factory ProcessFactory, options ProcessOptions, args ...any) (PID, error)
SpawnRegister(register Atom, factory ProcessFactory, options ProcessOptions, args ...any) (PID, error)

Both methods are available in the gen.Node and gen.Process interfaces.

When using the SpawnRegister method, the node checks whether the specified name register can be assigned to the starting process. If the name is already taken by another process, this function will return the error gen.ErrTaken.

Parent process

Processes launched by the node (using the Spawn* methods of the gen.Node interface) are assigned a virtual parent process identifier, known as CorePID. You can retrieve this value using the CorePID method of the gen.Node interface.

If a process is launched using the Spawn* methods of the gen.Process interface, it becomes a child process. The Parent method of the gen.Process interface in the child process will return the identifier of the process that launched it.

Attempting to send an exit signal to a parent process using the SendExit method of the gen.Process interface will result in the error gen.ErrNotAllowed.

Additionally, the parent process identifier is used in handling exit signals in act.Actor (see the section on TrapExit in Actor).

Process leader

Each process in Ergo Framework has a process leader identifier, in addition to the parent process identifier. You can retrieve this identifier using the Leader method of the gen.Process interface. When a process is started by the node, its process leader is set to the virtual identifier CorePID. If the process is started by another process, the child process inherits the leader identifier from its parent process.

During process startup, it is also possible to explicitly set a process leader using the Leader option in gen.ProcessOptions. This is commonly done when starting child processes in act.Supervisor. The process leader identifier does not influence the operational logic of processes; it serves an informational role and can be used within your application's logic as needed.

gen.ProcessFactory

To start a process in Ergo Framework, you need to specify a factory argument, which is a factory function. This function must return an object that implements the gen.ProcessBehavior interface. This is a low-level interface that is already implemented in general-purpose actors (such as act.Actor, act.Supervisor, etc.). Therefore, in most cases, you only need to embed the required actor into your object to satisfy the gen.ProcessBehavior interface.

Example:

// based on act.Actor
type MyActor struct {
    act.Actor // embed generic actor
}
func factoryMyActor() gen.ProcessBehavior {
    return &MyActor{}
}
...
node.Spawn(factoryMyActor, gen.ProcessOptions{})

gen.ProcessOptions

The options parameter in the Spawn or SpawnRegister functions allows you to configure various parameters for the process being launched:

  • MailboxSize: defines the size of the mailbox. If not specified, the mailbox will be unlimited in size.

  • Leader: sets the leader of the process group. If not specified, the leader value is inherited from the parent process. This setting is informational and does not affect the process itself but can be used in your application's logic. Typically, this is set by supervisor processes.

  • Compression: specifies the message compression settings when sending messages over the network to remote nodes. This option is ignored if the recipient is a local process.

  • SendPriority: allows you to set the priority for sent messages. There are three priority levels:

    • gen.MessagePriorityNormal: Messages are delivered to the Main queue of the recipient's mailbox.

    • gen.MessagePriorityHigh: Messages are delivered to the System queue.

    • gen.MessagePriorityMax: Highest-priority messages are delivered to the Urgent queue of the recipient's mailbox.

  • ImportantDelivery: enables the important flag for messages sent to remote processes. When enabled, the remote node will send an acknowledgment that the message was delivered to the recipient's mailbox. If the message cannot be delivered, an error (e.g., gen.ErrProcessTerminated, gen.ErrProcessUnknown, gen.ErrProcessMailboxFull) will be returned. You can also manage this flag using the SetImportantDelivery method of the gen.Process interface. This flag only applies to messages sent via the SendPID, SendProcessID, and SendAlias methods or requests made using the CallPID, CallProcessID, or CallAlias methods of gen.Process. To force a message to be sent with the important flag, use the SendImportant or CallImportant methods of gen.Process.

  • Fallback: specifies a fallback process to which messages will be redirected in the event that the recipient's mailbox is full. Each such message is wrapped in the gen.MessageFallback structure. This option is ignored if the recipient process's mailbox is unlimited.

  • LinkParent: Automatically creates a link with the parent process when the process is started (after successful initialization). This option is ignored if the process is started by the node.

  • LinkChild: Automatically creates a link with a child process upon its successful startup (after successful initialization). This option is ignored if the process is started by the node.

  • Env: Sets the environment variables for the process.

During the initialization stage, processes do not have access to linking methods (Link* from the gen.Process interface). However, the LinkParent and LinkChild options provide a way to bypass this limitation when launching child processes. These options allow the process to automatically create links with the parent and child processes after successful initialization, ensuring that linking is established even during the startup phase.
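
To illustrate how SendPriority maps messages to mailbox queues, here is a minimal standard-library model. Priority, Mailbox, Push, and Pop are hypothetical names, not framework types. The mapping of priorities to the Main, System, and Urgent queues follows the list above; the draining order (Urgent first, then System, then Main) is an assumption of this sketch.

```go
package main

import "fmt"

// Priority models the three send priorities listed above.
type Priority int

const (
	PriorityNormal Priority = iota // delivered to the Main queue
	PriorityHigh                   // delivered to the System queue
	PriorityMax                    // delivered to the Urgent queue
)

// Mailbox is a toy model with one FIFO queue per priority level.
type Mailbox struct {
	main, system, urgent []string
}

// Push places a message into the queue that matches its priority.
func (m *Mailbox) Push(p Priority, msg string) {
	switch p {
	case PriorityMax:
		m.urgent = append(m.urgent, msg)
	case PriorityHigh:
		m.system = append(m.system, msg)
	default:
		m.main = append(m.main, msg)
	}
}

// Pop returns the next message, draining Urgent before System before Main.
// The second result is false when all queues are empty.
func (m *Mailbox) Pop() (string, bool) {
	for _, q := range []*[]string{&m.urgent, &m.system, &m.main} {
		if len(*q) > 0 {
			msg := (*q)[0]
			*q = (*q)[1:]
			return msg, true
		}
	}
	return "", false
}

func main() {
	var mb Mailbox
	mb.Push(PriorityNormal, "work item")
	mb.Push(PriorityMax, "exit signal")
	mb.Push(PriorityHigh, "system notice")

	// Prints: exit signal, system notice, work item (one per line).
	for msg, ok := mb.Pop(); ok; msg, ok = mb.Pop() {
		fmt.Println(msg)
	}
}
```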

Process termination

In Ergo Framework, a process can be terminated in the following ways:

  1. Forcefully: a process can be terminated by force using the Kill method provided by the gen.Node interface.

  2. Sending an Exit Signal: you can stop a process by sending it an exit signal with a specified reason. This is done using the SendExit method, which is available in both the gen.Node and gen.Process interfaces. The exit signal is always delivered with the highest priority and placed in the Urgent queue. In act.Actor, there is a mechanism to intercept exit signals, allowing them to be handled like normal gen.MessageExit* messages. More information on this mechanism can be found in the Actor section. Sending an exit signal to yourself, your parent process, or the process leader is not allowed. In such cases, the SendExit method will return the error gen.ErrNotAllowed.

  3. Self-Termination: A process can terminate itself by returning a non-nil value (of type error) from its message handling callback. The returned value will be treated as the termination reason, and the process will be terminated. You can use gen.TerminateReasonNormal or gen.TerminateReasonShutdown for normal shutdown scenarios.

  4. Panic: If a panic occurs during message processing, the process will be terminated with the reason gen.TerminateReasonPanic.
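
Self-termination and panic handling (items 3 and 4) can be modeled with a small dispatch helper. This is a sketch under stated assumptions, not the framework's code: handle, reasonNormal, and reasonPanic are hypothetical stand-ins for the dispatch logic and for gen.TerminateReasonNormal and gen.TerminateReasonPanic.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the framework's termination reasons.
var (
	reasonNormal = errors.New("normal")
	reasonPanic  = errors.New("panic")
)

// handle runs one callback invocation and returns the termination reason.
// A nil result means the process keeps running. A panic inside the callback
// is recovered and reported as reasonPanic.
func handle(callback func(msg any) error, msg any) (reason error) {
	defer func() {
		if r := recover(); r != nil {
			reason = reasonPanic
		}
	}()
	return callback(msg)
}

func main() {
	callback := func(msg any) error {
		switch msg {
		case "stop":
			return reasonNormal // self-termination with a reason
		case "boom":
			panic("unexpected failure")
		}
		return nil // keep running
	}

	for _, msg := range []any{"hello", "stop", "boom"} {
		fmt.Printf("%v -> %v\n", msg, handle(callback, msg))
	}
}
```

The loop prints `hello -> <nil>`, `stop -> normal`, and `boom -> panic`.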

Upon termination, all resources associated with the process are released:

  • All events registered by the process are unregistered.

  • Any associated name, if registered, is freed.

  • Aliases, links, and monitors created by the process are removed.

  • The process is removed from the logging system if it was previously registered as a logger.

  • All meta-processes launched by the process are terminated.

Process information

You can retrieve summary information about a process using the ProcessInfo method of the gen.Node interface or the Info method of the gen.Process interface. Both methods return a gen.ProcessInfo structure containing detailed information about the process.

etcd Client

Introduced for Ergo Framework 3.1.0 and above (not yet released; available in the v310 branch).

This package implements the gen.Registrar interface and serves as a client library for etcd, a distributed key-value store that provides a reliable way to store data that needs to be accessed by a distributed system or cluster of machines. In addition to the primary Service Discovery function, it automatically notifies all connected nodes about cluster configuration changes and supports hierarchical configuration management with type conversion.

To create a client, use the Create function from the etcd package. The function requires a set of options etcd.Options to configure the connection and behavior.

Then, set this client in the gen.NetworkOptions.Registrar option:

import (
     "ergo.services/ergo"
     "ergo.services/ergo/gen"

     "ergo.services/registrar/etcd"
)

func main() {
     var options gen.NodeOptions
     ...
     registrarOptions := etcd.Options{
         Endpoints: []string{"localhost:2379"},
         Cluster:   "production",
     }
     options.Network.Registrar = etcd.Create(registrarOptions)
     ...
     node, err := ergo.StartNode("demo@localhost", options)
     ...
}

Using etcd.Options, you can specify:

  • Cluster - The cluster name for your node (default: "default")

  • Endpoints - List of etcd endpoints (default: ["localhost:2379"])

  • Username - Username for etcd authentication (optional)

  • Password - Password for etcd authentication (optional)

  • TLS - TLS configuration for secure connections (optional)

  • InsecureSkipVerify - Option to ignore TLS certificate verification

  • DialTimeout - Connection timeout (default: 10s)

  • RequestTimeout - Request timeout (default: 10s)

  • KeepAlive - Keep-alive timeout (default: 10s)

When the node starts, it will register with the etcd cluster and maintain a lease to ensure automatic cleanup if the node becomes unavailable.

Configuration Management

The etcd registrar provides hierarchical configuration management with four priority levels:

  1. Cross-cluster node-specific: services/ergo/config/{cluster}/{node}/{item}

  2. Cluster node-specific: services/ergo/cluster/{cluster}/config/{node}/{item}

  3. Cluster-wide default: services/ergo/cluster/{cluster}/config/*/{item}

  4. Global default: services/ergo/config/global/{item}
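
The lookup order can be sketched as a list of candidate keys that are tried in turn. The code below is a standard-library model: configKeys and resolve are hypothetical helpers, a plain map stands in for etcd, and it assumes that level 1 has the highest priority and that the first key found wins.

```go
package main

import "fmt"

// configKeys returns the candidate etcd keys for one item, ordered by the
// four levels listed above (level 1 first).
func configKeys(cluster, node, item string) []string {
	return []string{
		fmt.Sprintf("services/ergo/config/%s/%s/%s", cluster, node, item),
		fmt.Sprintf("services/ergo/cluster/%s/config/%s/%s", cluster, node, item),
		fmt.Sprintf("services/ergo/cluster/%s/config/*/%s", cluster, item),
		fmt.Sprintf("services/ergo/config/global/%s", item),
	}
}

// resolve returns the value of the first candidate key present in store.
// The store map stands in for etcd in this sketch.
func resolve(store map[string]string, cluster, node, item string) (string, bool) {
	for _, key := range configKeys(cluster, node, item) {
		if value, ok := store[key]; ok {
			return value, true
		}
	}
	return "", false
}

func main() {
	store := map[string]string{
		"services/ergo/config/global/log.level":               "info",
		"services/ergo/cluster/production/config/*/log.level": "debug",
	}
	// The cluster-wide default is found before the global default.
	value, ok := resolve(store, "production", "web1", "log.level")
	fmt.Println(value, ok) // debug true
}
```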

Typed Configuration

The etcd registrar supports typed configuration values using string prefixes. Configuration values are stored as strings in etcd and automatically converted to the appropriate Go types when read by the registrar:

  • "int:123" → int64(123)

  • "float:3.14" → float64(3.14)

  • "bool:true" → bool(true), "bool:false" → bool(false)

  • "hello" → "hello" (strings without prefixes remain unchanged)

Important: All configuration values must be stored as strings in etcd. The type conversion happens automatically when the registrar reads the configuration.
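
The prefix convention can be illustrated with a small parser built on strconv. This is a sketch of the idea only: parseTyped is a hypothetical helper, not the registrar's code, and returning an error for a malformed value is an assumption of the sketch rather than documented behavior. Note that strconv.ParseBool also accepts spellings such as "1" or "T", which may be broader than what the registrar accepts.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseTyped converts a stored string into a Go value based on its prefix.
// Strings without a known prefix are returned unchanged.
func parseTyped(raw string) (any, error) {
	switch {
	case strings.HasPrefix(raw, "int:"):
		v, err := strconv.ParseInt(strings.TrimPrefix(raw, "int:"), 10, 64)
		if err != nil {
			return nil, err
		}
		return v, nil
	case strings.HasPrefix(raw, "float:"):
		v, err := strconv.ParseFloat(strings.TrimPrefix(raw, "float:"), 64)
		if err != nil {
			return nil, err
		}
		return v, nil
	case strings.HasPrefix(raw, "bool:"):
		v, err := strconv.ParseBool(strings.TrimPrefix(raw, "bool:"))
		if err != nil {
			return nil, err
		}
		return v, nil
	}
	return raw, nil
}

func main() {
	for _, raw := range []string{"int:5432", "float:0.75", "bool:true", "info"} {
		v, err := parseTyped(raw)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%q -> %T %v\n", raw, v, v)
	}
}
```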

Example configuration setup using etcdctl:

# Node-specific integer configuration (stored as string, converted to int64)
etcdctl put services/ergo/cluster/production/config/web1/database.port "int:5432"

# Cluster-wide float configuration (stored as string, converted to float64)
etcdctl put services/ergo/cluster/production/config/*/cache.ratio "float:0.75"

# Boolean configuration (stored as string, converted to bool)
etcdctl put services/ergo/cluster/production/config/*/debug.enabled "bool:true"
etcdctl put services/ergo/cluster/production/config/web1/ssl.enabled "bool:false"

# Application-specific configuration (visible to all nodes using wildcard format)
etcdctl put services/ergo/cluster/production/config/*/myapp.cache.size "int:256"
etcdctl put services/ergo/cluster/production/config/*/client.timeout "int:30"

# Global string configuration (stored and returned as string)
etcdctl put services/ergo/config/global/log.level "info"

Access configuration in your application:

registrar, err := node.Network().Registrar()
if err != nil {
    return err
}

// Get single configuration item
port, err := registrar.ConfigItem("database.port")
if err != nil {
    return err
}
// port will be int64(5432)

// Get multiple configuration items
config, err := registrar.Config("database.port", "cache.ratio", "debug.enabled", "log.level")
if err != nil {
    return err
}
// config["database.port"] = int64(5432)
// config["cache.ratio"] = float64(0.75) 
// config["debug.enabled"] = bool(true)
// config["log.level"] = "info"

Event System

The etcd registrar registers a gen.Event and publishes a message for every relevant change in etcd within the configured cluster. This keeps the node informed about nodes joining or leaving, application state changes, and configuration updates as they happen:

  • etcd.EventNodeJoined - Triggered when another node is registered in the same cluster

  • etcd.EventNodeLeft - Triggered when a node disconnects or its lease expires

  • etcd.EventApplicationLoaded - Triggered when an application is loaded on a remote node

  • etcd.EventApplicationStarted - Triggered when an application starts on a remote node

  • etcd.EventApplicationStopping - Triggered when an application begins stopping on a remote node

  • etcd.EventApplicationStopped - Triggered when an application is stopped on a remote node

  • etcd.EventConfigUpdate - Triggered when the cluster configuration is updated

To receive such messages, you need to subscribe to etcd client events using the LinkEvent or MonitorEvent methods from the gen.Process interface. You can obtain the name of the registered event using the Event method from the gen.Registrar interface:

type myActor struct {
    act.Actor
}

func (m *myActor) HandleMessage(from gen.PID, message any) error {
    reg, err := m.Node().Network().Registrar()
    if err != nil {
        m.Log().Error("unable to get Registrar interface: %s", err)
        return nil
    }
    
    ev, err := reg.Event()
    if err != nil {
        m.Log().Error("Registrar has no registered Event: %s", err)
        return nil
    }
    
    m.MonitorEvent(ev)
    return nil
}

func (m *myActor) HandleEvent(event gen.MessageEvent) error {
    switch msg := event.Message.(type) {
    case etcd.EventNodeJoined:
        m.Log().Info("Node %s joined cluster", msg.Name)
    case etcd.EventApplicationStarted:
        m.Log().Info("Application %s started on node %s", msg.Name, msg.Node)
    case etcd.EventConfigUpdate:
        m.Log().Info("Configuration %s updated", msg.Item)
        
        // Handle specific configuration changes
        if msg.Item == "ssl.enabled" {
            if enabled, ok := msg.Value.(bool); ok {
                m.Log().Info("SSL %s", map[bool]string{true: "enabled", false: "disabled"}[enabled])
            }
        }
    }
    return nil
}

Application Discovery

To get information about available applications in the cluster, use the ResolveApplication method from the gen.Resolver interface, which returns a list of gen.ApplicationRoute structures:

type ApplicationRoute struct {
    Node   Atom
    Name   Atom
    Weight int
    Mode   ApplicationMode
    State  ApplicationState
}

  • Name - The name of the application

  • Node - The name of the node where the application is loaded or running

  • Weight - The weight assigned to the application in gen.ApplicationSpec

  • Mode - The application's startup mode (gen.ApplicationModeTemporary, gen.ApplicationModePermanent, gen.ApplicationModeTransient)

  • State - The current state of the application (gen.ApplicationStateLoaded, gen.ApplicationStateRunning, gen.ApplicationStateStopping)

You can access the gen.Resolver interface using the Resolver method from the gen.Registrar interface:

resolver := registrar.Resolver()

// Resolve application routes
routes, err := resolver.ResolveApplication("web-server")
if err != nil {
    return err
}

for _, route := range routes {
    log.Printf("Application %s running on node %s (weight: %d, state: %s)", 
        route.Name, route.Node, route.Weight, route.State)
}

Node Discovery

Get a list of all nodes in the cluster:

nodes, err := registrar.Nodes()
if err != nil {
    return err
}

for _, nodeName := range nodes {
    log.Printf("Node in cluster: %s", nodeName)
}

Data Storage Structure

The etcd registrar organizes data in etcd using the following key structure:

services/ergo/cluster/{cluster}/
├── routes/                         # Non-overlapping with config paths
│   ├── nodes/{node}               # Node registration with lease (edf.Encode + base64)
│   └── applications/{app}/{node}  # Application routes (edf.Encode + base64)
└── config/                        # Configuration data (string + type prefixes) 
    ├── {node}/{item}             # Node-specific config
    └── */{item}                  # Cluster-wide config

services/ergo/config/
├── {cluster}/{node}/{item}        # Cross-cluster node config
└── global/{item}                  # Global config

Important Architecture Notes:

  • Routes (nodes/applications) use edf.Encode + base64 encoding and are stored in the routes/ subpath. These keys are managed by the registrar and should not be modified manually.

  • Configuration uses string encoding with type prefixes and is stored in the config/ subpath.

Example

A fully featured example can be found at GitHub - Ergo Services Examples in the docker directory.

This example demonstrates how to run multiple Ergo nodes using etcd as a registrar for service discovery. It showcases service discovery, actor communication, typed configuration management, and real-time configuration event monitoring across a cluster.

Development and Testing

The etcd registrar includes comprehensive testing infrastructure:

Docker Testing Setup

Use the included Docker Compose setup for testing:

# Start etcd for testing
make start-etcd

# Run tests with coverage
make test-coverage

# Run integration tests only
make test-integration

# Clean up
make clean

Manual etcd Operations

For debugging and manual operations:

# Check cluster health
etcdctl --endpoints=localhost:12379 endpoint health

# List all keys in cluster
etcdctl --endpoints=localhost:12379 get --prefix "services/ergo/"

# Set configuration manually (values must be strings)
etcdctl --endpoints=localhost:12379 put \
  "services/ergo/cluster/production/config/web1/database.timeout" "int:30"

etcdctl --endpoints=localhost:12379 put \
  "services/ergo/cluster/production/config/web1/debug.enabled" "bool:true"

# Watch for changes
etcdctl --endpoints=localhost:12379 watch --prefix "services/ergo/cluster/production/"

The etcd registrar provides a robust, scalable solution for service discovery and configuration management in distributed Ergo applications, with the reliability and consistency guarantees of etcd.

Inspecting With Observer

Installation and starting

To install the observer tool, you need to have Golang compiler version 1.20 or higher. Run the following command:

$ go install ergo.services/tools/observer@latest

Available arguments for starting observer:

  • -help: displays information about the available arguments.

  • -version: prints the current version of the Observer tool.

  • -host: specifies the interface name for the Web server to run on (default: "localhost").

  • -port: defines the port number for the Web server (default: 9911).

  • -cookie: sets the default cookie value used for connecting to other nodes.

If you are running observer on a server for continuous operation, it is recommended to use the environment variable COOKIE instead of the -cookie argument. Using sensitive data in command-line arguments is insecure.

After starting observer, it initially has no connections to other nodes, so you will be prompted to specify the node you want to connect to.

Once you establish a connection with a remote node, the Observer application main page will open, displaying information about that node.

If you have integrated the Observer application into your node, upon opening the Observer page, you will immediately land on the main page showing information about the node where the Observer application was launched.

Info (main page)

On this tab, you will find general information about the node and the ability to manage its logging level. Changing the logging level only affects the node itself and any newly started processes, but it does not impact processes that are already running.

Graphs provide real-time information over the last 60 seconds, including the total number of processes, the number of processes in the running state, and memory usage data. Memory usage is divided into used, which indicates how much memory was reserved from the operating system, and allocated, which shows how much of that reserved memory is currently being used by the Golang runtime.

In addition to these details, you can view information about the available loggers on the node and their respective logging levels. For more details, refer to the Logging section. Environment variables will also be displayed here, but only if the ExposeEnvInfo option was enabled in the gen.NodeOptions.Security settings when the inspected node was started.

Network (main page)

The Network tab displays information about the node's network stack.

The Mode indicates how the network stack was started (enabled, hidden, or disabled).

The Registrar section shows the properties of the registrar in use, including its capabilities. Embedded Server indicates whether the registrar is running in server mode, while the Server field shows the address and port number of the registrar with which the node is registered.

Additionally, the tab provides information about the default handshake and protocol versions used for outgoing connections.

The Flags section lists the set of flags that define the functionality available to remote nodes.

The Acceptors section lists the node's acceptors, with detailed information available for each. This list will be empty if the network stack is running in hidden mode.

Since the node can work with multiple network stacks simultaneously, some acceptors may have different registrar parameters and handshake/protocol versions. For an example of simultaneous usage of the Erlang and Ergo Framework network stacks, refer to the Erlang section.

The Connected Nodes section displays a list of active connections with remote nodes. For each connection, you can view detailed information, including the version of the handshake used when the connection was established and the protocol currently in use. The Flags section shows which features are available to the node when interacting with the remote node.

Since the ENP protocol supports a pool of TCP connections within a single network connection, you will find information about the Pool Size (the number of TCP connections). The Pool DSN field will be empty if this is an incoming connection for the node or if the protocol does not support TCP connection pooling.

Graphs provide a summary of the number of received/sent messages and network traffic over the last 60 seconds, offering a quick overview of communication activity and data flow.

Process list (main page)

On the Processes List tab, you can view general information about the processes running on the node. The number of processes displayed is controlled by the Start from and Limit parameters.

By default, the list is sorted by the process identifier. However, you can choose different sorting options:

  • Top Running: displays processes that have spent the most time in the running state.

  • Top Messaging: sorts processes by the number of sent/received messages in descending order.

  • Top Mailbox: helps identify processes with the highest number of messages in their mailbox, which can be an indication that the process is struggling to handle the load efficiently.

For each process, you can view brief information:

The Behavior field shows the type of object that the process represents.

The Application field indicates the application to which the process belongs. This property is inherited from the parent, so all processes started within an application and their child processes will share the same value.

Mailbox Messages displays the total number of messages across all queues in the process's mailbox.

Running Time shows the total time the process has spent in the running state, which occurs when the process is actively handling messages from its queue.

By clicking on the process identifier, you will be directed to a page with more detailed information about that specific process.

Log (main page)

All log messages from the node, processes, network stack, or meta-processes are displayed here. When you connect to the Observer via a browser, the Observer's backend sends a request to the inspector to start a log process with specified logging levels (this log process is visible on the main Info tab).

When you change the set of logging levels, the Observer's backend requests the start of a new log process (the old log process will automatically terminate).

To reduce the load on the browser, the number of displayed log messages is limited, but you can adjust this by setting the desired number in the Last field.

The Play/Pause button allows you to stop or resume the log process, which is useful if you want to halt the flow of log messages and focus on examining the already received logs in more detail.

Process information

This page displays detailed information about the process, including its state, uptime, and other key metrics.

The fallback parameters specify which process will receive redirected messages in case the current process's mailbox becomes full. However, if the Mailbox Size is unlimited, these fallback parameters are ignored.

The Message Priority field shows the priority level used for messages sent by this process.

Keep Network Order is a parameter applied only to messages sent over the network. It ensures that all messages sent by this process to a remote process are delivered in the same order as they were sent. This parameter is enabled by default, but it can be disabled in certain cases to improve performance.

The Important Delivery setting indicates whether the important flag is enabled for messages sent to remote nodes. Enabling this option forces the remote node to send an acknowledgment confirming that the message was successfully delivered to the recipient's mailbox.

The Compression parameters allow you to enable message compression for network transmissions and define the compression settings.

Graphs on this page help you assess the load on the process, displaying data over the last 60 seconds.

Additionally, you can find detailed information about any aliases, links, and monitors created by this process, as well as any registered events and started meta-processes.

The list of environment variables is displayed only if the ExposeEnvInfo option was enabled in the node's gen.NodeOptions.Security settings.

Additionally, on this page, you can send a message to the process, send an exit signal, or even forcibly stop the process using the kill command. These options are available in the context menu.

Inspect (process page)

If the behavior of this process implements the HandleInspect method, the response from the process to the inspect request will be displayed here. The Observer sends these requests once per second while you are on this tab.

For example, a process based on act.Pool responds to the inspect request with information about the pool of processes and metrics such as the number of messages processed.

Log (process page)

The Log tab on the process information page displays a list of log messages generated by the specific process.

Please note that since the Observer uses a single stream for logging, any changes to the logging levels will also affect the content of the Log tab on the main page.

Meta-process information

On this page, you'll find detailed information about the meta-process, along with graphs showing data for the last 60 seconds related to incoming/outgoing messages and the number of messages in its mailbox. The meta-process has only two message queues: main and system.

You can also send a message to the meta-process or issue an exit signal. However, it is not possible to forcibly stop the meta-process using the kill command.

Inspect (meta-process page)

If the meta-process's behavior implements the HandleInspect method, the response from the meta-process to the inspect request will be displayed on this tab. The Observer sends this request once per second while you are on the tab.

Log (meta-process page)

On the Log tab of the meta-process, you will see log messages generated by that specific meta-process. Changing the logging levels will also affect the content of the Log tab on the main page.

Erlang

Erlang network stack

This package implements the Erlang network stack, including the DIST protocol, ETF data format, EPMD registrar functionality, and the Handshake mechanism.

It is compatible with OTP-23 to OTP-27. The source code is available on the project's GitHub page at https://github.com/ergo-services/proto in the erlang23 directory.

Note that the source code is distributed under the Business Source License 1.1 and cannot be used for production or commercial purposes without a license, which can be purchased on the project's sponsor page.

EPMD

The epmd package implements the gen.Registrar interface. To create it, use the epmd.Create function with the following options:

  • Port: Registrar port number (default: 4369).

  • EnableRouteTLS: Enables TLS for all gen.Route responses on resolve requests. This is necessary if the Erlang cluster uses TLS.

  • DisableServer: Disables the internal server mode, useful when using the Erlang-provided epmd service.

To use this package, include ergo.services/proto/erlang23/epmd.

Handshake

The handshake package implements the gen.NetworkHandshake interface. To create a handshake instance, use the handshake.Create function with the following options:

  • Flags: Defines the supported functionality of the Erlang network stack. The default is set by handshake.DefaultFlags().

  • UseVersion5: Enables handshake version 5 mode (default is version 6).

To use this package, include ergo.services/proto/erlang23/handshake.

DIST protocol

The ergo.services/proto/erlang23/dist package implements the gen.NetworkProto and gen.Connection interfaces. To create it, use the dist.Create function and provide dist.Options as an argument, where you can specify the FragmentationUnit size in bytes. This value is used for fragmenting large messages. The default size is 65000 bytes.

To use this package, include ergo.services/proto/erlang23/dist.
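To make the role of a fragmentation unit concrete, here is a minimal standalone sketch that splits a payload into chunks of at most a given size. The fragment helper is hypothetical and is not the dist package's implementation; it only illustrates the arithmetic, using the 65000-byte default mentioned above.

```go
package main

import "fmt"

// fragment splits data into consecutive chunks of at most unit bytes.
// Hypothetical helper for illustration; not part of the dist package.
func fragment(data []byte, unit int) [][]byte {
	if unit <= 0 || len(data) <= unit {
		return [][]byte{data}
	}
	var chunks [][]byte
	for len(data) > unit {
		chunks = append(chunks, data[:unit])
		data = data[unit:]
	}
	return append(chunks, data)
}

func main() {
	payload := make([]byte, 150000)
	for i, c := range fragment(payload, 65000) {
		fmt.Printf("fragment %d: %d bytes\n", i, len(c))
	}
}
```

A 150000-byte payload ends up as two full fragments and one shorter tail, while anything at or below the unit size is sent as a single piece.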

ETF data format

Erlang uses the ETF (Erlang Term Format) for encoding messages transmitted over the network. Due to differences in data types between Golang and Erlang, decoding received messages involves converting the data to their corresponding Golang types:

  • number -> int64

  • float number -> float64

  • big number -> big.Int from math/big, or to int64/uint64

  • map -> map[any]any

  • binary -> []byte

  • list -> etf.List ([]any)

  • tuple -> etf.Tuple ([]any) or a registered struct type

  • string -> []any (convert it to a Go string using etf.TermToString)

  • atom -> gen.Atom

  • pid -> gen.Pid

  • ref -> gen.Ref

  • ref (alias) -> gen.Alias

  • atom = true/false -> bool
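The string entry in the list above is the one that most often surprises Go developers: an Erlang string is a list of character codes, so it arrives as a slice of numbers. The following standalone sketch shows the kind of conversion involved. The charlistToString helper is hypothetical and written only for illustration; in real code, use etf.TermToString.

```go
package main

import "fmt"

// charlistToString converts a decoded Erlang string (a list of code points)
// into a Go string. Hypothetical helper; real code should call
// etf.TermToString instead.
func charlistToString(term []any) (string, bool) {
	runes := make([]rune, 0, len(term))
	for _, item := range term {
		switch v := item.(type) {
		case int64:
			runes = append(runes, rune(v))
		case int:
			runes = append(runes, rune(v))
		default:
			return "", false // not a list of integers
		}
	}
	return string(runes), true
}

func main() {
	decoded := []any{int64(104), int64(105)} // the Erlang string "hi"
	s, ok := charlistToString(decoded)
	fmt.Println(s, ok)
}
```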

When encoding data in the Erlang ETF format:

  • map -> map #{}

  • slice/array -> list []

  • struct -> map with field names as keys (considering etf: tags on struct fields)

  • registered type of struct -> tuple with the first element being the registered struct name, followed by field values in order.

  • []byte -> binary

  • int*/float*/big.Int -> number

  • string -> string

  • gen.Atom -> atom

  • gen.Pid -> pid

  • gen.Ref -> ref

  • gen.Alias -> ref (alias)

  • bool -> atom true/false
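For readers unfamiliar with the wire format, the sketch below encodes two of the simplest cases (small integers and binaries) following the tags defined in the Erlang External Term Format specification. It is a standalone illustration with a hypothetical encode function; it makes no claim about which tags the etf package chooses in each situation.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

const (
	etfVersion      = 131 // leading version byte
	tagSmallInteger = 97  // SMALL_INTEGER_EXT: one unsigned byte
	tagInteger      = 98  // INTEGER_EXT: 32-bit signed, big-endian
	tagBinary       = 109 // BINARY_EXT: 32-bit length, then the bytes
)

// encode is a hypothetical, deliberately tiny encoder covering int and []byte.
func encode(term any) ([]byte, error) {
	buf := []byte{etfVersion}
	switch v := term.(type) {
	case int:
		switch {
		case v >= 0 && v <= math.MaxUint8:
			return append(buf, tagSmallInteger, byte(v)), nil
		case v >= math.MinInt32 && v <= math.MaxInt32:
			buf = append(buf, tagInteger)
			return binary.BigEndian.AppendUint32(buf, uint32(int32(v))), nil
		}
		return nil, fmt.Errorf("%d needs a big-number encoding, not covered here", v)
	case []byte:
		buf = append(buf, tagBinary)
		buf = binary.BigEndian.AppendUint32(buf, uint32(len(v)))
		return append(buf, v...), nil
	}
	return nil, fmt.Errorf("unsupported type %T", term)
}

func main() {
	for _, term := range []any{1, -1, []byte("hi")} {
		b, err := encode(term)
		fmt.Println(b, err)
	}
}
```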

You can also use the functions etf.TermIntoStruct and etf.TermProplistIntoStruct for decoding data. These functions take into account etf: tags on struct fields, allowing the values to map correctly to the corresponding struct fields when decoding proplist data.

To automatically decode data into a struct, you can register the struct type using etf.RegisterTypeOf. This function takes an object of the type being registered and the decoding options etf.RegisterTypeOptions. The options include:

  • Name - The name of the registered type. By default, the type name is taken using the reflect package in the format #/pkg/path/TypeName

  • Strict - Determines whether the data must strictly match the struct. If disabled, non-matching data will be decoded into any.

To be decoded automatically, the data sent from Erlang must be a tuple whose first element is an atom matching the type name registered in Golang. For example:

type MyValue struct{
    MyString string
    MyInt    int32
}

...
// register type MyValue with name "myvalue"
etf.RegisterTypeOf(MyValue{}, etf.RegisterTypeOptions{Name: "myvalue", Strict: true})
...

The values sent by an Erlang process should be in the following format:

> erlang:send(Pid, {myvalue, "hello", 123}).
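The mapping from such a tuple to a registered struct can be pictured with the standalone sketch below. Everything in it (register, decodeTuple, the registry map) is a hypothetical stand-in written with the reflect package, not the etf package's code. It also assumes the tuple elements have already been decoded into Go values, with the atom and the Erlang string represented as Go strings.

```go
package main

import (
	"fmt"
	"reflect"
)

type MyValue struct {
	MyString string
	MyInt    int32
}

// registry maps a registered name to a struct type (hypothetical).
var registry = map[string]reflect.Type{}

func register(name string, sample any) {
	registry[name] = reflect.TypeOf(sample)
}

// assign copies src into dst when the kinds are compatible.
func assign(dst reflect.Value, src any) bool {
	switch dst.Kind() {
	case reflect.String:
		s, ok := src.(string)
		if ok {
			dst.SetString(s)
		}
		return ok
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		n, ok := src.(int64)
		if ok {
			dst.SetInt(n)
		}
		return ok
	}
	return false
}

// decodeTuple turns {name, field1, field2, ...} into the registered struct.
func decodeTuple(tuple []any) (any, error) {
	if len(tuple) == 0 {
		return nil, fmt.Errorf("empty tuple")
	}
	name, ok := tuple[0].(string)
	if !ok {
		return nil, fmt.Errorf("first element must be the registered name")
	}
	t, found := registry[name]
	if !found {
		return nil, fmt.Errorf("type %q is not registered", name)
	}
	if t.NumField() != len(tuple)-1 {
		return nil, fmt.Errorf("%q expects %d fields, got %d", name, t.NumField(), len(tuple)-1)
	}
	out := reflect.New(t).Elem()
	for i := 0; i < t.NumField(); i++ {
		if !assign(out.Field(i), tuple[i+1]) {
			return nil, fmt.Errorf("field %s: incompatible value %v", t.Field(i).Name, tuple[i+1])
		}
	}
	return out.Interface(), nil
}

func main() {
	register("myvalue", MyValue{})
	v, err := decodeTuple([]any{"myvalue", "hello", int64(123)})
	fmt.Printf("%+v %v\n", v, err)
}
```

Returning an error on a field mismatch corresponds loosely to the strict behavior described above; a non-strict variant would hand back the raw tuple instead.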

Ergo-node in Erlang-cluster

If you want to use the Erlang network stack by default in your node, you need to specify this in gen.NetworkOptions when starting the node:

import (
    "fmt"
    
    "ergo.services/ergo"
    "ergo.services/ergo/gen"
    "ergo.services/proto/erlang23/dist"
    "ergo.services/proto/erlang23/epmd"
    "ergo.services/proto/erlang23/handshake"
)

func main() {
    var options gen.NodeOptions
    
    // set cookie
    options.Network.Cookie = "123"
    
    // set Erlang Network Stack for this node
    options.Network.Registrar = epmd.Create(epmd.Options{})
    options.Network.Handshake = handshake.Create(handshake.Options{})
    options.Network.Proto = dist.Create(dist.Options{})

    // starting node
    node, err := ergo.StartNode(gen.Atom(OptionNodeName), options)
    if err != nil {
        fmt.Printf("Unable to start node '%s': %s\n", OptionNodeName, err)
        return
    }
    
    node.Wait()
}

In this case, all outgoing and incoming connections will be handled by the Erlang network stack. For a complete example, refer to the erlang project in the repository at https://github.com/ergo-services/examples.

If you want to maintain the ability to accept connections from Ergo nodes while using the Erlang network stack as a main one, you need to add an acceptor in the gen.NetworkOptions settings:

import (
    "fmt"
    "ergo.services/ergo"
    "ergo.services/ergo/gen"
    
    // Ergo Network Stack
    hs "ergo.services/ergo/net/handshake"
    "ergo.services/ergo/net/proto"
    "ergo.services/ergo/net/registrar"

    // Erlang Network Stack    
    "ergo.services/proto/erlang23/dist"
    "ergo.services/proto/erlang23/epmd"
    "ergo.services/proto/erlang23/handshake"
)

func main() {
    ...
    acceptorErlang := gen.AcceptorOptions{}
    acceptorErgo := gen.AcceptorOptions{
        Registrar: registrar.Create(registrar.Options{}),
        Handshake: hs.Create(hs.Options{}),
        Proto:     proto.Create(),
    }
    options.Network.Acceptors = append(options.Network.Acceptors, 
                                    acceptorErlang, acceptorErgo)
    // starting node
    node, err := ergo.StartNode(gen.Atom(OptionNodeName), options)

Please note that if the list of acceptors is empty when starting the node, it will launch an acceptor with the network stack using Registrar, Handshake, and Proto from gen.NetworkOptions.

If you set options.Network.Acceptors, you must explicitly define the parameters for all necessary acceptors. In the example, acceptorErlang is created with empty gen.AcceptorOptions (the Erlang stack from gen.NetworkOptions will be used), while for acceptorErgo, the Ergo Framework stack (Registrar, Handshake, and Proto) is explicitly defined.

In this example, you can establish incoming and outgoing connections using the Erlang network stack. However, the Ergo Framework network stack can only be used for incoming connections. To create outgoing network connections using the Ergo Framework stack, you need to configure a static route for a group of nodes by defining a match pattern:

...
// starting node
node, err := ergo.StartNode(gen.Atom(OptionNodeName), options)
// add static route  
route := gen.NetworkRoute{
    Resolver: acceptorErgo.Registrar.Resolver(),
}
match := ".ergonodes.local"
if err := node.Network().AddRoute(match, route, 1); err != nil {
    panic(err)
}

For more detailed information, please refer to the Static Routes section.

Erlang-node in Ergo-cluster

If your cluster primarily uses the Ergo Framework network stack by default and you want to enable interaction with Erlang nodes, you'll need to add an acceptor using the Erlang network stack. Additionally, you must define a static route for Erlang nodes using a match pattern:

import (
    "fmt"
    
    "ergo.services/ergo"
    "ergo.services/ergo/gen"
    "ergo.services/proto/erlang23/dist"
    "ergo.services/proto/erlang23/epmd"
    "ergo.services/proto/erlang23/handshake"
)

func main() {
    var options gen.NodeOptions
    
    // set cookie
    options.Network.Cookie = "123"
    
    // add acceptors
    acceptorErgo := gen.AcceptorOptions{}
    acceptorErlang := gen.AcceptorOptions{
        Registrar: epmd.Create(epmd.Options{}),
        Handshake: handshake.Create(handshake.Options{}),
        Proto:     dist.Create(dist.Options{}),
    }
    options.Network.Acceptors = append(options.Network.Acceptors, 
                                    acceptorErgo, acceptorErlang)

    // starting node
    node, err := ergo.StartNode(gen.Atom(OptionNodeName), options)
    if err != nil {
        fmt.Printf("Unable to start node '%s': %s\n", OptionNodeName, err)
        return
    }
    
    // add static route  
    route := gen.NetworkRoute{
        Resolver: acceptorErlang.Registrar.Resolver(),
    }
    if err := node.Network().AddRoute(".erlangnodes.local", route, 1); err != nil {
        panic(err)
    }
    
    node.Wait()
}

Actor GenServer

The erlang23.GenServer actor implements the low-level gen.ProcessBehavior interface, enabling it to handle messages and synchronous requests from processes running on an Erlang node. The following message types are used for communication in Erlang:

  • regular messages - sent from Erlang using erlang:send or the Pid ! message syntax

  • cast-messages - sent from Erlang with gen_server:cast

  • call-requests - made from Erlang with gen_server:call

erlang23.GenServer uses the erlang23.GenServerBehavior interface to interact with your object. This interface defines a set of callback methods for your object, which allow it to handle incoming messages and requests. All methods in this interface are optional, meaning you can choose to implement only the ones relevant to your specific use case:

type GenServerBehavior interface {
	gen.ProcessBehavior

	Init(args ...any) error
	HandleInfo(message any) error
	HandleCast(message any) error
	HandleCall(from gen.PID, ref gen.Ref, request any) (any, error)
	Terminate(reason error)

	HandleEvent(message gen.MessageEvent) error
	HandleInspect(from gen.PID, item ...string) map[string]string
}

The callback method HandleInfo is invoked when an asynchronous message is received from an Erlang process using erlang:send or via the Send* methods of the gen.Process interface. The HandleCast callback method is called when a cast message is sent using gen_server:cast from an Erlang process. Synchronous requests sent with gen_server:call or Call* methods are handled by the HandleCall callback method.
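On the wire, these three kinds of traffic differ only in how the term is wrapped: Erlang's gen_server:cast delivers {'$gen_cast', Request}, gen_server:call delivers {'$gen_call', {Pid, Tag}, Request}, and everything else is a plain message. The standalone sketch below shows that classification with hypothetical Atom and Tuple types; it is an illustration of the routing idea, not the erlang23.GenServer source.

```go
package main

import "fmt"

// Atom and Tuple are hypothetical stand-ins for decoded Erlang terms.
type Atom string
type Tuple []any

// classify names the callback that would conceptually receive the term.
func classify(term any) string {
	t, ok := term.(Tuple)
	if !ok || len(t) == 0 {
		return "HandleInfo"
	}
	tag, _ := t[0].(Atom)
	switch {
	case tag == "$gen_cast" && len(t) == 2:
		return "HandleCast"
	case tag == "$gen_call" && len(t) == 3:
		return "HandleCall"
	}
	return "HandleInfo"
}

func main() {
	fmt.Println(classify("plain message"))
	fmt.Println(classify(Tuple{Atom("$gen_cast"), "do it"}))
	fmt.Println(classify(Tuple{Atom("$gen_call"), Tuple{"pid", "ref"}, "request"}))
}
```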

If your actor only needs to handle regular messages from Erlang processes, you can use the standard act.Actor and process asynchronous messages in the HandleMessage callback method.

To start a process based on erlang23.GenServer, create an object embedding erlang23.GenServer and implement a factory function for it.

Example:

import (
    "ergo.services/ergo/gen"
    "ergo.services/proto/erlang23"
)

// factory function used when spawning the process
func factory_MyActor() gen.ProcessBehavior {
    return &MyActor{}
}

type MyActor struct {
    erlang23.GenServer
}

To send a cast message, use the Cast method of erlang23.GenServer.

func (ma *MyActor) HandleInfo(message any) error {
    ...
    ma.Cast(Pid, "cast message")
    return nil
}

To send regular messages, use the Send* methods of the embedded gen.Process interface. Synchronous requests are made using the Call* methods of the gen.Process interface.

Like act.Actor, an actor based on erlang23.GenServer supports the TrapExit functionality to intercept exit signals. Use the SetTrapExit and TrapExit methods of your object to manage this functionality, allowing your process to handle exit signals rather than terminating immediately when receiving them.

Saturn - Central Registrar

Ergo Service Registry and Discovery

saturn is a tool designed to simplify the management of clusters of nodes created using the Ergo Framework. It offers the following features:

  • A unified registry for node registration within a cluster.

  • The ability to manage multiple clusters simultaneously.

  • The capability to manage the configuration of the entire cluster without restarting the nodes connected to Saturn (configuration changes are applied on the fly).

  • Notifications to all cluster participants about changes in the status of applications running on nodes connected to Saturn.

The source code of the saturn tool is available on the project's page: https://github.com/ergo-services/tools.

Installation

To install saturn, use the following command:

$ go install ergo.services/tools/saturn@latest

Available arguments:

  • host: Specifies the hostname to use for incoming connections.

  • port: Port number for incoming connections. The default value is 4499.

  • path: Path to the configuration file saturn.yaml.

  • debug: Enables debug mode for outputting detailed information.

  • version: Displays the current version of Saturn.

Starting Saturn

To start Saturn, a configuration file named saturn.yaml is required. By default, Saturn expects this file to be located in the current directory. You can specify a different location for the configuration file using the -path argument.

You can find an example configuration file in the project's Git repository.

Configuration file structure

The saturn.yaml configuration file contains two root elements:

  1. Saturn: This section includes settings for the Saturn server.

    • You can configure the Token for access by remote nodes and specify certificate files for TLS connections.

    • By default, a self-signed certificate is used. For clients to accept this certificate, they must enable the InsecureSkipVerify option when creating the client.

    • Changes to this section require a restart of the Saturn server.

  2. Clusters: This section includes the configurations for clusters.

    • Changes in this section are automatically reloaded and sent to the registered nodes as updated configuration messages, without requiring a restart of Saturn.

    • The settings can target:

      • All nodes in all clusters.

      • Only nodes with a specified name in all clusters.

      • Only nodes within a specific cluster.

      • Only a node with a specified name within a specific cluster.

If the name of a configuration element ends with the suffix .file, the value of that element is treated as a file. The content of this file is then sent to the nodes as a []byte.

To configure settings for all nodes in all clusters, use the Clusters section in the saturn.yaml configuration file. Here, you can define global settings that will apply to every node within every cluster managed by Saturn:

Clusters:
    Var1: 123
    Var2: 12.3
    Var3: "123"
    Var4.file: "./myfile.txt"
    
    node@host:
        Var1: 456

In this example:

  • Var1, Var2, Var3, and Var4 will be applied to all nodes in all clusters.

  • However, the value of Var1 for nodes named node@host in any cluster will be overridden with the value 456.

If nodes are registered without specifying a Cluster in saturn.Options, they become part of the general cluster. Configuration for the general cluster should be provided in the Cluster@ section:

Clusters:
    Var1: 123
    Cluster@:
        Var1: 789
        node@host:
            Var1: 456

In the example above:

  • The variable Var1 is set to 789 for the general cluster (all nodes in the general cluster will receive Var1: 789).

  • However, for the node node@host within the general cluster, Var1 will be overridden to 456.

Thus, all nodes in the general cluster will inherit Var1: 789, except for node@host, which will specifically have Var1: 456. Other nodes in the general cluster will retain the default values from the Cluster@ section unless they are explicitly overridden in the configuration.

To specify settings for a particular cluster, use the element name Cluster@<cluster name> in the configuration file:

Clusters:
    Var1: 123
    Cluster@mycluster:
        Var1: 321
        node@host:
            Var1: 654
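The layering of these settings can be sketched as a lookup that walks from the most specific scope to the least specific one. The standalone example below is only an illustration: the config types and the resolve method are hypothetical, and the precedence between a cluster-wide value and a node-wide value is an assumption, since the text above does not state it. The data mirrors the last example, reading the node@host entry in mycluster as an override of Var1.

```go
package main

import "fmt"

type vars map[string]any

// clusterConfig holds one Cluster@<name> section (hypothetical type).
type clusterConfig struct {
	Vars  vars
	Nodes map[string]vars
}

// config mirrors the Clusters root element (hypothetical type).
type config struct {
	Global   vars
	Nodes    map[string]vars          // node overrides valid in any cluster
	Clusters map[string]clusterConfig // "" stands for the general cluster
}

// resolve checks the scopes from most to least specific (assumed order).
func (c config) resolve(cluster, node, key string) (any, bool) {
	layers := []vars{
		c.Clusters[cluster].Nodes[node], // node within a specific cluster
		c.Clusters[cluster].Vars,        // the specific cluster
		c.Nodes[node],                   // node name in all clusters
		c.Global,                        // all nodes in all clusters
	}
	for _, layer := range layers {
		if v, ok := layer[key]; ok {
			return v, true
		}
	}
	return nil, false
}

func main() {
	cfg := config{
		Global: vars{"Var1": 123},
		Clusters: map[string]clusterConfig{
			"mycluster": {
				Vars:  vars{"Var1": 321},
				Nodes: map[string]vars{"node@host": {"Var1": 654}},
			},
		},
	}
	fmt.Println(cfg.resolve("mycluster", "node@host", "Var1"))
	fmt.Println(cfg.resolve("mycluster", "other@host", "Var1"))
	fmt.Println(cfg.resolve("", "other@host", "Var1"))
}
```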

Service Discovery

Saturn can manage multiple clusters simultaneously, but resolve requests from nodes are handled only within their own cluster.

The name of a registered node must be unique within its cluster.

When a node registers, it informs the registrar which cluster it belongs to. Additionally, the node reports the applications running on it. Other nodes in the same cluster receive notifications about the newly connected node and its applications. Any changes in application statuses are also reported to the registrar, which in turn notifies all participants in the cluster.
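The two rules above (names are unique per cluster, and resolution never crosses cluster boundaries) can be captured in a few lines. The sketch below is a standalone, in-memory illustration with hypothetical types; it is not Saturn's implementation, and the host names and port numbers are arbitrary.

```go
package main

import (
	"errors"
	"fmt"
)

// route is what a resolve request returns (hypothetical shape).
type route struct {
	Host string
	Port uint16
}

var errTaken = errors.New("node name already registered in this cluster")

// registry keeps a separate name table per cluster.
type registry struct {
	clusters map[string]map[string]route
}

func newRegistry() *registry {
	return &registry{clusters: map[string]map[string]route{}}
}

// register enforces name uniqueness within a single cluster only.
func (r *registry) register(cluster, node string, rt route) error {
	nodes, ok := r.clusters[cluster]
	if !ok {
		nodes = map[string]route{}
		r.clusters[cluster] = nodes
	}
	if _, exists := nodes[node]; exists {
		return errTaken
	}
	nodes[node] = rt
	return nil
}

// resolve looks the node up in the caller's own cluster and nowhere else.
func (r *registry) resolve(cluster, node string) (route, bool) {
	rt, ok := r.clusters[cluster][node]
	return rt, ok
}

func main() {
	reg := newRegistry()
	fmt.Println(reg.register("a", "node@host", route{"host", 11000}))
	fmt.Println(reg.register("b", "node@host", route{"host", 12000}))
	fmt.Println(reg.register("a", "node@host", route{"host", 13000}))
	fmt.Println(reg.resolve("a", "node@host"))
}
```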

For more details, see the Saturn Client section.

Unit

A zero-dependency library for testing Ergo Framework actors with a fluent API.

Introduced for Ergo Framework 3.1.0 and above (not yet released; available in the v310 branch).

The Ergo Unit Testing Library makes testing actor-based systems simple and reliable. It provides specialized tools designed specifically for the unique challenges of testing actors, with zero external dependencies and an intuitive, readable API.

What You'll Learn

This guide takes you from simple actor tests to complex distributed scenarios. Here's the journey:

Getting Started (You Are Here!)

  • Your First Test - Simple echo and counter examples

  • Built-in Assertions - Simple tools for common checks

  • Basic Message Testing - Verify actors send the right messages

  • Basic Logging Testing - Verify your actors provide good observability

Intermediate Skills (Next Steps)

  • Configuration Testing - Test environment-driven behavior

  • Complex Message Patterns - Handle sophisticated message flows

  • Basic Process Spawning - Test actor creation and lifecycle

  • Event Inspection - Debug and analyze actor behavior

Advanced Features (When You Need Them)

  • Actor Termination - Test error handling and graceful shutdowns

  • Exit Signals - Manage process lifecycles in supervision trees

  • Scheduled Operations - Test cron jobs and time-based behavior

  • Network & Distribution - Test multi-node actor systems

Expert Level (Complex Scenarios)

  • Dynamic Value Capture - Handle generated IDs, timestamps, and random data

  • Complex Workflows - Test multi-step business processes

  • Performance & Load Testing - Verify behavior under stress

Tip: The documentation follows this learning path. You can jump to advanced topics if needed, but starting from the beginning ensures you understand the foundations.

Why Testing Actors is Different

Traditional testing tools don't work well with actors. Here's why:

The Challenge: Actors Are Not Functions

Regular code testing follows a simple pattern:

// Traditional testing - call function, check result
result := calculateTax(income, rate)
assert.Equal(t, 1500.0, result)

But actors are fundamentally different:

  • They run asynchronously - you send a message and the response comes later

  • They maintain state - previous messages affect future behavior

  • They spawn other actors - creating complex hierarchies

  • They communicate only via messages - no direct access to internal state

  • They can fail and restart - requiring lifecycle testing

What Makes Actor Testing Hard

  1. Asynchronous Communication

// This doesn't work with actors:
actor.SendMessage("process_order")
result := actor.GetResult() // ❌ No direct way to get result

  2. Message Flow Complexity

// An actor might send multiple messages to different targets:
actor.SendMessage("start_workflow")
// ❌ How do you verify it sent the right messages to the right places?

  3. Dynamic Process Creation

// Actors spawn other actors with generated IDs:
actor.SendMessage("create_worker")
// ❌ How do you test the spawned worker when you don't know its PID?

  4. State Changes Over Time

// Actor behavior changes based on message history:
actor.SendMessage("login", user1)
actor.SendMessage("login", user2)  
actor.SendMessage("get_users")
// ❌ How do you verify the internal state without breaking encapsulation?

How This Library Solves Actor Testing

The Ergo Unit Testing Library addresses each of these challenges:

Event Capture - See Everything Your Actor Does

Instead of guessing what happened, the library automatically captures every actor operation:

actor.SendMessage("process_order")
// Library automatically captures:
// - What messages were sent
// - Which processes were spawned  
// - What was logged
// - When the actor terminated

Fluent Assertions - Test What Matters

Express your test intentions clearly:

actor.SendMessage("create_user", userData)
actor.ShouldSend().To("database").Message(SaveUser{...}).Once().Assert()
actor.ShouldSpawn().Factory(userWorkerFactory).Once().Assert()
actor.ShouldLog().Level(Info).Containing("User created").Assert()

Dynamic Value Handling - Work With Generated Data

Capture and reuse dynamically generated values:

actor.SendMessage("create_session")
sessionResult := actor.ShouldSpawn().Once().Capture()
sessionPID := sessionResult.PID // Use the actual generated PID in further tests

State Testing Through Behavior - Verify State Changes

Test state indirectly by verifying behavioral changes:

actor.SendMessage("login", user1)
actor.SendMessage("get_status")
actor.ShouldSend().To(user1).Message(StatusResponse{LoggedIn: true}).Assert()

Why Zero Dependencies Matters

Actor testing is complex enough without dependency management headaches:

  • No version conflicts - Works with any Go testing setup

  • No external tools - Everything needed is built-in

  • Simple imports - Just import "ergo.services/ergo/testing/unit"

  • Fast execution - No overhead from external libraries

Core Concepts

Now that you understand why actor testing is different, let's explore the key concepts that make this library work.

The Event-Driven Testing Model

Everything your actor does becomes a testable "event".

When you run this simple test:

actor.SendMessage(sender, "hello")
actor.ShouldSend().To(sender).Message("hello").Assert()

Here's what happens behind the scenes:

  1. Your actor receives the message - Normal actor behavior

  2. Your actor sends a response - Normal actor behavior

  3. The library captures a SendEvent - Testing magic

  4. You verify the captured event - Your assertion

The library automatically captures these events:

  • SendEvent - When your actor sends a message

  • SpawnEvent - When your actor creates child processes

  • LogEvent - When your actor writes log messages

  • TerminateEvent - When your actor shuts down
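The capture idea itself is simple and can be shown without the library. In the standalone sketch below, a recorder stands in for the test environment: the code under test calls Send and Log as usual, and every call is stored as an event that the test can inspect afterwards. All names here (event, recorder, echo) are hypothetical and chosen for illustration; the library's own event types are the ones listed above.

```go
package main

import "fmt"

// event is a captured operation (hypothetical shape).
type event struct {
	Kind    string // "send" or "log" in this sketch
	Target  string
	Payload any
}

// recorder stores everything the code under test does.
type recorder struct {
	events []event
}

func (r *recorder) Send(to string, msg any) {
	r.events = append(r.events, event{Kind: "send", Target: to, Payload: msg})
}

func (r *recorder) Log(msg string) {
	r.events = append(r.events, event{Kind: "log", Payload: msg})
}

// count returns how many captured events satisfy match.
func (r *recorder) count(match func(event) bool) int {
	n := 0
	for _, e := range r.events {
		if match(e) {
			n++
		}
	}
	return n
}

// echo plays the role of the actor: it logs, then replies to the sender.
func echo(r *recorder, from string, msg any) {
	r.Log(fmt.Sprintf("echoing to %s", from))
	r.Send(from, msg)
}

func main() {
	r := &recorder{}
	echo(r, "client", "hello")
	sends := r.count(func(e event) bool {
		return e.Kind == "send" && e.Target == "client" && e.Payload == "hello"
	})
	fmt.Println("captured events:", len(r.events), "matching sends:", sends)
}
```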

Why Events Matter

Events solve the fundamental challenge of testing asynchronous systems:

Instead of this (impossible):

actor.SendMessage("process_order")
result := actor.WaitForResult() // ❌ Actors don't work this way

You do this (works perfectly):

actor.SendMessage("process_order")
// Verify the actor did what it should do:
actor.ShouldSend().To("database").Message(SaveOrder{...}).Assert()
actor.ShouldSend().To("inventory").Message(CheckStock{...}).Assert()
actor.ShouldLog().Level(Info).Containing("Processing order").Assert()

The Fluent Assertion API

The library provides a readable, chainable API that expresses test intentions clearly:

// Basic pattern: Actor.Should[Action]().Details().Assert()

actor.ShouldSend().To(recipient).Message(content).Once().Assert()
actor.ShouldSpawn().Factory(workerFactory).Times(3).Assert()
actor.ShouldLog().Level(Info).Containing("started").Assert()
actor.ShouldTerminate().WithReason(normalShutdown).Assert()

Benefits of the fluent API:

  • Readable - Tests read like English sentences

  • Discoverable - IDE autocomplete guides you through options

  • Flexible - Chain only the validations you need

  • Precise - Specify exactly what matters for each test
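To show why chaining works well for this kind of check, here is a standalone sketch of a tiny fluent matcher over a list of captured sends. It is a hypothetical illustration of the pattern, not the library's code: each method narrows the criteria and returns the same builder, and the final Check (returning an error here rather than failing a test) evaluates everything at once.

```go
package main

import (
	"fmt"
	"reflect"
)

// sent is one captured outgoing message (hypothetical shape).
type sent struct {
	To      string
	Message any
}

// sendAssertion accumulates criteria; unset criteria match anything.
type sendAssertion struct {
	captured []sent
	to       *string
	message  any
	hasMsg   bool
	times    int // negative means "at least once"
}

func shouldSend(captured []sent) *sendAssertion {
	return &sendAssertion{captured: captured, times: -1}
}

func (a *sendAssertion) To(t string) *sendAssertion {
	a.to = &t
	return a
}

func (a *sendAssertion) Message(m any) *sendAssertion {
	a.message, a.hasMsg = m, true
	return a
}

func (a *sendAssertion) Times(n int) *sendAssertion {
	a.times = n
	return a
}

func (a *sendAssertion) Once() *sendAssertion { return a.Times(1) }

// Check counts the matching sends and compares against the expectation.
func (a *sendAssertion) Check() error {
	matches := 0
	for _, s := range a.captured {
		if a.to != nil && s.To != *a.to {
			continue
		}
		if a.hasMsg && !reflect.DeepEqual(s.Message, a.message) {
			continue
		}
		matches++
	}
	switch {
	case a.times < 0 && matches == 0:
		return fmt.Errorf("expected at least one matching send, got none")
	case a.times >= 0 && matches != a.times:
		return fmt.Errorf("expected %d matching sends, got %d", a.times, matches)
	}
	return nil
}

func main() {
	captured := []sent{{"database", "save"}, {"inventory", "check"}}
	err := shouldSend(captured).To("database").Message("save").Once().Check()
	fmt.Println("assertion error:", err)
}
```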

Installation

go get ergo.services/ergo/testing/unit

Your First Actor Test

Let's start with the simplest possible actor test to understand the basics:

A Simple Echo Actor

package main

import (
    "testing"
    "ergo.services/ergo/act"
    "ergo.services/ergo/gen"
    "ergo.services/ergo/testing/unit"
)

// EchoActor - receives a message and sends it back
type EchoActor struct {
    act.Actor
}

func (e *EchoActor) HandleMessage(from gen.PID, message any) error {
    // Simply echo the message back to sender
    e.Send(from, message)
    return nil
}

// Factory function to create the actor
func newEchoActor() gen.ProcessBehavior {
    return &EchoActor{}
}

Testing the Echo Actor

func TestEchoActor_BasicBehavior(t *testing.T) {
    // 1. Create a test actor
    actor, err := unit.Spawn(t, newEchoActor)
    if err != nil {
        t.Fatal(err)
    }

    // 2. Create a sender PID (who is sending the message)
    sender := gen.PID{Node: "test", ID: 123}

    // 3. Send a message to the actor
    actor.SendMessage(sender, "hello world")

    // 4. Verify the actor sent the message back
    actor.ShouldSend().
        To(sender).                    // Should send to the original sender
        Message("hello world").        // Should send back the same message
        Once().                        // Should happen exactly once
        Assert()                       // Check that it actually happened
}

What Just Happened?

This simple test demonstrates the core pattern:

  1. unit.Spawn() - Creates a test actor in an isolated environment

  2. actor.SendMessage() - Sends a message to your actor (as production code would)

  3. actor.ShouldSend() - Verifies that your actor sent the expected message

Key insight: You're not testing internal state - you're testing behavior. You verify what the actor does (sends messages) rather than what it contains (internal variables).

Why This Works

The testing library automatically captures everything your actor does:

  • Every message sent by your actor

  • Every process spawned by your actor

  • Every log message written by your actor

  • When your actor terminates

Then it provides fluent assertions to verify these captured events.

Adding Slightly More Complexity

Let's test an actor that maintains some state:

type CounterActor struct {
    act.Actor
    count int
}

func (c *CounterActor) HandleMessage(from gen.PID, message any) error {
    switch message {
    case "increment":
        c.count++
        c.Send(from, c.count)
    case "get":
        c.Send(from, c.count)
    case "reset":
        c.count = 0
        c.Send(from, "reset complete")
    }
    return nil
}

func TestCounterActor_StatefulBehavior(t *testing.T) {
    actor, _ := unit.Spawn(t, func() gen.ProcessBehavior { return &CounterActor{} })
    client := gen.PID{Node: "test", ID: 456}

    // Test incrementing
    actor.SendMessage(client, "increment")
    actor.ShouldSend().To(client).Message(1).Once().Assert()

    actor.SendMessage(client, "increment")
    actor.ShouldSend().To(client).Message(2).Once().Assert()

    // Test getting current value
    actor.SendMessage(client, "get")
    actor.ShouldSend().To(client).Message(2).Once().Assert()

    // Test reset
    actor.SendMessage(client, "reset")
    actor.ShouldSend().To(client).Message("reset complete").Once().Assert()

    // Verify reset worked
    actor.SendMessage(client, "get")
    actor.ShouldSend().To(client).Message(0).Once().Assert()
}

This shows how you test stateful behavior without accessing internal state - by observing how the actor's responses change over time.
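The same principle can be reproduced with nothing but goroutines and channels, which may help if the actor model is new to you. In the standalone sketch below, the counter's state lives inside a goroutine and is reachable only through its mailbox, so the only way to verify it is to send messages and compare the replies. The request type and the startCounter and ask helpers are hypothetical and unrelated to the Ergo API.

```go
package main

import "fmt"

// request is a message plus a channel for the reply.
type request struct {
	msg   string
	reply chan any
}

// startCounter launches the "actor" and returns its mailbox.
func startCounter() chan<- request {
	mailbox := make(chan request)
	go func() {
		count := 0 // private state, invisible to the outside
		for req := range mailbox {
			switch req.msg {
			case "increment":
				count++
				req.reply <- count
			case "get":
				req.reply <- count
			case "reset":
				count = 0
				req.reply <- "reset complete"
			default:
				req.reply <- nil
			}
		}
	}()
	return mailbox
}

// ask sends one message and waits for the reply.
func ask(mailbox chan<- request, msg string) any {
	reply := make(chan any, 1)
	mailbox <- request{msg: msg, reply: reply}
	return <-reply
}

func main() {
	counter := startCounter()
	fmt.Println(ask(counter, "increment"))
	fmt.Println(ask(counter, "increment"))
	fmt.Println(ask(counter, "reset"))
	fmt.Println(ask(counter, "get"))
}
```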

Built-in Assertions

Before diving into complex actor testing, let's cover the simple assertion utilities you'll use throughout your tests.

Why Built-in Assertions Matter for Actor Testing:

Actor tests often need to verify simple conditions alongside complex event assertions. Rather than forcing you to import external testing libraries (which could conflict with your project dependencies), the unit testing library provides everything you need:

func TestActorWithBuiltInAssertions(t *testing.T) {
    actor, _ := unit.Spawn(t, newEchoActor)
    
    // Use built-in assertions for simple checks
    unit.NotNil(t, actor, "Actor should be created successfully")
    unit.Equal(t, false, actor.IsTerminated(), "New actor should not be terminated")
    
    // Combine with actor-specific assertions
    actor.SendMessage(gen.PID{Node: "test", ID: 1}, "hello")
    actor.ShouldSend().Message("hello").Once().Assert()
}

Available Assertions

Equality Testing:

unit.Equal(t, expected, actual)        // Values must be equal
unit.NotEqual(t, unexpected, actual)   // Values must be different

Boolean Testing:

unit.True(t, condition)               // Condition must be true
unit.False(t, condition)              // Condition must be false

Nil Testing:

unit.Nil(t, value)                    // Value must be nil
unit.NotNil(t, value)                 // Value must not be nil

String Testing:

unit.Contains(t, "hello world", "world")  // String must contain substring

Type Testing:

unit.IsType(t, "", actualValue)       // Value must be of specific type
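Helpers like these need nothing beyond the standard library, which is what makes a zero-dependency design practical. The standalone sketch below shows one plausible way to build an equality assertion with reflect.DeepEqual. The lower-case names, the reporter interface, and the fakeT recorder are hypothetical and are not taken from the unit package's source.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// reporter is the small subset of *testing.T the helpers rely on.
type reporter interface {
	Helper()
	Errorf(format string, args ...any)
}

// equal reports a failure when expected and actual differ deeply.
func equal(t reporter, expected, actual any) bool {
	t.Helper()
	if reflect.DeepEqual(expected, actual) {
		return true
	}
	t.Errorf("not equal:\n  expected: %#v\n  actual:   %#v", expected, actual)
	return false
}

// contains reports a failure when s does not include substr.
func contains(t reporter, s, substr string) bool {
	t.Helper()
	if strings.Contains(s, substr) {
		return true
	}
	t.Errorf("%q does not contain %q", s, substr)
	return false
}

// fakeT records failures so the helpers can be exercised outside go test.
type fakeT struct {
	failures []string
}

func (f *fakeT) Helper() {}

func (f *fakeT) Errorf(format string, args ...any) {
	f.failures = append(f.failures, fmt.Sprintf(format, args...))
}

func main() {
	t := &fakeT{}
	equal(t, 2, 2)
	equal(t, 2, "2") // different types, recorded as a failure
	contains(t, "hello world", "world")
	fmt.Println("failures recorded:", len(t.failures))
}
```

Because *testing.T already provides Helper and Errorf, the same functions would accept it directly in a real test.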

Why Zero Dependencies Matter

No Import Conflicts:

// ❌ This could cause version conflicts:
import "github.com/stretchr/testify/assert"
import "github.com/other/testing/lib"

// This always works:
import "ergo.services/ergo/testing/unit"

Consistent Error Messages: All assertions provide clear, consistent error messages that integrate well with the actor testing output.

Framework Agnostic: Works with any Go testing setup - standard go test, IDE test runners, CI/CD systems, etc.

Basic Message Testing

Now that you understand the fundamentals, let's explore message testing in more depth.

What Comes Next

Now you'll learn how to test different aspects of actor behavior, building from simple to complex:

Fundamentals (You're here!)

  • Basic message sending and receiving

  • Simple process creation

  • Logging and observability

  • Configuration testing

Intermediate Skills

  • Complex message patterns

  • Event inspection and debugging

  • Actor lifecycle and termination

  • Error handling and recovery

Advanced Features

  • Scheduled operations (cron jobs)

  • Network and distribution

  • Performance and load testing

Basic Logging Testing

Logging is crucial for production actors - it provides visibility into what your actors are doing and helps with debugging. Let's learn how to test logging behavior.

Why Test Logging?

Logging tests ensure:

  • Your actors provide sufficient information for monitoring

  • Debug information is available when needed

  • Log levels are respected (don't log debug in production)

  • Sensitive operations are properly audited

Simple Logging Test

func TestGreeter_LogsWelcomeMessage(t *testing.T) {
    actor, _ := unit.Spawn(t, newGreeter, unit.WithLogLevel(gen.LogLevelInfo))
    
    actor.SendMessage(gen.PID{}, Welcome{Name: "Alice"})
    
    // Verify the actor logged the welcome
    actor.ShouldLog().
        Level(gen.LogLevelInfo).
        Containing("Welcome Alice").
        Once().
        Assert()
}

Testing Different Log Levels

func TestDataProcessor_LogLevels(t *testing.T) {
    actor, _ := unit.Spawn(t, newDataProcessor, unit.WithLogLevel(gen.LogLevelDebug))
    
    actor.SendMessage(gen.PID{}, ProcessData{Data: "sample"})
    
    // Should log at info level for important events
    actor.ShouldLog().Level(gen.LogLevelInfo).Containing("Processing started").Once().Assert()
    
    // Should log at debug level for detailed info
    actor.ShouldLog().Level(gen.LogLevelDebug).Containing("Processing sample data").Once().Assert()
    
    // Should never log at error level for normal operations
    actor.ShouldLog().Level(gen.LogLevelError).Times(0).Assert()
}

Testing Log Content

func TestAuditLogger_SecurityEvents(t *testing.T) {
    actor, _ := unit.Spawn(t, newAuditLogger)
    
    actor.SendMessage(gen.PID{}, LoginAttempt{User: "admin", Success: false})
    
    // Verify security events are properly logged
    actor.ShouldLog().MessageMatching(func(msg string) bool {
        return strings.Contains(msg, "SECURITY") && 
               strings.Contains(msg, "admin") && 
               strings.Contains(msg, "failed")
    }).Once().Assert()
}

Logging Best Practices for Testing

Structure your log messages to make them easy to test:

// Good: Structured, predictable format
log.Info("User login: user=%s success=%t", userID, success)

// Poor: Hard to test reliably  
log.Info("User " + userID + " tried to login and it " + result)
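One practical payoff of a key=value layout is that a test can pull out individual fields instead of matching the whole sentence. The standalone sketch below illustrates this with two hypothetical helpers, loginMessage and field, built on the format shown above; it does not use the library's log assertions.

```go
package main

import (
	"fmt"
	"strings"
)

// loginMessage builds the structured message in one place.
func loginMessage(userID string, success bool) string {
	return fmt.Sprintf("User login: user=%s success=%t", userID, success)
}

// field extracts the value of key from "key=value" pairs in msg.
func field(msg, key string) (string, bool) {
	for _, part := range strings.Fields(msg) {
		if k, v, ok := strings.Cut(part, "="); ok && k == key {
			return v, true
		}
	}
	return "", false
}

func main() {
	msg := loginMessage("alice", false)
	user, _ := field(msg, "user")
	success, _ := field(msg, "success")
	fmt.Println(user, success)
}
```

A predicate built on field could then be used wherever a test needs to match log content, without depending on the exact wording around the fields.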

Test log levels appropriately:

  • Error - Test that errors are logged when they occur

  • Warning - Test that concerning but non-fatal events are captured

  • Info - Test that important business events are recorded

  • Debug - Test that detailed troubleshooting info is available

Intermediate Skills

Now that you've mastered the basics, let's tackle more complex testing scenarios.

Configuration and Environment Testing

Real actors often behave differently based on configuration. Let's test this:

The Spawn function creates an isolated testing environment for your actor. Unlike production actors that run in a complex node environment, test actors run in a controlled sandbox where every operation is captured for verification.

Key Benefits:

  • Isolation: Each test actor runs independently without affecting other tests

  • Deterministic: Test outcomes are predictable and repeatable

  • Observable: All actor operations are automatically captured as events

  • Configurable: Fine-tune the testing environment to match your needs

Example Actor:

type messageCounter struct {
    act.Actor
    count int
}

func (m *messageCounter) Init(args ...any) error {
    m.count = 0
    m.Log().Info("Counter initialized")
    return nil
}

func (m *messageCounter) HandleMessage(from gen.PID, message any) error {
    // The commands are plain string values, so switch on the value, not the type
    switch message {
    case "increment":
        m.count++
        m.Send("output", CountChanged{Count: m.count})
        m.Log().Debug("Count incremented to %d", m.count)
        return nil
    case "get_count":
        m.Send(from, CountResponse{Count: m.count})
        return nil
    case "reset":
        m.count = 0
        m.Send("output", CountReset{})
        return nil
    }
    return nil
}

type CountChanged struct{ Count int }
type CountResponse struct{ Count int }
type CountReset struct{}

func factoryMessageCounter() gen.ProcessBehavior {
    return &messageCounter{}
}

Test Implementation:

func TestMessageCounter_BasicUsage(t *testing.T) {
    // Create test actor with configuration
    actor, err := unit.Spawn(t, factoryMessageCounter,
        unit.WithLogLevel(gen.LogLevelDebug),
        unit.WithEnv(map[gen.Env]any{
            "test_mode": true,
            "timeout":   30,
        }),
    )
    if err != nil {
        t.Fatal(err)
    }

    // Test initialization
    actor.ShouldLog().Level(gen.LogLevelInfo).Containing("Counter initialized").Once().Assert()

    // Test message handling
    actor.SendMessage(gen.PID{}, "increment")
    actor.ShouldSend().To("output").Message(CountChanged{Count: 1}).Once().Assert()
    actor.ShouldLog().Level(gen.LogLevelDebug).Containing("Count incremented to 1").Once().Assert()

    // Test state query
    actor.SendMessage(gen.PID{Node: "test", ID: 123}, "get_count")
    actor.ShouldSend().To(gen.PID{Node: "test", ID: 123}).Message(CountResponse{Count: 1}).Once().Assert()

    // Test reset
    actor.SendMessage(gen.PID{}, "reset")
    actor.ShouldSend().To("output").Message(CountReset{}).Once().Assert()
}

Configuration Options - Fine-Tuning the Test Environment

Test configuration allows you to simulate different runtime conditions without requiring complex setup:

// Available options for unit.Spawn()
unit.WithLogLevel(gen.LogLevelDebug)                    // Set log level
unit.WithEnv(map[gen.Env]any{"key": "value"})          // Environment variables
unit.WithParent(gen.PID{Node: "parent", ID: 100})      // Parent process
unit.WithRegister(gen.Atom("registered_name"))         // Register with name
unit.WithNodeName(gen.Atom("test_node@localhost"))     // Node name

Environment Variables (WithEnv): Test how your actors behave with different configurations without changing production code. Useful for testing feature flags, database URLs, timeout values, and other configuration-driven behavior.

Log Levels (WithLogLevel): Control the verbosity of test output and verify that your actors log appropriately at different levels. Critical for testing monitoring and debugging capabilities.

Process Hierarchy (WithParent, WithRegister): Test actors that need to interact with parent processes or require specific naming for registration-based lookups.
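Options passed as a variadic list like this are commonly built with Go's functional-options pattern. The sketch below shows that pattern in isolation, under the assumption that the unit.With* helpers follow a similar shape. Every name here (spawnConfig, option, withLogLevel, withEnv, withRegister, newConfig) is hypothetical and is not the testing library's actual implementation.

```go
package main

import "fmt"

// spawnConfig is a hypothetical stand-in for whatever settings a test
// environment might hold; it is not the library's real type.
type spawnConfig struct {
	logLevel string
	env      map[string]any
	register string
}

// option mutates the configuration; each with* helper returns one.
type option func(*spawnConfig)

func withLogLevel(level string) option {
	return func(c *spawnConfig) { c.logLevel = level }
}

func withEnv(env map[string]any) option {
	return func(c *spawnConfig) {
		for k, v := range env {
			c.env[k] = v
		}
	}
}

func withRegister(name string) option {
	return func(c *spawnConfig) { c.register = name }
}

// newConfig starts from defaults and applies the options in order,
// so a later option overrides an earlier one for the same setting.
func newConfig(opts ...option) spawnConfig {
	c := spawnConfig{logLevel: "info", env: map[string]any{}}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(
		withLogLevel("debug"),
		withEnv(map[string]any{"timeout": 30}),
	)
	fmt.Println(c.logLevel, c.env["timeout"], c.register == "")
}
```

The design keeps the call site readable: a test lists only the settings it cares about and everything else stays at its default.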

Message Testing

ShouldSend() - Verifying Actor Communication

Message testing is the heart of actor validation. Since actors communicate exclusively through messages, verifying message flow is crucial for ensuring correct behavior.

Why Message Testing Matters:

  • Validates Integration: Ensures actors communicate correctly with their dependencies

  • Confirms Business Logic: Verifies that the right messages are sent in response to inputs

  • Detects Side Effects: Catches unintended message sends that could cause bugs

  • Tests Message Content: Validates that message payloads contain correct data

Example Actor:

type notificationService struct {
    act.Actor
    subscribers []gen.PID
}

func (n *notificationService) HandleMessage(from gen.PID, message any) error {
    switch msg := message.(type) {
    case Subscribe:
        n.subscribers = append(n.subscribers, msg.PID)
        n.Send(msg.PID, SubscriptionConfirmed{})
        return nil
    case Broadcast:
        for _, subscriber := range n.subscribers {
            n.Send(subscriber, Notification{
                ID:      msg.ID,
                Message: msg.Message,
                Sender:  from,
            })
        }
        n.Send("analytics", BroadcastSent{
            ID:          msg.ID,
            Subscribers: len(n.subscribers),
        })
        return nil
    }
    return nil
}

type Subscribe struct{ PID gen.PID }
type SubscriptionConfirmed struct{}
type Broadcast struct{ ID string; Message string }
type Notification struct{ ID, Message string; Sender gen.PID }
type BroadcastSent struct{ ID string; Subscribers int }

Test Implementation:

func TestNotificationService_MessageSending(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryNotificationService)

    subscriber1 := gen.PID{Node: "test", ID: 101}
    subscriber2 := gen.PID{Node: "test", ID: 102}

    // Test subscription
    actor.SendMessage(gen.PID{}, Subscribe{PID: subscriber1})
    actor.SendMessage(gen.PID{}, Subscribe{PID: subscriber2})

    // Verify subscription confirmations
    actor.ShouldSend().To(subscriber1).Message(SubscriptionConfirmed{}).Once().Assert()
    actor.ShouldSend().To(subscriber2).Message(SubscriptionConfirmed{}).Once().Assert()

    // Test broadcast
    broadcaster := gen.PID{Node: "test", ID: 200}
    actor.SendMessage(broadcaster, Broadcast{ID: "msg-123", Message: "Hello World"})

    // Verify notifications sent to all subscribers
    actor.ShouldSend().To(subscriber1).MessageMatching(func(msg any) bool {
        if notif, ok := msg.(Notification); ok {
            return notif.ID == "msg-123" && 
                   notif.Message == "Hello World" &&
                   notif.Sender == broadcaster
        }
        return false
    }).Once().Assert()

    actor.ShouldSend().To(subscriber2).MessageMatching(func(msg any) bool {
        if notif, ok := msg.(Notification); ok {
            return notif.ID == "msg-123" && notif.Message == "Hello World"
        }
        return false
    }).Once().Assert()

    // Verify analytics
    actor.ShouldSend().To("analytics").Message(BroadcastSent{
        ID:          "msg-123",
        Subscribers: 2,
    }).Once().Assert()

    // Test multiple sends to same target
    actor.SendMessage(broadcaster, Broadcast{ID: "msg-124", Message: "Second message"})
    actor.ShouldSend().To("analytics").Times(2).Assert() // Total of 2 analytics messages
}

Advanced Message Matching - Flexible Validation Patterns

When testing complex message structures or dynamic content, the library provides powerful matching capabilities:

// Message type matching
actor.ShouldSend().MessageMatching(unit.IsTypeGeneric[CountChanged]()).Assert()

// Field-based matching
actor.ShouldSend().MessageMatching(unit.HasField("Count", unit.Equals(5))).Assert()

// Structure matching with custom field validation
actor.ShouldSend().MessageMatching(
    unit.StructureMatching(Notification{}, map[string]unit.Matcher{
        "ID":      unit.Equals("msg-123"),
        "Sender":  unit.IsValidPID(),
    }),
).Assert()

// Never sent verification
actor.ShouldNotSend().To("error_handler").Message("error").Assert()

Pattern Matching Benefits:

  • Partial Validation: Test only the fields that matter for your specific test case

  • Dynamic Content Handling: Validate messages with timestamps, UUIDs, or generated IDs

  • Type Safety: Ensure messages are of the correct type even when content varies

  • Negative Testing: Verify that certain messages are NOT sent in specific scenarios
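As a rough illustration of partial validation, a field-based matcher can be written with the standard reflect package. The sketch below is a standalone approximation, not the library's own unit.HasField: the names matcher, hasField, and countChanged are hypothetical, and the only thing borrowed from the examples above is the func(msg any) bool predicate shape.

```go
package main

import (
	"fmt"
	"reflect"
)

// matcher mirrors the predicate shape used with MessageMatching above.
type matcher func(msg any) bool

// countChanged is a local stand-in for the CountChanged message type.
type countChanged struct{ Count int }

// hasField is a hypothetical helper: it returns a matcher that succeeds
// when msg is a struct (or a non-nil pointer to one) with an exported
// field called name whose value is deeply equal to want.
func hasField(name string, want any) matcher {
	return func(msg any) bool {
		v := reflect.ValueOf(msg)
		if v.Kind() == reflect.Pointer {
			if v.IsNil() {
				return false
			}
			v = v.Elem()
		}
		if v.Kind() != reflect.Struct {
			return false
		}
		f := v.FieldByName(name)
		if !f.IsValid() || !f.CanInterface() {
			return false // missing or unexported field
		}
		return reflect.DeepEqual(f.Interface(), want)
	}
}

func main() {
	m := hasField("Count", 5)
	fmt.Println(m(countChanged{Count: 5}))
	fmt.Println(m(countChanged{Count: 6}))
	fmt.Println(m("not a struct"))
}
```

Because only the named field is inspected, the same matcher keeps passing when unrelated fields (timestamps, generated IDs) change between runs.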

Process Spawning

ShouldSpawn() - Testing Process Lifecycle Management

Process spawning is a fundamental actor pattern for building hierarchical systems. The testing library provides comprehensive tools for verifying that actors create, configure, and manage child processes correctly.

Why Process Spawning Tests Matter:

  • Resource Management: Ensure actors don't spawn too many or too few processes

  • Configuration Propagation: Verify that child processes receive correct configuration

  • Error Handling: Test behavior when process spawning fails

  • Supervision Trees: Validate that supervisors manage their children appropriately

Example Actor:

type workerSupervisor struct {
    act.Actor
    workers    map[string]gen.PID
    maxWorkers int
}

func (w *workerSupervisor) Init(args ...any) error {
    w.workers = make(map[string]gen.PID)
    w.maxWorkers = 3
    return nil
}

func (w *workerSupervisor) HandleMessage(from gen.PID, message any) error {
    switch msg := message.(type) {
    case StartWorker:
        if len(w.workers) >= w.maxWorkers {
            w.Send(from, WorkerError{Error: "max workers reached"})
            return nil
        }

        // Spawn worker with dynamic name
        workerPID, err := w.Spawn(factoryWorker, gen.ProcessOptions{}, msg.WorkerID)
        if err != nil {
            w.Send(from, WorkerError{Error: err.Error()})
            return nil
        }

        w.workers[msg.WorkerID] = workerPID
        w.Send(from, WorkerStarted{WorkerID: msg.WorkerID, PID: workerPID})
        w.Send("monitor", SupervisorStatus{
            ActiveWorkers: len(w.workers),
            MaxWorkers:    w.maxWorkers,
        })
        return nil

    case StopWorker:
        if pid, exists := w.workers[msg.WorkerID]; exists {
            w.SendExit(pid, gen.TerminateReasonShutdown)
            delete(w.workers, msg.WorkerID)
            w.Send(from, WorkerStopped{WorkerID: msg.WorkerID})
        }
        return nil

    case StopAllWorkers:
        stopped := len(w.workers) // record the count before the map is emptied
        for workerID, pid := range w.workers {
            w.SendExit(pid, gen.TerminateReasonShutdown)
            delete(w.workers, workerID)
        }
        w.Send(from, AllWorkersStopped{Count: stopped})
        return nil
    }
    return nil
}

type StartWorker struct{ WorkerID string }
type StopWorker struct{ WorkerID string }
type StopAllWorkers struct{}
type WorkerStarted struct{ WorkerID string; PID gen.PID }
type WorkerStopped struct{ WorkerID string }
type WorkerError struct{ Error string }
type AllWorkersStopped struct{ Count int }
type SupervisorStatus struct{ ActiveWorkers, MaxWorkers int }

func factoryWorker() gen.ProcessBehavior { return &worker{} }
func factoryWorkerSupervisor() gen.ProcessBehavior { return &workerSupervisor{} }

type worker struct{ act.Actor }
func (w *worker) HandleMessage(from gen.PID, message any) error { return nil }

Test Implementation:

func TestWorkerSupervisor_SpawnManagement(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryWorkerSupervisor)
    client := gen.PID{Node: "test", ID: 999}

    // Test worker spawning
    actor.SendMessage(client, StartWorker{WorkerID: "worker-1"})

    // Capture the spawn event to get the PID
    spawnResult := actor.ShouldSpawn().Factory(factoryWorker).Once().Capture()
    unit.NotNil(t, spawnResult)

    // Verify worker started response
    actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
        if started, ok := msg.(WorkerStarted); ok {
            return started.WorkerID == "worker-1" && started.PID == spawnResult.PID
        }
        return false
    }).Once().Assert()

    // Verify monitor notification
    actor.ShouldSend().To("monitor").Message(SupervisorStatus{
        ActiveWorkers: 1,
        MaxWorkers:    3,
    }).Once().Assert()

    // Test multiple workers
    actor.SendMessage(client, StartWorker{WorkerID: "worker-2"})
    actor.SendMessage(client, StartWorker{WorkerID: "worker-3"})

    // Should have spawned 3 workers total
    actor.ShouldSpawn().Factory(factoryWorker).Times(3).Assert()

    // Test max worker limit
    actor.SendMessage(client, StartWorker{WorkerID: "worker-4"})
    actor.ShouldSend().To(client).Message(WorkerError{Error: "max workers reached"}).Once().Assert()
    
    // Should still only have 3 spawned workers
    actor.ShouldSpawn().Factory(factoryWorker).Times(3).Assert()

    // Test stopping a worker
    actor.SendMessage(client, StopWorker{WorkerID: "worker-1"})
    actor.ShouldSend().To(client).Message(WorkerStopped{WorkerID: "worker-1"}).Once().Assert()
}

Dynamic Process Testing - Handling Generated Values

Real-world actors often generate dynamic values like session IDs, request tokens, or timestamps. The library provides sophisticated tools for capturing and validating these dynamic values.

func TestDynamicProcessCreation(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryTaskProcessor)

    // Test dynamic process creation with captured PIDs
    actor.SendMessage(gen.PID{}, CreateSessionWorker{UserID: "user123"})

    // Capture the spawn to get dynamic PID
    spawnResult := actor.ShouldSpawn().Once().Capture()
    sessionPID := spawnResult.PID

    // Verify session was registered with the dynamic PID
    actor.ShouldSend().To("session_registry").MessageMatching(func(msg any) bool {
        if reg, ok := msg.(SessionRegistered); ok {
            return reg.UserID == "user123" && reg.SessionPID == sessionPID
        }
        return false
    }).Once().Assert()

    // Test sending work to the dynamic session
    actor.SendMessage(gen.PID{}, SendToSession{
        UserID: "user123", 
        Task:   "process_data",
    })

    // Should route to the captured session PID
    actor.ShouldSend().To(sessionPID).MessageMatching(func(msg any) bool {
        if task, ok := msg.(SessionTask); ok {
            return task.Task == "process_data"
        }
        return false
    }).Once().Assert()
}

// Required message types for this example:
type CreateSessionWorker struct{ UserID string }
type SessionRegistered struct{ UserID string; SessionPID gen.PID }
type SendToSession struct{ UserID, Task string }
type SessionTask struct{ Task string }
// factoryTaskProcessor() gen.ProcessBehavior function would be defined separately

Dynamic Value Testing Scenarios:

  • Session Management: Test actors that create sessions with generated IDs

  • Request Tracking: Verify that request tokens are properly generated and used

  • Time-based Operations: Validate actors that schedule work or create timestamps

  • Resource Allocation: Test dynamic assignment of resources to processes

Remote Spawn Testing

ShouldRemoteSpawn() - Testing Distributed Actor Creation

Remote spawn testing allows you to verify that actors correctly create processes on remote nodes in a distributed system. The testing library captures RemoteSpawnEvent operations and provides fluent assertions for validation.

Why Test Remote Spawning:

  • Distribution Logic: Ensure actors spawn processes on the correct remote nodes

  • Load Distribution: Verify round-robin or other distribution strategies work correctly

  • Error Handling: Test behavior when remote nodes are unavailable

  • Resource Management: Validate that remote spawning respects capacity limits

Example Actor:

type distributedCoordinator struct {
    act.Actor
    nodeAvailability map[gen.Atom]bool
    roundRobin       int
}

func (dc *distributedCoordinator) HandleMessage(from gen.PID, message any) error {
    switch msg := message.(type) {
    case SpawnRemoteWorker:
        if !dc.isNodeAvailable(msg.NodeName) {
            dc.Send(from, RemoteSpawnError{
                NodeName: msg.NodeName,
                Error:    "node not available",
            })
            return nil
        }

        // Use RemoteSpawn which generates RemoteSpawnEvent
        pid, err := dc.RemoteSpawn(msg.NodeName, msg.WorkerName, gen.ProcessOptions{}, msg.Config)
        if err != nil {
            dc.Send(from, RemoteSpawnError{NodeName: msg.NodeName, Error: err.Error()})
            return nil
        }

        dc.Send(from, RemoteWorkerSpawned{
            NodeName:   msg.NodeName,
            WorkerName: msg.WorkerName,
            PID:        pid,
        })
        return nil

    case SpawnRemoteService:
        // Use RemoteSpawnRegister which generates RemoteSpawnEvent with registration
        pid, err := dc.RemoteSpawnRegister(msg.NodeName, msg.ServiceName, msg.RegisterName, gen.ProcessOptions{})
        if err != nil {
            dc.Send(from, RemoteSpawnError{NodeName: msg.NodeName, Error: err.Error()})
            return nil
        }

        dc.Send(from, RemoteServiceSpawned{
            NodeName:     msg.NodeName,
            ServiceName:  msg.ServiceName,
            RegisterName: msg.RegisterName,
            PID:          pid,
        })
        return nil
    }
    return nil
}

type SpawnRemoteWorker struct{ NodeName, WorkerName gen.Atom; Config map[string]any }
type SpawnRemoteService struct{ NodeName, ServiceName, RegisterName gen.Atom }
type RemoteWorkerSpawned struct{ NodeName, WorkerName gen.Atom; PID gen.PID }
type RemoteServiceSpawned struct{ NodeName, ServiceName, RegisterName gen.Atom; PID gen.PID }
type RemoteSpawnError struct{ NodeName gen.Atom; Error string }

Test Implementation:

func TestDistributedCoordinator_RemoteSpawn(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryDistributedCoordinator)

    // Setup remote nodes for testing
    actor.CreateRemoteNode("worker@node1", true)  // Available
    actor.CreateRemoteNode("worker@node2", false) // Unavailable

    clientPID := gen.PID{Node: "test", ID: 100}
    actor.ClearEvents() // Clear initialization events

    // Test basic remote spawn
    actor.SendMessage(clientPID, SpawnRemoteWorker{
        NodeName:   "worker@node1",
        WorkerName: "data-processor",
        Config:     map[string]any{"timeout": 30},
    })

    // Verify remote spawn event
    actor.ShouldRemoteSpawn().
        ToNode("worker@node1").
        WithName("data-processor").
        Once().
        Assert()

    // Test remote spawn with registration
    actor.SendMessage(clientPID, SpawnRemoteService{
        NodeName:     "worker@node1",
        ServiceName:  "user-service",
        RegisterName: "users",
    })

    // Verify remote spawn with register
    actor.ShouldRemoteSpawn().
        ToNode("worker@node1").
        WithName("user-service").
        WithRegister("users").
        Once().
        Assert()

    // Test total remote spawns
    actor.ShouldRemoteSpawn().Times(2).Assert()

    // Test negative assertion - should not spawn on unavailable node
    actor.SendMessage(clientPID, SpawnRemoteWorker{
        NodeName:   "worker@node2",
        WorkerName: "test-worker",
    })

    actor.ShouldNotRemoteSpawn().ToNode("worker@node2").Assert()
}

Advanced Remote Spawn Patterns:

  • Multi-Node Distribution: Test round-robin or other distribution strategies across multiple nodes

  • Error Scenarios: Verify proper error handling when nodes are unavailable

  • Event Inspection: Direct inspection of RemoteSpawnEvent for detailed validation

  • Negative Assertions: Ensure remote spawns don't happen under certain conditions
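The round-robin strategy mentioned above is not shown in the example actor, so here is a minimal standalone sketch of one way it could work. The picker type and its pick method are hypothetical, and plain strings stand in for node names; the fields loosely correspond to the nodeAvailability and roundRobin fields of distributedCoordinator.

```go
package main

import "fmt"

// picker cycles through nodes in order and skips unavailable ones.
// It is an illustrative stand-in, not part of the framework.
type picker struct {
	nodes     []string
	available map[string]bool
	next      int // index where the next search starts
}

// pick returns the next available node in round-robin order.
// The second result is false when no node is available.
func (p *picker) pick() (string, bool) {
	for i := 0; i < len(p.nodes); i++ {
		idx := (p.next + i) % len(p.nodes)
		node := p.nodes[idx]
		if p.available[node] {
			p.next = (idx + 1) % len(p.nodes)
			return node, true
		}
	}
	return "", false
}

func main() {
	p := &picker{
		nodes: []string{"worker@node1", "worker@node2", "worker@node3"},
		available: map[string]bool{
			"worker@node1": true,
			"worker@node3": true, // worker@node2 is treated as unavailable
		},
	}
	for i := 0; i < 4; i++ {
		node, ok := p.pick()
		fmt.Println(node, ok)
	}
}
```

A test for an actor built this way would send several spawn requests and assert, per target node, how many remote spawn events were captured.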

Actor Termination Testing

ShouldTerminate() - Testing Actor Lifecycle Completion

Actor termination is a critical aspect of actor systems. Actors can terminate for various reasons: normal completion, explicit shutdown, or errors. The testing library provides comprehensive tools for validating termination behavior and ensuring proper cleanup.

Why Test Actor Termination:

  • Resource Cleanup: Ensure actors properly clean up resources when terminating

  • Error Propagation: Verify that errors are handled correctly and lead to appropriate termination

  • Graceful Shutdown: Test that actors respond correctly to shutdown signals

  • Supervision Trees: Validate that supervisors handle child termination appropriately

Termination Reasons:

  • gen.TerminateReasonNormal - Normal completion of actor work

  • gen.TerminateReasonShutdown - Graceful shutdown request

  • Custom errors - Abnormal termination due to specific errors
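Since a termination reason is an ordinary error value, a test helper can sort reasons into these three groups with errors.Is. The sketch below is standalone: reasonNormal and reasonShutdown are local stand-ins for gen.TerminateReasonNormal and gen.TerminateReasonShutdown (their messages here are assumptions), and classify is a hypothetical helper, not a library function.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the framework's sentinel termination reasons.
var (
	reasonNormal   = errors.New("normal")
	reasonShutdown = errors.New("shutdown")
)

// Example reasons used below: a custom error and a wrapped sentinel.
var (
	errDatabase        = errors.New("critical system error: database unavailable")
	errWrappedShutdown = fmt.Errorf("supervisor stop: %w", reasonShutdown)
)

// classify maps a termination reason to one of the categories listed above.
// A nil reason means the actor has not terminated.
func classify(reason error) string {
	switch {
	case reason == nil:
		return "running"
	case errors.Is(reason, reasonNormal):
		return "normal"
	case errors.Is(reason, reasonShutdown):
		return "shutdown"
	default:
		return "abnormal"
	}
}

func main() {
	for _, r := range []error{nil, reasonNormal, errWrappedShutdown, errDatabase} {
		fmt.Println(classify(r))
	}
}
```

Using errors.Is rather than == means a reason that wraps a sentinel with extra context is still recognized.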

Example Actor:

type connectionManager struct {
    act.Actor
    connections map[string]*Connection
    maxRetries  int
}

func (c *connectionManager) Init(args ...any) error {
    c.connections = make(map[string]*Connection)
    c.maxRetries = 3
    c.Log().Info("Connection manager started")
    return nil
}

func (c *connectionManager) HandleMessage(from gen.PID, message any) error {
    switch msg := message.(type) {
    case CreateConnection:
        conn := &Connection{ID: msg.ID, Status: "active"}
        c.connections[msg.ID] = conn
        c.Send(from, ConnectionCreated{ID: msg.ID})
        c.Log().Info("Created connection %s", msg.ID)
        return nil

    case CloseConnection:
        if conn, exists := c.connections[msg.ID]; exists {
            conn.Close()
            delete(c.connections, msg.ID)
            c.Send(from, ConnectionClosed{ID: msg.ID})
            c.Log().Info("Closed connection %s", msg.ID)
        }
        return nil

    case ConnectionError:
        c.Log().Error("Connection error for %s: %s", msg.ID, msg.Error)
        msg.RetryCount++

        if msg.RetryCount >= c.maxRetries {
            c.Log().Error("Max retries exceeded for connection %s", msg.ID)
            return fmt.Errorf("connection failed after %d retries: %s", c.maxRetries, msg.Error)
        }

        // Retry the connection
        c.Send(c.PID(), CreateConnection{ID: msg.ID})
        return nil

    case string:
        // String commands cannot be cases of a type switch, so match on the value here
        switch msg {
        case "shutdown":
            // Graceful shutdown - close all connections
            for id, conn := range c.connections {
                conn.Close()
                c.Log().Info("Shutdown: closed connection %s", id)
            }
            c.Send("monitor", ShutdownComplete{ConnectionsClosed: len(c.connections)})
            return gen.TerminateReasonShutdown

        case "force_error":
            // Simulate critical error
            return fmt.Errorf("critical system error: database unavailable")
        }
    }
    return nil
}

type CreateConnection struct{ ID string }
type CloseConnection struct{ ID string }
type ConnectionCreated struct{ ID string }
type ConnectionClosed struct{ ID string }
type ConnectionError struct{ ID, Error string; RetryCount int }
type ShutdownComplete struct{ ConnectionsClosed int }

type Connection struct {
    ID     string
    Status string
}

func (c *Connection) Close() { c.Status = "closed" }

func factoryConnectionManager() gen.ProcessBehavior {
    return &connectionManager{}
}

Test Implementation:

func TestConnectionManager_TerminationHandling(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryConnectionManager)
    client := gen.PID{Node: "test", ID: 100}

    // Test normal operation first
    actor.SendMessage(client, CreateConnection{ID: "conn-1"})
    actor.ShouldSend().To(client).Message(ConnectionCreated{ID: "conn-1"}).Once().Assert()
    
    // Verify actor is not terminated during normal operation
    unit.Equal(t, false, actor.IsTerminated())
    unit.Nil(t, actor.TerminationReason())

    // Test graceful shutdown
    actor.SendMessage(client, "shutdown")
    
    // Verify shutdown message sent
    actor.ShouldSend().To("monitor").MessageMatching(func(msg any) bool {
        if shutdown, ok := msg.(ShutdownComplete); ok {
            return shutdown.ConnectionsClosed == 1
        }
        return false
    }).Once().Assert()

    // Verify graceful termination
    unit.Equal(t, true, actor.IsTerminated())
    unit.Equal(t, gen.TerminateReasonShutdown, actor.TerminationReason())

    // Verify termination event was captured
    actor.ShouldTerminate().
        WithReason(gen.TerminateReasonShutdown).
        Once().
        Assert()
}

func TestConnectionManager_ErrorTermination(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryConnectionManager)

    // Test abnormal termination due to critical error
    actor.SendMessage(gen.PID{}, "force_error")

    // Verify actor terminated with error
    unit.Equal(t, true, actor.IsTerminated())
    unit.NotNil(t, actor.TerminationReason())
    unit.Contains(t, actor.TerminationReason().Error(), "critical system error")

    // Verify termination event with specific error
    actor.ShouldTerminate().
        ReasonMatching(func(reason error) bool {
            return strings.Contains(reason.Error(), "database unavailable")
        }).
        Once().
        Assert()
}

func TestConnectionManager_RetryBeforeTermination(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryConnectionManager)

    // Test retry logic before termination
    actor.SendMessage(gen.PID{}, CreateConnection{ID: "conn-retry"})
    actor.ClearEvents() // Clear creation events

    // Send connection errors that should trigger retries
    for i := 0; i < 2; i++ {
        actor.SendMessage(gen.PID{}, ConnectionError{
            ID:         "conn-retry",
            Error:      "network timeout",
            RetryCount: i,
        })

        // Should not terminate yet
        unit.Equal(t, false, actor.IsTerminated())
        
        // Should retry by sending CreateConnection
        actor.ShouldSend().To(actor.PID()).MessageMatching(func(msg any) bool {
            if create, ok := msg.(CreateConnection); ok {
                return create.ID == "conn-retry"
            }
            return false
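        }).Once().Assert()

        // Reset captured events so the next iteration's Once() sees only its own retry
        actor.ClearEvents()
    }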

    // Final error that exceeds max retries
    actor.SendMessage(gen.PID{}, ConnectionError{
        ID:         "conn-retry",
        Error:      "network timeout",
        RetryCount: 3, // Exceeds maxRetries
    })

    // Now should terminate with error
    unit.Equal(t, true, actor.IsTerminated())
    unit.Contains(t, actor.TerminationReason().Error(), "connection failed after 3 retries")

    // Verify termination assertion
    actor.ShouldTerminate().
        ReasonMatching(func(reason error) bool {
            return strings.Contains(reason.Error(), "retries") && 
                   strings.Contains(reason.Error(), "network timeout")
        }).
        Once().
        Assert()
}

func TestTerminatedActor_NoFurtherProcessing(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryConnectionManager)

    // Terminate the actor
    actor.SendMessage(gen.PID{}, "force_error")
    unit.Equal(t, true, actor.IsTerminated())

    actor.ClearEvents() // Clear termination events

    // Try to send more messages - should not be processed
    actor.SendMessage(gen.PID{}, CreateConnection{ID: "should-not-work"})
    
    // Should not process the message (no CreateConnection response)
    actor.ShouldNotSend().To(gen.PID{}).Message(ConnectionCreated{ID: "should-not-work"}).Assert()
    
    // Should not create any new events
    events := actor.Events()
    unit.Equal(t, 0, len(events), "Terminated actor should not process messages")
}

Termination Testing Methods

TestActor Termination Status:

// Check if actor is terminated
isTerminated := actor.IsTerminated() // bool

// Get termination reason (nil if not terminated)
reason := actor.TerminationReason() // error or nil

// Test that actor should terminate
actor.ShouldTerminate().Once().Assert()

// Test with specific reason
actor.ShouldTerminate().WithReason(gen.TerminateReasonShutdown).Assert()

// Test with reason matching
actor.ShouldTerminate().ReasonMatching(func(reason error) bool {
    return strings.Contains(reason.Error(), "expected error")
}).Assert()

// Test that actor should NOT terminate
actor.ShouldNotTerminate().Assert()

Advanced Termination Patterns:

// Test multiple termination attempts
actor.ShouldTerminate().Times(1).Assert() // Should terminate exactly once

// Capture termination for detailed analysis
terminationResult := actor.ShouldTerminate().Once().Capture()
unit.NotNil(t, terminationResult)
unit.Equal(t, expectedReason, terminationResult.Reason)

// Test termination with timeout
success := unit.WithTimeout(func() {
    actor.SendMessage(gen.PID{}, "shutdown")
    actor.ShouldTerminate().Once().Assert()
}, 5*time.Second)
unit.True(t, success(), "Actor should terminate within timeout")

Exit Signal Testing

ShouldSendExit() - Testing Graceful Process Termination

Exit signals (SendExit and SendExitMeta) are used to gracefully terminate other processes. This is different from actor self-termination - it's about one actor telling another to exit. The testing library provides comprehensive assertions for validating exit signal behavior.

Why Test Exit Signals:

  • Graceful Shutdown: Ensure supervisors can properly terminate child processes

  • Resource Cleanup: Verify that exit signals trigger proper cleanup in target processes

  • Error Propagation: Test that failure conditions are communicated via exit signals

  • Supervision Trees: Validate that supervisors manage process lifecycles correctly

Example Actor:

type processSupervisor struct {
    act.Actor
    workers    map[string]gen.PID
    maxWorkers int
}

func (p *processSupervisor) Init(args ...any) error {
    p.workers = make(map[string]gen.PID)
    p.maxWorkers = 5
    return nil
}

func (p *processSupervisor) HandleMessage(from gen.PID, message any) error {
    switch msg := message.(type) {
    case StartWorker:
        if len(p.workers) >= p.maxWorkers {
            p.Send(from, WorkerStartError{Error: "max workers reached"})
            return nil
        }

        workerPID, err := p.Spawn(factoryWorkerProcess, gen.ProcessOptions{}, msg.WorkerID)
        if err != nil {
            p.Send(from, WorkerStartError{Error: err.Error()})
            return nil
        }

        p.workers[msg.WorkerID] = workerPID
        p.Send(from, WorkerStarted{WorkerID: msg.WorkerID, PID: workerPID})
        return nil

    case StopWorker:
        if workerPID, exists := p.workers[msg.WorkerID]; exists {
            // Send exit signal to worker
            p.SendExit(workerPID, gen.TerminateReasonShutdown)
            delete(p.workers, msg.WorkerID)
            p.Send(from, WorkerStopped{WorkerID: msg.WorkerID})
            p.Log().Info("Sent exit signal to worker %s", msg.WorkerID)
        } else {
            p.Send(from, WorkerStopError{WorkerID: msg.WorkerID, Error: "worker not found"})
        }
        return nil

    case EmergencyShutdown:
        // Send exit signals to all workers with error reason
        shutdownReason := fmt.Errorf("emergency shutdown: %s", msg.Reason)
        
        for workerID, workerPID := range p.workers {
            p.SendExit(workerPID, shutdownReason)
            p.Log().Warning("Emergency shutdown: sent exit to worker %s", workerID)
        }
        
        // Send meta exit signal to monitoring system
        p.SendExitMeta(gen.PID{Node: "monitor", ID: 999}, shutdownReason)
        
        p.Send(from, EmergencyShutdownComplete{
            WorkersTerminated: len(p.workers),
            Reason:           msg.Reason,
        })
        
        p.workers = make(map[string]gen.PID) // Clear workers map
        return nil

    case TerminateWorkerWithError:
        if workerPID, exists := p.workers[msg.WorkerID]; exists {
            errorReason := fmt.Errorf("worker error: %s", msg.Error)
            p.SendExit(workerPID, errorReason)
            delete(p.workers, msg.WorkerID)
            
            p.Send(from, WorkerTerminated{
                WorkerID: msg.WorkerID,
                Reason:   msg.Error,
            })
        }
        return nil
    }
    return nil
}

type StartWorker struct{ WorkerID string }
type StopWorker struct{ WorkerID string }
type EmergencyShutdown struct{ Reason string }
type TerminateWorkerWithError struct{ WorkerID, Error string }

type WorkerStarted struct{ WorkerID string; PID gen.PID }
type WorkerStopped struct{ WorkerID string }
type WorkerStartError struct{ Error string }
type WorkerStopError struct{ WorkerID, Error string }
type EmergencyShutdownComplete struct{ WorkersTerminated int; Reason string }
type WorkerTerminated struct{ WorkerID, Reason string }

type workerProcess struct{ act.Actor }
func (w *workerProcess) HandleMessage(from gen.PID, message any) error { return nil }
func factoryWorkerProcess() gen.ProcessBehavior { return &workerProcess{} }
func factoryProcessSupervisor() gen.ProcessBehavior { return &processSupervisor{} }

Test Implementation:

func TestProcessSupervisor_ExitSignals(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryProcessSupervisor)
    client := gen.PID{Node: "test", ID: 100}

    // Start some workers
    actor.SendMessage(client, StartWorker{WorkerID: "worker-1"})
    actor.SendMessage(client, StartWorker{WorkerID: "worker-2"})

    // Capture worker PIDs for validation
    spawn1 := actor.ShouldSpawn().Factory(factoryWorkerProcess).Once().Capture()
    spawn2 := actor.ShouldSpawn().Factory(factoryWorkerProcess).Once().Capture()
    
    worker1PID := spawn1.PID
    worker2PID := spawn2.PID

    actor.ClearEvents() // Clear spawn events

    // Test graceful worker stop
    actor.SendMessage(client, StopWorker{WorkerID: "worker-1"})

    // Verify exit signal sent to worker
    actor.ShouldSendExit().
        To(worker1PID).
        WithReason(gen.TerminateReasonShutdown).
        Once().
        Assert()

    // Verify stop confirmation
    actor.ShouldSend().To(client).Message(WorkerStopped{WorkerID: "worker-1"}).Once().Assert()

    // Test worker termination with custom error
    actor.SendMessage(client, TerminateWorkerWithError{
        WorkerID: "worker-2",
        Error:    "memory leak detected",
    })

    // Verify exit signal with custom error reason
    actor.ShouldSendExit().
        To(worker2PID).
        ReasonMatching(func(reason error) bool {
            return strings.Contains(reason.Error(), "memory leak detected")
        }).
        Once().
        Assert()

    // Verify termination response
    actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
        if terminated, ok := msg.(WorkerTerminated); ok {
            return terminated.WorkerID == "worker-2" && 
                   terminated.Reason == "memory leak detected"
        }
        return false
    }).Once().Assert()
}

func TestProcessSupervisor_EmergencyShutdown(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryProcessSupervisor)
    client := gen.PID{Node: "test", ID: 100}

    // Start multiple workers
    for i := 1; i <= 3; i++ {
        actor.SendMessage(client, StartWorker{WorkerID: fmt.Sprintf("worker-%d", i)})
    }

    // Capture all worker PIDs
    workers := make([]gen.PID, 3)
    for i := 0; i < 3; i++ {
        spawn := actor.ShouldSpawn().Factory(factoryWorkerProcess).Once().Capture()
        workers[i] = spawn.PID
    }

    actor.ClearEvents() // Clear spawn events

    // Trigger emergency shutdown
    actor.SendMessage(client, EmergencyShutdown{Reason: "system overload"})

    // Verify exit signals sent to all workers
    for _, workerPID := range workers {
        actor.ShouldSendExit().
            To(workerPID).
            ReasonMatching(func(reason error) bool {
                return strings.Contains(reason.Error(), "emergency shutdown") &&
                       strings.Contains(reason.Error(), "system overload")
            }).
            Once().
            Assert()
    }

    // Verify meta exit signal sent to monitoring
    monitorPID := gen.PID{Node: "monitor", ID: 999}
    actor.ShouldSendExitMeta().
        To(monitorPID).
        ReasonMatching(func(reason error) bool {
            return strings.Contains(reason.Error(), "system overload")
        }).
        Once().
        Assert()

    // Verify shutdown completion message
    actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
        if complete, ok := msg.(EmergencyShutdownComplete); ok {
            return complete.WorkersTerminated == 3 && 
                   complete.Reason == "system overload"
        }
        return false
    }).Once().Assert()

    // Verify total exit signals (3 workers + 1 meta)
    actor.ShouldSendExit().Times(3).Assert()
    actor.ShouldSendExitMeta().Times(1).Assert()
}

func TestExitSignal_NegativeAssertions(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryProcessSupervisor)
    client := gen.PID{Node: "test", ID: 100}

    // Try to stop non-existent worker
    actor.SendMessage(client, StopWorker{WorkerID: "non-existent"})

    // Should not send any exit signals
    actor.ShouldNotSendExit().Assert()
    actor.ShouldNotSendExitMeta().Assert()

    // Should send error response instead
    actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
        if stopError, ok := msg.(WorkerStopError); ok {
            return stopError.WorkerID == "non-existent" && 
                   stopError.Error == "worker not found"
        }
        return false
    }).Once().Assert()
}

Exit Signal Testing Methods

Basic Exit Signal Assertions:

// Test that exit signal was sent
actor.ShouldSendExit().To(targetPID).Once().Assert()

// Test with specific reason
actor.ShouldSendExit().To(targetPID).WithReason(gen.TerminateReasonShutdown).Assert()

// Test with reason matching
actor.ShouldSendExit().ReasonMatching(func(reason error) bool {
    return strings.Contains(reason.Error(), "expected error")
}).Assert()

// Test meta exit signals
actor.ShouldSendExitMeta().To(monitorPID).WithReason(errorReason).Assert()

// Negative assertions
actor.ShouldNotSendExit().To(targetPID).Assert()
actor.ShouldNotSendExitMeta().Assert()

Advanced Exit Signal Patterns:

// Test multiple exit signals
actor.ShouldSendExit().Times(3).Assert() // Should send exactly 3 exit signals

// Test exit signals to specific targets
actor.ShouldSendExit().To(worker1PID).Once().Assert()
actor.ShouldSendExit().To(worker2PID).Once().Assert()

// Capture exit signal for detailed analysis
exitResult := actor.ShouldSendExit().Once().Capture()
unit.NotNil(t, exitResult)
unit.Equal(t, expectedPID, exitResult.To)
unit.Equal(t, expectedReason, exitResult.Reason)

// Combined assertions
actor.ShouldSendExit().To(workerPID).WithReason(gen.TerminateReasonShutdown).Once().Assert()
actor.ShouldSendExitMeta().To(monitorPID).ReasonMatching(func(r error) bool {
    return strings.Contains(r.Error(), "shutdown complete")
}).Once().Assert()

Cron Testing

ShouldAddCronJob(), ShouldExecuteCronJob() - Testing Scheduled Operations

Cron job testing allows you to validate scheduled operations in your actors without waiting for real time to pass. The testing library provides comprehensive mock time support and detailed cron job lifecycle management.

Why Test Cron Jobs:

  • Schedule Validation: Ensure cron expressions are correct and jobs run at expected times

  • Job Management: Test job addition, removal, enabling, and disabling operations

  • Execution Logic: Verify that scheduled operations perform correctly when triggered

  • Time Control: Use mock time to test time-dependent behavior deterministically

Cron Testing Features:

  • Mock Time Support: Control time flow for deterministic testing

  • Job Lifecycle Testing: Validate job creation, scheduling, execution, and cleanup

  • Event Tracking: Monitor all cron-related operations and state changes

  • Schedule Simulation: Test complex scheduling scenarios without real time delays

Example Actor:

type taskScheduler struct {
    act.Actor
    taskCounter int
    schedules   map[string]gen.CronJobSchedule
}

func (t *taskScheduler) Init(args ...any) error {
    t.taskCounter = 0
    t.schedules = make(map[string]gen.CronJobSchedule)
    t.Log().Info("Task scheduler started")
    return nil
}

func (t *taskScheduler) HandleMessage(from gen.PID, message any) error {
    switch msg := message.(type) {
    case ScheduleTask:
        // Add a new cron job
        jobID, err := t.Cron().AddJob(msg.Schedule, gen.CronJobFunction(func() {
            t.taskCounter++
            t.Send("output", TaskExecuted{
                TaskID:    msg.TaskID,
                Count:     t.taskCounter,
                Timestamp: time.Now(),
            })
            t.Log().Info("Executed scheduled task %s (count: %d)", msg.TaskID, t.taskCounter)
        }))
        
        if err != nil {
            t.Send(from, ScheduleError{TaskID: msg.TaskID, Error: err.Error()})
            return nil
        }

        t.schedules[msg.TaskID] = gen.CronJobSchedule{ID: jobID, Schedule: msg.Schedule}
        t.Send(from, TaskScheduled{TaskID: msg.TaskID, JobID: jobID})
        t.Log().Debug("Scheduled task %s with job ID %s", msg.TaskID, jobID)
        return nil

    case UnscheduleTask:
        if schedule, exists := t.schedules[msg.TaskID]; exists {
            err := t.Cron().RemoveJob(schedule.ID)
            if err != nil {
                t.Send(from, UnscheduleError{TaskID: msg.TaskID, Error: err.Error()})
                return nil
            }
            
            delete(t.schedules, msg.TaskID)
            t.Send(from, TaskUnscheduled{TaskID: msg.TaskID})
            t.Log().Debug("Unscheduled task %s", msg.TaskID)
        } else {
            t.Send(from, UnscheduleError{TaskID: msg.TaskID, Error: "task not found"})
        }
        return nil

    case EnableTask:
        if schedule, exists := t.schedules[msg.TaskID]; exists {
            err := t.Cron().EnableJob(schedule.ID)
            if err != nil {
                t.Send(from, TaskError{TaskID: msg.TaskID, Error: err.Error()})
                return nil
            }
            t.Send(from, TaskEnabled{TaskID: msg.TaskID})
        }
        return nil

    case DisableTask:
        if schedule, exists := t.schedules[msg.TaskID]; exists {
            err := t.Cron().DisableJob(schedule.ID)
            if err != nil {
                t.Send(from, TaskError{TaskID: msg.TaskID, Error: err.Error()})
                return nil
            }
            t.Send(from, TaskDisabled{TaskID: msg.TaskID})
        }
        return nil

    case GetTaskInfo:
        if schedule, exists := t.schedules[msg.TaskID]; exists {
            info, err := t.Cron().JobInfo(schedule.ID)
            if err != nil {
                t.Send(from, TaskError{TaskID: msg.TaskID, Error: err.Error()})
                return nil
            }
            t.Send(from, TaskInfo{
                TaskID:   msg.TaskID,
                JobID:    schedule.ID,
                Schedule: schedule.Schedule,
                Enabled:  info.Enabled,
                NextRun:  info.NextRun,
            })
        } else {
            t.Send(from, TaskError{TaskID: msg.TaskID, Error: "task not found"})
        }
        return nil
    }
    return nil
}

type ScheduleTask struct{ TaskID, Schedule string }
type UnscheduleTask struct{ TaskID string }
type EnableTask struct{ TaskID string }
type DisableTask struct{ TaskID string }
type GetTaskInfo struct{ TaskID string }

type TaskScheduled struct{ TaskID, JobID string }
type TaskUnscheduled struct{ TaskID string }
type TaskEnabled struct{ TaskID string }
type TaskDisabled struct{ TaskID string }
type TaskExecuted struct{ TaskID string; Count int; Timestamp time.Time }
type TaskInfo struct{ TaskID, JobID, Schedule string; Enabled bool; NextRun time.Time }
type ScheduleError struct{ TaskID, Error string }
type UnscheduleError struct{ TaskID, Error string }
type TaskError struct{ TaskID, Error string }

func factoryTaskScheduler() gen.ProcessBehavior {
    return &taskScheduler{}
}

Test Implementation:

func TestTaskScheduler_CronJobs(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryTaskScheduler)
    client := gen.PID{Node: "test", ID: 100}

    // Test basic job scheduling
    actor.SendMessage(client, ScheduleTask{
        TaskID:   "daily-backup",
        Schedule: "0 2 * * *", // Daily at 2 AM
    })

    // Verify cron job was added
    actor.ShouldAddCronJob().
        WithSchedule("0 2 * * *").
        Once().
        Assert()

    // Verify scheduling response
    actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
        if scheduled, ok := msg.(TaskScheduled); ok {
            return scheduled.TaskID == "daily-backup" && scheduled.JobID != ""
        }
        return false
    }).Once().Assert()

    // Test job execution by triggering it
    actor.TriggerCronJob("0 2 * * *") // Manually trigger the scheduled job

    // Verify job execution
    actor.ShouldExecuteCronJob().
        WithSchedule("0 2 * * *").
        Once().
        Assert()

    // Verify task execution message
    actor.ShouldSend().To("output").MessageMatching(func(msg any) bool {
        if executed, ok := msg.(TaskExecuted); ok {
            return executed.TaskID == "daily-backup" && executed.Count == 1
        }
        return false
    }).Once().Assert()
}

func TestTaskScheduler_MockTimeControl(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryTaskScheduler)
    client := gen.PID{Node: "test", ID: 100}

    // Set initial mock time
    baseTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
    actor.SetCronMockTime(baseTime)

    // Schedule a job for every minute
    actor.SendMessage(client, ScheduleTask{
        TaskID:   "minute-task",
        Schedule: "* * * * *", // Every minute
    })

    cronJob := actor.ShouldAddCronJob().Once().Capture()
    actor.ClearEvents()

    // Advance time by 1 minute - should trigger the job
    actor.SetCronMockTime(baseTime.Add(1 * time.Minute))

    // Verify job executed
    actor.ShouldExecuteCronJob().
        WithJobID(cronJob.ID).
        Once().
        Assert()

    // Advance time by another minute
    actor.SetCronMockTime(baseTime.Add(2 * time.Minute))

    // Should execute again
    actor.ShouldExecuteCronJob().
        WithJobID(cronJob.ID).
        Times(2). // Total of 2 executions
        Assert()
}

Cron Testing Methods

Job Lifecycle Assertions:

// Test that cron job was added
actor.ShouldAddCronJob().WithSchedule("0 2 * * *").Once().Assert()

// Test job execution
actor.ShouldExecuteCronJob().WithSchedule("0 * * * *").Times(3).Assert()

// Test job removal
actor.ShouldRemoveCronJob().WithJobID("job-123").Once().Assert()

// Test job enable/disable
actor.ShouldEnableCronJob().WithJobID("job-123").Once().Assert()
actor.ShouldDisableCronJob().WithJobID("job-123").Once().Assert()

// Negative assertions
actor.ShouldNotAddCronJob().Assert()
actor.ShouldNotExecuteCronJob().Assert()

Mock Time Control:

// Set mock time for deterministic testing
baseTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
actor.SetCronMockTime(baseTime)

// Advance time to trigger scheduled jobs
actor.SetCronMockTime(baseTime.Add(1 * time.Hour))

// Manually trigger cron jobs for testing
actor.TriggerCronJob("0 * * * *") // Trigger hourly job
actor.TriggerCronJob("job-id-123") // Trigger by job ID

Advanced Cron Patterns:

// Capture cron job for detailed analysis
cronJob := actor.ShouldAddCronJob().Once().Capture()
jobID := cronJob.ID
schedule := cronJob.Schedule

// Test multiple job executions with time control
for i := 0; i < 5; i++ {
    actor.SetCronMockTime(baseTime.Add(time.Duration(i) * time.Minute))
    actor.TriggerCronJob("* * * * *") // Every minute
}
actor.ShouldExecuteCronJob().Times(5).Assert()

Built-in Assertions

The library includes a comprehensive set of zero-dependency assertion functions that cover common testing scenarios without requiring external testing frameworks:

func TestBuiltInAssertions(t *testing.T) {
    // Equality assertions
    unit.Equal(t, "expected", "expected")
    unit.NotEqual(t, "different", "value")

    // Boolean assertions  
    unit.True(t, true)
    unit.False(t, false)

    // Nil assertions
    unit.Nil(t, nil)
    unit.NotNil(t, "not nil")

    // String assertions
    unit.Contains(t, "hello world", "world")
    
    // Type assertions
    unit.IsType(t, "", "string value")
}

Why Built-in Assertions:

  • Zero Dependencies: Avoid version conflicts and complex dependency management

  • Consistent Interface: All assertions follow the same pattern and error reporting

  • Testing Framework Agnostic: Works with any Go testing approach

  • Actor-Specific: Designed specifically for the needs of actor testing
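
To illustrate how an assertion helper can stay dependency-free and framework agnostic, here is a minimal sketch. It is not the library's source: equal, reporter, and recorder are hypothetical names, and the only assumption is that the helper needs nothing more than an Errorf method, which *testing.T provides.

```go
package main

import (
	"fmt"
	"reflect"
)

// reporter is the only capability the helper needs; *testing.T satisfies it.
type reporter interface {
	Errorf(format string, args ...any)
}

// equal is a hypothetical stand-in for an assertion such as unit.Equal.
// It reports a failure through t and returns whether the values matched.
func equal(t reporter, expected, actual any) bool {
	if reflect.DeepEqual(expected, actual) {
		return true
	}
	t.Errorf("expected %v (%T), got %v (%T)", expected, expected, actual, actual)
	return false
}

// recorder collects failure messages so the helper can be exercised outside go test.
type recorder struct{ failures []string }

func (r *recorder) Errorf(format string, args ...any) {
	r.failures = append(r.failures, fmt.Sprintf(format, args...))
}

func main() {
	r := &recorder{}
	equal(r, "expected", "expected")
	equal(r, 1, int64(1))        // same number, different types: reported as a failure
	fmt.Println(len(r.failures)) // 1
}
```

Depending on a one-method interface instead of a concrete test type is what lets a helper like this work with any test runner that can report an error.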

Advanced Features

Dynamic Value Capture - Testing Generated Content

Real-world actors frequently generate dynamic values such as timestamps, UUIDs, session IDs, or auto-incrementing counters. Exact-match assertions cannot target these values because they differ from run to run. The library lets you capture a value from a recorded event and reuse it in later assertions.

The Challenge of Dynamic Values:

  • Timestamps: Created at runtime, impossible to predict exact values

  • UUIDs: Randomly generated, different in every test run

  • Auto-incrementing IDs: Dependent on execution order and system state

  • Process IDs: Assigned by the actor system, not controllable in tests

The Solution - Value Capture:

func TestDynamicValues(t *testing.T) {
    actor, _ := unit.Spawn(t, factorySessionManager)

    // Send request that will generate dynamic session ID
    actor.SendMessage(gen.PID{}, CreateSession{UserID: "user123"})

    // Capture the spawn to get the dynamic session PID
    spawnResult := actor.ShouldSpawn().Once().Capture()
    sessionPID := spawnResult.PID

    // Use captured PID in subsequent assertions
    actor.ShouldSend().MessageMatching(func(msg any) bool {
        if created, ok := msg.(SessionCreated); ok {
            return created.SessionPID == sessionPID && created.UserID == "user123"
        }
        return false
    }).Once().Assert()
}

Capture Strategies:

  • Immediate Capture: Capture values as soon as they're generated

  • Pattern Matching: Use validation functions to identify and validate dynamic content

  • Structured Matching: Validate message structure while ignoring specific dynamic fields

  • Cross-Reference Testing: Use captured values in multiple assertions to ensure consistency

Event Inspection - Deep System Analysis

For complex testing scenarios or debugging difficult issues, the library provides direct access to the complete event timeline. This allows you to perform sophisticated analysis of actor behavior beyond what's possible with standard assertions.

Events() - Complete Event History

Access all captured events for detailed analysis:

func TestEventInspection(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryComplexActor)

    // Perform operations
    actor.SendMessage(gen.PID{}, ComplexOperation{})

    // Get all events for inspection
    events := actor.Events()
    
    var sendCount, spawnCount, logCount, remoteSpawnCount int
    for _, event := range events {
        switch event.(type) {
        case unit.SendEvent:
            sendCount++
        case unit.SpawnEvent:
            spawnCount++
        case unit.LogEvent:
            logCount++
        case unit.RemoteSpawnEvent:
            remoteSpawnCount++
        }
    }

    unit.True(t, sendCount > 0, "Should have send events")
    unit.True(t, spawnCount == 2, "Should spawn exactly 2 processes")
    unit.True(t, logCount >= 1, "Should have log events")
}

LastEvent() - Most Recent Operation

Get the most recently captured event:

func TestLastEvent(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryExampleActor)

    actor.SendMessage(gen.PID{}, "test")
    
    // Get the most recent event
    lastEvent := actor.LastEvent()
    unit.NotNil(t, lastEvent, "Should have a last event")
    unit.Equal(t, "send", lastEvent.Type())
    
    if sendEvent, ok := lastEvent.(unit.SendEvent); ok {
        unit.Equal(t, "test", sendEvent.Message)
    }
}

ClearEvents() - Reset Event History

Clear all captured events, useful for isolating test phases:

func TestClearEvents(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryExampleActor)

    // Perform some operations
    actor.SendMessage(gen.PID{}, "setup")
    actor.ShouldSend().Once().Assert()

    // Clear events before main test
    actor.ClearEvents()

    // Now test the main functionality
    actor.SendMessage(gen.PID{}, "main_operation")
    
    // Only the main operation events are captured
    events := actor.Events()
    unit.Equal(t, 1, len(events), "Should only have main operation event")
}

Event Inspection Use Cases:

  • Performance Analysis: Count operations to identify performance bottlenecks

  • Workflow Validation: Ensure complex multi-step processes execute in the correct order

  • Error Investigation: Analyze the complete event sequence leading to failures

  • Integration Testing: Verify that multiple actors interact correctly in complex scenarios
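
Workflow validation usually comes down to checking that certain events occur in a given relative order. The sketch below assumes the event timeline has already been reduced to a slice of type names (for example by collecting the value that Type() returns for each event, as in the LastEvent example). The helper inOrder is hypothetical and not part of the library.

```go
package main

import "fmt"

// inOrder reports whether want appears in got as a subsequence:
// same relative order, with unrelated events allowed in between.
func inOrder(got, want []string) bool {
	i := 0
	for _, kind := range got {
		if i < len(want) && kind == want[i] {
			i++
		}
	}
	return i == len(want)
}

func main() {
	timeline := []string{"log", "spawn", "send", "log", "send", "exit"}
	fmt.Println(inOrder(timeline, []string{"spawn", "send", "exit"})) // true
	fmt.Println(inOrder(timeline, []string{"exit", "spawn"}))         // false
}
```

Matching a subsequence rather than the exact timeline keeps the test stable when the actor adds extra log or send events between the steps that matter.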

Timeout Support - Assertion Timing Control

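The library provides unit.WithTimeout for assertions that must complete within a time limit. As used in the example below, it wraps a function and returns a function whose boolean result reports whether the wrapped code finished within the given duration: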

import (
    "testing"
    "time"

    "ergo.services/ergo/gen"
    "ergo.services/ergo/testing/unit"
)

func TestWithTimeout(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryExampleActor)

    // Test that assertion completes within timeout
    success := unit.WithTimeout(func() {
        actor.SendMessage(gen.PID{}, "test")
        actor.ShouldSend().Once().Assert()
    }, 5*time.Second)

    unit.True(t, success(), "Assertion should complete within timeout")
}

Timeout Function Usage:

  • Assertion Wrapping: Wrap assertion functions to add timeout behavior

  • Integration Testing: Useful when testing with external systems that might have delays

  • Performance Validation: Ensure assertions complete within expected time limits

Testing Patterns and Best Practices

Test Organization Strategies

Single Responsibility Testing: Each test should focus on one specific behavior or scenario. This makes tests easier to understand, debug, and maintain.

// Good: Tests one specific behavior
func TestUserManager_CreateUser_Success(t *testing.T) { ... }
func TestUserManager_CreateUser_DuplicateEmail(t *testing.T) { ... }
func TestUserManager_CreateUser_InvalidData(t *testing.T) { ... }

// Poor: Tests multiple behaviors in one test
func TestUserManager_AllOperations(t *testing.T) { ... }

State Isolation: Each test should start with a clean state and not depend on other tests. Use actor.ClearEvents() when needed to reset event history between test phases.

Error Path Testing: Don't just test the happy path. Actor systems need robust error handling, so test failure scenarios thoroughly:

func TestWorkerSupervisor_MaxWorkersReached(t *testing.T) {
    // Test that supervisor properly rejects requests when at capacity
    // Test that appropriate error messages are sent
    // Test that the supervisor remains functional after rejecting requests
}

Message Design for Testability

Structured Messages: Design your messages to be easily testable by using structured types rather than primitive values:

// Good: Easy to test with pattern matching
type UserCreated struct {
    UserID  string
    Email   string
    Created time.Time
}

// Poor: Hard to validate in tests
type GenericMessage struct {
    Type string
    Data map[string]interface{}
}

Predictable vs Dynamic Content: Separate predictable content from dynamic content in your messages to make testing easier:

type OrderProcessed struct {
    OrderID     string    // Predictable - can be set in test
    Total       float64   // Predictable - can be set in test
    ProcessedAt time.Time // Dynamic - use pattern matching
    RequestID   string    // Dynamic - capture and validate
}

Performance Testing Considerations

Event Overhead: While event capture is lightweight, be aware that every operation creates events. For performance-critical tests, you can:

  • Clear events periodically with ClearEvents()

  • Focus assertions on specific time windows

  • Use event inspection to identify performance bottlenecks

Scaling Testing: Test how your actors behave under load by sending a large burst of requests:

import (
    "fmt"
    "testing"

    "ergo.services/ergo/gen"
    "ergo.services/ergo/testing/unit"
)

func TestWorkerPool_ConcurrentRequests(t *testing.T) {
    actor, _ := unit.Spawn(t, factoryWorkerPool)
    
    // Send 100 requests back to back (the loop itself runs sequentially)
    for i := 0; i < 100; i++ {
        actor.SendMessage(gen.PID{}, ProcessRequest{ID: fmt.Sprintf("req-%d", i)})
    }
    
    // Verify all requests were processed
    actor.ShouldSend().To("output").Times(100).Assert()
}

// Note: This example assumes you have defined:
// - type ProcessRequest struct{ ID string }
// - factoryWorkerPool() gen.ProcessBehavior function

Best Practices

  1. Use descriptive test names that clearly indicate what behavior is being tested

  2. Test all message types your actor handles, including edge cases

  3. Capture dynamic values early using the Capture() method for generated IDs

  4. Test error conditions, not just the happy path

  5. Use pattern matching for complex message validation

  6. Clear events between test phases when needed with ClearEvents()

  7. Configure appropriate log levels for debugging vs production testing

  8. Test temporal behaviors with timeout mechanisms

  9. Validate distributed scenarios using network simulation

  10. Organize tests by behavior rather than by implementation details

This testing library provides comprehensive coverage for all Ergo Framework actor patterns while maintaining zero external dependencies and excellent readability. By following these patterns and practices, you can build robust, well-tested actor systems that behave correctly in both simple and complex scenarios.

Complete Examples and Use Cases

The library includes comprehensive test examples organized into feature-specific files that demonstrate all capabilities through real-world scenarios:

Feature-Based Test Files

basic_test.go - Fundamental Actor Testing

  • Basic actor functionality and message handling

  • Dynamic value capture and validation

  • Built-in assertions and event tracking

  • Core testing patterns and best practices

network_test.go - Distributed System Testing

  • Remote node simulation and connectivity

  • Network configuration and route management

  • Remote spawn operations and event capture

  • Multi-node interaction patterns

workflow_test.go - Complex Business Logic

  • Multi-step order processing workflows

  • State machine validation and transitions

  • Business process orchestration

  • Error handling and recovery scenarios

call_test.go - Synchronous Communication

  • Call operations and response handling

  • Async call patterns and timeouts

  • Send/response communication flows

  • Concurrent request management

cron_test.go - Scheduled Operations

  • Cron job lifecycle management

  • Mock time control and schedule validation

  • Job execution tracking and assertions

  • Time-dependent behavior testing

termination_test.go - Actor Lifecycle Management

  • Actor termination handling and cleanup

  • Exit signal testing (SendExit/SendExitMeta)

  • Normal vs abnormal termination scenarios

  • Resource cleanup validation

Comprehensive Test Examples

  1. Complex State Machine Testing (workflow_test.go)

    • Multi-step order processing workflow

    • Validation, payment, and fulfillment pipeline

    • State transition validation and error handling

  2. Process Management (basic_test.go)

    • Dynamic worker spawning and management

    • Resource capacity limits and monitoring

    • Worker lifecycle (start, stop, restart)

  3. Advanced Pattern Matching (basic_test.go)

    • Structure matching with partial validation

    • Dynamic value handling and field validation

    • Complex conditional message matching

  4. Remote Spawn Testing (network_test.go)

    • Remote spawn operations on multiple nodes

    • Round-robin distribution testing

    • Error handling for unavailable nodes

    • Event inspection and workflow validation

  5. Cron Job Management (cron_test.go)

    • Job scheduling and execution validation

    • Mock time control for deterministic testing

    • Schedule expression testing and validation

  6. Actor Termination (termination_test.go)

    • Normal and abnormal termination scenarios

    • Exit signal testing and process cleanup

    • Termination reason validation

    • Post-termination behavior verification

  7. Concurrent Operations (call_test.go)

    • Multi-client concurrent request handling

    • Resource contention and capacity management

    • Load testing and performance validation

  8. Environment & Configuration (basic_test.go)

    • Environment variable management

    • Runtime configuration changes

    • Feature flag and conditional behavior testing

Getting Started with Examples

// Import the testing library
import "ergo.services/ergo/testing/unit"

# Run all tests
go test -v ergo.services/ergo/testing/unit

# Run feature-specific tests
go test -v -run TestBasic ergo.services/ergo/testing/unit
go test -v -run TestNetwork ergo.services/ergo/testing/unit
go test -v -run TestWorkflow ergo.services/ergo/testing/unit
go test -v -run TestCall ergo.services/ergo/testing/unit
go test -v -run TestCron ergo.services/ergo/testing/unit
go test -v -run TestTermination ergo.services/ergo/testing/unit

Learning Path

  1. Start with Basic Examples: basic_test.go - Core functionality and patterns

  2. Explore Message Testing: basic_test.go - Message flow and assertions

  3. Learn Process Management: basic_test.go - Spawn operations and lifecycle

  4. Master Synchronous Communication: call_test.go - Calls and responses

  5. Study Complex Workflows: workflow_test.go - Business logic testing

  6. Practice Network Testing: network_test.go - Distributed operations

  7. Explore Scheduling: cron_test.go - Time-based operations

  8. Understand Termination: termination_test.go - Lifecycle completion

Each test file provides complete, working implementations of specific actor patterns and demonstrates best practices for testing each scenario. All tests include comprehensive comments explaining the testing strategy and validation approach.

Configuration and Environment Testing

Real actors often behave differently based on configuration. Let's test this:

func TestDatabaseActor_ConfigurationBehavior(t *testing.T) {
    // Test with different configurations
    
    // Development configuration
    devActor, _ := unit.Spawn(t, newDatabaseActor, 
        unit.WithEnv(map[gen.Env]any{
            "DB_POOL_SIZE": 5,
            "LOG_QUERIES":  true,
        }))
    
    devActor.SendMessage(gen.PID{}, ExecuteQuery{SQL: "SELECT * FROM users"})
    devActor.ShouldLog().Level(gen.LogLevelDebug).Containing("SELECT * FROM users").Assert()
    
    // Production configuration  
    prodActor, _ := unit.Spawn(t, newDatabaseActor,
        unit.WithEnv(map[gen.Env]any{
            "DB_POOL_SIZE": 50,
            "LOG_QUERIES":  false,
        }))
    
    prodActor.SendMessage(gen.PID{}, ExecuteQuery{SQL: "SELECT * FROM users"})
    prodActor.ShouldLog().Level(gen.LogLevelDebug).Times(0).Assert() // No query logging in prod
}

Complex Message Patterns

As your actors become more sophisticated, your message testing needs to handle more complex scenarios:

Testing Message Sequences

func TestOrderProcessor_WorkflowSteps(t *testing.T) {
    actor, _ := unit.Spawn(t, newOrderProcessor)
    client := gen.PID{Node: "client", ID: 1}
    
    // Start an order
    actor.SendMessage(client, CreateOrder{Items: []string{"book", "pen"}})
    
    // Should trigger a sequence of operations
    actor.ShouldSend().To("inventory").Message("check_availability").Once().Assert()
    actor.ShouldSend().To("payment").Message("calculate_total").Once().Assert()
    actor.ShouldSend().To("shipping").Message("estimate_delivery").Once().Assert()
    
    // Should send status back to client
    actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
        if status, ok := msg.(OrderStatus); ok {
            return status.Status == "processing"
        }
        return false
    }).Once().Assert()
}

Testing Conditional Logic

func TestSecurityGate_AccessControl(t *testing.T) {
    actor, _ := unit.Spawn(t, newSecurityGate)
    
    // Test admin access
    admin := gen.PID{Node: "admin", ID: 1}
    actor.SendMessage(admin, AccessRequest{Resource: "admin_panel", User: "admin"})
    actor.ShouldSend().To(admin).Message(AccessGranted{}).Once().Assert()
    
    // Test regular user access to admin panel
    user := gen.PID{Node: "user", ID: 2}
    actor.SendMessage(user, AccessRequest{Resource: "admin_panel", User: "regular_user"})
    actor.ShouldSend().To(user).Message(AccessDenied{Reason: "insufficient privileges"}).Once().Assert()
    
    // Test regular user access to public resources
    actor.SendMessage(user, AccessRequest{Resource: "public_content", User: "regular_user"})
    actor.ShouldSend().To(user).Message(AccessGranted{}).Once().Assert()
}

Basic Process Spawning

Many actors need to create child processes. Here's how to test this:

func TestTaskManager_WorkerCreation(t *testing.T) {
    actor, _ := unit.Spawn(t, newTaskManager)
    client := gen.PID{Node: "client", ID: 1}
    
    // Request a new worker
    actor.SendMessage(client, CreateWorker{TaskType: "data_processing"})
    
    // Should spawn a worker process
    actor.ShouldSpawn().Once().Assert()
    
    // Should confirm to client
    actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
        if response, ok := msg.(WorkerCreated); ok {
            return response.TaskType == "data_processing"
        }
        return false
    }).Once().Assert()
}

Capturing Dynamic Process IDs

When actors spawn processes, you often need to use the generated PID in subsequent tests:

func TestSessionManager_UserSessions(t *testing.T) {
    actor, _ := unit.Spawn(t, newSessionManager)
    client := gen.PID{Node: "client", ID: 1}
    
    // Create a session for a user
    actor.SendMessage(client, CreateSession{UserID: "alice"})
    
    // Capture the spawned session process
    sessionSpawn := actor.ShouldSpawn().Once().Capture()
    sessionPID := sessionSpawn.PID
    
    // Verify session was registered
    actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
        if response, ok := msg.(SessionCreated); ok {
            return response.UserID == "alice" && response.SessionPID == sessionPID
        }
        return false
    }).Once().Assert()
    
    // Send work to the session
    actor.SendMessage(client, SendToSession{UserID: "alice", Data: "important_data"})
    
    // Should route to the captured session PID
    actor.ShouldSend().To(sessionPID).Message("important_data").Once().Assert()
}

Event Inspection for Debugging

When tests fail, you need to understand what actually happened:

func TestComplexActor_DebugFailures(t *testing.T) {
    actor, _ := unit.Spawn(t, newComplexActor)
    
    // Perform some operations
    actor.SendMessage(gen.PID{}, TriggerComplexWorkflow{})
    
    // If something goes wrong, inspect all events
    events := actor.Events()
    t.Logf("Total events captured: %d", len(events))
    
    for i, event := range events {
        t.Logf("Event %d: %s - %s", i, event.Type(), event.String())
    }
    
    // Clear events and test specific behavior
    actor.ClearEvents()
    actor.SendMessage(gen.PID{}, SimpleBehavior{})
    
    // Now only simple behavior events are captured
    simpleEvents := actor.Events()
    unit.Equal(t, 1, len(simpleEvents), "Should only have one event after clearing")
}

Common Pitfalls

  1. Event Accumulation: Events accumulate across multiple operations. Use ClearEvents() to reset between test phases.

  2. Timing Issues: Some assertions may need time to complete. Use appropriate timeouts and consider async patterns.

  3. Message Ordering: In high-throughput scenarios, message ordering might not be guaranteed. Test for this explicitly.

  4. State Leakage: Each test should start with clean state. Don't rely on previous test state.

Conclusion

The Ergo Framework unit testing library provides comprehensive tools for testing actor-based systems. From simple message exchanges to complex distributed workflows, you can validate every aspect of your actor behavior with confidence.

Key Takeaways:

  • Start Simple: Begin with basic message testing and gradually add complexity

  • Test Comprehensively: Cover happy paths, error conditions, and edge cases

  • Use Fluent Assertions: Take advantage of the readable assertion API

  • Inspect Events: Use event inspection for debugging and understanding actor behavior

  • Organize Tests: Structure tests by behavior and keep them focused

  • Handle Async Patterns: Use appropriate timeouts and pattern matching for async operations

The library's zero-dependency design, comprehensive feature set, and integration with Go's testing framework make it the ideal choice for building robust, well-tested actor systems with the Ergo Framework.

Next Steps:

  1. Explore the complete test examples in the framework repository

  2. Start with simple actors and gradually build complexity

  3. Integrate testing into your development workflow

  4. Use the debugging features when tests fail

  5. Share testing patterns with your team

Happy testing!

Actor Model

The Actor Model and Its Properties

Intro

The actor model was developed in the 1970s for concurrent and parallel computations. It defines some key rules for how system components should behave and interact with each other. The main idea is that interactions between program components are not conducted through function or procedure calls but through the exchange of asynchronous messages. You can read more about the actor model and its history in the Wikipedia article.

One of the most popular programming languages that uses this model is Erlang, where the actor model is at the core of its BEAM virtual machine. In Erlang, actors are represented by lightweight processes within the virtual machine.

There are also implementations of this model in other languages. For example, in Java, the Akka framework provides a powerful tool for building solutions based on the actor model. Akka brings the principles of the actor model to the Java ecosystem, enabling developers to create highly concurrent, distributed, and fault-tolerant applications.

Actor in Ergo Framework

The Ergo Framework implements the actor model based on lightweight processes. Each process can send and receive asynchronous messages, make synchronous calls, and spawn new processes.
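
To make the idea concrete, here is a minimal standard-library sketch of an actor: a goroutine that owns its state and reads messages from a channel, with an asynchronous send and a synchronous call. It only illustrates the model. The counter type and its methods are hypothetical names, and this is not how Ergo Framework implements processes.

```go
package main

import "fmt"

// message is what the sketch actor finds in its mailbox.
// reply is nil for asynchronous sends and non-nil for synchronous calls.
type message struct {
	delta int
	reply chan int
}

// counter is a hypothetical actor: its state is touched only by its own goroutine.
type counter struct {
	mailbox chan message
}

func spawnCounter() *counter {
	c := &counter{mailbox: make(chan message, 16)}
	go c.loop()
	return c
}

// loop handles one message at a time, in arrival order.
func (c *counter) loop() {
	total := 0
	for m := range c.mailbox {
		total += m.delta
		if m.reply != nil {
			m.reply <- total
		}
	}
}

// Send is asynchronous: it queues the message and returns
// (it blocks only if the bounded mailbox is full).
func (c *counter) Send(delta int) {
	c.mailbox <- message{delta: delta}
}

// Call is synchronous: it blocks until the actor replies.
func (c *counter) Call(delta int) int {
	reply := make(chan int, 1)
	c.mailbox <- message{delta: delta, reply: reply}
	return <-reply
}

func main() {
	c := spawnCounter()
	c.Send(2)
	c.Send(3)
	fmt.Println(c.Call(0)) // prints 5
}
```

Because only the actor's goroutine reads and writes total, no locks are needed. This is the property the actor model relies on.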

A process in Ergo Framework is a lightweight entity running on top of a goroutine, built around the actor model. Each process has a mailbox for incoming messages. By default, this mailbox is of unlimited size, but it can be limited by setting an appropriate parameter when the process is started. The mailbox contains four queues: Main, System, Urgent, and Log. These queues determine the priority of message processing.

An actor in Ergo Framework is an abstraction over such a process, with a set of callbacks for handling incoming messages. The standard library includes general-purpose actors like act.Actor, act.Supervisor, and act.Pool, as well as a specialized actor for handling HTTP requests, act.WebHandler.

Mailbox processing in an actor is done sequentially in a FIFO order, but with respect to priority. Messages from the Urgent queue are processed first, followed by those in the System queue. If these queues are empty, messages from the Main queue are processed. Typically, messages from the node (e.g., a request to stop the process) are placed in the Urgent and System queues. All other messages are delivered to the Main queue by default. Messages in the Log queue are processed with the lowest priority.
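
The processing order described above can be sketched with one FIFO queue per priority level. This is a simplified illustration that assumes plain slices as queues. The mailbox type and its push and pop methods are hypothetical and do not reflect the framework's actual data structures.

```go
package main

import "fmt"

// Queue priorities, highest first, in the order described above.
const (
	urgent = iota
	system
	mainQ
	logQ
	queueCount
)

// mailbox is a hypothetical stand-in with one FIFO slice per queue.
type mailbox struct {
	queues [queueCount][]string
}

func (m *mailbox) push(queue int, msg string) {
	m.queues[queue] = append(m.queues[queue], msg)
}

// pop returns the oldest message from the highest-priority non-empty queue.
func (m *mailbox) pop() (string, bool) {
	for q := range m.queues {
		if len(m.queues[q]) > 0 {
			msg := m.queues[q][0]
			m.queues[q] = m.queues[q][1:]
			return msg, true
		}
	}
	return "", false
}

func main() {
	var m mailbox
	m.push(logQ, "log line")
	m.push(mainQ, "user message 1")
	m.push(mainQ, "user message 2")
	m.push(system, "system message")
	m.push(urgent, "stop request")

	// Drains in priority order: urgent, system, main (FIFO), then log.
	for msg, ok := m.pop(); ok; msg, ok = m.pop() {
		fmt.Println(msg)
	}
}
```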

Colored

This package implements the gen.LoggerBehavior interface and provides the ability to output log messages to standard output with color highlighting.

Below is a demonstration of log messages (from nodes, processes, and meta-processes) with different logging levels:

Format

<time> <level> <log source> [process name] [process behavior]: <log message>

When logging, the package also highlights in color the types gen.Atom, gen.PID, gen.ProcessID, gen.Ref, gen.Alias, gen.Event.

Available options

  • TimeFormat Sets the format for the timestamp of log messages. You can use any existing format (see time package) or define your own. By default, the time is displayed in nanoseconds

  • ShortLevelName Displays the shortened name of the log level

  • IncludeBehavior Includes the name of the process behavior in the log message

  • IncludeName Includes the registered name of the process in the log message

Example

package main

import (
	"ergo.services/ergo"
	"ergo.services/ergo/gen"

	"ergo.services/logger/colored"
)

func main() {
	logger := gen.Logger{
		Name:   "colored",
		Logger: colored.CreateLogger(colored.Options{}),
	}

	nopt := gen.NodeOptions{}
	nopt.Log.Loggers = []gen.Logger{logger}
	
	// disable default logger to get rid of duplicating log-messages
	nopt.Log.DefaultLogger.Disable = true

	node, err := ergo.StartNode("demo@localhost", nopt)
	if err != nil {
		panic(err)
	}
	node.Log().Warning("Hello World!!!")
	node.Wait()
}

This package is not intended for use with intensive logging and may impact node performance. For log messages with the level gen.LogLevelTrace, color highlighting is applied only to the timestamp and the source of the log message. The body of the log message is not highlighted.

Boilerplate Code Generation

The ergo tool allows you to generate the structure and source code for a project based on the Ergo Framework. To install it, use the following command:

go install ergo.services/tools/ergo@latest

Alternatively, you can build it from the source code available at https://github.com/ergo-services/tools.

When using the ergo tool, argument values must follow this template:

Parent:Actor{param1:value1,param2:value2...}

  • Parent can be a supervisor (specified earlier with -with-sup) or an application (specified earlier with -with-app).

  • Actor can be an actor (added earlier with -with-actor) or a supervisor (specified earlier with -with-sup).

This structured approach ensures that the proper hierarchy and parameters are defined for your actors and supervisors.
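
To make the template concrete, the following sketch parses a value of the form Parent:Actor{param1:value1,param2:value2} into its parts. It is a standalone illustration, not the ergo tool's own parser. The spec type and the parseSpec function are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// spec is a hypothetical parsed form of one argument value.
type spec struct {
	Parent string            // empty when no "Parent:" prefix is given
	Name   string            // actor, supervisor, or application name
	Params map[string]string // flag-style parameters such as tls map to ""
}

// parseSpec splits "Parent:Name{key:value,flag}" into its components.
func parseSpec(arg string) (spec, error) {
	s := spec{Params: map[string]string{}}
	head := arg
	if i := strings.IndexByte(arg, '{'); i >= 0 {
		if !strings.HasSuffix(arg, "}") {
			return s, fmt.Errorf("missing closing brace in %q", arg)
		}
		head = arg[:i]
		body := arg[i+1 : len(arg)-1]
		for _, p := range strings.Split(body, ",") {
			if p == "" {
				continue
			}
			key, value, _ := strings.Cut(p, ":")
			s.Params[key] = value
		}
	}
	if parent, name, ok := strings.Cut(head, ":"); ok {
		s.Parent, s.Name = parent, name
	} else {
		s.Name = head
	}
	if s.Name == "" {
		return s, fmt.Errorf("missing name in %q", arg)
	}
	return s, nil
}

func main() {
	for _, arg := range []string{"MyApp", "MySup:MyPool{size:4}", "MySup:MyTCP{port:12345,tls}"} {
		s, err := parseSpec(arg)
		fmt.Println(s.Parent, s.Name, s.Params, err)
	}
}
```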

Available Arguments and Parameters:

  • -init <node_name>: a required argument that sets the name of the node for your service. Available parameters:

    • tls: enables encryption for network connections (a self-signed certificate will be used).

    • module: allows you to specify the module name for the go.mod file.

  • -path <path>: specifies the path for the code of the generated project.

  • -with-actor <name>: adds an actor (based on act.Actor).

  • -with-app <name>: adds an application. Available parameters:

    • mode: specifies the application's start mode (temp - Temporary, perm - Permanent, trans - Transient). The default mode is trans. Example: -with-app MyApp{mode:perm}

  • -with-sup <name>: adds a supervisor (based on act.Supervisor). Available parameters:

    • type: specifies the type of supervisor (ofo - One For One, sofo - Simple One For One, afo - All For One, rfo - Rest For One). The default type is ofo.

    • strategy: specifies the restart strategy for the supervisor (temp - Temporary, perm - Permanent, trans - Transient). The default strategy is trans.

  • -with-pool <name>: adds a process pool actor (based on act.Pool). Available parameters:

    • size: Specifies the number of worker processes in the pool. By default, 3 processes are started.

  • -with-web <name>: adds a Web server (based on act.Pool and act.WebHandler). Available parameters:

    • host: specifies the hostname for the Web server.

    • port: specifies the port number for the Web server. The default is 9090.

    • tls: enables encryption for the Web server using the node's CertManager.

  • -with-tcp <name>: adds a TCP server actor (based on act.Actor and meta.TCP meta-process). Available parameters:

    • host: specifies the hostname for the TCP server.

    • port: specifies the port number for the TCP server. The default is 7654.

    • tls: enables encryption for the TCP server using the node's CertManager.

  • -with-udp <name>: adds a UDP server actor (based on act.Pool, meta.UDPServer, and act.Actor as worker processes). Available parameters:

    • host: specifies the hostname for the UDP server.

    • port: specifies the port number for the UDP server. The default is 7654.

  • -with-msg <name>: adds a message type for network interactions.

  • -with-logger <name>: adds a logger from the extended library. Available loggers: colored, rotate.

  • -with-observer: adds the Observer application.

Example

For clarity, let's use all available arguments for ergo in the following example:

$ ergo -path /tmp/project \
      -init demo{tls} \
      -with-app MyApp \
      -with-actor MyApp:MyActorInApp \
      -with-sup MyApp:MySup \
      -with-actor MySup:MyActorInSup \
      -with-tcp "MySup:MyTCP{port:12345,tls}" \
      -with-udp MySup:MyUDP{port:54321} \
      -with-pool MySup:MyPool{size:4} \
      -with-web "MyWeb{port:8888,tls}" \
      -with-msg MyMsg1 \
      -with-msg MyMsg2 \
      -with-logger colored \
      -with-logger rotate \
      -with-observer
      
Generating project "/tmp/project/demo"...
   generating "/tmp/project/demo/apps/myapp/myactorinapp.go"
   generating "/tmp/project/demo/apps/myapp/myactorinsup.go"
   generating "/tmp/project/demo/cmd/myweb.go"
   generating "/tmp/project/demo/cmd/myweb_worker.go"
   generating "/tmp/project/demo/apps/myapp/mytcp.go"
   generating "/tmp/project/demo/apps/myapp/myudp.go"
   generating "/tmp/project/demo/apps/myapp/myudp_worker.go"
   generating "/tmp/project/demo/apps/myapp/mypool.go"
   generating "/tmp/project/demo/apps/myapp/mypool_worker.go"
   generating "/tmp/project/demo/apps/myapp/mysup.go"
   generating "/tmp/project/demo/apps/myapp/myapp.go"
   generating "/tmp/project/demo/types.go"
   generating "/tmp/project/demo/cmd/demo.go"
   generating "/tmp/project/demo/README.md"
   generating "/tmp/project/demo/go.mod"
   generating "/tmp/project/demo/go.sum"

Successfully completed.

Pay attention to the values of the -with-tcp and -with-web arguments: they are enclosed in double quotes. If an argument has multiple parameters, they are separated by commas without spaces. However, shells such as bash and zsh treat a comma-separated list inside curly braces as brace expansion and would split the value into several words, so we enclose the entire value of the argument in double quotes to ensure it reaches the tool unchanged.

In our example, we specified two loggers: colored and rotate. This allows for colored log messages in the standard output as well as logging to files with log rotation functionality. In this case, the default logger is disabled to prevent duplicate log messages from appearing on the standard output.

Additionally, we included the observer application. By default, this interface is accessible at http://localhost:9911.

As a result of the generation process, we have a well-structured project source code that is ready for execution:

 demo
├── apps
│  └── myapp
│     ├── myactorinapp.go
│     ├── myactorinsup.go
│     ├── myapp.go
│     ├── mypool.go
│     ├── mypool_worker.go
│     ├── mysup.go
│     ├── mytcp.go
│     ├── myudp.go
│     └── myudp_worker.go
├── cmd
│  ├── demo.go
│  ├── myweb.go
│  └── myweb_worker.go
├── go.mod
├── go.sum
├── README.md
└── types.go

The generated code is ready for compilation and execution:

Since this example includes the observer application, you can open http://localhost:9911 in your browser to access the web interface for inspecting the node and its running processes.