Browse Source

Add os.allocated_processors

Current processors setting is not reflected in nodes info API
("os.available_processors"). Add os.allocated_processors to shows
actual number of processors that we are using.
xuzha 10 years ago
parent
commit
fb1d8bb149

+ 11 - 0
core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java

@@ -301,6 +301,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
     public static class OsStats implements ToXContent, Streamable {
 
         int availableProcessors;
+        int allocatedProcessors;
         long availableMemory;
         final ObjectIntHashMap<String> names;
 
@@ -310,6 +311,8 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
 
         public void addNodeInfo(NodeInfo nodeInfo) {
             availableProcessors += nodeInfo.getOs().getAvailableProcessors();
+            allocatedProcessors += nodeInfo.getOs().getAllocatedProcessors();
+
             if (nodeInfo.getOs().getName() != null) {
                 names.addTo(nodeInfo.getOs().getName(), 1);
             }
@@ -319,6 +322,10 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
             return availableProcessors;
         }
 
+        public int getAllocatedProcessors() {
+            return allocatedProcessors;
+        }
+
         public ByteSizeValue getAvailableMemory() {
             return new ByteSizeValue(availableMemory);
         }
@@ -326,6 +333,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
         @Override
         public void readFrom(StreamInput in) throws IOException {
             availableProcessors = in.readVInt();
+            allocatedProcessors = in.readVInt();
             availableMemory = in.readLong();
             int size = in.readVInt();
             names.clear();
@@ -337,6 +345,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeVInt(availableProcessors);
+            out.writeVInt(allocatedProcessors);
             out.writeLong(availableMemory);
             out.writeVInt(names.size());
             for (ObjectIntCursor<String> name : names) {
@@ -353,6 +362,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
 
         static final class Fields {
             static final XContentBuilderString AVAILABLE_PROCESSORS = new XContentBuilderString("available_processors");
+            static final XContentBuilderString ALLOCATED_PROCESSORS = new XContentBuilderString("allocated_processors");
             static final XContentBuilderString NAME = new XContentBuilderString("name");
             static final XContentBuilderString NAMES = new XContentBuilderString("names");
             static final XContentBuilderString MEM = new XContentBuilderString("mem");
@@ -364,6 +374,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.field(Fields.AVAILABLE_PROCESSORS, availableProcessors);
+            builder.field(Fields.ALLOCATED_PROCESSORS, allocatedProcessors);
             builder.startObject(Fields.MEM);
             builder.byteSizeField(Fields.TOTAL_IN_BYTES, Fields.TOTAL, availableMemory);
             builder.endObject();

+ 10 - 0
core/src/main/java/org/elasticsearch/monitor/os/OsInfo.java

@@ -34,6 +34,8 @@ public class OsInfo implements Streamable, ToXContent {
 
     int availableProcessors;
 
+    int allocatedProcessors;
+
     String name = null;
     String arch = null;
     String version = null;
@@ -49,6 +51,10 @@ public class OsInfo implements Streamable, ToXContent {
         return this.availableProcessors;
     }
 
+    public int getAllocatedProcessors() {
+        return this.allocatedProcessors;
+    }
+
     public String getName() {
         return name;
     }
@@ -69,6 +75,7 @@ public class OsInfo implements Streamable, ToXContent {
         static final XContentBuilderString REFRESH_INTERVAL = new XContentBuilderString("refresh_interval");
         static final XContentBuilderString REFRESH_INTERVAL_IN_MILLIS = new XContentBuilderString("refresh_interval_in_millis");
         static final XContentBuilderString AVAILABLE_PROCESSORS = new XContentBuilderString("available_processors");
+        static final XContentBuilderString ALLOCATED_PROCESSORS = new XContentBuilderString("allocated_processors");
     }
 
     @Override
@@ -85,6 +92,7 @@ public class OsInfo implements Streamable, ToXContent {
             builder.field(Fields.VERSION, version);
         }
         builder.field(Fields.AVAILABLE_PROCESSORS, availableProcessors);
+        builder.field(Fields.ALLOCATED_PROCESSORS, allocatedProcessors);
         builder.endObject();
         return builder;
     }
@@ -99,11 +107,13 @@ public class OsInfo implements Streamable, ToXContent {
     public void readFrom(StreamInput in) throws IOException {
         refreshInterval = in.readLong();
         availableProcessors = in.readInt();
+        allocatedProcessors = in.readInt();
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeLong(refreshInterval);
         out.writeInt(availableProcessors);
+        out.writeInt(allocatedProcessors);
     }
 }

+ 3 - 0
core/src/main/java/org/elasticsearch/monitor/os/OsService.java

@@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.SingleObjectCache;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 
 /**
  *
@@ -45,6 +46,8 @@ public class OsService extends AbstractComponent {
 
         this.info = probe.osInfo();
         this.info.refreshInterval = refreshInterval.millis();
+        this.info.allocatedProcessors = EsExecutors.boundedNumberOfProcessors(settings);
+
         osStatsCache = new OsStatsCache(refreshInterval, probe.osStats());
         logger.debug("Using probe [{}] with refresh_interval [{}]", probe, refreshInterval);
     }

+ 14 - 1
core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@@ -35,6 +36,7 @@ import java.io.IOException;
 
 import static org.elasticsearch.common.settings.Settings.settingsBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 
 @ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0)
@@ -86,7 +88,6 @@ public class ClusterStatsIT extends ESIntegTestCase {
         ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
         assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
 
-
         prepareCreate("test1").setSettings("number_of_shards", 2, "number_of_replicas", 1).get();
         ensureYellow();
         response = client().admin().cluster().prepareClusterStats().get();
@@ -157,4 +158,16 @@ public class ClusterStatsIT extends ESIntegTestCase {
         assertThat(msg, response.nodesStats.getProcess().getMaxOpenFileDescriptors(), Matchers.greaterThanOrEqualTo(-1L));
 
     }
+
+    public void testAllocatedProcessors() throws Exception {
+        // stop all other nodes
+        internalCluster().ensureAtMostNumDataNodes(0);
+
+        // start one node with 7 processors.
+        internalCluster().startNodesAsync(Settings.builder().put(EsExecutors.PROCESSORS, 7).build()).get();
+        waitForNodes(1);
+
+        ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
+        assertThat(response.getNodesStats().getOs().getAllocatedProcessors(), equalTo(7));
+    }
 }

+ 35 - 2
core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java

@@ -22,6 +22,8 @@ package org.elasticsearch.nodesinfo;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
@@ -29,8 +31,7 @@ import org.elasticsearch.test.ESIntegTestCase.Scope;
 import java.util.List;
 
 import static org.elasticsearch.client.Requests.nodesInfoRequest;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.*;
 
 /**
  *
@@ -81,4 +82,36 @@ public class SimpleNodesInfoIT extends ESIntegTestCase {
         assertThat(response.getNodes().length, is(1));
         assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
     }
+
+    public void testAllocatedProcessors() throws Exception {
+        List<String> nodesIds = internalCluster().
+                startNodesAsync(
+                        Settings.builder().put(EsExecutors.PROCESSORS, 3).build(),
+                        Settings.builder().put(EsExecutors.PROCESSORS, 6).build()
+                ).get();
+
+        final String node_1 = nodesIds.get(0);
+        final String node_2 = nodesIds.get(1);
+
+        ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").get();
+        logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
+
+        String server1NodeId = internalCluster().getInstance(ClusterService.class, node_1).state().nodes().localNodeId();
+        String server2NodeId = internalCluster().getInstance(ClusterService.class, node_2).state().nodes().localNodeId();
+        logger.info("--> started nodes: " + server1NodeId + " and " + server2NodeId);
+
+        NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().execute().actionGet();
+
+        assertThat(response.getNodes().length, is(2));
+        assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
+        assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
+
+        assertThat(response.getNodesMap().get(server1NodeId).getOs().getAvailableProcessors(),
+                equalTo(Runtime.getRuntime().availableProcessors()));
+        assertThat(response.getNodesMap().get(server2NodeId).getOs().getAvailableProcessors(),
+                equalTo(Runtime.getRuntime().availableProcessors()));
+
+        assertThat(response.getNodesMap().get(server1NodeId).getOs().getAllocatedProcessors(), equalTo(3));
+        assertThat(response.getNodesMap().get(server2NodeId).getOs().getAllocatedProcessors(), equalTo(6));
+    }
 }

+ 5 - 0
docs/reference/cluster/nodes-info.asciidoc

@@ -54,6 +54,11 @@ the operating system:
 `os.available_processors`::
 	Number of processors available to the Java virtual machine
 
+`os.allocated_processors`::
+    The number of processors actually used to calculate thread pool size. This number can be set
+    with the `processors` setting of a node and defaults to the number of processors reported by the OS.
+    In both cases this number will never be larger than 32.
+
 [float]
 [[process-info]]
 ==== Process information