From d7b7fd602b981dd6816b0e5c7c341e2cef272c17 Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Tue, 7 Sep 2021 15:31:34 +0200 Subject: Expose low-level, iterative LZ4 compression/decompression API --- libbutl/lz4.cxx | 468 +++++++++++++++++++++++++++++++++----------------------- libbutl/lz4.hxx | 138 ++++++++++++++++- 2 files changed, 409 insertions(+), 197 deletions(-) diff --git a/libbutl/lz4.cxx b/libbutl/lz4.cxx index 6a91a12..a28ed78 100644 --- a/libbutl/lz4.cxx +++ b/libbutl/lz4.cxx @@ -34,44 +34,6 @@ namespace butl { namespace lz4 { - struct cctx - { - LZ4F_cctx* ctx; - - operator LZ4F_cctx* () const {return ctx;}; - - cctx () - { - if (LZ4F_isError (LZ4F_createCompressionContext (&ctx, LZ4F_VERSION))) - throw bad_alloc (); - } - - ~cctx () - { - LZ4F_errorCode_t e (LZ4F_freeCompressionContext (ctx)); - assert (!LZ4F_isError (e)); - } - }; - - struct dctx - { - LZ4F_dctx* ctx; - - operator LZ4F_dctx* () const {return ctx;}; - - dctx () - { - if (LZ4F_isError (LZ4F_createDecompressionContext (&ctx, LZ4F_VERSION))) - throw bad_alloc (); - } - - ~dctx () - { - LZ4F_errorCode_t e (LZ4F_freeDecompressionContext (ctx)); - assert (!LZ4F_isError (e)); - } - }; - static inline size_t block_size (LZ4F_blockSizeID_t id) { @@ -132,161 +94,316 @@ namespace butl throw_exception (LZ4F_getErrorCode (r)); } - // Return the compressed size. + // compression // - uint64_t - compress (ofdstream& os, ifdstream& is, - int level, - int block_id, - optional content_size) + + compressor:: + ~compressor () + { + if (LZ4F_cctx* ctx = static_cast (ctx_)) + { + LZ4F_errorCode_t e (LZ4F_freeCompressionContext (ctx)); + assert (!LZ4F_isError (e)); + } + } + + inline void compressor:: + init_preferences (void* vp) const + { + LZ4F_preferences_t* p (static_cast (vp)); + + p->autoFlush = 1; + p->favorDecSpeed = 0; + p->compressionLevel = level_; + p->frameInfo.blockMode = LZ4F_blockLinked; + p->frameInfo.blockSizeID = static_cast (block_id_); + p->frameInfo.blockChecksumFlag = LZ4F_noBlockChecksum; + p->frameInfo.contentChecksumFlag = LZ4F_contentChecksumEnabled; + p->frameInfo.contentSize = content_size_ + ? static_cast (*content_size_) + : 0; + } + + void compressor:: + begin (int level, + int block_id, + optional content_size) { assert (block_id >= 4 && block_id <= 7); + level_ = level; + block_id_ = block_id; + content_size_ = content_size; + LZ4F_preferences_t prefs = LZ4F_INIT_PREFERENCES; - prefs.autoFlush = 1; - prefs.favorDecSpeed = 0; - prefs.compressionLevel = level; - prefs.frameInfo.blockMode = LZ4F_blockLinked; - prefs.frameInfo.blockSizeID = static_cast (block_id); - prefs.frameInfo.blockChecksumFlag = LZ4F_noBlockChecksum; - prefs.frameInfo.contentChecksumFlag = LZ4F_contentChecksumEnabled; - prefs.frameInfo.contentSize = - content_size ? static_cast (*content_size) : 0; + init_preferences (&prefs); // Input/output buffer capacities. // - size_t ic (block_size (prefs.frameInfo.blockSizeID)); - size_t oc; - - // Input/output buffers. + // To be binary compatible with the lz4 utility we have to compress + // files that fit into the block with a single *_compressFrame() call + // instead of *_compressBegin()/*_compressUpdate(). And to determine the + // output buffer capacity we must use *_compressFrameBound() instead of + // *_compressBound(). The problem is, at this stage (before filling the + // input buffer), we don't know which case it will be. // - unique_ptr ibg (new char[ic]); char* ib (ibg.get ()); - unique_ptr obg; char* ob; - - // Read into the input buffer returning the number of bytes read and - // updating the total read and the eof flag. + // However, in our case (autoFlush=1), *Bound() < *FrameBound() and so + // we can always use the latter at the cost of slight overhead. Also, + // using *FrameBound() allows us to call *Begin() and *Update() without + // flushing the buffer in between (this insight is based on studying the + // implementation of the *Bound() functions). // - // Note that we could try to do direct fd read/write but that would - // complicate things quite a bit (error handling, stream state, etc). + // Actually, we can use content_size (we can get away with much smaller + // buffers for small inputs). We just need to verify the caller is not + // lying to us (failed that, we may end up with strange error like + // insufficient output buffer space). // - uint64_t it (0); - bool eof (false); - auto read = [&is, ib, ic, &it, &eof] () -> size_t - { - eof = butl::eof (is.read (ib, ic)); - size_t n (static_cast (is.gcount ())); - it += n; - return n; - }; + ic = block_size (prefs.frameInfo.blockSizeID); - // Write the specified number of bytes from the output buffer updating - // the total written. - // - uint64_t ot (0); - auto write = [&os, &ob, &ot] (size_t n) + if (content_size_ && *content_size_ < ic) { - os.write (ob, static_cast (n)); - ot += n; - }; + // This is nuanced: we need to add an extra byte in order to detect + // EOF. + // + ic = static_cast (*content_size_) + 1; + } + + oc = LZ4F_compressFrameBound (ic, &prefs); + + begin_ = true; + } + + void compressor:: + next (bool end) + { + LZ4F_cctx* ctx; // Unlike the decompression case below, compression cannot fail due to // invalid content. So any LZ4F_*() function failure is either due to a // programming bug or argument inconsistencies (e.g., content size does // not match actual). - // To be binary compatible with the lz4 utility we have to compress - // files that fit into the block with a single LZ4F_compressFrame() - // call. - // - size_t in (read ()); - size_t on; - - if (eof && in < ic) // Should be really <= but that's not lz4-compatible. + if (begin_) { - oc = LZ4F_compressFrameBound (in, &prefs); - obg.reset ((ob = new char[oc])); - - on = LZ4F_compressFrame (ob, oc, ib, in, &prefs); - if (LZ4F_isError (on)) - throw_exception (on); + begin_ = false; - write (on); + LZ4F_preferences_t prefs = LZ4F_INIT_PREFERENCES; + init_preferences (&prefs); - // Verify specified and actual content sizes match. + // If we've allocated smaller buffers based on content_size_, then + // verify the input size matches what's promised. // - // LZ4F_compressFrame() does not fail if it doesn't match instead - // replacing it with the actual value. + // Note also that LZ4F_compressFrame() does not fail if it doesn't + // match instead replacing it with the actual value. // - if (content_size && *content_size != it) - throw_exception (LZ4F_ERROR_frameSize_wrong); - } - else - { - cctx ctx; - - oc = LZ4F_compressBound (ic, &prefs); - obg.reset ((ob = new char[oc])); - - // Write the header. - // - on = LZ4F_compressBegin (ctx, ob, oc, &prefs); - if (LZ4F_isError (on)) - throw_exception (on); - - write (on); + size_t bs (block_size (prefs.frameInfo.blockSizeID)); + if (content_size_ && *content_size_ < bs) + { + if (!end || in != *content_size_) + throw_exception (LZ4F_ERROR_frameSize_wrong); + } - // Keep compressing, writing, and reading chunks of content. + // Must be < for lz4 compatibility (see EOF nuance above for the + // likely reason). // - for (;;) + if (end && in < bs) { - on = LZ4F_compressUpdate (ctx, ob, oc, ib, in, nullptr); + on = LZ4F_compressFrame (ob, oc, ib, in, &prefs); if (LZ4F_isError (on)) throw_exception (on); - if (on != 0) // LZ4F_compressUpdate() may just buffer the data. - write (on); + return; + } + else + { + if (LZ4F_isError ( + LZ4F_createCompressionContext (&ctx, LZ4F_VERSION))) + throw bad_alloc (); + + ctx_ = ctx; - if (eof) - break; + // Write the header. + // + on = LZ4F_compressBegin (ctx, ob, oc, &prefs); + if (LZ4F_isError (on)) + throw_exception (on); - in = read (); + // Fall through. } + } + else + { + ctx = static_cast (ctx_); + on = 0; + } - // Write the end marker. - // + size_t n; + + n = LZ4F_compressUpdate (ctx, ob + on, oc - on, ib, in, nullptr); + if (LZ4F_isError (n)) + throw_exception (n); + + on += n; + + // Write the end marker. + // + if (end) + { // Note that this call also verifies specified and actual content // sizes match. // - on = LZ4F_compressEnd (ctx, ob, oc, nullptr); - if (LZ4F_isError (on)) - throw_exception (on); + n = LZ4F_compressEnd (ctx, ob + on, oc - on, nullptr); + if (LZ4F_isError (n)) + throw_exception (n); + + on += n; + } + } + + uint64_t + compress (ofdstream& os, ifdstream& is, + int level, + int block_id, + optional content_size) + { + compressor c; + + // Input/output buffer guards. + // + unique_ptr ibg; + unique_ptr obg; + + // First determine required buffer capacities. + // + c.begin (level, block_id, content_size); + + ibg.reset ((c.ib = new char[c.ic])); + obg.reset ((c.ob = new char[c.oc])); + + // Read into the input buffer updating the eof flag. + // + // Note that we could try to do direct fd read/write but that would + // complicate things quite a bit (error handling, stream state, etc). + // + bool eof (false); + auto read = [&is, &c, &eof] () + { + eof = butl::eof (is.read (c.ib, c.ic)); + c.in = static_cast (is.gcount ()); + }; + + // Write from the output buffer updating the total written. + // + uint64_t ot (0); + auto write = [&os, &c, &ot] () + { + os.write (c.ob, static_cast (c.on)); + ot += c.on; + }; + + // Keep reading, compressing, and writing chunks of content. + // + while (!eof) + { + read (); - write (on); + c.next (eof); + + if (c.on != 0) // next() may just buffer the data. + write (); } return ot; } - uint64_t - decompress (ofdstream& os, ifdstream& is) + // decompression + // + + static_assert (sizeof (decompressor::hb) == LZ4F_HEADER_SIZE_MAX, + "LZ4 header size mismatch"); + + decompressor:: + decompressor () + : hn (0), in (0), on (0) + { + LZ4F_dctx* ctx; + + if (LZ4F_isError (LZ4F_createDecompressionContext (&ctx, LZ4F_VERSION))) + throw bad_alloc (); + + ctx_ = ctx; + } + + decompressor:: + ~decompressor () + { + LZ4F_errorCode_t e ( + LZ4F_freeDecompressionContext (static_cast (ctx_))); + assert (!LZ4F_isError (e)); + } + + size_t decompressor:: + begin () + { + LZ4F_dctx* ctx (static_cast (ctx_)); + + LZ4F_frameInfo_t info = LZ4F_INIT_FRAMEINFO; + + // Input hint and end as signalled by the LZ4F_*() functions. + // + size_t h, e; + + h = LZ4F_getFrameInfo (ctx, &info, hb, &(e = hn)); + if (LZ4F_isError (h)) + throw_exception (h); + + // Use the block size for the output buffer capacity and compressed + // bound plus the header size for the input. The expectation is that + // LZ4F_decompress() should never hint for more than that. + // + oc = block_size (info.blockSizeID); + ic = LZ4F_compressBound (oc, nullptr) + LZ4F_BLOCK_HEADER_SIZE; + + assert (h <= ic); + + // Move over whatever is left in the header buffer to be beginning. + // + hn -= e; + memmove (hb, hb + e, hn); + + return h; + } + + size_t decompressor:: + next () { - // The LZ4F_*() decompression functions return a hint of how much data - // they want on the next call. So the plan is to allocate the input - // buffer large enough to hold anything that can be asked for and then - // fill it in in the asked chunks. This way we avoid having to shift the - // unread data, etc. + LZ4F_dctx* ctx (static_cast (ctx_)); + + size_t h, e; + + // Note that LZ4F_decompress() verifies specified and actual content + // sizes match (similar to compression). // - dctx ctx; + h = LZ4F_decompress (ctx, ob, &(on = oc), ib, &(e = in), nullptr); + if (LZ4F_isError (h)) + throw_exception (h); - // Input/output buffer capacities and sizes. + // We expect LZ4F_decompress() to consume what it asked for. // - size_t ic, oc; - size_t in, on; + assert (e == in && h <= ic); + + return h; + } - // Input/output buffers. + uint64_t + decompress (ofdstream& os, ifdstream& is) + { + decompressor d; + + // Input/output buffer guards. // - unique_ptr ibg; char* ib; - unique_ptr obg; char* ob; + unique_ptr ibg; + unique_ptr obg; // Read into the specified buffer returning the number of bytes read and // updating the eof flag. @@ -309,73 +426,46 @@ namespace butl // the total written. // uint64_t ot (0); - auto write = [&os, &ob, &ot] (size_t n) + auto write = [&os, &ot] (char* b, size_t n) { - os.write (ob, static_cast (n)); + os.write (b, static_cast (n)); ot += n; }; - // Input hint and end as signalled by the LZ4F_*() functions. - // - size_t ih, ie; + size_t h; // Input hint. - // Read the header. + // First read in the header and allocate the buffers. // - LZ4F_frameInfo_t info = LZ4F_INIT_FRAMEINFO; - { - char hb[LZ4F_HEADER_SIZE_MAX]; - in = read (hb, sizeof (hb)); + d.hn = read (d.hb, sizeof (d.hb)); + h = d.begin (); - ih = LZ4F_getFrameInfo (ctx, &info, hb, &(ie = in)); - if (LZ4F_isError (ih)) - throw_exception (ih); + ibg.reset ((d.ib = new char[d.ic])); + obg.reset ((d.ob = new char[d.oc])); - // Use the block size for the output buffer capacity and compressed - // bound plus the header size for the input. The expectation is that - // LZ4F_decompress() should never hint for more than that. - // - oc = block_size (info.blockSizeID); - ic = LZ4F_compressBound (oc, nullptr) + LZ4F_BLOCK_HEADER_SIZE; - - assert (ih <= ic); - - ibg.reset ((ib = new char[ic])); - obg.reset ((ob = new char[oc])); + // Copy over whatever is left in the header buffer and read up to + // the hinted size. + // + memcpy (d.ib, d.hb, (d.in = d.hn)); - // Copy over whatever is left in the header buffer and read up to - // the hinted size. - // - in -= ie; - memcpy (ib, hb + ie, in); - in += read (ib + in, ih - in); - } + if (h > d.in) + d.in += read (d.ib + d.in, h - d.in); // Keep decompressing, writing, and reading chunks of compressed // content. // - // Note that LZ4F_decompress() verifies specified and actual content - // sizes match (similar to compression). - // for (;;) { - ih = LZ4F_decompress (ctx, ob, &(on = oc), ib, &(ie = in), nullptr); - if (LZ4F_isError (ih)) - throw_exception (ih); - - // We expect LZ4F_decompress() to consume what it asked for. - // - assert (ie == in); + h = d.next (); - write (on); + write (d.ob, d.on); - if (ih == 0) + if (h == 0) break; if (eof) throw invalid_argument ("incomplete compressed content"); - assert (ih <= ic); - in = read (ib, ih); + d.in = read (d.ib, h); } return ot; diff --git a/libbutl/lz4.hxx b/libbutl/lz4.hxx index bf679c5..98175c1 100644 --- a/libbutl/lz4.hxx +++ b/libbutl/lz4.hxx @@ -4,6 +4,7 @@ #pragma once #include +#include #include #include @@ -14,16 +15,14 @@ namespace butl { namespace lz4 { - //@@ TODO: allow (re-)using external buffers, contexts? - // Read the content from the input stream, compress it using the specified // compression level and block size, and write the compressed content to // the output stream. If content size is specified, then include it into // the compressed content header. Return the compressed content size. // // This function may throw std::bad_alloc as well as exceptions thrown by - // fdstream read/write functions. It may also throw invalid_argument in - // case of argument inconsistencies (e.g., content size does not match + // fdstream read/write functions. It may also throw std::invalid_argument + // in case of argument inconsistencies (e.g., content size does not match // actual) with what() returning the error description. The input stream // is expected to throw on badbit (but not failbit). The output stream is // expected to throw on badbit or failbit. @@ -52,14 +51,75 @@ namespace butl int block_size_id, optional content_size); + // Low-level iterative compression API. + // + // This API may throw std::bad_alloc in case of memory allocation errors + // and std::invalid_argument in case of argument inconsistencies (e.g., + // content size does not match actual) with what() returning the error + // description. + // + // See the implementation of the compress() function above for usage + // example. + // + // @@ TODO: reset support. + // + struct LIBBUTL_SYMEXPORT compressor + { + // Buffer, current size (part filled with data), and capacity. + // + char* ib; std::size_t in, ic; // Input. + char* ob; std::size_t on, oc; // Output. + + // As a first step call begin(). This function sets the required input + // and output buffer capacities (ic, oc). + // + // The caller normally allocates the input and output buffers and fills + // the input buffer. + // + void + begin (int compression_level, + int block_size_id, + optional content_size); + + // Then call next() to compress the next chunk of input passing true on + // reaching EOF. Note that the input buffer should be filled to capacity + // unless end is true and the output buffer must be flushed before each + // subsequent call to next(). + // + void + next (bool end); + + // Not copyable or movable. + // + compressor (const compressor&) = delete; + compressor (compressor&&) = delete; + compressor& operator= (const compressor&) = delete; + compressor& operator= (compressor&&) = delete; + + // Implementation details. + // + compressor (): ctx_ (nullptr) {} + ~compressor (); + + public: + void + init_preferences (void*) const; + + void* ctx_; + int level_; + int block_id_; + optional content_size_; + bool begin_; + }; + // Read the compressed content from the input stream, decompress it, and // write the decompressed content to the output stream. Return the // decompressed content size. // // This function may throw std::bad_alloc as well as exceptions thrown by - // fdstream read/write functions. It may also throw invalid_argument if - // the compressed content is invalid with what() returning the error + // fdstream read/write functions. It may also throw std::invalid_argument + // if the compressed content is invalid with what() returning the error // description. The input stream is expected to throw on badbit (but not // failbit). The output stream is expected to throw on badbit or failbit. // @@ -67,10 +127,72 @@ namespace butl // mode. // // Note that this function does not require the input stream to reach EOF - // at the end of compressed content. So if you have this a requirement, - // you will need to enforce it yourself. + // at the end of compressed content. So if you have this requirement, you + // will need to enforce it yourself. // LIBBUTL_SYMEXPORT std::uint64_t decompress (ofdstream&, ifdstream&); + + // Low-level iterative decompression API. + // + // This API may throw std::bad_alloc in case of memory allocation errors + // and std::invalid_argument if the compressed content is invalid with + // what() returning the error description. + // + // See the implementation of the decompress() function above for usage + // example. + // + // The LZ4F_*() decompression functions return a hint of how much data + // they want on the next call. So the plan is to allocate the input + // buffer large enough to hold anything that can be asked for and then + // fill it in in the asked chunks. This way we avoid having to shift the + // unread data around. + // + // @@ TODO: reset support. + // + struct LIBBUTL_SYMEXPORT decompressor + { + // Buffer, current size (part filled with data), and capacity. + // + char hb[19]; std::size_t hn ; // Header. + char* ib; std::size_t in, ic; // Input. + char* ob; std::size_t on, oc; // Output. + + // As a first step, fill in the header buffer and call begin(). This + // function sets the required input and output buffer capacities (ic, + // oc) and the number of bytes left in the header buffer (hn) and + // returns the number of bytes expected by the following call to next(). + // + // The caller normally allocates the input and output buffers, copies + // remaining header buffer data over to the input buffer, and then fills + // in the remainder of the input buffer up to what's expected by the + // call to next(). + // + std::size_t + begin (); + + // Then call next() to decompress the next chunk of input. This function + // returns the number of bytes expected by the following call to next() + // or 0 if no further input is expected. Note that the output buffer + // must be flushed before each subsequent call to next(). + // + std::size_t + next (); + + // Not copyable or movable. + // + decompressor (const decompressor&) = delete; + decompressor (decompressor&&) = delete; + decompressor& operator= (const decompressor&) = delete; + decompressor& operator= (decompressor&&) = delete; + + // Implementation details. + // + decompressor (); + ~decompressor (); + + public: + void* ctx_; + }; } } -- cgit v1.1