Merge pull request #2466 from weaveworks/map-write-optimise

Msgpack perf: write psMap out directly
This commit is contained in:
Bryan Boreham
2017-04-25 14:55:45 +01:00
committed by GitHub
12 changed files with 45 additions and 228 deletions

View File

@@ -225,7 +225,7 @@ func TestAPITopologyAddsKubernetes(t *testing.T) {
buf := &bytes.Buffer{}
encoder := codec.NewEncoder(buf, &codec.MsgpackHandle{})
if err := encoder.Encode(rpt); err != nil {
t.Fatalf("GOB encoding error: %s", err)
t.Fatalf("Msgpack encoding error: %s", err)
}
checkRequest(t, ts, "POST", "/api/report", buf.Bytes())

View File

@@ -2,7 +2,6 @@ package xfer
import (
"bytes"
"encoding/gob"
"fmt"
"sort"
@@ -209,23 +208,6 @@ func (*PluginSpecs) UnmarshalJSON(b []byte) error {
panic("UnmarshalJSON shouldn't be used, use CodecDecodeSelf instead")
}
// GobEncode implements gob.Marshaller
func (n PluginSpecs) GobEncode() ([]byte, error) {
buf := bytes.Buffer{}
err := gob.NewEncoder(&buf).Encode(n.toIntermediate())
return buf.Bytes(), err
}
// GobDecode implements gob.Unmarshaller
func (n *PluginSpecs) GobDecode(input []byte) error {
in := []PluginSpec{}
if err := gob.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil {
return err
}
*n = PluginSpecs{}.fromIntermediate(in)
return nil
}
// PluginSpecsByID implements sort.Interface, so we can sort the specs by the
// ID field.
type PluginSpecsByID []PluginSpec

View File

@@ -155,23 +155,11 @@ function generate_latest_map() {
})
}
func (m ${latest_map_type}) toIntermediate() map[string]${entry_type} {
intermediate := make(map[string]${entry_type}, m.Size())
if m.Map != nil {
m.Map.ForEach(func(key string, val interface{}) {
intermediate[key] = *val.(*${entry_type})
})
}
return intermediate
}
// CodecEncodeSelf implements codec.Selfer.
func (m *${latest_map_type}) CodecEncodeSelf(encoder *codec.Encoder) {
if m.Map != nil {
encoder.Encode(m.toIntermediate())
} else {
encoder.Encode(nil)
}
mapWrite(m.Map, encoder, func(encoder *codec.Encoder, val interface{}) {
val.(*${entry_type}).CodecEncodeSelf(encoder)
})
}
// CodecDecodeSelf implements codec.Selfer.

View File

