
Core: add only_ancient_segments to upgrade API, so only segments with an old Lucene version are upgraded

This option defaults to false, because it is also important to upgrade
the "merely old" segments since many Lucene improvements happen within
minor releases.

But you can pass true to do the minimal work necessary to upgrade to
the next major Elasticsearch release.

The HTTP GET upgrade request now also breaks out how many bytes of
ancient segments need upgrading.
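
A minimal sketch (an illustration, not part of this commit) of exercising the
new option over HTTP with Python's standard library, assuming a node on
localhost:9200 and an index named "twitter":

    import json
    import urllib.request

    BASE = 'http://localhost:9200'

    def upgrade_ancient_only(index):
        # POST the upgrade, restricted to segments from an older Lucene major version:
        req = urllib.request.Request(
            '%s/%s/_upgrade?only_ancient_segments=true' % (BASE, index),
            method='POST')
        return json.load(urllib.request.urlopen(req))

    def ancient_bytes_remaining(index):
        # GET now breaks out the new size_to_upgrade_ancient_in_bytes field per index:
        with urllib.request.urlopen('%s/%s/_upgrade' % (BASE, index)) as resp:
            status = json.load(resp)
        return int(status[index]['size_to_upgrade_ancient_in_bytes'])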

Closes #10213

Closes #10540

Conflicts:
	dev-tools/create_bwc_index.py
	rest-api-spec/api/indices.upgrade.json
	src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java
	src/main/java/org/elasticsearch/action/admin/indices/optimize/ShardOptimizeRequest.java
	src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java
	src/main/java/org/elasticsearch/index/engine/InternalEngine.java
	src/test/java/org/elasticsearch/bwcompat/StaticIndexBackwardCompatibilityTest.java
	src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
	src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeReallyOldIndexTest.java
Author: Michael McCandless
Commit: 399f0ccce9

+ 37 - 19
dev-tools/create-bwc-index.py → dev-tools/create_bwc_index.py

@@ -19,11 +19,15 @@ import glob
 import logging
 import os
 import random
+import shutil
 import subprocess
 import sys
 import tempfile
 import time
 
+DEFAULT_TRANSPORT_TCP_PORT = 9300
+DEFAULT_HTTP_TCP_PORT = 9200
+
 if sys.version_info[0] < 3:
   print('%s must use python 3.x (for the ES python client)' % sys.argv[0])
 
@@ -126,14 +130,17 @@ def build_version(version_tuple):
 def build_tuple(version_string):
   return [int(x) for x in version_string.split('.')]
 
-def start_node(version, release_dir, data_dir, tcp_port, http_port):
-  logging.info('Starting node from %s on port %s/%s' % (release_dir, tcp_port, http_port))
+def start_node(version, release_dir, data_dir, tcp_port=DEFAULT_TRANSPORT_TCP_PORT, http_port=DEFAULT_HTTP_TCP_PORT, cluster_name=None):
+  logging.info('Starting node from %s on port %s/%s, data_dir %s' % (release_dir, tcp_port, http_port, data_dir))
+  if cluster_name is None:
+    cluster_name = 'bwc_index_' + version
+    
   cmd = [
     os.path.join(release_dir, 'bin/elasticsearch'),
     '-Des.path.data=%s' % data_dir,
     '-Des.path.logs=logs',
-    '-Des.cluster.name=bwc_index_' + version,  
-    '-Des.network.host=localhost', 
+    '-Des.cluster.name=%s' % cluster_name,
+    '-Des.network.host=localhost',
     '-Des.discovery.zen.ping.multicast.enabled=false',
     '-Des.transport.tcp.port=%s' % tcp_port,
     '-Des.http.port=%s' % http_port
@@ -142,7 +149,7 @@ def start_node(version, release_dir, data_dir, tcp_port, http_port):
     cmd.append('-f') # version before 1.0 start in background automatically
   return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
-def create_client(http_port, timeout=30):
+def create_client(http_port=DEFAULT_HTTP_TCP_PORT, timeout=30):
   logging.info('Waiting for node to startup')
   for _ in range(0, timeout):
     # TODO: ask Honza if there is a better way to do this?
@@ -158,8 +165,6 @@ def create_client(http_port, timeout=30):
 
 def generate_index(client, version, index_name):
   client.indices.delete(index=index_name, ignore=404)
-  num_shards = random.randint(1, 10)
-  num_replicas = random.randint(0, 1)
   logging.info('Create single shard test index')
 
   mappings = {}
@@ -300,7 +305,7 @@ def compress(tmp_dir, output_dir, zipfile, directory):
   zipfile = os.path.join(abs_output_dir, zipfile)
   if os.path.exists(zipfile):
     os.remove(zipfile)
-  logging.info('Compressing index into %s', zipfile)
+  logging.info('Compressing index into %s, tmpDir %s', zipfile, tmp_dir)
   olddir = os.getcwd()
   os.chdir(tmp_dir)
   subprocess.check_call('zip -r %s %s' % (zipfile, directory), shell=True)
@@ -318,9 +323,9 @@ def parse_config():
                       help='The directory containing elasticsearch releases')
   parser.add_argument('--output-dir', '-o', default='src/test/resources/org/elasticsearch/bwcompat',
                       help='The directory to write the zipped index into')
-  parser.add_argument('--tcp-port', default=9300, type=int,
+  parser.add_argument('--tcp-port', default=DEFAULT_TRANSPORT_TCP_PORT, type=int,
                       help='The port to use as the minimum port for TCP communication')
-  parser.add_argument('--http-port', default=9200, type=int,
+  parser.add_argument('--http-port', default=DEFAULT_HTTP_TCP_PORT, type=int,
                       help='The port to use as the minimum port for HTTP communication')
   cfg = parser.parse_args()
 
@@ -339,14 +344,17 @@ def create_bwc_index(cfg, version):
   logging.info('--> Creating bwc index for %s' % version)
   release_dir = os.path.join(cfg.releases_dir, 'elasticsearch-%s' % version)
   if not os.path.exists(release_dir):
-    parser.error('ES version %s does not exist in %s' % (version, cfg.releases_dir)) 
+    raise RuntimeError('ES version %s does not exist in %s' % (version, cfg.releases_dir))
   snapshot_supported = not (version.startswith('0.') or version == '1.0.0.Beta1')
   tmp_dir = tempfile.mkdtemp()
+
   data_dir = os.path.join(tmp_dir, 'data')
   repo_dir = os.path.join(tmp_dir, 'repo')
   logging.info('Temp data dir: %s' % data_dir)
   logging.info('Temp repo dir: %s' % repo_dir)
 
+  node = None
+
   try:
     node = start_node(version, release_dir, data_dir, cfg.tcp_port, cfg.http_port)
     client = create_client(cfg.http_port)
@@ -359,16 +367,26 @@ def create_bwc_index(cfg, version):
     # this after the snapshot, because it calls flush.  Otherwise the index
     # will already have the deletions applied on upgrade.
     delete_by_query(client, version, index_name, 'doc')
-    
+
+    shutdown_node(node)
+    node = None
+
+    compress_index(version, tmp_dir, cfg.output_dir)
+    if snapshot_supported:
+      compress_repo(version, tmp_dir, cfg.output_dir)
   finally:
-    if 'node' in vars():
-      logging.info('Shutting down node with pid %d', node.pid)
-      node.terminate()
-      time.sleep(1) # some nodes take time to terminate
-  compress_index(version, tmp_dir, cfg.output_dir)
-  if snapshot_supported:
-    compress_repo(version, tmp_dir, cfg.output_dir)
 
+    if node is not None:
+      # This only happens if we've hit an exception:
+      shutdown_node(node)
+      
+    shutil.rmtree(tmp_dir)
+
+def shutdown_node(node):
+  logging.info('Shutting down node with pid %d', node.pid)
+  node.terminate()
+  node.wait()
+    
 def main():
   logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
                       datefmt='%Y-%m-%d %I:%M:%S %p')
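
For orientation, a hypothetical driver composing the refactored helpers above
(the version, paths, and index name are illustrative; shutdown_node and the
default-port create_client are what this commit adds):

    import os
    import shutil
    import tempfile

    import create_bwc_index

    tmp_dir = tempfile.mkdtemp()
    node = None
    try:
        data_dir = os.path.join(tmp_dir, 'data')
        node = create_bwc_index.start_node('1.5.0', 'backwards/elasticsearch-1.5.0',
                                           data_dir, cluster_name='demo')
        client = create_bwc_index.create_client()  # waits on the default HTTP port
        create_bwc_index.generate_index(client, '1.5.0', 'demo_index')
        create_bwc_index.shutdown_node(node)  # terminate and wait, as added above
        node = None
    finally:
        if node is not None:
            # Only reached if an exception fired before the normal shutdown:
            create_bwc_index.shutdown_node(node)
        shutil.rmtree(tmp_dir)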

+ 113 - 0
dev-tools/create_bwc_index_with_some_ancient_segments.py

@@ -0,0 +1,113 @@
+import create_bwc_index
+import logging
+import os
+import random
+import shutil
+import subprocess
+import sys
+import tempfile
+
+def fetch_version(version):
+  logging.info('fetching ES version %s' % version)
+  if subprocess.call([sys.executable, os.path.join(os.path.split(sys.argv[0])[0], 'get-bwc-version.py'), version]) != 0:
+    raise RuntimeError('failed to download ES version %s' % version)
+
+def main():
+  '''
+  Creates a static back compat index (.zip) with mixed 0.20 (Lucene 3.x) and 0.90 (Lucene 4.x) segments. 
+  '''
+  
+  logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
+                      datefmt='%Y-%m-%d %I:%M:%S %p')
+  logging.getLogger('elasticsearch').setLevel(logging.ERROR)
+  logging.getLogger('urllib3').setLevel(logging.WARN)
+
+  tmp_dir = tempfile.mkdtemp()
+  try:
+    data_dir = os.path.join(tmp_dir, 'data')
+    logging.info('Temp data dir: %s' % data_dir)
+
+    first_version = '0.20.6'
+    second_version = '0.90.6'
+    index_name = 'index-%s-and-%s' % (first_version, second_version)
+
+    # Download old ES releases if necessary:
+    release_dir = os.path.join('backwards', 'elasticsearch-%s' % first_version)
+    if not os.path.exists(release_dir):
+      fetch_version(first_version)
+
+    node = create_bwc_index.start_node(first_version, release_dir, data_dir, cluster_name=index_name)
+    client = create_bwc_index.create_client()
+
+    # Creates the index & indexes docs w/ first_version:
+    create_bwc_index.generate_index(client, first_version, index_name)
+
+    # Make sure we write segments:
+    flush_result = client.indices.flush(index=index_name)
+    if not flush_result['ok']:
+      raise RuntimeError('flush failed: %s' % str(flush_result))
+
+    segs = client.indices.segments(index=index_name)
+    shards = segs['indices'][index_name]['shards']
+    if len(shards) != 1:
+      raise RuntimeError('index should have 1 shard but got %s' % len(shards))
+
+    first_version_segs = shards['0'][0]['segments'].keys()
+
+    create_bwc_index.shutdown_node(node)
+    print('%s server output:\n%s' % (first_version, node.stdout.read().decode('utf-8')))
+    node = None
+
+    release_dir = os.path.join('backwards', 'elasticsearch-%s' % second_version)
+    if not os.path.exists(release_dir):
+      fetch_version(second_version)
+
+    # Now also index docs with second_version:
+    node = create_bwc_index.start_node(second_version, release_dir, data_dir, cluster_name=index_name)
+    client = create_bwc_index.create_client()
+
+    # If we index too many docs, the random refresh/flush causes the ancient segments to be merged away:
+    num_docs = 10
+    create_bwc_index.index_documents(client, index_name, 'doc', num_docs)
+
+    # Make sure we get a segment:
+    flush_result = client.indices.flush(index=index_name)
+    if not flush_result['ok']:
+      raise RuntimeError('flush failed: %s' % str(flush_result))
+
+    # Make sure we see mixed segments (it's possible Lucene could have "accidentally" merged away the first_version segments):
+    segs = client.indices.segments(index=index_name)
+    shards = segs['indices'][index_name]['shards']
+    if len(shards) != 1:
+      raise RuntimeError('index should have 1 shard but got %s' % len(shards))
+
+    second_version_segs = shards['0'][0]['segments'].keys()
+    #print("first: %s" % first_version_segs)
+    #print("second: %s" % second_version_segs)
+
+    for segment_name in first_version_segs:
+      if segment_name in second_version_segs:
+        # Good: an ancient version seg "survived":
+        break
+    else:
+      raise RuntimeError('index has no first_version segs left')
+
+    for segment_name in second_version_segs:
+      if segment_name not in first_version_segs:
+        # Good: a second_version segment was written
+        break
+    else:
+      raise RuntimeError('index has no second_version segs left')
+
+    create_bwc_index.shutdown_node(node)
+    print('%s server output:\n%s' % (second_version, node.stdout.read().decode('utf-8')))
+    node = None
+    create_bwc_index.compress_index('%s-and-%s' % (first_version, second_version), tmp_dir, 'src/test/resources/org/elasticsearch/rest/action/admin/indices/upgrade')
+  finally:
+    if node is not None:
+      create_bwc_index.shutdown_node(node)
+    shutil.rmtree(tmp_dir)
+    
+if __name__ == '__main__':
+  main()
+  

+ 22 - 2
docs/reference/indices/upgrade.asciidoc

@@ -21,12 +21,30 @@ This call will block until the upgrade is complete. If the http connection
 is lost, the request will continue in the background, and
 any new requests will block until the previous upgrade is complete.
 
+[float]
+[[upgrade-parameters]]
+==== Request Parameters
+
+The `upgrade` API accepts the following request parameters:
+
+[horizontal]
+`only_ancient_segments`:: If true, only very old segments (from a
+previous Lucene major release) will be upgraded.  While this will do
+the minimal work to ensure the next major release of Elasticsearch can
+read the segments, it's dangerous because it can leave other very old
+segments in sub-optimal formats.  Defaults to `false`.
+
 [float]
 === Check upgrade status
 
 Use a `GET` request to monitor how much of an index is upgraded.  This
-can also be used prior to starting an upgrade to identify which indices
-you want to upgrade at the same time.
+can also be used prior to starting an upgrade to identify which
+indices you want to upgrade at the same time.
+
+The `ancient` byte values that are returned indicate total bytes of
+segments whose version is extremely old (Lucene major version is
+different from the current version), showing how much upgrading is
+necessary when you run with `only_ancient_segments=true`.
 
 [source,sh]
 --------------------------------------------------
@@ -41,6 +59,8 @@ curl 'http://localhost:9200/twitter/_upgrade?pretty&human'
       "size_in_bytes": "21000000000",
       "size_to_upgrade": "10gb",
       "size_to_upgrade_in_bytes": "10000000000"
+      "size_to_upgrade_ancient": "1gb",
+      "size_to_upgrade_ancient_in_bytes": "1000000000"
    }
 }
 --------------------------------------------------
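
A short sketch (an illustration, not from the docs) of consuming these status
fields; the numeric `_in_bytes` values are passed through int() since the
example above renders them as strings:

    import json
    import urllib.request

    with urllib.request.urlopen('http://localhost:9200/twitter/_upgrade') as resp:
        status = json.load(resp)

    for index, s in status.items():
        total = int(s['size_in_bytes'])
        old = int(s['size_to_upgrade_in_bytes'])
        ancient = int(s['size_to_upgrade_ancient_in_bytes'])
        # ancient bytes are a subset of old bytes, which are a subset of total:
        print('%s: %d of %d bytes need upgrade, %d of those are ancient'
              % (index, old, total, ancient))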

+ 5 - 1
rest-api-spec/api/indices.upgrade.json

@@ -27,8 +27,12 @@
             "description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
         },
         "wait_for_completion": {
+           "type" : "boolean",
+           "description" : "Specify whether the request should block until the all segments are upgraded (default: false)"
+	},
+        "only_ancient_segments": {
           "type" : "boolean",
-          "description" : "Specify whether the request should block until the all segments are upgraded (default: false)"
+          "description" : "If true, only ancient (an older Lucene major release) segments will be upgraded"
         }
       }
     },

+ 21 - 0
src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java

@@ -44,12 +44,14 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
         public static final boolean ONLY_EXPUNGE_DELETES = false;
         public static final boolean FLUSH = true;
         public static final boolean UPGRADE = false;
+        public static final boolean UPGRADE_ONLY_ANCIENT_SEGMENTS = false;
     }
     
     private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
     private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
     private boolean flush = Defaults.FLUSH;
     private boolean upgrade = Defaults.UPGRADE;
+    private boolean upgradeOnlyAncientSegments = Defaults.UPGRADE_ONLY_ANCIENT_SEGMENTS;
 
     /**
      * Constructs an optimization request over one or more indices.
@@ -136,6 +138,7 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
         onlyExpungeDeletes = in.readBoolean();
         flush = in.readBoolean();
         upgrade = in.readBoolean();
+        upgradeOnlyAncientSegments = in.readBoolean();
     }
 
     @Override
@@ -145,6 +148,23 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
         out.writeBoolean(onlyExpungeDeletes);
         out.writeBoolean(flush);
         out.writeBoolean(upgrade);
+        out.writeBoolean(upgradeOnlyAncientSegments);
+    }
+
+    /**
+     * Should the merge upgrade only the ancient (older major version of Lucene) segments?
+     * Defaults to <tt>false</tt>.
+     */
+    public boolean upgradeOnlyAncientSegments() {
+        return upgradeOnlyAncientSegments;
+    }
+
+    /**
+     * See {@link #upgradeOnlyAncientSegments()}
+     */
+    public OptimizeRequest upgradeOnlyAncientSegments(boolean upgradeOnlyAncientSegments) {
+        this.upgradeOnlyAncientSegments = upgradeOnlyAncientSegments;
+        return this;
     }
 
     @Override
@@ -154,6 +174,7 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
                 ", onlyExpungeDeletes=" + onlyExpungeDeletes +
                 ", flush=" + flush +
                 ", upgrade=" + upgrade +
+                ", upgradeOnlyAncientSegments=" + upgradeOnlyAncientSegments +
                 '}';
     }
 }

+ 2 - 2
src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -430,13 +430,13 @@ public abstract class Engine implements Closeable {
      * Optimizes to 1 segment
      */
     public void forceMerge(boolean flush) {
-        forceMerge(flush, 1, false, false);
+        forceMerge(flush, 1, false, false, false);
     }
 
     /**
      * Triggers a forced merge on this engine
      */
-    public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
+    public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException;
 
     /**
      * Snapshots the index and returns a handle to it. Will always try and "commit" the

+ 6 - 5
src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -615,7 +615,7 @@ public class InternalEngine extends Engine {
             }
         }
         /*
-         * Unfortunately the lock order is important here. We have to acquire the readlock fist otherwise
+         * Unfortunately the lock order is important here. We have to acquire the readlock first otherwise
          * if we are flushing at the end of the recovery while holding the write lock we can deadlock if:
          *  Thread 1: flushes via API and gets the flush lock but blocks on the readlock since Thread 2 has the writeLock
          *  Thread 2: flushes at the end of the recovery holding the writeLock and blocks on the flushLock owned by Thread 1
@@ -742,7 +742,8 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade) throws EngineException {
+    public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
+                           final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException {
         /*
          * We do NOT acquire the readlock here since we are waiting on the merges to finish
          * that's fine since the IW.rollback should stop all the threads and trigger an IOException
@@ -760,8 +761,8 @@ public class InternalEngine extends Engine {
         try {
             ensureOpen();
             if (upgrade) {
-                logger.info("starting segment upgrade");
-                mp.setUpgradeInProgress(true);
+                logger.info("starting segment upgrade upgradeOnlyAncientSegments={}", upgradeOnlyAncientSegments);
+                mp.setUpgradeInProgress(true, upgradeOnlyAncientSegments);
             }
             store.incRef(); // increment the ref just to ensure nobody closes the store while we optimize
             try {
@@ -789,7 +790,7 @@ public class InternalEngine extends Engine {
             throw ex;
         } finally {
             try {
-                mp.setUpgradeInProgress(false); // reset it just to make sure we reset it in a case of an error
+                mp.setUpgradeInProgress(false, false); // reset it just to make sure we reset it in a case of an error
             } finally {
                 optimizeLock.unlock();
             }

+ 1 - 1
src/main/java/org/elasticsearch/index/engine/ShadowEngine.java

@@ -147,7 +147,7 @@ public class ShadowEngine extends Engine {
     }
 
     @Override
-    public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException {
+    public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException {
         // no-op
         logger.trace("skipping FORCE-MERGE on shadow engine");
     }

+ 42 - 7
src/main/java/org/elasticsearch/index/merge/policy/ElasticsearchMergePolicy.java

@@ -48,7 +48,13 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
     private static ESLogger logger = Loggers.getLogger(ElasticsearchMergePolicy.class);
 
     private final MergePolicy delegate;
+
+    // True if the next merge request should do segment upgrades:
     private volatile boolean upgradeInProgress;
+
+    // True if the next merge request should only upgrade ancient (an older Lucene major version than current) segments:
+    private volatile boolean upgradeOnlyAncientSegments;
+
     private static final int MAX_CONCURRENT_UPGRADE_MERGES = 5;
 
     /** @param delegate the merge policy to wrap */
@@ -113,6 +119,26 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
         return upgradedMergeSpecification(delegate.findMerges(mergeTrigger, segmentInfos, writer));
     }
 
+    private boolean shouldUpgrade(SegmentCommitInfo info) {
+        org.apache.lucene.util.Version old = info.info.getVersion();
+        org.apache.lucene.util.Version cur = Version.CURRENT.luceneVersion;
+
+        // Something seriously wrong if this trips:
+        assert old.major <= cur.major;
+
+        if (cur.major > old.major) {
+            // Always upgrade segment if Lucene's major version is too old
+            return true;
+        }
+        if (upgradeOnlyAncientSegments == false && cur.minor > old.minor) {
+            // If it's only a minor version difference, and we are not upgrading only ancient segments,
+            // also upgrade:
+            return true;
+        }
+        // Version matches, or segment is not ancient and we are only upgrading ancient segments:
+        return false;
+    }
+
     @Override
     public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
         int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
@@ -121,27 +147,35 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
         if (upgradeInProgress) {
             MergeSpecification spec = new IndexUpgraderMergeSpecification();
             for (SegmentCommitInfo info : segmentInfos) {
-                org.apache.lucene.util.Version old = info.info.getVersion();
-                org.apache.lucene.util.Version cur = Version.CURRENT.luceneVersion;
-                if (cur.major > old.major ||
-                    cur.major == old.major && cur.minor > old.minor) {
+
+                if (shouldUpgrade(info)) {
+
                     // TODO: Use IndexUpgradeMergePolicy instead.  We should be comparing codecs,
                     // for now we just assume every minor upgrade has a new format.
                     logger.debug("Adding segment " + info.info.name + " to be upgraded");
                     spec.add(new OneMerge(Lists.newArrayList(info)));
                 }
+
+                // TODO: we could check IndexWriter.getMergingSegments and avoid adding merges that IW will just reject?
+
                 if (spec.merges.size() == MAX_CONCURRENT_UPGRADE_MERGES) {
                     // hit our max upgrades, so return the spec.  we will get a cascaded call to continue.
                     logger.debug("Returning " + spec.merges.size() + " merges for upgrade");
                     return spec;
                 }
             }
+
             // We must have less than our max upgrade merges, so the next return will be our last in upgrading mode.
-            upgradeInProgress = false;
             if (spec.merges.isEmpty() == false) {
-                logger.debug("Return " + spec.merges.size() + " merges for end of upgrade");
+                logger.debug("Returning " + spec.merges.size() + " merges for end of upgrade");
                 return spec;
             }
+
+            // Only set this once there are 0 segments needing upgrading, because when we return a
+            // spec, IndexWriter may (silently!) reject that merge if some of the segments we asked
+            // to be merged were already being (naturally) merged:
+            upgradeInProgress = false;
+
             // fall through, so when we don't have any segments to upgrade, the delegate policy
             // has a chance to decide what to do (e.g. collapse the segments to satisfy maxSegmentCount)
         }
@@ -166,8 +200,9 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
      * {@link IndexWriter#forceMerge} that is handled by this {@link MergePolicy}, as well as
      * cascading calls made by {@link IndexWriter}.
      */
-    public void setUpgradeInProgress(boolean upgrade) {
+    public void setUpgradeInProgress(boolean upgrade, boolean onlyAncientSegments) {
         this.upgradeInProgress = upgrade;
+        this.upgradeOnlyAncientSegments = onlyAncientSegments;
     }
 
     @Override
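
The decision above, restated as a small Python sketch (versions modeled as
(major, minor) tuples; the names are illustrative):

    def should_upgrade(old, cur, upgrade_only_ancient_segments):
        # Something is seriously wrong if a segment is from a future major version:
        assert old[0] <= cur[0]
        if cur[0] > old[0]:
            return True   # ancient: always upgrade across a Lucene major version
        if not upgrade_only_ancient_segments and cur[1] > old[1]:
            return True   # merely old: upgrade unless restricted to ancient segments
        return False      # current, or old-but-not-ancient in ancient-only mode

    assert should_upgrade((3, 6), (4, 10), True)        # ancient segment
    assert not should_upgrade((4, 8), (4, 10), True)    # merely old, skipped
    assert should_upgrade((4, 8), (4, 10), False)       # merely old, upgraded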

+ 2 - 1
src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -702,7 +702,8 @@ public class IndexShard extends AbstractIndexShardComponent {
         if (logger.isTraceEnabled()) {
             logger.trace("optimize with {}", optimize);
         }
-        engine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(), optimize.upgrade());
+        engine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(),
+                            optimize.upgrade(), optimize.upgradeOnlyAncientSegments());
     }
 
     public SnapshotIndexCommit snapshotIndex() throws EngineException {

+ 16 - 8
src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java

@@ -38,6 +38,7 @@ import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.action.support.RestBuilderListener;
+import java.io.IOException;
 
 import static org.elasticsearch.rest.RestRequest.Method.GET;
 import static org.elasticsearch.rest.RestRequest.Method.POST;
@@ -73,12 +74,10 @@ public class RestUpgradeAction extends BaseRestHandler {
             public RestResponse buildResponse(IndicesSegmentResponse response, XContentBuilder builder) throws Exception {
                 builder.startObject();
                 
-                // TODO: getIndices().values() is what IndecesSegmentsResponse uses, but this will produce different orders with jdk8?
+                // TODO: getIndices().values() is what IndicesSegmentsResponse uses, but this will produce different orders with jdk8?
                 for (IndexSegments indexSegments : response.getIndices().values()) {
-                    Tuple<Long, Long> summary = calculateUpgradeStatus(indexSegments);
                     builder.startObject(indexSegments.getIndex());
-                    builder.byteSizeField(SIZE_IN_BYTES, SIZE, summary.v1());
-                    builder.byteSizeField(SIZE_TO_UPGRADE_IN_BYTES, SIZE_TO_UPGRADE, summary.v2());
+                    buildUpgradeStatus(indexSegments, builder);
                     builder.endObject();
                 }
                 
@@ -92,6 +91,7 @@ public class RestUpgradeAction extends BaseRestHandler {
         OptimizeRequest optimizeReq = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
         optimizeReq.flush(true);
         optimizeReq.upgrade(true);
+        optimizeReq.upgradeOnlyAncientSegments(request.paramAsBoolean("only_ancient_segments", false));
         optimizeReq.maxNumSegments(Integer.MAX_VALUE); // we just want to upgrade the segments, not actually optimize to a single segment
         client.admin().indices().optimize(optimizeReq, new RestBuilderListener<OptimizeResponse>(channel) {
             @Override
@@ -104,15 +104,18 @@ public class RestUpgradeAction extends BaseRestHandler {
         });
     }
     
-    Tuple<Long, Long> calculateUpgradeStatus(IndexSegments indexSegments) {
+    void buildUpgradeStatus(IndexSegments indexSegments, XContentBuilder builder) throws IOException {
         long total_bytes = 0;
         long to_upgrade_bytes = 0;
+        long to_upgrade_bytes_ancient = 0;
         for (IndexShardSegments shard : indexSegments) {
             for (ShardSegments segs : shard.getShards()) {
                 for (Segment seg : segs.getSegments()) {
                     total_bytes += seg.sizeInBytes;
-                    if (seg.version.major != Version.CURRENT.luceneVersion.major ||
-                        seg.version.minor != Version.CURRENT.luceneVersion.minor) {
+                    if (seg.version.major != Version.CURRENT.luceneVersion.major) {
+                        to_upgrade_bytes_ancient += seg.sizeInBytes;
+                        to_upgrade_bytes += seg.sizeInBytes;
+                    } else if (seg.version.minor != Version.CURRENT.luceneVersion.minor) {
                         // TODO: this comparison is bogus! it would cause us to upgrade even with the same format
                         // instead, we should check if the codec has changed
                         to_upgrade_bytes += seg.sizeInBytes;
@@ -120,11 +123,16 @@ public class RestUpgradeAction extends BaseRestHandler {
                 }
             }
         }
-        return new Tuple<>(total_bytes, to_upgrade_bytes);
+
+        builder.byteSizeField(SIZE_IN_BYTES, SIZE, total_bytes);
+        builder.byteSizeField(SIZE_TO_UPGRADE_IN_BYTES, SIZE_TO_UPGRADE, to_upgrade_bytes);
+        builder.byteSizeField(SIZE_TO_UPGRADE_ANCIENT_IN_BYTES, SIZE_TO_UPGRADE_ANCIENT, to_upgrade_bytes_ancient);
     }
 
     static final XContentBuilderString SIZE = new XContentBuilderString("size");
     static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes");
     static final XContentBuilderString SIZE_TO_UPGRADE = new XContentBuilderString("size_to_upgrade");
+    static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT = new XContentBuilderString("size_to_upgrade_ancient");
     static final XContentBuilderString SIZE_TO_UPGRADE_IN_BYTES = new XContentBuilderString("size_to_upgrade_in_bytes");
+    static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT_IN_BYTES = new XContentBuilderString("size_to_upgrade_ancient_in_bytes");
 }
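
A rough Python analogue (an illustration, not from this commit) of how
buildUpgradeStatus buckets segment sizes; segments are modeled as
(major, minor, size_in_bytes) tuples:

    def upgrade_status(segments, cur_major, cur_minor):
        total = to_upgrade = ancient = 0
        for major, minor, size in segments:
            total += size
            if major != cur_major:
                ancient += size   # ancient bytes also count toward to_upgrade
                to_upgrade += size
            elif minor != cur_minor:
                to_upgrade += size  # old format within the current major version
        return total, to_upgrade, ancient

    # Example: current Lucene is 4.10; one 3.6 segment and one 4.8 segment:
    assert upgrade_status([(3, 6, 100), (4, 8, 50), (4, 10, 25)], 4, 10) == (175, 150, 100)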

+ 4 - 2
src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java

@@ -113,9 +113,11 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
             String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
             if (buffer.readerIndex() != expectedIndexReader) {
                 if (buffer.readerIndex() < expectedIndexReader) {
-                    logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
+                    logger.warn("Message not fully read (request) for requestId [{}], action [{}], readerIndex [{}] vs expected [{}]; resetting",
+                                requestId, action, buffer.readerIndex(), expectedIndexReader);
                 } else {
-                    logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
+                    logger.warn("Message read past expected size (request) for requestId=[{}], action [{}], readerIndex [{}] vs expected [{}]; resetting",
+                                requestId, action, buffer.readerIndex(), expectedIndexReader);
                 }
                 buffer.readerIndex(expectedIndexReader);
             }

+ 6 - 6
src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -1034,13 +1034,13 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
         try (Engine.Searcher test = engine.acquireSearcher("test")) {
             assertEquals(numDocs, test.reader().numDocs());
         }
-        engine.forceMerge(true, 1, false, false);
+        engine.forceMerge(true, 1, false, false, false);
         assertEquals(engine.segments(true).size(), 1);
 
         ParsedDocument doc = testParsedDocument(Integer.toString(0), Integer.toString(0), "test", null, -1, -1, testDocument(), B_1, false);
         Engine.Index index = new Engine.Index(null, newUid(Integer.toString(0)), doc);
         engine.delete(new Engine.Delete(index.type(), index.id(), index.uid()));
-        engine.forceMerge(true, 10, true, false); //expunge deletes
+        engine.forceMerge(true, 10, true, false, false); //expunge deletes
 
         assertEquals(engine.segments(true).size(), 1);
         try (Engine.Searcher test = engine.acquireSearcher("test")) {
@@ -1051,7 +1051,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
         doc = testParsedDocument(Integer.toString(1), Integer.toString(1), "test", null, -1, -1, testDocument(), B_1, false);
         index = new Engine.Index(null, newUid(Integer.toString(1)), doc);
         engine.delete(new Engine.Delete(index.type(), index.id(), index.uid()));
-        engine.forceMerge(true, 10, false, false); //expunge deletes
+        engine.forceMerge(true, 10, false, false, false); //expunge deletes
 
         assertEquals(engine.segments(true).size(), 1);
         try (Engine.Searcher test = engine.acquireSearcher("test")) {
@@ -1089,7 +1089,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
                                 engine.refresh("test");
                                 indexed.countDown();
                                 try {
-                                    engine.forceMerge(randomBoolean(), 1, false, randomBoolean());
+                                    engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean());
                                 } catch (ForceMergeFailedEngineException ex) {
                                     // ok
                                     return;
@@ -1105,7 +1105,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
                 startGun.countDown();
                 int someIters = randomIntBetween(1, 10);
                 for (int i = 0; i < someIters; i++) {
-                    engine.forceMerge(randomBoolean(), 1, false, randomBoolean());
+                    engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean());
                 }
                 indexed.await();
                 IOUtils.close(engine, translog);
@@ -1711,7 +1711,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
             }
 
             // Force merge so we know all merges are done before we start deleting:
-            engine.forceMerge(true, 1, false, false);
+            engine.forceMerge(true, 1, false, false, false);
 
             Searcher s = engine.acquireSearcher("test");
             final long version1 = ((DirectoryReader) s.reader()).getVersion();

+ 52 - 12
src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java

@@ -137,32 +137,34 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest {
         logger.info("--> Nodes upgrade complete");
         logSegmentsState();
         
-        final HttpRequestBuilder httpClient = httpClient();
-
-        assertNotUpgraded(httpClient, null);
+        assertNotUpgraded(httpClient(), null);
         final String indexToUpgrade = "test" + randomInt(numIndexes - 1);
+
+        // This test fires up another node running an older version of ES, but because the wire protocol changes across major ES
+        // versions, we can never generate ancient segments in this test (unless Lucene's major version bumps but ES's does not):
+        assertFalse(hasAncientSegments(httpClient(), indexToUpgrade));
         
         logger.info("--> Running upgrade on index " + indexToUpgrade);
-        runUpgrade(httpClient, indexToUpgrade);
+        runUpgrade(httpClient(), indexToUpgrade);
         awaitBusy(new Predicate<Object>() {
             @Override
             public boolean apply(Object o) {
                 try {
-                    return isUpgraded(httpClient, indexToUpgrade);
+                    return isUpgraded(httpClient(), indexToUpgrade);
                 } catch (Exception e) {
                     throw ExceptionsHelper.convertToRuntime(e);
                 }
             }
         });
         logger.info("--> Single index upgrade complete");
-        
+
         logger.info("--> Running upgrade on the rest of the indexes");
-        runUpgrade(httpClient, null);
+        runUpgrade(httpClient(), null);
         logSegmentsState();
         logger.info("--> Full upgrade complete");
-        assertUpgraded(httpClient, null);
+        assertUpgraded(httpClient(), null);
     }
-    
+
     static String upgradePath(String index) {
         String path = "/_upgrade";
         if (index != null) {
@@ -182,6 +184,39 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest {
         }
     }
 
+    public static void assertNoAncientSegments(HttpRequestBuilder httpClient, String index) throws Exception {
+        for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) {
+            assertTrue("index " + status.indexName + " should not be zero sized", status.totalBytes != 0);
+            assertTrue("index " + status.indexName + " should not have any ancient segments",
+                       status.toUpgradeBytesAncient == 0);
+            // TODO: it would be better for this to be strictly greater, but sometimes an extra flush
+            // mysteriously happens after the second round of docs are indexed
+            assertTrue("index " + status.indexName + " should have recovered some segments from transaction log",
+                       status.totalBytes >= status.toUpgradeBytes);
+            assertTrue("index " + status.indexName + " should need upgrading", status.toUpgradeBytes != 0);
+        }
+    }
+
+    /** Returns true if there are any ancient segments. */
+    public static boolean hasAncientSegments(HttpRequestBuilder httpClient, String index) throws Exception {
+        for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) {
+            if (status.toUpgradeBytesAncient != 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Returns true if there are any old but not ancient segments. */
+    public static boolean hasOldButNotAncientSegments(HttpRequestBuilder httpClient, String index) throws Exception {
+        for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) {
+            if (status.toUpgradeBytes > status.toUpgradeBytesAncient) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public static void assertUpgraded(HttpRequestBuilder httpClient, String index) throws Exception {
         for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) {
             assertTrue("index " + status.indexName + " should not be zero sized", status.totalBytes != 0);
@@ -209,7 +244,7 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest {
             }
         }
     }
-    
+
     static boolean isUpgraded(HttpRequestBuilder httpClient, String index) throws Exception {
         ESLogger logger = Loggers.getLogger(UpgradeTest.class);
         int toUpgrade = 0;
@@ -224,11 +259,14 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest {
         public final String indexName;
         public final int totalBytes;
         public final int toUpgradeBytes;
+        public final int toUpgradeBytesAncient;
         
-        public UpgradeStatus(String indexName, int totalBytes, int toUpgradeBytes) {
+        public UpgradeStatus(String indexName, int totalBytes, int toUpgradeBytes, int toUpgradeBytesAncient) {
             this.indexName = indexName;
             this.totalBytes = totalBytes;
             this.toUpgradeBytes = toUpgradeBytes;
+            this.toUpgradeBytesAncient = toUpgradeBytesAncient;
+            assert toUpgradeBytesAncient <= toUpgradeBytes;
         }
     }
     
@@ -256,7 +294,9 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest {
             assertTrue("missing key size_to_upgrade_in_bytes for index " + index, status.containsKey("size_to_upgrade_in_bytes"));
             Object toUpgradeBytes = status.get("size_to_upgrade_in_bytes");
             assertTrue("size_to_upgrade_in_bytes for index " + index + " is not an integer", toUpgradeBytes instanceof Integer);
-            ret.add(new UpgradeStatus(index, (Integer)totalBytes, (Integer)toUpgradeBytes));
+            Object toUpgradeBytesAncient = status.get("size_to_upgrade_ancient_in_bytes");
+            assertTrue("size_to_upgrade_ancient_in_bytes for index " + index + " is not an integer", toUpgradeBytesAncient instanceof Integer);
+            ret.add(new UpgradeStatus(index, (Integer) totalBytes, (Integer) toUpgradeBytes, (Integer) toUpgradeBytesAncient));
         }
         return ret;
     }

+ 1 - 1
src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java

@@ -1003,7 +1003,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
                 .health(Requests.clusterHealthRequest(indices).timeout(timeout).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
         if (actionGet.isTimedOut()) {
             logger.info("ensureGreen timed out, cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
-            assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));
+            fail("timed out waiting for green state");
         }
         assertThat(actionGet.getStatus(), equalTo(ClusterHealthStatus.GREEN));
         logger.debug("indices {} are green", indices.length == 0 ? "[_all]" : indices);