Compare commits

...

7 Commits
3.3.1 ... v3.x

Author SHA1 Message Date
Stefan Prodan
713d1094a2 Merge pull request #71 from stefanprodan/register-instance
Register hostname and version in cache
2020-05-20 13:57:19 +03:00
stefanprodan
3197ad3e45 Register hostname and version in cache
If the caching server is online, podinfo registers its hostname and version in Redis. The set expires after one minute and it's refreshed every 30 seconds.
2020-05-20 13:51:07 +03:00
Stefan Prodan
92f415d633 Merge pull request #70 from stefanprodan/redis-cache-api
Add cache CRUD API
2020-05-20 13:15:03 +03:00
stefanprodan
0352a3c822 Add Helm test for the cache routes 2020-05-20 13:05:50 +03:00
stefanprodan
5ba5808722 Add cache CRUD API 2020-05-20 12:59:27 +03:00
Stefan Prodan
1d416a8513 Merge pull request #69 from seaneagan/helm2and3tests
Reverts tests as Jobs
2020-05-20 12:04:19 +03:00
Sean Eagan
95028a0fb0 Reverts tests as Jobs
This reverts the #61 change to use test Jobs, which was premature
since this feature hasn't been back ported to Helm 2 yet, which
leads to the tests not being run there.

It would be possible to use presence of .Capabilities.TillerVersion
to implement tests differently for Helm 2 vs 3, but this seems
not worth the trouble.
2020-05-19 15:27:08 -05:00
12 changed files with 264 additions and 181 deletions

View File

@@ -41,8 +41,9 @@ Web API:
* `POST /token` issues a JWT token valid for one minute `JWT=$(curl -sd 'anon' podinfo:9898/token | jq -r .token)`
* `GET /token/validate` validates the JWT token `curl -H "Authorization: Bearer $JWT" podinfo:9898/token/validate`
* `GET /configs` returns a JSON with configmaps and/or secrets mounted in the `config` volume
* `POST /cache` saves the posted content to Redis and returns the SHA1 hash of the content
* `GET /cache/{hash}` returns the content from Redis if the key exists
* `POST/PUT /cache/{key}` saves the posted content to Redis
* `GET /cache/{key}` returns the content from Redis if the key exists
* `DELETE /cache/{key}` deletes the key from Redis if exists
* `POST /store` writes the posted content to disk at /data/hash and returns the SHA1 hash of the content
* `GET /store/{hash}` returns the content of the file /data/hash if exists
* `GET /ws/echo` echos content via websockets `podcli ws ws://localhost:9898/ws/echo`

View File

@@ -0,0 +1,32 @@
{{- if .Values.cache }}
apiVersion: v1
kind: Pod
metadata:
name: {{ template "podinfo.fullname" . }}-cache-test-{{ randAlphaNum 5 | lower }}
labels:
heritage: {{ .Release.Service }}
release: {{ .Release.Name }}
chart: {{ .Chart.Name }}-{{ .Chart.Version }}
app: {{ template "podinfo.name" . }}
annotations:
"helm.sh/hook": test
"helm.sh/hook-delete-policy": before-hook-creation,hook-succeeded
sidecar.istio.io/inject: "false"
linkerd.io/inject: disabled
appmesh.k8s.aws/sidecarInjectorWebhook: disabled
spec:
containers:
- name: curl
image: curlimages/curl:7.69.0
command:
- sh
- -c
- |
curl -sd 'data' ${PODINFO_SVC}/cache/test &&
curl -s ${PODINFO_SVC}/cache/test | grep data &&
curl -s -XDELETE ${PODINFO_SVC}/cache/test
env:
- name: PODINFO_SVC
value: "{{ template "podinfo.fullname" . }}.{{ .Release.Namespace }}:{{ .Values.service.externalPort }}"
restartPolicy: Never
{{- end }}

View File

@@ -1,6 +1,6 @@
{{- if .Values.faults.testFail }}
apiVersion: batch/v1
kind: Job
apiVersion: v1
kind: Pod
metadata:
name: {{ template "podinfo.fullname" . }}-fault-test-{{ randAlphaNum 5 | lower }}
labels:
@@ -15,12 +15,10 @@ metadata:
linkerd.io/inject: disabled
appmesh.k8s.aws/sidecarInjectorWebhook: disabled
spec:
template:
spec:
containers:
- name: fault
image: alpine:3.11
command: ['/bin/sh']
args: ['-c', 'exit 1']
restartPolicy: Never
containers:
- name: fault
image: alpine:3.11
command: ['/bin/sh']
args: ['-c', 'exit 1']
restartPolicy: Never
{{- end }}

View File

