mirror of https://github.com/restic/restic.git
Merge pull request #4803 from restic/permanent-retry-failure
Do not retry permanent backend failures
This commit is contained in:
commit
dcd151147c
|
@ -1,8 +0,0 @@
|
|||
Change: Don't retry to load files that don't exist
|
||||
|
||||
Restic used to always retry to load files. It now only retries to load
|
||||
files if they exist.
|
||||
|
||||
https://github.com/restic/restic/issues/4515
|
||||
https://github.com/restic/restic/issues/1523
|
||||
https://github.com/restic/restic/pull/4520
|
|
@ -1,4 +1,4 @@
|
|||
Enhancement: Improve reliability of backend operations
|
||||
Change: Redesign backend error handling to improve reliability
|
||||
|
||||
Restic now downloads pack files in large chunks instead of using a streaming
|
||||
download. This prevents failures due to interrupted streams. The `restore`
|
||||
|
@ -6,12 +6,20 @@ command now also retries downloading individual blobs that cannot be retrieved.
|
|||
|
||||
HTTP requests that are stuck for more than two minutes while uploading or
|
||||
downloading are now forcibly interrupted. This ensures that stuck requests are
|
||||
retried after a short timeout. These new request timeouts can temporarily be
|
||||
disabled by setting the environment variable
|
||||
`RESTIC_FEATURES=http-timeouts=false`. Note that this feature flag will be
|
||||
removed in the next minor restic version.
|
||||
retried after a short timeout.
|
||||
|
||||
Attempts to access a missing file or a truncated file will no longer be retried.
|
||||
This avoids unnecessary retries in those cases.
|
||||
|
||||
Most parts of the new backend error handling can temporarily be disabled by
|
||||
setting the environment variable
|
||||
`RESTIC_FEATURES=backend-error-redesign=false`. Note that this feature flag will
|
||||
be removed in the next minor restic version.
|
||||
|
||||
https://github.com/restic/restic/issues/4627
|
||||
https://github.com/restic/restic/issues/4193
|
||||
https://github.com/restic/restic/pull/4605
|
||||
https://github.com/restic/restic/pull/4792
|
||||
https://github.com/restic/restic/issues/4515
|
||||
https://github.com/restic/restic/issues/1523
|
||||
https://github.com/restic/restic/pull/4520
|
||||
|
|
|
@ -167,6 +167,20 @@ func (be *Backend) IsNotExist(err error) bool {
|
|||
return bloberror.HasCode(err, bloberror.BlobNotFound)
|
||||
}
|
||||
|
||||
func (be *Backend) IsPermanentError(err error) bool {
|
||||
if be.IsNotExist(err) {
|
||||
return true
|
||||
}
|
||||
|
||||
var aerr *azcore.ResponseError
|
||||
if errors.As(err, &aerr) {
|
||||
if aerr.StatusCode == http.StatusRequestedRangeNotSatisfiable || aerr.StatusCode == http.StatusUnauthorized || aerr.StatusCode == http.StatusForbidden {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Join combines path components with slashes.
|
||||
func (be *Backend) Join(p ...string) string {
|
||||
return path.Join(p...)
|
||||
|
@ -313,6 +327,11 @@ func (be *Backend) openReader(ctx context.Context, h backend.Handle, length int,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if length > 0 && (resp.ContentLength == nil || *resp.ContentLength != int64(length)) {
|
||||
_ = resp.Body.Close()
|
||||
return nil, &azcore.ResponseError{ErrorCode: "restic-file-too-short", StatusCode: http.StatusRequestedRangeNotSatisfiable}
|
||||
}
|
||||
|
||||
return resp.Body, err
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package b2
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"net/http"
|
||||
|
@ -31,6 +32,8 @@ type b2Backend struct {
|
|||
canDelete bool
|
||||
}
|
||||
|
||||
var errTooShort = fmt.Errorf("file is too short")
|
||||
|
||||
// Billing happens in 1000 item granularity, but we are more interested in reducing the number of network round trips
|
||||
const defaultListMaxItems = 10 * 1000
|
||||
|
||||
|
@ -186,13 +189,36 @@ func (be *b2Backend) IsNotExist(err error) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (be *b2Backend) IsPermanentError(err error) bool {
|
||||
// the library unfortunately endlessly retries authentication errors
|
||||
return be.IsNotExist(err) || errors.Is(err, errTooShort)
|
||||
}
|
||||
|
||||
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||
// given offset.
|
||||
func (be *b2Backend) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
return util.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
|
||||
return util.DefaultLoad(ctx, h, length, offset, be.openReader, func(rd io.Reader) error {
|
||||
if length == 0 {
|
||||
return fn(rd)
|
||||
}
|
||||
|
||||
// there is no direct way to efficiently check whether the file is too short
|
||||
// use a LimitedReader to track the number of bytes read
|
||||
limrd := &io.LimitedReader{R: rd, N: int64(length)}
|
||||
err := fn(limrd)
|
||||
|
||||
// check the underlying reader to be agnostic to however fn() handles the returned error
|
||||
_, rderr := rd.Read([]byte{0})
|
||||
if rderr == io.EOF && limrd.N != 0 {
|
||||
// file is too short
|
||||
return fmt.Errorf("%w: %v", errTooShort, err)
|
||||
}
|
||||
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (be *b2Backend) openReader(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
|
|
|
@ -38,7 +38,9 @@ type Backend interface {
|
|||
|
||||
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||
// given offset. If length is larger than zero, only a portion of the file
|
||||
// is read.
|
||||
// is read. If the length is larger than zero and the file is too short to return
|
||||
// the requested length bytes, then an error MUST be returned that is recognized
|
||||
// by IsPermanentError().
|
||||
//
|
||||
// The function fn may be called multiple times during the same Load invocation
|
||||
// and therefore must be idempotent.
|
||||
|
@ -66,6 +68,12 @@ type Backend interface {
|
|||
// for unwrapping it.
|
||||
IsNotExist(err error) bool
|
||||
|
||||
// IsPermanentError returns true if the error can very likely not be resolved
|
||||
// by retrying the operation. Backends should return true if the file is missing,
|
||||
// the requested range does not (completely) exist in the file or the user is
|
||||
// not authorized to perform the requested operation.
|
||||
IsPermanentError(err error) bool
|
||||
|
||||
// Delete removes all data in the backend.
|
||||
Delete(ctx context.Context) error
|
||||
}
|
||||
|
|
|
@ -72,6 +72,10 @@ func (be *Backend) IsNotExist(err error) bool {
|
|||
return be.b.IsNotExist(err)
|
||||
}
|
||||
|
||||
func (be *Backend) IsPermanentError(err error) bool {
|
||||
return be.b.IsPermanentError(err)
|
||||
}
|
||||
|
||||
func (be *Backend) List(ctx context.Context, t backend.FileType, fn func(backend.FileInfo) error) error {
|
||||
return be.b.List(ctx, t, fn)
|
||||
}
|
||||
|
|
|
@ -96,7 +96,7 @@ func TestDry(t *testing.T) {
|
|||
}
|
||||
case "load":
|
||||
data := ""
|
||||
err = step.be.Load(ctx, handle, 100, 0, func(rd io.Reader) error {
|
||||
err = step.be.Load(ctx, handle, 0, 0, func(rd io.Reader) error {
|
||||
buf, err := io.ReadAll(rd)
|
||||
data = string(buf)
|
||||
return err
|
||||
|
|
|
@ -173,6 +173,21 @@ func (be *Backend) IsNotExist(err error) bool {
|
|||
return errors.Is(err, storage.ErrObjectNotExist)
|
||||
}
|
||||
|
||||
func (be *Backend) IsPermanentError(err error) bool {
|
||||
if be.IsNotExist(err) {
|
||||
return true
|
||||
}
|
||||
|
||||
var gerr *googleapi.Error
|
||||
if errors.As(err, &gerr) {
|
||||
if gerr.Code == http.StatusRequestedRangeNotSatisfiable || gerr.Code == http.StatusUnauthorized || gerr.Code == http.StatusForbidden {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// Join combines path components with slashes.
|
||||
func (be *Backend) Join(p ...string) string {
|
||||
return path.Join(p...)
|
||||
|
@ -273,6 +288,11 @@ func (be *Backend) openReader(ctx context.Context, h backend.Handle, length int,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if length > 0 && r.Attrs.Size < offset+int64(length) {
|
||||
_ = r.Close()
|
||||
return nil, &googleapi.Error{Code: http.StatusRequestedRangeNotSatisfiable, Message: "restic-file-too-short"}
|
||||
}
|
||||
|
||||
return r, err
|
||||
}
|
||||
|
||||
|
|
|
@ -89,7 +89,7 @@ func Transport(opts TransportOptions) (http.RoundTripper, error) {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if feature.Flag.Enabled(feature.HTTPTimeouts) {
|
||||
if feature.Flag.Enabled(feature.BackendErrorRedesign) {
|
||||
h2.WriteByteTimeout = 120 * time.Second
|
||||
h2.ReadIdleTimeout = 60 * time.Second
|
||||
h2.PingTimeout = 60 * time.Second
|
||||
|
@ -132,7 +132,7 @@ func Transport(opts TransportOptions) (http.RoundTripper, error) {
|
|||
}
|
||||
|
||||
rt := http.RoundTripper(tr)
|
||||
if feature.Flag.Enabled(feature.HTTPTimeouts) {
|
||||
if feature.Flag.Enabled(feature.BackendErrorRedesign) {
|
||||
rt = newWatchdogRoundtripper(rt, 120*time.Second, 128*1024)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package local
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"os"
|
||||
|
@ -30,6 +31,8 @@ type Local struct {
|
|||
// ensure statically that *Local implements backend.Backend.
|
||||
var _ backend.Backend = &Local{}
|
||||
|
||||
var errTooShort = fmt.Errorf("file is too short")
|
||||
|
||||
func NewFactory() location.Factory {
|
||||
return location.NewLimitedBackendFactory("local", ParseConfig, location.NoPassword, limiter.WrapBackendConstructor(Create), limiter.WrapBackendConstructor(Open))
|
||||
}
|
||||
|
@ -110,6 +113,10 @@ func (b *Local) IsNotExist(err error) bool {
|
|||
return errors.Is(err, os.ErrNotExist)
|
||||
}
|
||||
|
||||
func (b *Local) IsPermanentError(err error) bool {
|
||||
return b.IsNotExist(err) || errors.Is(err, errTooShort) || errors.Is(err, os.ErrPermission)
|
||||
}
|
||||
|
||||
// Save stores data in the backend at the handle.
|
||||
func (b *Local) Save(_ context.Context, h backend.Handle, rd backend.RewindReader) (err error) {
|
||||
finalname := b.Filename(h)
|
||||
|
@ -219,6 +226,18 @@ func (b *Local) openReader(_ context.Context, h backend.Handle, length int, offs
|
|||
return nil, err
|
||||
}
|
||||
|
||||
fi, err := f.Stat()
|
||||
if err != nil {
|
||||
_ = f.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
size := fi.Size()
|
||||
if size < offset+int64(length) {
|
||||
_ = f.Close()
|
||||
return nil, errTooShort
|
||||
}
|
||||
|
||||
if offset > 0 {
|
||||
_, err = f.Seek(offset, 0)
|
||||
if err != nil {
|
||||
|
|
|
@ -43,6 +43,7 @@ func NewFactory() location.Factory {
|
|||
}
|
||||
|
||||
var errNotFound = fmt.Errorf("not found")
|
||||
var errTooSmall = errors.New("access beyond end of file")
|
||||
|
||||
const connectionCount = 2
|
||||
|
||||
|
@ -69,6 +70,10 @@ func (be *MemoryBackend) IsNotExist(err error) bool {
|
|||
return errors.Is(err, errNotFound)
|
||||
}
|
||||
|
||||
func (be *MemoryBackend) IsPermanentError(err error) bool {
|
||||
return be.IsNotExist(err) || errors.Is(err, errTooSmall)
|
||||
}
|
||||
|
||||
// Save adds new Data to the backend.
|
||||
func (be *MemoryBackend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error {
|
||||
be.m.Lock()
|
||||
|
@ -131,12 +136,12 @@ func (be *MemoryBackend) openReader(ctx context.Context, h backend.Handle, lengt
|
|||
}
|
||||
|
||||
buf := be.data[h]
|
||||
if offset > int64(len(buf)) {
|
||||
return nil, errors.New("offset beyond end of file")
|
||||
if offset+int64(length) > int64(len(buf)) {
|
||||
return nil, errTooSmall
|
||||
}
|
||||
|
||||
buf = buf[offset:]
|
||||
if length > 0 && len(buf) > length {
|
||||
if length > 0 {
|
||||
buf = buf[:length]
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
type Backend struct {
|
||||
CloseFn func() error
|
||||
IsNotExistFn func(err error) bool
|
||||
IsPermanentErrorFn func(err error) bool
|
||||
SaveFn func(ctx context.Context, h backend.Handle, rd backend.RewindReader) error
|
||||
OpenReaderFn func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error)
|
||||
StatFn func(ctx context.Context, h backend.Handle) (backend.FileInfo, error)
|
||||
|
@ -83,6 +84,14 @@ func (m *Backend) IsNotExist(err error) bool {
|
|||
return m.IsNotExistFn(err)
|
||||
}
|
||||
|
||||
func (m *Backend) IsPermanentError(err error) bool {
|
||||
if m.IsPermanentErrorFn == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return m.IsPermanentErrorFn(err)
|
||||
}
|
||||
|
||||
// Save data in the backend.
|
||||
func (m *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error {
|
||||
if m.SaveFn == nil {
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/restic/restic/internal/backend/util"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/feature"
|
||||
)
|
||||
|
||||
// make sure the rest backend implements backend.Backend
|
||||
|
@ -30,6 +31,20 @@ type Backend struct {
|
|||
layout.Layout
|
||||
}
|
||||
|
||||
// restError is returned whenever the server returns a non-successful HTTP status.
|
||||
type restError struct {
|
||||
backend.Handle
|
||||
StatusCode int
|
||||
Status string
|
||||
}
|
||||
|
||||
func (e *restError) Error() string {
|
||||
if e.StatusCode == http.StatusNotFound && e.Handle.Type.String() != "invalid" {
|
||||
return fmt.Sprintf("%v does not exist", e.Handle)
|
||||
}
|
||||
return fmt.Sprintf("unexpected HTTP response (%v): %v", e.StatusCode, e.Status)
|
||||
}
|
||||
|
||||
func NewFactory() location.Factory {
|
||||
return location.NewHTTPBackendFactory("rest", ParseConfig, StripPassword, Create, Open)
|
||||
}
|
||||
|
@ -96,7 +111,7 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, er
|
|||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("server response unexpected: %v (%v)", resp.Status, resp.StatusCode)
|
||||
return nil, &restError{backend.Handle{}, resp.StatusCode, resp.Status}
|
||||
}
|
||||
|
||||
return be, nil
|
||||
|
@ -150,26 +165,31 @@ func (b *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindR
|
|||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return errors.Errorf("server response unexpected: %v (%v)", resp.Status, resp.StatusCode)
|
||||
return &restError{h, resp.StatusCode, resp.Status}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// notExistError is returned whenever the requested file does not exist on the
|
||||
// server.
|
||||
type notExistError struct {
|
||||
backend.Handle
|
||||
}
|
||||
|
||||
func (e *notExistError) Error() string {
|
||||
return fmt.Sprintf("%v does not exist", e.Handle)
|
||||
}
|
||||
|
||||
// IsNotExist returns true if the error was caused by a non-existing file.
|
||||
func (b *Backend) IsNotExist(err error) bool {
|
||||
var e *notExistError
|
||||
return errors.As(err, &e)
|
||||
var e *restError
|
||||
return errors.As(err, &e) && e.StatusCode == http.StatusNotFound
|
||||
}
|
||||
|
||||
func (b *Backend) IsPermanentError(err error) bool {
|
||||
if b.IsNotExist(err) {
|
||||
return true
|
||||
}
|
||||
|
||||
var rerr *restError
|
||||
if errors.As(err, &rerr) {
|
||||
if rerr.StatusCode == http.StatusRequestedRangeNotSatisfiable || rerr.StatusCode == http.StatusUnauthorized || rerr.StatusCode == http.StatusForbidden {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||
|
@ -221,14 +241,13 @@ func (b *Backend) openReader(ctx context.Context, h backend.Handle, length int,
|
|||
return nil, errors.Wrap(err, "client.Do")
|
||||
}
|
||||
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
_ = drainAndClose(resp)
|
||||
return nil, ¬ExistError{h}
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
|
||||
_ = drainAndClose(resp)
|
||||
return nil, errors.Errorf("unexpected HTTP response (%v): %v", resp.StatusCode, resp.Status)
|
||||
return nil, &restError{h, resp.StatusCode, resp.Status}
|
||||
}
|
||||
|
||||
if feature.Flag.Enabled(feature.BackendErrorRedesign) && length > 0 && resp.ContentLength != int64(length) {
|
||||
return nil, &restError{h, http.StatusRequestedRangeNotSatisfiable, "partial out of bounds read"}
|
||||
}
|
||||
|
||||
return resp.Body, nil
|
||||
|
@ -251,12 +270,8 @@ func (b *Backend) Stat(ctx context.Context, h backend.Handle) (backend.FileInfo,
|
|||
return backend.FileInfo{}, err
|
||||
}
|
||||
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return backend.FileInfo{}, ¬ExistError{h}
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return backend.FileInfo{}, errors.Errorf("unexpected HTTP response (%v): %v", resp.StatusCode, resp.Status)
|
||||
return backend.FileInfo{}, &restError{h, resp.StatusCode, resp.Status}
|
||||
}
|
||||
|
||||
if resp.ContentLength < 0 {
|
||||
|
@ -288,12 +303,8 @@ func (b *Backend) Remove(ctx context.Context, h backend.Handle) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return ¬ExistError{h}
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return errors.Errorf("blob not removed, server response: %v (%v)", resp.Status, resp.StatusCode)
|
||||
return &restError{h, resp.StatusCode, resp.Status}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -330,7 +341,7 @@ func (b *Backend) List(ctx context.Context, t backend.FileType, fn func(backend.
|
|||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
_ = drainAndClose(resp)
|
||||
return errors.Errorf("List failed, server response: %v (%v)", resp.Status, resp.StatusCode)
|
||||
return &restError{backend.Handle{Type: t}, resp.StatusCode, resp.Status}
|
||||
}
|
||||
|
||||
if resp.Header.Get("Content-Type") == ContentTypeV2 {
|
||||
|
|
|
@ -2,13 +2,16 @@ package retry
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/feature"
|
||||
)
|
||||
|
||||
// Backend retries operations on the backend in case of an error with a
|
||||
|
@ -18,6 +21,8 @@ type Backend struct {
|
|||
MaxTries int
|
||||
Report func(string, error, time.Duration)
|
||||
Success func(string, int)
|
||||
|
||||
failedLoads sync.Map
|
||||
}
|
||||
|
||||
// statically ensure that RetryBackend implements backend.Backend.
|
||||
|
@ -74,7 +79,16 @@ func (be *Backend) retry(ctx context.Context, msg string, f func() error) error
|
|||
bo.InitialInterval = 1 * time.Millisecond
|
||||
}
|
||||
|
||||
err := retryNotifyErrorWithSuccess(f,
|
||||
err := retryNotifyErrorWithSuccess(
|
||||
func() error {
|
||||
err := f()
|
||||
// don't retry permanent errors as those very likely cannot be fixed by retrying
|
||||
// TODO remove IsNotExist(err) special cases when removing the feature flag
|
||||
if feature.Flag.Enabled(feature.BackendErrorRedesign) && !errors.Is(err, &backoff.PermanentError{}) && be.Backend.IsPermanentError(err) {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
return err
|
||||
},
|
||||
backoff.WithContext(backoff.WithMaxRetries(bo, uint64(be.MaxTries)), ctx),
|
||||
func(err error, d time.Duration) {
|
||||
if be.Report != nil {
|
||||
|
@ -121,19 +135,39 @@ func (be *Backend) Save(ctx context.Context, h backend.Handle, rd backend.Rewind
|
|||
})
|
||||
}
|
||||
|
||||
// Failed loads expire after an hour
|
||||
var failedLoadExpiry = time.Hour
|
||||
|
||||
// Load returns a reader that yields the contents of the file at h at the
|
||||
// given offset. If length is larger than zero, only a portion of the file
|
||||
// is returned. rd must be closed after use. If an error is returned, the
|
||||
// ReadCloser must be nil.
|
||||
func (be *Backend) Load(ctx context.Context, h backend.Handle, length int, offset int64, consumer func(rd io.Reader) error) (err error) {
|
||||
return be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset),
|
||||
key := h
|
||||
key.IsMetadata = false
|
||||
|
||||
// Implement the circuit breaker pattern for files that exhausted all retries due to a non-permanent error
|
||||
if v, ok := be.failedLoads.Load(key); ok {
|
||||
if time.Since(v.(time.Time)) > failedLoadExpiry {
|
||||
be.failedLoads.Delete(key)
|
||||
} else {
|
||||
// fail immediately if the file was already problematic during the last hour
|
||||
return fmt.Errorf("circuit breaker open for file %v", h)
|
||||
}
|
||||
}
|
||||
|
||||
err = be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset),
|
||||
func() error {
|
||||
err := be.Backend.Load(ctx, h, length, offset, consumer)
|
||||
if be.Backend.IsNotExist(err) {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
return err
|
||||
return be.Backend.Load(ctx, h, length, offset, consumer)
|
||||
})
|
||||
|
||||
if feature.Flag.Enabled(feature.BackendErrorRedesign) && err != nil && !be.IsPermanentError(err) {
|
||||
// We've exhausted the retries, the file is likely inaccessible. By excluding permanent
|
||||
// errors, not found or truncated files are not recorded.
|
||||
be.failedLoads.LoadOrStore(key, time.Now())
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Stat returns information about the File identified by h.
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -289,7 +290,7 @@ func TestBackendLoadNotExists(t *testing.T) {
|
|||
}
|
||||
return nil, notFound
|
||||
}
|
||||
be.IsNotExistFn = func(err error) bool {
|
||||
be.IsPermanentErrorFn = func(err error) bool {
|
||||
return errors.Is(err, notFound)
|
||||
}
|
||||
|
||||
|
@ -299,10 +300,61 @@ func TestBackendLoadNotExists(t *testing.T) {
|
|||
err := retryBackend.Load(context.TODO(), backend.Handle{}, 0, 0, func(rd io.Reader) (err error) {
|
||||
return nil
|
||||
})
|
||||
test.Assert(t, be.IsNotExistFn(err), "unexpected error %v", err)
|
||||
test.Assert(t, be.IsPermanentErrorFn(err), "unexpected error %v", err)
|
||||
test.Equals(t, 1, attempt)
|
||||
}
|
||||
|
||||
func TestBackendLoadCircuitBreaker(t *testing.T) {
|
||||
// retry should not retry if the error matches IsPermanentError
|
||||
notFound := errors.New("not found")
|
||||
otherError := errors.New("something")
|
||||
attempt := 0
|
||||
|
||||
be := mock.NewBackend()
|
||||
be.IsPermanentErrorFn = func(err error) bool {
|
||||
return errors.Is(err, notFound)
|
||||
}
|
||||
be.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
attempt++
|
||||
return nil, otherError
|
||||
}
|
||||
nilRd := func(rd io.Reader) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
TestFastRetries(t)
|
||||
retryBackend := New(be, 2, nil, nil)
|
||||
// trip the circuit breaker for file "other"
|
||||
err := retryBackend.Load(context.TODO(), backend.Handle{Name: "other"}, 0, 0, nilRd)
|
||||
test.Equals(t, otherError, err, "unexpected error")
|
||||
test.Equals(t, 3, attempt)
|
||||
|
||||
attempt = 0
|
||||
err = retryBackend.Load(context.TODO(), backend.Handle{Name: "other"}, 0, 0, nilRd)
|
||||
test.Assert(t, strings.Contains(err.Error(), "circuit breaker open for file"), "expected circuit breaker error, got %v")
|
||||
test.Equals(t, 0, attempt)
|
||||
|
||||
// don't trip for permanent errors
|
||||
be.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
attempt++
|
||||
return nil, notFound
|
||||
}
|
||||
err = retryBackend.Load(context.TODO(), backend.Handle{Name: "notfound"}, 0, 0, nilRd)
|
||||
test.Equals(t, notFound, err, "expected circuit breaker to only affect other file, got %v")
|
||||
err = retryBackend.Load(context.TODO(), backend.Handle{Name: "notfound"}, 0, 0, nilRd)
|
||||
test.Equals(t, notFound, err, "persistent error must not trigger circuit breaker, got %v")
|
||||
|
||||
// wait for circuit breaker to expire
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
old := failedLoadExpiry
|
||||
defer func() {
|
||||
failedLoadExpiry = old
|
||||
}()
|
||||
failedLoadExpiry = 3 * time.Millisecond
|
||||
err = retryBackend.Load(context.TODO(), backend.Handle{Name: "other"}, 0, 0, nilRd)
|
||||
test.Equals(t, notFound, err, "expected circuit breaker to reset, got %v")
|
||||
}
|
||||
|
||||
func TestBackendStatNotExists(t *testing.T) {
|
||||
// stat should not retry if the error matches IsNotExist
|
||||
notFound := errors.New("not found")
|
||||
|
@ -329,6 +381,36 @@ func TestBackendStatNotExists(t *testing.T) {
|
|||
test.Equals(t, 1, attempt)
|
||||
}
|
||||
|
||||
func TestBackendRetryPermanent(t *testing.T) {
|
||||
// retry should not retry if the error matches IsPermanentError
|
||||
notFound := errors.New("not found")
|
||||
attempt := 0
|
||||
|
||||
be := mock.NewBackend()
|
||||
be.IsPermanentErrorFn = func(err error) bool {
|
||||
return errors.Is(err, notFound)
|
||||
}
|
||||
|
||||
TestFastRetries(t)
|
||||
retryBackend := New(be, 2, nil, nil)
|
||||
err := retryBackend.retry(context.TODO(), "test", func() error {
|
||||
attempt++
|
||||
return notFound
|
||||
})
|
||||
|
||||
test.Assert(t, be.IsPermanentErrorFn(err), "unexpected error %v", err)
|
||||
test.Equals(t, 1, attempt)
|
||||
|
||||
attempt = 0
|
||||
err = retryBackend.retry(context.TODO(), "test", func() error {
|
||||
attempt++
|
||||
return errors.New("something")
|
||||
})
|
||||
test.Assert(t, !be.IsPermanentErrorFn(err), "error unexpectedly considered permanent %v", err)
|
||||
test.Equals(t, 3, attempt)
|
||||
|
||||
}
|
||||
|
||||
func assertIsCanceled(t *testing.T, err error) {
|
||||
test.Assert(t, err == context.Canceled, "got unexpected err %v", err)
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/restic/restic/internal/backend/util"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/feature"
|
||||
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
|
@ -229,6 +230,21 @@ func (be *Backend) IsNotExist(err error) bool {
|
|||
return errors.As(err, &e) && e.Code == "NoSuchKey"
|
||||
}
|
||||
|
||||
func (be *Backend) IsPermanentError(err error) bool {
|
||||
if be.IsNotExist(err) {
|
||||
return true
|
||||
}
|
||||
|
||||
var merr minio.ErrorResponse
|
||||
if errors.As(err, &merr) {
|
||||
if merr.Code == "InvalidRange" || merr.Code == "AccessDenied" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// Join combines path components with slashes.
|
||||
func (be *Backend) Join(p ...string) string {
|
||||
return path.Join(p...)
|
||||
|
@ -384,11 +400,18 @@ func (be *Backend) openReader(ctx context.Context, h backend.Handle, length int,
|
|||
}
|
||||
|
||||
coreClient := minio.Core{Client: be.client}
|
||||
rd, _, _, err := coreClient.GetObject(ctx, be.cfg.Bucket, objName, opts)
|
||||
rd, info, _, err := coreClient.GetObject(ctx, be.cfg.Bucket, objName, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if feature.Flag.Enabled(feature.BackendErrorRedesign) && length > 0 {
|
||||
if info.Size > 0 && info.Size != int64(length) {
|
||||
_ = rd.Close()
|
||||
return nil, minio.ErrorResponse{Code: "InvalidRange", Message: "restic-file-too-short"}
|
||||
}
|
||||
}
|
||||
|
||||
return rd, err
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"github.com/restic/restic/internal/backend/util"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/feature"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/pkg/sftp"
|
||||
|
@ -43,6 +44,8 @@ type SFTP struct {
|
|||
|
||||
var _ backend.Backend = &SFTP{}
|
||||
|
||||
var errTooShort = fmt.Errorf("file is too short")
|
||||
|
||||
func NewFactory() location.Factory {
|
||||
return location.NewLimitedBackendFactory("sftp", ParseConfig, location.NoPassword, limiter.WrapBackendConstructor(Create), limiter.WrapBackendConstructor(Open))
|
||||
}
|
||||
|
@ -212,6 +215,10 @@ func (r *SFTP) IsNotExist(err error) bool {
|
|||
return errors.Is(err, os.ErrNotExist)
|
||||
}
|
||||
|
||||
func (r *SFTP) IsPermanentError(err error) bool {
|
||||
return r.IsNotExist(err) || errors.Is(err, errTooShort) || errors.Is(err, os.ErrPermission)
|
||||
}
|
||||
|
||||
func buildSSHCommand(cfg Config) (cmd string, args []string, err error) {
|
||||
if cfg.Command != "" {
|
||||
args, err := backend.SplitShellStrings(cfg.Command)
|
||||
|
@ -419,7 +426,24 @@ func (r *SFTP) checkNoSpace(dir string, size int64, origErr error) error {
|
|||
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||
// given offset.
|
||||
func (r *SFTP) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
return util.DefaultLoad(ctx, h, length, offset, r.openReader, fn)
|
||||
return util.DefaultLoad(ctx, h, length, offset, r.openReader, func(rd io.Reader) error {
|
||||
if length == 0 || !feature.Flag.Enabled(feature.BackendErrorRedesign) {
|
||||
return fn(rd)
|
||||
}
|
||||
|
||||
// there is no direct way to efficiently check whether the file is too short
|
||||
// rd is already a LimitedReader which can be used to track the number of bytes read
|
||||
err := fn(rd)
|
||||
|
||||
// check the underlying reader to be agnostic to however fn() handles the returned error
|
||||
_, rderr := rd.Read([]byte{0})
|
||||
if rderr == io.EOF && rd.(*backend.LimitedReadCloser).N != 0 {
|
||||
// file is too short
|
||||
return fmt.Errorf("%w: %v", errTooShort, err)
|
||||
}
|
||||
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (r *SFTP) openReader(_ context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/restic/restic/internal/backend/util"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/feature"
|
||||
|
||||
"github.com/ncw/swift/v2"
|
||||
)
|
||||
|
@ -153,7 +154,18 @@ func (be *beSwift) openReader(ctx context.Context, h backend.Handle, length int,
|
|||
|
||||
obj, _, err := be.conn.ObjectOpen(ctx, be.container, objName, false, headers)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "conn.ObjectOpen")
|
||||
return nil, fmt.Errorf("conn.ObjectOpen: %w", err)
|
||||
}
|
||||
|
||||
if feature.Flag.Enabled(feature.BackendErrorRedesign) && length > 0 {
|
||||
// get response length, but don't cause backend calls
|
||||
cctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
objLength, e := obj.Length(cctx)
|
||||
if e == nil && objLength != int64(length) {
|
||||
_ = obj.Close()
|
||||
return nil, &swift.Error{StatusCode: http.StatusRequestedRangeNotSatisfiable, Text: "restic-file-too-short"}
|
||||
}
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
|
@ -242,6 +254,21 @@ func (be *beSwift) IsNotExist(err error) bool {
|
|||
return errors.As(err, &e) && e.StatusCode == http.StatusNotFound
|
||||
}
|
||||
|
||||
func (be *beSwift) IsPermanentError(err error) bool {
|
||||
if be.IsNotExist(err) {
|
||||
return true
|
||||
}
|
||||
|
||||
var serr *swift.Error
|
||||
if errors.As(err, &serr) {
|
||||
if serr.StatusCode == http.StatusRequestedRangeNotSatisfiable || serr.StatusCode == http.StatusUnauthorized || serr.StatusCode == http.StatusForbidden {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// Delete removes all restic objects in the container.
|
||||
// It will not remove the container itself.
|
||||
func (be *beSwift) Delete(ctx context.Context) error {
|
||||
|
|
|
@ -99,6 +99,7 @@ func (s *Suite[C]) TestConfig(t *testing.T) {
|
|||
t.Fatalf("did not get expected error for non-existing config")
|
||||
}
|
||||
test.Assert(t, b.IsNotExist(err), "IsNotExist() did not recognize error from LoadAll(): %v", err)
|
||||
test.Assert(t, b.IsPermanentError(err), "IsPermanentError() did not recognize error from LoadAll(): %v", err)
|
||||
|
||||
err = b.Save(context.TODO(), backend.Handle{Type: backend.ConfigFile}, backend.NewByteReader([]byte(testString), b.Hasher()))
|
||||
if err != nil {
|
||||
|
@ -135,6 +136,7 @@ func (s *Suite[C]) TestLoad(t *testing.T) {
|
|||
t.Fatalf("Load() did not return an error for non-existing blob")
|
||||
}
|
||||
test.Assert(t, b.IsNotExist(err), "IsNotExist() did not recognize non-existing blob: %v", err)
|
||||
test.Assert(t, b.IsPermanentError(err), "IsPermanentError() did not recognize non-existing blob: %v", err)
|
||||
|
||||
length := rand.Intn(1<<24) + 2000
|
||||
|
||||
|
@ -181,8 +183,12 @@ func (s *Suite[C]) TestLoad(t *testing.T) {
|
|||
}
|
||||
|
||||
getlen := l
|
||||
if l >= len(d) && rand.Float32() >= 0.5 {
|
||||
getlen = 0
|
||||
if l >= len(d) {
|
||||
if rand.Float32() >= 0.5 {
|
||||
getlen = 0
|
||||
} else {
|
||||
getlen = len(d)
|
||||
}
|
||||
}
|
||||
|
||||
if l > 0 && l < len(d) {
|
||||
|
@ -225,6 +231,18 @@ func (s *Suite[C]) TestLoad(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// test error checking for partial and fully out of bounds read
|
||||
// only test for length > 0 as we currently do not need strict out of bounds handling for length==0
|
||||
for _, offset := range []int{length - 99, length - 50, length, length + 100} {
|
||||
err = b.Load(context.TODO(), handle, 100, int64(offset), func(rd io.Reader) (ierr error) {
|
||||
_, ierr = io.ReadAll(rd)
|
||||
return ierr
|
||||
})
|
||||
test.Assert(t, err != nil, "Load() did not return error on out of bounds read! o %v, l %v, filelength %v", offset, 100, length)
|
||||
test.Assert(t, b.IsPermanentError(err), "IsPermanentError() did not recognize out of range read: %v", err)
|
||||
test.Assert(t, !b.IsNotExist(err), "IsNotExist() must not recognize out of range read: %v", err)
|
||||
}
|
||||
|
||||
test.OK(t, b.Remove(context.TODO(), handle))
|
||||
}
|
||||
|
||||
|
@ -762,6 +780,7 @@ func (s *Suite[C]) TestBackend(t *testing.T) {
|
|||
defer s.close(t, b)
|
||||
|
||||
test.Assert(t, !b.IsNotExist(nil), "IsNotExist() recognized nil error")
|
||||
test.Assert(t, !b.IsPermanentError(nil), "IsPermanentError() recognized nil error")
|
||||
|
||||
for _, tpe := range []backend.FileType{
|
||||
backend.PackFile, backend.KeyFile, backend.LockFile,
|
||||
|
@ -782,11 +801,13 @@ func (s *Suite[C]) TestBackend(t *testing.T) {
|
|||
_, err = b.Stat(context.TODO(), h)
|
||||
test.Assert(t, err != nil, "blob data could be extracted before creation")
|
||||
test.Assert(t, b.IsNotExist(err), "IsNotExist() did not recognize Stat() error: %v", err)
|
||||
test.Assert(t, b.IsPermanentError(err), "IsPermanentError() did not recognize Stat() error: %v", err)
|
||||
|
||||
// try to read not existing blob
|
||||
err = testLoad(b, h)
|
||||
test.Assert(t, err != nil, "blob could be read before creation")
|
||||
test.Assert(t, b.IsNotExist(err), "IsNotExist() did not recognize Load() error: %v", err)
|
||||
test.Assert(t, b.IsPermanentError(err), "IsPermanentError() did not recognize Load() error: %v", err)
|
||||
|
||||
// try to get string out, should fail
|
||||
ret, err = beTest(context.TODO(), b, h)
|
||||
|
|
|
@ -61,7 +61,7 @@ func (c *Cache) load(h backend.Handle, length int, offset int64) (io.ReadCloser,
|
|||
if size < offset+int64(length) {
|
||||
_ = f.Close()
|
||||
_ = c.remove(h)
|
||||
return nil, errors.Errorf("cached file %v is too small, removing", h)
|
||||
return nil, errors.Errorf("cached file %v is too short, removing", h)
|
||||
}
|
||||
|
||||
if offset > 0 {
|
||||
|
|
|
@ -299,7 +299,7 @@ func (k *Key) Open(dst, nonce, ciphertext, _ []byte) ([]byte, error) {
|
|||
|
||||
// check for plausible length
|
||||
if len(ciphertext) < k.Overhead() {
|
||||
return nil, errors.Errorf("trying to decrypt invalid data: ciphertext too small")
|
||||
return nil, errors.Errorf("trying to decrypt invalid data: ciphertext too short")
|
||||
}
|
||||
|
||||
l := len(ciphertext) - macSize
|
||||
|
|
|
@ -5,17 +5,17 @@ var Flag = New()
|
|||
|
||||
// flag names are written in kebab-case
|
||||
const (
|
||||
BackendErrorRedesign FlagName = "backend-error-redesign"
|
||||
DeprecateLegacyIndex FlagName = "deprecate-legacy-index"
|
||||
DeprecateS3LegacyLayout FlagName = "deprecate-s3-legacy-layout"
|
||||
DeviceIDForHardlinks FlagName = "device-id-for-hardlinks"
|
||||
HTTPTimeouts FlagName = "http-timeouts"
|
||||
)
|
||||
|
||||
func init() {
|
||||
Flag.SetFlags(map[FlagName]FlagDesc{
|
||||
BackendErrorRedesign: {Type: Beta, Description: "enforce timeouts for stuck HTTP requests and use new backend error handling design."},
|
||||
DeprecateLegacyIndex: {Type: Beta, Description: "disable support for index format used by restic 0.1.0. Use `restic repair index` to update the index if necessary."},
|
||||
DeprecateS3LegacyLayout: {Type: Beta, Description: "disable support for S3 legacy layout used up to restic 0.7.0. Use `RESTIC_FEATURES=deprecate-s3-legacy-layout=false restic migrate s3_layout` to migrate your S3 repository if necessary."},
|
||||
DeviceIDForHardlinks: {Type: Alpha, Description: "store deviceID only for hardlinks to reduce metadata changes for example when using btrfs subvolumes. Will be removed in a future restic version after repository format 3 is available"},
|
||||
HTTPTimeouts: {Type: Beta, Description: "enforce timeouts for stuck HTTP requests."},
|
||||
})
|
||||
}
|
||||
|
|
|
@ -204,7 +204,7 @@ func (h *hashedArrayTree) Size() uint {
|
|||
func (h *hashedArrayTree) grow() {
|
||||
idx, subIdx := h.index(h.size)
|
||||
if int(idx) == len(h.blockList) {
|
||||
// blockList is too small -> double list and block size
|
||||
// blockList is too short -> double list and block size
|
||||
h.blockSize *= 2
|
||||
h.mask = h.mask*2 + 1
|
||||
h.maskShift++
|
||||
|
|
|
@ -239,7 +239,7 @@ func readRecords(rd io.ReaderAt, size int64, bufsize int) ([]byte, int, error) {
|
|||
case hlen == 0:
|
||||
err = InvalidFileError{Message: "header length is zero"}
|
||||
case hlen < crypto.Extension:
|
||||
err = InvalidFileError{Message: "header length is too small"}
|
||||
err = InvalidFileError{Message: "header length is too short"}
|
||||
case int64(hlen) > size-int64(headerLengthSize):
|
||||
err = InvalidFileError{Message: "header is larger than file"}
|
||||
case int64(hlen) > MaxHeaderSize-int64(headerLengthSize):
|
||||
|
@ -263,7 +263,7 @@ func readRecords(rd io.ReaderAt, size int64, bufsize int) ([]byte, int, error) {
|
|||
func readHeader(rd io.ReaderAt, size int64) ([]byte, error) {
|
||||
debug.Log("size: %v", size)
|
||||
if size < int64(minFileSize) {
|
||||
err := InvalidFileError{Message: "file is too small"}
|
||||
err := InvalidFileError{Message: "file is too short"}
|
||||
return nil, errors.Wrap(err, "readHeader")
|
||||
}
|
||||
|
||||
|
@ -305,7 +305,7 @@ func List(k *crypto.Key, rd io.ReaderAt, size int64) (entries []restic.Blob, hdr
|
|||
}
|
||||
|
||||
if len(buf) < crypto.CiphertextLength(0) {
|
||||
return nil, 0, errors.New("invalid header, too small")
|
||||
return nil, 0, errors.New("invalid header, too short")
|
||||
}
|
||||
|
||||
hdrSize = headerLengthSize + uint32(len(buf))
|
||||
|
|
|
@ -444,7 +444,7 @@ func decidePackAction(ctx context.Context, opts PruneOptions, repo restic.Reposi
|
|||
// This is equivalent to sorting by unused / total space.
|
||||
// Instead of unused[i] / used[i] > unused[j] / used[j] we use
|
||||
// unused[i] * used[j] > unused[j] * used[i] as uint32*uint32 < uint64
|
||||
// Moreover packs containing trees and too small packs are sorted to the beginning
|
||||
// Moreover packs containing trees and too short packs are sorted to the beginning
|
||||
sort.Slice(repackCandidates, func(i, j int) bool {
|
||||
pi := repackCandidates[i].packInfo
|
||||
pj := repackCandidates[j].packInfo
|
||||
|
|
Loading…
Reference in New Issue