mirror of https://github.com/rclone/rclone.git
filter: parallelise reading of --files-from - fixes #2835
Before this change rclone would read the list of files from the files-from parameter and check they existed one at a time. This could take a very long time for lots of files. After this change, rclone will check up to --checkers in parallel.
This commit is contained in:
parent
63b51c6742
commit
5ee1816a71
|
@ -13,6 +13,7 @@ import (
|
||||||
|
|
||||||
"github.com/ncw/rclone/fs"
|
"github.com/ncw/rclone/fs"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Active is the globally active filter
|
// Active is the globally active filter
|
||||||
|
@ -511,17 +512,33 @@ func (f *Filter) MakeListR(NewObject func(remote string) (fs.Object, error)) fs.
|
||||||
if !f.HaveFilesFrom() {
|
if !f.HaveFilesFrom() {
|
||||||
return errFilesFromNotSet
|
return errFilesFromNotSet
|
||||||
}
|
}
|
||||||
var entries fs.DirEntries
|
var (
|
||||||
for remote := range f.files {
|
remotes = make(chan string, fs.Config.Checkers)
|
||||||
entry, err := NewObject(remote)
|
g errgroup.Group
|
||||||
if err == fs.ErrorObjectNotFound {
|
)
|
||||||
// Skip files that are not found
|
for i := 0; i < fs.Config.Checkers; i++ {
|
||||||
} else if err != nil {
|
g.Go(func() (err error) {
|
||||||
return err
|
var entries = make(fs.DirEntries, 1)
|
||||||
} else {
|
for remote := range remotes {
|
||||||
entries = append(entries, entry)
|
entries[0], err = NewObject(remote)
|
||||||
}
|
if err == fs.ErrorObjectNotFound {
|
||||||
|
// Skip files that are not found
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
err = callback(entries)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
return callback(entries)
|
for remote := range f.files {
|
||||||
|
remotes <- remote
|
||||||
|
}
|
||||||
|
close(remotes)
|
||||||
|
return g.Wait()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -220,7 +221,10 @@ func TestNewFilterMakeListR(t *testing.T) {
|
||||||
|
|
||||||
// NewObject function for MakeListR
|
// NewObject function for MakeListR
|
||||||
newObjects := FilesMap{}
|
newObjects := FilesMap{}
|
||||||
|
var newObjectMu sync.Mutex
|
||||||
NewObject := func(remote string) (fs.Object, error) {
|
NewObject := func(remote string) (fs.Object, error) {
|
||||||
|
newObjectMu.Lock()
|
||||||
|
defer newObjectMu.Unlock()
|
||||||
if remote == "notfound" {
|
if remote == "notfound" {
|
||||||
return nil, fs.ErrorObjectNotFound
|
return nil, fs.ErrorObjectNotFound
|
||||||
} else if remote == "error" {
|
} else if remote == "error" {
|
||||||
|
@ -233,7 +237,10 @@ func TestNewFilterMakeListR(t *testing.T) {
|
||||||
|
|
||||||
// Callback for ListRFn
|
// Callback for ListRFn
|
||||||
listRObjects := FilesMap{}
|
listRObjects := FilesMap{}
|
||||||
|
var callbackMu sync.Mutex
|
||||||
listRcallback := func(entries fs.DirEntries) error {
|
listRcallback := func(entries fs.DirEntries) error {
|
||||||
|
callbackMu.Lock()
|
||||||
|
defer callbackMu.Unlock()
|
||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
listRObjects[entry.Remote()] = struct{}{}
|
listRObjects[entry.Remote()] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue