|
- import axios from 'axios';
/**
 * CPU and memory time series for a single Milvus pod, as assembled by
 * `PrometheusService.reconstructNodeData`.
 */
interface IPrometheusNode {
  /** Pod label taken from the CPU series (`metric.pod`). */
  pod: string;
  /** `'coord'` when the container label contains "coord", otherwise `'node'`. */
  type: string;
  /**
   * Per-step CPU usage: the increase of the process CPU-seconds counter
   * divided by the step length in seconds. `-1` / `-2` mark steps before the
   * first / after the last returned sample.
   */
  cpu: number[];
  /**
   * Resident memory samples for the same pod, padded with the same
   * `-1` / `-2` markers as `cpu`.
   */
  memory: number[];
}
/**
 * Everything `PrometheusService.getMilvusHealthyData` returns for one time
 * window. All numeric arrays are per-step series; `-1` and `-2` are padding
 * markers for steps that have no sample before / after the returned data.
 */
interface IPrometheusAllData {
  // --- proxy level metrics -------------------------------------------------
  /** Sampled values of the insert-vectors counter. */
  totalVectorsCount: number[];
  /** Per-step increase of the search-vectors counter. */
  searchVectorsCount: number[];
  /** Optional; not populated by `getMilvusHealthyData` in this file. */
  searchFailedVectorsCount?: number[];
  /** 0.99 quantile computed from the search/query latency histogram. */
  sqLatency: number[];

  // --- third-party dependencies (success / total operations per step) ------
  meta: number[];
  msgstream: number[];
  objstorage: number[];

  // --- Milvus components, one entry per pod --------------------------------
  rootNodes: IPrometheusNode[];
  queryNodes: IPrometheusNode[];
  indexNodes: IPrometheusNode[];
  dataNodes: IPrometheusNode[];
}
// Prometheus metric names queried by PrometheusService.

// Proxy counters and the search/query latency histogram.
const totalVectorsCountMetric = 'milvus_proxy_insert_vectors_count';
const searchVectorsCountMetric = 'milvus_proxy_search_vectors_count';
const sqLatencyMetric = 'milvus_proxy_sq_latency_bucket';

// Operation counters of the third-party dependencies; queried grouped by
// their `status` label to derive a success ratio.
const metaMetric = 'milvus_meta_op_count';
const msgstreamMetric = 'milvus_msgstream_op_count';
const objstorageMetric = 'milvus_storage_op_count';

// Per-process resource metrics used for the per-pod CPU / memory series.
const cpuMetric = 'process_cpu_seconds_total';
const memoryMetric = 'process_resident_memory_bytes';
/** One `[unixSeconds, value]` pair of a Prometheus range-query series. */
type PrometheusSample = [number, string];

/** One labelled series of a Prometheus range-query (matrix) result. */
interface PrometheusSeries {
  metric: Record<string, string>;
  values: PrometheusSample[];
}

/**
 * Client for the Prometheus HTTP API that turns Milvus metrics into
 * fixed-step series.
 *
 * Connection settings are stored in static fields, so they are shared by all
 * instances. Conventions used by every series returned from this class:
 * - `start` / `end` / `step` are in milliseconds;
 * - `-1` marks steps before the first returned sample, `-2` marks steps after
 *   the last returned sample;
 * - a failed or empty query never throws: it yields a placeholder series.
 */
export class PrometheusService {
  static address: string = '';
  static instance: string = '';
  static namespace: string = '';
  static isReady: boolean = false;

  /** Base URL of the configured Prometheus server. */
  static get url() {
    return `http://${PrometheusService.address}`;
  }

  /** PromQL label selector restricting queries to the configured instance/namespace. */
  static get selector() {
    return (
      '{' +
      `app_kubernetes_io_instance="${PrometheusService.instance}"` +
      `,namespace="${PrometheusService.namespace}"` +
      '}'
    );
  }

  constructor() {
    // todo
  }

  /** Number of whole steps between `start` and `end`; never negative or NaN. */
  private static slotCount(start: number, end: number, step: number): number {
    const slots = Math.floor((end - start) / step);
    return Number.isFinite(slots) && slots > 0 ? slots : 0;
  }

