diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index 92570c04..2a411e9f 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -50,10 +50,6 @@ typedef struct mqtt_event_data_t uint16_t data_offset; } mqtt_event_data_t; -#define RECONNECT_OFF 0 -#define RECONNECT_POSSIBLE 1 -#define RECONNECT_ON 2 - typedef enum { MQTT_RECV_NORMAL, MQTT_RECV_BUFFERING_SHORT, @@ -64,7 +60,6 @@ typedef enum { typedef struct mqtt_state_t { uint16_t port; - uint8_t auto_reconnect; // 0 is not auto_reconnect. 1 is auto reconnect, but never connected. 2 is auto reconnect, but once connected mqtt_connect_info_t* connect_info; mqtt_connection_t mqtt_connection; msg_queue_t* pending_msg_q; @@ -144,31 +139,19 @@ static void mqtt_socket_disconnected(void *arg) // tcp only mud->mqtt_state.recv_buffer_size = 0; mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL; - if(mud->mqtt_state.auto_reconnect == RECONNECT_ON) { - mud->pesp_conn->reverse = mud; - mud->pesp_conn->type = ESPCONN_TCP; - mud->pesp_conn->state = ESPCONN_NONE; - mud->connected = false; - mud->pesp_conn->proto.tcp->remote_port = mud->mqtt_state.port; - mud->pesp_conn->proto.tcp->local_port = espconn_port(); - espconn_regist_connectcb(mud->pesp_conn, mqtt_socket_connected); - espconn_regist_reconcb(mud->pesp_conn, mqtt_socket_reconnected); - socket_connect(pesp_conn); - } else { - if(mud->pesp_conn){ - mud->pesp_conn->reverse = NULL; - if(mud->pesp_conn->proto.tcp) - c_free(mud->pesp_conn->proto.tcp); - mud->pesp_conn->proto.tcp = NULL; - c_free(mud->pesp_conn); - mud->pesp_conn = NULL; - } - - mud->connected = false; - luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref); - mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self + if(mud->pesp_conn){ + mud->pesp_conn->reverse = NULL; + if(mud->pesp_conn->proto.tcp) + c_free(mud->pesp_conn->proto.tcp); + mud->pesp_conn->proto.tcp = NULL; + c_free(mud->pesp_conn); + mud->pesp_conn = NULL; } + mud->connected = false; + luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref); + mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self + if(call_back){ lua_call(L, 1, 0); } @@ -191,24 +174,18 @@ static void mqtt_socket_reconnected(void *arg, sint8_t err) mud->event_timeout = 0; // no need to count anymore - if(mud->mqtt_state.auto_reconnect == RECONNECT_ON) { - pesp_conn->proto.tcp->remote_port = mud->mqtt_state.port; - pesp_conn->proto.tcp->local_port = espconn_port(); - socket_connect(pesp_conn); - } else { #ifdef CLIENT_SSL_ENABLE - if (mud->secure) { - espconn_secure_disconnect(pesp_conn); - } else + if (mud->secure) { + espconn_secure_disconnect(pesp_conn); + } else #endif - { - espconn_disconnect(pesp_conn); - } - - mqtt_connack_fail(mud, MQTT_CONN_FAIL_SERVER_NOT_FOUND); - - mqtt_socket_disconnected(arg); + { + espconn_disconnect(pesp_conn); } + + mqtt_connack_fail(mud, MQTT_CONN_FAIL_SERVER_NOT_FOUND); + + mqtt_socket_disconnected(arg); NODE_DBG("leave mqtt_socket_reconnected.\n"); } @@ -475,9 +452,6 @@ READPACKET: mud->keepalive_sent = 0; luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); mud->cb_connect_fail_ref = LUA_NOREF; - if (mud->mqtt_state.auto_reconnect == RECONNECT_POSSIBLE) { - mud->mqtt_state.auto_reconnect = RECONNECT_ON; - } if(mud->cb_connect_ref == LUA_NOREF) break; if(mud->self_ref == LUA_NOREF) @@ -1050,7 +1024,6 @@ static int mqtt_socket_client( lua_State* L ) mud->connect_info.max_message_length = max_message_length; mud->mqtt_state.pending_msg_q = NULL; - mud->mqtt_state.auto_reconnect = RECONNECT_OFF; mud->mqtt_state.port = 1883; mud->mqtt_state.connect_info = &mud->connect_info; mud->mqtt_state.recv_buffer = NULL; @@ -1242,7 +1215,7 @@ static sint8 socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) } #include "pm/swtimer.h" -// Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client), function(client, connect_return_code) ) +// Lua: mqtt:connect( host, port, secure, function(client), function(client, connect_return_code) ) static int mqtt_socket_connect( lua_State* L ) { NODE_DBG("enter mqtt_socket_connect.\n"); @@ -1252,7 +1225,7 @@ static int mqtt_socket_connect( lua_State* L ) ip_addr_t ipaddr; const char *domain; int stack = 1; - unsigned secure = 0, auto_reconnect = RECONNECT_OFF; + unsigned secure = 0; int top = lua_gettop(L); sint8 espconn_status; @@ -1315,13 +1288,15 @@ static int mqtt_socket_connect( lua_State* L ) pesp_conn->proto.tcp->local_port = espconn_port(); mud->mqtt_state.port = port; - if ( (stack<=top) && lua_isnumber(L, stack) ) + if ( (stack<=top) && (lua_isnumber(L, stack) || lua_isboolean(L, stack)) ) { - secure = lua_tointeger(L, stack); - stack++; - if ( secure != 0 && secure != 1 ){ - secure = 0; // default to 0 + if (lua_isnumber(L, stack)) { + platform_print_deprecation_note("mqtt.connect secure parameter as integer","in the future"); + secure = !!lua_tointeger(L, stack); + } else { + secure = lua_toboolean(L, stack); } + stack++; } else { secure = 0; // default to 0 } @@ -1334,19 +1309,6 @@ static int mqtt_socket_connect( lua_State* L ) } #endif - if ( (stack<=top) && lua_isnumber(L, stack) ) - { - platform_print_deprecation_note("autoreconnect is deprecated", "in the next version"); - auto_reconnect = lua_tointeger(L, stack); - stack++; - if ( auto_reconnect != RECONNECT_OFF && auto_reconnect != RECONNECT_POSSIBLE ){ - auto_reconnect = RECONNECT_OFF; // default to 0 - } - } else { - auto_reconnect = RECONNECT_OFF; // default to 0 - } - mud->mqtt_state.auto_reconnect = auto_reconnect; - // call back function when a connection is obtained, tcp only if ((stack<=top) && (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION)){ lua_pushvalue(L, stack); // copy argument (func) to the top of stack @@ -1416,8 +1378,6 @@ static int mqtt_socket_close( lua_State* L ) return 1; } - mud->mqtt_state.auto_reconnect = RECONNECT_OFF; // stop auto reconnect. - sint8 espconn_status = ESPCONN_CONN; if (mud->connected) { // Send disconnect message @@ -1486,6 +1446,15 @@ static int mqtt_socket_on( lua_State* L ) }else if( sl == 8 && c_strcmp(method, "overflow") == 0){ luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_overflow_ref); mud->cb_overflow_ref = luaL_ref(L, LUA_REGISTRYINDEX); + }else if( sl == 6 && c_strcmp(method, "puback") == 0){ + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref); + mud->cb_puback_ref = luaL_ref(L, LUA_REGISTRYINDEX); + }else if( sl == 6 && c_strcmp(method, "suback") == 0){ + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_suback_ref); + mud->cb_suback_ref = luaL_ref(L, LUA_REGISTRYINDEX); + }else if( sl == 8 && c_strcmp(method, "unsuback") == 0){ + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_unsuback_ref); + mud->cb_unsuback_ref = luaL_ref(L, LUA_REGISTRYINDEX); }else{ lua_pop(L, 1); return luaL_error( L, "method not supported" ); diff --git a/docs/modules/mqtt.md b/docs/modules/mqtt.md index 64817e19..bf285a35 100644 --- a/docs/modules/mqtt.md +++ b/docs/modules/mqtt.md @@ -123,13 +123,12 @@ none Connects to the broker specified by the given host, port, and secure options. #### Syntax -`mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)[, function(client, reason)]])` +`mqtt:connect(host[, port[, secure]][, function(client)[, function(client, reason)]])` #### Parameters - `host` host, domain or IP (string) - `port` broker port (number), default 1883 -- `secure` 0/1 for `false`/`true`, default 0. Take note of constraints documented in the [net module](net.md). -- `autoreconnect` 0/1 for `false`/`true`, default 0. This option is *deprecated*. +- `secure` boolean: if `true`, use TLS. Take note of constraints documented in the [net module](net.md). - `function(client)` callback function for when the connection was established - `function(client, reason)` callback function for when the connection could not be established. No further callbacks should be called. @@ -138,11 +137,8 @@ Connects to the broker specified by the given host, port, and secure options. #### Notes -Don't use `autoreconnect`. Let me repeat that, don't use `autoreconnect`. You should handle the errors explicitly and appropriately for -your application. In particular, the default for `cleansession` above is `true`, so all subscriptions are destroyed when the connection -is lost for any reason. - -In order to acheive a consistent connection, handle errors in the error callback. For example: +An application should watch for connection failures and handle errors in the error callback, +in order to achieve a reliable connection to the server. For example: ``` function handle_mqtt_error(client, reason) @@ -156,13 +152,12 @@ end In reality, the connected function should do something useful! -This is the description of how the `autoreconnect` functionality may (or may not) work. +The two callbacks to `:connect()` alias with the "connect" and "offline" +callbacks available through `:on()`. -> When `autoreconnect` is set, then the connection will be re-established when it breaks. No error indication will be given (but all the -> subscriptions may be lost if `cleansession` is true). However, if the -> very first connection fails, then no reconnect attempt is made, and the error is signalled through the callback (if any). The first connection -> is considered a success if the client connects to a server and gets back a good response packet in response to its MQTT connection request. -> This implies (for example) that the username and password are correct. +Previously, we instructed an application to pass either the *integer* 0 or +*integer* 1 for `secure`. Now, this will trigger a deprecation warning; please +use the *boolean* `false` or `true` instead. #### Connection failure callback reason codes: @@ -213,7 +208,7 @@ Registers a callback function for an event. `mqtt:on(event, function(client[, topic[, message]]))` #### Parameters -- `event` can be "connect", "message", "offline" or "overflow" +- `event` can be "connect", "suback", "unsuback", "puback", "message", "overflow", or "offline" - `function(client[, topic[, message]])` callback function. The first parameter is the client. If event is "message", the 2nd and 3rd param are received topic and message (strings). #### Returns @@ -231,8 +226,13 @@ Publishes a message. - `message` the message to publish, (buffer or string) - `qos` QoS level - `retain` retain flag -- `function(client)` optional callback fired when PUBACK received. NOTE: When calling publish() more than once, the last callback function defined will be called for ALL publish commands. +- `function(client)` optional callback fired when PUBACK received. +#### Notes + +When calling publish() more than once, the last callback function defined will +be called for ALL publish commands. This callback argument also aliases with +the "puback" callback for `:on()`. #### Returns `true` on success, `false` otherwise @@ -249,7 +249,13 @@ Subscribes to one or several topics. - `topic` a [topic string](http://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices) - `qos` QoS subscription level, default 0 - `table` array of 'topic, qos' pairs to subscribe to -- `function(client)` optional callback fired when subscription(s) succeeded. NOTE: When calling subscribe() more than once, the last callback function defined will be called for ALL subscribe commands. +- `function(client)` optional callback fired when subscription(s) succeeded. + +#### Notes + +When calling subscribe() more than once, the last callback function defined +will be called for ALL subscribe commands. This callback argument also aliases +with the "suback" callback for `:on()`. #### Returns `true` on success, `false` otherwise @@ -278,7 +284,13 @@ Unsubscribes from one or several topics. #### Parameters - `topic` a [topic string](http://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices) - `table` array of 'topic, anything' pairs to unsubscribe from -- `function(client)` optional callback fired when unsubscription(s) succeeded. NOTE: When calling unsubscribe() more than once, the last callback function defined will be called for ALL unsubscribe commands. +- `function(client)` optional callback fired when unsubscription(s) succeeded. + +#### Notes + +When calling subscribe() more than once, the last callback function defined +will be called for ALL subscribe commands. This callback argument also aliases +with the "unsuback" callback for `:on()`. #### Returns `true` on success, `false` otherwise