Fix problem with subscribing to multiple topics

This commit is contained in:
philip 2016-03-09 22:45:20 -05:00
parent dd6359e7db
commit 8f355d5d5f
3 changed files with 51 additions and 28 deletions

View File

@ -1154,51 +1154,49 @@ static int mqtt_socket_subscribe( lua_State* L ) {
NODE_DBG("subscribe table\n"); NODE_DBG("subscribe table\n");
lua_pushnil( L ); /* first key */ lua_pushnil( L ); /* first key */
uint8_t temp_buf[MQTT_BUF_SIZE]; int topic_count = 0;
uint32_t temp_pos = 0;
uint8_t overflow = 0; uint8_t overflow = 0;
while( lua_next( L, stack ) != 0 ) { while( lua_next( L, stack ) != 0 ) {
topic = luaL_checkstring( L, -2 ); topic = luaL_checkstring( L, -2 );
qos = luaL_checkinteger( L, -1 ); qos = luaL_checkinteger( L, -1 );
temp_msg = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id ); if (topic_count == 0) {
temp_msg = mqtt_msg_subscribe_init( &mud->mqtt_state.mqtt_connection, &msg_id );
}
temp_msg = mqtt_msg_subscribe_topic( &mud->mqtt_state.mqtt_connection, topic, qos );
topic_count++;
NODE_DBG("topic: %s - qos: %d, length: %d\n", topic, qos, temp_msg->length); NODE_DBG("topic: %s - qos: %d, length: %d\n", topic, qos, temp_msg->length);
if (temp_pos + temp_msg->length > MQTT_BUF_SIZE){ if (temp_msg->length == 0) {
lua_pop(L, 1); lua_pop(L, 1);
overflow = 1; overflow = 1;
break; // too long message for the outbuffer. break; // too long message for the outbuffer.
} }
c_memcpy( temp_buf + temp_pos, temp_msg->data, temp_msg->length );
temp_pos += temp_msg->length;
lua_pop( L, 1 ); lua_pop( L, 1 );
} }
if (temp_pos == 0){ if (topic_count == 0){
luaL_error( L, "invalid data" ); return luaL_error( L, "no topics found" );
lua_pushboolean(L, 0);
return 1;
} }
if (overflow != 0){ if (overflow != 0){
luaL_error( L, "buffer overflow, can't enqueue all subscriptions" ); return luaL_error( L, "buffer overflow, can't enqueue all subscriptions" );
lua_pushboolean(L, 0); }
return 1;
temp_msg = mqtt_msg_subscribe_fini( &mud->mqtt_state.mqtt_connection );
if (temp_msg->length == 0) {
return luaL_error( L, "buffer overflow, can't enqueue all subscriptions" );
} }
c_memcpy( temp_buffer, temp_buf, temp_pos );
temp_msg->data = temp_buffer;
temp_msg->length = temp_pos;
stack++; stack++;
} else { } else {
NODE_DBG("subscribe string\n"); NODE_DBG("subscribe string\n");
topic = luaL_checklstring( L, stack, &il ); topic = luaL_checklstring( L, stack, &il );
stack++; stack++;
if( topic == NULL ){ if( topic == NULL ){
luaL_error( L, "need topic name" ); return luaL_error( L, "need topic name" );
lua_pushboolean(L, 0);
return 1;
} }
qos = luaL_checkinteger( L, stack ); qos = luaL_checkinteger( L, stack );
temp_msg = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id ); temp_msg = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id );
@ -1257,17 +1255,13 @@ static int mqtt_socket_publish( lua_State* L )
} }
if(!mud->connected){ if(!mud->connected){
luaL_error( L, "not connected" ); return luaL_error( L, "not connected" );
lua_pushboolean(L, 0);
return 1;
} }
const char *topic = luaL_checklstring( L, stack, &l ); const char *topic = luaL_checklstring( L, stack, &l );
stack ++; stack ++;
if (topic == NULL){ if (topic == NULL){
luaL_error( L, "need topic" ); return luaL_error( L, "need topic" );
lua_pushboolean(L, 0);
return 1;
} }
const char *payload = luaL_checklstring( L, stack, &l ); const char *payload = luaL_checklstring( L, stack, &l );

View File

@ -402,14 +402,19 @@ mqtt_message_t* mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0); return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
} }
mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id) mqtt_message_t* mqtt_msg_subscribe_init(mqtt_connection_t* connection, uint16_t *message_id)
{ {
init_message(connection); init_message(connection);
if(topic == NULL || topic[0] == '\0') if((*message_id = append_message_id(connection, 0)) == 0)
return fail_message(connection); return fail_message(connection);
if((*message_id = append_message_id(connection, 0)) == 0) return &connection->message;
}
mqtt_message_t* mqtt_msg_subscribe_topic(mqtt_connection_t* connection, const char* topic, int qos)
{
if(topic == NULL || topic[0] == '\0')
return fail_message(connection); return fail_message(connection);
if(append_string(connection, topic, c_strlen(topic)) < 0) if(append_string(connection, topic, c_strlen(topic)) < 0)
@ -419,9 +424,29 @@ mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* to
return fail_message(connection); return fail_message(connection);
connection->buffer[connection->message.length++] = qos; connection->buffer[connection->message.length++] = qos;
return &connection->message;
}
mqtt_message_t* mqtt_msg_subscribe_fini(mqtt_connection_t* connection)
{
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
} }
mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id)
{
mqtt_message_t* result;
result = mqtt_msg_subscribe_init(connection, message_id);
if (result->length != 0) {
result = mqtt_msg_subscribe_topic(connection, topic, qos);
}
if (result->length != 0) {
result = mqtt_msg_subscribe_fini(connection);
}
return result;
}
mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id) mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id)
{ {
init_message(connection); init_message(connection);

View File

@ -120,6 +120,10 @@ mqtt_message_t* mqtt_msg_pingreq(mqtt_connection_t* connection);
mqtt_message_t* mqtt_msg_pingresp(mqtt_connection_t* connection); mqtt_message_t* mqtt_msg_pingresp(mqtt_connection_t* connection);
mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection); mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection);
mqtt_message_t* mqtt_msg_subscribe_init(mqtt_connection_t* connection, uint16_t* message_id);
mqtt_message_t* mqtt_msg_subscribe_topic(mqtt_connection_t* connection, const char* topic, int qos);
mqtt_message_t* mqtt_msg_subscribe_fini(mqtt_connection_t* connection);
#ifdef __cplusplus #ifdef __cplusplus
} }