add auto-reconnect option to mqtt:connect api

This commit is contained in:
funshine 2015-03-31 00:36:44 +08:00
parent 09750b5653
commit dcb6e53af7
4 changed files with 275 additions and 180 deletions

View File

@ -225,8 +225,10 @@ m:on("message", function(conn, topic, data)
end end
end) end)
-- for secure: m:connect("192.168.11.118", 1880, 1) -- m:connect( host, port, secure, auto_reconnect, function(client) )
m:connect("192.168.11.118", 1880, 0, function(conn) print("connected") end) -- for secure: m:connect("192.168.11.118", 1880, 1, 0)
-- for auto-reconnect: m:connect("192.168.11.118", 1880, 0, 1)
m:connect("192.168.11.118", 1880, 0, 0, function(conn) print("connected") end)
-- subscribe topic with qos = 0 -- subscribe topic with qos = 0
m:subscribe("/topic",0, function(conn) print("subscribe success") end) m:subscribe("/topic",0, function(conn) print("subscribe success") end)
@ -235,7 +237,7 @@ m:subscribe("/topic",0, function(conn) print("subscribe success") end)
-- publish a message with data = hello, QoS = 0, retain = 0 -- publish a message with data = hello, QoS = 0, retain = 0
m:publish("/topic","hello",0,0, function(conn) print("sent") end) m:publish("/topic","hello",0,0, function(conn) print("sent") end)
m:close(); m:close(); -- if auto-reconnect = 1, will reconnect.
-- you can call m:connect again -- you can call m:connect again
``` ```

View File

@ -7,6 +7,6 @@
#define NODE_VERSION_INTERNAL 0U #define NODE_VERSION_INTERNAL 0U
#define NODE_VERSION "NodeMCU 0.9.5" #define NODE_VERSION "NodeMCU 0.9.5"
#define BUILD_DATE "build 20150330" #define BUILD_DATE "build 20150331"
#endif /* __USER_VERSION_H__ */ #endif /* __USER_VERSION_H__ */

View File

