Archive for the ‘Qpid’ Category

Service as a Job: The Qpid C++ Broker

May 18, 2010

Yes. A service as a job! Why? Three quick reasons: 1) dynamic, even on-demand/opportunistic, deployment of the service; 2) policy-driven control of the service’s execution; 3) an abstraction for interacting with the service’s life-cycle.

Condor provides strong management, deployment, and policy features around what it calls jobs. Jobs come in all shapes and sizes – from those that run for less than a minute (probabilistic simulations) to those that run for months (VMs holding developer desktops), and from those that use large amounts of disk or network I/O to those that use large amounts of CPU and memory.

Somewhere in that spectrum you’ll definitely find common services, be they full LAMP stacks in VMs or just the Apache HTTP server. Here’s an example of the Qpid broker (qpidd), a messaging service, run as a job.

The job description is what you submit with condor_submit:

cmd = qpidd.sh
error = qpidd.log

kill_sig = SIGTERM

# Want chirp functionality
+WantIOProxy = TRUE

queue

It specifies that the job, or in this case the service, to run is qpidd.sh, and that SIGTERM should be used to shut it down. qpidd.sh wraps the actual execution of qpidd for one important reason: advertising qpidd’s endpoint. qpidd will start up on port 5672 by default. That’s all well and good, unless you want to run more than one qpidd on a single machine. qpidd.sh starts qpidd on an ephemeral port, which qpidd kindly prints to stdout, and then advertises the chosen port number back into the Schedd’s queue via condor_chirp, which is available when the job specifies WantIOProxy = TRUE.

#!/bin/sh

# qpidd lives in /usr/sbin,
# condor_chirp in /usr/libexec/condor
export PATH=$PATH:/usr/sbin:/usr/libexec/condor

# When we get SIGTERM, which Condor will send when
# we are kicked, kill off qpidd.
function term {
    rm -f port.out
    kill %1
}

# Spawn qpidd, and make sure we can shut it down cleanly.
rm -f port.out
trap term SIGTERM
# qpidd will print the port to stdout, capture it,
# no auth required, don't read /etc/qpidd.conf,
# log to stderr
qpidd --auth no \
      --config /dev/null \
      --log-to-stderr yes \
      --no-data-dir \
      --port 0 \
      1> port.out &

# We might have to wait for the port on stdout
while [ ! -s port.out ]; do sleep 1; done
PORT=$(cat port.out)
rm -f port.out

# There are all sorts of useful things that could
# happen here, such as setting up queues with
# qpid-config
#...
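# For instance, something along these lines (illustrative only: the
# queue name is made up and qpid-config options vary by version):
#   qpid-config -a localhost:$PORT add queue example-queue --durable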

# Record the port number where everyone can see it
condor_chirp set_job_attr QpiddEndpoint \"$HOSTNAME:$PORT\"

# Nothing more to do, just wait on qpidd
wait %1

In action –

$ condor_submit qpidd.sub
Submitting job(s).
1 job(s) submitted to cluster 2.

$ condor_q
-- Submitter: woods :  : woods
 ID      OWNER            SUBMITTED     RUN_TIME ST PRI SIZE CMD               
   2.0   matt            5/18 14:21   0+00:00:03 R  0   0.0  qpidd.sh          
1 jobs; 0 idle, 1 running, 0 held

$ condor_q -format "qpidd running at %s\n" QpiddEndpoint
qpidd running at woods:58335

$ condor_hold 2
Cluster 2 held.
$ condor_release 2
Cluster 2 released.

$ condor_q
-- Submitter: woods :  : woods
 ID      OWNER            SUBMITTED     RUN_TIME ST PRI SIZE CMD               
   2.0   matt            5/18 14:21   0+00:00:33 I  0   73.2 qpidd.sh          
1 jobs; 1 idle, 0 running, 0 held

$ condor_reschedule 
Sent "Reschedule" command to local schedd

$ condor_q         
-- Submitter: woods :  : woods
 ID      OWNER            SUBMITTED     RUN_TIME ST PRI SIZE CMD               
   2.0   matt            5/18 14:21   0+00:00:38 R  0   73.2 qpidd.sh          
1 jobs; 0 idle, 1 running, 0 held

$ condor_q -format "qpidd running at %s\n" QpiddEndpoint
qpidd running at woods:54028

$ condor_rm -a
All jobs marked for removal.

$ condor_submit qpidd.sub
Submitting job(s).
1 job(s) submitted to cluster 9.

$ condor_submit qpidd.sub                               
Submitting job(s).
1 job(s) submitted to cluster 10.

$ condor_submit qpidd.sub                               
Submitting job(s).
1 job(s) submitted to cluster 11.

$ lsof -i | grep qpidd
qpidd     14231 matt    9u  IPv4 92241655       TCP *:50060 (LISTEN)
qpidd     14256 matt    9u  IPv4 92242129       TCP *:50810 (LISTEN)
qpidd     14278 matt    9u  IPv4 92242927       TCP *:34601 (LISTEN)

$ condor_q -format "qpidd running at %s\n" QpiddEndpoint
qpidd running at woods:34601
qpidd running at woods:50810
qpidd running at woods:50060

Submitting jobs with AMQP to a Condor based Grid

March 8, 2009

I recently took MRG Grid’s low-latency feature for a spin. It’s basically a way to use AMQP to submit jobs onto Condor-managed nodes without going through Condor’s normal scheduling, called matchmaking. The jobs are AMQP messages, and they don’t go to a Condor scheduler (a Schedd) to be matched with an execute node (a Startd). They go directly to the execute nodes, and they can get there really quickly.

This all works because Condor nodes can operate in a very autonomous fashion. Even when traditional jobs get matched with a node by the global scheduler (the Negotiator), the node always has the final say about whether it will run the matched job or not. This has all sorts of benefits in a distributed system, but for low latency it means that an execute node can take jobs from the Condor scheduler or from some other source. That other source here is an AMQP message queue.

To get going I set up the low-latency feature as described in the MRG Grid User Guide: Low Latency Scheduling, with the help of condor_configure_node. Since I was doing this with MRG I also used the Qpid implementation of AMQP. Qpid comes in the qpidc package, and in qpidc-devel I found some really helpful examples – the direct example and the request-response client were all I needed.

The code’s flow is pretty natural…

Set up the AMQP queues

From the low-latency config (carod.conf) I know that jobs are read from a queue named grid. So the first thing I need to do is set up a way to send messages to the grid queue.

   session.queueDeclare(arg::queue="grid");
   session.exchangeBind(arg::exchange="amq.direct",
                        arg::queue="grid",
                        arg::bindingKey="grid_key");

Next, I want to make sure there’s a way for results to get back to me, so I set up my own unique queue where I can receive them.

   session.queueDeclare(arg::queue="my_grid_results",
				 // Automatically delete the queue
				 // when finished
			 arg::exclusive=true, arg::autoDelete=true);
   session.exchangeBind(arg::exchange="amq.direct",
                        arg::queue="my_grid_results",
                        arg::bindingKey="my_grid_results_key");

Construct the Condor job

With the queues all set up, I need to construct a job. In Condor all jobs are represented as ClassAds; a ClassAd is basically a bunch of (attribute, value) pairs. The low-latency code needs the job in a similar form, and builds it from the message’s application headers.

   message.getHeaders().setString("Cmd", "\"/bin/echo\"");
   message.getHeaders().setString("Arguments", "\"Hello Grid!\"");
   message.getHeaders().setString("Out", "\"job.out\""); // stdout
   message.getHeaders().setString("Err", "\"job.err\""); // stderr
   message.getHeaders().setString("Requirements", "TRUE");

Immediately, it looks like there is some crazy quoting going on here, and there is. In ClassAds, the values of attributes are typed in expected ways, e.g. ints, floats, and strings, but there is also an expression type that can be evaluated to a boolean. An expression is something like FALSE, ClusterId > 15, ((KeyboardIdle > 2 * 60) && ((CurrentTime - JobStart) > 90)) or regexp(".*mf.*", Name). So, to distinguish between a string and an expression, strings are quoted and expressions are not, e.g. "/bin/echo" is a string and TRUE is an expression.

By the way, at first I thought I’d need to specify the MyType, TargetType, Owner, User, In, JobUniverse, … attributes found on traditional jobs, but I discovered MyType, TargetType, and In were superfluous, and JobUniverse has a sane default that can run /bin/echo, or essentially anything. I also didn’t care about the identity of the job owner for accessing resources, so I skipped the Owner and User attributes. The job just ran as the nobody user on the execute node.

One thing that I did on purpose was set Requirements to TRUE, which means the job is happy to run anywhere. I can do that because I know all the nodes I set up to run jobs have /bin/echo.
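A more restrictive Requirements would just be a different expression. Here is a hypothetical sketch (Arch, OpSys, and Memory are standard Condor machine attributes, but the particular constraint is only an illustration):

   // Illustrative only: restrict the job to 64-bit Linux machines
   // with at least 1GB of memory. The value is an unquoted ClassAd
   // expression, while the strings inside it are quoted.
   message.getHeaders().setString("Requirements",
      "Arch == \"X86_64\" && OpSys == \"LINUX\" && Memory >= 1024");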

Send and receive messages

There are a few final steps before the job message can be sent. It needs a unique message id, which is required by the low-latency feature.

   message.getMessageProperties().setMessageId(Uuid(true));

And, it needs to pass along information about where to send results.

   message.getMessageProperties().setReplyTo(ReplyTo("amq.direct",
                                                     "my_grid_results_key"));

Finally, the message can be sent.

   message.getDeliveryProperties().setRoutingKey("grid_key");
   session.messageTransfer(arg::content=message,
                           arg::destination="amq.direct");

This got the job out onto the network, but it didn’t actually give me a way to get the results back. For that I set up a function to receive messages on the my_grid_results queue.

void
Listener::received(Message &message) {
   const MessageProperties properties = message.getMessageProperties();
   const FieldTable headers = properties.getApplicationHeaders();

   const string state = headers.getAsString("JobState");
   const int status = headers.getAsInt("JobStatus");
   const int exitCode = headers.getAsInt("ExitCode");
   const string exitBySignal = headers.getAsString("ExitBySignal");

   std::cout
      << "Response: " << properties.getMessageId() << std::endl
      << "JobState: " << state << std::endl
      << "JobStatus: " << status << std::endl
      << "ExitCode: " << exitCode << std::endl
      << "ExitBySignal: " << exitBySignal << std::endl
      << "Is Body Empty? " << (message.getData().empty() ? "Yes" : "No") << std::endl;
//    << headers << std::endl;

   if ("\"Exited\"" == state && 4 == status) {
      // There were some results returned, they're stored in the
      // message body as a zip archive
      if (!message.getData().empty()) {
         std::ofstream out;
         out.open("job.results");
         out << message.getData();
         out.close();
      }
      subscriptions.cancel(message.getDestination());
   }
}

The function is pretty simple. It prints out information about the messages on the my_grid_results queue, and when it sees a message that represents the completion of my job it writes out the results and stops listening.

To get the receive function called when messages come in, it needs to be set up and run. I started it running after I sent the job message.

   SubscriptionManager subscriptions(session);
   Listener listener(subscriptions);
   subscriptions.subscribe(listener, "my_grid_results");
   subscriptions.run();

See it all work

That was basically it. I compiled the program, ran it, and in short order found the job.results file created.
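
Compiling is just a matter of linking against the Qpid C++ client libraries; something like the following should do (the library names come from the qpidc-devel packaging and may vary by version):

$ g++ LL.cpp -lqpidclient -lqpidcommon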

$ ./a.out
Response: 00000000-0000-0000-0000-000000000000
JobState: 
JobStatus: 2
ExitCode: 0
ExitBySignal: 
Is Body Empty? Yes
Response: 00000000-0000-0000-0000-000000000000
JobState: "Exited"
JobStatus: 4
ExitCode: 0
ExitBySignal: 
Is Body Empty? No
$ unzip -l job.results   
Archive:  job.results
  Length     Date   Time    Name
 --------    ----   ----    ----
        0  03-06-09 13:33   job.err
       12  03-06-09 13:33   job.out
 --------                   -------
       12                   2 files

When my job completes, all the files in its working directory are wrapped up in a zip archive and sent back in the body of a message whose JobState header has the value "Exited" and whose JobStatus is 4. JobStatus 4 means Completed in the Condor world.
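
Unpacking the archive should show the echoed text from the job’s Arguments, something like:

$ unzip -q job.results && cat job.out
Hello Grid!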

That’s pretty much it. The full example is in LL.cpp.