From 57c2e6eacd9d9b21394bbb3404b853ca05841636 Mon Sep 17 00:00:00 2001 From: Jonathan karras Date: Mon, 12 Oct 2015 23:49:10 -0600 Subject: [PATCH] Fix for MQTT PUBACK loop Conflicts: app/modules/mqtt.c --- app/modules/mqtt.c | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index ed616e72..1db350af 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -297,12 +297,12 @@ READPACKET: case MQTT_MSG_TYPE_PUBLISH: if(msg_qos == 1){ temp_msg = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id); - node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, + node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PUBACK, (int)mqtt_get_qos(temp_msg->data) ); } else if(msg_qos == 2){ temp_msg = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id); - node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, + node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PUBREC, (int)mqtt_get_qos(temp_msg->data) ); } if(msg_qos == 1 || msg_qos == 2){ @@ -328,11 +328,11 @@ READPACKET: break; case MQTT_MSG_TYPE_PUBREC: if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){ - NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n"); + NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n"); // Note: actrually, should not destroy the msg until PUBCOMP is received. msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); temp_msg = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id); - node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, + node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PUBREL, (int)mqtt_get_qos(temp_msg->data) ); NODE_DBG("MQTT: Response PUBREL\r\n"); } @@ -341,7 +341,7 @@ READPACKET: if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBREC && pending_msg->msg_id == msg_id){ msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); temp_msg = mqtt_msg_pubcomp(&mud->mqtt_state.mqtt_connection, msg_id); - node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, + node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PUBCOMP, (int)mqtt_get_qos(temp_msg->data) ); NODE_DBG("MQTT: Response PUBCOMP\r\n"); } @@ -363,7 +363,7 @@ READPACKET: break; case MQTT_MSG_TYPE_PINGREQ: temp_msg = mqtt_msg_pingresp(&mud->mqtt_state.mqtt_connection); - node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, + node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PINGRESP, (int)mqtt_get_qos(temp_msg->data) ); NODE_DBG("MQTT: Response PINGRESP\r\n"); break; @@ -446,7 +446,7 @@ static void mqtt_socket_sent(void *arg) lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_puback_ref); lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua lua_call(mud->L, 1, 0); - } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK && node->publish_qos == 1) { + } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK) { msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) { msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); @@ -567,7 +567,7 @@ void mqtt_socket_timer(void *arg) mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); NODE_DBG("\r\nMQTT: Send keepalive packet\r\n"); mqtt_message_t* temp_msg = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection); - msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg, + msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg, 0, MQTT_MSG_TYPE_PINGREQ, (int)mqtt_get_qos(temp_msg->data) ); // only one message in queue, send immediately. #ifdef CLIENT_SSL_ENABLE @@ -597,7 +597,7 @@ static int mqtt_socket_client( lua_State* L ) c_sprintf(tempid, "%s%x", "NodeMCU_", system_get_chip_id() ); NODE_DBG(tempid); NODE_DBG("\n"); - + const char *clientId = tempid, *username = NULL, *password = NULL; size_t idl = c_strlen(tempid); size_t unl = 0, pwl = 0; @@ -643,7 +643,7 @@ static int mqtt_socket_client( lua_State* L ) } if(lua_isnumber( L, stack )) - { + { keepalive = luaL_checkinteger( L, stack); stack++; } @@ -667,9 +667,9 @@ static int mqtt_socket_client( lua_State* L ) if(password == NULL) pwl = 0; NODE_DBG("lengh password: %d\r\n", pwl); - + if(lua_isnumber( L, stack )) - { + { clean_session = luaL_checkinteger( L, stack); stack++; } @@ -704,7 +704,7 @@ static int mqtt_socket_client( lua_State* L ) mud->connect_info.username[unl] = 0; c_memcpy(mud->connect_info.password, password, pwl); mud->connect_info.password[pwl] = 0; - + NODE_DBG("MQTT: Init info: %s, %s, %s\r\n", mud->connect_info.client_id, mud->connect_info.username, mud->connect_info.password); mud->connect_info.clean_session = clean_session; @@ -803,7 +803,7 @@ static int mqtt_delete( lua_State* L ) } lua_gc(L, LUA_GCRESTART, 0); NODE_DBG("leave mqtt_delete.\n"); - return 0; + return 0; } static void socket_connect(struct espconn *pesp_conn) @@ -829,7 +829,7 @@ static void socket_connect(struct espconn *pesp_conn) } os_timer_arm(&mud->mqttTimer, 1000, 1); - + NODE_DBG("leave socket_connect.\n"); } @@ -1049,7 +1049,7 @@ static int mqtt_socket_close( lua_State* L ) espconn_disconnect(mud->pesp_conn); } NODE_DBG("leave mqtt_socket_close.\n"); - return 0; + return 0; } // Lua: mqtt:on( "method", function() ) @@ -1184,7 +1184,7 @@ static int mqtt_socket_subscribe( lua_State* L ) { mud->cb_suback_ref = luaL_ref( L, LUA_REGISTRYINDEX ); } - msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg, + msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_SUBSCRIBE, (int)mqtt_get_qos(temp_msg->data) ); NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length); @@ -1274,7 +1274,7 @@ static int mqtt_socket_publish( lua_State* L ) mud->cb_puback_ref = luaL_ref(L, LUA_REGISTRYINDEX); } - msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, + msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos ); if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){ @@ -1401,7 +1401,7 @@ static const LUA_REG_TYPE mqtt_socket_map[] = { LNILKEY, LNILVAL } }; -const LUA_REG_TYPE mqtt_map[] = +const LUA_REG_TYPE mqtt_map[] = { { LSTRKEY( "Client" ), LFUNCVAL ( mqtt_socket_client ) }, #if LUA_OPTIMIZE_MEMORY > 0 @@ -1424,7 +1424,7 @@ LUALIB_API int luaopen_mqtt( lua_State *L ) lua_pushvalue( L, -1 ); lua_setmetatable( L, -2 ); - // Module constants + // Module constants // MOD_REG_NUMBER( L, "TCP", TCP ); // create metatable @@ -1437,5 +1437,5 @@ LUALIB_API int luaopen_mqtt( lua_State *L ) luaL_register( L, NULL, mqtt_socket_map ); return 1; -#endif // #if LUA_OPTIMIZE_MEMORY > 0 +#endif // #if LUA_OPTIMIZE_MEMORY > 0 }