mirror of https://github.com/rclone/rclone.git
sync: fix --cutoff-mode soft & cautious so it doesn't end the transfer early
Before ths fix --cutoff-mode soft and cautious would emit a Fatal error which stopped the sync immediately. This fix introduces a new error which is checked in the sync error processing which stops the sync gracefully. Fixes #4576
This commit is contained in:
parent
233bed6a73
commit
75de30cfa8
|
@ -26,6 +26,10 @@ var ErrorMaxTransferLimitReached = errors.New("Max transfer limit reached as set
|
||||||
// transfer limit is reached.
|
// transfer limit is reached.
|
||||||
var ErrorMaxTransferLimitReachedFatal = fserrors.FatalError(ErrorMaxTransferLimitReached)
|
var ErrorMaxTransferLimitReachedFatal = fserrors.FatalError(ErrorMaxTransferLimitReached)
|
||||||
|
|
||||||
|
// ErrorMaxTransferLimitReachedGraceful is returned from operations.Copy when the max
|
||||||
|
// transfer limit is reached and a graceful stop is required.
|
||||||
|
var ErrorMaxTransferLimitReachedGraceful = fserrors.NoRetryError(ErrorMaxTransferLimitReached)
|
||||||
|
|
||||||
// Account limits and accounts for one transfer
|
// Account limits and accounts for one transfer
|
||||||
type Account struct {
|
type Account struct {
|
||||||
stats *StatsInfo
|
stats *StatsInfo
|
||||||
|
|
|
@ -363,7 +363,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
|
||||||
actionTaken = "Copied (server side copy)"
|
actionTaken = "Copied (server side copy)"
|
||||||
if fs.Config.MaxTransfer >= 0 && (accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) ||
|
if fs.Config.MaxTransfer >= 0 && (accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) ||
|
||||||
(fs.Config.CutoffMode == fs.CutoffModeCautious && accounting.Stats(ctx).GetBytesWithPending()+src.Size() >= int64(fs.Config.MaxTransfer))) {
|
(fs.Config.CutoffMode == fs.CutoffModeCautious && accounting.Stats(ctx).GetBytesWithPending()+src.Size() >= int64(fs.Config.MaxTransfer))) {
|
||||||
return nil, accounting.ErrorMaxTransferLimitReachedFatal
|
return nil, accounting.ErrorMaxTransferLimitReachedGraceful
|
||||||
}
|
}
|
||||||
if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) {
|
if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) {
|
||||||
in := tr.Account(ctx, nil) // account the transfer
|
in := tr.Account(ctx, nil) // account the transfer
|
||||||
|
|
|
@ -1440,7 +1440,7 @@ func TestCopyFileMaxTransfer(t *testing.T) {
|
||||||
err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file3.Path, file3.Path)
|
err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file3.Path, file3.Path)
|
||||||
require.NotNil(t, err)
|
require.NotNil(t, err)
|
||||||
assert.Contains(t, err.Error(), "Max transfer limit reached")
|
assert.Contains(t, err.Error(), "Max transfer limit reached")
|
||||||
assert.True(t, fserrors.IsFatalError(err))
|
assert.True(t, fserrors.IsNoRetryError(err))
|
||||||
fstest.CheckItems(t, r.Flocal, file1, file2, file3, file4)
|
fstest.CheckItems(t, r.Flocal, file1, file2, file3, file4)
|
||||||
fstest.CheckItems(t, r.Fremote, file1)
|
fstest.CheckItems(t, r.Fremote, file1)
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,8 @@ type syncCopyMove struct {
|
||||||
// internal state
|
// internal state
|
||||||
ctx context.Context // internal context for controlling go-routines
|
ctx context.Context // internal context for controlling go-routines
|
||||||
cancel func() // cancel the context
|
cancel func() // cancel the context
|
||||||
|
inCtx context.Context // internal context for controlling march
|
||||||
|
inCancel func() // cancel the march context
|
||||||
noTraverse bool // if set don't traverse the dst
|
noTraverse bool // if set don't traverse the dst
|
||||||
noCheckDest bool // if set transfer all objects regardless without checking dst
|
noCheckDest bool // if set transfer all objects regardless without checking dst
|
||||||
noUnicodeNormalization bool // don't normalize unicode characters in filenames
|
noUnicodeNormalization bool // don't normalize unicode characters in filenames
|
||||||
|
@ -144,6 +146,8 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete
|
||||||
} else {
|
} else {
|
||||||
s.ctx, s.cancel = context.WithCancel(ctx)
|
s.ctx, s.cancel = context.WithCancel(ctx)
|
||||||
}
|
}
|
||||||
|
// Input context - cancel this for graceful stop
|
||||||
|
s.inCtx, s.inCancel = context.WithCancel(s.ctx)
|
||||||
if s.noTraverse && s.deleteMode != fs.DeleteModeOff {
|
if s.noTraverse && s.deleteMode != fs.DeleteModeOff {
|
||||||
fs.Errorf(nil, "Ignoring --no-traverse with sync")
|
fs.Errorf(nil, "Ignoring --no-traverse with sync")
|
||||||
s.noTraverse = false
|
s.noTraverse = false
|
||||||
|
@ -248,6 +252,12 @@ func (s *syncCopyMove) processError(err error) {
|
||||||
}
|
}
|
||||||
if err == context.DeadlineExceeded {
|
if err == context.DeadlineExceeded {
|
||||||
err = fserrors.NoRetryError(err)
|
err = fserrors.NoRetryError(err)
|
||||||
|
} else if err == accounting.ErrorMaxTransferLimitReachedGraceful {
|
||||||
|
if s.inCtx.Err() == nil {
|
||||||
|
fs.Logf(nil, "%v - stopping transfers", err)
|
||||||
|
// Cancel the march and stop the pipes
|
||||||
|
s.inCancel()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
s.errorMu.Lock()
|
s.errorMu.Lock()
|
||||||
defer s.errorMu.Unlock()
|
defer s.errorMu.Unlock()
|
||||||
|
@ -287,7 +297,7 @@ func (s *syncCopyMove) currentError() error {
|
||||||
func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, fraction int, wg *sync.WaitGroup) {
|
func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, fraction int, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for {
|
for {
|
||||||
pair, ok := in.GetMax(s.ctx, fraction)
|
pair, ok := in.GetMax(s.inCtx, fraction)
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -343,7 +353,7 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, fraction int, wg *sync.W
|
||||||
func (s *syncCopyMove) pairRenamer(in *pipe, out *pipe, fraction int, wg *sync.WaitGroup) {
|
func (s *syncCopyMove) pairRenamer(in *pipe, out *pipe, fraction int, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for {
|
for {
|
||||||
pair, ok := in.GetMax(s.ctx, fraction)
|
pair, ok := in.GetMax(s.inCtx, fraction)
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -363,7 +373,7 @@ func (s *syncCopyMove) pairCopyOrMove(ctx context.Context, in *pipe, fdst fs.Fs,
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
var err error
|
var err error
|
||||||
for {
|
for {
|
||||||
pair, ok := in.GetMax(s.ctx, fraction)
|
pair, ok := in.GetMax(s.inCtx, fraction)
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -809,7 +819,7 @@ func (s *syncCopyMove) run() error {
|
||||||
|
|
||||||
// set up a march over fdst and fsrc
|
// set up a march over fdst and fsrc
|
||||||
m := &march.March{
|
m := &march.March{
|
||||||
Ctx: s.ctx,
|
Ctx: s.inCtx,
|
||||||
Fdst: s.fdst,
|
Fdst: s.fdst,
|
||||||
Fsrc: s.fsrc,
|
Fsrc: s.fsrc,
|
||||||
Dir: s.dir,
|
Dir: s.dir,
|
||||||
|
|
|
@ -1851,27 +1851,32 @@ func TestSyncIgnoreCase(t *testing.T) {
|
||||||
fstest.CheckItems(t, r.Fremote, file2)
|
fstest.CheckItems(t, r.Fremote, file2)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test that aborting on max upload works
|
// Test that aborting on --max-transfer works
|
||||||
func TestAbort(t *testing.T) {
|
func TestMaxTransfer(t *testing.T) {
|
||||||
r := fstest.NewRun(t)
|
|
||||||
defer r.Finalise()
|
|
||||||
|
|
||||||
if r.Fremote.Name() != "local" {
|
|
||||||
t.Skip("This test only runs on local")
|
|
||||||
}
|
|
||||||
|
|
||||||
oldMaxTransfer := fs.Config.MaxTransfer
|
oldMaxTransfer := fs.Config.MaxTransfer
|
||||||
oldTransfers := fs.Config.Transfers
|
oldTransfers := fs.Config.Transfers
|
||||||
oldCheckers := fs.Config.Checkers
|
oldCheckers := fs.Config.Checkers
|
||||||
|
oldCutoff := fs.Config.CutoffMode
|
||||||
fs.Config.MaxTransfer = 3 * 1024
|
fs.Config.MaxTransfer = 3 * 1024
|
||||||
fs.Config.Transfers = 1
|
fs.Config.Transfers = 1
|
||||||
fs.Config.Checkers = 1
|
fs.Config.Checkers = 1
|
||||||
|
fs.Config.CutoffMode = fs.CutoffModeHard
|
||||||
defer func() {
|
defer func() {
|
||||||
fs.Config.MaxTransfer = oldMaxTransfer
|
fs.Config.MaxTransfer = oldMaxTransfer
|
||||||
fs.Config.Transfers = oldTransfers
|
fs.Config.Transfers = oldTransfers
|
||||||
fs.Config.Checkers = oldCheckers
|
fs.Config.Checkers = oldCheckers
|
||||||
|
fs.Config.CutoffMode = oldCutoff
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
test := func(t *testing.T, cutoff fs.CutoffMode) {
|
||||||
|
r := fstest.NewRun(t)
|
||||||
|
defer r.Finalise()
|
||||||
|
fs.Config.CutoffMode = cutoff
|
||||||
|
|
||||||
|
if r.Fremote.Name() != "local" {
|
||||||
|
t.Skip("This test only runs on local")
|
||||||
|
}
|
||||||
|
|
||||||
// Create file on source
|
// Create file on source
|
||||||
file1 := r.WriteFile("file1", string(make([]byte, 5*1024)), t1)
|
file1 := r.WriteFile("file1", string(make([]byte, 5*1024)), t1)
|
||||||
file2 := r.WriteFile("file2", string(make([]byte, 2*1024)), t1)
|
file2 := r.WriteFile("file2", string(make([]byte, 2*1024)), t1)
|
||||||
|
@ -1883,6 +1888,14 @@ func TestAbort(t *testing.T) {
|
||||||
|
|
||||||
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
err := Sync(context.Background(), r.Fremote, r.Flocal, false)
|
||||||
expectedErr := fserrors.FsError(accounting.ErrorMaxTransferLimitReachedFatal)
|
expectedErr := fserrors.FsError(accounting.ErrorMaxTransferLimitReachedFatal)
|
||||||
|
if cutoff != fs.CutoffModeHard {
|
||||||
|
expectedErr = accounting.ErrorMaxTransferLimitReachedGraceful
|
||||||
|
}
|
||||||
fserrors.Count(expectedErr)
|
fserrors.Count(expectedErr)
|
||||||
assert.Equal(t, expectedErr, err)
|
assert.Equal(t, expectedErr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("Hard", func(t *testing.T) { test(t, fs.CutoffModeHard) })
|
||||||
|
t.Run("Soft", func(t *testing.T) { test(t, fs.CutoffModeSoft) })
|
||||||
|
t.Run("Cautious", func(t *testing.T) { test(t, fs.CutoffModeCautious) })
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue