Merge pull request #5168 from MichaelEischer/restrict-repository-unpacked

repository: restrict SaveUnpacked and RemoveUnpacked
Michael Eischer 2025-01-13 22:49:44 +01:00 committed by GitHub
commit e6f9cfb8c8
42 changed files with 374 additions and 311 deletions


@ -304,7 +304,7 @@ func (opts BackupOptions) Check(gopts GlobalOptions, args []string) error {
// from being saved in a snapshot based on path only
func collectRejectByNameFuncs(opts BackupOptions, repo *repository.Repository) (fs []archiver.RejectByNameFunc, err error) {
// exclude restic cache
if repo.Cache != nil {
if repo.Cache() != nil {
f, err := rejectResticCache(repo)
if err != nil {
return nil, err


@ -304,7 +304,7 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption
if len(removeSnIDs) > 0 {
if !opts.DryRun {
bar := printer.NewCounter("files deleted")
err := restic.ParallelRemove(ctx, repo, removeSnIDs, restic.SnapshotFile, func(id restic.ID, err error) error {
err := restic.ParallelRemove(ctx, repo, removeSnIDs, restic.WriteableSnapshotFile, func(id restic.ID, err error) error {
if err != nil {
printer.E("unable to remove %v/%v from the repository\n", restic.SnapshotFile, id)
} else {


@ -171,7 +171,7 @@ func runPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, term
}
func runPruneWithRepo(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo *repository.Repository, ignoreSnapshots restic.IDSet, term *termstatus.Terminal) error {
if repo.Cache == nil {
if repo.Cache() == nil {
Print("warning: running prune without a cache, this may be very slow!\n")
}


@ -168,7 +168,7 @@ func runRecover(ctx context.Context, gopts GlobalOptions) error {
}
func createSnapshot(ctx context.Context, name, hostname string, tags []string, repo restic.SaverUnpacked, tree *restic.ID) error {
func createSnapshot(ctx context.Context, name, hostname string, tags []string, repo restic.SaverUnpacked[restic.WriteableFileType], tree *restic.ID) error {
sn, err := restic.NewSnapshot([]string{name}, tags, hostname, time.Now())
if err != nil {
return errors.Fatalf("unable to save snapshot: %v", err)


@ -194,7 +194,7 @@ func filterAndReplaceSnapshot(ctx context.Context, repo restic.Repository, sn *r
if dryRun {
Verbosef("would delete empty snapshot\n")
} else {
if err = repo.RemoveUnpacked(ctx, restic.SnapshotFile, *sn.ID()); err != nil {
if err = repo.RemoveUnpacked(ctx, restic.WriteableSnapshotFile, *sn.ID()); err != nil {
return false, err
}
debug.Log("removed empty snapshot %v", sn.ID())
@ -253,7 +253,7 @@ func filterAndReplaceSnapshot(ctx context.Context, repo restic.Repository, sn *r
Verbosef("saved new snapshot %v\n", id.Str())
if forget {
if err = repo.RemoveUnpacked(ctx, restic.SnapshotFile, *sn.ID()); err != nil {
if err = repo.RemoveUnpacked(ctx, restic.WriteableSnapshotFile, *sn.ID()); err != nil {
return false, err
}
debug.Log("removed old snapshot %v", sn.ID())


@ -90,7 +90,7 @@ func changeTags(ctx context.Context, repo *repository.Repository, sn *restic.Sna
debug.Log("new snapshot saved as %v", id)
// Remove the old snapshot.
if err = repo.RemoveUnpacked(ctx, restic.SnapshotFile, *sn.ID()); err != nil {
if err = repo.RemoveUnpacked(ctx, restic.WriteableSnapshotFile, *sn.ID()); err != nil {
return false, err
}


@ -3,7 +3,7 @@ package main
import (
"context"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/repository"
"github.com/spf13/cobra"
)
@ -45,9 +45,9 @@ func runUnlock(ctx context.Context, opts UnlockOptions, gopts GlobalOptions) err
return err
}
fn := restic.RemoveStaleLocks
fn := repository.RemoveStaleLocks
if opts.RemoveAll {
fn = restic.RemoveAllLocks
fn = repository.RemoveAllLocks
}
processed, err := fn(ctx, repo)


@ -11,12 +11,12 @@ import (
// rejectResticCache returns a RejectByNameFunc that rejects the restic cache
// directory (if set).
func rejectResticCache(repo *repository.Repository) (archiver.RejectByNameFunc, error) {
if repo.Cache == nil {
if repo.Cache() == nil {
return func(string) bool {
return false
}, nil
}
cacheBase := repo.Cache.BaseDir()
cacheBase := repo.Cache().BaseDir()
if cacheBase == "" {
return nil, errors.New("cacheBase is empty string")


@ -275,17 +275,30 @@ func listTreePacks(gopts GlobalOptions, t *testing.T) restic.IDSet {
return treePacks
}
func captureBackend(gopts *GlobalOptions) func() backend.Backend {
var be backend.Backend
gopts.backendTestHook = func(r backend.Backend) (backend.Backend, error) {
be = r
return r, nil
}
return func() backend.Backend {
return be
}
}
func removePacks(gopts GlobalOptions, t testing.TB, remove restic.IDSet) {
ctx, r, unlock, err := openWithExclusiveLock(context.TODO(), gopts, false)
be := captureBackend(&gopts)
ctx, _, unlock, err := openWithExclusiveLock(context.TODO(), gopts, false)
rtest.OK(t, err)
defer unlock()
for id := range remove {
rtest.OK(t, r.RemoveUnpacked(ctx, restic.PackFile, id))
rtest.OK(t, be().Remove(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()}))
}
}
func removePacksExcept(gopts GlobalOptions, t testing.TB, keep restic.IDSet, removeTreePacks bool) {
be := captureBackend(&gopts)
ctx, r, unlock, err := openWithExclusiveLock(context.TODO(), gopts, false)
rtest.OK(t, err)
defer unlock()
@ -305,7 +318,7 @@ func removePacksExcept(gopts GlobalOptions, t testing.TB, keep restic.IDSet, rem
if treePacks.Has(id) != removeTreePacks || keep.Has(id) {
return nil
}
return r.RemoveUnpacked(ctx, restic.PackFile, id)
return be().Remove(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()})
}))
}


@ -74,7 +74,7 @@ type ToNoder interface {
type archiverRepo interface {
restic.Loader
restic.BlobSaver
restic.SaverUnpacked
restic.SaverUnpacked[restic.WriteableFileType]
Config() restic.Config
StartPackUploader(ctx context.Context, wg *errgroup.Group)


@ -145,11 +145,11 @@ func TestUnreferencedPack(t *testing.T) {
}
func TestUnreferencedBlobs(t *testing.T) {
repo, _, cleanup := repository.TestFromFixture(t, checkerTestData)
repo, be, cleanup := repository.TestFromFixture(t, checkerTestData)
defer cleanup()
snapshotID := restic.TestParseID("51d249d28815200d59e4be7b3f21a157b864dc343353df9d8e498220c2499b02")
test.OK(t, repo.RemoveUnpacked(context.TODO(), restic.SnapshotFile, snapshotID))
test.OK(t, be.Remove(context.TODO(), backend.Handle{Type: restic.SnapshotFile, Name: snapshotID.String()}))
unusedBlobsBySnapshot := restic.BlobHandles{
restic.TestParseHandle("58c748bbe2929fdf30c73262bd8313fe828f8925b05d1d4a87fe109082acb849", restic.DataBlob),
@ -334,7 +334,7 @@ func (b *errorOnceBackend) Load(ctx context.Context, h backend.Handle, length in
}
func TestCheckerModifiedData(t *testing.T) {
repo, be := repository.TestRepositoryWithVersion(t, 0)
repo, _, be := repository.TestRepositoryWithVersion(t, 0)
sn := archiver.TestSnapshot(t, repo, ".", nil)
t.Logf("archived as %v", sn.ID().Str())


@ -8,7 +8,7 @@ import (
)
func TestUpgradeRepoV2(t *testing.T) {
repo, _ := repository.TestRepositoryWithVersion(t, 1)
repo, _, _ := repository.TestRepositoryWithVersion(t, 1)
if repo.Config().Version != 1 {
t.Fatal("test repo has wrong version")
}


@ -40,9 +40,9 @@ func (e *partialReadError) Error() string {
func CheckPack(ctx context.Context, r *Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error {
err := checkPackInner(ctx, r, id, blobs, size, bufRd, dec)
if err != nil {
if r.Cache != nil {
if r.cache != nil {
// ignore error as there's not much we can do here
_ = r.Cache.Forget(backend.Handle{Type: restic.PackFile, Name: id.String()})
_ = r.cache.Forget(backend.Handle{Type: restic.PackFile, Name: id.String()})
}
// retry pack verification to detect transient errors


@ -18,7 +18,7 @@ func FuzzSaveLoadBlob(f *testing.F) {
}
id := restic.Hash(blob)
repo, _ := TestRepositoryWithVersion(t, 2)
repo, _, _ := TestRepositoryWithVersion(t, 2)
var wg errgroup.Group
repo.StartPackUploader(context.TODO(), &wg)


@ -351,7 +351,7 @@ func (idx *Index) Encode(w io.Writer) error {
}
// SaveIndex saves an index in the repository.
func (idx *Index) SaveIndex(ctx context.Context, repo restic.SaverUnpacked) (restic.ID, error) {
func (idx *Index) SaveIndex(ctx context.Context, repo restic.SaverUnpacked[restic.FileType]) (restic.ID, error) {
buf := bytes.NewBuffer(nil)
err := idx.Encode(buf)


@ -321,7 +321,7 @@ type MasterIndexRewriteOpts struct {
// This is used by repair index to only rewrite and delete the old indexes.
//
// Must not be called concurrently to any other MasterIndex operation.
func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, excludePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, opts MasterIndexRewriteOpts) error {
func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked[restic.FileType], excludePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, opts MasterIndexRewriteOpts) error {
for _, idx := range mi.idx {
if !idx.Final() {
panic("internal error - index must be saved before calling MasterIndex.Rewrite")
@ -499,7 +499,7 @@ func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, exclud
// It is only intended for use by prune with the UnsafeRecovery option.
//
// Must not be called concurrently to any other MasterIndex operation.
func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, p *progress.Counter) error {
func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemoverUnpacked[restic.FileType], excludePacks restic.IDSet, p *progress.Counter) error {
p.SetMax(uint64(len(mi.Packs(excludePacks))))
mi.idxMutex.Lock()
@ -574,7 +574,7 @@ func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemove
}
// saveIndex saves all indexes in the backend.
func (mi *MasterIndex) saveIndex(ctx context.Context, r restic.SaverUnpacked, indexes ...*Index) error {
func (mi *MasterIndex) saveIndex(ctx context.Context, r restic.SaverUnpacked[restic.FileType], indexes ...*Index) error {
for i, idx := range indexes {
debug.Log("Saving index %d", i)
@ -590,12 +590,12 @@ func (mi *MasterIndex) saveIndex(ctx context.Context, r restic.SaverUnpacked, in
}
// SaveIndex saves all new indexes in the backend.
func (mi *MasterIndex) SaveIndex(ctx context.Context, r restic.SaverUnpacked) error {
func (mi *MasterIndex) SaveIndex(ctx context.Context, r restic.SaverUnpacked[restic.FileType]) error {
return mi.saveIndex(ctx, r, mi.finalizeNotFinalIndexes()...)
}
// SaveFullIndex saves all full indexes in the backend.
func (mi *MasterIndex) SaveFullIndex(ctx context.Context, r restic.SaverUnpacked) error {
func (mi *MasterIndex) SaveFullIndex(ctx context.Context, r restic.SaverUnpacked[restic.FileType]) error {
return mi.saveIndex(ctx, r, mi.finalizeFullIndexes()...)
}


@ -346,13 +346,13 @@ var (
depth = 3
)
func createFilledRepo(t testing.TB, snapshots int, version uint) restic.Repository {
repo, _ := repository.TestRepositoryWithVersion(t, version)
func createFilledRepo(t testing.TB, snapshots int, version uint) (restic.Repository, restic.Unpacked[restic.FileType]) {
repo, unpacked, _ := repository.TestRepositoryWithVersion(t, version)
for i := 0; i < snapshots; i++ {
restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(i)*time.Second), depth)
}
return repo
return repo, unpacked
}
func TestIndexSave(t *testing.T) {
@ -362,15 +362,15 @@ func TestIndexSave(t *testing.T) {
func testIndexSave(t *testing.T, version uint) {
for _, test := range []struct {
name string
saver func(idx *index.MasterIndex, repo restic.Repository) error
saver func(idx *index.MasterIndex, repo restic.Unpacked[restic.FileType]) error
}{
{"rewrite no-op", func(idx *index.MasterIndex, repo restic.Repository) error {
{"rewrite no-op", func(idx *index.MasterIndex, repo restic.Unpacked[restic.FileType]) error {
return idx.Rewrite(context.TODO(), repo, nil, nil, nil, index.MasterIndexRewriteOpts{})
}},
{"rewrite skip-all", func(idx *index.MasterIndex, repo restic.Repository) error {
{"rewrite skip-all", func(idx *index.MasterIndex, repo restic.Unpacked[restic.FileType]) error {
return idx.Rewrite(context.TODO(), repo, nil, restic.NewIDSet(), nil, index.MasterIndexRewriteOpts{})
}},
{"SaveFallback", func(idx *index.MasterIndex, repo restic.Repository) error {
{"SaveFallback", func(idx *index.MasterIndex, repo restic.Unpacked[restic.FileType]) error {
err := restic.ParallelRemove(context.TODO(), repo, idx.IDs(), restic.IndexFile, nil, nil)
if err != nil {
return nil
@ -379,7 +379,7 @@ func testIndexSave(t *testing.T, version uint) {
}},
} {
t.Run(test.name, func(t *testing.T) {
repo := createFilledRepo(t, 3, version)
repo, unpacked := createFilledRepo(t, 3, version)
idx := index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
@ -388,7 +388,7 @@ func testIndexSave(t *testing.T, version uint) {
blobs[pb] = struct{}{}
}))
rtest.OK(t, test.saver(idx, repo))
rtest.OK(t, test.saver(idx, unpacked))
idx = index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
@ -411,7 +411,7 @@ func TestIndexSavePartial(t *testing.T) {
}
func testIndexSavePartial(t *testing.T, version uint) {
repo := createFilledRepo(t, 3, version)
repo, unpacked := createFilledRepo(t, 3, version)
// capture blob list before adding fourth snapshot
idx := index.NewMasterIndex()
@ -424,14 +424,14 @@ func testIndexSavePartial(t *testing.T, version uint) {
// add+remove new snapshot and track its pack files
packsBefore := listPacks(t, repo)
sn := restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(4)*time.Second), depth)
rtest.OK(t, repo.RemoveUnpacked(context.TODO(), restic.SnapshotFile, *sn.ID()))
rtest.OK(t, repo.RemoveUnpacked(context.TODO(), restic.WriteableSnapshotFile, *sn.ID()))
packsAfter := listPacks(t, repo)
newPacks := packsAfter.Sub(packsBefore)
// rewrite index and remove pack files of new snapshot
idx = index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
rtest.OK(t, idx.Rewrite(context.TODO(), repo, newPacks, nil, nil, index.MasterIndexRewriteOpts{}))
rtest.OK(t, idx.Rewrite(context.TODO(), unpacked, newPacks, nil, nil, index.MasterIndexRewriteOpts{}))
// check blobs
idx = index.NewMasterIndex()
@ -446,7 +446,7 @@ func testIndexSavePartial(t *testing.T, version uint) {
rtest.Equals(t, 0, len(blobs), "saved index is missing blobs")
// remove pack files to make check happy
rtest.OK(t, restic.ParallelRemove(context.TODO(), repo, newPacks, restic.PackFile, nil, nil))
rtest.OK(t, restic.ParallelRemove(context.TODO(), unpacked, newPacks, restic.PackFile, nil, nil))
checker.TestCheckRepo(t, repo, false)
}


@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/restic/restic/internal/backend"
@ -42,13 +43,7 @@ func Lock(ctx context.Context, repo *Repository, exclusive bool, retryLock time.
// Lock wraps the ctx such that it is cancelled when the repository is unlocked
// cancelling the original context also stops the lock refresh
func (l *locker) Lock(ctx context.Context, repo *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
lockFn := restic.NewLock
if exclusive {
lockFn = restic.NewExclusiveLock
}
func (l *locker) Lock(ctx context.Context, r *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
var lock *restic.Lock
var err error
@ -56,9 +51,11 @@ func (l *locker) Lock(ctx context.Context, repo *Repository, exclusive bool, ret
retryMessagePrinted := false
retryTimeout := time.After(retryLock)
repo := &internalRepository{r}
retryLoop:
for {
lock, err = lockFn(ctx, repo)
lock, err = restic.NewLock(ctx, repo, exclusive)
if err != nil && restic.IsAlreadyLocked(err) {
if !retryMessagePrinted {
@ -75,7 +72,7 @@ retryLoop:
case <-retryTimeout:
debug.Log("repo already locked, timeout expired")
// Last lock attempt
lock, err = lockFn(ctx, repo)
lock, err = restic.NewLock(ctx, repo, exclusive)
break retryLoop
case <-retrySleepCh:
retrySleep = minDuration(retrySleep*2, l.retrySleepMax)
@ -272,3 +269,39 @@ func (l *Unlocker) Unlock() {
l.info.cancel()
l.info.refreshWG.Wait()
}
// RemoveStaleLocks deletes all locks detected as stale from the repository.
func RemoveStaleLocks(ctx context.Context, repo *Repository) (uint, error) {
var processed uint
err := restic.ForAllLocks(ctx, repo, nil, func(id restic.ID, lock *restic.Lock, err error) error {
if err != nil {
// ignore locks that cannot be loaded
debug.Log("ignore lock %v: %v", id, err)
return nil
}
if lock.Stale() {
err = (&internalRepository{repo}).RemoveUnpacked(ctx, restic.LockFile, id)
if err == nil {
processed++
}
return err
}
return nil
})
return processed, err
}
// RemoveAllLocks removes all locks forcefully.
func RemoveAllLocks(ctx context.Context, repo *Repository) (uint, error) {
var processed uint32
err := restic.ParallelList(ctx, repo, restic.LockFile, repo.Connections(), func(ctx context.Context, id restic.ID, _ int64) error {
err := (&internalRepository{repo}).RemoveUnpacked(ctx, restic.LockFile, id)
if err == nil {
atomic.AddUint32(&processed, 1)
}
return err
})
return uint(processed), err
}


@ -3,6 +3,7 @@ package repository
import (
"context"
"fmt"
"os"
"runtime"
"strings"
"sync"
@ -301,3 +302,83 @@ func TestLockWaitSuccess(t *testing.T) {
rtest.OK(t, err)
lock.Unlock()
}
func createFakeLock(repo *Repository, t time.Time, pid int) (restic.ID, error) {
hostname, err := os.Hostname()
if err != nil {
return restic.ID{}, err
}
newLock := &restic.Lock{Time: t, PID: pid, Hostname: hostname}
return restic.SaveJSONUnpacked(context.TODO(), &internalRepository{repo}, restic.LockFile, &newLock)
}
func lockExists(repo restic.Lister, t testing.TB, lockID restic.ID) bool {
var exists bool
rtest.OK(t, repo.List(context.TODO(), restic.LockFile, func(id restic.ID, size int64) error {
if id == lockID {
exists = true
}
return nil
}))
return exists
}
func removeLock(repo *Repository, id restic.ID) error {
return (&internalRepository{repo}).RemoveUnpacked(context.TODO(), restic.LockFile, id)
}
func TestLockWithStaleLock(t *testing.T) {
repo := TestRepository(t)
id1, err := createFakeLock(repo, time.Now().Add(-time.Hour), os.Getpid())
rtest.OK(t, err)
id2, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid())
rtest.OK(t, err)
id3, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid()+500000)
rtest.OK(t, err)
processed, err := RemoveStaleLocks(context.TODO(), repo)
rtest.OK(t, err)
rtest.Assert(t, lockExists(repo, t, id1) == false,
"stale lock still exists after RemoveStaleLocks was called")
rtest.Assert(t, lockExists(repo, t, id2) == true,
"non-stale lock was removed by RemoveStaleLocks")
rtest.Assert(t, lockExists(repo, t, id3) == false,
"stale lock still exists after RemoveStaleLocks was called")
rtest.Assert(t, processed == 2,
"number of locks removed does not match: expected %d, got %d",
2, processed)
rtest.OK(t, removeLock(repo, id2))
}
func TestRemoveAllLocks(t *testing.T) {
repo := TestRepository(t)
id1, err := createFakeLock(repo, time.Now().Add(-time.Hour), os.Getpid())
rtest.OK(t, err)
id2, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid())
rtest.OK(t, err)
id3, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid()+500000)
rtest.OK(t, err)
processed, err := RemoveAllLocks(context.TODO(), repo)
rtest.OK(t, err)
rtest.Assert(t, lockExists(repo, t, id1) == false,
"lock still exists after RemoveAllLocks was called")
rtest.Assert(t, lockExists(repo, t, id2) == false,
"lock still exists after RemoveAllLocks was called")
rtest.Assert(t, lockExists(repo, t, id3) == false,
"lock still exists after RemoveAllLocks was called")
rtest.Assert(t, processed == 3,
"number of locks removed does not match: expected %d, got %d",
3, processed)
}


@ -190,5 +190,5 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *packe
r.idx.StorePack(id, p.Packer.Blobs())
// Save index if full
return r.idx.SaveFullIndex(ctx, r)
return r.idx.SaveFullIndex(ctx, &internalRepository{r})
}


@ -544,7 +544,7 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) er
// unreferenced packs can be safely deleted first
if len(plan.removePacksFirst) != 0 {
printer.P("deleting unreferenced packs\n")
_ = deleteFiles(ctx, true, repo, plan.removePacksFirst, restic.PackFile, printer)
_ = deleteFiles(ctx, true, &internalRepository{repo}, plan.removePacksFirst, restic.PackFile, printer)
// forget unused data
plan.removePacksFirst = nil
}
@ -588,7 +588,7 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) er
if plan.opts.UnsafeRecovery {
printer.P("deleting index files\n")
indexFiles := repo.idx.IDs()
err := deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer)
err := deleteFiles(ctx, false, &internalRepository{repo}, indexFiles, restic.IndexFile, printer)
if err != nil {
return errors.Fatalf("%s", err)
}
@ -601,14 +601,14 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) er
if len(plan.removePacks) != 0 {
printer.P("removing %d old packs\n", len(plan.removePacks))
_ = deleteFiles(ctx, true, repo, plan.removePacks, restic.PackFile, printer)
_ = deleteFiles(ctx, true, &internalRepository{repo}, plan.removePacks, restic.PackFile, printer)
}
if ctx.Err() != nil {
return ctx.Err()
}
if plan.opts.UnsafeRecovery {
err := repo.idx.SaveFallback(ctx, repo, plan.ignorePacks, printer.NewCounter("packs processed"))
err := repo.idx.SaveFallback(ctx, &internalRepository{repo}, plan.ignorePacks, printer.NewCounter("packs processed"))
if err != nil {
return errors.Fatalf("%s", err)
}
@ -623,7 +623,7 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) er
// deleteFiles deletes the given fileList of fileType in parallel
// if ignoreError=true, it will print a warning if there was an error, else it will abort.
func deleteFiles(ctx context.Context, ignoreError bool, repo restic.RemoverUnpacked, fileList restic.IDSet, fileType restic.FileType, printer progress.Printer) error {
func deleteFiles(ctx context.Context, ignoreError bool, repo restic.RemoverUnpacked[restic.FileType], fileList restic.IDSet, fileType restic.FileType, printer progress.Printer) error {
bar := printer.NewCounter("files deleted")
defer bar.Done()


@ -20,7 +20,7 @@ func testPrune(t *testing.T, opts repository.PruneOptions, errOnUnused bool) {
random := rand.New(rand.NewSource(seed))
t.Logf("rand initialized with seed %d", seed)
repo, be := repository.TestRepositoryWithVersion(t, 0)
repo, _, be := repository.TestRepositoryWithVersion(t, 0)
createRandomBlobs(t, random, repo, 4, 0.5, true)
createRandomBlobs(t, random, repo, 5, 0.5, true)
keep, _ := selectBlobs(t, random, repo, 0.5)


@ -21,10 +21,10 @@ func (r *Repository) LoadRaw(ctx context.Context, t restic.FileType, id restic.I
// retry loading damaged data only once. If a file fails to download correctly
// the second time, then it is likely corrupted at the backend.
if h.Type != backend.ConfigFile && id != restic.Hash(buf) {
if r.Cache != nil {
if r.cache != nil {
// Cleanup cache to make sure it's not the cached copy that is broken.
// Ignore error as there's not much we can do in that case.
_ = r.Cache.Forget(h)
_ = r.cache.Forget(h)
}
buf, err = loadRaw(ctx, r.be, h)


@ -159,14 +159,14 @@ func findPacksForBlobs(t *testing.T, repo restic.Repository, blobs restic.BlobSe
return packs
}
func repack(t *testing.T, repo restic.Repository, packs restic.IDSet, blobs restic.BlobSet) {
func repack(t *testing.T, repo restic.Repository, be backend.Backend, packs restic.IDSet, blobs restic.BlobSet) {
repackedBlobs, err := repository.Repack(context.TODO(), repo, repo, packs, blobs, nil)
if err != nil {
t.Fatal(err)
}
for id := range repackedBlobs {
err = repo.RemoveUnpacked(context.TODO(), restic.PackFile, id)
err = be.Remove(context.TODO(), backend.Handle{Type: restic.PackFile, Name: id.String()})
if err != nil {
t.Fatal(err)
}
@ -186,7 +186,7 @@ func TestRepack(t *testing.T) {
}
func testRepack(t *testing.T, version uint) {
repo, _ := repository.TestRepositoryWithVersion(t, version)
repo, _, be := repository.TestRepositoryWithVersion(t, version)
seed := time.Now().UnixNano()
random := rand.New(rand.NewSource(seed))
@ -199,7 +199,7 @@ func testRepack(t *testing.T, version uint) {
packsBefore := listPacks(t, repo)
// Running repack on empty ID sets should not do anything at all.
repack(t, repo, nil, nil)
repack(t, repo, be, nil, nil)
packsAfter := listPacks(t, repo)
@ -212,7 +212,7 @@ func testRepack(t *testing.T, version uint) {
removePacks := findPacksForBlobs(t, repo, removeBlobs)
repack(t, repo, removePacks, keepBlobs)
repack(t, repo, be, removePacks, keepBlobs)
rebuildAndReloadIndex(t, repo)
packsAfter = listPacks(t, repo)
@ -261,8 +261,8 @@ func (r oneConnectionRepo) Connections() uint {
}
func testRepackCopy(t *testing.T, version uint) {
repo, _ := repository.TestRepositoryWithVersion(t, version)
dstRepo, _ := repository.TestRepositoryWithVersion(t, version)
repo, _, _ := repository.TestRepositoryWithVersion(t, version)
dstRepo, _, _ := repository.TestRepositoryWithVersion(t, version)
// test with minimal possible connection count
repoWrapped := &oneConnectionRepo{repo}


@ -123,7 +123,7 @@ func rewriteIndexFiles(ctx context.Context, repo *Repository, removePacks restic
printer.P("rebuilding index\n")
bar := printer.NewCounter("indexes processed")
return repo.idx.Rewrite(ctx, repo, removePacks, oldIndexes, extraObsolete, index.MasterIndexRewriteOpts{
return repo.idx.Rewrite(ctx, &internalRepository{repo}, removePacks, oldIndexes, extraObsolete, index.MasterIndexRewriteOpts{
SaveProgress: bar,
DeleteProgress: func() *progress.Counter {
return printer.NewCounter("old indexes deleted")


@ -23,7 +23,7 @@ func testRebuildIndex(t *testing.T, readAllPacks bool, damage func(t *testing.T,
random := rand.New(rand.NewSource(seed))
t.Logf("rand initialized with seed %d", seed)
repo, be := repository.TestRepositoryWithVersion(t, 0)
repo, _, be := repository.TestRepositoryWithVersion(t, 0)
createRandomBlobs(t, random, repo, 4, 0.5, true)
createRandomBlobs(t, random, repo, 5, 0.5, true)
indexes := listIndex(t, repo)


@ -65,7 +65,7 @@ func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printe
printer.P("removing salvaged pack files")
// if we fail to delete the damaged pack files, then prune will remove them later on
bar = printer.NewCounter("files deleted")
_ = restic.ParallelRemove(ctx, repo, ids, restic.PackFile, nil, bar)
_ = restic.ParallelRemove(ctx, &internalRepository{repo}, ids, restic.PackFile, nil, bar)
bar.Done()
return nil


@ -38,7 +38,7 @@ type Repository struct {
key *crypto.Key
keyID restic.ID
idx *index.MasterIndex
Cache *cache.Cache
cache *cache.Cache
opts Options
@ -53,6 +53,11 @@ type Repository struct {
dec *zstd.Decoder
}
// internalRepository allows using SaveUnpacked and RemoveUnpacked with all FileTypes
type internalRepository struct {
*Repository
}
type Options struct {
Compression CompressionMode
PackSize uint
@ -149,10 +154,14 @@ func (r *Repository) UseCache(c *cache.Cache) {
return
}
debug.Log("using cache")
r.Cache = c
r.cache = c
r.be = c.Wrap(r.be)
}
func (r *Repository) Cache() *cache.Cache {
return r.cache
}
// SetDryRun sets the repo backend into dry-run mode.
func (r *Repository) SetDryRun() {
r.be = dryrun.New(r.be)
@ -225,15 +234,15 @@ func (r *Repository) LoadBlob(ctx context.Context, t restic.BlobType, id restic.
}
// try cached pack files first
sortCachedPacksFirst(r.Cache, blobs)
sortCachedPacksFirst(r.cache, blobs)
buf, err := r.loadBlob(ctx, blobs, buf)
if err != nil {
if r.Cache != nil {
if r.cache != nil {
for _, blob := range blobs {
h := backend.Handle{Type: restic.PackFile, Name: blob.PackID.String(), IsMetadata: blob.Type.IsMetadata()}
// ignore errors as there's not much we can do here
_ = r.Cache.Forget(h)
_ = r.cache.Forget(h)
}
}
@ -446,7 +455,15 @@ func (r *Repository) decompressUnpacked(p []byte) ([]byte, error) {
// SaveUnpacked encrypts data and stores it in the backend. Returned is the
// storage hash.
func (r *Repository) SaveUnpacked(ctx context.Context, t restic.FileType, buf []byte) (id restic.ID, err error) {
func (r *Repository) SaveUnpacked(ctx context.Context, t restic.WriteableFileType, buf []byte) (id restic.ID, err error) {
return r.saveUnpacked(ctx, t.ToFileType(), buf)
}
func (r *internalRepository) SaveUnpacked(ctx context.Context, t restic.FileType, buf []byte) (id restic.ID, err error) {
return r.Repository.saveUnpacked(ctx, t, buf)
}
func (r *Repository) saveUnpacked(ctx context.Context, t restic.FileType, buf []byte) (id restic.ID, err error) {
p := buf
if t != restic.ConfigFile {
p, err = r.compressUnpacked(p)
@ -507,8 +524,15 @@ func (r *Repository) verifyUnpacked(buf []byte, t restic.FileType, expected []by
return nil
}
func (r *Repository) RemoveUnpacked(ctx context.Context, t restic.FileType, id restic.ID) error {
// TODO prevent everything except removing snapshots for non-repository code
func (r *Repository) RemoveUnpacked(ctx context.Context, t restic.WriteableFileType, id restic.ID) error {
return r.removeUnpacked(ctx, t.ToFileType(), id)
}
func (r *internalRepository) RemoveUnpacked(ctx context.Context, t restic.FileType, id restic.ID) error {
return r.Repository.removeUnpacked(ctx, t, id)
}
func (r *Repository) removeUnpacked(ctx context.Context, t restic.FileType, id restic.ID) error {
return r.be.Remove(ctx, backend.Handle{Type: t, Name: id.String()})
}
@ -518,7 +542,7 @@ func (r *Repository) Flush(ctx context.Context) error {
return err
}
return r.idx.SaveIndex(ctx, r)
return r.idx.SaveIndex(ctx, &internalRepository{r})
}
func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group) {
@ -702,14 +726,14 @@ func (r *Repository) createIndexFromPacks(ctx context.Context, packsize map[rest
// prepareCache initializes the local cache. indexIDs is the list of IDs of
// index files still present in the repo.
func (r *Repository) prepareCache() error {
if r.Cache == nil {
if r.cache == nil {
return nil
}
packs := r.idx.Packs(restic.NewIDSet())
// clear old packs
err := r.Cache.Clear(restic.PackFile, packs)
err := r.cache.Clear(restic.PackFile, packs)
if err != nil {
fmt.Fprintf(os.Stderr, "error clearing pack files in cache: %v\n", err)
}
@ -803,7 +827,7 @@ func (r *Repository) init(ctx context.Context, password string, cfg restic.Confi
r.key = key.master
r.keyID = key.ID()
r.setConfig(cfg)
return restic.SaveConfig(ctx, r, cfg)
return restic.SaveConfig(ctx, &internalRepository{r}, cfg)
}
// Key returns the current master key.
@ -835,9 +859,9 @@ func (r *Repository) ListPack(ctx context.Context, id restic.ID, size int64) ([]
entries, hdrSize, err := pack.List(r.Key(), backend.ReaderAt(ctx, r.be, h), size)
if err != nil {
if r.Cache != nil {
if r.cache != nil {
// ignore error as there is not much we can do here
_ = r.Cache.Forget(h)
_ = r.cache.Forget(h)
}
// retry on error
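
The hunk above is the core of this change: the exported SaveUnpacked and RemoveUnpacked now only accept restic.WriteableFileType, while code inside the repository package wraps the repository in internalRepository to keep writing all file types. A minimal sketch of the resulting call patterns, based on the signatures in this diff (ctx, buf, repo and r are placeholder variables):

// Outside the repository package, only snapshot files can be written or removed.
id, err := repo.SaveUnpacked(ctx, restic.WriteableSnapshotFile, buf)
if err != nil {
    return err
}
err = repo.RemoveUnpacked(ctx, restic.WriteableSnapshotFile, id)

// Inside the repository package, wrapping the repository in internalRepository
// restores access to every restic.FileType, e.g. for index files.
_, err = (&internalRepository{r}).SaveUnpacked(ctx, restic.IndexFile, buf)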


@ -16,6 +16,7 @@ import (
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository/index"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
@ -84,6 +85,53 @@ func BenchmarkSortCachedPacksFirst(b *testing.B) {
}
}
func BenchmarkLoadIndex(b *testing.B) {
BenchmarkAllVersions(b, benchmarkLoadIndex)
}
func benchmarkLoadIndex(b *testing.B, version uint) {
TestUseLowSecurityKDFParameters(b)
repo, _, be := TestRepositoryWithVersion(b, version)
idx := index.NewIndex()
for i := 0; i < 5000; i++ {
idx.StorePack(restic.NewRandomID(), []restic.Blob{
{
BlobHandle: restic.NewRandomBlobHandle(),
Length: 1234,
Offset: 1235,
},
})
}
idx.Finalize()
id, err := idx.SaveIndex(context.TODO(), &internalRepository{repo})
rtest.OK(b, err)
b.Logf("index saved as %v", id.Str())
fi, err := be.Stat(context.TODO(), backend.Handle{Type: restic.IndexFile, Name: id.String()})
rtest.OK(b, err)
b.Logf("filesize is %v", fi.Size)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := loadIndex(context.TODO(), repo, id)
rtest.OK(b, err)
}
}
// loadIndex loads the index id from backend and returns it.
func loadIndex(ctx context.Context, repo restic.LoaderUnpacked, id restic.ID) (*index.Index, error) {
buf, err := repo.LoadUnpacked(ctx, restic.IndexFile, id)
if err != nil {
return nil, err
}
return index.DecodeIndex(buf, id)
}
// buildPackfileWithoutHeader returns a manually built pack file without a header.
func buildPackfileWithoutHeader(blobSizes []int, key *crypto.Key, compress bool) (blobs []restic.Blob, packfile []byte) {
opts := []zstd.EOption{


@ -43,7 +43,7 @@ func testSaveCalculateID(t *testing.T, version uint) {
}
func testSave(t *testing.T, version uint, calculateID bool) {
repo, _ := repository.TestRepositoryWithVersion(t, version)
repo, _, _ := repository.TestRepositoryWithVersion(t, version)
for _, size := range testSizes {
data := make([]byte, size)
@ -86,7 +86,7 @@ func BenchmarkSaveAndEncrypt(t *testing.B) {
}
func benchmarkSaveAndEncrypt(t *testing.B, version uint) {
repo, _ := repository.TestRepositoryWithVersion(t, version)
repo, _, _ := repository.TestRepositoryWithVersion(t, version)
size := 4 << 20 // 4MiB
data := make([]byte, size)
@ -112,7 +112,7 @@ func TestLoadBlob(t *testing.T) {
}
func testLoadBlob(t *testing.T, version uint) {
repo, _ := repository.TestRepositoryWithVersion(t, version)
repo, _, _ := repository.TestRepositoryWithVersion(t, version)
length := 1000000
buf := crypto.NewBlobBuffer(length)
_, err := io.ReadFull(rnd, buf)
@ -168,7 +168,7 @@ func BenchmarkLoadBlob(b *testing.B) {
}
func benchmarkLoadBlob(b *testing.B, version uint) {
repo, _ := repository.TestRepositoryWithVersion(b, version)
repo, _, _ := repository.TestRepositoryWithVersion(b, version)
length := 1000000
buf := crypto.NewBlobBuffer(length)
_, err := io.ReadFull(rnd, buf)
@ -209,7 +209,7 @@ func BenchmarkLoadUnpacked(b *testing.B) {
}
func benchmarkLoadUnpacked(b *testing.B, version uint) {
repo, _ := repository.TestRepositoryWithVersion(b, version)
repo, _, _ := repository.TestRepositoryWithVersion(b, version)
length := 1000000
buf := crypto.NewBlobBuffer(length)
_, err := io.ReadFull(rnd, buf)
@ -217,7 +217,7 @@ func benchmarkLoadUnpacked(b *testing.B, version uint) {
dataID := restic.Hash(buf)
storageID, err := repo.SaveUnpacked(context.TODO(), restic.PackFile, buf)
storageID, err := repo.SaveUnpacked(context.TODO(), restic.WriteableSnapshotFile, buf)
rtest.OK(b, err)
// rtest.OK(b, repo.Flush())
@ -225,7 +225,7 @@ func benchmarkLoadUnpacked(b *testing.B, version uint) {
b.SetBytes(int64(length))
for i := 0; i < b.N; i++ {
data, err := repo.LoadUnpacked(context.TODO(), restic.PackFile, storageID)
data, err := repo.LoadUnpacked(context.TODO(), restic.SnapshotFile, storageID)
rtest.OK(b, err)
// See comment in BenchmarkLoadBlob.
@ -262,7 +262,7 @@ func loadIndex(ctx context.Context, repo restic.LoaderUnpacked, id restic.ID) (*
}
func TestRepositoryLoadUnpackedBroken(t *testing.T) {
repo, be := repository.TestRepositoryWithVersion(t, 0)
repo, _, be := repository.TestRepositoryWithVersion(t, 0)
data := rtest.Random(23, 12345)
id := restic.Hash(data)
@ -309,43 +309,6 @@ func TestRepositoryLoadUnpackedRetryBroken(t *testing.T) {
rtest.OK(t, repo.LoadIndex(context.TODO(), nil))
}
func BenchmarkLoadIndex(b *testing.B) {
repository.BenchmarkAllVersions(b, benchmarkLoadIndex)
}
func benchmarkLoadIndex(b *testing.B, version uint) {
repository.TestUseLowSecurityKDFParameters(b)
repo, be := repository.TestRepositoryWithVersion(b, version)
idx := index.NewIndex()
for i := 0; i < 5000; i++ {
idx.StorePack(restic.NewRandomID(), []restic.Blob{
{
BlobHandle: restic.NewRandomBlobHandle(),
Length: 1234,
Offset: 1235,
},
})
}
idx.Finalize()
id, err := idx.SaveIndex(context.TODO(), repo)
rtest.OK(b, err)
b.Logf("index saved as %v", id.Str())
fi, err := be.Stat(context.TODO(), backend.Handle{Type: restic.IndexFile, Name: id.String()})
rtest.OK(b, err)
b.Logf("filesize is %v", fi.Size)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := loadIndex(context.TODO(), repo, id)
rtest.OK(b, err)
}
}
// saveRandomDataBlobs generates random data blobs and saves them to the repository.
func saveRandomDataBlobs(t testing.TB, repo restic.Repository, num int, sizeMax int) {
var wg errgroup.Group
@ -368,7 +331,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) {
}
func testRepositoryIncrementalIndex(t *testing.T, version uint) {
repo, _ := repository.TestRepositoryWithVersion(t, version)
repo, _, _ := repository.TestRepositoryWithVersion(t, version)
index.IndexFull = func(*index.Index) bool { return true }
@ -453,7 +416,7 @@ func TestListPack(t *testing.T) {
}
func TestNoDoubleInit(t *testing.T) {
r, be := repository.TestRepositoryWithVersion(t, restic.StableRepoVersion)
r, _, be := repository.TestRepositoryWithVersion(t, restic.StableRepoVersion)
repo, err := repository.New(be, repository.Options{})
rtest.OK(t, err)


@ -78,30 +78,31 @@ func TestRepositoryWithBackend(t testing.TB, be backend.Backend, version uint, o
// instead. The directory is not removed, but left there for inspection.
func TestRepository(t testing.TB) *Repository {
t.Helper()
repo, _ := TestRepositoryWithVersion(t, 0)
repo, _, _ := TestRepositoryWithVersion(t, 0)
return repo
}
func TestRepositoryWithVersion(t testing.TB, version uint) (*Repository, backend.Backend) {
func TestRepositoryWithVersion(t testing.TB, version uint) (*Repository, restic.Unpacked[restic.FileType], backend.Backend) {
t.Helper()
dir := os.Getenv("RESTIC_TEST_REPO")
opts := Options{}
var repo *Repository
var be backend.Backend
if dir != "" {
_, err := os.Stat(dir)
if err != nil {
be, err := local.Create(context.TODO(), local.Config{Path: dir})
lbe, err := local.Create(context.TODO(), local.Config{Path: dir})
if err != nil {
t.Fatalf("error creating local backend at %v: %v", dir, err)
}
return TestRepositoryWithBackend(t, be, version, opts)
}
if err == nil {
repo, be = TestRepositoryWithBackend(t, lbe, version, opts)
} else {
t.Logf("directory at %v already exists, using mem backend", dir)
}
} else {
repo, be = TestRepositoryWithBackend(t, nil, version, opts)
}
return TestRepositoryWithBackend(t, nil, version, opts)
return repo, &internalRepository{repo}, be
}
func TestFromFixture(t testing.TB, repoFixture string) (*Repository, backend.Backend, func()) {
@ -156,3 +157,8 @@ func BenchmarkAllVersions(b *testing.B, bench VersionedBenchmark) {
})
}
}
func TestNewLock(t *testing.T, repo *Repository, exclusive bool) (*restic.Lock, error) {
// TODO get rid of this test helper
return restic.NewLock(context.TODO(), &internalRepository{repo}, exclusive)
}
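
Tests that need to tamper with repository internals use the additional return values introduced above: the second value is a restic.Unpacked[restic.FileType] with unrestricted write access, the third is the raw backend. A simplified sketch of the pattern used in the index and lock tests (indexID and packID are placeholder variables):

repo, unpacked, be := repository.TestRepositoryWithVersion(t, 2)

// unrestricted removal of internal files for test setup
rtest.OK(t, unpacked.RemoveUnpacked(context.TODO(), restic.IndexFile, indexID))

// direct backend access, e.g. to delete a pack file behind the repository's back
rtest.OK(t, be.Remove(context.TODO(), backend.Handle{Type: restic.PackFile, Name: packID.String()}))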


@ -45,7 +45,7 @@ func upgradeRepository(ctx context.Context, repo *Repository) error {
cfg := repo.Config()
cfg.Version = 2
err := restic.SaveConfig(ctx, repo, cfg)
err := restic.SaveConfig(ctx, &internalRepository{repo}, cfg)
if err != nil {
return fmt.Errorf("save new config file failed: %w", err)
}


@ -13,7 +13,7 @@ import (
)
func TestUpgradeRepoV2(t *testing.T) {
repo, _ := TestRepositoryWithVersion(t, 1)
repo, _, _ := TestRepositoryWithVersion(t, 1)
if repo.Config().Version != 1 {
t.Fatal("test repo has wrong version")
}


@ -87,7 +87,7 @@ func LoadConfig(ctx context.Context, r LoaderUnpacked) (Config, error) {
return cfg, nil
}
func SaveConfig(ctx context.Context, r SaverUnpacked, cfg Config) error {
func SaveConfig(ctx context.Context, r SaverUnpacked[FileType], cfg Config) error {
_, err := SaveJSONUnpacked(ctx, r, ConfigFile, cfg)
return err
}


@ -21,7 +21,7 @@ func LoadJSONUnpacked(ctx context.Context, repo LoaderUnpacked, t FileType, id I
// SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the
// backend as type t, without a pack. It returns the storage hash.
func SaveJSONUnpacked(ctx context.Context, repo SaverUnpacked, t FileType, item interface{}) (ID, error) {
func SaveJSONUnpacked[FT FileTypes](ctx context.Context, repo SaverUnpacked[FT], t FT, item interface{}) (ID, error) {
debug.Log("save new blob %v", t)
plaintext, err := json.Marshal(item)
if err != nil {


@ -7,7 +7,6 @@ import (
"os/signal"
"os/user"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
@ -39,7 +38,7 @@ type Lock struct {
UID uint32 `json:"uid,omitempty"`
GID uint32 `json:"gid,omitempty"`
repo Unpacked
repo Unpacked[FileType]
lockID *ID
}
@ -87,20 +86,6 @@ func IsInvalidLock(err error) bool {
var ErrRemovedLock = errors.New("lock file was removed in the meantime")
// NewLock returns a new, non-exclusive lock for the repository. If an
// exclusive lock is already held by another process, it returns an error
// that satisfies IsAlreadyLocked.
func NewLock(ctx context.Context, repo Unpacked) (*Lock, error) {
return newLock(ctx, repo, false)
}
// NewExclusiveLock returns a new, exclusive lock for the repository. If
// another lock (normal and exclusive) is already held by another process,
// it returns an error that satisfies IsAlreadyLocked.
func NewExclusiveLock(ctx context.Context, repo Unpacked) (*Lock, error) {
return newLock(ctx, repo, true)
}
var waitBeforeLockCheck = 200 * time.Millisecond
// delay increases by factor 2 on each retry
@ -113,11 +98,15 @@ func TestSetLockTimeout(t testing.TB, d time.Duration) {
initialWaitBetweenLockRetries = d
}
func newLock(ctx context.Context, repo Unpacked, excl bool) (*Lock, error) {
// NewLock returns a new lock for the repository. If an
// exclusive lock is already held by another process, it returns an error
// that satisfies IsAlreadyLocked. If the new lock is exclusive, then other
// non-exclusive locks also result in an IsAlreadyLocked error.
func NewLock(ctx context.Context, repo Unpacked[FileType], exclusive bool) (*Lock, error) {
lock := &Lock{
Time: time.Now(),
PID: os.Getpid(),
Exclusive: excl,
Exclusive: exclusive,
repo: repo,
}
@ -444,42 +433,6 @@ func LoadLock(ctx context.Context, repo LoaderUnpacked, id ID) (*Lock, error) {
return lock, nil
}
// RemoveStaleLocks deletes all locks detected as stale from the repository.
func RemoveStaleLocks(ctx context.Context, repo Unpacked) (uint, error) {
var processed uint
err := ForAllLocks(ctx, repo, nil, func(id ID, lock *Lock, err error) error {
if err != nil {
// ignore locks that cannot be loaded
debug.Log("ignore lock %v: %v", id, err)
return nil
}
if lock.Stale() {
err = repo.RemoveUnpacked(ctx, LockFile, id)
if err == nil {
processed++
}
return err
}
return nil
})
return processed, err
}
// RemoveAllLocks removes all locks forcefully.
func RemoveAllLocks(ctx context.Context, repo Unpacked) (uint, error) {
var processed uint32
err := ParallelList(ctx, repo, LockFile, repo.Connections(), func(ctx context.Context, id ID, _ int64) error {
err := repo.RemoveUnpacked(ctx, LockFile, id)
if err == nil {
atomic.AddUint32(&processed, 1)
}
return err
})
return uint(processed), err
}
// ForAllLocks reads all locks in parallel and calls the given callback.
// It is guaranteed that the function is not run concurrently. If the
// callback returns an error, this function is cancelled and also returns that error.


@ -19,7 +19,7 @@ func TestLock(t *testing.T) {
repo := repository.TestRepository(t)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
lock, err := restic.NewLock(context.TODO(), repo)
lock, err := repository.TestNewLock(t, repo, false)
rtest.OK(t, err)
rtest.OK(t, lock.Unlock(context.TODO()))
@ -29,7 +29,7 @@ func TestDoubleUnlock(t *testing.T) {
repo := repository.TestRepository(t)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
lock, err := restic.NewLock(context.TODO(), repo)
lock, err := repository.TestNewLock(t, repo, false)
rtest.OK(t, err)
rtest.OK(t, lock.Unlock(context.TODO()))
@ -43,10 +43,10 @@ func TestMultipleLock(t *testing.T) {
repo := repository.TestRepository(t)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
lock1, err := restic.NewLock(context.TODO(), repo)
lock1, err := repository.TestNewLock(t, repo, false)
rtest.OK(t, err)
lock2, err := restic.NewLock(context.TODO(), repo)
lock2, err := repository.TestNewLock(t, repo, false)
rtest.OK(t, err)
rtest.OK(t, lock1.Unlock(context.TODO()))
@ -69,10 +69,10 @@ func TestMultipleLockFailure(t *testing.T) {
repo, _ := repository.TestRepositoryWithBackend(t, be, 0, repository.Options{})
restic.TestSetLockTimeout(t, 5*time.Millisecond)
lock1, err := restic.NewLock(context.TODO(), repo)
lock1, err := repository.TestNewLock(t, repo, false)
rtest.OK(t, err)
_, err = restic.NewLock(context.TODO(), repo)
_, err = repository.TestNewLock(t, repo, false)
rtest.Assert(t, err != nil, "unreadable lock file did not result in an error")
rtest.OK(t, lock1.Unlock(context.TODO()))
@ -81,7 +81,7 @@ func TestMultipleLockFailure(t *testing.T) {
func TestLockExclusive(t *testing.T) {
repo := repository.TestRepository(t)
elock, err := restic.NewExclusiveLock(context.TODO(), repo)
elock, err := repository.TestNewLock(t, repo, true)
rtest.OK(t, err)
rtest.OK(t, elock.Unlock(context.TODO()))
}
@ -90,10 +90,10 @@ func TestLockOnExclusiveLockedRepo(t *testing.T) {
repo := repository.TestRepository(t)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
elock, err := restic.NewExclusiveLock(context.TODO(), repo)
elock, err := repository.TestNewLock(t, repo, true)
rtest.OK(t, err)
lock, err := restic.NewLock(context.TODO(), repo)
lock, err := repository.TestNewLock(t, repo, false)
rtest.Assert(t, err != nil,
"create normal lock with exclusively locked repo didn't return an error")
rtest.Assert(t, restic.IsAlreadyLocked(err),
@ -107,10 +107,10 @@ func TestExclusiveLockOnLockedRepo(t *testing.T) {
repo := repository.TestRepository(t)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
elock, err := restic.NewLock(context.TODO(), repo)
elock, err := repository.TestNewLock(t, repo, false)
rtest.OK(t, err)
lock, err := restic.NewExclusiveLock(context.TODO(), repo)
lock, err := repository.TestNewLock(t, repo, true)
rtest.Assert(t, err != nil,
"create normal lock with exclusively locked repo didn't return an error")
rtest.Assert(t, restic.IsAlreadyLocked(err),
@ -120,20 +120,6 @@ func TestExclusiveLockOnLockedRepo(t *testing.T) {
rtest.OK(t, elock.Unlock(context.TODO()))
}
func createFakeLock(repo restic.SaverUnpacked, t time.Time, pid int) (restic.ID, error) {
hostname, err := os.Hostname()
if err != nil {
return restic.ID{}, err
}
newLock := &restic.Lock{Time: t, PID: pid, Hostname: hostname}
return restic.SaveJSONUnpacked(context.TODO(), repo, restic.LockFile, &newLock)
}
func removeLock(repo restic.RemoverUnpacked, id restic.ID) error {
return repo.RemoveUnpacked(context.TODO(), restic.LockFile, id)
}
var staleLockTests = []struct {
timestamp time.Time
stale bool
@ -190,72 +176,6 @@ func TestLockStale(t *testing.T) {
}
}
func lockExists(repo restic.Lister, t testing.TB, lockID restic.ID) bool {
var exists bool
rtest.OK(t, repo.List(context.TODO(), restic.LockFile, func(id restic.ID, size int64) error {
if id == lockID {
exists = true
}
return nil
}))
return exists
}
func TestLockWithStaleLock(t *testing.T) {
repo := repository.TestRepository(t)
id1, err := createFakeLock(repo, time.Now().Add(-time.Hour), os.Getpid())
rtest.OK(t, err)
id2, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid())
rtest.OK(t, err)
id3, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid()+500000)
rtest.OK(t, err)
processed, err := restic.RemoveStaleLocks(context.TODO(), repo)
rtest.OK(t, err)
rtest.Assert(t, lockExists(repo, t, id1) == false,
"stale lock still exists after RemoveStaleLocks was called")
rtest.Assert(t, lockExists(repo, t, id2) == true,
"non-stale lock was removed by RemoveStaleLocks")
rtest.Assert(t, lockExists(repo, t, id3) == false,
"stale lock still exists after RemoveStaleLocks was called")
rtest.Assert(t, processed == 2,
"number of locks removed does not match: expected %d, got %d",
2, processed)
rtest.OK(t, removeLock(repo, id2))
}
func TestRemoveAllLocks(t *testing.T) {
repo := repository.TestRepository(t)
id1, err := createFakeLock(repo, time.Now().Add(-time.Hour), os.Getpid())
rtest.OK(t, err)
id2, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid())
rtest.OK(t, err)
id3, err := createFakeLock(repo, time.Now().Add(-time.Minute), os.Getpid()+500000)
rtest.OK(t, err)
processed, err := restic.RemoveAllLocks(context.TODO(), repo)
rtest.OK(t, err)
rtest.Assert(t, lockExists(repo, t, id1) == false,
"lock still exists after RemoveAllLocks was called")
rtest.Assert(t, lockExists(repo, t, id2) == false,
"lock still exists after RemoveAllLocks was called")
rtest.Assert(t, lockExists(repo, t, id3) == false,
"lock still exists after RemoveAllLocks was called")
rtest.Assert(t, processed == 3,
"number of locks removed does not match: expected %d, got %d",
3, processed)
}
func checkSingleLock(t *testing.T, repo restic.Lister) restic.ID {
t.Helper()
var lockID *restic.ID
@ -279,7 +199,7 @@ func testLockRefresh(t *testing.T, refresh func(lock *restic.Lock) error) {
repo := repository.TestRepository(t)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
lock, err := restic.NewLock(context.TODO(), repo)
lock, err := repository.TestNewLock(t, repo, false)
rtest.OK(t, err)
time0 := lock.Time
@ -312,10 +232,10 @@ func TestLockRefreshStale(t *testing.T) {
}
func TestLockRefreshStaleMissing(t *testing.T) {
repo, be := repository.TestRepositoryWithVersion(t, 0)
repo, _, be := repository.TestRepositoryWithVersion(t, 0)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
lock, err := restic.NewLock(context.TODO(), repo)
lock, err := repository.TestNewLock(t, repo, false)
rtest.OK(t, err)
lockID := checkSingleLock(t, repo)


@ -54,7 +54,7 @@ func ParallelList(ctx context.Context, r Lister, t FileType, parallelism uint, f
// ParallelRemove deletes the given fileList of fileType in parallel
// if callback returns an error, then it will abort.
func ParallelRemove(ctx context.Context, repo RemoverUnpacked, fileList IDSet, fileType FileType, report func(id ID, err error) error, bar *progress.Counter) error {
func ParallelRemove[FT FileTypes](ctx context.Context, repo RemoverUnpacked[FT], fileList IDSet, fileType FT, report func(id ID, err error) error, bar *progress.Counter) error {
fileChan := make(chan ID)
wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
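
With the added type parameter, ParallelRemove adapts to whichever remover it is given; both instantiations appear elsewhere in this diff. A short sketch (snapshotIDs, packIDs and bar are placeholder variables):

// From command code: the repository only satisfies RemoverUnpacked[WriteableFileType],
// so only snapshot files may be removed.
err := restic.ParallelRemove(ctx, repo, snapshotIDs, restic.WriteableSnapshotFile, nil, bar)

// Inside the repository package: the internal wrapper satisfies RemoverUnpacked[FileType]
// and may also remove pack and index files.
err = restic.ParallelRemove(ctx, &internalRepository{repo}, packIDs, restic.PackFile, nil, bar)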


@ -57,14 +57,16 @@ type Repository interface {
LoadRaw(ctx context.Context, t FileType, id ID) (data []byte, err error)
// LoadUnpacked loads and decrypts the file with the given type and ID.
LoadUnpacked(ctx context.Context, t FileType, id ID) (data []byte, err error)
SaveUnpacked(ctx context.Context, t FileType, buf []byte) (ID, error)
SaveUnpacked(ctx context.Context, t WriteableFileType, buf []byte) (ID, error)
// RemoveUnpacked removes a file from the repository. This will eventually be restricted to deleting only snapshots.
RemoveUnpacked(ctx context.Context, t FileType, id ID) error
RemoveUnpacked(ctx context.Context, t WriteableFileType, id ID) error
}
type FileType = backend.FileType
// These are the different data types a backend can store.
// These are the different data types a backend can store. Only filetypes contained
// in the `WriteableFileType` subset can be modified via the Repository interface.
// All other filetypes are considered internal datastructures of the Repository.
const (
PackFile FileType = backend.PackFile
KeyFile FileType = backend.KeyFile
@ -74,6 +76,26 @@ const (
ConfigFile FileType = backend.ConfigFile
)
type WriteableFileType backend.FileType
// These are the different data types that can be modified via SaveUnpacked or RemoveUnpacked.
const (
WriteableSnapshotFile WriteableFileType = WriteableFileType(SnapshotFile)
)
func (w *WriteableFileType) ToFileType() FileType {
switch *w {
case WriteableSnapshotFile:
return SnapshotFile
default:
panic("invalid WriteableFileType")
}
}
type FileTypes interface {
FileType | WriteableFileType
}
// LoaderUnpacked allows loading a blob not stored in a pack file
type LoaderUnpacked interface {
// Connections returns the maximum number of concurrent backend operations
@ -82,22 +104,22 @@ type LoaderUnpacked interface {
}
// SaverUnpacked allows saving a blob not stored in a pack file
type SaverUnpacked interface {
type SaverUnpacked[FT FileTypes] interface {
// Connections returns the maximum number of concurrent backend operations
Connections() uint
SaveUnpacked(ctx context.Context, t FileType, buf []byte) (ID, error)
SaveUnpacked(ctx context.Context, t FT, buf []byte) (ID, error)
}
// RemoverUnpacked allows removing an unpacked blob
type RemoverUnpacked interface {
type RemoverUnpacked[FT FileTypes] interface {
// Connections returns the maximum number of concurrent backend operations
Connections() uint
RemoveUnpacked(ctx context.Context, t FileType, id ID) error
RemoveUnpacked(ctx context.Context, t FT, id ID) error
}
type SaverRemoverUnpacked interface {
SaverUnpacked
RemoverUnpacked
type SaverRemoverUnpacked[FT FileTypes] interface {
SaverUnpacked[FT]
RemoverUnpacked[FT]
}
type PackBlobs struct {
@ -126,10 +148,10 @@ type ListerLoaderUnpacked interface {
LoaderUnpacked
}
type Unpacked interface {
type Unpacked[FT FileTypes] interface {
ListerLoaderUnpacked
SaverUnpacked
RemoverUnpacked
SaverUnpacked[FT]
RemoverUnpacked[FT]
}
type ListBlobser interface {
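
The file-type parameter added to these interfaces lets a function declare exactly how much write access it needs: instantiating with WriteableFileType restricts a caller to snapshot files, while FileType keeps full access for repository-internal code. A small sketch of the distinction (both helper functions are hypothetical):

// Can only ever write snapshots; passing restic.IndexFile here would not compile.
func saveSnapshotJSON(ctx context.Context, repo restic.SaverUnpacked[restic.WriteableFileType], sn *restic.Snapshot) (restic.ID, error) {
    return restic.SaveJSONUnpacked(ctx, repo, restic.WriteableSnapshotFile, sn)
}

// Needs full access; only the repository-internal wrapper satisfies this constraint.
func saveIndexBlob(ctx context.Context, repo restic.SaverUnpacked[restic.FileType], buf []byte) (restic.ID, error) {
    return repo.SaveUnpacked(ctx, restic.IndexFile, buf)
}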


@ -90,8 +90,8 @@ func LoadSnapshot(ctx context.Context, loader LoaderUnpacked, id ID) (*Snapshot,
}
// SaveSnapshot saves the snapshot sn and returns its ID.
func SaveSnapshot(ctx context.Context, repo SaverUnpacked, sn *Snapshot) (ID, error) {
return SaveJSONUnpacked(ctx, repo, SnapshotFile, sn)
func SaveSnapshot(ctx context.Context, repo SaverUnpacked[WriteableFileType], sn *Snapshot) (ID, error) {
return SaveJSONUnpacked(ctx, repo, WriteableSnapshotFile, sn)
}
// ForAllSnapshots reads all snapshots in parallel and calls the


@ -32,7 +32,7 @@ func TestLoadJSONUnpacked(t *testing.T) {
}
func testLoadJSONUnpacked(t *testing.T, version uint) {
repo, _ := repository.TestRepositoryWithVersion(t, version)
repo, _, _ := repository.TestRepositoryWithVersion(t, version)
// archive a snapshot
sn := restic.Snapshot{}


@ -184,7 +184,7 @@ func testLoadTree(t *testing.T, version uint) {
}
// archive a few files
repo, _ := repository.TestRepositoryWithVersion(t, version)
repo, _, _ := repository.TestRepositoryWithVersion(t, version)
sn := archiver.TestSnapshot(t, repo, rtest.BenchArchiveDirectory, nil)
rtest.OK(t, repo.Flush(context.Background()))
@ -202,7 +202,7 @@ func benchmarkLoadTree(t *testing.B, version uint) {
}
// archive a few files
repo, _ := repository.TestRepositoryWithVersion(t, version)
repo, _, _ := repository.TestRepositoryWithVersion(t, version)
sn := archiver.TestSnapshot(t, repo, rtest.BenchArchiveDirectory, nil)
rtest.OK(t, repo.Flush(context.Background()))