diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index 9b120155..84cca146 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -58,6 +58,7 @@ typedef struct lmqtt_userdata struct espconn *pesp_conn; int self_ref; int cb_connect_ref; + int cb_connect_fail_ref; int cb_disconnect_ref; int cb_message_ref; int cb_suback_ref; @@ -77,6 +78,7 @@ typedef struct lmqtt_userdata static sint8 socket_connect(struct espconn *pesp_conn); static void mqtt_socket_reconnected(void *arg, sint8_t err); static void mqtt_socket_connected(void *arg); +static void mqtt_connack_fail(lmqtt_userdata * mud, int reason_code); static void mqtt_socket_disconnected(void *arg) // tcp only { @@ -193,6 +195,22 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length) NODE_DBG("leave deliver_publish.\n"); } + +static void mqtt_connack_fail(lmqtt_userdata * mud, int reason_code) +{ + if(mud->cb_connect_fail_ref == LUA_NOREF || mud->self_ref == LUA_NOREF) + { + return; + } + + lua_State *L = lua_getstate(); + + lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua + lua_pushinteger(L, reason_code); + lua_call(L, 2, 0); +} + static sint8 mqtt_send_if_possible(struct espconn *pesp_conn) { if(pesp_conn == NULL) @@ -253,10 +271,13 @@ READPACKET: uint8_t temp_buffer[MQTT_BUF_SIZE]; mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); mqtt_message_t *temp_msg = NULL; + lua_State *L = lua_getstate(); switch(mud->connState){ case MQTT_CONNECT_SENDING: case MQTT_CONNECT_SENT: + mud->event_timeout = 0; + if(mqtt_get_type(in_buffer) != MQTT_MSG_TYPE_CONNACK){ NODE_DBG("MQTT: Invalid packet\r\n"); mud->connState = MQTT_INIT; @@ -270,6 +291,31 @@ READPACKET: { espconn_disconnect(pesp_conn); } + + mqtt_connack_fail(mud, MQTT_CONN_FAIL_NOT_A_CONNACK_MSG); + + break; + + } else if (mqtt_get_connect_ret_code(in_buffer) != MQTT_CONNACK_ACCEPTED) { + NODE_DBG("MQTT: CONNACK REFUSED (CODE: %d)\n", mqtt_get_connect_ret_code(in_buffer)); + + mud->connState = MQTT_INIT; + +#ifdef CLIENT_SSL_ENABLE + if(mud->secure) + { + espconn_secure_disconnect(pesp_conn); + } + else +#endif + { + espconn_disconnect(pesp_conn); + } + + mqtt_connack_fail(mud, mqtt_get_connect_ret_code(in_buffer)); + + break; + } else { mud->connState = MQTT_DATA; NODE_DBG("MQTT: Connected\r\n"); @@ -352,7 +398,7 @@ READPACKET: case MQTT_MSG_TYPE_PUBREC: if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){ NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n"); - // Note: actrually, should not destroy the msg until PUBCOMP is received. + // Note: actually, should not destroy the msg until PUBCOMP is received. msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); temp_msg = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id); msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, @@ -402,13 +448,13 @@ READPACKET: length = mud->mqtt_state.message_length_read; if(mud->mqtt_state.message_length < mud->mqtt_state.message_length_read) - { - length -= mud->mqtt_state.message_length; - in_buffer += mud->mqtt_state.message_length; + { + length -= mud->mqtt_state.message_length; + in_buffer += mud->mqtt_state.message_length; - NODE_DBG("Get another published message\r\n"); - goto READPACKET; - } + NODE_DBG("Get another published message\r\n"); + goto READPACKET; + } } break; } @@ -435,6 +481,7 @@ static void mqtt_socket_sent(void *arg) if(mud->connState == MQTT_CONNECT_SENDING){ mud->connState = MQTT_CONNECT_SENT; + mud->event_timeout = MQTT_SEND_TIMEOUT; // MQTT_CONNECT not queued. return; } @@ -534,10 +581,13 @@ void mqtt_socket_timer(void *arg) if(mud->connState == MQTT_INIT){ // socket connect time out. NODE_DBG("Can not connect to broker.\n"); - // Never goes here. + os_timer_disarm(&mud->mqttTimer); + mqtt_connack_fail(mud, MQTT_CONN_FAIL_SERVER_NOT_FOUND); } else if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT send time out. NODE_DBG("sSend MQTT_CONNECT failed.\n"); mud->connState = MQTT_INIT; + mqtt_connack_fail(mud, MQTT_CONN_FAIL_TIMEOUT_SENDING); + #ifdef CLIENT_SSL_ENABLE if(mud->secure) { @@ -549,8 +599,21 @@ void mqtt_socket_timer(void *arg) espconn_disconnect(mud->pesp_conn); } mud->keep_alive_tick = 0; // not need count anymore - } else if(mud->connState == MQTT_CONNECT_SENT){ // wait for CONACK time out. - NODE_DBG("MQTT_CONNECT failed.\n"); + } else if(mud->connState == MQTT_CONNECT_SENT) { // wait for CONACK time out. + NODE_DBG("MQTT_CONNECT timeout.\n"); + mud->connState == MQTT_INIT; + +#ifdef CLIENT_SSL_ENABLE + if(mud->secure) + { + espconn_secure_disconnect(mud->pesp_conn); + } + else +#endif + { + espconn_disconnect(mud->pesp_conn); + } + mqtt_connack_fail(mud, MQTT_CONN_FAIL_TIMEOUT_RECEIVING); } else if(mud->connState == MQTT_DATA){ msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q)); if(pending_msg){ @@ -597,6 +660,7 @@ static int mqtt_socket_client( lua_State* L ) // pre-initialize it, in case of errors mud->self_ref = LUA_NOREF; mud->cb_connect_ref = LUA_NOREF; + mud->cb_connect_fail_ref = LUA_NOREF; mud->cb_disconnect_ref = LUA_NOREF; mud->cb_message_ref = LUA_NOREF; @@ -641,7 +705,7 @@ static int mqtt_socket_client( lua_State* L ) } if(username == NULL) unl = 0; - NODE_DBG("lengh username: %d\r\n", unl); + NODE_DBG("length username: %d\r\n", unl); if(lua_isstring( L, stack )){ password = luaL_checklstring( L, stack, &pwl ); @@ -649,7 +713,7 @@ static int mqtt_socket_client( lua_State* L ) } if(password == NULL) pwl = 0; - NODE_DBG("lengh password: %d\r\n", pwl); + NODE_DBG("length password: %d\r\n", pwl); if(lua_isnumber( L, stack )) { @@ -763,6 +827,10 @@ static int mqtt_delete( lua_State* L ) luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref); mud->cb_connect_ref = LUA_NOREF; } + if(LUA_NOREF!=mud->cb_connect_fail_ref){ + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + mud->cb_connect_fail_ref = LUA_NOREF; + } if(LUA_NOREF!=mud->cb_disconnect_ref){ luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); mud->cb_disconnect_ref = LUA_NOREF; @@ -791,7 +859,9 @@ static int mqtt_delete( lua_State* L ) static sint8 socket_connect(struct espconn *pesp_conn) { + NODE_DBG("enter socket_connect.\n"); + sint8 espconn_status; if(pesp_conn == NULL) @@ -847,6 +917,15 @@ static sint8 socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) if( dns_reconn_count >= 5 ){ NODE_ERR( "DNS Fail!\n" ); // Note: should delete the pesp_conn or unref self_ref here. + + struct espconn *pesp_conn = arg; + if(pesp_conn != NULL) { + lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse; + if(mud != NULL) { + mqtt_connack_fail(mud, MQTT_CONN_FAIL_DNS); + } + } + mqtt_socket_disconnected(arg); // although not connected, but fire disconnect callback to release every thing. return -1; } @@ -870,7 +949,7 @@ static sint8 socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) return espconn_status; } -// Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client) ) +// Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client), function(client, connect_return_code) ) static int mqtt_socket_connect( lua_State* L ) { NODE_DBG("enter mqtt_socket_connect.\n"); @@ -986,6 +1065,15 @@ static int mqtt_socket_connect( lua_State* L ) mud->cb_connect_ref = luaL_ref(L, LUA_REGISTRYINDEX); stack++; } + + // call back function when a connection fails + if ((stack<=top) && (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION)){ + lua_pushvalue(L, stack); // copy argument (func) to the top of stack + if(mud->cb_connect_fail_ref != LUA_NOREF) + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + mud->cb_connect_fail_ref = luaL_ref(L, LUA_REGISTRYINDEX); + stack++; + } lua_pushvalue(L, 1); // copy userdata to the top of stack if(mud->self_ref != LUA_NOREF) @@ -1161,7 +1249,7 @@ static int mqtt_socket_subscribe( lua_State* L ) { qos = luaL_checkinteger( L, -1 ); if (topic_count == 0) { - temp_msg = mqtt_msg_subscribe_init( &mud->mqtt_state.mqtt_connection, &msg_id ); + temp_msg = mqtt_msg_subscribe_init( &mud->mqtt_state.mqtt_connection, &msg_id ); } temp_msg = mqtt_msg_subscribe_topic( &mud->mqtt_state.mqtt_connection, topic, qos ); topic_count++; @@ -1213,6 +1301,7 @@ static int mqtt_socket_subscribe( lua_State* L ) { msg_id, MQTT_MSG_TYPE_SUBSCRIBE, (int)mqtt_get_qos(temp_msg->data) ); NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length); + NODE_DBG("msg_size: %d, event_timeout: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)), mud->event_timeout); sint8 espconn_status = ESPCONN_IF; @@ -1393,9 +1482,23 @@ static const LUA_REG_TYPE mqtt_socket_map[] = { { LNILKEY, LNILVAL } }; + static const LUA_REG_TYPE mqtt_map[] = { - { LSTRKEY( "Client" ), LFUNCVAL( mqtt_socket_client ) }, - { LSTRKEY( "__metatable" ), LROVAL( mqtt_map ) }, + { LSTRKEY( "Client" ), LFUNCVAL( mqtt_socket_client ) }, + + { LSTRKEY( "CONN_FAIL_SERVER_NOT_FOUND" ), LNUMVAL( MQTT_CONN_FAIL_SERVER_NOT_FOUND ) }, + { LSTRKEY( "CONN_FAIL_NOT_A_CONNACK_MSG" ), LNUMVAL( MQTT_CONN_FAIL_NOT_A_CONNACK_MSG ) }, + { LSTRKEY( "CONN_FAIL_DNS" ), LNUMVAL( MQTT_CONN_FAIL_DNS ) }, + { LSTRKEY( "CONN_FAIL_TIMEOUT_RECEIVING" ), LNUMVAL( MQTT_CONN_FAIL_TIMEOUT_RECEIVING ) }, + { LSTRKEY( "CONN_FAIL_TIMEOUT_SENDING" ), LNUMVAL( MQTT_CONN_FAIL_TIMEOUT_SENDING ) }, + { LSTRKEY( "CONNACK_ACCEPTED" ), LNUMVAL( MQTT_CONNACK_ACCEPTED ) }, + { LSTRKEY( "CONNACK_REFUSED_PROTOCOL_VER" ), LNUMVAL( MQTT_CONNACK_REFUSED_PROTOCOL_VER ) }, + { LSTRKEY( "CONNACK_REFUSED_ID_REJECTED" ), LNUMVAL( MQTT_CONNACK_REFUSED_ID_REJECTED ) }, + { LSTRKEY( "CONNACK_REFUSED_SERVER_UNAVAILABLE" ), LNUMVAL( MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE ) }, + { LSTRKEY( "CONNACK_REFUSED_BAD_USER_OR_PASS" ), LNUMVAL( MQTT_CONNACK_REFUSED_BAD_USER_OR_PASS ) }, + { LSTRKEY( "CONNACK_REFUSED_NOT_AUTHORIZED" ), LNUMVAL( MQTT_CONNACK_REFUSED_NOT_AUTHORIZED ) }, + + { LSTRKEY( "__metatable" ), LROVAL( mqtt_map ) }, { LNILKEY, LNILVAL } }; diff --git a/app/mqtt/mqtt_msg.h b/app/mqtt/mqtt_msg.h index e1f372b9..5732831d 100644 --- a/app/mqtt/mqtt_msg.h +++ b/app/mqtt/mqtt_msg.h @@ -65,6 +65,21 @@ enum mqtt_message_type MQTT_MSG_TYPE_DISCONNECT = 14 }; +enum mqtt_connack_return_code +{ + MQTT_CONN_FAIL_SERVER_NOT_FOUND = -5, + MQTT_CONN_FAIL_NOT_A_CONNACK_MSG = -4, + MQTT_CONN_FAIL_DNS = -3, + MQTT_CONN_FAIL_TIMEOUT_RECEIVING = -2, + MQTT_CONN_FAIL_TIMEOUT_SENDING = -1, + MQTT_CONNACK_ACCEPTED = 0, + MQTT_CONNACK_REFUSED_PROTOCOL_VER = 1, + MQTT_CONNACK_REFUSED_ID_REJECTED = 2, + MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE = 3, + MQTT_CONNACK_REFUSED_BAD_USER_OR_PASS = 4, + MQTT_CONNACK_REFUSED_NOT_AUTHORIZED = 5 +}; + typedef struct mqtt_message { uint8_t* data; @@ -101,6 +116,7 @@ static inline int mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; } static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; } static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } +static inline int mqtt_get_connect_ret_code(uint8_t* buffer) { return (buffer[3]); } void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); int mqtt_get_total_length(uint8_t* buffer, uint16_t length); diff --git a/docs/en/modules/mqtt.md b/docs/en/modules/mqtt.md index 844bd0de..7a339d20 100644 --- a/docs/en/modules/mqtt.md +++ b/docs/en/modules/mqtt.md @@ -45,7 +45,8 @@ m:on("message", function(client, topic, data) end) -- for TLS: m:connect("192.168.11.118", secure-port, 1) -m:connect("192.168.11.118", 1880, 0, function(client) print("connected") end) +m:connect("192.168.11.118", 1883, 0, function(client) print("connected") end, + function(client, reason) print("failed reason: "..reason) end) -- Calling subscribe/publish only makes sense once the connection -- was successfully established. In a real-world application you want @@ -82,18 +83,35 @@ none Connects to the broker specified by the given host, port, and secure options. #### Syntax -`mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)])` +`mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)[, function(client, reason)]])` #### Parameters - `host` host, domain or IP (string) - `port` broker port (number), default 1883 - `secure` 0/1 for `false`/`true`, default 0. [As per #996](https://github.com/nodemcu/nodemcu-firmware/issues/996#issuecomment-178053308) secure connections use **TLS 1.1** with the following cipher suites: `TLS_RSA_WITH_AES_128_CBC_SHA`, `TLS_RSA_WITH_AES_256_CBC_SHA`, `TLS_RSA_WITH_RC4_128_SHA`, and `TLS_RSA_WITH_RC4_128_MD5`. - `autoreconnect` 0/1 for `false`/`true`, default 0 -- `function(client)` call back function for when the connection was established +- `function(client)` callback function for when the connection was established +- `function(client, reason)` callback function for when the connection could not be established #### Returns `true` on success, `false` otherwise +#### Connection failure callback reason codes: + +| Constant | Value | Description | +|----------|-------|-------------| +|`mqtt.CONN_FAIL_SERVER_NOT_FOUND`|-5|There is no broker listening at the specified IP Address and Port| +|`mqtt.CONN_FAIL_NOT_A_CONNACK_MSG`|-4|The response from the broker was not a CONNACK as required by the protocol| +|`mqtt.CONN_FAIL_DNS`|-3|DNS Lookup failed| +|`mqtt.CONN_FAIL_TIMEOUT_RECEIVING`|-2|Timeout waiting for a CONNACK from the broker| +|`mqtt.CONN_FAIL_TIMEOUT_SENDING`|-1|Timeout trying to send the Connect message| +|`mqtt.CONNACK_ACCEPTED`|0|No errors. _Note: This will not trigger a failure callback._| +|`mqtt.CONNACK_REFUSED_PROTOCOL_VER`|1|The broker is not a 3.1.1 MQTT broker.| +|`mqtt.CONNACK_REFUSED_ID_REJECTED`|2|The specified ClientID was rejected by the broker. (See `mqtt.Client()`)| +|`mqtt.CONNACK_REFUSED_SERVER_UNAVAILABLE`|3|The server is unavailable.| +|`mqtt.CONNACK_REFUSED_BAD_USER_OR_PASS`|4|The broker refused the specified username or password.| +|`mqtt.CONNACK_REFUSED_NOT_AUTHORIZED`|5|The username is not authorized.| + ## mqtt.client:lwt() Setup [Last Will and Testament](http://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament) (optional). A broker will publish a message with qos = 0, retain = 0, data = "offline" to topic "/lwt" if client does not send keepalive packet.