diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index 8fdedabc..7ab49146 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -46,17 +46,32 @@ typedef struct mqtt_event_data_t #define RECONNECT_POSSIBLE 1 #define RECONNECT_ON 2 +typedef enum { + MQTT_RECV_NORMAL, + MQTT_RECV_BUFFERING_SHORT, + MQTT_RECV_BUFFERING, + MQTT_RECV_SKIPPING, +} tReceiveState; + typedef struct mqtt_state_t { uint16_t port; uint8_t auto_reconnect; // 0 is not auto_reconnect. 1 is auto reconnect, but never connected. 2 is auto reconnect, but once connected mqtt_connect_info_t* connect_info; - uint16_t message_length; - uint16_t message_length_read; mqtt_connection_t mqtt_connection; msg_queue_t* pending_msg_q; + + uint8_t * recv_buffer; // heap buffer for multi-packet rx + uint8_t * recv_buffer_wp; // write pointer in multi-packet rx + union { + uint16_t recv_buffer_size; // size of recv_buffer + uint32_t recv_buffer_skip; // number of bytes left to skip, in skipping state + }; + tReceiveState recv_buffer_state; + } mqtt_state_t; + typedef struct lmqtt_userdata { struct espconn *pesp_conn; @@ -65,6 +80,7 @@ typedef struct lmqtt_userdata int cb_connect_fail_ref; int cb_disconnect_ref; int cb_message_ref; + int cb_overflow_ref; int cb_suback_ref; int cb_unsuback_ref; int cb_puback_ref; @@ -81,6 +97,9 @@ typedef struct lmqtt_userdata tConnState connState; }lmqtt_userdata; +// How large MQTT messages to accept by default +#define DEFAULT_MAX_MESSAGE_LENGTH 1024 + static sint8 socket_connect(struct espconn *pesp_conn); static void mqtt_socket_reconnected(void *arg, sint8_t err); static void mqtt_socket_connected(void *arg); @@ -110,6 +129,13 @@ static void mqtt_socket_disconnected(void *arg) // tcp only } } + if(mud->mqtt_state.recv_buffer) { + c_free(mud->mqtt_state.recv_buffer); + mud->mqtt_state.recv_buffer = NULL; + } + mud->mqtt_state.recv_buffer_size = 0; + mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL; + if(mud->mqtt_state.auto_reconnect == RECONNECT_ON) { mud->pesp_conn->reverse = mud; mud->pesp_conn->type = ESPCONN_TCP; @@ -178,9 +204,9 @@ static void mqtt_socket_reconnected(void *arg, sint8_t err) NODE_DBG("leave mqtt_socket_reconnected.\n"); } -static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length) +static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, uint16_t length, uint8_t is_overflow) { - NODE_DBG("enter deliver_publish.\n"); + NODE_DBG("enter deliver_publish (len=%d, overflow=%d).\n", length, is_overflow); if(mud == NULL) return; mqtt_event_data_t event_data; @@ -191,13 +217,15 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length) event_data.data_length = length; event_data.data = mqtt_get_publish_data(message, &event_data.data_length); - if(mud->cb_message_ref == LUA_NOREF) + int cb_ref = !is_overflow ? mud->cb_message_ref : mud->cb_overflow_ref; + + if(cb_ref == LUA_NOREF) return; if(mud->self_ref == LUA_NOREF) return; lua_State *L = lua_getstate(); if(event_data.topic && (event_data.topic_length > 0)){ - lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_message_ref); + lua_rawgeti(L, LUA_REGISTRYINDEX, cb_ref); lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua lua_pushlstring(L, event_data.topic, event_data.topic_length); } else { @@ -265,14 +293,15 @@ static sint8 mqtt_send_if_possible(struct espconn *pesp_conn) static void mqtt_socket_received(void *arg, char *pdata, unsigned short len) { - NODE_DBG("enter mqtt_socket_received.\n"); + NODE_DBG("enter mqtt_socket_received (rxlen=%u).\n", len); uint8_t msg_type; uint8_t msg_qos; uint16_t msg_id; - int length = (int)len; - // uint8_t in_buffer[MQTT_BUF_SIZE]; uint8_t *in_buffer = (uint8_t *)pdata; + uint16_t in_buffer_length = len; + uint8_t *continuation_buffer = NULL; + uint8_t *temp_pdata = NULL; struct espconn *pesp_conn = arg; if(pesp_conn == NULL) @@ -281,11 +310,109 @@ static void mqtt_socket_received(void *arg, char *pdata, unsigned short len) if(mud == NULL) return; -READPACKET: - if(length > MQTT_BUF_SIZE || length <= 0) - return; + switch(mud->mqtt_state.recv_buffer_state) { + case MQTT_RECV_NORMAL: + // No previous buffer. + break; + case MQTT_RECV_BUFFERING_SHORT: + // Last buffer had so few byte that we could not determine message length. + // Store in a local heap buffer and operate on this, as if was the regular pdata buffer. + // Avoids having to repeat message size/overflow logic. + temp_pdata = c_zalloc(mud->mqtt_state.recv_buffer_size + len); + if(temp_pdata == NULL) { + NODE_DBG("MQTT[buffering-short]: Failed to allocate %u bytes, disconnecting...\n", mud->mqtt_state.recv_buffer_size + len); +#ifdef CLIENT_SSL_ENABLE + if (mud->secure) { + espconn_secure_disconnect(pesp_conn); + } else +#endif + { + espconn_disconnect(pesp_conn); + } + return; + } - // c_memcpy(in_buffer, pdata, length); + NODE_DBG("MQTT[buffering-short]: Continuing with %u + %u bytes\n", mud->mqtt_state.recv_buffer_size, len); + memcpy(temp_pdata, mud->mqtt_state.recv_buffer, mud->mqtt_state.recv_buffer_size); + memcpy(temp_pdata + mud->mqtt_state.recv_buffer_size, pdata, len); + c_free(mud->mqtt_state.recv_buffer); + mud->mqtt_state.recv_buffer = NULL; + mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL; + + in_buffer = temp_pdata; + in_buffer_length = mud->mqtt_state.recv_buffer_size + len; + break; + + case MQTT_RECV_BUFFERING: { + // safe cast: we never allow longer buffer. + uint16_t current_length = (uint16_t) (mud->mqtt_state.recv_buffer_wp - mud->mqtt_state.recv_buffer); + + NODE_DBG("MQTT[buffering]: appending %u bytes to previous recv buffer (%u out of wanted %u)\n", + in_buffer_length, + current_length, + mud->mqtt_state.recv_buffer_size); + + // Copy from rx buffer to heap buffer. Smallest of [remainder of pending message] and [all of buffer] + uint16_t copy_length = LWIP_MIN(mud->mqtt_state.recv_buffer_size - current_length, in_buffer_length); + memcpy(mud->mqtt_state.recv_buffer_wp, pdata, copy_length); + mud->mqtt_state.recv_buffer_wp += copy_length; + + in_buffer_length = (uint16_t) (mud->mqtt_state.recv_buffer_wp - mud->mqtt_state.recv_buffer); + if (in_buffer_length < mud->mqtt_state.recv_buffer_size) { + NODE_DBG("MQTT[buffering]: need %u more bytes, waiting for next rx.\n", + mud->mqtt_state.recv_buffer_size - in_buffer_length + ); + goto RX_PACKET_FINISHED; + } + + NODE_DBG("MQTT[buffering]: Full message received (%u). remainding bytes=%u\n", + mud->mqtt_state.recv_buffer_size, + len - copy_length); + + // Point continuation_buffer to any additional data in pdata. + // May become 0 bytes, but used to trigger free! + continuation_buffer = pdata + copy_length; + len -= copy_length; // borrow len instead of having another variable.. + + in_buffer = mud->mqtt_state.recv_buffer; + // in_buffer_length was set above + mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL; + + break; + } + case MQTT_RECV_SKIPPING: + // Last rx had a message which was too large to process, skip it. + if(mud->mqtt_state.recv_buffer_skip > in_buffer_length) { + NODE_DBG("MQTT[skipping]: skip=%u. Skipping full RX buffer (%u).\n", + mud->mqtt_state.recv_buffer_skip, + in_buffer_length + ); + mud->mqtt_state.recv_buffer_skip -= in_buffer_length; + goto RX_PACKET_FINISHED; + } + + NODE_DBG("MQTT[skipping]: skip=%u. Skipping partial RX buffer, continuing at %u\n", + mud->mqtt_state.recv_buffer_skip, + in_buffer_length + ); + + in_buffer += mud->mqtt_state.recv_buffer_skip; + in_buffer_length -= mud->mqtt_state.recv_buffer_skip; + + mud->mqtt_state.recv_buffer_skip = 0; + mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL; + break; + } + +READPACKET: + if(in_buffer_length <= 0) + goto RX_PACKET_FINISHED; + + // MQTT publish message can in theory be 256Mb, while we do not support it we need to be + // able to do math on it. + int32_t message_length; + + // temp buffer for control messages uint8_t temp_buffer[MQTT_BUF_SIZE]; mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); mqtt_message_t *temp_msg = NULL; @@ -355,19 +482,107 @@ READPACKET: break; case MQTT_DATA: - mud->mqtt_state.message_length_read = length; - mud->mqtt_state.message_length = mqtt_get_total_length(in_buffer, mud->mqtt_state.message_length_read); + message_length = mqtt_get_total_length(in_buffer, in_buffer_length); msg_type = mqtt_get_type(in_buffer); msg_qos = mqtt_get_qos(in_buffer); - msg_id = mqtt_get_id(in_buffer, mud->mqtt_state.message_length); + msg_id = mqtt_get_id(in_buffer, in_buffer_length); + + NODE_DBG("MQTT_DATA: msg length: %u, buffer length: %u\r\n", + message_length, + in_buffer_length); + + if (message_length > mud->connect_info.max_message_length) { + // The pending message length is larger than we was configured to allow + if(msg_qos > 0 && msg_id == 0) { + NODE_DBG("MQTT: msg too long, but not enough data to get msg_id: total=%u, deliver=%u\r\n", message_length, in_buffer_length); + // qos requested, but too short buffer to get a packet ID. + // Trigger the "short buffer" mode + message_length = -1; + // Drop through to partial message handling below. + } else { + NODE_DBG("MQTT: msg too long: total=%u, deliver=%u\r\n", message_length, in_buffer_length); + if (msg_type == MQTT_MSG_TYPE_PUBLISH) { + // In practice we should never get any other types.. + deliver_publish(mud, in_buffer, in_buffer_length, 1); + + // If qos specified, we should ACK it. + // In theory it might be wrong to ack it before we received all TCP packets, but this avoids + // buffering and special code to handle this corner-case. Server will most likely have + // written all to OS socket anyway, and not be aware that we "should" not have received it all yet. + if(msg_qos == 1){ + temp_msg = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id); + msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, + msg_id, MQTT_MSG_TYPE_PUBACK, (int)mqtt_get_qos(temp_msg->data) ); + } + else if(msg_qos == 2){ + temp_msg = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id); + msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, + msg_id, MQTT_MSG_TYPE_PUBREC, (int)mqtt_get_qos(temp_msg->data) ); + } + if(msg_qos == 1 || msg_qos == 2){ + NODE_DBG("MQTT: Queue response QoS: %d\r\n", msg_qos); + } + } + + if (message_length > in_buffer_length) { + // Ignore bytes in subsequent packet(s) too. + NODE_DBG("MQTT: skipping into next rx\n"); + mud->mqtt_state.recv_buffer_state = MQTT_RECV_SKIPPING; + mud->mqtt_state.recv_buffer_skip = (uint32_t) message_length - in_buffer_length; + break; + } else { + NODE_DBG("MQTT: Skipping message\n"); + mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL; + goto RX_MESSAGE_PROCESSED; + } + } + } + + if (message_length == -1 || message_length > in_buffer_length) { + // Partial message in buffer, need to store on heap until next RX. Allocate size for full message directly, + // instead of potential reallocs, to avoid fragmentation. + // If message_length is indicated as -1, we do not have enough data to determine the length properly. + // Just put what we have on heap, and place in state BUFFERING_SHORT. + NODE_DBG("MQTT: Partial message received (%u of %d). Buffering\r\n", + in_buffer_length, + message_length); + + // although message_length is 32bit, it should never go above 16bit since + // max_message_length is 16bit. + uint16_t alloc_size = message_length > 0 ? (uint16_t)message_length : in_buffer_length; + + mud->mqtt_state.recv_buffer = c_zalloc(alloc_size); + if (mud->mqtt_state.recv_buffer == NULL) { + NODE_DBG("MQTT: Failed to allocate %u bytes, disconnecting...\n", alloc_size); +#ifdef CLIENT_SSL_ENABLE + if (mud->secure) { + espconn_secure_disconnect(pesp_conn); + } else +#endif + { + espconn_disconnect(pesp_conn); + } + return; + } + + memcpy(mud->mqtt_state.recv_buffer, in_buffer, in_buffer_length); + mud->mqtt_state.recv_buffer_wp = mud->mqtt_state.recv_buffer + in_buffer_length; + mud->mqtt_state.recv_buffer_state = message_length > 0 ? MQTT_RECV_BUFFERING : MQTT_RECV_BUFFERING_SHORT; + mud->mqtt_state.recv_buffer_size = alloc_size; + + NODE_DBG("MQTT: Wait for next recv\n"); + break; + } msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q)); + NODE_DBG("MQTT_DATA: type: %d, qos: %d, msg_id: %d, pending_id: %d, msg length: %u, buffer length: %u\r\n", + msg_type, + msg_qos, + msg_id, + (pending_msg)?pending_msg->msg_id:0, + message_length, + in_buffer_length); - NODE_DBG("MQTT_DATA: type: %d, qos: %d, msg_id: %d, pending_id: %d\r\n", - msg_type, - msg_qos, - msg_id, - (pending_msg)?pending_msg->msg_id:0); switch(msg_type) { case MQTT_MSG_TYPE_SUBACK: @@ -411,7 +626,7 @@ READPACKET: if(msg_qos == 1 || msg_qos == 2){ NODE_DBG("MQTT: Queue response QoS: %d\r\n", msg_qos); } - deliver_publish(mud, in_buffer, mud->mqtt_state.message_length); + deliver_publish(mud, in_buffer, (uint16_t)message_length, 0); break; case MQTT_MSG_TYPE_PUBACK: if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){ @@ -472,28 +687,40 @@ READPACKET: NODE_DBG("MQTT: PINGRESP received\r\n"); break; } - // NOTE: this is done down here and not in the switch case above - // because the PSOCK_READBUF_LEN() won't work inside a switch - // statement due to the way protothreads resume. - if(msg_type == MQTT_MSG_TYPE_PUBLISH) - { - length = mud->mqtt_state.message_length_read; +RX_MESSAGE_PROCESSED: + if(continuation_buffer != NULL) { + NODE_DBG("MQTT[buffering]: buffered message finished. Continuing with rest of rx buffer (%u)\n", + len); + c_free(mud->mqtt_state.recv_buffer); + mud->mqtt_state.recv_buffer = NULL; - if(mud->mqtt_state.message_length < mud->mqtt_state.message_length_read) - { - length -= mud->mqtt_state.message_length; - in_buffer += mud->mqtt_state.message_length; - - NODE_DBG("Get another published message\r\n"); - goto READPACKET; - } + in_buffer = continuation_buffer; + in_buffer_length = len; + continuation_buffer = NULL; + }else{ + // Message have been fully processed (or ignored). Move pointer ahead + // and continue with next message, if any. + in_buffer_length -= message_length; + in_buffer += message_length; } + + if(in_buffer_length > 0) + { + NODE_DBG("Get another published message\r\n"); + goto READPACKET; + } + break; } +RX_PACKET_FINISHED: + if(temp_pdata != NULL) { + c_free(temp_pdata); + } + mqtt_send_if_possible(pesp_conn); - NODE_DBG("leave mqtt_socket_received.\n"); + NODE_DBG("leave mqtt_socket_received\n"); return; } @@ -580,7 +807,7 @@ static void mqtt_socket_connected(void *arg) mud->keep_alive_tick = 0; mud->connState = MQTT_CONNECT_SENDING; - NODE_DBG("leave mqtt_socket_connected.\n"); + NODE_DBG("leave mqtt_socket_connectet, heap = %u.\n", system_get_free_heap_size()); return; } @@ -686,7 +913,7 @@ void mqtt_socket_timer(void *arg) NODE_DBG("leave mqtt_socket_timer.\n"); } -// Lua: mqtt.Client(clientid, keepalive, user, pass, clean_session) +// Lua: mqtt.Client(clientid, keepalive, user, pass, clean_session, max_message_length) static int mqtt_socket_client( lua_State* L ) { NODE_DBG("enter mqtt_socket_client.\n"); @@ -703,6 +930,7 @@ static int mqtt_socket_client( lua_State* L ) int keepalive = 0; int stack = 1; int clean_session = 1; + int max_message_length = 0; int top = lua_gettop(L); // create a object @@ -715,6 +943,7 @@ static int mqtt_socket_client( lua_State* L ) mud->cb_disconnect_ref = LUA_NOREF; mud->cb_message_ref = LUA_NOREF; + mud->cb_overflow_ref = LUA_NOREF; mud->cb_suback_ref = LUA_NOREF; mud->cb_unsuback_ref = LUA_NOREF; mud->cb_puback_ref = LUA_NOREF; @@ -767,6 +996,16 @@ static int mqtt_socket_client( lua_State* L ) clean_session = 1; } + if(lua_isnumber( L, stack )) + { + max_message_length = luaL_checkinteger( L, stack); + stack++; + } + + if(max_message_length == 0) { + max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; + } + // TODO: check the zalloc result. mud->connect_info.client_id = (uint8_t *)c_zalloc(idl+1); mud->connect_info.username = (uint8_t *)c_zalloc(unl + 1); @@ -784,7 +1023,7 @@ static int mqtt_socket_client( lua_State* L ) c_free(mud->connect_info.password); mud->connect_info.password = NULL; } - return luaL_error(L, "not enough memory"); + return luaL_error(L, "not enough memory"); } c_memcpy(mud->connect_info.client_id, clientId, idl); @@ -800,11 +1039,15 @@ static int mqtt_socket_client( lua_State* L ) mud->connect_info.will_qos = 0; mud->connect_info.will_retain = 0; mud->connect_info.keepalive = keepalive; + mud->connect_info.max_message_length = max_message_length; mud->mqtt_state.pending_msg_q = NULL; mud->mqtt_state.auto_reconnect = RECONNECT_OFF; mud->mqtt_state.port = 1883; mud->mqtt_state.connect_info = &mud->connect_info; + mud->mqtt_state.recv_buffer = NULL; + mud->mqtt_state.recv_buffer_size = 0; + mud->mqtt_state.recv_buffer_state = MQTT_RECV_NORMAL; NODE_DBG("leave mqtt_socket_client.\n"); return 1; @@ -852,6 +1095,13 @@ static int mqtt_delete( lua_State* L ) } // ---- + //--------- alloc-ed in mqtt_socket_received() + if(mud->mqtt_state.recv_buffer) { + c_free(mud->mqtt_state.recv_buffer); + mud->mqtt_state.recv_buffer = NULL; + } + // ---- + //--------- alloc-ed in mqtt_socket_client() if(mud->connect_info.client_id){ c_free(mud->connect_info.client_id); @@ -876,6 +1126,8 @@ static int mqtt_delete( lua_State* L ) mud->cb_disconnect_ref = LUA_NOREF; luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_message_ref); mud->cb_message_ref = LUA_NOREF; + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_overflow_ref); + mud->cb_overflow_ref = LUA_NOREF; luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_suback_ref); mud->cb_suback_ref = LUA_NOREF; luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_unsuback_ref); @@ -918,7 +1170,7 @@ static sint8 socket_connect(struct espconn *pesp_conn) os_timer_arm(&mud->mqttTimer, 1000, 1); - NODE_DBG("leave socket_connect, heap = %u.\n", system_get_free_heap_size()); + NODE_DBG("leave socket_connect\n"); return espconn_status; } @@ -1225,6 +1477,9 @@ static int mqtt_socket_on( lua_State* L ) }else if( sl == 7 && c_strcmp(method, "message") == 0){ luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_message_ref); mud->cb_message_ref = luaL_ref(L, LUA_REGISTRYINDEX); + }else if( sl == 8 && c_strcmp(method, "overflow") == 0){ + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_overflow_ref); + mud->cb_overflow_ref = luaL_ref(L, LUA_REGISTRYINDEX); }else{ lua_pop(L, 1); return luaL_error( L, "method not supported" ); diff --git a/app/mqtt/mqtt_msg.c b/app/mqtt/mqtt_msg.c index 0f206fcc..135a5139 100644 --- a/app/mqtt/mqtt_msg.c +++ b/app/mqtt/mqtt_msg.c @@ -126,12 +126,16 @@ void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buff connection->buffer_length = buffer_length; } -int mqtt_get_total_length(uint8_t* buffer, uint16_t length) +// Returns total length of message, or -1 if not enough bytes are available +int32_t mqtt_get_total_length(uint8_t* buffer, uint16_t buffer_length) { int i; int totlen = 0; - for(i = 1; i < length; ++i) + if(buffer_length == 1) + return -1; + + for(i = 1; i < buffer_length; ++i) { totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); if((buffer[i] & 0x80) == 0) @@ -139,19 +143,23 @@ int mqtt_get_total_length(uint8_t* buffer, uint16_t length) ++i; break; } + + if(i == buffer_length) + return -1; } + totlen += i; return totlen; } -const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length) +const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* buffer_length) { int i; int totlen = 0; int topiclen; - for(i = 1; i < *length; ++i) + for(i = 1; i < *buffer_length; ++i) { totlen += (buffer[i] & 0x7f) << (7 * (i -1)); if((buffer[i] & 0x80) == 0) @@ -162,25 +170,25 @@ const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length) } totlen += i; - if(i + 2 > *length) + if(i + 2 > *buffer_length) return NULL; topiclen = buffer[i++] << 8; topiclen |= buffer[i++]; - if(i + topiclen > *length) + if(i + topiclen > *buffer_length) return NULL; - *length = topiclen; + *buffer_length = topiclen; return (const char*)(buffer + i); } -const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length) +const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* buffer_length) { int i; int totlen = 0; int topiclen; - for(i = 1; i < *length; ++i) + for(i = 1; i < *buffer_length; ++i) { totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); if((buffer[i] & 0x80) == 0) @@ -191,20 +199,20 @@ const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length) } totlen += i; - if(i + 2 > *length) + if(i + 2 > *buffer_length) return NULL; topiclen = buffer[i++] << 8; topiclen |= buffer[i++]; - if(i + topiclen > *length){ - *length = 0; + if(i + topiclen > *buffer_length){ + *buffer_length = 0; return NULL; } i += topiclen; if(mqtt_get_qos(buffer) > 0) { - if(i + 2 > *length) + if(i + 2 > *buffer_length) return NULL; i += 2; } @@ -212,16 +220,16 @@ const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length) if(totlen < i) return NULL; - if(totlen <= *length) - *length = totlen - i; + if(totlen <= *buffer_length) + *buffer_length = totlen - i; else - *length = *length - i; + *buffer_length = *buffer_length - i; return (const char*)(buffer + i); } -uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length) +uint16_t mqtt_get_id(uint8_t* buffer, uint16_t buffer_length) { - if(length < 1) + if(buffer_length < 1) return 0; switch(mqtt_get_type(buffer)) @@ -234,7 +242,7 @@ uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length) if(mqtt_get_qos(buffer) <= 0) return 0; - for(i = 1; i < length; ++i) + for(i = 1; i < buffer_length; ++i) { if((buffer[i] & 0x80) == 0) { @@ -243,16 +251,16 @@ uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length) } } - if(i + 2 > length) + if(i + 2 > buffer_length) return 0; topiclen = buffer[i++] << 8; topiclen |= buffer[i++]; - if(i + topiclen > length) + if(i + topiclen > buffer_length) return 0; i += topiclen; - if(i + 2 > length) + if(i + 2 > buffer_length) return 0; return (buffer[i] << 8) | buffer[i + 1]; @@ -267,7 +275,7 @@ uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length) { // This requires the remaining length to be encoded in 1 byte, // which it should be. - if(length >= 4 && (buffer[1] & 0x80) == 0) + if(buffer_length >= 4 && (buffer[1] & 0x80) == 0) return (buffer[2] << 8) | buffer[3]; else return 0; diff --git a/app/mqtt/mqtt_msg.h b/app/mqtt/mqtt_msg.h index 74ea9b81..b6dbe13d 100644 --- a/app/mqtt/mqtt_msg.h +++ b/app/mqtt/mqtt_msg.h @@ -108,21 +108,22 @@ typedef struct mqtt_connect_info int will_qos; int will_retain; int clean_session; + uint16_t max_message_length; } mqtt_connect_info_t; -static inline int mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> 4; } -static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; } -static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; } -static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } -static inline int mqtt_get_connect_ret_code(uint8_t* buffer) { return (buffer[3]); } +static inline uint8_t mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> 4; } +static inline uint8_t mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; } +static inline uint8_t mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; } +static inline uint8_t mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } +static inline uint8_t mqtt_get_connect_ret_code(uint8_t* buffer) { return (buffer[3]); } void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); -int mqtt_get_total_length(uint8_t* buffer, uint16_t length); -const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length); -const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length); -uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length); +int32_t mqtt_get_total_length(uint8_t* buffer, uint16_t buffer_length); +const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* buffer_length); +const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* buffer_length); +uint16_t mqtt_get_id(uint8_t* buffer, uint16_t buffer_length); mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info); mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id); diff --git a/docs/en/modules/mqtt.md b/docs/en/modules/mqtt.md index feb402f3..5c6704a0 100644 --- a/docs/en/modules/mqtt.md +++ b/docs/en/modules/mqtt.md @@ -11,7 +11,7 @@ The client adheres to version 3.1.1 of the [MQTT](https://en.wikipedia.org/wiki/ Creates a MQTT client. #### Syntax -`mqtt.Client(clientid, keepalive[, username, password, cleansession])` +`mqtt.Client(clientid, keepalive[, username, password, cleansession, max_message_length])` #### Parameters - `clientid` client ID @@ -19,10 +19,38 @@ Creates a MQTT client. - `username` user name - `password` user password - `cleansession` 0/1 for `false`/`true`. Default is 1 (`true`). +- `max_message_length`, how large messages to accept. Default is 1024. #### Returns MQTT client +#### Notes + +According to MQTT specification the max PUBLISH length is 256Mb. This is too large for NodeMCU to realistically handle. To avoid +an out-of-memory situation, there is a limit on how big messages to accept. This is controlled by the `max_message_length` parameter. +In practice, this only affects incoming PUBLISH messages since all regular control packets are small. +The default 1024 was chosen as this was the implicit limit in NodeMCU 2.2.1 and older (where this was not handled at all). + +Note that "message length" refers to the full MQTT message size, including fixed & variable headers, topic name, packet ID (if applicable), +and payload. For exact details, please see [the MQTT specification](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037). + +Any message *larger* than `max_message_length` will be (partially) delivered to the `overflow` callback, if defined. The rest +of the message will be discarded. Any subsequent messages should be handled as expected. +Discarded messages will still be ACK'ed if QoS level 1 or 2 was requested, even if the application stack cannot handle them. + +Heap memory will be used to buffer any message which spans more than a single TCP packet. A single allocation for the full +message will be performed when the message header is first seen, to avoid heap fragmentation. +If allocation fails, the MQTT session will be disconnected. +Naturally, messages larger than `max_message_length` will not be stored. + +Note that heap allocation may occur even if the individual messages are not larger than the configured max! For example, +the broker may send multiple smaller messages in quick succession, which could go into the same TCP packet. If the last message +in the TCP packet did not fit fully, a heap buffer will be allocated to hold the incomplete message while waiting for the next TCP packet. + +The typical maximum size for a message to fit into a single TCP packet is 1460 bytes, but this depends on the network's MTU +configuration, any packet fragmentation, and as described above, multiple messages in the same TCP packet. + + #### Example ```lua -- init mqtt client without logins, keepalive timer 120s @@ -47,6 +75,11 @@ m:on("message", function(client, topic, data) end end) +-- on publish overflow receive event +m:on("overflow", function(client, topic, data) + print(topic .. " partial overflowed message: " .. data ) +end) + -- for TLS: m:connect("192.168.11.118", secure-port, 1) m:connect("192.168.11.118", 1883, 0, function(client) print("connected") @@ -180,7 +213,7 @@ Registers a callback function for an event. `mqtt:on(event, function(client[, topic[, message]]))` #### Parameters -- `event` can be "connect", "message" or "offline" +- `event` can be "connect", "message", "offline" or "overflow" - `function(client[, topic[, message]])` callback function. The first parameter is the client. If event is "message", the 2nd and 3rd param are received topic and message (strings). #### Returns