mirror of
https://github.com/weaveworks/scope.git
synced 2026-02-14 18:09:59 +00:00
``` $ git grep -l Sirupsen | grep -v vendor | xargs sed -i '' 's:github.com/Sirupsen/logrus:github.com/sirupsen/logrus:g' $ gofmt -s -w app $ gofmt -s -w common $ gofmt -s -w probe $ gofmt -s -w prog $ gofmt -s -w tools ```
191 lines
4.5 KiB
Go
191 lines
4.5 KiB
Go
package appclient
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"github.com/weaveworks/scope/common/xfer"
|
|
"github.com/weaveworks/scope/report"
|
|
)
|
|
|
|
const maxConcurrentGET = 10
|
|
|
|
// ClientFactory is a thing thats makes AppClients
|
|
type ClientFactory func(string, url.URL) (AppClient, error)
|
|
|
|
type multiClient struct {
|
|
clientFactory ClientFactory
|
|
|
|
mtx sync.Mutex
|
|
sema semaphore
|
|
clients map[string]AppClient // holds map from app id -> client
|
|
ids map[string]report.IDList // holds map from hostname -> app ids
|
|
quit chan struct{}
|
|
noControls bool
|
|
}
|
|
|
|
type clientTuple struct {
|
|
xfer.Details
|
|
AppClient
|
|
}
|
|
|
|
// MultiAppClient maintains a set of upstream apps, and ensures we have an
|
|
// AppClient for each one.
|
|
type MultiAppClient interface {
|
|
Set(hostname string, urls []url.URL)
|
|
PipeConnection(appID, pipeID string, pipe xfer.Pipe) error
|
|
PipeClose(appID, pipeID string) error
|
|
Stop()
|
|
Publish(r report.Report) error
|
|
}
|
|
|
|
// NewMultiAppClient creates a new MultiAppClient.
|
|
func NewMultiAppClient(clientFactory ClientFactory, noControls bool) MultiAppClient {
|
|
return &multiClient{
|
|
clientFactory: clientFactory,
|
|
|
|
sema: newSemaphore(maxConcurrentGET),
|
|
clients: map[string]AppClient{},
|
|
ids: map[string]report.IDList{},
|
|
quit: make(chan struct{}),
|
|
noControls: noControls,
|
|
}
|
|
}
|
|
|
|
// Set the list of endpoints for the given hostname.
|
|
func (c *multiClient) Set(hostname string, urls []url.URL) {
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(len(urls))
|
|
clients := make(chan clientTuple, len(urls))
|
|
for _, u := range urls {
|
|
go func(u url.URL) {
|
|
c.sema.acquire()
|
|
defer c.sema.release()
|
|
defer wg.Done()
|
|
|
|
client, err := c.clientFactory(hostname, u)
|
|
if err != nil {
|
|
log.Errorf("Error creating new app client: %v", err)
|
|
return
|
|
}
|
|
|
|
details, err := client.Details()
|
|
if err != nil {
|
|
log.Errorf("Error fetching app details: %v", err)
|
|
return
|
|
}
|
|
|
|
clients <- clientTuple{details, client}
|
|
}(u)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(clients)
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
// Start any new apps, and replace the list of app ids for this hostname
|
|
hostIDs := report.MakeIDList()
|
|
for tuple := range clients {
|
|
hostIDs = hostIDs.Add(tuple.ID)
|
|
if client, ok := c.clients[tuple.ID]; ok {
|
|
client.ReTarget(tuple.AppClient.Target())
|
|
} else {
|
|
c.clients[tuple.ID] = tuple.AppClient
|
|
if !c.noControls {
|
|
tuple.AppClient.ControlConnection()
|
|
}
|
|
}
|
|
}
|
|
c.ids[hostname] = hostIDs
|
|
|
|
// Remove apps that are no longer referenced (by id) from any hostname
|
|
allReferencedIDs := report.MakeIDList()
|
|
for _, ids := range c.ids {
|
|
allReferencedIDs = allReferencedIDs.Add(ids...)
|
|
}
|
|
for id, client := range c.clients {
|
|
if !allReferencedIDs.Contains(id) {
|
|
client.Stop()
|
|
delete(c.clients, id)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *multiClient) withClient(appID string, f func(AppClient) error) error {
|
|
c.mtx.Lock()
|
|
client, ok := c.clients[appID]
|
|
c.mtx.Unlock()
|
|
if !ok {
|
|
return fmt.Errorf("No such app id: %s", appID)
|
|
}
|
|
return f(client)
|
|
}
|
|
|
|
func (c *multiClient) PipeConnection(appID, pipeID string, pipe xfer.Pipe) error {
|
|
return c.withClient(appID, func(client AppClient) error {
|
|
client.PipeConnection(pipeID, pipe)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (c *multiClient) PipeClose(appID, pipeID string) error {
|
|
return c.withClient(appID, func(client AppClient) error {
|
|
return client.PipeClose(pipeID)
|
|
})
|
|
}
|
|
|
|
// Stop the MultiAppClient.
|
|
func (c *multiClient) Stop() {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
for _, c := range c.clients {
|
|
c.Stop()
|
|
}
|
|
c.clients = map[string]AppClient{}
|
|
close(c.quit)
|
|
}
|
|
|
|
// Publish implements Publisher by publishing the reader to all of the
|
|
// underlying publishers sequentially. To do that, it needs to drain the
|
|
// reader, and recreate new readers for each publisher. Note that it will
|
|
// publish to one endpoint for each unique ID. Failed publishes don't count.
|
|
func (c *multiClient) Publish(r report.Report) error {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
buf, err := r.WriteBinary()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
errs := []string{}
|
|
for _, c := range c.clients {
|
|
if err := c.Publish(bytes.NewReader(buf.Bytes()), r.Shortcut); err != nil {
|
|
errs = append(errs, err.Error())
|
|
}
|
|
}
|
|
if len(errs) > 0 {
|
|
return errors.New(strings.Join(errs, "; "))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type semaphore chan struct{}
|
|
|
|
func newSemaphore(n int) semaphore {
|
|
c := make(chan struct{}, n)
|
|
for i := 0; i < n; i++ {
|
|
c <- struct{}{}
|
|
}
|
|
return semaphore(c)
|
|
}
|
|
func (s semaphore) acquire() { <-s }
|
|
func (s semaphore) release() { s <- struct{}{} }
|