Python Scripts for Multiple Tasks on a Node

Python

Python is a scripting language that provides a uniform interface for scripts that can be run on either Linux or Windows machines. The scripts on this page are designed to let you run several tasks simultaneously on a node in order to make effective use of its multiple cores. Unless you have explicitly enabled multithreading in your code, it will run on a single core, using only 12.5% of the CPU power of an 8-core node. If it also uses less than 2 GB of RAM, you can probably run 8 copies of it simultaneously and get a nearly 8-fold increase in the computation per node hour. Another possibility, for which no example is given here, is to run a number of different tasks simultaneously. This option is often difficult to use effectively because the tasks are likely to require different amounts of time to execute, leaving some cores idle after the shorter-running tasks exit.

Using multiple instances of the same task

There is one caveat about running multiple tasks simultaneously: each must have a way to determine what work it is supposed to do, and each should name its output files uniquely so that collisions don't occur. In particular, if two programs try to open the same output file for writing, the second open will fail. There are several ways to keep multiple tasks from stepping on each other. One possibility is for them to work in different directories: you could create several directories and have each instance of the program work in a different one. Another is to run a master/worker model in which the workers wait for data and are fed by a master, to which they return their results. Examples of both approaches are given below.
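For example, a simple way to guarantee unique output names is to embed the task's process ID (PID) in the file name, since every running task has a distinct PID. Here is a minimal sketch of the idea; the file name pattern is just an illustration:

import os

# Build an output file name that is unique to this running instance;
# the PID distinguishes simultaneous tasks on the same node
outname = "results_%d.txt" % os.getpid()
out = open(outname,'w')
out.write("output from task with PID %d\n" % os.getpid())
out.close()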

Multiple tasks using different directories

Here we assume for concreteness that you have a directory named "work" that has a number of subdirectories containing input files for a task we'll call "myworker". The script below starts up to num copies of "myworker", each in a separate directory, and starts another "myworker" each time one completes, until all subdirectories of "work" have been processed.
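If you need to generate such a layout, a short helper script can create the subdirectories and place a "myworker.sh" in each one. The following is only a sketch: the subdirectory names ("case_0", "case_1", ...), the input file, and the command inside "myworker.sh" are all hypothetical and should be adapted to your own task.

import os,stat

workdir = "work"
for i in range(8):                                  # One subdirectory per case (hypothetical)
    d = os.path.join(workdir,"case_%d" % i)
    os.makedirs(d)
    inp = open(os.path.join(d,"input.dat"),'w')     # Each case gets its own input file...
    inp.write("parameters for case %d\n" % i)
    inp.close()
    w = os.path.join(d,"myworker.sh")               # ...and its own worker script
    ws = open(w,'w')
    ws.write("#!/bin/sh\n./mycode input.dat\n")     # Hypothetical command to run the task
    ws.close()
    os.chmod(w,stat.S_IRWXU)                        # Make the worker script executable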

# Script for running multiple copies of an executable on a single batch node
# John Zollweg, Cornell Center for Advanced Computing, April 28, 2009
# Updated for Python 2.6 and Windows compatibility by Steve Lantz, CAC, February 3, 2010
# Usage: processdirs.py dir num   --e.g.,  processdirs.py H:\work 8

import os,sys,time
from subprocess import *

if len(sys.argv) < 3:
    print """processdirs.py runs a number of copies of a code, each in a different directory
    Usage:  processdirs.py dir num
    Where:  dir is the full path to the subdirectories to be processed
            num is the maximum number of tasks to have running at a time"""
    sys.exit()

tasks = []                  # An empty list to hold the handles of the tasks that are running
ntasks = int(sys.argv[2])   # The maximum number of tasks to have running
workdir = sys.argv[1]       # The directory containing the subdirectories with work to be done
worklist = os.listdir(workdir)      # Directory listing of workdir (files will be ignored)
for w in worklist:                  # Iterate through the listing, looking for directories
    ww = os.path.join(workdir,w)
    if os.path.isdir(ww):
        while len(tasks) == ntasks: # Wait for a task to finish if number of tasks is at max
            time.sleep(10.0)        # Sleep for 10 seconds before polling the tasks
                                    # Keeps script from tying up a core with excessive polling
            for t in tasks:         # Iterate through the tasks
                if t.poll() is not None:            # True for a task that has completed
                    stuff = t.communicate()         # Tuple containing stdout and stderr
                    sys.stdout.write(stuff[0])      # Copy task stdout to script stdout
                    sys.stderr.write(stuff[1])      # Copy task stderr to script stderr
                    tasks.remove(t)                 # Remove completed task from list
                    break           # Takes us out of while-loop too, len(tasks) != ntasks now
        wwfile = os.path.join(ww,"myworker.sh")             # Windows might have "myworker.bat"
        tasks.append(Popen([wwfile],bufsize=1000000,cwd=ww,stdout=PIPE,stderr=PIPE))
            # To provide args to "myworker.bat", simply append args to the sequence in Popen
            # The bufsize can be adjusted, but if it's too small, this script will deadlock
            # Here when directory listing is complete; now wait for the running tasks to finish
while len(tasks) > 0:               # Do this until all tasks complete
    time.sleep(10.0)                # Sleep between polls so the script doesn't tie up a core
    for t in tasks:                 # Iterate through the tasks
        if t.poll() is not None:            # True only for a task that has completed
            stuff = t.communicate()         # Tuple containing stdout and stderr
            sys.stdout.write(stuff[0])      # Copy task stdout to script stdout
            sys.stderr.write(stuff[1])      # Copy task stderr to script stderr
            tasks.remove(t)                 # Remove completed task from list
            break                   # Doesn't take us out of while-loop unless no tasks are left

Multiple tasks as master/worker

Tasks started using the standard subprocess module in Python can communicate with the task that started them using pipes, as in the example above, but the piped output from a task is available only after it has completed, so it is not possible to interact with tasks started by subprocess.Popen(). However, a module available at ActiveState allows interaction with the spawned tasks. To use it, download it and place it in a suitable directory in your filespace, then add that directory to the environment variable PYTHONPATH. My copy is called subprocessinter.py in the sample below.
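If you prefer not to set PYTHONPATH, an equivalent approach is to extend the module search path at the top of the script itself. A minimal sketch, assuming the module was saved in a (hypothetical) directory named ~/pylib:

import os,sys

# Make ~/pylib importable; adjust the path to wherever you saved subprocessinter.py
sys.path.insert(0,os.path.expanduser("~/pylib"))

from subprocessinter import Popen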

In this example, my worker program is designed to listen for the name of an xml file on stdin. When it receives a name, it opens and processes the file, returning output on stdout. It is also designed to quit when the name it receives is "-". I pass the worker's instance number to it as an argument so it can use it to uniquely name a log file. That isn't required -- it could use its PID in the log file name instead. There are many other ways of telling a worker what to do.
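Here is a minimal sketch of what such a worker might look like. The protocol -- read a file name from stdin, write a result to stdout, exit on "-" -- matches what master.py below expects, but the actual "processing" (counting the lines in each file) is purely hypothetical:

# myworker.py (hypothetical worker) - usage: python myworker.py index
import sys

log = open("worker_%s.log" % sys.argv[1],'w')  # Instance number gives the log a unique name
while True:
    name = sys.stdin.readline().strip()   # Wait for the master to send a file name
    if name == "-":                       # "-" is the signal to quit
        break
    f = open(name)
    nlines = len(f.readlines())           # Hypothetical processing: count the lines
    f.close()
    log.write("processed %s\n" % name)
    sys.stdout.write("%s: %d lines\n" % (name,nlines))
    sys.stdout.flush()                    # Flush so the master sees the result promptly
log.close()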

# script for running multiple workers on a single batch node
# John Zollweg, Cornell Center for Advanced Computing, April 29, 2009
# usage: master.py dir num
# i.e., master.py work 8

import os,subprocess,sys,time
from subprocessinter import Popen

if len(sys.argv) < 3:
    print """master.py starts a number of copies of a code, feeds them data, and receives results
             Usage:  master.py dir num
             Where:  dir is the directory (relative to where this script is running) containing the files to be processed
                     num is the number of worker tasks to run"""
    sys.exit()

nworkers = int(sys.argv[2])   # Number of workers
worker = "./myworker.py"      # Path to the worker program - a placeholder, adjust to suit your code
fin = "-\n"     # String that tells a worker to exit
tasks = []      # List for task handles
qs = []         # List recording what has been sent to each worker
numactive = 0   # Number of workers running
target = -1     # Index of the worker we will communicate with
for i in range(nworkers):     # Loop to start the workers
    i_s = "%d" % i            # Convert the loop index to a string
    # Start a worker, passing its index as an argument; append any other arguments it needs
    tasks.append(Popen([worker,i_s],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE))
    qs.append('')   # Record that nothing has been sent to this worker yet
    numactive += 1  # Increment the counter of running workers
files = os.listdir(sys.argv[1])    # list the files in the working directory
for f in files:     # Iterate over the files
    if f.endswith('.xml'):  # Our workers deal only with xml files - other files aren't sent
        recd = False     # Flag for determining whether a worker is free
        while not recd:  # Look for a worker that is free or has results for us
            target = (target+1)%nworkers  # Do round-robin on the workers
            if len(qs[target]) > 0:   # Has this worker been sent anything?
                time.sleep(0.01)      # Pause briefly before each check so we don't spend all our time polling
                done = tasks[target].recv()       # Check for a result from this worker
                if done:        # A result was received
                    print done  # Do something with the result - qs[target] is a reminder of what this worker worked on
                    recd = True # This worker is eligible for the present work
            else:
                recd = True     # This worker hasn't been sent anything yet, so it's eligible for this work
        sent = tasks[target].send(f+'\n')  # Send the filename to the worker - we don't check that it was sent, but you probably ought to
        qs[target] = f      # Remember what this worker is working on (needed for the checks above)
for i in range(nworkers):  # First tell any workers that were never sent work to quit,
    if qs[i] == '':        # so that the loop below can terminate
        tasks[i].send(fin)
        numactive -= 1
while numactive > 0: # Look for workers that have results for us as long as any are still active
    time.sleep(0.01)              # Pause briefly between polls
    target = (target+1)%nworkers  # Do round-robin on the workers
    if len(qs[target]) > 0:   # Is this worker still working on anything?
        done = tasks[target].recv()       # Check for a result from this worker
        if done:        # A result was received
            print done  # Do something with the result - qs[target] is a reminder of what this worker worked on
            tasks[target].send(fin)   # Tell the worker to quit
            qs[target] = ''  # Mark the worker as idle so we don't try to receive from it again
            numactive -= 1   # Decrement the number of active workers
for t in tasks:  # All results have been received and all workers have been told to quit; clean up
    t.wait()

Variants for streaming large volumes of data to stdout

When lots of data are being written to stdout, it may be unwise to pipe everything directly to the stdout of the parent Python script. Internal buffers handling the I/O will quickly fill up, causing the Python program to freeze. There are two ways to avoid this: (1) write the stdout of each task to a separate file; (2) unload the buffer of each task in turn by using the communicate() method, without polling.
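To see why the freeze occurs, consider the following deliberately broken sketch. The child process writes far more data than an OS pipe buffer can hold (often around 64 KB), while the parent waits for the child to exit without draining the pipe; the child blocks on the full pipe, the parent blocks in wait(), and neither makes progress.

import sys
from subprocess import Popen,PIPE

# Child writes ~10 MB to its stdout, which is piped back to this script
child = Popen([sys.executable,"-c","print 'x' * 10000000"],stdout=PIPE)
child.wait()     # DEADLOCK: the child is blocked writing, the parent is blocked waiting
# Using child.communicate() instead of child.wait() avoids this, because it
# drains the pipe while waiting - which is what strategy #2 below relies on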

The following variant of processdirs.py utilizes strategy #1. It writes the stdout of each task to a separate file.

# Script for running multiple copies of an executable on a single batch node
# John Zollweg, Cornell Center for Advanced Computing, April 28, 2009
# Updated for Python 2.6 and Windows compatibility by Steve Lantz, CAC, February 3, 2010
# This variant sends stdout and stderr to files instead of piping everything to the parent -
# Prevents deadlock when tasks of variable duration are writing lots of data to stdout
# Usage: processdirs-nopipes.py dir num   --e.g.,  processdirs-nopipes.py H:\work 8

import os,sys,time
from subprocess import *

if len(sys.argv) < 3:
    print """processdirs-nopipes.py runs copies of a code, each in a different directory
    Usage:  processdirs-nopipes.py dir num
    Where:  dir is the full path to the subdirectories to be processed
            num is the maximum number of tasks to have running at a time"""
    sys.exit()

tasks = []                  # An empty list to hold the handles of the tasks that are running
taskout = dict()            # Dictionary to store file objects receiving stdout from each task
taskerr = dict()            # Dictionary to store file objects receiving stderr from each task
ntasks = int(sys.argv[2])   # The maximum number of tasks to have running
workdir = sys.argv[1]       # The directory containing the subdirectories with work to be done
worklist = os.listdir(workdir)      # Directory listing of workdir (files will be ignored)
for w in worklist:                  # Iterate through the listing, looking for directories
    ww = os.path.join(workdir,w)
    if os.path.isdir(ww):
        while len(tasks) == ntasks: # Wait for a task to finish if number of tasks is at max
            time.sleep(10.0)        # Sleep for 10 seconds before polling the tasks
                                    # Keeps script from tying up a core with excessive polling
            for t in tasks:         # Iterate through the tasks
                if t.poll() is not None:            # True for a task that has completed
                    taskout[t].close()              # Close stdout file for this task
                    taskerr[t].close()              # Close stderr file for this task
                    tasks.remove(t)                 # Remove completed task from list
                    break           # Takes us out of while-loop too, len(tasks) != ntasks now
        wwfile = os.path.join(ww,"myworker.sh")             # Windows might have "myworker.bat"
        fout = open(os.path.join(ww,"out.txt"),'w')         # Open stdout file for this task
        ferr = open(os.path.join(ww,"err.txt"),'w')         # Open stderr file for this task
        t = Popen([wwfile],cwd=ww,stdout=fout,stderr=ferr)  # Start a task
        taskout[t] = fout           # Key the file objects by the task handle, so the right
        taskerr[t] = ferr           # files can be closed when that particular task completes
        tasks.append(t)
            # To provide args to "myworker.bat", simply append args to the sequence in Popen
            # Here when directory listing is complete; now wait for the running tasks to finish
while len(tasks) > 0:               # Do this until all tasks complete
    time.sleep(10.0)                # Sleep between polls so the script doesn't tie up a core
    for t in tasks:                 # Iterate through the tasks
        if t.poll() is not None:            # True only for a task that has completed
            taskout[t].close()              # Close stdout file for this task
            taskerr[t].close()              # Close stderr file for this task
            tasks.remove(t)                 # Remove completed task from list
            break                   # Doesn't take us out of while-loop unless no tasks are left

Here's another variant of processdirs.py that utilizes strategy #2. All calls to the poll() method are removed. The communicate() method alone is used to unload the buffer of each task, in FIFO order.

# Script for running multiple copies of an executable on a single batch node
# John Zollweg, Cornell Center for Advanced Computing, April 28, 2009
# Updated for Python 2.6 and Windows compatibility by Steve Lantz, CAC, January 28, 2010
# This variant waits for tasks to complete in FIFO order rather than polling them repeatedly -
# Prevents deadlock when tasks of similar duration are writing lots of data to stdout
# Usage: processdirs-nopoll.py dir num   --e.g.,  processdirs-nopoll.py H:\work 8

import os,sys
from subprocess import *

if len(sys.argv) < 3:
    print """processdirs-nopoll.py runs copies of a code, each in a different directory
    Usage:  processdirs-nopoll.py dir num
    Where:  dir is the full path to the subdirectories to be processed
            num is the maximum number of tasks to have running at a time"""
    sys.exit()

tasks = []                  # An empty list to hold the handles of the tasks that are running
ntasks = int(sys.argv[2])   # The maximum number of tasks to have running
workdir = sys.argv[1]       # The directory containing the subdirectories with work to be done
worklist = os.listdir(workdir)      # Directory listing of workdir (files will be ignored)
for w in worklist:                  # Iterate through the listing, looking for directories
    ww = os.path.join(workdir,w)
    if os.path.isdir(ww):
        while len(tasks) == ntasks: # Wait for a task to finish if number of tasks is at max
            tfifo = tasks.pop(0)            # Pop oldest task from list (first in, first out)
            stuff = tfifo.communicate()     # Wait for task to return tuple with stdout, stderr
            sys.stdout.write(stuff[0])      # Copy task stdout to script stdout
            sys.stderr.write(stuff[1])      # Copy task stderr to script stderr
        wwfile = os.path.join(ww,"myworker.sh")             # Windows might have "myworker.bat"
        tasks.append(Popen([wwfile],cwd=ww,stdout=PIPE,stderr=PIPE))    # Start a task
            # To provide args to "myworker.bat", simply append args to the sequence in Popen
            # Here when directory listing is complete; now wait for the running tasks to finish
while len(tasks) > 0:       # Do this until all tasks complete
    tfifo = tasks.pop(0)            # Remove oldest task from list (first in, first out)
    stuff = tfifo.communicate()     # Wait for this task to complete
    sys.stdout.write(stuff[0])      # Copy task stdout to script stdout
    sys.stderr.write(stuff[1])      # Copy task stderr to script stderr