Posts Tagged ‘Python’

Submitting a DAG via Aviary using Python

September 16, 2011

Submitting individual jobs through Condor’s various interfaces is, unsurprisingly, the first thing people do. A quick second is submitting DAGs. I have previously discussed this in Java with BirdBath.

Aviary is a suite of APIs that expose Condor features via powerful, easy to use developer interfaces. It builds on experience from other implementations and takes an approach of exposing common use cases through clean abstractions, while maintaining the Condor philosophy of giving experts access to extended features.

The code is maintained in the contributions section of the Condor repository and is documented in the Grid Developer Guide.

The current implementation provides a SOAP interface for job submission, control and query. It is split into two parts: a plugin to the condor_schedd that exposes submission and control, and a daemon, the aviary_query_server, that exposes the data querying capabilities.

Installation on Fedora 15 and beyond is a simple yum install condor-aviary. The condor-aviary package includes configuration placed in /etc/condor/config.d. A reconfig of the condor_master, to start the aviary_query_server, and a restart of the condor_schedd, to load the plugin, are necessary.
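For a default configuration, the install and daemon bounce look roughly like this (the -subsystem flag is standard condor_restart usage; adjust for your site):

```shell
# Install the Aviary packages; config lands in /etc/condor/config.d
yum install condor-aviary

# Reconfig the master so it notices and starts the aviary_query_server
condor_reconfig

# Restart the schedd so it loads the Aviary plugin
condor_restart -subsystem schedd
```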

Once installed, there are examples in the repository, including a python submit script.

Starting from that example, submitting a DAG is a straightforward extension.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2009-2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# uses Suds, a lightweight SOAP client for Python
from suds.client import Client
import sys, pwd, os, logging, argparse

def attr_builder(type_, format):
    def attr(name, value):
        attr = client.factory.create("ns0:Attribute")
        attr.name = name
        attr.type = type_
        attr.value = format % (value,)
        return attr
    return attr

string_attr = attr_builder('STRING', '%s')
int_attr = attr_builder('INTEGER', '%d')
expr_attr = attr_builder('EXPRESSION', '%s')

parser = argparse.ArgumentParser(description='Submit a job remotely via SOAP.')
parser.add_argument('-v', '--verbose', action='store_true',
                    default=False, help='enable SOAP logging')
parser.add_argument('-u', '--url', action='store', nargs='?', dest='url',
                    help='http or https URL of the job service endpoint')
parser.add_argument('dag', action='store', help='full path to dag file')
args = parser.parse_args()

uid = pwd.getpwuid(os.getuid())[0] or "nobody"

client = Client('file:/var/lib/condor/aviary/services/job/aviary-job.wsdl')
if args.url:
    client.set_options(location=args.url)

if args.verbose:
    logging.basicConfig(level=logging.INFO)
    logging.getLogger('suds.client').setLevel(logging.DEBUG)
    print client

try:
    result = client.service.submitJob(
        # cmd: condor_dagman itself is the job
        '/usr/bin/condor_dagman',
        # args: mirroring what condor_submit_dag constructs
        '-f -l . -Debug 3 -AutoRescue 1 -DoRescueFrom 0 -Allowversionmismatch -Lockfile %s.lock -Dag %s' % (args.dag, args.dag),
        # owner
        uid,
        # iwd
        os.path.dirname(args.dag),
        # submission name
        os.path.basename(args.dag),
        # no extra requirements
        None,
        # extra attributes, also mirroring condor_submit_dag
        [string_attr('Env', '_CONDOR_MAX_DAGMAN_LOG=0;_CONDOR_DAGMAN_LOG=%s.dagman.out' % (args.dag,)),
         int_attr('JobUniverse', 7),
         string_attr('UserLog', args.dag + '.dagman.log'),
         string_attr('RemoveKillSig', 'SIGUSR1'),
         expr_attr('OnExitRemove', '(ExitSignal =?= 11 || (ExitCode =!= UNDEFINED && ExitCode >= 0 && ExitCode <= 2))')])
except Exception, e:
    print 'invocation failed at: ', args.url
    print e
    sys.exit(1)

if result.status.code != 'OK':
    print result.status.code, '; ', result.status.text
    sys.exit(1)

print args.verbose and result or result.id
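The attribute helpers in the script are just a factory of closures. The same pattern can be sketched standalone, substituting plain dicts for the suds factory object (the dict shape here is an illustration, not the actual ns0:Attribute type):

```python
def attr_builder(type_, fmt):
    """Return a closure that builds an attribute of a fixed type."""
    def attr(name, value):
        # Stand-in for client.factory.create("ns0:Attribute")
        return {'name': name, 'type': type_, 'value': fmt % (value,)}
    return attr

# One builder per Aviary attribute type
string_attr = attr_builder('STRING', '%s')
int_attr = attr_builder('INTEGER', '%d')

print(string_attr('RemoveKillSig', 'SIGUSR1'))
print(int_attr('JobUniverse', 7))
```

Each call to attr_builder captures its own type_ and fmt, so the three builders share one definition but stamp out differently typed attributes.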

Scheduler Universe: A meta-scheduler’s home

May 16, 2010

# I'm a meta-scheduler. I submit and perform high-level
# management of jobs in a Schedd's queue. I have specialized
# knowledge that the Schedd should not have to worry itself
# about.
# I like to run in the Scheduler Universe with
# hold_kill_sig=SIGUSR1 and remove_kill_sig=SIGUSR2,
# e.g.
#  universe = scheduler
#  cmd =
#  hold_kill_sig = SIGUSR1
#  remove_kill_sig = SIGUSR2
#  queue
# When I run in the Scheduler Universe,
#  o I run directly under the Schedd
#  o I have to carefully manage the processes I start
#  o I can get special signals in response to
#    condor_hold and condor_rm
#  o I should always exit in response to signals
#  o I should always cleanup after myself before exiting
#  o I know if I get an RM signal I'm leaving the queue
#    and may never run again
#  o I might get a HOLD signal and never an RM, if I'm
#    held and not released before I'm removed
# I often want to make sure I don't leave the Schedd's
# queue unless I complete successfully. To help assure
# this is the case, I can be used with on_exit_remove,
# e.g
#  on_exit_remove = ExitCode =!= UNDEFINED && ExitCode == 42

import signal, time, sys

running = True

def rm_handler(signum, frame):
    global running
    # Someone ran condor_rm on me!
    running = False

def hold_handler(signum, frame):
    global running
    # Someone ran condor_hold on me!
    running = False

signal.signal(signal.SIGUSR1, hold_handler)
signal.signal(signal.SIGUSR2, rm_handler)

while running:
    # ... do meta-scheduling work ...
    time.sleep(1)

# Always clean up before exiting; exit code 42 pairs with the
# on_exit_remove expression above
sys.exit(42)
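The handler wiring can be exercised locally by delivering the signal to the current process, without involving Condor at all (a Unix-only sketch; os.kill stands in for the schedd sending remove_kill_sig):

```python
import os
import signal

running = True

def rm_handler(signum, frame):
    # Someone ran condor_rm on me!
    global running
    running = False

# remove_kill_sig = SIGUSR2 in the submit description above
signal.signal(signal.SIGUSR2, rm_handler)

# Simulate condor_rm: deliver SIGUSR2 to ourselves
os.kill(os.getpid(), signal.SIGUSR2)

# The handler runs before the next statement, so the loop flag is down
assert running is False
```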