diff --git a/fs/accounting/token_bucket.go b/fs/accounting/token_bucket.go index 377d9af44..80831c4f2 100644 --- a/fs/accounting/token_bucket.go +++ b/fs/accounting/token_bucket.go @@ -27,6 +27,16 @@ const ( type buckets [TokenBucketSlots]*rate.Limiter +// can't request more than this many bytes at once +// +// set small for edge bandwidth limiters, but big for core bandwidth +// limiters since we may be using both at once +var maxBurstSizes = [TokenBucketSlots]int{ + TokenBucketSlotAccounting: 4 * 1024 * 1024, + TokenBucketSlotTransportRx: 4 * 1024, + TokenBucketSlotTransportTx: 4 * 1024, +} + // tokenBucket holds info about the rate limiters in use type tokenBucket struct { mu sync.RWMutex // protects the token bucket variables @@ -53,28 +63,26 @@ func (bs *buckets) _setOff() { } } -const maxBurstSize = 4 * 1024 * 1024 // must be bigger than the biggest request - // make a new empty token bucket with the bandwidth(s) given func newTokenBucket(bandwidth fs.BwPair) (tbs buckets) { bandwidthAccounting := fs.SizeSuffix(-1) if bandwidth.Tx > 0 { - tbs[TokenBucketSlotTransportTx] = rate.NewLimiter(rate.Limit(bandwidth.Tx), maxBurstSize) + tbs[TokenBucketSlotTransportTx] = rate.NewLimiter(rate.Limit(bandwidth.Tx), maxBurstSizes[TokenBucketSlotTransportTx]) bandwidthAccounting = bandwidth.Tx } if bandwidth.Rx > 0 { - tbs[TokenBucketSlotTransportRx] = rate.NewLimiter(rate.Limit(bandwidth.Rx), maxBurstSize) + tbs[TokenBucketSlotTransportRx] = rate.NewLimiter(rate.Limit(bandwidth.Rx), maxBurstSizes[TokenBucketSlotTransportRx]) if bandwidth.Rx > bandwidthAccounting { bandwidthAccounting = bandwidth.Rx } } if bandwidthAccounting > 0 { - tbs[TokenBucketSlotAccounting] = rate.NewLimiter(rate.Limit(bandwidthAccounting), maxBurstSize) + tbs[TokenBucketSlotAccounting] = rate.NewLimiter(rate.Limit(bandwidthAccounting), maxBurstSizes[TokenBucketSlotAccounting]) } - for _, tb := range tbs { + for i, tb := range tbs { if tb != nil { // empty the bucket - err := tb.WaitN(context.Background(), maxBurstSize) + err := tb.WaitN(context.Background(), maxBurstSizes[i]) if err != nil { fs.Errorf(nil, "Failed to empty token bucket: %v", err) } @@ -149,20 +157,28 @@ func (tb *tokenBucket) StartTokenTicker(ctx context.Context) { }() } -// LimitBandwidth sleeps for the correct amount of time for the passage -// of n bytes according to the current bandwidth limit +// LimitBandwidth sleeps for the correct amount of time for the +// passage of n bytes according to the current bandwidth limit. func (tb *tokenBucket) LimitBandwidth(i TokenBucketSlot, n int) { tb.mu.RLock() + t := tb.curr[i] + maxBurstSize := maxBurstSizes[i] + tb.mu.RUnlock() // Limit the transfer speed if required - if tb.curr[i] != nil { - err := tb.curr[i].WaitN(context.Background(), n) - if err != nil { - fs.Errorf(nil, "Token bucket error: %v", err) + if t != nil && n > 0 { + // wait in chunks of maxBurstSize + for toWait := maxBurstSize; n > 0; n -= toWait { + if n < maxBurstSize { + toWait = n + } + err := t.WaitN(context.Background(), toWait) + if err != nil { + fs.Errorf(nil, "Token bucket error: %v", err) + } } } - tb.mu.RUnlock() } // SetBwLimit sets the current bandwidth limit