浏览代码

Percolate on Index and Bulk, closes #636.

kimchy 14 年之前
父节点
当前提交
b1d13febbf
共有 17 个文件被更改,包括 355 次插入40 次删除
  1. 2 2
      modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportShardReplicationPingAction.java
  2. 2 2
      modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java
  3. 9 3
      modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
  4. 37 2
      modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
  5. 3 2
      modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java
  6. 3 2
      modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java
  7. 2 2
      modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java
  8. 24 0
      modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java
  9. 56 0
      modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexResponse.java
  10. 19 2
      modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
  11. 63 17
      modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
  12. 10 0
      modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java
  13. 30 0
      modules/elasticsearch/src/main/java/org/elasticsearch/index/percolator/PercolatorExecutor.java
  14. 4 0
      modules/elasticsearch/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java
  15. 12 0
      modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java
  16. 15 6
      modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java
  17. 64 0
      modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java

+ 2 - 2
modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportShardReplicationPingAction.java

@@ -57,8 +57,8 @@ public class TransportShardReplicationPingAction extends TransportShardReplicati
         return "ping/replication/shard";
     }
 
-    @Override protected ShardReplicationPingResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
-        return new ShardReplicationPingResponse();
+    @Override protected PrimaryResponse<ShardReplicationPingResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
+        return new PrimaryResponse<ShardReplicationPingResponse>(new ShardReplicationPingResponse(), null);
     }
 
     @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {

+ 2 - 2
modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java

@@ -232,8 +232,8 @@ public class BulkItemResponse implements Streamable {
      * The actual response ({@link IndexResponse} or {@link DeleteResponse}). <tt>null</tt> in
      * case of failure.
      */
-    public ActionResponse response() {
-        return response;
+    public <T extends ActionResponse> T response() {
+        return (T) response;
     }
 
     /**

+ 9 - 3
modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

@@ -113,6 +113,7 @@ public class BulkRequest implements ActionRequest {
             String parent = null;
             String opType = null;
             long version = 0;
+            String percolate = null;
 
             String currentFieldName = null;
             while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@@ -133,6 +134,8 @@ public class BulkRequest implements ActionRequest {
                         opType = parser.text();
                     } else if ("_version".equals(currentFieldName)) {
                         version = parser.longValue();
+                    } else if ("percolate".equals(currentFieldName)) {
+                        percolate = parser.textOrNull();
                     }
                 }
             }
@@ -148,16 +151,19 @@ public class BulkRequest implements ActionRequest {
                 if ("index".equals(action)) {
                     if (opType == null) {
                         add(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
-                                .source(data, from, nextMarker - from, contentUnsafe));
+                                .source(data, from, nextMarker - from, contentUnsafe)
+                                .percolate(percolate));
                     } else {
                         add(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
                                 .create("create".equals(opType))
-                                .source(data, from, nextMarker - from, contentUnsafe));
+                                .source(data, from, nextMarker - from, contentUnsafe)
+                                .percolate(percolate));
                     }
                 } else if ("create".equals(action)) {
                     add(new IndexRequest(index, type, id).routing(routing).parent(parent).version(version)
                             .create(true)
-                            .source(data, from, nextMarker - from, contentUnsafe));
+                            .source(data, from, nextMarker - from, contentUnsafe)
+                            .percolate(percolate));
                 }
                 // move pointers
                 from = nextMarker + 1;

+ 37 - 2
modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -34,6 +34,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.routing.ShardIterator;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Sets;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -43,6 +44,8 @@ import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SourceToParse;
+import org.elasticsearch.index.percolator.PercolatorExecutor;
+import org.elasticsearch.index.service.IndexService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.service.IndexShard;
 import org.elasticsearch.indices.IndicesService;
@@ -98,7 +101,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
         return clusterState.routingTable().index(request.index()).shard(request.shardId()).shardsIt();
     }
 
-    @Override protected BulkShardResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
+    @Override protected PrimaryResponse<BulkShardResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
         IndexShard indexShard = indexShard(shardRequest);
         final BulkShardRequest request = shardRequest.request;
         BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
@@ -202,7 +205,39 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
                 }
             }
         }
-        return new BulkShardResponse(new ShardId(request.index(), request.shardId()), responses);
+        BulkShardResponse response = new BulkShardResponse(new ShardId(request.index(), request.shardId()), responses);
+        return new PrimaryResponse<BulkShardResponse>(response, ops);
+    }
+
+    @Override protected void postPrimaryOperation(BulkShardRequest request, PrimaryResponse<BulkShardResponse> response) {
+        IndexService indexService = indicesService.indexServiceSafe(request.index());
+        Engine.Operation[] ops = (Engine.Operation[]) response.payload();
+        for (int i = 0; i < ops.length; i++) {
+            BulkItemRequest itemRequest = request.items()[i];
+            BulkItemResponse itemResponse = response.response().responses()[i];
+            if (itemResponse.failed()) {
+                // failure, continue
+                continue;
+            }
+            if (itemRequest.request() instanceof IndexRequest) {
+                IndexRequest indexRequest = (IndexRequest) itemRequest.request();
+                if (!Strings.hasLength(indexRequest.percolate())) {
+                    continue;
+                }
+                ParsedDocument doc;
+                if (ops[i] instanceof Engine.Create) {
+                    doc = ((Engine.Create) ops[i]).parsedDoc();
+                } else {
+                    doc = ((Engine.Index) ops[i]).parsedDoc();
+                }
+                try {
+                    PercolatorExecutor.Response percolate = indexService.percolateService().percolate(new PercolatorExecutor.DocAndSourceQueryRequest(doc, indexRequest.percolate()));
+                    ((IndexResponse) itemResponse.response()).matches(percolate.matches());
+                } catch (Exception e) {
+                    logger.warn("failed to percolate [{}]", e, itemRequest.request());
+                }
+            }
+        }
     }
 
     @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {

+ 3 - 2
modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java

@@ -142,7 +142,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
         state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
     }
 
-    @Override protected DeleteResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
+    @Override protected PrimaryResponse<DeleteResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
         DeleteRequest request = shardRequest.request;
         IndexShard indexShard = indexShard(shardRequest);
         Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
@@ -151,7 +151,8 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
         indexShard.delete(delete);
         // update the request with teh version so it will go to the replicas
         request.version(delete.version());
-        return new DeleteResponse(request.index(), request.type(), request.id(), delete.version(), delete.notFound());
+        DeleteResponse response = new DeleteResponse(request.index(), request.type(), request.id(), delete.version(), delete.notFound());
+        return new PrimaryResponse<DeleteResponse>(response, null);
     }
 
     @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {

+ 3 - 2
modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java

@@ -66,7 +66,7 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
         state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
     }
 
-    @Override protected ShardDeleteResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
+    @Override protected PrimaryResponse<ShardDeleteResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
         ShardDeleteRequest request = shardRequest.request;
         IndexShard indexShard = indexShard(shardRequest);
         Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
@@ -75,7 +75,8 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
         indexShard.delete(delete);
         // update the version to happen on the replicas
         request.version(delete.version());
-        return new ShardDeleteResponse(delete.version(), delete.notFound());
+        ShardDeleteResponse response = new ShardDeleteResponse(delete.version(), delete.notFound());
+        return new PrimaryResponse<ShardDeleteResponse>(response, null);
     }
 
     @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {

+ 2 - 2
modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java

@@ -64,10 +64,10 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
         state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
     }
 
-    @Override protected ShardDeleteByQueryResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
+    @Override protected PrimaryResponse<ShardDeleteByQueryResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
         ShardDeleteByQueryRequest request = shardRequest.request;
         indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.types());
-        return new ShardDeleteByQueryResponse();
+        return new PrimaryResponse<ShardDeleteByQueryResponse>(new ShardDeleteByQueryResponse(), null);
     }
 
     @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {

+ 24 - 0
modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java

@@ -125,6 +125,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
 
     private boolean refresh = false;
     private long version = 0;
+    private String percolate;
 
     private XContentType contentType = Requests.INDEX_CONTENT_TYPE;
 
@@ -533,6 +534,20 @@ public class IndexRequest extends ShardReplicationOperationRequest {
         return this.version;
     }
 
+    /**
+     * Causes the index request document to be percolated. The parameter is the percolate query
+     * to use to reduce the percolated queries that are going to run against this doc. Can be
+     * set to <tt>*</tt> to indicate that all percolate queries should be run.
+     */
+    public IndexRequest percolate(String percolate) {
+        this.percolate = percolate;
+        return this;
+    }
+
+    public String percolate() {
+        return this.percolate;
+    }
+
     public void processRouting(MappingMetaData mappingMd) throws ElasticSearchException {
         if (routing == null && mappingMd.routing().hasPath()) {
             XContentParser parser = null;
@@ -576,6 +591,9 @@ public class IndexRequest extends ShardReplicationOperationRequest {
         opType = OpType.fromId(in.readByte());
         refresh = in.readBoolean();
         version = in.readLong();
+        if (in.readBoolean()) {
+            percolate = in.readUTF();
+        }
     }
 
     @Override public void writeTo(StreamOutput out) throws IOException {
@@ -604,6 +622,12 @@ public class IndexRequest extends ShardReplicationOperationRequest {
         out.writeByte(opType.id());
         out.writeBoolean(refresh);
         out.writeLong(version);
+        if (percolate == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            out.writeUTF(percolate);
+        }
     }
 
     @Override public String toString() {

+ 56 - 0
modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexResponse.java

@@ -20,11 +20,14 @@
 package org.elasticsearch.action.index;
 
 import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.collect.ImmutableList;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * A response of an index operation,
@@ -43,6 +46,8 @@ public class IndexResponse implements ActionResponse, Streamable {
 
     private long version;
 
+    private List<String> matches;
+
     public IndexResponse() {
 
     }
@@ -110,11 +115,53 @@ public class IndexResponse implements ActionResponse, Streamable {
         return version();
     }
 
+    /**
+     * Returns the percolate queries matches. <tt>null</tt> if no percolation was requested.
+     */
+    public List<String> matches() {
+        return this.matches;
+    }
+
+    /**
+     * Returns the percolate queries matches. <tt>null</tt> if no percolation was requested.
+     */
+    public List<String> getMatches() {
+        return this.matches;
+    }
+
+    /**
+     * Internal.
+     */
+    public void matches(List<String> matches) {
+        this.matches = matches;
+    }
+
     @Override public void readFrom(StreamInput in) throws IOException {
         index = in.readUTF();
         id = in.readUTF();
         type = in.readUTF();
         version = in.readLong();
+        if (in.readBoolean()) {
+            int size = in.readVInt();
+            if (size == 0) {
+                matches = ImmutableList.of();
+            } else if (size == 1) {
+                matches = ImmutableList.of(in.readUTF());
+            } else if (size == 2) {
+                matches = ImmutableList.of(in.readUTF(), in.readUTF());
+            } else if (size == 3) {
+                matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF());
+            } else if (size == 4) {
+                matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF());
+            } else if (size == 5) {
+                matches = ImmutableList.of(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF());
+            } else {
+                matches = new ArrayList<String>();
+                for (int i = 0; i < size; i++) {
+                    matches.add(in.readUTF());
+                }
+            }
+        }
     }
 
     @Override public void writeTo(StreamOutput out) throws IOException {
@@ -122,5 +169,14 @@ public class IndexResponse implements ActionResponse, Streamable {
         out.writeUTF(id);
         out.writeUTF(type);
         out.writeLong(version);
+        if (matches == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            out.writeVInt(matches.size());
+            for (String match : matches) {
+                out.writeUTF(match);
+            }
+        }
     }
 }

+ 19 - 2
modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

@@ -35,6 +35,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.ShardIterator;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUID;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -43,6 +44,8 @@ import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SourceToParse;
+import org.elasticsearch.index.percolator.PercolatorExecutor;
+import org.elasticsearch.index.service.IndexService;
 import org.elasticsearch.index.shard.service.IndexShard;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
 import org.elasticsearch.indices.IndicesService;
@@ -148,7 +151,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
                 .indexShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing());
     }
 
-    @Override protected IndexResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
+    @Override protected PrimaryResponse<IndexResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
         final IndexRequest request = shardRequest.request;
 
         // validate, if routing is required, that we got routing
@@ -185,7 +188,21 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
         // update the version on the request, so it will be used for the replicas
         request.version(version);
 
-        return new IndexResponse(request.index(), request.type(), request.id(), version);
+        IndexResponse response = new IndexResponse(request.index(), request.type(), request.id(), version);
+        return new PrimaryResponse<IndexResponse>(response, doc);
+    }
+
+    @Override protected void postPrimaryOperation(IndexRequest request, PrimaryResponse<IndexResponse> response) {
+        if (!Strings.hasLength(request.percolate())) {
+            return;
+        }
+        IndexService indexService = indicesService.indexServiceSafe(request.index());
+        try {
+            PercolatorExecutor.Response percolate = indexService.percolateService().percolate(new PercolatorExecutor.DocAndSourceQueryRequest((ParsedDocument) response.payload(), request.percolate()));
+            response.response().matches(percolate.matches());
+        } catch (Exception e) {
+            logger.warn("failed to percolate [{}]", e, request);
+        }
     }
 
     @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {

+ 63 - 17
modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java

@@ -105,10 +105,17 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
 
     protected abstract String transportAction();
 
-    protected abstract Response shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest);
+    protected abstract PrimaryResponse<Response> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest);
 
     protected abstract void shardOperationOnReplica(ShardOperationRequest shardRequest);
 
+    /**
+     * Called once replica operations have been dispatched on the
+     */
+    protected void postPrimaryOperation(Request request, PrimaryResponse<Response> response) {
+
+    }
+
     protected abstract ShardIterator shards(ClusterState clusterState, Request request) throws ElasticSearchException;
 
     protected abstract boolean checkWriteConsistency();
@@ -418,7 +425,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
 
         private void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, boolean alreadyThreaded, final ShardRouting shard, ClusterState clusterState) {
             try {
-                Response response = shardOperationOnPrimary(clusterState, new ShardOperationRequest(primaryShardId, request));
+                PrimaryResponse<Response> response = shardOperationOnPrimary(clusterState, new ShardOperationRequest(primaryShardId, request));
                 performReplicas(response, alreadyThreaded);
             } catch (Exception e) {
                 // shard has not been allocated yet, retry it here
@@ -433,14 +440,15 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
             }
         }
 
-        private void performReplicas(final Response response, boolean alreadyThreaded) {
+        private void performReplicas(final PrimaryResponse<Response> response, boolean alreadyThreaded) {
             if (ignoreReplicas() || shardIt.size() == 1 /* no replicas */) {
+                postPrimaryOperation(request, response);
                 if (alreadyThreaded || !request.listenerThreaded()) {
-                    listener.onResponse(response);
+                    listener.onResponse(response.response());
                 } else {
                     threadPool.execute(new Runnable() {
                         @Override public void run() {
-                            listener.onResponse(response);
+                            listener.onResponse(response.response());
                         }
                     });
                 }
@@ -474,12 +482,13 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
             }
 
             if (replicaCounter == 0) {
+                postPrimaryOperation(request, response);
                 if (alreadyThreaded || !request.listenerThreaded()) {
-                    listener.onResponse(response);
+                    listener.onResponse(response.response());
                 } else {
                     threadPool.execute(new Runnable() {
                         @Override public void run() {
-                            listener.onResponse(response);
+                            listener.onResponse(response.response());
                         }
                     });
                 }
@@ -487,13 +496,14 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
             }
 
             if (replicationType == ReplicationType.ASYNC) {
+                postPrimaryOperation(request, response);
                 // async replication, notify the listener
                 if (alreadyThreaded || !request.listenerThreaded()) {
-                    listener.onResponse(response);
+                    listener.onResponse(response.response());
                 } else {
                     threadPool.execute(new Runnable() {
                         @Override public void run() {
-                            listener.onResponse(response);
+                            listener.onResponse(response.response());
                         }
                     });
                 }
@@ -501,6 +511,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                 replicaCounter = -100;
             }
 
+            // we add one to the replica count to do the postPrimaryOperation
+            replicaCounter++;
+
             AtomicInteger counter = new AtomicInteger(replicaCounter);
             for (final ShardRouting shard : shardIt.reset()) {
                 // if its unassigned, nothing to do here...
@@ -528,19 +541,34 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                     performOnReplica(response, alreadyThreaded, counter, shard, shard.relocatingNodeId());
                 }
             }
+
+            // now do the postPrimary operation, and check if the listener needs to be invoked
+            postPrimaryOperation(request, response);
+            // we also invoke here in case replicas finish before postPrimaryAction does
+            if (counter.decrementAndGet() == 0) {
+                if (alreadyThreaded || !request.listenerThreaded()) {
+                    listener.onResponse(response.response());
+                } else {
+                    threadPool.execute(new Runnable() {
+                        @Override public void run() {
+                            listener.onResponse(response.response());
+                        }
+                    });
+                }
+            }
         }
 
-        private void performOnReplica(final Response response, boolean alreadyThreaded, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
+        private void performOnReplica(final PrimaryResponse<Response> response, boolean alreadyThreaded, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
             // if we don't have that node, it means that it might have failed and will be created again, in
             // this case, we don't have to do the operation, and just let it failover
             if (!nodes.nodeExists(nodeId)) {
                 if (counter.decrementAndGet() == 0) {
                     if (alreadyThreaded || !request.listenerThreaded()) {
-                        listener.onResponse(response);
+                        listener.onResponse(response.response());
                     } else {
                         threadPool.execute(new Runnable() {
                             @Override public void run() {
-                                listener.onResponse(response);
+                                listener.onResponse(response.response());
                             }
                         });
                     }
@@ -569,11 +597,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                             if (request.listenerThreaded()) {
                                 threadPool.execute(new Runnable() {
                                     @Override public void run() {
-                                        listener.onResponse(response);
+                                        listener.onResponse(response.response());
                                     }
                                 });
                             } else {
-                                listener.onResponse(response);
+                                listener.onResponse(response.response());
                             }
                         }
                     }
@@ -597,7 +625,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                                 }
                             }
                             if (counter.decrementAndGet() == 0) {
-                                listener.onResponse(response);
+                                listener.onResponse(response.response());
                             }
                         }
                     });
