Streaming to disk before passing onto S3.
This commit is contained in:
parent
26a0801b75
commit
155de99b68
|
@ -14,6 +14,7 @@ import { v4 } from "uuid"
|
|||
import { APP_PREFIX, APP_DEV_PREFIX } from "../db"
|
||||
import fsp from "fs/promises"
|
||||
import { HeadObjectOutput } from "aws-sdk/clients/s3"
|
||||
import { ReadableStream } from "stream/web"
|
||||
|
||||
const streamPipeline = promisify(stream.pipeline)
|
||||
// use this as a temporary store of buckets that are being created
|
||||
|
@ -261,14 +262,6 @@ export async function streamUpload({
|
|||
...extra,
|
||||
}
|
||||
|
||||
// make sure we have the stream before we try to push it to object store
|
||||
if (stream.on) {
|
||||
await new Promise((resolve, reject) => {
|
||||
stream.on("finish", resolve)
|
||||
stream.on("error", reject)
|
||||
})
|
||||
}
|
||||
|
||||
const details = await objectStore.upload(params).promise()
|
||||
const headDetails = await objectStore
|
||||
.headObject({
|
||||
|
|
|
@ -68,7 +68,6 @@
|
|||
"aws-sdk": "2.1030.0",
|
||||
"bcrypt": "5.1.0",
|
||||
"bcryptjs": "2.4.3",
|
||||
"bl": "^6.0.12",
|
||||
"bull": "4.10.1",
|
||||
"chokidar": "3.5.3",
|
||||
"content-disposition": "^0.5.4",
|
||||
|
@ -116,7 +115,8 @@
|
|||
"uuid": "^8.3.2",
|
||||
"validate.js": "0.13.1",
|
||||
"worker-farm": "1.7.0",
|
||||
"xml2js": "0.5.0"
|
||||
"xml2js": "0.5.0",
|
||||
"tmp": "0.2.3"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@babel/preset-env": "7.16.11",
|
||||
|
@ -137,6 +137,7 @@
|
|||
"@types/supertest": "2.0.14",
|
||||
"@types/tar": "6.1.5",
|
||||
"@types/uuid": "8.3.4",
|
||||
"@types/tmp": "0.2.6",
|
||||
"copyfiles": "2.4.1",
|
||||
"docker-compose": "0.23.17",
|
||||
"jest": "29.7.0",
|
||||
|
|
|
@ -657,6 +657,7 @@ describe("REST Integration", () => {
|
|||
mockReadable.push(null)
|
||||
;(fetch as unknown as jest.Mock).mockImplementationOnce(() =>
|
||||
Promise.resolve({
|
||||
status: 200,
|
||||
headers: {
|
||||
raw: () => ({
|
||||
"content-type": [contentType],
|
||||
|
@ -700,6 +701,7 @@ describe("REST Integration", () => {
|
|||
mockReadable.push(null)
|
||||
;(fetch as unknown as jest.Mock).mockImplementationOnce(() =>
|
||||
Promise.resolve({
|
||||
status: 200,
|
||||
headers: {
|
||||
raw: () => ({
|
||||
"content-type": [contentType],
|
||||
|
|
|
@ -9,10 +9,12 @@ import { context, objectStore, sql } from "@budibase/backend-core"
|
|||
import { v4 } from "uuid"
|
||||
import { parseStringPromise as xmlParser } from "xml2js"
|
||||
import { formatBytes } from "../../utilities"
|
||||
import bl from "bl"
|
||||
import env from "../../environment"
|
||||
import { InvalidColumns } from "../../constants"
|
||||
import { helpers, utils } from "@budibase/shared-core"
|
||||
import { pipeline } from "stream/promises"
|
||||
import tmp from "tmp"
|
||||
import fs from "fs"
|
||||
|
||||
type PrimitiveTypes =
|
||||
| FieldType.STRING
|
||||
|
@ -360,38 +362,44 @@ export async function handleFileResponse(
|
|||
const key = `${context.getProdAppId()}/${processedFileName}`
|
||||
const bucket = objectStore.ObjectStoreBuckets.TEMP
|
||||
|
||||
const stream = response.body.pipe(bl((error, data) => data))
|
||||
// put the response stream to disk temporarily as a buffer
|
||||
const tmpObj = tmp.fileSync()
|
||||
try {
|
||||
await pipeline(response.body, fs.createWriteStream(tmpObj.name))
|
||||
if (response.body) {
|
||||
const contentLength = response.headers.get("content-length")
|
||||
if (contentLength) {
|
||||
size = parseInt(contentLength, 10)
|
||||
}
|
||||
|
||||
if (response.body) {
|
||||
const contentLength = response.headers.get("content-length")
|
||||
if (contentLength) {
|
||||
size = parseInt(contentLength, 10)
|
||||
const details = await objectStore.streamUpload({
|
||||
bucket,
|
||||
filename: key,
|
||||
stream: fs.createReadStream(tmpObj.name),
|
||||
ttl: 1,
|
||||
type: response.headers["content-type"],
|
||||
})
|
||||
if (!size && details.ContentLength) {
|
||||
size = details.ContentLength
|
||||
}
|
||||
}
|
||||
|
||||
const details = await objectStore.streamUpload({
|
||||
bucket,
|
||||
filename: key,
|
||||
stream,
|
||||
ttl: 1,
|
||||
type: response.headers["content-type"],
|
||||
})
|
||||
if (!size && details.ContentLength) {
|
||||
size = details.ContentLength
|
||||
presignedUrl = objectStore.getPresignedUrl(bucket, key)
|
||||
return {
|
||||
data: {
|
||||
size,
|
||||
name: processedFileName,
|
||||
url: presignedUrl,
|
||||
extension: fileExtension,
|
||||
key: key,
|
||||
},
|
||||
info: {
|
||||
code: response.status,
|
||||
size: formatBytes(size.toString()),
|
||||
time: `${Math.round(performance.now() - startTime)}ms`,
|
||||
},
|
||||
}
|
||||
}
|
||||
presignedUrl = objectStore.getPresignedUrl(bucket, key)
|
||||
return {
|
||||
data: {
|
||||
size,
|
||||
name: processedFileName,
|
||||
url: presignedUrl,
|
||||
extension: fileExtension,
|
||||
key: key,
|
||||
},
|
||||
info: {
|
||||
code: response.status,
|
||||
size: formatBytes(size.toString()),
|
||||
time: `${Math.round(performance.now() - startTime)}ms`,
|
||||
},
|
||||
} finally {
|
||||
// cleanup tmp
|
||||
tmpObj.removeCallback()
|
||||
}
|
||||
}
|
||||
|
|
12
yarn.lock
12
yarn.lock
|
@ -6348,6 +6348,11 @@
|
|||
dependencies:
|
||||
"@types/estree" "*"
|
||||
|
||||
"@types/tmp@0.2.6":
|
||||
version "0.2.6"
|
||||
resolved "https://registry.yarnpkg.com/@types/tmp/-/tmp-0.2.6.tgz#d785ee90c52d7cc020e249c948c36f7b32d1e217"
|
||||
integrity sha512-chhaNf2oKHlRkDGt+tiKE2Z5aJ6qalm7Z9rlLdBwmOiAAf09YQvvoLXjWK4HWPF1xU/fqvMgfNfpVoBscA/tKA==
|
||||
|
||||
"@types/tough-cookie@*", "@types/tough-cookie@^4.0.2":
|
||||
version "4.0.2"
|
||||
resolved "https://registry.yarnpkg.com/@types/tough-cookie/-/tough-cookie-4.0.2.tgz#6286b4c7228d58ab7866d19716f3696e03a09397"
|
||||
|
@ -7700,7 +7705,7 @@ bl@^4.0.3, bl@^4.1.0:
|
|||
inherits "^2.0.4"
|
||||
readable-stream "^3.4.0"
|
||||
|
||||
bl@^6.0.12, bl@^6.0.3:
|
||||
bl@^6.0.3:
|
||||
version "6.0.12"
|
||||
resolved "https://registry.yarnpkg.com/bl/-/bl-6.0.12.tgz#77c35b96e13aeff028496c798b75389ddee9c7f8"
|
||||
integrity sha512-EnEYHilP93oaOa2MnmNEjAcovPS3JlQZOyzGXi3EyEpPhm9qWvdDp7BmAVEVusGzp8LlwQK56Av+OkDoRjzE0w==
|
||||
|
@ -21283,6 +21288,11 @@ tlhunter-sorted-set@^0.1.0:
|
|||
resolved "https://registry.yarnpkg.com/tlhunter-sorted-set/-/tlhunter-sorted-set-0.1.0.tgz#1c3eae28c0fa4dff97e9501d2e3c204b86406f4b"
|
||||
integrity sha512-eGYW4bjf1DtrHzUYxYfAcSytpOkA44zsr7G2n3PV7yOUR23vmkGe3LL4R+1jL9OsXtbsFOwe8XtbCrabeaEFnw==
|
||||
|
||||
tmp@0.2.3:
|
||||
version "0.2.3"
|
||||
resolved "https://registry.yarnpkg.com/tmp/-/tmp-0.2.3.tgz#eb783cc22bc1e8bebd0671476d46ea4eb32a79ae"
|
||||
integrity sha512-nZD7m9iCPC5g0pYmcaxogYKggSfLsdxl8of3Q/oIbqCqLLIO9IAF0GWjX1z9NZRHPiXv8Wex4yDCaZsgEw0Y8w==
|
||||
|
||||
tmp@^0.0.33:
|
||||
version "0.0.33"
|
||||
resolved "https://registry.yarnpkg.com/tmp/-/tmp-0.0.33.tgz#6d34335889768d21b2bcda0aa277ced3b1bfadf9"
|
||||
|
|
Loading…
Reference in New Issue