@@ -2,7 +2,6 @@ package report
import (
"bytes"
"encoding/gob"
"fmt"
"reflect"
"sort"
@@ -137,14 +136,6 @@ func (c Counters) DeepEqual(d Counters) bool {
return equal
}
func (c Counters) toIntermediate() map[string]int {
intermediate := map[string]int{}
c.ForEach(func(key string, val int) {
intermediate[key] = val
})
return intermediate
}
func (c Counters) fromIntermediate(in map[string]int) Counters {
out := ps.NewMap()
for k, v := range in {
@@ -155,7 +146,10 @@ func (c Counters) fromIntermediate(in map[string]int) Counters {
// CodecEncodeSelf implements codec.Selfer
func (c *Counters) CodecEncodeSelf(encoder *codec.Encoder) {
encoder.Encode(c.toIntermediate())
mapWrite(c.psMap, encoder, func(encoder *codec.Encoder, val interface{}) {
i := val.(int)
encoder.Encode(i)
})
}
// CodecDecodeSelf implements codec.Selfer
@@ -179,20 +173,3 @@ func (Counters) MarshalJSON() ([]byte, error) {
func (*Counters) UnmarshalJSON(b []byte) error {
panic("UnmarshalJSON shouldn't be used, use CodecDecodeSelf instead")
}
// GobEncode implements gob.Marshaller
func (c Counters) GobEncode() ([]byte, error) {
buf := bytes.Buffer{}
err := gob.NewEncoder(&buf).Encode(c.toIntermediate())
return buf.Bytes(), err
}
// GobDecode implements gob.Unmarshaller
func (c *Counters) GobDecode(input []byte) error {
in := map[string]int{}
if err := gob.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil {
return err
}
*c = Counters{}.fromIntermediate(in)
return nil
}

View File

@@ -101,18 +101,6 @@ func TestCountersEncoding(t *testing.T) {
Add("foo", 1).
Add("bar", 2)
{
gobs, err := want.GobEncode()
if err != nil {
t.Fatal(err)
}
have := EmptyCounters
have.GobDecode(gobs)
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}
{
for _, h := range []codec.Handle{

View File

@@ -2,7 +2,6 @@ package report
import (
"bytes"
"encoding/gob"
"fmt"
"reflect"
"sort"
@@ -148,31 +147,12 @@ func (c EdgeMetadatas) DeepEqual(d EdgeMetadatas) bool {
return equal
}
func (c EdgeMetadatas) toIntermediate() map[string]EdgeMetadata {
intermediate := map[string]EdgeMetadata{}
if c.psMap != nil {
c.psMap.ForEach(func(key string, val interface{}) {
intermediate[key] = val.(EdgeMetadata)
})
}
return intermediate
}
func (c EdgeMetadatas) fromIntermediate(in map[string]EdgeMetadata) EdgeMetadatas {
out := ps.NewMap()
for k, v := range in {
out = out.Set(k, v)
}
return EdgeMetadatas{out}
}
// CodecEncodeSelf implements codec.Selfer
func (c *EdgeMetadatas) CodecEncodeSelf(encoder *codec.Encoder) {
if c.psMap != nil {
encoder.Encode(c.toIntermediate())
} else {
encoder.Encode(nil)
}
mapWrite(c.psMap, encoder, func(encoder *codec.Encoder, val interface{}) {
e := val.(EdgeMetadata)
(&e).CodecEncodeSelf(encoder)
})
}
// CodecDecodeSelf implements codec.Selfer
@@ -197,23 +177,6 @@ func (*EdgeMetadatas) UnmarshalJSON(b []byte) error {
panic("UnmarshalJSON shouldn't be used, use CodecDecodeSelf instead")
}
// GobEncode implements gob.Marshaller
func (c EdgeMetadatas) GobEncode() ([]byte, error) {
buf := bytes.Buffer{}
err := gob.NewEncoder(&buf).Encode(c.toIntermediate())
return buf.Bytes(), err
}
// GobDecode implements gob.Unmarshaller
func (c *EdgeMetadatas) GobDecode(input []byte) error {
in := map[string]EdgeMetadata{}
if err := gob.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil {
return err
}
*c = EdgeMetadatas{}.fromIntermediate(in)
return nil
}
// EdgeMetadata describes a superset of the metadata that probes can possibly
// collect about a directed edge between two nodes in any topology.
type EdgeMetadata struct {

View File

@@ -214,18 +214,6 @@ func TestEdgeMetadatasEncoding(t *testing.T) {
EgressPacketCount: newu64(3),
})
{
gobs, err := want.GobEncode()
if err != nil {
t.Fatal(err)
}
have := EmptyEdgeMetadatas
have.GobDecode(gobs)
if !reflect.DeepEqual(want, have) {
t.Error(test.Diff(want, have))
}
}
{
for _, h := range []codec.Handle{
codec.Handle(&codec.MsgpackHandle{}),
@@ -247,18 +235,6 @@ func TestEdgeMetadatasEncoding(t *testing.T) {
func TestEdgeMetadatasEncodingNil(t *testing.T) {
want := EdgeMetadatas{}
{
gobs, err := want.GobEncode()
if err != nil {
t.Fatal(err)
}
have := EmptyEdgeMetadatas
have.GobDecode(gobs)
if have.psMap == nil {
t.Error("needed to get back a non-nil psMap for EdgeMetadata")
}
}
{
for _, h := range []codec.Handle{

View File

@@ -122,23 +122,11 @@ func (m StringLatestMap) DeepEqual(n StringLatestMap) bool {
})
}
func (m StringLatestMap) toIntermediate() map[string]stringLatestEntry {
intermediate := make(map[string]stringLatestEntry, m.Size())
if m.Map != nil {
m.Map.ForEach(func(key string, val interface{}) {
intermediate[key] = *val.(*stringLatestEntry)
})
}
return intermediate
}
// CodecEncodeSelf implements codec.Selfer.
func (m *StringLatestMap) CodecEncodeSelf(encoder *codec.Encoder) {
if m.Map != nil {
encoder.Encode(m.toIntermediate())
} else {
encoder.Encode(nil)
}
mapWrite(m.Map, encoder, func(encoder *codec.Encoder, val interface{}) {
val.(*stringLatestEntry).CodecEncodeSelf(encoder)
})
}
// CodecDecodeSelf implements codec.Selfer.
@@ -274,23 +262,11 @@ func (m NodeControlDataLatestMap) DeepEqual(n NodeControlDataLatestMap) bool {
})
}
func (m NodeControlDataLatestMap) toIntermediate() map[string]nodeControlDataLatestEntry {
intermediate := make(map[string]nodeControlDataLatestEntry, m.Size())
if m.Map != nil {
m.Map.ForEach(func(key string, val interface{}) {
intermediate[key] = *val.(*nodeControlDataLatestEntry)
})
}
return intermediate
}
// CodecEncodeSelf implements codec.Selfer.
func (m *NodeControlDataLatestMap) CodecEncodeSelf(encoder *codec.Encoder) {
if m.Map != nil {
encoder.Encode(m.toIntermediate())
} else {
encoder.Encode(nil)
}
mapWrite(m.Map, encoder, func(encoder *codec.Encoder, val interface{}) {
val.(*nodeControlDataLatestEntry).CodecEncodeSelf(encoder)
})
}
// CodecDecodeSelf implements codec.Selfer.

View File

@@ -85,9 +85,10 @@ const (
containerMapKey = 2
containerMapValue = 3
containerMapEnd = 4
// from https://github.com/ugorji/go/blob/master/codec/helper.go#L152
cUTF8 = 2
)
// CodecDecodeSelf implements codec.Selfer.
// This implementation does not use the intermediate form as that was a
// performance issue; skipping it saved almost 10% CPU. Note this means
// we are using undocumented, internal APIs, which could break in the future.
@@ -118,3 +119,21 @@ func mapRead(decoder *codec.Decoder, decodeValue func(isNil bool) interface{}) p
z.DecSendContainerState(containerMapEnd)
return out
}
// Inverse of mapRead, done for performance. Same comments about
// undocumented internal APIs apply.
func mapWrite(m ps.Map, encoder *codec.Encoder, encodeValue func(*codec.Encoder, interface{})) {
z, r := codec.GenHelperEncoder(encoder)
if m == nil {
r.EncodeNil()
return
}
r.EncodeMapStart(m.Size())
m.ForEach(func(key string, val interface{}) {
z.EncSendContainerState(containerMapKey)
r.EncodeString(cUTF8, key)
z.EncSendContainerState(containerMapValue)
encodeValue(encoder, val)
})
z.EncSendContainerState(containerMapEnd)
}

View File

@@ -17,6 +17,10 @@ func (s *dummySelfer) CodecDecodeSelf(decoder *codec.Decoder) {
panic("This shouldn't happen: perhaps something has gone wrong in code generation?")
}
func (s *dummySelfer) CodecEncodeSelf(encoder *codec.Encoder) {
panic("This shouldn't happen: perhaps something has gone wrong in code generation?")
}
// WriteBinary writes a Report as a gzipped msgpack.
func (rep Report) WriteBinary(w io.Writer, compressionLevel int) error {
gzwriter, err := gzip.NewWriterLevel(w, compressionLevel)

View File

@@ -2,7 +2,6 @@ package report
import (
"bytes"
"encoding/gob"
"fmt"
"sort"
@@ -206,20 +205,3 @@ func (NodeSet) MarshalJSON() ([]byte, error) {
func (*NodeSet) UnmarshalJSON(b []byte) error {
panic("UnmarshalJSON shouldn't be used, use CodecDecodeSelf instead")
}
// GobEncode implements gob.Marshaller
func (n NodeSet) GobEncode() ([]byte, error) {
buf := bytes.Buffer{}
err := gob.NewEncoder(&buf).Encode(n.toIntermediate())
return buf.Bytes(), err
}
// GobDecode implements gob.Unmarshaller
func (n *NodeSet) GobDecode(input []byte) error {
in := []Node{}
if err := gob.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil {
return err
}
*n = NodeSet{}.fromIntermediate(in)
return nil
}

View File

@@ -2,7 +2,6 @@ package report
import (
"bytes"
"encoding/gob"
"fmt"
"reflect"
"sort"
@@ -148,31 +147,11 @@ func (s Sets) DeepEqual(t Sets) bool {
return equal
}
func (s Sets) toIntermediate() map[string]StringSet {
intermediate := map[string]StringSet{}
if s.psMap != nil {
s.psMap.ForEach(func(key string, val interface{}) {
intermediate[key] = val.(StringSet)
})
}
return intermediate
}
func (s Sets) fromIntermediate(in map[string]StringSet) Sets {
out := ps.NewMap()
for k, v := range in {
out = out.Set(k, v)
}
return Sets{out}
}
// CodecEncodeSelf implements codec.Selfer
func (s *Sets) CodecEncodeSelf(encoder *codec.Encoder) {
if s.psMap != nil {
encoder.Encode(s.toIntermediate())
} else {
encoder.Encode(nil)
}
mapWrite(s.psMap, encoder, func(encoder *codec.Encoder, val interface{}) {
encoder.Encode(val.(StringSet))
})
}
// CodecDecodeSelf implements codec.Selfer
@@ -196,20 +175,3 @@ func (Sets) MarshalJSON() ([]byte, error) {
func (*Sets) UnmarshalJSON(b []byte) error {
panic("UnmarshalJSON shouldn't be used, use CodecDecodeSelf instead")
}
// GobEncode implements gob.Marshaller
func (s Sets) GobEncode() ([]byte, error) {
buf := bytes.Buffer{}
err := gob.NewEncoder(&buf).Encode(s.toIntermediate())
return buf.Bytes(), err
}
// GobDecode implements gob.Unmarshaller
func (s *Sets) GobDecode(input []byte) error {
in := map[string]StringSet{}
if err := gob.NewDecoder(bytes.NewBuffer(input)).Decode(&in); err != nil {
return err
}
*s = Sets{}.fromIntermediate(in)
return nil
}