Browse Source

Initial implementation of ResourceWatcherService

Closes #4062
Igor Motov 12 years ago
parent
commit
c724f0de5d

+ 12 - 0
docs/reference/modules/scripting.asciidoc

@@ -68,6 +68,18 @@ This will still allow execution of named scripts provided in the config, or
 _native_ Java scripts registered through plugins, however it will prevent
 _native_ Java scripts registered through plugins, however it will prevent
 users from running arbitrary scripts via the API.
 users from running arbitrary scripts via the API.
 
 
+[float]
+=== Automatic Script Reloading
+
+added[0.90.6]
+
+The `config/scripts` directory is scanned periodically for changes.
+New and changed scripts are reloaded and deleted script are removed
+from preloaded scripts cache. The reload frequency can be specified
+using `watcher.interval` setting, which defaults to `60s`.
+To disable script reloading completely set `script.auto_reload_enabled`
+to `false`.
+
 [float]
 [float]
 === Native (Java) Scripts
 === Native (Java) Scripts
 
 

+ 5 - 0
src/main/java/org/elasticsearch/node/internal/InternalNode.java

@@ -90,6 +90,8 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPoolModule;
 import org.elasticsearch.threadpool.ThreadPoolModule;
 import org.elasticsearch.transport.TransportModule;
 import org.elasticsearch.transport.TransportModule;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.watcher.ResourceWatcherModule;
+import org.elasticsearch.watcher.ResourceWatcherService;
 
 
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
@@ -170,6 +172,7 @@ public final class InternalNode implements Node {
         modules.add(new BulkUdpModule());
         modules.add(new BulkUdpModule());
         modules.add(new ShapeModule());
         modules.add(new ShapeModule());
         modules.add(new PercolatorModule());
         modules.add(new PercolatorModule());
+        modules.add(new ResourceWatcherModule());
 
 
         injector = modules.createInjector();
         injector = modules.createInjector();
 
 
@@ -223,6 +226,7 @@ public final class InternalNode implements Node {
             injector.getInstance(HttpServer.class).start();
             injector.getInstance(HttpServer.class).start();
         }
         }
         injector.getInstance(BulkUdpService.class).start();
         injector.getInstance(BulkUdpService.class).start();
+        injector.getInstance(ResourceWatcherService.class).start();
 
 
         logger.info("started");
         logger.info("started");
 
 
