Updating object store stream upload to make sure the stream has finished being processed before trying to upload to AWS (and only uploading a partial stream).
This commit is contained in:
parent
6df5315b32
commit
75501c2251
|
@ -41,10 +41,7 @@ type UploadParams = BaseUploadParams & {
|
||||||
path?: string | PathLike
|
path?: string | PathLike
|
||||||
}
|
}
|
||||||
|
|
||||||
export type StreamTypes =
|
export type StreamTypes = ReadStream | NodeJS.ReadableStream
|
||||||
| ReadStream
|
|
||||||
| NodeJS.ReadableStream
|
|
||||||
| ReadableStream<Uint8Array>
|
|
||||||
|
|
||||||
export type StreamUploadParams = BaseUploadParams & {
|
export type StreamUploadParams = BaseUploadParams & {
|
||||||
stream?: StreamTypes
|
stream?: StreamTypes
|
||||||
|
@ -222,6 +219,9 @@ export async function streamUpload({
|
||||||
extra,
|
extra,
|
||||||
ttl,
|
ttl,
|
||||||
}: StreamUploadParams) {
|
}: StreamUploadParams) {
|
||||||
|
if (!stream) {
|
||||||
|
throw new Error("Stream to upload is invalid/undefined")
|
||||||
|
}
|
||||||
const extension = filename.split(".").pop()
|
const extension = filename.split(".").pop()
|
||||||
const objectStore = ObjectStore(bucketName)
|
const objectStore = ObjectStore(bucketName)
|
||||||
const bucketCreated = await createBucketIfNotExists(objectStore, bucketName)
|
const bucketCreated = await createBucketIfNotExists(objectStore, bucketName)
|
||||||
|
@ -251,14 +251,35 @@ export async function streamUpload({
|
||||||
: CONTENT_TYPE_MAP.txt
|
: CONTENT_TYPE_MAP.txt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const bucket = sanitizeBucket(bucketName),
|
||||||
|
objKey = sanitizeKey(filename)
|
||||||
const params = {
|
const params = {
|
||||||
Bucket: sanitizeBucket(bucketName),
|
Bucket: bucket,
|
||||||
Key: sanitizeKey(filename),
|
Key: objKey,
|
||||||
Body: stream,
|
Body: stream,
|
||||||
ContentType: contentType,
|
ContentType: contentType,
|
||||||
...extra,
|
...extra,
|
||||||
}
|
}
|
||||||
return objectStore.upload(params).promise()
|
|
||||||
|
// make sure we have the stream before we try to push it to object store
|
||||||
|
if (stream.on) {
|
||||||
|
await new Promise((resolve, reject) => {
|
||||||
|
stream.on("finish", resolve)
|
||||||
|
stream.on("error", reject)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
const details = await objectStore.upload(params).promise()
|
||||||
|
const headDetails = await objectStore
|
||||||
|
.headObject({
|
||||||
|
Bucket: bucket,
|
||||||
|
Key: objKey,
|
||||||
|
})
|
||||||
|
.promise()
|
||||||
|
return {
|
||||||
|
...details,
|
||||||
|
ContentLength: headDetails.ContentLength,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -245,7 +245,7 @@ export type AutomationAttachment = {
|
||||||
|
|
||||||
export type AutomationAttachmentContent = {
|
export type AutomationAttachmentContent = {
|
||||||
filename: string
|
filename: string
|
||||||
content: ReadStream | NodeJS.ReadableStream | ReadableStream<Uint8Array>
|
content: ReadStream | NodeJS.ReadableStream
|
||||||
}
|
}
|
||||||
|
|
||||||
export type BucketedContent = AutomationAttachmentContent & {
|
export type BucketedContent = AutomationAttachmentContent & {
|
||||||
|
|
Loading…
Reference in New Issue