fix mqtt, do a format when wrong flash size is detected

This commit is contained in:
funshine 2015-04-03 00:51:02 +08:00
parent 32e062f523
commit 1c2ee75a00
9 changed files with 214 additions and 131 deletions

View File

@ -243,7 +243,7 @@ m:subscribe("/topic",0, function(conn) print("subscribe success") end)
-- publish a message with data = hello, QoS = 0, retain = 0 -- publish a message with data = hello, QoS = 0, retain = 0
m:publish("/topic","hello",0,0, function(conn) print("sent") end) m:publish("/topic","hello",0,0, function(conn) print("sent") end)
m:close(); -- if auto-reconnect = 1, will reconnect. m:close(); -- if auto-reconnect == 1, will disable auto-reconnect and then disconnect from host.
-- you can call m:connect again -- you can call m:connect again
``` ```

View File

@ -7,6 +7,6 @@
#define NODE_VERSION_INTERNAL 0U #define NODE_VERSION_INTERNAL 0U
#define NODE_VERSION "NodeMCU 0.9.5" #define NODE_VERSION "NodeMCU 0.9.5"
#define BUILD_DATE "build 20150331" #define BUILD_DATE "build 20150403"
#endif /* __USER_VERSION_H__ */ #endif /* __USER_VERSION_H__ */

View File

