diff --git a/fs/operations/operations.go b/fs/operations/operations.go index a8e544078..d0a2a2f77 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -289,7 +289,7 @@ func Copy(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Objec // If can't server side copy, do it manually if err == fs.ErrorCantCopy { var in0 io.ReadCloser - in0, err = newReOpen(src, hashOption, fs.Config.LowLevelRetries) + in0, err = newReOpen(src, hashOption, nil, fs.Config.LowLevelRetries) if err != nil { err = errors.Wrap(err, "failed to open source object") } else { diff --git a/fs/operations/reopen.go b/fs/operations/reopen.go index 8ad6be55b..f51e3d3d6 100644 --- a/fs/operations/reopen.go +++ b/fs/operations/reopen.go @@ -10,15 +10,16 @@ import ( // reOpen is a wrapper for an object reader which reopens the stream on error type reOpen struct { - mu sync.Mutex // mutex to protect the below - src fs.Object // object to open - hashOption *fs.HashesOption // option to pass to initial open - rc io.ReadCloser // underlying stream - read int64 // number of bytes read from this stream - maxTries int // maximum number of retries - tries int // number of retries we've had so far in this stream - err error // if this is set then Read/Close calls will return it - opened bool // if set then rc is valid and needs closing + mu sync.Mutex // mutex to protect the below + src fs.Object // object to open + hashOption *fs.HashesOption // option to pass to initial open + rangeOption *fs.RangeOption // option to pass to initial open + rc io.ReadCloser // underlying stream + read int64 // number of bytes read from this stream + maxTries int // maximum number of retries + tries int // number of retries we've had so far in this stream + err error // if this is set then Read/Close calls will return it + opened bool // if set then rc is valid and needs closing } var ( @@ -27,11 +28,17 @@ var ( ) // newReOpen makes a handle which will reopen itself and seek to where it was on errors -func newReOpen(src fs.Object, hashOption *fs.HashesOption, maxTries int) (rc io.ReadCloser, err error) { +// +// If hashOption is set this will be applied when reading from the start +// +// If rangeOption is set then this will applied when reading from the +// start, and updated on retries. +func newReOpen(src fs.Object, hashOption *fs.HashesOption, rangeOption *fs.RangeOption, maxTries int) (rc io.ReadCloser, err error) { h := &reOpen{ - src: src, - hashOption: hashOption, - maxTries: maxTries, + src: src, + hashOption: hashOption, + rangeOption: rangeOption, + maxTries: maxTries, } h.mu.Lock() defer h.mu.Unlock() @@ -46,15 +53,24 @@ func newReOpen(src fs.Object, hashOption *fs.HashesOption, maxTries int) (rc io. // // we don't retry here as the Open() call will itself have low level retries func (h *reOpen) open() error { - var opts = make([]fs.OpenOption, 1) - if h.tries > 0 { - } + var optsArray [2]fs.OpenOption + var opts = optsArray[:0] if h.read == 0 { - // put hashOption on if reading from the start, ditch otherwise - opts[0] = h.hashOption + if h.rangeOption != nil { + opts = append(opts, h.rangeOption) + } + if h.hashOption != nil { + // put hashOption on if reading from the start, ditch otherwise + opts = append(opts, h.hashOption) + } } else { - // seek to the read point - opts[0] = &fs.SeekOption{Offset: h.read} + if h.rangeOption != nil { + // range to the read point + opts = append(opts, &fs.RangeOption{Start: h.rangeOption.Start + h.read, End: h.rangeOption.End}) + } else { + // seek to the read point + opts = append(opts, &fs.SeekOption{Offset: h.read}) + } } h.tries++ if h.tries > h.maxTries { diff --git a/fs/operations/reopen_test.go b/fs/operations/reopen_test.go index c878788bc..fd4c3af16 100644 --- a/fs/operations/reopen_test.go +++ b/fs/operations/reopen_test.go @@ -60,85 +60,99 @@ func (er errorReader) Read(p []byte) (n int, err error) { return 0, er.err } -// Contents for the mock object -var reOpenTestcontents = []byte("0123456789") +func TestReOpen(t *testing.T) { + for testIndex, testName := range []string{"Seek", "Range"} { + t.Run(testName, func(t *testing.T) { + // Contents for the mock object + var ( + reOpenTestcontents = []byte("0123456789") + expectedRead = reOpenTestcontents + rangeOption *fs.RangeOption + ) + if testIndex > 0 { + rangeOption = &fs.RangeOption{Start: 1, End: 7} + expectedRead = reOpenTestcontents[1:8] + } -// Start the test with the given breaks -func testReOpen(breaks []int64, maxRetries int) (io.ReadCloser, error) { - srcOrig := mockobject.New("potato").WithContent(reOpenTestcontents, mockobject.SeekModeRegular) - src := &reOpenTestObject{ - Object: srcOrig, - breaks: breaks, + // Start the test with the given breaks + testReOpen := func(breaks []int64, maxRetries int) (io.ReadCloser, error) { + srcOrig := mockobject.New("potato").WithContent(reOpenTestcontents, mockobject.SeekModeNone) + src := &reOpenTestObject{ + Object: srcOrig, + breaks: breaks, + } + hashOption := &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)} + return newReOpen(src, hashOption, rangeOption, maxRetries) + } + + t.Run("Basics", func(t *testing.T) { + // open + h, err := testReOpen(nil, 10) + assert.NoError(t, err) + + // Check contents read correctly + got, err := ioutil.ReadAll(h) + assert.NoError(t, err) + assert.Equal(t, expectedRead, got) + + // Check read after end + var buf = make([]byte, 1) + n, err := h.Read(buf) + assert.Equal(t, 0, n) + assert.Equal(t, io.EOF, err) + + // Check close + assert.NoError(t, h.Close()) + + // Check double close + assert.Equal(t, errorFileClosed, h.Close()) + + // Check read after close + n, err = h.Read(buf) + assert.Equal(t, 0, n) + assert.Equal(t, errorFileClosed, err) + }) + + t.Run("ErrorAtStart", func(t *testing.T) { + // open with immediate breaking + h, err := testReOpen([]int64{0}, 10) + assert.Equal(t, errorTestError, err) + assert.Nil(t, h) + }) + + t.Run("WithErrors", func(t *testing.T) { + // open with a few break points but less than the max + h, err := testReOpen([]int64{2, 1, 3}, 10) + assert.NoError(t, err) + + // check contents + got, err := ioutil.ReadAll(h) + assert.NoError(t, err) + assert.Equal(t, expectedRead, got) + + // check close + assert.NoError(t, h.Close()) + }) + + t.Run("TooManyErrors", func(t *testing.T) { + // open with a few break points but >= the max + h, err := testReOpen([]int64{2, 1, 3}, 3) + assert.NoError(t, err) + + // check contents + got, err := ioutil.ReadAll(h) + assert.Equal(t, errorTestError, err) + assert.Equal(t, expectedRead[:6], got) + + // check old error is returned + var buf = make([]byte, 1) + n, err := h.Read(buf) + assert.Equal(t, 0, n) + assert.Equal(t, errorTooManyTries, err) + + // Check close + assert.Equal(t, errorFileClosed, h.Close()) + }) + }) } - hashOption := &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)} - return newReOpen(src, hashOption, maxRetries) -} - -func TestReOpenBasics(t *testing.T) { - // open - h, err := testReOpen(nil, 10) - assert.NoError(t, err) - - // Check contents read correctly - got, err := ioutil.ReadAll(h) - assert.NoError(t, err) - assert.Equal(t, reOpenTestcontents, got) - - // Check read after end - var buf = make([]byte, 1) - n, err := h.Read(buf) - assert.Equal(t, 0, n) - assert.Equal(t, io.EOF, err) - - // Check close - assert.NoError(t, h.Close()) - - // Check double close - assert.Equal(t, errorFileClosed, h.Close()) - - // Check read after close - n, err = h.Read(buf) - assert.Equal(t, 0, n) - assert.Equal(t, errorFileClosed, err) -} - -func TestReOpenErrorAtStart(t *testing.T) { - // open with immediate breaking - h, err := testReOpen([]int64{0}, 10) - assert.Equal(t, errorTestError, err) - assert.Nil(t, h) -} - -func TestReOpenError(t *testing.T) { - // open with a few break points but less than the max - h, err := testReOpen([]int64{2, 1, 3}, 10) - assert.NoError(t, err) - - // check contents - got, err := ioutil.ReadAll(h) - assert.NoError(t, err) - assert.Equal(t, reOpenTestcontents, got) - - // check close - assert.NoError(t, h.Close()) -} - -func TestReOpenFail(t *testing.T) { - // open with a few break points but >= the max - h, err := testReOpen([]int64{2, 1, 3}, 3) - assert.NoError(t, err) - - // check contents - got, err := ioutil.ReadAll(h) - assert.Equal(t, errorTestError, err) - assert.Equal(t, reOpenTestcontents[:6], got) - - // check old error is returned - var buf = make([]byte, 1) - n, err := h.Read(buf) - assert.Equal(t, 0, n) - assert.Equal(t, errorTooManyTries, err) - - // Check close - assert.Equal(t, errorFileClosed, h.Close()) }