Browse Source

Non-searchable shards wait for commit durability (#95690)

When a wait_until bulk request comes in to a non-searchable shard, we
are missing a step to wait until the commit has fully been persisted to
the durability mechanism configured by the engine implementation.
Tim Brooks 2 years ago
parent
commit
3af537993c

+ 14 - 4
server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java

@@ -45,7 +45,7 @@ public class PostWriteRefresh {
             case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
                 @Override
                 public void onResponse(Boolean forced) {
-                    if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0 && location != null) {
+                    if (indexShard.routingEntry().isSearchable() == false && location != null) {
                         refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout);
                     } else {
                         listener.onResponse(forced);
@@ -116,17 +116,27 @@ public class PostWriteRefresh {
             return;
         }
 
-        engineOrNull.addFlushListener(location, new ActionListener<>() {
+        engineOrNull.addFlushListener(location, ActionListener.wrap(new ActionListener<>() {
             @Override
             public void onResponse(Long generation) {
-                sendUnpromotableRequests(indexShard, generation, forced, listener, postWriteRefreshTimeout);
+                engineOrNull.addFlushDurabilityListener(generation, ActionListener.wrap(new ActionListener<>() {
+                    @Override
+                    public void onResponse(Void unused) {
+                        sendUnpromotableRequests(indexShard, generation, forced, listener, postWriteRefreshTimeout);
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        listener.onFailure(e);
+                    }
+                }));
             }
 
             @Override
             public void onFailure(Exception e) {
                 listener.onFailure(e);
             }
-        });
+        }));
     }
 
     private void sendUnpromotableRequests(

+ 9 - 0
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -1967,6 +1967,15 @@ public abstract class Engine implements Closeable {
         listener.onFailure(new UnsupportedOperationException("Engine type " + this.getClass() + " does not support flush listeners."));
     }
 
+    /**
+     * Adds a listener which will be executed once the requested generation has been durably persisted.
+     * <p>
+     * This base implementation does not support durability listeners: it immediately fails the given listener
+     * with an {@link UnsupportedOperationException} naming the concrete engine class. Engine implementations
+     * that support waiting on a generation override this method.
+     *
+     * @param generation the generation whose durability the caller wants to wait for
+     * @param listener   the listener to complete; in this base implementation it is always failed
+     */
+    public void addFlushDurabilityListener(long generation, ActionListener<Void> listener) {
+        listener.onFailure(
+            new UnsupportedOperationException("Engine type " + this.getClass() + " does not support generation durability listeners.")
+        );
+    }
+
     /**
      * Captures the result of a refresh operation on the index shard.
      * <p>

+ 9 - 0
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -3128,4 +3128,13 @@ public class InternalEngine extends Engine {
     public void addFlushListener(Translog.Location location, ActionListener<Long> listener) {
         this.flushListener.addOrNotify(location, listener);
     }
+
+    /**
+     * Completes the listener according to whether the given generation has already been committed.
+     * <p>
+     * After {@code ensureOpen()}, the generation of {@code lastCommittedSegmentInfos} is compared with the
+     * requested one. If the last committed generation is lower, the listener is failed with an
+     * {@link IllegalStateException}; otherwise it is completed immediately with {@code null}.
+     * The listener is completed exactly once.
+     *
+     * @param generation the generation the caller wants to wait on
+     * @param listener   the listener to notify with the outcome
+     */
+    @Override
+    public void addFlushDurabilityListener(long generation, ActionListener<Void> listener) {
+        ensureOpen();
+        if (lastCommittedSegmentInfos.getGeneration() < generation) {
+            listener.onFailure(new IllegalStateException("Cannot wait on generation which has not been committed"));
+            // Stop here: falling through would also call onResponse and complete the same listener twice.
+            return;
+        }
+        listener.onResponse(null);
+    }
 }

+ 12 - 1
server/src/test/java/org/elasticsearch/action/support/replication/PostWriteRefreshTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.index.engine.EngineTestCase;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.ReplicationGroup;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -137,16 +138,26 @@ public class PostWriteRefreshTests extends IndexShardTestCase {
 
             ReplicationGroup replicationGroup = mock(ReplicationGroup.class);
             IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class);
+            ShardId shardId = primary.shardId();
+            ShardRouting routing = ShardRouting.newUnassigned(
+                shardId,
+                true,
+                RecoverySource.EmptyStoreRecoverySource.INSTANCE,
+                new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""),
+                ShardRouting.Role.INDEX_ONLY
+            );
+            when(primary.routingEntry()).thenReturn(routing);
             when(primary.getReplicationGroup()).thenReturn(replicationGroup).thenReturn(realReplicationGroup);
             when(replicationGroup.getRoutingTable()).thenReturn(routingTable);
             ShardRouting shardRouting = ShardRouting.newUnassigned(
-                primary.shardId(),
+                shardId,
                 false,
                 RecoverySource.PeerRecoverySource.INSTANCE,
                 new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "message"),
                 ShardRouting.Role.SEARCH_ONLY
             );
             when(routingTable.unpromotableShards()).thenReturn(List.of(shardRouting));
+            when(routingTable.shardId()).thenReturn(shardId);
             WriteRequest.RefreshPolicy policy = randomFrom(WriteRequest.RefreshPolicy.IMMEDIATE, WriteRequest.RefreshPolicy.WAIT_UNTIL);
             postWriteRefresh.refreshShard(policy, primary, result.getTranslogLocation(), f, postWriteRefreshTimeout);
             final Releasable releasable;