mirror of
https://github.com/stakater/Reloader.git
synced 2026-02-14 18:09:50 +00:00
Implement PR-1 code review comments
This commit is contained in:
@@ -3,13 +3,14 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/stakater/Reloader/internal/pkg/controller"
|
"github.com/stakater/Reloader/internal/pkg/controller"
|
||||||
"github.com/stakater/Reloader/pkg/kube"
|
"github.com/stakater/Reloader/pkg/kube"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1"
|
"k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// NewReloaderCommand starts the reloader controller
|
||||||
func NewReloaderCommand() *cobra.Command {
|
func NewReloaderCommand() *cobra.Command {
|
||||||
cmds := &cobra.Command{
|
cmds := &cobra.Command{
|
||||||
Use: "reloader",
|
Use: "reloader",
|
||||||
|
|||||||
@@ -1,36 +1,35 @@
|
|||||||
package controller
|
package controller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"k8s.io/client-go/kubernetes"
|
"github.com/sirupsen/logrus"
|
||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
"github.com/stakater/Reloader/internal/pkg/upgrader"
|
||||||
|
"github.com/stakater/Reloader/pkg/kube"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
errorHandler "k8s.io/apimachinery/pkg/util/runtime"
|
errorHandler "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
"github.com/stakater/Reloader/pkg/kube"
|
|
||||||
"github.com/stakater/Reloader/internal/pkg/upgrader"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Event indicate the informerEvent
|
// Event indicate the informerEvent
|
||||||
type Event struct {
|
type Event struct {
|
||||||
key string
|
key string
|
||||||
eventType string
|
eventType string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Controller for checking events
|
// Controller for checking events
|
||||||
type Controller struct {
|
type Controller struct {
|
||||||
client kubernetes.Interface
|
client kubernetes.Interface
|
||||||
indexer cache.Indexer
|
indexer cache.Indexer
|
||||||
queue workqueue.RateLimitingInterface
|
queue workqueue.RateLimitingInterface
|
||||||
informer cache.Controller
|
informer cache.Controller
|
||||||
resource string
|
resource string
|
||||||
namespace string
|
namespace string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewController for initializing a Controller
|
// NewController for initializing a Controller
|
||||||
@@ -38,15 +37,15 @@ func NewController(
|
|||||||
client kubernetes.Interface, resource string, namespace string) (*Controller, error) {
|
client kubernetes.Interface, resource string, namespace string) (*Controller, error) {
|
||||||
|
|
||||||
c := Controller{
|
c := Controller{
|
||||||
client: client,
|
client: client,
|
||||||
resource: resource,
|
resource: resource,
|
||||||
namespace: namespace,
|
namespace: namespace,
|
||||||
}
|
}
|
||||||
|
|
||||||
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
|
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
|
||||||
listWatcher := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), resource, namespace, fields.Everything())
|
listWatcher := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), resource, namespace, fields.Everything())
|
||||||
|
|
||||||
indexer, informer := cache.NewIndexerInformer(listWatcher, kube.ResourceMap[resource], 0, cache.ResourceEventHandlerFuncs {
|
indexer, informer := cache.NewIndexerInformer(listWatcher, kube.ResourceMap[resource], 0, cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: c.Add,
|
AddFunc: c.Add,
|
||||||
UpdateFunc: c.Update,
|
UpdateFunc: c.Update,
|
||||||
}, cache.Indexers{})
|
}, cache.Indexers{})
|
||||||
@@ -144,10 +143,10 @@ func (c *Controller) takeAction(event Event) error {
|
|||||||
u, _ := upgrader.NewUpgrader(c.client, c.resource)
|
u, _ := upgrader.NewUpgrader(c.client, c.resource)
|
||||||
if c.resource == "configMaps" {
|
if c.resource == "configMaps" {
|
||||||
switch event.eventType {
|
switch event.eventType {
|
||||||
case "create":
|
case "create":
|
||||||
u.ObjectCreated(obj)
|
u.ObjectCreated(obj)
|
||||||
case "update":
|
case "update":
|
||||||
u.ObjectUpdated(obj)
|
u.ObjectUpdated(obj)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -178,4 +177,4 @@ func (c *Controller) handleErr(err error, key interface{}) {
|
|||||||
// Report to an external entity that, even after several retries, we could not successfully process this key
|
// Report to an external entity that, even after several retries, we could not successfully process this key
|
||||||
runtime.HandleError(err)
|
runtime.HandleError(err)
|
||||||
logrus.Infof("Dropping the key %q out of the queue: %v", key, err)
|
logrus.Infof("Dropping the key %q out of the queue: %v", key, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,21 +1,20 @@
|
|||||||
package controller
|
package controller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stakater/Reloader/pkg/kube"
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
"github.com/stakater/Reloader/pkg/kube"
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
client, err = kube.GetClient()
|
client, err = kube.GetClient()
|
||||||
configmapNamePrefix = "testconfigmap-reloader"
|
configmapNamePrefix = "testconfigmap-reloader"
|
||||||
letters = []rune("abcdefghijklmnopqrstuvwxyz")
|
letters = []rune("abcdefghijklmnopqrstuvwxyz")
|
||||||
)
|
)
|
||||||
|
|
||||||
func randSeq(n int) string {
|
func randSeq(n int) string {
|
||||||
@@ -42,8 +41,8 @@ func TestControllerForUpdatingConfigmapShouldUpdateDeployment(t *testing.T) {
|
|||||||
defer close(stop)
|
defer close(stop)
|
||||||
go controller.Run(1, stop)
|
go controller.Run(1, stop)
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
|
|
||||||
configmapName := configmapNamePrefix + "-withoutresources-update-" + randSeq(5)
|
configmapName := configmapNamePrefix + "-update-" + randSeq(5)
|
||||||
configmapClient := client.CoreV1().ConfigMaps(namespace)
|
configmapClient := client.CoreV1().ConfigMaps(namespace)
|
||||||
configmap := initConfigmap(namespace, configmapName)
|
configmap := initConfigmap(namespace, configmapName)
|
||||||
configmap, err = configmapClient.Create(configmap)
|
configmap, err = configmapClient.Create(configmap)
|
||||||
@@ -61,9 +60,8 @@ func TestControllerForUpdatingConfigmapShouldUpdateDeployment(t *testing.T) {
|
|||||||
configmap = updateConfigmap(namespace, configmapName)
|
configmap = updateConfigmap(namespace, configmapName)
|
||||||
_, updateErr := configmapClient.Update(configmap)
|
_, updateErr := configmapClient.Update(configmap)
|
||||||
|
|
||||||
|
|
||||||
// TODO: Add functionality to verify reloader functionality here
|
// TODO: Add functionality to verify reloader functionality here
|
||||||
|
|
||||||
if updateErr != nil {
|
if updateErr != nil {
|
||||||
controller.client.CoreV1().ConfigMaps(namespace).Delete(configmapName, &metav1.DeleteOptions{})
|
controller.client.CoreV1().ConfigMaps(namespace).Delete(configmapName, &metav1.DeleteOptions{})
|
||||||
panic(updateErr)
|
panic(updateErr)
|
||||||
@@ -79,9 +77,9 @@ func initConfigmap(namespace string, configmapName string) *v1.ConfigMap {
|
|||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: configmapName,
|
Name: configmapName,
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Labels: map[string]string{"firstLabel": "temp"},
|
Labels: map[string]string{"firstLabel": "temp"},
|
||||||
},
|
},
|
||||||
Data: map[string]string{"test.url":"www.google.com"},
|
Data: map[string]string{"test.url": "www.google.com"},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -89,6 +87,8 @@ func createNamespace(t *testing.T, namespace string) {
|
|||||||
_, err := client.CoreV1().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}})
|
_, err := client.CoreV1().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error("Failed to create namespace for testing", err)
|
t.Error("Failed to create namespace for testing", err)
|
||||||
|
} else {
|
||||||
|
logrus.Infof("Creating namespace for testing = %s", namespace)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -96,6 +96,8 @@ func deleteNamespace(t *testing.T, namespace string) {
|
|||||||
err := client.CoreV1().Namespaces().Delete(namespace, &metav1.DeleteOptions{})
|
err := client.CoreV1().Namespaces().Delete(namespace, &metav1.DeleteOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error("Failed to delete namespace that was created for testing", err)
|
t.Error("Failed to delete namespace that was created for testing", err)
|
||||||
|
} else {
|
||||||
|
logrus.Infof("Deleting namespace for testing = %s", namespace)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -104,8 +106,8 @@ func updateConfigmap(namespace string, configmapName string) *v1.ConfigMap {
|
|||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: configmapName,
|
Name: configmapName,
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Labels: map[string]string{"firstLabel": "temp"},
|
Labels: map[string]string{"firstLabel": "temp"},
|
||||||
},
|
},
|
||||||
Data: map[string]string{"test.url":"www.stakater.com"},
|
Data: map[string]string{"test.url": "www.stakater.com"},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Upgrader will upgrade the relevent deployment, deamonset and deamonset.
|
// Upgrader will upgrade the relevent deployment, deamonset and deamonset.
|
||||||
type Upgrader struct{
|
type Upgrader struct {
|
||||||
client kubernetes.Interface
|
client kubernetes.Interface
|
||||||
resourceType string
|
resourceType string
|
||||||
}
|
}
|
||||||
@@ -32,8 +32,8 @@ func NewUpgrader(client kubernetes.Interface, resourceType string) (*Upgrader, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ObjectCreated Detects if the configmap or secret has been created
|
// ObjectCreated Detects if the configmap or secret has been created
|
||||||
func (u *Upgrader)ObjectCreated(obj interface{}) {
|
func (u *Upgrader) ObjectCreated(obj interface{}) {
|
||||||
message := u.resourceType+": `" + obj.(*v1.ConfigMap).Name + "`has been created in Namespace: `" + obj.(*v1.ConfigMap).Namespace + "`"
|
message := u.resourceType + ": `" + obj.(*v1.ConfigMap).Name + "`has been created in Namespace: `" + obj.(*v1.ConfigMap).Namespace + "`"
|
||||||
logrus.Infof(message)
|
logrus.Infof(message)
|
||||||
err := rollingUpgradeDeployments(obj, u.client)
|
err := rollingUpgradeDeployments(obj, u.client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -42,8 +42,8 @@ func (u *Upgrader)ObjectCreated(obj interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ObjectUpdated Detects if the configmap or secret has been updated
|
// ObjectUpdated Detects if the configmap or secret has been updated
|
||||||
func (u *Upgrader)ObjectUpdated(oldObj interface{}) {
|
func (u *Upgrader) ObjectUpdated(oldObj interface{}) {
|
||||||
message := u.resourceType+": `" + oldObj.(*v1.ConfigMap).Name + "`has been updated in Namespace: `" + oldObj.(*v1.ConfigMap).Namespace + "`"
|
message := u.resourceType + ": `" + oldObj.(*v1.ConfigMap).Name + "`has been updated in Namespace: `" + oldObj.(*v1.ConfigMap).Namespace + "`"
|
||||||
logrus.Infof(message)
|
logrus.Infof(message)
|
||||||
err := rollingUpgradeDeployments(oldObj, u.client)
|
err := rollingUpgradeDeployments(oldObj, u.client)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -58,7 +58,7 @@ func rollingUpgradeDeployments(oldObj interface{}, client kubernetes.Interface)
|
|||||||
configMapName := oldObj.(*v1.ConfigMap).Name
|
configMapName := oldObj.(*v1.ConfigMap).Name
|
||||||
configMapVersion := convertConfigMapToToken(oldObj.(*v1.ConfigMap))
|
configMapVersion := convertConfigMapToToken(oldObj.(*v1.ConfigMap))
|
||||||
|
|
||||||
deployments, err := client.AppsV1().Deployments(ns).List(meta_v1.ListOptions{})
|
deployments, err := client.Apps().Deployments(ns).List(meta_v1.ListOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "failed to list deployments")
|
return errors.Wrap(err, "failed to list deployments")
|
||||||
}
|
}
|
||||||
@@ -79,7 +79,7 @@ func rollingUpgradeDeployments(oldObj interface{}, client kubernetes.Interface)
|
|||||||
updateContainers(containers, annotationValue, configMapVersion)
|
updateContainers(containers, annotationValue, configMapVersion)
|
||||||
|
|
||||||
// update the deployment
|
// update the deployment
|
||||||
_, err := client.AppsV1().Deployments(ns).Update(&d)
|
_, err := client.Apps().Deployments(ns).Update(&d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "update deployment failed")
|
return errors.Wrap(err, "update deployment failed")
|
||||||
}
|
}
|
||||||
@@ -156,4 +156,4 @@ func convertConfigMapToToken(cm *v1.ConfigMap) string {
|
|||||||
// we could zip and base64 encode
|
// we could zip and base64 encode
|
||||||
// but for now we could leave this easy to read so that its easier to diagnose when & why things changed
|
// but for now we could leave this easy to read so that its easier to diagnose when & why things changed
|
||||||
return text
|
return text
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user