spice_parallel: submitting a task queue to SPICE¶
If run.txt is a file with a list of tasks (one per line), then run them all on SPICE with:
spice_parallel --time=10 < run.txt
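For illustration, run.txt contains a set of independent shell commands, one per line; the processing script and arguments shown here are hypothetical:

python process_day.py --date=2001-01-01
python process_day.py --date=2001-01-02
python process_day.py --date=2001-01-03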
Options:

--time - CPU time required for a single task, in minutes. Required.
--output - task output files will be put in $SCRATCH/slurm_output. If this option is set, the value given will be used as a subdirectory name, so --output=test will leave the output files in $SCRATCH/slurm_output/test.
--qos - Quality-of-service queue to submit to. Can be high, normal, or low. Leave at the default of normal unless you are sure you know better.
--ntasks - Number of CPUs needed by each task. Defaults to 1.
--mem - RAM requirement for each task (Mb). Defaults to 10,000.

The script also accepts --maxjobs, --batch, and --env; see the source below.
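As an illustrative invocation (the values are arbitrary, not recommendations): to give each task 30 minutes, 2 CPUs, and 20,000 Mb of RAM, with output files under $SCRATCH/slurm_output/test:

spice_parallel --time=30 --ntasks=2 --mem=20000 --output=test --qos=normal < run.txt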
Source¶
#!/usr/bin/env python

# Run a list of jobs on SPICE.
# Similar to GNU parallel, except it uses SPICE.

import os
import sys
import subprocess
import datetime
import time
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--maxjobs", help="Max no. of jobs to queue",
                    default=500, type=int, required=False)
parser.add_argument("--output", help="Sub-directory for slurm output",
                    default=None, type=str, required=False)
parser.add_argument("--qos", help="Quality-of-service (high, normal, low)",
                    default='normal', type=str, required=False)
parser.add_argument("--batch", help="No. of commands to put into a single job",
                    default=None, type=int, required=False)
parser.add_argument("--ntasks", help="Number of cores to assign",
                    default=1, type=int, required=False)
parser.add_argument("--mem", help="RAM required (Mb)",
                    default=10000, type=int, required=False)
parser.add_argument("--time", help="Max time per job (minutes)",
                    type=int, required=True)
parser.add_argument("--env", help="Conda environment to use",
                    default=None, type=str, required=False)
args = parser.parse_args()

if args.qos not in ('high', 'normal', 'low'):
    raise ValueError("QOS must be 'normal', 'high', or 'low'")

# Make the script output directory
slopdir = "%s/slurm_output/" % os.getenv('SCRATCH')
if args.output is not None:
    slopdir = "%s/%s" % (slopdir, args.output)
if not os.path.isdir(slopdir):
    os.makedirs(slopdir)

# Read the task list from stdin, one command per line
jobs = sys.stdin.readlines()

# If batching, group the commands into lists of args.batch commands,
# so that each group is submitted as a single job
if args.batch is not None:
    j2 = []
    for idx in range(0, len(jobs), args.batch):
        tj = min(idx + args.batch, len(jobs))
        j2.append(jobs[idx:tj])
    jobs = j2

i = 0
# Submit the jobs, keeping at most args.maxjobs in the queue at once
while i < len(jobs):
    # Throttle: count how many jobs this user already has queued
    queued_jobs = subprocess.check_output('squeue --user %s' % os.getenv('USER'),
                                          shell=True,
                                          universal_newlines=True).count('\n')
    max_new_jobs = args.maxjobs - queued_jobs
    for j in range(i, min(len(jobs), i + max_new_jobs)):
        # Write a single-job slurm submission script
        f = open("run.slm", "w+")
        f.write('#!/bin/bash -l\n')
        f.write('#SBATCH --output=%s/%d.out\n' % (slopdir, j))
        f.write('#SBATCH --qos=%s\n' % args.qos)
        f.write('#SBATCH --ntasks=%d\n' % args.ntasks)
        f.write('#SBATCH --ntasks-per-core=1\n')
        f.write('#SBATCH --mem=%d\n' % args.mem)
        f.write('#SBATCH --time=%d\n' % args.time)
        if args.env is not None:
            f.write("conda activate %s\n" % args.env)
        if args.batch is not None:
            for job in jobs[j]:
                f.write(job)
        else:
            f.write(jobs[j])
        f.close()
        rc = subprocess.call('sbatch run.slm', shell=True)
        os.unlink('run.slm')
    if max_new_jobs > 0:
        i = i + max_new_jobs
    # Wait for queue space before submitting the remainder
    if i < len(jobs):
        time.sleep(30)
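As a sketch of what this generates: for an invocation like spice_parallel --time=10 --env=my_env < run.txt (the environment name and task command below are hypothetical, and $SCRATCH stands for the expanded value of that variable), the run.slm script written for the first task would look roughly like:

#!/bin/bash -l
#SBATCH --output=$SCRATCH/slurm_output/0.out
#SBATCH --qos=normal
#SBATCH --ntasks=1
#SBATCH --ntasks-per-core=1
#SBATCH --mem=10000
#SBATCH --time=10
conda activate my_env
# task command copied from run.txt (hypothetical):
python process_day.py --date=2001-01-01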