Single node job using Gnu Parallel to process genes

From CAC Documentation wiki
Jump to navigation Jump to search

This job has to work through files in a directory. The program is single-threaded. In order to take advantage of all cores, it uses Gnu Parallel to start up multiple instances on the same node.

The code is in SVN as https://forge.cornell.edu/svn/repos/cacuser/mmc256

#!/bin/bash
# batch.sh
#PBS -l nodes=1,walltime=48:00:00
#PBS -A shm7_0003
#PBS -q v4
#PBS -j oe

echo start `date`
# How many total genes do we want to process?
TASKCNT=1

# How many jobs do we want on each node?
# There are 8 cores, but maybe we want 2 jobs per node.
TASKS_PER_NODE=1

# $PBS_O_WORKDIR is the directory from which the batch was submitted.

SCRATCH=/v4scratch/$USER

# This directory is used for recording which data we have processed.
MONITORDIR=$SCRATCH/monitor
mkdir -p "${MONITORDIR}"

file_size ()
{
  stat -c %s $1
}


# Original files, for comparison.
ORIGDIR=~mmc256/Obesity/Genomes

# We don't want to reread the $HOME directory lots of times, so
# find the file sizes and store them in a file.

for file in `ls "${ORIGDIR}"` 
do
  echo $file $(file_size "${ORIGDIR}/$file")
done>"${MONITORDIR}/input_data.txt"


# We want to start one task per core
# so count the nodes and calculate the cores.
NODECNT=$(wc -l < "$PBS_NODEFILE")
CORES_PER_NODE=8

# If you are using fewer than all cores, Gnu Parallel needs you to
# tell it how many less. An odd format, I know.
if test $TASKS_PER_NODE -lt $CORES_PER_NODE
then
  GP_JOBS_ARG="--jobs $((TASKS_PER_NODE - CORES_PER_NODE))"
else
  GP_JOBS_ARG=""
fi

seq 1 $TASKCNT|parallel $GP_JOBS_ARG --sshloginfile "$PBS_NODEFILE" $PBS_O_WORKDIR/one_gene.sh $SCRATCH $TMPDIR $MONITORDIR

echo finish `date`

The onegene.sh file it runs:

#!/bin/bash
# one_gene.sh
# This script processes the data for one gene.
SCRATCH=$1
TMPDIR=$2
MONITORDIR=$3

# This is where the data comes from.
GENEDIR=$SCRATCH/Obesity/Genomes
REFDIR=$SCRATCH/Obesity/Reference

# Find the genefuncs.sh in the same directory where one_gene is
# located. Do this in case one_gene.sh is called from somehwere else.
HDIR=$0
source ${HDIR%/*}/genefuncs.sh

# Make this global for the message function.
export TID=$(set_tid)

# This is where the program is run.
# The TMPDIR is one directory per process, so we need a subdirectory
# when there is more than one task on the node.
TASKTMP=$TMPDIR/$TID
mkdir -p $TASKTMP

echo Task $TID Scratch $SCRATCH tmp $TASKTMP monitor $MONITORDIR

# This is the place where we copy the results.
RESULTDIR=$SCRATCH/Mosaik

# No need to copy the executable to the working directory.
MOSAIKBIN=~mmc256/mosaik-1.1.0021/bin

mkdir -p $MONITORDIR
mkdir -p $RESULTDIR

FILES_PER_GENE=4
if test $(interactive) == no
then
  GENE=$(next_gene "$MONITORDIR" "$TASKTMP" $FILES_PER_GENE)
else
  GENE=B28
fi

# Now we just start processing
set -x

# File setup.
cd $TASKTMP

if test $(ls -1 ${TASKTMP}/${GENE}_*.fq|wc -l) -ne $FILES_PER_GENE
then
  # No need to copy gzipped files if we can gunzip
  # straight from the shared drive.
  for data in $GENEDIR/$GENE*
  do
    basename=${data##*/}
    nogz=${basename%.gz}
    gunzip -c $data > ${TASKTMP}/${nogz}
  done
fi

# And leave reference files where they are. Just use full path to REFDIR.
# Or you can copy the files and set REFDIR to a period to represent the
# current working directory.