@@ -1,5 +1,5 @@
apiVersion: batch/v1
kind: Job
apiVersion: v1
kind: Pod
metadata:
name: {{ template "podinfo.fullname" . }}-grpc-test-{{ randAlphaNum 5 | lower }}
labels:
@@ -14,11 +14,9 @@ metadata:
linkerd.io/inject: disabled
appmesh.k8s.aws/sidecarInjectorWebhook: disabled
spec:
template:
spec:
containers:
- name: grpc-health-probe
image: stefanprodan/grpc_health_probe:v0.3.0
command: ['grpc_health_probe']
args: ['-addr={{ template "podinfo.fullname" . }}.{{ .Release.Namespace }}:{{ .Values.service.grpcPort }}']
restartPolicy: Never
containers:
- name: grpc-health-probe
image: stefanprodan/grpc_health_probe:v0.3.0
command: ['grpc_health_probe']
args: ['-addr={{ template "podinfo.fullname" . }}.{{ .Release.Namespace }}:{{ .Values.service.grpcPort }}']
restartPolicy: Never

View File

@@ -1,5 +1,5 @@
apiVersion: batch/v1
kind: Job
apiVersion: v1
kind: Pod
metadata:
name: {{ template "podinfo.fullname" . }}-jwt-test-{{ randAlphaNum 5 | lower }}
labels:
@@ -14,18 +14,16 @@ metadata:
linkerd.io/inject: disabled
appmesh.k8s.aws/sidecarInjectorWebhook: disabled
spec:
template:
spec:
containers:
- name: tools
image: giantswarm/tiny-tools
command:
- sh
- -c
- |
TOKEN=$(curl -sd 'test' ${PODINFO_SVC}/token | jq -r .token) &&
curl -sH "Authorization: Bearer ${TOKEN}" ${PODINFO_SVC}/token/validate | grep test
env:
- name: PODINFO_SVC
value: "{{ template "podinfo.fullname" . }}.{{ .Release.Namespace }}:{{ .Values.service.externalPort }}"
restartPolicy: Never
containers:
- name: tools
image: giantswarm/tiny-tools
command:
- sh
- -c
- |
TOKEN=$(curl -sd 'test' ${PODINFO_SVC}/token | jq -r .token) &&
curl -sH "Authorization: Bearer ${TOKEN}" ${PODINFO_SVC}/token/validate | grep test
env:
- name: PODINFO_SVC
value: "{{ template "podinfo.fullname" . }}.{{ .Release.Namespace }}:{{ .Values.service.externalPort }}"
restartPolicy: Never

View File

@@ -1,5 +1,5 @@
apiVersion: batch/v1
kind: Job
apiVersion: v1
kind: Pod
metadata:
name: {{ template "podinfo.fullname" . }}-service-test-{{ randAlphaNum 5 | lower }}
labels:
@@ -14,17 +14,15 @@ metadata:
linkerd.io/inject: disabled
appmesh.k8s.aws/sidecarInjectorWebhook: disabled
spec:
template:
spec:
containers:
- name: curl
image: curlimages/curl:7.69.0
command:
- sh
- -c
- |
curl -s ${PODINFO_SVC}/api/info | grep version
env:
- name: PODINFO_SVC
value: "{{ template "podinfo.fullname" . }}.{{ .Release.Namespace }}:{{ .Values.service.externalPort }}"
restartPolicy: Never
containers:
- name: curl
image: curlimages/curl:7.69.0
command:
- sh
- -c
- |
curl -s ${PODINFO_SVC}/api/info | grep version
env:
- name: PODINFO_SVC
value: "{{ template "podinfo.fullname" . }}.{{ .Release.Namespace }}:{{ .Values.service.externalPort }}"
restartPolicy: Never

View File

@@ -1,6 +1,6 @@
{{- if .Values.faults.testTimeout }}
apiVersion: batch/v1
kind: Job
apiVersion: v1
kind: Pod
metadata:
name: {{ template "podinfo.fullname" . }}-fault-test-{{ randAlphaNum 5 | lower }}
labels:
@@ -15,12 +15,10 @@ metadata:
linkerd.io/inject: disabled
appmesh.k8s.aws/sidecarInjectorWebhook: disabled
spec:
template:
spec:
containers:
- name: fault
image: alpine:3.11
command: ['/bin/sh']
args: ['-c', 'while sleep 3600; do :; done']
restartPolicy: Never
containers:
- name: fault
image: alpine:3.11
command: ['/bin/sh']
args: ['-c', 'while sleep 3600; do :; done']
restartPolicy: Never
{{- end }}

View File