  /** Array of `count` copies of `marker`; non-positive or invalid counts give `[]`. */
  private static markers(count: number, marker: number): number[] {
    const size = Number.isFinite(count) && count > 0 ? Math.floor(count) : 0;
    return new Array<number>(size).fill(marker);
  }

  /** Differences between consecutive values (one element shorter than the input). */
  private static deltas(totals: number[]): number[] {
    return totals.slice(1).map((value, i) => value - totals[i]);
  }

  /** Series list of a `queryRange` response, or `[]` when the query failed. */
  private static seriesOf(response: any): PrometheusSeries[] {
    const series = response?.data?.result;
    return Array.isArray(series) ? series : [];
  }

  /**
   * Surrounds `series` with `-1` markers for the steps between `start` and the
   * first sample and `-2` markers for the steps between the last sample and
   * `end`. Returns a new array; `series` is not modified.
   */
  private static padToRange(
    series: number[],
    samples: PrometheusSample[],
    start: number,
    end: number,
    step: number
  ): number[] {
    if (samples.length === 0) return [...series];
    // Prometheus timestamps are in seconds, the requested range in milliseconds.
    const firstMs = samples[0][0] * 1000;
    const lastMs = samples[samples.length - 1][0] * 1000;
    return [
      ...PrometheusService.markers(Math.floor((firstMs - start) / step), -1),
      ...series,
      ...PrometheusService.markers(Math.floor((end - lastMs) / step), -2),
    ];
  }

  /**
   * Probes the given Prometheus address and, only when it is ready, stores the
   * connection settings in the static fields.
   *
   * @returns `{ isReady }` — whether the readiness probe succeeded.
   */
  async setPrometheus({
    prometheusAddress,
    prometheusInstance,
    prometheusNamespace,
  }: {
    prometheusAddress: string;
    prometheusInstance: string;
    prometheusNamespace: string;
  }) {
    PrometheusService.isReady = await this.checkPrometheus(prometheusAddress);
    if (PrometheusService.isReady) {
      PrometheusService.address = prometheusAddress;
      PrometheusService.instance = prometheusInstance;
      PrometheusService.namespace = prometheusNamespace;
    }
    return {
      isReady: PrometheusService.isReady,
    };
  }

  /**
   * Calls `http://<address>/-/ready`.
   *
   * @returns `true` only for an HTTP 200 answer; any request error yields `false`.
   */
  async checkPrometheus(prometheusAddress: string): Promise<boolean> {
    try {
      const res = await axios.get(`http://${prometheusAddress}/-/ready`);
      return res?.status === 200;
    } catch {
      // Unreachable server / non-2xx answer: report "not ready" instead of throwing.
      return false;
    }
  }

  /**
   * Runs a PromQL range query.
   *
   * @param expr  Raw (unencoded) PromQL expression.
   * @param start Range start, epoch milliseconds.
   * @param end   Range end, epoch milliseconds.
   * @param step  Resolution in milliseconds.
   * @returns The response body, or `{ status: 'failed' }` when the request fails.
   */
  async queryRange(
    expr: string,
    start: number,
    end: number,
    step: number
  ): Promise<any> {
    const url =
      PrometheusService.url +
      '/api/v1/query_range?query=' +
      // PromQL contains `{`, `"`, spaces, `+`, ... which must not reach the
      // query string unescaped.
      encodeURIComponent(expr) +
      `&start=${new Date(+start).toISOString()}` +
      `&end=${new Date(+end).toISOString()}` +
      `&step=${step / 1000}s`;
    try {
      const res = await axios.get(url);
      return res.data;
    } catch {
      return { status: 'failed' };
    }
  }

  /**
   * Sampled values of `metric` (first returned series only), without the
   * sample at the very start of the data, padded to the requested range.
   *
   * @returns Zeros for every step when the query fails or returns no data.
   */
  async getVectorsCount(
    metric: string,
    start: number,
    end: number,
    step: number
  ): Promise<number[]> {
    const expr = `${metric}${PrometheusService.selector}`;
    const [first] = PrometheusService.seriesOf(
      await this.queryRange(expr, start, end, step)
    );
    if (!first || first.values.length === 0) {
      return PrometheusService.markers(
        PrometheusService.slotCount(start, end, step),
        0
      );
    }
    const counts = first.values.map(sample => Number(sample[1])).slice(1);
    return PrometheusService.padToRange(counts, first.values, start, end, step);
  }

