nodemcu-firmware/lua_modules/gossip/gossip.lua

290 lines
8.0 KiB
Lua

-- Gossip protocol implementation
-- https://github.com/alexandruantochi/
local gossip = {};
local constants = {};
local utils = {};
local network = {};
local state = {};
-- Utils
utils.contains = function(list, element)
for k in pairs(list) do if list[k] == element then return true; end end
return false;
end
utils.debug = function(message)
if gossip.config.debug then
gossip.config.debugOutput(message);
end
end
utils.getNetworkState = function() return sjson.encode(gossip.networkState); end
utils.isNodeDataValid = function(nodeData)
return (nodeData and nodeData.revision and nodeData.heartbeat and
nodeData.state) ~= nil;
end
utils.compare = function(first, second)
if first > second then return -1; end
if first < second then return 1; end
return 0;
end
utils.compareNodeData = function(first, second)
local firstDataValid = utils.isNodeDataValid(first);
local secondDataValid = utils.isNodeDataValid(second);
if firstDataValid and secondDataValid then
for index in ipairs(constants.comparisonFields) do
local comparisonField = constants.comparisonFields[index];
local comparisonResult = utils.compare(first[comparisonField],
second[comparisonField]);
if comparisonResult ~= 0 then return comparisonResult; end
end
elseif firstDataValid then
return -1;
elseif secondDataValid then
return 1;
end
return 0;
end
-- computes data1 - data2 based on node compare function
utils.getMinus = function(data1, data2)
local diff = {};
for ip, nodeData1 in pairs(data1) do
if utils.compareNodeData(nodeData1, data2[ip]) == -1 then
diff[ip] = nodeData1;
end
end
return diff;
end
utils.setConfig = function(userConfig)
for k, v in pairs(userConfig) do
if gossip.config[k] ~= nil and type(gossip.config[k]) == type(v) then
gossip.config[k] = v;
end
end
end
-- State
state.setRev = function()
local revision = 0;
if file.exists(constants.revFileName) then
revision = file.getcontents(constants.revFileName) + 1;
end
file.putcontents(constants.revFileName, revision);
utils.debug('Revision set to ' .. revision);
return revision;
end
state.setRevFileValue = function(revNumber)
if revNumber then
file.putcontents(constants.revFileName, revNumber);
utils.debug('Revision overriden to ' .. revNumber);
else
utils.debug('Please provide a revision number.');
end
end
state.start = function()
if gossip.started then
utils.debug('Gossip already started.');
return;
end
gossip.ip = wifi.sta.getip();
if not gossip.ip then
utils.debug('Node not connected to network. Gossip will not start.');
return;
end
-- sending to own IP makes no sense
for index, value in ipairs(gossip.config.seedList) do
if value == gossip.ip then
table.remove(gossip.config.seedList, index)
utils.debug('removing own ip from seed list')
end
end
gossip.networkState[gossip.ip] = {};
local localState = gossip.networkState[gossip.ip];
localState.revision = state.setRev();
localState.heartbeat = tmr.time();
localState.state = constants.nodeState.UP;
gossip.socket = net.createUDPSocket();
gossip.socket:listen(gossip.config.comPort);
gossip.socket:on('receive', network.receiveData);
gossip.started = true;
gossip.timer = tmr.create();
gossip.timer:register(gossip.config.roundInterval, tmr.ALARM_AUTO,
network.sendSyn);
gossip.timer:start();
utils.debug('Gossip started.');
end
state.tickNodeState = function(ip)
if gossip.networkState[ip] then
local nodeState = gossip.networkState[ip].state;
local oldNodeState = nodeState;
if nodeState < constants.nodeState.REMOVE then
nodeState = nodeState + constants.nodeState.TICK;
gossip.networkState[ip].state = nodeState;
end
if oldNodeState == constants.nodeState.DOWN then
if gossip.updateCallback then gossip.updateCallback(gossip.networkState[ip]); end
end
end
end
-- Network
network.broadcastIp = "255.255.255.255"
network.pushGossip = function(data, ip)
if not gossip.started then
utils.debug('Gossip not started.');
return;
end
gossip.networkState[gossip.ip].data = data;
network.sendSyn(nil, ip);
end
network.updateNetworkState = function(updateData)
if gossip.updateCallback then gossip.updateCallback(updateData); end
for ip, data in pairs(updateData) do
if not utils.contains(gossip.config.seedList, ip) and ip ~= network.broadcastIp and ip ~= gossip.ip then
table.insert(gossip.config.seedList, ip);
end
if ip ~= gossip.ip then
gossip.networkState[ip] = data;
end
end
end
-- luacheck: push no unused
network.sendSyn = function(t, ip)
local destination = ip or network.pickRandomNode();
gossip.networkState[gossip.ip].heartbeat = tmr.time();
if destination then
network.sendData(destination, gossip.networkState, constants.updateType.SYN);
state.tickNodeState(destination);
end
end
-- luacheck: pop
network.pickRandomNode = function()
if #gossip.config.seedList > 0 then
local randomListPick = node.random(1, #gossip.config.seedList);
utils.debug('Randomly picked: ' .. gossip.config.seedList[randomListPick]);
return gossip.config.seedList[randomListPick];
end
utils.debug(
'Seedlist is empty. Using broadcast IP '..network.broadcastIp..' to discover.');
return network.broadcastIp;
end
network.sendData = function(ip, data, sendType)
data.type = sendType
local dataToSend = sjson.encode(data)
data.type = nil
gossip.socket:send(gossip.config.comPort, ip, dataToSend)
utils.debug("Sent "..#dataToSend.." bytes")
end
network.receiveSyn = function(ip, synData)
utils.debug('Received SYN from ' .. ip);
local update = utils.getMinus(synData, gossip.networkState);
local diff = utils.getMinus(gossip.networkState, synData);
network.updateNetworkState(update);
network.sendAck(ip, diff);
end
network.receiveAck = function(ip, ackData)
utils.debug('Received ACK from ' .. ip);
local update = utils.getMinus(ackData, gossip.networkState);
network.updateNetworkState(update);
end
network.sendAck = function(ip, diff)
local diffIps = '';
for k in pairs(diff) do diffIps = diffIps .. ' ' .. k; end
utils.debug('Sending ACK to ' .. ip .. ' with ' .. diffIps .. ' updates.');
network.sendData(ip, diff, constants.updateType.ACK);
end
-- luacheck: push no unused
network.receiveData = function(socket, data, port, ip)
if gossip.networkState[ip] then
gossip.networkState[ip].state = constants.nodeState.UP;
end
local messageDecoded, updateData = pcall(sjson.decode, data);
if not messageDecoded then
utils.debug('Invalid JSON received from ' .. ip);
return;
end
local updateType = updateData.type;
updateData.type = nil;
if updateType == constants.updateType.SYN then
network.receiveSyn(ip, updateData);
elseif updateType == constants.updateType.ACK then
network.receiveAck(ip, updateData);
else
utils.debug('Invalid data comming from ip ' .. ip ..
'. No valid type specified.');
end
end
-- luacheck: pop
-- Constants
constants.nodeState = {TICK = 1, UP = 0, SUSPECT = 2, DOWN = 3, REMOVE = 4};
constants.defaultConfig = {
seedList = {},
roundInterval = 15000,
comPort = 5000,
debug = false,
debugOutput = print
};
constants.comparisonFields = {'revision', 'heartbeat', 'state'};
constants.updateType = {ACK = 'ACK', SYN = 'SYN'}
constants.revFileName = 'gossip/rev.dat';
-- Return
gossip = {
started = false,
config = constants.defaultConfig,
setConfig = utils.setConfig,
start = state.start,
setRevFileValue = state.setRevFileValue,
networkState = {},
getNetworkState = utils.getNetworkState,
pushGossip = network.pushGossip
};
-- return
if (... == 'test') then
return {
_gossip = gossip,
_constants = constants,
_utils = utils,
_network = network,
_state = state
};
elseif net and file and tmr and wifi then
return gossip;
else
error('Gossip requires these modules to work: net, file, tmr, wifi');
end