fs: allow setting a write buffer for multithread

When multi-thread downloading is enabled, rclone used
to send a write to disk after every read, resulting in a lot
of small writes to different locations of the file.

Depending on the underlying filesystem or device, it can be more
efficient to send bigger writes.
This commit is contained in:
Paulo Schreiner 2023-06-02 14:00:06 +02:00 committed by Nick Craig-Wood
parent 5f938fb9ed
commit fcb912a664
4 changed files with 160 additions and 100 deletions

View File

@ -1511,6 +1511,25 @@ if you are reading and writing to an OS X filing system this will be
This command line flag allows you to override that computed default. This command line flag allows you to override that computed default.
### --multi-thread-write-buffer-size=SIZE ###
When downloading with multiple threads, rclone will buffer SIZE bytes in
memory before writing to disk for each thread.
This can improve performance if the underlying filesystem does not deal
well with a lot of small writes in different positions of the file, so
if you see downloads being limited by disk write speed, you might want
to experiment with different values. Especially for magnetic drives and
remote file systems, a higher value can be useful.
Nevertheless, the default of `128k` should be fine for almost all use
cases, so before changing it make sure that the network is not actually
your bottleneck.
As a final hint, size is not the only factor: block size (or a similar
concept) can have an impact. In one case, we observed that exact
multiples of 16k performed much better than other values.
### --multi-thread-cutoff=SIZE ### ### --multi-thread-cutoff=SIZE ###
When downloading files to the local backend above this size, rclone When downloading files to the local backend above this size, rclone

View File

