/usr/share/pyshared/doit/filewatch.py is in python-doit 0.24.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 | """Watch for modifications of file-system
use by cmd_auto module
"""
import os.path
def get_platform_system():
    """return platform.system
    platform module has many regexp, so importing it is slow...
    import only if required
    """
    import platform
    return platform.system()
class FileModifyWatcher(object):
    """Use inotify to watch file-system for file modifications
    Usage:
    1) subclass the method handle_event, action to be performed
    2) create an object passing a list of files to be watched
    3) call the loop method
    """
    supported_platforms = ('Darwin', 'Linux')
    def __init__(self, path_list):
        """@param file_list (list-str): files to be watched"""
        self.file_list = set()
        self.watch_dirs = set() # all dirs to be watched
        self.notify_dirs = set() # dirs that generate notification whatever file
        for filename in path_list:
            path = os.path.abspath(filename)
            if os.path.isfile(path):
                self.file_list.add(path)
                self.watch_dirs.add(os.path.dirname(path))
            else:
                self.notify_dirs.add(path)
                self.watch_dirs.add(path)
        self.platform = get_platform_system()
        if self.platform not in self.supported_platforms:
            msg = "Unsupported platform '%s'\n" % self.platform
            msg += ("'auto' command is supported only on %s" %
                    (self.supported_platforms,))
            raise Exception(msg)
    def _handle(self, event):
        """calls platform specific handler"""
        if self.platform == 'Darwin': # pragma: no cover
            filename = event.name
        elif self.platform == 'Linux':
            filename = event.pathname
        if (filename in self.file_list or
            os.path.dirname(filename) in self.notify_dirs):
            self.handle_event(event)
    def handle_event(self, event):
        """this should be sub-classed """
        raise NotImplementedError
    def _loop_darwin(self): # pragma: no cover
        """loop implementation for darwin platform"""
        from fsevents import Observer #pylint: disable=F0401
        from fsevents import Stream #pylint: disable=F0401
        from fsevents import IN_MODIFY #pylint: disable=F0401
        observer = Observer()
        handler = self._handle
        def fsevent_callback(event):
            if event.mask == IN_MODIFY:
                handler(event)
        for watch_this in self.watch_dirs:
            stream = Stream(fsevent_callback, watch_this, file_events=True)
            observer.schedule(stream)
        observer.daemon = True
        observer.start()
        try:
            # hack to keep main thread running...
            import time
            while True:
                time.sleep(99999)
        except (SystemExit, KeyboardInterrupt):
            pass
    def _loop_linux(self, loop_callback):
        """loop implementation for linux platform"""
        import pyinotify
        handler = self._handle
        class EventHandler(pyinotify.ProcessEvent):
            def process_default(self, event):
                handler(event)
        watch_manager = pyinotify.WatchManager()
        event_handler = EventHandler()
        notifier = pyinotify.Notifier(watch_manager, event_handler)
        mask = pyinotify.IN_CLOSE_WRITE
        for watch_this in self.watch_dirs:
            watch_manager.add_watch(watch_this, mask)
        notifier.loop(loop_callback)
    def loop(self, loop_callback=None):
        """Infinite loop watching for file modifications
        @loop_callback: used to stop loop on unittests
        """
        if self.platform == 'Darwin': # pragma: no cover
            self._loop_darwin()
        elif self.platform == 'Linux':
            self._loop_linux(loop_callback)
 |