diff --git a/backend/cache/cache.go b/backend/cache/cache.go index 0094d9b4f..91f799c06 100644 --- a/backend/cache/cache.go +++ b/backend/cache/cache.go @@ -174,6 +174,7 @@ type Fs struct { plexConnector *plexConnector backgroundRunner *backgroundWriter cleanupChan chan bool + parentsForgetFn []func(string) } // parseRootPath returns a cleaned root path and a nil error or "" and an error when the path is invalid @@ -380,24 +381,16 @@ func NewFs(name, rootPath string) (fs.Fs, error) { } }() - // TODO: Explore something here but now it's not something we want - // when writing from cache, source FS will send a notification and clear it out immediately - //setup dir notification - //doDirChangeNotify := wrappedFs.Features().DirChangeNotify - //if doDirChangeNotify != nil { - // doDirChangeNotify(func(dir string) { - // d := NewAbsDirectory(f, dir) - // d.Flush() - // fs.Infof(dir, "updated from notification") - // }, time.Second * 10) - //} + if doDirChangeNotify := wrappedFs.Features().DirChangeNotify; doDirChangeNotify != nil { + doDirChangeNotify(f.receiveDirChangeNotify, f.chunkCleanInterval) + } f.features = (&fs.Features{ CanHaveEmptyDirectories: true, DuplicateFiles: false, // storage doesn't permit this - DirChangeNotify: nil, }).Fill(f).Mask(wrappedFs).WrapsFs(f, wrappedFs) // override only those features that use a temp fs and it doesn't support them + f.features.DirChangeNotify = f.DirChangeNotify if f.tempWritePath != "" { if f.tempFs.Features().Copy == nil { f.features.Copy = nil @@ -421,6 +414,81 @@ func NewFs(name, rootPath string) (fs.Fs, error) { return f, fsErr } +func (f *Fs) receiveDirChangeNotify(forgetPath string) { + fs.Debugf(f, "notify: expiring cache for '%v'", forgetPath) + // notify upstreams too (vfs) + f.notifyDirChange(forgetPath) + + var cd *Directory + co := NewObject(f, forgetPath) + err := f.cache.GetObject(co) + if err == nil { + cd = NewDirectory(f, cleanPath(path.Dir(co.Remote()))) + } else { + cd = NewDirectory(f, forgetPath) + } + + // we list all the cached objects and expire all of them + entries, err := f.cache.GetDirEntries(cd) + if err != nil { + fs.Debugf(forgetPath, "notify: ignoring notification on non cached dir") + return + } + for i := 0; i < len(entries); i++ { + if co, ok := entries[i].(*Object); ok { + co.CacheTs = time.Now().Add(f.fileAge * -1) + err = f.cache.AddObject(co) + if err != nil { + fs.Errorf(forgetPath, "notify: error expiring '%v': %v", co, err) + } else { + fs.Debugf(forgetPath, "notify: expired %v", co) + } + } + } + // finally, we expire the dir as well + err = f.cache.ExpireDir(cd) + if err != nil { + fs.Errorf(forgetPath, "notify: error expiring '%v': %v", cd, err) + } else { + fs.Debugf(forgetPath, "notify: expired '%v'", cd) + } +} + +// notifyDirChange takes a remote (can be dir or entry) and +// tries to determine which is it and notify upstreams of the dir change +func (f *Fs) notifyDirChange(remote string) { + var cd *Directory + co := NewObject(f, remote) + err := f.cache.GetObject(co) + if err == nil { + pd := cleanPath(path.Dir(remote)) + cd = NewDirectory(f, pd) + } else { + cd = NewDirectory(f, remote) + } + + f.notifyDirChangeUpstream(cd.Remote()) +} + +// notifyDirChangeUpstream will loop through all the upstreams and notify +// of the provided remote (should be only a dir) +func (f *Fs) notifyDirChangeUpstream(remote string) { + if len(f.parentsForgetFn) > 0 { + for _, fn := range f.parentsForgetFn { + fn(remote) + } + } +} + +// DirChangeNotify can subsribe multiple callers +// this is coupled with the wrapped fs DirChangeNotify (if it supports it) +// and also notifies other caches (i.e VFS) to clear out whenever something changes +func (f *Fs) DirChangeNotify(notifyFunc func(string), pollInterval time.Duration) chan bool { + fs.Debugf(f, "subscribing to DirChangeNotify") + f.parentsForgetFn = append(f.parentsForgetFn, notifyFunc) + return make(chan bool) +} + // Name of the remote (as passed into NewFs) func (f *Fs) Name() string { return f.name @@ -683,6 +751,10 @@ func (f *Fs) Mkdir(dir string) error { } else { fs.Infof(parentCd, "mkdir: cache expired") } + // advertise to DirChangeNotify if wrapped doesn't do that + if f.Fs.Features().DirChangeNotify == nil { + f.notifyDirChangeUpstream(parentCd.Remote()) + } return nil } @@ -751,6 +823,10 @@ func (f *Fs) Rmdir(dir string) error { } else { fs.Infof(parentCd, "rmdir: cache expired") } + // advertise to DirChangeNotify if wrapped doesn't do that + if f.Fs.Features().DirChangeNotify == nil { + f.notifyDirChangeUpstream(parentCd.Remote()) + } return nil } @@ -847,6 +923,10 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error { } else { fs.Debugf(srcParent, "dirmove: cache expired") } + // advertise to DirChangeNotify if wrapped doesn't do that + if f.Fs.Features().DirChangeNotify == nil { + f.notifyDirChangeUpstream(srcParent.Remote()) + } // expire parent dir at the destination path dstParent := NewDirectory(f, cleanPath(path.Dir(dstRemote))) @@ -856,6 +936,10 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error { } else { fs.Debugf(dstParent, "dirmove: cache expired") } + // advertise to DirChangeNotify if wrapped doesn't do that + if f.Fs.Features().DirChangeNotify == nil { + f.notifyDirChangeUpstream(dstParent.Remote()) + } // TODO: precache dst dir and save the chunks return nil @@ -978,6 +1062,10 @@ func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put p } else { fs.Infof(parentCd, "put: cache expired") } + // advertise to DirChangeNotify if wrapped doesn't do that + if f.Fs.Features().DirChangeNotify == nil { + f.notifyDirChangeUpstream(parentCd.Remote()) + } return cachedObj, nil } @@ -1066,6 +1154,10 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { } else { fs.Infof(parentCd, "copy: cache expired") } + // advertise to DirChangeNotify if wrapped doesn't do that + if f.Fs.Features().DirChangeNotify == nil { + f.notifyDirChangeUpstream(parentCd.Remote()) + } // expire src parent srcParent := NewDirectory(f, cleanPath(path.Dir(src.Remote()))) err = f.cache.ExpireDir(srcParent) @@ -1074,6 +1166,10 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { } else { fs.Infof(srcParent, "copy: cache expired") } + // advertise to DirChangeNotify if wrapped doesn't do that + if f.Fs.Features().DirChangeNotify == nil { + f.notifyDirChangeUpstream(srcParent.Remote()) + } return co, nil } @@ -1158,6 +1254,10 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { } else { fs.Infof(parentCd, "move: cache expired") } + // advertise to DirChangeNotify if wrapped doesn't do that + if f.Fs.Features().DirChangeNotify == nil { + f.notifyDirChangeUpstream(parentCd.Remote()) + } // persist new cachedObj := ObjectFromOriginal(f, obj).persist() fs.Debugf(cachedObj, "move: added to cache") @@ -1169,6 +1269,10 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { } else { fs.Infof(parentCd, "move: cache expired") } + // advertise to DirChangeNotify if wrapped doesn't do that + if f.Fs.Features().DirChangeNotify == nil { + f.notifyDirChangeUpstream(parentCd.Remote()) + } return cachedObj, nil } @@ -1321,15 +1425,16 @@ func cleanPath(p string) string { // Check the interfaces are satisfied var ( - _ fs.Fs = (*Fs)(nil) - _ fs.Purger = (*Fs)(nil) - _ fs.Copier = (*Fs)(nil) - _ fs.Mover = (*Fs)(nil) - _ fs.DirMover = (*Fs)(nil) - _ fs.PutUncheckeder = (*Fs)(nil) - _ fs.PutStreamer = (*Fs)(nil) - _ fs.CleanUpper = (*Fs)(nil) - _ fs.UnWrapper = (*Fs)(nil) - _ fs.Wrapper = (*Fs)(nil) - _ fs.ListRer = (*Fs)(nil) + _ fs.Fs = (*Fs)(nil) + _ fs.Purger = (*Fs)(nil) + _ fs.Copier = (*Fs)(nil) + _ fs.Mover = (*Fs)(nil) + _ fs.DirMover = (*Fs)(nil) + _ fs.PutUncheckeder = (*Fs)(nil) + _ fs.PutStreamer = (*Fs)(nil) + _ fs.CleanUpper = (*Fs)(nil) + _ fs.UnWrapper = (*Fs)(nil) + _ fs.Wrapper = (*Fs)(nil) + _ fs.ListRer = (*Fs)(nil) + _ fs.DirChangeNotifier = (*Fs)(nil) ) diff --git a/backend/cache/cache_internal_test.go b/backend/cache/cache_internal_test.go index cb7ca5d03..137f423e0 100644 --- a/backend/cache/cache_internal_test.go +++ b/backend/cache/cache_internal_test.go @@ -54,34 +54,35 @@ var ( runInstance *run errNotSupported = errors.New("not supported") decryptedToEncryptedRemotes = map[string]string{ - "one": "lm4u7jjt3c85bf56vjqgeenuno", - "second": "qvt1ochrkcfbptp5mu9ugb2l14", - "test": "jn4tegjtpqro30t3o11thb4b5s", - "test2": "qakvqnh8ttei89e0gc76crpql4", - "data.bin": "0q2847tfko6mhj3dag3r809qbc", - "ticw/data.bin": "5mv97b0ule6pht33srae5pice8/0q2847tfko6mhj3dag3r809qbc", - "tiutfo/test/one": "legd371aa8ol36tjfklt347qnc/jn4tegjtpqro30t3o11thb4b5s/lm4u7jjt3c85bf56vjqgeenuno", - "tiuufo/test/one": "vi6u1olqhirqv14cd8qlej1mgo/jn4tegjtpqro30t3o11thb4b5s/lm4u7jjt3c85bf56vjqgeenuno", - "tiutfo/second/one": "legd371aa8ol36tjfklt347qnc/qvt1ochrkcfbptp5mu9ugb2l14/lm4u7jjt3c85bf56vjqgeenuno", - "second/one": "qvt1ochrkcfbptp5mu9ugb2l14/lm4u7jjt3c85bf56vjqgeenuno", - "test/one": "jn4tegjtpqro30t3o11thb4b5s/lm4u7jjt3c85bf56vjqgeenuno", - "test/second": "jn4tegjtpqro30t3o11thb4b5s/qvt1ochrkcfbptp5mu9ugb2l14", - "test/third": "jn4tegjtpqro30t3o11thb4b5s/2nd7fjiop5h3ihfj1vl953aa5g", - "test/0.bin": "jn4tegjtpqro30t3o11thb4b5s/e6frddt058b6kvbpmlstlndmtk", - "test/1.bin": "jn4tegjtpqro30t3o11thb4b5s/kck472nt1k7qbmob0mt1p1crgc", - "test/2.bin": "jn4tegjtpqro30t3o11thb4b5s/744oe9ven2rmak4u27if51qk24", - "test/3.bin": "jn4tegjtpqro30t3o11thb4b5s/2bjd8kef0u5lmsu6qhqll34bcs", - "test/4.bin": "jn4tegjtpqro30t3o11thb4b5s/cvjs73iv0a82v0c7r67avllh7s", - "test/5.bin": "jn4tegjtpqro30t3o11thb4b5s/0plkdo790b6bnmt33qsdqmhv9c", - "test/6.bin": "jn4tegjtpqro30t3o11thb4b5s/s5r633srnjtbh83893jovjt5d0", - "test/7.bin": "jn4tegjtpqro30t3o11thb4b5s/6rq45tr9bjsammku622flmqsu4", - "test/8.bin": "jn4tegjtpqro30t3o11thb4b5s/37bc6tcl3e31qb8cadvjb749vk", - "test/9.bin": "jn4tegjtpqro30t3o11thb4b5s/t4pr35hnls32789o8fk0chk1ec", + "one": "lm4u7jjt3c85bf56vjqgeenuno", + "second": "qvt1ochrkcfbptp5mu9ugb2l14", + "test": "jn4tegjtpqro30t3o11thb4b5s", + "test2": "qakvqnh8ttei89e0gc76crpql4", + "data.bin": "0q2847tfko6mhj3dag3r809qbc", + "ticw/data.bin": "5mv97b0ule6pht33srae5pice8/0q2847tfko6mhj3dag3r809qbc", + "tiuufo/test/one": "vi6u1olqhirqv14cd8qlej1mgo/jn4tegjtpqro30t3o11thb4b5s/lm4u7jjt3c85bf56vjqgeenuno", + "tiuufo/test/second": "vi6u1olqhirqv14cd8qlej1mgo/jn4tegjtpqro30t3o11thb4b5s/qvt1ochrkcfbptp5mu9ugb2l14", + "tiutfo/test/one": "legd371aa8ol36tjfklt347qnc/jn4tegjtpqro30t3o11thb4b5s/lm4u7jjt3c85bf56vjqgeenuno", + "tiutfo/second/one": "legd371aa8ol36tjfklt347qnc/qvt1ochrkcfbptp5mu9ugb2l14/lm4u7jjt3c85bf56vjqgeenuno", + "second/one": "qvt1ochrkcfbptp5mu9ugb2l14/lm4u7jjt3c85bf56vjqgeenuno", + "test/one": "jn4tegjtpqro30t3o11thb4b5s/lm4u7jjt3c85bf56vjqgeenuno", + "test/second": "jn4tegjtpqro30t3o11thb4b5s/qvt1ochrkcfbptp5mu9ugb2l14", + "test/third": "jn4tegjtpqro30t3o11thb4b5s/2nd7fjiop5h3ihfj1vl953aa5g", + "test/0.bin": "jn4tegjtpqro30t3o11thb4b5s/e6frddt058b6kvbpmlstlndmtk", + "test/1.bin": "jn4tegjtpqro30t3o11thb4b5s/kck472nt1k7qbmob0mt1p1crgc", + "test/2.bin": "jn4tegjtpqro30t3o11thb4b5s/744oe9ven2rmak4u27if51qk24", + "test/3.bin": "jn4tegjtpqro30t3o11thb4b5s/2bjd8kef0u5lmsu6qhqll34bcs", + "test/4.bin": "jn4tegjtpqro30t3o11thb4b5s/cvjs73iv0a82v0c7r67avllh7s", + "test/5.bin": "jn4tegjtpqro30t3o11thb4b5s/0plkdo790b6bnmt33qsdqmhv9c", + "test/6.bin": "jn4tegjtpqro30t3o11thb4b5s/s5r633srnjtbh83893jovjt5d0", + "test/7.bin": "jn4tegjtpqro30t3o11thb4b5s/6rq45tr9bjsammku622flmqsu4", + "test/8.bin": "jn4tegjtpqro30t3o11thb4b5s/37bc6tcl3e31qb8cadvjb749vk", + "test/9.bin": "jn4tegjtpqro30t3o11thb4b5s/t4pr35hnls32789o8fk0chk1ec", } ) func init() { - goflag.StringVar(&remoteName, "remote-internal", "TestCache", "Remote to test with, defaults to local filesystem") + goflag.StringVar(&remoteName, "remote-internal", "TestInternalCache", "Remote to test with, defaults to local filesystem") goflag.StringVar(&mountDir, "mount-dir-internal", "", "") goflag.StringVar(&uploadDir, "upload-dir-internal", "", "") goflag.BoolVar(&useMount, "cache-use-mount", false, "Test only with mount") @@ -109,8 +110,10 @@ func TestInternalListRootAndInnerRemotes(t *testing.T) { defer runInstance.cleanupFs(t, rootFs2, boltDb2) runInstance.writeObjectString(t, rootFs2, "one", "content") - listRoot := runInstance.list(t, rootFs, "") - listRootInner := runInstance.list(t, rootFs, innerFolder) + listRoot, err := runInstance.list(t, rootFs, "") + require.NoError(t, err) + listRootInner, err := runInstance.list(t, rootFs, innerFolder) + require.NoError(t, err) listInner, err := rootFs2.List("") require.NoError(t, err) @@ -119,6 +122,104 @@ func TestInternalListRootAndInnerRemotes(t *testing.T) { require.Len(t, listInner, 1) } +func TestInternalVfsCache(t *testing.T) { + vfsflags.Opt.DirCacheTime = time.Second * 30 + testSize := int64(524288000) + + vfsflags.Opt.CacheMode = vfs.CacheModeWrites + id := "tiuufo" + rootFs, boltDb := runInstance.newCacheFs(t, remoteName, id, true, true, nil, map[string]string{"cache-writes": "true", "cache-info-age": "1h"}) + defer runInstance.cleanupFs(t, rootFs, boltDb) + + err := rootFs.Mkdir("test") + require.NoError(t, err) + runInstance.writeObjectString(t, rootFs, "test/second", "content") + _, err = rootFs.List("test") + require.NoError(t, err) + + testReader := runInstance.randomReader(t, testSize) + writeCh := make(chan interface{}) + //write2Ch := make(chan interface{}) + readCh := make(chan interface{}) + cacheCh := make(chan interface{}) + // write the main file + go func() { + defer func() { + writeCh <- true + }() + + log.Printf("========== started writing file 'test/one'") + runInstance.writeRemoteReader(t, rootFs, "test/one", testReader) + log.Printf("========== done writing file 'test/one'") + }() + // routine to check which cache has what, autostarts + go func() { + for { + select { + case <-cacheCh: + log.Printf("========== finished checking caches") + return + default: + } + li2 := [2]string{path.Join("test", "one"), path.Join("test", "second")} + for _, r := range li2 { + var err error + ci, err := ioutil.ReadDir(path.Join(runInstance.chunkPath, runInstance.encryptRemoteIfNeeded(t, path.Join(id, r)))) + if err != nil || len(ci) == 0 { + log.Printf("========== '%v' not in cache", r) + } else { + log.Printf("========== '%v' IN CACHE", r) + } + _, err = os.Stat(path.Join(runInstance.vfsCachePath, id, r)) + if err != nil { + log.Printf("========== '%v' not in vfs", r) + } else { + log.Printf("========== '%v' IN VFS", r) + } + } + time.Sleep(time.Second * 10) + } + }() + // routine to list, autostarts + go func() { + for { + select { + case <-readCh: + log.Printf("========== finished checking listings and readings") + return + default: + } + li, err := runInstance.list(t, rootFs, "test") + if err != nil { + log.Printf("========== error listing 'test' folder: %v", err) + } else { + log.Printf("========== list 'test' folder count: %v", len(li)) + } + + time.Sleep(time.Second * 10) + } + }() + + // wait for main file to be written + <-writeCh + log.Printf("========== waiting for VFS to expire") + time.Sleep(time.Second * 120) + + // try a final read + li2 := [2]string{"test/one", "test/second"} + for _, r := range li2 { + _, err := runInstance.readDataFromRemote(t, rootFs, r, int64(0), int64(2), false) + if err != nil { + log.Printf("========== error reading '%v': %v", r, err) + } else { + log.Printf("========== read '%v'", r) + } + } + // close the cache and list checkers + cacheCh <- true + readCh <- true +} + func TestInternalObjWrapFsFound(t *testing.T) { id := fmt.Sprintf("tiowff%v", time.Now().Unix()) rootFs, boltDb := runInstance.newCacheFs(t, remoteName, id, true, true, nil, nil) @@ -137,15 +238,18 @@ func TestInternalObjWrapFsFound(t *testing.T) { } runInstance.writeObjectBytes(t, wrappedFs, runInstance.encryptRemoteIfNeeded(t, "test"), testData) - listRoot := runInstance.list(t, rootFs, "") + listRoot, err := runInstance.list(t, rootFs, "") + require.NoError(t, err) require.Len(t, listRoot, 1) - cachedData := runInstance.readDataFromRemote(t, rootFs, "test", 0, int64(len([]byte("test content"))), false) + cachedData, err := runInstance.readDataFromRemote(t, rootFs, "test", 0, int64(len([]byte("test content"))), false) + require.NoError(t, err) require.Equal(t, "test content", string(cachedData)) err = runInstance.rm(t, rootFs, "test") require.NoError(t, err) - listRoot = runInstance.list(t, rootFs, "") + listRoot, err = runInstance.list(t, rootFs, "") + require.NoError(t, err) require.Len(t, listRoot, 0) } @@ -179,7 +283,8 @@ func TestInternalRemoteWrittenFileFoundInMount(t *testing.T) { } runInstance.writeObjectBytes(t, cfs.UnWrap(), runInstance.encryptRemoteIfNeeded(t, "test"), testData) - data := runInstance.readDataFromRemote(t, rootFs, "test", 0, int64(len([]byte("test content"))), false) + data, err := runInstance.readDataFromRemote(t, rootFs, "test", 0, int64(len([]byte("test content"))), false) + require.NoError(t, err) require.Equal(t, "test content", string(data)) } @@ -202,7 +307,8 @@ func TestInternalCachedWrittenContentMatches(t *testing.T) { sampleStart := chunkSize / 2 sampleEnd := chunkSize testSample := testData[sampleStart:sampleEnd] - checkSample := runInstance.readDataFromRemote(t, rootFs, "data.bin", sampleStart, sampleEnd, false) + checkSample, err := runInstance.readDataFromRemote(t, rootFs, "data.bin", sampleStart, sampleEnd, false) + require.NoError(t, err) require.Equal(t, int64(len(checkSample)), sampleEnd-sampleStart) require.Equal(t, checkSample, testSample) } @@ -231,7 +337,8 @@ func TestInternalCachedUpdatedContentMatches(t *testing.T) { require.Equal(t, o.Size(), int64(len(testData2))) // check data from in-file - checkSample := runInstance.readDataFromRemote(t, rootFs, "data.bin", 0, int64(len(testData2)), false) + checkSample, err := runInstance.readDataFromRemote(t, rootFs, "data.bin", 0, int64(len(testData2)), false) + require.NoError(t, err) require.Equal(t, checkSample, testData2) } @@ -257,14 +364,16 @@ func TestInternalWrappedWrittenContentMatches(t *testing.T) { require.Equal(t, o.Size(), int64(testSize)) time.Sleep(time.Second * 3) - data2 := runInstance.readDataFromRemote(t, rootFs, "data.bin", 0, int64(testSize), false) + data2, err := runInstance.readDataFromRemote(t, rootFs, "data.bin", 0, int64(testSize), false) + require.NoError(t, err) require.Equal(t, int64(len(data2)), o.Size()) // check sample of data from in-file sampleStart := chunkSize / 2 sampleEnd := chunkSize testSample := testData[sampleStart:sampleEnd] - checkSample := runInstance.readDataFromRemote(t, rootFs, "data.bin", sampleStart, sampleEnd, false) + checkSample, err := runInstance.readDataFromRemote(t, rootFs, "data.bin", sampleStart, sampleEnd, false) + require.NoError(t, err) require.Equal(t, len(checkSample), len(testSample)) for i := 0; i < len(checkSample); i++ { @@ -293,7 +402,8 @@ func TestInternalLargeWrittenContentMatches(t *testing.T) { runInstance.writeObjectBytes(t, cfs.UnWrap(), "data.bin", testData) time.Sleep(time.Second * 3) - readData := runInstance.readDataFromRemote(t, rootFs, "data.bin", 0, testSize, false) + readData, err := runInstance.readDataFromRemote(t, rootFs, "data.bin", 0, testSize, false) + require.NoError(t, err) for i := 0; i < len(readData); i++ { require.Equalf(t, testData[i], readData[i], "at byte %v", i) } @@ -320,9 +430,23 @@ func TestInternalWrappedFsChangeNotSeen(t *testing.T) { require.NoError(t, err) // get a new instance from the cache - co, err := rootFs.NewObject("data.bin") - require.NoError(t, err) - require.NotEqual(t, co.ModTime().String(), o.ModTime().String()) + if runInstance.wrappedIsExternal { + err = runInstance.retryBlock(func() error { + coModTime, err := runInstance.modTime(t, rootFs, "data.bin") + if err != nil { + return err + } + if coModTime.Unix() != o.ModTime().Unix() { + return errors.Errorf("%v <> %v", coModTime, o.ModTime()) + } + return nil + }, 12, time.Second*10) + require.NoError(t, err) + } else { + coModTime, err := runInstance.modTime(t, rootFs, "data.bin") + require.NoError(t, err) + require.NotEqual(t, coModTime.Unix(), o.ModTime().Unix()) + } } func TestInternalChangeSeenAfterDirCacheFlush(t *testing.T) { @@ -425,17 +549,22 @@ func TestInternalExpiredEntriesRemoved(t *testing.T) { runInstance.mkdir(t, rootFs, "test") runInstance.writeRemoteString(t, rootFs, "test/second", "second content") - l := runInstance.list(t, rootFs, "test") + l, err := runInstance.list(t, rootFs, "test") + require.NoError(t, err) require.Len(t, l, 1) err = cfs.UnWrap().Mkdir(runInstance.encryptRemoteIfNeeded(t, "test/third")) require.NoError(t, err) - l = runInstance.list(t, rootFs, "test") + l, err = runInstance.list(t, rootFs, "test") + require.NoError(t, err) require.Len(t, l, 1) err = runInstance.retryBlock(func() error { - l = runInstance.list(t, rootFs, "test") + l, err = runInstance.list(t, rootFs, "test") + if err != nil { + return err + } if len(l) != 2 { return errors.New("list is not 2") } @@ -470,7 +599,8 @@ func testInternalUploadQueueOneFile(t *testing.T, id string, rootFs fs.Fs, boltD } else { require.Equal(t, testSize, ti.Size()) } - de1 := runInstance.list(t, rootFs, "") + de1, err := runInstance.list(t, rootFs, "") + require.NoError(t, err) require.Len(t, de1, 1) runInstance.completeBackgroundUpload(t, "one", bu) @@ -479,7 +609,8 @@ func testInternalUploadQueueOneFile(t *testing.T, id string, rootFs fs.Fs, boltD require.True(t, os.IsNotExist(err)) // check if it can be read - data2 := runInstance.readDataFromRemote(t, rootFs, "one", 0, int64(1024), false) + data2, err := runInstance.readDataFromRemote(t, rootFs, "one", 0, int64(1024), false) + require.NoError(t, err) require.Len(t, data2, 1024) } @@ -536,7 +667,8 @@ func TestInternalUploadQueueMoreFiles(t *testing.T) { } // check if cache lists all files, likely temp upload didn't finish yet - de1 := runInstance.list(t, rootFs, "test") + de1, err := runInstance.list(t, rootFs, "test") + require.NoError(t, err) require.Len(t, de1, totalFiles) // wait for background uploader to do its thing @@ -548,7 +680,8 @@ func TestInternalUploadQueueMoreFiles(t *testing.T) { require.Len(t, tf, 0) // check if cache lists all files - de1 = runInstance.list(t, rootFs, "test") + de1, err = runInstance.list(t, rootFs, "test") + require.NoError(t, err) require.Len(t, de1, totalFiles) } @@ -566,10 +699,11 @@ func TestInternalUploadTempFileOperations(t *testing.T) { runInstance.writeRemoteString(t, rootFs, "test/one", "one content") // check if it can be read - data1 := runInstance.readDataFromRemote(t, rootFs, "test/one", 0, int64(len([]byte("one content"))), false) + data1, err := runInstance.readDataFromRemote(t, rootFs, "test/one", 0, int64(len([]byte("one content"))), false) + require.NoError(t, err) require.Equal(t, []byte("one content"), data1) // validate that it exists in temp fs - _, err := os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one"))) + _, err = os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one"))) require.NoError(t, err) // test DirMove - allowed @@ -616,7 +750,8 @@ func TestInternalUploadTempFileOperations(t *testing.T) { require.Error(t, err) _, err = rootFs.NewObject("test/second") require.NoError(t, err) - data2 := runInstance.readDataFromRemote(t, rootFs, "test/second", 0, int64(len([]byte("one content"))), false) + data2, err := runInstance.readDataFromRemote(t, rootFs, "test/second", 0, int64(len([]byte("one content"))), false) + require.NoError(t, err) require.Equal(t, []byte("one content"), data2) // validate that it exists in temp fs _, err = os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one"))) @@ -634,7 +769,8 @@ func TestInternalUploadTempFileOperations(t *testing.T) { require.NoError(t, err) _, err = rootFs.NewObject("test/third") require.NoError(t, err) - data2 := runInstance.readDataFromRemote(t, rootFs, "test/third", 0, int64(len([]byte("one content"))), false) + data2, err := runInstance.readDataFromRemote(t, rootFs, "test/third", 0, int64(len([]byte("one content"))), false) + require.NoError(t, err) require.Equal(t, []byte("one content"), data2) // validate that it exists in temp fs _, err = os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one"))) @@ -692,10 +828,11 @@ func TestInternalUploadUploadingFileOperations(t *testing.T) { runInstance.writeRemoteString(t, rootFs, "test/one", "one content") // check if it can be read - data1 := runInstance.readDataFromRemote(t, rootFs, "test/one", 0, int64(len([]byte("one content"))), false) + data1, err := runInstance.readDataFromRemote(t, rootFs, "test/one", 0, int64(len([]byte("one content"))), false) + require.NoError(t, err) require.Equal(t, []byte("one content"), data1) // validate that it exists in temp fs - _, err := os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one"))) + _, err = os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one"))) require.NoError(t, err) err = boltDb.SetPendingUploadToStarted(runInstance.encryptRemoteIfNeeded(t, path.Join(rootFs.Root(), "test/one"))) @@ -747,7 +884,8 @@ func TestInternalUploadUploadingFileOperations(t *testing.T) { require.NoError(t, err) _, err = rootFs.NewObject("test/third") require.NoError(t, err) - data2 := runInstance.readDataFromRemote(t, rootFs, "test/third", 0, int64(len([]byte("one content"))), false) + data2, err := runInstance.readDataFromRemote(t, rootFs, "test/third", 0, int64(len([]byte("one content"))), false) + require.NoError(t, err) require.Equal(t, []byte("one content"), data2) // validate that it exists in temp fs _, err = os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one"))) @@ -867,6 +1005,9 @@ type run struct { unmountRes chan error vfs *vfs.VFS tempFiles []*os.File + dbPath string + chunkPath string + vfsCachePath string } func newRun() *run { @@ -1012,9 +1153,10 @@ func (r *run) newCacheFs(t *testing.T, remote, id string, needRemote, purge bool } } runInstance.rootIsCrypt = rootIsCrypt - dbPath := filepath.Join(config.CacheDir, "cache-backend", cacheRemote+".db") - chunkPath := filepath.Join(config.CacheDir, "cache-backend", cacheRemote) - boltDb, err := cache.GetPersistent(dbPath, chunkPath, &cache.Features{PurgeDb: true}) + runInstance.dbPath = filepath.Join(config.CacheDir, "cache-backend", cacheRemote+".db") + runInstance.chunkPath = filepath.Join(config.CacheDir, "cache-backend", cacheRemote) + runInstance.vfsCachePath = filepath.Join(config.CacheDir, "vfs", remote) + boltDb, err := cache.GetPersistent(runInstance.dbPath, runInstance.chunkPath, &cache.Features{PurgeDb: true}) require.NoError(t, err) for k, v := range r.runDefaultCfgMap { @@ -1046,7 +1188,7 @@ func (r *run) newCacheFs(t *testing.T, remote, id string, needRemote, purge bool _, isCrypt := cfs.Features().UnWrap().(*crypt.Fs) _, isLocal := cfs.Features().UnWrap().(*local.Fs) if isCache || isCrypt || isLocal { - r.wrappedIsExternal = true + r.wrappedIsExternal = false } else { r.wrappedIsExternal = true } @@ -1230,7 +1372,7 @@ func (r *run) updateObjectRemote(t *testing.T, f fs.Fs, remote string, data1 []b return obj } -func (r *run) readDataFromRemote(t *testing.T, f fs.Fs, remote string, offset, end int64, noLengthCheck bool) []byte { +func (r *run) readDataFromRemote(t *testing.T, f fs.Fs, remote string, offset, end int64, noLengthCheck bool) ([]byte, error) { size := end - offset checkSample := make([]byte, size) @@ -1239,27 +1381,32 @@ func (r *run) readDataFromRemote(t *testing.T, f fs.Fs, remote string, offset, e defer func() { _ = f.Close() }() - require.NoError(t, err) + if err != nil { + return checkSample, err + } _, _ = f.Seek(offset, 0) totalRead, err := io.ReadFull(f, checkSample) checkSample = checkSample[:totalRead] if err == io.EOF || err == io.ErrUnexpectedEOF { err = nil } - require.NoError(t, err) - if !noLengthCheck { - require.Equal(t, size, int64(totalRead)) + if err != nil { + return checkSample, err + } + if !noLengthCheck && size != int64(totalRead) { + return checkSample, errors.Errorf("read size doesn't match expected: %v <> %v", totalRead, size) } - require.NoError(t, err) } else { co, err := f.NewObject(remote) - require.NoError(t, err) + if err != nil { + return checkSample, err + } checkSample = r.readDataFromObj(t, co, offset, end, noLengthCheck) } - if !noLengthCheck { - require.Equal(t, size, int64(len(checkSample)), "wrong data read size from file") + if !noLengthCheck && size != int64(len(checkSample)) { + return checkSample, errors.Errorf("read size doesn't match expected: %v <> %v", len(checkSample), size) } - return checkSample + return checkSample, nil } func (r *run) readDataFromObj(t *testing.T, o fs.Object, offset, end int64, noLengthCheck bool) []byte { @@ -1305,7 +1452,7 @@ func (r *run) rm(t *testing.T, f fs.Fs, remote string) error { return err } -func (r *run) list(t *testing.T, f fs.Fs, remote string) []interface{} { +func (r *run) list(t *testing.T, f fs.Fs, remote string) ([]interface{}, error) { var err error var l []interface{} if r.useMount { @@ -1321,8 +1468,7 @@ func (r *run) list(t *testing.T, f fs.Fs, remote string) []interface{} { l = append(l, ll) } } - require.NoError(t, err) - return l + return l, err } func (r *run) listPath(t *testing.T, f fs.Fs, remote string) []string { diff --git a/backend/cache/handle.go b/backend/cache/handle.go index f96dae4c7..e0a106913 100644 --- a/backend/cache/handle.go +++ b/backend/cache/handle.go @@ -249,7 +249,7 @@ func (r *Handle) getChunk(chunkStart int64) ([]byte, error) { if !found { // we're gonna give the workers a chance to pickup the chunk // and retry a couple of times - for i := 0; i < r.cacheFs().readRetries*2; i++ { + for i := 0; i < r.cacheFs().readRetries*8; i++ { data, err = r.storage().GetChunk(r.cachedObject, chunkStart) if err == nil { found = true @@ -656,6 +656,7 @@ func (b *backgroundWriter) run() { if err != nil { fs.Errorf(parentCd, "background upload: cache expire error: %v", err) } + b.fs.notifyDirChange(remote) fs.Infof(remote, "finished background upload") b.notify(remote, BackgroundUploadCompleted, nil) } diff --git a/backend/cache/object.go b/backend/cache/object.go index 68c0becec..b88a30285 100644 --- a/backend/cache/object.go +++ b/backend/cache/object.go @@ -272,7 +272,12 @@ func (o *Object) Remove() error { fs.Debugf(o, "removing object") _ = o.CacheFs.cache.RemoveObject(o.abs()) _ = o.CacheFs.cache.removePendingUpload(o.abs()) - _ = o.CacheFs.cache.ExpireDir(NewDirectory(o.CacheFs, cleanPath(path.Dir(o.Remote())))) + parentCd := NewDirectory(o.CacheFs, cleanPath(path.Dir(o.Remote()))) + _ = o.CacheFs.cache.ExpireDir(parentCd) + // advertise to DirChangeNotify if wrapped doesn't do that + if o.CacheFs.Fs.Features().DirChangeNotify == nil { + o.CacheFs.notifyDirChangeUpstream(parentCd.Remote()) + } return nil }