Archive for the ‘Grid’ Category

Quick walk with Condor: Looking at Scheduler performance

April 15, 2011

This was a simple walk to get a feel for what a single, out of the box, 7.6 condor_schedd could handle for a job load. There were about 1,500 dynamic slots and all jobs were 5 second sleeps.

It turns out that without errors in the system, a single Schedd can sustain at least a rate of 55 jobs per second.

Out of the box condor_schedd performance visualized with Cumin

Graphs courtesy of Cumin and Ernie Allen’s recent visualization of Erik Erlandson’s new Schedd stats. This is a drill-down view of a single Scheduler.

Here’s what happened. I started by submitting 25 jobs (queue 25) every 5 seconds. Unfortunately you can’t see this; it is off the left side of the graphs. The submission, start and completion rates were all equal at 5 per second. Every five/ten/fifteen minutes, whenever I felt like it, I ramped that up a bit by increasing the jobs per submit (queue 50, then 100, 150, 200) and the rate of submission (every 5 seconds, then every 2 seconds, then every second). The scaling up matched my expectations. At 50 jobs every 5 seconds, I saw 10 jobs submitted/started/completed per second. At 100 jobs, the rates were 20 per second. I eventually worked it up to rates of about 50-55 per second.
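
To be clear about the methodology, the load was driven by nothing fancier than repeated condor_submit calls. A minimal sketch, assuming a hypothetical sleep.sub submit file (not the exact file used):

$ cat sleep.sub
cmd = /bin/sleep
args = 5
queue 25

$ while true; do condor_submit sleep.sub; sleep 5; done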

Then we got to the 30 minute mark in the graphs. Point B shows that Shadows started failing. I let this run for about 10 minutes, during which the Schedd’s rates fluctuated down to between 45 and 50 jobs per second, and then I kicked up the submission rate. The spike in submissions is quite visible to the right of A in the Rates graph.

At this point the Schedd was sustaining about 45 jobs per second and the Shadow exception rate was fairly sustained. I decided to kill off the submissions, also quite visible. The Schedd popped back up to 55 jobs per second and finished off its backlog.

A bit about the errors, from a quick investigation: condor_status -sched -long | grep ExitCode, oh, a bunch of 108s; grep -B10 “STATUS 108” /var/log/condor/ShadowLog, oh, a bunch of network issues and some evictions; pull out the hosts the Shadows were connecting to, oh, mostly a couple of nodes that were reported as being flaky; done.

Throughout this entire walk, the Mean start times graph turned out to be interesting. It shows two things: 0) Mean time to start cumulative – the mean time between when a job is queued and when it first starts running, over the lifetime of the Schedd; and, 1) Mean time to start – the same metric, but over an adjustable window, defaulting to 5 minutes. Until the exceptions started and I blew the submissions out, around C, the mean queue time/wait time/time to start was consistently between 4 and 6 seconds. I did not look at the service rate on the back side of the job’s execution, e.g. how long it waited before termination was recognized, mail was sent, etc.

That’s about it. Though, remember this was done with about 1,500 slots. I didn’t vary the number of slots, say to see if the Schedd would have issues sustaining more than 1,500 concurrent jobs. I also didn’t do much in the way of calculations to compare the mean start time to the backlog (idle job count).

Also, if anyone tries this themselves, I would suggest notification=never in your submissions. It will prevent an email getting sent for each job, will save the Schedd and Shadow a good amount of work, and in my case would have resulted in a 389MB reduction in email.
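
For example, a sketch of what that looks like in a submit file (everything besides the notification line is illustrative):

cmd = /bin/sleep
args = 5
notification = never
queue 25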


Ordering Jobs: Job Priority and Submitters

April 13, 2011

Ordering jobs. The order in which jobs are run is scoped within a Schedd and then within a Submitter. A Submitter is the job’s Owner or AccountingGroup. This means that no order is specified between jobs of different submitters on a single Schedd, and jobs of even the same submitter on different Schedds are not ordered with respect to one another. Adjustments to the order do not impact already running jobs.

The default order operates similarly to FCFS. Jobs that are submitted earlier than others get a chance to run before later jobs. Specifically, jobs with a lower QDate run before those with a higher. If QDates are equal, jobs with a lower ClusterId run before those with a higher. If ClusterIds are equal, jobs with a lower ProcId run before those with a higher.

The order is only similar to FCFS, not exactly FCFS: a job that is not runnable, because it is on Hold or its Requirements are not satisfied, will be skipped until it becomes runnable.

A job’s priority, its JobPrio attribute, provides a way to alter the default ordering. It is considered first, i.e. before QDate. If JobPrios are equal, ordering continues based on QDate, etc. JobPrio works in the opposite direction from the other ordering attributes: jobs with a larger value run first, e.g. JobPrio = 1 runs before 0, which in turn runs before -1.
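
If you want to see the attributes that drive this ordering for your own jobs, a condor_q invocation along these lines works (the -format strings are just an illustration):

$ condor_q -format "%d." ClusterId -format "%d " ProcId -format "JobPrio=%d " JobPrio -format "QDate=%d\n" QDate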

The JobPrio attribute of a job is controlled by the priority command in a submit file, and defaults to a value of 0. It can also be adjusted using the condor_prio command-line utility. If you are using a UserLog (the log command in a submit file, which you should be), you can see priority adjustments as they happen. Keep in mind that JobPrio only helps order jobs up to the point that they start running. Once a job is running, adjusting it to a lower priority will not cause it to stop so that higher priority jobs can run.

Let’s get concrete. Observing execution order via a UserLog, using a single slot (NUM_CPUS=1), stopping the Negotiator, submitting jobs, then starting the Negotiator again, let’s consider: 0) a single user submitting with priority, 1) a single user submitting with priority then adjusting with condor_prio, 2) multiple users submitting with priority, 3) a single user submitting with multiple AccountingGroups, adjusting priority with condor_prio, and 4) multiple users submitting with a single AccountingGroup, adjusting priority with condor_prio.

Stopping and starting the Negotiator makes sure all the jobs get considered at the same time. Without it, you may not get the same results.

NOTE: Endpoints, dates and hours anonymized to protect the guilty.


Zero: a single user submitting with priority

$ cat > zero.job << EOF
cmd = /bin/sleep
args = 1
log = zero.log
priority = -1
queue
priority = 0
queue
priority = 1
queue
EOF

$ condor_submit zero.job
Submitting job(s)...
3 job(s) submitted to cluster 1.

$ grep -e ^000 -e ^001 zero.log
000 (001.000.000) 03/17 01:40:55 Job submitted from host: 
000 (001.001.000) 03/17 01:40:55 Job submitted from host: 
000 (001.002.000) 03/17 01:40:55 Job submitted from host: 
001 (001.002.000) 03/17 01:41:08 Job executing on host: 
001 (001.001.000) 03/17 01:41:10 Job executing on host: 
001 (001.000.000) 03/17 01:41:13 Job executing on host: 

Success, execution of 1.2 then 1.1 then 1.0.


One: a single user submitting with priority, then adjusting with condor_prio

$ condor_off -negotiator

$ condor_submit -a log=one.log zero.job
Submitting job(s)...
3 job(s) submitted to cluster 1.

$ condor_prio 1.1 -p 3

$ condor_on -negotiator

$ grep -e ^000 -e ^001 -e ^033 one.log
000 (001.000.000) 03/17 01:46:40 Job submitted from host: 
000 (001.001.000) 03/17 01:46:40 Job submitted from host: 
000 (001.002.000) 03/17 01:46:40 Job submitted from host: 
033 (001.001.000) 03/17 01:47:02 Changing job attribute JobPrio from 0 to 3
001 (001.001.000) 03/17 01:47:50 Job executing on host: 
001 (001.002.000) 03/17 01:47:53 Job executing on host: 
001 (001.000.000) 03/17 01:47:55 Job executing on host: 

Success, execution follows 1.1 then 1.2 then 1.0, and the priority change for 1.1 is visible in the log.


Two: multiple users submitting with priority

$ cat > two.job << EOF
cmd = /bin/sleep
args = 1
log = two.log
priority = -1
queue
priority = 0
queue
priority = 1
queue
EOF

$ condor_off -negotiator

userA $ condor_submit two.job
Submitting job(s)...
3 job(s) submitted to cluster 1.

userB $ condor_submit two.job
Submitting job(s)...
3 job(s) submitted to cluster 2.

$ condor_on -negotiator

userA $ grep -e ^000 -e ^001 two.log 
000 (001.000.000) 03/17 01:34:20 Job submitted from host: 
000 (001.001.000) 03/17 01:34:20 Job submitted from host: 
000 (001.002.000) 03/17 01:34:20 Job submitted from host: 
001 (001.002.000) 03/17 01:35:15 Job executing on host: 
001 (001.001.000) 03/17 01:35:17 Job executing on host: 
001 (001.000.000) 03/17 01:35:19 Job executing on host: 

userB $ grep -e ^000 -e ^001 two.log 
000 (002.000.000) 03/17 01:34:21 Job submitted from host: 
000 (002.001.000) 03/17 01:34:21 Job submitted from host: 
000 (002.002.000) 03/17 01:34:21 Job submitted from host: 
001 (002.002.000) 03/17 01:35:35 Job executing on host: 
001 (002.001.000) 03/17 01:35:38 Job executing on host: 
001 (002.000.000) 03/17 01:35:40 Job executing on host: 

Success, jobs ran in priority order and the priority is scoped to the user – you did not see 1.2 -> 2.2 -> 1.1 -> 2.1 -> 2.0 -> 1.0. Why though did 1.x run before 2.x? Because of a different type of priority: user priority.

$ condor_userprio
Last Priority Update:  3/17 01:36
                                    Effective
User Name                           Priority 
------------------------------      ---------
userA@localhost                          0.50
userB@localhost                          0.50
------------------------------      ---------
Number of users shown: 2                           

That means their jobs can run in whatever order the Negotiator decides. How user priority works is a topic for another time, but here is a hint,

$ condor_userprio -setprio userA@localhost 100
The priority of userA@localhost was set to 100.000000

$ condor_userprio
Last Priority Update:  3/17 01:36
                                    Effective
User Name                           Priority 
------------------------------      ---------
userB@localhost                          0.50
userA@localhost                        100.00
------------------------------      ---------
Number of users shown: 2

$ condor_off -negotiator

userA $ condor_submit two.job
Submitting job(s)...
3 job(s) submitted to cluster 3.

userB $ condor_submit two.job
Submitting job(s)...
3 job(s) submitted to cluster 4.

$ condor_on -negotiator

userA $ grep -e ^000 -e ^001 two.log 
000 (003.000.000) 03/17 01:36:49 Job submitted from host: 
000 (003.001.000) 03/17 01:36:49 Job submitted from host: 
000 (003.002.000) 03/17 01:36:49 Job submitted from host: 
001 (003.002.000) 03/17 01:37:22 Job executing on host: 
001 (003.001.000) 03/17 01:37:24 Job executing on host: 
001 (003.000.000) 03/17 01:37:26 Job executing on host: 

userB $ grep -e ^000 -e ^001 two.log 
000 (004.000.000) 03/17 01:36:51 Job submitted from host: 
000 (004.001.000) 03/17 01:36:51 Job submitted from host: 
000 (004.002.000) 03/17 01:36:51 Job submitted from host: 
001 (004.002.000) 03/17 01:37:02 Job executing on host: 
001 (004.001.000) 03/17 01:37:04 Job executing on host: 
001 (004.000.000) 03/17 01:37:06 Job executing on host: 

Success, userB’s jobs ran in priority order before userA’s jobs.


Three: a single user submitting with multiple AccountingGroups adjusting priority with condor_prio

$ cat > three-a.job << EOF
cmd = /bin/sleep
args = 1
log = three.log
+AccountingGroup = "three.a"
priority = -3
queue
priority = 0
queue
priority = 3
queue
EOF

$ cat > three-b.job << EOF
cmd = /bin/sleep
args = 1
log = three.log
+AccountingGroup = "three.b"
priority = -3
queue
priority = 0
queue
priority = 3
queue
EOF

$ condor_off -negotiator

$ condor_submit three-a.job
Submitting job(s)...
3 job(s) submitted to cluster 1.

$ condor_submit three-b.job
Submitting job(s)...
3 job(s) submitted to cluster 2.

$ condor_prio 2.1 -p 1

$ condor_on -negotiator

$ grep -e ^000 -e ^001 -e ^033 three.log
000 (001.000.000) 03/17 01:56:15 Job submitted from host: 
000 (001.001.000) 03/17 01:56:15 Job submitted from host: 
000 (001.002.000) 03/17 01:56:15 Job submitted from host: 
000 (002.000.000) 03/17 01:56:17 Job submitted from host: 
000 (002.001.000) 03/17 01:56:17 Job submitted from host: 
000 (002.002.000) 03/17 01:56:17 Job submitted from host: 
033 (002.001.000) 03/17 01:56:57 Changing job attribute JobPrio from 0 to 1
001 (001.002.000) 03/17 01:57:08 Job executing on host: 
001 (001.001.000) 03/17 01:57:11 Job executing on host: 
001 (001.000.000) 03/17 01:57:13 Job executing on host: 
001 (002.002.000) 03/17 01:57:29 Job executing on host: 
001 (002.001.000) 03/17 01:57:31 Job executing on host: 
001 (002.000.000) 03/17 01:57:33 Job executing on host: 

Success, jobs ran in priority order and the priority is scoped to the AccountingGroup, even though the jobs are from the same user. Why though did 1.x run before 2.x? Because of group priority, which works the same way as user priority. Groups three.a and three.b have the same global priority, stored with the Negotiator.

$ condor_userprio 
Last Priority Update:  3/17  02:00
                                    Effective
User Name                           Priority 
------------------------------      ---------
three.a@localhost                     0.50
three.b@localhost                     0.50
------------------------------      ---------
Number of users shown: 2

That means they can run in any order decided by the Negotiator.

$ condor_userprio -setprio three.a@localhost 100
The priority of three.a@localhost was set to 100.000000

$ condor_userprio
Last Priority Update:  3/17  02:01
                                    Effective
User Name                           Priority
------------------------------      ---------
three.b@localhost                     0.50
three.a@localhost                   100.00
------------------------------      ---------
Number of users shown: 2

$ condor_off -negotiator

$ condor_submit three-a.job
Submitting job(s)...
3 job(s) submitted to cluster 3.

$ condor_submit three-b.job
Submitting job(s)...
3 job(s) submitted to cluster 4.

$ condor_on -negotiator

$ grep -e ^000 -e ^001 three.log
000 (003.000.000) 03/17 02:01:50 Job submitted from host: 
000 (003.001.000) 03/17 02:01:50 Job submitted from host: 
000 (003.002.000) 03/17 02:01:50 Job submitted from host: 
000 (004.000.000) 03/17 02:01:51 Job submitted from host: 
000 (004.001.000) 03/17 02:01:51 Job submitted from host: 
000 (004.002.000) 03/17 02:01:51 Job submitted from host: 
001 (004.002.000) 03/17 02:01:58 Job executing on host: 
001 (004.001.000) 03/17 02:02:00 Job executing on host: 
001 (004.000.000) 03/17 02:02:03 Job executing on host: 
001 (003.002.000) 03/17 02:02:18 Job executing on host: 
001 (003.001.000) 03/17 02:02:20 Job executing on host: 
001 (003.000.000) 03/17 02:02:23 Job executing on host: 

Success, three.b’s jobs ran in priority order before three.a’s jobs.


Four: multiple users submitting with a single AccountingGroup adjusting priority with condor_prio

$ cat > four.job << EOF
cmd = /bin/sleep
args = 1
log = four.log
+AccountingGroup = "four.user"
priority = -1
queue
priority = 0
queue
priority = 1
queue
EOF

$ condor_off -negotiator

userA $ condor_submit four.job
Submitting job(s)...
3 job(s) submitted to cluster 1.

userB $ condor_submit four.job
Submitting job(s)...
3 job(s) submitted to cluster 2.

$ condor_q
-- Submitter: localhost :  : localhost
 ID      OWNER            SUBMITTED     RUN_TIME ST PRI SIZE CMD               
   1.0   userA           3/17  02:05   0+00:00:00 I  -1  0.0  sleep 1
   1.1   userA           3/17  02:05   0+00:00:00 I  0   0.0  sleep 1
   1.2   userA           3/17  02:05   0+00:00:00 I  1   0.0  sleep 1
   2.0   userB           3/17  02:05   0+00:00:00 I  -1  0.0  sleep 1
   2.1   userB           3/17  02:05   0+00:00:00 I  0   0.0  sleep 1
   2.2   userB           3/17  02:05   0+00:00:00 I  1   0.0  sleep 1
6 jobs; 6 idle, 0 running, 0 held

userB $ condor_prio 2.1 -p 2
userB $ condor_prio 2.2 -p 3
userA $ condor_prio 1.0 -p -2

$ condor_q
-- Submitter: localhost :  : localhost
 ID      OWNER            SUBMITTED     RUN_TIME ST PRI SIZE CMD               
   1.0   userA           3/17  02:05   0+00:00:00 I  -2  0.0  sleep 1           
   1.1   userA           3/17  02:05   0+00:00:00 I  0   0.0  sleep 1           
   1.2   userA           3/17  02:05   0+00:00:00 I  1   0.0  sleep 1           
   2.0   userB           3/17  02:05   0+00:00:00 I  -1  0.0  sleep 1           
   2.1   userB           3/17  02:05   0+00:00:00 I  2   0.0  sleep 1           
   2.2   userB           3/17  02:05   0+00:00:00 I  3   0.0  sleep 1           
6 jobs; 6 idle, 0 running, 0 held

$ condor_on -negotiator

userA $ grep -e ^000 -e ^001 -e ^033 four.log
000 (001.000.000) 03/17 02:05:42 Job submitted from host: 
000 (001.001.000) 03/17 02:05:42 Job submitted from host: 
000 (001.002.000) 03/17 02:05:42 Job submitted from host: 
033 (001.000.000) 03/17 02:06:14 Changing job attribute JobPrio from -1 to -2
001 (001.002.000) 03/17 02:06:24 Job executing on host: 
001 (001.001.000) 03/17 02:06:26 Job executing on host: 
001 (001.000.000) 03/17 02:06:29 Job executing on host: 

userB $ grep -e ^000 -e ^001 -e ^033 four.log
000 (002.000.000) 03/17 02:05:44 Job submitted from host: 
000 (002.001.000) 03/17 02:05:44 Job submitted from host: 
000 (002.002.000) 03/17 02:05:44 Job submitted from host: 
033 (002.001.000) 03/17 02:05:50 Changing job attribute JobPrio from 0 to 2
033 (002.002.000) 03/17 02:05:57 Changing job attribute JobPrio from 1 to 3
001 (002.002.000) 03/17 02:06:21 Job executing on host: 
001 (002.001.000) 03/17 02:06:23 Job executing on host: 
001 (002.000.000) 03/17 02:06:28 Job executing on host: 

Success, execution went 2.2 (06:21) -> 2.1 (06:23) -> 1.2 (06:24) -> 1.1 (06:26) -> 2.0 (06:28) -> 1.0 (06:29).

Condor Week 2010

April 25, 2010

Condor Week 2010 concluded about a week ago. As always, it included interesting talks from academic and corporate users about the advances they are making enabled by Condor. It also had updates on development over the past year both at UW and Red Hat, and plans for the future.

I got to present an update on what Red Hat is doing with Condor, including configuration management, fostering transparency, performance enhancements, cloud integration, and new features.

Will Benton presented his and Rob Rati’s work on configuration management with Wallaby. Definitely something to keep an eye on.

With all the interesting talks there are many opportunities to see great advances. I found Greg Thain’s talk on High Throughput Parallel Computing (HTPC) especially interesting as it marks completion of a cycle in how resources are modeled in a batch scheduler.

Negotiation cycle statistics

March 29, 2010

If you have ever wondered how your Negotiator is doing, you may be interested in this AWK script. It summarizes negotiation cycles by reading NegotiatorLog output.

You can either pass it your whole NegotiatorLog or, if you only want to summarize recent cycles, combine tail -n and a pipe (see the example after the script).

#!/usr/bin/awk -f

function parse_time(string) {
   return mktime(gensub(/([^/]*)\/([^ ]*) ([^:]*):([^:]*):([^ ]*) .*/,
                        "1984 \\1 \\2 \\3 \\4 \\5", "g"))
}

BEGIN { started = 0; finished = 0 }

/Started Negotiation Cycle/ {
   started = parse_time($0)
#   if (finished) print "Delay:", started - finished
   finished = 0; matched = 0; rejected = 0; submitters = 0; slots = 0
}

/Matched/ {
   matched += 1
}

/Rejected/ {
   rejected += 1
}

/Public ads include .* submitter, .* startd/ {
   submitters = $6
   slots = $8
}

/Finished Negotiation Cycle/ {
   finished = parse_time($0)
   if (!started) next #{ print "Skipping first cycle"; next }
#   if (!matched) next #{ print "Skipping cycle with no matches"; next }
   duration = finished - started
   if (!duration) next # { print "Skipping zero second cycle"; next }
   print strftime("%m/%d %T", started), "::",
       matched, "matches in",
       duration, "seconds",
       "(" matched / duration "/s) with",
       rejected, "rejections,",
       submitters, "submitters,",
       slots, "slots"
}

END {
   #if (!finished) print "Skipping last cycle"
}
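
For example, assuming the script is saved as negotiation_stats.awk and made executable (the paths and line count here are illustrative):

$ ./negotiation_stats.awk /var/log/condor/NegotiatorLog
$ tail -n 10000 /var/log/condor/NegotiatorLog | ./negotiation_stats.awk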

Condor’s debug logs do not include a year or timezone in their timestamps, so cycles that span years or daylight saving time transitions will produce bogus results.

Migrating workflows between Schedds with DAGMan

March 15, 2010

In a distributed system, migration of resources can happen at numerous levels. For instance, in the context of Condor, migration could be of running job processes between execution machines, jobs between Schedds, Schedds between machines, or Negotiators between machines. Here the focus is on workflows between Schedds. A workflow is a directed acyclic graph (DAG) of jobs managed by DAGMan.

DAGMan at its simplest is a process, condor_dagman, that is submitted and run as a job. It typically runs under the Schedd in what’s known as the Scheduler Universe. Different universes in Condor provide different contracts with jobs. The Scheduler Universe lets a job run local to the Schedd and provides certain signaling guarantees in response to condor_hold/release/rm. DAGMan is written to have all its state persisted in either job execution log files (UserLogs) or its input file, a DAG. It is highly tolerant of faults in the execution of jobs it is managing, and in its own execution.

Migration of a workflow between Schedds amounts to moving the condor_dagman job running a workflow between Schedds. The Schedd does not support migration of jobs between Schedds. However, since DAGMan keeps no state it cannot reconstruct, it can be logically migrated through removal (condor_rm) and re-submission (condor_submit_dag).

For instance,

$ _CONDOR_SCHEDD_NAME=ScheddA@ condor_submit_dag migration.dag
Submitting jobs(s).
-----------------------------------------------------------------------
File for submitting this DAG to Condor           : migration.dag.condor.sub
Log of DAGMan debugging messages                 : migration.dag.dagman.out
Log of Condor library output                     : migration.dag.lib.out
Log of Condor library error messages             : migration.dag.lib.err
Log of the life of condor_dagman itself          : migration.dag.dagman.log

Submitting job(s).
Logging submit event(s).
1 job(s) submitted to cluster 1.
-----------------------------------------------------------------------

$ condor_q -dag -global
-- Schedd: ScheddA@ : 
 ID      OWNER/NODENAME   SUBMITTED     RUN_TIME ST PRI SIZE CMD               
   1.0   matt            3/15 10:39   0+00:00:04 R  0   1.7  condor_dagman -f -
  11.0    |-M0           3/15 10:39   0+00:00:00 I  0   0.0  sleep 19
  21.0    |-M1           3/15 10:39   0+00:00:00 I  0   0.0  sleep 80
3 jobs; 2 idle, 1 running, 0 held

$ condor_rm -name ScheddA@ 1.0   
Job 1.0 marked for removal

$ _CONDOR_SCHEDD_NAME=ScheddB@ condor_submit_dag migration.dag
Running rescue DAG 1
-----------------------------------------------------------------------
File for submitting this DAG to Condor           : migration.dag.condor.sub
Log of DAGMan debugging messages                 : migration.dag.dagman.out
Log of Condor library output                     : migration.dag.lib.out
Log of Condor library error messages             : migration.dag.lib.err
Log of the life of condor_dagman itself          : migration.dag.dagman.log

Submitting job(s).
Logging submit event(s).
1 job(s) submitted to cluster 2.
-----------------------------------------------------------------------

$ condor_q -dag -global
-- Schedd: ScheddB@ : 
 ID      OWNER/NODENAME   SUBMITTED     RUN_TIME ST PRI SIZE CMD               
   2.0   matt            3/15 10:44   0+00:00:04 R  0   1.7  condor_dagman -f -
  12.0    |-M00          3/15 10:45   0+00:00:03 R  0   0.0  sleep 19
  22.0    |-M11          3/15 10:45   0+00:00:02 R  0   0.0  sleep 84
3 jobs; 0 idle, 3 running, 0 held

There are two important things going on for this to work. First, _CONDOR_SCHEDD_NAME, a way of setting the configuration parameter SCHEDD_NAME from a shell environment, specifies where condor_submit_dag will submit the workflow, and, because it is recorded in the condor_dagman job’s environment, where all the jobs that condor_dagman submits will go. This is important because DAGMan only tracks job ids, not job id + schedd name.

Second, the job ids, 1.0 11.0 21.0 and 2.0 12.0 22.0. As just mentioned, DAGMan only keeps track of job ids. This means that the Schedds cannot have overlapping job id spaces. To achieve this use SCHEDD_CLUSTER_INITIAL_VALUE and SCHEDD_CLUSTER_INCREMENT_VALUE. Give each Schedd a unique initial cluster value, and set the cluster increment value to one more than the number of Schedds.
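
A sketch of what that configuration might look like for the two Schedds above. I am guessing at the exact values, but an increment of 10 is consistent with the cluster ids (1, 11, 21 and 2, 12, 22) seen in the transcripts:

# on ScheddA
SCHEDD_CLUSTER_INITIAL_VALUE = 1
SCHEDD_CLUSTER_INCREMENT_VALUE = 10

# on ScheddB
SCHEDD_CLUSTER_INITIAL_VALUE = 2
SCHEDD_CLUSTER_INCREMENT_VALUE = 10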

Additionally, condor_hold on the DAGMan job will prevent it from submitting new nodes and allow existing ones to complete. Useful for draining off a submission during migration.
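
For instance, a hypothetical drain-then-migrate sequence might look like the following, waiting between the hold and the remove for already-submitted nodes to finish:

$ condor_hold -name ScheddA@ 1.0
$ condor_rm -name ScheddA@ 1.0
$ _CONDOR_SCHEDD_NAME=ScheddB@ condor_submit_dag migration.dag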

Node local limiters, e.g. GPUs

February 21, 2010

Condor supports pool-wide resource limiters called Concurrency Limits. They allow administrators and users to manage pool-wide consumable resources, e.g. software licenses, db connections, pool load, etc.
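
As a reminder, a minimal sketch of a pool-wide limit (the name and value are made up): the limit lives in the Negotiator’s configuration and jobs request it in their submit files.

# negotiator configuration
MATLAB_LICENSE_LIMIT = 10

# job submit file
concurrency_limits = matlab_license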

A common request on condor-users is for a similar feature scoped to a single node. The common solution is node configuration. For instance, to limit a machine to one GPU job at a time,

STARTD_ATTRS = SLOT1_GPU_COUNT, SLOT2_GPU_COUNT, SLOT3_GPU_COUNT, SLOT4_GPU_COUNT, GPU_COUNT
STARTD_JOB_EXPRS = GPU
STARTD_SLOT_ATTRS = GPU

SLOT1_GPU_COUNT = ifThenElse(slot1_GPU =?= UNDEFINED, 0, 1)
SLOT2_GPU_COUNT = ifThenElse(slot2_GPU =?= UNDEFINED, 0, 1)
SLOT3_GPU_COUNT = ifThenElse(slot3_GPU =?= UNDEFINED, 0, 1)
SLOT4_GPU_COUNT = ifThenElse(slot4_GPU =?= UNDEFINED, 0, 1)

GPU_COUNT = (SLOT1_GPU_COUNT + SLOT2_GPU_COUNT + SLOT3_GPU_COUNT + SLOT4_GPU_COUNT)

START = GPU_COUNT < 1

Then in a job submit file,

+GPU = "This job consumes a GPU resource"

This configuration works fairly well, but has three issues:

1) Job execution requires two steps between a Schedd and a Startd: a CLAIM followed by an ACTIVATE. STARTD_SLOT_ATTRS is spread across slots after the CLAIM and before the ACTIVATE. Two GPU CLAIMs could both succeed but then fail to ACTIVATE, because starting both would exceed the limit of one. If GPU jobs repeatedly get matched to an execute node and then rejected, the result is thrashing and no forward progress.

2) To help avoid (1), jobs can be trickled in. However, when trickling in GPU jobs, slots that were not matched still pick up the attribute via STARTD_SLOT_ATTRS but are not re-advertised. The Negotiator ends up with a lagged view of the slots and will hand out matches that get rejected at CLAIM time. This hurts throughput.

3) When jobs exit or are removed, only the slot that was running the job is re-advertised. Similar to (2), the result is a Negotiator with lagged state, except instead of handing out matches that will be rejected, the Negotiator fails to hand out matches at all. This also hurts throughput.

Solutions:

1) Using Dynamic Slots can address the thrashing problem in (1), at least as of 7.4.2, where a STARTD_SLOT_ATTRS issue was resolved. Dynamic slots naturally control the rate at which jobs are matched with a node, preventing the possibility of thrashing; a configuration sketch follows this list. Of course, with dynamic slots it will take more than a single negotiation cycle to fill a multi-core machine. (3) below might also help, though it would have the same slow-fill problem on multi-core machines, with added overall load.

2) Publish slots when they gain an attribute via STARTD_SLOT_ATTRS.

3) Publish slots when an attribute gained by STARTD_SLOT_ATTRS is removed from its source slot.
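
For (1), a minimal sketch of enabling dynamic (partitionable) slots on an execute node. Jobs then request resources with request_cpus, request_memory and request_disk in their submit files; how the GPU accounting above layers on top is not shown here:

NUM_SLOTS = 1
NUM_SLOTS_TYPE_1 = 1
SLOT_TYPE_1 = cpus=100%,memory=100%,disk=100%
SLOT_TYPE_1_PARTITIONABLE = TRUE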

Setup a two node Condor pool (Red Hat KB)

January 14, 2010

I have been meaning to write a short getting-started tutorial about installing a multiple-node Condor pool, but it just hasn’t happened yet. Lucky for everyone, Jon Thomas at Red Hat has written one. Check out his How do I install a basic two node MRG grid?, which covers installation on Linux very nicely.

Custom ClassAd attributes in Condor: FreeMemoryMB via STARTD_CRON

November 17, 2009

ClassAds enable a lot of power in Condor. ClassAds are the data representation used for most everything in Condor. Generally, a ClassAd is a set of name-value pairs. Values are typed, and include the usual suspects plus expressions. In Condor, jobs, users, slots, daemons, etc. all have a ClassAd representation with their own set of attributes, e.g. a job has a Requirements attribute whose value is an expression such as FreeMemoryMB > 1024, and a slot has a Memory attribute whose value is the amount of memory allocated to the slot.

ClassAds are schema-free, which means they can be arbitrarily extended. Any given ad can have any attribute a user or administrator wants to put in it. Attributes are given meaning by when and where they are referenced.

On a slot there are different classes of attributes. Resource statistics, such as Disk and LoadAvg; resource properties, such as NumCpus and HasVM; policy expressions, such as Requirements or Rank; etc.

One attribute not provided by default on a slot is a statistic representing the total amount of free memory on the system. There are multiple interpretations of what free memory might really mean. On Linux, it could be the number in the free column on the Mem: row reported by the free program, e.g.

$ free -m
             total       used       free     shared    buffers     cached
Mem:          2016       1790        225          0         93        845
-/+ buffers/cache:        851       1165
Swap:         1983          0       1983

or it might be the value on the buffers/cache line. It might be some combination of totalram, freeram, sharedram, bufferedram as reported by sysinfo(2). It might also include information about totalswap and freeswap, also from sysinfo(2).

The meaning is often a function of what kinds of policies are desired in a Condor deployment.

Once you have picked a meaning for your deployment, Condor provides you with the STARTD_CRON mechanism to include a FreeMemoryMB attribute in your slot ads. From there the attribute can be referenced by policy on jobs, during negotiation, and slot policy.
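
For example, a job that only wants to land on a machine with at least a gigabyte free could add a line like this to its submit file (a sketch, assuming the FreeMemoryMB attribute set up below):

requirements = FreeMemoryMB > 1024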

You need two things: first, a way to calculate the value for FreeMemoryMB, for which we’ll use the simple shell script below that pulls MemFree: out of /proc/meminfo; second, configuration telling the condor_startd to run the program.

free_memory_mb.sh:

#!/bin/sh

FREE_MEMORY_KB=$(grep ^MemFree < /proc/meminfo | awk '{print $2}')
echo "FreeMemoryMB = $((FREE_MEMORY_KB / 1024))"

condor_config.local:

STARTD_CRON_JOBLIST = FREE_MEMORY_MB
STARTD_CRON_FREE_MEMORY_MB_EXECUTABLE = $(LIBEXEC)/free_memory_mb.sh
STARTD_CRON_FREE_MEMORY_MB_PERIOD = $(UPDATE_INTERVAL)s

Notes: First, the documentation is out of sync with 7.4.0; the units on _PERIOD must be specified. Second, UPDATE_INTERVAL needs to be defined; it specifies how often the condor_startd periodically sends updates to the Collector.
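
For instance, the local configuration might also carry something like the following, where 300 seconds is just an illustrative value:

UPDATE_INTERVAL = 300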

After reconfiguring the Startd, or just restarting condor, you can view the new attribute with condor_status:

$ condor_status -long  | grep ^FreeMemoryMB | sort | uniq -c
     10 FreeMemoryMB = 174

$ free -m
             total       used       free     shared    buffers     cached
Mem:          2016       1842        173          0        101        874
-/+ buffers/cache:        866       1149
Swap:         1983          0       1983

Yes, the values are different between FreeMemoryMB and free. The amount of free memory is changing constantly and we are just sampling it. You can increase the sampling rate, but beware that means you will generate more frequent updates to the Collector. Maybe not a problem when you have 32 machines and 256 slots, but definitely something to consider when you have 3000 machines and 24000 slots.

Final note: A better name for the attribute representing free memory on a system is TotalFreeMemoryMB to remain consistent with other attributes. For instance, Disk is a slot’s share of the TotalDisk free on the system.

Submitting a workflow to Condor via SOAP using Java

November 2, 2009

In Condor, a workflow is called a DAG. DAG stands for directed acyclic graph. DAGs in Condor are managed by processes called condor_dagman. condor_dagman is a program that takes a description of a DAG as input and walks it, submitting jobs and monitoring their progress. The condor_dagman process itself is often submitted to Condor and managed by the Schedd like any other job. The only thing special about condor_dagman jobs is that they run on the same machine as the Schedd, typically in Condor’s Scheduler Universe. Historical note: the first job submitted to Condor via SOAP was a DAG.

Here’s a recipe for submitting a DAG to Condor via SOAP:

0) Get your favorite SOAP library. Here we’ll use Apache Axis: http://www.apache.org/dyn/closer.cgi/ws/axis/1_4

You’ll want to untar it:

$ tar zxf axis-bin-1_4.tar.gz

1) Generate and compile your SOAP library code

# To avoid passing -classpath to java and javac, set CLASSPATH in your env
$ export CLASSPATH=.:$(echo axis-1_4/lib/*.jar | tr ' ' ':')

# Have Axis generate Java stubs
$ java org.apache.axis.wsdl.WSDL2Java file:///usr/share/condor/webservice/condorSchedd.wsdl

# Compile the stubs
$ javac condor/*.java

2) Write a DAG. This one is pretty simple, it’s a diamond: D depends on B and C, B and C depend on A:

~/dagman/diamond.dag:
Job  A  A.submit 
Job  B  B.submit 
Job  C  C.submit        
Job  D  D.submit
PARENT A CHILD B C
PARENT B C CHILD D

~/dagman/A.submit:
executable = /bin/sleep
arguments = 112
output = A.out
log = diamond.log
queue

~/dagman/B.submit:
executable = /bin/sleep
arguments = 358
output = B.out
log = diamond.log
queue

~/dagman/C.submit:
executable = /bin/sleep
arguments = 132
output = C.out
log = diamond.log
queue

~/dagman/D.submit:
executable = /bin/sleep
arguments = 134
output = D.out
log = diamond.log
queue

3) Write a program that will submit your DAG via SOAP.

The Schedd’s Submit() function takes a ClassAd representing a job. What condor_submit does is take a submit file and convert it into a ClassAd. To look at a submit file for a DAG run condor_submit_dag -no_submit diamond.dag and read diamond.dag.condor.sub. That will give you a hint at what kind of environment condor_dagman wants to run in. Note: remove_kill_sig, arguments, environment and on_exit_remove.

However, diamond.dag.condor.sub is not a ClassAd. To see the ClassAd you can run diamond.dag.condor.sub through condor_submit. Do so with condor_submit -dump diamond.dag.condor.sub.ad diamond.dag.condor.sub. Have a look at diamond.dag.condor.sub.ad and note RemoveKillSig, Arguments, Env and OnExitRemove.
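
In other words, a sketch of the two commands just described, plus a grep for the attributes of interest (run from the directory containing diamond.dag):

$ condor_submit_dag -no_submit diamond.dag
$ condor_submit -dump diamond.dag.condor.sub.ad diamond.dag.condor.sub
$ grep -e RemoveKillSig -e Arguments -e ^Env -e OnExitRemove diamond.dag.condor.sub.ad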

Now you have the basis for what a DAG job needs to run. In the example code I used a little extra knowledge to generate the ClassAd, for brevity. You can use the Schedd’s CreateJobTemplate function to help generate the ClassAd for you instead. Extending arrays is kinda annoying in Java, but cake in python.

Start with this example: CondorSubmitDAG.java

4) Configure your condor_schedd to accept SOAP requests. The basic configuration you will need is:

ENABLE_SOAP = TRUE
ALLOW_SOAP = *
QUEUE_ALL_USERS_TRUSTED = TRUE
SCHEDD_ARGS = -p 1984

This configuration lets anyone talk to your Schedd and submit jobs as any user. For deployment, you should restrict this access by narrowing the ALLOW_SOAP and by setting QUEUE_ALL_USERS_TRUSTED to FALSE. Note: changing QUEUE_ALL_USERS_TRUSTED requires that clients can authenticate themselves via SSL.

This configuration also gives you a fixed port, 1984, for the condor_schedd. Otherwise the port is ephemeral, and you’ll have to query the Collector to find it.

5) Compile your submission program and submit your DAG.

$ javac CondorSubmitDAG.java

$ java CondorSubmitDAG http://localhost:1984 soapmonkey /some/shared/space/where/you/put/dagman/diamond.dag

You can watch the DAG run with condor_q, and condor_q -dag. Don’t be afraid of the —????— for the DAG itself, that’s just because I pruned the job ad to the bare minimum to run. condor_q expects a few extra attributes to be present. If you use CreateJobTemplate you’ll get all the attributes condor_q wants.

condor_master for managing processes

October 21, 2009
#!/usr/bin/env python

from socket import socket, htons, AF_INET, SOCK_DGRAM
from array import array

def send_alive(pid, timeout=300, master_host="127.0.0.1", master_port=1271):
    """
    Send a UDP packet to the condor_master containing the
    DC_CHILDALIVE command.

    This will have the master register a trigger to fire in timeout
    seconds. When the trigger fires the pid will be killed. Each time
    the DC_CHILDALIVE is sent the trigger's timer is reset to fire in
    timeout seconds.

    DC_CHILDALIVE should be sent every timeout/3 seconds.
    """
    sock = socket(AF_INET, SOCK_DGRAM)
    sock.sendto(build_message(pid, timeout), (master_host, master_port))

def build_message(pid, timeout):
    """
    Build a datagram packet to send to the condor_master.

    The packet format is (command, pid, timeout). The command is
    always DC_CHILDALIVE (the integer 60008). The pid is the pid of
    the process the master is monitoring, i.e. getpid() of this
    script. The timeout is the amount of time, in seconds, the master
    will wait before killing the pid. Each field in the packet must be
    8 bytes long, thus the padding.
    """
    DC_CHILDALIVE = 60008

    message=array('H')
    message.append(0); message.append(0); message.append(0) # padding
    message.append(htons(DC_CHILDALIVE))
    message.append(0); message.append(0); message.append(0) # padding
    message.append(htons(pid))        
    message.append(0); message.append(0); message.append(0) # padding
    message.append(htons(timeout))

    return message.tostring()

#
# The condor_master is a daemon that can run arbitrary executables,
# monitor them for failures, restart them, watch for executable
# updates, and send obituary emails.
#
# The condor_master can monitor any program it starts for abnormal
# termination, e.g. return code != 0 or caused by a signal. It can
# also detect hung processes if they periodically send DC_CHILDALIVE
# commands.
#
# The condor_master cleans up process trees, not just the processes it
# directly started.
#
# Example usage...
#
# Run this script from the condor_master, e.g.
#  env CONDOR_CONFIG=ONLY_ENV \
#      _CONDOR_WANT_UDP_COMMAND_SOCKET=TRUE \
#      _CONDOR_NETWORK_INTERFACE=127.0.0.1 \
#      _CONDOR_MASTER_LOG=master.log \
#      _CONDOR_MASTER=$(which condor_master) \
#      _CONDOR_PROG=$(which <this script>) \
#      _CONDOR_PROG_ARGS="-some -args 3" \
#      _CONDOR_DAEMON_LIST=MASTER,PROG \
#      condor_master -p 1271 -pidfile master.pid
#
# At some point kill -STOP or -TERM this script and watch the
# condor_master react.
#
# condor_master configuration/features:
#
#  CONDOR_CONFIG=ONLY_ENV
#    - useful, only look in the ENV for config, no config file needed
#
#  _CONDOR_WANT_UDP_COMMAND_SOCKET=TRUE
#    - required, make master listen for UDP commands
#
#  _CONDOR_NETWORK_INTERFACE=<ip>
#    - useful, make master listen on <ip>
#
#  _CONDOR_MASTER_LOG=<file>
#    - required, the master's log file
#
#  _CONDOR_MASTER_DEBUG=<level>
#    - D_ALWAYS is default D_FULLDEBUG shows a lot more
#
#  _CONDOR_MASTER=<path to condor_master>
#    - required, master will restart itself if its executable is
#      updated
#
#  _CONDOR_PROG=<path to this script>
#    - required, the path to an executable the master will start and
#      monitor
#
#  _CONDOR_PROG_ARGS=<arguments>
#    - useful, if the executable needs the master to pass it arguments
#
#  _CONDOR_DAEMON_LIST=MASTER,PROG
#    - required, list of executables the master will monitor
#
#  _CONDOR_MAIL=/bin/mail
#    - useful, no default, if email notification of PROG hang/crash is
#      desired. don't specify and get no emails.
#
#  _CONDOR_CONDOR_ADMIN=admin@fqdn
#    - useful, the address to send emails to
#
#  _CONDOR_MASTER_BACKOFF_CONSTANT, C, default 9 seconds
#  _CONDOR_MASTER_BACKOFF_FACTOR, K, default 2 seconds
#  _CONDOR_MASTER_BACKOFF_CEILING, T, default 3600 seconds
#  _CONDOR_MASTER_RECOVER_FACTOR, default 500 seconds
#    - useful, parameters to control the exponential backoff
#      start delay = min(T, C + K^n),
#        n is the # of restarts since last recovery
#        a recovery is a run of RECOVER_FACTOR without a crash
#
#  _CONDOR_MASTER_CHECK_NEW_EXEC_INTERVAL=<seconds>
#    - useful, default 300, how often to check executable timestamps
#      and potentially restart processes
#
#  _CONDOR_MASTER_NEW_BINARY_DELAY=<seconds>
#    - useful, default 120, time waited after noticing a timestamp
#      change before restarting the executable
#
#  condor_master -p <port> -pidfile <file>
#    - specify a port for the master to listen on, default is
#      ephemeral, and specify a file where the master writes its pid,
#      default is nowhere
#      also: -t (log to stdout) -f (run in foreground)
#
# Advanced note: condor_squawk can send DC_CHILDALIVE as well, e.g.
#    $ echo "command DC_CHILDALIVE 1234 15" | condor_squawk "<127.0.0.1:1271>"
#

import time, os

timeout = 30
while True:
    send_alive(os.getpid(), timeout)
    time.sleep(timeout/3)
