[role="xpack"]
[[transform-checkpoints]]
= How {transform} checkpoints work
++++
<titleabbrev>How checkpoints work</titleabbrev>
++++

Each time a {transform} examines the source indices and creates or updates the
destination index, it generates a _checkpoint_.

If your {transform} runs only once, there is logically only one checkpoint. If
your {transform} runs continuously, however, it creates checkpoints as it
ingests and transforms new source data. The `sync` property of the {transform}
configures checkpointing by specifying a time field.
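
For example, the following <<put-transform,create {transform} API>> request is
a minimal sketch that shows where the `sync` property fits in a continuous
{transform} configuration. The {transform} ID, index names, fields, and the
`delay` and `frequency` values are hypothetical placeholders:

[source,console]
----------------------------------
PUT _transform/example-continuous-transform
{
  "source": {
    "index": "my-source-index"
  },
  "dest": {
    "index": "my-dest-index"
  },
  "pivot": {
    "group_by": {
      "host": { "terms": { "field": "host.name" } }
    },
    "aggregations": {
      "avg_response": { "avg": { "field": "response_time" } }
    }
  },
  "sync": {
    "time": {
      "field": "event.ingested",
      "delay": "60s"
    }
  },
  "frequency": "1m"
}
----------------------------------

With this configuration, the {transform} checks the source indices every minute
and, to allow for ingest latency, only processes documents whose
`event.ingested` value is at least 60 seconds old.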

To create a checkpoint, the {ctransform}:

. Checks for changes to source indices.
+
Using a simple periodic timer, the {transform} checks for changes to the source
indices. This check is done based on the interval defined in the transform's
`frequency` property.
+
If the source indices remain unchanged or if a checkpoint is already in
progress, it waits for the next timer.
+
If changes are found, a checkpoint is created.

. Identifies which entities and/or time buckets have changed.
+
The {transform} searches to see which entities or time buckets have changed
between the last and the new checkpoint. The {transform} uses the values to
synchronize the source and destination indices with fewer operations than a
full re-run.

. Updates the destination index (the {dataframe}) with the changes.
+
--
The {transform} applies changes related to either new or changed entities or
time buckets to the destination index. The set of changes can be paginated. The
{transform} performs a composite aggregation similarly to the batch {transform}
operation; however, it also injects query filters based on the previous step to
reduce the amount of work. A sketch of such a filtered search follows this
list. After all changes have been applied, the checkpoint is complete.
--
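
The following is a hedged sketch of the kind of filtered search issued in this
step, not the literal query the {transform} generates. It assumes the
hypothetical configuration from the earlier example: the `range` filter bounds
the checkpoint time window on the `sync` field, the `terms` filter restricts
the search to changed entities, and the composite aggregation paginates the
results:

[source,console]
----------------------------------
GET my-source-index/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "event.ingested": {
              "gte": "2024-05-01T10:00:00Z",
              "lt": "2024-05-01T10:01:00Z"
            }
          }
        },
        {
          "terms": {
            "host.name": [ "A", "C", "G" ]
          }
        }
      ]
    }
  },
  "aggregations": {
    "pivot": {
      "composite": {
        "size": 500,
        "sources": [
          { "host": { "terms": { "field": "host.name" } } }
        ]
      }
    }
  }
}
----------------------------------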

This checkpoint process involves both search and indexing activity on the
cluster. We have attempted to favor control over performance while developing
{transforms}. We decided it was preferable for the {transform} to take longer to
complete, rather than to finish quickly and take precedence in resource
consumption. That being said, the cluster still requires enough resources to
support both the composite aggregation search and the indexing of its results.

TIP: If the cluster experiences unsuitable performance degradation due to the
{transform}, stop the {transform} and refer to <<transform-performance>>.

[discrete]
[[sync-field-ingest-timestamp]]
== Using the ingest timestamp for syncing the {transform}

In most cases, it is strongly recommended to use the ingest timestamp of the
source indices for syncing the {transform}. This is the optimal way for
{transforms} to identify new changes. If your data source follows the
{ecs-ref}/ecs-reference.html[ECS standard], you might already have an
{ecs-ref}/ecs-event.html#field-event-ingested[`event.ingested`] field. In this
case, use `event.ingested` as the `sync`.`time`.`field` property of your
{transform}.

If you don't have an `event.ingested` field or it isn't populated, you can set
it by using an ingest pipeline. Create an ingest pipeline either using the
<<put-pipeline-api, ingest pipeline API>> (like the example below) or via {kib}
under **Stack Management > Ingest Pipelines**. Use a
<<set-processor,`set` processor>> to set the field and associate it with the
value of the ingest timestamp.

[source,console]
----------------------------------
PUT _ingest/pipeline/set_ingest_time
{
  "description": "Set ingest timestamp.",
  "processors": [
    {
      "set": {
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}
----------------------------------

After you create the ingest pipeline, apply it to the source indices of your
{transform}. The pipeline adds the field `event.ingested` to every document
with the value of the ingest timestamp. Configure the `sync`.`time`.`field`
property of your {transform} to use the field by using the
<<put-transform,create {transform} API>> for new {transforms} or the
<<update-transform, update {transform} API>> for existing {transforms}. The
`event.ingested` field is used for syncing the {transform}.
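
For example, assuming the hypothetical index and {transform} IDs below, you can
apply the pipeline as the index default and point an existing {transform} at
the new field:

[source,console]
----------------------------------
PUT my-source-index/_settings
{
  "index.default_pipeline": "set_ingest_time"
}

POST _transform/my-transform/_update
{
  "sync": {
    "time": {
      "field": "event.ingested",
      "delay": "60s"
    }
  }
}
----------------------------------

Setting `index.default_pipeline` runs the pipeline for every indexing request
that does not explicitly specify another pipeline.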

Refer to <<add-pipeline-to-indexing-request>> and <<ingest>> to learn more about
how to use an ingest pipeline.

[discrete]
[[ml-transform-checkpoint-heuristics]]
== Change detection heuristics

When the {transform} runs in continuous mode, it updates the documents in the
destination index as new data comes in. The {transform} uses a set of
heuristics called change detection to update the destination index with fewer
operations.

For example, suppose the data is grouped by host names. Change detection
detects which host names have changed, for example, hosts `A`, `C`, and `G`,
and only updates documents for those hosts; it does not update documents that
store information about host `B`, host `D`, or any other host that has not
changed.

Another heuristic can be applied for time buckets when a `date_histogram` is
used to group by time buckets. Change detection detects which time buckets
have changed and only updates those.
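
For example, a group-by configuration like the following hypothetical one
(shown here with the preview {transform} API) buckets documents into hourly
time buckets as well as by host name, so change detection can limit updates to
the time buckets and hosts that actually received new data:

[source,console]
----------------------------------
POST _transform/_preview
{
  "source": {
    "index": "my-source-index"
  },
  "pivot": {
    "group_by": {
      "hour": {
        "date_histogram": {
          "field": "@timestamp",
          "calendar_interval": "1h"
        }
      },
      "host": { "terms": { "field": "host.name" } }
    },
    "aggregations": {
      "max_response": { "max": { "field": "response_time" } }
    }
  }
}
----------------------------------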

[discrete]
[[ml-transform-checkpoint-errors]]
== Error handling

Failures in {transforms} tend to be related to searching or indexing. To
increase the resiliency of {transforms}, the cursor positions of the aggregated
search and the changed entities search are tracked in memory and persisted
periodically.

Checkpoint failures can be categorized as follows:

* Temporary failures: The checkpoint is retried. If 10 consecutive failures
occur, the {transform} has a failed status. For example, this situation might
occur when there are shard failures and queries return only partial results.
* Irrecoverable failures: The {transform} immediately fails. For example, this
situation occurs when the source index is not found.
* Adjustment failures: The {transform} retries with adjusted settings. For
example, if parent circuit breaker memory errors occur during the composite
aggregation, the {transform} receives partial results. The aggregated search is
retried with a smaller number of buckets. This retry is performed at the
interval defined in the `frequency` property for the {transform}. If the search
is retried to the point where it reaches a minimal number of buckets, an
irrecoverable failure occurs.

If the node running the {transforms} fails, the {transform} restarts from the
most recent persisted cursor position. This recovery process might repeat some
of the work the {transform} had already done, but it ensures data consistency.
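
To see which category of failure occurred, check the {transform} stats, for
example (with a hypothetical {transform} ID):

[source,console]
----------------------------------
GET _transform/my-transform/_stats
----------------------------------

A failed {transform} reports `failed` in the `state` field of the response,
along with a `reason` that describes the last error.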