Browse Source

Simplify `flush`

jianghua 4 years ago
parent
commit
ead251b839

+ 7 - 15
src/main/java/io/milvus/client/MilvusClient.java

@@ -371,52 +371,44 @@ public interface MilvusClient {
    *
    *
    * @param collectionName collection to delete ids from
    * @param collectionName collection to delete ids from
    * @param ids a <code>List</code> of entity ids to delete
    * @param ids a <code>List</code> of entity ids to delete
-   * @return <code>Response</code>
-   * @see Response
    */
    */
-  Response deleteEntityByID(String collectionName, List<Long> ids);
+  void deleteEntityByID(String collectionName, List<Long> ids);
 
 
   /**
   /**
    * Flushes data in a list collections. Newly inserted or modifications on data will be visible
    * Flushes data in a list collections. Newly inserted or modifications on data will be visible
    * after <code>flush</code> returned
    * after <code>flush</code> returned
    *
    *
    * @param collectionNames a <code>List</code> of collections to flush
    * @param collectionNames a <code>List</code> of collections to flush
-   * @return <code>Response</code>
-   * @see Response
    */
    */
-  Response flush(List<String> collectionNames);
+  void flush(List<String> collectionNames);
 
 
   /**
   /**
    * Flushes data in a list collections asynchronously. Newly inserted or modifications on data will
    * Flushes data in a list collections asynchronously. Newly inserted or modifications on data will
    * be visible after <code>flush</code> returned
    * be visible after <code>flush</code> returned
    *
    *
    * @param collectionNames a <code>List</code> of collections to flush
    * @param collectionNames a <code>List</code> of collections to flush
-   * @return a <code>ListenableFuture</code> object which holds the <code>Response</code>
-   * @see Response
+   * @return <code>ListenableFuture</code>
    * @see ListenableFuture
    * @see ListenableFuture
    */
    */
-  ListenableFuture<Response> flushAsync(List<String> collectionNames);
+  ListenableFuture<Void> flushAsync(List<String> collectionNames);
 
 
   /**
   /**
    * Flushes data in a collection. Newly inserted or modifications on data will be visible after
    * Flushes data in a collection. Newly inserted or modifications on data will be visible after
    * <code>flush</code> returned
    * <code>flush</code> returned
    *
    *
    * @param collectionName name of collection to flush
    * @param collectionName name of collection to flush
-   * @return <code>Response</code>
-   * @see Response
    */
    */
-  Response flush(String collectionName);
+  void flush(String collectionName);
 
 
   /**
   /**
    * Flushes data in a collection asynchronously. Newly inserted or modifications on data will be
    * Flushes data in a collection asynchronously. Newly inserted or modifications on data will be
    * visible after <code>flush</code> returned
    * visible after <code>flush</code> returned
    *
    *
    * @param collectionName name of collection to flush
    * @param collectionName name of collection to flush
-   * @return a <code>ListenableFuture</code> object which holds the <code>Response</code>
-   * @see Response
+   * @return <code>ListenableFuture</code>
    * @see ListenableFuture
    * @see ListenableFuture
    */
    */
-  ListenableFuture<Response> flushAsync(String collectionName);
+  ListenableFuture<Void> flushAsync(String collectionName);
 
 
   /**
   /**
    * Compacts the collection, erasing deleted data from disk and rebuild index in background (if the
    * Compacts the collection, erasing deleted data from disk and rebuild index in background (if the

+ 21 - 100
src/main/java/io/milvus/client/MilvusGrpcClient.java

@@ -496,118 +496,39 @@ abstract class AbstractMilvusGrpcClient implements MilvusClient {
   }
   }
 
 
   @Override
   @Override
-  public Response deleteEntityByID(String collectionName, List<Long> ids) {
-    if (!maybeAvailable()) {
-      logWarning("You are not connected to Milvus server");
-      return new Response(Response.Status.CLIENT_NOT_CONNECTED);
-    }
-
-    DeleteByIDParam request =
-        DeleteByIDParam.newBuilder().setCollectionName(collectionName).addAllIdArray(ids).build();
-    Status response;
-
-    try {
-      response = blockingStub().deleteByID(request);
-
-      if (response.getErrorCode() == ErrorCode.SUCCESS) {
-        logInfo("deleteEntityByID in collection `{}` completed successfully!", collectionName);
-        return new Response(Response.Status.SUCCESS);
-      } else {
-        logError(
-            "deleteEntityByID in collection `{}` failed:\n{}", collectionName, response.toString());
-        return new Response(
-            Response.Status.valueOf(response.getErrorCodeValue()), response.getReason());
-      }
-    } catch (StatusRuntimeException e) {
-      logError("deleteEntityByID RPC failed:\n{}", e.getStatus().toString());
-      return new Response(Response.Status.RPC_ERROR, e.toString());
-    }
+  public void deleteEntityByID(String collectionName, List<Long> ids) {
+    translateExceptions(() -> {
+      DeleteByIDParam request = DeleteByIDParam.newBuilder()
+          .setCollectionName(collectionName)
+          .addAllIdArray(ids)
+          .build();
+      Status response = blockingStub().deleteByID(request);
+      checkResponseStatus(response);
+    });
   }
   }
 
 
   @Override
   @Override
-  public Response flush(List<String> collectionNames) {
-    if (!maybeAvailable()) {
-      logWarning("You are not connected to Milvus server");
-      return new Response(Response.Status.CLIENT_NOT_CONNECTED);
-    }
-
-    FlushParam request = FlushParam.newBuilder().addAllCollectionNameArray(collectionNames).build();
-    Status response;
-
-    try {
-      response = blockingStub().flush(request);
-
-      if (response.getErrorCode() == ErrorCode.SUCCESS) {
-        logInfo("Flushed collection {} successfully!", collectionNames);
-        return new Response(Response.Status.SUCCESS);
-      } else {
-        logError("Flush collection {} failed:\n{}", collectionNames, response.toString());
-        return new Response(
-            Response.Status.valueOf(response.getErrorCodeValue()), response.getReason());
-      }
-    } catch (StatusRuntimeException e) {
-      logError("flush RPC failed:\n{}", e.getStatus().toString());
-      return new Response(Response.Status.RPC_ERROR, e.toString());
-    }
+  public void flush(List<String> collectionNames) {
+    translateExceptions(() -> Futures.getUnchecked(flushAsync(collectionNames)));
   }
   }
 
 
   @Override
   @Override
-  public ListenableFuture<Response> flushAsync(@Nonnull List<String> collectionNames) {
-
-    if (!maybeAvailable()) {
-      logWarning("You are not connected to Milvus server");
-      return Futures.immediateFuture(new Response(Response.Status.CLIENT_NOT_CONNECTED));
-    }
-
-    FlushParam request = FlushParam.newBuilder().addAllCollectionNameArray(collectionNames).build();
-
-    ListenableFuture<Status> response;
-
-    response = futureStub().flush(request);
-
-    Futures.addCallback(
-        response,
-        new FutureCallback<Status>() {
-          @Override
-          public void onSuccess(Status result) {
-            if (result.getErrorCode() == ErrorCode.SUCCESS) {
-              logInfo("Flushed collection {} successfully!", collectionNames);
-            } else {
-              logError("Flush collection {} failed:\n{}", collectionNames, result.toString());
-            }
-          }
-
-          @Override
-          public void onFailure(Throwable t) {
-            logError("FlushAsync failed:\n{}", t.getMessage());
-          }
-        },
-        MoreExecutors.directExecutor());
-
-    return Futures.transform(
-        response, transformStatusToResponseFunc::apply, MoreExecutors.directExecutor());
+  public ListenableFuture<Void> flushAsync(@Nonnull List<String> collectionNames) {
+    return translateExceptions(() -> {
+      FlushParam request = FlushParam.newBuilder().addAllCollectionNameArray(collectionNames).build();
+      ListenableFuture<Status> response = futureStub().flush(request);
+      return Futures.transform(response, this::checkResponseStatus, MoreExecutors.directExecutor());
+    });
   }
   }
 
 
   @Override
   @Override
-  public Response flush(String collectionName) {
-    List<String> list =
-        new ArrayList<String>() {
-          {
-            add(collectionName);
-          }
-        };
-    return flush(list);
+  public void flush(String collectionName) {
+    flush(Collections.singletonList(collectionName));
   }
   }
 
 
   @Override
   @Override
-  public ListenableFuture<Response> flushAsync(String collectionName) {
-    List<String> list =
-        new ArrayList<String>() {
-          {
-            add(collectionName);
-          }
-        };
-    return flushAsync(list);
+  public ListenableFuture<Void> flushAsync(String collectionName) {
+    return flushAsync(Collections.singletonList(collectionName));
   }
   }
 
 
   @Override
   @Override

+ 21 - 25
src/test/java/io/milvus/client/MilvusGrpcClientTest.java

@@ -383,7 +383,7 @@ class MilvusClientTest {
         .setPartitionTag(tag2);
         .setPartitionTag(tag2);
     client.insert(insertParam);
     client.insert(insertParam);
 
 
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.flush(randomCollectionName);
 
 
     assertEquals(size * 2, client.countEntities(randomCollectionName));
     assertEquals(size * 2, client.countEntities(randomCollectionName));
 
 
@@ -427,7 +427,7 @@ class MilvusClientTest {
   @org.junit.jupiter.api.Test
   @org.junit.jupiter.api.Test
   void createIndex() {
   void createIndex() {
     insert();
     insert();
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.flush(randomCollectionName);
 
 
     Index index = Index.create(randomCollectionName, "float_vec")
     Index index = Index.create(randomCollectionName, "float_vec")
         .setIndexType(IndexType.IVF_SQ8)
         .setIndexType(IndexType.IVF_SQ8)
@@ -513,7 +513,7 @@ class MilvusClientTest {
     List<Long> entityIds = client.insert(insertParam);
     List<Long> entityIds = client.insert(insertParam);
     assertEquals(size, entityIds.size());
     assertEquals(size, entityIds.size());
 
 
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.flush(randomCollectionName);
 
 
     final int searchSize = 5;
     final int searchSize = 5;
     List<List<Float>> vectorsToSearch = vectors.subList(0, searchSize);
     List<List<Float>> vectorsToSearch = vectors.subList(0, searchSize);
@@ -572,7 +572,7 @@ class MilvusClientTest {
     List<Long> entityIds = client.insert(insertParam);
     List<Long> entityIds = client.insert(insertParam);
     assertEquals(size, entityIds.size());
     assertEquals(size, entityIds.size());
 
 
-    assertTrue(client.flush(binaryCollectionName).ok());
+    client.flush(binaryCollectionName);
 
 
     final int searchSize = 5;
     final int searchSize = 5;
     List<String> vectorsToSearch = vectors.subList(0, searchSize)
     List<String> vectorsToSearch = vectors.subList(0, searchSize)
@@ -636,14 +636,14 @@ class MilvusClientTest {
   @org.junit.jupiter.api.Test
   @org.junit.jupiter.api.Test
   void countEntities() {
   void countEntities() {
     insert();
     insert();
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.flush(randomCollectionName);
     assertEquals(size, client.countEntities(randomCollectionName));
     assertEquals(size, client.countEntities(randomCollectionName));
   }
   }
 
 
   @org.junit.jupiter.api.Test
   @org.junit.jupiter.api.Test
   void loadCollection() {
   void loadCollection() {
     insert();
     insert();
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.flush(randomCollectionName);
     client.loadCollection(randomCollectionName);
     client.loadCollection(randomCollectionName);
   }
   }
 
 
@@ -651,7 +651,7 @@ class MilvusClientTest {
   void getCollectionStats() {
   void getCollectionStats() {
     insert();
     insert();
 
 
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.flush(randomCollectionName);
 
 
     String collectionStats = client.getCollectionStats(randomCollectionName);
     String collectionStats = client.getCollectionStats(randomCollectionName);
     JSONObject jsonInfo = new JSONObject(collectionStats);
     JSONObject jsonInfo = new JSONObject(collectionStats);
@@ -685,7 +685,7 @@ class MilvusClientTest {
     List<Long> entityIds = client.insert(insertParam);
     List<Long> entityIds = client.insert(insertParam);
     assertEquals(size, entityIds.size());
     assertEquals(size, entityIds.size());
 
 
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.flush(randomCollectionName);
 
 
     List<Map<String, Object>> fieldsMap =
     List<Map<String, Object>> fieldsMap =
         client.getEntityByID(randomCollectionName, entityIds.subList(0, 100));
         client.getEntityByID(randomCollectionName, entityIds.subList(0, 100));
@@ -717,7 +717,7 @@ class MilvusClientTest {
         .setEntityIds(entityIds);
         .setEntityIds(entityIds);
     assertEquals(size, client.insert(insertParam).size());
     assertEquals(size, client.insert(insertParam).size());
 
 
-    assertTrue(client.flush(binaryCollectionName).ok());
+    client.flush(binaryCollectionName);
 
 
     List<Map<String, Object>> fieldsMap =
     List<Map<String, Object>> fieldsMap =
         client.getEntityByID(binaryCollectionName, entityIds.subList(0, 100));
         client.getEntityByID(binaryCollectionName, entityIds.subList(0, 100));
@@ -731,7 +731,7 @@ class MilvusClientTest {
   void getEntityIds() {
   void getEntityIds() {
     insert();
     insert();
 
 
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.flush(randomCollectionName);
 
 
     String collectionStats = client.getCollectionStats(randomCollectionName);
     String collectionStats = client.getCollectionStats(randomCollectionName);
     JSONObject jsonInfo = new JSONObject(collectionStats);
     JSONObject jsonInfo = new JSONObject(collectionStats);
@@ -766,21 +766,21 @@ class MilvusClientTest {
     List<Long> entityIds = client.insert(insertParam);
     List<Long> entityIds = client.insert(insertParam);
     assertEquals(size, entityIds.size());
     assertEquals(size, entityIds.size());
 
 
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.flush(randomCollectionName);
 
 
-    assertTrue(client.deleteEntityByID(randomCollectionName, entityIds.subList(0, 100)).ok());
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.deleteEntityByID(randomCollectionName, entityIds.subList(0, 100));
+    client.flush(randomCollectionName);
     assertEquals(size - 100, client.countEntities(randomCollectionName));
     assertEquals(size - 100, client.countEntities(randomCollectionName));
   }
   }
 
 
   @org.junit.jupiter.api.Test
   @org.junit.jupiter.api.Test
   void flush() {
   void flush() {
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.flush(randomCollectionName);
   }
   }
 
 
   @org.junit.jupiter.api.Test
   @org.junit.jupiter.api.Test
   void flushAsync() throws ExecutionException, InterruptedException {
   void flushAsync() throws ExecutionException, InterruptedException {
-    assertTrue(client.flushAsync(randomCollectionName).get().ok());
+    client.flushAsync(randomCollectionName);
   }
   }
 
 
   @org.junit.jupiter.api.Test
   @org.junit.jupiter.api.Test
@@ -804,7 +804,7 @@ class MilvusClientTest {
     List<Long> entityIds = client.insert(insertParam);
     List<Long> entityIds = client.insert(insertParam);
     assertEquals(size, entityIds.size());
     assertEquals(size, entityIds.size());
 
 
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.flush(randomCollectionName);
 
 
     String collectionStats = client.getCollectionStats(randomCollectionName);
     String collectionStats = client.getCollectionStats(randomCollectionName);
     JSONObject jsonInfo = new JSONObject(collectionStats);
     JSONObject jsonInfo = new JSONObject(collectionStats);
@@ -813,10 +813,8 @@ class MilvusClientTest {
         .getJSONObject(0)
         .getJSONObject(0)
         .getLong("data_size");
         .getLong("data_size");
 
 
-    assertTrue(
-        client.deleteEntityByID(randomCollectionName,
-            entityIds.subList(0, size / 2)).ok());
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.deleteEntityByID(randomCollectionName, entityIds.subList(0, size / 2));
+    client.flush(randomCollectionName);
     assertTrue(client.compact(
     assertTrue(client.compact(
         new CompactParam.Builder(randomCollectionName).withThreshold(0.2).build()).ok());
         new CompactParam.Builder(randomCollectionName).withThreshold(0.2).build()).ok());
 
 
@@ -851,7 +849,7 @@ class MilvusClientTest {
     List<Long> entityIds = client.insert(insertParam);
     List<Long> entityIds = client.insert(insertParam);
     assertEquals(size, entityIds.size());
     assertEquals(size, entityIds.size());
 
 
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.flush(randomCollectionName);
 
 
     String collectionStats = client.getCollectionStats(randomCollectionName);
     String collectionStats = client.getCollectionStats(randomCollectionName);
     JSONObject jsonInfo = new JSONObject(collectionStats);
     JSONObject jsonInfo = new JSONObject(collectionStats);
@@ -864,10 +862,8 @@ class MilvusClientTest {
 
 
     long previousSegmentSize = segmentInfo.getLong("data_size");
     long previousSegmentSize = segmentInfo.getLong("data_size");
 
 
-    assertTrue(
-        client.deleteEntityByID(randomCollectionName,
-            entityIds.subList(0, size / 2)).ok());
-    assertTrue(client.flush(randomCollectionName).ok());
+    client.deleteEntityByID(randomCollectionName, entityIds.subList(0, size / 2));
+    client.flush(randomCollectionName);
     assertTrue(client.compactAsync(
     assertTrue(client.compactAsync(
         new CompactParam.Builder(randomCollectionName).withThreshold(0.8).build()).get().ok());
         new CompactParam.Builder(randomCollectionName).withThreshold(0.8).build()).get().ok());