diff --git a/pipelines/fq2sortedbam/README.md b/pipelines/fq2sortedbam/README.md index 1b2ad84..2ff0847 100644 --- a/pipelines/fq2sortedbam/README.md +++ b/pipelines/fq2sortedbam/README.md @@ -35,7 +35,7 @@ docker run -v :/input :/out :/refdir :/tempdi ### Installation: ``` git clone --recursive https://github.com/IntelLabs/Open-Omics-Acceleration-Framework.git -cd Open-Omics-Acceleration-Framework/blob/main/pipelines/fq2sortedbam/ +cd Open-Omics-Acceleration-Framework/pipelines/fq2sortedbam/ bash install.sh ## onprem mode: Manually install the depenendies present in basic_setup_ubuntu.sh as it needs sudo access ``` diff --git a/pipelines/fq2sortedbam/dist_bwa.py b/pipelines/fq2sortedbam/dist_bwa.py index 7a0aa8c..3817441 100644 --- a/pipelines/fq2sortedbam/dist_bwa.py +++ b/pipelines/fq2sortedbam/dist_bwa.py @@ -185,7 +185,7 @@ def pragzip_reader_real( comm, cpus, fname, outpipe, outdir, last=False ): if d[0] == 64: # @ symbol, begin new record break startpos = f.tell() - #print (rank, startpos, d[0:32]) + st_vec = comm.allgather(startpos) # get all ranks start positions if rank==nranks-1: endpos = total_len @@ -270,35 +270,16 @@ def sort_thr5(fname, comm): sz = 0 while ndone < nranks: bf_sz = 100000000 - #bf_sz2 = 100 #100000 - #req1 = comm.irecv(bf_sz, tag=0) - #stat1 = MPI.Status() - #vl = req1.wait(stat1) - #print(f'[{rank}] recv size vl: ', stat1.Get_count(MPI.CHAR)) - ##print(f'[{rank}] Error vl: ', MPI.Get_error_string(stat1.Get_error())) - #req = comm.irecv(bf_sz2, tag=1) - #stat = MPI.Status() - #print(f'[{rank}] status: ', stat) - #print(f'[{rank}] req', req) - #b = req.wait(stat) - #print(f'[{rank}] #size: ', stat.Get_count(MPI.CHAR)) - #req = comm.irecv() - #b, vl = req.wait() req = comm.irecv(bf_sz) b, vl = req.wait() - #recv_rank = b - #print(f" [{rank}] after wait ", ndone, "/", nranks, ' recvd from: ', b) b = b//nranks if len(vl)>0 and vl[0]=="done": ndone+=1 - #print(f"[{rank}] recvd done from rank ", recv_rank, sz) - #print ("sort_thr "+str(rank)+", got done:"+str(ndone)) else: fl[b].writelines(vl) - #pass + [f.close() for f in fl] t1 = time.time() - #print(f"[{rank}] Done sort_thr "+str(rank)+" time: "+str(t1-t0)) # used for mode 2 and above @@ -319,31 +300,9 @@ def send_wrap(v,r,force=False, b=None): if b is None: b=r send_bufs[b].append(v) if force or len(send_bufs[b])>=20: - #print("send",rank,r) - #comm.send( (b, send_bufs[b]),r) - #assert len(send_bufs[b]) < 10000000 req = comm.isend( (b, send_bufs[b]),r) req.wait() - #req.test() - #req1 = comm.isend(send_bufs[b],r, tag=0) - ##req1 = comm.isend(["wasim"],r) - #stat1 = MPI.Status() - #req1.wait(stat1) - #print(f"[{rank}] wait", stat1.Get_count(MPI.CHAR), len(send_bufs[b])) - #assert stat1.Get_count(MPI.CHAR) < 100000000 - # - #req = comm.isend(b,r, tag=1) - #stat = MPI.Status() - #req.wait(stat) - #print(f"[{rank}] free", stat.Get_count(MPI.CHAR)) - - #if not req.Test(): - # sent_msgs.append( (req, send_bufs[b]) ) # if send is not yet complete, save this request and data - # print("saved request",rank,r) - #req.wait() send_bufs[b].clear() - #send_bufs[b] = [] - #if force: print ("send forced on rank "+str(rank)+" to "+str(r)) return req #sent_msgs = [ rqt for rqt in sent_msgs if not rqt[0].Test() ] def read_wrap(f): @@ -362,7 +321,6 @@ def read_wrap(f): headerlen += len(l) l = l.split() if l[0] == '@SQ': # @SQ lines describe sequences in reference genome - #sn = l[1].split(':')[1] a = len(l[1].split(':')[0]) sn = l[1][a+1:] ln = int(l[2].split(':')[1]) @@ -378,7 +336,6 @@ def read_wrap(f): cumlen += ln else : seq_start[sn] = -1 - #l = f1.readline() l = read_wrap(f1) # done reading headers if keep: @@ -386,13 +343,9 @@ def read_wrap(f): else: seq_start['*'] = -1 - #print(f"[{rank}] done part 1 ") - #print(seq_start) calculate_bins(nranks) binstarts = [ b[0] for b in bins ] - #if rank==0: print("bins", bins) headers_done.release() # allow output thread to begin - #print (binstarts) # read remaining lines (aligned short reads), send to appropriate bin i = 0 @@ -400,26 +353,17 @@ def read_wrap(f): x = l.split() seq = x[2] offset = int(x[3]) - #print (i, seq, offset) if keep: key = seq_start[seq] + offset - #b = bisect.bisect(binstarts, key) - 1 bn = bisect_wrap(binstarts, key) - 1 - #if bn==0 and not (seq=='chr1' or seq=='chr2'): - # print("BAD BIN", seq, seq_start[seq], offset, key, bn) _, r, b = bins[bn] - #print (seq, offset, key, bn, r, b) - #comm.send( (key, b, l), r ) send_wrap( l, r, b=bn ) else : if seq_start[seq]!= -1: key = seq_start[seq] + offset - #b = bisect.bisect(binstarts, key) - 1 temp2=time.time() bn = bisect_wrap(binstarts, key) - 1 _, r, b = bins[bn] - #print (seq, offset, key, bn, r, b) - #comm.send( (key, b, l), r ) send_wrap( l, r, b=bn ) l = read_wrap(f1) @@ -434,17 +378,6 @@ def read_wrap(f): #print(f"[{rank}] sending done to rank: ", r) send_wrap("done", rank, force=True) # send to self last to avoid race condition with allreduce t1 = time.time() - #cnt=0 - #while cnt<4: - # for rq in rt: - # flag, status = rq.test() - # if flag: - # cnt+=1 - # print(status, cnt, flush=True) - #print(f"{rank} check", flush=True) - #total = comm2.allreduce(i) ## put it after confirming that all the send and recv done - #total = comm.allreduce(i) - #print("sw_thr "+str(rank)+" time: "+str(t1-t0)+" "+str(i)+" "+str(total)) # used for mode 2 and above def sam_writer( comm, fname ): @@ -495,35 +428,24 @@ def main(argv): parser.add_argument('--params', default='', help="parameter string to bwa-mem2 barring threads paramter") parser.add_argument("-i", "--index", help="name of index file") parser.add_argument("-p", "--outfile", help="prefix for read files") - #parser.add_argument("-r", "--preads", nargs='+',help="name of reads file seperated by space") parser.add_argument("-c", "--cpus",default=1,help="Number of cpus. default=1") parser.add_argument("-t", "--threads",default=1,help="Number of threads used in samtool operations. default=1") parser.add_argument("-b", "--bam_size",default=9, help="bam file size in GB") parser.add_argument('-in', '--rindex', default='False', help="It will build reference genome index for bwa aligner. If it is already done offline then don't use this flag.") parser.add_argument('-dindex', default='False', help="It will create .fai index. If it is done offline then disable this.") - #parser.add_argument('--container_tool',default="docker",help="Container tool used in pipeline : Docker/Podman") - #parser.add_argument('--shards',default=1,help="Number of shards for deepvariant") parser.add_argument('-pr', '--profile',action='store_true',help="Use profiling") parser.add_argument('--keep_unmapped',action='store_true',help="Keep Unmapped entries at the end of sam file.") - #parser.add_argument('--se_mode',action='store_true',help="Single End (SE) reads only. Paired End (PE) mode is default") parser.add_argument('--read_type',default="short", help="(short/long): bwa-mem2 with short reads, mm2-fast with long reads") parser.add_argument('-y', "--config", default="", help="config yaml file for args") ## Arg values in the provided yaml file override any command-line or default args values args = vars(parser.parse_args()) - #print('before args: ', args) if args["config"] != "": args = populate_yaml(args) - #print('after args: ', args) - global chromo_dict - #ncpus = int(cpus) - #start0 = time.time() - - #se_mode=args["se_mode"] - read_type=args["read_type"] - + global chromo_dict + read_type=args["read_type"] ifile=args["index"] params=args["params"] @@ -537,7 +459,6 @@ def main(argv): se_mode = True if read_type == "long": - #assert se_mode == True, "for long reads se_mode should be enabled by the code" se_mode = True ## for long reads, se_mode=True always whitelist=args["whitelist"] @@ -572,14 +493,10 @@ def main(argv): rank = comm.Get_rank() nranks = comm.Get_size() - #print(refdir + ifile) - #global bins_per_rank - #bins_per_rank = max(4,bins_per_rank//nranks) global ncpus ncpus = int(cpus) if rank == 0: - #print("\n") if se_mode: print("[Info] Running in SE reads only mode!!!") else: print("[Info] Running in PE reads mode!!!") @@ -597,7 +514,6 @@ def faidx(refdir, ifile): return 1 tic=time.time() - #print(f'{SAMTOOLS} faidx '+os.path.join(refdir,ifile) + ' > samlog.txt') a=run(f'{SAMTOOLS} faidx '+os.path.join(refdir,ifile) + ' > ' + output + 'logs/samlog.txt', capture_output=True, shell=True) assert a.returncode==0,"samtools reference index creation failed" end=time.time() @@ -612,11 +528,6 @@ def faidx(refdir, ifile): flg = faidx(refdir, ifile) allexit(comm, flg) - #comm.barrier() - #flg = comm.bcast(flg, root=0) - #if flg: - # os.sys.exit(1) - #comm.MPI_Finalize() flg = 0 if rank == 0: @@ -626,7 +537,6 @@ def faidx(refdir, ifile): os.mkdir(fo) except: print("Error: Unable to create logs folder inside the ouptut folder") - #os.sys.exit(1) flg = 1 else: print('[Info] output logs folder exits, will override the log files') @@ -645,33 +555,14 @@ def faidx(refdir, ifile): '/logs/bwaindexlog.txt', capture_output=True,shell=True) end=time.time() - #file_size = os.path.getsize(folder+rfile1) print("\n[Info] Bwa Index creation time:",end-begin) - #print("\n[Info] Size of FASTQ file:",file_size) comm.barrier() ################################################################# if mode in ['flatmode', 'sortedbam', 'multifq']: - ## bwa-mem2 index path check, every ranks checks the access. - #g = refdir + ifile - #print(g.split(".")[-1]) - #if g.split(".")[-1] != "gz": - # if rank == 0: - # faidx(refdir, ifile) - #else: - # print("Error: genome file should not be gzip. Exiting...") - # os.sys.exit(1) - #comm.barrier() - - #print("Genome: ", refdir + ifile) assert os.path.exists(refdir + ifile) == True, "missing bwa-mem2 genome" if not read_type == "long": flg = 0 - #assert os.path.exists(refdir + ifile + ".bwt.2bit.64") == True, "missing bwa-mem2 index files" - #assert os.path.exists(refdir + ifile + ".0123") == True, "missing bwa-mem2 index files" - #assert os.path.exists(refdir + ifile + ".amb") == True, "missing bwa-mem2 index files" - #assert os.path.exists(refdir + ifile + ".ann") == True, "missing bwa-mem2 index files" - #assert os.path.exists(refdir + ifile + ".pac") == True, "missing bwa-mem2 index files" if os.path.exists(refdir + ifile + ".bwt.2bit.64") == False: flg=1 if os.path.exists(refdir + ifile + ".0123") == False: flg = 1 if os.path.exists(refdir + ifile + ".amb") == False: flg=1 @@ -686,13 +577,10 @@ def faidx(refdir, ifile): if mode in ['flatmode', 'sortedbam']: - #print("Read1: ", folder + rfile1) assert os.path.exists(folder+rfile1) == True, "missing input read1 files" if not se_mode: - #print("Read2: ", folder + rfile2) assert os.path.exists(folder+rfile2) == True, "missing input read2 files" # output paths - #print("Output dir: ", folder) assert os.path.exists(output) == True, "output path does not exist" if mode in ['sortedbam', 'flatmode']: @@ -711,7 +599,6 @@ def faidx(refdir, ifile): # chromo_dict information added - #print('####### keep: ', keep, flush=True) if keep == False and rank == 0: chromo_file = os.path.join(refdir,ifile)+".fai" if os.path.isfile(chromo_file): @@ -729,9 +616,7 @@ def faidx(refdir, ifile): comm.barrier() - keep = comm.bcast(keep, root=0) - #print('####### keep: ', keep, flush=True) - + keep = comm.bcast(keep, root=0) if mode == 'flatmode': r''' @@ -751,8 +636,6 @@ def faidx(refdir, ifile): - It needs reads files in gzip format ''' - #if rank == 0: - # print("dist bwa-mem2 starts in flatmode") if rank == 0 and read_type == "short": print("[Info] bwa-mem2 starts in flatmode") if rank == 0 and read_type == "long": @@ -765,13 +648,8 @@ def faidx(refdir, ifile): fn2 = pragzip_reader( comm, int(cpus), folder+rfile2, output, last=True ) fn3 = os.path.join(output, outfile + str("%05d"%rank) + ".sam") begin1 = time.time() - #a=run(f'{BINDIR}/applications/bwa-mem2/bwa-mem2 mem -t '+cpus+' '+refdir+ifile+' '+fn1+' '+fn2+' > '+fn3,capture_output=True, shell=True) - #bwastr = '{BINDIR}/applications/bwa-mem2/bwa-mem2 mem -t '+cpus+' '+refdir+ifile+' '+fn1+' '+fn2+' > '+fn3 + ' 2> ' + output +'/bwalog' + str(rank) + '.txt' - #print(bwastr) if se_mode: if read_type == "long": - ## s='minimap2 ' + params + ' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+' > '+fn3 + ' 2> ' + output +'logs/mm2log' + str(rank) + '.txt' - #print('mm2 cmd: ', s, flush=True) a = run(f'{MINIMAP2} ' + params + ' -t '+cpus+' '+refdir+ifile+' '+ fn1+' '+' > '+fn3 + ' 2> ' + output +'logs/mm2log' + str(rank) + '.txt',capture_output=True, shell=True) @@ -964,24 +842,23 @@ def faidx(refdir, ifile): begin1 = time.time() if se_mode: if read_type == "long": - a=run(f'{MINIMAP2} ' + params + ' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+' > '+fn3 + ' 2> ' + output +'logs/mm2log' + str(rank) + '.txt',capture_output=True, shell=True) + a=run(f'{MINIMAP2} ' + params + ' -t '+cpus+' '+refdir+ifile+' '+ + fn1+' '+' > '+fn3 + ' 2> ' + output +'logs/mm2log' + + str(rank) + '.txt',capture_output=True, shell=True) else: - ##print(f'{BWA} mem ' + params +' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+' > '+fn3 + ' 2> ' + output +'logs/bwalog' + str(rank) + '.txt') - a=run(f'{BWA} mem ' + params +' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+' > '+fn3 + ' 2> ' + output +'logs/bwalog' + str(rank) + '.txt',capture_output=True, shell=True) + a=run(f'{BWA} mem ' + params +' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+ + ' > '+fn3 + ' 2> ' + output +'logs/bwalog' + str(rank) + + '.txt',capture_output=True, shell=True) else: - a=run(f'{BWA} mem ' + params +' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+fn2+' > '+fn3 + ' 2> ' + output +'logs/bwalog' + str(rank) + '.txt',capture_output=True, shell=True) + a=run(f'{BWA} mem ' + params +' -t '+cpus+' '+refdir+ifile+' '+fn1+' '+ + fn2+' > '+fn3 + ' 2> ' + output +'logs/bwalog' + str(rank) + + '.txt',capture_output=True, shell=True) assert a.returncode == 0 end1b=time.time() thr.join() comm.barrier() end1=time.time() - #print('end: ', end, flush=True) - #except Exception as e: - # print(f"An error occurred: {e}") - # print("Exiting...") - # print("Please check bwa logs in the output folder for more details.") - # os.sys.exit(1) if rank==0: print("\n[Info] FASTQ to SAM time:",end1-begin1) @@ -1029,7 +906,6 @@ def faidx(refdir, ifile): outfile = "final_fq2bam" cmd+=f'{SAMTOOLS} cat -o ' + os.path.join(output, outfile) + '.sorted.bam ' + infstr - #print("merge cmd: ", cmd, flush=True) a=run(cmd,capture_output=True,shell=True) assert a.returncode == 0 print("[Info] Concat done, time taken: ", time.time() - tic)