diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index 83745adb2..44ad4d963 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -26,6 +26,10 @@ var ErrorMaxTransferLimitReached = errors.New("Max transfer limit reached as set // transfer limit is reached. var ErrorMaxTransferLimitReachedFatal = fserrors.FatalError(ErrorMaxTransferLimitReached) +// ErrorMaxTransferLimitReachedGraceful is returned from operations.Copy when the max +// transfer limit is reached and a graceful stop is required. +var ErrorMaxTransferLimitReachedGraceful = fserrors.NoRetryError(ErrorMaxTransferLimitReached) + // Account limits and accounts for one transfer type Account struct { stats *StatsInfo diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 915a15c4f..da2813827 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -363,7 +363,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj actionTaken = "Copied (server side copy)" if fs.Config.MaxTransfer >= 0 && (accounting.Stats(ctx).GetBytes() >= int64(fs.Config.MaxTransfer) || (fs.Config.CutoffMode == fs.CutoffModeCautious && accounting.Stats(ctx).GetBytesWithPending()+src.Size() >= int64(fs.Config.MaxTransfer))) { - return nil, accounting.ErrorMaxTransferLimitReachedFatal + return nil, accounting.ErrorMaxTransferLimitReachedGraceful } if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) { in := tr.Account(ctx, nil) // account the transfer diff --git a/fs/operations/operations_test.go b/fs/operations/operations_test.go index f432c9dd3..be8c1a0c6 100644 --- a/fs/operations/operations_test.go +++ b/fs/operations/operations_test.go @@ -1440,7 +1440,7 @@ func TestCopyFileMaxTransfer(t *testing.T) { err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file3.Path, file3.Path) require.NotNil(t, err) assert.Contains(t, err.Error(), "Max transfer limit reached") - assert.True(t, fserrors.IsFatalError(err)) + assert.True(t, fserrors.IsNoRetryError(err)) fstest.CheckItems(t, r.Flocal, file1, file2, file3, file4) fstest.CheckItems(t, r.Fremote, file1) diff --git a/fs/sync/sync.go b/fs/sync/sync.go index fc6101b39..161f13790 100644 --- a/fs/sync/sync.go +++ b/fs/sync/sync.go @@ -32,6 +32,8 @@ type syncCopyMove struct { // internal state ctx context.Context // internal context for controlling go-routines cancel func() // cancel the context + inCtx context.Context // internal context for controlling march + inCancel func() // cancel the march context noTraverse bool // if set don't traverse the dst noCheckDest bool // if set transfer all objects regardless without checking dst noUnicodeNormalization bool // don't normalize unicode characters in filenames @@ -144,6 +146,8 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete } else { s.ctx, s.cancel = context.WithCancel(ctx) } + // Input context - cancel this for graceful stop + s.inCtx, s.inCancel = context.WithCancel(s.ctx) if s.noTraverse && s.deleteMode != fs.DeleteModeOff { fs.Errorf(nil, "Ignoring --no-traverse with sync") s.noTraverse = false @@ -248,6 +252,12 @@ func (s *syncCopyMove) processError(err error) { } if err == context.DeadlineExceeded { err = fserrors.NoRetryError(err) + } else if err == accounting.ErrorMaxTransferLimitReachedGraceful { + if s.inCtx.Err() == nil { + fs.Logf(nil, "%v - stopping transfers", err) + // Cancel the march and stop the pipes + s.inCancel() + } } s.errorMu.Lock() defer s.errorMu.Unlock() @@ -287,7 +297,7 @@ func (s *syncCopyMove) currentError() error { func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, fraction int, wg *sync.WaitGroup) { defer wg.Done() for { - pair, ok := in.GetMax(s.ctx, fraction) + pair, ok := in.GetMax(s.inCtx, fraction) if !ok { return } @@ -343,7 +353,7 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, fraction int, wg *sync.W func (s *syncCopyMove) pairRenamer(in *pipe, out *pipe, fraction int, wg *sync.WaitGroup) { defer wg.Done() for { - pair, ok := in.GetMax(s.ctx, fraction) + pair, ok := in.GetMax(s.inCtx, fraction) if !ok { return } @@ -363,7 +373,7 @@ func (s *syncCopyMove) pairCopyOrMove(ctx context.Context, in *pipe, fdst fs.Fs, defer wg.Done() var err error for { - pair, ok := in.GetMax(s.ctx, fraction) + pair, ok := in.GetMax(s.inCtx, fraction) if !ok { return } @@ -809,7 +819,7 @@ func (s *syncCopyMove) run() error { // set up a march over fdst and fsrc m := &march.March{ - Ctx: s.ctx, + Ctx: s.inCtx, Fdst: s.fdst, Fsrc: s.fsrc, Dir: s.dir, diff --git a/fs/sync/sync_test.go b/fs/sync/sync_test.go index b43f2adc2..c29459f41 100644 --- a/fs/sync/sync_test.go +++ b/fs/sync/sync_test.go @@ -1851,38 +1851,51 @@ func TestSyncIgnoreCase(t *testing.T) { fstest.CheckItems(t, r.Fremote, file2) } -// Test that aborting on max upload works -func TestAbort(t *testing.T) { - r := fstest.NewRun(t) - defer r.Finalise() - - if r.Fremote.Name() != "local" { - t.Skip("This test only runs on local") - } - +// Test that aborting on --max-transfer works +func TestMaxTransfer(t *testing.T) { oldMaxTransfer := fs.Config.MaxTransfer oldTransfers := fs.Config.Transfers oldCheckers := fs.Config.Checkers + oldCutoff := fs.Config.CutoffMode fs.Config.MaxTransfer = 3 * 1024 fs.Config.Transfers = 1 fs.Config.Checkers = 1 + fs.Config.CutoffMode = fs.CutoffModeHard defer func() { fs.Config.MaxTransfer = oldMaxTransfer fs.Config.Transfers = oldTransfers fs.Config.Checkers = oldCheckers + fs.Config.CutoffMode = oldCutoff }() - // Create file on source - file1 := r.WriteFile("file1", string(make([]byte, 5*1024)), t1) - file2 := r.WriteFile("file2", string(make([]byte, 2*1024)), t1) - file3 := r.WriteFile("file3", string(make([]byte, 3*1024)), t1) - fstest.CheckItems(t, r.Flocal, file1, file2, file3) - fstest.CheckItems(t, r.Fremote) + test := func(t *testing.T, cutoff fs.CutoffMode) { + r := fstest.NewRun(t) + defer r.Finalise() + fs.Config.CutoffMode = cutoff - accounting.GlobalStats().ResetCounters() + if r.Fremote.Name() != "local" { + t.Skip("This test only runs on local") + } - err := Sync(context.Background(), r.Fremote, r.Flocal, false) - expectedErr := fserrors.FsError(accounting.ErrorMaxTransferLimitReachedFatal) - fserrors.Count(expectedErr) - assert.Equal(t, expectedErr, err) + // Create file on source + file1 := r.WriteFile("file1", string(make([]byte, 5*1024)), t1) + file2 := r.WriteFile("file2", string(make([]byte, 2*1024)), t1) + file3 := r.WriteFile("file3", string(make([]byte, 3*1024)), t1) + fstest.CheckItems(t, r.Flocal, file1, file2, file3) + fstest.CheckItems(t, r.Fremote) + + accounting.GlobalStats().ResetCounters() + + err := Sync(context.Background(), r.Fremote, r.Flocal, false) + expectedErr := fserrors.FsError(accounting.ErrorMaxTransferLimitReachedFatal) + if cutoff != fs.CutoffModeHard { + expectedErr = accounting.ErrorMaxTransferLimitReachedGraceful + } + fserrors.Count(expectedErr) + assert.Equal(t, expectedErr, err) + } + + t.Run("Hard", func(t *testing.T) { test(t, fs.CutoffModeHard) }) + t.Run("Soft", func(t *testing.T) { test(t, fs.CutoffModeSoft) }) + t.Run("Cautious", func(t *testing.T) { test(t, fs.CutoffModeCautious) }) }