|
|
@@ -341,10 +341,22 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
|
|
|
delegate.onFailure(e);
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
+ /*
|
|
|
+ * When creating the downsample index, we copy the index.number_of_shards from source index,
|
|
|
+ * and we set the index.number_of_replicas to 0, to avoid replicating the index being built.
|
|
|
+ * Also, we set the index.refresh_interval to -1.
|
|
|
+ * We will set the correct number of replicas and refresh the index later.
|
|
|
+ *
|
|
|
+ * We should note that there is a risk of losing a node during the downsample process. In this
|
|
|
+ * case downsample will fail.
|
|
|
+ */
|
|
|
+ int minNumReplicas = clusterService.getSettings().getAsInt(Downsample.DOWNSAMPLE_MIN_NUMBER_OF_REPLICAS_NAME, 0);
|
|
|
+
|
|
|
// 3. Create downsample index
|
|
|
createDownsampleIndex(
|
|
|
- clusterService.getSettings(),
|
|
|
downsampleIndexName,
|
|
|
+ minNumReplicas,
|
|
|
sourceIndexMetadata,
|
|
|
mapping,
|
|
|
request,
|
|
|
@@ -353,6 +365,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
|
|
|
performShardDownsampling(
|
|
|
request,
|
|
|
delegate,
|
|
|
+ minNumReplicas,
|
|
|
sourceIndexMetadata,
|
|
|
downsampleIndexName,
|
|
|
parentTask,
|
|
|
@@ -382,6 +395,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
|
|
|
performShardDownsampling(
|
|
|
request,
|
|
|
delegate,
|
|
|
+ minNumReplicas,
|
|
|
sourceIndexMetadata,
|
|
|
downsampleIndexName,
|
|
|
parentTask,
|
|
|
@@ -451,6 +465,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
|
|
|
private void performShardDownsampling(
|
|
|
DownsampleAction.Request request,
|
|
|
ActionListener<AcknowledgedResponse> listener,
|
|
|
+ int minNumReplicas,
|
|
|
IndexMetadata sourceIndexMetadata,
|
|
|
String downsampleIndexName,
|
|
|
TaskId parentTask,
|
|
|
@@ -509,7 +524,15 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
|
|
|
logger.info("Downsampling task [" + persistentTaskId + " completed for shard " + params.shardId());
|
|
|
if (countDown.decrementAndGet() == 0) {
|
|
|
logger.info("All downsampling tasks completed [" + numberOfShards + "]");
|
|
|
- updateTargetIndexSettingStep(request, listener, sourceIndexMetadata, downsampleIndexName, parentTask, startTime);
|
|
|
+ updateTargetIndexSettingStep(
|
|
|
+ request,
|
|
|
+ listener,
|
|
|
+ minNumReplicas,
|
|
|
+ sourceIndexMetadata,
|
|
|
+ downsampleIndexName,
|
|
|
+ parentTask,
|
|
|
+ startTime
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -556,6 +579,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
|
|
|
private void updateTargetIndexSettingStep(
|
|
|
final DownsampleAction.Request request,
|
|
|
final ActionListener<AcknowledgedResponse> listener,
|
|
|
+ int minNumReplicas,
|
|
|
final IndexMetadata sourceIndexMetadata,
|
|
|
final String downsampleIndexName,
|
|
|
final TaskId parentTask,
|
|
|
@@ -564,7 +588,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
|
|
|
// 4. Make downsample index read-only and set the correct number of replicas
|
|
|
final Settings.Builder settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true);
|
|
|
// Number of replicas had been previously set to 0 to speed up index population
|
|
|
- if (sourceIndexMetadata.getNumberOfReplicas() > 0) {
|
|
|
+ if (sourceIndexMetadata.getNumberOfReplicas() > 0 && minNumReplicas == 0) {
|
|
|
settings.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, sourceIndexMetadata.getNumberOfReplicas());
|
|
|
}
|
|
|
// Setting index.hidden has been initially set to true. We revert this to the value of the
|
|
|
@@ -842,28 +866,18 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
|
|
|
}
|
|
|
|
|
|
private void createDownsampleIndex(
|
|
|
- Settings settings,
|
|
|
String downsampleIndexName,
|
|
|
+ int minNumReplicas,
|
|
|
IndexMetadata sourceIndexMetadata,
|
|
|
String mapping,
|
|
|
DownsampleAction.Request request,
|
|
|
ActionListener<AcknowledgedResponse> listener
|
|
|
) {
|
|
|
- /*
|
|
|
- * When creating the downsample index, we copy the index.number_of_shards from source index,
|
|
|
- * and we set the index.number_of_replicas to 0, to avoid replicating the index being built.
|
|
|
- * Also, we set the index.refresh_interval to -1.
|
|
|
- * We will set the correct number of replicas and refresh the index later.
|
|
|
- *
|
|
|
- * We should note that there is a risk of losing a node during the downsample process. In this
|
|
|
- * case downsample will fail.
|
|
|
- */
|
|
|
- int numberOfReplicas = settings.getAsInt(Downsample.DOWNSAMPLE_MIN_NUMBER_OF_REPLICAS_NAME, 0);
|
|
|
var downsampleInterval = request.getDownsampleConfig().getInterval().toString();
|
|
|
Settings.Builder builder = Settings.builder()
|
|
|
.put(IndexMetadata.SETTING_INDEX_HIDDEN, true)
|
|
|
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, sourceIndexMetadata.getNumberOfShards())
|
|
|
- .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, String.valueOf(numberOfReplicas))
|
|
|
+ .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, minNumReplicas)
|
|
|
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1")
|
|
|
.put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey(), DownsampleTaskStatus.STARTED)
|
|
|
.put(IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL.getKey(), downsampleInterval)
|