diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index 42908557e..93b4556c7 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -304,7 +304,7 @@ func (opts BackupOptions) Check(gopts GlobalOptions, args []string) error { // from being saved in a snapshot based on path only func collectRejectByNameFuncs(opts BackupOptions, repo *repository.Repository) (fs []archiver.RejectByNameFunc, err error) { // exclude restic cache - if repo.Cache != nil { + if repo.Cache() != nil { f, err := rejectResticCache(repo) if err != nil { return nil, err diff --git a/cmd/restic/cmd_forget.go b/cmd/restic/cmd_forget.go index f770dc365..f9ae85cd1 100644 --- a/cmd/restic/cmd_forget.go +++ b/cmd/restic/cmd_forget.go @@ -304,7 +304,7 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption if len(removeSnIDs) > 0 { if !opts.DryRun { bar := printer.NewCounter("files deleted") - err := restic.ParallelRemove(ctx, repo, removeSnIDs, restic.SnapshotFile, func(id restic.ID, err error) error { + err := restic.ParallelRemove(ctx, repo, removeSnIDs, restic.WriteableSnapshotFile, func(id restic.ID, err error) error { if err != nil { printer.E("unable to remove %v/%v from the repository\n", restic.SnapshotFile, id) } else { diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go index 213714799..fce109bdd 100644 --- a/cmd/restic/cmd_prune.go +++ b/cmd/restic/cmd_prune.go @@ -171,7 +171,7 @@ func runPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, term } func runPruneWithRepo(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo *repository.Repository, ignoreSnapshots restic.IDSet, term *termstatus.Terminal) error { - if repo.Cache == nil { + if repo.Cache() == nil { Print("warning: running prune without a cache, this may be very slow!\n") } diff --git a/cmd/restic/cmd_recover.go b/cmd/restic/cmd_recover.go index 133f77978..78fc2d148 100644 --- a/cmd/restic/cmd_recover.go +++ b/cmd/restic/cmd_recover.go @@ -168,7 +168,7 @@ func runRecover(ctx context.Context, gopts GlobalOptions) error { } -func createSnapshot(ctx context.Context, name, hostname string, tags []string, repo restic.SaverUnpacked, tree *restic.ID) error { +func createSnapshot(ctx context.Context, name, hostname string, tags []string, repo restic.SaverUnpacked[restic.WriteableFileType], tree *restic.ID) error { sn, err := restic.NewSnapshot([]string{name}, tags, hostname, time.Now()) if err != nil { return errors.Fatalf("unable to save snapshot: %v", err) diff --git a/cmd/restic/cmd_rewrite.go b/cmd/restic/cmd_rewrite.go index b62d1ed95..707f8af9b 100644 --- a/cmd/restic/cmd_rewrite.go +++ b/cmd/restic/cmd_rewrite.go @@ -194,7 +194,7 @@ func filterAndReplaceSnapshot(ctx context.Context, repo restic.Repository, sn *r if dryRun { Verbosef("would delete empty snapshot\n") } else { - if err = repo.RemoveUnpacked(ctx, restic.SnapshotFile, *sn.ID()); err != nil { + if err = repo.RemoveUnpacked(ctx, restic.WriteableSnapshotFile, *sn.ID()); err != nil { return false, err } debug.Log("removed empty snapshot %v", sn.ID()) @@ -253,7 +253,7 @@ func filterAndReplaceSnapshot(ctx context.Context, repo restic.Repository, sn *r Verbosef("saved new snapshot %v\n", id.Str()) if forget { - if err = repo.RemoveUnpacked(ctx, restic.SnapshotFile, *sn.ID()); err != nil { + if err = repo.RemoveUnpacked(ctx, restic.WriteableSnapshotFile, *sn.ID()); err != nil { return false, err } debug.Log("removed old snapshot %v", sn.ID()) diff --git a/cmd/restic/cmd_tag.go b/cmd/restic/cmd_tag.go index 8a2a83678..539a0cc59 100644 --- a/cmd/restic/cmd_tag.go +++ b/cmd/restic/cmd_tag.go @@ -90,7 +90,7 @@ func changeTags(ctx context.Context, repo *repository.Repository, sn *restic.Sna debug.Log("new snapshot saved as %v", id) // Remove the old snapshot. - if err = repo.RemoveUnpacked(ctx, restic.SnapshotFile, *sn.ID()); err != nil { + if err = repo.RemoveUnpacked(ctx, restic.WriteableSnapshotFile, *sn.ID()); err != nil { return false, err } diff --git a/cmd/restic/cmd_unlock.go b/cmd/restic/cmd_unlock.go index d87cde065..825eb815c 100644 --- a/cmd/restic/cmd_unlock.go +++ b/cmd/restic/cmd_unlock.go @@ -3,7 +3,7 @@ package main import ( "context" - "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/repository" "github.com/spf13/cobra" ) @@ -45,9 +45,9 @@ func runUnlock(ctx context.Context, opts UnlockOptions, gopts GlobalOptions) err return err } - fn := restic.RemoveStaleLocks + fn := repository.RemoveStaleLocks if opts.RemoveAll { - fn = restic.RemoveAllLocks + fn = repository.RemoveAllLocks } processed, err := fn(ctx, repo) diff --git a/cmd/restic/exclude.go b/cmd/restic/exclude.go index 99d1128a9..1c05f4abb 100644 --- a/cmd/restic/exclude.go +++ b/cmd/restic/exclude.go @@ -11,12 +11,12 @@ import ( // rejectResticCache returns a RejectByNameFunc that rejects the restic cache // directory (if set). func rejectResticCache(repo *repository.Repository) (archiver.RejectByNameFunc, error) { - if repo.Cache == nil { + if repo.Cache() == nil { return func(string) bool { return false }, nil } - cacheBase := repo.Cache.BaseDir() + cacheBase := repo.Cache().BaseDir() if cacheBase == "" { return nil, errors.New("cacheBase is empty string") diff --git a/cmd/restic/integration_helpers_test.go b/cmd/restic/integration_helpers_test.go index 8ae3bb78a..21944a9ce 100644 --- a/cmd/restic/integration_helpers_test.go +++ b/cmd/restic/integration_helpers_test.go @@ -275,17 +275,30 @@ func listTreePacks(gopts GlobalOptions, t *testing.T) restic.IDSet { return treePacks } +func captureBackend(gopts *GlobalOptions) func() backend.Backend { + var be backend.Backend + gopts.backendTestHook = func(r backend.Backend) (backend.Backend, error) { + be = r + return r, nil + } + return func() backend.Backend { + return be + } +} + func removePacks(gopts GlobalOptions, t testing.TB, remove restic.IDSet) { - ctx, r, unlock, err := openWithExclusiveLock(context.TODO(), gopts, false) + be := captureBackend(&gopts) + ctx, _, unlock, err := openWithExclusiveLock(context.TODO(), gopts, false) rtest.OK(t, err) defer unlock() for id := range remove { - rtest.OK(t, r.RemoveUnpacked(ctx, restic.PackFile, id)) + rtest.OK(t, be().Remove(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()})) } } func removePacksExcept(gopts GlobalOptions, t testing.TB, keep restic.IDSet, removeTreePacks bool) { + be := captureBackend(&gopts) ctx, r, unlock, err := openWithExclusiveLock(context.TODO(), gopts, false) rtest.OK(t, err) defer unlock() @@ -305,7 +318,7 @@ func removePacksExcept(gopts GlobalOptions, t testing.TB, keep restic.IDSet, rem if treePacks.Has(id) != removeTreePacks || keep.Has(id) { return nil } - return r.RemoveUnpacked(ctx, restic.PackFile, id) + return be().Remove(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()}) })) } diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 55b6ee4b3..0b71cbacf 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -74,7 +74,7 @@ type ToNoder interface { type archiverRepo interface { restic.Loader restic.BlobSaver - restic.SaverUnpacked + restic.SaverUnpacked[restic.WriteableFileType] Config() restic.Config StartPackUploader(ctx context.Context, wg *errgroup.Group) diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go index 0e2125bba..92bbb1da6 100644 --- a/internal/checker/checker_test.go +++ b/internal/checker/checker_test.go @@ -145,11 +145,11 @@ func TestUnreferencedPack(t *testing.T) { } func TestUnreferencedBlobs(t *testing.T) { - repo, _, cleanup := repository.TestFromFixture(t, checkerTestData) + repo, be, cleanup := repository.TestFromFixture(t, checkerTestData) defer cleanup() snapshotID := restic.TestParseID("51d249d28815200d59e4be7b3f21a157b864dc343353df9d8e498220c2499b02") - test.OK(t, repo.RemoveUnpacked(context.TODO(), restic.SnapshotFile, snapshotID)) + test.OK(t, be.Remove(context.TODO(), backend.Handle{Type: restic.SnapshotFile, Name: snapshotID.String()})) unusedBlobsBySnapshot := restic.BlobHandles{ restic.TestParseHandle("58c748bbe2929fdf30c73262bd8313fe828f8925b05d1d4a87fe109082acb849", restic.DataBlob), @@ -334,7 +334,7 @@ func (b *errorOnceBackend) Load(ctx context.Context, h backend.Handle, length in } func TestCheckerModifiedData(t *testing.T) { - repo, be := repository.TestRepositoryWithVersion(t, 0) + repo, _, be := repository.TestRepositoryWithVersion(t, 0) sn := archiver.TestSnapshot(t, repo, ".", nil) t.Logf("archived as %v", sn.ID().Str()) diff --git a/internal/migrations/upgrade_repo_v2_test.go b/internal/migrations/upgrade_repo_v2_test.go index 44a39b6c5..1f4cba4e5 100644 --- a/internal/migrations/upgrade_repo_v2_test.go +++ b/internal/migrations/upgrade_repo_v2_test.go @@ -8,7 +8,7 @@ import ( ) func TestUpgradeRepoV2(t *testing.T) { - repo, _ := repository.TestRepositoryWithVersion(t, 1) + repo, _, _ := repository.TestRepositoryWithVersion(t, 1) if repo.Config().Version != 1 { t.Fatal("test repo has wrong version") } diff --git a/internal/repository/check.go b/internal/repository/check.go index 4e57a7c1c..2bf2ac8f3 100644 --- a/internal/repository/check.go +++ b/internal/repository/check.go @@ -40,9 +40,9 @@ func (e *partialReadError) Error() string { func CheckPack(ctx context.Context, r *Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error { err := checkPackInner(ctx, r, id, blobs, size, bufRd, dec) if err != nil { - if r.Cache != nil { + if r.cache != nil { // ignore error as there's not much we can do here - _ = r.Cache.Forget(backend.Handle{Type: restic.PackFile, Name: id.String()}) + _ = r.cache.Forget(backend.Handle{Type: restic.PackFile, Name: id.String()}) } // retry pack verification to detect transient errors diff --git a/internal/repository/fuzz_test.go b/internal/repository/fuzz_test.go index f1fb06157..c20f9a710 100644 --- a/internal/repository/fuzz_test.go +++ b/internal/repository/fuzz_test.go @@ -18,7 +18,7 @@ func FuzzSaveLoadBlob(f *testing.F) { } id := restic.Hash(blob) - repo, _ := TestRepositoryWithVersion(t, 2) + repo, _, _ := TestRepositoryWithVersion(t, 2) var wg errgroup.Group repo.StartPackUploader(context.TODO(), &wg) diff --git a/internal/repository/index/index.go b/internal/repository/index/index.go index 14e4543bd..c62c1c462 100644 --- a/internal/repository/index/index.go +++ b/internal/repository/index/index.go @@ -351,7 +351,7 @@ func (idx *Index) Encode(w io.Writer) error { } // SaveIndex saves an index in the repository. -func (idx *Index) SaveIndex(ctx context.Context, repo restic.SaverUnpacked) (restic.ID, error) { +func (idx *Index) SaveIndex(ctx context.Context, repo restic.SaverUnpacked[restic.FileType]) (restic.ID, error) { buf := bytes.NewBuffer(nil) err := idx.Encode(buf) diff --git a/internal/repository/index/master_index.go b/internal/repository/index/master_index.go index ce9afcde4..16923090b 100644 --- a/internal/repository/index/master_index.go +++ b/internal/repository/index/master_index.go @@ -321,7 +321,7 @@ type MasterIndexRewriteOpts struct { // This is used by repair index to only rewrite and delete the old indexes. // // Must not be called concurrently to any other MasterIndex operation. -func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, excludePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, opts MasterIndexRewriteOpts) error { +func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked[restic.FileType], excludePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, opts MasterIndexRewriteOpts) error { for _, idx := range mi.idx { if !idx.Final() { panic("internal error - index must be saved before calling MasterIndex.Rewrite") @@ -499,7 +499,7 @@ func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, exclud // It is only intended for use by prune with the UnsafeRecovery option. // // Must not be called concurrently to any other MasterIndex operation. -func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, p *progress.Counter) error { +func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemoverUnpacked[restic.FileType], excludePacks restic.IDSet, p *progress.Counter) error { p.SetMax(uint64(len(mi.Packs(excludePacks)))) mi.idxMutex.Lock() @@ -574,7 +574,7 @@ func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemove } // saveIndex saves all indexes in the backend. -func (mi *MasterIndex) saveIndex(ctx context.Context, r restic.SaverUnpacked, indexes ...*Index) error { +func (mi *MasterIndex) saveIndex(ctx context.Context, r restic.SaverUnpacked[restic.FileType], indexes ...*Index) error { for i, idx := range indexes { debug.Log("Saving index %d", i) @@ -590,12 +590,12 @@ func (mi *MasterIndex) saveIndex(ctx context.Context, r restic.SaverUnpacked, in } // SaveIndex saves all new indexes in the backend. -func (mi *MasterIndex) SaveIndex(ctx context.Context, r restic.SaverUnpacked) error { +func (mi *MasterIndex) SaveIndex(ctx context.Context, r restic.SaverUnpacked[restic.FileType]) error { return mi.saveIndex(ctx, r, mi.finalizeNotFinalIndexes()...) } // SaveFullIndex saves all full indexes in the backend. -func (mi *MasterIndex) SaveFullIndex(ctx context.Context, r restic.SaverUnpacked) error { +func (mi *MasterIndex) SaveFullIndex(ctx context.Context, r restic.SaverUnpacked[restic.FileType]) error { return mi.saveIndex(ctx, r, mi.finalizeFullIndexes()...) } diff --git a/internal/repository/index/master_index_test.go b/internal/repository/index/master_index_test.go index 23185962e..516ef045c 100644 --- a/internal/repository/index/master_index_test.go +++ b/internal/repository/index/master_index_test.go @@ -346,13 +346,13 @@ var ( depth = 3 ) -func createFilledRepo(t testing.TB, snapshots int, version uint) restic.Repository { - repo, _ := repository.TestRepositoryWithVersion(t, version) +func createFilledRepo(t testing.TB, snapshots int, version uint) (restic.Repository, restic.Unpacked[restic.FileType]) { + repo, unpacked, _ := repository.TestRepositoryWithVersion(t, version) for i := 0; i < snapshots; i++ { restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(i)*time.Second), depth) } - return repo + return repo, unpacked } func TestIndexSave(t *testing.T) { @@ -362,15 +362,15 @@ func TestIndexSave(t *testing.T) { func testIndexSave(t *testing.T, version uint) { for _, test := range []struct { name string - saver func(idx *index.MasterIndex, repo restic.Repository) error + saver func(idx *index.MasterIndex, repo restic.Unpacked[restic.FileType]) error }{ - {"rewrite no-op", func(idx *index.MasterIndex, repo restic.Repository) error { + {"rewrite no-op", func(idx *index.MasterIndex, repo restic.Unpacked[restic.FileType]) error { return idx.Rewrite(context.TODO(), repo, nil, nil, nil, index.MasterIndexRewriteOpts{}) }}, - {"rewrite skip-all", func(idx *index.MasterIndex, repo restic.Repository) error { + {"rewrite skip-all", func(idx *index.MasterIndex, repo restic.Unpacked[restic.FileType]) error { return idx.Rewrite(context.TODO(), repo, nil, restic.NewIDSet(), nil, index.MasterIndexRewriteOpts{}) }}, - {"SaveFallback", func(idx *index.MasterIndex, repo restic.Repository) error { + {"SaveFallback", func(idx *index.MasterIndex, repo restic.Unpacked[restic.FileType]) error { err := restic.ParallelRemove(context.TODO(), repo, idx.IDs(), restic.IndexFile, nil, nil) if err != nil { return nil @@ -379,7 +379,7 @@ func testIndexSave(t *testing.T, version uint) { }}, } { t.Run(test.name, func(t *testing.T) { - repo := createFilledRepo(t, 3, version) + repo, unpacked := createFilledRepo(t, 3, version) idx := index.NewMasterIndex() rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) @@ -388,7 +388,7 @@ func testIndexSave(t *testing.T, version uint) { blobs[pb] = struct{}{} })) - rtest.OK(t, test.saver(idx, repo)) + rtest.OK(t, test.saver(idx, unpacked)) idx = index.NewMasterIndex() rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) @@ -411,7 +411,7 @@ func TestIndexSavePartial(t *testing.T) { } func testIndexSavePartial(t *testing.T, version uint) { - repo := createFilledRepo(t, 3, version) + repo, unpacked := createFilledRepo(t, 3, version) // capture blob list before adding fourth snapshot idx := index.NewMasterIndex() @@ -424,14 +424,14 @@ func testIndexSavePartial(t *testing.T, version uint) { // add+remove new snapshot and track its pack files packsBefore := listPacks(t, repo) sn := restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(4)*time.Second), depth) - rtest.OK(t, repo.RemoveUnpacked(context.TODO(), restic.SnapshotFile, *sn.ID())) + rtest.OK(t, repo.RemoveUnpacked(context.TODO(), restic.WriteableSnapshotFile, *sn.ID())) packsAfter := listPacks(t, repo) newPacks := packsAfter.Sub(packsBefore) // rewrite index and remove pack files of new snapshot idx = index.NewMasterIndex() rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) - rtest.OK(t, idx.Rewrite(context.TODO(), repo, newPacks, nil, nil, index.MasterIndexRewriteOpts{})) + rtest.OK(t, idx.Rewrite(context.TODO(), unpacked, newPacks, nil, nil, index.MasterIndexRewriteOpts{})) // check blobs idx = index.NewMasterIndex() @@ -446,7 +446,7 @@ func testIndexSavePartial(t *testing.T, version uint) { rtest.Equals(t, 0, len(blobs), "saved index is missing blobs") // remove pack files to make check happy - rtest.OK(t, restic.ParallelRemove(context.TODO(), repo, newPacks, restic.PackFile, nil, nil)) + rtest.OK(t, restic.ParallelRemove(context.TODO(), unpacked, newPacks, restic.PackFile, nil, nil)) checker.TestCheckRepo(t, repo, false) } diff --git a/internal/repository/lock.go b/internal/repository/lock.go index fd46066d1..a50195233 100644 --- a/internal/repository/lock.go +++ b/internal/repository/lock.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/restic/restic/internal/backend" @@ -42,13 +43,7 @@ func Lock(ctx context.Context, repo *Repository, exclusive bool, retryLock time. // Lock wraps the ctx such that it is cancelled when the repository is unlocked // cancelling the original context also stops the lock refresh -func (l *locker) Lock(ctx context.Context, repo *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) { - - lockFn := restic.NewLock - if exclusive { - lockFn = restic.NewExclusiveLock - } - +func (l *locker) Lock(ctx context.Context, r *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) { var lock *restic.Lock var err error @@ -56,9 +51,11 @@ func (l *locker) Lock(ctx context.Context, repo *Repository, exclusive bool, ret retryMessagePrinted := false retryTimeout := time.After(retryLock) + repo := &internalRepository{r} + retryLoop: for { - lock, err = lockFn(ctx, repo) + lock, err = restic.NewLock(ctx, repo, exclusive) if err != nil && restic.IsAlreadyLocked(err) { if !retryMessagePrinted { @@ -75,7 +72,7 @@ retryLoop: case <-retryTimeout: debug.Log("repo already locked, timeout expired") // Last lock attempt - lock, err = lockFn(ctx, repo) + lock, err = restic.NewLock(ctx, repo, exclusive) break retryLoop case <-retrySleepCh: retrySleep = minDuration(retrySleep*2, l.retrySleepMax) @@ -272,3 +269,39 @@ func (l *Unlocker) Unlock() { l.info.cancel() l.info.refreshWG.Wait() } + +// RemoveStaleLocks deletes all locks detected as stale from the repository. +func RemoveStaleLocks(ctx context.Context, repo *Repository) (uint, error) { + var processed uint + err := restic.ForAllLocks(ctx, repo, nil, func(id restic.ID, lock *restic.Lock, err error) error { + if err != nil { + // ignore locks that cannot be loaded + debug.Log("ignore lock %v: %v", id, err) + return nil + } + + if lock.Stale() { + err = (&internalRepository{repo}).RemoveUnpacked(ctx, restic.LockFile, id) + if err == nil { + processed++ + } + return err + } + + return nil + }) + return processed, err +} + +// RemoveAllLocks removes all locks forcefully. +func RemoveAllLocks(ctx context.Context, repo *Repository) (uint, error) { + var processed uint32 + err := restic.ParallelList(ctx, repo, restic.LockFile, repo.Connections(), func(ctx context.Context, id restic.ID, _ int64) error { + err := (&internalRepository{repo}).RemoveUnpacked(ctx, restic.LockFile, id) + if err == nil { + atomic.AddUint32(&processed, 1) + } + return err + }) + return uint(processed), err +} diff --git a/internal/repository/lock_test.go b/internal/repository/lock_test.go index a9ff369c2..c31221e42 100644 --- a/internal/repository/lock_test.go +++ b/internal/repository/lock_test.go @@ -3,6 +3,7 @@ package repository import ( "context" "fmt" + "os" "runtime" "strings" "sync" @@ -301,3 +302,83 @@ func TestLockWaitSuccess(t *testing.T) { rtest.OK(t, err) lock.Unlock() } + +func createFakeLock(repo *Repository, t time.Time, pid int) (restic.ID, error) { + hostname, err := os.Hostname() + if err != nil { + return restic.ID{}, err + } + + newLock := &restic.Lock{Time: t, PID: pid, Hostname: hostname} + return restic.SaveJSONUnpacked(context.TODO(), &internalRepository{repo}, restic.LockFile, &newLock) +} + +func lockExists(repo restic.Lister, t testing.TB, lockID restic.ID) bool { + var exists bool + rtest.OK(t, repo.List(context.TODO(), restic.LockFile, func(id restic.ID, size int64) error { + if id == lockID { + exists = true + } + return nil + })) + + return exists +} + +func removeLock(repo *Repository, id restic.ID) error { + return (&internalRepository{repo}).RemoveUnpacked(context.TODO(), restic.LockFile, id) +} + +func TestLockWithStaleLock(t *testing.T) { + repo := TestRepository(t) + + id1, err := createFakeLock(repo, time.Now().Add(-time.Hour), os.Getpid()) + rtest.OK(t, err) + + id2, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid()) + rtest.OK(t, err) + + id3, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid()+500000) + rtest.OK(t, err) + + processed, err := RemoveStaleLocks(context.TODO(), repo) + rtest.OK(t, err) + + rtest.Assert(t, lockExists(repo, t, id1) == false, + "stale lock still exists after RemoveStaleLocks was called") + rtest.Assert(t, lockExists(repo, t, id2) == true, + "non-stale lock was removed by RemoveStaleLocks") + rtest.Assert(t, lockExists(repo, t, id3) == false, + "stale lock still exists after RemoveStaleLocks was called") + rtest.Assert(t, processed == 2, + "number of locks removed does not match: expected %d, got %d", + 2, processed) + + rtest.OK(t, removeLock(repo, id2)) +} + +func TestRemoveAllLocks(t *testing.T) { + repo := TestRepository(t) + + id1, err := createFakeLock(repo, time.Now().Add(-time.Hour), os.Getpid()) + rtest.OK(t, err) + + id2, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid()) + rtest.OK(t, err) + + id3, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid()+500000) + rtest.OK(t, err) + + processed, err := RemoveAllLocks(context.TODO(), repo) + rtest.OK(t, err) + + rtest.Assert(t, lockExists(repo, t, id1) == false, + "lock still exists after RemoveAllLocks was called") + rtest.Assert(t, lockExists(repo, t, id2) == false, + "lock still exists after RemoveAllLocks was called") + rtest.Assert(t, lockExists(repo, t, id3) == false, + "lock still exists after RemoveAllLocks was called") + rtest.Assert(t, processed == 3, + "number of locks removed does not match: expected %d, got %d", + 3, processed) +} diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go index 731ad9a6a..9d53c911b 100644 --- a/internal/repository/packer_manager.go +++ b/internal/repository/packer_manager.go @@ -190,5 +190,5 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *packe r.idx.StorePack(id, p.Packer.Blobs()) // Save index if full - return r.idx.SaveFullIndex(ctx, r) + return r.idx.SaveFullIndex(ctx, &internalRepository{r}) } diff --git a/internal/repository/prune.go b/internal/repository/prune.go index d5fdbba07..3803b6f33 100644 --- a/internal/repository/prune.go +++ b/internal/repository/prune.go @@ -544,7 +544,7 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) er // unreferenced packs can be safely deleted first if len(plan.removePacksFirst) != 0 { printer.P("deleting unreferenced packs\n") - _ = deleteFiles(ctx, true, repo, plan.removePacksFirst, restic.PackFile, printer) + _ = deleteFiles(ctx, true, &internalRepository{repo}, plan.removePacksFirst, restic.PackFile, printer) // forget unused data plan.removePacksFirst = nil } @@ -588,7 +588,7 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) er if plan.opts.UnsafeRecovery { printer.P("deleting index files\n") indexFiles := repo.idx.IDs() - err := deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer) + err := deleteFiles(ctx, false, &internalRepository{repo}, indexFiles, restic.IndexFile, printer) if err != nil { return errors.Fatalf("%s", err) } @@ -601,14 +601,14 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) er if len(plan.removePacks) != 0 { printer.P("removing %d old packs\n", len(plan.removePacks)) - _ = deleteFiles(ctx, true, repo, plan.removePacks, restic.PackFile, printer) + _ = deleteFiles(ctx, true, &internalRepository{repo}, plan.removePacks, restic.PackFile, printer) } if ctx.Err() != nil { return ctx.Err() } if plan.opts.UnsafeRecovery { - err := repo.idx.SaveFallback(ctx, repo, plan.ignorePacks, printer.NewCounter("packs processed")) + err := repo.idx.SaveFallback(ctx, &internalRepository{repo}, plan.ignorePacks, printer.NewCounter("packs processed")) if err != nil { return errors.Fatalf("%s", err) } @@ -623,7 +623,7 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) er // deleteFiles deletes the given fileList of fileType in parallel // if ignoreError=true, it will print a warning if there was an error, else it will abort. -func deleteFiles(ctx context.Context, ignoreError bool, repo restic.RemoverUnpacked, fileList restic.IDSet, fileType restic.FileType, printer progress.Printer) error { +func deleteFiles(ctx context.Context, ignoreError bool, repo restic.RemoverUnpacked[restic.FileType], fileList restic.IDSet, fileType restic.FileType, printer progress.Printer) error { bar := printer.NewCounter("files deleted") defer bar.Done() diff --git a/internal/repository/prune_test.go b/internal/repository/prune_test.go index 94d0dfa94..cc569aa43 100644 --- a/internal/repository/prune_test.go +++ b/internal/repository/prune_test.go @@ -20,7 +20,7 @@ func testPrune(t *testing.T, opts repository.PruneOptions, errOnUnused bool) { random := rand.New(rand.NewSource(seed)) t.Logf("rand initialized with seed %d", seed) - repo, be := repository.TestRepositoryWithVersion(t, 0) + repo, _, be := repository.TestRepositoryWithVersion(t, 0) createRandomBlobs(t, random, repo, 4, 0.5, true) createRandomBlobs(t, random, repo, 5, 0.5, true) keep, _ := selectBlobs(t, random, repo, 0.5) diff --git a/internal/repository/raw.go b/internal/repository/raw.go index 31443b010..c5a4a72b7 100644 --- a/internal/repository/raw.go +++ b/internal/repository/raw.go @@ -21,10 +21,10 @@ func (r *Repository) LoadRaw(ctx context.Context, t restic.FileType, id restic.I // retry loading damaged data only once. If a file fails to download correctly // the second time, then it is likely corrupted at the backend. if h.Type != backend.ConfigFile && id != restic.Hash(buf) { - if r.Cache != nil { + if r.cache != nil { // Cleanup cache to make sure it's not the cached copy that is broken. // Ignore error as there's not much we can do in that case. - _ = r.Cache.Forget(h) + _ = r.cache.Forget(h) } buf, err = loadRaw(ctx, r.be, h) diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 59bafe84c..0691cdbbb 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -159,14 +159,14 @@ func findPacksForBlobs(t *testing.T, repo restic.Repository, blobs restic.BlobSe return packs } -func repack(t *testing.T, repo restic.Repository, packs restic.IDSet, blobs restic.BlobSet) { +func repack(t *testing.T, repo restic.Repository, be backend.Backend, packs restic.IDSet, blobs restic.BlobSet) { repackedBlobs, err := repository.Repack(context.TODO(), repo, repo, packs, blobs, nil) if err != nil { t.Fatal(err) } for id := range repackedBlobs { - err = repo.RemoveUnpacked(context.TODO(), restic.PackFile, id) + err = be.Remove(context.TODO(), backend.Handle{Type: restic.PackFile, Name: id.String()}) if err != nil { t.Fatal(err) } @@ -186,7 +186,7 @@ func TestRepack(t *testing.T) { } func testRepack(t *testing.T, version uint) { - repo, _ := repository.TestRepositoryWithVersion(t, version) + repo, _, be := repository.TestRepositoryWithVersion(t, version) seed := time.Now().UnixNano() random := rand.New(rand.NewSource(seed)) @@ -199,7 +199,7 @@ func testRepack(t *testing.T, version uint) { packsBefore := listPacks(t, repo) // Running repack on empty ID sets should not do anything at all. - repack(t, repo, nil, nil) + repack(t, repo, be, nil, nil) packsAfter := listPacks(t, repo) @@ -212,7 +212,7 @@ func testRepack(t *testing.T, version uint) { removePacks := findPacksForBlobs(t, repo, removeBlobs) - repack(t, repo, removePacks, keepBlobs) + repack(t, repo, be, removePacks, keepBlobs) rebuildAndReloadIndex(t, repo) packsAfter = listPacks(t, repo) @@ -261,8 +261,8 @@ func (r oneConnectionRepo) Connections() uint { } func testRepackCopy(t *testing.T, version uint) { - repo, _ := repository.TestRepositoryWithVersion(t, version) - dstRepo, _ := repository.TestRepositoryWithVersion(t, version) + repo, _, _ := repository.TestRepositoryWithVersion(t, version) + dstRepo, _, _ := repository.TestRepositoryWithVersion(t, version) // test with minimal possible connection count repoWrapped := &oneConnectionRepo{repo} diff --git a/internal/repository/repair_index.go b/internal/repository/repair_index.go index bff7ec5da..cc08206d5 100644 --- a/internal/repository/repair_index.go +++ b/internal/repository/repair_index.go @@ -123,7 +123,7 @@ func rewriteIndexFiles(ctx context.Context, repo *Repository, removePacks restic printer.P("rebuilding index\n") bar := printer.NewCounter("indexes processed") - return repo.idx.Rewrite(ctx, repo, removePacks, oldIndexes, extraObsolete, index.MasterIndexRewriteOpts{ + return repo.idx.Rewrite(ctx, &internalRepository{repo}, removePacks, oldIndexes, extraObsolete, index.MasterIndexRewriteOpts{ SaveProgress: bar, DeleteProgress: func() *progress.Counter { return printer.NewCounter("old indexes deleted") diff --git a/internal/repository/repair_index_test.go b/internal/repository/repair_index_test.go index 3b0af4e22..0fc89c79a 100644 --- a/internal/repository/repair_index_test.go +++ b/internal/repository/repair_index_test.go @@ -23,7 +23,7 @@ func testRebuildIndex(t *testing.T, readAllPacks bool, damage func(t *testing.T, random := rand.New(rand.NewSource(seed)) t.Logf("rand initialized with seed %d", seed) - repo, be := repository.TestRepositoryWithVersion(t, 0) + repo, _, be := repository.TestRepositoryWithVersion(t, 0) createRandomBlobs(t, random, repo, 4, 0.5, true) createRandomBlobs(t, random, repo, 5, 0.5, true) indexes := listIndex(t, repo) diff --git a/internal/repository/repair_pack.go b/internal/repository/repair_pack.go index 811388cc9..a9f8413e4 100644 --- a/internal/repository/repair_pack.go +++ b/internal/repository/repair_pack.go @@ -65,7 +65,7 @@ func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printe printer.P("removing salvaged pack files") // if we fail to delete the damaged pack files, then prune will remove them later on bar = printer.NewCounter("files deleted") - _ = restic.ParallelRemove(ctx, repo, ids, restic.PackFile, nil, bar) + _ = restic.ParallelRemove(ctx, &internalRepository{repo}, ids, restic.PackFile, nil, bar) bar.Done() return nil diff --git a/internal/repository/repository.go b/internal/repository/repository.go index d408e3105..aee0db103 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -38,7 +38,7 @@ type Repository struct { key *crypto.Key keyID restic.ID idx *index.MasterIndex - Cache *cache.Cache + cache *cache.Cache opts Options @@ -53,6 +53,11 @@ type Repository struct { dec *zstd.Decoder } +// internalRepository allows using SaveUnpacked and RemoveUnpacked with all FileTypes +type internalRepository struct { + *Repository +} + type Options struct { Compression CompressionMode PackSize uint @@ -149,10 +154,14 @@ func (r *Repository) UseCache(c *cache.Cache) { return } debug.Log("using cache") - r.Cache = c + r.cache = c r.be = c.Wrap(r.be) } +func (r *Repository) Cache() *cache.Cache { + return r.cache +} + // SetDryRun sets the repo backend into dry-run mode. func (r *Repository) SetDryRun() { r.be = dryrun.New(r.be) @@ -225,15 +234,15 @@ func (r *Repository) LoadBlob(ctx context.Context, t restic.BlobType, id restic. } // try cached pack files first - sortCachedPacksFirst(r.Cache, blobs) + sortCachedPacksFirst(r.cache, blobs) buf, err := r.loadBlob(ctx, blobs, buf) if err != nil { - if r.Cache != nil { + if r.cache != nil { for _, blob := range blobs { h := backend.Handle{Type: restic.PackFile, Name: blob.PackID.String(), IsMetadata: blob.Type.IsMetadata()} // ignore errors as there's not much we can do here - _ = r.Cache.Forget(h) + _ = r.cache.Forget(h) } } @@ -446,7 +455,15 @@ func (r *Repository) decompressUnpacked(p []byte) ([]byte, error) { // SaveUnpacked encrypts data and stores it in the backend. Returned is the // storage hash. -func (r *Repository) SaveUnpacked(ctx context.Context, t restic.FileType, buf []byte) (id restic.ID, err error) { +func (r *Repository) SaveUnpacked(ctx context.Context, t restic.WriteableFileType, buf []byte) (id restic.ID, err error) { + return r.saveUnpacked(ctx, t.ToFileType(), buf) +} + +func (r *internalRepository) SaveUnpacked(ctx context.Context, t restic.FileType, buf []byte) (id restic.ID, err error) { + return r.Repository.saveUnpacked(ctx, t, buf) +} + +func (r *Repository) saveUnpacked(ctx context.Context, t restic.FileType, buf []byte) (id restic.ID, err error) { p := buf if t != restic.ConfigFile { p, err = r.compressUnpacked(p) @@ -507,8 +524,15 @@ func (r *Repository) verifyUnpacked(buf []byte, t restic.FileType, expected []by return nil } -func (r *Repository) RemoveUnpacked(ctx context.Context, t restic.FileType, id restic.ID) error { - // TODO prevent everything except removing snapshots for non-repository code +func (r *Repository) RemoveUnpacked(ctx context.Context, t restic.WriteableFileType, id restic.ID) error { + return r.removeUnpacked(ctx, t.ToFileType(), id) +} + +func (r *internalRepository) RemoveUnpacked(ctx context.Context, t restic.FileType, id restic.ID) error { + return r.Repository.removeUnpacked(ctx, t, id) +} + +func (r *Repository) removeUnpacked(ctx context.Context, t restic.FileType, id restic.ID) error { return r.be.Remove(ctx, backend.Handle{Type: t, Name: id.String()}) } @@ -518,7 +542,7 @@ func (r *Repository) Flush(ctx context.Context) error { return err } - return r.idx.SaveIndex(ctx, r) + return r.idx.SaveIndex(ctx, &internalRepository{r}) } func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group) { @@ -702,14 +726,14 @@ func (r *Repository) createIndexFromPacks(ctx context.Context, packsize map[rest // prepareCache initializes the local cache. indexIDs is the list of IDs of // index files still present in the repo. func (r *Repository) prepareCache() error { - if r.Cache == nil { + if r.cache == nil { return nil } packs := r.idx.Packs(restic.NewIDSet()) // clear old packs - err := r.Cache.Clear(restic.PackFile, packs) + err := r.cache.Clear(restic.PackFile, packs) if err != nil { fmt.Fprintf(os.Stderr, "error clearing pack files in cache: %v\n", err) } @@ -803,7 +827,7 @@ func (r *Repository) init(ctx context.Context, password string, cfg restic.Confi r.key = key.master r.keyID = key.ID() r.setConfig(cfg) - return restic.SaveConfig(ctx, r, cfg) + return restic.SaveConfig(ctx, &internalRepository{r}, cfg) } // Key returns the current master key. @@ -835,9 +859,9 @@ func (r *Repository) ListPack(ctx context.Context, id restic.ID, size int64) ([] entries, hdrSize, err := pack.List(r.Key(), backend.ReaderAt(ctx, r.be, h), size) if err != nil { - if r.Cache != nil { + if r.cache != nil { // ignore error as there is not much we can do here - _ = r.Cache.Forget(h) + _ = r.cache.Forget(h) } // retry on error diff --git a/internal/repository/repository_internal_test.go b/internal/repository/repository_internal_test.go index 35082774c..edec4aa48 100644 --- a/internal/repository/repository_internal_test.go +++ b/internal/repository/repository_internal_test.go @@ -16,6 +16,7 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/repository/index" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" ) @@ -84,6 +85,53 @@ func BenchmarkSortCachedPacksFirst(b *testing.B) { } } +func BenchmarkLoadIndex(b *testing.B) { + BenchmarkAllVersions(b, benchmarkLoadIndex) +} + +func benchmarkLoadIndex(b *testing.B, version uint) { + TestUseLowSecurityKDFParameters(b) + + repo, _, be := TestRepositoryWithVersion(b, version) + idx := index.NewIndex() + + for i := 0; i < 5000; i++ { + idx.StorePack(restic.NewRandomID(), []restic.Blob{ + { + BlobHandle: restic.NewRandomBlobHandle(), + Length: 1234, + Offset: 1235, + }, + }) + } + idx.Finalize() + + id, err := idx.SaveIndex(context.TODO(), &internalRepository{repo}) + rtest.OK(b, err) + + b.Logf("index saved as %v", id.Str()) + fi, err := be.Stat(context.TODO(), backend.Handle{Type: restic.IndexFile, Name: id.String()}) + rtest.OK(b, err) + b.Logf("filesize is %v", fi.Size) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, err := loadIndex(context.TODO(), repo, id) + rtest.OK(b, err) + } +} + +// loadIndex loads the index id from backend and returns it. +func loadIndex(ctx context.Context, repo restic.LoaderUnpacked, id restic.ID) (*index.Index, error) { + buf, err := repo.LoadUnpacked(ctx, restic.IndexFile, id) + if err != nil { + return nil, err + } + + return index.DecodeIndex(buf, id) +} + // buildPackfileWithoutHeader returns a manually built pack file without a header. func buildPackfileWithoutHeader(blobSizes []int, key *crypto.Key, compress bool) (blobs []restic.Blob, packfile []byte) { opts := []zstd.EOption{ diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index 5a6897f8f..1b0d47c8f 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -43,7 +43,7 @@ func testSaveCalculateID(t *testing.T, version uint) { } func testSave(t *testing.T, version uint, calculateID bool) { - repo, _ := repository.TestRepositoryWithVersion(t, version) + repo, _, _ := repository.TestRepositoryWithVersion(t, version) for _, size := range testSizes { data := make([]byte, size) @@ -86,7 +86,7 @@ func BenchmarkSaveAndEncrypt(t *testing.B) { } func benchmarkSaveAndEncrypt(t *testing.B, version uint) { - repo, _ := repository.TestRepositoryWithVersion(t, version) + repo, _, _ := repository.TestRepositoryWithVersion(t, version) size := 4 << 20 // 4MiB data := make([]byte, size) @@ -112,7 +112,7 @@ func TestLoadBlob(t *testing.T) { } func testLoadBlob(t *testing.T, version uint) { - repo, _ := repository.TestRepositoryWithVersion(t, version) + repo, _, _ := repository.TestRepositoryWithVersion(t, version) length := 1000000 buf := crypto.NewBlobBuffer(length) _, err := io.ReadFull(rnd, buf) @@ -168,7 +168,7 @@ func BenchmarkLoadBlob(b *testing.B) { } func benchmarkLoadBlob(b *testing.B, version uint) { - repo, _ := repository.TestRepositoryWithVersion(b, version) + repo, _, _ := repository.TestRepositoryWithVersion(b, version) length := 1000000 buf := crypto.NewBlobBuffer(length) _, err := io.ReadFull(rnd, buf) @@ -209,7 +209,7 @@ func BenchmarkLoadUnpacked(b *testing.B) { } func benchmarkLoadUnpacked(b *testing.B, version uint) { - repo, _ := repository.TestRepositoryWithVersion(b, version) + repo, _, _ := repository.TestRepositoryWithVersion(b, version) length := 1000000 buf := crypto.NewBlobBuffer(length) _, err := io.ReadFull(rnd, buf) @@ -217,7 +217,7 @@ func benchmarkLoadUnpacked(b *testing.B, version uint) { dataID := restic.Hash(buf) - storageID, err := repo.SaveUnpacked(context.TODO(), restic.PackFile, buf) + storageID, err := repo.SaveUnpacked(context.TODO(), restic.WriteableSnapshotFile, buf) rtest.OK(b, err) // rtest.OK(b, repo.Flush()) @@ -225,7 +225,7 @@ func benchmarkLoadUnpacked(b *testing.B, version uint) { b.SetBytes(int64(length)) for i := 0; i < b.N; i++ { - data, err := repo.LoadUnpacked(context.TODO(), restic.PackFile, storageID) + data, err := repo.LoadUnpacked(context.TODO(), restic.SnapshotFile, storageID) rtest.OK(b, err) // See comment in BenchmarkLoadBlob. @@ -262,7 +262,7 @@ func loadIndex(ctx context.Context, repo restic.LoaderUnpacked, id restic.ID) (* } func TestRepositoryLoadUnpackedBroken(t *testing.T) { - repo, be := repository.TestRepositoryWithVersion(t, 0) + repo, _, be := repository.TestRepositoryWithVersion(t, 0) data := rtest.Random(23, 12345) id := restic.Hash(data) @@ -309,43 +309,6 @@ func TestRepositoryLoadUnpackedRetryBroken(t *testing.T) { rtest.OK(t, repo.LoadIndex(context.TODO(), nil)) } -func BenchmarkLoadIndex(b *testing.B) { - repository.BenchmarkAllVersions(b, benchmarkLoadIndex) -} - -func benchmarkLoadIndex(b *testing.B, version uint) { - repository.TestUseLowSecurityKDFParameters(b) - - repo, be := repository.TestRepositoryWithVersion(b, version) - idx := index.NewIndex() - - for i := 0; i < 5000; i++ { - idx.StorePack(restic.NewRandomID(), []restic.Blob{ - { - BlobHandle: restic.NewRandomBlobHandle(), - Length: 1234, - Offset: 1235, - }, - }) - } - idx.Finalize() - - id, err := idx.SaveIndex(context.TODO(), repo) - rtest.OK(b, err) - - b.Logf("index saved as %v", id.Str()) - fi, err := be.Stat(context.TODO(), backend.Handle{Type: restic.IndexFile, Name: id.String()}) - rtest.OK(b, err) - b.Logf("filesize is %v", fi.Size) - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _, err := loadIndex(context.TODO(), repo, id) - rtest.OK(b, err) - } -} - // saveRandomDataBlobs generates random data blobs and saves them to the repository. func saveRandomDataBlobs(t testing.TB, repo restic.Repository, num int, sizeMax int) { var wg errgroup.Group @@ -368,7 +331,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) { } func testRepositoryIncrementalIndex(t *testing.T, version uint) { - repo, _ := repository.TestRepositoryWithVersion(t, version) + repo, _, _ := repository.TestRepositoryWithVersion(t, version) index.IndexFull = func(*index.Index) bool { return true } @@ -453,7 +416,7 @@ func TestListPack(t *testing.T) { } func TestNoDoubleInit(t *testing.T) { - r, be := repository.TestRepositoryWithVersion(t, restic.StableRepoVersion) + r, _, be := repository.TestRepositoryWithVersion(t, restic.StableRepoVersion) repo, err := repository.New(be, repository.Options{}) rtest.OK(t, err) diff --git a/internal/repository/testing.go b/internal/repository/testing.go index 2155cad16..a8321faad 100644 --- a/internal/repository/testing.go +++ b/internal/repository/testing.go @@ -78,30 +78,31 @@ func TestRepositoryWithBackend(t testing.TB, be backend.Backend, version uint, o // instead. The directory is not removed, but left there for inspection. func TestRepository(t testing.TB) *Repository { t.Helper() - repo, _ := TestRepositoryWithVersion(t, 0) + repo, _, _ := TestRepositoryWithVersion(t, 0) return repo } -func TestRepositoryWithVersion(t testing.TB, version uint) (*Repository, backend.Backend) { +func TestRepositoryWithVersion(t testing.TB, version uint) (*Repository, restic.Unpacked[restic.FileType], backend.Backend) { t.Helper() dir := os.Getenv("RESTIC_TEST_REPO") opts := Options{} + var repo *Repository + var be backend.Backend if dir != "" { _, err := os.Stat(dir) if err != nil { - be, err := local.Create(context.TODO(), local.Config{Path: dir}) + lbe, err := local.Create(context.TODO(), local.Config{Path: dir}) if err != nil { t.Fatalf("error creating local backend at %v: %v", dir, err) } - return TestRepositoryWithBackend(t, be, version, opts) - } - - if err == nil { + repo, be = TestRepositoryWithBackend(t, lbe, version, opts) + } else { t.Logf("directory at %v already exists, using mem backend", dir) } + } else { + repo, be = TestRepositoryWithBackend(t, nil, version, opts) } - - return TestRepositoryWithBackend(t, nil, version, opts) + return repo, &internalRepository{repo}, be } func TestFromFixture(t testing.TB, repoFixture string) (*Repository, backend.Backend, func()) { @@ -156,3 +157,8 @@ func BenchmarkAllVersions(b *testing.B, bench VersionedBenchmark) { }) } } + +func TestNewLock(t *testing.T, repo *Repository, exclusive bool) (*restic.Lock, error) { + // TODO get rid of this test helper + return restic.NewLock(context.TODO(), &internalRepository{repo}, exclusive) +} diff --git a/internal/repository/upgrade_repo.go b/internal/repository/upgrade_repo.go index ea3ae2c0e..0a91b1093 100644 --- a/internal/repository/upgrade_repo.go +++ b/internal/repository/upgrade_repo.go @@ -45,7 +45,7 @@ func upgradeRepository(ctx context.Context, repo *Repository) error { cfg := repo.Config() cfg.Version = 2 - err := restic.SaveConfig(ctx, repo, cfg) + err := restic.SaveConfig(ctx, &internalRepository{repo}, cfg) if err != nil { return fmt.Errorf("save new config file failed: %w", err) } diff --git a/internal/repository/upgrade_repo_test.go b/internal/repository/upgrade_repo_test.go index 61ca6ef95..c6bc574cf 100644 --- a/internal/repository/upgrade_repo_test.go +++ b/internal/repository/upgrade_repo_test.go @@ -13,7 +13,7 @@ import ( ) func TestUpgradeRepoV2(t *testing.T) { - repo, _ := TestRepositoryWithVersion(t, 1) + repo, _, _ := TestRepositoryWithVersion(t, 1) if repo.Config().Version != 1 { t.Fatal("test repo has wrong version") } diff --git a/internal/restic/config.go b/internal/restic/config.go index 3fb61cc13..264792e11 100644 --- a/internal/restic/config.go +++ b/internal/restic/config.go @@ -87,7 +87,7 @@ func LoadConfig(ctx context.Context, r LoaderUnpacked) (Config, error) { return cfg, nil } -func SaveConfig(ctx context.Context, r SaverUnpacked, cfg Config) error { +func SaveConfig(ctx context.Context, r SaverUnpacked[FileType], cfg Config) error { _, err := SaveJSONUnpacked(ctx, r, ConfigFile, cfg) return err } diff --git a/internal/restic/json.go b/internal/restic/json.go index 05d049b59..ec64ff153 100644 --- a/internal/restic/json.go +++ b/internal/restic/json.go @@ -21,7 +21,7 @@ func LoadJSONUnpacked(ctx context.Context, repo LoaderUnpacked, t FileType, id I // SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the // backend as type t, without a pack. It returns the storage hash. -func SaveJSONUnpacked(ctx context.Context, repo SaverUnpacked, t FileType, item interface{}) (ID, error) { +func SaveJSONUnpacked[FT FileTypes](ctx context.Context, repo SaverUnpacked[FT], t FT, item interface{}) (ID, error) { debug.Log("save new blob %v", t) plaintext, err := json.Marshal(item) if err != nil { diff --git a/internal/restic/lock.go b/internal/restic/lock.go index 8ad84091a..20fa1e20e 100644 --- a/internal/restic/lock.go +++ b/internal/restic/lock.go @@ -7,7 +7,6 @@ import ( "os/signal" "os/user" "sync" - "sync/atomic" "syscall" "testing" "time" @@ -39,7 +38,7 @@ type Lock struct { UID uint32 `json:"uid,omitempty"` GID uint32 `json:"gid,omitempty"` - repo Unpacked + repo Unpacked[FileType] lockID *ID } @@ -87,20 +86,6 @@ func IsInvalidLock(err error) bool { var ErrRemovedLock = errors.New("lock file was removed in the meantime") -// NewLock returns a new, non-exclusive lock for the repository. If an -// exclusive lock is already held by another process, it returns an error -// that satisfies IsAlreadyLocked. -func NewLock(ctx context.Context, repo Unpacked) (*Lock, error) { - return newLock(ctx, repo, false) -} - -// NewExclusiveLock returns a new, exclusive lock for the repository. If -// another lock (normal and exclusive) is already held by another process, -// it returns an error that satisfies IsAlreadyLocked. -func NewExclusiveLock(ctx context.Context, repo Unpacked) (*Lock, error) { - return newLock(ctx, repo, true) -} - var waitBeforeLockCheck = 200 * time.Millisecond // delay increases by factor 2 on each retry @@ -113,11 +98,15 @@ func TestSetLockTimeout(t testing.TB, d time.Duration) { initialWaitBetweenLockRetries = d } -func newLock(ctx context.Context, repo Unpacked, excl bool) (*Lock, error) { +// NewLock returns a new lock for the repository. If an +// exclusive lock is already held by another process, it returns an error +// that satisfies IsAlreadyLocked. If the new lock is exclude, then other +// non-exclusive locks also result in an IsAlreadyLocked error. +func NewLock(ctx context.Context, repo Unpacked[FileType], exclusive bool) (*Lock, error) { lock := &Lock{ Time: time.Now(), PID: os.Getpid(), - Exclusive: excl, + Exclusive: exclusive, repo: repo, } @@ -444,42 +433,6 @@ func LoadLock(ctx context.Context, repo LoaderUnpacked, id ID) (*Lock, error) { return lock, nil } -// RemoveStaleLocks deletes all locks detected as stale from the repository. -func RemoveStaleLocks(ctx context.Context, repo Unpacked) (uint, error) { - var processed uint - err := ForAllLocks(ctx, repo, nil, func(id ID, lock *Lock, err error) error { - if err != nil { - // ignore locks that cannot be loaded - debug.Log("ignore lock %v: %v", id, err) - return nil - } - - if lock.Stale() { - err = repo.RemoveUnpacked(ctx, LockFile, id) - if err == nil { - processed++ - } - return err - } - - return nil - }) - return processed, err -} - -// RemoveAllLocks removes all locks forcefully. -func RemoveAllLocks(ctx context.Context, repo Unpacked) (uint, error) { - var processed uint32 - err := ParallelList(ctx, repo, LockFile, repo.Connections(), func(ctx context.Context, id ID, _ int64) error { - err := repo.RemoveUnpacked(ctx, LockFile, id) - if err == nil { - atomic.AddUint32(&processed, 1) - } - return err - }) - return uint(processed), err -} - // ForAllLocks reads all locks in parallel and calls the given callback. // It is guaranteed that the function is not run concurrently. If the // callback returns an error, this function is cancelled and also returns that error. diff --git a/internal/restic/lock_test.go b/internal/restic/lock_test.go index 606ed210d..67d2b9a46 100644 --- a/internal/restic/lock_test.go +++ b/internal/restic/lock_test.go @@ -19,7 +19,7 @@ func TestLock(t *testing.T) { repo := repository.TestRepository(t) restic.TestSetLockTimeout(t, 5*time.Millisecond) - lock, err := restic.NewLock(context.TODO(), repo) + lock, err := repository.TestNewLock(t, repo, false) rtest.OK(t, err) rtest.OK(t, lock.Unlock(context.TODO())) @@ -29,7 +29,7 @@ func TestDoubleUnlock(t *testing.T) { repo := repository.TestRepository(t) restic.TestSetLockTimeout(t, 5*time.Millisecond) - lock, err := restic.NewLock(context.TODO(), repo) + lock, err := repository.TestNewLock(t, repo, false) rtest.OK(t, err) rtest.OK(t, lock.Unlock(context.TODO())) @@ -43,10 +43,10 @@ func TestMultipleLock(t *testing.T) { repo := repository.TestRepository(t) restic.TestSetLockTimeout(t, 5*time.Millisecond) - lock1, err := restic.NewLock(context.TODO(), repo) + lock1, err := repository.TestNewLock(t, repo, false) rtest.OK(t, err) - lock2, err := restic.NewLock(context.TODO(), repo) + lock2, err := repository.TestNewLock(t, repo, false) rtest.OK(t, err) rtest.OK(t, lock1.Unlock(context.TODO())) @@ -69,10 +69,10 @@ func TestMultipleLockFailure(t *testing.T) { repo, _ := repository.TestRepositoryWithBackend(t, be, 0, repository.Options{}) restic.TestSetLockTimeout(t, 5*time.Millisecond) - lock1, err := restic.NewLock(context.TODO(), repo) + lock1, err := repository.TestNewLock(t, repo, false) rtest.OK(t, err) - _, err = restic.NewLock(context.TODO(), repo) + _, err = repository.TestNewLock(t, repo, false) rtest.Assert(t, err != nil, "unreadable lock file did not result in an error") rtest.OK(t, lock1.Unlock(context.TODO())) @@ -81,7 +81,7 @@ func TestMultipleLockFailure(t *testing.T) { func TestLockExclusive(t *testing.T) { repo := repository.TestRepository(t) - elock, err := restic.NewExclusiveLock(context.TODO(), repo) + elock, err := repository.TestNewLock(t, repo, true) rtest.OK(t, err) rtest.OK(t, elock.Unlock(context.TODO())) } @@ -90,10 +90,10 @@ func TestLockOnExclusiveLockedRepo(t *testing.T) { repo := repository.TestRepository(t) restic.TestSetLockTimeout(t, 5*time.Millisecond) - elock, err := restic.NewExclusiveLock(context.TODO(), repo) + elock, err := repository.TestNewLock(t, repo, true) rtest.OK(t, err) - lock, err := restic.NewLock(context.TODO(), repo) + lock, err := repository.TestNewLock(t, repo, false) rtest.Assert(t, err != nil, "create normal lock with exclusively locked repo didn't return an error") rtest.Assert(t, restic.IsAlreadyLocked(err), @@ -107,10 +107,10 @@ func TestExclusiveLockOnLockedRepo(t *testing.T) { repo := repository.TestRepository(t) restic.TestSetLockTimeout(t, 5*time.Millisecond) - elock, err := restic.NewLock(context.TODO(), repo) + elock, err := repository.TestNewLock(t, repo, false) rtest.OK(t, err) - lock, err := restic.NewExclusiveLock(context.TODO(), repo) + lock, err := repository.TestNewLock(t, repo, true) rtest.Assert(t, err != nil, "create normal lock with exclusively locked repo didn't return an error") rtest.Assert(t, restic.IsAlreadyLocked(err), @@ -120,20 +120,6 @@ func TestExclusiveLockOnLockedRepo(t *testing.T) { rtest.OK(t, elock.Unlock(context.TODO())) } -func createFakeLock(repo restic.SaverUnpacked, t time.Time, pid int) (restic.ID, error) { - hostname, err := os.Hostname() - if err != nil { - return restic.ID{}, err - } - - newLock := &restic.Lock{Time: t, PID: pid, Hostname: hostname} - return restic.SaveJSONUnpacked(context.TODO(), repo, restic.LockFile, &newLock) -} - -func removeLock(repo restic.RemoverUnpacked, id restic.ID) error { - return repo.RemoveUnpacked(context.TODO(), restic.LockFile, id) -} - var staleLockTests = []struct { timestamp time.Time stale bool @@ -190,72 +176,6 @@ func TestLockStale(t *testing.T) { } } -func lockExists(repo restic.Lister, t testing.TB, lockID restic.ID) bool { - var exists bool - rtest.OK(t, repo.List(context.TODO(), restic.LockFile, func(id restic.ID, size int64) error { - if id == lockID { - exists = true - } - return nil - })) - - return exists -} - -func TestLockWithStaleLock(t *testing.T) { - repo := repository.TestRepository(t) - - id1, err := createFakeLock(repo, time.Now().Add(-time.Hour), os.Getpid()) - rtest.OK(t, err) - - id2, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid()) - rtest.OK(t, err) - - id3, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid()+500000) - rtest.OK(t, err) - - processed, err := restic.RemoveStaleLocks(context.TODO(), repo) - rtest.OK(t, err) - - rtest.Assert(t, lockExists(repo, t, id1) == false, - "stale lock still exists after RemoveStaleLocks was called") - rtest.Assert(t, lockExists(repo, t, id2) == true, - "non-stale lock was removed by RemoveStaleLocks") - rtest.Assert(t, lockExists(repo, t, id3) == false, - "stale lock still exists after RemoveStaleLocks was called") - rtest.Assert(t, processed == 2, - "number of locks removed does not match: expected %d, got %d", - 2, processed) - - rtest.OK(t, removeLock(repo, id2)) -} - -func TestRemoveAllLocks(t *testing.T) { - repo := repository.TestRepository(t) - - id1, err := createFakeLock(repo, time.Now().Add(-time.Hour), os.Getpid()) - rtest.OK(t, err) - - id2, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid()) - rtest.OK(t, err) - - id3, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid()+500000) - rtest.OK(t, err) - - processed, err := restic.RemoveAllLocks(context.TODO(), repo) - rtest.OK(t, err) - - rtest.Assert(t, lockExists(repo, t, id1) == false, - "lock still exists after RemoveAllLocks was called") - rtest.Assert(t, lockExists(repo, t, id2) == false, - "lock still exists after RemoveAllLocks was called") - rtest.Assert(t, lockExists(repo, t, id3) == false, - "lock still exists after RemoveAllLocks was called") - rtest.Assert(t, processed == 3, - "number of locks removed does not match: expected %d, got %d", - 3, processed) -} - func checkSingleLock(t *testing.T, repo restic.Lister) restic.ID { t.Helper() var lockID *restic.ID @@ -279,7 +199,7 @@ func testLockRefresh(t *testing.T, refresh func(lock *restic.Lock) error) { repo := repository.TestRepository(t) restic.TestSetLockTimeout(t, 5*time.Millisecond) - lock, err := restic.NewLock(context.TODO(), repo) + lock, err := repository.TestNewLock(t, repo, false) rtest.OK(t, err) time0 := lock.Time @@ -312,10 +232,10 @@ func TestLockRefreshStale(t *testing.T) { } func TestLockRefreshStaleMissing(t *testing.T) { - repo, be := repository.TestRepositoryWithVersion(t, 0) + repo, _, be := repository.TestRepositoryWithVersion(t, 0) restic.TestSetLockTimeout(t, 5*time.Millisecond) - lock, err := restic.NewLock(context.TODO(), repo) + lock, err := repository.TestNewLock(t, repo, false) rtest.OK(t, err) lockID := checkSingleLock(t, repo) diff --git a/internal/restic/parallel.go b/internal/restic/parallel.go index 0c2215325..1c56f6848 100644 --- a/internal/restic/parallel.go +++ b/internal/restic/parallel.go @@ -54,7 +54,7 @@ func ParallelList(ctx context.Context, r Lister, t FileType, parallelism uint, f // ParallelRemove deletes the given fileList of fileType in parallel // if callback returns an error, then it will abort. -func ParallelRemove(ctx context.Context, repo RemoverUnpacked, fileList IDSet, fileType FileType, report func(id ID, err error) error, bar *progress.Counter) error { +func ParallelRemove[FT FileTypes](ctx context.Context, repo RemoverUnpacked[FT], fileList IDSet, fileType FT, report func(id ID, err error) error, bar *progress.Counter) error { fileChan := make(chan ID) wg, ctx := errgroup.WithContext(ctx) wg.Go(func() error { diff --git a/internal/restic/repository.go b/internal/restic/repository.go index b18b036a7..07ef9cbc0 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -57,14 +57,16 @@ type Repository interface { LoadRaw(ctx context.Context, t FileType, id ID) (data []byte, err error) // LoadUnpacked loads and decrypts the file with the given type and ID. LoadUnpacked(ctx context.Context, t FileType, id ID) (data []byte, err error) - SaveUnpacked(ctx context.Context, t FileType, buf []byte) (ID, error) + SaveUnpacked(ctx context.Context, t WriteableFileType, buf []byte) (ID, error) // RemoveUnpacked removes a file from the repository. This will eventually be restricted to deleting only snapshots. - RemoveUnpacked(ctx context.Context, t FileType, id ID) error + RemoveUnpacked(ctx context.Context, t WriteableFileType, id ID) error } type FileType = backend.FileType -// These are the different data types a backend can store. +// These are the different data types a backend can store. Only filetypes contained +// in the `WriteableFileType` subset can be modified via the Repository interface. +// All other filetypes are considered internal datastructures of the Repository. const ( PackFile FileType = backend.PackFile KeyFile FileType = backend.KeyFile @@ -74,6 +76,26 @@ const ( ConfigFile FileType = backend.ConfigFile ) +type WriteableFileType backend.FileType + +// These are the different data types that can be modified via SaveUnpacked or RemoveUnpacked. +const ( + WriteableSnapshotFile WriteableFileType = WriteableFileType(SnapshotFile) +) + +func (w *WriteableFileType) ToFileType() FileType { + switch *w { + case WriteableSnapshotFile: + return SnapshotFile + default: + panic("invalid WriteableFileType") + } +} + +type FileTypes interface { + FileType | WriteableFileType +} + // LoaderUnpacked allows loading a blob not stored in a pack file type LoaderUnpacked interface { // Connections returns the maximum number of concurrent backend operations @@ -82,22 +104,22 @@ type LoaderUnpacked interface { } // SaverUnpacked allows saving a blob not stored in a pack file -type SaverUnpacked interface { +type SaverUnpacked[FT FileTypes] interface { // Connections returns the maximum number of concurrent backend operations Connections() uint - SaveUnpacked(ctx context.Context, t FileType, buf []byte) (ID, error) + SaveUnpacked(ctx context.Context, t FT, buf []byte) (ID, error) } // RemoverUnpacked allows removing an unpacked blob -type RemoverUnpacked interface { +type RemoverUnpacked[FT FileTypes] interface { // Connections returns the maximum number of concurrent backend operations Connections() uint - RemoveUnpacked(ctx context.Context, t FileType, id ID) error + RemoveUnpacked(ctx context.Context, t FT, id ID) error } -type SaverRemoverUnpacked interface { - SaverUnpacked - RemoverUnpacked +type SaverRemoverUnpacked[FT FileTypes] interface { + SaverUnpacked[FT] + RemoverUnpacked[FT] } type PackBlobs struct { @@ -126,10 +148,10 @@ type ListerLoaderUnpacked interface { LoaderUnpacked } -type Unpacked interface { +type Unpacked[FT FileTypes] interface { ListerLoaderUnpacked - SaverUnpacked - RemoverUnpacked + SaverUnpacked[FT] + RemoverUnpacked[FT] } type ListBlobser interface { diff --git a/internal/restic/snapshot.go b/internal/restic/snapshot.go index 39ed80627..f9cdf4daf 100644 --- a/internal/restic/snapshot.go +++ b/internal/restic/snapshot.go @@ -90,8 +90,8 @@ func LoadSnapshot(ctx context.Context, loader LoaderUnpacked, id ID) (*Snapshot, } // SaveSnapshot saves the snapshot sn and returns its ID. -func SaveSnapshot(ctx context.Context, repo SaverUnpacked, sn *Snapshot) (ID, error) { - return SaveJSONUnpacked(ctx, repo, SnapshotFile, sn) +func SaveSnapshot(ctx context.Context, repo SaverUnpacked[WriteableFileType], sn *Snapshot) (ID, error) { + return SaveJSONUnpacked(ctx, repo, WriteableSnapshotFile, sn) } // ForAllSnapshots reads all snapshots in parallel and calls the diff --git a/internal/restic/snapshot_test.go b/internal/restic/snapshot_test.go index 9099c8b5f..68016287a 100644 --- a/internal/restic/snapshot_test.go +++ b/internal/restic/snapshot_test.go @@ -32,7 +32,7 @@ func TestLoadJSONUnpacked(t *testing.T) { } func testLoadJSONUnpacked(t *testing.T, version uint) { - repo, _ := repository.TestRepositoryWithVersion(t, version) + repo, _, _ := repository.TestRepositoryWithVersion(t, version) // archive a snapshot sn := restic.Snapshot{} diff --git a/internal/restic/tree_test.go b/internal/restic/tree_test.go index 07ca254f1..5c9c0739c 100644 --- a/internal/restic/tree_test.go +++ b/internal/restic/tree_test.go @@ -184,7 +184,7 @@ func testLoadTree(t *testing.T, version uint) { } // archive a few files - repo, _ := repository.TestRepositoryWithVersion(t, version) + repo, _, _ := repository.TestRepositoryWithVersion(t, version) sn := archiver.TestSnapshot(t, repo, rtest.BenchArchiveDirectory, nil) rtest.OK(t, repo.Flush(context.Background())) @@ -202,7 +202,7 @@ func benchmarkLoadTree(t *testing.B, version uint) { } // archive a few files - repo, _ := repository.TestRepositoryWithVersion(t, version) + repo, _, _ := repository.TestRepositoryWithVersion(t, version) sn := archiver.TestSnapshot(t, repo, rtest.BenchArchiveDirectory, nil) rtest.OK(t, repo.Flush(context.Background()))