From bcbde08bf70b703e32aa43bdc6a76102a50944d1 Mon Sep 17 00:00:00 2001 From: funshine Date: Sun, 5 Apr 2015 02:22:51 +0800 Subject: [PATCH] fix mqtt keepalive ping, add a example for mqtt in lua_examples. --- app/include/user_version.h | 2 +- app/modules/mqtt.c | 62 ++++++++++++++++++-------------------- examples/fragment.lua | 31 +++++++++++++++++++ lua_examples/mqtt_test.lua | 30 ++++++++++++++++++ 4 files changed, 91 insertions(+), 34 deletions(-) create mode 100644 lua_examples/mqtt_test.lua diff --git a/app/include/user_version.h b/app/include/user_version.h index 06bce33b..e9357259 100644 --- a/app/include/user_version.h +++ b/app/include/user_version.h @@ -7,6 +7,6 @@ #define NODE_VERSION_INTERNAL 0U #define NODE_VERSION "NodeMCU 0.9.5" -#define BUILD_DATE "build 20150403" +#define BUILD_DATE "build 20150405" #endif /* __USER_VERSION_H__ */ diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index ce303af2..c7afd0ff 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -47,8 +47,6 @@ typedef struct mqtt_state_t uint16_t port; int auto_reconnect; mqtt_connect_info_t* connect_info; - uint8_t* in_buffer; - int in_buffer_length; uint16_t message_length; uint16_t message_length_read; mqtt_connection_t mqtt_connection; @@ -166,7 +164,6 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length) NODE_DBG("enter deliver_publish.\n"); if(mud == NULL) return; - const char comma[] = ","; mqtt_event_data_t event_data; event_data.topic_length = length; @@ -181,11 +178,15 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length) return; if(mud->L == NULL) return; - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_message_ref); - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua - - lua_pushlstring(mud->L, event_data.topic, event_data.topic_length); - if(event_data.data_length > 0){ + if(event_data.topic && (event_data.topic_length > 0)){ + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_message_ref); + lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua + lua_pushlstring(mud->L, event_data.topic, event_data.topic_length); + } else { + NODE_DBG("get wrong packet.\n"); + return; + } + if(event_data.data && (event_data.data_length > 0)){ lua_pushlstring(mud->L, event_data.data, event_data.data_length); lua_call(mud->L, 3, 0); } else { @@ -202,6 +203,9 @@ static void mqtt_socket_received(void *arg, char *pdata, unsigned short len) uint8_t msg_qos; uint16_t msg_id; msg_queue_t *node = NULL; + int length = (int)len; + // uint8_t in_buffer[MQTT_BUF_SIZE]; + uint8_t *in_buffer = (uint8_t *)pdata; struct espconn *pesp_conn = arg; if(pesp_conn == NULL) @@ -211,17 +215,17 @@ static void mqtt_socket_received(void *arg, char *pdata, unsigned short len) return; READPACKET: - if(len > MQTT_BUF_SIZE && len <= 0) + if(length > MQTT_BUF_SIZE || length <= 0) return; - c_memcpy(mud->mqtt_state.in_buffer, pdata, len); + // c_memcpy(in_buffer, pdata, length); uint8_t temp_buffer[MQTT_BUF_SIZE]; mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); mqtt_message_t *temp_msg = NULL; switch(mud->connState){ case MQTT_CONNECT_SENDING: case MQTT_CONNECT_SENT: - if(mqtt_get_type(mud->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK){ + if(mqtt_get_type(in_buffer) != MQTT_MSG_TYPE_CONNACK){ NODE_DBG("MQTT: Invalid packet\r\n"); mud->connState = MQTT_INIT; if(mud->secure) @@ -245,11 +249,11 @@ READPACKET: break; case MQTT_DATA: - mud->mqtt_state.message_length_read = len; - mud->mqtt_state.message_length = mqtt_get_total_length(mud->mqtt_state.in_buffer, mud->mqtt_state.message_length_read); - msg_type = mqtt_get_type(mud->mqtt_state.in_buffer); - msg_qos = mqtt_get_qos(mud->mqtt_state.in_buffer); - msg_id = mqtt_get_id(mud->mqtt_state.in_buffer, mud->mqtt_state.in_buffer_length); + mud->mqtt_state.message_length_read = length; + mud->mqtt_state.message_length = mqtt_get_total_length(in_buffer, mud->mqtt_state.message_length_read); + msg_type = mqtt_get_type(in_buffer); + msg_qos = mqtt_get_qos(in_buffer); + msg_id = mqtt_get_id(in_buffer, mud->mqtt_state.message_length); msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q)); @@ -295,7 +299,7 @@ READPACKET: if(msg_qos == 1 || msg_qos == 2){ NODE_DBG("MQTT: Queue response QoS: %d\r\n", msg_qos); } - deliver_publish(mud, mud->mqtt_state.in_buffer, mud->mqtt_state.message_length_read); + deliver_publish(mud, in_buffer, mud->mqtt_state.message_length); break; case MQTT_MSG_TYPE_PUBACK: if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){ @@ -365,11 +369,11 @@ READPACKET: if(msg_type == MQTT_MSG_TYPE_PUBLISH) { - len = mud->mqtt_state.message_length_read; + length = mud->mqtt_state.message_length_read; if(mud->mqtt_state.message_length < mud->mqtt_state.message_length_read) { - len -= mud->mqtt_state.message_length; + length -= mud->mqtt_state.message_length; pdata += mud->mqtt_state.message_length; NODE_DBG("Get another published message\r\n"); @@ -386,8 +390,8 @@ READPACKET: espconn_secure_sent( pesp_conn, node->msg.data, node->msg.length ); else espconn_sent( pesp_conn, node->msg.data, node->msg.length ); - mud->keep_alive_tick = 0; } + mud->keep_alive_tick = 0; NODE_DBG("receive, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("leave mqtt_socket_received.\n"); return; @@ -406,6 +410,7 @@ static void mqtt_socket_sent(void *arg) return; // call mqtt_sent() mud->event_timeout = 0; + mud->keep_alive_tick = 0; if(mud->connState == MQTT_CONNECT_SENDING){ mud->connState = MQTT_CONNECT_SENT; @@ -430,7 +435,7 @@ static void mqtt_socket_sent(void *arg) msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) { msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); - } else if(node && node->msg_type == MQTT_MSG_TYPE_PINGRESP) { + } else if(node && node->msg_type == MQTT_MSG_TYPE_PINGREQ) { msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); } NODE_DBG("sent2, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); @@ -462,6 +467,7 @@ static void mqtt_socket_connected(void *arg) espconn_secure_sent(pesp_conn, temp_msg->data, temp_msg->length); else espconn_sent(pesp_conn, temp_msg->data, temp_msg->length); + mud->keep_alive_tick = 0; mud->connState = MQTT_CONNECT_SENDING; NODE_DBG("leave mqtt_socket_connected.\n"); @@ -625,8 +631,7 @@ static int mqtt_socket_client( lua_State* L ) mud->connect_info.client_id = (uint8_t *)c_zalloc(idl+1); mud->connect_info.username = (uint8_t *)c_zalloc(unl + 1); mud->connect_info.password = (uint8_t *)c_zalloc(pwl + 1); - mud->mqtt_state.in_buffer = (uint8_t *)c_zalloc(MQTT_BUF_SIZE); - if(!mud->connect_info.client_id || !mud->connect_info.username || !mud->connect_info.password || !mud->mqtt_state.in_buffer){ + if(!mud->connect_info.client_id || !mud->connect_info.username || !mud->connect_info.password){ if(mud->connect_info.client_id) { c_free(mud->connect_info.client_id); mud->connect_info.client_id = NULL; @@ -638,10 +643,6 @@ static int mqtt_socket_client( lua_State* L ) if(mud->connect_info.password) { c_free(mud->connect_info.password); mud->connect_info.password = NULL; - } - if(mud->mqtt_state.in_buffer) { - c_free(mud->mqtt_state.in_buffer); - mud->mqtt_state.in_buffer = NULL; } return luaL_error(L, "not enough memory"); } @@ -660,9 +661,8 @@ static int mqtt_socket_client( lua_State* L ) mud->connect_info.will_retain = 0; mud->connect_info.keepalive = keepalive; - mud->mqtt_state.in_buffer_length = MQTT_BUF_SIZE; mud->mqtt_state.pending_msg_q = NULL; - mud->mqtt_state.auto_reconnect = 1; + mud->mqtt_state.auto_reconnect = 0; mud->mqtt_state.port = 1883; mud->mqtt_state.connect_info = &mud->connect_info; @@ -722,10 +722,6 @@ static int mqtt_delete( lua_State* L ) c_free(mud->connect_info.password); mud->connect_info.password = NULL; } - if(mud->mqtt_state.in_buffer){ - c_free(mud->mqtt_state.in_buffer); - mud->mqtt_state.in_buffer = NULL; - } // ------- // free (unref) callback ref diff --git a/examples/fragment.lua b/examples/fragment.lua index 157d8c41..3f4ee2d0 100644 --- a/examples/fragment.lua +++ b/examples/fragment.lua @@ -484,3 +484,34 @@ gpio.mode(1,gpio.OUTPUT,gpio.PULLUP) gpio.serout(1,0,{20,10,10,20,10,10,10,100}) -- sim uart one byte 0x5A at about 100kbps gpio.serout(1,1,{8,18},8) -- serial 30% pwm 38k, lasts 8 cycles + +-- Lua: mqtt.Client(clientid, keepalive, user, pass) +-- test with cloudmqtt.com +m_dis={} +function dispatch(m,t,pl) + if pl~=nil and m_dis[t] then + m_dis[t](pl) + end +end +function topic1func(pl) + print("get1: "..pl) +end +function topic2func(pl) + print("get2: "..pl) +end +m_dis["/topic1"]=topic1func +m_dis["/topic2"]=topic2func +m=mqtt.Client("nodemcu1",60,"test","test123") +m:on("connect",function(m) + print("connection "..node.heap()) + m:subscribe("/topic1",0,function(m) print("sub done") end) + m:subscribe("/topic2",0,function(m) print("sub done") end) + m:publish("/topic1","hello",0,0) m:publish("/topic2","world",0,0) + end ) +m:on("offline", function(conn) + print("disconnect to broker...") + print(node.heap()) +end) +m:on("message",dispatch ) +m:connect("m11.cloudmqtt.com",11214,0,1) +-- Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client) ) diff --git a/lua_examples/mqtt_test.lua b/lua_examples/mqtt_test.lua new file mode 100644 index 00000000..d27c6a93 --- /dev/null +++ b/lua_examples/mqtt_test.lua @@ -0,0 +1,30 @@ +-- Lua: mqtt.Client(clientid, keepalive, user, pass) +-- test with cloudmqtt.com +m_dis={} +function dispatch(m,t,pl) + if pl~=nil and m_dis[t] then + m_dis[t](pl) + end +end +function topic1func(pl) + print("get1: "..pl) +end +function topic2func(pl) + print("get2: "..pl) +end +m_dis["/topic1"]=topic1func +m_dis["/topic2"]=topic2func +m=mqtt.Client("nodemcu1",60,"test","test123") +m:on("connect",function(m) + print("connection "..node.heap()) + m:subscribe("/topic1",0,function(m) print("sub done") end) + m:subscribe("/topic2",0,function(m) print("sub done") end) + m:publish("/topic1","hello",0,0) m:publish("/topic2","world",0,0) + end ) +m:on("offline", function(conn) + print("disconnect to broker...") + print(node.heap()) +end) +m:on("message",dispatch ) +m:connect("m11.cloudmqtt.com",11214,0,1) +-- Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client) )