fix mqtt keepalive ping, add a example for mqtt in lua_examples.

This commit is contained in:
funshine 2015-04-05 02:22:51 +08:00
parent 8558641190
commit bcbde08bf7
4 changed files with 91 additions and 34 deletions

View File

@ -7,6 +7,6 @@
#define NODE_VERSION_INTERNAL 0U #define NODE_VERSION_INTERNAL 0U
#define NODE_VERSION "NodeMCU 0.9.5" #define NODE_VERSION "NodeMCU 0.9.5"
#define BUILD_DATE "build 20150403" #define BUILD_DATE "build 20150405"
#endif /* __USER_VERSION_H__ */ #endif /* __USER_VERSION_H__ */

View File

@ -47,8 +47,6 @@ typedef struct mqtt_state_t
uint16_t port; uint16_t port;
int auto_reconnect; int auto_reconnect;
mqtt_connect_info_t* connect_info; mqtt_connect_info_t* connect_info;
uint8_t* in_buffer;
int in_buffer_length;
uint16_t message_length; uint16_t message_length;
uint16_t message_length_read; uint16_t message_length_read;
mqtt_connection_t mqtt_connection; mqtt_connection_t mqtt_connection;
@ -166,7 +164,6 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length)
NODE_DBG("enter deliver_publish.\n"); NODE_DBG("enter deliver_publish.\n");
if(mud == NULL) if(mud == NULL)
return; return;
const char comma[] = ",";
mqtt_event_data_t event_data; mqtt_event_data_t event_data;
event_data.topic_length = length; event_data.topic_length = length;
@ -181,11 +178,15 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length)
return; return;
if(mud->L == NULL) if(mud->L == NULL)
return; return;
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_message_ref); if(event_data.topic && (event_data.topic_length > 0)){
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_message_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_pushlstring(mud->L, event_data.topic, event_data.topic_length); lua_pushlstring(mud->L, event_data.topic, event_data.topic_length);
if(event_data.data_length > 0){ } else {
NODE_DBG("get wrong packet.\n");
return;
}
if(event_data.data && (event_data.data_length > 0)){
lua_pushlstring(mud->L, event_data.data, event_data.data_length); lua_pushlstring(mud->L, event_data.data, event_data.data_length);
lua_call(mud->L, 3, 0); lua_call(mud->L, 3, 0);
} else { } else {
@ -202,6 +203,9 @@ static void mqtt_socket_received(void *arg, char *pdata, unsigned short len)
uint8_t msg_qos; uint8_t msg_qos;
uint16_t msg_id; uint16_t msg_id;
msg_queue_t *node = NULL; msg_queue_t *node = NULL;
int length = (int)len;
// uint8_t in_buffer[MQTT_BUF_SIZE];
uint8_t *in_buffer = (uint8_t *)pdata;
struct espconn *pesp_conn = arg; struct espconn *pesp_conn = arg;
if(pesp_conn == NULL) if(pesp_conn == NULL)
@ -211,17 +215,17 @@ static void mqtt_socket_received(void *arg, char *pdata, unsigned short len)
return; return;
READPACKET: READPACKET:
if(len > MQTT_BUF_SIZE && len <= 0) if(length > MQTT_BUF_SIZE || length <= 0)
return; return;
c_memcpy(mud->mqtt_state.in_buffer, pdata, len); // c_memcpy(in_buffer, pdata, length);
uint8_t temp_buffer[MQTT_BUF_SIZE]; uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = NULL; mqtt_message_t *temp_msg = NULL;
switch(mud->connState){ switch(mud->connState){
case MQTT_CONNECT_SENDING: case MQTT_CONNECT_SENDING:
case MQTT_CONNECT_SENT: case MQTT_CONNECT_SENT:
if(mqtt_get_type(mud->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK){ if(mqtt_get_type(in_buffer) != MQTT_MSG_TYPE_CONNACK){
NODE_DBG("MQTT: Invalid packet\r\n"); NODE_DBG("MQTT: Invalid packet\r\n");
mud->connState = MQTT_INIT; mud->connState = MQTT_INIT;
if(mud->secure) if(mud->secure)
@ -245,11 +249,11 @@ READPACKET:
break; break;
case MQTT_DATA: case MQTT_DATA:
mud->mqtt_state.message_length_read = len; mud->mqtt_state.message_length_read = length;
mud->mqtt_state.message_length = mqtt_get_total_length(mud->mqtt_state.in_buffer, mud->mqtt_state.message_length_read); mud->mqtt_state.message_length = mqtt_get_total_length(in_buffer, mud->mqtt_state.message_length_read);
msg_type = mqtt_get_type(mud->mqtt_state.in_buffer); msg_type = mqtt_get_type(in_buffer);
msg_qos = mqtt_get_qos(mud->mqtt_state.in_buffer); msg_qos = mqtt_get_qos(in_buffer);
msg_id = mqtt_get_id(mud->mqtt_state.in_buffer, mud->mqtt_state.in_buffer_length); msg_id = mqtt_get_id(in_buffer, mud->mqtt_state.message_length);
msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q)); msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q));
@ -295,7 +299,7 @@ READPACKET:
if(msg_qos == 1 || msg_qos == 2){ if(msg_qos == 1 || msg_qos == 2){
NODE_DBG("MQTT: Queue response QoS: %d\r\n", msg_qos); NODE_DBG("MQTT: Queue response QoS: %d\r\n", msg_qos);
} }
deliver_publish(mud, mud->mqtt_state.in_buffer, mud->mqtt_state.message_length_read); deliver_publish(mud, in_buffer, mud->mqtt_state.message_length);
break; break;
case MQTT_MSG_TYPE_PUBACK: case MQTT_MSG_TYPE_PUBACK:
if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){ if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){
@ -365,11 +369,11 @@ READPACKET:
if(msg_type == MQTT_MSG_TYPE_PUBLISH) if(msg_type == MQTT_MSG_TYPE_PUBLISH)
{ {
len = mud->mqtt_state.message_length_read; length = mud->mqtt_state.message_length_read;
if(mud->mqtt_state.message_length < mud->mqtt_state.message_length_read) if(mud->mqtt_state.message_length < mud->mqtt_state.message_length_read)
{ {
len -= mud->mqtt_state.message_length; length -= mud->mqtt_state.message_length;
pdata += mud->mqtt_state.message_length; pdata += mud->mqtt_state.message_length;
NODE_DBG("Get another published message\r\n"); NODE_DBG("Get another published message\r\n");
@ -386,8 +390,8 @@ READPACKET:
espconn_secure_sent( pesp_conn, node->msg.data, node->msg.length ); espconn_secure_sent( pesp_conn, node->msg.data, node->msg.length );
else else
espconn_sent( pesp_conn, node->msg.data, node->msg.length ); espconn_sent( pesp_conn, node->msg.data, node->msg.length );
mud->keep_alive_tick = 0;
} }
mud->keep_alive_tick = 0;
NODE_DBG("receive, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("receive, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
NODE_DBG("leave mqtt_socket_received.\n"); NODE_DBG("leave mqtt_socket_received.\n");
return; return;
@ -406,6 +410,7 @@ static void mqtt_socket_sent(void *arg)
return; return;
// call mqtt_sent() // call mqtt_sent()
mud->event_timeout = 0; mud->event_timeout = 0;
mud->keep_alive_tick = 0;
if(mud->connState == MQTT_CONNECT_SENDING){ if(mud->connState == MQTT_CONNECT_SENDING){
mud->connState = MQTT_CONNECT_SENT; mud->connState = MQTT_CONNECT_SENT;
@ -430,7 +435,7 @@ static void mqtt_socket_sent(void *arg)
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) { } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
} else if(node && node->msg_type == MQTT_MSG_TYPE_PINGRESP) { } else if(node && node->msg_type == MQTT_MSG_TYPE_PINGREQ) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
} }
NODE_DBG("sent2, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("sent2, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
@ -462,6 +467,7 @@ static void mqtt_socket_connected(void *arg)
espconn_secure_sent(pesp_conn, temp_msg->data, temp_msg->length); espconn_secure_sent(pesp_conn, temp_msg->data, temp_msg->length);
else else
espconn_sent(pesp_conn, temp_msg->data, temp_msg->length); espconn_sent(pesp_conn, temp_msg->data, temp_msg->length);
mud->keep_alive_tick = 0;
mud->connState = MQTT_CONNECT_SENDING; mud->connState = MQTT_CONNECT_SENDING;
NODE_DBG("leave mqtt_socket_connected.\n"); NODE_DBG("leave mqtt_socket_connected.\n");
@ -625,8 +631,7 @@ static int mqtt_socket_client( lua_State* L )
mud->connect_info.client_id = (uint8_t *)c_zalloc(idl+1); mud->connect_info.client_id = (uint8_t *)c_zalloc(idl+1);
mud->connect_info.username = (uint8_t *)c_zalloc(unl + 1); mud->connect_info.username = (uint8_t *)c_zalloc(unl + 1);
mud->connect_info.password = (uint8_t *)c_zalloc(pwl + 1); mud->connect_info.password = (uint8_t *)c_zalloc(pwl + 1);
mud->mqtt_state.in_buffer = (uint8_t *)c_zalloc(MQTT_BUF_SIZE); if(!mud->connect_info.client_id || !mud->connect_info.username || !mud->connect_info.password){
if(!mud->connect_info.client_id || !mud->connect_info.username || !mud->connect_info.password || !mud->mqtt_state.in_buffer){
if(mud->connect_info.client_id) { if(mud->connect_info.client_id) {
c_free(mud->connect_info.client_id); c_free(mud->connect_info.client_id);
mud->connect_info.client_id = NULL; mud->connect_info.client_id = NULL;
@ -638,10 +643,6 @@ static int mqtt_socket_client( lua_State* L )
if(mud->connect_info.password) { if(mud->connect_info.password) {
c_free(mud->connect_info.password); c_free(mud->connect_info.password);
mud->connect_info.password = NULL; mud->connect_info.password = NULL;
}
if(mud->mqtt_state.in_buffer) {
c_free(mud->mqtt_state.in_buffer);
mud->mqtt_state.in_buffer = NULL;
} }
return luaL_error(L, "not enough memory"); return luaL_error(L, "not enough memory");
} }
@ -660,9 +661,8 @@ static int mqtt_socket_client( lua_State* L )
mud->connect_info.will_retain = 0; mud->connect_info.will_retain = 0;
mud->connect_info.keepalive = keepalive; mud->connect_info.keepalive = keepalive;
mud->mqtt_state.in_buffer_length = MQTT_BUF_SIZE;
mud->mqtt_state.pending_msg_q = NULL; mud->mqtt_state.pending_msg_q = NULL;
mud->mqtt_state.auto_reconnect = 1; mud->mqtt_state.auto_reconnect = 0;
mud->mqtt_state.port = 1883; mud->mqtt_state.port = 1883;
mud->mqtt_state.connect_info = &mud->connect_info; mud->mqtt_state.connect_info = &mud->connect_info;
@ -722,10 +722,6 @@ static int mqtt_delete( lua_State* L )
c_free(mud->connect_info.password); c_free(mud->connect_info.password);
mud->connect_info.password = NULL; mud->connect_info.password = NULL;
} }
if(mud->mqtt_state.in_buffer){
c_free(mud->mqtt_state.in_buffer);
mud->mqtt_state.in_buffer = NULL;
}
// ------- // -------
// free (unref) callback ref // free (unref) callback ref

View File

@ -484,3 +484,34 @@ gpio.mode(1,gpio.OUTPUT,gpio.PULLUP)
gpio.serout(1,0,{20,10,10,20,10,10,10,100}) -- sim uart one byte 0x5A at about 100kbps gpio.serout(1,0,{20,10,10,20,10,10,10,100}) -- sim uart one byte 0x5A at about 100kbps
gpio.serout(1,1,{8,18},8) -- serial 30% pwm 38k, lasts 8 cycles gpio.serout(1,1,{8,18},8) -- serial 30% pwm 38k, lasts 8 cycles
-- Lua: mqtt.Client(clientid, keepalive, user, pass)
-- test with cloudmqtt.com
m_dis={}
function dispatch(m,t,pl)
if pl~=nil and m_dis[t] then
m_dis[t](pl)
end
end
function topic1func(pl)
print("get1: "..pl)
end
function topic2func(pl)
print("get2: "..pl)
end
m_dis["/topic1"]=topic1func
m_dis["/topic2"]=topic2func
m=mqtt.Client("nodemcu1",60,"test","test123")
m:on("connect",function(m)
print("connection "..node.heap())
m:subscribe("/topic1",0,function(m) print("sub done") end)
m:subscribe("/topic2",0,function(m) print("sub done") end)
m:publish("/topic1","hello",0,0) m:publish("/topic2","world",0,0)
end )
m:on("offline", function(conn)
print("disconnect to broker...")
print(node.heap())
end)
m:on("message",dispatch )
m:connect("m11.cloudmqtt.com",11214,0,1)
-- Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client) )

View File

@ -0,0 +1,30 @@
-- Lua: mqtt.Client(clientid, keepalive, user, pass)
-- test with cloudmqtt.com
m_dis={}
function dispatch(m,t,pl)
if pl~=nil and m_dis[t] then
m_dis[t](pl)
end
end
function topic1func(pl)
print("get1: "..pl)
end
function topic2func(pl)
print("get2: "..pl)
end
m_dis["/topic1"]=topic1func
m_dis["/topic2"]=topic2func
m=mqtt.Client("nodemcu1",60,"test","test123")
m:on("connect",function(m)
print("connection "..node.heap())
m:subscribe("/topic1",0,function(m) print("sub done") end)
m:subscribe("/topic2",0,function(m) print("sub done") end)
m:publish("/topic1","hello",0,0) m:publish("/topic2","world",0,0)
end )
m:on("offline", function(conn)
print("disconnect to broker...")
print(node.heap())
end)
m:on("message",dispatch )
m:connect("m11.cloudmqtt.com",11214,0,1)
-- Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client) )