// LL.cpp

/*
 * Requires package qpidc-devel
 * Compile this program with: g++ -lqpidclient
 */

#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/SubscriptionManager.h>

#include <unistd.h>
#include <cstdlib>
#include <iostream>

#include <sstream>
#include <fstream>

using namespace qpid::client;
using namespace qpid::framing;

using std::stringstream;
using std::string;

// The name of the queue that carod is reading from, as configured in
// /etc/opt/grid/carod.conf. main() declares this queue and binds it to
// the amq.direct exchange before submitting the job message.
const char *WORK_QUEUE_NAME = "grid";

/*
 * Message listener for the response queue.
 *
 * Keeps a reference to the SubscriptionManager it is subscribed through
 * so that received() can cancel the subscription once the job is done.
 * Only a reference is stored, so the SubscriptionManager must outlive
 * the Listener.
 */
class Listener : public MessageListener {
 private:
		// Not owned; used by received() to cancel the subscription
	SubscriptionManager &subscriptions;
 public:
		// explicit, so a SubscriptionManager is never silently
		// converted into a Listener
	explicit Listener(SubscriptionManager &subscriptions);

		// MessageListener callback, handles one response message
	virtual void received(Message &message);
};

// Remember the subscription manager; there is nothing else to set up.
Listener::Listener(SubscriptionManager &subs)
	: subscriptions(subs)
{
}

/*
 * Handle one response message.
 *
 * Prints the message id and the JobState, JobStatus, ExitCode and
 * ExitBySignal application headers to stdout. When the headers report
 * that the job has exited (JobState "\"Exited\"" and JobStatus 4), any
 * message body is saved to the file "job.results" in the current
 * directory and the subscription the message arrived on is cancelled.
 */
void
Listener::received(Message &message) {
		// Bound by reference: the properties and headers are only
		// read, so there is no need to copy them
	const MessageProperties &properties = message.getMessageProperties();
	const FieldTable &headers = properties.getApplicationHeaders();
	const string &body = message.getData();

		// Note: prior to condor-low-latency-1.0-11 there is an extra
		// space on each attribute name, e.g. "JobState" must be
		// "JobState ". Also, type information was lost on attribute
		// values so JobStatus was a string instead of an int.
	const string state = headers.getAsString("JobState");
	const int status = headers.getAsInt("JobStatus");
	const int exitCode = headers.getAsInt("ExitCode");
		// The attribute is spelled "ExitBySignal"; a misspelled key
		// would never match the header
	const string exitBySignal = headers.getAsString("ExitBySignal");

	std::cout
		<< "Response: " << properties.getMessageId() << std::endl
		<< "JobState: " << state << std::endl
		<< "JobStatus: " << status << std::endl
		<< "ExitCode: " << exitCode << std::endl
		<< "ExitBySignal: " << exitBySignal << std::endl
		<< "Is Body Empty? " << (body.empty() ? "Yes" : "No") << std::endl;
//		<< headers << std::endl;

		// The state value carries literal quotes because string
		// attributes are encoded as "\"string\"". JobStatus 4 is
		// presumably Condor's "Completed" status -- confirm against
		// the Condor version in use.
	if ("\"Exited\"" == state && 4 == status) {
			// There were some results returned, they're stored in the
			// message body as a zip archive
		if (!body.empty()) {
				// Binary mode: the archive must be written
				// byte-for-byte, without newline translation
			std::ofstream out("job.results",
							  std::ios::out | std::ios::binary);
			if (!out) {
				std::cerr << "Unable to open job.results for writing"
						  << std::endl;
			} else {
				out << body;
				out.close();
				if (!out) {
					std::cerr << "Failed to write job.results"
							  << std::endl;
				}
			}
		}

			// The job is finished, so stop listening. This happens
			// even if the results could not be saved.
		subscriptions.cancel(message.getDestination());
	}
}

/*
 * Parse a TCP port number from a command line argument.
 *
 * Returns the port (1-65535), or -1 when the text is empty, contains
 * anything other than a decimal number, or is out of range.
 */
static int
parsePort(const char *text)
{
	char *end = NULL;
	const long value = std::strtol(text, &end, 10);

		// end == text: no digits at all; *end != '\0': trailing junk.
		// An overflowing value is clamped by strtol and fails the
		// range check.
	if (end == text || *end != '\0' || value < 1 || value > 65535) {
		return -1;
	}

	return static_cast<int>(value);
}

/*
 * Submit one job message to the work queue and wait for its responses.
 *
 * Usage: <program> [host [port]]
 *   host  broker address, defaults to 127.0.0.1
 *   port  broker port, defaults to 5674
 *
 * Returns 0 once the listener has cancelled its subscription and the
 * connection has been closed, 1 on an invalid port argument or on any
 * exception.
 */
