Skip to content

Commit

Permalink
cleaned and fixed fq2bam path in source installation
Browse files Browse the repository at this point in the history
  • Loading branch information
yuk12 committed Sep 3, 2024
1 parent 23aea33 commit fc10e75
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 139 deletions.
2 changes: 1 addition & 1 deletion pipelines/fq2sortedbam/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ docker run -v <inputdir>:/input <outdir>:/out <refdir>:/refdir <tempdir>:/tempdi
### Installation:
```
git clone --recursive https://github.com/IntelLabs/Open-Omics-Acceleration-Framework.git
cd Open-Omics-Acceleration-Framework/blob/main/pipelines/fq2sortedbam/
cd Open-Omics-Acceleration-Framework/pipelines/fq2sortedbam/
bash install.sh <onprem/cloud> ## onprem mode: Manually install the depenendies present in basic_setup_ubuntu.sh as it needs sudo access
```

Expand Down
152 changes: 14 additions & 138 deletions pipelines/fq2sortedbam/dist_bwa.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def pragzip_reader_real( comm, cpus, fname, outpipe, outdir, last=False ):
if d[0] == 64: # @ symbol, begin new record
break
startpos = f.tell()
#print (rank, startpos, d[0:32])

st_vec = comm.allgather(startpos) # get all ranks start positions
if rank==nranks-1:
endpos = total_len
Expand Down Expand Up @@ -270,35 +270,16 @@ def sort_thr5(fname, comm):
sz = 0
while ndone < nranks:
bf_sz = 100000000
#bf_sz2 = 100 #100000
#req1 = comm.irecv(bf_sz, tag=0)
#stat1 = MPI.Status()
#vl = req1.wait(stat1)
#print(f'[{rank}] recv size vl: ', stat1.Get_count(MPI.CHAR))
##print(f'[{rank}] Error vl: ', MPI.Get_error_string(stat1.Get_error()))
#req = comm.irecv(bf_sz2, tag=1)
#stat = MPI.Status()
#print(f'[{rank}] status: ', stat)
#print(f'[{rank}] req', req)
#b = req.wait(stat)
#print(f'[{rank}] #size: ', stat.Get_count(MPI.CHAR))
#req = comm.irecv()
#b, vl = req.wait()
req = comm.irecv(bf_sz)
b, vl = req.wait()
#recv_rank = b
#print(f" [{rank}] after wait ", ndone, "/", nranks, ' recvd from: ', b)
b = b//nranks
if len(vl)>0 and vl[0]=="done":
ndone+=1
#print(f"[{rank}] recvd done from rank ", recv_rank, sz)
#print ("sort_thr "+str(rank)+", got done:"+str(ndone))
else:
fl[b].writelines(vl)
#pass

[f.close() for f in fl]
t1 = time.time()
#print(f"[{rank}] Done sort_thr "+str(rank)+" time: "+str(t1-t0))


# used for mode 2 and above
Expand All @@ -319,31 +300,9 @@ def send_wrap(v,r,force=False, b=None):
if b is None: b=r
send_bufs[b].append(v)
if force or len(send_bufs[b])>=20:
#print("send",rank,r)
#comm.send( (b, send_bufs[b]),r)
#assert len(send_bufs[b]) < 10000000
req = comm.isend( (b, send_bufs[b]),r)
req.wait()
#req.test()
#req1 = comm.isend(send_bufs[b],r, tag=0)
##req1 = comm.isend(["wasim"],r)
#stat1 = MPI.Status()
#req1.wait(stat1)
#print(f"[{rank}] wait", stat1.Get_count(MPI.CHAR), len(send_bufs[b]))
#assert stat1.Get_count(MPI.CHAR) < 100000000
#
#req = comm.isend(b,r, tag=1)
#stat = MPI.Status()
#req.wait(stat)
#print(f"[{rank}] free", stat.Get_count(MPI.CHAR))

#if not req.Test():
# sent_msgs.append( (req, send_bufs[b]) ) # if send is not yet complete, save this request and data
# print("saved request",rank,r)
#req.wait()
send_bufs[b].clear()
#send_bufs[b] = []
#if force: print ("send forced on rank "+str(rank)+" to "+str(r))
return req
#sent_msgs = [ rqt for rqt in sent_msgs if not rqt[0].Test() ]
def read_wrap(f):
Expand All @@ -362,7 +321,6 @@ def read_wrap(f):
headerlen += len(l)
l = l.split()
if l[0] == '@SQ': # @SQ lines describe sequences in reference genome
#sn = l[1].split(':')[1]
a = len(l[1].split(':')[0])
sn = l[1][a+1:]
ln = int(l[2].split(':')[1])
Expand All @@ -378,48 +336,34 @@ def read_wrap(f):
cumlen += ln
else :
seq_start[sn] = -1
#l = f1.readline()
l = read_wrap(f1)
# done reading headers
if keep:
seq_start['*'] = cumlen # put all unmatched reads at end
else:
seq_start['*'] = -1

