|
|
@@ -23,11 +23,11 @@ import com.google.gson.Gson;
|
|
|
import io.milvus.bulkwriter.common.clientenum.ConnectType;
|
|
|
import io.milvus.bulkwriter.common.utils.FileUtils;
|
|
|
import io.milvus.bulkwriter.model.UploadFilesResult;
|
|
|
-import io.milvus.bulkwriter.request.stage.ApplyStageRequest;
|
|
|
-import io.milvus.bulkwriter.request.stage.UploadFilesRequest;
|
|
|
+import io.milvus.bulkwriter.request.volume.ApplyVolumeRequest;
|
|
|
+import io.milvus.bulkwriter.request.volume.UploadFilesRequest;
|
|
|
import io.milvus.bulkwriter.resolver.EndpointResolver;
|
|
|
-import io.milvus.bulkwriter.response.ApplyStageResponse;
|
|
|
-import io.milvus.bulkwriter.restful.DataStageUtils;
|
|
|
+import io.milvus.bulkwriter.response.ApplyVolumeResponse;
|
|
|
+import io.milvus.bulkwriter.restful.DataVolumeUtils;
|
|
|
import io.milvus.bulkwriter.storage.StorageClient;
|
|
|
import io.milvus.bulkwriter.storage.client.MinioStorageClient;
|
|
|
import io.milvus.exception.ParamException;
|
|
|
@@ -46,30 +46,30 @@ import java.util.concurrent.*;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
-public class StageFileManager {
|
|
|
- private static final Logger logger = LoggerFactory.getLogger(StageFileManager.class);
|
|
|
+public class VolumeFileManager {
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(VolumeFileManager.class);
|
|
|
private final String cloudEndpoint;
|
|
|
private final String apiKey;
|
|
|
- private final String stageName;
|
|
|
+ private final String volumeName;
|
|
|
private final ConnectType connectType;
|
|
|
private final ExecutorService executor;
|
|
|
|
|
|
private StorageClient storageClient;
|
|
|
- private ApplyStageResponse applyStageResponse;
|
|
|
+ private ApplyVolumeResponse applyVolumeResponse;
|
|
|
|
|
|
- public StageFileManager(StageFileManagerParam stageWriterParam) {
|
|
|
- this.cloudEndpoint = stageWriterParam.getCloudEndpoint();
|
|
|
- this.apiKey = stageWriterParam.getApiKey();
|
|
|
- this.stageName = stageWriterParam.getStageName();
|
|
|
- this.connectType = stageWriterParam.getConnectType();
|
|
|
+ public VolumeFileManager(VolumeFileManagerParam volumeFileManagerParam) {
|
|
|
+ this.cloudEndpoint = volumeFileManagerParam.getCloudEndpoint();
|
|
|
+ this.apiKey = volumeFileManagerParam.getApiKey();
|
|
|
+ this.volumeName = volumeFileManagerParam.getVolumeName();
|
|
|
+ this.connectType = volumeFileManagerParam.getConnectType();
|
|
|
this.executor = Executors.newFixedThreadPool(10);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Asynchronously uploads a local file or directory to the specified path within the Stage.
|
|
|
+ * Asynchronously uploads a local file or directory to the specified path within the Volume.
|
|
|
*
|
|
|
* @param request the upload request containing the source local file or directory path
|
|
|
- * and the target directory path in the Stage {@link UploadFilesRequest}
|
|
|
+ * and the target directory path in the Volume {@link UploadFilesRequest}
|
|
|
* @return a {@link CompletableFuture} that completes with an {@link UploadFilesResult}
|
|
|
* once all files have been uploaded successfully
|
|
|
* @throws CompletionException if an error occurs during the upload process
|
|
|
@@ -77,9 +77,9 @@ public class StageFileManager {
|
|
|
public CompletableFuture<UploadFilesResult> uploadFilesAsync(UploadFilesRequest request) {
|
|
|
String localDirOrFilePath = request.getSourceFilePath();
|
|
|
Pair<List<String>, Long> localPathPair = FileUtils.processLocalPath(localDirOrFilePath);
|
|
|
- String stagePath = convertDirPath(request.getTargetStagePath());
|
|
|
+ String volumePath = convertDirPath(request.getTargetVolumePath());
|
|
|
|
|
|
- refreshStageAndClient(stagePath);
|
|
|
+ refreshVolumeAndClient(volumePath);
|
|
|
initValidator(localPathPair);
|
|
|
|
|
|
AtomicInteger currentFileCount = new AtomicInteger(0);
|
|
|
@@ -94,7 +94,7 @@ public class StageFileManager {
|
|
|
long fileStartTime = System.currentTimeMillis();
|
|
|
|
|
|
try {
|
|
|
- uploadLocalFileToStage(localFilePath, localDirOrFilePath, stagePath);
|
|
|
+ uploadLocalFileToVolume(localFilePath, localDirOrFilePath, volumePath);
|
|
|
long bytes = processedBytes.addAndGet(file.length());
|
|
|
int completeCount = currentFileCount.incrementAndGet();
|
|
|
long elapsed = System.currentTimeMillis() - fileStartTime;
|
|
|
@@ -110,11 +110,11 @@ public class StageFileManager {
|
|
|
})
|
|
|
.thenApply(v -> {
|
|
|
long totalElapsed = (System.currentTimeMillis() - startTime) / 1000;
|
|
|
- logger.info("all files in {} has been async uploaded to stage, stageName:{}, stagePath:{}, totalFileCount:{}, totalFileSize:{}, cost times:{} s",
|
|
|
- localDirOrFilePath, applyStageResponse.getStageName(), stagePath, localPathPair.getKey().size(), localPathPair.getValue(), totalElapsed);
|
|
|
+ logger.info("all files in {} has been async uploaded to volume, volumeName:{}, volumePath:{}, totalFileCount:{}, totalFileSize:{}, cost times:{} s",
|
|
|
+ localDirOrFilePath, applyVolumeResponse.getVolumeName(), volumePath, localPathPair.getKey().size(), localPathPair.getValue(), totalElapsed);
|
|
|
return UploadFilesResult.builder()
|
|
|
- .stageName(applyStageResponse.getStageName())
|
|
|
- .path(stagePath)
|
|
|
+ .volumeName(applyVolumeResponse.getVolumeName())
|
|
|
+ .path(volumePath)
|
|
|
.build();
|
|
|
});
|
|
|
}
|
|
|
@@ -129,7 +129,7 @@ public class StageFileManager {
|
|
|
* <p>
|
|
|
* Usage recommendation:
|
|
|
* <ul>
|
|
|
- * <li>Call this method when the StageFileManager is no longer needed.</li>
|
|
|
+ * <li>Call this method when the VolumeFileManager is no longer needed.</li>
|
|
|
* </ul>
|
|
|
* <p>
|
|
|
* Thread interruption is respected, and the interrupt status is restored if interrupted during shutdown.
|
|
|
@@ -149,33 +149,33 @@ public class StageFileManager {
|
|
|
}
|
|
|
|
|
|
private void initValidator(Pair<List<String>, Long> localPathPair) {
|
|
|
- if (localPathPair.getValue() > applyStageResponse.getCondition().getMaxContentLength()) {
|
|
|
- String msg = String.format("localFileTotalSize %s exceeds the maximum contentLength limit %s defined in the condition. If you want to upload larger files, please contact us to lift the restriction", localPathPair.getValue(), applyStageResponse.getCondition().getMaxContentLength());
|
|
|
+ if (localPathPair.getValue() > applyVolumeResponse.getCondition().getMaxContentLength()) {
|
|
|
+ String msg = String.format("localFileTotalSize %s exceeds the maximum contentLength limit %s defined in the condition. If you want to upload larger files, please contact us to lift the restriction", localPathPair.getValue(), applyVolumeResponse.getCondition().getMaxContentLength());
|
|
|
logger.error(msg);
|
|
|
throw new ParamException(msg);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void refreshStageAndClient(String path) {
|
|
|
- logger.info("refreshing Stage info...");
|
|
|
- ApplyStageRequest applyStageRequest = ApplyStageRequest.builder()
|
|
|
+ private void refreshVolumeAndClient(String path) {
|
|
|
+ logger.info("refreshing Volume info...");
|
|
|
+ ApplyVolumeRequest applyVolumeRequest = ApplyVolumeRequest.builder()
|
|
|
.apiKey(apiKey)
|
|
|
- .stageName(stageName)
|
|
|
+ .volumeName(volumeName)
|
|
|
.path(path)
|
|
|
.build();
|
|
|
- String result = DataStageUtils.applyStage(cloudEndpoint, applyStageRequest);
|
|
|
- applyStageResponse = new Gson().fromJson(result, ApplyStageResponse.class);
|
|
|
- logger.info("stage info refreshed");
|
|
|
+ String result = DataVolumeUtils.applyVolume(cloudEndpoint, applyVolumeRequest);
|
|
|
+ applyVolumeResponse = new Gson().fromJson(result, ApplyVolumeResponse.class);
|
|
|
+ logger.info("volume info refreshed");
|
|
|
|
|
|
- String endpoint = EndpointResolver.resolveEndpoint(applyStageResponse.getEndpoint(), applyStageResponse.getCloud(),
|
|
|
- applyStageResponse.getRegion(), connectType);
|
|
|
+ String endpoint = EndpointResolver.resolveEndpoint(applyVolumeResponse.getEndpoint(), applyVolumeResponse.getCloud(),
|
|
|
+ applyVolumeResponse.getRegion(), connectType);
|
|
|
storageClient = MinioStorageClient.getStorageClient(
|
|
|
- applyStageResponse.getCloud(),
|
|
|
+ applyVolumeResponse.getCloud(),
|
|
|
endpoint,
|
|
|
- applyStageResponse.getCredentials().getTmpAK(),
|
|
|
- applyStageResponse.getCredentials().getTmpSK(),
|
|
|
- applyStageResponse.getCredentials().getSessionToken(),
|
|
|
- applyStageResponse.getRegion(), null);
|
|
|
+ applyVolumeResponse.getCredentials().getTmpAK(),
|
|
|
+ applyVolumeResponse.getCredentials().getTmpSK(),
|
|
|
+ applyVolumeResponse.getCredentials().getSessionToken(),
|
|
|
+ applyVolumeResponse.getRegion(), null);
|
|
|
logger.info("storage client refreshed");
|
|
|
}
|
|
|
|
|
|
@@ -189,7 +189,7 @@ public class StageFileManager {
|
|
|
return inputPath + "/";
|
|
|
}
|
|
|
|
|
|
- private void uploadLocalFileToStage(String localFilePath, String rootPath, String stagePath) {
|
|
|
+ private void uploadLocalFileToVolume(String localFilePath, String rootPath, String volumePath) {
|
|
|
File file = new File(localFilePath);
|
|
|
Path filePath = file.toPath().toAbsolutePath();
|
|
|
Path root = Paths.get(rootPath).toAbsolutePath();
|
|
|
@@ -201,36 +201,36 @@ public class StageFileManager {
|
|
|
relativePath = root.relativize(filePath).toString().replace("\\", "/");
|
|
|
}
|
|
|
|
|
|
- String remoteFilePath = applyStageResponse.getStagePrefix() + stagePath + relativePath;
|
|
|
- putObjectWithRetry(file, remoteFilePath, stagePath);
|
|
|
+ String remoteFilePath = applyVolumeResponse.getVolumePrefix() + volumePath + relativePath;
|
|
|
+ putObjectWithRetry(file, remoteFilePath, volumePath);
|
|
|
}
|
|
|
|
|
|
- private void putObjectWithRetry(File file, String remoteFilePath, String stagePath) {
|
|
|
- refreshIfExpire(stagePath);
|
|
|
+ private void putObjectWithRetry(File file, String remoteFilePath, String volumePath) {
|
|
|
+ refreshIfExpire(volumePath);
|
|
|
String msg = "upload " + file.getAbsolutePath();
|
|
|
withRetry(msg, () -> {
|
|
|
try {
|
|
|
- storageClient.putObject(file, applyStageResponse.getBucketName(), remoteFilePath);
|
|
|
+ storageClient.putObject(file, applyVolumeResponse.getBucketName(), remoteFilePath);
|
|
|
} catch (Exception e) {
|
|
|
throw new RuntimeException(e);
|
|
|
}
|
|
|
- }, stagePath);
|
|
|
+ }, volumePath);
|
|
|
|
|
|
}
|
|
|
|
|
|
- private void refreshIfExpire(String stagePath) {
|
|
|
- Instant instant = Instant.parse(applyStageResponse.getCredentials().getExpireTime());
|
|
|
+ private void refreshIfExpire(String volumePath) {
|
|
|
+ Instant instant = Instant.parse(applyVolumeResponse.getCredentials().getExpireTime());
|
|
|
Date expireTime = Date.from(instant);
|
|
|
if (new Date().after(expireTime)) {
|
|
|
synchronized (this) {
|
|
|
if (new Date().after(expireTime)) {
|
|
|
- refreshStageAndClient(stagePath);
|
|
|
+ refreshVolumeAndClient(volumePath);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private <T> T withRetry(String actionName, Callable<T> callable, String stagePath) {
|
|
|
+ private <T> T withRetry(String actionName, Callable<T> callable, String volumePath) {
|
|
|
final int maxRetries = 5;
|
|
|
int attempt = 0;
|
|
|
while (attempt < maxRetries) {
|
|
|
@@ -240,7 +240,7 @@ public class StageFileManager {
|
|
|
throw e;
|
|
|
} catch (Exception e) {
|
|
|
attempt++;
|
|
|
- refreshStageAndClient(stagePath);
|
|
|
+ refreshVolumeAndClient(volumePath);
|
|
|
logger.warn("Attempt {} failed to {}", attempt, actionName, e);
|
|
|
if (attempt == maxRetries) {
|
|
|
throw new RuntimeException(actionName + " failed after " + maxRetries + " attempts", e);
|
|
|
@@ -254,11 +254,11 @@ public class StageFileManager {
|
|
|
throw new RuntimeException(actionName + " failed unexpectedly.");
|
|
|
}
|
|
|
|
|
|
- private void withRetry(String actionName, Runnable runnable, String stagePath) {
|
|
|
+ private void withRetry(String actionName, Runnable runnable, String volumePath) {
|
|
|
withRetry(actionName, () -> {
|
|
|
runnable.run();
|
|
|
return null;
|
|
|
- }, stagePath);
|
|
|
+ }, volumePath);
|
|
|
}
|
|
|
|
|
|
|