diff --git a/app/modules/pipe.c b/app/modules/pipe.c index 77788352..c46770af 100644 --- a/app/modules/pipe.c +++ b/app/modules/pipe.c @@ -435,12 +435,19 @@ static int pipe_reader(lua_State *L) { return 1; } +// return number of records +static int pipe_nrec (lua_State *L) { + lua_pushinteger(L, lua_objlen(L, 1) - 1); + return 1; +} + LROT_BEGIN(pipe_funcs, NULL, 0) LROT_FUNCENTRY( __len, pipe__len ) LROT_FUNCENTRY( __tostring, pipe__tostring ) LROT_FUNCENTRY( read, pipe_read ) LROT_FUNCENTRY( reader, pipe_reader ) LROT_FUNCENTRY( unread, pipe_unread ) + LROT_FUNCENTRY( nrec, pipe_nrec ) LROT_END(pipe_funcs, NULL, 0) /* Using a index func is needed because the write method is at pipe[1] */ diff --git a/docs/modules/pipe.md b/docs/modules/pipe.md index 1be4fa4f..0b717d54 100644 --- a/docs/modules/pipe.md +++ b/docs/modules/pipe.md @@ -129,3 +129,11 @@ Write a string to a pipe object. #### Returns Nothing + +## pobj:nrec() + +Return the number of internal records in the pipe. Each record ranges from 1 +to 256 bytes in length, with full chunks being the most common case. As +extracting from a pipe only to `unread` if too few bytes are available, it may +be useful to have a quickly estimated upper bound on the length of the string +that would be returned. diff --git a/lua_examples/pipeutils.lua b/lua_examples/pipeutils.lua new file mode 100644 index 00000000..9caf9ab0 --- /dev/null +++ b/lua_examples/pipeutils.lua @@ -0,0 +1,54 @@ +-- A collection of pipe-based utility functions + +-- A convenience wrapper for chunking data arriving in bursts into more sizable +-- blocks; `o` will be called once per chunk. The `flush` method can be used to +-- drain the internal buffer. `flush` MUST be called at the end of the stream, +-- **even if the stream is a multiple of the chunk size** due to internal +-- buffering. Flushing results in smaller chunk(s) being output, of course. +local function chunker(o, csize, prio) + assert (type(o) == "function" and type(csize) == "number" and 1 <= csize) + local p = pipe.create(function(p) + -- wait until it looks very likely that read is going to succeed + -- and we won't have to unread. This may hold slightly more than + -- a chunk in the underlying pipe object. + if 256 * (p:nrec() - 1) <= csize then return nil end + + local d = p:read(csize) + if #d < csize + then p:unread(d) return false + else o(d) return true + end + end, prio or node.task.LOW_PRIORITY) + + return { + flush = function() for d in p:reader(csize) do o(d) end end, + write = function(d) p:write(d) end + } +end + +-- Stream and decode lines of complete base64 blocks, calling `o(data)` with +-- decoded chunks or calling `e(badinput, errorstr)` on error; the error +-- callback must ensure that this conduit is never written to again. +local function debase64(o, e, prio) + assert (type(o) == "function" and type(e) == "function") + local p = pipe.create(function(p) + local s = p:read("\n+") + if s:sub(-1) == "\n" then -- guard against incomplete line + s = s:match("^%s*(%S*)%s*$") + if #s ~= 0 then -- guard against empty line + local ok, d = pcall(encoder.fromBase64, s) + if ok then o(d) else e(s, d); return false end + end + return true + else + p:unread(s) + return false + end + end, prio or node.task.LOW_PRIORITY) + return { write = function(d) p:write(d) end } +end + +return { + chunker = chunker, + debase64 = debase64, +} diff --git a/tools/luacheck_config.lua b/tools/luacheck_config.lua index 947270d0..653b6dd5 100644 --- a/tools/luacheck_config.lua +++ b/tools/luacheck_config.lua @@ -487,6 +487,11 @@ stds.nodemcu_libs = { new = empty } }, + pipe = { + fields = { + create = empty + } + }, pwm = { fields = { close = empty,