  /** `getVectorsCount` for the insert-vectors counter. */
  getInsertVectorsCount = (start: number, end: number, step: number) =>
    this.getVectorsCount(totalVectorsCountMetric, start, end, step);

  /**
   * Per-step increase of the search-vectors counter (first returned series
   * only), padded to the requested range.
   *
   * @returns Zeros for every step when the query fails or returns no data.
   */
  async getSearchVectorsCount(
    start: number,
    end: number,
    step: number
  ): Promise<number[]> {
    const expr = `${searchVectorsCountMetric}${PrometheusService.selector}`;
    const [first] = PrometheusService.seriesOf(
      await this.queryRange(expr, start, end, step)
    );
    if (!first || first.values.length === 0) {
      return PrometheusService.markers(
        PrometheusService.slotCount(start, end, step),
        0
      );
    }
    const totals = first.values.map(sample => Number(sample[1]));
    return PrometheusService.padToRange(
      PrometheusService.deltas(totals),
      first.values,
      start,
      end,
      step
    );
  }

  /**
   * 0.99 quantile of the search/query latency histogram, computed over a rate
   * window of one step. Unlike the counter series, every returned sample is
   * kept (the first one is not dropped), and non-numeric values become `0`.
   *
   * @returns Zeros for every step when the query fails or returns no data.
   */
  async getSQLatency(
    start: number,
    end: number,
    step: number
  ): Promise<number[]> {
    const expr =
      `histogram_quantile(0.99, sum by (le, pod, node_id)` +
      `(rate(${sqLatencyMetric}${PrometheusService.selector}[${
        step / 1000
      }s])))`;
    const [first] = PrometheusService.seriesOf(
      await this.queryRange(expr, start, end, step)
    );
    if (!first || first.values.length === 0) {
      return PrometheusService.markers(
        PrometheusService.slotCount(start, end, step),
        0
      );
    }
    const latencies = first.values.map(sample => {
      const value = Number(sample[1]);
      return Number.isNaN(value) ? 0 : value;
    });
    return PrometheusService.padToRange(
      latencies,
      first.values,
      start,
      end,
      step
    );
  }

  /**
   * Success ratio per step for an operation counter that carries a `status`
   * label with the values `total` and `success`.
   *
   * A step without any operation counts as healthy (`1`). Missing leading
   * steps are filled with `-1`; if a series is absent altogether (or the
   * query failed) the whole range is `-1`. A missing `success` sample is
   * treated as zero successful operations. The two series are paired by
   * index, i.e. they are assumed to share the same timestamps.
   */
  async getThirdPartyServiceHealthStatus(
    metricName: string,
    start: number,
    end: number,
    step: number
  ): Promise<number[]> {
    const expr = `sum by (status) (${metricName}${PrometheusService.selector})`;
    const series = PrometheusService.seriesOf(
      await this.queryRange(expr, start, end, step)
    );
    const cumulative = (status: string): number[] =>
      (series.find(s => s.metric?.status === status)?.values ?? []).map(
        sample => Number(sample[1])
      );

    const totalSlices = PrometheusService.deltas(cumulative('total'));
    const successSlices = PrometheusService.deltas(cumulative('success'));
    const ratios = totalSlices.map((total, i) =>
      total === 0 ? 1 : (successSlices[i] ?? 0) / total
    );

    const slots = PrometheusService.slotCount(start, end, step);
    return [
      ...PrometheusService.markers(slots - ratios.length, -1),
      ...ratios,
    ];
  }

  /** Raw CPU-seconds series of all selected processes (`[]` on failure). */
  async getInternalNodesCPUData(start: number, end: number, step: number) {
    const expr = `${cpuMetric}${PrometheusService.selector}`;
    return PrometheusService.seriesOf(
      await this.queryRange(expr, start, end, step)
    );
  }

  /** Raw resident-memory series of all selected processes (`[]` on failure). */
  async getInternalNodesMemoryData(start: number, end: number, step: number) {
    const expr = `${memoryMetric}${PrometheusService.selector}`;
    return PrometheusService.seriesOf(
      await this.queryRange(expr, start, end, step)
    );
  }

