From 32e062f52333541d27ab44c105b81b92baa24c5e Mon Sep 17 00:00:00 2001 From: funshine Date: Tue, 31 Mar 2015 23:38:28 +0800 Subject: [PATCH] polish mqtt module --- README.md | 6 +++++ app/modules/mqtt.c | 53 +++++++++++++++++++++++++++++++------------ app/mqtt/msg_queue.c | 22 ++++++++++++++++++ app/mqtt/msg_queue.h | 2 ++ examples/fragment.lua | 12 +++++++++- 5 files changed, 79 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 13a817da..1e71a8fe 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,12 @@ Tencent QQ group: 309957875
- cross compiler (done) # Change log +2015-03-31
+polish mqtt module, add queue for mqtt module.
+add reconnect option to mqtt.connect api, :connect( host, port, secure, auto_reconnect, function(client) )
+move node.readvdd33 to adc.readvdd33.
+tools/esptool.py supported NodeMCU devkit automatic flash. + 2015-03-18
update u8glib.
merge everything to master. diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index 2cc3ad6d..d64a831b 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -231,7 +231,7 @@ READPACKET: msg_qos = mqtt_get_qos(mud->mqtt_state.in_buffer); msg_id = mqtt_get_id(mud->mqtt_state.in_buffer, mud->mqtt_state.in_buffer_length); - msg_queue_t *pending_msg = mud->mqtt_state.pending_msg_q; + msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q)); NODE_DBG("MQTT_DATA: type: %d, qos: %d, msg_id: %d, pending_id: %d\r\n", msg_type, @@ -263,12 +263,12 @@ READPACKET: if(msg_qos == 1){ mud->mqtt_state.outbound_message = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id); node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, - msg_id, MQTT_MSG_TYPE_PUBACK, (int)msg_qos ); + msg_id, MQTT_MSG_TYPE_PUBACK, 0 ); } else if(msg_qos == 2){ mud->mqtt_state.outbound_message = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id); node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, - msg_id, MQTT_MSG_TYPE_PUBREC, (int)msg_qos ); + msg_id, MQTT_MSG_TYPE_PUBREC, 0 ); } if(msg_qos == 1 || msg_qos == 2){ NODE_DBG("MQTT: Queue response QoS: %d\r\n", msg_qos); @@ -290,19 +290,27 @@ READPACKET: break; case MQTT_MSG_TYPE_PUBREC: + if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){ + NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n"); + // Note: actrually, should not destroy the msg until PUBCOMP is received. + msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); mud->mqtt_state.outbound_message = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id); node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, - msg_id, MQTT_MSG_TYPE_PUBREL, (int)msg_qos ); + msg_id, MQTT_MSG_TYPE_PUBREL, 1 ); NODE_DBG("MQTT: Response PUBREL\r\n"); + } break; case MQTT_MSG_TYPE_PUBREL: + if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBREC && pending_msg->msg_id == msg_id){ + msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); mud->mqtt_state.outbound_message = mqtt_msg_pubcomp(&mud->mqtt_state.mqtt_connection, msg_id); node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, - msg_id, MQTT_MSG_TYPE_PUBCOMP, (int)msg_qos ); + msg_id, MQTT_MSG_TYPE_PUBCOMP, 0 ); NODE_DBG("MQTT: Response PUBCOMP\r\n"); + } break; case MQTT_MSG_TYPE_PUBCOMP: - if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){ + if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBREL && pending_msg->msg_id == msg_id){ NODE_DBG("MQTT: Publish with QoS = 2 successful\r\n"); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); if(mud->cb_puback_ref == LUA_NOREF) @@ -317,7 +325,7 @@ READPACKET: case MQTT_MSG_TYPE_PINGREQ: mud->mqtt_state.outbound_message = mqtt_msg_pingresp(&mud->mqtt_state.mqtt_connection); node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, - msg_id, MQTT_MSG_TYPE_PINGRESP, (int)msg_qos ); + msg_id, MQTT_MSG_TYPE_PINGRESP, 1 ); NODE_DBG("MQTT: Response PINGRESP\r\n"); break; case MQTT_MSG_TYPE_PINGRESP: @@ -345,7 +353,7 @@ READPACKET: break; } - if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->event_timeout == 0){ + if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){ mud->event_timeout = MQTT_SEND_TIMEOUT; NODE_DBG("Sent: %d\n", node->msg.length); if( mud->secure ) @@ -355,6 +363,7 @@ READPACKET: mud->keep_alive_tick = 0; mud->mqtt_state.outbound_message = NULL; } + NODE_DBG("receive, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("leave mqtt_socket_received.\n"); return; } @@ -376,9 +385,9 @@ static void mqtt_socket_sent(void *arg) // call mqtt_sent() mud->event_timeout = 0; - + NODE_DBG("sent1, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); // qos = 0, publish and forgot. - msg_queue_t *node = mud->mqtt_state.pending_msg_q; + msg_queue_t *node = msg_peek(&(mud->mqtt_state.pending_msg_q)); if(node && node->msg_type == MQTT_MSG_TYPE_PUBLISH && node->publish_qos == 0) { msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); if(mud->cb_puback_ref == LUA_NOREF) @@ -388,7 +397,14 @@ static void mqtt_socket_sent(void *arg) lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_puback_ref); lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua lua_call(gL, 1, 0); + } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK && node->publish_qos == 1) { + msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); + } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) { + msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); + } else if(node && node->msg_type == MQTT_MSG_TYPE_PINGRESP) { + msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); } + NODE_DBG("sent2, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("leave mqtt_socket_sent.\n"); } @@ -423,16 +439,21 @@ static void mqtt_socket_connected(void *arg) void mqtt_socket_timer(void *arg) { - // NODE_DBG("enter mqtt_socket_timer.\n"); + NODE_DBG("enter mqtt_socket_timer.\n"); lmqtt_userdata *mud = (lmqtt_userdata*) arg; if(mud == NULL) return; + NODE_DBG("timer, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); if(mud->event_timeout > 0){ NODE_DBG("event_timeout: %d.\n", mud->event_timeout); mud->event_timeout --; if(mud->event_timeout > 0){ return; + } else { + NODE_DBG("event timeout. \n"); + // should remove the head of the queue and re-send with DUP = 1 + // Not implemented yet. } } @@ -453,7 +474,7 @@ void mqtt_socket_timer(void *arg) } else if(mud->connState == MQTT_CONNECT_SENT){ // wait for CONACK time out. NODE_DBG("MQTT_CONNECT failed.\n"); } else if(mud->connState == MQTT_DATA){ - msg_queue_t *pending_msg = mud->mqtt_state.pending_msg_q; + msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q)); if(pending_msg){ mud->event_timeout = MQTT_SEND_TIMEOUT; if(mud->secure) @@ -477,7 +498,7 @@ void mqtt_socket_timer(void *arg) } } } - // NODE_DBG("leave mqtt_socket_timer.\n"); + NODE_DBG("leave mqtt_socket_timer.\n"); } // Lua: mqtt.Client(clientid, keepalive, user, pass) @@ -1057,7 +1078,7 @@ static int mqtt_socket_subscribe( lua_State* L ) { NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length); - if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->event_timeout == 0){ + if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){ mud->event_timeout = MQTT_SEND_TIMEOUT; NODE_DBG("Sent: %d\n", node->msg.length); if( mud->secure ) @@ -1073,6 +1094,7 @@ static int mqtt_socket_subscribe( lua_State* L ) { lua_pushboolean(L, 1); // enqueued succeed. } mud->mqtt_state.outbound_message = NULL; + NODE_DBG("subscribe, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("leave mqtt_socket_subscribe.\n"); return 1; } @@ -1133,7 +1155,7 @@ static int mqtt_socket_publish( lua_State* L ) msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos ); - if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->event_timeout == 0){ + if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){ mud->event_timeout = MQTT_SEND_TIMEOUT; NODE_DBG("Sent: %d\n", node->msg.length); if( mud->secure ) @@ -1149,6 +1171,7 @@ static int mqtt_socket_publish( lua_State* L ) lua_pushboolean(L, 1); // enqueued succeed. } mud->mqtt_state.outbound_message = NULL; + NODE_DBG("publish, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("leave mqtt_socket_publish.\n"); return 1; } diff --git a/app/mqtt/msg_queue.c b/app/mqtt/msg_queue.c index afa98d41..1258ad6e 100644 --- a/app/mqtt/msg_queue.c +++ b/app/mqtt/msg_queue.c @@ -58,3 +58,25 @@ msg_queue_t * msg_dequeue(msg_queue_t **head){ node->next = NULL; return node; } + +msg_queue_t * msg_peek(msg_queue_t **head){ + if(!head || !*head){ + return NULL; + } + return *head; // fetch head. +} + +int msg_size(msg_queue_t **head){ + if(!head || !*head){ + return 0; + } + int i = 1; + msg_queue_t *tail = *head; + if(tail){ + while(tail->next!=NULL){ + tail = tail->next; + i++; + } + } + return i; +} diff --git a/app/mqtt/msg_queue.h b/app/mqtt/msg_queue.h index 9da3f6bc..05b910ae 100644 --- a/app/mqtt/msg_queue.h +++ b/app/mqtt/msg_queue.h @@ -18,6 +18,8 @@ typedef struct msg_queue_t { msg_queue_t * msg_enqueue(msg_queue_t **head, mqtt_message_t *msg, uint16_t msg_id, int msg_type, int publish_qos); void msg_destroy(msg_queue_t *node); msg_queue_t * msg_dequeue(msg_queue_t **head); +msg_queue_t * msg_peek(msg_queue_t **head); +int msg_size(msg_queue_t **head); #ifdef __cplusplus } diff --git a/examples/fragment.lua b/examples/fragment.lua index fb938191..2e2e752a 100644 --- a/examples/fragment.lua +++ b/examples/fragment.lua @@ -418,7 +418,7 @@ m:publish("/topic1","hello",0,0) m:publish("/topic3","hello",0,0) m:publish("/topic4","hello",0,0) m:publish("/topic1","hello1",0,0) m:publish("/topic2","hello2",0,0) m:publish("/topic1","hello",1,0) -m:subscribe("/topic3",2,function(m) print("sub done") end) +m:subscribe("/topic3",0,function(m) print("sub done") end) m:publish("/topic3","hello3",2,0) m=mqtt.Client() @@ -450,3 +450,13 @@ end) m:connect("192.168.18.88",1883) m:close() + +m=mqtt.Client() +m:connect("192.168.18.88",1883) +m:on("message",function(m,t,pl) print(t..":") if pl~=nil then print(pl) end end ) +m:subscribe("/topic1",0,function(m) print("sub done") end) +m:publish("/topic1","hello3",2,0) m:publish("/topic1","hello2",2,0) +m:publish("/topic1","hello3",0,0) m:publish("/topic1","hello2",2,0) + +m:subscribe("/topic2",2,function(m) print("sub done") end) +m:publish("/topic2","hello3",0,0) m:publish("/topic2","hello2",2,0)