Merge pull request #688 from karrots/mqtt-puback-fix

Fix for MQTT PUBACK loop
This commit is contained in:
Vowstar 2015-11-01 14:34:12 +08:00
commit 95e2bbb0f0
1 changed files with 21 additions and 21 deletions

View File

@ -297,12 +297,12 @@ READPACKET:
case MQTT_MSG_TYPE_PUBLISH:
if(msg_qos == 1){
temp_msg = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBACK, (int)mqtt_get_qos(temp_msg->data) );
}
else if(msg_qos == 2){
temp_msg = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBREC, (int)mqtt_get_qos(temp_msg->data) );
}
if(msg_qos == 1 || msg_qos == 2){
@ -328,11 +328,11 @@ READPACKET:
break;
case MQTT_MSG_TYPE_PUBREC:
if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){
NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n");
NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n");
// Note: actrually, should not destroy the msg until PUBCOMP is received.
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
temp_msg = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBREL, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("MQTT: Response PUBREL\r\n");
}
@ -341,7 +341,7 @@ READPACKET:
if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBREC && pending_msg->msg_id == msg_id){
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
temp_msg = mqtt_msg_pubcomp(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBCOMP, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("MQTT: Response PUBCOMP\r\n");
}
@ -363,7 +363,7 @@ READPACKET:
break;
case MQTT_MSG_TYPE_PINGREQ:
temp_msg = mqtt_msg_pingresp(&mud->mqtt_state.mqtt_connection);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PINGRESP, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("MQTT: Response PINGRESP\r\n");
break;
@ -446,7 +446,7 @@ static void mqtt_socket_sent(void *arg)
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_call(mud->L, 1, 0);
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK && node->publish_qos == 1) {
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
@ -567,7 +567,7 @@ void mqtt_socket_timer(void *arg)
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
NODE_DBG("\r\nMQTT: Send keepalive packet\r\n");
mqtt_message_t* temp_msg = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection);
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
0, MQTT_MSG_TYPE_PINGREQ, (int)mqtt_get_qos(temp_msg->data) );
// only one message in queue, send immediately.
#ifdef CLIENT_SSL_ENABLE
@ -597,7 +597,7 @@ static int mqtt_socket_client( lua_State* L )
c_sprintf(tempid, "%s%x", "NodeMCU_", system_get_chip_id() );
NODE_DBG(tempid);
NODE_DBG("\n");
const char *clientId = tempid, *username = NULL, *password = NULL;
size_t idl = c_strlen(tempid);
size_t unl = 0, pwl = 0;
@ -643,7 +643,7 @@ static int mqtt_socket_client( lua_State* L )
}
if(lua_isnumber( L, stack ))
{
{
keepalive = luaL_checkinteger( L, stack);
stack++;
}
@ -667,9 +667,9 @@ static int mqtt_socket_client( lua_State* L )
if(password == NULL)
pwl = 0;
NODE_DBG("lengh password: %d\r\n", pwl);
if(lua_isnumber( L, stack ))
{
{
clean_session = luaL_checkinteger( L, stack);
stack++;
}
@ -704,7 +704,7 @@ static int mqtt_socket_client( lua_State* L )
mud->connect_info.username[unl] = 0;
c_memcpy(mud->connect_info.password, password, pwl);
mud->connect_info.password[pwl] = 0;
NODE_DBG("MQTT: Init info: %s, %s, %s\r\n", mud->connect_info.client_id, mud->connect_info.username, mud->connect_info.password);
mud->connect_info.clean_session = clean_session;
@ -803,7 +803,7 @@ static int mqtt_delete( lua_State* L )
}
lua_gc(L, LUA_GCRESTART, 0);
NODE_DBG("leave mqtt_delete.\n");
return 0;
return 0;
}
static void socket_connect(struct espconn *pesp_conn)
@ -829,7 +829,7 @@ static void socket_connect(struct espconn *pesp_conn)
}
os_timer_arm(&mud->mqttTimer, 1000, 1);
NODE_DBG("leave socket_connect.\n");
}
@ -1049,7 +1049,7 @@ static int mqtt_socket_close( lua_State* L )
espconn_disconnect(mud->pesp_conn);
}
NODE_DBG("leave mqtt_socket_close.\n");
return 0;
return 0;
}
// Lua: mqtt:on( "method", function() )
@ -1184,7 +1184,7 @@ static int mqtt_socket_subscribe( lua_State* L ) {
mud->cb_suback_ref = luaL_ref( L, LUA_REGISTRYINDEX );
}
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_SUBSCRIBE, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length);
@ -1274,7 +1274,7 @@ static int mqtt_socket_publish( lua_State* L )
mud->cb_puback_ref = luaL_ref(L, LUA_REGISTRYINDEX);
}
msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos );
if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){
@ -1401,7 +1401,7 @@ static const LUA_REG_TYPE mqtt_socket_map[] =
{ LNILKEY, LNILVAL }
};
const LUA_REG_TYPE mqtt_map[] =
const LUA_REG_TYPE mqtt_map[] =
{
{ LSTRKEY( "Client" ), LFUNCVAL ( mqtt_socket_client ) },
#if LUA_OPTIMIZE_MEMORY > 0
@ -1424,7 +1424,7 @@ LUALIB_API int luaopen_mqtt( lua_State *L )
lua_pushvalue( L, -1 );
lua_setmetatable( L, -2 );
// Module constants
// Module constants
// MOD_REG_NUMBER( L, "TCP", TCP );
// create metatable
@ -1437,5 +1437,5 @@ LUALIB_API int luaopen_mqtt( lua_State *L )
luaL_register( L, NULL, mqtt_socket_map );
return 1;
#endif // #if LUA_OPTIMIZE_MEMORY > 0
#endif // #if LUA_OPTIMIZE_MEMORY > 0
}