Browse Source

framework && simple connection (#191)

* framework && simple connection

* pass in exception message in R

* pass in exception message in R

* pass in exception message in R

Co-authored-by: luhongwei <hongweilu@sohu-inc.com>
trovwu 3 years ago
parent
commit
940c3cc15f

+ 43 - 0
src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

@@ -0,0 +1,43 @@
+package io.milvus.client;
+
+import io.grpc.StatusRuntimeException;
+import io.milvus.grpc.BoolResponse;
+import io.milvus.grpc.ErrorCode;
+import io.milvus.grpc.HasCollectionRequest;
+import io.milvus.grpc.MilvusServiceGrpc;
+import io.milvus.param.R;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+public abstract class AbstractMilvusGrpcClient implements MilvusClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractMilvusGrpcClient.class);
+
+    protected abstract MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub();
+
+    protected abstract MilvusServiceGrpc.MilvusServiceFutureStub futureStub();
+
+    @Override
+    public R<Boolean> hasCollection(String collectionName) {
+        HasCollectionRequest hasCollectionRequest = HasCollectionRequest.newBuilder()
+                .setCollectionName(collectionName)
+                .build();
+
+        BoolResponse response;
+        try {
+            response = blockingStub().hasCollection(hasCollectionRequest);
+        } catch (StatusRuntimeException e) {
+            logger.error("[milvus] hasCollection:{} request error: {}", collectionName, e.getMessage());
+            return R.failed(e);
+        }
+        Boolean aBoolean = Optional.ofNullable(response)
+                .map(BoolResponse::getValue)
+                .orElse(false);
+
+        return R.success(aBoolean);
+
+
+    }
+}

+ 36 - 0
src/main/java/io/milvus/client/MilvusClient.java

@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.client;
+
+import io.milvus.param.R;
+
+import java.util.concurrent.TimeUnit;
+
+/** The Milvus Client Interface */
+public interface MilvusClient {
+
+  default void close() {
+    close(TimeUnit.MINUTES.toSeconds(1));
+  }
+
+  void close(long maxWaitSeconds);
+
+  R<Boolean> hasCollection(String collectionName);
+}

+ 81 - 0
src/main/java/io/milvus/client/MilvusServiceClient.java

@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.client;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.milvus.grpc.*;
+import io.milvus.param.ConnectParam;
+import io.milvus.param.R;
+
+import java.util.concurrent.TimeUnit;
+
+public class MilvusServiceClient extends AbstractMilvusGrpcClient {
+
+    private final ManagedChannel channel;
+    private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
+    private final MilvusServiceGrpc.MilvusServiceFutureStub futureStub;
+
+    public MilvusServiceClient(ConnectParam connectParam) {
+        channel = ManagedChannelBuilder.forAddress(connectParam.getHost(), connectParam.getPort())
+                .usePlaintext()
+                .maxInboundMessageSize(Integer.MAX_VALUE)
+                .keepAliveTime(connectParam.getKeepAliveTimeMs(), TimeUnit.MILLISECONDS)
+                .keepAliveTimeout(connectParam.getKeepAliveTimeoutMs(), TimeUnit.MILLISECONDS)
+                .keepAliveWithoutCalls(connectParam.isKeepAliveWithoutCalls())
+                .idleTimeout(connectParam.getIdleTimeoutMs(), TimeUnit.MILLISECONDS)
+                .build();
+        blockingStub = MilvusServiceGrpc.newBlockingStub(channel);
+        futureStub = MilvusServiceGrpc.newFutureStub(channel);
+    }
+
+    @Override
+    protected MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub() {
+        return this.blockingStub;
+    }
+
+    @Override
+    protected MilvusServiceGrpc.MilvusServiceFutureStub futureStub() {
+        return this.futureStub;
+    }
+
+    @Override
+    public void close(long maxWaitSeconds) {
+
+    }
+
+
+
+    public static void main(String[] args) {
+        ConnectParam build = ConnectParam.Builder.newBuilder()
+                .withHost("localhost")
+                .withPort(19530)
+                .build();
+
+        MilvusServiceClient milvusServiceClient = new MilvusServiceClient(build);
+
+        R<Boolean> chuwutest = milvusServiceClient.hasCollection("chuwutest");
+
+        System.out.println(chuwutest);
+    }
+
+
+}
+

+ 151 - 0
src/main/java/io/milvus/param/ConnectParam.java