@@ -1,45 +1,76 @@
package api
import (
"github.com/gomodule/redigo/redis"
"github.com/gorilla/mux"
"go.uber.org/zap"
"io/ioutil"
"net/http"
"time"
"github.com/gomodule/redigo/redis"
"github.com/gorilla/mux"
"go.uber.org/zap"
"github.com/stefanprodan/podinfo/pkg/version"
)
// Cache godoc
// @Summary Save payload in cache
// @Description writes the posted content in cache and returns the SHA1 hash of the content
// @Description writes the posted content in cache
// @Tags HTTP API
// @Accept json
// @Produce json
// @Router /cache [post]
// @Success 200 {object} api.MapResponse
// @Router /cache/{key} [post]
// @Success 202
func (s *Server) cacheWriteHandler(w http.ResponseWriter, r *http.Request) {
if s.pool == nil {
s.ErrorResponse(w, r, "cache server is offline", http.StatusBadRequest)
return
}
key := mux.Vars(r)["key"]
body, err := ioutil.ReadAll(r.Body)
if err != nil {
s.ErrorResponse(w, r, "reading the request body failed", http.StatusBadRequest)
return
}
hash := hash(string(body))
conn := s.pool.Get()
defer conn.Close()
_, err = conn.Do("SET", hash, string(body))
_, err = conn.Do("SET", key, string(body))
if err != nil {
s.logger.Warn("cache set failed", zap.Error(err))
s.ErrorResponse(w, r, "cache set failed", http.StatusInternalServerError)
return
}
s.JSONResponseCode(w, r, map[string]string{"hash": hash}, http.StatusAccepted)
w.WriteHeader(http.StatusAccepted)
}
// Cache godoc
// @Summary Delete payload from cache
// @Description deletes the key and its value from cache
// @Tags HTTP API
// @Accept json
// @Produce json
// @Router /cache/{key} [delete]
// @Success 202
func (s *Server) cacheDeleteHandler(w http.ResponseWriter, r *http.Request) {
if s.pool == nil {
s.ErrorResponse(w, r, "cache server is offline", http.StatusBadRequest)
return
}
key := mux.Vars(r)["key"]
conn := s.pool.Get()
defer conn.Close()
_, err := conn.Do("DEL", key)
if err != nil {
s.logger.Warn("cache delete failed", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusAccepted)
}
// Cache godoc
@@ -48,46 +79,72 @@ func (s *Server) cacheWriteHandler(w http.ResponseWriter, r *http.Request) {
// @Tags HTTP API
// @Accept json
// @Produce json
// @Router /cache/{hash} [get]
// @Success 200 {string} api.MapResponse
// @Router /cache/{key} [get]
// @Success 200 {string} string value
func (s *Server) cacheReadHandler(w http.ResponseWriter, r *http.Request) {
if s.pool == nil {
s.ErrorResponse(w, r, "cache server is offline", http.StatusBadRequest)
return
}
hash := mux.Vars(r)["hash"]
key := mux.Vars(r)["key"]
conn := s.pool.Get()
defer conn.Close()
ok, err := redis.Bool(conn.Do("EXISTS", hash))
ok, err := redis.Bool(conn.Do("EXISTS", key))
if err != nil || !ok {
s.ErrorResponse(w, r, "key not found in cache", http.StatusNotFound)
s.logger.Warn("cache key not found", zap.String("key", key))
w.WriteHeader(http.StatusNotFound)
return
}
data, err := redis.String(conn.Do("GET", hash))
data, err := redis.String(conn.Do("GET", key))
if err != nil {
s.logger.Warn("cache get failed", zap.Error(err))
s.ErrorResponse(w, r, "cache get failed", http.StatusInternalServerError)
w.WriteHeader(http.StatusInternalServerError)
return
}
s.JSONResponseCode(w, r, map[string]string{"data": data}, http.StatusAccepted)
w.WriteHeader(http.StatusOK)
w.Write([]byte(data))
}
func (s *Server) startCachePool() {
if s.config.CacheServer != "" {
s.pool = &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", s.config.CacheServer)
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
func (s *Server) startCachePool(ticker *time.Ticker, stopCh <-chan struct{}) {
if s.config.CacheServer == "" {
return
}
s.pool = &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", s.config.CacheServer)
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
// set <hostname>=<version> with an expiry time of one minute
setVersion := func() {
conn := s.pool.Get()
if _, err := conn.Do("SET", s.config.Hostname, version.VERSION, "EX", 60); err != nil {
s.logger.Warn("cache server is offline", zap.Error(err), zap.String("server", s.config.CacheServer))
}
_ = conn.Close()
}
// set version on a schedule
go func() {
setVersion()
for {
select {
case <-stopCh:
return
case <-ticker.C:
setVersion()
}
}
}()
}

View File

@@ -1,6 +1,6 @@
// GENERATED BY THE COMMAND ABOVE; DO NOT EDIT
// This file was generated by swaggo/swag at
// 2020-05-16 09:49:23.920068 +0300 EEST m=+0.052088436
// 2020-05-20 12:48:10.564627 +0300 EEST m=+0.030136350
package docs
@@ -98,30 +98,7 @@ var doc = `{
}
}
},
"/cache": {
"post": {
"description": "writes the posted content in cache and returns the SHA1 hash of the content",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"HTTP API"
],
"summary": "Save payload in cache",
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.MapResponse"
}
}
}
}
},
"/cache/{hash}": {
"/cache/{key}": {
"get": {
"description": "returns the content from cache if key exists",
"consumes": [
@@ -142,6 +119,38 @@ var doc = `{
}
}
}
},
"post": {
"description": "writes the posted content in cache",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"HTTP API"
],
"summary": "Save payload in cache",
"responses": {
"202": {}
}
},
"delete": {
"description": "deletes the key and its value from cache",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"HTTP API"
],
"summary": "Delete payload from cache",
"responses": {
"202": {}
}
}
},
"/chunked/{seconds}": {

View File

@@ -86,30 +86,7 @@
}
}
},
"/cache": {
"post": {
"description": "writes the posted content in cache and returns the SHA1 hash of the content",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"HTTP API"
],
"summary": "Save payload in cache",
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.MapResponse"
}
}
}
}
},
"/cache/{hash}": {
"/cache/{key}": {
"get": {
"description": "returns the content from cache if key exists",
"consumes": [
@@ -130,6 +107,38 @@
}
}
}
},
"post": {
"description": "writes the posted content in cache",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"HTTP API"
],
"summary": "Save payload in cache",
"responses": {
"202": {}
}
},
"delete": {
"description": "deletes the key and its value from cache",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"HTTP API"
],
"summary": "Delete payload from cache",
"responses": {
"202": {}
}
}
},
"/chunked/{seconds}": {

View File

@@ -102,23 +102,18 @@ paths:
summary: Runtime information
tags:
- HTTP API
/cache:
post:
/cache/{key}:
delete:
consumes:
- application/json
description: writes the posted content in cache and returns the SHA1 hash of
the content
description: deletes the key and its value from cache
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/api.MapResponse'
summary: Save payload in cache
"202": {}
summary: Delete payload from cache
tags:
- HTTP API
/cache/{hash}:
get:
consumes:
- application/json
@@ -133,6 +128,17 @@ paths:
summary: Get payload from cache
tags:
- HTTP API
post:
consumes:
- application/json
description: writes the posted content in cache
produces:
- application/json
responses:
"202": {}
summary: Save payload in cache
tags:
- HTTP API
/chunked/{seconds}:
get:
consumes:

View File

@@ -43,11 +43,6 @@ var (
watcher *fscache.Watcher
)
type FluxConfig struct {
GitUrl string `mapstructure:"git-url"`
GitBranch string `mapstructure:"git-branch"`
}
type Config struct {
HttpClientTimeout time.Duration `mapstructure:"http-client-timeout"`
HttpServerTimeout time.Duration `mapstructure:"http-server-timeout"`
@@ -106,8 +101,9 @@ func (s *Server) registerHandlers() {
s.router.HandleFunc("/status/{code:[0-9]+}", s.statusHandler).Methods("GET", "POST", "PUT").Name("status")
s.router.HandleFunc("/store", s.storeWriteHandler).Methods("POST", "PUT")
s.router.HandleFunc("/store/{hash}", s.storeReadHandler).Methods("GET").Name("store")
s.router.HandleFunc("/cache", s.cacheWriteHandler).Methods("POST", "PUT")
s.router.HandleFunc("/cache/{hash}", s.cacheReadHandler).Methods("GET").Name("cache")
s.router.HandleFunc("/cache/{key}", s.cacheWriteHandler).Methods("POST", "PUT")
s.router.HandleFunc("/cache/{key}", s.cacheDeleteHandler).Methods("DELETE")
s.router.HandleFunc("/cache/{key}", s.cacheReadHandler).Methods("GET").Name("cache")
s.router.HandleFunc("/configs", s.configReadHandler).Methods("GET")
s.router.HandleFunc("/token", s.tokenGenerateHandler).Methods("POST")
s.router.HandleFunc("/token/validate", s.tokenValidateHandler).Methods("GET")
@@ -180,25 +176,8 @@ func (s *Server) ListenAndServe(stopCh <-chan struct{}) {
}
// start redis connection pool
s.startCachePool()
if s.pool != nil {
ticker := time.NewTicker(30 * time.Second)
go func() {
for {
select {
case <-stopCh:
return
case <-ticker.C:
conn := s.pool.Get()
_, err := redis.String(conn.Do("PING"))
if err != nil {
s.logger.Warn("cache server is offline", zap.Error(err), zap.String("server", s.config.CacheServer))
}
_ = conn.Close()
}
}
}()
}
ticker := time.NewTicker(30 * time.Second)
s.startCachePool(ticker, stopCh)
// run server in background
go func() {