Submitting a workflow to Condor via SOAP using Java

November 2, 2009 by spinningmatt

In Condor, a workflow is called a DAG, which stands for directed acyclic graph. DAGs in Condor are managed by condor_dagman processes. condor_dagman is a program that takes a description of a DAG as input and walks it, submitting jobs and monitoring their progress. The condor_dagman process itself is often submitted to Condor and managed by the Schedd like any other job. The only special thing about condor_dagman jobs is that they run on the same machine as the Schedd, typically in Condor's Scheduler Universe. Historical note: the first job submitted to Condor via SOAP was a DAG.

Here’s a recipe for submitting a DAG to Condor via SOAP:

0) Get your favorite SOAP library. Here we’ll use Apache Axis: http://www.apache.org/dyn/closer.cgi/ws/axis/1_4

You’ll want to untar it:

$ tar zxf axis-bin-1_4.tar.gz

1) Generate and compile your SOAP library code

# To avoid passing -classpath to java and javac, set CLASSPATH in your env
$ export CLASSPATH=.:$(echo axis-1_4/lib/*.jar | tr ' ' ':')

# Have Axis generate Java stubs
$ java org.apache.axis.wsdl.WSDL2Java file:///usr/share/condor/webservice/condorSchedd.wsdl

# Compile the stubs
$ javac condor/*.java

2) Write a DAG. This one is pretty simple; it's a diamond: D depends on B and C, and B and C depend on A:

~/dagman/diamond.dag:
Job  A  A.submit
Job  B  B.submit
Job  C  C.submit
Job  D  D.submit
PARENT A CHILD B C
PARENT B C CHILD D

~/dagman/A.submit:
executable = /bin/sleep
arguments = 112
output = A.out
log = diamond.log
queue

~/dagman/B.submit:
executable = /bin/sleep
arguments = 358
output = B.out
log = diamond.log
queue

~/dagman/C.submit:
executable = /bin/sleep
arguments = 132
output = C.out
log = diamond.log
queue

~/dagman/D.submit:
executable = /bin/sleep
arguments = 134
output = D.out
log = diamond.log
queue
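condor_dagman does far more than this (retries, throttling, rescue DAGs), but the core of "walking" the diamond is a dependency-ordered traversal. Here is a minimal sketch of that walk, assuming the DAG file contains only JOB and PARENT ... CHILD lines; parse_dag and walk_order are hypothetical names, not part of Condor. Each batch is a set of nodes whose parents have all finished, so everything in a batch could be submitted at once.

```python
from collections import defaultdict

# The diamond.dag shown above.
DIAMOND = """\
Job  A  A.submit
Job  B  B.submit
Job  C  C.submit
Job  D  D.submit
PARENT A CHILD B C
PARENT B C CHILD D
"""


def parse_dag(text):
    """Return (jobs, edges): node -> submit file, and parent -> children."""
    jobs = {}
    edges = defaultdict(set)
    for line in text.splitlines():
        words = line.split()
        if not words or words[0].startswith("#"):
            continue
        keyword = words[0].upper()  # the example mixes "Job" and "PARENT"
        if keyword == "JOB":
            jobs[words[1]] = words[2]
        elif keyword == "PARENT":
            split = [w.upper() for w in words].index("CHILD")
            for parent in words[1:split]:
                edges[parent].update(words[split + 1:])
    return jobs, edges


def walk_order(jobs, edges):
    """Kahn's algorithm: return batches of nodes that are ready together."""
    indegree = {name: 0 for name in jobs}
    for children in edges.values():
        for child in children:
            indegree[child] += 1
    ready = sorted(n for n, d in indegree.items() if d == 0)
    batches = []
    while ready:
        batches.append(ready)
        next_ready = []
        for node in ready:
            for child in edges.get(node, ()):
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)
    if sum(len(b) for b in batches) != len(jobs):
        raise ValueError("cycle detected: not a DAG")
    return batches


if __name__ == "__main__":
    print(walk_order(*parse_dag(DIAMOND)))
```

For the diamond this yields A first, then B and C together, then D.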

3) Write a program that will submit your DAG via SOAP.

The Schedd's Submit() function takes a ClassAd representing a job. What condor_submit does is take a submit file and convert it into a ClassAd. To look at a submit file for a DAG, run condor_submit_dag -no_submit diamond.dag and read diamond.dag.condor.sub. That will give you a hint at what kind of environment condor_dagman wants to run in. Pay particular attention to remove_kill_sig, arguments, environment and on_exit_remove.

However, diamond.dag.condor.sub is not a ClassAd. To see the ClassAd you can run diamond.dag.condor.sub through condor_submit. Do so with condor_submit -dump diamond.dag.condor.sub.ad diamond.dag.condor.sub. Have a look at diamond.dag.condor.sub.ad and note RemoveKillSig, Arguments, Env and OnExitRemove.
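To pull those attributes out of the dump programmatically, a small parser is enough. This is a sketch that assumes the dumped file holds one Name = Value attribute per line with blank lines between ads; parse_long_form and pick are hypothetical helpers, and values are kept as raw ClassAd text rather than parsed. Lookups are case-insensitive because ClassAd attribute names are.

```python
DAGMAN_ATTRS = ("RemoveKillSig", "Arguments", "Env", "OnExitRemove")


def parse_long_form(text):
    """Split long-form ClassAd text into a list of {name: raw value} dicts.

    Assumes one 'Name = Value' per line and a blank line between ads;
    values are kept as unparsed ClassAd text.
    """
    ads, current = [], {}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            if current:
                ads.append(current)
                current = {}
            continue
        # Split on the first '=', so expressions like '=?=' stay in the value.
        name, sep, value = line.partition("=")
        if sep:  # skip anything that is not an attribute line
            current[name.strip()] = value.strip()
    if current:
        ads.append(current)
    return ads


def pick(ad, names):
    """Look names up case-insensitively, as ClassAd attribute names are."""
    lowered = {key.lower(): value for key, value in ad.items()}
    return {name: lowered.get(name.lower()) for name in names}


if __name__ == "__main__":
    import sys

    for path in sys.argv[1:]:  # e.g. diamond.dag.condor.sub.ad
        with open(path) as f:
            for ad in parse_long_form(f.read()):
                print(pick(ad, DAGMAN_ATTRS))
```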

Now you have the basis for what a DAG job needs to run. In the example code I used a little extra knowledge to generate the ClassAd, for brevity. You can instead use the Schedd's CreateJobTemplate function to help generate the ClassAd for you. Extending arrays is kinda annoying in Java, but cake in Python.

Start with this example: CondorSubmitDAG.java

4) Configure your condor_schedd to accept SOAP requests. The basic configuration you will need is:

ENABLE_SOAP = TRUE
ALLOW_SOAP = *
QUEUE_ALL_USERS_TRUSTED = TRUE
SCHEDD_ARGS = -p 1984

This configuration lets anyone talk to your Schedd and submit jobs as any user. For deployment, you should restrict this access by narrowing ALLOW_SOAP and by setting QUEUE_ALL_USERS_TRUSTED to FALSE. Note: changing QUEUE_ALL_USERS_TRUSTED requires that clients can authenticate themselves via SSL.

This configuration also gives you a fixed port, 1984, for the condor_schedd. Otherwise the port is ephemeral, and you’ll have to query the Collector to find it.

5) Compile your submission program and submit your DAG.

$ javac CondorSubmitDAG.java

$ java CondorSubmitDAG http://localhost:1984 soapmonkey /some/shared/space/where/you/put/dagman/diamond.dag

You can watch the DAG run with condor_q and condor_q -dag. Don't be afraid of the ???? fields condor_q shows for the DAG itself; that's just because I pruned the job ad to the bare minimum needed to run, and condor_q expects a few extra attributes to be present. If you use CreateJobTemplate you'll get all the attributes condor_q wants.

condor_master for managing processes

October 21, 2009 by spinningmatt
#!/usr/bin/env python

from socket import socket, AF_INET, SOCK_DGRAM
import struct

def send_alive(pid, timeout=300, master_host="127.0.0.1", master_port=1271):
    """
    Send a UDP packet to the condor_master containing the
    DC_CHILDALIVE command.

    This will have the master register a trigger to fire in timeout
    seconds. When the trigger fires the pid will be killed. Each time
    the DC_CHILDALIVE is sent the trigger's timer is reset to fire in
    timeout seconds.

    DC_CHILDALIVE should be sent every timeout/3 seconds.
    """
    sock = socket(AF_INET, SOCK_DGRAM)
    sock.sendto(build_message(pid, timeout), (master_host, master_port))
    sock.close()

def build_message(pid, timeout):
    """
    Build a datagram packet to send to the condor_master.

    The packet format is (command, pid, timeout). The command is
    always DC_CHILDALIVE (the integer 60008). The pid is the pid of
    the process the master is monitoring, i.e. getpid() of this
    script. The timeout is the amount of time, in seconds, the master
    will wait before killing the pid. Each field in the packet must be
    8 bytes long, in network (big-endian) byte order.
    """
    DC_CHILDALIVE = 60008

    # "!qqq" packs three 8-byte integers in network byte order. For
    # values below 65536 this is byte-for-byte the same as three
    # 16-bit zeros of padding followed by htons(value), but it also
    # handles pids above 65535 and works on Python 2 and 3.
    return struct.pack("!qqq", DC_CHILDALIVE, pid, timeout)

#
# The condor_master is a daemon that can run arbitrary executables,
# monitor them for failures, restart them, watch for executable
# updates, and send obituary emails.
#
# The condor_master can monitor any program it starts for abnormal
# termination, e.g. return code != 0 or caused by a signal. It can
# also detect hung processes if they periodically send DC_CHILDALIVE
# commands.
#
# The condor_master cleans up process trees, not just the processes it
# directly started.
#
# Example usage...
#
# Run this script from the condor_master, e.g.
#  env CONDOR_CONFIG=ONLY_ENV \
#      _CONDOR_WATCH_UDP_COMMAND_SOCKET=TRUE \
#      _CONDOR_NETWORK_INTERFACE=127.0.0.1 \
#      _CONDOR_MASTER_LOG=master.log \
#      _CONDOR_MASTER=$(which condor_master) \
#      _CONDOR_PROG=$(which <this script>) \
#      _CONDOR_PROG_ARGS="-some -args 3" \
#      _CONDOR_DAEMON_LIST=MASTER,PROG \
#      condor_master -p 1271 -pidfile master.pid
#
# At some point kill -STOP or -TERM this script and watch the
# condor_master react.
#
# condor_master configuration/features:
#
#  CONDOR_CONFIG=ONLY_ENV
#    - useful, only look in the ENV for config, no config file needed
#
#  _CONDOR_WATCH_UDP_COMMAND_SOCKET=TRUE
#    - required, make master listen for UDP commands
#
#  _CONDOR_NETWORK_INTERFACE=<ip>
#    - useful, make master listen on <ip>
#
#  _CONDOR_MASTER_LOG=<file>
#    - required, the master's log file
#
#  _CONDOR_MASTER_DEBUG=<level>
#    - D_ALWAYS is the default; D_FULLDEBUG shows a lot more
#
#  _CONDOR_MASTER=<path to condor_master>
#    - required, master will restart itself if its executable is
#      updated
#
#  _CONDOR_PROG=<path to this script>
#    - required, the path to an executable the master will start and
#      monitor
#
#  _CONDOR_PROG_ARGS=<arguments>
#    - useful, if the executable needs the master to pass it arguments
#
#  _CONDOR_DAEMON_LIST=MASTER,PROG
#    - required, list of executables the master will monitor
#
#  _CONDOR_MAIL=/bin/mail
#    - useful, no default, if email notification of PROG hang/crash is
#      desired. don't specify and get no emails.
#
#  _CONDOR_CONDOR_ADMIN=admin@fqdn
#    - useful, the address to send emails to
#
#  _CONDOR_MASTER_BACKOFF_CONSTANT, C, default 9 seconds
#  _CONDOR_MASTER_BACKOFF_FACTOR, K, default 2
#  _CONDOR_MASTER_BACKOFF_CEILING, T, default 3600 seconds
#  _CONDOR_MASTER_RECOVER_FACTOR, default 500 seconds
#    - useful, parameters to control the exponential backoff
#      start delay = min(T, C + K^n),
#        n is the # of restarts since last recovery
#        a recovery is a run of RECOVER_FACTOR without a crash
#
#  _CONDOR_MASTER_CHECK_NEW_EXEC_INTERVAL=<seconds>
#    - useful, default 300, how often to check executable timestamps
#      and potentially restart processes
#
#  _CONDOR_MASTER_NEW_BINARY_DELAY=<seconds>
#    - useful, default 120, time waited after noticing a timestamp
#      change before restarting the executable
#
#  condor_master -p <port> -pidfile <file>
#    - specify a port for the master to listen on, default is
#      ephemeral, and specify a file where the master writes its pid,
#      default is nowhere
#      also: -t (log to stdout) -f (run in foreground)
#
# Advanced note: condor_squawk can send DC_CHILDALIVE as well, e.g.
#    $ echo "command DC_CHILDALIVE 1234 15" | condor_squawk "<127.0.0.1:1271>"
#

import time, os

timeout = 30
while True:
    send_alive(os.getpid(), timeout)
    time.sleep(timeout/3)
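Two things in the script are easy to check offline: what a DC_CHILDALIVE datagram looks like on the wire, and how the restart backoff grows. A sketch, assuming the three 8-byte network-order fields described in build_message's docstring; decode_alive and start_delay are hypothetical helpers (decode_alive could sit behind a stand-in UDP listener when no condor_master is around), and start_delay simply evaluates the min(T, C + K^n) formula from the comments with their defaults.

```python
import struct

DC_CHILDALIVE = 60008


def decode_alive(packet):
    """Unpack (pid, timeout) from a DC_CHILDALIVE datagram.

    Assumes three 8-byte big-endian integers: command, pid, timeout.
    """
    command, pid, timeout = struct.unpack("!qqq", packet)
    if command != DC_CHILDALIVE:
        raise ValueError("unexpected command %d" % command)
    return pid, timeout


def start_delay(n, constant=9, factor=2, ceiling=3600):
    """Restart delay from the comments: min(T, C + K**n).

    n is the number of restarts since the last recovery.
    """
    return min(ceiling, constant + factor ** n)


if __name__ == "__main__":
    for n in range(0, 13, 3):
        print(n, start_delay(n))
```

With the defaults the delay grows from about 10 seconds to the one-hour ceiling after a dozen restarts without a recovery.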

Live Migration: How it could work

August 25, 2009 by spinningmatt

The Condor infrastructure historically prefers jobs that run on a fixed set of resources. A parallel universe job may run on multiple resources at once, but won't add or remove resources dynamically. All other jobs just run on a single resource.

Condor has long had functionality to periodically checkpoint a process and resume from a checkpoint. Historically the functionality was part of the standard universe. It is also available in the virtual machine universe.

The checkpoint and resume functionality can provide a basis for migration. For instance, a job may run, checkpoint, be evicted and then be run again. The second time, the job will start from the last checkpoint to minimize badput. This is a powerful mode of operation that handles many use cases. One use case not handled is live migration. At issue is the "be evicted" step in the process.

Migration is about moving a process from one resource to another. Whenever a process migrates there will be some interruption in its execution. Live migration is about minimizing that interruption.

An eviction in Condor is when an executing job is removed from the resource where it is running and placed back into the queue as an idle job, ready to be scheduled and run again. This process often takes tens of seconds or more. It involves terminating the job and transferring any state off the execution resource, waiting for a negotiation cycle where a new resource is found for the job, transferring the job's state to the new resource, and starting the job.

This is clearly not live migration, and it is not intended to be.

Live migration could be something that Condor’s scheduler and execution infrastructure becomes deeply aware of, but the existing infrastructure may already be enough. Generally, it is preferable to write general, instead of specialized, features into the infrastructure. It is better to either show the infrastructure simply cannot handle a use case and needs to be extended, or show that it can handle a use case but an extension will simplify the case and simplify or enable others.

Imagine,

0) job A is submitted
1) job A starts running on resource X
2) job B is submitted, containing knowledge of job A
3) job B starts running on resource Y

Now, job A and B are special in that A knows how to transfer its state to someone else when asked and B knows how to ask A to transfer its state.

4) job B asks job A to transfer state and execution to it
5) after the transfer, job B starts executing and job A terminates

In the end, the new job B has taken over for job A, which may no longer exist.

The semantics jobs A and B must provide are not very complex. The details of implementing them may well be. Many research and production technologies have tried to address this problem generally. One such technology, currently popular, is virtualization. Any virtualization technology worth its salt can handle migration between resources, often with minimal execution interruption.

Condor’s virtual machine universe uses the libvirt API, which provides it the ability to direct different hypervisors in performing migrations. Jobs that run on Condor can do the same. To actually do virtual machine migration in Condor a number of details would need to be worked out: who is allowed to initiate the migration; who should be terminating the source job (job A); what to do about different network topologies; how to most transparently link the source and destination (job B) jobs; how to be tolerant to faults before, during and after transfer; how to evaluate and pick a particular technology for a particular workload; what general enhancements can or should go into the infrastructure; … . It can be done, and not everything must be done to have a useful migration solution.

When it comes to general features for the infrastructure, one noticeable downside to the procedure above is that anything tracking job A has to know that job A's termination signals the need to track job B instead, rather than a normal termination. The transfer operation between jobs A and B has to be robust. It must handle the cases where the transfer fails, potentially due to either job A or B being evicted. It is likely not the case that jobs A and B should both think they are the owner of their shared state. All this information could be discernible at the tracking level, or it could be completely invisible. Often it is desirable for it to be completely invisible. Making the information invisible means providing a mechanism to link jobs A and B as a single unit in the infrastructure. Also, being invisible cannot involve hiding or swallowing critical information.
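The handoff in steps 4 and 5, plus the single-owner concern, can be modeled in a few lines. This is an in-process toy under stated assumptions, not how Condor or any hypervisor does it: MigratingJob and request_takeover are hypothetical, the transfer function stands in for whatever actually moves state, and a failed transfer leaves job A as the sole owner and still running. A real system would also need the distributed fault tolerance described above, since here the ownership flip is trivially atomic.

```python
class MigratingJob:
    """Toy stand-in for jobs A and B; 'state' is whatever must move."""

    def __init__(self, name, state=None):
        self.name = name
        self.state = state
        self.owns_state = state is not None
        self.running = True

    def request_takeover(self, source, transfer):
        """Steps 4-5: ask `source` for its state via `transfer`.

        Returns True if this job now owns the state and `source` has
        terminated, False if the transfer failed and `source` carries on.
        """
        if not source.owns_state:
            raise RuntimeError("%s does not own any state" % source.name)
        try:
            copied = transfer(source.state)
        except Exception:
            # Transfer failed: A keeps ownership and keeps running.
            return False
        # Exactly one owner afterwards: B takes over, A terminates.
        self.state, self.owns_state = copied, True
        source.owns_state, source.running = False, False
        return True


if __name__ == "__main__":
    a, b = MigratingJob("A", state={"step": 42}), MigratingJob("B")
    print(b.request_takeover(a, dict), a.running, b.state)
```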

Another potential infrastructure enhancement is a means to link two jobs, though they may not be presented as a single unit. For instance, job B could be linked with job A meaning it can find out where job A is running, and job A could contain policy to officially terminate once job B has taken over execution. The ability to link jobs is actually one that is often requested. For instance, being able to have a set of jobs that always run on separate resources is quite desirable. Simplistically, the possibility of a STARTD_SLOT_ATTRS for jobs could be quite powerful.

Beyond tracking and sharing, the ability to have a multi-homed or multi-execution job could attack the common long tail in large workloads. When only a few jobs are left to be run, they could be matched and executed on multiple resources, with all but the first to complete being discarded.
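The "first to complete wins" idea is easy to sketch locally with Python's concurrent.futures, using threads as stand-ins for execution resources. This is only an illustration of the pattern, not a Condor feature: run_first_to_finish and fake_job are hypothetical, and the resources are just simulated run times. Threads cannot be killed, so the slower copies run to completion and their results are ignored; in Condor the duplicates would instead be removed.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_first_to_finish(task, resources):
    """Start task(resource) on every resource; keep only the first result.

    Returns (winning resource, its result).
    """
    pool = ThreadPoolExecutor(max_workers=len(resources))
    futures = {pool.submit(task, r): r for r in resources}
    done, pending = wait(futures, return_when=FIRST_COMPLETED)
    for f in pending:
        f.cancel()  # only prevents work that has not started yet
    pool.shutdown(wait=False)
    winner = next(iter(done))
    return futures[winner], winner.result()


def fake_job(seconds):
    """Stand-in for a job whose run time depends on where it landed."""
    time.sleep(seconds)
    return "finished after %.2fs" % seconds


if __name__ == "__main__":
    print(run_first_to_finish(fake_job, [0.3, 0.05, 0.2]))
```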

Yet another enhancement that could improve migration support is the ability for a job to be matched with a resource while it is already executing. For instance, a running job might have ended up on a low-ranked execution resource. It is stuck running there until it completes or gets evicted. It cannot express policy that states: I'll run on my current resource until I complete, unless a higher ranked resource shows up in the first 5 minutes of my execution, in which case I want to be restarted on the other resource.

All these potential Condor enhancements would help in implementing or optimizing a migration solution, but are not specific to migration and do not necessarily involve deep knowledge about migration to be introduced to the scheduler or execution resources.

Condor and the Cloud @ Open Source Cloud Computing Forum, 22 July 2009

July 24, 2009 by spinningmatt

Red Hat hosted an Open Source Cloud Computing forum on 22 July 2009. A number of technologies for building clouds were presented, including my session on Condor. The proceedings of the forum will be available online for 12 months. The Condor and the Cloud presentation is also attached to this blog.

Submitting to Condor from QMF: A minimal Vanilla Universe job

April 30, 2009 by spinningmatt

Part of Red Hat’s MRG product is a management system that covers Condor. At the core of that system is the Qpid Management Framework (QMF), built on top of AMQP. Condor components modeled in QMF allow for information retrieval and control injection. I previously discussed How QMF Submission to Condor Could work, and now there’s a prototype of that interface.

Jobs in Condor are represented as ClassAds, a.k.a. job ads, which are essentially schema-free. Any schema they have is defined by the code paths that handle jobs. This includes not only the names of attributes but also the types of their values. For instance, a job ad does not have to have an attribute that describes what program to execute, the Cmd attribute whose value is a string, but only if it does can the job actually run.

Who cares what’s in a job ad?

Almost all components in a Condor system handle jobs in one way or another, which means they all help define what a job ad looks like. To complicate things a bit, different components handle jobs in different ways depending on the job's type or requested features. For simplicity I'm only going to discuss jobs that want to run in the Vanilla Universe with minimal extra features, e.g. no file transfer requests.

The Schedd is the first component that deals with jobs. Its purpose is to manage them. It insists all jobs it manages have: an Owner string, the identity of the job's owner; a ClusterId integer, an identifier assigned by the Schedd; a ProcId integer, an identifier within the ClusterId, also assigned by the Schedd; a JobStatus integer, specifying the state the job is in, e.g. Idle, Held, Running; and a JobUniverse integer, denoting part of a job's type, e.g. Vanilla Universe or Grid Universe.

The Negotiator is sent jobs by the Schedd to perform matching with machines. To do that matching it requires that a job have a Requirements expression.

The Shadow helps the Schedd manage a job while it is running. The Schedd gives the job ad to the Shadow, and the Shadow insists on finding an Iwd string representing a path to the job's initial working directory.

The Startd handles jobs similarly to the Negotiator. Since matching is bi-directional in Condor and execution resources get final say in running a job, the Startd evaluates the job’s Requirements expression before accepting it.

The Starter is responsible for actually running the job. It requires that a job have the Cmd string attribute. Without one the Starter does not know what program to run. An additional requirement the Starter imposes is the validity of the Owner string. If the Starter is to impersonate the Owner, then the string must specify an identity known on the Starter's machine.

Now, those are not the only attributes on a job ad. Often components will fill in sane defaults for attributes they need but are not present. For instance, the Shadow will fill in the TransferFiles string, and the Schedd will assume a MaxHosts integer.

What about tools?

It's not just the components that manage jobs that care about attributes on a job. The condor_q command-line tool also expects to find certain attributes to help it display information. In addition to those attributes already described, it requires: a QDate integer, the number of seconds since the Unix epoch when the job was submitted; a RemoteUserCpu float, an attribute set while a job runs; a JobPrio integer, specifying the job's relative priority to other jobs; and an ImageSize integer, specifying the amount of memory used by the job in KiB.

What does a submitter care about?

Anyone who wants to submit a job to Condor is going to have to specify enough attributes on the job ad to make all the Condor components happy. Those attributes will vary depending on the type of job and the features it desires. Luckily Condor can fill in many attributes with sane defaults. For instance, condor_q wants a QDate and a RemoteUserCpu. Those are two attributes that the submitter should not be able to or have to specify.

To perform a simple submission for running a pre-staged program, the job ad needs: a Cmd, a Requirements, a JobUniverse, an Iwd, and an Owner. Additionally, an Args string is possible if the Cmd takes arguments.
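The walkthrough above boils down to a table of which component wants which attribute, which makes a pre-submission check easy to sketch. REQUIRED_BY and FILLED_IN below are transcribed from this post (Vanilla Universe, no file transfer), not read from Condor's source, and missing_for_submit is a hypothetical helper; treating JobPrio and ImageSize as defaulted follows from the five-attribute minimal ad listed above.

```python
# Attributes each component expects, per the walkthrough above.
REQUIRED_BY = {
    "Schedd": ["Owner", "ClusterId", "ProcId", "JobStatus", "JobUniverse"],
    "Negotiator": ["Requirements"],
    "Shadow": ["Iwd"],
    "Startd": ["Requirements"],
    "Starter": ["Cmd", "Owner"],
    "condor_q": ["QDate", "RemoteUserCpu", "JobPrio", "ImageSize"],
}

# Assigned by the Schedd or defaulted by Condor; a submitter never sends these.
FILLED_IN = {"ClusterId", "ProcId", "JobStatus", "QDate", "RemoteUserCpu",
             "JobPrio", "ImageSize"}


def missing_for_submit(ad):
    """Return {component: [attrs]} the submitter still has to supply."""
    present = {name.lower() for name in ad}  # ClassAd names ignore case
    report = {}
    for component, names in REQUIRED_BY.items():
        missing = [n for n in names
                   if n not in FILLED_IN and n.lower() not in present]
        if missing:
            report[component] = missing
    return report


if __name__ == "__main__":
    print(missing_for_submit({"Cmd": "/bin/true"}))
```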

Given the knowledge of what attributes are required on a job ad, and using the QMF Python Console Tutorial, I was able to quickly write up an example program to submit a job via QMF.

#!/usr/bin/env python

from qmf.console import Session
from sys import exit

(EXPR_TYPE, INTEGER_TYPE, FLOAT_TYPE, STRING_TYPE) = (0, 1, 2, 3)
UNIVERSE = {"VANILLA": 5, "SCHEDULER": 7, "GRID": 9, "JAVA": 10, "PARALLEL": 11, "LOCAL": 12, "VM": 13}
JOB_STATUS = ("", "Idle", "Running", "Removed", "Completed", "Held", "")

ad = {"Cmd":          {"TYPE": STRING_TYPE,  "VALUE": "/bin/sleep"},
      "Args":         {"TYPE": STRING_TYPE,  "VALUE": "120"},
      "Requirements": {"TYPE": EXPR_TYPE,    "VALUE": "TRUE"},
      "JobUniverse":  {"TYPE": INTEGER_TYPE, "VALUE": "%s" % (UNIVERSE["VANILLA"],)},
      "Iwd":          {"TYPE": STRING_TYPE,  "VALUE": "/tmp"},
      "Owner":        {"TYPE": STRING_TYPE,  "VALUE": "nobody"}}

session = Session(); session.addBroker("amqp://localhost:5672")
schedulers = session.getObjects(_class="scheduler", _package="mrg.grid")
result = schedulers[0].Submit(ad)

if result.status:
    print "Error submitting job:", result.text
    exit(1)

print "Submitted job:", result.Id

jobs = session.getObjects(_class="job", _package="mrg.grid")
job = next((j for j in jobs if j.CustomId == result.Id), None)
if not job:
    print "Did not find job"
    exit(2)

print "Job status:", JOB_STATUS[job.JobStatus]
print "Job properties:"
for prop in job.getProperties():
    print " ",prop[0],"=",prop[1]

The program does more than just submit; it also looks up the job based on what has been published via QMF. The job it submits runs /bin/sleep 120 as the nobody user from /tmp on any execution node (Requirements = TRUE).

The job’s ClassAd is presented as nested maps. The top level map holds attribute names mapped to values. Those values are themselves maps that specify the type of the actual value and a representation of the actual value. All representations of values are strings. The type specifies how the string should be handled, e.g. if it should be parsed into an int or float.
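Writing those nested maps by hand gets tedious, so a small converter from ordinary Python values helps. A sketch, reusing the type codes from the example program above; Expr and to_qmf_ad are hypothetical, and mapping Python booleans to the expressions TRUE/FALSE is an assumption of this sketch rather than something the interface defines.

```python
(EXPR_TYPE, INTEGER_TYPE, FLOAT_TYPE, STRING_TYPE) = (0, 1, 2, 3)


class Expr(str):
    """Marks a string as a ClassAd expression rather than a string literal."""


def to_qmf_ad(attrs):
    """Convert {name: python value} into {name: {"TYPE": ..., "VALUE": str}}."""
    ad = {}
    for name, value in attrs.items():
        if isinstance(value, Expr):  # check before str: Expr is a str
            kind = EXPR_TYPE
        elif isinstance(value, bool):  # check before int: bool is an int
            kind, value = EXPR_TYPE, "TRUE" if value else "FALSE"
        elif isinstance(value, int):
            kind = INTEGER_TYPE
        elif isinstance(value, float):
            kind = FLOAT_TYPE
        elif isinstance(value, str):
            kind = STRING_TYPE
        else:
            raise TypeError("no ClassAd type for %r" % (value,))
        # Every representation is a string; TYPE says how to parse it.
        ad[name] = {"TYPE": kind, "VALUE": str(value)}
    return ad


if __name__ == "__main__":
    print(to_qmf_ad({"Cmd": "/bin/sleep", "Args": "120",
                     "Requirements": Expr("TRUE"), "JobUniverse": 5}))
```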

Two good sources of information about job ad attributes are the UW’s Condor Manual’s Appendix, and the output from condor_submit -dump job.ad when run against a job submission file.

Network use by advertisements in a Condor based Grid

April 11, 2009 by spinningmatt

It is certainly a good idea to have an understanding of how information flows through a distributed system when you consider deploying it, or when you want to build an application on top of it.

Condor has three core protocols: advertisement, negotiation and execution. The advertisement protocol keeps components apprised of what other components are in the pool and their state. This works by all components sending information about themselves, in the form of a ClassAd called an ad, to the pool’s Collector. The Schedulers, condor_schedd, send Schedd Ads, the execute nodes, running the condor_startd, send Startd Ads, and so on.

The Collector, the condor_collector daemon, aggregates all the ads and provides an interface to query them. It is the pool's bootstrap point: components find one another through it. An important part of the advertisement protocol is that the Collector does not hold onto ads forever. Doing so would make the Collector essentially a huge memory leak, introduce start-up ordering issues (e.g. the Collector would have to start first), and complicate the case where the Collector fails and all components need to re-advertise. Since the Collector does not hold onto ads forever, all components must periodically advertise themselves.
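The "ads expire unless refreshed" rule can be illustrated with a tiny in-memory model. ToyCollector is hypothetical and the 900-second lifetime is an assumption for illustration; the point is only that an ad survives as long as its publisher keeps re-advertising, which is exactly why every component must update periodically. The injectable clock makes the expiry easy to demonstrate.

```python
import time


class ToyCollector:
    """Keeps each ad only for `lifetime` seconds after its last update."""

    def __init__(self, lifetime=900, clock=time.time):
        self.lifetime = lifetime
        self.clock = clock
        self.ads = {}  # (ad type, name) -> (ad, time of last update)

    def update(self, ad_type, name, ad):
        """A publisher (re-)advertises; this resets the ad's age."""
        self.ads[(ad_type, name)] = (ad, self.clock())

    def query(self, ad_type):
        """Return live ads of one type, dropping any that went stale."""
        now = self.clock()
        self.ads = {key: (ad, stamp) for key, (ad, stamp) in self.ads.items()
                    if now - stamp <= self.lifetime}
        return [ad for (kind, _name), (ad, _stamp) in self.ads.items()
                if kind == ad_type]


if __name__ == "__main__":
    c = ToyCollector()
    c.update("Machine", "slot1@node1", {"State": "Unclaimed"})
    print(c.query("Machine"))
```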

The Rates

There are two useful sets of rates in the system: the baseline rates and the activity driven rates.

Baseline

These rates are defined for each component in the pool, and specify how often each component publishes information about itself.

 Ad Type        Publisher    Frequency  Config
  Collector      Collector    15 min     COLLECTOR_UPDATE_INTERVAL
  Negotiator     Negotiator    5 min     NEGOTIATOR_UPDATE_INTERVAL
  DaemonMaster   Master        5 min     MASTER_UPDATE_INTERVAL
  Scheduler      Schedd        5 min     SCHEDD_INTERVAL
  Submitter      Schedd        5 min     SCHEDD_INTERVAL
  HAD            HAD           5 min     HAD_UPDATE_INTERVAL
  Grid           Gridmanager   5 min     GRIDMANAGER_COLLECTOR_UPDATE_INTERVAL
  Machine        Startd        5 min     UPDATE_INTERVAL
 (Note: Missing Quill and Standard Universe related components)

Of these rates the most interesting are the Machine, Master, and Submitter. There are going to be more of those in the pool than anything else. Every core on every execution node has a Machine ad, every physical node has a Master ad, and each submitter, a user with jobs, has a Submitter ad.

In a pool with 10K physical 4-core execute nodes, the baseline advertisement rate is (10K Master ads + 10K*4 Machine ads) / 5 minutes = 50K ads / 5 minutes = 10K ads / minute ~= 167 ads / second. This is optimistic; there will be spikes.
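The same arithmetic as a reusable function, so other pool shapes can be plugged in. baseline_ads_per_second is a hypothetical helper; like the estimate above it counts only Master and Machine ads at the 5-minute intervals from the table and ignores Submitter and other ads.

```python
def baseline_ads_per_second(nodes, cores_per_node, interval_minutes=5):
    """Baseline advertisement rate from Master and Machine ads.

    One Master ad per physical node plus one Machine ad per core, each
    re-sent every interval; Submitter and other ads are ignored.
    """
    ads_per_interval = nodes + nodes * cores_per_node
    return ads_per_interval / (interval_minutes * 60.0)


if __name__ == "__main__":
    print(round(baseline_ads_per_second(10000, 4)))
```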

Activity Driven

Activity driven advertisements happen when some component changes state in a meaningful way that should be shared with other components. The two primary sources of state change in a pool come from submitted jobs, and activity on execution nodes.

When a job is submitted it initiates a state change in the Scheduler, which initiates the negotiation protocol, which results in state changes in execution nodes and initiates the execution protocol, which results in changes to Scheduler and execute node state.

Independent of a submitted job, an execution node can change state based on policy, e.g. a user accesses the node, administrative activities cause load or time passes.

All these state changes can be complex to model. However, one aspect of them is directly controllable, and useful to know when monitoring a pool. While a job is running there are periodic updates from the execution node to the Scheduler.

 Publisher    Consumer    Frequency  Config
  Starter      Shadow       5 min     STARTER_UPDATE_INTERVAL
  Shadow       Schedd      15 min     SHADOW_QUEUE_UPDATE_INTERVAL

This means that while a job is running, the Scheduler gets an update on the job's activity every 15 minutes. Of course, state changes for the job, such as completion or eviction, are propagated immediately.

In a pool with 40K running jobs, the rate of updates to the Scheduler is 40K / 15 minutes ~= 44 updates / second, while updates from Starters to the Shadows on the Scheduler's machine arrive at 40K / 5 minutes ~= 133 updates / second.
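For other pool sizes, the two steady-state rates follow directly from the table's intervals. job_update_rates is a hypothetical helper; it covers only the periodic updates, not the immediate ones sent on state changes.

```python
def job_update_rates(running_jobs, shadow_minutes=15, starter_minutes=5):
    """Periodic update rates (per second) while jobs run.

    Returns (Shadow -> Schedd, Starter -> Shadow), using
    SHADOW_QUEUE_UPDATE_INTERVAL and STARTER_UPDATE_INTERVAL in minutes.
    """
    to_schedd = running_jobs / (shadow_minutes * 60.0)
    to_shadows = running_jobs / (starter_minutes * 60.0)
    return to_schedd, to_shadows


if __name__ == "__main__":
    print(job_update_rates(40000))
```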

Later

What's the size of an ad or update on the wire and in memory?

How QMF Submission to Condor Could Work

March 23, 2009 by spinningmatt

I recently goofed and told someone that they could use the Qpid Management Framework (QMF) to submit jobs to Condor. What I meant to say is they can use AMQP. This is maybe understandable because QMF is a management framework built on top of AMQP, and MRG Grid already has many parts of Condor modeled in QMF, but submission via QMF could be very different than via AMQP.

QMF is a framework that allows for the modeling of objects that can publish information about themselves as well as respond to actions. All information and control is sent via AMQP messages.

Along with a quick correction to my comment, s/QMF/AMQP/, I went ahead and mocked up a QMF submission interface to make my comment almost true.

Existing Submission Interfaces

Condor already has a number of submission interfaces: the command-line tools, e.g. condor_submit; a GAHP interface, the condor_c-gahp; a SOAP interface, once termed Birdbath; the previously mentioned AMQP interface; and a few others. So, what’s one more? Or, why one more!?

Command-line Interface

The command-line interface is the default means for submitting jobs to Condor's Scheduler, the condor_schedd. The condor_submit tool takes a job description file, performs some processing on it, and generates one or many ClassAds representing jobs, a.k.a. job ads. The condor_schedd only cares about job ads, and is never exposed to the job description file. condor_submit's processing is sometimes shallow, e.g. executable = /bin/true becomes Cmd = "/bin/true", and sometimes not, e.g. getenv = TRUE becomes Environment = "<contents of env for condor_submit>". Sometimes the processing is even iterative in nature, e.g. queue 1000000 generates one million copies of the job constructed since the last queue command. The job description file is really a script in the condor_submit language that generates jobs. This makes the condor_submit tool thick, and the optimizations it performs require it to be tightly integrated with the condor_schedd.

SOAP Interface

The SOAP interface (starts slide 15) is very different from condor_submit. It is implemented within the condor_schedd, and exposes a transactional interface that accepts job ads as input. This means no high level job description file processing. It also means the thick condor_submit tool could be implemented on top of the SOAP interface. A job ad that might be submitted via SOAP would look like:

   [Owner="matt";
    Cmd="/bin/echo";
    Arguments="Hello there!";
    JobUniverse=5;
    ...;
    Requirements=TRUE]

This is a job ad that may have been created from a job description file like:

   executable = /bin/echo
   arguments = Hello there!
   requirements = TRUE
   queue 1

Pass that to condor_submit -dump job.ad to have a look.
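To make the "script that generates jobs" point concrete, here is a toy translator for the example description. It is a sketch, not condor_submit: translate is hypothetical, it knows only the handful of commands mentioned in this post (executable to Cmd, arguments to Arguments, requirements to Requirements, universe = vanilla to JobUniverse = 5) plus queue [N], and it does none of the real tool's work such as quoting, path resolution, or adding Owner.

```python
# Submit commands and the job ad attributes they become, per this post.
SUBMIT_TO_AD = {"executable": "Cmd", "arguments": "Arguments",
                "requirements": "Requirements"}
UNIVERSES = {"vanilla": 5}


def translate(description):
    """Turn a tiny job description into a list of job ads (dicts).

    Handles only 'name = value' lines and 'queue [N]'; settings persist
    across queue statements, as in a real description file.
    """
    ads, current = [], {}
    for line in description.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        words = line.split()
        if words[0].lower() == "queue":
            count = int(words[1]) if len(words) > 1 else 1
            ads.extend(dict(current) for _ in range(count))
            continue
        name, _, value = line.partition("=")
        name, value = name.strip().lower(), value.strip()
        if name == "universe":
            current["JobUniverse"] = UNIVERSES[value.lower()]
        elif name in SUBMIT_TO_AD:
            current[SUBMIT_TO_AD[name]] = value
        else:
            raise ValueError("unsupported command: %s" % name)
    return ads


if __name__ == "__main__":
    print(translate("executable = /bin/echo\narguments = Hello there!\n"
                    "requirements = TRUE\nqueue 1\n"))
```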

A QMF Interface

So, what about a QMF submission interface? A nice aspect of the condor_submit interface is the script nature of the input. Unfortunately, there are some things that cannot be cleanly captured on the remote side of a submission, e.g. the getenv command, transfer_input_files, platform-specific requirements bits, or working directory information. To some extent these reasons, and the desire to keep script processing out of the condor_schedd, are why the SOAP interface only deals in job ads. They are also a reason why a QMF interface should only handle job ads.

A benefit of the SOAP interface is, quite obviously, that it makes for a more natural programmatic interface. Unfortunately, it also exposes concepts and optimizations that are used by condor_submit and may not be needed by other submission programs, e.g. transactions and clusters.

A Submission

One thing that is an afterthought when using both interfaces is the notion of a submission, something that binds together jobs based on their overall purpose. Often a cluster is thought of as the means to group jobs. However, a single job description file can generate multiple clusters. Likewise, the SOAP interface allows all jobs to be grouped into a single cluster, but if one of the jobs is a DAGMan workflow then the point of the single cluster is defeated. The use of clusters to associate jobs is broken.

Two things the QMF interface can do are: 1) simplify the operations required to perform a submission; and, 2) motivate its users to materialize the notion of a submission.

A QMF submission API

   submit, ClassAd -> Id : Submit a new job described by ClassAd

and,

   create, void -> Id : Create a transaction to submit data and a job ad
   send, Id x Data -> void : Spool data for a forthcoming job ad

This interface would be a great simplification over the SOAP API. It eliminates the necessity of a transaction and chunked data transfers, and it does not expose the notion of a cluster. Without a cluster, job association must be done in some other way. The natural way is via an attribute on job ads, including DAGMan jobs. All jobs in a submission could have an attribute Submission = "Monday Parameter Sweep Run, features: A, B, D", written as +Submission = "Monday..." in a job description file.
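A mock of the proposed operations shows how grouping by a Submission attribute replaces grouping by cluster. Everything here is hypothetical: MockSubmissionService is an in-memory stand-in, not a QMF object, and letting submit() optionally take an Id returned by create() is this sketch's assumption about how a job ad would attach to previously spooled data, since the API above leaves that open.

```python
import itertools


class MockSubmissionService:
    """In-memory stand-in for the proposed submit/create/send operations."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.jobs = {}     # Id -> job ad
        self.spooled = {}  # Id -> list of data chunks

    def create(self):
        """create, void -> Id: open a transaction for data and a job ad."""
        tx = next(self._ids)
        self.spooled[tx] = []
        return tx

    def send(self, tx, data):
        """send, Id x Data -> void: spool data for a forthcoming job ad."""
        self.spooled[tx].append(data)

    def submit(self, ad, tx=None):
        """submit, ClassAd -> Id; optionally completes a created Id."""
        job_id = tx if tx is not None else next(self._ids)
        self.jobs[job_id] = dict(ad)
        return job_id

    def by_submission(self, name):
        """Find jobs by their Submission attribute instead of by cluster."""
        return sorted(i for i, ad in self.jobs.items()
                      if ad.get("Submission") == name)


if __name__ == "__main__":
    svc = MockSubmissionService()
    svc.submit({"Cmd": "/bin/true", "Submission": "Monday run"})
    print(svc.by_submission("Monday run"))
```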

This interface does not have some of the high-level niceties of a condor_submit submission. However, the value of those niceties lies not so much in the ability to do many things with one line, e.g. queue 100, as in having a well-defined description of a job. Understanding that executable becomes the Cmd attribute is one thing; knowing that universe = vanilla becomes JobUniverse = 5 is something else entirely. Shortcomings in the high-level interface can be addressed with an improved specification for a job ad.
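As a rough illustration of what such a specification has to pin down, the C++ sketch below translates a few condor_submit commands into job ad attributes. The mappings shown (executable to Cmd, input/output/error to In/Out/Err, log to UserLog, and universe names to JobUniverse codes) are a partial, simplified subset; the real condor_submit does much more, such as resolving paths and adding default attributes. toJobAd and classadQuote are hypothetical names, and the quoting here does no escaping.

```cpp
#include <map>
#include <stdexcept>
#include <string>

using JobAd = std::map<std::string, std::string>;

// Wrap a value in double quotes so it is a ClassAd string (no escaping).
std::string classadQuote(const std::string& s) { return "\"" + s + "\""; }

// Hypothetical, partial translation of submit commands to job ad attributes.
JobAd toJobAd(const std::map<std::string, std::string>& submit) {
    // Universe names to JobUniverse codes (subset).
    static const std::map<std::string, int> universes = {
        {"standard", 1}, {"vanilla", 5}, {"scheduler", 7}};
    // Submit commands whose value becomes a quoted string attribute.
    static const std::map<std::string, std::string> renames = {
        {"executable", "Cmd"}, {"input", "In"}, {"output", "Out"},
        {"error", "Err"}, {"log", "UserLog"}};

    JobAd ad;
    for (const auto& kv : submit) {
        const std::string& command = kv.first;
        const std::string& value = kv.second;
        if (command == "universe") {
            // at() throws std::out_of_range for universes not listed above.
            ad["JobUniverse"] = std::to_string(universes.at(value));
            continue;
        }
        auto it = renames.find(command);
        if (it == renames.end())
            throw std::invalid_argument("unsupported submit command: " + command);
        ad[it->second] = classadQuote(value);
    }
    return ad;
}
```

Note the asymmetry the table exposes: most commands are a rename plus quoting, while universe is a lookup into numeric codes that a job ad author simply has to know.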

Submitting jobs with AMQP to a Condor based Grid

March 8, 2009 by spinningmatt

I recently took MRG Grid’s low-latency feature for a spin. It’s basically a way to use AMQP to submit jobs onto Condor-managed nodes without going through the normal scheduling process, called matchmaking. The jobs are AMQP messages, and they don’t go to a Condor scheduler (a Schedd) to be matched with an execute node (a Startd). They go directly to the execute nodes, and they can get there really quickly.

This all works because Condor nodes can operate in a very autonomous fashion. Even when traditional jobs get matched with a node by the global scheduler (the Negotiator), the node always has the final say about whether it will run the matched job. This has all sorts of benefits in a distributed system, but for low latency it means that an execute node can take jobs from the Condor scheduler or from some other source. That other source here is an AMQP message queue.

To get going, I set up the low-latency feature as described in MRG Grid User Guide: Low Latency Scheduling, with the help of condor_configure_node. Since I was doing this using MRG, I also used the Qpid implementation of AMQP. Qpid comes in the qpidc package, and in qpidc-devel I found some really helpful examples: the direct example and the request-response client were all I needed.

The code’s flow is pretty natural…

Set up the AMQP queues

From the low-latency config (carod.conf) I know that jobs are read from a queue named grid. So the first thing I need to do is set up a way to send messages to the grid queue.

   session.queueDeclare(arg::queue="grid");
   session.exchangeBind(arg::exchange="amq.direct",
                        arg::queue="grid",
                        arg::bindingKey="grid_key");

Next, I want to make sure there’s a way for results to get back to me, so I set up my own unique queue where I can receive them.

   session.queueDeclare(arg::queue="my_grid_results",
                        // Private to this session, and automatically
                        // deleted when finished
                        arg::exclusive=true, arg::autoDelete=true);
   session.exchangeBind(arg::exchange="amq.direct",
                        arg::queue="my_grid_results",
                        arg::bindingKey="my_grid_results_key");
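A direct exchange like amq.direct delivers each message to the queues whose binding key exactly matches the message’s routing key. The toy C++ model below (not the Qpid API) shows why a message published with routing key grid_key ends up in the grid queue, and why a reply addressed with my_grid_results_key ends up in my_grid_results. DirectExchange is a hypothetical class that ignores everything a real broker does besides key matching.

```cpp
#include <map>
#include <string>
#include <vector>

// Toy model of direct-exchange routing: exact match on the key.
class DirectExchange {
public:
    // Equivalent in spirit to exchangeBind(exchange, queue, bindingKey).
    void bind(const std::string& queue, const std::string& bindingKey) {
        bindings_[bindingKey].push_back(queue);
    }

    // Every queue bound with exactly this key receives the message;
    // an empty result means no queue receives it.
    std::vector<std::string> route(const std::string& routingKey) const {
        auto it = bindings_.find(routingKey);
        return it == bindings_.end() ? std::vector<std::string>{} : it->second;
    }

private:
    std::map<std::string, std::vector<std::string>> bindings_;  // key -> queues
};
```

Routing is by key, not by queue name, which is why the job message carries grid_key and the reply-to carries my_grid_results_key.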

Construct the Condor job

With the queues all set up, I need to construct a job. In Condor all jobs are represented as ClassAds, which are basically a bunch of (attribute, value) pairs. The low-latency code needs a job in a similar form, and it takes it from the message’s application headers.

   message.getHeaders().setString("Cmd", "\"/bin/echo\"");
   message.getHeaders().setString("Arguments", "\"Hello Grid!\"");
   message.getHeaders().setString("Out", "\"job.out\""); // stdout
   message.getHeaders().setString("Err", "\"job.err\""); // stderr
   message.getHeaders().setString("Requirements", "TRUE");

At first glance it looks like there is some crazy quoting going on here, and there is. In ClassAds, attribute values are typed in the expected ways, e.g. ints, floats, and strings, but there is also an expression type that is evaluated, e.g. to a boolean. An expression is something like FALSE, ClusterId > 15, ((KeyboardIdle > 2 * 60) && ((CurrentTime - JobStart) > 90)) or regexp(".*mf.*", Name). So, to distinguish between a string and an expression, strings are quoted and expressions are not, e.g. "/bin/echo" is a string and TRUE is an expression.
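When header values come from arbitrary input, hand-quoting gets error-prone. A small hypothetical helper like classadString below (not part of Qpid or Condor) produces a ClassAd string literal, assuming the ClassAd convention of backslash-escaping embedded double quotes and backslashes; check the escaping rules of your Condor version before relying on it. Expressions such as TRUE should still be passed through unquoted.

```cpp
#include <string>

// Turn a raw value into a ClassAd string literal: wrap it in double
// quotes and backslash-escape any embedded double quote or backslash.
std::string classadString(const std::string& raw) {
    std::string out = "\"";
    for (char c : raw) {
        if (c == '"' || c == '\\') out += '\\';
        out += c;
    }
    out += '"';
    return out;
}
```

With it, the Cmd header could be set with message.getHeaders().setString("Cmd", classadString("/bin/echo")); while Requirements stays the bare expression TRUE.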

By the way, at first I thought I’d need to specify the MyType, TargetType, Owner, User, In, JobUniverse, … attributes found on traditional jobs, but I discovered that MyType, TargetType, and In were superfluous, and that JobUniverse has a sane default that can run /bin/echo, or essentially anything. I also didn’t care about the identity of the job owner for accessing resources, so I skipped the Owner and User attributes. The job just ran as the nobody user on the execute node.

One thing I did on purpose was set Requirements to TRUE, which means the job is happy to run anywhere. I can do that because I know all the nodes I set up to run jobs have /bin/echo.

Send and receive messages

There are a few final steps to set up the job message before it can be sent. First, it needs a unique message id, which the low-latency feature requires.

   message.getMessageProperties().setMessageId(Uuid(true));

And, it needs to pass along information about where to send results.

   message.getMessageProperties().setReplyTo(ReplyTo("amq.direct",
                                                     "my_grid_results_key"));

Finally, the message can be sent.

   message.getDeliveryProperties().setRoutingKey("grid_key");
   session.messageTransfer(arg::content=message,
                           arg::destination="amq.direct");

This got the job out onto the network, but didn’t actually give me a way to get the results back. For that I set up a function to receive messages on the my_grid_results queue.

void
Listener::received(Message &message) {
   // Bind by reference instead of copying the properties and headers
   const MessageProperties &properties = message.getMessageProperties();
   const FieldTable &headers = properties.getApplicationHeaders();

   const string state = headers.getAsString("JobState");
   const int status = headers.getAsInt("JobStatus");
   const int exitCode = headers.getAsInt("ExitCode");
   const string exitBySignal = headers.getAsString("ExitBySignal");

   std::cout
      << "Response: " << properties.getMessageId() << std::endl
      << "JobState: " << state << std::endl
      << "JobStatus: " << status << std::endl
      << "ExitCode: " << exitCode << std::endl
      << "ExitBySignal: " << exitBySignal << std::endl
      << "Is Body Empty? " << (message.getData().empty() ? "Yes" : "No") << std::endl;
//    << headers << std::endl;

   // JobState is a ClassAd string, so the comparison includes the quotes
   if ("\"Exited\"" == state && 4 == status) {
         // There were some results returned, they're stored in the
         // message body as a zip archive
      if (!message.getData().empty()) {
         // Binary mode, since the body is a zip archive
         std::ofstream out("job.results", std::ios::binary);
         out << message.getData();
      }
      // The job is done, so stop listening on this subscription
      subscriptions.cancel(message.getDestination());
   }
}

The function is pretty simple. It prints out information about the messages on the my_grid_results queue, and when it sees a message that represents the completion of my job it writes out the results and stops listening.

To get the receive function called when messages come in, the listener needs to be subscribed to the my_grid_results queue and the subscription manager needs to be run. I started it running after I sent the job message.

   SubscriptionManager subscriptions(session);
   Listener listener(subscriptions);
   subscriptions.subscribe(listener, "my_grid_results");
   subscriptions.run();

See it all work

That was basically it. I compiled the program, ran it, and in short order found the job.results file created.

$ ./a.out
Response: 00000000-0000-0000-0000-000000000000
JobState:
JobStatus: 2
ExitCode: 0
ExitBySignal:
Is Body Empty? Yes
Response: 00000000-0000-0000-0000-000000000000
JobState: "Exited"
JobStatus: 4
ExitCode: 0
ExitBySignal:
Is Body Empty? No
$ unzip -l job.results
Archive:  job.results
  Length     Date   Time    Name
 --------    ----   ----    ----
        0  03-06-09 13:33   job.err
       12  03-06-09 13:33   job.out
 --------                   -------
       12                   2 files

When my job completes, all the files in its working directory are wrapped up in a zip archive and sent back in the body of a message that has a JobState header with a value of "Exited" and a JobStatus of 4. JobStatus 4 means Completed in the Condor world.
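The completion check in Listener::received can be pulled out into plain functions, which makes it easy to test without a broker. The sketch below decodes the first five JobStatus codes (1 Idle, 2 Running, 3 Removed, 4 Completed, 5 Held) and repeats the final-result test. jobStatusName and isFinalResult are hypothetical helpers; JobState arrives as a quoted ClassAd string, so the comparison includes the quotes.

```cpp
#include <string>

// Human-readable names for the first five Condor JobStatus codes.
std::string jobStatusName(int status) {
    switch (status) {
        case 1: return "Idle";
        case 2: return "Running";
        case 3: return "Removed";
        case 4: return "Completed";
        case 5: return "Held";
        default: return "Unknown";
    }
}

// Same condition Listener::received uses to decide the job is finished
// and the message body holds the zipped results.
bool isFinalResult(const std::string& jobState, int jobStatus) {
    return jobState == "\"Exited\"" && jobStatus == 4;
}
```

Applied to the run above, the first response (empty JobState, JobStatus 2, i.e. Running) is not final, while the second ("Exited", 4) is.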

That’s pretty much it. The full example is in LL.cpp.

Hello world!

March 8, 2009 by spinningmatt

Hello blog!