|
@@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
|
|
|
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
|
|
import org.apache.lucene.util.CollectionUtil;
|
|
|
import org.elasticsearch.ExceptionsHelper;
|
|
|
+import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.search.ShardSearchFailure;
|
|
|
import org.elasticsearch.action.support.IndicesOptions;
|
|
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
|
@@ -44,6 +45,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
|
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
+import org.elasticsearch.common.Nullable;
|
|
|
import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
@@ -366,14 +368,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
|
|
|
@Override
|
|
|
public void onFailure(String source, Throwable t) {
|
|
|
logger.warn("[{}] failed to create snapshot", t, snapshot.snapshotId());
|
|
|
- removeSnapshotFromClusterState(snapshot.snapshotId(), null, t);
|
|
|
- try {
|
|
|
- repositoriesService.repository(snapshot.snapshotId().getRepository()).finalizeSnapshot(
|
|
|
- snapshot.snapshotId(), snapshot.indices(), snapshot.startTime(), ExceptionsHelper.detailedMessage(t), 0, Collections.<SnapshotShardFailure>emptyList());
|
|
|
- } catch (Throwable t2) {
|
|
|
- logger.warn("[{}] failed to close snapshot in repository", snapshot.snapshotId());
|
|
|
- }
|
|
|
- userCreateSnapshotListener.onFailure(t);
|
|
|
+ removeSnapshotFromClusterState(snapshot.snapshotId(), null, t, new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, t));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -395,17 +390,46 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
|
|
|
});
|
|
|
} catch (Throwable t) {
|
|
|
logger.warn("failed to create snapshot [{}]", t, snapshot.snapshotId());
|
|
|
- removeSnapshotFromClusterState(snapshot.snapshotId(), null, t);
|
|
|
- if (snapshotCreated) {
|
|
|
+ removeSnapshotFromClusterState(snapshot.snapshotId(), null, t, new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, t));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class CleanupAfterErrorListener implements ActionListener<SnapshotInfo> {
|
|
|
+
|
|
|
+ private final SnapshotsInProgress.Entry snapshot;
|
|
|
+ private final boolean snapshotCreated;
|
|
|
+ private final CreateSnapshotListener userCreateSnapshotListener;
|
|
|
+ private final Throwable t;
|
|
|
+
|
|
|
+ public CleanupAfterErrorListener(SnapshotsInProgress.Entry snapshot, boolean snapshotCreated, CreateSnapshotListener userCreateSnapshotListener, Throwable t) {
|
|
|
+ this.snapshot = snapshot;
|
|
|
+ this.snapshotCreated = snapshotCreated;
|
|
|
+ this.userCreateSnapshotListener = userCreateSnapshotListener;
|
|
|
+ this.t = t;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onResponse(SnapshotInfo snapshotInfo) {
|
|
|
+ cleanupAfterError();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Throwable e) {
|
|
|
+ cleanupAfterError();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void cleanupAfterError() {
|
|
|
+ if(snapshotCreated) {
|
|
|
try {
|
|
|
- repositoriesService.repository(snapshot.snapshotId().getRepository()).finalizeSnapshot(snapshot.snapshotId(), snapshot.indices(), snapshot.startTime(),
|
|
|
- ExceptionsHelper.detailedMessage(t), 0, Collections.<SnapshotShardFailure>emptyList());
|
|
|
+ repositoriesService.repository(snapshot.snapshotId().getRepository()).finalizeSnapshot(
|
|
|
+ snapshot.snapshotId(), snapshot.indices(), snapshot.startTime(), ExceptionsHelper.detailedMessage(t), 0, Collections.<SnapshotShardFailure>emptyList());
|
|
|
} catch (Throwable t2) {
|
|
|
logger.warn("[{}] failed to close snapshot in repository", snapshot.snapshotId());
|
|
|
}
|
|
|
}
|
|
|
userCreateSnapshotListener.onFailure(t);
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
private SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) {
|
|
@@ -818,6 +842,19 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
|
|
|
* @param t exception if snapshot failed
|
|
|
*/
|
|
|
private void removeSnapshotFromClusterState(final SnapshotId snapshotId, final SnapshotInfo snapshot, final Throwable t) {
|
|
|
+ removeSnapshotFromClusterState(snapshotId, snapshot, t, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Removes record of running snapshot from cluster state and notifies the listener when this action is complete
|
|
|
+ *
|
|
|
+ * @param snapshotId snapshot id
|
|
|
+ * @param snapshot snapshot info if snapshot was successful
|
|
|
+ * @param t exception if snapshot failed
|
|
|
+ * @param listener listener to notify when snapshot information is removed from the cluster state
|
|
|
+ */
|
|
|
+ private void removeSnapshotFromClusterState(final SnapshotId snapshotId, final SnapshotInfo snapshot, final Throwable t,
|
|
|
+ @Nullable ActionListener<SnapshotInfo> listener) {
|
|
|
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
|
|
|
@Override
|
|
|
public ClusterState execute(ClusterState currentState) {
|
|
@@ -843,6 +880,9 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
|
|
|
@Override
|
|
|
public void onFailure(String source, Throwable t) {
|
|
|
logger.warn("[{}] failed to remove snapshot metadata", t, snapshotId);
|
|
|
+ if (listener != null) {
|
|
|
+ listener.onFailure(t);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -858,7 +898,9 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
|
|
|
logger.warn("failed to notify listener [{}]", t, listener);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+ if (listener != null) {
|
|
|
+ listener.onResponse(snapshot);
|
|
|
+ }
|
|
|
}
|
|
|
});
|
|
|
}
|