Combining Python Watchdog With Multiprocessing Or Threading
Solution 1:
Here's what I ended up doing, which solved my problem. I used multiprocessing to start a separate watchdog monitoring process for each camera, so that each camera's files are watched separately. Watchdog already queues up new files as they arrive, which is fine for my purposes.
As for point 2 above, I needed, for example, a file2 to be processed before a file1, even though file1 was created first. So when file1 arrives, I check for the output of the file2 processing. If that output exists, file1 is processed; if it doesn't, the handler exits. When file2 is processed, I check whether file1 has already been created and, if so, process file1 then. (Code for this is not shown.)
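Since the author's ordering code is not shown, here is a minimal sketch of how such a check could look. Everything in it is an assumption for illustration: the names output_path, process, handle_file1 and handle_file2 are hypothetical, and "output" is modelled as a marker file with a .done suffix rather than whatever the real pipeline writes.

```python
import os


def output_path(path):
    # Hypothetical convention: processing "x" writes a marker file "x.done".
    return path + '.done'


def process(path):
    # Stand-in for the real processing step; it only writes the marker file.
    with open(output_path(path), 'w') as handle:
        handle.write('processed\n')


def handle_file1(file1, file2):
    '''Process file1 only if the output of file2 already exists.'''
    if not os.path.exists(output_path(file2)):
        # file2 has not been processed yet: exit and let handle_file2 pick file1 up.
        return False
    process(file1)
    return True


def handle_file2(file2, file1):
    '''Process file2, then process file1 if it is already waiting.'''
    process(file2)
    if os.path.exists(file1) and not os.path.exists(output_path(file1)):
        process(file1)
```

In a watchdog handler, these functions would be called from the file-created callback. Note that the exists-then-process check is not atomic, so if file1 and file2 can be handled by different processes at the same moment, some form of locking would be needed on top of this.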
Main Monitoring of Cameras
import time
import multiprocessing as mp

from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler

# initLogger and dosclass (the dosClasses module) are part of the author's own project and are not shown here.

def monitorCam(camera, config, mainlog):
    '''Uses the Watchdog package to monitor the data directory for new files.
    See the MonitorFiles class in dosClasses for actual monitoring code. Monitors each camera.'''
    mainlog.info('Process Name, PID: {0},{1}'.format(mp.current_process().name, mp.current_process().pid))

    # init cam log
    camlog = initLogger(config, filename='manga_dos_{0}'.format(camera))
    camlog.info('Camera {0}, PID {1} '.format(camera, mp.current_process().pid))
    config.camera = camera

    event_handler = dosclass.MonitorFiles(config, camlog, mainlog)

    # add logging to the event handler
    log_handler = LoggingEventHandler()

    # set up observer
    observer = Observer()
    observer.schedule(event_handler, path=config.fitsDir, recursive=False)
    observer.schedule(log_handler, config.fitsDir, recursive=False)
    observer.daemon = True
    observer.start()
    camlog.info('Begin MaNGA DOS!')
    camlog.info('Start watching directory {0} for new files ...'.format(config.fitsDir))
    camlog.info('Watching directory {0} for new files from camera {1}'.format(config.fitsDir, camera))

    # monitor: idle until interrupted, then shut the observer down
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.unschedule_all()
        observer.stop()
        camlog.info('Stop watching directory ...')
        camlog.info('End MaNGA DOS!')
        camlog.info('--------------------------')
        camlog.info('')

    #observer.join()
    if observer.is_alive():
        camlog.info('still alive')
    else:
        camlog.info('thread ending')
Start of Multiple Camera Processes
import multiprocessing as mp

def startProcess(camera, config, log):
    '''Uses the multiprocessing module to start one monitoring process per camera (4 cameras in the original setup).'''
    jobs = []
    #pdb.set_trace()
    #log.info(mp.log_to_stderr(logging.DEBUG))
    for cam in camera:
        log.info('Starting to monitor camera {0}'.format(cam))
        print('Starting to monitor camera {0}'.format(cam))
        try:
            p = mp.Process(target=monitorCam, args=(cam, config, log), name=cam)
            p.daemon = True
            jobs.append(p)
            p.start()
        except KeyboardInterrupt:
            log.info('Ending process: {0} for camera {1}'.format(mp.current_process().pid, cam))
            p.terminate()
            log.info('Terminated: {0}, {1}'.format(p, p.is_alive()))

    # wait for all camera processes to finish
    for job in jobs:
        job.join()
    return
Solution 2:
I'm not sure it would make much sense to use a thread per file. The GIL will probably eliminate any advantage you'd see from doing that, and it might even hurt performance pretty badly and lead to some unexpected behavior. I haven't personally found watchdog to be very reliable. You might consider implementing your own file watcher, which can be done fairly easily, as the Django framework does, by keeping a dict with the last-modified timestamp for each file.
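To make the timestamp-dict idea concrete, here is a minimal polling sketch. It is not Django's code, only an illustration of the approach; the function names snapshot, diff and watch, and the one-second default interval, are assumptions of this example.

```python
import os
import time


def snapshot(directory):
    '''Map each regular file in directory to its last-modified timestamp.'''
    mtimes = {}
    for name in os.listdir(directory):
        path = os.path.join(directory, name)
        try:
            if os.path.isfile(path):
                mtimes[path] = os.stat(path).st_mtime
        except OSError:
            # The file disappeared between listing and stat; skip it.
            continue
    return mtimes


def diff(old, new):
    '''Return (created, modified, deleted) path lists between two snapshots.'''
    created = sorted(p for p in new if p not in old)
    deleted = sorted(p for p in old if p not in new)
    modified = sorted(p for p in new if p in old and new[p] != old[p])
    return created, modified, deleted


def watch(directory, callback, interval=1.0):
    '''Poll directory forever, calling callback(created, modified, deleted) on changes.'''
    previous = snapshot(directory)
    while True:
        time.sleep(interval)
        current = snapshot(directory)
        changes = diff(previous, current)
        if any(changes):
            callback(*changes)
        previous = current
```

A call such as watch('/path/to/data', handler) would then block and report changes roughly once per interval. Polling like this trades some latency and repeated stat calls for simplicity and for not depending on OS-level notification support.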