Browse Source

Add I/O statistics on Linux

This commit adds a variety of real disk metrics for the block devices
that back Elasticsearch data paths. A collection of statistics are read
from /proc/diskstats and are used to report the raw metrics for
operations and read/write bytes.

Relates #15915
Jason Tedor 9 years ago
parent
commit
ecce53f0df

+ 25 - 4
core/src/main/java/org/elasticsearch/env/ESFileStore.java

@@ -32,6 +32,7 @@ import java.nio.file.Path;
 import java.nio.file.attribute.FileAttributeView;
 import java.nio.file.attribute.FileStoreAttributeView;
 import java.util.Arrays;
+import java.util.List;
 
 /** 
  * Implementation of FileStore that supports
@@ -44,6 +45,8 @@ class ESFileStore extends FileStore {
     final FileStore in;
     /** Cached result of Lucene's {@code IOUtils.spins} on path. */
     final Boolean spins;
+    int majorDeviceNumber;
+    int minorDeviceNumber;
     
     @SuppressForbidden(reason = "tries to determine if disk is spinning")
     // TODO: move PathUtils to be package-private here instead of 
@@ -58,6 +61,21 @@ class ESFileStore extends FileStore {
             } catch (Exception e) {
                 spins = null;
             }
+            try {
+                final List<String> lines = Files.readAllLines(PathUtils.get("/proc/self/mountinfo"));
+                for (final String line : lines) {
+                    final String[] fields = line.trim().split("\\s+");
+                    final String mountPoint = fields[4];
+                    if (mountPoint.equals(getMountPointLinux(in))) {
+                        final String[] deviceNumbers = fields[2].split(":");
+                        majorDeviceNumber = Integer.parseInt(deviceNumbers[0]);
+                        minorDeviceNumber = Integer.parseInt(deviceNumbers[1]);
+                    }
+                }
+            } catch (Exception e) {
+                majorDeviceNumber = -1;
+                minorDeviceNumber = -1;
+            }
         } else {
             spins = null;
         }
