mirror of
https://github.com/paralus/paralus.git
synced 2026-05-19 06:46:41 +00:00
Update more resources to use transactions
This commit is contained in:
@@ -13,7 +13,7 @@ import (
|
||||
"github.com/uptrace/bun"
|
||||
)
|
||||
|
||||
func CreateCluster(ctx context.Context, tx bun.Tx, cluster *models.Cluster) error {
|
||||
func CreateCluster(ctx context.Context, tx bun.IDB, cluster *models.Cluster) error {
|
||||
|
||||
clstrToken := &models.ClusterToken{
|
||||
OrganizationId: cluster.OrganizationId,
|
||||
|
||||
@@ -128,10 +128,7 @@ func CreateBootstrapAgent(ctx context.Context, db bun.IDB, ba *models.BootstrapA
|
||||
return err
|
||||
}
|
||||
|
||||
// We are explicitly taking in Tx here as it was previously doing RunInTx
|
||||
// TODO: should we take Tx here or just assume we will be passed in a tx? Or should we create one?
|
||||
func RegisterBootstrapAgent(ctx context.Context, db bun.Tx, token string) error {
|
||||
|
||||
ba, err := getBootstrapAgentForToken(ctx, db, token)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -235,13 +235,17 @@ func (es *clusterService) Create(ctx context.Context, cluster *infrav3.Cluster)
|
||||
|
||||
cluster.Spec.ClusterData.Health = infrav3.Health_EDGE_IGNORE
|
||||
|
||||
err = es.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
|
||||
return dao.CreateCluster(ctx, tx, edb)
|
||||
})
|
||||
tx, err := es.db.BeginTx(ctx, &sql.TxOptions{})
|
||||
if err != nil {
|
||||
return &infrav3.Cluster{}, err
|
||||
}
|
||||
|
||||
err = dao.CreateCluster(ctx, tx, edb)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return &infrav3.Cluster{}, err
|
||||
}
|
||||
|
||||
// if project is set create project cluster
|
||||
var pc *models.ProjectCluster
|
||||
pcList := make([]models.ProjectCluster, 0)
|
||||
@@ -250,8 +254,9 @@ func (es *clusterService) Create(ctx context.Context, cluster *infrav3.Cluster)
|
||||
ProjectID: edb.ProjectId,
|
||||
ClusterID: edb.ID,
|
||||
}
|
||||
err = dao.CreateProjectCluster(ctx, es.db, pc)
|
||||
err = dao.CreateProjectCluster(ctx, tx, pc)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return &infrav3.Cluster{}, err
|
||||
}
|
||||
pcList = append(pcList, *pc)
|
||||
@@ -273,9 +278,21 @@ func (es *clusterService) Create(ctx context.Context, cluster *infrav3.Cluster)
|
||||
YamlContent: operatorSpecEncoded,
|
||||
}
|
||||
|
||||
es.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
|
||||
return dao.CreateOperatorBootstrap(ctx, tx, &bootstrapData)
|
||||
})
|
||||
err = dao.CreateOperatorBootstrap(ctx, tx, &bootstrapData)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
cluster.Status = &commonv3.Status{
|
||||
ConditionType: "Create",
|
||||
ConditionStatus: commonv3.ConditionStatus_StatusFailed,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return cluster, err
|
||||
}
|
||||
}
|
||||
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
fmt.Println("unable to commit changes", err)
|
||||
}
|
||||
|
||||
ev := event.Resource{
|
||||
|
||||
@@ -47,10 +47,9 @@ func TestCreateCluster(t *testing.T) {
|
||||
WithArgs().WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(cuuid))
|
||||
mock.ExpectExec(`UPDATE "cluster_clusters"`).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectCommit()
|
||||
|
||||
mock.ExpectExec(`INSERT INTO "cluster_project_cluster"`).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectCommit()
|
||||
|
||||
cluster := &infrav3.Cluster{
|
||||
Metadata: &v3.Metadata{Id: cuuid, Name: "cluster-" + cuuid, Organization: "orgname", Project: "project-" + puuid},
|
||||
|
||||
@@ -52,16 +52,15 @@ func prepareKubeCfgRevocationResponse(kr *models.KubeconfigRevocation) *sentry.K
|
||||
}
|
||||
|
||||
func (krs *kubeconfigRevocationService) Patch(ctx context.Context, kr *sentry.KubeconfigRevocation) error {
|
||||
err := krs.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
|
||||
_, err := dao.GetKubeconfigRevocation(ctx, krs.db, uuid.MustParse(kr.OrganizationID), uuid.MustParse(kr.AccountID), kr.IsSSOUser)
|
||||
return krs.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
|
||||
_, err := dao.GetKubeconfigRevocation(ctx, tx, uuid.MustParse(kr.OrganizationID), uuid.MustParse(kr.AccountID), kr.IsSSOUser)
|
||||
if err != nil && err == sql.ErrNoRows {
|
||||
kcr := convertToModel(kr)
|
||||
kcr.CreatedAt = time.Now()
|
||||
return dao.CreateKubeconfigRevocation(ctx, krs.db, kcr)
|
||||
return dao.CreateKubeconfigRevocation(ctx, tx, kcr)
|
||||
}
|
||||
return dao.UpdateKubeconfigRevocation(ctx, krs.db, convertToModel(kr))
|
||||
return dao.UpdateKubeconfigRevocation(ctx, tx, convertToModel(kr))
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func convertToModel(kr *sentry.KubeconfigRevocation) *models.KubeconfigRevocation {
|
||||
|
||||
@@ -54,17 +54,16 @@ func (kss *kubeconfigSettingService) Patch(ctx context.Context, ks *sentry.Kubec
|
||||
if err != nil {
|
||||
accId = uuid.Nil
|
||||
}
|
||||
err = kss.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
|
||||
_, err := dao.GetKubeconfigSetting(ctx, kss.db, uuid.MustParse(ks.OrganizationID), accId, ks.IsSSOUser)
|
||||
return kss.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
|
||||
_, err := dao.GetKubeconfigSetting(ctx, tx, uuid.MustParse(ks.OrganizationID), accId, ks.IsSSOUser)
|
||||
db := convertToKubeCfgSettingModel(ks)
|
||||
if err != nil && err == sql.ErrNoRows {
|
||||
db.CreatedAt = time.Now()
|
||||
return dao.CreateKubeconfigSetting(ctx, kss.db, convertToKubeCfgSettingModel(ks))
|
||||
return dao.CreateKubeconfigSetting(ctx, tx, convertToKubeCfgSettingModel(ks))
|
||||
}
|
||||
db.ModifiedAt = time.Now()
|
||||
return dao.UpdateKubeconfigSetting(ctx, kss.db, convertToKubeCfgSettingModel(ks))
|
||||
return dao.UpdateKubeconfigSetting(ctx, tx, convertToKubeCfgSettingModel(ks))
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func prepareKubeCfgSettingResponse(ks *models.KubeconfigSetting) *sentry.KubeconfigSetting {
|
||||
|
||||
@@ -41,21 +41,20 @@ func (kcs *kubectlClusterSettingsService) Get(ctx context.Context, orgID string,
|
||||
}
|
||||
|
||||
func (kcs *kubectlClusterSettingsService) Patch(ctx context.Context, kc *sentry.KubectlClusterSettings) error {
|
||||
err := kcs.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
|
||||
_, err := dao.GetkubectlClusterSettings(ctx, kcs.db, uuid.MustParse(kc.OrganizationID), kc.Name)
|
||||
return kcs.db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
|
||||
_, err := dao.GetkubectlClusterSettings(ctx, tx, uuid.MustParse(kc.OrganizationID), kc.Name)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
kcsdb := convertToKubeCtlSettingModel(kc)
|
||||
kcsdb.CreatedAt = time.Now()
|
||||
dao.CreatekubectlClusterSettings(ctx, kcs.db, kcsdb)
|
||||
dao.CreatekubectlClusterSettings(ctx, tx, kcsdb)
|
||||
}
|
||||
return err
|
||||
}
|
||||
kcsdb := convertToKubeCtlSettingModel(kc)
|
||||
kcsdb.ModifiedAt = time.Now()
|
||||
return dao.UpdatekubectlClusterSettings(ctx, kcs.db, kcsdb)
|
||||
return dao.UpdatekubectlClusterSettings(ctx, tx, kcsdb)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func convertToKubeCtlSettingModel(kcs *sentry.KubectlClusterSettings) *models.KubectlClusterSetting {
|
||||
|
||||
Reference in New Issue
Block a user