Overhauled file monitors.

Added the "process existing" (defaults to True) and "stop on error"
(defaults to False) features to both Linux and Win32 file monitors.
Both features may be overridden in config.  The Linux file monitor was
rewritten as an ``edbob.daemon.Daemon`` class.
This commit is contained in:
Lance Edgar 2013-02-16 17:11:02 -08:00
parent 7ae344403a
commit 896a320da5
4 changed files with 204 additions and 125 deletions

View file

@ -432,11 +432,6 @@ class FileMonitorCommand(Subcommand):
uninstall = subparsers.add_parser('uninstall', help="Uninstall (remove) service") uninstall = subparsers.add_parser('uninstall', help="Uninstall (remove) service")
uninstall.set_defaults(subcommand='remove') uninstall.set_defaults(subcommand='remove')
else:
parser.add_argument('-D', '--dont-daemonize',
action='store_false', dest='daemonize',
help="Don't daemonize when starting")
def get_win32_module(self): def get_win32_module(self):
from edbob.filemon import win32 from edbob.filemon import win32
return win32 return win32
@ -454,10 +449,10 @@ class FileMonitorCommand(Subcommand):
from edbob.filemon import linux as filemon from edbob.filemon import linux as filemon
if args.subcommand == 'start': if args.subcommand == 'start':
filemon.start_daemon(self.appname, daemonize=args.daemonize) filemon.start_daemon(self.appname)
elif args.subcommand == 'stop': elif args.subcommand == 'stop':
filemon.stop_daemon() filemon.stop_daemon(self.appname)
elif sys.platform == 'win32': elif sys.platform == 'win32':
from edbob import win32 from edbob import win32

View file

@ -26,10 +26,18 @@
``edbob.filemon`` -- File Monitoring Service ``edbob.filemon`` -- File Monitoring Service
""" """
import os
import os.path import os.path
import sys
import Queue
import logging import logging
import edbob import edbob
from edbob.errors import email_exception
if sys.platform == 'win32':
import win32api
from edbob.win32 import file_is_free
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -65,6 +73,12 @@ class MonitorProfile(object):
self.locks = edbob.config.getboolean( self.locks = edbob.config.getboolean(
'%s.filemon' % appname, '%s.locks' % key, default=False) '%s.filemon' % appname, '%s.locks' % key, default=False)
self.process_existing = edbob.config.getboolean(
'%s.filemon' % appname, '%s.process_existing' % key, default=True)
self.stop_on_error = edbob.config.getboolean(
'%s.filemon' % appname, '%s.stop_on_error' % key, default=False)
def get_monitor_profiles(appname): def get_monitor_profiles(appname):
""" """
@ -110,3 +124,105 @@ def get_monitor_profiles(appname):
del monitored[key] del monitored[key]
return monitored return monitored
def queue_existing(profile, path):
"""
Adds files found in a watched folder to a processing queue. This is called
when the monitor first starts, to handle the case of files which exist
prior to startup.
If files are found, they are first sorted by modification timestamp, using
a lexical sort on the filename as a tie-breaker, and then added to the
queue in that order.
:param profile: Monitor profile for which the folder is to be watched. The
profile is expected to already have a queue attached; any existing files
will be added to this queue.
:type profile: :class:`edbob.filemon.MonitorProfile` instance
:param path: Folder path which is to be checked for files.
:type path: string
:returns: ``None``
"""
def sorter(x, y):
mtime_x = os.path.getmtime(x)
mtime_y = os.path.getmtime(y)
if mtime_x < mtime_y:
return -1
if mtime_x > mtime_y:
return 1
return cmp(x, y)
paths = [os.path.join(path, x) for x in os.listdir(path)]
for path in sorted(paths, cmp=sorter):
# Only process normal files.
if not os.path.isfile(path):
continue
# If using locks, don't process "in transit" files.
if profile.locks and path.endswith('.lock'):
continue
log.debug("queue_existing: queuing existing file for "
"profile '%s': %s" % (profile.key, path))
profile.queue.put(path)
def perform_actions(profile):
"""
Callable target for action threads.
"""
keep_going = True
while keep_going:
try:
path = profile.queue.get_nowait()
except Queue.Empty:
pass
else:
# In some cases, processing one file may cause other related files
# to also be processed. When this happens, a path on the queue may
# point to a file which no longer exists.
if not os.path.exists(path):
log.info("perform_actions: path does not exist: %s" % path)
continue
log.debug("perform_actions: processing file: %s" % path)
if sys.platform == 'win32':
while not file_is_free(path):
win32api.Sleep(0)
for spec, func, args in profile.actions:
log.info("perform_actions: calling function '%s' on file: %s" %
(spec, path))
try:
func(path, *args)
except:
log.exception("perform_actions: exception occurred "
"while processing file: %s" % path)
email_exception()
# Don't process any more files if the profile is so
# configured.
if profile.stop_on_error:
keep_going = False
# Either way this particular file probably shouldn't be
# processed any further.
log.warning("perform_actions: no further processing "
"will be done for file: %s" % path)
break
log.warning("perform_actions: error encountered, and configuration "
"dictates that no more actions will be processed for "
"profile: %s" % profile.key)

