2018-07-19 23:41:34 +02:00
|
|
|
package sync
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2021-11-04 11:12:57 +01:00
|
|
|
"fmt"
|
2020-05-15 12:23:16 +02:00
|
|
|
"math/bits"
|
2020-03-13 22:12:22 +01:00
|
|
|
"strconv"
|
2019-11-28 18:01:21 +01:00
|
|
|
"strings"
|
2018-07-19 23:41:34 +02:00
|
|
|
"sync"
|
|
|
|
|
2020-03-13 18:10:06 +01:00
|
|
|
"github.com/aalpar/deheap"
|
2019-07-28 19:47:38 +02:00
|
|
|
"github.com/rclone/rclone/fs"
|
2019-11-28 18:01:21 +01:00
|
|
|
"github.com/rclone/rclone/fs/fserrors"
|
2018-07-19 23:41:34 +02:00
|
|
|
)
|
|
|
|
|
2019-11-28 18:01:21 +01:00
|
|
|
// compare two items for order by
|
|
|
|
type lessFn func(a, b fs.ObjectPair) bool
|
|
|
|
|
2018-07-19 23:41:34 +02:00
|
|
|
// pipe provides an unbounded channel like experience
|
|
|
|
//
|
|
|
|
// Note unlike channels these aren't strictly ordered.
|
|
|
|
type pipe struct {
|
|
|
|
mu sync.Mutex
|
|
|
|
c chan struct{}
|
|
|
|
queue []fs.ObjectPair
|
|
|
|
closed bool
|
|
|
|
totalSize int64
|
|
|
|
stats func(items int, totalSize int64)
|
2019-11-28 18:01:21 +01:00
|
|
|
less lessFn
|
2020-03-13 22:12:22 +01:00
|
|
|
fraction int
|
2018-07-19 23:41:34 +02:00
|
|
|
}
|
|
|
|
|
2019-11-28 18:01:21 +01:00
|
|
|
func newPipe(orderBy string, stats func(items int, totalSize int64), maxBacklog int) (*pipe, error) {
|
2020-05-15 12:23:16 +02:00
|
|
|
if maxBacklog < 0 {
|
Spelling fixes
Fix spelling of: above, already, anonymous, associated,
authentication, bandwidth, because, between, blocks, calculate,
candidates, cautious, changelog, cleaner, clipboard, command,
completely, concurrently, considered, constructs, corrupt, current,
daemon, dependencies, deprecated, directory, dispatcher, download,
eligible, ellipsis, encrypter, endpoint, entrieslist, essentially,
existing writers, existing, expires, filesystem, flushing, frequently,
hierarchy, however, implementation, implements, inaccurate,
individually, insensitive, longer, maximum, metadata, modified,
multipart, namedirfirst, nextcloud, obscured, opened, optional,
owncloud, pacific, passphrase, password, permanently, persimmon,
positive, potato, protocol, quota, receiving, recommends, referring,
requires, revisited, satisfied, satisfies, satisfy, semver,
serialized, session, storage, strategies, stringlist, successful,
supported, surprise, temporarily, temporary, transactions, unneeded,
update, uploads, wrapped
Signed-off-by: Josh Soref <jsoref@users.noreply.github.com>
2020-10-09 02:17:24 +02:00
|
|
|
maxBacklog = (1 << (bits.UintSize - 1)) - 1 // largest positive int
|
2020-05-15 12:23:16 +02:00
|
|
|
}
|
2020-03-13 22:12:22 +01:00
|
|
|
less, fraction, err := newLess(orderBy)
|
2019-11-28 18:01:21 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, fserrors.FatalError(err)
|
|
|
|
}
|
|
|
|
p := &pipe{
|
2020-03-13 22:12:22 +01:00
|
|
|
c: make(chan struct{}, maxBacklog),
|
|
|
|
stats: stats,
|
|
|
|
less: less,
|
|
|
|
fraction: fraction,
|
2018-07-19 23:41:34 +02:00
|
|
|
}
|
2019-11-28 18:01:21 +01:00
|
|
|
if p.less != nil {
|
2020-03-13 18:10:06 +01:00
|
|
|
deheap.Init(p)
|
2019-11-28 18:01:21 +01:00
|
|
|
}
|
|
|
|
return p, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Len satisfy heap.Interface - must be called with lock held
|
|
|
|
func (p *pipe) Len() int {
|
|
|
|
return len(p.queue)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Len satisfy heap.Interface - must be called with lock held
|
|
|
|
func (p *pipe) Less(i, j int) bool {
|
|
|
|
return p.less(p.queue[i], p.queue[j])
|
2018-07-19 23:41:34 +02:00
|
|
|
}
|
|
|
|
|
2019-11-28 18:01:21 +01:00
|
|
|
// Swap satisfy heap.Interface - must be called with lock held
|
|
|
|
func (p *pipe) Swap(i, j int) {
|
|
|
|
p.queue[i], p.queue[j] = p.queue[j], p.queue[i]
|
|
|
|
}
|
|
|
|
|
|
|
|
// Push satisfy heap.Interface - must be called with lock held
|
build: modernize Go usage
This commit modernizes Go usage. This was done with:
go run golang.org/x/tools/gopls/internal/analysis/modernize/cmd/modernize@latest -fix -test ./...
Then files needed to be `go fmt`ed and a few comments needed to be
restored.
The modernizations include replacing
- if/else conditional assignment by a call to the built-in min or max functions added in go1.21
- sort.Slice(x, func(i, j int) bool) { return s[i] < s[j] } by a call to slices.Sort(s), added in go1.21
- interface{} by the 'any' type added in go1.18
- append([]T(nil), s...) by slices.Clone(s) or slices.Concat(s), added in go1.21
- loop around an m[k]=v map update by a call to one of the Collect, Copy, Clone, or Insert functions from the maps package, added in go1.21
- []byte(fmt.Sprintf...) by fmt.Appendf(nil, ...), added in go1.19
- append(s[:i], s[i+1]...) by slices.Delete(s, i, i+1), added in go1.21
- a 3-clause for i := 0; i < n; i++ {} loop by for i := range n {}, added in go1.22
2025-02-26 22:08:12 +01:00
|
|
|
func (p *pipe) Push(item any) {
|
2019-11-28 18:01:21 +01:00
|
|
|
p.queue = append(p.queue, item.(fs.ObjectPair))
|
|
|
|
}
|
|
|
|
|
|
|
|
// Pop satisfy heap.Interface - must be called with lock held
|
build: modernize Go usage
This commit modernizes Go usage. This was done with:
go run golang.org/x/tools/gopls/internal/analysis/modernize/cmd/modernize@latest -fix -test ./...
Then files needed to be `go fmt`ed and a few comments needed to be
restored.
The modernizations include replacing
- if/else conditional assignment by a call to the built-in min or max functions added in go1.21
- sort.Slice(x, func(i, j int) bool) { return s[i] < s[j] } by a call to slices.Sort(s), added in go1.21
- interface{} by the 'any' type added in go1.18
- append([]T(nil), s...) by slices.Clone(s) or slices.Concat(s), added in go1.21
- loop around an m[k]=v map update by a call to one of the Collect, Copy, Clone, or Insert functions from the maps package, added in go1.21
- []byte(fmt.Sprintf...) by fmt.Appendf(nil, ...), added in go1.19
- append(s[:i], s[i+1]...) by slices.Delete(s, i, i+1), added in go1.21
- a 3-clause for i := 0; i < n; i++ {} loop by for i := range n {}, added in go1.22
2025-02-26 22:08:12 +01:00
|
|
|
func (p *pipe) Pop() any {
|
2019-11-28 18:01:21 +01:00
|
|
|
old := p.queue
|
|
|
|
n := len(old)
|
|
|
|
item := old[n-1]
|
|
|
|
old[n-1] = fs.ObjectPair{} // avoid memory leak
|
|
|
|
p.queue = old[0 : n-1]
|
|
|
|
return item
|
|
|
|
}
|
|
|
|
|
2020-05-20 12:39:20 +02:00
|
|
|
// Put a pair into the pipe
|
2018-07-19 23:41:34 +02:00
|
|
|
//
|
|
|
|
// It returns ok = false if the context was cancelled
|
|
|
|
//
|
|
|
|
// It will panic if you call it after Close()
|
2023-03-08 14:03:05 +01:00
|
|
|
//
|
|
|
|
// Note that pairs where src==dst aren't counted for stats
|
2018-07-19 23:41:34 +02:00
|
|
|
func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) {
|
|
|
|
if ctx.Err() != nil {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
p.mu.Lock()
|
2019-11-28 18:01:21 +01:00
|
|
|
if p.less == nil {
|
|
|
|
// no order-by
|
|
|
|
p.queue = append(p.queue, pair)
|
|
|
|
} else {
|
2020-03-13 18:10:06 +01:00
|
|
|
deheap.Push(p, pair)
|
2019-11-28 18:01:21 +01:00
|
|
|
}
|
2018-07-19 23:41:34 +02:00
|
|
|
size := pair.Src.Size()
|
2023-03-08 14:03:05 +01:00
|
|
|
if size > 0 && pair.Src != pair.Dst {
|
2018-07-19 23:41:34 +02:00
|
|
|
p.totalSize += size
|
|
|
|
}
|
|
|
|
p.stats(len(p.queue), p.totalSize)
|
|
|
|
p.mu.Unlock()
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return false
|
|
|
|
case p.c <- struct{}{}:
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get a pair from the pipe
|
|
|
|
//
|
2020-03-13 22:12:22 +01:00
|
|
|
// If fraction is > the mixed fraction set in the pipe then it gets it
|
|
|
|
// from the other end of the heap if order-by is in effect
|
|
|
|
//
|
2018-07-19 23:41:34 +02:00
|
|
|
// It returns ok = false if the context was cancelled or Close() has
|
|
|
|
// been called.
|
2020-03-13 22:12:22 +01:00
|
|
|
func (p *pipe) GetMax(ctx context.Context, fraction int) (pair fs.ObjectPair, ok bool) {
|
2018-07-19 23:41:34 +02:00
|
|
|
if ctx.Err() != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case _, ok = <-p.c:
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
p.mu.Lock()
|
2019-11-28 18:01:21 +01:00
|
|
|
if p.less == nil {
|
|
|
|
// no order-by
|
|
|
|
pair = p.queue[0]
|
|
|
|
p.queue[0] = fs.ObjectPair{} // avoid memory leak
|
|
|
|
p.queue = p.queue[1:]
|
2020-03-13 22:12:22 +01:00
|
|
|
} else if p.fraction < 0 || fraction < p.fraction {
|
2020-03-13 18:10:06 +01:00
|
|
|
pair = deheap.Pop(p).(fs.ObjectPair)
|
2020-03-13 22:12:22 +01:00
|
|
|
} else {
|
|
|
|
pair = deheap.PopMax(p).(fs.ObjectPair)
|
2019-11-28 18:01:21 +01:00
|
|
|
}
|
2018-07-19 23:41:34 +02:00
|
|
|
size := pair.Src.Size()
|
2023-03-08 14:03:05 +01:00
|
|
|
if size > 0 && pair.Src != pair.Dst {
|
2018-07-19 23:41:34 +02:00
|
|
|
p.totalSize -= size
|
|
|
|
}
|
|
|
|
if p.totalSize < 0 {
|
|
|
|
p.totalSize = 0
|
|
|
|
}
|
|
|
|
p.stats(len(p.queue), p.totalSize)
|
|
|
|
p.mu.Unlock()
|
|
|
|
return pair, true
|
|
|
|
}
|
|
|
|
|
2020-03-13 22:12:22 +01:00
|
|
|
// Get a pair from the pipe
|
|
|
|
//
|
|
|
|
// It returns ok = false if the context was cancelled or Close() has
|
|
|
|
// been called.
|
|
|
|
func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) {
|
|
|
|
return p.GetMax(ctx, -1)
|
|
|
|
}
|
|
|
|
|
2018-07-19 23:41:34 +02:00
|
|
|
// Stats reads the number of items in the queue and the totalSize
|
|
|
|
func (p *pipe) Stats() (items int, totalSize int64) {
|
|
|
|
p.mu.Lock()
|
|
|
|
items, totalSize = len(p.queue), p.totalSize
|
|
|
|
p.mu.Unlock()
|
|
|
|
return items, totalSize
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close the pipe
|
|
|
|
//
|
|
|
|
// Writes to a closed pipe will panic as will double closing a pipe
|
|
|
|
func (p *pipe) Close() {
|
|
|
|
p.mu.Lock()
|
|
|
|
close(p.c)
|
|
|
|
p.closed = true
|
|
|
|
p.mu.Unlock()
|
|
|
|
}
|
2019-11-28 18:01:21 +01:00
|
|
|
|
|
|
|
// newLess returns a less function for the heap comparison or nil if
|
|
|
|
// one is not required
|
2020-03-13 22:12:22 +01:00
|
|
|
func newLess(orderBy string) (less lessFn, fraction int, err error) {
|
|
|
|
fraction = -1
|
2019-11-28 18:01:21 +01:00
|
|
|
if orderBy == "" {
|
2020-03-13 22:12:22 +01:00
|
|
|
return nil, fraction, nil
|
2019-11-28 18:01:21 +01:00
|
|
|
}
|
|
|
|
parts := strings.Split(strings.ToLower(orderBy), ",")
|
|
|
|
switch parts[0] {
|
|
|
|
case "name":
|
|
|
|
less = func(a, b fs.ObjectPair) bool {
|
|
|
|
return a.Src.Remote() < b.Src.Remote()
|
|
|
|
}
|
|
|
|
case "size":
|
|
|
|
less = func(a, b fs.ObjectPair) bool {
|
|
|
|
return a.Src.Size() < b.Src.Size()
|
|
|
|
}
|
|
|
|
case "modtime":
|
|
|
|
less = func(a, b fs.ObjectPair) bool {
|
|
|
|
ctx := context.Background()
|
|
|
|
return a.Src.ModTime(ctx).Before(b.Src.ModTime(ctx))
|
|
|
|
}
|
|
|
|
default:
|
2021-11-04 11:12:57 +01:00
|
|
|
return nil, fraction, fmt.Errorf("unknown --order-by comparison %q", parts[0])
|
2019-11-28 18:01:21 +01:00
|
|
|
}
|
|
|
|
descending := false
|
|
|
|
if len(parts) > 1 {
|
|
|
|
switch parts[1] {
|
|
|
|
case "ascending", "asc":
|
|
|
|
case "descending", "desc":
|
|
|
|
descending = true
|
2020-03-13 22:12:22 +01:00
|
|
|
case "mixed":
|
|
|
|
fraction = 50
|
|
|
|
if len(parts) > 2 {
|
|
|
|
fraction, err = strconv.Atoi(parts[2])
|
|
|
|
if err != nil {
|
2021-11-04 11:12:57 +01:00
|
|
|
return nil, fraction, fmt.Errorf("bad mixed fraction --order-by %q", parts[2])
|
2020-03-13 22:12:22 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-28 18:01:21 +01:00
|
|
|
default:
|
2021-11-04 11:12:57 +01:00
|
|
|
return nil, fraction, fmt.Errorf("unknown --order-by sort direction %q", parts[1])
|
2019-11-28 18:01:21 +01:00
|
|
|
}
|
|
|
|
}
|
2020-03-13 22:12:22 +01:00
|
|
|
if (fraction >= 0 && len(parts) > 3) || (fraction < 0 && len(parts) > 2) {
|
2021-11-04 11:12:57 +01:00
|
|
|
return nil, fraction, fmt.Errorf("bad --order-by string %q", orderBy)
|
2020-03-13 22:12:22 +01:00
|
|
|
}
|
2019-11-28 18:01:21 +01:00
|
|
|
if descending {
|
|
|
|
oldLess := less
|
|
|
|
less = func(a, b fs.ObjectPair) bool {
|
|
|
|
return !oldLess(a, b)
|
|
|
|
}
|
|
|
|
}
|
2020-03-13 22:12:22 +01:00
|
|
|
return less, fraction, nil
|
2019-11-28 18:01:21 +01:00
|
|
|
}
|