From c5e9d800a1494ea943506eb4ed863d51a8a1fee2 Mon Sep 17 00:00:00 2001 From: Tuan PM Date: Mon, 2 Feb 2015 16:58:54 +0700 Subject: [PATCH] Add support multiple subscriptions, resolve nodemcu/nodemcu-firmware#152 --- README.md | 3 +- app/modules/mqtt.c | 163 ++++++++++++++++++++++++++++++++------------- 2 files changed, 119 insertions(+), 47 deletions(-) diff --git a/README.md b/README.md index 674c44e4..35c5d127 100644 --- a/README.md +++ b/README.md @@ -220,7 +220,8 @@ m:connect("192.168.11.118", 1880, 0, function(conn) print("connected") end) -- subscribe topic with qos = 0 m:subscribe("/topic",0, function(conn) print("subscribe success") end) - +-- or subscribe multiple topic (topic/0, qos = 0; topic/1, qos = 1; topic2 , qos = 2) +-- m:subscribe({["topic/0"]=0,["topic/1"]=1,topic2=2}, function(conn) print("subscribe success") end) -- publish a message with data = hello, QoS = 0, retain = 0 m:publish("/topic","hello",0,0, function(conn) print("sent") end) diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index 3c970feb..a37974ec 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -529,23 +529,35 @@ static int mqtt_delete( lua_State* L ) } if(mud->connect_info.will_topic){ - c_free(mud->connect_info.client_id); + c_free(mud->connect_info.will_topic); + mud->connect_info.will_topic = NULL; } if(mud->connect_info.will_message){ c_free(mud->connect_info.will_message); + mud->connect_info.will_message = NULL; } - if(mud->connect_info.client_id) + if(mud->connect_info.client_id){ c_free(mud->connect_info.client_id); - if(mud->connect_info.username) + mud->connect_info.client_id = NULL; + } + if(mud->connect_info.username){ c_free(mud->connect_info.username); - if(mud->connect_info.password) + mud->connect_info.username = NULL; + } + if(mud->connect_info.password){ c_free(mud->connect_info.password); - if(mud->mqtt_state.in_buffer) + mud->connect_info.password = NULL; + } + if(mud->mqtt_state.in_buffer){ c_free(mud->mqtt_state.in_buffer); - if(mud->mqtt_state.out_buffer) + mud->mqtt_state.in_buffer = NULL; + } + if(mud->mqtt_state.out_buffer){ c_free(mud->mqtt_state.out_buffer); + mud->mqtt_state.out_buffer = NULL; + } // free (unref) callback ref if(LUA_NOREF!=mud->cb_connect_ref){ @@ -879,52 +891,111 @@ static int mqtt_socket_on( lua_State* L ) } // Lua: mqtt:subscribe(topic, qos, function()) -static int mqtt_socket_subscribe( lua_State* L ) -{ - NODE_DBG("mqtt_socket_subscribe is called.\n"); - uint8_t stack = 1, qos = 0, retain = 0; - const char *topic; - size_t il; - lmqtt_userdata *mud; +static int mqtt_socket_subscribe( lua_State* L ) { + NODE_DBG("mqtt_socket_subscribe is called.\n"); + typedef struct SUB_STORAGE { + uint32_t length; + uint8_t *data; + struct SUB_STORAGE *next; + } SUB_STORAGE; - mud = (lmqtt_userdata *)luaL_checkudata(L, stack, "mqtt.socket"); - luaL_argcheck(L, mud, stack, "mqtt.socket expected"); - stack++; - - if(mud->send_timeout != 0) - return luaL_error( L, "sending in process" ); + uint8_t stack = 1, qos = 0; + const char *topic; + size_t il; + lmqtt_userdata *mud; - if(!mud->connected) - return luaL_error(L, "not connected"); + mud = (lmqtt_userdata *) luaL_checkudata( L, stack, "mqtt.socket" ); + luaL_argcheck( L, mud, stack, "mqtt.socket expected" ); + stack++; - topic = luaL_checklstring( L, stack, &il ); - stack++; - if(topic == NULL) - return luaL_error( L, "need topic name" ); - - qos = luaL_checkinteger( L, stack); - stack++; + if( mud->send_timeout != 0 ) + return luaL_error( L, "sending in process" ); - mud->mqtt_state.outbound_message = mqtt_msg_subscribe(&mud->mqtt_state.mqtt_connection, - topic, qos, - &mud->mqtt_state.pending_msg_id); - mud->send_timeout = MQTT_SEND_TIMEOUT; - mud->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_SUBSCRIBE; - mud->mqtt_state.pending_publish_qos = mqtt_get_qos(mud->mqtt_state.outbound_message->data); + if( !mud->connected ) + return luaL_error( L, "not connected" ); - if (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION){ - lua_pushvalue(L, stack); // copy argument (func) to the top of stack - if(mud->cb_suback_ref != LUA_NOREF) - luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_suback_ref); - mud->cb_suback_ref = luaL_ref(L, LUA_REGISTRYINDEX); - } + if( lua_istable( L, stack ) ) { + NODE_DBG("subscribe table\n"); + lua_pushnil( L ); /* first key */ + SUB_STORAGE *first, *last, *curr; + first = (SUB_STORAGE*) c_zalloc(sizeof(SUB_STORAGE)); + if( first == NULL ) + return luaL_error( L, "not enough memory" ); + first->length = 0; + last = first; + first->next = NULL; + while( lua_next( L, stack ) != 0 ) { + curr = (SUB_STORAGE*) c_zalloc(sizeof(SUB_STORAGE)); - if(mud->secure) - espconn_secure_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); - else - espconn_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); - - return 0; + if( curr == NULL ) + return luaL_error( L, "not enough memory" ); + topic = luaL_checkstring( L, -2 ); + qos = luaL_checkinteger( L, -1 ); + + mud->mqtt_state.outbound_message = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &mud->mqtt_state.pending_msg_id ); + NODE_DBG("topic: %s - qos: %d, length: %d\n", topic, qos, mud->mqtt_state.outbound_message->length); + curr->data = (uint8_t*) c_zalloc(mud->mqtt_state.outbound_message->length); + if( curr->data == NULL ) + return luaL_error( L, "not enough memory" ); + c_memcpy( curr->data, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length ); + + curr->length = mud->mqtt_state.outbound_message->length; + curr->next = NULL; + last->next = curr; + last = curr; + lua_pop( L, 1 ); + } + + curr = first; + uint32_t ptr = 0; + while( curr != NULL ) { + if( curr->length == 0 ) { + curr = curr->next; + continue; + } + if( ptr + curr->length < mud->mqtt_state.out_buffer_length ) { + c_memcpy( mud->mqtt_state.out_buffer + ptr, curr->data, curr->length ); + ptr += curr->length; + } + c_free(curr->data); + c_free(curr); + curr = curr->next; + } + c_free(first); + if( ptr == 0 ) { + return luaL_error( L, "invalid data" ); + } + mud->mqtt_state.outbound_message->data = mud->mqtt_state.out_buffer; + mud->mqtt_state.outbound_message->length = ptr; + stack++; + } else { + NODE_DBG("subscribe string\n"); + topic = luaL_checklstring( L, stack, &il ); + stack++; + if( topic == NULL ) + return luaL_error( L, "need topic name" ); + qos = luaL_checkinteger( L, stack ); + mud->mqtt_state.outbound_message = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &mud->mqtt_state.pending_msg_id ); + stack++; + } + + mud->send_timeout = MQTT_SEND_TIMEOUT; + mud->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_SUBSCRIBE; + mud->mqtt_state.pending_publish_qos = mqtt_get_qos( mud->mqtt_state.outbound_message->data ); + + if( lua_type( L, stack ) == LUA_TFUNCTION || lua_type( L, stack ) == LUA_TLIGHTFUNCTION ) { + lua_pushvalue( L, stack ); // copy argument (func) to the top of stack + if( mud->cb_suback_ref != LUA_NOREF ) + luaL_unref( L, LUA_REGISTRYINDEX, mud->cb_suback_ref ); + mud->cb_suback_ref = luaL_ref( L, LUA_REGISTRYINDEX ); + } + NODE_DBG("Sent: %d\n", mud->mqtt_state.outbound_message->length); + if( mud->secure ) + espconn_secure_sent( mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length ); + else + espconn_sent( mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length ); + + return 0; } // Lua: mqtt:publish( topic, payload, qos, retain, function() )