Update controller to remove unnecessary code

This commit is contained in:
faizanahmad055
2018-07-09 20:23:15 +05:00
parent ae4fabc635
commit b79f90e709
9 changed files with 116 additions and 399 deletions

View File

@@ -4,6 +4,8 @@ This controller watches for changes to `ConfigMap` and `Secret` objects and perf
This is particularly useful if the `ConfigMap` is used to define environment variables - or your app cannot easily and reliably watch the `ConfigMap` and update itself on the fly.
**NOTE:** This controller has been inspired from [configmapController](https://github.com/fabric8io/configmapcontroller)
## How to use Reloader
For a `Deployment` called `foo` have a `ConfigMap` called `foo`. Then add this annotation to your `Deployment`
@@ -11,13 +13,13 @@ For a `Deployment` called `foo` have a `ConfigMap` called `foo`. Then add this a
```yaml
metadata:
annotations:
configmap.fabric8.io/update-on-change: "foo"
reloader.stakater.com/update-on-change: "foo"
```
Then, providing `Reloader` is running, whenever you edit the `ConfigMap` called `foo` the Reloader will update the `Deployment` by adding the environment variable:
```
FABRICB_FOO_REVISION=${configMapRevision}
STAKATER_FOO_REVISION=${reloaderRevision}
```
This then triggers a rolling upgrade of your deployment's pods to use the new configuration.
This then triggers a rolling upgrade of your deployment's pods to use the new configuration.

View File

@@ -1,5 +1,5 @@
FROM scratch
ENTRYPOINT ["/reloader"]
ENTRYPOINT ["/Reloader"]
COPY ./reloader /
COPY ./Reloader /

2
configs/config.yaml Normal file
View File

@@ -0,0 +1,2 @@
controllers:
- type: pods

View File

@@ -1,35 +1,14 @@
package: .
package: github.com/stakater/Reloader
import:
- package: github.com/openshift/origin
version: v1.3.0
subpackages:
- pkg/client
- package: k8s.io/api
version: kubernetes-1.8.0
- package: k8s.io/apimachinery
version: kubernetes-1.8.0
- package: k8s.io/client-go
version: 5.0.0
- package: github.com/spf13/cobra
version: ef82de70bb3f60c65fb8eebacbb2d122ef517385
- package: github.com/spf13/pflag
- package: k8s.io/kubernetes
version: 52492b4bff99ef3b8ca617d385a3ff0612f9402d
repo: git://github.com/openshift/kubernetes.git
vcs: git
subpackages:
- pkg/api
- pkg/api/resource
- pkg/api/unversioned
- pkg/client/unversioned
- pkg/fields
- pkg/kubectl/cmd
- pkg/kubectl/cmd/util
- pkg/labels
- pkg/runtime
- pkg/util
- package: github.com/opencontainers/runc
version: v0.0.7
subpackages:
- libcontainer
- package: github.com/imdario/mergo
version: 6633656539c1639d9d78127b7d47c622b5d7b6dc
- package: github.com/Sirupsen/logrus
version: aaf92c95712104318fc35409745f1533aa5ff327
- package: github.com/docker/distribution
version: 559433598c7be9d30d6cfc5cad5b5dfdb686725c
repo: git://github.com/openshift/docker-distribution.git
vcs: git
version: 583c0c0531f06d5278b7d917446061adc344b5cd
- package: github.com/sirupsen/logrus
version: ~1.0.3

View File

@@ -1,49 +0,0 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package client
import (
oclient "github.com/openshift/origin/pkg/client"
"github.com/pkg/errors"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
)
func NewClient(f *cmdutil.Factory) (*client.Client, *restclient.Config, error) {
var err error
cfg, err := f.ClientConfig()
if err != nil {
return nil, nil, errors.Wrap(err, "Could not initialise client")
}
c, err := client.New(cfg)
if err != nil {
return nil, nil, errors.Wrap(err, "Could not initialise client")
}
return c, cfg, nil
}
func NewOpenShiftClient(cfg *restclient.Config) (*oclient.Client, *restclient.Config, error) {
ocfg := *cfg
ocfg.APIPath = ""
c, err := oclient.New(&ocfg)
if err != nil {
return nil, nil, errors.Wrap(err, "Could not initialise an OpenShift client")
}
return c, cfg, nil
}

View File

@@ -1,25 +1,13 @@
package main
package cmd
import (
"flag"
"fmt"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
"github.com/spf13/cobra"
"github.com/stakater/Reloader/internal/pkg/client"
"github.com/stakater/Reloader/internal/pkg/controller"
"github.com/stakater/Reloader/internal/pkg/util"
"github.com/stakater/Reloader/pkg/kube"
"github.com/golang/glog"
oclient "github.com/openshift/origin/pkg/client"
"github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/api"
kubectlutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"github.com/stakater/Reloader/internal/pkg/config"
"github.com/sirupsen/logrus"
)
func NewReloaderCommand() *cobra.Command {
@@ -31,42 +19,24 @@ func NewReloaderCommand() *cobra.Command {
return cmds
}
const (
healthPort = 10254
)
var (
flags = pflag.NewFlagSet("", pflag.ExitOnError)
resyncPeriod = flags.Duration("sync-period", 30*time.Second,
`Relist and confirm services this often.`)
healthzPort = flags.Int("healthz-port", healthPort, "port for healthz endpoint.")
profiling = flags.Bool("profiling", true, `Enable profiling via web interface host:port/debug/pprof/`)
)
func startReloader(cmd *cobra.Command, args []string) {
glog.Println("Starting Reloader")
logrus.Info("Starting Reloader")
// create the clientset
clientset, err := kube.GetClient()
if err != nil {
log.Fatal(err)
logrus.Fatal(err)
}
// get the Controller config file
config := getControllerConfig()
for k, v := range kube.ResourceMap {
c, err := controller.NewController(clientset, *resyncPeriod, config, v)
for _, v := range kube.ResourceMap {
c, err := controller.NewController(clientset, config.Controllers[0], v)
if err != nil {
glog.Fatalf("%s", err)
logrus.Fatalf("%s", err)
}
go registerHandlers()
go handleSigterm(c)
// Now let's start the controller
stop := make(chan struct{})
defer close(stop)
@@ -78,30 +48,6 @@ func startReloader(cmd *cobra.Command, args []string) {
select {}
}
func registerHandlers() {
mux := http.NewServeMux()
if *profiling {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
}
server := &http.Server{
Addr: fmt.Sprintf(":%v", *healthzPort),
Handler: mux,
}
glog.Fatal(server.ListenAndServe())
}
func handleSigterm(c *controller.Controller) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
sig := <-signalChan
glog.Infof("Received %s, shutting down", sig)
c.Stop()
}
// get the yaml configuration for the controller
func getControllerConfig() config.Config {
configFilePath := os.Getenv("CONFIG_FILE_PATH")
@@ -111,7 +57,7 @@ func getControllerConfig() config.Config {
}
configuration, err := config.ReadConfig(configFilePath)
if err != nil {
log.Panic(err)
logrus.Panic(err)
}
return configuration
}

View File

@@ -0,0 +1,62 @@
package config
import (
"io/ioutil"
yaml "gopkg.in/yaml.v2"
)
// Config which would be read from the config.yaml
type Config struct {
Controllers []Controller
}
// Controller which contains all the information for a specific controller
type Controller struct {
Type string
WatchCriterion Criterion
Actions []Action
}
// Criterion as to what fields should the controller be looking at
type Criterion struct {
Operator string
Identifiers []string
}
// Action that the controller will be taking based on the Parameters
type Action struct {
Name string
Params map[interface{}]interface{}
}
// ReadConfig function that reads the yaml file
func ReadConfig(filePath string) (Config, error) {
var config Config
// Read YML
source, err := ioutil.ReadFile(filePath)
if err != nil {
return config, err
}
// Unmarshall
err = yaml.Unmarshal(source, &config)
if err != nil {
return config, err
}
return config, nil
}
// WriteConfig function that can write to the yaml file
func WriteConfig(config Config, path string) error {
b, err := yaml.Marshal(config)
if err != nil {
return err
}
err = ioutil.WriteFile(path, b, 0644)
if err != nil {
return err
}
return nil
}

View File

@@ -1,32 +1,23 @@
package controller
import (
"bytes"
"strings"
"time"
"fmt"
"github.com/stakater/Reloader/internal/pkg/actions"
"github.com/golang/glog"
"github.com/pkg/errors"
"k8s.io/kubernetes/pkg/api"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes"
"k8s.io/apimachinery/pkg/util/runtime"
informerruntime "k8s.io/apimachinery/pkg/runtime"
"github.com/stakater/Reloader/internal/pkg/config"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/apimachinery/pkg/util/wait"
errorHandler "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sort"
"github.com/sirupsen/logrus"
)
const (
updateOnChangeAnnotation = "configmap.fabric8.io/update-on-change"
updateOnChangeAnnotation = "reloader.stakater.com.io/update-on-change"
// AllNamespaces as our controller will be looking for events in all namespaces
AllNamespaces = ""
)
@@ -41,90 +32,40 @@ type Event struct {
// Controller for checking events
type Controller struct {
clientset clientset.Interface
client kubernetes.Interface
indexer cache.Indexer
queue workqueue.RateLimitingInterface
informer cache.Controller
controllerConfig config.Controller
Actions []actions.Action
stopCh chan struct{}
}
// NewController for initializing a Controller
func NewController(
clientset clientset.Interface,
resyncPeriod time.Duration, controllerConfig config.Controller, objType informerruntime.Object) (*Controller, error) {
client kubernetes.Interface,
controllerConfig config.Controller, objType informerruntime.Object) (*Controller, error) {
c := Controller{
clientset: clientset,
client: client,
controllerConfig: controllerConfig,
stopCh: make(chan struct{}),
}
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
listWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), controllerConfig.Type, AllNamespaces, fields.Everything())
listWatcher := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), controllerConfig.Type, AllNamespaces, fields.Everything())
indexer, informer := cache.NewIndexerInformer(listWatcher, objType, 0, cache.ResourceEventHandlerFuncs {
AddFunc: c.Add,
UpdateFunc: c.Update,
DeleteFunc: c.Delete,
}, cache.Indexers{})
/*c.cmLister.Store, c.cmController = framework.NewInformer(
&cache.ListWatch{
ListFunc: configMapListFunc(c.client, namespace),
WatchFunc: configMapWatchFunc(c.client, namespace),
},
&api.ConfigMap{},
resyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
newCM := obj.(*api.ConfigMap)
//typeOfMaster, err := util.TypeOfMaster(kubeClient)
if err != nil {
glog.Fatalf("failed to create REST client config: %s", err)
}
err = rollingUpgradeDeployments(newCM, kubeClient)
if err != nil {
glog.Errorf("failed to update Deployment: %v", err)
}
},
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
oldM := oldObj.(*api.ConfigMap)
newCM := newObj.(*api.ConfigMap)
if oldM.ResourceVersion != newCM.ResourceVersion {
//typeOfMaster, err := util.TypeOfMaster(kubeClient)
if err != nil {
glog.Fatalf("failed to create REST client config: %s", err)
}
err = rollingUpgradeDeployments(newCM, kubeClient)
if err != nil {
glog.Errorf("failed to update Deployment: %v", err)
}
}
},
},
)*/
c.indexer = indexer
c.informer = informer
c.queue = queue
return &c, nil
}
// Run starts the controller.
/*func (c *Controller) Run() {
glog.Infof("starting reloader")
<-c.stopCh
}*/
// Stop stops the controller.
/*func (c *Controller) Stop() {
glog.Infof("stopping reloader")
close(c.stopCh)
}*/
// Add function to add a 'create' event to the queue in case of creating a pod
func (c *Controller) Add(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
@@ -143,6 +84,8 @@ func (c *Controller) Update(old interface{}, new interface{}) {
var event Event
if err == nil {
event.key = key
event.eventType = "update"
c.queue.Add(event)
}
}
@@ -155,7 +98,7 @@ func (c *Controller) Delete(obj interface{}) {
//Run function for controller which handles the queue
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
glog.Infof("Starting Controller for type ", c.controllerConfig.Type)
logrus.Infof("Starting Controller for type ", c.controllerConfig.Type)
defer errorHandler.HandleCrash()
// Let the workers stop when we are done
@@ -174,7 +117,7 @@ func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
}
<-stopCh
glog.Infof("Stopping Controller for type ", c.controllerConfig.Type)
logrus.Infof("Stopping Controller for type ", c.controllerConfig.Type)
}
func (c *Controller) runWorker() {
@@ -205,16 +148,16 @@ func (c *Controller) takeAction(event Event) error {
obj, _, err := c.indexer.GetByKey(event.key)
if err != nil {
glog.Infof("Fetching object with key %s from store failed with %v", event.key, err)
logrus.Infof("Fetching object with key %s from store failed with %v", event.key, err)
}
if obj == nil {
glog.Infof("Error in Action")
logrus.Infof("Error in Action")
} else {
glog.Infof("Detected changes in object %s", obj)
/*glog.Infof("Resource block not found, performing actions")
logrus.Infof("Detected changes in object %s", obj)
/*logrus.Infof("Resource block not found, performing actions")
// process events based on its type
for index, action := range c.Actions {
glog.Infof("Performing '%s' action for controller of type '%s'", c.controllerConfig.Actions[index].Name, c.controllerConfig.Type)
gllogrusog.Infof("Performing '%s' action for controller of type '%s'", c.controllerConfig.Actions[index].Name, c.controllerConfig.Type)
switch event.eventType {
case "create":
action.ObjectCreated(obj)
@@ -241,7 +184,7 @@ func (c *Controller) handleErr(err error, key interface{}) {
// This controller retries 5 times if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(key) < 5 {
log.Printf("Error syncing events %v: %v", key, err)
logrus.Infof("Error syncing events %v: %v", key, err)
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
@@ -252,122 +195,5 @@ func (c *Controller) handleErr(err error, key interface{}) {
c.queue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this key
runtime.HandleError(err)
log.Printf("Dropping the key %q out of the queue: %v", key, err)
}
/*func configMapListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) {
return func(opts api.ListOptions) (runtime.Object, error) {
return c.ConfigMaps(ns).List(opts)
}
}
func configMapWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) {
return func(options api.ListOptions) (watch.Interface, error) {
return c.ConfigMaps(ns).Watch(options)
}
}
func rollingUpgradeDeployments(cm *api.ConfigMap, c *client.Client) error {
ns := cm.Namespace
configMapName := cm.Name
configMapVersion := convertConfigMapToToken(cm)
deployments, err := c.Deployments(ns).List(api.ListOptions{})
if err != nil {
return errors.Wrap(err, "failed to list deployments")
}
for _, d := range deployments.Items {
containers := d.Spec.Template.Spec.Containers
// match deployments with the correct annotation
annotationValue, _ := d.ObjectMeta.Annotations[updateOnChangeAnnotation]
if annotationValue != "" {
values := strings.Split(annotationValue, ",")
matches := false
for _, value := range values {
if value == configMapName {
matches = true
break
}
}
if matches {
updateContainers(containers, annotationValue, configMapVersion)
// update the deployment
_, err := c.Deployments(ns).Update(&d)
if err != nil {
return errors.Wrap(err, "update deployment failed")
}
glog.Infof("Updated Deployment %s", d.Name)
}
}
}
return nil
}*/
// lets convert the configmap into a unique token based on the data values
func convertConfigMapToToken(cm *api.ConfigMap) string {
values := []string{}
for k, v := range cm.Data {
values = append(values, k+"="+v)
}
sort.Strings(values)
text := strings.Join(values, ";")
// we could zip and base64 encode
// but for now we could leave this easy to read so that its easier to diagnose when & why things changed
return text
}
func updateContainers(containers []api.Container, annotationValue, configMapVersion string) bool {
// we can have multiple configmaps to update
answer := false
configmaps := strings.Split(annotationValue, ",")
for _, cmNameToUpdate := range configmaps {
configmapEnvar := "FABRIC8_" + convertToEnvVarName(cmNameToUpdate) + "_CONFIGMAP"
for i := range containers {
envs := containers[i].Env
matched := false
for j := range envs {
if envs[j].Name == configmapEnvar {
matched = true
if envs[j].Value != configMapVersion {
glog.Infof("Updating %s to %s", configmapEnvar, configMapVersion)
envs[j].Value = configMapVersion
answer = true
}
}
}
// if no existing env var exists lets create one
if !matched {
e := api.EnvVar{
Name: configmapEnvar,
Value: configMapVersion,
}
containers[i].Env = append(containers[i].Env, e)
answer = true
}
}
}
return answer
}
// convertToEnvVarName converts the given text into a usable env var
// removing any special chars with '_'
func convertToEnvVarName(text string) string {
var buffer bytes.Buffer
lower := strings.ToUpper(text)
lastCharValid := false
for i := 0; i < len(lower); i++ {
ch := lower[i]
if (ch >= 'A' && ch <= 'Z') || (ch >= '0' && ch <= '9') {
buffer.WriteString(string(ch))
lastCharValid = true
} else {
if lastCharValid {
buffer.WriteString("_")
}
lastCharValid = false
}
}
return buffer.String()
}
logrus.Infof("Dropping the key %q out of the queue: %v", key, err)
}

View File

@@ -1,51 +0,0 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package util
import (
"encoding/json"
"github.com/pkg/errors"
api "k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
)
type MasterType string
const (
OpenShift MasterType = "OpenShift"
Kubernetes MasterType = "Kubernetes"
)
func TypeOfMaster(c *client.Client) (MasterType, error) {
res, err := c.Get().AbsPath("").DoRaw()
if err != nil {
return "", errors.Wrap(err, "could not discover the type of your installation")
}
var rp api.RootPaths
err = json.Unmarshal(res, &rp)
if err != nil {
errors.Wrap(err, "could not discover the type of your installation")
}
for _, p := range rp.Paths {
if p == "/oapi" {
return OpenShift, nil
}
}
return Kubernetes, nil
}