浏览代码

Adapt low-level REST client to java 8 (#41537)

As a follow-up to #38540 we can use lambda functions and method
references where convenient in the low-level REST client.

Also, we need to update the docs to state that the minimum java version
required is 1.8.
Luca Cavanna 6 年之前
父节点
当前提交
ee51702e78

+ 10 - 27
client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.client;
 
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 /**
  * Holds the state of a dead connection to a host. Keeps track of how many failed attempts were performed and
@@ -30,10 +31,11 @@ final class DeadHostState implements Comparable<DeadHostState> {
 
     private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
     static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);
+    static final Supplier<Long> DEFAULT_TIME_SUPPLIER = System::nanoTime;
 
     private final int failedAttempts;
     private final long deadUntilNanos;
-    private final TimeSupplier timeSupplier;
+    private final Supplier<Long> timeSupplier;
 
     /**
      * Build the initial dead state of a host. Useful when a working host stops functioning
@@ -41,9 +43,9 @@ final class DeadHostState implements Comparable<DeadHostState> {
      *
      * @param timeSupplier a way to supply the current time and allow for unit testing
      */
-    DeadHostState(TimeSupplier timeSupplier) {
+    DeadHostState(Supplier<Long> timeSupplier) {
         this.failedAttempts = 1;
-        this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
+        this.deadUntilNanos = timeSupplier.get() + MIN_CONNECTION_TIMEOUT_NANOS;
         this.timeSupplier = timeSupplier;
     }
 
@@ -51,14 +53,14 @@ final class DeadHostState implements Comparable<DeadHostState> {
      * Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
      * it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
      * to retry that same host again. Minimum is 1 minute (for a node the only failed once created
-     * through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
+     * through {@link #DeadHostState(Supplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
      *
      * @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
      */
     DeadHostState(DeadHostState previousDeadHostState) {
         long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
                 MAX_CONNECTION_TIMEOUT_NANOS);
-        this.deadUntilNanos = previousDeadHostState.timeSupplier.nanoTime() + timeoutNanos;
+        this.deadUntilNanos = previousDeadHostState.timeSupplier.get() + timeoutNanos;
         this.failedAttempts = previousDeadHostState.failedAttempts + 1;
         this.timeSupplier = previousDeadHostState.timeSupplier;
     }
@@ -69,7 +71,7 @@ final class DeadHostState implements Comparable<DeadHostState> {
      * @return true if the host should be retried, false otherwise
      */
     boolean shallBeRetried() {
-        return timeSupplier.nanoTime() - deadUntilNanos > 0;
+        return timeSupplier.get() - deadUntilNanos > 0;
     }
 
     /**
@@ -87,8 +89,8 @@ final class DeadHostState implements Comparable<DeadHostState> {
     @Override
     public int compareTo(DeadHostState other) {
         if (timeSupplier != other.timeSupplier) {
-            throw new IllegalArgumentException("can't compare DeadHostStates with different clocks ["
-                    + timeSupplier + " != " + other.timeSupplier + "]");
+            throw new IllegalArgumentException("can't compare DeadHostStates holding different time suppliers as they may " +
+                "be based on different clocks");
         }
         return Long.compare(deadUntilNanos, other.deadUntilNanos);
     }
@@ -101,23 +103,4 @@ final class DeadHostState implements Comparable<DeadHostState> {
                 ", timeSupplier=" + timeSupplier +
                 '}';
     }
-
-    /**
-     * Time supplier that makes timing aspects pluggable to ease testing
-     */
-    interface TimeSupplier {
-        TimeSupplier DEFAULT = new TimeSupplier() {
-            @Override
-            public long nanoTime() {
-                return System.nanoTime();
-            }
-
-            @Override
-            public String toString() {
-                return "nanoTime";
-            }
-        };
-
-        long nanoTime();
-    }
 }

+ 0 - 1
client/rest/src/main/java/org/elasticsearch/client/Request.java

@@ -74,7 +74,6 @@ public final class Request {
      */
     public void addParameter(String name, String value) {
         Objects.requireNonNull(name, "url parameter name cannot be null");
-        // .putIfAbsent(name, value) except we are in Java 7 which doesn't have that.
         if (parameters.containsKey(name)) {
             throw new IllegalArgumentException("url parameter [" + name + "] has already been set to [" + parameters.get(name) + "]");
         } else {

+ 2 - 2
client/rest/src/main/java/org/elasticsearch/client/RequestOptions.java

@@ -19,8 +19,8 @@
 
 package org.elasticsearch.client;
 
-import org.apache.http.message.BasicHeader;
 import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
 import org.elasticsearch.client.HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory;
 
@@ -38,7 +38,7 @@ public final class RequestOptions {
      * Default request options.
      */
     public static final RequestOptions DEFAULT = new Builder(
-            Collections.<Header>emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT, null).build();
+            Collections.emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT, null).build();
 
     private final List<Header> headers;
     private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;

+ 11 - 27
client/rest/src/main/java/org/elasticsearch/client/RestClient.java

@@ -46,7 +46,6 @@ import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
 import org.apache.http.nio.client.methods.HttpAsyncMethods;
 import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
-import org.elasticsearch.client.DeadHostState.TimeSupplier;
 
 import javax.net.ssl.SSLHandshakeException;
 import java.io.Closeable;
@@ -72,6 +71,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
 
@@ -139,7 +139,11 @@ public class RestClient implements Closeable {
      * @see Node#Node(HttpHost)
      */
     public static RestClientBuilder builder(HttpHost... hosts) {
-        return new RestClientBuilder(hostsToNodes(hosts));
+        if (hosts == null || hosts.length == 0) {
+            throw new IllegalArgumentException("hosts must not be null nor empty");
+        }
+        List<Node> nodes = Arrays.stream(hosts).map(Node::new).collect(Collectors.toList());
+        return new RestClientBuilder(nodes);
     }
 
     /**
@@ -163,17 +167,6 @@ public class RestClient implements Closeable {
         this.blacklist.clear();
     }
 
-    private static List<Node> hostsToNodes(HttpHost[] hosts) {
-        if (hosts == null || hosts.length == 0) {
-            throw new IllegalArgumentException("hosts must not be null nor empty");
-        }
-        List<Node> nodes = new ArrayList<>(hosts.length);
-        for (HttpHost host : hosts) {
-            nodes.add(new Node(host));
-        }
-        return nodes;
-    }
-
     /**
      * Get the list of nodes that the client knows about. The list is
      * unmodifiable.
@@ -369,15 +362,11 @@ public class RestClient implements Closeable {
         List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
         for (Node node : nodeTuple.nodes) {
             DeadHostState deadness = blacklist.get(node.getHost());
-            if (deadness == null) {
-                livingNodes.add(node);
-                continue;
-            }
-            if (deadness.shallBeRetried()) {
+            if (deadness == null || deadness.shallBeRetried()) {
                 livingNodes.add(node);
-                continue;
+            } else {
+                deadNodes.add(new DeadNode(node, deadness));
             }
-            deadNodes.add(new DeadNode(node, deadness));
         }
 
         if (false == livingNodes.isEmpty()) {
@@ -415,12 +404,7 @@ public class RestClient implements Closeable {
              * to compare many things. This saves us a sort on the unfiltered
              * list.
              */
-            nodeSelector.select(new Iterable<Node>() {
-                @Override
-                public Iterator<Node> iterator() {
-                    return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
-                }
-            });
+            nodeSelector.select(() -> new DeadNodeIteratorAdapter(selectedDeadNodes.iterator()));
             if (false == selectedDeadNodes.isEmpty()) {
                 return singletonList(Collections.min(selectedDeadNodes).node);
             }
@@ -447,7 +431,7 @@ public class RestClient implements Closeable {
     private void onFailure(Node node) {
         while(true) {
             DeadHostState previousDeadHostState =
-                blacklist.putIfAbsent(node.getHost(), new DeadHostState(TimeSupplier.DEFAULT));
+                blacklist.putIfAbsent(node.getHost(), new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER));
             if (previousDeadHostState == null) {
                 if (logger.isDebugEnabled()) {
                     logger.debug("added [" + node + "] to blacklist");

+ 3 - 12
client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java

@@ -186,12 +186,8 @@ public final class RestClientBuilder {
         if (failureListener == null) {
             failureListener = new RestClient.FailureListener();
         }
-        CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
-            @Override
-            public CloseableHttpAsyncClient run() {
-                return createHttpClient();
-            }
-        });
+        CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
+            (PrivilegedAction<CloseableHttpAsyncClient>) this::createHttpClient);
         RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
                 pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
         httpClient.start();
@@ -218,12 +214,7 @@ public final class RestClientBuilder {
             }
 
             final HttpAsyncClientBuilder finalBuilder = httpClientBuilder;
-            return AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
-                @Override
-                public CloseableHttpAsyncClient run() {
-                    return finalBuilder.build();
-                }
-            });
+            return AccessController.doPrivileged((PrivilegedAction<CloseableHttpAsyncClient>) finalBuilder::build);
         } catch (NoSuchAlgorithmException e) {
             throw new IllegalStateException("could not create the default ssl context", e);
         }

+ 16 - 51
client/rest/src/test/java/org/elasticsearch/client/DeadHostStateTests.java

@@ -22,8 +22,6 @@ package org.elasticsearch.client;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.elasticsearch.client.DeadHostState.TimeSupplier;
-
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -38,14 +36,14 @@ public class DeadHostStateTests extends RestClientTestCase {
     private static long[] EXPECTED_TIMEOUTS_SECONDS = new long[]{60, 84, 120, 169, 240, 339, 480, 678, 960, 1357, 1800};
 
     public void testInitialDeadHostStateDefaultTimeSupplier() {
-        DeadHostState deadHostState = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
+        DeadHostState deadHostState = new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER);
         long currentTime = System.nanoTime();
         assertThat(deadHostState.getDeadUntilNanos(), greaterThanOrEqualTo(currentTime));
         assertThat(deadHostState.getFailedAttempts(), equalTo(1));
     }
 
     public void testDeadHostStateFromPreviousDefaultTimeSupplier() {
-        DeadHostState previous = new DeadHostState(DeadHostState.TimeSupplier.DEFAULT);
+        DeadHostState previous = new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER);
         int iters = randomIntBetween(5, 30);
         for (int i = 0; i < iters; i++) {
             DeadHostState deadHostState = new DeadHostState(previous);
@@ -58,10 +56,13 @@ public class DeadHostStateTests extends RestClientTestCase {
     public void testCompareToTimeSupplier() {
         int numObjects = randomIntBetween(EXPECTED_TIMEOUTS_SECONDS.length, 30);
         DeadHostState[] deadHostStates = new DeadHostState[numObjects];
+        final AtomicLong time = new AtomicLong(0);
         for (int i = 0; i < numObjects; i++) {
             if (i == 0) {
-                // this test requires a strictly increasing timer
-                deadHostStates[i] = new DeadHostState(new StrictMonotonicTimeSupplier());
+                // this test requires a strictly increasing timer. This ensures that even if we call this time supplier in a very tight
+                // loop we always notice time moving forward. This does not happen for real timer implementations
+                // (e.g. on Linux <code>clock_gettime</code> provides microsecond resolution).
+                deadHostStates[i] = new DeadHostState(time::incrementAndGet);
             } else {
                 deadHostStates[i] = new DeadHostState(deadHostStates[i - 1]);
             }
@@ -74,42 +75,39 @@ public class DeadHostStateTests extends RestClientTestCase {
 
     public void testCompareToDifferingTimeSupplier() {
         try {
-            new DeadHostState(TimeSupplier.DEFAULT).compareTo(
-                    new DeadHostState(new ConfigurableTimeSupplier()));
+            new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER).compareTo(
+                    new DeadHostState(() -> 0L));
             fail("expected failure");
         } catch (IllegalArgumentException e) {
-            assertEquals("can't compare DeadHostStates with different clocks [nanoTime != configured[0]]",
-                    e.getMessage());
+            assertEquals("can't compare DeadHostStates holding different time suppliers as they may " +
+            "be based on different clocks", e.getMessage());
         }
     }
 
     public void testShallBeRetried() {
-        ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier();
+        final AtomicLong time = new AtomicLong(0);
         DeadHostState deadHostState = null;
         for (int i = 0; i < EXPECTED_TIMEOUTS_SECONDS.length; i++) {
             long expectedTimeoutSecond = EXPECTED_TIMEOUTS_SECONDS[i];
-            timeSupplier.nanoTime = 0;
             if (i == 0) {
-                deadHostState = new DeadHostState(timeSupplier);
+                deadHostState = new DeadHostState(time::get);
             } else {
                 deadHostState = new DeadHostState(deadHostState);
             }
             for (int j = 0; j < expectedTimeoutSecond; j++) {
-                timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
+                time.addAndGet(TimeUnit.SECONDS.toNanos(1));
                 assertThat(deadHostState.shallBeRetried(), is(false));
             }
             int iters = randomIntBetween(5, 30);
             for (int j = 0; j < iters; j++) {
-                timeSupplier.nanoTime += TimeUnit.SECONDS.toNanos(1);
+                time.addAndGet(TimeUnit.SECONDS.toNanos(1));
                 assertThat(deadHostState.shallBeRetried(), is(true));
             }
         }
     }
 
     public void testDeadHostStateTimeouts() {
-        ConfigurableTimeSupplier zeroTimeSupplier = new ConfigurableTimeSupplier();
-        zeroTimeSupplier.nanoTime = 0L;
-        DeadHostState previous = new DeadHostState(zeroTimeSupplier);
+        DeadHostState previous = new DeadHostState(() -> 0L);
         for (long expectedTimeoutsSecond : EXPECTED_TIMEOUTS_SECONDS) {
             assertThat(TimeUnit.NANOSECONDS.toSeconds(previous.getDeadUntilNanos()), equalTo(expectedTimeoutsSecond));
             previous = new DeadHostState(previous);
@@ -123,37 +121,4 @@ public class DeadHostStateTests extends RestClientTestCase {
             previous = deadHostState;
         }
     }
-
-    static class ConfigurableTimeSupplier implements DeadHostState.TimeSupplier {
-        long nanoTime;
-
-        @Override
-        public long nanoTime() {
-            return nanoTime;
-        }
-
-        @Override
-        public String toString() {
-            return "configured[" + nanoTime + "]";
-        }
-    }
-
-    /**
-     * Simulates a monotonically strict increasing time (i.e. the value increases on every call to <code>#nanoTime()</code>). This ensures
-     * that even if we call this time supplier in a very tight loop we always notice time moving forward. This does not happen for real
-     * timer implementations (e.g. on Linux <code>clock_gettime</code> provides microsecond resolution).
-     */
-    static class StrictMonotonicTimeSupplier implements DeadHostState.TimeSupplier {
-        private final AtomicLong time = new AtomicLong(0);
-
-        @Override
-        public long nanoTime() {
-            return time.incrementAndGet();
-        }
-
-        @Override
-        public String toString() {
-            return "strict monotonic[" + time.get() + "]";
-        }
-    }
 }

+ 8 - 6
client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java

@@ -25,7 +25,6 @@ import org.apache.http.client.AuthCache;
 import org.apache.http.impl.auth.BasicScheme;
 import org.apache.http.impl.client.BasicAuthCache;
 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.elasticsearch.client.DeadHostStateTests.ConfigurableTimeSupplier;
 import org.elasticsearch.client.RestClient.NodeTuple;
 
 import java.io.IOException;
@@ -40,6 +39,8 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 import static java.util.Collections.singletonList;
 import static org.hamcrest.Matchers.instanceOf;
@@ -266,14 +267,15 @@ public class RestClientTests extends RestClientTestCase {
 
         // Mark all the nodes dead for a few test cases
         {
-            ConfigurableTimeSupplier timeSupplier = new ConfigurableTimeSupplier();
+            final AtomicLong time = new AtomicLong(0L);
+            Supplier<Long> timeSupplier = time::get;
             Map<HttpHost, DeadHostState> blacklist = new HashMap<>();
             blacklist.put(n1.getHost(), new DeadHostState(timeSupplier));
             blacklist.put(n2.getHost(), new DeadHostState(new DeadHostState(timeSupplier)));
             blacklist.put(n3.getHost(), new DeadHostState(new DeadHostState(new DeadHostState(timeSupplier))));
 
             /*
-             * case when fewer nodeTuple than blacklist, wont result in any IllegalCapacityException
+             * case when fewer nodeTuple than blacklist, won't result in any IllegalCapacityException
              */
             {
                 NodeTuple<List<Node>> fewerNodeTuple = new NodeTuple<>(Arrays.asList(n1, n2), null);
@@ -282,7 +284,7 @@ public class RestClientTests extends RestClientTestCase {
             }
 
             /*
-             * selectHosts will revive a single host if regardless of
+             * selectHosts will revive a single host regardless of
              * blacklist time. It'll revive the node that is closest
              * to being revived that the NodeSelector is ok with.
              */
@@ -304,7 +306,7 @@ public class RestClientTests extends RestClientTestCase {
              * Now lets wind the clock forward, past the timeout for one of
              * the dead nodes. We should return it.
              */
-            timeSupplier.nanoTime = new DeadHostState(timeSupplier).getDeadUntilNanos();
+            time.set(new DeadHostState(timeSupplier).getDeadUntilNanos());
             assertSelectLivingHosts(Arrays.asList(n1), nodeTuple, blacklist, NodeSelector.ANY);
 
             /*
@@ -318,7 +320,7 @@ public class RestClientTests extends RestClientTestCase {
              * blacklist timeouts then we function as though the nodes aren't
              * in the blacklist at all.
              */
-            timeSupplier.nanoTime += DeadHostState.MAX_CONNECTION_TIMEOUT_NANOS;
+            time.addAndGet(DeadHostState.MAX_CONNECTION_TIMEOUT_NANOS);
             assertSelectLivingHosts(Arrays.asList(n1, n2, n3), nodeTuple, blacklist, NodeSelector.ANY);
             assertSelectLivingHosts(Arrays.asList(n2, n3), nodeTuple, blacklist, not1);
         }

+ 1 - 1
docs/java-rest/low-level/usage.asciidoc

@@ -14,7 +14,7 @@ The javadoc for the low level REST client can be found at {rest-client-javadoc}/
 
 The low-level Java REST client is hosted on
 http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.elasticsearch.client%22[Maven
-Central]. The minimum Java version required is `1.7`.
+Central]. The minimum Java version required is `1.8`.
 
 The low-level REST client is subject to the same release cycle as
 Elasticsearch. Replace the version with the desired client version, first