Files
weave-scope/probe/appclient/multi_client.go
Marc Carré d46c2266ce Change Sirupsen/logrus to sirupsen/logrus
```
$ git grep -l Sirupsen | grep -v vendor | xargs sed -i '' 's:github.com/Sirupsen/logrus:github.com/sirupsen/logrus:g'
$ gofmt -s -w app
$ gofmt -s -w common
$ gofmt -s -w probe
$ gofmt -s -w prog
$ gofmt -s -w tools
```
2018-07-23 20:10:14 +02:00

191 lines
4.5 KiB
Go

package appclient
import (
"bytes"
"errors"
"fmt"
"net/url"
"strings"
"sync"
log "github.com/sirupsen/logrus"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/report"
)
// maxConcurrentGET is the capacity of the semaphore created by
// NewMultiAppClient; it caps how many endpoints a multiClient probes
// (client creation plus Details fetch) at the same time in Set.
const maxConcurrentGET = 10
// ClientFactory is a function that makes AppClients. Set calls it with the
// hostname it was given and one of that hostname's endpoint URLs.
type ClientFactory func(string, url.URL) (AppClient, error)
// multiClient is the MultiAppClient implementation returned by
// NewMultiAppClient.
type multiClient struct {
// clientFactory builds the AppClient for each (hostname, url) pair passed to Set.
clientFactory ClientFactory
// mtx is held around every access to clients and ids in this file.
mtx sync.Mutex
// sema limits how many endpoints Set probes concurrently.
sema semaphore
clients map[string]AppClient // holds map from app id -> client
ids map[string]report.IDList // holds map from hostname -> app ids
// quit is closed by Stop; it is not received from anywhere in the visible code.
quit chan struct{}
// noControls, when true, makes Set skip ControlConnection on newly added clients.
noControls bool
}
// clientTuple pairs a freshly created AppClient with the xfer.Details it
// reported. Set's probing goroutines send these over a channel so the main
// goroutine can register each client under the ID found in its details.
type clientTuple struct {
xfer.Details
AppClient
}
// MultiAppClient maintains a set of upstream apps, and ensures we have an
// AppClient for each one.
type MultiAppClient interface {
// Set replaces the list of endpoints known for hostname.
Set(hostname string, urls []url.URL)
// PipeConnection hands pipe to the client registered for appID. The
// implementation in this file returns an error when appID is unknown.
PipeConnection(appID, pipeID string, pipe xfer.Pipe) error
// PipeClose closes pipeID on the client registered for appID. The
// implementation in this file returns an error when appID is unknown.
PipeClose(appID, pipeID string) error
// Stop stops all the underlying clients.
Stop()
// Publish sends the report to every underlying client.
Publish(r report.Report) error
}
// NewMultiAppClient builds an empty MultiAppClient.
//
// clientFactory is used by Set to create one AppClient per endpoint URL.
// When noControls is true, Set does not open a control connection on the
// clients it adds. The returned client probes at most maxConcurrentGET
// endpoints at once.
func NewMultiAppClient(clientFactory ClientFactory, noControls bool) MultiAppClient {
	mc := new(multiClient)
	mc.clientFactory = clientFactory
	mc.noControls = noControls
	mc.sema = newSemaphore(maxConcurrentGET)
	mc.clients = make(map[string]AppClient)
	mc.ids = make(map[string]report.IDList)
	mc.quit = make(chan struct{})
	return mc
}
// Set replaces the list of endpoints for the given hostname.
//
// Every URL is probed in its own goroutine (bounded by c.sema): a client is
// created with the factory and asked for its Details. Endpoints that fail
// either step are logged and skipped. Set blocks until all probes finish,
// then, under the lock:
//
//   - a client whose ID is already known only retargets the existing client;
//   - a client with a new ID is registered and, unless noControls is set,
//     has its control connection started;
//   - the hostname's ID list is replaced, and any client whose ID is no
//     longer referenced by any hostname is stopped and removed.
//
// NOTE(review): a client whose Details call fails, or whose ID is already
// registered, is dropped without Stop being called on it — confirm that
// factory-made clients hold no resources until they are used.
func (c *multiClient) Set(hostname string, urls []url.URL) {
	var wg sync.WaitGroup
	// Buffered to len(urls) so no probe ever blocks on the send.
	discovered := make(chan clientTuple, len(urls))

	probe := func(endpoint url.URL) {
		c.sema.acquire()
		defer c.sema.release()
		defer wg.Done()

		candidate, err := c.clientFactory(hostname, endpoint)
		if err != nil {
			log.Errorf("Error creating new app client: %v", err)
			return
		}
		info, err := candidate.Details()
		if err != nil {
			log.Errorf("Error fetching app details: %v", err)
			return
		}
		discovered <- clientTuple{Details: info, AppClient: candidate}
	}

	wg.Add(len(urls))
	for _, endpoint := range urls {
		go probe(endpoint)
	}
	wg.Wait()
	close(discovered)

	c.mtx.Lock()
	defer c.mtx.Unlock()

	// Start any new apps, and replace the list of app ids for this hostname.
	hostIDs := report.MakeIDList()
	for found := range discovered {
		hostIDs = hostIDs.Add(found.ID)
		if existing, known := c.clients[found.ID]; known {
			existing.ReTarget(found.AppClient.Target())
			continue
		}
		c.clients[found.ID] = found.AppClient
		if !c.noControls {
			found.AppClient.ControlConnection()
		}
	}
	c.ids[hostname] = hostIDs

	// Remove apps that are no longer referenced (by id) from any hostname.
	referenced := report.MakeIDList()
	for _, ids := range c.ids {
		referenced = referenced.Add(ids...)
	}
	for id, client := range c.clients {
		if referenced.Contains(id) {
			continue
		}
		client.Stop()
		delete(c.clients, id)
	}
}
// withClient looks up the client registered under appID and runs f on it,
// returning f's result. If no client has that ID an error is returned and f
// is not called. The lock is held only for the map lookup, not while f runs.
func (c *multiClient) withClient(appID string, f func(AppClient) error) error {
	c.mtx.Lock()
	target, found := c.clients[appID]
	c.mtx.Unlock()

	if found {
		return f(target)
	}
	return fmt.Errorf("No such app id: %s", appID)
}
// PipeConnection calls PipeConnection(pipeID, pipe) on the client registered
// under appID. It returns an error only when no such client exists.
func (c *multiClient) PipeConnection(appID, pipeID string, pipe xfer.Pipe) error {
	connect := func(target AppClient) error {
		target.PipeConnection(pipeID, pipe)
		return nil
	}
	return c.withClient(appID, connect)
}
// PipeClose calls PipeClose(pipeID) on the client registered under appID and
// returns its result, or an error when no such client exists.
func (c *multiClient) PipeClose(appID, pipeID string) error {
	closePipe := func(target AppClient) error {
		return target.PipeClose(pipeID)
	}
	return c.withClient(appID, closePipe)
}
// Stop stops every registered client, forgets them all, and closes the quit
// channel.
//
// Stop may be called more than once: closing an already-closed channel
// panics, so the quit channel is only closed the first time. The check and
// the close both happen under c.mtx, so concurrent Stop calls cannot race.
func (c *multiClient) Stop() {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	for _, client := range c.clients {
		client.Stop()
	}
	c.clients = map[string]AppClient{}

	select {
	case <-c.quit:
		// Already closed by an earlier Stop; nothing left to do.
	default:
		close(c.quit)
	}
}
// Publish sends the report to every registered client, one after another.
//
// The report is serialized once with WriteBinary; each client then receives
// its own reader over those bytes together with the report's Shortcut flag.
// There is one client per unique app ID, and the lock is held for the whole
// call. A failure on one client does not stop the others: the error messages
// are collected and returned joined with "; ". Nil is returned when
// serialization and every publish succeed.
func (c *multiClient) Publish(r report.Report) error {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	buf, err := r.WriteBinary()
	if err != nil {
		return err
	}
	payload := buf.Bytes()

	var failures []string
	for _, client := range c.clients {
		pubErr := client.Publish(bytes.NewReader(payload), r.Shortcut)
		if pubErr == nil {
			continue
		}
		failures = append(failures, pubErr.Error())
	}

	if len(failures) == 0 {
		return nil
	}
	return errors.New(strings.Join(failures, "; "))
}
// semaphore is a counting semaphore backed by a buffered channel: each
// buffered element is one available token.
type semaphore chan struct{}

// newSemaphore returns a semaphore that starts out holding n tokens.
func newSemaphore(n int) semaphore {
	s := make(semaphore, n)
	for remaining := n; remaining > 0; remaining-- {
		s <- struct{}{}
	}
	return s
}

// acquire takes one token, blocking until one is available.
func (s semaphore) acquire() {
	<-s
}

// release puts one token back. It blocks if the semaphore already holds its
// full capacity of tokens.
func (s semaphore) release() {
	s <- struct{}{}
}