@@ -614,11 +642,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                         if (request.listenerThreaded()) {
                             threadPool.execute(new Runnable() {
                                 @Override public void run() {
-                                    listener.onResponse(response);
+                                    listener.onResponse(response.response());
                                 }
                             });
                         } else {
-                            listener.onResponse(response);
+                            listener.onResponse(response.response());
                         }
                     }
                 }
@@ -654,4 +682,22 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
             return false;
         }
     }
+
+    public static class PrimaryResponse<T> {
+        private final T response;
+        private final Object payload;
+
+        public PrimaryResponse(T response, Object payload) {
+            this.response = response;
+            this.payload = payload;
+        }
+
+        public T response() {
+            return response;
+        }
+
+        public Object payload() {
+            return payload;
+        }
+    }
 }

+ 10 - 0
modules/elasticsearch/src/main/java/org/elasticsearch/client/action/index/IndexRequestBuilder.java

@@ -284,6 +284,16 @@ public class IndexRequestBuilder extends BaseRequestBuilder<IndexRequest, IndexR
         return this;
     }
 
+    /**
+     * Causes the index request document to be percolated. The parameter is the percolate query
+     * to use to reduce the percolated queries that are going to run against this doc. Can be
+     * set to <tt>*</tt> to indicate that all percolate queries should be run.
+     */
+    public IndexRequestBuilder setPercolate(String percolate) {
+        request.percolate(percolate);
+        return this;
+    }
+
     /**
      * Should the listener be called on a separate thread if needed.
      */

