# create-bwc-index.py
# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on
# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
  16. import random
  17. import os
  18. import tempfile
  19. import shutil
  20. import subprocess
  21. import time
  22. import argparse
  23. import logging
  24. import sys
  25. import re
  26. if sys.version_info[0] > 2:
  27. print('%s must use python 2.x (for the ES python client)' % sys.argv[0])
  28. from datetime import datetime
  29. try:
  30. from elasticsearch import Elasticsearch
  31. from elasticsearch.exceptions import ConnectionError
  32. from elasticsearch.exceptions import TransportError
  33. except ImportError as e:
  34. print('Can\'t import elasticsearch please install `sudo pip install elasticsearch`')
  35. sys.exit(1)
  36. # sometimes returns True
  37. def rarely():
  38. return random.randint(0, 10) == 0
  39. # usually returns True
  40. def frequently():
  41. return not rarely()
  42. # asserts the correctness of the given hits given they are sorted asc
  43. def assert_sort(hits):
  44. values = [hit['sort'] for hit in hits['hits']['hits']]
  45. assert len(values) > 0, 'expected non emtpy result'
  46. val = min(values)
  47. for x in values:
  48. assert x >= val, '%s >= %s' % (x, val)
  49. val = x
  50. # Indexes the given number of document into the given index
  51. # and randomly runs refresh, optimize and flush commands
  52. def index_documents(es, index_name, type, num_docs):
  53. logging.info('Indexing %s docs' % num_docs)
  54. for id in range(0, num_docs):
  55. es.index(index=index_name, doc_type=type, id=id, body={'string': str(random.randint(0, 100)),
  56. 'long_sort': random.randint(0, 100),
  57. 'double_sort' : float(random.randint(0, 100))})
  58. if rarely():
  59. es.indices.refresh(index=index_name)
  60. if rarely():
  61. es.indices.flush(index=index_name, force=frequently())
  62. if rarely():
  63. es.indices.optimize(index=index_name)
  64. logging.info('Flushing index')
  65. es.indices.flush(index=index_name)
  66. def run_basic_asserts(es, index_name, type, num_docs):
  67. count = es.count(index=index_name)['count']
  68. assert count == num_docs, 'Expected %r but got %r documents' % (num_docs, count)
  69. for _ in range(0, num_docs):
  70. random_doc_id = random.randint(0, num_docs-1)
  71. doc = es.get(index=index_name, doc_type=type, id=random_doc_id)
  72. assert doc, 'Expected document for id %s but got %s' % (random_doc_id, doc)
  73. assert_sort(es.search(index=index_name,
  74. body={
  75. 'sort': [
  76. {'double_sort': {'order': 'asc'}}
  77. ]
  78. }))
  79. assert_sort(es.search(index=index_name,
  80. body={
  81. 'sort': [
  82. {'long_sort': {'order': 'asc'}}
  83. ]
  84. }))
  85. def build_version(version_tuple):
  86. return '.'.join([str(x) for x in version_tuple])
  87. def build_tuple(version_string):
  88. return [int(x) for x in version_string.split('.')]
  89. def start_node(version, release_dir, data_dir, tcp_port, http_port):
  90. logging.info('Starting node from %s on port %s/%s' % (release_dir, tcp_port, http_port))
  91. cmd = [
  92. os.path.join(release_dir, 'bin/elasticsearch'),
  93. '-Des.path.data=%s' % data_dir,
  94. '-Des.path.logs=logs',
  95. '-Des.cluster.name=bwc_index_' + version,
  96. '-Des.network.host=localhost',
  97. '-Des.discovery.zen.ping.multicast.enabled=false',
  98. '-Des.script.disable_dynamic=true',
  99. '-Des.transport.tcp.port=%s' % tcp_port,
  100. '-Des.http.port=%s' % http_port
  101. ]
  102. if version.startswith('0.') or version.startswith('1.0.0.Beta') :
  103. cmd.append('-f') # version before 1.0 start in background automatically
  104. return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  105. def create_client(http_port, timeout=30):
  106. logging.info('Waiting for node to startup')
  107. for _ in range(0, timeout):
  108. # TODO: ask Honza if there is a better way to do this?
  109. try:
  110. client = Elasticsearch([{'host': '127.0.0.1', 'port': http_port}])
  111. client.cluster.health(wait_for_nodes=1)
  112. client.count() # can we actually search or do we get a 503? -- anyway retry
  113. return client
  114. except (ConnectionError, TransportError):
  115. pass
  116. time.sleep(1)
  117. assert False, 'Timed out waiting for node for %s seconds' % timeout
  118. def generate_index(client, version):
  119. client.indices.delete(index='test', ignore=404)
  120. num_shards = random.randint(1, 10)
  121. num_replicas = random.randint(0, 1)
  122. logging.info('Create single shard test index')
  123. mappings = {}
  124. if not version.startswith('2.'):
  125. # TODO: we need better "before/onOr/after" logic in python
  126. # backcompat test for legacy type level analyzer settings, see #8874
  127. mappings['analyzer_type1'] = {
  128. 'analyzer': 'standard',
  129. 'properties': {
  130. 'string_with_index_analyzer': {
  131. 'type': 'string',
  132. 'index_analyzer': 'standard'
  133. },
  134. 'completion_with_index_analyzer': {
  135. 'type': 'completion',
  136. 'index_analyzer': 'standard'
  137. }
  138. }
  139. }
  140. mappings['analyzer_type2'] = {
  141. 'index_analyzer': 'standard',
  142. 'search_analyzer': 'keyword',
  143. 'search_quote_analyzer': 'english',
  144. }
  145. mappings['index_name_and_path'] = {
  146. 'properties': {
  147. 'parent_multi_field': {
  148. 'type': 'string',
  149. 'path': 'just_name',
  150. 'fields': {
  151. 'raw': {'type': 'string', 'index': 'not_analyzed', 'index_name': 'raw_multi_field'}
  152. }
  153. },
  154. 'field_with_index_name': {
  155. 'type': 'string',
  156. 'index_name': 'custom_index_name_for_field'
  157. }
  158. }
  159. }
  160. mappings['meta_fields'] = {
  161. '_id': {
  162. 'path': 'myid'
  163. },
  164. '_routing': {
  165. 'path': 'myrouting'
  166. }
  167. }
  168. client.indices.create(index='test', body={
  169. 'settings': {
  170. 'number_of_shards': 1,
  171. 'number_of_replicas': 0
  172. },
  173. 'mappings': mappings
  174. })
  175. health = client.cluster.health(wait_for_status='green', wait_for_relocating_shards=0)
  176. assert health['timed_out'] == False, 'cluster health timed out %s' % health
  177. num_docs = random.randint(10, 100)
  178. index_documents(client, 'test', 'doc', num_docs)
  179. logging.info('Running basic asserts on the data added')
  180. run_basic_asserts(client, 'test', 'doc', num_docs)
