From 56f19e44c35f019b6b56d04ec5b9a764e6e0413c Mon Sep 17 00:00:00 2001 From: Javier Peletier Date: Tue, 29 Jan 2019 22:36:29 +0100 Subject: [PATCH] ESP32: MQTT module rewrite (#2629) * mqtt: first pass * mqtt: correct gc during callback bug * mqtt: document and cleanup * mqtt: separate lnodeaux due to copyright * mqtt: rename lnodeaux methods * mqtt: update docs * mqtt: fix copyright in lnodeaux.c * mqtt: refactor and cleanup * mqtt: use ESP logging --- components/base_nodemcu/include/lnodeaux.h | 68 ++ components/base_nodemcu/lnodeaux.c | 117 ++ components/modules/mqtt.c | 1163 +++++++++----------- docs/modules/mqtt.md | 1 + 4 files changed, 715 insertions(+), 634 deletions(-) create mode 100644 components/base_nodemcu/include/lnodeaux.h create mode 100644 components/base_nodemcu/lnodeaux.c diff --git a/components/base_nodemcu/include/lnodeaux.h b/components/base_nodemcu/include/lnodeaux.h new file mode 100644 index 00000000..80217018 --- /dev/null +++ b/components/base_nodemcu/include/lnodeaux.h @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2019 the NodeMCU authors. All rights reserved + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the + * distribution. + * - Neither the name of the copyright holders nor the names of + * its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * + * @author Javier Peletier + * @brief This library exports useful functions to handle lua registry refs and strings + */ + +#ifndef _NODEMCU_LNODEAUX_H_ +#define _NODEMCU_LNODEAUX_H_ + +#include "lua.h" + +// lua_ref_t represents a reference to a lua object in the registry +typedef int lua_ref_t; + +//luaX_weak_ref pops an item from the stack and returns a weak reference to it +lua_ref_t luaX_weak_ref(lua_State* L); + +//luaL_push_weak takes a weak reference and pushes the original item on the stack +void luaX_push_weak_ref(lua_State* L, lua_ref_t ref); + +// luaX_alloc_string creates a dynamically-allocated null-terminated string copying it +// from a lua stack position +char* luaX_alloc_string(lua_State* L, int idx, int max_length); + +// luaX_free_string deallocates memory of a string allocated with luaX_alloc_string +void luaX_free_string(lua_State* L, char* st); + +// luaX_unset_ref unpins a reference to a lua object in the registry +void luaX_unset_ref(lua_State* L, lua_ref_t* ref); + +// luaX_set_ref pins a reference to a lua object, provided a registry +// or stack position +void luaX_set_ref(lua_State* L, int idx, lua_ref_t* ref); + +// luaX_valid_ref returns true if the reference is set. +inline bool luaX_valid_ref(lua_ref_t ref) { + return ref > 0; +} + +#endif diff --git a/components/base_nodemcu/lnodeaux.c b/components/base_nodemcu/lnodeaux.c new file mode 100644 index 00000000..478c4b1e --- /dev/null +++ b/components/base_nodemcu/lnodeaux.c @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2019 the NodeMCU authors. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * - Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the + * distribution. + * - Neither the name of the copyright holders nor the names of + * its contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * + * @author Javier Peletier + */ +#include "lnodeaux.h" +#include +#include "lauxlib.h" +#include "lmem.h" + +// reference to the weak references metatable +static lua_ref_t weak_mt_ref = LUA_NOREF; + +// luaX_weak_ref pops an item from the stack and returns a weak reference to it +// inspired by https://stackoverflow.com/a/19340846 +lua_ref_t luaX_weak_ref(lua_State* L) { + lua_newtable(L); // push a table on the stack that will serve as proxy of our item + + // Initialize weak metatable if this is the first call + if (weak_mt_ref == LUA_NOREF) { + lua_newtable(L); // push a new table on the stack to serve as metatable + lua_pushliteral(L, "__mode"); + lua_pushliteral(L, "v"); + lua_rawset(L, -3); // metatable._mode='v' (values are weak) http://lua-users.org/wiki/WeakTablesTutorial + lua_pushvalue(L, -1); // duplicate metatable on the stack + weak_mt_ref = luaL_ref(L, LUA_REGISTRYINDEX); // store ref to weak metatable in the registry (pops 1 item) + } else { + lua_rawgeti(L, LUA_REGISTRYINDEX, weak_mt_ref); // retrieve metatable from registry + } + + lua_setmetatable(L, -2); // setmetatable(proxy,metatable) + + lua_pushvalue(L, -2); // push the previous top of stack + lua_rawseti(L, -2, 1); // proxy[1]=original value on top of the stack + + lua_ref_t ref = luaL_ref(L, LUA_REGISTRYINDEX); // this pops the proxy + lua_pop(L, 1); // pop the ref + return ref; +} + +//luaL_push_weak takes a weak reference and pushes the original item on the stack +void luaX_push_weak_ref(lua_State* L, lua_ref_t ref) { + if (ref <= 0) { + luaL_error(L, "invalid weak ref"); + } + lua_rawgeti(L, LUA_REGISTRYINDEX, ref); // push the proxy object on the stack + lua_rawgeti(L, -1, 1); // push proxy[1] (our proxied item) on the stack + lua_remove(L, -2); // remove proxy from underneath the stack. + // Retrieved item remains on top, as output of this function. +} + +// luaX_alloc_string creates a dynamically-allocated null-terminated string copying it +// from a lua stack position +char* luaX_alloc_string(lua_State* L, int idx, int max_length) { + const char* lua_st = luaL_checkstring(L, idx); //retrieve string from lua + // measure the string and limit it to max_length + int len = strlen(lua_st); + if (len > max_length) + len = max_length; + + // allocate memory for our copy, saving 1 byte to null-terminate it. + char* st = luaM_malloc(L, len + 1); + + // actually make a copy + strncpy(st, lua_st, len); + + // terminate it with a null char. + st[len] = '\0'; + return st; +} + +// luaX_free_string deallocates memory of a string allocated with luaX_alloc_string +void luaX_free_string(lua_State* L, char* st) { + if (st) + luaM_freearray(L, st, strlen(st) + 1, char); +} + +// luaX_unset_ref unpins a reference to a lua object in the registry +void luaX_unset_ref(lua_State* L, lua_ref_t* ref) { + luaL_unref(L, LUA_REGISTRYINDEX, *ref); + *ref = LUA_NOREF; +} + +// luaX_set_ref pins a reference to a lua object, provided a registry +// or stack position +void luaX_set_ref(lua_State* L, int idx, lua_ref_t* ref) { + luaX_unset_ref(L, ref); // make sure we free previous reference + lua_pushvalue(L, idx); // push on the stack the referenced index + *ref = luaL_ref(L, LUA_REGISTRYINDEX); // set the reference (pops 1 value) +} diff --git a/components/modules/mqtt.c b/components/modules/mqtt.c index 5bd4cb0a..76702d28 100644 --- a/components/modules/mqtt.c +++ b/components/modules/mqtt.c @@ -1,735 +1,630 @@ // Module for interfacing with an MQTT broker - -#include "module.h" +#include "esp_log.h" #include "lauxlib.h" +#include "lmem.h" +#include "lnodeaux.h" +#include "module.h" #include "platform.h" #include "task/task.h" -#include "mqtt_client.h" #include +#include "mqtt_client.h" -#define MQTT_MAX_HOST_LEN 64 -#define MQTT_MAX_CLIENT_LEN 32 -#define MQTT_MAX_USERNAME_LEN 32 -#define MQTT_MAX_PASSWORD_LEN 65 -#define MQTT_MAX_LWT_TOPIC 32 -#define MQTT_MAX_LWT_MSG 128 +#define MQTT_MAX_HOST_LEN 64 +#define MQTT_MAX_CLIENT_LEN 32 +#define MQTT_MAX_USERNAME_LEN 32 +#define MQTT_MAX_PASSWORD_LEN 65 +#define MQTT_MAX_LWT_TOPIC 32 +#define MQTT_MAX_LWT_MSG 128 +#define MQTT_METATABLE "mqtt.mt" +#define TAG "MQTT" +// mqtt_context struct contains information to wrap a esp_mqtt client in lua typedef struct { - esp_mqtt_client_config_t config; - char host[MQTT_MAX_HOST_LEN]; - char uri[MQTT_MAX_HOST_LEN]; - char client_id[MQTT_MAX_CLIENT_LEN]; - char username[MQTT_MAX_USERNAME_LEN]; - char password[MQTT_MAX_PASSWORD_LEN]; - char lwt_topic[MQTT_MAX_LWT_TOPIC]; - char lwt_msg[MQTT_MAX_LWT_MSG]; -} esp_mqtt_lua_client_config_t; + esp_mqtt_client_handle_t client; // handle to mqtt client + char* client_id; // mqtt client ID + char* username; // mqtt username + char* password; // mqtt password + char* lwt_topic; // mqtt last will/testament topic + char* lwt_msg; // mqtt last will message + int lwt_qos; // mqtt LWT qos level + int lwt_retain; // mqtt LWT retain flag + int keepalive; // keepalive ping period in seconds + int disable_clean_session; // Whether to not clean the session on reconnect + union { + struct { + lua_ref_t on_connect_cb; // maps to "connect" event + lua_ref_t on_message_cb; // maps to "message" event + lua_ref_t on_offline_cb; // maps to "offline" event + lua_ref_t connected_ok_cb; + lua_ref_t connected_nok_cb; + lua_ref_t published_ok_cb; + lua_ref_t subscribed_ok_cb; + lua_ref_t unsubscribed_ok_cb; + lua_ref_t self; + }; + lua_ref_t lua_refs[9]; + }; +} mqtt_context_t; -task_handle_t hConn; -task_handle_t hOff; -task_handle_t hPub; -task_handle_t hSub; -task_handle_t hUnsub; -task_handle_t hData; +// event_handler_t is the function signature for all events +typedef void (*event_handler_t)(lua_State* L, mqtt_context_t* mqtt_context, esp_mqtt_event_handle_t event); -// ------------------------------------------------------------------------- // +// eventnames contains a list of the events that can be set in lua +// with client:on(eventName, function) +// The order is important, as they map directly to callbacks +// in the union/struct above +const char* const eventnames[] = {"connect", "message", "offline", NULL}; -// locate the C mqtt_client pointer and leave the -// Lua instance on the top of the stack -static esp_mqtt_client_handle_t get_client( lua_State * L ) -{ - if( !lua_istable( L, 1 ) ) - { - luaL_error( L, "Expected MQTT module (client)" ); - return 0; //never reached - } +// nodemcu task handlers for receiving events +task_handle_t event_handler_task_id = 0; - lua_getfield( L, 1, "_client" ); - if( !lua_islightuserdata( L, -1 ) ) - { - luaL_error( L, "Expected MQTT client pointer" ); - return 0; //never reached - } +// event_clone makes a copy of the mqtt event received so we can pass it on +// and the mqtt library can discard it. +static esp_mqtt_event_handle_t event_clone(esp_mqtt_event_handle_t ev) { + // allocate memory for the copy + esp_mqtt_event_handle_t ev1 = (esp_mqtt_event_handle_t)malloc(sizeof(esp_mqtt_event_t)); + ESP_LOGD(TAG, "event_clone(): event %p, event id %d, msg %d", ev, ev->event_id, ev->msg_id); - esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) lua_touserdata( L, -1 ); - lua_pop( L, 1 ); // just pop the _mqtt field - return client; -} + // make a shallow copy: + *ev1 = *ev; -// locate the C mqtt_settings pointer and leave the -// Lua instance on the top of the stack -static esp_mqtt_lua_client_config_t * get_settings( lua_State * L ) -{ - if( !lua_istable( L, 1 ) ) - { - luaL_error( L, "Expected MQTT module (settings)" ); - return 0; //never reached - } - - lua_getfield( L, 1, "_settings" ); - if( !lua_islightuserdata( L, -1 ) ) - { - luaL_error( L, "Expected MQTT settings pointer" ); - return 0; //never reached - } - - esp_mqtt_lua_client_config_t * settings = (esp_mqtt_lua_client_config_t *) lua_touserdata( L, -1 ); - lua_pop( L, 1 ); // just pop the _mqtt field - return settings; -} - -// ------------------------------------------------------------------------- // - -static esp_mqtt_event_handle_t event_clone(esp_mqtt_event_handle_t ev) -{ - esp_mqtt_event_handle_t ev1 = (esp_mqtt_event_handle_t) malloc(sizeof(esp_mqtt_event_t)); - memset(ev1, 0, sizeof(esp_mqtt_event_t)); - NODE_DBG("event_clone():malloc: event %p, msg %d\n", ev, ev->msg_id); - - ev1->event_id = ev->event_id; - ev1->client = ev->client; - ev1->user_context = ev->user_context; - ev1->total_data_len = ev->total_data_len; - ev1->current_data_offset = ev->current_data_offset; - ev1->msg_id = ev->msg_id; - - ev1->data_len = ev->data_len; - if( ev->data != NULL && ev->data_len > 0 ) - { - ev1->data = malloc(ev->data_len + 1); - memcpy(ev1->data, ev->data, ev->data_len); - ev1->data[ev1->data_len] = '\0'; - NODE_DBG("event_clone():malloc: event %p, msg %d, data %p, num %d\n", ev1, ev1->msg_id, ev1->data, ev1->data_len); - } - - ev1->topic_len = ev->topic_len; - if( ev->topic != NULL && ev->topic_len > 0 ) - { - ev1->topic = malloc(ev->topic_len + 1); - memcpy(ev1->topic, ev->topic, ev->topic_len); - ev1->topic[ev1->topic_len] = '\0'; - NODE_DBG("event_clone():malloc: event %p, msg %d, topic %p, num %d\n", ev1, ev1->msg_id, ev1->topic, ev1->topic_len); - } - return ev1; -} - -static void event_free(esp_mqtt_event_handle_t ev) -{ - if(ev->data != NULL) - { - NODE_DBG("event_free():free: event %p, msg %d, data %p\n", ev, ev->msg_id, ev->data); - free(ev->data); - } - if(ev->topic != NULL) - { - NODE_DBG("event_free():free: event %p, msg %d, topic %p\n", ev, ev->msg_id, ev->topic); - free(ev->topic); - } - free(ev); -} - -// ------------------------------------------------------------------------- // - -static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) -{ - switch (event->event_id) { - case MQTT_EVENT_CONNECTED: - task_post_medium(hConn, (task_param_t) event_clone(event)); - break; - - case MQTT_EVENT_DISCONNECTED: - task_post_medium( hOff, (task_param_t) event_clone(event)); - break; - - case MQTT_EVENT_SUBSCRIBED: - task_post_medium(hSub, (task_param_t) event_clone(event)); - break; - - case MQTT_EVENT_UNSUBSCRIBED: - task_post_medium(hUnsub, (task_param_t) event_clone(event)); - break; - - case MQTT_EVENT_PUBLISHED: - task_post_medium(hPub, (task_param_t) event_clone(event)); - break; - - case MQTT_EVENT_DATA: - task_post_medium(hData, (task_param_t) event_clone(event)); - break; - - case MQTT_EVENT_ERROR: - break; + // if the event carries data, make also a copy of it: + if (ev->data != NULL) { + if (ev->data_len > 0) { + ev1->data = malloc(ev->data_len + 1); // null-terminate the data, useful for debugging + memcpy(ev1->data, ev->data, ev->data_len); + ev1->data[ev1->data_len] = '\0'; + ESP_LOGD(TAG, "event_clone():malloc: event %p, msg %d, data %p, num %d", ev1, ev1->msg_id, ev1->data, ev1->data_len); + } else { + ev1->data = NULL; + } } + + // if the event carries a topic, make also a copy of it: + if (ev->topic != NULL) { + if (ev->topic_len > 0) { + ev1->topic = malloc(ev->topic_len + 1); // null-terminate the data, useful for debugging + memcpy(ev1->topic, ev->topic, ev->topic_len); + ev1->topic[ev1->topic_len] = '\0'; + ESP_LOGD(TAG, "event_clone():malloc: event %p, msg %d, topic %p, num %d", ev1, ev1->msg_id, ev1->topic, ev1->topic_len); + } else { + ev1->topic = NULL; + } + } + return ev1; +} + +// event_free deallocates all the memory associated with a cloned event +static void event_free(esp_mqtt_event_handle_t ev) { + if (ev->data != NULL) { + ESP_LOGD(TAG, "event_free():free: event %p, msg %d, data %p", ev, ev->msg_id, ev->data); + free(ev->data); + } + if (ev->topic != NULL) { + ESP_LOGD(TAG, "event_free():free: event %p, msg %d, topic %p", ev, ev->msg_id, ev->topic); + free(ev->topic); + } + free(ev); +} + +// event_connected is run when the mqtt client connected +static void event_connected(lua_State* L, mqtt_context_t* mqtt_context, esp_mqtt_event_handle_t event) { + // if the user set a one-shot connected callback, execute it: + if (luaX_valid_ref(mqtt_context->connected_ok_cb)) { + lua_rawgeti(L, LUA_REGISTRYINDEX, mqtt_context->connected_ok_cb); // push the callback function reference to the stack + luaX_push_weak_ref(L, mqtt_context->self); // push a reference to the client (first parameter) + + ESP_LOGD(TAG, "CB:connect: calling registered one-shot connect callback"); + int res = lua_pcall(L, 1, 0, 0); //call the connect callback: function(client) + if (res != 0) + ESP_LOGD(TAG, "CB:connect: Error when calling one-shot connect callback - (%d) %s", res, luaL_checkstring(L, -1)); + + //after connecting ok, we clear _both_ the one-shot callbacks: + luaX_unset_ref(L, &mqtt_context->connected_ok_cb); + luaX_unset_ref(L, &mqtt_context->connected_nok_cb); + } + + // now we check for the standard connect callback registered with 'mqtt:on()' + if (luaX_valid_ref(mqtt_context->on_connect_cb)) { + ESP_LOGD(TAG, "CB:connect: calling registered standard connect callback"); + lua_rawgeti(L, LUA_REGISTRYINDEX, mqtt_context->on_connect_cb); // push the callback function reference to the stack + luaX_push_weak_ref(L, mqtt_context->self); // push a reference to the client (first parameter) + int res = lua_pcall(L, 1, 0, 0); //call the connect callback: function(client) + if (res != 0) + ESP_LOGD(TAG, "CB:connect: Error when calling connect callback - (%d) %s", res, luaL_checkstring(L, -1)); + } +} + +// event_disconnected is run after a connection to the MQTT broker breaks. +static void event_disconnected(lua_State* L, mqtt_context_t* mqtt_context, esp_mqtt_event_handle_t event) { + if (mqtt_context->client == NULL) { + ESP_LOGD(TAG, "MQTT Client was NULL on a disconnect event"); + } + + // destroy the wrapped mqtt_client object + esp_mqtt_client_destroy(mqtt_context->client); + mqtt_context->client = NULL; + + // if the user set a one-shot connect error callback, execute it: + if (luaX_valid_ref(mqtt_context->connected_nok_cb)) { + lua_rawgeti(L, LUA_REGISTRYINDEX, mqtt_context->connected_nok_cb); // push the callback function reference to the stack + luaX_push_weak_ref(L, mqtt_context->self); // push a reference to the client (first parameter) + lua_pushinteger(L, -6); // esp sdk mqtt lib does not provide reason codes. Push "-6" to be backward compatible with ESP8266 API + + ESP_LOGD(TAG, "CB:disconnect: calling registered one-shot disconnect callback"); + int res = lua_pcall(L, 2, 0, 0); //call the disconnect callback with 2 parameters: function(client, reason) + if (res != 0) + ESP_LOGD(TAG, "CB:disconnect: Error when calling one-shot disconnect callback - (%d) %s", res, luaL_checkstring(L, -1)); + + //after connecting ok, we clear _both_ the one-shot callbacks + luaX_unset_ref(L, &mqtt_context->connected_ok_cb); + luaX_unset_ref(L, &mqtt_context->connected_nok_cb); + } + + // now we check for the standard offline callback registered with 'mqtt:on()' + if (luaX_valid_ref(mqtt_context->on_offline_cb)) { + ESP_LOGD(TAG, "CB:disconnect: calling registered standard on_offline_cb callback"); + lua_rawgeti(L, LUA_REGISTRYINDEX, mqtt_context->on_offline_cb); // push the callback function reference to the stack + luaX_push_weak_ref(L, mqtt_context->self); // push a reference to the client (first parameter) + int res = lua_pcall(L, 1, 0, 0); //call the offline callback: function(client) + if (res != 0) + ESP_LOGD(TAG, "CB:disconnect: Error when calling offline callback - (%d) %s", res, luaL_checkstring(L, -1)); + } +} + +// event_subscribed is called when the last subscribe call is successful +static void event_subscribed(lua_State* L, mqtt_context_t* mqtt_context, esp_mqtt_event_handle_t event) { + if (!luaX_valid_ref(mqtt_context->subscribed_ok_cb)) return; + + ESP_LOGD(TAG, "CB:subscribe: calling registered one-shot subscribe callback"); + lua_rawgeti(L, LUA_REGISTRYINDEX, mqtt_context->subscribed_ok_cb); // push the function reference on the stack + luaX_push_weak_ref(L, mqtt_context->self); // push the client object on the stack + int res = lua_pcall(L, 1, 0, 0); //call the connect callback with one parameter: function(client) + if (res != 0) + ESP_LOGD(TAG, "CB:subscribe: Error when calling one-shot subscribe callback - (%d) %s", res, luaL_checkstring(L, -1)); + + luaX_unset_ref(L, &mqtt_context->subscribed_ok_cb); // forget the callback since it is one-shot +} + +//event_published is called when a publish operation completes +static void event_published(lua_State* L, mqtt_context_t* mqtt_context, esp_mqtt_event_handle_t event) { + if (!luaX_valid_ref(mqtt_context->published_ok_cb)) return; + + ESP_LOGD(TAG, "CB:publish: calling registered one-shot publish callback"); + lua_rawgeti(L, LUA_REGISTRYINDEX, mqtt_context->published_ok_cb); // push the callback function reference to the stack + luaX_push_weak_ref(L, mqtt_context->self); // push the client reference to the stack + int res = lua_pcall(L, 1, 0, 0); //call the connect callback with 1 parameter: function(client) + if (res != 0) + ESP_LOGD(TAG, "CB:publish: Error when calling one-shot publish callback - (%d) %s", res, luaL_checkstring(L, -1)); + + luaX_unset_ref(L, &mqtt_context->published_ok_cb); // forget this callback since it is one-shot +} + +// event_unsubscribed is called when a subscription is successful +static void event_unsubscribed(lua_State* L, mqtt_context_t* mqtt_context, esp_mqtt_event_handle_t event) { + if (!luaX_valid_ref(mqtt_context->unsubscribed_ok_cb)) return; + + ESP_LOGD(TAG, "CB:unsubscribe: calling registered one-shot unsubscribe callback"); + lua_rawgeti(L, LUA_REGISTRYINDEX, mqtt_context->unsubscribed_ok_cb); // push callback function reference on the stack + luaX_push_weak_ref(L, mqtt_context->self); // push a reference to the client + int res = lua_pcall(L, 1, 0, 0); //call the connect callback with one parameter: function(client) + if (res != 0) + ESP_LOGD(TAG, "CB:unsubscribe: Error when calling one-shot unsubscribe callback - (%d) %s", res, luaL_checkstring(L, -1)); + + luaX_unset_ref(L, &mqtt_context->unsubscribed_ok_cb); // forget callback as it is one-shot +} + +//event_data_received is called when data is received on a subscribed topic +static void event_data_received(lua_State* L, mqtt_context_t* mqtt_context, esp_mqtt_event_handle_t event) { + if (!luaX_valid_ref(mqtt_context->on_message_cb)) return; + + lua_rawgeti(L, LUA_REGISTRYINDEX, mqtt_context->on_message_cb); + int numArg = 2; + luaX_push_weak_ref(L, mqtt_context->self); + lua_pushlstring(L, event->topic, event->topic_len); + if (event->data != NULL) { + lua_pushlstring(L, event->data, event->data_len); + numArg++; + } + int res = lua_pcall(L, numArg, 0, 0); //call the messagecallback + if (res != 0) + ESP_LOGD(TAG, "CB:data: Error when calling message callback - (%d) %s", res, luaL_checkstring(L, -1)); +} + +// event_task_handler takes a nodemcu task message and dispatches it to the appropriate event_xxx callback above. +static void event_task_handler(task_param_t param, task_prio_t prio) { + // extract the event data out of the task param + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t)param; + + // recover the mqtt context from the event user_context field: + mqtt_context_t* mqtt_context = (mqtt_context_t*)event->user_context; + + // Check if this event is about an object that is in the process of garbage collection: + if (!luaX_valid_ref(mqtt_context->self)) { + ESP_LOGW(TAG, "caught stray event: %d", event->event_id); // this can happen if the userdata object is dereferenced while attempting to connect + goto task_handler_end; // free resources and abort + } + + lua_State* L = lua_getstate(); //returns main Lua state + if (L == NULL) { + goto task_handler_end; // free resources and abort + } + + ESP_LOGD(TAG, "event_task_handler: event_id: %d state %p, settings %p, stack top %d", event->event_id, L, mqtt_context, lua_gettop(L)); + + event_handler_t eventhandler = NULL; + + switch (event->event_id) { + case MQTT_EVENT_DATA: + eventhandler = event_data_received; + break; + case MQTT_EVENT_CONNECTED: + eventhandler = event_connected; + break; + case MQTT_EVENT_DISCONNECTED: + eventhandler = event_disconnected; + break; + case MQTT_EVENT_SUBSCRIBED: + eventhandler = event_subscribed; + break; + case MQTT_EVENT_UNSUBSCRIBED: + eventhandler = event_unsubscribed; + break; + case MQTT_EVENT_PUBLISHED: + eventhandler = event_published; + break; + default: + goto task_handler_end; // free resources and abort + } + + int top = lua_gettop(L); // save the stack status to restore it later + lua_checkstack(L, 5); // make sure there are at least 5 slots available + + // pin our object by putting a reference on the stack, + // so it can't be garbage collected during user callback execution. + luaX_push_weak_ref(L, mqtt_context->self); + + eventhandler(L, mqtt_context, event); + + lua_settop(L, top); // leave the stack as it was + +task_handler_end: + event_free(event); // free the event copy memory +} + +// mqtt_event_handler receives all events from the esp mqtt library and converts them +// to a task message +static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) { + task_post_medium(event_handler_task_id, (task_param_t)event_clone(event)); return ESP_OK; } -//typedef void (*task_callback_t)(task_param_t param, task_prio_t prio); -static void _connected_cb(task_param_t param, task_prio_t prio) -{ - lua_State * L = lua_getstate(); //returns main Lua state - if( L == NULL ) - return; - - esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; - - int top = lua_gettop(L); - lua_checkstack(L, 8); - - char key[64]; - snprintf(key, 64, "mqtt_%p", event->client); - lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:connect: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); - - lua_getfield( L, -1, "_connect_ok" ); - if( lua_isfunction( L, -1 ) ) - { - int top1 = lua_gettop(L); - - NODE_DBG("CB:connect: calling registered one-shot connect callback\n"); - lua_pushvalue( L, -2 ); //dup mqtt table - int res = lua_pcall( L, 1, 0, 0 ); //call the connect callback - if( res != 0 ) - NODE_DBG("CB:connect: Error when calling one-shot connect callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); - - //after connecting ok, we clear _both_ the one-shot callbacks - lua_pushnil(L); - lua_setfield(L, 1, "_connect_ok"); - lua_pushnil(L); - lua_setfield(L, 1, "_connect_nok"); - - lua_settop(L, top1); - } - - // now we check for the standard connect callback registered with 'mqtt:on()' - lua_getfield( L, 1, "_on_connect" ); - if( lua_isfunction( L, -1 ) ) - { - NODE_DBG("CB:connect: calling registered standard connect callback\n"); - lua_pushvalue( L, 1 ); //dup mqtt table - int res = lua_pcall( L, 1, 0, 0 ); //call the connect callback - if( res != 0 ) - NODE_DBG("CB:connect: Error when calling connect callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); - } - lua_settop(L, top); - event_free(event); -} - -static void _disconnected_cb(task_param_t param, task_prio_t prio) -{ - lua_State * L = lua_getstate(); //returns main Lua state - if( L == NULL ) - return; - - esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; - - int top = lua_gettop(L); - lua_checkstack(L, 8); - - char key[64]; - snprintf(key, 64, "mqtt_%p", event->client); - lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:disconnect: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); - - lua_getfield( L, -1, "_connect_nok" ); - if( lua_isfunction( L, -1 ) ) - { - NODE_DBG("CB:disconnect: calling registered one-shot disconnect callback\n"); - lua_pushvalue( L, -2 ); //dup mqtt table - int res = lua_pcall( L, 1, 0, 0 ); //call the disconnect callback - if( res != 0 ) - NODE_DBG("CB:disconnect: Error when calling one-shot disconnect callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); - - //after connecting ok, we clear _both_ the one-shot callbacks - lua_pushnil(L); - lua_setfield(L, 1, "_connect_ok"); - lua_pushnil(L); - lua_setfield(L, 1, "_connect_nok"); - } - - // now we check for the standard connect callback registered with 'mqtt:on()' - lua_getfield( L, -1, "_on_offline" ); - if( !lua_isfunction( L, -1 ) || lua_isnil( L, -1 ) ) - { - event_free(event); - return; - } - - NODE_DBG("CB:disconnect: calling registered standard offline callback\n"); - lua_pushvalue( L, -2 ); //dup mqtt table - int res = lua_pcall( L, 1, 0, 0 ); //call the offline callback - if( res != 0 ) - NODE_DBG("CB:disconnect: Error when calling offline callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); - - lua_settop(L, top); - event_free(event); -} - -static void _subscribe_cb(task_param_t param, task_prio_t prio) -{ - lua_State * L = lua_getstate(); //returns main Lua state - if( L == NULL ) - return; - - esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; - - int top = lua_gettop(L); - lua_checkstack(L, 8); - - char key[64]; - snprintf(key, 64, "mqtt_%p", event->client); - lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:subscribe: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); - - lua_getfield( L, 1, "_subscribe_ok" ); - if( lua_isfunction( L, -1 ) ) - { - NODE_DBG("CB:subscribe: calling registered one-shot subscribe callback\n"); - lua_pushvalue( L, 1 ); //dup mqtt table - int res = lua_pcall( L, 1, 0, 0 ); //call the disconnect callback - if( res != 0 ) - NODE_DBG("CB:subscribe: Error when calling one-shot subscribe callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); - - lua_pushnil(L); - lua_setfield(L, 1, "_subscribe_ok"); - } - lua_settop(L, top); - event_free(event); -} - -static void _publish_cb(task_param_t param, task_prio_t prio) -{ - NODE_DBG("CB:publish: successfully transferred control back to main task\n"); - lua_State * L = lua_getstate(); //returns main Lua state - if( L == NULL ) - return; - - esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; - - int top = lua_gettop(L); - lua_checkstack(L, 8); - - char key[64]; - snprintf(key, 64, "mqtt_%p", event->client); - lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:publish: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); - - lua_getfield( L, 1, "_publish_ok" ); - if( lua_isfunction( L, -1 ) ) - { - NODE_DBG("CB:publish: calling registered one-shot publish callback\n"); - lua_pushvalue( L, 1 ); //dup mqtt table - int res = lua_pcall( L, 1, 0, 0 ); //call the disconnect callback - if( res != 0 ) - NODE_DBG("CB:publish: Error when calling one-shot publish callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); - - lua_pushnil(L); - lua_setfield(L, 1, "_publish_ok"); - } - lua_settop(L, top); - event_free(event); -} - -static void _unsubscribe_cb(task_param_t param, task_prio_t prio) -{ - lua_State * L = lua_getstate(); //returns main Lua state - if( L == NULL ) - return; - - esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; - - int top = lua_gettop(L); - - char key[64]; - snprintf(key, 64, "mqtt_%p", event->client); - lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:subscribe: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); - - lua_getfield( L, 1, "_unsubscribe_ok" ); - if( lua_isfunction( L, -1 ) ) - { - NODE_DBG("CB:unsubscribe: calling registered one-shot unsubscribe callback\n"); - lua_pushvalue( L, 1 ); //dup mqtt table - int res = lua_pcall( L, 1, 0, 0 ); //call the disconnect callback - if( res != 0 ) - NODE_DBG("CB:unsubscribe: Error when calling one-shot unsubscribe callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); - - lua_pushnil(L); - lua_setfield(L, 1, "_unsubscribe_ok"); - } - lua_settop(L, top); - event_free(event); -} - -static void _data_cb(task_param_t param, task_prio_t prio) -{ - lua_State * L = lua_getstate(); //returns main Lua state - if( L == NULL ) - return; - - esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; - - int top = lua_gettop(L); - lua_checkstack(L, 8); - - char key[64]; - snprintf(key, 64, "mqtt_%p", event->client); - lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:data: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); - - lua_getfield( L, 1, "_on_message" ); - if( lua_isfunction( L, -1 ) ) - { - int numArg = 2; - lua_pushvalue( L, 1 ); //dup mqtt table - lua_pushstring( L, event->topic ); - if( event->data != NULL ) - { - lua_pushstring( L, event->data ); - numArg++; - } - - int res = lua_pcall( L, numArg, 0, 0 ); //call the messagecallback - if( res != 0 ) - NODE_DBG("CB:data: Error when calling message callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); - } - lua_settop(L, top); - - event_free(event); -} - -// ------------------------------------------------------------------------- // -// ------------------------------------------------------------------------- // - // Lua: on() -static int mqtt_on(lua_State *L) -{ - enum events{ - ON_CONNECT = 0, - ON_MESSAGE = 1, - ON_OFFLINE = 2 - }; - const char *const eventnames[] = {"connect", "message", "offline", NULL}; +// mqtt_on allows to set the callback associated to mqtt events +static int mqtt_on(lua_State* L) { + if (!lua_isfunction(L, 3)) //check whether we are passed a callback function + return 0; - // mqtt_settings * settings = get_settings( L ); - int event = luaL_checkoption(L, 2, "message", eventnames); + int event = luaL_checkoption(L, 2, "message", eventnames); // map passed event name to an index in the eventnames array - if( !lua_isfunction( L, 3 ) ) - return 0; + mqtt_context_t* mqtt_context = (mqtt_context_t*)luaL_checkudata(L, 1, MQTT_METATABLE); //retrieve the mqtt_context - switch (event) { - case ON_CONNECT: - lua_setfield(L, 1, "_on_connect"); - break; - case ON_MESSAGE: - lua_setfield(L, 1, "_on_message"); - break; - case ON_OFFLINE: - lua_setfield(L, 1, "_on_offline"); - break; - default: - return 0; - } + luaX_set_ref(L, 3, &mqtt_context->lua_refs[event]); // set the callback reference - lua_pop(L, 1); //pop event name - return 0; + return 0; } - // Lua: mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)[, function(client, reason)]]) -static int mqtt_connect( lua_State* L ) -{ - esp_mqtt_lua_client_config_t * mqtt_cfg = get_settings( L ); +// mqtt_connect starts a connection with the mqtt broker +static int mqtt_connect(lua_State* L) { + mqtt_context_t* mqtt_context = (mqtt_context_t*)luaL_checkudata(L, 1, MQTT_METATABLE); //retrieve the mqtt context - int secure = 0; - int reconnect = 0; - const char * host = luaL_checkstring( L, 2 ); - int port = 1883; - int n = 3; + if (mqtt_context->client) { // destroy existing client. This disconnects an existing connection using this object + esp_mqtt_client_destroy(mqtt_context->client); + mqtt_context->client = NULL; + } - if( lua_isnumber( L, n ) ) - { - port = luaL_checknumber( L, n ); - n++; - } + // initialize a mqtt config structure set to zero + esp_mqtt_client_config_t config; + memset(&config, 0, sizeof(esp_mqtt_client_config_t)); - if( lua_isnumber( L, n ) ) - { - secure = !!luaL_checkinteger( L, n ); - n++; - } + // process function parameters populating the mqtt config structure + config.host = luaL_checkstring(L, 2); - if( lua_isnumber( L, n ) ) - { - reconnect = !!luaL_checkinteger( L, n ); - n++; - } + // set defaults: + int secure = 0; + int reconnect = 0; + int port = 1883; + int n = 3; - if( lua_isfunction( L, n ) ) - { - lua_pushvalue( L, n ); - lua_setfield( L, 1, "_connect_ok" ); // set _G["_cb_connect_ok"] = fn() - n++; - } + if (lua_isnumber(L, n)) { + port = luaL_checknumber(L, n); + n++; + } - if( lua_isfunction( L, n ) ) - { - lua_pushvalue( L, n ); - lua_setfield(L, 1, "_connect_nok"); // set _G["_cb_connect_nok"] = fn() - n++; - } + if (lua_isnumber(L, n)) { + secure = !!luaL_checkinteger(L, n); + n++; + } - lua_pop( L, n - 2 ); //pop parameters + if (lua_isnumber(L, n)) { + reconnect = !!luaL_checkinteger(L, n); + n++; + } - strncpy(mqtt_cfg->host, host, MQTT_MAX_HOST_LEN ); - mqtt_cfg->config.port = port; + if (lua_isfunction(L, n)) { + luaX_set_ref(L, n, &mqtt_context->connected_ok_cb); + n++; + } - mqtt_cfg->config.disable_auto_reconnect = (reconnect == 0); - mqtt_cfg->config.transport = secure ? MQTT_TRANSPORT_OVER_SSL : MQTT_TRANSPORT_OVER_TCP; + if (lua_isfunction(L, n)) { + luaX_set_ref(L, n, &mqtt_context->connected_nok_cb); + n++; + } - esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg->config); - if( client == NULL ) - { - luaL_error( L, "MQTT library failed to start" ); - return 0; - } + ESP_LOGD(TAG, "connect: mqtt_context*: %p", mqtt_context); - esp_mqtt_client_start(client); + config.user_context = mqtt_context; // store a pointer to our context in the mqtt client user context field + // this will be useful to identify to which instance events belong to + config.event_handle = mqtt_event_handler; // set the function that will be called by the mqtt client everytime something + // happens - lua_pushlightuserdata( L, client ); - lua_setfield( L, -2, "_client" ); //and store a reference in the MQTT table + config.client_id = mqtt_context->client_id; + config.lwt_msg = mqtt_context->lwt_msg; + config.lwt_topic = mqtt_context->lwt_topic; + config.username = mqtt_context->username; + config.password = mqtt_context->password; + config.keepalive = mqtt_context->keepalive; + config.disable_clean_session = mqtt_context->disable_clean_session; + config.port = port; + config.disable_auto_reconnect = (reconnect == 0); + config.transport = secure ? MQTT_TRANSPORT_OVER_SSL : MQTT_TRANSPORT_OVER_TCP; - char id[32]; - snprintf( id, 32, "mqtt_%p", client); - NODE_DBG("Store MQTT table in _G stack pos %d\n", lua_gettop(L)); - lua_pushvalue( L, 1 ); //make a copy of the table - lua_setglobal( L, id); + // create a mqtt client instance + mqtt_context->client = esp_mqtt_client_init(&config); + if (mqtt_context->client == NULL) { + luaL_error(L, "MQTT library failed to start"); + return 0; + } - return 0; + // actually start the mqtt client and connect + esp_err_t err = esp_mqtt_client_start(mqtt_context->client); + if (err != ESP_OK) { + luaL_error(L, "Error starting mqtt client"); + } + + lua_pushboolean(L, true); // return true (ok) + return 1; } // Lua: mqtt:close() -static int mqtt_close( lua_State* L ) -{ - esp_mqtt_client_handle_t client = get_client( L ); - if( client == NULL ) - return 0; +// mqtt_close terminates the current connection +static int mqtt_close(lua_State* L) { + mqtt_context_t* mqtt_context = (mqtt_context_t*)luaL_checkudata(L, 1, MQTT_METATABLE); - NODE_DBG("Closing MQTT client %p\n", client); + if (mqtt_context->client == NULL) + return 0; - char id[64]; - snprintf(id, 64, "mqtt_%p", client); - lua_pushnil( L ); - lua_setglobal( L, id ); // remove global reference + ESP_LOGD(TAG, "Closing MQTT client %p", mqtt_context->client); - lua_pushstring( L, "_client" ); - lua_pushnil( L ); - lua_settable( L, -3 ); //and remove a reference in the MQTT table + esp_mqtt_client_destroy(mqtt_context->client); + mqtt_context->client = NULL; - return 0; + return 0; } // Lua: mqtt:lwt(topic, message[, qos[, retain]]) -static int mqtt_lwt( lua_State* L ) -{ - esp_mqtt_lua_client_config_t * mqtt_cfg = get_settings( L ); +// mqtt_lwt sets last will / testament topic and message +// must be called before connecting +static int mqtt_lwt(lua_State* L) { + mqtt_context_t* mqtt_context = (mqtt_context_t*)luaL_checkudata(L, 1, MQTT_METATABLE); - strncpy( mqtt_cfg->lwt_topic, luaL_checkstring( L, 2 ), MQTT_MAX_LWT_TOPIC ); - strncpy( mqtt_cfg->lwt_msg, luaL_checkstring( L, 3 ), MQTT_MAX_LWT_MSG ); - mqtt_cfg->config.lwt_msg_len = strlen( mqtt_cfg->lwt_msg ); + // free previous topic and messasge, if any. + luaX_free_string(L, mqtt_context->lwt_topic); + luaX_free_string(L, mqtt_context->lwt_msg); - int n = 4; - if( lua_isnumber( L, n ) ) - { - mqtt_cfg->config.lwt_qos = lua_tonumber( L, n ); - n++; - } + // save a copy of topic and message to pass to the client + // when connecting + mqtt_context->lwt_topic = luaX_alloc_string(L, 2, MQTT_MAX_LWT_TOPIC); + mqtt_context->lwt_msg = luaX_alloc_string(L, 3, MQTT_MAX_LWT_MSG); - if( lua_isnumber( L, n ) ) - { - mqtt_cfg->config.lwt_retain = lua_tonumber( L, n ); - n++; - } + //process optional parameters + int n = 4; + if (lua_isnumber(L, n)) { + mqtt_context->lwt_qos = (int)lua_tonumber(L, n); + n++; + } - lua_pop( L, n ); - NODE_DBG("Set LWT topic '%s', qos %d, retain %d, len %d\n", - mqtt_cfg->lwt_topic, mqtt_cfg->config.lwt_qos, mqtt_cfg->config.lwt_retain, mqtt_cfg->config.lwt_msg_len); - return 0; + if (lua_isnumber(L, n)) { + mqtt_context->lwt_retain = (int)lua_tonumber(L, n); + n++; + } + + ESP_LOGD(TAG, "Set LWT topic '%s', qos %d, retain %d", + mqtt_context->lwt_topic, mqtt_context->lwt_qos, mqtt_context->lwt_retain); + return 0; } //Lua: mqtt:publish(topic, payload, qos, retain[, function(client)]) -static int mqtt_publish( lua_State * L ) -{ - esp_mqtt_client_handle_t client = get_client( L ); +// returns true on success, false otherwise +// mqtt_publish publishes a message on the given topic +static int mqtt_publish(lua_State* L) { + mqtt_context_t* mqtt_context = (mqtt_context_t*)luaL_checkudata(L, 1, MQTT_METATABLE); + esp_mqtt_client_handle_t client = mqtt_context->client; - int top = lua_gettop(L); + if (client == NULL) { + lua_pushboolean(L, false); // return false (error) + return 1; + } - const char * topic = luaL_checkstring( L, 2 ); - const char * data = luaL_checkstring( L, 3 ); - int qos = luaL_checkint( L, 4 ); - int retain = luaL_checkint( L, 5 ); + const char* topic = luaL_checkstring(L, 2); + size_t data_size; + const char* data = luaL_checklstring(L, 3, &data_size); + int qos = luaL_checkint(L, 4); + int retain = luaL_checkint(L, 5); - int n = 6; - if( lua_isfunction( L, n ) ) - { - lua_pushvalue( L, n ); - lua_setfield(L, 1, "_publish_ok"); // set _G["_cb_connect_nok"] = fn() - n++; - } + if (lua_isfunction(L, 6)) { // set one-shot on publish callback + luaX_set_ref(L, 6, &mqtt_context->published_ok_cb); + } - lua_settop(L, top ); - NODE_DBG("MQTT publish client %p, topic %s, %d bytes\n", client, topic, strlen(data)); - esp_mqtt_client_publish(client, topic, data, strlen(data), qos, retain); - return 0; + ESP_LOGD(TAG, "MQTT publish client %p, topic %s, %d bytes", client, topic, data_size); + int msg_id = esp_mqtt_client_publish(client, topic, data, data_size, qos, retain); + + lua_pushboolean(L, msg_id >= 0); // if msg_id < 0 there was an error. + return 1; } // Lua: mqtt:subscribe(topic, qos[, function(client)]) OR mqtt:subscribe(table[, function(client)]) -static int mqtt_subscribe( lua_State* L ) -{ - esp_mqtt_client_handle_t client = get_client( L ); +// returns true on success, false otherwise +// mqtt_subscribe subscribes to the given topic +static int mqtt_subscribe(lua_State* L) { + mqtt_context_t* mqtt_context = (mqtt_context_t*)luaL_checkudata(L, 1, MQTT_METATABLE); + esp_mqtt_client_handle_t client = mqtt_context->client; - int top = lua_gettop(L); + if (client == NULL) { + lua_pushboolean(L, false); // return false (error) + return 1; + } - const char * topic = luaL_checkstring( L, 2 ); - int qos = luaL_checkint( L, 3 ); + const char* topic = luaL_checkstring(L, 2); + int qos = luaL_checkint(L, 3); - if( lua_isfunction( L, 4 ) ) - { - lua_pushvalue( L, 4 ); - lua_setfield(L, 1, "_subscribe_ok"); // set _G["_cb_connect_nok"] = fn() - } + if (lua_isfunction(L, 4)) // if a callback is provided, set it. + luaX_set_ref(L, 4, &mqtt_context->subscribed_ok_cb); - lua_settop(L, top ); - NODE_DBG("MQTT subscribe client %p, topic %s\n", client, topic); - esp_mqtt_client_subscribe(client, topic, qos); - return 0; + ESP_LOGD(TAG, "MQTT subscribe client %p, topic %s", client, topic); + + esp_err_t err = esp_mqtt_client_subscribe(client, topic, qos); + lua_pushboolean(L, err == ESP_OK); + + return 1; // one value returned, true on success, false on error. } -// Lua: mqtt:unsubscribe(topic[, function(client)]) OR mqtt:unsubscribe(table[, function(client)]) -static int mqtt_unsubscribe( lua_State* L ) -{ - esp_mqtt_client_handle_t client = get_client( L ); +// Lua: mqtt:unsubscribe(topic[, function(client)]) +// TODO: accept also mqtt:unsubscribe(table[, function(client)]) +// returns true on success, false otherwise +// mqtt_unsubscribe unsubscribes from the given topic +static int mqtt_unsubscribe(lua_State* L) { + mqtt_context_t* mqtt_context = (mqtt_context_t*)luaL_checkudata(L, 1, MQTT_METATABLE); + esp_mqtt_client_handle_t client = mqtt_context->client; - int top = lua_gettop(L); + if (client == NULL) { + lua_pushboolean(L, false); // return false (error) + return 1; + } - const char * topic = luaL_checkstring( L, 2 ); - int n = 3; - if( lua_isfunction( L, n ) ) - { - lua_pushvalue( L, n ); - lua_setfield(L, 1, "_unsubscribe_ok"); // set _G["_cb_connect_nok"] = fn() - n++; - } + const char* topic = luaL_checkstring(L, 2); + if (lua_isfunction(L, 3)) + luaX_set_ref(L, 3, &mqtt_context->unsubscribed_ok_cb); - lua_settop(L, top ); - NODE_DBG("MQTT unsubscribe client %p, topic %s\n", client, topic); - esp_mqtt_client_unsubscribe(client, topic); - return 0; + ESP_LOGD(TAG, "MQTT unsubscribe client %p, topic %s", client, topic); - return 0; + esp_err_t err = esp_mqtt_client_unsubscribe(client, topic); + lua_pushboolean(L, err == ESP_OK); + + return 1; // return 1 value: true OK, false error. } -static int mqtt_delete( lua_State* L ) -{ - esp_mqtt_lua_client_config_t * settings = get_settings( L ); - if( settings != NULL ) - free( settings ); +// mqtt_deleted is called on garbage collection +static int mqtt_delete(lua_State* L) { + mqtt_context_t* mqtt_context = (mqtt_context_t*)luaL_checkudata(L, 1, MQTT_METATABLE); - esp_mqtt_client_handle_t client = get_client( L ); - if( client != NULL ) - { - NODE_DBG("stopping MQTT client %p\n", client); - esp_mqtt_client_destroy( client ); - free( client ); - } - return 0; + // forget all callbacks + for (int i = 0; i < sizeof(mqtt_context->lua_refs) / sizeof(lua_ref_t); i++) { + luaX_unset_ref(L, &mqtt_context->lua_refs[i]); + } + + // if there is a client active, shut it down. + if (mqtt_context->client != NULL) { + ESP_LOGD(TAG, "stopping MQTT client %p;", mqtt_context); + // destroy the client. This is a blocking call. + // If a connection request was ongoing this will block and + // a disconnect callback could be fired before coming back here. + esp_mqtt_client_destroy(mqtt_context->client); + } + + // free all dynamic strings + luaX_free_string(L, mqtt_context->client_id); + luaX_free_string(L, mqtt_context->username); + luaX_free_string(L, mqtt_context->password); + luaX_free_string(L, mqtt_context->lwt_msg); + luaX_free_string(L, mqtt_context->lwt_topic); + + ESP_LOGD(TAG, "MQTT client garbage collected"); + return 0; } // Lua: mqtt.Client(clientid, keepalive[, username, password, cleansession]) -static int mqtt_new( lua_State* L ) -{ - const char * clientid = NULL; - clientid = luaL_checkstring( L, 1 ); - NODE_DBG("MQTT client id %s\n", clientid); +// mqtt_new creates a new instance of our mqtt userdata lua object +static int mqtt_new(lua_State* L) { + //create a new lua userdata object and initialize to 0. + mqtt_context_t* mqtt_context = (mqtt_context_t*)lua_newuserdata(L, sizeof(mqtt_context_t)); + memset(mqtt_context, 0, sizeof(mqtt_context_t)); - esp_mqtt_lua_client_config_t * mqtt_cfg = (esp_mqtt_lua_client_config_t *) malloc(sizeof(esp_mqtt_lua_client_config_t)); - memset(mqtt_cfg, 0, sizeof(esp_mqtt_lua_client_config_t)); - mqtt_cfg->config.host = mqtt_cfg->host; - mqtt_cfg->config.uri = mqtt_cfg->uri; - mqtt_cfg->config.client_id = mqtt_cfg->client_id; - mqtt_cfg->config.username = mqtt_cfg->username; - mqtt_cfg->config.password = mqtt_cfg->password; - mqtt_cfg->config.lwt_topic = mqtt_cfg->lwt_topic; - mqtt_cfg->config.lwt_msg = mqtt_cfg->lwt_msg; + // initialize all callbacks to LUA_NOREF, indicating they're unset. + for (int i = 0; i < sizeof(mqtt_context->lua_refs) / sizeof(lua_ref_t); i++) { + mqtt_context->lua_refs[i] = LUA_NOREF; + } - mqtt_cfg->config.event_handle = mqtt_event_handler; + // keep a weak reference to our userdata object so we can pass it as a parameter to user callbacks + lua_pushvalue(L, -1); + mqtt_context->self = luaX_weak_ref(L); - strncpy(mqtt_cfg->client_id, clientid, MQTT_MAX_CLIENT_LEN); - mqtt_cfg->config.keepalive = luaL_checkinteger( L, 2 ); + // store the parameters passed: + mqtt_context->client_id = luaX_alloc_string(L, 1, MQTT_MAX_CLIENT_LEN); + ESP_LOGD(TAG, "MQTT client id %s", mqtt_context->client_id); - int n = 2; - if( lua_isstring(L, 3) ) - { - strncpy( mqtt_cfg->username, luaL_checkstring( L, 3 ), MQTT_MAX_USERNAME_LEN); - n++; - } + mqtt_context->keepalive = luaL_checkinteger(L, 2); - if( lua_isstring(L, 4) ) - { - strncpy(mqtt_cfg->password, luaL_checkstring( L, 4 ), MQTT_MAX_PASSWORD_LEN); - n++; - } + int n = 2; + if (lua_isstring(L, 3)) { + mqtt_context->username = luaX_alloc_string(L, 3, MQTT_MAX_USERNAME_LEN); + n++; + } - if( lua_isnumber(L, 5) ) - { - mqtt_cfg->config.disable_clean_session = (luaL_checknumber( L, 5 ) == 0); - n++; - } - lua_pop( L, n ); //remove parameters + if (lua_isstring(L, 4)) { + mqtt_context->password = luaX_alloc_string(L, 4, MQTT_MAX_PASSWORD_LEN); + n++; + } - lua_newtable( L ); - NODE_DBG("New MQTT table at stack pos %d\n", lua_gettop(L)); + if (lua_isnumber(L, 5)) { + mqtt_context->disable_clean_session = (luaL_checknumber(L, 5) == 0); + n++; + } - lua_pushlightuserdata( L, mqtt_cfg ); - lua_setfield( L, -2, "_settings" ); // set t["_mqtt"] = client + luaL_getmetatable(L, MQTT_METATABLE); + lua_setmetatable(L, -2); - luaL_getmetatable( L, "mqtt.mt" ); - lua_setmetatable( L, -2 ); + if (event_handler_task_id == 0) { // if this is the first time, create nodemcu tasks for every event type + event_handler_task_id = task_get_id(event_task_handler); + } - hConn = task_get_id(_connected_cb); - hOff = task_get_id(_disconnected_cb); - hPub = task_get_id(_publish_cb); - hSub = task_get_id(_subscribe_cb); - hUnsub = task_get_id(_unsubscribe_cb); - hData = task_get_id(_data_cb); - - NODE_DBG("conn %d, off %d, pub %d, sub %d, data %d\n", hConn, hOff, hPub, hSub, hData); - return 1; //leave table on top of the stack + return 1; //one object returned, the mqtt context wrapped in a lua userdata object } +// map client methods to functions: static const LUA_REG_TYPE mqtt_metatable_map[] = -{ - { LSTRKEY( "connect" ), LFUNCVAL( mqtt_connect )}, - { LSTRKEY( "close" ), LFUNCVAL( mqtt_close )}, - { LSTRKEY( "lwt" ), LFUNCVAL( mqtt_lwt )}, - { LSTRKEY( "publish" ), LFUNCVAL( mqtt_publish )}, - { LSTRKEY( "subscribe" ), LFUNCVAL( mqtt_subscribe )}, - { LSTRKEY( "unsubscribe" ), LFUNCVAL( mqtt_unsubscribe )}, - { LSTRKEY( "on" ), LFUNCVAL( mqtt_on )}, - { LSTRKEY( "__gc" ), LFUNCVAL( mqtt_delete )}, - { LSTRKEY( "__index" ), LROVAL( mqtt_metatable_map )}, - { LNILKEY, LNILVAL} -}; + { + {LSTRKEY("connect"), LFUNCVAL(mqtt_connect)}, + {LSTRKEY("close"), LFUNCVAL(mqtt_close)}, + {LSTRKEY("lwt"), LFUNCVAL(mqtt_lwt)}, + {LSTRKEY("publish"), LFUNCVAL(mqtt_publish)}, + {LSTRKEY("subscribe"), LFUNCVAL(mqtt_subscribe)}, + {LSTRKEY("unsubscribe"), LFUNCVAL(mqtt_unsubscribe)}, + {LSTRKEY("on"), LFUNCVAL(mqtt_on)}, + {LSTRKEY("__gc"), LFUNCVAL(mqtt_delete)}, + {LSTRKEY("__index"), LROVAL(mqtt_metatable_map)}, + {LNILKEY, LNILVAL}}; // Module function map static const LUA_REG_TYPE mqtt_map[] = { - { LSTRKEY( "Client" ), LFUNCVAL( mqtt_new ) }, - { LNILKEY, LNILVAL } -}; + {LSTRKEY("Client"), LFUNCVAL(mqtt_new)}, + {LNILKEY, LNILVAL}}; -int luaopen_mqtt(lua_State *L) { - luaL_rometatable(L, "mqtt.mt", (void *)mqtt_metatable_map); // create metatable for mqtt - return 0; +int luaopen_mqtt(lua_State* L) { + luaL_rometatable(L, MQTT_METATABLE, (void*)mqtt_metatable_map); // create metatable for mqtt + return 0; } NODEMCU_MODULE(MQTT, "mqtt", mqtt_map, luaopen_mqtt); diff --git a/docs/modules/mqtt.md b/docs/modules/mqtt.md index ea4e801f..46b15014 100644 --- a/docs/modules/mqtt.md +++ b/docs/modules/mqtt.md @@ -1,6 +1,7 @@ # MQTT Module | Since | Origin / Contributor | Maintainer | Source | | :----- | :-------------------- | :---------- | :------ | +| 2019-01-28 | [Javier Peletier](https://github.com/jpeletier) | | [mqtt.c](../../components/modules/mqtt.c)| | 2018-10-08 | [Tuan PM](https://github.com/tuanpmt/esp_mqtt), [Espressif](https://docs.espressif.com/projects/esp-idf/en/latest/api-reference/protocols/mqtt.html) | | [mqtt.c](../../components/modules/mqtt.c)| The client supports version 3.1 and 3.1.1 of the [MQTT](https://en.wikipedia.org/wiki/MQTT) protocol. Make sure that the correct version is set with `make menuconfig` -> "Component config" -> "ESP-MQTT Configurations" -> "Enable MQTT protocol 3.1.1".