From e5beab3eeed6e9836d19d304bdd401457804d04c Mon Sep 17 00:00:00 2001 From: Andrea Cracco Date: Fri, 1 Nov 2024 14:03:59 +0200 Subject: [PATCH] Add GFA support for uncolored graphs, fixes #20 --- Cargo.lock | 1 + crates/api/src/lib.rs | 25 ++- crates/assembler/src/lib.rs | 124 ++++++++------ .../src/pipeline/maximal_unitig_links.rs | 8 +- .../maximal_unitig_index.rs | 24 ++- crates/cmdline/src/main.rs | 14 +- crates/colors/src/managers/multiple.rs | 8 +- crates/colors/src/managers/single.rs | 9 +- crates/colors/src/non_colored.rs | 9 +- crates/io/Cargo.toml | 1 + .../io/src/concurrent/structured_sequences.rs | 43 ++++- .../concurrent/structured_sequences/fasta.rs | 28 ++-- .../concurrent/structured_sequences/gfa.rs | 153 ++++++++++++++++++ .../{fasta => }/stream_finish.rs | 18 +-- 14 files changed, 376 insertions(+), 89 deletions(-) create mode 100644 crates/io/src/concurrent/structured_sequences/gfa.rs rename crates/io/src/concurrent/structured_sequences/{fasta => }/stream_finish.rs (66%) diff --git a/Cargo.lock b/Cargo.lock index 7bd1fa5..453a936 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1203,6 +1203,7 @@ dependencies = [ "bincode", "bstr", "byteorder", + "dynamic-dispatch", "flate2", "ggcat-logging", "ggcat_config", diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index e01147f..87d9926 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -9,6 +9,9 @@ pub use ggcat_logging::MessageLevel; use ggcat_logging::UnrecoverableErrorLogging; use hashes::MinimizerHashFunctionFactory; use hashes::{cn_nthash::CanonicalNtHashIteratorFactory, fw_nthash::ForwardNtHashIteratorFactory}; +use io::concurrent::structured_sequences::fasta::FastaWriterWrapper; +use io::concurrent::structured_sequences::gfa::GFAWriterWrapper; +use io::concurrent::structured_sequences::StructuredSequenceBackendWrapper; use io::sequences_stream::fasta::FastaFileSequencesStream; use io::sequences_stream::GenericSequencesStream; use parallel_processor::enable_counters_logging; 
@@ -86,6 +89,9 @@ pub struct GGCATConfig { /// The messages callback, if present, no output will be automatically written to stdout pub messages_callback: Option, + + /// Output GFA format instead of FASTA + pub gfa_output: bool, } #[derive(Copy, Clone, Eq, PartialEq, Debug)] @@ -217,6 +223,8 @@ impl GGCATInstance { min_multiplicity: usize, extra_elab: ExtraElaboration, + + gfa_output: bool, ) -> anyhow::Result { let bucketing_hash_dispatch = if forward_only { ::dynamic_dispatch_id() @@ -236,10 +244,25 @@ impl GGCATInstance { NonColoredManager::dynamic_dispatch_id() }; + if gfa_output && colors { + anyhow::bail!("GFA output is not supported with colors"); + } + + let output_mode = if gfa_output { + GFAWriterWrapper::dynamic_dispatch_id() + } else { + FastaWriterWrapper::dynamic_dispatch_id() + }; + let temp_dir = create_tempdir(self.0.temp_dir.clone()); let output_file = assembler::dynamic_dispatch::run_assembler( - (bucketing_hash_dispatch, merging_hash_dispatch, colors_hash), + ( + bucketing_hash_dispatch, + merging_hash_dispatch, + colors_hash, + output_mode, + ), kmer_length, minimizer_length.unwrap_or(::utils::compute_best_m(kmer_length)), debug::DEBUG_ASSEMBLER_FIRST_STEP.lock().clone(), diff --git a/crates/assembler/src/lib.rs b/crates/assembler/src/lib.rs index 8f508f1..38ccc8a 100644 --- a/crates/assembler/src/lib.rs +++ b/crates/assembler/src/lib.rs @@ -15,8 +15,12 @@ use config::{ }; use hashes::{HashFunctionFactory, MinimizerHashFunctionFactory}; use io::concurrent::structured_sequences::binary::StructSeqBinaryWriter; -use io::concurrent::structured_sequences::fasta::FastaWriter; -use io::concurrent::structured_sequences::StructuredSequenceWriter; +use io::concurrent::structured_sequences::fasta::FastaWriterWrapper; +use io::concurrent::structured_sequences::gfa::GFAWriterWrapper; +use io::concurrent::structured_sequences::{ + IdentSequenceWriter, StructuredSequenceBackend, StructuredSequenceBackendInit, + StructuredSequenceBackendWrapper, 
StructuredSequenceWriter, +}; use io::sequences_stream::general::GeneralSequenceBlockData; use io::{compute_stats_from_input_blocks, generate_bucket_names}; use parallel_processor::buckets::concurrent::BucketsThreadBuffer; @@ -49,6 +53,23 @@ pub enum AssemblerStartingStep { MaximalUnitigsLinks = 6, } +fn get_writer< + C: IdentSequenceWriter, + L: IdentSequenceWriter, + W: StructuredSequenceBackend + StructuredSequenceBackendInit, +>( + output_file: &PathBuf, +) -> W { + match output_file.extension() { + Some(ext) => match ext.to_string_lossy().to_string().as_str() { + "lz4" => W::new_compressed_lz4(&output_file, 2), + "gz" => W::new_compressed_gzip(&output_file, 2), + _ => W::new_plain(&output_file), + }, + None => W::new_plain(&output_file), + } +} + #[dynamic_dispatch(BucketingHash = [ hashes::cn_nthash::CanonicalNtHashIteratorFactory, #[cfg(not(feature = "devel-build"))] hashes::fw_nthash::ForwardNtHashIteratorFactory @@ -70,11 +91,15 @@ pub enum AssemblerStartingStep { ], AssemblerColorsManager = [ #[cfg(not(feature = "devel-build"))] colors::bundles::multifile_building::ColorBundleMultifileBuilding, colors::non_colored::NonColoredManager, +], OutputMode = [ + FastaWriterWrapper, + #[cfg(not(feature = "devel-build"))] GFAWriterWrapper ])] pub fn run_assembler< BucketingHash: MinimizerHashFunctionFactory, MergingHash: HashFunctionFactory, AssemblerColorsManager: ColorsManager, + OutputMode: StructuredSequenceBackendWrapper, >( k: usize, m: usize, @@ -336,14 +361,7 @@ pub fn run_assembler< } let final_unitigs_file = StructuredSequenceWriter::new( - match output_file.extension() { - Some(ext) => match ext.to_string_lossy().to_string().as_str() { - "lz4" => FastaWriter::new_compressed_lz4(&output_file, 2), - "gz" => FastaWriter::new_compressed_gzip(&output_file, 2), - _ => FastaWriter::new_plain(&output_file), - }, - None => FastaWriter::new_plain(&output_file), - }, + get_writer::<_, _, OutputMode::Backend<_, _>>(&output_file), k, ); @@ -365,40 +383,44 @@ pub fn 
run_assembler< None }; - let (reorganized_reads, _final_unitigs_bucket) = if step - <= AssemblerStartingStep::ReorganizeReads - { - if generate_maximal_unitigs_links || compute_tigs_mode.is_some() { - reorganize_reads::< - BucketingHash, - MergingHash, - AssemblerColorsManager, - StructSeqBinaryWriter<_, _>, - >( - sequences, - reads_map, - temp_dir.as_path(), - compressed_temp_unitigs_file.as_ref().unwrap(), - buckets_count, - ) + let (reorganized_reads, _final_unitigs_bucket) = + if step <= AssemblerStartingStep::ReorganizeReads { + if generate_maximal_unitigs_links || compute_tigs_mode.is_some() { + reorganize_reads::< + BucketingHash, + MergingHash, + AssemblerColorsManager, + StructSeqBinaryWriter<_, _>, + >( + sequences, + reads_map, + temp_dir.as_path(), + compressed_temp_unitigs_file.as_ref().unwrap(), + buckets_count, + ) + } else { + reorganize_reads::< + BucketingHash, + MergingHash, + AssemblerColorsManager, + OutputMode::Backend<_, _>, + >( + sequences, + reads_map, + temp_dir.as_path(), + &final_unitigs_file, + buckets_count, + ) + } } else { - reorganize_reads::>( - sequences, - reads_map, - temp_dir.as_path(), - &final_unitigs_file, - buckets_count, + ( + generate_bucket_names(temp_dir.join("reads_bucket"), buckets_count, Some("tmp")), + (generate_bucket_names(temp_dir.join("reads_bucket_lonely"), 1, Some("tmp")) + .into_iter() + .next() + .unwrap()), ) - } - } else { - ( - generate_bucket_names(temp_dir.join("reads_bucket"), buckets_count, Some("tmp")), - (generate_bucket_names(temp_dir.join("reads_bucket_lonely"), 1, Some("tmp")) - .into_iter() - .next() - .unwrap()), - ) - }; + }; if last_step <= AssemblerStartingStep::ReorganizeReads { PHASES_TIMES_MONITOR @@ -427,7 +449,12 @@ pub fn run_assembler< k, ); } else { - build_unitigs::>( + build_unitigs::< + BucketingHash, + MergingHash, + AssemblerColorsManager, + OutputMode::Backend<_, _>, + >( reorganized_reads, unitigs_map, temp_dir.as_path(), @@ -483,14 +510,7 @@ pub fn run_assembler< 
final_unitigs_file.finalize(); let final_unitigs_file = StructuredSequenceWriter::new( - match output_file.extension() { - Some(ext) => match ext.to_string_lossy().to_string().as_str() { - "lz4" => FastaWriter::new_compressed_lz4(&output_file, 2), - "gz" => FastaWriter::new_compressed_gzip(&output_file, 2), - _ => FastaWriter::new_plain(&output_file), - }, - None => FastaWriter::new_plain(&output_file), - }, + get_writer::<_, _, OutputMode::Backend<_, _>>(&output_file), k, ); @@ -498,7 +518,7 @@ pub fn run_assembler< BucketingHash, MergingHash, AssemblerColorsManager, - FastaWriter<_, _>, + OutputMode::Backend<_, _>, >(temp_path, temp_dir.as_path(), &final_unitigs_file, k); final_unitigs_file.finalize(); } diff --git a/crates/assembler/src/pipeline/maximal_unitig_links.rs b/crates/assembler/src/pipeline/maximal_unitig_links.rs index 34c1312..f0acab6 100644 --- a/crates/assembler/src/pipeline/maximal_unitig_links.rs +++ b/crates/assembler/src/pipeline/maximal_unitig_links.rs @@ -65,7 +65,6 @@ pub fn build_maximal_unitigs_links< let buckets_count = 1 << DEFAULT_BUCKET_HASHES_SIZE_LOG; - let self_complemental_unitigs = DashSet::new(); // Hash all the extremities @@ -137,7 +136,7 @@ pub fn build_maximal_unitigs_links< let first_hash_unx = first_hash.to_unextendable(); let last_hash_unx = last_hash.to_unextendable(); - let self_complemental = (first_hash_unx == last_hash_unx) + let self_complemental = (first_hash_unx == last_hash_unx) && (first_hash.is_rc_symmetric() || (first_hash.is_forward() != last_hash.is_forward())); if self_complemental { @@ -308,8 +307,9 @@ pub fn build_maximal_unitigs_links< // Rewrite the output file to include found links { - let self_complemental_unitigs = self_complemental_unitigs.into_iter().collect::>(); - + let self_complemental_unitigs = self_complemental_unitigs + .into_iter() + .collect::>(); PHASES_TIMES_MONITOR .write() diff --git a/crates/assembler/src/pipeline/maximal_unitig_links/maximal_unitig_index.rs 
b/crates/assembler/src/pipeline/maximal_unitig_links/maximal_unitig_index.rs index 80db78c..dd7dbea 100644 --- a/crates/assembler/src/pipeline/maximal_unitig_links/maximal_unitig_index.rs +++ b/crates/assembler/src/pipeline/maximal_unitig_links/maximal_unitig_index.rs @@ -270,8 +270,28 @@ impl IdentSequenceWriter for DoubleMaximalUnitigLinks { } #[allow(unused_variables)] - fn write_as_gfa(&self, stream: &mut impl Write, extra_buffer: &Self::TempBuffer) { - todo!() + fn write_as_gfa( + &self, + k: u64, + index: u64, + stream: &mut impl Write, + extra_buffer: &Self::TempBuffer, + ) { + for entries in &self.links { + let entries = entries.entries.get_slice(extra_buffer); + for entry in entries { + writeln!( + stream, + "L\t{}\t{}\t{}\t{}\t{}M", + index, + if entry.flags.flip_current() { "-" } else { "+" }, + entry.index, + if entry.flags.flip_other() { "-" } else { "+" }, + k - 1 + ) + .unwrap(); + } + } } fn parse_as_ident<'a>(_ident: &[u8], _extra_buffer: &mut Self::TempBuffer) -> Option { diff --git a/crates/cmdline/src/main.rs b/crates/cmdline/src/main.rs index 168f256..67e2f82 100644 --- a/crates/cmdline/src/main.rs +++ b/crates/cmdline/src/main.rs @@ -191,6 +191,10 @@ struct AssemblerArgs { #[structopt(flatten)] pub common_args: CommonArgs, + + /// Output the graph in GFA format + #[structopt(long = "gfa")] + pub gfa_output: bool, } #[derive(StructOpt, Debug)] @@ -240,7 +244,7 @@ struct QueryArgs { // #[cfg(feature = "mem-analysis")] // static DEBUG_ALLOCATOR: DebugAllocator = DebugAllocator::new(); -fn initialize(args: &CommonArgs, out_file: &PathBuf) -> &'static GGCATInstance { +fn initialize(args: &CommonArgs, out_file: &PathBuf, gfa_output: bool) -> &'static GGCATInstance { let instance = GGCATInstance::create(GGCATConfig { temp_dir: Some(args.temp_dir.clone()), memory: args.memory, @@ -249,6 +253,7 @@ fn initialize(args: &CommonArgs, out_file: &PathBuf) -> &'static GGCATInstance { intermediate_compression_level:
args.intermediate_compression_level, stats_file: Some(out_file.with_extension("stats.log")), messages_callback: None, + gfa_output, }); ggcat_api::debug::DEBUG_KEEP_FILES.store(args.keep_temp_files, Ordering::Relaxed); @@ -428,6 +433,7 @@ fn run_assembler_from_args(instance: &GGCATInstance, args: AssemblerArgs) { } else { ExtraElaboration::None }, + args.gfa_output, ) .unwrap(); @@ -501,7 +507,7 @@ fn main() { &["ix86arch::INSTRUCTION_RETIRED", "ix86arch::LLC_MISSES"], ); - let instance = initialize(&args.common_args, &args.output_file); + let instance = initialize(&args.common_args, &args.output_file, args.gfa_output); run_assembler_from_args(&instance, args); } @@ -526,8 +532,6 @@ fn main() { return; // Skip final memory deallocation } CliArgs::Query(args) => { - initialize(&args.common_args, &args.output_file_prefix); - if !args.colors && args.colored_query_output_format.is_some() { println!("Warning: colored query output format is specified, but the graph is not colored"); } @@ -537,7 +541,7 @@ fn main() { &["ix86arch::INSTRUCTION_RETIRED", "ix86arch::LLC_MISSES"], ); - let instance = initialize(&args.common_args, &args.output_file_prefix); + let instance = initialize(&args.common_args, &args.output_file_prefix, false); let output_file_name = run_querier_from_args(&instance, args); println!("Final output saved to: {}", output_file_name.display()); diff --git a/crates/colors/src/managers/multiple.rs b/crates/colors/src/managers/multiple.rs index 707b2e3..7fd0f7a 100644 --- a/crates/colors/src/managers/multiple.rs +++ b/crates/colors/src/managers/multiple.rs @@ -668,7 +668,13 @@ impl IdentSequenceWriter for UnitigColorData { } #[allow(unused_variables)] - fn write_as_gfa(&self, stream: &mut impl Write, extra_buffer: &Self::TempBuffer) { + fn write_as_gfa( + &self, + _k: u64, + _index: u64, + stream: &mut impl Write, + extra_buffer: &Self::TempBuffer, + ) { if self.slice.len() > 0 { write!(stream, "CS",).unwrap(); } diff --git 
a/crates/colors/src/managers/single.rs b/crates/colors/src/managers/single.rs index 0e999c8..dab8589 100644 --- a/crates/colors/src/managers/single.rs +++ b/crates/colors/src/managers/single.rs @@ -236,7 +236,14 @@ impl SequenceExtraData for UnitigColorDataSerializer { impl IdentSequenceWriter for UnitigColorDataSerializer { fn write_as_ident(&self, _stream: &mut impl Write, _extra_buffer: &Self::TempBuffer) {} - fn write_as_gfa(&self, _stream: &mut impl Write, _extra_buffer: &Self::TempBuffer) {} + fn write_as_gfa( + &self, + _k: u64, + _index: u64, + _stream: &mut impl Write, + _extra_buffer: &Self::TempBuffer, + ) { + } fn parse_as_ident<'a>(_ident: &[u8], _extra_buffer: &mut Self::TempBuffer) -> Option { todo!() diff --git a/crates/colors/src/non_colored.rs b/crates/colors/src/non_colored.rs index f0a03ed..e882614 100644 --- a/crates/colors/src/non_colored.rs +++ b/crates/colors/src/non_colored.rs @@ -98,7 +98,14 @@ impl IdentSequenceWriter for NonColoredManager { #[inline(always)] fn write_as_ident(&self, _stream: &mut impl Write, _extra_buffer: &Self::TempBuffer) {} #[inline(always)] - fn write_as_gfa(&self, _stream: &mut impl Write, _extra_buffer: &Self::TempBuffer) {} + fn write_as_gfa( + &self, + _k: u64, + _index: u64, + _stream: &mut impl Write, + _extra_buffer: &Self::TempBuffer, + ) { + } #[inline(always)] fn parse_as_ident<'a>(_ident: &[u8], _extra_buffer: &mut Self::TempBuffer) -> Option { diff --git a/crates/io/Cargo.toml b/crates/io/Cargo.toml index 0901119..e4cb1fe 100644 --- a/crates/io/Cargo.toml +++ b/crates/io/Cargo.toml @@ -33,6 +33,7 @@ typenum = "1.17.0" bstr = "1.9.1" ggcat-logging = { version = "0.1.0", path = "../logging" } anyhow = "1.0.89" +dynamic-dispatch = "0.5.4" [dev-dependencies] rand = "0.8.5" diff --git a/crates/io/src/concurrent/structured_sequences.rs b/crates/io/src/concurrent/structured_sequences.rs index 4ceac0f..6783726 100644 --- a/crates/io/src/concurrent/structured_sequences.rs +++ 
b/crates/io/src/concurrent/structured_sequences.rs @@ -1,16 +1,25 @@ use super::temp_reads::extra_data::SequenceExtraDataConsecutiveCompression; +use dynamic_dispatch::dynamic_dispatch; use parking_lot::{Condvar, Mutex}; use std::io::Write; use std::marker::PhantomData; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; pub mod binary; pub mod concurrent; pub mod fasta; +pub mod gfa; +pub mod stream_finish; pub trait IdentSequenceWriter: SequenceExtraDataConsecutiveCompression + Sized { fn write_as_ident(&self, stream: &mut impl Write, extra_buffer: &Self::TempBuffer); - fn write_as_gfa(&self, stream: &mut impl Write, extra_buffer: &Self::TempBuffer); + fn write_as_gfa( + &self, + k: u64, + index: u64, + stream: &mut impl Write, + extra_buffer: &Self::TempBuffer, + ); fn parse_as_ident<'a>(ident: &[u8], extra_buffer: &mut Self::TempBuffer) -> Option; @@ -20,7 +29,14 @@ pub trait IdentSequenceWriter: SequenceExtraDataConsecutiveCompression + Sized { impl IdentSequenceWriter for () { fn write_as_ident(&self, _stream: &mut impl Write, _extra_buffer: &Self::TempBuffer) {} - fn write_as_gfa(&self, _stream: &mut impl Write, _extra_buffer: &Self::TempBuffer) {} + fn write_as_gfa( + &self, + _k: u64, + _index: u64, + _stream: &mut impl Write, + _extra_buffer: &Self::TempBuffer, + ) { + } fn parse_as_ident<'a>(_ident: &[u8], _extra_buffer: &mut Self::TempBuffer) -> Option { Some(()) @@ -45,6 +61,27 @@ pub type SequenceAbundanceType = SequenceAbundance; #[cfg(not(feature = "support_kmer_counters"))] pub type SequenceAbundanceType = (); +pub trait StructuredSequenceBackendInit: Sync + Send + Sized { + fn new_compressed_gzip(_path: impl AsRef, _level: u32) -> Self { + unimplemented!() + } + + fn new_compressed_lz4(_path: impl AsRef, _level: u32) -> Self { + unimplemented!() + } + + fn new_plain(_path: impl AsRef) -> Self { + unimplemented!() + } +} + +#[dynamic_dispatch] +pub trait StructuredSequenceBackendWrapper: 'static { + type Backend: + 
StructuredSequenceBackendInit + + StructuredSequenceBackend; +} + pub trait StructuredSequenceBackend: Sync + Send { diff --git a/crates/io/src/concurrent/structured_sequences/fasta.rs b/crates/io/src/concurrent/structured_sequences/fasta.rs index b12392e..d804c7b 100644 --- a/crates/io/src/concurrent/structured_sequences/fasta.rs +++ b/crates/io/src/concurrent/structured_sequences/fasta.rs @@ -1,5 +1,6 @@ use crate::concurrent::structured_sequences::{IdentSequenceWriter, StructuredSequenceBackend}; use config::{DEFAULT_OUTPUT_BUFFER_SIZE, DEFAULT_PER_CPU_BUFFER_SIZE}; +use dynamic_dispatch::dynamic_dispatch; use flate2::write::GzEncoder; use flate2::Compression; use lz4::{BlockMode, BlockSize, ContentChecksum}; @@ -8,12 +9,19 @@ use std::io::{BufWriter, Write}; use std::marker::PhantomData; use std::path::{Path, PathBuf}; -use self::stream_finish::FastaWriterWrapper; +use super::stream_finish::SequencesWriterWrapper; #[cfg(feature = "support_kmer_counters")] use super::SequenceAbundance; +use super::{StructuredSequenceBackendInit, StructuredSequenceBackendWrapper}; -mod stream_finish; +pub struct FastaWriterWrapper; + +#[dynamic_dispatch] +impl StructuredSequenceBackendWrapper for FastaWriterWrapper { + type Backend = + FastaWriter; +} pub struct FastaWriter { writer: Box, @@ -31,17 +39,17 @@ unsafe impl Sync { } -impl - FastaWriter +impl StructuredSequenceBackendInit + for FastaWriter { - pub fn new_compressed_gzip(path: impl AsRef, level: u32) -> Self { + fn new_compressed_gzip(path: impl AsRef, level: u32) -> Self { let compress_stream = GzEncoder::new( BufWriter::with_capacity(DEFAULT_OUTPUT_BUFFER_SIZE, File::create(&path).unwrap()), Compression::new(level), ); FastaWriter { - writer: Box::new(FastaWriterWrapper::new(BufWriter::with_capacity( + writer: Box::new(SequencesWriterWrapper::new(BufWriter::with_capacity( DEFAULT_OUTPUT_BUFFER_SIZE, compress_stream, ))), @@ -50,7 +58,7 @@ impl } } - pub fn new_compressed_lz4(path: impl AsRef, level: u32) -> Self { + 
fn new_compressed_lz4(path: impl AsRef, level: u32) -> Self { let compress_stream = lz4::EncoderBuilder::new() .level(level) .checksum(ContentChecksum::NoChecksum) @@ -63,7 +71,7 @@ impl .unwrap(); FastaWriter { - writer: Box::new(FastaWriterWrapper::new(BufWriter::with_capacity( + writer: Box::new(SequencesWriterWrapper::new(BufWriter::with_capacity( DEFAULT_OUTPUT_BUFFER_SIZE, compress_stream, ))), @@ -72,9 +80,9 @@ impl } } - pub fn new_plain(path: impl AsRef) -> Self { + fn new_plain(path: impl AsRef) -> Self { FastaWriter { - writer: Box::new(FastaWriterWrapper::new(BufWriter::with_capacity( + writer: Box::new(SequencesWriterWrapper::new(BufWriter::with_capacity( DEFAULT_OUTPUT_BUFFER_SIZE, File::create(&path).unwrap(), ))), diff --git a/crates/io/src/concurrent/structured_sequences/gfa.rs b/crates/io/src/concurrent/structured_sequences/gfa.rs new file mode 100644 index 0000000..a7ceb53 --- /dev/null +++ b/crates/io/src/concurrent/structured_sequences/gfa.rs @@ -0,0 +1,153 @@ +use crate::concurrent::structured_sequences::{IdentSequenceWriter, StructuredSequenceBackend}; +use config::{DEFAULT_OUTPUT_BUFFER_SIZE, DEFAULT_PER_CPU_BUFFER_SIZE}; +use dynamic_dispatch::dynamic_dispatch; +use flate2::write::GzEncoder; +use flate2::Compression; +use lz4::{BlockMode, BlockSize, ContentChecksum}; +use std::fs::File; +use std::io::{BufWriter, Write}; +use std::marker::PhantomData; +use std::path::{Path, PathBuf}; + +use super::stream_finish::SequencesWriterWrapper; +#[cfg(feature = "support_kmer_counters")] +use super::SequenceAbundance; +use super::{StructuredSequenceBackendInit, StructuredSequenceBackendWrapper}; + +pub struct GFAWriterWrapper; + +#[dynamic_dispatch] +impl StructuredSequenceBackendWrapper for GFAWriterWrapper { + type Backend = + GFAWriter; +} +pub struct GFAWriter { + writer: Box, + path: PathBuf, + _phantom: PhantomData<(ColorInfo, LinksInfo)>, +} + +unsafe impl Send + for GFAWriter +{ +} + +unsafe impl Sync + for GFAWriter +{ +} + +impl 
StructuredSequenceBackendInit + for GFAWriter +{ + fn new_compressed_gzip(path: impl AsRef, level: u32) -> Self { + let compress_stream = GzEncoder::new( + BufWriter::with_capacity(DEFAULT_OUTPUT_BUFFER_SIZE, File::create(&path).unwrap()), + Compression::new(level), + ); + + GFAWriter { + writer: Box::new(SequencesWriterWrapper::new(BufWriter::with_capacity( + DEFAULT_OUTPUT_BUFFER_SIZE, + compress_stream, + ))), + path: path.as_ref().to_path_buf(), + _phantom: PhantomData, + } + } + + fn new_compressed_lz4(path: impl AsRef, level: u32) -> Self { + let compress_stream = lz4::EncoderBuilder::new() + .level(level) + .checksum(ContentChecksum::NoChecksum) + .block_mode(BlockMode::Linked) + .block_size(BlockSize::Max1MB) + .build(BufWriter::with_capacity( + DEFAULT_OUTPUT_BUFFER_SIZE, + File::create(&path).unwrap(), + )) + .unwrap(); + + GFAWriter { + writer: Box::new(SequencesWriterWrapper::new(BufWriter::with_capacity( + DEFAULT_OUTPUT_BUFFER_SIZE, + compress_stream, + ))), + path: path.as_ref().to_path_buf(), + _phantom: PhantomData, + } + } + + fn new_plain(path: impl AsRef) -> Self { + GFAWriter { + writer: Box::new(SequencesWriterWrapper::new(BufWriter::with_capacity( + DEFAULT_OUTPUT_BUFFER_SIZE, + File::create(&path).unwrap(), + ))), + path: path.as_ref().to_path_buf(), + _phantom: PhantomData, + } + } +} + +impl + StructuredSequenceBackend for GFAWriter +{ + type SequenceTempBuffer = Vec; + + fn alloc_temp_buffer() -> Self::SequenceTempBuffer { + Vec::with_capacity(DEFAULT_PER_CPU_BUFFER_SIZE.as_bytes()) + } + + fn write_sequence( + k: usize, + buffer: &mut Self::SequenceTempBuffer, + sequence_index: u64, + sequence: &[u8], + + _color_info: ColorInfo, + links_info: LinksInfo, + extra_buffers: &(ColorInfo::TempBuffer, LinksInfo::TempBuffer), + + #[cfg(feature = "support_kmer_counters")] abundance: SequenceAbundance, + ) { + write!(buffer, "S\t{}\t", sequence_index).unwrap(); + buffer.extend_from_slice(sequence); + write!(buffer, "\tLN:i:{}",
sequence.len()).unwrap(); + + #[cfg(feature = "support_kmer_counters")] + { + write!(buffer, "\tKC:i:{}", abundance.sum).unwrap(); + write!( + buffer, + "\tkm:f:{:.1}", + abundance.sum as f64 / (sequence.len() - k + 1) as f64 + ) + .unwrap(); + } + + buffer.push(b'\n'); + + // color_info.write_as_ident(buffer, &extra_buffers.0); + links_info.write_as_gfa(k as u64, sequence_index, buffer, &extra_buffers.1); + } + + fn get_path(&self) -> PathBuf { + self.path.clone() + } + + fn flush_temp_buffer(&mut self, buffer: &mut Self::SequenceTempBuffer) { + self.writer.write_all(buffer).unwrap(); + buffer.clear(); + } + + fn finalize(self) {} +} + +impl Drop + for GFAWriter +{ + fn drop(&mut self) { + self.writer.flush().unwrap(); + } +} diff --git a/crates/io/src/concurrent/structured_sequences/fasta/stream_finish.rs b/crates/io/src/concurrent/structured_sequences/stream_finish.rs similarity index 66% rename from crates/io/src/concurrent/structured_sequences/fasta/stream_finish.rs rename to crates/io/src/concurrent/structured_sequences/stream_finish.rs index 885aa13..219f5f1 100644 --- a/crates/io/src/concurrent/structured_sequences/fasta/stream_finish.rs +++ b/crates/io/src/concurrent/structured_sequences/stream_finish.rs @@ -5,38 +5,38 @@ use std::{ io::{BufWriter, Write}, }; -pub(crate) trait FastaFileFinish: Write + Debug { +pub(crate) trait SequencesFileFinish: Write + Debug { fn finalize(self); } -impl FastaFileFinish for BufWriter { +impl SequencesFileFinish for BufWriter { fn finalize(self) { self.into_inner().unwrap().finalize(); } } -impl FastaFileFinish for File { +impl SequencesFileFinish for File { fn finalize(mut self) { self.flush().unwrap(); } } -impl FastaFileFinish for lz4::Encoder { +impl SequencesFileFinish for lz4::Encoder { fn finalize(self) { let (w, err) = self.finish(); err.unwrap(); w.finalize(); } } -impl FastaFileFinish for GzEncoder { +impl SequencesFileFinish for GzEncoder { fn finalize(self) { let w = self.finish().unwrap(); w.finalize(); } } 
-pub(crate) struct FastaWriterWrapper { +pub(crate) struct SequencesWriterWrapper { writer: Option, } -impl FastaWriterWrapper { +impl SequencesWriterWrapper { pub fn new(writer: W) -> Self { Self { writer: Some(writer), @@ -44,7 +44,7 @@ impl FastaWriterWrapper { } } -impl Write for FastaWriterWrapper { +impl Write for SequencesWriterWrapper { fn write(&mut self, buf: &[u8]) -> std::io::Result { unsafe { self.writer.as_mut().unwrap_unchecked() }.write(buf) } @@ -58,7 +58,7 @@ impl Write for FastaWriterWrapper { } } -impl Drop for FastaWriterWrapper { +impl Drop for SequencesWriterWrapper { fn drop(&mut self) { self.writer.take().unwrap().finalize(); }