+ 30 - 0
modules/elasticsearch/src/main/java/org/elasticsearch/index/percolator/PercolatorExecutor.java

@@ -29,6 +29,7 @@ import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorer;
 import org.elasticsearch.ElasticSearchException;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableMap;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.inject.Inject;
@@ -50,6 +51,7 @@ import org.elasticsearch.index.mapper.*;
 import org.elasticsearch.index.query.IndexQueryParser;
 import org.elasticsearch.index.query.IndexQueryParserService;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.xcontent.QueryBuilders;
 import org.elasticsearch.index.service.IndexService;
 import org.elasticsearch.index.settings.IndexSettings;
 import org.elasticsearch.index.shard.ShardId;
@@ -97,6 +99,25 @@ public class PercolatorExecutor extends AbstractIndexComponent {
         }
     }
 
+    public static class DocAndSourceQueryRequest {
+        private final ParsedDocument doc;
+        @Nullable private final String query;
+
+        public DocAndSourceQueryRequest(ParsedDocument doc, @Nullable String query) {
+            this.doc = doc;
+            this.query = query;
+        }
+
+        public ParsedDocument doc() {
+            return this.doc;
+        }
+
+        @Nullable String query() {
+            return this.query;
+        }
+    }
+
+
     public static class DocAndQueryRequest {
         private final ParsedDocument doc;
         @Nullable private final Query query;
@@ -276,6 +297,15 @@ public class PercolatorExecutor extends AbstractIndexComponent {
         return percolate(new DocAndQueryRequest(doc, query));
     }
 
+    public Response percolate(DocAndSourceQueryRequest request) throws ElasticSearchException {
+        Query query = null;
+        if (Strings.hasLength(request.query()) && !request.query().equals("*")) {
+            IndexQueryParser queryParser = queryParserService.defaultIndexQueryParser();
+            query = queryParser.parse(QueryBuilders.queryString(request.query())).query();
+        }
+        return percolate(new DocAndQueryRequest(request.doc(), query));
+    }
+
     public Response percolate(DocAndQueryRequest request) throws ElasticSearchException {
         // first, parse the source doc into a MemoryIndex
         final MemoryIndex memoryIndex = new MemoryIndex();

+ 4 - 0
modules/elasticsearch/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java

@@ -82,6 +82,10 @@ public class PercolatorService extends AbstractIndexComponent {
         return percolator.percolate(request);
     }
 
+    public PercolatorExecutor.Response percolate(PercolatorExecutor.DocAndSourceQueryRequest request) throws PercolatorException {
+        return percolator.percolate(request);
+    }
+
     private void loadQueries(String indexName) {
         IndexService indexService = percolatorIndexService();
         IndexShard shard = indexService.shard(0);

+ 12 - 0
modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java

@@ -24,6 +24,7 @@ import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.replication.ReplicationType;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
@@ -105,6 +106,16 @@ public class RestBulkAction extends BaseRestHandler {
                         } else {
                             builder.field(Fields.OK, true);
                         }
+                        if (itemResponse.response() instanceof IndexResponse) {
+                            IndexResponse indexResponse = itemResponse.response();
+                            if (indexResponse.matches() != null) {
+                                builder.startArray(Fields.MATCHES);
+                                for (String match : indexResponse.matches()) {
+                                    builder.value(match);
+                                }
+                                builder.endArray();
+                            }
+                        }
                         builder.endObject();
                         builder.endObject();
                     }
@@ -136,6 +147,7 @@ public class RestBulkAction extends BaseRestHandler {
         static final XContentBuilderString OK = new XContentBuilderString("ok");
         static final XContentBuilderString TOOK = new XContentBuilderString("took");
         static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
+        static final XContentBuilderString MATCHES = new XContentBuilderString("matches");
     }
 
 }

+ 15 - 6
modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java

@@ -70,6 +70,7 @@ public class RestIndexAction extends BaseRestHandler {
         indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
         indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));
         indexRequest.version(RestActions.parseVersion(request));
