header_value.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. package eventstream
  2. import (
  3. "encoding/base64"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "strconv"
  8. "time"
  9. )
  // maxHeaderValueLen is the largest byte length accepted for a variable-length
  // header value: 2^15-1 bytes, i.e. 32KB - 1.
  const maxHeaderValueLen = (1 << 15) - 1
  // valueType identifies the kind of value carried by an EventStream header.
  type valueType uint8

  // Header value types. Each constant's numeric value is its position in this
  // list, starting at zero.
  const (
  	trueValueType valueType = iota
  	falseValueType
  	int8ValueType  // Byte
  	int16ValueType // Short
  	int32ValueType // Integer
  	int64ValueType // Long
  	bytesValueType
  	stringValueType
  	timestampValueType
  	uuidValueType
  )

  // valueTypeNames maps every known valueType to its display name, indexed by
  // the type's numeric value. Both boolean types share the name "bool".
  var valueTypeNames = [...]string{
  	trueValueType:      "bool",
  	falseValueType:     "bool",
  	int8ValueType:      "int8",
  	int16ValueType:     "int16",
  	int32ValueType:     "int32",
  	int64ValueType:     "int64",
  	bytesValueType:     "byte_array",
  	stringValueType:    "string",
  	timestampValueType: "timestamp",
  	uuidValueType:      "uuid",
  }

  // String returns the display name of t. Values outside the known set are
  // reported as "unknown value type N", where N is the numeric value of t.
  func (t valueType) String() string {
  	if int(t) < len(valueTypeNames) {
  		return valueTypeNames[t]
  	}
  	return fmt.Sprintf("unknown value type %d", uint8(t))
  }
  52. type rawValue struct {
  53. Type valueType
  54. Len uint16 // Only set for variable length slices
  55. Value []byte // byte representation of value, BigEndian encoding.
  56. }
  57. func (r rawValue) encodeScalar(w io.Writer, v interface{}) error {
  58. return binaryWriteFields(w, binary.BigEndian,
  59. r.Type,
  60. v,
  61. )
  62. }
  63. func (r rawValue) encodeFixedSlice(w io.Writer, v []byte) error {
  64. binary.Write(w, binary.BigEndian, r.Type)
  65. _, err := w.Write(v)
  66. return err
  67. }
  68. func (r rawValue) encodeBytes(w io.Writer, v []byte) error {
  69. if len(v) > maxHeaderValueLen {
  70. return LengthError{
  71. Part: "header value",
  72. Want: maxHeaderValueLen, Have: len(v),
  73. Value: v,
  74. }
  75. }
  76. r.Len = uint16(len(v))
  77. err := binaryWriteFields(w, binary.BigEndian,
  78. r.Type,
  79. r.Len,
  80. )
  81. if err != nil {
  82. return err
  83. }
  84. _, err = w.Write(v)
  85. return err
  86. }
  87. func (r rawValue) encodeString(w io.Writer, v string) error {
  88. if len(v) > maxHeaderValueLen {
  89. return LengthError{
  90. Part: "header value",
  91. Want: maxHeaderValueLen, Have: len(v),
  92. Value: v,
  93. }
  94. }
  95. r.Len = uint16(len(v))
  96. type stringWriter interface {
  97. WriteString(string) (int, error)
  98. }
  99. err := binaryWriteFields(w, binary.BigEndian,
  100. r.Type,
  101. r.Len,
  102. )
  103. if err != nil {
  104. return err
  105. }
  106. if sw, ok := w.(stringWriter); ok {
  107. _, err = sw.WriteString(v)
  108. } else {
  109. _, err = w.Write([]byte(v))
  110. }
  111. return err
  112. }
  // decodeFixedBytesValue fills buf completely with bytes read from r. It
  // returns the io.ReadFull error when r cannot supply len(buf) bytes.
  func decodeFixedBytesValue(r io.Reader, buf []byte) error {
  	if _, err := io.ReadFull(r, buf); err != nil {
  		return err
  	}
  	return nil
  }
  117. func decodeBytesValue(r io.Reader) ([]byte, error) {
  118. var raw rawValue
  119. var err error
  120. raw.Len, err = decodeUint16(r)
  121. if err != nil {
  122. return nil, err
  123. }
  124. buf := make([]byte, raw.Len)
  125. _, err = io.ReadFull(r, buf)
  126. if err != nil {
  127. return nil, err
  128. }
  129. return buf, nil
  130. }
  131. func decodeStringValue(r io.Reader) (string, error) {
  132. v, err := decodeBytesValue(r)
  133. return string(v), err
  134. }
  135. // Value represents the abstract header value.
  136. type Value interface {
  137. Get() interface{}
  138. String() string
  139. valueType() valueType
  140. encode(io.Writer) error
  141. }
  142. // An BoolValue provides eventstream encoding, and representation
  143. // of a Go bool value.
  144. type BoolValue bool
  145. // Get returns the underlying type
  146. func (v BoolValue) Get() interface{} {
  147. return bool(v)
  148. }
  149. // valueType returns the EventStream header value type value.
  150. func (v BoolValue) valueType() valueType {
  151. if v {
  152. return trueValueType
  153. }
  154. return falseValueType
  155. }
  156. func (v BoolValue) String() string {
  157. return strconv.FormatBool(bool(v))
  158. }
  159. // encode encodes the BoolValue into an eventstream binary value
  160. // representation.
  161. func (v BoolValue) encode(w io.Writer) error {
  162. return binary.Write(w, binary.BigEndian, v.valueType())
  163. }
  164. // An Int8Value provides eventstream encoding, and representation of a Go
  165. // int8 value.
  166. type Int8Value int8
  167. // Get returns the underlying value.
  168. func (v Int8Value) Get() interface{} {
  169. return int8(v)
  170. }
  171. // valueType returns the EventStream header value type value.
  172. func (Int8Value) valueType() valueType {
  173. return int8ValueType
  174. }
  175. func (v Int8Value) String() string {
  176. return fmt.Sprintf("0x%02x", int8(v))
  177. }
  178. // encode encodes the Int8Value into an eventstream binary value
  179. // representation.
  180. func (v Int8Value) encode(w io.Writer) error {
  181. raw := rawValue{
  182. Type: v.valueType(),
  183. }
  184. return raw.encodeScalar(w, v)
  185. }
  186. func (v *Int8Value) decode(r io.Reader) error {
  187. n, err := decodeUint8(r)
  188. if err != nil {
  189. return err
  190. }
  191. *v = Int8Value(n)
  192. return nil
  193. }
  194. // An Int16Value provides eventstream encoding, and representation of a Go
  195. // int16 value.
  196. type Int16Value int16
  197. // Get returns the underlying value.
  198. func (v Int16Value) Get() interface{} {
  199. return int16(v)
  200. }
  201. // valueType returns the EventStream header value type value.
  202. func (Int16Value) valueType() valueType {
  203. return int16ValueType
  204. }
  205. func (v Int16Value) String() string {
  206. return fmt.Sprintf("0x%04x", int16(v))
  207. }
  208. // encode encodes the Int16Value into an eventstream binary value
  209. // representation.
  210. func (v Int16Value) encode(w io.Writer) error {
  211. raw := rawValue{
  212. Type: v.valueType(),
  213. }
  214. return raw.encodeScalar(w, v)
  215. }
  216. func (v *Int16Value) decode(r io.Reader) error {
  217. n, err := decodeUint16(r)
  218. if err != nil {
  219. return err
  220. }
  221. *v = Int16Value(n)
  222. return nil
  223. }
  224. // An Int32Value provides eventstream encoding, and representation of a Go
  225. // int32 value.
  226. type Int32Value int32
  227. // Get returns the underlying value.
  228. func (v Int32Value) Get() interface{} {
  229. return int32(v)
  230. }
  231. // valueType returns the EventStream header value type value.
  232. func (Int32Value) valueType() valueType {
  233. return int32ValueType
  234. }
  235. func (v Int32Value) String() string {
  236. return fmt.Sprintf("0x%08x", int32(v))
  237. }
  238. // encode encodes the Int32Value into an eventstream binary value
  239. // representation.
  240. func (v Int32Value) encode(w io.Writer) error {
  241. raw := rawValue{
  242. Type: v.valueType(),
  243. }
  244. return raw.encodeScalar(w, v)
  245. }
  246. func (v *Int32Value) decode(r io.Reader) error {
  247. n, err := decodeUint32(r)
  248. if err != nil {
  249. return err
  250. }
  251. *v = Int32Value(n)
  252. return nil
  253. }
  254. // An Int64Value provides eventstream encoding, and representation of a Go
  255. // int64 value.
  256. type Int64Value int64
  257. // Get returns the underlying value.
  258. func (v Int64Value) Get() interface{} {
  259. return int64(v)
  260. }
  261. // valueType returns the EventStream header value type value.
  262. func (Int64Value) valueType() valueType {
  263. return int64ValueType
  264. }
  265. func (v Int64Value) String() string {
  266. return fmt.Sprintf("0x%016x", int64(v))
  267. }
  268. // encode encodes the Int64Value into an eventstream binary value
  269. // representation.
  270. func (v Int64Value) encode(w io.Writer) error {
  271. raw := rawValue{
  272. Type: v.valueType(),
  273. }
  274. return raw.encodeScalar(w, v)
  275. }
  276. func (v *Int64Value) decode(r io.Reader) error {
  277. n, err := decodeUint64(r)
  278. if err != nil {
  279. return err
  280. }
  281. *v = Int64Value(n)
  282. return nil
  283. }
  284. // An BytesValue provides eventstream encoding, and representation of a Go
  285. // byte slice.
  286. type BytesValue []byte
  287. // Get returns the underlying value.
  288. func (v BytesValue) Get() interface{} {
  289. return []byte(v)
  290. }
  291. // valueType returns the EventStream header value type value.
  292. func (BytesValue) valueType() valueType {
  293. return bytesValueType
  294. }
  295. func (v BytesValue) String() string {
  296. return base64.StdEncoding.EncodeToString([]byte(v))
  297. }
  298. // encode encodes the BytesValue into an eventstream binary value
  299. // representation.
  300. func (v BytesValue) encode(w io.Writer) error {
  301. raw := rawValue{
  302. Type: v.valueType(),
  303. }
  304. return raw.encodeBytes(w, []byte(v))
  305. }
  306. func (v *BytesValue) decode(r io.Reader) error {
  307. buf, err := decodeBytesValue(r)
  308. if err != nil {
  309. return err
  310. }
  311. *v = BytesValue(buf)
  312. return nil
  313. }
  314. // An StringValue provides eventstream encoding, and representation of a Go
  315. // string.
  316. type StringValue string
  317. // Get returns the underlying value.
  318. func (v StringValue) Get() interface{} {
  319. return string(v)
  320. }
  321. // valueType returns the EventStream header value type value.
  322. func (StringValue) valueType() valueType {
  323. return stringValueType
  324. }
  325. func (v StringValue) String() string {
  326. return string(v)
  327. }
  328. // encode encodes the StringValue into an eventstream binary value
  329. // representation.
  330. func (v StringValue) encode(w io.Writer) error {
  331. raw := rawValue{
  332. Type: v.valueType(),
  333. }
  334. return raw.encodeString(w, string(v))
  335. }
  336. func (v *StringValue) decode(r io.Reader) error {
  337. s, err := decodeStringValue(r)
  338. if err != nil {
  339. return err
  340. }
  341. *v = StringValue(s)
  342. return nil
  343. }
  344. // An TimestampValue provides eventstream encoding, and representation of a Go
  345. // timestamp.
  346. type TimestampValue time.Time
  347. // Get returns the underlying value.
  348. func (v TimestampValue) Get() interface{} {
  349. return time.Time(v)
  350. }
  351. // valueType returns the EventStream header value type value.
  352. func (TimestampValue) valueType() valueType {
  353. return timestampValueType
  354. }
  355. func (v TimestampValue) epochMilli() int64 {
  356. nano := time.Time(v).UnixNano()
  357. msec := nano / int64(time.Millisecond)
  358. return msec
  359. }
  360. func (v TimestampValue) String() string {
  361. msec := v.epochMilli()
  362. return strconv.FormatInt(msec, 10)
  363. }
  364. // encode encodes the TimestampValue into an eventstream binary value
  365. // representation.
  366. func (v TimestampValue) encode(w io.Writer) error {
  367. raw := rawValue{
  368. Type: v.valueType(),
  369. }
  370. msec := v.epochMilli()
  371. return raw.encodeScalar(w, msec)
  372. }
  373. func (v *TimestampValue) decode(r io.Reader) error {
  374. n, err := decodeUint64(r)
  375. if err != nil {
  376. return err
  377. }
  378. *v = TimestampValue(timeFromEpochMilli(int64(n)))
  379. return nil
  380. }
  // timeFromEpochMilli converts t, a count of milliseconds since the Unix
  // epoch, into the corresponding time.Time in UTC.
  func timeFromEpochMilli(t int64) time.Time {
  	const msecPerSec = 1e3

  	whole, rem := t/msecPerSec, t%msecPerSec
  	nsec := rem * int64(time.Millisecond)
  	return time.Unix(whole, nsec).UTC()
  }
  386. // An UUIDValue provides eventstream encoding, and representation of a UUID
  387. // value.
  388. type UUIDValue [16]byte
  389. // Get returns the underlying value.
  390. func (v UUIDValue) Get() interface{} {
  391. return v[:]
  392. }
  393. // valueType returns the EventStream header value type value.
  394. func (UUIDValue) valueType() valueType {
  395. return uuidValueType
  396. }
  397. func (v UUIDValue) String() string {
  398. return fmt.Sprintf(`%X-%X-%X-%X-%X`, v[0:4], v[4:6], v[6:8], v[8:10], v[10:])
  399. }
  400. // encode encodes the UUIDValue into an eventstream binary value
  401. // representation.
  402. func (v UUIDValue) encode(w io.Writer) error {
  403. raw := rawValue{
  404. Type: v.valueType(),
  405. }
  406. return raw.encodeFixedSlice(w, v[:])
  407. }
  408. func (v *UUIDValue) decode(r io.Reader) error {
  409. tv := (*v)[:]
  410. return decodeFixedBytesValue(r, tv)
  411. }