@ -17,8 +17,6 @@
#include "mqtt_msg.h" #include "mqtt_msg.h"
#include "msg_queue.h" #include "msg_queue.h"
static lua_State *gL = NULL;
#define MQTT_BUF_SIZE 1024 #define MQTT_BUF_SIZE 1024
#define MQTT_DEFAULT_KEEPALIVE 60 #define MQTT_DEFAULT_KEEPALIVE 60
#define MQTT_MAX_CLIENT_LEN 64 #define MQTT_MAX_CLIENT_LEN 64
@ -50,18 +48,16 @@ typedef struct mqtt_state_t
int auto_reconnect; int auto_reconnect;
mqtt_connect_info_t* connect_info; mqtt_connect_info_t* connect_info;
uint8_t* in_buffer; uint8_t* in_buffer;
uint8_t* out_buffer;
int in_buffer_length; int in_buffer_length;
int out_buffer_length;
uint16_t message_length; uint16_t message_length;
uint16_t message_length_read; uint16_t message_length_read;
mqtt_message_t* outbound_message;
mqtt_connection_t mqtt_connection; mqtt_connection_t mqtt_connection;
msg_queue_t* pending_msg_q; msg_queue_t* pending_msg_q;
} mqtt_state_t; } mqtt_state_t;
typedef struct lmqtt_userdata typedef struct lmqtt_userdata
{ {
lua_State *L;
struct espconn *pesp_conn; struct espconn *pesp_conn;
int self_ref; int self_ref;
int cb_connect_ref; int cb_connect_ref;
@ -80,6 +76,8 @@ typedef struct lmqtt_userdata
}lmqtt_userdata; }lmqtt_userdata;
static void socket_connect(struct espconn *pesp_conn); static void socket_connect(struct espconn *pesp_conn);
static void mqtt_socket_reconnected(void *arg, sint8_t err);
static void mqtt_socket_connected(void *arg);
static void mqtt_socket_disconnected(void *arg) // tcp only static void mqtt_socket_disconnected(void *arg) // tcp only
{ {
@ -94,33 +92,47 @@ static void mqtt_socket_disconnected(void *arg) // tcp only
os_timer_disarm(&mud->mqttTimer); os_timer_disarm(&mud->mqttTimer);
if(mud->pesp_conn){
mud->pesp_conn->reverse = NULL;
if(mud->pesp_conn->proto.tcp)
c_free(mud->pesp_conn->proto.tcp);
mud->pesp_conn->proto.tcp = NULL;
c_free(mud->pesp_conn);
mud->pesp_conn = NULL;
}
if(mud->connected){ // call back only called when socket is from connection to disconnection. if(mud->connected){ // call back only called when socket is from connection to disconnection.
mud->connected = false; mud->connected = false;
if((mud->cb_disconnect_ref != LUA_NOREF) && (mud->self_ref != LUA_NOREF)) { if((mud->L != NULL) && (mud->cb_disconnect_ref != LUA_NOREF) && (mud->self_ref != LUA_NOREF)) {
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref);
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua
call_back = true; call_back = true;
} }
} }
lua_gc(gL, LUA_GCSTOP, 0); if(mud->mqtt_state.auto_reconnect){
if(mud->self_ref != LUA_NOREF){ // TODO: should we unref the client and delete it? mud->pesp_conn->reverse = mud;
luaL_unref(gL, LUA_REGISTRYINDEX, mud->self_ref); mud->pesp_conn->type = ESPCONN_TCP;
mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self mud->pesp_conn->state = ESPCONN_NONE;
} mud->connected = false;
lua_gc(gL, LUA_GCRESTART, 0); mud->pesp_conn->proto.tcp->remote_port = mud->mqtt_state.port;
mud->pesp_conn->proto.tcp->local_port = espconn_port();
espconn_regist_connectcb(mud->pesp_conn, mqtt_socket_connected);
espconn_regist_reconcb(mud->pesp_conn, mqtt_socket_reconnected);
socket_connect(pesp_conn);
} else {
if(mud->pesp_conn){
mud->pesp_conn->reverse = NULL;
if(mud->pesp_conn->proto.tcp)
c_free(mud->pesp_conn->proto.tcp);
mud->pesp_conn->proto.tcp = NULL;
c_free(mud->pesp_conn);
mud->pesp_conn = NULL;
}
if(call_back){ if(mud->L == NULL)
lua_call(gL, 1, 0); return;
lua_gc(mud->L, LUA_GCSTOP, 0);
if(mud->self_ref != LUA_NOREF){ // TODO: should we unref the client and delete it?
luaL_unref(mud->L, LUA_REGISTRYINDEX, mud->self_ref);
mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self
}
lua_gc(mud->L, LUA_GCRESTART, 0);
}
if((mud->L != NULL) && call_back){
lua_call(mud->L, 1, 0);
} }
NODE_DBG("leave mqtt_socket_disconnected.\n"); NODE_DBG("leave mqtt_socket_disconnected.\n");
@ -137,11 +149,11 @@ static void mqtt_socket_reconnected(void *arg, sint8_t err)
if(mud == NULL) if(mud == NULL)
return; return;
pesp_conn->proto.tcp->remote_port = mud->mqtt_state.port;
pesp_conn->proto.tcp->local_port = espconn_port();
os_timer_disarm(&mud->mqttTimer); os_timer_disarm(&mud->mqttTimer);
if( (mud->event_timeout != 0) || mud->mqtt_state.auto_reconnect ){
if(mud->mqtt_state.auto_reconnect){
pesp_conn->proto.tcp->remote_port = mud->mqtt_state.port;
pesp_conn->proto.tcp->local_port = espconn_port();
socket_connect(pesp_conn); socket_connect(pesp_conn);
} else { } else {
mqtt_socket_disconnected(arg); mqtt_socket_disconnected(arg);
@ -152,6 +164,8 @@ static void mqtt_socket_reconnected(void *arg, sint8_t err)
static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length) static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length)
{ {
NODE_DBG("enter deliver_publish.\n"); NODE_DBG("enter deliver_publish.\n");
if(mud == NULL)
return;
const char comma[] = ","; const char comma[] = ",";
mqtt_event_data_t event_data; mqtt_event_data_t event_data;
@ -165,15 +179,17 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length)
return; return;
if(mud->self_ref == LUA_NOREF) if(mud->self_ref == LUA_NOREF)
return; return;
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_message_ref); if(mud->L == NULL)
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua return;
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_message_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_pushlstring(gL, event_data.topic, event_data.topic_length); lua_pushlstring(mud->L, event_data.topic, event_data.topic_length);
if(event_data.data_length > 0){ if(event_data.data_length > 0){
lua_pushlstring(gL, event_data.data, event_data.data_length); lua_pushlstring(mud->L, event_data.data, event_data.data_length);
lua_call(gL, 3, 0); lua_call(mud->L, 3, 0);
} else { } else {
lua_call(gL, 2, 0); lua_call(mud->L, 2, 0);
} }
NODE_DBG("leave deliver_publish.\n"); NODE_DBG("leave deliver_publish.\n");
} }
@ -199,7 +215,9 @@ READPACKET:
return; return;
c_memcpy(mud->mqtt_state.in_buffer, pdata, len); c_memcpy(mud->mqtt_state.in_buffer, pdata, len);
mud->mqtt_state.outbound_message = NULL; uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = NULL;
switch(mud->connState){ switch(mud->connState){
case MQTT_CONNECT_SENDING: case MQTT_CONNECT_SENDING:
case MQTT_CONNECT_SENT: case MQTT_CONNECT_SENT:
@ -214,13 +232,15 @@ READPACKET:
mud->connState = MQTT_DATA; mud->connState = MQTT_DATA;
NODE_DBG("MQTT: Connected\r\n"); NODE_DBG("MQTT: Connected\r\n");
if(mud->cb_connect_ref == LUA_NOREF) if(mud->cb_connect_ref == LUA_NOREF)
return; break;
if(mud->self_ref == LUA_NOREF) if(mud->self_ref == LUA_NOREF)
return; break;
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_connect_ref); if(mud->L == NULL)
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua break;
lua_call(gL, 1, 0); lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_ref);
return; lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua
lua_call(mud->L, 1, 0);
break;
} }
break; break;
@ -248,9 +268,11 @@ READPACKET:
break; break;
if (mud->self_ref == LUA_NOREF) if (mud->self_ref == LUA_NOREF)
break; break;
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_suback_ref); if(mud->L == NULL)
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); break;
lua_call(gL, 1, 0); lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_suback_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref);
lua_call(mud->L, 1, 0);
} }
break; break;
case MQTT_MSG_TYPE_UNSUBACK: case MQTT_MSG_TYPE_UNSUBACK:
@ -261,14 +283,14 @@ READPACKET:
break; break;
case MQTT_MSG_TYPE_PUBLISH: case MQTT_MSG_TYPE_PUBLISH:
if(msg_qos == 1){ if(msg_qos == 1){
mud->mqtt_state.outbound_message = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id); temp_msg = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBACK, 0 ); msg_id, MQTT_MSG_TYPE_PUBACK, (int)mqtt_get_qos(temp_msg->data) );
} }
else if(msg_qos == 2){ else if(msg_qos == 2){
mud->mqtt_state.outbound_message = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id); temp_msg = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBREC, 0 ); msg_id, MQTT_MSG_TYPE_PUBREC, (int)mqtt_get_qos(temp_msg->data) );
} }
if(msg_qos == 1 || msg_qos == 2){ if(msg_qos == 1 || msg_qos == 2){
NODE_DBG("MQTT: Queue response QoS: %d\r\n", msg_qos); NODE_DBG("MQTT: Queue response QoS: %d\r\n", msg_qos);
@ -283,9 +305,11 @@ READPACKET:
break; break;
if(mud->self_ref == LUA_NOREF) if(mud->self_ref == LUA_NOREF)
break; break;
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_puback_ref); if(mud->L == NULL)
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua break;
lua_call(gL, 1, 0); lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_call(mud->L, 1, 0);
} }
break; break;
@ -294,18 +318,18 @@ READPACKET:
NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n"); NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n");
// Note: actrually, should not destroy the msg until PUBCOMP is received. // Note: actrually, should not destroy the msg until PUBCOMP is received.
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
mud->mqtt_state.outbound_message = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id); temp_msg = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBREL, 1 ); msg_id, MQTT_MSG_TYPE_PUBREL, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("MQTT: Response PUBREL\r\n"); NODE_DBG("MQTT: Response PUBREL\r\n");
} }
break; break;
case MQTT_MSG_TYPE_PUBREL: case MQTT_MSG_TYPE_PUBREL:
if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBREC && pending_msg->msg_id == msg_id){ if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBREC && pending_msg->msg_id == msg_id){
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
mud->mqtt_state.outbound_message = mqtt_msg_pubcomp(&mud->mqtt_state.mqtt_connection, msg_id); temp_msg = mqtt_msg_pubcomp(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBCOMP, 0 ); msg_id, MQTT_MSG_TYPE_PUBCOMP, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("MQTT: Response PUBCOMP\r\n"); NODE_DBG("MQTT: Response PUBCOMP\r\n");
} }
break; break;
@ -317,15 +341,17 @@ READPACKET:
break; break;
if(mud->self_ref == LUA_NOREF) if(mud->self_ref == LUA_NOREF)
break; break;
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_puback_ref); if(mud->L == NULL)
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua break;
lua_call(gL, 1, 0); lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_call(mud->L, 1, 0);
} }
break; break;
case MQTT_MSG_TYPE_PINGREQ: case MQTT_MSG_TYPE_PINGREQ:
mud->mqtt_state.outbound_message = mqtt_msg_pingresp(&mud->mqtt_state.mqtt_connection); temp_msg = mqtt_msg_pingresp(&mud->mqtt_state.mqtt_connection);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PINGRESP, 1 ); msg_id, MQTT_MSG_TYPE_PINGRESP, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("MQTT: Response PINGRESP\r\n"); NODE_DBG("MQTT: Response PINGRESP\r\n");
break; break;
case MQTT_MSG_TYPE_PINGRESP: case MQTT_MSG_TYPE_PINGRESP:
@ -361,7 +387,6 @@ READPACKET:
else else
espconn_sent( pesp_conn, node->msg.data, node->msg.length ); espconn_sent( pesp_conn, node->msg.data, node->msg.length );
mud->keep_alive_tick = 0; mud->keep_alive_tick = 0;
mud->mqtt_state.outbound_message = NULL;
} }
NODE_DBG("receive, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("receive, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
NODE_DBG("leave mqtt_socket_received.\n"); NODE_DBG("leave mqtt_socket_received.\n");
@ -379,12 +404,14 @@ static void mqtt_socket_sent(void *arg)
return; return;
if(!mud->connected) if(!mud->connected)
return; return;
if(mud->connState == MQTT_CONNECT_SENDING){
mud->connState = MQTT_CONNECT_SENT;
}
// call mqtt_sent() // call mqtt_sent()
mud->event_timeout = 0; mud->event_timeout = 0;
if(mud->connState == MQTT_CONNECT_SENDING){
mud->connState = MQTT_CONNECT_SENT;
// MQTT_CONNECT not queued.
return;
}
NODE_DBG("sent1, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("sent1, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
// qos = 0, publish and forgot. // qos = 0, publish and forgot.
msg_queue_t *node = msg_peek(&(mud->mqtt_state.pending_msg_q)); msg_queue_t *node = msg_peek(&(mud->mqtt_state.pending_msg_q));
@ -394,9 +421,11 @@ static void mqtt_socket_sent(void *arg)
return; return;
if(mud->self_ref == LUA_NOREF) if(mud->self_ref == LUA_NOREF)
return; return;
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_puback_ref); if(mud->L == NULL)
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua return;
lua_call(gL, 1, 0); lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_call(mud->L, 1, 0);
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK && node->publish_qos == 1) { } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK && node->publish_qos == 1) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) { } else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) {
@ -422,16 +451,18 @@ static void mqtt_socket_connected(void *arg)
espconn_regist_sentcb(pesp_conn, mqtt_socket_sent); espconn_regist_sentcb(pesp_conn, mqtt_socket_sent);
espconn_regist_disconcb(pesp_conn, mqtt_socket_disconnected); espconn_regist_disconcb(pesp_conn, mqtt_socket_disconnected);
uint8_t temp_buffer[MQTT_BUF_SIZE];
// call mqtt_connect() to start a mqtt connect stage. // call mqtt_connect() to start a mqtt connect stage.
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.out_buffer, mud->mqtt_state.out_buffer_length); mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mud->mqtt_state.outbound_message = mqtt_msg_connect(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.connect_info); mqtt_message_t* temp_msg = mqtt_msg_connect(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.connect_info);
NODE_DBG("Send MQTT connection infomation, data len: %d, d[0]=%d \r\n", mud->mqtt_state.outbound_message->length, mud->mqtt_state.outbound_message->data[0]); NODE_DBG("Send MQTT connection infomation, data len: %d, d[0]=%d \r\n", temp_msg->length, temp_msg->data[0]);
mud->event_timeout = MQTT_SEND_TIMEOUT; mud->event_timeout = MQTT_SEND_TIMEOUT;
// not queue this message. should send right now. or should enqueue this before head.
if(mud->secure) if(mud->secure)
espconn_secure_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); espconn_secure_sent(pesp_conn, temp_msg->data, temp_msg->length);
else else
espconn_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); espconn_sent(pesp_conn, temp_msg->data, temp_msg->length);
mud->mqtt_state.outbound_message = NULL;
mud->connState = MQTT_CONNECT_SENDING; mud->connState = MQTT_CONNECT_SENDING;
NODE_DBG("leave mqtt_socket_connected.\n"); NODE_DBG("leave mqtt_socket_connected.\n");
return; return;
@ -444,6 +475,12 @@ void mqtt_socket_timer(void *arg)
if(mud == NULL) if(mud == NULL)
return; return;
if(mud->pesp_conn == NULL){
NODE_DBG("mud->pesp_conn is NULL.\n");
os_timer_disarm(&mud->mqttTimer);
return;
}
NODE_DBG("timer, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("timer, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
if(mud->event_timeout > 0){ if(mud->event_timeout > 0){
NODE_DBG("event_timeout: %d.\n", mud->event_timeout); NODE_DBG("event_timeout: %d.\n", mud->event_timeout);
@ -451,20 +488,19 @@ void mqtt_socket_timer(void *arg)
if(mud->event_timeout > 0){ if(mud->event_timeout > 0){
return; return;
} else { } else {
NODE_DBG("event timeout. \n"); NODE_DBG("event timeout. \n");
if(mud->connState == MQTT_DATA)
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
// should remove the head of the queue and re-send with DUP = 1 // should remove the head of the queue and re-send with DUP = 1
// Not implemented yet. // Not implemented yet.
} }
} }
if(mud->pesp_conn == NULL){
NODE_DBG("mud->pesp_conn is NULL.\n");
return;
}
if(mud->connState == MQTT_INIT){ // socket connect time out. if(mud->connState == MQTT_INIT){ // socket connect time out.
NODE_DBG("Can not connect to broker.\n"); NODE_DBG("Can not connect to broker.\n");
// Never goes here.
} else if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT send time out. } else if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT send time out.
NODE_DBG("sSend MQTT_CONNECT failed.\n");
mud->connState = MQTT_INIT; mud->connState = MQTT_INIT;
if(mud->secure) if(mud->secure)
espconn_secure_disconnect(mud->pesp_conn); espconn_secure_disconnect(mud->pesp_conn);
@ -488,12 +524,17 @@ void mqtt_socket_timer(void *arg)
mud->keep_alive_tick ++; mud->keep_alive_tick ++;
if(mud->keep_alive_tick > mud->mqtt_state.connect_info->keepalive){ if(mud->keep_alive_tick > mud->mqtt_state.connect_info->keepalive){
mud->event_timeout = MQTT_SEND_TIMEOUT; mud->event_timeout = MQTT_SEND_TIMEOUT;
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
NODE_DBG("\r\nMQTT: Send keepalive packet\r\n"); NODE_DBG("\r\nMQTT: Send keepalive packet\r\n");
mud->mqtt_state.outbound_message = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection); mqtt_message_t* temp_msg = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection);
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
0, MQTT_MSG_TYPE_PINGREQ, (int)mqtt_get_qos(temp_msg->data) );
// only one message in queue, send immediately.
if(mud->secure) if(mud->secure)
espconn_secure_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); espconn_secure_sent(mud->pesp_conn, temp_msg->data, temp_msg->length);
else else
espconn_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); espconn_sent(mud->pesp_conn, temp_msg->data, temp_msg->length);
mud->keep_alive_tick = 0; mud->keep_alive_tick = 0;
} }
} }
@ -523,6 +564,7 @@ static int mqtt_socket_client( lua_State* L )
// create a object // create a object
mud = (lmqtt_userdata *)lua_newuserdata(L, sizeof(lmqtt_userdata)); mud = (lmqtt_userdata *)lua_newuserdata(L, sizeof(lmqtt_userdata));
// pre-initialize it, in case of errors // pre-initialize it, in case of errors
mud->L = NULL;
mud->self_ref = LUA_NOREF; mud->self_ref = LUA_NOREF;
mud->cb_connect_ref = LUA_NOREF; mud->cb_connect_ref = LUA_NOREF;
mud->cb_disconnect_ref = LUA_NOREF; mud->cb_disconnect_ref = LUA_NOREF;
@ -545,7 +587,7 @@ static int mqtt_socket_client( lua_State* L )
luaL_getmetatable(L, "mqtt.socket"); luaL_getmetatable(L, "mqtt.socket");
lua_setmetatable(L, -2); lua_setmetatable(L, -2);
gL = L; // global L for mqtt module. mud->L = L; // L for mqtt module.
if( lua_isstring(L,stack) ) // deal with the clientid string if( lua_isstring(L,stack) ) // deal with the clientid string
{ {
@ -584,8 +626,7 @@ static int mqtt_socket_client( lua_State* L )
mud->connect_info.username = (uint8_t *)c_zalloc(unl + 1); mud->connect_info.username = (uint8_t *)c_zalloc(unl + 1);
mud->connect_info.password = (uint8_t *)c_zalloc(pwl + 1); mud->connect_info.password = (uint8_t *)c_zalloc(pwl + 1);
mud->mqtt_state.in_buffer = (uint8_t *)c_zalloc(MQTT_BUF_SIZE); mud->mqtt_state.in_buffer = (uint8_t *)c_zalloc(MQTT_BUF_SIZE);
mud->mqtt_state.out_buffer = (uint8_t *)c_zalloc(MQTT_BUF_SIZE); if(!mud->connect_info.client_id || !mud->connect_info.username || !mud->connect_info.password || !mud->mqtt_state.in_buffer){
if(!mud->connect_info.client_id || !mud->connect_info.username || !mud->connect_info.password || !mud->mqtt_state.in_buffer || !mud->mqtt_state.out_buffer){
if(mud->connect_info.client_id) { if(mud->connect_info.client_id) {
c_free(mud->connect_info.client_id); c_free(mud->connect_info.client_id);
mud->connect_info.client_id = NULL; mud->connect_info.client_id = NULL;
@ -601,10 +642,6 @@ static int mqtt_socket_client( lua_State* L )
if(mud->mqtt_state.in_buffer) { if(mud->mqtt_state.in_buffer) {
c_free(mud->mqtt_state.in_buffer); c_free(mud->mqtt_state.in_buffer);
mud->mqtt_state.in_buffer = NULL; mud->mqtt_state.in_buffer = NULL;
}
if(mud->mqtt_state.out_buffer) {
c_free(mud->mqtt_state.out_buffer);
mud->mqtt_state.out_buffer = NULL;
} }
return luaL_error(L, "not enough memory"); return luaL_error(L, "not enough memory");
} }
@ -624,7 +661,6 @@ static int mqtt_socket_client( lua_State* L )
mud->connect_info.keepalive = keepalive; mud->connect_info.keepalive = keepalive;
mud->mqtt_state.in_buffer_length = MQTT_BUF_SIZE; mud->mqtt_state.in_buffer_length = MQTT_BUF_SIZE;
mud->mqtt_state.out_buffer_length = MQTT_BUF_SIZE;
mud->mqtt_state.pending_msg_q = NULL; mud->mqtt_state.pending_msg_q = NULL;
mud->mqtt_state.auto_reconnect = 1; mud->mqtt_state.auto_reconnect = 1;
mud->mqtt_state.port = 1883; mud->mqtt_state.port = 1883;
@ -690,10 +726,6 @@ static int mqtt_delete( lua_State* L )
c_free(mud->mqtt_state.in_buffer); c_free(mud->mqtt_state.in_buffer);
mud->mqtt_state.in_buffer = NULL; mud->mqtt_state.in_buffer = NULL;
} }
if(mud->mqtt_state.out_buffer){
c_free(mud->mqtt_state.out_buffer);
mud->mqtt_state.out_buffer = NULL;
}
// ------- // -------
// free (unref) callback ref // free (unref) callback ref
@ -717,12 +749,12 @@ static int mqtt_delete( lua_State* L )
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref); luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
mud->cb_puback_ref = LUA_NOREF; mud->cb_puback_ref = LUA_NOREF;
} }
lua_gc(gL, LUA_GCSTOP, 0); lua_gc(L, LUA_GCSTOP, 0);
if(LUA_NOREF!=mud->self_ref){ if(LUA_NOREF!=mud->self_ref){
luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref); luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref);
mud->self_ref = LUA_NOREF; mud->self_ref = LUA_NOREF;
} }
lua_gc(gL, LUA_GCRESTART, 0); lua_gc(L, LUA_GCRESTART, 0);
NODE_DBG("leave mqtt_delete.\n"); NODE_DBG("leave mqtt_delete.\n");
return 0; return 0;
} }
@ -736,6 +768,8 @@ static void socket_connect(struct espconn *pesp_conn)
if(mud == NULL) if(mud == NULL)
return; return;
mud->event_timeout = MQTT_CONNECT_TIMEOUT;
mud->connState = MQTT_INIT;
if(mud->secure) if(mud->secure)
espconn_secure_connect(pesp_conn); espconn_secure_connect(pesp_conn);
else else
@ -906,8 +940,6 @@ static int mqtt_socket_connect( lua_State* L )
os_timer_disarm(&mud->mqttTimer); os_timer_disarm(&mud->mqttTimer);
os_timer_setfn(&mud->mqttTimer, (os_timer_func_t *)mqtt_socket_timer, mud); os_timer_setfn(&mud->mqttTimer, (os_timer_func_t *)mqtt_socket_timer, mud);
// timer started in socket_connect() // timer started in socket_connect()
mud->event_timeout = MQTT_CONNECT_TIMEOUT;
mud->connState = MQTT_INIT;
if((ipaddr.addr == IPADDR_NONE) && (c_memcmp(domain,"255.255.255.255",16) != 0)) if((ipaddr.addr == IPADDR_NONE) && (c_memcmp(domain,"255.255.255.255",16) != 0))
{ {
@ -943,6 +975,7 @@ static int mqtt_socket_close( lua_State* L )
return 0; return 0;
// call mqtt_disconnect() // call mqtt_disconnect()
mud->mqtt_state.auto_reconnect = 0; // stop auto reconnect.
if(mud->secure){ if(mud->secure){
if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port) if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port)
@ -1012,32 +1045,48 @@ static int mqtt_socket_subscribe( lua_State* L ) {
luaL_argcheck( L, mud, stack, "mqtt.socket expected" ); luaL_argcheck( L, mud, stack, "mqtt.socket expected" );
stack++; stack++;
if(mud==NULL){
NODE_DBG("userdata is nil.\n");
lua_pushboolean(L, 0);
return 1;
}
if(mud->pesp_conn == NULL){
NODE_DBG("mud->pesp_conn is NULL.\n");
lua_pushboolean(L, 0);
return 1;
}
if(!mud->connected){ if(!mud->connected){
luaL_error( L, "not connected" ); luaL_error( L, "not connected" );
lua_pushboolean(L, 0); lua_pushboolean(L, 0);
return 1; return 1;
} }
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = NULL;
if( lua_istable( L, stack ) ) { if( lua_istable( L, stack ) ) {
NODE_DBG("subscribe table\n"); NODE_DBG("subscribe table\n");
lua_pushnil( L ); /* first key */ lua_pushnil( L ); /* first key */
uint8_t temp_buffer[MQTT_BUF_SIZE]; uint8_t temp_buf[MQTT_BUF_SIZE];
uint32_t temp_pos = 0; uint32_t temp_pos = 0;
while( lua_next( L, stack ) != 0 ) { while( lua_next( L, stack ) != 0 ) {
topic = luaL_checkstring( L, -2 ); topic = luaL_checkstring( L, -2 );
qos = luaL_checkinteger( L, -1 ); qos = luaL_checkinteger( L, -1 );
mud->mqtt_state.outbound_message = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id ); temp_msg = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id );
NODE_DBG("topic: %s - qos: %d, length: %d\n", topic, qos, mud->mqtt_state.outbound_message->length); NODE_DBG("topic: %s - qos: %d, length: %d\n", topic, qos, temp_msg->length);
if (temp_pos + mud->mqtt_state.outbound_message->length > MQTT_BUF_SIZE){ if (temp_pos + temp_msg->length > MQTT_BUF_SIZE){
lua_pop(L, 1); lua_pop(L, 1);
break; // too long message for the outbuffer. break; // too long message for the outbuffer.
} }
c_memcpy( temp_buffer + temp_pos, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length ); c_memcpy( temp_buf + temp_pos, temp_msg->data, temp_msg->length );
temp_pos += mud->mqtt_state.outbound_message->length; temp_pos += temp_msg->length;
lua_pop( L, 1 ); lua_pop( L, 1 );
} }
@ -1048,9 +1097,9 @@ static int mqtt_socket_subscribe( lua_State* L ) {
return 1; return 1;
} }
c_memcpy( mud->mqtt_state.out_buffer, temp_buffer, temp_pos ); c_memcpy( temp_buffer, temp_buf, temp_pos );
mud->mqtt_state.outbound_message->data = mud->mqtt_state.out_buffer; temp_msg->data = temp_buffer;
mud->mqtt_state.outbound_message->length = temp_pos; temp_msg->length = temp_pos;
stack++; stack++;
} else { } else {
NODE_DBG("subscribe string\n"); NODE_DBG("subscribe string\n");
@ -1062,7 +1111,7 @@ static int mqtt_socket_subscribe( lua_State* L ) {
return 1; return 1;
} }
qos = luaL_checkinteger( L, stack ); qos = luaL_checkinteger( L, stack );
mud->mqtt_state.outbound_message = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id ); temp_msg = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id );
stack++; stack++;
} }
@ -1073,8 +1122,8 @@ static int mqtt_socket_subscribe( lua_State* L ) {
mud->cb_suback_ref = luaL_ref( L, LUA_REGISTRYINDEX ); mud->cb_suback_ref = luaL_ref( L, LUA_REGISTRYINDEX );
} }
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_SUBSCRIBE, (int)mqtt_get_qos(mud->mqtt_state.outbound_message->data) ); msg_id, MQTT_MSG_TYPE_SUBSCRIBE, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length); NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length);
@ -1093,7 +1142,6 @@ static int mqtt_socket_subscribe( lua_State* L ) {
} else { } else {
lua_pushboolean(L, 1); // enqueued succeed. lua_pushboolean(L, 1); // enqueued succeed.
} }
mud->mqtt_state.outbound_message = NULL;
NODE_DBG("subscribe, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("subscribe, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
NODE_DBG("leave mqtt_socket_subscribe.\n"); NODE_DBG("leave mqtt_socket_subscribe.\n");
return 1; return 1;
@ -1123,7 +1171,11 @@ static int mqtt_socket_publish( lua_State* L )
return 1; return 1;
} }
pesp_conn = mud->pesp_conn; if(!mud->connected){
luaL_error( L, "not connected" );
lua_pushboolean(L, 0);
return 1;
}
const char *topic = luaL_checklstring( L, stack, &l ); const char *topic = luaL_checklstring( L, stack, &l );
stack ++; stack ++;
@ -1140,7 +1192,9 @@ static int mqtt_socket_publish( lua_State* L )
uint8_t retain = luaL_checkinteger( L, stack); uint8_t retain = luaL_checkinteger( L, stack);
stack ++; stack ++;
mud->mqtt_state.outbound_message = mqtt_msg_publish(&mud->mqtt_state.mqtt_connection, uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = mqtt_msg_publish(&mud->mqtt_state.mqtt_connection,
topic, payload, l, topic, payload, l,
qos, retain, qos, retain,
&msg_id); &msg_id);
@ -1152,7 +1206,7 @@ static int mqtt_socket_publish( lua_State* L )
mud->cb_puback_ref = luaL_ref(L, LUA_REGISTRYINDEX); mud->cb_puback_ref = luaL_ref(L, LUA_REGISTRYINDEX);
} }
msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message, msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos ); msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos );
if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){ if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){
@ -1170,7 +1224,7 @@ static int mqtt_socket_publish( lua_State* L )
} else { } else {
lua_pushboolean(L, 1); // enqueued succeed. lua_pushboolean(L, 1); // enqueued succeed.
} }
mud->mqtt_state.outbound_message = NULL;
NODE_DBG("publish, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q))); NODE_DBG("publish, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
NODE_DBG("leave mqtt_socket_publish.\n"); NODE_DBG("leave mqtt_socket_publish.\n");
return 1; return 1;

View File

@ -71,6 +71,9 @@
#define fs_rename myspiffs_rename #define fs_rename myspiffs_rename
#define fs_size myspiffs_size #define fs_size myspiffs_size
#define fs_mount myspiffs_mount
#define fs_unmount myspiffs_unmount
#define FS_NAME_MAX_LENGTH SPIFFS_OBJ_NAME_LEN #define FS_NAME_MAX_LENGTH SPIFFS_OBJ_NAME_LEN
#endif #endif

View File

@ -42,7 +42,7 @@ The small 4KB sectors allow for greater flexibility in applications th
********************/ ********************/
void spiffs_mount() { void myspiffs_mount() {
spiffs_config cfg; spiffs_config cfg;
cfg.phys_addr = ( u32_t )platform_flash_get_first_free_block_address( NULL ); cfg.phys_addr = ( u32_t )platform_flash_get_first_free_block_address( NULL );
cfg.phys_addr += 0x3000; cfg.phys_addr += 0x3000;
@ -69,6 +69,10 @@ void spiffs_mount() {
NODE_DBG("mount res: %i\n", res); NODE_DBG("mount res: %i\n", res);
} }
void myspiffs_unmount() {
SPIFFS_unmount(&fs);
}
// FS formatting function // FS formatting function
// Returns 1 if OK, 0 for error // Returns 1 if OK, 0 for error
int myspiffs_format( void ) int myspiffs_format( void )
@ -85,7 +89,7 @@ int myspiffs_format( void )
while( sect_first <= sect_last ) while( sect_first <= sect_last )
if( platform_flash_erase_sector( sect_first ++ ) == PLATFORM_ERR ) if( platform_flash_erase_sector( sect_first ++ ) == PLATFORM_ERR )
return 0; return 0;
spiffs_mount(); myspiffs_mount();
return 1; return 1;
} }

