mirror of
https://github.com/pocket-id/pocket-id.git
synced 2026-03-06 02:40:49 +00:00
Refactor RegisterJob signature and implement BackOff handling
Co-authored-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
@@ -28,7 +28,7 @@ func (s *Scheduler) RegisterAnalyticsJob(ctx context.Context, appConfig *service
|
||||
appConfig: appConfig,
|
||||
httpClient: httpClient,
|
||||
}
|
||||
return s.RegisterJob(ctx, "SendHeartbeat", gocron.DurationJob(24*time.Hour), jobs.sendHeartbeat, true)
|
||||
return s.RegisterJob(ctx, "SendHeartbeat", gocron.DurationJob(24*time.Hour), jobs.sendHeartbeat, service.RegisterJobOpts{RunImmediately: true})
|
||||
}
|
||||
|
||||
type AnalyticsJob struct {
|
||||
|
||||
@@ -22,7 +22,7 @@ func (s *Scheduler) RegisterApiKeyExpiryJob(ctx context.Context, apiKeyService *
|
||||
}
|
||||
|
||||
// Send every day at midnight
|
||||
return s.RegisterJob(ctx, "ExpiredApiKeyEmailJob", gocron.CronJob("0 0 * * *", false), jobs.checkAndNotifyExpiringApiKeys, false)
|
||||
return s.RegisterJob(ctx, "ExpiredApiKeyEmailJob", gocron.CronJob("0 0 * * *", false), jobs.checkAndNotifyExpiringApiKeys, service.RegisterJobOpts{RunImmediately: false})
|
||||
}
|
||||
|
||||
func (j *ApiKeyEmailJobs) checkAndNotifyExpiringApiKeys(ctx context.Context) error {
|
||||
|
||||
@@ -7,25 +7,30 @@ import (
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
backoff "github.com/cenkalti/backoff/v5"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/pocket-id/pocket-id/backend/internal/common"
|
||||
"github.com/pocket-id/pocket-id/backend/internal/model"
|
||||
datatype "github.com/pocket-id/pocket-id/backend/internal/model/types"
|
||||
"github.com/pocket-id/pocket-id/backend/internal/service"
|
||||
)
|
||||
|
||||
func (s *Scheduler) RegisterDbCleanupJobs(ctx context.Context, db *gorm.DB) error {
|
||||
jobs := &DbCleanupJobs{db: db}
|
||||
|
||||
// Use exponential backoff for each DB cleanup job so transient query failures
|
||||
// are retried automatically rather than causing an immediate job failure.
|
||||
// Each job gets its own backoff instance to avoid shared state.
|
||||
return errors.Join(
|
||||
s.RegisterJob(ctx, "ClearWebauthnSessions", jobDefWithJitter(24*time.Hour), jobs.clearWebauthnSessions, true),
|
||||
s.RegisterJob(ctx, "ClearOneTimeAccessTokens", jobDefWithJitter(24*time.Hour), jobs.clearOneTimeAccessTokens, true),
|
||||
s.RegisterJob(ctx, "ClearSignupTokens", jobDefWithJitter(24*time.Hour), jobs.clearSignupTokens, true),
|
||||
s.RegisterJob(ctx, "ClearEmailVerificationTokens", jobDefWithJitter(24*time.Hour), jobs.clearEmailVerificationTokens, true),
|
||||
s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", jobDefWithJitter(24*time.Hour), jobs.clearOidcAuthorizationCodes, true),
|
||||
s.RegisterJob(ctx, "ClearOidcRefreshTokens", jobDefWithJitter(24*time.Hour), jobs.clearOidcRefreshTokens, true),
|
||||
s.RegisterJob(ctx, "ClearReauthenticationTokens", jobDefWithJitter(24*time.Hour), jobs.clearReauthenticationTokens, true),
|
||||
s.RegisterJob(ctx, "ClearAuditLogs", jobDefWithJitter(24*time.Hour), jobs.clearAuditLogs, true),
|
||||
s.RegisterJob(ctx, "ClearWebauthnSessions", jobDefWithJitter(24*time.Hour), jobs.clearWebauthnSessions, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
|
||||
s.RegisterJob(ctx, "ClearOneTimeAccessTokens", jobDefWithJitter(24*time.Hour), jobs.clearOneTimeAccessTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
|
||||
s.RegisterJob(ctx, "ClearSignupTokens", jobDefWithJitter(24*time.Hour), jobs.clearSignupTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
|
||||
s.RegisterJob(ctx, "ClearEmailVerificationTokens", jobDefWithJitter(24*time.Hour), jobs.clearEmailVerificationTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
|
||||
s.RegisterJob(ctx, "ClearOidcAuthorizationCodes", jobDefWithJitter(24*time.Hour), jobs.clearOidcAuthorizationCodes, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
|
||||
s.RegisterJob(ctx, "ClearOidcRefreshTokens", jobDefWithJitter(24*time.Hour), jobs.clearOidcRefreshTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
|
||||
s.RegisterJob(ctx, "ClearReauthenticationTokens", jobDefWithJitter(24*time.Hour), jobs.clearReauthenticationTokens, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
|
||||
s.RegisterJob(ctx, "ClearAuditLogs", jobDefWithJitter(24*time.Hour), jobs.clearAuditLogs, service.RegisterJobOpts{RunImmediately: true, BackOff: backoff.NewExponentialBackOff()}),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/pocket-id/pocket-id/backend/internal/model"
|
||||
"github.com/pocket-id/pocket-id/backend/internal/service"
|
||||
"github.com/pocket-id/pocket-id/backend/internal/storage"
|
||||
)
|
||||
|
||||
@@ -21,13 +22,13 @@ func (s *Scheduler) RegisterFileCleanupJobs(ctx context.Context, db *gorm.DB, fi
|
||||
|
||||
var errs []error
|
||||
errs = append(errs,
|
||||
s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, false),
|
||||
s.RegisterJob(ctx, "ClearUnusedDefaultProfilePictures", gocron.DurationJob(24*time.Hour), jobs.clearUnusedDefaultProfilePictures, service.RegisterJobOpts{RunImmediately: false}),
|
||||
)
|
||||
|
||||
// Only necessary for file system storage
|
||||
if fileStorage.Type() == storage.TypeFileSystem {
|
||||
errs = append(errs,
|
||||
s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, true),
|
||||
s.RegisterJob(ctx, "ClearOrphanedTempFiles", gocron.DurationJob(12*time.Hour), jobs.clearOrphanedTempFiles, service.RegisterJobOpts{RunImmediately: true}),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ func (s *Scheduler) RegisterGeoLiteUpdateJobs(ctx context.Context, geoLiteServic
|
||||
jobs := &GeoLiteUpdateJobs{geoLiteService: geoLiteService}
|
||||
|
||||
// Run every 24 hours (and right away)
|
||||
return s.RegisterJob(ctx, "UpdateGeoLiteDB", gocron.DurationJob(24*time.Hour), jobs.updateGoeLiteDB, true)
|
||||
return s.RegisterJob(ctx, "UpdateGeoLiteDB", gocron.DurationJob(24*time.Hour), jobs.updateGoeLiteDB, service.RegisterJobOpts{RunImmediately: true})
|
||||
}
|
||||
|
||||
func (j *GeoLiteUpdateJobs) updateGoeLiteDB(ctx context.Context) error {
|
||||
|
||||
@@ -16,7 +16,7 @@ func (s *Scheduler) RegisterLdapJobs(ctx context.Context, ldapService *service.L
|
||||
jobs := &LdapJobs{ldapService: ldapService, appConfigService: appConfigService}
|
||||
|
||||
// Register the job to run every hour (with some jitter)
|
||||
return s.RegisterJob(ctx, "SyncLdap", jobDefWithJitter(time.Hour), jobs.syncLdap, true)
|
||||
return s.RegisterJob(ctx, "SyncLdap", jobDefWithJitter(time.Hour), jobs.syncLdap, service.RegisterJobOpts{RunImmediately: true})
|
||||
}
|
||||
|
||||
func (j *LdapJobs) syncLdap(ctx context.Context) error {
|
||||
|
||||
@@ -7,8 +7,11 @@ import (
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
backoff "github.com/cenkalti/backoff/v5"
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
"github.com/google/uuid"
|
||||
|
||||
"github.com/pocket-id/pocket-id/backend/internal/service"
|
||||
)
|
||||
|
||||
type Scheduler struct {
|
||||
@@ -61,7 +64,29 @@ func (s *Scheduler) Run(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, runImmediately bool, extraOptions ...gocron.JobOption) error {
|
||||
func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, jobFn func(ctx context.Context) error, opts service.RegisterJobOpts) error {
|
||||
// If a BackOff strategy is provided, wrap the job with retry logic.
|
||||
if opts.BackOff != nil {
|
||||
origJob := jobFn
|
||||
jobFn = func(ctx context.Context) error {
|
||||
_, err := backoff.Retry(
|
||||
ctx,
|
||||
func() (struct{}, error) {
|
||||
return struct{}{}, origJob(ctx)
|
||||
},
|
||||
backoff.WithBackOff(opts.BackOff),
|
||||
backoff.WithNotify(func(err error, d time.Duration) {
|
||||
slog.WarnContext(ctx, "Job failed, retrying",
|
||||
slog.String("name", name),
|
||||
slog.Any("error", err),
|
||||
slog.Duration("retryIn", d),
|
||||
)
|
||||
}),
|
||||
)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
jobOptions := []gocron.JobOption{
|
||||
gocron.WithContext(ctx),
|
||||
gocron.WithName(name),
|
||||
@@ -88,13 +113,13 @@ func (s *Scheduler) RegisterJob(ctx context.Context, name string, def gocron.Job
|
||||
),
|
||||
}
|
||||
|
||||
if runImmediately {
|
||||
if opts.RunImmediately {
|
||||
jobOptions = append(jobOptions, gocron.JobOption(gocron.WithStartImmediately()))
|
||||
}
|
||||
|
||||
jobOptions = append(jobOptions, extraOptions...)
|
||||
jobOptions = append(jobOptions, opts.ExtraOptions...)
|
||||
|
||||
_, err := s.scheduler.NewJob(def, gocron.NewTask(job), jobOptions...)
|
||||
_, err := s.scheduler.NewJob(def, gocron.NewTask(jobFn), jobOptions...)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to register job %q: %w", name, err)
|
||||
|
||||
@@ -17,7 +17,7 @@ func (s *Scheduler) RegisterScimJobs(ctx context.Context, scimService *service.S
|
||||
jobs := &ScimJobs{scimService: scimService}
|
||||
|
||||
// Register the job to run every hour (with some jitter)
|
||||
return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(time.Hour), jobs.SyncScim, true)
|
||||
return s.RegisterJob(ctx, "SyncScim", gocron.DurationJob(time.Hour), jobs.SyncScim, service.RegisterJobOpts{RunImmediately: true})
|
||||
}
|
||||
|
||||
func (j *ScimJobs) SyncScim(ctx context.Context) error {
|
||||
|
||||
25
backend/internal/service/scheduler.go
Normal file
25
backend/internal/service/scheduler.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
backoff "github.com/cenkalti/backoff/v5"
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
)
|
||||
|
||||
// RegisterJobOpts holds optional configuration for registering a scheduled job.
|
||||
type RegisterJobOpts struct {
|
||||
// RunImmediately runs the job immediately after registration.
|
||||
RunImmediately bool
|
||||
// ExtraOptions are additional gocron job options.
|
||||
ExtraOptions []gocron.JobOption
|
||||
// BackOff is an optional backoff strategy. If non-nil, the job will be wrapped
|
||||
// with automatic retry logic using the provided backoff on transient failures.
|
||||
BackOff backoff.BackOff
|
||||
}
|
||||
|
||||
// Scheduler is an interface for registering and managing background jobs.
|
||||
type Scheduler interface {
|
||||
RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, opts RegisterJobOpts) error
|
||||
RemoveJob(name string) error
|
||||
}
|
||||
@@ -34,11 +34,6 @@ const scimErrorBodyLimit = 4096
|
||||
|
||||
type scimSyncAction int
|
||||
|
||||
type Scheduler interface {
|
||||
RegisterJob(ctx context.Context, name string, def gocron.JobDefinition, job func(ctx context.Context) error, runImmediately bool, extraOptions ...gocron.JobOption) error
|
||||
RemoveJob(name string) error
|
||||
}
|
||||
|
||||
const (
|
||||
scimActionNone scimSyncAction = iota
|
||||
scimActionCreated
|
||||
@@ -149,7 +144,7 @@ func (s *ScimService) ScheduleSync() {
|
||||
|
||||
err := s.scheduler.RegisterJob(
|
||||
context.Background(), jobName,
|
||||
gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(start)), s.SyncAll, false)
|
||||
gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(start)), s.SyncAll, RegisterJobOpts{RunImmediately: false})
|
||||
|
||||
if err != nil {
|
||||
slog.Error("Failed to schedule SCIM sync", slog.Any("error", err))
|
||||
|
||||
Reference in New Issue
Block a user