WIP: MQTT fixes (#2986)

* mqtt: expose "connfail" callback via :on()

This makes it just like all the other callbacks in the module and is a
revision of behavior called out in
https://github.com/nodemcu/nodemcu-firmware/pull/2967

* mqtt: clarify when puback callback fires

* mqtt: Don't reference stack buffers from the heap

The confusingly-named "mqtt_connection_t" object is just a triple of
  - a serialized mqtt message pointer and length
  - a buffer pointer (to which the above can be written)
  - a message identifier

The last of these must be passed around the mqtt state machine, but the
first two are very local and the buffer is always sourced from the C
stack.  Unfortunately, because the entire structure is persisted in the
heap, some callers assume that they can always use the structure without
reinitialization (see mqtt_socket_close), which will trash the C stack.

Sever the pairing between message id and local state, punt the local
state entirely out of the heap, and rename things to be less confusing.
This commit is contained in:
Nathaniel Wesley Filardo 2020-03-14 22:51:03 +00:00 committed by Marcel Stör
parent 1719f90a3b
commit 30f706fb03
4 changed files with 236 additions and 188 deletions

View File

@ -61,8 +61,8 @@ typedef struct mqtt_state_t
{
uint16_t port;
mqtt_connect_info_t* connect_info;
mqtt_connection_t mqtt_connection;
msg_queue_t* pending_msg_q;
uint16_t next_message_id;
uint8_t * recv_buffer; // heap buffer for multi-packet rx
uint8_t * recv_buffer_wp; // write pointer in multi-packet rx
@ -108,6 +108,15 @@ static void mqtt_socket_reconnected(void *arg, sint8_t err);
static void mqtt_socket_connected(void *arg);
static void mqtt_connack_fail(lmqtt_userdata * mud, int reason_code);
static uint16_t mqtt_next_message_id(lmqtt_userdata * mud)
{
mud->mqtt_state.next_message_id++;
if (mud->mqtt_state.next_message_id == 0)
mud->mqtt_state.next_message_id++;
return mud->mqtt_state.next_message_id;
}
static void mqtt_socket_disconnected(void *arg) // tcp only
{
NODE_DBG("enter mqtt_socket_disconnected.\n");
@ -399,7 +408,8 @@ READPACKET:
// temp buffer for control messages
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_buffer_t msgb;
mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = NULL;
lua_State *L = lua_getstate();
@ -450,8 +460,6 @@ READPACKET:
mud->connState = MQTT_DATA;
NODE_DBG("MQTT: Connected\r\n");
mud->keepalive_sent = 0;
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref);
mud->cb_connect_fail_ref = LUA_NOREF;
if(mud->cb_connect_ref == LUA_NOREF)
break;
if(mud->self_ref == LUA_NOREF)
@ -492,12 +500,12 @@ READPACKET:
// buffering and special code to handle this corner-case. Server will most likely have
// written all to OS socket anyway, and not be aware that we "should" not have received it all yet.
if(msg_qos == 1){
temp_msg = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id);
temp_msg = mqtt_msg_puback(&msgb, msg_id);
msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBACK, (int)mqtt_get_qos(temp_msg->data) );
}
else if(msg_qos == 2){
temp_msg = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id);
temp_msg = mqtt_msg_pubrec(&msgb, msg_id);
msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBREC, (int)mqtt_get_qos(temp_msg->data) );
}
@ -596,12 +604,12 @@ READPACKET:
break;
case MQTT_MSG_TYPE_PUBLISH:
if(msg_qos == 1){
temp_msg = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id);
temp_msg = mqtt_msg_puback(&msgb, msg_id);
msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBACK, (int)mqtt_get_qos(temp_msg->data) );
}
else if(msg_qos == 2){
temp_msg = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id);
temp_msg = mqtt_msg_pubrec(&msgb, msg_id);
msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBREC, (int)mqtt_get_qos(temp_msg->data) );
}
@ -629,7 +637,7 @@ READPACKET:
NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n");
// Note: actually, should not destroy the msg until PUBCOMP is received.
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
temp_msg = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id);
temp_msg = mqtt_msg_pubrel(&msgb, msg_id);
msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBREL, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("MQTT: Response PUBREL\r\n");
@ -638,7 +646,7 @@ READPACKET:
case MQTT_MSG_TYPE_PUBREL:
if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBREC && pending_msg->msg_id == msg_id){
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
temp_msg = mqtt_msg_pubcomp(&mud->mqtt_state.mqtt_connection, msg_id);
temp_msg = mqtt_msg_pubcomp(&msgb, msg_id);
msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBCOMP, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("MQTT: Response PUBCOMP\r\n");
@ -658,7 +666,7 @@ READPACKET:
}
break;
case MQTT_MSG_TYPE_PINGREQ:
temp_msg = mqtt_msg_pingresp(&mud->mqtt_state.mqtt_connection);
temp_msg = mqtt_msg_pingresp(&msgb);
msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PINGRESP, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("MQTT: Response PINGRESP\r\n");
@ -770,10 +778,12 @@ static void mqtt_socket_connected(void *arg)
espconn_regist_disconcb(pesp_conn, mqtt_socket_disconnected);
uint8_t temp_buffer[MQTT_BUF_SIZE];
// call mqtt_connect() to start a mqtt connect stage.
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t* temp_msg = mqtt_msg_connect(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.connect_info);
mqtt_message_buffer_t msgb;
mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t* temp_msg = mqtt_msg_connect(&msgb, mud->mqtt_state.connect_info);
NODE_DBG("Send MQTT connection infomation, data len: %d, d[0]=%d \r\n", temp_msg->length, temp_msg->data[0]);
mud->event_timeout = MQTT_SEND_TIMEOUT;
// not queue this message. should send right now. or should enqueue this before head.
#ifdef CLIENT_SSL_ENABLE
@ -879,9 +889,11 @@ void mqtt_socket_timer(void *arg)
mqtt_socket_reconnected(mud->pesp_conn, 0);
} else {
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_buffer_t msgb;
mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE);
NODE_DBG("\r\nMQTT: Send keepalive packet\r\n");
mqtt_message_t* temp_msg = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection);
mqtt_message_t* temp_msg = mqtt_msg_pingreq(&msgb);
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
0, MQTT_MSG_TYPE_PINGREQ, (int)mqtt_get_qos(temp_msg->data) );
mud->keepalive_sent = 1;
@ -1380,8 +1392,12 @@ static int mqtt_socket_close( lua_State* L )
sint8 espconn_status = ESPCONN_CONN;
if (mud->connected) {
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_message_buffer_t msgb;
mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE);
// Send disconnect message
mqtt_message_t* temp_msg = mqtt_msg_disconnect(&mud->mqtt_state.mqtt_connection);
mqtt_message_t* temp_msg = mqtt_msg_disconnect(&msgb);
NODE_DBG("Send MQTT disconnect infomation, data len: %d, d[0]=%d \r\n", temp_msg->length, temp_msg->data[0]);
#ifdef CLIENT_SSL_ENABLE
@ -1437,6 +1453,9 @@ static int mqtt_socket_on( lua_State* L )
if( sl == 7 && strcmp(method, "connect") == 0){
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref);
mud->cb_connect_ref = luaL_ref(L, LUA_REGISTRYINDEX);
}else if( sl == 7 && strcmp(method, "connfail") == 0){
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref);
mud->cb_connect_ref = luaL_ref(L, LUA_REGISTRYINDEX);
}else if( sl == 7 && strcmp(method, "offline") == 0){
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref);
mud->cb_disconnect_ref = luaL_ref(L, LUA_REGISTRYINDEX);
@ -1468,7 +1487,7 @@ static int mqtt_socket_unsubscribe( lua_State* L ) {
NODE_DBG("enter mqtt_socket_unsubscribe.\n");
uint8_t stack = 1;
uint16_t msg_id = 0;
uint16_t msg_id;
const char *topic;
size_t il;
lmqtt_userdata *mud;
@ -1496,7 +1515,8 @@ static int mqtt_socket_unsubscribe( lua_State* L ) {
}
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_buffer_t msgb;
mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = NULL;
if( lua_istable( L, stack ) ) {
@ -1510,9 +1530,10 @@ static int mqtt_socket_unsubscribe( lua_State* L ) {
topic = luaL_checkstring( L, -2 );
if (topic_count == 0) {
temp_msg = mqtt_msg_unsubscribe_init( &mud->mqtt_state.mqtt_connection, &msg_id );
msg_id = mqtt_next_message_id(mud);
temp_msg = mqtt_msg_unsubscribe_init( &msgb, msg_id );
}
temp_msg = mqtt_msg_unsubscribe_topic( &mud->mqtt_state.mqtt_connection, topic );
temp_msg = mqtt_msg_unsubscribe_topic( &msgb, topic );
topic_count++;
NODE_DBG("topic: %s - length: %d\n", topic, temp_msg->length);
@ -1533,7 +1554,7 @@ static int mqtt_socket_unsubscribe( lua_State* L ) {
return luaL_error( L, "buffer overflow, can't enqueue all unsubscriptions" );
}
temp_msg = mqtt_msg_unsubscribe_fini( &mud->mqtt_state.mqtt_connection );
temp_msg = mqtt_msg_unsubscribe_fini( &msgb );
if (temp_msg->length == 0) {
return luaL_error( L, "buffer overflow, can't enqueue all unsubscriptions" );
}
@ -1546,7 +1567,8 @@ static int mqtt_socket_unsubscribe( lua_State* L ) {
if( topic == NULL ){
return luaL_error( L, "need topic name" );
}
temp_msg = mqtt_msg_unsubscribe( &mud->mqtt_state.mqtt_connection, topic, &msg_id );
msg_id = mqtt_next_message_id(mud);
temp_msg = mqtt_msg_unsubscribe( &msgb, topic, msg_id );
}
if( lua_type( L, stack ) == LUA_TFUNCTION || lua_type( L, stack ) == LUA_TLIGHTFUNCTION ) { // TODO: this will overwrite the previous one.
@ -1580,7 +1602,7 @@ static int mqtt_socket_subscribe( lua_State* L ) {
NODE_DBG("enter mqtt_socket_subscribe.\n");
uint8_t stack = 1, qos = 0;
uint16_t msg_id = 0;
uint16_t msg_id;
const char *topic;
size_t il;
lmqtt_userdata *mud;
@ -1608,7 +1630,8 @@ static int mqtt_socket_subscribe( lua_State* L ) {
}
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_buffer_t msgb;
mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = NULL;
if( lua_istable( L, stack ) ) {
@ -1623,9 +1646,10 @@ static int mqtt_socket_subscribe( lua_State* L ) {
qos = luaL_checkinteger( L, -1 );
if (topic_count == 0) {
temp_msg = mqtt_msg_subscribe_init( &mud->mqtt_state.mqtt_connection, &msg_id );
msg_id = mqtt_next_message_id(mud);
temp_msg = mqtt_msg_subscribe_init( &msgb, msg_id );
}
temp_msg = mqtt_msg_subscribe_topic( &mud->mqtt_state.mqtt_connection, topic, qos );
temp_msg = mqtt_msg_subscribe_topic( &msgb, topic, qos );
topic_count++;
NODE_DBG("topic: %s - qos: %d, length: %d\n", topic, qos, temp_msg->length);
@ -1646,7 +1670,7 @@ static int mqtt_socket_subscribe( lua_State* L ) {
return luaL_error( L, "buffer overflow, can't enqueue all subscriptions" );
}
temp_msg = mqtt_msg_subscribe_fini( &mud->mqtt_state.mqtt_connection );
temp_msg = mqtt_msg_subscribe_fini( &msgb );
if (temp_msg->length == 0) {
return luaL_error( L, "buffer overflow, can't enqueue all subscriptions" );
}
@ -1660,7 +1684,8 @@ static int mqtt_socket_subscribe( lua_State* L ) {
return luaL_error( L, "need topic name" );
}
qos = luaL_checkinteger( L, stack );
temp_msg = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id );
msg_id = mqtt_next_message_id(mud);
temp_msg = mqtt_msg_subscribe( &msgb, topic, qos, msg_id );
stack++;
}
@ -1732,12 +1757,17 @@ static int mqtt_socket_publish( lua_State* L )
uint8_t retain = luaL_checkinteger( L, stack);
stack ++;
if (qos != 0) {
msg_id = mqtt_next_message_id(mud);
}
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = mqtt_msg_publish(&mud->mqtt_state.mqtt_connection,
mqtt_message_buffer_t msgb;
mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = mqtt_msg_publish(&msgb,
topic, payload, l,
qos, retain,
&msg_id);
msg_id);
if (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION){
lua_pushvalue(L, stack); // copy argument (func) to the top of stack

View File

@ -54,76 +54,71 @@ struct __attribute((__packed__)) mqtt_connect_variable_header
uint8_t keepaliveLsb;
};
static int append_string(mqtt_connection_t* connection, const char* string, int len)
static int append_string(mqtt_message_buffer_t *msgb, const char* string, int len)
{
if(connection->message.length + len + 2 > connection->buffer_length)
if(msgb->message.length + len + 2 > msgb->buffer_length)
return -1;
connection->buffer[connection->message.length++] = len >> 8;
connection->buffer[connection->message.length++] = len & 0xff;
memcpy(connection->buffer + connection->message.length, string, len);
connection->message.length += len;
msgb->buffer[msgb->message.length++] = len >> 8;
msgb->buffer[msgb->message.length++] = len & 0xff;
memcpy(msgb->buffer + msgb->message.length, string, len);
msgb->message.length += len;
return len + 2;
}
static uint16_t append_message_id(mqtt_connection_t* connection, uint16_t message_id)
static uint16_t append_message_id(mqtt_message_buffer_t* msgb, uint16_t message_id)
{
// If message_id is zero then we should assign one, otherwise
// we'll use the one supplied by the caller
while(message_id == 0)
message_id = ++connection->message_id;
if(connection->message.length + 2 > connection->buffer_length)
if(msgb->message.length + 2 > msgb->buffer_length)
return 0;
connection->buffer[connection->message.length++] = message_id >> 8;
connection->buffer[connection->message.length++] = message_id & 0xff;
msgb->buffer[msgb->message.length++] = message_id >> 8;
msgb->buffer[msgb->message.length++] = message_id & 0xff;
return message_id;
return 1;
}
static int init_message(mqtt_connection_t* connection)
static int init_message(mqtt_message_buffer_t* msgb)
{
connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE;
msgb->message.length = MQTT_MAX_FIXED_HEADER_SIZE;
return MQTT_MAX_FIXED_HEADER_SIZE;
}
static mqtt_message_t* fail_message(mqtt_connection_t* connection)
static mqtt_message_t* fail_message(mqtt_message_buffer_t* msgb)
{
connection->message.data = connection->buffer;
connection->message.length = 0;
return &connection->message;
msgb->message.data = msgb->buffer;
msgb->message.length = 0;
return &msgb->message;
}
static mqtt_message_t* fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain)
static mqtt_message_t* fini_message(mqtt_message_buffer_t* msgb, int type, int dup, int qos, int retain)
{
int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
int remaining_length = msgb->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
if(remaining_length > 127)
{
connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
connection->buffer[1] = 0x80 | (remaining_length % 128);
connection->buffer[2] = remaining_length / 128;
connection->message.length = remaining_length + 3;
connection->message.data = connection->buffer;
msgb->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
msgb->buffer[1] = 0x80 | (remaining_length % 128);
msgb->buffer[2] = remaining_length / 128;
msgb->message.length = remaining_length + 3;
msgb->message.data = msgb->buffer;
}
else
{
connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
connection->buffer[2] = remaining_length;
connection->message.length = remaining_length + 2;
connection->message.data = connection->buffer + 1;
msgb->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
msgb->buffer[2] = remaining_length;
msgb->message.length = remaining_length + 2;
msgb->message.data = msgb->buffer + 1;
}
return &connection->message;
return &msgb->message;
}
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length)
void mqtt_msg_init(mqtt_message_buffer_t* msgb, uint8_t* buffer, uint16_t buffer_length)
{
memset(connection, 0, sizeof(connection));
connection->buffer = buffer;
connection->buffer_length = buffer_length;
memset(msgb, 0, sizeof(msgb));
msgb->buffer = buffer;
msgb->buffer_length = buffer_length;
}
// Returns total length of message, or -1 if not enough bytes are available
@ -286,16 +281,16 @@ uint16_t mqtt_get_id(uint8_t* buffer, uint16_t buffer_length)
}
}
mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info)
mqtt_message_t* mqtt_msg_connect(mqtt_message_buffer_t* msgb, mqtt_connect_info_t* info)
{
struct mqtt_connect_variable_header* variable_header;
init_message(connection);
init_message(msgb);
if(connection->message.length + sizeof(*variable_header) > connection->buffer_length)
return fail_message(connection);
variable_header = (void*)(connection->buffer + connection->message.length);
connection->message.length += sizeof(*variable_header);
if(msgb->message.length + sizeof(*variable_header) > msgb->buffer_length)
return fail_message(msgb);
variable_header = (void*)(msgb->buffer + msgb->message.length);
msgb->message.length += sizeof(*variable_header);
variable_header->lengthMsb = 0;
variable_header->lengthLsb = 4;
@ -310,19 +305,19 @@ mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_inf
if(info->client_id != NULL && info->client_id[0] != '\0')
{
if(append_string(connection, info->client_id, strlen(info->client_id)) < 0)
return fail_message(connection);
if(append_string(msgb, info->client_id, strlen(info->client_id)) < 0)
return fail_message(msgb);
}
else
return fail_message(connection);
return fail_message(msgb);
if(info->will_topic != NULL && info->will_topic[0] != '\0')
{
if(append_string(connection, info->will_topic, strlen(info->will_topic)) < 0)
return fail_message(connection);
if(append_string(msgb, info->will_topic, strlen(info->will_topic)) < 0)
return fail_message(msgb);
if(append_string(connection, info->will_message, strlen(info->will_message)) < 0)
return fail_message(connection);
if(append_string(msgb, info->will_message, strlen(info->will_message)) < 0)
return fail_message(msgb);
variable_header->flags |= MQTT_CONNECT_FLAG_WILL;
if(info->will_retain)
@ -332,176 +327,174 @@ mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_inf
if(info->username != NULL && info->username[0] != '\0')
{
if(append_string(connection, info->username, strlen(info->username)) < 0)
return fail_message(connection);
if(append_string(msgb, info->username, strlen(info->username)) < 0)
return fail_message(msgb);
variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME;
}
if(info->password != NULL && info->password[0] != '\0')
{
if(append_string(connection, info->password, strlen(info->password)) < 0)
return fail_message(connection);
if(append_string(msgb, info->password, strlen(info->password)) < 0)
return fail_message(msgb);
variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD;
}
return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);
return fini_message(msgb, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);
}
mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id)
mqtt_message_t* mqtt_msg_publish(mqtt_message_buffer_t* msgb, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t message_id)
{
init_message(connection);
init_message(msgb);
if(topic == NULL || topic[0] == '\0')
return fail_message(connection);
return fail_message(msgb);
if(append_string(connection, topic, strlen(topic)) < 0)
return fail_message(connection);
if(append_string(msgb, topic, strlen(topic)) < 0)
return fail_message(msgb);
if(qos > 0)
{
if((*message_id = append_message_id(connection, 0)) == 0)
return fail_message(connection);
if(!append_message_id(msgb, message_id))
return fail_message(msgb);
}
else
*message_id = 0;
if(connection->message.length + data_length > connection->buffer_length)
return fail_message(connection);
memcpy(connection->buffer + connection->message.length, data, data_length);
connection->message.length += data_length;
if(msgb->message.length + data_length > msgb->buffer_length)
return fail_message(msgb);
memcpy(msgb->buffer + msgb->message.length, data, data_length);
msgb->message.length += data_length;
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
return fini_message(msgb, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}
mqtt_message_t* mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id)
mqtt_message_t* mqtt_msg_puback(mqtt_message_buffer_t* msgb, uint16_t message_id)
{
init_message(connection);
if(append_message_id(connection, message_id) == 0)
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0);
init_message(msgb);
if(!append_message_id(msgb, message_id))
return fail_message(msgb);
return fini_message(msgb, MQTT_MSG_TYPE_PUBACK, 0, 0, 0);
}
mqtt_message_t* mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id)
mqtt_message_t* mqtt_msg_pubrec(mqtt_message_buffer_t* msgb, uint16_t message_id)
{
init_message(connection);
if(append_message_id(connection, message_id) == 0)
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0);
init_message(msgb);
if(!append_message_id(msgb, message_id))
return fail_message(msgb);
return fini_message(msgb, MQTT_MSG_TYPE_PUBREC, 0, 0, 0);
}
mqtt_message_t* mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id)
mqtt_message_t* mqtt_msg_pubrel(mqtt_message_buffer_t* msgb, uint16_t message_id)
{
init_message(connection);
if(append_message_id(connection, message_id) == 0)
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0);
init_message(msgb);
if(!append_message_id(msgb, message_id))
return fail_message(msgb);
return fini_message(msgb, MQTT_MSG_TYPE_PUBREL, 0, 1, 0);
}
mqtt_message_t* mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id)
mqtt_message_t* mqtt_msg_pubcomp(mqtt_message_buffer_t* msgb, uint16_t message_id)
{
init_message(connection);
if(append_message_id(connection, message_id) == 0)
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
init_message(msgb);
if(!append_message_id(msgb, message_id))
return fail_message(msgb);
return fini_message(msgb, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
}
mqtt_message_t* mqtt_msg_subscribe_init(mqtt_connection_t* connection, uint16_t *message_id)
mqtt_message_t* mqtt_msg_subscribe_init(mqtt_message_buffer_t* msgb, uint16_t message_id)
{
init_message(connection);
init_message(msgb);
if((*message_id = append_message_id(connection, 0)) == 0)
return fail_message(connection);
if(!append_message_id(msgb, message_id))
return fail_message(msgb);
return &connection->message;
return &msgb->message;
}
mqtt_message_t* mqtt_msg_subscribe_topic(mqtt_connection_t* connection, const char* topic, int qos)
mqtt_message_t* mqtt_msg_subscribe_topic(mqtt_message_buffer_t* msgb, const char* topic, int qos)
{
if(topic == NULL || topic[0] == '\0')
return fail_message(connection);
return fail_message(msgb);
if(append_string(connection, topic, strlen(topic)) < 0)
return fail_message(connection);
if(append_string(msgb, topic, strlen(topic)) < 0)
return fail_message(msgb);
if(connection->message.length + 1 > connection->buffer_length)
return fail_message(connection);
connection->buffer[connection->message.length++] = qos;
if(msgb->message.length + 1 > msgb->buffer_length)
return fail_message(msgb);
msgb->buffer[msgb->message.length++] = qos;
return &connection->message;
return &msgb->message;
}
mqtt_message_t* mqtt_msg_subscribe_fini(mqtt_connection_t* connection)
mqtt_message_t* mqtt_msg_subscribe_fini(mqtt_message_buffer_t* msgb)
{
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
return fini_message(msgb, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
}
mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id)
mqtt_message_t* mqtt_msg_subscribe(mqtt_message_buffer_t* msgb, const char* topic, int qos, uint16_t message_id)
{
mqtt_message_t* result;
result = mqtt_msg_subscribe_init(connection, message_id);
result = mqtt_msg_subscribe_init(msgb, message_id);
if (result->length != 0) {
result = mqtt_msg_subscribe_topic(connection, topic, qos);
result = mqtt_msg_subscribe_topic(msgb, topic, qos);
}
if (result->length != 0) {
result = mqtt_msg_subscribe_fini(connection);
result = mqtt_msg_subscribe_fini(msgb);
}
return result;
}
mqtt_message_t* mqtt_msg_unsubscribe_init(mqtt_connection_t* connection, uint16_t *message_id)
mqtt_message_t* mqtt_msg_unsubscribe_init(mqtt_message_buffer_t* msgb, uint16_t message_id)
{
return mqtt_msg_subscribe_init(connection, message_id);
return mqtt_msg_subscribe_init(msgb, message_id);
}
mqtt_message_t* mqtt_msg_unsubscribe_topic(mqtt_connection_t* connection, const char* topic)
mqtt_message_t* mqtt_msg_unsubscribe_topic(mqtt_message_buffer_t* msgb, const char* topic)
{
if(topic == NULL || topic[0] == '\0')
return fail_message(connection);
return fail_message(msgb);
if(append_string(connection, topic, strlen(topic)) < 0)
return fail_message(connection);
if(append_string(msgb, topic, strlen(topic)) < 0)
return fail_message(msgb);
return &connection->message;
return &msgb->message;
}
mqtt_message_t* mqtt_msg_unsubscribe_fini(mqtt_connection_t* connection)
mqtt_message_t* mqtt_msg_unsubscribe_fini(mqtt_message_buffer_t* msgb)
{
return fini_message(connection, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0);
return fini_message(msgb, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0);
}
mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id)
mqtt_message_t* mqtt_msg_unsubscribe(mqtt_message_buffer_t* msgb, const char* topic, uint16_t message_id)
{
mqtt_message_t* result;
result = mqtt_msg_unsubscribe_init(connection, message_id);
result = mqtt_msg_unsubscribe_init(msgb, message_id);
if (result->length != 0) {
result = mqtt_msg_unsubscribe_topic(connection, topic);
result = mqtt_msg_unsubscribe_topic(msgb, topic);
}
if (result->length != 0) {
result = mqtt_msg_unsubscribe_fini(connection);
result = mqtt_msg_unsubscribe_fini(msgb);
}
return result;
}
mqtt_message_t* mqtt_msg_pingreq(mqtt_connection_t* connection)
mqtt_message_t* mqtt_msg_pingreq(mqtt_message_buffer_t* msgb)
{
init_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
init_message(msgb);
return fini_message(msgb, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
}
mqtt_message_t* mqtt_msg_pingresp(mqtt_connection_t* connection)
mqtt_message_t* mqtt_msg_pingresp(mqtt_message_buffer_t* msgb)
{
init_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
init_message(msgb);
return fini_message(msgb, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
}
mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection)
mqtt_message_t* mqtt_msg_disconnect(mqtt_message_buffer_t* msgb)
{
init_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
init_message(msgb);
return fini_message(msgb, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
}

View File

@ -87,7 +87,7 @@ typedef struct mqtt_message
} mqtt_message_t;
typedef struct mqtt_connection
typedef struct mqtt_message_buffer
{
mqtt_message_t message;
@ -95,7 +95,7 @@ typedef struct mqtt_connection
uint8_t* buffer;
uint16_t buffer_length;
} mqtt_connection_t;
} mqtt_message_buffer_t;
typedef struct mqtt_connect_info
{
@ -119,31 +119,31 @@ static inline uint8_t mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06)
static inline uint8_t mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }
static inline uint8_t mqtt_get_connect_ret_code(uint8_t* buffer) { return (buffer[3]); }
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
void mqtt_msg_init(mqtt_message_buffer_t* msgb, uint8_t* buffer, uint16_t buffer_length);
int32_t mqtt_get_total_length(uint8_t* buffer, uint16_t buffer_length);
const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* buffer_length);
const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* buffer_length);
uint16_t mqtt_get_id(uint8_t* buffer, uint16_t buffer_length);
mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info);
mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id);
mqtt_message_t* mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id);
mqtt_message_t* mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id);
mqtt_message_t* mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id);
mqtt_message_t* mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id);
mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id);
mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id);
mqtt_message_t* mqtt_msg_pingreq(mqtt_connection_t* connection);
mqtt_message_t* mqtt_msg_pingresp(mqtt_connection_t* connection);
mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection);
mqtt_message_t* mqtt_msg_connect(mqtt_message_buffer_t* msgb, mqtt_connect_info_t* info);
mqtt_message_t* mqtt_msg_publish(mqtt_message_buffer_t* msgb, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t message_id);
mqtt_message_t* mqtt_msg_puback(mqtt_message_buffer_t* msgb, uint16_t message_id);
mqtt_message_t* mqtt_msg_pubrec(mqtt_message_buffer_t* msgb, uint16_t message_id);
mqtt_message_t* mqtt_msg_pubrel(mqtt_message_buffer_t* msgb, uint16_t message_id);
mqtt_message_t* mqtt_msg_pubcomp(mqtt_message_buffer_t* msgb, uint16_t message_id);
mqtt_message_t* mqtt_msg_subscribe(mqtt_message_buffer_t* msgb, const char* topic, int qos, uint16_t message_id);
mqtt_message_t* mqtt_msg_unsubscribe(mqtt_message_buffer_t* msgb, const char* topic, uint16_t message_id);
mqtt_message_t* mqtt_msg_pingreq(mqtt_message_buffer_t* msgb);
mqtt_message_t* mqtt_msg_pingresp(mqtt_message_buffer_t* msgb);
mqtt_message_t* mqtt_msg_disconnect(mqtt_message_buffer_t* msgb);
mqtt_message_t* mqtt_msg_subscribe_init(mqtt_connection_t* connection, uint16_t* message_id);
mqtt_message_t* mqtt_msg_subscribe_topic(mqtt_connection_t* connection, const char* topic, int qos);
mqtt_message_t* mqtt_msg_subscribe_fini(mqtt_connection_t* connection);
mqtt_message_t* mqtt_msg_subscribe_init(mqtt_message_buffer_t* msgb, uint16_t message_id);
mqtt_message_t* mqtt_msg_subscribe_topic(mqtt_message_buffer_t* msgb, const char* topic, int qos);
mqtt_message_t* mqtt_msg_subscribe_fini(mqtt_message_buffer_t* msgb);
mqtt_message_t* mqtt_msg_unsubscribe_init(mqtt_connection_t* connection, uint16_t* message_id);
mqtt_message_t* mqtt_msg_unsubscribe_topic(mqtt_connection_t* connection, const char* topic);
mqtt_message_t* mqtt_msg_unsubscribe_fini(mqtt_connection_t* connection);
mqtt_message_t* mqtt_msg_unsubscribe_init(mqtt_message_buffer_t* msgb, uint16_t message_id);
mqtt_message_t* mqtt_msg_unsubscribe_topic(mqtt_message_buffer_t* msgb, const char* topic);
mqtt_message_t* mqtt_msg_unsubscribe_fini(mqtt_message_buffer_t* msgb);
#ifdef __cplusplus

