|
@@ -57,22 +57,29 @@ public final class CcrRequests {
|
|
|
}
|
|
|
client.admin().cluster().state(request, ActionListener.wrap(
|
|
|
response -> {
|
|
|
- if (response.getState() == null) {
|
|
|
+ if (response.getState() == null) { // timeout on wait_for_metadata_version
|
|
|
assert metadataVersion > 0 : metadataVersion;
|
|
|
- throw new IllegalStateException("timeout to get cluster state with" +
|
|
|
- " metadata version [" + metadataVersion + "], mapping version [" + mappingVersion + "]");
|
|
|
+ if (timeoutSupplier.get().nanos() < 0) {
|
|
|
+ listener.onFailure(new IllegalStateException("timeout to get cluster state with" +
|
|
|
+ " metadata version [" + metadataVersion + "], mapping version [" + mappingVersion + "]"));
|
|
|
+ } else {
|
|
|
+ getIndexMetadata(client, index, mappingVersion, metadataVersion, timeoutSupplier, listener);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ final MetaData metaData = response.getState().metaData();
|
|
|
+ final IndexMetaData indexMetaData = metaData.getIndexSafe(index);
|
|
|
+ if (indexMetaData.getMappingVersion() >= mappingVersion) {
|
|
|
+ listener.onResponse(indexMetaData);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (timeoutSupplier.get().nanos() < 0) {
|
|
|
+ listener.onFailure(new IllegalStateException(
|
|
|
+ "timeout to get cluster state with mapping version [" + mappingVersion + "]"));
|
|
|
+ } else {
|
|
|
+ // ask for the next version.
|
|
|
+ getIndexMetadata(client, index, mappingVersion, metaData.version() + 1, timeoutSupplier, listener);
|
|
|
+ }
|
|
|
}
|
|
|
- final MetaData metaData = response.getState().metaData();
|
|
|
- final IndexMetaData indexMetaData = metaData.getIndexSafe(index);
|
|
|
- if (indexMetaData.getMappingVersion() >= mappingVersion) {
|
|
|
- listener.onResponse(indexMetaData);
|
|
|
- return;
|
|
|
- }
|
|
|
- if (timeoutSupplier.get().nanos() < 0) {
|
|
|
- throw new IllegalStateException("timeout to get cluster state with mapping version [" + mappingVersion + "]");
|
|
|
- }
|
|
|
- // ask for the next version.
|
|
|
- getIndexMetadata(client, index, mappingVersion, metaData.version() + 1, timeoutSupplier, listener);
|
|
|
},
|
|
|
listener::onFailure
|
|
|
));
|