Merge pull request #1179 from pjsg/unsubscribe

MQTT Unsubscribe support
This commit is contained in:
Marcel Stör 2016-03-28 12:23:10 +02:00
commit 117df40f58
4 changed files with 201 additions and 54 deletions

View File

@ -62,6 +62,7 @@ typedef struct lmqtt_userdata
int cb_disconnect_ref;
int cb_message_ref;
int cb_suback_ref;
int cb_unsuback_ref;
int cb_puback_ref;
mqtt_state_t mqtt_state;
mqtt_connect_info_t connect_info;
@ -124,10 +125,8 @@ static void mqtt_socket_disconnected(void *arg) // tcp only
mud->pesp_conn = NULL;
}
if(mud->self_ref != LUA_NOREF){ // TODO: should we unref the client and delete it?
luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref);
mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self
}
luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref);
mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self
}
if(call_back){
@ -363,6 +362,14 @@ READPACKET:
if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && pending_msg->msg_id == msg_id){
NODE_DBG("MQTT: UnSubscribe successful\r\n");
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
if (mud->cb_unsuback_ref == LUA_NOREF)
break;
if (mud->self_ref == LUA_NOREF)
break;
lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_unsuback_ref);
lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref);
lua_call(L, 1, 0);
}
break;
case MQTT_MSG_TYPE_PUBLISH:
@ -823,35 +830,21 @@ static int mqtt_delete( lua_State* L )
// -------
// free (unref) callback ref
if(LUA_NOREF!=mud->cb_connect_ref){
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref);
mud->cb_connect_ref = LUA_NOREF;
}
if(LUA_NOREF!=mud->cb_connect_fail_ref){
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref);
mud->cb_connect_fail_ref = LUA_NOREF;
}
if(LUA_NOREF!=mud->cb_disconnect_ref){
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref);
mud->cb_disconnect_ref = LUA_NOREF;
}
if(LUA_NOREF!=mud->cb_message_ref){
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_message_ref);
mud->cb_message_ref = LUA_NOREF;
}
if(LUA_NOREF!=mud->cb_suback_ref){
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_suback_ref);
mud->cb_suback_ref = LUA_NOREF;
}
if(LUA_NOREF!=mud->cb_puback_ref){
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
mud->cb_puback_ref = LUA_NOREF;
}
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref);
mud->cb_connect_ref = LUA_NOREF;
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref);
mud->cb_connect_fail_ref = LUA_NOREF;
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref);
mud->cb_disconnect_ref = LUA_NOREF;
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_message_ref);
mud->cb_message_ref = LUA_NOREF;
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_suback_ref);
mud->cb_suback_ref = LUA_NOREF;
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
mud->cb_puback_ref = LUA_NOREF;
lua_gc(L, LUA_GCSTOP, 0);
if(LUA_NOREF!=mud->self_ref){
luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref);
mud->self_ref = LUA_NOREF;
}
luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref);
mud->self_ref = LUA_NOREF;
lua_gc(L, LUA_GCRESTART, 0);
NODE_DBG("leave mqtt_delete.\n");
return 0;
@ -1060,8 +1053,7 @@ static int mqtt_socket_connect( lua_State* L )
// call back function when a connection is obtained, tcp only
if ((stack<=top) && (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION)){
lua_pushvalue(L, stack); // copy argument (func) to the top of stack
if(mud->cb_connect_ref != LUA_NOREF)
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref);
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref);
mud->cb_connect_ref = luaL_ref(L, LUA_REGISTRYINDEX);
stack++;
}
@ -1069,15 +1061,13 @@ static int mqtt_socket_connect( lua_State* L )
// call back function when a connection fails
if ((stack<=top) && (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION)){
lua_pushvalue(L, stack); // copy argument (func) to the top of stack
if(mud->cb_connect_fail_ref != LUA_NOREF)
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref);
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref);
mud->cb_connect_fail_ref = luaL_ref(L, LUA_REGISTRYINDEX);
stack++;
}
lua_pushvalue(L, 1); // copy userdata to the top of stack
if(mud->self_ref != LUA_NOREF)
luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref);
luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref);
mud->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
espconn_status = espconn_regist_connectcb(pesp_conn, mqtt_socket_connected);
@ -1182,16 +1172,13 @@ static int mqtt_socket_on( lua_State* L )
lua_pushvalue(L, 3); // copy argument (func) to the top of stack
if( sl == 7 && c_strcmp(method, "connect") == 0){
if(mud->cb_connect_ref != LUA_NOREF)
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref);
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref);
mud->cb_connect_ref = luaL_ref(L, LUA_REGISTRYINDEX);
}else if( sl == 7 && c_strcmp(method, "offline") == 0){
if(mud->cb_disconnect_ref != LUA_NOREF)
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref);
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref);
mud->cb_disconnect_ref = luaL_ref(L, LUA_REGISTRYINDEX);
}else if( sl == 7 && c_strcmp(method, "message") == 0){
if(mud->cb_message_ref != LUA_NOREF)
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_message_ref);
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_message_ref);
mud->cb_message_ref = luaL_ref(L, LUA_REGISTRYINDEX);
}else{
lua_pop(L, 1);
@ -1201,6 +1188,118 @@ static int mqtt_socket_on( lua_State* L )
return 0;
}
// Lua: bool = mqtt:unsubscribe(topic, function())
static int mqtt_socket_unsubscribe( lua_State* L ) {
NODE_DBG("enter mqtt_socket_unsubscribe.\n");
uint8_t stack = 1;
uint16_t msg_id = 0;
const char *topic;
size_t il;
lmqtt_userdata *mud;
mud = (lmqtt_userdata *) luaL_checkudata( L, stack, "mqtt.socket" );
luaL_argcheck( L, mud, stack, "mqtt.socket expected" );
stack++;
if(mud==NULL){
NODE_DBG("userdata is nil.\n");
lua_pushboolean(L, 0);
return 1;
}
if(mud->pesp_conn == NULL){
NODE_DBG("mud->pesp_conn is NULL.\n");
lua_pushboolean(L, 0);
return 1;
}
if(!mud->connected){
luaL_error( L, "not connected" );
lua_pushboolean(L, 0);
return 1;
}
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = NULL;
if( lua_istable( L, stack ) ) {
NODE_DBG("unsubscribe table\n");
lua_pushnil( L ); /* first key */
int topic_count = 0;
uint8_t overflow = 0;
while( lua_next( L, stack ) != 0 ) {
topic = luaL_checkstring( L, -2 );
if (topic_count == 0) {
temp_msg = mqtt_msg_unsubscribe_init( &mud->mqtt_state.mqtt_connection, &msg_id );
}
temp_msg = mqtt_msg_unsubscribe_topic( &mud->mqtt_state.mqtt_connection, topic );
topic_count++;
NODE_DBG("topic: %s - length: %d\n", topic, temp_msg->length);
if (temp_msg->length == 0) {
lua_pop(L, 1);
overflow = 1;
break; // too long message for the outbuffer.
}
lua_pop( L, 1 );
}
if (topic_count == 0){
return luaL_error( L, "no topics found" );
}
if (overflow != 0){
return luaL_error( L, "buffer overflow, can't enqueue all unsubscriptions" );
}
temp_msg = mqtt_msg_unsubscribe_fini( &mud->mqtt_state.mqtt_connection );
if (temp_msg->length == 0) {
return luaL_error( L, "buffer overflow, can't enqueue all unsubscriptions" );
}
stack++;
} else {
NODE_DBG("unsubscribe string\n");
topic = luaL_checklstring( L, stack, &il );
stack++;
if( topic == NULL ){
return luaL_error( L, "need topic name" );
}
temp_msg = mqtt_msg_unsubscribe( &mud->mqtt_state.mqtt_connection, topic, &msg_id );
}
if( lua_type( L, stack ) == LUA_TFUNCTION || lua_type( L, stack ) == LUA_TLIGHTFUNCTION ) { // TODO: this will overwrite the previous one.
lua_pushvalue( L, stack ); // copy argument (func) to the top of stack
luaL_unref( L, LUA_REGISTRYINDEX, mud->cb_unsuback_ref );
mud->cb_unsuback_ref = luaL_ref( L, LUA_REGISTRYINDEX );
}
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_UNSUBSCRIBE, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length);
NODE_DBG("msg_size: %d, event_timeout: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)), mud->event_timeout);
sint8 espconn_status = ESPCONN_IF;
espconn_status = mqtt_send_if_possible(mud->pesp_conn);
if(!node || espconn_status != ESPCONN_OK){
lua_pushboolean(L, 0);
} else {
lua_pushboolean(L, 1); // enqueued succeed.
}
NODE_DBG("unsubscribe, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
NODE_DBG("leave mqtt_socket_unsubscribe.\n");
return 1;
}
// Lua: bool = mqtt:subscribe(topic, qos, function())
static int mqtt_socket_subscribe( lua_State* L ) {
NODE_DBG("enter mqtt_socket_subscribe.\n");
@ -1292,8 +1391,7 @@ static int mqtt_socket_subscribe( lua_State* L ) {
if( lua_type( L, stack ) == LUA_TFUNCTION || lua_type( L, stack ) == LUA_TLIGHTFUNCTION ) { // TODO: this will overwrite the previous one.
lua_pushvalue( L, stack ); // copy argument (func) to the top of stack
if( mud->cb_suback_ref != LUA_NOREF )
luaL_unref( L, LUA_REGISTRYINDEX, mud->cb_suback_ref );
luaL_unref( L, LUA_REGISTRYINDEX, mud->cb_suback_ref );
mud->cb_suback_ref = luaL_ref( L, LUA_REGISTRYINDEX );
}
@ -1368,8 +1466,7 @@ static int mqtt_socket_publish( lua_State* L )
if (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION){
lua_pushvalue(L, stack); // copy argument (func) to the top of stack
if(mud->cb_puback_ref != LUA_NOREF)
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
mud->cb_puback_ref = luaL_ref(L, LUA_REGISTRYINDEX);
}
@ -1475,6 +1572,7 @@ static const LUA_REG_TYPE mqtt_socket_map[] = {
{ LSTRKEY( "close" ), LFUNCVAL( mqtt_socket_close ) },
{ LSTRKEY( "publish" ), LFUNCVAL( mqtt_socket_publish ) },
{ LSTRKEY( "subscribe" ), LFUNCVAL( mqtt_socket_subscribe ) },
{ LSTRKEY( "unsubscribe" ), LFUNCVAL( mqtt_socket_unsubscribe ) },
{ LSTRKEY( "lwt" ), LFUNCVAL( mqtt_socket_lwt ) },
{ LSTRKEY( "on" ), LFUNCVAL( mqtt_socket_on ) },
{ LSTRKEY( "__gc" ), LFUNCVAL( mqtt_delete ) },

View File

@ -447,20 +447,40 @@ mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* to
return result;
}
mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id)
mqtt_message_t* mqtt_msg_unsubscribe_init(mqtt_connection_t* connection, uint16_t *message_id)
{
init_message(connection);
return mqtt_msg_subscribe_init(connection, message_id);
}
mqtt_message_t* mqtt_msg_unsubscribe_topic(mqtt_connection_t* connection, const char* topic)
{
if(topic == NULL || topic[0] == '\0')
return fail_message(connection);
if((*message_id = append_message_id(connection, 0)) == 0)
return fail_message(connection);
if(append_string(connection, topic, c_strlen(topic)) < 0)
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
return &connection->message;
}
mqtt_message_t* mqtt_msg_unsubscribe_fini(mqtt_connection_t* connection)
{
return fini_message(connection, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0);
}
mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id)
{
mqtt_message_t* result;
result = mqtt_msg_unsubscribe_init(connection, message_id);
if (result->length != 0) {
result = mqtt_msg_unsubscribe_topic(connection, topic);
}
if (result->length != 0) {
result = mqtt_msg_unsubscribe_fini(connection);
}
return result;
}
mqtt_message_t* mqtt_msg_pingreq(mqtt_connection_t* connection)

View File

@ -140,6 +140,10 @@ mqtt_message_t* mqtt_msg_subscribe_init(mqtt_connection_t* connection, uint16_t*
mqtt_message_t* mqtt_msg_subscribe_topic(mqtt_connection_t* connection, const char* topic, int qos);
mqtt_message_t* mqtt_msg_subscribe_fini(mqtt_connection_t* connection);
mqtt_message_t* mqtt_msg_unsubscribe_init(mqtt_connection_t* connection, uint16_t* message_id);
mqtt_message_t* mqtt_msg_unsubscribe_topic(mqtt_connection_t* connection, const char* topic);
mqtt_message_t* mqtt_msg_unsubscribe_fini(mqtt_connection_t* connection);
#ifdef __cplusplus
}

View File

@ -184,3 +184,28 @@ m:subscribe("/topic",0, function(conn) print("subscribe success") end)
-- or subscribe multiple topic (topic/0, qos = 0; topic/1, qos = 1; topic2 , qos = 2)
m:subscribe({["topic/0"]=0,["topic/1"]=1,topic2=2}, function(conn) print("subscribe success") end)
```
## mqtt.client:unsubscribe()
Unsubscribes from one or several topics.
#### Syntax
`mqtt:unsubscribe(topic[, function(client)])`
`mqtt:unsubscribe(table[, function(client)])`
#### Parameters
- `topic` a [topic string](http://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices)
- `table` array of 'topic, anything' pairs to unsubscribe from
- `function(client)` optional callback fired when unsubscription(s) succeeded
#### Returns
`true` on success, `false` otherwise
#### Example
```lua
-- unsubscribe topic
m:unsubscribe("/topic", function(conn) print("unsubscribe success") end)
-- or unsubscribe multiple topic (topic/0; topic/1; topic2)
m:unsubscribe({["topic/0"]=0,["topic/1"]=0,topic2="anything"}, function(conn) print("unsubscribe success") end)
```