From 0abe2fe964437b2e4174a251b139874ed579a435 Mon Sep 17 00:00:00 2001 From: jfollas Date: Sun, 6 Mar 2016 16:33:57 -0500 Subject: [PATCH 1/2] MQTT Client - CONNACK processing - Process the CONNACK message received from the broker after Connect - Provide feedback to Lua via failure callback on client:connect() - Also provide failure information for other situations not covered by CONNACK --- app/modules/mqtt.c | 166 ++++++++++++++++++++++++++++++++++++---- app/mqtt/mqtt_msg.h | 16 ++++ docs/en/modules/mqtt.md | 24 +++++- 3 files changed, 188 insertions(+), 18 deletions(-) diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index f01e9aae..a9ad2ba6 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -59,6 +59,7 @@ typedef struct lmqtt_userdata struct espconn *pesp_conn; int self_ref; int cb_connect_ref; + int cb_connect_fail_ref; int cb_disconnect_ref; int cb_message_ref; int cb_suback_ref; @@ -224,9 +225,12 @@ READPACKET: uint8_t temp_buffer[MQTT_BUF_SIZE]; mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); mqtt_message_t *temp_msg = NULL; + switch(mud->connState){ case MQTT_CONNECT_SENDING: case MQTT_CONNECT_SENT: + mud->event_timeout = 0; + if(mqtt_get_type(in_buffer) != MQTT_MSG_TYPE_CONNACK){ NODE_DBG("MQTT: Invalid packet\r\n"); mud->connState = MQTT_INIT; @@ -240,6 +244,51 @@ READPACKET: { espconn_disconnect(pesp_conn); } + + if(mud->cb_connect_fail_ref == LUA_NOREF) + break; + if(mud->self_ref == LUA_NOREF) + break; + if(mud->L == NULL) + break; + + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua + lua_pushinteger(mud->L, MQTT_CONN_FAIL_NOT_A_CONNACK_MSG); + lua_call(mud->L, 2, 0); + + break; + + } else if (mqtt_get_connect_ret_code(in_buffer) != MQTT_CONNACK_ACCEPTED) { + NODE_DBG("MQTT: CONNACK REFUSED (CODE: %d)\n", mqtt_get_connect_ret_code(in_buffer)); + + mud->connState = MQTT_INIT; + +#ifdef CLIENT_SSL_ENABLE + if(mud->secure) + { + espconn_secure_disconnect(pesp_conn); + } + else +#endif + { + espconn_disconnect(pesp_conn); + } + + if(mud->cb_connect_fail_ref == LUA_NOREF) + break; + if(mud->self_ref == LUA_NOREF) + break; + if(mud->L == NULL) + break; + + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua + lua_pushinteger(mud->L, mqtt_get_connect_ret_code(in_buffer)); // CONNACK response code contains the reason why failure occurred + lua_call(mud->L, 2, 0); + + break; + } else { mud->connState = MQTT_DATA; NODE_DBG("MQTT: Connected\r\n"); @@ -328,7 +377,7 @@ READPACKET: case MQTT_MSG_TYPE_PUBREC: if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){ NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n"); - // Note: actrually, should not destroy the msg until PUBCOMP is received. + // Note: actually, should not destroy the msg until PUBCOMP is received. msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); temp_msg = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id); node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, @@ -380,13 +429,13 @@ READPACKET: length = mud->mqtt_state.message_length_read; if(mud->mqtt_state.message_length < mud->mqtt_state.message_length_read) - { - length -= mud->mqtt_state.message_length; - in_buffer += mud->mqtt_state.message_length; + { + length -= mud->mqtt_state.message_length; + in_buffer += mud->mqtt_state.message_length; - NODE_DBG("Get another published message\r\n"); - goto READPACKET; - } + NODE_DBG("Get another published message\r\n"); + goto READPACKET; + } } break; } @@ -427,6 +476,7 @@ static void mqtt_socket_sent(void *arg) if(mud->connState == MQTT_CONNECT_SENDING){ mud->connState = MQTT_CONNECT_SENT; + mud->event_timeout = MQTT_SEND_TIMEOUT; // MQTT_CONNECT not queued. return; } @@ -541,10 +591,27 @@ void mqtt_socket_timer(void *arg) if(mud->connState == MQTT_INIT){ // socket connect time out. NODE_DBG("Can not connect to broker.\n"); - // Never goes here. + + os_timer_disarm(&mud->mqttTimer); + + if(mud->cb_connect_fail_ref != LUA_NOREF && mud->self_ref != LUA_NOREF && mud->L != NULL) { + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua + lua_pushinteger(mud->L, MQTT_CONN_FAIL_SERVER_NOT_FOUND); + lua_call(mud->L, 2, 0); + } + } else if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT send time out. NODE_DBG("sSend MQTT_CONNECT failed.\n"); mud->connState = MQTT_INIT; + + if(mud->cb_connect_fail_ref != LUA_NOREF && mud->self_ref != LUA_NOREF && mud->L != NULL) { + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua + lua_pushinteger(mud->L, MQTT_CONN_FAIL_TIMEOUT_SENDING); + lua_call(mud->L, 2, 0); + } + #ifdef CLIENT_SSL_ENABLE if(mud->secure) { @@ -556,12 +623,34 @@ void mqtt_socket_timer(void *arg) espconn_disconnect(mud->pesp_conn); } mud->keep_alive_tick = 0; // not need count anymore - } else if(mud->connState == MQTT_CONNECT_SENT){ // wait for CONACK time out. - NODE_DBG("MQTT_CONNECT failed.\n"); + } else if(mud->connState == MQTT_CONNECT_SENT) { // wait for CONACK time out. + NODE_DBG("MQTT_CONNECT timeout.\n"); + mud->connState == MQTT_INIT; + +#ifdef CLIENT_SSL_ENABLE + if(mud->secure) + { + espconn_secure_disconnect(mud->pesp_conn); + } + else +#endif + { + espconn_disconnect(mud->pesp_conn); + } + + if(mud->cb_connect_fail_ref != LUA_NOREF && mud->self_ref != LUA_NOREF && mud->L != NULL) { + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua + lua_pushinteger(mud->L, MQTT_CONN_FAIL_TIMEOUT_RECEIVING); + lua_call(mud->L, 2, 0); + } + } else if(mud->connState == MQTT_DATA){ msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q)); if(pending_msg){ + mud->event_timeout = MQTT_SEND_TIMEOUT; + #ifdef CLIENT_SSL_ENABLE if(mud->secure) { @@ -629,6 +718,7 @@ static int mqtt_socket_client( lua_State* L ) mud->L = NULL; mud->self_ref = LUA_NOREF; mud->cb_connect_ref = LUA_NOREF; + mud->cb_connect_fail_ref = LUA_NOREF; mud->cb_disconnect_ref = LUA_NOREF; mud->cb_message_ref = LUA_NOREF; @@ -675,7 +765,7 @@ static int mqtt_socket_client( lua_State* L ) } if(username == NULL) unl = 0; - NODE_DBG("lengh username: %d\r\n", unl); + NODE_DBG("length username: %d\r\n", unl); if(lua_isstring( L, stack )){ password = luaL_checklstring( L, stack, &pwl ); @@ -683,7 +773,7 @@ static int mqtt_socket_client( lua_State* L ) } if(password == NULL) pwl = 0; - NODE_DBG("lengh password: %d\r\n", pwl); + NODE_DBG("length password: %d\r\n", pwl); if(lua_isnumber( L, stack )) { @@ -797,6 +887,10 @@ static int mqtt_delete( lua_State* L ) luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref); mud->cb_connect_ref = LUA_NOREF; } + if(LUA_NOREF!=mud->cb_connect_fail_ref){ + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + mud->cb_connect_fail_ref = LUA_NOREF; + } if(LUA_NOREF!=mud->cb_disconnect_ref){ luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); mud->cb_disconnect_ref = LUA_NOREF; @@ -825,7 +919,9 @@ static int mqtt_delete( lua_State* L ) static sint8 socket_connect(struct espconn *pesp_conn) { + NODE_DBG("enter socket_connect.\n"); + sint8 espconn_status; if(pesp_conn == NULL) @@ -881,6 +977,20 @@ static sint8 socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) if( dns_reconn_count >= 5 ){ NODE_ERR( "DNS Fail!\n" ); // Note: should delete the pesp_conn or unref self_ref here. + + struct espconn *pesp_conn = arg; + if(pesp_conn != NULL) { + lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse; + if(mud != NULL) { + if(mud->cb_connect_fail_ref != LUA_NOREF && mud->self_ref != LUA_NOREF && mud->L != NULL) { + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua + lua_pushinteger(mud->L, MQTT_CONN_FAIL_DNS); + lua_call(mud->L, 2, 0); + } + } + } + mqtt_socket_disconnected(arg); // although not connected, but fire disconnect callback to release every thing. return -1; } @@ -904,7 +1014,7 @@ static sint8 socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) return espconn_status; } -// Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client) ) +// Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client), function(client, connect_return_code) ) static int mqtt_socket_connect( lua_State* L ) { NODE_DBG("enter mqtt_socket_connect.\n"); @@ -1020,6 +1130,15 @@ static int mqtt_socket_connect( lua_State* L ) mud->cb_connect_ref = luaL_ref(L, LUA_REGISTRYINDEX); stack++; } + + // call back function when a connection fails + if ((stack<=top) && (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION)){ + lua_pushvalue(L, stack); // copy argument (func) to the top of stack + if(mud->cb_connect_fail_ref != LUA_NOREF) + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + mud->cb_connect_fail_ref = luaL_ref(L, LUA_REGISTRYINDEX); + stack++; + } lua_pushvalue(L, 1); // copy userdata to the top of stack if(mud->self_ref != LUA_NOREF) @@ -1249,12 +1368,14 @@ static int mqtt_socket_subscribe( lua_State* L ) { msg_id, MQTT_MSG_TYPE_SUBSCRIBE, (int)mqtt_get_qos(temp_msg->data) ); NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length); + NODE_DBG("msg_size: %d, event_timeout: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)), mud->event_timeout); sint8 espconn_status = ESPCONN_IF; if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){ mud->event_timeout = MQTT_SEND_TIMEOUT; NODE_DBG("Sent: %d\n", node->msg.length); + #ifdef CLIENT_SSL_ENABLE if( mud->secure ) { @@ -1343,6 +1464,7 @@ static int mqtt_socket_publish( lua_State* L ) sint8 espconn_status = ESPCONN_OK; + if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){ mud->event_timeout = MQTT_SEND_TIMEOUT; NODE_DBG("Sent: %d\n", node->msg.length); @@ -1461,9 +1583,23 @@ static const LUA_REG_TYPE mqtt_socket_map[] = { { LNILKEY, LNILVAL } }; + static const LUA_REG_TYPE mqtt_map[] = { - { LSTRKEY( "Client" ), LFUNCVAL( mqtt_socket_client ) }, - { LSTRKEY( "__metatable" ), LROVAL( mqtt_map ) }, + { LSTRKEY( "Client" ), LFUNCVAL( mqtt_socket_client ) }, + + { LSTRKEY( "CONN_FAIL_SERVER_NOT_FOUND" ), LNUMVAL( MQTT_CONN_FAIL_SERVER_NOT_FOUND ) }, + { LSTRKEY( "CONN_FAIL_NOT_A_CONNACK_MSG" ), LNUMVAL( MQTT_CONN_FAIL_NOT_A_CONNACK_MSG ) }, + { LSTRKEY( "CONN_FAIL_DNS" ), LNUMVAL( MQTT_CONN_FAIL_DNS ) }, + { LSTRKEY( "CONN_FAIL_TIMEOUT_RECEIVING" ), LNUMVAL( MQTT_CONN_FAIL_TIMEOUT_RECEIVING ) }, + { LSTRKEY( "CONN_FAIL_TIMEOUT_SENDING" ), LNUMVAL( MQTT_CONN_FAIL_TIMEOUT_SENDING ) }, + { LSTRKEY( "CONNACK_ACCEPTED" ), LNUMVAL( MQTT_CONNACK_ACCEPTED ) }, + { LSTRKEY( "CONNACK_REFUSED_PROTOCOL_VER" ), LNUMVAL( MQTT_CONNACK_REFUSED_PROTOCOL_VER ) }, + { LSTRKEY( "CONNACK_REFUSED_ID_REJECTED" ), LNUMVAL( MQTT_CONNACK_REFUSED_ID_REJECTED ) }, + { LSTRKEY( "CONNACK_REFUSED_SERVER_UNAVAILABLE" ), LNUMVAL( MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE ) }, + { LSTRKEY( "CONNACK_REFUSED_BAD_USER_OR_PASS" ), LNUMVAL( MQTT_CONNACK_REFUSED_BAD_USER_OR_PASS ) }, + { LSTRKEY( "CONNACK_REFUSED_NOT_AUTHORIZED" ), LNUMVAL( MQTT_CONNACK_REFUSED_NOT_AUTHORIZED ) }, + + { LSTRKEY( "__metatable" ), LROVAL( mqtt_map ) }, { LNILKEY, LNILVAL } }; diff --git a/app/mqtt/mqtt_msg.h b/app/mqtt/mqtt_msg.h index 225ba642..f3b6d5e6 100644 --- a/app/mqtt/mqtt_msg.h +++ b/app/mqtt/mqtt_msg.h @@ -65,6 +65,21 @@ enum mqtt_message_type MQTT_MSG_TYPE_DISCONNECT = 14 }; +enum mqtt_connack_return_code +{ + MQTT_CONN_FAIL_SERVER_NOT_FOUND = -5, + MQTT_CONN_FAIL_NOT_A_CONNACK_MSG = -4, + MQTT_CONN_FAIL_DNS = -3, + MQTT_CONN_FAIL_TIMEOUT_RECEIVING = -2, + MQTT_CONN_FAIL_TIMEOUT_SENDING = -1, + MQTT_CONNACK_ACCEPTED = 0, + MQTT_CONNACK_REFUSED_PROTOCOL_VER = 1, + MQTT_CONNACK_REFUSED_ID_REJECTED = 2, + MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE = 3, + MQTT_CONNACK_REFUSED_BAD_USER_OR_PASS = 4, + MQTT_CONNACK_REFUSED_NOT_AUTHORIZED = 5 +}; + typedef struct mqtt_message { uint8_t* data; @@ -101,6 +116,7 @@ static inline int mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; } static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; } static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } +static inline int mqtt_get_connect_ret_code(uint8_t* buffer) { return (buffer[3]); } void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); int mqtt_get_total_length(uint8_t* buffer, uint16_t length); diff --git a/docs/en/modules/mqtt.md b/docs/en/modules/mqtt.md index 36905325..20cc1a6a 100644 --- a/docs/en/modules/mqtt.md +++ b/docs/en/modules/mqtt.md @@ -41,7 +41,8 @@ m:on("message", function(client, topic, data) end) -- for TLS: m:connect("192.168.11.118", secure-port, 1) -m:connect("192.168.11.118", 1880, 0, function(client) print("connected") end) +m:connect("192.168.11.118", 1880, 0, function(client) print("connected") end, + function(client, reason) print("failed reason: "..reason) end) -- Calling subscribe/publish only makes sense once the connection -- was successfully established. In a real-world application you want @@ -78,18 +79,35 @@ none Connects to the broker specified by the given host, port, and secure options. #### Syntax -`mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)])` +`mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)[, function(client, reason)]])` #### Parameters - `host` host, domain or IP (string) - `port` broker port (number), default 1883 - `secure` 0/1 for `false`/`true`, default 0. [As per #996](https://github.com/nodemcu/nodemcu-firmware/issues/996#issuecomment-178053308) secure connections use **TLS 1.1** with the following cipher suites: `TLS_RSA_WITH_AES_128_CBC_SHA`, `TLS_RSA_WITH_AES_256_CBC_SHA`, `TLS_RSA_WITH_RC4_128_SHA`, and `TLS_RSA_WITH_RC4_128_MD5`. - `autoreconnect` 0/1 for `false`/`true`, default 0 -- `function(client)` call back function for when the connection was established +- `function(client)` callback function for when the connection was established +- `function(client, reason)` callback function for when the connection could not be established #### Returns `true` on success, `false` otherwise +#### Connection failure callback reason codes: + +| Constant | Value | Description | +|----------|-------|-------------| +|`mqtt.CONN_FAIL_SERVER_NOT_FOUND`|-5|There is no broker listening at the specified IP Address and Port| +|`mqtt.CONN_FAIL_NOT_A_CONNACK_MSG`|-4|The response from the broker was not a CONNACK as required by the protocol| +|`mqtt.CONN_FAIL_DNS`|-3|DNS Lookup failed| +|`mqtt.CONN_FAIL_TIMEOUT_RECEIVING`|-2|Timeout waiting for a CONNACK from the broker| +|`mqtt.CONN_FAIL_TIMEOUT_SENDING`|-1|Timeout trying to send the Connect message| +|`mqtt.CONNACK_ACCEPTED`|0|No errors. _Note: This will not trigger a failure callback._| +|`mqtt.CONNACK_REFUSED_PROTOCOL_VER`|1|The broker is not a 3.1.1 MQTT broker.| +|`mqtt.CONNACK_REFUSED_ID_REJECTED`|2|The specified ClientID was rejected by the broker. (See `mqtt.Client()`)| +|`mqtt.CONNACK_REFUSED_SERVER_UNAVAILABLE`|3|The server is unavailable.| +|`mqtt.CONNACK_REFUSED_BAD_USER_OR_PASS`|4|The broker refused the specified username or password.| +|`mqtt.CONNACK_REFUSED_NOT_AUTHORIZED`|5|The username is not authorized.| + ## mqtt.client:lwt() Setup [Last Will and Testament](http://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament) (optional). A broker will publish a message with qos = 0, retain = 0, data = "offline" to topic "/lwt" if client does not send keepalive packet. From aa50eca4f6c8857e13ed64347f79c734ecf8d5a9 Mon Sep 17 00:00:00 2001 From: jfollas Date: Sun, 6 Mar 2016 20:11:16 -0500 Subject: [PATCH 2/2] Refactoring of MQTT module to consolidate duplicate code into a function - per @pjsg's suggestion --- app/modules/mqtt.c | 74 ++++++++++++----------------------------- docs/en/modules/mqtt.md | 2 +- 2 files changed, 23 insertions(+), 53 deletions(-) diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index a9ad2ba6..0aad7cff 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -79,6 +79,7 @@ typedef struct lmqtt_userdata static sint8 socket_connect(struct espconn *pesp_conn); static void mqtt_socket_reconnected(void *arg, sint8_t err); static void mqtt_socket_connected(void *arg); +static void mqtt_connack_fail(lmqtt_userdata * mud, int reason_code); static void mqtt_socket_disconnected(void *arg) // tcp only { @@ -198,6 +199,21 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length) NODE_DBG("leave deliver_publish.\n"); } +static void mqtt_connack_fail(lmqtt_userdata * mud, int reason_code) +{ + if(mud->cb_connect_fail_ref == LUA_NOREF) + return; + if(mud->self_ref == LUA_NOREF) + return; + if(mud->L == NULL) + return; + + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua + lua_pushinteger(mud->L, reason_code); + lua_call(mud->L, 2, 0); +} + static void mqtt_socket_received(void *arg, char *pdata, unsigned short len) { NODE_DBG("enter mqtt_socket_received.\n"); @@ -245,17 +261,7 @@ READPACKET: espconn_disconnect(pesp_conn); } - if(mud->cb_connect_fail_ref == LUA_NOREF) - break; - if(mud->self_ref == LUA_NOREF) - break; - if(mud->L == NULL) - break; - - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua - lua_pushinteger(mud->L, MQTT_CONN_FAIL_NOT_A_CONNACK_MSG); - lua_call(mud->L, 2, 0); + mqtt_connack_fail(mud, MQTT_CONN_FAIL_NOT_A_CONNACK_MSG); break; @@ -275,17 +281,7 @@ READPACKET: espconn_disconnect(pesp_conn); } - if(mud->cb_connect_fail_ref == LUA_NOREF) - break; - if(mud->self_ref == LUA_NOREF) - break; - if(mud->L == NULL) - break; - - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua - lua_pushinteger(mud->L, mqtt_get_connect_ret_code(in_buffer)); // CONNACK response code contains the reason why failure occurred - lua_call(mud->L, 2, 0); + mqtt_connack_fail(mud, mqtt_get_connect_ret_code(in_buffer)); break; @@ -591,26 +587,12 @@ void mqtt_socket_timer(void *arg) if(mud->connState == MQTT_INIT){ // socket connect time out. NODE_DBG("Can not connect to broker.\n"); - os_timer_disarm(&mud->mqttTimer); - - if(mud->cb_connect_fail_ref != LUA_NOREF && mud->self_ref != LUA_NOREF && mud->L != NULL) { - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua - lua_pushinteger(mud->L, MQTT_CONN_FAIL_SERVER_NOT_FOUND); - lua_call(mud->L, 2, 0); - } - + mqtt_connack_fail(mud, MQTT_CONN_FAIL_SERVER_NOT_FOUND); } else if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT send time out. NODE_DBG("sSend MQTT_CONNECT failed.\n"); mud->connState = MQTT_INIT; - - if(mud->cb_connect_fail_ref != LUA_NOREF && mud->self_ref != LUA_NOREF && mud->L != NULL) { - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua - lua_pushinteger(mud->L, MQTT_CONN_FAIL_TIMEOUT_SENDING); - lua_call(mud->L, 2, 0); - } + mqtt_connack_fail(mud, MQTT_CONN_FAIL_TIMEOUT_SENDING); #ifdef CLIENT_SSL_ENABLE if(mud->secure) @@ -637,14 +619,7 @@ void mqtt_socket_timer(void *arg) { espconn_disconnect(mud->pesp_conn); } - - if(mud->cb_connect_fail_ref != LUA_NOREF && mud->self_ref != LUA_NOREF && mud->L != NULL) { - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua - lua_pushinteger(mud->L, MQTT_CONN_FAIL_TIMEOUT_RECEIVING); - lua_call(mud->L, 2, 0); - } - + mqtt_connack_fail(mud, MQTT_CONN_FAIL_TIMEOUT_RECEIVING); } else if(mud->connState == MQTT_DATA){ msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q)); if(pending_msg){ @@ -982,12 +957,7 @@ static sint8 socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) if(pesp_conn != NULL) { lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse; if(mud != NULL) { - if(mud->cb_connect_fail_ref != LUA_NOREF && mud->self_ref != LUA_NOREF && mud->L != NULL) { - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua - lua_pushinteger(mud->L, MQTT_CONN_FAIL_DNS); - lua_call(mud->L, 2, 0); - } + mqtt_connack_fail(mud, MQTT_CONN_FAIL_DNS); } } diff --git a/docs/en/modules/mqtt.md b/docs/en/modules/mqtt.md index 20cc1a6a..07b6d694 100644 --- a/docs/en/modules/mqtt.md +++ b/docs/en/modules/mqtt.md @@ -41,7 +41,7 @@ m:on("message", function(client, topic, data) end) -- for TLS: m:connect("192.168.11.118", secure-port, 1) -m:connect("192.168.11.118", 1880, 0, function(client) print("connected") end, +m:connect("192.168.11.118", 1883, 0, function(client) print("connected") end, function(client, reason) print("failed reason: "..reason) end) -- Calling subscribe/publish only makes sense once the connection