|
|
@@ -29,7 +29,6 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
-import org.elasticsearch.common.util.concurrent.ThreadContext;
|
|
|
import org.elasticsearch.index.IndexNotFoundException;
|
|
|
import org.elasticsearch.index.IndexingPressure;
|
|
|
import org.elasticsearch.index.shard.IndexShard;
|
|
|
@@ -111,47 +110,43 @@ public class RetentionLeaseSyncAction extends TransportWriteAction<
|
|
|
RetentionLeases retentionLeases,
|
|
|
ActionListener<ReplicationResponse> listener
|
|
|
) {
|
|
|
- final ThreadContext threadContext = threadPool.getThreadContext();
|
|
|
- try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
|
|
+ try (var ignore = threadPool.getThreadContext().newEmptySystemContext()) {
|
|
|
// we have to execute under the system context so that if security is enabled the sync is authorized
|
|
|
- threadContext.markAsSystemContext();
|
|
|
final Request request = new Request(shardId, retentionLeases);
|
|
|
- try (var ignored = threadContext.newTraceContext()) {
|
|
|
- final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
|
|
|
- transportService.sendChildRequest(
|
|
|
- clusterService.localNode(),
|
|
|
- transportPrimaryAction,
|
|
|
- new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
|
|
|
- task,
|
|
|
- transportOptions,
|
|
|
- new TransportResponseHandler<ReplicationResponse>() {
|
|
|
- @Override
|
|
|
- public ReplicationResponse read(StreamInput in) throws IOException {
|
|
|
- return newResponseInstance(in);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Executor executor() {
|
|
|
- return TransportResponseHandler.TRANSPORT_WORKER;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void handleResponse(ReplicationResponse response) {
|
|
|
- task.setPhase("finished");
|
|
|
- taskManager.unregister(task);
|
|
|
- listener.onResponse(response);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void handleException(TransportException e) {
|
|
|
- LOGGER.log(getExceptionLogLevel(e), () -> format("%s retention lease sync failed", shardId), e);
|
|
|
- task.setPhase("finished");
|
|
|
- taskManager.unregister(task);
|
|
|
- listener.onFailure(e);
|
|
|
- }
|
|
|
+ final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
|
|
|
+ transportService.sendChildRequest(
|
|
|
+ clusterService.localNode(),
|
|
|
+ transportPrimaryAction,
|
|
|
+ new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
|
|
|
+ task,
|
|
|
+ transportOptions,
|
|
|
+ new TransportResponseHandler<ReplicationResponse>() {
|
|
|
+ @Override
|
|
|
+ public ReplicationResponse read(StreamInput in) throws IOException {
|
|
|
+ return newResponseInstance(in);
|
|
|
}
|
|
|
- );
|
|
|
- }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Executor executor() {
|
|
|
+ return TransportResponseHandler.TRANSPORT_WORKER;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void handleResponse(ReplicationResponse response) {
|
|
|
+ task.setPhase("finished");
|
|
|
+ taskManager.unregister(task);
|
|
|
+ listener.onResponse(response);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void handleException(TransportException e) {
|
|
|
+ LOGGER.log(getExceptionLogLevel(e), () -> format("%s retention lease sync failed", shardId), e);
|
|
|
+ task.setPhase("finished");
|
|
|
+ taskManager.unregister(task);
|
|
|
+ listener.onFailure(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
|