From a42dc57358f138e60d4b40373ad59534eb46eeb9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 5 Mar 2026 17:55:25 +0000 Subject: [PATCH] Refactor RegisterJob signature and implement BackOff handling Co-authored-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- backend/internal/job/analytics_job.go | 2 +- backend/internal/job/api_key_expiry_job.go | 2 +- backend/internal/job/db_cleanup_job.go | 21 ++++++++----- backend/internal/job/file_cleanup_job.go | 5 ++-- backend/internal/job/geoloite_update_job.go | 2 +- backend/internal/job/ldap_job.go | 2 +- backend/internal/job/scheduler.go | 33 ++++++++++++++++++--- backend/internal/job/scim_job.go | 2 +- backend/internal/service/scheduler.go | 25 ++++++++++++++++ backend/internal/service/scim_service.go | 7 +---- 10 files changed, 76 insertions(+), 25 deletions(-) create mode 100644 backend/internal/service/scheduler.go diff --git a/backend/internal/job/analytics_job.go b/backend/internal/job/analytics_job.go index f67e2042..c2e658df 100644 --- a/backend/internal/job/analytics_job.go +++ b/backend/internal/job/analytics_job.go @@ -28,7 +28,7 @@ func (s *Scheduler) RegisterAnalyticsJob(ctx context.Context, appConfig *service appConfig: appConfig, httpClient: httpClient, } - return s.RegisterJob(ctx, "SendHeartbeat", gocron.DurationJob(24*time.Hour), jobs.sendHeartbeat, true) + return s.RegisterJob(ctx, "SendHeartbeat", gocron.DurationJob(24*time.Hour), jobs.sendHeartbeat, service.RegisterJobOpts{RunImmediately: true}) } type AnalyticsJob struct { diff --git a/backend/internal/job/api_key_expiry_job.go b/backend/internal/job/api_key_expiry_job.go index 969b7c7e..f7771c27 100644 --- a/backend/internal/job/api_key_expiry_job.go +++ b/backend/internal/job/api_key_expiry_job.go @@ -22,7 +22,7 @@ func (s *Scheduler) RegisterApiKeyExpiryJob(ctx context.Context, apiKeyService * } // Send every day at midnight - return s.RegisterJob(ctx, "ExpiredApiKeyEmailJob", gocron.CronJob("0 0 * * *", false), jobs.checkAndNotifyExpiringApiKeys, false) + return s.RegisterJob(ctx, "ExpiredApiKeyEmailJob", gocron.CronJob("0 0 * * *", false), jobs.checkAndNotifyExpiringApiKeys, service.RegisterJobOpts{RunImmediately: false}) } func (j *ApiKeyEmailJobs) checkAndNotifyExpiringApiKeys(ctx context.Context) error { diff --git a/backend/internal/job/db_cleanup_job.go b/backend/internal/job/db_cleanup_job.go index e5c1e085..4d184d6f 100644 --- a/backend/internal/job/db_cleanup_job.go +++ b/backend/internal/job/db_cleanup_job.go @@ -7,25 +7,30 @@ import ( "log/slog" "time" + backoff "github.com/cenkalti/backoff/v5" "gorm.io/gorm" "github.com/pocket-id/pocket-id/backend/internal/common" "github.com/pocket-id/pocket-id/backend/internal/model" datatype "github.com/pocket-id/pocket-id/backend/internal/model/types" + "github.com/pocket-id/pocket-id/backend/internal/service" ) func (s *Scheduler) RegisterDbCleanupJobs(ctx context.Context, db *gorm.DB) error { jobs := &DbCleanupJobs{db: db} + // Use exponential backoff for each DB cleanup job so transient query failures + // are retried automatically rather than causing an immediate job failure. + // Each job gets its own backoff instance to avoid shared state. return errors.Join( - s.RegisterJob(ctx, "ClearWebauthnSessions", jobDefWithJitter(24*time.Hour), jobs.clearWebauthnSessions, true), - s.RegisterJob(ctx, "ClearOneTimeAccessTokens", jobDefWithJitter(24*time.Hour), jobs.clearOneTimeAccessTokens, true), - s.RegisterJob(ctx, "ClearSignupTokens", jobDefWithJitter(24*time.Hour), jobs.clearSignupTokens, true), - s.RegisterJob(ctx, "ClearEmailVerificationTokens", jobDefWithJitter(24*time.Hour), jobs.clearEmailVerificationTokens, true), - s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", jobDefWithJitter(24*time.Hour), jobs.clearOidcAuthorizationCodes, true), - s.RegisterJob(ctx, "ClearOidcRefreshTokens", jobDefWithJitter(24*time.Hour), jobs.clearOidcRefreshTokens, true), - s.RegisterJob(ctx, "ClearReauthenticationTokens", jobDefWithJitter(24*time.Hour), jobs.clearReauthenticationTokens, true), - s.RegisterJob(ctx, "ClearAuditLogs", jobDefWithJitter(24*time.Hour), jobs.clearAuditLogs, true), + s.RegisterJob(ctx, "ClearWebauthnSessions", jobDefWithJitter(24*time.Hour), jobs.clearWebauthnSessions, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}), + s.RegisterJob(ctx, "ClearOneTimeAccessTokens", jobDefWithJitter(24*time.Hour), jobs.clearOneTimeAccessTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}), + s.RegisterJob(ctx, "ClearSignupTokens", jobDefWithJitter(24*time.Hour), jobs.clearSignupTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}), + s.RegisterJob(ctx, "ClearEmailVerificationTokens", jobDefWithJitter(24*time.Hour), jobs.clearEmailVerificationTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}), + s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", jobDefWithJitter(24*time.Hour), jobs.clearOidcAuthorizationCodes, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}), + s.RegisterJob(ctx, "ClearOidcRefreshTokens", jobDefWithJitter(24*time.Hour), jobs.clearOidcRefreshTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}), + s.RegisterJob(ctx, "ClearReauthenticationTokens", jobDefWithJitter(24*time.Hour), jobs.clearReauthenticationTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}), + s.RegisterJob(ctx, "ClearAuditLogs", jobDefWithJitter(24*time.Hour), jobs.clearAuditLogs, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}), ) } diff --git a/backend/internal/job/file_cleanup_job.go b/backend/internal/job/file_cleanup_job.go index 2f70cb7c..45b9a63b 100644 --- a/backend/internal/job/file_cleanup_job.go +++ b/backend/internal/job/file_cleanup_job.go @@ -13,6 +13,7 @@ import ( "gorm.io/gorm" "github.com/pocket-id/pocket-id/backend/internal/model" + "github.com/pocket-id/pocket-id/backend/internal/service" "github.com/pocket-id/pocket-id/backend/internal/storage" ) @@ -21,13 +22,13 @@ func (s *Scheduler) RegisterFileCleanupJobs(ctx context.Context, db *gorm.DB, fi var errs []error errs = append(errs, - s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, false), + s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, service.RegisterJobOpts{RunImmediately: false}), ) // Only necessary for file system storage if fileStorage.Type() == storage.TypeFileSystem { errs = append(errs, - s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, true), + s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, service.RegisterJobOpts{RunImmediately: true}), ) } diff --git a/backend/internal/job/geoloite_update_job.go b/backend/internal/job/geoloite_update_job.go index 65353757..7b163a8d 100644 --- a/backend/internal/job/geoloite_update_job.go +++ b/backend/internal/job/geoloite_update_job.go @@ -23,7 +23,7 @@ func (s *Scheduler) RegisterGeoLiteUpdateJobs(ctx context.Context, geoLiteServic jobs := &GeoLiteUpdateJobs{geoLiteService: geoLiteService} // Run every 24 hours (and right away) - return s.RegisterJob(ctx, "UpdateGeoLiteDB", gocron.DurationJob(24*time.Hour), jobs.updateGoeLiteDB, true) + return s.RegisterJob(ctx, "UpdateGeoLiteDB", gocron.DurationJob(24*time.Hour), jobs.updateGoeLiteDB, service.RegisterJobOpts{RunImmediately: true}) } func (j *GeoLiteUpdateJobs) updateGoeLiteDB(ctx context.Context) error { diff --git a/backend/internal/job/ldap_job.go b/backend/internal/job/ldap_job.go index 4ba7728e..1547d954 100644 --- a/backend/internal/job/ldap_job.go +++ b/backend/internal/job/ldap_job.go @@ -16,7 +16,7 @@ func (s *Scheduler) RegisterLdapJobs(ctx context.Context, ldapService *service.L jobs := &LdapJobs{ldapService: ldapService, appConfigService: appConfigService} // Register the job to run every hour (with some jitter) - return s.RegisterJob(ctx, "SyncLdap", jobDefWithJitter(time.Hour), jobs.syncLdap, true) + return s.RegisterJob(ctx, "SyncLdap", jobDefWithJitter(time.Hour), jobs.syncLdap, service.RegisterJobOpts{RunImmediately: true}) } func (j *LdapJobs) syncLdap(ctx context.Context) error { diff --git a/backend/internal/job/scheduler.go b/backend/internal/job/scheduler.go index 85b7e1ff..d7427b1b 100644 --- a/backend/internal/job/scheduler.go +++ b/backend/internal/job/scheduler.go @@ -7,8 +7,11 @@ import ( "log/slog" "time" + backoff "github.com/cenkalti/backoff/v5" "github.com/go-co-op/gocron/v2" "github.com/google/uuid" + + "github.com/pocket-id/pocket-id/backend/internal/service" ) type Scheduler struct { @@ -61,7 +64,29 @@ func (s *Scheduler) Run(ctx context.Context) error { return nil } -func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, runImmediately bool, extraOptions ...gocron.JobOption) error { +func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, jobFn func(ctx context.Context) error, opts service.RegisterJobOpts) error { + // If a BackOff strategy is provided, wrap the job with retry logic. + if opts.BackOff != nil { + origJob := jobFn + jobFn = func(ctx context.Context) error { + _, err := backoff.Retry( + ctx, + func() (struct{}, error) { + return struct{}{}, origJob(ctx) + }, + backoff.WithBackOff(opts.BackOff), + backoff.WithNotify(func(err error, d time.Duration) { + slog.WarnContext(ctx, "Job failed, retrying", + slog.String("name", name), + slog.Any("error", err), + slog.Duration("retryIn", d), + ) + }), + ) + return err + } + } + jobOptions := []gocron.JobOption{ gocron.WithContext(ctx), gocron.WithName(name), @@ -88,13 +113,13 @@ func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.Job ), } - if runImmediately { + if opts.RunImmediately { jobOptions = append(jobOptions, gocron.JobOption(gocron.WithStartImmediately())) } - jobOptions = append(jobOptions, extraOptions...) + jobOptions = append(jobOptions, opts.ExtraOptions...) - _, err := s.scheduler.NewJob(def, gocron.NewTask(job), jobOptions...) + _, err := s.scheduler.NewJob(def, gocron.NewTask(jobFn), jobOptions...) if err != nil { return fmt.Errorf("failed to register job %q: %w", name, err) diff --git a/backend/internal/job/scim_job.go b/backend/internal/job/scim_job.go index cfddd2ba..5c4336f6 100644 --- a/backend/internal/job/scim_job.go +++ b/backend/internal/job/scim_job.go @@ -17,7 +17,7 @@ func (s *Scheduler) RegisterScimJobs(ctx context.Context, scimService *service.S jobs := &ScimJobs{scimService: scimService} // Register the job to run every hour (with some jitter) - return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(time.Hour), jobs.SyncScim, true) + return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(time.Hour), jobs.SyncScim, service.RegisterJobOpts{RunImmediately: true}) } func (j *ScimJobs) SyncScim(ctx context.Context) error { diff --git a/backend/internal/service/scheduler.go b/backend/internal/service/scheduler.go new file mode 100644 index 00000000..b53f195b --- /dev/null +++ b/backend/internal/service/scheduler.go @@ -0,0 +1,25 @@ +package service + +import ( + "context" + + backoff "github.com/cenkalti/backoff/v5" + "github.com/go-co-op/gocron/v2" +) + +// RegisterJobOpts holds optional configuration for registering a scheduled job. +type RegisterJobOpts struct { + // RunImmediately runs the job immediately after registration. + RunImmediately bool + // ExtraOptions are additional gocron job options. + ExtraOptions []gocron.JobOption + // BackOff is an optional backoff strategy. If non-nil, the job will be wrapped + // with automatic retry logic using the provided backoff on transient failures. + BackOff backoff.BackOff +} + +// Scheduler is an interface for registering and managing background jobs. +type Scheduler interface { + RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, opts RegisterJobOpts) error + RemoveJob(name string) error +} diff --git a/backend/internal/service/scim_service.go b/backend/internal/service/scim_service.go index c8fc4339..57c5cec3 100644 --- a/backend/internal/service/scim_service.go +++ b/backend/internal/service/scim_service.go @@ -34,11 +34,6 @@ const scimErrorBodyLimit = 4096 type scimSyncAction int -type Scheduler interface { - RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, runImmediately bool, extraOptions ...gocron.JobOption) error - RemoveJob(name string) error -} - const ( scimActionNone scimSyncAction = iota scimActionCreated @@ -149,7 +144,7 @@ func (s *ScimService) ScheduleSync() { err := s.scheduler.RegisterJob( context.Background(), jobName, - gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(start)), s.SyncAll, false) + gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(start)), s.SyncAll, RegisterJobOpts{RunImmediately: false}) if err != nil { slog.Error("Failed to schedule SCIM sync", slog.Any("error", err))