Archive for April, 2009

Submitting to Condor from QMF: A minimal Vanilla Universe job

April 30, 2009

Part of Red Hat’s MRG product is a management system that covers Condor. At the core of that system is the Qpid Management Framework (QMF), built on top of AMQP. Condor components modeled in QMF allow for information retrieval and control injection. I previously discussed How QMF Submission to Condor Could work, and now there’s a prototype of that interface.

Jobs in Condor are represented as ClassAds, known as job ads, which are essentially schema-free. Whatever schema they have is defined by the code paths that handle jobs. This covers not only the names of attributes but also the types of their values. For instance, a job ad does not have to have a Cmd attribute, a string naming the program to execute, but without one the job cannot actually run.

Who cares what’s in a job ad?

Almost all components in a Condor system handle jobs in one way or another, which means they all help define what a job ad looks like. To complicate things a bit, different components handle jobs differently depending on the job's type or requested features. For simplicity I'm only going to discuss jobs that want to run in the Vanilla Universe with minimal extra features, e.g. no file transfer requests.

The Schedd is the first component that deals with jobs. Its purpose is to manage them. It insists that every job it manages have: an Owner string, the identity of the job's owner; a ClusterId integer, an identifier assigned by the Schedd; a ProcId integer, an identifier within the ClusterId, also assigned by the Schedd; a JobStatus integer, specifying the state the job is in, e.g. Idle, Held, Running; and a JobUniverse integer, denoting part of the job's type, e.g. Vanilla Universe or Grid Universe.

The Negotiator is sent jobs by the Schedd to perform matching with machines. To do that matching, it requires that a job have a Requirements expression.

The Shadow helps the Schedd manage a job while it is running. The Schedd gives the job ad to the Shadow, and the Shadow insists on finding an Iwd string representing the path to the job's initial working directory.

The Startd handles jobs similarly to the Negotiator. Since matching is bi-directional in Condor and execution resources get final say in running a job, the Startd evaluates the job’s Requirements expression before accepting it.
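Two-way matching can be pictured with a toy model. This is not how Condor evaluates ClassAds: as an assumption for illustration, each Requirements expression is modeled as a Python function that receives the other party's ad, and the attribute values (OpSys, Memory, the refused owner) are made up.

```python
# Toy model of Condor's bi-directional matching.
# ASSUMPTION: Requirements are Python callables here; real Condor evaluates
# ClassAd expressions with its own ClassAd library.


def symmetric_match(job_ad, machine_ad):
    """A match needs the job to accept the machine AND the machine to accept the job."""
    return job_ad["Requirements"](machine_ad) and machine_ad["Requirements"](job_ad)


# Hypothetical job: runs on any Linux machine.
job = {
    "Owner": "nobody",
    "ImageSize": 1024,  # KiB
    "Requirements": lambda machine: machine["OpSys"] == "LINUX",
}

# Hypothetical machine: refuses one user and any job larger than its memory.
MACHINE_MEMORY_MIB = 2048
machine = {
    "OpSys": "LINUX",
    "Memory": MACHINE_MEMORY_MIB,
    "Requirements": lambda j: (j["Owner"] != "mallory"
                               and j["ImageSize"] <= MACHINE_MEMORY_MIB * 1024),
}

print(symmetric_match(job, machine))                         # True
print(symmetric_match(dict(job, Owner="mallory"), machine))  # False
```

In Condor terms, the Negotiator performs this kind of check when pairing a job with a machine, and the Startd evaluates the job's Requirements again itself before accepting it, which is how the execute node keeps the final say.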

The Starter is responsible for actually running the job. It requires that a job have the Cmd string attribute. Without one, the Starter does not know what program to run. An additional requirement the Starter imposes is the validity of the Owner string. If the Starter is to impersonate the Owner, the string must specify an identity known on the Starter's machine.

Now, those are not the only attributes on a job ad. Often components will fill in sane defaults for attributes they need that are not present. For instance, the Shadow will fill in the TransferFiles string, and the Schedd will assume a MaxHosts integer.
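The per-component rules above can be collected into a small checker. REQUIRED_BY and missing_attributes are hypothetical names, not part of Condor. The sketch only tests whether attributes are present: it ignores value types and the defaults components fill in, and it cannot tell whether the Owner is an identity known on the execute machine.

```python
# Attributes each component insists on, per the descriptions above.
# Simplification: presence only; value types and filled-in defaults are ignored.
REQUIRED_BY = {
    "Schedd": ("Owner", "ClusterId", "ProcId", "JobStatus", "JobUniverse"),
    "Negotiator": ("Requirements",),
    "Shadow": ("Iwd",),
    "Startd": ("Requirements",),
    "Starter": ("Cmd", "Owner"),
}


def missing_attributes(job_ad):
    """Map each component to the required attributes absent from job_ad."""
    missing = {}
    for component, attrs in REQUIRED_BY.items():
        absent = [a for a in attrs if a not in job_ad]
        if absent:
            missing[component] = absent
    return missing


ad = {"Owner": "nobody", "ClusterId": 1, "ProcId": 0, "JobStatus": 1,
      "JobUniverse": 5, "Requirements": "TRUE", "Iwd": "/tmp"}
print(missing_attributes(ad))  # {'Starter': ['Cmd']}
```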

What about tools?

It's not just the components that manage jobs that care about attributes on a job. The condor_q command-line tool also expects to find certain attributes to help it display information. In addition to the attributes already described, it requires: a QDate integer, the number of seconds since the epoch when the job was submitted; a RemoteUserCpu float, the job's user CPU time in seconds, set while the job runs; a JobPrio integer, specifying the job's priority relative to other jobs; and an ImageSize integer, specifying the amount of memory used by the job in KiB.
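To see why a display tool depends on these attributes, here is a rough, hypothetical formatter for one queue-listing row. It is not condor_q's code, and its layout only approximates condor_q's output: it shows CPU time from RemoteUserCpu, formats QDate in UTC, and assumes ClusterId and ProcId are integers.

```python
import time

# Single-letter codes for JobStatus 1..5 (Idle, Running, Removed, Completed, Held).
STATUS_LETTERS = {1: "I", 2: "R", 3: "X", 4: "C", 5: "H"}


def queue_row(ad):
    """Format one condor_q-style row (hypothetical layout) from a job ad."""
    job_id = "%d.%d" % (ad["ClusterId"], ad["ProcId"])
    # QDate: seconds since the epoch, shown here in UTC.
    submitted = time.strftime("%m/%d %H:%M", time.gmtime(ad["QDate"]))
    # RemoteUserCpu: CPU seconds, shown as days+hh:mm:ss.
    cpu = int(ad["RemoteUserCpu"])
    cpu_time = "%d+%02d:%02d:%02d" % (cpu // 86400, cpu % 86400 // 3600,
                                      cpu % 3600 // 60, cpu % 60)
    status = STATUS_LETTERS.get(ad["JobStatus"], "?")
    size_mib = ad["ImageSize"] / 1024.0  # ImageSize is in KiB
    return "%-8s %-10s %s %s %s %-3d %.1f" % (
        job_id, ad["Owner"], submitted, cpu_time, status, ad["JobPrio"], size_mib)


ad = {"ClusterId": 1, "ProcId": 0, "Owner": "nobody", "QDate": 0,
      "RemoteUserCpu": 3725.0, "JobStatus": 1, "JobPrio": 0, "ImageSize": 2048}
print(queue_row(ad))
```

A missing QDate, RemoteUserCpu, JobPrio, or ImageSize makes a formatter like this fail with a KeyError, which is the kind of dependency described above.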

What does a submitter care about?

Anyone who wants to submit a job to Condor has to specify enough attributes on the job ad to make all the Condor components happy. Those attributes vary depending on the type of job and the features it desires. Luckily, Condor can fill in many attributes with sane defaults. For instance, condor_q wants a QDate and a RemoteUserCpu, yet those are two attributes the submitter should neither have to nor be able to specify.
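One way to enforce that split on the Schedd side is sketched below. It is hypothetical (accept_submission and SERVER_ASSIGNED are illustrative names, not the Schedd's actual logic): it rejects ads that try to set attributes the Schedd assigns itself, then fills those in along with a few defaults. The default values for JobStatus, JobPrio, and ImageSize are assumptions.

```python
import time

# Attributes the Schedd assigns itself; a submitter must not supply them.
SERVER_ASSIGNED = frozenset(["ClusterId", "ProcId", "QDate", "RemoteUserCpu"])


def accept_submission(submitted_ad, cluster_id, proc_id=0, now=None):
    """Validate a submitted ad and return a copy completed by the 'Schedd'."""
    forbidden = SERVER_ASSIGNED.intersection(submitted_ad)
    if forbidden:
        raise ValueError("submitter may not set: " + ", ".join(sorted(forbidden)))
    ad = dict(submitted_ad)
    ad["ClusterId"] = cluster_id
    ad["ProcId"] = proc_id
    ad["QDate"] = int(time.time() if now is None else now)
    ad["RemoteUserCpu"] = 0.0
    # ASSUMPTION: illustrative defaults for attributes a submitter may omit.
    ad.setdefault("JobStatus", 1)  # Idle
    ad.setdefault("JobPrio", 0)
    ad.setdefault("ImageSize", 0)
    return ad


ad = accept_submission({"Cmd": "/bin/sleep", "Owner": "nobody"}, cluster_id=42, now=1000)
print(ad["ClusterId"], ad["QDate"], ad["JobStatus"])  # 42 1000 1
```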

To perform a simple submission for running a pre-staged program, the job ad needs: a Cmd, a Requirements, a JobUniverse, an Iwd, and an Owner. Additionally, an Args string is possible if the Cmd takes arguments.

Given the knowledge of which attributes are required on a job ad, and using the QMF Python Console Tutorial, I was able to quickly write an example program to submit a job via QMF.

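#!/usr/bin/env python

from __future__ import print_function

from sys import exit

from qmf.console import Session

# Type codes for the "TYPE" field of each attribute in the job ad.
# ASSUMPTION: placeholder values; use the codes the scheduler's Submit
# method actually expects.
EXPR_TYPE, INTEGER_TYPE, FLOAT_TYPE, STRING_TYPE = 0, 1, 2, 3

UNIVERSE = {"VANILLA": 5, "SCHEDULER": 7, "GRID": 9, "JAVA": 10, "PARALLEL": 11, "LOCAL": 12, "VM": 13}
JOB_STATUS = ("", "Idle", "Running", "Removed", "Completed", "Held", "")

# Minimal Vanilla Universe job: attribute -> {"TYPE": code, "VALUE": string}
ad = {"Cmd":          {"TYPE": STRING_TYPE,  "VALUE": "/bin/sleep"},
      "Args":         {"TYPE": STRING_TYPE,  "VALUE": "120"},
      "Requirements": {"TYPE": EXPR_TYPE,    "VALUE": "TRUE"},
      "JobUniverse":  {"TYPE": INTEGER_TYPE, "VALUE": str(UNIVERSE["VANILLA"])},
      "Iwd":          {"TYPE": STRING_TYPE,  "VALUE": "/tmp"},
      "Owner":        {"TYPE": STRING_TYPE,  "VALUE": "nobody"}}

session = Session()
broker = session.addBroker("amqp://localhost:5672")
try:
    # Submit to the first Schedd published via QMF.
    schedulers = session.getObjects(_class="scheduler", _package="mrg.grid")
    if not schedulers:
        print("No scheduler found")
        exit(1)
    result = schedulers[0].Submit(ad)
    if result.status:
        print("Error submitting job:", result.text)
        exit(1)
    print("Submitted job:", result.Id)

    # Find the new job among the job objects published via QMF.
    jobs = session.getObjects(_class="job", _package="mrg.grid")
    job = next((j for j in jobs if j.CustomId == result.Id), None)
    if job is None:
        print("Did not find job")
        exit(1)

    print("Job status:", JOB_STATUS[job.JobStatus])
    print("Job properties:")
    for prop, value in job.getProperties():
        print(" ", prop, "=", value)
finally:
    # Close the broker connection so the script can exit cleanly.
    session.delBroker(broker)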

The program does more than just submit; it also looks up the job among the objects published via QMF. The job it submits runs /bin/sleep 120 as the nobody user, from /tmp, on any execution node (Requirements = TRUE).

The job’s ClassAd is presented as nested maps. The top level map holds attribute names mapped to values. Those values are themselves maps that specify the type of the actual value and a representation of the actual value. All representations of values are strings. The type specifies how the string should be handled, e.g. if it should be parsed into an int or float.
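Building those nested maps by hand is tedious, so a helper can derive them from plain Python values. This is a hypothetical sketch: the Expr marker class and encode_ad are illustrative, and the numeric type codes are placeholders (an assumption) that must be replaced with whatever codes the scheduler's Submit method actually expects.

```python
# ASSUMPTION: placeholder type codes; substitute the real ones for your schema.
EXPR_TYPE, INTEGER_TYPE, FLOAT_TYPE, STRING_TYPE = 0, 1, 2, 3


class Expr(str):
    """Marks a string as a ClassAd expression rather than a string literal."""


def encode_ad(attrs):
    """Turn {name: python value} into {name: {"TYPE": code, "VALUE": string}}."""
    encoded = {}
    for name, value in attrs.items():
        if isinstance(value, Expr):  # check before str: Expr subclasses str
            kind = EXPR_TYPE
        elif isinstance(value, bool):  # bool subclasses int; refuse to guess
            raise TypeError("%s: use Expr('TRUE') or Expr('FALSE') for booleans" % name)
        elif isinstance(value, int):
            kind = INTEGER_TYPE
        elif isinstance(value, float):
            kind = FLOAT_TYPE
        elif isinstance(value, str):
            kind = STRING_TYPE
        else:
            raise TypeError("%s: unsupported value %r" % (name, value))
        # Every value travels as a string; TYPE says how to parse it.
        encoded[name] = {"TYPE": kind, "VALUE": str(value)}
    return encoded


ad = encode_ad({"Cmd": "/bin/sleep", "Args": "120", "Requirements": Expr("TRUE"),
                "JobUniverse": 5, "Iwd": "/tmp", "Owner": "nobody"})
print(ad["JobUniverse"])  # {'TYPE': 1, 'VALUE': '5'}
```

Marking expressions explicitly matters because "TRUE" as a Requirements expression and "TRUE" as a string literal look identical once everything is a string.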

Two good sources of information about job ad attributes are the appendix of the UW's Condor Manual and the output of condor_submit -dump when run against a job submission file.

Network use by advertisements in a Condor based Grid

April 11, 2009

It is certainly a good idea to have an understanding of how information flows through a distributed system when you consider deploying it, or when you want to build an application on top of it.

Condor has three core protocols: advertisement, negotiation, and execution. The advertisement protocol keeps components apprised of which other components are in the pool and what state they are in. It works by every component sending information about itself, in the form of a ClassAd called an ad, to the pool's Collector. The Schedulers (condor_schedd) send Schedd Ads, the execute nodes (running condor_startd) send Startd Ads, and so on.

The Collector, the condor_collector daemon, aggregates all the ads and provides an interface to query them. It is also the pool's bootstrap point, the place components go to find one another. An important part of the advertisement protocol is that the Collector does not hold onto ads forever. Doing so would make the Collector essentially a huge memory leak, introduce start-up ordering issues, e.g. requiring the Collector to start first, and complicate recovery when the Collector fails and all components need to re-advertise. Since the Collector does not hold onto ads forever, all components must periodically advertise themselves.
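The expiry behavior can be sketched with a toy Collector that forgets any ad not refreshed within a lifetime. ToyCollector is hypothetical, and the 900-second lifetime is an assumed value for the example, not necessarily Condor's default.

```python
class ToyCollector:
    """Keeps the latest ad per daemon and forgets ads that are not refreshed."""

    def __init__(self, lifetime_s):
        self.lifetime_s = lifetime_s
        self._ads = {}  # daemon name -> (ad, time of last update)

    def update(self, name, ad, now):
        # A re-advertisement replaces the old ad and resets its clock.
        self._ads[name] = (ad, now)

    def query(self, now):
        # Drop stale ads first, so a daemon that stopped advertising
        # (crashed, shut down, unreachable) disappears from the pool.
        self._ads = {name: (ad, t) for name, (ad, t) in self._ads.items()
                     if now - t <= self.lifetime_s}
        return {name: ad for name, (ad, _) in self._ads.items()}


collector = ToyCollector(lifetime_s=900)
collector.update("slot1@node-a", {"State": "Unclaimed"}, now=0)
collector.update("slot1@node-b", {"State": "Unclaimed"}, now=0)
collector.update("slot1@node-a", {"State": "Claimed"}, now=600)  # periodic refresh
print(sorted(collector.query(now=1000)))  # ['slot1@node-a']
```

Because nothing is kept forever, a restarted Collector of this kind simply refills as daemons re-advertise on their normal schedule, with no start-up ordering required.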

The Rates

There are two useful sets of rates in the system: the baseline rates and the activity driven rates.


Baseline

The baseline rates are defined per component and specify how often each component publishes information about itself.

  Ad Type        Publisher     Frequency   Config
  Collector      Collector     15 min      COLLECTOR_UPDATE_INTERVAL
  Negotiator     Negotiator     5 min      NEGOTIATOR_UPDATE_INTERVAL
  DaemonMaster   Master         5 min      MASTER_UPDATE_INTERVAL
  Scheduler      Schedd         5 min      SCHEDD_INTERVAL
  Submitter      Schedd         5 min      SCHEDD_INTERVAL
  HAD            HAD            5 min      HAD_UPDATE_INTERVAL
  Grid           Gridmanager    5 min      GRIDMANAGER_COLLECTOR_UPDATE_INTERVAL
  Machine        Startd         5 min      UPDATE_INTERVAL
  (Note: Quill and Standard Universe related components are not listed.)

Of these rates, the most interesting are those for Machine, Master, and Submitter ads, because there will be more of them in the pool than anything else. Every core on every execution node has a Machine ad, every physical node has a Master ad, and each submitter, i.e. a user with jobs, has a Submitter ad.

In a pool with 10K physical 4-core execute nodes, the baseline advertisement rate is (10K Master ads + 10K*4 Machine ads) / 5 minutes = 50K ads / 5 minutes = 10K ads / minute ~= 167 ads / second. This is optimistic: it ignores Submitter and other ads, and there will be spikes.
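The same arithmetic as a small function, so it can be re-run for other pool sizes or intervals. It is a hypothetical helper that, like the estimate above, counts only Master and Machine ads and assumes both use the default 5-minute interval.

```python
def baseline_ads_per_second(nodes, cores_per_node, interval_s=300):
    """Steady-state Master + Machine ad rate arriving at the Collector."""
    master_ads = nodes                    # one Master ad per physical node
    machine_ads = nodes * cores_per_node  # one Machine ad per core
    return (master_ads + machine_ads) / float(interval_s)


rate = baseline_ads_per_second(10000, 4)
print("%.1f ads/second" % rate)  # 166.7 ads/second
```

Doubling the update interval halves the rate, which is the main knob available for trading Collector load against freshness.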

Activity Driven

Activity driven advertisements happen when some component changes state in a meaningful way that should be shared with other components. The two primary sources of state change in a pool come from submitted jobs, and activity on execution nodes.

When a job is submitted, it changes the Scheduler's state, which starts the negotiation protocol. Negotiation changes the state of execution nodes and starts the execution protocol, which in turn changes both Scheduler and execute node state.

Independent of any submitted job, an execution node can change state based on policy, e.g. when a user accesses the node, when administrative activity causes load, or simply as time passes.

All these state changes can be complex to model. However, one aspect of them is directly controllable, and useful to know when monitoring a pool. While a job is running there are periodic updates from the execution node to the Scheduler.

  Publisher    Consumer    Frequency   Config
  Starter      Shadow       5 min      STARTER_UPDATE_INTERVAL
  Shadow       Schedd      15 min      SHADOW_QUEUE_UPDATE_INTERVAL

This means that while a job is running, the Scheduler gets an update on the job's activity every 15 minutes. Of course, state changes for the job, such as completion or eviction, are propagated immediately.

In a pool with 40K running jobs, the rate of updates to the Scheduler is 40K / 15 minutes ~= 44 updates / second. Meanwhile, Starter updates to the Shadows, which run on the Scheduler's machine, arrive at 40K / 5 minutes ~= 133 updates / second.
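As a hypothetical helper, the running-job figures reduce to two divisions, with the default intervals from the table above as parameters.

```python
def running_job_update_rates(running_jobs, starter_interval_s=300,
                             shadow_interval_s=900):
    """Per-second update rates while jobs run (defaults from the table above).

    Returns (starter_to_shadow, shadow_to_schedd). Starter updates land on
    the Shadows, which run on the Scheduler's machine.
    """
    starter_to_shadow = running_jobs / float(starter_interval_s)
    shadow_to_schedd = running_jobs / float(shadow_interval_s)
    return starter_to_shadow, shadow_to_schedd


to_shadows, to_schedd = running_job_update_rates(40000)
print("to Schedd: %.1f/s, to Shadows: %.1f/s" % (to_schedd, to_shadows))
# to Schedd: 44.4/s, to Shadows: 133.3/s
```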


What's the size of an ad or update on the wire and in memory?
