added a flask venv

This commit is contained in:
d3m1g0d
2019-01-19 13:23:37 +01:00
parent cbd3a55746
commit a1a8bca34b
961 changed files with 129021 additions and 0 deletions
@@ -0,0 +1,94 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
# Copyright 2012 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
:module: watchdog.observers
:synopsis: Observer that picks a native implementation if available.
:author: yesudeep@google.com (Yesudeep Mangalapilly)
Classes
=======
.. autoclass:: Observer
:members:
:show-inheritance:
:inherited-members:
Observer thread that schedules watching directories and dispatches
calls to event handlers.
You can also import platform specific classes directly and use it instead
of :class:`Observer`. Here is a list of implemented observer classes.:
============== ================================ ==============================
Class Platforms Note
============== ================================ ==============================
|Inotify| Linux 2.6.13+ ``inotify(7)`` based observer
|FSEvents| Mac OS X FSEvents based observer
|Kqueue| Mac OS X and BSD with kqueue(2) ``kqueue(2)`` based observer
|WinApi| MS Windows Windows API-based observer
|Polling| Any fallback implementation
============== ================================ ==============================
.. |Inotify| replace:: :class:`.inotify.InotifyObserver`
.. |FSEvents| replace:: :class:`.fsevents.FSEventsObserver`
.. |Kqueue| replace:: :class:`.kqueue.KqueueObserver`
.. |WinApi| replace:: :class:`.read_directory_changes.WindowsApiObserver`
.. |WinApiAsync| replace:: :class:`.read_directory_changes_async.WindowsApiAsyncObserver`
.. |Polling| replace:: :class:`.polling.PollingObserver`
"""
import warnings
from watchdog.utils import platform
from watchdog.utils import UnsupportedLibc
if platform.is_linux():
try:
from .inotify import InotifyObserver as Observer
except UnsupportedLibc:
from .polling import PollingObserver as Observer
elif platform.is_darwin():
# FIXME: catching too broad. Error prone
try:
from .fsevents import FSEventsObserver as Observer
except:
try:
from .kqueue import KqueueObserver as Observer
warnings.warn("Failed to import fsevents. Fall back to kqueue")
except:
from .polling import PollingObserver as Observer
warnings.warn("Failed to import fsevents and kqueue. Fall back to polling.")
elif platform.is_bsd():
from .kqueue import KqueueObserver as Observer
elif platform.is_windows():
# TODO: find a reliable way of checking Windows version and import
# polling explicitly for Windows XP
try:
from .read_directory_changes import WindowsApiObserver as Observer
except:
from .polling import PollingObserver as Observer
warnings.warn("Failed to import read_directory_changes. Fall back to polling.")
else:
from .polling import PollingObserver as Observer
__all__ = ["Observer"]
@@ -0,0 +1,369 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
# Copyright 2012 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import threading
from watchdog.utils import BaseThread
from watchdog.utils.compat import queue
from watchdog.utils.bricks import SkipRepeatsQueue
DEFAULT_EMITTER_TIMEOUT = 1 # in seconds.
DEFAULT_OBSERVER_TIMEOUT = 1 # in seconds.
# Collection classes
class EventQueue(SkipRepeatsQueue):
"""Thread-safe event queue based on a special queue that skips adding
the same event (:class:`FileSystemEvent`) multiple times consecutively.
Thus avoiding dispatching multiple event handling
calls when multiple identical events are produced quicker than an observer
can consume them.
"""
class ObservedWatch(object):
"""An scheduled watch.
:param path:
Path string.
:param recursive:
``True`` if watch is recursive; ``False`` otherwise.
"""
def __init__(self, path, recursive):
self._path = path
self._is_recursive = recursive
@property
def path(self):
"""The path that this watch monitors."""
return self._path
@property
def is_recursive(self):
"""Determines whether subdirectories are watched for the path."""
return self._is_recursive
@property
def key(self):
return self.path, self.is_recursive
def __eq__(self, watch):
return self.key == watch.key
def __ne__(self, watch):
return self.key != watch.key
def __hash__(self):
return hash(self.key)
def __repr__(self):
return "<ObservedWatch: path=%s, is_recursive=%s>" % (
self.path, self.is_recursive)
# Observer classes
class EventEmitter(BaseThread):
"""
Producer thread base class subclassed by event emitters
that generate events and populate a queue with them.
:param event_queue:
The event queue to populate with generated events.
:type event_queue:
:class:`watchdog.events.EventQueue`
:param watch:
The watch to observe and produce events for.
:type watch:
:class:`ObservedWatch`
:param timeout:
Timeout (in seconds) between successive attempts at reading events.
:type timeout:
``float``
"""
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
BaseThread.__init__(self)
self._event_queue = event_queue
self._watch = watch
self._timeout = timeout
@property
def timeout(self):
"""
Blocking timeout for reading events.
"""
return self._timeout
@property
def watch(self):
"""
The watch associated with this emitter.
"""
return self._watch
def queue_event(self, event):
"""
Queues a single event.
:param event:
Event to be queued.
:type event:
An instance of :class:`watchdog.events.FileSystemEvent`
or a subclass.
"""
self._event_queue.put((event, self.watch))
def queue_events(self, timeout):
"""Override this method to populate the event queue with events
per interval period.
:param timeout:
Timeout (in seconds) between successive attempts at
reading events.
:type timeout:
``float``
"""
def run(self):
try:
while self.should_keep_running():
self.queue_events(self.timeout)
finally:
pass
class EventDispatcher(BaseThread):
"""
Consumer thread base class subclassed by event observer threads
that dispatch events from an event queue to appropriate event handlers.
:param timeout:
Event queue blocking timeout (in seconds).
:type timeout:
``float``
"""
def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
BaseThread.__init__(self)
self._event_queue = EventQueue()
self._timeout = timeout
@property
def timeout(self):
"""Event queue block timeout."""
return self._timeout
@property
def event_queue(self):
"""The event queue which is populated with file system events
by emitters and from which events are dispatched by a dispatcher
thread."""
return self._event_queue
def dispatch_events(self, event_queue, timeout):
"""Override this method to consume events from an event queue, blocking
on the queue for the specified timeout before raising :class:`queue.Empty`.
:param event_queue:
Event queue to populate with one set of events.
:type event_queue:
:class:`EventQueue`
:param timeout:
Interval period (in seconds) to wait before timing out on the
event queue.
:type timeout:
``float``
:raises:
:class:`queue.Empty`
"""
def run(self):
while self.should_keep_running():
try:
self.dispatch_events(self.event_queue, self.timeout)
except queue.Empty:
continue
class BaseObserver(EventDispatcher):
"""Base observer."""
def __init__(self, emitter_class, timeout=DEFAULT_OBSERVER_TIMEOUT):
EventDispatcher.__init__(self, timeout)
self._emitter_class = emitter_class
self._lock = threading.RLock()
self._watches = set()
self._handlers = dict()
self._emitters = set()
self._emitter_for_watch = dict()
def _add_emitter(self, emitter):
self._emitter_for_watch[emitter.watch] = emitter
self._emitters.add(emitter)
def _remove_emitter(self, emitter):
del self._emitter_for_watch[emitter.watch]
self._emitters.remove(emitter)
emitter.stop()
try:
emitter.join()
except RuntimeError:
pass
def _clear_emitters(self):
for emitter in self._emitters:
emitter.stop()
for emitter in self._emitters:
try:
emitter.join()
except RuntimeError:
pass
self._emitters.clear()
self._emitter_for_watch.clear()
def _add_handler_for_watch(self, event_handler, watch):
if watch not in self._handlers:
self._handlers[watch] = set()
self._handlers[watch].add(event_handler)
def _remove_handlers_for_watch(self, watch):
del self._handlers[watch]
@property
def emitters(self):
"""Returns event emitter created by this observer."""
return self._emitters
def start(self):
for emitter in self._emitters:
emitter.start()
super(BaseObserver, self).start()
def schedule(self, event_handler, path, recursive=False):
"""
Schedules watching a path and calls appropriate methods specified
in the given event handler in response to file system events.
:param event_handler:
An event handler instance that has appropriate event handling
methods which will be called by the observer in response to
file system events.
:type event_handler:
:class:`watchdog.events.FileSystemEventHandler` or a subclass
:param path:
Directory path that will be monitored.
:type path:
``str``
:param recursive:
``True`` if events will be emitted for sub-directories
traversed recursively; ``False`` otherwise.
:type recursive:
``bool``
:return:
An :class:`ObservedWatch` object instance representing
a watch.
"""
with self._lock:
watch = ObservedWatch(path, recursive)
self._add_handler_for_watch(event_handler, watch)
# If we don't have an emitter for this watch already, create it.
if self._emitter_for_watch.get(watch) is None:
emitter = self._emitter_class(event_queue=self.event_queue,
watch=watch,
timeout=self.timeout)
self._add_emitter(emitter)
if self.is_alive():
emitter.start()
self._watches.add(watch)
return watch
def add_handler_for_watch(self, event_handler, watch):
"""Adds a handler for the given watch.
:param event_handler:
An event handler instance that has appropriate event handling
methods which will be called by the observer in response to
file system events.
:type event_handler:
:class:`watchdog.events.FileSystemEventHandler` or a subclass
:param watch:
The watch to add a handler for.
:type watch:
An instance of :class:`ObservedWatch` or a subclass of
:class:`ObservedWatch`
"""
with self._lock:
self._add_handler_for_watch(event_handler, watch)
def remove_handler_for_watch(self, event_handler, watch):
"""Removes a handler for the given watch.
:param event_handler:
An event handler instance that has appropriate event handling
methods which will be called by the observer in response to
file system events.
:type event_handler:
:class:`watchdog.events.FileSystemEventHandler` or a subclass
:param watch:
The watch to remove a handler for.
:type watch:
An instance of :class:`ObservedWatch` or a subclass of
:class:`ObservedWatch`
"""
with self._lock:
self._handlers[watch].remove(event_handler)
def unschedule(self, watch):
"""Unschedules a watch.
:param watch:
The watch to unschedule.
:type watch:
An instance of :class:`ObservedWatch` or a subclass of
:class:`ObservedWatch`
"""
with self._lock:
emitter = self._emitter_for_watch[watch]
del self._handlers[watch]
self._remove_emitter(emitter)
self._watches.remove(watch)
def unschedule_all(self):
"""Unschedules all watches and detaches all associated event
handlers."""
with self._lock:
self._handlers.clear()
self._clear_emitters()
self._watches.clear()
def on_thread_stop(self):
self.unschedule_all()
def dispatch_events(self, event_queue, timeout):
event, watch = event_queue.get(block=True, timeout=timeout)
with self._lock:
# To allow unschedule/stop and safe removal of event handlers
# within event handlers itself, check if the handler is still
# registered after every dispatch.
for handler in list(self._handlers.get(watch, [])):
if handler in self._handlers.get(watch, []):
handler.dispatch(event)
event_queue.task_done()
@@ -0,0 +1,172 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
# Copyright 2012 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
:module: watchdog.observers.fsevents
:synopsis: FSEvents based emitter implementation.
:author: yesudeep@google.com (Yesudeep Mangalapilly)
:platforms: Mac OS X
"""
from __future__ import with_statement
import sys
import threading
import unicodedata
import _watchdog_fsevents as _fsevents
from watchdog.events import (
FileDeletedEvent,
FileModifiedEvent,
FileCreatedEvent,
FileMovedEvent,
DirDeletedEvent,
DirModifiedEvent,
DirCreatedEvent,
DirMovedEvent
)
from watchdog.utils.dirsnapshot import DirectorySnapshot
from watchdog.observers.api import (
BaseObserver,
EventEmitter,
DEFAULT_EMITTER_TIMEOUT,
DEFAULT_OBSERVER_TIMEOUT
)
class FSEventsEmitter(EventEmitter):
"""
Mac OS X FSEvents Emitter class.
:param event_queue:
The event queue to fill with events.
:param watch:
A watch object representing the directory to monitor.
:type watch:
:class:`watchdog.observers.api.ObservedWatch`
:param timeout:
Read events blocking timeout (in seconds).
:type timeout:
``float``
"""
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
EventEmitter.__init__(self, event_queue, watch, timeout)
self._lock = threading.Lock()
self.snapshot = DirectorySnapshot(watch.path, watch.is_recursive)
def on_thread_stop(self):
_fsevents.remove_watch(self.watch)
_fsevents.stop(self)
def queue_events(self, timeout):
with self._lock:
if not self.watch.is_recursive\
and self.watch.path not in self.pathnames:
return
new_snapshot = DirectorySnapshot(self.watch.path,
self.watch.is_recursive)
events = new_snapshot - self.snapshot
self.snapshot = new_snapshot
# Files.
for src_path in events.files_deleted:
self.queue_event(FileDeletedEvent(src_path))
for src_path in events.files_modified:
self.queue_event(FileModifiedEvent(src_path))
for src_path in events.files_created:
self.queue_event(FileCreatedEvent(src_path))
for src_path, dest_path in events.files_moved:
self.queue_event(FileMovedEvent(src_path, dest_path))
# Directories.
for src_path in events.dirs_deleted:
self.queue_event(DirDeletedEvent(src_path))
for src_path in events.dirs_modified:
self.queue_event(DirModifiedEvent(src_path))
for src_path in events.dirs_created:
self.queue_event(DirCreatedEvent(src_path))
for src_path, dest_path in events.dirs_moved:
self.queue_event(DirMovedEvent(src_path, dest_path))
def run(self):
try:
def callback(pathnames, flags, emitter=self):
emitter.queue_events(emitter.timeout)
# for pathname, flag in zip(pathnames, flags):
# if emitter.watch.is_recursive: # and pathname != emitter.watch.path:
# new_sub_snapshot = DirectorySnapshot(pathname, True)
# old_sub_snapshot = self.snapshot.copy(pathname)
# diff = new_sub_snapshot - old_sub_snapshot
# self.snapshot += new_subsnapshot
# else:
# new_snapshot = DirectorySnapshot(emitter.watch.path, False)
# diff = new_snapshot - emitter.snapshot
# emitter.snapshot = new_snapshot
# INFO: FSEvents reports directory notifications recursively
# by default, so we do not need to add subdirectory paths.
#pathnames = set([self.watch.path])
# if self.watch.is_recursive:
# for root, directory_names, _ in os.walk(self.watch.path):
# for directory_name in directory_names:
# full_path = absolute_path(
# os.path.join(root, directory_name))
# pathnames.add(full_path)
self.pathnames = [self.watch.path]
_fsevents.add_watch(self,
self.watch,
callback,
self.pathnames)
_fsevents.read_events(self)
except:
pass
class FSEventsObserver(BaseObserver):
def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
BaseObserver.__init__(self, emitter_class=FSEventsEmitter,
timeout=timeout)
def schedule(self, event_handler, path, recursive=False):
# Python 2/3 compat
try:
str_class = unicode
except NameError:
str_class = str
# Fix for issue #26: Trace/BPT error when given a unicode path
# string. https://github.com/gorakhargosh/watchdog/issues#issue/26
if isinstance(path, str_class):
#path = unicode(path, 'utf-8')
path = unicodedata.normalize('NFC', path)
# We only encode the path in Python 2 for backwards compatibility.
# On Python 3 we want the path to stay as unicode if possible for
# the sake of path matching not having to be rewritten to use the
# bytes API instead of strings. The _watchdog_fsevent.so code for
# Python 3 can handle both str and bytes paths, which is why we
# do not HAVE to encode it with Python 3. The Python 2 code in
# _watchdog_fsevents.so was not changed for the sake of backwards
# compatibility.
if sys.version_info < (3,):
path = path.encode('utf-8')
return BaseObserver.schedule(self, event_handler, path, recursive)
@@ -0,0 +1,239 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 Thomas Amland <thomas.amland@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
:module: watchdog.observers.fsevents2
:synopsis: FSEvents based emitter implementation.
:platforms: Mac OS X
"""
import os
import logging
import unicodedata
from threading import Thread
from watchdog.utils.compat import queue
from watchdog.events import (
FileDeletedEvent,
FileModifiedEvent,
FileCreatedEvent,
FileMovedEvent,
DirDeletedEvent,
DirModifiedEvent,
DirCreatedEvent,
DirMovedEvent
)
from watchdog.observers.api import (
BaseObserver,
EventEmitter,
DEFAULT_EMITTER_TIMEOUT,
DEFAULT_OBSERVER_TIMEOUT,
)
# pyobjc
import AppKit
from FSEvents import (
FSEventStreamCreate,
CFRunLoopGetCurrent,
FSEventStreamScheduleWithRunLoop,
FSEventStreamStart,
CFRunLoopRun,
CFRunLoopStop,
FSEventStreamStop,
FSEventStreamInvalidate,
FSEventStreamRelease,
)
from FSEvents import (
kCFAllocatorDefault,
kCFRunLoopDefaultMode,
kFSEventStreamEventIdSinceNow,
kFSEventStreamCreateFlagNoDefer,
kFSEventStreamCreateFlagFileEvents,
kFSEventStreamEventFlagItemCreated,
kFSEventStreamEventFlagItemRemoved,
kFSEventStreamEventFlagItemInodeMetaMod,
kFSEventStreamEventFlagItemRenamed,
kFSEventStreamEventFlagItemModified,
kFSEventStreamEventFlagItemFinderInfoMod,
kFSEventStreamEventFlagItemChangeOwner,
kFSEventStreamEventFlagItemXattrMod,
kFSEventStreamEventFlagItemIsDir,
kFSEventStreamEventFlagItemIsSymlink,
)
logger = logging.getLogger(__name__)
class FSEventsQueue(Thread):
""" Low level FSEvents client. """
def __init__(self, path):
Thread.__init__(self)
self._queue = queue.Queue()
self._run_loop = None
if isinstance(path, bytes):
path = path.decode('utf-8')
self._path = unicodedata.normalize('NFC', path)
context = None
latency = 1.0
self._stream_ref = FSEventStreamCreate(
kCFAllocatorDefault, self._callback, context, [self._path],
kFSEventStreamEventIdSinceNow, latency,
kFSEventStreamCreateFlagNoDefer | kFSEventStreamCreateFlagFileEvents)
if self._stream_ref is None:
raise IOError("FSEvents. Could not create stream.")
def run(self):
pool = AppKit.NSAutoreleasePool.alloc().init()
self._run_loop = CFRunLoopGetCurrent()
FSEventStreamScheduleWithRunLoop(
self._stream_ref, self._run_loop, kCFRunLoopDefaultMode)
if not FSEventStreamStart(self._stream_ref):
FSEventStreamInvalidate(self._stream_ref)
FSEventStreamRelease(self._stream_ref)
raise IOError("FSEvents. Could not start stream.")
CFRunLoopRun()
FSEventStreamStop(self._stream_ref)
FSEventStreamInvalidate(self._stream_ref)
FSEventStreamRelease(self._stream_ref)
del pool
# Make sure waiting thread is notified
self._queue.put(None)
def stop(self):
if self._run_loop is not None:
CFRunLoopStop(self._run_loop)
def _callback(self, streamRef, clientCallBackInfo, numEvents, eventPaths, eventFlags, eventIDs):
events = [NativeEvent(path, flags, _id) for path, flags, _id in
zip(eventPaths, eventFlags, eventIDs)]
logger.debug("FSEvents callback. Got %d events:" % numEvents)
for e in events:
logger.debug(e)
self._queue.put(events)
def read_events(self):
"""
Returns a list or one or more events, or None if there are no more
events to be read.
"""
if not self.is_alive():
return None
return self._queue.get()
class NativeEvent(object):
def __init__(self, path, flags, event_id):
self.path = path
self.flags = flags
self.event_id = event_id
self.is_created = bool(flags & kFSEventStreamEventFlagItemCreated)
self.is_removed = bool(flags & kFSEventStreamEventFlagItemRemoved)
self.is_renamed = bool(flags & kFSEventStreamEventFlagItemRenamed)
self.is_modified = bool(flags & kFSEventStreamEventFlagItemModified)
self.is_change_owner = bool(flags & kFSEventStreamEventFlagItemChangeOwner)
self.is_inode_meta_mod = bool(flags & kFSEventStreamEventFlagItemInodeMetaMod)
self.is_finder_info_mod = bool(flags & kFSEventStreamEventFlagItemFinderInfoMod)
self.is_xattr_mod = bool(flags & kFSEventStreamEventFlagItemXattrMod)
self.is_symlink = bool(flags & kFSEventStreamEventFlagItemIsSymlink)
self.is_directory = bool(flags & kFSEventStreamEventFlagItemIsDir)
@property
def _event_type(self):
if self.is_created: return "Created"
if self.is_removed: return "Removed"
if self.is_renamed: return "Renamed"
if self.is_modified: return "Modified"
if self.is_inode_meta_mod: return "InodeMetaMod"
if self.is_xattr_mod: return "XattrMod"
return "Unknown"
def __repr__(self):
s ="<NativeEvent: path=%s, type=%s, is_dir=%s, flags=%s, id=%s>"
return s % (repr(self.path), self._event_type, self.is_directory, hex(self.flags), self.event_id)
class FSEventsEmitter(EventEmitter):
"""
FSEvents based event emitter. Handles conversion of native events.
"""
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
EventEmitter.__init__(self, event_queue, watch, timeout)
self._fsevents = FSEventsQueue(watch.path)
self._fsevents.start()
def on_thread_stop(self):
self._fsevents.stop()
def queue_events(self, timeout):
events = self._fsevents.read_events()
if events is None:
return
i = 0
while i < len(events):
event = events[i]
# For some reason the create and remove flags are sometimes also
# set for rename and modify type events, so let those take
# precedence.
if event.is_renamed:
# Internal moves appears to always be consecutive in the same
# buffer and have IDs differ by exactly one (while others
# don't) making it possible to pair up the two events coming
# from a singe move operation. (None of this is documented!)
# Otherwise, guess whether file was moved in or out.
#TODO: handle id wrapping
if (i+1 < len(events) and events[i+1].is_renamed and
events[i+1].event_id == event.event_id + 1):
cls = DirMovedEvent if event.is_directory else FileMovedEvent
self.queue_event(cls(event.path, events[i+1].path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
self.queue_event(DirModifiedEvent(os.path.dirname(events[i+1].path)))
i += 1
elif os.path.exists(event.path):
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
else:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
#TODO: generate events for tree
elif event.is_modified or event.is_inode_meta_mod or event.is_xattr_mod :
cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
self.queue_event(cls(event.path))
elif event.is_created:
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
elif event.is_removed:
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(event.path))
self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
i += 1
class FSEventsObserver2(BaseObserver):
def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
BaseObserver.__init__(self, emitter_class=FSEventsEmitter, timeout=timeout)
@@ -0,0 +1,218 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
# Copyright 2012 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
:module: watchdog.observers.inotify
:synopsis: ``inotify(7)`` based emitter implementation.
:author: Sebastien Martini <seb@dbzteam.org>
:author: Luke McCarthy <luke@iogopro.co.uk>
:author: yesudeep@google.com (Yesudeep Mangalapilly)
:author: Tim Cuthbertson <tim+github@gfxmonk.net>
:platforms: Linux 2.6.13+.
.. ADMONITION:: About system requirements
Recommended minimum kernel version: 2.6.25.
Quote from the inotify(7) man page:
"Inotify was merged into the 2.6.13 Linux kernel. The required library
interfaces were added to glibc in version 2.4. (IN_DONT_FOLLOW,
IN_MASK_ADD, and IN_ONLYDIR were only added in version 2.5.)"
Therefore, you must ensure the system is running at least these versions
appropriate libraries and the kernel.
.. ADMONITION:: About recursiveness, event order, and event coalescing
Quote from the inotify(7) man page:
If successive output inotify events produced on the inotify file
descriptor are identical (same wd, mask, cookie, and name) then they
are coalesced into a single event if the older event has not yet been
read (but see BUGS).
The events returned by reading from an inotify file descriptor form
an ordered queue. Thus, for example, it is guaranteed that when
renaming from one directory to another, events will be produced in
the correct order on the inotify file descriptor.
...
Inotify monitoring of directories is not recursive: to monitor
subdirectories under a directory, additional watches must be created.
This emitter implementation therefore automatically adds watches for
sub-directories if running in recursive mode.
Some extremely useful articles and documentation:
.. _inotify FAQ: http://inotify.aiken.cz/?section=inotify&page=faq&lang=en
.. _intro to inotify: http://www.linuxjournal.com/article/8478
"""
from __future__ import with_statement
import os
import threading
from .inotify_buffer import InotifyBuffer
from watchdog.observers.api import (
EventEmitter,
BaseObserver,
DEFAULT_EMITTER_TIMEOUT,
DEFAULT_OBSERVER_TIMEOUT
)
from watchdog.events import (
DirDeletedEvent,
DirModifiedEvent,
DirMovedEvent,
DirCreatedEvent,
FileDeletedEvent,
FileModifiedEvent,
FileMovedEvent,
FileCreatedEvent,
generate_sub_moved_events,
generate_sub_created_events,
)
from watchdog.utils import unicode_paths
class InotifyEmitter(EventEmitter):
"""
inotify(7)-based event emitter.
:param event_queue:
The event queue to fill with events.
:param watch:
A watch object representing the directory to monitor.
:type watch:
:class:`watchdog.observers.api.ObservedWatch`
:param timeout:
Read events blocking timeout (in seconds).
:type timeout:
``float``
"""
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
EventEmitter.__init__(self, event_queue, watch, timeout)
self._lock = threading.Lock()
self._inotify = None
def on_thread_start(self):
path = unicode_paths.encode(self.watch.path)
self._inotify = InotifyBuffer(path, self.watch.is_recursive)
def on_thread_stop(self):
if self._inotify:
self._inotify.close()
def queue_events(self, timeout, full_events=False):
#If "full_events" is true, then the method will report unmatched move events as seperate events
#This behavior is by default only called by a InotifyFullEmitter
with self._lock:
event = self._inotify.read_event()
if event is None:
return
if isinstance(event, tuple):
move_from, move_to = event
src_path = self._decode_path(move_from.src_path)
dest_path = self._decode_path(move_to.src_path)
cls = DirMovedEvent if move_from.is_directory else FileMovedEvent
self.queue_event(cls(src_path, dest_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
self.queue_event(DirModifiedEvent(os.path.dirname(dest_path)))
if move_from.is_directory and self.watch.is_recursive:
for sub_event in generate_sub_moved_events(src_path, dest_path):
self.queue_event(sub_event)
return
src_path = self._decode_path(event.src_path)
if event.is_moved_to:
if (full_events):
cls = DirMovedEvent if event.is_directory else FileMovedEvent
self.queue_event(cls(None, src_path))
else:
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
if event.is_directory and self.watch.is_recursive:
for sub_event in generate_sub_created_events(src_path):
self.queue_event(sub_event)
elif event.is_attrib:
cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
self.queue_event(cls(src_path))
elif event.is_modify:
cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
self.queue_event(cls(src_path))
elif event.is_delete or (event.is_moved_from and not full_events):
cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
elif event.is_moved_from and full_events:
cls = DirMovedEvent if event.is_directory else FileMovedEvent
self.queue_event(cls(src_path, None))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
elif event.is_create:
cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
self.queue_event(cls(src_path))
self.queue_event(DirModifiedEvent(os.path.dirname(src_path)))
def _decode_path(self, path):
""" Decode path only if unicode string was passed to this emitter. """
if isinstance(self.watch.path, bytes):
return path
return unicode_paths.decode(path)
class InotifyFullEmitter(InotifyEmitter):
"""
inotify(7)-based event emitter. By default this class produces move events even if they are not matched
Such move events will have a ``None`` value for the unmatched part.
:param event_queue:
The event queue to fill with events.
:param watch:
A watch object representing the directory to monitor.
:type watch:
:class:`watchdog.observers.api.ObservedWatch`
:param timeout:
Read events blocking timeout (in seconds).
:type timeout:
``float``
"""
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
InotifyEmitter.__init__(self, event_queue, watch, timeout)
def queue_events(self, timeout, events=True):
InotifyEmitter.queue_events(self, timeout, full_events=events)
class InotifyObserver(BaseObserver):
"""
Observer thread that schedules watching directories and dispatches
calls to event handlers.
"""
def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT, generate_full_events=False):
if (generate_full_events):
BaseObserver.__init__(self, emitter_class=InotifyFullEmitter, timeout=timeout)
else:
BaseObserver.__init__(self, emitter_class=InotifyEmitter,
timeout=timeout)
@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
#
# Copyright 2014 Thomas Amland <thomas.amland@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from watchdog.utils import BaseThread
from watchdog.utils.delayed_queue import DelayedQueue
from watchdog.observers.inotify_c import Inotify
logger = logging.getLogger(__name__)
class InotifyBuffer(BaseThread):
"""A wrapper for `Inotify` that holds events for `delay` seconds. During
this time, IN_MOVED_FROM and IN_MOVED_TO events are paired.
"""
delay = 0.5
def __init__(self, path, recursive=False):
BaseThread.__init__(self)
self._queue = DelayedQueue(self.delay)
self._inotify = Inotify(path, recursive)
self.start()
def read_event(self):
"""Returns a single event or a tuple of from/to events in case of a
paired move event. If this buffer has been closed, immediately return
None.
"""
return self._queue.get()
def on_thread_stop(self):
self._inotify.close()
self._queue.close()
def close(self):
self.stop()
self.join()
def run(self):
"""Read event from `inotify` and add them to `queue`. When reading a
IN_MOVE_TO event, remove the previous added matching IN_MOVE_FROM event
and add them back to the queue as a tuple.
"""
deleted_self = False
while self.should_keep_running() and not deleted_self:
inotify_events = self._inotify.read_events()
for inotify_event in inotify_events:
logger.debug("in-event %s", inotify_event)
if inotify_event.is_moved_to:
def matching_from_event(event):
return (not isinstance(event, tuple) and event.is_moved_from
and event.cookie == inotify_event.cookie)
from_event = self._queue.remove(matching_from_event)
if from_event is not None:
self._queue.put((from_event, inotify_event))
else:
logger.debug("could not find matching move_from event")
self._queue.put(inotify_event)
else:
self._queue.put(inotify_event)
if inotify_event.is_delete_self and \
inotify_event.src_path == self._inotify.path:
# Deleted the watched directory, stop watching for events
deleted_self = True
@@ -0,0 +1,575 @@
# -*- coding: utf-8 -*-
#
# Copyright 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
# Copyright 2012 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import os
import errno
import struct
import threading
import ctypes
import ctypes.util
from functools import reduce
from ctypes import c_int, c_char_p, c_uint32
from watchdog.utils import has_attribute
from watchdog.utils import UnsupportedLibc
def _load_libc():
libc_path = None
try:
libc_path = ctypes.util.find_library('c')
except (OSError, IOError, RuntimeError):
# Note: find_library will on some platforms raise these undocumented
# errors, e.g.on android IOError "No usable temporary directory found"
# will be raised.
pass
if libc_path is not None:
return ctypes.CDLL(libc_path)
# Fallbacks
try:
return ctypes.CDLL('libc.so')
except (OSError, IOError):
pass
try:
return ctypes.CDLL('libc.so.6')
except (OSError, IOError):
pass
# uClibc
try:
return ctypes.CDLL('libc.so.0')
except (OSError, IOError) as err:
raise err
libc = _load_libc()
if not has_attribute(libc, 'inotify_init') or \
not has_attribute(libc, 'inotify_add_watch') or \
not has_attribute(libc, 'inotify_rm_watch'):
raise UnsupportedLibc("Unsupported libc version found: %s" % libc._name)
inotify_add_watch = ctypes.CFUNCTYPE(c_int, c_int, c_char_p, c_uint32, use_errno=True)(
("inotify_add_watch", libc))
inotify_rm_watch = ctypes.CFUNCTYPE(c_int, c_int, c_uint32, use_errno=True)(
("inotify_rm_watch", libc))
inotify_init = ctypes.CFUNCTYPE(c_int, use_errno=True)(
("inotify_init", libc))
class InotifyConstants(object):
# User-space events
IN_ACCESS = 0x00000001 # File was accessed.
IN_MODIFY = 0x00000002 # File was modified.
IN_ATTRIB = 0x00000004 # Meta-data changed.
IN_CLOSE_WRITE = 0x00000008 # Writable file was closed.
IN_CLOSE_NOWRITE = 0x00000010 # Unwritable file closed.
IN_OPEN = 0x00000020 # File was opened.
IN_MOVED_FROM = 0x00000040 # File was moved from X.
IN_MOVED_TO = 0x00000080 # File was moved to Y.
IN_CREATE = 0x00000100 # Subfile was created.
IN_DELETE = 0x00000200 # Subfile was deleted.
IN_DELETE_SELF = 0x00000400 # Self was deleted.
IN_MOVE_SELF = 0x00000800 # Self was moved.
# Helper user-space events.
IN_CLOSE = IN_CLOSE_WRITE | IN_CLOSE_NOWRITE # Close.
IN_MOVE = IN_MOVED_FROM | IN_MOVED_TO # Moves.
# Events sent by the kernel to a watch.
IN_UNMOUNT = 0x00002000 # Backing file system was unmounted.
IN_Q_OVERFLOW = 0x00004000 # Event queued overflowed.
IN_IGNORED = 0x00008000 # File was ignored.
# Special flags.
IN_ONLYDIR = 0x01000000 # Only watch the path if it's a directory.
IN_DONT_FOLLOW = 0x02000000 # Do not follow a symbolic link.
IN_EXCL_UNLINK = 0x04000000 # Exclude events on unlinked objects
IN_MASK_ADD = 0x20000000 # Add to the mask of an existing watch.
IN_ISDIR = 0x40000000 # Event occurred against directory.
IN_ONESHOT = 0x80000000 # Only send event once.
# All user-space events.
IN_ALL_EVENTS = reduce(
lambda x, y: x | y, [
IN_ACCESS,
IN_MODIFY,
IN_ATTRIB,
IN_CLOSE_WRITE,
IN_CLOSE_NOWRITE,
IN_OPEN,
IN_MOVED_FROM,
IN_MOVED_TO,
IN_DELETE,
IN_CREATE,
IN_DELETE_SELF,
IN_MOVE_SELF,
])
# Flags for ``inotify_init1``
IN_CLOEXEC = 0x02000000
IN_NONBLOCK = 0x00004000
# Watchdog's API cares only about these events.
WATCHDOG_ALL_EVENTS = reduce(
lambda x, y: x | y, [
InotifyConstants.IN_MODIFY,
InotifyConstants.IN_ATTRIB,
InotifyConstants.IN_MOVED_FROM,
InotifyConstants.IN_MOVED_TO,
InotifyConstants.IN_CREATE,
InotifyConstants.IN_DELETE,
InotifyConstants.IN_DELETE_SELF,
InotifyConstants.IN_DONT_FOLLOW,
])
class inotify_event_struct(ctypes.Structure):
"""
Structure representation of the inotify_event structure
(used in buffer size calculations)::
struct inotify_event {
__s32 wd; /* watch descriptor */
__u32 mask; /* watch mask */
__u32 cookie; /* cookie to synchronize two events */
__u32 len; /* length (including nulls) of name */
char name[0]; /* stub for possible name */
};
"""
_fields_ = [('wd', c_int),
('mask', c_uint32),
('cookie', c_uint32),
('len', c_uint32),
('name', c_char_p)]
EVENT_SIZE = ctypes.sizeof(inotify_event_struct)
DEFAULT_NUM_EVENTS = 2048
DEFAULT_EVENT_BUFFER_SIZE = DEFAULT_NUM_EVENTS * (EVENT_SIZE + 16)
class Inotify(object):
"""
Linux inotify(7) API wrapper class.
:param path:
The directory path for which we want an inotify object.
:type path:
:class:`bytes`
:param recursive:
``True`` if subdirectories should be monitored; ``False`` otherwise.
"""
def __init__(self, path, recursive=False, event_mask=WATCHDOG_ALL_EVENTS):
# The file descriptor associated with the inotify instance.
inotify_fd = inotify_init()
if inotify_fd == -1:
Inotify._raise_error()
self._inotify_fd = inotify_fd
self._lock = threading.Lock()
# Stores the watch descriptor for a given path.
self._wd_for_path = dict()
self._path_for_wd = dict()
self._path = path
self._event_mask = event_mask
self._is_recursive = recursive
self._add_dir_watch(path, recursive, event_mask)
self._moved_from_events = dict()
@property
def event_mask(self):
"""The event mask for this inotify instance."""
return self._event_mask
@property
def path(self):
"""The path associated with the inotify instance."""
return self._path
@property
def is_recursive(self):
"""Whether we are watching directories recursively."""
return self._is_recursive
@property
def fd(self):
"""The file descriptor associated with the inotify instance."""
return self._inotify_fd
def clear_move_records(self):
"""Clear cached records of MOVED_FROM events"""
self._moved_from_events = dict()
def source_for_move(self, destination_event):
"""
The source path corresponding to the given MOVED_TO event.
If the source path is outside the monitored directories, None
is returned instead.
"""
if destination_event.cookie in self._moved_from_events:
return self._moved_from_events[destination_event.cookie].src_path
else:
return None
def remember_move_from_event(self, event):
"""
Save this event as the source event for future MOVED_TO events to
reference.
"""
self._moved_from_events[event.cookie] = event
def add_watch(self, path):
"""
Adds a watch for the given path.
:param path:
Path to begin monitoring.
"""
with self._lock:
self._add_watch(path, self._event_mask)
def remove_watch(self, path):
"""
Removes a watch for the given path.
:param path:
Path string for which the watch will be removed.
"""
with self._lock:
wd = self._wd_for_path.pop(path)
del self._path_for_wd[wd]
if inotify_rm_watch(self._inotify_fd, wd) == -1:
Inotify._raise_error()
def close(self):
"""
Closes the inotify instance and removes all associated watches.
"""
with self._lock:
if self._path in self._wd_for_path:
wd = self._wd_for_path[self._path]
inotify_rm_watch(self._inotify_fd, wd)
os.close(self._inotify_fd)
def read_events(self, event_buffer_size=DEFAULT_EVENT_BUFFER_SIZE):
"""
Reads events from inotify and yields them.
"""
# HACK: We need to traverse the directory path
# recursively and simulate events for newly
# created subdirectories/files. This will handle
# mkdir -p foobar/blah/bar; touch foobar/afile
def _recursive_simulate(src_path):
events = []
for root, dirnames, filenames in os.walk(src_path):
for dirname in dirnames:
try:
full_path = os.path.join(root, dirname)
wd_dir = self._add_watch(full_path, self._event_mask)
e = InotifyEvent(
wd_dir, InotifyConstants.IN_CREATE | InotifyConstants.IN_ISDIR, 0, dirname, full_path)
events.append(e)
except OSError:
pass
for filename in filenames:
full_path = os.path.join(root, filename)
wd_parent_dir = self._wd_for_path[os.path.dirname(full_path)]
e = InotifyEvent(
wd_parent_dir, InotifyConstants.IN_CREATE, 0, filename, full_path)
events.append(e)
return events
event_buffer = None
while True:
try:
event_buffer = os.read(self._inotify_fd, event_buffer_size)
except OSError as e:
if e.errno == errno.EINTR:
continue
break
with self._lock:
event_list = []
for wd, mask, cookie, name in Inotify._parse_event_buffer(event_buffer):
if wd == -1:
continue
wd_path = self._path_for_wd[wd]
src_path = os.path.join(wd_path, name) if name else wd_path #avoid trailing slash
inotify_event = InotifyEvent(wd, mask, cookie, name, src_path)
if inotify_event.is_moved_from:
self.remember_move_from_event(inotify_event)
elif inotify_event.is_moved_to:
move_src_path = self.source_for_move(inotify_event)
if move_src_path in self._wd_for_path:
moved_wd = self._wd_for_path[move_src_path]
del self._wd_for_path[move_src_path]
self._wd_for_path[inotify_event.src_path] = moved_wd
self._path_for_wd[moved_wd] = inotify_event.src_path
src_path = os.path.join(wd_path, name)
inotify_event = InotifyEvent(wd, mask, cookie, name, src_path)
if inotify_event.is_ignored:
# Clean up book-keeping for deleted watches.
path = self._path_for_wd.pop(wd)
if self._wd_for_path[path] == wd:
del self._wd_for_path[path]
continue
event_list.append(inotify_event)
if (self.is_recursive and inotify_event.is_directory and
inotify_event.is_create):
# TODO: When a directory from another part of the
# filesystem is moved into a watched directory, this
# will not generate events for the directory tree.
# We need to coalesce IN_MOVED_TO events and those
# IN_MOVED_TO events which don't pair up with
# IN_MOVED_FROM events should be marked IN_CREATE
# instead relative to this directory.
try:
self._add_watch(src_path, self._event_mask)
except OSError:
continue
event_list.extend(_recursive_simulate(src_path))
return event_list
# Non-synchronized methods.
def _add_dir_watch(self, path, recursive, mask):
"""
Adds a watch (optionally recursively) for the given directory path
to monitor events specified by the mask.
:param path:
Path to monitor
:param recursive:
``True`` to monitor recursively.
:param mask:
Event bit mask.
"""
if not os.path.isdir(path):
raise OSError('Path is not a directory')
self._add_watch(path, mask)
if recursive:
for root, dirnames, _ in os.walk(path):
for dirname in dirnames:
full_path = os.path.join(root, dirname)
if os.path.islink(full_path):
continue
self._add_watch(full_path, mask)
def _add_watch(self, path, mask):
"""
Adds a watch for the given path to monitor events specified by the
mask.
:param path:
Path to monitor
:param mask:
Event bit mask.
"""
wd = inotify_add_watch(self._inotify_fd, path, mask)
if wd == -1:
Inotify._raise_error()
self._wd_for_path[path] = wd
self._path_for_wd[wd] = path
return wd
@staticmethod
def _raise_error():
"""
Raises errors for inotify failures.
"""
err = ctypes.get_errno()
if err == errno.ENOSPC:
raise OSError("inotify watch limit reached")
elif err == errno.EMFILE:
raise OSError("inotify instance limit reached")
else:
raise OSError(os.strerror(err))
@staticmethod
def _parse_event_buffer(event_buffer):
"""
Parses an event buffer of ``inotify_event`` structs returned by
inotify::
struct inotify_event {
__s32 wd; /* watch descriptor */
__u32 mask; /* watch mask */
__u32 cookie; /* cookie to synchronize two events */
__u32 len; /* length (including nulls) of name */
char name[0]; /* stub for possible name */
};
The ``cookie`` member of this struct is used to pair two related
events, for example, it pairs an IN_MOVED_FROM event with an
IN_MOVED_TO event.
"""
i = 0
while i + 16 <= len(event_buffer):
wd, mask, cookie, length = struct.unpack_from('iIII', event_buffer, i)
name = event_buffer[i + 16:i + 16 + length].rstrip(b'\0')
i += 16 + length
yield wd, mask, cookie, name
class InotifyEvent(object):
"""
Inotify event struct wrapper.
:param wd:
Watch descriptor
:param mask:
Event mask
:param cookie:
Event cookie
:param name:
Event name.
:param src_path:
Event source path
"""
def __init__(self, wd, mask, cookie, name, src_path):
self._wd = wd
self._mask = mask
self._cookie = cookie
self._name = name
self._src_path = src_path
@property
def src_path(self):
return self._src_path
@property
def wd(self):
return self._wd
@property
def mask(self):
return self._mask
@property
def cookie(self):
return self._cookie
@property
def name(self):
return self._name
@property
def is_modify(self):
return self._mask & InotifyConstants.IN_MODIFY > 0
@property
def is_close_write(self):
return self._mask & InotifyConstants.IN_CLOSE_WRITE > 0
@property
def is_close_nowrite(self):
return self._mask & InotifyConstants.IN_CLOSE_NOWRITE > 0
@property
def is_access(self):
return self._mask & InotifyConstants.IN_ACCESS > 0
@property
def is_delete(self):
return self._mask & InotifyConstants.IN_DELETE > 0
@property
def is_delete_self(self):
return self._mask & InotifyConstants.IN_DELETE_SELF > 0
@property
def is_create(self):
return self._mask & InotifyConstants.IN_CREATE > 0
@property
def is_moved_from(self):
return self._mask & InotifyConstants.IN_MOVED_FROM > 0
@property
def is_moved_to(self):
return self._mask & InotifyConstants.IN_MOVED_TO > 0
@property
def is_move(self):
return self._mask & InotifyConstants.IN_MOVE > 0
@property
def is_move_self(self):
return self._mask & InotifyConstants.IN_MOVE_SELF > 0
@property
def is_attrib(self):
return self._mask & InotifyConstants.IN_ATTRIB > 0
@property
def is_ignored(self):
return self._mask & InotifyConstants.IN_IGNORED > 0
@property
def is_directory(self):
# It looks like the kernel does not provide this information for
# IN_DELETE_SELF and IN_MOVE_SELF. In this case, assume it's a dir.
# See also: https://github.com/seb-m/pyinotify/blob/2c7e8f8/python2/pyinotify.py#L897
return (self.is_delete_self or self.is_move_self or
self._mask & InotifyConstants.IN_ISDIR > 0)
@property
def key(self):
return self._src_path, self._wd, self._mask, self._cookie, self._name
def __eq__(self, inotify_event):
return self.key == inotify_event.key
def __ne__(self, inotify_event):
return self.key == inotify_event.key
def __hash__(self):
return hash(self.key)
@staticmethod
def _get_mask_string(mask):
masks = []
for c in dir(InotifyConstants):
if c.startswith('IN_') and c not in ['IN_ALL_EVENTS', 'IN_CLOSE', 'IN_MOVE']:
c_val = getattr(InotifyConstants, c)
if mask & c_val:
masks.append(c)
mask_string = '|'.join(masks)
return mask_string
def __repr__(self):
mask_string = self._get_mask_string(self.mask)
s = "<InotifyEvent: src_path=%s, wd=%d, mask=%s, cookie=%d, name=%s>"
return s % (self.src_path, self.wd, mask_string, self.cookie, self.name)
@@ -0,0 +1,726 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
# Copyright 2012 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
:module: watchdog.observers.kqueue
:synopsis: ``kqueue(2)`` based emitter implementation.
:author: yesudeep@google.com (Yesudeep Mangalapilly)
:platforms: Mac OS X and BSD with kqueue(2).
.. WARNING:: kqueue is a very heavyweight way to monitor file systems.
Each kqueue-detected directory modification triggers
a full directory scan. Traversing the entire directory tree
and opening file descriptors for all files will create
performance problems. We need to find a way to re-scan
only those directories which report changes and do a diff
between two sub-DirectorySnapshots perhaps.
.. ADMONITION:: About ``select.kqueue`` and Python versions
* Python 2.5 does not ship with ``select.kqueue``
* Python 2.6 ships with a broken ``select.kqueue`` that cannot take
multiple events in the event list passed to ``kqueue.control``.
* Python 2.7 ships with a working ``select.kqueue``
implementation.
I have backported the Python 2.7 implementation to Python 2.5 and 2.6
in the ``select_backport`` package available on PyPI.
.. ADMONITION:: About OS X performance guidelines
Quote from the `Mac OS X File System Performance Guidelines`_:
"When you only want to track changes on a file or directory, be sure to
open it using the ``O_EVTONLY`` flag. This flag prevents the file or
directory from being marked as open or in use. This is important
if you are tracking files on a removable volume and the user tries to
unmount the volume. With this flag in place, the system knows it can
dismiss the volume. If you had opened the files or directories without
this flag, the volume would be marked as busy and would not be
unmounted."
``O_EVTONLY`` is defined as ``0x8000`` in the OS X header files.
More information here: http://www.mlsite.net/blog/?p=2312
Classes
-------
.. autoclass:: KqueueEmitter
:members:
:show-inheritance:
Collections and Utility Classes
-------------------------------
.. autoclass:: KeventDescriptor
:members:
:show-inheritance:
.. autoclass:: KeventDescriptorSet
:members:
:show-inheritance:
.. _Mac OS X File System Performance Guidelines: http://developer.apple.com/library/ios/#documentation/Performance/Conceptual/FileSystem/Articles/TrackingChanges.html#//apple_ref/doc/uid/20001993-CJBJFIDD
"""
from __future__ import with_statement
from watchdog.utils import platform
import threading
import errno
import sys
import stat
import os
# See the notes for this module in the documentation above ^.
#import select
# if not has_attribute(select, 'kqueue') or sys.version_info < (2, 7, 0):
if sys.version_info < (2, 7, 0):
import select_backport as select
else:
import select
from pathtools.path import absolute_path
from watchdog.observers.api import (
BaseObserver,
EventEmitter,
DEFAULT_OBSERVER_TIMEOUT,
DEFAULT_EMITTER_TIMEOUT
)
from watchdog.utils.dirsnapshot import DirectorySnapshot
from watchdog.events import (
DirMovedEvent,
DirDeletedEvent,
DirCreatedEvent,
DirModifiedEvent,
FileMovedEvent,
FileDeletedEvent,
FileCreatedEvent,
FileModifiedEvent,
EVENT_TYPE_MOVED,
EVENT_TYPE_DELETED,
EVENT_TYPE_CREATED
)
# Maximum number of events to process.
MAX_EVENTS = 4096
# O_EVTONLY value from the header files for OS X only.
O_EVTONLY = 0x8000
# Pre-calculated values for the kevent filter, flags, and fflags attributes.
if platform.is_darwin():
WATCHDOG_OS_OPEN_FLAGS = O_EVTONLY
else:
WATCHDOG_OS_OPEN_FLAGS = os.O_RDONLY | os.O_NONBLOCK
WATCHDOG_KQ_FILTER = select.KQ_FILTER_VNODE
WATCHDOG_KQ_EV_FLAGS = select.KQ_EV_ADD | select.KQ_EV_ENABLE | select.KQ_EV_CLEAR
WATCHDOG_KQ_FFLAGS = (
select.KQ_NOTE_DELETE |
select.KQ_NOTE_WRITE |
select.KQ_NOTE_EXTEND |
select.KQ_NOTE_ATTRIB |
select.KQ_NOTE_LINK |
select.KQ_NOTE_RENAME |
select.KQ_NOTE_REVOKE
)
# Flag tests.
def is_deleted(kev):
"""Determines whether the given kevent represents deletion."""
return kev.fflags & select.KQ_NOTE_DELETE
def is_modified(kev):
"""Determines whether the given kevent represents modification."""
fflags = kev.fflags
return (fflags & select.KQ_NOTE_EXTEND) or (fflags & select.KQ_NOTE_WRITE)
def is_attrib_modified(kev):
"""Determines whether the given kevent represents attribute modification."""
return kev.fflags & select.KQ_NOTE_ATTRIB
def is_renamed(kev):
"""Determines whether the given kevent represents movement."""
return kev.fflags & select.KQ_NOTE_RENAME
class KeventDescriptorSet(object):
"""
Thread-safe kevent descriptor collection.
"""
def __init__(self):
# Set of KeventDescriptor
self._descriptors = set()
# Descriptor for a given path.
self._descriptor_for_path = dict()
# Descriptor for a given fd.
self._descriptor_for_fd = dict()
# List of kevent objects.
self._kevents = list()
self._lock = threading.Lock()
@property
def kevents(self):
"""
List of kevents monitored.
"""
with self._lock:
return self._kevents
@property
def paths(self):
"""
List of paths for which kevents have been created.
"""
with self._lock:
return list(self._descriptor_for_path.keys())
def get_for_fd(self, fd):
"""
Given a file descriptor, returns the kevent descriptor object
for it.
:param fd:
OS file descriptor.
:type fd:
``int``
:returns:
A :class:`KeventDescriptor` object.
"""
with self._lock:
return self._descriptor_for_fd[fd]
def get(self, path):
"""
Obtains a :class:`KeventDescriptor` object for the specified path.
:param path:
Path for which the descriptor will be obtained.
"""
with self._lock:
path = absolute_path(path)
return self._get(path)
def __contains__(self, path):
"""
Determines whether a :class:`KeventDescriptor has been registered
for the specified path.
:param path:
Path for which the descriptor will be obtained.
"""
with self._lock:
path = absolute_path(path)
return self._has_path(path)
def add(self, path, is_directory):
"""
Adds a :class:`KeventDescriptor` to the collection for the given
path.
:param path:
The path for which a :class:`KeventDescriptor` object will be
added.
:param is_directory:
``True`` if the path refers to a directory; ``False`` otherwise.
:type is_directory:
``bool``
"""
with self._lock:
path = absolute_path(path)
if not self._has_path(path):
self._add_descriptor(KeventDescriptor(path, is_directory))
def remove(self, path):
"""
Removes the :class:`KeventDescriptor` object for the given path
if it already exists.
:param path:
Path for which the :class:`KeventDescriptor` object will be
removed.
"""
with self._lock:
path = absolute_path(path)
if self._has_path(path):
self._remove_descriptor(self._get(path))
def clear(self):
"""
Clears the collection and closes all open descriptors.
"""
with self._lock:
for descriptor in self._descriptors:
descriptor.close()
self._descriptors.clear()
self._descriptor_for_fd.clear()
self._descriptor_for_path.clear()
self._kevents = []
# Thread-unsafe methods. Locking is provided at a higher level.
def _get(self, path):
"""Returns a kevent descriptor for a given path."""
return self._descriptor_for_path[path]
def _has_path(self, path):
"""Determines whether a :class:`KeventDescriptor` for the specified
path exists already in the collection."""
return path in self._descriptor_for_path
def _add_descriptor(self, descriptor):
"""
Adds a descriptor to the collection.
:param descriptor:
An instance of :class:`KeventDescriptor` to be added.
"""
self._descriptors.add(descriptor)
self._kevents.append(descriptor.kevent)
self._descriptor_for_path[descriptor.path] = descriptor
self._descriptor_for_fd[descriptor.fd] = descriptor
def _remove_descriptor(self, descriptor):
"""
Removes a descriptor from the collection.
:param descriptor:
An instance of :class:`KeventDescriptor` to be removed.
"""
self._descriptors.remove(descriptor)
del self._descriptor_for_fd[descriptor.fd]
del self._descriptor_for_path[descriptor.path]
self._kevents.remove(descriptor.kevent)
descriptor.close()
class KeventDescriptor(object):
"""
A kevent descriptor convenience data structure to keep together:
* kevent
* directory status
* path
* file descriptor
:param path:
Path string for which a kevent descriptor will be created.
:param is_directory:
``True`` if the path refers to a directory; ``False`` otherwise.
:type is_directory:
``bool``
"""
def __init__(self, path, is_directory):
self._path = absolute_path(path)
self._is_directory = is_directory
self._fd = os.open(path, WATCHDOG_OS_OPEN_FLAGS)
self._kev = select.kevent(self._fd,
filter=WATCHDOG_KQ_FILTER,
flags=WATCHDOG_KQ_EV_FLAGS,
fflags=WATCHDOG_KQ_FFLAGS)
@property
def fd(self):
"""OS file descriptor for the kevent descriptor."""
return self._fd
@property
def path(self):
"""The path associated with the kevent descriptor."""
return self._path
@property
def kevent(self):
"""The kevent object associated with the kevent descriptor."""
return self._kev
@property
def is_directory(self):
"""Determines whether the kevent descriptor refers to a directory.
:returns:
``True`` or ``False``
"""
return self._is_directory
def close(self):
"""
Closes the file descriptor associated with a kevent descriptor.
"""
try:
os.close(self.fd)
except OSError:
pass
@property
def key(self):
return (self.path, self.is_directory)
def __eq__(self, descriptor):
return self.key == descriptor.key
def __ne__(self, descriptor):
return self.key != descriptor.key
def __hash__(self):
return hash(self.key)
def __repr__(self):
return "<KeventDescriptor: path=%s, is_directory=%s>"\
% (self.path, self.is_directory)
class KqueueEmitter(EventEmitter):
"""
kqueue(2)-based event emitter.
.. ADMONITION:: About ``kqueue(2)`` behavior and this implementation
``kqueue(2)`` monitors file system events only for
open descriptors, which means, this emitter does a lot of
book-keeping behind the scenes to keep track of open
descriptors for every entry in the monitored directory tree.
This also means the number of maximum open file descriptors
on your system must be increased **manually**.
Usually, issuing a call to ``ulimit`` should suffice::
ulimit -n 1024
Ensure that you pick a number that is larger than the
number of files you expect to be monitored.
``kqueue(2)`` does not provide enough information about the
following things:
* The destination path of a file or directory that is renamed.
* Creation of a file or directory within a directory; in this
case, ``kqueue(2)`` only indicates a modified event on the
parent directory.
Therefore, this emitter takes a snapshot of the directory
tree when ``kqueue(2)`` detects a change on the file system
to be able to determine the above information.
:param event_queue:
The event queue to fill with events.
:param watch:
A watch object representing the directory to monitor.
:type watch:
:class:`watchdog.observers.api.ObservedWatch`
:param timeout:
Read events blocking timeout (in seconds).
:type timeout:
``float``
"""
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
EventEmitter.__init__(self, event_queue, watch, timeout)
self._kq = select.kqueue()
self._lock = threading.RLock()
# A collection of KeventDescriptor.
self._descriptors = KeventDescriptorSet()
def walker_callback(path, stat_info, self=self):
self._register_kevent(path, stat.S_ISDIR(stat_info.st_mode))
self._snapshot = DirectorySnapshot(watch.path,
watch.is_recursive,
walker_callback)
def _register_kevent(self, path, is_directory):
"""
Registers a kevent descriptor for the given path.
:param path:
Path for which a kevent descriptor will be created.
:param is_directory:
``True`` if the path refers to a directory; ``False`` otherwise.
:type is_directory:
``bool``
"""
try:
self._descriptors.add(path, is_directory)
except OSError as e:
if e.errno == errno.ENOENT:
# Probably dealing with a temporary file that was created
# and then quickly deleted before we could open
# a descriptor for it. Therefore, simply queue a sequence
# of created and deleted events for the path.
#path = absolute_path(path)
# if is_directory:
# self.queue_event(DirCreatedEvent(path))
# self.queue_event(DirDeletedEvent(path))
# else:
# self.queue_event(FileCreatedEvent(path))
# self.queue_event(FileDeletedEvent(path))
# TODO: We could simply ignore these files.
# Locked files cause the python process to die with
# a bus error when we handle temporary files.
# eg. .git/index.lock when running tig operations.
# I don't fully understand this at the moment.
pass
else:
# All other errors are propagated.
raise
def _unregister_kevent(self, path):
"""
Convenience function to close the kevent descriptor for a
specified kqueue-monitored path.
:param path:
Path for which the kevent descriptor will be closed.
"""
self._descriptors.remove(path)
def queue_event(self, event):
"""
Handles queueing a single event object.
:param event:
An instance of :class:`watchdog.events.FileSystemEvent`
or a subclass.
"""
# Handles all the book keeping for queued events.
# We do not need to fire moved/deleted events for all subitems in
# a directory tree here, because this function is called by kqueue
# for all those events anyway.
EventEmitter.queue_event(self, event)
if event.event_type == EVENT_TYPE_CREATED:
self._register_kevent(event.src_path, event.is_directory)
elif event.event_type == EVENT_TYPE_MOVED:
self._unregister_kevent(event.src_path)
self._register_kevent(event.dest_path, event.is_directory)
elif event.event_type == EVENT_TYPE_DELETED:
self._unregister_kevent(event.src_path)
def _queue_dirs_modified(self,
dirs_modified,
ref_snapshot,
new_snapshot):
"""
Queues events for directory modifications by scanning the directory
for changes.
A scan is a comparison between two snapshots of the same directory
taken at two different times. This also determines whether files
or directories were created, which updated the modified timestamp
for the directory.
"""
if dirs_modified:
for dir_modified in dirs_modified:
self.queue_event(DirModifiedEvent(dir_modified))
diff_events = new_snapshot - ref_snapshot
for file_created in diff_events.files_created:
self.queue_event(FileCreatedEvent(file_created))
for directory_created in diff_events.dirs_created:
self.queue_event(DirCreatedEvent(directory_created))
def _queue_events_except_renames_and_dir_modifications(self, event_list):
"""
Queues events from the kevent list returned from the call to
:meth:`select.kqueue.control`.
.. NOTE:: Queues only the deletions, file modifications,
attribute modifications. The other events, namely,
file creation, directory modification, file rename,
directory rename, directory creation, etc. are
determined by comparing directory snapshots.
"""
files_renamed = set()
dirs_renamed = set()
dirs_modified = set()
for kev in event_list:
descriptor = self._descriptors.get_for_fd(kev.ident)
src_path = descriptor.path
if is_deleted(kev):
if descriptor.is_directory:
self.queue_event(DirDeletedEvent(src_path))
else:
self.queue_event(FileDeletedEvent(src_path))
elif is_attrib_modified(kev):
if descriptor.is_directory:
self.queue_event(DirModifiedEvent(src_path))
else:
self.queue_event(FileModifiedEvent(src_path))
elif is_modified(kev):
if descriptor.is_directory:
# When a directory is modified, it may be due to
# sub-file/directory renames or new file/directory
# creation. We determine all this by comparing
# snapshots later.
dirs_modified.add(src_path)
else:
self.queue_event(FileModifiedEvent(src_path))
elif is_renamed(kev):
# Kqueue does not specify the destination names for renames
# to, so we have to process these after taking a snapshot
# of the directory.
if descriptor.is_directory:
dirs_renamed.add(src_path)
else:
files_renamed.add(src_path)
return files_renamed, dirs_renamed, dirs_modified
def _queue_renamed(self,
src_path,
is_directory,
ref_snapshot,
new_snapshot):
"""
Compares information from two directory snapshots (one taken before
the rename operation and another taken right after) to determine the
destination path of the file system object renamed, and adds
appropriate events to the event queue.
"""
try:
ref_stat_info = ref_snapshot.stat_info(src_path)
except KeyError:
# Probably caught a temporary file/directory that was renamed
# and deleted. Fires a sequence of created and deleted events
# for the path.
if is_directory:
self.queue_event(DirCreatedEvent(src_path))
self.queue_event(DirDeletedEvent(src_path))
else:
self.queue_event(FileCreatedEvent(src_path))
self.queue_event(FileDeletedEvent(src_path))
# We don't process any further and bail out assuming
# the event represents deletion/creation instead of movement.
return
try:
dest_path = absolute_path(
new_snapshot.path_for_inode(ref_stat_info.st_ino))
if is_directory:
event = DirMovedEvent(src_path, dest_path)
# TODO: Do we need to fire moved events for the items
# inside the directory tree? Does kqueue does this
# all by itself? Check this and then enable this code
# only if it doesn't already.
# A: It doesn't. So I've enabled this block.
if self.watch.is_recursive:
for sub_event in event.sub_moved_events():
self.queue_event(sub_event)
self.queue_event(event)
else:
self.queue_event(FileMovedEvent(src_path, dest_path))
except KeyError:
# If the new snapshot does not have an inode for the
# old path, we haven't found the new name. Therefore,
# we mark it as deleted and remove unregister the path.
if is_directory:
self.queue_event(DirDeletedEvent(src_path))
else:
self.queue_event(FileDeletedEvent(src_path))
def _read_events(self, timeout=None):
"""
Reads events from a call to the blocking
:meth:`select.kqueue.control()` method.
:param timeout:
Blocking timeout for reading events.
:type timeout:
``float`` (seconds)
"""
return self._kq.control(self._descriptors.kevents,
MAX_EVENTS,
timeout)
def queue_events(self, timeout):
"""
Queues events by reading them from a call to the blocking
:meth:`select.kqueue.control()` method.
:param timeout:
Blocking timeout for reading events.
:type timeout:
``float`` (seconds)
"""
with self._lock:
try:
event_list = self._read_events(timeout)
files_renamed, dirs_renamed, dirs_modified = (
self._queue_events_except_renames_and_dir_modifications(event_list))
# Take a fresh snapshot of the directory and update the
# saved snapshot.
new_snapshot = DirectorySnapshot(self.watch.path,
self.watch.is_recursive)
ref_snapshot = self._snapshot
self._snapshot = new_snapshot
if files_renamed or dirs_renamed or dirs_modified:
for src_path in files_renamed:
self._queue_renamed(src_path,
False,
ref_snapshot,
new_snapshot)
for src_path in dirs_renamed:
self._queue_renamed(src_path,
True,
ref_snapshot,
new_snapshot)
self._queue_dirs_modified(dirs_modified,
ref_snapshot,
new_snapshot)
except OSError as e:
if e.errno == errno.EBADF:
# logging.debug(e)
pass
else:
raise
def on_thread_stop(self):
# Clean up.
with self._lock:
self._descriptors.clear()
self._kq.close()
class KqueueObserver(BaseObserver):
"""
Observer thread that schedules watching directories and dispatches
calls to event handlers.
"""
def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
BaseObserver.__init__(self, emitter_class=KqueueEmitter, timeout=timeout)
@@ -0,0 +1,145 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
# Copyright 2012 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
:module: watchdog.observers.polling
:synopsis: Polling emitter implementation.
:author: yesudeep@google.com (Yesudeep Mangalapilly)
Classes
-------
.. autoclass:: PollingObserver
:members:
:show-inheritance:
.. autoclass:: PollingObserverVFS
:members:
:show-inheritance:
:special-members:
"""
from __future__ import with_statement
import os
import threading
from functools import partial
from watchdog.utils import stat as default_stat
from watchdog.utils.dirsnapshot import DirectorySnapshot, DirectorySnapshotDiff
from watchdog.observers.api import (
EventEmitter,
BaseObserver,
DEFAULT_OBSERVER_TIMEOUT,
DEFAULT_EMITTER_TIMEOUT
)
from watchdog.events import (
DirMovedEvent,
DirDeletedEvent,
DirCreatedEvent,
DirModifiedEvent,
FileMovedEvent,
FileDeletedEvent,
FileCreatedEvent,
FileModifiedEvent
)
class PollingEmitter(EventEmitter):
"""
Platform-independent emitter that polls a directory to detect file
system changes.
"""
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT,
stat=default_stat, listdir=os.listdir):
EventEmitter.__init__(self, event_queue, watch, timeout)
self._snapshot = None
self._lock = threading.Lock()
self._take_snapshot = lambda: DirectorySnapshot(
self.watch.path, self.watch.is_recursive, stat=stat, listdir=listdir)
def on_thread_start(self):
self._snapshot = self._take_snapshot()
def queue_events(self, timeout):
# We don't want to hit the disk continuously.
# timeout behaves like an interval for polling emitters.
if self.stopped_event.wait(timeout):
return
with self._lock:
if not self.should_keep_running():
return
# Get event diff between fresh snapshot and previous snapshot.
# Update snapshot.
try:
new_snapshot = self._take_snapshot()
except OSError:
self.queue_event(DirDeletedEvent(self.watch.path))
self.stop()
return
events = DirectorySnapshotDiff(self._snapshot, new_snapshot)
self._snapshot = new_snapshot
# Files.
for src_path in events.files_deleted:
self.queue_event(FileDeletedEvent(src_path))
for src_path in events.files_modified:
self.queue_event(FileModifiedEvent(src_path))
for src_path in events.files_created:
self.queue_event(FileCreatedEvent(src_path))
for src_path, dest_path in events.files_moved:
self.queue_event(FileMovedEvent(src_path, dest_path))
# Directories.
for src_path in events.dirs_deleted:
self.queue_event(DirDeletedEvent(src_path))
for src_path in events.dirs_modified:
self.queue_event(DirModifiedEvent(src_path))
for src_path in events.dirs_created:
self.queue_event(DirCreatedEvent(src_path))
for src_path, dest_path in events.dirs_moved:
self.queue_event(DirMovedEvent(src_path, dest_path))
class PollingObserver(BaseObserver):
"""
Platform-independent observer that polls a directory to detect file
system changes.
"""
def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
BaseObserver.__init__(self, emitter_class=PollingEmitter, timeout=timeout)
class PollingObserverVFS(BaseObserver):
"""
File system independent observer that polls a directory to detect changes.
"""
def __init__(self, stat, listdir, polling_interval=1):
"""
:param stat: stat function. See ``os.stat`` for details.
:param listdir: listdir function. See ``os.listdir`` for details.
:type polling_interval: float
:param polling_interval: interval in seconds between polling the file system.
"""
emitter_cls = partial(PollingEmitter, stat=stat, listdir=listdir)
BaseObserver.__init__(self, emitter_class=emitter_cls, timeout=polling_interval)
@@ -0,0 +1,133 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
# Copyright 2012 Google, Inc.
# Copyright 2014 Thomas Amland
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import threading
import os.path
import time
from watchdog.events import (
DirCreatedEvent,
DirMovedEvent,
DirModifiedEvent,
FileCreatedEvent,
FileDeletedEvent,
FileMovedEvent,
FileModifiedEvent,
generate_sub_moved_events,
generate_sub_created_events,
)
from watchdog.observers.api import (
EventEmitter,
BaseObserver,
DEFAULT_OBSERVER_TIMEOUT,
DEFAULT_EMITTER_TIMEOUT
)
from watchdog.observers.winapi import (
read_events,
get_directory_handle,
close_directory_handle,
)
# HACK:
WATCHDOG_TRAVERSE_MOVED_DIR_DELAY = 1 # seconds
class WindowsApiEmitter(EventEmitter):
"""
Windows API-based emitter that uses ReadDirectoryChangesW
to detect file system changes for a watch.
"""
def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT):
EventEmitter.__init__(self, event_queue, watch, timeout)
self._lock = threading.Lock()
self._handle = None
def on_thread_start(self):
self._handle = get_directory_handle(self.watch.path)
def on_thread_stop(self):
if self._handle:
close_directory_handle(self._handle)
def queue_events(self, timeout):
winapi_events = read_events(self._handle, self.watch.is_recursive)
with self._lock:
last_renamed_src_path = ""
for winapi_event in winapi_events:
src_path = os.path.join(self.watch.path, winapi_event.src_path)
if winapi_event.is_renamed_old:
last_renamed_src_path = src_path
elif winapi_event.is_renamed_new:
dest_path = src_path
src_path = last_renamed_src_path
if os.path.isdir(dest_path):
event = DirMovedEvent(src_path, dest_path)
if self.watch.is_recursive:
# HACK: We introduce a forced delay before
# traversing the moved directory. This will read
# only file movement that finishes within this
# delay time.
time.sleep(WATCHDOG_TRAVERSE_MOVED_DIR_DELAY)
# The following block of code may not
# obtain moved events for the entire tree if
# the I/O is not completed within the above
# delay time. So, it's not guaranteed to work.
# TODO: Come up with a better solution, possibly
# a way to wait for I/O to complete before
# queuing events.
for sub_moved_event in generate_sub_moved_events(src_path, dest_path):
self.queue_event(sub_moved_event)
self.queue_event(event)
else:
self.queue_event(FileMovedEvent(src_path, dest_path))
elif winapi_event.is_modified:
cls = DirModifiedEvent if os.path.isdir(src_path) else FileModifiedEvent
self.queue_event(cls(src_path))
elif winapi_event.is_added:
isdir = os.path.isdir(src_path)
cls = DirCreatedEvent if isdir else FileCreatedEvent
self.queue_event(cls(src_path))
if isdir:
# If a directory is moved from outside the watched folder to inside it
# we only get a created directory event out of it, not any events for its children
# so use the same hack as for file moves to get the child events
time.sleep(WATCHDOG_TRAVERSE_MOVED_DIR_DELAY)
sub_events = generate_sub_created_events(src_path)
for sub_created_event in sub_events:
self.queue_event(sub_created_event)
elif winapi_event.is_removed:
self.queue_event(FileDeletedEvent(src_path))
class WindowsApiObserver(BaseObserver):
"""
Observer thread that schedules watching directories and dispatches
calls to event handlers.
"""
def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
BaseObserver.__init__(self, emitter_class=WindowsApiEmitter,
timeout=timeout)
@@ -0,0 +1,348 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# winapi.py: Windows API-Python interface (removes dependency on pywin32)
#
# Copyright (C) 2007 Thomas Heller <theller@ctypes.org>
# Copyright (C) 2010 Will McGugan <will@willmcgugan.com>
# Copyright (C) 2010 Ryan Kelly <ryan@rfk.id.au>
# Copyright (C) 2010 Yesudeep Mangalapilly <yesudeep@gmail.com>
# Copyright (C) 2014 Thomas Amland
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and / or other materials provided with the distribution.
# * Neither the name of the organization nor the names of its contributors may
# be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Portions of this code were taken from pyfilesystem, which uses the above
# new BSD license.
from __future__ import with_statement
import ctypes.wintypes
from functools import reduce
try:
LPVOID = ctypes.wintypes.LPVOID
except AttributeError:
# LPVOID wasn't defined in Py2.5, guess it was introduced in Py2.6
LPVOID = ctypes.c_void_p
# Invalid handle value.
INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
# File notification contants.
FILE_NOTIFY_CHANGE_FILE_NAME = 0x01
FILE_NOTIFY_CHANGE_DIR_NAME = 0x02
FILE_NOTIFY_CHANGE_ATTRIBUTES = 0x04
FILE_NOTIFY_CHANGE_SIZE = 0x08
FILE_NOTIFY_CHANGE_LAST_WRITE = 0x010
FILE_NOTIFY_CHANGE_LAST_ACCESS = 0x020
FILE_NOTIFY_CHANGE_CREATION = 0x040
FILE_NOTIFY_CHANGE_SECURITY = 0x0100
FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
FILE_FLAG_OVERLAPPED = 0x40000000
FILE_LIST_DIRECTORY = 1
FILE_SHARE_READ = 0x01
FILE_SHARE_WRITE = 0x02
FILE_SHARE_DELETE = 0x04
OPEN_EXISTING = 3
# File action constants.
FILE_ACTION_CREATED = 1
FILE_ACTION_DELETED = 2
FILE_ACTION_MODIFIED = 3
FILE_ACTION_RENAMED_OLD_NAME = 4
FILE_ACTION_RENAMED_NEW_NAME = 5
FILE_ACTION_OVERFLOW = 0xFFFF
# Aliases
FILE_ACTION_ADDED = FILE_ACTION_CREATED
FILE_ACTION_REMOVED = FILE_ACTION_DELETED
THREAD_TERMINATE = 0x0001
# IO waiting constants.
WAIT_ABANDONED = 0x00000080
WAIT_IO_COMPLETION = 0x000000C0
WAIT_OBJECT_0 = 0x00000000
WAIT_TIMEOUT = 0x00000102
# Error codes
ERROR_OPERATION_ABORTED = 995
class OVERLAPPED(ctypes.Structure):
_fields_ = [('Internal', LPVOID),
('InternalHigh', LPVOID),
('Offset', ctypes.wintypes.DWORD),
('OffsetHigh', ctypes.wintypes.DWORD),
('Pointer', LPVOID),
('hEvent', ctypes.wintypes.HANDLE),
]
def _errcheck_bool(value, func, args):
if not value:
raise ctypes.WinError()
return args
def _errcheck_handle(value, func, args):
if not value:
raise ctypes.WinError()
if value == INVALID_HANDLE_VALUE:
raise ctypes.WinError()
return args
def _errcheck_dword(value, func, args):
if value == 0xFFFFFFFF:
raise ctypes.WinError()
return args
ReadDirectoryChangesW = ctypes.windll.kernel32.ReadDirectoryChangesW
ReadDirectoryChangesW.restype = ctypes.wintypes.BOOL
ReadDirectoryChangesW.errcheck = _errcheck_bool
ReadDirectoryChangesW.argtypes = (
ctypes.wintypes.HANDLE, # hDirectory
LPVOID, # lpBuffer
ctypes.wintypes.DWORD, # nBufferLength
ctypes.wintypes.BOOL, # bWatchSubtree
ctypes.wintypes.DWORD, # dwNotifyFilter
ctypes.POINTER(ctypes.wintypes.DWORD), # lpBytesReturned
ctypes.POINTER(OVERLAPPED), # lpOverlapped
LPVOID # FileIOCompletionRoutine # lpCompletionRoutine
)
CreateFileW = ctypes.windll.kernel32.CreateFileW
CreateFileW.restype = ctypes.wintypes.HANDLE
CreateFileW.errcheck = _errcheck_handle
CreateFileW.argtypes = (
ctypes.wintypes.LPCWSTR, # lpFileName
ctypes.wintypes.DWORD, # dwDesiredAccess
ctypes.wintypes.DWORD, # dwShareMode
LPVOID, # lpSecurityAttributes
ctypes.wintypes.DWORD, # dwCreationDisposition
ctypes.wintypes.DWORD, # dwFlagsAndAttributes
ctypes.wintypes.HANDLE # hTemplateFile
)
CloseHandle = ctypes.windll.kernel32.CloseHandle
CloseHandle.restype = ctypes.wintypes.BOOL
CloseHandle.argtypes = (
ctypes.wintypes.HANDLE, # hObject
)
CancelIoEx = ctypes.windll.kernel32.CancelIoEx
CancelIoEx.restype = ctypes.wintypes.BOOL
CancelIoEx.errcheck = _errcheck_bool
CancelIoEx.argtypes = (
ctypes.wintypes.HANDLE, # hObject
ctypes.POINTER(OVERLAPPED) # lpOverlapped
)
CreateEvent = ctypes.windll.kernel32.CreateEventW
CreateEvent.restype = ctypes.wintypes.HANDLE
CreateEvent.errcheck = _errcheck_handle
CreateEvent.argtypes = (
LPVOID, # lpEventAttributes
ctypes.wintypes.BOOL, # bManualReset
ctypes.wintypes.BOOL, # bInitialState
ctypes.wintypes.LPCWSTR, # lpName
)
SetEvent = ctypes.windll.kernel32.SetEvent
SetEvent.restype = ctypes.wintypes.BOOL
SetEvent.errcheck = _errcheck_bool
SetEvent.argtypes = (
ctypes.wintypes.HANDLE, # hEvent
)
WaitForSingleObjectEx = ctypes.windll.kernel32.WaitForSingleObjectEx
WaitForSingleObjectEx.restype = ctypes.wintypes.DWORD
WaitForSingleObjectEx.errcheck = _errcheck_dword
WaitForSingleObjectEx.argtypes = (
ctypes.wintypes.HANDLE, # hObject
ctypes.wintypes.DWORD, # dwMilliseconds
ctypes.wintypes.BOOL, # bAlertable
)
CreateIoCompletionPort = ctypes.windll.kernel32.CreateIoCompletionPort
CreateIoCompletionPort.restype = ctypes.wintypes.HANDLE
CreateIoCompletionPort.errcheck = _errcheck_handle
CreateIoCompletionPort.argtypes = (
ctypes.wintypes.HANDLE, # FileHandle
ctypes.wintypes.HANDLE, # ExistingCompletionPort
LPVOID, # CompletionKey
ctypes.wintypes.DWORD, # NumberOfConcurrentThreads
)
GetQueuedCompletionStatus = ctypes.windll.kernel32.GetQueuedCompletionStatus
GetQueuedCompletionStatus.restype = ctypes.wintypes.BOOL
GetQueuedCompletionStatus.errcheck = _errcheck_bool
GetQueuedCompletionStatus.argtypes = (
ctypes.wintypes.HANDLE, # CompletionPort
LPVOID, # lpNumberOfBytesTransferred
LPVOID, # lpCompletionKey
ctypes.POINTER(OVERLAPPED), # lpOverlapped
ctypes.wintypes.DWORD, # dwMilliseconds
)
PostQueuedCompletionStatus = ctypes.windll.kernel32.PostQueuedCompletionStatus
PostQueuedCompletionStatus.restype = ctypes.wintypes.BOOL
PostQueuedCompletionStatus.errcheck = _errcheck_bool
PostQueuedCompletionStatus.argtypes = (
ctypes.wintypes.HANDLE, # CompletionPort
ctypes.wintypes.DWORD, # lpNumberOfBytesTransferred
ctypes.wintypes.DWORD, # lpCompletionKey
ctypes.POINTER(OVERLAPPED), # lpOverlapped
)
class FILE_NOTIFY_INFORMATION(ctypes.Structure):
_fields_ = [("NextEntryOffset", ctypes.wintypes.DWORD),
("Action", ctypes.wintypes.DWORD),
("FileNameLength", ctypes.wintypes.DWORD),
#("FileName", (ctypes.wintypes.WCHAR * 1))]
("FileName", (ctypes.c_char * 1))]
LPFNI = ctypes.POINTER(FILE_NOTIFY_INFORMATION)
# We don't need to recalculate these flags every time a call is made to
# the win32 API functions.
WATCHDOG_FILE_FLAGS = FILE_FLAG_BACKUP_SEMANTICS
WATCHDOG_FILE_SHARE_FLAGS = reduce(
lambda x, y: x | y, [
FILE_SHARE_READ,
FILE_SHARE_WRITE,
FILE_SHARE_DELETE,
])
WATCHDOG_FILE_NOTIFY_FLAGS = reduce(
lambda x, y: x | y, [
FILE_NOTIFY_CHANGE_FILE_NAME,
FILE_NOTIFY_CHANGE_DIR_NAME,
FILE_NOTIFY_CHANGE_ATTRIBUTES,
FILE_NOTIFY_CHANGE_SIZE,
FILE_NOTIFY_CHANGE_LAST_WRITE,
FILE_NOTIFY_CHANGE_SECURITY,
FILE_NOTIFY_CHANGE_LAST_ACCESS,
FILE_NOTIFY_CHANGE_CREATION,
])
BUFFER_SIZE = 2048
def _parse_event_buffer(readBuffer, nBytes):
results = []
while nBytes > 0:
fni = ctypes.cast(readBuffer, LPFNI)[0]
ptr = ctypes.addressof(fni) + FILE_NOTIFY_INFORMATION.FileName.offset
#filename = ctypes.wstring_at(ptr, fni.FileNameLength)
filename = ctypes.string_at(ptr, fni.FileNameLength)
results.append((fni.Action, filename.decode('utf-16')))
numToSkip = fni.NextEntryOffset
if numToSkip <= 0:
break
readBuffer = readBuffer[numToSkip:]
nBytes -= numToSkip # numToSkip is long. nBytes should be long too.
return results
def get_directory_handle(path):
"""Returns a Windows handle to the specified directory path."""
return CreateFileW(path, FILE_LIST_DIRECTORY, WATCHDOG_FILE_SHARE_FLAGS,
None, OPEN_EXISTING, WATCHDOG_FILE_FLAGS, None)
def close_directory_handle(handle):
try:
CancelIoEx(handle, None) # force ReadDirectoryChangesW to return
CloseHandle(handle) # close directory handle
except WindowsError:
try:
CloseHandle(handle) # close directory handle
except:
return
def read_directory_changes(handle, recursive):
"""Read changes to the directory using the specified directory handle.
http://timgolden.me.uk/pywin32-docs/win32file__ReadDirectoryChangesW_meth.html
"""
event_buffer = ctypes.create_string_buffer(BUFFER_SIZE)
nbytes = ctypes.wintypes.DWORD()
try:
ReadDirectoryChangesW(handle, ctypes.byref(event_buffer),
len(event_buffer), recursive,
WATCHDOG_FILE_NOTIFY_FLAGS,
ctypes.byref(nbytes), None, None)
except WindowsError as e:
if e.winerror == ERROR_OPERATION_ABORTED:
return [], 0
raise e
# Python 2/3 compat
try:
int_class = long
except NameError:
int_class = int
return event_buffer.raw, int_class(nbytes.value)
class WinAPINativeEvent(object):
def __init__(self, action, src_path):
self.action = action
self.src_path = src_path
@property
def is_added(self):
return self.action == FILE_ACTION_CREATED
@property
def is_removed(self):
return self.action == FILE_ACTION_REMOVED
@property
def is_modified(self):
return self.action == FILE_ACTION_MODIFIED
@property
def is_renamed_old(self):
return self.action == FILE_ACTION_RENAMED_OLD_NAME
@property
def is_renamed_new(self):
return self.action == FILE_ACTION_RENAMED_NEW_NAME
def __repr__(self):
return ("<WinAPINativeEvent: action=%d, src_path=%r>" % (self.action, self.src_path))
def read_events(handle, recursive):
buf, nbytes = read_directory_changes(handle, recursive)
events = _parse_event_buffer(buf, nbytes)
return [WinAPINativeEvent(action, path) for action, path in events]