#print(f"[{rank}] done part 1 ")
#print(seq_start)
calculate_bins(nranks)
binstarts = [ b[0] for b in bins ]
#if rank==0: print("bins", bins)
headers_done.release() # allow output thread to begin
#print (binstarts)

# read remaining lines (aligned short reads), send to appropriate bin
i = 0
while l:
x = l.split()
seq = x[2]
offset = int(x[3])
#print (i, seq, offset)
if keep:
key = seq_start[seq] + offset
#b = bisect.bisect(binstarts, key) - 1
bn = bisect_wrap(binstarts, key) - 1
#if bn==0 and not (seq=='chr1' or seq=='chr2'):
# print("BAD BIN", seq, seq_start[seq], offset, key, bn)
_, r, b = bins[bn]
#print (seq, offset, key, bn, r, b)
#comm.send( (key, b, l), r )
send_wrap( l, r, b=bn )
else :
if seq_start[seq]!= -1:
key = seq_start[seq] + offset
#b = bisect.bisect(binstarts, key) - 1
temp2=time.time()
bn = bisect_wrap(binstarts, key) - 1
_, r, b = bins[bn]
#print (seq, offset, key, bn, r, b)
#comm.send( (key, b, l), r )
send_wrap( l, r, b=bn )

l = read_wrap(f1)
Expand All @@ -434,17 +378,6 @@ def read_wrap(f):
#print(f"[{rank}] sending done to rank: ", r)
send_wrap("done", rank, force=True) # send to self last to avoid race condition with allreduce
t1 = time.time()
#cnt=0
#while cnt<4:
# for rq in rt:
# flag, status = rq.test()
# if flag:
# cnt+=1
# print(status, cnt, flush=True)
#print(f"{rank} check", flush=True)
#total = comm2.allreduce(i) ## put it after confirming that all the send and recv done
#total = comm.allreduce(i)
#print("sw_thr "+str(rank)+" time: "+str(t1-t0)+" "+str(i)+" "+str(total))

# used for mode 2 and above
def sam_writer( comm, fname ):
Expand Down Expand Up @@ -495,35 +428,24 @@ def main(argv):
parser.add_argument('--params', default='', help="parameter string to bwa-mem2 barring threads paramter")
parser.add_argument("-i", "--index", help="name of index file")
parser.add_argument("-p", "--outfile", help="prefix for read files")
#parser.add_argument("-r", "--preads", nargs='+',help="name of reads file seperated by space")
parser.add_argument("-c", "--cpus",default=1,help="Number of cpus. default=1")
parser.add_argument("-t", "--threads",default=1,help="Number of threads used in samtool operations. default=1")
parser.add_argument("-b", "--bam_size",default=9, help="bam file size in GB")
parser.add_argument('-in', '--rindex', default='False', help="It will build reference genome index for bwa aligner. If it is already done offline then don't use this flag.")
parser.add_argument('-dindex', default='False', help="It will create .fai index. If it is done offline then disable this.")
#parser.add_argument('--container_tool',default="docker",help="Container tool used in pipeline : Docker/Podman")
#parser.add_argument('--shards',default=1,help="Number of shards for deepvariant")
parser.add_argument('-pr', '--profile',action='store_true',help="Use profiling")
parser.add_argument('--keep_unmapped',action='store_true',help="Keep Unmapped entries at the end of sam file.")
#parser.add_argument('--se_mode',action='store_true',help="Single End (SE) reads only. Paired End (PE) mode is default")
parser.add_argument('--read_type',default="short",
help="(short/long): bwa-mem2 with short reads, mm2-fast with long reads")
parser.add_argument('-y', "--config", default="", help="config yaml file for args")
## Arg values in the provided yaml file override any command-line or default args values
args = vars(parser.parse_args())

#print('before args: ', args)
if args["config"] != "":
args = populate_yaml(args)

#print('after args: ', args)
global chromo_dict
#ncpus = int(cpus)
#start0 = time.time()

#se_mode=args["se_mode"]
read_type=args["read_type"]