  /**
   * Builds one `IPrometheusNode` per CPU series whose `container` label
   * contains `type`, joined with the memory series of the same pod.
   *
   * @param cpuNodesData    Series list from `getInternalNodesCPUData`.
   * @param memoryNodesData Series list from `getInternalNodesMemoryData`.
   * @param type            Substring to look for in the `container` label.
   * @returns Nodes in the order of the CPU series. Series without a
   *          `container` label are skipped; a pod without a memory series
   *          gets `-1` for every step of `memory`.
   */
  reconstructNodeData(
    cpuNodesData: any,
    memoryNodesData: any,
    type: string,
    start: number,
    end: number,
    step: number
  ): IPrometheusNode[] {
    const ofType = (series: PrometheusSeries): boolean =>
      series?.metric?.container?.includes(type) === true;
    const cpuNodes: PrometheusSeries[] = (
      Array.isArray(cpuNodesData) ? cpuNodesData : []
    ).filter(ofType);
    const memoryNodes: PrometheusSeries[] = (
      Array.isArray(memoryNodesData) ? memoryNodesData : []
    ).filter(ofType);
    const stepSeconds = step / 1000;

    return cpuNodes.map((series): IPrometheusNode => {
      const pod = series.metric.pod;

      // CPU is a cumulative counter: usage = increase per step / step length.
      const cpuSecondsTotal = series.values.map(sample => Number(sample[1]));
      const cpu = PrometheusService.padToRange(
        PrometheusService.deltas(cpuSecondsTotal).map(d => d / stepSeconds),
        series.values,
        start,
        end,
        step
      );

      const memorySeries = memoryNodes.find(
        candidate => candidate.metric.pod === pod
      );
      const memory = memorySeries
        ? // The first element is dropped AFTER padding, so with leading gaps
          // one `-1` marker is removed, otherwise the first sample is. Either
          // way the result has the same length as `cpu` for equal timestamps.
          PrometheusService.padToRange(
            memorySeries.values.map(sample => Number(sample[1])),
            memorySeries.values,
            start,
            end,
            step
          ).slice(1)
        : PrometheusService.markers(
            PrometheusService.slotCount(start, end, step),
            -1
          );

      return {
        type: series.metric.container.includes('coord') ? 'coord' : 'node',
        pod,
        cpu,
        memory,
      };
    });
  }

  /**
   * CPU / memory series of the root, query, index and data components.
   * The two underlying queries are independent and run concurrently.
   */
  async getInternalNodesData(start: number, end: number, step: number) {
    const [cpuNodes, memoryNodes] = await Promise.all([
      this.getInternalNodesCPUData(start, end, step),
      this.getInternalNodesMemoryData(start, end, step),
    ]);
    const [rootNodes, queryNodes, indexNodes, dataNodes] = [
      'root',
      'query',
      'index',
      'data',
    ].map((component: string) =>
      this.reconstructNodeData(
        cpuNodes,
        memoryNodes,
        component,
        start,
        end,
        step
      )
    );
    return { rootNodes, queryNodes, indexNodes, dataNodes };
  }

  /**
   * Collects every health series for the given window. All queries are
   * independent of each other and are issued concurrently.
   */
  async getMilvusHealthyData({
    start,
    end,
    step,
  }: {
    start: number;
    end: number;
    step: number;
  }): Promise<IPrometheusAllData> {
    const [
      meta,
      msgstream,
      objstorage,
      totalVectorsCount,
      searchVectorsCount,
      sqLatency,
      nodes,
    ] = await Promise.all([
      this.getThirdPartyServiceHealthStatus(metaMetric, start, end, step),
      this.getThirdPartyServiceHealthStatus(msgstreamMetric, start, end, step),
      this.getThirdPartyServiceHealthStatus(objstorageMetric, start, end, step),
      this.getInsertVectorsCount(start, end, step),
      this.getSearchVectorsCount(start, end, step),
      this.getSQLatency(start, end, step),
      this.getInternalNodesData(start, end, step),
    ]);
    return {
      totalVectorsCount,
      searchVectorsCount,
      sqLatency,
      meta,
      msgstream,
      objstorage,
      ...nodes,
    };
  }
}
|