Merge pull request #3772 from weaveworks/downgrade-fluent-lib

dependencies: downgrade fluent-logger-golang library used in multitenant mode
This commit is contained in:
Bryan Boreham
2020-03-26 19:04:08 +00:00
committed by GitHub
8 changed files with 217 additions and 1061 deletions

3
go.mod
View File

@@ -153,3 +153,6 @@ require (
k8s.io/kubernetes v1.13.0
sigs.k8s.io/yaml v1.1.0 // indirect
)
// Do not upgrade until https://github.com/fluent/fluent-logger-golang/issues/80 is fixed
replace github.com/fluent/fluent-logger-golang => github.com/fluent/fluent-logger-golang v1.2.1

View File

@@ -4,19 +4,13 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"math"
"net"
"os"
"reflect"
"strconv"
"sync"
"time"
"bytes"
"encoding/base64"
"encoding/binary"
"github.com/tinylib/msgp/msgp"
"math/rand"
)
const (
@@ -25,71 +19,35 @@ const (
defaultSocketPath = ""
defaultPort = 24224
defaultTimeout = 3 * time.Second
defaultWriteTimeout = time.Duration(0) // Write() will not time out
defaultBufferLimit = 8 * 1024
defaultBufferLimit = 8 * 1024 * 1024
defaultRetryWait = 500
defaultMaxRetryWait = 60000
defaultMaxRetry = 13
defaultReconnectWaitIncreRate = 1.5
// Default sub-second precision value to false since it is only compatible
// with fluentd versions v0.14 and above.
defaultSubSecondPrecision = false
)
type Config struct {
FluentPort int `json:"fluent_port"`
FluentHost string `json:"fluent_host"`
FluentNetwork string `json:"fluent_network"`
FluentSocketPath string `json:"fluent_socket_path"`
Timeout time.Duration `json:"timeout"`
WriteTimeout time.Duration `json:"write_timeout"`
BufferLimit int `json:"buffer_limit"`
RetryWait int `json:"retry_wait"`
MaxRetry int `json:"max_retry"`
MaxRetryWait int `json:"max_retry_wait"`
TagPrefix string `json:"tag_prefix"`
Async bool `json:"async"`
ForceStopAsyncSend bool `json:"force_stop_async_send"`
// Deprecated: Use Async instead
AsyncConnect bool `json:"async_connect"`
MarshalAsJSON bool `json:"marshal_as_json"`
// Sub-second precision timestamps are only possible for those using fluentd
// v0.14+ and serializing their messages with msgpack.
SubSecondPrecision bool `json:"sub_second_precision"`
// RequestAck sends the chunk option with a unique ID. The server will
// respond with an acknowledgement. This option improves the reliability
// of the message transmission.
RequestAck bool `json:"request_ack"`
}
type ErrUnknownNetwork struct {
network string
}
func (e *ErrUnknownNetwork) Error() string {
return "unknown network " + e.network
}
func NewErrUnknownNetwork(network string) error {
return &ErrUnknownNetwork{network}
}
type msgToSend struct {
data []byte
ack string
FluentPort int `json:"fluent_port"`
FluentHost string `json:"fluent_host"`
FluentNetwork string `json:"fluent_network"`
FluentSocketPath string `json:"fluent_socket_path"`
Timeout time.Duration `json:"timeout"`
BufferLimit int `json:"buffer_limit"`
RetryWait int `json:"retry_wait"`
MaxRetry int `json:"max_retry"`
TagPrefix string `json:"tag_prefix"`
AsyncConnect bool `json:"async_connect"`
MarshalAsJSON bool `json:"marshal_as_json"`
}
type Fluent struct {
Config
stopRunning chan bool
pending chan *msgToSend
wg sync.WaitGroup
mubuff sync.Mutex
pending []byte
muconn sync.Mutex
conn net.Conn
muconn sync.Mutex
conn io.WriteCloser
reconnecting bool
}
// New creates a new Logger.
@@ -109,9 +67,6 @@ func New(config Config) (f *Fluent, err error) {
if config.Timeout == 0 {
config.Timeout = defaultTimeout
}
if config.WriteTimeout == 0 {
config.WriteTimeout = defaultWriteTimeout
}
if config.BufferLimit == 0 {
config.BufferLimit = defaultBufferLimit
}
@@ -121,22 +76,11 @@ func New(config Config) (f *Fluent, err error) {
if config.MaxRetry == 0 {
config.MaxRetry = defaultMaxRetry
}
if config.MaxRetryWait == 0 {
config.MaxRetryWait = defaultMaxRetryWait
}
if config.AsyncConnect {
fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
config.Async = config.Async || config.AsyncConnect
}
if config.Async {
f = &Fluent{
Config: config,
pending: make(chan *msgToSend, config.BufferLimit),
}
f.wg.Add(1)
go f.run()
f = &Fluent{Config: config, reconnecting: true}
go f.reconnect()
} else {
f = &Fluent{Config: config}
f = &Fluent{Config: config, reconnecting: false}
err = f.connect()
}
return
@@ -146,6 +90,9 @@ func New(config Config) (f *Fluent, err error) {
//
// Examples:
//
// // send string
// f.Post("tag_name", "data")
//
// // send map[string]
// mapStringData := map[string]string{
// "foo": "bar",
@@ -177,10 +124,6 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
tag = f.TagPrefix + "." + tag
}
if m, ok := message.(msgp.Marshaler); ok {
return f.EncodeAndPostData(tag, tm, m)
}
msg := reflect.ValueOf(message)
msgtype := msg.Type()
@@ -216,25 +159,28 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
}
func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
var msg *msgToSend
var data []byte
var err error
if msg, err = f.EncodeData(tag, tm, message); err != nil {
if data, err = f.EncodeData(tag, tm, message); err != nil {
return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err)
}
return f.postRawData(msg)
return f.postRawData(data)
}
// Deprecated: Use EncodeAndPostData instead
func (f *Fluent) PostRawData(msg *msgToSend) {
f.postRawData(msg)
func (f *Fluent) PostRawData(data []byte) {
f.postRawData(data)
}
func (f *Fluent) postRawData(msg *msgToSend) error {
if f.Config.Async {
return f.appendBuffer(msg)
func (f *Fluent) postRawData(data []byte) error {
if err := f.appendBuffer(data); err != nil {
return err
}
// Synchronous write
return f.write(msg)
if err := f.send(); err != nil {
f.close()
return err
}
return nil
}
// For sending forward protocol adopted JSON
@@ -247,91 +193,47 @@ type MessageChunk struct {
// So, it should write JSON marshaler by hand.
func (chunk *MessageChunk) MarshalJSON() ([]byte, error) {
data, err := json.Marshal(chunk.message.Record)
if err != nil {
return nil, err
}
option, err := json.Marshal(chunk.message.Option)
if err != nil {
return nil, err
}
return []byte(fmt.Sprintf("[\"%s\",%d,%s,%s]", chunk.message.Tag,
chunk.message.Time, data, option)), err
return []byte(fmt.Sprintf("[\"%s\",%d,%s,null]", chunk.message.Tag,
chunk.message.Time, data)), err
}
// getUniqueID returns a base64 encoded unique ID that can be used for chunk/ack
// mechanism, see
// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#option
func getUniqueID(timeUnix int64) (string, error) {
buf := bytes.NewBuffer(nil)
enc := base64.NewEncoder(base64.StdEncoding, buf)
if err := binary.Write(enc, binary.LittleEndian, timeUnix); err != nil {
enc.Close()
return "", err
}
if err := binary.Write(enc, binary.LittleEndian, rand.Uint64()); err != nil {
enc.Close()
return "", err
}
// encoder needs to be closed before buf.String(), defer does not work
// here
enc.Close()
return buf.String(), nil
}
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) {
option := make(map[string]string)
msg = &msgToSend{}
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data []byte, err error) {
timeUnix := tm.Unix()
if f.Config.RequestAck {
var err error
msg.ack, err = getUniqueID(timeUnix)
if err != nil {
return nil, err
}
option["chunk"] = msg.ack
}
if f.Config.MarshalAsJSON {
m := Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
chunk := &MessageChunk{message: m}
msg.data, err = json.Marshal(chunk)
} else if f.Config.SubSecondPrecision {
m := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message, Option: option}
msg.data, err = m.MarshalMsg(nil)
msg := Message{Tag: tag, Time: timeUnix, Record: message}
chunk := &MessageChunk{message: msg}
data, err = json.Marshal(chunk)
} else {
m := &Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
msg.data, err = m.MarshalMsg(nil)
msg := &Message{Tag: tag, Time: timeUnix, Record: message}
data, err = msg.MarshalMsg(nil)
}
return
}
// Close closes the connection, waiting for pending logs to be sent
// Close closes the connection.
func (f *Fluent) Close() (err error) {
if f.Config.Async {
if f.Config.ForceStopAsyncSend {
f.stopRunning <- true
close(f.stopRunning)
}
close(f.pending)
f.wg.Wait()
if len(f.pending) > 0 {
err = f.send()
}
f.close(f.conn)
f.close()
return
}
// appendBuffer appends data to buffer with lock.
func (f *Fluent) appendBuffer(msg *msgToSend) error {
select {
case f.pending <- msg:
default:
return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
func (f *Fluent) appendBuffer(data []byte) error {
f.mubuff.Lock()
defer f.mubuff.Unlock()
if len(f.pending)+len(data) > f.Config.BufferLimit {
return errors.New(fmt.Sprintf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit))
}
f.pending = append(f.pending, data...)
return nil
}
// close closes the connection.
func (f *Fluent) close(c net.Conn) {
func (f *Fluent) close() {
f.muconn.Lock()
if f.conn != nil && f.conn == c {
if f.conn != nil {
f.conn.Close()
f.conn = nil
}
@@ -340,6 +242,8 @@ func (f *Fluent) close(c net.Conn) {
// connect establishes a new connection using the specified transport.
func (f *Fluent) connect() (err error) {
f.muconn.Lock()
defer f.muconn.Unlock()
switch f.Config.FluentNetwork {
case "tcp":
@@ -347,101 +251,59 @@ func (f *Fluent) connect() (err error) {
case "unix":
f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentSocketPath, f.Config.Timeout)
default:
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
err = net.UnknownNetworkError(f.Config.FluentNetwork)
}
return err
}
func (f *Fluent) run() {
drainEvents := false
var emitEventDrainMsg sync.Once
for {
select {
case entry, ok := <-f.pending:
if !ok {
f.wg.Done()
return
}
if drainEvents {
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
continue
}
err := f.write(entry)
if err != nil {
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
}
}
select {
case stopRunning, ok := <-f.stopRunning:
if stopRunning || !ok {
drainEvents = true
}
default:
}
if err == nil {
f.reconnecting = false
}
return
}
func e(x, y float64) int {
return int(math.Pow(x, y))
}
func (f *Fluent) write(msg *msgToSend) error {
var c net.Conn
for i := 0; i < f.Config.MaxRetry; i++ {
c = f.conn
// Connect if needed
if c == nil {
f.muconn.Lock()
if f.conn == nil {
err := f.connect()
if err != nil {
f.muconn.Unlock()
func (f *Fluent) reconnect() {
for i := 0; ; i++ {
err := f.connect()
if err == nil {
f.send()
return
}
if i == f.Config.MaxRetry {
// TODO: What we can do when connection failed MaxRetry times?
panic("fluent#reconnect: failed to reconnect!")
}
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
time.Sleep(time.Duration(waitTime) * time.Millisecond)
}
}
if _, ok := err.(*ErrUnknownNetwork); ok {
// do not retry on unknown network error
break
}
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
if waitTime > f.Config.MaxRetryWait {
waitTime = f.Config.MaxRetryWait
}
time.Sleep(time.Duration(waitTime) * time.Millisecond)
continue
}
}
c = f.conn
f.muconn.Unlock()
}
func (f *Fluent) send() error {
f.muconn.Lock()
defer f.muconn.Unlock()
// We're connected, write msg
t := f.Config.WriteTimeout
if time.Duration(0) < t {
c.SetWriteDeadline(time.Now().Add(t))
} else {
c.SetWriteDeadline(time.Time{})
}
_, err := c.Write(msg.data)
if err != nil {
f.close(c)
} else {
// Acknowledgment check
if msg.ack != "" {
resp := &AckResp{}
if f.Config.MarshalAsJSON {
dec := json.NewDecoder(c)
err = dec.Decode(resp)
} else {
r := msgp.NewReader(c)
err = resp.DecodeMsg(r)
}
if err != nil || resp.Ack != msg.ack {
f.close(c)
continue
}
}
return err
if f.conn == nil {
if f.reconnecting == false {
f.reconnecting = true
go f.reconnect()
}
return errors.New("fluent#send: can't send logs, client is reconnecting")
}
return fmt.Errorf("fluent#write: failed to reconnect, max retry: %v", f.Config.MaxRetry)
f.mubuff.Lock()
defer f.mubuff.Unlock()
var err error
if len(f.pending) > 0 {
_, err = f.conn.Write(f.pending)
if err != nil {
f.conn.Close()
f.conn = nil
} else {
f.pending = f.pending[:0]
}
}
return err
}

View File

@@ -2,13 +2,6 @@
package fluent
import (
"fmt"
"time"
"github.com/tinylib/msgp/msgp"
)
//msgp:tuple Entry
type Entry struct {
Time int64 `msg:"time"`
@@ -17,9 +10,9 @@ type Entry struct {
//msgp:tuple Forward
type Forward struct {
Tag string `msg:"tag"`
Entries []Entry `msg:"entries"`
Option map[string]string
Tag string `msg:"tag"`
Entries []Entry `msg:"entries"`
Option interface{} `msg:"option"`
}
//msgp:tuple Message
@@ -27,86 +20,5 @@ type Message struct {
Tag string `msg:"tag"`
Time int64 `msg:"time"`
Record interface{} `msg:"record"`
Option map[string]string
}
//msgp:tuple MessageExt
type MessageExt struct {
Tag string `msg:"tag"`
Time EventTime `msg:"time,extension"`
Record interface{} `msg:"record"`
Option map[string]string
}
type AckResp struct {
Ack string `json:"ack" msg:"ack"`
}
// EventTime is an extension to the serialized time value. It builds in support
// for sub-second (nanosecond) precision in serialized timestamps.
//
// You can find the full specification for the msgpack message payload here:
// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.
//
// You can find more information on msgpack extension types here:
// https://github.com/tinylib/msgp/wiki/Using-Extensions.
type EventTime time.Time
const (
extensionType = 0
length = 8
)
func init() {
msgp.RegisterExtension(extensionType, func() msgp.Extension { return new(EventTime) })
}
func (t *EventTime) ExtensionType() int8 { return extensionType }
func (t *EventTime) Len() int { return length }
func (t *EventTime) MarshalBinaryTo(b []byte) error {
// Unwrap to Golang time
goTime := time.Time(*t)
// There's no support for timezones in fluentd's protocol for EventTime.
// Convert to UTC.
utc := goTime.UTC()
// Warning! Converting seconds to an int32 is a lossy operation. This code
// will hit the "Year 2038" problem.
sec := int32(utc.Unix())
nsec := utc.Nanosecond()
// Fill the buffer with 4 bytes for the second component of the timestamp.
b[0] = byte(sec >> 24)
b[1] = byte(sec >> 16)
b[2] = byte(sec >> 8)
b[3] = byte(sec)
// Fill the buffer with 4 bytes for the nanosecond component of the
// timestamp.
b[4] = byte(nsec >> 24)
b[5] = byte(nsec >> 16)
b[6] = byte(nsec >> 8)
b[7] = byte(nsec)
return nil
}
// Although decoding messages is not officially supported by this library,
// UnmarshalBinary is implemented for testing and general completeness.
func (t *EventTime) UnmarshalBinary(b []byte) error {
if len(b) != length {
return fmt.Errorf("Invalid EventTime byte length: %d", len(b))
}
sec := (int32(b[0]) << 24) | (int32(b[1]) << 16)
sec = sec | (int32(b[2]) << 8) | int32(b[3])
nsec := (int32(b[4]) << 24) | (int32(b[5]) << 16)
nsec = nsec | (int32(b[6]) << 8) | int32(b[7])
*t = EventTime(time.Unix(int64(sec), int64(nsec)))
return nil
Option interface{} `msg:"option"`
}

View File

@@ -1,134 +1,30 @@
package fluent
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
// NOTE: THIS FILE WAS PRODUCED BY THE
// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp)
// DO NOT EDIT
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *AckResp) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "ack":
z.Ack, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Ack")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z AckResp) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 1
// write "ack"
err = en.Append(0x81, 0xa3, 0x61, 0x63, 0x6b)
if err != nil {
return
}
err = en.WriteString(z.Ack)
if err != nil {
err = msgp.WrapError(err, "Ack")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z AckResp) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 1
// string "ack"
o = append(o, 0x81, 0xa3, 0x61, 0x63, 0x6b)
o = msgp.AppendString(o, z.Ack)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *AckResp) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "ack":
z.Ack, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Ack")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z AckResp) Msgsize() (s int) {
s = 1 + 4 + msgp.StringPrefixSize + len(z.Ack)
return
}
// DecodeMsg implements msgp.Decodable
func (z *Entry) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0001 uint32
zb0001, err = dc.ReadArrayHeader()
var ssz uint32
ssz, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
if zb0001 != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zb0001}
if ssz != 2 {
err = msgp.ArrayError{Wanted: 2, Got: ssz}
return
}
z.Time, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
z.Record, err = dc.ReadIntf()
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
return
@@ -136,19 +32,16 @@ func (z *Entry) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z Entry) EncodeMsg(en *msgp.Writer) (err error) {
// array header, size 2
err = en.Append(0x92)
err = en.WriteArrayHeader(2)
if err != nil {
return
}
err = en.WriteInt64(z.Time)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
err = en.WriteIntf(z.Record)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
return
@@ -157,12 +50,10 @@ func (z Entry) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler
func (z Entry) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// array header, size 2
o = append(o, 0x92)
o = msgp.AppendArrayHeader(o, 2)
o = msgp.AppendInt64(o, z.Time)
o, err = msgp.AppendIntf(o, z.Record)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
return
@@ -170,682 +61,312 @@ func (z Entry) MarshalMsg(b []byte) (o []byte, err error) {
// UnmarshalMsg implements msgp.Unmarshaler
func (z *Entry) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zb0001 uint32
zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
if zb0001 != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zb0001}
return
{
var ssz uint32
ssz, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if ssz != 2 {
err = msgp.ArrayError{Wanted: 2, Got: ssz}
return
}
}
z.Time, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
z.Record, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z Entry) Msgsize() (s int) {
s = 1 + msgp.Int64Size + msgp.GuessSize(z.Record)
s = msgp.ArrayHeaderSize + msgp.Int64Size + msgp.GuessSize(z.Record)
return
}
// DecodeMsg implements msgp.Decodable
func (z *Forward) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0001 uint32
zb0001, err = dc.ReadArrayHeader()
var ssz uint32
ssz, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
if zb0001 != 3 {
err = msgp.ArrayError{Wanted: 3, Got: zb0001}
if ssz != 3 {
err = msgp.ArrayError{Wanted: 3, Got: ssz}
return
}
z.Tag, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
var zb0002 uint32
zb0002, err = dc.ReadArrayHeader()
var xsz uint32
xsz, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "Entries")
return
}
if cap(z.Entries) >= int(zb0002) {
z.Entries = (z.Entries)[:zb0002]
if cap(z.Entries) >= int(xsz) {
z.Entries = z.Entries[:xsz]
} else {
z.Entries = make([]Entry, zb0002)
z.Entries = make([]Entry, xsz)
}
for za0001 := range z.Entries {
var zb0003 uint32
zb0003, err = dc.ReadArrayHeader()
for xvk := range z.Entries {
var ssz uint32
ssz, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "Entries", za0001)
return
}
if zb0003 != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zb0003}
if ssz != 2 {
err = msgp.ArrayError{Wanted: 2, Got: ssz}
return
}
z.Entries[za0001].Time, err = dc.ReadInt64()
z.Entries[xvk].Time, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Time")
return
}
z.Entries[za0001].Record, err = dc.ReadIntf()
z.Entries[xvk].Record, err = dc.ReadIntf()
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Record")
return
}
}
var zb0004 uint32
zb0004, err = dc.ReadMapHeader()
z.Option, err = dc.ReadIntf()
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
if z.Option == nil {
z.Option = make(map[string]string, zb0004)
} else if len(z.Option) > 0 {
for key := range z.Option {
delete(z.Option, key)
}
}
for zb0004 > 0 {
zb0004--
var za0002 string
var za0003 string
za0002, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
za0003, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Option", za0002)
return
}
z.Option[za0002] = za0003
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *Forward) EncodeMsg(en *msgp.Writer) (err error) {
// array header, size 3
err = en.Append(0x93)
err = en.WriteArrayHeader(3)
if err != nil {
return
}
err = en.WriteString(z.Tag)
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
err = en.WriteArrayHeader(uint32(len(z.Entries)))
if err != nil {
err = msgp.WrapError(err, "Entries")
return
}
for za0001 := range z.Entries {
// array header, size 2
err = en.Append(0x92)
for xvk := range z.Entries {
err = en.WriteArrayHeader(2)
if err != nil {
return
}
err = en.WriteInt64(z.Entries[za0001].Time)
err = en.WriteInt64(z.Entries[xvk].Time)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Time")
return
}
err = en.WriteIntf(z.Entries[za0001].Record)
err = en.WriteIntf(z.Entries[xvk].Record)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Record")
return
}
}
err = en.WriteMapHeader(uint32(len(z.Option)))
err = en.WriteIntf(z.Option)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
for za0002, za0003 := range z.Option {
err = en.WriteString(za0002)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
err = en.WriteString(za0003)
if err != nil {
err = msgp.WrapError(err, "Option", za0002)
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *Forward) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// array header, size 3
o = append(o, 0x93)
o = msgp.AppendArrayHeader(o, 3)
o = msgp.AppendString(o, z.Tag)
o = msgp.AppendArrayHeader(o, uint32(len(z.Entries)))
for za0001 := range z.Entries {
// array header, size 2
o = append(o, 0x92)
o = msgp.AppendInt64(o, z.Entries[za0001].Time)
o, err = msgp.AppendIntf(o, z.Entries[za0001].Record)
for xvk := range z.Entries {
o = msgp.AppendArrayHeader(o, 2)
o = msgp.AppendInt64(o, z.Entries[xvk].Time)
o, err = msgp.AppendIntf(o, z.Entries[xvk].Record)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Record")
return
}
}
o = msgp.AppendMapHeader(o, uint32(len(z.Option)))
for za0002, za0003 := range z.Option {
o = msgp.AppendString(o, za0002)
o = msgp.AppendString(o, za0003)
o, err = msgp.AppendIntf(o, z.Option)
if err != nil {
return
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *Forward) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zb0001 uint32
zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
if zb0001 != 3 {
err = msgp.ArrayError{Wanted: 3, Got: zb0001}
return
{
var ssz uint32
ssz, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if ssz != 3 {
err = msgp.ArrayError{Wanted: 3, Got: ssz}
return
}
}
z.Tag, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
var zb0002 uint32
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
var xsz uint32
xsz, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Entries")
return
}
if cap(z.Entries) >= int(zb0002) {
z.Entries = (z.Entries)[:zb0002]
if cap(z.Entries) >= int(xsz) {
z.Entries = z.Entries[:xsz]
} else {
z.Entries = make([]Entry, zb0002)
z.Entries = make([]Entry, xsz)
}
for za0001 := range z.Entries {
var zb0003 uint32
zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts)
for xvk := range z.Entries {
{
var ssz uint32
ssz, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if ssz != 2 {
err = msgp.ArrayError{Wanted: 2, Got: ssz}
return
}
}
z.Entries[xvk].Time, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001)
return
}
if zb0003 != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zb0003}
return
}
z.Entries[za0001].Time, bts, err = msgp.ReadInt64Bytes(bts)
z.Entries[xvk].Record, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Time")
return
}
z.Entries[za0001].Record, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Entries", za0001, "Record")
return
}
}
var zb0004 uint32
zb0004, bts, err = msgp.ReadMapHeaderBytes(bts)
z.Option, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
if z.Option == nil {
z.Option = make(map[string]string, zb0004)
} else if len(z.Option) > 0 {
for key := range z.Option {
delete(z.Option, key)
}
}
for zb0004 > 0 {
var za0002 string
var za0003 string
zb0004--
za0002, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
za0003, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option", za0002)
return
}
z.Option[za0002] = za0003
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *Forward) Msgsize() (s int) {
s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.ArrayHeaderSize
for za0001 := range z.Entries {
s += 1 + msgp.Int64Size + msgp.GuessSize(z.Entries[za0001].Record)
}
s += msgp.MapHeaderSize
if z.Option != nil {
for za0002, za0003 := range z.Option {
_ = za0003
s += msgp.StringPrefixSize + len(za0002) + msgp.StringPrefixSize + len(za0003)
}
s = msgp.ArrayHeaderSize + msgp.StringPrefixSize + len(z.Tag) + msgp.ArrayHeaderSize
for xvk := range z.Entries {
s += msgp.ArrayHeaderSize + msgp.Int64Size + msgp.GuessSize(z.Entries[xvk].Record)
}
s += msgp.GuessSize(z.Option)
return
}
// DecodeMsg implements msgp.Decodable
func (z *Message) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0001 uint32
zb0001, err = dc.ReadArrayHeader()
var ssz uint32
ssz, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
if zb0001 != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zb0001}
if ssz != 4 {
err = msgp.ArrayError{Wanted: 4, Got: ssz}
return
}
z.Tag, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
z.Time, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
z.Record, err = dc.ReadIntf()
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
z.Option, err = dc.ReadIntf()
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
if z.Option == nil {
z.Option = make(map[string]string, zb0002)
} else if len(z.Option) > 0 {
for key := range z.Option {
delete(z.Option, key)
}
}
for zb0002 > 0 {
zb0002--
var za0001 string
var za0002 string
za0001, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
za0002, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Option", za0001)
return
}
z.Option[za0001] = za0002
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *Message) EncodeMsg(en *msgp.Writer) (err error) {
// array header, size 4
err = en.Append(0x94)
err = en.WriteArrayHeader(4)
if err != nil {
return
}
err = en.WriteString(z.Tag)
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
err = en.WriteInt64(z.Time)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
err = en.WriteIntf(z.Record)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
err = en.WriteMapHeader(uint32(len(z.Option)))
err = en.WriteIntf(z.Option)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
for za0001, za0002 := range z.Option {
err = en.WriteString(za0001)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
err = en.WriteString(za0002)
if err != nil {
err = msgp.WrapError(err, "Option", za0001)
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *Message) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// array header, size 4
o = append(o, 0x94)
o = msgp.AppendArrayHeader(o, 4)
o = msgp.AppendString(o, z.Tag)
o = msgp.AppendInt64(o, z.Time)
o, err = msgp.AppendIntf(o, z.Record)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
o = msgp.AppendMapHeader(o, uint32(len(z.Option)))
for za0001, za0002 := range z.Option {
o = msgp.AppendString(o, za0001)
o = msgp.AppendString(o, za0002)
o, err = msgp.AppendIntf(o, z.Option)
if err != nil {
return
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *Message) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zb0001 uint32
zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
if zb0001 != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zb0001}
return
{
var ssz uint32
ssz, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if ssz != 4 {
err = msgp.ArrayError{Wanted: 4, Got: ssz}
return
}
}
z.Tag, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
z.Time, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
z.Record, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
z.Option, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
if z.Option == nil {
z.Option = make(map[string]string, zb0002)
} else if len(z.Option) > 0 {
for key := range z.Option {
delete(z.Option, key)
}
}
for zb0002 > 0 {
var za0001 string
var za0002 string
zb0002--
za0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
za0002, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option", za0001)
return
}
z.Option[za0001] = za0002
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *Message) Msgsize() (s int) {
s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.Int64Size + msgp.GuessSize(z.Record) + msgp.MapHeaderSize
if z.Option != nil {
for za0001, za0002 := range z.Option {
_ = za0002
s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002)
}
}
return
}
// DecodeMsg implements msgp.Decodable
func (z *MessageExt) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0001 uint32
zb0001, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
if zb0001 != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zb0001}
return
}
z.Tag, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
err = dc.ReadExtension(&z.Time)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
z.Record, err = dc.ReadIntf()
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
if z.Option == nil {
z.Option = make(map[string]string, zb0002)
} else if len(z.Option) > 0 {
for key := range z.Option {
delete(z.Option, key)
}
}
for zb0002 > 0 {
zb0002--
var za0001 string
var za0002 string
za0001, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
za0002, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Option", za0001)
return
}
z.Option[za0001] = za0002
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *MessageExt) EncodeMsg(en *msgp.Writer) (err error) {
// array header, size 4
err = en.Append(0x94)
if err != nil {
return
}
err = en.WriteString(z.Tag)
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
err = en.WriteExtension(&z.Time)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
err = en.WriteIntf(z.Record)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
err = en.WriteMapHeader(uint32(len(z.Option)))
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
for za0001, za0002 := range z.Option {
err = en.WriteString(za0001)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
err = en.WriteString(za0002)
if err != nil {
err = msgp.WrapError(err, "Option", za0001)
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *MessageExt) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// array header, size 4
o = append(o, 0x94)
o = msgp.AppendString(o, z.Tag)
o, err = msgp.AppendExtension(o, &z.Time)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
o, err = msgp.AppendIntf(o, z.Record)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
o = msgp.AppendMapHeader(o, uint32(len(z.Option)))
for za0001, za0002 := range z.Option {
o = msgp.AppendString(o, za0001)
o = msgp.AppendString(o, za0002)
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MessageExt) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zb0001 uint32
zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
if zb0001 != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zb0001}
return
}
z.Tag, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Tag")
return
}
bts, err = msgp.ReadExtensionBytes(bts, &z.Time)
if err != nil {
err = msgp.WrapError(err, "Time")
return
}
z.Record, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Record")
return
}
var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
if z.Option == nil {
z.Option = make(map[string]string, zb0002)
} else if len(z.Option) > 0 {
for key := range z.Option {
delete(z.Option, key)
}
}
for zb0002 > 0 {
var za0001 string
var za0002 string
zb0002--
za0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option")
return
}
za0002, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Option", za0001)
return
}
z.Option[za0001] = za0002
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *MessageExt) Msgsize() (s int) {
s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.ExtensionPrefixSize + z.Time.Len() + msgp.GuessSize(z.Record) + msgp.MapHeaderSize
if z.Option != nil {
for za0001, za0002 := range z.Option {
_ = za0002
s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002)
}
}
s = msgp.ArrayHeaderSize + msgp.StringPrefixSize + len(z.Tag) + msgp.Int64Size + msgp.GuessSize(z.Record) + msgp.GuessSize(z.Option)
return
}

