# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on
# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
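
"""Creates static backwards-compatibility test indexes.

Starts a node from an old Elasticsearch release, creates a single-shard index
whose mappings and settings exercise version-sensitive features, indexes
random documents, snapshots the index where supported, and zips the resulting
data (and repo) directories for the backwards-compatibility test suite.
"""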
import argparse
import base64
import glob
import logging
import os
import random
import shutil
import subprocess
import sys
import tempfile
import time

DEFAULT_TRANSPORT_TCP_PORT = 9300
DEFAULT_HTTP_TCP_PORT = 9200

if sys.version_info[0] < 3:
  print('%s must use python 3.x (for the ES python client)' % sys.argv[0])
  sys.exit(1)

try:
  from elasticsearch import Elasticsearch
  from elasticsearch.exceptions import ConnectionError
  from elasticsearch.exceptions import TransportError
except ImportError as e:
  print('Can\'t import elasticsearch, please install it: `sudo pip3 install elasticsearch`')
  sys.exit(1)

# sometimes returns True
def rarely():
  return random.randint(0, 10) == 0

# usually returns True
def frequently():
  return not rarely()
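
# Feature gates for the target version: warmers were removed in 2.0, dots in
# field names are accepted again from 2.4 onwards, and lenient (string-style)
# boolean parsing was removed in 6.0.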
def capabilities_of(version):
  current_version = parse_version(version)
  return {
    'warmers': current_version < parse_version('2.0.0-alpha1'),
    'dots_in_field_names': current_version >= parse_version('2.4.0'),
    'lenient_booleans': current_version < parse_version('6.0.0-alpha1')
  }
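
# Versions with lenient boolean parsing accept many spellings of true/false;
# emit a random representation to exercise that parsing.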
def falsy(lenient):
  return random.choice(['off', 'no', '0', 0, 'false', False]) if lenient else False

def truthy(lenient):
  return random.choice(['on', 'yes', '1', 1, 'true', True]) if lenient else True

def random_bool(lenient):
  return random.choice([falsy, truthy])(lenient)

# asserts the correctness of the given hits given they are sorted asc
def assert_sort(hits):
  values = [hit['sort'] for hit in hits['hits']['hits']]
  assert len(values) > 0, 'expected non empty result'
  val = min(values)
  for x in values:
    assert x >= val, '%s >= %s' % (x, val)
    val = x

# Indexes the given number of documents into the given index
# and randomly runs refresh, optimize and flush commands
def index_documents(es, index_name, type, num_docs, capabilities):
  logging.info('Indexing %s docs' % num_docs)
  index(es, index_name, type, num_docs, capabilities, flush=True)
  logging.info('Flushing index')
  es.indices.flush(index=index_name)

def index(es, index_name, type, num_docs, capabilities, flush=False):
  for id in range(0, num_docs):
    lenient_bool = capabilities['lenient_booleans']
    body = {
      'string': str(random.randint(0, 100)),
      'long_sort': random.randint(0, 100),
      'double_sort': float(random.randint(0, 100)),
      # be sure to create a "proper" boolean (True, False) for the first document so that automapping is correct
      'bool': random_bool(lenient_bool) if id > 0 else random.choice([True, False])
    }
    if capabilities['dots_in_field_names']:
      body['field.with.dots'] = str(random.randint(0, 100))
    body['binary'] = base64.b64encode(bytearray(random.getrandbits(8) for _ in range(16))).decode('ascii')
    es.index(index=index_name, doc_type=type, id=id, body=body)

    if rarely():
      es.indices.refresh(index=index_name)
    if rarely() and flush:
      es.indices.flush(index=index_name, force=frequently())

def reindex_docs(es, index_name, type, num_docs, capabilities):
  logging.info('Re-indexing %s docs' % num_docs)
  # TODO: Translog recovery fails on mixed representation of booleans as strings / booleans (e.g. "true", true)
  # (see gradle :core:test -Dtests.seed=AF7BB7B3FA387AAE -Dtests.class=org.elasticsearch.index.engine.InternalEngineTests
  # -Dtests.method="testUpgradeOldIndex")
  capabilities['lenient_booleans'] = False
  # reindex some docs after the flush such that we have something in the translog
  index(es, index_name, type, num_docs, capabilities)

def delete_by_query(es, version, index_name, doc_type):
  logging.info('Deleting long_sort:[10..20] docs')
  query = {'query':
           {'range':
            {'long_sort':
             {'gte': 10,
              'lte': 20}}}}

  if version.startswith('0.') or version in ('1.0.0.Beta1', '1.0.0.Beta2'):
    # TODO #10262: we can't write DBQ into the translog for these old versions until we fix this back-compat bug:
    # #4074: these versions don't expect to see the top-level 'query' to count/delete_by_query:
    query = query['query']
    return

  deleted_count = es.count(index=index_name, doc_type=doc_type, body=query)['count']
  result = es.delete_by_query(index=index_name,
                              doc_type=doc_type,
                              body=query)

  # make sure no shards failed:
  assert result['_indices'][index_name]['_shards']['failed'] == 0, 'delete by query failed: %s' % result
  logging.info('Deleted %d docs' % deleted_count)

def run_basic_asserts(es, version, index_name, type, num_docs):
  count = es.count(index=index_name)['count']
  assert count == num_docs, 'Expected %r but got %r documents' % (num_docs, count)
  if parse_version(version) < parse_version('5.1.0'):
    # This alias isn't allowed to be created after 5.1 so we can verify that we can still use it
    count = es.count(index='#' + index_name)['count']
    assert count == num_docs, 'Expected %r but got %r documents' % (num_docs, count)
  for _ in range(0, num_docs):
    random_doc_id = random.randint(0, num_docs - 1)
    doc = es.get(index=index_name, doc_type=type, id=random_doc_id)
    assert doc, 'Expected document for id %s but got %s' % (random_doc_id, doc)

  assert_sort(es.search(index=index_name,
                        body={
                          'sort': [
                            {'double_sort': {'order': 'asc'}}
                          ]
                        }))

  assert_sort(es.search(index=index_name,
                        body={
                          'sort': [
                            {'long_sort': {'order': 'asc'}}
                          ]
                        }))

def build_version(version_tuple):
  return '.'.join([str(x) for x in version_tuple])

def build_tuple(version_string):
  return [int(x) for x in version_string.split('.')]

def start_node(version, release_dir, data_dir, repo_dir, tcp_port=DEFAULT_TRANSPORT_TCP_PORT, http_port=DEFAULT_HTTP_TCP_PORT, cluster_name=None):
  logging.info('Starting node from %s on port %s/%s, data_dir %s' % (release_dir, tcp_port, http_port, data_dir))
  if cluster_name is None:
    cluster_name = 'bwc_index_' + version
  if parse_version(version) < parse_version('5.0.0-alpha1'):
    prefix = '-Des.'
  else:
    prefix = '-E'
  cmd = [
    os.path.join(release_dir, 'bin/elasticsearch'),
    '%spath.data=%s' % (prefix, data_dir),
    '%spath.logs=logs' % prefix,
    '%scluster.name=%s' % (prefix, cluster_name),
    '%snetwork.host=localhost' % prefix,
    '%stransport.tcp.port=%s' % (prefix, tcp_port),
    '%shttp.port=%s' % (prefix, http_port),
    '%spath.repo=%s' % (prefix, repo_dir)
  ]
  if version.startswith('0.') or version.startswith('1.0.0.Beta'):
    cmd.append('-f')  # versions before 1.0 start in the background automatically
  return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                          env=dict(os.environ, ES_JAVA_OPTS='-Dmapper.allow_dots_in_name=true'))

def install_plugin(version, release_dir, plugin_name):
  run_plugin(version, release_dir, 'install', [plugin_name])

def remove_plugin(version, release_dir, plugin_name):
  run_plugin(version, release_dir, 'remove', [plugin_name])

def run_plugin(version, release_dir, plugin_cmd, args):
  cmd = [os.path.join(release_dir, 'bin/elasticsearch-plugin'), plugin_cmd] + args
  subprocess.check_call(cmd)

def create_client(http_port=DEFAULT_HTTP_TCP_PORT, timeout=30):
  logging.info('Waiting for node to startup')
  for _ in range(0, timeout):
    # TODO: ask Honza if there is a better way to do this?
    try:
      client = Elasticsearch([{'host': 'localhost', 'port': http_port}])
      client.cluster.health(wait_for_nodes=1)
      client.count()  # can we actually search or do we get a 503? -- anyway retry
      return client
    except (ConnectionError, TransportError):
      pass
    time.sleep(1)
  assert False, 'Timed out waiting for node for %s seconds' % timeout
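
# Creates the test index with version-dependent mappings (warmers, legacy
# analyzer settings, norms, dotted field names), fills it with random
# documents and runs basic sanity asserts over the indexed data.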
def generate_index(client, version, index_name):
  client.indices.delete(index=index_name, ignore=404)
  logging.info('Create single shard test index')

  capabilities = capabilities_of(version)
  lenient_booleans = capabilities['lenient_booleans']

  mappings = {}
  warmers = {}
  if capabilities['warmers']:
    warmers['warmer1'] = {
      'source': {
        'query': {
          'match_all': {}
        }
      }
    }
    # backcompat test for legacy type level analyzer settings, see #8874
    mappings['analyzer_type1'] = {
      'analyzer': 'standard',
      'properties': {
        'string_with_index_analyzer': {
          'type': 'string',
          'index_analyzer': 'standard'
        },
      }
    }
    # completion type was added in 0.90.3
    if not version.startswith('0.20') and version not in ['0.90.0.Beta1', '0.90.0.RC1', '0.90.0.RC2', '0.90.0', '0.90.1', '0.90.2']:
      mappings['analyzer_type1']['properties']['completion_with_index_analyzer'] = {
        'type': 'completion',
        'index_analyzer': 'standard'
      }

    mappings['analyzer_type2'] = {
      'index_analyzer': 'standard',
      'search_analyzer': 'keyword',
      'search_quote_analyzer': 'english',
    }
    mappings['index_name_and_path'] = {
      'properties': {
        'parent_multi_field': {
          'type': 'string',
          'path': 'just_name',
          'fields': {
            'raw': {'type': 'string', 'index': 'not_analyzed', 'index_name': 'raw_multi_field'}
          }
        },
        'field_with_index_name': {
          'type': 'string',
          'index_name': 'custom_index_name_for_field'
        }
      }
    }
    mappings['meta_fields'] = {
      '_routing': {
        'required': falsy(lenient_booleans)
      },
    }
    mappings['custom_formats'] = {
      'properties': {
        'string_with_custom_postings': {
          'type': 'string',
          'postings_format': 'Lucene41'
        },
        'long_with_custom_doc_values': {
          'type': 'long',
          'doc_values_format': 'Lucene42'
        }
      }
    }
    mappings['auto_boost'] = {
      '_all': {
        'auto_boost': truthy(lenient_booleans)
      }
    }

  mappings['doc'] = {'properties': {}}
  if capabilities['dots_in_field_names']:
    if parse_version(version) < parse_version('5.0.0-alpha1'):
      mappings['doc']['properties'].update({
        'field.with.dots': {
          'type': 'string',
          'boost': 4
        }
      })
    else:
      mappings['doc']['properties'].update({
        'field.with.dots': {
          'type': 'text'
        }
      })

  if parse_version(version) < parse_version('5.0.0-alpha1'):
    mappings['norms'] = {
      'properties': {
        'string_with_norms_disabled': {
          'type': 'string',
          'norms': {
            'enabled': False
          }
        },
        'string_with_norms_enabled': {
          'type': 'string',
          'index': 'not_analyzed',
          'norms': {
            'enabled': True,
            'loading': 'eager'
          }
        }
      }
    }
    mappings['doc'] = {
      'properties': {
        'string': {
          'type': 'string',
          'boost': 4
        }
      }
    }
  else:  # current version of the norms mapping
    mappings['norms'] = {
      'properties': {
        'string_with_norms_disabled': {
          'type': 'text',
          'norms': False
        },
        'string_with_norms_enabled': {
          'type': 'keyword',
          'index': 'not_analyzed',
          'norms': True,
          'eager_global_ordinals': True
        }
      }
    }
    mappings['doc']['properties'].update({
      'string': {
        'type': 'text',
        'boost': 4
      }
    })

  # test back-compat of stored binary fields
  mappings['doc']['properties']['binary'] = {
    'type': 'binary',
    'store': truthy(lenient_booleans),
  }

  settings = {
    'number_of_shards': 1,
    'number_of_replicas': 0,
  }
  if version.startswith('0.') or version.startswith('1.'):
    # Same as ES default (60 seconds), but missing the units to make sure they are inserted on upgrade:
    settings['gc_deletes'] = '60000'
    # Same as ES default (5 GB), but missing the units to make sure they are inserted on upgrade:
    settings['merge.policy.max_merged_segment'] = '5368709120'

  body = {
    'settings': settings,
    'mappings': mappings,
  }

  if warmers:
    body['warmers'] = warmers

  client.indices.create(index=index_name, body=body)
  if parse_version(version) < parse_version('5.0.0-alpha1'):
    health = client.cluster.health(wait_for_status='green', wait_for_relocating_shards=0)
  else:
    health = client.cluster.health(wait_for_status='green', wait_for_no_relocating_shards=True)
  assert health['timed_out'] == False, 'cluster health timed out %s' % health

  num_docs = random.randint(2000, 3000)
  if version == '1.1.0':
    # 1.1.0 is buggy and creates lots and lots of segments, so we create a
    # lighter index for it to keep bw tests reasonable
    # see https://github.com/elastic/elasticsearch/issues/5817
    num_docs = int(num_docs / 10)
  index_documents(client, index_name, 'doc', num_docs, capabilities)

  if parse_version(version) < parse_version('5.1.0'):
    logging.info("Adding an alias that can't be created in 5.1+ so we can assert that we can still use it")
    client.indices.put_alias(index=index_name, name='#' + index_name)

  logging.info('Running basic asserts on the data added')
  run_basic_asserts(client, version, index_name, 'doc', num_docs)
  return num_docs, capabilities
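
# Writes bogus persistent settings and an index template, then snapshots the
# index into a temporary fs repository so upgrade tests can check that both
# survive a restore.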
def snapshot_index(client, version, repo_dir):
  persistent = {
    'cluster.routing.allocation.exclude.version_attr': version
  }
  if parse_version(version) < parse_version('5.0.0-alpha1'):
    # Same as ES default (30 seconds), but missing the units to make sure they are inserted on upgrade:
    persistent['discovery.zen.publish_timeout'] = '30000'
    # Same as ES default (512 KB), but missing the units to make sure they are inserted on upgrade:
    persistent['indices.recovery.file_chunk_size'] = '524288'
  # Add bogus persistent settings to make sure they can be restored
  client.cluster.put_settings(body={
    'persistent': persistent
  })
  client.indices.put_template(name='template_' + version.lower(), order=0, body={
    "template": "te*",
    "settings": {
      "number_of_shards": 1
    },
    "mappings": {
      "type1": {
        "_source": {
          "enabled": falsy(capabilities_of(version)['lenient_booleans'])
        }
      }
    },
    "aliases": {
      "alias1": {},
      "alias2": {
        "filter": {
          "term": {"version": version}
        },
        "routing": "kimchy"
      },
      "{index}-alias": {}
    }
  })
  client.snapshot.create_repository(repository='test_repo', body={
    'type': 'fs',
    'settings': {
      'location': repo_dir
    }
  })
  client.snapshot.create(repository='test_repo', snapshot='test_1', wait_for_completion=True)
  client.snapshot.delete_repository(repository='test_repo')
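
# zip is invoked from inside tmp_dir so the archives unpack to just the
# 'data' or 'repo' directory.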
def compress_index(version, tmp_dir, output_dir):
  compress(tmp_dir, output_dir, 'index-%s.zip' % version, 'data')

def compress_repo(version, tmp_dir, output_dir):
  compress(tmp_dir, output_dir, 'repo-%s.zip' % version, 'repo')

def compress(tmp_dir, output_dir, zipfile, directory):
  abs_output_dir = os.path.abspath(output_dir)
  zipfile = os.path.join(abs_output_dir, zipfile)
  if os.path.exists(zipfile):
    os.remove(zipfile)
  logging.info('Compressing index into %s, tmpDir %s', zipfile, tmp_dir)
  olddir = os.getcwd()
  os.chdir(tmp_dir)
  subprocess.check_call('zip -r %s %s' % (zipfile, directory), shell=True)
  os.chdir(olddir)

def parse_config():
  parser = argparse.ArgumentParser(description='Builds an elasticsearch index for backwards compatibility tests')
  required = parser.add_mutually_exclusive_group(required=True)
  required.add_argument('versions', metavar='X.Y.Z', nargs='*', default=[],
                        help='The elasticsearch version to build an index for')
  required.add_argument('--all', action='store_true', default=False,
                        help='Recreate all existing backwards compatibility indexes')
  parser.add_argument('--releases-dir', '-d', default='backwards', metavar='DIR',
                      help='The directory containing elasticsearch releases')
  parser.add_argument('--output-dir', '-o', default='core/src/test/resources/indices/bwc',
                      help='The directory to write the zipped index into')
  parser.add_argument('--tcp-port', default=DEFAULT_TRANSPORT_TCP_PORT, type=int,
                      help='The port to use as the minimum port for TCP communication')
  parser.add_argument('--http-port', default=DEFAULT_HTTP_TCP_PORT, type=int,
                      help='The port to use as the minimum port for HTTP communication')
  cfg = parser.parse_args()

  if not os.path.exists(cfg.output_dir):
    parser.error('Output directory does not exist: %s' % cfg.output_dir)

  if not cfg.versions:
    # --all
    for bwc_index in glob.glob(os.path.join(cfg.output_dir, 'index-*.zip')):
      version = os.path.basename(bwc_index)[len('index-'):-len('.zip')]
      cfg.versions.append(version)

  return cfg
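
# Typical invocation (version and paths are examples only):
#   python3 create_bwc_index.py 1.7.5 --releases-dir backwards --output-dir core/src/test/resources/indices/bwc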
def create_bwc_index(cfg, version):
  logging.info('--> Creating bwc index for %s' % version)
  release_dir = os.path.join(cfg.releases_dir, 'elasticsearch-%s' % version)
  if not os.path.exists(release_dir):
    raise RuntimeError('ES version %s does not exist in %s' % (version, cfg.releases_dir))
  snapshot_supported = not (version.startswith('0.') or version == '1.0.0.Beta1')
  tmp_dir = tempfile.mkdtemp()

  data_dir = os.path.join(tmp_dir, 'data')
  repo_dir = os.path.join(tmp_dir, 'repo')
  logging.info('Temp data dir: %s' % data_dir)
  logging.info('Temp repo dir: %s' % repo_dir)

  node = None
  try:
    node = start_node(version, release_dir, data_dir, repo_dir, cfg.tcp_port, cfg.http_port)
    client = create_client(cfg.http_port)
    index_name = 'index-%s' % version.lower()
    num_docs, capabilities = generate_index(client, version, index_name)
    if snapshot_supported:
      snapshot_index(client, version, repo_dir)

    # 10067: get a delete-by-query into the translog on upgrade. We must do
    # this after the snapshot, because it calls flush. Otherwise the index
    # will already have the deletions applied on upgrade.
    if version.startswith('0.') or version.startswith('1.'):
      delete_by_query(client, version, index_name, 'doc')

    reindex_docs(client, index_name, 'doc', min(100, num_docs), capabilities)

    shutdown_node(node)
    node = None

    compress_index(version, tmp_dir, cfg.output_dir)
    if snapshot_supported:
      compress_repo(version, tmp_dir, cfg.output_dir)
  finally:
    if node is not None:
      # This only happens if we've hit an exception:
      shutdown_node(node)
    shutil.rmtree(tmp_dir)

def shutdown_node(node):
  logging.info('Shutting down node with pid %d', node.pid)
  node.kill()  # don't use terminate otherwise we flush the translog
  node.wait()

def parse_version(version):
  import re
  splitted = re.split('[.-]', version)
  if len(splitted) == 3:
    splitted = splitted + ['GA']
  splitted = [s.lower() for s in splitted]
  assert len(splitted) == 4
  return splitted
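
# Note: components compare lexicographically as strings; that is correct for
# the single-digit versions asserted below, but would mis-order e.g. '1.9.0'
# vs '1.10.0'.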
assert parse_version('5.0.0-alpha1') == parse_version('5.0.0-alpha1')
assert parse_version('5.0.0-alpha1') < parse_version('5.0.0-alpha2')
assert parse_version('5.0.0-alpha1') < parse_version('5.0.0-beta1')
assert parse_version('5.0.0-beta1') < parse_version('5.0.0')
assert parse_version('1.2.3') < parse_version('2.1.0')
assert parse_version('1.2.3') < parse_version('1.2.4')
assert parse_version('1.1.0') < parse_version('1.2.0')

def main():
  logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
                      datefmt='%Y-%m-%d %I:%M:%S %p')
  logging.getLogger('elasticsearch').setLevel(logging.ERROR)
  logging.getLogger('urllib3').setLevel(logging.WARN)
  cfg = parse_config()
  for version in cfg.versions:
    create_bwc_index(cfg, version)

if __name__ == '__main__':
  try:
    main()
  except KeyboardInterrupt:
    print('Caught keyboard interrupt, exiting...')