From 4207d6c8adb1e99f6ab5cfc4b13971da3763c73e Mon Sep 17 00:00:00 2001 From: Andrew Gough Date: Thu, 1 Feb 2018 21:03:10 +1100 Subject: [PATCH 1/5] Add espmqtt library + bindings --- .gitmodules | 3 + components/espmqtt | 1 + components/modules/Kconfig | 6 + components/modules/mqtt.c | 626 +++++++++++++++++++++++++++++++++++++ 4 files changed, 636 insertions(+) create mode 160000 components/espmqtt create mode 100644 components/modules/mqtt.c diff --git a/.gitmodules b/.gitmodules index 14fb0722..b4038b19 100644 --- a/.gitmodules +++ b/.gitmodules @@ -8,3 +8,6 @@ [submodule "components/u8g2/u8g2"] path = components/u8g2/u8g2 url = https://github.com/olikraus/U8g2_Arduino.git +[submodule "components/espmqtt"] + path = components/espmqtt + url = https://github.com/tuanpmt/espmqtt diff --git a/components/espmqtt b/components/espmqtt new file mode 160000 index 00000000..2967332b --- /dev/null +++ b/components/espmqtt @@ -0,0 +1 @@ +Subproject commit 2967332b95454d4b53068a0d5484ae60e312eb12 diff --git a/components/modules/Kconfig b/components/modules/Kconfig index 1d8ed4c3..98ae8f48 100644 --- a/components/modules/Kconfig +++ b/components/modules/Kconfig @@ -180,4 +180,10 @@ config LUA_MODULE_WS2812 help Includes the ws2812 module. +config LUA_MODULE_MQTT + bool "MQTT module" + default "n" + help + Includes the MQTT module. + endmenu diff --git a/components/modules/mqtt.c b/components/modules/mqtt.c new file mode 100644 index 00000000..3b2d8c48 --- /dev/null +++ b/components/modules/mqtt.c @@ -0,0 +1,626 @@ +// Module for interfacing with an MQTT broker + +#include "module.h" +#include "lauxlib.h" +#include "platform.h" +#include "task/task.h" + +#include "mqtt.h" +#include + +task_handle_t hConn; +task_handle_t hOff; +task_handle_t hPub; +task_handle_t hSub; +task_handle_t hData; + +//used as a holder to copy received data from the MQTT task +//to the main Lua task. Due to the async nature of the tasks +//we must copy the data to deliver it intact to the Lua +//callback +typedef struct lmqtt_ctx +{ + mqtt_client * client; + char topic[CONFIG_MQTT_MAX_LWT_TOPIC]; + char * data; +} lmqtt_ctx_t; + + +// locate the C mqtt_client pointer and leave the +// Lua instance on the top of the stack +static mqtt_client * get_client( lua_State * L ) +{ + if( !lua_istable( L, 1 ) ) + { + luaL_error( L, "Expected MQTT module (client)" ); + return 0; //never reached + } + + lua_getfield( L, 1, "_client" ); + if( !lua_islightuserdata( L, -1 ) ) + { + luaL_error( L, "Expected MQTT client pointer" ); + return 0; //never reached + } + + mqtt_client * client = (mqtt_client *) lua_touserdata( L, -1 ); + lua_pop( L, 1 ); // just pop the _mqtt field + return client; +} + +// locate the C mqtt_settings pointer and leave the +// Lua instance on the top of the stack +static mqtt_settings * get_settings( lua_State * L ) +{ + if( !lua_istable( L, 1 ) ) + { + luaL_error( L, "Expected MQTT module (settings)" ); + return 0; //never reached + } + + lua_getfield( L, 1, "_settings" ); + if( !lua_islightuserdata( L, -1 ) ) + { + luaL_error( L, "Expected MQTT settings pointer" ); + return 0; //never reached + } + + mqtt_settings * settings = (mqtt_settings *) lua_touserdata( L, -1 ); + lua_pop( L, 1 ); // just pop the _mqtt field + return settings; +} + +// Lua: on() +static int lmqtt_on(lua_State *L) +{ + enum events{ + ON_CONNECT = 0, + ON_MESSAGE = 1, + ON_OFFLINE = 2 + }; + const char *const eventnames[] = {"connect", "message", "offline", NULL}; + + // mqtt_settings * settings = get_settings( L ); + int event = luaL_checkoption(L, 2, "message", eventnames); + + if( !lua_isfunction( L, 3 ) ) + return 0; + + switch (event) { + case ON_CONNECT: + lua_setfield(L, 1, "_on_connect"); + break; + case ON_MESSAGE: + lua_setfield(L, 1, "_on_message"); + break; + case ON_OFFLINE: + lua_setfield(L, 1, "_on_offline"); + break; + default: + return 0; + } + + lua_pop(L, 1); //pop event name + return 0; +} + +//typedef void (*task_callback_t)(task_param_t param, task_prio_t prio); +static void lmqtt_connected_cb(task_param_t param, task_prio_t prio) +{ + lua_State * L = lua_getstate(); //returns main Lua state + if( L == NULL ) + return; + + mqtt_client * client = (mqtt_client *) param; + + int top = lua_gettop(L); + lua_checkstack(L, 8); + + char key[64]; + snprintf(key, 64, "mqtt_%p", client->settings); + lua_getglobal( L, key ); //retrieve MQTT table from _G + NODE_DBG("CB:connect: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); + + lua_getfield( L, -1, "_connect_ok" ); + if( lua_isfunction( L, -1 ) ) + { + int top1 = lua_gettop(L); + + NODE_DBG("CB:connect: calling registered one-shot connect callback\n"); + lua_pushvalue( L, -2 ); //dup mqtt table + int res = lua_pcall( L, 1, 0, 0 ); //call the connect callback + if( res != 0 ) + NODE_DBG("CB:connect: Error when calling one-shot connect callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); + + //after connecting ok, we clear _both_ the one-shot callbacks + lua_pushnil(L); + lua_setfield(L, 1, "_connect_ok"); + lua_pushnil(L); + lua_setfield(L, 1, "_connect_nok"); + + lua_settop(L, top1); + } + + // now we check for the standard connect callback registered with 'mqtt:on()' + lua_getfield( L, 1, "_on_connect" ); + if( lua_isfunction( L, -1 ) ) + { + NODE_DBG("CB:connect: calling registered standard connect callback\n"); + lua_pushvalue( L, 1 ); //dup mqtt table + int res = lua_pcall( L, 1, 0, 0 ); //call the connect callback + if( res != 0 ) + NODE_DBG("CB:connect: Error when calling connect callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); + } + lua_settop(L, top); +} + +static void connected_cb( mqtt_client *client, mqtt_event_data_t *event_data ) +{ + task_post_medium(hConn, (task_param_t) client); +} + + +static void lmqtt_disconnected_cb(task_param_t param, task_prio_t prio) +{ + lua_State * L = lua_getstate(); //returns main Lua state + if( L == NULL ) + return; + + mqtt_client * client = (mqtt_client *) param; + + int top = lua_gettop(L); + lua_checkstack(L, 8); + + char key[64]; + snprintf(key, 64, "mqtt_%p", client->settings); + lua_getglobal( L, key ); //retrieve MQTT table from _G + NODE_DBG("CB:disconnect: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); + + lua_getfield( L, -1, "_connect_nok" ); + if( lua_isfunction( L, -1 ) ) + { + NODE_DBG("CB:disconnect: calling registered one-shot disconnect callback\n"); + lua_pushvalue( L, -2 ); //dup mqtt table + int res = lua_pcall( L, 1, 0, 0 ); //call the disconnect callback + if( res != 0 ) + NODE_DBG("CB:disconnect: Error when calling one-shot disconnect callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); + + //after connecting ok, we clear _both_ the one-shot callbacks + lua_pushnil(L); + lua_setfield(L, 1, "_connect_ok"); + lua_pushnil(L); + lua_setfield(L, 1, "_connect_nok"); + } + + // now we check for the standard connect callback registered with 'mqtt:on()' + lua_getfield( L, -1, "_on_offline" ); + if( !lua_isfunction( L, -1 ) || lua_isnil( L, -1 ) ) + return; + + NODE_DBG("CB:disconnect: calling registered standard offline callback\n"); + lua_pushvalue( L, -2 ); //dup mqtt table + int res = lua_pcall( L, 1, 0, 0 ); //call the offline callback + if( res != 0 ) + NODE_DBG("CB:disconnect: Error when calling offline callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); + + lua_settop(L, top); +} + +static void disconnected_cb( mqtt_client *client, mqtt_event_data_t *event_data ) +{ + task_post_medium( hOff, (task_param_t) client); +} + +static void lmqtt_subscribe_cb(task_param_t param, task_prio_t prio) +{ + lua_State * L = lua_getstate(); //returns main Lua state + if( L == NULL ) + return; + + mqtt_client * client = (mqtt_client *) param; + + int top = lua_gettop(L); + lua_checkstack(L, 8); + + char key[64]; + snprintf(key, 64, "mqtt_%p", client->settings); + lua_getglobal( L, key ); //retrieve MQTT table from _G + NODE_DBG("CB:subscribe: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); + + lua_getfield( L, 1, "_subscribe_ok" ); + if( lua_isfunction( L, -1 ) ) + { + NODE_DBG("CB:subscribe: calling registered one-shot subscribe callback\n"); + lua_pushvalue( L, 1 ); //dup mqtt table + int res = lua_pcall( L, 1, 0, 0 ); //call the disconnect callback + if( res != 0 ) + NODE_DBG("CB:subscribe: Error when calling one-shot subscribe callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); + + lua_pushnil(L); + lua_setfield(L, 1, "_subscribe_ok"); + } + lua_settop(L, top); +} + +static void subscribe_cb( mqtt_client *client, mqtt_event_data_t *event_data ) +{ + task_post_medium(hSub, (task_param_t) client); +} + +static void lmqtt_publish_cb(task_param_t param, task_prio_t prio) +{ + NODE_DBG("CB:publish: successfully transferred control back to main task\n"); + lua_State * L = lua_getstate(); //returns main Lua state + if( L == NULL ) + return; + + mqtt_client * client = (mqtt_client *) param; + + int top = lua_gettop(L); + lua_checkstack(L, 8); + + char key[64]; + snprintf(key, 64, "mqtt_%p", client->settings); + lua_getglobal( L, key ); //retrieve MQTT table from _G + NODE_DBG("CB:publish: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); + + lua_getfield( L, 1, "_publish_ok" ); + if( lua_isfunction( L, -1 ) ) + { + NODE_DBG("CB:publish: calling registered one-shot publish callback\n"); + lua_pushvalue( L, 1 ); //dup mqtt table + int res = lua_pcall( L, 1, 0, 0 ); //call the disconnect callback + if( res != 0 ) + NODE_DBG("CB:publish: Error when calling one-shot publish callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); + + lua_pushnil(L); + lua_setfield(L, 1, "_publish_ok"); + } + lua_settop(L, top); +} + +static void publish_cb( mqtt_client *client, mqtt_event_data_t *event_data ) +{ + NODE_DBG("CB:publish: transferring control back to main task\n"); + task_post_medium(hPub, (task_param_t) client); +} + +static void lmqtt_data_cb(task_param_t param, task_prio_t prio) +{ + lua_State * L = lua_getstate(); //returns main Lua state + if( L == NULL ) + return; + + lmqtt_ctx_t * ctx = (lmqtt_ctx_t *) param; + + int top = lua_gettop(L); + lua_checkstack(L, 8); + + char key[64]; + snprintf(key, 64, "mqtt_%p", ctx->client->settings); + lua_getglobal( L, key ); //retrieve MQTT table from _G + NODE_DBG("CB:data: state %p, settings %p, stack top %d\n", L, ctx->client->settings, lua_gettop(L)); + + lua_getfield( L, 1, "_on_message" ); + if( lua_isfunction( L, -1 ) ) + { + int numArg = 2; + lua_pushvalue( L, 1 ); //dup mqtt table + lua_pushstring( L, ctx->topic ); + if( ctx->data != NULL ) + { + lua_pushstring( L, ctx->data ); + numArg++; + } + + int res = lua_pcall( L, numArg, 0, 0 ); //call the messagecallback + if( res != 0 ) + NODE_DBG("CB:data: Error when calling message callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); + } + lua_settop(L, top); + + if( ctx->data != NULL ) + free( ctx->data ); +} + +static void data_cb( mqtt_client *client, mqtt_event_data_t *event_data ) +{ + NODE_DBG("CB:data: topic len %d, data len %d\n", event_data->topic_length, event_data->data_length); + + lmqtt_ctx_t * ctx = malloc(sizeof(lmqtt_ctx_t)); + ctx->client = client; + strncpy(ctx->topic, event_data->topic, event_data->topic_length); + ctx->topic[event_data->topic_length] = '\0'; + + ctx->data = NULL; + if( event_data->data_length > 0 ) + { + ctx->data = malloc( event_data->data_length ); + strncpy( ctx->data, event_data->data, event_data->data_length ); + ctx->data[event_data->data_length] = '\0'; + } + + task_post_medium(hData, (task_param_t) ctx); +} + + +// Lua: mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)[, function(client, reason)]]) +static int lmqtt_connect( lua_State* L ) +{ + mqtt_settings * settings = get_settings( L ); + + // int secure = 0; + int reconnect = 0; + const char * host = luaL_checkstring( L, 2 ); + int port = 1883; + int n = 3; + if( lua_isnumber( L, n ) ) + { + port = luaL_checknumber( L, n ); + n++; + } + + if( lua_isnumber( L, n ) ) + { + // secure = !!luaL_checkinteger( L, -4 ); + n++; + } + + if( lua_isnumber( L, n ) ) + { + reconnect = !!luaL_checkinteger( L, n ); + n++; + } + + if( lua_isfunction( L, n ) ) + { + lua_pushvalue( L, n ); + lua_setfield( L, 1, "_connect_ok" ); // set _G["_cb_connect_ok"] = fn() + n++; + } + + if( lua_isfunction( L, n ) ) + { + lua_pushvalue( L, n ); + lua_setfield(L, 1, "_connect_nok"); // set _G["_cb_connect_nok"] = fn() + n++; + } + + lua_pop( L, n - 2 ); //pop parameters + + strncpy(settings->host, host, CONFIG_MQTT_MAX_HOST_LEN ); + settings->port = port; + + settings->auto_reconnect = reconnect != 0; + + settings->connected_cb = connected_cb; + settings->disconnected_cb = disconnected_cb; + settings->subscribe_cb = subscribe_cb; + settings->publish_cb = publish_cb; + settings->data_cb = data_cb; + + mqtt_client * client = mqtt_start( settings ); + if( client == NULL ) + { + luaL_error( L, "MQTT library failed to start" ); + return 0; + } + + NODE_DBG("Created MQTT client @ %p, settings @ %p, state @ %p, top %d \n", client, settings, L, lua_gettop( L ) ); + lua_pushlightuserdata( L, client ); + lua_setfield( L, -2, "_client" ); //and store a reference in the MQTT table + + return 0; +} + +// Lua: mqtt:close() +static int lmqtt_close( lua_State* L ) +{ + mqtt_client * client = get_client( L ); + if( client == NULL ) + return 0; + + NODE_DBG("Closing MQTT client %p\n", client); + + char id[64]; + snprintf(id, 64, "mqtt_%p", client->settings); + lua_pushnil( L ); + lua_setglobal( L, id ); // remove global reference + + lua_pushstring( L, "_client" ); + lua_pushnil( L ); + lua_settable( L, -3 ); //and remove a reference in the MQTT table + + return 0; +} + +// Lua: mqtt:lwt(topic, message[, qos[, retain]]) +static int lmqtt_lwt( lua_State* L ) +{ + mqtt_settings * settings = get_settings( L ); + + strncpy( settings->lwt_topic, luaL_checkstring( L, 2 ), CONFIG_MQTT_MAX_LWT_TOPIC ); + strncpy( settings->lwt_msg, luaL_checkstring( L, 3 ), CONFIG_MQTT_MAX_LWT_MSG ); + settings->lwt_msg_len = strlen( settings->lwt_msg ); + + int n = 4; + if( lua_isnumber( L, n ) ) + { + settings->lwt_qos = lua_tonumber( L, n ); + n++; + } + + if( lua_isnumber( L, n ) ) + { + settings->lwt_retain = lua_tonumber( L, n ); + n++; + } + + lua_pop( L, n ); + NODE_DBG("Set LWT topic '%s', qos %d, retain %d, len %d\n", settings->lwt_topic, settings->lwt_qos, settings->lwt_retain, settings->lwt_msg_len); + return 0; +} + +//Lua: mqtt:publish(topic, payload, qos, retain[, function(client)]) +static int lmqtt_publish( lua_State * L ) +{ + mqtt_client * client = get_client( L ); + + int top = lua_gettop(L); + + const char * topic = luaL_checkstring( L, 2 ); + const char * data = luaL_checkstring( L, 3 ); + int qos = luaL_checkint( L, 4 ); + int retain = luaL_checkint( L, 5 ); + + int n = 6; + if( lua_isfunction( L, n ) ) + { + lua_pushvalue( L, n ); + lua_setfield(L, 1, "_publish_ok"); // set _G["_cb_connect_nok"] = fn() + n++; + } + + lua_settop(L, top ); + NODE_DBG("MQTT publish client id %s, topic %s, %d bytes\n", client->settings->client_id, topic, strlen(data)); + mqtt_publish(client, topic, data, strlen(data), qos, retain); + return 0; +} + +// Lua: mqtt:subscribe(topic, qos[, function(client)]) OR mqtt:subscribe(table[, function(client)]) +static int lmqtt_subscribe( lua_State* L ) +{ + mqtt_client * client = get_client( L ); + + int top = lua_gettop(L); + + const char * topic = luaL_checkstring( L, 2 ); + int qos = luaL_checkint( L, 3 ); + + if( lua_isfunction( L, 4 ) ) + { + lua_pushvalue( L, 4 ); + lua_setfield(L, 1, "_subscribe_ok"); // set _G["_cb_connect_nok"] = fn() + } + + lua_settop(L, top ); + NODE_DBG("MQTT subscribe client id %s, topic %s\n", client->settings->client_id, topic); + mqtt_subscribe(client, topic, qos); + return 0; +} + +// Lua: mqtt:unsubscribe(topic[, function(client)]) OR mqtt:unsubscribe(table[, function(client)]) +static int lmqtt_unsubscribe( lua_State* L ) +{ + // mqtt_client * client = get_client( L ); + + return 0; +} + +static int lmqtt_delete( lua_State* L ) +{ + mqtt_settings * settings = get_settings( L ); + if( settings != NULL ) + free( settings ); + + mqtt_client * client = get_client( L ); + if( client != NULL ) + { + NODE_DBG("stopping MQTT client id %s\n", client->settings->client_id); + mqtt_destroy( client ); + free( client ); + } + return 0; +} + +// Lua: mqtt.Client(clientid, keepalive[, username, password, cleansession]) +static int lmqtt_new( lua_State* L ) +{ + const char * clientid = NULL; + + clientid = luaL_checkstring( L, 1 ); + NODE_DBG("MQTT client id %s\n", clientid); + + mqtt_settings * settings = (mqtt_settings *) malloc( sizeof(mqtt_settings) ); + memset(settings, 0, sizeof(mqtt_settings) ); + + strncpy(settings->client_id, clientid, CONFIG_MQTT_MAX_CLIENT_LEN); + settings->keepalive = luaL_checkinteger( L, 2 ); + + int n = 2; + if( lua_isstring(L, 3) ) + { + strncpy( settings->username, luaL_checkstring( L, 3 ), CONFIG_MQTT_MAX_USERNAME_LEN); + n++; + } + + if( lua_isstring(L, 4) ) + { + strncpy(settings->password, luaL_checkstring( L, 4 ), CONFIG_MQTT_MAX_PASSWORD_LEN); + n++; + } + + if( lua_isnumber(L, 5) ) + { + settings->clean_session = luaL_checknumber( L, 5 ); + n++; + } + lua_pop( L, n ); //remove parameters + + lua_newtable( L ); + NODE_DBG("New MQTT table at stack pos %d\n", lua_gettop(L)); + + lua_pushlightuserdata( L, settings ); + lua_setfield( L, -2, "_settings" ); // set t["_mqtt"] = client + + lua_pushcfunction( L, lmqtt_connect ); + lua_setfield( L, -2, "connect" ); // set t["connect"] = lmqtt_connect + + lua_pushcfunction( L, lmqtt_close ); + lua_setfield( L, -2, "close" ); // set t["close"] = lmqtt_close + + lua_pushcfunction( L, lmqtt_lwt ); + lua_setfield( L, -2, "lwt" ); // set t["lwt"] = lmqtt_lwt + + lua_pushcfunction( L, lmqtt_publish ); + lua_setfield( L, -2, "publish" ); // set t["publish"] = lmqtt_publish + + lua_pushcfunction( L, lmqtt_subscribe ); + lua_setfield( L, -2, "subscribe" ); // set t["subscribe"] = lmqtt_subscribe + + lua_pushcfunction( L, lmqtt_unsubscribe ); + lua_setfield( L, -2, "unsubscribe" ); // set t["unsubscribe"] = lmqtt_unsubscribe + + lua_pushcfunction( L, lmqtt_on ); + lua_setfield( L, -2, "on" ); // set t["on"] = lmqtt_on + + lua_pushcfunction( L, lmqtt_delete ); + lua_setfield( L, -2, "__gc" ); // set t["__gc"] = lmqtt_delete + + lua_pushvalue( L, 1 ); //make a copy of the table + lua_setmetatable( L, -2 ); + + char id[32]; + snprintf( id, 32, "mqtt_%p", settings ); + NODE_DBG("Store MQTT table in _G stack pos %d\n", lua_gettop(L)); + lua_pushvalue( L, 1 ); //make a copy of the table + lua_setglobal( L, id); + + hConn = task_get_id(lmqtt_connected_cb); + hOff = task_get_id(lmqtt_disconnected_cb); + hPub = task_get_id(lmqtt_publish_cb); + hSub = task_get_id(lmqtt_subscribe_cb); + hData = task_get_id(lmqtt_data_cb); + + NODE_DBG("conn %d, off %d, pub %d, sub %d, data %d\n", hConn, hOff, hPub, hSub, hData); + return 1; //leave table on top of the stack +} + + +// Module function map +static const LUA_REG_TYPE mqtt_map[] = { + { LSTRKEY( "Client" ), LFUNCVAL( lmqtt_new ) }, + { LNILKEY, LNILVAL } +}; + +NODEMCU_MODULE(MQTT, "mqtt", mqtt_map, NULL); From 33232b501dc69cd96d4c0607401697d13bc6b593 Mon Sep 17 00:00:00 2001 From: Andrew Gough Date: Mon, 19 Mar 2018 19:31:38 +1100 Subject: [PATCH 2/5] Upgrade to espmqttc head --- components/espmqtt | 2 +- components/modules/mqtt.c | 439 +++++++++++++++++++++++--------------- 2 files changed, 266 insertions(+), 175 deletions(-) diff --git a/components/espmqtt b/components/espmqtt index 2967332b..55f04a8e 160000 --- a/components/espmqtt +++ b/components/espmqtt @@ -1 +1 @@ -Subproject commit 2967332b95454d4b53068a0d5484ae60e312eb12 +Subproject commit 55f04a8e61a0e3bacf2b40518d9d522e9b79e615 diff --git a/components/modules/mqtt.c b/components/modules/mqtt.c index 3b2d8c48..9ab4518a 100644 --- a/components/modules/mqtt.c +++ b/components/modules/mqtt.c @@ -5,30 +5,21 @@ #include "platform.h" #include "task/task.h" -#include "mqtt.h" +#include "mqtt_client.h" #include task_handle_t hConn; task_handle_t hOff; task_handle_t hPub; task_handle_t hSub; +task_handle_t hUnsub; task_handle_t hData; -//used as a holder to copy received data from the MQTT task -//to the main Lua task. Due to the async nature of the tasks -//we must copy the data to deliver it intact to the Lua -//callback -typedef struct lmqtt_ctx -{ - mqtt_client * client; - char topic[CONFIG_MQTT_MAX_LWT_TOPIC]; - char * data; -} lmqtt_ctx_t; - +// ------------------------------------------------------------------------- // // locate the C mqtt_client pointer and leave the // Lua instance on the top of the stack -static mqtt_client * get_client( lua_State * L ) +static esp_mqtt_client_handle_t get_client( lua_State * L ) { if( !lua_istable( L, 1 ) ) { @@ -43,14 +34,14 @@ static mqtt_client * get_client( lua_State * L ) return 0; //never reached } - mqtt_client * client = (mqtt_client *) lua_touserdata( L, -1 ); + esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) lua_touserdata( L, -1 ); lua_pop( L, 1 ); // just pop the _mqtt field return client; } // locate the C mqtt_settings pointer and leave the // Lua instance on the top of the stack -static mqtt_settings * get_settings( lua_State * L ) +static esp_mqtt_client_config_t * get_settings( lua_State * L ) { if( !lua_istable( L, 1 ) ) { @@ -65,61 +56,112 @@ static mqtt_settings * get_settings( lua_State * L ) return 0; //never reached } - mqtt_settings * settings = (mqtt_settings *) lua_touserdata( L, -1 ); + esp_mqtt_client_config_t * settings = (esp_mqtt_client_config_t *) lua_touserdata( L, -1 ); lua_pop( L, 1 ); // just pop the _mqtt field return settings; } -// Lua: on() -static int lmqtt_on(lua_State *L) +// ------------------------------------------------------------------------- // + +static esp_mqtt_event_handle_t event_clone(esp_mqtt_event_handle_t ev) { - enum events{ - ON_CONNECT = 0, - ON_MESSAGE = 1, - ON_OFFLINE = 2 - }; - const char *const eventnames[] = {"connect", "message", "offline", NULL}; + esp_mqtt_event_handle_t ev1 = (esp_mqtt_event_handle_t) malloc(sizeof(esp_mqtt_event_t)); + memset(ev1, 0, sizeof(esp_mqtt_event_t)); + NODE_DBG("event_clone():malloc: event %p, msg %d\n", ev, ev->msg_id); - // mqtt_settings * settings = get_settings( L ); - int event = luaL_checkoption(L, 2, "message", eventnames); + ev1->event_id = ev->event_id; + ev1->client = ev->client; + ev1->user_context = ev->user_context; + ev1->total_data_len = ev->total_data_len; + ev1->current_data_offset = ev->current_data_offset; + ev1->msg_id = ev->msg_id; - if( !lua_isfunction( L, 3 ) ) - return 0; - - switch (event) { - case ON_CONNECT: - lua_setfield(L, 1, "_on_connect"); - break; - case ON_MESSAGE: - lua_setfield(L, 1, "_on_message"); - break; - case ON_OFFLINE: - lua_setfield(L, 1, "_on_offline"); - break; - default: - return 0; + ev1->data_len = ev->data_len; + if( ev->data != NULL && ev->data_len > 0 ) + { + ev1->data = malloc(ev->data_len + 1); + memcpy(ev1->data, ev->data, ev->data_len); + ev1->data[ev1->data_len] = '\0'; + NODE_DBG("event_clone():malloc: event %p, msg %d, data %p, num %d\n", ev1, ev1->msg_id, ev1->data, ev1->data_len); } - lua_pop(L, 1); //pop event name - return 0; + ev1->topic_len = ev->topic_len; + if( ev->topic != NULL && ev->topic_len > 0 ) + { + ev1->topic = malloc(ev->topic_len + 1); + memcpy(ev1->topic, ev->topic, ev->topic_len); + ev1->topic[ev1->topic_len] = '\0'; + NODE_DBG("event_clone():malloc: event %p, msg %d, topic %p, num %d\n", ev1, ev1->msg_id, ev1->topic, ev1->topic_len); + } + return ev1; +} + +static void event_free(esp_mqtt_event_handle_t ev) +{ + if(ev->data != NULL) + { + NODE_DBG("event_free():free: event %p, msg %d, data %p\n", ev, ev->msg_id, ev->data); + free(ev->data); + } + if(ev->topic != NULL) + { + NODE_DBG("event_free():free: event %p, msg %d, topic %p\n", ev, ev->msg_id, ev->topic); + free(ev->topic); + } + free(ev); +} + +// ------------------------------------------------------------------------- // + +static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) +{ + switch (event->event_id) { + case MQTT_EVENT_CONNECTED: + task_post_medium(hConn, (task_param_t) event_clone(event)); + break; + + case MQTT_EVENT_DISCONNECTED: + task_post_medium( hOff, (task_param_t) event_clone(event)); + break; + + case MQTT_EVENT_SUBSCRIBED: + task_post_medium(hSub, (task_param_t) event_clone(event)); + break; + + case MQTT_EVENT_UNSUBSCRIBED: + task_post_medium(hUnsub, (task_param_t) event_clone(event)); + break; + + case MQTT_EVENT_PUBLISHED: + task_post_medium(hPub, (task_param_t) event_clone(event)); + break; + + case MQTT_EVENT_DATA: + task_post_medium(hData, (task_param_t) event_clone(event)); + break; + + case MQTT_EVENT_ERROR: + break; + } + return ESP_OK; } //typedef void (*task_callback_t)(task_param_t param, task_prio_t prio); -static void lmqtt_connected_cb(task_param_t param, task_prio_t prio) +static void _connected_cb(task_param_t param, task_prio_t prio) { lua_State * L = lua_getstate(); //returns main Lua state if( L == NULL ) return; - mqtt_client * client = (mqtt_client *) param; + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; int top = lua_gettop(L); lua_checkstack(L, 8); char key[64]; - snprintf(key, 64, "mqtt_%p", client->settings); + snprintf(key, 64, "mqtt_%p", event->client); lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:connect: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); + NODE_DBG("CB:connect: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); lua_getfield( L, -1, "_connect_ok" ); if( lua_isfunction( L, -1 ) ) @@ -152,29 +194,24 @@ static void lmqtt_connected_cb(task_param_t param, task_prio_t prio) NODE_DBG("CB:connect: Error when calling connect callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); } lua_settop(L, top); + event_free(event); } -static void connected_cb( mqtt_client *client, mqtt_event_data_t *event_data ) -{ - task_post_medium(hConn, (task_param_t) client); -} - - -static void lmqtt_disconnected_cb(task_param_t param, task_prio_t prio) +static void _disconnected_cb(task_param_t param, task_prio_t prio) { lua_State * L = lua_getstate(); //returns main Lua state if( L == NULL ) return; - mqtt_client * client = (mqtt_client *) param; + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; int top = lua_gettop(L); lua_checkstack(L, 8); char key[64]; - snprintf(key, 64, "mqtt_%p", client->settings); + snprintf(key, 64, "mqtt_%p", event->client); lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:disconnect: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); + NODE_DBG("CB:disconnect: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); lua_getfield( L, -1, "_connect_nok" ); if( lua_isfunction( L, -1 ) ) @@ -195,7 +232,10 @@ static void lmqtt_disconnected_cb(task_param_t param, task_prio_t prio) // now we check for the standard connect callback registered with 'mqtt:on()' lua_getfield( L, -1, "_on_offline" ); if( !lua_isfunction( L, -1 ) || lua_isnil( L, -1 ) ) + { + event_free(event); return; + } NODE_DBG("CB:disconnect: calling registered standard offline callback\n"); lua_pushvalue( L, -2 ); //dup mqtt table @@ -204,28 +244,24 @@ static void lmqtt_disconnected_cb(task_param_t param, task_prio_t prio) NODE_DBG("CB:disconnect: Error when calling offline callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); lua_settop(L, top); + event_free(event); } -static void disconnected_cb( mqtt_client *client, mqtt_event_data_t *event_data ) -{ - task_post_medium( hOff, (task_param_t) client); -} - -static void lmqtt_subscribe_cb(task_param_t param, task_prio_t prio) +static void _subscribe_cb(task_param_t param, task_prio_t prio) { lua_State * L = lua_getstate(); //returns main Lua state if( L == NULL ) return; - mqtt_client * client = (mqtt_client *) param; + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; int top = lua_gettop(L); lua_checkstack(L, 8); char key[64]; - snprintf(key, 64, "mqtt_%p", client->settings); + snprintf(key, 64, "mqtt_%p", event->client); lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:subscribe: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); + NODE_DBG("CB:subscribe: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); lua_getfield( L, 1, "_subscribe_ok" ); if( lua_isfunction( L, -1 ) ) @@ -240,29 +276,25 @@ static void lmqtt_subscribe_cb(task_param_t param, task_prio_t prio) lua_setfield(L, 1, "_subscribe_ok"); } lua_settop(L, top); + event_free(event); } -static void subscribe_cb( mqtt_client *client, mqtt_event_data_t *event_data ) -{ - task_post_medium(hSub, (task_param_t) client); -} - -static void lmqtt_publish_cb(task_param_t param, task_prio_t prio) +static void _publish_cb(task_param_t param, task_prio_t prio) { NODE_DBG("CB:publish: successfully transferred control back to main task\n"); lua_State * L = lua_getstate(); //returns main Lua state if( L == NULL ) return; - mqtt_client * client = (mqtt_client *) param; + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; int top = lua_gettop(L); lua_checkstack(L, 8); char key[64]; - snprintf(key, 64, "mqtt_%p", client->settings); + snprintf(key, 64, "mqtt_%p", event->client); lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:publish: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); + NODE_DBG("CB:publish: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); lua_getfield( L, 1, "_publish_ok" ); if( lua_isfunction( L, -1 ) ) @@ -277,39 +309,65 @@ static void lmqtt_publish_cb(task_param_t param, task_prio_t prio) lua_setfield(L, 1, "_publish_ok"); } lua_settop(L, top); + event_free(event); } -static void publish_cb( mqtt_client *client, mqtt_event_data_t *event_data ) -{ - NODE_DBG("CB:publish: transferring control back to main task\n"); - task_post_medium(hPub, (task_param_t) client); -} - -static void lmqtt_data_cb(task_param_t param, task_prio_t prio) +static void _unsubscribe_cb(task_param_t param, task_prio_t prio) { lua_State * L = lua_getstate(); //returns main Lua state if( L == NULL ) return; - lmqtt_ctx_t * ctx = (lmqtt_ctx_t *) param; + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; + + int top = lua_gettop(L); + + char key[64]; + snprintf(key, 64, "mqtt_%p", event->client); + lua_getglobal( L, key ); //retrieve MQTT table from _G + NODE_DBG("CB:subscribe: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); + + lua_getfield( L, 1, "_unsubscribe_ok" ); + if( lua_isfunction( L, -1 ) ) + { + NODE_DBG("CB:unsubscribe: calling registered one-shot unsubscribe callback\n"); + lua_pushvalue( L, 1 ); //dup mqtt table + int res = lua_pcall( L, 1, 0, 0 ); //call the disconnect callback + if( res != 0 ) + NODE_DBG("CB:unsubscribe: Error when calling one-shot unsubscribe callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); + + lua_pushnil(L); + lua_setfield(L, 1, "_unsubscribe_ok"); + } + lua_settop(L, top); + event_free(event); +} + +static void _data_cb(task_param_t param, task_prio_t prio) +{ + lua_State * L = lua_getstate(); //returns main Lua state + if( L == NULL ) + return; + + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; int top = lua_gettop(L); lua_checkstack(L, 8); char key[64]; - snprintf(key, 64, "mqtt_%p", ctx->client->settings); + snprintf(key, 64, "mqtt_%p", event->client); lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:data: state %p, settings %p, stack top %d\n", L, ctx->client->settings, lua_gettop(L)); + NODE_DBG("CB:data: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); lua_getfield( L, 1, "_on_message" ); if( lua_isfunction( L, -1 ) ) { int numArg = 2; lua_pushvalue( L, 1 ); //dup mqtt table - lua_pushstring( L, ctx->topic ); - if( ctx->data != NULL ) + lua_pushstring( L, event->topic ); + if( event->data != NULL ) { - lua_pushstring( L, ctx->data ); + lua_pushstring( L, event->data ); numArg++; } @@ -319,41 +377,58 @@ static void lmqtt_data_cb(task_param_t param, task_prio_t prio) } lua_settop(L, top); - if( ctx->data != NULL ) - free( ctx->data ); + event_free(event); } -static void data_cb( mqtt_client *client, mqtt_event_data_t *event_data ) +// ------------------------------------------------------------------------- // +// ------------------------------------------------------------------------- // + +// Lua: on() +static int mqtt_on(lua_State *L) { - NODE_DBG("CB:data: topic len %d, data len %d\n", event_data->topic_length, event_data->data_length); + enum events{ + ON_CONNECT = 0, + ON_MESSAGE = 1, + ON_OFFLINE = 2 + }; + const char *const eventnames[] = {"connect", "message", "offline", NULL}; - lmqtt_ctx_t * ctx = malloc(sizeof(lmqtt_ctx_t)); - ctx->client = client; - strncpy(ctx->topic, event_data->topic, event_data->topic_length); - ctx->topic[event_data->topic_length] = '\0'; + // mqtt_settings * settings = get_settings( L ); + int event = luaL_checkoption(L, 2, "message", eventnames); - ctx->data = NULL; - if( event_data->data_length > 0 ) - { - ctx->data = malloc( event_data->data_length ); - strncpy( ctx->data, event_data->data, event_data->data_length ); - ctx->data[event_data->data_length] = '\0'; + if( !lua_isfunction( L, 3 ) ) + return 0; + + switch (event) { + case ON_CONNECT: + lua_setfield(L, 1, "_on_connect"); + break; + case ON_MESSAGE: + lua_setfield(L, 1, "_on_message"); + break; + case ON_OFFLINE: + lua_setfield(L, 1, "_on_offline"); + break; + default: + return 0; } - task_post_medium(hData, (task_param_t) ctx); + lua_pop(L, 1); //pop event name + return 0; } // Lua: mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)[, function(client, reason)]]) -static int lmqtt_connect( lua_State* L ) +static int mqtt_connect( lua_State* L ) { - mqtt_settings * settings = get_settings( L ); + esp_mqtt_client_config_t * mqtt_cfg = get_settings( L ); - // int secure = 0; + int secure = 0; int reconnect = 0; const char * host = luaL_checkstring( L, 2 ); int port = 1883; int n = 3; + if( lua_isnumber( L, n ) ) { port = luaL_checknumber( L, n ); @@ -362,7 +437,7 @@ static int lmqtt_connect( lua_State* L ) if( lua_isnumber( L, n ) ) { - // secure = !!luaL_checkinteger( L, -4 ); + secure = !!luaL_checkinteger( L, n ); n++; } @@ -388,42 +463,44 @@ static int lmqtt_connect( lua_State* L ) lua_pop( L, n - 2 ); //pop parameters - strncpy(settings->host, host, CONFIG_MQTT_MAX_HOST_LEN ); - settings->port = port; + strncpy(mqtt_cfg->host, host, MQTT_MAX_HOST_LEN ); + mqtt_cfg->port = port; - settings->auto_reconnect = reconnect != 0; + mqtt_cfg->disable_auto_reconnect = (reconnect == 0); + mqtt_cfg->transport = secure ? MQTT_TRANSPORT_OVER_SSL : MQTT_TRANSPORT_OVER_TCP; - settings->connected_cb = connected_cb; - settings->disconnected_cb = disconnected_cb; - settings->subscribe_cb = subscribe_cb; - settings->publish_cb = publish_cb; - settings->data_cb = data_cb; - - mqtt_client * client = mqtt_start( settings ); + esp_mqtt_client_handle_t client = esp_mqtt_client_init(mqtt_cfg); if( client == NULL ) { luaL_error( L, "MQTT library failed to start" ); return 0; } - NODE_DBG("Created MQTT client @ %p, settings @ %p, state @ %p, top %d \n", client, settings, L, lua_gettop( L ) ); + esp_mqtt_client_start(client); + lua_pushlightuserdata( L, client ); lua_setfield( L, -2, "_client" ); //and store a reference in the MQTT table + char id[32]; + snprintf( id, 32, "mqtt_%p", client); + NODE_DBG("Store MQTT table in _G stack pos %d\n", lua_gettop(L)); + lua_pushvalue( L, 1 ); //make a copy of the table + lua_setglobal( L, id); + return 0; } // Lua: mqtt:close() -static int lmqtt_close( lua_State* L ) +static int mqtt_close( lua_State* L ) { - mqtt_client * client = get_client( L ); + esp_mqtt_client_handle_t client = get_client( L ); if( client == NULL ) return 0; NODE_DBG("Closing MQTT client %p\n", client); char id[64]; - snprintf(id, 64, "mqtt_%p", client->settings); + snprintf(id, 64, "mqtt_%p", client); lua_pushnil( L ); lua_setglobal( L, id ); // remove global reference @@ -435,36 +512,37 @@ static int lmqtt_close( lua_State* L ) } // Lua: mqtt:lwt(topic, message[, qos[, retain]]) -static int lmqtt_lwt( lua_State* L ) +static int mqtt_lwt( lua_State* L ) { - mqtt_settings * settings = get_settings( L ); + esp_mqtt_client_config_t * mqtt_cfg = get_settings( L ); - strncpy( settings->lwt_topic, luaL_checkstring( L, 2 ), CONFIG_MQTT_MAX_LWT_TOPIC ); - strncpy( settings->lwt_msg, luaL_checkstring( L, 3 ), CONFIG_MQTT_MAX_LWT_MSG ); - settings->lwt_msg_len = strlen( settings->lwt_msg ); + strncpy( mqtt_cfg->lwt_topic, luaL_checkstring( L, 2 ), MQTT_MAX_LWT_TOPIC ); + strncpy( mqtt_cfg->lwt_msg, luaL_checkstring( L, 3 ), MQTT_MAX_LWT_MSG ); + mqtt_cfg->lwt_msg_len = strlen( mqtt_cfg->lwt_msg ); int n = 4; if( lua_isnumber( L, n ) ) { - settings->lwt_qos = lua_tonumber( L, n ); + mqtt_cfg->lwt_qos = lua_tonumber( L, n ); n++; } if( lua_isnumber( L, n ) ) { - settings->lwt_retain = lua_tonumber( L, n ); + mqtt_cfg->lwt_retain = lua_tonumber( L, n ); n++; } lua_pop( L, n ); - NODE_DBG("Set LWT topic '%s', qos %d, retain %d, len %d\n", settings->lwt_topic, settings->lwt_qos, settings->lwt_retain, settings->lwt_msg_len); + NODE_DBG("Set LWT topic '%s', qos %d, retain %d, len %d\n", + mqtt_cfg->lwt_topic, mqtt_cfg->lwt_qos, mqtt_cfg->lwt_retain, mqtt_cfg->lwt_msg_len); return 0; } //Lua: mqtt:publish(topic, payload, qos, retain[, function(client)]) -static int lmqtt_publish( lua_State * L ) +static int mqtt_publish( lua_State * L ) { - mqtt_client * client = get_client( L ); + esp_mqtt_client_handle_t client = get_client( L ); int top = lua_gettop(L); @@ -482,15 +560,15 @@ static int lmqtt_publish( lua_State * L ) } lua_settop(L, top ); - NODE_DBG("MQTT publish client id %s, topic %s, %d bytes\n", client->settings->client_id, topic, strlen(data)); - mqtt_publish(client, topic, data, strlen(data), qos, retain); + NODE_DBG("MQTT publish client %p, topic %s, %d bytes\n", client, topic, strlen(data)); + esp_mqtt_client_publish(client, topic, data, strlen(data), qos, retain); return 0; } // Lua: mqtt:subscribe(topic, qos[, function(client)]) OR mqtt:subscribe(table[, function(client)]) -static int lmqtt_subscribe( lua_State* L ) +static int mqtt_subscribe( lua_State* L ) { - mqtt_client * client = get_client( L ); + esp_mqtt_client_handle_t client = get_client( L ); int top = lua_gettop(L); @@ -504,65 +582,82 @@ static int lmqtt_subscribe( lua_State* L ) } lua_settop(L, top ); - NODE_DBG("MQTT subscribe client id %s, topic %s\n", client->settings->client_id, topic); - mqtt_subscribe(client, topic, qos); + NODE_DBG("MQTT subscribe client %p, topic %s\n", client, topic); + esp_mqtt_client_subscribe(client, topic, qos); return 0; } // Lua: mqtt:unsubscribe(topic[, function(client)]) OR mqtt:unsubscribe(table[, function(client)]) -static int lmqtt_unsubscribe( lua_State* L ) +static int mqtt_unsubscribe( lua_State* L ) { - // mqtt_client * client = get_client( L ); + esp_mqtt_client_handle_t client = get_client( L ); + + int top = lua_gettop(L); + + const char * topic = luaL_checkstring( L, 2 ); + int n = 3; + if( lua_isfunction( L, n ) ) + { + lua_pushvalue( L, n ); + lua_setfield(L, 1, "_unsubscribe_ok"); // set _G["_cb_connect_nok"] = fn() + n++; + } + + lua_settop(L, top ); + NODE_DBG("MQTT unsubscribe client %p, topic %s\n", client, topic); + esp_mqtt_client_unsubscribe(client, topic); + return 0; return 0; } -static int lmqtt_delete( lua_State* L ) +static int mqtt_delete( lua_State* L ) { - mqtt_settings * settings = get_settings( L ); + esp_mqtt_client_config_t * settings = get_settings( L ); if( settings != NULL ) free( settings ); - mqtt_client * client = get_client( L ); + esp_mqtt_client_handle_t client = get_client( L ); if( client != NULL ) { - NODE_DBG("stopping MQTT client id %s\n", client->settings->client_id); - mqtt_destroy( client ); + NODE_DBG("stopping MQTT client %p\n", client); + esp_mqtt_client_destroy( client ); free( client ); } return 0; } // Lua: mqtt.Client(clientid, keepalive[, username, password, cleansession]) -static int lmqtt_new( lua_State* L ) +static int mqtt_new( lua_State* L ) { const char * clientid = NULL; - clientid = luaL_checkstring( L, 1 ); NODE_DBG("MQTT client id %s\n", clientid); - mqtt_settings * settings = (mqtt_settings *) malloc( sizeof(mqtt_settings) ); - memset(settings, 0, sizeof(mqtt_settings) ); + esp_mqtt_client_config_t * mqtt_cfg = (esp_mqtt_client_config_t *) malloc(sizeof(esp_mqtt_client_config_t)); + memset(mqtt_cfg, 0, sizeof(esp_mqtt_client_config_t)); - strncpy(settings->client_id, clientid, CONFIG_MQTT_MAX_CLIENT_LEN); - settings->keepalive = luaL_checkinteger( L, 2 ); + mqtt_cfg->event_handle = mqtt_event_handler; + + strncpy(mqtt_cfg->client_id, clientid, MQTT_MAX_CLIENT_LEN); + mqtt_cfg->keepalive = luaL_checkinteger( L, 2 ); int n = 2; if( lua_isstring(L, 3) ) { - strncpy( settings->username, luaL_checkstring( L, 3 ), CONFIG_MQTT_MAX_USERNAME_LEN); + strncpy( mqtt_cfg->username, luaL_checkstring( L, 3 ), MQTT_MAX_USERNAME_LEN); n++; } if( lua_isstring(L, 4) ) { - strncpy(settings->password, luaL_checkstring( L, 4 ), CONFIG_MQTT_MAX_PASSWORD_LEN); + strncpy(mqtt_cfg->password, luaL_checkstring( L, 4 ), MQTT_MAX_PASSWORD_LEN); n++; } if( lua_isnumber(L, 5) ) { - settings->clean_session = luaL_checknumber( L, 5 ); + mqtt_cfg->disable_clean_session = (luaL_checknumber( L, 5 ) == 0); n++; } lua_pop( L, n ); //remove parameters @@ -570,47 +665,42 @@ static int lmqtt_new( lua_State* L ) lua_newtable( L ); NODE_DBG("New MQTT table at stack pos %d\n", lua_gettop(L)); - lua_pushlightuserdata( L, settings ); + lua_pushlightuserdata( L, mqtt_cfg ); lua_setfield( L, -2, "_settings" ); // set t["_mqtt"] = client - lua_pushcfunction( L, lmqtt_connect ); + lua_pushcfunction( L, mqtt_connect ); lua_setfield( L, -2, "connect" ); // set t["connect"] = lmqtt_connect - lua_pushcfunction( L, lmqtt_close ); + lua_pushcfunction( L, mqtt_close ); lua_setfield( L, -2, "close" ); // set t["close"] = lmqtt_close - lua_pushcfunction( L, lmqtt_lwt ); + lua_pushcfunction( L, mqtt_lwt ); lua_setfield( L, -2, "lwt" ); // set t["lwt"] = lmqtt_lwt - lua_pushcfunction( L, lmqtt_publish ); + lua_pushcfunction( L, mqtt_publish ); lua_setfield( L, -2, "publish" ); // set t["publish"] = lmqtt_publish - lua_pushcfunction( L, lmqtt_subscribe ); + lua_pushcfunction( L, mqtt_subscribe ); lua_setfield( L, -2, "subscribe" ); // set t["subscribe"] = lmqtt_subscribe - lua_pushcfunction( L, lmqtt_unsubscribe ); + lua_pushcfunction( L, mqtt_unsubscribe ); lua_setfield( L, -2, "unsubscribe" ); // set t["unsubscribe"] = lmqtt_unsubscribe - lua_pushcfunction( L, lmqtt_on ); + lua_pushcfunction( L, mqtt_on ); lua_setfield( L, -2, "on" ); // set t["on"] = lmqtt_on - lua_pushcfunction( L, lmqtt_delete ); + lua_pushcfunction( L, mqtt_delete ); lua_setfield( L, -2, "__gc" ); // set t["__gc"] = lmqtt_delete lua_pushvalue( L, 1 ); //make a copy of the table lua_setmetatable( L, -2 ); - char id[32]; - snprintf( id, 32, "mqtt_%p", settings ); - NODE_DBG("Store MQTT table in _G stack pos %d\n", lua_gettop(L)); - lua_pushvalue( L, 1 ); //make a copy of the table - lua_setglobal( L, id); - - hConn = task_get_id(lmqtt_connected_cb); - hOff = task_get_id(lmqtt_disconnected_cb); - hPub = task_get_id(lmqtt_publish_cb); - hSub = task_get_id(lmqtt_subscribe_cb); - hData = task_get_id(lmqtt_data_cb); + hConn = task_get_id(_connected_cb); + hOff = task_get_id(_disconnected_cb); + hPub = task_get_id(_publish_cb); + hSub = task_get_id(_subscribe_cb); + hUnsub = task_get_id(_unsubscribe_cb); + hData = task_get_id(_data_cb); NODE_DBG("conn %d, off %d, pub %d, sub %d, data %d\n", hConn, hOff, hPub, hSub, hData); return 1; //leave table on top of the stack @@ -619,8 +709,9 @@ static int lmqtt_new( lua_State* L ) // Module function map static const LUA_REG_TYPE mqtt_map[] = { - { LSTRKEY( "Client" ), LFUNCVAL( lmqtt_new ) }, + { LSTRKEY( "Client" ), LFUNCVAL( mqtt_new ) }, { LNILKEY, LNILVAL } }; NODEMCU_MODULE(MQTT, "mqtt", mqtt_map, NULL); + From ba57b0fa7e538551b7200277865ae57631fac2da Mon Sep 17 00:00:00 2001 From: Andrew Gough Date: Mon, 19 Mar 2018 21:06:58 +1100 Subject: [PATCH 3/5] Add slightly modified version of standard MQTT docs. Note that in most cases the API is identical [see note in unsubscribe()] --- docs/en/modules/mqtt.md | 266 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 266 insertions(+) create mode 100644 docs/en/modules/mqtt.md diff --git a/docs/en/modules/mqtt.md b/docs/en/modules/mqtt.md new file mode 100644 index 00000000..ee3bb6a4 --- /dev/null +++ b/docs/en/modules/mqtt.md @@ -0,0 +1,266 @@ +# MQTT Module +| Since | Origin / Contributor | Maintainer | Source | +| :----- | :-------------------- | :---------- | :------ | +| 2015-01-23 | [Stephen Robinson](https://github.com/esar/contiki-mqtt), [Tuan PM](https://github.com/tuanpmt/esp_mqtt) | [Vowstar](https://github.com/vowstar) | [mqtt.c](../../../app/modules/mqtt.c)| + + +The client adheres to version 3.1.1 of the [MQTT](https://en.wikipedia.org/wiki/MQTT) protocol. Make sure that your broker supports and is correctly configured for version 3.1.1. The client is backwards incompatible with brokers running MQTT 3.1. + +## mqtt.Client() + +Creates a MQTT client. + +#### Syntax +`mqtt.Client(clientid, keepalive[, username, password, cleansession])` + +#### Parameters +- `clientid` client ID +- `keepalive` keepalive seconds +- `username` user name +- `password` user password +- `cleansession` 0/1 for `false`/`true`. Default is 1 (`true`). + +#### Returns +MQTT client + +#### Example +```lua +-- init mqtt client without logins, keepalive timer 120s +m = mqtt.Client("clientid", 120) + +-- init mqtt client with logins, keepalive timer 120sec +m = mqtt.Client("clientid", 120, "user", "password") + +-- setup Last Will and Testament (optional) +-- Broker will publish a message with qos = 0, retain = 0, data = "offline" +-- to topic "/lwt" if client don't send keepalive packet +m:lwt("/lwt", "offline", 0, 0) + +m:on("connect", function(client) print ("connected") end) +m:on("offline", function(client) print ("offline") end) + +-- on publish message receive event +m:on("message", function(client, topic, data) + print(topic .. ":" ) + if data ~= nil then + print(data) + end +end) + +-- for TLS: m:connect("192.168.11.118", secure-port, 1) +m:connect("192.168.11.118", 1883, 0, function(client) + print("connected") + -- Calling subscribe/publish only makes sense once the connection + -- was successfully established. You can do that either here in the + -- 'connect' callback or you need to otherwise make sure the + -- connection was established (e.g. tracking connection status or in + -- m:on("connect", function)). + + -- subscribe topic with qos = 0 + client:subscribe("/topic", 0, function(client) print("subscribe success") end) + -- publish a message with data = hello, QoS = 0, retain = 0 + client:publish("/topic", "hello", 0, 0, function(client) print("sent") end) +end, +function(client, reason) + print("failed reason: " .. reason) +end) + +m:close(); +-- you can call m:connect again +``` + +# MQTT Client + + +## mqtt.client:close() + +Closes connection to the broker. + +#### Syntax +`mqtt:close()` + +#### Parameters +none + +#### Returns +`true` on success, `false` otherwise + +## mqtt.client:connect() + +Connects to the broker specified by the given host, port, and secure options. + +#### Syntax +`mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)[, function(client, reason)]])` + +#### Parameters +- `host` host, domain or IP (string) +- `port` broker port (number), default 1883 +- `secure` 0/1 for `false`/`true`, default 0. Take note of constraints documented in the [net module](net.md). +- `autoreconnect` 0/1 for `false`/`true`, default 0. This option is *deprecated*. +- `function(client)` callback function for when the connection was established +- `function(client, reason)` callback function for when the connection could not be established. No further callbacks should be called. + +#### Returns +`true` on success, `false` otherwise + +#### Notes + +Don't use `autoreconnect`. Let me repeat that, don't use `autoreconnect`. You should handle the errors explicitly and appropriately for +your application. In particular, the default for `cleansession` above is `true`, so all subscriptions are destroyed when the connection +is lost for any reason. + +In order to acheive a consistent connection, handle errors in the error callback. For example: + +``` +function handle_mqtt_error(client, reason) + tmr.create():alarm(10 * 1000, tmr.ALARM_SINGLE, do_mqtt_connect) +end + +function do_mqtt_connect() + mqtt:connect("server", function(client) print("connected") end, handle_mqtt_error) +end +``` + +In reality, the connected function should do something useful! + +This is the description of how the `autoreconnect` functionality may (or may not) work. + +> When `autoreconnect` is set, then the connection will be re-established when it breaks. No error indication will be given (but all the +> subscriptions may be lost if `cleansession` is true). However, if the +> very first connection fails, then no reconnect attempt is made, and the error is signalled through the callback (if any). The first connection +> is considered a success if the client connects to a server and gets back a good response packet in response to its MQTT connection request. +> This implies (for example) that the username and password are correct. + +#### Connection failure callback reason codes: + +| Constant | Value | Description | +|----------|-------|-------------| +|`mqtt.CONN_FAIL_SERVER_NOT_FOUND`|-5|There is no broker listening at the specified IP Address and Port| +|`mqtt.CONN_FAIL_NOT_A_CONNACK_MSG`|-4|The response from the broker was not a CONNACK as required by the protocol| +|`mqtt.CONN_FAIL_DNS`|-3|DNS Lookup failed| +|`mqtt.CONN_FAIL_TIMEOUT_RECEIVING`|-2|Timeout waiting for a CONNACK from the broker| +|`mqtt.CONN_FAIL_TIMEOUT_SENDING`|-1|Timeout trying to send the Connect message| +|`mqtt.CONNACK_ACCEPTED`|0|No errors. _Note: This will not trigger a failure callback._| +|`mqtt.CONNACK_REFUSED_PROTOCOL_VER`|1|The broker is not a 3.1.1 MQTT broker.| +|`mqtt.CONNACK_REFUSED_ID_REJECTED`|2|The specified ClientID was rejected by the broker. (See `mqtt.Client()`)| +|`mqtt.CONNACK_REFUSED_SERVER_UNAVAILABLE`|3|The server is unavailable.| +|`mqtt.CONNACK_REFUSED_BAD_USER_OR_PASS`|4|The broker refused the specified username or password.| +|`mqtt.CONNACK_REFUSED_NOT_AUTHORIZED`|5|The username is not authorized.| + +## mqtt.client:lwt() + +Setup [Last Will and Testament](http://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament) (optional). A broker will publish a message with qos = 0, retain = 0, data = "offline" to topic "/lwt" if client does not send keepalive packet. + +As the last will is sent to the broker when connecting, `lwt()` must be called BEFORE calling `connect()`.   + +The broker will publish a client's last will message once he NOTICES that the connection to the client is broken. The broker will notice this when: +  - The client fails to send a keepalive packet for as long as specified in `mqtt.Client()` +  - The tcp-connection is properly closed (without closing the mqtt-connection before) + - The broker tries to send data to the client and fails to do so, because the tcp-connection is not longer open. + +This means if you specified 120 as keepalive timer, just turn off the client device and the broker does not send any data to the client, the last will message will be published 120s after turning off the device. + +#### Syntax +`mqtt:lwt(topic, message[, qos[, retain]])` + +#### Parameters +- `topic` the topic to publish to (string) +- `message` the message to publish, (buffer or string) +- `qos` QoS level, default 0 +- `retain` retain flag, default 0 + +#### Returns +`nil` + +## mqtt.client:on() + +Registers a callback function for an event. + +#### Syntax +`mqtt:on(event, function(client[, topic[, message]]))` + +#### Parameters +- `event` can be "connect", "message" or "offline" +- `function(client[, topic[, message]])` callback function. The first parameter is the client. If event is "message", the 2nd and 3rd param are received topic and message (strings). + +#### Returns +`nil` + +## mqtt.client:publish() + +Publishes a message. + +#### Syntax +`mqtt:publish(topic, payload, qos, retain[, function(client)])` + +#### Parameters +- `topic` the topic to publish to ([topic string](http://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices)) +- `message` the message to publish, (buffer or string) +- `qos` QoS level +- `retain` retain flag +- `function(client)` optional callback fired when PUBACK received. NOTE: When calling publish() more than once, the last callback function defined will be called for ALL publish commands. + + +#### Returns +`true` on success, `false` otherwise + +## mqtt.client:subscribe() + +Subscribes to one or several topics. + +#### Syntax +`mqtt:subscribe(topic, qos[, function(client)])` +`mqtt:subscribe(table[, function(client)])` + +#### Parameters +- `topic` a [topic string](http://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices) +- `qos` QoS subscription level, default 0 +- `table` array of 'topic, qos' pairs to subscribe to +- `function(client)` optional callback fired when subscription(s) succeeded. NOTE: When calling subscribe() more than once, the last callback function defined will be called for ALL subscribe commands. + +#### Returns +`true` on success, `false` otherwise + +#### Example +```lua +-- subscribe topic with qos = 0 +m:subscribe("/topic",0, function(conn) print("subscribe success") end) + +-- or subscribe multiple topic (topic/0, qos = 0; topic/1, qos = 1; topic2 , qos = 2) +m:subscribe({["topic/0"]=0,["topic/1"]=1,topic2=2}, function(conn) print("subscribe success") end) +``` + +!!! caution + + Rather than calling `subscribe` multiple times you should use the multiple topics syntax shown in the above example if you want to subscribe to more than one topic at once. + +## mqtt.client:unsubscribe() + +Unsubscribes from one or several topics. + +#### Syntax +`mqtt:unsubscribe(topic[, function(client)])` +`mqtt:unsubscribe(table[, function(client)])` + +#### Parameters +- `topic` a [topic string](http://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices) +- `table` array of 'topic, anything' pairs to unsubscribe from +- `function(client)` optional callback fired when unsubscription(s) succeeded. NOTE: When calling unsubscribe() more than once, the last callback function defined will be called for ALL unsubscribe commands. + +!!! caution + + The `mqtt:unsubscribe(table,...)` function is unimplented at this time as + the underlying MQTT library does not natively support this model. You must + subscribe and unsubsribe from topic individually. + +#### Returns +`true` on success, `false` otherwise + +#### Example +```lua +-- unsubscribe topic +m:unsubscribe("/topic", function(conn) print("unsubscribe success") end) + +-- or unsubscribe multiple topic (topic/0; topic/1; topic2) +m:unsubscribe({["topic/0"]=0,["topic/1"]=0,topic2="anything"}, function(conn) print("unsubscribe success") end) +``` From 74fd5184ddb594ea0ef3c2762aeb16fe0c97d68a Mon Sep 17 00:00:00 2001 From: Ryan Hartlage Date: Sat, 14 Jul 2018 13:07:55 -0400 Subject: [PATCH 4/5] Use RO metatable for MQTT module; add link to docs in mkdocs.yml --- components/modules/mqtt.c | 51 +++++++++++++++++---------------------- mkdocs.yml | 1 + 2 files changed, 23 insertions(+), 29 deletions(-) diff --git a/components/modules/mqtt.c b/components/modules/mqtt.c index 9ab4518a..26abb637 100644 --- a/components/modules/mqtt.c +++ b/components/modules/mqtt.c @@ -147,7 +147,7 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) } //typedef void (*task_callback_t)(task_param_t param, task_prio_t prio); -static void _connected_cb(task_param_t param, task_prio_t prio) +static void _connected_cb(task_param_t param, task_prio_t prio) { lua_State * L = lua_getstate(); //returns main Lua state if( L == NULL ) @@ -534,13 +534,13 @@ static int mqtt_lwt( lua_State* L ) } lua_pop( L, n ); - NODE_DBG("Set LWT topic '%s', qos %d, retain %d, len %d\n", + NODE_DBG("Set LWT topic '%s', qos %d, retain %d, len %d\n", mqtt_cfg->lwt_topic, mqtt_cfg->lwt_qos, mqtt_cfg->lwt_retain, mqtt_cfg->lwt_msg_len); return 0; } //Lua: mqtt:publish(topic, payload, qos, retain[, function(client)]) -static int mqtt_publish( lua_State * L ) +static int mqtt_publish( lua_State * L ) { esp_mqtt_client_handle_t client = get_client( L ); @@ -668,31 +668,7 @@ static int mqtt_new( lua_State* L ) lua_pushlightuserdata( L, mqtt_cfg ); lua_setfield( L, -2, "_settings" ); // set t["_mqtt"] = client - lua_pushcfunction( L, mqtt_connect ); - lua_setfield( L, -2, "connect" ); // set t["connect"] = lmqtt_connect - - lua_pushcfunction( L, mqtt_close ); - lua_setfield( L, -2, "close" ); // set t["close"] = lmqtt_close - - lua_pushcfunction( L, mqtt_lwt ); - lua_setfield( L, -2, "lwt" ); // set t["lwt"] = lmqtt_lwt - - lua_pushcfunction( L, mqtt_publish ); - lua_setfield( L, -2, "publish" ); // set t["publish"] = lmqtt_publish - - lua_pushcfunction( L, mqtt_subscribe ); - lua_setfield( L, -2, "subscribe" ); // set t["subscribe"] = lmqtt_subscribe - - lua_pushcfunction( L, mqtt_unsubscribe ); - lua_setfield( L, -2, "unsubscribe" ); // set t["unsubscribe"] = lmqtt_unsubscribe - - lua_pushcfunction( L, mqtt_on ); - lua_setfield( L, -2, "on" ); // set t["on"] = lmqtt_on - - lua_pushcfunction( L, mqtt_delete ); - lua_setfield( L, -2, "__gc" ); // set t["__gc"] = lmqtt_delete - - lua_pushvalue( L, 1 ); //make a copy of the table + luaL_getmetatable( L, "mqtt.mt" ); lua_setmetatable( L, -2 ); hConn = task_get_id(_connected_cb); @@ -706,6 +682,19 @@ static int mqtt_new( lua_State* L ) return 1; //leave table on top of the stack } +static const LUA_REG_TYPE mqtt_metatable_map[] = +{ + { LSTRKEY( "connect" ), LFUNCVAL( mqtt_connect )}, + { LSTRKEY( "close" ), LFUNCVAL( mqtt_close )}, + { LSTRKEY( "lwt" ), LFUNCVAL( mqtt_lwt )}, + { LSTRKEY( "publish" ), LFUNCVAL( mqtt_publish )}, + { LSTRKEY( "subscribe" ), LFUNCVAL( mqtt_subscribe )}, + { LSTRKEY( "unsubscribe" ), LFUNCVAL( mqtt_unsubscribe )}, + { LSTRKEY( "on" ), LFUNCVAL( mqtt_on )}, + { LSTRKEY( "__gc" ), LFUNCVAL( mqtt_delete )}, + { LSTRKEY( "__index" ), LROVAL( mqtt_metatable_map )}, + { LNILKEY, LNILVAL} +}; // Module function map static const LUA_REG_TYPE mqtt_map[] = { @@ -713,5 +702,9 @@ static const LUA_REG_TYPE mqtt_map[] = { { LNILKEY, LNILVAL } }; -NODEMCU_MODULE(MQTT, "mqtt", mqtt_map, NULL); +int luaopen_mqtt(lua_State *L) { + luaL_rometatable(L, "mqtt.mt", (void *)mqtt_metatable_map); // create metatable for mqtt + return 0; +} +NODEMCU_MODULE(MQTT, "mqtt", mqtt_map, luaopen_mqtt); diff --git a/mkdocs.yml b/mkdocs.yml index 14f83f26..23843e0f 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -41,6 +41,7 @@ pages: - 'gpio': 'en/modules/gpio.md' - 'i2c': 'en/modules/i2c.md' - 'ledc': 'en/modules/ledc.md' + - 'mqtt': 'en/modules/mqtt.md' - 'net': 'en/modules/net.md' - 'node': 'en/modules/node.md' - 'ow (1-Wire)': 'en/modules/ow.md' From c8e3a11a00f5c9f61bfedea897fef7164086e6c0 Mon Sep 17 00:00:00 2001 From: Ryan Hartlage Date: Sat, 14 Jul 2018 15:07:39 -0400 Subject: [PATCH 5/5] Use alphabetical order for lua modules in kconfig --- components/modules/Kconfig | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/components/modules/Kconfig b/components/modules/Kconfig index 98ae8f48..0ad73643 100644 --- a/components/modules/Kconfig +++ b/components/modules/Kconfig @@ -110,6 +110,12 @@ config LUA_MODULE_LEDC help Includes the LEDC module. +config LUA_MODULE_MQTT + bool "MQTT module" + default "n" + help + Includes the MQTT module. + config LUA_MODULE_NET bool "Net module" default "y" @@ -180,10 +186,4 @@ config LUA_MODULE_WS2812 help Includes the ws2812 module. -config LUA_MODULE_MQTT - bool "MQTT module" - default "n" - help - Includes the MQTT module. - endmenu