| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- package eventstream
- import (
- "bytes"
- "encoding/binary"
- "hash/crc32"
- )
// Sizes, in bytes, of the fixed framing sections of a message and the limits
// applied to its variable-length sections.
const (
	// Fixed framing: the prelude (two uint32 length fields), the checksum
	// covering the prelude, and the checksum covering the whole message.
	preludeLen    = 8
	preludeCRCLen = 4
	msgCRCLen     = 4

	// minMsgLen is the length of a message that has neither headers nor
	// payload, i.e. the framing alone.
	minMsgLen = preludeLen + preludeCRCLen + msgCRCLen

	// Upper bounds for the variable-length sections.
	maxHeadersLen = 128 * 1024       // 128KB
	maxPayloadLen = 16 * 1024 * 1024 // 16MB

	// maxMsgLen is the largest total message length: the framing plus the
	// largest permitted headers and payload sections.
	maxMsgLen = minMsgLen + maxHeadersLen + maxPayloadLen
)

// crc32IEEETable is the CRC-32 table for the IEEE polynomial, built once and
// shared by the checksum computations in this package.
var crc32IEEETable = crc32.MakeTable(crc32.IEEE)
- // A Message provides the eventstream message representation.
- type Message struct {
- Headers Headers
- Payload []byte
- }
- func (m *Message) rawMessage() (rawMessage, error) {
- var raw rawMessage
- if len(m.Headers) > 0 {
- var headers bytes.Buffer
- if err := encodeHeaders(&headers, m.Headers); err != nil {
- return rawMessage{}, err
- }
- raw.Headers = headers.Bytes()
- raw.HeadersLen = uint32(len(raw.Headers))
- }
- raw.Length = raw.HeadersLen + uint32(len(m.Payload)) + minMsgLen
- hash := crc32.New(crc32IEEETable)
- binaryWriteFields(hash, binary.BigEndian, raw.Length, raw.HeadersLen)
- raw.PreludeCRC = hash.Sum32()
- binaryWriteFields(hash, binary.BigEndian, raw.PreludeCRC)
- if raw.HeadersLen > 0 {
- hash.Write(raw.Headers)
- }
- // Read payload bytes and update hash for it as well.
- if len(m.Payload) > 0 {
- raw.Payload = m.Payload
- hash.Write(raw.Payload)
- }
- raw.CRC = hash.Sum32()
- return raw, nil
- }
// messagePrelude holds the leading fields of a message: its two length
// fields and the checksum computed over them.
type messagePrelude struct {
	// Length is the total message length, framing included; HeadersLen is
	// the length of the encoded headers section.
	Length, HeadersLen uint32
	// PreludeCRC is the checksum of Length and HeadersLen.
	PreludeCRC uint32
}
- func (p messagePrelude) PayloadLen() uint32 {
- return p.Length - p.HeadersLen - minMsgLen
- }
- func (p messagePrelude) ValidateLens() error {
- if p.Length == 0 || p.Length > maxMsgLen {
- return LengthError{
- Part: "message prelude",
- Want: maxMsgLen,
- Have: int(p.Length),
- }
- }
- if p.HeadersLen > maxHeadersLen {
- return LengthError{
- Part: "message headers",
- Want: maxHeadersLen,
- Have: int(p.HeadersLen),
- }
- }
- if payloadLen := p.PayloadLen(); payloadLen > maxPayloadLen {
- return LengthError{
- Part: "message payload",
- Want: maxPayloadLen,
- Have: int(payloadLen),
- }
- }
- return nil
- }
- type rawMessage struct {
- messagePrelude
- Headers []byte
- Payload []byte
- CRC uint32
- }
|