diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index 78a41f18..2b6a35b3 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -75,7 +75,7 @@ typedef struct lmqtt_userdata tConnState connState; }lmqtt_userdata; -static void socket_connect(struct espconn *pesp_conn); +static sint8 socket_connect(struct espconn *pesp_conn); static void mqtt_socket_reconnected(void *arg, sint8_t err); static void mqtt_socket_connected(void *arg); @@ -397,12 +397,12 @@ READPACKET: #ifdef CLIENT_SSL_ENABLE if( mud->secure ) { - espconn_secure_sent( pesp_conn, node->msg.data, node->msg.length ); + espconn_secure_send( pesp_conn, node->msg.data, node->msg.length ); } else #endif { - espconn_sent( pesp_conn, node->msg.data, node->msg.length ); + espconn_send( pesp_conn, node->msg.data, node->msg.length ); } } NODE_DBG("receive, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); @@ -479,12 +479,12 @@ static void mqtt_socket_connected(void *arg) #ifdef CLIENT_SSL_ENABLE if(mud->secure) { - espconn_secure_sent(pesp_conn, temp_msg->data, temp_msg->length); + espconn_secure_send(pesp_conn, temp_msg->data, temp_msg->length); } else #endif { - espconn_sent(pesp_conn, temp_msg->data, temp_msg->length); + espconn_send(pesp_conn, temp_msg->data, temp_msg->length); } mud->keep_alive_tick = 0; @@ -547,12 +547,12 @@ void mqtt_socket_timer(void *arg) #ifdef CLIENT_SSL_ENABLE if(mud->secure) { - espconn_secure_sent(mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length); + espconn_secure_send(mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length); } else #endif { - espconn_sent(mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length); + espconn_send(mud->pesp_conn, pending_msg->msg.data, pending_msg->msg.length); } mud->keep_alive_tick = 0; NODE_DBG("id: %d - qos: %d, length: %d\n", pending_msg->msg_id, pending_msg->publish_qos, pending_msg->msg.length); @@ -571,12 +571,12 @@ void mqtt_socket_timer(void *arg) #ifdef CLIENT_SSL_ENABLE if(mud->secure) { - espconn_secure_sent(mud->pesp_conn, temp_msg->data, temp_msg->length); + espconn_secure_send(mud->pesp_conn, temp_msg->data, temp_msg->length); } else #endif { - espconn_sent(mud->pesp_conn, temp_msg->data, temp_msg->length); + espconn_send(mud->pesp_conn, temp_msg->data, temp_msg->length); } mud->keep_alive_tick = 0; } @@ -805,43 +805,55 @@ static int mqtt_delete( lua_State* L ) return 0; } -static void socket_connect(struct espconn *pesp_conn) +static sint8 socket_connect(struct espconn *pesp_conn) { NODE_DBG("enter socket_connect.\n"); + sint8 espconn_status; + if(pesp_conn == NULL) - return; + return ESPCONN_CONN; lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse; if(mud == NULL) - return; + return ESPCONN_ARG; mud->event_timeout = MQTT_CONNECT_TIMEOUT; mud->connState = MQTT_INIT; #ifdef CLIENT_SSL_ENABLE if(mud->secure) { - espconn_secure_connect(pesp_conn); + espconn_status = espconn_secure_connect(pesp_conn); } else #endif { - espconn_connect(pesp_conn); + espconn_status = espconn_connect(pesp_conn); } os_timer_arm(&mud->mqttTimer, 1000, 1); NODE_DBG("leave socket_connect.\n"); + + return espconn_status; } -static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg); +static sint8 socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg); static int dns_reconn_count = 0; static ip_addr_t host_ip; // for dns -static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) + +/* wrapper for using socket_dns_found() as callback function */ +static void socket_dns_foundcb(const char *name, ip_addr_t *ipaddr, void *arg) +{ + socket_dns_found(name, ipaddr, arg); +} + +static sint8 socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) { NODE_DBG("enter socket_dns_found.\n"); + sint8 espconn_status = ESPCONN_OK; struct espconn *pesp_conn = arg; if(pesp_conn == NULL){ NODE_DBG("pesp_conn null.\n"); - return; + return -1; } if(ipaddr == NULL) @@ -851,12 +863,11 @@ static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) NODE_ERR( "DNS Fail!\n" ); // Note: should delete the pesp_conn or unref self_ref here. mqtt_socket_disconnected(arg); // although not connected, but fire disconnect callback to release every thing. - return; + return -1; } NODE_ERR( "DNS retry %d!\n", dns_reconn_count ); host_ip.addr = 0; - espconn_gethostbyname(pesp_conn, name, &host_ip, socket_dns_found); - return; + return espconn_gethostbyname(pesp_conn, name, &host_ip, socket_dns_foundcb); } // ipaddr->addr is a uint32_t ip @@ -867,9 +878,11 @@ static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) NODE_DBG("TCP ip is set: "); NODE_DBG(IPSTR, IP2STR(&(ipaddr->addr))); NODE_DBG("\n"); - socket_connect(pesp_conn); + espconn_status = socket_connect(pesp_conn); } NODE_DBG("leave socket_dns_found.\n"); + + return espconn_status; } // Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client) ) @@ -884,6 +897,7 @@ static int mqtt_socket_connect( lua_State* L ) int stack = 1; unsigned secure = 0, auto_reconnect = 0; int top = lua_gettop(L); + sint8 espconn_status; mud = (lmqtt_userdata *)luaL_checkudata(L, stack, "mqtt.socket"); luaL_argcheck(L, mud, stack, "mqtt.socket expected"); @@ -905,21 +919,21 @@ static int mqtt_socket_connect( lua_State* L ) } struct espconn *pesp_conn = NULL; - pesp_conn = mud->pesp_conn = (struct espconn *)c_zalloc(sizeof(struct espconn)); - if(!pesp_conn) - return luaL_error(L, "not enough memory"); + pesp_conn = mud->pesp_conn = (struct espconn *)c_zalloc(sizeof(struct espconn)); + if(!pesp_conn) + return luaL_error(L, "not enough memory"); - pesp_conn->proto.udp = NULL; - pesp_conn->proto.tcp = (esp_tcp *)c_zalloc(sizeof(esp_tcp)); - if(!pesp_conn->proto.tcp){ - c_free(pesp_conn); - pesp_conn = mud->pesp_conn = NULL; - return luaL_error(L, "not enough memory"); - } - // reverse is for the callback function - pesp_conn->reverse = mud; - pesp_conn->type = ESPCONN_TCP; - pesp_conn->state = ESPCONN_NONE; + pesp_conn->proto.udp = NULL; + pesp_conn->proto.tcp = (esp_tcp *)c_zalloc(sizeof(esp_tcp)); + if(!pesp_conn->proto.tcp){ + c_free(pesp_conn); + pesp_conn = mud->pesp_conn = NULL; + return luaL_error(L, "not enough memory"); + } + // reverse is for the callback function + pesp_conn->reverse = mud; + pesp_conn->type = ESPCONN_TCP; + pesp_conn->state = ESPCONN_NONE; mud->connected = false; if( (stack<=top) && lua_isstring(L,stack) ) // deal with the domain string @@ -993,8 +1007,8 @@ static int mqtt_socket_connect( lua_State* L ) luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref); mud->self_ref = luaL_ref(L, LUA_REGISTRYINDEX); - espconn_regist_connectcb(pesp_conn, mqtt_socket_connected); - espconn_regist_reconcb(pesp_conn, mqtt_socket_reconnected); + espconn_status = espconn_regist_connectcb(pesp_conn, mqtt_socket_connected); + espconn_status |= espconn_regist_reconcb(pesp_conn, mqtt_socket_reconnected); os_timer_disarm(&mud->mqttTimer); os_timer_setfn(&mud->mqttTimer, (os_timer_func_t *)mqtt_socket_timer, mud); @@ -1004,17 +1018,23 @@ static int mqtt_socket_connect( lua_State* L ) { host_ip.addr = 0; dns_reconn_count = 0; - if(ESPCONN_OK == espconn_gethostbyname(pesp_conn, domain, &host_ip, socket_dns_found)){ - socket_dns_found(domain, &host_ip, pesp_conn); // ip is returned in host_ip. + if(ESPCONN_OK == espconn_gethostbyname(pesp_conn, domain, &host_ip, socket_dns_foundcb)){ + espconn_status |= socket_dns_found(domain, &host_ip, pesp_conn); // ip is returned in host_ip. } } else { - socket_connect(pesp_conn); + espconn_status |= socket_connect(pesp_conn); } NODE_DBG("leave mqtt_socket_connect.\n"); - return 0; + + if (espconn_status == ESPCONN_OK) { + lua_pushboolean(L, 1); + } else { + lua_pushboolean(L, 0); + } + return 1; } // Lua: mqtt:close() @@ -1027,37 +1047,44 @@ static int mqtt_socket_close( lua_State* L ) mud = (lmqtt_userdata *)luaL_checkudata(L, 1, "mqtt.socket"); luaL_argcheck(L, mud, 1, "mqtt.socket expected"); - if(mud == NULL) - return 0; - - if(mud->pesp_conn == NULL) - return 0; + if (mud == NULL || mud->pesp_conn == NULL) { + lua_pushboolean(L, 0); + return 1; + } // Send disconnect message mqtt_message_t* temp_msg = mqtt_msg_disconnect(&mud->mqtt_state.mqtt_connection); NODE_DBG("Send MQTT disconnect infomation, data len: %d, d[0]=%d \r\n", temp_msg->length, temp_msg->data[0]); + + sint8 espconn_status; #ifdef CLIENT_SSL_ENABLE if(mud->secure) - espconn_secure_sent(mud->pesp_conn, temp_msg->data, temp_msg->length); + espconn_status = espconn_secure_send(mud->pesp_conn, temp_msg->data, temp_msg->length); else #endif - espconn_sent(mud->pesp_conn, temp_msg->data, temp_msg->length); + espconn_status = espconn_send(mud->pesp_conn, temp_msg->data, temp_msg->length); mud->mqtt_state.auto_reconnect = 0; // stop auto reconnect. #ifdef CLIENT_SSL_ENABLE if(mud->secure){ if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port) - espconn_secure_disconnect(mud->pesp_conn); + espconn_status |= espconn_secure_disconnect(mud->pesp_conn); } else #endif { if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port) - espconn_disconnect(mud->pesp_conn); + espconn_status |= espconn_disconnect(mud->pesp_conn); } NODE_DBG("leave mqtt_socket_close.\n"); - return 0; + + if (espconn_status == ESPCONN_OK) { + lua_pushboolean(L, 1); + } else { + lua_pushboolean(L, 0); + } + return 1; } // Lua: mqtt:on( "method", function() ) @@ -1103,17 +1130,17 @@ static int mqtt_socket_on( lua_State* L ) // Lua: bool = mqtt:subscribe(topic, qos, function()) static int mqtt_socket_subscribe( lua_State* L ) { - NODE_DBG("enter mqtt_socket_subscribe.\n"); + NODE_DBG("enter mqtt_socket_subscribe.\n"); - uint8_t stack = 1, qos = 0; + uint8_t stack = 1, qos = 0; uint16_t msg_id = 0; - const char *topic; - size_t il; - lmqtt_userdata *mud; + const char *topic; + size_t il; + lmqtt_userdata *mud; - mud = (lmqtt_userdata *) luaL_checkudata( L, stack, "mqtt.socket" ); - luaL_argcheck( L, mud, stack, "mqtt.socket expected" ); - stack++; + mud = (lmqtt_userdata *) luaL_checkudata( L, stack, "mqtt.socket" ); + luaL_argcheck( L, mud, stack, "mqtt.socket expected" ); + stack++; if(mud==NULL){ NODE_DBG("userdata is nil.\n"); @@ -1127,8 +1154,8 @@ static int mqtt_socket_subscribe( lua_State* L ) { return 1; } - if(!mud->connected){ - luaL_error( L, "not connected" ); + if(!mud->connected){ + luaL_error( L, "not connected" ); lua_pushboolean(L, 0); return 1; } @@ -1137,53 +1164,60 @@ static int mqtt_socket_subscribe( lua_State* L ) { mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); mqtt_message_t *temp_msg = NULL; - if( lua_istable( L, stack ) ) { - NODE_DBG("subscribe table\n"); - lua_pushnil( L ); /* first key */ + if( lua_istable( L, stack ) ) { + NODE_DBG("subscribe table\n"); + lua_pushnil( L ); /* first key */ uint8_t temp_buf[MQTT_BUF_SIZE]; uint32_t temp_pos = 0; + uint8_t overflow = 0; - while( lua_next( L, stack ) != 0 ) { - topic = luaL_checkstring( L, -2 ); - qos = luaL_checkinteger( L, -1 ); + while( lua_next( L, stack ) != 0 ) { + topic = luaL_checkstring( L, -2 ); + qos = luaL_checkinteger( L, -1 ); - temp_msg = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id ); - NODE_DBG("topic: %s - qos: %d, length: %d\n", topic, qos, temp_msg->length); + temp_msg = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id ); + NODE_DBG("topic: %s - qos: %d, length: %d\n", topic, qos, temp_msg->length); if (temp_pos + temp_msg->length > MQTT_BUF_SIZE){ lua_pop(L, 1); + overflow = 1; break; // too long message for the outbuffer. } c_memcpy( temp_buf + temp_pos, temp_msg->data, temp_msg->length ); temp_pos += temp_msg->length; - lua_pop( L, 1 ); - } + lua_pop( L, 1 ); + } if (temp_pos == 0){ luaL_error( L, "invalid data" ); lua_pushboolean(L, 0); return 1; } - - c_memcpy( temp_buffer, temp_buf, temp_pos ); - temp_msg->data = temp_buffer; - temp_msg->length = temp_pos; - stack++; - } else { - NODE_DBG("subscribe string\n"); - topic = luaL_checklstring( L, stack, &il ); - stack++; - if( topic == NULL ){ - luaL_error( L, "need topic name" ); + if (overflow != 0){ + luaL_error( L, "buffer overflow, can't enqueue all subscriptions" ); lua_pushboolean(L, 0); return 1; } - qos = luaL_checkinteger( L, stack ); - temp_msg = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id ); - stack++; - } + + c_memcpy( temp_buffer, temp_buf, temp_pos ); + temp_msg->data = temp_buffer; + temp_msg->length = temp_pos; + stack++; + } else { + NODE_DBG("subscribe string\n"); + topic = luaL_checklstring( L, stack, &il ); + stack++; + if( topic == NULL ){ + luaL_error( L, "need topic name" ); + lua_pushboolean(L, 0); + return 1; + } + qos = luaL_checkinteger( L, stack ); + temp_msg = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id ); + stack++; + } if( lua_type( L, stack ) == LUA_TFUNCTION || lua_type( L, stack ) == LUA_TLIGHTFUNCTION ) { // TODO: this will overwrite the previous one. lua_pushvalue( L, stack ); // copy argument (func) to the top of stack @@ -1193,34 +1227,36 @@ static int mqtt_socket_subscribe( lua_State* L ) { } msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg, - msg_id, MQTT_MSG_TYPE_SUBSCRIBE, (int)mqtt_get_qos(temp_msg->data) ); + msg_id, MQTT_MSG_TYPE_SUBSCRIBE, (int)mqtt_get_qos(temp_msg->data) ); NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length); + sint8 espconn_status = ESPCONN_IF; + if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){ - mud->event_timeout = MQTT_SEND_TIMEOUT; - NODE_DBG("Sent: %d\n", node->msg.length); + mud->event_timeout = MQTT_SEND_TIMEOUT; + NODE_DBG("Sent: %d\n", node->msg.length); #ifdef CLIENT_SSL_ENABLE - if( mud->secure ) - { - espconn_secure_sent( mud->pesp_conn, node->msg.data, node->msg.length ); - } - else + if( mud->secure ) + { + espconn_status = espconn_secure_send( mud->pesp_conn, node->msg.data, node->msg.length ); + } + else #endif - { - espconn_sent( mud->pesp_conn, node->msg.data, node->msg.length ); - } + { + espconn_status = espconn_send( mud->pesp_conn, node->msg.data, node->msg.length ); + } mud->keep_alive_tick = 0; } - if(!node){ + if(!node || espconn_status != ESPCONN_OK){ lua_pushboolean(L, 0); } else { lua_pushboolean(L, 1); // enqueued succeed. } NODE_DBG("subscribe, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("leave mqtt_socket_subscribe.\n"); - return 1; + return 1; } // Lua: bool = mqtt:publish( topic, payload, qos, retain, function() ) @@ -1232,6 +1268,7 @@ static int mqtt_socket_publish( lua_State* L ) size_t l; uint8_t stack = 1; uint16_t msg_id = 0; + mud = (lmqtt_userdata *)luaL_checkudata(L, stack, "mqtt.socket"); luaL_argcheck(L, mud, stack, "mqtt.socket expected"); stack++; @@ -1285,23 +1322,25 @@ static int mqtt_socket_publish( lua_State* L ) msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos ); + sint8 espconn_status = ESPCONN_IF; + if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){ mud->event_timeout = MQTT_SEND_TIMEOUT; NODE_DBG("Sent: %d\n", node->msg.length); #ifdef CLIENT_SSL_ENABLE if( mud->secure ) { - espconn_secure_sent( mud->pesp_conn, node->msg.data, node->msg.length ); + espconn_status = espconn_secure_send( mud->pesp_conn, node->msg.data, node->msg.length ); } else #endif { - espconn_sent( mud->pesp_conn, node->msg.data, node->msg.length ); + espconn_status = espconn_send( mud->pesp_conn, node->msg.data, node->msg.length ); } mud->keep_alive_tick = 0; } - if(!node){ + if(!node || espconn_status != ESPCONN_OK){ lua_pushboolean(L, 0); } else { lua_pushboolean(L, 1); // enqueued succeed. diff --git a/docs/en/modules/mqtt.md b/docs/en/modules/mqtt.md index e8e06969..1d07cd64 100644 --- a/docs/en/modules/mqtt.md +++ b/docs/en/modules/mqtt.md @@ -71,7 +71,7 @@ Closes connection to the broker. none #### Returns -`nil` +`true` on success, `false` otherwise ## mqtt.client:connect() @@ -88,7 +88,7 @@ Connects to the broker specified by the given host, port, and secure options. - `function(client)` call back function for when the connection was established #### Returns -`nil` +`true` on success, `false` otherwise ## mqtt.client:lwt() @@ -135,7 +135,7 @@ Publishes a message. - `function(client)` optional callback fired when PUBACK received #### Returns -`nil` +`true` on success, `false` otherwise ## mqtt.client:subscribe() @@ -143,11 +143,21 @@ Subscribes to one or several topics. #### Syntax `mqtt:subscribe(topic, qos[, function(client, topic, message)])` +`mqtt:subscribe(table[, function(client, topic, message)])` #### Parameters - `topic` a [topic string](http://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices) - `qos` QoS subscription level, default 0 +- `table` array of 'topic, qos' pairs to subscribe to - `function(client, topic, message)` optional callback fired when message received #### Returns -`nil` +`true` on success, `false` otherwise + +#### Example +```lua +-- subscribe topic with qos = 0 +m:subscribe("/topic",0, function(conn) print("subscribe success") end) + +-- or subscribe multiple topic (topic/0, qos = 0; topic/1, qos = 1; topic2 , qos = 2) +m:subscribe({["topic/0"]=0,["topic/1"]=1,topic2=2}, function(conn) print("subscribe success") end)