Merge pull request #56 from genepi/features/write-panel-vcf
Features/write panel vcf
seppinho authored Mar 26, 2021
2 parents e0dd991 + c8112bc commit 4b685f1
Showing 9 changed files with 330 additions and 52 deletions.
2 changes: 2 additions & 0 deletions src/main/java/genepi/imputationserver/steps/Imputation.java
@@ -185,6 +185,7 @@ protected void readConfigFile() {
}

if (result.needsPhasing) {
job.setPhasingRequired("true");
context.println("Input data is unphased.");

if (phasing.equals("beagle")) {
@@ -208,6 +209,7 @@ protected void readConfigFile() {

} else {
context.println("Input data is phased.");
job.setPhasingRequired("false");
}

if (mode != null && mode.equals("phasing")) {
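The workflow now records the phasing decision as an explicit job parameter instead of leaving it implicit. Like every other job setting here, the flag travels to the mappers as a plain string; a minimal sketch of the round trip, assuming HadoopJob's set/get forward to the job's Configuration (that plumbing is not part of this diff):

    import org.apache.hadoop.conf.Configuration;

    public class PhasingFlagRoundTrip {
        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // submit side: the workflow stores the decision as a string
            conf.set("PHASING_REQUIRED", "false"); // job.setPhasingRequired("false")

            // mapper side: a missing key defaults to true, mirroring setup()
            // in ImputationMapper below
            String raw = conf.get("PHASING_REQUIRED");
            boolean phasingRequired = (raw == null) || Boolean.parseBoolean(raw);

            System.out.println("phasing required: " + phasingRequired);
        }
    }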
src/main/java/genepi/imputationserver/steps/InputValidation.java
@@ -14,6 +14,7 @@
import genepi.imputationserver.steps.vcf.VcfFile;
import genepi.imputationserver.steps.vcf.VcfFileUtil;
import genepi.imputationserver.util.DefaultPreferenceStore;
import genepi.imputationserver.util.ImputationParameters;
import genepi.imputationserver.util.PgsPanel;
import genepi.imputationserver.util.RefPanel;
import genepi.imputationserver.util.RefPanelList;
@@ -23,11 +24,18 @@ public class InputValidation extends WorkflowStep {

@Override
public boolean run(WorkflowContext context) {
String phasingEngine = context.get("phasing");

ImputationParameters imputationParameters = new ImputationParameters();

imputationParameters.setPhasing(phasingEngine);

context.log("Versions:");
context.log(" Pipeline: " + ImputationPipeline.PIPELINE_VERSION);
context.log(" Imputation-Engine: " + ImputationPipeline.IMPUTATION_VERSION);
context.log(" Phasing-Engine: " + ImputationPipeline.PHASING_VERSION);
if(phasingEngine != null) {
context.log(" Phasing-Engine: " + imputationParameters.getPhasingMethod());
}

if (!checkParameters(context)) {
return false;
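ImputationParameters itself is one of the changed files not expanded in this view. From the call sites here and in ImputationMapper and FileMerger below it behaves as a simple value object; a minimal sketch consistent with those calls (field names and the getPhasingMethod body are assumptions):

    public class ImputationParameters {

        private String phasing; // e.g. "eagle" or "beagle"
        private String referencePanelName;
        private double minR2 = 0;
        private boolean phasingRequired = true;

        public void setPhasing(String phasing) {
            this.phasing = phasing;
        }

        // used for the log line above and the ##phasing= VCF header line;
        // sketched here as the raw engine name, though the real class may
        // resolve it to a version string
        public String getPhasingMethod() {
            return phasing;
        }

        public void setReferencePanelName(String referencePanelName) {
            this.referencePanelName = referencePanelName;
        }

        public String getReferencePanelName() {
            return referencePanelName;
        }

        public void setMinR2(double minR2) {
            this.minR2 = minR2;
        }

        public double getMinR2() {
            return minR2;
        }

        public void setPhasingRequired(boolean phasingRequired) {
            this.phasingRequired = phasingRequired;
        }

        public boolean isPhasingRequired() {
            return phasingRequired;
        }
    }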
src/main/java/genepi/imputationserver/steps/imputation/ImputationJob.java
@@ -39,6 +39,8 @@ public class ImputationJob extends HadoopJob {
public static final String R2_FILTER = "R2_FILTER";

public static final String PHASING_ONLY = "PHASING_ONLY";

public static final String PHASING_REQUIRED = "PHASING_REQUIRED";

public static final String PHASING_ENGINE = "PHASING_ENGINE";

@@ -265,6 +267,10 @@ public void setBuild(String build) {
public void setR2Filter(String r2Filter) {
set(R2_FILTER, r2Filter);
}

public void setPhasingRequired(String phasingRequired) {
set(PHASING_REQUIRED, phasingRequired);
}

public void setPhasingOnly(String phasingOnly) {
set(PHASING_ONLY, phasingOnly);
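The new setter follows the established pattern in this class: each configuration value is stored under a string key and re-parsed in the mapper. A hypothetical caller wires the job with the setters visible in this file (values illustrative):

    void configureJob(ImputationJob job) {
        job.setBuild("hg19");
        job.setR2Filter("0.0");
        job.setPhasingOnly("false");
        job.setPhasingRequired("true"); // new in this PR
    }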
src/main/java/genepi/imputationserver/steps/imputation/ImputationMapper.java
@@ -18,6 +18,7 @@
import genepi.imputationserver.util.DefaultPreferenceStore;
import genepi.imputationserver.util.FileMerger;
import genepi.imputationserver.util.FileMerger.BgzipSplitOutputStream;
import genepi.imputationserver.util.ImputationParameters;
import genepi.io.FileUtil;
import genepi.io.text.LineReader;
import genepi.io.text.LineWriter;
@@ -46,12 +47,14 @@ public class ImputationMapper extends Mapper<LongWritable, Text, Text, Text> {

private String mapBeagleFilename = "";

private String build = "hg19";
private ImputationParameters imputationParameters = null;

private double minR2 = 0;
private String build = "hg19";

private boolean phasingOnly = false;

private boolean phasingRequired = true;

private String phasingEngine = "";

private String refEagleIndexFilename;
@@ -76,6 +79,8 @@ protected void setup(Context context) throws IOException, InterruptedException {
build = parameters.get(ImputationJob.BUILD);

String r2FilterString = parameters.get(ImputationJob.R2_FILTER);

double minR2;
if (r2FilterString == null) {
minR2 = 0;
} else {
@@ -90,6 +95,14 @@ protected void setup(Context context) throws IOException, InterruptedException {
phasingOnly = Boolean.parseBoolean(phasingOnlyString);
}

String phasingRequiredString = parameters.get(ImputationJob.PHASING_REQUIRED);

if (phasingRequiredString == null) {
phasingRequired = true;
} else {
phasingRequired = Boolean.parseBoolean(phasingRequiredString);
}

phasingEngine = parameters.get(ImputationJob.PHASING_ENGINE);

hdfsPath = parameters.get(ImputationJob.REF_PANEL_HDFS);
@@ -99,6 +112,14 @@ protected void setup(Context context) throws IOException, InterruptedException {
String hdfsRefBeagle = parameters.get(ImputationJob.REF_PANEL_BEAGLE_HDFS);
String hdfsPathMapBeagle = parameters.get(ImputationJob.MAP_BEAGLE_HDFS);

// set object
imputationParameters = new ImputationParameters();
String referenceName = parameters.get(ImputationJob.REF_PANEL);
imputationParameters.setPhasing(phasingEngine);
imputationParameters.setReferencePanelName(referenceName);
imputationParameters.setMinR2(minR2);
imputationParameters.setPhasingRequired(phasingRequired);

// get cached files
CacheStore cache = new CacheStore(context.getConfiguration());
String referencePanel = FileUtil.getFilename(hdfsPath);
@@ -253,16 +274,18 @@ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
BgzipSplitOutputStream outHeader = new BgzipSplitOutputStream(
HdfsUtil.create(HdfsUtil.path(output, chunk + ".header.dose.vcf.gz")));

FileMerger.splitPhasedIntoHeaderAndData(outputChunk.getPhasedVcfFilename(), outHeader, outData, chunk);
FileMerger.splitPhasedIntoHeaderAndData(outputChunk.getPhasedVcfFilename(), outHeader, outData, chunk,
imputationParameters);
long end = System.currentTimeMillis();

statistics.setImportTime((end - start) / 1000);

} else {
if (minR2 > 0) {
if (imputationParameters.getMinR2() > 0) {
// filter by r2
String filteredInfoFilename = outputChunk.getInfoFilename() + "_filtered";
filterInfoFileByR2(outputChunk.getInfoFilename(), filteredInfoFilename, minR2);
filterInfoFileByR2(outputChunk.getInfoFilename(), filteredInfoFilename,
imputationParameters.getMinR2());
HdfsUtil.put(filteredInfoFilename, HdfsUtil.path(output, chunk + ".info"));

} else {
Expand All @@ -278,7 +301,8 @@ public void map(LongWritable key, Text value, Context context) throws IOExceptio
BgzipSplitOutputStream outHeader = new BgzipSplitOutputStream(
HdfsUtil.create(HdfsUtil.path(output, chunk + ".header.dose.vcf.gz")));

FileMerger.splitIntoHeaderAndData(outputChunk.getImputedVcfFilename(), outHeader, outData, minR2);
FileMerger.splitIntoHeaderAndData(outputChunk.getImputedVcfFilename(), outHeader, outData,
imputationParameters);
long end = System.currentTimeMillis();

statistics.setImportTime((end - start) / 1000);
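filterInfoFileByR2 predates this PR and is not expanded in this diff. A sketch of what such a filter plausibly looks like, reusing the LineReader/LineWriter helpers imported by this class and assuming a Minimac info file whose seventh column holds the Rsq value (the column index is an assumption):

    private void filterInfoFileByR2(String input, String output, double minR2) throws IOException {
        LineReader reader = new LineReader(input);
        LineWriter writer = new LineWriter(output);
        boolean header = true;
        while (reader.next()) {
            String line = reader.get();
            if (header) {
                // keep the column header unconditionally
                writer.write(line);
                header = false;
                continue;
            }
            // assumption: Rsq is the 7th whitespace-delimited column
            String[] columns = line.split("\\s+");
            if (Double.parseDouble(columns[6]) >= minR2) {
                writer.write(line);
            }
        }
        reader.close();
        writer.close();
    }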
src/main/java/genepi/imputationserver/steps/imputation/ImputationPipeline.java
@@ -20,7 +20,6 @@
import groovy.text.SimpleTemplateEngine;
import htsjdk.samtools.util.StopWatch;
import lukfor.progress.TaskService;
import lukfor.progress.tasks.monitors.TaskMonitorMock;

public class ImputationPipeline {

@@ -30,7 +29,7 @@ public class ImputationPipeline {

public static final String BEAGLE_VERSION = "beagle.18May20.d20.jar";

public static String PHASING_VERSION = "eagle-2.4";
public static final String EAGLE_VERSION = "eagle-2.4";

private String minimacCommand;

@@ -115,32 +114,33 @@ public boolean execute(VcfChunk chunk, VcfChunkOutput output) throws InterruptedException {
long time = System.currentTimeMillis();

boolean successful = false;

String phasingVersion;
if (phasingEngine.equals("beagle")) {

if (!new File(refBeagleFilename).exists()) {
System.out.println("Beagle: Reference '" + refBeagleFilename + "' not found.");
return false;
}
successful = phaseWithBeagle(chunk, output, refBeagleFilename, mapBeagleFilename);
PHASING_VERSION = BEAGLE_VERSION;
phasingVersion = BEAGLE_VERSION;
} else {

if (!new File(refEagleFilename).exists()) {
System.out.println("Eagle: Reference '" + refEagleFilename + "' not found.");
return false;
}
successful = phaseWithEagle(chunk, output, refEagleFilename, mapEagleFilename);
phasingVersion = EAGLE_VERSION;
}

time = (System.currentTimeMillis() - time) / 1000;

statistic.setPhasingTime(time);

if (successful) {
System.out.println(" " + PHASING_VERSION + " finished successfully. [" + time + " sec]");
System.out.println(" " + phasingVersion + " finished successfully. [" + time + " sec]");
} else {
System.out.println(" " + PHASING_VERSION + " failed. [" + time + " sec]");
System.out.println(" " + phasingVersion + " failed. [" + time + " sec]");
return false;
}

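Demoting PHASING_VERSION from a mutable static field to the local phasingVersion removes shared mutable state: the old field was reassigned on every run, so concurrent executions could report the wrong engine. If more engines are ever added, the same selection could live in a small helper; a sketch (the method itself is hypothetical):

    static String phasingVersionFor(String engine) {
        // "beagle" selects the Beagle jar; anything else falls back to Eagle,
        // mirroring the if/else in execute() above
        return "beagle".equals(engine) ? BEAGLE_VERSION : EAGLE_VERSION;
    }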
11 changes: 6 additions & 5 deletions src/main/java/genepi/imputationserver/tools/VersionTool.java
@@ -11,14 +11,14 @@ public VersionTool(String[] args) {

@Override
public void createParameters() {

}

@Override
public void init() {

System.out.println("Michigan Imputation Server");

}

@Override
@@ -27,9 +27,10 @@ public int run() {
System.out.println();
System.out.println("Pipeline: " + ImputationPipeline.PIPELINE_VERSION);
System.out.println("Imputation-Engine: " + ImputationPipeline.IMPUTATION_VERSION);
System.out.println("Phasing-Engine: " + ImputationPipeline.PHASING_VERSION);
System.out.println("Eagle-Engine: " + ImputationPipeline.EAGLE_VERSION);
System.out.println("Beagle-Engine: " + ImputationPipeline.BEAGLE_VERSION);
System.out.println();

return 0;

}
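With PHASING_VERSION split into EAGLE_VERSION and BEAGLE_VERSION, the tool now reports both engines. The output looks roughly like this (the pipeline and imputation strings are placeholders for the actual constants):

    Michigan Imputation Server

    Pipeline: <PIPELINE_VERSION>
    Imputation-Engine: <IMPUTATION_VERSION>
    Eagle-Engine: eagle-2.4
    Beagle-Engine: beagle.18May20.d20.jar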
59 changes: 26 additions & 33 deletions src/main/java/genepi/imputationserver/util/FileMerger.java
@@ -8,12 +8,6 @@
import java.util.ArrayList;
import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import genepi.hadoop.HdfsUtil;
import genepi.imputationserver.steps.imputation.ImputationPipeline;
import genepi.imputationserver.steps.vcf.VcfChunk;
@@ -24,18 +18,18 @@ public class FileMerger {

public static final String R2_FLAG = "R2";

public static void splitIntoHeaderAndData(String input, OutputStream outHeader, OutputStream outData, double minR2)
throws IOException {
public static void splitIntoHeaderAndData(String input, OutputStream outHeader, OutputStream outData,
ImputationParameters parameters) throws IOException {
LineReader reader = new LineReader(input);
boolean writeVersion = true;

while (reader.next()) {
String line = reader.get();
if (!line.startsWith("#")) {
if (minR2 > 0) {
if (parameters.getMinR2() > 0) {
// rsq set. parse line and check rsq
String info = parseInfo(line);
if (info != null) {
boolean keep = keepVcfLineByInfo(info, R2_FLAG, minR2);
boolean keep = keepVcfLineByInfo(info, R2_FLAG, parameters.getMinR2());
if (keep) {
outData.write(line.getBytes());
outData.write("\n".getBytes());
@@ -53,15 +47,15 @@ public static void splitIntoHeaderAndData(String input, OutputStream outHeader,
} else {

// write filter command before ID List starting with #CHROM
if (writeVersion && line.startsWith("##INFO")) {
outHeader.write(("##pipeline=" + ImputationPipeline.PIPELINE_VERSION+ "\n").getBytes());
outHeader.write(("##imputation=" + ImputationPipeline.IMPUTATION_VERSION+ "\n").getBytes());
outHeader.write(("##phasing=" + ImputationPipeline.PHASING_VERSION+ "\n").getBytes());
outHeader.write(("##r2Filter=" + minR2 + "\n").getBytes());
writeVersion = false;
if (line.startsWith("#CHROM")) {
outHeader.write(("##pipeline=" + ImputationPipeline.PIPELINE_VERSION + "\n").getBytes());
outHeader.write(("##imputation=" + ImputationPipeline.IMPUTATION_VERSION + "\n").getBytes());
outHeader.write(("##phasing=" + parameters.getPhasingMethod() + "\n").getBytes());
outHeader.write(("##panel=" + parameters.getReferencePanelName() + "\n").getBytes());
outHeader.write(("##r2Filter=" + parameters.getMinR2() + "\n").getBytes());
}

// remove minimac4 command
// write all headers except minimac4 command
if (!line.startsWith("##minimac4_Command") && !line.startsWith("##source")) {
outHeader.write(line.getBytes());
outHeader.write("\n".getBytes());
@@ -73,31 +67,30 @@ public static void splitIntoHeaderAndData(String input, OutputStream outHeader,
reader.close();
}
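Keying the metadata writes on the #CHROM line, with no writeVersion latch needed, guarantees they are emitted exactly once, immediately before the column header. For an imputed chunk the merged header then ends roughly like this (version and panel values are placeholders):

    ##pipeline=<PIPELINE_VERSION>
    ##imputation=<IMPUTATION_VERSION>
    ##phasing=eagle
    ##panel=hrc-r1.1
    ##r2Filter=0.0
    #CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	<sample columns>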

public static void splitPhasedIntoHeaderAndData(String input, OutputStream outHeader, OutputStream outData, VcfChunk chunk)
throws IOException {
public static void splitPhasedIntoHeaderAndData(String input, OutputStream outHeader, OutputStream outData,
VcfChunk chunk, ImputationParameters parameters) throws IOException {
LineReader reader = new LineReader(input);
boolean writeVersion = true;


while (reader.next()) {
String line = reader.get();
if (!line.startsWith("#")) {
//phased files also include phasingWindow
int pos = Integer.valueOf(line.split("\t",3)[1]);
// phased files also include phasingWindow
int pos = Integer.valueOf(line.split("\t", 3)[1]);
// no rsq set. keep all lines without parsing
if(pos >= chunk.getStart() && pos <= chunk.getEnd()) {
if (pos >= chunk.getStart() && pos <= chunk.getEnd()) {
outData.write(line.getBytes());
outData.write("\n".getBytes());
}
} else {

// write filter command before ID List starting with #CHROM
if (writeVersion && line.startsWith("##INFO")) {
outHeader.write(("##pipeline=" + ImputationPipeline.PIPELINE_VERSION+ "\n").getBytes());
outHeader.write(("##phasing=" + ImputationPipeline.PHASING_VERSION+ "\n").getBytes());
writeVersion = false;
if (line.startsWith("#CHROM")) {
outHeader.write(("##pipeline=" + ImputationPipeline.PIPELINE_VERSION + "\n").getBytes());
outHeader.write(("##phasing=" + parameters.getPhasingMethod() + "\n").getBytes());
outHeader.write(("##panel=" + parameters.getReferencePanelName() + "\n").getBytes());
}

// remove eagle command (since paths are included)
// write all headers except eagle command
if (!line.startsWith("##eagleCommand=eagle")) {
outHeader.write(line.getBytes());
outHeader.write("\n".getBytes());
@@ -110,9 +103,9 @@ public static void splitPhasedIntoHeaderAndData(String input, OutputStream outHeader,
}

public static class BgzipSplitOutputStream extends BlockCompressedOutputStream {

public final static File emptyFile = null;

public BgzipSplitOutputStream(OutputStream os) {
super(os, (File) emptyFile);
}
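BgzipSplitOutputStream extends htsjdk's BlockCompressedOutputStream, passing a null File so no file name is tied to the stream. Header and data are written as two separate BGZF streams, which is presumably what lets the merge step later assemble a valid bgzipped VCF largely by concatenating blocks. A usage sketch with illustrative local paths (the mapper itself writes to HDFS via HdfsUtil.create):

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    public class SplitWriteExample {
        public static void main(String[] args) throws IOException {
            OutputStream outHeader = new FileMerger.BgzipSplitOutputStream(
                    new FileOutputStream("chunk_1.header.dose.vcf.gz"));
            OutputStream outData = new FileMerger.BgzipSplitOutputStream(
                    new FileOutputStream("chunk_1.data.dose.vcf.gz"));

            ImputationParameters parameters = new ImputationParameters();
            parameters.setPhasing("eagle");
            parameters.setReferencePanelName("hrc-r1.1"); // illustrative panel id
            parameters.setMinR2(0.3);

            FileMerger.splitIntoHeaderAndData("chunk_1.dose.vcf", outHeader, outData, parameters);

            // splitIntoHeaderAndData closes its reader but not the output streams
            outHeader.close();
            outData.close();
        }
    }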