Compare commits

..

6 Commits

Author SHA1 Message Date
David Levanon
b0c8c0c192 Add response body to the error in case of failure (#503)
* add response body to the error in case of failure

* fix typo + make inline condition
2021-11-23 20:16:07 +02:00
Nimrod Gilboa Markevich
1c18eb1b84 Use one channel for events instead of three (#495)
Use one channel for events instead of three separate channels by event type
2021-11-23 15:06:27 +02:00
David Levanon
01d6005a7b minor logging changes (#499)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2021-11-23 14:21:53 +02:00
Nimrod Gilboa Markevich
4c97316c02 Remove prevPodPhase (#497)
prevPodPhase does not take into account the fact that there may be more
than one tapper pod. Therefore it is not clear what its value
represents. It is only used in a debug print. It is not worth the effort
to fix for that one debug print.

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2021-11-23 10:03:36 +02:00
M. Mert Yıldıran
d66c7445e6 Remove SetHostname method in HTTP extension (#496) 2021-11-22 19:30:06 +03:00
M. Mert Yıldıran
12ca3d8779 Make the gRPC and HTTP/2 distinction (#492)
* Remove the extra negation on `nodefrag` flag's value

* Support IPv4 fragmentation and IPv6 at the same time

* Set `Method` and `StatusCode` fields correctly for `HTTP/2`

* Replace unnecessary `grpc` naming with `http2`

* Make the `gRPC` and `HTTP/2` distinction

* Fix the macros of `http` extension

* Fix the macros of other protocol extensions

* Update the method signature of `Represent`

* Fix the `HTTP/2` support

* Fix some minor issues

* Upgrade Basenine version from `0.2.10` to `0.2.11`

Sorts macros before expanding them and prioritize the long macros.

* Don't regex split the gRPC method name

* Re-enable `nodefrag` flag
2021-11-22 17:46:35 +03:00
22 changed files with 281 additions and 338 deletions

3
.gitignore vendored
View File

@@ -32,3 +32,6 @@ pprof/*
# Database Files
*.bin
*.gob
# Nohup Files - https://man7.org/linux/man-pages/man1/nohup.1p.html
nohup.*

View File

@@ -42,8 +42,8 @@ RUN go build -ldflags="-s -w \
-X 'mizuserver/pkg/version.SemVer=${SEM_VER}'" -o mizuagent .
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.2.11/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.11/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
RUN shasum -a 256 -c basenine_linux_amd64.sha256
RUN chmod +x ./basenine_linux_amd64

View File

@@ -16,7 +16,7 @@ require (
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73
github.com/up9inc/basenine/client/go v0.0.0-20211121072216-04366911881c
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0

View File

@@ -450,8 +450,8 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73 h1:FJUM7w7E0jRGFPcSMa7cVy+jr5zcpbyT6qA30dEtGGI=
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20211121072216-04366911881c h1:GJsCVhDKjV/k3mNG255VN7hAQ7fxyNgX5T+VJyzoOQ0=
github.com/up9inc/basenine/client/go v0.0.0-20211121072216-04366911881c/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=

View File

@@ -493,19 +493,19 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource added event: %+v", err)
logger.Log.Errorf("error parsing Mizu resource event: %+v", err)
cancel()
}
@@ -514,45 +514,7 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource removed event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource modified event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
logger.Log.Warningf("resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case err, ok := <-errorChan:
if !ok {

View File

@@ -55,7 +55,7 @@ func GetEntry(c *gin.Context) {
}
extension := extensionsMap[entry.Protocol.Name]
protocol, representation, bodySize, _ := extension.Dissector.Represent(entry.Protocol, entry.Request, entry.Response)
representation, bodySize, _ := extension.Dissector.Represent(entry.Request, entry.Response)
var rules []map[string]interface{}
var isRulesEnabled bool
@@ -68,7 +68,7 @@ func GetEntry(c *gin.Context) {
}
c.JSON(http.StatusOK, tapApi.MizuEntryWrapper{
Protocol: protocol,
Protocol: entry.Protocol,
Representation: string(representation),
BodySize: bodySize,
Data: entry,

View File

@@ -3,11 +3,12 @@ package apiserver
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/up9inc/mizu/shared/kubernetes"
@@ -61,7 +62,14 @@ func (provider *Provider) GetHealthStatus() (*shared.HealthResponse, error) {
if response, err := provider.client.Get(healthUrl); err != nil {
return nil, err
} else if response.StatusCode > 299 {
return nil, errors.New(fmt.Sprintf("status code: %d", response.StatusCode))
responseBody := new(strings.Builder)
if _, err := io.Copy(responseBody, response.Body); err != nil {
return nil, fmt.Errorf("status code: %d - (bad response - %v)", response.StatusCode, err)
} else {
singleLineResponse := strings.ReplaceAll(responseBody.String(), "\n", "")
return nil, fmt.Errorf("status code: %d - (response - %v)", response.StatusCode, singleLineResponse)
}
} else {
defer response.Body.Close()

View File

@@ -559,76 +559,73 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
isPodReady := false
timeAfter := time.After(25 * time.Second)
for {
select {
case _, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
logger.Log.Debugf("Watching API Server pod loop, added")
case _, ok := <-removed:
if !ok {
removed = nil
continue
}
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
cancel()
return
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
switch wEvent.Type {
case kubernetes.EventAdded:
logger.Log.Debugf("Watching API Server pod loop, added")
case kubernetes.EventDeleted:
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
cancel()
continue
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodPending {
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
return
case kubernetes.EventModified:
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
break
continue
}
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath()))
cancel()
break
}
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
if modifiedPod.Status.Phase == core.PodPending {
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
break
}
url := GetApiServerUrl()
if err := apiProvider.TestConnection(); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
break
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath()))
cancel()
break
}
}
logger.Log.Infof("Mizu is available at %s", url)
if !config.Config.HeadlessMode {
uiUtils.OpenBrowser(url)
}
if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil {
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
url := GetApiServerUrl()
if err := apiProvider.TestConnection(); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
break
}
logger.Log.Infof("Mizu is available at %s", url)
if !config.Config.HeadlessMode {
uiUtils.OpenBrowser(url)
}
if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil {
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
}
case kubernetes.EventBookmark:
break
case kubernetes.EventError:
break
}
case err, ok := <-errorChan:
if !ok {
@@ -654,76 +651,53 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
var prevPodPhase core.PodPhase
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
for {
select {
case wEvent, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
addedPod, err := wEvent.ToPod()
pod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
switch wEvent.Type {
case kubernetes.EventAdded:
logger.Log.Debugf("Tapper is created [%s]", pod.Name)
case kubernetes.EventDeleted:
logger.Log.Debugf("Tapper is removed [%s]", pod.Name)
case kubernetes.EventModified:
if pod.Status.Phase == core.PodPending && pod.Status.Conditions[0].Type == core.PodScheduled && pod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", pod.Name, pod.Status.Conditions[0].Message))
cancel()
continue
}
removedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
podStatus := pod.Status
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message))
cancel()
continue
}
podStatus := modifiedPod.Status
if podStatus.Phase == core.PodPending && prevPodPhase == podStatus.Phase {
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
continue
}
prevPodPhase = podStatus.Phase
if podStatus.Phase == core.PodRunning {
state := podStatus.ContainerStatuses[0].State
if state.Terminated != nil {
switch state.Terminated.Reason {
case "OOMKilled":
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name))
if podStatus.Phase == core.PodRunning {
state := podStatus.ContainerStatuses[0].State
if state.Terminated != nil {
switch state.Terminated.Reason {
case "OOMKilled":
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", pod.Name))
}
}
}
}
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
logger.Log.Debugf("Tapper %s is %s", pod.Name, strings.ToLower(string(podStatus.Phase)))
case kubernetes.EventBookmark:
break
case kubernetes.EventError:
break
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
@@ -746,57 +720,19 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource added event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource removed event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource modified event: %+v", err))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource event: %+v", err))
cancel()
}

View File

@@ -37,8 +37,8 @@ COPY agent .
RUN go build -gcflags="all=-N -l" -o mizuagent .
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.2.11/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.11/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
RUN shasum -a 256 -c basenine_linux_amd64.sha256
RUN chmod +x ./basenine_linux_amd64

View File

@@ -70,7 +70,7 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro
func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex)
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
restartTappers := func() {
err, changeFound := tapperSyncer.updateCurrentlyTappedPods()
@@ -96,9 +96,9 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
for {
select {
case wEvent, ok := <-added:
case wEvent, ok := <-eventChan:
if !ok {
added = nil
eventChan = nil
continue
}
@@ -109,44 +109,28 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
}
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
switch wEvent.Type {
case EventAdded:
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case EventDeleted:
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case EventModified:
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
restartTappersDebouncer.SetOn()
}
case EventBookmark:
break
case EventError:
break
}
case err, ok := <-errorChan:
if !ok {

View File

@@ -20,10 +20,8 @@ type WatchCreator interface {
NewWatcher(ctx context.Context, namespace string) (watch.Interface, error)
}
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (chan *WatchEvent, chan *WatchEvent, chan *WatchEvent, chan error) {
addedChan := make(chan *WatchEvent)
modifiedChan := make(chan *WatchEvent)
removedChan := make(chan *WatchEvent)
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (<-chan *WatchEvent, <-chan error) {
eventChan := make(chan *WatchEvent)
errorChan := make(chan error)
var wg sync.WaitGroup
@@ -42,7 +40,7 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames
break
}
err = startWatchLoop(ctx, watcher, filterer, addedChan, modifiedChan, removedChan) // blocking
err = startWatchLoop(ctx, watcher, filterer, eventChan) // blocking
watcher.Stop()
select {
@@ -73,16 +71,14 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames
go func() {
<-ctx.Done()
wg.Wait()
close(addedChan)
close(modifiedChan)
close(removedChan)
close(eventChan)
close(errorChan)
}()
return addedChan, modifiedChan, removedChan, errorChan
return eventChan, errorChan
}
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, addedChan chan *WatchEvent, modifiedChan chan *WatchEvent, removedChan chan *WatchEvent) error {
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, eventChan chan<- *WatchEvent) error {
resultChan := watcher.ResultChan()
for {
select {
@@ -103,14 +99,7 @@ func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer Event
continue
}
switch wEvent.Type {
case watch.Added:
addedChan <- &wEvent
case watch.Modified:
modifiedChan <- &wEvent
case watch.Deleted:
removedChan <- &wEvent
}
eventChan <- &wEvent
case <-ctx.Done():
return nil
}

View File

@@ -10,6 +10,14 @@ import (
"k8s.io/apimachinery/pkg/watch"
)
const (
EventAdded watch.EventType = watch.Added
EventModified watch.EventType = watch.Modified
EventDeleted watch.EventType = watch.Deleted
EventBookmark watch.EventType = watch.Bookmark
EventError watch.EventType = watch.Error
)
type InvalidObjectType struct {
RequestedType reflect.Type
}

View File

@@ -9,7 +9,7 @@ import (
var Log = logging.MustGetLogger("mizu")
var format = logging.MustStringFormatter(
`%{time:2006-01-02T15:04:05.999Z-07:00} %{level:-5s} ▶ %{message} ▶ %{pid} %{shortfile} %{shortfunc}`,
`[%{time:2006-01-02T15:04:05.000-0700}] %{level:-5s} ▶ %{message} ▶ [%{pid} %{shortfile} %{shortfunc}]`,
)
func InitLogger(logPath string) {

View File

@@ -99,7 +99,7 @@ type Dissector interface {
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *MizuEntry
Summarize(entry *MizuEntry) *BaseEntryDetails
Represent(pIn Protocol, request map[string]interface{}, response map[string]interface{}) (pOut Protocol, object []byte, bodySize int64, err error)
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
Macros() map[string]string
}

View File

@@ -325,8 +325,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface{}, response map[string]interface{}) (protoOut api.Protocol, object []byte, bodySize int64, err error) {
protoOut = protocol
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
representation := make(map[string]interface{}, 0)
var repRequest []interface{}
@@ -363,7 +362,7 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
func (d dissecting) Macros() map[string]string {
return map[string]string{
`amqp`: fmt.Sprintf(`proto.abbr == "%s"`, protocol.Abbreviation),
`amqp`: fmt.Sprintf(`proto.name == "%s"`, protocol.Name),
}
}

View File

@@ -23,8 +23,8 @@ func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *ap
emitter.Emit(item)
}
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
streamID, messageHTTP1, err := grpcAssembler.readMessage()
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil {
return err
}
@@ -73,7 +73,11 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTime
}
if item != nil {
item.Protocol = http2Protocol
if isGrpc {
item.Protocol = grpcProtocol
} else {
item.Protocol = http2Protocol
}
filterAndEmit(item, emitter, options)
}

View File

@@ -10,6 +10,7 @@ import (
"math"
"net/http"
"net/url"
"strconv"
"strings"
"golang.org/x/net/http2"
@@ -27,6 +28,26 @@ const protoMinorHTTP2 = 0
var maxHTTP2DataLen = 1 * 1024 * 1024 // 1MB
var grpcStatusCodes = []string{
"OK",
"CANCELLED",
"UNKNOWN",
"INVALID_ARGUMENT",
"DEADLINE_EXCEEDED",
"NOT_FOUND",
"ALREADY_EXISTS",
"PERMISSION_DENIED",
"RESOURCE_EXHAUSTED",
"FAILED_PRECONDITION",
"ABORTED",
"OUT_OF_RANGE",
"UNIMPLEMENTED",
"INTERNAL",
"UNAVAILABLE",
"DATA_LOSS",
"UNAUTHENTICATED",
}
type messageFragment struct {
headers []hpack.HeaderField
data []byte
@@ -71,37 +92,38 @@ func (fbs *fragmentsByStream) pop(streamID uint32) ([]hpack.HeaderField, []byte)
return headers, data
}
func createGrpcAssembler(b *bufio.Reader) *GrpcAssembler {
func createHTTP2Assembler(b *bufio.Reader) *Http2Assembler {
var framerOutput bytes.Buffer
framer := http2.NewFramer(&framerOutput, b)
framer.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
return &GrpcAssembler{
return &Http2Assembler{
fragmentsByStream: make(fragmentsByStream),
framer: framer,
}
}
type GrpcAssembler struct {
type Http2Assembler struct {
fragmentsByStream fragmentsByStream
framer *http2.Framer
}
func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
func (ga *Http2Assembler) readMessage() (streamID uint32, messageHTTP1 interface{}, isGrpc bool, err error) {
// Exactly one Framer is used for each half connection.
// (Instead of creating a new Framer for each ReadFrame operation)
// This is needed in order to decompress the headers,
// because the compression context is updated with each requests/response.
frame, err := ga.framer.ReadFrame()
if err != nil {
return 0, nil, err
return
}
streamID := frame.Header().StreamID
streamID = frame.Header().StreamID
ga.fragmentsByStream.appendFrame(streamID, frame)
if !(ga.isStreamEnd(frame)) {
return 0, nil, nil
streamID = 0
return
}
headers, data := ga.fragmentsByStream.pop(streamID)
@@ -115,13 +137,29 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
dataString := base64.StdEncoding.EncodeToString(data)
// Use http1 types only because they are expected in http_matcher.
// TODO: Create an interface that will be used by http_matcher:registerRequest and http_matcher:registerRequest
// to accept both HTTP/1.x and HTTP/2 requests and responses
var messageHTTP1 interface{}
if _, ok := headersHTTP1[":method"]; ok {
method := headersHTTP1.Get(":method")
status := headersHTTP1.Get(":status")
// gRPC detection
grpcStatus := headersHTTP1.Get("Grpc-Status")
if grpcStatus != "" {
isGrpc = true
status = grpcStatus
}
if strings.Contains(headersHTTP1.Get("Content-Type"), "application/grpc") {
isGrpc = true
grpcPath := headersHTTP1.Get(":path")
pathSegments := strings.Split(grpcPath, "/")
if len(pathSegments) > 0 {
method = pathSegments[len(pathSegments)-1]
}
}
if method != "" {
messageHTTP1 = http.Request{
URL: &url.URL{},
Method: "POST",
Method: method,
Header: headersHTTP1,
Proto: protoHTTP2,
ProtoMajor: protoMajorHTTP2,
@@ -129,8 +167,16 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
Body: io.NopCloser(strings.NewReader(dataString)),
ContentLength: int64(len(dataString)),
}
} else if _, ok := headersHTTP1[":status"]; ok {
} else if status != "" {
var statusCode int
statusCode, err = strconv.Atoi(status)
if err != nil {
return
}
messageHTTP1 = http.Response{
StatusCode: statusCode,
Header: headersHTTP1,
Proto: protoHTTP2,
ProtoMajor: protoMajorHTTP2,
@@ -139,13 +185,14 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
ContentLength: int64(len(dataString)),
}
} else {
return 0, nil, errors.New("failed to assemble stream: neither a request nor a message")
err = errors.New("failed to assemble stream: neither a request nor a message")
return
}
return streamID, messageHTTP1, nil
return
}
func (ga *GrpcAssembler) isStreamEnd(frame http2.Frame) bool {
func (ga *Http2Assembler) isStreamEnd(frame http2.Frame) bool {
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if frame.StreamEnded() {

View File

@@ -23,21 +23,35 @@ var protocol api.Protocol = api.Protocol{
ForegroundColor: "#ffffff",
FontSize: 12,
ReferenceLink: "https://datatracker.ietf.org/doc/html/rfc2616",
Ports: []string{"80", "8080", "50051"},
Ports: []string{"80", "443", "8080"},
Priority: 0,
}
var http2Protocol api.Protocol = api.Protocol{
Name: "http",
LongName: "Hypertext Transfer Protocol Version 2 (HTTP/2) (gRPC)",
LongName: "Hypertext Transfer Protocol Version 2 (HTTP/2)",
Abbreviation: "HTTP/2",
Macro: "grpc",
Macro: "http2",
Version: "2.0",
BackgroundColor: "#244c5a",
ForegroundColor: "#ffffff",
FontSize: 11,
ReferenceLink: "https://datatracker.ietf.org/doc/html/rfc7540",
Ports: []string{"80", "8080"},
Ports: []string{"80", "443", "8080"},
Priority: 0,
}
var grpcProtocol api.Protocol = api.Protocol{
Name: "http",
LongName: "Hypertext Transfer Protocol Version 2 (HTTP/2) [ gRPC over HTTP/2 ]",
Abbreviation: "gRPC",
Macro: "grpc",
Version: "2.0",
BackgroundColor: "#244c5a",
ForegroundColor: "#ffffff",
FontSize: 11,
ReferenceLink: "https://grpc.github.io/grpc/core/md_doc_statuscodes.html",
Ports: []string{"80", "443", "8080", "50051"},
Priority: 0,
}
@@ -64,10 +78,10 @@ func (d dissecting) Ping() {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
var grpcAssembler *GrpcAssembler
var http2Assembler *Http2Assembler
if isHTTP2 {
prepareHTTP2Connection(b, isClient)
grpcAssembler = createGrpcAssembler(b)
http2Assembler = createHTTP2Assembler(b)
}
dissected := false
@@ -77,7 +91,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isHTTP2 {
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter, options)
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -110,17 +124,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
return nil
}
func SetHostname(address, newHostname string) string {
replacedUrl, err := url.Parse(address)
if err != nil {
return address
}
replacedUrl.Host = newHostname
return replacedUrl.String()
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.MizuEntry {
var host, scheme, authority, path, service string
var host, authority, path, service string
request := item.Pair.Request.Payload.(map[string]interface{})
response := item.Pair.Response.Payload.(map[string]interface{})
@@ -135,9 +140,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
if h["name"] == ":authority" {
authority = h["value"].(string)
}
if h["name"] == ":scheme" {
scheme = h["value"].(string)
}
if h["name"] == ":path" {
path = h["value"].(string)
}
@@ -148,9 +150,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
if item.Protocol.Version == "2.0" {
service = fmt.Sprintf("%s://%s", scheme, authority)
service = authority
} else {
service = fmt.Sprintf("http://%s", host)
service = host
u, err := url.Parse(reqDetails["url"].(string))
if err != nil {
path = reqDetails["url"].(string)
@@ -178,9 +180,20 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
reqDetails["queryString"] = mapSliceRebuildAsMap(reqDetails["_queryString"].([]interface{}))
if resolvedDestination != "" {
service = SetHostname(service, resolvedDestination)
service = resolvedDestination
} else if resolvedSource != "" {
service = SetHostname(service, resolvedSource)
service = resolvedSource
}
method := reqDetails["method"].(string)
statusCode := int(resDetails["status"].(float64))
if item.Protocol.Abbreviation == "gRPC" {
resDetails["statusText"] = grpcStatusCodes[statusCode]
}
if item.Protocol.Version == "2.0" {
reqDetails["url"] = path
request["url"] = path
}
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
@@ -188,10 +201,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
elapsedTime = 0
}
httpPair, _ := json.Marshal(item.Pair)
_protocol := protocol
_protocol.Version = item.Protocol.Version
return &api.MizuEntry{
Protocol: _protocol,
Protocol: item.Protocol,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@@ -206,8 +217,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
Request: reqDetails,
Response: resDetails,
Url: fmt.Sprintf("%s%s", service, path),
Method: reqDetails["method"].(string),
Status: int(resDetails["status"].(float64)),
Method: method,
Status: statusCode,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
@@ -226,15 +237,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
var p api.Protocol
if entry.Protocol.Version == "2.0" {
p = http2Protocol
} else {
p = protocol
}
return &api.BaseEntryDetails{
Id: entry.Id,
Protocol: p,
Protocol: entry.Protocol,
Url: entry.Url,
RequestSenderIp: entry.RequestSenderIp,
Service: entry.Service,
@@ -408,12 +413,7 @@ func representResponse(response map[string]interface{}) (repResponse []interface
return
}
func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface{}, response map[string]interface{}) (protoOut api.Protocol, object []byte, bodySize int64, err error) {
if protoIn.Version == "2.0" {
protoOut = http2Protocol
} else {
protoOut = protocol
}
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
representation := make(map[string]interface{}, 0)
repRequest := representRequest(request)
repResponse, bodySize := representResponse(response)
@@ -425,9 +425,9 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
func (d dissecting) Macros() map[string]string {
return map[string]string{
`http`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, protocol.Version),
`grpc`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, http2Protocol.Version),
`http2`: fmt.Sprintf(`proto.abbr == "%s" and proto.version == "%s"`, protocol.Abbreviation, http2Protocol.Version),
`http`: fmt.Sprintf(`proto.name == "%s" and proto.version == "%s"`, protocol.Name, protocol.Version),
`http2`: fmt.Sprintf(`proto.name == "%s" and proto.version == "%s"`, protocol.Name, http2Protocol.Version),
`grpc`: fmt.Sprintf(`proto.name == "%s" and proto.version == "%s" and proto.macro == "%s"`, protocol.Name, grpcProtocol.Version, grpcProtocol.Macro),
}
}

View File

@@ -210,8 +210,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface{}, response map[string]interface{}) (protoOut api.Protocol, object []byte, bodySize int64, err error) {
protoOut = _protocol
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
representation := make(map[string]interface{}, 0)
@@ -258,7 +257,7 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
func (d dissecting) Macros() map[string]string {
return map[string]string{
`kafka`: fmt.Sprintf(`proto.abbr == "%s"`, _protocol.Abbreviation),
`kafka`: fmt.Sprintf(`proto.name == "%s"`, _protocol.Name),
}
}

View File

@@ -146,8 +146,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface{}, response map[string]interface{}) (protoOut api.Protocol, object []byte, bodySize int64, err error) {
protoOut = protocol
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
representation := make(map[string]interface{}, 0)
repRequest := representGeneric(request, `request.`)
@@ -160,7 +159,7 @@ func (d dissecting) Represent(protoIn api.Protocol, request map[string]interface
func (d dissecting) Macros() map[string]string {
return map[string]string{
`redis`: fmt.Sprintf(`proto.abbr == "%s"`, protocol.Abbreviation),
`redis`: fmt.Sprintf(`proto.name == "%s"`, protocol.Name),
}
}

View File

@@ -16,6 +16,8 @@ import (
"github.com/up9inc/mizu/tap/source"
)
const PACKETS_SEEN_LOG_THRESHOLD = 1000
type tcpAssembler struct {
*reassembly.Assembler
streamPool *reassembly.StreamPool
@@ -63,7 +65,11 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
for packetInfo := range packets {
packetsCount := diagnose.AppStats.IncPacketsCount()
logger.Log.Debugf("PACKET #%d", packetsCount)
if packetsCount % PACKETS_SEEN_LOG_THRESHOLD == 0 {
logger.Log.Debugf("Packets seen: #%d", packetsCount)
}
packet := packetInfo.Packet
data := packet.Data()
diagnose.AppStats.UpdateProcessedBytes(uint64(len(data)))
@@ -85,7 +91,7 @@ func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.Tcp
CaptureInfo: packet.Metadata().CaptureInfo,
}
diagnose.InternalStats.Totalsz += len(tcp.Payload)
logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
logger.Log.Debugf("%s:%v -> %s:%v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
a.assemblerMutex.Lock()
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
a.assemblerMutex.Unlock()

View File

@@ -5,7 +5,6 @@
border: solid 1px $secondary-font-color
margin-left: 4px
padding: 2px 5px
text-transform: uppercase
font-family: "Source Sans Pro", sans-serif
font-size: 11px
font-weight: bold