View File

@ -65,6 +65,7 @@ m = mqtt.Client("clientid", 120, "user", "password")
m:lwt("/lwt", "offline", 0, 0)
m:on("connect", function(client) print ("connected") end)
m:on("connfail", function(client, reason) print ("connection failed", reason) end)
m:on("offline", function(client) print ("offline") end)
-- on publish message receive event
@ -157,8 +158,16 @@ end
In reality, the connected function should do something useful!
The first callback to `:connect()` aliases with the "connect" callback available through `:on()` (the last passed callback to either of those are used).
The second (failure) callback is however not the same as the "offline" `:on()` callback. The "offline" callback is only called after an already established connection becomes closed. If the `connect()` call fails to establish a connection, the callback passed to `:connect()` is called and nothing else.
The first callback to `:connect()` aliases with the "connect" callback
available through `:on()` (the last passed callback to either of those are
used). However, if `nil` is passed to `:connect()`, any existing callback
will be preserved, rather than removed.
The second (failure) callback aliases with the "connfail" callback available
through `:on()`. (The "offline" callback is only called after an already
established connection becomes closed. If the `connect()` call fails to
establish a connection, the callback passed to `:connect()` is called and
nothing else.)
Previously, we instructed an application to pass either the *integer* 0 or
*integer* 1 for `secure`. Now, this will trigger a deprecation warning; please
@ -213,8 +222,23 @@ Registers a callback function for an event.
`mqtt:on(event, function(client[, topic[, message]]))`
#### Parameters
- `event` can be "connect", "suback", "unsuback", "puback", "message", "overflow", or "offline"
- `function(client[, topic[, message]])` callback function. The first parameter is the client. If event is "message", the 2nd and 3rd param are received topic and message (strings).
- `event` can be "connect", "connfail", "suback", "unsuback", "puback", "message", "overflow", or "offline"
- callback function. The first parameter is always the client object itself.
Any remaining parameters passed differ by event:
- If event is "message", the 2nd and 3rd parameters are received topic and
message, respectively, as Lua strings.
- If the event is "overflow", the parameters are as with "message", save
that the message string is truncated to the maximum message size.
- If the event is "connfail", the 2nd parameter will be the connection
failure code; see above.
- Other event types do not provide additional arguments. This has some
unfortunate consequences: the broker-provided subscription maximum QoS
information is lost, and the application must, if it expects per-event
acknowledgements, manage a queue or queues itself.
#### Returns
`nil`
@ -231,7 +255,8 @@ Publishes a message.
- `message` the message to publish, (buffer or string)
- `qos` QoS level
- `retain` retain flag
- `function(client)` optional callback fired when PUBACK received.
- `function(client)` optional callback fired when PUBACK received (for QoS 1
or 2) or when message sent (for QoS 0).
#### Notes