diff --git a/v2/app/BlockCompressor.go b/v2/app/BlockCompressor.go
index 960a2ec4..2e9f6370 100644
--- a/v2/app/BlockCompressor.go
+++ b/v2/app/BlockCompressor.go
@@ -798,25 +798,29 @@ func (this *fileCompressTask) call() (int, uint64, uint64) {
 	before := time.Now()
 
 	for {
-		if length, err := input.Read(buffer); err != nil {
+		var length int
+
+		if length, err = input.Read(buffer); err != nil {
 			fmt.Printf("Failed to read block from file '%s': %v\n", inputName, err)
 			return kanzi.ERR_READ_FILE, read, cos.GetWritten()
 		}
 
-		if length <= 0 {
-			break
-		}
+		if length > 0 {
+			read += uint64(length)
 
-		read += uint64(length)
+			if _, err = cos.Write(buffer[0:length]); err != nil {
+				if ioerr, isIOErr := err.(kio.IOError); isIOErr == true {
+					fmt.Printf("%s\n", ioerr.Error())
+					return ioerr.ErrorCode(), read, cos.GetWritten()
+				}
 
-		if _, err = cos.Write(buffer[0:length]); err != nil {
-			if ioerr, isIOErr := err.(kio.IOError); isIOErr == true {
-				fmt.Printf("%s\n", ioerr.Error())
-				return ioerr.ErrorCode(), read, cos.GetWritten()
+				fmt.Printf("An unexpected condition happened. Exiting ...\n%v\n", err.Error())
+				return kanzi.ERR_PROCESS_BLOCK, read, cos.GetWritten()
 			}
+		}
 
-			fmt.Printf("An unexpected condition happened. Exiting ...\n%v\n", err.Error())
-			return kanzi.ERR_PROCESS_BLOCK, read, cos.GetWritten()
+		if length < len(buffer) {
+			break
 		}
 	}
 
diff --git a/v2/app/BlockDecompressor.go b/v2/app/BlockDecompressor.go
index ba33bfa4..d8c8a3c2 100644
--- a/v2/app/BlockDecompressor.go
+++ b/v2/app/BlockDecompressor.go
@@ -540,7 +540,6 @@ func (this *fileDecompressTask) call() (int, uint64) {
 	}()
 
 	// Decode
-	read := int64(0)
 	log.Println("\nDecompressing "+inputName+" ...", verbosity > 1)
 	log.Println("", verbosity > 3)
 	var input io.ReadCloser
@@ -557,7 +556,7 @@
 	if input, err = os.Open(inputName); err != nil {
 		fmt.Printf("Cannot open input file '%s': %v\n", inputName, err)
-		return kanzi.ERR_OPEN_FILE, uint64(read)
+		return kanzi.ERR_OPEN_FILE, 0
 	}
 
 	defer func() {
@@ -570,11 +569,11 @@
 	if err != nil {
 		if err.(*kio.IOError) != nil {
 			fmt.Printf("%s\n", err.(*kio.IOError).Message())
-			return err.(*kio.IOError).ErrorCode(), uint64(read)
+			return err.(*kio.IOError).ErrorCode(), 0
 		}
 
 		fmt.Printf("Cannot create compressed stream: %v\n", err)
-		return kanzi.ERR_CREATE_DECOMPRESSOR, uint64(read)
+		return kanzi.ERR_CREATE_DECOMPRESSOR, 0
 	}
 
 	for _, bl := range this.listeners {
@@ -582,15 +581,17 @@
 	}
 
 	buffer := make([]byte, _DECOMP_DEFAULT_BUFFER_SIZE)
-	written := uint64(0)
+	decoded := int64(0)
 	before := time.Now()
 
 	// Decode next block
 	for {
-		if decoded, err := cis.Read(buffer); err != nil {
+		var decodedBlock int
+
+		if decodedBlock, err = cis.Read(buffer); err != nil {
 			if ioerr, isIOErr := err.(*kio.IOError); isIOErr == true {
 				fmt.Printf("%s\n", ioerr.Message())
-				return ioerr.ErrorCode(), uint64(read)
+				return ioerr.ErrorCode(), uint64(decoded)
 			}
 
 			if errors.Is(err, io.EOF) == false {
@@ -598,23 +599,22 @@
 				// Because Copy is defined to read from src until EOF, it does not
 				// treat EOF from Read an an error to be reported)
 				fmt.Printf("An unexpected condition happened. Exiting ...\n%v\n", err)
-				return kanzi.ERR_PROCESS_BLOCK, uint64(read)
+				return kanzi.ERR_PROCESS_BLOCK, uint64(decoded)
 			}
 		}
 
-		if decoded > 0 {
-			w, err = output.Write(buffer[0:decoded])
+		if decodedBlock > 0 {
+			_, err := output.Write(buffer[0:decodedBlock])
 
 			if err != nil {
 				fmt.Printf("Failed to write decompressed block to file '%s': %v\n", outputName, err)
-				return kanzi.ERR_WRITE_FILE, uint64(read)
+				return kanzi.ERR_WRITE_FILE, uint64(decoded)
 			}
 
-			read += int64(decoded)
-			written += uint64(w)
+			decoded += int64(decodedBlock)
 		}
 
-		if decoded != len(buffer) {
+		if decodedBlock != len(buffer) {
 			break
 		}
 	}
@@ -623,7 +623,7 @@
 	// Deferred close is fallback for error paths
 	if err := cis.Close(); err != nil {
 		fmt.Printf("%v\n", err)
-		return kanzi.ERR_PROCESS_BLOCK, uint64(read)
+		return kanzi.ERR_PROCESS_BLOCK, uint64(decoded)
 	}
 
 	after := time.Now()
@@ -631,16 +631,16 @@
 	// If the whole input stream has been decoded and the original data size is present,
 	// check that the output size match the original data size.
-	to, hasTo := this.ctx["to"]
-	from, hasFrom := this.ctx["from"]
+	_, hasTo := this.ctx["to"]
+	_, hasFrom := this.ctx["from"]
 
 	if hasTo == false && hasFrom == false {
 		if osz, hasKey := this.ctx["outputSize"]; hasKey == true {
-			outputSize := osz.(uint64)
+			outputSize := osz.(int64)
 
-			if outputSize != 0 && written != outputSize {
-				fmt.Printf("Corrupted bitstream: invalid output size (expected %d, got %d)\n", written, outputSize)
-				return kanzi.ERR_INVALID_FILE, uint64(read)
+			if outputSize != 0 && decoded != outputSize {
+				fmt.Printf("Corrupted bitstream: invalid output size (expected %d, got %d)\n", decoded, outputSize)
+				return kanzi.ERR_INVALID_FILE, uint64(decoded)
 			}
 		}
 	}
@@ -659,17 +659,17 @@
 		log.Println(msg, true)
 		msg = fmt.Sprintf("Input size: %d", cis.GetRead())
 		log.Println(msg, true)
-		msg = fmt.Sprintf("Output size: %d", read)
+		msg = fmt.Sprintf("Output size: %d", decoded)
 		log.Println(msg, true)
 	}
 
 	if verbosity == 1 {
-		msg = fmt.Sprintf("Decompressing %s: %d => %d in %s", inputName, cis.GetRead(), read, msg)
+		msg = fmt.Sprintf("Decompressing %s: %d => %d in %s", inputName, cis.GetRead(), decoded, msg)
 		log.Println(msg, true)
 	}
 
 	if verbosity > 1 && delta > 0 {
-		msg = fmt.Sprintf("Throughput (KB/s): %d", ((read*int64(1000))>>10)/delta)
+		msg = fmt.Sprintf("Throughput (KB/s): %d", ((decoded*int64(1000))>>10)/delta)
 		log.Println(msg, true)
 	}
 
@@ -681,5 +681,5 @@
 		notifyBDListeners(this.listeners, evt)
 	}
 
-	return 0, uint64(read)
+	return 0, uint64(decoded)
 }
diff --git a/v2/io/CompressedStream.go b/v2/io/CompressedStream.go
index fa5082b3..8adb94b2 100644
--- a/v2/io/CompressedStream.go
+++ b/v2/io/CompressedStream.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"io"
 	"runtime"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -41,7 +42,7 @@
 const (
 	_BITSTREAM_TYPE             = 0x4B414E5A // "KANZ"
-	_BITSTREAM_FORMAT_VERSION   = 4
+	_BITSTREAM_FORMAT_VERSION   = 5
 	_STREAM_DEFAULT_BUFFER_SIZE = 256 * 1024
 	_EXTRA_BUFFER_SIZE          = 512
 	_COPY_BLOCK_MASK            = 0x80
@@ -51,7 +52,6 @@
 	_SMALL_BLOCK_SIZE           = 15
 	_MAX_CONCURRENCY            = 64
 	_CANCEL_TASKS_ID            = -1
-	_UNKNOWN_NB_BLOCKS          = 65536
 )
 
 // IOError an extended error containing a message and a code value
@@ -90,6 +90,7 @@ type Writer struct {
 	buffers       []blockBuffer
 	entropyType   uint32
 	transformType uint64
+	inputSize     int64
 	obs           kanzi.OutputBitStream
 	initialized   int32
 	closed        int32
@@ -213,7 +214,7 @@ func createWriterWithCtx(obs kanzi.OutputBitStream, ctx map[string]any) (*Writer
 	this.blockSize = int(bSize)
 	this.available = 0
-	nbBlocks := _UNKNOWN_NB_BLOCKS
+	nbBlocks := 0
 
 	// If input size has been provided, calculate the number of blocks
 	// in the input data else use 0. A value of 63 means '63 or more blocks'.
 	// This value is written to the bitstream header to let the decompressor make
@@ -221,17 +222,11 @@
 	// better decisions about memory usage and job allocation in concurrent
 	// decompression scenario.
 	if val, hasKey := ctx["fileSize"]; hasKey {
-		fileSize := val.(int64)
-		nbBlocks = int((fileSize + int64(bSize-1)) / int64(bSize))
+		this.inputSize = val.(int64)
+		nbBlocks = int((this.inputSize + int64(bSize-1)) / int64(bSize))
 	}
 
-	if nbBlocks >= _MAX_CONCURRENCY {
-		this.nbInputBlocks = _MAX_CONCURRENCY - 1
-	} else if nbBlocks == 0 {
-		this.nbInputBlocks = 1
-	} else {
-		this.nbInputBlocks = nbBlocks
-	}
+	this.nbInputBlocks = max(min(nbBlocks, _MAX_CONCURRENCY-1), 1)
 
 	if checksum := ctx["checksum"].(bool); checksum == true {
 		var err error
@@ -331,20 +326,42 @@ func (this *Writer) writeHeader() *IOError {
 		return &IOError{msg: "Cannot write block size to header", code: kanzi.ERR_WRITE_FILE}
 	}
 
-	if this.obs.WriteBits(uint64(this.nbInputBlocks&(_MAX_CONCURRENCY-1)), 6) != 6 {
-		return &IOError{msg: "Cannot write number of blocks to header", code: kanzi.ERR_WRITE_FILE}
+	// this.inputSize not provided or >= 2^48 -> 0, <2^16 -> 1, <2^32 -> 2, <2^48 -> 3
+	szMask := uint(0)
+
+	if this.inputSize != 0 && this.inputSize < (int64(1)<<48) {
+		if this.inputSize >= int64(1)<<32 {
+			szMask = 3
+		} else {
+			szMask = uint(internal.Log2NoCheck(uint32(this.inputSize))>>4) + 1
+		}
+	}
+
+	if this.obs.WriteBits(uint64(szMask), 2) != 2 {
+		return &IOError{msg: "Cannot write size of input to header", code: kanzi.ERR_WRITE_FILE}
+	}
+
+	if szMask > 0 {
+		if this.obs.WriteBits(uint64(this.inputSize), 16*szMask) != 16*szMask {
+			return &IOError{msg: "Cannot write size of input to header", code: kanzi.ERR_WRITE_FILE}
+		}
 	}
 
 	HASH := uint32(0x1E35A7BD)
 	cksum = HASH * _BITSTREAM_FORMAT_VERSION
-	cksum ^= (HASH * uint32(this.entropyType))
-	cksum ^= (HASH * uint32(this.transformType>>32))
-	cksum ^= (HASH * uint32(this.transformType))
-	cksum ^= (HASH * uint32(this.blockSize))
-	cksum ^= (HASH * uint32(this.nbInputBlocks))
+	cksum ^= (HASH * uint32(^this.entropyType))
+	cksum ^= (HASH * uint32((^this.transformType)>>32))
+	cksum ^= (HASH * uint32(^this.transformType))
+	cksum ^= (HASH * uint32(^this.blockSize))
+
+	if szMask > 0 {
+		cksum ^= (HASH * uint32((^this.inputSize)>>32))
+		cksum ^= (HASH * uint32(^this.inputSize))
+	}
+
 	cksum = (cksum >> 23) ^ (cksum >> 3)
 
-	if this.obs.WriteBits(uint64(cksum), 4) != 4 {
+	if this.obs.WriteBits(uint64(cksum), 16) != 16 {
 		return &IOError{msg: "Cannot write checksum to header", code: kanzi.ERR_WRITE_FILE}
 	}
 
@@ -811,6 +828,7 @@ type Reader struct {
 	buffers       []blockBuffer
 	entropyType   uint32
 	transformType uint64
+	outputSize    int64
 	ibs           kanzi.InputBitStream
 	initialized   int32
 	closed        int32
@@ -822,6 +840,7 @@
 	nbInputBlocks int
 	listeners     []kanzi.Listener
 	ctx           map[string]any
+	parentCtx     *map[string]any
 	headless      bool
 }
@@ -893,6 +912,7 @@
 	this.blockID = 0
 	this.consumed = 0
 	this.available = 0
+	this.outputSize = 0
 	this.bufferThreshold = 0
 	this.buffers = make([]blockBuffer, 2*this.jobs)
@@ -902,6 +922,7 @@ func createReaderWithCtx(ibs kanzi.InputBitStream, ctx map[string]any) (*Reader,
 	this.listeners = make([]kanzi.Listener, 0)
 	this.ctx = ctx
+	this.parentCtx = &ctx
 	this.blockSize = 0
 	this.entropyType = entropy.NONE_TYPE
 	this.transformType = transform.NONE_TYPE
@@ -979,6 +1000,17 @@ func (this *Reader) validateHeaderless() error {
 		}
 	}
 
+	if s, hasKey := this.ctx["outputSize"]; hasKey {
+		this.outputSize = s.(int64)
+
+		if this.outputSize < 0 || this.outputSize >= (1<<48) {
+			this.outputSize = 0 // 'not provided'
+		}
+
+		nbBlocks := int((this.outputSize + int64(this.blockSize-1)) / int64(this.blockSize))
+		this.nbInputBlocks = max(min(nbBlocks, _MAX_CONCURRENCY-1), 1)
+	}
+
 	return nil
 }
@@ -1078,20 +1110,54 @@
 	this.ctx["blockSize"] = uint(this.blockSize)
 	this.bufferThreshold = this.blockSize
 
-	// Read number of blocks in input. 0 means 'unknown' and 63 means 63 or more.
-	this.nbInputBlocks = int(this.ibs.ReadBits(6))
+	if bsVersion >= 5 {
+		// Read original size
+		// 0 -> not provided, <2^16 -> 1, <2^32 -> 2, <2^48 -> 3
+		szMask := uint(this.ibs.ReadBits(2))
 
-	if this.nbInputBlocks == 0 {
-		this.nbInputBlocks = _UNKNOWN_NB_BLOCKS
-	}
+		if szMask != 0 {
+			this.outputSize = int64(this.ibs.ReadBits(16 * szMask))
 
-	// Read checksum
-	cksum1 := uint32(this.ibs.ReadBits(4))
+			if this.parentCtx != nil {
+				(*this.parentCtx)["outputSize"] = this.outputSize
+			}
 
-	if bsVersion >= 3 {
-		// Verify checksum from bitstream version 3
+			nbBlocks := int((this.outputSize + int64(this.blockSize-1)) / int64(this.blockSize))
+			this.nbInputBlocks = max(min(nbBlocks, _MAX_CONCURRENCY-1), 1)
+		}
+
+		// Read and verify checksum
+		cksum1 := uint32(this.ibs.ReadBits(16))
+		var cksum2 uint32
 		HASH := uint32(0x1E35A7BD)
+		cksum2 = HASH * uint32(bsVersion)
+		cksum2 ^= (HASH * uint32(^this.entropyType))
+		cksum2 ^= (HASH * uint32((^this.transformType)>>32))
+		cksum2 ^= (HASH * uint32(^this.transformType))
+		cksum2 ^= (HASH * uint32(^this.blockSize))
+
+		if szMask > 0 {
+			cksum2 ^= (HASH * uint32((^this.outputSize)>>32))
+			cksum2 ^= (HASH * uint32(^this.outputSize))
+		}
+
+		cksum2 = (cksum2 >> 23) ^ (cksum2 >> 3)
+
+		if cksum1 != (cksum2 & 0xFFFF) {
+			return &IOError{msg: "Invalid bitstream: corrupted header", code: kanzi.ERR_INVALID_FILE}
+		}
+	} else if bsVersion >= 3 {
+		// Read number of blocks in input. 0 means 'unknown' and 63 means 63 or more.
+		this.nbInputBlocks = int(this.ibs.ReadBits(6))
+
+		if this.nbInputBlocks == 0 {
+			this.nbInputBlocks = 65536
+		}
+
+		// Read and verify checksum
+		cksum1 := uint32(this.ibs.ReadBits(4))
 		var cksum2 uint32
+		HASH := uint32(0x1E35A7BD)
 		cksum2 = HASH * uint32(bsVersion)
 		cksum2 ^= (HASH * uint32(this.entropyType))
 		cksum2 ^= (HASH * uint32(this.transformType>>32))
 		cksum2 ^= (HASH * uint32(this.transformType))
@@ -1103,27 +1169,32 @@
 		if cksum1 != (cksum2 & 0x0F) {
 			return &IOError{msg: "Invalid bitstream: corrupted header", code: kanzi.ERR_INVALID_FILE}
 		}
+	} else {
+		// Header prior to version 3
+		this.ibs.ReadBits(6) // nbInputs
+		this.ibs.ReadBits(4) // reserved
 	}
 
 	if len(this.listeners) > 0 {
-		msg := ""
-		msg += fmt.Sprintf("Checksum set to %v\n", this.hasher != nil)
-		msg += fmt.Sprintf("Block size set to %d bytes\n", this.blockSize)
+		var sb strings.Builder
+		sb.WriteString(fmt.Sprintf("Bitstream version: %d\n", bsVersion))
+		sb.WriteString(fmt.Sprintf("Checksum: %v\n", this.hasher != nil))
+		sb.WriteString(fmt.Sprintf("Block size: %d bytes\n", this.blockSize))
 
 		w1, _ := entropy.GetName(this.entropyType)
 
 		if w1 == "NONE" {
 			w1 = "no"
 		}
 
-		msg += fmt.Sprintf("Using %v entropy codec (stage 1)\n", w1)
+		sb.WriteString(fmt.Sprintf("Using %v entropy codec (stage 1)\n", w1))
 		w2, _ := transform.GetName(this.transformType)
 
 		if w2 == "NONE" {
 			w2 = "no"
 		}
 
-		msg += fmt.Sprintf("Using %v transform (stage 2)\n", w2)
-		evt := kanzi.NewEventFromString(kanzi.EVT_AFTER_HEADER_DECODING, 0, msg, time.Now())
+		sb.WriteString(fmt.Sprintf("Using %v transform (stage 2)\n", w2))
+		evt := kanzi.NewEventFromString(kanzi.EVT_AFTER_HEADER_DECODING, 0, sb.String(), time.Now())
 		notifyListeners(this.listeners, evt)
 	}
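
Note: below is a minimal standalone Go sketch of the version 5 header logic introduced by this patch, useful for checking the field layout without wiring up the full writer and reader. The helper names (headerSizeMask, headerChecksum) are hypothetical, math/bits.Len32 stands in for the library's internal.Log2NoCheck, and the values in main are arbitrary; this is an illustration of the format as written in the hunks above, not part of the kanzi API.

package main

import (
	"fmt"
	"math/bits"
)

// headerSizeMask mirrors the szMask logic added in writeHeader:
// 0 -> size not provided or >= 2^48, 1 -> size < 2^16, 2 -> size < 2^32, 3 -> size < 2^48.
// The original input size is then written on 16*mask bits.
func headerSizeMask(inputSize int64) uint {
	if inputSize <= 0 || inputSize >= int64(1)<<48 {
		return 0
	}

	if inputSize >= int64(1)<<32 {
		return 3
	}

	// bits.Len32(x)-1 is floor(log2(x)), standing in for internal.Log2NoCheck.
	return uint(bits.Len32(uint32(inputSize))-1)>>4 + 1
}

// headerChecksum reproduces the 16-bit header checksum of format version 5:
// the complemented entropy, transform and block-size fields (plus the input
// size when a mask is present) are mixed with the 0x1E35A7BD constant,
// folded, and truncated to 16 bits.
func headerChecksum(bsVersion, entropyType uint32, transformType uint64, blockSize uint32, szMask uint, inputSize int64) uint32 {
	const HASH uint32 = 0x1E35A7BD
	cksum := HASH * bsVersion
	cksum ^= HASH * ^entropyType
	cksum ^= HASH * uint32((^transformType)>>32)
	cksum ^= HASH * uint32(^transformType)
	cksum ^= HASH * ^blockSize

	if szMask > 0 {
		cksum ^= HASH * uint32((^inputSize)>>32)
		cksum ^= HASH * uint32(^inputSize)
	}

	cksum = (cksum >> 23) ^ (cksum >> 3)
	return cksum & 0xFFFF
}

func main() {
	size := int64(1 << 20) // a 1 MiB input fits in 32 bits -> mask 2, size written on 32 bits
	mask := headerSizeMask(size)
	fmt.Println("size mask:", mask, "-> size bits:", 16*mask)
	fmt.Printf("header checksum: 0x%04X\n", headerChecksum(5, 1, 0x30, 4*1024*1024, mask, size))
}

As the hunks above show, the 2-bit size mask plus optional 16/32/48-bit original size replaces the old 6-bit block-count field, the header checksum widens from 4 to 16 bits and now covers the complemented field values, and the decompressor uses the recovered outputSize both to pick its job count and to validate the decoded length.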