diff --git a/components/modules/Kconfig b/components/modules/Kconfig index b07b6d0e..a44d5850 100644 --- a/components/modules/Kconfig +++ b/components/modules/Kconfig @@ -126,6 +126,14 @@ menu "NodeMCU modules" Includes the HTTPD module. This module uses the regular IDF http server component internally. + config NODEMCU_CMODULE_HTTPD_WS + bool "Include websocket support." if NODEMCU_CMODULE_HTTPD + default "n" + select HTTPD_WS_SUPPORT + help + Includes the websocket support. This module uses the regular IDF + http server component internally. + config NODEMCU_CMODULE_HTTPD_MAX_RESPONSE_HEADERS int "Max response header fields" if NODEMCU_CMODULE_HTTPD default 5 diff --git a/components/modules/httpd.c b/components/modules/httpd.c index 6fcbc299..ee666faf 100644 --- a/components/modules/httpd.c +++ b/components/modules/httpd.c @@ -73,6 +73,9 @@ typedef enum { READ_BODY_CHUNK, SEND_RESPONSE, SEND_PARTIAL_RESPONSE, + SEND_ERROR, + SEND_OK, + FREE_WS_OBJECT, } request_type_t; typedef struct { @@ -99,6 +102,11 @@ typedef struct { const char *query_str; int method; size_t body_len; + httpd_req_t *req; +#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS + httpd_ws_frame_t ws_pkt; + int reference; +#endif } request_data_t; @@ -107,6 +115,21 @@ typedef struct { uint32_t guard; } req_udata_t; +#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS +#define WS_METATABLE "httpd.ws" +#define HTTP_WEBSOCKET 1234 +#define HTTP_WEBSOCKET_GET 1235 + +typedef struct { + bool closed; + httpd_handle_t handle; + int fd; + int self_ref; + int text_fn_ref; + int binary_fn_ref; + int close_fn_ref; +} ws_connection_t; +#endif typedef enum { INDEX_NONE, INDEX_ROOT, INDEX_ALL } index_mode_t; @@ -240,10 +263,9 @@ static esp_err_t auto_index_handler(httpd_req_t *req) return ESP_OK; } - static esp_err_t dynamic_handler_httpd(httpd_req_t *req) { - size_t query_len = httpd_req_get_url_query_len(req); + size_t query_len = req->method != HTTP_WEBSOCKET ? httpd_req_get_url_query_len(req) : 0; char *query = query_len ? malloc(query_len + 1) : NULL; if (query_len) httpd_req_get_url_query_str(req, query, query_len + 1); @@ -254,7 +276,34 @@ static esp_err_t dynamic_handler_httpd(httpd_req_t *req) .query_str = query, .method = req->method, .body_len = req->content_len, + .req = req, }; + +#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS + memset(&req_data.ws_pkt, 0, sizeof(httpd_ws_frame_t)); + if (req->method == HTTP_WEBSOCKET) { + printf("Handling callback for websocket\n"); + req_data.ws_pkt.type = HTTPD_WS_TYPE_TEXT; + /* Set max_len = 0 to get the frame len */ + esp_err_t ret = httpd_ws_recv_frame(req, &req_data.ws_pkt, 0); + if (ret != ESP_OK) { + return ret; + } + + printf("About to allocate %d bytes for buffer\n", req_data.ws_pkt.len); + char *buf = malloc(req_data.ws_pkt.len); + if (!buf) { + return ESP_ERR_NO_MEM; + } + req_data.ws_pkt.payload = (void *) buf; + ret = httpd_ws_recv_frame(req, &req_data.ws_pkt, req_data.ws_pkt.len); + if (ret != ESP_OK) { + free(buf); + return ret; + } + } + #endif + // Pass the req info over to the LVM thread task_post_medium(dynamic_task, (task_param_t)&req_data); @@ -319,13 +368,50 @@ static esp_err_t dynamic_handler_httpd(httpd_req_t *req) } } +#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS + if (req_data.ws_pkt.payload) { + free(req_data.ws_pkt.payload); + req_data.ws_pkt.payload = 0; + } +#endif + // Request processed, release LVM thread xSemaphoreGive(done); - } while(tr.request_type != SEND_RESPONSE); // done + } while(tr.request_type != SEND_RESPONSE && tr.request_type != SEND_ERROR && tr.request_type != SEND_OK); // done + + if (tr.request_type == SEND_ERROR) { + return ESP_FAIL; + } return ESP_OK; } +#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS +static esp_err_t websocket_handler_httpd(httpd_req_t *req) +{ + if (req->method == HTTP_GET) { + req->method = HTTP_WEBSOCKET_GET; + } else { + req->method = HTTP_WEBSOCKET; + } + return dynamic_handler_httpd(req); +} + +static void free_sess_ctx(void *ctx) { + int ref = (int) ctx; + + request_data_t req_data = { + .method = FREE_WS_OBJECT, + .reference = ref, + }; + task_post_medium(dynamic_task, (task_param_t)&req_data); + thread_request_t tr; + xQueueReceive(queue, &tr, portMAX_DELAY); // Receive the reply + xSemaphoreGive(done); +} + +#endif + // ---- helper functions ---------------------------------------------- @@ -434,7 +520,6 @@ static int lhttpd_req_index(lua_State *L) #undef KEY_IS } - static void dynamic_handler_lvm(task_param_t param, task_prio_t prio) { UNUSED(prio); @@ -452,84 +537,162 @@ static void dynamic_handler_lvm(task_param_t param, task_prio_t prio) .response = &resp, }; - lua_rawgeti(L, LUA_REGISTRYINDEX, dynamic_handlers_table_ref); // +1 - lua_getfield(L, -1, req_info->key); // +1 - if (lua_isfunction(L, -1)) - { - // push req - req_udata_t *ud = - (req_udata_t *)lua_newuserdata(L, sizeof(req_udata_t)); // +1 - ud->req_info = req_info; - ud->guard = guard; - luaL_getmetatable(L, REQUEST_METATABLE); // +1 - lua_setmetatable(L, -2); // -1 +#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS + if (req_info->method == FREE_WS_OBJECT) { + printf("Freeing WS Object %d\n", req_info->reference); + lua_rawgeti(L, LUA_REGISTRYINDEX, req_info->reference); + ws_connection_t *ws = (ws_connection_t *) luaL_checkudata(L, -1, WS_METATABLE); - int err = luaL_pcallx(L, 1, 1); // -1 +1 - if (!err && lua_istable(L, -1)) - { - // pull out response data - int t = lua_gettop(L); // response table index - lua_getfield(L, t, "status"); // +1 - resp.status_str = luaL_optstring(L, -1, "200 OK"); - lua_getfield(L, t, "type"); // +1 - resp.content_type = luaL_optstring(L, -1, NULL); - lua_getfield(L, t, "body"); // +1 - resp.body_data = luaL_optlstring(L, -1, NULL, &resp.body_len); - if (!resp.body_data) - resp.body_len = 0; - lua_getfield(L, t, "headers"); // +1 - if (lua_istable(L, -1)) - { - lua_pushnil(L); // +1 - for (unsigned i = 0; lua_next(L, -2); ++i) // +1 - { - if (i >= MAX_RESPONSE_HEADERS) - { - printf("Warning - too many response headers, ignoring some!\n"); - break; - } - resp.headers[i].key = lua_tostring(L, -2); - resp.headers[i].value = lua_tostring(L, -1); - lua_pop(L, 1); // drop value, keep key for lua_next() - } + if (!ws->closed) { + printf("FIrst close\n"); + ws->closed = true; + if (ws->close_fn_ref > 0) { + printf("Calling close handler\n"); + lua_rawgeti(L, LUA_REGISTRYINDEX, ws->close_fn_ref); + luaL_pcallx(L, 0, 0); } - lua_getfield(L, t, "getbody"); // +1 - if (lua_isfunction(L, -1)) + } + + luaL_unref(L, LUA_REGISTRYINDEX, ws->self_ref); + ws->self_ref = LUA_NOREF; + tr.request_type = SEND_OK; + } else if (req_info->method == HTTP_WEBSOCKET) { + printf("Handling websocket callbacks\n"); + // Just handle the callbacks here + if (req_info->req->sess_ctx) { + // Websocket event arrived + printf("Sess_ctx = %d\n", (int) req_info->req->sess_ctx); + lua_rawgeti(L, LUA_REGISTRYINDEX, (int) req_info->req->sess_ctx); + ws_connection_t *ws = (ws_connection_t *) luaL_checkudata(L, -1, WS_METATABLE); + int fn = 0; + + if (req_info->ws_pkt.type == HTTPD_WS_TYPE_TEXT) { + fn = ws->text_fn_ref; + } else if (req_info->ws_pkt.type == HTTPD_WS_TYPE_BINARY) { + fn = ws->binary_fn_ref; + } + + if (fn) { + lua_rawgeti(L, LUA_REGISTRYINDEX, fn); + + lua_pushlstring(L, (const char *) req_info->ws_pkt.payload, (size_t) req_info->ws_pkt.len); + + luaL_pcallx(L, 1, 0); + } + } + tr.request_type = SEND_OK; + } else +#endif + { + lua_rawgeti(L, LUA_REGISTRYINDEX, dynamic_handlers_table_ref); // +1 + lua_getfield(L, -1, req_info->key); // +1 + if (lua_isfunction(L, -1)) + { + // push req + req_udata_t *ud = + (req_udata_t *)lua_newuserdata(L, sizeof(req_udata_t)); // +1 + ud->req_info = req_info; + ud->guard = guard; + luaL_getmetatable(L, REQUEST_METATABLE); // +1 + lua_setmetatable(L, -2); // -1 + +#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS + if (strncmp(req_info->key, "[WS]", 4) == 0) { + if (req_info->method == HTTP_WEBSOCKET_GET) { + // web socket + ws_connection_t *ws = (ws_connection_t *) lua_newuserdata(L, sizeof(*ws)); + memset(ws, 0, sizeof(*ws)); + luaL_getmetatable(L, WS_METATABLE); + lua_setmetatable(L, -2); + lua_pushvalue(L, -1); + ws->self_ref = luaL_ref(L, LUA_REGISTRYINDEX); + ws->handle = req_info->req->handle; + ws->fd = httpd_req_to_sockfd(req_info->req); + + // Set the session context so we know what is going on. + req_info->req->sess_ctx = (void *) ws->self_ref; + req_info->req->free_ctx = free_sess_ctx; + + int err = luaL_pcallx(L, 2, 0); + if (err) { + tr.request_type = SEND_ERROR; + luaL_unref(L, LUA_REGISTRYINDEX, ws->self_ref); + ws->self_ref = LUA_NOREF; + } else { + tr.request_type = SEND_OK; + } + } + } else +#endif { - // Okay, we're doing a chunked body send, so we have to repeatedly - // call the provided getbody() function until it returns nil - bool headers_cleared = false; - tr.request_type = SEND_PARTIAL_RESPONSE; -next_chunk: - resp.body_data = NULL; - resp.body_len = 0; - err = luaL_pcallx(L, 0, 1); // -1 +1 - resp.body_data = - err ? NULL : luaL_optlstring(L, -1, NULL, &resp.body_len); - if (resp.body_data) + int err = luaL_pcallx(L, 1, 1); // -1 +1 + if (!err && lua_istable(L, -1)) { - // Toss this bit of response data over to the httpd thread - xQueueSend(queue, &tr, portMAX_DELAY); - // ...and wait until it's done sending it - xSemaphoreTake(done, portMAX_DELAY); - - lua_pop(L, 1); // -1 - - if (!headers_cleared) + // pull out response data + int t = lua_gettop(L); // response table index + lua_getfield(L, t, "status"); // +1 + resp.status_str = luaL_optstring(L, -1, "200 OK"); + lua_getfield(L, t, "type"); // +1 + resp.content_type = luaL_optstring(L, -1, NULL); + lua_getfield(L, t, "body"); // +1 + resp.body_data = luaL_optlstring(L, -1, NULL, &resp.body_len); + if (!resp.body_data) + resp.body_len = 0; + lua_getfield(L, t, "headers"); // +1 + if (lua_istable(L, -1)) { - // Clear the header data; it's only used for the first chunk - resp.status_str = NULL; - resp.content_type = NULL; - for (unsigned i = 0; i < MAX_RESPONSE_HEADERS; ++i) - resp.headers[i].key = resp.headers[i].value = NULL; - - headers_cleared = true; + lua_pushnil(L); // +1 + for (unsigned i = 0; lua_next(L, -2); ++i) // +1 + { + if (i >= MAX_RESPONSE_HEADERS) + { + printf("Warning - too many response headers, ignoring some!\n"); + break; + } + resp.headers[i].key = lua_tostring(L, -2); + resp.headers[i].value = lua_tostring(L, -1); + lua_pop(L, 1); // drop value, keep key for lua_next() + } } lua_getfield(L, t, "getbody"); // +1 - goto next_chunk; + if (lua_isfunction(L, -1)) + { + // Okay, we're doing a chunked body send, so we have to repeatedly + // call the provided getbody() function until it returns nil + bool headers_cleared = false; + tr.request_type = SEND_PARTIAL_RESPONSE; + next_chunk: + resp.body_data = NULL; + resp.body_len = 0; + err = luaL_pcallx(L, 0, 1); // -1 +1 + resp.body_data = + err ? NULL : luaL_optlstring(L, -1, NULL, &resp.body_len); + if (resp.body_data) + { + // Toss this bit of response data over to the httpd thread + xQueueSend(queue, &tr, portMAX_DELAY); + // ...and wait until it's done sending it + xSemaphoreTake(done, portMAX_DELAY); + + lua_pop(L, 1); // -1 + + if (!headers_cleared) + { + // Clear the header data; it's only used for the first chunk + resp.status_str = NULL; + resp.content_type = NULL; + for (unsigned i = 0; i < MAX_RESPONSE_HEADERS; ++i) + resp.headers[i].key = resp.headers[i].value = NULL; + + headers_cleared = true; + } + lua_getfield(L, t, "getbody"); // +1 + goto next_chunk; + } + // else, getbody() returned nil, so let the normal exit path + // toss the final SEND_PARTIAL_RESPONSE request over to the httpd + } } - // else, getbody() returned nil, so let the normal exit path - // toss the final SEND_PARTIAL_RESPONSE request over to the httpd } } } @@ -584,6 +747,46 @@ static int lhttpd_static(lua_State *L) return 1; } +#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS +// add websocket route: httpd.websocket(uri, handler) +static int lhttpd_websocket(lua_State *L) +{ + if (!server) + return luaL_error(L, "Server not started"); + + const char *match = luaL_checkstring(L, 1); + luaL_checkfunction(L, 2); + lua_settop(L, 2); + + if (!match[0]) + return luaL_error(L, "Null route not supported"); + + // Create a key for this entry + const char *key = lua_pushfstring(L, "[WS]%s", match); + + // Store this in our dynamic handlers table, so the ref lives on + // on, but so that we can also free it after server shutdown. + lua_rawgeti(L, LUA_REGISTRYINDEX, dynamic_handlers_table_ref); + lua_pushvalue(L, -2); // key + lua_pushvalue(L, 2); // handler + lua_settable(L, -3); + + httpd_uri_t websocket_handler = { + .uri = match, + .method = HTTP_GET, + .handler = websocket_handler_httpd, + .user_ctx = (void *)key, + .is_websocket = true, + .handle_ws_control_frames = false, + }; + if (httpd_register_uri_handler(server, &websocket_handler) == 1) + lua_pushinteger(L, 1); + else + lua_pushnil(L); + + return 1; +} +#endif // add dynamic route: httpd.dynamic(method, uri, handler) static int lhttpd_dynamic(lua_State *L) @@ -715,17 +918,140 @@ static int lhttpd_stop(lua_State *L) return 0; } +#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS +// Websocket functions + +typedef struct { + httpd_handle_t hd; + int fd; + int type; + size_t len; + char *data; +} async_send_t; + +/* + * async send function, which we put into the httpd work queue + */ +static void ws_async_send(void *arg) +{ + async_send_t *async_send = arg; + httpd_handle_t hd = async_send->hd; + int fd = async_send->fd; + httpd_ws_frame_t ws_pkt; + memset(&ws_pkt, 0, sizeof(httpd_ws_frame_t)); + ws_pkt.payload = (uint8_t*)async_send->data; + ws_pkt.len = async_send->len; + ws_pkt.type = async_send->type; + + httpd_ws_send_frame_async(hd, fd, &ws_pkt); + free(async_send); +} + +static esp_err_t trigger_async_send(ws_connection_t *ws, int type, const char *data, size_t len) { + async_send_t *async_send = malloc(sizeof(async_send_t) + len); + async_send->hd = ws->handle; + async_send->fd = ws->fd; + async_send->type = type; + async_send->len = len; + async_send->data = (char *) (async_send + 1); + memcpy(async_send->data, data, len); + return httpd_queue_work(ws->handle, ws_async_send, async_send); +} + +static void ws_async_close(void *arg) { + async_send_t *async_close = arg; + + httpd_sess_trigger_close(async_close->hd, async_close->fd); + free(async_close); +} + +static int ws_close(lua_State *L) { + ws_connection_t *ws = (ws_connection_t*)luaL_checkudata(L, 1, WS_METATABLE); + + if (!ws->closed) { + ws->closed = true; + async_send_t *async_close = malloc(sizeof(async_send_t)); + async_close->hd = ws->handle; + async_close->fd = ws->fd; + httpd_queue_work(ws->handle, ws_async_close, async_close); + } + return 0; +} + +// event types: text, binary, close +static int ws_on(lua_State *L) { + ws_connection_t *ws = (ws_connection_t*)luaL_checkudata(L, 1, WS_METATABLE); + const char *event = lua_tostring(L, 2); + + int *slot = NULL; + if (strcmp(event, "text") == 0) { + slot = &ws->text_fn_ref; + } else if (strcmp(event, "binary") == 0) { + slot = &ws->binary_fn_ref; + } else if (strcmp(event, "close") == 0) { + slot = &ws->close_fn_ref; + } else { + return luaL_error(L, "Incorrect event argument"); + } + + if (*slot) { + luaL_unref(L, LUA_REGISTRYINDEX, *slot); + *slot = 0; + } + + if (!lua_isnil(L, 3)) { + luaL_checkfunction(L, 3); + lua_pushvalue(L, 3); + *slot = luaL_ref(L, LUA_REGISTRYINDEX); + } + + return 0; +} + +static int ws_textbinary(lua_State *L, int type) { + ws_connection_t *ws = (ws_connection_t*)luaL_checkudata(L, 1, WS_METATABLE); + + size_t len; + const char *data = lua_tolstring(L, 2, &len); + + esp_err_t rc = trigger_async_send(ws, type, data, len); + if (rc) { + return luaL_error(L, "Failed to send to websocket"); + } + + return 0; +} + +static int ws_text(lua_State *L) { + return ws_textbinary(L, HTTPD_WS_TYPE_TEXT); +} + +static int ws_binary(lua_State *L) { + return ws_textbinary(L, HTTPD_WS_TYPE_BINARY); +} + +LROT_BEGIN(httpd_ws_mt, NULL, LROT_MASK_GC_INDEX) + LROT_TABENTRY( __index, httpd_ws_mt ) + LROT_FUNCENTRY( __gc, ws_close ) + LROT_FUNCENTRY( close, ws_close ) + LROT_FUNCENTRY( on, ws_on ) + LROT_FUNCENTRY( text, ws_text ) + LROT_FUNCENTRY( binary, ws_binary ) +LROT_END(httpd_ws_mt, NULL, LROT_MASK_GC_INDEX) +#endif LROT_BEGIN(httpd_req_mt, NULL, LROT_MASK_INDEX) LROT_FUNCENTRY( __index, lhttpd_req_index ) LROT_END(httpd_req_mt, NULL, LROT_MASK_INDEX) - LROT_BEGIN(httpd, NULL, 0) LROT_FUNCENTRY( start, lhttpd_start ) LROT_FUNCENTRY( stop, lhttpd_stop ) LROT_FUNCENTRY( static, lhttpd_static ) LROT_FUNCENTRY( dynamic, lhttpd_dynamic ) +#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS + LROT_FUNCENTRY( websocket, lhttpd_websocket ) +#endif LROT_FUNCENTRY( unregister, lhttpd_unregister ) LROT_NUMENTRY( GET, HTTP_GET ) @@ -748,6 +1074,10 @@ static int lhttpd_init(lua_State *L) luaL_rometatable(L, REQUEST_METATABLE, LROT_TABLEREF(httpd_req_mt)); +#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS + luaL_rometatable(L, WS_METATABLE, LROT_TABLEREF(httpd_ws_mt)); +#endif + return 0; } diff --git a/docs/modules/httpd.md b/docs/modules/httpd.md index f651f5c3..c41f754d 100644 --- a/docs/modules/httpd.md +++ b/docs/modules/httpd.md @@ -256,6 +256,44 @@ end httpd.dynamic(httpd.PUT, "/foo", put_foo) ``` +## httpd.websocket() + +Registers a websocket route handler. + +#### Syntax +```lua +httpd.websocket(route, handler) +``` + +#### Parameters +- `route` The route prefix. Be mindful of any trailing "/" as that may interact +with the `auto_index` functionality. +- `handler` The route handler function - `handler(req, ws)`. The `req` object is +the same as for a regular dynamic route. The provided websocket +object `ws` has the following fields/functions: + - `on` This allows registration of handlers when data is received. This is invoked with + two arguments -- the name of the event and the handler for that event. The allowable names are: + - `text` The handler is called with a single string argument whenever a text message is received. + - `binary` The handler is called with a single string argument whenever a binary message is received. + - `close` The handler is called when the client wants to close the connection. + - `text` This can be called with a string argument and it sends a text message. + - `binary` This can be called with a string argument and it sends a binary message. + - `close` The connection to the client is closed. + +#### Returns +nil + +#### Example +```lua +httpd.start({ webroot = "web" }) + +function echo_ws(req, ws) + ws:on('text', function(data) ws:text(data) end) +end + +httpd.websocket("/echo", echo_ws) +``` + ## httpd.unregister() Unregisters a previously registered handler. The default handlers may be