# Source code for reip.blocks.os_watch

'''File System Events

Watchdog API Reference: https://pythonhosted.org/watchdog/api.html

.. note::

    From experience, I've found file watchers can be a bit unreliable (potentially
    due to my own misuse!).

    Additionally, in many cases (for example, file upload) watching file
    events may not actually be what you want, and you may be better off
    polling file lists using glob, as that will reduce the number of missed files
    and is resilient to reboots.

'''
import queue
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import reip
from reip.util import text

class _WatchBlockHandler(PatternMatchingEventHandler):
    '''Event handler that forwards watchdog events into a queue.

    Arguments:
        q: the queue that receives events (anything with a ``put`` method).
        event_types: container of event type names to keep. If falsy,
            every event is forwarded.
        *a, **kw: passed on to ``PatternMatchingEventHandler``.
    '''
    def __init__(self, q, event_types, *a, **kw):
        self.q = q
        self.event_types = event_types
        super().__init__(*a, **kw)

    def on_any_event(self, event):
        '''Queue ``event`` if its type is wanted.

        Any exception raised while filtering or queueing is itself put on
        the queue instead of propagating out of the handler.
        '''
        try:
            wanted = self.event_types
            # a non-empty filter that does not list this type: drop the event
            if wanted and event.event_type not in wanted:
                return
            self.q.put(event)
        except Exception as err:
            self.q.put(err)

class Watch(reip.Block):
    '''A generic file watcher.

    Schedules an event handler on ``path`` using a shared watchdog observer
    and outputs one buffer per queued file system event.

    Arguments:
        *patterns (str): patterns handed to the event handler as
            ``patterns``. Defaults to ``'*'`` when none are given.
        path (str): the path the watch is scheduled on.
        event_types (sequence of str, optional): event type names to keep
            (compared against ``event.event_type``). When falsy, the class
            attribute ``event_types`` is used; if that is falsy as well,
            every event is output.
        recursive (bool): passed to the observer's ``schedule`` call.
        **kw: forwarded to ``reip.Block`` (with ``n_inputs=0``).
    '''
    # per-instance state, populated by init()
    _q = _event_handler = _watch = None
    # default event filter - subclasses override this
    event_types = None

    def __init__(self, *patterns, path='./', event_types=None, recursive=False, **kw):
        self.patterns = list(patterns or ('*',))
        self.path = path
        self.event_types = event_types or self.event_types
        self.recursive = recursive
        super().__init__(n_inputs=0, **kw)

    # the handler class instantiated in init()
    _Handler = _WatchBlockHandler

    # A single observer is stored on ``Watch`` itself, so it is shared by
    # every instance (including instances of subclasses).
    _observer = None

    @property
    def observer(self):
        '''The shared observer. Created and started on first access.'''
        if Watch._observer is None:
            Watch._observer = Observer()
            Watch._observer.start()
        return Watch._observer

    @observer.setter
    def observer(self, value):
        Watch._observer = value

    def init(self):
        '''Create the event queue and schedule the handler on the observer.'''
        # a queue is used to gather all events emitted
        self._q = queue.Queue()
        # create watchdog stuff
        self._event_handler = self._Handler(
            self._q, self.event_types, patterns=self.patterns)
        self._watch = self.observer.schedule(
            self._event_handler, self.path, recursive=self.recursive)

    def process(self, meta):
        '''Output the next queued event, if there is one.

        Returns:
            The result of ``_output_event`` for the next queued event, or
            None when the queue is empty.

        Raises:
            Exception: any exception that the handler put on the queue.
        '''
        try:
            item = self._q.get_nowait()
        except queue.Empty:
            return None
        if isinstance(item, Exception):
            raise item
        return self._output_event(item, meta)

    def _output_event(self, event, meta):
        '''Convert an event to ``([src_path], {'event_type': ...})``.'''
        return [event.src_path], {'event_type': event.event_type}

    def finish(self):
        '''Remove this block's handler and stop the observer once unused.

        Safe to call when ``init`` never ran, failed part-way, or when
        ``finish`` was already called: in those cases nothing is removed and
        no new observer is started.
        '''
        # Read the class attribute directly: going through the ``observer``
        # property here would create and start a fresh observer during
        # teardown if none exists.
        observer = Watch._observer
        handler, watch = self._event_handler, self._watch
        self._event_handler = self._watch = None
        if observer is None:
            return

        # remove handler (only if scheduling actually succeeded)
        if watch is not None:
            observer.remove_handler_for_watch(handler, watch)

        # NOTE: this inspects the observer's private ``_handlers`` mapping
        # to find out whether any handler is still registered.
        if observer.is_alive() and not any(observer._handlers.values()):
            # if there are no more handlers, shutdown the observer
            observer.unschedule_all()
            observer.stop()
            observer.join()
            self.observer = None
class Created(Watch):
    '''File watcher restricted to ``created`` events.'''
    event_types = ('created',)
class Modified(Watch):
    '''File watcher restricted to ``modified`` events.'''
    event_types = ('modified',)
class Deleted(Watch):
    '''File watcher restricted to ``deleted`` events.'''
    event_types = ('deleted',)
class Moved(Watch):
    '''File watcher restricted to ``moved`` events.

    Outputs both the source and the destination path of each event.
    '''
    event_types = ('moved',)

    def _output_event(self, event, meta):
        '''Convert an event to ``([src_path, dest_path], {'event_type': ...})``.'''
        paths = [event.src_path, event.dest_path]
        info = {'event_type': event.event_type}
        return paths, info
# Watch.Created = Created
# Watch.Modified = Modified
# Watch.Deleted = Deleted
# Watch.Moved = Moved