Handle large/chunked/fragmented MQTT messages properly (#2571)

* MQTT: handle large/chunked/fragmented messages properly

If a message spans multiple TCP packets it must be buffered before
delivered to LUA. Prior code did not do this at all, so this "patch"
really adds proper handling of fragmented MQTT packets.
This could also occur if multiple small messages was sent in a
single TCP packet, and the last message did not completely fit in that
packet.

Introduces a new option to the mqtt.Client constructor:
max_publish_length which defaults to 1024

Introduces a new 'overflow' callback.

Fixes issue #2308 and proper fix for PR #2544.

* mqtt.md: clarified heap allocation

* mqtt: ensure ack is sent for overflowed publish

If QoS is used we should still acknowledge that we received it, or server might retransmit it later.
This commit is contained in:
Johan Ström 2018-11-30 22:12:46 +01:00 committed by Marcel Stör
parent b77033f920
commit 2d958750b5
4 changed files with 372 additions and 75 deletions

View File

@ -46,17 +46,32 @@ typedef struct mqtt_event_data_t
#define RECONNECT_POSSIBLE 1
#define RECONNECT_ON 2
typedef enum {
MQTT_RECV_NORMAL,
MQTT_RECV_BUFFERING_SHORT,
MQTT_RECV_BUFFERING,
MQTT_RECV_SKIPPING,
} tReceiveState;
typedef struct mqtt_state_t
{
uint16_t port;
uint8_t auto_reconnect; // 0 is not auto_reconnect. 1 is auto reconnect, but never connected. 2 is auto reconnect, but once connected
mqtt_connect_info_t* connect_info;
uint16_t message_length;
uint16_t message_length_read;
mqtt_connection_t mqtt_connection;
msg_queue_t* pending_msg_q;
uint8_t * recv_buffer; // heap buffer for multi-packet rx
uint8_t * recv_buffer_wp; // write pointer in multi-packet rx
union {
uint16_t recv_buffer_size; // size of recv_buffer
uint32_t recv_buffer_skip; // number of bytes left to skip, in skipping state
};
tReceiveState recv_buffer_state;
} mqtt_state_t;
typedef struct lmqtt_userdata
{
struct espconn *pesp_conn;
@ -65,6 +80,7 @@ typedef struct lmqtt_userdata
int cb_connect_fail_ref;
int cb_disconnect_ref;
int cb_message_ref;
int cb_overflow_ref;
int cb_suback_ref;
int cb_unsuback_ref;
int cb_puback_ref;
@ -81,6 +97,9 @@ typedef struct lmqtt_userdata
tConnState connState;
}lmqtt_userdata;
// How large MQTT messages to accept by default
#define DEFAULT_MAX_MESSAGE_LENGTH 1024
static sint8 socket_connect(struct espconn *pesp_conn);
static void mqtt_socket_reconnected(void *arg, sint8_t err);
static void mqtt_socket_connected(void *arg);
@ -110,6 +129,13 @@ static void mqtt_socket_disconnected(void *arg) // tcp only
}
}
if(mud->mqtt_state.recv_buffer) {
c_free(mud->mqtt_state.recv_buffer);
mud->mqtt_state.recv_buffer = NULL;
}
mud->mqtt_state.recv_buffer_size = 0;
mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL;
if(mud->mqtt_state.auto_reconnect == RECONNECT_ON) {
mud->pesp_conn->reverse = mud;
mud->pesp_conn->type = ESPCONN_TCP;
@ -178,9 +204,9 @@ static void mqtt_socket_reconnected(void *arg, sint8_t err)
NODE_DBG("leave mqtt_socket_reconnected.\n");
}
static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length)
static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, uint16_t length, uint8_t is_overflow)
{
NODE_DBG("enter deliver_publish.\n");
NODE_DBG("enter deliver_publish (len=%d, overflow=%d).\n", length, is_overflow);
if(mud == NULL)
return;
mqtt_event_data_t event_data;
@ -191,13 +217,15 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length)
event_data.data_length = length;
event_data.data = mqtt_get_publish_data(message, &event_data.data_length);
if(mud->cb_message_ref == LUA_NOREF)
int cb_ref = !is_overflow ? mud->cb_message_ref : mud->cb_overflow_ref;
if(cb_ref == LUA_NOREF)
return;
if(mud->self_ref == LUA_NOREF)
return;
lua_State *L = lua_getstate();
if(event_data.topic && (event_data.topic_length > 0)){
lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_message_ref);
lua_rawgeti(L, LUA_REGISTRYINDEX, cb_ref);
lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_pushlstring(L, event_data.topic, event_data.topic_length);
} else {
@ -265,14 +293,15 @@ static sint8 mqtt_send_if_possible(struct espconn *pesp_conn)
static void mqtt_socket_received(void *arg, char *pdata, unsigned short len)
{
NODE_DBG("enter mqtt_socket_received.\n");
NODE_DBG("enter mqtt_socket_received (rxlen=%u).\n", len);
uint8_t msg_type;
uint8_t msg_qos;
uint16_t msg_id;
int length = (int)len;
// uint8_t in_buffer[MQTT_BUF_SIZE];
uint8_t *in_buffer = (uint8_t *)pdata;
uint16_t in_buffer_length = len;
uint8_t *continuation_buffer = NULL;
uint8_t *temp_pdata = NULL;
struct espconn *pesp_conn = arg;
if(pesp_conn == NULL)
@ -281,11 +310,109 @@ static void mqtt_socket_received(void *arg, char *pdata, unsigned short len)
if(mud == NULL)
return;
READPACKET:
if(length > MQTT_BUF_SIZE || length <= 0)
return;
switch(mud->mqtt_state.recv_buffer_state) {
case MQTT_RECV_NORMAL:
// No previous buffer.
break;
case MQTT_RECV_BUFFERING_SHORT:
// Last buffer had so few byte that we could not determine message length.
// Store in a local heap buffer and operate on this, as if was the regular pdata buffer.
// Avoids having to repeat message size/overflow logic.
temp_pdata = c_zalloc(mud->mqtt_state.recv_buffer_size + len);
if(temp_pdata == NULL) {
NODE_DBG("MQTT[buffering-short]: Failed to allocate %u bytes, disconnecting...\n", mud->mqtt_state.recv_buffer_size + len);
#ifdef CLIENT_SSL_ENABLE
if (mud->secure) {
espconn_secure_disconnect(pesp_conn);
} else
#endif
{
espconn_disconnect(pesp_conn);
}
return;
}
// c_memcpy(in_buffer, pdata, length);
NODE_DBG("MQTT[buffering-short]: Continuing with %u + %u bytes\n", mud->mqtt_state.recv_buffer_size, len);
memcpy(temp_pdata, mud->mqtt_state.recv_buffer, mud->mqtt_state.recv_buffer_size);
memcpy(temp_pdata + mud->mqtt_state.recv_buffer_size, pdata, len);
c_free(mud->mqtt_state.recv_buffer);
mud->mqtt_state.recv_buffer = NULL;
mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL;
in_buffer = temp_pdata;
in_buffer_length = mud->mqtt_state.recv_buffer_size + len;
break;
case MQTT_RECV_BUFFERING: {
// safe cast: we never allow longer buffer.
uint16_t current_length = (uint16_t) (mud->mqtt_state.recv_buffer_wp - mud->mqtt_state.recv_buffer);
NODE_DBG("MQTT[buffering]: appending %u bytes to previous recv buffer (%u out of wanted %u)\n",
in_buffer_length,
current_length,
mud->mqtt_state.recv_buffer_size);
// Copy from rx buffer to heap buffer. Smallest of [remainder of pending message] and [all of buffer]
uint16_t copy_length = LWIP_MIN(mud->mqtt_state.recv_buffer_size - current_length, in_buffer_length);
memcpy(mud->mqtt_state.recv_buffer_wp, pdata, copy_length);
mud->mqtt_state.recv_buffer_wp += copy_length;
in_buffer_length = (uint16_t) (mud->mqtt_state.recv_buffer_wp - mud->mqtt_state.recv_buffer);
if (in_buffer_length < mud->mqtt_state.recv_buffer_size) {
NODE_DBG("MQTT[buffering]: need %u more bytes, waiting for next rx.\n",
mud->mqtt_state.recv_buffer_size - in_buffer_length
);
goto RX_PACKET_FINISHED;
}
NODE_DBG("MQTT[buffering]: Full message received (%u). remainding bytes=%u\n",
mud->mqtt_state.recv_buffer_size,
len - copy_length);
// Point continuation_buffer to any additional data in pdata.
// May become 0 bytes, but used to trigger free!
continuation_buffer = pdata + copy_length;
len -= copy_length; // borrow len instead of having another variable..
in_buffer = mud->mqtt_state.recv_buffer;
// in_buffer_length was set above
mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL;
break;
}
case MQTT_RECV_SKIPPING:
// Last rx had a message which was too large to process, skip it.
if(mud->mqtt_state.recv_buffer_skip > in_buffer_length) {
NODE_DBG("MQTT[skipping]: skip=%u. Skipping full RX buffer (%u).\n",
mud->mqtt_state.recv_buffer_skip,
in_buffer_length
);
mud->mqtt_state.recv_buffer_skip -= in_buffer_length;
goto RX_PACKET_FINISHED;
}
NODE_DBG("MQTT[skipping]: skip=%u. Skipping partial RX buffer, continuing at %u\n",
mud->mqtt_state.recv_buffer_skip,
in_buffer_length
);
in_buffer += mud->mqtt_state.recv_buffer_skip;
in_buffer_length -= mud->mqtt_state.recv_buffer_skip;
mud->mqtt_state.recv_buffer_skip = 0;
mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL;
break;
}
READPACKET:
if(in_buffer_length <= 0)
goto RX_PACKET_FINISHED;
// MQTT publish message can in theory be 256Mb, while we do not support it we need to be
// able to do math on it.
int32_t message_length;
// temp buffer for control messages
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = NULL;
@ -355,19 +482,107 @@ READPACKET:
break;
case MQTT_DATA:
mud->mqtt_state.message_length_read = length;
mud->mqtt_state.message_length = mqtt_get_total_length(in_buffer, mud->mqtt_state.message_length_read);
message_length = mqtt_get_total_length(in_buffer, in_buffer_length);
msg_type = mqtt_get_type(in_buffer);
msg_qos = mqtt_get_qos(in_buffer);
msg_id = mqtt_get_id(in_buffer, mud->mqtt_state.message_length);
msg_id = mqtt_get_id(in_buffer, in_buffer_length);
NODE_DBG("MQTT_DATA: msg length: %u, buffer length: %u\r\n",
message_length,
in_buffer_length);
if (message_length > mud->connect_info.max_message_length) {
// The pending message length is larger than we was configured to allow
if(msg_qos > 0 && msg_id == 0) {
NODE_DBG("MQTT: msg too long, but not enough data to get msg_id: total=%u, deliver=%u\r\n", message_length, in_buffer_length);
// qos requested, but too short buffer to get a packet ID.
// Trigger the "short buffer" mode
message_length = -1;
// Drop through to partial message handling below.
} else {
NODE_DBG("MQTT: msg too long: total=%u, deliver=%u\r\n", message_length, in_buffer_length);
if (msg_type == MQTT_MSG_TYPE_PUBLISH) {
// In practice we should never get any other types..
deliver_publish(mud, in_buffer, in_buffer_length, 1);
// If qos specified, we should ACK it.
// In theory it might be wrong to ack it before we received all TCP packets, but this avoids
// buffering and special code to handle this corner-case. Server will most likely have
// written all to OS socket anyway, and not be aware that we "should" not have received it all yet.
if(msg_qos == 1){
temp_msg = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id);
msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBACK, (int)mqtt_get_qos(temp_msg->data) );
}
else if(msg_qos == 2){
temp_msg = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id);
msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBREC, (int)mqtt_get_qos(temp_msg->data) );
}
if(msg_qos == 1 || msg_qos == 2){
NODE_DBG("MQTT: Queue response QoS: %d\r\n", msg_qos);
}
}
if (message_length > in_buffer_length) {
// Ignore bytes in subsequent packet(s) too.
NODE_DBG("MQTT: skipping into next rx\n");
mud->mqtt_state.recv_buffer_state = MQTT_RECV_SKIPPING;
mud->mqtt_state.recv_buffer_skip = (uint32_t) message_length - in_buffer_length;
break;
} else {
NODE_DBG("MQTT: Skipping message\n");
mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL;
goto RX_MESSAGE_PROCESSED;
}
}
}
if (message_length == -1 || message_length > in_buffer_length) {
// Partial message in buffer, need to store on heap until next RX. Allocate size for full message directly,
// instead of potential reallocs, to avoid fragmentation.
// If message_length is indicated as -1, we do not have enough data to determine the length properly.
// Just put what we have on heap, and place in state BUFFERING_SHORT.
NODE_DBG("MQTT: Partial message received (%u of %d). Buffering\r\n",
in_buffer_length,
message_length);
// although message_length is 32bit, it should never go above 16bit since
// max_message_length is 16bit.
uint16_t alloc_size = message_length > 0 ? (uint16_t)message_length : in_buffer_length;
mud->mqtt_state.recv_buffer = c_zalloc(alloc_size);
if (mud->mqtt_state.recv_buffer == NULL) {
NODE_DBG("MQTT: Failed to allocate %u bytes, disconnecting...\n", alloc_size);
#ifdef CLIENT_SSL_ENABLE
if (mud->secure) {
espconn_secure_disconnect(pesp_conn);
} else
#endif
{
espconn_disconnect(pesp_conn);
}
return;
}
memcpy(mud->mqtt_state.recv_buffer, in_buffer, in_buffer_length);
mud->mqtt_state.recv_buffer_wp = mud->mqtt_state.recv_buffer + in_buffer_length;
mud->mqtt_state.recv_buffer_state = message_length > 0 ? MQTT_RECV_BUFFERING : MQTT_RECV_BUFFERING_SHORT;
mud->mqtt_state.recv_buffer_size = alloc_size;
NODE_DBG("MQTT: Wait for next recv\n");
break;
}
msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q));
NODE_DBG("MQTT_DATA: type: %d, qos: %d, msg_id: %d, pending_id: %d, msg length: %u, buffer length: %u\r\n",
msg_type,
msg_qos,
msg_id,
(pending_msg)?pending_msg->msg_id:0,
message_length,
in_buffer_length);
NODE_DBG("MQTT_DATA: type: %d, qos: %d, msg_id: %d, pending_id: %d\r\n",
msg_type,
msg_qos,
msg_id,
(pending_msg)?pending_msg->msg_id:0);
switch(msg_type)
{
case MQTT_MSG_TYPE_SUBACK:
@ -411,7 +626,7 @@ READPACKET:
if(msg_qos == 1 || msg_qos == 2){
NODE_DBG("MQTT: Queue response QoS: %d\r\n", msg_qos);
}
deliver_publish(mud, in_buffer, mud->mqtt_state.message_length);
deliver_publish(mud, in_buffer, (uint16_t)message_length, 0);
break;
case MQTT_MSG_TYPE_PUBACK:
if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){
@ -472,28 +687,40 @@ READPACKET:
NODE_DBG("MQTT: PINGRESP received\r\n");
break;
}
// NOTE: this is done down here and not in the switch case above
// because the PSOCK_READBUF_LEN() won't work inside a switch
// statement due to the way protothreads resume.
if(msg_type == MQTT_MSG_TYPE_PUBLISH)
{
length = mud->mqtt_state.message_length_read;
RX_MESSAGE_PROCESSED:
if(continuation_buffer != NULL) {
NODE_DBG("MQTT[buffering]: buffered message finished. Continuing with rest of rx buffer (%u)\n",
len);
c_free(mud->mqtt_state.recv_buffer);
mud->mqtt_state.recv_buffer = NULL;
if(mud->mqtt_state.message_length < mud->mqtt_state.message_length_read)
{
length -= mud->mqtt_state.message_length;
in_buffer += mud->mqtt_state.message_length;
NODE_DBG("Get another published message\r\n");
goto READPACKET;
}
in_buffer = continuation_buffer;
in_buffer_length = len;
continuation_buffer = NULL;
}else{
// Message have been fully processed (or ignored). Move pointer ahead
// and continue with next message, if any.
in_buffer_length -= message_length;
in_buffer += message_length;
}
if(in_buffer_length > 0)
{
NODE_DBG("Get another published message\r\n");
goto READPACKET;
}
break;
}
RX_PACKET_FINISHED:
if(temp_pdata != NULL) {
c_free(temp_pdata);
}
mqtt_send_if_possible(pesp_conn);
NODE_DBG("leave mqtt_socket_received.\n");
NODE_DBG("leave mqtt_socket_received\n");
return;
}
@ -580,7 +807,7 @@ static void mqtt_socket_connected(void *arg)
mud->keep_alive_tick = 0;
mud->connState = MQTT_CONNECT_SENDING;
NODE_DBG("leave mqtt_socket_connected.\n");
NODE_DBG("leave mqtt_socket_connectet, heap = %u.\n", system_get_free_heap_size());
return;
}
@ -686,7 +913,7 @@ void mqtt_socket_timer(void *arg)
NODE_DBG("leave mqtt_socket_timer.\n");
}
// Lua: mqtt.Client(clientid, keepalive, user, pass, clean_session)
// Lua: mqtt.Client(clientid, keepalive, user, pass, clean_session, max_message_length)
static int mqtt_socket_client( lua_State* L )
{
NODE_DBG("enter mqtt_socket_client.\n");
@ -703,6 +930,7 @@ static int mqtt_socket_client( lua_State* L )
int keepalive = 0;
int stack = 1;
int clean_session = 1;
int max_message_length = 0;
int top = lua_gettop(L);
// create a object
@ -715,6 +943,7 @@ static int mqtt_socket_client( lua_State* L )
mud->cb_disconnect_ref = LUA_NOREF;
mud->cb_message_ref = LUA_NOREF;
mud->cb_overflow_ref = LUA_NOREF;
mud->cb_suback_ref = LUA_NOREF;
mud->cb_unsuback_ref = LUA_NOREF;
mud->cb_puback_ref = LUA_NOREF;
@ -767,6 +996,16 @@ static int mqtt_socket_client( lua_State* L )
clean_session = 1;
}
if(lua_isnumber( L, stack ))
{
max_message_length = luaL_checkinteger( L, stack);
stack++;
}
if(max_message_length == 0) {
max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
}
// TODO: check the zalloc result.
mud->connect_info.client_id = (uint8_t *)c_zalloc(idl+1);
mud->connect_info.username = (uint8_t *)c_zalloc(unl + 1);
@ -784,7 +1023,7 @@ static int mqtt_socket_client( lua_State* L )
c_free(mud->connect_info.password);
mud->connect_info.password = NULL;
}
return luaL_error(L, "not enough memory");
return luaL_error(L, "not enough memory");
}
c_memcpy(mud->connect_info.client_id, clientId, idl);
@ -800,11 +1039,15 @@ static int mqtt_socket_client( lua_State* L )
mud->connect_info.will_qos = 0;
mud->connect_info.will_retain = 0;
mud->connect_info.keepalive = keepalive;
mud->connect_info.max_message_length = max_message_length;
mud->mqtt_state.pending_msg_q = NULL;
mud->mqtt_state.auto_reconnect = RECONNECT_OFF;
mud->mqtt_state.port = 1883;
mud->mqtt_state.connect_info = &mud->connect_info;
mud->mqtt_state.recv_buffer = NULL;
mud->mqtt_state.recv_buffer_size = 0;
mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL;
NODE_DBG("leave mqtt_socket_client.\n");
return 1;
@ -852,6 +1095,13 @@ static int mqtt_delete( lua_State* L )
}
// ----
//--------- alloc-ed in mqtt_socket_received()
if(mud->mqtt_state.recv_buffer) {
c_free(mud->mqtt_state.recv_buffer);
mud->mqtt_state.recv_buffer = NULL;
}
// ----
//--------- alloc-ed in mqtt_socket_client()
if(mud->connect_info.client_id){
c_free(mud->connect_info.client_id);
@ -876,6 +1126,8 @@ static int mqtt_delete( lua_State* L )
mud->cb_disconnect_ref = LUA_NOREF;
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_message_ref);
mud->cb_message_ref = LUA_NOREF;
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_overflow_ref);
mud->cb_overflow_ref = LUA_NOREF;
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_suback_ref);
mud->cb_suback_ref = LUA_NOREF;
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_unsuback_ref);
@ -918,7 +1170,7 @@ static sint8 socket_connect(struct espconn *pesp_conn)
os_timer_arm(&mud->mqttTimer, 1000, 1);
NODE_DBG("leave socket_connect, heap = %u.\n", system_get_free_heap_size());
NODE_DBG("leave socket_connect\n");
return espconn_status;
}
@ -1225,6 +1477,9 @@ static int mqtt_socket_on( lua_State* L )
}else if( sl == 7 && c_strcmp(method, "message") == 0){
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_message_ref);
mud->cb_message_ref = luaL_ref(L, LUA_REGISTRYINDEX);
}else if( sl == 8 && c_strcmp(method, "overflow") == 0){
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_overflow_ref);
mud->cb_overflow_ref = luaL_ref(L, LUA_REGISTRYINDEX);
}else{
lua_pop(L, 1);
return luaL_error( L, "method not supported" );

View File

@ -126,12 +126,16 @@ void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buff
connection->buffer_length = buffer_length;
}
int mqtt_get_total_length(uint8_t* buffer, uint16_t length)
// Returns total length of message, or -1 if not enough bytes are available
int32_t mqtt_get_total_length(uint8_t* buffer, uint16_t buffer_length)
{
int i;
int totlen = 0;
for(i = 1; i < length; ++i)
if(buffer_length == 1)
return -1;
for(i = 1; i < buffer_length; ++i)
{
totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
if((buffer[i] & 0x80) == 0)
@ -139,19 +143,23 @@ int mqtt_get_total_length(uint8_t* buffer, uint16_t length)
++i;
break;
}
if(i == buffer_length)
return -1;
}
totlen += i;
return totlen;
}
const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length)
const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* buffer_length)
{
int i;
int totlen = 0;
int topiclen;
for(i = 1; i < *length; ++i)
for(i = 1; i < *buffer_length; ++i)
{
totlen += (buffer[i] & 0x7f) << (7 * (i -1));
if((buffer[i] & 0x80) == 0)
@ -162,25 +170,25 @@ const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length)
}
totlen += i;
if(i + 2 > *length)
if(i + 2 > *buffer_length)
return NULL;
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
if(i + topiclen > *length)
if(i + topiclen > *buffer_length)
return NULL;
*length = topiclen;
*buffer_length = topiclen;
return (const char*)(buffer + i);
}
const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length)
const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* buffer_length)
{
int i;
int totlen = 0;
int topiclen;
for(i = 1; i < *length; ++i)
for(i = 1; i < *buffer_length; ++i)
{
totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
if((buffer[i] & 0x80) == 0)
@ -191,20 +199,20 @@ const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length)
}
totlen += i;
if(i + 2 > *length)
if(i + 2 > *buffer_length)
return NULL;
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
if(i + topiclen > *length){
*length = 0;
if(i + topiclen > *buffer_length){
*buffer_length = 0;
return NULL;
}
i += topiclen;
if(mqtt_get_qos(buffer) > 0)
{
if(i + 2 > *length)
if(i + 2 > *buffer_length)
return NULL;
i += 2;
}
@ -212,16 +220,16 @@ const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length)
if(totlen < i)
return NULL;
if(totlen <= *length)
*length = totlen - i;
if(totlen <= *buffer_length)
*buffer_length = totlen - i;
else
*length = *length - i;
*buffer_length = *buffer_length - i;
return (const char*)(buffer + i);
}
uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length)
uint16_t mqtt_get_id(uint8_t* buffer, uint16_t buffer_length)
{
if(length < 1)
if(buffer_length < 1)
return 0;
switch(mqtt_get_type(buffer))
@ -234,7 +242,7 @@ uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length)
if(mqtt_get_qos(buffer) <= 0)
return 0;
for(i = 1; i < length; ++i)
for(i = 1; i < buffer_length; ++i)
{
if((buffer[i] & 0x80) == 0)
{
@ -243,16 +251,16 @@ uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length)
}
}
if(i + 2 > length)
if(i + 2 > buffer_length)
return 0;
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
if(i + topiclen > length)
if(i + topiclen > buffer_length)
return 0;
i += topiclen;
if(i + 2 > length)
if(i + 2 > buffer_length)
return 0;
return (buffer[i] << 8) | buffer[i + 1];
@ -267,7 +275,7 @@ uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length)
{
// This requires the remaining length to be encoded in 1 byte,
// which it should be.
if(length >= 4 && (buffer[1] & 0x80) == 0)
if(buffer_length >= 4 && (buffer[1] & 0x80) == 0)
return (buffer[2] << 8) | buffer[3];
else
return 0;

View File

@ -108,21 +108,22 @@ typedef struct mqtt_connect_info
int will_qos;
int will_retain;
int clean_session;
uint16_t max_message_length;
} mqtt_connect_info_t;
static inline int mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> 4; }
static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; }
static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; }
static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }
static inline int mqtt_get_connect_ret_code(uint8_t* buffer) { return (buffer[3]); }
static inline uint8_t mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> 4; }
static inline uint8_t mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; }
static inline uint8_t mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; }
static inline uint8_t mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }
static inline uint8_t mqtt_get_connect_ret_code(uint8_t* buffer) { return (buffer[3]); }
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
int mqtt_get_total_length(uint8_t* buffer, uint16_t length);
const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length);
const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length);
uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length);
int32_t mqtt_get_total_length(uint8_t* buffer, uint16_t buffer_length);
const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* buffer_length);
const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* buffer_length);
uint16_t mqtt_get_id(uint8_t* buffer, uint16_t buffer_length);
mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info);
mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id);

