message.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package eventstream
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "hash/crc32"
  6. )
  7. const preludeLen = 8
  8. const preludeCRCLen = 4
  9. const msgCRCLen = 4
  10. const minMsgLen = preludeLen + preludeCRCLen + msgCRCLen
  11. const maxPayloadLen = 1024 * 1024 * 16 // 16MB
  12. const maxHeadersLen = 1024 * 128 // 128KB
  13. const maxMsgLen = minMsgLen + maxHeadersLen + maxPayloadLen
  14. var crc32IEEETable = crc32.MakeTable(crc32.IEEE)
  15. // A Message provides the eventstream message representation.
  16. type Message struct {
  17. Headers Headers
  18. Payload []byte
  19. }
  20. func (m *Message) rawMessage() (rawMessage, error) {
  21. var raw rawMessage
  22. if len(m.Headers) > 0 {
  23. var headers bytes.Buffer
  24. if err := encodeHeaders(&headers, m.Headers); err != nil {
  25. return rawMessage{}, err
  26. }
  27. raw.Headers = headers.Bytes()
  28. raw.HeadersLen = uint32(len(raw.Headers))
  29. }
  30. raw.Length = raw.HeadersLen + uint32(len(m.Payload)) + minMsgLen
  31. hash := crc32.New(crc32IEEETable)
  32. binaryWriteFields(hash, binary.BigEndian, raw.Length, raw.HeadersLen)
  33. raw.PreludeCRC = hash.Sum32()
  34. binaryWriteFields(hash, binary.BigEndian, raw.PreludeCRC)
  35. if raw.HeadersLen > 0 {
  36. hash.Write(raw.Headers)
  37. }
  38. // Read payload bytes and update hash for it as well.
  39. if len(m.Payload) > 0 {
  40. raw.Payload = m.Payload
  41. hash.Write(raw.Payload)
  42. }
  43. raw.CRC = hash.Sum32()
  44. return raw, nil
  45. }
  46. type messagePrelude struct {
  47. Length uint32
  48. HeadersLen uint32
  49. PreludeCRC uint32
  50. }
  51. func (p messagePrelude) PayloadLen() uint32 {
  52. return p.Length - p.HeadersLen - minMsgLen
  53. }
  54. func (p messagePrelude) ValidateLens() error {
  55. if p.Length == 0 || p.Length > maxMsgLen {
  56. return LengthError{
  57. Part: "message prelude",
  58. Want: maxMsgLen,
  59. Have: int(p.Length),
  60. }
  61. }
  62. if p.HeadersLen > maxHeadersLen {
  63. return LengthError{
  64. Part: "message headers",
  65. Want: maxHeadersLen,
  66. Have: int(p.HeadersLen),
  67. }
  68. }
  69. if payloadLen := p.PayloadLen(); payloadLen > maxPayloadLen {
  70. return LengthError{
  71. Part: "message payload",
  72. Want: maxPayloadLen,
  73. Have: int(payloadLen),
  74. }
  75. }
  76. return nil
  77. }
  78. type rawMessage struct {
  79. messagePrelude
  80. Headers []byte
  81. Payload []byte
  82. CRC uint32
  83. }