|
@@ -8,6 +8,7 @@ import LruCache from 'lru-cache';
|
|
import { HTTP_STATUS_CODE } from '../utils/Const';
|
|
import { HTTP_STATUS_CODE } from '../utils/Const';
|
|
import { DEFAULT_MILVUS_PORT } from '../utils';
|
|
import { DEFAULT_MILVUS_PORT } from '../utils';
|
|
import { connectivityState } from '@grpc/grpc-js';
|
|
import { connectivityState } from '@grpc/grpc-js';
|
|
|
|
+import { DatabasesService } from '../database/databases.service';
|
|
|
|
|
|
export class MilvusService {
|
|
export class MilvusService {
|
|
// Share with all instances, so activeAddress is static
|
|
// Share with all instances, so activeAddress is static
|
|
@@ -44,39 +45,63 @@ export class MilvusService {
|
|
address: string;
|
|
address: string;
|
|
username?: string;
|
|
username?: string;
|
|
password?: string;
|
|
password?: string;
|
|
|
|
+ database?: string;
|
|
},
|
|
},
|
|
cache: LruCache<any, any>
|
|
cache: LruCache<any, any>
|
|
) {
|
|
) {
|
|
- const { address, username, password } = data;
|
|
|
|
- // grpc only need address without http
|
|
|
|
|
|
+ // Destructure the data object to get the connection details
|
|
|
|
+ const { address, username, password, database } = data;
|
|
|
|
+
|
|
|
|
+ // Format the address to remove the http prefix
|
|
const milvusAddress = MilvusService.formatAddress(address);
|
|
const milvusAddress = MilvusService.formatAddress(address);
|
|
|
|
|
|
try {
|
|
try {
|
|
|
|
+ // Create a new Milvus client with the provided connection details
|
|
const milvusClient: MilvusClient = new MilvusClient({
|
|
const milvusClient: MilvusClient = new MilvusClient({
|
|
address: milvusAddress,
|
|
address: milvusAddress,
|
|
username,
|
|
username,
|
|
password,
|
|
password,
|
|
});
|
|
});
|
|
|
|
|
|
- // don't break attu
|
|
|
|
- await milvusClient.connectPromise.catch(error => {
|
|
|
|
- throw HttpErrors(HTTP_STATUS_CODE.FORBIDDEN, error);
|
|
|
|
- });
|
|
|
|
|
|
+ // Set the active Milvus client to the newly created client
|
|
|
|
+ MilvusService.activeMilvusClient = milvusClient;
|
|
|
|
|
|
- // check healthy
|
|
|
|
|
|
+ try {
|
|
|
|
+ // Attempt to connect to the Milvus server
|
|
|
|
+ await milvusClient.connectPromise;
|
|
|
|
+ } catch (error) {
|
|
|
|
+ // If the connection fails, clear the cache and throw an error
|
|
|
|
+ cache.dump();
|
|
|
|
+ throw new Error('Failed to connect to Milvus: ' + error);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Check the health of the Milvus server
|
|
const res = await milvusClient.checkHealth();
|
|
const res = await milvusClient.checkHealth();
|
|
|
|
|
|
- if (res.isHealthy) {
|
|
|
|
- MilvusService.activeAddress = address;
|
|
|
|
- cache.set(address, milvusClient);
|
|
|
|
- return { address };
|
|
|
|
- } else {
|
|
|
|
|
|
+ // If the server is not healthy, throw an error
|
|
|
|
+ if (!res.isHealthy) {
|
|
throw new Error('Milvus is not ready yet.');
|
|
throw new Error('Milvus is not ready yet.');
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // If the server is healthy, set the active address and add the client to the cache
|
|
|
|
+ MilvusService.activeAddress = address;
|
|
|
|
+ cache.set(address, milvusClient);
|
|
|
|
+
|
|
|
|
+ // Create a new database service and check if the specified database exists
|
|
|
|
+ const databaseService = new DatabasesService(this);
|
|
|
|
+ const hasDatabase = await databaseService.hasDatabase(database);
|
|
|
|
+
|
|
|
|
+ // if database exists, use this db
|
|
|
|
+ if (hasDatabase) {
|
|
|
|
+ await databaseService.use(database);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Return the address and the database (if it exists, otherwise return 'default')
|
|
|
|
+ return { address, database: hasDatabase ? database : 'default' };
|
|
} catch (error) {
|
|
} catch (error) {
|
|
- // if milvus is not working, delete connection.
|
|
|
|
|
|
+ // If any error occurs, clear the cache and throw the error
|
|
cache.dump();
|
|
cache.dump();
|
|
- throw HttpErrors(HTTP_STATUS_CODE.FORBIDDEN, error);
|
|
|
|
|
|
+ throw error;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|