View File

@ -11,7 +11,7 @@ The client adheres to version 3.1.1 of the [MQTT](https://en.wikipedia.org/wiki/
Creates a MQTT client.
#### Syntax
`mqtt.Client(clientid, keepalive[, username, password, cleansession])`
`mqtt.Client(clientid, keepalive[, username, password, cleansession, max_message_length])`
#### Parameters
- `clientid` client ID
@ -19,10 +19,38 @@ Creates a MQTT client.
- `username` user name
- `password` user password
- `cleansession` 0/1 for `false`/`true`. Default is 1 (`true`).
- `max_message_length`, how large messages to accept. Default is 1024.
#### Returns
MQTT client
#### Notes
According to MQTT specification the max PUBLISH length is 256Mb. This is too large for NodeMCU to realistically handle. To avoid
an out-of-memory situation, there is a limit on how big messages to accept. This is controlled by the `max_message_length` parameter.
In practice, this only affects incoming PUBLISH messages since all regular control packets are small.
The default 1024 was chosen as this was the implicit limit in NodeMCU 2.2.1 and older (where this was not handled at all).
Note that "message length" refers to the full MQTT message size, including fixed & variable headers, topic name, packet ID (if applicable),
and payload. For exact details, please see [the MQTT specification](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037).
Any message *larger* than `max_message_length` will be (partially) delivered to the `overflow` callback, if defined. The rest
of the message will be discarded. Any subsequent messages should be handled as expected.
Discarded messages will still be ACK'ed if QoS level 1 or 2 was requested, even if the application stack cannot handle them.
Heap memory will be used to buffer any message which spans more than a single TCP packet. A single allocation for the full
message will be performed when the message header is first seen, to avoid heap fragmentation.
If allocation fails, the MQTT session will be disconnected.
Naturally, messages larger than `max_message_length` will not be stored.
Note that heap allocation may occur even if the individual messages are not larger than the configured max! For example,
the broker may send multiple smaller messages in quick succession, which could go into the same TCP packet. If the last message
in the TCP packet did not fit fully, a heap buffer will be allocated to hold the incomplete message while waiting for the next TCP packet.
The typical maximum size for a message to fit into a single TCP packet is 1460 bytes, but this depends on the network's MTU
configuration, any packet fragmentation, and as described above, multiple messages in the same TCP packet.
#### Example
```lua
-- init mqtt client without logins, keepalive timer 120s
@ -47,6 +75,11 @@ m:on("message", function(client, topic, data)
end
end)
-- on publish overflow receive event
m:on("overflow", function(client, topic, data)
print(topic .. " partial overflowed message: " .. data )
end)
-- for TLS: m:connect("192.168.11.118", secure-port, 1)
m:connect("192.168.11.118", 1883, 0, function(client)
print("connected")
@ -180,7 +213,7 @@ Registers a callback function for an event.
`mqtt:on(event, function(client[, topic[, message]]))`
#### Parameters
- `event` can be "connect", "message" or "offline"
- `event` can be "connect", "message", "offline" or "overflow"
- `function(client[, topic[, message]])` callback function. The first parameter is the client. If event is "message", the 2nd and 3rd param are received topic and message (strings).
#### Returns