@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.milvus.param;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * coonectParam, timeUnit:ms
+ */
+public class ConnectParam {
+  private final String host;
+  private final int port;
+  private final long connectTimeoutMs;
+  private final long keepAliveTimeMs;
+  private final long keepAliveTimeoutMs;
+  private final boolean keepAliveWithoutCalls;
+  private final long idleTimeoutMs;
+
+  private ConnectParam(@Nonnull Builder builder) {
+    this.host = builder.host;
+    this.port = builder.port;
+    this.connectTimeoutMs = builder.connectTimeoutMs;
+    this.keepAliveTimeMs = builder.keepAliveTimeMs;
+    this.keepAliveTimeoutMs = builder.keepAliveTimeoutMs;
+    this.keepAliveWithoutCalls = builder.keepAliveWithoutCalls;
+    this.idleTimeoutMs = builder.idleTimeoutMs;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public long getConnectTimeoutMs() {
+    return connectTimeoutMs;
+  }
+
+  public long getKeepAliveTimeMs() {
+    return keepAliveTimeMs;
+  }
+
+  public long getKeepAliveTimeoutMs() {
+    return keepAliveTimeoutMs;
+  }
+
+  public boolean isKeepAliveWithoutCalls() {
+    return keepAliveWithoutCalls;
+  }
+
+  public long getIdleTimeoutMs() {
+    return idleTimeoutMs;
+  }
+
+  /** Builder for <code>ConnectParam</code> */
+  public static class Builder {
+    private String host = "localhost";
+    private int port = 19530;
+    private long connectTimeoutMs = 10000;
+    private long keepAliveTimeMs = Long.MAX_VALUE; // Disabling keepalive
+    private long keepAliveTimeoutMs = 20000;
+    private boolean keepAliveWithoutCalls = false;
+    private long idleTimeoutMs = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS);
+
+    private Builder(){}
+
+    public static Builder newBuilder(){
+      return new Builder();
+    }
+
+    public Builder withHost(@Nonnull String host) {
+      this.host = host;
+      return this;
+    }
+
+    public Builder withPort(int port) throws IllegalArgumentException {
+      if (port < 0 || port > 0xFFFF) {
+        throw new IllegalArgumentException("Port is out of range!");
+      }
+      this.port = port;
+      return this;
+    }
+
+    public Builder withConnectTimeout(long connectTimeout, @Nonnull TimeUnit timeUnit)
+        throws IllegalArgumentException {
+      if (connectTimeout <= 0L) {
+        throw new IllegalArgumentException("Connect timeout must be positive!");
+      }
+      this.connectTimeoutMs = timeUnit.toMillis(connectTimeout);
+      return this;
+    }
+
+    public Builder withKeepAliveTime(long keepAliveTime, @Nonnull TimeUnit timeUnit)
+        throws IllegalArgumentException {
+      if (keepAliveTime <= 0L) {
+        throw new IllegalArgumentException("Keepalive time must be positive!");
+      }
+      this.keepAliveTimeMs = timeUnit.toMillis(keepAliveTime);
+      return this;
+    }
+
+    public Builder withKeepAliveTimeout(long keepAliveTimeout, @Nonnull TimeUnit timeUnit)
+        throws IllegalArgumentException {
+      if (keepAliveTimeout <= 0L) {
+        throw new IllegalArgumentException("Keepalive timeout must be positive!");
+      }
+      this.keepAliveTimeoutMs = timeUnit.toNanos(keepAliveTimeout);
+      return this;
+    }
+
+
+    public Builder keepAliveWithoutCalls(boolean enable) {
+      keepAliveWithoutCalls = enable;
+      return this;
+    }
+
+
+    public Builder withIdleTimeout(long idleTimeout, @Nonnull TimeUnit timeUnit)
+        throws IllegalArgumentException {
+      if (idleTimeout <= 0L) {
+        throw new IllegalArgumentException("Idle timeout must be positive!");
+      }
+      this.idleTimeoutMs = timeUnit.toMillis(idleTimeout);
+      return this;
+    }
+
+    public ConnectParam build() {
+      return new ConnectParam(this);
+    }
+  }
+}

+ 62 - 0
src/main/java/io/milvus/param/R.java

@@ -0,0 +1,62 @@
+package io.milvus.param;
+
+import io.milvus.grpc.ErrorCode;
+
+public class R<T> {
+    private Exception exception;
+    private Integer status;
+    private T data;
+
+    public Exception getException() {
+        return exception;
+    }
+
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+
+    public Integer getStatus() {
+        return status;
+    }
+
+    public void setStatus(Integer status) {
+        this.status = status;
+    }
+
+    public T getData() {
+        return data;
+    }
+
+    public void setData(T data) {
+        this.data = data;
+    }
+
+    public static <T> R<T> failed(Exception exception){
+        R<T> r = new R<>();
+        r.setStatus(-1);
+        r.setException(exception);
+        return r;
+    }
+
+    public static <T> R<T> failed(ErrorCode errorCode){
+        R<T> r = new R<>();
+        r.setStatus(errorCode.ordinal());
+        r.setException(new Exception(errorCode.name()));
+        return r;
+    }
+
+    public static <T> R<T> success(){
+        R<T> r = new R<>();
+        r.setStatus(0);
+        return r;
+    }
+
+
+    public static <T> R<T> success(T data){
+        R<T> r = new R<>();
+        r.setStatus(0);
+        r.setData(data);
+        return r;
+    }
+
+}

+ 1 - 1
src/main/proto/milvus.proto

@@ -8,7 +8,7 @@ option java_package = "io.milvus.grpc";
 option java_outer_classname = "MilvusProto";
 option java_generate_equals_and_hash = true;
 
-package milvus.grpc;
+package milvus.proto.milvus;
 
 service MilvusService {
     rpc CreateCollection(CreateCollectionRequest) returns (common.Status) {}