|
@@ -23,10 +23,16 @@ import org.elasticsearch.ExceptionsHelper;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
import org.elasticsearch.common.io.stream.Writeable;
|
|
|
+import org.elasticsearch.common.logging.DeprecationLogger;
|
|
|
import org.elasticsearch.common.logging.ESLoggerFactory;
|
|
|
import org.elasticsearch.common.settings.Setting;
|
|
|
import org.elasticsearch.common.settings.Setting.Property;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
+import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
+import org.elasticsearch.http.HttpTransportSettings;
|
|
|
+
|
|
|
+import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
|
|
|
+import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE;
|
|
|
|
|
|
import java.io.Closeable;
|
|
|
import java.io.IOException;
|
|
@@ -39,13 +45,14 @@ import java.util.Optional;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.CancellationException;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
-import java.util.concurrent.FutureTask;
|
|
|
import java.util.concurrent.RunnableFuture;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.function.Supplier;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with
|
|
@@ -81,6 +88,8 @@ public final class ThreadContext implements Closeable, Writeable {
|
|
|
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct();
|
|
|
private final Map<String, String> defaultHeader;
|
|
|
private final ContextThreadLocal threadLocal;
|
|
|
+ private final int maxWarningHeaderCount;
|
|
|
+ private final long maxWarningHeaderSize;
|
|
|
|
|
|
/**
|
|
|
* Creates a new ThreadContext instance
|
|
@@ -98,6 +107,8 @@ public final class ThreadContext implements Closeable, Writeable {
|
|
|
this.defaultHeader = Collections.unmodifiableMap(defaultHeader);
|
|
|
}
|
|
|
threadLocal = new ContextThreadLocal();
|
|
|
+ this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
|
|
|
+ this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -282,7 +293,7 @@ public final class ThreadContext implements Closeable, Writeable {
|
|
|
* @param uniqueValue the function that produces de-duplication values
|
|
|
*/
|
|
|
public void addResponseHeader(final String key, final String value, final Function<String, String> uniqueValue) {
|
|
|
- threadLocal.set(threadLocal.get().putResponse(key, value, uniqueValue));
|
|
|
+ threadLocal.set(threadLocal.get().putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -359,7 +370,7 @@ public final class ThreadContext implements Closeable, Writeable {
|
|
|
private final Map<String, Object> transientHeaders;
|
|
|
private final Map<String, List<String>> responseHeaders;
|
|
|
private final boolean isSystemContext;
|
|
|
-
|
|
|
+ private long warningHeadersSize; //saving current warning headers' size not to recalculate the size with every new warning header
|
|
|
private ThreadContextStruct(StreamInput in) throws IOException {
|
|
|
final int numRequest = in.readVInt();
|
|
|
Map<String, String> requestHeaders = numRequest == 0 ? Collections.emptyMap() : new HashMap<>(numRequest);
|
|
@@ -371,6 +382,7 @@ public final class ThreadContext implements Closeable, Writeable {
|
|
|
this.responseHeaders = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
|
|
|
this.transientHeaders = Collections.emptyMap();
|
|
|
isSystemContext = false; // we never serialize this it's a transient flag
|
|
|
+ this.warningHeadersSize = 0L;
|
|
|
}
|
|
|
|
|
|
private ThreadContextStruct setSystemContext() {
|
|
@@ -387,6 +399,18 @@ public final class ThreadContext implements Closeable, Writeable {
|
|
|
this.responseHeaders = responseHeaders;
|
|
|
this.transientHeaders = transientHeaders;
|
|
|
this.isSystemContext = isSystemContext;
|
|
|
+ this.warningHeadersSize = 0L;
|
|
|
+ }
|
|
|
+
|
|
|
+ private ThreadContextStruct(Map<String, String> requestHeaders,
|
|
|
+ Map<String, List<String>> responseHeaders,
|
|
|
+ Map<String, Object> transientHeaders, boolean isSystemContext,
|
|
|
+ long warningHeadersSize) {
|
|
|
+ this.requestHeaders = requestHeaders;
|
|
|
+ this.responseHeaders = responseHeaders;
|
|
|
+ this.transientHeaders = transientHeaders;
|
|
|
+ this.isSystemContext = isSystemContext;
|
|
|
+ this.warningHeadersSize = warningHeadersSize;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -440,30 +464,58 @@ public final class ThreadContext implements Closeable, Writeable {
|
|
|
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext);
|
|
|
}
|
|
|
|
|
|
- private ThreadContextStruct putResponse(final String key, final String value, final Function<String, String> uniqueValue) {
|
|
|
+ private ThreadContextStruct putResponse(final String key, final String value, final Function<String, String> uniqueValue,
|
|
|
+ final int maxWarningHeaderCount, final long maxWarningHeaderSize) {
|
|
|
assert value != null;
|
|
|
+ long newWarningHeaderSize = warningHeadersSize;
|
|
|
+ //check if we can add another warning header - if max size within limits
|
|
|
+ if (key.equals("Warning") && (maxWarningHeaderSize != -1)) { //if size is NOT unbounded, check its limits
|
|
|
+ if (warningHeadersSize > maxWarningHeaderSize) { // if max size has already been reached before
|
|
|
+ final String message = "Dropping a warning header, as their total size reached the maximum allowed of [" +
|
|
|
+ maxWarningHeaderSize + "] bytes set in [" +
|
|
|
+ HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE.getKey() + "]!";
|
|
|
+ ESLoggerFactory.getLogger(ThreadContext.class).warn(message);
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+ newWarningHeaderSize += "Warning".getBytes(StandardCharsets.UTF_8).length + value.getBytes(StandardCharsets.UTF_8).length;
|
|
|
+ if (newWarningHeaderSize > maxWarningHeaderSize) {
|
|
|
+ final String message = "Dropping a warning header, as their total size reached the maximum allowed of [" +
|
|
|
+ maxWarningHeaderSize + "] bytes set in [" +
|
|
|
+ HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE.getKey() + "]!";
|
|
|
+ ESLoggerFactory.getLogger(ThreadContext.class).warn(message);
|
|
|
+ return new ThreadContextStruct(requestHeaders, responseHeaders, transientHeaders, isSystemContext, newWarningHeaderSize);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
final Map<String, List<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
|
|
|
final List<String> existingValues = newResponseHeaders.get(key);
|
|
|
-
|
|
|
if (existingValues != null) {
|
|
|
final Set<String> existingUniqueValues = existingValues.stream().map(uniqueValue).collect(Collectors.toSet());
|
|
|
assert existingValues.size() == existingUniqueValues.size();
|
|
|
if (existingUniqueValues.contains(uniqueValue.apply(value))) {
|
|
|
return this;
|
|
|
}
|
|
|
-
|
|
|
final List<String> newValues = new ArrayList<>(existingValues);
|
|
|
newValues.add(value);
|
|
|
-
|
|
|
newResponseHeaders.put(key, Collections.unmodifiableList(newValues));
|
|
|
} else {
|
|
|
newResponseHeaders.put(key, Collections.singletonList(value));
|
|
|
}
|
|
|
|
|
|
- return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext);
|
|
|
+ //check if we can add another warning header - if max count within limits
|
|
|
+ if ((key.equals("Warning")) && (maxWarningHeaderCount != -1)) { //if count is NOT unbounded, check its limits
|
|
|
+ final int warningHeaderCount = newResponseHeaders.containsKey("Warning") ? newResponseHeaders.get("Warning").size() : 0;
|
|
|
+ if (warningHeaderCount > maxWarningHeaderCount) {
|
|
|
+ final String message = "Dropping a warning header, as their total count reached the maximum allowed of [" +
|
|
|
+ maxWarningHeaderCount + "] set in [" + HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT.getKey() + "]!";
|
|
|
+ ESLoggerFactory.getLogger(ThreadContext.class).warn(message);
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext, newWarningHeaderSize);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
private ThreadContextStruct putTransient(String key, Object value) {
|
|
|
Map<String, Object> newTransient = new HashMap<>(this.transientHeaders);
|
|
|
if (newTransient.putIfAbsent(key, value) != null) {
|