@@ -229,10 +247,13 @@ class ESFileStore extends FileStore {
 
     @Override
     public Object getAttribute(String attribute) throws IOException {
-        if ("lucene:spins".equals(attribute)) {
-            return spins;
-        } else {
-            return in.getAttribute(attribute);
+        switch(attribute) {
+            // for the device
+            case "lucene:spins": return spins;
+            // for the partition
+            case "lucene:major_device_number": return majorDeviceNumber;
+            case "lucene:minor_device_number": return minorDeviceNumber;
+            default: return in.getAttribute(attribute);
         }
     }
 

+ 7 - 1
core/src/main/java/org/elasticsearch/env/NodeEnvironment.java

@@ -64,7 +64,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
@@ -88,14 +87,21 @@ public final class NodeEnvironment extends AbstractComponent implements Closeabl
          *  not running on Linux, or we hit an exception trying), True means the device possibly spins and False means it does not. */
         public final Boolean spins;
 
+        public final int majorDeviceNumber;
+        public final int minorDeviceNumber;
+
         public NodePath(Path path) throws IOException {
             this.path = path;
             this.indicesPath = path.resolve(INDICES_FOLDER);
             this.fileStore = Environment.getFileStore(path);
             if (fileStore.supportsFileAttributeView("lucene")) {
                 this.spins = (Boolean) fileStore.getAttribute("lucene:spins");
+                this.majorDeviceNumber = (int) fileStore.getAttribute("lucene:major_device_number");
+                this.minorDeviceNumber = (int) fileStore.getAttribute("lucene:minor_device_number");
             } else {
                 this.spins = null;
+                this.majorDeviceNumber = -1;
+                this.minorDeviceNumber = -1;
             }
         }
 

+ 1 - 7
core/src/main/java/org/elasticsearch/monitor/MonitorService.java

@@ -31,19 +31,12 @@ import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 
-/**
- *
- */
 public class MonitorService extends AbstractLifecycleComponent<MonitorService> {
 
     private final JvmGcMonitorService jvmGcMonitorService;
-
     private final OsService osService;
-
     private final ProcessService processService;
-
     private final JvmService jvmService;
-
     private final FsService fsService;
 
     public MonitorService(Settings settings, NodeEnvironment nodeEnvironment, ThreadPool threadPool) throws IOException {
@@ -85,4 +78,5 @@ public class MonitorService extends AbstractLifecycleComponent<MonitorService> {
     protected void doClose() {
         jvmGcMonitorService.close();
     }
+
 }

+ 243 - 8
core/src/main/java/org/elasticsearch/monitor/fs/FsInfo.java

@@ -188,12 +188,234 @@ public class FsInfo implements Iterable<FsInfo.Path>, Writeable, ToXContent {
         }
     }
 
+    public static class DeviceStats implements Writeable, ToXContent {
+
+        final int majorDeviceNumber;
+        final int minorDeviceNumber;
+        final String deviceName;
+        final long currentReadsCompleted;
+        final long previousReadsCompleted;
+        final long currentSectorsRead;
+        final long previousSectorsRead;
+        final long currentWritesCompleted;
+        final long previousWritesCompleted;
+        final long currentSectorsWritten;
+        final long previousSectorsWritten;
+
+        public DeviceStats(
+                final int majorDeviceNumber,
+                final int minorDeviceNumber,
+                final String deviceName,
+                final long currentReadsCompleted,
+                final long currentSectorsRead,
+                final long currentWritesCompleted,
+                final long currentSectorsWritten,
+                final DeviceStats previousDeviceStats) {
+            this(
+                    majorDeviceNumber,
+                    minorDeviceNumber,
+                    deviceName,
+                    currentReadsCompleted,
+                    previousDeviceStats != null ? previousDeviceStats.currentReadsCompleted : -1,
+                    currentSectorsWritten,
+                    previousDeviceStats != null ? previousDeviceStats.currentSectorsWritten : -1,
+                    currentSectorsRead,
+                    previousDeviceStats != null ? previousDeviceStats.currentSectorsRead : -1,
+                    currentWritesCompleted,
+                    previousDeviceStats != null ? previousDeviceStats.currentWritesCompleted : -1);
+        }
+
+        private DeviceStats(
+                final int majorDeviceNumber,
+                final int minorDeviceNumber,
+                final String deviceName,
+                final long currentReadsCompleted,
+                final long previousReadsCompleted,
+                final long currentSectorsWritten,
+                final long previousSectorsWritten,
+                final long currentSectorsRead,
+                final long previousSectorsRead,
+                final long currentWritesCompleted,
+                final long previousWritesCompleted) {
+            this.majorDeviceNumber = majorDeviceNumber;
+            this.minorDeviceNumber = minorDeviceNumber;
+            this.deviceName = deviceName;
+            this.currentReadsCompleted = currentReadsCompleted;
+            this.previousReadsCompleted = previousReadsCompleted;
+            this.currentWritesCompleted = currentWritesCompleted;
+            this.previousWritesCompleted = previousWritesCompleted;
+            this.currentSectorsRead = currentSectorsRead;
+            this.previousSectorsRead = previousSectorsRead;
+            this.currentSectorsWritten = currentSectorsWritten;
+            this.previousSectorsWritten = previousSectorsWritten;
+        }
+
+        public DeviceStats(StreamInput in) throws IOException {
+            majorDeviceNumber = in.readVInt();
+            minorDeviceNumber = in.readVInt();
+            deviceName = in.readString();
+            currentReadsCompleted = in.readLong();
+            previousReadsCompleted = in.readLong();
+            currentWritesCompleted = in.readLong();
+            previousWritesCompleted = in.readLong();
+            currentSectorsRead = in.readLong();
+            previousSectorsRead = in.readLong();
+            currentSectorsWritten = in.readLong();
+            previousSectorsWritten = in.readLong();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeVInt(majorDeviceNumber);
+            out.writeVInt(minorDeviceNumber);
+            out.writeString(deviceName);
+            out.writeLong(currentReadsCompleted);
+            out.writeLong(previousReadsCompleted);
+            out.writeLong(currentWritesCompleted);
+            out.writeLong(previousWritesCompleted);
+            out.writeLong(currentSectorsRead);
+            out.writeLong(previousSectorsRead);
+            out.writeLong(currentSectorsWritten);
+            out.writeLong(previousSectorsWritten);
+        }
+
+        public long operations() {
+            if (previousReadsCompleted == -1 || previousWritesCompleted == -1) return -1;
+
+            return (currentReadsCompleted - previousReadsCompleted) +
+                (currentWritesCompleted - previousWritesCompleted);
+        }
+
+        public long readOperations() {
+            if (previousReadsCompleted == -1) return -1;
+
+            return (currentReadsCompleted - previousReadsCompleted);
+        }
+
+        public long writeOperations() {
+            if (previousWritesCompleted == -1) return -1;
+
+            return (currentWritesCompleted - previousWritesCompleted);
+        }
+
+        public long readKilobytes() {
+            if (previousSectorsRead == -1) return -1;
+
+            return (currentSectorsRead - previousSectorsRead) / 2;
+        }
+
+        public long writeKilobytes() {
+            if (previousSectorsWritten == -1) return -1;
+
+            return (currentSectorsWritten - previousSectorsWritten) / 2;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.field("device_name", deviceName);
+            builder.field(IoStats.OPERATIONS, operations());
+            builder.field(IoStats.READ_OPERATIONS, readOperations());
+            builder.field(IoStats.WRITE_OPERATIONS, writeOperations());
+            builder.field(IoStats.READ_KILOBYTES, readKilobytes());
+            builder.field(IoStats.WRITE_KILOBYTES, writeKilobytes());
+            return builder;
+        }
+
+    }
+
+    public static class IoStats implements Writeable, ToXContent {
+
+        private static final String OPERATIONS = "operations";
+        private static final String READ_OPERATIONS = "read_operations";
+        private static final String WRITE_OPERATIONS = "write_operations";
+        private static final String READ_KILOBYTES = "read_kilobytes";
+        private static final String WRITE_KILOBYTES = "write_kilobytes";
+
+        final DeviceStats[] devicesStats;
+        final long totalOperations;
+        final long totalReadOperations;
+        final long totalWriteOperations;
+        final long totalReadKilobytes;
+        final long totalWriteKilobytes;
+
+        public IoStats(final DeviceStats[] devicesStats) {
+            this.devicesStats = devicesStats;
+            long totalOperations = 0;
+            long totalReadOperations = 0;
+            long totalWriteOperations = 0;
+            long totalReadKilobytes = 0;
+            long totalWriteKilobytes = 0;
+            for (DeviceStats deviceStats : devicesStats) {
+                totalOperations += deviceStats.operations() != -1 ? deviceStats.operations() : 0;
+                totalReadOperations += deviceStats.readOperations() != -1 ? deviceStats.readOperations() : 0;
+                totalWriteOperations += deviceStats.writeOperations() != -1 ? deviceStats.writeOperations() : 0;
+                totalReadKilobytes += deviceStats.readKilobytes() != -1 ? deviceStats.readKilobytes() : 0;
+                totalWriteKilobytes += deviceStats.writeKilobytes() != -1 ? deviceStats.writeKilobytes() : 0;
+            }
+            this.totalOperations = totalOperations;
+            this.totalReadOperations = totalReadOperations;
+            this.totalWriteOperations = totalWriteOperations;
+            this.totalReadKilobytes = totalReadKilobytes;
+            this.totalWriteKilobytes = totalWriteKilobytes;
+        }
+
+        public IoStats(StreamInput in) throws IOException {
+            final int length = in.readVInt();
+            final DeviceStats[] devicesStats = new DeviceStats[length];
+            for (int i = 0; i < length; i++) {
+                devicesStats[i] = new DeviceStats(in);
+            }
+            this.devicesStats = devicesStats;
+            this.totalOperations = in.readLong();
+            this.totalReadOperations = in.readLong();
+            this.totalWriteOperations = in.readLong();
+            this.totalReadKilobytes = in.readLong();
+            this.totalWriteKilobytes = in.readLong();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeVInt(devicesStats.length);
+            for (int i = 0; i < devicesStats.length; i++) {
+                devicesStats[i].writeTo(out);
+            }
+            out.writeLong(totalOperations);
+            out.writeLong(totalReadOperations);
+            out.writeLong(totalWriteOperations);
+            out.writeLong(totalReadKilobytes);
+            out.writeLong(totalWriteKilobytes);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            if (devicesStats.length > 0) {
+                builder.startArray("devices");
+                for (DeviceStats deviceStats : devicesStats) {
+                    builder.startObject();
+                    deviceStats.toXContent(builder, params);
+                    builder.endObject();
+                }
+                builder.endArray();
+                builder.startObject("total");
+                builder.field(OPERATIONS, totalOperations);
+                builder.field(READ_OPERATIONS, totalReadOperations);
+                builder.field(WRITE_OPERATIONS, totalWriteOperations);
+                builder.field(READ_KILOBYTES, totalReadKilobytes);
+                builder.field(WRITE_KILOBYTES, totalWriteKilobytes);
+            }
+            return builder;
+        }
+
+    }
+
     final long timestamp;
     final Path[] paths;
+    final IoStats ioStats;
     Path total;
 
-    public FsInfo(long timestamp, Path[] paths) {
+    public FsInfo(long timestamp, IoStats ioStats, Path[] paths) {
         this.timestamp = timestamp;
+        this.ioStats = ioStats;
         this.paths = paths;
         this.total = null;
     }
@@ -203,6 +425,7 @@ public class FsInfo implements Iterable<FsInfo.Path>, Writeable, ToXContent {
      */
     public FsInfo(StreamInput in) throws IOException {
         timestamp = in.readVLong();
+        ioStats = in.readOptionalWriteable(IoStats::new);
         paths = new Path[in.readVInt()];
         for (int i = 0; i < paths.length; i++) {
             paths[i] = new Path(in);
@@ -212,6 +435,7 @@ public class FsInfo implements Iterable<FsInfo.Path>, Writeable, ToXContent {
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeVLong(timestamp);
+        out.writeOptionalWriteable(ioStats);
         out.writeVInt(paths.length);
         for (Path path : paths) {
             path.writeTo(out);
@@ -244,18 +468,15 @@ public class FsInfo implements Iterable<FsInfo.Path>, Writeable, ToXContent {
         return timestamp;
     }
 
+    public IoStats getIoStats() {
+        return ioStats;
+    }
+
     @Override
     public Iterator<Path> iterator() {
         return Arrays.stream(paths).iterator();
     }
 
-    static final class Fields {
-        static final String FS = "fs";
-        static final String TIMESTAMP = "timestamp";
-        static final String DATA = "data";
-        static final String TOTAL = "total";
-    }
-
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject(Fields.FS);
@@ -267,7 +488,21 @@ public class FsInfo implements Iterable<FsInfo.Path>, Writeable, ToXContent {
             path.toXContent(builder, params);
         }
         builder.endArray();
+        if (ioStats != null) {
+            builder.startObject(Fields.IO_STATS);
+            ioStats.toXContent(builder, params);
+            builder.endObject();
+        }
         builder.endObject();
         return builder;
     }
+
+    static final class Fields {
+        static final String FS = "fs";
+        static final String TIMESTAMP = "timestamp";
+        static final String DATA = "data";
+        static final String TOTAL = "total";
+        static final String IO_STATS = "io_stats";
+    }
+
 }

+ 79 - 3
core/src/main/java/org/elasticsearch/monitor/fs/FsProbe.java

@@ -19,12 +19,23 @@
 
 package org.elasticsearch.monitor.fs;
 
+import org.apache.lucene.util.Constants;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.NodeEnvironment.NodePath;
 
 import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class FsProbe extends AbstractComponent {
 
@@ -35,16 +46,80 @@ public class FsProbe extends AbstractComponent {
         this.nodeEnv = nodeEnv;
     }
 
-    public FsInfo stats() throws IOException {
+    public FsInfo stats(FsInfo previous) throws IOException {
         if (!nodeEnv.hasNodeFile()) {
-            return new FsInfo(System.currentTimeMillis(), new FsInfo.Path[0]);
+            return new FsInfo(System.currentTimeMillis(), null, new FsInfo.Path[0]);
         }
         NodePath[] dataLocations = nodeEnv.nodePaths();
         FsInfo.Path[] paths = new FsInfo.Path[dataLocations.length];
         for (int i = 0; i < dataLocations.length; i++) {
             paths[i] = getFSInfo(dataLocations[i]);
         }
-        return new FsInfo(System.currentTimeMillis(), paths);
+        FsInfo.IoStats ioStats = null;
+        if (Constants.LINUX) {
+            Set<Tuple<Integer, Integer>> devicesNumbers = new HashSet<>();
+            for (int i = 0; i < dataLocations.length; i++) {
+                if (dataLocations[i].majorDeviceNumber != -1 && dataLocations[i].minorDeviceNumber != -1) {
+                    devicesNumbers.add(Tuple.tuple(dataLocations[i].majorDeviceNumber, dataLocations[i].minorDeviceNumber));
+                }
+            }
+            ioStats = ioStats(devicesNumbers, previous);
+        }
+        return new FsInfo(System.currentTimeMillis(), ioStats, paths);
+    }
+
+    final FsInfo.IoStats ioStats(final Set<Tuple<Integer, Integer>> devicesNumbers, final FsInfo previous) {
+        try {
+            final Map<Tuple<Integer, Integer>, FsInfo.DeviceStats> deviceMap = new HashMap<>();
+            if (previous != null && previous.getIoStats() != null && previous.getIoStats().devicesStats != null) {
+                for (int i = 0; i < previous.getIoStats().devicesStats.length; i++) {
+                    FsInfo.DeviceStats deviceStats = previous.getIoStats().devicesStats[i];
+                    deviceMap.put(Tuple.tuple(deviceStats.majorDeviceNumber, deviceStats.minorDeviceNumber), deviceStats);
+                }
+            }
+
+            List<FsInfo.DeviceStats> devicesStats = new ArrayList<>();
+
+            List<String> lines = readProcDiskStats();
+            if (!lines.isEmpty()) {
+                for (String line : lines) {
+                    String fields[] = line.trim().split("\\s+");
+                    final int majorDeviceNumber = Integer.parseInt(fields[0]);
+                    final int minorDeviceNumber = Integer.parseInt(fields[1]);
+                    if (!devicesNumbers.contains(Tuple.tuple(majorDeviceNumber, minorDeviceNumber))) {
+                        continue;
+                    }
+                    final String deviceName = fields[2];
+                    final long readsCompleted = Long.parseLong(fields[3]);
+                    final long sectorsRead = Long.parseLong(fields[5]);
+                    final long writesCompleted = Long.parseLong(fields[7]);
+                    final long sectorsWritten = Long.parseLong(fields[9]);
+                    final FsInfo.DeviceStats deviceStats =
+                            new FsInfo.DeviceStats(
+                                    majorDeviceNumber,
+                                    minorDeviceNumber,
+                                    deviceName,
+                                    readsCompleted,
+                                    sectorsRead,
+                                    writesCompleted,
+                                    sectorsWritten,
+                                    deviceMap.get(Tuple.tuple(majorDeviceNumber, minorDeviceNumber)));
+                    devicesStats.add(deviceStats);
+                }
+            }
+
+            return new FsInfo.IoStats(devicesStats.toArray(new FsInfo.DeviceStats[devicesStats.size()]));
+        } catch (Exception e) {
+            // do not fail Elasticsearch if something unexpected
+            // happens here
+            logger.debug("unexpected exception processing /proc/diskstats for devices {}", e, devicesNumbers);
+            return null;
+        }
+    }
+
+    @SuppressForbidden(reason = "read /proc/diskstats")
+    List<String> readProcDiskStats() throws IOException {
+        return Files.readAllLines(PathUtils.get("/proc/diskstats"));
     }
 
     public static FsInfo.Path getFSInfo(NodePath nodePath) throws IOException {
@@ -62,4 +137,5 @@ public class FsProbe extends AbstractComponent {
         fsPath.spins = nodePath.spins;
         return fsPath;
     }
+
 }

+ 28 - 17
core/src/main/java/org/elasticsearch/monitor/fs/FsService.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.monitor.fs;
 
 import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
@@ -29,44 +30,54 @@ import org.elasticsearch.env.NodeEnvironment;
 
 import java.io.IOException;
 
-/**
- */
 public class FsService extends AbstractComponent {
 
     private final FsProbe probe;
-
-    private final SingleObjectCache<FsInfo> fsStatsCache;
+    private final TimeValue refreshInterval;
+    private final SingleObjectCache<FsInfo> cache;
 
     public final static Setting<TimeValue> REFRESH_INTERVAL_SETTING =
-        Setting.timeSetting("monitor.fs.refresh_interval", TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(1),
+        Setting.timeSetting(
+            "monitor.fs.refresh_interval",
+            TimeValue.timeValueSeconds(1),
+            TimeValue.timeValueSeconds(1),
             Property.NodeScope);
 
-    public FsService(Settings settings, NodeEnvironment nodeEnvironment) throws IOException {
+    public FsService(final Settings settings, final NodeEnvironment nodeEnvironment) {
         super(settings);
         this.probe = new FsProbe(settings, nodeEnvironment);
-        TimeValue refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
-        fsStatsCache = new FsInfoCache(refreshInterval, probe.stats());
+        refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
         logger.debug("using refresh_interval [{}]", refreshInterval);
+        cache = new FsInfoCache(refreshInterval, stats(probe, null, logger));
     }
 
     public FsInfo stats() {
-        return fsStatsCache.getOrRefresh();
+        return cache.getOrRefresh();
+    }
+
+    private static FsInfo stats(FsProbe probe, FsInfo initialValue, ESLogger logger) {
+        try {
+            return probe.stats(initialValue);
+        } catch (IOException e) {
+            logger.debug("unexpected exception reading filesystem info", e);
+            return null;
+        }
     }
 
     private class FsInfoCache extends SingleObjectCache<FsInfo> {
-        public FsInfoCache(TimeValue interval, FsInfo initValue) {
-            super(interval, initValue);
+
+        private final FsInfo initialValue;
+
+        public FsInfoCache(TimeValue interval, FsInfo initialValue) {
+            super(interval, initialValue);
+            this.initialValue = initialValue;
         }
 
         @Override
         protected FsInfo refresh() {
-            try {
-                return probe.stats();
-            } catch (IOException ex) {
-                logger.warn("Failed to fetch fs stats - returning empty instance");
-                return new FsInfo(0, null);
-            }
+            return stats(probe, initialValue, logger);
         }
+
     }
 
 }

+ 3 - 0
core/src/main/resources/org/elasticsearch/bootstrap/security.policy

@@ -124,4 +124,7 @@ grant {
 
   // read max virtual memory areas
   permission java.io.FilePermission "/proc/sys/vm/max_map_count", "read";
+
+  // io stats on Linux
+  permission java.io.FilePermission "/proc/diskstats", "read";
 };

+ 6 - 6
core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java

@@ -201,11 +201,11 @@ public class DiskUsageTests extends ESTestCase {
         };
         List<NodeStats> nodeStats = Arrays.asList(
                 new NodeStats(new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0,
-                        null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null,null, null),
+                        null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null),
                 new NodeStats(new DiscoveryNode("node_2", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null,null, null),
+                        null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null),
                 new NodeStats(new DiscoveryNode("node_3", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null,null, null)
+                        null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null)
         );
         InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages);
         DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1");
@@ -242,11 +242,11 @@ public class DiskUsageTests extends ESTestCase {
         };
         List<NodeStats> nodeStats = Arrays.asList(
                 new NodeStats(new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0,
-                        null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null,null, null),
+                        null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null),
                 new NodeStats(new DiscoveryNode("node_2", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null,null, null),
+                        null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null),
                 new NodeStats(new DiscoveryNode("node_3", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null,null, null)
+                        null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null)
         );
         InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvailableUsages, newMostAvailableUsages);
         DiskUsage leastNode_1 = newLeastAvailableUsages.get("node_1");

+ 62 - 0
core/src/test/java/org/elasticsearch/monitor/fs/DeviceStatsTests.java

@@ -0,0 +1,62 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.monitor.fs;
+
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class DeviceStatsTests extends ESTestCase {
+
+    public void testDeviceStats() {
+        final int majorDeviceNumber = randomIntBetween(1, 1 << 8);
+        final int minorDeviceNumber = randomIntBetween(0, 1 << 5);
+        final String deviceName = randomAsciiOfLength(3);
+        final int readsCompleted = randomIntBetween(1, 1 << 16);
+        final int sectorsRead = randomIntBetween(8 * readsCompleted, 16 * readsCompleted);
+        final int writesCompleted = randomIntBetween(1, 1 << 16);
+        final int sectorsWritten = randomIntBetween(8 * writesCompleted, 16 * writesCompleted);
+
+        FsInfo.DeviceStats previous = new FsInfo.DeviceStats(
+            majorDeviceNumber,
+            minorDeviceNumber,
+            deviceName,
+            readsCompleted,
+            sectorsRead,
+            writesCompleted,
+            sectorsWritten,
+            null);
+        FsInfo.DeviceStats current = new FsInfo.DeviceStats(
+            majorDeviceNumber,
+            minorDeviceNumber,
+            deviceName,
+            readsCompleted + 1024,
+            sectorsRead + 16384,
+            writesCompleted + 2048,
+            sectorsWritten + 32768,
+            previous);
+        assertThat(current.operations(), equalTo(1024L + 2048L));
+        assertThat(current.readOperations(), equalTo(1024L));
+        assertThat(current.writeOperations(), equalTo(2048L));
+        assertThat(current.readKilobytes(), equalTo(16384L / 2));
+        assertThat(current.writeKilobytes(), equalTo(32768L / 2));
+    }
+
+}

+ 120 - 1
core/src/test/java/org/elasticsearch/monitor/fs/FsProbeTests.java

@@ -19,25 +19,55 @@
 
 package org.elasticsearch.monitor.fs;
 
+import org.apache.lucene.util.Constants;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.isEmptyOrNullString;
 import static org.hamcrest.Matchers.not;
 
 public class FsProbeTests extends ESTestCase {
+
     public void testFsInfo() throws IOException {
+
         try (NodeEnvironment env = newNodeEnvironment()) {
             FsProbe probe = new FsProbe(Settings.EMPTY, env);
 
-            FsInfo stats = probe.stats();
+            FsInfo stats = probe.stats(null);
             assertNotNull(stats);
             assertThat(stats.getTimestamp(), greaterThan(0L));
 
+            if (Constants.LINUX) {
+                assertNotNull(stats.getIoStats());
+                assertNotNull(stats.getIoStats().devicesStats);
+                for (int i = 0; i < stats.getIoStats().devicesStats.length; i++) {
+                    final FsInfo.DeviceStats deviceStats = stats.getIoStats().devicesStats[i];
+                    assertNotNull(deviceStats);
+                    assertThat(deviceStats.currentReadsCompleted, greaterThanOrEqualTo(0L));
+                    assertThat(deviceStats.previousReadsCompleted, equalTo(-1L));
+                    assertThat(deviceStats.currentSectorsRead, greaterThanOrEqualTo(0L));
+                    assertThat(deviceStats.previousSectorsRead, equalTo(-1L));
+                    assertThat(deviceStats.currentWritesCompleted, greaterThanOrEqualTo(0L));
+                    assertThat(deviceStats.previousWritesCompleted, equalTo(-1L));
+                    assertThat(deviceStats.currentSectorsWritten, greaterThanOrEqualTo(0L));
+                    assertThat(deviceStats.previousSectorsWritten, equalTo(-1L));
+                }
+            } else {
+                assertNull(stats.getIoStats());
+            }
+
             FsInfo.Path total = stats.getTotal();
             assertNotNull(total);
             assertThat(total.total, greaterThan(0L));
@@ -55,4 +85,93 @@ public class FsProbeTests extends ESTestCase {
             }
         }
     }
+
+    public void testIoStats() {
+        final AtomicReference<List<String>> diskStats = new AtomicReference<>();
+        diskStats.set(Arrays.asList(
+                " 259       0 nvme0n1 336609 0 7923613 82813 10264051 0 182983933 52451441 0 2970886 52536260",
+                " 259       1 nvme0n1p1 602 0 9919 131 1 0 1 0 0 19 131",
+                " 259       2 nvme0n1p2 186 0 8626 18 24 0 60 20 0 34 38",
+                " 259       3 nvme0n1p3 335733 0 7901620 82658 9592875 0 182983872 50843431 0 1737726 50926087",
+                " 253       0 dm-0 287716 0 7184666 33457 8398869 0 118857776 18730966 0 1918440 18767169",
+                " 253       1 dm-1 112 0 4624 13 0 0 0 0 0 5 13",
+                " 253       2 dm-2 47802 0 710658 49312 1371977 0 64126096 33730596 0 1058193 33781827"));
+
+        final FsProbe probe = new FsProbe(Settings.EMPTY, null) {
+            @Override
+            List<String> readProcDiskStats() throws IOException {
+                return diskStats.get();
+            }
+        };
+
+        final Set<Tuple<Integer, Integer>> devicesNumbers = new HashSet<>();
+        devicesNumbers.add(Tuple.tuple(253, 0));
+        devicesNumbers.add(Tuple.tuple(253, 2));
+        final FsInfo.IoStats first = probe.ioStats(devicesNumbers, null);
+        assertNotNull(first);
+        assertThat(first.devicesStats[0].majorDeviceNumber, equalTo(253));
+        assertThat(first.devicesStats[0].minorDeviceNumber, equalTo(0));
+        assertThat(first.devicesStats[0].deviceName, equalTo("dm-0"));
+        assertThat(first.devicesStats[0].currentReadsCompleted, equalTo(287716L));
+        assertThat(first.devicesStats[0].previousReadsCompleted, equalTo(-1L));
+        assertThat(first.devicesStats[0].currentSectorsRead, equalTo(7184666L));
+        assertThat(first.devicesStats[0].previousSectorsRead, equalTo(-1L));
+        assertThat(first.devicesStats[0].currentWritesCompleted, equalTo(8398869L));
+        assertThat(first.devicesStats[0].previousWritesCompleted, equalTo(-1L));
+        assertThat(first.devicesStats[0].currentSectorsWritten, equalTo(118857776L));
+        assertThat(first.devicesStats[0].previousSectorsWritten, equalTo(-1L));
+        assertThat(first.devicesStats[1].majorDeviceNumber, equalTo(253));
+        assertThat(first.devicesStats[1].minorDeviceNumber, equalTo(2));
+        assertThat(first.devicesStats[1].deviceName, equalTo("dm-2"));
+        assertThat(first.devicesStats[1].currentReadsCompleted, equalTo(47802L));
+        assertThat(first.devicesStats[1].previousReadsCompleted, equalTo(-1L));
+        assertThat(first.devicesStats[1].currentSectorsRead, equalTo(710658L));
+        assertThat(first.devicesStats[1].previousSectorsRead, equalTo(-1L));
+        assertThat(first.devicesStats[1].currentWritesCompleted, equalTo(1371977L));
+        assertThat(first.devicesStats[1].previousWritesCompleted, equalTo(-1L));
+        assertThat(first.devicesStats[1].currentSectorsWritten, equalTo(64126096L));
+        assertThat(first.devicesStats[1].previousSectorsWritten, equalTo(-1L));
+
+        diskStats.set(Arrays.asList(
+                " 259       0 nvme0n1 336870 0 7928397 82876 10264393 0 182986405 52451610 0 2971042 52536492",
+                " 259       1 nvme0n1p1 602 0 9919 131 1 0 1 0 0 19 131",
+                " 259       2 nvme0n1p2 186 0 8626 18 24 0 60 20 0 34 38",
+                " 259       3 nvme0n1p3 335994 0 7906404 82721 9593184 0 182986344 50843529 0 1737840 50926248",
+                " 253       0 dm-0 287734 0 7185242 33464 8398869 0 118857776 18730966 0 1918444 18767176",
+                " 253       1 dm-1 112 0 4624 13 0 0 0 0 0 5 13",
+                " 253       2 dm-2 48045 0 714866 49369 1372291 0 64128568 33730766 0 1058347 33782056"));
+
+        final FsInfo previous = new FsInfo(System.currentTimeMillis(), first, null);
+        final FsInfo.IoStats second = probe.ioStats(devicesNumbers, previous);
+        assertNotNull(second);
+        assertThat(second.devicesStats[0].majorDeviceNumber, equalTo(253));
+        assertThat(second.devicesStats[0].minorDeviceNumber, equalTo(0));
+        assertThat(second.devicesStats[0].deviceName, equalTo("dm-0"));
+        assertThat(second.devicesStats[0].currentReadsCompleted, equalTo(287734L));
+        assertThat(second.devicesStats[0].previousReadsCompleted, equalTo(287716L));
+        assertThat(second.devicesStats[0].currentSectorsRead, equalTo(7185242L));
+        assertThat(second.devicesStats[0].previousSectorsRead, equalTo(7184666L));
+        assertThat(second.devicesStats[0].currentWritesCompleted, equalTo(8398869L));
+        assertThat(second.devicesStats[0].previousWritesCompleted, equalTo(8398869L));
+        assertThat(second.devicesStats[0].currentSectorsWritten, equalTo(118857776L));
+        assertThat(second.devicesStats[0].previousSectorsWritten, equalTo(118857776L));
+        assertThat(second.devicesStats[1].majorDeviceNumber, equalTo(253));
+        assertThat(second.devicesStats[1].minorDeviceNumber, equalTo(2));
+        assertThat(second.devicesStats[1].deviceName, equalTo("dm-2"));
+        assertThat(second.devicesStats[1].currentReadsCompleted, equalTo(48045L));
+        assertThat(second.devicesStats[1].previousReadsCompleted, equalTo(47802L));
+        assertThat(second.devicesStats[1].currentSectorsRead, equalTo(714866L));
+        assertThat(second.devicesStats[1].previousSectorsRead, equalTo(710658L));
+        assertThat(second.devicesStats[1].currentWritesCompleted, equalTo(1372291L));
+        assertThat(second.devicesStats[1].previousWritesCompleted, equalTo(1371977L));
+        assertThat(second.devicesStats[1].currentSectorsWritten, equalTo(64128568L));
+        assertThat(second.devicesStats[1].previousSectorsWritten, equalTo(64126096L));
+
+        assertThat(second.totalOperations, equalTo(575L));
+        assertThat(second.totalReadOperations, equalTo(261L));
+        assertThat(second.totalWriteOperations, equalTo(314L));
+        assertThat(second.totalReadKilobytes, equalTo(2392L));
+        assertThat(second.totalWriteKilobytes, equalTo(1236L));
+    }
+
 }

+ 49 - 0
docs/reference/cluster/nodes-stats.asciidoc

@@ -121,6 +121,55 @@ information that concern the file system:
 	`null` means we could not determine it, `true` means the device possibly spins
 	 and `false` means it does not (ex: solid-state disks).
 
+`fs.io_stats.devices` (Linux only)::
+    Array of disk metrics for each device that is backing an
+    Elasticsearch data path. These disk metrics are probed periodically
+    and averages between the last probe and the current probe are
+    computed.
+
+`fs.io_stats.devices.device_name` (Linux only)::
+    The Linux device name.
+
+`fs.io_stats.devices.operations` (Linux only)::
+    The total number of read and write operations for the device
+    completed since starting Elasticsearch.
+
+`fs.io_stats.devices.read_operations` (Linux only)::
+    The total number of read operations for the device completed since
+    starting Elasticsearch.
+
+`fs.io_stats.devices.write_operations` (Linux only)::
+    The total number of write operations for the device completed since
+    starting Elasticsearch.
+
+`fs.io_stats.devices.read_kilobytes` (Linux only)::
+    The total number of kilobytes read for the device since starting
+    Elasticsearch.
+
+`fs.io_stats.devices.write_kilobytes` (Linux only)::
+    The total number of kilobytes written for the device since
+    starting Elasticsearch.
+
+`fs.io_stats.operations` (Linux only)::
+    The total number of read and write operations across all devices
+    used by Elasticsearch completed since starting Elasticsearch.
+
+`fs.io_stats.read_operations` (Linux only)::
+    The total number of read operations for across all devices used by
+    Elasticsearch completed since starting Elasticsearch.
+
+`fs.io_stats.write_operations` (Linux only)::
+    The total number of write operations across all devices used by
+    Elasticsearch completed since starting Elasticsearch.
+
+`fs.io_stats.read_kilobytes` (Linux only)::
+    The total number of kilobytes read across all devices used by
+    Elasticsearch since starting Elasticsearch.
+
+`fs.io_stats.write_kilobytes` (Linux only)::
+    The total number of kilobytes written across all devices used by
+    Elasticsearch since starting Elasticsearch.
+
 [float]
 [[os-stats]]
 ==== Operating System statistics

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java

@@ -74,7 +74,7 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
         FsInfo.Path path = new FsInfo.Path("/dev/null", null,
             usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeBytes());
         paths[0] = path;
-        FsInfo fsInfo = new FsInfo(System.currentTimeMillis(), paths);
+        FsInfo fsInfo = new FsInfo(System.currentTimeMillis(), null, paths);
         return new NodeStats(new DiscoveryNode(nodeName, DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT),
             System.currentTimeMillis(),
             null, null, null, null, null,