Merge pull request #1002 from pjsg/mqtt
Fix memory exhaustion in mqtt under circumstances from issue #975
This commit is contained in:
commit
2c687313de
|
@ -431,25 +431,43 @@ static void mqtt_socket_sent(void *arg)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
NODE_DBG("sent1, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
|
NODE_DBG("sent1, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
|
||||||
|
uint8_t try_send = 1;
|
||||||
// qos = 0, publish and forgot.
|
// qos = 0, publish and forgot.
|
||||||
msg_queue_t *node = msg_peek(&(mud->mqtt_state.pending_msg_q));
|
msg_queue_t *node = msg_peek(&(mud->mqtt_state.pending_msg_q));
|
||||||
if(node && node->msg_type == MQTT_MSG_TYPE_PUBLISH && node->publish_qos == 0) {
|
if(node && node->msg_type == MQTT_MSG_TYPE_PUBLISH && node->publish_qos == 0) {
|
||||||
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
|
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
|
||||||
if(mud->cb_puback_ref == LUA_NOREF)
|
if(mud->cb_puback_ref != LUA_NOREF && mud->self_ref != LUA_NOREF) {
|
||||||
return;
|
lua_State *L = lua_getstate();
|
||||||
if(mud->self_ref == LUA_NOREF)
|
lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
|
||||||
return;
|
lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
|
||||||
if(mud->L == NULL)
|
lua_call(L, 1, 0);
|
||||||
return;
|
}
|
||||||
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
|
|
||||||
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
|
|
||||||
lua_call(mud->L, 1, 0);
|
|
||||||
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK) {
|
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK) {
|
||||||
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
|
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
|
||||||
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) {
|
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) {
|
||||||
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
|
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
|
||||||
} else if(node && node->msg_type == MQTT_MSG_TYPE_PINGREQ) {
|
} else if(node && node->msg_type == MQTT_MSG_TYPE_PINGREQ) {
|
||||||
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
|
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
|
||||||
|
} else {
|
||||||
|
try_send = 0;
|
||||||
|
}
|
||||||
|
if (try_send) {
|
||||||
|
msg_queue_t *node = msg_peek(&(mud->mqtt_state.pending_msg_q));
|
||||||
|
if (node) {
|
||||||
|
mud->event_timeout = MQTT_SEND_TIMEOUT;
|
||||||
|
NODE_DBG("Sent: %d\n", node->msg.length);
|
||||||
|
#ifdef CLIENT_SSL_ENABLE
|
||||||
|
if( mud->secure )
|
||||||
|
{
|
||||||
|
(void) espconn_secure_send( mud->pesp_conn, node->msg.data, node->msg.length );
|
||||||
|
}
|
||||||
|
else
|
||||||
|
#endif
|
||||||
|
{
|
||||||
|
(void) espconn_send( mud->pesp_conn, node->msg.data, node->msg.length );
|
||||||
|
}
|
||||||
|
mud->keep_alive_tick = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
NODE_DBG("sent2, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
|
NODE_DBG("sent2, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
|
||||||
NODE_DBG("leave mqtt_socket_sent.\n");
|
NODE_DBG("leave mqtt_socket_sent.\n");
|
||||||
|
@ -1323,7 +1341,7 @@ static int mqtt_socket_publish( lua_State* L )
|
||||||
msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
|
msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
|
||||||
msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos );
|
msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos );
|
||||||
|
|
||||||
sint8 espconn_status = ESPCONN_IF;
|
sint8 espconn_status = ESPCONN_OK;
|
||||||
|
|
||||||
if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){
|
if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){
|
||||||
mud->event_timeout = MQTT_SEND_TIMEOUT;
|
mud->event_timeout = MQTT_SEND_TIMEOUT;
|
||||||
|
|
Loading…
Reference in New Issue