Add support multiple subscriptions, resolve nodemcu/nodemcu-firmware#152

This commit is contained in:
Tuan PM 2015-02-02 16:58:54 +07:00
parent 055c55a73c
commit c5e9d800a1
2 changed files with 119 additions and 47 deletions

View File

@ -220,7 +220,8 @@ m:connect("192.168.11.118", 1880, 0, function(conn) print("connected") end)
-- subscribe topic with qos = 0 -- subscribe topic with qos = 0
m:subscribe("/topic",0, function(conn) print("subscribe success") end) m:subscribe("/topic",0, function(conn) print("subscribe success") end)
-- or subscribe multiple topic (topic/0, qos = 0; topic/1, qos = 1; topic2 , qos = 2)
-- m:subscribe({["topic/0"]=0,["topic/1"]=1,topic2=2}, function(conn) print("subscribe success") end)
-- publish a message with data = hello, QoS = 0, retain = 0 -- publish a message with data = hello, QoS = 0, retain = 0
m:publish("/topic","hello",0,0, function(conn) print("sent") end) m:publish("/topic","hello",0,0, function(conn) print("sent") end)

View File

@ -529,23 +529,35 @@ static int mqtt_delete( lua_State* L )
} }
if(mud->connect_info.will_topic){ if(mud->connect_info.will_topic){
c_free(mud->connect_info.client_id); c_free(mud->connect_info.will_topic);
mud->connect_info.will_topic = NULL;
} }
if(mud->connect_info.will_message){ if(mud->connect_info.will_message){
c_free(mud->connect_info.will_message); c_free(mud->connect_info.will_message);
mud->connect_info.will_message = NULL;
} }
if(mud->connect_info.client_id) if(mud->connect_info.client_id){
c_free(mud->connect_info.client_id); c_free(mud->connect_info.client_id);
if(mud->connect_info.username) mud->connect_info.client_id = NULL;
}
if(mud->connect_info.username){
c_free(mud->connect_info.username); c_free(mud->connect_info.username);
if(mud->connect_info.password) mud->connect_info.username = NULL;
}
if(mud->connect_info.password){
c_free(mud->connect_info.password); c_free(mud->connect_info.password);
if(mud->mqtt_state.in_buffer) mud->connect_info.password = NULL;
}
if(mud->mqtt_state.in_buffer){
c_free(mud->mqtt_state.in_buffer); c_free(mud->mqtt_state.in_buffer);
if(mud->mqtt_state.out_buffer) mud->mqtt_state.in_buffer = NULL;
}
if(mud->mqtt_state.out_buffer){
c_free(mud->mqtt_state.out_buffer); c_free(mud->mqtt_state.out_buffer);
mud->mqtt_state.out_buffer = NULL;
}
// free (unref) callback ref // free (unref) callback ref
if(LUA_NOREF!=mud->cb_connect_ref){ if(LUA_NOREF!=mud->cb_connect_ref){
@ -879,52 +891,111 @@ static int mqtt_socket_on( lua_State* L )
} }
// Lua: mqtt:subscribe(topic, qos, function()) // Lua: mqtt:subscribe(topic, qos, function())
static int mqtt_socket_subscribe( lua_State* L ) static int mqtt_socket_subscribe( lua_State* L ) {
{ NODE_DBG("mqtt_socket_subscribe is called.\n");
NODE_DBG("mqtt_socket_subscribe is called.\n"); typedef struct SUB_STORAGE {
uint8_t stack = 1, qos = 0, retain = 0; uint32_t length;
const char *topic; uint8_t *data;
size_t il; struct SUB_STORAGE *next;
lmqtt_userdata *mud; } SUB_STORAGE;
mud = (lmqtt_userdata *)luaL_checkudata(L, stack, "mqtt.socket"); uint8_t stack = 1, qos = 0;
luaL_argcheck(L, mud, stack, "mqtt.socket expected"); const char *topic;
stack++; size_t il;
lmqtt_userdata *mud;
if(mud->send_timeout != 0)
return luaL_error( L, "sending in process" );
if(!mud->connected) mud = (lmqtt_userdata *) luaL_checkudata( L, stack, "mqtt.socket" );
return luaL_error(L, "not connected"); luaL_argcheck( L, mud, stack, "mqtt.socket expected" );
stack++;
topic = luaL_checklstring( L, stack, &il ); if( mud->send_timeout != 0 )
stack++; return luaL_error( L, "sending in process" );
if(topic == NULL)
return luaL_error( L, "need topic name" );
qos = luaL_checkinteger( L, stack);
stack++;
mud->mqtt_state.outbound_message = mqtt_msg_subscribe(&mud->mqtt_state.mqtt_connection, if( !mud->connected )
topic, qos, return luaL_error( L, "not connected" );
&mud->mqtt_state.pending_msg_id);
mud->send_timeout = MQTT_SEND_TIMEOUT;
mud->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_SUBSCRIBE;
mud->mqtt_state.pending_publish_qos = mqtt_get_qos(mud->mqtt_state.outbound_message->data);
if (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION){ if( lua_istable( L, stack ) ) {
lua_pushvalue(L, stack); // copy argument (func) to the top of stack NODE_DBG("subscribe table\n");
if(mud->cb_suback_ref != LUA_NOREF) lua_pushnil( L ); /* first key */
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_suback_ref); SUB_STORAGE *first, *last, *curr;
mud->cb_suback_ref = luaL_ref(L, LUA_REGISTRYINDEX); first = (SUB_STORAGE*) c_zalloc(sizeof(SUB_STORAGE));
} if( first == NULL )
return luaL_error( L, "not enough memory" );
first->length = 0;
last = first;
first->next = NULL;
while( lua_next( L, stack ) != 0 ) {
curr = (SUB_STORAGE*) c_zalloc(sizeof(SUB_STORAGE));
if(mud->secure) if( curr == NULL )
espconn_secure_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); return luaL_error( L, "not enough memory" );
else topic = luaL_checkstring( L, -2 );
espconn_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); qos = luaL_checkinteger( L, -1 );
return 0; mud->mqtt_state.outbound_message = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &mud->mqtt_state.pending_msg_id );
NODE_DBG("topic: %s - qos: %d, length: %d\n", topic, qos, mud->mqtt_state.outbound_message->length);
curr->data = (uint8_t*) c_zalloc(mud->mqtt_state.outbound_message->length);
if( curr->data == NULL )
return luaL_error( L, "not enough memory" );
c_memcpy( curr->data, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length );
curr->length = mud->mqtt_state.outbound_message->length;
curr->next = NULL;
last->next = curr;
last = curr;
lua_pop( L, 1 );
}
curr = first;
uint32_t ptr = 0;
while( curr != NULL ) {
if( curr->length == 0 ) {
curr = curr->next;
continue;
}
if( ptr + curr->length < mud->mqtt_state.out_buffer_length ) {
c_memcpy( mud->mqtt_state.out_buffer + ptr, curr->data, curr->length );
ptr += curr->length;
}
c_free(curr->data);
c_free(curr);
curr = curr->next;
}
c_free(first);
if( ptr == 0 ) {
return luaL_error( L, "invalid data" );
}
mud->mqtt_state.outbound_message->data = mud->mqtt_state.out_buffer;
mud->mqtt_state.outbound_message->length = ptr;
stack++;
} else {
NODE_DBG("subscribe string\n");
topic = luaL_checklstring( L, stack, &il );
stack++;
if( topic == NULL )
return luaL_error( L, "need topic name" );
qos = luaL_checkinteger( L, stack );
mud->mqtt_state.outbound_message = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &mud->mqtt_state.pending_msg_id );
stack++;
}
mud->send_timeout = MQTT_SEND_TIMEOUT;
mud->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_SUBSCRIBE;
mud->mqtt_state.pending_publish_qos = mqtt_get_qos( mud->mqtt_state.outbound_message->data );
if( lua_type( L, stack ) == LUA_TFUNCTION || lua_type( L, stack ) == LUA_TLIGHTFUNCTION ) {
lua_pushvalue( L, stack ); // copy argument (func) to the top of stack
if( mud->cb_suback_ref != LUA_NOREF )
luaL_unref( L, LUA_REGISTRYINDEX, mud->cb_suback_ref );
mud->cb_suback_ref = luaL_ref( L, LUA_REGISTRYINDEX );
}
NODE_DBG("Sent: %d\n", mud->mqtt_state.outbound_message->length);
if( mud->secure )
espconn_secure_sent( mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length );
else
espconn_sent( mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length );
return 0;
} }
// Lua: mqtt:publish( topic, payload, qos, retain, function() ) // Lua: mqtt:publish( topic, payload, qos, retain, function() )