global chromo_dict
read_type=args["read_type"]
ifile=args["index"]
params=args["params"]

Expand All @@ -537,7 +459,6 @@ def main(argv):
se_mode = True

if read_type == "long":
#assert se_mode == True, "for long reads se_mode should be enabled by the code"
se_mode = True ## for long reads, se_mode=True always

whitelist=args["whitelist"]
Expand Down Expand Up @@ -572,14 +493,10 @@ def main(argv):
rank = comm.Get_rank()
nranks = comm.Get_size()

#print(refdir + ifile)
#global bins_per_rank
#bins_per_rank = max(4,bins_per_rank//nranks)
global ncpus
ncpus = int(cpus)

if rank == 0:
#print("\n")
if se_mode:
print("[Info] Running in SE reads only mode!!!")
else: print("[Info] Running in PE reads mode!!!")
Expand All @@ -597,7 +514,6 @@ def faidx(refdir, ifile):
return 1

tic=time.time()
#print(f'{SAMTOOLS} faidx '+os.path.join(refdir,ifile) + ' > samlog.txt')
a=run(f'{SAMTOOLS} faidx '+os.path.join(refdir,ifile) + ' > ' + output + 'logs/samlog.txt', capture_output=True, shell=True)
assert a.returncode==0,"samtools reference index creation failed"
end=time.time()
Expand All @@ -612,11 +528,6 @@ def faidx(refdir, ifile):
flg = faidx(refdir, ifile)

allexit(comm, flg)
#comm.barrier()
#flg = comm.bcast(flg, root=0)
#if flg:
# os.sys.exit(1)
#comm.MPI_Finalize()

flg = 0
if rank == 0:
Expand All @@ -626,7 +537,6 @@ def faidx(refdir, ifile):
os.mkdir(fo)
except:
print("Error: Unable to create logs folder inside the ouptut folder")
#os.sys.exit(1)
flg = 1
else:
print('[Info] output logs folder exits, will override the log files')
Expand All @@ -645,33 +555,14 @@ def faidx(refdir, ifile):
'/logs/bwaindexlog.txt', capture_output=True,shell=True)

end=time.time()
#file_size = os.path.getsize(folder+rfile1)
print("\n[Info] Bwa Index creation time:",end-begin)
#print("\n[Info] Size of FASTQ file:",file_size)

comm.barrier()
#################################################################
if mode in ['flatmode', 'sortedbam', 'multifq']:
## bwa-mem2 index path check, every ranks checks the access.
#g = refdir + ifile
#print(g.split(".")[-1])
#if g.split(".")[-1] != "gz":
# if rank == 0:
# faidx(refdir, ifile)
#else:
# print("Error: genome file should not be gzip. Exiting...")
# os.sys.exit(1)
#comm.barrier()

#print("Genome: ", refdir + ifile)
assert os.path.exists(refdir + ifile) == True, "missing bwa-mem2 genome"
if not read_type == "long":
flg = 0
#assert os.path.exists(refdir + ifile + ".bwt.2bit.64") == True, "missing bwa-mem2 index files"
#assert os.path.exists(refdir + ifile + ".0123") == True, "missing bwa-mem2 index files"
#assert os.path.exists(refdir + ifile + ".amb") == True, "missing bwa-mem2 index files"
#assert os.path.exists(refdir + ifile + ".ann") == True, "missing bwa-mem2 index files"
#assert os.path.exists(refdir + ifile + ".pac") == True, "missing bwa-mem2 index files"
if os.path.exists(refdir + ifile + ".bwt.2bit.64") == False: flg=1
if os.path.exists(refdir + ifile + ".0123") == False: flg = 1
if os.path.exists(refdir + ifile + ".amb") == False: flg=1
Expand All @@ -686,13 +577,10 @@ def faidx(refdir, ifile):


if mode in ['flatmode', 'sortedbam']:
#print("Read1: ", folder + rfile1)
assert os.path.exists(folder+rfile1) == True, "missing input read1 files"
if not se_mode:
#print("Read2: ", folder + rfile2)
assert os.path.exists(folder+rfile2) == True, "missing input read2 files"
# output paths
#print("Output dir: ", folder)
assert os.path.exists(output) == True, "output path does not exist"

if mode in ['sortedbam', 'flatmode']:
Expand All @@ -711,7 +599,6 @@ def faidx(refdir, ifile):


# chromo_dict information added
#print('####### keep: ', keep, flush=True)
if keep == False and rank == 0:
chromo_file = os.path.join(refdir,ifile)+".fai"
if os.path.isfile(chromo_file):
Expand All @@ -729,9 +616,7 @@ def faidx(refdir, ifile):