# 1. Format reference genome sequence (Mosaik Build)
# 2. Create a jump database for the reference D. melanogaster genome (Mosaik Jump)
message "3. Format genomic data (Mosaik Build)"
ls -la

# We grep out the lines with "reads" in them so the output won't be crazy.
$MOSAIKBIN/MosaikBuild -q ${GENE}_1_1.fq -q2 ${GENE}_1_2.fq \
    -out ${GENE}.dat -st illumina -p ${GENE}_ | grep -v reads

check_exists ${GENE}.dat

$MOSAIKBIN/MosaikBuild -q ${GENE}_2_1.fq -q2 ${GENE}_2_2.fq \
    -out ${GENE}_2.dat -st illumina -p ${GENE}_ | grep -v reads

check_exists ${GENE}_2.dat


message "4. Align genomic data to reference genome (Mosaik Aligner)"
ls -la

$MOSAIKBIN/MosaikAligner -in ${GENE}.dat -out ${GENE}_aligned.dat \
    -ia ${REFDIR}/Dmel-r5.34.fasta -rur ${GENE}_unaligned.dat \
    -p 8 -hs 15 -mm 15 -mhp 100 -act 35 -bw 35 \
    -j ${REFDIR}/Dmel-r5.34_15 | grep -v reads

check_exists ${GENE}_aligned.dat
check_exists ${GENE}_unaligned.dat

$MOSAIKBIN/MosaikAligner -in ${GENE}_2.dat -out ${GENE}_2aligned.dat \
    -ia ${REFDIR}/Dmel-r5.34.fasta -rur ${GENE}_2unaligned.dat \
    -p 8 -hs 15 -mm 15 -mhp 100 -act 35 -bw 35 \
    -j ${REFDIR}/Dmel-r5.34_15 | grep -v reads

check_exists ${GENE}_2aligned.dat
check_exists ${GENE}_2unaligned.dat

message "5. Run MosaikSort to resolve paired ends"
ls -la

$MOSAIKBIN/MosaikSort -in ${GENE}_aligned.dat -out ${GENE}_sorted.dat |grep -v reads
check_exists ${GENE}_sorted.dat


$MOSAIKBIN/MosaikSort -in ${GENE}_2aligned.dat -out ${GENE}_2sorted.dat|grep -v reads
check_exists ${GENE}_2sorted.dat

message "6. Run MosaikMerge to combine both alignments"
ls -la

$MOSAIKBIN/MosaikMerge -in ${GENE}_sorted.dat -in ${GENE}_2sorted.dat \
    -out ${GENE}_comb_sorted.dat |grep -v reads
check_exists ${GENE}_comb_sorted.dat

message "7. Run MosaikCoverage"
ls -la

mkdir ${GENE}

$MOSAIKBIN/MosaikCoverage -in ${GENE}_comb_sorted.dat \
    -ia ${REFDIR}/Dmel-r5.34.fasta -u -od ${GENE} | grep -v reads

message "8. Convert alignment to bam"
ls -la

$MOSAIKBIN/MosaikText -in ${GENE}_aligned.dat -bam ${GENE}_aligned.bam 

$MOSAIKBIN/MosaikText -in ${GENE}_2aligned.dat -bam ${GENE}_2aligned.bam 

$MOSAIKBIN/MosaikText -in ${GENE}_comb_sorted.dat -bam ${GENE}_comb_sorted.bam 


message "Now it is time to get all the files back."
GENERESULT=${RESULTDIR}/${GENE}
mkdir -p $GENERESULT

for f in _comb_sorted.dat _comb_sorted.bam
do
  cp ${GENE}${f} ${GENERESULT}
done

