Browse Source

Refactored TransportMessage context

Removed CHM in favour of an OpenHashMap and synchronized accessor/mutator methods. Also, the context is now lazily inititialied (just like we do with the headers)
uboness 11 years ago
parent
commit
221eafab59

+ 9 - 0
src/main/java/org/elasticsearch/common/collect/ImmutableOpenMap.java

@@ -189,6 +189,15 @@ public final class ImmutableOpenMap<KType, VType> implements Iterable<ObjectObje
         return EMPTY;
     }
 
+    /**
+     * @return  An immutable copy of the given map
+     */
+    public static <KType, VType> ImmutableOpenMap<KType, VType> copyOf(ObjectObjectMap<KType, VType> map) {
+        Builder<KType, VType> builder = builder();
+        builder.putAll(map);
+        return builder.build();
+    }
+
     public static <KType, VType> Builder<KType, VType> builder() {
         return new Builder<>();
     }

+ 76 - 20
src/main/java/org/elasticsearch/transport/TransportMessage.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.transport;
 
+import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
@@ -29,8 +31,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 /**
  *
@@ -38,14 +38,13 @@ import java.util.concurrent.ConcurrentMap;
 public abstract class TransportMessage<TM extends TransportMessage<TM>> implements Streamable {
 
     // a transient (not serialized with the request) key/value registry
-    private final ConcurrentMap<Object, Object> context;
+    private ObjectObjectOpenHashMap<Object, Object> context;
 
     private Map<String, Object> headers;
 
     private TransportAddress remoteAddress;
 
     protected TransportMessage() {
-        context = new ConcurrentHashMap<>();
     }
 
     protected TransportMessage(TM message) {
@@ -55,21 +54,9 @@ public abstract class TransportMessage<TM extends TransportMessage<TM>> implemen
         if (((TransportMessage<?>) message).headers != null) {
             this.headers = new HashMap<>(((TransportMessage<?>) message).headers);
         }
-        this.context = new ConcurrentHashMap<>(((TransportMessage<?>) message).context);
-    }
-
-    /**
-     * The request context enables attaching transient data with the request - data
-     * that is not serialized along with the request.
-     *
-     * There are many use cases such data is required, for example, when processing the
-     * request headers and building other constructs from them, one could "cache" the
-     * already built construct to avoid reprocessing the header over and over again.
-     *
-     * @return The request context
-     */
-    public ConcurrentMap<Object, Object> context() {
-        return context;
+        if (((TransportMessage<?>) message).context != null) {
+            this.context = new ObjectObjectOpenHashMap<>(((TransportMessage<?>) message).context);
+        }
     }
 
     public void remoteAddress(TransportAddress remoteAddress) {
@@ -102,6 +89,76 @@ public abstract class TransportMessage<TM extends TransportMessage<TM>> implemen
         return headers != null ? headers.keySet() : Collections.<String>emptySet();
     }
 
+    /**
+     * Attaches the given transient value to the request - this value will not be serialized
+     * along with the request.
+     *
+     * There are many use cases such data is required, for example, when processing the
+     * request headers and building other constructs from them, one could "cache" the
+     * already built construct to avoid reprocessing the header over and over again.
+     *
+     * @return  The previous value that was associated with the given key in the context, or
+     *          {@code null} if there was none.
+     */
+    @SuppressWarnings("unchecked")
+    public final synchronized <V> V putInContext(Object key, Object value) {
+        if (context == null) {
+            context = new ObjectObjectOpenHashMap<>(2);
+        }
+        return (V) context.put(key, value);
+    }
+
+    /**
+     * @return  The transient value that is associated with the given key in the request context
+     * @see #putInContext(Object, Object)
+     */
+    @SuppressWarnings("unchecked")
+    public final synchronized <V> V getFromContext(Object key) {
+        return context != null ? (V) context.get(key) : null;
+    }
+
+    /**
+     * @param defaultValue  The default value that should be returned for the given key, if no
+     *                      value is currently associated with it.
+     *
+     * @return  The transient value that is associated with the given key in the request context
+     *
+     * @see #putInContext(Object, Object)
+     */
+    @SuppressWarnings("unchecked")
+    public final synchronized <V> V getFromContext(Object key, V defaultValue) {
+        V value = getFromContext(key);
+        return value == null ? defaultValue : value;
+    }
+
+    /**
+     * Checks if the request context contains an entry with the given key
+     */
+    public final synchronized boolean hasInContext(Object key) {
+        return context != null && context.containsKey(key);
+    }
+
+    /**
+     * @return  The number of transient values attached in the request context.
+     */
+    public final synchronized int contextSize() {
+        return context != null ? context.size() : 0;
+    }
+
+    /**
+     * Checks if the request context is empty.
+     */
+    public final synchronized boolean isContextEmpty() {
+        return context == null || context.isEmpty();
+    }
+
+    /**
+     * @return  A safe immutable copy of the current context of this request.
+     */
+    public synchronized ImmutableOpenMap<Object, Object> getContext() {
+        return context != null ? ImmutableOpenMap.copyOf(context) : ImmutableOpenMap.of();
+    }
+
     @Override
     public void readFrom(StreamInput in) throws IOException {
         headers = in.readBoolean() ? in.readMap() : null;
@@ -116,5 +173,4 @@ public abstract class TransportMessage<TM extends TransportMessage<TM>> implemen
             out.writeMap(headers);
         }
     }
-
 }

+ 5 - 5
src/test/java/org/elasticsearch/transport/TransportMessageTests.java

@@ -34,11 +34,11 @@ import static org.hamcrest.Matchers.is;
 public class TransportMessageTests extends ElasticsearchTestCase {
 
     @Test
-    public void testTransientContext() throws Exception {
+    public void testSerialization() throws Exception {
         Message message = new Message();
         message.putHeader("key1", "value1");
         message.putHeader("key2", "value2");
-        message.context().put("key3", "value3");
+        message.putInContext("key3", "value3");
 
         BytesStreamOutput out = new BytesStreamOutput();
         out.setVersion(Version.CURRENT);
@@ -50,7 +50,7 @@ public class TransportMessageTests extends ElasticsearchTestCase {
         assertThat(message.getHeaders().size(), is(2));
         assertThat((String) message.getHeader("key1"), equalTo("value1"));
         assertThat((String) message.getHeader("key2"), equalTo("value2"));
-        assertThat(message.context().isEmpty(), is(true));
+        assertThat(message.isContextEmpty(), is(true));
     }
 
     @Test
@@ -58,14 +58,14 @@ public class TransportMessageTests extends ElasticsearchTestCase {
         Message m1 = new Message();
         m1.putHeader("key1", "value1");
         m1.putHeader("key2", "value2");
-        m1.context().put("key3", "value3");
+        m1.putInContext("key3", "value3");
 
         Message m2 = new Message(m1);
 
         assertThat(m2.getHeaders().size(), is(2));
         assertThat((String) m2.getHeader("key1"), equalTo("value1"));
         assertThat((String) m2.getHeader("key2"), equalTo("value2"));
-        assertThat((String) m2.context().get("key3"), equalTo("value3"));
+        assertThat((String) m2.getFromContext("key3"), equalTo("value3"));
     }
 
     private static class Message extends TransportMessage<Message> {