int
main(int argc, char **argv)
{
	const char *host = argc > 1 ? argv[1] : "127.0.0.1";
	int port = 5674;

	if (argc > 2) {
			// Reject a malformed port up front instead of silently
			// trying to connect to port 0
		port = parsePort(argv[2]);
		if (port < 0) {
			std::cerr << "Invalid port: " << argv[2] << std::endl;
			return 1;
		}
	}

	const char *work_queue_key = "some_random_string";
	const char *response_queue_name = "some_unique_string";
	const char *response_queue_key = "some_unique_string_key";

	Connection connection;
	try {
		connection.open(host, port);
		Session session =  connection.newSession();

			// The queue jobs are submitted to, reachable through
			// amq.direct with work_queue_key
		session.queueDeclare(arg::queue=WORK_QUEUE_NAME);
		session.exchangeBind(arg::exchange="amq.direct",
							 arg::queue=WORK_QUEUE_NAME,
							 arg::bindingKey=work_queue_key);

			// The queue responses come back on, reachable through
			// amq.direct with response_queue_key
		session.queueDeclare(arg::queue=response_queue_name,
								 // Automatically delete the queue
								 // when finished
							 arg::exclusive=true, arg::autoDelete=true);
		session.exchangeBind(arg::exchange="amq.direct",
							 arg::queue=response_queue_name,
							 arg::bindingKey=response_queue_key);

			// Subscribe before sending, so the listener is already
			// in place when the job message goes out
		SubscriptionManager subscriptions(session);
		Listener listener(subscriptions);
		subscriptions.subscribe(listener, response_queue_name);

		Message message;
		message.getDeliveryProperties().setRoutingKey(work_queue_key);

			// Each message needs a unique Id
		message.getMessageProperties().setMessageId(Uuid(true));

			// Each message must specify where replies are to be sent
		message.getMessageProperties().
			setReplyTo(ReplyTo("amq.direct",
							   response_queue_key));

			// Encode the job attributes into the message headers...
			//
			//  Funny \"s everywhere, because we want to tell the
			//  difference between an attribute that is a string
			//  ("\"string\"") vs an expression ("expr")
			//
			//   Owner = nobody (a user on all machines)
			//   Iwd = /tmp (a directory on all machines)
			//   JobUniverse = 5 (the vanilla universe, good for anything)
			//   Requirements = TRUE (run anywhere, no restrictions on machines)
//		message.getHeaders().setString("MyType", "\"Job\"");
//		message.getHeaders().setString("TargetType", "\"Machine\"");
//		message.getHeaders().setString("Owner", "\"nobody\"");
//		message.getHeaders().setString("In", "\"/dev/null\"");
		message.getHeaders().setString("Out", "\"job.out\"");
		message.getHeaders().setString("Err", "\"job.err\"");
			// TRUE because we do not want to be restricted by
			// anything in this example, a.k.a. we know we can run
			// everywhere
		message.getHeaders().setString("Requirements", "TRUE");

			// Run a simple echo program for testing
		message.getHeaders().setString("Cmd", "\"/bin/echo\"");
		message.getHeaders().setString("Arguments", "\"Hello Grid!\"");

			// Run a simple program that touches a new file for testing
//		message.getHeaders().setString("Cmd", "\"/bin/touch\"");
//		message.getHeaders().setString("Arguments", "\"success\"");

			// Run a job that sleeps for awhile, long enough to get a
			// few status updates as well as an exit message (note:
			// duration must be longer than the
			// STARTER_UPDATE_INTERVAL)
//		message.getHeaders().setString("Cmd", "\"/bin/sleep\"");
//		message.getHeaders().setString("Arguments", "\"120\"");

			// Run a java program you have already deployed called
			// Hello in /tmp/Hello.class
//		message.getHeaders().setString("Cmd", "\"/usr/bin/java\"");
//		message.getHeaders().setString("Arguments", "\"-classpath /tmp Hello 'Hello Grid!'\"");

			// Run a java program through the Java Universe, when
			// you've deployed your program in /tmp/Hello.jar
//		message.getHeaders().setInt("JobUniverse", 10);
//		message.getHeaders().setString("Cmd", "\"DummyValue\"");
//		message.getHeaders().setString("JarFiles", "\"/tmp/Hello.jar\"");
//		message.getHeaders().setString("Arguments", "\"Hello 'Hello Grid!'\"");

		session.messageTransfer(arg::content=message,
								arg::destination="amq.direct");

			// Dispatch incoming responses to the listener; expected
			// to return once the listener cancels its subscription
		subscriptions.run();

		connection.close();

		return 0;
	} catch(const std::exception& error) {
			// Diagnostics belong on stderr, so they are not mixed
			// into the job status output on stdout
		std::cerr << error.what() << std::endl;
	}

	return 1;
}
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s


%d bloggers like this: