From c695a451ee779509379f9e4fb6dc6f1c2c8214ee Mon Sep 17 00:00:00 2001 From: Nathaniel Wesley Filardo Date: Tue, 5 Jan 2021 11:07:09 +0000 Subject: [PATCH] First round of MQTT fixes (#3360) * mqtt: remove concept of connection timeout Just rely on the network stack to tell us when things have gone south. * mqtt: remove write-only mqtt_state.port field * mqtt: drop useless conditional * mqtt: decouple message sent flag from timer * mqtt: reconnect callback does not need to hang up The network stack has certainly done that for us at this point. Similarly, since we're about to call mqtt_socket_disconnected, don't bother unregistering the timer here, either. * mqtt: don't tick once per second Set the timer for the duration of the wait and cancel it on the other side. * mqtt: defer message queue destruction to _disconnect We're going to want to publish a disconnect message for real, so doing this in _close does no one any favors * mqtt: miscellaneous cleanups No functional change intended * mqtt: close() should send disconnect message for real This means waiting for _sent() to fire again before telling the network stack to disconnect. * mqtt: tidy connect and dns - Push the self-ref to after all allocations and error returns - Don't try to extract IPv4 from the domain string ourselves, let the resolver, since it can - Don't try to connect to localhost. That can't possibly work. * mqtt: common up some callback invocations * mqtt: don't retransmit messages on timeout There's no point in retransmitting messages on timeout; the network stack will be trying to do it for us anyway. * mqtt: remove unnecessary NULL udata checks * mqtt: hold strings in Lua, not C Eliminates a host of C-side allocations. While here, move the rest of the mqtt_connect_info structure out to its own thing, and pack some flags using a bitfield. * mqtt: mqtt_socket_on use lua_checkoption * mqtt: slightly augment debug messages These changes have made some debugging ever so slightly easier. --- app/modules/mqtt.c | 831 +++++++++++++++++-------------------------- app/mqtt/mqtt_msg.h | 11 +- app/mqtt/msg_queue.c | 2 + app/mqtt/msg_queue.h | 2 + docs/modules/mqtt.md | 16 +- 5 files changed, 340 insertions(+), 522 deletions(-) diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index a8d07268..32119573 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -22,21 +22,12 @@ #define MQTT_MAX_CLIENT_LEN 64 #define MQTT_MAX_USER_LEN 64 #define MQTT_MAX_PASS_LEN 64 -#define MQTT_SEND_TIMEOUT 5 - - /* - * This timeout needs to be long enough for a typical TCP connect() - * *and* the TLS handshake, if any. Most network stacks seem to wait - * tens of seconds for connect(), and TLS can take a good deal of time - * and several round trips. Because this matters only rarely, it may - * as well be set pretty high. - */ -#define MQTT_CONNECT_TIMEOUT 60 +#define MQTT_SEND_TIMEOUT 5 /* seconds */ typedef enum { MQTT_INIT, - MQTT_CONNECT_SENT, MQTT_CONNECT_SENDING, + MQTT_CONNECT_SENT, MQTT_DATA } tConnState; @@ -59,7 +50,6 @@ typedef enum { typedef struct mqtt_state_t { - uint16_t port; msg_queue_t* pending_msg_q; uint16_t next_message_id; @@ -86,15 +76,28 @@ typedef struct lmqtt_userdata int cb_suback_ref; int cb_unsuback_ref; int cb_puback_ref; + + /* Configuration options */ + struct { + int client_id_ref; + int username_ref; + int password_ref; + int will_topic_ref; + int will_message_ref; + int keepalive; + uint16_t max_message_length; + struct { + unsigned will_qos : 2; + bool will_retain : 1; + bool clean_session : 1; + bool secure : 1; + } flags; + } conf; + mqtt_state_t mqtt_state; - mqtt_connect_info_t connect_info; - uint16_t keep_alive_tick; - uint32_t event_timeout; -#ifdef CLIENT_SSL_ENABLE - uint8_t secure; -#endif bool connected; // indicate socket connected, not mqtt prot connected. bool keepalive_sent; + bool sending; // data sent to network stack, awaiting local acknowledge ETSTimer mqttTimer; tConnState connState; }lmqtt_userdata; @@ -116,25 +119,28 @@ static uint16_t mqtt_next_message_id(lmqtt_userdata * mud) return mud->mqtt_state.next_message_id; } +static void mqtt_socket_cb_lua_noarg(lua_State *L, lmqtt_userdata *mud, int cb) +{ + if (cb == LUA_NOREF) + return; + + lua_rawgeti(L, LUA_REGISTRYINDEX, cb); + lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); + lua_call(L, 1, 0); +} + + static void mqtt_socket_disconnected(void *arg) // tcp only { NODE_DBG("enter mqtt_socket_disconnected.\n"); - bool call_back = false; lmqtt_userdata *mud = arg; if(mud == NULL) return; os_timer_disarm(&mud->mqttTimer); - lua_State *L = lua_getstate(); - - if(mud->connected){ // call back only called when socket is from connection to disconnection. - mud->connected = false; - if((mud->cb_disconnect_ref != LUA_NOREF) && (mud->self_ref != LUA_NOREF)) { - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua - call_back = true; - } + while (mud->mqtt_state.pending_msg_q) { + msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); } if(mud->mqtt_state.recv_buffer) { @@ -148,21 +154,30 @@ static void mqtt_socket_disconnected(void *arg) // tcp only free(mud->pesp_conn.proto.tcp); mud->pesp_conn.proto.tcp = NULL; - mud->connected = false; - luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref); - mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self + int selfref = mud->self_ref; + mud->self_ref = LUA_NOREF; - if(call_back){ - lua_call(L, 1, 0); + lua_State *L = lua_getstate(); + + if(mud->connected){ // call back only called when socket is from connection to disconnection. + mud->connected = false; + if((mud->cb_disconnect_ref != LUA_NOREF) && (selfref != LUA_NOREF)) { + lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); + lua_rawgeti(L, LUA_REGISTRYINDEX, selfref); + lua_call(L, 1, 0); + } } + // unref this, and the mqtt.socket userdata will delete it self + luaL_unref(L, LUA_REGISTRYINDEX, selfref); + NODE_DBG("leave mqtt_socket_disconnected.\n"); } static void mqtt_socket_do_disconnect(struct lmqtt_userdata *mud) { #ifdef CLIENT_SSL_ENABLE - if (mud->secure) { + if (mud->conf.flags.secure) { espconn_secure_disconnect(&mud->pesp_conn); } else #endif @@ -173,17 +188,12 @@ static void mqtt_socket_do_disconnect(struct lmqtt_userdata *mud) static void mqtt_socket_reconnected(void *arg, sint8_t err) { - NODE_DBG("enter mqtt_socket_reconnected.\n"); + NODE_DBG("enter mqtt_socket_reconnected (err=%d)\n", err); lmqtt_userdata *mud = arg; if(mud == NULL) return; os_timer_disarm(&mud->mqttTimer); - - mud->event_timeout = 0; // no need to count anymore - - mqtt_socket_do_disconnect(mud); - mqtt_connack_fail(mud, MQTT_CONN_FAIL_SERVER_NOT_FOUND); mqtt_socket_disconnected(arg); @@ -212,7 +222,7 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, uint16_t len lua_State *L = lua_getstate(); if(event_data.topic && (event_data.topic_length > 0)){ lua_rawgeti(L, LUA_REGISTRYINDEX, cb_ref); - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua + lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); lua_pushlstring(L, event_data.topic, event_data.topic_length); } else { NODE_DBG("get wrong packet.\n"); @@ -230,6 +240,8 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, uint16_t len static void mqtt_connack_fail(lmqtt_userdata * mud, int reason_code) { + NODE_DBG("enter mqtt_connack_fail\n"); + if(mud->cb_connect_fail_ref == LUA_NOREF || mud->self_ref == LUA_NOREF) { return; @@ -238,35 +250,41 @@ static void mqtt_connack_fail(lmqtt_userdata * mud, int reason_code) lua_State *L = lua_getstate(); lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua + lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); lua_pushinteger(L, reason_code); lua_call(L, 2, 0); + + NODE_DBG("leave mqtt_connack_fail\n"); } static sint8 mqtt_send_if_possible(struct lmqtt_userdata *mud) { + /* Waiting for the local network stack to get back to us? Can't send. */ + if (mud->sending) + return ESPCONN_OK; + sint8 espconn_status = ESPCONN_OK; - // This indicates if we have sent something and are waiting for something to - // happen - if (mud->event_timeout == 0) { - msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q)); - if (pending_msg) { - mud->event_timeout = MQTT_SEND_TIMEOUT; - NODE_DBG("Sent: %d\n", pending_msg->msg.length); + msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q)); + if (pending_msg && !pending_msg->sent) { + pending_msg->sent = 1; + NODE_DBG("Sent: %d\n", pending_msg->msg.length); #ifdef CLIENT_SSL_ENABLE - if( mud->secure ) - { - espconn_status = espconn_secure_send(&mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length ); - } - else -#endif - { - espconn_status = espconn_send(&mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length ); - } - mud->keep_alive_tick = 0; + if( mud->conf.flags.secure ) + { + espconn_status = espconn_secure_send(&mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length ); } + else +#endif + { + espconn_status = espconn_send(&mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length ); + } + mud->sending = true; + + os_timer_disarm(&mud->mqttTimer); + os_timer_arm(&mud->mqttTimer, MQTT_SEND_TIMEOUT * 1000, 0); } + NODE_DBG("send_if_poss, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); return espconn_status; } @@ -388,11 +406,11 @@ READPACKET: mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE); mqtt_message_t *temp_msg = NULL; - lua_State *L = lua_getstate(); switch(mud->connState){ case MQTT_CONNECT_SENDING: case MQTT_CONNECT_SENT: - mud->event_timeout = 0; + os_timer_disarm(&mud->mqttTimer); + os_timer_arm(&mud->mqttTimer, mud->conf.keepalive * 1000, 0); if(mqtt_get_type(in_buffer) != MQTT_MSG_TYPE_CONNACK){ NODE_DBG("MQTT: Invalid packet\r\n"); @@ -414,13 +432,8 @@ READPACKET: mud->connState = MQTT_DATA; NODE_DBG("MQTT: Connected\r\n"); mud->keepalive_sent = 0; - if(mud->cb_connect_ref == LUA_NOREF) - break; - if(mud->self_ref == LUA_NOREF) - break; - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_connect_ref); - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua - lua_call(L, 1, 0); + + mqtt_socket_cb_lua_noarg(lua_getstate(), mud, mud->cb_connect_ref); break; } break; @@ -435,7 +448,7 @@ READPACKET: message_length, in_buffer_length); - if (message_length > mud->connect_info.max_message_length) { + if (message_length > mud->conf.max_message_length) { // The pending message length is larger than we was configured to allow if(msg_qos > 0 && msg_id == 0) { NODE_DBG("MQTT: msg too long, but not enough data to get msg_id: total=%u, deliver=%u\r\n", message_length, in_buffer_length); @@ -526,13 +539,8 @@ READPACKET: if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_SUBSCRIBE && pending_msg->msg_id == msg_id){ NODE_DBG("MQTT: Subscribe successful\r\n"); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); - if (mud->cb_suback_ref == LUA_NOREF) - break; - if (mud->self_ref == LUA_NOREF) - break; - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_suback_ref); - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); - lua_call(L, 1, 0); + + mqtt_socket_cb_lua_noarg(lua_getstate(), mud, mud->cb_suback_ref); } break; case MQTT_MSG_TYPE_UNSUBACK: @@ -540,13 +548,7 @@ READPACKET: NODE_DBG("MQTT: UnSubscribe successful\r\n"); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); - if (mud->cb_unsuback_ref == LUA_NOREF) - break; - if (mud->self_ref == LUA_NOREF) - break; - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_unsuback_ref); - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); - lua_call(L, 1, 0); + mqtt_socket_cb_lua_noarg(lua_getstate(), mud, mud->cb_unsuback_ref); } break; case MQTT_MSG_TYPE_PUBLISH: @@ -569,13 +571,8 @@ READPACKET: if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){ NODE_DBG("MQTT: Publish with QoS = 1 successful\r\n"); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); - if(mud->cb_puback_ref == LUA_NOREF) - break; - if(mud->self_ref == LUA_NOREF) - break; - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_puback_ref); - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua - lua_call(L, 1, 0); + + mqtt_socket_cb_lua_noarg(lua_getstate(), mud, mud->cb_puback_ref); } break; @@ -603,13 +600,8 @@ READPACKET: if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBREL && pending_msg->msg_id == msg_id){ NODE_DBG("MQTT: Publish with QoS = 2 successful\r\n"); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); - if(mud->cb_puback_ref == LUA_NOREF) - break; - if(mud->self_ref == LUA_NOREF) - break; - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_puback_ref); - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua - lua_call(L, 1, 0); + + mqtt_socket_cb_lua_noarg(lua_getstate(), mud, mud->cb_puback_ref); } break; case MQTT_MSG_TYPE_PINGREQ: @@ -669,40 +661,51 @@ static void mqtt_socket_sent(void *arg) return; if(!mud->connected) return; - // call mqtt_sent() - mud->event_timeout = 0; - mud->keep_alive_tick = 0; + + mud->sending = false; + + os_timer_disarm(&mud->mqttTimer); if(mud->connState == MQTT_CONNECT_SENDING){ mud->connState = MQTT_CONNECT_SENT; - mud->event_timeout = MQTT_SEND_TIMEOUT; + os_timer_arm(&mud->mqttTimer, MQTT_SEND_TIMEOUT * 1000, 0); // MQTT_CONNECT not queued. return; } + + /* Ready for timeout */ + os_timer_arm(&mud->mqttTimer, mud->conf.keepalive * 1000, 0); + NODE_DBG("sent1, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); - uint8_t try_send = 1; - // qos = 0, publish and forgot. + msg_queue_t *node = msg_peek(&(mud->mqtt_state.pending_msg_q)); - if(node && node->msg_type == MQTT_MSG_TYPE_PUBLISH && node->publish_qos == 0) { - msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); - if(mud->cb_puback_ref != LUA_NOREF && mud->self_ref != LUA_NOREF) { - lua_State *L = lua_getstate(); - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_puback_ref); - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua - lua_call(L, 1, 0); + if (node) { + switch (node->msg_type) { + case MQTT_MSG_TYPE_PUBLISH: + // qos = 0, publish and forget. Run the callback now because we + // won't get a puback from the server and it's not clear when else + // we should tell the user the message drained from the egress queue + if (node->publish_qos == 0) { + msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); + mqtt_socket_cb_lua_noarg(lua_getstate(), mud, mud->cb_puback_ref); + mqtt_send_if_possible(mud); + } + break; + case MQTT_MSG_TYPE_PUBACK: + /* FALLTHROUGH */ + case MQTT_MSG_TYPE_PUBCOMP: + /* FALLTHROUGH */ + case MQTT_MSG_TYPE_PINGREQ: + msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); + mqtt_send_if_possible(mud); + break; + case MQTT_MSG_TYPE_DISCONNECT: + msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); + mqtt_socket_do_disconnect(mud); + break; } - } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK) { - msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); - } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) { - msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); - } else if(node && node->msg_type == MQTT_MSG_TYPE_PINGREQ) { - msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); - } else { - try_send = 0; - } - if (try_send) { - mqtt_send_if_possible(mud); } + NODE_DBG("sent2, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("leave mqtt_socket_sent.\n"); } @@ -723,13 +726,34 @@ static void mqtt_socket_connected(void *arg) mqtt_message_buffer_t msgb; mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE); - mqtt_message_t* temp_msg = mqtt_msg_connect(&msgb, &mud->connect_info); + /* Build the mqtt_connect_info on the stack and assemble the message */ + lua_State *L = lua_getstate(); + int ltop = lua_gettop(L); + struct mqtt_connect_info mci; + mci.clean_session = mud->conf.flags.clean_session; + mci.will_retain = mud->conf.flags.will_retain; + mci.will_qos = mud->conf.flags.will_qos; + mci.keepalive = mud->conf.keepalive; + + lua_rawgeti(L, LUA_REGISTRYINDEX, mud->conf.client_id_ref); + mci.client_id = lua_tostring(L, -1); + lua_rawgeti(L, LUA_REGISTRYINDEX, mud->conf.username_ref); + mci.username = lua_tostring(L, -1); + lua_rawgeti(L, LUA_REGISTRYINDEX, mud->conf.password_ref); + mci.password = lua_tostring(L, -1); + lua_rawgeti(L, LUA_REGISTRYINDEX, mud->conf.will_topic_ref); + mci.will_topic = lua_tostring(L, -1); + lua_rawgeti(L, LUA_REGISTRYINDEX, mud->conf.will_message_ref); + mci.will_message = lua_tostring(L, -1); + + mqtt_message_t* temp_msg = mqtt_msg_connect(&msgb, &mci); NODE_DBG("Send MQTT connection infomation, data len: %d, d[0]=%d \r\n", temp_msg->length, temp_msg->data[0]); - mud->event_timeout = MQTT_SEND_TIMEOUT; + lua_settop(L, ltop); /* Done with strings, restore the stack */ + // not queue this message. should send right now. or should enqueue this before head. #ifdef CLIENT_SSL_ENABLE - if(mud->secure) + if(mud->conf.flags.secure) { espconn_secure_send(pesp_conn, temp_msg->data, temp_msg->length); } @@ -738,10 +762,14 @@ static void mqtt_socket_connected(void *arg) { espconn_send(pesp_conn, temp_msg->data, temp_msg->length); } - mud->keep_alive_tick = 0; + mud->sending = true; + + os_timer_disarm(&mud->mqttTimer); + os_timer_arm(&mud->mqttTimer, MQTT_SEND_TIMEOUT * 1000, 0); mud->connState = MQTT_CONNECT_SENDING; - NODE_DBG("leave mqtt_socket_connectet, heap = %u.\n", system_get_free_heap_size()); + + NODE_DBG("leave mqtt_socket_connected, heap = %u.\n", system_get_free_heap_size()); return; } @@ -755,70 +783,41 @@ void mqtt_socket_timer(void *arg) if(mud->pesp_conn.proto.tcp == NULL){ NODE_DBG("MQTT not connected\n"); - os_timer_disarm(&mud->mqttTimer); return; } NODE_DBG("timer, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); - if(mud->event_timeout > 0){ - NODE_DBG("event_timeout: %d.\n", mud->event_timeout); - mud->event_timeout --; - if(mud->event_timeout > 0){ - return; - } else { - NODE_DBG("event timeout. \n"); - if(mud->connState == MQTT_DATA) - msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); - // should remove the head of the queue and re-send with DUP = 1 - // Not implemented yet. - } - } - if(mud->connState == MQTT_INIT){ // socket connect time out. - NODE_DBG("Can not connect to broker.\n"); - os_timer_disarm(&mud->mqttTimer); - mqtt_socket_do_disconnect(mud); - mqtt_connack_fail(mud, MQTT_CONN_FAIL_SERVER_NOT_FOUND); - } else if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT send time out. + if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT send time out. NODE_DBG("sSend MQTT_CONNECT failed.\n"); mud->connState = MQTT_INIT; mqtt_socket_do_disconnect(mud); mqtt_connack_fail(mud, MQTT_CONN_FAIL_TIMEOUT_SENDING); - - mud->keep_alive_tick = 0; // not need count anymore } else if(mud->connState == MQTT_CONNECT_SENT) { // wait for CONACK time out. NODE_DBG("MQTT_CONNECT timeout.\n"); mud->connState = MQTT_INIT; mqtt_socket_do_disconnect(mud); mqtt_connack_fail(mud, MQTT_CONN_FAIL_TIMEOUT_RECEIVING); } else if(mud->connState == MQTT_DATA){ - msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q)); - if(pending_msg){ - mqtt_send_if_possible(mud); - } else { + if(!msg_peek(&(mud->mqtt_state.pending_msg_q))) { // no queued event. - mud->keep_alive_tick ++; - if(mud->keep_alive_tick > mud->connect_info.keepalive){ - if (mud->keepalive_sent) { - // Oh dear -- keepalive timer expired and still no ack of previous message - mqtt_socket_reconnected(&mud->pesp_conn, 0); - } else { - uint8_t temp_buffer[MQTT_BUF_SIZE]; - mqtt_message_buffer_t msgb; - mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE); + if (mud->keepalive_sent) { + // Oh dear -- keepalive timer expired and still no ack of previous message + mqtt_socket_do_disconnect(mud); + } else { + uint8_t temp_buffer[MQTT_BUF_SIZE]; + mqtt_message_buffer_t msgb; + mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE); - NODE_DBG("\r\nMQTT: Send keepalive packet\r\n"); - mqtt_message_t* temp_msg = mqtt_msg_pingreq(&msgb); - msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg, - 0, MQTT_MSG_TYPE_PINGREQ, (int)mqtt_get_qos(temp_msg->data) ); - mud->keepalive_sent = 1; - mud->keep_alive_tick = 0; // Need to reset to zero in case flow control stopped. - mqtt_send_if_possible(mud); - } + NODE_DBG("\r\nMQTT: Send keepalive packet\r\n"); + mqtt_message_t* temp_msg = mqtt_msg_pingreq(&msgb); + msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg, + 0, MQTT_MSG_TYPE_PINGREQ, (int)mqtt_get_qos(temp_msg->data) ); + mud->keepalive_sent = 1; + mqtt_send_if_possible(mud); } } } - NODE_DBG("keep_alive_tick: %d\n", mud->keep_alive_tick); NODE_DBG("leave mqtt_socket_timer.\n"); } @@ -828,18 +827,7 @@ static int mqtt_socket_client( lua_State* L ) NODE_DBG("enter mqtt_socket_client.\n"); lmqtt_userdata *mud; - char tempid[20] = {0}; - sprintf(tempid, "%s%x", "NodeMCU_", system_get_chip_id() ); - NODE_DBG(tempid); - NODE_DBG("\n"); - - const char *clientId = tempid, *username = NULL, *password = NULL; - size_t idl = strlen(tempid); - size_t unl = 0, pwl = 0; - int keepalive = 0; int stack = 1; - int clean_session = 1; - int max_message_length = 0; int top = lua_gettop(L); // create a object @@ -857,6 +845,12 @@ static int mqtt_socket_client( lua_State* L ) mud->cb_unsuback_ref = LUA_NOREF; mud->cb_puback_ref = LUA_NOREF; + mud->conf.client_id_ref = LUA_NOREF; + mud->conf.username_ref = LUA_NOREF; + mud->conf.password_ref = LUA_NOREF; + mud->conf.will_topic_ref = LUA_NOREF; + mud->conf.will_message_ref = LUA_NOREF; + mud->connState = MQTT_INIT; // set its metatable @@ -865,93 +859,52 @@ static int mqtt_socket_client( lua_State* L ) if( lua_isstring(L,stack) ) // deal with the clientid string { - clientId = luaL_checklstring( L, stack, &idl ); + lua_pushvalue(L, stack); stack++; + } else { + char tempid[20] = {0}; + sprintf(tempid, "%s%x", "NodeMCU_", system_get_chip_id() ); + lua_pushstring(L, tempid); } + mud->conf.client_id_ref = luaL_ref(L, LUA_REGISTRYINDEX); if(lua_isnumber( L, stack )) { - keepalive = luaL_checkinteger( L, stack); + mud->conf.keepalive = luaL_checkinteger( L, stack); stack++; } - - if(keepalive == 0){ - keepalive = MQTT_DEFAULT_KEEPALIVE; + if(mud->conf.keepalive == 0) { + mud->conf.keepalive = MQTT_DEFAULT_KEEPALIVE; } if(lua_isstring( L, stack )){ - username = luaL_checklstring( L, stack, &unl ); + lua_pushvalue(L, stack); + mud->conf.username_ref = luaL_ref(L, LUA_REGISTRYINDEX); stack++; } - if(username == NULL) - unl = 0; - NODE_DBG("length username: %d\r\n", unl); if(lua_isstring( L, stack )){ - password = luaL_checklstring( L, stack, &pwl ); + lua_pushvalue(L, stack); + mud->conf.password_ref = luaL_ref(L, LUA_REGISTRYINDEX); stack++; } - if(password == NULL) - pwl = 0; - NODE_DBG("length password: %d\r\n", pwl); - - if(lua_isnumber( L, stack )) - { - clean_session = luaL_checkinteger( L, stack); - stack++; - } - - if(clean_session > 1){ - clean_session = 1; - } if(lua_isnumber( L, stack )) { - max_message_length = luaL_checkinteger( L, stack); + mud->conf.flags.clean_session = !!luaL_checkinteger(L, stack); + stack++; + } + + if(lua_isnumber( L, stack )) + { + mud->conf.max_message_length = luaL_checkinteger( L, stack); stack++; } - - if(max_message_length == 0) { - max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; + if(mud->conf.max_message_length == 0) { + mud->conf.max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; } - // TODO: check the zalloc result. - mud->connect_info.client_id = (uint8_t *)calloc(1,idl+1); - mud->connect_info.username = (uint8_t *)calloc(1,unl + 1); - mud->connect_info.password = (uint8_t *)calloc(1,pwl + 1); - if(!mud->connect_info.client_id || !mud->connect_info.username || !mud->connect_info.password){ - if(mud->connect_info.client_id) { - free(mud->connect_info.client_id); - mud->connect_info.client_id = NULL; - } - if(mud->connect_info.username) { - free(mud->connect_info.username); - mud->connect_info.username = NULL; - } - if(mud->connect_info.password) { - free(mud->connect_info.password); - mud->connect_info.password = NULL; - } - return luaL_error(L, "not enough memory"); - } - - memcpy(mud->connect_info.client_id, clientId, idl); - mud->connect_info.client_id[idl] = 0; - memcpy(mud->connect_info.username, username, unl); - mud->connect_info.username[unl] = 0; - memcpy(mud->connect_info.password, password, pwl); - mud->connect_info.password[pwl] = 0; - - NODE_DBG("MQTT: Init info: %s, %s, %s\r\n", mud->connect_info.client_id, mud->connect_info.username, mud->connect_info.password); - - mud->connect_info.clean_session = clean_session; - mud->connect_info.will_qos = 0; - mud->connect_info.will_retain = 0; - mud->connect_info.keepalive = keepalive; - mud->connect_info.max_message_length = max_message_length; - mud->mqtt_state.pending_msg_q = NULL; - mud->mqtt_state.port = 1883; mud->mqtt_state.recv_buffer = NULL; mud->mqtt_state.recv_buffer_size = 0; mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL; @@ -968,11 +921,6 @@ static int mqtt_delete( lua_State* L ) NODE_DBG("enter mqtt_delete.\n"); lmqtt_userdata *mud = (lmqtt_userdata *)luaL_checkudata(L, 1, "mqtt.socket"); - luaL_argcheck(L, mud, 1, "mqtt.socket expected"); - if(mud==NULL){ - NODE_DBG("userdata is nil.\n"); - return 0; - } os_timer_disarm(&mud->mqttTimer); mud->connected = false; @@ -986,18 +934,6 @@ static int mqtt_delete( lua_State* L ) msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); } - // ---- alloc-ed in mqtt_socket_lwt() - if(mud->connect_info.will_topic){ - free(mud->connect_info.will_topic); - mud->connect_info.will_topic = NULL; - } - - if(mud->connect_info.will_message){ - free(mud->connect_info.will_message); - mud->connect_info.will_message = NULL; - } - // ---- - //--------- alloc-ed in mqtt_socket_received() if(mud->mqtt_state.recv_buffer) { free(mud->mqtt_state.recv_buffer); @@ -1005,21 +941,6 @@ static int mqtt_delete( lua_State* L ) } // ---- - //--------- alloc-ed in mqtt_socket_client() - if(mud->connect_info.client_id){ - free(mud->connect_info.client_id); - mud->connect_info.client_id = NULL; - } - if(mud->connect_info.username){ - free(mud->connect_info.username); - mud->connect_info.username = NULL; - } - if(mud->connect_info.password){ - free(mud->connect_info.password); - mud->connect_info.password = NULL; - } - // ------- - // free (unref) callback ref luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref); mud->cb_connect_ref = LUA_NOREF; @@ -1038,6 +959,17 @@ static int mqtt_delete( lua_State* L ) luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref); mud->cb_puback_ref = LUA_NOREF; + luaL_unref(L, LUA_REGISTRYINDEX, mud->conf.client_id_ref); + mud->conf.client_id_ref = LUA_NOREF; + luaL_unref(L, LUA_REGISTRYINDEX, mud->conf.username_ref); + mud->conf.username_ref = LUA_NOREF; + luaL_unref(L, LUA_REGISTRYINDEX, mud->conf.password_ref); + mud->conf.password_ref = LUA_NOREF; + luaL_unref(L, LUA_REGISTRYINDEX, mud->conf.will_topic_ref); + mud->conf.will_topic_ref = LUA_NOREF; + luaL_unref(L, LUA_REGISTRYINDEX, mud->conf.will_message_ref); + mud->conf.will_message_ref = LUA_NOREF; + int selfref = mud->self_ref; mud->self_ref = LUA_NOREF; luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref); @@ -1049,14 +981,14 @@ static int mqtt_delete( lua_State* L ) static sint8 mqtt_socket_do_connect(struct lmqtt_userdata *mud) { - NODE_DBG("enter socket_connect.\n"); + NODE_DBG("enter mqtt_socket_do_connect.\n"); sint8 espconn_status; - mud->event_timeout = MQTT_CONNECT_TIMEOUT; mud->connState = MQTT_INIT; #ifdef CLIENT_SSL_ENABLE - if(mud->secure) + if(mud->conf.flags.secure) { + NODE_DBG("mqtt_socket_do_connect using espconn_secure\n"); espconn_status = espconn_secure_connect(&mud->pesp_conn); } else @@ -1065,48 +997,31 @@ static sint8 mqtt_socket_do_connect(struct lmqtt_userdata *mud) espconn_status = espconn_connect(&mud->pesp_conn); } - os_timer_arm(&mud->mqttTimer, 1000, 1); - - NODE_DBG("leave socket_connect\n"); + NODE_DBG("leave mqtt_socket_do_connect espconn_status=%d\n", espconn_status); return espconn_status; } -static sint8 socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) +static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) { lmqtt_userdata *mud = arg; - NODE_DBG("enter socket_dns_found.\n"); - sint8 espconn_status = ESPCONN_OK; - - if(ipaddr == NULL) + if((ipaddr == NULL) || (ipaddr->addr == 0)) { + NODE_DBG("socket_dns_found failure\n"); mqtt_connack_fail(mud, MQTT_CONN_FAIL_DNS); // although not connected, but fire disconnect callback to release every thing. mqtt_socket_disconnected(arg); - return -1; + return; } - // ipaddr->addr is a uint32_t ip - if(ipaddr->addr != 0) - { - memcpy(&mud->pesp_conn.proto.tcp->remote_ip, &(ipaddr->addr), 4); - NODE_DBG("TCP ip is set: "); - NODE_DBG(IPSTR, IP2STR(&(ipaddr->addr))); - NODE_DBG("\n"); - espconn_status = mqtt_socket_do_connect(mud); - } + memcpy(&mud->pesp_conn.proto.tcp->remote_ip, &(ipaddr->addr), 4); + NODE_DBG("socket_dns_found success: "); + NODE_DBG(IPSTR, IP2STR(&(ipaddr->addr))); + NODE_DBG("\n"); - NODE_DBG("leave socket_dns_found.\n"); - - return espconn_status; -} - -/* wrapper for using socket_dns_found() as callback function */ -static void socket_dns_foundcb(const char *name, ip_addr_t *ipaddr, void *arg) -{ - socket_dns_found(name, ipaddr, arg); + mqtt_socket_do_connect(mud); } #include "pm/swtimer.h" @@ -1120,76 +1035,47 @@ static int mqtt_socket_connect( lua_State* L ) ip_addr_t ipaddr; const char *domain; int stack = 1; - unsigned secure = 0; int top = lua_gettop(L); mud = (lmqtt_userdata *)luaL_checkudata(L, stack, "mqtt.socket"); - luaL_argcheck(L, mud, stack, "mqtt.socket expected"); stack++; - if(mud == NULL) - return 0; + + struct espconn *pesp_conn = &mud->pesp_conn; if(mud->connected){ return luaL_error(L, "already connected"); } - struct espconn *pesp_conn = &mud->pesp_conn; - - if (!pesp_conn->proto.tcp) - pesp_conn->proto.tcp = (esp_tcp *)calloc(1,sizeof(esp_tcp)); - - if(!pesp_conn->proto.tcp) { - return luaL_error(L, "not enough memory"); - } - - // reverse is for the callback function - pesp_conn->type = ESPCONN_TCP; - pesp_conn->state = ESPCONN_NONE; mud->connected = false; + mud->sending = false; if( (stack<=top) && lua_isstring(L,stack) ) // deal with the domain string { domain = luaL_checklstring( L, stack, &il ); - + if (!domain) + return luaL_error(L, "invalid domain"); stack++; - if (domain == NULL) - { - domain = "127.0.0.1"; - } - ipaddr.addr = ipaddr_addr(domain); - memcpy(pesp_conn->proto.tcp->remote_ip, &ipaddr.addr, 4); - NODE_DBG("TCP ip is set: "); - NODE_DBG(IPSTR, IP2STR(&ipaddr.addr)); - NODE_DBG("\n"); } if ( (stack<=top) && lua_isnumber(L, stack) ) { port = lua_tointeger(L, stack); stack++; - NODE_DBG("TCP port is set: %d.\n", port); } - pesp_conn->proto.tcp->remote_port = port; - if (pesp_conn->proto.tcp->local_port == 0) - pesp_conn->proto.tcp->local_port = espconn_port(); - mud->mqtt_state.port = port; if ( (stack<=top) && (lua_isnumber(L, stack) || lua_isboolean(L, stack)) ) { if (lua_isnumber(L, stack)) { platform_print_deprecation_note("mqtt.connect secure parameter as integer","in the future"); - secure = !!lua_tointeger(L, stack); + mud->conf.flags.secure = !!lua_tointeger(L, stack); } else { - secure = lua_toboolean(L, stack); + mud->conf.flags.secure = lua_toboolean(L, stack); } stack++; - } else { - secure = 0; // default to 0 } -#ifdef CLIENT_SSL_ENABLE - mud->secure = secure; // save -#else - if ( secure ) + +#ifndef CLIENT_SSL_ENABLE + if ( mud->conf.flags.secure ) { return luaL_error(L, "ssl not available"); } @@ -1211,10 +1097,24 @@ static int mqtt_socket_connect( lua_State* L ) mud->cb_connect_fail_ref = luaL_ref(L, LUA_REGISTRYINDEX); } - lua_pushvalue(L, 1); // copy userdata to the top of stack + if (!pesp_conn->proto.tcp) + pesp_conn->proto.tcp = (esp_tcp *)calloc(1,sizeof(esp_tcp)); + + if(!pesp_conn->proto.tcp) { + return luaL_error(L, "not enough memory"); + } + luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref); + lua_pushvalue(L, 1); // copy userdata and persist it in the registry mud->self_ref = luaL_ref(L, LUA_REGISTRYINDEX); + pesp_conn->type = ESPCONN_TCP; + pesp_conn->state = ESPCONN_NONE; + + pesp_conn->proto.tcp->remote_port = port; + if (pesp_conn->proto.tcp->local_port == 0) + pesp_conn->proto.tcp->local_port = espconn_port(); + espconn_regist_connectcb(pesp_conn, mqtt_socket_connected); espconn_regist_reconcb(pesp_conn, mqtt_socket_reconnected); @@ -1225,24 +1125,17 @@ static int mqtt_socket_connect( lua_State* L ) //My guess: If in doubt, resume the timer // timer started in socket_connect() - if((ipaddr.addr == IPADDR_NONE) && (memcmp(domain,"255.255.255.255",16) != 0)) + ip_addr_t host_ip; + switch (dns_gethostbyname(domain, &host_ip, socket_dns_found, mud)) { - ip_addr_t host_ip; - switch (dns_gethostbyname(domain, &host_ip, socket_dns_foundcb, mud)) - { - case ERR_OK: - socket_dns_found(domain, &host_ip, mud); // ip is returned in host_ip. - break; - case ERR_INPROGRESS: - break; - default: - // Something has gone wrong; bail out? - mqtt_connack_fail(mud, MQTT_CONN_FAIL_DNS); - } - } - else - { - mqtt_socket_do_connect(mud); + case ERR_OK: + socket_dns_found(domain, &host_ip, mud); // ip is returned in host_ip. + break; + case ERR_INPROGRESS: + break; + default: + // Something has gone wrong; bail out? + mqtt_connack_fail(mud, MQTT_CONN_FAIL_DNS); } NODE_DBG("leave mqtt_socket_connect.\n"); @@ -1255,17 +1148,11 @@ static int mqtt_socket_connect( lua_State* L ) static int mqtt_socket_close( lua_State* L ) { NODE_DBG("enter mqtt_socket_close.\n"); - int i = 0; lmqtt_userdata *mud = NULL; mud = (lmqtt_userdata *)luaL_checkudata(L, 1, "mqtt.socket"); - luaL_argcheck(L, mud, 1, "mqtt.socket expected"); - if (mud == NULL) { - lua_pushboolean(L, 0); - return 1; - } - sint8 espconn_status = ESPCONN_CONN; + sint8 espconn_status = 0; if (mud->connected) { uint8_t temp_buffer[MQTT_BUF_SIZE]; mqtt_message_buffer_t msgb; @@ -1275,34 +1162,23 @@ static int mqtt_socket_close( lua_State* L ) mqtt_message_t* temp_msg = mqtt_msg_disconnect(&msgb); NODE_DBG("Send MQTT disconnect infomation, data len: %d, d[0]=%d \r\n", temp_msg->length, temp_msg->data[0]); - /* XXX This fails to actually send the disconnect message before hanging up */ -#ifdef CLIENT_SSL_ENABLE - if(mud->secure) { - espconn_status = espconn_secure_send(&mud->pesp_conn, temp_msg->data, temp_msg->length); - if(mud->pesp_conn.proto.tcp->remote_port || mud->pesp_conn.proto.tcp->local_port) - espconn_status |= espconn_secure_disconnect(&mud->pesp_conn); - } else -#endif - { - espconn_status = espconn_send(&mud->pesp_conn, temp_msg->data, temp_msg->length); - if(mud->pesp_conn.proto.tcp->remote_port || mud->pesp_conn.proto.tcp->local_port) - espconn_status |= espconn_disconnect(&mud->pesp_conn); - } - } - mud->connected = false; + if (!msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, 0, + MQTT_MSG_TYPE_DISCONNECT, 0)) + goto err; - while (mud->mqtt_state.pending_msg_q) { - msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); + mqtt_send_if_possible(mud); } NODE_DBG("leave mqtt_socket_close.\n"); + return 0; - if (espconn_status == ESPCONN_OK) { - lua_pushboolean(L, 1); - } else { - lua_pushboolean(L, 0); - } - return 1; +err: + /* OOM while trying to build the disconnect message. Fail the connection */ + mqtt_socket_do_disconnect(mud); + + NODE_DBG("leave mqtt_socket_close on error path\n"); + + return 0; } // Lua: mqtt:on( "method", function() ) @@ -1313,47 +1189,51 @@ static int mqtt_socket_on( lua_State* L ) size_t sl; mud = (lmqtt_userdata *)luaL_checkudata(L, 1, "mqtt.socket"); - luaL_argcheck(L, mud, 1, "mqtt.socket expected"); - if(mud==NULL){ - NODE_DBG("userdata is nil.\n"); - return 0; - } - - const char *method = luaL_checklstring( L, 2, &sl ); - if (method == NULL) - return luaL_error( L, "wrong arg type" ); luaL_checktype(L, 3, LUA_TFUNCTION); lua_pushvalue(L, 3); // copy argument (func) to the top of stack - if( sl == 7 && strcmp(method, "connect") == 0){ - luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref); - mud->cb_connect_ref = luaL_ref(L, LUA_REGISTRYINDEX); - }else if( sl == 8 && strcmp(method, "connfail") == 0){ - luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); - mud->cb_connect_fail_ref = luaL_ref(L, LUA_REGISTRYINDEX); - }else if( sl == 7 && strcmp(method, "offline") == 0){ - luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); - mud->cb_disconnect_ref = luaL_ref(L, LUA_REGISTRYINDEX); - }else if( sl == 7 && strcmp(method, "message") == 0){ - luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_message_ref); - mud->cb_message_ref = luaL_ref(L, LUA_REGISTRYINDEX); - }else if( sl == 8 && strcmp(method, "overflow") == 0){ - luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_overflow_ref); - mud->cb_overflow_ref = luaL_ref(L, LUA_REGISTRYINDEX); - }else if( sl == 6 && strcmp(method, "puback") == 0){ - luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref); - mud->cb_puback_ref = luaL_ref(L, LUA_REGISTRYINDEX); - }else if( sl == 6 && strcmp(method, "suback") == 0){ - luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_suback_ref); - mud->cb_suback_ref = luaL_ref(L, LUA_REGISTRYINDEX); - }else if( sl == 8 && strcmp(method, "unsuback") == 0){ - luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_unsuback_ref); - mud->cb_unsuback_ref = luaL_ref(L, LUA_REGISTRYINDEX); - }else{ - lua_pop(L, 1); - return luaL_error( L, "method not supported" ); + static const char * const cbnames[] = { + "connect", "connfail", "offline", + "message", "overflow", + "puback", "suback", "unsuback", + NULL + }; + switch (luaL_checkoption(L, 2, NULL, cbnames)) { + case 0: + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref); + mud->cb_connect_ref = luaL_ref(L, LUA_REGISTRYINDEX); + break; + case 1: + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + mud->cb_connect_fail_ref = luaL_ref(L, LUA_REGISTRYINDEX); + break; + case 2: + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); + mud->cb_disconnect_ref = luaL_ref(L, LUA_REGISTRYINDEX); + break; + case 3: + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_message_ref); + mud->cb_message_ref = luaL_ref(L, LUA_REGISTRYINDEX); + break; + case 4: + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_overflow_ref); + mud->cb_overflow_ref = luaL_ref(L, LUA_REGISTRYINDEX); + break; + case 5: + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref); + mud->cb_puback_ref = luaL_ref(L, LUA_REGISTRYINDEX); + break; + case 6: + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_suback_ref); + mud->cb_suback_ref = luaL_ref(L, LUA_REGISTRYINDEX); + break; + case 7: + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_unsuback_ref); + mud->cb_unsuback_ref = luaL_ref(L, LUA_REGISTRYINDEX); + break; } + NODE_DBG("leave mqtt_socket_on.\n"); return 0; } @@ -1369,15 +1249,8 @@ static int mqtt_socket_unsubscribe( lua_State* L ) { lmqtt_userdata *mud; mud = (lmqtt_userdata *) luaL_checkudata( L, stack, "mqtt.socket" ); - luaL_argcheck( L, mud, stack, "mqtt.socket expected" ); stack++; - if(mud==NULL){ - NODE_DBG("userdata is nil.\n"); - lua_pushboolean(L, 0); - return 1; - } - if(!mud->connected){ luaL_error( L, "not connected" ); lua_pushboolean(L, 0); @@ -1451,7 +1324,7 @@ static int mqtt_socket_unsubscribe( lua_State* L ) { msg_id, MQTT_MSG_TYPE_UNSUBSCRIBE, (int)mqtt_get_qos(temp_msg->data) ); NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length); - NODE_DBG("msg_size: %d, event_timeout: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)), mud->event_timeout); + NODE_DBG("msg_size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); sint8 espconn_status = ESPCONN_IF; @@ -1478,15 +1351,8 @@ static int mqtt_socket_subscribe( lua_State* L ) { lmqtt_userdata *mud; mud = (lmqtt_userdata *) luaL_checkudata( L, stack, "mqtt.socket" ); - luaL_argcheck( L, mud, stack, "mqtt.socket expected" ); stack++; - if(mud==NULL){ - NODE_DBG("userdata is nil.\n"); - lua_pushboolean(L, 0); - return 1; - } - if(!mud->connected){ luaL_error( L, "not connected" ); lua_pushboolean(L, 0); @@ -1563,7 +1429,7 @@ static int mqtt_socket_subscribe( lua_State* L ) { msg_id, MQTT_MSG_TYPE_SUBSCRIBE, (int)mqtt_get_qos(temp_msg->data) ); NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length); - NODE_DBG("msg_size: %d, event_timeout: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)), mud->event_timeout); + NODE_DBG("msg_size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); sint8 espconn_status = ESPCONN_IF; @@ -1589,13 +1455,7 @@ static int mqtt_socket_publish( lua_State* L ) uint16_t msg_id = 0; mud = (lmqtt_userdata *)luaL_checkudata(L, stack, "mqtt.socket"); - luaL_argcheck(L, mud, stack, "mqtt.socket expected"); stack++; - if(mud==NULL){ - NODE_DBG("userdata is nil.\n"); - lua_pushboolean(L, 0); - return 1; - } if(!mud->connected){ return luaL_error( L, "not connected" ); @@ -1650,80 +1510,33 @@ static int mqtt_socket_publish( lua_State* L ) return 1; } -// Lua: mqtt:lwt( topic, message, qos, retain, function(client) ) +// Lua: mqtt:lwt( topic, message, [qos, [retain]]) static int mqtt_socket_lwt( lua_State* L ) { NODE_DBG("enter mqtt_socket_lwt.\n"); - uint8_t stack = 1; - size_t topicSize, msgSize; - NODE_DBG("mqtt_socket_lwt.\n"); - lmqtt_userdata *mud = NULL; - const char *lwtTopic, *lwtMsg; - uint8_t lwtQoS, lwtRetain; - mud = (lmqtt_userdata *)luaL_checkudata( L, stack, "mqtt.socket" ); - luaL_argcheck( L, mud, stack, "mqtt.socket expected" ); + lmqtt_userdata *mud = luaL_checkudata( L, 1, "mqtt.socket" ); + luaL_argcheck( L, lua_isstring(L, 2), 2, "need lwt topic"); + luaL_argcheck( L, lua_isstring(L, 3), 3, "need lwt message"); - if(mud == NULL) - return 0; + lua_pushvalue(L, 2); + luaL_reref(L, LUA_REGISTRYINDEX, &mud->conf.will_topic_ref); - stack++; - lwtTopic = luaL_checklstring( L, stack, &topicSize ); - if (lwtTopic == NULL) - { - return luaL_error( L, "need lwt topic"); - } - - stack++; - lwtMsg = luaL_checklstring( L, stack, &msgSize ); - if (lwtMsg == NULL) - { - return luaL_error( L, "need lwt message"); - } - stack++; - if(mud->connect_info.will_topic){ // free the previous one if there is any - free(mud->connect_info.will_topic); - mud->connect_info.will_topic = NULL; - } - if(mud->connect_info.will_message){ - free(mud->connect_info.will_message); - mud->connect_info.will_message = NULL; - } - - mud->connect_info.will_topic = (uint8_t*) calloc(1, topicSize + 1 ); - mud->connect_info.will_message = (uint8_t*) calloc(1, msgSize + 1 ); - if(!mud->connect_info.will_topic || !mud->connect_info.will_message){ - if(mud->connect_info.will_topic){ - free(mud->connect_info.will_topic); - mud->connect_info.will_topic = NULL; - } - if(mud->connect_info.will_message){ - free(mud->connect_info.will_message); - mud->connect_info.will_message = NULL; - } - return luaL_error( L, "not enough memory"); - } - memcpy(mud->connect_info.will_topic, lwtTopic, topicSize); - mud->connect_info.will_topic[topicSize] = 0; - memcpy(mud->connect_info.will_message, lwtMsg, msgSize); - mud->connect_info.will_message[msgSize] = 0; + lua_pushvalue(L, 3); + luaL_reref(L, LUA_REGISTRYINDEX, &mud->conf.will_message_ref); + int stack = 4; if ( lua_isnumber(L, stack) ) { - mud->connect_info.will_qos = lua_tointeger(L, stack); + mud->conf.flags.will_qos = lua_tointeger(L, stack) & 0x3; stack++; } if ( lua_isnumber(L, stack) ) { - mud->connect_info.will_retain = lua_tointeger(L, stack); + mud->conf.flags.will_retain = !!lua_tointeger(L, stack); stack++; } - NODE_DBG("mqtt_socket_lwt: topic: %s, message: %s, qos: %d, retain: %d\n", - mud->connect_info.will_topic, - mud->connect_info.will_message, - mud->connect_info.will_qos, - mud->connect_info.will_retain); NODE_DBG("leave mqtt_socket_lwt.\n"); return 0; } diff --git a/app/mqtt/mqtt_msg.h b/app/mqtt/mqtt_msg.h index bdaeb3e6..b192b6b1 100644 --- a/app/mqtt/mqtt_msg.h +++ b/app/mqtt/mqtt_msg.h @@ -99,16 +99,15 @@ typedef struct mqtt_message_buffer typedef struct mqtt_connect_info { - char* client_id; - char* username; - char* password; - char* will_topic; - char* will_message; + const char* client_id; + const char* username; + const char* password; + const char* will_topic; + const char* will_message; int keepalive; int will_qos; int will_retain; int clean_session; - uint16_t max_message_length; } mqtt_connect_info_t; diff --git a/app/mqtt/msg_queue.c b/app/mqtt/msg_queue.c index 1b00d5e8..c2df1ae0 100644 --- a/app/mqtt/msg_queue.c +++ b/app/mqtt/msg_queue.c @@ -18,6 +18,8 @@ msg_queue_t *msg_enqueue(msg_queue_t **head, mqtt_message_t *msg, uint16_t msg_i return NULL; } + node->sent = 0; + node->msg.data = (uint8_t *)calloc(1,msg->length); if(!node->msg.data){ NODE_DBG("not enough memory\n"); diff --git a/app/mqtt/msg_queue.h b/app/mqtt/msg_queue.h index 05b910ae..88b2fee2 100644 --- a/app/mqtt/msg_queue.h +++ b/app/mqtt/msg_queue.h @@ -13,6 +13,8 @@ typedef struct msg_queue_t { uint16_t msg_id; int msg_type; int publish_qos; + + bool sent; } msg_queue_t; msg_queue_t * msg_enqueue(msg_queue_t **head, mqtt_message_t *msg, uint16_t msg_id, int msg_type, int publish_qos); diff --git a/docs/modules/mqtt.md b/docs/modules/mqtt.md index 289550c7..9ffeb0a3 100644 --- a/docs/modules/mqtt.md +++ b/docs/modules/mqtt.md @@ -64,8 +64,6 @@ m = mqtt.Client("clientid", 120, "user", "password") -- to topic "/lwt" if client don't send keepalive packet m:lwt("/lwt", "offline", 0, 0) -m:on("connect", function(client) print ("connected") end) -m:on("connfail", function(client, reason) print ("connection failed", reason) end) m:on("offline", function(client) print ("offline") end) -- on publish message receive event @@ -96,11 +94,11 @@ m:connect("192.168.11.118", 1883, false, function(client) client:publish("/topic", "hello", 0, 0, function(client) print("sent") end) end, function(client, reason) - print("failed reason: " .. reason) + print("Connection failed reason: " .. reason) end) -m:close(); --- you can call m:connect again +m:close() +-- you can call m:connect again after the offline callback fires ``` # MQTT Client @@ -108,7 +106,11 @@ m:close(); ## mqtt.client:close() -Closes connection to the broker. +Schedules a clean teardown of the connection. + +MQTT requires clients to actively signal a desire to disconnect to the server +to avoid sending their LWT. Thus, the Client is not immediately reusable +after this call, but only after the "offline" callback has fired. #### Syntax `mqtt:close()` @@ -117,7 +119,7 @@ Closes connection to the broker. none #### Returns -`true` on success, `false` otherwise +`nil` ## mqtt.client:connect()