cp -r ${GENE}/* ${GENERESULT}

message "Then remove unneeded files."
rm ${GENERESULT}/${GENE}.dat
rm ${GENERESULT}/${GENE}_2.dat

rm -rf $TASKTMP

Both of these reference some functions in the genefuncs.sh file.

# Are we running at the command prompt or in batch.
interactive ()
{
  if test -z "$PARALLEL_SEQ" || test -z "$PMI_RANK"
  then
    echo yes
  else
    if test "${ENVIRONMENT:-INT}" == "BATCH"
    then
      echo yes
    else
      echo no
    fi
  fi
}




# Get a task ID if one is available from MPI or Gnu Parallel
set_tid ()
{
  if test -n "$PBS_JOBID"
  then
    TID=${PBS_JOBID%%.*}
  fi
  if test -n "$PARALLEL_SEQ"
  then
    TID="${TID}${PARALLEL_SEQ}"
  else
    if test -n "$PMI_RANK"
    then
      TID="${TID}${PMI_RANK}"
    else
      TID="${TID}1"
    fi
  fi
  echo $TID
}


file_size ()
{
  stat -c %s $1
}


# Send messages to stdout, and date them.
message ()
{
  echo `date` "$*" 1>&2
  echo `date` "$*" >> $SCRATCH/$TID.log
}


# You need to send SIGKILL to stop the parallel program.
kill_parallel ()
{
  if test `pgrep parallel`
  then
    pkill -t KILL parallel
  fi
}



# Kill every job if one file is not made because
# it isn't a good sign.
check_exists ()
{
if test ! -e "$1"
then
  echo "Output file $1 was not made. Exiting."
  ls -la
  kill_parallel
  exit 3
fi
}



# This finds the index of an element in a bash array.
index_of ()
{
  local val=$1
  shift
  local arr=($*)
  local res=notfound
  for (( i=0; i < $((${#arr[*]}-1)) ; i++ ))
  do
    if test "${arr[i]}" == "$val"
    then
      res=$i
      break
    fi
  done
  echo $res
}




# We want each task to find a gene to work on, so we keep a
# list in $MONITORDIR of which genes are taken. This section
# of the script looks to see which genes are not taken and
# have all of their data files ready.
# The output of this command comes in stdout, so call it with
# $(next_gene monitordir tasktmp filepergene).
next_gene ()
{
MONITORDIR=$1
TASKTMP=$2
FILES_PER_GENE=$3

# Start by reading what all the datafiles should be.
# We are putting these into Bash arrays.
declare -a datafile
declare -a datasize

while read name size
do
  datafile[${#datafile[*]}]=$name
  datasize[${#datasize[*]}]=$size
done<"${MONITORDIR}/input_data.txt"

# The message function prints to stderr, so it will not be seen
# by a calling script as output.
message datafile len ${#datafile[*]}
message datasize len ${#datasize[*]}

genes=(`cat "${MONITORDIR}/input_data.txt"|cut -d'_' -f1|sort|uniq`)
gene_cnt=${#genes[*]}


# Then acquire a lock so that the free one that we find is
# not simultaneously taken by another process.
(
  flock 200
  for gene in "${genes[@]}"
  do
    if test ! -e $MONITORDIR/$gene.taken
    then
      # This gene is ready if there are 4 data files.
      AGENEDATA=(`ls $GENEDIR/$gene* 2>/dev/null`)
      if test ${#AGENEDATA[*]} -eq $FILES_PER_GENE
      then
        # Test whether the whole file is there.
        SAME=1
        for gfile in "${AGENEDATA[@]}"
        do
          basename=${gfile##*/}
          idx=$(index_of $basename ${datafile[@]})
          orig_size=${datasize[$idx]}
          if test ! $orig_size -eq $(file_size $gfile)
          then
            SAME=0
          fi
        done
        if test $SAME -eq 1
        then
          message Found gene $gene.
          echo `hostname` ${TID} `date`> $MONITORDIR/$gene.taken
          echo $gene > $TASKTMP/gene.txt
          break
        else
          message Gene $gene did not have all of the data for its files.
        fi
      else
        message Gene $gene did not have four files.
      fi
    else
      message Gene $gene was already taken.
    fi
  done
) 200>$MONITORDIR/markused.lock


# Communicate through the gene.txt file because the lock occurred
# in a subprocess. Cannot share variables out of the subprocess.
if test -e $TASKTMP/gene.txt
then
  read GENE<$TASKTMP/gene.txt
  message Task $TID will work on gene $GENE
else
  message No gene available to work on.
  exit
fi

echo $GENE
}