View file

@ -27,14 +27,16 @@
""" """
import sys import sys
import os
import os.path import os.path
import signal
import logging
import pyinotify import pyinotify
import threading
import Queue
import logging
import edbob import edbob
from edbob.filemon import get_monitor_profiles from edbob import filemon
from edbob.daemon import Daemon
from edbob.errors import email_exception
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -45,9 +47,8 @@ class EventHandler(pyinotify.ProcessEvent):
Event processor for file monitor daemon. Event processor for file monitor daemon.
""" """
def my_init(self, actions=[], locks=False, **kwargs): def my_init(self, profile=None, **kwargs):
self.actions = actions self.profile = profile
self.locks = locks
def process_IN_ACCESS(self, event): def process_IN_ACCESS(self, event):
log.debug("EventHandler: IN_ACCESS: %s" % event.pathname) log.debug("EventHandler: IN_ACCESS: %s" % event.pathname)
@ -57,87 +58,85 @@ class EventHandler(pyinotify.ProcessEvent):
def process_IN_CLOSE_WRITE(self, event): def process_IN_CLOSE_WRITE(self, event):
log.debug("EventHandler: IN_CLOSE_WRITE: %s" % event.pathname) log.debug("EventHandler: IN_CLOSE_WRITE: %s" % event.pathname)
if not self.locks: if not self.profile.locks:
self.perform_actions(event.pathname) self.profile.queue.put(event.pathname)
def process_IN_CREATE(self, event): def process_IN_CREATE(self, event):
log.debug("EventHandler: IN_CREATE: %s" % event.pathname) log.debug("EventHandler: IN_CREATE: %s" % event.pathname)
def process_IN_DELETE(self, event): def process_IN_DELETE(self, event):
log.debug("EventHandler: IN_DELETE: %s" % event.pathname) log.debug("EventHandler: IN_DELETE: %s" % event.pathname)
if self.locks and event.pathname.endswith('.lock'): if self.profile.locks and event.pathname.endswith('.lock'):
self.perform_actions(event.pathname[:-5]) self.profile.queue.put(event.pathname[:-5])
def process_IN_MODIFY(self, event): def process_IN_MODIFY(self, event):
log.debug("EventHandler: IN_MODIFY: %s" % event.pathname) log.debug("EventHandler: IN_MODIFY: %s" % event.pathname)
def process_IN_MOVED_TO(self, event): def process_IN_MOVED_TO(self, event):
log.debug("EventHandler: IN_MOVED_TO: %s" % event.pathname) log.debug("EventHandler: IN_MOVED_TO: %s" % event.pathname)
if not self.locks: if not self.profile.locks:
self.perform_actions(event.pathname) self.profile.queue.put(event.pathname)
def perform_actions(self, path):
for spec, func, args in self.actions:
func(path, *args)
def get_pid_path(): class FileMonitorDaemon(Daemon):
"""
Returns the path to the PID file for the file monitor daemon.
"""
basename = os.path.basename(sys.argv[0]) def run(self):
pid_path = edbob.config.get('%s.filemon' % basename, 'pid_path')
wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm)
mask = (pyinotify.IN_ACCESS
| pyinotify.IN_ATTRIB
| pyinotify.IN_CLOSE_WRITE
| pyinotify.IN_CREATE
| pyinotify.IN_DELETE
| pyinotify.IN_MODIFY
| pyinotify.IN_MOVED_TO)
monitored = filemon.get_monitor_profiles(self.appname)
for key, profile in monitored.iteritems():
# Create a file queue for the profile.
profile.queue = Queue.Queue()
# Perform setup for each of the watched folders.
for path in profile.dirs:
# Maybe put all pre-existing files in the queue.
if profile.process_existing:
filemon.queue_existing(profile, path)
# Create a watch for the folder.
log.debug("start_daemon: profile '%s' watches folder: %s" % (key, path))
wm.add_watch(path, mask, proc_fun=EventHandler(profile=profile))
# Create an action thread for the profile.
name = 'actions-%s' % key
log.debug("start_daemon: starting action thread: %s" % name)
thread = threading.Thread(target=filemon.perform_actions,
name=name, args=(profile,))
thread.daemon = True
thread.start()
# Fire up the watchers.
notifier.loop()
def get_daemon(appname=None):
if appname is None:
appname = os.path.basename(sys.argv[0])
pid_path = edbob.config.get('%s.filemon' % appname, 'pid_path')
if not pid_path: if not pid_path:
pid_path = '/tmp/%s_filemon.pid' % basename pid_path = '/tmp/%s_filemon.pid' % appname
return pid_path
monitor = FileMonitorDaemon(pid_path)
monitor.appname = appname
return monitor
def start_daemon(appname, daemonize=True): def start_daemon(appname):
""" get_daemon(appname).start()
Starts the file monitor daemon.
"""
pid_path = get_pid_path()
if os.path.exists(pid_path):
print "File monitor is already running"
return
wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm)
monitored = get_monitor_profiles(appname)
mask = (pyinotify.IN_ACCESS | pyinotify.IN_ATTRIB
| pyinotify.IN_CLOSE_WRITE | pyinotify.IN_CREATE
| pyinotify.IN_DELETE | pyinotify.IN_MODIFY
| pyinotify.IN_MOVED_TO)
for profile in monitored.itervalues():
for path in profile.dirs:
wm.add_watch(path, mask, proc_fun=EventHandler(
actions=profile.actions, locks=profile.locks))
if not daemonize:
sys.stderr.write("Starting file monitor. (Press Ctrl+C to quit.)\n")
notifier.loop(daemonize=daemonize, pid_file=pid_path)
def stop_daemon(): def stop_daemon(appname):
""" get_daemon(appname).stop()
Stops the file monitor daemon.
"""
pid_path = get_pid_path()
if not os.path.exists(pid_path):
print "File monitor is not running"
return
f = open(pid_path)
pid = f.read().strip()
f.close()
if not pid.isdigit():
log.warning("stop_daemon: Found bogus PID (%s) in file: %s" % (pid, pid_path))
return
os.kill(int(pid), signal.SIGKILL)
os.remove(pid_path)

