#!/usr/bin/env python
"""
Simple driver script that splits up the large movie into two minute
slices, sends them out to the computational grid to be encoded, waits for
the results to be staged back, and then combines the results into a newly
encoded version of the movie.

External programs (mencoder, globusrun-ws, grid-proxy-info) are run through
the subprocess module with argument lists, so no shell is involved and file
names containing spaces are passed through intact. The code sticks to
constructs that work on Python 2.6+ as well as Python 3.
"""

import os
import sys
import getopt
import socket
import subprocess
import time
import random


def usage():
    """
    Print a usage message to stderr.

    @return: None
    @rtype: None
    """
    msg = """\
NAME
    encodeWorkflow

SYNOPSIS
    encodeWorkflow --input=PATH --PBShost=HOSTNAME --SGEhost=HOSTNAME
                   [ --output=PATH ] [ --help ]

    encodeWorkflow --clean

DESCRIPTION
    Simple driver script that splits up the large movie into two minute
    slices, sends them out to the computational grid to be encoded, waits
    for the results to be staged back, and then combines the results into
    a newly encoded version of the movie.

    This script assumes that it is being run on the computer used as the
    'client' computer in the tutorial, and that the GridFTP server is
    running on that computer.

    -i, --input
        Path to the movie file used as input. This script assumes you are
        using the movie file supplied as part of the tutorial.

    -p, --PBShost
        Fully qualified domain name for the computer on which Globus
        GRAM WS is running with a configured PBS jobmanager.

    -s, --SGEhost
        Fully qualified domain name for the computer on which Globus
        GRAM WS is running with a configured SGE jobmanager.

    -o, --output
        The name to use for the final output file containing the new
        encoding of the movie. If no output name is provided a default
        will be created based on the input file name.

    -c, --clean
        Remove intermediate files. This option is to be used only by
        itself after running the script successfully to generate a new
        encoding.

    -h, --help
        Print this usage message.

EXAMPLE
    $ encodeWorkflow --input=./movie.mov --PBShost=nodeb.ps.univa.com \\
          --SGEhost=nodec.ps.univa.com --output=./newmovie.mpg
"""
    sys.stderr.write(msg + "\n")


def _exitWithUsageHint(message):
    """
    Print an error message plus a pointer to --help on stderr, then exit
    with status 1.

    @param message: the error message to print
    @type message: string
    @return: never returns
    """
    sys.stderr.write("%s\n" % message)
    sys.stderr.write("Enter '%s --help' for usage\n" % sys.argv[0])
    sys.exit(1)


