| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501 |
- package eventstream
- import (
- "encoding/base64"
- "encoding/binary"
- "fmt"
- "io"
- "strconv"
- "time"
- )
- const maxHeaderValueLen = 1<<15 - 1 // 2^15-1 or 32KB - 1
- // valueType is the EventStream header value type.
- type valueType uint8
- // Header value types
- const (
- trueValueType valueType = iota
- falseValueType
- int8ValueType // Byte
- int16ValueType // Short
- int32ValueType // Integer
- int64ValueType // Long
- bytesValueType
- stringValueType
- timestampValueType
- uuidValueType
- )
- func (t valueType) String() string {
- switch t {
- case trueValueType:
- return "bool"
- case falseValueType:
- return "bool"
- case int8ValueType:
- return "int8"
- case int16ValueType:
- return "int16"
- case int32ValueType:
- return "int32"
- case int64ValueType:
- return "int64"
- case bytesValueType:
- return "byte_array"
- case stringValueType:
- return "string"
- case timestampValueType:
- return "timestamp"
- case uuidValueType:
- return "uuid"
- default:
- return fmt.Sprintf("unknown value type %d", uint8(t))
- }
- }
- type rawValue struct {
- Type valueType
- Len uint16 // Only set for variable length slices
- Value []byte // byte representation of value, BigEndian encoding.
- }
- func (r rawValue) encodeScalar(w io.Writer, v interface{}) error {
- return binaryWriteFields(w, binary.BigEndian,
- r.Type,
- v,
- )
- }
- func (r rawValue) encodeFixedSlice(w io.Writer, v []byte) error {
- binary.Write(w, binary.BigEndian, r.Type)
- _, err := w.Write(v)
- return err
- }
- func (r rawValue) encodeBytes(w io.Writer, v []byte) error {
- if len(v) > maxHeaderValueLen {
- return LengthError{
- Part: "header value",
- Want: maxHeaderValueLen, Have: len(v),
- Value: v,
- }
- }
- r.Len = uint16(len(v))
- err := binaryWriteFields(w, binary.BigEndian,
- r.Type,
- r.Len,
- )
- if err != nil {
- return err
- }
- _, err = w.Write(v)
- return err
- }
- func (r rawValue) encodeString(w io.Writer, v string) error {
- if len(v) > maxHeaderValueLen {
- return LengthError{
- Part: "header value",
- Want: maxHeaderValueLen, Have: len(v),
- Value: v,
- }
- }
- r.Len = uint16(len(v))
- type stringWriter interface {
- WriteString(string) (int, error)
- }
- err := binaryWriteFields(w, binary.BigEndian,
- r.Type,
- r.Len,
- )
- if err != nil {
- return err
- }
- if sw, ok := w.(stringWriter); ok {
- _, err = sw.WriteString(v)
- } else {
- _, err = w.Write([]byte(v))
- }
- return err
- }
- func decodeFixedBytesValue(r io.Reader, buf []byte) error {
- _, err := io.ReadFull(r, buf)
- return err
- }
- func decodeBytesValue(r io.Reader) ([]byte, error) {
- var raw rawValue
- var err error
- raw.Len, err = decodeUint16(r)
- if err != nil {
- return nil, err
- }
- buf := make([]byte, raw.Len)
- _, err = io.ReadFull(r, buf)
- if err != nil {
- return nil, err
- }
- return buf, nil
- }
- func decodeStringValue(r io.Reader) (string, error) {
- v, err := decodeBytesValue(r)
- return string(v), err
- }
- // Value represents the abstract header value.
- type Value interface {
- Get() interface{}
- String() string
- valueType() valueType
- encode(io.Writer) error
- }
- // An BoolValue provides eventstream encoding, and representation
- // of a Go bool value.
- type BoolValue bool
- // Get returns the underlying type
- func (v BoolValue) Get() interface{} {
- return bool(v)
- }
- // valueType returns the EventStream header value type value.
- func (v BoolValue) valueType() valueType {
- if v {
- return trueValueType
- }
- return falseValueType
- }
- func (v BoolValue) String() string {
- return strconv.FormatBool(bool(v))
- }
- // encode encodes the BoolValue into an eventstream binary value
- // representation.
- func (v BoolValue) encode(w io.Writer) error {
- return binary.Write(w, binary.BigEndian, v.valueType())
- }
- // An Int8Value provides eventstream encoding, and representation of a Go
- // int8 value.
- type Int8Value int8
- // Get returns the underlying value.
- func (v Int8Value) Get() interface{} {
- return int8(v)
- }
- // valueType returns the EventStream header value type value.
- func (Int8Value) valueType() valueType {
- return int8ValueType
- }
- func (v Int8Value) String() string {
- return fmt.Sprintf("0x%02x", int8(v))
- }
- // encode encodes the Int8Value into an eventstream binary value
- // representation.
- func (v Int8Value) encode(w io.Writer) error {
- raw := rawValue{
- Type: v.valueType(),
- }
- return raw.encodeScalar(w, v)
- }
- func (v *Int8Value) decode(r io.Reader) error {
- n, err := decodeUint8(r)
- if err != nil {
- return err
- }
- *v = Int8Value(n)
- return nil
- }
- // An Int16Value provides eventstream encoding, and representation of a Go
- // int16 value.
- type Int16Value int16
- // Get returns the underlying value.
- func (v Int16Value) Get() interface{} {
- return int16(v)
- }
- // valueType returns the EventStream header value type value.
- func (Int16Value) valueType() valueType {
- return int16ValueType
- }
- func (v Int16Value) String() string {
- return fmt.Sprintf("0x%04x", int16(v))
- }
- // encode encodes the Int16Value into an eventstream binary value
- // representation.
- func (v Int16Value) encode(w io.Writer) error {
- raw := rawValue{
- Type: v.valueType(),
- }
- return raw.encodeScalar(w, v)
- }
- func (v *Int16Value) decode(r io.Reader) error {
- n, err := decodeUint16(r)
- if err != nil {
- return err
- }
- *v = Int16Value(n)
- return nil
- }
- // An Int32Value provides eventstream encoding, and representation of a Go
- // int32 value.
- type Int32Value int32
- // Get returns the underlying value.
- func (v Int32Value) Get() interface{} {
- return int32(v)
- }
- // valueType returns the EventStream header value type value.
- func (Int32Value) valueType() valueType {
- return int32ValueType
- }
- func (v Int32Value) String() string {
- return fmt.Sprintf("0x%08x", int32(v))
- }
- // encode encodes the Int32Value into an eventstream binary value
- // representation.
- func (v Int32Value) encode(w io.Writer) error {
- raw := rawValue{
- Type: v.valueType(),
- }
- return raw.encodeScalar(w, v)
- }
- func (v *Int32Value) decode(r io.Reader) error {
- n, err := decodeUint32(r)
- if err != nil {
- return err
- }
- *v = Int32Value(n)
- return nil
- }
- // An Int64Value provides eventstream encoding, and representation of a Go
- // int64 value.
- type Int64Value int64
- // Get returns the underlying value.
- func (v Int64Value) Get() interface{} {
- return int64(v)
- }
- // valueType returns the EventStream header value type value.
- func (Int64Value) valueType() valueType {
- return int64ValueType
- }
- func (v Int64Value) String() string {
- return fmt.Sprintf("0x%016x", int64(v))
- }
- // encode encodes the Int64Value into an eventstream binary value
- // representation.
- func (v Int64Value) encode(w io.Writer) error {
- raw := rawValue{
- Type: v.valueType(),
- }
- return raw.encodeScalar(w, v)
- }
- func (v *Int64Value) decode(r io.Reader) error {
- n, err := decodeUint64(r)
- if err != nil {
- return err
- }
- *v = Int64Value(n)
- return nil
- }
- // An BytesValue provides eventstream encoding, and representation of a Go
- // byte slice.
- type BytesValue []byte
- // Get returns the underlying value.
- func (v BytesValue) Get() interface{} {
- return []byte(v)
- }
- // valueType returns the EventStream header value type value.
- func (BytesValue) valueType() valueType {
- return bytesValueType
- }
- func (v BytesValue) String() string {
- return base64.StdEncoding.EncodeToString([]byte(v))
- }
- // encode encodes the BytesValue into an eventstream binary value
- // representation.
- func (v BytesValue) encode(w io.Writer) error {
- raw := rawValue{
- Type: v.valueType(),
- }
- return raw.encodeBytes(w, []byte(v))
- }
- func (v *BytesValue) decode(r io.Reader) error {
- buf, err := decodeBytesValue(r)
- if err != nil {
- return err
- }
- *v = BytesValue(buf)
- return nil
- }
- // An StringValue provides eventstream encoding, and representation of a Go
- // string.
- type StringValue string
- // Get returns the underlying value.
- func (v StringValue) Get() interface{} {
- return string(v)
- }
- // valueType returns the EventStream header value type value.
- func (StringValue) valueType() valueType {
- return stringValueType
- }
- func (v StringValue) String() string {
- return string(v)
- }
- // encode encodes the StringValue into an eventstream binary value
- // representation.
- func (v StringValue) encode(w io.Writer) error {
- raw := rawValue{
- Type: v.valueType(),
- }
- return raw.encodeString(w, string(v))
- }
- func (v *StringValue) decode(r io.Reader) error {
- s, err := decodeStringValue(r)
- if err != nil {
- return err
- }
- *v = StringValue(s)
- return nil
- }
- // An TimestampValue provides eventstream encoding, and representation of a Go
- // timestamp.
- type TimestampValue time.Time
- // Get returns the underlying value.
- func (v TimestampValue) Get() interface{} {
- return time.Time(v)
- }
- // valueType returns the EventStream header value type value.
- func (TimestampValue) valueType() valueType {
- return timestampValueType
- }
- func (v TimestampValue) epochMilli() int64 {
- nano := time.Time(v).UnixNano()
- msec := nano / int64(time.Millisecond)
- return msec
- }
- func (v TimestampValue) String() string {
- msec := v.epochMilli()
- return strconv.FormatInt(msec, 10)
- }
- // encode encodes the TimestampValue into an eventstream binary value
- // representation.
- func (v TimestampValue) encode(w io.Writer) error {
- raw := rawValue{
- Type: v.valueType(),
- }
- msec := v.epochMilli()
- return raw.encodeScalar(w, msec)
- }
- func (v *TimestampValue) decode(r io.Reader) error {
- n, err := decodeUint64(r)
- if err != nil {
- return err
- }
- *v = TimestampValue(timeFromEpochMilli(int64(n)))
- return nil
- }
- func timeFromEpochMilli(t int64) time.Time {
- secs := t / 1e3
- msec := t % 1e3
- return time.Unix(secs, msec*int64(time.Millisecond)).UTC()
- }
- // An UUIDValue provides eventstream encoding, and representation of a UUID
- // value.
- type UUIDValue [16]byte
- // Get returns the underlying value.
- func (v UUIDValue) Get() interface{} {
- return v[:]
- }
- // valueType returns the EventStream header value type value.
- func (UUIDValue) valueType() valueType {
- return uuidValueType
- }
- func (v UUIDValue) String() string {
- return fmt.Sprintf(`%X-%X-%X-%X-%X`, v[0:4], v[4:6], v[6:8], v[8:10], v[10:])
- }
- // encode encodes the UUIDValue into an eventstream binary value
- // representation.
- func (v UUIDValue) encode(w io.Writer) error {
- raw := rawValue{
- Type: v.valueType(),
- }
- return raw.encodeFixedSlice(w, v[:])
- }
- func (v *UUIDValue) decode(r io.Reader) error {
- tv := (*v)[:]
- return decodeFixedBytesValue(r, tv)
- }
|