Send any queued packets when possible

This commit is contained in:
philip 2016-02-01 22:29:32 -05:00
parent fcb14a33b3
commit c9cf02ba31
1 changed files with 28 additions and 10 deletions

View File

@ -431,25 +431,43 @@ static void mqtt_socket_sent(void *arg)
return; return;
} }
NODE_DBG("sent1, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("sent1, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
uint8_t try_send = 1;
// qos = 0, publish and forgot. // qos = 0, publish and forgot.
msg_queue_t *node = msg_peek(&(mud->mqtt_state.pending_msg_q)); msg_queue_t *node = msg_peek(&(mud->mqtt_state.pending_msg_q));
if(node && node->msg_type == MQTT_MSG_TYPE_PUBLISH && node->publish_qos == 0) { if(node && node->msg_type == MQTT_MSG_TYPE_PUBLISH && node->publish_qos == 0) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
if(mud->cb_puback_ref == LUA_NOREF) if(mud->cb_puback_ref != LUA_NOREF && mud->self_ref != LUA_NOREF) {
return; lua_State *L = lua_getstate();
if(mud->self_ref == LUA_NOREF) lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
return; lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
if(mud->L == NULL) lua_call(L, 1, 0);
return; }
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_call(mud->L, 1, 0);
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK) { } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) { } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
} else if(node && node->msg_type == MQTT_MSG_TYPE_PINGREQ) { } else if(node && node->msg_type == MQTT_MSG_TYPE_PINGREQ) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
} else {
try_send = 0;
}
if (try_send) {
msg_queue_t *node = msg_peek(&(mud->mqtt_state.pending_msg_q));
if (node) {
mud->event_timeout = MQTT_SEND_TIMEOUT;
NODE_DBG("Sent: %d\n", node->msg.length);
#ifdef CLIENT_SSL_ENABLE
if( mud->secure )
{
(void) espconn_secure_send( mud->pesp_conn, node->msg.data, node->msg.length );
}
else
#endif
{
(void) espconn_send( mud->pesp_conn, node->msg.data, node->msg.length );
}
mud->keep_alive_tick = 0;
}
} }
NODE_DBG("sent2, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("sent2, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
NODE_DBG("leave mqtt_socket_sent.\n"); NODE_DBG("leave mqtt_socket_sent.\n");
@ -1322,7 +1340,7 @@ static int mqtt_socket_publish( lua_State* L )
msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos ); msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos );
sint8 espconn_status = ESPCONN_IF; sint8 espconn_status = ESPCONN_OK;
if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){ if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){
mud->event_timeout = MQTT_SEND_TIMEOUT; mud->event_timeout = MQTT_SEND_TIMEOUT;