Mostly working version of the websocket support

This commit is contained in:
Philip Gladstone 2022-03-10 21:52:23 -05:00
parent cb434811ca
commit ec604d9c70
3 changed files with 451 additions and 75 deletions

View File

@ -126,6 +126,14 @@ menu "NodeMCU modules"
Includes the HTTPD module. This module uses the regular IDF Includes the HTTPD module. This module uses the regular IDF
http server component internally. http server component internally.
config NODEMCU_CMODULE_HTTPD_WS
bool "Include websocket support." if NODEMCU_CMODULE_HTTPD
default "n"
select HTTPD_WS_SUPPORT
help
Includes the websocket support. This module uses the regular IDF
http server component internally.
config NODEMCU_CMODULE_HTTPD_MAX_RESPONSE_HEADERS config NODEMCU_CMODULE_HTTPD_MAX_RESPONSE_HEADERS
int "Max response header fields" if NODEMCU_CMODULE_HTTPD int "Max response header fields" if NODEMCU_CMODULE_HTTPD
default 5 default 5

View File

@ -73,6 +73,9 @@ typedef enum {
READ_BODY_CHUNK, READ_BODY_CHUNK,
SEND_RESPONSE, SEND_RESPONSE,
SEND_PARTIAL_RESPONSE, SEND_PARTIAL_RESPONSE,
SEND_ERROR,
SEND_OK,
FREE_WS_OBJECT,
} request_type_t; } request_type_t;
typedef struct { typedef struct {
@ -99,6 +102,11 @@ typedef struct {
const char *query_str; const char *query_str;
int method; int method;
size_t body_len; size_t body_len;
httpd_req_t *req;
#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS
httpd_ws_frame_t ws_pkt;
int reference;
#endif
} request_data_t; } request_data_t;
@ -107,6 +115,21 @@ typedef struct {
uint32_t guard; uint32_t guard;
} req_udata_t; } req_udata_t;
#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS
#define WS_METATABLE "httpd.ws"
#define HTTP_WEBSOCKET 1234
#define HTTP_WEBSOCKET_GET 1235
typedef struct {
bool closed;
httpd_handle_t handle;
int fd;
int self_ref;
int text_fn_ref;
int binary_fn_ref;
int close_fn_ref;
} ws_connection_t;
#endif
typedef enum { INDEX_NONE, INDEX_ROOT, INDEX_ALL } index_mode_t; typedef enum { INDEX_NONE, INDEX_ROOT, INDEX_ALL } index_mode_t;
@ -240,10 +263,9 @@ static esp_err_t auto_index_handler(httpd_req_t *req)
return ESP_OK; return ESP_OK;
} }
static esp_err_t dynamic_handler_httpd(httpd_req_t *req) static esp_err_t dynamic_handler_httpd(httpd_req_t *req)
{ {
size_t query_len = httpd_req_get_url_query_len(req); size_t query_len = req->method != HTTP_WEBSOCKET ? httpd_req_get_url_query_len(req) : 0;
char *query = query_len ? malloc(query_len + 1) : NULL; char *query = query_len ? malloc(query_len + 1) : NULL;
if (query_len) if (query_len)
httpd_req_get_url_query_str(req, query, query_len + 1); httpd_req_get_url_query_str(req, query, query_len + 1);
@ -254,7 +276,34 @@ static esp_err_t dynamic_handler_httpd(httpd_req_t *req)
.query_str = query, .query_str = query,
.method = req->method, .method = req->method,
.body_len = req->content_len, .body_len = req->content_len,
.req = req,
}; };
#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS
memset(&req_data.ws_pkt, 0, sizeof(httpd_ws_frame_t));
if (req->method == HTTP_WEBSOCKET) {
printf("Handling callback for websocket\n");
req_data.ws_pkt.type = HTTPD_WS_TYPE_TEXT;
/* Set max_len = 0 to get the frame len */
esp_err_t ret = httpd_ws_recv_frame(req, &req_data.ws_pkt, 0);
if (ret != ESP_OK) {
return ret;
}
printf("About to allocate %d bytes for buffer\n", req_data.ws_pkt.len);
char *buf = malloc(req_data.ws_pkt.len);
if (!buf) {
return ESP_ERR_NO_MEM;
}
req_data.ws_pkt.payload = (void *) buf;
ret = httpd_ws_recv_frame(req, &req_data.ws_pkt, req_data.ws_pkt.len);
if (ret != ESP_OK) {
free(buf);
return ret;
}
}
#endif
// Pass the req info over to the LVM thread // Pass the req info over to the LVM thread
task_post_medium(dynamic_task, (task_param_t)&req_data); task_post_medium(dynamic_task, (task_param_t)&req_data);
@ -319,13 +368,50 @@ static esp_err_t dynamic_handler_httpd(httpd_req_t *req)
} }
} }
#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS
if (req_data.ws_pkt.payload) {
free(req_data.ws_pkt.payload);
req_data.ws_pkt.payload = 0;
}
#endif
// Request processed, release LVM thread // Request processed, release LVM thread
xSemaphoreGive(done); xSemaphoreGive(done);
} while(tr.request_type != SEND_RESPONSE); // done } while(tr.request_type != SEND_RESPONSE && tr.request_type != SEND_ERROR && tr.request_type != SEND_OK); // done
if (tr.request_type == SEND_ERROR) {
return ESP_FAIL;
}
return ESP_OK; return ESP_OK;
} }
#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS
static esp_err_t websocket_handler_httpd(httpd_req_t *req)
{
if (req->method == HTTP_GET) {
req->method = HTTP_WEBSOCKET_GET;
} else {
req->method = HTTP_WEBSOCKET;
}
return dynamic_handler_httpd(req);
}
static void free_sess_ctx(void *ctx) {
int ref = (int) ctx;
request_data_t req_data = {
.method = FREE_WS_OBJECT,
.reference = ref,
};
task_post_medium(dynamic_task, (task_param_t)&req_data);
thread_request_t tr;
xQueueReceive(queue, &tr, portMAX_DELAY); // Receive the reply
xSemaphoreGive(done);
}
#endif
// ---- helper functions ---------------------------------------------- // ---- helper functions ----------------------------------------------
@ -434,7 +520,6 @@ static int lhttpd_req_index(lua_State *L)
#undef KEY_IS #undef KEY_IS
} }
static void dynamic_handler_lvm(task_param_t param, task_prio_t prio) static void dynamic_handler_lvm(task_param_t param, task_prio_t prio)
{ {
UNUSED(prio); UNUSED(prio);
@ -452,84 +537,162 @@ static void dynamic_handler_lvm(task_param_t param, task_prio_t prio)
.response = &resp, .response = &resp,
}; };
lua_rawgeti(L, LUA_REGISTRYINDEX, dynamic_handlers_table_ref); // +1 #ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS
lua_getfield(L, -1, req_info->key); // +1 if (req_info->method == FREE_WS_OBJECT) {
if (lua_isfunction(L, -1)) printf("Freeing WS Object %d\n", req_info->reference);
{ lua_rawgeti(L, LUA_REGISTRYINDEX, req_info->reference);
// push req ws_connection_t *ws = (ws_connection_t *) luaL_checkudata(L, -1, WS_METATABLE);
req_udata_t *ud =
(req_udata_t *)lua_newuserdata(L, sizeof(req_udata_t)); // +1
ud->req_info = req_info;
ud->guard = guard;
luaL_getmetatable(L, REQUEST_METATABLE); // +1
lua_setmetatable(L, -2); // -1
int err = luaL_pcallx(L, 1, 1); // -1 +1 if (!ws->closed) {
if (!err && lua_istable(L, -1)) printf("FIrst close\n");
{ ws->closed = true;
// pull out response data if (ws->close_fn_ref > 0) {
int t = lua_gettop(L); // response table index printf("Calling close handler\n");
lua_getfield(L, t, "status"); // +1 lua_rawgeti(L, LUA_REGISTRYINDEX, ws->close_fn_ref);
resp.status_str = luaL_optstring(L, -1, "200 OK"); luaL_pcallx(L, 0, 0);
lua_getfield(L, t, "type"); // +1
resp.content_type = luaL_optstring(L, -1, NULL);
lua_getfield(L, t, "body"); // +1
resp.body_data = luaL_optlstring(L, -1, NULL, &resp.body_len);
if (!resp.body_data)
resp.body_len = 0;
lua_getfield(L, t, "headers"); // +1
if (lua_istable(L, -1))
{
lua_pushnil(L); // +1
for (unsigned i = 0; lua_next(L, -2); ++i) // +1
{
if (i >= MAX_RESPONSE_HEADERS)
{
printf("Warning - too many response headers, ignoring some!\n");
break;
}
resp.headers[i].key = lua_tostring(L, -2);
resp.headers[i].value = lua_tostring(L, -1);
lua_pop(L, 1); // drop value, keep key for lua_next()
}
} }
lua_getfield(L, t, "getbody"); // +1 }
if (lua_isfunction(L, -1))
luaL_unref(L, LUA_REGISTRYINDEX, ws->self_ref);
ws->self_ref = LUA_NOREF;
tr.request_type = SEND_OK;
} else if (req_info->method == HTTP_WEBSOCKET) {
printf("Handling websocket callbacks\n");
// Just handle the callbacks here
if (req_info->req->sess_ctx) {
// Websocket event arrived
printf("Sess_ctx = %d\n", (int) req_info->req->sess_ctx);
lua_rawgeti(L, LUA_REGISTRYINDEX, (int) req_info->req->sess_ctx);
ws_connection_t *ws = (ws_connection_t *) luaL_checkudata(L, -1, WS_METATABLE);
int fn = 0;
if (req_info->ws_pkt.type == HTTPD_WS_TYPE_TEXT) {
fn = ws->text_fn_ref;
} else if (req_info->ws_pkt.type == HTTPD_WS_TYPE_BINARY) {
fn = ws->binary_fn_ref;
}
if (fn) {
lua_rawgeti(L, LUA_REGISTRYINDEX, fn);
lua_pushlstring(L, (const char *) req_info->ws_pkt.payload, (size_t) req_info->ws_pkt.len);
luaL_pcallx(L, 1, 0);
}
}
tr.request_type = SEND_OK;
} else
#endif
{
lua_rawgeti(L, LUA_REGISTRYINDEX, dynamic_handlers_table_ref); // +1
lua_getfield(L, -1, req_info->key); // +1
if (lua_isfunction(L, -1))
{
// push req
req_udata_t *ud =
(req_udata_t *)lua_newuserdata(L, sizeof(req_udata_t)); // +1
ud->req_info = req_info;
ud->guard = guard;
luaL_getmetatable(L, REQUEST_METATABLE); // +1
lua_setmetatable(L, -2); // -1
#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS
if (strncmp(req_info->key, "[WS]", 4) == 0) {
if (req_info->method == HTTP_WEBSOCKET_GET) {
// web socket
ws_connection_t *ws = (ws_connection_t *) lua_newuserdata(L, sizeof(*ws));
memset(ws, 0, sizeof(*ws));
luaL_getmetatable(L, WS_METATABLE);
lua_setmetatable(L, -2);
lua_pushvalue(L, -1);
ws->self_ref = luaL_ref(L, LUA_REGISTRYINDEX);
ws->handle = req_info->req->handle;
ws->fd = httpd_req_to_sockfd(req_info->req);
// Set the session context so we know what is going on.
req_info->req->sess_ctx = (void *) ws->self_ref;
req_info->req->free_ctx = free_sess_ctx;
int err = luaL_pcallx(L, 2, 0);
if (err) {
tr.request_type = SEND_ERROR;
luaL_unref(L, LUA_REGISTRYINDEX, ws->self_ref);
ws->self_ref = LUA_NOREF;
} else {
tr.request_type = SEND_OK;
}
}
} else
#endif
{ {
// Okay, we're doing a chunked body send, so we have to repeatedly int err = luaL_pcallx(L, 1, 1); // -1 +1
// call the provided getbody() function until it returns nil if (!err && lua_istable(L, -1))
bool headers_cleared = false;
tr.request_type = SEND_PARTIAL_RESPONSE;
next_chunk:
resp.body_data = NULL;
resp.body_len = 0;
err = luaL_pcallx(L, 0, 1); // -1 +1
resp.body_data =
err ? NULL : luaL_optlstring(L, -1, NULL, &resp.body_len);
if (resp.body_data)
{ {
// Toss this bit of response data over to the httpd thread // pull out response data
xQueueSend(queue, &tr, portMAX_DELAY); int t = lua_gettop(L); // response table index
// ...and wait until it's done sending it lua_getfield(L, t, "status"); // +1
xSemaphoreTake(done, portMAX_DELAY); resp.status_str = luaL_optstring(L, -1, "200 OK");
lua_getfield(L, t, "type"); // +1
lua_pop(L, 1); // -1 resp.content_type = luaL_optstring(L, -1, NULL);
lua_getfield(L, t, "body"); // +1
if (!headers_cleared) resp.body_data = luaL_optlstring(L, -1, NULL, &resp.body_len);
if (!resp.body_data)
resp.body_len = 0;
lua_getfield(L, t, "headers"); // +1
if (lua_istable(L, -1))
{ {
// Clear the header data; it's only used for the first chunk lua_pushnil(L); // +1
resp.status_str = NULL; for (unsigned i = 0; lua_next(L, -2); ++i) // +1
resp.content_type = NULL; {
for (unsigned i = 0; i < MAX_RESPONSE_HEADERS; ++i) if (i >= MAX_RESPONSE_HEADERS)
resp.headers[i].key = resp.headers[i].value = NULL; {
printf("Warning - too many response headers, ignoring some!\n");
headers_cleared = true; break;
}
resp.headers[i].key = lua_tostring(L, -2);
resp.headers[i].value = lua_tostring(L, -1);
lua_pop(L, 1); // drop value, keep key for lua_next()
}
} }
lua_getfield(L, t, "getbody"); // +1 lua_getfield(L, t, "getbody"); // +1
goto next_chunk; if (lua_isfunction(L, -1))
{
// Okay, we're doing a chunked body send, so we have to repeatedly
// call the provided getbody() function until it returns nil
bool headers_cleared = false;
tr.request_type = SEND_PARTIAL_RESPONSE;
next_chunk:
resp.body_data = NULL;
resp.body_len = 0;
err = luaL_pcallx(L, 0, 1); // -1 +1
resp.body_data =
err ? NULL : luaL_optlstring(L, -1, NULL, &resp.body_len);
if (resp.body_data)
{
// Toss this bit of response data over to the httpd thread
xQueueSend(queue, &tr, portMAX_DELAY);
// ...and wait until it's done sending it
xSemaphoreTake(done, portMAX_DELAY);
lua_pop(L, 1); // -1
if (!headers_cleared)
{
// Clear the header data; it's only used for the first chunk
resp.status_str = NULL;
resp.content_type = NULL;
for (unsigned i = 0; i < MAX_RESPONSE_HEADERS; ++i)
resp.headers[i].key = resp.headers[i].value = NULL;
headers_cleared = true;
}
lua_getfield(L, t, "getbody"); // +1
goto next_chunk;
}
// else, getbody() returned nil, so let the normal exit path
// toss the final SEND_PARTIAL_RESPONSE request over to the httpd
}
} }
// else, getbody() returned nil, so let the normal exit path
// toss the final SEND_PARTIAL_RESPONSE request over to the httpd
} }
} }
} }
@ -584,6 +747,46 @@ static int lhttpd_static(lua_State *L)
return 1; return 1;
} }
#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS
// add websocket route: httpd.websocket(uri, handler)
static int lhttpd_websocket(lua_State *L)
{
if (!server)
return luaL_error(L, "Server not started");
const char *match = luaL_checkstring(L, 1);
luaL_checkfunction(L, 2);
lua_settop(L, 2);
if (!match[0])
return luaL_error(L, "Null route not supported");
// Create a key for this entry
const char *key = lua_pushfstring(L, "[WS]%s", match);
// Store this in our dynamic handlers table, so the ref lives on
// on, but so that we can also free it after server shutdown.
lua_rawgeti(L, LUA_REGISTRYINDEX, dynamic_handlers_table_ref);
lua_pushvalue(L, -2); // key
lua_pushvalue(L, 2); // handler
lua_settable(L, -3);
httpd_uri_t websocket_handler = {
.uri = match,
.method = HTTP_GET,
.handler = websocket_handler_httpd,
.user_ctx = (void *)key,
.is_websocket = true,
.handle_ws_control_frames = false,
};
if (httpd_register_uri_handler(server, &websocket_handler) == 1)
lua_pushinteger(L, 1);
else
lua_pushnil(L);
return 1;
}
#endif
// add dynamic route: httpd.dynamic(method, uri, handler) // add dynamic route: httpd.dynamic(method, uri, handler)
static int lhttpd_dynamic(lua_State *L) static int lhttpd_dynamic(lua_State *L)
@ -715,17 +918,140 @@ static int lhttpd_stop(lua_State *L)
return 0; return 0;
} }
#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS
// Websocket functions
typedef struct {
httpd_handle_t hd;
int fd;
int type;
size_t len;
char *data;
} async_send_t;
/*
* async send function, which we put into the httpd work queue
*/
static void ws_async_send(void *arg)
{
async_send_t *async_send = arg;
httpd_handle_t hd = async_send->hd;
int fd = async_send->fd;
httpd_ws_frame_t ws_pkt;
memset(&ws_pkt, 0, sizeof(httpd_ws_frame_t));
ws_pkt.payload = (uint8_t*)async_send->data;
ws_pkt.len = async_send->len;
ws_pkt.type = async_send->type;
httpd_ws_send_frame_async(hd, fd, &ws_pkt);
free(async_send);
}
static esp_err_t trigger_async_send(ws_connection_t *ws, int type, const char *data, size_t len) {
async_send_t *async_send = malloc(sizeof(async_send_t) + len);
async_send->hd = ws->handle;
async_send->fd = ws->fd;
async_send->type = type;
async_send->len = len;
async_send->data = (char *) (async_send + 1);
memcpy(async_send->data, data, len);
return httpd_queue_work(ws->handle, ws_async_send, async_send);
}
static void ws_async_close(void *arg) {
async_send_t *async_close = arg;
httpd_sess_trigger_close(async_close->hd, async_close->fd);
free(async_close);
}
static int ws_close(lua_State *L) {
ws_connection_t *ws = (ws_connection_t*)luaL_checkudata(L, 1, WS_METATABLE);
if (!ws->closed) {
ws->closed = true;
async_send_t *async_close = malloc(sizeof(async_send_t));
async_close->hd = ws->handle;
async_close->fd = ws->fd;
httpd_queue_work(ws->handle, ws_async_close, async_close);
}
return 0;
}
// event types: text, binary, close
static int ws_on(lua_State *L) {
ws_connection_t *ws = (ws_connection_t*)luaL_checkudata(L, 1, WS_METATABLE);
const char *event = lua_tostring(L, 2);
int *slot = NULL;
if (strcmp(event, "text") == 0) {
slot = &ws->text_fn_ref;
} else if (strcmp(event, "binary") == 0) {
slot = &ws->binary_fn_ref;
} else if (strcmp(event, "close") == 0) {
slot = &ws->close_fn_ref;
} else {
return luaL_error(L, "Incorrect event argument");
}
if (*slot) {
luaL_unref(L, LUA_REGISTRYINDEX, *slot);
*slot = 0;
}
if (!lua_isnil(L, 3)) {
luaL_checkfunction(L, 3);
lua_pushvalue(L, 3);
*slot = luaL_ref(L, LUA_REGISTRYINDEX);
}
return 0;
}
static int ws_textbinary(lua_State *L, int type) {
ws_connection_t *ws = (ws_connection_t*)luaL_checkudata(L, 1, WS_METATABLE);
size_t len;
const char *data = lua_tolstring(L, 2, &len);
esp_err_t rc = trigger_async_send(ws, type, data, len);
if (rc) {
return luaL_error(L, "Failed to send to websocket");
}
return 0;
}
static int ws_text(lua_State *L) {
return ws_textbinary(L, HTTPD_WS_TYPE_TEXT);
}
static int ws_binary(lua_State *L) {
return ws_textbinary(L, HTTPD_WS_TYPE_BINARY);
}
LROT_BEGIN(httpd_ws_mt, NULL, LROT_MASK_GC_INDEX)
LROT_TABENTRY( __index, httpd_ws_mt )
LROT_FUNCENTRY( __gc, ws_close )
LROT_FUNCENTRY( close, ws_close )
LROT_FUNCENTRY( on, ws_on )
LROT_FUNCENTRY( text, ws_text )
LROT_FUNCENTRY( binary, ws_binary )
LROT_END(httpd_ws_mt, NULL, LROT_MASK_GC_INDEX)
#endif
LROT_BEGIN(httpd_req_mt, NULL, LROT_MASK_INDEX) LROT_BEGIN(httpd_req_mt, NULL, LROT_MASK_INDEX)
LROT_FUNCENTRY( __index, lhttpd_req_index ) LROT_FUNCENTRY( __index, lhttpd_req_index )
LROT_END(httpd_req_mt, NULL, LROT_MASK_INDEX) LROT_END(httpd_req_mt, NULL, LROT_MASK_INDEX)
LROT_BEGIN(httpd, NULL, 0) LROT_BEGIN(httpd, NULL, 0)
LROT_FUNCENTRY( start, lhttpd_start ) LROT_FUNCENTRY( start, lhttpd_start )
LROT_FUNCENTRY( stop, lhttpd_stop ) LROT_FUNCENTRY( stop, lhttpd_stop )
LROT_FUNCENTRY( static, lhttpd_static ) LROT_FUNCENTRY( static, lhttpd_static )
LROT_FUNCENTRY( dynamic, lhttpd_dynamic ) LROT_FUNCENTRY( dynamic, lhttpd_dynamic )
#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS
LROT_FUNCENTRY( websocket, lhttpd_websocket )
#endif
LROT_FUNCENTRY( unregister, lhttpd_unregister ) LROT_FUNCENTRY( unregister, lhttpd_unregister )
LROT_NUMENTRY( GET, HTTP_GET ) LROT_NUMENTRY( GET, HTTP_GET )
@ -748,6 +1074,10 @@ static int lhttpd_init(lua_State *L)
luaL_rometatable(L, REQUEST_METATABLE, LROT_TABLEREF(httpd_req_mt)); luaL_rometatable(L, REQUEST_METATABLE, LROT_TABLEREF(httpd_req_mt));
#ifdef CONFIG_NODEMCU_CMODULE_HTTPD_WS
luaL_rometatable(L, WS_METATABLE, LROT_TABLEREF(httpd_ws_mt));
#endif
return 0; return 0;
} }