@@ -238,6 +242,7 @@ public final class InternalNode implements Node {
         logger.info("stopping ...");
         logger.info("stopping ...");
 
 
         injector.getInstance(BulkUdpService.class).stop();
         injector.getInstance(BulkUdpService.class).stop();
+        injector.getInstance(ResourceWatcherService.class).stop();
         if (settings.getAsBoolean("http.enabled", true)) {
         if (settings.getAsBoolean("http.enabled", true)) {
             injector.getInstance(HttpServer.class).stop();
             injector.getInstance(HttpServer.class).stop();
         }
         }

+ 80 - 49
src/main/java/org/elasticsearch/script/ScriptService.java

@@ -23,9 +23,9 @@ import com.google.common.base.Charsets;
 import com.google.common.cache.Cache;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import org.elasticsearch.ElasticSearchIllegalArgumentException;
 import org.elasticsearch.ElasticSearchIllegalArgumentException;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.Streams;
@@ -35,8 +35,10 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.script.mvel.MvelScriptEngineService;
 import org.elasticsearch.search.lookup.SearchLookup;
 import org.elasticsearch.search.lookup.SearchLookup;
+import org.elasticsearch.watcher.FileChangesListener;
+import org.elasticsearch.watcher.FileWatcher;
+import org.elasticsearch.watcher.ResourceWatcherService;
 
 
 import java.io.File;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileInputStream;
@@ -59,18 +61,13 @@ public class ScriptService extends AbstractComponent {
     private final ConcurrentMap<String, CompiledScript> staticCache = ConcurrentCollections.newConcurrentMap();
     private final ConcurrentMap<String, CompiledScript> staticCache = ConcurrentCollections.newConcurrentMap();
 
 
     private final Cache<CacheKey, CompiledScript> cache;
     private final Cache<CacheKey, CompiledScript> cache;
+    private final File scriptsDirectory;
 
 
     private final boolean disableDynamic;
     private final boolean disableDynamic;
 
 
-    public ScriptService(Settings settings) {
-        this(settings, new Environment(), ImmutableSet.<ScriptEngineService>builder()
-                .add(new MvelScriptEngineService(settings))
-                .build()
-        );
-    }
-
     @Inject
     @Inject
-    public ScriptService(Settings settings, Environment env, Set<ScriptEngineService> scriptEngines) {
+    public ScriptService(Settings settings, Environment env, Set<ScriptEngineService> scriptEngines,
+                         ResourceWatcherService resourceWatcherService) {
         super(settings);
         super(settings);
 
 
         int cacheMaxSize = componentSettings.getAsInt("cache.max_size", 500);
         int cacheMaxSize = componentSettings.getAsInt("cache.max_size", 500);
@@ -100,45 +97,17 @@ public class ScriptService extends AbstractComponent {
         // put some default optimized scripts
         // put some default optimized scripts
         staticCache.put("doc.score", new CompiledScript("native", new DocScoreNativeScriptFactory()));
         staticCache.put("doc.score", new CompiledScript("native", new DocScoreNativeScriptFactory()));
 
 
-        // compile static scripts
-        File scriptsFile = new File(env.configFile(), "scripts");
-        if (scriptsFile.exists()) {
-            processScriptsDirectory("", scriptsFile);
-        }
-    }
-
-    private void processScriptsDirectory(String prefix, File dir) {
-        for (File file : dir.listFiles()) {
-            if (file.isDirectory()) {
-                processScriptsDirectory(prefix + file.getName() + "_", file);
-            } else {
-                int extIndex = file.getName().lastIndexOf('.');
-                if (extIndex != -1) {
-                    String ext = file.getName().substring(extIndex + 1);
-                    String scriptName = prefix + file.getName().substring(0, extIndex);
-                    boolean found = false;
-                    for (ScriptEngineService engineService : scriptEngines.values()) {
-                        for (String s : engineService.extensions()) {
-                            if (s.equals(ext)) {
-                                found = true;
-                                try {
-                                    String script = Streams.copyToString(new InputStreamReader(new FileInputStream(file), Charsets.UTF_8));
-                                    staticCache.put(scriptName, new CompiledScript(engineService.types()[0], engineService.compile(script)));
-                                } catch (Exception e) {
-                                    logger.warn("failed to load/compile script [{}]", e, scriptName);
-                                }
-                                break;
-                            }
-                        }
-                        if (found) {
-                            break;
-                        }
-                    }
-                    if (!found) {
-                        logger.warn("no script engine found for [{}]", ext);
-                    }
-                }
-            }
+        // add file watcher for static scripts
+        scriptsDirectory = new File(env.configFile(), "scripts");
+        FileWatcher fileWatcher = new FileWatcher(scriptsDirectory);
+        fileWatcher.addListener(new ScriptChangesListener());
+
+        if (componentSettings.getAsBoolean("auto_reload_enabled", true)) {
+            // automatic reload is enabled - register scripts
+            resourceWatcherService.add(fileWatcher);
+        } else {
+            // automatic reload is disable just load scripts once
+            fileWatcher.init();
         }
         }
     }
     }
 
 
@@ -214,6 +183,68 @@ public class ScriptService extends AbstractComponent {
         return !"native".equals(lang);
         return !"native".equals(lang);
     }
     }
 
 
+    private class ScriptChangesListener extends FileChangesListener {
+
+        private Tuple<String, String> scriptNameExt(File file) {
+            String scriptPath = scriptsDirectory.toURI().relativize(file.toURI()).getPath();
+            int extIndex = scriptPath.lastIndexOf('.');
+            if (extIndex != -1) {
+                String ext = scriptPath.substring(extIndex + 1);
+                String scriptName = scriptPath.substring(0, extIndex).replace(File.separatorChar, '_');
+                return new Tuple<String, String>(scriptName, ext);
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public void onFileInit(File file) {
+            Tuple<String, String> scriptNameExt = scriptNameExt(file);
+            if (scriptNameExt != null) {
+                boolean found = false;
+                for (ScriptEngineService engineService : scriptEngines.values()) {
+                    for (String s : engineService.extensions()) {
+                        if (s.equals(scriptNameExt.v2())) {
+                            found = true;
+                            try {
+                                logger.trace("compiling script file " + file.getAbsolutePath());
+                                String script = Streams.copyToString(new InputStreamReader(new FileInputStream(file), Charsets.UTF_8));
+                                staticCache.put(scriptNameExt.v1(), new CompiledScript(engineService.types()[0], engineService.compile(script)));
+                            } catch (Throwable e) {
+                                logger.warn("failed to load/compile script [{}]", e, scriptNameExt.v1());
+                            }
+                            break;
+                        }
+                    }
+                    if (found) {
+                        break;
+                    }
+                }
+                if (!found) {
+                    logger.warn("no script engine found for [{}]", scriptNameExt.v2());
+                }
+            }
+        }
+
+        @Override
+        public void onFileCreated(File file) {
+            onFileInit(file);
+        }
+
+        @Override
+        public void onFileDeleted(File file) {
+            Tuple<String, String> scriptNameExt = scriptNameExt(file);
+            logger.trace("removing script file " + file.getAbsolutePath());
+            staticCache.remove(scriptNameExt.v1());
+        }
+
+        @Override
+        public void onFileChanged(File file) {
+            onFileInit(file);
+        }
+
+    }
+
     public static class CacheKey {
     public static class CacheKey {
         public final String lang;
         public final String lang;
         public final String script;
         public final String script;

+ 79 - 0
src/main/java/org/elasticsearch/watcher/AbstractResourceWatcher.java

@@ -0,0 +1,79 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.watcher;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Abstract resource watcher framework, which handles adding and removing listeners
+ * and calling resource observer.
+ */
+public abstract class AbstractResourceWatcher<Listener> implements ResourceWatcher {
+    private final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
+    private boolean initialized = false;
+
+    @Override
+    public void init() {
+        if (!initialized) {
+            doInit();
+            initialized = true;
+        }
+    }
+
+    @Override
+    public void checkAndNotify() {
+        init();
+        doCheckAndNotify();
+    }
+
+    /**
+     * Registers new listener
+     */
+    public void addListener(Listener listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Unregisters a listener
+     */
+    public void remove(Listener listener) {
+        listeners.remove(listener);
+    }
+
+    /**
+     * Returns a list of listeners
+     */
+    protected List<Listener> listeners() {
+        return listeners;
+    }
+
+    /**
+     * Will be called once on initialization
+     */
+    protected abstract void doInit();
+
+    /**
+     * Will be called periodically
+     * <p/>
+     * Implementing watcher should check resource and notify all {@link #listeners()}.
+     */
+    protected abstract void doCheckAndNotify();
+
+}

+ 75 - 0
src/main/java/org/elasticsearch/watcher/FileChangesListener.java

@@ -0,0 +1,75 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.watcher;
+
+import java.io.File;
+
+/**
+ * Callback interface that file changes File Watcher is using to notify listeners about changes.
+ */
+public class FileChangesListener {
+    /**
+     * Called for every file found in the watched directory during initialization
+     */
+    public void onFileInit(File file) {
+
+    }
+
+    /**
+     * Called for every subdirectory found in the watched directory during initialization
+     */
+    public void onDirectoryInit(File file) {
+
+    }
+
+    /**
+     * Called for every new file found in the watched directory
+     */
+    public void onFileCreated(File file) {
+
+    }
+
+    /**
+     * Called for every file that disappeared in the watched directory
+     */
+    public void onFileDeleted(File file) {
+
+    }
+
+    /**
+     * Called for every file that was changed in the watched directory
+     */
+    public void onFileChanged(File file) {
+
+    }
+
+    /**
+     * Called for every new subdirectory found in the watched directory
+     */
+    public void onDirectoryCreated(File file) {
+
+    }
+
+    /**
+     * Called for every file that disappeared in the watched directory
+     */
+    public void onDirectoryDeleted(File file) {
+
+    }
+}

+ 274 - 0
src/main/java/org/elasticsearch/watcher/FileWatcher.java

@@ -0,0 +1,274 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.watcher;
+
+import java.io.File;
+import java.util.Arrays;
+
+/**
+ * File resources watcher
+ *
+ * The file watcher checks directory and all its subdirectories for file changes and notifies its listeners accordingly
+ */
+public class FileWatcher extends AbstractResourceWatcher<FileChangesListener> {
+
+    private FileObserver rootFileObserver;
+
+    /**
+     * Creates new file watcher on the given directory
+     */
+    public FileWatcher(File file) {
+        rootFileObserver = new FileObserver(file);
+    }
+
+    @Override
+    protected void doInit() {
+        rootFileObserver.init(true);
+    }
+
+    @Override
+    protected void doCheckAndNotify() {
+        rootFileObserver.checkAndNotify();
+    }
+
+    private static FileObserver[] EMPTY_DIRECTORY = new FileObserver[0];
+
+    private class FileObserver {
+        private File file;
+        private boolean exists;
+        private long length;
+        private long lastModified;
+        private boolean isDirectory;
+        private FileObserver[] children;
+
+        public FileObserver(File file) {
+            this.file = file;
+        }
+
+        public void checkAndNotify() {
+            boolean prevExists = exists;
+            boolean prevIsDirectory = isDirectory;
+            long prevLength = length;
+            long prevLastModified = lastModified;
+
+            exists = file.exists();
+
+            if (exists) {
+                isDirectory = file.isDirectory();
+                if (isDirectory) {
+                    length = 0;
+                    lastModified = 0;
+                } else {
+                    length = file.length();
+                    lastModified = file.lastModified();
+                }
+            } else {
+                isDirectory = false;
+                length = 0;
+                lastModified = 0;
+            }
+
+            // Perform notifications and update children for the current file
+            if (prevExists) {
+                if (exists) {
+                    if (isDirectory) {
+                        if (prevIsDirectory) {
+                            // Remained a directory
+                            updateChildren();
+                        } else {
+                            // File replaced by directory
+                            onFileDeleted();
+                            onDirectoryCreated(false);
+                        }
+                    } else {
+                        if (prevIsDirectory) {
+                            // Directory replaced by file
+                            onDirectoryDeleted();
+                            onFileCreated(false);
+                        } else {
+                            // Remained file
+                            if (prevLastModified != lastModified || prevLength != length) {
+                                onFileChanged();
+                            }
+                        }
+                    }
+                } else {
+                    // Deleted
+                    if (prevIsDirectory) {
+                        onDirectoryDeleted();
+                    } else {
+                        onFileDeleted();
+                    }
+                }
+            } else {
+                // Created
+                if (exists) {
+                    if (isDirectory) {
+                        onDirectoryCreated(false);
+                    } else {
+                        onFileCreated(false);
+                    }
+                }
+            }
+
+        }
+
+        private void init(boolean initial) {
+            exists = file.exists();
+            if (exists) {
+                isDirectory = file.isDirectory();
+                if (isDirectory) {
+                    onDirectoryCreated(initial);
+                } else {
+                    length = file.length();
+                    lastModified = file.lastModified();
+                    onFileCreated(initial);
+                }
+            }
+        }
+
+        private FileObserver createChild(File file, boolean initial) {
+            FileObserver child = new FileObserver(file);
+            child.init(initial);
+            return child;
+        }
+
+        private File[] listFiles() {
+            File[] files = file.listFiles();
+            if (files != null) {
+                Arrays.sort(files);
+            }
+            return files;
+        }
+
+        private FileObserver[] listChildren(boolean initial) {
+            File[] files = listFiles();
+            if (files != null && files.length > 0) {
+                FileObserver[] children = new FileObserver[files.length];
+                for (int i = 0; i < files.length; i++) {
+                    children[i] = createChild(files[i], initial);
+                }
+                return children;
+            } else {
+                return EMPTY_DIRECTORY;
+            }
+        }
+
+        private void updateChildren() {
+            File[] files = listFiles();
+            if (files != null && files.length > 0) {
+                FileObserver[] newChildren = new FileObserver[files.length];
+                int child = 0;
+                int file = 0;
+                while (file < files.length || child < children.length ) {
+                    int compare;
+
+                    if (file >= files.length) {
+                        compare = -1;
+                    } else if (child >= children.length) {
+                        compare = 1;
+                    } else {
+                        compare = children[child].file.compareTo(files[file]);
+                    }
+
+                    if (compare  == 0) {
+                        // Same file copy it and update
+                        children[child].checkAndNotify();
+                        newChildren[file] = children[child];
+                        file++;
+                        child++;
+                    } else {
+                        if (compare > 0) {
+                            // This child doesn't appear in the old list - init it
+                            newChildren[file] = createChild(files[file], false);
+                            file++;
+                        } else {
+                            // The child from the old list is missing in the new list
+                            // Delete it
+                            deleteChild(child);
+                            child++;
+                        }
+                    }
+                }
+                children = newChildren;
+            } else {
+                // No files - delete all children
+                for (int child = 0; child < children.length; child++) {
+                    deleteChild(child);
+                }
+                children = EMPTY_DIRECTORY;
+            }
+        }
+
+        private void deleteChild(int child) {
+            if (children[child].exists) {
+                if (children[child].isDirectory) {
+                    children[child].onDirectoryDeleted();
+                } else {
+                    children[child].onFileDeleted();
+                }
+            }
+        }
+
+        private void onFileCreated(boolean initial) {
+            for (FileChangesListener listener : listeners()) {
+                if (initial) {
+                    listener.onFileInit(file);
+                } else {
+                    listener.onFileCreated(file);
+                }
+            }
+        }
+
+        private void onFileDeleted() {
+            for (FileChangesListener listener : listeners()) {
+                listener.onFileDeleted(file);
+            }
+        }
+
+        private void onFileChanged() {
+            for (FileChangesListener listener : listeners()) {
+                listener.onFileChanged(file);
+            }
+        }
+
+        private void onDirectoryCreated(boolean initial) {
+            for (FileChangesListener listener : listeners()) {
+                if (initial) {
+                    listener.onDirectoryInit(file);
+                } else {
+                    listener.onDirectoryCreated(file);
+                }
+            }
+            children = listChildren(initial);
+        }
+
+        private void onDirectoryDeleted() {
+            // First delete all children
+            for (int child = 0; child < children.length; child++) {
+                deleteChild(child);
+            }
+            for (FileChangesListener listener : listeners()) {
+                listener.onDirectoryDeleted(file);
+            }
+        }
+
+    }
+
+}

+ 37 - 0
src/main/java/org/elasticsearch/watcher/ResourceWatcher.java

@@ -0,0 +1,37 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.watcher;
+
+/**
+ * Abstract resource watcher interface.
+ * <p/>
+ * Different resource watchers can be registered with {@link ResourceWatcherService} to be called
+ * periodically in order to check for changes in different external resources.
+ */
+public interface ResourceWatcher {
+    /**
+     * Called once when the resource watcher is added to {@link ResourceWatcherService}
+     */
+    void init();
+
+    /**
+     * Called periodically by {@link ResourceWatcherService} so resource watcher can check the resource
+     */
+    void checkAndNotify();
+}

+ 31 - 0
src/main/java/org/elasticsearch/watcher/ResourceWatcherModule.java

@@ -0,0 +1,31 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.watcher;
+
+import org.elasticsearch.common.inject.AbstractModule;
+
+/**
+ *
+ */
+public class ResourceWatcherModule extends AbstractModule {
+    @Override
+    protected void configure() {
+        bind(ResourceWatcherService.class).asEagerSingleton();
+    }
+}

+ 106 - 0
src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java

@@ -0,0 +1,106 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.watcher;
+
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
+
+/**
+ * Generic resource watcher service
+ *
+ * Other elasticsearch services can register their resource watchers with this service using {@link #add(ResourceWatcher)}
+ * method. This service will call {@link org.elasticsearch.watcher.ResourceWatcher#checkAndNotify()} method of all
+ * registered watcher periodically. The frequency of checks can be specified using {@code watcher.interval} setting, which
+ * defaults to {@code 60s}. The service can be disabled by setting {@code watcher.enabled} setting to {@code false}.
+ */
+public class ResourceWatcherService extends AbstractLifecycleComponent<ResourceWatcherService> {
+
+    private final List<ResourceWatcher> watchers = new CopyOnWriteArrayList<ResourceWatcher>();
+
+    private volatile ScheduledFuture scheduledFuture;
+
+    private final boolean enabled;
+
+    private final TimeValue interval;
+
+    private final ThreadPool threadPool;
+
+    @Inject
+    public ResourceWatcherService(Settings settings, ThreadPool threadPool) {
+        super(settings);
+        this.enabled = componentSettings.getAsBoolean("enabled", true);
+        this.interval = componentSettings.getAsTime("interval", timeValueSeconds(60));
+        this.threadPool = threadPool;
+    }
+
+    @Override
+    protected void doStart() throws ElasticSearchException {
+        if (!enabled) {
+            return;
+        }
+        scheduledFuture = threadPool.scheduleWithFixedDelay(new ResourceMonitor(), interval);
+    }
+
+    @Override
+    protected void doStop() throws ElasticSearchException {
+        if (!enabled) {
+            return;
+        }
+        scheduledFuture.cancel(true);
+    }
+
+    @Override
+    protected void doClose() throws ElasticSearchException {
+    }
+
+    /**
+     * Register new resource watcher
+     */
+    public void add(ResourceWatcher watcher) {
+        watcher.init();
+        watchers.add(watcher);
+    }
+
+    /**
+     * Unregister a resource watcher
+     */
+    public void remove(ResourceWatcher watcher) {
+        watchers.remove(watcher);
+    }
+
+    private class ResourceMonitor implements Runnable {
+
+        @Override
+        public void run() {
+            for(ResourceWatcher watcher : watchers) {
+                watcher.checkAndNotify();
+            }
+        }
+    }
+}

+ 386 - 0
src/test/java/org/elasticsearch/watcher/FileWatcherTest.java

@@ -0,0 +1,386 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.watcher;
+
+import com.carrotsearch.randomizedtesting.LifecycleScope;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.io.Files.*;
+import static org.elasticsearch.common.io.FileSystemUtils.deleteRecursively;
+import static org.hamcrest.Matchers.*;
+
+/**
+ *
+ */
+public class FileWatcherTest extends ElasticsearchTestCase {
+
+    private class RecordingChangeListener extends FileChangesListener {
+
+        private File rootDir;
+
+        private RecordingChangeListener(File rootDir) {
+            this.rootDir = rootDir;
+        }
+
+        private String getRelativeFileName(File file) {
+            return rootDir.toURI().relativize(file.toURI()).getPath();
+        }
+
+        private List<String> notifications = newArrayList();
+
+        @Override
+        public void onFileInit(File file) {
+            notifications.add("onFileInit: " + getRelativeFileName(file));
+        }
+
+        @Override
+        public void onDirectoryInit(File file) {
+            notifications.add("onDirectoryInit: " + getRelativeFileName(file));
+        }
+
+        @Override
+        public void onFileCreated(File file) {
+            notifications.add("onFileCreated: " + getRelativeFileName(file));
+        }
+
+        @Override
+        public void onFileDeleted(File file) {
+            notifications.add("onFileDeleted: " + getRelativeFileName(file));
+        }
+
+        @Override
+        public void onFileChanged(File file) {
+            notifications.add("onFileChanged: " + getRelativeFileName(file));
+        }
+
+        @Override
+        public void onDirectoryCreated(File file) {
+            notifications.add("onDirectoryCreated: " + getRelativeFileName(file));
+        }
+
+        @Override
+        public void onDirectoryDeleted(File file) {
+            notifications.add("onDirectoryDeleted: " + getRelativeFileName(file));
+        }
+
+        public List<String> notifications() {
+            return notifications;
+        }
+    }
+
+    @Test
+    public void testSimpleFileOperations() throws IOException {
+        File tempDir = newTempDir(LifecycleScope.TEST);
+        RecordingChangeListener changes = new RecordingChangeListener(tempDir);
+        File testFile = new File(tempDir, "test.txt");
+        touch(testFile);
+        FileWatcher fileWatcher = new FileWatcher(testFile);
+        fileWatcher.addListener(changes);
+        fileWatcher.init();
+        assertThat(changes.notifications(), contains(equalTo("onFileInit: test.txt")));
+
+        changes.notifications().clear();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), hasSize(0));
+
+        append("Test", testFile, Charset.defaultCharset());
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(equalTo("onFileChanged: test.txt")));
+
+        changes.notifications().clear();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), hasSize(0));
+
+        testFile.delete();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(equalTo("onFileDeleted: test.txt")));
+
+    }
+
+    @Test
+    public void testSimpleDirectoryOperations() throws IOException {
+        File tempDir = newTempDir(LifecycleScope.TEST);
+        RecordingChangeListener changes = new RecordingChangeListener(tempDir);
+        File testDir = new File(tempDir, "test-dir");
+        testDir.mkdir();
+        touch(new File(testDir, "test.txt"));
+        touch(new File(testDir, "test0.txt"));
+
+        FileWatcher fileWatcher = new FileWatcher(testDir);
+        fileWatcher.addListener(changes);
+        fileWatcher.init();
+        assertThat(changes.notifications(), contains(
+                equalTo("onDirectoryInit: test-dir/"),
+                equalTo("onFileInit: test-dir/test.txt"),
+                equalTo("onFileInit: test-dir/test0.txt")
+        ));
+
+        changes.notifications().clear();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), hasSize(0));
+
+        for (int i = 0; i < 4; i++) {
+            touch(new File(testDir, "test" + i + ".txt"));
+        }
+        // Make sure that first file is modified
+        append("Test", new File(testDir, "test0.txt"), Charset.defaultCharset());
+
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(
+                equalTo("onFileChanged: test-dir/test0.txt"),
+                equalTo("onFileCreated: test-dir/test1.txt"),
+                equalTo("onFileCreated: test-dir/test2.txt"),
+                equalTo("onFileCreated: test-dir/test3.txt")
+        ));
+
+        changes.notifications().clear();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), hasSize(0));
+
+        new File(testDir, "test1.txt").delete();
+        new File(testDir, "test2.txt").delete();
+
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(
+                equalTo("onFileDeleted: test-dir/test1.txt"),
+                equalTo("onFileDeleted: test-dir/test2.txt")
+        ));
+
+        changes.notifications().clear();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), hasSize(0));
+
+        new File(testDir, "test0.txt").delete();
+        touch(new File(testDir, "test2.txt"));
+        touch(new File(testDir, "test4.txt"));
+        fileWatcher.checkAndNotify();
+
+        assertThat(changes.notifications(), contains(
+                equalTo("onFileDeleted: test-dir/test0.txt"),
+                equalTo("onFileCreated: test-dir/test2.txt"),
+                equalTo("onFileCreated: test-dir/test4.txt")
+        ));
+
+
+        changes.notifications().clear();
+
+        new File(testDir, "test3.txt").delete();
+        new File(testDir, "test4.txt").delete();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(
+                equalTo("onFileDeleted: test-dir/test3.txt"),
+                equalTo("onFileDeleted: test-dir/test4.txt")
+        ));
+
+
+        changes.notifications().clear();
+        deleteRecursively(testDir);
+        fileWatcher.checkAndNotify();
+
+        assertThat(changes.notifications(), contains(
+                equalTo("onFileDeleted: test-dir/test.txt"),
+                equalTo("onFileDeleted: test-dir/test2.txt"),
+                equalTo("onDirectoryDeleted: test-dir")
+        ));
+
+    }
+
+    @Test
+    public void testNestedDirectoryOperations() throws IOException {
+        File tempDir = newTempDir(LifecycleScope.TEST);
+        RecordingChangeListener changes = new RecordingChangeListener(tempDir);
+        File testDir = new File(tempDir, "test-dir");
+        testDir.mkdir();
+        touch(new File(testDir, "test.txt"));
+        new File(testDir, "sub-dir").mkdir();
+        touch(new File(testDir, "sub-dir/test0.txt"));
+
+        FileWatcher fileWatcher = new FileWatcher(testDir);
+        fileWatcher.addListener(changes);
+        fileWatcher.init();
+        assertThat(changes.notifications(), contains(
+                equalTo("onDirectoryInit: test-dir/"),
+                equalTo("onDirectoryInit: test-dir/sub-dir/"),
+                equalTo("onFileInit: test-dir/sub-dir/test0.txt"),
+                equalTo("onFileInit: test-dir/test.txt")
+        ));
+
+        changes.notifications().clear();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), hasSize(0));
+
+        // Create new file in subdirectory
+        touch(new File(testDir, "sub-dir/test1.txt"));
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(
+                equalTo("onFileCreated: test-dir/sub-dir/test1.txt")
+        ));
+
+        changes.notifications().clear();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), hasSize(0));
+
+        // Create new subdirectory in subdirectory
+        new File(testDir, "first-level").mkdir();
+        touch(new File(testDir, "first-level/file1.txt"));
+        new File(testDir, "first-level/second-level").mkdir();
+        touch(new File(testDir, "first-level/second-level/file2.txt"));
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(
+                equalTo("onDirectoryCreated: test-dir/first-level/"),
+                equalTo("onFileCreated: test-dir/first-level/file1.txt"),
+                equalTo("onDirectoryCreated: test-dir/first-level/second-level/"),
+                equalTo("onFileCreated: test-dir/first-level/second-level/file2.txt")
+        ));
+
+        changes.notifications().clear();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), hasSize(0));
+
+        // Delete a directory, check notifications for
+        deleteRecursively(new File(testDir, "first-level"));
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(
+                equalTo("onFileDeleted: test-dir/first-level/file1.txt"),
+                equalTo("onFileDeleted: test-dir/first-level/second-level/file2.txt"),
+                equalTo("onDirectoryDeleted: test-dir/first-level/second-level"),
+                equalTo("onDirectoryDeleted: test-dir/first-level")
+        ));
+    }
+
+    @Test
+    public void testFileReplacingDirectory() throws IOException {
+        File tempDir = newTempDir(LifecycleScope.TEST);
+        RecordingChangeListener changes = new RecordingChangeListener(tempDir);
+        File testDir = new File(tempDir, "test-dir");
+        testDir.mkdir();
+        File subDir = new File(testDir, "sub-dir");
+        subDir.mkdir();
+        touch(new File(subDir, "test0.txt"));
+        touch(new File(subDir, "test1.txt"));
+
+        FileWatcher fileWatcher = new FileWatcher(testDir);
+        fileWatcher.addListener(changes);
+        fileWatcher.init();
+        assertThat(changes.notifications(), contains(
+                equalTo("onDirectoryInit: test-dir/"),
+                equalTo("onDirectoryInit: test-dir/sub-dir/"),
+                equalTo("onFileInit: test-dir/sub-dir/test0.txt"),
+                equalTo("onFileInit: test-dir/sub-dir/test1.txt")
+        ));
+
+        changes.notifications().clear();
+
+        deleteRecursively(subDir);
+        touch(subDir);
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(
+                equalTo("onFileDeleted: test-dir/sub-dir/test0.txt"),
+                equalTo("onFileDeleted: test-dir/sub-dir/test1.txt"),
+                equalTo("onDirectoryDeleted: test-dir/sub-dir"),
+                equalTo("onFileCreated: test-dir/sub-dir")
+        ));
+
+        changes.notifications().clear();
+
+        subDir.delete();
+        subDir.mkdir();
+
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(
+                equalTo("onFileDeleted: test-dir/sub-dir/"),
+                equalTo("onDirectoryCreated: test-dir/sub-dir/")
+        ));
+    }
+
+    @Test
+    public void testEmptyDirectory() throws IOException {
+        File tempDir = newTempDir(LifecycleScope.TEST);
+        RecordingChangeListener changes = new RecordingChangeListener(tempDir);
+        File testDir = new File(tempDir, "test-dir");
+        testDir.mkdir();
+        touch(new File(testDir, "test0.txt"));
+        touch(new File(testDir, "test1.txt"));
+
+        FileWatcher fileWatcher = new FileWatcher(testDir);
+        fileWatcher.addListener(changes);
+        fileWatcher.init();
+        changes.notifications().clear();
+
+        new File(testDir, "test0.txt").delete();
+        new File(testDir, "test1.txt").delete();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(
+                equalTo("onFileDeleted: test-dir/test0.txt"),
+                equalTo("onFileDeleted: test-dir/test1.txt")
+        ));
+    }
+
+    @Test
+    public void testNoDirectoryOnInit() throws IOException {
+        File tempDir = newTempDir(LifecycleScope.TEST);
+        RecordingChangeListener changes = new RecordingChangeListener(tempDir);
+        File testDir = new File(tempDir, "test-dir");
+
+        FileWatcher fileWatcher = new FileWatcher(testDir);
+        fileWatcher.addListener(changes);
+        fileWatcher.init();
+        assertThat(changes.notifications(), hasSize(0));
+        changes.notifications().clear();
+
+        testDir.mkdir();
+        touch(new File(testDir, "test0.txt"));
+        touch(new File(testDir, "test1.txt"));
+
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(
+                equalTo("onDirectoryCreated: test-dir/"),
+                equalTo("onFileCreated: test-dir/test0.txt"),
+                equalTo("onFileCreated: test-dir/test1.txt")
+        ));
+    }
+
+    @Test
+    public void testNoFileOnInit() throws IOException {
+        File tempDir = newTempDir(LifecycleScope.TEST);
+        RecordingChangeListener changes = new RecordingChangeListener(tempDir);
+        File testFile = new File(tempDir, "testfile.txt");
+
+        FileWatcher fileWatcher = new FileWatcher(testFile);
+        fileWatcher.addListener(changes);
+        fileWatcher.init();
+        assertThat(changes.notifications(), hasSize(0));
+        changes.notifications().clear();
+
+        touch(testFile);
+
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(
+                equalTo("onFileCreated: testfile.txt")
+        ));
+    }
+
+}