caf_sharing/ReliableChannel

A collection of helper functions to create a reliable, unidirectional channel from A to B, using a pair of Shared Maps.

One Shared Map (the writer) is writable by A and readable by B. The other one (the reader) is writable by B and readable by A. B writes index numbers in its map to acknowledge messages, helping A to garbage collect old messages.

A channel is represented by an unbounded list of messages. However, only those that have not been acknowledged are actually present in the Shared Map.

The schema is as follows:

In the writer map:

{
    __ca_channels__ : {
       foo : {
           contents:[...],
           index : <number> i.e., index of first msg in `contents`
        }
    }
}

in the reader map:

{
     __ca_acks__ : {
       foo : <number>  (i.e., index of last processed message of 'foo')
     }
}

The first message has index 0.

When B consumes all the messages, it resets the ack index to foo.index + foo.contents.length -1 .

When A garbage collects the channel, it drops all the acknowledged messages by comparing __ca_channels__.foo.index with __ca_acks__.foo, and shifting the list.

Source:

Members

UNKNOWN_ACK_INDEX :number

Source:

Invalid ack index.

Type:
  • number

Methods

allChannelNames(writerRef) → {Array.<string>}

Source:

Lists all channel names in a Shared Map.

Parameters:
Name Type Description
writerRef refMapType

A reference to a writable Shared Map.

Returns:

All the channel names.

Type
Array.<string>

deleteChannel(writerRef, channelName)

Source:

Deletes a channel in a Shared Map.

Parameters:
Name Type Description
writerRef refMapType

A reference to a writable Shared Map.

channelName string

The name of the channel.

firstAckIndex(readerRef, channelName) → {number}

Source:

Returns the index of the last ack message.

Parameters:
Name Type Description
readerRef refMapType

A reference to a writable Shared Map for acks.

channelName string

The name of the channel.

Returns:

The index of the last ack message.

Type
number

firstIndex(writerRef, channelName) → {number}

Source:

Returns the index of the first message available in the channel.

Parameters:
Name Type Description
writerRef refMapType

A reference to a writable Shared Map.

channelName string

The name of the channel.

Returns:

The index of the first message available in the channel.

Type
number

gc(writerRef, readerRef)

Source:

Garbage collects acknowledged messages.

Parameters:
Name Type Description
writerRef refMapType

A writable map with messages.

readerRef refMapType

A read-only map with ack counters.

init(writerRef)

Source:

Initializes a Shared Map containing reliable channels.

Parameters:
Name Type Description
writerRef refMapType

A reference to a writable Shared Map.

receive(writerRef, readerRef, channelName) → {messagesType}

Source:

Receives messages and updates ack counters accordingly. This operation is not idempotent, already acknowledged messages are ignored.

The return type 'messagesType' is:

  {index: number, messages: Array.<jsonType>}

where the index field corresponds to the first message in messages or UNKNOWN_ACK_INDEX if no messages.

Parameters:
Name Type Description
writerRef refMapType

A writable map ack counters.

readerRef refMapType

A read-only map with messages.

channelName string

The name of the channel to receive messages from.

Returns:

Messages received in the channel that have not been acknowledged previously.

Type
messagesType

send(writerRef, channelName, messages) → {number}

Source:

Sends an array of messages through a channel.

Parameters:
Name Type Description
writerRef refMapType

A writable map with messages.

channelName string

The name of the channel.

messages Array.<jsonType>

An array of JSON-serializable messages to be sent.

Throws:

Error If messages is not an array or is empty.

Returns:

An index for the first message in messages.

Type
number