1
0
Эх сурвалжийг харах

Make Timestamps Returned by Snapshot APIs Consistent (#43148)

* We don't have to calculate the start and end times form the shards for the status API, we have the start time available from the CS or the `SnapshotInfo` in the repo and can either take the end time form the `SnapshotInfo` or
take the most recent time from the shard stats for in progress snapshots
* Closes #43074
Armin Braun 6 жил өмнө
parent
commit
deafb5923a

+ 1 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexStatus.java

@@ -58,7 +58,7 @@ public class SnapshotIndexStatus implements Iterable<SnapshotIndexShardStatus>,
         stats = new SnapshotStats();
         for (SnapshotIndexShardStatus shard : shards) {
             indexShards.put(shard.getShardId().getId(), shard);
-            stats.add(shard.getStats());
+            stats.add(shard.getStats(), true);
         }
         shardsStats = new SnapshotShardsStats(shards);
         this.indexShards = unmodifiableMap(indexShards);

+ 7 - 2
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java

@@ -296,7 +296,12 @@ public class SnapshotStats implements Streamable, ToXContentObject {
             processedSize);
     }
 
-    void add(SnapshotStats stats) {
+    /**
+     * Add stats instance to the total
+     * @param stats Stats instance to add
+     * @param updateTimestamps Whether or not start time and duration should be updated
+     */
+    void add(SnapshotStats stats, boolean updateTimestamps) {
         incrementalFileCount += stats.incrementalFileCount;
         totalFileCount += stats.totalFileCount;
         processedFileCount += stats.processedFileCount;
@@ -309,7 +314,7 @@ public class SnapshotStats implements Streamable, ToXContentObject {
             // First time here
             startTime = stats.startTime;
             time = stats.time;
-        } else {
+        } else if (updateTimestamps) {
             // The time the last snapshot ends
             long endTime = Math.max(startTime + time, stats.startTime + stats.time);
 

+ 23 - 8
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

@@ -19,11 +19,12 @@
 
 package org.elasticsearch.action.admin.cluster.snapshots.status;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress.State;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
@@ -71,14 +72,14 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
     @Nullable
     private Boolean includeGlobalState;
 
-    SnapshotStatus(final Snapshot snapshot, final State state, final List<SnapshotIndexShardStatus> shards,
-                   final Boolean includeGlobalState) {
+    SnapshotStatus(Snapshot snapshot, State state, List<SnapshotIndexShardStatus> shards, Boolean includeGlobalState,
+                   long startTime, long time) {
         this.snapshot = Objects.requireNonNull(snapshot);
         this.state = Objects.requireNonNull(state);
         this.shards = Objects.requireNonNull(shards);
         this.includeGlobalState = includeGlobalState;
         shardsStats = new SnapshotShardsStats(shards);
-        updateShardStats();
+        updateShardStats(startTime, time);
     }
 
     private SnapshotStatus(Snapshot snapshot, State state, List<SnapshotIndexShardStatus> shards,
@@ -169,7 +170,16 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
         }
         shards = Collections.unmodifiableList(builder);
         includeGlobalState = in.readOptionalBoolean();
-        updateShardStats();
+        final long startTime;
+        final long time;
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            startTime = in.readLong();
+            time = in.readLong();
+        } else {
+            startTime = 0L;
+            time = 0L;
+        }
+        updateShardStats(startTime, time);
     }
 
     @Override
@@ -181,6 +191,10 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
             shard.writeTo(out);
         }
         out.writeOptionalBoolean(includeGlobalState);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeLong(stats.getStartTime());
+            out.writeLong(stats.getTime());
+        }
     }
 
     /**
@@ -281,11 +295,12 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
         return PARSER.parse(parser, null);
     }
 
-    private void updateShardStats() {
-        stats = new SnapshotStats();
+    private void updateShardStats(long startTime, long time) {
+        stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, 0, 0);
         shardsStats = new SnapshotShardsStats(shards);
         for (SnapshotIndexShardStatus shard : shards) {
-            stats.add(shard.getStats());
+            // BWC: only update timestamps when we did not get a start time from an old node
+            stats.add(shard.getStats(), startTime == 0L);
         }
     }
 

+ 5 - 2
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

@@ -187,7 +187,8 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
                     shardStatusBuilder.add(shardStatus);
                 }
                 builder.add(new SnapshotStatus(entry.snapshot(), entry.state(),
-                    Collections.unmodifiableList(shardStatusBuilder), entry.includeGlobalState()));
+                    Collections.unmodifiableList(shardStatusBuilder), entry.includeGlobalState(), entry.startTime(),
+                    Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L)));
             }
         }
         // Now add snapshots on disk that are not currently running
@@ -240,8 +241,10 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
                         default:
                             throw new IllegalArgumentException("Unknown snapshot state " + snapshotInfo.state());
                     }
+                    final long startTime = snapshotInfo.startTime();
                     builder.add(new SnapshotStatus(new Snapshot(repositoryName, snapshotId), state,
-                        Collections.unmodifiableList(shardStatusBuilder), snapshotInfo.includeGlobalState()));
+                        Collections.unmodifiableList(shardStatusBuilder), snapshotInfo.includeGlobalState(),
+                        startTime, snapshotInfo.endTime() - startTime));
                 }
             }
         }

+ 5 - 5
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -561,7 +561,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                                          final Map<String, Object> userMetadata) {
         SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
             indices.stream().map(IndexId::getName).collect(Collectors.toList()),
-            startTime, failure, System.currentTimeMillis(), totalShards, shardFailures,
+            startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
             includeGlobalState, userMetadata);
         try {
             final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
@@ -853,7 +853,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
                               IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
         final ShardId shardId = store.shardId();
-        final long startTime = threadPool.relativeTimeInMillis();
+        final long startTime = threadPool.absoluteTimeInMillis();
         try {
             logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name());
 
@@ -953,7 +953,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 lastSnapshotStatus.getStartTime(),
                 // snapshotStatus.startTime() is assigned on the same machine,
                 // so it's safe to use the relative time in millis
-                threadPool.relativeTimeInMillis() - lastSnapshotStatus.getStartTime(),
+                threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
                 lastSnapshotStatus.getIncrementalFileCount(),
                 lastSnapshotStatus.getIncrementalSize()
             );
@@ -976,9 +976,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
             finalizeShard(newSnapshotsList, fileListGeneration, blobs, "snapshot creation [" + snapshotId + "]", shardContainer,
                 shardId, snapshotId);
-            snapshotStatus.moveToDone(threadPool.relativeTimeInMillis());
+            snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis());
         } catch (Exception e) {
-            snapshotStatus.moveToFailed(threadPool.relativeTimeInMillis(), ExceptionsHelper.detailedMessage(e));
+            snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e));
             if (e instanceof IndexShardSnapshotFailedException) {
                 throw (IndexShardSnapshotFailedException) e;
             } else {

+ 2 - 2
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -285,7 +285,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                                                                 request.partial(),
                                                                 State.INIT,
                                                                 snapshotIndices,
-                                                                System.currentTimeMillis(),
+                                                                threadPool.absoluteTimeInMillis(),
                                                                 repositoryData.getGenId(),
                                                                 null,
                                                                 request.userMetadata());
@@ -1169,7 +1169,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                     // add the snapshot deletion to the cluster state
                     SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry(
                         snapshot,
-                        System.currentTimeMillis(),
+                        threadPool.absoluteTimeInMillis(),
                         repositoryStateId
                     );
                     if (deletionsInProgress != null) {

+ 2 - 2
server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java

@@ -50,7 +50,7 @@ public class SnapshotStatusTests extends AbstractXContentTestCase<SnapshotStatus
         List<SnapshotIndexShardStatus> snapshotIndexShardStatuses = new ArrayList<>();
         snapshotIndexShardStatuses.add(snapshotIndexShardStatus);
         boolean includeGlobalState = randomBoolean();
-        SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState);
+        SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L);
 
         int initializingShards = 0;
         int startedShards = 0;
@@ -166,7 +166,7 @@ public class SnapshotStatusTests extends AbstractXContentTestCase<SnapshotStatus
             snapshotIndexShardStatuses.add(snapshotIndexShardStatus);
         }
         boolean includeGlobalState = randomBoolean();
-        return new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState);
+        return new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L);
     }
 
     @Override

+ 75 - 0
server/src/test/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java

@@ -0,0 +1,75 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.snapshots;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
+
+import java.util.List;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+
+public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
+
+    public void testStatusApiConsistency() {
+        Client client = client();
+
+        logger.info("-->  creating repository");
+        assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("fs").setSettings(
+            Settings.builder().put("location", randomRepoPath()).build()));
+
+        createIndex("test-idx-1", "test-idx-2", "test-idx-3");
+        ensureGreen();
+
+        logger.info("--> indexing some data");
+        for (int i = 0; i < 100; i++) {
+            index("test-idx-1", "_doc", Integer.toString(i), "foo", "bar" + i);
+            index("test-idx-2", "_doc", Integer.toString(i), "foo", "baz" + i);
+            index("test-idx-3", "_doc", Integer.toString(i), "foo", "baz" + i);
+        }
+        refresh();
+
+        logger.info("--> snapshot");
+        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+            .setWaitForCompletion(true).get();
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
+            equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
+
+        List<SnapshotInfo> snapshotInfos =
+            client.admin().cluster().prepareGetSnapshots("test-repo").get().getSnapshots("test-repo");
+        assertThat(snapshotInfos.size(), equalTo(1));
+        SnapshotInfo snapshotInfo = snapshotInfos.get(0);
+        assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
+        assertThat(snapshotInfo.version(), equalTo(Version.CURRENT));
+
+        final List<SnapshotStatus> snapshotStatus = client.admin().cluster().snapshotsStatus(
+            new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots();
+        assertThat(snapshotStatus.size(), equalTo(1));
+        final SnapshotStatus snStatus = snapshotStatus.get(0);
+        assertEquals(snStatus.getStats().getStartTime(), snapshotInfo.startTime());
+        assertEquals(snStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime());
+    }
+}