diff --git a/edbob/commands.py b/edbob/commands.py index 6c47598..3505473 100644 --- a/edbob/commands.py +++ b/edbob/commands.py @@ -432,11 +432,6 @@ class FileMonitorCommand(Subcommand): uninstall = subparsers.add_parser('uninstall', help="Uninstall (remove) service") uninstall.set_defaults(subcommand='remove') - else: - parser.add_argument('-D', '--dont-daemonize', - action='store_false', dest='daemonize', - help="Don't daemonize when starting") - def get_win32_module(self): from edbob.filemon import win32 return win32 @@ -454,10 +449,10 @@ class FileMonitorCommand(Subcommand): from edbob.filemon import linux as filemon if args.subcommand == 'start': - filemon.start_daemon(self.appname, daemonize=args.daemonize) + filemon.start_daemon(self.appname) elif args.subcommand == 'stop': - filemon.stop_daemon() + filemon.stop_daemon(self.appname) elif sys.platform == 'win32': from edbob import win32 diff --git a/edbob/filemon/__init__.py b/edbob/filemon/__init__.py index 07662e5..205ac31 100644 --- a/edbob/filemon/__init__.py +++ b/edbob/filemon/__init__.py @@ -26,10 +26,18 @@ ``edbob.filemon`` -- File Monitoring Service """ +import os import os.path +import sys +import Queue import logging import edbob +from edbob.errors import email_exception + +if sys.platform == 'win32': + import win32api + from edbob.win32 import file_is_free log = logging.getLogger(__name__) @@ -65,6 +73,12 @@ class MonitorProfile(object): self.locks = edbob.config.getboolean( '%s.filemon' % appname, '%s.locks' % key, default=False) + self.process_existing = edbob.config.getboolean( + '%s.filemon' % appname, '%s.process_existing' % key, default=True) + + self.stop_on_error = edbob.config.getboolean( + '%s.filemon' % appname, '%s.stop_on_error' % key, default=False) + def get_monitor_profiles(appname): """ @@ -110,3 +124,105 @@ def get_monitor_profiles(appname): del monitored[key] return monitored + + +def queue_existing(profile, path): + """ + Adds files found in a watched folder to a processing queue. This is called + when the monitor first starts, to handle the case of files which exist + prior to startup. + + If files are found, they are first sorted by modification timestamp, using + a lexical sort on the filename as a tie-breaker, and then added to the + queue in that order. + + :param profile: Monitor profile for which the folder is to be watched. The + profile is expected to already have a queue attached; any existing files + will be added to this queue. + :type profile: :class:`edbob.filemon.MonitorProfile` instance + + :param path: Folder path which is to be checked for files. + :type path: string + + :returns: ``None`` + """ + + def sorter(x, y): + mtime_x = os.path.getmtime(x) + mtime_y = os.path.getmtime(y) + if mtime_x < mtime_y: + return -1 + if mtime_x > mtime_y: + return 1 + return cmp(x, y) + + paths = [os.path.join(path, x) for x in os.listdir(path)] + for path in sorted(paths, cmp=sorter): + + # Only process normal files. + if not os.path.isfile(path): + continue + + # If using locks, don't process "in transit" files. + if profile.locks and path.endswith('.lock'): + continue + + log.debug("queue_existing: queuing existing file for " + "profile '%s': %s" % (profile.key, path)) + profile.queue.put(path) + + +def perform_actions(profile): + """ + Callable target for action threads. + """ + + keep_going = True + while keep_going: + + try: + path = profile.queue.get_nowait() + except Queue.Empty: + pass + else: + + # In some cases, processing one file may cause other related files + # to also be processed. When this happens, a path on the queue may + # point to a file which no longer exists. + if not os.path.exists(path): + log.info("perform_actions: path does not exist: %s" % path) + continue + + log.debug("perform_actions: processing file: %s" % path) + + if sys.platform == 'win32': + while not file_is_free(path): + win32api.Sleep(0) + + for spec, func, args in profile.actions: + + log.info("perform_actions: calling function '%s' on file: %s" % + (spec, path)) + + try: + func(path, *args) + + except: + log.exception("perform_actions: exception occurred " + "while processing file: %s" % path) + email_exception() + + # Don't process any more files if the profile is so + # configured. + if profile.stop_on_error: + keep_going = False + + # Either way this particular file probably shouldn't be + # processed any further. + log.warning("perform_actions: no further processing " + "will be done for file: %s" % path) + break + + log.warning("perform_actions: error encountered, and configuration " + "dictates that no more actions will be processed for " + "profile: %s" % profile.key) diff --git a/edbob/filemon/linux.py b/edbob/filemon/linux.py index d5902cc..6d5ab83 100644 --- a/edbob/filemon/linux.py +++ b/edbob/filemon/linux.py @@ -27,14 +27,16 @@ """ import sys -import os import os.path -import signal -import logging import pyinotify +import threading +import Queue +import logging import edbob -from edbob.filemon import get_monitor_profiles +from edbob import filemon +from edbob.daemon import Daemon +from edbob.errors import email_exception log = logging.getLogger(__name__) @@ -45,9 +47,8 @@ class EventHandler(pyinotify.ProcessEvent): Event processor for file monitor daemon. """ - def my_init(self, actions=[], locks=False, **kwargs): - self.actions = actions - self.locks = locks + def my_init(self, profile=None, **kwargs): + self.profile = profile def process_IN_ACCESS(self, event): log.debug("EventHandler: IN_ACCESS: %s" % event.pathname) @@ -57,87 +58,85 @@ class EventHandler(pyinotify.ProcessEvent): def process_IN_CLOSE_WRITE(self, event): log.debug("EventHandler: IN_CLOSE_WRITE: %s" % event.pathname) - if not self.locks: - self.perform_actions(event.pathname) + if not self.profile.locks: + self.profile.queue.put(event.pathname) def process_IN_CREATE(self, event): log.debug("EventHandler: IN_CREATE: %s" % event.pathname) def process_IN_DELETE(self, event): log.debug("EventHandler: IN_DELETE: %s" % event.pathname) - if self.locks and event.pathname.endswith('.lock'): - self.perform_actions(event.pathname[:-5]) + if self.profile.locks and event.pathname.endswith('.lock'): + self.profile.queue.put(event.pathname[:-5]) def process_IN_MODIFY(self, event): log.debug("EventHandler: IN_MODIFY: %s" % event.pathname) def process_IN_MOVED_TO(self, event): log.debug("EventHandler: IN_MOVED_TO: %s" % event.pathname) - if not self.locks: - self.perform_actions(event.pathname) - - def perform_actions(self, path): - for spec, func, args in self.actions: - func(path, *args) + if not self.profile.locks: + self.profile.queue.put(event.pathname) -def get_pid_path(): - """ - Returns the path to the PID file for the file monitor daemon. - """ +class FileMonitorDaemon(Daemon): - basename = os.path.basename(sys.argv[0]) - pid_path = edbob.config.get('%s.filemon' % basename, 'pid_path') + def run(self): + + wm = pyinotify.WatchManager() + notifier = pyinotify.Notifier(wm) + + mask = (pyinotify.IN_ACCESS + | pyinotify.IN_ATTRIB + | pyinotify.IN_CLOSE_WRITE + | pyinotify.IN_CREATE + | pyinotify.IN_DELETE + | pyinotify.IN_MODIFY + | pyinotify.IN_MOVED_TO) + + monitored = filemon.get_monitor_profiles(self.appname) + for key, profile in monitored.iteritems(): + + # Create a file queue for the profile. + profile.queue = Queue.Queue() + + # Perform setup for each of the watched folders. + for path in profile.dirs: + + # Maybe put all pre-existing files in the queue. + if profile.process_existing: + filemon.queue_existing(profile, path) + + # Create a watch for the folder. + log.debug("start_daemon: profile '%s' watches folder: %s" % (key, path)) + wm.add_watch(path, mask, proc_fun=EventHandler(profile=profile)) + + # Create an action thread for the profile. + name = 'actions-%s' % key + log.debug("start_daemon: starting action thread: %s" % name) + thread = threading.Thread(target=filemon.perform_actions, + name=name, args=(profile,)) + thread.daemon = True + thread.start() + + # Fire up the watchers. + notifier.loop() + + +def get_daemon(appname=None): + if appname is None: + appname = os.path.basename(sys.argv[0]) + pid_path = edbob.config.get('%s.filemon' % appname, 'pid_path') if not pid_path: - pid_path = '/tmp/%s_filemon.pid' % basename - return pid_path + pid_path = '/tmp/%s_filemon.pid' % appname + + monitor = FileMonitorDaemon(pid_path) + monitor.appname = appname + return monitor -def start_daemon(appname, daemonize=True): - """ - Starts the file monitor daemon. - """ - - pid_path = get_pid_path() - if os.path.exists(pid_path): - print "File monitor is already running" - return - - wm = pyinotify.WatchManager() - notifier = pyinotify.Notifier(wm) - - monitored = get_monitor_profiles(appname) - - mask = (pyinotify.IN_ACCESS | pyinotify.IN_ATTRIB - | pyinotify.IN_CLOSE_WRITE | pyinotify.IN_CREATE - | pyinotify.IN_DELETE | pyinotify.IN_MODIFY - | pyinotify.IN_MOVED_TO) - for profile in monitored.itervalues(): - for path in profile.dirs: - wm.add_watch(path, mask, proc_fun=EventHandler( - actions=profile.actions, locks=profile.locks)) - - if not daemonize: - sys.stderr.write("Starting file monitor. (Press Ctrl+C to quit.)\n") - notifier.loop(daemonize=daemonize, pid_file=pid_path) +def start_daemon(appname): + get_daemon(appname).start() -def stop_daemon(): - """ - Stops the file monitor daemon. - """ - - pid_path = get_pid_path() - if not os.path.exists(pid_path): - print "File monitor is not running" - return - - f = open(pid_path) - pid = f.read().strip() - f.close() - if not pid.isdigit(): - log.warning("stop_daemon: Found bogus PID (%s) in file: %s" % (pid, pid_path)) - return - - os.kill(int(pid), signal.SIGKILL) - os.remove(pid_path) +def stop_daemon(appname): + get_daemon(appname).stop() diff --git a/edbob/filemon/win32.py b/edbob/filemon/win32.py index eca8485..504406c 100644 --- a/edbob/filemon/win32.py +++ b/edbob/filemon/win32.py @@ -33,8 +33,8 @@ import logging import threading import edbob +from edbob import filemon from edbob.errors import email_exception -from edbob.filemon import get_monitor_profiles from edbob.win32 import Service, file_is_free if sys.platform == 'win32': # docs should build for everyone @@ -69,7 +69,7 @@ class FileMonitorService(Service): return False # Read monitor profile(s) from config. - self.monitored = get_monitor_profiles(self.appname) + self.monitored = filemon.get_monitor_profiles(self.appname) # Make sure we have something to do. if not self.monitored: @@ -79,34 +79,36 @@ class FileMonitorService(Service): for key, profile in self.monitored.iteritems(): # Create a file queue for the profile. - queue = Queue.Queue() + profile.queue = Queue.Queue() - # Create a monitor thread for each folder in profile. + # Perform setup for each of the watched folders. for i, path in enumerate(profile.dirs, 1): + + # Maybe put all pre-existing files in the queue. + if profile.process_existing: + filemon.queue_existing(profile, path) + + # Create a monitor thread for the folder. name = 'monitor-%s-%u' % (key, i) log.debug("Initialize: Starting '%s' thread for folder: %s" % (name, path)) - thread = threading.Thread( - target=monitor_files, - name=name, - args=(queue, path, profile)) + thread = threading.Thread(target=monitor_files, + name=name, args=(profile, path)) thread.daemon = True thread.start() # Create an action thread for the profile. name = 'actions-%s' % key log.debug("Initialize: Starting '%s' thread" % name) - thread = threading.Thread( - target=perform_actions, - name=name, - args=(queue, profile)) + thread = threading.Thread(target=filemon.perform_actions, + name=name, args=(profile,)) thread.daemon = True thread.start() return True -def monitor_files(queue, path, profile): +def monitor_files(profile, path): """ Callable target for file monitor threads. """ @@ -138,40 +140,7 @@ def monitor_files(queue, path, profile): winnt.FILE_ACTION_RENAMED_NEW_NAME): log.debug("monitor_files: Queueing '%s' file: %s" % (profile.key, fpath)) - queue.put(fpath) - - -def perform_actions(queue, profile): - """ - Callable target for action threads. - """ - - while True: - - try: - path = queue.get_nowait() - except Queue.Empty: - pass - else: - - while not file_is_free(path): - win32api.Sleep(0) - - for spec, func, args in profile.actions: - - log.info("perform_actions: Calling function '%s' on file: %s" % - (spec, path)) - - try: - func(path, *args) - - except: - log.exception("perform_actions: An exception occurred " - "while processing file: %s" % path) - email_exception() - - # This file probably shouldn't be processed any further. - break + profile.queue.put(fpath) if __name__ == '__main__':