/*
* Requires package qpidc-devel
* Compile this program with: g++ -lqpidclient
*/
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/SubscriptionManager.h>
#include <unistd.h>
#include <cstdlib>
#include <iostream>
#include <sstream>
#include <fstream>
using namespace qpid::client;
using namespace qpid::framing;
using std::stringstream;
using std::string;
// The queue name that carod is reading from, configured in
// /etc/opt/grid/carod.conf
//
// main() declares this queue and binds it to the "amq.direct"
// exchange before sending the job message.
const char *WORK_QUEUE_NAME = "grid";
// Message listener for the job response queue.
//
// Holds a reference to the SubscriptionManager it is registered with so
// that received() can cancel the subscription once the job has exited.
class Listener : public MessageListener {
public:
    // Keeps a reference to subs; no copy is made.
    Listener(SubscriptionManager &subs);

    // Callback from the MessageListener interface; handles one response
    // message.
    virtual void received(Message &message);

private:
    // Subscription manager used to cancel the response subscription.
    SubscriptionManager &subscriptions;
};
// Binds the listener to an existing subscription manager. Only a
// reference is stored, so subs must outlive this Listener.
Listener::Listener(SubscriptionManager &subs)
    : subscriptions(subs)
{
}
// Handles one response message for the submitted job.
//
// Prints the message id, the JobState, JobStatus, ExitCode and
// ExitBySignal application headers, and whether the body is empty.
// When JobState is "\"Exited\"" and JobStatus is 4, a non-empty body is
// saved to the file "job.results" (failures are reported on stderr) and
// the subscription for the message's destination is cancelled.
//
// message: the response message delivered to this listener.
void
Listener::received(Message &message) {
    // Bind by reference: only read access is needed, so avoid copying
    // the property and header tables.
    const MessageProperties &properties = message.getMessageProperties();
    const FieldTable &headers = properties.getApplicationHeaders();
    const string &body = message.getData();

    // Note: prior to condor-low-latency-1.0-11 there is an extra
    // space on each attribute name, e.g. "JobState" must be
    // "JobState ". Also, type information was lost on attribute
    // values so JobStatus was a string instead of an int.
    const string state = headers.getAsString("JobState");
    const int status = headers.getAsInt("JobStatus");
    const int exitCode = headers.getAsInt("ExitCode");
    // The header key was previously misspelled ("ExitBySigbal"), so the
    // value printed below never came from the real attribute.
    const string exitBySignal = headers.getAsString("ExitBySignal");

    std::cout
        << "Response: " << properties.getMessageId() << std::endl
        << "JobState: " << state << std::endl
        << "JobStatus: " << status << std::endl
        << "ExitCode: " << exitCode << std::endl
        << "ExitBySignal: " << exitBySignal << std::endl
        << "Is Body Empty? " << (body.empty() ? "Yes" : "No") << std::endl;
    // << headers << std::endl;

    if ("\"Exited\"" == state && 4 == status) {
        // There were some results returned, they're stored in the
        // message body as a zip archive
        if (!body.empty()) {
            // The archive is binary data: open in binary mode and use an
            // unformatted write so no text translation can corrupt it.
            std::ofstream out("job.results",
                              std::ios::out | std::ios::binary | std::ios::trunc);
            if (!out) {
                std::cerr << "Unable to open job.results for writing"
                          << std::endl;
            } else {
                out.write(body.data(),
                          static_cast<std::streamsize>(body.size()));
                out.close();
                if (!out) {
                    std::cerr << "Failed to write job.results" << std::endl;
                }
            }
        }

        // The job is finished; stop listening on this destination.
        subscriptions.cancel(message.getDestination());
    }
}
// Submits one example job to the grid work queue and waits for replies.
//
// Usage: <program> [host [port]]
//   host  broker host, defaults to "127.0.0.1"
//   port  broker port (1-65535), defaults to 5674
//
// Declares and binds the work queue and an exclusive, auto-deleted
// response queue on "amq.direct", subscribes a Listener to the response
// queue, sends a job message whose reply-to points at the response
// queue, then runs the subscription manager.
//
// Returns 0 after subscriptions.run() returns and the connection is
// closed; returns 1 if the port argument is invalid or an exception is
// caught (the message is written to stderr).
int
main(int argc, char **argv)
{
    const char *host = argc > 1 ? argv[1] : "127.0.0.1";

    // Validate the port instead of trusting atoi(), which silently turns
    // garbage into 0 and accepts out-of-range values.
    int port = 5674;
    if (argc > 2) {
        char *end = NULL;
        const long parsed = std::strtol(argv[2], &end, 10);
        if (end == argv[2] || *end != '\0' || parsed < 1 || parsed > 65535) {
            std::cerr << "Invalid port: " << argv[2] << std::endl;
            return 1;
        }
        port = static_cast<int>(parsed);
    }

    const char *work_queue_key = "some_random_string";
    const char *response_queue_name = "some_unique_string";
    const char *response_queue_key = "some_unique_string_key";

    Connection connection;
    try {
        connection.open(host, port);
        Session session = connection.newSession();

        session.queueDeclare(arg::queue=WORK_QUEUE_NAME);
        session.exchangeBind(arg::exchange="amq.direct",
                             arg::queue=WORK_QUEUE_NAME,
                             arg::bindingKey=work_queue_key);

        session.queueDeclare(arg::queue=response_queue_name,
                             // Automatically delete the queue
                             // when finished
                             arg::exclusive=true, arg::autoDelete=true);
        session.exchangeBind(arg::exchange="amq.direct",
                             arg::queue=response_queue_name,
                             arg::bindingKey=response_queue_key);

        SubscriptionManager subscriptions(session);
        Listener listener(subscriptions);
        subscriptions.subscribe(listener, response_queue_name);

        Message message;
        message.getDeliveryProperties().setRoutingKey(work_queue_key);
        // Each message needs a unique Id
        message.getMessageProperties().setMessageId(Uuid(true));
        // Each message must specify where replies are to be sent
        message.getMessageProperties().
            setReplyTo(ReplyTo("amq.direct",
                               response_queue_key));

        // Encode the job attributes into the message headers...
        //
        // Funny \"s everywhere, because we want to tell the
        // difference between an attribute that is a string
        // ("\"string\"") vs an expression ("expr")
        //
        // Owner = nobody (a user on all machines)
        // Iwd = /tmp (a directory on all machines)
        // JobUniverse = 5 (the vanilla universe, good for anything)
        // Requirements = TRUE (run anywhere, no restrictions on machines)
        // message.getHeaders().setString("MyType", "\"Job\"");
        // message.getHeaders().setString("TargetType", "\"Machine\"");
        // message.getHeaders().setString("Owner", "\"nobody\"");
        // message.getHeaders().setString("In", "\"/dev/null\"");
        message.getHeaders().setString("Out", "\"job.out\"");
        message.getHeaders().setString("Err", "\"job.err\"");
        // TRUE because we do not want to be restricted by
        // anything in this example, a.k.a. we know we can run
        // everywhere
        message.getHeaders().setString("Requirements", "TRUE");

        // Run a simple echo program for testing
        message.getHeaders().setString("Cmd", "\"/bin/echo\"");
        message.getHeaders().setString("Arguments", "\"Hello Grid!\"");

        // Run a simple program that touches a new file for testing
        // message.getHeaders().setString("Cmd", "\"/bin/touch\"");
        // message.getHeaders().setString("Arguments", "\"success\"");

        // Run a job that sleeps for awhile, long enough to get a
        // few status updates as well as an exit message (note:
        // duration must be longer than the
        // STARTER_UPDATE_INTERVAL)
        // message.getHeaders().setString("Cmd", "\"/bin/sleep\"");
        // message.getHeaders().setString("Arguments", "\"120\"");

        // Run a java program you have already deployed called
        // Hello in /tmp/Hello.class
        // message.getHeaders().setString("Cmd", "\"/usr/bin/java\"");
        // message.getHeaders().setString("Arguments", "\"-classpath /tmp Hello 'Hello Grid!'\"");

        // Run a java program through the Java Universe, when
        // you've deployed your program in /tmp/Hello.jar
        // message.getHeaders().setInt("JobUniverse", 10);
        // message.getHeaders().setString("Cmd", "\"DummyValue\"");
        // message.getHeaders().setString("JarFiles", "\"/tmp/Hello.jar\"");
        // message.getHeaders().setString("Arguments", "\"Hello 'Hello Grid!'\"");

        session.messageTransfer(arg::content=message,
                                arg::destination="amq.direct");

        // Dispatch incoming responses to the listener. NOTE(review):
        // presumably this returns once the listener cancels its
        // subscription -- confirm against the qpid documentation.
        subscriptions.run();

        connection.close();
        return 0;
    } catch(const std::exception& error) {
        // Report failures on stderr so they are not mixed with the job
        // status output written to stdout.
        std::cerr << error.what() << std::endl;
    }
    return 1;
}