def snapshot_index(client, cfg):
    """Take a snapshot of the test data into a fresh 'test_repo' fs repository.

    Also stores a bogus persistent cluster setting and an index template
    (carrying aliases and a mapping) so a later restore in the backcompat
    tests can verify those survive the round trip.

    client -- elasticsearch client connected to the running node
    cfg    -- parsed config; 'version' and 'repo_dir' are read here
    """
    # Add bogus persistent settings to make sure they can be restored
    client.cluster.put_settings(body={
        'persistent': {
            'cluster.routing.allocation.exclude.version_attr': cfg.version
        }
    })
    # template named after the version so restored templates are identifiable
    client.indices.put_template(name='template_' + cfg.version.lower(), order=0, body={
        "template": "te*",
        "settings": {
            "number_of_shards" : 1
        },
        "mappings": {
            "type1": {
                "_source": { "enabled" : False }
            }
        },
        "aliases": {
            "alias1": {},
            "alias2": {
                "filter": {
                    "term": {"version" : cfg.version }
                },
                "routing": "kimchy"
            },
            # alias with a placeholder resolved per-index at alias creation
            "{index}-alias": {}
        }
    })
    # filesystem repository inside the temp dir; zipped up later by compress_repo
    client.snapshot.create_repository(repository='test_repo', body={
        'type': 'fs',
        'settings': {
            'location': cfg.repo_dir
        }
    })
    client.snapshot.create(repository='test_repo', snapshot='test_1', wait_for_completion=True)
  216. def compress_index(version, tmp_dir, output_dir):
  217. compress(tmp_dir, output_dir, 'index-%s.zip' % version, 'data')
  218. def compress_repo(version, tmp_dir, output_dir):
  219. compress(tmp_dir, output_dir, 'repo-%s.zip' % version, 'repo')
  220. def compress(tmp_dir, output_dir, zipfile, directory):
  221. abs_output_dir = os.path.abspath(output_dir)
  222. zipfile = os.path.join(abs_output_dir, zipfile)
  223. if os.path.exists(zipfile):
  224. os.remove(zipfile)
  225. logging.info('Compressing index into %s', zipfile)
  226. olddir = os.getcwd()
  227. os.chdir(tmp_dir)
  228. subprocess.check_call('zip -r %s %s' % (zipfile, directory), shell=True)
  229. os.chdir(olddir)