View File

@ -477,6 +477,8 @@ u32_t SPIFFS_buffer_bytes_for_cache(spiffs *fs, u32_t num_pages);
#if SPIFFS_CACHE #if SPIFFS_CACHE
#endif #endif
void myspiffs_mount();
void myspiffs_unmount();
int myspiffs_open(const char *name, int flags); int myspiffs_open(const char *name, int flags);
int myspiffs_close( int fd ); int myspiffs_close( int fd );
size_t myspiffs_write( int fd, const void* ptr, size_t len ); size_t myspiffs_write( int fd, const void* ptr, size_t len );

View File

@ -44,6 +44,7 @@ INCLUDES += -I ../libc
INCLUDES += -I ../platform INCLUDES += -I ../platform
INCLUDES += -I ../lua INCLUDES += -I ../lua
INCLUDES += -I ../wofs INCLUDES += -I ../wofs
INCLUDES += -I ../spiffs
PDIR := ../$(PDIR) PDIR := ../$(PDIR)
sinclude $(PDIR)Makefile sinclude $(PDIR)Makefile

View File

@ -14,8 +14,7 @@
#include "c_stdlib.h" #include "c_stdlib.h"
#include "c_stdio.h" #include "c_stdio.h"
#include "romfs.h" #include "flash_fs.h"
#include "user_interface.h" #include "user_interface.h"
#include "ets_sys.h" #include "ets_sys.h"
@ -44,7 +43,6 @@ void task_init(void){
system_os_task(task_lua, USER_TASK_PRIO_0, taskQueue, TASK_QUEUE_LEN); system_os_task(task_lua, USER_TASK_PRIO_0, taskQueue, TASK_QUEUE_LEN);
} }
extern void spiffs_mount();
// extern void test_spiffs(); // extern void test_spiffs();
// extern int test_romfs(); // extern int test_romfs();
@ -69,7 +67,16 @@ void nodemcu_init(void)
// Flash init data at FLASHSIZE - 0x04000 Byte. // Flash init data at FLASHSIZE - 0x04000 Byte.
flash_init_data_default(); flash_init_data_default();
// Flash blank data at FLASHSIZE - 0x02000 Byte. // Flash blank data at FLASHSIZE - 0x02000 Byte.
flash_init_data_blank(); flash_init_data_blank();
if( !fs_format() )
{
NODE_ERR( "\ni*** ERROR ***: unable to format. FS might be compromised.\n" );
NODE_ERR( "It is advised to re-flash the NodeMCU image.\n" );
}
else{
NODE_ERR( "format done.\n" );
}
fs_unmount(); // mounted by format.
} }
#endif // defined(FLASH_SAFE_API) #endif // defined(FLASH_SAFE_API)
@ -94,7 +101,7 @@ void nodemcu_init(void)
// test_romfs(); // test_romfs();
#elif defined ( BUILD_SPIFFS ) #elif defined ( BUILD_SPIFFS )
spiffs_mount(); fs_mount();
// test_spiffs(); // test_spiffs();
#endif #endif
// endpoint_setup(); // endpoint_setup();

View File

@ -460,3 +460,15 @@ m:publish("/topic1","hello3",0,0) m:publish("/topic1","hello2",2,0)
m:subscribe("/topic2",2,function(m) print("sub done") end) m:subscribe("/topic2",2,function(m) print("sub done") end)
m:publish("/topic2","hello3",0,0) m:publish("/topic2","hello2",2,0) m:publish("/topic2","hello3",0,0) m:publish("/topic2","hello2",2,0)
m=mqtt.Client()
m:on("connect",function(m)
print("connection "..node.heap())
m:subscribe("/topic1",0,function(m) print("sub done") end)
m:publish("/topic1","hello3",0,0) m:publish("/topic1","hello2",2,0)
end )
m:on("offline", function(conn)
print("disconnect to broker...")
print(node.heap())
end)
m:connect("192.168.18.88",1883,0,1)