diff --git a/README.md b/README.md index 87a4ce80..1bb17511 100644 --- a/README.md +++ b/README.md @@ -9,8 +9,8 @@ Flash tool for NodeMCU [nodemcu-flasher](https://github.com/nodemcu/nodemcu-flas wiki: [nodemcu wiki](https://github.com/nodemcu/nodemcu-firmware/wiki)
home: [nodemcu.com](http://www.nodemcu.com)
-bbs: [中文论坛Chinese bbs](http://bbs.nodemcu.com)
-Tencent QQ group QQ群: 309957875
+bbs: [Chinese bbs](http://bbs.nodemcu.com)
+Tencent QQ group: 309957875
# Summary - Easy to access wireless router @@ -26,6 +26,10 @@ Tencent QQ group QQ群: 309957875
- add coap module # Change log +2015-01-23
+merge mqtt branch to master.
+build pre_build bin. + 2015-01-18
merge mqtt module to [new branch mqtt](https://github.com/nodemcu/nodemcu-firmware/tree/mqtt) from [https://github.com/tuanpmt/esp_mqtt](https://github.com/tuanpmt/esp_mqtt).
merge spi module from iabdalkader:spi.
@@ -40,7 +44,7 @@ fix file.read() api, take 0xFF as a regular byte, not EOF.
pre_build/latest/nodemcu_512k_latest.bin is removed. use pre_build/latest/nodemcu_latest.bin instead. [more change log](https://github.com/nodemcu/nodemcu-firmware/wiki/nodemcu_api_en#change_log)
-[更多变更日志](https://github.com/nodemcu/nodemcu-firmware/wiki/nodemcu_api_cn#change_log) +[more change_log cn](https://github.com/nodemcu/nodemcu-firmware/wiki/nodemcu_api_cn#change_log) ##GPIO NEW TABLE ( Build 20141219 and later) @@ -186,6 +190,42 @@ baudrate:9600 .."Connection: keep-alive\r\nAccept: */*\r\n\r\n") ``` +####Connect to MQTT Broker + +```lua +-- init mqtt client with keepalive timer 120sec +m = mqtt.Client("clientid", 120, "user", "password") + +-- setup Last Will and Testament (optional) +-- Broker will publish a message with qos = 0, retain = 0, data = "offline" +-- to topic "/lwt" if client don't send keepalive packet +m:lwt("/lwt", "offline", 0, 0) + +m:on("connect", function(con) print ("connected") end) +m:on("offline", function(con) print ("offline") end) + +-- on publish message receive event +m:on("message", function(conn, topic, data) + print(topic .. ":" ) + if data ~= nil then + print(data) + end +end) + +-- for secure: m:connect("192.168.11.118", 1880, 1) +m:connect("192.168.11.118", 1880, 0, function(conn) print("connected") end) + +-- subscribe topic with qos = 0 +m:subscribe("/topic",0, function(conn) print("subscribe success") end) + +-- publish a message with data = hello, QoS = 0, retain = 0 +m:publish("/topic","hello",0,0, function(conn) print("sent") end) + +m:close(); +-- you can call m:connect again + +``` + ####Or a simple http server ```lua diff --git a/app/Makefile b/app/Makefile index 753595d6..584d4545 100644 --- a/app/Makefile +++ b/app/Makefile @@ -31,6 +31,7 @@ SUBDIRS= \ platform \ libc \ lua \ + mqtt \ smart \ wofs \ modules \ @@ -77,6 +78,7 @@ COMPONENTS_eagle.app.v6 = \ platform/libplatform.a \ libc/liblibc.a \ lua/liblua.a \ + mqtt/mqtt.a \ smart/smart.a \ wofs/wofs.a \ spiffs/spiffs.a \ diff --git a/app/include/user_config.h b/app/include/user_config.h index 6211f09a..1af91c0a 100644 --- a/app/include/user_config.h +++ b/app/include/user_config.h @@ -7,7 +7,7 @@ #define NODE_VERSION_INTERNAL 0U #define NODE_VERSION "NodeMCU 0.9.5" -#define BUILD_DATE "build 20150118" +#define BUILD_DATE "build 20150123" // #define FLASH_512K // #define FLASH_1M @@ -61,6 +61,7 @@ #define LUA_USE_MODULES_UART #define LUA_USE_MODULES_OW #define LUA_USE_MODULES_BIT +#define LUA_USE_MODULES_MQTT #endif /* LUA_USE_MODULES */ #define LUA_NUMBER_INTEGRAL diff --git a/app/modules/Makefile b/app/modules/Makefile index 1d9f19d9..fb170792 100644 --- a/app/modules/Makefile +++ b/app/modules/Makefile @@ -39,6 +39,7 @@ endif INCLUDES := $(INCLUDES) -I $(PDIR)include INCLUDES += -I ./ INCLUDES += -I ../libc +INCLUDES += -I ../mqtt INCLUDES += -I ../lua INCLUDES += -I ../platform INCLUDES += -I ../wofs diff --git a/app/modules/auxmods.h b/app/modules/auxmods.h index d8166643..6ab6ef46 100644 --- a/app/modules/auxmods.h +++ b/app/modules/auxmods.h @@ -61,6 +61,9 @@ LUALIB_API int ( luaopen_i2c )( lua_State *L ); #define AUXLIB_WIFI "wifi" LUALIB_API int ( luaopen_wifi )( lua_State *L ); +#define AUXLIB_MQTT "mqtt" +LUALIB_API int ( luaopen_mqtt )( lua_State *L ); + #define AUXLIB_NODE "node" LUALIB_API int ( luaopen_node )( lua_State *L ); diff --git a/app/modules/modules.h b/app/modules/modules.h index eda8e16d..f28775d5 100644 --- a/app/modules/modules.h +++ b/app/modules/modules.h @@ -37,6 +37,14 @@ #define ROM_MODULES_NET #endif +#if defined(LUA_USE_MODULES_MQTT) +#define MODULES_MQTT "mqtt" +#define ROM_MODULES_MQTT \ + _ROM(MODULES_MQTT, luaopen_mqtt, mqtt_map) +#else +#define ROM_MODULES_MQTT +#endif + #if defined(LUA_USE_MODULES_I2C) #define MODULES_I2C "i2c" #define ROM_MODULES_I2C \ @@ -113,6 +121,7 @@ ROM_MODULES_GPIO \ ROM_MODULES_PWM \ ROM_MODULES_WIFI \ + ROM_MODULES_MQTT \ ROM_MODULES_I2C \ ROM_MODULES_SPI \ ROM_MODULES_TMR \ diff --git a/app/modules/mqtt.c b/app/modules/mqtt.c new file mode 100644 index 00000000..332a9e87 --- /dev/null +++ b/app/modules/mqtt.c @@ -0,0 +1,1051 @@ +// Module for mqtt + +//#include "lua.h" +#include "lualib.h" +#include "lauxlib.h" +#include "platform.h" +#include "auxmods.h" +#include "lrotable.h" + +#include "c_string.h" +#include "c_stdlib.h" + +#include "c_types.h" +#include "mem.h" +#include "espconn.h" + +#include "mqtt_msg.h" + +static lua_State *gL = NULL; + +#define MQTT_BUF_SIZE 1024 +#define MQTT_DEFAULT_KEEPALIVE 60 +#define MQTT_MAX_CLIENT_LEN 64 +#define MQTT_MAX_USER_LEN 64 +#define MQTT_MAX_PASS_LEN 64 +#define MQTT_SEND_TIMEOUT 5 + +typedef enum { + MQTT_INIT, + MQTT_CONNECT_SEND, + MQTT_CONNECT_SENDING, + MQTT_DATA +} tConnState; + +typedef struct mqtt_event_data_t +{ + uint8_t type; + const char* topic; + const char* data; + uint16_t topic_length; + uint16_t data_length; + uint16_t data_offset; +} mqtt_event_data_t; + +typedef struct mqtt_state_t +{ + uint16_t port; + int auto_reconnect; + mqtt_connect_info_t* connect_info; + uint8_t* in_buffer; + uint8_t* out_buffer; + int in_buffer_length; + int out_buffer_length; + uint16_t message_length; + uint16_t message_length_read; + mqtt_message_t* outbound_message; + mqtt_connection_t mqtt_connection; + + uint16_t pending_msg_id; + int pending_msg_type; + int pending_publish_qos; +} mqtt_state_t; + +typedef struct lmqtt_userdata +{ + struct espconn *pesp_conn; + int self_ref; + int cb_connect_ref; + int cb_disconnect_ref; + int cb_message_ref; + int cb_suback_ref; + int cb_puback_ref; + mqtt_state_t mqtt_state; + mqtt_connect_info_t connect_info; + uint32_t keep_alive_tick; + uint32_t send_timeout; + uint8_t secure; + uint8_t connected; + ETSTimer mqttTimer; + tConnState connState; +}lmqtt_userdata; + +static void mqtt_socket_disconnected(void *arg) // tcp only +{ + NODE_DBG("mqtt_socket_disconnected is called.\n"); + struct espconn *pesp_conn = arg; + if(pesp_conn == NULL) + return; + lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse; + if(mud == NULL) + return; + if(mud->cb_disconnect_ref != LUA_NOREF && mud->self_ref != LUA_NOREF) + { + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua + lua_call(gL, 1, 0); + } + mud->connected = 0; + os_timer_disarm(&mud->mqttTimer); + + if(pesp_conn->proto.tcp) + c_free(pesp_conn->proto.tcp); + pesp_conn->proto.tcp = NULL; + if(mud->pesp_conn) + c_free(mud->pesp_conn); + mud->pesp_conn = NULL; // espconn is already disconnected + lua_gc(gL, LUA_GCSTOP, 0); + if(mud->self_ref != LUA_NOREF){ + luaL_unref(gL, LUA_REGISTRYINDEX, mud->self_ref); + mud->self_ref = LUA_NOREF; // unref this, and the mqtt.socket userdata will delete it self + } + lua_gc(gL, LUA_GCRESTART, 0); +} + +static void mqtt_socket_reconnected(void *arg, sint8_t err) +{ + NODE_DBG("mqtt_socket_reconnected is called.\n"); + mqtt_socket_disconnected(arg); +} + +static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length) +{ + const char comma[] = ","; + mqtt_event_data_t event_data; + + event_data.topic_length = length; + event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length); + + event_data.data_length = length; + event_data.data = mqtt_get_publish_data(message, &event_data.data_length); + + if(mud->cb_message_ref == LUA_NOREF) + return; + if(mud->self_ref == LUA_NOREF) + return; + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_message_ref); + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua + // expose_array(gL, pdata, len); + // *(pdata+len) = 0; + // NODE_DBG(pdata); + // NODE_DBG("\n"); + lua_pushlstring(gL, event_data.topic, event_data.topic_length); + if(event_data.data_length > 0){ + lua_pushlstring(gL, event_data.data, event_data.data_length); + lua_call(gL, 3, 0); + } else { + lua_call(gL, 2, 0); + } +} + +static void mqtt_socket_received(void *arg, char *pdata, unsigned short len) +{ + NODE_DBG("mqtt_socket_received is called.\n"); + + uint8_t msg_type; + uint8_t msg_qos; + uint16_t msg_id; + + struct espconn *pesp_conn = arg; + if(pesp_conn == NULL) + return; + lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse; + if(mud == NULL) + return; + +READPACKET: + if(len > MQTT_BUF_SIZE && len <= 0) + return; + + c_memcpy(mud->mqtt_state.in_buffer, pdata, len); + mud->mqtt_state.outbound_message = NULL; + switch(mud->connState){ + case MQTT_CONNECT_SENDING: + if(mqtt_get_type(mud->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK){ + NODE_DBG("MQTT: Invalid packet\r\n"); + mud->connState = MQTT_INIT; + if(mud->secure){ + espconn_secure_disconnect(pesp_conn); + } + else { + espconn_disconnect(pesp_conn); + } + } else { + mud->connState = MQTT_DATA; + NODE_DBG("MQTT: Connected\r\n"); + if(mud->cb_connect_ref == LUA_NOREF) + return; + if(mud->self_ref == LUA_NOREF) + return; + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_connect_ref); + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua + lua_call(gL, 1, 0); + return; + } + break; + + case MQTT_DATA: + mud->mqtt_state.message_length_read = len; + mud->mqtt_state.message_length = mqtt_get_total_length(mud->mqtt_state.in_buffer, mud->mqtt_state.message_length_read); + msg_type = mqtt_get_type(mud->mqtt_state.in_buffer); + msg_qos = mqtt_get_qos(mud->mqtt_state.in_buffer); + msg_id = mqtt_get_id(mud->mqtt_state.in_buffer, mud->mqtt_state.in_buffer_length); + + NODE_DBG("MQTT_DATA: type: %d, qos: %d, msg_id: %d, pending_id: %d\r\n", + msg_type, + msg_qos, + msg_id, + mud->mqtt_state.pending_msg_id); + switch(msg_type) + { + case MQTT_MSG_TYPE_SUBACK: + if(mud->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && mud->mqtt_state.pending_msg_id == msg_id) + NODE_DBG("MQTT: Subscribe successful\r\n"); + if (mud->cb_suback_ref == LUA_NOREF) + break; + if (mud->self_ref == LUA_NOREF) + break; + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_suback_ref); + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); + lua_call(gL, 1, 0); + break; + case MQTT_MSG_TYPE_UNSUBACK: + if(mud->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && mud->mqtt_state.pending_msg_id == msg_id) + NODE_DBG("MQTT: UnSubscribe successful\r\n"); + break; + case MQTT_MSG_TYPE_PUBLISH: + if(msg_qos == 1) + mud->mqtt_state.outbound_message = mqtt_msg_puback(&mud->mqtt_state.mqtt_connection, msg_id); + else if(msg_qos == 2) + mud->mqtt_state.outbound_message = mqtt_msg_pubrec(&mud->mqtt_state.mqtt_connection, msg_id); + + deliver_publish(mud, mud->mqtt_state.in_buffer, mud->mqtt_state.message_length_read); + break; + case MQTT_MSG_TYPE_PUBACK: + if(mud->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && mud->mqtt_state.pending_msg_id == msg_id){ + NODE_DBG("MQTT: Publish with QoS = 1 successful\r\n"); + if(mud->cb_puback_ref == LUA_NOREF) + break; + if(mud->self_ref == LUA_NOREF) + break; + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_puback_ref); + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua + lua_call(gL, 1, 0); + } + + break; + case MQTT_MSG_TYPE_PUBREC: + mud->mqtt_state.outbound_message = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id); + NODE_DBG("MQTT: Response PUBREL\r\n"); + break; + case MQTT_MSG_TYPE_PUBREL: + mud->mqtt_state.outbound_message = mqtt_msg_pubcomp(&mud->mqtt_state.mqtt_connection, msg_id); + NODE_DBG("MQTT: Response PUBCOMP\r\n"); + break; + case MQTT_MSG_TYPE_PUBCOMP: + if(mud->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && mud->mqtt_state.pending_msg_id == msg_id){ + NODE_DBG("MQTT: Publish with QoS = 2 successful\r\n"); + if(mud->cb_puback_ref == LUA_NOREF) + break; + if(mud->self_ref == LUA_NOREF) + break; + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_puback_ref); + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua + lua_call(gL, 1, 0); + } + break; + case MQTT_MSG_TYPE_PINGREQ: + mud->mqtt_state.outbound_message = mqtt_msg_pingresp(&mud->mqtt_state.mqtt_connection); + break; + case MQTT_MSG_TYPE_PINGRESP: + // Ignore + break; + } + // NOTE: this is done down here and not in the switch case above + // because the PSOCK_READBUF_LEN() won't work inside a switch + // statement due to the way protothreads resume. + if(msg_type == MQTT_MSG_TYPE_PUBLISH) + { + + len = mud->mqtt_state.message_length_read; + + if(mud->mqtt_state.message_length < mud->mqtt_state.message_length_read) + { + len -= mud->mqtt_state.message_length; + pdata += mud->mqtt_state.message_length; + + NODE_DBG("Get another published message\r\n"); + goto READPACKET; + } + } + break; + } + + if(mud->mqtt_state.outbound_message != NULL){ + if(mud->secure) + espconn_secure_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + else + espconn_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + mud->mqtt_state.outbound_message = NULL; + } + return; +} + +static void mqtt_socket_sent(void *arg) +{ + // NODE_DBG("mqtt_socket_sent is called.\n"); + struct espconn *pesp_conn = arg; + if(pesp_conn == NULL) + return; + lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse; + if(mud == NULL) + return; + if(!mud->connected) + return; + // call mqtt_sent() + mud->send_timeout = 0; + if(mud->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && mud->mqtt_state.pending_publish_qos == 0) { + if(mud->cb_puback_ref == LUA_NOREF) + return; + if(mud->self_ref == LUA_NOREF) + return; + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->cb_puback_ref); + lua_rawgeti(gL, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata to callback func in lua + lua_call(gL, 1, 0); + } +} + +static int mqtt_socket_client( lua_State* L ); +static void mqtt_socket_connected(void *arg) +{ + NODE_DBG("mqtt_socket_connected is called.\n"); + struct espconn *pesp_conn = arg; + if(pesp_conn == NULL) + return; + lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse; + if(mud == NULL) + return; + mud->connected = true; + espconn_regist_recvcb(pesp_conn, mqtt_socket_received); + espconn_regist_sentcb(pesp_conn, mqtt_socket_sent); + espconn_regist_disconcb(pesp_conn, mqtt_socket_disconnected); + + // call mqtt_connect() to start a mqtt connect stage. + mqtt_msg_init(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.out_buffer, mud->mqtt_state.out_buffer_length); + mud->mqtt_state.outbound_message = mqtt_msg_connect(&mud->mqtt_state.mqtt_connection, mud->mqtt_state.connect_info); + NODE_DBG("Send MQTT connection infomation, data len: %d, d[0]=%d \r\n", mud->mqtt_state.outbound_message->length, mud->mqtt_state.outbound_message->data[0]); + if(mud->secure){ + espconn_secure_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + } + else + { + espconn_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + } + mud->mqtt_state.outbound_message = NULL; + mud->connState = MQTT_CONNECT_SENDING; + return; +} + +void mqtt_socket_timer(void *arg) +{ + lmqtt_userdata *mud = (lmqtt_userdata*) arg; + + if(mud->connState == MQTT_DATA){ + mud->keep_alive_tick ++; + if(mud->keep_alive_tick > mud->mqtt_state.connect_info->keepalive){ + mud->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ; + mud->send_timeout = MQTT_SEND_TIMEOUT; + NODE_DBG("\r\nMQTT: Send keepalive packet\r\n"); + mud->mqtt_state.outbound_message = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection); + + if(mud->secure) + espconn_secure_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + else + espconn_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + mud->keep_alive_tick = 0; + } + } + if(mud->send_timeout > 0) + mud->send_timeout --; +} + +// Lua: mqtt.Client(clientid, keepalive, user, pass) +static int mqtt_socket_client( lua_State* L ) +{ + NODE_DBG("mqtt_socket_client is called.\n"); + + lmqtt_userdata *mud; + char tempid[20] = {0}; + c_sprintf(tempid, "%s%x", "NodeMCU_", system_get_chip_id() ); + NODE_DBG(tempid); + NODE_DBG("\n"); + size_t il = c_strlen(tempid); + const char *clientId = tempid, *username = NULL, *password = NULL; + int stack = 1; + unsigned secure = 0; + int top = lua_gettop(L); + + // create a object + mud = (lmqtt_userdata *)lua_newuserdata(L, sizeof(lmqtt_userdata)); + // pre-initialize it, in case of errors + mud->self_ref = LUA_NOREF; + mud->cb_connect_ref = LUA_NOREF; + mud->cb_disconnect_ref = LUA_NOREF; + + mud->cb_message_ref = LUA_NOREF; + mud->cb_suback_ref = LUA_NOREF; + mud->cb_puback_ref = LUA_NOREF; + mud->pesp_conn = NULL; + mud->secure = 0; + + mud->keep_alive_tick = 0; + mud->send_timeout = 0; + mud->connState = MQTT_INIT; + c_memset(&mud->mqttTimer, 0, sizeof(ETSTimer)); + c_memset(&mud->mqtt_state, 0, sizeof(mqtt_state_t)); + c_memset(&mud->connect_info, 0, sizeof(mqtt_connect_info_t)); + + // set its metatable + luaL_getmetatable(L, "mqtt.socket"); + lua_setmetatable(L, -2); + + if( lua_isstring(L,stack) ) // deal with the clientid string + { + clientId = luaL_checklstring( L, stack, &il ); + stack++; + } + + // TODO: check the zalloc result. + mud->connect_info.client_id = (uint8_t *)c_zalloc(il+1); + if(!mud->connect_info.client_id){ + return luaL_error(L, "not enough memory"); + } + c_memcpy(mud->connect_info.client_id, clientId, il); + mud->connect_info.client_id[il] = 0; + + mud->mqtt_state.in_buffer = (uint8_t *)c_zalloc(MQTT_BUF_SIZE); + if(!mud->mqtt_state.in_buffer){ + return luaL_error(L, "not enough memory"); + } + + mud->mqtt_state.out_buffer = (uint8_t *)c_zalloc(MQTT_BUF_SIZE); + if(!mud->mqtt_state.out_buffer){ + return luaL_error(L, "not enough memory"); + } + + mud->mqtt_state.in_buffer_length = MQTT_BUF_SIZE; + mud->mqtt_state.out_buffer_length = MQTT_BUF_SIZE; + + mud->connState = MQTT_INIT; + mud->connect_info.clean_session = 1; + mud->connect_info.will_qos = 0; + mud->connect_info.will_retain = 0; + mud->keep_alive_tick = 0; + mud->connect_info.keepalive = 0; + mud->mqtt_state.connect_info = &mud->connect_info; + + gL = L; // global L for mqtt module. + + if(lua_isnumber( L, stack )) + { + mud->connect_info.keepalive = luaL_checkinteger( L, stack); + stack++; + } + + if(mud->connect_info.keepalive == 0){ + mud->connect_info.keepalive = MQTT_DEFAULT_KEEPALIVE; + return 1; + } + + if(lua_isstring( L, stack )){ + username = luaL_checklstring( L, stack, &il ); + stack++; + } + if(username == NULL) + return 1; + mud->connect_info.username = (uint8_t *)c_zalloc(il + 1); + if(!mud->connect_info.username){ + return luaL_error(L, "not enough memory"); + } + + c_memcpy(mud->connect_info.username, username, il); + mud->connect_info.username[il] = 0; + + if(lua_isstring( L, stack )){ + password = luaL_checklstring( L, stack, &il ); + stack++; + } + if(password == NULL) + return 1; + mud->connect_info.password = (uint8_t *)c_zalloc(il + 1); + if(!mud->connect_info.password){ + return luaL_error(L, "not enough memory"); + } + + c_memcpy(mud->connect_info.password, password, il); + mud->connect_info.password[il] = 0; + + NODE_DBG("MQTT: Init info: %s, %s, %s\r\n", mud->connect_info.client_id, mud->connect_info.username, mud->connect_info.password); + + return 1; +} + +// Lua: mqtt.delete( socket ) +// call close() first +// socket: unref everything +static int mqtt_delete( lua_State* L ) +{ + NODE_DBG("mqtt_delete is called.\n"); + + lmqtt_userdata *mud = (lmqtt_userdata *)luaL_checkudata(L, 1, "mqtt.socket"); + luaL_argcheck(L, mud, 1, "mqtt.socket expected"); + if(mud==NULL){ + NODE_DBG("userdata is nil.\n"); + return 0; + } + + os_timer_disarm(&mud->mqttTimer); + mud->connected = 0; + if(mud->pesp_conn){ // for client connected to tcp server, this should set NULL in disconnect cb + mud->pesp_conn->reverse = NULL; + if(mud->pesp_conn->proto.tcp) + c_free(mud->pesp_conn->proto.tcp); + mud->pesp_conn->proto.tcp = NULL; + c_free(mud->pesp_conn); + mud->pesp_conn = NULL; // for socket, it will free this when disconnected + } + + if(mud->connect_info.will_topic){ + c_free(mud->connect_info.client_id); + } + + if(mud->connect_info.will_message){ + c_free(mud->connect_info.will_message); + } + + if(mud->connect_info.client_id) + c_free(mud->connect_info.client_id); + if(mud->connect_info.username) + c_free(mud->connect_info.username); + if(mud->connect_info.password) + c_free(mud->connect_info.password); + if(mud->mqtt_state.in_buffer) + c_free(mud->mqtt_state.in_buffer); + if(mud->mqtt_state.out_buffer) + c_free(mud->mqtt_state.out_buffer); + + // free (unref) callback ref + if(LUA_NOREF!=mud->cb_connect_ref){ + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref); + mud->cb_connect_ref = LUA_NOREF; + } + if(LUA_NOREF!=mud->cb_disconnect_ref){ + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); + mud->cb_disconnect_ref = LUA_NOREF; + } + if(LUA_NOREF!=mud->cb_message_ref){ + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_message_ref); + mud->cb_message_ref = LUA_NOREF; + } + if(LUA_NOREF!=mud->cb_suback_ref){ + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_suback_ref); + mud->cb_suback_ref = LUA_NOREF; + } + if(LUA_NOREF!=mud->cb_puback_ref){ + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref); + mud->cb_puback_ref = LUA_NOREF; + } + lua_gc(gL, LUA_GCSTOP, 0); + if(LUA_NOREF!=mud->self_ref){ + luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref); + mud->self_ref = LUA_NOREF; + } + lua_gc(gL, LUA_GCRESTART, 0); + return 0; +} + +static void socket_connect(struct espconn *pesp_conn) +{ + if(pesp_conn == NULL) + return; + lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse; + if(mud == NULL) + return; + + if(mud->secure){ + espconn_secure_connect(pesp_conn); + } + else + { + espconn_connect(pesp_conn); + } + + NODE_DBG("socket_connect is called.\n"); +} + +static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg); +static dns_reconn_count = 0; +static ip_addr_t host_ip; // for dns +static void socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) +{ + NODE_DBG("socket_dns_found is called.\n"); + struct espconn *pesp_conn = arg; + if(pesp_conn == NULL){ + NODE_DBG("pesp_conn null.\n"); + return; + } + + if(ipaddr == NULL) + { + dns_reconn_count++; + if( dns_reconn_count >= 5 ){ + NODE_ERR( "DNS Fail!\n" ); + // Note: should delete the pesp_conn or unref self_ref here. + mqtt_socket_disconnected(arg); // although not connected, but fire disconnect callback to release every thing. + return; + } + NODE_ERR( "DNS retry %d!\n", dns_reconn_count ); + host_ip.addr = 0; + espconn_gethostbyname(pesp_conn, name, &host_ip, socket_dns_found); + return; + } + + // ipaddr->addr is a uint32_t ip + if(ipaddr->addr != 0) + { + dns_reconn_count = 0; + c_memcpy(pesp_conn->proto.tcp->remote_ip, &(ipaddr->addr), 4); + NODE_DBG("TCP ip is set: "); + NODE_DBG(IPSTR, IP2STR(&(ipaddr->addr))); + NODE_DBG("\n"); + socket_connect(pesp_conn); + } +} + +// Lua: mqtt:connect( host, port, secure, function(client) ) +static int mqtt_socket_lwt( lua_State* L ) +{ + uint8_t stack = 1; + size_t topicSize, il; + NODE_DBG("mqtt_socket_lwt.\n"); + lmqtt_userdata *mud = NULL; + const char *lwtTopic, *lwtMsg; + uint8_t lwtQoS, lwtRetain; + + mud = (lmqtt_userdata *)luaL_checkudata( L, stack, "mqtt.socket" ); + luaL_argcheck( L, mud, stack, "mqtt.socket expected" ); + + if(mud == NULL) + return 0; + + stack++; + lwtTopic = luaL_checklstring( L, stack, &topicSize ); + if (lwtTopic == NULL) + { + return luaL_error( L, "need lwt topic"); + } + + stack++; + lwtMsg = luaL_checklstring( L, stack, &il ); + if (lwtMsg == NULL) + { + return luaL_error( L, "need lwt topic"); + } + + mud->connect_info.will_topic = (uint8_t*) c_zalloc( topicSize + 1 ); + if(!mud->connect_info.will_topic){ + return luaL_error( L, "not enough memory"); + } + c_memcpy(mud->connect_info.will_topic, lwtTopic, topicSize); + mud->connect_info.will_topic[topicSize] = 0; + + mud->connect_info.will_message = (uint8_t*) c_zalloc( il + 1 ); + if(!mud->connect_info.will_message){ + return luaL_error( L, "not enough memory"); + } + c_memcpy(mud->connect_info.will_message, lwtMsg, il); + mud->connect_info.will_message[il] = 0; + + + stack++; + mud->connect_info.will_qos = luaL_checkinteger( L, stack ); + + stack++; + mud->connect_info.will_retain = luaL_checkinteger( L, stack ); + + NODE_DBG("mqtt_socket_lwt: topic: %s, message: %s, qos: %d, retain: %d\n", + mud->connect_info.will_topic, + mud->connect_info.will_message, + mud->connect_info.will_qos, + mud->connect_info.will_retain); + return 0; +} + +// Lua: mqtt:connect( host, port, secure, function(client) ) +static int mqtt_socket_connect( lua_State* L ) +{ + NODE_DBG("mqtt_socket_connect is called.\n"); + lmqtt_userdata *mud = NULL; + unsigned port = 1883; + size_t il; + ip_addr_t ipaddr; + const char *domain; + int stack = 1; + unsigned secure = 0; + int top = lua_gettop(L); + + mud = (lmqtt_userdata *)luaL_checkudata(L, stack, "mqtt.socket"); + luaL_argcheck(L, mud, stack, "mqtt.socket expected"); + stack++; + if(mud == NULL) + return 0; + + if(mud->pesp_conn) + c_free(mud->pesp_conn); + struct espconn *pesp_conn = NULL; + pesp_conn = mud->pesp_conn = (struct espconn *)c_zalloc(sizeof(struct espconn)); + if(!pesp_conn) + return luaL_error(L, "not enough memory"); + + pesp_conn->proto.udp = NULL; + pesp_conn->proto.tcp = (esp_tcp *)c_zalloc(sizeof(esp_tcp)); + if(!pesp_conn->proto.tcp){ + c_free(pesp_conn); + pesp_conn = mud->pesp_conn = NULL; + return luaL_error(L, "not enough memory"); + } + // reverse is for the callback function + pesp_conn->reverse = mud; + pesp_conn->type = ESPCONN_TCP; + pesp_conn->state = ESPCONN_NONE; + mud->connected = 0; + + if( (stack<=top) && lua_isstring(L,stack) ) // deal with the domain string + { + domain = luaL_checklstring( L, stack, &il ); + + stack++; + if (domain == NULL) + { + domain = "127.0.0.1"; + } + ipaddr.addr = ipaddr_addr(domain); + c_memcpy(pesp_conn->proto.tcp->remote_ip, &ipaddr.addr, 4); + NODE_DBG("TCP ip is set: "); + NODE_DBG(IPSTR, IP2STR(&ipaddr.addr)); + NODE_DBG("\n"); + } + + if ( (stack<=top) && lua_isnumber(L, stack) ) + { + port = lua_tointeger(L, stack); + stack++; + NODE_DBG("TCP port is set: %d.\n", port); + } + pesp_conn->proto.tcp->remote_port = port; + pesp_conn->proto.tcp->local_port = espconn_port(); + + if ( (stack<=top) && lua_isnumber(L, stack) ) + { + secure = lua_tointeger(L, stack); + stack++; + if ( secure != 0 && secure != 1 ){ + secure = 0; // default to 0 + } + } else { + secure = 0; // default to 0 + } + mud->secure = secure; // save + + // call back function when a connection is obtained, tcp only + if ((stack<=top) && (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION)){ + lua_pushvalue(L, stack); // copy argument (func) to the top of stack + if(mud->cb_connect_ref != LUA_NOREF) + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref); + mud->cb_connect_ref = luaL_ref(L, LUA_REGISTRYINDEX); + stack++; + } + + lua_pushvalue(L, 1); // copy userdata to the top of stack + if(mud->self_ref != LUA_NOREF) + luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref); + mud->self_ref = luaL_ref(L, LUA_REGISTRYINDEX); + + espconn_regist_connectcb(pesp_conn, mqtt_socket_connected); + espconn_regist_reconcb(pesp_conn, mqtt_socket_reconnected); + + if((ipaddr.addr == IPADDR_NONE) && (c_memcmp(domain,"255.255.255.255",16) != 0)) + { + host_ip.addr = 0; + dns_reconn_count = 0; + if(ESPCONN_OK == espconn_gethostbyname(pesp_conn, domain, &host_ip, socket_dns_found)){ + socket_dns_found(domain, &host_ip, pesp_conn); // ip is returned in host_ip. + } + } + else + { + socket_connect(pesp_conn); + } + + os_timer_disarm(&mud->mqttTimer); + os_timer_setfn(&mud->mqttTimer, (os_timer_func_t *)mqtt_socket_timer, mud); + os_timer_arm(&mud->mqttTimer, 1000, 1); + + return 0; +} + +// Lua: mqtt:close() +// client disconnect and unref itself +static int mqtt_socket_close( lua_State* L ) +{ + NODE_DBG("mqtt_socket_close is called.\n"); + int i = 0; + lmqtt_userdata *mud = NULL; + + mud = (lmqtt_userdata *)luaL_checkudata(L, 1, "mqtt.socket"); + luaL_argcheck(L, mud, 1, "mqtt.socket expected"); + if(mud == NULL) + return 0; + + if(mud->pesp_conn == NULL) + return 0; + + // call mqtt_disconnect() + + if(mud->secure){ + if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port) + espconn_secure_disconnect(mud->pesp_conn); + } + else + { + if(mud->pesp_conn->proto.tcp->remote_port || mud->pesp_conn->proto.tcp->local_port) + espconn_disconnect(mud->pesp_conn); + } + return 0; +} + +// Lua: mqtt:on( "method", function() ) +static int mqtt_socket_on( lua_State* L ) +{ + NODE_DBG("mqtt_on is called.\n"); + lmqtt_userdata *mud; + size_t sl; + + mud = (lmqtt_userdata *)luaL_checkudata(L, 1, "mqtt.socket"); + luaL_argcheck(L, mud, 1, "mqtt.socket expected"); + if(mud==NULL){ + NODE_DBG("userdata is nil.\n"); + return 0; + } + + const char *method = luaL_checklstring( L, 2, &sl ); + if (method == NULL) + return luaL_error( L, "wrong arg type" ); + + luaL_checkanyfunction(L, 3); + lua_pushvalue(L, 3); // copy argument (func) to the top of stack + + if( sl == 7 && c_strcmp(method, "connect") == 0){ + if(mud->cb_connect_ref != LUA_NOREF) + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref); + mud->cb_connect_ref = luaL_ref(L, LUA_REGISTRYINDEX); + }else if( sl == 7 && c_strcmp(method, "offline") == 0){ + if(mud->cb_disconnect_ref != LUA_NOREF) + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); + mud->cb_disconnect_ref = luaL_ref(L, LUA_REGISTRYINDEX); + }else if( sl == 7 && c_strcmp(method, "message") == 0){ + if(mud->cb_message_ref != LUA_NOREF) + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_message_ref); + mud->cb_message_ref = luaL_ref(L, LUA_REGISTRYINDEX); + }else{ + lua_pop(L, 1); + return luaL_error( L, "method not supported" ); + } + + return 0; +} + +// Lua: mqtt:subscribe(topic, qos, function()) +static int mqtt_socket_subscribe( lua_State* L ) +{ + NODE_DBG("mqtt_socket_subscribe is called.\n"); + uint8_t stack = 1, qos = 0, retain = 0; + const char *topic; + size_t il; + lmqtt_userdata *mud; + + mud = (lmqtt_userdata *)luaL_checkudata(L, stack, "mqtt.socket"); + luaL_argcheck(L, mud, stack, "mqtt.socket expected"); + stack++; + + if(mud->send_timeout != 0) + return luaL_error( L, "sending in process" ); + + if(!mud->connected) + return luaL_error(L, "not connected"); + + topic = luaL_checklstring( L, stack, &il ); + stack++; + if(topic == NULL) + return luaL_error( L, "need topic name" ); + + qos = luaL_checkinteger( L, stack); + stack++; + + mud->mqtt_state.outbound_message = mqtt_msg_subscribe(&mud->mqtt_state.mqtt_connection, + topic, qos, + &mud->mqtt_state.pending_msg_id); + mud->send_timeout = MQTT_SEND_TIMEOUT; + mud->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_SUBSCRIBE; + mud->mqtt_state.pending_publish_qos = mqtt_get_qos(mud->mqtt_state.outbound_message->data); + + if (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION){ + lua_pushvalue(L, stack); // copy argument (func) to the top of stack + if(mud->cb_suback_ref != LUA_NOREF) + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_suback_ref); + mud->cb_suback_ref = luaL_ref(L, LUA_REGISTRYINDEX); + } + + if(mud->secure) + espconn_secure_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + else + espconn_sent(mud->pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + + return 0; +} + +// Lua: mqtt:publish( topic, payload, qos, retain, function() ) +static int mqtt_socket_publish( lua_State* L ) +{ + // NODE_DBG("mqtt_publish is called.\n"); + struct espconn *pesp_conn = NULL; + lmqtt_userdata *mud; + size_t l; + uint8_t stack = 1; + mud = (lmqtt_userdata *)luaL_checkudata(L, stack, "mqtt.socket"); + luaL_argcheck(L, mud, stack, "mqtt.socket expected"); + stack++; + if(mud==NULL){ + NODE_DBG("userdata is nil.\n"); + return 0; + } + + if(mud->pesp_conn == NULL){ + NODE_DBG("mud->pesp_conn is NULL.\n"); + return 0; + } + if(mud->send_timeout != 0) + return luaL_error( L, "sending in process" ); + pesp_conn = mud->pesp_conn; + +#if 0 + char temp[20] = {0}; + c_sprintf(temp, IPSTR, IP2STR( &(pesp_conn->proto.tcp->remote_ip) ) ); + NODE_DBG("remote "); + NODE_DBG(temp); + NODE_DBG(":"); + NODE_DBG("%d",pesp_conn->proto.tcp->remote_port); + NODE_DBG(" sending data.\n"); +#endif + const char *topic = luaL_checklstring( L, stack, &l ); + stack ++; + if (topic == NULL) + return luaL_error( L, "need topic" ); + + const char *payload = luaL_checklstring( L, stack, &l ); + stack ++; + uint8_t qos = luaL_checkinteger( L, stack); + stack ++; + uint8_t retain = luaL_checkinteger( L, stack); + stack ++; + + + mud->mqtt_state.outbound_message = mqtt_msg_publish(&mud->mqtt_state.mqtt_connection, + topic, payload, l, + qos, retain, + &mud->mqtt_state.pending_msg_id); + mud->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PUBLISH; + mud->mqtt_state.pending_publish_qos = qos; + mud->send_timeout = MQTT_SEND_TIMEOUT; + if (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION){ + lua_pushvalue(L, stack); // copy argument (func) to the top of stack + if(mud->cb_puback_ref != LUA_NOREF) + luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref); + mud->cb_puback_ref = luaL_ref(L, LUA_REGISTRYINDEX); + } + + if(mud->secure) + espconn_secure_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + else + espconn_sent(pesp_conn, mud->mqtt_state.outbound_message->data, mud->mqtt_state.outbound_message->length); + mud->mqtt_state.outbound_message = NULL; + return 0; +} + +// Module function map +#define MIN_OPT_LEVEL 2 +#include "lrodefs.h" + +static const LUA_REG_TYPE mqtt_socket_map[] = +{ + { LSTRKEY( "lwt" ), LFUNCVAL ( mqtt_socket_lwt ) }, + { LSTRKEY( "connect" ), LFUNCVAL ( mqtt_socket_connect ) }, + { LSTRKEY( "close" ), LFUNCVAL ( mqtt_socket_close ) }, + { LSTRKEY( "publish" ), LFUNCVAL ( mqtt_socket_publish ) }, + { LSTRKEY( "subscribe" ), LFUNCVAL ( mqtt_socket_subscribe ) }, + { LSTRKEY( "on" ), LFUNCVAL ( mqtt_socket_on ) }, + { LSTRKEY( "__gc" ), LFUNCVAL ( mqtt_delete ) }, +#if LUA_OPTIMIZE_MEMORY > 0 + { LSTRKEY( "__index" ), LROVAL ( mqtt_socket_map ) }, +#endif + { LNILKEY, LNILVAL } +}; + +const LUA_REG_TYPE mqtt_map[] = +{ + { LSTRKEY( "Client" ), LFUNCVAL ( mqtt_socket_client ) }, +#if LUA_OPTIMIZE_MEMORY > 0 + + { LSTRKEY( "__metatable" ), LROVAL( mqtt_map ) }, +#endif + { LNILKEY, LNILVAL } +}; + +LUALIB_API int ICACHE_FLASH_ATTR luaopen_mqtt( lua_State *L ) +{ +#if LUA_OPTIMIZE_MEMORY > 0 + luaL_rometatable(L, "mqtt.socket", (void *)mqtt_socket_map); // create metatable for mqtt.socket + return 0; +#else // #if LUA_OPTIMIZE_MEMORY > 0 + int n; + luaL_register( L, AUXLIB_MQTT, mqtt_map ); + + // Set it as its own metatable + lua_pushvalue( L, -1 ); + lua_setmetatable( L, -2 ); + + // Module constants + // MOD_REG_NUMBER( L, "TCP", TCP ); + + // create metatable + luaL_newmetatable(L, "mqtt.socket"); + // metatable.__index = metatable + lua_pushliteral(L, "__index"); + lua_pushvalue(L,-2); + lua_rawset(L,-3); + // Setup the methods inside metatable + luaL_register( L, NULL, mqtt_socket_map ); + + return 1; +#endif // #if LUA_OPTIMIZE_MEMORY > 0 +} diff --git a/app/mqtt/Makefile b/app/mqtt/Makefile new file mode 100644 index 00000000..3fd35717 --- /dev/null +++ b/app/mqtt/Makefile @@ -0,0 +1,44 @@ + +############################################################# +# Required variables for each makefile +# Discard this section from all parent makefiles +# Expected variables (with automatic defaults): +# CSRCS (all "C" files in the dir) +# SUBDIRS (all subdirs with a Makefile) +# GEN_LIBS - list of libs to be generated () +# GEN_IMAGES - list of images to be generated () +# COMPONENTS_xxx - a list of libs/objs in the form +# subdir/lib to be extracted and rolled up into +# a generated lib/image xxx.a () +# +ifndef PDIR +GEN_LIBS = mqtt.a +endif + +############################################################# +# Configuration i.e. compile options etc. +# Target specific stuff (defines etc.) goes in here! +# Generally values applying to a tree are captured in the +# makefile at its root level - these are then overridden +# for a subtree within the makefile rooted therein +# +#DEFINES += + +############################################################# +# Recursion Magic - Don't touch this!! +# +# Each subtree potentially has an include directory +# corresponding to the common APIs applicable to modules +# rooted at that subtree. Accordingly, the INCLUDE PATH +# of a module can only contain the include directories up +# its parent path, and not its siblings +# +# Required for each makefile to inherit from the parent +# + +INCLUDES := $(INCLUDES) -I $(PDIR)include +INCLUDES += -I ./ +INCLUDES += -I ../libc +PDIR := ../$(PDIR) +sinclude $(PDIR)Makefile + diff --git a/app/mqtt/mqtt_msg.c b/app/mqtt/mqtt_msg.c new file mode 100644 index 00000000..5e49bfb1 --- /dev/null +++ b/app/mqtt/mqtt_msg.c @@ -0,0 +1,457 @@ +/* +* Copyright (c) 2014, Stephen Robinson +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* 1. Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* 2. Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* 3. Neither the name of the copyright holder nor the names of its +* contributors may be used to endorse or promote products derived +* from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +* +*/ + +#include +#include "mqtt_msg.h" + +#define MQTT_MAX_FIXED_HEADER_SIZE 3 + +enum mqtt_connect_flag +{ + MQTT_CONNECT_FLAG_USERNAME = 1 << 7, + MQTT_CONNECT_FLAG_PASSWORD = 1 << 6, + MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5, + MQTT_CONNECT_FLAG_WILL = 1 << 2, + MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 +}; + +struct __attribute((__packed__)) mqtt_connect_variable_header +{ + uint8_t lengthMsb; + uint8_t lengthLsb; + uint8_t magic[6]; + uint8_t version; + uint8_t flags; + uint8_t keepaliveMsb; + uint8_t keepaliveLsb; +}; + +static int append_string(mqtt_connection_t* connection, const char* string, int len) +{ + if(connection->message.length + len + 2 > connection->buffer_length) + return -1; + + connection->buffer[connection->message.length++] = len >> 8; + connection->buffer[connection->message.length++] = len & 0xff; + memcpy(connection->buffer + connection->message.length, string, len); + connection->message.length += len; + + return len + 2; +} + +static uint16_t append_message_id(mqtt_connection_t* connection, uint16_t message_id) +{ + // If message_id is zero then we should assign one, otherwise + // we'll use the one supplied by the caller + while(message_id == 0) + message_id = ++connection->message_id; + + if(connection->message.length + 2 > connection->buffer_length) + return 0; + + connection->buffer[connection->message.length++] = message_id >> 8; + connection->buffer[connection->message.length++] = message_id & 0xff; + + return message_id; +} + +static int init_message(mqtt_connection_t* connection) +{ + connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE; + return MQTT_MAX_FIXED_HEADER_SIZE; +} + +static mqtt_message_t* fail_message(mqtt_connection_t* connection) +{ + connection->message.data = connection->buffer; + connection->message.length = 0; + return &connection->message; +} + +static mqtt_message_t* fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain) +{ + int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; + + if(remaining_length > 127) + { + connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); + connection->buffer[1] = 0x80 | (remaining_length % 128); + connection->buffer[2] = remaining_length / 128; + connection->message.length = remaining_length + 3; + connection->message.data = connection->buffer; + } + else + { + connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); + connection->buffer[2] = remaining_length; + connection->message.length = remaining_length + 2; + connection->message.data = connection->buffer + 1; + } + + return &connection->message; +} + +void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length) +{ + memset(connection, 0, sizeof(connection)); + connection->buffer = buffer; + connection->buffer_length = buffer_length; +} + +int mqtt_get_total_length(uint8_t* buffer, uint16_t length) +{ + int i; + int totlen = 0; + + for(i = 1; i < length; ++i) + { + totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); + if((buffer[i] & 0x80) == 0) + { + ++i; + break; + } + } + totlen += i; + + return totlen; +} + +const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length) +{ + int i; + int totlen = 0; + int topiclen; + + for(i = 1; i < *length; ++i) + { + totlen += (buffer[i] & 0x7f) << (7 * (i -1)); + if((buffer[i] & 0x80) == 0) + { + ++i; + break; + } + } + totlen += i; + + if(i + 2 >= *length) + return NULL; + topiclen = buffer[i++] << 8; + topiclen |= buffer[i++]; + + if(i + topiclen > *length) + return NULL; + + *length = topiclen; + return (const char*)(buffer + i); +} + +const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length) +{ + int i; + int totlen = 0; + int topiclen; + + for(i = 1; i < *length; ++i) + { + totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); + if((buffer[i] & 0x80) == 0) + { + ++i; + break; + } + } + totlen += i; + + if(i + 2 >= *length) + return NULL; + topiclen = buffer[i++] << 8; + topiclen |= buffer[i++]; + + if(i + topiclen >= *length){ + *length = 0; + return NULL; + } + i += topiclen; + + if(mqtt_get_qos(buffer) > 0) + { + if(i + 2 >= *length) + return NULL; + i += 2; + } + + if(totlen < i) + return NULL; + + if(totlen <= *length) + *length = totlen - i; + else + *length = *length - i; + return (const char*)(buffer + i); +} + +uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length) +{ + if(length < 1) + return 0; + + switch(mqtt_get_type(buffer)) + { + case MQTT_MSG_TYPE_PUBLISH: + { + int i; + int topiclen; + + for(i = 1; i < length; ++i) + { + if((buffer[i] & 0x80) == 0) + { + ++i; + break; + } + } + + if(i + 2 >= length) + return 0; + topiclen = buffer[i++] << 8; + topiclen |= buffer[i++]; + + if(i + topiclen >= length) + return 0; + i += topiclen; + + if(mqtt_get_qos(buffer) > 0) + { + if(i + 2 >= length) + return 0; + //i += 2; + } else { + return 0; + } + + return (buffer[i] << 8) | buffer[i + 1]; + } + case MQTT_MSG_TYPE_PUBACK: + case MQTT_MSG_TYPE_PUBREC: + case MQTT_MSG_TYPE_PUBREL: + case MQTT_MSG_TYPE_PUBCOMP: + case MQTT_MSG_TYPE_SUBACK: + case MQTT_MSG_TYPE_UNSUBACK: + case MQTT_MSG_TYPE_SUBSCRIBE: + { + // This requires the remaining length to be encoded in 1 byte, + // which it should be. + if(length >= 4 && (buffer[1] & 0x80) == 0) + return (buffer[2] << 8) | buffer[3]; + else + return 0; + } + + default: + return 0; + } +} + +mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info) +{ + struct mqtt_connect_variable_header* variable_header; + + init_message(connection); + + if(connection->message.length + sizeof(*variable_header) > connection->buffer_length) + return fail_message(connection); + variable_header = (void*)(connection->buffer + connection->message.length); + connection->message.length += sizeof(*variable_header); + + variable_header->lengthMsb = 0; + variable_header->lengthLsb = 6; + memcpy(variable_header->magic, "MQIsdp", 6); + variable_header->version = 3; + variable_header->flags = 0; + variable_header->keepaliveMsb = info->keepalive >> 8; + variable_header->keepaliveLsb = info->keepalive & 0xff; + + if(info->clean_session) + variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; + + if(info->client_id != NULL && info->client_id[0] != '\0') + { + if(append_string(connection, info->client_id, strlen(info->client_id)) < 0) + return fail_message(connection); + } + else + return fail_message(connection); + + if(info->will_topic != NULL && info->will_topic[0] != '\0') + { + if(append_string(connection, info->will_topic, strlen(info->will_topic)) < 0) + return fail_message(connection); + + if(append_string(connection, info->will_message, strlen(info->will_message)) < 0) + return fail_message(connection); + + variable_header->flags |= MQTT_CONNECT_FLAG_WILL; + if(info->will_retain) + variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN; + variable_header->flags |= (info->will_qos & 3) << 4; + } + + if(info->username != NULL && info->username[0] != '\0') + { + if(append_string(connection, info->username, strlen(info->username)) < 0) + return fail_message(connection); + + variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME; + } + + if(info->password != NULL && info->password[0] != '\0') + { + if(append_string(connection, info->password, strlen(info->password)) < 0) + return fail_message(connection); + + variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD; + } + + return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0); +} + +mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id) +{ + init_message(connection); + + if(topic == NULL || topic[0] == '\0') + return fail_message(connection); + + if(append_string(connection, topic, strlen(topic)) < 0) + return fail_message(connection); + + if(qos > 0) + { + if((*message_id = append_message_id(connection, 0)) == 0) + return fail_message(connection); + } + else + *message_id = 0; + + if(connection->message.length + data_length > connection->buffer_length) + return fail_message(connection); + memcpy(connection->buffer + connection->message.length, data, data_length); + connection->message.length += data_length; + + return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); +} + +mqtt_message_t* mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id) +{ + init_message(connection); + if(append_message_id(connection, message_id) == 0) + return fail_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0); +} + +mqtt_message_t* mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id) +{ + init_message(connection); + if(append_message_id(connection, message_id) == 0) + return fail_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0); +} + +mqtt_message_t* mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id) +{ + init_message(connection); + if(append_message_id(connection, message_id) == 0) + return fail_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0); +} + +mqtt_message_t* mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id) +{ + init_message(connection); + if(append_message_id(connection, message_id) == 0) + return fail_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0); +} + +mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id) +{ + init_message(connection); + + if(topic == NULL || topic[0] == '\0') + return fail_message(connection); + + if((*message_id = append_message_id(connection, 0)) == 0) + return fail_message(connection); + + if(append_string(connection, topic, strlen(topic)) < 0) + return fail_message(connection); + + if(connection->message.length + 1 > connection->buffer_length) + return fail_message(connection); + connection->buffer[connection->message.length++] = qos; + + return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); +} + +mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id) +{ + init_message(connection); + + if(topic == NULL || topic[0] == '\0') + return fail_message(connection); + + if((*message_id = append_message_id(connection, 0)) == 0) + return fail_message(connection); + + if(append_string(connection, topic, strlen(topic)) < 0) + return fail_message(connection); + + return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); +} + +mqtt_message_t* mqtt_msg_pingreq(mqtt_connection_t* connection) +{ + init_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0); +} + +mqtt_message_t* mqtt_msg_pingresp(mqtt_connection_t* connection) +{ + init_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0); +} + +mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection) +{ + init_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0); +} diff --git a/app/mqtt/mqtt_msg.h b/app/mqtt/mqtt_msg.h new file mode 100644 index 00000000..225ba642 --- /dev/null +++ b/app/mqtt/mqtt_msg.h @@ -0,0 +1,129 @@ +/* + * File: mqtt_msg.h + * Author: Minh Tuan + * + * Created on July 12, 2014, 1:05 PM + */ + +#ifndef MQTT_MSG_H +#define MQTT_MSG_H +#include "c_types.h" +#ifdef __cplusplus +extern "C" { +#endif + +/* +* Copyright (c) 2014, Stephen Robinson +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* 1. Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* 2. Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* 3. Neither the name of the copyright holder nor the names of its +* contributors may be used to endorse or promote products derived +* from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +* +*/ +/* 7 6 5 4 3 2 1 0*/ +/*| --- Message Type---- | DUP Flag | QoS Level | Retain | +/* Remaining Length */ + + +enum mqtt_message_type +{ + MQTT_MSG_TYPE_CONNECT = 1, + MQTT_MSG_TYPE_CONNACK = 2, + MQTT_MSG_TYPE_PUBLISH = 3, + MQTT_MSG_TYPE_PUBACK = 4, + MQTT_MSG_TYPE_PUBREC = 5, + MQTT_MSG_TYPE_PUBREL = 6, + MQTT_MSG_TYPE_PUBCOMP = 7, + MQTT_MSG_TYPE_SUBSCRIBE = 8, + MQTT_MSG_TYPE_SUBACK = 9, + MQTT_MSG_TYPE_UNSUBSCRIBE = 10, + MQTT_MSG_TYPE_UNSUBACK = 11, + MQTT_MSG_TYPE_PINGREQ = 12, + MQTT_MSG_TYPE_PINGRESP = 13, + MQTT_MSG_TYPE_DISCONNECT = 14 +}; + +typedef struct mqtt_message +{ + uint8_t* data; + uint16_t length; + +} mqtt_message_t; + +typedef struct mqtt_connection +{ + mqtt_message_t message; + + uint16_t message_id; + uint8_t* buffer; + uint16_t buffer_length; + +} mqtt_connection_t; + +typedef struct mqtt_connect_info +{ + char* client_id; + char* username; + char* password; + char* will_topic; + char* will_message; + int keepalive; + int will_qos; + int will_retain; + int clean_session; + +} mqtt_connect_info_t; + + +static inline int mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> 4; } +static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; } +static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; } +static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } + +void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); +int mqtt_get_total_length(uint8_t* buffer, uint16_t length); +const char* mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length); +const char* mqtt_get_publish_data(uint8_t* buffer, uint16_t* length); +uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length); + +mqtt_message_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info); +mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id); +mqtt_message_t* mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id); +mqtt_message_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id); +mqtt_message_t* mqtt_msg_pingreq(mqtt_connection_t* connection); +mqtt_message_t* mqtt_msg_pingresp(mqtt_connection_t* connection); +mqtt_message_t* mqtt_msg_disconnect(mqtt_connection_t* connection); + + +#ifdef __cplusplus +} +#endif + +#endif /* MQTT_MSG_H */ + diff --git a/examples/fragment.lua b/examples/fragment.lua index 196232c1..d619b321 100644 --- a/examples/fragment.lua +++ b/examples/fragment.lua @@ -313,3 +313,9 @@ uart.on("data", 0 ,function(input) if input=="q" then uart.on("data") else print uart.on("data","\r",function(input) if input=="quit" then uart.on("data") else print(input) end end, 1) for k, v in pairs(file.list()) do print('file:'..k..' len:'..v) end + +m=mqtt.Client() +m:connect("192.168.18.101",1883) +m:subscribe("/topic",0,function(m) print("sub done") end) +m:on("message",function(m,t,pl) print(t..":") if pl~=nil then print(pl) end end ) +m:publish("/topic","hello",0,0) diff --git a/pre_build/0.9.5/nodemcu_latest.bin b/pre_build/0.9.5/nodemcu_20150118.bin similarity index 100% rename from pre_build/0.9.5/nodemcu_latest.bin rename to pre_build/0.9.5/nodemcu_20150118.bin diff --git a/pre_build/latest/nodemcu_latest.bin b/pre_build/latest/nodemcu_latest.bin index 3fd92e0b..2317b603 100644 Binary files a/pre_build/latest/nodemcu_latest.bin and b/pre_build/latest/nodemcu_latest.bin differ diff --git a/tools/.gitattributes b/tools/.gitattributes new file mode 100644 index 00000000..b93363f7 --- /dev/null +++ b/tools/.gitattributes @@ -0,0 +1,11 @@ +# Enforce Unix newlines +*.css text eol=lf +*.html text eol=lf +*.js text eol=lf +*.json text eol=lf +*.less text eol=lf +*.md text eol=lf +*.svg text eol=lf +*.yml text eol=lf +*.py text eol=lf +*.sh text eol=lf