Compare commits

...

4 Commits

Author SHA1 Message Date
ItalyPaleAle
c63ef67c2c 💄 2026-03-05 16:18:38 -08:00
copilot-swe-agent[bot]
e827f80496 Address review: omit RunImmediately: false, add max 3 retries to backoff
Co-authored-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
2026-03-05 18:16:47 +00:00
copilot-swe-agent[bot]
a42dc57358 Refactor RegisterJob signature and implement BackOff handling
Co-authored-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
2026-03-05 17:55:25 +00:00
copilot-swe-agent[bot]
881a2c64be Initial plan 2026-03-05 16:32:05 +00:00
10 changed files with 74 additions and 25 deletions

View File

@@ -28,7 +28,7 @@ func (s *Scheduler) RegisterAnalyticsJob(ctx context.Context, appConfig *service
appConfig: appConfig,
httpClient: httpClient,
}
return s.RegisterJob(ctx, "SendHeartbeat", gocron.DurationJob(24*time.Hour), jobs.sendHeartbeat, true)
return s.RegisterJob(ctx, "SendHeartbeat", gocron.DurationJob(24*time.Hour), jobs.sendHeartbeat, service.RegisterJobOpts{RunImmediately: true})
}
type AnalyticsJob struct {

View File

@@ -22,7 +22,7 @@ func (s *Scheduler) RegisterApiKeyExpiryJob(ctx context.Context, apiKeyService *
}
// Send every day at midnight
return s.RegisterJob(ctx, "ExpiredApiKeyEmailJob", gocron.CronJob("0 0 * * *", false), jobs.checkAndNotifyExpiringApiKeys, false)
return s.RegisterJob(ctx, "ExpiredApiKeyEmailJob", gocron.CronJob("0 0 * * *", false), jobs.checkAndNotifyExpiringApiKeys, service.RegisterJobOpts{})
}
func (j *ApiKeyEmailJobs) checkAndNotifyExpiringApiKeys(ctx context.Context) error {

View File

@@ -7,25 +7,28 @@ import (
"log/slog"
"time"
backoff "github.com/cenkalti/backoff/v5"
"gorm.io/gorm"
"github.com/pocket-id/pocket-id/backend/internal/common"
"github.com/pocket-id/pocket-id/backend/internal/model"
datatype "github.com/pocket-id/pocket-id/backend/internal/model/types"
"github.com/pocket-id/pocket-id/backend/internal/service"
)
func (s *Scheduler) RegisterDbCleanupJobs(ctx context.Context, db *gorm.DB) error {
jobs := &DbCleanupJobs{db: db}
// Use exponential backoff for each DB cleanup job so transient query failures are retried automatically rather than causing an immediate job failure
return errors.Join(
s.RegisterJob(ctx, "ClearWebauthnSessions", jobDefWithJitter(24*time.Hour), jobs.clearWebauthnSessions, true),
s.RegisterJob(ctx, "ClearOneTimeAccessTokens", jobDefWithJitter(24*time.Hour), jobs.clearOneTimeAccessTokens, true),
s.RegisterJob(ctx, "ClearSignupTokens", jobDefWithJitter(24*time.Hour), jobs.clearSignupTokens, true),
s.RegisterJob(ctx, "ClearEmailVerificationTokens", jobDefWithJitter(24*time.Hour), jobs.clearEmailVerificationTokens, true),
s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", jobDefWithJitter(24*time.Hour), jobs.clearOidcAuthorizationCodes, true),
s.RegisterJob(ctx, "ClearOidcRefreshTokens", jobDefWithJitter(24*time.Hour), jobs.clearOidcRefreshTokens, true),
s.RegisterJob(ctx, "ClearReauthenticationTokens", jobDefWithJitter(24*time.Hour), jobs.clearReauthenticationTokens, true),
s.RegisterJob(ctx, "ClearAuditLogs", jobDefWithJitter(24*time.Hour), jobs.clearAuditLogs, true),
s.RegisterJob(ctx, "ClearWebauthnSessions", jobDefWithJitter(24*time.Hour), jobs.clearWebauthnSessions, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
s.RegisterJob(ctx, "ClearOneTimeAccessTokens", jobDefWithJitter(24*time.Hour), jobs.clearOneTimeAccessTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
s.RegisterJob(ctx, "ClearSignupTokens", jobDefWithJitter(24*time.Hour), jobs.clearSignupTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
s.RegisterJob(ctx, "ClearEmailVerificationTokens", jobDefWithJitter(24*time.Hour), jobs.clearEmailVerificationTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", jobDefWithJitter(24*time.Hour), jobs.clearOidcAuthorizationCodes, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
s.RegisterJob(ctx, "ClearOidcRefreshTokens", jobDefWithJitter(24*time.Hour), jobs.clearOidcRefreshTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
s.RegisterJob(ctx, "ClearReauthenticationTokens", jobDefWithJitter(24*time.Hour), jobs.clearReauthenticationTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
s.RegisterJob(ctx, "ClearAuditLogs", jobDefWithJitter(24*time.Hour), jobs.clearAuditLogs, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
)
}

View File

@@ -13,6 +13,7 @@ import (
"gorm.io/gorm"
"github.com/pocket-id/pocket-id/backend/internal/model"
"github.com/pocket-id/pocket-id/backend/internal/service"
"github.com/pocket-id/pocket-id/backend/internal/storage"
)
@@ -21,13 +22,13 @@ func (s *Scheduler) RegisterFileCleanupJobs(ctx context.Context, db *gorm.DB, fi
var errs []error
errs = append(errs,
s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, false),
s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, service.RegisterJobOpts{}),
)
// Only necessary for file system storage
if fileStorage.Type() == storage.TypeFileSystem {
errs = append(errs,
s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, true),
s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, service.RegisterJobOpts{RunImmediately: true}),
)
}

View File

@@ -23,7 +23,7 @@ func (s *Scheduler) RegisterGeoLiteUpdateJobs(ctx context.Context, geoLiteServic
jobs := &GeoLiteUpdateJobs{geoLiteService: geoLiteService}
// Run every 24 hours (and right away)
return s.RegisterJob(ctx, "UpdateGeoLiteDB", gocron.DurationJob(24*time.Hour), jobs.updateGoeLiteDB, true)
return s.RegisterJob(ctx, "UpdateGeoLiteDB", gocron.DurationJob(24*time.Hour), jobs.updateGoeLiteDB, service.RegisterJobOpts{RunImmediately: true})
}
func (j *GeoLiteUpdateJobs) updateGoeLiteDB(ctx context.Context) error {

View File

@@ -16,7 +16,7 @@ func (s *Scheduler) RegisterLdapJobs(ctx context.Context, ldapService *service.L
jobs := &LdapJobs{ldapService: ldapService, appConfigService: appConfigService}
// Register the job to run every hour (with some jitter)
return s.RegisterJob(ctx, "SyncLdap", jobDefWithJitter(time.Hour), jobs.syncLdap, true)
return s.RegisterJob(ctx, "SyncLdap", jobDefWithJitter(time.Hour), jobs.syncLdap, service.RegisterJobOpts{RunImmediately: true})
}
func (j *LdapJobs) syncLdap(ctx context.Context) error {

View File

@@ -7,8 +7,11 @@ import (
"log/slog"
"time"
backoff "github.com/cenkalti/backoff/v5"
"github.com/go-co-op/gocron/v2"
"github.com/google/uuid"
"github.com/pocket-id/pocket-id/backend/internal/service"
)
type Scheduler struct {
@@ -61,7 +64,29 @@ func (s *Scheduler) Run(ctx context.Context) error {
return nil
}
func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, runImmediately bool, extraOptions ...gocron.JobOption) error {
func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, jobFn func(ctx context.Context) error, opts service.RegisterJobOpts) error {
// If a BackOff strategy is provided, wrap the job with retry logic
if opts.BackOff != nil {
origJob := jobFn
jobFn = func(ctx context.Context) error {
_, err := backoff.Retry(
ctx,
func() (struct{}, error) {
return struct{}{}, origJob(ctx)
},
backoff.WithBackOff(opts.BackOff),
backoff.WithNotify(func(err error, d time.Duration) {
slog.WarnContext(ctx, "Job failed, retrying",
slog.String("name", name),
slog.Any("error", err),
slog.Duration("retryIn", d),
)
}),
)
return err
}
}
jobOptions := []gocron.JobOption{
gocron.WithContext(ctx),
gocron.WithName(name),
@@ -88,13 +113,13 @@ func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.Job
),
}
if runImmediately {
if opts.RunImmediately {
jobOptions = append(jobOptions, gocron.JobOption(gocron.WithStartImmediately()))
}
jobOptions = append(jobOptions, extraOptions...)
jobOptions = append(jobOptions, opts.ExtraOptions...)
_, err := s.scheduler.NewJob(def, gocron.NewTask(job), jobOptions...)
_, err := s.scheduler.NewJob(def, gocron.NewTask(jobFn), jobOptions...)
if err != nil {
return fmt.Errorf("failed to register job %q: %w", name, err)

View File

@@ -17,7 +17,7 @@ func (s *Scheduler) RegisterScimJobs(ctx context.Context, scimService *service.S
jobs := &ScimJobs{scimService: scimService}
// Register the job to run every hour (with some jitter)
return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(time.Hour), jobs.SyncScim, true)
return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(time.Hour), jobs.SyncScim, service.RegisterJobOpts{RunImmediately: true})
}
func (j *ScimJobs) SyncScim(ctx context.Context) error {

View File

@@ -0,0 +1,25 @@
package service
import (
"context"
backoff "github.com/cenkalti/backoff/v5"
"github.com/go-co-op/gocron/v2"
)
// RegisterJobOpts holds optional configuration for registering a scheduled job.
type RegisterJobOpts struct {
// RunImmediately runs the job immediately after registration.
RunImmediately bool
// ExtraOptions are additional gocron job options.
ExtraOptions []gocron.JobOption
// BackOff is an optional backoff strategy. If non-nil, the job will be wrapped
// with automatic retry logic using the provided backoff on transient failures.
BackOff backoff.BackOff
}
// Scheduler is an interface for registering and managing background jobs.
type Scheduler interface {
RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, opts RegisterJobOpts) error
RemoveJob(name string) error
}

View File

@@ -34,11 +34,6 @@ const scimErrorBodyLimit = 4096
type scimSyncAction int
type Scheduler interface {
RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, runImmediately bool, extraOptions ...gocron.JobOption) error
RemoveJob(name string) error
}
const (
scimActionNone scimSyncAction = iota
scimActionCreated
@@ -149,7 +144,7 @@ func (s *ScimService) ScheduleSync() {
err := s.scheduler.RegisterJob(
context.Background(), jobName,
gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(start)), s.SyncAll, false)
gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(start)), s.SyncAll, RegisterJobOpts{})
if err != nil {
slog.Error("Failed to schedule SCIM sync", slog.Any("error", err))