upgrade-tests.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. # Licensed to Elasticsearch under one or more contributor
  2. # license agreements. See the NOTICE file distributed with
  3. # this work for additional information regarding copyright
  4. # ownership. Elasticsearch licenses this file to you under
  5. # the Apache License, Version 2.0 (the "License"); you may
  6. # not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on
  13. # an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
  14. # either express or implied. See the License for the specific
  15. # language governing permissions and limitations under the License.
  16. import random
  17. import os
  18. import tempfile
  19. import shutil
  20. import subprocess
  21. import time
  22. import argparse
  23. import logging
  24. import sys
  25. import re
  26. from datetime import datetime
  27. try:
  28. from elasticsearch import Elasticsearch
  29. from elasticsearch.exceptions import ConnectionError
  30. from elasticsearch.exceptions import TransportError
  31. except ImportError as e:
  32. print('Can\'t import elasticsearch please install `sudo pip install elasticsearch`')
  33. raise e
  34. '''This file executes a basic upgrade test by running a full cluster restart.
The upgrade test starts 2 or more nodes of an old elasticsearch version, indexes
  36. a random number of documents into the running nodes and executes a full cluster restart.
  37. After the nodes are recovered a small set of basic checks are executed to ensure all
  38. documents are still searchable and field data can be loaded etc.
NOTE: This script requires the elasticsearch python client `elasticsearch-py`; run the following command to install it:
  40. `sudo pip install elasticsearch`
  41. if you are running python3 you need to install the client using pip3. On OSX `pip3` will be included in the Python 3.4
  42. release available on `https://www.python.org/download/`:
  43. `sudo pip3 install elasticsearch`
  44. See `https://github.com/elasticsearch/elasticsearch-py` for details
  45. In order to run this test two different version of elasticsearch are required. Both need to be unpacked into
  46. the same directory:
  47. ```
  48. $ cd /path/to/elasticsearch/clone
  49. $ mkdir backwards && cd backwards
  50. $ wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.3.1.tar.gz
  51. $ wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-0.90.13.tar.gz
  52. $ tar -zxvf elasticsearch-1.3.1.tar.gz && tar -zxvf elasticsearch-0.90.13.tar.gz
  53. $ cd ..
  54. $ python dev-tools/upgrade-tests.py --version.backwards 0.90.13 --version.current 1.3.1
  55. ```
  56. '''
  57. BLACK_LIST = {'1.2.0' : { 'reason': 'Contains a major bug where routing hashes are not consistent with previous version',
  58. 'issue': 'https://github.com/elasticsearch/elasticsearch/pull/6393'},
  59. '1.3.0' : { 'reason': 'Lucene Related bug prevents upgrades from 0.90.7 and some earlier versions ',
  60. 'issue' : 'https://github.com/elasticsearch/elasticsearch/pull/7055'}}
  61. # sometimes returns True
  62. def rarely():
  63. return random.randint(0, 10) == 0
  64. # usually returns True
  65. def frequently():
  66. return not rarely()
  67. # asserts the correctness of the given hits given they are sorted asc
  68. def assert_sort(hits):
  69. values = [hit['sort'] for hit in hits['hits']['hits']]
  70. assert len(values) > 0, 'expected non emtpy result'
  71. val = min(values)
  72. for x in values:
  73. assert x >= val, '%s >= %s' % (x, val)
  74. val = x
  75. # asserts that the cluster health didn't timeout etc.
  76. def assert_health(cluster_health, num_shards, num_replicas):
  77. assert cluster_health['timed_out'] == False, 'cluster health timed out %s' % cluster_health
  78. # Starts a new elasticsearch node from a released & untared version.
  79. # This node uses unicast discovery with the provided unicast host list and starts
  80. # the nodes with the given data directory. This allows shutting down and starting up
  81. # nodes on the same data dir simulating a full cluster restart.
  82. def start_node(version, data_dir, node_dir, unicast_host_list, tcp_port, http_port):
  83. es_run_path = os.path.join(node_dir, 'elasticsearch-%s' % (version), 'bin/elasticsearch')
  84. if version.startswith('0.90.'):
  85. foreground = '-f' # 0.90.x starts in background automatically
  86. else:
  87. foreground = ''
  88. return subprocess.Popen([es_run_path,
  89. '-Des.path.data=%s' % data_dir, '-Des.cluster.name=upgrade_test',
  90. '-Des.discovery.zen.ping.unicast.hosts=%s' % unicast_host_list,
  91. '-Des.transport.tcp.port=%s' % tcp_port,
  92. '-Des.http.port=%s' % http_port,
  93. foreground], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  94. # Indexes the given number of document into the given index
  95. # and randomly runs refresh, optimize and flush commands
  96. def index_documents(es, index_name, type, num_docs):
  97. logging.info('Indexing %s docs' % num_docs)
  98. for id in range(0, num_docs):
  99. es.index(index=index_name, doc_type=type, id=id, body={'string': str(random.randint(0, 100)),
  100. 'long_sort': random.randint(0, 100),
  101. 'double_sort' : float(random.randint(0, 100))})
  102. if rarely():
  103. es.indices.refresh(index=index_name)
  104. if rarely():
  105. es.indices.flush(index=index_name, force=frequently())
  106. if rarely():
  107. es.indices.optimize(index=index_name)
  108. es.indices.refresh(index=index_name)
  109. # Runs a basic number of assertions including:
  110. # - document counts
  111. # - match all search with sort on double / long
  112. # - Realtime GET operations
  113. # TODO(simonw): we should add stuff like:
  114. # - dates including sorting
  115. # - string sorting
  116. # - docvalues if available
  117. # - global ordinal if available
  118. def run_basic_asserts(es, index_name, type, num_docs):
  119. count = es.count(index=index_name)['count']
  120. assert count == num_docs, 'Expected %r but got %r documents' % (num_docs, count)
  121. for _ in range(0, num_docs):
  122. random_doc_id = random.randint(0, num_docs-1)
  123. doc = es.get(index=index_name, doc_type=type, id=random_doc_id)
  124. assert doc, 'Expected document for id %s but got %s' % (random_doc_id, doc)
  125. assert_sort(es.search(index=index_name,
  126. body={
  127. 'sort': [
  128. {'double_sort': {'order': 'asc'}}
  129. ]
  130. }))
  131. assert_sort(es.search(index=index_name,
  132. body={
  133. 'sort': [
  134. {'long_sort': {'order': 'asc'}}
  135. ]
  136. }))
  137. # picks a random version or and entire random version tuple from the directory
  138. # to run the backwards tests against.
  139. def pick_random_upgrade_version(directory, lower_version=None, upper_version=None):
  140. if lower_version and upper_version:
  141. return lower_version, upper_version
  142. assert os.path.isdir(directory), 'No such directory %s' % directory
  143. versions = []
  144. for version in map(lambda x : x[len('elasticsearch-'):], filter(lambda x : re.match(r'^elasticsearch-\d+[.]\d+[.]\d+$', x), os.listdir(directory))):
  145. if not version in BLACK_LIST:
  146. versions.append(build_tuple(version))
  147. versions.sort()
  148. if lower_version: # lower version is set - picking a higher one
  149. versions = filter(lambda x : x > build_tuple(lower_version), versions)
  150. assert len(versions) >= 1, 'Expected at least 1 higher version than %s version in %s ' % (lower_version, directory)
  151. random.shuffle(versions)
  152. return lower_version, build_version(versions[0])
  153. if upper_version:
  154. versions = filter(lambda x : x < build_tuple(upper_version), versions)
  155. assert len(versions) >= 1, 'Expected at least 1 lower version than %s version in %s ' % (upper_version, directory)
  156. random.shuffle(versions)
  157. return build_version(versions[0]), upper_version
  158. assert len(versions) >= 2, 'Expected at least 2 different version in %s but found %s' % (directory, len(versions))
  159. random.shuffle(versions)
  160. versions = versions[0:2]
  161. versions.sort()
  162. return build_version(versions[0]), build_version(versions[1])
  163. def build_version(version_tuple):
  164. return '.'.join([str(x) for x in version_tuple])
  165. def build_tuple(version_string):
  166. return [int(x) for x in version_string.split('.')]
  167. # returns a new elasticsearch client and ensures the all nodes have joined the cluster
  168. # this method waits at most 30 seconds for all nodes to join
  169. def new_es_instance(num_nodes, http_port, timeout = 30):
  170. logging.info('Waiting for %s nodes to join the cluster' % num_nodes)
  171. for _ in range(0, timeout):
  172. # TODO(simonw): ask Honza if there is a better way to do this?
  173. try:
  174. es = Elasticsearch([
  175. {'host': '127.0.0.1', 'port': http_port + x}
  176. for x in range(0, num_nodes)])
  177. es.cluster.health(wait_for_nodes=num_nodes)
  178. es.count() # can we actually search or do we get a 503? -- anyway retry
  179. return es
  180. except (ConnectionError, TransportError):
  181. pass
  182. time.sleep(1)
  183. assert False, 'Timed out waiting for %s nodes for %s seconds' % (num_nodes, timeout)
  184. def assert_versions(bwc_version, current_version, node_dir):
  185. assert [int(x) for x in bwc_version.split('.')] < [int(x) for x in current_version.split('.')],\
  186. '[%s] must be < than [%s]' % (bwc_version, current_version)
  187. for version in [bwc_version, current_version]:
  188. assert not version in BLACK_LIST, 'Version %s is blacklisted - %s, see %s' \
  189. % (version, BLACK_LIST[version]['reason'],
  190. BLACK_LIST[version]['issue'])
  191. dir = os.path.join(node_dir, 'elasticsearch-%s' % current_version)
  192. assert os.path.isdir(dir), 'Expected elasticsearch-%s install directory does not exists: %s' % (version, dir)
def full_cluster_restart(node_dir, current_version, bwc_version, tcp_port, http_port):
    """Run the full-cluster-restart upgrade test.

    Starts 2-3 nodes of bwc_version, indexes random documents and checks them,
    kills the nodes, restarts on the same data dir with current_version (plus
    one extra node to trigger relocation) and re-runs the checks.
    Nodes are terminated and the temp data dir removed in all cases.

    NOTE(review): the log line below reads a module-level `seed` set in the
    __main__ block — this function is only callable after that global exists.
    """
    assert_versions(bwc_version, current_version, node_dir)
    num_nodes = random.randint(2, 3)
    nodes = []
    # shared data dir so the upgraded nodes recover the old nodes' indices
    data_dir = tempfile.mkdtemp()
    logging.info('Running upgrade test from [%s] to [%s] seed: [%s] es.path.data: [%s] es.http.port [%s] es.tcp.port [%s]'
                 % (bwc_version, current_version, seed, data_dir, http_port, tcp_port))
    try:
        logging.info('Starting %s BWC nodes of version %s' % (num_nodes, bwc_version))
        unicast_addresses = ','.join(['127.0.0.1:%s' % (tcp_port+x) for x in range(0, num_nodes)])
        for id in range(0, num_nodes):
            nodes.append(start_node(bwc_version, data_dir, node_dir, unicast_addresses, tcp_port+id, http_port+id))
        es = new_es_instance(num_nodes, http_port)
        # ignore 404: the index may not exist on a fresh data dir
        es.indices.delete(index='test_index', ignore=404)
        num_shards = random.randint(1, 10)
        num_replicas = random.randint(0, 1)
        logging.info('Create index with [%s] shards and [%s] replicas' % (num_shards, num_replicas))
        es.indices.create(index='test_index', body={
            # TODO(simonw): can we do more here in terms of randomization - seems hard due to all the different version
            'settings': {
                'number_of_shards': num_shards,
                'number_of_replicas': num_replicas
            }
        })
        logging.info('Nodes joined, waiting for green status')
        health = es.cluster.health(wait_for_status='green', wait_for_relocating_shards=0)
        assert_health(health, num_shards, num_replicas)
        num_docs = random.randint(10, 100)
        index_documents(es, 'test_index', 'test_type', num_docs)
        logging.info('Run basic asserts before full cluster restart')
        run_basic_asserts(es, 'test_index', 'test_type', num_docs)
        logging.info('kill bwc nodes -- prepare upgrade')
        for node in nodes:
            node.terminate()
        # now upgrade the nodes and rerun the checks
        tcp_port = tcp_port + len(nodes)  # bump up port to make sure we can claim them
        http_port = http_port + len(nodes)
        logging.info('Full Cluster restart starts upgrading to version [elasticsearch-%s] es.http.port [%s] es.tcp.port [%s]'
                     % (current_version, http_port, tcp_port))
        nodes = []
        unicast_addresses = ','.join(['127.0.0.1:%s' % (tcp_port+x) for x in range(0, num_nodes)])
        for id in range(0, num_nodes+1):  # one more to trigger relocation
            nodes.append(start_node(current_version, data_dir, node_dir, unicast_addresses, tcp_port+id, http_port+id))
        es = new_es_instance(num_nodes+1, http_port)
        logging.info('Nodes joined, waiting for green status')
        health = es.cluster.health(wait_for_status='green', wait_for_relocating_shards=0)
        assert_health(health, num_shards, num_replicas)
        run_basic_asserts(es, 'test_index', 'test_type', num_docs)
        # by running the indexing again we try to catch possible mapping problems after the upgrade
        index_documents(es, 'test_index', 'test_type', num_docs)
        run_basic_asserts(es, 'test_index', 'test_type', num_docs)
        logging.info("[SUCCESS] - all test passed upgrading from version [%s] to version [%s]" % (bwc_version, current_version))
    finally:
        # always clean up, even when an assertion above fails
        for node in nodes:
            node.terminate()
        time.sleep(1)  # wait a second until removing the data dirs to give the nodes a chance to shutdown
        shutil.rmtree(data_dir)  # remove the temp data dir
  250. if __name__ == '__main__':
  251. logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
  252. datefmt='%Y-%m-%d %I:%M:%S %p')
  253. logging.getLogger('elasticsearch').setLevel(logging.ERROR)
  254. logging.getLogger('urllib3').setLevel(logging.WARN)
  255. parser = argparse.ArgumentParser(description='Tests Full Cluster Restarts across major version')
  256. parser.add_argument('--version.backwards', '-b', dest='backwards_version', metavar='V',
  257. help='The elasticsearch version to upgrade from')
  258. parser.add_argument('--version.current', '-c', dest='current_version', metavar='V',
  259. help='The elasticsearch version to upgrade to')
  260. parser.add_argument('--seed', '-s', dest='seed', metavar='N', type=int,
  261. help='The random seed to use')
  262. parser.add_argument('--backwards.dir', '-d', dest='bwc_directory', default='backwards', metavar='dir',
  263. help='The directory to the backwards compatibility sources')
  264. parser.add_argument('--tcp.port', '-p', dest='tcp_port', default=9300, metavar='port', type=int,
  265. help='The port to use as the minimum port for TCP communication')
  266. parser.add_argument('--http.port', '-t', dest='http_port', default=9200, metavar='port', type=int,
  267. help='The port to use as the minimum port for HTTP communication')
  268. parser.set_defaults(bwc_directory='backwards')
  269. parser.set_defaults(seed=int(time.time()))
  270. args = parser.parse_args()
  271. node_dir = args.bwc_directory
  272. current_version = args.current_version
  273. bwc_version = args.backwards_version
  274. seed = args.seed
  275. random.seed(seed)
  276. bwc_version, current_version = pick_random_upgrade_version(node_dir, bwc_version, current_version)
  277. tcp_port = args.tcp_port
  278. http_port = args.http_port
  279. try:
  280. full_cluster_restart(node_dir, current_version, bwc_version, tcp_port, http_port)
  281. except:
  282. logging.warn('REPRODUCE WITH: \n\t`python %s --version.backwards %s --version.current %s --seed %s --tcp.port %s --http.port %s`'
  283. % (sys.argv[0], bwc_version, current_version, seed, tcp_port, http_port))
  284. raise