DataStreamAlias.java 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. /*
  2. * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
  3. * or more contributor license agreements. Licensed under the Elastic License
  4. * 2.0 and the Server Side Public License, v 1; you may not use this file except
  5. * in compliance with, at your election, the Elastic License 2.0 or the Server
  6. * Side Public License, v 1.
  7. */
  8. package org.elasticsearch.cluster.metadata;
  9. import org.elasticsearch.Version;
  10. import org.elasticsearch.cluster.AbstractDiffable;
  11. import org.elasticsearch.cluster.Diff;
  12. import org.elasticsearch.common.bytes.BytesReference;
  13. import org.elasticsearch.common.compress.CompressedXContent;
  14. import org.elasticsearch.common.xcontent.ObjectParser;
  15. import org.elasticsearch.common.xcontent.ParseField;
  16. import org.elasticsearch.common.ParsingException;
  17. import org.elasticsearch.common.io.stream.StreamInput;
  18. import org.elasticsearch.common.io.stream.StreamOutput;
  19. import org.elasticsearch.common.xcontent.ConstructingObjectParser;
  20. import org.elasticsearch.common.xcontent.ToXContentFragment;
  21. import org.elasticsearch.common.xcontent.XContentBuilder;
  22. import org.elasticsearch.common.xcontent.XContentFactory;
  23. import org.elasticsearch.common.xcontent.XContentHelper;
  24. import org.elasticsearch.common.xcontent.XContentParser;
  25. import java.io.IOException;
  26. import java.io.UncheckedIOException;
  27. import java.util.HashSet;
  28. import java.util.List;
  29. import java.util.Map;
  30. import java.util.Objects;
  31. import java.util.Set;
  32. import java.util.function.Predicate;
  33. import java.util.stream.Collectors;
  34. public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implements ToXContentFragment {
  35. public static final ParseField DATA_STREAMS_FIELD = new ParseField("data_streams");
  36. public static final ParseField WRITE_DATA_STREAM_FIELD = new ParseField("write_data_stream");
  37. public static final ParseField FILTER_FIELD = new ParseField("filter");
  38. @SuppressWarnings("unchecked")
  39. private static final ConstructingObjectParser<DataStreamAlias, String> PARSER = new ConstructingObjectParser<>(
  40. "data_stream_alias",
  41. false,
  42. (args, name) -> new DataStreamAlias(name, (List<String>) args[0], (String) args[1], (CompressedXContent) args[2])
  43. );
  44. static {
  45. PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), DATA_STREAMS_FIELD);
  46. PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), WRITE_DATA_STREAM_FIELD);
  47. PARSER.declareField(
  48. ConstructingObjectParser.optionalConstructorArg(),
  49. (p, c) -> {
  50. if (p.currentToken() == XContentParser.Token.VALUE_EMBEDDED_OBJECT ||
  51. p.currentToken() == XContentParser.Token.VALUE_STRING) {
  52. return new CompressedXContent(p.binaryValue());
  53. } else if (p.currentToken() == XContentParser.Token.START_OBJECT) {
  54. XContentBuilder builder = XContentFactory.jsonBuilder().map(p.mapOrdered());
  55. return new CompressedXContent(BytesReference.bytes(builder));
  56. } else {
  57. assert false : "unexpected token [" + p.currentToken() + " ]";
  58. return null;
  59. }
  60. },
  61. FILTER_FIELD,
  62. ObjectParser.ValueType.VALUE_OBJECT_ARRAY
  63. );
  64. }
  65. private final String name;
  66. private final List<String> dataStreams;
  67. private final String writeDataStream;
  68. private final CompressedXContent filter;
  69. private DataStreamAlias(String name, List<String> dataStreams, String writeDataStream, CompressedXContent filter) {
  70. this.name = Objects.requireNonNull(name);
  71. this.dataStreams = List.copyOf(dataStreams);
  72. this.writeDataStream = writeDataStream;
  73. this.filter = filter;
  74. assert writeDataStream == null || dataStreams.contains(writeDataStream);
  75. }
  76. public DataStreamAlias(String name, List<String> dataStreams, String writeDataStream, Map<String, Object> filter) {
  77. this(name, dataStreams, writeDataStream, compress(filter));
  78. }
  79. private static CompressedXContent compress(Map<String, Object> filterAsMap) {
  80. if (filterAsMap == null) {
  81. return null;
  82. }
  83. try {
  84. XContentBuilder builder = XContentFactory.jsonBuilder().map(filterAsMap);
  85. return new CompressedXContent(BytesReference.bytes(builder));
  86. } catch (IOException e) {
  87. throw new UncheckedIOException(e);
  88. }
  89. }
  90. private static Map<String, Object> decompress(CompressedXContent filter) {
  91. String filterAsString = filter.string();
  92. return XContentHelper.convertToMap(XContentFactory.xContent(filterAsString), filterAsString, true);
  93. }
  94. public DataStreamAlias(StreamInput in) throws IOException {
  95. this.name = in.readString();
  96. this.dataStreams = in.readStringList();
  97. this.writeDataStream = in.readOptionalString();
  98. this.filter = in.getVersion().onOrAfter(Version.V_7_15_0) && in.readBoolean() ? CompressedXContent.readCompressedString(in) : null;
  99. }
  100. /**
  101. * Returns the name of this data stream alias.
  102. */
  103. public String getName() {
  104. return name;
  105. }
  106. /**
  107. * Returns the data streams that are referenced
  108. */
  109. public List<String> getDataStreams() {
  110. return dataStreams;
  111. }
  112. /**
  113. * Returns the write data stream this data stream alias is referring to.
  114. * Write requests targeting this instance will resolve the write index
  115. * of the write data stream this alias is referring to.
  116. *
  117. * Note that the write data stream is also included in {@link #getDataStreams()}.
  118. */
  119. public String getWriteDataStream() {
  120. return writeDataStream;
  121. }
  122. public CompressedXContent getFilter() {
  123. return filter;
  124. }
  125. /**
  126. * Returns a new {@link DataStreamAlias} instance with the provided data stream name added to it as a new member.
  127. * If the provided isWriteDataStream is set to <code>true</code> then the provided data stream is also set as write data stream.
  128. * If the provided isWriteDataStream is set to <code>false</code> and the provided data stream is also the write data stream of
  129. * this instance then the returned data stream alias instance's write data stream is unset.
  130. * If the provided filter is the same as the filter of this alias then this instance isn't updated, otherwise it is updated.
  131. *
  132. * The same instance is returned if the attempted addition of the provided data stream didn't change this instance.
  133. */
  134. public DataStreamAlias update(String dataStream, Boolean isWriteDataStream, Map<String, Object> filterAsMap) {
  135. String writeDataStream = this.writeDataStream;
  136. if (isWriteDataStream != null) {
  137. if (isWriteDataStream) {
  138. writeDataStream = dataStream;
  139. } else {
  140. if (dataStream.equals(writeDataStream)) {
  141. writeDataStream = null;
  142. }
  143. }
  144. }
  145. boolean filterUpdated;
  146. CompressedXContent filter;
  147. if (filterAsMap != null) {
  148. filter = compress(filterAsMap);
  149. if (this.filter == null) {
  150. filterUpdated = true;
  151. } else {
  152. filterUpdated = filterAsMap.equals(decompress(this.filter)) == false;
  153. }
  154. } else {
  155. filter = this.filter;
  156. filterUpdated = false;
  157. }
  158. Set<String> dataStreams = new HashSet<>(this.dataStreams);
  159. boolean added = dataStreams.add(dataStream);
  160. if (added || Objects.equals(this.writeDataStream, writeDataStream) == false || filterUpdated) {
  161. return new DataStreamAlias(name, List.copyOf(dataStreams), writeDataStream, filter);
  162. } else {
  163. return this;
  164. }
  165. }
  166. /**
  167. * Returns a {@link DataStreamAlias} instance based on this instance but with the specified data stream no longer referenced.
  168. * Returns <code>null</code> if because of the removal of the provided data stream name a new instance wouldn't reference to
  169. * any data stream. The same instance is returned if the attempted removal of the provided data stream didn't change this instance.
  170. */
  171. public DataStreamAlias removeDataStream(String dataStream) {
  172. Set<String> dataStreams = new HashSet<>(this.dataStreams);
  173. boolean removed = dataStreams.remove(dataStream);
  174. if (removed == false) {
  175. return this;
  176. }
  177. if (dataStreams.isEmpty()) {
  178. return null;
  179. } else {
  180. String writeDataStream = this.writeDataStream;
  181. if (dataStream.equals(writeDataStream)) {
  182. writeDataStream = null;
  183. }
  184. return new DataStreamAlias(name, List.copyOf(dataStreams), writeDataStream, filter);
  185. }
  186. }
  187. /**
  188. * Returns a new {@link DataStreamAlias} instance that contains a new intersection
  189. * of data streams from this instance and the provided filter.
  190. *
  191. * The write data stream gets set to null in the returned instance if the write
  192. * data stream no longer appears in the intersection.
  193. */
  194. public DataStreamAlias intersect(Predicate<String> filter) {
  195. List<String> intersectingDataStreams = this.dataStreams.stream()
  196. .filter(filter)
  197. .collect(Collectors.toList());
  198. String writeDataStream = this.writeDataStream;
  199. if (intersectingDataStreams.contains(writeDataStream) == false) {
  200. writeDataStream = null;
  201. }
  202. return new DataStreamAlias(this.name, intersectingDataStreams, writeDataStream, this.filter);
  203. }
  204. /**
  205. * Returns a new {@link DataStreamAlias} instance containing data streams referenced in this instance
  206. * and the other instance. If this instance doesn't have a write data stream then the write index of
  207. * the other data stream becomes the write data stream of the returned instance.
  208. */
  209. public DataStreamAlias merge(DataStreamAlias other) {
  210. Set<String> mergedDataStreams = new HashSet<>(other.getDataStreams());
  211. mergedDataStreams.addAll(this.getDataStreams());
  212. String writeDataStream = this.writeDataStream;
  213. if (writeDataStream == null) {
  214. if (other.getWriteDataStream() != null && mergedDataStreams.contains(other.getWriteDataStream())) {
  215. writeDataStream = other.getWriteDataStream();
  216. }
  217. }
  218. return new DataStreamAlias(this.name, List.copyOf(mergedDataStreams), writeDataStream, filter);
  219. }
  220. /**
  221. * Returns a new instance with potentially renamed data stream names and write data stream name.
  222. * If a data stream name matches with the provided rename pattern then it is renamed according
  223. * to the provided rename replacement.
  224. */
  225. public DataStreamAlias renameDataStreams(String renamePattern, String renameReplacement) {
  226. List<String> renamedDataStreams = this.dataStreams.stream()
  227. .map(s -> s.replaceAll(renamePattern, renameReplacement))
  228. .collect(Collectors.toList());
  229. String writeDataStream = this.writeDataStream;
  230. if (writeDataStream != null) {
  231. writeDataStream = writeDataStream.replaceAll(renamePattern, renameReplacement);
  232. }
  233. return new DataStreamAlias(this.name, renamedDataStreams, writeDataStream, filter);
  234. }
  235. public static Diff<DataStreamAlias> readDiffFrom(StreamInput in) throws IOException {
  236. return readDiffFrom(DataStreamAlias::new, in);
  237. }
  238. public static DataStreamAlias fromXContent(XContentParser parser) throws IOException {
  239. XContentParser.Token token = parser.currentToken();
  240. if (token != XContentParser.Token.FIELD_NAME) {
  241. throw new ParsingException(parser.getTokenLocation(), "unexpected token");
  242. }
  243. String name = parser.currentName();
  244. return PARSER.parse(parser, name);
  245. }
  246. @Override
  247. public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
  248. builder.startObject(name);
  249. builder.field(DATA_STREAMS_FIELD.getPreferredName(), dataStreams);
  250. if (writeDataStream != null) {
  251. builder.field(WRITE_DATA_STREAM_FIELD.getPreferredName(), writeDataStream);
  252. }
  253. if (filter != null) {
  254. boolean binary = params.paramAsBoolean("binary", false);
  255. if (binary) {
  256. builder.field("filter", filter.compressed());
  257. } else {
  258. builder.field("filter", XContentHelper.convertToMap(filter.uncompressed(), true).v2());
  259. }
  260. }
  261. builder.endObject();
  262. return builder;
  263. }
  264. @Override
  265. public void writeTo(StreamOutput out) throws IOException {
  266. out.writeString(name);
  267. out.writeStringCollection(dataStreams);
  268. out.writeOptionalString(writeDataStream);
  269. if (out.getVersion().onOrAfter(Version.V_7_15_0)) {
  270. if (filter != null) {
  271. out.writeBoolean(true);
  272. filter.writeTo(out);
  273. } else {
  274. out.writeBoolean(false);
  275. }
  276. }
  277. }
  278. @Override
  279. public boolean equals(Object o) {
  280. if (this == o) return true;
  281. if (o == null || getClass() != o.getClass()) return false;
  282. DataStreamAlias that = (DataStreamAlias) o;
  283. return Objects.equals(name, that.name) &&
  284. Objects.equals(dataStreams, that.dataStreams) &&
  285. Objects.equals(writeDataStream, that.writeDataStream) &&
  286. Objects.equals(filter, that.filter);
  287. }
  288. @Override
  289. public int hashCode() {
  290. return Objects.hash(name, dataStreams, writeDataStream, filter);
  291. }
  292. @Override
  293. public String toString() {
  294. return "DataStreamAlias{" +
  295. "name='" + name + '\'' +
  296. ", dataStreams=" + dataStreams +
  297. ", writeDataStream='" + writeDataStream + '\'' +
  298. ", filter=" + filter.string() +
  299. '}';
  300. }
  301. }