|
|
|
@ -13,7 +13,7 @@ import (
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/ncw/swift"
|
|
|
|
|
"github.com/ncw/swift/v2"
|
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
|
|
|
"github.com/rclone/rclone/fs/config"
|
|
|
|
@ -391,7 +391,7 @@ func swiftConnection(ctx context.Context, opt *Options, name string) (*swift.Con
|
|
|
|
|
if c.AuthUrl == "" {
|
|
|
|
|
return nil, errors.New("auth not found")
|
|
|
|
|
}
|
|
|
|
|
err := c.Authenticate() // fills in c.StorageUrl and c.AuthToken
|
|
|
|
|
err := c.Authenticate(ctx) // fills in c.StorageUrl and c.AuthToken
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
@ -467,7 +467,7 @@ func NewFsWithConnection(ctx context.Context, opt *Options, name, root string, c
|
|
|
|
|
encodedDirectory := f.opt.Enc.FromStandardPath(f.rootDirectory)
|
|
|
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
|
|
|
var rxHeaders swift.Headers
|
|
|
|
|
info, rxHeaders, err = f.c.Object(f.rootContainer, encodedDirectory)
|
|
|
|
|
info, rxHeaders, err = f.c.Object(ctx, f.rootContainer, encodedDirectory)
|
|
|
|
|
return shouldRetryHeaders(rxHeaders, err)
|
|
|
|
|
})
|
|
|
|
|
if err == nil && info.ContentType != directoryMarkerContentType {
|
|
|
|
@ -506,7 +506,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
|
|
|
|
// Return an Object from a path
|
|
|
|
|
//
|
|
|
|
|
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
|
|
|
|
func (f *Fs) newObjectWithInfo(remote string, info *swift.Object) (fs.Object, error) {
|
|
|
|
|
func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *swift.Object) (fs.Object, error) {
|
|
|
|
|
o := &Object{
|
|
|
|
|
fs: f,
|
|
|
|
|
remote: remote,
|
|
|
|
@ -516,7 +516,7 @@ func (f *Fs) newObjectWithInfo(remote string, info *swift.Object) (fs.Object, er
|
|
|
|
|
// making sure we read the full metadata for all 0 byte files.
|
|
|
|
|
// We don't read the metadata for directory marker objects.
|
|
|
|
|
if info != nil && info.Bytes == 0 && info.ContentType != "application/directory" {
|
|
|
|
|
err := o.readMetaData() // reads info and headers, returning an error
|
|
|
|
|
err := o.readMetaData(ctx) // reads info and headers, returning an error
|
|
|
|
|
if err == fs.ErrorObjectNotFound {
|
|
|
|
|
// We have a dangling large object here so just return the original metadata
|
|
|
|
|
fs.Errorf(o, "dangling large object with no contents")
|
|
|
|
@ -533,7 +533,7 @@ func (f *Fs) newObjectWithInfo(remote string, info *swift.Object) (fs.Object, er
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
err := o.readMetaData() // reads info and headers, returning an error
|
|
|
|
|
err := o.readMetaData(ctx) // reads info and headers, returning an error
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
@ -544,7 +544,7 @@ func (f *Fs) newObjectWithInfo(remote string, info *swift.Object) (fs.Object, er
|
|
|
|
|
// NewObject finds the Object at remote. If it can't be found it
|
|
|
|
|
// returns the error fs.ErrorObjectNotFound.
|
|
|
|
|
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
|
|
|
|
return f.newObjectWithInfo(remote, nil)
|
|
|
|
|
return f.newObjectWithInfo(ctx, remote, nil)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// listFn is called from list and listContainerRoot to handle an object.
|
|
|
|
@ -556,7 +556,7 @@ type listFn func(remote string, object *swift.Object, isDirectory bool) error
|
|
|
|
|
// container to the start.
|
|
|
|
|
//
|
|
|
|
|
// Set recurse to read sub directories
|
|
|
|
|
func (f *Fs) listContainerRoot(container, directory, prefix string, addContainer bool, recurse bool, includeDirMarkers bool, fn listFn) error {
|
|
|
|
|
func (f *Fs) listContainerRoot(ctx context.Context, container, directory, prefix string, addContainer bool, recurse bool, includeDirMarkers bool, fn listFn) error {
|
|
|
|
|
if prefix != "" && !strings.HasSuffix(prefix, "/") {
|
|
|
|
|
prefix += "/"
|
|
|
|
|
}
|
|
|
|
@ -571,11 +571,11 @@ func (f *Fs) listContainerRoot(container, directory, prefix string, addContainer
|
|
|
|
|
if !recurse {
|
|
|
|
|
opts.Delimiter = '/'
|
|
|
|
|
}
|
|
|
|
|
return f.c.ObjectsWalk(container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) {
|
|
|
|
|
return f.c.ObjectsWalk(ctx, container, &opts, func(ctx context.Context, opts *swift.ObjectsOpts) (interface{}, error) {
|
|
|
|
|
var objects []swift.Object
|
|
|
|
|
var err error
|
|
|
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
|
|
|
objects, err = f.c.Objects(container, opts)
|
|
|
|
|
objects, err = f.c.Objects(ctx, container, opts)
|
|
|
|
|
return shouldRetry(err)
|
|
|
|
|
})
|
|
|
|
|
if err == nil {
|
|
|
|
@ -613,8 +613,8 @@ func (f *Fs) listContainerRoot(container, directory, prefix string, addContainer
|
|
|
|
|
type addEntryFn func(fs.DirEntry) error
|
|
|
|
|
|
|
|
|
|
// list the objects into the function supplied
|
|
|
|
|
func (f *Fs) list(container, directory, prefix string, addContainer bool, recurse bool, includeDirMarkers bool, fn addEntryFn) error {
|
|
|
|
|
err := f.listContainerRoot(container, directory, prefix, addContainer, recurse, includeDirMarkers, func(remote string, object *swift.Object, isDirectory bool) (err error) {
|
|
|
|
|
func (f *Fs) list(ctx context.Context, container, directory, prefix string, addContainer bool, recurse bool, includeDirMarkers bool, fn addEntryFn) error {
|
|
|
|
|
err := f.listContainerRoot(ctx, container, directory, prefix, addContainer, recurse, includeDirMarkers, func(remote string, object *swift.Object, isDirectory bool) (err error) {
|
|
|
|
|
if isDirectory {
|
|
|
|
|
remote = strings.TrimRight(remote, "/")
|
|
|
|
|
d := fs.NewDir(remote, time.Time{}).SetSize(object.Bytes)
|
|
|
|
@ -622,7 +622,7 @@ func (f *Fs) list(container, directory, prefix string, addContainer bool, recurs
|
|
|
|
|
} else {
|
|
|
|
|
// newObjectWithInfo does a full metadata read on 0 size objects which might be dynamic large objects
|
|
|
|
|
var o fs.Object
|
|
|
|
|
o, err = f.newObjectWithInfo(remote, object)
|
|
|
|
|
o, err = f.newObjectWithInfo(ctx, remote, object)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -639,12 +639,12 @@ func (f *Fs) list(container, directory, prefix string, addContainer bool, recurs
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// listDir lists a single directory
|
|
|
|
|
func (f *Fs) listDir(container, directory, prefix string, addContainer bool) (entries fs.DirEntries, err error) {
|
|
|
|
|
func (f *Fs) listDir(ctx context.Context, container, directory, prefix string, addContainer bool) (entries fs.DirEntries, err error) {
|
|
|
|
|
if container == "" {
|
|
|
|
|
return nil, fs.ErrorListBucketRequired
|
|
|
|
|
}
|
|
|
|
|
// List the objects
|
|
|
|
|
err = f.list(container, directory, prefix, addContainer, false, false, func(entry fs.DirEntry) error {
|
|
|
|
|
err = f.list(ctx, container, directory, prefix, addContainer, false, false, func(entry fs.DirEntry) error {
|
|
|
|
|
entries = append(entries, entry)
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
@ -660,7 +660,7 @@ func (f *Fs) listDir(container, directory, prefix string, addContainer bool) (en
|
|
|
|
|
func (f *Fs) listContainers(ctx context.Context) (entries fs.DirEntries, err error) {
|
|
|
|
|
var containers []swift.Container
|
|
|
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
|
|
|
containers, err = f.c.ContainersAll(nil)
|
|
|
|
|
containers, err = f.c.ContainersAll(ctx, nil)
|
|
|
|
|
return shouldRetry(err)
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
@ -691,7 +691,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
|
|
|
|
}
|
|
|
|
|
return f.listContainers(ctx)
|
|
|
|
|
}
|
|
|
|
|
return f.listDir(container, directory, f.rootDirectory, f.rootContainer == "")
|
|
|
|
|
return f.listDir(ctx, container, directory, f.rootDirectory, f.rootContainer == "")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ListR lists the objects and directories of the Fs starting
|
|
|
|
@ -714,7 +714,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
|
|
|
|
|
container, directory := f.split(dir)
|
|
|
|
|
list := walk.NewListRHelper(callback)
|
|
|
|
|
listR := func(container, directory, prefix string, addContainer bool) error {
|
|
|
|
|
return f.list(container, directory, prefix, addContainer, true, false, func(entry fs.DirEntry) error {
|
|
|
|
|
return f.list(ctx, container, directory, prefix, addContainer, true, false, func(entry fs.DirEntry) error {
|
|
|
|
|
return list.Add(entry)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
@ -752,7 +752,7 @@ func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
|
|
|
|
|
var containers []swift.Container
|
|
|
|
|
var err error
|
|
|
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
|
|
|
containers, err = f.c.ContainersAll(nil)
|
|
|
|
|
containers, err = f.c.ContainersAll(ctx, nil)
|
|
|
|
|
return shouldRetry(err)
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
@ -804,7 +804,7 @@ func (f *Fs) makeContainer(ctx context.Context, container string) error {
|
|
|
|
|
if !f.noCheckContainer {
|
|
|
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
|
|
|
var rxHeaders swift.Headers
|
|
|
|
|
_, rxHeaders, err = f.c.Container(container)
|
|
|
|
|
_, rxHeaders, err = f.c.Container(ctx, container)
|
|
|
|
|
return shouldRetryHeaders(rxHeaders, err)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
@ -814,7 +814,7 @@ func (f *Fs) makeContainer(ctx context.Context, container string) error {
|
|
|
|
|
headers["X-Storage-Policy"] = f.opt.StoragePolicy
|
|
|
|
|
}
|
|
|
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
|
|
|
err = f.c.ContainerCreate(container, headers)
|
|
|
|
|
err = f.c.ContainerCreate(ctx, container, headers)
|
|
|
|
|
return shouldRetry(err)
|
|
|
|
|
})
|
|
|
|
|
if err == nil {
|
|
|
|
@ -835,7 +835,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
|
|
|
|
}
|
|
|
|
|
err := f.cache.Remove(container, func() error {
|
|
|
|
|
err := f.pacer.Call(func() (bool, error) {
|
|
|
|
|
err := f.c.ContainerDelete(container)
|
|
|
|
|
err := f.c.ContainerDelete(ctx, container)
|
|
|
|
|
return shouldRetry(err)
|
|
|
|
|
})
|
|
|
|
|
if err == nil {
|
|
|
|
@ -865,7 +865,7 @@ func (f *Fs) Purge(ctx context.Context, dir string) error {
|
|
|
|
|
go func() {
|
|
|
|
|
delErr <- operations.DeleteFiles(ctx, toBeDeleted)
|
|
|
|
|
}()
|
|
|
|
|
err := f.list(container, directory, f.rootDirectory, false, true, true, func(entry fs.DirEntry) error {
|
|
|
|
|
err := f.list(ctx, container, directory, f.rootDirectory, false, true, true, func(entry fs.DirEntry) error {
|
|
|
|
|
if o, ok := entry.(*Object); ok {
|
|
|
|
|
toBeDeleted <- o
|
|
|
|
|
}
|
|
|
|
@ -905,7 +905,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
|
|
|
|
srcContainer, srcPath := srcObj.split()
|
|
|
|
|
err = f.pacer.Call(func() (bool, error) {
|
|
|
|
|
var rxHeaders swift.Headers
|
|
|
|
|
rxHeaders, err = f.c.ObjectCopy(srcContainer, srcPath, dstContainer, dstPath, nil)
|
|
|
|
|
rxHeaders, err = f.c.ObjectCopy(ctx, srcContainer, srcPath, dstContainer, dstPath, nil)
|
|
|
|
|
return shouldRetryHeaders(rxHeaders, err)
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
@ -944,11 +944,11 @@ func (o *Object) Hash(ctx context.Context, t hash.Type) (string, error) {
|
|
|
|
|
if t != hash.MD5 {
|
|
|
|
|
return "", hash.ErrUnsupported
|
|
|
|
|
}
|
|
|
|
|
isDynamicLargeObject, err := o.isDynamicLargeObject()
|
|
|
|
|
isDynamicLargeObject, err := o.isDynamicLargeObject(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
isStaticLargeObject, err := o.isStaticLargeObject()
|
|
|
|
|
isStaticLargeObject, err := o.isStaticLargeObject(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
@ -961,8 +961,8 @@ func (o *Object) Hash(ctx context.Context, t hash.Type) (string, error) {
|
|
|
|
|
|
|
|
|
|
// hasHeader checks for the header passed in returning false if the
|
|
|
|
|
// object isn't found.
|
|
|
|
|
func (o *Object) hasHeader(header string) (bool, error) {
|
|
|
|
|
err := o.readMetaData()
|
|
|
|
|
func (o *Object) hasHeader(ctx context.Context, header string) (bool, error) {
|
|
|
|
|
err := o.readMetaData(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if err == fs.ErrorObjectNotFound {
|
|
|
|
|
return false, nil
|
|
|
|
@ -974,29 +974,29 @@ func (o *Object) hasHeader(header string) (bool, error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// isDynamicLargeObject checks for X-Object-Manifest header
|
|
|
|
|
func (o *Object) isDynamicLargeObject() (bool, error) {
|
|
|
|
|
return o.hasHeader("X-Object-Manifest")
|
|
|
|
|
func (o *Object) isDynamicLargeObject(ctx context.Context) (bool, error) {
|
|
|
|
|
return o.hasHeader(ctx, "X-Object-Manifest")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// isStaticLargeObjectFile checks for the X-Static-Large-Object header
|
|
|
|
|
func (o *Object) isStaticLargeObject() (bool, error) {
|
|
|
|
|
return o.hasHeader("X-Static-Large-Object")
|
|
|
|
|
func (o *Object) isStaticLargeObject(ctx context.Context) (bool, error) {
|
|
|
|
|
return o.hasHeader(ctx, "X-Static-Large-Object")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (o *Object) isLargeObject() (result bool, err error) {
|
|
|
|
|
result, err = o.hasHeader("X-Static-Large-Object")
|
|
|
|
|
func (o *Object) isLargeObject(ctx context.Context) (result bool, err error) {
|
|
|
|
|
result, err = o.hasHeader(ctx, "X-Static-Large-Object")
|
|
|
|
|
if result {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
result, err = o.hasHeader("X-Object-Manifest")
|
|
|
|
|
result, err = o.hasHeader(ctx, "X-Object-Manifest")
|
|
|
|
|
if result {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
return false, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (o *Object) isInContainerVersioning(container string) (bool, error) {
|
|
|
|
|
_, headers, err := o.fs.c.Container(container)
|
|
|
|
|
func (o *Object) isInContainerVersioning(ctx context.Context, container string) (bool, error) {
|
|
|
|
|
_, headers, err := o.fs.c.Container(ctx, container)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return false, err
|
|
|
|
|
}
|
|
|
|
@ -1032,7 +1032,7 @@ func (o *Object) decodeMetaData(info *swift.Object) (err error) {
|
|
|
|
|
// it also sets the info
|
|
|
|
|
//
|
|
|
|
|
// it returns fs.ErrorObjectNotFound if the object isn't found
|
|
|
|
|
func (o *Object) readMetaData() (err error) {
|
|
|
|
|
func (o *Object) readMetaData(ctx context.Context) (err error) {
|
|
|
|
|
if o.headers != nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
@ -1040,7 +1040,7 @@ func (o *Object) readMetaData() (err error) {
|
|
|
|
|
var h swift.Headers
|
|
|
|
|
container, containerPath := o.split()
|
|
|
|
|
err = o.fs.pacer.Call(func() (bool, error) {
|
|
|
|
|
info, h, err = o.fs.c.Object(container, containerPath)
|
|
|
|
|
info, h, err = o.fs.c.Object(ctx, container, containerPath)
|
|
|
|
|
return shouldRetryHeaders(h, err)
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
@ -1066,7 +1066,7 @@ func (o *Object) ModTime(ctx context.Context) time.Time {
|
|
|
|
|
if o.fs.ci.UseServerModTime {
|
|
|
|
|
return o.lastModified
|
|
|
|
|
}
|
|
|
|
|
err := o.readMetaData()
|
|
|
|
|
err := o.readMetaData(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Debugf(o, "Failed to read metadata: %s", err)
|
|
|
|
|
return o.lastModified
|
|
|
|
@ -1081,7 +1081,7 @@ func (o *Object) ModTime(ctx context.Context) time.Time {
|
|
|
|
|
|
|
|
|
|
// SetModTime sets the modification time of the local fs object
|
|
|
|
|
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
|
|
|
|
err := o.readMetaData()
|
|
|
|
|
err := o.readMetaData(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -1099,7 +1099,7 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
|
|
|
|
}
|
|
|
|
|
container, containerPath := o.split()
|
|
|
|
|
return o.fs.pacer.Call(func() (bool, error) {
|
|
|
|
|
err = o.fs.c.ObjectUpdate(container, containerPath, newHeaders)
|
|
|
|
|
err = o.fs.c.ObjectUpdate(ctx, container, containerPath, newHeaders)
|
|
|
|
|
return shouldRetry(err)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
@ -1120,7 +1120,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
|
|
|
|
container, containerPath := o.split()
|
|
|
|
|
err = o.fs.pacer.Call(func() (bool, error) {
|
|
|
|
|
var rxHeaders swift.Headers
|
|
|
|
|
in, rxHeaders, err = o.fs.c.ObjectOpen(container, containerPath, !isRanging, headers)
|
|
|
|
|
in, rxHeaders, err = o.fs.c.ObjectOpen(ctx, container, containerPath, !isRanging, headers)
|
|
|
|
|
return shouldRetryHeaders(rxHeaders, err)
|
|
|
|
|
})
|
|
|
|
|
return
|
|
|
|
@ -1134,9 +1134,9 @@ func min(x, y int64) int64 {
|
|
|
|
|
return y
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (o *Object) getSegmentsLargeObject() (map[string][]string, error) {
|
|
|
|
|
func (o *Object) getSegmentsLargeObject(ctx context.Context) (map[string][]string, error) {
|
|
|
|
|
container, objectName := o.split()
|
|
|
|
|
segmentContainer, segmentObjects, err := o.fs.c.LargeObjectGetSegments(container, objectName)
|
|
|
|
|
segmentContainer, segmentObjects, err := o.fs.c.LargeObjectGetSegments(ctx, container, objectName)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Debugf(o, "Failed to get list segments of object: %v", err)
|
|
|
|
|
return nil, err
|
|
|
|
@ -1153,12 +1153,12 @@ func (o *Object) getSegmentsLargeObject() (map[string][]string, error) {
|
|
|
|
|
return containerSegments, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (o *Object) removeSegmentsLargeObject(containerSegments map[string][]string) error {
|
|
|
|
|
func (o *Object) removeSegmentsLargeObject(ctx context.Context, containerSegments map[string][]string) error {
|
|
|
|
|
if containerSegments == nil || len(containerSegments) <= 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
for container, segments := range containerSegments {
|
|
|
|
|
_, err := o.fs.c.BulkDelete(container, segments)
|
|
|
|
|
_, err := o.fs.c.BulkDelete(ctx, container, segments)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Debugf(o, "Failed to delete bulk segments %v", err)
|
|
|
|
|
return err
|
|
|
|
@ -1167,8 +1167,8 @@ func (o *Object) removeSegmentsLargeObject(containerSegments map[string][]string
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (o *Object) getSegmentsDlo() (segmentsContainer string, prefix string, err error) {
|
|
|
|
|
if err = o.readMetaData(); err != nil {
|
|
|
|
|
func (o *Object) getSegmentsDlo(ctx context.Context) (segmentsContainer string, prefix string, err error) {
|
|
|
|
|
if err = o.readMetaData(ctx); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
dirManifest := o.headers["X-Object-Manifest"]
|
|
|
|
@ -1203,14 +1203,14 @@ func urlEncode(str string) string {
|
|
|
|
|
|
|
|
|
|
// updateChunks updates the existing object using chunks to a separate
|
|
|
|
|
// container. It returns a string which prefixes current segments.
|
|
|
|
|
func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, contentType string) (string, error) {
|
|
|
|
|
func (o *Object) updateChunks(ctx context.Context, in0 io.Reader, headers swift.Headers, size int64, contentType string) (string, error) {
|
|
|
|
|
container, containerPath := o.split()
|
|
|
|
|
segmentsContainer := container + "_segments"
|
|
|
|
|
// Create the segmentsContainer if it doesn't exist
|
|
|
|
|
var err error
|
|
|
|
|
err = o.fs.pacer.Call(func() (bool, error) {
|
|
|
|
|
var rxHeaders swift.Headers
|
|
|
|
|
_, rxHeaders, err = o.fs.c.Container(segmentsContainer)
|
|
|
|
|
_, rxHeaders, err = o.fs.c.Container(ctx, segmentsContainer)
|
|
|
|
|
return shouldRetryHeaders(rxHeaders, err)
|
|
|
|
|
})
|
|
|
|
|
if err == swift.ContainerNotFound {
|
|
|
|
@ -1219,7 +1219,7 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
|
|
|
|
headers["X-Storage-Policy"] = o.fs.opt.StoragePolicy
|
|
|
|
|
}
|
|
|
|
|
err = o.fs.pacer.Call(func() (bool, error) {
|
|
|
|
|
err = o.fs.c.ContainerCreate(segmentsContainer, headers)
|
|
|
|
|
err = o.fs.c.ContainerCreate(ctx, segmentsContainer, headers)
|
|
|
|
|
return shouldRetry(err)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
@ -1241,7 +1241,7 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
|
|
|
|
if segmentInfos == nil || len(segmentInfos) == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
deleteChunks(o, segmentsContainer, segmentInfos)
|
|
|
|
|
deleteChunks(ctx, o, segmentsContainer, segmentInfos)
|
|
|
|
|
})()
|
|
|
|
|
for {
|
|
|
|
|
// can we read at least one byte?
|
|
|
|
@ -1263,7 +1263,7 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
|
|
|
|
fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, segmentsContainer)
|
|
|
|
|
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
|
|
|
|
var rxHeaders swift.Headers
|
|
|
|
|
rxHeaders, err = o.fs.c.ObjectPut(segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
|
|
|
|
|
rxHeaders, err = o.fs.c.ObjectPut(ctx, segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
|
|
|
|
|
if err == nil {
|
|
|
|
|
segmentInfos = append(segmentInfos, segmentPath)
|
|
|
|
|
}
|
|
|
|
@ -1280,7 +1280,7 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
|
|
|
|
emptyReader := bytes.NewReader(nil)
|
|
|
|
|
err = o.fs.pacer.Call(func() (bool, error) {
|
|
|
|
|
var rxHeaders swift.Headers
|
|
|
|
|
rxHeaders, err = o.fs.c.ObjectPut(container, containerPath, emptyReader, true, "", contentType, headers)
|
|
|
|
|
rxHeaders, err = o.fs.c.ObjectPut(ctx, container, containerPath, emptyReader, true, "", contentType, headers)
|
|
|
|
|
return shouldRetryHeaders(rxHeaders, err)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
@ -1291,13 +1291,13 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
|
|
|
|
return uniquePrefix + "/", err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func deleteChunks(o *Object, segmentsContainer string, segmentInfos []string) {
|
|
|
|
|
func deleteChunks(ctx context.Context, o *Object, segmentsContainer string, segmentInfos []string) {
|
|
|
|
|
if segmentInfos == nil || len(segmentInfos) == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
for _, v := range segmentInfos {
|
|
|
|
|
fs.Debugf(o, "Delete segment file %q on %q", v, segmentsContainer)
|
|
|
|
|
e := o.fs.c.ObjectDelete(segmentsContainer, v)
|
|
|
|
|
e := o.fs.c.ObjectDelete(ctx, segmentsContainer, v)
|
|
|
|
|
if e != nil {
|
|
|
|
|
fs.Errorf(o, "Error occurred in delete segment file %q on %q, error: %q", v, segmentsContainer, e)
|
|
|
|
|
}
|
|
|
|
@ -1320,7 +1320,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|
|
|
|
modTime := src.ModTime(ctx)
|
|
|
|
|
|
|
|
|
|
// Note whether this is a dynamic large object before starting
|
|
|
|
|
isLargeObject, err := o.isLargeObject()
|
|
|
|
|
isLargeObject, err := o.isLargeObject(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -1328,7 +1328,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|
|
|
|
//capture segments before upload
|
|
|
|
|
var segmentsContainer map[string][]string
|
|
|
|
|
if isLargeObject {
|
|
|
|
|
segmentsContainer, _ = o.getSegmentsLargeObject()
|
|
|
|
|
segmentsContainer, _ = o.getSegmentsLargeObject(ctx)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Set the mtime
|
|
|
|
@ -1339,7 +1339,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|
|
|
|
fs.OpenOptionAddHeaders(options, headers)
|
|
|
|
|
|
|
|
|
|
if size > int64(o.fs.opt.ChunkSize) || (size == -1 && !o.fs.opt.NoChunk) {
|
|
|
|
|
_, err = o.updateChunks(in, headers, size, contentType)
|
|
|
|
|
_, err = o.updateChunks(ctx, in, headers, size, contentType)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -1355,7 +1355,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|
|
|
|
}
|
|
|
|
|
var rxHeaders swift.Headers
|
|
|
|
|
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
|
|
|
|
rxHeaders, err = o.fs.c.ObjectPut(container, containerPath, in, true, "", contentType, headers)
|
|
|
|
|
rxHeaders, err = o.fs.c.ObjectPut(ctx, container, containerPath, in, true, "", contentType, headers)
|
|
|
|
|
return shouldRetryHeaders(rxHeaders, err)
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
@ -1373,17 +1373,17 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|
|
|
|
o.size = int64(inCount.BytesRead())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
isInContainerVersioning, _ := o.isInContainerVersioning(container)
|
|
|
|
|
isInContainerVersioning, _ := o.isInContainerVersioning(ctx, container)
|
|
|
|
|
// If file was a large object and the container is not enable versioning then remove old/all segments
|
|
|
|
|
if isLargeObject && len(segmentsContainer) > 0 && !isInContainerVersioning {
|
|
|
|
|
err := o.removeSegmentsLargeObject(segmentsContainer)
|
|
|
|
|
err := o.removeSegmentsLargeObject(ctx, segmentsContainer)
|
|
|
|
|
if err != nil {
|
|
|
|
|
fs.Logf(o, "Failed to remove old segments - carrying on with upload: %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Read the metadata from the newly created object if necessary
|
|
|
|
|
return o.readMetaData()
|
|
|
|
|
return o.readMetaData(ctx)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Remove an object
|
|
|
|
@ -1391,14 +1391,14 @@ func (o *Object) Remove(ctx context.Context) (err error) {
|
|
|
|
|
container, containerPath := o.split()
|
|
|
|
|
|
|
|
|
|
//check object is large object
|
|
|
|
|
isLargeObject, err := o.isLargeObject()
|
|
|
|
|
isLargeObject, err := o.isLargeObject(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
//check container has enabled version to reserve segment when delete
|
|
|
|
|
isInContainerVersioning := false
|
|
|
|
|
if isLargeObject {
|
|
|
|
|
isInContainerVersioning, err = o.isInContainerVersioning(container)
|
|
|
|
|
isInContainerVersioning, err = o.isInContainerVersioning(ctx, container)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -1406,14 +1406,14 @@ func (o *Object) Remove(ctx context.Context) (err error) {
|
|
|
|
|
//capture segments object if this object is large object
|
|
|
|
|
var containerSegments map[string][]string
|
|
|
|
|
if isLargeObject {
|
|
|
|
|
containerSegments, err = o.getSegmentsLargeObject()
|
|
|
|
|
containerSegments, err = o.getSegmentsLargeObject(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// Remove file/manifest first
|
|
|
|
|
err = o.fs.pacer.Call(func() (bool, error) {
|
|
|
|
|
err = o.fs.c.ObjectDelete(container, containerPath)
|
|
|
|
|
err = o.fs.c.ObjectDelete(ctx, container, containerPath)
|
|
|
|
|
return shouldRetry(err)
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
@ -1425,7 +1425,7 @@ func (o *Object) Remove(ctx context.Context) (err error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if isLargeObject {
|
|
|
|
|
return o.removeSegmentsLargeObject(containerSegments)
|
|
|
|
|
return o.removeSegmentsLargeObject(ctx, containerSegments)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|