A collection of helper functions to create a reliable, unidirectional
channel from A
to B
, using a pair of Shared Maps.
One Shared Map (the writer
) is writable by A
and readable by B
.
The other one (the reader
) is writable by B
and readable by A
.
B
writes index numbers in its map to
acknowledge messages, helping A
to garbage collect old messages.
A channel is represented by an unbounded list of messages. However, only those that have not been acknowledged are actually present in the Shared Map.
The schema is as follows:
In the writer
map:
{
__ca_channels__ : {
foo : {
contents:[...],
index : <number> i.e., index of first msg in `contents`
}
}
}
in the reader
map:
{
__ca_acks__ : {
foo : <number> (i.e., index of last processed message of 'foo')
}
}
The first message has index 0.
When B
consumes all the messages, it resets the ack index to
foo.index + foo.contents.length -1
.
When A
garbage collects the channel, it drops all the acknowledged messages
by comparing __ca_channels__.foo.index
with __ca_acks__.foo
, and
shifting the list.
- Source:
Members
UNKNOWN_ACK_INDEX :number
- Source:
Invalid ack index.
Type:
- number
Methods
allChannelNames(writerRef) → {Array.<string>}
- Source:
Lists all channel names in a Shared Map.
Parameters:
Name | Type | Description |
---|---|---|
writerRef |
refMapType | A reference to a writable Shared Map. |
Returns:
All the channel names.
- Type
- Array.<string>
deleteChannel(writerRef, channelName)
- Source:
Deletes a channel in a Shared Map.
Parameters:
Name | Type | Description |
---|---|---|
writerRef |
refMapType | A reference to a writable Shared Map. |
channelName |
string | The name of the channel. |
firstAckIndex(readerRef, channelName) → {number}
- Source:
Returns the index of the last ack message.
Parameters:
Name | Type | Description |
---|---|---|
readerRef |
refMapType | A reference to a writable Shared Map for acks. |
channelName |
string | The name of the channel. |
Returns:
The index of the last ack message.
- Type
- number
firstIndex(writerRef, channelName) → {number}
- Source:
Returns the index of the first message available in the channel.
Parameters:
Name | Type | Description |
---|---|---|
writerRef |
refMapType | A reference to a writable Shared Map. |
channelName |
string | The name of the channel. |
Returns:
The index of the first message available in the channel.
- Type
- number
gc(writerRef, readerRef)
- Source:
Garbage collects acknowledged messages.
Parameters:
Name | Type | Description |
---|---|---|
writerRef |
refMapType | A writable map with messages. |
readerRef |
refMapType | A read-only map with ack counters. |
init(writerRef)
- Source:
Initializes a Shared Map containing reliable channels.
Parameters:
Name | Type | Description |
---|---|---|
writerRef |
refMapType | A reference to a writable Shared Map. |
receive(writerRef, readerRef, channelName) → {messagesType}
- Source:
Receives messages and updates ack counters accordingly. This operation is not idempotent, already acknowledged messages are ignored.
The return type 'messagesType' is:
{index: number, messages: Array.<jsonType>}
where the index
field corresponds to the first message in messages
or
UNKNOWN_ACK_INDEX if no messages.
Parameters:
Name | Type | Description |
---|---|---|
writerRef |
refMapType | A writable map ack counters. |
readerRef |
refMapType | A read-only map with messages. |
channelName |
string | The name of the channel to receive messages from. |
Returns:
Messages received in the channel that have not been acknowledged previously.
- Type
- messagesType
send(writerRef, channelName, messages) → {number}
- Source:
Sends an array of messages through a channel.
Parameters:
Name | Type | Description |
---|---|---|
writerRef |
refMapType | A writable map with messages. |
channelName |
string | The name of the channel. |
messages |
Array.<jsonType> | An array of JSON-serializable messages to be sent. |
Throws:
Error If messages
is not an array or is empty.
Returns:
An index for the first message in messages
.
- Type
- number