def parse_config():
    """Parse command line arguments and derive the remaining configuration.

    Returns the argparse namespace extended with:
      release_dir        -- directory of the elasticsearch release to run
      tmp_dir            -- fresh scratch directory for this run
      data_dir           -- node data directory inside tmp_dir
      repo_dir           -- snapshot repository directory inside tmp_dir
      snapshot_supported -- False for releases without a usable snapshot API

    Exits via parser.error() when the release or output directory is missing.
    """
    parser = argparse.ArgumentParser(description='Builds an elasticsearch index for backwards compatibility tests')
    parser.add_argument('version', metavar='X.Y.Z',
                        help='The elasticsearch version to build an index for')
    parser.add_argument('--releases-dir', '-d', default='backwards', metavar='DIR',
                        help='The directory containing elasticsearch releases')
    parser.add_argument('--output-dir', '-o', default='src/test/resources/org/elasticsearch/bwcompat',
                        help='The directory to write the zipped index into')
    parser.add_argument('--tcp-port', default=9300, type=int,
                        help='The port to use as the minimum port for TCP communication')
    parser.add_argument('--http-port', default=9200, type=int,
                        help='The port to use as the minimum port for HTTP communication')
    cfg = parser.parse_args()
    cfg.release_dir = os.path.join(cfg.releases_dir, 'elasticsearch-%s' % cfg.version)
    if not os.path.exists(cfg.release_dir):
        parser.error('ES version %s does not exist in %s' % (cfg.version, cfg.releases_dir))
    if not os.path.exists(cfg.output_dir):
        parser.error('Output directory does not exist: %s' % cfg.output_dir)
    cfg.tmp_dir = tempfile.mkdtemp()
    cfg.data_dir = os.path.join(cfg.tmp_dir, 'data')
    cfg.repo_dir = os.path.join(cfg.tmp_dir, 'repo')
    logging.info('Temp data dir: %s' % cfg.data_dir)
    logging.info('Temp repo dir: %s' % cfg.repo_dir)
    # 0.x predates snapshot/restore entirely; 1.0.0.Beta1 is singled out too
    # (presumably its early snapshot format is incompatible -- see start_node's
    # similar special-casing of Beta releases)
    cfg.snapshot_supported = not (cfg.version.startswith('0.') or cfg.version == '1.0.0.Beta1')
    return cfg
def main():
    """Start a node, build the backcompat index (plus a snapshot when
    supported) and zip the results into the output directory."""
    logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
                        datefmt='%Y-%m-%d %I:%M:%S %p')
    # keep client/transport chatter out of the log
    logging.getLogger('elasticsearch').setLevel(logging.ERROR)
    logging.getLogger('urllib3').setLevel(logging.WARN)
    cfg = parse_config()
    try:
        node = start_node(cfg.version, cfg.release_dir, cfg.data_dir, cfg.tcp_port, cfg.http_port)
        client = create_client(cfg.http_port)
        generate_index(client, cfg.version)
        if cfg.snapshot_supported:
            snapshot_index(client, cfg)
    finally:
        # 'node' is only bound if start_node succeeded; guard so a startup
        # failure doesn't raise NameError here
        if 'node' in vars():
            logging.info('Shutting down node with pid %d', node.pid)
            node.terminate()
            time.sleep(1) # some nodes take time to terminate
    # only reached on success -- an exception above propagates past finally
    compress_index(cfg.version, cfg.tmp_dir, cfg.output_dir)
    if cfg.snapshot_supported:
        compress_repo(cfg.version, cfg.tmp_dir, cfg.output_dir)
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # allow a clean ctrl-c while waiting on node startup or indexing
        print('Caught keyboard interrupt, exiting...')