@ -25,6 +25,7 @@ static lua_State *gL = NULL;
#define MQTT_MAX_USER_LEN 64 #define MQTT_MAX_USER_LEN 64
#define MQTT_MAX_PASS_LEN 64 #define MQTT_MAX_PASS_LEN 64
#define MQTT_SEND_TIMEOUT 5 #define MQTT_SEND_TIMEOUT 5
#define MQTT_CONNECT_TIMEOUT 5
typedef enum { typedef enum {
MQTT_INIT, MQTT_INIT,
@ -71,16 +72,64 @@ typedef struct lmqtt_userdata
mqtt_state_t mqtt_state; mqtt_state_t mqtt_state;
mqtt_connect_info_t connect_info; mqtt_connect_info_t connect_info;
uint32_t keep_alive_tick; uint32_t keep_alive_tick;
uint32_t send_timeout; uint32_t event_timeout;
uint8_t secure; uint8_t secure;
bool connected; // indicate socket connected, not mqtt prot connected. bool connected; // indicate socket connected, not mqtt prot connected.
ETSTimer mqttTimer; ETSTimer mqttTimer;
tConnState connState; tConnState connState;
}lmqtt_userdata; }lmqtt_userdata;
static void socket_connect(struct espconn *pesp_conn);
static void mqtt_socket_disconnected(void *arg) // tcp only static void mqtt_socket_disconnected(void *arg) // tcp only
{ {
NODE_DBG("mqtt_socket_disconnected is called.\n"); NODE_DBG("enter mqtt_socket_disconnected.\n");
struct espconn *pesp_conn = arg;
bool call_back = false;
if(pesp_conn == NULL)
return;
lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse;
if(mud == NULL)
return;
os_timer_disarm(&mud->mqttTimer);
if(mud->pesp_conn){
mud->pesp_conn->reverse = NULL;
if(mud->pesp_conn->proto.tcp)
c_free(mud->pesp_conn->proto.tcp);
mud->pesp_conn->proto.tcp = NULL;
c_free(mud->pesp_conn);
mud->pesp_conn = NULL;
}
if(mud->connected){ // call back only called when socket is from connection to disconnection.
mud->connected = false;
if((mud->cb_disconnect_ref != LUA_NOREF) && (mud->self_ref != LUA_NOREF)) {
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_disconnect_ref);
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua
call_back = true;
}
}
lua_gc(gL, LUA_GCSTOP, 0);
if(mud->self_ref != LUA_NOREF){ // TODO: should we unref the client and delete it?
luaL_unref(gL, LUA_REGISTRYINDEX, mud->self_ref);
mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self
}
lua_gc(gL, LUA_GCRESTART, 0);
if(call_back){
lua_call(gL, 1, 0);
}
NODE_DBG("leave mqtt_socket_disconnected.\n");
}
static void mqtt_socket_reconnected(void *arg, sint8_t err)
{
NODE_DBG("enter mqtt_socket_reconnected.\n");
// mqtt_socket_disconnected(arg);
struct espconn *pesp_conn = arg; struct espconn *pesp_conn = arg;
if(pesp_conn == NULL) if(pesp_conn == NULL)
return; return;
@ -88,44 +137,21 @@ static void mqtt_socket_disconnected(void *arg) // tcp only
if(mud == NULL) if(mud == NULL)
return; return;
if(mud->connected){ pesp_conn->proto.tcp->remote_port = mud->mqtt_state.port;
mud->connected = false; pesp_conn->proto.tcp->local_port = espconn_port();
if(mud->pesp_conn && mud->pesp_conn->proto.tcp)
c_free(mud->pesp_conn->proto.tcp);
mud->pesp_conn->proto.tcp = NULL;
if(mud->pesp_conn)
c_free(mud->pesp_conn);
mud->pesp_conn = NULL; // espconn is already disconnected
lua_gc(gL, LUA_GCSTOP, 0);
if(mud->self_ref != LUA_NOREF){ // TODO: should we unref the client and delete it?
luaL_unref(gL, LUA_REGISTRYINDEX, mud->self_ref);
mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self
}
lua_gc(gL, LUA_GCRESTART, 0);
}
mud->connected = false;
os_timer_disarm(&mud->mqttTimer); os_timer_disarm(&mud->mqttTimer);
if( (mud->event_timeout != 0) || mud->mqtt_state.auto_reconnect ){
if(mud->cb_disconnect_ref != LUA_NOREF) socket_connect(pesp_conn);
{ } else {
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_disconnect_ref);
if( mud->self_ref != LUA_NOREF)
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua
else
lua_pushnil(gL);
lua_call(gL, 1, 0);
}
}
static void mqtt_socket_reconnected(void *arg, sint8_t err)
{
NODE_DBG("mqtt_socket_reconnected is called.\n");
mqtt_socket_disconnected(arg); mqtt_socket_disconnected(arg);
}
NODE_DBG("leave mqtt_socket_reconnected.\n");
} }
static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length) static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length)
{ {
NODE_DBG("enter deliver_publish.\n");
const char comma[] = ","; const char comma[] = ",";
mqtt_event_data_t event_data; mqtt_event_data_t event_data;
@ -149,11 +175,12 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length)
} else { } else {
lua_call(gL, 2, 0); lua_call(gL, 2, 0);
} }
NODE_DBG("leave deliver_publish.\n");
} }
static void mqtt_socket_received(void *arg, char *pdata, unsigned short len) static void mqtt_socket_received(void *arg, char *pdata, unsigned short len)
{ {
NODE_DBG("mqtt_socket_received is called.\n"); NODE_DBG("enter mqtt_socket_received.\n");
uint8_t msg_type; uint8_t msg_type;
uint8_t msg_qos; uint8_t msg_qos;
@ -179,12 +206,10 @@ READPACKET:
if(mqtt_get_type(mud->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK){ if(mqtt_get_type(mud->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK){
NODE_DBG("MQTT: Invalid packet\r\n"); NODE_DBG("MQTT: Invalid packet\r\n");
mud->connState = MQTT_INIT; mud->connState = MQTT_INIT;
if(mud->secure){ if(mud->secure)
espconn_secure_disconnect(pesp_conn); espconn_secure_disconnect(pesp_conn);
} else
else {
espconn_disconnect(pesp_conn); espconn_disconnect(pesp_conn);
}
} else { } else {
mud->connState = MQTT_DATA; mud->connState = MQTT_DATA;
NODE_DBG("MQTT: Connected\r\n"); NODE_DBG("MQTT: Connected\r\n");
@ -320,8 +345,8 @@ READPACKET:
break; break;
} }
if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->send_timeout == 0){ if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->event_timeout == 0){
mud->send_timeout = MQTT_SEND_TIMEOUT; mud->event_timeout = MQTT_SEND_TIMEOUT;
NODE_DBG("Sent: %d\n", node->msg.length); NODE_DBG("Sent: %d\n", node->msg.length);
if( mud->secure ) if( mud->secure )
espconn_secure_sent( pesp_conn, node->msg.data, node->msg.length ); espconn_secure_sent( pesp_conn, node->msg.data, node->msg.length );
@ -330,13 +355,13 @@ READPACKET:
mud->keep_alive_tick = 0; mud->keep_alive_tick = 0;
mud->mqtt_state.outbound_message = NULL; mud->mqtt_state.outbound_message = NULL;
} }
NODE_DBG("leave mqtt_socket_received.\n");
return; return;
} }
static void mqtt_socket_sent(void *arg) static void mqtt_socket_sent(void *arg)
{ {
// NODE_DBG("mqtt_socket_sent is called.\n"); NODE_DBG("enter mqtt_socket_sent.\n");
struct espconn *pesp_conn = arg; struct espconn *pesp_conn = arg;
if(pesp_conn == NULL) if(pesp_conn == NULL)
return; return;
@ -350,7 +375,7 @@ static void mqtt_socket_sent(void *arg)
} }
// call mqtt_sent() // call mqtt_sent()
mud->send_timeout = 0; mud->event_timeout = 0;
// qos = 0, publish and forgot. // qos = 0, publish and forgot.
msg_queue_t *node = mud->mqtt_state.pending_msg_q; msg_queue_t *node = mud->mqtt_state.pending_msg_q;
@ -364,12 +389,12 @@ static void mqtt_socket_sent(void *arg)
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_call(gL, 1, 0); lua_call(gL, 1, 0);
} }
NODE_DBG("leave mqtt_socket_sent.\n");
} }
static int mqtt_socket_client( lua_State* L );
static void mqtt_socket_connected(void *arg) static void mqtt_socket_connected(void *arg)
{ {
NODE_DBG("mqtt_socket_connected is called.\n"); NODE_DBG("enter mqtt_socket_connected.\n");
struct espconn *pesp_conn = arg; struct espconn *pesp_conn = arg;
if(pesp_conn == NULL) if(pesp_conn == NULL)
return; return;
@ -385,26 +410,30 @@ static void mqtt_socket_connected(void *arg)
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.out_buffer, mud->mqtt_state.out_buffer_length); mqtt_msg_init(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.out_buffer, mud->mqtt_state.out_buffer_length);
mud->mqtt_state.outbound_message = mqtt_msg_connect(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.connect_info); mud->mqtt_state.outbound_message = mqtt_msg_connect(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.connect_info);
NODE_DBG("Send MQTT connection infomation, data len: %d, d[0]=%d \r\n", mud->mqtt_state.outbound_message->length, mud->mqtt_state.outbound_message->data[0]); NODE_DBG("Send MQTT connection infomation, data len: %d, d[0]=%d \r\n", mud->mqtt_state.outbound_message->length, mud->mqtt_state.outbound_message->data[0]);
if(mud->secure){ mud->event_timeout = MQTT_SEND_TIMEOUT;
if(mud->secure)
espconn_secure_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); espconn_secure_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length);
}
else else
{
espconn_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); espconn_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length);
}
mud->mqtt_state.outbound_message = NULL; mud->mqtt_state.outbound_message = NULL;
mud->connState = MQTT_CONNECT_SENDING; mud->connState = MQTT_CONNECT_SENDING;
NODE_DBG("leave mqtt_socket_connected.\n");
return; return;
} }
void mqtt_socket_timer(void *arg) void mqtt_socket_timer(void *arg)
{ {
// NODE_DBG("enter mqtt_socket_timer.\n");
lmqtt_userdata *mud = (lmqtt_userdata*) arg; lmqtt_userdata *mud = (lmqtt_userdata*) arg;
if(mud == NULL) if(mud == NULL)
return; return;
if(mud->send_timeout > 0){ if(mud->event_timeout > 0){
mud->send_timeout --; NODE_DBG("event_timeout: %d.\n", mud->event_timeout);
mud->event_timeout --;
if(mud->event_timeout > 0){
return;
}
} }
if(mud->pesp_conn == NULL){ if(mud->pesp_conn == NULL){
@ -412,35 +441,32 @@ void mqtt_socket_timer(void *arg)
return; return;
} }
if(mud->send_timeout == 0){ // switch to next queued event. if(mud->connState == MQTT_INIT){ // socket connect time out.
if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT time out. NODE_DBG("Can not connect to broker.\n");
} else if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT send time out.
mud->connState = MQTT_INIT; mud->connState = MQTT_INIT;
if(mud->secure){ if(mud->secure)
espconn_secure_disconnect(mud->pesp_conn); espconn_secure_disconnect(mud->pesp_conn);
} else
else {
espconn_disconnect(mud->pesp_conn); espconn_disconnect(mud->pesp_conn);
}
mud->keep_alive_tick = 0; // not need count anymore mud->keep_alive_tick = 0; // not need count anymore
} else if(mud->connState == MQTT_CONNECT_SENT){ // wait for CONACK time out.
NODE_DBG("MQTT_CONNECT failed.\n");
} else if(mud->connState == MQTT_DATA){ } else if(mud->connState == MQTT_DATA){
msg_queue_t *pending_msg = mud->mqtt_state.pending_msg_q; msg_queue_t *pending_msg = mud->mqtt_state.pending_msg_q;
if(pending_msg){ if(pending_msg){
mud->send_timeout = MQTT_SEND_TIMEOUT; mud->event_timeout = MQTT_SEND_TIMEOUT;
if(mud->secure) if(mud->secure)
espconn_secure_sent(mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length); espconn_secure_sent(mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length);
else else
espconn_sent(mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length); espconn_sent(mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length);
mud->keep_alive_tick = 0; mud->keep_alive_tick = 0;
NODE_DBG("id: %d - qos: %d, length: %d\n", pending_msg->msg_id, pending_msg->publish_qos, pending_msg->msg.length); NODE_DBG("id: %d - qos: %d, length: %d\n", pending_msg->msg_id, pending_msg->publish_qos, pending_msg->msg.length);
} } else {
// no queued event. // no queued event.
}
}
if(mud->connState == MQTT_DATA){
mud->keep_alive_tick ++; mud->keep_alive_tick ++;
if(mud->keep_alive_tick > mud->mqtt_state.connect_info->keepalive){ if(mud->keep_alive_tick > mud->mqtt_state.connect_info->keepalive){
mud->send_timeout = MQTT_SEND_TIMEOUT; mud->event_timeout = MQTT_SEND_TIMEOUT;
NODE_DBG("\r\nMQTT: Send keepalive packet\r\n"); NODE_DBG("\r\nMQTT: Send keepalive packet\r\n");
mud->mqtt_state.outbound_message = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection); mud->mqtt_state.outbound_message = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection);
if(mud->secure) if(mud->secure)
@ -450,12 +476,14 @@ void mqtt_socket_timer(void *arg)
mud->keep_alive_tick = 0; mud->keep_alive_tick = 0;
} }
} }
}
// NODE_DBG("leave mqtt_socket_timer.\n");
} }
// Lua: mqtt.Client(clientid, keepalive, user, pass) // Lua: mqtt.Client(clientid, keepalive, user, pass)
static int mqtt_socket_client( lua_State* L ) static int mqtt_socket_client( lua_State* L )
{ {
NODE_DBG("mqtt_socket_client is called.\n"); NODE_DBG("enter mqtt_socket_client.\n");
lmqtt_userdata *mud; lmqtt_userdata *mud;
char tempid[20] = {0}; char tempid[20] = {0};
@ -485,8 +513,9 @@ static int mqtt_socket_client( lua_State* L )
mud->secure = 0; mud->secure = 0;
mud->keep_alive_tick = 0; mud->keep_alive_tick = 0;
mud->send_timeout = 0; mud->event_timeout = 0;
mud->connState = MQTT_INIT; mud->connState = MQTT_INIT;
mud->connected = false;
c_memset(&mud->mqttTimer, 0, sizeof(ETSTimer)); c_memset(&mud->mqttTimer, 0, sizeof(ETSTimer));
c_memset(&mud->mqtt_state, 0, sizeof(mqtt_state_t)); c_memset(&mud->mqtt_state, 0, sizeof(mqtt_state_t));
c_memset(&mud->connect_info, 0, sizeof(mqtt_connect_info_t)); c_memset(&mud->connect_info, 0, sizeof(mqtt_connect_info_t));
@ -576,8 +605,11 @@ static int mqtt_socket_client( lua_State* L )
mud->mqtt_state.in_buffer_length = MQTT_BUF_SIZE; mud->mqtt_state.in_buffer_length = MQTT_BUF_SIZE;
mud->mqtt_state.out_buffer_length = MQTT_BUF_SIZE; mud->mqtt_state.out_buffer_length = MQTT_BUF_SIZE;
mud->mqtt_state.pending_msg_q = NULL; mud->mqtt_state.pending_msg_q = NULL;
mud->mqtt_state.auto_reconnect = 1;
mud->mqtt_state.port = 1883;
mud->mqtt_state.connect_info = &mud->connect_info; mud->mqtt_state.connect_info = &mud->connect_info;
NODE_DBG("leave mqtt_socket_client.\n");
return 1; return 1;
} }
@ -586,7 +618,7 @@ static int mqtt_socket_client( lua_State* L )
// socket: unref everything // socket: unref everything
static int mqtt_delete( lua_State* L ) static int mqtt_delete( lua_State* L )
{ {
NODE_DBG("mqtt_delete is called.\n"); NODE_DBG("enter mqtt_delete.\n");
lmqtt_userdata *mud = (lmqtt_userdata *)luaL_checkudata(L, 1, "mqtt.socket"); lmqtt_userdata *mud = (lmqtt_userdata *)luaL_checkudata(L, 1, "mqtt.socket");
luaL_argcheck(L, mud, 1, "mqtt.socket expected"); luaL_argcheck(L, mud, 1, "mqtt.socket expected");
@ -670,26 +702,27 @@ static int mqtt_delete( lua_State* L )
mud->self_ref = LUA_NOREF; mud->self_ref = LUA_NOREF;
} }
lua_gc(gL, LUA_GCRESTART, 0); lua_gc(gL, LUA_GCRESTART, 0);
NODE_DBG("leave mqtt_delete.\n");
return 0; return 0;
} }
static void socket_connect(struct espconn *pesp_conn) static void socket_connect(struct espconn *pesp_conn)
{ {
NODE_DBG("enter socket_connect.\n");
if(pesp_conn == NULL) if(pesp_conn == NULL)
return; return;
lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse; lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse;
if(mud == NULL) if(mud == NULL)
return; return;
if(mud->secure){ if(mud->secure)
espconn_secure_connect(pesp_conn); espconn_secure_connect(pesp_conn);
}
else else
{
espconn_connect(pesp_conn); espconn_connect(pesp_conn);
}
NODE_DBG("socket_connect is called.\n"); os_timer_arm(&mud->mqttTimer, 1000, 1);
NODE_DBG("leave socket_connect.\n");
} }
static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg); static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg);
@ -697,7 +730,7 @@ static dns_reconn_count = 0;
static ip_addr_t host_ip; // for dns static ip_addr_t host_ip; // for dns
static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
{ {
NODE_DBG("socket_dns_found is called.\n"); NODE_DBG("enter socket_dns_found.\n");
struct espconn *pesp_conn = arg; struct espconn *pesp_conn = arg;
if(pesp_conn == NULL){ if(pesp_conn == NULL){
NODE_DBG("pesp_conn null.\n"); NODE_DBG("pesp_conn null.\n");
@ -729,86 +762,20 @@ static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
NODE_DBG("\n"); NODE_DBG("\n");
socket_connect(pesp_conn); socket_connect(pesp_conn);
} }
NODE_DBG("leave socket_dns_found.\n");
} }
// Lua: mqtt:lwt( topic, message, qos, retain, function(client) ) // Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client) )
static int mqtt_socket_lwt( lua_State* L )
{
uint8_t stack = 1;
size_t topicSize, msgSize;
NODE_DBG("mqtt_socket_lwt.\n");
lmqtt_userdata *mud = NULL;
const char *lwtTopic, *lwtMsg;
uint8_t lwtQoS, lwtRetain;
mud = (lmqtt_userdata *)luaL_checkudata( L, stack, "mqtt.socket" );
luaL_argcheck( L, mud, stack, "mqtt.socket expected" );
if(mud == NULL)
return 0;
stack++;
lwtTopic = luaL_checklstring( L, stack, &topicSize );
if (lwtTopic == NULL)
{
return luaL_error( L, "need lwt topic");
}
stack++;
lwtMsg = luaL_checklstring( L, stack, &msgSize );
if (lwtMsg == NULL)
{
return luaL_error( L, "need lwt message");
}
mud->connect_info.will_topic = (uint8_t*) c_zalloc( topicSize + 1 );
mud->connect_info.will_message = (uint8_t*) c_zalloc( msgSize + 1 );
if(!mud->connect_info.will_topic || !mud->connect_info.will_message){
if(mud->connect_info.will_topic){
c_free(mud->connect_info.will_topic);
mud->connect_info.will_topic = NULL;
}
if(mud->connect_info.will_message){
c_free(mud->connect_info.will_message);
mud->connect_info.will_message = NULL;
}
return luaL_error( L, "not enough memory");
}
c_memcpy(mud->connect_info.will_topic, lwtTopic, topicSize);
mud->connect_info.will_topic[topicSize] = 0;
c_memcpy(mud->connect_info.will_message, lwtMsg, msgSize);
mud->connect_info.will_message[msgSize] = 0;
if ( lua_isnumber(L, stack) )
{
mud->connect_info.will_qos = lua_tointeger(L, stack);
stack++;
}
if ( lua_isnumber(L, stack) )
{
mud->connect_info.will_retain = lua_tointeger(L, stack);
stack++;
}
NODE_DBG("mqtt_socket_lwt: topic: %s, message: %s, qos: %d, retain: %d\n",
mud->connect_info.will_topic,
mud->connect_info.will_message,
mud->connect_info.will_qos,
mud->connect_info.will_retain);
return 0;
}
// Lua: mqtt:connect( host, port, secure, function(client) )
static int mqtt_socket_connect( lua_State* L ) static int mqtt_socket_connect( lua_State* L )
{ {
NODE_DBG("mqtt_socket_connect is called.\n"); NODE_DBG("enter mqtt_socket_connect.\n");
lmqtt_userdata *mud = NULL; lmqtt_userdata *mud = NULL;
unsigned port = 1883; unsigned port = 1883;
size_t il; size_t il;
ip_addr_t ipaddr; ip_addr_t ipaddr;
const char *domain; const char *domain;
int stack = 1; int stack = 1;
unsigned secure = 0; unsigned secure = 0, auto_reconnect = 0;
int top = lua_gettop(L); int top = lua_gettop(L);
mud = (lmqtt_userdata *)luaL_checkudata(L, stack, "mqtt.socket"); mud = (lmqtt_userdata *)luaL_checkudata(L, stack, "mqtt.socket");
@ -817,6 +784,10 @@ static int mqtt_socket_connect( lua_State* L )
if(mud == NULL) if(mud == NULL)
return 0; return 0;
if(mud->connected){
return luaL_error(L, "already connected");
}
if(mud->pesp_conn){ //TODO: should I free tcp struct directly or ask user to call close()??? if(mud->pesp_conn){ //TODO: should I free tcp struct directly or ask user to call close()???
mud->pesp_conn->reverse = NULL; mud->pesp_conn->reverse = NULL;
if(mud->pesp_conn->proto.tcp) if(mud->pesp_conn->proto.tcp)
@ -868,6 +839,7 @@ static int mqtt_socket_connect( lua_State* L )
} }
pesp_conn->proto.tcp->remote_port = port; pesp_conn->proto.tcp->remote_port = port;
pesp_conn->proto.tcp->local_port = espconn_port(); pesp_conn->proto.tcp->local_port = espconn_port();
mud->mqtt_state.port = port;
if ( (stack<=top) && lua_isnumber(L, stack) ) if ( (stack<=top) && lua_isnumber(L, stack) )
{ {
@ -881,6 +853,18 @@ static int mqtt_socket_connect( lua_State* L )
} }
mud->secure = secure; // save mud->secure = secure; // save
if ( (stack<=top) && lua_isnumber(L, stack) )
{
auto_reconnect = lua_tointeger(L, stack);
stack++;
if ( auto_reconnect != 0 && auto_reconnect != 1 ){
auto_reconnect = 0; // default to 0
}
} else {
auto_reconnect = 0; // default to 0
}
mud->mqtt_state.auto_reconnect = auto_reconnect;
// call back function when a connection is obtained, tcp only // call back function when a connection is obtained, tcp only
if ((stack<=top) && (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION)){ if ((stack<=top) && (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION)){
lua_pushvalue(L, stack); // copy argument (func) to the top of stack lua_pushvalue(L, stack); // copy argument (func) to the top of stack
@ -898,6 +882,12 @@ static int mqtt_socket_connect( lua_State* L )
espconn_regist_connectcb(pesp_conn, mqtt_socket_connected); espconn_regist_connectcb(pesp_conn, mqtt_socket_connected);
espconn_regist_reconcb(pesp_conn, mqtt_socket_reconnected); espconn_regist_reconcb(pesp_conn, mqtt_socket_reconnected);
os_timer_disarm(&mud->mqttTimer);
os_timer_setfn(&mud->mqttTimer, (os_timer_func_t *)mqtt_socket_timer, mud);
// timer started in socket_connect()
mud->event_timeout = MQTT_CONNECT_TIMEOUT;
mud->connState = MQTT_INIT;
if((ipaddr.addr == IPADDR_NONE) && (c_memcmp(domain,"255.255.255.255",16) != 0)) if((ipaddr.addr == IPADDR_NONE) && (c_memcmp(domain,"255.255.255.255",16) != 0))
{ {
host_ip.addr = 0; host_ip.addr = 0;
@ -911,9 +901,7 @@ static int mqtt_socket_connect( lua_State* L )
socket_connect(pesp_conn); socket_connect(pesp_conn);
} }
os_timer_disarm(&mud->mqttTimer); NODE_DBG("leave mqtt_socket_connect.\n");
os_timer_setfn(&mud->mqttTimer, (os_timer_func_t *)mqtt_socket_timer, mud);
os_timer_arm(&mud->mqttTimer, 1000, 1);
return 0; return 0;
} }
@ -921,7 +909,7 @@ static int mqtt_socket_connect( lua_State* L )
// client disconnect and unref itself // client disconnect and unref itself
static int mqtt_socket_close( lua_State* L ) static int mqtt_socket_close( lua_State* L )
{ {
NODE_DBG("mqtt_socket_close is called.\n"); NODE_DBG("enter mqtt_socket_close.\n");
int i = 0; int i = 0;
lmqtt_userdata *mud = NULL; lmqtt_userdata *mud = NULL;
@ -944,13 +932,14 @@ static int mqtt_socket_close( lua_State* L )
if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port) if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port)
espconn_disconnect(mud->pesp_conn); espconn_disconnect(mud->pesp_conn);
} }
NODE_DBG("leave mqtt_socket_close.\n");
return 0; return 0;
} }
// Lua: mqtt:on( "method", function() ) // Lua: mqtt:on( "method", function() )
static int mqtt_socket_on( lua_State* L ) static int mqtt_socket_on( lua_State* L )
{ {
NODE_DBG("mqtt_on is called.\n"); NODE_DBG("enter mqtt_socket_on.\n");
lmqtt_userdata *mud; lmqtt_userdata *mud;
size_t sl; size_t sl;
@ -984,13 +973,13 @@ static int mqtt_socket_on( lua_State* L )
lua_pop(L, 1); lua_pop(L, 1);
return luaL_error( L, "method not supported" ); return luaL_error( L, "method not supported" );
} }
NODE_DBG("leave mqtt_socket_on.\n");
return 0; return 0;
} }
// Lua: bool = mqtt:subscribe(topic, qos, function()) // Lua: bool = mqtt:subscribe(topic, qos, function())
static int mqtt_socket_subscribe( lua_State* L ) { static int mqtt_socket_subscribe( lua_State* L ) {
NODE_DBG("mqtt_socket_subscribe is called.\n"); NODE_DBG("enter mqtt_socket_subscribe.\n");
uint8_t stack = 1, qos = 0; uint8_t stack = 1, qos = 0;
uint16_t msg_id = 0; uint16_t msg_id = 0;
@ -1068,8 +1057,8 @@ static int mqtt_socket_subscribe( lua_State* L ) {
NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length); NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length);
if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->send_timeout == 0){ if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->event_timeout == 0){
mud->send_timeout = MQTT_SEND_TIMEOUT; mud->event_timeout = MQTT_SEND_TIMEOUT;
NODE_DBG("Sent: %d\n", node->msg.length); NODE_DBG("Sent: %d\n", node->msg.length);
if( mud->secure ) if( mud->secure )
espconn_secure_sent( mud->pesp_conn, node->msg.data, node->msg.length ); espconn_secure_sent( mud->pesp_conn, node->msg.data, node->msg.length );
@ -1084,13 +1073,14 @@ static int mqtt_socket_subscribe( lua_State* L ) {
lua_pushboolean(L, 1); // enqueued succeed. lua_pushboolean(L, 1); // enqueued succeed.
} }
mud->mqtt_state.outbound_message = NULL; mud->mqtt_state.outbound_message = NULL;
NODE_DBG("leave mqtt_socket_subscribe.\n");
return 1; return 1;
} }
// Lua: bool = mqtt:publish( topic, payload, qos, retain, function() ) // Lua: bool = mqtt:publish( topic, payload, qos, retain, function() )
static int mqtt_socket_publish( lua_State* L ) static int mqtt_socket_publish( lua_State* L )
{ {
// NODE_DBG("mqtt_publish is called.\n"); NODE_DBG("enter mqtt_socket_publish.\n");
struct espconn *pesp_conn = NULL; struct espconn *pesp_conn = NULL;
lmqtt_userdata *mud; lmqtt_userdata *mud;
size_t l; size_t l;
@ -1143,8 +1133,8 @@ static int mqtt_socket_publish( lua_State* L )
msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message,
msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos ); msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos );
if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->send_timeout == 0){ if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->event_timeout == 0){
mud->send_timeout = MQTT_SEND_TIMEOUT; mud->event_timeout = MQTT_SEND_TIMEOUT;
NODE_DBG("Sent: %d\n", node->msg.length); NODE_DBG("Sent: %d\n", node->msg.length);
if( mud->secure ) if( mud->secure )
espconn_secure_sent( mud->pesp_conn, node->msg.data, node->msg.length ); espconn_secure_sent( mud->pesp_conn, node->msg.data, node->msg.length );
@ -1159,20 +1149,99 @@ static int mqtt_socket_publish( lua_State* L )
lua_pushboolean(L, 1); // enqueued succeed. lua_pushboolean(L, 1); // enqueued succeed.
} }
mud->mqtt_state.outbound_message = NULL; mud->mqtt_state.outbound_message = NULL;
NODE_DBG("leave mqtt_socket_publish.\n");
return 1; return 1;
} }
// Lua: mqtt:lwt( topic, message, qos, retain, function(client) )
static int mqtt_socket_lwt( lua_State* L )
{
NODE_DBG("enter mqtt_socket_lwt.\n");
uint8_t stack = 1;
size_t topicSize, msgSize;
NODE_DBG("mqtt_socket_lwt.\n");
lmqtt_userdata *mud = NULL;
const char *lwtTopic, *lwtMsg;
uint8_t lwtQoS, lwtRetain;
mud = (lmqtt_userdata *)luaL_checkudata( L, stack, "mqtt.socket" );
luaL_argcheck( L, mud, stack, "mqtt.socket expected" );
if(mud == NULL)
return 0;
stack++;
lwtTopic = luaL_checklstring( L, stack, &topicSize );
if (lwtTopic == NULL)
{
return luaL_error( L, "need lwt topic");
}
stack++;
lwtMsg = luaL_checklstring( L, stack, &msgSize );
if (lwtMsg == NULL)
{
return luaL_error( L, "need lwt message");
}
if(mud->connect_info.will_topic){ // free the previous one if there is any
c_free(mud->connect_info.will_topic);
mud->connect_info.will_topic = NULL;
}
if(mud->connect_info.will_message){
c_free(mud->connect_info.will_message);
mud->connect_info.will_message = NULL;
}
mud->connect_info.will_topic = (uint8_t*) c_zalloc( topicSize + 1 );
mud->connect_info.will_message = (uint8_t*) c_zalloc( msgSize + 1 );
if(!mud->connect_info.will_topic || !mud->connect_info.will_message){
if(mud->connect_info.will_topic){
c_free(mud->connect_info.will_topic);
mud->connect_info.will_topic = NULL;
}
if(mud->connect_info.will_message){
c_free(mud->connect_info.will_message);
mud->connect_info.will_message = NULL;
}
return luaL_error( L, "not enough memory");
}
c_memcpy(mud->connect_info.will_topic, lwtTopic, topicSize);
mud->connect_info.will_topic[topicSize] = 0;
c_memcpy(mud->connect_info.will_message, lwtMsg, msgSize);
mud->connect_info.will_message[msgSize] = 0;
if ( lua_isnumber(L, stack) )
{
mud->connect_info.will_qos = lua_tointeger(L, stack);
stack++;
}
if ( lua_isnumber(L, stack) )
{
mud->connect_info.will_retain = lua_tointeger(L, stack);
stack++;
}
NODE_DBG("mqtt_socket_lwt: topic: %s, message: %s, qos: %d, retain: %d\n",
mud->connect_info.will_topic,
mud->connect_info.will_message,
mud->connect_info.will_qos,
mud->connect_info.will_retain);
NODE_DBG("leave mqtt_socket_lwt.\n");
return 0;
}
// Module function map // Module function map
#define MIN_OPT_LEVEL 2 #define MIN_OPT_LEVEL 2
#include "lrodefs.h" #include "lrodefs.h"
static const LUA_REG_TYPE mqtt_socket_map[] = static const LUA_REG_TYPE mqtt_socket_map[] =
{ {
{ LSTRKEY( "lwt" ), LFUNCVAL ( mqtt_socket_lwt ) },
{ LSTRKEY( "connect" ), LFUNCVAL ( mqtt_socket_connect ) }, { LSTRKEY( "connect" ), LFUNCVAL ( mqtt_socket_connect ) },
{ LSTRKEY( "close" ), LFUNCVAL ( mqtt_socket_close ) }, { LSTRKEY( "close" ), LFUNCVAL ( mqtt_socket_close ) },
{ LSTRKEY( "publish" ), LFUNCVAL ( mqtt_socket_publish ) }, { LSTRKEY( "publish" ), LFUNCVAL ( mqtt_socket_publish ) },
{ LSTRKEY( "subscribe" ), LFUNCVAL ( mqtt_socket_subscribe ) }, { LSTRKEY( "subscribe" ), LFUNCVAL ( mqtt_socket_subscribe ) },
{ LSTRKEY( "lwt" ), LFUNCVAL ( mqtt_socket_lwt ) },
{ LSTRKEY( "on" ), LFUNCVAL ( mqtt_socket_on ) }, { LSTRKEY( "on" ), LFUNCVAL ( mqtt_socket_on ) },
{ LSTRKEY( "__gc" ), LFUNCVAL ( mqtt_delete ) }, { LSTRKEY( "__gc" ), LFUNCVAL ( mqtt_delete ) },
#if LUA_OPTIMIZE_MEMORY > 0 #if LUA_OPTIMIZE_MEMORY > 0

View File

@ -397,6 +397,8 @@ string.gsub("abc%0Ddef", "%%(%x%x)", ex) print("hello")
v="abc%0D%0Adef" v="abc%0D%0Adef"
pcall(function() print(string.gsub(v, "%%(%x%x)", function(x) return string.char(tonumber(x, 16)) end)) end) pcall(function() print(string.gsub(v, "%%(%x%x)", function(x) return string.char(tonumber(x, 16)) end)) end)
mosca -v | bunyan
m=mqtt.Client() m=mqtt.Client()
m:connect("192.168.18.88",1883) m:connect("192.168.18.88",1883)
topic={} topic={}
@ -426,3 +428,25 @@ m=mqtt.Client()
m:on("connect",function(m) print("connection") end ) m:on("connect",function(m) print("connection") end )
m:connect("192.168.18.88",1883) m:connect("192.168.18.88",1883)
m:on("offline",function(m) print("disconnection") end ) m:on("offline",function(m) print("disconnection") end )
m=mqtt.Client()
m:on("connect",function(m) print("connection "..node.heap()) end )
m:on("offline", function(conn)
if conn == nil then print("conn is nil") end
print("Reconnect to broker...")
print(node.heap())
conn:connect("192.168.18.88",1883,0,1)
end)
m:connect("192.168.18.88",1883,0,1)
m=mqtt.Client()
m:on("connect",function(m) print("connection "..node.heap()) end )
m:on("offline", function(conn)
if conn == nil then print("conn is nil") end
print("Reconnect to broker...")
print(node.heap())
conn:connect("192.168.18.88",1883)
end)
m:connect("192.168.18.88",1883)
m:close()