From 421585dd72344d735228c470eafbb0b41c910450 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 4 Jun 2020 15:09:03 +0100 Subject: [PATCH] accounting: add context to Account and propagate changes #3257 This is preparation for getting the Accounting to check the context, buf first we need to get it in place. Since this is one of those changes that makes lots of noise, this is in a seperate commit. --- cmd/serve/httplib/serve/serve.go | 2 +- fs/accounting/accounting.go | 7 +++++-- fs/accounting/accounting_test.go | 25 +++++++++++++------------ fs/accounting/transfer.go | 7 ++++--- fs/operations/check.go | 4 ++-- fs/operations/multithread.go | 2 +- fs/operations/operations.go | 10 +++++----- vfs/read.go | 4 ++-- vfs/vfscache/downloaders/downloaders.go | 2 +- 9 files changed, 34 insertions(+), 29 deletions(-) diff --git a/cmd/serve/httplib/serve/serve.go b/cmd/serve/httplib/serve/serve.go index 89c4f696d..fd0cdb5b8 100644 --- a/cmd/serve/httplib/serve/serve.go +++ b/cmd/serve/httplib/serve/serve.go @@ -79,7 +79,7 @@ func Object(w http.ResponseWriter, r *http.Request, o fs.Object) { defer func() { tr.Done(err) }() - in := tr.Account(file) // account the transfer (no buffering) + in := tr.Account(r.Context(), file) // account the transfer (no buffering) w.WriteHeader(code) diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index 71aff851a..2686f2ebf 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -36,6 +36,7 @@ type Account struct { // shouldn't. mu sync.Mutex // mutex protects these values in io.Reader + ctx context.Context // current context for transfer - may change origIn io.ReadCloser close io.Closer size int64 @@ -64,10 +65,11 @@ const averagePeriod = 16 // period to do exponentially weighted averages over // newAccountSizeName makes an Account reader for an io.ReadCloser of // the given size and name -func newAccountSizeName(stats *StatsInfo, in io.ReadCloser, size int64, name string) *Account { +func newAccountSizeName(ctx context.Context, stats *StatsInfo, in io.ReadCloser, size int64, name string) *Account { acc := &Account{ stats: stats, in: in, + ctx: ctx, close: in, origIn: in, size: size, @@ -160,7 +162,7 @@ func (acc *Account) Abandon() { // UpdateReader updates the underlying io.ReadCloser stopping the // async buffer (if any) and re-adding it -func (acc *Account) UpdateReader(in io.ReadCloser) { +func (acc *Account) UpdateReader(ctx context.Context, in io.ReadCloser) { acc.mu.Lock() withBuf := acc.withBuf if withBuf { @@ -168,6 +170,7 @@ func (acc *Account) UpdateReader(in io.ReadCloser) { acc.withBuf = false } acc.in = in + acc.ctx = ctx acc.close = in acc.origIn = in acc.closed = false diff --git a/fs/accounting/accounting_test.go b/fs/accounting/accounting_test.go index bfee18a26..8161522f2 100644 --- a/fs/accounting/accounting_test.go +++ b/fs/accounting/accounting_test.go @@ -2,6 +2,7 @@ package accounting import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -29,7 +30,7 @@ var ( func TestNewAccountSizeName(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) stats := NewStats() - acc := newAccountSizeName(stats, in, 1, "test") + acc := newAccountSizeName(context.Background(), stats, in, 1, "test") assert.Equal(t, in, acc.in) assert.Equal(t, acc, stats.inProgress.get("test")) err := acc.Close() @@ -44,7 +45,7 @@ func TestAccountWithBuffer(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) stats := NewStats() - acc := newAccountSizeName(stats, in, -1, "test") + acc := newAccountSizeName(context.Background(), stats, in, -1, "test") assert.False(t, acc.HasBuffer()) acc.WithBuffer() assert.True(t, acc.HasBuffer()) @@ -53,7 +54,7 @@ func TestAccountWithBuffer(t *testing.T) { require.True(t, ok) assert.NoError(t, acc.Close()) - acc = newAccountSizeName(stats, in, 1, "test") + acc = newAccountSizeName(context.Background(), stats, in, 1, "test") acc.WithBuffer() // should not have a buffer for a small size _, ok = acc.in.(*asyncreader.AsyncReader) @@ -66,7 +67,7 @@ func TestAccountGetUpdateReader(t *testing.T) { return func(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) stats := NewStats() - acc := newAccountSizeName(stats, in, 1, "test") + acc := newAccountSizeName(context.Background(), stats, in, 1, "test") assert.Equal(t, in, acc.GetReader()) assert.Equal(t, acc, stats.inProgress.get("test")) @@ -77,7 +78,7 @@ func TestAccountGetUpdateReader(t *testing.T) { } in2 := ioutil.NopCloser(bytes.NewBuffer([]byte{1})) - acc.UpdateReader(in2) + acc.UpdateReader(context.Background(), in2) assert.Equal(t, in2, acc.GetReader()) assert.Equal(t, acc, stats.inProgress.get("test")) @@ -92,7 +93,7 @@ func TestAccountGetUpdateReader(t *testing.T) { func TestAccountRead(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) stats := NewStats() - acc := newAccountSizeName(stats, in, 1, "test") + acc := newAccountSizeName(context.Background(), stats, in, 1, "test") assert.True(t, acc.values.start.IsZero()) acc.values.mu.Lock() @@ -133,7 +134,7 @@ func testAccountWriteTo(t *testing.T, withBuffer bool) { } in := ioutil.NopCloser(bytes.NewBuffer(buf)) stats := NewStats() - acc := newAccountSizeName(stats, in, int64(len(buf)), "test") + acc := newAccountSizeName(context.Background(), stats, in, int64(len(buf)), "test") if withBuffer { acc = acc.WithBuffer() } @@ -173,7 +174,7 @@ func TestAccountWriteToWithBuffer(t *testing.T) { func TestAccountString(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) stats := NewStats() - acc := newAccountSizeName(stats, in, 3, "test") + acc := newAccountSizeName(context.Background(), stats, in, 3, "test") // FIXME not an exhaustive test! @@ -193,7 +194,7 @@ func TestAccountString(t *testing.T) { func TestAccountAccounter(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})) stats := NewStats() - acc := newAccountSizeName(stats, in, 3, "test") + acc := newAccountSizeName(context.Background(), stats, in, 3, "test") assert.True(t, in == acc.OldStream()) @@ -260,7 +261,7 @@ func TestAccountMaxTransfer(t *testing.T) { in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100))) stats := NewStats() - acc := newAccountSizeName(stats, in, 1, "test") + acc := newAccountSizeName(context.Background(), stats, in, 1, "test") var b = make([]byte, 10) @@ -277,7 +278,7 @@ func TestAccountMaxTransfer(t *testing.T) { fs.Config.CutoffMode = fs.CutoffModeSoft stats = NewStats() - acc = newAccountSizeName(stats, in, 1, "test") + acc = newAccountSizeName(context.Background(), stats, in, 1, "test") n, err = acc.Read(b) assert.Equal(t, 10, n) @@ -302,7 +303,7 @@ func TestAccountMaxTransferWriteTo(t *testing.T) { in := ioutil.NopCloser(readers.NewPatternReader(1024)) stats := NewStats() - acc := newAccountSizeName(stats, in, 1, "test") + acc := newAccountSizeName(context.Background(), stats, in, 1, "test") var b bytes.Buffer diff --git a/fs/accounting/transfer.go b/fs/accounting/transfer.go index 8234d2730..8365a4a0a 100644 --- a/fs/accounting/transfer.go +++ b/fs/accounting/transfer.go @@ -1,6 +1,7 @@ package accounting import ( + "context" "encoding/json" "io" "sync" @@ -135,12 +136,12 @@ func (tr *Transfer) Reset() { } // Account returns reader that knows how to keep track of transfer progress. -func (tr *Transfer) Account(in io.ReadCloser) *Account { +func (tr *Transfer) Account(ctx context.Context, in io.ReadCloser) *Account { tr.mu.Lock() if tr.acc == nil { - tr.acc = newAccountSizeName(tr.stats, in, tr.size, tr.remote) + tr.acc = newAccountSizeName(ctx, tr.stats, in, tr.size, tr.remote) } else { - tr.acc.UpdateReader(in) + tr.acc.UpdateReader(ctx, in) } tr.mu.Unlock() return tr.acc diff --git a/fs/operations/check.go b/fs/operations/check.go index eded6f825..dc2f99c07 100644 --- a/fs/operations/check.go +++ b/fs/operations/check.go @@ -318,7 +318,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo defer func() { tr1.Done(nil) // error handling is done by the caller }() - in1 = tr1.Account(in1).WithBuffer() // account and buffer the transfer + in1 = tr1.Account(ctx, in1).WithBuffer() // account and buffer the transfer in2, err := src.Open(ctx) if err != nil { @@ -328,7 +328,7 @@ func checkIdenticalDownload(ctx context.Context, dst, src fs.Object) (differ boo defer func() { tr2.Done(nil) // error handling is done by the caller }() - in2 = tr2.Account(in2).WithBuffer() // account and buffer the transfer + in2 = tr2.Account(ctx, in2).WithBuffer() // account and buffer the transfer // To assign err variable before defer. differ, err = CheckEqualReaders(in1, in2) diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index c5bed49bf..85b91bd74 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -158,7 +158,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, mc.calculateChunks() // Make accounting - mc.acc = tr.Account(nil) + mc.acc = tr.Account(ctx, nil) // create write file handle mc.wc, err = openWriterAt(gCtx, remote, mc.size) diff --git a/fs/operations/operations.go b/fs/operations/operations.go index e785198de..2d3a65108 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -366,7 +366,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj return nil, accounting.ErrorMaxTransferLimitReachedFatal } if doCopy := f.Features().Copy; doCopy != nil && (SameConfig(src.Fs(), f) || (SameRemoteType(src.Fs(), f) && f.Features().ServerSideAcrossConfigs)) { - in := tr.Account(nil) // account the transfer + in := tr.Account(ctx, nil) // account the transfer in.ServerSideCopyStart() newDst, err = doCopy(ctx, src, remote) if err == nil { @@ -421,7 +421,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj dst, err = Rcat(ctx, f, remote, in0, src.ModTime(ctx)) newDst = dst } else { - in := tr.Account(in0).WithBuffer() // account and buffer the transfer + in := tr.Account(ctx, in0).WithBuffer() // account and buffer the transfer var wrappedSrc fs.ObjectInfo = src // We try to pass the original object if possible if src.Remote() != remote { @@ -1054,7 +1054,7 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64) error { if count >= 0 { in = &readCloser{Reader: &io.LimitedReader{R: in, N: count}, Closer: in} } - in = tr.Account(in).WithBuffer() // account and buffer the transfer + in = tr.Account(ctx, in).WithBuffer() // account and buffer the transfer // take the lock just before we output stuff, so at the last possible moment mu.Lock() defer mu.Unlock() @@ -1072,7 +1072,7 @@ func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, defer func() { tr.Done(err) }() - in = tr.Account(in).WithBuffer() + in = tr.Account(ctx, in).WithBuffer() readCounter := readers.NewCountingReader(in) var trackingIn io.Reader @@ -1420,7 +1420,7 @@ func RcatSize(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClo tr.Done(err) }() body := ioutil.NopCloser(in) // we let the server close the body - in := tr.Account(body) // account the transfer (no buffering) + in := tr.Account(ctx, body) // account the transfer (no buffering) if SkipDestructive(ctx, dstFileName, "upload from pipe") { // prevents "broken pipe" errors diff --git a/vfs/read.go b/vfs/read.go index 532728b8a..dcbae07d7 100644 --- a/vfs/read.go +++ b/vfs/read.go @@ -79,7 +79,7 @@ func (fh *ReadFileHandle) openPending() (err error) { } tr := accounting.GlobalStats().NewTransfer(o) fh.done = tr.Done - fh.r = tr.Account(r).WithBuffer() // account the transfer + fh.r = tr.Account(context.TODO(), r).WithBuffer() // account the transfer fh.opened = true return nil @@ -158,7 +158,7 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) { return err } } - fh.r.UpdateReader(r) + fh.r.UpdateReader(context.TODO(), r) fh.offset = offset return nil } diff --git a/vfs/vfscache/downloaders/downloaders.go b/vfs/vfscache/downloaders/downloaders.go index e5d3d3ce5..d8dfba5af 100644 --- a/vfs/vfscache/downloaders/downloaders.go +++ b/vfs/vfscache/downloaders/downloaders.go @@ -495,7 +495,7 @@ func (dl *downloader) open(offset int64) (err error) { if err != nil { return errors.Wrap(err, "vfs reader: failed to open source file") } - dl.in = dl.tr.Account(in0).WithBuffer() // account and buffer the transfer + dl.in = dl.tr.Account(dl.dls.ctx, in0).WithBuffer() // account and buffer the transfer dl.offset = offset