[role="xpack"]
[[transform-checkpoints]]
= How {transform} checkpoints work
++++
<titleabbrev>How checkpoints work</titleabbrev>
++++

Each time a {transform} examines the source indices and creates or updates the
destination index, it generates a _checkpoint_.

If your {transform} runs only once, there is logically only one checkpoint. If
your {transform} runs continuously, however, it creates checkpoints as it
ingests and transforms new source data. The `sync` property of the {transform}
configures checkpointing by specifying a time field.
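
For example, the following sketch creates a {ctransform} that synchronizes on a
`timestamp` field. The {transform}, index, and field names are illustrative; the
request also sets the `frequency` property, which controls how often the
{transform} checks for changes (described below).

[source,console]
----------------------------------
PUT _transform/example-transform
{
  "source": { "index": "my-source-index" },
  "dest": { "index": "my-dest-index" },
  "frequency": "5m",
  "sync": {
    "time": {
      "field": "timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "host": { "terms": { "field": "host.name" } }
    },
    "aggregations": {
      "avg_response": { "avg": { "field": "response_time" } }
    }
  }
}
----------------------------------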

To create a checkpoint, the {ctransform}:

. Checks for changes to source indices.
+
Using a simple periodic timer, the {transform} checks for changes to the source
indices. This check is done based on the interval defined in the {transform}'s
`frequency` property.
+
If the source indices remain unchanged or if a checkpoint is already in
progress, it waits for the next timer.
+
If changes are found, a checkpoint is created.

. Identifies which entities and/or time buckets have changed.
+
The {transform} searches to see which entities or time buckets have changed
between the last and the new checkpoint. The {transform} uses the values to
synchronize the source and destination indices with fewer operations than a
full re-run.

. Updates the destination index (the {dataframe}) with the changes.
+
--
The {transform} applies changes related to either new or changed entities or
time buckets to the destination index. The set of changes can be paginated. The
{transform} performs a composite aggregation similarly to the batch {transform}
operation; however, it also injects query filters based on the previous step to
reduce the amount of work. After all changes have been applied, the checkpoint
is complete.
--

This checkpoint process involves both search and indexing activity on the
cluster. We have attempted to favor control over performance while developing
{transforms}. We decided it was preferable for the {transform} to take longer to
complete, rather than to finish quickly and take precedence in resource
consumption. That being said, the cluster still requires enough resources to
support both the composite aggregation search and the indexing of its results.

TIP: If the cluster experiences unsuitable performance degradation due to the
{transform}, stop the {transform} and refer to <<transform-performance>>.
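
For example, assuming the hypothetical {transform} name used earlier, you can
stop it with the stop {transform} API:

[source,console]
----------------------------------
POST _transform/example-transform/_stop
----------------------------------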

[discrete]
[[sync-field-ingest-timestamp]]
== Using the ingest timestamp for syncing the {transform}

In most cases, it is strongly recommended to use the ingest timestamp of the
source indices for syncing the {transform}. This is the optimal way for
{transforms} to be able to identify new changes. If your data source follows the
{ecs-ref}/ecs-reference.html[ECS standard], you might already have an
{ecs-ref}/ecs-event.html#field-event-ingested[`event.ingested`] field. In this
case, use `event.ingested` as the `sync`.`time`.`field` property of your
{transform}.

If you don't have an `event.ingested` field or it isn't populated, you can set
it by using an ingest pipeline. Create an ingest pipeline either using the
<<put-pipeline-api, ingest pipeline API>> (like the example below) or via {kib}
under **Stack Management > Ingest Pipelines**. Use a
<<set-processor,`set` processor>> to set the field and associate it with the
value of the ingest timestamp.

[source,console]
----------------------------------
PUT _ingest/pipeline/set_ingest_time
{
  "description": "Set ingest timestamp.",
  "processors": [
    {
      "set": {
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}
----------------------------------

After you create the ingest pipeline, apply it to the source indices of your
{transform}. The pipeline adds the field `event.ingested` to every document with
the value of the ingest timestamp. Configure the `sync`.`time`.`field` property
of your {transform} to use the field by using the
<<put-transform,create {transform} API>> for new {transforms} or the
<<update-transform, update {transform} API>> for existing {transforms}. The
`event.ingested` field is then used for syncing the {transform}.
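
As a sketch, assuming the hypothetical index and {transform} names from the
earlier examples, you could set the pipeline as the index default and point the
{transform} at the new field:

[source,console]
----------------------------------
PUT my-source-index/_settings
{
  "index.default_pipeline": "set_ingest_time"
}

POST _transform/example-transform/_update
{
  "sync": {
    "time": {
      "field": "event.ingested"
    }
  }
}
----------------------------------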

Refer to <<add-pipeline-to-indexing-request>> and <<ingest>> to learn more about
how to use an ingest pipeline.

[discrete]
[[ml-transform-checkpoint-heuristics]]
== Change detection heuristics

When the {transform} runs in continuous mode, it updates the documents in the
destination index as new data comes in. The {transform} uses a set of heuristics
called change detection to update the destination index with fewer operations.

Suppose, for example, that the data is grouped by host names. Change detection
identifies which host names have changed, for example, hosts `A`, `C`, and `G`,
and only updates documents for those hosts; it does not update documents that
store information about host `B`, `D`, or any other host that has not changed.

Another heuristic can be applied for time buckets when a `date_histogram` is
used to group by time buckets. Change detection detects which time buckets have
changed and only updates those.
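
For example, a `pivot` that groups by both a host name and a `date_histogram`
(the names below are again illustrative) enables both heuristics, so only
changed hosts and changed time buckets are rewritten:

[source,console]
----------------------------------
PUT _transform/hosts-by-hour
{
  "source": { "index": "my-source-index" },
  "dest": { "index": "my-dest-index" },
  "frequency": "5m",
  "sync": { "time": { "field": "event.ingested" } },
  "pivot": {
    "group_by": {
      "host": { "terms": { "field": "host.name" } },
      "hour": {
        "date_histogram": {
          "field": "@timestamp",
          "fixed_interval": "1h"
        }
      }
    },
    "aggregations": {
      "avg_response": { "avg": { "field": "response_time" } }
    }
  }
}
----------------------------------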

[discrete]
[[ml-transform-checkpoint-errors]]
== Error handling

Failures in {transforms} tend to be related to searching or indexing.
To increase the resiliency of {transforms}, the cursor positions of
the aggregated search and the changed entities search are tracked in memory and
persisted periodically.

Checkpoint failures can be categorized as follows:

* Temporary failures: The checkpoint is retried. If 10 consecutive failures
occur, the {transform} has a failed status. For example, this situation might
occur when there are shard failures and queries return only partial results.

* Irrecoverable failures: The {transform} immediately fails. For example, this
situation occurs when the source index is not found.

* Adjustment failures: The {transform} retries with adjusted settings. For
example, if parent circuit breaker memory errors occur during the composite
aggregation, the {transform} receives partial results. The aggregated search is
retried with a smaller number of buckets. This retry is performed at the
interval defined in the `frequency` property for the {transform}. If the search
is retried to the point where it reaches a minimal number of buckets, an
irrecoverable failure occurs.

If the node running the {transforms} fails, the {transform} restarts from the
most recent persisted cursor position. This recovery process might repeat some
of the work the {transform} had already done, but it ensures data consistency.
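
To see whether a {transform} has failed and why, you can check its state with
the get {transform} statistics API (using the hypothetical name from the
sketches above); a failed {transform} reports a `failed` state together with a
`reason`:

[source,console]
----------------------------------
GET _transform/example-transform/_stats
----------------------------------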
|