View File

@@ -1,7 +0,0 @@
package fluent
//go:generate msgp
type TestMessage struct {
Foo string `msg:"foo" json:"foo,omitempty"`
Hoge string `msg:"hoge" json:"hoge,omitempty"`
}

View File

@@ -1,135 +0,0 @@
package fluent
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *TestMessage) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "foo":
z.Foo, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Foo")
return
}
case "hoge":
z.Hoge, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Hoge")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z TestMessage) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "foo"
err = en.Append(0x82, 0xa3, 0x66, 0x6f, 0x6f)
if err != nil {
return
}
err = en.WriteString(z.Foo)
if err != nil {
err = msgp.WrapError(err, "Foo")
return
}
// write "hoge"
err = en.Append(0xa4, 0x68, 0x6f, 0x67, 0x65)
if err != nil {
return
}
err = en.WriteString(z.Hoge)
if err != nil {
err = msgp.WrapError(err, "Hoge")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z TestMessage) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "foo"
o = append(o, 0x82, 0xa3, 0x66, 0x6f, 0x6f)
o = msgp.AppendString(o, z.Foo)
// string "hoge"
o = append(o, 0xa4, 0x68, 0x6f, 0x67, 0x65)
o = msgp.AppendString(o, z.Hoge)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *TestMessage) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "foo":
z.Foo, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Foo")
return
}
case "hoge":
z.Hoge, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Hoge")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z TestMessage) Msgsize() (s int) {
s = 1 + 4 + msgp.StringPrefixSize + len(z.Foo) + 5 + msgp.StringPrefixSize + len(z.Hoge)
return
}

View File

@@ -1,3 +1,3 @@
package fluent
const Version = "1.4.0"
const Version = "1.2.1"

2
vendor/modules.txt vendored
View File

@@ -116,7 +116,7 @@ github.com/dustin/go-humanize
github.com/evanphx/json-patch
# github.com/fatih/camelcase v1.0.0
github.com/fatih/camelcase
# github.com/fluent/fluent-logger-golang v1.5.0
# github.com/fluent/fluent-logger-golang v1.5.0 => github.com/fluent/fluent-logger-golang v1.2.1
github.com/fluent/fluent-logger-golang/fluent
# github.com/fsouza/go-dockerclient v1.3.0
github.com/fsouza/go-dockerclient