overhaul filemon.win32 service

This commit is contained in:
Lance Edgar 2012-08-17 08:50:19 -07:00
parent 3adf289652
commit 1d8403a67e
6 changed files with 350 additions and 285 deletions

View file

@ -26,8 +26,13 @@
``edbob.filemon`` -- File Monitoring Service
"""
import os.path
import logging
import edbob
from edbob.exceptions import ConfigError
log = logging.getLogger(__name__)
class MonitorProfile(object):
@ -36,11 +41,69 @@ class MonitorProfile(object):
monitor service.
"""
def __init__(self, key):
def __init__(self, appname, key):
self.appname = appname
self.key = key
self.dirs = eval(edbob.config.require('edbob.filemon', '%s.dirs' % key))
if not self.dirs:
raise ConfigError('edbob.filemon', '%s.dirs' % key)
self.actions = eval(edbob.config.require('edbob.filemon', '%s.actions' % key))
if not self.actions:
raise ConfigError('edbob.filemon', '%s.actions' % key)
self.dirs = edbob.config.require('%s.filemon' % appname, '%s.dirs' % key)
self.dirs = eval(self.dirs)
actions = edbob.config.require('%s.filemon' % appname, '%s.actions' % key)
actions = eval(actions)
self.actions = []
for action in actions:
if isinstance(action, tuple):
spec = action[0]
args = list(action[1:])
else:
spec = action
args = []
func = edbob.load_spec(spec)
self.actions.append((spec, func, args))
def get_monitor_profiles(appname):
"""
Convenience function to load monitor profiles from config.
"""
monitored = {}
# Read monitor profile(s) from config.
keys = edbob.config.require('%s.filemon' % appname, 'monitored')
keys = keys.split(',')
for key in keys:
key = key.strip()
profile = MonitorProfile(appname, key)
monitored[key] = profile
for path in profile.dirs[:]:
# Ensure the monitored path exists.
if not os.path.exists(path):
log.warning("get_monitor_profiles: Profile '%s' has nonexistent "
"path, which will be pruned: %s" % (key, path))
profile.dirs.remove(path)
# Ensure the monitored path is a folder.
elif not os.path.isdir(path):
log.warning("get_monitor_profiles: Profile '%s' has non-folder "
"path, which will be pruned: %s" % (key, path))
profile.dirs.remove(path)
for key in monitored.keys():
profile = monitored[key]
# Prune any profiles with no valid folders to monitor.
if not profile.dirs:
log.warning("get_monitor_profiles: Profile '%s' has no folders to "
"monitor, and will be pruned." % key)
del monitored[key]
# Prune any profiles with no valid actions to perform.
elif not profile.actions:
log.warning("get_monitor_profiles: Profile '%s' has no actions to "
"perform, and will be pruned." % key)
del monitored[key]
return monitored

View file

