diff --git a/backend/box/box.go b/backend/box/box.go index 8c5903578..90e4ba83e 100644 --- a/backend/box/box.go +++ b/backend/box/box.go @@ -491,14 +491,14 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e // If using box config.json and JWT, renewing should just refresh the token and // should do so whether there are uploads pending or not. if ok && boxSubTypeOk && jsonFile != "" && boxSubType != "" { - f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error { + f.tokenRenewer = oauthutil.NewRenewBeforeExpiry(f.String(), ts, func() error { err := refreshJWTToken(ctx, jsonFile, boxSubType, name, m) return err }) f.tokenRenewer.Start() } else { // Renew the token in the background - f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error { + f.tokenRenewer = oauthutil.NewRenewOnExpiry(f.String(), ts, func() error { _, err := f.readMetaDataForPath(ctx, "") return err }) diff --git a/backend/hidrive/hidrive.go b/backend/hidrive/hidrive.go index 33b2fb09c..17890b193 100644 --- a/backend/hidrive/hidrive.go +++ b/backend/hidrive/hidrive.go @@ -324,7 +324,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e _, err := f.fetchMetadataForPath(ctx, resolvedRoot, api.HiDriveObjectNoMetadataFields) return err } - f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, transaction) + f.tokenRenewer = oauthutil.NewRenewOnExpiry(f.String(), ts, transaction) } // Do not allow the root-prefix to be nonexistent nor a directory, diff --git a/backend/jottacloud/jottacloud.go b/backend/jottacloud/jottacloud.go index 6482070ab..903134270 100644 --- a/backend/jottacloud/jottacloud.go +++ b/backend/jottacloud/jottacloud.go @@ -1001,7 +1001,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e } // Renew the token in the background - f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error { + f.tokenRenewer = oauthutil.NewRenewOnExpiry(f.String(), ts, func() error { _, err := f.readMetaDataForPath(ctx, "") if err == fs.ErrorNotAFile || err == fs.ErrorIsDir { err = nil diff --git a/backend/onedrive/onedrive.go b/backend/onedrive/onedrive.go index 980eec8c9..db443246f 100644 --- a/backend/onedrive/onedrive.go +++ b/backend/onedrive/onedrive.go @@ -1099,7 +1099,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e } // Renew the token in the background - f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error { + f.tokenRenewer = oauthutil.NewRenewOnExpiry(f.String(), ts, func() error { _, _, err := f.readMetaDataForPath(ctx, "") return err }) diff --git a/backend/pcloud/pcloud.go b/backend/pcloud/pcloud.go index 763784645..e2cbe6f87 100644 --- a/backend/pcloud/pcloud.go +++ b/backend/pcloud/pcloud.go @@ -336,7 +336,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e f.srv.SetErrorHandler(errorHandler) // Renew the token in the background - f.tokenRenewer = oauthutil.NewRenew(f.String(), f.ts, func() error { + f.tokenRenewer = oauthutil.NewRenewOnExpiry(f.String(), f.ts, func() error { _, err := f.readMetaDataForPath(ctx, "") return err }) diff --git a/backend/premiumizeme/premiumizeme.go b/backend/premiumizeme/premiumizeme.go index 5bf87badf..3b76a15f3 100644 --- a/backend/premiumizeme/premiumizeme.go +++ b/backend/premiumizeme/premiumizeme.go @@ -270,7 +270,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e // Renew the token in the background if ts != nil { - f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error { + f.tokenRenewer = oauthutil.NewRenewOnExpiry(f.String(), ts, func() error { _, err := f.About(ctx) return err }) diff --git a/backend/sharefile/sharefile.go b/backend/sharefile/sharefile.go index d35468e0c..096cd7950 100644 --- a/backend/sharefile/sharefile.go +++ b/backend/sharefile/sharefile.go @@ -463,7 +463,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e // Renew the token in the background if ts != nil { - f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error { + f.tokenRenewer = oauthutil.NewRenewOnExpiry(f.String(), ts, func() error { _, err := f.List(ctx, "") return err }) diff --git a/lib/oauthutil/oauthutil.go b/lib/oauthutil/oauthutil.go index a03fd592d..0d2251704 100644 --- a/lib/oauthutil/oauthutil.go +++ b/lib/oauthutil/oauthutil.go @@ -80,6 +80,11 @@ All done. Please go back to rclone. ` + + // BeforeExpiryDelta determines how long before a token's actual expiry + // that OnBeforeExpiry signals. This provides time for refreshing the token, + // when the refresh is handled outside of this package. + BeforeExpiryDelta = 60 * time.Second ) // OpenURL is used when rclone wants to open a browser window @@ -223,14 +228,15 @@ func PutToken(name string, m configmap.Mapper, token *oauth2.Token, newSection b // TokenSource stores updated tokens in the config file type TokenSource struct { - mu sync.Mutex - name string - m configmap.Mapper - tokenSource oauth2.TokenSource - token *oauth2.Token - config *Config - ctx context.Context - expiryTimer *time.Timer // signals whenever the token expires + mu sync.Mutex + name string + m configmap.Mapper + tokenSource oauth2.TokenSource + token *oauth2.Token + config *Config + ctx context.Context + expiryTimer *time.Timer // signals whenever the token expires + beforeExpiryTimer *time.Timer // signals shortly before the token expires } // If token has expired then first try re-reading it (and its refresh token) @@ -359,10 +365,13 @@ func (ts *TokenSource) Token() (*oauth2.Token, error) { changed = changed || ts.token == nil || token.AccessToken != ts.token.AccessToken || token.RefreshToken != ts.token.RefreshToken || token.Expiry != ts.token.Expiry ts.token = token if changed { - // Bump on the expiry timer if it is set + // Bump the expiry timers if they are set if ts.expiryTimer != nil { ts.expiryTimer.Reset(ts.timeToExpiry()) } + if ts.beforeExpiryTimer != nil { + ts.beforeExpiryTimer.Reset(ts.timeToExpiry() - BeforeExpiryDelta) + } err = PutToken(ts.name, ts.m, token, false) if err != nil { return nil, fmt.Errorf("couldn't store token: %w", err) @@ -421,6 +430,18 @@ func (ts *TokenSource) OnExpiry() <-chan time.Time { return ts.expiryTimer.C } +// OnBeforeExpiry returns a channel which has the time written to it shortly +// before the token expires. Note that there is only one channel so if +// attaching multiple go routines it will only signal to one of them. +func (ts *TokenSource) OnBeforeExpiry() <-chan time.Time { + ts.mu.Lock() + defer ts.mu.Unlock() + if ts.beforeExpiryTimer == nil { + ts.beforeExpiryTimer = time.NewTimer(ts.timeToExpiry() - BeforeExpiryDelta) + } + return ts.beforeExpiryTimer.C +} + // Check interface satisfied var _ oauth2.TokenSource = (*TokenSource)(nil) diff --git a/lib/oauthutil/renew.go b/lib/oauthutil/renew.go index e3652ee9c..c5f738719 100644 --- a/lib/oauthutil/renew.go +++ b/lib/oauthutil/renew.go @@ -17,12 +17,12 @@ type Renew struct { shutdown sync.Once } -// NewRenew creates a new Renew struct and starts a background process +// NewRenewOnExpiry creates a new Renew struct and starts a background process // which renews the token whenever it expires. It uses the run() call // to run a transaction to do this. // // It will only renew the token if the number of uploads > 0 -func NewRenew(name string, ts *TokenSource, run func() error) *Renew { +func NewRenewOnExpiry(name string, ts *TokenSource, run func() error) *Renew { r := &Renew{ name: name, ts: ts, @@ -33,6 +33,22 @@ func NewRenew(name string, ts *TokenSource, run func() error) *Renew { return r } +// NewRenewBeforeExpiry creates a new Renew struct and starts a background process +// which renews the token shortly before it expires. It uses the run() call +// to run a transaction to do this. +// +// It will only renew the token if the number of uploads > 0 +func NewRenewBeforeExpiry(name string, ts *TokenSource, run func() error) *Renew { + r := &Renew{ + name: name, + ts: ts, + run: run, + done: make(chan any), + } + go r.renewOnBeforeExpiry() + return r +} + // renewOnExpiry renews the token whenever it expires. Useful when there // are lots of uploads in progress and the token doesn't get renewed. // Amazon seem to cancel your uploads if you don't renew your token @@ -45,19 +61,39 @@ func (r *Renew) renewOnExpiry() { case <-r.done: return } - uploads := r.uploads.Load() - if uploads != 0 { - fs.Debugf(r.name, "Token expired - %d uploads in progress - refreshing", uploads) - // Do a transaction - err := r.run() - if err == nil { - fs.Debugf(r.name, "Token refresh successful") - } else { - fs.Errorf(r.name, "Token refresh failed: %v", err) - } - } else { - fs.Debugf(r.name, "Token expired but no uploads in progress - doing nothing") + r.renewToken() + } +} + +// renewOnBeforeExpiry renews the token shortly before it expires. This +// permits refresh of the token before it expires for packages that +// independently manage token refresh +func (r *Renew) renewOnBeforeExpiry() { + expiry := r.ts.OnBeforeExpiry() + for { + select { + case <-expiry: + case <-r.done: + return } + r.renewToken() + } +} + +// Renew the token by running the provided transaction if any uploads are in progress +func (r *Renew) renewToken() { + uploads := r.uploads.Load() + if uploads != 0 { + fs.Debugf(r.name, "Token expired - %d uploads in progress - refreshing", uploads) + // Do a transaction + err := r.run() + if err == nil { + fs.Debugf(r.name, "Token refresh successful") + } else { + fs.Errorf(r.name, "Token refresh failed: %v", err) + } + } else { + fs.Debugf(r.name, "Token expired but no uploads in progress - doing nothing") } } @@ -88,7 +124,12 @@ func (r *Renew) Shutdown() { } // closing a channel can only be done once r.shutdown.Do(func() { - r.ts.expiryTimer.Stop() + if r.ts.expiryTimer != nil { + r.ts.expiryTimer.Stop() + } + if r.ts.beforeExpiryTimer != nil { + r.ts.beforeExpiryTimer.Stop() + } close(r.done) }) }