From c9cf02ba3143025da8cb7d0345a7de8352eb72be Mon Sep 17 00:00:00 2001 From: philip Date: Mon, 1 Feb 2016 22:29:32 -0500 Subject: [PATCH] Send any queued packets when possible --- app/modules/mqtt.c | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c index 2b6a35b3..234dae96 100644 --- a/app/modules/mqtt.c +++ b/app/modules/mqtt.c @@ -431,25 +431,43 @@ static void mqtt_socket_sent(void *arg) return; } NODE_DBG("sent1, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); + uint8_t try_send = 1; // qos = 0, publish and forgot. msg_queue_t *node = msg_peek(&(mud->mqtt_state.pending_msg_q)); if(node && node->msg_type == MQTT_MSG_TYPE_PUBLISH && node->publish_qos == 0) { msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); - if(mud->cb_puback_ref == LUA_NOREF) - return; - if(mud->self_ref == LUA_NOREF) - return; - if(mud->L == NULL) - return; - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_puback_ref); - lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua - lua_call(mud->L, 1, 0); + if(mud->cb_puback_ref != LUA_NOREF && mud->self_ref != LUA_NOREF) { + lua_State *L = lua_getstate(); + lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_puback_ref); + lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua + lua_call(L, 1, 0); + } } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK) { msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) { msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); } else if(node && node->msg_type == MQTT_MSG_TYPE_PINGREQ) { msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); + } else { + try_send = 0; + } + if (try_send) { + msg_queue_t *node = msg_peek(&(mud->mqtt_state.pending_msg_q)); + if (node) { + mud->event_timeout = MQTT_SEND_TIMEOUT; + NODE_DBG("Sent: %d\n", node->msg.length); +#ifdef CLIENT_SSL_ENABLE + if( mud->secure ) + { + (void) espconn_secure_send( mud->pesp_conn, node->msg.data, node->msg.length ); + } + else +#endif + { + (void) espconn_send( mud->pesp_conn, node->msg.data, node->msg.length ); + } + mud->keep_alive_tick = 0; + } } NODE_DBG("sent2, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("leave mqtt_socket_sent.\n"); @@ -1322,7 +1340,7 @@ static int mqtt_socket_publish( lua_State* L ) msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos ); - sint8 espconn_status = ESPCONN_IF; + sint8 espconn_status = ESPCONN_OK; if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){ mud->event_timeout = MQTT_SEND_TIMEOUT;