def _runCommand(args, launchErrorPrefix, failureErrorPrefix):
    """
    Run an external program to completion and return its standard output.

    communicate() drains stdout and stderr together. Reading one stream to
    EOF before the other can deadlock: a child that fills the stderr pipe
    blocks, and its stdout never reaches EOF.

    @param args: the program followed by its arguments (no shell is used)
    @type args: list of strings
    @param launchErrorPrefix: message prefix used if the program cannot be
        started or waited for; the underlying exception text is appended
    @type launchErrorPrefix: string
    @param failureErrorPrefix: message prefix used if the program exits with
        a non-zero status; everything it wrote to stderr is appended
    @type failureErrorPrefix: string
    @return: everything the program wrote to stdout
    @rtype: string
    @raise RuntimeError: if the program cannot be run or exits non-zero
    """
    try:
        job = subprocess.Popen(args,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
        jobOut, jobErr = job.communicate()
    except Exception as e:
        raise RuntimeError("%s: %s" % (launchErrorPrefix, e))

    # communicate() yields bytes on Python 3 and str on Python 2; decode
    # leniently so stray non-UTF-8 bytes in tool output cannot raise.
    if not isinstance(jobOut, str):
        jobOut = jobOut.decode("utf-8", "replace")
        jobErr = jobErr.decode("utf-8", "replace")

    if job.returncode != 0:
        raise RuntimeError("%s: %s" % (failureErrorPrefix, jobErr))
    return jobOut


def checkCredentials():
    """
    Check for a valid proxy credential. Only a simple existence check
    is done, by running 'grid-proxy-info -e'.

    @return: None
    @rtype: None
    @raise RuntimeError: raised if there is an error checking for a
        credential or if no valid credential is found.
    """
    try:
        job = subprocess.Popen(["grid-proxy-info", "-e"],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT)
        # Drain the merged output so the child can never block on a full
        # pipe; the text itself is not needed, only the exit status.
        job.communicate()
    except Exception as e:
        msg = "Error while checking for a valid credential: %s" % e
        raise RuntimeError(msg)

    if job.returncode:
        msg = ("Error: No valid proxy credential found. "
               "Run 'grid-proxy-init' and try again.")
        raise RuntimeError(msg)


def sliceAndSubmit():
    """
    Slice the input movie file into twelve equal pieces. After creating
    each slice submit a grid job to re-encode the slice in a different
    format.

    @return: None
    @rtype: None
    @raise RuntimeError: propagated from slicing or submission
    """
    for sliceNumber, startTime in zip(sliceNumbers, sliceStartTimes):
        singleSlice(sliceNumber, startTime)
        singleSubmit(sliceNumber)


def singleSlice(sliceNumber, startTime):
    """
    Create a single two minute slice from the movie starting at a
    particular time and identified by a particular number.

    The slice is written to 'sliceNN' in the current working directory.

    @param sliceNumber: the number labeling the created slice
    @type sliceNumber: integer
    @param startTime: the time in minutes at which the slice should start
    @type startTime: integer
    @return: None
    @rtype: None
    @raise RuntimeError: thrown if there is an error while creating the slice
    """
    sliceFileName = "slice%02d" % sliceNumber
    sliceCommand = [
        "mencoder", inputFileName,
        "-ss", "00:%02d:00" % startTime,
        "-endpos", "00:02:00",
        "-ovc", "copy",
        "-oac", "pcm",
        "-o", sliceFileName,
    ]

    print("Creating next slice with start time = 00:%02d:00" % startTime)
    print("Command is: %s" % " ".join(sliceCommand))

    _runCommand(sliceCommand,
                "Error while running job to create slice",
                "Error while creating slice")

    print("Finished creating slice number %d" % sliceNumber)


def singleSubmit(sliceNumber):
    """
    Submit a job via Globus GRAM WS for encoding. The remote factory
    is chosen at random between the PBS and SGE entries of factoryCatalog.

    On success the job's EPR is stored in 'sliceNN.epr' and the slice is
    recorded as not yet complete in encodingGridJobCompletion.

    @param sliceNumber: the number labeling the slice to be encoded
    @type sliceNumber: integer
    @return: None
    @rtype: None
    @raise RuntimeError: thrown if there is an error while submitting the job
    """
    # choose randomly the PBS or SGE factory
    factoryType = random.choice(['PBS', 'SGE'])
    factory = factoryCatalog[factoryType]

    jobDescriptionFilePath = createGridJobDescriptionFile(sliceNumber)

    submitCommand = [
        "globusrun-ws", "-submit", "-batch",
        "-o", "slice%02d.epr" % sliceNumber,
        "-S",
        "-F", factory,
        "-Ft", factoryType,
        "-f", jobDescriptionFilePath,
    ]

    print("Preparing to submit job to grid...")
    print("Submit command: %s" % " ".join(submitCommand))

    _runCommand(submitCommand,
                "Error while submitting GRAM WS job",
                "Error while submitting grid job")

    # after submitting job set the job state to False so that it
    # can be monitored later
    encodingGridJobCompletion[sliceNumber] = False

    print("Grid encoding job submitted to factory type %s" % factoryType)
    print("")


# This is a template for the job description file that will be
# used to submit each grid encoding job. It is filled in with the keys
# 'number', 'sourceUrl', 'destinationUrl', 'stdoutDestinationUrl' and
# 'stderrDestinationUrl'.
# NOTE(review): the element names follow the GT4 WS GRAM job description
# schema and had to be reconstructed. Confirm the element carrying the
# literal 4 (assumed to be rftOptions/parallelStreams on the large-file
# transfers) against the tutorial's reference job description.
jobDescriptionTemplate = """\
<job>
    <executable>/usr/bin/mencoder</executable>
    <directory>${GLOBUS_USER_HOME}</directory>
    <argument>slice%(number)02d</argument>
    <argument>-vf</argument>
    <argument>harddup</argument>
    <argument>-ovc</argument>
    <argument>lavc</argument>
    <argument>-oac</argument>
    <argument>lavc</argument>
    <argument>-lavcopts</argument>
    <argument>vcodec=mpeg4:vqmax=4:acodec=ac3:abitrate=128</argument>
    <argument>-of</argument>
    <argument>avi</argument>
    <argument>-o</argument>
    <argument>mpeg4_slice%(number)02d</argument>
    <stdout>slice%(number)02d.stdout</stdout>
    <stderr>slice%(number)02d.stderr</stderr>
    <fileStageIn>
        <transfer>
            <sourceUrl>%(sourceUrl)s</sourceUrl>
            <destinationUrl>file:///${GLOBUS_USER_HOME}/slice%(number)02d</destinationUrl>
            <rftOptions>
                <parallelStreams>4</parallelStreams>
            </rftOptions>
        </transfer>
    </fileStageIn>
    <fileStageOut>
        <transfer>
            <sourceUrl>file:///${GLOBUS_USER_HOME}/mpeg4_slice%(number)02d</sourceUrl>
            <destinationUrl>%(destinationUrl)s</destinationUrl>
            <rftOptions>
                <parallelStreams>4</parallelStreams>
            </rftOptions>
        </transfer>
        <transfer>
            <sourceUrl>file:///${GLOBUS_USER_HOME}/slice%(number)02d.stdout</sourceUrl>
            <destinationUrl>%(stdoutDestinationUrl)s</destinationUrl>
        </transfer>
        <transfer>
            <sourceUrl>file:///${GLOBUS_USER_HOME}/slice%(number)02d.stderr</sourceUrl>
            <destinationUrl>%(stderrDestinationUrl)s</destinationUrl>
        </transfer>
    </fileStageOut>
    <fileCleanUp>
        <deletion>
            <file>file:///${GLOBUS_USER_HOME}/slice%(number)02d</file>
        </deletion>
        <deletion>
            <file>file:///${GLOBUS_USER_HOME}/mpeg4_slice%(number)02d</file>
        </deletion>
        <deletion>
            <file>file:///${GLOBUS_USER_HOME}/slice%(number)02d.stdout</file>
        </deletion>
        <deletion>
            <file>file:///${GLOBUS_USER_HOME}/slice%(number)02d.stderr</file>
        </deletion>
    </fileCleanUp>
</job>
"""


def createGridJobDescriptionFile(sliceNumber):
    """
    Write into the current working directory a job description file.
    Uses the template above. The staging URLs point back at this host's
    GridFTP server and the current working directory.

    @param sliceNumber: number that labels the current slice
    @type sliceNumber: integer
    @return: path of job description file
    @rtype: string
    @raise RuntimeError: thrown if unable to write the job description file.
    """
    baseUrl = "gsiftp://%s%s" % (socket.getfqdn(), os.getcwd())

    jobDescription = jobDescriptionTemplate % {
        'number': sliceNumber,
        'sourceUrl': "%s/slice%02d" % (baseUrl, sliceNumber),
        'destinationUrl': "%s/mpeg4_slice%02d" % (baseUrl, sliceNumber),
        'stdoutDestinationUrl': "%s/slice%02d.stdout" % (baseUrl, sliceNumber),
        'stderrDestinationUrl': "%s/slice%02d.stderr" % (baseUrl, sliceNumber),
    }

    jobDescriptionFilePath = os.path.abspath("encode-slice%02d.rsl" % sliceNumber)

    try:
        with open(jobDescriptionFilePath, "w") as jobDescriptionFile:
            jobDescriptionFile.write(jobDescription + "\n")
    except Exception as e:
        msg = "Error writing out job description file for slice %d: %s" % (sliceNumber, e)
        raise RuntimeError(msg)

    return jobDescriptionFilePath


def isGridJobComplete(slice):
    """
    Query to determine if a particular grid encoding job is complete.

    @param slice: the number labeling the slice being encoded as part of the job
    @type slice: integer
    @return: True if job is complete or False if not complete.
    @rtype: Boolean
    @raise RuntimeError: thrown if an error is encountered while trying to
        query for the job state, or if the job is reported as Failed.
    """
    command = ["globusrun-ws", "-status", "-job-epr-file", "slice%02d.epr" % slice]
    jobStateString = _runCommand(command,
                                 "Error while checking grid job status",
                                 "Error while checking grid job status")

    # NOTE(review): assumes 'globusrun-ws -status' prints the GRAM state name
    # (e.g. "Done", "Failed"). Without the Failed check a failed job would
    # never report "Done" and the workflow would poll forever.
    if "Failed" in jobStateString:
        msg = "Grid encoding job for slice %d failed: %s" % (slice, jobStateString.strip())
        raise RuntimeError(msg)

    return "Done" in jobStateString


def checkGridJobStatus():
    """
    Loop through the jobs checking to see if all are completed.
    The state of each job is saved so jobs known to be complete are not
    queried again. This function will return False as soon as it finds a
    job that is not completed.

    @return: True if all jobs are completed or False if not all are completed.
    @rtype: Boolean
    @raise RuntimeError: thrown if an error is encountered while checking
        status of jobs.
    """
    for sliceNumber in sliceNumbers:
        if encodingGridJobCompletion[sliceNumber]:
            continue
        try:
            complete = isGridJobComplete(sliceNumber)
        except Exception as e:
            msg = "Error while checking status of grid jobs: %s" % e
            raise RuntimeError(msg)

        if not complete:
            print("Grid encoding job for slice %d is not complete yet..." % sliceNumber)
            return False

        encodingGridJobCompletion[sliceNumber] = True
        print("Grid encoding job for slice %d is completed" % sliceNumber)

    return True


def stitch():
    """
    Combine the newly encoded slices into a single movie file written to
    outputFilePath.

    @return: None
    @rtype: None
    @raise RuntimeError: thrown if there is an error while combining the slices
    """
    encodedSlices = ["mpeg4_slice%02d" % n for n in sliceNumbers]
    command = ["mencoder", "-ovc", "copy", "-oac", "copy",
               "-o", outputFilePath] + encodedSlices

    print("Combining encoded slices next...")
    print("Command is: %s" % " ".join(command))

    _runCommand(command,
                "Error while recombining",
                "Error while recombining")


def cleanup():
    """
    Remove the intermediate files created during the running of this
    script. Note that the end result, the newly encoded movie file,
    is not removed. If a file cannot be removed (for example because it
    does not exist) that file is simply passed over.

    @return: None
    @rtype: None
    """
    patterns = (
        "slice%02d",
        "encode-slice%02d.rsl",
        "mpeg4_slice%02d",
        "slice%02d.epr",
        "slice%02d.stdout",
        "slice%02d.stderr",
    )
    for sliceNumber in sliceNumbers:
        for pattern in patterns:
            try:
                os.unlink(pattern % sliceNumber)
            except OSError:
                # Deliberate best effort: missing or undeletable files
                # are skipped.
                pass


# A list containing the numbering of the slices.
sliceNumbers = list(range(12))

# A list containing the start times (in minutes) for each of the slices;
# slice n starts at minute 2 * n.
sliceStartTimes = list(range(0, 24, 2))

# A structure for holding the state of the encoding jobs: slice number ->
# True once the job is known to be complete.
encodingGridJobCompletion = {}


# The main program begins here.
if __name__ == "__main__":
    # Process command line arguments.
    shortop = "i:p:s:o:hc"
    longop = ["input=", "PBShost=", "SGEhost=", "output=", "help", "clean"]

    try:
        opts, args = getopt.getopt(sys.argv[1:], shortop, longop)
    except getopt.GetoptError as e:
        _exitWithUsageHint("Error parsing command line: %s" % e)

    # defaults
    inputFileName = None
    outputFileName = None
    PBShostname = None
    SGEhostname = None
    clean = False

    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            # An explicit request for help is not an error.
            sys.exit(0)
        elif o in ("-i", "--input"):
            inputFileName = a
        elif o in ("-o", "--output"):
            outputFileName = a
        elif o in ("-p", "--PBShost"):
            PBShostname = a
        elif o in ("-s", "--SGEhost"):
            SGEhostname = a
        elif o in ("-c", "--clean"):
            clean = True
        else:
            # getopt only returns the options declared above, so this is
            # purely defensive.
            _exitWithUsageHint("Error parsing command line")

    if clean:
        cleanup()
        sys.exit(0)

    # Some moderate sanity checking.
    if not inputFileName:
        _exitWithUsageHint("Error: Input file name must be specified.")
    if not PBShostname:
        _exitWithUsageHint("Error: hostname for GRAM WS PBS service must be specified.")
    if not SGEhostname:
        _exitWithUsageHint("Error: hostname for GRAM WS SGE service must be specified.")

    inputFilePath = os.path.abspath(inputFileName)
    if not os.path.exists(inputFilePath):
        _exitWithUsageHint("Error: input file %s does not exist." % inputFilePath)

    if not outputFileName:
        outputFileName = "mpeg_" + os.path.basename(inputFilePath)

    outputFilePath = os.path.abspath(outputFileName)
    if os.path.exists(outputFilePath):
        _exitWithUsageHint("Error: output file %s already exists." % outputFilePath)

    # Do our best to make sure we have FQDN for the remote servers
    PBShostFQDN = socket.getfqdn(PBShostname)
    SGEhostFQDN = socket.getfqdn(SGEhostname)

    factoryCatalog = {
        'PBS': 'https://%s:8443/wsrf/services/ManagedJobFactoryService' % PBShostFQDN,
        'SGE': 'https://%s:8443/wsrf/services/ManagedJobFactoryService' % SGEhostFQDN,
    }

    try:
        # Check for valid credentials
        checkCredentials()

        # Loop over the movie and slice it into 12 pieces with each one
        # being two minutes. After creating a slice submit a job to the
        # grid for encoding the slice and record its EPR so that we can
        # query later to determine when a job has completed.
        sliceAndSubmit()

        # All encoding jobs are now submitted. Poll until they have all
        # finished, then stitch back together the newly encoded pieces.
        allDone = False
        while not allDone:
            allDone = checkGridJobStatus()
            if not allDone:
                time.sleep(30)

        print("All grid encoding jobs are complete!")
        print("")

        # All encoding jobs are completed so now stitch the results
        # back together.
        stitch()
    except RuntimeError as e:
        # Every workflow failure is reported as RuntimeError with a
        # readable message; show it without a traceback.
        sys.stderr.write("%s\n" % e)
        sys.exit(1)

    print("Finished!")