Ergo Framework is an implementation of ideas, technologies, and design patterns from the Erlang world in the Go programming language. It is built on the actor model, network transparency, and a set of ready-to-use components for development. This makes it significantly easier to create complex and distributed solutions while maintaining a high level of reliability and performance.
The management of starting and stopping processes (actors), as well as routing messages between them, is handled by the node. The node is also responsible for network communication. If a message is sent to a remote process, the node automatically establishes a network connection with the remote node where the process is running, thus ensuring network transparency.
Actors in Ergo Framework are lightweight processes that live on top of goroutines. Interaction between processes occurs through message passing. Each process has a mailbox with multiple queues to prioritize message handling. Actors can exchange asynchronous messages and also make synchronous requests to each other.
Each node has a built-in Service Discovery function. Typically, the first node launched on a host becomes the registrar for other nodes running on the same host. Upon registration, each node reports its name, the port number for incoming connections, and a set of additional parameters.
Network transparency in Ergo Framework is achieved through the ENP (Ergo Network Protocol) and EDF (Ergo Data Format). These are based on ideas from Erlang's network stack but are not compatible with it. Therefore, to communicate with Erlang nodes, an additional Erlang package must be used.
To maximize performance in network message exchange, Ergo Framework uses a pool of multiple TCP connections combined into a single logical network connection.
Ergo Framework demonstrates high performance in local message exchange due to the use of lock-free queues within the process mailbox and the efficient use of goroutines to handle the process itself. A goroutine is only activated when the process receives a message; otherwise, the process remains idle and does not consume CPU resources.
You can evaluate the performance yourself using the provided benchmarks https://github.com/ergo-services/benchmarks.
In the development of Ergo Framework, we adhere to the concept of zero dependencies. All functionality is implemented using only the standard Go library. The only requirement for using Ergo Framework is the version of the Go language. Starting from version 3.0, Ergo Framework depends on Go 1.20 or higher.
Process control
To create fault-tolerant applications, Ergo Framework introduces a process structuring model. The core idea of this model is to divide processes into two types:
worker
supervisor
Worker processes perform computations, while supervisors are responsible solely for managing worker processes.
In Ergo Framework, worker processes are actors based on act.Actor, while supervisors are actors based on act.Supervisor. The role of act.Supervisor is to start child processes and restart them according to the chosen restart strategy. Several restart strategies are available for this purpose. A child process can be not only an actor based on act.Actor but also another supervisor based on act.Supervisor. This allows you to form a hierarchical structure for managing processes. Thanks to this approach, your solution becomes more reliable and fault-tolerant.
Ergo Framework introduces gen.CertManager to simplify TLS certificate management. It enables live certificate updates without restarting the application and can be used across all network components, such as node acceptors, web servers, or TCP servers.
Use the gen.CreateCertManager(cert tls.Certificate) function to create a certificate manager. The gen.CertManager interface provides the following methods:
Update: Updates the TLS certificate on the fly. All components using this CertManager automatically receive the updated certificate.
GetCertificateFunc: Returns a closure that retrieves the TLS certificate.
GetCertificate: Returns the current TLS certificate.
Additionally, you can use the lib.GenerateSelfSignedCert function to generate a self-signed certificate.
import (
	...
	"ergo.services/ergo"
	"ergo.services/ergo/gen"
	"ergo.services/ergo/lib"
	...
)

var (
	Version = gen.Version{
		Name:    "DemoService",
		Release: "1.0",
	}
)

func main() {
	var options gen.NodeOptions
	...
	cert, err := lib.GenerateSelfSignedCert(Version.String())
	if err != nil {
		panic(err)
	}
	options.CertManager = gen.CreateCertManager(cert)
	...
	node, err := ergo.StartNode("node@localhost", options)
	if err != nil {
		panic(err)
	}
	node.Wait()
}
The additional application library for Ergo Framework contains packages with a narrow specialization or external dependencies since Ergo Framework adheres to the "zero dependencies" principle.
You can find the source code of these applications in the application library repository at https://github.com/ergo-services/application.
An extra library of logger implementations that are not included in the standard Ergo Framework library. This library contains packages with a narrow specialization. It also includes packages that have external dependencies, as Ergo Framework adheres to a "zero dependency" policy.
An extra library of meta-process implementations not included in the standard Ergo Framework library. This library contains packages with a narrow specialization. It also includes packages with external dependencies, as Ergo Framework adheres to a "zero dependency" policy.
This meta-process allows you to launch a UDP server and handle incoming UDP packets as asynchronous messages of type meta.MessageUDP.
To create the meta-process, use the meta.CreateUDPServer function. This function takes meta.UDPServerOptions as an argument, which specifies the configuration options for the UDP server, such as the host, port, buffer size, and other settings for managing UDP traffic:
type UDPServerOptions struct {
	Host       string
	Port       uint16
	Process    gen.Atom
	BufferSize int
	BufferPool *sync.Pool
}
Host, Port: Set the host name and port number for the UDP server.
Process: The name of the process that will handle meta.MessageUDP messages. If not specified, all messages are sent to the parent process. You can also use an act.Pool process to distribute message handling.
BufferSize: Sets the size of the buffer created for incoming UDP messages. A new buffer of this size is allocated for each message. This field is ignored if BufferPool is provided.
BufferPool: Defines a pool for message buffers. The Get method should return an allocated buffer of type []byte.
If the meta-process is successfully created, the UDP server starts, and meta.CreateUDPServer returns gen.MetaBehavior.
Next, the created meta-process must be started. It will handle incoming UDP packets and forward them to the process responsible for handling them. To start the meta-process, use the SpawnMeta method of the gen.Process interface.
If an error occurs when starting the meta-process, the UDP server created by meta.CreateUDPServer must be stopped using the Terminate method of the gen.MetaBehavior interface.
Stopping the meta-process shuts down the UDP server and closes the socket.
To send a UDP packet, use the Send (or SendAlias) method of the gen.Process interface. The recipient should be the gen.Alias identifier of the UDP server meta-process, and the message should be of type meta.MessageUDP. The meta.MessageUDP.ID field is ignored when sending (it is used only for incoming messages).
If a BufferPool was used in the UDP server options, the buffer in meta.MessageUDP.Data is returned to the pool after the packet is sent.
For an example UDP server implementation, see the demo project at https://github.com/ergo-services/examples.
An extra library of registrars or client implementations not included in the standard Ergo Framework library. This library contains packages with a narrow specialization. It also includes packages with external dependencies, as Ergo Framework follows a "zero dependency" policy.
A client library for the central registrar. Provides service discovery, configuration management, and real-time cluster event notifications through a centralized registrar service.
Features:
Centralized service discovery
Real-time event notifications
Configuration management
TLS security support
Token-based authentication
A client library for etcd, a distributed key-value store. Provides decentralized service discovery, hierarchical configuration management with type conversion, and automatic lease management.
Features:
Distributed service discovery
Hierarchical configuration with type conversion from strings ("int:123", "float:3.14")
Automatic lease management and cleanup
Real-time cluster change notifications
TLS/authentication support
Four-level configuration priority system
Choose Saturn for centralized management with a dedicated registrar service, or etcd for a distributed approach with built-in consensus and reliability guarantees.
The Application Observer provides a convenient web interface to view node status, network activity, and running processes in a node built with Ergo Framework. Additionally, it allows you to inspect the internal state of processes and meta-processes. The application can also be used as a standalone Observer tool; for more details, see the corresponding section. You can add the Observer application to your node during startup by including it in the node's startup options:
The observer.CreateApp function takes observer.Options as an argument, allowing you to configure the Observer application. You can set:
Port: The port number for the web server (default: 9911).
Host: The interface name (default: localhost).
LogLevel: The logging level for the Observer application (useful for debugging). The default is gen.LogLevelInfo.
The Ergo Framework allows nodes to run with various network stacks. You can replace the default network stack or add others alongside it. For more information, refer to the corresponding section.
This library contains implementations of network stacks that are not part of the standard Ergo Framework library.
The network stack in Ergo Framework includes the capability to spawn processes on remote nodes. Nodes can control access to this feature or disable it entirely using the EnableRemoteSpawn flag in gen.NetworkFlags. By default, this feature is enabled.
To manage the ability of remote nodes to spawn processes, the gen.Network interface provides two methods:
The name argument specifies the name under which the process factory will be registered in the network stack. Remote nodes must use this name when requesting to spawn a process.
Using EnableSpawn with the nodes argument creates an access list, allowing only the specified remote nodes to spawn processes with the registered factory. Conversely, using DisableSpawn with the nodes argument removes the specified nodes from the access list. If the list becomes empty, access is open to all nodes. To fully disable access to the process factory, use DisableSpawn without the nodes argument.
To spawn a process on a remote node, use the gen.RemoteNode interface, which can be obtained via the GetNode or Node methods of the gen.Network interface. The gen.RemoteNode interface provides two methods:
For the name argument, you must use the name under which the process factory is registered on the remote node.
Just like with local process spawning, the process started on the remote node will inherit certain parameters from its parent. In this case, the parent is the virtual identifier of the node that sent the spawn request. The process will also inherit the logging level.
To inherit environment variables, enable the ExposeEnvRemoteSpawn option in gen.NodeOptions.Security when starting your node. The environment variable values must be encodable and decodable using the EDF format. If you use a custom type as the value of an environment variable, that type must be registered (see the corresponding section).
Upon success, these methods return the process identifier (gen.PID) of the process started on the remote node.
You can also spawn a process on a remote node using the methods provided by the gen.Process interface:
When using these methods, the process started on the remote node inherits parameters from the parent process, including the application name, logging level, and environment variables (if the ExposeEnvRemoteSpawn flag in gen.NodeOptions.Security was enabled).
Note that linking options in gen.ProcessOptions are ignored when spawning processes remotely.
import (
	"ergo.services/ergo"
	"ergo.services/application/observer"
	"ergo.services/ergo/gen"
)

func main() {
	opt := gen.NodeOptions{
		Applications: []gen.ApplicationBehavior{
			observer.CreateApp(observer.Options{}),
		},
	}
	node, err := ergo.StartNode("example@localhost", opt)
	if err != nil {
		panic(err)
	}
	node.Wait()
}
EnableSpawn(name gen.Atom, factory gen.ProcessFactory, nodes ...gen.Atom) error
DisableSpawn(name gen.Atom, nodes ...gen.Atom) error
Spawn(name gen.Atom, options gen.ProcessOptions, args ...any) (gen.PID, error)
SpawnRegister(register gen.Atom, name gen.Atom, options gen.ProcessOptions, args ...any) (gen.PID, error)
RemoteSpawn(node gen.Atom, name gen.Atom, options gen.ProcessOptions, args ...any) (gen.PID, error)
RemoteSpawnRegister(node gen.Atom, name gen.Atom, register gen.Atom, options gen.ProcessOptions, args ...any) (gen.PID, error)
Registrar
The Service Discovery mechanism allows nodes to automatically find other nodes and determine connection parameters.
Each node has a Registrar that starts during the node's initialization and operates in either server or client mode. In Ergo Framework, the default Registrar implementation is available in the package ergo.services/ergo/net/registrar. If you are using Saturn, you need to use the corresponding client available in ergo.services/registrar/saturn. For communication with Erlang nodes, use the implementation from ergo.services/proto/erlang23/epmd.
The default Registrar, when running in server mode, opens a TCP socket on localhost:4499 to register other nodes on the host, and a UDP socket on *:4499 to handle resolve requests from other nodes, including remote ones. If the Registrar fails to start in server mode, it switches to client mode and registers with the Registrar on the same host that is running in server mode.
When a node starts, it establishes a TCP connection with the Registrar server, which remains open until the node terminates. During registration, your node communicates its name, information about all acceptors, and their parameters (port number, Handshake/Proto version, TLS flag) to the Registrar. If using Saturn, additional information about running applications on the node is also communicated (this functionality is not supported by the built-in Registrar).
When a node running the Registrar in server mode terminates, other nodes on the same host automatically attempt to switch to server mode. The node that first successfully opens the TCP socket on localhost:4499 switches to server mode, while the remaining nodes on the host continue to operate in client mode and automatically register with the new Registrar running in server mode.
Node names must be unique within the same host. If a node attempts to register with a name that has already been registered by another node, the Registrar returns the error gen.ErrTaken.
When a node attempts to establish a network connection with another node, it first sends a resolve request (a UDP packet) to the Registrar, using the host name of the target node and port 4499. In response, the Registrar returns a list of acceptors along with the parameters needed to establish the connection:
The port number where the acceptor is running.
The version of the Handshake protocol.
The version of the Proto protocol.
The TLS flag, which indicates whether TLS encryption is required for this connection.
Using these parameters, the node then establishes the network connection to the target node.
By default, the built-in network stack of Ergo Framework (ergo.services/ergo/net) is used for both incoming and outgoing connections. However, a node can work with multiple network stacks simultaneously (e.g., for compatibility with Erlang's network stack).
You can create multiple acceptors with different sets of parameters for handling incoming connections. For outgoing connections, you can manage the connection parameters using the Static Routes functionality, which allows you to control how connections are established with various nodes.
To handle HTTP requests
The act.WebWorker actor implements the low-level gen.ProcessBehavior interface and allows HTTP requests to be handled as asynchronous messages. It is designed to be used with a web server (refer to the Web section in meta-processes). To launch a process based on act.WebWorker, embed it in your object and implement the required methods of the act.WebWorkerBehavior interface.
Example:
type MyWebWorker struct {
	act.WebWorker
}

func factoryMyWebWorker() gen.ProcessBehavior {
	return &MyWebWorker{}
}
To work with your object, act.WebWorker uses the act.WebWorkerBehavior interface. This interface defines a set of callback methods:
type WebWorkerBehavior interface {
	gen.ProcessBehavior

	// Init invoked on a spawn WebWorker for the initializing.
	Init(args ...any) error

	// HandleMessage invoked if WebWorker received a message sent with gen.Process.Send(...).
	// Non-nil value of the returning error will cause termination of this process.
	// To stop this process normally, return gen.TerminateReasonNormal
	// or any other for abnormal termination.
	HandleMessage(from gen.PID, message any) error

	// HandleCall invoked if WebWorker got a synchronous request made with gen.Process.Call(...).
	// Return nil as a result to handle this request asynchronously and
	// to provide the result later using the gen.Process.SendResponse(...) method.
	HandleCall(from gen.PID, ref gen.Ref, request any) (any, error)

	// Terminate invoked on a termination process
	Terminate(reason error)

	// HandleEvent invoked on an event message if this process got subscribed on
	// this event using gen.Process.LinkEvent or gen.Process.MonitorEvent
	HandleEvent(message gen.MessageEvent) error

	// HandleInspect invoked on the request made with gen.Process.Inspect(...)
	HandleInspect(from gen.PID, item ...string) map[string]string

	// HandleGet invoked on a GET request
	HandleGet(from gen.PID, writer http.ResponseWriter, request *http.Request) error

	// HandlePost invoked on a POST request
	HandlePost(from gen.PID, writer http.ResponseWriter, request *http.Request) error

	// HandlePut invoked on a PUT request
	HandlePut(from gen.PID, writer http.ResponseWriter, request *http.Request) error

	// HandlePatch invoked on a PATCH request
	HandlePatch(from gen.PID, writer http.ResponseWriter, request *http.Request) error

	// HandleDelete invoked on a DELETE request
	HandleDelete(from gen.PID, writer http.ResponseWriter, request *http.Request) error

	// HandleHead invoked on a HEAD request
	HandleHead(from gen.PID, writer http.ResponseWriter, request *http.Request) error

	// HandleOptions invoked on an OPTIONS request
	HandleOptions(from gen.PID, writer http.ResponseWriter, request *http.Request) error
}
All methods of the act.WebWorkerBehavior interface are optional to implement.
It is most efficient to use act.WebWorker in combination with act.Pool for load balancing when handling HTTP requests:
//
// WebWorker
//

func factory_MyWebWorker() gen.ProcessBehavior {
	return &MyWebWorker{}
}

type MyWebWorker struct {
	act.WebWorker
}

// Handle GET requests.
func (w *MyWebWorker) HandleGet(from gen.PID, writer http.ResponseWriter, request *http.Request) error {
	w.Log().Info("got HTTP request %q", request.URL.Path)
	writer.WriteHeader(http.StatusOK)
	return nil
}

//
// Pool of workers
//

type MyPool struct {
	act.Pool
}

func factory_MyPool() gen.ProcessBehavior {
	return &MyPool{}
}

// Init invoked on a spawn Pool for the initializing.
func (p *MyPool) Init(args ...any) (act.PoolOptions, error) {
	opts := act.PoolOptions{
		WorkerFactory: factory_MyWebWorker,
	}
	p.Log().Info("started process pool of MyWebWorker with %d workers", opts.PoolSize)
	return opts, nil
}
An example implementation of a web server using act.WebWorker and act.Pool for load distribution when handling HTTP requests can be found in the demo project at https://github.com/ergo-services/examples.
The events mechanism in Ergo Framework is built on top of the pub/sub subsystem. It allows any process to become an event producer, while other processes can subscribe to these events. This enables flexible event-driven architectures, where processes can publish and consume events across the system.
To register an event (gen.Event), use the RegisterEvent method available in the gen.Process or gen.Node interface. When registering an event, you can configure the following parameters using gen.EventOptions:
Notify: This flag controls whether the producer should be notified about the presence or absence of subscribers for the event. If notifications are enabled, the process receives a gen.MessageEventStart message when the first subscriber appears and a gen.MessageEventStop message when the last subscriber unsubscribes. This allows the producer to generate events only when there are active subscribers. If the event is registered via the RegisterEvent method of the gen.Node interface, this field is ignored.
Buffer: Specifies how many of the most recent events should be stored in the event buffer. If set to zero, buffering is disabled.
On success, RegisterEvent returns a token of type gen.Ref. This token is used to generate events. Only the process that owns the event, or a process to which the owner has delegated the token, can produce events for that registration.
To generate events, the gen.Process interface provides the SendEvent method. This method accepts the following arguments:
name: The name of the registered event (gen.Atom).
token: The key obtained during event registration (gen.Ref).
message: The event payload, which can be of any type.
The gen.Node interface also provides a SendEvent method. It is similar to the one in the gen.Process interface but takes an additional gen.MessageOptions parameter, which allows further customization of how the event message is sent, such as setting priority, compression, or other message-related options.
To subscribe to a registered event, the gen.Process interface provides the following methods:
LinkEvent: Creates a link to a gen.Event. The process receives an exit signal if the producer process of the event terminates or if the event is unregistered. To remove the link, use the UnlinkEvent method. When the link is created, the method returns a list of the most recent events from the producer's buffer.
MonitorEvent: Creates a monitor on a gen.Event. The process receives a gen.MessageDownEvent if the producer process terminates or if the event is unregistered. If the event is unregistered, the Reason field of gen.MessageDownEvent contains gen.ErrUnregistered. To remove the monitor, use the DemonitorEvent method.
Both methods accept an argument of type gen.Event. For an event registered on the local node, you only need to specify the Name field and can leave the Node field empty. This simplifies subscribing to local events while still providing flexibility for handling events from remote nodes.
type myActor struct {
	act.Actor
}

...

func (a *myActor) HandleMessage(from gen.PID, message any) error {
	...
	// local event
	event := gen.Event{Name: "exampleEvent"}
	lastEvents, err := a.LinkEvent(event)
	...
	// remote event
	remoteEvent := gen.Event{Name: "remoteEvent", Node: "remoteNode"}
	lastRemoteEvents, err := a.LinkEvent(remoteEvent)
	...
}
Upon successfully creating a link or monitor, the function returns a list of events (gen.MessageEvent) provided by the producer process from its message buffer. Each event contains the following information: the event name (gen.Event), the timestamp of when the event was generated (obtained using time.Now().UnixNano()), and the actual event value sent by the producer process. If the list of events is empty, it means that either the producer process has not yet generated any events or the producer registered the event with a zero-sized buffer.
For an example demonstrating the capabilities of the events mechanism, refer to the events project in the ergo-services/examples repository.
Linking and Monitoring Mechanisms
These mechanisms allow processes to respond to termination events of targets with which links or monitors have been created. Such targets can include:
A process identifier (gen.PID)
A process name (gen.Atom or gen.ProcessID)
A process alias (gen.Alias)
An event (gen.Event)
A node name (gen.Atom, in the case of monitoring a node connection)
The key difference between linking and monitoring lies in the response to the termination event of the target. When a process creates a link with a target, and that target terminates, the linked process receives an exit signal, which generally causes it to terminate as well.
In contrast, when a process creates a monitor on a target and the target terminates, the monitoring process receives a gen.MessageDown* message containing the reason for the target's termination. This notification allows the monitoring process to react accordingly without necessarily terminating itself.
In Ergo Framework, links are created between processes so that termination events of one process propagate to linked processes. The gen.Process interface provides the following methods for creating and managing links:
Link: A universal method for creating a link to a target, which can be a gen.Atom (for a local process), gen.PID, gen.ProcessID, or gen.Alias. The process receives an exit signal upon the target process's termination. To remove the link, use the Unlink method with the same argument.
LinkPID: Creates a link to a process by its gen.PID. The process receives an exit signal upon the target process's termination. To remove the link, use UnlinkPID or Unlink with the same argument.
LinkProcessID: Creates a link to a process by its gen.ProcessID. The process receives an exit signal upon the target process's termination or when the associated name is unregistered. To remove the link, use UnlinkProcessID or Unlink.
LinkAlias: Creates a link to a process by its gen.Alias. The process receives an exit signal upon the termination of the process that created the alias or upon the alias's deletion. To remove the link, use UnlinkAlias or Unlink.
LinkNode: Creates a link to a node connection using the node name (gen.Atom). The process receives an exit signal if the connection to the target node is lost. To remove the link, use UnlinkNode.
LinkEvent: Creates a link to an event (gen.Event). The process receives an exit signal when the producer process of the event terminates or when the gen.Event is unregistered. To remove the link, use UnlinkEvent. More information about this mechanism can be found in the Events section.
Exit signals are always delivered with the highest priority and placed in the Urgent queue of the process's mailbox.
Monitors allow a process to observe the lifecycle of a target process or event. The gen.Process interface provides several methods for creating monitors:
Monitor: A universal method for creating a monitor. The target can be a gen.Atom (for a local process), gen.PID, gen.ProcessID, or gen.Alias. The monitoring process receives a gen.MessageDown* message upon the termination of the target process. To remove the monitor, use the Demonitor method with the same argument.
MonitorPID: Creates a monitor for a process by its gen.PID. The monitoring process receives a gen.MessageDownPID message upon the target process's termination, with the reason for termination in the Reason field. To remove the monitor, use DemonitorPID or Demonitor.
MonitorProcessID: Creates a monitor for a process by its gen.ProcessID. The monitoring process receives a gen.MessageDownProcessID message upon the target process's termination, with the reason for termination in the Reason field. If the target process unregisters its name, the Reason field of gen.MessageDownProcessID contains gen.ErrUnregistered. To remove the monitor, use DemonitorProcessID or Demonitor.
MonitorAlias: Creates a monitor for a process by its gen.Alias. The monitoring process receives a gen.MessageDownAlias message upon the termination of the process that created the alias or if the alias is deleted. In the case of alias deletion, the Reason field of gen.MessageDownAlias contains gen.ErrUnregistered. To remove the monitor, use DemonitorAlias or Demonitor.
MonitorNode: Creates a monitor for a node connection. The monitoring process receives a gen.MessageDownNode message upon the disconnection of the node. To remove the monitor, use DemonitorNode.
MonitorEvent: Creates a monitor for an event (gen.Event). The monitoring process receives a gen.MessageDownEvent message when the producer of the event terminates or if the event is unregistered. If the event is unregistered, the Reason field of gen.MessageDownEvent contains gen.ErrUnregistered. To remove the monitor, use DemonitorEvent. More details about this mechanism can be found in the Events section.
Messages of type gen.MessageDown* are delivered with high priority and placed in the System queue of the process's mailbox.
Thanks to network transparency, the linking and monitoring mechanisms in Ergo Framework can be used with targets on remote nodes.
When a process creates a link or monitor with a remote target (such as a process, alias, or event) and the connection to the node where the target resides is lost, the process receives an exit signal or a gen.MessageDown* message, depending on the mechanism used. In this case, the termination reason is gen.ErrNoConnection. This allows processes to handle network disconnections and remote target failures seamlessly, as part of the same fault-tolerant mechanisms used for local processes.
Managing outgoing connections
When creating an outgoing connection to a remote node, the node first checks its internal Static Routing Table for an existing route. If no static route is found for the specified remote node's name, the node uses the Registrar, sending a resolve request to obtain the necessary connection parameters (see the Registrar section for more information).
To define a static route for a specific node or group of nodes, use the AddRoute method of the gen.Network interface. This method lets you manually configure the parameters for outgoing connections, giving you more control over how the node establishes connections with the specified nodes and bypassing the need to query the Registrar.
match
: defines the name of the remote node or a pattern to match multiple nodes. The node uses the MatchString
method from Go's standard library regexp
package to match the node names based on this pattern.
route
: specifies the connection parameters to be used when establishing a connection to the matched node(s). This includes details such as the network protocol, port, TLS settings, and other connection options.
weight
: defines the weight of the specified route. If multiple routes match the same node name in the routing table, the node will return a list of routes sorted by descending weight. The node will then use the route with the highest weight to establish the connection.
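Since the match argument is applied with MatchString from Go's standard regexp package, its behavior can be previewed with the standard library alone. The pattern and node names below are hypothetical:

```go
package main

import (
	"fmt"
	"regexp"
)

// matchRoute mimics how a static-route pattern is applied to a node name
// via regexp.MatchString (illustrative helper, not the framework's code).
func matchRoute(pattern, nodeName string) bool {
	matched, err := regexp.MatchString(pattern, nodeName)
	if err != nil {
		return false
	}
	return matched
}

func main() {
	// A pattern covering a group of nodes node1@..., node2@..., etc.
	pattern := `^node\d+@example\.local$`
	fmt.Println(matchRoute(pattern, "node1@example.local")) // true
	fmt.Println(matchRoute(pattern, "web@example.local"))   // false
}
```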
To check for the existence of a static route for a specific remote node name, use the Route
method of the gen.Network
interface. To remove a static route, use the RemoveRoute
method of the gen.Network
interface, specifying the match
value that was used when adding the route.
The route
argument in the AddRoute
method allows you to specify detailed connection parameters for establishing outgoing connections to remote nodes. This enables fine-grained control over how connections are created:
Resolver
: specifies a particular Registrar to be used by the node for resolving the connection. This interface is obtained via the Resolver
method of the gen.Registrar
interface. This functionality is useful when a node is working with multiple network stacks. For example, see the section for managing clusters with different network stacks simultaneously.
Route
: allows you to explicitly define the host name, port number, TLS mode, handshake version, and protocol version. If the Resolver
parameter is explicitly set, the route parameters provided will override any route information returned by the Registrar.
Cookie
: overrides the default Cookie
value specified in gen.NetworkOptions
for the given route, providing custom authentication or security settings specific to the connection.
Cert
: specifies a CertManager
to establish a TLS connection, ensuring that the appropriate certificates are used during the connection.
Flags
: you can override specific flags for the given route. For example, you can explicitly disable certain mechanisms, such as preventing the remote node from spawning processes over the established connection.
AtomMapping
: enables automatic substitution of gen.Atom
values transmitted or received during the connection.
LogLevel
defines the logging level for the network stack within the established connection. This provides granular control over the verbosity of log messages for troubleshooting or monitoring network activity on a per-connection basis.
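Putting the pieces together, a static route might be added as sketched below. The AddRoute signature follows this section; the inner field types (such as gen.Route's Host and Port) are assumptions to verify against the reference documentation:

```go
// Sketch: define a static route for a group of nodes, bypassing the Registrar.
network := node.Network() // node is a started gen.Node

route := gen.NetworkRoute{
	// Explicit connection parameters (field names assumed).
	Route: gen.Route{Host: "10.0.0.5", Port: 4321},
	// Override the default cookie for this route only.
	Cookie: "backend-cluster-secret",
}

// Match node names like "storage1@backend", "storage2@backend", ...
// The pattern is applied with regexp.MatchString.
if err := network.AddRoute(`^storage\d+@backend$`, route, 100); err != nil {
	panic(err)
}
```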
In addition to the ability to spawn processes on remote nodes, Ergo Framework also allows you to start applications remotely. Similar to process spawning, access to this functionality is controlled by the EnableRemoteApplicationStart
flag in gen.NetworkFlags
. By default, this flag is enabled.
To allow an application to be started from a remote node, it must be registered in the network stack. The gen.Network
interface provides the following methods for controlling this functionality:
The name
argument represents the application's name, and the nodes
argument controls which nodes are permitted to start the application remotely. This gives you fine-grained control over which nodes can initiate the remote startup of specific applications.
Using the EnableApplicationStart
method with the nodes
argument creates an access list of remote nodes that are permitted to start the specified application.
When using the DisableApplicationStart
method with the nodes
argument, it removes the specified nodes from the access list. If the access list becomes empty after removing nodes, access is opened to all remote nodes. To fully disable access to starting the application, use the DisableApplicationStart
method without the nodes
argument.
To start an application on a remote node, use the ApplicationStart
method of the gen.RemoteNode
interface (which you can access via the GetNode
or Node
methods of the gen.Network
interface). This interface also provides additional methods to control the application's start mode:
When an application is started remotely, the Parent
property of the application is assigned the name of the node that initiated the startup. You can retrieve information about the application and its status using the ApplicationInfo
method of the gen.Node
interface.
AddRoute(match string, route gen.NetworkRoute, weight int) error
EnableApplicationStart(name gen.Atom, nodes ...gen.Atom) error
DisableApplicationStart(name gen.Atom, nodes ...gen.Atom) error
ApplicationStartTemporary(name gen.Atom, options gen.ApplicationOptions) error
ApplicationStartTransient(name gen.Atom, options gen.ApplicationOptions) error
ApplicationStartPermanent(name gen.Atom, options gen.ApplicationOptions) error
Data Types and Interfaces Used in Ergo Framework
This type is an alias for the string
type. It was introduced as a specialized string that allows differentiation between regular strings and those used for node names or process names. In the network stack, this type is also handled separately and is actively used in the Atom-cache and Atom-mapping mechanisms. When printed, values of this type are enclosed in single quotes.
fmt.Printf("%s", gen.Atom("hello"))
...
'hello'
This type is used as a process identifier. It is a structure containing several fields: the node name Node
, a unique sequential number within the node ID
, and a Creation
field.
When printed as a string, this type is transformed into the following representation:
pid := gen.PID{Node:"t1node@localhost", ID:1001, Creation:1685523227}
fmt.Printf("%s", pid)
...
<90A29F11.0.1001>
The node name is encoded into a hash using the CRC32 algorithm.
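This hash can be reproduced with the standard hash/crc32 package. The IEEE polynomial is an assumption here; check the framework sources if you need an exact match:

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// nodeHash renders a node name the way it appears in printed identifiers:
// an 8-digit uppercase hex CRC32 (IEEE polynomial assumed).
func nodeHash(node string) string {
	return fmt.Sprintf("%08X", crc32.ChecksumIEEE([]byte(node)))
}

func main() {
	fmt.Println(nodeHash("t1node@localhost"))
}
```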
The node can generate unique identifiers. For this purpose, the gen.Node
interface provides the MakeRef
method. It returns a guaranteed unique value of type gen.Ref
. These values are used as unique request identifiers in synchronous calls and as unique tokens when registering a gen.Event
by a producer process.
When printed as a string, the value is represented as:
ref := gen.Ref{
Node:"t1node@localhost",
Creation:1685524098,
ID:[3]uint32{0x1f4c2, 0x5d90, 0x0},
}
fmt.Printf("%s", ref)
...
Ref#<90A29F11.128194.23952.0>
This type is used as a process identifier with an associated name. It is a structure containing two fields: the process name Name
and the node name Node
.
When printed as a string, the node name is transformed into a hash using the CRC32 algorithm, resulting in the following representation:
process := gen.ProcessID{Name:"example", Node:"t1node@localhost"}
fmt.Printf("%s", process)
...
<90A29F11.'example'>
This type is an alias for the gen.Ref
type. Values of type gen.Alias
are used as temporary process identifiers, created using the CreateAlias
method in the gen.Process
interface, as well as identifiers for meta-processes.
When printed as a string, the representation is similar to gen.Ref
, but with a different prefix:
alias := gen.Alias{
Node:"t1node@localhost",
Creation:1685524098,
ID:[3]uint32{0x1f4c2, 0x5d90, 0x0},
}
fmt.Printf("%s", alias)
...
Alias#<90A29F11.128194.23952.0>
Values of this type are used when subscribing to events through the MonitorEvent
and LinkEvent
methods. This type is a structure similar to gen.ProcessID
, containing two fields: Name
and Node
.
When printed as a string, the representation is similar to gen.ProcessID
, but with an added prefix:
event := gen.Event{Name:"event1", Node:"t1node@localhost"}
fmt.Printf("%s", event)
...
Event#<90A29F11:'event1'>
This type is an alias for the string
type. It is used for the names of environment variables for nodes and processes. In Ergo Framework, environment variable names are case-insensitive.
When printed as a string, the value of this type is converted to uppercase:
env := gen.Env("name1")
fmt.Printf("%s", env)
...
NAME1
To start a node, use the function ergo.StartNode(...)
. If the node starts successfully, it returns the gen.Node
interface. This interface provides a set of functions for interacting with the node. The full list of methods can be found in the reference documentation.
This type is an interface to a process object. In Ergo Framework, this interface is embedded in the actor act.Actor
, so all methods of this interface become available to objects based on act.Actor
(and its derivatives).
type myActor struct {
act.Actor
}
func (a *myActor) Init(args ...any) error {
// Sending a message to itself using methods PID and Send
// that belong to the embedded gen.Process interface
a.Send(a.PID(), "hello")
return nil
}
The full list of available methods for the gen.Process
interface can be found in the reference documentation.
You can access this interface using the Network
method of the gen.Node
interface. This interface provides a set of methods for managing the node's network stack. The full list of available methods for this interface can be found in the reference documentation.
This interface allows you to retrieve information about a remote node with which a connection has been established, as well as to spawn processes (using the Spawn
and SpawnRegister
methods) and start applications on it (using the ApplicationStart*
methods). You can access this interface through the GetNode
and Node
methods of the gen.Network
interface.
This package implements the gen.LoggerBehavior
interface and provides the capability to log messages to a file, with support for log file rotation at a specified interval.
Period
Specifies the rotation period (minimum value: time.Minute
)
TimeFormat
Sets the format for the timestamp in log messages. You can choose any existing format (see time package) or define your own. By default, timestamps are in nanoseconds
IncludeBehavior
includes the process behavior in the log
IncludeName
includes the registered process name
ShortLevelName
enables shortnames for the log levels
Path
directory for the log files, default: ./log
Prefix
defines the log files name prefix (<Path>/<Prefix>.YYYYMMDDHHMi.log[.gz]
)
Compress
Enables gzip compression for log files
Depth
Specifies the number of log files in rotation
package main
import (
"time"
"ergo.services/ergo"
"ergo.services/ergo/gen"
"ergo.services/logger/rotate"
)
func main() {
var options gen.NodeOptions
ropt := rotate.Options{Period: time.Minute, Compress: false}
rlog, err := rotate.CreateLogger(ropt)
if err != nil {
panic(err)
}
logger := gen.Logger{
Name: "rotate",
Logger: rlog,
}
options.Log.Loggers = append(options.Log.Loggers, logger)
node, err := ergo.StartNode("demo@localhost", options)
if err != nil {
panic(err)
}
node.Wait()
}
schedule tasks on a repetitive basis, such as daily, weekly, or monthly
Cron functionality is provided to enable periodic job execution in a node. Its implementation replicates the behavior of the classic cron daemon and supports the crontab specification format.
To set up jobs at the node startup, use the gen.NodeOptions.Cron
parameters. Each job is described using the gen.CronJob
:
Parameters:
Name
defines the name of the job. It must be unique among all cron jobs
Spec
sets the scheduling parameters for the job in crontab format
Location
allows you to set the time zone for the job's scheduling parameters. By default, the local time zone is used
Action
defines what needs to be run
Fallback
allows specifying a process to notify if the Action results in an error
The Action
field has the interface type gen.CronAction
. You can use the following ready-to-use implementations:
gen.CreateCronActionMessage
(to any, priority gen.MessagePriority)
– creates an Action
that sends a gen.MessageCron
message to the specified process to
. The to
argument can be one of the following: gen.Atom
, gen.ProcessID
, gen.PID
, or gen.Alias
, referring to either a local process or a process on a remote node.
gen.CreateCronActionSpawn
(factory gen.ProcessFactory, options gen.CronActionSpawnOptions)
– spawns a local process. The options
argument specifies the startup parameters for the process.
gen.CreateCronActionRemoteSpawn
(node gen.Atom, name gen.Atom, options gen.CronActionSpawnOptions)
– spawns a process on a remote node. The mechanism for spawning processes on a remote node is described in the corresponding section.
When using CreateCronActionSpawn
or CreateCronActionRemoteSpawn
, the following environment variables will be added to the spawned processes:
gen.CronEnvNodeName
gen.CronEnvJobName
gen.CronEnvJobActionTime
In the example below, every day at 21:07 (Shanghai time), a gen.MessageCron
message will be sent to the local process registered as myweb1
. If the message cannot be delivered, a gen.MessageCronFallback
message will be sent to the local process named myweb2
:
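A sketch of such a setup is shown below. The field and helper names follow this section; the exact type of the Fallback field is an assumption to verify against the reference documentation:

```go
package main

import (
	"time"

	"ergo.services/ergo"
	"ergo.services/ergo/gen"
)

func main() {
	shanghai, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		panic(err)
	}

	var options gen.NodeOptions
	options.Cron.Jobs = []gen.CronJob{
		{
			Name:     "report",     // must be unique among cron jobs
			Spec:     "7 21 * * *", // minute 7, hour 21, every day
			Location: shanghai,
			// Send gen.MessageCron to the local process "myweb1".
			Action: gen.CreateCronActionMessage(gen.Atom("myweb1"), gen.MessagePriorityNormal),
			// Notify "myweb2" (gen.MessageCronFallback) if the Action fails.
			Fallback: gen.ProcessFallback{Name: "myweb2"},
		},
	}

	node, err := ergo.StartNode("demo@localhost", options)
	if err != nil {
		panic(err)
	}
	node.Wait()
}
```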
gen.Cron interface
You can also manage jobs using the gen.Cron interface. It provides the following methods for this:
Access to the gen.Cron
interface can be obtained using the Cron
method of the gen.Node
interface.
You can also create your own Action. To do this, simply implement the gen.CronAction
interface:
It is worth noting that the action_time
argument is passed in the time zone of the job, i.e., the one specified in the gen.CronJob.Location
field when the job was created.
The Info
method of the interface is used when calling gen.Cron.Info()
to retrieve summary information about Cron and its jobs.
*
represents "all". For example, using * * * * *
will run every minute. Using * * * * 1
will run every minute only on Monday.
-
allows specifying a range of values. For example, 15 23 * * 1-3
will trigger the job from Monday to Wednesday at 23:15
,
defines a sequence of values: 15,25,35 23 * * *
– this will trigger the job every day at 23:15, 23:25, and 23:35
/
used for step values: */5 3 * * *
this will trigger the job every 5 minutes during the hour starting from 3:00 (3:00, 3:05 ... 3:55). It can also be used with ranges: 21-37/5 17 * * *
- every day at 17:21, 17:26, 17:31 and 17:36
#
available for use only in the day-of-week field in the format day-of-week#occurrence
. It allows specifying constructs like 5#2
, which refers to the second Friday of the month
L
stands for "last". In the day-of-month field, it specifies the last day of the month. In the day-of-week field - allows specifying constructs such as "the last Friday" (5L
) of a given month.
You can also use the following macro definitions:
@hourly
- 1 * * * *
every hour
@daily
- 10 3 * * *
every day at 3:10
@monthly
- 20 4 1 * *
on day 1 of the month at 4:20
@weekly
- 30 5 * * 1
on Monday at 5:30
1 19 * * 1#1,7L
run at 19:01 every month on the first Monday and last Sunday
15 15 10-15/3 * *
run at 15:15 every month on the 10th and 13th
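The range and step semantics above can be sketched with a small, framework-independent field expander (this is not the framework's parser, and it ignores ",", "#" and "L"):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// expandField expands a single cron field of the form "*", "N", "lo-hi" or
// "lo-hi/step" into the list of matching values within [min, max].
// Illustrative only; error handling is omitted for brevity.
func expandField(field string, min, max int) []int {
	step := 1
	if i := strings.IndexByte(field, '/'); i >= 0 {
		step, _ = strconv.Atoi(field[i+1:])
		field = field[:i]
	}
	lo, hi := min, max
	if field != "*" {
		if i := strings.IndexByte(field, '-'); i >= 0 {
			lo, _ = strconv.Atoi(field[:i])
			hi, _ = strconv.Atoi(field[i+1:])
		} else {
			lo, _ = strconv.Atoi(field)
			hi = lo
		}
	}
	var out []int
	for v := lo; v <= hi; v += step {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(expandField("21-37/5", 0, 59)) // [21 26 31 36]
	fmt.Println(expandField("10-15/3", 1, 31)) // [10 13]
}
```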
To view the schedule of your job, use the JobSchedule
method of the gen.Cron
interface. To view the schedule of all jobs, use the Schedule
method of this interface.
This implementation takes time changes into account – if a time adjustment occurs at the time of the job's scheduled execution and the new time does not match the job's schedule, the job will not be executed.
For example, if a job has the specification 0 2 * * *
(to run every day at 2:00), it will be skipped on March 30, 2025, because at 2:00 that day, the time will be moved forward by one hour (due to DST). After the end of DST on October 26, 2025, at 3:00, the time will be shifted back by one hour, but the job in the example will be executed only once.
An actor in Ergo Framework implements the low-level gen.ProcessBehavior
interface. To launch a process based on act.Actor
, you need to create an object with an embedded act.Actor
and implement a factory function for it. For example:
act.Actor
uses the act.ActorBehavior
interface to interact with your object. This interface defines a set of callback methods:
All methods in the act.ActorBehavior
interface are optional, allowing you to implement only the necessary callbacks in your object. Since act.Actor
embeds the gen.Process
interface, you can directly use its methods from within your actor object.
Example:
Process initialization begins when act.Actor
invokes the Init
method of the act.ActorBehavior
interface, passing the args
provided during Spawn
(or SpawnRegister
). If Init
completes successfully, the node registers the process, allowing it to receive messages or synchronous requests. If Init
fails, Spawn
(or SpawnRegister
) returns the error. It's important to note that during initialization, the process is not yet registered with the node, limiting access to certain methods of the embedded gen.Process
interface until the process is fully initialized.
The process mailbox contains several queues: Main, System, Urgent, and a special Log queue. In act.Actor
, messages are processed in the following order: Urgent, System, Main, and then Log.
Asynchronous messages are sent using the Send
method of the gen.Node
or gen.Process
interface. Upon receiving such a message, act.Actor
calls the HandleMessage
method of the gen.ActorBehavior
interface.
For synchronous requests, the HandleCall
callback method is invoked in an actor based on act.Actor
. The result returned by this method is sent as the response to the request.
A process can send asynchronous messages to itself, but attempting to make a synchronous request to itself will return the error gen.ErrTimeout.
act.Actor
also allows handling synchronous requests asynchronously. This feature lets you delegate the processing of synchronous requests to other processes or delay the response. To do this, the HandleCall
method should return nil
, and you should use the SendResponse
method of the gen.Process
interface to send the response. You must provide the gen.PID
of the requesting process and the gen.Ref
reference of the request.
For debugging or monitoring the internal state, the Inspect
method allows high-priority synchronous requests. In act.Actor
, this triggers the HandleInspect
method. This feature is widely used in the tool.
If the actor process is registered as a logger, the HandleLog
method is called when log messages (gen.MessageLog
) are received, generated using the gen.Log
interface. If the actor process has subscribed to events using the LinkEvent
or MonitorEvent
methods of gen.Process
, the HandleEvent
method is called when an event message is received.
To stop a process, simply return a non-nil error
value from the message-handling callback. After termination, the Terminate
callback method will be called, with the reason
argument being the returned error value.
Additionally, you can terminate a process using predefined constants like:
gen.TerminateReasonNormal
for a normal (non-failure) termination.
gen.TerminateReasonKill
when the process is forcibly stopped using the Kill
method from the gen.Node
interface.
In Ergo Framework, two additional termination reasons are defined:
gen.TerminateReasonPanic: Occurs when a panic happens during message processing.
gen.TerminateReasonShutdown: This termination reason can be sent by the parent process or the node when the node is stopped using the StopForce
method of the gen.Node
interface. It is not considered a failure.
At the moment the Terminate
callback is invoked, the process has already been removed from the node. Therefore, most methods of the gen.Process
interface will return the gen.ErrNotAllowed
error.
The SplitHandle
option allows an actor to call the callback methods of the act.ActorBehavior
interface based on the process identifier used to send a message or make a synchronous call to your process. You can enable this option using the SetSplitHandle(split bool)
method defined in act.Actor
. To check the current value of this option, use the SplitHandle
method. Here is an example usage:
If your process has a registered associated name and receives a message using that name, act.Actor
will call the HandleMessageName
callback. The name
argument will be the value used to send the message. For synchronous requests, the HandleCallName
callback will be invoked.
If a process alias (gen.Alias
) was used, the callbacks HandleMessageAlias
and HandleCallAlias
will be triggered for messages and synchronous requests, respectively. This enables customized handling based on the way the message or request was sent.
In Ergo Framework, an exit signal can be sent to your process by another process or node using the SendExit
method from the gen.Process
or gen.Node
interfaces. It can also be generated by the node if your process had a link created via the Link*
methods in the gen.Process
interface. By default, when an actor receives an exit signal, it terminates and calls the Terminate
callback with the reason specified in the exit signal.
To prevent your actor from terminating upon receiving an exit signal, you can enable the option to intercept such signals. This can be done using the SetTrapExit(trap bool)
method in act.Actor
. The default value is false
, but you can enable it at any time, for example, during initialization:
When the TrapExit
option is enabled, act.Actor
converts exit signals into standard gen.MessageExit*
messages:
gen.MessageExitPID: The source of the exit signal was a process.
gen.MessageExitProcessID: The source was gen.ProcessID
(a link created using LinkProcessID
).
gen.MessageExitAlias: The source was gen.Alias
(link created using LinkAlias
).
gen.MessageExitEvent: The source was gen.Event
(link created using LinkEvent
).
gen.MessageExitNode: The source was a network connection with a node (link created using LinkNode
).
This implementation of the meta-process allows running external programs and communicating with them using stdin and stdout. Communication can be performed in both text and binary formats.
To create a meta.Port
, you need to use the meta.CreatePort
function. It takes meta.PortOptions
as an argument, which allows specifying the following options:
Cmd
command to run
Args
arguments for the command
Env
environment variables for the program being executed
EnableEnvMeta
adds environment variables of the meta-process
EnableEnvOS
adds OS environment variables
Tag
can help distinguish between multiple meta-processes running with the same Cmd
/Args
Process
The name of the process that will handle meta.MessagePort*
messages. If not specified, all messages will be sent to the parent process. You can also use an act.Pool
process for distributing message handling
SplitFuncStdout
to specify a split-function. (uses Scanner
from the standard bufio
package) for processing data from stdout in text mode. By default, data is split by lines
SplitFuncStderr
to specify a split-function for the data from stderr
Binary
enables binary mode for the data from stdout. See the binary mode section below for more information
Below is an example of creating and running the external program example-prog
using a meta-process meta.Port
:
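A minimal sketch of what this could look like, inside a process callback (option and function names follow this section; the "-debug" argument and other details are hypothetical):

```go
// Create and spawn a meta.Port running an external program.
func (a *myActor) Init(args ...any) error {
	port, err := meta.CreatePort(meta.PortOptions{
		Cmd:  "example-prog",
		Args: []string{"-debug"}, // hypothetical argument
		Tag:  "example",
	})
	if err != nil {
		return err
	}
	// Spawn the meta-process. Since Process is not set in the options,
	// meta.MessagePort* messages go to this (parent) process.
	if _, err := a.SpawnMeta(port, gen.MetaOptions{}); err != nil {
		return err
	}
	return nil
}
```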
The implementation of meta.Port
supports binary mode when working with stdin/stdout. To enable it, use the meta.PortOptions.Binary.Enable
option and specify the binary format parameters:
ReadBufferSize
sets the buffer size for reading. The default size is 8192
ReadBufferPool
defines a pool for message buffers. The Get
method should return an allocated buffer of type []byte
ReadChunk
enables auto-chunking mode, which splits the incoming stream into chunks according to the specified parameters. For more details, see Auto chunking
WriteBufferKeepAlive
enables the software keep-alive mechanism – a specified value will be automatically sent at the interval defined in the option WriteBufferKeepAlivePeriod
When binary mode is enabled, the SplitFuncStdout
option is ignored. However, stderr is processed in text mode, and you can use the SplitFuncStderr
option.
This feature allows automatically chunking the incoming stream in binary mode. The chunks can be of fixed length – to achieve this, you need to specify the chunk size using the FixedLength
option. For data with dynamic length, you must specify header parameters that will define the chunk length.
To enable the auto-chunking feature in binary mode, use the meta.PortOptions.Binary.ReadChunk.Enable
option, and specify the parameters for automatic data chunking using the following options:
FixedLength
allows you to set the length of chunks in the stdout. In this case, all Header*
options will be ignored. Leave this field with a zero value to work with dynamically sized chunks
HeaderSize
specifies the size of the header for dynamically sized chunks
HeaderLengthPosition
specifies the position in the header where the length is stored
HeaderLengthSize
specifies the size of the length in bytes (1, 2 or 4)
HeaderLengthIncludesHeader
specifies whether the length value includes the header size
MaxLength
allows you to set the maximum chunk length. If this value is exceeded, the meta-process will terminate, and the launched program will also be automatically stopped. Leave this value as zero if no length limits are required
For instance: the incoming stream consists of dynamically sized chunks with a 7-byte header, and the length is encoded as a 32-bit number at byte position 3 (counting from 0). The length value includes the header size: L = 7 + len(payload). In this case, the auto-chunking options would be as follows:
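The option values for this layout, and the length arithmetic the meta-process would perform, can be sketched as follows (field names follow this section; big-endian byte order is an assumption):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// The matching auto-chunking options for the layout described above:
//
//   HeaderSize:                 7,
//   HeaderLengthPosition:       3,
//   HeaderLengthSize:           4,
//   HeaderLengthIncludesHeader: true,
//
// chunkLength shows the arithmetic: read a 32-bit length at byte 3 of the
// 7-byte header; since it includes the header, payload length is L - 7.
func chunkLength(header []byte) (total, payload int) {
	l := int(binary.BigEndian.Uint32(header[3:7]))
	return l, l - 7
}

func main() {
	// Header for a 5-byte payload: L = 7 + 5 = 12, encoded at offset 3.
	hdr := []byte{0xAA, 0xBB, 0xCC, 0x00, 0x00, 0x00, 0x0C}
	total, payload := chunkLength(hdr)
	fmt.Println(total, payload) // 12 5
}
```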
You can find an example implementation in the repository, in the port project. Use the -bin argument to enable the demonstration of binary mode operation.
"Pool of Workers" design pattern
The act.Pool
actor implements the low-level gen.ProcessBehavior
interface and provides the "Pool of Workers" design pattern functionality. All asynchronous messages and synchronous requests sent to the pool process are redirected to worker processes in the pool.
To launch a process based on act.Pool
, you need to embed it in your object. For example:
act.Pool
uses the act.PoolBehavior
interface to interact with your object:
The Init
method is mandatory for implementation in act.PoolBehavior
, while the other methods are optional. Similar to act.Actor
, act.Pool
has the embedded gen.Process
interface, allowing you to use its methods directly from your object:
With act.PoolOptions
, you can configure various parameters for the pool, such as:
PoolSize: Determines the number of worker processes to be launched when your pool process starts. If this option is not set, the default value of 3 is used.
WorkerFactory: A factory function that creates worker processes.
WorkerMailboxSize: Specifies the size of the mailbox for each worker process.
WorkerArgs: Arguments passed during the startup of worker processes.
All asynchronous messages and synchronous requests received in the pool process's mailbox are automatically forwarded to worker processes, distributing the load evenly. The load is distributed using a FIFO queue of worker processes.
Message/Request Handling Algorithm:
Select a worker from the FIFO queue.
Forward the message/request.
If errors like gen.ErrProcessUnknown
or gen.ErrProcessTerminated
occur, a new worker is started, and the message is sent to it.
If gen.ErrMailboxFull
occurs, the worker is requeued, and the next worker is selected.
Upon successful forwarding, the worker returns to the end of the queue.
If all workers are busy (e.g., due to gen.ErrMailboxFull
), the pool process logs an error, such as "no available worker process. ignored message from <PID>."
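The algorithm above can be modeled framework-independently. The sketch below is illustrative only, not the act.Pool implementation:

```go
package main

import "fmt"

// forwardResult stands in for the errors the pool reacts to
// (gen.ErrMailboxFull, gen.ErrProcessTerminated, ...).
type forwardResult int

const (
	ok forwardResult = iota
	mailboxFull
	terminated
)

// forward walks the FIFO queue of workers: a full mailbox requeues the
// worker and tries the next one; a terminated worker is replaced by a
// freshly spawned one; on success the worker goes to the back of the queue.
func forward(queue []string, try func(string) forwardResult, spawn func() string) ([]string, bool) {
	for range queue { // at most one pass over the original queue
		w := queue[0]
		queue = queue[1:]
		switch try(w) {
		case terminated:
			w = spawn() // start a new worker and deliver to it
			fallthrough
		case ok:
			return append(queue, w), true
		case mailboxFull:
			queue = append(queue, w) // requeue, try the next one
		}
	}
	return queue, false // all workers busy: message is logged and ignored
}

func main() {
	busy := map[string]bool{"w1": true}
	q, delivered := forward([]string{"w1", "w2"},
		func(w string) forwardResult {
			if busy[w] {
				return mailboxFull
			}
			return ok
		},
		func() string { return "w3" })
	fmt.Println(q, delivered) // [w1 w2] true
}
```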
Forwarding to workers only happens for messages from the Main queue. To ensure a message is handled by the pool process itself, send it with a priority higher than gen.MessagePriorityNormal
. The gen.Process
interface provides SendWithPriority
and CallWithPriority
methods for sending messages or making synchronous requests with a specified priority.
To dynamically manage the number of worker processes, act.Pool
provides the following methods:
AddWorkers: This method adds a specified number of worker processes to the pool. Upon successful execution, it returns the total number of worker processes in the pool after the addition.
RemoveWorkers: This method stops and removes a specified number of worker processes from the pool. Upon successful execution, it returns the total number of worker processes remaining in the pool after the removal.
To handle synchronous HTTP requests with the actor model in Ergo Framework, they must be converted into asynchronous messages. For this purpose, two meta-processes have been created: meta.WebServer
and meta.WebHandler
. The first is used to start an HTTP server, and the second transforms synchronous HTTP requests into messages that are sent to a worker process. A worker process can be created using act.WebWorker
, which processes the incoming messages asynchronously.
You can create a meta.WebHandler
using the meta.CreateWebHandler
function. This function creates an object that implements the gen.MetaProcessBehavior
and http.Handler
interfaces.
It takes meta.WebHandlerOptions
as an argument:
Process
: The name of the process to which transformed asynchronous meta.MessageWebRequest
messages are sent. If not specified, these messages are sent to the parent meta-process.
RequestTimeout
: The time allowed for the process to handle the message, with a default of 5 seconds.
After successful creation, you need to start the meta-process using SpawnMeta
from the gen.Process
interface.
Once the meta-process is running, it can be used as an HTTP request handler.
To create the meta.WebServer
meta-process, use the meta.CreateWebServer
function with the meta.WebServerOptions
argument. These options allow you to set:
Host
: The interface on which the port will be opened for handling HTTP requests.
Port
: The port number.
CertManager
: Enables TLS encryption for the HTTP server. You can use the node's CertManager
to activate the node's certificate by using the CertManager()
method of the gen.Node
interface.
Handler
: Specifies the HTTP request handler.
When the meta-process is created, the HTTP server starts. If the server fails to start, meta.CreateWebServer
returns an error. After successful creation, start the meta-process using SpawnMeta(...)
from the gen.Process
interface.
Example:
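A minimal sketch of starting the handler and the server from within a process callback. The function and option names follow this section; the return values, the gen.MetaOptions argument, and literal values are assumptions to verify against the reference documentation:

```go
// Sketch: start a web handler and server from a process Init callback.
func (w *web) Init(args ...any) error {
	mux := http.NewServeMux()

	// Create the handler meta-process and register it with the mux.
	handler := meta.CreateWebHandler(meta.WebHandlerOptions{
		Process:        "webworker", // e.g. an act.WebWorker process name
		RequestTimeout: 5 * time.Second,
	})
	if _, err := w.SpawnMeta(handler, gen.MetaOptions{}); err != nil {
		return err
	}
	mux.Handle("/", handler)

	// Create the HTTP server meta-process (the server starts here) ...
	server, err := meta.CreateWebServer(meta.WebServerOptions{
		Host:    "localhost",
		Port:    8080,
		Handler: mux,
	})
	if err != nil {
		return err
	}
	// ... and start the meta-process itself.
	_, err = w.SpawnMeta(server, gen.MetaOptions{})
	return err
}
```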
An example can be found in the repository, specifically in the demo project.
The network transparency of Ergo Framework allows processes to exchange messages regardless of whether they are running locally or on remote nodes. The process identifier (gen.PID
, gen.ProcessID
, gen.Alias
) includes the name of the node where the process is running—this information is used by the node to route the message.
If a message is being sent to a process on a remote node, the Service Discovery mechanism helps determine how to establish a network connection to that node. Once the connection is established, the node automatically encodes the message and sends it to the remote node, which then delivers it to the recipient process's mailbox.
The Ergo Data Format (EDF) is used for data transmission within Ergo Framework.
The Ergo Data Format (EDF) supports encoding and decoding of all scalar types in Golang, as well as specialized types from Ergo Framework (see ). To encode custom types, they must be registered using the edf.RegisterTypeOf
method from the ergo.services/ergo/net
network stack. The registration of a custom type must occur on both the sender and receiver nodes.
If you're registering a data type that contains another custom type among its child elements, you must register the child type first before registering the parent type.
Example:
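A sketch of the required registration order, child type first. The exact signature of edf.RegisterTypeOf is an assumption here; remember that registration must happen on both the sender and receiver nodes:

```go
// Register nested custom types child-first, as described above.
type Inner struct {
	Counter int64
}

type Outer struct {
	Name  string
	Inner Inner // custom child type: must be registered first
}

func registerTypes() error {
	if err := edf.RegisterTypeOf(Inner{}); err != nil {
		return err
	}
	return edf.RegisterTypeOf(Outer{})
}
```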
All child elements in the registered structures must be public. If you need to encode/decode structures with private fields, you will need to implement a custom encoder/decoder for that data type. To do this, you must implement the edf.Marshaler
and edf.Unmarshaler
interfaces.
These interfaces allow you to define how the custom type is marshaled (encoded) and unmarshaled (decoded), giving you control over the serialization and deserialization process for types with private fields or special requirements.
Example:
The length of a message encoded using the MarshalEDF
method must not exceed 2^32
bytes.
In Golang, the error
type is an interface. To transmit error types over the network, they also need to be registered using the edf.RegisterError
method. A set of standard errors in Ergo Framework (such as gen.Err*
, gen.TerminateReason*
) is automatically registered when the node starts.
Additionally, the following limits are imposed on specific types:
gen.Atom
: Maximum length of 255 bytes
string
: Maximum length of 65,535 bytes (2^16)
binary
: Maximum length of 2^32 bytes
map/array/slice
: Maximum number of elements is 2^32
When establishing a connection between nodes, during the handshake phase, the nodes exchange dictionaries containing registered data types and registered errors. These dictionaries are used to reduce the volume of transmitted information during message exchanges, as identifiers are used instead of the names of registered types and errors.
If you register a data type or an error after the connection is established, sending data of that type will result in an error. To make these newly registered types available for network exchange, you will need to disconnect and re-establish the connection.
The EDF format does not support pointers. The use of pointers should remain an internal optimization within your application and should not extend beyond it.
For working with TCP connections, two types of meta-processes are implemented in Ergo Framework:
TCP Server Process: Responsible for creating a TCP server. It opens the socket and handles incoming connections.
TCP Connection Handler: Manages established TCP connections, whether they are incoming or client-initiated.
To create a TCP server meta-process, use the meta.CreateTCPServer
function. It accepts meta.TCPServerOptions
with the following parameters:
Host, Port: Specify the interface and port for incoming connections.
ProcessPool: Defines a list of worker processes to handle incoming connections. Each new connection is assigned a worker from the pool. If ProcessPool
is not set, the parent process handles all connections. Using act.Pool
in ProcessPool
is not recommended to avoid packet mismanagement.
CertManager: Enables TLS encryption.
BufferSize/BufferPool: Manage incoming data buffers.
KeepAlivePeriod/InsecureSkipVerify: Handle TCP keep-alive and TLS certificate verification.
After creating the meta-process, you should start it with SpawnMeta
from the gen.Process
interface. Example:
If starting the meta-process fails for any reason, you need to free up the port specified in meta.TCPServerOptions.Port
by using the Terminate
method of the gen.MetaBehavior
interface.
Upon an incoming connection, the TCP server meta-process creates and starts a new meta-process to handle that connection.
To create a client TCP connection, use the meta.CreateTCPConnection
function. It initiates a TCP connection and, if successful, creates a meta-process to manage it.
The function takes meta.TCPConnectionOption
, similar to meta.TCPServerOptions
, but instead of ProcessPool
, it includes a Process
field for specifying a process to handle TCP packets. If this field is not used, all data packets are sent to the parent process.
After a successful connection, start the meta-process with SpawnMeta
. If the process fails, close the connection using the Terminate
method of the gen.MetaBehavior
interface.
Example:
To handle a TCP connection, the meta-process communicates with either the parent or worker process based on the meta-process's launch options. Three types of messages are used:
meta.MessageTCPConnect
: Sent when the meta-process starts, containing the connection ID (ID
), RemoteAddr
, and LocalAddr
information.
meta.MessageTCPDisconnect
: Sent when the TCP connection is disconnected, including cases where the meta-process is terminated.
meta.MessageTCP
: Used for both receiving and sending data.
If the meta-process cannot send a message to the worker process, the connection is terminated, and the meta-process stops working.
To send a message to a TCP connection, use the Send
(or SendAlias
) method from the gen.Process
interface. The recipient should be the gen.Alias
of the meta-process managing the connection. The message must be of type meta.MessageTCP
. Note that the meta.MessageTCP.ID
field is ignored during sending (it is only used for incoming messages).
For a detailed example of a TCP server implementation, you can check out the demo
project in the repository.
func factory_myPool() gen.ProcessBehavior {
return &myPool{}
}
type myPool struct {
act.Pool
}
...
node.Spawn(factory_myPool, gen.ProcessOptions{})
type PoolBehavior interface {
gen.ProcessBehavior
Init(args ...any) (PoolOptions, error)
HandleMessage(from gen.PID, message any) error
HandleCall(from gen.PID, ref gen.Ref, request any) (any, error)
Terminate(reason error)
HandleInspect(from gen.PID) map[string]string
HandleEvent(message gen.MessageEvent) error
}
func (p *myPool) Init(args ...any) (act.PoolOptions, error) {
var options act.PoolOptions
p.Log().Info("starting pool process with name %s", p.Name())
// ...
// set pool options
// ...
return options, nil
}
func (p *myPool) Terminate(reason error) {
p.Log().Info("pool process terminated with reason: %s", reason)
}
type PoolOptions struct {
PoolSize int64
WorkerFactory gen.ProcessFactory
WorkerMailboxSize int64
WorkerArgs []any
}
type CronJob struct {
// Name job name
Name gen.Atom
// Spec time spec in "crontab" format
Spec string
// Location defines timezone
Location *time.Location
// Action
Action gen.CronAction
// Fallback
Fallback gen.ProcessFallback
}
func main() {
var options gen.NodeOptions
// ...
locationAsiaShanghai, _ := time.LoadLocation("Asia/Shanghai")
options.Cron.Jobs = []gen.CronJob{
gen.CronJob{Name: "job1",
Spec: "7 21 * * *",
Action: gen.CreateCronActionMessage(gen.Atom("myweb1"),
gen.MessagePriorityNormal),
Location: locationAsiaShanghai,
Fallback: gen.ProcessFallback{Enable: true, Name: "myweb2"},
},
}
node, err := ergo.StartNode("cron@localhost", options)
if err != nil {
panic(err)
}
// ...
}
type Cron interface {
// AddJob adds a new job
AddJob(job gen.CronJob) error
// RemoveJob removes a job
RemoveJob(name gen.Atom) error
// EnableJob allows you to enable previously disabled job
EnableJob(name gen.Atom) error
// DisableJob disables job
DisableJob(name gen.Atom) error
// Info returns information about the jobs, spool of jobs for the next run
Info() gen.CronInfo
// JobInfo returns information for the given job
JobInfo(name gen.Atom) (gen.CronJobInfo, error)
// Schedule returns a list of jobs planned to be run for the given period
Schedule(since time.Time, duration time.Duration) []gen.CronSchedule
// JobSchedule returns a list of scheduled run times for the given job and period.
JobSchedule(name gen.Atom, since time.Time, duration time.Duration) ([]time.Time, error)
}
type CronAction interface {
Do(job gen.Atom, node gen.Node, action_time time.Time) error
Info() string
}
* * * * *
| | | | | allowed format
| | | | day-of-week (1–7) (Monday to Sunday) * d d,d d-d dL d#d
| | | month (1–12) * d d,d d-d */d
| | day-of-month (1–31) * d d,d d-d */d d-d/d L
| hour (0–23) * d d,d d-d */d d-d/d
minute (0–59) * d d,d d-d */d d-d/d
type MyActor struct {
act.Actor
i int
}
func factoryMyActor() gen.ProcessBehavior {
return &MyActor{}
}
type ActorBehavior interface {
gen.ProcessBehavior
// main callbacks
Init(args ...any) error
HandleMessage(from gen.PID, message any) error
HandleCall(from gen.PID, ref gen.Ref, request any) (any, error)
Terminate(reason error)
// extended callbacks used if SplitHandle was enabled
HandleMessageName(name gen.Atom, from gen.PID, message any) error
HandleMessageAlias(alias gen.Alias, from gen.PID, message any) error
HandleCallName(name gen.Atom, from gen.PID, ref gen.Ref, request any) (any, error)
HandleCallAlias(alias gen.Alias, from gen.PID, ref gen.Ref, request any) (any, error)
// specialized callbacks
HandleInspect() map[string]string
HandleLog(message gen.MessageLog) error
HandleEvent(message gen.MessageEvent) error
}
func (a *MyActor) Init(args ...any) error {
// get the gen.Log interface using Log method of embedded gen.Process interface
a.Log().Info("starting process %s", a.PID())
// initialize value
a.i = 100
// sending message to itself with methods Send and PID of embedded gen.Process
a.Send(a.PID(), "hello")
return nil
}
func (a *MyActor) HandleMessage(from gen.PID, message any) error {
a.Log().Info("got message from %s: %s", from, message)
...
// handling message
...
return nil
}
func (a *MyActor) Terminate(reason error) {
a.Log().Info("%s terminated with reason: %s", a.PID(), reason)
}
func (a *MyActor) Init(args ...any) error {
a.SetSplitHandle(true)
...
return nil
}
func (a *MyActor) Init(args ...any) error {
a.SetTrapExit(true)
...
return nil
}
import "ergo.services/ergo/net/edf"
...
type MyData struct {
A int
B string
C MyBool
}
type MyBool bool
...
func init() {
...
// type MyBool must be registered first
edf.RegisterTypeOf(MyBool(false))
edf.RegisterTypeOf(MyData{})
...
}
type myStruct struct{
a int
b string
}
// implementation of edf.Marshaler interface
func (m myStruct) MarshalEDF(w io.Writer) error {
var encoded []byte
// do encode
_, err := w.Write(encoded)
return err
}
// implementation of edf.Unmarshaler interface
func (m *myStruct) UnmarshalEDF(b []byte) error {
// do decode
return nil
}
type ActorPort struct {
act.Actor
}
func (ap *ActorPort) Init(args ...any) error {
var options meta.PortOptions
options.Cmd = "example-prog"
// create port
metaport, err := meta.CreatePort(options)
if err != nil {
ap.Log().Error("unable to create Port: %s", err)
return err
}
// spawn meta process
id, err := ap.SpawnMeta(metaport, gen.MetaOptions{})
if err != nil {
ap.Log().Error("unable to spawn port meta-process: %s", err)
return err
}
ap.Log().Info("started Port (iotxt) %s (meta-process: %s)", options.Cmd, id)
return nil
}
__ header ___ ___ payload ___
|           | |             |
x x x L L L L . . . . . . . .
...
var options meta.PortOptions
...
options.Binary.Enable = true
options.Binary.ReadChunk.Enable = true
options.Binary.ReadChunk.HeaderSize = 7
options.Binary.ReadChunk.HeaderLengthPosition = 3
options.Binary.ReadChunk.HeaderLengthSize = 4
options.Binary.ReadChunk.HeaderLengthIncludesHeader = true
This package implements the gen.Registrar
interface and serves as a client library for the central registrar, Saturn. In addition to the primary Service Discovery function, it automatically notifies all connected nodes about cluster configuration changes.
To create a client, use the Create
function from the saturn
package. The function requires:
The hostname where the central registrar is running (default port: 4499
, unless specified in saturn.Options
)
A token for connecting to Saturn
a set of options saturn.Options
Then, set this client in the gen.NetworkOptions.Registrar
option.
import (
"ergo.services/ergo"
"ergo.services/ergo/gen"
"ergo.services/registrar/saturn"
)
func main() {
var options gen.NodeOptions
...
host := "localhost"
token := "IwOBhgAEAGzPt"
options.Network.Registrar = saturn.Create(host, token, saturn.Options{})
...
node, err := ergo.StartNode("demo@localhost", options)
...
}
Using saturn.Options
, you can specify:
Cluster
- The cluster name for your node
Port
- The port number for the central Saturn registrar
KeepAlive
- The keep-alive parameter for the TCP connection with Saturn
InsecureSkipVerify
- Option to ignore TLS certificate verification
When the node starts, it will register with the Saturn central registrar in the specified cluster.
Additionally, this library registers a gen.Event
and generates messages based on events received from the central Saturn registrar within the specified cluster. This allows the node to stay informed of any updates or changes within the cluster, ensuring real-time event-driven communication and responsiveness to cluster configurations:
saturn.EventNodeJoined
- Triggered when another node is registered in the same cluster.
saturn.EventNodeLeft
- Triggered when a node disconnects from the central registrar
saturn.EventApplicationLoaded
- An application was loaded on a remote node. Use ResolveApplication
from the gen.Resolver
interface to get application details
saturn.EventApplicationStarted
- Triggered when an application starts on a remote node.
saturn.EventApplicationStopping
- Triggered when an application begins stopping on a remote node.
saturn.EventApplicationStopped
- Triggered when an application is stopped on a remote node.
saturn.EventApplicationUnloaded
- Triggered when an application is unloaded on a remote node
saturn.EventConfigUpdate
- The node's configuration was updated
To receive such messages, you need to subscribe to Saturn client events using the LinkEvent
or MonitorEvent
methods from the gen.Process
interface. You can obtain the name of the registered event using the Event
method from the gen.Registrar
interface. This allows your node to listen for important cluster events like node joins, application starts, configuration updates, and more, ensuring real-time updates and handling of cluster changes.
type myActor struct {
act.Actor
}
func (m *myActor) HandleMessage(from gen.PID, message any) error {
reg, e := m.Node().Network().Registrar()
if e != nil {
m.Log().Error("unable to get Registrar interface %s", e)
return nil
}
ev, e := reg.Event()
if e != nil {
m.Log().Error("Registrar has no registered Event: %s", e)
return nil
}
m.MonitorEvent(ev)
return nil
}
func (m *myActor) HandleEvent(event gen.MessageEvent) error {
m.Log().Info("got event message: %v", event)
return nil
}
Using the saturn.EventApplication*
events and the Remote Start Application feature, you can dynamically manage the functionality of your cluster. The saturn.EventConfigUpdate
events allow you to adjust the cluster configuration on the fly without restarting nodes, such as updating the cookie value for all nodes or refreshing the TLS certificate. Refer to the Saturn - Central Registrar section for more details.
You can also use the Config
and ConfigItem
methods from the gen.Registrar
interface to retrieve configuration parameters from the registrar.
To get information about available applications in the cluster, use the ResolveApplication
method from the gen.Resolver
interface, which returns a list of gen.ApplicationRoute
structures:
type ApplicationRoute struct {
Node Atom
Name Atom
Weight int
Mode ApplicationMode
State ApplicationState
}
Name
The name of the application
Node
The name of the node where the application is loaded or running
Weight
The weight assigned to the application in gen.ApplicationSpec
Mode
The application's startup mode (gen.ApplicationModeTemporary
, gen.ApplicationModePermanent
, gen.ApplicationModeTransient
)
State
The current state of the application (gen.ApplicationStateLoaded
, gen.ApplicationStateRunning
, gen.ApplicationStateStopping
)
You can access the gen.Resolver
interface using the Resolver
method from the gen.Registrar
interface.
func factory_MyWeb() gen.ProcessBehavior {
return &MyWeb{}
}
type MyWeb struct {
act.Actor
}
// Init is invoked on start of this process.
func (w *MyWeb) Init(args ...any) error {
// create an HTTP request multiplexer
mux := http.NewServeMux()
// create a root handler meta-process
root := meta.CreateWebHandler(meta.WebHandlerOptions{
// use process based on act.WebWorker
// and spawned with registered name "mywebworker"
Worker: "mywebworker",
})
// spawn this meta-process
rootid, err := w.SpawnMeta(root, gen.MetaOptions{})
if err != nil {
w.Log().Error("unable to spawn WebHandler meta-process: %s", err)
return err
}
// add our meta-process as a handler of HTTP-requests to the mux
// since it implements http.Handler interface
mux.Handle("/", root)
// you can also use your middleware function:
// mux.Handle("/", middleware(root))
w.Log().Info("started WebHandler to serve '/' (meta-process: %s)", rootid)
// create and spawn web server meta-process
// with the mux we created to handle HTTP-requests
//
// see below...
...
return nil
}
type MyWeb struct {
act.Actor
}
// Init is invoked on start of this process.
func (w *MyWeb) Init(args ...any) error {
// create an HTTP request multiplexer
mux := http.NewServeMux()
// create and spawn your handler meta-processes
// and add them to the mux
//
// see above
...
// create and spawn web server meta-process
serverOptions := meta.WebServerOptions{
Port: 9090,
Host: "localhost",
// use node's certificate if it was enabled there
CertManager: w.Node().CertManager(),
Handler: mux,
}
webserver, err := meta.CreateWebServer(serverOptions)
if err != nil {
w.Log().Error("unable to create Web server meta-process: %s", err)
return err
}
webserverid, err := w.SpawnMeta(webserver, gen.MetaOptions{})
if err != nil {
// invoke Terminate to close listening socket
webserver.Terminate(err)
return err
}
w.Log().Info("started Web server %s: use http[s]://%s:%d/",
webserverid,
serverOptions.Host,
serverOptions.Port)
return nil
}
type MyTCP struct {
act.Actor
}
func (t *MyTCP) Init(args ...any) error {
// create meta-process with TCP server
opt := meta.TCPServerOptions{
Port: 17171,
}
metatcp, err := meta.CreateTCPServer(opt)
if err != nil {
t.Log().Error("unable to create tcp server meta-process: %s", err)
return err
}
// spawn meta-process
if _, err := t.SpawnMeta(metatcp, gen.MetaOptions{}); err != nil {
t.Log().Error("unable to spawn tcp server meta-process: %s", err)
// invoke Terminate to close listening socket
metatcp.Terminate(err)
return err
}
return nil
}
type Client struct {
act.Actor
port uint16 // port of the remote peer (field added for this example)
}
func (c *Client) Init(args ...any) error {
// create meta-process with tcp connection
opt := meta.TCPConnectionOptions{
Port: c.port,
}
connection, err := meta.CreateTCPConnection(opt)
if err != nil {
c.Log().Error("unable to create tcp connection: %s", err)
return err
}
// start meta-process to serve TCP-connection
id, err := c.SpawnMeta(connection, gen.MetaOptions{})
if err != nil {
c.Log().Error("unable to spawn tcp connection meta-process: %s", err)
connection.Terminate(err)
return err
}
c.Log().Info("started tcp connection meta-process: %s", id)
return nil
}
What is a Node in Ergo Framework?
A Node is the core of the service you create using Ergo Framework. This core includes:
Process Management Subsystem: handles the starting/stopping of processes, and the registration of process names/aliases.
Message Routing Subsystem: manages the routing of asynchronous messages and synchronous requests between processes.
Pub/Sub Subsystem: powers the linking, monitoring, and events functionalities, enabling distributed event handling.
Network Stack: provides network transparency (via the ENP protocol and the EDF data format), facilitating seamless communication between nodes.
To start a node, the first step is to define its name. The name consists of two parts: <name>@<hostname>
, where the hostname determines on which network interface the port for incoming connections will be opened.
The node's name must be unique on the host. This means that two nodes with the same name cannot be running on the same host.
Below is an example code for starting a node:
If the gen.NodeOptions.Applications
option includes applications when starting the node, they will be automatically loaded and started. However, if any application fails to start, the ergo.StartNode(...)
function will return an error, and the node will shut down. Upon a successful start, this function returns the gen.Node
interface.
You can also specify environment variables for the node using the Env
option in gen.NodeOptions
. All processes started on this node will inherit these variables. The gen.Node
interface provides the ability to manage environment variables through methods like EnvList()
, SetEnv(...)
, and Env(...)
. However, changes to environment variables will only affect newly started processes. Environment variable names are case-insensitive.
Additionally, the gen.Node
interface provides the following methods:
Starting/Stopping Processes: methods like Spawn(...)
, SpawnRegister(...)
allow you to start processes, while Kill(...)
and SendExit(...)
are used to stop them.
Retrieving Process Information: use the ProcessInfo(...)
method to get information about a process, and the MetaInfo(...)
method to retrieve information about meta-processes.
Managing the Node's Network Stack: access the gen.Network
interface through the Network()
method to manage the node's network operations.
Getting Node Uptime: the Uptime()
method provides the node's uptime in seconds.
General Node Information: use the Info()
method to retrieve general information about the node.
Sending Asynchronous Messages: the Send(...)
method allows you to send asynchronous messages to a process.
The full list of available methods for the gen.Node
interface can be found in the reference documentation.
The mechanism for starting and stopping processes is provided by the node's core. Each process is assigned a unique identifier, gen.PID
, which facilitates message routing.
The node also allows you to register a name associated with a process. You can register such a name at the time of process startup by using the SpawnRegister
method and specifying the desired name. Alternatively, you can use the RegisterName
method from the gen.Node
or gen.Process
interfaces to assign a name to an already running process.
A process can only have one associated name. If you want to change a process's name, you must first unregister the existing name using the UnregisterName
method from the gen.Node
or gen.Process
interfaces. Upon success, this method returns the gen.PID
of the process previously associated with that name.
A process can be terminated by the node by sending it an exit signal. This is done via the SendExit
method in the gen.Node
interface. If necessary, the node can also forcefully stop a process using the Kill
method provided by the gen.Node
interface.
One of the key responsibilities of a node is message routing between processes. This routing is transparent to both the sender and the receiver processes. If the recipient process is located on a remote node, the node will automatically attempt to establish a network connection to the remote node and deliver the message to the recipient. This is the network transparency provided by the Ergo Framework: you don't need to worry about how to send the message or how to encode it, as the node handles all of this automatically and transparently for you.
Message routing is not limited to the gen.PID
process identifier. A message can also be addressed using:
Local Process Name (gen.Atom
): messages can be sent to a local process by referencing its registered name.
gen.Alias
: this is a unique identifier that can be created by the process itself. It is often used as a temporary process identifier (meta-processes, for instance, are addressed by their gen.Alias). You can read more about this feature in the corresponding section.
gen.ProcessID
: This is used for sending messages by the recipient process's name. The structure contains two fields—Name
and Node
. It is a convenient way to send a message to a process on a remote node when you do not know the gen.PID
or gen.Alias
of the remote process.
Through these flexible options, the Ergo Framework provides a robust and seamless message routing system, simplifying communication between processes whether they are local or remote.
The core of the node implements a publisher/subscriber mechanism, which serves as the foundation for process linking, monitoring, and connections with other nodes.
Monitoring Functionality: this allows any process to monitor other processes or nodes. In the event that a monitored process stops or the connection to a node is lost, the process that created the monitor will receive a gen.MessageDownPID
or gen.MessageDownNode
message, respectively.
Linking Functionality: similar to monitoring, linking differs in that when a linked process terminates or the connection to a node is lost, the process that created the link will receive an exit signal, causing it to stop as well.
Event System: the publisher/subscriber mechanism is also used in the events functionality. It allows any process to register its own event type and act as a producer of those events, while other processes can subscribe to those events.
Thanks to network transparency, the pub/sub subsystem works not only for local processes but also for remote ones.
You can read more about these features in the sections on linking, monitoring, and events.
The process of sending a message to a remote node involves several steps:
Connection Establishment: if a connection to the remote node has not yet been created, the local node queries the registrar on the host where the target node (owner of the recipient gen.PID
, gen.ProcessID
, or gen.Alias
) is running. The registrar returns the port number on which the target node accepts incoming connections. The connection to this node is then established. This connection remains active until one of the nodes explicitly closes it by calling Disconnect
from the gen.RemoteNode
interface.
Message Encoding: the message is encoded into the binary EDF format.
Data Compression: if compression was enabled for the sender process, the binary data is compressed.
Message Transmission: the message is sent over the network using the ENP protocol.
Message Decoding and Delivery: the remote node automatically decodes the received message and ensures its delivery to the recipient process's mailbox.
For more detailed information on network interactions, refer to the Network Stack section.
You can stop a node using the Stop()
or StopForce()
methods from the gen.Node
interface.
Stop
: all processes will be sent an exit signal with the reason gen.TerminateReasonShutdown
from the parent process, and the node will wait for all processes to terminate. Once all processes have stopped, the node's network stack and the node itself will be shut down.
StopForce
: in the case of a forced shutdown, all processes will be terminated using the Kill
method from the gen.Node
interface, without waiting for their graceful shutdown.
import (
"ergo.services/ergo"
"ergo.services/ergo/gen"
)
func main() {
name := "example@localhost"
// Start node. Returns gen.Node interface
node, err := ergo.StartNode(name, opts)
if err != nil {
panic(err)
}
node.Wait()
}
Meta-processes are designed to integrate synchronous objects into the asynchronous model of Ergo Framework. Although they share some similarities with regular processes, they have a different nature and operational characteristics:
Meta-Process Identifier: meta-process has a process identifier (of type gen.Alias
) and is associated with its parent process. This allows message routing and synchronous requests to be directed to the meta-process.
Sending Messages: meta-process can send asynchronous messages to other processes or meta-processes (including remote ones) using the Send
method in the gen.MetaProcess
interface. However, the sender will always appear as the parent process's gen.PID
.
Handling Messages: meta-processes can handle asynchronous messages from other processes or meta-processes via the HandleMessage
callback method of the gen.MetaBehavior
interface.
Handling Synchronous Requests: can handle synchronous requests from other processes (including remote ones) using the HandleCall
callback method of the gen.MetaBehavior
interface.
Spawning Other Meta-Processes: can spawn other meta-processes using the Spawn
method in the gen.MetaProcess
interface.
Linking and Monitoring: other processes (including remote ones) can create a link or monitor with the meta-process using the LinkAlias
and MonitorAlias
methods of the gen.Process
interface, referencing the meta-process’s gen.Alias
.
Despite these similarities with regular processes, meta-processes have distinct characteristics:
Concurrency: meta-process operates with two goroutines. The main goroutine starts when the meta-process is created and handles operations on the synchronous object. The termination of this goroutine leads to the termination of the meta-process. The auxiliary goroutine is launched to handle incoming messages from other processes or meta-processes, and it shuts down when there are no more messages in the meta-process’s mailbox. Therefore, access to the meta-process object’s data is concurrent, as it can be accessed by both goroutines simultaneously. This concurrency must be managed carefully when implementing your own meta-process.
Parent Process Dependency: a meta-process can only be started by a process (or another meta-process) using the SpawnMeta
method in the gen.Process
interface (or the Spawn
method in the gen.MetaProcess
interface).
No Own Environment Variables: meta-processes do not have their own environment variables. The Env
and EnvList
methods of the gen.MetaProcess
interface return the environment variables of the parent process.
Limitations: meta-processes cannot make synchronous requests, create links, or establish monitors themselves.
Termination with Parent: when the parent process terminates, the meta-process is also automatically terminated.
Meta-processes offer a way to integrate synchronous objects into the asynchronous system while maintaining compatibility with the process communication model. However, due to their concurrent nature and relationship with the parent process, they require careful handling in certain scenarios.
Ergo Framework provides several ready-to-use implementations of meta-processes:
meta.TCP
: allows you to launch a TCP server or create a TCP connection.
meta.UDP
: for launching a UDP server.
meta.Web
: for launching an HTTP server and creating an http.Handler
based on a meta-process.
WebSocket Meta-Process: for working with WebSockets, a WebSocket meta-process is available. Since its implementation has a dependency on github.com/gorilla/websocket
, it is provided as a separate package: ergo.services/meta/websocket
.
These meta-processes offer pre-built solutions for integrating common networking and web functionality into your application.
To start a meta-process, the gen.Process
interface provides the SpawnMeta
method:
SpawnMeta(behavior gen.MetaBehavior, options gen.MetaOptions) (gen.Alias, error)
If a meta-process needs to spawn other meta-processes, the gen.MetaProcess
interface implements the Spawn
method:
Spawn(behavior gen.MetaBehavior, options gen.MetaOptions) (gen.Alias, error)
In the gen.MetaOptions
options, you can configure:
MailboxSize
: size of the meta-process’s mailbox for incoming messages.
SendPriority
: priority for messages sent by the meta-process.
LogLevel
: the logging level for the meta-process.
Upon successful startup, these methods return the meta-process identifier, gen.Alias
. This identifier can be used to:
Send asynchronous messages via the SendAlias
method in the gen.Process
interface.
Make synchronous requests using the CallAlias
method in the gen.Process
interface.
Create links or monitors with the meta-process using the LinkAlias
or MonitorAlias
methods in the gen.Process
interface.
To stop a meta-process, you can send it an exit signal using the SendExitMeta
method of the gen.Process
interface. When the meta-process receives this signal, it triggers the callback method Terminate
from the gen.MetaBehavior
interface.
Additionally, a meta-process will be terminated in the following cases:
The parent process has terminated.
The main goroutine of the meta-process has finished (i.e., the Start
method of the gen.MetaBehavior
interface has completed its work).
The HandleMessage
(or HandleCall
) callback method returned an error while processing an asynchronous message (or a synchronous request).
A panic occurred in any method of the gen.MetaBehavior
interface.
To create a custom meta-process in Ergo Framework, you need to implement the gen.MetaBehavior
interface. This interface defines the behavior of the meta-process and includes the following key methods:
type MetaBehavior interface {
// callback method for main goroutine
Start(process MetaProcess) error
// callback methods for the auxiliary goroutine
HandleMessage(from PID, message any) error
HandleCall(from PID, ref Ref, request any) (any, error)
Terminate(reason error)
HandleInspect(from PID) map[string]string
}
Start
: this method is called when the meta-process starts. It is responsible for initializing the meta-process and running the main logic. The Start
method runs in the main goroutine of the meta-process, and when it finishes, the meta-process is terminated.
HandleMessage
: this callback is invoked to handle incoming asynchronous messages from other processes or meta-processes. It runs in the auxiliary goroutine that processes the meta-process’s mailbox.
HandleCall
: this method processes synchronous requests (calls) from other processes or meta-processes. It is also part of the auxiliary goroutine that processes incoming messages.
Terminate
: called when the meta-process receives an exit signal or when its parent process terminates. It is responsible for cleanup and finalizing the meta-process before it is removed.
Here is an example of implementing a custom meta-process in Ergo Framework. It demonstrates how to create a meta-process that reads data from a network connection and sends it to its parent process:
func createMyMeta(conn net.Conn) gen.MetaBehavior {
return &myMeta{
conn: conn,
}
}
type myMeta struct {
gen.MetaProcess // Embedding the gen.MetaProcess interface
conn net.Conn // Network connection
}
func (mm *myMeta) Start(meta gen.MetaProcess) error {
// Assign the meta-process instance to the embedded interface
mm.MetaProcess = meta
// Ensure that the socket is closed upon exit
defer mm.conn.Close()
// Read data from the socket in a loop
for {
buf := make([]byte, 1024)
n, err := mm.conn.Read(buf)
if err != nil {
return err // Terminate the meta-process on error
}
// Send the data to the parent process
mm.Send(mm.Parent(), buf[:n])
}
}
In this example, we embed the gen.MetaProcess
interface within the myMeta
struct. This allows us to use the methods of gen.MetaProcess
(like Send
, Parent
, and Log
) inside the callback methods defined by gen.MetaBehavior.
func (mm *myMeta) HandleMessage(from gen.PID, message any) error {
	// Log information about the parent process
	mm.Log().Info("my parent process is %s", mm.Parent())
	// Handle received messages as byte slice
	if data, ok := message.([]byte); ok {
		// Write the data back to the connection
		mm.conn.Write(data)
		return nil
	}
	// Log if the message is of an unknown type
	mm.Log().Info("got unknown message %#v", message)
	return nil
}
func (mm *myMeta) Terminate(reason error) {
	// Close the connection when the meta-process is terminated
	mm.conn.Close()
}
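The myMeta example implements only part of the gen.MetaBehavior interface. To satisfy it fully, the remaining callbacks can be added as simple stubs; a minimal sketch (the reply value and the inspect keys are illustrative):

```go
// HandleCall processes synchronous requests sent to the meta-process.
// Returning a value with a nil error sends that value back as the reply.
func (mm *myMeta) HandleCall(from gen.PID, ref gen.Ref, request any) (any, error) {
	mm.Log().Info("got request from %s", from)
	return gen.Atom("not implemented"), nil
}

// HandleInspect provides arbitrary key/value details about this
// meta-process for the node's inspection tooling.
func (mm *myMeta) HandleInspect(from gen.PID) map[string]string {
	return map[string]string{
		"local":  mm.conn.LocalAddr().String(),
		"remote": mm.conn.RemoteAddr().String(),
	}
}
```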
The HandleMessage
method processes asynchronous messages sent to the meta-process. It checks the type of the message and takes appropriate actions, such as writing data to the connection. If the method returns an error, it causes the termination of the meta-process, triggering the Terminate
method to handle the cleanup.
The Terminate
method ensures that resources like the network connection are properly released when the meta-process ends. If the meta-process encounters an error in HandleMessage
or HandleCall
, or if any other termination condition occurs, the Terminate
method will be called to finalize the shutdown process.
List of available methods in the gen.MetaProcess
interface:
type MetaProcess interface {
	ID() Alias   // Returns the ID (gen.Alias) of this meta-process
	Parent() PID // Returns the PID of the parent process
	Send(to any, message any) error // Sends an asynchronous message to another process or meta-process
	Spawn(behavior MetaBehavior, options MetaOptions) (Alias, error) // Spawns a new meta-process
	Log() Log // Provides the gen.Log interface for logging
}
In Ergo Framework, an Application is a component that allows you to group a set of actors (and/or supervisors) and manage them as a single entity. The Application is responsible for starting each actor within the group and handling dependencies on other Applications if specified in the configuration. Additionally, the Application monitors the running processes and, depending on the mode chosen during startup, may stop all group members and itself if one of the processes terminates for any reason.
To create an Application in Ergo Framework, the gen.ApplicationBehavior
interface is provided:
type ApplicationBehavior interface {
	// Load invoked on loading application using method ApplicationLoad
	// of gen.Node interface.
	Load(node Node, args ...any) (ApplicationSpec, error)
	// Start invoked once the application started
	Start(mode ApplicationMode)
	// Terminate invoked once the application stopped
	Terminate(reason error)
}
All methods of the gen.ApplicationBehavior
interface must be implemented.
For example, if you want to combine an actor A1
and a supervisor S1
into a single application, the code for creating such an Application would look like this:
func createApp() gen.ApplicationBehavior {
	return &app{}
}

type app struct{}

func (a *app) Load(node gen.Node, args ...any) (gen.ApplicationSpec, error) {
	return gen.ApplicationSpec{
		Name: "my_app",
		Group: []gen.ApplicationMemberSpec{
			{
				Name:    "a1", // started with this registered name
				Factory: factory_A1,
			},
			{
				// Name field omitted: the process starts without a registered name
				Factory: factory_S1,
			},
		},
	}, nil
}

func (a *app) Start(mode gen.ApplicationMode) {} // nothing to do on start
func (a *app) Terminate(reason error)         {}

...
myApp := createApp()
appName, err := node.ApplicationLoad(myApp)
Since an Application, by its nature, is not a process, its startup is divided into two stages: loading and starting.
To load the application, the gen.Node
interface provides the method ApplicationLoad
. You pass your implementation of the ApplicationBehavior
interface to this method. The node will then call the Load
callback method of the gen.ApplicationBehavior
interface, which must return the application's specification as a gen.ApplicationSpec
:
type ApplicationSpec struct {
	Name        Atom
	Description string
	Version     Version
	Depends     ApplicationDepends
	Env         map[Env]any
	Group       []ApplicationMemberSpec
	Mode        ApplicationMode
	Weight      int
	LogLevel    LogLevel
}
In the gen.ApplicationSpec
structure, the Name
field defines the name under which your application will be registered. This name must be unique within the node. However, the registry for application names and the registry for process names are separate. Although it is possible to run a process with a name identical to the application name, this could lead to confusion, so care should be taken when choosing names for both processes and applications. The Description
field is optional.
The Group
field contains a group of specifications for launching processes, defined by gen.ApplicationMemberSpec
. If the Name
field in gen.ApplicationMemberSpec
is not empty, the process will be launched using the SpawnRegister
method of the gen.Node
interface. The processes in the group are launched sequentially, following the order specified in the Group
field.
The Env
field is used to set environment variables for all processes started within the application. Environment variables are inherited in the following order: Node > Application > Parent > Process.
If your application depends on other applications or network services, you can specify these dependencies using the Depends
field. These dependencies are taken into account when starting the application.
The Mode
field defines the startup mode for the application and its behavior if one of the processes in the group stops. This value serves as the default mode when the application is started using the ApplicationStart(...)
method of the gen.Node
interface. However, the startup mode can be overridden using the methods ApplicationStartTransient
, ApplicationStartTemporary
, or ApplicationStartPermanent
of the gen.Node
interface.
Finally, the LogLevel
field allows you to set the logging level for the entire group of processes. If this option is not specified, the node's logging level will be used. However, individual processes can override this setting by specifying it in the ApplicationMemberSpec.Options
during the process startup specification.
Ergo Framework provides three startup modes for applications, each determining how the application behaves when one of its processes stops:
gen.ApplicationModeTemporary:
This is the default startup mode if no specific mode is indicated in the application's specification (gen.ApplicationSpec). In this mode, the termination of any individual process (for any reason) does not lead to the shutdown of the entire application. The application will only stop when all the processes in the application group, as defined in the Group field of the gen.ApplicationSpec, have stopped. The application will always stop with the reason gen.TerminateReasonNormal.
gen.ApplicationModeTransient:
The application stops only when one of its processes terminates unexpectedly (with a reason other than gen.TerminateReasonNormal or gen.TerminateReasonShutdown). In such cases, all running processes will receive an exit signal with the reason gen.TerminateReasonShutdown. Once all processes have stopped, the application will terminate with the reason that caused the initial process failure. If all processes complete normally, the application will stop with the reason gen.TerminateReasonNormal.
gen.ApplicationModePermanent:
In this mode, the termination of any process in the application, for any reason, will trigger the shutdown of the entire application. All running processes will receive an exit signal with the reason gen.TerminateReasonShutdown. Once all processes have stopped, the application will terminate with the reason that caused the shutdown. If all processes complete without errors, the application will stop with the reason gen.TerminateReasonNormal.
Processes that are part of the application group can launch child processes. These child processes will inherit the application's attributes, but their termination does not affect the application's logic; their role is informational. You can determine whether a process belongs to an application using the Info
method of the gen.Process
interface or the ProcessInfo
method of the gen.Node
interface. Both methods return a gen.ProcessInfo
structure, which contains the Application
field indicating the process's application affiliation.
In Ergo Framework, the gen.Node
interface provides several methods to start an application:
ApplicationStart
: Starts the loaded application in the mode specified by gen.ApplicationSpec.Mode
.
ApplicationStartTemporary
: Starts the application in gen.ApplicationModeTemporary
, regardless of the mode specified in the application's specification.
ApplicationStartTransient
: Starts the application in gen.ApplicationModeTransient
, regardless of the mode specified in the application's specification.
ApplicationStartPermanent
: Starts the application in gen.ApplicationModePermanent
, regardless of the mode specified in the application's specification.
Before starting the application's processes, the node checks the dependencies specified in gen.ApplicationSpec.Depends
. If the application depends on other applications, the node will attempt to start them first. Therefore, all dependent applications must either already be loaded or running before starting the application. If the dependencies are not met, the method will return the error gen.ErrApplicationDepends
.
Once all dependencies are satisfied, the node starts launching the processes according to the order specified in gen.ApplicationSpec.Group
. If any process fails to start, all previously started processes will be forcibly stopped using the Kill
method of the gen.Node
interface, and the startup process will return an error.
After all processes are successfully started, the Start
callback method of the gen.ApplicationBehavior
interface is called, and the application transitions to the running state.
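Putting the two stages together, loading and then starting an application might be sketched like this (passing an empty gen.ApplicationOptions value is an assumption; check the gen.Node interface for the exact signatures):

```go
// stage 1: load the application; the node calls Load and registers
// the returned specification under its Name
appName, err := node.ApplicationLoad(createApp())
if err != nil {
	panic(err)
}

// stage 2: start it in the mode defined by gen.ApplicationSpec.Mode
if err := node.ApplicationStart(appName, gen.ApplicationOptions{}); err != nil {
	panic(err)
}

// alternatively, override the mode from the specification:
// err = node.ApplicationStartPermanent(appName, gen.ApplicationOptions{})
```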
You can retrieve information about the application using the ApplicationInfo
method of the gen.Node
interface. This method returns a gen.ApplicationInfo
structure that contains summary information about the application and its current state.
To stop an application, the gen.Node
interface provides the ApplicationStop
method. When this method is called, the application sends an exit signal to all of its processes with the reason gen.TerminateReasonShutdown
and waits for them to stop completely.
If the processes do not stop within 5 seconds, a timeout error will be returned. If the ApplicationStop
method is called again while the application is still waiting for its processes to stop, it will return the error gen.ErrApplicationStopping
.
For forcefully stopping the application's processes, the ApplicationStopForce
method is available in the gen.Node
interface. In this case, all of the application's processes will be forcibly terminated using the Kill
method of the gen.Node
interface.
Additionally, the gen.Node
interface provides the ApplicationStopWithTimeout
method, which allows you to specify a custom timeout period for the application to stop.
Once all of the application's processes have stopped, the node will call the Terminate
method of the gen.ApplicationBehavior
interface, completing the shutdown process.
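A graceful stop with a fallback to forced termination might be sketched as follows (the timeout value is illustrative):

```go
// try a graceful shutdown, allowing 10 seconds instead of the default 5
err := node.ApplicationStopWithTimeout("my_app", 10*time.Second)
switch err {
case nil:
	// all processes stopped gracefully
case gen.ErrApplicationStopping:
	// a previous stop request is still in progress
default:
	// processes did not stop in time; terminate them forcibly
	node.ApplicationStopForce("my_app")
}
```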
This package implements functionality for working with WebSocket connections through two meta-processes: WebSocket Handler (for handling HTTP requests to create WebSocket connections) and WebSocket Connection (for managing the established WebSocket connection). This implementation depends on the external library https://github.com/gorilla/websocket, which is why it is placed in a separate package "ergo.services/meta/websocket"
. You can find the source code in the additional meta-process library repository at https://github.com/ergo-services/meta.
To handle incoming WebSocket connections, use the CreateHandler
function. This function returns an object implementing gen.MetaBehavior
and http.Handler
, needed for launching the meta-process and processing HTTP requests. It accepts websocket.HandlerOptions
as arguments:
ProcessPool
: Specifies a list of process names to handle WebSocket messages.
HandshakeTimeout
: Limits the time for the upgrade process.
EnableCompression
: Enables compression for WebSocket connections.
CheckOrigin
: Allows setting a function to verify the request's origin.
After creation, start the meta-process with SpawnMeta
. Each connection launches a new WebSocket Connection meta-process for handling.
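Inside an actor, this flow might be sketched as follows (the route "/ws" and the plain http.ServeMux wiring are illustrative; the handler can be mounted anywhere an http.Handler is accepted):

```go
// create the WebSocket Handler meta-process
handler := websocket.CreateHandler(websocket.HandlerOptions{})

// launch it; incoming WebSocket messages will be delivered to this
// actor (or to the processes listed in ProcessPool, if set)
if _, err := a.SpawnMeta(handler, gen.MetaOptions{}); err != nil {
	a.Log().Error("unable to spawn WebSocket handler: %s", err)
	return err
}

// the handler also implements http.Handler, so it can be mounted directly
mux := http.NewServeMux()
mux.Handle("/ws", handler)
```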
This package implements the functionality of a meta-process for handling a WebSocket connection. It also allows initiating a WebSocket connection. To initiate a connection, use the CreateConnection
function, which takes websocket.ConnectionOptions
as an argument:
type ConnectionOptions struct {
	Process           gen.Atom
	URL               url.URL
	HandshakeTimeout  time.Duration
	EnableCompression bool
}
This structure is similar to websocket.HandlerOptions
but includes an additional field, URL, where the WebSocket server address must be specified. For example:
opt := websocket.ConnectionOptions{
	URL: url.URL{Scheme: "ws", Host: "localhost:8080", Path: "/ws"},
}
During the execution of the CreateConnection
function, it attempts to establish a WebSocket connection with the specified server.
After creating the meta-process, you must launch it using the SpawnMeta
method from the gen.Process
interface. If an error occurs while launching the meta-process, you need to call the Terminate
method from the gen.MetaBehavior
interface to close the WebSocket connection created by CreateConnection
:
func (a *MyActor) HandleMessage(from gen.PID, message any) error {
	// ...
	opt := websocket.ConnectionOptions{
		URL: url.URL{Scheme: "ws", Host: "localhost:8080", Path: "/ws"},
	}
	wsconn, err := websocket.CreateConnection(opt)
	if err != nil {
		a.Log().Error("unable to connect to %s: %s", opt.URL.String(), err)
		return nil
	}
	id, err := a.SpawnMeta(wsconn, gen.MetaOptions{})
	if err != nil {
		// unable to spawn the meta-process; the connection must be closed
		wsconn.Terminate(err)
		a.Log().Error("unable to spawn meta-process: %s", err)
		return nil
	}
	a.Log().Info("spawned meta-process %s to handle connection with %s", id, opt.URL.String())
	// ...
	return nil
}
When working with WebSocket connections, three types of messages are used:
websocket.MessageConnect
- Sent to the process when the meta-process handling the WebSocket connection is started:
type MessageConnect struct {
	ID         gen.Alias
	RemoteAddr net.Addr
	LocalAddr  net.Addr
}
The ID
field in websocket.MessageConnect
contains the identifier of the meta-process handling the WebSocket connection.
websocket.MessageDisconnect
- is sent to the process when a WebSocket connection is disconnected:
type MessageDisconnect struct {
	ID gen.Alias
}
The ID
field in websocket.MessageDisconnect
contains the identifier of the meta-process that handled the WebSocket connection. After the connection is terminated, the meta-process finishes its work and sends this message during its termination phase, signaling that the WebSocket connection has been closed.
websocket.Message
- is used for both receiving and sending WebSocket messages:
type Message struct {
	ID   gen.Alias
	Type MessageType
	Body []byte
}
ID
- identifier of the meta-process handling the WebSocket connection.
Body
- the message's body.
Type
- type of WebSocket message, according to the WebSocket specification. Several message types are available:
const (
	MessageTypeText   MessageType = 1
	MessageTypeBinary MessageType = 2
	MessageTypeClose  MessageType = 8
	MessageTypePing   MessageType = 9
	MessageTypePong   MessageType = 10
)
If the meta-process fails to send a message to the process, the WebSocket connection is terminated, and the meta-process itself also terminates.
To send a message over a WebSocket connection, use the Send
(or SendAlias
) method from the gen.Process
interface. The recipient should be the gen.Alias
of the meta-process handling the WebSocket connection.
The message must be of type websocket.Message
. If the Type
field is not explicitly set, the default is websocket.MessageTypeText
. The websocket.Message.ID
field is ignored when sending (it's used only for incoming messages).
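Handling these message types in the parent process and echoing data back might be sketched as follows:

```go
func (a *MyActor) HandleMessage(from gen.PID, message any) error {
	switch m := message.(type) {
	case websocket.MessageConnect:
		a.Log().Info("new connection %s (remote addr: %s)", m.ID, m.RemoteAddr)
	case websocket.MessageDisconnect:
		a.Log().Info("connection %s has been closed", m.ID)
	case websocket.Message:
		// echo the frame back over the same connection; Type is left
		// unset, so it defaults to websocket.MessageTypeText
		return a.Send(m.ID, websocket.Message{Body: m.Body})
	}
	return nil
}
```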
For a complete implementation of a WebSocket server, see the websocket project in the examples repository at https://github.com/ergo-services/examples.
The Ergo Framework's network stack implements network transparency using three components:
Registrar
- Manages node registration, including applications and additional parameters to help nodes find each other and establish connections. See the Service Discovering section for more details.
EDF
(Ergo Data Format) - A data format for network communication that automatically encodes and decodes messages when sent to remote nodes.
ENP
(Ergo Network Protocol) - A protocol for network interaction, facilitating asynchronous message exchange, synchronous requests, and remote process/application initiation between nodes.
Network stack parameters can be specified at node startup in gen.NodeOptions.Network
. This field, of type gen.NetworkOptions
, allows you to configure the following:
Mode
: default is gen.NetworkModeEnabled
. Use gen.NetworkModeHidden
to disable incoming connections, while still allowing outgoing ones. Use gen.NetworkModeDisabled
to completely disable networking.
Cookie
: sets a secret password for access control, used for both incoming and outgoing connections.
MaxMessageSize
: defines the maximum size of network messages.
Flags
: specify features available to remote nodes, like process or application launching.
Registrar
: by default, the node uses the built-in implementation (ergo.services/ergo/net/registrar
). For central registrar Saturn or for Erlang clusters, use respective libraries (Saturn or EPMD).
Handshake
: connection setup starts with remote node authentication (cookie verification) and protocol parameter exchange. Default uses the built-in implementation (ergo.services/ergo/net/handshake
).
Proto
: default protocol is Ergo Framework's ENP (ergo.services/ergo/net/proto
), but other protocols like the Erlang stack's DIST can be used.
Acceptors
: defines a set of acceptors for the node ([]gen.AcceptorOptions
). Each acceptor can configure port, TLS encryption, message size limits, and a custom cookie different from gen.NetworkOptions.Cookie
. You can also configure different Registrar/Handshake/Proto
sets for each acceptor, enabling simultaneous use of multiple network stacks, such as Ergo Framework and Erlang.
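For example, a node with a customized network stack might be started as follows (the cookie value and the message size limit are illustrative):

```go
var options gen.NodeOptions

// shared secret used for both incoming and outgoing connections
options.Network.Cookie = "secret-cookie"

// accept no incoming connections, but still allow outgoing ones
options.Network.Mode = gen.NetworkModeHidden

// cap the size of network messages (in bytes)
options.Network.MaxMessageSize = 1024 * 1024

node, err := ergo.StartNode("node1@localhost", options)
if err != nil {
	panic(err)
}
defer node.Stop()
```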
To connect to a remote node, simply send a message to a process running on that node. The node will automatically attempt to establish a network connection before sending the message.
You can also establish a connection explicitly by calling the GetNode
method from the gen.Network
interface. For example:
func main() {
	// ...
	node, err := ergo.StartNode("node1@localhost", gen.NodeOptions{})
	// ...
	remote, err := node.Network().GetNode("node2@localhost")
	// ...
}
The GetNode
method returns the gen.RemoteNode
interface, which provides information about the remote node. Additionally, it allows you to launch a process or application on that remote node:
type RemoteNode interface {
	Name() Atom
	Uptime() int64
	ConnectionUptime() int64
	Version() Version
	Info() RemoteNodeInfo

	Spawn(name Atom, options ProcessOptions, args ...any) (PID, error)
	SpawnRegister(register Atom, name Atom, options ProcessOptions, args ...any) (PID, error)

	// ApplicationStart starts an application on the remote node.
	// Starting mode is as defined in gen.ApplicationSpec.Mode
	ApplicationStart(name Atom, options ApplicationOptions) error
	// ApplicationStartTemporary starts an application on the remote node in temporary mode,
	// overriding the value of gen.ApplicationSpec.Mode
	ApplicationStartTemporary(name Atom, options ApplicationOptions) error
	// ApplicationStartTransient starts an application on the remote node in transient mode,
	// overriding the value of gen.ApplicationSpec.Mode
	ApplicationStartTransient(name Atom, options ApplicationOptions) error
	// ApplicationStartPermanent starts an application on the remote node in permanent mode,
	// overriding the value of gen.ApplicationSpec.Mode
	ApplicationStartPermanent(name Atom, options ApplicationOptions) error

	Creation() int64
	Disconnect()
}
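Once a connection is established, the gen.RemoteNode interface can be used as in the following sketch (the process name "worker" and the application name "my_app" are illustrative, and remote spawning must be permitted by the remote node's network flags):

```go
remote, err := node.Network().GetNode("node2@localhost")
if err != nil {
	panic(err)
}

node.Log().Info("connected to %s, uptime: %d sec", remote.Name(), remote.Uptime())

// spawn a process on the remote node
pid, err := remote.Spawn("worker", gen.ProcessOptions{})
if err == nil {
	node.Log().Info("spawned remote process %s", pid)
}

// start an application on the remote node, overriding its startup mode
if err := remote.ApplicationStartTemporary("my_app", gen.ApplicationOptions{}); err != nil {
	node.Log().Error("unable to start remote application: %s", err)
}
```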
For more details about remote process and application spawning, refer to the Remote Spawn Process and Remote Start Application sections.
You can also use the Node
method from the gen.Network
interface. It returns a gen.RemoteNode
if a connection to the remote node exists; otherwise, it returns gen.ErrNoConnection
.
To establish a connection with a remote node using specific parameters, use the GetNodeWithRoute
method from the gen.Network
interface. This method allows for more customized network connection settings.
This section explains the internal mechanisms by which a node interacts with the network stack. Understanding these can help you implement a custom network stack or components, such as a modified version of gen.NetworkHandshake
for customized authentication.
For incoming connections, the node starts acceptors with specified parameters. For outgoing connections, it independently creates TCP connections using parameters from the registrar (see Service Discovering). After establishing the TCP connection, the node leverages several interfaces to operate the network stack:
At the first stage, the node works with the gen.NetworkHandshake
interface. If a TCP connection was created by an acceptor, the Accept
method is called, initiating the remote node's authentication and the exchange of parameters for the network protocol. If the TCP connection was initiated by the node itself, the Start
method is used, which handles the authentication process and the exchange of network protocol parameters.
The Join
method is used to combine multiple TCP connections. In the ENP protocol, by default, 3 TCP connections are created and merged into one virtual connection between nodes, which improves network protocol performance. You can set the size of the TCP connection pool using the PoolSize
parameter in handshake.Options
(ergo.services/ergo/net/handshake
). The Erlang network stack's DIST protocol does not support this feature.
type NetworkHandshake interface {
	NetworkFlags() NetworkFlags
	// Start initiates the handshake process.
	Start(node NodeHandshake, conn net.Conn, options HandshakeOptions) (HandshakeResult, error)
	// Join attempts to join a new TCP connection to the existing
	// connection with the remote node
	Join(node NodeHandshake, conn net.Conn, id string, options HandshakeOptions) ([]byte, error)
	// Accept initiates the handshake process for a connection created by the remote node.
	Accept(node NodeHandshake, conn net.Conn, options HandshakeOptions) (HandshakeResult, error)
	// Version
	Version() Version
}
After successfully completing the handshake procedure, the node transfers control of the TCP connection to the network protocol using the gen.NetworkProto
interface. This occurs in two stages: first, the node calls the NewConnection
method, which returns the gen.Connection
interface for the created connection. The node then registers this connection in its internal routing mechanism with the remote node's name. If a connection with the same name already exists, the node automatically closes the TCP connection, as only one connection can exist between two nodes.
Upon successful registration, the node calls the Serve
method. The completion of this method indicates the termination of the connection with the remote node, effectively closing the connection.
type NetworkProto interface {
	// NewConnection
	NewConnection(core Core, result HandshakeResult, log Log) (Connection, error)
	// Serve connection. Argument dial is the closure to create a TCP connection,
	// invoking NetworkHandshake.Join inside to shortcut the handshake process
	Serve(conn Connection, dial NetworkDial) error
	// Version
	Version() Version
}
The gen.Connection
interface is used by the node for routing messages and requests to the remote node with which a connection has been established. If you are developing your own protocol, you will need to implement all the methods of this interface to handle communication between nodes.
type Connection interface {
	Node() RemoteNode

	// Methods for sending an async message to the remote process
	SendPID(from PID, to PID, options MessageOptions, message any) error
	SendProcessID(from PID, to ProcessID, options MessageOptions, message any) error
	SendAlias(from PID, to Alias, options MessageOptions, message any) error
	SendEvent(from PID, options MessageOptions, message MessageEvent) error
	SendExit(from PID, to PID, reason error) error
	SendResponse(from PID, to PID, ref Ref, options MessageOptions, response any) error

	// Target terminated
	SendTerminatePID(target PID, reason error) error
	SendTerminateProcessID(target ProcessID, reason error) error
	SendTerminateAlias(target Alias, reason error) error
	SendTerminateEvent(target Event, reason error) error

	// Methods for sending a sync request to the remote process
	CallPID(ref Ref, from PID, to PID, options MessageOptions, message any) error
	CallProcessID(ref Ref, from PID, to ProcessID, options MessageOptions, message any) error
	CallAlias(ref Ref, from PID, to Alias, options MessageOptions, message any) error

	// Links
	LinkPID(pid PID, target PID) error
	UnlinkPID(pid PID, target PID) error
	LinkProcessID(pid PID, target ProcessID) error
	UnlinkProcessID(pid PID, target ProcessID) error
	LinkAlias(pid PID, target Alias) error
	UnlinkAlias(pid PID, target Alias) error
	LinkEvent(pid PID, target Event) ([]MessageEvent, error)
	UnlinkEvent(pid PID, target Event) error

	// Monitors
	MonitorPID(pid PID, target PID) error
	DemonitorPID(pid PID, target PID) error
	MonitorProcessID(pid PID, target ProcessID) error
	DemonitorProcessID(pid PID, target ProcessID) error
	MonitorAlias(pid PID, target Alias) error
	DemonitorAlias(pid PID, target Alias) error
	MonitorEvent(pid PID, target Event) ([]MessageEvent, error)
	DemonitorEvent(pid PID, target Event) error

	RemoteSpawn(name Atom, options ProcessOptionsExtra) (PID, error)
	Join(c net.Conn, id string, dial NetworkDial, tail []byte) error
	Terminate(reason error)
}
A supervisor in Ergo Framework is a specialized actor responsible for managing child processes. Its main tasks include starting child processes, monitoring their life cycles, and restarting them if necessary according to the chosen restart strategy.
act.Supervisor
implements the low-level gen.ProcessBehavior
interface. Similar to act.Actor
, it has the embedded gen.Process
interface. This means that an object based on act.Supervisor
has access to all the methods provided by the gen.Process
interface.
To create a supervisor actor, you simply embed act.Supervisor
into your object. Additionally, you need to create a factory function for the supervisor. Here is an example:
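A minimal sketch of such a factory and embedding (the type and function names are illustrative):

```go
// factory function: the node calls it to create a new instance
// of the supervisor actor
func factory_MySupervisor() gen.ProcessBehavior {
	return &MySupervisor{}
}

type MySupervisor struct {
	act.Supervisor // embedding act.Supervisor makes this a supervisor actor
}
```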
To work with your object, act.Supervisor
uses the act.SupervisorBehavior
interface. This interface defines a set of callback methods for initialization, handling asynchronous messages, and handling synchronous calls.
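Based on the callbacks described in this section, the interface's shape is roughly the following (a sketch reconstructed from the surrounding text; consult the act package sources for the authoritative definition):

```go
type SupervisorBehavior interface {
	// Init is mandatory and returns the supervisor specification
	Init(args ...any) (act.SupervisorSpec, error)

	// optional callbacks for asynchronous messages and synchronous calls
	HandleMessage(from gen.PID, message any) error
	HandleCall(from gen.PID, ref gen.Ref, request any) (any, error)
	Terminate(reason error)

	// invoked on child start/stop when EnableHandleChild is set in the spec
	HandleChildStart(name gen.Atom, pid gen.PID) error
	HandleChildStop(name gen.Atom, pid gen.PID, reason error) error
}
```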
The Init
method is mandatory for implementation, while the other methods are optional. This method returns the supervisor specification act.SupervisorSpec
, which includes the following options:
Type: The type of supervisor (act.SupervisorType
).
Children: The list of child process specifications ([]act.SupervisorChildSpec
). This parameter also determines the order of child process startups.
Restart: Defines the restart strategy for child processes.
EnableHandleChild: Enables the invocation of the callback methods HandleChildStart
and HandleChildStop
during the startup and shutdown of child processes.
DisableAutoShutdown: Prevents the supervisor from stopping when all of its child processes have terminated normally.
Here is an example implementation of the act.SupervisorBehavior
interface for the supervisor MySupervisor
with one child process MyActor
:
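A sketch of the corresponding Init callback, assuming a factory_MyActor factory for the child process (the restart strategy value is illustrative; the field names follow the options described above):

```go
func (s *MySupervisor) Init(args ...any) (act.SupervisorSpec, error) {
	spec := act.SupervisorSpec{
		// default supervisor type: one child process per specification
		Type: act.SupervisorTypeOneForOne,
		Children: []act.SupervisorChildSpec{
			{
				Name:    "myactor", // the child starts with this registered name
				Factory: factory_MyActor,
			},
		},
	}
	// restart terminated children only if they stopped abnormally
	spec.Restart.Strategy = act.SupervisorStrategyTransient
	return spec, nil
}
```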
The type of supervisor in the SupervisorSpec
defines its behavior. In Ergo Framework, several supervisor types are implemented:
This is the default supervisor type. If the type is not specified in the SupervisorSpec
, this one will be used. Upon startup, the supervisor sequentially starts child processes as specified in act.SupervisorSpec.Children
. If an error occurs while starting the child processes, the supervisor process will terminate.
Each child process is launched with the associated name defined in the Name
option of the child process specification act.SupervisorChildSpec
. This means that only one process can be started for each child process specification. If this field is empty, the supervisor will use the Spawn
method from the gen.Process
interface. Otherwise, the SpawnRegister
method will be used.
Stopping a child process triggers the restart strategy for that process. Depending on the restart strategy defined by act.SupervisorSpec.Restart.Strategy
, the process will either be restarted or stopped. The restart strategy does not affect other processes.
The child process is restarted with the same set of arguments that were used in its previous startup.
The KeepOrder
option in act.SupervisorSpec.Restart
is ignored for this supervisor type. When a new child process specification is added via the AddChild
method (provided by act.Supervisor
), the supervisor automatically starts the new child process. When a child specification is disabled (using the DisableChild
method), the supervisor stops the corresponding child process with the reason gen.TerminateReasonShutdown
. The supervisor will automatically terminate if no child processes remain (with the reasons gen.TerminateReasonNormal
or gen.TerminateReasonShutdown
). You can enable the DisableAutoShutdown
option in act.SupervisorSpec
to prevent this.
When starting, this supervisor sequentially launches child processes according to the list of child process specifications. If an error occurs during the startup of child processes, the supervisor process terminates.
A child process is started with the associated name defined in the Name
option of act.SupervisorChildSpec
.
Stopping a child process in this supervisor triggers the restart strategy for all child processes according to act.SupervisorSpec.Restart.Strategy
.
If the KeepOrder
option is enabled, child processes are stopped in reverse order. Without KeepOrder
, they are stopped simultaneously. After stopping, the supervisor restarts them in the specified order.
Each child process is restarted with the same arguments as in the previous run.
When a new child process specification is added (using AddChild
), the supervisor automatically starts the child process. When disabling (DisableChild
), the supervisor stops the corresponding child process with the reason gen.TerminateReasonShutdown
.
The DisableAutoShutdown
option allows the supervisor to continue running even if all child processes have stopped (terminated with gen.TerminateReasonNormal
or gen.TerminateReasonShutdown
).
When starting, this supervisor sequentially launches child processes according to the list of child process specifications. If an error occurs during the startup of child processes, the supervisor process terminates.
A child process is started with the associated name defined in the Name
option of act.SupervisorChildSpec
.
When a child process terminates, the restart strategy is activated. All child processes starting from the last one (according to the order in act.SupervisorSpec.Children
) to the process that triggered the restart are stopped.
With the KeepOrder
option enabled, child processes are stopped sequentially in reverse order. If the option is disabled, child processes stop simultaneously.
After all child processes are stopped, the supervisor restarts them according to the order specified in act.SupervisorSpec.Children
.
Each child process is restarted with the same set of arguments used in the previous startup.
Each child process is launched with the associated name defined in the Name
option of the child process specification act.SupervisorChildSpec
.
This type of supervisor is a simplified version of act.SupervisorTypeOneForOne
. It does not start child processes on startup. Instead, child processes are started using the StartChild
method.
As in act.SupervisorTypeOneForOne
, the termination of a child process triggers the restart strategy for that process. Depending on the strategy defined in act.SupervisorSpec.Restart.Strategy
, the process will either be restarted or stopped. The restart strategy does not affect other processes.
Child processes in this supervisor are launched without an associated name, and multiple processes can be started from one child process specification.
The KeepOrder
option in act.SupervisorSpec.Restart
is ignored for this supervisor type.
The DisableAutoShutdown
option is also ignored, and stopping all child processes does not result in the termination of the supervisor process.
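As an illustrative sketch of this supervisor type (the Type field name in act.SupervisorSpec is an assumption based on the constants mentioned here; MyPool and factoryWorker are hypothetical):

```go
// Sketch: a simple-one-for-one supervisor. It starts nothing at init;
// children are spawned on demand via StartChild.
func (s *MyPool) Init(args ...any) (act.SupervisorSpec, error) {
	return act.SupervisorSpec{
		Type: act.SupervisorTypeSimpleOneForOne, // assumed field name
		Children: []act.SupervisorChildSpec{
			{Name: "worker", Factory: factoryWorker},
		},
	}, nil
}

// Elsewhere: multiple processes can be started from the single
// "worker" specification, each with its own arguments.
//   sup.StartChild("worker", taskID)
//   sup.StartChild("worker", otherTaskID)
```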
This specification describes the parameters for starting a child process. This is done using act.SupervisorChildSpec
:
Name: The specification's name, used as the associated name for the process (except for act.SupervisorTypeSimpleOneForOne
).
Factory, Options, Args: Parameters needed to launch a new process.
Significant: Determines the importance of the child process. The termination of a significant process may lead to the supervisor and all child processes stopping, depending on the restart strategy:
act.SupervisorStrategyTemporary
: Termination of a significant child process leads to the termination of the supervisor and all children.
act.SupervisorStrategyTransient
: Non-crash termination (e.g., gen.TerminateReasonNormal
or gen.TerminateReasonShutdown
) of a significant child process leads to the termination of the supervisor and all children, while a crash triggers the restart strategy.
act.SupervisorStrategyPermanent
: The Significant
field is ignored, and termination always triggers the restart strategy.
Additionally, the Significant
field is ignored in supervisors of types act.SupervisorTypeOneForOne
and act.SupervisorTypeSimpleOneForOne
.
The Restart
option in act.SupervisorSpec
is of type act.SupervisorRestart
and defines parameters for the restart strategy:
Strategy: Specifies the restart strategy type:
act.SupervisorStrategyTransient
: A crash triggers a restart. If the process terminates normally (e.g., gen.TerminateReasonNormal
or gen.TerminateReasonShutdown
), the restart strategy is not activated. This is the default strategy.
act.SupervisorStrategyTemporary
: No restart occurs, even in the event of a crash.
act.SupervisorStrategyPermanent
: Any termination triggers a restart, ignoring DisableAutoShutdown
.
Intensity and Period: Define the maximum number of restart activations allowed within a given period (in seconds). If this limit is exceeded, the supervisor terminates all child processes and itself, with the reason act.ErrSupervisorRestartsExceeded
.
KeepOrder: Specifies the order in which child processes are stopped when the restart strategy is activated. If this option is enabled, processes will be stopped sequentially in reverse order. By default, this option is disabled, meaning that all child processes are stopped simultaneously. Processes are restarted only after all child processes have fully stopped (either all processes for act.SupervisorTypeAllForOne
or part of the processes for act.SupervisorTypeRestForOne
).
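Put together, the restart parameters described above could be filled in like this (a sketch; the numeric values are arbitrary):

```go
// Sketch: restart strategy configuration per the fields described above.
spec := act.SupervisorSpec{
	Restart: act.SupervisorRestart{
		Strategy:  act.SupervisorStrategyTransient, // default: restart on crash only
		Intensity: 5,    // allow at most 5 restart activations...
		Period:    10,   // ...within a 10-second window
		KeepOrder: true, // stop children sequentially in reverse order
	},
}
```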
act.Supervisor
StartChild(name gen.Atom, args...)
: Starts a child process by its specification name. If arguments (args
) are provided, they are updated in the specification for future restarts.
AddChild(child act.SupervisorChildSpec)
: Adds a child specification and automatically starts the process.
EnableChild(name gen.Atom)
: Enables and starts a previously disabled child process.
DisableChild(name gen.Atom)
: Disables the specification and stops the running process.
Children()
: Returns a list of []act.SupervisorChild
with data on specifications and running processes, sorted according to the child process specification list.
During initialization, the supervisor defines the initial set of child processes, the supervisor type, and the restart strategy for the child processes. act.Supervisor
provides several methods for managing child processes dynamically and obtaining information about running processes:
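For example, a supervisor could add and later disable a child at runtime roughly as follows (a sketch using the methods listed above; MySup is hypothetical and error handling is elided):

```go
// Sketch: dynamic child management from inside the supervisor.
func (s *MySup) HandleMessage(from gen.PID, message any) error {
	// adding a spec starts the child process automatically
	s.AddChild(act.SupervisorChildSpec{
		Name:    "extra",
		Factory: factoryMyActor,
	})

	// disabling stops the child with reason gen.TerminateReasonShutdown
	s.DisableChild("extra")

	// list specs and running processes in specification order
	for _, child := range s.Children() {
		s.Log().Info("child: %v", child)
	}
	return nil
}
```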
In Ergo Framework, a flexible logging system is implemented that allows multiple loggers to exist within a node. These loggers can be configured to receive specific logging levels.
For logging purposes, the gen.Node
and gen.Process
interfaces provide the Log
method. This method returns the gen.Log
interface, which is used for logging messages within the framework.
Using the gen.Log
interface, you can manage the logging level not only for the node but also for each individual process (including meta-processes). There are six standard logging levels that can be set using the SetLevel
method:
gen.LogLevelDebug
gen.LogLevelInfo
gen.LogLevelWarning
gen.LogLevelError
gen.LogLevelPanic
gen.LogLevelDisabled
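For instance, a process can change its own logging level and use the corresponding logging methods (a minimal sketch):

```go
// Sketch: adjusting the log level of the current process at runtime.
func (a *MyActor) HandleMessage(from gen.PID, message any) error {
	if err := a.Log().SetLevel(gen.LogLevelDebug); err != nil {
		return err
	}
	a.Log().Debug("debug logging enabled for this process only")
	return nil
}
```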
Additionally, there are specialized logging levels:
gen.LogLevelDefault
: This is the default logging level. When a node starts with this level, it defaults to gen.LogLevelInfo
. For applications or processes, the logging level inherits from the node. Meta-processes inherit the logging level of their parent process.
gen.LogLevelTrace
: This level is used exclusively for deep debugging. To avoid accidental activation, this level cannot be set through the SetLevel
method of the gen.Log
interface. Instead, it can only be enabled during the startup of a node or process through gen.NodeOptions.Log.Level
or gen.ProcessOptions.LogLevel
.
By default, the logging level for the node is set to gen.LogLevelInfo
. Processes inherit the node's logging level at startup unless explicitly set in gen.ProcessOptions
.
The SetLogger
method allows you to restrict logging to a specified logger. If no logger is specified, the log message will be delivered to all registered loggers. You can retrieve the list of registered loggers using the Loggers
method of the gen.Node
interface.
The gen.LogLevelDisabled
level can be used to temporarily disable logging, either for the entire node or a specific process.
Furthermore, the gen.Node
interface provides two methods for process-specific logging control:
ProcessLogLevel(pid gen.PID)
: Retrieves the current logging level of a specific process.
SetProcessLogLevel(pid gen.PID, level gen.LogLevel)
: Sets the desired logging level for a specific process.
By default, a node in Ergo Framework uses a standard logger, and its parameters can be configured during node startup via gen.NodeOptions.Log.DefaultLogger
. You have the option to disable the default logger and use a process-based logger or integrate one (or more) loggers from an additional logging library.
If you wish to create your own logger, you simply need to implement the gen.LoggerBehavior
interface. This interface defines the behavior of a logger, allowing you to customize the logging system according to your application's requirements. By implementing this interface, you can control how log messages are processed, formatted, and routed, providing flexibility beyond the default logging system.
To add a new logger, the gen.Node
interface provides two methods:
LoggerAdd(name string, logger gen.LoggerBehavior, filter ...gen.LogLevel)
: This method allows you to add a logger (an object implementing the gen.LoggerBehavior
interface) to the node's logging system.
LoggerAddPID(pid gen.PID, name string, filter ...gen.LogLevel)
: This method adds a process as a logger. In this case, the node sends log messages to the specified process. The act.Actor
class provides a callback method HandleLog
for handling such log messages.
Both methods accept a filter
argument, which allows you to specify a set of log levels that the logger should handle. For example, if you want your logger to only receive messages at the gen.LogLevelPanic
and gen.LogLevelInfo
levels, you can list them in the filter:
If the filter
argument is not specified, the default set of log levels, gen.DefaultLogFilter
, will be used:
The logging methods of the gen.LoggerBehavior
interface will only be invoked for messages that match the specified logging levels.
Reusing a logger name is not allowed. If a logger with the same name already exists, the LoggerAdd
or LoggerAddPID
function will return the error gen.ErrTaken
. To resolve this, you must first remove the existing logger from the system using the appropriate method: LoggerDelete
or LoggerDeletePID
from the gen.Node
interface.
When a logger is removed from the system, the Terminate
callback method of the gen.LoggerBehavior
interface is called.
When starting a node, you can specify logging parameters using the gen.NodeOptions.Log.DefaultLogger
option. These parameters will be applied to the default logger, which outputs all log messages in text format to a specified output (by default, os.Stdout
). With these options, you can configure:
Output
: The interface used for log message output. To direct log messages to both standard output and a file simultaneously, you can use the io.MultiWriter
function from Golang's standard library:
TimeFormat
: Defines the format for the timestamp of log messages. You can use any existing format, such as time.DateTime
, or define your own custom format. By default, the timestamp is output in nanoseconds.
IncludeBehavior
: Adds the behavior of the process or meta-process to the log message.
IncludeName
: Adds the registered name of the process to the log message, if it exists.
Filter
: Specifies the logging levels that the default logger will handle. Leave this field empty to use the default set of levels (gen.DefaultLogFilter
).
Disable
: Disables the default logger.
Here is an example of how the standard logger's output might look with these options applied:
The first field is the timestamp of the log message, the second field is the log level, and the third field is the source of the message. The sources can come from various interfaces:
gen.Node.Log()
: Uses the node's name in the form of a CRC32 hash. Example: 9F35C982
gen.Process.Log()
: The string representation of the process identifier gen.PID
. Example: <9F35C982.0.1013>
gen.MetaProcess.Log()
: The string representation of the meta-process identifier gen.Alias
. Example: Alias#<9F35C982.123663.24065.0>
gen.Connection.Log()
: String representations of the local and remote nodes. Example: 9F35C982-90A29F11
If you want to manage log message streams using the familiar actor model, any process in a running node can serve as a logger. In the act.Actor
implementation, received log messages are passed as the message
argument to the HandleLog
callback method.
To add your actor to the node as a logger, use the LoggerAddPID
method from the gen.Node
interface:
Note that the LoggerAddPID
method of the gen.Node
interface is not available to a process during its initialization state, as the node has not yet recognized the process (see the Process section). Therefore, an actor cannot add itself as a logger within the Init
callback method of the act.ActorBehavior
interface.
When a logger process terminates, it will be automatically removed from the node's logging system.
To extend the capabilities of the Ergo Framework, two additional loggers have been implemented:
A colored console logger: enables colored highlighting for log messages in the console.
A rotating file logger: allows logging to a file with support for log file rotation at specified intervals.
You can disable the default logger and add both of these loggers, enabling colored output in the console while also saving logs to a file with rotation. You can find an example of using them in the demo
project.
type MySupervisor struct {
act.Supervisor
}
func factoryMySupervisor() gen.ProcessBehavior {
return &MySupervisor{}
}
type SupervisorBehavior interface {
gen.ProcessBehavior
// Init invoked on a spawn Supervisor process.
// This is a mandatory callback for the implementation
Init(args ...any) (SupervisorSpec, error)
// HandleChildStart invoked on a successful child process starting if option EnableHandleChild
// was enabled in gen.SupervisorSpec
HandleChildStart(name gen.Atom, pid gen.PID) error
// HandleChildTerminate invoked on a child process termination
// if option EnableHandleChild was enabled in gen.SupervisorSpec
HandleChildTerminate(name gen.Atom, pid gen.PID, reason error) error
// HandleMessage invoked if Supervisor received a message sent with gen.Process.Send(...).
// Non-nil value of the returning error will cause termination of this process.
// To stop this process normally, return gen.TerminateReasonNormal or
// gen.TerminateReasonShutdown. Any other - for abnormal termination.
HandleMessage(from gen.PID, message any) error
// HandleCall invoked if Supervisor got a synchronous request made with gen.Process.Call(...).
// Return nil as a result to handle this request asynchronously and
// to provide the result later using the gen.Process.SendResponse(...) method.
HandleCall(from gen.PID, ref gen.Ref, request any) (any, error)
// Terminate invoked on a termination supervisor process
Terminate(reason error)
// HandleInspect invoked on the request made with gen.Process.Inspect(...)
HandleInspect(from gen.PID) map[string]string
}
//
// Child process implementation
//
func factoryMyActor() gen.ProcessBehavior {
return &MyActor{}
}
type MyActor struct {
act.Actor
}
func (a *MyActor) Init(args ...any) error {
a.Log().Info("starting child process MyActor")
return nil
}
//
// gen.SupervisorBehavior implementation
//
func (s *MySupervisor) Init(args ...any) (act.SupervisorSpec, error) {
s.Log().Info("initialize supervisor...")
spec := act.SupervisorSpec{
Children: []act.SupervisorChildSpec{
act.SupervisorChildSpec{
Name: "myChild",
Factory: factoryMyActor,
},
},
}
return spec, nil
}
...
pid, err := node.Spawn(factoryMySupervisor, gen.ProcessOptions{})
node.Log().Info("MySupervisor is started successfully with pid %s", pid)
...
type Log interface {
Level() LogLevel
SetLevel(level LogLevel) error
Logger() string
SetLogger(name string)
Trace(format string, args ...any)
Debug(format string, args ...any)
Info(format string, args ...any)
Warning(format string, args ...any)
Error(format string, args ...any)
Panic(format string, args ...any)
}
type LoggerBehavior interface {
Log(message MessageLog)
Terminate()
}
node.LoggerAdd("my logger", myLogger, gen.LogLevelPanic, gen.LogLevelInfo)
[]gen.LogLevel{
LogLevelTrace,
LogLevelDebug,
LogLevelInfo,
LogLevelWarning,
LogLevelError,
LogLevelPanic,
}
var options gen.NodeOptions
logFile, _ := os.Create("out.log")
options.Log.DefaultLogger.Output = io.MultiWriter(os.Stdout, logFile)
TimeFormat: time.DateTime
IncludeBehavior: true
IncludeName: true
func (a *MyActor) Init(args ...any) error {
...
a.Log().Info("starting my actor")
return nil
}
...
pid, err := node.SpawnRegister("example", factoryMyActor, gen.ProcessOptions{})
node.Log().Info("started MyActor with PID %s", pid)
...
2024-07-31 07:53:57 [info] <6EE4478D.0.1017> 'example' main.MyActor: starting my actor
2024-07-31 07:53:57 [info] 6EE4478D: started MyActor with PID <6EE4478D.0.1017>
func factoryMyLogger() gen.ProcessBehavior {
return &MyLogger{}
}
type MyLogger struct {
act.Actor
}
func (ml *MyLogger) HandleLog(message gen.MessageLog) error {
switch m := message.Source.(type) {
case gen.MessageLogNode:
// handle message
case gen.MessageLogProcess:
// handle message
case gen.MessageLogMeta:
// handle message
case gen.MessageLogNetwork:
// handle message
}
...
return nil
}
...
pid, err := node.Spawn(factoryMyLogger, gen.ProcessOptions{})
node.LoggerAddPID(pid, "my actor logger")
What is a Process in Ergo Framework
In Ergo Framework, a process is a lightweight entity that operates on top of a goroutine and is based on the actor model. Each process has a mailbox for receiving incoming messages. The size of the mailbox is determined by the gen.ProcessOptions.MailboxSize
parameter when the process is started. By default, this value is set to zero, which makes the mailbox unlimited in size. Inside the mailbox, there are several queues—Main, System, Urgent, and Log—that prioritize the processing of messages.
Any running process can send messages and make synchronous calls to other processes, including those running on remote nodes. Additionally, a process can spawn new processes, both locally and on remote nodes (see Remote Spawn Process).
Processes in Ergo Framework are created and run within the scope of a node. Each process is assigned a unique identifier called gen.PID
when it is started. The gen.PID
consists of several key components:
Node
: name of the node on which the process is running.
ID
: unique number that identifies the process within its node.
Creation
: property that represents the "incarnation" of the process, meaning the specific instance of the node in which the process was created.
This combination of properties allows for:
Identification Across Nodes: node name is used for routing messages to remote nodes where the process resides. Once routed to the correct node, the process's unique sequential number is used for local message routing on the remote node.
Process Incarnation Handling: Creation
property allows the system to distinguish between different instances of a node. This property has the same value for all processes within a node's lifecycle. If the node is restarted, the Creation
value changes (it stores the timestamp of the node's startup).
If you attempt to send a message to a process with a gen.PID
that belongs to a previous incarnation of the node, an error gen.ErrProcessIncarnation
will be returned.
This mechanism ensures that processes are accurately identified and that message routing remains reliable, even across restarts of nodes.
A process can also be identified by a name associated with it. You can register such a name when starting a process using the SpawnRegister
method of the gen.Node
interface. Additionally, a process can register its name after it has started by calling the RegisterName
method of the gen.Process
interface.
If you want to change the associated name of a process, you first need to unregister the current name. This is because a process is limited to having only one associated name at a time. The UnregisterName
method of the gen.Process
interface allows you to remove the current name registration.
To send a message to a process by its registered name, you can use the SendProcessID
method of the gen.Process
interface. For making a synchronous request by name, the gen.Process
interface provides the CallProcessID
method.
These methods make it easy to send messages and interact with processes using their associated names, without needing to know their gen.PID
, especially when dealing with remote or dynamically created processes.
type myActor struct {
act.Actor
}
...
func (a *myActor) HandleMessage(from gen.PID, message any) error {
...
remoteProcess := gen.ProcessID{Name:"p1", Node:"node2@localhost"}
// sending async message to the remote process with associated name
a.SendProcessID(remoteProcess, "hello")
// making a sync request to the remote process with associated name
result, err := a.CallProcessID(remoteProcess, "hi")
...
// you can also use methods Send and Call
a.Send(remoteProcess, "hello")
result, err = a.Call(remoteProcess, "hi")
...
}
To send a message or make a synchronous request to a local process, you can use the universal methods Send
and Call
of the gen.Process
interface, respectively:
func (a *myActor) HandleMessage(from gen.PID, message any) error {
...
localProcess := gen.Atom("local")
a.Send(localProcess, "hello")
result, err := a.Call(localProcess, "hi")
...
}
In Ergo Framework, processes have the ability to create temporary identifiers known as aliases. A process can generate an unlimited number of aliases using the CreateAlias
method of the gen.Process
interface. These aliases remain valid throughout the lifetime of the process or until the process explicitly removes them using the DeleteAlias
method of the gen.Process
interface.
Aliases provide additional flexibility by allowing a process to be referenced by multiple temporary identifiers, useful for handling specific tasks or interactions where a unique, temporary process reference is needed.
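A sketch of the alias lifecycle (assuming CreateAlias returns a gen.Alias and an error):

```go
// Sketch: create a temporary alias, use it for a task, then remove it.
func (a *myActor) HandleMessage(from gen.PID, message any) error {
	alias, err := a.CreateAlias()
	if err != nil {
		return err
	}
	a.Log().Info("temporary alias for this process: %s", alias)
	// ... hand the alias out for a specific task ...
	a.DeleteAlias(alias) // invalidate it once the task is done
	return nil
}
```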
A process in Ergo Framework can be in one of the following states:
Init - This is the state of the process during its startup. At this stage, the process is not yet registered in the node, so only a limited set of methods from the gen.Process
interface are available—such as modifying process properties (environment variables, message compression settings, setting priorities for outgoing messages). Attempts to use methods like RegisterName
, RegisterEvent
, CreateAlias
, Call*
, Link*
, and Monitor*
will return the error gen.ErrNotAllowed
. However, methods like Send*
and Spawn*
remain available in this state.
Sleep - This state occurs when the process has no messages in its mailbox, and no goroutine is running for the process. The process is idle and consumes no CPU resources in this state.
Running - When a process receives a message in its mailbox, a goroutine is launched, and the appropriate callback is executed. Once the message has been processed and if there are no other messages in the mailbox, the process returns to the Sleep state, and the goroutine is terminated.
WaitResponse - The process transitions to this state from Running when it makes a synchronous request (Call*
). The process waits for a response to the request, and once received, it transitions back to the Running state.
Zombie - This is an intermediate state that occurs when a process was in the Running state but the node forcibly stopped the process using the Kill
method. In this state, the process cannot access any methods from the gen.Process
interface. After processing the current message and terminating the goroutine, the process transitions from Zombie to Terminated. In this state, the process no longer receives new messages.
Terminated - This is the final state of the process before it is removed from the node. Messages are not delivered to a process in this state.
You can retrieve the current state of a process using the ProcessInfo
method of the gen.Node
interface. Alternatively, you can use the State
method of the gen.Process
interface to check the state directly from the process.
These states define the lifecycle of a process and determine what actions it can perform at any given moment in Ergo Framework.
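A sketch of checking a process state from the node side (assuming ProcessInfo returns a gen.ProcessInfo and an error; the State field name is an assumption):

```go
// Sketch: inspecting the state of a process by its PID.
info, err := node.ProcessInfo(pid)
if err == nil {
	node.Log().Info("process %s is in state %v", pid, info.State) // assumed field
}
```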
Each process in Ergo Framework has its own set of environment variables, which can be retrieved using the EnvList()
method of the gen.Process
interface. To modify these variables, the SetEnv(name gen.Env, value any)
method is used; if value
is set to nil
, the environment variable with the specified name will be removed. Environment variable names are case-insensitive, allowing for flexible and dynamic configuration management within processes:
type myActor struct {
act.Actor
}
func (a *myActor) Init(args ...any) error {
...
a.SetEnv("var", 1) // set variable
a.SetEnv("Var", 2) // overwrite value, the name is case insensitive
v, _ := a.Env("vAr") // return 2 as a value
a.SetEnv("vaR", nil) // remove variable
...
}
The initialization of a process's environment variables occurs at the moment the process starts. The starting process first inherits the environment variables of the node, then the environment variables of the application and parent process, and finally, the environment variables specified in gen.ProcessOptions
are added.
To launch a new process in Ergo Framework, there are two methods:
Spawn(factory ProcessFactory, options ProcessOptions, args ...any) (PID, error)
SpawnRegister(register Atom, factory ProcessFactory, options ProcessOptions, args ...any) (PID, error)
Both methods are available in the gen.Node
and gen.Process
interfaces.
When using the SpawnRegister
method, the node checks whether the specified name register
can be assigned to the starting process. If the name is already taken by another process, this function will return the error gen.ErrTaken
.
Processes launched by the node (using the Spawn*
methods of the gen.Node
interface) are assigned a virtual parent process identifier, known as CorePID. You can retrieve this value using the CorePID
method of the gen.Node
interface.
If a process is launched using the Spawn*
methods of the gen.Process
interface, it becomes a child process. The Parent
method of the gen.Process
interface in the child process will return the identifier of the process that launched it.
Attempting to send an exit signal to a parent process using the SendExit
method of the gen.Process
interface will result in the error gen.ErrNotAllowed
.
Additionally, the parent process identifier is used in handling exit signals in act.Actor
(see the section on TrapExit in Actor).
Each process in Ergo Framework has a process leader identifier, in addition to the parent process identifier. You can retrieve this identifier using the Leader
method of the gen.Process
interface. When a process is started by the node, its process leader is set to the virtual identifier CorePID. If the process is started by another process, the child process inherits the leader identifier from its parent process.
During process startup, it is also possible to explicitly set a process leader using the Leader
option in gen.ProcessOptions
. This is commonly done when starting child processes in act.Supervisor
. While the process leader identifier does not influence the operational logic of processes, it serves an informational role and can be used within your application's logic as needed.
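Setting the leader explicitly at spawn time might look like this (a sketch; leaderPID is a hypothetical gen.PID):

```go
// Sketch: explicitly assigning a process leader for a child process.
opts := gen.ProcessOptions{
	Leader: leaderPID, // otherwise inherited from the parent process
}
pid, err := process.Spawn(factoryWorker, opts)
```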
To start a process in Ergo Framework, you need to specify a factory
argument, which is a factory function. This function must return an object that implements the gen.ProcessBehavior
interface. This is a low-level interface that is already implemented in general-purpose actors (such as act.Actor
, act.Supervisor
, etc.). Therefore, in most cases, you only need to embed the required actor into your object to satisfy the gen.ProcessBehavior
interface.
Example:
// based on act.Actor
type MyActor struct {
act.Actor // embed generic actor
}
func factoryMyActor() gen.ProcessBehavior {
return &MyActor{}
}
...
node.Spawn(factoryMyActor, gen.ProcessOptions{})
The options
parameter in the Spawn
or SpawnRegister
functions allows you to configure various parameters for the process being launched:
MailboxSize
: defines the size of the mailbox. If not specified, the mailbox will be unlimited in size.
Leader
: sets the leader of the process group. If not specified, the leader value is inherited from the parent process. This setting is informational and does not affect the process itself but can be used in your application's logic. Typically, this is set by supervisor processes.
Compression
: specifies the message compression settings when sending messages over the network to remote nodes. This option is ignored if the recipient is a local process.
SendPriority
: allows you to set the priority for sent messages. There are three priority levels:
gen.MessagePriorityNormal
: Messages are delivered to the Main queue of the recipient's mailbox.
gen.MessagePriorityHigh
: Messages are delivered to the System queue.
gen.MessagePriorityMax
: Highest-priority messages are delivered to the Urgent queue of the recipient's mailbox.
ImportantDelivery
: enables the important flag for messages sent to remote processes. When enabled, the remote node will send an acknowledgment that the message was delivered to the recipient's mailbox. If the message cannot be delivered, an error (e.g., gen.ErrProcessTerminated
, gen.ErrProcessUnknown
, gen.ErrProcessMailboxFull
) will be returned. You can also manage this flag using the SetImportantDelivery
method of the gen.Process
interface. This flag only applies to messages sent via the SendPID
, SendProcessID
, and SendAlias
methods or requests made using CallPID
, CallProcessID
, or CallAlias
methods of gen.Process
. To force a message to be sent with the important flag, use the SendImportant
or CallImportant
methods of gen.Process
.
Fallback
: specifies a fallback process to which messages will be redirected in the event that the recipient's mailbox is full. Each such message is wrapped in the gen.MessageFallback
structure. This option is ignored if the recipient process's mailbox is unlimited.
LinkParent
: Automatically creates a link with the parent process when the process is started (after successful initialization). This option is ignored if the process is started by the node.
LinkChild
: Automatically creates a link with a child process upon its successful startup (after successful initialization). This option is ignored if the process is started by the node.
Env
: Sets the environment variables for the process.
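Several of these options combined in one spawn call (a sketch; the values are arbitrary, and the Env map type is an assumption):

```go
// Sketch: configuring a process at spawn time.
opts := gen.ProcessOptions{
	MailboxSize:       10000,                   // bounded mailbox; 0 means unlimited
	SendPriority:      gen.MessagePriorityHigh, // outgoing messages go to the System queue
	ImportantDelivery: true,                    // request delivery acks from remote nodes
	LinkParent:        true,                    // link with the parent after init
	Env:               map[gen.Env]any{"mode": "worker"}, // assumed map type
}
pid, err := node.Spawn(factoryMyActor, opts)
```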
In Ergo Framework, a process can be terminated in the following ways:
Forcefully: process can be terminated by force using the Kill
method provided by the gen.Node
interface.
Sending an Exit Signal: you can stop a process by sending it an exit signal with a specified reason. This is done using the SendExit
method, which is available in both the gen.Node
and gen.Process
interfaces. The exit signal is always delivered with the highest priority and placed in the Urgent queue. In act.Actor
, there is a mechanism to intercept exit signals, allowing them to be handled like normal gen.MessageExit*
messages. More information on this mechanism can be found in the Actor section. Sending an exit signal to yourself, your parent process, or the process leader is not allowed. In such cases, the SendExit
method will return the error gen.ErrNotAllowed
.
Self-Termination: A process can terminate itself by returning a non-nil
value (of type error
) from its message handling callback. The returned value will be treated as the termination reason, and the process will be terminated. You can use gen.TerminateReasonNormal
or gen.TerminateReasonShutdown
for normal shutdown scenarios.
Panic: If a panic occurs during message processing, the process will be terminated with the reason gen.TerminateReasonPanic
.
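Self-termination from a callback can be sketched as:

```go
// Sketch: returning a termination reason from a message handler.
func (a *MyActor) HandleMessage(from gen.PID, message any) error {
	if message == "stop" {
		// normal shutdown: no restart under act.SupervisorStrategyTransient
		return gen.TerminateReasonShutdown
	}
	return nil // keep running
}
```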
Upon termination, all resources associated with the process are released:
All events registered by the process are unregistered.
Any associated name, if registered, is freed.
Aliases, links, and monitors created by the process are removed.
The process is removed from the logging system if it was previously registered as a logger.
All meta-processes launched by the process are terminated.
You can retrieve summary information about a process using the ProcessInfo
method of the gen.Node
interface or the Info
method of the gen.Process
interface. Both methods return a gen.ProcessInfo
structure containing detailed information about the process.
This package implements the gen.Registrar
interface and serves as a client library for etcd, a distributed key-value store that provides a reliable way to store data that needs to be accessed by a distributed system or cluster of machines. In addition to the primary Service Discovery function, it automatically notifies all connected nodes about cluster configuration changes and supports hierarchical configuration management with type conversion.
To create a client, use the Create
function from the etcd
package. The function requires a set of options etcd.Options
to configure the connection and behavior.
Then, set this client in the gen.NetworkOption.Registrar
options:
import (
    "ergo.services/ergo"
    "ergo.services/ergo/gen"
    "ergo.services/registrar/etcd"
)

func main() {
    var options gen.NodeOptions
    ...
    registrarOptions := etcd.Options{
        Endpoints: []string{"localhost:2379"},
        Cluster:   "production",
    }
    options.Network.Registrar = etcd.Create(registrarOptions)
    ...
    node, err := ergo.StartNode("demo@localhost", options)
    ...
}
Using etcd.Options
, you can specify:
Cluster
- The cluster name for your node (default: "default")
Endpoints
- List of etcd endpoints (default: ["localhost:2379"])
Username
- Username for etcd authentication (optional)
Password
- Password for etcd authentication (optional)
TLS
- TLS configuration for secure connections (optional)
InsecureSkipVerify
- Option to ignore TLS certificate verification
DialTimeout
- Connection timeout (default: 10s)
RequestTimeout
- Request timeout (default: 10s)
KeepAlive
- Keep-alive timeout (default: 10s)
When the node starts, it will register with the etcd cluster and maintain a lease to ensure automatic cleanup if the node becomes unavailable.
The etcd registrar provides hierarchical configuration management with four priority levels:
Cross-cluster node-specific: services/ergo/config/{cluster}/{node}/{item}
Cluster node-specific: services/ergo/cluster/{cluster}/config/{node}/{item}
Cluster-wide default: services/ergo/cluster/{cluster}/config/*/{item}
Global default: services/ergo/config/global/{item}
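As a sketch of how these priority levels translate into etcd keys, the hypothetical helper below builds the candidate lookup paths for one item, ordered from most to least specific (the actual lookup logic is internal to the registrar):

```go
package main

import "fmt"

// configPaths returns the etcd keys checked for a configuration item,
// ordered from highest to lowest priority. This is an illustrative
// helper, not part of the registrar's API.
func configPaths(cluster, node, item string) []string {
	return []string{
		// cross-cluster node-specific
		fmt.Sprintf("services/ergo/config/%s/%s/%s", cluster, node, item),
		// cluster node-specific
		fmt.Sprintf("services/ergo/cluster/%s/config/%s/%s", cluster, node, item),
		// cluster-wide default
		fmt.Sprintf("services/ergo/cluster/%s/config/*/%s", cluster, item),
		// global default
		fmt.Sprintf("services/ergo/config/global/%s", item),
	}
}

func main() {
	for _, p := range configPaths("production", "web1", "database.port") {
		fmt.Println(p)
	}
}
```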
The etcd registrar supports typed configuration values using string prefixes. Configuration values are stored as strings in etcd and automatically converted to the appropriate Go types when read by the registrar:
"int:123"
→ int64(123)
"float:3.14"
→ float64(3.14)
"bool:true"
→ bool(true)
, "bool:false"
→ bool(false)
"hello"
→ "hello"
(strings without prefixes remain unchanged)
Important: All configuration values must be stored as strings in etcd. The type conversion happens automatically when the registrar reads the configuration.
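The prefix convention can be illustrated with a small, self-contained re-implementation of the conversion (the registrar's actual decoding code may differ in details):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// decodeValue mirrors the prefix convention described above: "int:",
// "float:" and "bool:" prefixes select the Go type, anything else
// stays a string. Illustrative only, not the registrar's code.
func decodeValue(s string) any {
	switch {
	case strings.HasPrefix(s, "int:"):
		if v, err := strconv.ParseInt(s[len("int:"):], 10, 64); err == nil {
			return v
		}
	case strings.HasPrefix(s, "float:"):
		if v, err := strconv.ParseFloat(s[len("float:"):], 64); err == nil {
			return v
		}
	case strings.HasPrefix(s, "bool:"):
		if v, err := strconv.ParseBool(s[len("bool:"):]); err == nil {
			return v
		}
	}
	return s
}

func main() {
	fmt.Println(decodeValue("int:5432"))   // int64(5432)
	fmt.Println(decodeValue("float:0.75")) // float64(0.75)
	fmt.Println(decodeValue("bool:true"))  // true
	fmt.Println(decodeValue("info"))       // "info"
}
```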
Example configuration setup using etcdctl:
# Keys are quoted so the shell does not expand the * wildcard.
# Node-specific integer configuration (stored as string, converted to int64)
etcdctl put "services/ergo/cluster/production/config/web1/database.port" "int:5432"
# Cluster-wide float configuration (stored as string, converted to float64)
etcdctl put "services/ergo/cluster/production/config/*/cache.ratio" "float:0.75"
# Boolean configuration (stored as string, converted to bool)
etcdctl put "services/ergo/cluster/production/config/*/debug.enabled" "bool:true"
etcdctl put "services/ergo/cluster/production/config/web1/ssl.enabled" "bool:false"
# Application-specific configuration (visible to all nodes using wildcard format)
etcdctl put "services/ergo/cluster/production/config/*/myapp.cache.size" "int:256"
etcdctl put "services/ergo/cluster/production/config/*/client.timeout" "int:30"
# Global string configuration (stored and returned as string)
etcdctl put "services/ergo/config/global/log.level" "info"
Access configuration in your application:
registrar, err := node.Network().Registrar()
if err != nil {
    return err
}

// Get single configuration item
port, err := registrar.ConfigItem("database.port")
if err != nil {
    return err
}
// port will be int64(5432)

// Get multiple configuration items
config, err := registrar.Config("database.port", "cache.ratio", "debug.enabled", "log.level")
if err != nil {
    return err
}
// config["database.port"] = int64(5432)
// config["cache.ratio"] = float64(0.75)
// config["debug.enabled"] = bool(true)
// config["log.level"] = "info"
The etcd registrar registers a gen.Event
and generates messages based on changes in the etcd cluster within the specified cluster. This allows the node to stay informed of any updates or changes within the cluster, ensuring real-time event-driven communication and responsiveness to cluster configurations:
etcd.EventNodeJoined
- Triggered when another node is registered in the same cluster
etcd.EventNodeLeft
- Triggered when a node disconnects or its lease expires
etcd.EventApplicationLoaded
- Triggered when an application is loaded on a remote node
etcd.EventApplicationStarted
- Triggered when an application starts on a remote node
etcd.EventApplicationStopping
- Triggered when an application begins stopping on a remote node
etcd.EventApplicationStopped
- Triggered when an application is stopped on a remote node
etcd.EventConfigUpdate
- Triggered when the cluster configuration is updated
To receive such messages, you need to subscribe to etcd client events using the LinkEvent
or MonitorEvent
methods from the gen.Process
interface. You can obtain the name of the registered event using the Event
method from the gen.Registrar
interface:
type myActor struct {
    act.Actor
}

func (m *myActor) HandleMessage(from gen.PID, message any) error {
    reg, err := m.Node().Network().Registrar()
    if err != nil {
        m.Log().Error("unable to get Registrar interface: %s", err)
        return nil
    }
    ev, err := reg.Event()
    if err != nil {
        m.Log().Error("Registrar has no registered Event: %s", err)
        return nil
    }
    m.MonitorEvent(ev)
    return nil
}

func (m *myActor) HandleEvent(event gen.MessageEvent) error {
    switch msg := event.Message.(type) {
    case etcd.EventNodeJoined:
        m.Log().Info("Node %s joined cluster", msg.Name)
    case etcd.EventApplicationStarted:
        m.Log().Info("Application %s started on node %s", msg.Name, msg.Node)
    case etcd.EventConfigUpdate:
        m.Log().Info("Configuration %s updated", msg.Item)
        // Handle specific configuration changes
        if msg.Item == "ssl.enabled" {
            if enabled, ok := msg.Value.(bool); ok {
                m.Log().Info("SSL %s", map[bool]string{true: "enabled", false: "disabled"}[enabled])
            }
        }
    }
    return nil
}
To get information about available applications in the cluster, use the ResolveApplication
method from the gen.Resolver
interface, which returns a list of gen.ApplicationRoute
structures:
type ApplicationRoute struct {
    Node   Atom
    Name   Atom
    Weight int
    Mode   ApplicationMode
    State  ApplicationState
}
Name
- The name of the application
Node
- The name of the node where the application is loaded or running
Weight
- The weight assigned to the application in gen.ApplicationSpec
Mode
- The application's startup mode (gen.ApplicationModeTemporary
, gen.ApplicationModePermanent
, gen.ApplicationModeTransient
)
State
- The current state of the application (gen.ApplicationStateLoaded
, gen.ApplicationStateRunning
, gen.ApplicationStateStopping
)
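The Weight field can be used to bias request routing across nodes running the same application. A minimal sketch using a simplified local struct (not the gen.ApplicationRoute type itself, and not a policy mandated by the framework) might pick the highest-weight route:

```go
package main

import (
	"fmt"
	"sort"
)

// route is a simplified stand-in for the gen.ApplicationRoute fields
// used in this example.
type route struct {
	Node   string
	Weight int
}

// pickRoute returns the route with the highest weight. This is one
// plausible selection policy, not something the framework prescribes.
func pickRoute(routes []route) (route, bool) {
	if len(routes) == 0 {
		return route{}, false
	}
	// Sort descending by weight; the first element wins.
	sort.Slice(routes, func(i, j int) bool { return routes[i].Weight > routes[j].Weight })
	return routes[0], true
}

func main() {
	routes := []route{
		{Node: "web1@host", Weight: 10},
		{Node: "web2@host", Weight: 30},
		{Node: "web3@host", Weight: 20},
	}
	if best, ok := pickRoute(routes); ok {
		fmt.Println(best.Node) // web2@host
	}
}
```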
You can access the gen.Resolver
interface using the Resolver
method from the gen.Registrar
interface:
resolver := registrar.Resolver()

// Resolve application routes
routes, err := resolver.ResolveApplication("web-server")
if err != nil {
    return err
}

for _, route := range routes {
    log.Printf("Application %s running on node %s (weight: %d, state: %s)",
        route.Name, route.Node, route.Weight, route.State)
}
Get a list of all nodes in the cluster:
nodes, err := registrar.Nodes()
if err != nil {
    return err
}

for _, nodeName := range nodes {
    log.Printf("Node in cluster: %s", nodeName)
}
The etcd registrar organizes data in etcd using the following key structure:
services/ergo/cluster/{cluster}/
├── routes/ # Non-overlapping with config paths
│ ├── nodes/{node} # Node registration with lease (edf.Encode + base64)
│ └── applications/{app}/{node} # Application routes (edf.Encode + base64)
└── config/ # Configuration data (string + type prefixes)
├── {node}/{item} # Node-specific config
└── */{item} # Cluster-wide config
services/ergo/config/
├── {cluster}/{node}/{item} # Cross-cluster node config
└── global/{item} # Global config
Important Architecture Notes:
Routes (nodes/applications) use edf.Encode + base64
encoding and are stored in the routes/
subpath. Don't change anything there.
Configuration uses string encoding with type prefixes and is stored in the config/
subpath
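Routes are base64-wrapped because edf.Encode produces binary data, while the values in this layout are handled as strings. A small sketch of the wrapping step (encodeForEtcd is a hypothetical helper standing in for the registrar's internal handling, and the payload bytes are arbitrary):

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// encodeForEtcd wraps a binary payload (standing in for edf.Encode
// output) in base64 so it can be stored as an etcd string value.
// Hypothetical helper, not part of the registrar's API.
func encodeForEtcd(payload []byte) string {
	return base64.StdEncoding.EncodeToString(payload)
}

func main() {
	payload := []byte{0x83, 0x00, 0xff, 0x10} // arbitrary binary data
	stored := encodeForEtcd(payload)
	fmt.Println(stored)

	// Reading a route back reverses the transformation.
	raw, err := base64.StdEncoding.DecodeString(stored)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(raw)) // 4
}
```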
A fully featured example can be found at GitHub - Ergo Services Examples in the docker
directory.
This example demonstrates how to run multiple Ergo nodes using etcd as a registrar for service discovery. It showcases service discovery, actor communication, typed configuration management, and real-time configuration event monitoring across a cluster.
The etcd registrar includes comprehensive testing infrastructure:
Use the included Docker Compose setup for testing:
# Start etcd for testing
make start-etcd
# Run tests with coverage
make test-coverage
# Run integration tests only
make test-integration
# Clean up
make clean
For debugging and manual operations:
# Check cluster health
etcdctl --endpoints=localhost:12379 endpoint health
# List all keys in cluster
etcdctl --endpoints=localhost:12379 get --prefix "services/ergo/"
# Set configuration manually (values must be strings)
etcdctl --endpoints=localhost:12379 put \
"services/ergo/cluster/production/config/web1/database.timeout" "int:30"
etcdctl --endpoints=localhost:12379 put \
"services/ergo/cluster/production/config/web1/debug.enabled" "bool:true"
# Watch for changes
etcdctl --endpoints=localhost:12379 watch --prefix "services/ergo/cluster/production/"
The etcd registrar provides a robust, scalable solution for service discovery and configuration management in distributed Ergo applications, with the reliability and consistency guarantees of etcd.
To install the observer
tool, you need to have Golang compiler version 1.20 or higher. Run the following command:
$ go install ergo.services/tools/observer@latest
Available arguments for starting observer
:
-help
: displays information about the available arguments.
-version
: prints the current version of the Observer tool.
-host
: specifies the interface name for the Web server to run on (default: "localhost"
).
-port
: defines the port number for the Web server (default: 9911
).
-cookie
: sets the default cookie value used for connecting to other nodes.
If you are running observer
on a server for continuous operation, it is recommended to use the environment variable COOKIE
instead of the -cookie
argument. Using sensitive data in command-line arguments is insecure.
After starting observer
, it initially has no connections to other nodes, so you will be prompted to specify the node you want to connect to.
Once you establish a connection with a remote node, the Observer application main page will open, displaying information about that node.
If you have integrated the Observer application into your node, upon opening the Observer page, you will immediately land on the main page showing information about the node where the Observer application was launched.
On this tab, you will find general information about the node and the ability to manage its logging level. Changing the logging level only affects the node itself and any newly started processes, but it does not impact processes that are already running.
Graphs provide real-time information over the last 60 seconds, including the total number of processes, the number of processes in the running state, and memory usage data. Memory usage is divided into used, which indicates how much memory was reserved from the operating system, and allocated, which shows how much of that reserved memory is currently being used by the Golang runtime.
In addition to these details, you can view information about the available loggers on the node and their respective logging levels. For more details, refer to the Logging section. Environment variables will also be displayed here, but only if the ExposeEnvInfo
option was enabled in the gen.NodeOptions.Security
settings when the inspected node was started.
The Network tab displays information about the node's network stack.
The Mode indicates how the network stack was started (enabled, hidden, or disabled).
The Registrar section shows the properties of the registrar in use, including its capabilities. Embedded Server indicates whether the registrar is running in server mode, while the Server field shows the address and port number of the registrar with which the node is registered.
Additionally, the tab provides information about the default handshake and protocol versions used for outgoing connections.
The Flags section lists the set of flags that define the functionality available to remote nodes.
The Acceptors section lists the node's acceptors, with detailed information available for each. This list will be empty if the network stack is running in hidden mode.
Since the node can work with multiple network stacks simultaneously, some acceptors may have different registrar parameters and handshake/protocol versions. For an example of simultaneous usage of the Erlang and Ergo Framework network stacks, refer to the Erlang section.
The Connected Nodes section displays a list of active connections with remote nodes. For each connection, you can view detailed information, including the version of the handshake used when the connection was established and the protocol currently in use. The Flags section shows which features are available to the node when interacting with the remote node.
Since the ENP protocol supports a pool of TCP connections within a single network connection, you will find information about the Pool Size (the number of TCP connections). The Pool DSN field will be empty if this is an incoming connection for the node or if the protocol does not support TCP connection pooling.
Graphs provide a summary of the number of received/sent messages and network traffic over the last 60 seconds, offering a quick overview of communication activity and data flow.
On the Processes List tab, you can view general information about the processes running on the node. The number of processes displayed is controlled by the Start from and Limit parameters.
By default, the list is sorted by the process identifier. However, you can choose different sorting options:
Top Running: displays processes that have spent the most time in the running state.
Top Messaging: sorts processes by the number of sent/received messages in descending order.
Top Mailbox: helps identify processes with the highest number of messages in their mailbox, which can be an indication that the process is struggling to handle the load efficiently.
For each process, you can view brief information:
The Behavior field shows the type of object that the process represents.
Application field indicates the application to which the process belongs. This property is inherited from the parent, so all processes started within an application and their child processes will share the same value.
Mailbox Messages displays the total number of messages across all queues in the process's mailbox.
Running Time shows the total time the process has spent in the running state, which occurs when the process is actively handling messages from its queue.
By clicking on the process identifier, you will be directed to a page with more detailed information about that specific process.
All log messages from the node, processes, network stack, or meta-processes are displayed here. When you connect to the Observer via a browser, the Observer's backend sends a request to the inspector to start a log process with specified logging levels (this log process is visible on the main Info tab).
When you change the set of logging levels, the Observer's backend requests the start of a new log process (the old log process will automatically terminate).
To reduce the load on the browser, the number of displayed log messages is limited, but you can adjust this by setting the desired number in the Last field.
The Play/Pause button allows you to stop or resume the log process, which is useful if you want to halt the flow of log messages and focus on examining the already received logs in more detail.
This page displays detailed information about the process, including its state, uptime, and other key metrics.
The fallback parameters specify which process will receive redirected messages in case the current process's mailbox becomes full. However, if the Mailbox Size is unlimited, these fallback parameters are ignored.
The Message Priority field shows the priority level used for messages sent by this process.
Keep Network Order is a parameter applied only to messages sent over the network. It ensures that all messages sent by this process to a remote process are delivered in the same order as they were sent. This parameter is enabled by default, but it can be disabled in certain cases to improve performance.
The Important Delivery setting indicates whether the important flag is enabled for messages sent to remote nodes. Enabling this option forces the remote node to send an acknowledgment confirming that the message was successfully delivered to the recipient's mailbox.
The Compression parameters allow you to enable message compression for network transmissions and define the compression settings.
Graphs on this page help you assess the load on the process, displaying data over the last 60 seconds.
Additionally, you can find detailed information about any aliases, links, and monitors created by this process, as well as any registered events and started meta-processes.
The list of environment variables is displayed only if the ExposeEnvInfo
option was enabled in the node's gen.NodeOptions.Security
settings.
Additionally, on this page, you can send a message to the process, send an exit signal, or even forcibly stop the process using the kill command. These options are available in the context menu.
If the behavior of this process implements the HandleInspect
method, the response from the process to the inspect request will be displayed here. The Observer sends these requests once per second while you are on this tab.
In the example screenshot above, you can see the inspection of a process based on act.Pool
. Upon receiving the inspect request, it returns information about the pool of processes and metrics such as the number of messages processed.
The Log tab on the process information page displays a list of log messages generated by the specific process.
Please note that since the Observer uses a single stream for logging, any changes to the logging levels will also affect the content of the Log tab on the main page.
On this page, you'll find detailed information about the meta-process, along with graphs showing data for the last 60 seconds related to incoming/outgoing messages and the number of messages in its mailbox. The meta-process has only two message queues: main and system.
You can also send a message to the meta-process or issue an exit signal. However, it is not possible to forcibly stop the meta-process using the kill command.
If the meta-process's behavior implements the HandleInspect
method, the response from the meta-process to the inspect request will be displayed on this tab. The Observer sends this request once per second while you are on the tab.
On the Log tab of the meta-process, you will see log messages generated by that specific meta-process. Changing the logging levels will also affect the content of the Log tab on the main page.
Erlang network stack
This package implements the Erlang network stack, including the DIST protocol, ETF data format, EPMD registrar functionality, and the Handshake mechanism.
It is compatible with OTP-23 to OTP-27. The source code is available on the project's GitHub page at https://github.com/ergo-services/proto in the erlang23
directory.
Note that the source code is distributed under the Business Source License 1.1 and cannot be used for production or commercial purposes without a license, which can be purchased on the project's sponsor page.
The epmd
package implements the gen.Registrar
interface. To create it, use the epmd.Create
function with the following options:
Port: Registrar port number (default: 4369
).
EnableRouteTLS: Enables TLS for all gen.Route
responses on resolve requests. This is necessary if the Erlang cluster uses TLS.
DisableServer: Disables the internal server mode, useful when using the Erlang-provided epmd
service.
To use this package, include ergo.services/proto/erlang23/epmd
.
The handshake
package implements the gen.NetworkHandshake
interface. To create a handshake instance, use the handshake.Create
function with the following options:
Flags: Defines the supported functionality of the Erlang network stack. The default is set by handshake.DefaultFlags()
.
UseVersion5: Enables handshake version 5 mode (default is version 6).
To use this package, include ergo.services/proto/erlang23/handshake
.
The ergo.services/proto/erlang23/dist
package implements the gen.NetworkProto
and gen.Connection
interfaces. To create it, use the dist.Create
function and provide dist.Options
as an argument, where you can specify the FragmentationUnit
size in bytes. This value is used for fragmenting large messages. The default size is set to 65000
bytes.
To use this package, include ergo.services/proto/erlang23/dist
.
Erlang uses the ETF (Erlang Term Format) for encoding messages transmitted over the network. Due to differences in data types between Golang and Erlang, decoding received messages involves converting the data to their corresponding Golang types:
number -> int64
float number -> float64
big number -> big.Int from math/big, or to int64/uint64
map -> map[any]any
binary -> []byte
list -> etf.List ([]any)
tuple -> etf.Tuple ([]any) or a registered struct type
string -> []any (convert to string using etf.TermToString)
atom -> gen.Atom
pid -> gen.Pid
ref -> gen.Ref
ref (alias) -> gen.Alias
atom = true/false -> bool
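Since an Erlang string arrives as a []any of integer character codes, a conversion like the one etf.TermToString performs can be sketched as follows (charlistToString is an illustrative helper, not the library's implementation):

```go
package main

import "fmt"

// charlistToString converts a decoded Erlang string — a []any of
// int64 character codes — into a Go string. It reports false when
// the term contains anything other than integer codes.
func charlistToString(term []any) (string, bool) {
	runes := make([]rune, 0, len(term))
	for _, el := range term {
		code, ok := el.(int64)
		if !ok {
			return "", false
		}
		runes = append(runes, rune(code))
	}
	return string(runes), true
}

func main() {
	// The Erlang string "hi" decodes to the character codes 104, 105.
	term := []any{int64(104), int64(105)}
	if s, ok := charlistToString(term); ok {
		fmt.Println(s) // hi
	}
}
```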
When encoding data in the Erlang ETF format:
map -> map #{}
slice/array -> list []
struct -> map with field names as keys (considering etf: tags on struct fields)
registered type of struct -> tuple with the first element being the registered struct name, followed by field values in order
[]byte -> binary
int*/float*/big.Int -> number
string -> string
gen.Atom -> atom
gen.Pid -> pid
gen.Ref -> ref
gen.Alias -> ref (alias)
bool -> atom true/false
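The struct-to-map rule, including the effect of etf: tags on field names, can be illustrated with a small reflection-based sketch (structToMap is a hypothetical helper, not the codec's code):

```go
package main

import (
	"fmt"
	"reflect"
)

// structToMap demonstrates the struct -> map encoding rule: field
// names become keys, with an `etf:` tag overriding the name.
// Illustration only, not the actual ETF encoder.
func structToMap(v any) map[string]any {
	out := map[string]any{}
	rv := reflect.ValueOf(v)
	rt := rv.Type()
	for i := 0; i < rt.NumField(); i++ {
		f := rt.Field(i)
		key := f.Name
		if tag := f.Tag.Get("etf"); tag != "" {
			key = tag // etf: tag wins over the field name
		}
		out[key] = rv.Field(i).Interface()
	}
	return out
}

// Config is a hypothetical struct used only for this illustration.
type Config struct {
	Host string `etf:"host"`
	Port int
}

func main() {
	m := structToMap(Config{Host: "localhost", Port: 4369})
	fmt.Println(m["host"], m["Port"])
}
```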
You can also use the functions etf.TermIntoStruct
and etf.TermProplistIntoStruct
for decoding data. These functions take into account etf:
tags on struct fields, allowing the values to map correctly to the corresponding struct fields when decoding proplist
data.
To automatically decode data into a struct, you can register the struct type using etf.RegisterTypeOf
. This function takes the object of the type being registered and decoding options etf.RegisterTypeOptions
. The options include:
Name
- The name of the registered type. By default, the type name is taken using the reflect
package in the format #/pkg/path/TypeName
Strict
- Determines whether the data must strictly match the struct. If disabled, non-matching data will be decoded into any
.
To be automatically decoded, the data sent from Erlang must be a tuple, with the first element being an atom whose value matches the type name registered in Golang. For example:
type MyValue struct {
    MyString string
    MyInt    int32
}

...

// register type MyValue with name "myvalue"
etf.RegisterTypeOf(MyValue{}, etf.RegisterTypeOptions{Name: "myvalue", Strict: true})
...
The values sent by an Erlang process should be in the following format:
> erlang:send(Pid, {myvalue, "hello", 123}).
If you want to use the Erlang network stack by default in your node, you need to specify this in gen.NetworkOptions
when starting the node:
import (
    "fmt"

    "ergo.services/ergo"
    "ergo.services/ergo/gen"
    "ergo.services/proto/erlang23/dist"
    "ergo.services/proto/erlang23/epmd"
    "ergo.services/proto/erlang23/handshake"
)

func main() {
    var options gen.NodeOptions

    // set cookie
    options.Network.Cookie = "123"

    // set Erlang Network Stack for this node
    options.Network.Registrar = epmd.Create(epmd.Options{})
    options.Network.Handshake = handshake.Create(handshake.Options{})
    options.Network.Proto = dist.Create(dist.Options{})

    // starting node
    node, err := ergo.StartNode(gen.Atom(OptionNodeName), options)
    if err != nil {
        fmt.Printf("Unable to start node '%s': %s\n", OptionNodeName, err)
        return
    }
    node.Wait()
}
In this case, all outgoing and incoming connections will be handled by the Erlang network stack. For a complete example, you can refer to the repository at https://github.com/ergo-services/examples, specifically the erlang
project.
If you want to maintain the ability to accept connections from Ergo nodes while using the Erlang network stack as a main one, you need to add an acceptor in the gen.NetworkOptions
settings:
import (
    "fmt"

    "ergo.services/ergo"
    "ergo.services/ergo/gen"

    // Ergo Network Stack
    hs "ergo.services/ergo/net/handshake"
    "ergo.services/ergo/net/proto"
    "ergo.services/ergo/net/registrar"

    // Erlang Network Stack
    "ergo.services/proto/erlang23/dist"
    "ergo.services/proto/erlang23/epmd"
    "ergo.services/proto/erlang23/handshake"
)

func main() {
    ...
    acceptorErlang := gen.AcceptorOptions{}
    acceptorErgo := gen.AcceptorOptions{
        Registrar: registrar.Create(registrar.Options{}),
        Handshake: hs.Create(hs.Options{}),
        Proto:     proto.Create(),
    }
    options.Network.Acceptors = append(options.Network.Acceptors,
        acceptorErlang, acceptorErgo)

    // starting node
    node, err := ergo.StartNode(gen.Atom(OptionNodeName), options)
Please note that if the list of acceptors is empty when starting the node, it will launch an acceptor with the network stack using Registrar
, Handshake
, and Proto
from gen.NetworkOptions
.
If you set the options.Network.Acceptors
, you must explicitly define the parameters for all necessary acceptors. In the example, acceptorErlang
is created with empty gen.AcceptorOptions
(the Erlang stack from gen.NetworkOptions
will be used), while for acceptorErgo
, the Ergo Framework stack (Registrar
, Handshake
, and Proto
) is explicitly defined.
In this example, you can establish incoming and outgoing connections using the Erlang network stack. However, the Ergo Framework network stack can only be used for incoming connections. To create outgoing network connections using the Ergo Framework stack, you need to configure a static route for a group of nodes by defining a match pattern:
...
// starting node
node, err := ergo.StartNode(gen.Atom(OptionNodeName), options)

// add static route
route := gen.NetworkRoute{
    Resolver: acceptorErgo.Registrar.Resolver(),
}
match := ".ergonodes.local"
if err := node.Network().AddRoute(match, route, 1); err != nil {
    panic(err)
}
For more detailed information, please refer to the Static Routes section.
If your cluster primarily uses the Ergo Framework network stack by default and you want to enable interaction with Erlang nodes, you'll need to add an acceptor using the Erlang network stack. Additionally, you must define a static route for Erlang nodes using a match pattern:
import (
    "fmt"

    "ergo.services/ergo"
    "ergo.services/ergo/gen"
    "ergo.services/proto/erlang23/dist"
    "ergo.services/proto/erlang23/epmd"
    "ergo.services/proto/erlang23/handshake"
)

func main() {
    var options gen.NodeOptions

    // set cookie
    options.Network.Cookie = "123"

    // add acceptors
    acceptorErgo := gen.AcceptorOptions{}
    acceptorErlang := gen.AcceptorOptions{
        Registrar: epmd.Create(epmd.Options{}),
        Handshake: handshake.Create(handshake.Options{}),
        Proto:     dist.Create(dist.Options{}),
    }
    options.Network.Acceptors = append(options.Network.Acceptors,
        acceptorErgo, acceptorErlang)

    // starting node
    node, err := ergo.StartNode(gen.Atom(OptionNodeName), options)
    if err != nil {
        fmt.Printf("Unable to start node '%s': %s\n", OptionNodeName, err)
        return
    }

    // add static route
    route := gen.NetworkRoute{
        Resolver: acceptorErlang.Registrar.Resolver(),
    }
    if err := node.Network().AddRoute(".erlangnodes.local", route, 1); err != nil {
        panic(err)
    }

    node.Wait()
}
GenServer
The erlang23.GenServer
actor implements the low-level gen.ProcessBehavior
interface, enabling it to handle messages and synchronous requests from processes running on an Erlang node. The following message types are used for communication in Erlang:
regular messages - sent from Erlang using erlang:send
or the Pid ! message
syntax
cast-messages - sent from Erlang with gen_server:cast
call-requests - from Erlang made with gen_server:call
erlang23.GenServer
uses the erlang23.GenServerBehavior
interface to interact with your object. This interface defines a set of callback methods for your object, which allow it to handle incoming messages and requests. All methods in this interface are optional, meaning you can choose to implement only the ones relevant to your specific use case:
type GenServerBehavior interface {
    gen.ProcessBehavior

    Init(args ...any) error
    HandleInfo(message any) error
    HandleCast(message any) error
    HandleCall(from gen.PID, ref gen.Ref, request any) (any, error)
    Terminate(reason error)
    HandleEvent(message gen.MessageEvent) error
    HandleInspect(from gen.PID, item ...string) map[string]string
}
The callback method HandleInfo
is invoked when an asynchronous message is received from an Erlang process using erlang:send
or via the Send*
methods of the gen.Process
interface. The HandleCast
callback method is called when a cast message is sent using gen_server:cast
from an Erlang process. Synchronous requests sent with gen_server:call
or Call*
methods are handled by the HandleCall
callback method.
If your actor only needs to handle regular messages from Erlang processes, you can use the standard act.Actor
and process asynchronous messages in the HandleMessage
callback method.
To start a process based on erlang23.GenServer
, create an object embedding erlang23.GenServer
and implement a factory function for it.
Example:
import "ergo.services/proto/erlang23"
func factory_MyActor gen.ProcessBehavior {
return &MyActor{}
}
type MyActor struct {
erlang23.GenServer
}
To send a cast message, use the Cast
method of erlang23.GenServer
.
func (ma *MyActor) HandleInfo(message any) error {
    ...
    ma.Cast(Pid, "cast message")
    return nil
}
To send regular messages, use the Send*
methods of the embedded gen.Process
interface. Synchronous requests are made using the Call*
methods of the gen.Process
interface.
Like act.Actor
, an actor based on erlang23.GenServer
supports the TrapExit
functionality to intercept exit signals. Use the SetTrapExit
and TrapExit
methods of your object to manage this functionality, allowing your process to handle exit signals rather than terminating immediately when receiving them.
Ergo Service Registry and Discovery
saturn
is a tool designed to simplify the management of clusters of nodes created using the Ergo Framework. It offers the following features:
A unified registry for node registration within a cluster.
The ability to manage multiple clusters simultaneously.
The capability to manage the configuration of the entire cluster without restarting the nodes connected to Saturn (configuration changes are applied on the fly).
Notifications to all cluster participants about changes in the status of applications running on nodes connected to Saturn.
The source code of the saturn
tool is available on the project's page: https://github.com/ergo-services/tools.
To install saturn
, use the following command:
$ go install ergo.services/tools/saturn@latest
Available arguments:
-host
: Specifies the hostname to use for incoming connections.
-port
: Port number for incoming connections. The default value is 4499
.
-path
: Path to the configuration file saturn.yaml
.
-debug
: Enables debug mode for outputting detailed information.
-version
: Displays the current version of Saturn.
To start Saturn, a configuration file named saturn.yaml
is required. By default, Saturn expects this file to be located in the current directory. You can specify a different location for the configuration file using the -path
argument.
You can find an example configuration file in the project's Git repository.
The saturn.yaml
configuration file contains two root elements:
Saturn
: This section includes settings for the Saturn server.
You can configure the Token
for access by remote nodes and specify certificate files for TLS connections.
By default, a self-signed certificate is used. For clients to accept this certificate, they must enable the InsecureSkipVerify
option when creating the client.
Changes to this section require a restart of the Saturn server.
Clusters
: This section includes the configurations for clusters.
Changes in this section are automatically reloaded and sent to the registered nodes as updated configuration messages, without requiring a restart of Saturn.
The settings can target:
All nodes in all clusters.
Only nodes with a specified name in all clusters.
Only nodes within a specific cluster.
Only a node with a specified name within a specific cluster.
If the name of a configuration element ends with the suffix .file, the value of that element is treated as a file path. The content of this file is then sent to the nodes as a []byte.
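As an illustration of the .file mechanism, here is a minimal sketch in plain Go. The expandFileValues helper, the injected file reader, and the assumption that the .file suffix is stripped from the key delivered to nodes are all hypothetical; this is not Saturn's actual implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// expandFileValues returns a copy of a flat configuration map in which every
// key ending in ".file" is replaced by the referenced file's content as
// []byte. The file reader is injected so the sketch stays self-contained.
func expandFileValues(cfg map[string]any, readFile func(string) ([]byte, error)) (map[string]any, error) {
	out := make(map[string]any, len(cfg))
	for key, value := range cfg {
		if !strings.HasSuffix(key, ".file") {
			out[key] = value
			continue
		}
		path, ok := value.(string)
		if !ok {
			return nil, fmt.Errorf("%s: value must be a file path string", key)
		}
		content, err := readFile(path)
		if err != nil {
			return nil, fmt.Errorf("%s: %w", key, err)
		}
		// Assumption: the ".file" suffix is dropped and the content is
		// delivered under the base key as []byte.
		out[strings.TrimSuffix(key, ".file")] = content
	}
	return out, nil
}

func main() {
	cfg := map[string]any{
		"Var1":      123,
		"Var4.file": "./myfile.txt",
	}
	// A fake reader stands in for the real filesystem here.
	fake := func(path string) ([]byte, error) { return []byte("contents of " + path), nil }
	expanded, _ := expandFileValues(cfg, fake)
	fmt.Printf("%T %q\n", expanded["Var4"], expanded["Var4"])
}
```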
To configure settings for all nodes in all clusters, use the Clusters section in the saturn.yaml configuration file. Here, you can define global settings that will apply to every node within every cluster managed by Saturn:
Clusters:
  Var1: 123
  Var2: 12.3
  Var3: "123"
  Var4.file: "./myfile.txt"
  node@host:
    Var1: 456
In this example:
Var1, Var2, Var3, and Var4 will be applied to all nodes in all clusters.
However, the value of Var1 for nodes named node@host in any cluster will be overridden with the value 456.
If nodes are registered without specifying a Cluster in saturn.Options, they become part of the general cluster. Configuration for the general cluster should be provided in the Cluster@ section:
Clusters:
  Var1: 123
  Cluster@:
    Var1: 789
    node@host:
      Var1: 456
In the example above:
The variable Var1 is set to 789 for the general cluster (all nodes in the general cluster will receive Var1: 789).
However, for the node node@host within the general cluster, Var1 will be overridden to 456.
Thus, all nodes in the general cluster will inherit Var1: 789, except for node@host, which will specifically have Var1: 456. Other nodes in the general cluster will retain the default values from the Cluster@ section unless they are explicitly overridden in the configuration.
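The layered override behavior described above can be sketched in a few lines of plain Go. The effectiveConfig helper is illustrative and not part of Saturn; it only models the precedence: global settings, then the cluster section, then the node section.

```go
package main

import "fmt"

// effectiveConfig merges configuration layers for one node: global settings
// first, then the node's cluster section, then the node-specific section.
// Later layers override earlier ones, matching the precedence above.
func effectiveConfig(layers ...map[string]any) map[string]any {
	merged := make(map[string]any)
	for _, layer := range layers {
		for key, value := range layer {
			merged[key] = value
		}
	}
	return merged
}

func main() {
	global := map[string]any{"Var1": 123}
	generalCluster := map[string]any{"Var1": 789}
	nodeHost := map[string]any{"Var1": 456}

	// node@host in the general cluster gets the node-level override.
	fmt.Println(effectiveConfig(global, generalCluster, nodeHost)["Var1"]) // 456
	// Any other node in the general cluster gets the cluster value.
	fmt.Println(effectiveConfig(global, generalCluster)["Var1"]) // 789
}
```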
To specify settings for a particular cluster, use the element name Cluster@<cluster name> in the configuration file:
Clusters:
  Var1: 123
  Cluster@mycluster:
    Var1: 321
    node@host:
      Var1: 654
Saturn can manage multiple clusters simultaneously, but resolve requests from nodes are handled only within their own cluster.
The name of a registered node must be unique within its cluster.
When a node registers, it informs the registrar which cluster it belongs to. Additionally, the node reports the applications running on it. Other nodes in the same cluster receive notifications about the newly connected node and its applications. Any changes in application statuses are also reported to the registrar, which in turn notifies all participants in the cluster.
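A rough model of these registration rules in plain Go: node names must be unique within their cluster, while resolve requests never cross cluster boundaries. The clusterRegistry type below is illustrative only, not Saturn's API.

```go
package main

import (
	"errors"
	"fmt"
)

// clusterRegistry maps cluster name -> node name -> connection address.
type clusterRegistry struct {
	clusters map[string]map[string]string
}

func newClusterRegistry() *clusterRegistry {
	return &clusterRegistry{clusters: make(map[string]map[string]string)}
}

// register enforces that a node name is unique within its cluster.
func (r *clusterRegistry) register(cluster, node, addr string) error {
	nodes, ok := r.clusters[cluster]
	if !ok {
		nodes = make(map[string]string)
		r.clusters[cluster] = nodes
	}
	if _, taken := nodes[node]; taken {
		return errors.New("node name already registered in this cluster")
	}
	nodes[node] = addr
	return nil
}

// resolve only looks inside the requesting node's own cluster.
func (r *clusterRegistry) resolve(cluster, node string) (string, bool) {
	addr, ok := r.clusters[cluster][node]
	return addr, ok
}

func main() {
	reg := newClusterRegistry()
	reg.register("alpha", "node@host", "10.0.0.1:4499")
	reg.register("beta", "node@host", "10.0.0.2:4499") // same name, different cluster: allowed

	if err := reg.register("alpha", "node@host", "10.0.0.3:4499"); err != nil {
		fmt.Println("duplicate rejected:", err)
	}
	if addr, ok := reg.resolve("alpha", "node@host"); ok {
		fmt.Println(addr)
	}
}
```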
For more details, see the Saturn Client section.
A zero-dependency library for testing Ergo Framework actors with a fluent API
The Ergo Unit Testing Library makes testing actor-based systems simple and reliable. It provides specialized tools designed specifically for the unique challenges of testing actors, with zero external dependencies and an intuitive, readable API.
This guide takes you from simple actor tests to complex distributed scenarios. Here's the journey:
Your First Test - Simple echo and counter examples
Built-in Assertions - Simple tools for common checks
Basic Message Testing - Verify actors send the right messages
Basic Logging Testing - Verify your actors provide good observability
Configuration Testing - Test environment-driven behavior
Complex Message Patterns - Handle sophisticated message flows
Basic Process Spawning - Test actor creation and lifecycle
Event Inspection - Debug and analyze actor behavior
Actor Termination - Test error handling and graceful shutdowns
Exit Signals - Manage process lifecycles in supervision trees
Scheduled Operations - Test cron jobs and time-based behavior
Network & Distribution - Test multi-node actor systems
Dynamic Value Capture - Handle generated IDs, timestamps, and random data
Complex Workflows - Test multi-step business processes
Performance & Load Testing - Verify behavior under stress
Tip: The documentation follows this learning path. You can jump to advanced topics if needed, but starting from the beginning ensures you understand the foundations.
Traditional testing tools don't work well with actors. Here's why:
Regular code testing follows a simple pattern:
// Traditional testing - call function, check result
result := calculateTax(income, rate)
assert.Equal(t, 1500.0, result)
But actors are fundamentally different:
They run asynchronously - you send a message and the response comes later
They maintain state - previous messages affect future behavior
They spawn other actors - creating complex hierarchies
They communicate only via messages - no direct access to internal state
They can fail and restart - requiring lifecycle testing
Asynchronous Communication
// This doesn't work with actors:
actor.SendMessage("process_order")
result := actor.GetResult() // ❌ No direct way to get result
Message Flow Complexity
// An actor might send multiple messages to different targets:
actor.SendMessage("start_workflow")
// ❌ How do you verify it sent the right messages to the right places?
Dynamic Process Creation
// Actors spawn other actors with generated IDs:
actor.SendMessage("create_worker")
// ❌ How do you test the spawned worker when you don't know its PID?
State Changes Over Time
// Actor behavior changes based on message history:
actor.SendMessage("login", user1)
actor.SendMessage("login", user2)
actor.SendMessage("get_users")
// ❌ How do you verify the internal state without breaking encapsulation?
The Ergo Unit Testing Library addresses each of these challenges:
Instead of guessing what happened, the library automatically captures every actor operation:
actor.SendMessage("process_order")
// Library automatically captures:
// - What messages were sent
// - Which processes were spawned
// - What was logged
// - When the actor terminated
Express your test intentions clearly:
actor.SendMessage("create_user", userData)
actor.ShouldSend().To("database").Message(SaveUser{...}).Once().Assert()
actor.ShouldSpawn().Factory(userWorkerFactory).Once().Assert()
actor.ShouldLog().Level(Info).Containing("User created").Assert()
Capture and reuse dynamically generated values:
actor.SendMessage("create_session")
sessionResult := actor.ShouldSpawn().Once().Capture()
sessionPID := sessionResult.PID // Use the actual generated PID in further tests
Test state indirectly by verifying behavioral changes:
actor.SendMessage("login", user1)
actor.SendMessage("get_status")
actor.ShouldSend().To(user1).Message(StatusResponse{LoggedIn: true}).Assert()
Actor testing is complex enough without dependency management headaches:
No version conflicts - Works with any Go testing setup
No external tools - Everything needed is built-in
Simple imports - Just import "ergo.services/ergo/testing/unit"
Fast execution - No overhead from external libraries
Now that you understand why actor testing is different, let's explore the key concepts that make this library work.
Everything your actor does becomes a testable "event".
When you run this simple test:
actor.SendMessage(sender, "hello")
actor.ShouldSend().To(sender).Message("hello").Assert()
Here's what happens behind the scenes:
Your actor receives the message - Normal actor behavior
Your actor sends a response - Normal actor behavior
The library captures a SendEvent - Testing magic
You verify the captured event - Your assertion
The library automatically captures these events:
SendEvent - When your actor sends a message
SpawnEvent - When your actor creates child processes
LogEvent - When your actor writes log messages
TerminateEvent - When your actor shuts down
Events solve the fundamental challenge of testing asynchronous systems:
Instead of this (impossible):
actor.SendMessage("process_order")
result := actor.WaitForResult() // ❌ Actors don't work this way
You do this (works perfectly):
actor.SendMessage("process_order")
// Verify the actor did what it should do:
actor.ShouldSend().To("database").Message(SaveOrder{...}).Assert()
actor.ShouldSend().To("inventory").Message(CheckStock{...}).Assert()
actor.ShouldLog().Level(Info).Containing("Processing order").Assert()
The library provides a readable, chainable API that expresses test intentions clearly:
// Basic pattern: Actor.Should[Action]().Details().Assert()
actor.ShouldSend().To(recipient).Message(content).Once().Assert()
actor.ShouldSpawn().Factory(workerFactory).Times(3).Assert()
actor.ShouldLog().Level(Info).Containing("started").Assert()
actor.ShouldTerminate().WithReason(normalShutdown).Assert()
Benefits of the fluent API:
Readable - Tests read like English sentences
Discoverable - IDE autocomplete guides you through options
Flexible - Chain only the validations you need
Precise - Specify exactly what matters for each test
go get ergo.services/ergo/testing/unit
Let's start with the simplest possible actor test to understand the basics:
package main
import (
"testing"
"ergo.services/ergo/act"
"ergo.services/ergo/gen"
"ergo.services/ergo/testing/unit"
)
// EchoActor - receives a message and sends it back
type EchoActor struct {
act.Actor
}
func (e *EchoActor) HandleMessage(from gen.PID, message any) error {
// Simply echo the message back to sender
e.Send(from, message)
return nil
}
// Factory function to create the actor
func newEchoActor() gen.ProcessBehavior {
return &EchoActor{}
}
func TestEchoActor_BasicBehavior(t *testing.T) {
// 1. Create a test actor
actor, err := unit.Spawn(t, newEchoActor)
if err != nil {
t.Fatal(err)
}
// 2. Create a sender PID (who is sending the message)
sender := gen.PID{Node: "test", ID: 123}
// 3. Send a message to the actor
actor.SendMessage(sender, "hello world")
// 4. Verify the actor sent the message back
actor.ShouldSend().
To(sender). // Should send to the original sender
Message("hello world"). // Should send back the same message
Once(). // Should happen exactly once
Assert() // Check that it actually happened
}
This simple test demonstrates the core pattern:
unit.Spawn() - Creates a test actor in an isolated environment
actor.SendMessage() - Sends a message to your actor (just as production code would)
actor.ShouldSend() - Verifies that your actor sent the expected message
Key insight: You're not testing internal state - you're testing behavior. You verify what the actor does (sends messages) rather than what it contains (internal variables).
The testing library automatically captures everything your actor does:
Every message sent by your actor
Every process spawned by your actor
Every log message written by your actor
When your actor terminates
Then it provides fluent assertions to verify these captured events.
Let's test an actor that maintains some state:
type CounterActor struct {
act.Actor
count int
}
func (c *CounterActor) HandleMessage(from gen.PID, message any) error {
switch message {
case "increment":
c.count++
c.Send(from, c.count)
case "get":
c.Send(from, c.count)
case "reset":
c.count = 0
c.Send(from, "reset complete")
}
return nil
}
func TestCounterActor_StatefulBehavior(t *testing.T) {
actor, _ := unit.Spawn(t, func() gen.ProcessBehavior { return &CounterActor{} })
client := gen.PID{Node: "test", ID: 456}
// Test incrementing
actor.SendMessage(client, "increment")
actor.ShouldSend().To(client).Message(1).Once().Assert()
actor.SendMessage(client, "increment")
actor.ShouldSend().To(client).Message(2).Once().Assert()
// Test getting current value
actor.SendMessage(client, "get")
actor.ShouldSend().To(client).Message(2).Once().Assert()
// Test reset
actor.SendMessage(client, "reset")
actor.ShouldSend().To(client).Message("reset complete").Once().Assert()
// Verify reset worked
actor.SendMessage(client, "get")
actor.ShouldSend().To(client).Message(0).Once().Assert()
}
This shows how you test stateful behavior without accessing internal state - by observing how the actor's responses change over time.
Before diving into complex actor testing, let's cover the simple assertion utilities you'll use throughout your tests.
Why Built-in Assertions Matter for Actor Testing:
Actor tests often need to verify simple conditions alongside complex event assertions. Rather than forcing you to import external testing libraries (which could conflict with your project dependencies), the unit testing library provides everything you need:
func TestActorWithBuiltInAssertions(t *testing.T) {
actor, _ := unit.Spawn(t, newEchoActor)
// Use built-in assertions for simple checks
unit.NotNil(t, actor, "Actor should be created successfully")
unit.Equal(t, false, actor.IsTerminated(), "New actor should not be terminated")
// Combine with actor-specific assertions
actor.SendMessage(gen.PID{Node: "test", ID: 1}, "hello")
actor.ShouldSend().Message("hello").Once().Assert()
}
Equality Testing:
unit.Equal(t, expected, actual) // Values must be equal
unit.NotEqual(t, unexpected, actual) // Values must be different
Boolean Testing:
unit.True(t, condition) // Condition must be true
unit.False(t, condition) // Condition must be false
Nil Testing:
unit.Nil(t, value) // Value must be nil
unit.NotNil(t, value) // Value must not be nil
String Testing:
unit.Contains(t, "hello world", "world") // String must contain substring
Type Testing:
unit.IsType(t, "", actualValue) // Value must be of specific type
No Import Conflicts:
// ❌ This could cause version conflicts:
import "github.com/stretchr/testify/assert"
import "github.com/other/testing/lib"
// This always works:
import "ergo.services/ergo/testing/unit"
Consistent Error Messages: All assertions provide clear, consistent error messages that integrate well with the actor testing output.
Framework Agnostic:
Works with any Go testing setup - standard go test, IDE test runners, CI/CD systems, etc.
Now that you understand the fundamentals, let's explore message testing in more depth.
Now you'll learn how to test different aspects of actor behavior, building from simple to complex:
Fundamentals (You're here!)
Basic message sending and receiving
Simple process creation
Logging and observability
Configuration testing
Intermediate Skills
Complex message patterns
Event inspection and debugging
Actor lifecycle and termination
Error handling and recovery
Advanced Features
Scheduled operations (cron jobs)
Network and distribution
Performance and load testing
Logging is crucial for production actors - it provides visibility into what your actors are doing and helps with debugging. Let's learn how to test logging behavior.
Logging tests ensure:
Your actors provide sufficient information for monitoring
Debug information is available when needed
Log levels are respected (don't log debug in production)
Sensitive operations are properly audited
func TestGreeter_LogsWelcomeMessage(t *testing.T) {
actor, _ := unit.Spawn(t, newGreeter, unit.WithLogLevel(gen.LogLevelInfo))
actor.SendMessage(gen.PID{}, Welcome{Name: "Alice"})
// Verify the actor logged the welcome
actor.ShouldLog().
Level(gen.LogLevelInfo).
Containing("Welcome Alice").
Once().
Assert()
}
func TestDataProcessor_LogLevels(t *testing.T) {
actor, _ := unit.Spawn(t, newDataProcessor, unit.WithLogLevel(gen.LogLevelDebug))
actor.SendMessage(gen.PID{}, ProcessData{Data: "sample"})
// Should log at info level for important events
actor.ShouldLog().Level(gen.LogLevelInfo).Containing("Processing started").Once().Assert()
// Should log at debug level for detailed info
actor.ShouldLog().Level(gen.LogLevelDebug).Containing("Processing sample data").Once().Assert()
// Should never log at error level for normal operations
actor.ShouldLog().Level(gen.LogLevelError).Times(0).Assert()
}
func TestAuditLogger_SecurityEvents(t *testing.T) {
actor, _ := unit.Spawn(t, newAuditLogger)
actor.SendMessage(gen.PID{}, LoginAttempt{User: "admin", Success: false})
// Verify security events are properly logged
actor.ShouldLog().MessageMatching(func(msg string) bool {
return strings.Contains(msg, "SECURITY") &&
strings.Contains(msg, "admin") &&
strings.Contains(msg, "failed")
}).Once().Assert()
}
Structure your log messages to make them easy to test:
// Good: Structured, predictable format
log.Info("User login: user=%s success=%t", userID, success)
// Poor: Hard to test reliably
log.Info("User " + userID + " tried to login and it " + result)
Test log levels appropriately:
Error - Test that errors are logged when they occur
Warning - Test that concerning but non-fatal events are captured
Info - Test that important business events are recorded
Debug - Test that detailed troubleshooting info is available
Now that you've mastered the basics, let's tackle more complex testing scenarios.
Real actors often behave differently based on configuration. Let's test this:
The Spawn function creates an isolated testing environment for your actor. Unlike production actors that run in a complex node environment, test actors run in a controlled sandbox where every operation is captured for verification.
Key Benefits:
Isolation: Each test actor runs independently without affecting other tests
Deterministic: Test outcomes are predictable and repeatable
Observable: All actor operations are automatically captured as events
Configurable: Fine-tune the testing environment to match your needs
Example Actor:
type messageCounter struct {
act.Actor
count int
}
func (m *messageCounter) Init(args ...any) error {
m.count = 0
m.Log().Info("Counter initialized")
return nil
}
func (m *messageCounter) HandleMessage(from gen.PID, message any) error {
	// Switch on the value, not the type: the cases are string literals.
	switch message {
	case "increment":
		m.count++
		m.Send("output", CountChanged{Count: m.count})
		m.Log().Debug("Count incremented to %d", m.count)
		return nil
	case "get_count":
		m.Send(from, CountResponse{Count: m.count})
		return nil
	case "reset":
		m.count = 0
		m.Send("output", CountReset{})
		return nil
	}
	return nil
}
type CountChanged struct{ Count int }
type CountResponse struct{ Count int }
type CountReset struct{}
func factoryMessageCounter() gen.ProcessBehavior {
return &messageCounter{}
}
Test Implementation:
func TestMessageCounter_BasicUsage(t *testing.T) {
// Create test actor with configuration
actor, err := unit.Spawn(t, factoryMessageCounter,
unit.WithLogLevel(gen.LogLevelDebug),
unit.WithEnv(map[gen.Env]any{
"test_mode": true,
"timeout": 30,
}),
)
if err != nil {
t.Fatal(err)
}
// Test initialization
actor.ShouldLog().Level(gen.LogLevelInfo).Containing("Counter initialized").Once().Assert()
// Test message handling
actor.SendMessage(gen.PID{}, "increment")
actor.ShouldSend().To("output").Message(CountChanged{Count: 1}).Once().Assert()
actor.ShouldLog().Level(gen.LogLevelDebug).Containing("Count incremented to 1").Once().Assert()
// Test state query
actor.SendMessage(gen.PID{Node: "test", ID: 123}, "get_count")
actor.ShouldSend().To(gen.PID{Node: "test", ID: 123}).Message(CountResponse{Count: 1}).Once().Assert()
// Test reset
actor.SendMessage(gen.PID{}, "reset")
actor.ShouldSend().To("output").Message(CountReset{}).Once().Assert()
}
Test configuration allows you to simulate different runtime conditions without requiring complex setup:
// Available options for unit.Spawn()
unit.WithLogLevel(gen.LogLevelDebug) // Set log level
unit.WithEnv(map[gen.Env]any{"key": "value"}) // Environment variables
unit.WithParent(gen.PID{Node: "parent", ID: 100}) // Parent process
unit.WithRegister(gen.Atom("registered_name")) // Register with name
unit.WithNodeName(gen.Atom("test_node@localhost")) // Node name
Environment Variables (WithEnv): Test how your actors behave with different configurations without changing production code. Useful for testing feature flags, database URLs, timeout values, and other configuration-driven behavior.
Log Levels (WithLogLevel): Control the verbosity of test output and verify that your actors log appropriately at different levels. Critical for testing monitoring and debugging capabilities.
Process Hierarchy (WithParent, WithRegister): Test actors that need to interact with parent processes or require specific naming for registration-based lookups.
ShouldSend() - Verifying Actor Communication
Message testing is the heart of actor validation. Since actors communicate exclusively through messages, verifying message flow is crucial for ensuring correct behavior.
Why Message Testing Matters:
Validates Integration: Ensures actors communicate correctly with their dependencies
Confirms Business Logic: Verifies that the right messages are sent in response to inputs
Detects Side Effects: Catches unintended message sends that could cause bugs
Tests Message Content: Validates that message payloads contain correct data
Example Actor:
type notificationService struct {
act.Actor
subscribers []gen.PID
}
func (n *notificationService) HandleMessage(from gen.PID, message any) error {
switch msg := message.(type) {
case Subscribe:
n.subscribers = append(n.subscribers, msg.PID)
n.Send(msg.PID, SubscriptionConfirmed{})
return nil
case Broadcast:
for _, subscriber := range n.subscribers {
n.Send(subscriber, Notification{
ID: msg.ID,
Message: msg.Message,
Sender: from,
})
}
n.Send("analytics", BroadcastSent{
ID: msg.ID,
Subscribers: len(n.subscribers),
})
return nil
}
return nil
}
type Subscribe struct{ PID gen.PID }
type SubscriptionConfirmed struct{}
type Broadcast struct{ ID string; Message string }
type Notification struct{ ID, Message string; Sender gen.PID }
type BroadcastSent struct{ ID string; Subscribers int }
Test Implementation:
func TestNotificationService_MessageSending(t *testing.T) {
actor, _ := unit.Spawn(t, factoryNotificationService)
subscriber1 := gen.PID{Node: "test", ID: 101}
subscriber2 := gen.PID{Node: "test", ID: 102}
// Test subscription
actor.SendMessage(gen.PID{}, Subscribe{PID: subscriber1})
actor.SendMessage(gen.PID{}, Subscribe{PID: subscriber2})
// Verify subscription confirmations
actor.ShouldSend().To(subscriber1).Message(SubscriptionConfirmed{}).Once().Assert()
actor.ShouldSend().To(subscriber2).Message(SubscriptionConfirmed{}).Once().Assert()
// Test broadcast
broadcaster := gen.PID{Node: "test", ID: 200}
actor.SendMessage(broadcaster, Broadcast{ID: "msg-123", Message: "Hello World"})
// Verify notifications sent to all subscribers
actor.ShouldSend().To(subscriber1).MessageMatching(func(msg any) bool {
if notif, ok := msg.(Notification); ok {
return notif.ID == "msg-123" &&
notif.Message == "Hello World" &&
notif.Sender == broadcaster
}
return false
}).Once().Assert()
actor.ShouldSend().To(subscriber2).MessageMatching(func(msg any) bool {
if notif, ok := msg.(Notification); ok {
return notif.ID == "msg-123" && notif.Message == "Hello World"
}
return false
}).Once().Assert()
// Verify analytics
actor.ShouldSend().To("analytics").Message(BroadcastSent{
ID: "msg-123",
Subscribers: 2,
}).Once().Assert()
// Test multiple sends to same target
actor.SendMessage(broadcaster, Broadcast{ID: "msg-124", Message: "Second message"})
actor.ShouldSend().To("analytics").Times(2).Assert() // Total of 2 analytics messages
}
When testing complex message structures or dynamic content, the library provides powerful matching capabilities:
// Message type matching
actor.ShouldSend().MessageMatching(unit.IsTypeGeneric[CountChanged]()).Assert()
// Field-based matching
actor.ShouldSend().MessageMatching(unit.HasField("Count", unit.Equals(5))).Assert()
// Structure matching with custom field validation
actor.ShouldSend().MessageMatching(
unit.StructureMatching(Notification{}, map[string]unit.Matcher{
"ID": unit.Equals("msg-123"),
"Sender": unit.IsValidPID(),
}),
).Assert()
// Never sent verification
actor.ShouldNotSend().To("error_handler").Message("error").Assert()
Pattern Matching Benefits:
Partial Validation: Test only the fields that matter for your specific test case
Dynamic Content Handling: Validate messages with timestamps, UUIDs, or generated IDs
Type Safety: Ensure messages are of the correct type even when content varies
Negative Testing: Verify that certain messages are NOT sent in specific scenarios
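To make field-based matching less magical, here is a rough sketch of how a HasField-style matcher could be built with reflection. The hasField and equals helpers below are plain-Go illustrations and may differ from the unit package's actual implementation.

```go
package main

import (
	"fmt"
	"reflect"
)

// matcher reports whether a value satisfies some condition.
type matcher func(value any) bool

// equals matches values that are deeply equal to the expected one.
func equals(expected any) matcher {
	return func(value any) bool { return reflect.DeepEqual(value, expected) }
}

// hasField matches structs whose named field satisfies the inner matcher.
func hasField(name string, inner matcher) matcher {
	return func(value any) bool {
		v := reflect.ValueOf(value)
		if v.Kind() == reflect.Pointer {
			v = v.Elem()
		}
		if v.Kind() != reflect.Struct {
			return false
		}
		field := v.FieldByName(name)
		if !field.IsValid() {
			return false
		}
		return inner(field.Interface())
	}
}

type CountChanged struct{ Count int }

func main() {
	m := hasField("Count", equals(5))
	fmt.Println(m(CountChanged{Count: 5})) // true
	fmt.Println(m(CountChanged{Count: 3})) // false
}
```

Partial validation falls out naturally from this design: a matcher inspects only the fields you name and ignores everything else in the struct.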
ShouldSpawn() - Testing Process Lifecycle Management
Process spawning is a fundamental actor pattern for building hierarchical systems. The testing library provides comprehensive tools for verifying that actors create, configure, and manage child processes correctly.
Why Process Spawning Tests Matter:
Resource Management: Ensure actors don't spawn too many or too few processes
Configuration Propagation: Verify that child processes receive correct configuration
Error Handling: Test behavior when process spawning fails
Supervision Trees: Validate that supervisors manage their children appropriately
Example Actor:
type workerSupervisor struct {
act.Actor
workers map[string]gen.PID
maxWorkers int
}
func (w *workerSupervisor) Init(args ...any) error {
w.workers = make(map[string]gen.PID)
w.maxWorkers = 3
return nil
}
func (w *workerSupervisor) HandleMessage(from gen.PID, message any) error {
switch msg := message.(type) {
case StartWorker:
if len(w.workers) >= w.maxWorkers {
w.Send(from, WorkerError{Error: "max workers reached"})
return nil
}
// Spawn worker with dynamic name
workerPID, err := w.Spawn(factoryWorker, gen.ProcessOptions{}, msg.WorkerID)
if err != nil {
w.Send(from, WorkerError{Error: err.Error()})
return nil
}
w.workers[msg.WorkerID] = workerPID
w.Send(from, WorkerStarted{WorkerID: msg.WorkerID, PID: workerPID})
w.Send("monitor", SupervisorStatus{
ActiveWorkers: len(w.workers),
MaxWorkers: w.maxWorkers,
})
return nil
case StopWorker:
if pid, exists := w.workers[msg.WorkerID]; exists {
w.SendExit(pid, gen.TerminateReasonShutdown)
delete(w.workers, msg.WorkerID)
w.Send(from, WorkerStopped{WorkerID: msg.WorkerID})
}
return nil
	case StopAllWorkers:
		// Capture the count before the map is emptied, so the reply
		// reflects how many workers were actually stopped.
		stopped := len(w.workers)
		for workerID, pid := range w.workers {
			w.SendExit(pid, gen.TerminateReasonShutdown)
			delete(w.workers, workerID)
		}
		w.Send(from, AllWorkersStopped{Count: stopped})
		return nil
}
return nil
}
type StartWorker struct{ WorkerID string }
type StopWorker struct{ WorkerID string }
type StopAllWorkers struct{}
type WorkerStarted struct{ WorkerID string; PID gen.PID }
type WorkerStopped struct{ WorkerID string }
type WorkerError struct{ Error string }
type AllWorkersStopped struct{ Count int }
type SupervisorStatus struct{ ActiveWorkers, MaxWorkers int }
func factoryWorker() gen.ProcessBehavior { return &worker{} }
func factoryWorkerSupervisor() gen.ProcessBehavior { return &workerSupervisor{} }
type worker struct{ act.Actor }
func (w *worker) HandleMessage(from gen.PID, message any) error { return nil }
Test Implementation:
func TestWorkerSupervisor_SpawnManagement(t *testing.T) {
actor, _ := unit.Spawn(t, factoryWorkerSupervisor)
client := gen.PID{Node: "test", ID: 999}
// Test worker spawning
actor.SendMessage(client, StartWorker{WorkerID: "worker-1"})
// Capture the spawn event to get the PID
spawnResult := actor.ShouldSpawn().Factory(factoryWorker).Once().Capture()
unit.NotNil(t, spawnResult)
// Verify worker started response
actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
if started, ok := msg.(WorkerStarted); ok {
return started.WorkerID == "worker-1" && started.PID == spawnResult.PID
}
return false
}).Once().Assert()
// Verify monitor notification
actor.ShouldSend().To("monitor").Message(SupervisorStatus{
ActiveWorkers: 1,
MaxWorkers: 3,
}).Once().Assert()
// Test multiple workers
actor.SendMessage(client, StartWorker{WorkerID: "worker-2"})
actor.SendMessage(client, StartWorker{WorkerID: "worker-3"})
// Should have spawned 3 workers total
actor.ShouldSpawn().Factory(factoryWorker).Times(3).Assert()
// Test max worker limit
actor.SendMessage(client, StartWorker{WorkerID: "worker-4"})
actor.ShouldSend().To(client).Message(WorkerError{Error: "max workers reached"}).Once().Assert()
// Should still only have 3 spawned workers
actor.ShouldSpawn().Factory(factoryWorker).Times(3).Assert()
// Test stopping a worker
actor.SendMessage(client, StopWorker{WorkerID: "worker-1"})
actor.ShouldSend().To(client).Message(WorkerStopped{WorkerID: "worker-1"}).Once().Assert()
}
Real-world actors often generate dynamic values like session IDs, request tokens, or timestamps. The library provides sophisticated tools for capturing and validating these dynamic values.
func TestDynamicProcessCreation(t *testing.T) {
actor, _ := unit.Spawn(t, factoryTaskProcessor)
// Test dynamic process creation with captured PIDs
actor.SendMessage(gen.PID{}, CreateSessionWorker{UserID: "user123"})
// Capture the spawn to get dynamic PID
spawnResult := actor.ShouldSpawn().Once().Capture()
sessionPID := spawnResult.PID
// Verify session was registered with the dynamic PID
actor.ShouldSend().To("session_registry").MessageMatching(func(msg any) bool {
if reg, ok := msg.(SessionRegistered); ok {
return reg.UserID == "user123" && reg.SessionPID == sessionPID
}
return false
}).Once().Assert()
// Test sending work to the dynamic session
actor.SendMessage(gen.PID{}, SendToSession{
UserID: "user123",
Task: "process_data",
})
// Should route to the captured session PID
actor.ShouldSend().To(sessionPID).MessageMatching(func(msg any) bool {
if task, ok := msg.(SessionTask); ok {
return task.Task == "process_data"
}
return false
}).Once().Assert()
}
// Required message types for this example:
type CreateSessionWorker struct{ UserID string }
type SessionRegistered struct{ UserID string; SessionPID gen.PID }
type SendToSession struct{ UserID, Task string }
type SessionTask struct{ Task string }
// factoryTaskProcessor() gen.ProcessBehavior function would be defined separately
Dynamic Value Testing Scenarios:
Session Management: Test actors that create sessions with generated IDs
Request Tracking: Verify that request tokens are properly generated and used
Time-based Operations: Validate actors that schedule work or create timestamps
Resource Allocation: Test dynamic assignment of resources to processes
ShouldRemoteSpawn() - Testing Distributed Actor Creation
Remote spawn testing allows you to verify that actors correctly create processes on remote nodes in a distributed system. The testing library captures RemoteSpawnEvent operations and provides fluent assertions for validation.
Why Test Remote Spawning:
Distribution Logic: Ensure actors spawn processes on the correct remote nodes
Load Distribution: Verify round-robin or other distribution strategies work correctly
Error Handling: Test behavior when remote nodes are unavailable
Resource Management: Validate that remote spawning respects capacity limits
Example Actor:
type distributedCoordinator struct {
	act.Actor
	nodeAvailability map[gen.Atom]bool
	roundRobin       int
}

// isNodeAvailable reports whether a remote node is known and reachable.
func (dc *distributedCoordinator) isNodeAvailable(name gen.Atom) bool {
	return dc.nodeAvailability[name]
}
func (dc *distributedCoordinator) HandleMessage(from gen.PID, message any) error {
switch msg := message.(type) {
case SpawnRemoteWorker:
if !dc.isNodeAvailable(msg.NodeName) {
dc.Send(from, RemoteSpawnError{
NodeName: msg.NodeName,
Error: "node not available",
})
return nil
}
// Use RemoteSpawn which generates RemoteSpawnEvent
pid, err := dc.RemoteSpawn(msg.NodeName, msg.WorkerName, gen.ProcessOptions{}, msg.Config)
if err != nil {
dc.Send(from, RemoteSpawnError{NodeName: msg.NodeName, Error: err.Error()})
return nil
}
dc.Send(from, RemoteWorkerSpawned{
NodeName: msg.NodeName,
WorkerName: msg.WorkerName,
PID: pid,
})
return nil
case SpawnRemoteService:
// Use RemoteSpawnRegister which generates RemoteSpawnEvent with registration
pid, err := dc.RemoteSpawnRegister(msg.NodeName, msg.ServiceName, msg.RegisterName, gen.ProcessOptions{})
if err != nil {
dc.Send(from, RemoteSpawnError{NodeName: msg.NodeName, Error: err.Error()})
return nil
}
dc.Send(from, RemoteServiceSpawned{
NodeName: msg.NodeName,
ServiceName: msg.ServiceName,
RegisterName: msg.RegisterName,
PID: pid,
})
return nil
}
return nil
}
type SpawnRemoteWorker struct{ NodeName, WorkerName gen.Atom; Config map[string]any }
type SpawnRemoteService struct{ NodeName, ServiceName, RegisterName gen.Atom }
type RemoteWorkerSpawned struct{ NodeName, WorkerName gen.Atom; PID gen.PID }
type RemoteServiceSpawned struct{ NodeName, ServiceName, RegisterName gen.Atom; PID gen.PID }
type RemoteSpawnError struct{ NodeName gen.Atom; Error string }
Test Implementation:
func TestDistributedCoordinator_RemoteSpawn(t *testing.T) {
actor, _ := unit.Spawn(t, factoryDistributedCoordinator)
// Setup remote nodes for testing
actor.CreateRemoteNode("worker@node1", true) // Available
actor.CreateRemoteNode("worker@node2", false) // Unavailable
clientPID := gen.PID{Node: "test", ID: 100}
actor.ClearEvents() // Clear initialization events
// Test basic remote spawn
actor.SendMessage(clientPID, SpawnRemoteWorker{
NodeName: "worker@node1",
WorkerName: "data-processor",
Config: map[string]any{"timeout": 30},
})
// Verify remote spawn event
actor.ShouldRemoteSpawn().
ToNode("worker@node1").
WithName("data-processor").
Once().
Assert()
// Test remote spawn with registration
actor.SendMessage(clientPID, SpawnRemoteService{
NodeName: "worker@node1",
ServiceName: "user-service",
RegisterName: "users",
})
// Verify remote spawn with register
actor.ShouldRemoteSpawn().
ToNode("worker@node1").
WithName("user-service").
WithRegister("users").
Once().
Assert()
// Test total remote spawns
actor.ShouldRemoteSpawn().Times(2).Assert()
// Test negative assertion - should not spawn on unavailable node
actor.SendMessage(clientPID, SpawnRemoteWorker{
NodeName: "worker@node2",
WorkerName: "test-worker",
})
actor.ShouldNotRemoteSpawn().ToNode("worker@node2").Assert()
}
Advanced Remote Spawn Patterns:
Multi-Node Distribution: Test round-robin or other distribution strategies across multiple nodes
Error Scenarios: Verify proper error handling when nodes are unavailable
Event Inspection: Direct inspection of RemoteSpawnEvent for detailed validation
Negative Assertions: Ensure remote spawns don't happen under certain conditions
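The round-robin distribution mentioned above (the example actor carries an unused roundRobin counter) can be sketched as a plain helper, independent of the framework. The pickNode function, its signature, and the skip-unavailable behavior are illustrative assumptions, not library API:

```go
package main

import "fmt"

// pickNode cycles through the node list starting at *next, skipping
// nodes marked unavailable. It advances the cursor past the chosen
// node and returns false when no node is available at all.
func pickNode(nodes []string, available map[string]bool, next *int) (string, bool) {
	for i := 0; i < len(nodes); i++ {
		candidate := nodes[(*next+i)%len(nodes)]
		if available[candidate] {
			*next = (*next + i + 1) % len(nodes)
			return candidate, true
		}
	}
	return "", false
}

func main() {
	nodes := []string{"worker@node1", "worker@node2", "worker@node3"}
	available := map[string]bool{
		"worker@node1": true,
		"worker@node2": false, // unavailable node is skipped
		"worker@node3": true,
	}
	next := 0
	for i := 0; i < 4; i++ {
		node, ok := pickNode(nodes, available, &next)
		fmt.Println(node, ok) // alternates worker@node1, worker@node3
	}
}
```

In an actor, the cursor would live in actor state (like the roundRobin field above) so that successive SpawnRemoteWorker messages land on different nodes.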
ShouldTerminate() - Testing Actor Lifecycle Completion
Actor termination is a critical aspect of actor systems. Actors can terminate for various reasons: normal completion, explicit shutdown, or errors. The testing library provides comprehensive tools for validating termination behavior and ensuring proper cleanup.
Why Test Actor Termination:
Resource Cleanup: Ensure actors properly clean up resources when terminating
Error Propagation: Verify that errors are handled correctly and lead to appropriate termination
Graceful Shutdown: Test that actors respond correctly to shutdown signals
Supervision Trees: Validate that supervisors handle child termination appropriately
Termination Reasons:
gen.TerminateReasonNormal - Normal completion of actor work
gen.TerminateReasonShutdown - Graceful shutdown request
Custom errors - Abnormal termination due to specific errors
Example Actor:
type connectionManager struct {
act.Actor
connections map[string]*Connection
maxRetries int
}
func (c *connectionManager) Init(args ...any) error {
c.connections = make(map[string]*Connection)
c.maxRetries = 3
c.Log().Info("Connection manager started")
return nil
}
func (c *connectionManager) HandleMessage(from gen.PID, message any) error {
switch msg := message.(type) {
case CreateConnection:
conn := &Connection{ID: msg.ID, Status: "active"}
c.connections[msg.ID] = conn
c.Send(from, ConnectionCreated{ID: msg.ID})
c.Log().Info("Created connection %s", msg.ID)
return nil
case CloseConnection:
if conn, exists := c.connections[msg.ID]; exists {
conn.Close()
delete(c.connections, msg.ID)
c.Send(from, ConnectionClosed{ID: msg.ID})
c.Log().Info("Closed connection %s", msg.ID)
}
return nil
case "shutdown":
// Graceful shutdown - close all connections
for id, conn := range c.connections {
conn.Close()
c.Log().Info("Shutdown: closed connection %s", id)
}
c.Send("monitor", ShutdownComplete{ConnectionsClosed: len(c.connections)})
return gen.TerminateReasonShutdown
case ConnectionError:
c.Log().Error("Connection error for %s: %s", msg.ID, msg.Error)
msg.RetryCount++
if msg.RetryCount >= c.maxRetries {
c.Log().Error("Max retries exceeded for connection %s", msg.ID)
return fmt.Errorf("connection failed after %d retries: %s", c.maxRetries, msg.Error)
}
// Retry the connection
c.Send(c.PID(), CreateConnection{ID: msg.ID})
return nil
case "force_error":
// Simulate critical error
return fmt.Errorf("critical system error: database unavailable")
}
return nil
}
type CreateConnection struct{ ID string }
type CloseConnection struct{ ID string }
type ConnectionCreated struct{ ID string }
type ConnectionClosed struct{ ID string }
type ConnectionError struct{ ID, Error string; RetryCount int }
type ShutdownComplete struct{ ConnectionsClosed int }
type Connection struct {
ID string
Status string
}
func (c *Connection) Close() { c.Status = "closed" }
func factoryConnectionManager() gen.ProcessBehavior {
return &connectionManager{}
}
Test Implementation:
func TestConnectionManager_TerminationHandling(t *testing.T) {
actor, _ := unit.Spawn(t, factoryConnectionManager)
client := gen.PID{Node: "test", ID: 100}
// Test normal operation first
actor.SendMessage(client, CreateConnection{ID: "conn-1"})
actor.ShouldSend().To(client).Message(ConnectionCreated{ID: "conn-1"}).Once().Assert()
// Verify actor is not terminated during normal operation
unit.Equal(t, false, actor.IsTerminated())
unit.Nil(t, actor.TerminationReason())
// Test graceful shutdown
actor.SendMessage(client, "shutdown")
// Verify shutdown message sent
actor.ShouldSend().To("monitor").MessageMatching(func(msg any) bool {
if shutdown, ok := msg.(ShutdownComplete); ok {
return shutdown.ConnectionsClosed == 1
}
return false
}).Once().Assert()
// Verify graceful termination
unit.Equal(t, true, actor.IsTerminated())
unit.Equal(t, gen.TerminateReasonShutdown, actor.TerminationReason())
// Verify termination event was captured
actor.ShouldTerminate().
WithReason(gen.TerminateReasonShutdown).
Once().
Assert()
}
func TestConnectionManager_ErrorTermination(t *testing.T) {
actor, _ := unit.Spawn(t, factoryConnectionManager)
// Test abnormal termination due to critical error
actor.SendMessage(gen.PID{}, "force_error")
// Verify actor terminated with error
unit.Equal(t, true, actor.IsTerminated())
unit.NotNil(t, actor.TerminationReason())
unit.Contains(t, actor.TerminationReason().Error(), "critical system error")
// Verify termination event with specific error
actor.ShouldTerminate().
ReasonMatching(func(reason error) bool {
return strings.Contains(reason.Error(), "database unavailable")
}).
Once().
Assert()
}
func TestConnectionManager_RetryBeforeTermination(t *testing.T) {
actor, _ := unit.Spawn(t, factoryConnectionManager)
// Test retry logic before termination
actor.SendMessage(gen.PID{}, CreateConnection{ID: "conn-retry"})
actor.ClearEvents() // Clear creation events
// Send connection errors that should trigger retries
for i := 0; i < 2; i++ {
actor.SendMessage(gen.PID{}, ConnectionError{
ID: "conn-retry",
Error: "network timeout",
RetryCount: i,
})
// Should not terminate yet
unit.Equal(t, false, actor.IsTerminated())
// Should retry by sending CreateConnection
actor.ShouldSend().To(actor.PID()).MessageMatching(func(msg any) bool {
if create, ok := msg.(CreateConnection); ok {
return create.ID == "conn-retry"
}
return false
}).Once().Assert()
}
// Final error that exceeds max retries
actor.SendMessage(gen.PID{}, ConnectionError{
ID: "conn-retry",
Error: "network timeout",
RetryCount: 3, // Exceeds maxRetries
})
// Now should terminate with error
unit.Equal(t, true, actor.IsTerminated())
unit.Contains(t, actor.TerminationReason().Error(), "connection failed after 3 retries")
// Verify termination assertion
actor.ShouldTerminate().
ReasonMatching(func(reason error) bool {
return strings.Contains(reason.Error(), "retries") &&
strings.Contains(reason.Error(), "network timeout")
}).
Once().
Assert()
}
func TestTerminatedActor_NoFurtherProcessing(t *testing.T) {
actor, _ := unit.Spawn(t, factoryConnectionManager)
// Terminate the actor
actor.SendMessage(gen.PID{}, "force_error")
unit.Equal(t, true, actor.IsTerminated())
actor.ClearEvents() // Clear termination events
// Try to send more messages - should not be processed
actor.SendMessage(gen.PID{}, CreateConnection{ID: "should-not-work"})
// Should not process the message (no CreateConnection response)
actor.ShouldNotSend().To(gen.PID{}).Message(ConnectionCreated{ID: "should-not-work"}).Assert()
// Should not create any new events
events := actor.Events()
unit.Equal(t, 0, len(events), "Terminated actor should not process messages")
}
Termination Testing Methods
TestActor Termination Status:
// Check if actor is terminated
isTerminated := actor.IsTerminated() // bool
// Get termination reason (nil if not terminated)
reason := actor.TerminationReason() // error or nil
// Test that actor should terminate
actor.ShouldTerminate().Once().Assert()
// Test with specific reason
actor.ShouldTerminate().WithReason(gen.TerminateReasonShutdown).Assert()
// Test with reason matching
actor.ShouldTerminate().ReasonMatching(func(reason error) bool {
return strings.Contains(reason.Error(), "expected error")
}).Assert()
// Test that actor should NOT terminate
actor.ShouldNotTerminate().Assert()
Advanced Termination Patterns:
// Test multiple termination attempts
actor.ShouldTerminate().Times(1).Assert() // Should terminate exactly once
// Capture termination for detailed analysis
terminationResult := actor.ShouldTerminate().Once().Capture()
unit.NotNil(t, terminationResult)
unit.Equal(t, expectedReason, terminationResult.Reason)
// Test termination with timeout
success := unit.WithTimeout(func() {
actor.SendMessage(gen.PID{}, "shutdown")
actor.ShouldTerminate().Once().Assert()
}, 5*time.Second)
unit.True(t, success(), "Actor should terminate within timeout")
ShouldSendExit() - Testing Graceful Process Termination
Exit signals (SendExit and SendExitMeta) are used to gracefully terminate other processes. This is different from actor self-termination - it's about one actor telling another to exit. The testing library provides comprehensive assertions for validating exit signal behavior.
Why Test Exit Signals:
Graceful Shutdown: Ensure supervisors can properly terminate child processes
Resource Cleanup: Verify that exit signals trigger proper cleanup in target processes
Error Propagation: Test that failure conditions are communicated via exit signals
Supervision Trees: Validate that supervisors manage process lifecycles correctly
Example Actor:
type processSupervisor struct {
act.Actor
workers map[string]gen.PID
maxWorkers int
}
func (p *processSupervisor) Init(args ...any) error {
p.workers = make(map[string]gen.PID)
p.maxWorkers = 5
return nil
}
func (p *processSupervisor) HandleMessage(from gen.PID, message any) error {
switch msg := message.(type) {
case StartWorker:
if len(p.workers) >= p.maxWorkers {
p.Send(from, WorkerStartError{Error: "max workers reached"})
return nil
}
workerPID, err := p.Spawn(factoryWorkerProcess, gen.ProcessOptions{}, msg.WorkerID)
if err != nil {
p.Send(from, WorkerStartError{Error: err.Error()})
return nil
}
p.workers[msg.WorkerID] = workerPID
p.Send(from, WorkerStarted{WorkerID: msg.WorkerID, PID: workerPID})
return nil
case StopWorker:
if workerPID, exists := p.workers[msg.WorkerID]; exists {
// Send exit signal to worker
p.SendExit(workerPID, gen.TerminateReasonShutdown)
delete(p.workers, msg.WorkerID)
p.Send(from, WorkerStopped{WorkerID: msg.WorkerID})
p.Log().Info("Sent exit signal to worker %s", msg.WorkerID)
} else {
p.Send(from, WorkerStopError{WorkerID: msg.WorkerID, Error: "worker not found"})
}
return nil
case EmergencyShutdown:
// Send exit signals to all workers with error reason
shutdownReason := fmt.Errorf("emergency shutdown: %s", msg.Reason)
for workerID, workerPID := range p.workers {
p.SendExit(workerPID, shutdownReason)
p.Log().Warning("Emergency shutdown: sent exit to worker %s", workerID)
}
// Send meta exit signal to monitoring system
p.SendExitMeta(gen.PID{Node: "monitor", ID: 999}, shutdownReason)
p.Send(from, EmergencyShutdownComplete{
WorkersTerminated: len(p.workers),
Reason: msg.Reason,
})
p.workers = make(map[string]gen.PID) // Clear workers map
return nil
case TerminateWorkerWithError:
if workerPID, exists := p.workers[msg.WorkerID]; exists {
errorReason := fmt.Errorf("worker error: %s", msg.Error)
p.SendExit(workerPID, errorReason)
delete(p.workers, msg.WorkerID)
p.Send(from, WorkerTerminated{
WorkerID: msg.WorkerID,
Reason: msg.Error,
})
}
return nil
}
return nil
}
type StartWorker struct{ WorkerID string }
type StopWorker struct{ WorkerID string }
type EmergencyShutdown struct{ Reason string }
type TerminateWorkerWithError struct{ WorkerID, Error string }
type WorkerStarted struct{ WorkerID string; PID gen.PID }
type WorkerStopped struct{ WorkerID string }
type WorkerStartError struct{ Error string }
type WorkerStopError struct{ WorkerID, Error string }
type EmergencyShutdownComplete struct{ WorkersTerminated int; Reason string }
type WorkerTerminated struct{ WorkerID, Reason string }
type workerProcess struct{ act.Actor }
func (w *workerProcess) HandleMessage(from gen.PID, message any) error { return nil }
func factoryWorkerProcess() gen.ProcessBehavior { return &workerProcess{} }
func factoryProcessSupervisor() gen.ProcessBehavior { return &processSupervisor{} }
Test Implementation:
func TestProcessSupervisor_ExitSignals(t *testing.T) {
actor, _ := unit.Spawn(t, factoryProcessSupervisor)
client := gen.PID{Node: "test", ID: 100}
// Start some workers
actor.SendMessage(client, StartWorker{WorkerID: "worker-1"})
actor.SendMessage(client, StartWorker{WorkerID: "worker-2"})
// Capture worker PIDs for validation
spawn1 := actor.ShouldSpawn().Factory(factoryWorkerProcess).Once().Capture()
spawn2 := actor.ShouldSpawn().Factory(factoryWorkerProcess).Once().Capture()
worker1PID := spawn1.PID
worker2PID := spawn2.PID
actor.ClearEvents() // Clear spawn events
// Test graceful worker stop
actor.SendMessage(client, StopWorker{WorkerID: "worker-1"})
// Verify exit signal sent to worker
actor.ShouldSendExit().
To(worker1PID).
WithReason(gen.TerminateReasonShutdown).
Once().
Assert()
// Verify stop confirmation
actor.ShouldSend().To(client).Message(WorkerStopped{WorkerID: "worker-1"}).Once().Assert()
// Test worker termination with custom error
actor.SendMessage(client, TerminateWorkerWithError{
WorkerID: "worker-2",
Error: "memory leak detected",
})
// Verify exit signal with custom error reason
actor.ShouldSendExit().
To(worker2PID).
ReasonMatching(func(reason error) bool {
return strings.Contains(reason.Error(), "memory leak detected")
}).
Once().
Assert()
// Verify termination response
actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
if terminated, ok := msg.(WorkerTerminated); ok {
return terminated.WorkerID == "worker-2" &&
terminated.Reason == "memory leak detected"
}
return false
}).Once().Assert()
}
func TestProcessSupervisor_EmergencyShutdown(t *testing.T) {
actor, _ := unit.Spawn(t, factoryProcessSupervisor)
client := gen.PID{Node: "test", ID: 100}
// Start multiple workers
for i := 1; i <= 3; i++ {
actor.SendMessage(client, StartWorker{WorkerID: fmt.Sprintf("worker-%d", i)})
}
// Capture all worker PIDs
workers := make([]gen.PID, 3)
for i := 0; i < 3; i++ {
spawn := actor.ShouldSpawn().Factory(factoryWorkerProcess).Once().Capture()
workers[i] = spawn.PID
}
actor.ClearEvents() // Clear spawn events
// Trigger emergency shutdown
actor.SendMessage(client, EmergencyShutdown{Reason: "system overload"})
// Verify exit signals sent to all workers
for _, workerPID := range workers {
actor.ShouldSendExit().
To(workerPID).
ReasonMatching(func(reason error) bool {
return strings.Contains(reason.Error(), "emergency shutdown") &&
strings.Contains(reason.Error(), "system overload")
}).
Once().
Assert()
}
// Verify meta exit signal sent to monitoring
monitorPID := gen.PID{Node: "monitor", ID: 999}
actor.ShouldSendExitMeta().
To(monitorPID).
ReasonMatching(func(reason error) bool {
return strings.Contains(reason.Error(), "system overload")
}).
Once().
Assert()
// Verify shutdown completion message
actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
if complete, ok := msg.(EmergencyShutdownComplete); ok {
return complete.WorkersTerminated == 3 &&
complete.Reason == "system overload"
}
return false
}).Once().Assert()
// Verify total exit signals (3 workers + 1 meta)
actor.ShouldSendExit().Times(3).Assert()
actor.ShouldSendExitMeta().Times(1).Assert()
}
func TestExitSignal_NegativeAssertions(t *testing.T) {
actor, _ := unit.Spawn(t, factoryProcessSupervisor)
client := gen.PID{Node: "test", ID: 100}
// Try to stop non-existent worker
actor.SendMessage(client, StopWorker{WorkerID: "non-existent"})
// Should not send any exit signals
actor.ShouldNotSendExit().Assert()
actor.ShouldNotSendExitMeta().Assert()
// Should send error response instead
actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
if stopError, ok := msg.(WorkerStopError); ok {
return stopError.WorkerID == "non-existent" &&
stopError.Error == "worker not found"
}
return false
}).Once().Assert()
}
Basic Exit Signal Assertions:
// Test that exit signal was sent
actor.ShouldSendExit().To(targetPID).Once().Assert()
// Test with specific reason
actor.ShouldSendExit().To(targetPID).WithReason(gen.TerminateReasonShutdown).Assert()
// Test with reason matching
actor.ShouldSendExit().ReasonMatching(func(reason error) bool {
return strings.Contains(reason.Error(), "expected error")
}).Assert()
// Test meta exit signals
actor.ShouldSendExitMeta().To(monitorPID).WithReason(errorReason).Assert()
// Negative assertions
actor.ShouldNotSendExit().To(targetPID).Assert()
actor.ShouldNotSendExitMeta().Assert()
Advanced Exit Signal Patterns:
// Test multiple exit signals
actor.ShouldSendExit().Times(3).Assert() // Should send exactly 3 exit signals
// Test exit signals to specific targets
actor.ShouldSendExit().To(worker1PID).Once().Assert()
actor.ShouldSendExit().To(worker2PID).Once().Assert()
// Capture exit signal for detailed analysis
exitResult := actor.ShouldSendExit().Once().Capture()
unit.NotNil(t, exitResult)
unit.Equal(t, expectedPID, exitResult.To)
unit.Equal(t, expectedReason, exitResult.Reason)
// Combined assertions
actor.ShouldSendExit().To(workerPID).WithReason(gen.TerminateReasonShutdown).Once().Assert()
actor.ShouldSendExitMeta().To(monitorPID).ReasonMatching(func(r error) bool {
return strings.Contains(r.Error(), "shutdown complete")
}).Once().Assert()
ShouldAddCronJob(), ShouldExecuteCronJob() - Testing Scheduled Operations
Cron job testing allows you to validate scheduled operations in your actors without waiting for real time to pass. The testing library provides comprehensive mock time support and detailed cron job lifecycle management.
Why Test Cron Jobs:
Schedule Validation: Ensure cron expressions are correct and jobs run at expected times
Job Management: Test job addition, removal, enabling, and disabling operations
Execution Logic: Verify that scheduled operations perform correctly when triggered
Time Control: Use mock time to test time-dependent behavior deterministically
Cron Testing Features:
Mock Time Support: Control time flow for deterministic testing
Job Lifecycle Testing: Validate job creation, scheduling, execution, and cleanup
Event Tracking: Monitor all cron-related operations and state changes
Schedule Simulation: Test complex scheduling scenarios without real time delays
Example Actor:
type taskScheduler struct {
act.Actor
taskCounter int
schedules map[string]gen.CronJobSchedule
}
func (t *taskScheduler) Init(args ...any) error {
t.taskCounter = 0
t.schedules = make(map[string]gen.CronJobSchedule)
t.Log().Info("Task scheduler started")
return nil
}
func (t *taskScheduler) HandleMessage(from gen.PID, message any) error {
switch msg := message.(type) {
case ScheduleTask:
// Add a new cron job
jobID, err := t.Cron().AddJob(msg.Schedule, gen.CronJobFunction(func() {
t.taskCounter++
t.Send("output", TaskExecuted{
TaskID: msg.TaskID,
Count: t.taskCounter,
Timestamp: time.Now(),
})
t.Log().Info("Executed scheduled task %s (count: %d)", msg.TaskID, t.taskCounter)
}))
if err != nil {
t.Send(from, ScheduleError{TaskID: msg.TaskID, Error: err.Error()})
return nil
}
t.schedules[msg.TaskID] = gen.CronJobSchedule{ID: jobID, Schedule: msg.Schedule}
t.Send(from, TaskScheduled{TaskID: msg.TaskID, JobID: jobID})
t.Log().Debug("Scheduled task %s with job ID %s", msg.TaskID, jobID)
return nil
case UnscheduleTask:
if schedule, exists := t.schedules[msg.TaskID]; exists {
err := t.Cron().RemoveJob(schedule.ID)
if err != nil {
t.Send(from, UnscheduleError{TaskID: msg.TaskID, Error: err.Error()})
return nil
}
delete(t.schedules, msg.TaskID)
t.Send(from, TaskUnscheduled{TaskID: msg.TaskID})
t.Log().Debug("Unscheduled task %s", msg.TaskID)
} else {
t.Send(from, UnscheduleError{TaskID: msg.TaskID, Error: "task not found"})
}
return nil
case EnableTask:
if schedule, exists := t.schedules[msg.TaskID]; exists {
err := t.Cron().EnableJob(schedule.ID)
if err != nil {
t.Send(from, TaskError{TaskID: msg.TaskID, Error: err.Error()})
return nil
}
t.Send(from, TaskEnabled{TaskID: msg.TaskID})
}
return nil
case DisableTask:
if schedule, exists := t.schedules[msg.TaskID]; exists {
err := t.Cron().DisableJob(schedule.ID)
if err != nil {
t.Send(from, TaskError{TaskID: msg.TaskID, Error: err.Error()})
return nil
}
t.Send(from, TaskDisabled{TaskID: msg.TaskID})
}
return nil
case GetTaskInfo:
if schedule, exists := t.schedules[msg.TaskID]; exists {
info, err := t.Cron().JobInfo(schedule.ID)
if err != nil {
t.Send(from, TaskError{TaskID: msg.TaskID, Error: err.Error()})
return nil
}
t.Send(from, TaskInfo{
TaskID: msg.TaskID,
JobID: schedule.ID,
Schedule: schedule.Schedule,
Enabled: info.Enabled,
NextRun: info.NextRun,
})
} else {
t.Send(from, TaskError{TaskID: msg.TaskID, Error: "task not found"})
}
return nil
}
return nil
}
type ScheduleTask struct{ TaskID, Schedule string }
type UnscheduleTask struct{ TaskID string }
type EnableTask struct{ TaskID string }
type DisableTask struct{ TaskID string }
type GetTaskInfo struct{ TaskID string }
type TaskScheduled struct{ TaskID, JobID string }
type TaskUnscheduled struct{ TaskID string }
type TaskEnabled struct{ TaskID string }
type TaskDisabled struct{ TaskID string }
type TaskExecuted struct{ TaskID string; Count int; Timestamp time.Time }
type TaskInfo struct{ TaskID, JobID, Schedule string; Enabled bool; NextRun time.Time }
type ScheduleError struct{ TaskID, Error string }
type UnscheduleError struct{ TaskID, Error string }
type TaskError struct{ TaskID, Error string }
func factoryTaskScheduler() gen.ProcessBehavior {
return &taskScheduler{}
}
Test Implementation:
func TestTaskScheduler_CronJobs(t *testing.T) {
actor, _ := unit.Spawn(t, factoryTaskScheduler)
client := gen.PID{Node: "test", ID: 100}
// Test basic job scheduling
actor.SendMessage(client, ScheduleTask{
TaskID: "daily-backup",
Schedule: "0 2 * * *", // Daily at 2 AM
})
// Verify cron job was added
actor.ShouldAddCronJob().
WithSchedule("0 2 * * *").
Once().
Assert()
// Verify scheduling response
actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
if scheduled, ok := msg.(TaskScheduled); ok {
return scheduled.TaskID == "daily-backup" && scheduled.JobID != ""
}
return false
}).Once().Assert()
// Test job execution by triggering it
actor.TriggerCronJob("0 2 * * *") // Manually trigger the scheduled job
// Verify job execution
actor.ShouldExecuteCronJob().
WithSchedule("0 2 * * *").
Once().
Assert()
// Verify task execution message
actor.ShouldSend().To("output").MessageMatching(func(msg any) bool {
if executed, ok := msg.(TaskExecuted); ok {
return executed.TaskID == "daily-backup" && executed.Count == 1
}
return false
}).Once().Assert()
}
func TestTaskScheduler_MockTimeControl(t *testing.T) {
actor, _ := unit.Spawn(t, factoryTaskScheduler)
client := gen.PID{Node: "test", ID: 100}
// Set initial mock time
baseTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
actor.SetCronMockTime(baseTime)
// Schedule a job for every minute
actor.SendMessage(client, ScheduleTask{
TaskID: "minute-task",
Schedule: "* * * * *", // Every minute
})
cronJob := actor.ShouldAddCronJob().Once().Capture()
actor.ClearEvents()
// Advance time by 1 minute - should trigger the job
actor.SetCronMockTime(baseTime.Add(1 * time.Minute))
// Verify job executed
actor.ShouldExecuteCronJob().
WithJobID(cronJob.ID).
Once().
Assert()
// Advance time by another minute
actor.SetCronMockTime(baseTime.Add(2 * time.Minute))
// Should execute again
actor.ShouldExecuteCronJob().
WithJobID(cronJob.ID).
Times(2). // Total of 2 executions
Assert()
}
Job Lifecycle Assertions:
// Test that cron job was added
actor.ShouldAddCronJob().WithSchedule("0 2 * * *").Once().Assert()
// Test job execution
actor.ShouldExecuteCronJob().WithSchedule("0 * * * *").Times(3).Assert()
// Test job removal
actor.ShouldRemoveCronJob().WithJobID("job-123").Once().Assert()
// Test job enable/disable
actor.ShouldEnableCronJob().WithJobID("job-123").Once().Assert()
actor.ShouldDisableCronJob().WithJobID("job-123").Once().Assert()
// Negative assertions
actor.ShouldNotAddCronJob().Assert()
actor.ShouldNotExecuteCronJob().Assert()
Mock Time Control:
// Set mock time for deterministic testing
baseTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
actor.SetCronMockTime(baseTime)
// Advance time to trigger scheduled jobs
actor.SetCronMockTime(baseTime.Add(1 * time.Hour))
// Manually trigger cron jobs for testing
actor.TriggerCronJob("0 * * * *") // Trigger hourly job
actor.TriggerCronJob("job-id-123") // Trigger by job ID
Advanced Cron Patterns:
// Capture cron job for detailed analysis
cronJob := actor.ShouldAddCronJob().Once().Capture()
jobID := cronJob.ID
schedule := cronJob.Schedule
// Test multiple job executions with time control
for i := 0; i < 5; i++ {
actor.SetCronMockTime(baseTime.Add(time.Duration(i) * time.Minute))
actor.TriggerCronJob("* * * * *") // Every minute
}
actor.ShouldExecuteCronJob().Times(5).Assert()
The library includes a comprehensive set of zero-dependency assertion functions that cover common testing scenarios without requiring external testing frameworks:
func TestBuiltInAssertions(t *testing.T) {
// Equality assertions
unit.Equal(t, "expected", "expected")
unit.NotEqual(t, "different", "value")
// Boolean assertions
unit.True(t, true)
unit.False(t, false)
// Nil assertions
unit.Nil(t, nil)
unit.NotNil(t, "not nil")
// String assertions
unit.Contains(t, "hello world", "world")
// Type assertions
unit.IsType(t, "", "string value")
}
Why Built-in Assertions:
Zero Dependencies: Avoid version conflicts and complex dependency management
Consistent Interface: All assertions follow the same pattern and error reporting
Testing Framework Agnostic: Works with any Go testing approach
Actor-Specific: Designed specifically for the needs of actor testing
Real-world actors frequently generate dynamic values like timestamps, UUIDs, session IDs, or auto-incrementing counters. Traditional testing approaches struggle with these values because they're unpredictable. The library provides sophisticated capture mechanisms to handle these scenarios elegantly.
The Challenge of Dynamic Values:
Timestamps: Created at runtime, impossible to predict exact values
UUIDs: Randomly generated, different in every test run
Auto-incrementing IDs: Dependent on execution order and system state
Process IDs: Assigned by the actor system, not controllable in tests
The Solution - Value Capture:
func TestDynamicValues(t *testing.T) {
actor, _ := unit.Spawn(t, factorySessionManager)
// Send request that will generate dynamic session ID
actor.SendMessage(gen.PID{}, CreateSession{UserID: "user123"})
// Capture the spawn to get the dynamic session PID
spawnResult := actor.ShouldSpawn().Once().Capture()
sessionPID := spawnResult.PID
// Use captured PID in subsequent assertions
actor.ShouldSend().MessageMatching(func(msg any) bool {
if created, ok := msg.(SessionCreated); ok {
return created.SessionPID == sessionPID && created.UserID == "user123"
}
return false
}).Once().Assert()
}
Capture Strategies:
Immediate Capture: Capture values as soon as they're generated
Pattern Matching: Use validation functions to identify and validate dynamic content
Structured Matching: Validate message structure while ignoring specific dynamic fields
Cross-Reference Testing: Use captured values in multiple assertions to ensure consistency
For complex testing scenarios or debugging difficult issues, the library provides direct access to the complete event timeline. This allows you to perform sophisticated analysis of actor behavior beyond what's possible with standard assertions.
Events() - Complete Event History
Access all captured events for detailed analysis:
func TestEventInspection(t *testing.T) {
actor, _ := unit.Spawn(t, factoryComplexActor)
// Perform operations
actor.SendMessage(gen.PID{}, ComplexOperation{})
// Get all events for inspection
events := actor.Events()
var sendCount, spawnCount, logCount, remoteSpawnCount int
for _, event := range events {
switch event.(type) {
case unit.SendEvent:
sendCount++
case unit.SpawnEvent:
spawnCount++
case unit.LogEvent:
logCount++
case unit.RemoteSpawnEvent:
remoteSpawnCount++
}
}
unit.True(t, sendCount > 0, "Should have send events")
unit.True(t, spawnCount == 2, "Should spawn exactly 2 processes")
unit.True(t, logCount >= 1, "Should have log events")
}
LastEvent() - Most Recent Operation
Get the most recently captured event:
func TestLastEvent(t *testing.T) {
actor, _ := unit.Spawn(t, factoryExampleActor)
actor.SendMessage(gen.PID{}, "test")
// Get the most recent event
lastEvent := actor.LastEvent()
unit.NotNil(t, lastEvent, "Should have a last event")
unit.Equal(t, "send", lastEvent.Type())
if sendEvent, ok := lastEvent.(unit.SendEvent); ok {
unit.Equal(t, "test", sendEvent.Message)
}
}
ClearEvents() - Reset Event History
Clear all captured events, useful for isolating test phases:
func TestClearEvents(t *testing.T) {
actor, _ := unit.Spawn(t, factoryExampleActor)
// Perform some operations
actor.SendMessage(gen.PID{}, "setup")
actor.ShouldSend().Once().Assert()
// Clear events before main test
actor.ClearEvents()
// Now test the main functionality
actor.SendMessage(gen.PID{}, "main_operation")
// Only the main operation events are captured
events := actor.Events()
unit.Equal(t, 1, len(events), "Should only have main operation event")
}
Event Inspection Use Cases:
Performance Analysis: Count operations to identify performance bottlenecks
Workflow Validation: Ensure complex multi-step processes execute in the correct order
Error Investigation: Analyze the complete event sequence leading to failures
Integration Testing: Verify that multiple actors interact correctly in complex scenarios
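The workflow-validation use case above amounts to checking that the expected operations appear in order within the event timeline, possibly with other events interleaved. A minimal, framework-independent sketch of that check (event types reduced to strings for illustration):

```go
package main

import "fmt"

// inOrder reports whether want occurs as a subsequence of got:
// each expected step appears, in order, though other events may
// be interleaved between them.
func inOrder(got, want []string) bool {
	i := 0
	for _, ev := range got {
		if i < len(want) && ev == want[i] {
			i++
		}
	}
	return i == len(want)
}

func main() {
	// A timeline as it might come back from actor.Events(),
	// reduced to event-type names.
	timeline := []string{"log", "spawn", "send", "log", "send"}
	fmt.Println(inOrder(timeline, []string{"spawn", "send", "send"})) // true
	fmt.Println(inOrder(timeline, []string{"send", "spawn"}))         // false
}
```

Against the real library, got would be built by iterating actor.Events() and switching on the event type, as in the TestEventInspection example above.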
The library provides timeout support for assertions that might need time-based validation:
import (
"testing"
"time"
"ergo.services/ergo/testing/unit"
)
func TestWithTimeout(t *testing.T) {
actor, _ := unit.Spawn(t, factoryExampleActor)
// Test that assertion completes within timeout
success := unit.WithTimeout(func() {
actor.SendMessage(gen.PID{}, "test")
actor.ShouldSend().Once().Assert()
}, 5*time.Second)
unit.True(t, success(), "Assertion should complete within timeout")
}
Timeout Function Usage:
Assertion Wrapping: Wrap assertion functions to add timeout behavior
Integration Testing: Useful when testing with external systems that might have delays
Performance Validation: Ensure assertions complete within expected time limits
Single Responsibility Testing: Each test should focus on one specific behavior or scenario. This makes tests easier to understand, debug, and maintain.
// Good: Tests one specific behavior
func TestUserManager_CreateUser_Success(t *testing.T) { ... }
func TestUserManager_CreateUser_DuplicateEmail(t *testing.T) { ... }
func TestUserManager_CreateUser_InvalidData(t *testing.T) { ... }
// Poor: Tests multiple behaviors in one test
func TestUserManager_AllOperations(t *testing.T) { ... }
State Isolation: Each test should start with a clean state and not depend on other tests. Use actor.ClearEvents() when needed to reset event history between test phases.
Error Path Testing: Don't just test the happy path. Actor systems need robust error handling, so test failure scenarios thoroughly:
func TestWorkerSupervisor_MaxWorkersReached(t *testing.T) {
	// Test that supervisor properly rejects requests when at capacity
	// Test that appropriate error messages are sent
	// Test that the supervisor remains functional after rejecting requests
}
Structured Messages: Design your messages to be easily testable by using structured types rather than primitive values:
// Good: Easy to test with pattern matching
type UserCreated struct {
	UserID  string
	Email   string
	Created time.Time
}

// Poor: Hard to validate in tests
type GenericMessage struct {
	Type string
	Data map[string]interface{}
}
Predictable vs Dynamic Content: Separate predictable content from dynamic content in your messages to make testing easier:
type OrderProcessed struct {
	OrderID     string    // Predictable - can be set in test
	Total       float64   // Predictable - can be set in test
	ProcessedAt time.Time // Dynamic - use pattern matching
	RequestID   string    // Dynamic - capture and validate
}
Event Overhead: While event capture is lightweight, be aware that every operation creates events. For performance-critical tests, you can:
Clear events periodically with ClearEvents()
Focus assertions on specific time windows
Use event inspection to identify performance bottlenecks
Scaling Testing: Test how your actors behave under load by simulating multiple concurrent operations:
import (
	"fmt"
	"testing"

	"ergo.services/ergo/gen"
	"ergo.services/ergo/testing/unit"
)

func TestWorkerPool_ConcurrentRequests(t *testing.T) {
	actor, _ := unit.Spawn(t, factoryWorkerPool)

	// Send multiple requests concurrently
	for i := 0; i < 100; i++ {
		actor.SendMessage(gen.PID{}, ProcessRequest{ID: fmt.Sprintf("req-%d", i)})
	}

	// Verify all requests were processed
	actor.ShouldSend().To("output").Times(100).Assert()
}
// Note: This example assumes you have defined:
// - type ProcessRequest struct{ ID string }
// - factoryWorkerPool() gen.ProcessBehavior function
Use descriptive test names that clearly indicate what behavior is being tested
Test all message types your actor handles, including edge cases
Capture dynamic values early using the Capture() method for generated IDs
Test error conditions, not just the happy path
Use pattern matching for complex message validation
Clear events between test phases when needed with ClearEvents()
Configure appropriate log levels for debugging vs production testing
Test temporal behaviors with timeout mechanisms
Validate distributed scenarios using network simulation
Organize tests by behavior rather than by implementation details
This testing library provides comprehensive coverage for all Ergo Framework actor patterns while maintaining zero external dependencies and excellent readability. By following these patterns and practices, you can build robust, well-tested actor systems that behave correctly in both simple and complex scenarios.
The library includes comprehensive test examples organized into feature-specific files that demonstrate all capabilities through real-world scenarios:
basic_test.go - Fundamental Actor Testing
Basic actor functionality and message handling
Dynamic value capture and validation
Built-in assertions and event tracking
Core testing patterns and best practices
network_test.go - Distributed System Testing
Remote node simulation and connectivity
Network configuration and route management
Remote spawn operations and event capture
Multi-node interaction patterns
workflow_test.go - Complex Business Logic
Multi-step order processing workflows
State machine validation and transitions
Business process orchestration
Error handling and recovery scenarios
call_test.go - Synchronous Communication
Call operations and response handling
Async call patterns and timeouts
Send/response communication flows
Concurrent request management
cron_test.go - Scheduled Operations
Cron job lifecycle management
Mock time control and schedule validation
Job execution tracking and assertions
Time-dependent behavior testing
termination_test.go - Actor Lifecycle Management
Actor termination handling and cleanup
Exit signal testing (SendExit/SendExitMeta)
Normal vs abnormal termination scenarios
Resource cleanup validation
Complex State Machine Testing (workflow_test.go)
Multi-step order processing workflow
Validation, payment, and fulfillment pipeline
State transition validation and error handling
Process Management (basic_test.go)
Dynamic worker spawning and management
Resource capacity limits and monitoring
Worker lifecycle (start, stop, restart)
Advanced Pattern Matching (basic_test.go)
Structure matching with partial validation
Dynamic value handling and field validation
Complex conditional message matching
Remote Spawn Testing (network_test.go)
Remote spawn operations on multiple nodes
Round-robin distribution testing
Error handling for unavailable nodes
Event inspection and workflow validation
Cron Job Management (cron_test.go)
Job scheduling and execution validation
Mock time control for deterministic testing
Schedule expression testing and validation
Actor Termination (termination_test.go)
Normal and abnormal termination scenarios
Exit signal testing and process cleanup
Termination reason validation
Post-termination behavior verification
Concurrent Operations (call_test.go)
Multi-client concurrent request handling
Resource contention and capacity management
Load testing and performance validation
Environment & Configuration (basic_test.go)
Environment variable management
Runtime configuration changes
Feature flag and conditional behavior testing
// Import the testing library
import "ergo.services/ergo/testing/unit"

# Run all tests
go test -v ergo.services/ergo/testing/unit

# Run feature-specific tests
go test -v -run TestBasic ergo.services/ergo/testing/unit
go test -v -run TestNetwork ergo.services/ergo/testing/unit
go test -v -run TestWorkflow ergo.services/ergo/testing/unit
go test -v -run TestCall ergo.services/ergo/testing/unit
go test -v -run TestCron ergo.services/ergo/testing/unit
go test -v -run TestTermination ergo.services/ergo/testing/unit
Start with Basic Examples: basic_test.go - Core functionality and patterns
Explore Message Testing: basic_test.go - Message flow and assertions
Learn Process Management: basic_test.go - Spawn operations and lifecycle
Master Synchronous Communication: call_test.go - Calls and responses
Study Complex Workflows: workflow_test.go - Business logic testing
Practice Network Testing: network_test.go - Distributed operations
Explore Scheduling: cron_test.go - Time-based operations
Understand Termination: termination_test.go - Lifecycle completion
Each test file provides complete, working implementations of specific actor patterns and demonstrates best practices for testing each scenario. All tests include comprehensive comments explaining the testing strategy and validation approach.
Real actors often behave differently based on configuration. Let's test this:
func TestDatabaseActor_ConfigurationBehavior(t *testing.T) {
	// Test with different configurations

	// Development configuration
	devActor, _ := unit.Spawn(t, newDatabaseActor,
		unit.WithEnv(map[gen.Env]any{
			"DB_POOL_SIZE": 5,
			"LOG_QUERIES":  true,
		}))
	devActor.SendMessage(gen.PID{}, ExecuteQuery{SQL: "SELECT * FROM users"})
	devActor.ShouldLog().Level(gen.LogLevelDebug).Containing("SELECT * FROM users").Assert()

	// Production configuration
	prodActor, _ := unit.Spawn(t, newDatabaseActor,
		unit.WithEnv(map[gen.Env]any{
			"DB_POOL_SIZE": 50,
			"LOG_QUERIES":  false,
		}))
	prodActor.SendMessage(gen.PID{}, ExecuteQuery{SQL: "SELECT * FROM users"})
	prodActor.ShouldLog().Level(gen.LogLevelDebug).Times(0).Assert() // No query logging in prod
}
As your actors become more sophisticated, your message testing needs to handle more complex scenarios:
func TestOrderProcessor_WorkflowSteps(t *testing.T) {
	actor, _ := unit.Spawn(t, newOrderProcessor)
	client := gen.PID{Node: "client", ID: 1}

	// Start an order
	actor.SendMessage(client, CreateOrder{Items: []string{"book", "pen"}})

	// Should trigger a sequence of operations
	actor.ShouldSend().To("inventory").Message("check_availability").Once().Assert()
	actor.ShouldSend().To("payment").Message("calculate_total").Once().Assert()
	actor.ShouldSend().To("shipping").Message("estimate_delivery").Once().Assert()

	// Should send status back to client
	actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
		if status, ok := msg.(OrderStatus); ok {
			return status.Status == "processing"
		}
		return false
	}).Once().Assert()
}
func TestSecurityGate_AccessControl(t *testing.T) {
	actor, _ := unit.Spawn(t, newSecurityGate)

	// Test admin access
	admin := gen.PID{Node: "admin", ID: 1}
	actor.SendMessage(admin, AccessRequest{Resource: "admin_panel", User: "admin"})
	actor.ShouldSend().To(admin).Message(AccessGranted{}).Once().Assert()

	// Test regular user access to the admin panel
	user := gen.PID{Node: "user", ID: 2}
	actor.SendMessage(user, AccessRequest{Resource: "admin_panel", User: "regular_user"})
	actor.ShouldSend().To(user).Message(AccessDenied{Reason: "insufficient privileges"}).Once().Assert()

	// Test regular user access to public resources
	actor.SendMessage(user, AccessRequest{Resource: "public_content", User: "regular_user"})
	actor.ShouldSend().To(user).Message(AccessGranted{}).Once().Assert()
}
Many actors need to create child processes. Here's how to test this:
func TestTaskManager_WorkerCreation(t *testing.T) {
	actor, _ := unit.Spawn(t, newTaskManager)
	client := gen.PID{Node: "client", ID: 1}

	// Request a new worker
	actor.SendMessage(client, CreateWorker{TaskType: "data_processing"})

	// Should spawn a worker process
	actor.ShouldSpawn().Once().Assert()

	// Should confirm to client
	actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
		if response, ok := msg.(WorkerCreated); ok {
			return response.TaskType == "data_processing"
		}
		return false
	}).Once().Assert()
}
When actors spawn processes, you often need to use the generated PID in subsequent tests:
func TestSessionManager_UserSessions(t *testing.T) {
	actor, _ := unit.Spawn(t, newSessionManager)
	client := gen.PID{Node: "client", ID: 1}

	// Create a session for a user
	actor.SendMessage(client, CreateSession{UserID: "alice"})

	// Capture the spawned session process
	sessionSpawn := actor.ShouldSpawn().Once().Capture()
	sessionPID := sessionSpawn.PID

	// Verify the session was registered
	actor.ShouldSend().To(client).MessageMatching(func(msg any) bool {
		if response, ok := msg.(SessionCreated); ok {
			return response.UserID == "alice" && response.SessionPID == sessionPID
		}
		return false
	}).Once().Assert()

	// Send work to the session
	actor.SendMessage(client, SendToSession{UserID: "alice", Data: "important_data"})

	// Should route to the captured session PID
	actor.ShouldSend().To(sessionPID).Message("important_data").Once().Assert()
}
When tests fail, you need to understand what actually happened:
func TestComplexActor_DebugFailures(t *testing.T) {
	actor, _ := unit.Spawn(t, newComplexActor)

	// Perform some operations
	actor.SendMessage(gen.PID{}, TriggerComplexWorkflow{})

	// If something goes wrong, inspect all events
	events := actor.Events()
	t.Logf("Total events captured: %d", len(events))
	for i, event := range events {
		t.Logf("Event %d: %s - %s", i, event.Type(), event.String())
	}

	// Clear events and test specific behavior
	actor.ClearEvents()
	actor.SendMessage(gen.PID{}, SimpleBehavior{})

	// Now only simple behavior events are captured
	simpleEvents := actor.Events()
	unit.Equal(t, 1, len(simpleEvents), "Should only have one event after clearing")
}
Event Accumulation: Events accumulate across multiple operations. Use ClearEvents() to reset between test phases.
Timing Issues: Some assertions may need time to complete. Use appropriate timeouts and consider async patterns.
Message Ordering: In high-throughput scenarios, message ordering might not be guaranteed. Test for this explicitly.
State Leakage: Each test should start with clean state. Don't rely on previous test state.
The Ergo Framework unit testing library provides comprehensive tools for testing actor-based systems. From simple message exchanges to complex distributed workflows, you can validate every aspect of your actor behavior with confidence.
Key Takeaways:
Start Simple: Begin with basic message testing and gradually add complexity
Test Comprehensively: Cover happy paths, error conditions, and edge cases
Use Fluent Assertions: Take advantage of the readable assertion API
Inspect Events: Use event inspection for debugging and understanding actor behavior
Organize Tests: Structure tests by behavior and keep them focused
Handle Async Patterns: Use appropriate timeouts and pattern matching for async operations
The library's zero-dependency design, comprehensive feature set, and integration with Go's testing framework make it the ideal choice for building robust, well-tested actor systems with the Ergo Framework.
Next Steps:
Explore the complete test examples in the framework repository
Start with simple actors and gradually build complexity
Integrate testing into your development workflow
Use the debugging features when tests fail
Share testing patterns with your team
Happy testing!
The Actor Model and Its Properties
The actor model was developed in the 1970s for concurrent and parallel computations. It defines some key rules for how system components should behave and interact with each other. The main idea is that interactions between program components are not conducted through function or procedure calls but through the exchange of asynchronous messages. You can read more about the actor model and its history in the Wikipedia article.
One of the most popular programming languages that uses this model is Erlang, where the actor model is at the core of its BEAM virtual machine. In Erlang, actors are represented by lightweight processes within the virtual machine.
There are also implementations of this model in other languages. For example, in Java, the Akka framework provides a powerful tool for building solutions based on the actor model. Akka brings the principles of the actor model to the Java ecosystem, enabling developers to create highly concurrent, distributed, and fault-tolerant applications.
The Ergo Framework implements the actor model based on lightweight processes. Each process can send and receive asynchronous messages, make synchronous calls, and spawn new processes.
A process in Ergo Framework is a lightweight entity running on top of a goroutine, built around the actor model. Each process has a mailbox for incoming messages. By default, this mailbox is of unlimited size, but it can be limited by setting an appropriate parameter when the process is started. The mailbox contains four queues: Main, System, Urgent, and Log. These queues determine the priority of message processing.
An actor in Ergo Framework is an abstraction over such a process, with a set of callbacks for handling incoming messages. The standard library includes general-purpose actors like act.Actor, act.Supervisor, and act.Pool, as well as a specialized actor for handling HTTP requests, act.WebHandler.
Mailbox processing in an actor is done sequentially in a FIFO order, but with respect to priority. Messages from the Urgent queue are processed first, followed by those in the System queue. If these queues are empty, messages from the Main queue are processed. Typically, messages from the node (e.g., a request to stop the process) are placed in the Urgent and System queues. All other messages are delivered to the Main queue by default. Messages in the Log queue are processed with the lowest priority.
This package implements the gen.LoggerBehavior interface and provides the ability to output log messages to standard output with color highlighting.
Below is a demonstration of log messages (from nodes, processes, and meta-processes) with different logging levels:
<time> <level> <log source> [process name] [process behavior]: <log message>
When logging, the package also highlights in color the types gen.Atom, gen.PID, gen.ProcessID, gen.Ref, gen.Alias, and gen.Event.
Sets the format for the timestamp of log messages. You can use any existing layout (see the time package) or define your own. By default, the time is displayed with nanosecond precision.
ShortLevelName
Displays the shortened name of the log level
IncludeBehavior
Includes the name of the process behavior in the log message
IncludeName
Includes the registered name of the process in the log message
package main

import (
	"ergo.services/ergo"
	"ergo.services/ergo/gen"
	"ergo.services/logger/colored"
)

func main() {
	logger := gen.Logger{
		Name:   "colored",
		Logger: colored.CreateLogger(colored.Options{}),
	}

	nopt := gen.NodeOptions{}
	nopt.Log.Loggers = []gen.Logger{logger}

	// disable the default logger to avoid duplicate log messages
	nopt.Log.DefaultLogger.Disable = true

	node, err := ergo.StartNode("demo@localhost", nopt)
	if err != nil {
		panic(err)
	}

	node.Log().Warning("Hello World!!!")
	node.Wait()
}
The ergo tool allows you to generate the structure and source code for a project based on the Ergo Framework. To install it, use the following command:
go install ergo.services/tools/ergo@latest
Alternatively, you can build it from the source code available at https://github.com/ergo-services/tools.
When using the ergo tool, you need to follow a specific template for providing arguments:
Parent:Actor{param1:value1,param2:value2...}
Parent can be a supervisor (specified earlier with -with-sup) or an application (specified earlier with -with-app).
Actor can be an actor (added earlier with -with-actor) or a supervisor (specified earlier with -with-sup).
This structured approach ensures the proper hierarchy and parameters are defined for your actors and supervisors.
-init <node_name>: a required argument that sets the name of the node for your service. Available parameters:
tls: enables encryption for network connections (a self-signed certificate will be used).
module: allows you to specify the module name for the go.mod file.
-path <path>: specifies the path for the code of the generated project.
-with-actor <name>: adds an actor (based on act.Actor).
-with-app <name>: adds an application. Available parameters:
mode: specifies the application's start mode (temp - Temporary, perm - Permanent, trans - Transient). The default mode is trans.
Example: -with-app MyApp{mode:perm}
-with-sup <name>: adds a supervisor (based on act.Supervisor). Available parameters:
type: specifies the type of supervisor (ofo - One For One, sofo - Simple One For One, afo - All For One, rfo - Rest For One). The default type is ofo.
strategy: specifies the restart strategy for the supervisor (temp - Temporary, perm - Permanent, trans - Transient). The default strategy is trans.
-with-pool <name>: adds a process pool actor (based on act.Pool). Available parameters:
size: specifies the number of worker processes in the pool. By default, 3 processes are started.
-with-web <name>: adds a Web server (based on act.Pool and act.WebHandler). Available parameters:
host: specifies the hostname for the Web server.
port: specifies the port number for the Web server. The default is 9090.
tls: enables encryption for the Web server using the node's CertManager.
-with-tcp <name>: adds a TCP server actor (based on act.Actor and the meta.TCP meta-process). Available parameters:
host: specifies the hostname for the TCP server.
port: specifies the port number for the TCP server. The default is 7654.
tls: enables encryption for the TCP server using the node's CertManager.
-with-udp <name>: adds a UDP server actor (based on act.Pool, meta.UDPServer, and act.Actor as worker processes). Available parameters:
host: specifies the hostname for the UDP server.
port: specifies the port number for the UDP server. The default is 7654.
-with-msg <name>: adds a message type for network interactions.
-with-observer: adds the Observer application.
For clarity, let's use all available arguments for ergo in the following example:
$ ergo -path /tmp/project \
-init demo{tls} \
-with-app MyApp \
-with-actor MyApp:MyActorInApp \
-with-sup MyApp:MySup \
-with-actor MySup:MyActorInSup \
-with-tcp "MySup:MyTCP{port:12345,tls}" \
-with-udp MySup:MyUDP{port:54321} \
-with-pool MySup:MyPool{size:4} \
-with-web "MyWeb{port:8888,tls}" \
-with-msg MyMsg1 \
-with-msg MyMsg2 \
-with-logger colored \
-with-logger rotate \
-with-observer
Generating project "/tmp/project/demo"...
generating "/tmp/project/demo/apps/myapp/myactorinapp.go"
generating "/tmp/project/demo/apps/myapp/myactorinsup.go"
generating "/tmp/project/demo/cmd/myweb.go"
generating "/tmp/project/demo/cmd/myweb_worker.go"
generating "/tmp/project/demo/apps/myapp/mytcp.go"
generating "/tmp/project/demo/apps/myapp/myudp.go"
generating "/tmp/project/demo/apps/myapp/myudp_worker.go"
generating "/tmp/project/demo/apps/myapp/mypool.go"
generating "/tmp/project/demo/apps/myapp/mypool_worker.go"
generating "/tmp/project/demo/apps/myapp/mysup.go"
generating "/tmp/project/demo/apps/myapp/myapp.go"
generating "/tmp/project/demo/types.go"
generating "/tmp/project/demo/cmd/demo.go"
generating "/tmp/project/demo/README.md"
generating "/tmp/project/demo/go.mod"
generating "/tmp/project/demo/go.sum"
Successfully completed.
Pay attention to the values of the -with-tcp and -with-web arguments: they are enclosed in double quotes. If an argument has multiple parameters, they are separated by commas without spaces. However, since the shell may expand braces containing commas (brace expansion), we enclose the entire argument value in double quotes to ensure the shell passes the parameters through unchanged.
In our example, we specified two loggers: colored and rotate. This enables colored log messages on standard output as well as logging to files with log rotation. The default logger is disabled to prevent duplicate log messages from appearing on standard output.
Additionally, we included the observer application. By default, its web interface is accessible at http://localhost:9911.
As a result of the generation process, we have a well-structured project source code that is ready for execution:
demo
├── apps
│ └── myapp
│ ├── myactorinapp.go
│ ├── myactorinsup.go
│ ├── myapp.go
│ ├── mypool.go
│ ├── mypool_worker.go
│ ├── mysup.go
│ ├── mytcp.go
│ ├── myudp.go
│ └── myudp_worker.go
├── cmd
│ ├── demo.go
│ ├── myweb.go
│ └── myweb_worker.go
├── go.mod
├── go.sum
├── README.md
└── types.go
The generated code is ready for compilation and execution.
Since this example includes the observer application, you can open http://localhost:9911 in your browser to access the web interface for inspecting the node and its running processes.