Selaa lähdekoodia

Fail shard if search execution uncovers corruption
If corruption is uncovered as part of search execution, we should fail the shard.
relates to #11419

Shay Banon 10 vuotta sitten
vanhempi
commit
740fe483bb
1 muutettua tiedostoa jossa 23 lisäystä ja 11 poistoa
  1. 23 11
      src/main/java/org/elasticsearch/search/SearchService.java

+ 23 - 11
src/main/java/org/elasticsearch/search/SearchService.java

@@ -41,6 +41,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
@@ -233,7 +234,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
             return context.dfsResult();
         } catch (Throwable e) {
             logger.trace("Dfs phase failed", e);
-            freeContext(context.id());
+            processFailure(context, e);
             throw ExceptionsHelper.convertToRuntime(e);
         } finally {
             cleanContext(context);
@@ -263,7 +264,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
             return context.queryResult();
         } catch (Throwable e) {
             logger.trace("Scan phase failed", e);
-            freeContext(context.id());
+            processFailure(context, e);
             throw ExceptionsHelper.convertToRuntime(e);
         } finally {
             context.size(originalSize);
@@ -292,7 +293,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
             return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
         } catch (Throwable e) {
             logger.trace("Scan phase failed", e);
-            freeContext(context.id());
+            processFailure(context, e);
             throw ExceptionsHelper.convertToRuntime(e);
         } finally {
             cleanContext(context);
@@ -336,7 +337,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
             }
             context.indexShard().searchService().onFailedQueryPhase(context);
             logger.trace("Query phase failed", e);
-            freeContext(context.id());
+            processFailure(context, e);
             throw ExceptionsHelper.convertToRuntime(e);
         } finally {
             cleanContext(context);
@@ -357,7 +358,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
         } catch (Throwable e) {
             context.indexShard().searchService().onFailedQueryPhase(context);
             logger.trace("Query phase failed", e);
-            freeContext(context.id());
+            processFailure(context, e);
             throw ExceptionsHelper.convertToRuntime(e);
         } finally {
             cleanContext(context);
@@ -372,7 +373,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
             context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity(),
                     indexCache.filter(), indexCache.filterPolicy()));
         } catch (Throwable e) {
-            freeContext(context.id());
+            processFailure(context, e);
             cleanContext(context);
             throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
         }
@@ -391,7 +392,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
         } catch (Throwable e) {
             context.indexShard().searchService().onFailedQueryPhase(context);
             logger.trace("Query phase failed", e);
-            freeContext(context.id());
+            processFailure(context, e);
             throw ExceptionsHelper.convertToRuntime(e);
         } finally {
             cleanContext(context);
@@ -429,7 +430,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
             return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
         } catch (Throwable e) {
             logger.trace("Fetch phase failed", e);
-            freeContext(context.id());
+            processFailure(context, e);
             throw ExceptionsHelper.convertToRuntime(e);
         } finally {
             cleanContext(context);
@@ -476,7 +477,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
             return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
         } catch (Throwable e) {
             logger.trace("Fetch phase failed", e);
-            freeContext(context.id());
+            processFailure(context, e);
             throw ExceptionsHelper.convertToRuntime(e);
         } finally {
             cleanContext(context);
@@ -515,7 +516,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
             return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
         } catch (Throwable e) {
             logger.trace("Fetch phase failed", e);
-            freeContext(context.id());
+            processFailure(context, e);
             throw ExceptionsHelper.convertToRuntime(e);
         } finally {
             cleanContext(context);
@@ -543,7 +544,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
         } catch (Throwable e) {
             context.indexShard().searchService().onFailedFetchPhase(context);
             logger.trace("Fetch phase failed", e);
-            freeContext(context.id()); // we just try to make sure this is freed - rethrow orig exception.
+            processFailure(context, e);
             throw ExceptionsHelper.convertToRuntime(e);
         } finally {
             cleanContext(context);
@@ -668,6 +669,17 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
         SearchContext.removeCurrent();
     }
 
+    /**
+     * Handles a failure raised while executing a search phase.
+     * <p>
+     * The search context is freed first via {@code freeContext}. The failure is then
+     * checked with {@code Lucene.isCorruptionException}; if it qualifies, the shard
+     * owning the context is asked to fail itself. Anything thrown while checking for
+     * or reporting the corruption is logged at warn level and not propagated, so the
+     * caller can still rethrow the original failure.
+     *
+     * @param context the search context whose execution failed
+     * @param t       the failure caught by the calling phase
+     */
+    private void processFailure(SearchContext context, Throwable t) {
+        // Always free the context first, whatever kind of failure this is.
+        freeContext(context.id());
+        try {
+            if (!Lucene.isCorruptionException(t)) {
+                return;
+            }
+            context.indexShard().failShard("search execution corruption failure", t);
+        } catch (Throwable reportingFailure) {
+            // Best effort only: a problem while failing the shard is logged, not rethrown.
+            logger.warn("failed to process shard failure to (potentially) send back shard failure on corruption", reportingFailure);
+        }
+    }
+
     private void parseTemplate(ShardSearchRequest request) {
 
         final ExecutableScript executable;