diff --git a/backend/s3/s3.go b/backend/s3/s3.go
index 57263616c..2ea83638d 100644
--- a/backend/s3/s3.go
+++ b/backend/s3/s3.go
@@ -23,7 +23,6 @@ import (
 	"path"
 	"regexp"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -46,6 +45,7 @@ import (
 	"github.com/rclone/rclone/fs/fshttp"
 	"github.com/rclone/rclone/fs/hash"
 	"github.com/rclone/rclone/fs/walk"
+	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/pacer"
 	"github.com/rclone/rclone/lib/rest"
 )
@@ -798,10 +798,9 @@ type Fs struct {
 	features      *fs.Features     // optional features
 	c             *s3.S3           // the connection to the s3 server
 	ses           *session.Session // the s3 session
-	bucket        string           // the bucket we are working on
-	bucketOKMu    sync.Mutex       // mutex to protect bucket OK
-	bucketOK      bool             // true if we have created the bucket
-	bucketDeleted bool             // true if we have deleted the bucket
+	rootBucket    string           // bucket part of root (if any)
+	rootDirectory string           // directory part of root (if any)
+	cache         *bucket.Cache    // cache for bucket creation status
 	pacer         *fs.Pacer        // To pace the API calls
 	srv           *http.Client     // a plain http client
 }
@@ -830,18 +829,18 @@ func (f *Fs) Name() string {
 
 // Root of the remote (as passed into NewFs)
 func (f *Fs) Root() string {
-	if f.root == "" {
-		return f.bucket
-	}
-	return f.bucket + "/" + f.root
+	return f.root
 }
 
 // String converts this Fs to a string
 func (f *Fs) String() string {
-	if f.root == "" {
-		return fmt.Sprintf("S3 bucket %s", f.bucket)
+	if f.rootBucket == "" {
+		return fmt.Sprintf("S3 root")
 	}
-	return fmt.Sprintf("S3 bucket %s path %s", f.bucket, f.root)
+	if f.rootDirectory == "" {
+		return fmt.Sprintf("S3 bucket %s", f.rootBucket)
+	}
+	return fmt.Sprintf("S3 bucket %s path %s", f.rootBucket, f.rootDirectory)
 }
 
 // Features returns the optional features of this Fs
@@ -868,14 +867,16 @@ func (f *Fs) shouldRetry(err error) (bool, error) {
 	}
 	// Failing that, if it's a RequestFailure it's probably got an http status code we can check
 	if reqErr, ok := err.(awserr.RequestFailure); ok {
-		// 301 if wrong region for bucket
-		if reqErr.StatusCode() == http.StatusMovedPermanently {
-			urfbErr := f.updateRegionForBucket()
-			if urfbErr != nil {
-				fs.Errorf(f, "Failed to update region for bucket: %v", urfbErr)
-				return false, err
+		// 301 if wrong region for bucket - can only update if running from a bucket
+		if f.rootBucket != "" {
+			if reqErr.StatusCode() == http.StatusMovedPermanently {
+				urfbErr := f.updateRegionForBucket(f.rootBucket)
+				if urfbErr != nil {
+					fs.Errorf(f, "Failed to update region for bucket: %v", urfbErr)
+					return false, err
+				}
+				return true, err
 			}
-			return true, err
 		}
 		for _, e := range retryErrorCodes {
 			if reqErr.StatusCode() == e {
@@ -888,21 +889,23 @@
 	return fserrors.ShouldRetry(err), err
 }
 
-// Pattern to match a s3 path
-var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`)
-
-// parsePath parses a s3 'url'
-func s3ParsePath(path string) (bucket, directory string, err error) {
-	parts := matcher.FindStringSubmatch(path)
-	if parts == nil {
-		err = errors.Errorf("couldn't parse bucket out of s3 path %q", path)
-	} else {
-		bucket, directory = parts[1], parts[2]
-		directory = strings.Trim(directory, "/")
-	}
+// parsePath parses a remote 'url'
+func parsePath(path string) (root string) {
+	root = strings.Trim(path, "/")
 	return
 }
 
+// split returns bucket and bucketPath from the rootRelativePath
+// relative to f.root
+func (f *Fs) split(rootRelativePath string) (bucketName, bucketPath string) {
+	return bucket.Split(path.Join(f.root, rootRelativePath))
+}
+
+// split returns bucket and bucketPath from the object
+func (o *Object) split() (bucket, bucketPath string) {
+	return o.fs.split(o.remote)
+}
+
 // s3Connection makes a connection to s3
 func s3Connection(opt *Options) (*s3.S3, *session.Session, error) {
 	// Make the auth
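
The two split helpers above carry most of this change: every place that used to concatenate f.bucket and f.root by hand now goes through them. They rely on the new lib/bucket.Split to cut an absolute path at the first slash. A minimal sketch of the behaviour assumed of it (illustrative only, not the library source; needs "strings" imported):

	// splitBucket mimics what bucket.Split is relied upon to do here:
	// "bucket/dir/file" -> ("bucket", "dir/file")
	// "bucket"          -> ("bucket", "")
	// ""                -> ("", "")
	func splitBucket(absPath string) (bucketName, bucketPath string) {
		slash := strings.IndexRune(absPath, '/')
		if slash < 0 {
			return absPath, "" // no slash: the whole path is the bucket (or empty)
		}
		return absPath[:slash], absPath[slash+1:]
	}

So with f.root = "mybucket/photos", f.split("2019/img.jpg") yields ("mybucket", "photos/2019/img.jpg").
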
@@ -1039,6 +1042,12 @@ func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
 	return
 }
 
+// setRoot changes the root of the Fs
+func (f *Fs) setRoot(root string) {
+	f.root = parsePath(root)
+	f.rootBucket, f.rootDirectory = bucket.Split(f.root)
+}
+
 // NewFs constructs an Fs from the path, bucket:path
 func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 	// Parse config into Options struct
@@ -1055,10 +1064,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 	if err != nil {
 		return nil, errors.Wrap(err, "s3: upload cutoff")
 	}
-	bucket, directory, err := s3ParsePath(root)
-	if err != nil {
-		return nil, err
-	}
 	if opt.ACL == "" {
 		opt.ACL = "private"
 	}
@@ -1070,38 +1075,37 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		return nil, err
 	}
 	f := &Fs{
-		name:   name,
-		root:   directory,
-		opt:    *opt,
-		c:      c,
-		bucket: bucket,
-		ses:    ses,
-		pacer:  fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))),
-		srv:    fshttp.NewClient(fs.Config),
+		name:  name,
+		opt:   *opt,
+		c:     c,
+		ses:   ses,
+		pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))),
+		cache: bucket.NewCache(),
+		srv:   fshttp.NewClient(fs.Config),
 	}
+	f.setRoot(root)
 	f.features = (&fs.Features{
-		ReadMimeType:  true,
-		WriteMimeType: true,
-		BucketBased:   true,
+		ReadMimeType:      true,
+		WriteMimeType:     true,
+		BucketBased:       true,
+		BucketBasedRootOK: true,
 	}).Fill(f)
-	if f.root != "" {
-		f.root += "/"
+	if f.rootBucket != "" && f.rootDirectory != "" {
 		// Check to see if the object exists
 		req := s3.HeadObjectInput{
-			Bucket: &f.bucket,
-			Key:    &directory,
+			Bucket: &f.rootBucket,
+			Key:    &f.rootDirectory,
 		}
 		err = f.pacer.Call(func() (bool, error) {
 			_, err = f.c.HeadObject(&req)
 			return f.shouldRetry(err)
 		})
 		if err == nil {
-			f.root = path.Dir(directory)
-			if f.root == "." {
-				f.root = ""
-			} else {
-				f.root += "/"
+			newRoot := path.Dir(f.root)
+			if newRoot == "." {
+				newRoot = ""
 			}
+			f.setRoot(newRoot)
 			// return an error with an fs which points to the parent
 			return f, fs.ErrorIsFile
 		}
@@ -1144,9 +1148,9 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
 }
 
 // Gets the bucket location
-func (f *Fs) getBucketLocation() (string, error) {
+func (f *Fs) getBucketLocation(bucket string) (string, error) {
 	req := s3.GetBucketLocationInput{
-		Bucket: &f.bucket,
+		Bucket: &bucket,
 	}
 	var resp *s3.GetBucketLocationOutput
 	var err error
@@ -1162,8 +1166,8 @@
 
 // Updates the region for the bucket by reading the region from the
 // bucket then updating the session.
-func (f *Fs) updateRegionForBucket() error {
-	region, err := f.getBucketLocation()
+func (f *Fs) updateRegionForBucket(bucket string) error {
+	region, err := f.getBucketLocation(bucket)
 	if err != nil {
 		return errors.Wrap(err, "reading bucket location failed")
 	}
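
getBucketLocation and updateRegionForBucket now take the bucket explicitly because the Fs no longer carries a single bucket. The wrong-region recovery is unchanged in spirit: S3 answers 301, the bucket's region is read back, the session is rebuilt, and the pacer retries. Restated as a sketch (illustrative only, mirroring the logic in shouldRetry above):

	// recoverWrongRegion reports whether a 301 was handled by switching
	// the session to the bucket's real region, making a retry worthwhile.
	func recoverWrongRegion(f *Fs, err error) (retry bool) {
		reqErr, ok := err.(awserr.RequestFailure)
		if !ok || reqErr.StatusCode() != http.StatusMovedPermanently || f.rootBucket == "" {
			return false // not a region redirect, or no bucket to ask about
		}
		if f.updateRegionForBucket(f.rootBucket) != nil {
			return false // couldn't discover the region - give up
		}
		return true // session now points at the right region - retry
	}
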
@@ -1191,15 +1195,18 @@
 // listFn is called from list to handle an object.
 type listFn func(remote string, object *s3.Object, isDirectory bool) error
 
-// list the objects into the function supplied
-//
-// dir is the starting directory, "" for root
+// list lists the objects into the function supplied from
+// the bucket and directory supplied. The remote has prefix
+// removed from it and if addBucket is set then it adds the
+// bucket to the start.
 //
 // Set recurse to read sub directories
-func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) error {
-	root := f.root
-	if dir != "" {
-		root += dir + "/"
+func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, fn listFn) error {
+	if prefix != "" {
+		prefix += "/"
+	}
+	if directory != "" {
+		directory += "/"
 	}
 	maxKeys := int64(listChunkSize)
 	delimiter := ""
@@ -1210,9 +1217,9 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro
 	for {
 		// FIXME need to implement ALL loop
 		req := s3.ListObjectsInput{
-			Bucket:    &f.bucket,
+			Bucket:    &bucket,
 			Delimiter: &delimiter,
-			Prefix:    &root,
+			Prefix:    &directory,
 			MaxKeys:   &maxKeys,
 			Marker:    marker,
 		}
@@ -1228,9 +1235,19 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro
 					err = fs.ErrorDirNotFound
 				}
 			}
+			if f.rootBucket == "" {
+				// if listing from the root ignore wrong region requests returning
+				// empty directory
+				if reqErr, ok := err.(awserr.RequestFailure); ok {
+					// 301 if wrong region for bucket
+					if reqErr.StatusCode() == http.StatusMovedPermanently {
+						fs.Errorf(f, "Can't change region for bucket %q with no bucket specified", bucket)
+						return nil
+					}
+				}
+			}
 			return err
 		}
-		rootLength := len(f.root)
 		if !recurse {
 			for _, commonPrefix := range resp.CommonPrefixes {
 				if commonPrefix.Prefix == nil {
@@ -1238,11 +1255,14 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro
 					continue
 				}
 				remote := *commonPrefix.Prefix
-				if !strings.HasPrefix(remote, f.root) {
+				if !strings.HasPrefix(remote, prefix) {
 					fs.Logf(f, "Odd name received %q", remote)
 					continue
 				}
-				remote = remote[rootLength:]
+				remote = remote[len(prefix):]
+				if addBucket {
+					remote = path.Join(bucket, remote)
+				}
 				if strings.HasSuffix(remote, "/") {
 					remote = remote[:len(remote)-1]
 				}
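
The prefix/addBucket pair is what lets a single list() serve both bucket-rooted and root-level remotes: prefix is trimmed from each key so entries come out relative to the Fs root, and addBucket puts the bucket name back when the root sits above bucket level. Worked through with illustrative values (assumes "path" is imported):

	// Assume prefix "dir" (a "/" is appended above), bucket "mybucket",
	// and addBucket == true (the case when f.rootBucket == "").
	key := "dir/sub/file.txt"              // object key as returned by S3
	remote := key[len("dir/"):]            // "sub/file.txt" - relative to the Fs root
	remote = path.Join("mybucket", remote) // "mybucket/sub/file.txt"
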
@@ -1253,22 +1273,18 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro
 			}
 		}
 		for _, object := range resp.Contents {
-			key := aws.StringValue(object.Key)
-			if !strings.HasPrefix(key, f.root) {
-				fs.Logf(f, "Odd name received %q", key)
+			remote := aws.StringValue(object.Key)
+			if !strings.HasPrefix(remote, prefix) {
+				fs.Logf(f, "Odd name received %q", remote)
 				continue
 			}
-			remote := key[rootLength:]
+			remote = remote[len(prefix):]
+			isDirectory := strings.HasSuffix(remote, "/")
+			if addBucket {
+				remote = path.Join(bucket, remote)
+			}
 			// is this a directory marker?
-			if (strings.HasSuffix(remote, "/") || remote == "") && *object.Size == 0 {
-				if recurse && remote != "" {
-					// add a directory in if --fast-list since will have no prefixes
-					remote = remote[:len(remote)-1]
-					err = fn(remote, &s3.Object{Key: &remote}, true)
-					if err != nil {
-						return err
-					}
-				}
+			if isDirectory && object.Size != nil && *object.Size == 0 {
 				continue // skip directory marker
 			}
 			err = fn(remote, object, false)
@@ -1309,20 +1325,10 @@ func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Objec
 	return o, nil
 }
 
-// mark the bucket as being OK
-func (f *Fs) markBucketOK() {
-	if f.bucket != "" {
-		f.bucketOKMu.Lock()
-		f.bucketOK = true
-		f.bucketDeleted = false
-		f.bucketOKMu.Unlock()
-	}
-}
-
 // listDir lists files and directories to out
-func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
+func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) {
 	// List the objects and directories
-	err = f.list(ctx, dir, false, func(remote string, object *s3.Object, isDirectory bool) error {
+	err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *s3.Object, isDirectory bool) error {
 		entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
 		if err != nil {
 			return err
@@ -1336,7 +1342,7 @@ func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, er
 		return nil, err
 	}
 	// bucket must be present if listing succeeded
-	f.markBucketOK()
+	f.cache.MarkOK(bucket)
 	return entries, nil
 }
 
@@ -1355,7 +1361,9 @@ func (f *Fs) listBuckets(ctx context.Context, dir string) (entries fs.DirEntries
 		return nil, err
 	}
 	for _, bucket := range resp.Buckets {
-		d := fs.NewDir(aws.StringValue(bucket.Name), aws.TimeValue(bucket.CreationDate))
+		bucketName := aws.StringValue(bucket.Name)
+		f.cache.MarkOK(bucketName)
+		d := fs.NewDir(bucketName, aws.TimeValue(bucket.CreationDate))
 		entries = append(entries, d)
 	}
 	return entries, nil
@@ -1371,10 +1379,11 @@ func (f *Fs) listBuckets(ctx context.Context, dir string) (entries fs.DirEntries
 // This should return ErrDirNotFound if the directory isn't
 // found.
 func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
-	if f.bucket == "" {
+	bucket, directory := f.split(dir)
+	if bucket == "" {
 		return f.listBuckets(ctx, dir)
 	}
-	return f.listDir(ctx, dir)
+	return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "")
 }
 
 // ListR lists the objects and directories of the Fs starting
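
List now derives everything from f.split(dir), so the same code path serves an Fs rooted above, at, or inside a bucket. The arguments listDir ends up receiving, with illustrative roots:

	// root "mybucket/dir", List(ctx, "sub"):
	//   list(bucket="mybucket", directory="dir/sub", prefix="dir", addBucket=false)
	// root "", List(ctx, "mybucket"):
	//   list(bucket="mybucket", directory="", prefix="", addBucket=true)
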
@@ -1392,24 +1401,43 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
 // immediately.
 //
 // Don't implement this unless you have a more efficient way
-// of listing recursively that doing a directory traversal.
+// of listing recursively than doing a directory traversal.
 func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
-	if f.bucket == "" {
-		return fs.ErrorListBucketRequired
-	}
+	bucket, directory := f.split(dir)
 	list := walk.NewListRHelper(callback)
-	err = f.list(ctx, dir, true, func(remote string, object *s3.Object, isDirectory bool) error {
-		entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
+	listR := func(bucket, directory, prefix string, addBucket bool) error {
+		return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *s3.Object, isDirectory bool) error {
+			entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
+			if err != nil {
+				return err
+			}
+			return list.Add(entry)
+		})
+	}
+	if bucket == "" {
+		entries, err := f.listBuckets(ctx, "")
+		if err != nil {
+			return err
+		}
+		for _, entry := range entries {
+			err = list.Add(entry)
+			if err != nil {
+				return err
+			}
+			bucket := entry.Remote()
+			err = listR(bucket, "", f.rootDirectory, true)
+			if err != nil {
+				return err
+			}
+		}
+	} else {
+		err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "")
 		if err != nil {
 			return err
 		}
-		return list.Add(entry)
-	})
-	if err != nil {
-		return err
 	}
 	// bucket must be present if listing succeeded
-	f.markBucketOK()
+	f.cache.MarkOK(bucket)
 	return list.Flush()
 }
@@ -1431,9 +1459,9 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
 // Check if the bucket exists
 //
 // NB this can return incorrect results if called immediately after bucket deletion
-func (f *Fs) dirExists(ctx context.Context) (bool, error) {
+func (f *Fs) bucketExists(ctx context.Context, bucket string) (bool, error) {
 	req := s3.HeadBucketInput{
-		Bucket: &f.bucket,
+		Bucket: &bucket,
 	}
 	err := f.pacer.Call(func() (bool, error) {
 		_, err := f.c.HeadBucketWithContext(ctx, &req)
@@ -1452,68 +1480,56 @@
 
 // Mkdir creates the bucket if it doesn't exist
 func (f *Fs) Mkdir(ctx context.Context, dir string) error {
-	f.bucketOKMu.Lock()
-	defer f.bucketOKMu.Unlock()
-	if f.bucketOK {
-		return nil
-	}
-	if !f.bucketDeleted {
-		exists, err := f.dirExists(ctx)
+	bucket, _ := f.split(dir)
+	return f.cache.Create(bucket, func() error {
+		req := s3.CreateBucketInput{
+			Bucket: &bucket,
+			ACL:    &f.opt.BucketACL,
+		}
+		if f.opt.LocationConstraint != "" {
+			req.CreateBucketConfiguration = &s3.CreateBucketConfiguration{
+				LocationConstraint: &f.opt.LocationConstraint,
+			}
+		}
+		err := f.pacer.Call(func() (bool, error) {
+			_, err := f.c.CreateBucketWithContext(ctx, &req)
+			return f.shouldRetry(err)
+		})
 		if err == nil {
-			f.bucketOK = exists
+			fs.Infof(f, "Bucket %q created with ACL %q", bucket, f.opt.BucketACL)
 		}
-		if err != nil || exists {
-			return err
+		if awsErr, ok := err.(awserr.Error); ok {
+			if awsErr.Code() == "BucketAlreadyOwnedByYou" {
+				err = nil
+			}
 		}
-	}
-	req := s3.CreateBucketInput{
-		Bucket: &f.bucket,
-		ACL:    &f.opt.BucketACL,
-	}
-	if f.opt.LocationConstraint != "" {
-		req.CreateBucketConfiguration = &s3.CreateBucketConfiguration{
-			LocationConstraint: &f.opt.LocationConstraint,
-		}
-	}
-	err := f.pacer.Call(func() (bool, error) {
-		_, err := f.c.CreateBucketWithContext(ctx, &req)
-		return f.shouldRetry(err)
+		return err
+	}, func() (bool, error) {
+		return f.bucketExists(ctx, bucket)
 	})
-	if err, ok := err.(awserr.Error); ok {
-		if err.Code() == "BucketAlreadyOwnedByYou" {
-			err = nil
-		}
-	}
-	if err == nil {
-		f.bucketOK = true
-		f.bucketDeleted = false
-		fs.Infof(f, "Bucket created with ACL %q", *req.ACL)
-	}
-	return err
 }
 
 // Rmdir deletes the bucket if the fs is at the root
 //
 // Returns an error if it isn't empty
 func (f *Fs) Rmdir(ctx context.Context, dir string) error {
-	f.bucketOKMu.Lock()
-	defer f.bucketOKMu.Unlock()
-	if f.root != "" || dir != "" {
+	bucket, directory := f.split(dir)
+	if bucket == "" || directory != "" {
 		return nil
 	}
-	req := s3.DeleteBucketInput{
-		Bucket: &f.bucket,
-	}
-	err := f.pacer.Call(func() (bool, error) {
-		_, err := f.c.DeleteBucketWithContext(ctx, &req)
-		return f.shouldRetry(err)
+	return f.cache.Remove(bucket, func() error {
+		req := s3.DeleteBucketInput{
+			Bucket: &bucket,
+		}
+		err := f.pacer.Call(func() (bool, error) {
+			_, err := f.c.DeleteBucketWithContext(ctx, &req)
+			return f.shouldRetry(err)
		})
+		if err == nil {
+			fs.Infof(f, "Bucket %q deleted", bucket)
+		}
+		return err
 	})
-	if err == nil {
-		f.bucketOK = false
-		f.bucketDeleted = true
-		fs.Infof(f, "Bucket deleted")
-	}
-	return err
 }
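
Mkdir and Rmdir now delegate the old bucketOK/bucketDeleted bookkeeping to lib/bucket.Cache, which serialises concurrent callers per bucket. A sketch of the contract assumed of Create (illustrative, not the library source; needs "sync" imported):

	// cacheCreate consults the cached state, falls back to exists(), and
	// only calls create() when the bucket is not known to be there.
	func cacheCreate(mu *sync.Mutex, known map[string]bool, name string,
		create func() error, exists func() (bool, error)) error {
		mu.Lock()
		defer mu.Unlock()
		if known[name] {
			return nil // already created or seen - nothing to do
		}
		if exists != nil {
			found, err := exists()
			if err != nil {
				return err
			}
			if found {
				known[name] = true
				return nil
			}
		}
		if err := create(); err != nil {
			return err
		}
		known[name] = true
		return nil
	}

Note that the create callback above returns err rather than nil, so a failed CreateBucket still surfaces; only "BucketAlreadyOwnedByYou" is deliberately swallowed.
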
@@ -1537,6 +1553,7 @@ func pathEscape(s string) string {
 //
 // If it isn't possible then return fs.ErrorCantCopy
 func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
+	dstBucket, dstPath := f.split(remote)
 	err := f.Mkdir(ctx, "")
 	if err != nil {
 		return nil, err
@@ -1546,13 +1563,12 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
 		fs.Debugf(src, "Can't copy - not same remote type")
 		return nil, fs.ErrorCantCopy
 	}
-	srcFs := srcObj.fs
-	key := f.root + remote
-	source := pathEscape(srcFs.bucket + "/" + srcFs.root + srcObj.remote)
+	srcBucket, srcPath := srcObj.split()
+	source := pathEscape(path.Join(srcBucket, srcPath))
 	req := s3.CopyObjectInput{
-		Bucket:            &f.bucket,
+		Bucket:            &dstBucket,
 		ACL:               &f.opt.ACL,
-		Key:               &key,
+		Key:               &dstPath,
 		CopySource:        &source,
 		MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
 	}
@@ -1640,10 +1656,10 @@ func (o *Object) readMetaData(ctx context.Context) (err error) {
 	if o.meta != nil {
 		return nil
 	}
-	key := o.fs.root + o.remote
+	bucket, bucketPath := o.split()
 	req := s3.HeadObjectInput{
-		Bucket: &o.fs.bucket,
-		Key:    &key,
+		Bucket: &bucket,
+		Key:    &bucketPath,
 	}
 	var resp *s3.HeadObjectOutput
 	err = o.fs.pacer.Call(func() (bool, error) {
@@ -1722,13 +1738,13 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
 	mimeType := fs.MimeType(ctx, o)
 
 	// Copy the object to itself to update the metadata
-	key := o.fs.root + o.remote
-	sourceKey := o.fs.bucket + "/" + key
+	bucket, bucketPath := o.split()
+	sourceKey := path.Join(bucket, bucketPath)
 	directive := s3.MetadataDirectiveReplace // replace metadata with that passed in
 	req := s3.CopyObjectInput{
-		Bucket:            &o.fs.bucket,
+		Bucket:            &bucket,
 		ACL:               &o.fs.opt.ACL,
-		Key:               &key,
+		Key:               &bucketPath,
 		ContentType:       &mimeType,
 		CopySource:        aws.String(pathEscape(sourceKey)),
 		Metadata:          o.meta,
@@ -1760,10 +1776,10 @@ func (o *Object) Storable() bool {
 
 // Open an object for read
 func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
-	key := o.fs.root + o.remote
+	bucket, bucketPath := o.split()
 	req := s3.GetObjectInput{
-		Bucket: &o.fs.bucket,
-		Key:    &key,
+		Bucket: &bucket,
+		Key:    &bucketPath,
 	}
 	fs.FixRangeOption(options, o.bytes)
 	for _, option := range options {
@@ -1785,7 +1801,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
 	})
 	if err, ok := err.(awserr.RequestFailure); ok {
 		if err.Code() == "InvalidObjectState" {
-			return nil, errors.Errorf("Object in GLACIER, restore first: %v", key)
+			return nil, errors.Errorf("Object in GLACIER, restore first: bucket=%q, key=%q", bucket, bucketPath)
 		}
 	}
 	if err != nil {
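
SetModTime keeps the usual S3 idiom — metadata can't be edited in place, so the object is copied onto itself with the REPLACE directive — and only the source expression changes, now built from the (bucket, key) pair. A standalone sketch (assumes svc is an *s3.S3 client; the "Mtime" key and value format are examples, not necessarily rclone's exact encoding):

	src := pathEscape(path.Join("mybucket", "photos/img.jpg"))
	_, err := svc.CopyObject(&s3.CopyObjectInput{
		Bucket:            aws.String("mybucket"),
		Key:               aws.String("photos/img.jpg"),
		CopySource:        aws.String(src), // copy the object onto itself
		MetadataDirective: aws.String(s3.MetadataDirectiveReplace),
		Metadata:          map[string]*string{"Mtime": aws.String("1567890123")},
	})
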
%v", key) + return nil, errors.Errorf("Object in GLACIER, restore first: bucket=%q, key=%q", bucket, bucketPath) } } if err != nil { @@ -1796,6 +1812,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read // Update the Object from in with modTime and size func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + bucket, bucketPath := o.split() err := o.fs.Mkdir(ctx, "") if err != nil { return err @@ -1849,13 +1866,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // Guess the content type mimeType := fs.MimeType(ctx, src) - - key := o.fs.root + o.remote if multipart { req := s3manager.UploadInput{ - Bucket: &o.fs.bucket, + Bucket: &bucket, ACL: &o.fs.opt.ACL, - Key: &key, + Key: &bucketPath, Body: in, ContentType: &mimeType, Metadata: metadata, @@ -1879,9 +1894,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op } } else { req := s3.PutObjectInput{ - Bucket: &o.fs.bucket, + Bucket: &bucket, ACL: &o.fs.opt.ACL, - Key: &key, + Key: &bucketPath, ContentType: &mimeType, Metadata: metadata, } @@ -1954,10 +1969,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // Remove an object func (o *Object) Remove(ctx context.Context) error { - key := o.fs.root + o.remote + bucket, bucketPath := o.split() req := s3.DeleteObjectInput{ - Bucket: &o.fs.bucket, - Key: &key, + Bucket: &bucket, + Key: &bucketPath, } err := o.fs.pacer.Call(func() (bool, error) { _, err := o.fs.c.DeleteObjectWithContext(ctx, &req)