spice_parallel: submitting a task queue to SPICE

If run.txt is a file with a list of tasks (one per line), then run them all on SPICE with:

spice_parallel --time=10 < run.txt
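
Each line of run.txt is an ordinary shell command, and by default each line becomes one slurm job. For example (the script name and arguments are purely illustrative):

./process_year.sh --year=1851
./process_year.sh --year=1852
./process_year.sh --year=1853
./process_year.sh --year=1854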

Options:

  • --time - Maximum time needed by a single task, in minutes. Required.

  • --output - By default, task output files are put in $SCRATCH/slurm_output. If this option is set, its value is used as a subdirectory name, so --output=test puts the output files in $SCRATCH/slurm_output/test.

  • --qos - Quality-of-service queue to submit to. Can be high, normal, or low. Leave at the default of normal unless you are sure you know better.

  • --ntasks - Number of CPUs needed by each task. Defaults to 1.

  • --mem - RAM required by each task, in MB. Defaults to 10,000.

  • --batch - Number of commands to pack into a single slurm job. By default each task gets its own job.

  • --env - Conda environment to activate before running each task.

  • --maxjobs - Maximum number of jobs to have in the queue at once. Defaults to 500; the script waits and submits the rest as the queue drains.
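
For example, to give each task 20 minutes, 4 CPUs and 20 GB of RAM, with output under $SCRATCH/slurm_output/test (the numbers are illustrative):

spice_parallel --time=20 --ntasks=4 --mem=20000 --output=test < run.txt

Or, to pack 10 commands into each slurm job (so --time must cover all 10) and run them in a conda environment (the environment name here is illustrative):

spice_parallel --time=50 --batch=10 --env=my_analysis < run.txt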

Source

#!/usr/bin/env python

# Run a list of jobs on SPICE.
# Similar to GNU parallel, except it uses SPICE

import os
import sys
import subprocess
import time

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--maxjobs", help="Max no. of jobs to queue",
                    default=500,
                    type=int,required=False)
parser.add_argument("--output", help="Sub-directory for slurm output",
                    default=None,
                    type=str,required=False)
parser.add_argument("--qos", help="Quality-of-service (high, normal, low)",
                    default='normal',
                    type=str,required=False)
parser.add_argument("--batch", help="No. of commands to put into a single job",
                    default=None,
                    type=int,required=False)
parser.add_argument("--ntasks", help="Number of cores to assign",
                    default=1,
                    type=int,required=False)
parser.add_argument("--mem", help="RAM required (Mb)",
                    default=10000,
                    type=int,required=False)
parser.add_argument("--time", help="Max time per job (minutes)",
                    type=int,required=True)
parser.add_argument("--env", help="Conda environment to use",
                    default=None,
                    type=str,required=False)
args = parser.parse_args()

if args.qos not in ('high','normal','low'):
    raise ValueError("QOS must be 'normal', 'high', or 'low'")

# Make the script output directory
slopdir="%s/slurm_output/" % os.getenv('SCRATCH')
if args.output is not None:
    slopdir="%s/%s" % (slopdir,args.output)
if not os.path.isdir(slopdir):
    os.makedirs(slopdir)

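# Read the list of tasks from stdin - one shell command per line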
jobs = sys.stdin.readlines()

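# If --batch is set, group consecutive tasks into chunks of that size,
# so each slurm job runs several commands in sequence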
if args.batch is not None:
    j2 = []
    for idx in range(0,len(jobs),args.batch):
        tj=min(idx+args.batch,len(jobs))
        j2.append(jobs[idx:tj])
    jobs=j2

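# Submit the tasks in chunks, never letting this user's queued job count
# exceed --maxjobs; sleep 30 seconds between chunks and re-check the queue.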
i=0
while i<len(jobs):
    # Count this user's currently queued jobs (squeue output includes a header line)
    queued_jobs=subprocess.check_output('squeue --user %s' % os.getenv('USER'),
                                        shell=True,
                                        universal_newlines=True).count('\n')-1
    max_new_jobs=args.maxjobs-queued_jobs
    for j in range(i,min(len(jobs),i+max_new_jobs)):
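        # Write a slurm submission script for this task (or batch of tasks)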
        f=open("run.slm","w+")
        f.write('#!/bin/bash -l\n')
        f.write('#SBATCH --output=%s/%d.out\n' % (slopdir,j))
        f.write('#SBATCH --qos=%s\n' % args.qos)
        f.write('#SBATCH --ntasks=%d\n' % args.ntasks)
        f.write('#SBATCH --ntasks-per-core=1\n')
        f.write('#SBATCH --mem=%d\n' % args.mem)
        f.write('#SBATCH --time=%d\n' % args.time)
        if args.env is not None:
            f.write("conda activate %s\n" % args.env)
        if args.batch is not None:
            for job in jobs[j]:
                f.write(job)
        else:
            f.write(jobs[j])
        f.close()
        rc=subprocess.call('sbatch run.slm',shell=True)
        if rc!=0:
            raise Exception("sbatch failed for task %d" % j)
        os.unlink('run.slm')
    if max_new_jobs>0: i = i+max_new_jobs
    if i<len(jobs): time.sleep(30)
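
With the defaults and the --time=10 call above, the job script written for the first task would look something like this (the $SCRATCH path and the task command are illustrative):

#!/bin/bash -l
#SBATCH --output=/scratch/myuser/slurm_output/0.out
#SBATCH --qos=normal
#SBATCH --ntasks=1
#SBATCH --ntasks-per-core=1
#SBATCH --mem=10000
#SBATCH --time=10
./process_year.sh --year=1851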