comm.barrier()
keep = comm.bcast(keep, root=0)
#print('####### keep: ', keep, flush=True)

keep = comm.bcast(keep, root=0)

if mode == 'flatmode':
r'''
Expand All @@ -751,8 +636,6 @@ def faidx(refdir, ifile):
- It needs reads files in gzip format
'''

#if rank == 0:
# print("dist bwa-mem2 starts in flatmode")
if rank == 0 and read_type == "short":
print("[Info] bwa-mem2 starts in flatmode")
if rank == 0 and read_type == "long":
Expand All @@ -765,13 +648,8 @@ def faidx(refdir, ifile):
fn2 = pragzip_reader( comm, int(cpus), folder+rfile2, output, last=True )
fn3 = os.path.join(output, outfile + str("%05d"%rank) + ".sam")
begin1 = time.time()
#a=run(f'{BINDIR}/applications/bwa-mem2/bwa-mem2 mem -t '+cpus+' '+refdir+ifile+' '+fn1+' '+fn2+' > '+fn3,capture_output=True, shell=True)
#bwastr = '{BINDIR}/applications/bwa-mem2/bwa-mem2 mem -t '+cpus+' '+refdir+ifile+' '+fn1+' '+fn2+' > '+fn3 + ' 2> ' + output +'/bwalog' + str(rank) + '.txt'
#print(bwastr)
if se_mode:
if read_type == "long":
## s='minimap2 ' + params + ' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+' > '+fn3 + ' 2> ' + output +'logs/mm2log' + str(rank) + '.txt'
#print('mm2 cmd: ', s, flush=True)
a = run(f'{MINIMAP2} ' + params + ' -t '+cpus+' '+refdir+ifile+' '+
fn1+' '+' > '+fn3 + ' 2> ' + output +'logs/mm2log' +
str(rank) + '.txt',capture_output=True, shell=True)
Expand Down Expand Up @@ -964,24 +842,23 @@ def faidx(refdir, ifile):
begin1 = time.time()
if se_mode:
if read_type == "long":
a=run(f'{MINIMAP2} ' + params + ' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+' > '+fn3 + ' 2> ' + output +'logs/mm2log' + str(rank) + '.txt',capture_output=True, shell=True)
a=run(f'{MINIMAP2} ' + params + ' -t '+cpus+' '+refdir+ifile+' '+
fn1+' '+' > '+fn3 + ' 2> ' + output +'logs/mm2log' +
str(rank) + '.txt',capture_output=True, shell=True)
else:
##print(f'{BWA} mem ' + params +' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+' > '+fn3 + ' 2> ' + output +'logs/bwalog' + str(rank) + '.txt')
a=run(f'{BWA} mem ' + params +' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+' > '+fn3 + ' 2> ' + output +'logs/bwalog' + str(rank) + '.txt',capture_output=True, shell=True)
a=run(f'{BWA} mem ' + params +' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+
' > '+fn3 + ' 2> ' + output +'logs/bwalog' + str(rank) +
'.txt',capture_output=True, shell=True)
else:
a=run(f'{BWA} mem ' + params +' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+fn2+' > '+fn3 + ' 2> ' + output +'logs/bwalog' + str(rank) + '.txt',capture_output=True, shell=True)
a=run(f'{BWA} mem ' + params +' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+
fn2+' > '+fn3 + ' 2> ' + output +'logs/bwalog' + str(rank) +
'.txt',capture_output=True, shell=True)

assert a.returncode == 0
end1b=time.time()
thr.join()
comm.barrier()
end1=time.time()
#print('end: ', end, flush=True)
#except Exception as e:
# print(f"An error occurred: {e}")
# print("Exiting...")
# print("Please check bwa logs in the output folder for more details.")
# os.sys.exit(1)

if rank==0:
print("\n[Info] FASTQ to SAM time:",end1-begin1)
Expand Down Expand Up @@ -1029,7 +906,6 @@ def faidx(refdir, ifile):
outfile = "final_fq2bam"

cmd+=f'{SAMTOOLS} cat -o ' + os.path.join(output, outfile) + '.sorted.bam ' + infstr
#print("merge cmd: ", cmd, flush=True)
a=run(cmd,capture_output=True,shell=True)
assert a.returncode == 0
print("[Info] Concat done, time taken: ", time.time() - tic)
Expand Down

0 comments on commit fc10e75

Please sign in to comment.