diff --git a/backend/smb/filepool.go b/backend/smb/filepool.go
new file mode 100644
index 000000000..ace4d99bd
--- /dev/null
+++ b/backend/smb/filepool.go
@@ -0,0 +1,93 @@
+package smb
+
+import (
+	"context"
+	"fmt"
+	"os"
+
+	"github.com/cloudsoda/go-smb2"
+	"golang.org/x/sync/errgroup"
+)
+
+type smbChunkWriterFile struct {
+	*smb2.File
+	c *conn
+}
+
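+// getFile returns an open write handle for the target file, reusing one from
+// the pool if available, otherwise opening it on a connection of its own.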
+func (w *smbChunkWriter) getFile(ctx context.Context) (f *smbChunkWriterFile, err error) {
+	w.poolMu.Lock()
+	if len(w.pool) > 0 {
+		f = w.pool[0]
+		w.pool = w.pool[1:]
+	}
+	w.poolMu.Unlock()
+
+	if f != nil {
+		return f, nil
+	}
+
+	w.f.addSession() // Show session in use
+
+	c, err := w.f.getConnection(ctx, w.share)
+	if err != nil {
+		w.f.removeSession()
+		return nil, err
+	}
+
+	fl, err := c.smbShare.OpenFile(w.filename, os.O_WRONLY, 0o644)
+	if err != nil {
+		w.f.putConnection(&c, err)
+		w.f.removeSession()
+		return nil, fmt.Errorf("failed to open: %w", err)
+	}
+
+	return &smbChunkWriterFile{File: fl, c: c}, nil
+}
+
+func (w *smbChunkWriter) putFile(pf **smbChunkWriterFile, err error) {
+	if pf == nil {
+		return
+	}
+	f := *pf
+	if f == nil {
+		return
+	}
+	*pf = nil
+
+	if err != nil {
+		_ = f.Close()
+		w.f.putConnection(&f.c, err)
+		w.f.removeSession()
+		return
+	}
+
+	w.poolMu.Lock()
+	w.pool = append(w.pool, f)
+	w.poolMu.Unlock()
+}
+
+func (w *smbChunkWriter) drainPool(ctx context.Context) (err error) {
+	w.poolMu.Lock()
+	defer w.poolMu.Unlock()
+
+	if len(w.pool) == 0 {
+		return nil
+	}
+
+	g, _ := errgroup.WithContext(ctx)
+	for i, f := range w.pool {
+		g.Go(func() error {
+			err := f.Close()
+			w.f.putConnection(&f.c, err)
+			w.f.removeSession()
+			w.pool[i] = nil
+			return err
+		})
+	}
+	err = g.Wait()
+	w.pool = nil
+
+	return err
+}
diff --git a/backend/smb/smb.go b/backend/smb/smb.go
index c6911fb76..5eb04a37b 100644
--- a/backend/smb/smb.go
+++ b/backend/smb/smb.go
@@ -3,6 +3,7 @@ package smb
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -20,14 +21,18 @@ import (
 	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/env"
+	"github.com/rclone/rclone/lib/multipart"
 	"github.com/rclone/rclone/lib/pacer"
 	"github.com/rclone/rclone/lib/readers"
 )
 
 const (
-	minSleep      = 10 * time.Millisecond
-	maxSleep      = 2 * time.Second
-	decayConstant = 2 // bigger for slower decay, exponential
+	minSleep                 = 10 * time.Millisecond
+	maxSleep                 = 2 * time.Second
+	decayConstant            = 2 // bigger for slower decay, exponential
+	defaultChunkSize         = 5 * fs.Mebi
+	defaultUploadCutoff      = 200 * fs.Mebi
+	defaultUploadConcurrency = 4
 )
 
 var (
@@ -124,22 +129,54 @@ Set to 0 to keep connections indefinitely.
 				encoder.EncodeRightPeriod |
 				//
 				encoder.EncodeInvalidUtf8,
+		}, {
+			Name: "upload_cutoff",
+			Help: `Cutoff for switching to chunked upload.
+
+Files above this size will be uploaded in chunks of "--smb-chunk-size".`,
+			Default:  defaultUploadCutoff,
+			Advanced: true,
+		}, {
+			Name: "chunk_size",
+			Help: `Upload chunk size.
+
+When uploading large files, chunk the file into this size.
+
+Must fit in memory. These chunks are buffered in memory and there
+may be a maximum of "--transfers" chunks in progress at once.`,
+			Default:  defaultChunkSize,
+			Advanced: true,
+		}, {
+			Name: "upload_concurrency",
+			Help: `Concurrency for multipart uploads.
+
+This is the number of chunks of the same file that are uploaded
+concurrently.
+
+Note that chunks are stored in memory and there may be up to
+"--transfers" * "--smb-upload-concurrency" chunks stored at once
+in memory.`,
+			Default:  defaultUploadConcurrency,
+			Advanced: true,
 		},
 	}})
}
 
 // Options defines the configuration for this backend
 type Options struct {
-	Host            string      `config:"host"`
-	Port            string      `config:"port"`
-	User            string      `config:"user"`
-	Pass            string      `config:"pass"`
-	Domain          string      `config:"domain"`
-	SPN             string      `config:"spn"`
-	UseKerberos     bool        `config:"use_kerberos"`
-	HideSpecial     bool        `config:"hide_special_share"`
-	CaseInsensitive bool        `config:"case_insensitive"`
-	IdleTimeout     fs.Duration `config:"idle_timeout"`
+	Host              string        `config:"host"`
+	Port              string        `config:"port"`
+	User              string        `config:"user"`
+	Pass              string        `config:"pass"`
+	Domain            string        `config:"domain"`
+	SPN               string        `config:"spn"`
+	UseKerberos       bool          `config:"use_kerberos"`
+	HideSpecial       bool          `config:"hide_special_share"`
+	CaseInsensitive   bool          `config:"case_insensitive"`
+	IdleTimeout       fs.Duration   `config:"idle_timeout"`
+	UploadCutoff      fs.SizeSuffix `config:"upload_cutoff"`
+	ChunkSize         fs.SizeSuffix `config:"chunk_size"`
+	UploadConcurrency int           `config:"upload_concurrency"`
 
 	Enc encoder.MultiEncoder `config:"encoding"`
 }
@@ -176,6 +213,19 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 		return nil, err
 	}
 
+	if opt.UploadCutoff < opt.ChunkSize {
+		opt.UploadCutoff = opt.ChunkSize
+		fs.Infof(nil, "smb: raising upload cutoff to chunk size: %v", opt.UploadCutoff)
+	}
+	err = checkUploadChunkSize(opt.ChunkSize)
+	if err != nil {
+		return nil, fmt.Errorf("smb: chunk size: %w", err)
+	}
+	err = checkUploadCutoff(opt, opt.UploadCutoff)
+	if err != nil {
+		return nil, fmt.Errorf("smb: upload cutoff: %w", err)
+	}
+
 	root = strings.Trim(root, "/")
 
 	f := &Fs{
@@ -488,25 +538,70 @@ func (f *Fs) About(ctx context.Context) (_ *fs.Usage, err error) {
 	return usage, nil
 }
 
-// OpenWriterAt opens with a handle for random access writes
+func checkUploadChunkSize(cs fs.SizeSuffix) error {
+	const minChunkSize = fs.SizeSuffixBase
+	if cs < minChunkSize {
+		return fmt.Errorf("%s is less than %s", cs, minChunkSize)
+	}
+	return nil
+}
+
+func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	err = checkUploadChunkSize(cs)
+	if err == nil {
+		old, f.opt.ChunkSize = f.opt.ChunkSize, cs
+	}
+	return
+}
+
+func checkUploadCutoff(opt *Options, cs fs.SizeSuffix) error {
+	if cs < opt.ChunkSize {
+		return fmt.Errorf("%v is less than chunk size %v", cs, opt.ChunkSize)
+	}
+	return nil
+}
+
+func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	err = checkUploadCutoff(&f.opt, cs)
+	if err == nil {
+		old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
+	}
+	return
+}
+
+// Implements the fs.ChunkWriter interface
+type smbChunkWriter struct {
+	chunkSize int64
+	f         *Fs
+	o         *Object
+	src       fs.ObjectInfo
+	share     string
+	filename  string
+
+	closed  bool
+	closeMu sync.Mutex
+	wg      sync.WaitGroup
+	poolMu  sync.Mutex
+	pool    []*smbChunkWriterFile
+}
+
+// OpenChunkWriter returns the chunk size and a ChunkWriter
 //
-// Pass in the remote desired and the size if known.
-//
-// It truncates any existing object
-func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
-	var err error
+// Pass in the remote and the src object
+// You can also use options to hint at the desired chunk size
+func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
 	o := &Object{
 		fs:     f,
 		remote: remote,
 	}
 	share, filename := o.split()
 	if share == "" || filename == "" {
-		return nil, fs.ErrorIsDir
+		return info, nil, fs.ErrorIsDir
 	}
 
 	err = o.fs.ensureDirectory(ctx, share, filename)
 	if err != nil {
-		return nil, fmt.Errorf("failed to make parent directories: %w", err)
+		return info, nil, fmt.Errorf("failed to make parent directories: %w", err)
 	}
 
 	filename = o.fs.toSambaPath(filename)
@@ -516,15 +611,134 @@ func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.Wr
 
 	cn, err := o.fs.getConnection(ctx, share)
 	if err != nil {
-		return nil, err
+		return info, nil, err
 	}
+	defer func() { o.fs.putConnection(&cn, err) }()
 
+	// create the file or truncate it
 	fl, err := cn.smbShare.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
 	if err != nil {
-		return nil, fmt.Errorf("failed to open: %w", err)
+		return info, nil, fmt.Errorf("failed to open: %w", err)
+	}
+	defer func() {
+		cErr := fl.Close()
+		if cErr != nil {
+			err = errors.Join(err, fmt.Errorf("failed to close: %w", cErr))
+		}
+	}()
+
+	// preallocate the file to the correct size if possible
+	if src.Size() > 0 {
+		err = fl.Truncate(src.Size())
+		if err != nil {
+			return info, nil, fmt.Errorf("failed to truncate: %w", err)
+		}
 	}
 
-	return fl, nil
+	chunkSize := int64(f.opt.ChunkSize)
+	for _, opt := range options {
+		if chunkOption, ok := opt.(*fs.ChunkOption); ok {
+			chunkSize = chunkOption.ChunkSize
+		}
+	}
+
+	chunkWriter := &smbChunkWriter{
+		chunkSize: chunkSize,
+		f:         f,
+		o:         o,
+		src:       src,
+		share:     share,
+		filename:  filename,
+	}
+
+	info = fs.ChunkWriterInfo{
+		ChunkSize:   chunkSize,
+		Concurrency: f.opt.UploadConcurrency,
+	}
+
+	fs.Debugf(o, "open chunk writer: started multipart upload")
+
+	return info, chunkWriter, nil
+}
+
+// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
+func (w *smbChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (n int64, err error) {
+	if w.closed {
+		return 0, errors.New("multipart upload already closed")
+	}
+
+	w.wg.Add(1)
+	defer w.wg.Done()
+
+	fl, err := w.getFile(ctx)
+	if err != nil {
+		return 0, fmt.Errorf("failed to open: %w", err)
+	}
+	defer func() { w.putFile(&fl, err) }()
+
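+	// chunk N of a fixed-size chunked upload starts at byte offset
+	// N*chunkSize; only the final chunk may be shorter than chunkSize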
+	_, err = fl.Seek(int64(chunkNumber)*w.chunkSize, io.SeekStart)
+	if err != nil {
+		return 0, fmt.Errorf("failed to seek: %w", err)
+	}
+
+	n, err = io.CopyN(fl, reader, w.chunkSize)
+	if err != nil && !errors.Is(err, io.EOF) {
+		return n, fmt.Errorf("failed to copy: %w", err)
+	}
+
+	fs.Debugf(w.o, "multipart upload wrote chunk %d with %d bytes", chunkNumber+1, n)
+
+	return n, nil
+}
+
+// Close and finalise the multipart upload
+func (w *smbChunkWriter) Close(ctx context.Context) (err error) {
+	w.closeMu.Lock()
+	defer w.closeMu.Unlock()
+
+	if w.closed {
+		return nil
+	}
+	w.closed = true
+
+	w.wg.Wait() // wait for all pending writes to finish
+
+	var errs []error
+	if err := w.drainPool(ctx); err != nil {
+		errs = append(errs, fmt.Errorf("failed to drain file pool: %w", err))
+	}
+	if err := w.o.SetModTime(ctx, w.src.ModTime(ctx)); err != nil {
+		errs = append(errs, fmt.Errorf("failed to set modtime: %w", err))
+	}
+
+	if len(errs) > 0 {
+		return errors.Join(errs...)
+	}
+
+	fs.Debugf(w.o, "multipart upload finished")
+
+	return nil
+}
+
+// Abort the multipart upload.
+func (w *smbChunkWriter) Abort(ctx context.Context) (err error) {
+	fs.Debugf(w.o, "aborting multipart upload")
+
+	err = w.Close(ctx)
+	if err != nil {
+		fs.Errorf(w.o, "failed to close before aborting: %v", err)
+	}
+
+	err = w.o.Remove(ctx)
+	if err != nil {
+		fs.Errorf(w.o, "failed to remove file after aborting: %v", err)
+	}
+
+	fs.Debugf(w.o, "multipart upload aborted")
+
+	return nil
 }
 
 // Shutdown the backend, closing any background tasks and any
@@ -676,8 +890,29 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
 	return in, nil
 }
 
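+// uploadMultipart uploads the object in chunks using the lib/multipart
+// helper, which drives the OpenChunkWriter implementation above.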
+func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, options ...fs.OpenOption) (err error) {
+	cw, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
+		Open:        o.fs,
+		OpenOptions: options,
+	})
+	if err != nil {
+		return err
+	}
+
+	o.statResult = cw.(*smbChunkWriter).o.statResult
+
+	return nil
+}
+
 // Update the Object from in with modTime and size
 func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
+	size := src.Size()
+	if size < 0 || size >= int64(o.fs.opt.UploadCutoff) {
+		return o.uploadMultipart(ctx, src, in, options...)
+	}
+
 	share, filename := o.split()
 	if share == "" || filename == "" {
 		return fs.ErrorIsDir
diff --git a/backend/smb/smb_test.go b/backend/smb/smb_test.go
index f22bdb7f9..69f2bfa71 100644
--- a/backend/smb/smb_test.go
+++ b/backend/smb/smb_test.go
@@ -1,11 +1,11 @@
 // Test smb filesystem interface
-package smb_test
+package smb
 
 import (
 	"path/filepath"
 	"testing"
 
-	"github.com/rclone/rclone/backend/smb"
+	"github.com/rclone/rclone/fs"
 	"github.com/rclone/rclone/fstest/fstests"
 )
 
@@ -13,7 +13,7 @@ import (
 func TestIntegration(t *testing.T) {
 	fstests.Run(t, &fstests.Opt{
 		RemoteName: "TestSMB:rclone",
-		NilObject:  (*smb.Object)(nil),
+		NilObject:  (*Object)(nil),
 	})
 }
 
@@ -23,6 +23,19 @@ func TestIntegration2(t *testing.T) {
 	t.Setenv("KRB5CCNAME", filepath.Join(krb5Dir, "ccache"))
 	fstests.Run(t, &fstests.Opt{
 		RemoteName: "TestSMBKerberos:rclone",
-		NilObject:  (*smb.Object)(nil),
+		NilObject:  (*Object)(nil),
 	})
 }
+
+func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+	return f.setUploadChunkSize(cs)
+}
+
+func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+	return f.setUploadCutoff(cs)
+}
+
+var (
+	_ fstests.SetUploadChunkSizer = (*Fs)(nil)
+	_ fstests.SetUploadCutoffer   = (*Fs)(nil)
+)
diff --git a/docs/content/docs.md b/docs/content/docs.md
index 17c909fea..bebc9eb4d 100644
--- a/docs/content/docs.md
+++ b/docs/content/docs.md
@@ -1751,7 +1751,7 @@ multiples of 16k performed much better than other values.
 ### --multi-thread-chunk-size=SizeSuffix ###
 
 Normally the chunk size for multi thread transfers is set by the backend.
-However some backends such as `local` and `smb` (which implement `OpenWriterAt`
+However some backends such as `local` and `pcloud` (which implement `OpenWriterAt`
 but not `OpenChunkWriter`) don't have a natural chunk size.
 
 In this case the value of this option is used (default 64Mi).