Browse Source

Disable bulkload interfaces (#315)

Signed-off-by: groot <yihua.mo@zilliz.com>
groot 3 years ago
parent
commit
3606d4a7ab

+ 92 - 92
src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -1198,98 +1198,98 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient {
         }
     }
 
-    @Override
-    public R<ImportResponse> bulkload(@NonNull BulkloadParam requestParam) {
-        if (!clientIsReady()) {
-            return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
-        }
-
-        logInfo(requestParam.toString());
-
-        try {
-            ImportRequest.Builder builder = ImportRequest.newBuilder();
-            builder.setCollectionName(requestParam.getCollectionName())
-                    .setPartitionName(requestParam.getPartitionName())
-                    .setRowBased(requestParam.isRowBased());
-            requestParam.getFiles().forEach(builder::addFiles);
-            List<KeyValuePair> options = assembleKvPair(requestParam.getOptions());
-            if (CollectionUtils.isNotEmpty(options)) {
-                options.forEach(builder::addOptions);
-            }
-
-            ImportRequest importRequest = builder.build();
-            ImportResponse response = blockingStub().import_(importRequest);
-
-            if (response.getStatus().getErrorCode() == ErrorCode.Success) {
-                logInfo("ImportRequest successfully!");
-                return R.success(response);
-            } else {
-                return failedStatus("ImportRequest", response.getStatus());
-            }
-        } catch (StatusRuntimeException e) {
-            logError("ImportRequest RPC failed:\n{}", e.getStatus().toString());
-            return R.failed(e);
-        } catch (Exception e) {
-            logError("ImportRequest failed:\n{}", e.getMessage());
-            return R.failed(e);
-        }
-    }
-
-    @Override
-    public R<GetImportStateResponse> getBulkloadState(GetBulkloadStateParam requestParam) {
-        if (!clientIsReady()) {
-            return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
-        }
-
-        logInfo(requestParam.toString());
-
-        try {
-            GetImportStateRequest importRequest = GetImportStateRequest.newBuilder()
-                    .setTask(requestParam.getTaskID())
-                    .build();
-            GetImportStateResponse response = blockingStub().getImportState(importRequest);
-
-            if (response.getStatus().getErrorCode() == ErrorCode.Success) {
-                logInfo("GetImportStateRequest successfully!");
-                return R.success(response);
-            } else {
-                return failedStatus("GetImportStateRequest", response.getStatus());
-            }
-        } catch (StatusRuntimeException e) {
-            logError("GetImportStateRequest RPC failed:\n{}", e.getStatus().toString());
-            return R.failed(e);
-        } catch (Exception e) {
-            logError("GetImportStateRequest failed:\n{}", e.getMessage());
-            return R.failed(e);
-        }
-    }
-
-    @Override
-    public R<ListImportTasksResponse> listBulkloadTasks(@NonNull ListBulkloadTasksParam requestParam) {
-        if (!clientIsReady()) {
-            return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
-        }
-
-        logInfo(requestParam.toString());
-
-        try {
-            ListImportTasksRequest listRequest = ListImportTasksRequest.newBuilder().build();
-            ListImportTasksResponse response = blockingStub().listImportTasks(listRequest);
-
-            if (response.getStatus().getErrorCode() == ErrorCode.Success) {
-                logInfo("ListImportTasksRequest successfully!");
-                return R.success(response);
-            } else {
-                return failedStatus("ListImportTasksRequest", response.getStatus());
-            }
-        } catch (StatusRuntimeException e) {
-            logError("ListImportTasksRequest RPC failed! \n{}", e.getMessage());
-            return R.failed(e);
-        } catch (Exception e) {
-            logError("ListImportTasksRequest failed! \n{}", e.getMessage());
-            return R.failed(e);
-        }
-    }
+//    @Override
+//    public R<ImportResponse> bulkload(@NonNull BulkloadParam requestParam) {
+//        if (!clientIsReady()) {
+//            return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
+//        }
+//
+//        logInfo(requestParam.toString());
+//
+//        try {
+//            ImportRequest.Builder builder = ImportRequest.newBuilder();
+//            builder.setCollectionName(requestParam.getCollectionName())
+//                    .setPartitionName(requestParam.getPartitionName())
+//                    .setRowBased(requestParam.isRowBased());
+//            requestParam.getFiles().forEach(builder::addFiles);
+//            List<KeyValuePair> options = assembleKvPair(requestParam.getOptions());
+//            if (CollectionUtils.isNotEmpty(options)) {
+//                options.forEach(builder::addOptions);
+//            }
+//
+//            ImportRequest importRequest = builder.build();
+//            ImportResponse response = blockingStub().import_(importRequest);
+//
+//            if (response.getStatus().getErrorCode() == ErrorCode.Success) {
+//                logInfo("ImportRequest successfully!");
+//                return R.success(response);
+//            } else {
+//                return failedStatus("ImportRequest", response.getStatus());
+//            }
+//        } catch (StatusRuntimeException e) {
+//            logError("ImportRequest RPC failed:\n{}", e.getStatus().toString());
+//            return R.failed(e);
+//        } catch (Exception e) {
+//            logError("ImportRequest failed:\n{}", e.getMessage());
+//            return R.failed(e);
+//        }
+//    }
+//
+//    @Override
+//    public R<GetImportStateResponse> getBulkloadState(GetBulkloadStateParam requestParam) {
+//        if (!clientIsReady()) {
+//            return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
+//        }
+//
+//        logInfo(requestParam.toString());
+//
+//        try {
+//            GetImportStateRequest importRequest = GetImportStateRequest.newBuilder()
+//                    .setTask(requestParam.getTaskID())
+//                    .build();
+//            GetImportStateResponse response = blockingStub().getImportState(importRequest);
+//
+//            if (response.getStatus().getErrorCode() == ErrorCode.Success) {
+//                logInfo("GetImportStateRequest successfully!");
+//                return R.success(response);
+//            } else {
+//                return failedStatus("GetImportStateRequest", response.getStatus());
+//            }
+//        } catch (StatusRuntimeException e) {
+//            logError("GetImportStateRequest RPC failed:\n{}", e.getStatus().toString());
+//            return R.failed(e);
+//        } catch (Exception e) {
+//            logError("GetImportStateRequest failed:\n{}", e.getMessage());
+//            return R.failed(e);
+//        }
+//    }
+//
+//    @Override
+//    public R<ListImportTasksResponse> listBulkloadTasks(@NonNull ListBulkloadTasksParam requestParam) {
+//        if (!clientIsReady()) {
+//            return R.failed(new ClientNotConnectedException("Client rpc channel is not ready"));
+//        }
+//
+//        logInfo(requestParam.toString());
+//
+//        try {
+//            ListImportTasksRequest listRequest = ListImportTasksRequest.newBuilder().build();
+//            ListImportTasksResponse response = blockingStub().listImportTasks(listRequest);
+//
+//            if (response.getStatus().getErrorCode() == ErrorCode.Success) {
+//                logInfo("ListImportTasksRequest successfully!");
+//                return R.success(response);
+//            } else {
+//                return failedStatus("ListImportTasksRequest", response.getStatus());
+//            }
+//        } catch (StatusRuntimeException e) {
+//            logError("ListImportTasksRequest RPC failed! \n{}", e.getMessage());
+//            return R.failed(e);
+//        } catch (Exception e) {
+//            logError("ListImportTasksRequest failed! \n{}", e.getMessage());
+//            return R.failed(e);
+//        }
+//    }
 
     @Override
     public R<MutationResult> insert(@NonNull InsertParam requestParam) {

+ 23 - 23
src/main/java/io/milvus/client/MilvusClient.java

@@ -286,29 +286,29 @@ public interface MilvusClient {
      */
     R<MutationResult> delete(DeleteParam requestParam);
 
-    /**
-     * Import data from external files, currently support JSON/Numpy format
-     *
-     * @param requestParam {@link BulkloadParam}
-     * @return {status:result code, data:RpcStatus{msg: result message}}
-     */
-    R<ImportResponse> bulkload(BulkloadParam requestParam);
-
-    /**
-     * Get state of bulk a load task
-     *
-     * @param requestParam {@link GetBulkloadStateParam}
-     * @return {status:result code, data:GetImportStateResponse{status,state}}
-     */
-    R<GetImportStateResponse> getBulkloadState(GetBulkloadStateParam requestParam);
-
-    /**
-     * List all bulk load tasks
-     *
-     * @param requestParam {@link ListBulkloadTasksParam}
-     * @return {status:result code, data:ListImportTasksResponse{status,state}}
-     */
-    R<ListImportTasksResponse> listBulkloadTasks(ListBulkloadTasksParam requestParam);
+//    /**
+//     * Import data from external files, currently support JSON/Numpy format
+//     *
+//     * @param requestParam {@link BulkloadParam}
+//     * @return {status:result code, data:RpcStatus{msg: result message}}
+//     */
+//    R<ImportResponse> bulkload(BulkloadParam requestParam);
+//
+//    /**
+//     * Get state of bulk a load task
+//     *
+//     * @param requestParam {@link GetBulkloadStateParam}
+//     * @return {status:result code, data:GetImportStateResponse{status,state}}
+//     */
+//    R<GetImportStateResponse> getBulkloadState(GetBulkloadStateParam requestParam);
+//
+//    /**
+//     * List all bulk load tasks
+//     *
+//     * @param requestParam {@link ListBulkloadTasksParam}
+//     * @return {status:result code, data:ListImportTasksResponse{status,state}}
+//     */
+//    R<ListImportTasksResponse> listBulkloadTasks(ListBulkloadTasksParam requestParam);
 
     /**
      * Conducts ANN search on a vector field. Use expression to do filtering before search.

+ 17 - 17
src/main/java/io/milvus/client/MilvusMultiServiceClient.java

@@ -289,23 +289,23 @@ public class MilvusMultiServiceClient implements MilvusClient {
         return handleResponse(response);
     }
 
-    @Override
-    public R<ImportResponse> bulkload(@NonNull BulkloadParam requestParam) {
-        List<R<ImportResponse>> response = this.clusterFactory.getAvailableServerSettings().stream()
-                .map(serverSetting -> serverSetting.getClient().bulkload(requestParam))
-                .collect(Collectors.toList());
-        return handleResponse(response);
-    }
-
-    @Override
-    public R<GetImportStateResponse> getBulkloadState(GetBulkloadStateParam requestParam) {
-        return this.clusterFactory.getMaster().getClient().getBulkloadState(requestParam);
-    }
-
-    @Override
-    public R<ListImportTasksResponse> listBulkloadTasks(@NonNull ListBulkloadTasksParam requestParam) {
-        return this.clusterFactory.getMaster().getClient().listBulkloadTasks(requestParam);
-    }
+//    @Override
+//    public R<ImportResponse> bulkload(@NonNull BulkloadParam requestParam) {
+//        List<R<ImportResponse>> response = this.clusterFactory.getAvailableServerSettings().stream()
+//                .map(serverSetting -> serverSetting.getClient().bulkload(requestParam))
+//                .collect(Collectors.toList());
+//        return handleResponse(response);
+//    }
+//
+//    @Override
+//    public R<GetImportStateResponse> getBulkloadState(GetBulkloadStateParam requestParam) {
+//        return this.clusterFactory.getMaster().getClient().getBulkloadState(requestParam);
+//    }
+//
+//    @Override
+//    public R<ListImportTasksResponse> listBulkloadTasks(@NonNull ListBulkloadTasksParam requestParam) {
+//        return this.clusterFactory.getMaster().getClient().listBulkloadTasks(requestParam);
+//    }
 
     @Override
     public R<SearchResults> search(SearchParam requestParam) {

+ 77 - 77
src/test/java/io/milvus/client/MilvusServiceClientTest.java

@@ -1606,36 +1606,36 @@ class MilvusServiceClientTest {
         testFuncByName("delete", param);
     }
 
-    @Test
-    void bulkload() {
-        List<String> files = Collections.singletonList("f1");
-        BulkloadParam param = BulkloadParam.newBuilder()
-                .withCollectionName("collection1")
-                .withPartitionName("partition1")
-                .withRowBased(true)
-                .addFile("dummy.json")
-                .withFiles(files)
-                .withBucket("myBucket")
-                .build();
-
-        testFuncByName("bulkload", param);
-    }
-
-    @Test
-    void getBulkloadState() {
-        GetBulkloadStateParam param = GetBulkloadStateParam.newBuilder()
-                .withTaskID(100L)
-                .build();
-
-        testFuncByName("getBulkloadState", param);
-    }
-
-    @Test
-    void listBulkloadTasks() {
-        ListBulkloadTasksParam param = ListBulkloadTasksParam.newBuilder().build();
-
-        testFuncByName("listBulkloadTasks", param);
-    }
+//    @Test
+//    void bulkload() {
+//        List<String> files = Collections.singletonList("f1");
+//        BulkloadParam param = BulkloadParam.newBuilder()
+//                .withCollectionName("collection1")
+//                .withPartitionName("partition1")
+//                .withRowBased(true)
+//                .addFile("dummy.json")
+//                .withFiles(files)
+//                .withBucket("myBucket")
+//                .build();
+//
+//        testFuncByName("bulkload", param);
+//    }
+//
+//    @Test
+//    void getBulkloadState() {
+//        GetBulkloadStateParam param = GetBulkloadStateParam.newBuilder()
+//                .withTaskID(100L)
+//                .build();
+//
+//        testFuncByName("getBulkloadState", param);
+//    }
+//
+//    @Test
+//    void listBulkloadTasks() {
+//        ListBulkloadTasksParam param = ListBulkloadTasksParam.newBuilder().build();
+//
+//        testFuncByName("listBulkloadTasks", param);
+//    }
 
     @Test
     void searchParam() {
@@ -2671,51 +2671,51 @@ class MilvusServiceClientTest {
         }
     }
 
-    @Test
-    void testGetBulkloadStateWrapper() {
-        long count = 1000;
-        ImportState state = ImportState.ImportStarted;
-        String reason = "unexpected error";
-        String files = "1.json";
-        String collection = "c1";
-        String partition = "p1";
-        GetImportStateResponse reso = GetImportStateResponse.newBuilder()
-                .setState(state)
-                .setRowCount(count)
-                .addIdList(0)
-                .addIdList(99)
-                .addInfos(KeyValuePair.newBuilder()
-                        .setKey(Constant.FAILED_REASON)
-                        .setValue(reason)
-                        .build())
-                .addInfos(KeyValuePair.newBuilder()
-                        .setKey(Constant.IMPORT_FILES)
-                        .setValue(files)
-                        .build())
-                .addInfos(KeyValuePair.newBuilder()
-                        .setKey(Constant.IMPORT_COLLECTION)
-                        .setValue(collection)
-                        .build())
-                .addInfos(KeyValuePair.newBuilder()
-                        .setKey(Constant.IMPORT_PARTITION)
-                        .setValue(partition)
-                        .build())
-                .setDataQueryable(false)
-                .setDataIndexed(false)
-                .build();
-
-        GetBulkloadStateWrapper wrapper = new GetBulkloadStateWrapper(reso);
-        assertEquals(count, wrapper.getImportedCount());
-        assertEquals(100, wrapper.getAutoGeneratedIDs().size());
-        assertEquals(0, wrapper.getAutoGeneratedIDs().get(0));
-        assertEquals(99, wrapper.getAutoGeneratedIDs().get(99));
-        assertEquals(reason, wrapper.getFailedReason());
-        assertEquals(files, wrapper.getFiles());
-        assertEquals(collection, wrapper.getCollectionName());
-        assertEquals(partition, wrapper.getPartitionName());
-        assertEquals(false, wrapper.queryable());
-        assertEquals(false, wrapper.indexed());
-
-        assertFalse(wrapper.toString().isEmpty());
-    }
+//    @Test
+//    void testGetBulkloadStateWrapper() {
+//        long count = 1000;
+//        ImportState state = ImportState.ImportStarted;
+//        String reason = "unexpected error";
+//        String files = "1.json";
+//        String collection = "c1";
+//        String partition = "p1";
+//        GetImportStateResponse reso = GetImportStateResponse.newBuilder()
+//                .setState(state)
+//                .setRowCount(count)
+//                .addIdList(0)
+//                .addIdList(99)
+//                .addInfos(KeyValuePair.newBuilder()
+//                        .setKey(Constant.FAILED_REASON)
+//                        .setValue(reason)
+//                        .build())
+//                .addInfos(KeyValuePair.newBuilder()
+//                        .setKey(Constant.IMPORT_FILES)
+//                        .setValue(files)
+//                        .build())
+//                .addInfos(KeyValuePair.newBuilder()
+//                        .setKey(Constant.IMPORT_COLLECTION)
+//                        .setValue(collection)
+//                        .build())
+//                .addInfos(KeyValuePair.newBuilder()
+//                        .setKey(Constant.IMPORT_PARTITION)
+//                        .setValue(partition)
+//                        .build())
+//                .setDataQueryable(false)
+//                .setDataIndexed(false)
+//                .build();
+//
+//        GetBulkloadStateWrapper wrapper = new GetBulkloadStateWrapper(reso);
+//        assertEquals(count, wrapper.getImportedCount());
+//        assertEquals(100, wrapper.getAutoGeneratedIDs().size());
+//        assertEquals(0, wrapper.getAutoGeneratedIDs().get(0));
+//        assertEquals(99, wrapper.getAutoGeneratedIDs().get(99));
+//        assertEquals(reason, wrapper.getFailedReason());
+//        assertEquals(files, wrapper.getFiles());
+//        assertEquals(collection, wrapper.getCollectionName());
+//        assertEquals(partition, wrapper.getPartitionName());
+//        assertEquals(false, wrapper.queryable());
+//        assertEquals(false, wrapper.indexed());
+//
+//        assertFalse(wrapper.toString().isEmpty());
+//    }
 }