From dcb6e53af7b534b1d679339949922cc09bffa286 Mon Sep 17 00:00:00 2001 From: funshine Date: Tue, 31 Mar 2015 00:36:44 +0800 Subject: [PATCH] add auto-reconnect option to mqtt:connect api --- README.md | 8 +- app/include/user_version.h | 2 +- app/modules/mqtt.c | 421 +++++++++++++++++++++---------------- examples/fragment.lua | 24 +++ 4 files changed, 275 insertions(+), 180 deletions(-) diff --git a/README.md b/README.md index 90ec76e3..13a817da 100644 --- a/README.md +++ b/README.md @@ -225,8 +225,10 @@ m:on("message", function(conn, topic, data) end end) --- for secure: m:connect("192.168.11.118", 1880, 1) -m:connect("192.168.11.118", 1880, 0, function(conn) print("connected") end) +-- m:connect( host, port, secure, auto_reconnect, function(client) ) +-- for secure: m:connect("192.168.11.118", 1880, 1, 0) +-- for auto-reconnect: m:connect("192.168.11.118", 1880, 0, 1) +m:connect("192.168.11.118", 1880, 0, 0, function(conn) print("connected") end) -- subscribe topic with qos = 0 m:subscribe("/topic",0, function(conn) print("subscribe success") end) @@ -235,7 +237,7 @@ m:subscribe("/topic",0, function(conn) print("subscribe success") end) -- publish a message with data = hello, QoS = 0, retain = 0 m:publish("/topic","hello",0,0, function(conn) print("sent") end) -m:close(); +m:close(); -- if auto-reconnect = 1, will reconnect. -- you can call m:connect again ``` diff --git a/app/include/user_version.h b/app/include/user_version.h index 89a63fc3..79d104c5 100644 --- a/app/include/user_version.h +++ b/app/include/user_version.h @@ -7,6 +7,6 @@ #define NODE_VERSION_INTERNAL 0U #define NODE_VERSION "NodeMCU 0.9.5" -#define BUILD_DATE "build 20150330" +#define BUILD_DATE "build 20150331" #endif /* __USER_VERSION_H__ */ diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index db9990c5..2cc3ad6d 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -25,6 +25,7 @@ static lua_State *gL = NULL; #define MQTT_MAX_USER_LEN 64 #define MQTT_MAX_PASS_LEN 64 #define MQTT_SEND_TIMEOUT 5 +#define MQTT_CONNECT_TIMEOUT 5 typedef enum { MQTT_INIT, @@ -71,16 +72,64 @@ typedef struct lmqtt_userdata mqtt_state_t mqtt_state; mqtt_connect_info_t connect_info; uint32_t keep_alive_tick; - uint32_t send_timeout; + uint32_t event_timeout; uint8_t secure; bool connected; // indicate socket connected, not mqtt prot connected. ETSTimer mqttTimer; tConnState connState; }lmqtt_userdata; +static void socket_connect(struct espconn *pesp_conn); + static void mqtt_socket_disconnected(void *arg) // tcp only { - NODE_DBG("mqtt_socket_disconnected is called.\n"); + NODE_DBG("enter mqtt_socket_disconnected.\n"); + struct espconn *pesp_conn = arg; + bool call_back = false; + if(pesp_conn == NULL) + return; + lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse; + if(mud == NULL) + return; + + os_timer_disarm(&mud->mqttTimer); + + if(mud->pesp_conn){ + mud->pesp_conn->reverse = NULL; + if(mud->pesp_conn->proto.tcp) + c_free(mud->pesp_conn->proto.tcp); + mud->pesp_conn->proto.tcp = NULL; + c_free(mud->pesp_conn); + mud->pesp_conn = NULL; + } + + if(mud->connected){ // call back only called when socket is from connection to disconnection. + mud->connected = false; + if((mud->cb_disconnect_ref != LUA_NOREF) && (mud->self_ref != LUA_NOREF)) { + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua + call_back = true; + } + } + + lua_gc(gL, LUA_GCSTOP, 0); + if(mud->self_ref != LUA_NOREF){ // TODO: should we unref the client and delete it? + luaL_unref(gL, LUA_REGISTRYINDEX, mud->self_ref); + mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self + } + lua_gc(gL, LUA_GCRESTART, 0); + + if(call_back){ + lua_call(gL, 1, 0); + } + + NODE_DBG("leave mqtt_socket_disconnected.\n"); +} + +static void mqtt_socket_reconnected(void *arg, sint8_t err) +{ + NODE_DBG("enter mqtt_socket_reconnected.\n"); + // mqtt_socket_disconnected(arg); struct espconn *pesp_conn = arg; if(pesp_conn == NULL) return; @@ -88,44 +137,21 @@ static void mqtt_socket_disconnected(void *arg) // tcp only if(mud == NULL) return; - if(mud->connected){ - mud->connected = false; - if(mud->pesp_conn && mud->pesp_conn->proto.tcp) - c_free(mud->pesp_conn->proto.tcp); - mud->pesp_conn->proto.tcp = NULL; - if(mud->pesp_conn) - c_free(mud->pesp_conn); - mud->pesp_conn = NULL; // espconn is already disconnected - lua_gc(gL, LUA_GCSTOP, 0); - if(mud->self_ref != LUA_NOREF){ // TODO: should we unref the client and delete it? - luaL_unref(gL, LUA_REGISTRYINDEX, mud->self_ref); - mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self - } - lua_gc(gL, LUA_GCRESTART, 0); - } + pesp_conn->proto.tcp->remote_port = mud->mqtt_state.port; + pesp_conn->proto.tcp->local_port = espconn_port(); - mud->connected = false; os_timer_disarm(&mud->mqttTimer); - - if(mud->cb_disconnect_ref != LUA_NOREF) - { - lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); - if( mud->self_ref != LUA_NOREF) - lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua - else - lua_pushnil(gL); - lua_call(gL, 1, 0); + if( (mud->event_timeout != 0) || mud->mqtt_state.auto_reconnect ){ + socket_connect(pesp_conn); + } else { + mqtt_socket_disconnected(arg); } -} - -static void mqtt_socket_reconnected(void *arg, sint8_t err) -{ - NODE_DBG("mqtt_socket_reconnected is called.\n"); - mqtt_socket_disconnected(arg); + NODE_DBG("leave mqtt_socket_reconnected.\n"); } static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length) { + NODE_DBG("enter deliver_publish.\n"); const char comma[] = ","; mqtt_event_data_t event_data; @@ -149,11 +175,12 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length) } else { lua_call(gL, 2, 0); } + NODE_DBG("leave deliver_publish.\n"); } static void mqtt_socket_received(void *arg, char *pdata, unsigned short len) { - NODE_DBG("mqtt_socket_received is called.\n"); + NODE_DBG("enter mqtt_socket_received.\n"); uint8_t msg_type; uint8_t msg_qos; @@ -179,12 +206,10 @@ READPACKET: if(mqtt_get_type(mud->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK){ NODE_DBG("MQTT: Invalid packet\r\n"); mud->connState = MQTT_INIT; - if(mud->secure){ + if(mud->secure) espconn_secure_disconnect(pesp_conn); - } - else { + else espconn_disconnect(pesp_conn); - } } else { mud->connState = MQTT_DATA; NODE_DBG("MQTT: Connected\r\n"); @@ -320,8 +345,8 @@ READPACKET: break; } - if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->send_timeout == 0){ - mud->send_timeout = MQTT_SEND_TIMEOUT; + if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->event_timeout == 0){ + mud->event_timeout = MQTT_SEND_TIMEOUT; NODE_DBG("Sent: %d\n", node->msg.length); if( mud->secure ) espconn_secure_sent( pesp_conn, node->msg.data, node->msg.length ); @@ -330,13 +355,13 @@ READPACKET: mud->keep_alive_tick = 0; mud->mqtt_state.outbound_message = NULL; } - + NODE_DBG("leave mqtt_socket_received.\n"); return; } static void mqtt_socket_sent(void *arg) { - // NODE_DBG("mqtt_socket_sent is called.\n"); + NODE_DBG("enter mqtt_socket_sent.\n"); struct espconn *pesp_conn = arg; if(pesp_conn == NULL) return; @@ -350,7 +375,7 @@ static void mqtt_socket_sent(void *arg) } // call mqtt_sent() - mud->send_timeout = 0; + mud->event_timeout = 0; // qos = 0, publish and forgot. msg_queue_t *node = mud->mqtt_state.pending_msg_q; @@ -364,12 +389,12 @@ static void mqtt_socket_sent(void *arg) lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua lua_call(gL, 1, 0); } + NODE_DBG("leave mqtt_socket_sent.\n"); } -static int mqtt_socket_client( lua_State* L ); static void mqtt_socket_connected(void *arg) { - NODE_DBG("mqtt_socket_connected is called.\n"); + NODE_DBG("enter mqtt_socket_connected.\n"); struct espconn *pesp_conn = arg; if(pesp_conn == NULL) return; @@ -385,26 +410,30 @@ static void mqtt_socket_connected(void *arg) mqtt_msg_init(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.out_buffer, mud->mqtt_state.out_buffer_length); mud->mqtt_state.outbound_message = mqtt_msg_connect(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.connect_info); NODE_DBG("Send MQTT connection infomation, data len: %d, d[0]=%d \r\n", mud->mqtt_state.outbound_message->length, mud->mqtt_state.outbound_message->data[0]); - if(mud->secure){ + mud->event_timeout = MQTT_SEND_TIMEOUT; + if(mud->secure) espconn_secure_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); - } else - { espconn_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); - } mud->mqtt_state.outbound_message = NULL; mud->connState = MQTT_CONNECT_SENDING; + NODE_DBG("leave mqtt_socket_connected.\n"); return; } void mqtt_socket_timer(void *arg) { + // NODE_DBG("enter mqtt_socket_timer.\n"); lmqtt_userdata *mud = (lmqtt_userdata*) arg; if(mud == NULL) return; - if(mud->send_timeout > 0){ - mud->send_timeout --; + if(mud->event_timeout > 0){ + NODE_DBG("event_timeout: %d.\n", mud->event_timeout); + mud->event_timeout --; + if(mud->event_timeout > 0){ + return; + } } if(mud->pesp_conn == NULL){ @@ -412,50 +441,49 @@ void mqtt_socket_timer(void *arg) return; } - if(mud->send_timeout == 0){ // switch to next queued event. - if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT time out. - mud->connState = MQTT_INIT; - if(mud->secure){ - espconn_secure_disconnect(mud->pesp_conn); - } - else { - espconn_disconnect(mud->pesp_conn); - } - mud->keep_alive_tick = 0; // not need count anymore - } else if(mud->connState == MQTT_DATA){ - msg_queue_t *pending_msg = mud->mqtt_state.pending_msg_q; - if(pending_msg){ - mud->send_timeout = MQTT_SEND_TIMEOUT; - if(mud->secure) - espconn_secure_sent(mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length); - else - espconn_sent(mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length); - mud->keep_alive_tick = 0; - NODE_DBG("id: %d - qos: %d, length: %d\n", pending_msg->msg_id, pending_msg->publish_qos, pending_msg->msg.length); - } - // no queued event. - } - } - - if(mud->connState == MQTT_DATA){ - mud->keep_alive_tick ++; - if(mud->keep_alive_tick > mud->mqtt_state.connect_info->keepalive){ - mud->send_timeout = MQTT_SEND_TIMEOUT; - NODE_DBG("\r\nMQTT: Send keepalive packet\r\n"); - mud->mqtt_state.outbound_message = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection); + if(mud->connState == MQTT_INIT){ // socket connect time out. + NODE_DBG("Can not connect to broker.\n"); + } else if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT send time out. + mud->connState = MQTT_INIT; + if(mud->secure) + espconn_secure_disconnect(mud->pesp_conn); + else + espconn_disconnect(mud->pesp_conn); + mud->keep_alive_tick = 0; // not need count anymore + } else if(mud->connState == MQTT_CONNECT_SENT){ // wait for CONACK time out. + NODE_DBG("MQTT_CONNECT failed.\n"); + } else if(mud->connState == MQTT_DATA){ + msg_queue_t *pending_msg = mud->mqtt_state.pending_msg_q; + if(pending_msg){ + mud->event_timeout = MQTT_SEND_TIMEOUT; if(mud->secure) - espconn_secure_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + espconn_secure_sent(mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length); else - espconn_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + espconn_sent(mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length); mud->keep_alive_tick = 0; + NODE_DBG("id: %d - qos: %d, length: %d\n", pending_msg->msg_id, pending_msg->publish_qos, pending_msg->msg.length); + } else { + // no queued event. + mud->keep_alive_tick ++; + if(mud->keep_alive_tick > mud->mqtt_state.connect_info->keepalive){ + mud->event_timeout = MQTT_SEND_TIMEOUT; + NODE_DBG("\r\nMQTT: Send keepalive packet\r\n"); + mud->mqtt_state.outbound_message = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection); + if(mud->secure) + espconn_secure_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + else + espconn_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + mud->keep_alive_tick = 0; + } } } + // NODE_DBG("leave mqtt_socket_timer.\n"); } // Lua: mqtt.Client(clientid, keepalive, user, pass) static int mqtt_socket_client( lua_State* L ) { - NODE_DBG("mqtt_socket_client is called.\n"); + NODE_DBG("enter mqtt_socket_client.\n"); lmqtt_userdata *mud; char tempid[20] = {0}; @@ -485,8 +513,9 @@ static int mqtt_socket_client( lua_State* L ) mud->secure = 0; mud->keep_alive_tick = 0; - mud->send_timeout = 0; + mud->event_timeout = 0; mud->connState = MQTT_INIT; + mud->connected = false; c_memset(&mud->mqttTimer, 0, sizeof(ETSTimer)); c_memset(&mud->mqtt_state, 0, sizeof(mqtt_state_t)); c_memset(&mud->connect_info, 0, sizeof(mqtt_connect_info_t)); @@ -576,8 +605,11 @@ static int mqtt_socket_client( lua_State* L ) mud->mqtt_state.in_buffer_length = MQTT_BUF_SIZE; mud->mqtt_state.out_buffer_length = MQTT_BUF_SIZE; mud->mqtt_state.pending_msg_q = NULL; + mud->mqtt_state.auto_reconnect = 1; + mud->mqtt_state.port = 1883; mud->mqtt_state.connect_info = &mud->connect_info; + NODE_DBG("leave mqtt_socket_client.\n"); return 1; } @@ -586,7 +618,7 @@ static int mqtt_socket_client( lua_State* L ) // socket: unref everything static int mqtt_delete( lua_State* L ) { - NODE_DBG("mqtt_delete is called.\n"); + NODE_DBG("enter mqtt_delete.\n"); lmqtt_userdata *mud = (lmqtt_userdata *)luaL_checkudata(L, 1, "mqtt.socket"); luaL_argcheck(L, mud, 1, "mqtt.socket expected"); @@ -670,26 +702,27 @@ static int mqtt_delete( lua_State* L ) mud->self_ref = LUA_NOREF; } lua_gc(gL, LUA_GCRESTART, 0); + NODE_DBG("leave mqtt_delete.\n"); return 0; } static void socket_connect(struct espconn *pesp_conn) { + NODE_DBG("enter socket_connect.\n"); if(pesp_conn == NULL) return; lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse; if(mud == NULL) return; - if(mud->secure){ + if(mud->secure) espconn_secure_connect(pesp_conn); - } else - { espconn_connect(pesp_conn); - } + + os_timer_arm(&mud->mqttTimer, 1000, 1); - NODE_DBG("socket_connect is called.\n"); + NODE_DBG("leave socket_connect.\n"); } static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg); @@ -697,7 +730,7 @@ static dns_reconn_count = 0; static ip_addr_t host_ip; // for dns static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) { - NODE_DBG("socket_dns_found is called.\n"); + NODE_DBG("enter socket_dns_found.\n"); struct espconn *pesp_conn = arg; if(pesp_conn == NULL){ NODE_DBG("pesp_conn null.\n"); @@ -729,86 +762,20 @@ static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) NODE_DBG("\n"); socket_connect(pesp_conn); } + NODE_DBG("leave socket_dns_found.\n"); } -// Lua: mqtt:lwt( topic, message, qos, retain, function(client) ) -static int mqtt_socket_lwt( lua_State* L ) -{ - uint8_t stack = 1; - size_t topicSize, msgSize; - NODE_DBG("mqtt_socket_lwt.\n"); - lmqtt_userdata *mud = NULL; - const char *lwtTopic, *lwtMsg; - uint8_t lwtQoS, lwtRetain; - - mud = (lmqtt_userdata *)luaL_checkudata( L, stack, "mqtt.socket" ); - luaL_argcheck( L, mud, stack, "mqtt.socket expected" ); - - if(mud == NULL) - return 0; - - stack++; - lwtTopic = luaL_checklstring( L, stack, &topicSize ); - if (lwtTopic == NULL) - { - return luaL_error( L, "need lwt topic"); - } - - stack++; - lwtMsg = luaL_checklstring( L, stack, &msgSize ); - if (lwtMsg == NULL) - { - return luaL_error( L, "need lwt message"); - } - - mud->connect_info.will_topic = (uint8_t*) c_zalloc( topicSize + 1 ); - mud->connect_info.will_message = (uint8_t*) c_zalloc( msgSize + 1 ); - if(!mud->connect_info.will_topic || !mud->connect_info.will_message){ - if(mud->connect_info.will_topic){ - c_free(mud->connect_info.will_topic); - mud->connect_info.will_topic = NULL; - } - if(mud->connect_info.will_message){ - c_free(mud->connect_info.will_message); - mud->connect_info.will_message = NULL; - } - return luaL_error( L, "not enough memory"); - } - c_memcpy(mud->connect_info.will_topic, lwtTopic, topicSize); - mud->connect_info.will_topic[topicSize] = 0; - c_memcpy(mud->connect_info.will_message, lwtMsg, msgSize); - mud->connect_info.will_message[msgSize] = 0; - - if ( lua_isnumber(L, stack) ) - { - mud->connect_info.will_qos = lua_tointeger(L, stack); - stack++; - } - if ( lua_isnumber(L, stack) ) - { - mud->connect_info.will_retain = lua_tointeger(L, stack); - stack++; - } - - NODE_DBG("mqtt_socket_lwt: topic: %s, message: %s, qos: %d, retain: %d\n", - mud->connect_info.will_topic, - mud->connect_info.will_message, - mud->connect_info.will_qos, - mud->connect_info.will_retain); - return 0; -} - -// Lua: mqtt:connect( host, port, secure, function(client) ) +// Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client) ) static int mqtt_socket_connect( lua_State* L ) { - NODE_DBG("mqtt_socket_connect is called.\n"); + NODE_DBG("enter mqtt_socket_connect.\n"); lmqtt_userdata *mud = NULL; unsigned port = 1883; size_t il; ip_addr_t ipaddr; const char *domain; int stack = 1; - unsigned secure = 0; + unsigned secure = 0, auto_reconnect = 0; int top = lua_gettop(L); mud = (lmqtt_userdata *)luaL_checkudata(L, stack, "mqtt.socket"); @@ -817,6 +784,10 @@ static int mqtt_socket_connect( lua_State* L ) if(mud == NULL) return 0; + if(mud->connected){ + return luaL_error(L, "already connected"); + } + if(mud->pesp_conn){ //TODO: should I free tcp struct directly or ask user to call close()??? mud->pesp_conn->reverse = NULL; if(mud->pesp_conn->proto.tcp) @@ -868,6 +839,7 @@ static int mqtt_socket_connect( lua_State* L ) } pesp_conn->proto.tcp->remote_port = port; pesp_conn->proto.tcp->local_port = espconn_port(); + mud->mqtt_state.port = port; if ( (stack<=top) && lua_isnumber(L, stack) ) { @@ -881,6 +853,18 @@ static int mqtt_socket_connect( lua_State* L ) } mud->secure = secure; // save + if ( (stack<=top) && lua_isnumber(L, stack) ) + { + auto_reconnect = lua_tointeger(L, stack); + stack++; + if ( auto_reconnect != 0 && auto_reconnect != 1 ){ + auto_reconnect = 0; // default to 0 + } + } else { + auto_reconnect = 0; // default to 0 + } + mud->mqtt_state.auto_reconnect = auto_reconnect; + // call back function when a connection is obtained, tcp only if ((stack<=top) && (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION)){ lua_pushvalue(L, stack); // copy argument (func) to the top of stack @@ -898,6 +882,12 @@ static int mqtt_socket_connect( lua_State* L ) espconn_regist_connectcb(pesp_conn, mqtt_socket_connected); espconn_regist_reconcb(pesp_conn, mqtt_socket_reconnected); + os_timer_disarm(&mud->mqttTimer); + os_timer_setfn(&mud->mqttTimer, (os_timer_func_t *)mqtt_socket_timer, mud); + // timer started in socket_connect() + mud->event_timeout = MQTT_CONNECT_TIMEOUT; + mud->connState = MQTT_INIT; + if((ipaddr.addr == IPADDR_NONE) && (c_memcmp(domain,"255.255.255.255",16) != 0)) { host_ip.addr = 0; @@ -911,9 +901,7 @@ static int mqtt_socket_connect( lua_State* L ) socket_connect(pesp_conn); } - os_timer_disarm(&mud->mqttTimer); - os_timer_setfn(&mud->mqttTimer, (os_timer_func_t *)mqtt_socket_timer, mud); - os_timer_arm(&mud->mqttTimer, 1000, 1); + NODE_DBG("leave mqtt_socket_connect.\n"); return 0; } @@ -921,7 +909,7 @@ static int mqtt_socket_connect( lua_State* L ) // client disconnect and unref itself static int mqtt_socket_close( lua_State* L ) { - NODE_DBG("mqtt_socket_close is called.\n"); + NODE_DBG("enter mqtt_socket_close.\n"); int i = 0; lmqtt_userdata *mud = NULL; @@ -944,13 +932,14 @@ static int mqtt_socket_close( lua_State* L ) if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port) espconn_disconnect(mud->pesp_conn); } + NODE_DBG("leave mqtt_socket_close.\n"); return 0; } // Lua: mqtt:on( "method", function() ) static int mqtt_socket_on( lua_State* L ) { - NODE_DBG("mqtt_on is called.\n"); + NODE_DBG("enter mqtt_socket_on.\n"); lmqtt_userdata *mud; size_t sl; @@ -984,13 +973,13 @@ static int mqtt_socket_on( lua_State* L ) lua_pop(L, 1); return luaL_error( L, "method not supported" ); } - + NODE_DBG("leave mqtt_socket_on.\n"); return 0; } // Lua: bool = mqtt:subscribe(topic, qos, function()) static int mqtt_socket_subscribe( lua_State* L ) { - NODE_DBG("mqtt_socket_subscribe is called.\n"); + NODE_DBG("enter mqtt_socket_subscribe.\n"); uint8_t stack = 1, qos = 0; uint16_t msg_id = 0; @@ -1068,8 +1057,8 @@ static int mqtt_socket_subscribe( lua_State* L ) { NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length); - if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->send_timeout == 0){ - mud->send_timeout = MQTT_SEND_TIMEOUT; + if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->event_timeout == 0){ + mud->event_timeout = MQTT_SEND_TIMEOUT; NODE_DBG("Sent: %d\n", node->msg.length); if( mud->secure ) espconn_secure_sent( mud->pesp_conn, node->msg.data, node->msg.length ); @@ -1084,13 +1073,14 @@ static int mqtt_socket_subscribe( lua_State* L ) { lua_pushboolean(L, 1); // enqueued succeed. } mud->mqtt_state.outbound_message = NULL; + NODE_DBG("leave mqtt_socket_subscribe.\n"); return 1; } // Lua: bool = mqtt:publish( topic, payload, qos, retain, function() ) static int mqtt_socket_publish( lua_State* L ) { - // NODE_DBG("mqtt_publish is called.\n"); + NODE_DBG("enter mqtt_socket_publish.\n"); struct espconn *pesp_conn = NULL; lmqtt_userdata *mud; size_t l; @@ -1143,8 +1133,8 @@ static int mqtt_socket_publish( lua_State* L ) msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos ); - if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->send_timeout == 0){ - mud->send_timeout = MQTT_SEND_TIMEOUT; + if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->event_timeout == 0){ + mud->event_timeout = MQTT_SEND_TIMEOUT; NODE_DBG("Sent: %d\n", node->msg.length); if( mud->secure ) espconn_secure_sent( mud->pesp_conn, node->msg.data, node->msg.length ); @@ -1159,20 +1149,99 @@ static int mqtt_socket_publish( lua_State* L ) lua_pushboolean(L, 1); // enqueued succeed. } mud->mqtt_state.outbound_message = NULL; + NODE_DBG("leave mqtt_socket_publish.\n"); return 1; } +// Lua: mqtt:lwt( topic, message, qos, retain, function(client) ) +static int mqtt_socket_lwt( lua_State* L ) +{ + NODE_DBG("enter mqtt_socket_lwt.\n"); + uint8_t stack = 1; + size_t topicSize, msgSize; + NODE_DBG("mqtt_socket_lwt.\n"); + lmqtt_userdata *mud = NULL; + const char *lwtTopic, *lwtMsg; + uint8_t lwtQoS, lwtRetain; + + mud = (lmqtt_userdata *)luaL_checkudata( L, stack, "mqtt.socket" ); + luaL_argcheck( L, mud, stack, "mqtt.socket expected" ); + + if(mud == NULL) + return 0; + + stack++; + lwtTopic = luaL_checklstring( L, stack, &topicSize ); + if (lwtTopic == NULL) + { + return luaL_error( L, "need lwt topic"); + } + + stack++; + lwtMsg = luaL_checklstring( L, stack, &msgSize ); + if (lwtMsg == NULL) + { + return luaL_error( L, "need lwt message"); + } + + if(mud->connect_info.will_topic){ // free the previous one if there is any + c_free(mud->connect_info.will_topic); + mud->connect_info.will_topic = NULL; + } + if(mud->connect_info.will_message){ + c_free(mud->connect_info.will_message); + mud->connect_info.will_message = NULL; + } + + mud->connect_info.will_topic = (uint8_t*) c_zalloc( topicSize + 1 ); + mud->connect_info.will_message = (uint8_t*) c_zalloc( msgSize + 1 ); + if(!mud->connect_info.will_topic || !mud->connect_info.will_message){ + if(mud->connect_info.will_topic){ + c_free(mud->connect_info.will_topic); + mud->connect_info.will_topic = NULL; + } + if(mud->connect_info.will_message){ + c_free(mud->connect_info.will_message); + mud->connect_info.will_message = NULL; + } + return luaL_error( L, "not enough memory"); + } + c_memcpy(mud->connect_info.will_topic, lwtTopic, topicSize); + mud->connect_info.will_topic[topicSize] = 0; + c_memcpy(mud->connect_info.will_message, lwtMsg, msgSize); + mud->connect_info.will_message[msgSize] = 0; + + if ( lua_isnumber(L, stack) ) + { + mud->connect_info.will_qos = lua_tointeger(L, stack); + stack++; + } + if ( lua_isnumber(L, stack) ) + { + mud->connect_info.will_retain = lua_tointeger(L, stack); + stack++; + } + + NODE_DBG("mqtt_socket_lwt: topic: %s, message: %s, qos: %d, retain: %d\n", + mud->connect_info.will_topic, + mud->connect_info.will_message, + mud->connect_info.will_qos, + mud->connect_info.will_retain); + NODE_DBG("leave mqtt_socket_lwt.\n"); + return 0; +} + // Module function map #define MIN_OPT_LEVEL 2 #include "lrodefs.h" static const LUA_REG_TYPE mqtt_socket_map[] = { - { LSTRKEY( "lwt" ), LFUNCVAL ( mqtt_socket_lwt ) }, { LSTRKEY( "connect" ), LFUNCVAL ( mqtt_socket_connect ) }, { LSTRKEY( "close" ), LFUNCVAL ( mqtt_socket_close ) }, { LSTRKEY( "publish" ), LFUNCVAL ( mqtt_socket_publish ) }, { LSTRKEY( "subscribe" ), LFUNCVAL ( mqtt_socket_subscribe ) }, + { LSTRKEY( "lwt" ), LFUNCVAL ( mqtt_socket_lwt ) }, { LSTRKEY( "on" ), LFUNCVAL ( mqtt_socket_on ) }, { LSTRKEY( "__gc" ), LFUNCVAL ( mqtt_delete ) }, #if LUA_OPTIMIZE_MEMORY > 0 diff --git a/examples/fragment.lua b/examples/fragment.lua index f24729de..fb938191 100644 --- a/examples/fragment.lua +++ b/examples/fragment.lua @@ -397,6 +397,8 @@ string.gsub("abc%0Ddef", "%%(%x%x)", ex) print("hello") v="abc%0D%0Adef" pcall(function() print(string.gsub(v, "%%(%x%x)", function(x) return string.char(tonumber(x, 16)) end)) end) +mosca -v | bunyan + m=mqtt.Client() m:connect("192.168.18.88",1883) topic={} @@ -426,3 +428,25 @@ m=mqtt.Client() m:on("connect",function(m) print("connection") end ) m:connect("192.168.18.88",1883) m:on("offline",function(m) print("disconnection") end ) + +m=mqtt.Client() +m:on("connect",function(m) print("connection "..node.heap()) end ) +m:on("offline", function(conn) + if conn == nil then print("conn is nil") end + print("Reconnect to broker...") + print(node.heap()) + conn:connect("192.168.18.88",1883,0,1) +end) +m:connect("192.168.18.88",1883,0,1) + +m=mqtt.Client() +m:on("connect",function(m) print("connection "..node.heap()) end ) +m:on("offline", function(conn) + if conn == nil then print("conn is nil") end + print("Reconnect to broker...") + print(node.heap()) + conn:connect("192.168.18.88",1883) +end) +m:connect("192.168.18.88",1883) + +m:close()