+        indexRequest.percolate(request.param("percolate", null));
         String sOpType = request.param("op_type");
         if (sOpType != null) {
             if ("index".equals(sOpType)) {
@@ -99,16 +100,23 @@ public class RestIndexAction extends BaseRestHandler {
         // we don't spawn, then fork if local
         indexRequest.operationThreaded(true);
         client.index(indexRequest, new ActionListener<IndexResponse>() {
-            @Override public void onResponse(IndexResponse result) {
+            @Override public void onResponse(IndexResponse response) {
                 try {
                     XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
                     builder.startObject()
                             .field(Fields.OK, true)
-                            .field(Fields._INDEX, result.index())
-                            .field(Fields._TYPE, result.type())
-                            .field(Fields._ID, result.id())
-                            .field(Fields._VERSION, result.version())
-                            .endObject();
+                            .field(Fields._INDEX, response.index())
+                            .field(Fields._TYPE, response.type())
+                            .field(Fields._ID, response.id())
+                            .field(Fields._VERSION, response.version());
+                    if (response.matches() != null) {
+                        builder.startArray(Fields.MATCHES);
+                        for (String match : response.matches()) {
+                            builder.value(match);
+                        }
+                        builder.endArray();
+                    }
+                    builder.endObject();
                     channel.sendResponse(new XContentRestResponse(request, OK, builder));
                 } catch (Exception e) {
                     onFailure(e);
@@ -138,6 +146,7 @@ public class RestIndexAction extends BaseRestHandler {
         static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
         static final XContentBuilderString _ID = new XContentBuilderString("_id");
         static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
+        static final XContentBuilderString MATCHES = new XContentBuilderString("matches");
     }
 
 }

+ 64 - 0
modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java

@@ -19,8 +19,12 @@
 
 package org.elasticsearch.test.integration.percolator;
 
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.percolate.PercolateResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.test.integration.AbstractNodesTests;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -126,6 +130,66 @@ public class SimplePercolatorTests extends AbstractNodesTests {
         }
     }
 
+    @Test public void percolateOnIndexOperation() throws Exception {
+        try {
+            client.admin().indices().prepareDelete("test").execute().actionGet();
+        } catch (Exception e) {
+            // ignore
+        }
+        try {
+            client.admin().indices().prepareDelete("_percolator").execute().actionGet();
+        } catch (Exception e) {
+            // ignore
+        }
+
+        client.admin().indices().prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 2)).execute().actionGet();
+        client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
+
+        logger.info("--> register a query");
+        client.prepareIndex("_percolator", "test", "kuku")
+                .setSource(jsonBuilder().startObject()
+                        .field("color", "blue")
+                        .field("query", termQuery("field1", "value1"))
+                        .endObject())
+                .setRefresh(true)
+                .execute().actionGet();
+        client.admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForActiveShards(4).execute().actionGet();
+
+        for (int i = 0; i < 10; i++) {
+            IndexResponse index = client.prepareIndex("test", "type1", Integer.toString(i)).setSource("field1", "value1")
+                    .setPercolate("*").execute().actionGet();
+            assertThat(index.matches().size(), equalTo(1));
+            assertThat(index.matches(), hasItem("kuku"));
+        }
+
+        for (int i = 0; i < 10; i++) {
+            IndexResponse index = client.prepareIndex("test", "type1", Integer.toString(i)).setSource("field1", "value1")
+                    .setPercolate("color:blue").execute().actionGet();
+            assertThat(index.matches().size(), equalTo(1));
+            assertThat(index.matches(), hasItem("kuku"));
+        }
+
+        for (int i = 0; i < 10; i++) {
+            IndexResponse index = client.prepareIndex("test", "type1", Integer.toString(i)).setSource("field1", "value1")
+                    .setPercolate("color:green").execute().actionGet();
+            assertThat(index.matches().size(), equalTo(0));
+        }
+
+        // test bulk
+        BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
+        for (int i = 0; i < 10; i++) {
+            bulkRequestBuilder.add(client.prepareIndex("test", "type1", Integer.toString(i)).setSource("field1", "value1")
+                    .setPercolate("*"));
+        }
+        BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
+        assertThat(bulkResponse.hasFailures(), equalTo(false));
+        for (BulkItemResponse bulkItemResponse : bulkResponse) {
+            IndexResponse index = bulkItemResponse.response();
+            assertThat(index.matches().size(), equalTo(1));
+            assertThat(index.matches(), hasItem("kuku"));
+        }
+    }
+
     @Test public void dynamicAddingRemovingQueries() throws Exception {
         try {
             client.admin().indices().prepareDelete("test").execute().actionGet();