Delete relay agent from target cluster during cluster deletion (#120)
* updated reconciler to handle delete events
  Signed-off-by: mabhi <abhijit.mukherjee@infracloud.io>
* fix to ensure relay agent is deleted from target cluster
  Signed-off-by: Nirav Parikh <nir.parikh05@gmail.com>
* updated changelog
  Signed-off-by: niravparikh05 <nir.parikh05@gmail.com>

Signed-off-by: mabhi <abhijit.mukherjee@infracloud.io>
Signed-off-by: Nirav Parikh <nir.parikh05@gmail.com>
Signed-off-by: niravparikh05 <nir.parikh05@gmail.com>
Co-authored-by: mabhi <abhijit.mukherjee@infracloud.io>
@@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.

## Fixed
- Fix modify userinfo service to include scope in response [mabhi](https://github.com/mabhi)
- Kubectl commands work even after deleting an imported cluster from [mabhi](https://github.com/mabhi) and [niravparikh05](https://github.com/niravparikh05)

## [0.1.8] - 2022-11-25
main.go (2 changes)
@@ -697,7 +697,7 @@ func runEventHandlers(wg *sync.WaitGroup, ctx context.Context) {
    defer wg.Done()

    //TODO: need to add a bunch of other handlers with gitops
    ceh := reconcile.NewClusterEventHandler(cs)
    ceh := reconcile.NewClusterEventHandler(cs, db, bs, kekFunc)
    _log.Infow("starting cluster event handler")
    go ceh.Handle(ctx.Done())
@@ -25,6 +25,8 @@ type Resource struct {
    ID        string            `json:"id,omitempty"`
    Name      string            `json:"n,omitempty"`
    EventType ResourceEventType `json:"t,omitempty"`
    Username  string            `json:"un,omitempty"`
    Account   string            `json:"acc,omitempty"`
}

// Key is the key for this event which can be used as a cache key etc
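The two new fields carry the caller's identity through the event pipeline; the cluster service populates them from session data, as in this condensed sketch based on the Update/Delete hunks further down (names are taken from the diff, only the arrangement is mine):

```go
// Condensed from the clusterService.Update/Delete hunks below: the delete
// event now carries the requesting user's identity so the reconciler can
// later build a kubeconfig on that user's behalf.
ev := event.Resource{
    PartnerID:      cluster.Metadata.Partner,
    OrganizationID: cluster.Metadata.Organization,
    Name:           cluster.Metadata.Name,
    EventType:      event.ResourceDelete,
    ID:             cluster.Metadata.Id,
}
if sd, ok := GetSessionDataFromContext(ctx); ok {
    ev.Username = sd.Username
    ev.Account = sd.Account
}
```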
@@ -4,11 +4,14 @@ import (
    "context"
    "time"

    "github.com/paralus/paralus/pkg/common"
    "github.com/paralus/paralus/pkg/event"
    "github.com/paralus/paralus/pkg/query"
    "github.com/paralus/paralus/pkg/sentry/cryptoutil"
    "github.com/paralus/paralus/pkg/service"
    commonv3 "github.com/paralus/paralus/proto/types/commonpb/v3"
    infrav3 "github.com/paralus/paralus/proto/types/infrapb/v3"
    "github.com/uptrace/bun"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/util/workqueue"
)
@@ -43,14 +46,22 @@ type clusterEventHandler struct {

    // cluster workload work queue
    wwq workqueue.RateLimitingInterface

    // required for cluster event reconciler
    db *bun.DB
    bs service.BootstrapService
    pf cryptoutil.PasswordFunc
}

// NewClusterEventHandler returns new cluster event handler
func NewClusterEventHandler(cs service.ClusterService) ClusterEventHandler {
func NewClusterEventHandler(cs service.ClusterService, db *bun.DB, bs service.BootstrapService, pf cryptoutil.PasswordFunc) ClusterEventHandler {
    return &clusterEventHandler{
        cs:  cs,
        cwq: workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter()),
        wwq: workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter()),
        db:  db,
        bs:  bs,
        pf:  pf,
    }
}
@@ -159,9 +170,14 @@ func (h *clusterEventHandler) handleClusterEvent(ev event.Resource) {
    cluster.Metadata.Partner = ev.PartnerID
    cluster.Metadata.Id = ev.ID

    ctx = context.WithValue(ctx, common.SessionDataKey, &commonv3.SessionData{
        Username: ev.Username,
        Account:  ev.Account,
    })

    _log.Debugw("handling cluster reconcile", "cluster", cluster.Metadata, "event", ev, "cluster status", cluster.Spec.ClusterData.ClusterStatus)

    reconciler := NewClusterReconciler(h.cs)
    reconciler := NewClusterReconciler(h.cs, h.db, h.bs, h.pf)
    err = reconciler.Reconcile(ctx, cluster)
    if err != nil {
        _log.Infow("unable to reconcile cluster", "error", err, "event", "ev")
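The session data stashed on the context here is read back later in the reconciler and the cluster service (see the handleClusterDelete and Delete hunks below); a minimal sketch of that round trip, using only names that appear in this diff:

```go
// Producer side (cluster event handler, as above): attach the caller's
// identity to the context under common.SessionDataKey.
ctx = context.WithValue(ctx, common.SessionDataKey, &commonv3.SessionData{
    Username: ev.Username,
    Account:  ev.Account,
})

// Consumer side (reconciler / service package, as in the hunks below):
// retrieve it to fill the kubeconfig query options.
if sd, ok := service.GetSessionDataFromContext(ctx); ok {
    _ = sd.Username
    _ = sd.Account
}
```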
@@ -7,11 +7,17 @@ import (

    clstrutil "github.com/paralus/paralus/internal/cluster"
    "github.com/paralus/paralus/internal/cluster/constants"
    "github.com/paralus/paralus/pkg/service"
    infrav3 "github.com/paralus/paralus/proto/types/infrapb/v3"

    "github.com/paralus/paralus/pkg/query"
    "github.com/paralus/paralus/pkg/sentry/cryptoutil"
    "github.com/paralus/paralus/pkg/sentry/kubeconfig"
    sentryutil "github.com/paralus/paralus/pkg/sentry/util"
    "github.com/paralus/paralus/pkg/service"
    sentryrpc "github.com/paralus/paralus/proto/rpc/sentry"
    ctypesv3 "github.com/paralus/paralus/proto/types/commonpb/v3"
    infrav3 "github.com/paralus/paralus/proto/types/infrapb/v3"
    "github.com/paralus/paralus/proto/types/sentry"
    "github.com/pkg/errors"
    "github.com/uptrace/bun"
)

const (
@@ -25,11 +31,14 @@ type ClusterReconciler interface {

type clusterReconciler struct {
    cs service.ClusterService
    db *bun.DB
    bs service.BootstrapService
    pf cryptoutil.PasswordFunc
}

// NewClusterReconciler returns new cluster reconciler
func NewClusterReconciler(cs service.ClusterService) ClusterReconciler {
    return &clusterReconciler{cs: cs}
func NewClusterReconciler(cs service.ClusterService, db *bun.DB, bs service.BootstrapService, pf cryptoutil.PasswordFunc) ClusterReconciler {
    return &clusterReconciler{cs: cs, db: db, bs: bs, pf: pf}
}

func (r *clusterReconciler) Reconcile(ctx context.Context, cluster *infrav3.Cluster) error {
@@ -66,10 +75,79 @@ func canReconcileClusterBootstrapAgent(c *infrav3.Cluster) bool {
    }
}

// DeleteForCluster delete bootstrap agent
func (r *clusterReconciler) deleteBootstrapAgentForCluster(ctx context.Context, cluster *infrav3.Cluster) error {

    resp, err := r.bs.SelectBootstrapAgentTemplates(ctx, query.WithOptions(&ctypesv3.QueryOptions{
        GlobalScope: true,
        Selector:    "paralus.dev/defaultRelay=true",
    }))
    if err != nil {
        return err
    }

    for _, bat := range resp.Items {

        agent := &sentry.BootstrapAgent{
            Metadata: &ctypesv3.Metadata{
                Id:           cluster.Metadata.Id,
                Name:         cluster.Metadata.Name,
                Partner:      cluster.Metadata.Partner,
                Organization: cluster.Metadata.Organization,
                Project:      cluster.Metadata.Project,
            },
            Spec: &sentry.BootstrapAgentSpec{
                TemplateRef: fmt.Sprintf("template/%s", bat.Metadata.Name),
            },
        }

        templateRef, err := sentryutil.GetTemplateScope(agent.Spec.TemplateRef)
        if err != nil {
            return err
        }

        err = r.bs.DeleteBootstrapAgent(ctx, templateRef, query.WithMeta(agent.Metadata))
        if err != nil {
            return err
        }
    }

    return nil
}

func (r *clusterReconciler) handleClusterDelete(ctx context.Context, cluster *infrav3.Cluster) error {
    _log.Infow("handling cluster delete", "cluster.Name", cluster.Metadata.Name)
    try := func() error {
        _log.Debugw("no relay networks to disassociate", "cluster name", cluster.Metadata.Name)

        sd, ok := service.GetSessionDataFromContext(ctx)
        if !ok {
            return errors.New("failed to get session data")
        }

        in := &sentryrpc.GetForClusterRequest{
            Namespace:  "paralus-system",
            SystemUser: false,
            Opts: &ctypesv3.QueryOptions{
                Name:         cluster.Metadata.Name,
                ClusterID:    cluster.Metadata.Id,
                Organization: cluster.Metadata.Organization,
                Partner:      cluster.Metadata.Partner,
                Username:     sd.Username,
                Account:      sd.Account,
            },
        }

        kss := service.NewKubeconfigSettingService(r.db)
        config, err := kubeconfig.GetConfigForCluster(ctx, r.bs, in, r.pf, kss, kubeconfig.ParalusSystem)
        if err != nil {
            return err
        }

        status := service.DeleteRelayAgent(ctx, config, "paralus-system")

        _log.Infof("deleted relay agent in cluster with status: ", fmt.Sprint(status))

        return nil
    }
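The `try` closure above is only defined inside this hunk; the code that actually drives it falls outside the shown context. A minimal sketch of what such a driver could look like (the retry policy here is an assumption, not taken from the diff):

```go
// Hypothetical driver for the try closure above; relay-agent cleanup is
// best-effort, so retry a few times in case the target cluster is briefly
// unreachable, then log and move on.
var tryErr error
for attempt := 0; attempt < 3; attempt++ {
    if tryErr = try(); tryErr == nil {
        break
    }
    time.Sleep(2 * time.Second)
}
if tryErr != nil {
    _log.Infow("unable to delete relay agent from target cluster", "cluster", cluster.Metadata.Name, "error", tryErr)
}
```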
@@ -77,7 +155,13 @@ func (r *clusterReconciler) handleClusterDelete(ctx context.Context, cluster *in
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
    defer cancel()

    err := r.cs.UpdateStatus(ctx, &infrav3.Cluster{
    err := r.deleteBootstrapAgentForCluster(ctx, cluster)
    if err != nil {
        return err
    }
    _log.Info("deleted bootstrap agent for cluster")

    err = r.cs.UpdateStatus(ctx, &infrav3.Cluster{
        Metadata: cluster.Metadata,
        Spec: &infrav3.ClusterSpec{
            ClusterData: &infrav3.ClusterData{
@@ -398,7 +398,7 @@ func GetAuthorization(ctx context.Context, req *sentryrpc.GetUserAuthorizationRe
    // is local user active
    if ok, _ := aps.IsSSOAccount(ctx, accountID); !ok {
        active, err := aps.IsAccountActive(ctx, accountID, orgID)
        _log.Infow("accountID ", accountID, "orgID ", orgID, "active ", active)
        _log.Infow("accountID ", accountID, "orgID ", orgID, "active ", fmt.Sprint(active))
        if err != nil {
            return nil, err
        }
@@ -680,7 +680,7 @@ func GetAuthorization(ctx context.Context, req *sentryrpc.GetUserAuthorizationRe
    resp.EnforceOrgAdminOnlySecretAccess = enforceOrgAdminOnlySecretAccess
    resp.IsOrgAdmin = isOrgAdmin

    _log.Infow("username", userName)
    _log.Infof("username %s", userName)

    return resp, nil
}
@@ -29,7 +29,7 @@ import (

const (
    kubeconfigPermission = sentry.KubeconfigReadPermission
    systemUsername       = "admin@paralus.co"
    systemUsername       = "admin@paralus.local"
)

var _log = log.GetLogger()
@@ -356,7 +356,7 @@ func getCertValidity(ctx context.Context, orgID, accountID string, isSSO bool, k
    return (360 * (time.Hour * 24)), nil
}

func getConfig(username, namespace, certCN, serverHost string, bootstrapInfra *sentry.BootstrapInfra, bootstrapAgents []*sentry.BootstrapAgent, pf cryptoutil.PasswordFunc, certValidity time.Duration) (*clientcmdapiv1.Config, error) {
func getConfig(username, namespace, certCN, serverHost string, bootstrapInfra *sentry.BootstrapInfra, bootstrapAgents []*sentry.BootstrapAgent, pf cryptoutil.PasswordFunc, certValidity time.Duration, clusterName string) (*clientcmdapiv1.Config, error) {

    if namespace == "" {
        namespace = "default"
@@ -443,7 +443,9 @@ func getConfig(username, namespace, certCN, serverHost string, bootstrapInfra *s
        Contexts: contexts,
    }

    if len(contexts) > 0 {
    if clusterName = strings.Trim(clusterName, " "); clusterName != "" {
        config.CurrentContext = clusterName
    } else if len(contexts) > 0 {
        config.CurrentContext = contexts[0].Name
    }
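In effect, when a cluster name is supplied (it comes from opts.Name in GetConfigForCluster, see the hunk further down), the generated kubeconfig's current-context points at that cluster instead of whichever context happens to be first. A small illustrative sketch (the context names here are made up):

```go
// Illustration only: hypothetical contexts in a generated kubeconfig.
contexts := []clientcmdapiv1.NamedContext{{Name: "cluster-a"}, {Name: "cluster-b"}}
config := &clientcmdapiv1.Config{Contexts: contexts}

// clusterName == "cluster-b"  -> config.CurrentContext = "cluster-b"
// clusterName == ""           -> config.CurrentContext = "cluster-a" (first context)
```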
@@ -498,9 +500,6 @@ func GetConfigForCluster(ctx context.Context, bs service.BootstrapService, req *
    if req.SystemUser {
        username = systemUsername
    }
    if sessionType == ParalusSystem {
        username = ParalusSystem + "-" + username
    }
    enforceSession := false

    // get user level settings if exist
@@ -553,7 +552,7 @@ func GetConfigForCluster(ctx context.Context, bs service.BootstrapService, req *
        certValidity = 8 * time.Hour
    }

    config, err := getConfig(username, req.Namespace, cn, serverHost, bi, bal.Items, pf, certValidity)
    config, err := getConfig(username, req.Namespace, cn, serverHost, bi, bal.Items, pf, certValidity, opts.Name)
    if err != nil {
        _log.Errorw("error generating kubeconfig", "error", err.Error())
        return nil, err
@@ -6,7 +6,6 @@ import (
    "encoding/base64"
    "encoding/json"
    "fmt"
    "strconv"
    "strings"
    "sync"
    "time"
@@ -419,13 +418,6 @@ func (s *clusterService) prepareClusterResponse(ctx context.Context, clstr *infr
        CreatedAt: timestamppb.New(c.CreatedAt),
    }

    sm := int32(infrav3.ClusterShareMode_ClusterShareModeNotSet)
    smv, err := strconv.ParseInt(c.ShareMode, 10, 32)
    if err != nil {
        _log.Infow("unable to convert value, ", err.Error())
    } else {
        sm = int32(smv)
    }
    var proxy infrav3.ProxyConfig
    if c.ProxyConfig != nil {
        json.Unmarshal(c.ProxyConfig, &proxy)
@@ -451,7 +443,6 @@ func (s *clusterService) prepareClusterResponse(ctx context.Context, clstr *infr
    clstr.Spec = &infrav3.ClusterSpec{
        ClusterType:      c.ClusterType,
        OverrideSelector: c.OverrideSelector,
        ShareMode:        infrav3.ClusterShareMode(sm),
        ProxyConfig:      &proxy,
        Params:           &params,
        ClusterData: &infrav3.ClusterData{
@@ -592,6 +583,12 @@ func (s *clusterService) Update(ctx context.Context, cluster *infrav3.Cluster) (
        ID: cluster.Metadata.Id,
    }

    sd, ok := GetSessionDataFromContext(ctx)
    if ok {
        ev.Username = sd.Username
        ev.Account = sd.Account
    }

    for _, h := range s.clusterHandlers {
        h.OnChange(ev)
    }
@@ -618,21 +615,21 @@ func (s *clusterService) Delete(ctx context.Context, cluster *infrav3.Cluster) e
    clusterId := cluster.Metadata.Id
    projectId := cluster.Metadata.Project

    err = s.deleteBootstrapAgentForCluster(ctx, cluster)
    if err != nil {
        return err
    }

    _log.Infow("deleting cluster", "name", cluster.Metadata.Name)

    _log.Debugw("setting cluster condition to pending delete", "name", cluster.Metadata.Name, "conditions", cluster.Spec.ClusterData.ClusterStatus.Conditions)
    clstrutil.SetClusterCondition(cluster, clstrutil.NewClusterDelete(constants.Pending, "deleted"))
    clstrutil.SetClusterCondition(cluster, clstrutil.NewClusterDelete(constants.Pending, "delete request submitted"))

    err = s.UpdateClusterConditionStatus(ctx, cluster)
    if err != nil {
        return errors.Wrapf(err, "could not update cluster %s status to pending delete", cluster.Metadata.Name)
    }

    sd, ok := GetSessionDataFromContext(ctx)
    if !ok {
        return errors.New("failed to get session data")
    }

    ev := event.Resource{
        PartnerID:      cluster.Metadata.Partner,
        OrganizationID: cluster.Metadata.Organization,
@@ -640,6 +637,8 @@ func (s *clusterService) Delete(ctx context.Context, cluster *infrav3.Cluster) e
        Name:      cluster.Metadata.Name,
        EventType: event.ResourceDelete,
        ID:        clusterId,
        Username:  sd.Username,
        Account:   sd.Account,
    }

    for _, h := range s.clusterHandlers {
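Because Delete now fails without session data, callers have to provide it on the context; a hypothetical caller-side sketch (the API-layer wiring is an assumption, but the key and type are the ones used in the event handler hunk above):

```go
// Hypothetical caller-side setup (assumption: the API layer attaches session
// data before invoking the cluster service), reusing common.SessionDataKey and
// commonv3.SessionData exactly as the cluster event handler does above.
ctx := context.WithValue(context.Background(), common.SessionDataKey, &commonv3.SessionData{
    Username: "admin@paralus.local", // system username from the kubeconfig hunk above
    Account:  accountID,             // hypothetical account identifier
})
if err := cs.Delete(ctx, cluster); err != nil {
    _log.Errorw("cluster delete failed", "error", err)
}
```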
@@ -873,46 +872,6 @@ func (s *clusterService) UpdateStatus(ctx context.Context, current *infrav3.Clus
    return nil
}

// DeleteForCluster delete bootstrap agent
func (s *clusterService) deleteBootstrapAgentForCluster(ctx context.Context, cluster *infrav3.Cluster) error {

    resp, err := s.bs.SelectBootstrapAgentTemplates(ctx, query.WithOptions(&commonv3.QueryOptions{
        GlobalScope: true,
        Selector:    "paralus.dev/defaultRelay=true",
    }))
    if err != nil {
        return err
    }

    for _, bat := range resp.Items {

        agent := &sentry.BootstrapAgent{
            Metadata: &commonv3.Metadata{
                Id:           cluster.Metadata.Id,
                Name:         cluster.Metadata.Name,
                Partner:      cluster.Metadata.Partner,
                Organization: cluster.Metadata.Organization,
                Project:      cluster.Metadata.Project,
            },
            Spec: &sentry.BootstrapAgentSpec{
                TemplateRef: fmt.Sprintf("template/%s", bat.Metadata.Name),
            },
        }

        templateRef, err := sentryutil.GetTemplateScope(agent.Spec.TemplateRef)
        if err != nil {
            return err
        }

        err = s.bs.DeleteBootstrapAgent(ctx, templateRef, query.WithMeta(agent.Metadata))
        if err != nil {
            return err
        }
    }

    return nil
}

// CreateForCluster creates bootstrap agent for cluster
func (s *clusterService) CreateBootstrapAgentForCluster(ctx context.Context, cluster *infrav3.Cluster) error {
    var relays []common.Relay
pkg/service/deleterelayservice.go (new file, 49 lines)
@@ -0,0 +1,49 @@
package service

import (
    "context"

    "k8s.io/client-go/tools/clientcmd"

    v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

func DeleteRelayAgent(ctx context.Context, kubeConfig []byte, namespace string) bool {

    config, err := clientcmd.NewClientConfigFromBytes(kubeConfig)
    if err != nil {
        _log.Errorf("Unable to build kube configuration %s", err.Error())
        return false
    }
    clientConfig, err := config.ClientConfig()
    if err != nil {
        _log.Errorf("Unable to get client config %s", err.Error())
        return false
    }
    clientSet, err := kubernetes.NewForConfig(clientConfig)
    if err != nil {
        _log.Errorf("Unable to clientset %s", err.Error())
        return false
    }
    status, err := processDeleteDeployment(ctx, clientSet, namespace)
    if err != nil {
        return false
    }

    return status
}

func processDeleteDeployment(ctx context.Context, clientset *kubernetes.Clientset, ns string) (bool, error) {
    err := clientset.AppsV1().Deployments(ns).Delete(ctx, "relay-agent", v1.DeleteOptions{})
    if err != nil {
        _log.Errorf("Error while deleting deployment %s", err.Error())
        return false, err
    }
    err = clientset.CoreV1().ConfigMaps(ns).Delete(ctx, "relay-agent-config", v1.DeleteOptions{})
    if err != nil {
        _log.Errorf("Error while deleting ConfigMap %s", err.Error())
        return false, err
    }
    return true, nil
}
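For reference, this is how the new helper is invoked from the cluster reconciler hunk above; config is the kubeconfig bytes produced by kubeconfig.GetConfigForCluster for the paralus-system session, while the handling of a false result is sketched here, not taken from the diff:

```go
// Usage as wired in clusterReconciler.handleClusterDelete above: best-effort
// removal of the relay-agent Deployment and ConfigMap on the target cluster.
status := service.DeleteRelayAgent(ctx, config, "paralus-system")
if !status {
    // Failures are logged inside DeleteRelayAgent / processDeleteDeployment;
    // cluster deletion proceeds regardless (sketch, not from the diff).
    _log.Infow("relay agent cleanup did not complete", "cluster", cluster.Metadata.Name)
}
```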