ccsdsmo-malgo

CCSDS MO MAL Go API

MAL/GO API

Introduction

This GO API represents all MAL concepts, especially the interaction patterns, the MAL message format and the data model. The main goal of this API is to offer a simple and efficient API that makes full use of GO features.

Concepts

Most concepts of MAL/GO API are imposed by the MAL specification:

However, the MAL specification does not define some concepts that are specific to this implementation:

Overview of the MAL/GO implementation

The MAL/GO implementation shall comprise the following levels:

The MAL/GO implementation consists of several modules:

MAL data types

The MAL Element is the base type of all data constructs; all types are derived from it. The Element GO interface represents this MAL Element type and defines the base interface used to encode (resp. decode) each data type.

The MAL Attribute is the base type of all attributes of the MAL data model. Attributes are contained within Composites and are used to build the complex structures that make up the data model. The Attribute GO interface represents this MAL Attribute type; every GO type defining a MAL attribute implements it. All MAL attributes except Time and FineTime are represented by GO base types.

The MAL Composite is the base structure for composite structures that contain a set of elements. The Composite GO interface represents this structure; every GO type defining a MAL Composite implements it.

For each GO interface or structure, named X, defining a MAL Element, an XList type is defined to handle MAL lists of this element.

All data types defined in the MAL standard and their corresponding lists are defined in the MAL/GO API.
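The relationship between an element type and its list type can be illustrated with a small, self-contained sketch. It is not the MAL/GO code: the Element interface is reduced to a single hypothetical Encode method, and the byte layout is a toy format chosen only for the example. Identifier and IdentifierList are used because both names appear in this API.

```go
package main

import "fmt"

// Element is a simplified, hypothetical stand-in for the base interface.
type Element interface {
	// Encode returns a toy byte form, not a real MAL encoding.
	Encode() []byte
}

// Identifier is an attribute backed by a Go base type.
type Identifier string

// Encode writes one length byte followed by the characters.
func (id Identifier) Encode() []byte {
	return append([]byte{byte(len(id))}, []byte(id)...)
}

// IdentifierList is the list type that accompanies Identifier.
type IdentifierList []Identifier

// Encode writes the element count, then each encoded element.
func (l IdentifierList) Encode() []byte {
	out := []byte{byte(len(l))}
	for _, id := range l {
		out = append(out, id.Encode()...)
	}
	return out
}

func main() {
	// Both the element and its list satisfy the same base interface.
	var domain Element = IdentifierList{"spacecraft", "aocs"}
	fmt.Printf("% x\n", domain.Encode())
}
```

Because the list type satisfies the same interface as its elements, code that encodes an Element does not need to distinguish single values from lists.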

MAL message

The Message GO structure represents the MAL Message. It defines all header data fields; the body is handled as a byte array (the encoded representation of the message body). Encoding the message to (resp. decoding it from) the Protocol Data Unit (PDU) is the responsibility of the transport implementation.

MAL context

The Context GO structure is the base entity enabling a client to use the communication functions provided by the MAL layer. Normally it is not used directly but through the EndPoint structure or the high level API. However, you can send MAL messages using the Context.Send method and receive MAL messages by registering a Listener.

Initialization and configuration

A MAL context is created and initialized by a call to the NewContext function. This function takes the URI of the context as a parameter; the corresponding transport is instantiated and initialized through a factory selected from this URI (the transport factory must be registered first).

func NewContext(url string) (*Context, error)

Note: Registering a transport factory only requires importing the corresponding package in your program for its side effects.
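The registration mechanism described in the note can be sketched as follows. This is an illustration of the general Go pattern (a registry filled from an init function and looked up by URI scheme), not the MAL/GO source: TransportFactory, RegisterFactory and Open are hypothetical names, and the factory returns a plain string instead of a transport. In a real layout the init function would live in the transport package, so that a blank import of that package is enough to run it.

```go
package main

import (
	"fmt"
	"net/url"
)

// TransportFactory is a hypothetical factory signature.
type TransportFactory func(u *url.URL) (string, error)

// factories maps a URI scheme to the factory registered for it.
var factories = map[string]TransportFactory{}

// RegisterFactory associates a URI scheme with a factory.
func RegisterFactory(scheme string, f TransportFactory) {
	factories[scheme] = f
}

// init runs when the package is loaded. Placed in a transport
// package, it would be triggered by a blank import of that package.
func init() {
	RegisterFactory("maltcp", func(u *url.URL) (string, error) {
		return "tcp transport bound to " + u.Host, nil
	})
}

// Open selects the factory from the scheme of the URI.
func Open(uri string) (string, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", err
	}
	f, ok := factories[u.Scheme]
	if !ok {
		return "", fmt.Errorf("no transport registered for scheme %q", u.Scheme)
	}
	return f(u)
}

func main() {
	fmt.Println(Open("maltcp://127.0.0.1:16000"))
}
```

With this pattern, a URI whose scheme has no registered factory fails at creation time, which is why the transport package must be imported before the context is created.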

Configuration

Two methods allow a MAL context to be configured:

func (ctx *Context) SetAccessControl(achdlr AccessControl)
func (ctx *Context) SetErrorChannel(errch chan *MessageError)

Messages sending

The Context defines the Send request method to transmit a message through the transport component.

func (ctx *Context) Send(msg *Message) error

Messages reception

The Context defines the Receive indication method to handle incoming messages from the transport component. All incoming messages are pushed to the internal channel if the security check passes. They can then be delivered to registered listeners.

func (ctx *Context) Receive(msg *Message) error

The Context defines methods to register (resp. unregister) End-Points. An End-Point is a message listener associated with a specific MAL URI. After registration, all messages sent to the corresponding URI are delivered to the End-Point through the onMessage method of its Listener interface. The GetEndPoint method retrieves the End-Point associated with a specific URI.

func (ctx *Context) GetEndPoint(uri *URI) (Listener, error)
func (ctx *Context) RegisterEndPoint(uri *URI, listener Listener) error
func (ctx *Context) UnregisterEndPoint(uri *URI) error
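The following sketch shows one way a URI-to-listener table can route incoming messages. It is a simplified model under stated assumptions, not the Context implementation: the Message type is reduced to two fields, URIs are plain strings, the registry type and its methods are hypothetical, and no locking is done, so the sketch must be used from a single goroutine.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a reduced, hypothetical message: destination URI and body.
type Message struct {
	UriTo string
	Body  []byte
}

// Listener receives the messages addressed to one URI.
type Listener interface {
	onMessage(msg *Message) error
}

// registry maps each URI to its listener. It is not goroutine-safe.
type registry struct {
	listeners map[string]Listener
}

func newRegistry() *registry {
	return &registry{listeners: make(map[string]Listener)}
}

// register refuses a second listener for the same URI.
func (r *registry) register(uri string, l Listener) error {
	if _, exists := r.listeners[uri]; exists {
		return errors.New("end-point already registered: " + uri)
	}
	r.listeners[uri] = l
	return nil
}

func (r *registry) unregister(uri string) error {
	if _, exists := r.listeners[uri]; !exists {
		return errors.New("unknown end-point: " + uri)
	}
	delete(r.listeners, uri)
	return nil
}

// deliver hands the message to the listener registered for its UriTo.
func (r *registry) deliver(msg *Message) error {
	l, ok := r.listeners[msg.UriTo]
	if !ok {
		return errors.New("no end-point for " + msg.UriTo)
	}
	return l.onMessage(msg)
}

// counter is a trivial listener that counts deliveries.
type counter struct{ received int }

func (c *counter) onMessage(*Message) error {
	c.received++
	return nil
}

func main() {
	reg := newRegistry()
	c := &counter{}
	uri := "maltcp://127.0.0.1:16000/service"
	_ = reg.register(uri, c)
	_ = reg.deliver(&Message{UriTo: uri})
	_ = reg.unregister(uri)
	// After unregistration the same message can no longer be delivered.
	fmt.Println(c.received, reg.deliver(&Message{UriTo: uri}) != nil)
}
```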

Access control

Message error

MAL end-point

The EndPoint defines a generic implementation of the Listener interface; it is the base entity used to send and receive MAL Messages. It defines a method to send messages (EndPoint.Send), a blocking method to receive messages (EndPoint.Recv), and a method to close and unregister the end-point (EndPoint.Close). Each MAL message must be constructed manually.

Each end-point maintains an atomic counter used to generate the appropriate MAL TransactionId through the EndPoint.TransactionId method. An end-point is created using the NewEndPoint function.

func NewEndPoint(ctx *Context, service string, ch chan *Message) (*EndPoint, error)

func (endpoint *EndPoint) TransactionId() ULong
func (endpoint *EndPoint) Send(msg *Message) error
func (endpoint *EndPoint) Recv() (*Message, error)

func (endpoint *EndPoint) Close() error
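An atomic counter of the kind mentioned above can be sketched with the standard sync/atomic package. The tidSource type and its Next method are hypothetical names, and uint64 stands in for the ULong type; the point is only that concurrent callers each obtain a distinct identifier without a mutex.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tidSource hands out increasing identifiers, starting at 1.
type tidSource struct {
	last uint64
}

// Next atomically increments the counter and returns the new value.
func (s *tidSource) Next() uint64 {
	return atomic.AddUint64(&s.last, 1)
}

func main() {
	var src tidSource
	var wg sync.WaitGroup
	// 100 goroutines each draw one identifier concurrently.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			src.Next()
		}()
	}
	wg.Wait()
	fmt.Println(src.Next()) // prints 101
}
```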

MAL client context

The ClientContext entity is the entry point of the high level API. It corresponds to a unique MAL URI and is registered as an end-point with the underlying MAL context. The ClientContext entity manages the MAL TransactionId. It allows providers to register handlers to process consumer’s requests. It allows consumers to create operations to initiate and manage interaction with providers.

Initialization and configuration

A ClientContext is created and initialized by a call to the NewClientContext function (package github.com/ccsdsmo/malgo/mal/api). This function takes as parameters the underlying MAL context and the name of the service (the last part of the MAL URI of the corresponding end-point).

func NewClientContext(ctx *Context, service string) (*ClientContext, error)

After creation, the client context can be configured through a set of primitives that set various MAL message attributes:

func (cctx *ClientContext) SetAuthenticationId(AuthenticationId Blob) *ClientContext
func (cctx *ClientContext) SetEncodingId(EncodingId UOctet) *ClientContext
func (cctx *ClientContext) SetQoSLevel(QoSLevel QoSLevel) *ClientContext
func (cctx *ClientContext) SetPriority(Priority UInteger) *ClientContext
func (cctx *ClientContext) SetDomain(Domain IdentifierList) *ClientContext
func (cctx *ClientContext) SetNetworkZone(NetworkZone Identifier) *ClientContext
func (cctx *ClientContext) SetSession(Session SessionType) *ClientContext
func (cctx *ClientContext) SetSessionName(SessionName Identifier) *ClientContext

ClientContext concurrency

By default, incoming messages are handled by the thread of the underlying MAL Context. This preserves the order of message processing, but it can lead to deadlocks in case of nested calls (direct or indirect). When the Concurrency attribute is set, each message is processed in a separate goroutine. It is then the responsibility of the provider to ensure the synchronization and the ordering of request processing.

func (cctx *ClientContext) SetConcurrency(multi bool) *ClientContext
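The difference between the two modes can be modelled with a small dispatcher. This is a sketch under assumptions, not the ClientContext code: the dispatcher type, its concurrent flag and the collect helper are hypothetical, and messages are plain strings. In sequential mode the handler runs in the caller's goroutine, so arrival order is kept; in concurrent mode each handler runs in its own goroutine and the handler itself must protect shared state.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatcher runs handlers inline or in separate goroutines.
type dispatcher struct {
	concurrent bool
	wg         sync.WaitGroup
}

func (d *dispatcher) dispatch(msg string, handler func(string)) {
	if !d.concurrent {
		handler(msg) // caller's goroutine: order is preserved
		return
	}
	d.wg.Add(1)
	go func() { // one goroutine per message: order is not guaranteed
		defer d.wg.Done()
		handler(msg)
	}()
}

// collect dispatches all messages and returns them in processing order.
func collect(concurrent bool, msgs []string) []string {
	var mu sync.Mutex
	var seen []string
	d := &dispatcher{concurrent: concurrent}
	for _, m := range msgs {
		d.dispatch(m, func(s string) {
			mu.Lock() // required once handlers run concurrently
			seen = append(seen, s)
			mu.Unlock()
		})
	}
	d.wg.Wait()
	return seen
}

func main() {
	msgs := []string{"a", "b", "c"}
	fmt.Println(collect(false, msgs))     // arrival order
	fmt.Println(len(collect(true, msgs))) // all handled, in any order
}
```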

Registering provider’s handler

The ClientContext entity defines a set of 6 methods allowing providers to register handlers to process consumer’s requests. Each method is dedicated to a particular MAL interaction. When invoked, the corresponding handler receives as a parameter a Transaction entity corresponding to the interaction. This entity provides methods to handle the interaction.

The handler type is defined as:

type ProviderHandler func(*Message, Transaction) error

// Register a handler for Send interaction
func (cctx *ClientContext) RegisterSendHandler(area UShort, areaVersion UOctet, service UShort, operation UShort, handler ProviderHandler) error

// Register a handler for Submit interaction
func (cctx *ClientContext) RegisterSubmitHandler(area UShort, areaVersion UOctet, service UShort, operation UShort, handler ProviderHandler) error

// Register a handler for Request interaction
func (cctx *ClientContext) RegisterRequestHandler(area UShort, areaVersion UOctet, service UShort, operation UShort, handler ProviderHandler) error

// Register a handler for Invoke interaction
func (cctx *ClientContext) RegisterInvokeHandler(area UShort, areaVersion UOctet, service UShort, operation UShort, handler ProviderHandler) error

// Register a handler for Progress interaction
func (cctx *ClientContext) RegisterProgressHandler(area UShort, areaVersion UOctet, service UShort, operation UShort, handler ProviderHandler) error

// Register a handler for PubSub interaction
func (cctx *ClientContext) RegisterBrokerHandler(area UShort, areaVersion UOctet, service UShort, operation UShort, handler ProviderHandler) error

The transaction entities are defined as:

type Transaction interface {
	getTid() ULong
}

// MAL Send interaction
type SendTransaction interface {
	Transaction
}

// MAL Submit interaction
type SubmitTransaction interface {
	Transaction
	Ack(body []byte, isError bool) error
}

// MAL Request interaction
type RequestTransaction interface {
	Transaction
	Reply(body []byte, isError bool) error
}

// MAL Invoke interaction
type InvokeTransaction interface {
	Transaction
	Ack(body []byte, isError bool) error
	Reply(body []byte, isError bool) error
}

// MAL Progress interaction
type ProgressTransaction interface {
	Transaction
	Ack(body []byte, isError bool) error
	Update(body []byte, isError bool) error
	Reply(body []byte, isError bool) error
}

// MAL Pub/Sub interaction
type BrokerTransaction interface {
	Transaction
	AckRegister(body []byte, isError bool) error
	AckDeregister(body []byte, isError bool) error
}

// SubscriberTransaction
type SubscriberTransaction interface {
	BrokerTransaction
	Notify(body []byte, isError bool) error
}

// PublisherTransaction
type PublisherTransaction interface {
	BrokerTransaction
}

Handling errors in provider’s handler

The body of an error message must consist of an error number followed by extra information. Each service specification must define which error numbers may be returned for a given operation and the nature of the additional information (possibly missing). Depending on the operation being processed, you must encode the error message according to the information described in the specification. Then at the API level, you must use the transaction’s method that corresponds to your current interaction (Ack, Reply, Update, etc.) to send this message and specify that it is an error using the isError parameter (set to true).
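The steps above (build a body made of an error number followed by extra information, then send it with isError set to true) can be sketched as follows. Everything here is an assumption made for illustration: the replier interface only mirrors the Reply method of the transactions listed earlier, the recorder type replaces a real transaction, and the 4-byte big-endian layout is a placeholder. A real provider must encode the error number and the extra information with the encoding used for the MAL message body.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// replier is the subset of a transaction used by this sketch.
type replier interface {
	Reply(body []byte, isError bool) error
}

// encodeError builds an error body: error number, then extra information.
// The fixed 4-byte big-endian number is a placeholder, not a MAL encoding.
func encodeError(number uint32, extra []byte) []byte {
	body := make([]byte, 4, 4+len(extra))
	binary.BigEndian.PutUint32(body, number)
	return append(body, extra...)
}

// replyWithError sends the encoded error and flags it as an error.
func replyWithError(t replier, number uint32, extra []byte) error {
	return t.Reply(encodeError(number, extra), true)
}

// recorder is a hypothetical transaction that stores what it is given.
type recorder struct {
	body    []byte
	isError bool
}

func (r *recorder) Reply(body []byte, isError bool) error {
	r.body, r.isError = body, isError
	return nil
}

func main() {
	rec := &recorder{}
	// Error number 1 is arbitrary; real numbers come from the service
	// specification.
	if err := replyWithError(rec, 1, []byte("details")); err != nil {
		fmt.Println("reply failed:", err)
		return
	}
	fmt.Println(rec.isError, len(rec.body))
}
```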

Creating consumer’s operation

The ClientContext entity defines a set of 7 methods allowing consumers to create entities to request providers. Each created entity encapsulates the resources needed to enable a MAL consumer to initiate and manage interactions with a MAL provider or broker.

Additionally, all Operation entities provide a Close method and a Reset method. The Reset method allows an operation to be reused after its completion.

The interface of ClientContext is:

func NewClientContext(ctx *Context, service string) (*ClientContext, error)
func (cctx *ClientContext) TransactionId() ULong

func (cctx *ClientContext) NewSendOperation(urito *URI,
	area UShort, areaVersion UOctet, service UShort, operation UShort) SendOperation 
	
func (cctx *ClientContext) NewSubmitOperation(urito *URI,
	area UShort, areaVersion UOctet, service UShort, operation UShort) SubmitOperation
	
func (cctx *ClientContext) NewRequestOperation(urito *URI,
	area UShort, areaVersion UOctet, service UShort, operation UShort) RequestOperation

func (cctx *ClientContext) NewInvokeOperation(urito *URI,
	area UShort, areaVersion UOctet, service UShort, operation UShort) InvokeOperation

func (cctx *ClientContext) NewProgressOperation(urito *URI,
	area UShort, areaVersion UOctet, service UShort, operation UShort) ProgressOperation

func (cctx *ClientContext) NewSubscriberOperation(urito *URI,
	area UShort, areaVersion UOctet, service UShort, operation UShort) SubscriberOperation

func (cctx *ClientContext) NewPublisherOperation(urito *URI,
	area UShort, areaVersion UOctet, service UShort, operation UShort) PublisherOperation

The operation entities are defined as:

type Operation interface {
	// Get current TransactionId
	GetTid() ULong
	// Interrupt the operation during a blocking processing.
	Interrupt()
	// Reset the operation in order to reuse it
	Reset() error
	// Close the operation
	Close() error
}

type SendOperation interface {
	Operation
	Send(body []byte) error
}

type SubmitOperation interface {
	Operation
	Submit(body []byte) (*Message, error)
}

type RequestOperation interface {
	Operation
	Request(body []byte) (*Message, error)
}

type InvokeOperation interface {
	Operation
	Invoke(body []byte) (*Message, error)
	GetResponse() (*Message, error)
}

type ProgressOperation interface {
	Operation
	Progress(body []byte) (*Message, error)
	GetUpdate() (*Message, error)
	GetResponse() (*Message, error)
}

type SubscriberOperation interface {
	Operation
	Register(body []byte) (*Message, error)
	GetNotify() (*Message, error)
	Deregister(body []byte) (*Message, error)
}

type PublisherOperation interface {
	Operation
	Register(body []byte) (*Message, error)
	Publish(body []byte) error
	Deregister(body []byte) (*Message, error)
}

Closing a client context

The Close method closes and unregisters the context.

func (cctx *ClientContext) Close() error

User’s Guide of the MAL/GO implementation

Creating a MAL program with the MAL/GO implementation first requires creating and initializing a MAL context. You can then either use the low level API with EndPoint or use the high level API with ClientContext, handlers and operations. These two modes are demonstrated through the code samples below.

Creation of MAL context

A MAL context is created and initialized by a call to the NewContext function of the mal package. This function takes as a parameter a URL corresponding to the URI of this context.

After use, the MAL context shall be closed using the Close method. This call closes all registered listeners (end-point, etc.) and finalizes the underlying transport.

func NewContext(url string) (*Context, error)
func (ctx *Context) Close() error

Example: The code below creates a MAL context using MALTCP transport and listening on port 16000 of the local network interface. The Close method of this context will be called at the end of the current function.

consumer_ctx, err := NewContext("maltcp://127.0.0.1:16000")
if err != nil {
	t.Fatal("Error creating context, ", err)
	return
}
defer consumer_ctx.Close()

Using a simple end-point

The end-point is created using the NewEndPoint function, the parameters are:

A code sample is available in the TCP transport tests (github.com/ccsdsmo/malgo/mal/transport/tcp/transport_test.go).

service, err := NewEndPoint(ctx, "service", nil)
if err != nil {
	logger.Fatalf("Error creating service end-point: %v", err)
}

Sending and Receiving a message

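msg := &Message{
	UriFrom:          service.Uri,
	UriTo:            providerUri,
	TransactionId:    service.TransactionId(),
	...
	Body:             payload,
}
// Sends the message and reports a transmission failure.
if err := service.Send(msg); err != nil {
	logger.Errorf("Error sending a message: %v", err)
}

// Blocks until a message is received or an error occurs.
reply, err := service.Recv()
if err != nil {
	logger.Errorf("Error receiving a message: %v", err)
}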

Using the High level API

Implementing a Provider

The example below shows a provider handling a unique progress interaction. Code samples are available in the MAL API tests (github.com/ccsdsmo/malgo/mal/api/*_test.go).

// Provider context containing its data
type MyProvider struct {
	ctx   *Context
	cctx  *ClientContext
	nbmsg int
}

// Creation and initialization of the provider
func newMyProvider(url string, service string) (*MyProvider, error) {
	// Creates and initializes the MAL context
	ctx, err := NewContext(url)
	if err != nil {
		return nil, err
	}
	// Creates the client context for the provider
	cctx, err := NewClientContext(ctx, service)
	if err != nil {
		return nil, err
	}
	// Allocates and initializes the provider structure
	provider := &MyProvider{ctx, cctx, 0}

	// Handler of a Progress operation
	progressHandler := func(msg *Message, t Transaction) error {
		provider.nbmsg += 1
		if msg != nil {
			transaction := t.(ProgressTransaction)
			transaction.Ack(nil, false)
			for i := 0; i < 10; i++ {
				transaction.Update(update, false)
			}
			transaction.Reply(response, false)
		} else {
			logger.Warnf("receive: nil")
		}
		return nil
	}
	// Registers the handler above
	if err := cctx.RegisterProgressHandler(200, 1, 1, 1, progressHandler); err != nil {
		return nil, err
	}

	// Declares and registers other handlers..

	return provider, nil
}

// Close the provider.
func (provider *MyProvider) close() {
	provider.ctx.Close()
}

The code below creates the provider.

// Creates a provider registered with maltcp://127.0.0.1:16000/service URI
provider, err := newMyProvider("maltcp://127.0.0.1:16000", "service")
if err != nil {
	logger.Errorf("Error creating provider: %v", err)
	return
}
defer provider.close()

Implementing a Consumer

The code below shows how to request the progress interaction of the provider above.

// Creates a new MAL context.
ctx, err := NewContext("maltcp://127.0.0.1:16001")
if err != nil {
	return err
}
defer ctx.Close()

// Creates a client context for consumer's operations.
consumer, err := NewClientContext(ctx, "consumer")
if err != nil {
	return err
}

// Creates a new ProgressOperation from the consumer's context.
// providerUri is the *URI of the provider (maltcp://127.0.0.1:16000/service).
op := consumer.NewProgressOperation(providerUri, 200, 1, 1, 1)
// Initiates a Progress interaction with payload ([]byte encoded content).
if _, err := op.Progress(payload); err != nil {
	return err
}

// Gets Update messages from service
updt, err := op.GetUpdate()
if err != nil {
	return err
}
for updt != nil {
	updt, err = op.GetUpdate()
	if err != nil {
		return err
	}
}
// There are no more updates, gets the Response message from service.
rep, err := op.GetResponse()
if err != nil {
	return err
}

Using a MAL broker

A simple implementation of a MAL broker is available in the github.com/ccsdsmo/malgo/mal/broker package. This implementation is based on the high-level API described above.

// Creates a new broker
func NewBroker(cctx *ClientContext, updtHandler UpdateValueHandler, encoding EncodingFactory) (*BrokerHandler, error)

// Get broker URI
func (handler *BrokerImpl) Uri() *URI
// Gets the underlying ClientContext used by the broker.
func (handler *BrokerImpl) ClientContext() *ClientContext
// Closes the Broker
func (handler *BrokerImpl) Close()

The UpdateValueHandler interface allows the broker to handle lists of a specific «Update Value Type». It covers the decoding of the «Update Value Type» lists received in the PUBLISH message, and the construction and encoding of the resulting lists for each NOTIFY message sent.

type UpdateValueHandler interface {
	// Decodes the lists for each type defined in the top level PUBSUB template
	// (see CCSDS 521.0-B-2 3.5.6.8 l,m,n and o). 
	DecodeUpdateValueList(decoder Decoder) error
	// Returns the number of update values decoded in each list.
	UpdateValueListSize() int
	// Append the specified element of each <<Update Value Type>> lists received in
	// the resulting list for the NOTIFY message under construction.
	AppendValue(idx int)
	// Encode the built list using the specified encoder.
	EncodeUpdateValueList(encoder Encoder) error
	// Reset the built list allowing a new cycle.
	ResetValues()
}
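The cycle implied by these five methods (decode the published lists, select values by index, encode the result, reset) can be sketched for a single list of integers. This is only a model under assumptions: the Decoder and Encoder interfaces below are hypothetical one-method stand-ins, not the MAL/GO ones, longHandler and memCodec are illustrative types, and forwardEven selects even values where a real broker would select the values matching a subscription.

```go
package main

import "fmt"

// Decoder and Encoder are hypothetical one-method stand-ins.
type Decoder interface {
	DecodeValues() ([]int64, error)
}
type Encoder interface {
	EncodeValues(v []int64) error
}

// longHandler handles a single update value list of int64.
type longHandler struct {
	received []int64 // values decoded from the last PUBLISH
	selected []int64 // values chosen for the NOTIFY under construction
}

func (h *longHandler) DecodeUpdateValueList(d Decoder) error {
	v, err := d.DecodeValues()
	if err != nil {
		return err
	}
	h.received = v
	return nil
}

func (h *longHandler) UpdateValueListSize() int { return len(h.received) }

func (h *longHandler) AppendValue(idx int) {
	h.selected = append(h.selected, h.received[idx])
}

func (h *longHandler) EncodeUpdateValueList(e Encoder) error {
	return e.EncodeValues(h.selected)
}

func (h *longHandler) ResetValues() { h.selected = nil }

// memCodec is an in-memory Decoder and Encoder used to run the sketch.
type memCodec struct{ in, out []int64 }

func (c *memCodec) DecodeValues() ([]int64, error) { return c.in, nil }
func (c *memCodec) EncodeValues(v []int64) error {
	c.out = append([]int64(nil), v...)
	return nil
}

// forwardEven plays the broker role for one cycle, keeping even values.
func forwardEven(h *longHandler, c *memCodec) ([]int64, error) {
	if err := h.DecodeUpdateValueList(c); err != nil {
		return nil, err
	}
	h.ResetValues()
	for i := 0; i < h.UpdateValueListSize(); i++ {
		if h.received[i]%2 == 0 {
			h.AppendValue(i)
		}
	}
	if err := h.EncodeUpdateValueList(c); err != nil {
		return nil, err
	}
	return c.out, nil
}

func main() {
	out, err := forwardEven(&longHandler{}, &memCodec{in: []int64{1, 2, 3, 4}})
	fmt.Println(out, err)
}
```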

Examples of usage are available in the broker’s tests, as well as in the implementation of the COM Event service.