Work Queue is Copyright (C) 2016- The University of Notre Dame. All rights reserved. This software is distributed under the GNU General Public License. See the file COPYING for details.
Work Queue is a framework for building large scale master-worker applications. Using the Work Queue library, you create a custom master program that defines and submits a large number of small tasks. Each task is distributed to a remote worker process which executes it and returns the results. As results are created, the master may generate more tasks to be executed. It is not unusual to write programs that distribute millions of tasks to thousands of remote workers.
Each worker process is a common executable that can be deployed within existing cluster and cloud systems, so it's easy to deploy a Work Queue application to run on machines that you already have access to. Whether you use a university batch system or a commercial cloud provider, your Work Queue application will be able to run there.
Work Queue is a production framework that has been used to create highly scalable scientific applications in high energy physics, bioinformatics, data mining, and other fields. It can also be used as an execution system for the Makeflow workflow engine. To see some of the Work Queue applications running right now, view the real time status page.
Work Queue is part of the Cooperating Computing Tools (CCTools). The CCTools package can be downloaded from this web page. Follow the installation instructions to set up the CCTools components required for running Work Queue. The documentation for the full set of features of the Work Queue API can be viewed either within the CCTools package or online.
Let's begin by running a simple but complete example of a Work Queue application. After trying it out, we will then show how to write a Work Queue application from scratch.
We assume that you have downloaded and installed the cctools package in your home directory under $HOME/cctools. Next, download the example file for the language of your choice:
gcc work_queue_example.c -o work_queue_example -I${HOME}/cctools/include/cctools -L${HOME}/cctools/lib -lwork_queue -ldttools -lm
export PYTHONPATH=${PYTHONPATH}:${HOME}/cctools/lib/python2.6/site-packages
export PERL5LIB=${PERL5LIB}:${HOME}/cctools/lib/perl5/site_perl
./work_queue_example a b c
listening on port 9123...
submitted task: /usr/bin/gzip < a > a.gz
submitted task: /usr/bin/gzip < b > b.gz
submitted task: /usr/bin/gzip < c > c.gz
waiting for tasks to complete...
work_queue_worker MACHINENAME 9123
If you have access to a Condor pool, you can use this shortcut to submit ten workers at once via Condor:
% condor_submit_workers MACHINENAME 9123 10
Submitting job(s)..........
Logging submit event(s)..........
10 job(s) submitted to cluster 298.
% sge_submit_workers MACHINENAME 9123 10
Your job 153083 ("worker.sh") has been submitted
Your job 153084 ("worker.sh") has been submitted
Your job 153085 ("worker.sh") has been submitted
...
pbs_submit_workers
torque_submit_workers
slurm_submit_workers
ec2_submit_workers
When the master completes, if the workers were not shut down by the master, they will still be available, so you can either run another master with the same workers, or remove them with kill, condor_rm, or qdel as appropriate. If you forget to remove them, they will exit automatically after fifteen minutes. (This timeout can be adjusted with the -t option to work_queue_worker.)
q = work_queue_create(port);
for(all tasks) {
    t = work_queue_task_create(command);
    /* add to the task description */
    work_queue_submit(q,t);
}
while(!work_queue_empty(q)) {
    t = work_queue_wait(q);
    work_queue_task_delete(t);
}
work_queue_delete(q);
q = work_queue_create(port);
q = WorkQueue(port)
In the example, we specify a command that takes a single input file and produces a single output file. We then create a task by providing the specified command as an argument:
t = work_queue_task_create(command);
t = Task(command)
work_queue_task_specify_file(t,"/usr/bin/gzip","gzip",WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
work_queue_task_specify_file(t,infile,infile,WORK_QUEUE_INPUT,WORK_QUEUE_NOCACHE);
work_queue_task_specify_file(t,outfile,outfile,WORK_QUEUE_OUTPUT,WORK_QUEUE_NOCACHE);
t.specify_file("/usr/bin/gzip","gzip",WORK_QUEUE_INPUT,cache=True)
t.specify_file(infile,infile,WORK_QUEUE_INPUT,cache=False)
t.specify_file(outfile,outfile,WORK_QUEUE_OUTPUT,cache=False)
t = work_queue_task_create("$WORK_QUEUE_SANDBOX/gzip < a > a.gz");
t = Task("$WORK_QUEUE_SANDBOX/gzip < a > a.gz")
We can also run a program that is already installed at the remote site, where the worker runs, by specifying its installed location in the command line of the task (and removing the specification of the executable as an input file). For example:
t = work_queue_task_create("/usr/bin/gzip < a > a.gz");
t = Task("/usr/bin/gzip < a > a.gz")
taskid = work_queue_submit(q,t);
taskid = q.submit(t)
t = work_queue_wait(q,5);
t = q.wait(5)
work_queue_task_delete(t);
Deleted automatically when task object goes out of scope
work_queue_delete(q);
Deleted automatically when work_queue object goes out of scope
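Putting the calls above together, the overall structure of the example master can be sketched in Python. This is an illustrative sketch of the gzip example rather than the complete work_queue_example program; it assumes the work_queue module from your CCTools installation is on your PYTHONPATH:

```python
from work_queue import *
import sys

# Create the queue, listening on the default example port.
q = WorkQueue(9123)
print("listening on port %d..." % q.port)

# Submit one compression task per input file given on the command line.
for infile in sys.argv[1:]:
    outfile = infile + ".gz"
    command = "$WORK_QUEUE_SANDBOX/gzip < %s > %s" % (infile, outfile)

    t = Task(command)
    t.specify_file("/usr/bin/gzip", "gzip", WORK_QUEUE_INPUT, cache=True)
    t.specify_file(infile, infile, WORK_QUEUE_INPUT, cache=False)
    t.specify_file(outfile, outfile, WORK_QUEUE_OUTPUT, cache=False)

    q.submit(t)
    print("submitted task: %s" % command)

# Wait until the queue drains; wait() returns a completed task or None.
print("waiting for tasks to complete...")
while not q.empty():
    t = q.wait(5)
    if t:
        print("task %d complete" % t.id)
```

The control flow mirrors the C outline shown earlier: create the queue, submit one task per input, then wait in a loop until the queue is empty.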
The project name feature uses the catalog server to maintain and track the project names of masters and their respective locations. It works as follows: the master advertises its project name along with its hostname and port to the catalog server. Work Queue workers that are provided with the master's project name query the catalog server to find the hostname and port of the master with that project name. To use this feature, the master must be set to run in the WORK_QUEUE_MASTER_MODE_CATALOG mode. See Catalog Servers for details on specifying catalog servers.
For example, to have a Work Queue master advertise its project name as myproject, add the following code snippet after creating the queue:
work_queue_specify_master_mode(q, WORK_QUEUE_MASTER_MODE_CATALOG);
work_queue_specify_name(q, "myproject");
wq.specify_mode(WORK_QUEUE_MASTER_MODE_CATALOG)
wq.specify_name("myproject")
work_queue_worker -N myproject
% condor_submit_workers -N myproject 10
Submitting job(s)..........
Logging submit event(s)..........
10 job(s) submitted to cluster 298.
% sge_submit_workers -N myproject 10
Your job 153097 ("worker.sh") has been submitted
Your job 153098 ("worker.sh") has been submitted
Your job 153099 ("worker.sh") has been submitted
...
Unless otherwise specified, Work Queue assumes that a single task runs on a single worker at a time, and a single worker occupies an entire machine.
However, if you have large multi-core machines and multi-threaded tasks, you will want one worker to manage multiple tasks running on a machine. For example, if you have an 8-core machine, you might want to run four 2-core tasks on a single worker at once, being careful not to exceed the available memory and disk.
Two steps are needed to make this happen. First, adjust your workers to manage multiple cores at once. You can specify the exact number of cores to use like this:
% work_queue_worker --cores 8 MACHINENAME 9123
% work_queue_worker --cores 8 --memory 1000 --disk 8000 MACHINENAME 9123
Second, you must annotate every task in the master with its resource requirements in terms of cores, memory, and disk.
work_queue_task_specify_cores(t, 2);    //needs 2 cores
work_queue_task_specify_memory(t, 100); //needs 100 MB memory
work_queue_task_specify_disk(t, 1000);  //needs 1 GB disk
t.specify_cores(2)     #needs 2 cores
t.specify_memory(100)  #needs 100 MB memory
t.specify_disk(1000)   #needs 1 GB disk
Note that if no requirements are specified, a task consumes an entire worker. All resource requirements must be specified in order to run multiple tasks on a single worker. For example, if you annotate a task as using 1 core but do not specify its memory or disk requirements, Work Queue will only schedule one such task to a two-core worker. However, if you annotate the core, memory, and disk requirements for a task, Work Queue can schedule two such tasks to a two-core worker, assuming it has enough memory and disk for each individual task.
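As a rule of thumb, the number of fully annotated tasks that fit on a worker concurrently is limited by whichever resource runs out first. The following is a simplified model for illustration only (the actual scheduler also accounts for tasks already running on each worker):

```python
def tasks_per_worker(worker, task):
    """Concurrent task count, limited by the scarcest resource."""
    return min(worker[r] // task[r] for r in ("cores", "memory", "disk"))

# An 8-core worker with 1000 MB memory and 8000 MB disk...
worker = {"cores": 8, "memory": 1000, "disk": 8000}
# ...running 2-core tasks that each need 250 MB memory and 1000 MB disk:
task = {"cores": 2, "memory": 250, "disk": 1000}

print(tasks_per_worker(worker, task))  # 4: every resource allows exactly four
```

If the tasks instead needed 600 MB of memory each, the memory bound would drop the count to one, even though six cores remain idle.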
You may also use the --cores, --memory, and --disk options when using batch submission scripts such as condor_submit_workers or slurm_submit_workers, and the script will correctly ask the batch system for an appropriate node.
The only caveat is when using sge_submit_workers, as there are many differences across systems that the script cannot manage. For sge_submit_workers you have to specify both the resources used by the worker (i.e., with --cores, etc.) and the appropriate computing node with the -p option.
For example, say that your local SGE installation requires you to specify the number of cores with the switch -pe smp, and you want workers with 4 cores:
% sge_submit_workers --cores 4 -p "-pe smp 4" MACHINENAME 9123
If you find that there are options needed every time, you can compile CCTools using the --sge-parameter configure option. For example, at Notre Dame we automatically set the number of cores as follows:
% ./configure --sge-parameter '-pe smp $cores'
So that we can simply call:
% sge_submit_workers --cores 4 MACHINENAME 9123
The variables $cores, $memory, and $disk have the values of the options passed to --cores, --memory, and --disk.
A Work Queue foreman allows Work Queue workers to be managed in a hierarchical manner. Each foreman connects to the Work Queue master and accepts tasks as though it were a worker. It then accepts connections from Work Queue workers and dispatches tasks to them as if it were the master.
A setup using foremen is beneficial when there are common files that need to be transmitted to workers and cached for subsequent executions. In this case, the foremen transfer the common files to their workers without requiring any intervention from the master, thereby lowering the communication and transfer overheads at the master.
Foremen are also useful when harnessing resources from multiple clusters. A foreman can be run on the head node of a cluster acting as a single communications hub for the workers in that cluster. This reduces the network connections leaving the cluster and minimizes the transfer costs for sending data into the cluster over wide area networks.
To start a Work Queue foreman, invoke work_queue_worker with the --foreman option. The foreman can advertise a project name using the -f,--foreman-name option to enable workers to find and connect to it without being given its hostname and port. On the other end, the foreman will connect to the master with the project name specified in the -M argument (alternatively, the hostname and port of the master can be provided instead of its project name).
For example, to run a foreman that works for a master with project name myproject and advertises itself as foreman_myproject:
% work_queue_worker -f foreman_myproject -M myproject
To run a worker that connects to a foreman, specify the foreman's project name in the -N option. For example:
% work_queue_worker -N foreman_myproject
By default, Work Queue does not perform any authentication, so any workers will be able to connect to your master, and vice versa. This may be fine for a short running anonymous application, but is not safe for a long running application with a public name.
We recommend that you enable a password for your applications. Create a file (e.g. mypwfile) that contains any password (or other long phrase) that you like (e.g. This is my password). The password should be particular to your application and should not match any other passwords that you own. Note that the contents of the file are taken verbatim as the password; this means that a newline character at the end of the phrase will be considered part of the password.
Then, modify your master program to use the password:
work_queue_specify_password_file(q,"mypwfile");
q.specify_password_file("mypwfile")
Then use the --password option to give the same password file to your workers:
work_queue_worker --password mypwfile MACHINENAME 9123
With this option enabled, both the master and the workers will verify that the other has the matching password before proceeding. The password is not sent in the clear, but is securely verified through a SHA1-based challenge-response protocol.
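The idea behind such a challenge-response exchange can be illustrated in a few lines of Python. This is a conceptual sketch only, not the actual Work Queue wire protocol: each side proves knowledge of the shared password by hashing it together with a random challenge, so the password itself never crosses the network:

```python
import hashlib
import os

def make_challenge():
    # The verifier sends a random nonce to the other side.
    return os.urandom(16)

def respond(password, challenge):
    # The prover hashes the challenge together with the password.
    return hashlib.sha1(challenge + password).hexdigest()

def verify(password, challenge, response):
    # The verifier recomputes the hash and compares.
    return respond(password, challenge) == response

password = b"This is my password"
challenge = make_challenge()
response = respond(password, challenge)

assert verify(password, challenge, response)
assert not verify(b"wrong password", challenge, response)
```

Because the challenge is fresh on every connection, a captured response cannot be replayed later.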
Work Queue can be set up to print debug messages at the master and worker to help troubleshoot failures, bugs, and errors.
When using the C API include the debug.h header to enable the debug messages at the master:
#include <debug.h>

cctools_debug_flags_set("all");
In Perl and Python, simply do:
cctools_debug_flags_set("all");
cctools_debug_flags_set("all")
The all flag causes debug messages from every subsystem called by Work Queue to be printed. More information about the debug flags is here.
You can also redirect the debug messages to a file:
cctools_debug_config_file("wq.debug");
cctools_debug_config_file("wq.debug")
To enable debugging at the worker, set the -d option:
work_queue_worker -d all MACHINENAME 9123
To redirect the debug messages, specify the -o option:
work_queue_worker -d all -o worker.debug MACHINENAME 9123
work_queue_specify_log(q, "mylogfile");
q.specify_log("mylogfile")
% work_queue_graph_log -o myplots mylogfile
% ls
...
myplots.tasks.png  myplots.tasks-log.png  myplots.time.png  myplots.time-log.png
...
The output printed by a task to stdout can be accessed in the output buffer of the work_queue_task struct. The size of this buffer is limited to 1 GB; any output beyond 1 GB will be truncated. If a task may produce more output than that, redirect its stdout to a file and specify that file as an output file of the task using work_queue_task_specify_file (specify_file in Python) as described above.
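For example, the redirection can be done inside the task's command line, with the resulting file declared as a task output. A sketch in Python, where mycmd and output.txt are placeholder names and q is the queue created earlier:

```python
# Redirect stdout (and stderr) to a file inside the remote sandbox,
# then declare that file as an output so the worker returns it.
t = Task("mycmd > output.txt 2>&1")
t.specify_file("output.txt", "output.txt", WORK_QUEUE_OUTPUT, cache=False)
q.submit(t)
```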
work_queue_task_specify_file(t,"a.$OS.$ARCH","a",WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
t.specify_file("a.$OS.$ARCH","a",WORK_QUEUE_INPUT,cache=True)
Note this feature is specifically designed for specifying and distinguishing input file names for different platforms and architectures. Also, this is different from the $WORK_QUEUE_SANDBOX shell environment variable that exports the location of the working directory of the worker to its execution environment.
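Conceptually, the worker derives the $OS and $ARCH substitutions from its operating system and machine hardware name, much like the following sketch using Python's platform module (the worker's exact strings come from uname and may differ from what platform reports):

```python
import platform

def resolve_remote_name(template):
    # Substitute $OS and $ARCH with this machine's values,
    # as a worker does when resolving input file names.
    return (template
            .replace("$OS", platform.system())
            .replace("$ARCH", platform.machine()))

name = resolve_remote_name("a.$OS.$ARCH")
print(name)  # e.g. "a.Linux.x86_64" on a 64-bit Linux machine
```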
t = work_queue_cancel_by_tasktag(q,"task3");
t = q.cancel_by_tasktag("task3")
t = work_queue_wait(q, SECONDS);
/* if t failed due to a worker misconfiguration: */
work_queue_blacklist_add(q, t->hostname);
t = q.wait(SECONDS)
# if t failed due to a worker misconfiguration:
q.blacklist(t.hostname)
$t = work_queue_wait($q, SECONDS);
# if $t failed due to a worker misconfiguration:
work_queue_blacklist_add($q, $t->{hostname});