Merge pull request #2852 from weaveworks/s3-connections

AWS connection keep-alive
This commit is contained in:
Bryan Boreham
2017-09-18 10:29:54 +01:00
committed by GitHub
10 changed files with 246 additions and 33 deletions

View File

@@ -13,13 +13,12 @@ import (
"time"
log "github.com/Sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/tylerb/graceful"
billing "github.com/weaveworks/billing-client"
"github.com/weaveworks/common/aws"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/network"
"github.com/weaveworks/go-checkpoint"
@@ -76,21 +75,6 @@ func router(collector app.Collector, controlRouter app.ControlRouter, pipeRouter
return instrument.Wrap(router)
}
func awsConfigFromURL(url *url.URL) (*aws.Config, error) {
if url.User == nil {
return nil, fmt.Errorf("Must specify username & password in URL")
}
password, _ := url.User.Password()
creds := credentials.NewStaticCredentials(url.User.Username(), password, "")
config := aws.NewConfig().WithCredentials(creds)
if strings.Contains(url.Host, ".") {
config = config.WithEndpoint(fmt.Sprintf("http://%s", url.Host)).WithRegion("dummy")
} else {
config = config.WithRegion(url.Host)
}
return config, nil
}
func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHostname string,
memcacheConfig multitenant.MemcacheConfig, window time.Duration, createTables bool) (app.Collector, error) {
if collectorURL == "local" {
@@ -110,11 +94,11 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo
if err != nil {
return nil, fmt.Errorf("Valid URL for s3 required: %v", err)
}
dynamoDBConfig, err := awsConfigFromURL(parsed)
dynamoDBConfig, err := aws.ConfigFromURL(parsed)
if err != nil {
return nil, err
}
s3Config, err := awsConfigFromURL(s3)
s3Config, err := aws.ConfigFromURL(s3)
if err != nil {
return nil, err
}
@@ -175,7 +159,7 @@ func controlRouterFactory(userIDer multitenant.UserIDer, controlRouterURL string
if parsed.Scheme == "sqs" {
prefix := strings.TrimPrefix(parsed.Path, "/")
sqsConfig, err := awsConfigFromURL(parsed)
sqsConfig, err := aws.ConfigFromURL(parsed)
if err != nil {
return nil, err
}

53
vendor/github.com/weaveworks/common/aws/config.go generated vendored Normal file
View File

@@ -0,0 +1,53 @@
package aws
import (
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
)
// ConfigFromURL returns AWS config from given URL. It expects escaped
// AWS Access key ID & Secret Access Key to be encoded in the URL. It
// also expects region specified as a host (letting AWS generate full
// endpoint) or fully valid endpoint with dummy region assumed (e.g
// for URLs to emulated services).
func ConfigFromURL(awsURL *url.URL) (*aws.Config, error) {
if awsURL.User == nil {
return nil, fmt.Errorf("must specify escaped Access Key & Secret Access in URL")
}
password, _ := awsURL.User.Password()
creds := credentials.NewStaticCredentials(awsURL.User.Username(), password, "")
config := aws.NewConfig().
WithCredentials(creds).
// Use a custom http.Client with the golang defaults but also specifying
// MaxIdleConnsPerHost because of a bug in golang https://github.com/golang/go/issues/13801
// where MaxIdleConnsPerHost does not work as expected.
WithHTTPClient(&http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
MaxIdleConnsPerHost: 100,
TLSHandshakeTimeout: 3 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
})
if strings.Contains(awsURL.Host, ".") {
return config.WithEndpoint(fmt.Sprintf("http://%s", awsURL.Host)).WithRegion("dummy"), nil
}
// Let AWS generate default endpoint based on region passed as a host in URL.
return config.WithRegion(awsURL.Host), nil
}

View File

@@ -0,0 +1,35 @@
package client
import (
"context"
"fmt"
"net/http"
"strconv"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/instrument"
oldcontext "golang.org/x/net/context"
)
// Requester executes an HTTP request.
type Requester interface {
Do(req *http.Request) (*http.Response, error)
}
// TimeRequestHistogram performs an HTTP client request and records the duration in a histogram
func TimeRequestHistogram(ctx context.Context, operation string, metric *prometheus.HistogramVec, client Requester, request *http.Request) (*http.Response, error) {
var response *http.Response
doRequest := func(_ oldcontext.Context) error {
var err error
response, err = client.Do(request)
return err
}
toStatusCode := func(err error) string {
if err == nil {
return strconv.Itoa(response.StatusCode)
}
return "error"
}
err := instrument.TimeRequestHistogramStatus(ctx, fmt.Sprintf("%s %s", request.Method, operation), metric, toStatusCode, doRequest)
return response, err
}

1
vendor/github.com/weaveworks/common/http/http.go generated vendored Normal file
View File

@@ -0,0 +1 @@
package http

View File

@@ -5,6 +5,8 @@ import (
"fmt"
"os"
"strings"
"sync"
"time"
"golang.org/x/net/context"
@@ -12,6 +14,10 @@ import (
"github.com/weaveworks/common/user"
)
const (
defaultDedupeInterval = time.Minute
)
// Setup configures logging output to stderr, sets the log level and sets the formatter.
func Setup(logLevel string) error {
log.SetOutput(os.Stderr)
@@ -24,6 +30,39 @@ func Setup(logLevel string) error {
return nil
}
// SetupDeduplication should be performed after any other logging setup.
// For all logs less severe or equal to the given log level (but still higher than the logger's configured log level),
// these logs will be 'deduplicated'. What this means is that, excluding certain special fields like time, multiple
// identical log entries will be grouped up and a summary message emitted.
// For example, instead of:
// 00:00:00 INFO User 123 did xyz
// 00:00:10 INFO User 123 did xyz
// 00:00:25 INFO User 123 did xyz
// 00:00:55 INFO User 123 did xyz
// you would get:
// 00:00:00 INFO User 123 did xyz
// 00:01:00 INFO Repeated 3 times: User 123 did xyz
// The interval argument controls how long to wait for additional messages to arrive before reporting.
// Increase it to deduplicate more aggressively, decrease it to lower latency from a log occurring to it appearing.
// Set it to 0 to pick a sensible default value (recommended).
// NOTE: For simplicity and efficiency, fields are considered 'equal' if and only if their string representations (%v) are equal.
func SetupDeduplication(logLevel string, interval time.Duration) error {
dedupeLevel, err := log.ParseLevel(logLevel)
if err != nil {
return fmt.Errorf("Error parsing log level: %v", err)
}
if interval <= 0 {
interval = defaultDedupeInterval
}
// We use a special Formatter to either format the log using the original formatter, or to return ""
// so nothing will be written for that event. The repeated entries are later logged along with a field flag
// that tells the formatter to ignore the message.
stdLogger := log.StandardLogger()
stdLogger.Formatter = newDedupeFormatter(stdLogger.Formatter, dedupeLevel, interval)
return nil
}
type textFormatter struct{}
// Based off logrus.TextFormatter, which behaves completely
@@ -35,9 +74,7 @@ func (f *textFormatter) Format(entry *log.Entry) ([]byte, error) {
timeStamp := entry.Time.Format("2006/01/02 15:04:05.000000")
if len(entry.Data) > 0 {
fmt.Fprintf(b, "%s: %s %-44s ", levelText, timeStamp, entry.Message)
for k, v := range entry.Data {
fmt.Fprintf(b, " %s=%v", k, v)
}
b.WriteString(fieldsToString(entry.Data))
} else {
// No padding when there's no fields
fmt.Fprintf(b, "%s: %s %s", levelText, timeStamp, entry.Message)
@@ -55,3 +92,93 @@ func (f *textFormatter) Format(entry *log.Entry) ([]byte, error) {
func With(ctx context.Context) *log.Entry {
return log.WithFields(user.LogFields(ctx))
}
type entryCount struct {
entry log.Entry
count int
}
type dedupeFormatter struct {
innerFormatter log.Formatter
level log.Level
interval time.Duration
seen map[string]entryCount
lock sync.Mutex
}
func newDedupeFormatter(innerFormatter log.Formatter, level log.Level, interval time.Duration) *dedupeFormatter {
return &dedupeFormatter{
innerFormatter: innerFormatter,
level: level,
interval: interval,
seen: map[string]entryCount{},
}
}
func (f *dedupeFormatter) Format(entry *log.Entry) ([]byte, error) {
if f.shouldLog(entry) {
b, err := f.innerFormatter.Format(entry)
return b, err
}
return []byte{}, nil
}
func (f *dedupeFormatter) shouldLog(entry *log.Entry) bool {
if _, ok := entry.Data["deduplicated"]; ok {
// ignore our own logs about deduped messages
return true
}
if entry.Level < f.level {
// ignore logs more severe than our level
return true
}
key := fmt.Sprintf("%s %s", entry.Message, fieldsToString(entry.Data))
f.lock.Lock()
defer f.lock.Unlock()
if ec, ok := f.seen[key]; ok {
// already seen, increment count and do not log
ec.count++
f.seen[key] = ec
return false
}
// New message, log it but add it to seen.
// We need to copy because the pointer ceases to be valid after we return from Format
f.seen[key] = entryCount{entry: *entry}
go f.evictEntry(key) // queue to evict later
return true
}
// Wait for interval seconds then evict the entry and send the log
func (f *dedupeFormatter) evictEntry(key string) {
time.Sleep(f.interval)
var ec entryCount
func() {
f.lock.Lock()
defer f.lock.Unlock()
ec = f.seen[key]
delete(f.seen, key)
}()
if ec.count == 0 {
return
}
entry := log.WithFields(ec.entry.Data).WithField("deduplicated", ec.count)
message := fmt.Sprintf("Repeated %d times: %s", ec.count, ec.entry.Message)
// There's no way to choose the log level dynamically, so we have to do this hack
map[log.Level]func(args ...interface{}){
log.PanicLevel: entry.Panic,
log.FatalLevel: entry.Fatal,
log.ErrorLevel: entry.Error,
log.WarnLevel: entry.Warn,
log.InfoLevel: entry.Info,
log.DebugLevel: entry.Debug,
}[ec.entry.Level](message)
}
func fieldsToString(data log.Fields) string {
parts := make([]string, 0, len(data))
// traversal order here is arbitrary but stable, which is fine for our purposes
for k, v := range data {
parts = append(parts, fmt.Sprintf("%s=%v", k, v))
}
return strings.Join(parts, " ")
}

View File

@@ -6,6 +6,8 @@ import (
log "github.com/Sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/weaveworks/common/logging"
)
const gRPC = "gRPC"
@@ -14,10 +16,11 @@ const gRPC = "gRPC"
var ServerLoggingInterceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
begin := time.Now()
resp, err := handler(ctx, req)
entry := logging.With(ctx).WithFields(log.Fields{"method": info.FullMethod, "duration": time.Since(begin)})
if err != nil {
log.Warnf("%s %s (%v) %s", gRPC, info.FullMethod, err, time.Since(begin))
entry.WithError(err).Warn(gRPC)
} else {
log.Debugf("%s %s (success) %s", gRPC, info.FullMethod, time.Since(begin))
entry.Debugf("%s (success)", gRPC)
}
return resp, err
}

View File

@@ -40,6 +40,8 @@ type Config struct {
HTTPListenPort int
GRPCListenPort int
RegisterInstrumentation bool
ServerGracefulShutdownTimeout time.Duration
HTTPServerReadTimeout time.Duration
HTTPServerWriteTimeout time.Duration
@@ -53,6 +55,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.HTTPListenPort, "server.http-listen-port", 80, "HTTP server listen port.")
f.IntVar(&cfg.GRPCListenPort, "server.grpc-listen-port", 9095, "gRPC server listen port.")
f.BoolVar(&cfg.RegisterInstrumentation, "server.register-instrumentation", true, "Register the intrumentation handlers (/metrics etc).")
f.DurationVar(&cfg.ServerGracefulShutdownTimeout, "server.graceful-shutdown-timeout", 5*time.Second, "Timeout for graceful shutdowns")
f.DurationVar(&cfg.HTTPServerReadTimeout, "server.http-read-timeout", 5*time.Second, "Read timeout for HTTP server")
f.DurationVar(&cfg.HTTPServerWriteTimeout, "server.http-write-timeout", 5*time.Second, "Write timeout for HTTP server")
@@ -111,9 +114,9 @@ func New(cfg Config) (*Server, error) {
// Setup HTTP server
router := mux.NewRouter()
router.Handle("/metrics", prometheus.Handler())
router.Handle("/traces", loki.Handler())
router.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux)
if cfg.RegisterInstrumentation {
RegisterInstrumentation(router)
}
httpMiddleware := []middleware.Interface{
middleware.Log{},
middleware.Instrument{
@@ -144,6 +147,13 @@ func New(cfg Config) (*Server, error) {
}, nil
}
// RegisterInstrumentation on the given router.
func RegisterInstrumentation(router *mux.Router) {
router.Handle("/metrics", prometheus.Handler())
router.Handle("/traces", loki.Handler())
router.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux)
}
// Run the server; blocks until SIGTERM is received.
func (s *Server) Run() {
go s.httpServer.Serve(s.httpListener)

View File

@@ -35,8 +35,8 @@ func ExtractOrgID(ctx context.Context) (string, error) {
}
// InjectOrgID returns a derived context containing the org ID.
func InjectOrgID(ctx context.Context, userID string) context.Context {
return context.WithValue(ctx, interface{}(orgIDContextKey), userID)
func InjectOrgID(ctx context.Context, orgID string) context.Context {
return context.WithValue(ctx, interface{}(orgIDContextKey), orgID)
}
// ExtractUserID gets the user ID from the context.

View File

@@ -10,11 +10,11 @@ import (
func LogFields(ctx context.Context) log.Fields {
fields := log.Fields{}
userID, err := ExtractUserID(ctx)
if err != nil {
if err == nil {
fields["userID"] = userID
}
orgID, err := ExtractOrgID(ctx)
if err != nil {
if err == nil {
fields["orgID"] = orgID
}
return fields

2
vendor/manifest vendored
View File

@@ -980,7 +980,7 @@
"importpath": "github.com/weaveworks/common",
"repository": "https://github.com/weaveworks/common",
"vcs": "git",
"revision": "493a1f760f47ed3b50afd5baabb36589d96017b8",
"revision": "b811bc96d43d51edbae6693e7d1b0a367114595b",
"branch": "master",
"notests": true
},