This file is indexed.

/usr/lib/python3/dist-packages/doit/filewatch.py is in python3-doit 0.28.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""Watch for modifications of file-system
used by the cmd_auto module
"""

import os.path

def get_platform_system():
    """Return the value of ``platform.system()``.

    The ``platform`` module has many regexps, which makes importing it
    slow, so the import is deferred until this function is called.
    """
    from platform import system
    return system()


class FileModifyWatcher(object):
    """Watch the file-system for modifications of a given set of paths.

    Events are obtained through pyinotify on Linux and through fsevents
    on Darwin.

    Usage:
    1) subclass and override handle_event with the action to be performed
    2) create an object passing a list of paths to be watched
    3) call the loop method
    """
    supported_platforms = ('Darwin', 'Linux')

    def __init__(self, path_list):
        """@param path_list (list-str): paths to be watched

        A path that is a regular file is watched individually (through its
        parent directory). Any other path is handled as a directory where
        a change to any file directly inside it generates a notification.

        Raises Exception if the current platform is not listed in
        supported_platforms.
        """
        self.file_list = set()    # absolute paths of individual files
        self.watch_dirs = set()   # every directory that gets a watch
        self.notify_dirs = set()  # dirs that notify whatever the file is
        for entry in path_list:
            full_path = os.path.abspath(entry)
            if not os.path.isfile(full_path):
                # anything that is not a regular file is watched as a dir
                self.notify_dirs.add(full_path)
                self.watch_dirs.add(full_path)
                continue
            self.file_list.add(full_path)
            self.watch_dirs.add(os.path.dirname(full_path))

        self.platform = get_platform_system()
        if self.platform in self.supported_platforms:
            return
        parts = (
            "Unsupported platform '%s'\n" % self.platform,
            "'auto' command is supported only on %s" %
            (self.supported_platforms,),
        )
        raise Exception(''.join(parts))

    def _handle(self, event):
        """Filter an event and forward it to handle_event if relevant.

        The path of the modified file is read from a platform specific
        attribute of the event. The event is forwarded when that path is
        one of the watched files or sits directly in a notify directory.
        """
        if self.platform == 'Darwin': # pragma: no cover
            changed = event.name
        elif self.platform == 'Linux':
            changed = event.pathname
        is_watched_file = changed in self.file_list
        if is_watched_file or os.path.dirname(changed) in self.notify_dirs:
            self.handle_event(event)

    def handle_event(self, event):
        """Action performed for a relevant event; must be overridden."""
        raise NotImplementedError


    def _loop_darwin(self): # pragma: no cover
        """Watch loop for the darwin platform (uses fsevents)"""
        import time
        from fsevents import Observer #pylint: disable=F0401
        from fsevents import Stream #pylint: disable=F0401
        from fsevents import IN_MODIFY #pylint: disable=F0401

        dispatch = self._handle

        def on_fs_event(event):
            # only events whose mask is exactly IN_MODIFY are of interest
            if event.mask == IN_MODIFY:
                dispatch(event)

        observer = Observer()
        for directory in self.watch_dirs:
            observer.schedule(Stream(on_fs_event, directory, file_events=True))

        observer.daemon = True
        observer.start()
        try:
            # hack: the observer runs in its own thread, so just keep the
            # main thread alive until it is interrupted
            while True:
                time.sleep(99999)
        except (SystemExit, KeyboardInterrupt):
            pass


    def _loop_linux(self, loop_callback):
        """Watch loop for the linux platform (uses pyinotify)

        @loop_callback: passed straight to the pyinotify notifier loop
        """
        import pyinotify
        dispatch = self._handle

        class ForwardingHandler(pyinotify.ProcessEvent):
            """hand every received event over to the watcher"""
            def process_default(self, event):
                dispatch(event)

        manager = pyinotify.WatchManager()
        notifier = pyinotify.Notifier(manager, ForwardingHandler())

        # events selected by the IN_CLOSE_WRITE and IN_MOVED_TO flags
        wanted_events = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO
        for directory in self.watch_dirs:
            manager.add_watch(directory, wanted_events)

        notifier.loop(loop_callback)


    def loop(self, loop_callback=None):
        """Infinite loop watching for file modifications

        @loop_callback: used to stop loop on unittests (only used by the
                        linux implementation)
        """
        system = self.platform
        if system == 'Linux':
            self._loop_linux(loop_callback)
        elif system == 'Darwin': # pragma: no cover
            self._loop_darwin()