diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index 035c0059..09b4d454 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -61,8 +61,8 @@ typedef struct mqtt_state_t { uint16_t port; mqtt_connect_info_t* connect_info; - mqtt_connection_t mqtt_connection; msg_queue_t* pending_msg_q; + uint16_t next_message_id; uint8_t * recv_buffer; // heap buffer for multi-packet rx uint8_t * recv_buffer_wp; // write pointer in multi-packet rx @@ -108,6 +108,15 @@ static void mqtt_socket_reconnected(void *arg, sint8_t err); static void mqtt_socket_connected(void *arg); static void mqtt_connack_fail(lmqtt_userdata * mud, int reason_code); +static uint16_t mqtt_next_message_id(lmqtt_userdata * mud) +{ + mud->mqtt_state.next_message_id++; + if (mud->mqtt_state.next_message_id == 0) + mud->mqtt_state.next_message_id++; + + return mud->mqtt_state.next_message_id; +} + static void mqtt_socket_disconnected(void *arg) // tcp only { NODE_DBG("enter mqtt_socket_disconnected.\n"); @@ -399,7 +408,8 @@ READPACKET: // temp buffer for control messages uint8_t temp_buffer[MQTT_BUF_SIZE]; - mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); + mqtt_message_buffer_t msgb; + mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE); mqtt_message_t *temp_msg = NULL; lua_State *L = lua_getstate(); @@ -450,8 +460,6 @@ READPACKET: mud->connState = MQTT_DATA; NODE_DBG("MQTT: Connected\r\n"); mud->keepalive_sent = 0; - luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); - mud->cb_connect_fail_ref = LUA_NOREF; if(mud->cb_connect_ref == LUA_NOREF) break; if(mud->self_ref == LUA_NOREF) @@ -492,12 +500,12 @@ READPACKET: // buffering and special code to handle this corner-case. Server will most likely have // written all to OS socket anyway, and not be aware that we "should" not have received it all yet. if(msg_qos == 1){ - temp_msg = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id); + temp_msg = mqtt_msg_puback(&msgb, msg_id); msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PUBACK, (int)mqtt_get_qos(temp_msg->data) ); } else if(msg_qos == 2){ - temp_msg = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id); + temp_msg = mqtt_msg_pubrec(&msgb, msg_id); msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PUBREC, (int)mqtt_get_qos(temp_msg->data) ); } @@ -596,12 +604,12 @@ READPACKET: break; case MQTT_MSG_TYPE_PUBLISH: if(msg_qos == 1){ - temp_msg = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id); + temp_msg = mqtt_msg_puback(&msgb, msg_id); msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PUBACK, (int)mqtt_get_qos(temp_msg->data) ); } else if(msg_qos == 2){ - temp_msg = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id); + temp_msg = mqtt_msg_pubrec(&msgb, msg_id); msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PUBREC, (int)mqtt_get_qos(temp_msg->data) ); } @@ -629,7 +637,7 @@ READPACKET: NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n"); // Note: actually, should not destroy the msg until PUBCOMP is received. msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); - temp_msg = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id); + temp_msg = mqtt_msg_pubrel(&msgb, msg_id); msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PUBREL, (int)mqtt_get_qos(temp_msg->data) ); NODE_DBG("MQTT: Response PUBREL\r\n"); @@ -638,7 +646,7 @@ READPACKET: case MQTT_MSG_TYPE_PUBREL: if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBREC && pending_msg->msg_id == msg_id){ msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); - temp_msg = mqtt_msg_pubcomp(&mud->mqtt_state.mqtt_connection, msg_id); + temp_msg = mqtt_msg_pubcomp(&msgb, msg_id); msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PUBCOMP, (int)mqtt_get_qos(temp_msg->data) ); NODE_DBG("MQTT: Response PUBCOMP\r\n"); @@ -658,7 +666,7 @@ READPACKET: } break; case MQTT_MSG_TYPE_PINGREQ: - temp_msg = mqtt_msg_pingresp(&mud->mqtt_state.mqtt_connection); + temp_msg = mqtt_msg_pingresp(&msgb); msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PINGRESP, (int)mqtt_get_qos(temp_msg->data) ); NODE_DBG("MQTT: Response PINGRESP\r\n"); @@ -770,10 +778,12 @@ static void mqtt_socket_connected(void *arg) espconn_regist_disconcb(pesp_conn, mqtt_socket_disconnected); uint8_t temp_buffer[MQTT_BUF_SIZE]; - // call mqtt_connect() to start a mqtt connect stage. - mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); - mqtt_message_t* temp_msg = mqtt_msg_connect(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.connect_info); + mqtt_message_buffer_t msgb; + mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE); + + mqtt_message_t* temp_msg = mqtt_msg_connect(&msgb, mud->mqtt_state.connect_info); NODE_DBG("Send MQTT connection infomation, data len: %d, d[0]=%d \r\n", temp_msg->length, temp_msg->data[0]); + mud->event_timeout = MQTT_SEND_TIMEOUT; // not queue this message. should send right now. or should enqueue this before head. #ifdef CLIENT_SSL_ENABLE @@ -879,9 +889,11 @@ void mqtt_socket_timer(void *arg) mqtt_socket_reconnected(mud->pesp_conn, 0); } else { uint8_t temp_buffer[MQTT_BUF_SIZE]; - mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); + mqtt_message_buffer_t msgb; + mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE); + NODE_DBG("\r\nMQTT: Send keepalive packet\r\n"); - mqtt_message_t* temp_msg = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection); + mqtt_message_t* temp_msg = mqtt_msg_pingreq(&msgb); msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg, 0, MQTT_MSG_TYPE_PINGREQ, (int)mqtt_get_qos(temp_msg->data) ); mud->keepalive_sent = 1; @@ -1380,8 +1392,12 @@ static int mqtt_socket_close( lua_State* L ) sint8 espconn_status = ESPCONN_CONN; if (mud->connected) { + uint8_t temp_buffer[MQTT_BUF_SIZE]; + mqtt_message_buffer_t msgb; + mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE); + // Send disconnect message - mqtt_message_t* temp_msg = mqtt_msg_disconnect(&mud->mqtt_state.mqtt_connection); + mqtt_message_t* temp_msg = mqtt_msg_disconnect(&msgb); NODE_DBG("Send MQTT disconnect infomation, data len: %d, d[0]=%d \r\n", temp_msg->length, temp_msg->data[0]); #ifdef CLIENT_SSL_ENABLE @@ -1437,6 +1453,9 @@ static int mqtt_socket_on( lua_State* L ) if( sl == 7 && strcmp(method, "connect") == 0){ luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref); mud->cb_connect_ref = luaL_ref(L, LUA_REGISTRYINDEX); + }else if( sl == 7 && strcmp(method, "connfail") == 0){ + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref); + mud->cb_connect_ref = luaL_ref(L, LUA_REGISTRYINDEX); }else if( sl == 7 && strcmp(method, "offline") == 0){ luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); mud->cb_disconnect_ref = luaL_ref(L, LUA_REGISTRYINDEX); @@ -1468,7 +1487,7 @@ static int mqtt_socket_unsubscribe( lua_State* L ) { NODE_DBG("enter mqtt_socket_unsubscribe.\n"); uint8_t stack = 1; - uint16_t msg_id = 0; + uint16_t msg_id; const char *topic; size_t il; lmqtt_userdata *mud; @@ -1496,7 +1515,8 @@ static int mqtt_socket_unsubscribe( lua_State* L ) { } uint8_t temp_buffer[MQTT_BUF_SIZE]; - mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); + mqtt_message_buffer_t msgb; + mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE); mqtt_message_t *temp_msg = NULL; if( lua_istable( L, stack ) ) { @@ -1510,9 +1530,10 @@ static int mqtt_socket_unsubscribe( lua_State* L ) { topic = luaL_checkstring( L, -2 ); if (topic_count == 0) { - temp_msg = mqtt_msg_unsubscribe_init( &mud->mqtt_state.mqtt_connection, &msg_id ); + msg_id = mqtt_next_message_id(mud); + temp_msg = mqtt_msg_unsubscribe_init( &msgb, msg_id ); } - temp_msg = mqtt_msg_unsubscribe_topic( &mud->mqtt_state.mqtt_connection, topic ); + temp_msg = mqtt_msg_unsubscribe_topic( &msgb, topic ); topic_count++; NODE_DBG("topic: %s - length: %d\n", topic, temp_msg->length); @@ -1533,7 +1554,7 @@ static int mqtt_socket_unsubscribe( lua_State* L ) { return luaL_error( L, "buffer overflow, can't enqueue all unsubscriptions" ); } - temp_msg = mqtt_msg_unsubscribe_fini( &mud->mqtt_state.mqtt_connection ); + temp_msg = mqtt_msg_unsubscribe_fini( &msgb ); if (temp_msg->length == 0) { return luaL_error( L, "buffer overflow, can't enqueue all unsubscriptions" ); } @@ -1546,7 +1567,8 @@ static int mqtt_socket_unsubscribe( lua_State* L ) { if( topic == NULL ){ return luaL_error( L, "need topic name" ); } - temp_msg = mqtt_msg_unsubscribe( &mud->mqtt_state.mqtt_connection, topic, &msg_id ); + msg_id = mqtt_next_message_id(mud); + temp_msg = mqtt_msg_unsubscribe( &msgb, topic, msg_id ); } if( lua_type( L, stack ) == LUA_TFUNCTION || lua_type( L, stack ) == LUA_TLIGHTFUNCTION ) { // TODO: this will overwrite the previous one. @@ -1580,7 +1602,7 @@ static int mqtt_socket_subscribe( lua_State* L ) { NODE_DBG("enter mqtt_socket_subscribe.\n"); uint8_t stack = 1, qos = 0; - uint16_t msg_id = 0; + uint16_t msg_id; const char *topic; size_t il; lmqtt_userdata *mud; @@ -1608,7 +1630,8 @@ static int mqtt_socket_subscribe( lua_State* L ) { } uint8_t temp_buffer[MQTT_BUF_SIZE]; - mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); + mqtt_message_buffer_t msgb; + mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE); mqtt_message_t *temp_msg = NULL; if( lua_istable( L, stack ) ) { @@ -1623,9 +1646,10 @@ static int mqtt_socket_subscribe( lua_State* L ) { qos = luaL_checkinteger( L, -1 ); if (topic_count == 0) { - temp_msg = mqtt_msg_subscribe_init( &mud->mqtt_state.mqtt_connection, &msg_id ); + msg_id = mqtt_next_message_id(mud); + temp_msg = mqtt_msg_subscribe_init( &msgb, msg_id ); } - temp_msg = mqtt_msg_subscribe_topic( &mud->mqtt_state.mqtt_connection, topic, qos ); + temp_msg = mqtt_msg_subscribe_topic( &msgb, topic, qos ); topic_count++; NODE_DBG("topic: %s - qos: %d, length: %d\n", topic, qos, temp_msg->length); @@ -1646,7 +1670,7 @@ static int mqtt_socket_subscribe( lua_State* L ) { return luaL_error( L, "buffer overflow, can't enqueue all subscriptions" ); } - temp_msg = mqtt_msg_subscribe_fini( &mud->mqtt_state.mqtt_connection ); + temp_msg = mqtt_msg_subscribe_fini( &msgb ); if (temp_msg->length == 0) { return luaL_error( L, "buffer overflow, can't enqueue all subscriptions" ); } @@ -1660,7 +1684,8 @@ static int mqtt_socket_subscribe( lua_State* L ) { return luaL_error( L, "need topic name" ); } qos = luaL_checkinteger( L, stack ); - temp_msg = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id ); + msg_id = mqtt_next_message_id(mud); + temp_msg = mqtt_msg_subscribe( &msgb, topic, qos, msg_id ); stack++; } @@ -1732,12 +1757,17 @@ static int mqtt_socket_publish( lua_State* L ) uint8_t retain = luaL_checkinteger( L, stack); stack ++; + if (qos != 0) { + msg_id = mqtt_next_message_id(mud); + } + uint8_t temp_buffer[MQTT_BUF_SIZE]; - mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); - mqtt_message_t *temp_msg = mqtt_msg_publish(&mud->mqtt_state.mqtt_connection, + mqtt_message_buffer_t msgb; + mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE); + mqtt_message_t *temp_msg = mqtt_msg_publish(&msgb, topic, payload, l, qos, retain, - &msg_id); + msg_id); if (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION){ lua_pushvalue(L, stack); // copy argument (func) to the top of stack diff --git a/app/mqtt/mqtt_msg.c b/app/mqtt/mqtt_msg.c index f3975695..aab91e20 100644 --- a/app/mqtt/mqtt_msg.c +++ b/app/mqtt/mqtt_msg.c @@ -54,76 +54,71 @@ struct __attribute((__packed__)) mqtt_connect_variable_header uint8_t keepaliveLsb; }; -static int append_string(mqtt_connection_t* connection, const char* string, int len) +static int append_string(mqtt_message_buffer_t *msgb, const char* string, int len) { - if(connection->message.length + len + 2 > connection->buffer_length) + if(msgb->message.length + len + 2 > msgb->buffer_length) return -1; - connection->buffer[connection->message.length++] = len >> 8; - connection->buffer[connection->message.length++] = len & 0xff; - memcpy(connection->buffer + connection->message.length, string, len); - connection->message.length += len; + msgb->buffer[msgb->message.length++] = len >> 8; + msgb->buffer[msgb->message.length++] = len & 0xff; + memcpy(msgb->buffer + msgb->message.length, string, len); + msgb->message.length += len; return len + 2; } -static uint16_t append_message_id(mqtt_connection_t* connection, uint16_t message_id) +static uint16_t append_message_id(mqtt_message_buffer_t* msgb, uint16_t message_id) { - // If message_id is zero then we should assign one, otherwise - // we'll use the one supplied by the caller - while(message_id == 0) - message_id = ++connection->message_id; - - if(connection->message.length + 2 > connection->buffer_length) + if(msgb->message.length + 2 > msgb->buffer_length) return 0; - connection->buffer[connection->message.length++] = message_id >> 8; - connection->buffer[connection->message.length++] = message_id & 0xff; + msgb->buffer[msgb->message.length++] = message_id >> 8; + msgb->buffer[msgb->message.length++] = message_id & 0xff; - return message_id; + return 1; } -static int init_message(mqtt_connection_t* connection) +static int init_message(mqtt_message_buffer_t* msgb) { - connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE; + msgb->message.length = MQTT_MAX_FIXED_HEADER_SIZE; return MQTT_MAX_FIXED_HEADER_SIZE; } -static mqtt_message_t* fail_message(mqtt_connection_t* connection) +static mqtt_message_t* fail_message(mqtt_message_buffer_t* msgb) { - connection->message.data = connection->buffer; - connection->message.length = 0; - return &connection->message; + msgb->message.data = msgb->buffer; + msgb->message.length = 0; + return &msgb->message; } -static mqtt_message_t* fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain) +static mqtt_message_t* fini_message(mqtt_message_buffer_t* msgb, int type, int dup, int qos, int retain) { - int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; + int remaining_length = msgb->message.length - MQTT_MAX_FIXED_HEADER_SIZE; if(remaining_length > 127) { - connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); - connection->buffer[1] = 0x80 | (remaining_length % 128); - connection->buffer[2] = remaining_length / 128; - connection->message.length = remaining_length + 3; - connection->message.data = connection->buffer; + msgb->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); + msgb->buffer[1] = 0x80 | (remaining_length % 128); + msgb->buffer[2] = remaining_length / 128; + msgb->message.length = remaining_length + 3; + msgb->message.data = msgb->buffer; } else { - connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); - connection->buffer[2] = remaining_length; - connection->message.length = remaining_length + 2; - connection->message.data = connection->buffer + 1; + msgb->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); + msgb->buffer[2] = remaining_length; + msgb->message.length = remaining_length + 2; + msgb->message.data = msgb->buffer + 1; } - return &connection->message; + return &msgb->message; } -void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length) +void mqtt_msg_init(mqtt_message_buffer_t* msgb, uint8_t* buffer, uint16_t buffer_length) { - memset(connection, 0, sizeof(connection)); - connection->buffer = buffer; - connection->buffer_length = buffer_length; + memset(msgb, 0, sizeof(msgb)); + msgb->buffer = buffer; + msgb->buffer_length = buffer_length; } // Returns total length of message, or -1 if not enough bytes are available @@ -286,16 +281,16 @@ uint16_t mqtt_get_id(uint8_t* buffer, uint16_t buffer_length) } } -mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info) +mqtt_message_t* mqtt_msg_connect(mqtt_message_buffer_t* msgb, mqtt_connect_info_t* info) { struct mqtt_connect_variable_header* variable_header; - init_message(connection); + init_message(msgb); - if(connection->message.length + sizeof(*variable_header) > connection->buffer_length) - return fail_message(connection); - variable_header = (void*)(connection->buffer + connection->message.length); - connection->message.length += sizeof(*variable_header); + if(msgb->message.length + sizeof(*variable_header) > msgb->buffer_length) + return fail_message(msgb); + variable_header = (void*)(msgb->buffer + msgb->message.length); + msgb->message.length += sizeof(*variable_header); variable_header->lengthMsb = 0; variable_header->lengthLsb = 4; @@ -310,19 +305,19 @@ mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_inf if(info->client_id != NULL && info->client_id[0] != '\0') { - if(append_string(connection, info->client_id, strlen(info->client_id)) < 0) - return fail_message(connection); + if(append_string(msgb, info->client_id, strlen(info->client_id)) < 0) + return fail_message(msgb); } else - return fail_message(connection); + return fail_message(msgb); if(info->will_topic != NULL && info->will_topic[0] != '\0') { - if(append_string(connection, info->will_topic, strlen(info->will_topic)) < 0) - return fail_message(connection); + if(append_string(msgb, info->will_topic, strlen(info->will_topic)) < 0) + return fail_message(msgb); - if(append_string(connection, info->will_message, strlen(info->will_message)) < 0) - return fail_message(connection); + if(append_string(msgb, info->will_message, strlen(info->will_message)) < 0) + return fail_message(msgb); variable_header->flags |= MQTT_CONNECT_FLAG_WILL; if(info->will_retain) @@ -332,176 +327,174 @@ mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_inf if(info->username != NULL && info->username[0] != '\0') { - if(append_string(connection, info->username, strlen(info->username)) < 0) - return fail_message(connection); + if(append_string(msgb, info->username, strlen(info->username)) < 0) + return fail_message(msgb); variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME; } if(info->password != NULL && info->password[0] != '\0') { - if(append_string(connection, info->password, strlen(info->password)) < 0) - return fail_message(connection); + if(append_string(msgb, info->password, strlen(info->password)) < 0) + return fail_message(msgb); variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD; } - return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0); + return fini_message(msgb, MQTT_MSG_TYPE_CONNECT, 0, 0, 0); } -mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id) +mqtt_message_t* mqtt_msg_publish(mqtt_message_buffer_t* msgb, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t message_id) { - init_message(connection); + init_message(msgb); if(topic == NULL || topic[0] == '\0') - return fail_message(connection); + return fail_message(msgb); - if(append_string(connection, topic, strlen(topic)) < 0) - return fail_message(connection); + if(append_string(msgb, topic, strlen(topic)) < 0) + return fail_message(msgb); if(qos > 0) { - if((*message_id = append_message_id(connection, 0)) == 0) - return fail_message(connection); + if(!append_message_id(msgb, message_id)) + return fail_message(msgb); } - else - *message_id = 0; - if(connection->message.length + data_length > connection->buffer_length) - return fail_message(connection); - memcpy(connection->buffer + connection->message.length, data, data_length); - connection->message.length += data_length; + if(msgb->message.length + data_length > msgb->buffer_length) + return fail_message(msgb); + memcpy(msgb->buffer + msgb->message.length, data, data_length); + msgb->message.length += data_length; - return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); + return fini_message(msgb, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); } -mqtt_message_t* mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id) +mqtt_message_t* mqtt_msg_puback(mqtt_message_buffer_t* msgb, uint16_t message_id) { - init_message(connection); - if(append_message_id(connection, message_id) == 0) - return fail_message(connection); - return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0); + init_message(msgb); + if(!append_message_id(msgb, message_id)) + return fail_message(msgb); + return fini_message(msgb, MQTT_MSG_TYPE_PUBACK, 0, 0, 0); } -mqtt_message_t* mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id) +mqtt_message_t* mqtt_msg_pubrec(mqtt_message_buffer_t* msgb, uint16_t message_id) { - init_message(connection); - if(append_message_id(connection, message_id) == 0) - return fail_message(connection); - return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0); + init_message(msgb); + if(!append_message_id(msgb, message_id)) + return fail_message(msgb); + return fini_message(msgb, MQTT_MSG_TYPE_PUBREC, 0, 0, 0); } -mqtt_message_t* mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id) +mqtt_message_t* mqtt_msg_pubrel(mqtt_message_buffer_t* msgb, uint16_t message_id) { - init_message(connection); - if(append_message_id(connection, message_id) == 0) - return fail_message(connection); - return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0); + init_message(msgb); + if(!append_message_id(msgb, message_id)) + return fail_message(msgb); + return fini_message(msgb, MQTT_MSG_TYPE_PUBREL, 0, 1, 0); } -mqtt_message_t* mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id) +mqtt_message_t* mqtt_msg_pubcomp(mqtt_message_buffer_t* msgb, uint16_t message_id) { - init_message(connection); - if(append_message_id(connection, message_id) == 0) - return fail_message(connection); - return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0); + init_message(msgb); + if(!append_message_id(msgb, message_id)) + return fail_message(msgb); + return fini_message(msgb, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0); } -mqtt_message_t* mqtt_msg_subscribe_init(mqtt_connection_t* connection, uint16_t *message_id) +mqtt_message_t* mqtt_msg_subscribe_init(mqtt_message_buffer_t* msgb, uint16_t message_id) { - init_message(connection); + init_message(msgb); - if((*message_id = append_message_id(connection, 0)) == 0) - return fail_message(connection); + if(!append_message_id(msgb, message_id)) + return fail_message(msgb); - return &connection->message; + return &msgb->message; } -mqtt_message_t* mqtt_msg_subscribe_topic(mqtt_connection_t* connection, const char* topic, int qos) +mqtt_message_t* mqtt_msg_subscribe_topic(mqtt_message_buffer_t* msgb, const char* topic, int qos) { if(topic == NULL || topic[0] == '\0') - return fail_message(connection); + return fail_message(msgb); - if(append_string(connection, topic, strlen(topic)) < 0) - return fail_message(connection); + if(append_string(msgb, topic, strlen(topic)) < 0) + return fail_message(msgb); - if(connection->message.length + 1 > connection->buffer_length) - return fail_message(connection); - connection->buffer[connection->message.length++] = qos; + if(msgb->message.length + 1 > msgb->buffer_length) + return fail_message(msgb); + msgb->buffer[msgb->message.length++] = qos; - return &connection->message; + return &msgb->message; } -mqtt_message_t* mqtt_msg_subscribe_fini(mqtt_connection_t* connection) +mqtt_message_t* mqtt_msg_subscribe_fini(mqtt_message_buffer_t* msgb) { - return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); + return fini_message(msgb, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); } -mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id) +mqtt_message_t* mqtt_msg_subscribe(mqtt_message_buffer_t* msgb, const char* topic, int qos, uint16_t message_id) { mqtt_message_t* result; - result = mqtt_msg_subscribe_init(connection, message_id); + result = mqtt_msg_subscribe_init(msgb, message_id); if (result->length != 0) { - result = mqtt_msg_subscribe_topic(connection, topic, qos); + result = mqtt_msg_subscribe_topic(msgb, topic, qos); } if (result->length != 0) { - result = mqtt_msg_subscribe_fini(connection); + result = mqtt_msg_subscribe_fini(msgb); } return result; } -mqtt_message_t* mqtt_msg_unsubscribe_init(mqtt_connection_t* connection, uint16_t *message_id) +mqtt_message_t* mqtt_msg_unsubscribe_init(mqtt_message_buffer_t* msgb, uint16_t message_id) { - return mqtt_msg_subscribe_init(connection, message_id); + return mqtt_msg_subscribe_init(msgb, message_id); } -mqtt_message_t* mqtt_msg_unsubscribe_topic(mqtt_connection_t* connection, const char* topic) +mqtt_message_t* mqtt_msg_unsubscribe_topic(mqtt_message_buffer_t* msgb, const char* topic) { if(topic == NULL || topic[0] == '\0') - return fail_message(connection); + return fail_message(msgb); - if(append_string(connection, topic, strlen(topic)) < 0) - return fail_message(connection); + if(append_string(msgb, topic, strlen(topic)) < 0) + return fail_message(msgb); - return &connection->message; + return &msgb->message; } -mqtt_message_t* mqtt_msg_unsubscribe_fini(mqtt_connection_t* connection) +mqtt_message_t* mqtt_msg_unsubscribe_fini(mqtt_message_buffer_t* msgb) { - return fini_message(connection, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0); + return fini_message(msgb, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0); } -mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id) +mqtt_message_t* mqtt_msg_unsubscribe(mqtt_message_buffer_t* msgb, const char* topic, uint16_t message_id) { mqtt_message_t* result; - result = mqtt_msg_unsubscribe_init(connection, message_id); + result = mqtt_msg_unsubscribe_init(msgb, message_id); if (result->length != 0) { - result = mqtt_msg_unsubscribe_topic(connection, topic); + result = mqtt_msg_unsubscribe_topic(msgb, topic); } if (result->length != 0) { - result = mqtt_msg_unsubscribe_fini(connection); + result = mqtt_msg_unsubscribe_fini(msgb); } return result; } -mqtt_message_t* mqtt_msg_pingreq(mqtt_connection_t* connection) +mqtt_message_t* mqtt_msg_pingreq(mqtt_message_buffer_t* msgb) { - init_message(connection); - return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0); + init_message(msgb); + return fini_message(msgb, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0); } -mqtt_message_t* mqtt_msg_pingresp(mqtt_connection_t* connection) +mqtt_message_t* mqtt_msg_pingresp(mqtt_message_buffer_t* msgb) { - init_message(connection); - return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0); + init_message(msgb); + return fini_message(msgb, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0); } -mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection) +mqtt_message_t* mqtt_msg_disconnect(mqtt_message_buffer_t* msgb) { - init_message(connection); - return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0); + init_message(msgb); + return fini_message(msgb, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0); } diff --git a/app/mqtt/mqtt_msg.h b/app/mqtt/mqtt_msg.h index 85f70f6d..bdaeb3e6 100644 --- a/app/mqtt/mqtt_msg.h +++ b/app/mqtt/mqtt_msg.h @@ -87,7 +87,7 @@ typedef struct mqtt_message } mqtt_message_t; -typedef struct mqtt_connection +typedef struct mqtt_message_buffer { mqtt_message_t message; @@ -95,7 +95,7 @@ typedef struct mqtt_connection uint8_t* buffer; uint16_t buffer_length; -} mqtt_connection_t; +} mqtt_message_buffer_t; typedef struct mqtt_connect_info { @@ -119,31 +119,31 @@ static inline uint8_t mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) static inline uint8_t mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } static inline uint8_t mqtt_get_connect_ret_code(uint8_t* buffer) { return (buffer[3]); } -void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); +void mqtt_msg_init(mqtt_message_buffer_t* msgb, uint8_t* buffer, uint16_t buffer_length); int32_t mqtt_get_total_length(uint8_t* buffer, uint16_t buffer_length); const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* buffer_length); const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* buffer_length); uint16_t mqtt_get_id(uint8_t* buffer, uint16_t buffer_length); -mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info); -mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id); -mqtt_message_t* mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id); -mqtt_message_t* mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id); -mqtt_message_t* mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id); -mqtt_message_t* mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id); -mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id); -mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id); -mqtt_message_t* mqtt_msg_pingreq(mqtt_connection_t* connection); -mqtt_message_t* mqtt_msg_pingresp(mqtt_connection_t* connection); -mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection); +mqtt_message_t* mqtt_msg_connect(mqtt_message_buffer_t* msgb, mqtt_connect_info_t* info); +mqtt_message_t* mqtt_msg_publish(mqtt_message_buffer_t* msgb, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t message_id); +mqtt_message_t* mqtt_msg_puback(mqtt_message_buffer_t* msgb, uint16_t message_id); +mqtt_message_t* mqtt_msg_pubrec(mqtt_message_buffer_t* msgb, uint16_t message_id); +mqtt_message_t* mqtt_msg_pubrel(mqtt_message_buffer_t* msgb, uint16_t message_id); +mqtt_message_t* mqtt_msg_pubcomp(mqtt_message_buffer_t* msgb, uint16_t message_id); +mqtt_message_t* mqtt_msg_subscribe(mqtt_message_buffer_t* msgb, const char* topic, int qos, uint16_t message_id); +mqtt_message_t* mqtt_msg_unsubscribe(mqtt_message_buffer_t* msgb, const char* topic, uint16_t message_id); +mqtt_message_t* mqtt_msg_pingreq(mqtt_message_buffer_t* msgb); +mqtt_message_t* mqtt_msg_pingresp(mqtt_message_buffer_t* msgb); +mqtt_message_t* mqtt_msg_disconnect(mqtt_message_buffer_t* msgb); -mqtt_message_t* mqtt_msg_subscribe_init(mqtt_connection_t* connection, uint16_t* message_id); -mqtt_message_t* mqtt_msg_subscribe_topic(mqtt_connection_t* connection, const char* topic, int qos); -mqtt_message_t* mqtt_msg_subscribe_fini(mqtt_connection_t* connection); +mqtt_message_t* mqtt_msg_subscribe_init(mqtt_message_buffer_t* msgb, uint16_t message_id); +mqtt_message_t* mqtt_msg_subscribe_topic(mqtt_message_buffer_t* msgb, const char* topic, int qos); +mqtt_message_t* mqtt_msg_subscribe_fini(mqtt_message_buffer_t* msgb); -mqtt_message_t* mqtt_msg_unsubscribe_init(mqtt_connection_t* connection, uint16_t* message_id); -mqtt_message_t* mqtt_msg_unsubscribe_topic(mqtt_connection_t* connection, const char* topic); -mqtt_message_t* mqtt_msg_unsubscribe_fini(mqtt_connection_t* connection); +mqtt_message_t* mqtt_msg_unsubscribe_init(mqtt_message_buffer_t* msgb, uint16_t message_id); +mqtt_message_t* mqtt_msg_unsubscribe_topic(mqtt_message_buffer_t* msgb, const char* topic); +mqtt_message_t* mqtt_msg_unsubscribe_fini(mqtt_message_buffer_t* msgb); #ifdef __cplusplus diff --git a/docs/modules/mqtt.md b/docs/modules/mqtt.md index 0b9261df..fd1e36bc 100644 --- a/docs/modules/mqtt.md +++ b/docs/modules/mqtt.md @@ -65,6 +65,7 @@ m = mqtt.Client("clientid", 120, "user", "password") m:lwt("/lwt", "offline", 0, 0) m:on("connect", function(client) print ("connected") end) +m:on("connfail", function(client, reason) print ("connection failed", reason) end) m:on("offline", function(client) print ("offline") end) -- on publish message receive event @@ -157,8 +158,16 @@ end In reality, the connected function should do something useful! -The first callback to `:connect()` aliases with the "connect" callback available through `:on()` (the last passed callback to either of those are used). -The second (failure) callback is however not the same as the "offline" `:on()` callback. The "offline" callback is only called after an already established connection becomes closed. If the `connect()` call fails to establish a connection, the callback passed to `:connect()` is called and nothing else. +The first callback to `:connect()` aliases with the "connect" callback +available through `:on()` (the last passed callback to either of those are +used). However, if `nil` is passed to `:connect()`, any existing callback +will be preserved, rather than removed. + +The second (failure) callback aliases with the "connfail" callback available +through `:on()`. (The "offline" callback is only called after an already +established connection becomes closed. If the `connect()` call fails to +establish a connection, the callback passed to `:connect()` is called and +nothing else.) Previously, we instructed an application to pass either the *integer* 0 or *integer* 1 for `secure`. Now, this will trigger a deprecation warning; please @@ -213,8 +222,23 @@ Registers a callback function for an event. `mqtt:on(event, function(client[, topic[, message]]))` #### Parameters -- `event` can be "connect", "suback", "unsuback", "puback", "message", "overflow", or "offline" -- `function(client[, topic[, message]])` callback function. The first parameter is the client. If event is "message", the 2nd and 3rd param are received topic and message (strings). +- `event` can be "connect", "connfail", "suback", "unsuback", "puback", "message", "overflow", or "offline" +- callback function. The first parameter is always the client object itself. + Any remaining parameters passed differ by event: + + - If event is "message", the 2nd and 3rd parameters are received topic and + message, respectively, as Lua strings. + + - If the event is "overflow", the parameters are as with "message", save + that the message string is truncated to the maximum message size. + + - If the event is "connfail", the 2nd parameter will be the connection + failure code; see above. + + - Other event types do not provide additional arguments. This has some + unfortunate consequences: the broker-provided subscription maximum QoS + information is lost, and the application must, if it expects per-event + acknowledgements, manage a queue or queues itself. #### Returns `nil` @@ -231,7 +255,8 @@ Publishes a message. - `message` the message to publish, (buffer or string) - `qos` QoS level - `retain` retain flag -- `function(client)` optional callback fired when PUBACK received. +- `function(client)` optional callback fired when PUBACK received (for QoS 1 + or 2) or when message sent (for QoS 0). #### Notes