Skip to content Skip to sidebar Skip to footer

Combining Python Watchdog With Multiprocessing Or Threading

I'm using Python's Watchdog to monitor a given directory for new files being created. When a file is created, some code runs that spawns a subprocess shell command to run differen

Solution 1:

Here's what I ended up doing, which solved my problem. I used multiprocessing to start a separate watchdog monitoring process to watch for each file separately. Watchdog already queues up new files for me, which is fine for me.

As for point 2 above, I needed, e.g. a file2 to process before a file1, even though file1 was created first. So during file1 I check for the output of the file2 processing. If it finds it, it goes ahead processing file1. If it doesn't it exits. On file2 processing, I check to see if file1 was created already, and if so, then process file1. (Code for this not shown)

Main Monitoring of Cameras

def monitorCam(camera, config, mainlog):
    '''Uses the Watchdog package to monitor the data directory for new files.
    See the MonitorFiles class in dosClasses for actual monitoring code.  Monitors each camera.'''

    mainlog.info('Process Name, PID: {0},{1}'.format(mp.current_process().name,mp.current_process().pid))

    #init cam log
    camlog = initLogger(config, filename='manga_dos_{0}'.format(camera))
    camlog.info('Camera {0}, PID {1} '.format(camera,mp.current_process().pid))
    config.camera=camera

    event_handler = dosclass.MonitorFiles(config, camlog, mainlog)

    # add logging the the event handler
    log_handler = LoggingEventHandler()

    # set up observer
    observer = Observer()
    observer.schedule(event_handler, path=config.fitsDir, recursive=False)
    observer.schedule(log_handler, config.fitsDir, recursive=False)
    observer.daemon=True
    observer.start()
    camlog.info('Begin MaNGA DOS!')
    camlog.info('Start watching directory {0} for new files ...'.format(config.fitsDir))
    camlog.info('Watching directory {0} for new files from camera {1}'.format(config.fitsDir,camera))

    # monitor
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.unschedule_all()
        observer.stop()
        camlog.info('Stop watching directory ...')
        camlog.info('End MaNGA DOS!')
        camlog.info('--------------------------')
        camlog.info('')
    #observer.join()

    if observer.is_alive():
        camlog.info('still alive')
    else:
        camlog.info('thread ending')    

Start of Multiple Camera Processes

def startProcess(camera,config,log):
    ''' Uses multiprocessing module to start 4 different camera monitoring processes'''

    jobs=[]

    #pdb.set_trace()

    #log.info(mp.log_to_stderr(logging.DEBUG))
    for i in range(len(camera)):
        log.info('Starting to monitor camera {0}'.format(camera[i]))
        print 'Starting to monitor camera {0}'.format(camera[i])
        try:
            p = mp.Process(target=monitorCam, args=(camera[i],config, log), name=camera[i])
            p.daemon=True
            jobs.append(p)
            p.start()
        except KeyboardInterrupt:
            log.info('Ending process: {0} for camera {1}'.format(mp.current_process().pid, camera[i]))
            p.terminate()
            log.info('Terminated: {0}, {1}'.format(p,p.is_alive()))

    for i in range(len(jobs)):
        jobs[i].join()  

    return      

Solution 2:

I'm not sure it would make much sense to do a thread per file. The GIL will probably eliminate any advantage you'd see from doing that and might even impact performance pretty badly and lead to some unexpected behavior. I haven't personally found watchdog to be very reliable. You might consider implementing your own file watcher which can be done fairly easily as in the django framework (see here) by creating a dict with the modified timestamp for each file.


Post a Comment for "Combining Python Watchdog With Multiprocessing Or Threading"