@ -51,101 +51,102 @@ var (
// ConfigInfo is filesystem config options // ConfigInfo is filesystem config options
type ConfigInfo struct { type ConfigInfo struct {
LogLevel LogLevel LogLevel LogLevel
StatsLogLevel LogLevel StatsLogLevel LogLevel
UseJSONLog bool UseJSONLog bool
DryRun bool DryRun bool
Interactive bool Interactive bool
CheckSum bool CheckSum bool
SizeOnly bool SizeOnly bool
IgnoreTimes bool IgnoreTimes bool
IgnoreExisting bool IgnoreExisting bool
IgnoreErrors bool IgnoreErrors bool
ModifyWindow time.Duration ModifyWindow time.Duration
Checkers int Checkers int
Transfers int Transfers int
ConnectTimeout time.Duration // Connect timeout ConnectTimeout time.Duration // Connect timeout
Timeout time.Duration // Data channel timeout Timeout time.Duration // Data channel timeout
ExpectContinueTimeout time.Duration ExpectContinueTimeout time.Duration
Dump DumpFlags Dump DumpFlags
InsecureSkipVerify bool // Skip server certificate verification InsecureSkipVerify bool // Skip server certificate verification
DeleteMode DeleteMode DeleteMode DeleteMode
MaxDelete int64 MaxDelete int64
MaxDeleteSize SizeSuffix MaxDeleteSize SizeSuffix
TrackRenames bool // Track file renames. TrackRenames bool // Track file renames.
TrackRenamesStrategy string // Comma separated list of strategies used to track renames TrackRenamesStrategy string // Comma separated list of strategies used to track renames
LowLevelRetries int LowLevelRetries int
UpdateOlder bool // Skip files that are newer on the destination UpdateOlder bool // Skip files that are newer on the destination
NoGzip bool // Disable compression NoGzip bool // Disable compression
MaxDepth int MaxDepth int
IgnoreSize bool IgnoreSize bool
IgnoreChecksum bool IgnoreChecksum bool
IgnoreCaseSync bool IgnoreCaseSync bool
NoTraverse bool NoTraverse bool
CheckFirst bool CheckFirst bool
NoCheckDest bool NoCheckDest bool
NoUnicodeNormalization bool NoUnicodeNormalization bool
NoUpdateModTime bool NoUpdateModTime bool
DataRateUnit string DataRateUnit string
CompareDest []string CompareDest []string
CopyDest []string CopyDest []string
BackupDir string BackupDir string
Suffix string Suffix string
SuffixKeepExtension bool SuffixKeepExtension bool
UseListR bool UseListR bool
BufferSize SizeSuffix BufferSize SizeSuffix
BwLimit BwTimetable MultiThreadWriteBufferSize SizeSuffix
BwLimitFile BwTimetable BwLimit BwTimetable
TPSLimit float64 BwLimitFile BwTimetable
TPSLimitBurst int TPSLimit float64
BindAddr net.IP TPSLimitBurst int
DisableFeatures []string BindAddr net.IP
UserAgent string DisableFeatures []string
Immutable bool UserAgent string
AutoConfirm bool Immutable bool
StreamingUploadCutoff SizeSuffix AutoConfirm bool
StatsFileNameLength int StreamingUploadCutoff SizeSuffix
AskPassword bool StatsFileNameLength int
PasswordCommand SpaceSepList AskPassword bool
UseServerModTime bool PasswordCommand SpaceSepList
MaxTransfer SizeSuffix UseServerModTime bool
MaxDuration time.Duration MaxTransfer SizeSuffix
CutoffMode CutoffMode MaxDuration time.Duration
MaxBacklog int CutoffMode CutoffMode
MaxStatsGroups int MaxBacklog int
StatsOneLine bool MaxStatsGroups int
StatsOneLineDate bool // If we want a date prefix at all StatsOneLine bool
StatsOneLineDateFormat string // If we want to customize the prefix StatsOneLineDate bool // If we want a date prefix at all
ErrorOnNoTransfer bool // Set appropriate exit code if no files transferred StatsOneLineDateFormat string // If we want to customize the prefix
Progress bool ErrorOnNoTransfer bool // Set appropriate exit code if no files transferred
ProgressTerminalTitle bool Progress bool
Cookie bool ProgressTerminalTitle bool
UseMmap bool Cookie bool
CaCert []string // Client Side CA UseMmap bool
ClientCert string // Client Side Cert CaCert []string // Client Side CA
ClientKey string // Client Side Key ClientCert string // Client Side Cert
MultiThreadCutoff SizeSuffix ClientKey string // Client Side Key
MultiThreadStreams int MultiThreadCutoff SizeSuffix
MultiThreadSet bool // whether MultiThreadStreams was set (set in fs/config/configflags) MultiThreadStreams int
OrderBy string // instructions on how to order the transfer MultiThreadSet bool // whether MultiThreadStreams was set (set in fs/config/configflags)
UploadHeaders []*HTTPOption OrderBy string // instructions on how to order the transfer
DownloadHeaders []*HTTPOption UploadHeaders []*HTTPOption
Headers []*HTTPOption DownloadHeaders []*HTTPOption
MetadataSet Metadata // extra metadata to write when uploading Headers []*HTTPOption
RefreshTimes bool MetadataSet Metadata // extra metadata to write when uploading
NoConsole bool RefreshTimes bool
TrafficClass uint8 NoConsole bool
FsCacheExpireDuration time.Duration TrafficClass uint8
FsCacheExpireInterval time.Duration FsCacheExpireDuration time.Duration
DisableHTTP2 bool FsCacheExpireInterval time.Duration
HumanReadable bool DisableHTTP2 bool
KvLockTime time.Duration // maximum time to keep key-value database locked by process HumanReadable bool
DisableHTTPKeepAlives bool KvLockTime time.Duration // maximum time to keep key-value database locked by process
Metadata bool DisableHTTPKeepAlives bool
ServerSideAcrossConfigs bool Metadata bool
TerminalColorMode TerminalColorMode ServerSideAcrossConfigs bool
DefaultTime Time // time that directories with no time should display TerminalColorMode TerminalColorMode
Inplace bool // Download directly to destination file instead of atomic download to temp/rename DefaultTime Time // time that directories with no time should display
Inplace bool // Download directly to destination file instead of atomic download to temp/rename
} }
// NewConfig creates a new config with everything set to the default // NewConfig creates a new config with everything set to the default
@ -170,6 +171,7 @@ func NewConfig() *ConfigInfo {
c.MaxDepth = -1 c.MaxDepth = -1
c.DataRateUnit = "bytes" c.DataRateUnit = "bytes"
c.BufferSize = SizeSuffix(16 << 20) c.BufferSize = SizeSuffix(16 << 20)
c.MultiThreadWriteBufferSize = SizeSuffix(128 * 1024)
c.UserAgent = "rclone/" + Version c.UserAgent = "rclone/" + Version
c.StreamingUploadCutoff = SizeSuffix(100 * 1024) c.StreamingUploadCutoff = SizeSuffix(100 * 1024)
c.MaxStatsGroups = 1000 c.MaxStatsGroups = 1000

View File

@ -126,6 +126,7 @@ func AddFlags(ci *fs.ConfigInfo, flagSet *pflag.FlagSet) {
flags.StringVarP(flagSet, &ci.ClientKey, "client-key", "", ci.ClientKey, "Client SSL private key (PEM) for mutual TLS auth") flags.StringVarP(flagSet, &ci.ClientKey, "client-key", "", ci.ClientKey, "Client SSL private key (PEM) for mutual TLS auth")
flags.FVarP(flagSet, &ci.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size") flags.FVarP(flagSet, &ci.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size")
flags.IntVarP(flagSet, &ci.MultiThreadStreams, "multi-thread-streams", "", ci.MultiThreadStreams, "Max number of streams to use for multi-thread downloads") flags.IntVarP(flagSet, &ci.MultiThreadStreams, "multi-thread-streams", "", ci.MultiThreadStreams, "Max number of streams to use for multi-thread downloads")
flags.FVarP(flagSet, &ci.MultiThreadWriteBufferSize, "multi-thread-write-buffer-size", "", "In memory buffer size for writing when in multi-thread mode")
flags.BoolVarP(flagSet, &ci.UseJSONLog, "use-json-log", "", ci.UseJSONLog, "Use json log format") flags.BoolVarP(flagSet, &ci.UseJSONLog, "use-json-log", "", ci.UseJSONLog, "Use json log format")
flags.StringVarP(flagSet, &ci.OrderBy, "order-by", "", ci.OrderBy, "Instructions on how to order the transfers, e.g. 'size,descending'") flags.StringVarP(flagSet, &ci.OrderBy, "order-by", "", ci.OrderBy, "Instructions on how to order the transfers, e.g. 'size,descending'")
flags.StringArrayVarP(flagSet, &uploadHeaders, "header-upload", "", nil, "Set HTTP header for upload transactions") flags.StringArrayVarP(flagSet, &uploadHeaders, "header-upload", "", nil, "Set HTTP header for upload transactions")

View File

@ -1,6 +1,7 @@
package operations package operations
import ( import (
"bufio"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@ -12,11 +13,32 @@ import (
) )
const ( const (
multithreadChunkSize = 64 << 10 multithreadChunkSize = 64 << 10
multithreadChunkSizeMask = multithreadChunkSize - 1 multithreadChunkSizeMask = multithreadChunkSize - 1
multithreadBufferSize = 32 * 1024 multithreadReadBufferSize = 32 * 1024
) )
// offsetWriter adapts an io.WriterAt into a sequential io.Writer: each
// Write lands at the current offset, which then advances by the number
// of bytes written.
//
// Adapted from the Go standard library; io.OffsetWriter can take its
// place once go1.19 no longer has to be supported.
type offsetWriter struct {
	w   io.WriterAt // destination for the positioned writes
	off int64       // position of the next write
}

// newOffsetWriter wraps w so that the first Write goes to offset off.
func newOffsetWriter(w io.WriterAt, off int64) *offsetWriter {
	return &offsetWriter{w: w, off: off}
}

// Write sends p to the underlying writer at the current offset and
// moves the offset forward by the number of bytes written, even when
// the underlying write reports an error.
func (o *offsetWriter) Write(p []byte) (n int, err error) {
	written, werr := o.w.WriteAt(p, o.off)
	o.off += int64(written)
	return written, werr
}
// Return a boolean as to whether we should use multi thread copy for // Return a boolean as to whether we should use multi thread copy for
// this transfer // this transfer
func doMultiThreadCopy(ctx context.Context, f fs.Fs, src fs.Object) bool { func doMultiThreadCopy(ctx context.Context, f fs.Fs, src fs.Object) bool {
@ -62,6 +84,7 @@ type multiThreadCopyState struct {
// Copy a single stream into place // Copy a single stream into place
func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err error) { func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err error) {
ci := fs.GetConfig(ctx)
defer func() { defer func() {
if err != nil { if err != nil {
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d failed: %v", stream+1, mc.streams, err) fs.Debugf(mc.src, "multi-thread copy: stream %d/%d failed: %v", stream+1, mc.streams, err)
@ -84,8 +107,13 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err
} }
defer fs.CheckClose(rc, &err) defer fs.CheckClose(rc, &err)
var writer io.Writer = newOffsetWriter(mc.wc, start)
if ci.MultiThreadWriteBufferSize > 0 {
writer = bufio.NewWriterSize(writer, int(ci.MultiThreadWriteBufferSize))
fs.Debugf(mc.src, "multi-thread copy: write buffer set to %v", ci.MultiThreadWriteBufferSize)
}
// Copy the data // Copy the data
buf := make([]byte, multithreadBufferSize) buf := make([]byte, multithreadReadBufferSize)
offset := start offset := start
for { for {
// Check if context cancelled and exit if so // Check if context cancelled and exit if so
@ -98,7 +126,7 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err
if err != nil { if err != nil {
return fmt.Errorf("multipart copy: accounting failed: %w", err) return fmt.Errorf("multipart copy: accounting failed: %w", err)
} }
nw, ew := mc.wc.WriteAt(buf[0:nr], offset) nw, ew := writer.Write(buf[0:nr])
if nw > 0 { if nw > 0 {
offset += int64(nw) offset += int64(nw)
} }
@ -113,6 +141,16 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err
if er != io.EOF { if er != io.EOF {
return fmt.Errorf("multipart copy: read failed: %w", er) return fmt.Errorf("multipart copy: read failed: %w", er)
} }
// if we were buffering, flush to disk
switch w := writer.(type) {
case *bufio.Writer:
er2 := w.Flush()
if er2 != nil {
return fmt.Errorf("multipart copy: flush failed: %w", er2)
}
}
break break
} }
} }