Add Lua module for Gossip protocol (#3013)

This commit is contained in:
Alexandru Antochi 2020-03-06 21:24:25 +00:00 committed by Marcel Stör
parent 56a86c2661
commit 1719f90a3b
6 changed files with 819 additions and 0 deletions

191
docs/lua-modules/gossip.md Normal file
View File

@ -0,0 +1,191 @@
# ESPGossip
| Since | Origin / Contributor | Maintainer | Source |
| :----- | :-------------------- | :---------- | :------ |
| 2020-01-20 | [alexandruantochi](https://github.com/alexandruantochi) | [alexandruantochi](https://github.com/alexandruantochi) | [gossip.lua](../../lua_modules/gossip/gossip.lua) |
This module is based on the gossip protocol and it can be used to disseminate information through the network to other nodes. The time it takes for the information to reach all nodes is logN. For every round number n, 2^n nodes will receive the information.
### Require
```lua
gossip = require('gossip')
```
### Release
```lua
gossip.inboundSocket:close()
gossip = nil
```
## Usage
```lua
config = {
seedList = { '192.168.0.1', '192.168.0.15' },
debug = true,
debugOutput = print
}
gossip = require ("gossip")
gossip.setConfig(config)
gossip.start()
```
## Strategy
Each controller will randomly pick an IP from it's seed list. It will send a `SYN` request to that IP and set receiving node's `state` to an intermediary state between `Up` and `Suspect`. The node that receives the `SYN` request will compute a diff on the received networkState vs own networkState. It will then send that diff as an `ACK` request. If there is no data to send, it will only send an `ACK`. When the `ACK` is received, the sender's state will revert to `Up` and the receiving node will update it's own networkState using the diff (based on the `ACK` reply).
Gossip will establish if the information received from another node has fresher data by first comparing the `revision`, then the `heartbeat` and lastly the `state`. States that are closer to `DOWN` have priority as an offline node does not update it's heartbeat.
Any other parameter can be sent along with the mandatory `revision`, `heartbeat` and `state` thus allowing the user to spread information around the network. Every time a node receives 'fresh' data, the `gossip.updateCallback` will be called with that data as the first parameter.
Currently there is no implemented deletion for nodes that are down except for the fact that their status is signaled as `REMOVE`.
## Example use-case
There are multiple modules on the network that measure temperature. We want to know the maximum and minimum temperature at a given time and have every node display it.
The brute force solution would be to query each node from a single point and save the `min` and `max` values, then go back to each node and present them with the computed `min` and `max`. This requires n*2 rounds, where n is the number of nodes. It also opens the algorithm to a single point of failure (the node that is in charge of gathering the data).
Using gossip, one can have the node send it's latest value through `SYN` or `pushGossip()` and use the `callbackUpdate` function to compare the values from other nodes to it's own. Based on that, the node will display the values it knows about by gossiping with others. The data will be transmitted in ~log(n) rounds, where n is the number of nodes.
## Terms
`revision` : generation of the node; if a node restarts, the revision will be increased by one. The revision data is stored as a file to provide persistency
`heartBeat` : the node uptime in seconds (`tmr.time()`). This is used to help the other nodes figure out if the information about that particular node is newer.
`networkState` : the list with the state of the network composed of the `ip` as a key and `revision`, `heartBeat` and `state` as values packed in a table.
`state` : all nodes start with a state set to `UP` and when a node sends a `SYN` request, it will mark the destination node in an intermediary state until it receives an `ACK` or a `SYN` from it. If a node receives any message, it will mark that senders IP as `UP` as this provides proof that the node is online.
## setConfig()
#### Syntax
```lua
gossip.setConfig(config)
```
Sets the configuration for gossip. The available options are:
`seedList` : the list of seeds gossip will start with; this will be updated as new nodes are discovered. Note that it's enough for all nodes to start with the same IP in the seedList, as once they have one seed in common, the data will propagate
`roundInterval`: interval in milliseconds at which gossip will pick a random node from the seed list and send a `SYN` request
`comPort` : port for the listening UDP socket
`debug` : flag that will provide debugging messages
`debugOutput` : if debug is set to `true`, then this method will be used as a callback with the debug message as the first parameter
```lua
config = {
seedList = {'192.168.0.54','192.168.0.55'},
roundInterval = 10000,
comPort = 5000,
debug = true,
debugOutput = function(message) print('Gossip says: '..message); end
}
```
If any of them is not provided, the values will default:
`seedList` : nil
`roundInterval`: 10000 (10 seconds)
`comPort` : 5000
`debug` : false
`debugOutput` : print
## start()
#### Syntax
```lua
gossip.start()
```
Starts gossip, sets the `started` flag to true and initiates the `revision`. The revision (generation) main purpose is like a persistent heartbeat, as the heartbeat (measured by uptime in seconds) will obviously revert to 0.
## callbackFunction
#### Syntax
```lua
gossip.callbackFunction = function(data)
processData(data)
end
-- stop the callback
gossip.callbackFunction = nil
```
If declared, this function will get called every time there is a `SYN` with new data.
## pushGossip()
#### Syntax
```lua
gossip.pushGossip(data, [ip])
-- remove data
gossip.pushGossip(nil, [ip])
```
Send a `SYN` request outside of the normal gossip round. The IP is optional and if none given, it will pick a random node.
```
!!! note
. By calling `pushGossip(nil)` you effectively remove the `data` table from the node's network state and notify other nodes of this.
```
## setRevManually()
#### Syntax
```lua
gossip.setRevFileValue(number)
```
The only scenario when rev should be set manually is when a new node is added to the network and has the same IP. Having a smaller revision than the previous node with the same IP would make gossip think the data it received is old, thus ignoring it.
```
!!! note
The revision file value will only be read when gossip starts and it will be incremented by one.
```
## getNetworkState()
#### Syntax
```lua
networkState = gossip.getNetworkState()
print(networkState)
```
The network state can be directly accessed as a Lua table : `gossip.networkState` or it can be received as a JSON with this method.
#### Returns
JSON formatted string regarding the network state.
Example:
```JSON
{
"192.168.0.53": {
"state": 3,
"revision": 25,
"heartbeat": 2500,
"extra" : "this is some extra info from node 53"
},
"192.168.0.75": {
"state": 0,
"revision": 4,
"heartbeat": 6500
}
}
```

View File

@ -0,0 +1,73 @@
-- need a wifi connection
-- enter your wifi credentials
local credentials = {SSID = "SSID", PASS = "PASS"};
-- push a message onto the network
-- this can also be done by changing gossip.networkState[gossip.ip].data = {temperature = 78};
local function sendAlarmingData()
Gossip.pushGossip({temperature = 78});
print('Pushed alarming data');
end
local function removeAlarmingData()
Gossip.pushGossip(nil);
print('Removed alarming data from the network.');
end
-- callback function for when gossip receives an update
local function treatAlarmingData(updateData)
for k in pairs(updateData) do
if updateData[k].data then
if updateData[k].data.temperature and updateData[k].data.temperature > 30 then
print('Warning, the temp is above 30 degrees at ' .. k);
end
end
end
end
local function Startup()
-- initialize all nodes with the seed except for the seed itself
-- eventually they will all know about each other
-- enter at least one ip that will be a start seed
local startingSeed = '192.168.0.73';
-- luacheck: push allow defined
Gossip = require('gossip');
-- luacheck: pop
local config = {debug = true, seedList = {}};
if wifi.sta.getip() ~= startingSeed then
table.insert(config.seedList, startingSeed);
end
Gossip.setConfig(config);
-- add the update callback
Gossip.updateCallback = treatAlarmingData;
-- start gossiping
Gossip.start();
-- send some alarming data timer
if wifi.sta.getip() == startingSeed then
tmr.create():alarm(50000, tmr.ALARM_SINGLE, sendAlarmingData);
tmr.create():alarm(50000*3, tmr.ALARM_SINGLE, removeAlarmingData);
end
end
local function startExample()
wifi.eventmon.register(wifi.eventmon.STA_DISCONNECTED,
function() print('Diconnected') end);
print("Connecting to WiFi access point...");
if wifi.sta.getip() == nil then
wifi.setmode(wifi.STATION);
wifi.sta.config({ssid = credentials.SSID, pwd = credentials.PASS});
end
print('Ip: ' .. wifi.sta.getip() .. '. Starting in 5s ..');
tmr.create():alarm(5000, tmr.ALARM_SINGLE, Startup);
end
startExample();

View File

@ -0,0 +1,3 @@
# Gossip module
Documentation for this Lua module is available in the [gossip.md](../../docs/lua-modules/gossip.md) file and in the [Official NodeMCU Documentation](https://nodemcu.readthedocs.io/) in `Lua Modules` section.

View File

@ -0,0 +1,274 @@
-- Gossip protocol implementation
-- https://github.com/alexandruantochi/
local gossip = {};
local constants = {};
local utils = {};
local network = {};
local state = {};
-- Utils
utils.contains = function(list, element)
for k in pairs(list) do if list[k] == element then return true; end end
return false;
end
utils.debug = function(message)
if gossip.config.debug then
if gossip.config.debugOutput then
gossip.config.debugOutput(message);
else
print(message);
end
end
end
utils.getNetworkState = function() return sjson.encode(gossip.networkState); end
utils.isNodeDataValid = function(nodeData)
return (nodeData and nodeData.revision and nodeData.heartbeat and
nodeData.state) ~= nil;
end
utils.compare = function(first, second)
if first > second then return -1; end
if first < second then return 1; end
return 0;
end
utils.compareNodeData = function(first, second)
local firstDataValid = utils.isNodeDataValid(first);
local secondDataValid = utils.isNodeDataValid(second);
if firstDataValid and secondDataValid then
for index in ipairs(constants.comparisonFields) do
local comparisonField = constants.comparisonFields[index];
local comparisonResult = utils.compare(first[comparisonField],
second[comparisonField]);
if comparisonResult ~= 0 then return comparisonResult; end
end
elseif firstDataValid then
return -1;
elseif secondDataValid then
return 1;
end
return 0;
end
-- computes data1 - data2 based on node compare function
utils.getMinus = function(data1, data2)
local diff = {};
for ip, nodeData1 in pairs(data1) do
if utils.compareNodeData(nodeData1, data2[ip]) == -1 then
diff[ip] = nodeData1;
end
end
return diff;
end
utils.setConfig = function(userConfig)
for k, v in pairs(userConfig) do
if gossip.config[k] ~= nil and type(gossip.config[k]) == type(v) then
gossip.config[k] = v;
end
end
end
-- State
state.setRev = function()
local revision = 0;
if file.exists(constants.revFileName) then
revision = file.getcontents(constants.revFileName) + 1;
end
file.putcontents(constants.revFileName, revision);
utils.debug('Revision set to ' .. revision);
return revision;
end
state.setRevFileValue = function(revNumber)
if revNumber then
file.putcontents(constants.revFileName, revNumber);
utils.debug('Revision overriden to ' .. revNumber);
else
utils.debug('Please provide a revision number.');
end
end
state.start = function()
if gossip.started then
utils.debug('Gossip already started.');
return;
end
gossip.ip = wifi.sta.getip();
if not gossip.ip then
utils.debug('Node not connected to network. Gossip will not start.');
return;
end
gossip.networkState[gossip.ip] = {};
local localState = gossip.networkState[gossip.ip];
localState.revision = state.setRev();
localState.heartbeat = tmr.time();
localState.state = constants.nodeState.UP;
gossip.inboundSocket = net.createUDPSocket();
gossip.inboundSocket:listen(gossip.config.comPort);
gossip.inboundSocket:on('receive', network.receiveData);
gossip.started = true;
gossip.timer = tmr.create();
gossip.timer:register(gossip.config.roundInterval, tmr.ALARM_AUTO,
network.sendSyn);
gossip.timer:start();
utils.debug('Gossip started.');
end
state.tickNodeState = function(ip)
if gossip.networkState[ip] then
local nodeState = gossip.networkState[ip].state;
if nodeState < constants.nodeState.REMOVE then
nodeState = nodeState + constants.nodeState.TICK;
gossip.networkState[ip].state = nodeState;
end
end
end
-- Network
network.pushGossip = function(data, ip)
gossip.networkState[gossip.ip].data = data;
network.sendSyn(nil, ip);
end
network.updateNetworkState = function(updateData)
if gossip.updateCallback then gossip.updateCallback(updateData); end
for ip, data in pairs(updateData) do
if not utils.contains(gossip.config.seedList, ip) then
table.insert(gossip.config.seedList, ip);
end
gossip.networkState[ip] = data;
end
end
-- luacheck: push no unused
network.sendSyn = function(t, ip)
local destination = ip or network.pickRandomNode();
gossip.networkState[gossip.ip].heartbeat = tmr.time();
if destination then
network.sendData(destination, gossip.networkState, constants.updateType.SYN);
state.tickNodeState(destination);
end
end
-- luacheck: pop
network.pickRandomNode = function()
if #gossip.config.seedList > 0 then
local randomListPick = node.random(1, #gossip.config.seedList);
utils.debug('Randomly picked: ' .. gossip.config.seedList[randomListPick]);
return gossip.config.seedList[randomListPick];
end
utils.debug(
'Seedlist is empty. Please provide one or wait for node to be contacted.');
return nil;
end
network.sendData = function(ip, data, sendType)
local outboundSocket = net.createUDPSocket();
data.type = sendType;
local dataToSend = sjson.encode(data);
data.type = nil;
outboundSocket:send(gossip.config.comPort, ip, dataToSend);
outboundSocket:close();
end
network.receiveSyn = function(ip, synData)
utils.debug('Received SYN from ' .. ip);
local update = utils.getMinus(synData, gossip.networkState);
local diff = utils.getMinus(gossip.networkState, synData);
network.updateNetworkState(update);
network.sendAck(ip, diff);
end
network.receiveAck = function(ip, ackData)
utils.debug('Received ACK from ' .. ip);
local update = utils.getMinus(ackData, gossip.networkState);
network.updateNetworkState(update);
end
network.sendAck = function(ip, diff)
local diffIps = '';
for k in pairs(diff) do diffIps = diffIps .. ' ' .. k; end
utils.debug('Sending ACK to ' .. ip .. ' with ' .. diffIps .. ' updates.');
network.sendData(ip, diff, constants.updateType.ACK);
end
-- luacheck: push no unused
network.receiveData = function(socket, data, port, ip)
if gossip.networkState[ip] then
gossip.networkState[ip].state = constants.nodeState.UP;
end
local messageDecoded, updateData = pcall(sjson.decode, data);
if not messageDecoded then
utils.debug('Invalid JSON received from ' .. ip);
return;
end
local updateType = updateData.type;
updateData.type = nil;
if updateType == constants.updateType.SYN then
network.receiveSyn(ip, updateData);
elseif updateType == constants.updateType.ACK then
network.receiveAck(ip, updateData);
else
utils.debug('Invalid data comming from ip ' .. ip ..
'. No valid type specified.');
end
end
-- luacheck: pop
-- Constants
constants.nodeState = {TICK = 1, UP = 0, SUSPECT = 2, DOWN = 3, REMOVE = 4};
constants.defaultConfig = {
seedList = {},
roundInterval = 15000,
comPort = 5000,
debug = false
};
constants.comparisonFields = {'revision', 'heartbeat', 'state'};
constants.updateType = {ACK = 'ACK', SYN = 'SYN'}
constants.revFileName = 'gossip/rev.dat';
-- Return
gossip = {
started = false,
config = constants.defaultConfig,
setConfig = utils.setConfig,
start = state.start,
setRevFileValue = state.setRevFileValue,
networkState = {},
getNetworkState = utils.getNetworkState,
pushGossip = network.pushGossip
};
-- return
if (... == 'test') then
return {
_gossip = gossip,
_constants = constants,
_utils = utils,
_network = network,
_state = state
};
elseif net and file and tmr and wifi then
return gossip;
else
error('Gossip requires these modules to work: net, file, tmr, wifi');
end

View File

@ -0,0 +1,277 @@
-- Gossip protocol implementation tests
-- https://github.com/alexandruantochi/
local gossipSubmodules = loadfile('gossip.lua')('test');
local gossip = gossipSubmodules._gossip;
local constants = gossipSubmodules._constants;
local utils = gossipSubmodules._utils;
local network = gossipSubmodules._network;
local state = gossipSubmodules._state;
-- test constants and mocks
local function dummy() return nil; end
-- luacheck: push allow defined
tmr = {};
tmr.time = function() return 200; end
sjson = {};
sjson.decode = function(data) return data; end
file = {};
file.exists = dummy
file.putcontents = dummy
-- luacheck: pop
local Ip_1 = '192.168.0.1';
local Ip_2 = '192.168.0.2';
-- test runner
local Test = {};
local RunTests = function()
local failures = {};
print('\nRunning tests...\n');
for testName, test in pairs(Test) do
if type(test) == 'function' then
local result = testName .. ': ';
local passed, res = pcall(test);
if passed then
result = result .. ' Passed.';
else
result = result .. ' Failed ->';
result = '>>>' .. result .. res;
table.insert(failures, testName);
end
print(result);
end
end
if (#failures ~= 0) then
print('\n\n');
print('Failed tests (' .. #failures .. '): \n');
for k in pairs(failures) do print(failures[k]); end
print('\n');
end
end
-- utils
function Test.utils_contains()
local seedList = {};
assert(not utils.contains(seedList, Ip_1));
table.insert(seedList, Ip_1);
assert(utils.contains(seedList, Ip_1));
table.insert(seedList, Ip_2);
assert(utils.contains(seedList, Ip_1) and utils.contains(seedList, Ip_2));
end
function Test.utils_setConfig()
local config = {
seedList = {Ip_1},
roundInterval = 1500,
comPort = 8000,
junk = 'junk'
};
gossip.config = constants.defaultConfig;
utils.setConfig(config);
assert(#gossip.config.seedList == 1, 'Config failed when adding seedList');
assert(gossip.config.seedList[1] == Ip_1,
'Config failed to add ip to seedList');
assert(gossip.config.roundInterval == 1500,
'Config failed to add round interval.');
assert(gossip.config.comPort == 8000, 'Config failed to add comPort.');
assert(gossip.config.debug == false, 'Debug should be false.');
assert(gossip.config.junk == nil, 'Junk data inserted in config.');
gossip.config = constants.defaultConfig;
end
function Test.utils_compare()
assert(utils.compare(1, 2) == 1);
assert(utils.compare(2, 1) == -1);
assert(utils.compare(0, 0) == 0);
end
function Test.utils_compareNodeData_on_revision()
local networkData_1 = {
revision = 1,
heartbeat = 500,
state = constants.nodeState.UP
};
local networkData_2 = {
revision = 2,
heartbeat = 500,
state = constants.nodeState.UP
};
assert(utils.compareNodeData(networkData_1, networkData_2) == 1);
assert(utils.compareNodeData(networkData_2, networkData_1) == -1);
networkData_1.revision = networkData_2.revision;
assert(utils.compareNodeData(networkData_1, networkData_2) == 0);
end
function Test.utils_compareNodeData_on_heartbeat()
local networkData_1 = {
revision = 1,
heartbeat = 500,
state = constants.nodeState.UP
};
local networkData_2 = {
revision = 1,
heartbeat = 600,
state = constants.nodeState.UP
};
assert(utils.compareNodeData(networkData_1, networkData_2) == 1);
assert(utils.compareNodeData(networkData_2, networkData_1) == -1);
networkData_1.heartbeat = networkData_2.heartbeat;
assert(utils.compareNodeData(networkData_1, networkData_2) == 0);
end
function Test.utils_compareNodeData_on_state()
local networkData_1 = {
revision = 1,
heartbeat = 500,
state = constants.nodeState.UP
};
local networkData_2 = {
revision = 1,
heartbeat = 500,
state = constants.nodeState.SUSPECT
};
assert(utils.compareNodeData(networkData_1, networkData_2) == 1);
assert(utils.compareNodeData(networkData_2, networkData_1) == -1);
networkData_1.state = networkData_2.state;
assert(utils.compareNodeData(networkData_1, networkData_2) == 0);
end
function Test.utils_compareNodeData_on_bad_data()
local networkData_1 = {
revision = 1,
heartbeat = nil,
state = constants.nodeState.UP
};
local networkData_2 = {
revision = 1,
heartbeat = 600,
state = constants.nodeState.UP
};
assert(utils.compareNodeData(networkData_1, networkData_2) == 1);
assert(utils.compareNodeData(networkData_2, networkData_1) == -1);
networkData_2.state = nil;
assert(utils.compareNodeData(networkData_1, networkData_2) == 0);
end
function Test.utils_getMinus()
local data1 = {};
local data2 = {};
data1[Ip_1] = {
revision = 1,
heartbeat = 500,
state = constants.nodeState.UP
};
data1[Ip_2] = {
revision = 1,
heartbeat = 400,
state = constants.nodeState.UP
};
data2[Ip_1] = {
revision = 1,
heartbeat = 400,
state = constants.nodeState.UP
};
data2[Ip_2] = {
revision = 1,
heartbeat = 400,
state = constants.nodeState.SUSPECT;
};
--local diff1 = utils.getMinus(data1, data2);
local diff2 = utils.getMinus(data2, data1);
--assert(diff1[Ip_1] ~= nil and diff1[Ip_2] == nil);
assert(diff2[Ip_1] == nil and diff2[Ip_2] ~= nil);
end
-- state
function Test.state_setRev()
gossip.ip = Ip_1;
gossip.networkState[Ip_1] = {};
gossip.networkState[Ip_1].revision = -1;
assert(state.setRev() == 0, 'Revision not initialized to 0.');
end
function Test.state_tickNodeState()
local ip_1 = Ip_1;
local ip_2 = Ip_2;
gossip.networkState[ip_1] = {};
gossip.networkState[ip_2] = {};
gossip.networkState[ip_1].state = constants.nodeState.UP;
gossip.networkState[ip_2].state = constants.nodeState.DOWN;
state.tickNodeState(ip_1);
state.tickNodeState(ip_2);
assert(gossip.networkState[ip_1].state == constants.nodeState.UP +
constants.nodeState.TICK);
assert(gossip.networkState[ip_2].state == constants.nodeState.REMOVE);
state.tickNodeState(ip_1);
assert(gossip.networkState[ip_1].state == constants.nodeState.SUSPECT);
gossip.networkState = {};
end
-- network
function Test.network_updateNetworkState_no_callback()
local updateData = {}
updateData[Ip_1] = {
revision = 1,
heartbeat = 400,
state = constants.nodeState.UP
};
updateData[Ip_2] = {
revision = 1,
heartbeat = 700,
state = constants.nodeState.UP
};
network.updateNetworkState(updateData);
-- send duplicate data
network.updateNetworkState(updateData);
assert(#gossip.config.seedList == 2);
assert(gossip.config.seedList[1] == Ip_1);
assert(gossip.config.seedList[2] == Ip_2);
assert(gossip.networkState[Ip_1] ~= nil and gossip.networkState[Ip_2] ~= nil);
gossip.networkState = {};
gossip.config = constants.defaultConfig;
end
function Test.network_updateNetworkState_with_callback()
local callbackTriggered = false;
local function updateCallback() callbackTriggered = true; end
gossip.updateCallback = updateCallback;
Test.network_updateNetworkState_no_callback();
assert(callbackTriggered);
gossip.updateCallback = nil;
end
function Test.network_receiveData_when_receive_syn()
local originalReceiveSyn = network.receiveSyn;
local receiveSynCalled = false;
network.receiveSyn = function() receiveSynCalled = true; end
network.receiveData('socket', {type = constants.updateType.SYN});
network.receiveSyn = originalReceiveSyn;
assert(receiveSynCalled);
end
function Test.network_receiveData_when_receive_ack()
local originalReceiveAck = network.receiveAck;
local receiveAckCalled = false;
network.receiveAck = function() receiveAckCalled = true; end
network.receiveData('socket', {type = constants.updateType.ACK});
network.receiveAck = originalReceiveAck;
assert(receiveAckCalled);
end
-- run tests
RunTests();

View File

@ -45,6 +45,7 @@ pages:
- 'fifo' : 'lua-modules/fifo.md' - 'fifo' : 'lua-modules/fifo.md'
- 'fifosock' : 'lua-modules/fifosock.md' - 'fifosock' : 'lua-modules/fifosock.md'
- 'ftpserver': 'lua-modules/ftpserver.md' - 'ftpserver': 'lua-modules/ftpserver.md'
- 'gossip': 'lua-modules/gossip.md'
- 'hdc1000': 'lua-modules/hdc1000.md' - 'hdc1000': 'lua-modules/hdc1000.md'
- 'httpserver': 'lua-modules/httpserver.md' - 'httpserver': 'lua-modules/httpserver.md'
- 'imap': 'lua-modules/imap.md' - 'imap': 'lua-modules/imap.md'