swift: Add list_chunks and retry_403 options

Blomp tends to return 403 errors on directory listing requests which
are not really informative because such requests eventually succeed
if repeated.

This commit adds two options to relieve this behaviour and make syncing
large directories more reliable.

Discussion: https://forum.rclone.org/t/too-many-errors-with-blomp-especially-in-directories-with-thousands-of-files/46769
This commit is contained in:
Bohdan Horbeshko 2024-07-13 04:47:57 +03:00
parent 27267547b9
commit bb86f466b0
1 changed files with 40 additions and 22 deletions

View File

@ -36,7 +36,6 @@ import (
// Constants
const (
directoryMarkerContentType = "application/directory" // content type of directory marker objects
listChunks = 1000 // chunk size to read directory listings
defaultChunkSize = 5 * fs.Gibi
minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep.
segmentsContainerSuffix = "_segments"
@ -308,6 +307,20 @@ of the Swift API that do not implement pagination as expected. See
also "fetch_until_empty_page".`,
Default: 0,
Advanced: true,
}, {
Name: "list_chunks",
Help: `Maximum amount of directory entries per listing request.
Reduce this if you have errors with listing large directories.`,
Default: 1000,
Advanced: true,
}, {
Name: "retry_403",
Help: `Retry on 403 errors.
Blomp is known to return 403 errors on directory listing requests.`,
Default: false,
Advanced: true,
}}, SharedOptions...),
})
}
@ -340,6 +353,8 @@ type Options struct {
Enc encoder.MultiEncoder `config:"encoding"`
FetchUntilEmptyPage bool `config:"fetch_until_empty_page"`
PartialPageFetchThreshold int `config:"partial_page_fetch_threshold"`
ListChunks int `config:"list_chunks"`
Retry403 bool `config:"retry_403"`
}
// Fs represents a remote swift server
@ -411,7 +426,7 @@ var retryErrorCodes = []int{
// shouldRetry returns a boolean as to whether this err deserves to be
// retried. It returns the err as a convenience
func shouldRetry(ctx context.Context, err error) (bool, error) {
func shouldRetry(ctx context.Context, err error, retry403 bool) (bool, error) {
if fserrors.ContextError(ctx, &err) {
return false, err
}
@ -422,6 +437,9 @@ func shouldRetry(ctx context.Context, err error) (bool, error) {
return true, err
}
}
if retry403 && swiftError.StatusCode == 403 {
return true, err
}
}
// Check for generic failure conditions
return fserrors.ShouldRetry(err), err
@ -430,7 +448,7 @@ func shouldRetry(ctx context.Context, err error) (bool, error) {
// shouldRetryHeaders returns a boolean as to whether this err
// deserves to be retried. It reads the headers passed in looking for
// `Retry-After`. It returns the err as a convenience
func shouldRetryHeaders(ctx context.Context, headers swift.Headers, err error) (bool, error) {
func shouldRetryHeaders(ctx context.Context, headers swift.Headers, err error, retry403 bool) (bool, error) {
if swiftError, ok := err.(*swift.Error); ok && swiftError.StatusCode == 429 {
if value := headers["Retry-After"]; value != "" {
retryAfter, parseErr := strconv.Atoi(value)
@ -449,7 +467,7 @@ func shouldRetryHeaders(ctx context.Context, headers swift.Headers, err error) (
}
}
}
return shouldRetry(ctx, err)
return shouldRetry(ctx, err, retry403)
}
// parsePath parses a remote 'url'
@ -598,7 +616,7 @@ func NewFsWithConnection(ctx context.Context, opt *Options, name, root string, c
err = f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers
info, rxHeaders, err = f.c.Object(ctx, f.rootContainer, encodedDirectory)
return shouldRetryHeaders(ctx, rxHeaders, err)
return shouldRetryHeaders(ctx, rxHeaders, err, f.opt.Retry403)
})
if err == nil && info.ContentType != directoryMarkerContentType {
newRoot := path.Dir(f.root)
@ -696,7 +714,7 @@ func (f *Fs) listContainerRoot(ctx context.Context, container, directory, prefix
// Options for ObjectsWalk
opts := swift.ObjectsOpts{
Prefix: directory,
Limit: listChunks,
Limit: f.opt.ListChunks,
}
if !recurse {
opts.Delimiter = '/'
@ -706,7 +724,7 @@ func (f *Fs) listContainerRoot(ctx context.Context, container, directory, prefix
var err error
err = f.pacer.Call(func() (bool, error) {
objects, err = f.c.Objects(ctx, container, opts)
return shouldRetry(ctx, err)
return shouldRetry(ctx, err, f.opt.Retry403)
})
if err == nil {
for i := range objects {
@ -795,7 +813,7 @@ func (f *Fs) listContainers(ctx context.Context) (entries fs.DirEntries, err err
var containers []swift.Container
err = f.pacer.Call(func() (bool, error) {
containers, err = f.c.ContainersAll(ctx, nil)
return shouldRetry(ctx, err)
return shouldRetry(ctx, err, f.opt.Retry403)
})
if err != nil {
return nil, fmt.Errorf("container listing failed: %w", err)
@ -888,7 +906,7 @@ func (f *Fs) About(ctx context.Context) (usage *fs.Usage, err error) {
var container swift.Container
err = f.pacer.Call(func() (bool, error) {
container, _, err = f.c.Container(ctx, f.rootContainer)
return shouldRetry(ctx, err)
return shouldRetry(ctx, err, f.opt.Retry403)
})
if err != nil {
return nil, fmt.Errorf("container info failed: %w", err)
@ -899,7 +917,7 @@ func (f *Fs) About(ctx context.Context) (usage *fs.Usage, err error) {
var containers []swift.Container
err = f.pacer.Call(func() (bool, error) {
containers, err = f.c.ContainersAll(ctx, nil)
return shouldRetry(ctx, err)
return shouldRetry(ctx, err, f.opt.Retry403)
})
if err != nil {
return nil, fmt.Errorf("container listing failed: %w", err)
@ -951,7 +969,7 @@ func (f *Fs) makeContainer(ctx context.Context, container string) error {
err = f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers
_, rxHeaders, err = f.c.Container(ctx, container)
return shouldRetryHeaders(ctx, rxHeaders, err)
return shouldRetryHeaders(ctx, rxHeaders, err, f.opt.Retry403)
})
}
if err == swift.ContainerNotFound {
@ -961,7 +979,7 @@ func (f *Fs) makeContainer(ctx context.Context, container string) error {
}
err = f.pacer.Call(func() (bool, error) {
err = f.c.ContainerCreate(ctx, container, headers)
return shouldRetry(ctx, err)
return shouldRetry(ctx, err, f.opt.Retry403)
})
if err == nil {
fs.Infof(f, "Container %q created", container)
@ -982,7 +1000,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
err := f.cache.Remove(container, func() error {
err := f.pacer.Call(func() (bool, error) {
err := f.c.ContainerDelete(ctx, container)
return shouldRetry(ctx, err)
return shouldRetry(ctx, err, f.opt.Retry403)
})
if err == nil {
fs.Infof(f, "Container %q removed", container)
@ -1060,7 +1078,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
err = f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers
rxHeaders, err = f.c.ObjectCopy(ctx, srcContainer, srcPath, dstContainer, dstPath, nil)
return shouldRetryHeaders(ctx, rxHeaders, err)
return shouldRetryHeaders(ctx, rxHeaders, err, f.opt.Retry403)
})
}
if err != nil {
@ -1155,7 +1173,7 @@ func (su *segmentedUpload) uploadManifest(ctx context.Context, contentType strin
err = su.f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers
rxHeaders, err = su.f.c.ObjectPut(ctx, su.dstContainer, su.dstPath, emptyReader, true, "", contentType, headers)
return shouldRetryHeaders(ctx, rxHeaders, err)
return shouldRetryHeaders(ctx, rxHeaders, err, su.f.opt.Retry403)
})
return err
}
@ -1179,7 +1197,7 @@ func (f *Fs) copyLargeObject(ctx context.Context, src *Object, dstContainer stri
err = f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers
rxHeaders, err = f.c.ObjectCopy(ctx, srcSegmentsContainer, srcSegment, su.container, dstSegment, nil)
return shouldRetryHeaders(ctx, rxHeaders, err)
return shouldRetryHeaders(ctx, rxHeaders, err, f.opt.Retry403)
})
if err != nil {
return err
@ -1326,7 +1344,7 @@ func (o *Object) readMetaData(ctx context.Context) (err error) {
container, containerPath := o.split()
err = o.fs.pacer.Call(func() (bool, error) {
info, h, err = o.fs.c.Object(ctx, container, containerPath)
return shouldRetryHeaders(ctx, h, err)
return shouldRetryHeaders(ctx, h, err, o.fs.opt.Retry403)
})
if err != nil {
if err == swift.ObjectNotFound {
@ -1384,7 +1402,7 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
container, containerPath := o.split()
return o.fs.pacer.Call(func() (bool, error) {
err = o.fs.c.ObjectUpdate(ctx, container, containerPath, newHeaders)
return shouldRetry(ctx, err)
return shouldRetry(ctx, err, o.fs.opt.Retry403)
})
}
@ -1405,7 +1423,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
err = o.fs.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers
in, rxHeaders, err = o.fs.c.ObjectOpen(ctx, container, containerPath, !isRanging, headers)
return shouldRetryHeaders(ctx, rxHeaders, err)
return shouldRetryHeaders(ctx, rxHeaders, err, o.fs.opt.Retry403)
})
return
}
@ -1497,7 +1515,7 @@ func (o *Object) updateChunks(ctx context.Context, in0 io.Reader, headers swift.
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
var rxHeaders swift.Headers
rxHeaders, err = o.fs.c.ObjectPut(ctx, su.container, segmentPath, segmentReader, true, "", "", headers)
return shouldRetryHeaders(ctx, rxHeaders, err)
return shouldRetryHeaders(ctx, rxHeaders, err, o.fs.opt.Retry403)
})
if err != nil {
return err
@ -1561,7 +1579,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
var rxHeaders swift.Headers
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
rxHeaders, err = o.fs.c.ObjectPut(ctx, container, containerPath, in, true, "", contentType, headers)
return shouldRetryHeaders(ctx, rxHeaders, err)
return shouldRetryHeaders(ctx, rxHeaders, err, o.fs.opt.Retry403)
})
if err != nil {
return err
@ -1626,7 +1644,7 @@ func (o *Object) Remove(ctx context.Context) (err error) {
fs.Errorf(o, "Dangling object - ignoring: %v", err)
err = nil
}
return shouldRetry(ctx, err)
return shouldRetry(ctx, err, o.fs.opt.Retry403)
})
if err != nil {
return err