@ -23,136 +23,213 @@
################################################################################
"""
``edbob.filemon.win32`` -- File Monitor for Windows
``edbob.filemon.win32`` -- File Monitoring Service for Windows
"""
# Much of the Windows monitoring code below was borrowed from Tim Golden:
# http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html
import sys
import os.path
import threading
import sys
import Queue
import logging
import subprocess
import threading
import edbob
from edbob.exceptions import ConfigError
from edbob.errors import email_exception
from edbob.filemon import get_monitor_profiles
from edbob.win32 import file_is_free
if sys.platform == 'win32':
import win32file
if sys.platform == 'win32': # docs should build for everyone
import win32api
import win32con
import win32event
import win32file
import win32service
import win32serviceutil
import winnt
FILE_LIST_DIRECTORY = 0x0001
ACTION_CREATE = 1
ACTION_DELETE = 2
ACTION_UPDATE = 3
ACTION_RENAME_TO = 4
ACTION_RENAME_FROM = 5
log = logging.getLogger(__name__)
def exec_server_command(command):
class FileMonitorService(win32serviceutil.ServiceFramework):
"""
Executes ``command`` against the file monitor Windows service, i.e. one of:
* ``'install'``
* ``'start'``
* ``'stop'``
* ``'remove'``
Implements edbob's file monitor Windows service.
"""
server_path = os.path.join(os.path.dirname(__file__), 'filemon_server.py')
subprocess.call([sys.executable, server_path, command])
_svc_name_ = "Edbob File Monitor"
_svc_display_name_ = "Edbob : File Monitoring Service"
_svc_description_ = ("Monitors one or more folders for incoming files, "
"and performs configured actions as new files arrive.")
appname = 'edbob'
def __init__(self, args):
"""
Constructor.
"""
# super(FileMonitorService, self).__init__(args)
win32serviceutil.ServiceFramework.__init__(self, args)
# Create "wait stop" event, for main worker loop.
self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
def Initialize(self):
"""
Service initialization.
"""
# Read configuration file(s).
edbob.init(self.appname)
# Read monitor profile(s) from config.
self.monitored = get_monitor_profiles(self.appname)
# Make sure we have something to do.
if not self.monitored:
return False
# Create monitor and action threads for each profile.
for key, profile in self.monitored.iteritems():
# Create a file queue for the profile.
queue = Queue.Queue()
# Create a monitor thread for each folder in profile.
for i, path in enumerate(profile.dirs, 1):
name = 'monitor-%s-%u' % (key, i)
log.debug("Initialize: Starting '%s' thread for folder: %s" %
(name, path))
thread = threading.Thread(
target=monitor_files,
name=name,
args=(queue, path, profile))
thread.daemon = True
thread.start()
# Create an action thread for the profile.
name = 'actions-%s' % key
log.debug("Initialize: Starting '%s' thread" % name)
thread = threading.Thread(
target=perform_actions,
name=name,
args=(queue, profile))
thread.daemon = True
thread.start()
return True
def SvcDoRun(self):
"""
This method is invoked when the service starts.
"""
import servicemanager
# Write start occurrence to Windows Event Log.
servicemanager.LogMsg(
servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STARTED,
(self._svc_name_, ''))
# Figure out what we're supposed to be doing.
if self.Initialize():
# Wait infinitely for stop request, while threads do their thing.
log.info("SvcDoRun: All threads started; waiting for stop request.")
win32event.WaitForSingleObject(self.hWaitStop, win32event.INFINITE)
log.info("SvcDoRun: Stop request received.")
else: # Nothing to be done...
msg = "Nothing to do! No valid monitor profiles found in config."
servicemanager.LogWarningMsg(msg)
log.warning("SvcDoRun: %s" % msg)
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
# Write stop occurrence to Windows Event Log.
servicemanager.LogMsg(
servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STOPPED,
(self._svc_name_, ''))
def SvcStop(self):
"""
This method is invoked when the service is requested to stop itself.
"""
# Let the SCM know we're trying to stop.
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
# Let worker loop know its job is done.
win32event.SetEvent(self.hWaitStop)
def monitor_win32(path, include_subdirs=False):
"""
This is the workhorse of file monitoring on the Windows platform. It is a
generator function which yields a ``(file_type, file_path, action)`` tuple
whenever changes occur in the monitored folder. ``file_type`` will be one
of:
* ``'file'``
* ``'folder'``
* ``'<deleted>'``
``file_path`` will be the path to the changed object; and ``action`` will
be one of:
* ``ACTION_CREATE``
* ``ACTION_DELETE``
* ``ACTION_UPDATE``
* ``ACTION_RENAME_TO``
* ``ACTION_RENAME_FROM``
(The above are "constants" importable from ``edbob.filemon``.)
This function leverages the ``ReadDirectoryChangesW()`` Windows API
function. This is nice because the OS now reports changes to us so that we
needn't poll to find them.
However ``ReadDirectoryChangesW()`` is a blocking call, so in practice this
means that if, say, you're using this function in a python shell, you will
not be able to stop the process with a keyboard interrupt. (Actually you
sort of can, as long as you send the interrupt and then perform some file
operation which will trigger the monitoring function to return.)
Fortunately the primary need for this function is by the Windows service,
which is of course "disconnected" from the user's desktop and never really
needs to close under normal circumstances.
def monitor_files(queue, path, profile):
"""
Callable target for file monitor threads.
"""
hDir = win32file.CreateFile(
path,
FILE_LIST_DIRECTORY,
winnt.FILE_LIST_DIRECTORY,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
win32con.FILE_FLAG_BACKUP_SEMANTICS,
None)
if hDir == win32file.INVALID_HANDLE_VALUE:
log.warning("monitor_files: Can't open directory with CreateFile(): %s" % path)
return
while True:
results = win32file.ReadDirectoryChangesW (
results = win32file.ReadDirectoryChangesW(
hDir,
1024,
include_subdirs,
win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
win32con.FILE_NOTIFY_CHANGE_SIZE |
win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
win32con.FILE_NOTIFY_CHANGE_SECURITY,
None,
None)
False,
win32con.FILE_NOTIFY_CHANGE_FILE_NAME)
for action, fn in results:
fpath = os.path.join(path, fn)
if not os.path.exists(fpath):
ftype = "<deleted>"
elif os.path.isdir(fpath):
ftype = "folder"
else:
ftype = "file"
yield ftype, fpath, action
log.debug("monitor_files: ReadDirectoryChangesW() results: %s" % results)
for action, fname in results:
fpath = os.path.join(path, fname)
if action in (winnt.FILE_ACTION_ADDED,
winnt.FILE_ACTION_RENAMED_NEW_NAME):
log.debug("monitor_files: Queueing '%s' file: %s" %
(profile.key, fpath))
queue.put(fpath)
class WatcherWin32(threading.Thread):
def perform_actions(queue, profile):
"""
A ``threading.Thread`` subclass which is responsible for monitoring a
particular folder (on Windows platforms).
Callable target for action threads.
"""
def __init__(self, key, path, queue, include_subdirs=False, **kwargs):
threading.Thread.__init__(self, **kwargs)
self.setDaemon(1)
self.key = key
self.path = path
self.queue = queue
self.include_subdirs = include_subdirs
self.start()
while True:
def run(self):
for result in monitor_win32(self.path,
include_subdirs=self.include_subdirs):
self.queue.put((self.key,) + result)
try:
path = queue.get_nowait()
except Queue.Empty:
pass
else:
while not file_is_free(path):
win32api.Sleep(0)
for spec, func, args in profile.actions:
log.info("perform_actions: Calling function '%s' on file: %s" %
(spec, path))
try:
func(path, *args)
except:
log.exception("perform_actions: An exception occurred "
"while processing file: %s" % path)
email_exception()
# This file probably shouldn't be processed any further.
break
if __name__ == '__main__':
win32serviceutil.HandleCommandLine(FileMonitorService)

View file

@ -1,164 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
################################################################################
#
# edbob -- Pythonic Software Framework
# Copyright © 2010-2012 Lance Edgar
#
# This file is part of edbob.
#
# edbob is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# edbob is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for
# more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with edbob. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
``edbob.filemon_server`` -- File Monitoring Service for Windows
"""
import os.path
import sys
import socket
import time
import Queue
import logging
from traceback import format_exception
import edbob
from edbob.filemon import MonitorProfile
from edbob.filemon.win32 import WatcherWin32, ACTION_CREATE, ACTION_UPDATE
from edbob.win32 import file_is_free
if sys.platform == 'win32': # docs should build for everyone
import win32serviceutil
import win32service
import win32event
import servicemanager
import win32api
log = logging.getLogger(__name__)
class FileMonitorService(win32serviceutil.ServiceFramework):
"""
Implements edbob's file monitor Windows service.
"""
_svc_name_ = "Edbob File Monitor"
_svc_display_name_ = "Edbob : File Monitoring Service"
_svc_description_ = ("Monitors one or more folders for incoming files, "
"and performs configured actions as new files arrive.")
appname = 'edbob'
def __init__(self, *args):
win32serviceutil.ServiceFramework.__init__(self, *args)
self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
socket.setdefaulttimeout(60)
self.stop_requested = False
def SvcStop(self):
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
win32event.SetEvent(self.hWaitStop)
self.stop_requested = True
def SvcDoRun(self):
servicemanager.LogMsg(servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STARTED,
(self._svc_name_, ''))
edbob.init(self.appname)
self.main()
def main(self):
self.monitored = {}
monitored = edbob.config.require('edbob.filemon', 'monitored')
monitored = monitored.split(',')
for key in monitored:
key = key.strip()
profile = MonitorProfile(key)
self.monitored[key] = profile
log.debug("Monitoring profile '%s': %s" % (key, profile.dirs))
for path in profile.dirs:
if not os.path.exists(path):
log.warning("Path does not exist: %s" % path)
queue = Queue.Queue()
for key in self.monitored:
for d in self.monitored[key].dirs:
WatcherWin32(key, d, queue)
while not self.stop_requested:
try:
key, ftype, fpath, action = queue.get_nowait()
except Queue.Empty:
pass
else:
log.debug("Got notification: %s, %s, %s" % (key, ftype, fpath))
# if ftype == 'file' and action in (
# ACTION_CREATE, ACTION_UPDATE):
if ftype == 'file' and action == ACTION_CREATE:
self.do_actions(key, fpath)
win32api.SleepEx(250, True)
servicemanager.LogMsg(servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STOPPED,
(self._svc_name_, ''))
def do_actions(self, key, path):
if not os.path.exists(path):
return
while not file_is_free(path):
# TODO: Add configurable timeout so long-open files can't hijack
# our prcessing.
win32api.SleepEx(250, True)
for action in self.monitored[key].actions:
if isinstance(action, tuple):
func = action[0]
args = action[1:]
else:
func = action
args = []
func = edbob.load_spec(func)
try:
func(path, *args)
except:
exc_info = sys.exc_info()
# Call the system exception hook in case anything special has
# been registered there, e.g. if edbob.errors.init() has
# happened. Note that this is especially necessary since
# PythonService.exe doesn't seem to honor sys.excepthook.
sys.excepthook(*exc_info)
# Go ahead and write exception info to the Windows Event Log
# while we're at it.
msg = "File monitor action failed.\n"
msg += "\n"
msg += "Profile: %s\n" % key
msg += "Action: %s\n" % action
msg += "File Path: %s\n" % path
msg += "\n"
msg += ''.join(format_exception(*exc_info))
servicemanager.LogErrorMsg(msg)
# Don't re-raise the exception since the service should
# continue running despite any problems it encounters. But
# this file probably shouldn't be processed any further.
break
if __name__ == '__main__':
win32serviceutil.HandleCommandLine(FileMonitorService)