mirror of https://github.com/rclone/rclone.git

smb: implement OpenChunkWriter instead of OpenWriterAt

This enables true multi-connection upload instead of writing all the chunks over a single connection.

This commit is contained in:
parent dc9c87279b
commit bd5f6e969b
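For orientation before the diff: the backend switches from rclone's OpenWriterAt extension point to the chunked-upload one. The sketch below paraphrases the relevant interfaces from rclone's fs package, using only the signatures that appear in the diff; treat it as an illustration rather than an authoritative copy of that API.

// Paraphrased sketch (as if inside rclone's fs package) of the contract
// the smb backend now implements.
type OpenChunkWriter interface {
    // OpenChunkWriter starts a multipart upload to remote and returns the
    // negotiated chunk size/concurrency together with a ChunkWriter.
    OpenChunkWriter(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (info ChunkWriterInfo, writer ChunkWriter, err error)
}

type ChunkWriter interface {
    // WriteChunk writes chunk number chunkNumber (0-based) from reader.
    WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error)
    // Close finalises the upload once every chunk has been written.
    Close(ctx context.Context) error
    // Abort cancels the upload and cleans up anything written so far.
    Abort(ctx context.Context) error
}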
@@ -0,0 +1,91 @@
+package smb
+
+import (
+    "context"
+    "fmt"
+    "os"
+
+    "github.com/cloudsoda/go-smb2"
+    "golang.org/x/sync/errgroup"
+)
+
+type smbChunkWriterFile struct {
+    *smb2.File
+    c *conn
+}
+
+func (w *smbChunkWriter) getFile(ctx context.Context) (f *smbChunkWriterFile, err error) {
+    w.poolMu.Lock()
+    if len(w.pool) > 0 {
+        f = w.pool[0]
+        w.pool = w.pool[1:]
+    }
+    w.poolMu.Unlock()
+
+    if f != nil {
+        return f, nil
+    }
+
+    w.f.addSession() // Show session in use
+
+    c, err := w.f.getConnection(ctx, w.share)
+    if err != nil {
+        w.f.removeSession()
+        return nil, err
+    }
+
+    fl, err := c.smbShare.OpenFile(w.filename, os.O_WRONLY, 0o644)
+    if err != nil {
+        w.f.putConnection(&c, err)
+        w.f.removeSession()
+        return nil, fmt.Errorf("failed to open: %w", err)
+    }
+
+    return &smbChunkWriterFile{File: fl, c: c}, nil
+}
+
+func (w *smbChunkWriter) putFile(pf **smbChunkWriterFile, err error) {
+    if pf == nil {
+        return
+    }
+    f := *pf
+    if f == nil {
+        return
+    }
+    *pf = nil
+
+    if err != nil {
+        _ = f.Close()
+        w.f.putConnection(&f.c, err)
+        w.f.removeSession()
+        return
+    }
+
+    w.poolMu.Lock()
+    w.pool = append(w.pool, f)
+    w.poolMu.Unlock()
+}
+
+func (w *smbChunkWriter) drainPool(ctx context.Context) (err error) {
+    w.poolMu.Lock()
+    defer w.poolMu.Unlock()
+
+    if len(w.pool) == 0 {
+        return nil
+    }
+
+    g, _ := errgroup.WithContext(ctx)
+    for i, f := range w.pool {
+        g.Go(func() error {
+            err := f.Close()
+            w.f.putConnection(&f.c, err)
+            w.f.removeSession()
+            w.pool[i] = nil
+            return err
+        })
+    }
+    err = g.Wait()
+    w.pool = nil
+
+    return err
+}
@@ -3,6 +3,7 @@ package smb

 import (
     "context"
+    "errors"
     "fmt"
     "io"
     "os"
@@ -20,14 +21,18 @@ import (
     "github.com/rclone/rclone/lib/bucket"
     "github.com/rclone/rclone/lib/encoder"
     "github.com/rclone/rclone/lib/env"
+    "github.com/rclone/rclone/lib/multipart"
     "github.com/rclone/rclone/lib/pacer"
     "github.com/rclone/rclone/lib/readers"
 )

 const (
-    minSleep      = 10 * time.Millisecond
-    maxSleep      = 2 * time.Second
-    decayConstant = 2 // bigger for slower decay, exponential
+    minSleep                 = 10 * time.Millisecond
+    maxSleep                 = 2 * time.Second
+    decayConstant            = 2 // bigger for slower decay, exponential
+    defaultChunkSize         = 5 * fs.Mebi
+    defaultUploadCutoff      = 200 * fs.Mebi
+    defaultUploadConcurrency = 4
 )

 var (
@@ -124,22 +129,54 @@ Set to 0 to keep connections indefinitely.
             encoder.EncodeRightPeriod |
             //
             encoder.EncodeInvalidUtf8,
     }, {
+        Name: "upload_cutoff",
+        Help: `Cutoff for switching to chunked upload.
+
+Files above this size will be uploaded in chunks of "--smb-chunk-size".`,
+        Default:  defaultUploadCutoff,
+        Advanced: true,
+    }, {
+        Name: "chunk_size",
+        Help: `Upload chunk size.
+
+When uploading large files, chunk the file into this size.
+
+Must fit in memory. These chunks are buffered in memory and there
+might a maximum of "--transfers" chunks in progress at once.`,
+        Default:  defaultChunkSize,
+        Advanced: true,
+    }, {
+        Name: "upload_concurrency",
+        Help: `Concurrency for multipart uploads.
+
+This is the number of chunks of the same file that are uploaded
+concurrently.
+
+Note that chunks are stored in memory and there may be up to
+"--transfers" * "--smb-upload-concurrency" chunks stored at once
+in memory.`,
+        Default:  defaultUploadConcurrency,
+        Advanced: true,
+    },
     }})
 }

 // Options defines the configuration for this backend
 type Options struct {
-    Host            string      `config:"host"`
-    Port            string      `config:"port"`
-    User            string      `config:"user"`
-    Pass            string      `config:"pass"`
-    Domain          string      `config:"domain"`
-    SPN             string      `config:"spn"`
-    UseKerberos     bool        `config:"use_kerberos"`
-    HideSpecial     bool        `config:"hide_special_share"`
-    CaseInsensitive bool        `config:"case_insensitive"`
-    IdleTimeout     fs.Duration `config:"idle_timeout"`
+    Host              string        `config:"host"`
+    Port              string        `config:"port"`
+    User              string        `config:"user"`
+    Pass              string        `config:"pass"`
+    Domain            string        `config:"domain"`
+    SPN               string        `config:"spn"`
+    UseKerberos       bool          `config:"use_kerberos"`
+    HideSpecial       bool          `config:"hide_special_share"`
+    CaseInsensitive   bool          `config:"case_insensitive"`
+    IdleTimeout       fs.Duration   `config:"idle_timeout"`
+    UploadCutoff      fs.SizeSuffix `config:"upload_cutoff"`
+    ChunkSize         fs.SizeSuffix `config:"chunk_size"`
+    UploadConcurrency int           `config:"upload_concurrency"`

     Enc encoder.MultiEncoder `config:"encoding"`
 }
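A quick sanity check on the memory figures in the help text above: chunks are buffered in memory, so the worst case is roughly --transfers × --smb-upload-concurrency × --smb-chunk-size. With the defaults introduced here (5 Mi chunks, concurrency 4) and rclone's default of 4 transfers, that is about 4 × 4 × 5 Mi = 80 MiB; this is only an estimate, since the final chunk of each file is usually smaller.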
@@ -176,6 +213,19 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
         return nil, err
     }

+    if opt.UploadCutoff < opt.ChunkSize {
+        opt.UploadCutoff = opt.ChunkSize
+        fs.Infof(nil, "smb: raising upload cutoff to chunk size: %v", opt.UploadCutoff)
+    }
+    err = checkUploadChunkSize(opt.ChunkSize)
+    if err != nil {
+        return nil, fmt.Errorf("smb: chunk size: %w", err)
+    }
+    err = checkUploadCutoff(opt, opt.UploadCutoff)
+    if err != nil {
+        return nil, fmt.Errorf("smb: upload cutoff: %w", err)
+    }
+
     root = strings.Trim(root, "/")

     f := &Fs{
@@ -488,25 +538,70 @@ func (f *Fs) About(ctx context.Context) (_ *fs.Usage, err error) {
     return usage, nil
 }

-// OpenWriterAt opens with a handle for random access writes
+func checkUploadChunkSize(cs fs.SizeSuffix) error {
+    const minChunkSize = fs.SizeSuffixBase
+    if cs < minChunkSize {
+        return fmt.Errorf("%s is less than %s", cs, minChunkSize)
+    }
+    return nil
+}
+
+func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+    err = checkUploadChunkSize(cs)
+    if err == nil {
+        old, f.opt.ChunkSize = f.opt.ChunkSize, cs
+    }
+    return
+}
+
+func checkUploadCutoff(opt *Options, cs fs.SizeSuffix) error {
+    if cs < opt.ChunkSize {
+        return fmt.Errorf("%v is less than chunk size %v", cs, opt.ChunkSize)
+    }
+    return nil
+}
+
+func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+    err = checkUploadCutoff(&f.opt, cs)
+    if err == nil {
+        old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
+    }
+    return
+}
+
+// Implements the fs.ChunkWriter interface
+type smbChunkWriter struct {
+    chunkSize int64
+    f         *Fs
+    o         *Object
+    src       fs.ObjectInfo
+    share     string
+    filename  string
+
+    closed  bool
+    closeMu sync.Mutex
+    wg      sync.WaitGroup
+    poolMu  sync.Mutex
+    pool    []*smbChunkWriterFile
+}
+
+// OpenChunkWriter returns the chunk size and a ChunkWriter
 //
-// Pass in the remote desired and the size if known.
-//
-// It truncates any existing object
-func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
-    var err error
+// Pass in the remote and the src object
+// You can also use options to hint at the desired chunk size
+func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
     o := &Object{
         fs:     f,
         remote: remote,
     }
     share, filename := o.split()
     if share == "" || filename == "" {
-        return nil, fs.ErrorIsDir
+        return info, nil, fs.ErrorIsDir
     }

     err = o.fs.ensureDirectory(ctx, share, filename)
     if err != nil {
-        return nil, fmt.Errorf("failed to make parent directories: %w", err)
+        return info, nil, fmt.Errorf("failed to make parent directories: %w", err)
     }

     filename = o.fs.toSambaPath(filename)
@@ -516,15 +611,132 @@ func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {

     cn, err := o.fs.getConnection(ctx, share)
     if err != nil {
-        return nil, err
+        return info, nil, err
     }
     defer o.fs.putConnection(&cn, err)

+    // create the file or truncate it
     fl, err := cn.smbShare.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
     if err != nil {
-        return nil, fmt.Errorf("failed to open: %w", err)
+        return info, nil, fmt.Errorf("failed to open: %w", err)
     }
+    defer func() {
+        cErr := fl.Close()
+        if cErr != nil {
+            err = errors.Join(err, fmt.Errorf("failed to close: %w", cErr))
+        }
+    }()
+
+    // preallocate the file to the correct size if possible
+    if src.Size() > 0 {
+        err = fl.Truncate(src.Size())
+        if err != nil {
+            return info, nil, fmt.Errorf("failed to truncate: %w", err)
+        }
+    }

-    return fl, nil
+    chunkSize := int64(f.opt.ChunkSize)
+    for _, opt := range options {
+        if chunkOption, ok := opt.(*fs.ChunkOption); ok {
+            chunkSize = chunkOption.ChunkSize
+        }
+    }
+
+    chunkWriter := &smbChunkWriter{
+        chunkSize: chunkSize,
+        f:         f,
+        o:         o,
+        src:       src,
+        share:     share,
+        filename:  filename,
+    }
+
+    info = fs.ChunkWriterInfo{
+        ChunkSize:   chunkSize,
+        Concurrency: f.opt.UploadConcurrency,
+    }
+
+    fs.Debugf(o, "open chunk writer: started multipart upload")
+
+    return info, chunkWriter, nil
+}
+
+// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
+func (w *smbChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (n int64, err error) {
+    if w.closed {
+        return 0, errors.New("multipart upload already closed")
+    }
+
+    w.wg.Add(1)
+    defer w.wg.Done()
+
+    fl, err := w.getFile(ctx)
+    if err != nil {
+        return 0, fmt.Errorf("failed to open: %w", err)
+    }
+    defer w.putFile(&fl, err)
+
+    _, err = fl.Seek(int64(chunkNumber)*w.chunkSize, io.SeekStart)
+    if err != nil {
+        return 0, fmt.Errorf("failed to seek: %w", err)
+    }
+
+    n, err = io.CopyN(fl, reader, w.chunkSize)
+    if err != nil && !errors.Is(err, io.EOF) {
+        return n, fmt.Errorf("failed to copy: %w", err)
+    }
+
+    fs.Debugf(w.o, "multipart upload wrote chunk %d with %d bytes", chunkNumber+1, n)
+
+    return n, nil
+}
+
+// Close and finalise the multipart upload
+func (w *smbChunkWriter) Close(ctx context.Context) (err error) {
+    w.closeMu.Lock()
+    defer w.closeMu.Unlock()
+
+    if w.closed {
+        return nil
+    }
+    w.closed = true
+
+    w.wg.Wait() // wait for all pending writes to finish
+
+    var errs []error
+    if err := w.drainPool(ctx); err != nil {
+        errs = append(errs, fmt.Errorf("failed to drain file pool: %w", err))
+    }
+    if err := w.o.SetModTime(ctx, w.src.ModTime(ctx)); err != nil {
+        errs = append(errs, fmt.Errorf("failed to set modtime: %w", err))
+    }
+
+    if len(errs) > 0 {
+        return errors.Join(errs...)
+    }
+
+    fs.Debug(w.o, "multipart upload finished")
+
+    return nil
+}
+
+// Abort the multipart upload.
+func (w *smbChunkWriter) Abort(ctx context.Context) (err error) {
+    fs.Debugf(w.o, "aborting multipart upload")
+
+    err = w.Close(ctx)
+    if err != nil {
+        fs.Errorf(w.o, "failed to close before aborting: %v", err)
+    }
+
+    err = w.o.Remove(ctx)
+    if err != nil {
+        fs.Errorf(w.o, "failed to remove file after aborting: %v", err)
+    }
+
+    fs.Debug(w.o, "multipart upload aborted")
+
+    return nil
 }

 // Shutdown the backend, closing any background tasks and any
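A quick worked example of the offset arithmetic in WriteChunk above (illustrative numbers only): each chunk is written at offset chunkNumber × chunkSize on its own SMB file handle, so with the default 5 Mi chunk size a 12 Mi file yields chunk 0 at offset 0, chunk 1 at 5 Mi and chunk 2 at 10 Mi, the last copy simply hitting io.EOF after 2 Mi. Because every chunk has an absolute offset, chunks can be written concurrently and in any order.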
@@ -676,8 +888,27 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
     return in, nil
 }

+func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, options ...fs.OpenOption) (err error) {
+    cw, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
+        Open:        o.fs,
+        OpenOptions: options,
+    })
+    if err != nil {
+        return err
+    }
+
+    o.statResult = cw.(*smbChunkWriter).o.statResult
+
+    return nil
+}
+
 // Update the Object from in with modTime and size
 func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
+    size := src.Size()
+    if size < 0 || size >= int64(o.fs.opt.UploadCutoff) {
+        return o.uploadMultipart(ctx, src, in, options...)
+    }
+
     share, filename := o.split()
     if share == "" || filename == "" {
         return fs.ErrorIsDir
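To tie the pieces together: Update now routes anything at or above upload_cutoff (or of unknown size) through uploadMultipart, and rclone's lib/multipart helper then drives the ChunkWriter returned by OpenChunkWriter. The sketch below is illustrative only and is not the helper's real code; it shows the flow sequentially, whereas the real helper reads chunks into pooled buffers and runs WriteChunk on several goroutines up to the advertised Concurrency.

// Illustrative, simplified driver for a chunked upload (assumed helper,
// not rclone's lib/multipart): open the writer, feed it numbered chunks,
// then close.
func uploadLargeSketch(ctx context.Context, f *Fs, remote string, src fs.ObjectInfo, in io.Reader) error {
    info, writer, err := f.OpenChunkWriter(ctx, remote, src) // creates/truncates the target file
    if err != nil {
        return err
    }
    for chunk := 0; ; chunk++ {
        buf := make([]byte, info.ChunkSize)
        n, readErr := io.ReadFull(in, buf)
        if n > 0 {
            // Each chunk lands at offset chunk*ChunkSize via its own SMB handle.
            if _, err := writer.WriteChunk(ctx, chunk, bytes.NewReader(buf[:n])); err != nil {
                _ = writer.Abort(ctx) // best-effort cleanup of the partial file
                return err
            }
        }
        if readErr != nil { // io.EOF / io.ErrUnexpectedEOF: no more input
            break
        }
    }
    // Close waits for in-flight writes, drains the handle pool and sets the modtime.
    return writer.Close(ctx)
}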
@@ -1,11 +1,11 @@
 // Test smb filesystem interface
-package smb_test
+package smb

 import (
     "path/filepath"
     "testing"

-    "github.com/rclone/rclone/backend/smb"
+    "github.com/rclone/rclone/fs"
     "github.com/rclone/rclone/fstest/fstests"
 )

@@ -13,7 +13,7 @@ import (
 func TestIntegration(t *testing.T) {
     fstests.Run(t, &fstests.Opt{
         RemoteName: "TestSMB:rclone",
-        NilObject:  (*smb.Object)(nil),
+        NilObject:  (*Object)(nil),
     })
 }

@@ -23,6 +23,19 @@ func TestIntegration2(t *testing.T) {
     t.Setenv("KRB5CCNAME", filepath.Join(krb5Dir, "ccache"))
     fstests.Run(t, &fstests.Opt{
         RemoteName: "TestSMBKerberos:rclone",
-        NilObject:  (*smb.Object)(nil),
+        NilObject:  (*Object)(nil),
     })
 }
+
+func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+    return f.setUploadChunkSize(cs)
+}
+
+func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
+    return f.setUploadCutoff(cs)
+}
+
+var (
+    _ fstests.SetUploadChunkSizer = (*Fs)(nil)
+    _ fstests.SetUploadCutoffer   = (*Fs)(nil)
+)
@@ -1751,7 +1751,7 @@ multiples of 16k performed much better than other values.
 ### --multi-thread-chunk-size=SizeSuffix ###

 Normally the chunk size for multi thread transfers is set by the backend.
-However some backends such as `local` and `smb` (which implement `OpenWriterAt`
+However some backends such as `local` and `pcloud` (which implement `OpenWriterAt`
 but not `OpenChunkWriter`) don't have a natural chunk size.

 In this case the value of this option is used (default 64Mi).
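For illustration (hypothetical source and destination paths; the flags themselves are existing rclone options), a transfer that overrides this fallback for such backends might look like `rclone copy /data remote:backup --multi-thread-streams 4 --multi-thread-chunk-size 128Mi`.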