mirror of
https://github.com/prymitive/karma
synced 2026-05-05 03:16:51 +00:00
Refactor transport package
With this change we'll initialize Transport object for each Alertmanager and just call Read() on it when we need to use this transport to read from upstream Alertmanager
This commit is contained in:
@@ -2,14 +2,16 @@ package transport
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type fileReader struct {
|
||||
filename string
|
||||
fd *os.File
|
||||
fd *os.File
|
||||
}
|
||||
|
||||
func (fr *fileReader) Read(b []byte) (n int, err error) {
|
||||
@@ -20,9 +22,37 @@ func (fr *fileReader) Close() error {
|
||||
return fr.fd.Close()
|
||||
}
|
||||
|
||||
func newFileReader(filname string) (io.ReadCloser, error) {
|
||||
log.Infof("Reading file '%s'", filname)
|
||||
fd, err := os.Open(filname)
|
||||
fr := fileReader{filename: filname, fd: fd}
|
||||
// FileTransport can read data from file:// URIs
|
||||
type FileTransport struct {
|
||||
}
|
||||
|
||||
func (t *FileTransport) pathFromURI(uri string) (string, error) {
|
||||
u, err := url.Parse(uri)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// if we have a file URI with relative path we need to expand it into an
|
||||
// absolute path, url.Parse doesn't support relative file paths
|
||||
if strings.HasPrefix(uri, "file:///") {
|
||||
return u.Path, nil
|
||||
}
|
||||
wd, err := os.Getwd()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
absolutePath := path.Join(wd, strings.TrimPrefix(uri, "file://"))
|
||||
return absolutePath, nil
|
||||
}
|
||||
|
||||
func (t *FileTransport) Read(uri string) (io.ReadCloser, error) {
|
||||
filename, err := t.pathFromURI(uri)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Infof("Reading file '%s'", filename)
|
||||
fd, err := os.Open(filename)
|
||||
fr := fileReader{fd: fd}
|
||||
return &fr, err
|
||||
}
|
||||
|
||||
@@ -5,37 +5,31 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type httpReader struct {
|
||||
URL string
|
||||
Timeout time.Duration
|
||||
// HTTPTransport can read data from http:// and https:// URIs
|
||||
type HTTPTransport struct {
|
||||
client http.Client
|
||||
}
|
||||
|
||||
func newHTTPReader(url string, timeout time.Duration) (io.ReadCloser, error) {
|
||||
hr := httpReader{URL: url, Timeout: timeout}
|
||||
func (t *HTTPTransport) Read(uri string) (io.ReadCloser, error) {
|
||||
log.Infof("GET %s timeout=%s", uri, t.client.Timeout)
|
||||
|
||||
log.Infof("GET %s timeout=%s", hr.URL, hr.Timeout)
|
||||
|
||||
c := &http.Client{
|
||||
Timeout: timeout,
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", hr.URL, nil)
|
||||
request, err := http.NewRequest("GET", uri, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Add("Accept-Encoding", "gzip")
|
||||
resp, err := c.Do(req)
|
||||
request.Header.Add("Accept-Encoding", "gzip")
|
||||
|
||||
resp, err := t.client.Do(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("Request to Alertmanager failed with %s", resp.Status)
|
||||
return nil, fmt.Errorf("Request to %s failed with %s", uri, resp.Status)
|
||||
}
|
||||
|
||||
var reader io.ReadCloser
|
||||
|
||||
@@ -1,45 +1,32 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ReadJSON using one of supported transports (file:// http://)
|
||||
func ReadJSON(uri string, timeout time.Duration, target interface{}) error {
|
||||
// Transport reads from a specific URI schema
|
||||
type Transport interface {
|
||||
Read(string) (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
// NewTransport creates an instance of Transport that can handle URI schema
|
||||
// for the passed uri string
|
||||
func NewTransport(uri string, timeout time.Duration) (Transport, error) {
|
||||
u, err := url.Parse(uri)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
var reader io.ReadCloser
|
||||
|
||||
switch u.Scheme {
|
||||
case "http", "https":
|
||||
reader, err = newHTTPReader(u.String(), timeout)
|
||||
return &HTTPTransport{client: http.Client{Timeout: timeout}}, nil
|
||||
case "file":
|
||||
// if we have a file URI with relative path we need to expand it into an
|
||||
// absolute path, url.Parse doesn't support relative file paths
|
||||
if strings.HasPrefix(uri, "file:///") {
|
||||
reader, err = newFileReader(u.Path)
|
||||
} else {
|
||||
wd, e := os.Getwd()
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
absolutePath := path.Join(wd, strings.TrimPrefix(uri, "file://"))
|
||||
reader, err = newFileReader(absolutePath)
|
||||
}
|
||||
return &FileTransport{}, nil
|
||||
default:
|
||||
return fmt.Errorf("Unsupported URI scheme '%s' in '%s'", u.Scheme, u)
|
||||
return nil, fmt.Errorf("Unsupported URI scheme '%s' in '%s'", u.Scheme, u)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer reader.Close()
|
||||
return json.NewDecoder(reader).Decode(target)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package transport_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -61,8 +62,8 @@ type mockStatus struct {
|
||||
no bool
|
||||
}
|
||||
|
||||
func TestFileReader(t *testing.T) {
|
||||
log.SetLevel(log.ErrorLevel)
|
||||
func TestTransport(t *testing.T) {
|
||||
log.SetLevel(log.FatalLevel)
|
||||
httpmock.Activate()
|
||||
defer httpmock.DeactivateAndReset()
|
||||
mockJSON := `{
|
||||
@@ -79,8 +80,23 @@ func TestFileReader(t *testing.T) {
|
||||
httpmock.RegisterResponder("GET", "https://localhost/invalid", httpmock.NewStringResponder(200, "bad json}{}"))
|
||||
|
||||
for _, testCase := range transportTests {
|
||||
tr, err := transport.NewTransport(testCase.uri, testCase.timeout)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
source, err := tr.Read(testCase.uri)
|
||||
if err != nil {
|
||||
if !testCase.failed {
|
||||
t.Errorf("[%s] transport Read() failed with: %s", testCase.uri, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
r := mockStatus{}
|
||||
err := transport.ReadJSON(testCase.uri, testCase.timeout, &r)
|
||||
err = json.NewDecoder(source).Decode(&r)
|
||||
source.Close()
|
||||
|
||||
if (err != nil) != testCase.failed {
|
||||
t.Errorf("[%s] Expected failure: %v, Read() failed: %v, error: %s", testCase.uri, testCase.failed, (err != nil), err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user