Make the MQTT PING functionality work better. (#1557)

Deal with flow control stopped case
This commit is contained in:
Philip Gladstone 2016-11-08 15:00:17 -05:00 committed by Marcel Stör
parent 73773fd8a5
commit b74a9dbdf7
1 changed files with 47 additions and 47 deletions

View File

@ -72,6 +72,7 @@ typedef struct lmqtt_userdata
uint8_t secure;
#endif
bool connected; // indicate socket connected, not mqtt prot connected.
bool keepalive_sent;
ETSTimer mqttTimer;
tConnState connState;
}lmqtt_userdata;
@ -241,12 +242,12 @@ static sint8 mqtt_send_if_possible(struct espconn *pesp_conn)
#ifdef CLIENT_SSL_ENABLE
if( mud->secure )
{
espconn_status = espconn_secure_send( pesp_conn, pending_msg->msg.data, pending_msg->msg.length );
espconn_status = espconn_secure_send( pesp_conn, pending_msg->msg.data, pending_msg->msg.length );
}
else
#endif
{
espconn_status = espconn_send( pesp_conn, pending_msg->msg.data, pending_msg->msg.length );
espconn_status = espconn_send( pesp_conn, pending_msg->msg.data, pending_msg->msg.length );
}
mud->keep_alive_tick = 0;
}
@ -275,7 +276,7 @@ static void mqtt_socket_received(void *arg, char *pdata, unsigned short len)
READPACKET:
if(length > MQTT_BUF_SIZE || length <= 0)
return;
return;
// c_memcpy(in_buffer, pdata, length);
uint8_t temp_buffer[MQTT_BUF_SIZE];
@ -454,6 +455,7 @@ READPACKET:
break;
case MQTT_MSG_TYPE_PINGRESP:
// Ignore
mud->keepalive_sent = 0;
NODE_DBG("MQTT: PINGRESP received\r\n");
break;
}
@ -585,7 +587,7 @@ void mqtt_socket_timer(void *arg)
NODE_DBG("timer, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
if(mud->event_timeout > 0){
NODE_DBG("event_timeout: %d.\n", mud->event_timeout);
mud->event_timeout --;
mud->event_timeout --;
if(mud->event_timeout > 0){
return;
} else {
@ -640,13 +642,20 @@ void mqtt_socket_timer(void *arg)
// no queued event.
mud->keep_alive_tick ++;
if(mud->keep_alive_tick > mud->mqtt_state.connect_info->keepalive){
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
NODE_DBG("\r\nMQTT: Send keepalive packet\r\n");
mqtt_message_t* temp_msg = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection);
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
0, MQTT_MSG_TYPE_PINGREQ, (int)mqtt_get_qos(temp_msg->data) );
mqtt_send_if_possible(mud->pesp_conn);
if (mud->keepalive_sent) {
// Oh dear -- keepalive timer expired and still no ack of previous message
mqtt_socket_reconnected(mud->pesp_conn, 0);
} else {
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
NODE_DBG("\r\nMQTT: Send keepalive packet\r\n");
mqtt_message_t* temp_msg = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection);
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
0, MQTT_MSG_TYPE_PINGREQ, (int)mqtt_get_qos(temp_msg->data) );
mud->keepalive_sent = 1;
mud->keep_alive_tick = 0; // Need to reset to zero in case flow control stopped.
mqtt_send_if_possible(mud->pesp_conn);
}
}
}
}
@ -675,6 +684,7 @@ static int mqtt_socket_client( lua_State* L )
// create a object
mud = (lmqtt_userdata *)lua_newuserdata(L, sizeof(lmqtt_userdata));
c_memset(mud, 0, sizeof(*mud));
// pre-initialize it, in case of errors
mud->self_ref = LUA_NOREF;
mud->cb_connect_ref = LUA_NOREF;
@ -685,18 +695,8 @@ static int mqtt_socket_client( lua_State* L )
mud->cb_suback_ref = LUA_NOREF;
mud->cb_unsuback_ref = LUA_NOREF;
mud->cb_puback_ref = LUA_NOREF;
mud->pesp_conn = NULL;
#ifdef CLIENT_SSL_ENABLE
mud->secure = 0;
#endif
mud->keep_alive_tick = 0;
mud->event_timeout = 0;
mud->connState = MQTT_INIT;
mud->connected = false;
c_memset(&mud->mqttTimer, 0, sizeof(ETSTimer));
c_memset(&mud->mqtt_state, 0, sizeof(mqtt_state_t));
c_memset(&mud->connect_info, 0, sizeof(mqtt_connect_info_t));
// set its metatable
luaL_getmetatable(L, "mqtt.socket");
@ -761,7 +761,7 @@ static int mqtt_socket_client( lua_State* L )
c_free(mud->connect_info.password);
mud->connect_info.password = NULL;
}
return luaL_error(L, "not enough memory");
return luaL_error(L, "not enough memory");
}
c_memcpy(mud->connect_info.client_id, clientId, idl);
@ -819,8 +819,8 @@ static int mqtt_delete( lua_State* L )
// ---- alloc-ed in mqtt_socket_lwt()
if(mud->connect_info.will_topic){
c_free(mud->connect_info.will_topic);
mud->connect_info.will_topic = NULL;
c_free(mud->connect_info.will_topic);
mud->connect_info.will_topic = NULL;
}
if(mud->connect_info.will_message){