| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- package eventstream
- import (
- "bytes"
- "encoding/binary"
- "encoding/hex"
- "encoding/json"
- "fmt"
- "hash"
- "hash/crc32"
- "io"
- "github.com/aws/aws-sdk-go/aws"
- )
- // Decoder provides decoding of an Event Stream messages.
- type Decoder struct {
- r io.Reader
- logger aws.Logger
- }
- // NewDecoder initializes and returns a Decoder for decoding event
- // stream messages from the reader provided.
- func NewDecoder(r io.Reader) *Decoder {
- return &Decoder{
- r: r,
- }
- }
- // Decode attempts to decode a single message from the event stream reader.
- // Will return the event stream message, or error if Decode fails to read
- // the message from the stream.
- func (d *Decoder) Decode(payloadBuf []byte) (m Message, err error) {
- reader := d.r
- if d.logger != nil {
- debugMsgBuf := bytes.NewBuffer(nil)
- reader = io.TeeReader(reader, debugMsgBuf)
- defer func() {
- logMessageDecode(d.logger, debugMsgBuf, m, err)
- }()
- }
- crc := crc32.New(crc32IEEETable)
- hashReader := io.TeeReader(reader, crc)
- prelude, err := decodePrelude(hashReader, crc)
- if err != nil {
- return Message{}, err
- }
- if prelude.HeadersLen > 0 {
- lr := io.LimitReader(hashReader, int64(prelude.HeadersLen))
- m.Headers, err = decodeHeaders(lr)
- if err != nil {
- return Message{}, err
- }
- }
- if payloadLen := prelude.PayloadLen(); payloadLen > 0 {
- buf, err := decodePayload(payloadBuf, io.LimitReader(hashReader, int64(payloadLen)))
- if err != nil {
- return Message{}, err
- }
- m.Payload = buf
- }
- msgCRC := crc.Sum32()
- if err := validateCRC(reader, msgCRC); err != nil {
- return Message{}, err
- }
- return m, nil
- }
- // UseLogger specifies the Logger that that the decoder should use to log the
- // message decode to.
- func (d *Decoder) UseLogger(logger aws.Logger) {
- d.logger = logger
- }
- func logMessageDecode(logger aws.Logger, msgBuf *bytes.Buffer, msg Message, decodeErr error) {
- w := bytes.NewBuffer(nil)
- defer func() { logger.Log(w.String()) }()
- fmt.Fprintf(w, "Raw message:\n%s\n",
- hex.Dump(msgBuf.Bytes()))
- if decodeErr != nil {
- fmt.Fprintf(w, "Decode error: %v\n", decodeErr)
- return
- }
- rawMsg, err := msg.rawMessage()
- if err != nil {
- fmt.Fprintf(w, "failed to create raw message, %v\n", err)
- return
- }
- decodedMsg := decodedMessage{
- rawMessage: rawMsg,
- Headers: decodedHeaders(msg.Headers),
- }
- fmt.Fprintf(w, "Decoded message:\n")
- encoder := json.NewEncoder(w)
- if err := encoder.Encode(decodedMsg); err != nil {
- fmt.Fprintf(w, "failed to generate decoded message, %v\n", err)
- }
- }
- func decodePrelude(r io.Reader, crc hash.Hash32) (messagePrelude, error) {
- var p messagePrelude
- var err error
- p.Length, err = decodeUint32(r)
- if err != nil {
- return messagePrelude{}, err
- }
- p.HeadersLen, err = decodeUint32(r)
- if err != nil {
- return messagePrelude{}, err
- }
- if err := p.ValidateLens(); err != nil {
- return messagePrelude{}, err
- }
- preludeCRC := crc.Sum32()
- if err := validateCRC(r, preludeCRC); err != nil {
- return messagePrelude{}, err
- }
- p.PreludeCRC = preludeCRC
- return p, nil
- }
- func decodePayload(buf []byte, r io.Reader) ([]byte, error) {
- w := bytes.NewBuffer(buf[0:0])
- _, err := io.Copy(w, r)
- return w.Bytes(), err
- }
- func decodeUint8(r io.Reader) (uint8, error) {
- type byteReader interface {
- ReadByte() (byte, error)
- }
- if br, ok := r.(byteReader); ok {
- v, err := br.ReadByte()
- return uint8(v), err
- }
- var b [1]byte
- _, err := io.ReadFull(r, b[:])
- return uint8(b[0]), err
- }
- func decodeUint16(r io.Reader) (uint16, error) {
- var b [2]byte
- bs := b[:]
- _, err := io.ReadFull(r, bs)
- if err != nil {
- return 0, err
- }
- return binary.BigEndian.Uint16(bs), nil
- }
- func decodeUint32(r io.Reader) (uint32, error) {
- var b [4]byte
- bs := b[:]
- _, err := io.ReadFull(r, bs)
- if err != nil {
- return 0, err
- }
- return binary.BigEndian.Uint32(bs), nil
- }
- func decodeUint64(r io.Reader) (uint64, error) {
- var b [8]byte
- bs := b[:]
- _, err := io.ReadFull(r, bs)
- if err != nil {
- return 0, err
- }
- return binary.BigEndian.Uint64(bs), nil
- }
- func validateCRC(r io.Reader, expect uint32) error {
- msgCRC, err := decodeUint32(r)
- if err != nil {
- return err
- }
- if msgCRC != expect {
- return ChecksumError{}
- }
- return nil
- }
|