This commit is contained in:
HuangRui 2015-04-03 09:50:24 +08:00
commit 2859286e03
11 changed files with 286 additions and 140 deletions

View File

@ -36,6 +36,12 @@ Tencent QQ group: 309957875<br />
- cross compiler (done)
# Change log
2015-03-31<br />
polish mqtt module, add queue for mqtt module.<br />
add reconnect option to mqtt.connect api, :connect( host, port, secure, auto_reconnect, function(client) )<br />
move node.readvdd33 to adc.readvdd33.<br />
tools/esptool.py supported NodeMCU devkit automatic flash.
2015-03-18<br />
update u8glib.<br />
merge everything to master.
@ -237,7 +243,7 @@ m:subscribe("/topic",0, function(conn) print("subscribe success") end)
-- publish a message with data = hello, QoS = 0, retain = 0
m:publish("/topic","hello",0,0, function(conn) print("sent") end)
m:close(); -- if auto-reconnect = 1, will reconnect.
m:close(); -- if auto-reconnect == 1, will disable auto-reconnect and then disconnect from host.
-- you can call m:connect again
```

View File

@ -7,6 +7,6 @@
#define NODE_VERSION_INTERNAL 0U
#define NODE_VERSION "NodeMCU 0.9.5"
#define BUILD_DATE "build 20150331"
#define BUILD_DATE "build 20150403"
#endif /* __USER_VERSION_H__ */

View File

@ -17,8 +17,6 @@
#include "mqtt_msg.h"
#include "msg_queue.h"
static lua_State *gL = NULL;
#define MQTT_BUF_SIZE 1024
#define MQTT_DEFAULT_KEEPALIVE 60
#define MQTT_MAX_CLIENT_LEN 64
@ -50,18 +48,16 @@ typedef struct mqtt_state_t
int auto_reconnect;
mqtt_connect_info_t* connect_info;
uint8_t* in_buffer;
uint8_t* out_buffer;
int in_buffer_length;
int out_buffer_length;
uint16_t message_length;
uint16_t message_length_read;
mqtt_message_t* outbound_message;
mqtt_connection_t mqtt_connection;
msg_queue_t* pending_msg_q;
} mqtt_state_t;
typedef struct lmqtt_userdata
{
lua_State *L;
struct espconn *pesp_conn;
int self_ref;
int cb_connect_ref;
@ -80,6 +76,8 @@ typedef struct lmqtt_userdata
}lmqtt_userdata;
static void socket_connect(struct espconn *pesp_conn);
static void mqtt_socket_reconnected(void *arg, sint8_t err);
static void mqtt_socket_connected(void *arg);
static void mqtt_socket_disconnected(void *arg) // tcp only
{
@ -94,33 +92,47 @@ static void mqtt_socket_disconnected(void *arg) // tcp only
os_timer_disarm(&mud->mqttTimer);
if(mud->pesp_conn){
mud->pesp_conn->reverse = NULL;
if(mud->pesp_conn->proto.tcp)
c_free(mud->pesp_conn->proto.tcp);
mud->pesp_conn->proto.tcp = NULL;
c_free(mud->pesp_conn);
mud->pesp_conn = NULL;
}
if(mud->connected){ // call back only called when socket is from connection to disconnection.
mud->connected = false;
if((mud->cb_disconnect_ref != LUA_NOREF) && (mud->self_ref != LUA_NOREF)) {
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_disconnect_ref);
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua
if((mud->L != NULL) && (mud->cb_disconnect_ref != LUA_NOREF) && (mud->self_ref != LUA_NOREF)) {
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua
call_back = true;
}
}
lua_gc(gL, LUA_GCSTOP, 0);
if(mud->self_ref != LUA_NOREF){ // TODO: should we unref the client and delete it?
luaL_unref(gL, LUA_REGISTRYINDEX, mud->self_ref);
mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self
}
lua_gc(gL, LUA_GCRESTART, 0);
if(mud->mqtt_state.auto_reconnect){
mud->pesp_conn->reverse = mud;
mud->pesp_conn->type = ESPCONN_TCP;
mud->pesp_conn->state = ESPCONN_NONE;
mud->connected = false;
mud->pesp_conn->proto.tcp->remote_port = mud->mqtt_state.port;
mud->pesp_conn->proto.tcp->local_port = espconn_port();
espconn_regist_connectcb(mud->pesp_conn, mqtt_socket_connected);
espconn_regist_reconcb(mud->pesp_conn, mqtt_socket_reconnected);
socket_connect(pesp_conn);
} else {
if(mud->pesp_conn){
mud->pesp_conn->reverse = NULL;
if(mud->pesp_conn->proto.tcp)
c_free(mud->pesp_conn->proto.tcp);
mud->pesp_conn->proto.tcp = NULL;
c_free(mud->pesp_conn);
mud->pesp_conn = NULL;
}
if(call_back){
lua_call(gL, 1, 0);
if(mud->L == NULL)
return;
lua_gc(mud->L, LUA_GCSTOP, 0);
if(mud->self_ref != LUA_NOREF){ // TODO: should we unref the client and delete it?
luaL_unref(mud->L, LUA_REGISTRYINDEX, mud->self_ref);
mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self
}
lua_gc(mud->L, LUA_GCRESTART, 0);
}
if((mud->L != NULL) && call_back){
lua_call(mud->L, 1, 0);
}
NODE_DBG("leave mqtt_socket_disconnected.\n");
@ -137,11 +149,11 @@ static void mqtt_socket_reconnected(void *arg, sint8_t err)
if(mud == NULL)
return;
pesp_conn->proto.tcp->remote_port = mud->mqtt_state.port;
pesp_conn->proto.tcp->local_port = espconn_port();
os_timer_disarm(&mud->mqttTimer);
if( (mud->event_timeout != 0) || mud->mqtt_state.auto_reconnect ){
if(mud->mqtt_state.auto_reconnect){
pesp_conn->proto.tcp->remote_port = mud->mqtt_state.port;
pesp_conn->proto.tcp->local_port = espconn_port();
socket_connect(pesp_conn);
} else {
mqtt_socket_disconnected(arg);
@ -152,6 +164,8 @@ static void mqtt_socket_reconnected(void *arg, sint8_t err)
static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length)
{
NODE_DBG("enter deliver_publish.\n");
if(mud == NULL)
return;
const char comma[] = ",";
mqtt_event_data_t event_data;
@ -165,15 +179,17 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length)
return;
if(mud->self_ref == LUA_NOREF)
return;
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_message_ref);
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
if(mud->L == NULL)
return;
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_message_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_pushlstring(gL, event_data.topic, event_data.topic_length);
lua_pushlstring(mud->L, event_data.topic, event_data.topic_length);
if(event_data.data_length > 0){
lua_pushlstring(gL, event_data.data, event_data.data_length);
lua_call(gL, 3, 0);
lua_pushlstring(mud->L, event_data.data, event_data.data_length);
lua_call(mud->L, 3, 0);
} else {
lua_call(gL, 2, 0);
lua_call(mud->L, 2, 0);
}
NODE_DBG("leave deliver_publish.\n");
}
@ -199,7 +215,9 @@ READPACKET:
return;
c_memcpy(mud->mqtt_state.in_buffer, pdata, len);
mud->mqtt_state.outbound_message = NULL;
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = NULL;
switch(mud->connState){
case MQTT_CONNECT_SENDING:
case MQTT_CONNECT_SENT:
@ -214,13 +232,15 @@ READPACKET:
mud->connState = MQTT_DATA;
NODE_DBG("MQTT: Connected\r\n");
if(mud->cb_connect_ref == LUA_NOREF)
return;
break;
if(mud->self_ref == LUA_NOREF)
return;
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_connect_ref);
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua
lua_call(gL, 1, 0);
return;
break;
if(mud->L == NULL)
break;
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_connect_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua
lua_call(mud->L, 1, 0);
break;
}
break;
@ -231,7 +251,7 @@ READPACKET:
msg_qos = mqtt_get_qos(mud->mqtt_state.in_buffer);
msg_id = mqtt_get_id(mud->mqtt_state.in_buffer, mud->mqtt_state.in_buffer_length);
msg_queue_t *pending_msg = mud->mqtt_state.pending_msg_q;
msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q));
NODE_DBG("MQTT_DATA: type: %d, qos: %d, msg_id: %d, pending_id: %d\r\n",
msg_type,
@ -248,9 +268,11 @@ READPACKET:
break;
if (mud->self_ref == LUA_NOREF)
break;
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_suback_ref);
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref);
lua_call(gL, 1, 0);
if(mud->L == NULL)
break;
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_suback_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref);
lua_call(mud->L, 1, 0);
}
break;
case MQTT_MSG_TYPE_UNSUBACK:
@ -261,14 +283,14 @@ READPACKET:
break;
case MQTT_MSG_TYPE_PUBLISH:
if(msg_qos == 1){
mud->mqtt_state.outbound_message = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message,
msg_id, MQTT_MSG_TYPE_PUBACK, (int)msg_qos );
temp_msg = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBACK, (int)mqtt_get_qos(temp_msg->data) );
}
else if(msg_qos == 2){
mud->mqtt_state.outbound_message = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message,
msg_id, MQTT_MSG_TYPE_PUBREC, (int)msg_qos );
temp_msg = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBREC, (int)mqtt_get_qos(temp_msg->data) );
}
if(msg_qos == 1 || msg_qos == 2){
NODE_DBG("MQTT: Queue response QoS: %d\r\n", msg_qos);
@ -283,41 +305,53 @@ READPACKET:
break;
if(mud->self_ref == LUA_NOREF)
break;
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_puback_ref);
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_call(gL, 1, 0);
if(mud->L == NULL)
break;
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_call(mud->L, 1, 0);
}
break;
case MQTT_MSG_TYPE_PUBREC:
mud->mqtt_state.outbound_message = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message,
msg_id, MQTT_MSG_TYPE_PUBREL, (int)msg_qos );
if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){
NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n");
// Note: actrually, should not destroy the msg until PUBCOMP is received.
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
temp_msg = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBREL, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("MQTT: Response PUBREL\r\n");
}
break;
case MQTT_MSG_TYPE_PUBREL:
mud->mqtt_state.outbound_message = mqtt_msg_pubcomp(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message,
msg_id, MQTT_MSG_TYPE_PUBCOMP, (int)msg_qos );
if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBREC && pending_msg->msg_id == msg_id){
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
temp_msg = mqtt_msg_pubcomp(&mud->mqtt_state.mqtt_connection, msg_id);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBCOMP, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("MQTT: Response PUBCOMP\r\n");
}
break;
case MQTT_MSG_TYPE_PUBCOMP:
if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){
if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBREL && pending_msg->msg_id == msg_id){
NODE_DBG("MQTT: Publish with QoS = 2 successful\r\n");
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
if(mud->cb_puback_ref == LUA_NOREF)
break;
if(mud->self_ref == LUA_NOREF)
break;
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_puback_ref);
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_call(gL, 1, 0);
if(mud->L == NULL)
break;
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_call(mud->L, 1, 0);
}
break;
case MQTT_MSG_TYPE_PINGREQ:
mud->mqtt_state.outbound_message = mqtt_msg_pingresp(&mud->mqtt_state.mqtt_connection);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message,
msg_id, MQTT_MSG_TYPE_PINGRESP, (int)msg_qos );
temp_msg = mqtt_msg_pingresp(&mud->mqtt_state.mqtt_connection);
node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PINGRESP, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("MQTT: Response PINGRESP\r\n");
break;
case MQTT_MSG_TYPE_PINGRESP:
@ -345,7 +379,7 @@ READPACKET:
break;
}
if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->event_timeout == 0){
if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){
mud->event_timeout = MQTT_SEND_TIMEOUT;
NODE_DBG("Sent: %d\n", node->msg.length);
if( mud->secure )
@ -353,8 +387,8 @@ READPACKET:
else
espconn_sent( pesp_conn, node->msg.data, node->msg.length );
mud->keep_alive_tick = 0;
mud->mqtt_state.outbound_message = NULL;
}
NODE_DBG("receive, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
NODE_DBG("leave mqtt_socket_received.\n");
return;
}
@ -370,25 +404,36 @@ static void mqtt_socket_sent(void *arg)
return;
if(!mud->connected)
return;
if(mud->connState == MQTT_CONNECT_SENDING){
mud->connState = MQTT_CONNECT_SENT;
}
// call mqtt_sent()
mud->event_timeout = 0;
if(mud->connState == MQTT_CONNECT_SENDING){
mud->connState = MQTT_CONNECT_SENT;
// MQTT_CONNECT not queued.
return;
}
NODE_DBG("sent1, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
// qos = 0, publish and forgot.
msg_queue_t *node = mud->mqtt_state.pending_msg_q;
msg_queue_t *node = msg_peek(&(mud->mqtt_state.pending_msg_q));
if(node && node->msg_type == MQTT_MSG_TYPE_PUBLISH && node->publish_qos == 0) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
if(mud->cb_puback_ref == LUA_NOREF)
return;
if(mud->self_ref == LUA_NOREF)
return;
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_puback_ref);
lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_call(gL, 1, 0);
if(mud->L == NULL)
return;
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
lua_rawgeti(mud->L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua
lua_call(mud->L, 1, 0);
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBACK && node->publish_qos == 1) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
} else if(node && node->msg_type == MQTT_MSG_TYPE_PUBCOMP) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
} else if(node && node->msg_type == MQTT_MSG_TYPE_PINGRESP) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
}
NODE_DBG("sent2, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
NODE_DBG("leave mqtt_socket_sent.\n");
}
@ -406,16 +451,18 @@ static void mqtt_socket_connected(void *arg)
espconn_regist_sentcb(pesp_conn, mqtt_socket_sent);
espconn_regist_disconcb(pesp_conn, mqtt_socket_disconnected);
uint8_t temp_buffer[MQTT_BUF_SIZE];
// call mqtt_connect() to start a mqtt connect stage.
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.out_buffer, mud->mqtt_state.out_buffer_length);
mud->mqtt_state.outbound_message = mqtt_msg_connect(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.connect_info);
NODE_DBG("Send MQTT connection infomation, data len: %d, d[0]=%d \r\n", mud->mqtt_state.outbound_message->length, mud->mqtt_state.outbound_message->data[0]);
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t* temp_msg = mqtt_msg_connect(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.connect_info);
NODE_DBG("Send MQTT connection infomation, data len: %d, d[0]=%d \r\n", temp_msg->length, temp_msg->data[0]);
mud->event_timeout = MQTT_SEND_TIMEOUT;
// not queue this message. should send right now. or should enqueue this before head.
if(mud->secure)
espconn_secure_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length);
espconn_secure_sent(pesp_conn, temp_msg->data, temp_msg->length);
else
espconn_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length);
mud->mqtt_state.outbound_message = NULL;
espconn_sent(pesp_conn, temp_msg->data, temp_msg->length);
mud->connState = MQTT_CONNECT_SENDING;
NODE_DBG("leave mqtt_socket_connected.\n");
return;
@ -423,27 +470,37 @@ static void mqtt_socket_connected(void *arg)
void mqtt_socket_timer(void *arg)
{
// NODE_DBG("enter mqtt_socket_timer.\n");
NODE_DBG("enter mqtt_socket_timer.\n");
lmqtt_userdata *mud = (lmqtt_userdata*) arg;
if(mud == NULL)
return;
if(mud->pesp_conn == NULL){
NODE_DBG("mud->pesp_conn is NULL.\n");
os_timer_disarm(&mud->mqttTimer);
return;
}
NODE_DBG("timer, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
if(mud->event_timeout > 0){
NODE_DBG("event_timeout: %d.\n", mud->event_timeout);
mud->event_timeout --;
if(mud->event_timeout > 0){
return;
} else {
NODE_DBG("event timeout. \n");
if(mud->connState == MQTT_DATA)
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
// should remove the head of the queue and re-send with DUP = 1
// Not implemented yet.
}
}
if(mud->pesp_conn == NULL){
NODE_DBG("mud->pesp_conn is NULL.\n");
return;
}
if(mud->connState == MQTT_INIT){ // socket connect time out.
NODE_DBG("Can not connect to broker.\n");
// Never goes here.
} else if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT send time out.
NODE_DBG("sSend MQTT_CONNECT failed.\n");
mud->connState = MQTT_INIT;
if(mud->secure)
espconn_secure_disconnect(mud->pesp_conn);
@ -453,7 +510,7 @@ void mqtt_socket_timer(void *arg)
} else if(mud->connState == MQTT_CONNECT_SENT){ // wait for CONACK time out.
NODE_DBG("MQTT_CONNECT failed.\n");
} else if(mud->connState == MQTT_DATA){
msg_queue_t *pending_msg = mud->mqtt_state.pending_msg_q;
msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q));
if(pending_msg){
mud->event_timeout = MQTT_SEND_TIMEOUT;
if(mud->secure)
@ -467,17 +524,22 @@ void mqtt_socket_timer(void *arg)
mud->keep_alive_tick ++;
if(mud->keep_alive_tick > mud->mqtt_state.connect_info->keepalive){
mud->event_timeout = MQTT_SEND_TIMEOUT;
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
NODE_DBG("\r\nMQTT: Send keepalive packet\r\n");
mud->mqtt_state.outbound_message = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection);
mqtt_message_t* temp_msg = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection);
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
0, MQTT_MSG_TYPE_PINGREQ, (int)mqtt_get_qos(temp_msg->data) );
// only one message in queue, send immediately.
if(mud->secure)
espconn_secure_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length);
espconn_secure_sent(mud->pesp_conn, temp_msg->data, temp_msg->length);
else
espconn_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length);
espconn_sent(mud->pesp_conn, temp_msg->data, temp_msg->length);
mud->keep_alive_tick = 0;
}
}
}
// NODE_DBG("leave mqtt_socket_timer.\n");
NODE_DBG("leave mqtt_socket_timer.\n");
}
// Lua: mqtt.Client(clientid, keepalive, user, pass)
@ -502,6 +564,7 @@ static int mqtt_socket_client( lua_State* L )
// create a object
mud = (lmqtt_userdata *)lua_newuserdata(L, sizeof(lmqtt_userdata));
// pre-initialize it, in case of errors
mud->L = NULL;
mud->self_ref = LUA_NOREF;
mud->cb_connect_ref = LUA_NOREF;
mud->cb_disconnect_ref = LUA_NOREF;
@ -524,7 +587,7 @@ static int mqtt_socket_client( lua_State* L )
luaL_getmetatable(L, "mqtt.socket");
lua_setmetatable(L, -2);
gL = L; // global L for mqtt module.
mud->L = L; // L for mqtt module.
if( lua_isstring(L,stack) ) // deal with the clientid string
{
@ -563,8 +626,7 @@ static int mqtt_socket_client( lua_State* L )
mud->connect_info.username = (uint8_t *)c_zalloc(unl + 1);
mud->connect_info.password = (uint8_t *)c_zalloc(pwl + 1);
mud->mqtt_state.in_buffer = (uint8_t *)c_zalloc(MQTT_BUF_SIZE);
mud->mqtt_state.out_buffer = (uint8_t *)c_zalloc(MQTT_BUF_SIZE);
if(!mud->connect_info.client_id || !mud->connect_info.username || !mud->connect_info.password || !mud->mqtt_state.in_buffer || !mud->mqtt_state.out_buffer){
if(!mud->connect_info.client_id || !mud->connect_info.username || !mud->connect_info.password || !mud->mqtt_state.in_buffer){
if(mud->connect_info.client_id) {
c_free(mud->connect_info.client_id);
mud->connect_info.client_id = NULL;
@ -580,10 +642,6 @@ static int mqtt_socket_client( lua_State* L )
if(mud->mqtt_state.in_buffer) {
c_free(mud->mqtt_state.in_buffer);
mud->mqtt_state.in_buffer = NULL;
}
if(mud->mqtt_state.out_buffer) {
c_free(mud->mqtt_state.out_buffer);
mud->mqtt_state.out_buffer = NULL;
}
return luaL_error(L, "not enough memory");
}
@ -603,7 +661,6 @@ static int mqtt_socket_client( lua_State* L )
mud->connect_info.keepalive = keepalive;
mud->mqtt_state.in_buffer_length = MQTT_BUF_SIZE;
mud->mqtt_state.out_buffer_length = MQTT_BUF_SIZE;
mud->mqtt_state.pending_msg_q = NULL;
mud->mqtt_state.auto_reconnect = 1;
mud->mqtt_state.port = 1883;
@ -669,10 +726,6 @@ static int mqtt_delete( lua_State* L )
c_free(mud->mqtt_state.in_buffer);
mud->mqtt_state.in_buffer = NULL;
}
if(mud->mqtt_state.out_buffer){
c_free(mud->mqtt_state.out_buffer);
mud->mqtt_state.out_buffer = NULL;
}
// -------
// free (unref) callback ref
@ -696,12 +749,12 @@ static int mqtt_delete( lua_State* L )
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
mud->cb_puback_ref = LUA_NOREF;
}
lua_gc(gL, LUA_GCSTOP, 0);
lua_gc(L, LUA_GCSTOP, 0);
if(LUA_NOREF!=mud->self_ref){
luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref);
mud->self_ref = LUA_NOREF;
}
lua_gc(gL, LUA_GCRESTART, 0);
lua_gc(L, LUA_GCRESTART, 0);
NODE_DBG("leave mqtt_delete.\n");
return 0;
}
@ -715,6 +768,8 @@ static void socket_connect(struct espconn *pesp_conn)
if(mud == NULL)
return;
mud->event_timeout = MQTT_CONNECT_TIMEOUT;
mud->connState = MQTT_INIT;
if(mud->secure)
espconn_secure_connect(pesp_conn);
else
@ -885,8 +940,6 @@ static int mqtt_socket_connect( lua_State* L )
os_timer_disarm(&mud->mqttTimer);
os_timer_setfn(&mud->mqttTimer, (os_timer_func_t *)mqtt_socket_timer, mud);
// timer started in socket_connect()
mud->event_timeout = MQTT_CONNECT_TIMEOUT;
mud->connState = MQTT_INIT;
if((ipaddr.addr == IPADDR_NONE) && (c_memcmp(domain,"255.255.255.255",16) != 0))
{
@ -922,6 +975,7 @@ static int mqtt_socket_close( lua_State* L )
return 0;
// call mqtt_disconnect()
mud->mqtt_state.auto_reconnect = 0; // stop auto reconnect.
if(mud->secure){
if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port)
@ -991,32 +1045,48 @@ static int mqtt_socket_subscribe( lua_State* L ) {
luaL_argcheck( L, mud, stack, "mqtt.socket expected" );
stack++;
if(mud==NULL){
NODE_DBG("userdata is nil.\n");
lua_pushboolean(L, 0);
return 1;
}
if(mud->pesp_conn == NULL){
NODE_DBG("mud->pesp_conn is NULL.\n");
lua_pushboolean(L, 0);
return 1;
}
if(!mud->connected){
luaL_error( L, "not connected" );
lua_pushboolean(L, 0);
return 1;
}
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = NULL;
if( lua_istable( L, stack ) ) {
NODE_DBG("subscribe table\n");
lua_pushnil( L ); /* first key */
uint8_t temp_buffer[MQTT_BUF_SIZE];
uint8_t temp_buf[MQTT_BUF_SIZE];
uint32_t temp_pos = 0;
while( lua_next( L, stack ) != 0 ) {
topic = luaL_checkstring( L, -2 );
qos = luaL_checkinteger( L, -1 );
mud->mqtt_state.outbound_message = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id );
NODE_DBG("topic: %s - qos: %d, length: %d\n", topic, qos, mud->mqtt_state.outbound_message->length);
temp_msg = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id );
NODE_DBG("topic: %s - qos: %d, length: %d\n", topic, qos, temp_msg->length);
if (temp_pos + mud->mqtt_state.outbound_message->length > MQTT_BUF_SIZE){
if (temp_pos + temp_msg->length > MQTT_BUF_SIZE){
lua_pop(L, 1);
break; // too long message for the outbuffer.
}
c_memcpy( temp_buffer + temp_pos, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length );
temp_pos += mud->mqtt_state.outbound_message->length;
c_memcpy( temp_buf + temp_pos, temp_msg->data, temp_msg->length );
temp_pos += temp_msg->length;
lua_pop( L, 1 );
}
@ -1027,9 +1097,9 @@ static int mqtt_socket_subscribe( lua_State* L ) {
return 1;
}
c_memcpy( mud->mqtt_state.out_buffer, temp_buffer, temp_pos );
mud->mqtt_state.outbound_message->data = mud->mqtt_state.out_buffer;
mud->mqtt_state.outbound_message->length = temp_pos;
c_memcpy( temp_buffer, temp_buf, temp_pos );
temp_msg->data = temp_buffer;
temp_msg->length = temp_pos;
stack++;
} else {
NODE_DBG("subscribe string\n");
@ -1041,7 +1111,7 @@ static int mqtt_socket_subscribe( lua_State* L ) {
return 1;
}
qos = luaL_checkinteger( L, stack );
mud->mqtt_state.outbound_message = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id );
temp_msg = mqtt_msg_subscribe( &mud->mqtt_state.mqtt_connection, topic, qos, &msg_id );
stack++;
}
@ -1052,12 +1122,12 @@ static int mqtt_socket_subscribe( lua_State* L ) {
mud->cb_suback_ref = luaL_ref( L, LUA_REGISTRYINDEX );
}
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message,
msg_id, MQTT_MSG_TYPE_SUBSCRIBE, (int)mqtt_get_qos(mud->mqtt_state.outbound_message->data) );
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_SUBSCRIBE, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length);
if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->event_timeout == 0){
if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){
mud->event_timeout = MQTT_SEND_TIMEOUT;
NODE_DBG("Sent: %d\n", node->msg.length);
if( mud->secure )
@ -1072,7 +1142,7 @@ static int mqtt_socket_subscribe( lua_State* L ) {
} else {
lua_pushboolean(L, 1); // enqueued succeed.
}
mud->mqtt_state.outbound_message = NULL;
NODE_DBG("subscribe, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
NODE_DBG("leave mqtt_socket_subscribe.\n");
return 1;
}
@ -1101,7 +1171,11 @@ static int mqtt_socket_publish( lua_State* L )
return 1;
}
pesp_conn = mud->pesp_conn;
if(!mud->connected){
luaL_error( L, "not connected" );
lua_pushboolean(L, 0);
return 1;
}
const char *topic = luaL_checklstring( L, stack, &l );
stack ++;
@ -1118,7 +1192,9 @@ static int mqtt_socket_publish( lua_State* L )
uint8_t retain = luaL_checkinteger( L, stack);
stack ++;
mud->mqtt_state.outbound_message = mqtt_msg_publish(&mud->mqtt_state.mqtt_connection,
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = mqtt_msg_publish(&mud->mqtt_state.mqtt_connection,
topic, payload, l,
qos, retain,
&msg_id);
@ -1130,10 +1206,10 @@ static int mqtt_socket_publish( lua_State* L )
mud->cb_puback_ref = luaL_ref(L, LUA_REGISTRYINDEX);
}
msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), mud->mqtt_state.outbound_message,
msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos );
if(node && (mud->mqtt_state.pending_msg_q->next == NULL) && mud->event_timeout == 0){
if(node && (1==msg_size(&(mud->mqtt_state.pending_msg_q))) && mud->event_timeout == 0){
mud->event_timeout = MQTT_SEND_TIMEOUT;
NODE_DBG("Sent: %d\n", node->msg.length);
if( mud->secure )
@ -1148,7 +1224,8 @@ static int mqtt_socket_publish( lua_State* L )
} else {
lua_pushboolean(L, 1); // enqueued succeed.
}
mud->mqtt_state.outbound_message = NULL;
NODE_DBG("publish, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
NODE_DBG("leave mqtt_socket_publish.\n");
return 1;
}

View File

@ -58,3 +58,25 @@ msg_queue_t * msg_dequeue(msg_queue_t **head){
node->next = NULL;
return node;
}
msg_queue_t * msg_peek(msg_queue_t **head){
if(!head || !*head){
return NULL;
}
return *head; // fetch head.
}
int msg_size(msg_queue_t **head){
if(!head || !*head){
return 0;
}
int i = 1;
msg_queue_t *tail = *head;
if(tail){
while(tail->next!=NULL){
tail = tail->next;
i++;
}
}
return i;
}

View File

@ -18,6 +18,8 @@ typedef struct msg_queue_t {
msg_queue_t * msg_enqueue(msg_queue_t **head, mqtt_message_t *msg, uint16_t msg_id, int msg_type, int publish_qos);
void msg_destroy(msg_queue_t *node);
msg_queue_t * msg_dequeue(msg_queue_t **head);
msg_queue_t * msg_peek(msg_queue_t **head);
int msg_size(msg_queue_t **head);
#ifdef __cplusplus
}

View File

@ -71,6 +71,9 @@
#define fs_rename myspiffs_rename
#define fs_size myspiffs_size
#define fs_mount myspiffs_mount
#define fs_unmount myspiffs_unmount
#define FS_NAME_MAX_LENGTH SPIFFS_OBJ_NAME_LEN
#endif

View File

@ -42,7 +42,7 @@ The small 4KB sectors allow for greater flexibility in applications th
********************/
void spiffs_mount() {
void myspiffs_mount() {
spiffs_config cfg;
cfg.phys_addr = ( u32_t )platform_flash_get_first_free_block_address( NULL );
cfg.phys_addr += 0x3000;
@ -69,6 +69,10 @@ void spiffs_mount() {
NODE_DBG("mount res: %i\n", res);
}
void myspiffs_unmount() {
SPIFFS_unmount(&fs);
}
// FS formatting function
// Returns 1 if OK, 0 for error
int myspiffs_format( void )
@ -85,7 +89,7 @@ int myspiffs_format( void )
while( sect_first <= sect_last )
if( platform_flash_erase_sector( sect_first ++ ) == PLATFORM_ERR )
return 0;
spiffs_mount();
myspiffs_mount();
return 1;
}

View File

@ -477,6 +477,8 @@ u32_t SPIFFS_buffer_bytes_for_cache(spiffs *fs, u32_t num_pages);
#if SPIFFS_CACHE
#endif
void myspiffs_mount();
void myspiffs_unmount();
int myspiffs_open(const char *name, int flags);
int myspiffs_close( int fd );
size_t myspiffs_write( int fd, const void* ptr, size_t len );

View File

@ -44,6 +44,7 @@ INCLUDES += -I ../libc
INCLUDES += -I ../platform
INCLUDES += -I ../lua
INCLUDES += -I ../wofs
INCLUDES += -I ../spiffs
PDIR := ../$(PDIR)
sinclude $(PDIR)Makefile

View File

@ -14,8 +14,7 @@
#include "c_stdlib.h"
#include "c_stdio.h"
#include "romfs.h"
#include "flash_fs.h"
#include "user_interface.h"
#include "ets_sys.h"
@ -44,7 +43,6 @@ void task_init(void){
system_os_task(task_lua, USER_TASK_PRIO_0, taskQueue, TASK_QUEUE_LEN);
}
extern void spiffs_mount();
// extern void test_spiffs();
// extern int test_romfs();
@ -69,7 +67,16 @@ void nodemcu_init(void)
// Flash init data at FLASHSIZE - 0x04000 Byte.
flash_init_data_default();
// Flash blank data at FLASHSIZE - 0x02000 Byte.
flash_init_data_blank();
flash_init_data_blank();
if( !fs_format() )
{
NODE_ERR( "\ni*** ERROR ***: unable to format. FS might be compromised.\n" );
NODE_ERR( "It is advised to re-flash the NodeMCU image.\n" );
}
else{
NODE_ERR( "format done.\n" );
}
fs_unmount(); // mounted by format.
}
#endif // defined(FLASH_SAFE_API)
@ -94,7 +101,7 @@ void nodemcu_init(void)
// test_romfs();
#elif defined ( BUILD_SPIFFS )
spiffs_mount();
fs_mount();
// test_spiffs();
#endif
// endpoint_setup();

View File

@ -418,7 +418,7 @@ m:publish("/topic1","hello",0,0)
m:publish("/topic3","hello",0,0) m:publish("/topic4","hello",0,0)
m:publish("/topic1","hello1",0,0) m:publish("/topic2","hello2",0,0)
m:publish("/topic1","hello",1,0)
m:subscribe("/topic3",2,function(m) print("sub done") end)
m:subscribe("/topic3",0,function(m) print("sub done") end)
m:publish("/topic3","hello3",2,0)
m=mqtt.Client()
@ -450,3 +450,25 @@ end)
m:connect("192.168.18.88",1883)
m:close()
m=mqtt.Client()
m:connect("192.168.18.88",1883)
m:on("message",function(m,t,pl) print(t..":") if pl~=nil then print(pl) end end )
m:subscribe("/topic1",0,function(m) print("sub done") end)
m:publish("/topic1","hello3",2,0) m:publish("/topic1","hello2",2,0)
m:publish("/topic1","hello3",0,0) m:publish("/topic1","hello2",2,0)
m:subscribe("/topic2",2,function(m) print("sub done") end)
m:publish("/topic2","hello3",0,0) m:publish("/topic2","hello2",2,0)
m=mqtt.Client()
m:on("connect",function(m)
print("connection "..node.heap())
m:subscribe("/topic1",0,function(m) print("sub done") end)
m:publish("/topic1","hello3",0,0) m:publish("/topic1","hello2",2,0)
end )
m:on("offline", function(conn)
print("disconnect to broker...")
print(node.heap())
end)
m:connect("192.168.18.88",1883,0,1)