View File

@ -256,6 +256,44 @@ end
httpd.dynamic(httpd.PUT, "/foo", put_foo) httpd.dynamic(httpd.PUT, "/foo", put_foo)
``` ```
## httpd.websocket()
Registers a websocket route handler.
#### Syntax
```lua
httpd.websocket(route, handler)
```
#### Parameters
- `route` The route prefix. Be mindful of any trailing "/" as that may interact
with the `auto_index` functionality.
- `handler` The route handler function - `handler(req, ws)`. The `req` object is
the same as for a regular dynamic route. The provided websocket
object `ws` has the following fields/functions:
- `on` This allows registration of handlers when data is received. This is invoked with
two arguments -- the name of the event and the handler for that event. The allowable names are:
- `text` The handler is called with a single string argument whenever a text message is received.
- `binary` The handler is called with a single string argument whenever a binary message is received.
- `close` The handler is called when the client wants to close the connection.
- `text` This can be called with a string argument and it sends a text message.
- `binary` This can be called with a string argument and it sends a binary message.
- `close` The connection to the client is closed.
#### Returns
nil
#### Example
```lua
httpd.start({ webroot = "web" })
function echo_ws(req, ws)
ws:on('text', function(data) ws:text(data) end)
end
httpd.websocket("/echo", echo_ws)
```
## httpd.unregister() ## httpd.unregister()
Unregisters a previously registered handler. The default handlers may be Unregisters a previously registered handler. The default handlers may be