View file

@ -33,8 +33,8 @@ import logging
import threading import threading
import edbob import edbob
from edbob import filemon
from edbob.errors import email_exception from edbob.errors import email_exception
from edbob.filemon import get_monitor_profiles
from edbob.win32 import Service, file_is_free from edbob.win32 import Service, file_is_free
if sys.platform == 'win32': # docs should build for everyone if sys.platform == 'win32': # docs should build for everyone
@ -69,7 +69,7 @@ class FileMonitorService(Service):
return False return False
# Read monitor profile(s) from config. # Read monitor profile(s) from config.
self.monitored = get_monitor_profiles(self.appname) self.monitored = filemon.get_monitor_profiles(self.appname)
# Make sure we have something to do. # Make sure we have something to do.
if not self.monitored: if not self.monitored:
@ -79,34 +79,36 @@ class FileMonitorService(Service):
for key, profile in self.monitored.iteritems(): for key, profile in self.monitored.iteritems():
# Create a file queue for the profile. # Create a file queue for the profile.
queue = Queue.Queue() profile.queue = Queue.Queue()
# Create a monitor thread for each folder in profile. # Perform setup for each of the watched folders.
for i, path in enumerate(profile.dirs, 1): for i, path in enumerate(profile.dirs, 1):
# Maybe put all pre-existing files in the queue.
if profile.process_existing:
filemon.queue_existing(profile, path)
# Create a monitor thread for the folder.
name = 'monitor-%s-%u' % (key, i) name = 'monitor-%s-%u' % (key, i)
log.debug("Initialize: Starting '%s' thread for folder: %s" % log.debug("Initialize: Starting '%s' thread for folder: %s" %
(name, path)) (name, path))
thread = threading.Thread( thread = threading.Thread(target=monitor_files,
target=monitor_files, name=name, args=(profile, path))
name=name,
args=(queue, path, profile))
thread.daemon = True thread.daemon = True
thread.start() thread.start()
# Create an action thread for the profile. # Create an action thread for the profile.
name = 'actions-%s' % key name = 'actions-%s' % key
log.debug("Initialize: Starting '%s' thread" % name) log.debug("Initialize: Starting '%s' thread" % name)
thread = threading.Thread( thread = threading.Thread(target=filemon.perform_actions,
target=perform_actions, name=name, args=(profile,))
name=name,
args=(queue, profile))
thread.daemon = True thread.daemon = True
thread.start() thread.start()
return True return True
def monitor_files(queue, path, profile): def monitor_files(profile, path):
""" """
Callable target for file monitor threads. Callable target for file monitor threads.
""" """
@ -138,40 +140,7 @@ def monitor_files(queue, path, profile):
winnt.FILE_ACTION_RENAMED_NEW_NAME): winnt.FILE_ACTION_RENAMED_NEW_NAME):
log.debug("monitor_files: Queueing '%s' file: %s" % log.debug("monitor_files: Queueing '%s' file: %s" %
(profile.key, fpath)) (profile.key, fpath))
queue.put(fpath) profile.queue.put(fpath)
def perform_actions(queue, profile):
"""
Callable target for action threads.
"""
while True:
try:
path = queue.get_nowait()
except Queue.Empty:
pass
else:
while not file_is_free(path):
win32api.Sleep(0)
for spec, func, args in profile.actions:
log.info("perform_actions: Calling function '%s' on file: %s" %
(spec, path))
try:
func(path, *args)
except:
log.exception("perform_actions: An exception occurred "
"while processing file: %s" % path)
email_exception()
# This file probably shouldn't be processed any further.
break
if __name__ == '__main__': if __name__ == '__main__':