aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBoris Kolpackov <boris@codesynthesis.com>2021-09-07 15:31:34 +0200
committerBoris Kolpackov <boris@codesynthesis.com>2021-09-08 06:33:02 +0200
commitd7b7fd602b981dd6816b0e5c7c341e2cef272c17 (patch)
tree39da5c061afc879f629ac14f60f29dbce580e822
parent49fb295bdfcc0e50498a592015d2131a5682d60f (diff)
Expose low-level, iterative LZ4 compression/decompression API
-rw-r--r--libbutl/lz4.cxx468
-rw-r--r--libbutl/lz4.hxx138
2 files changed, 409 insertions, 197 deletions
diff --git a/libbutl/lz4.cxx b/libbutl/lz4.cxx
index 6a91a12..a28ed78 100644
--- a/libbutl/lz4.cxx
+++ b/libbutl/lz4.cxx
@@ -34,44 +34,6 @@ namespace butl
{
namespace lz4
{
- struct cctx
- {
- LZ4F_cctx* ctx;
-
- operator LZ4F_cctx* () const {return ctx;};
-
- cctx ()
- {
- if (LZ4F_isError (LZ4F_createCompressionContext (&ctx, LZ4F_VERSION)))
- throw bad_alloc ();
- }
-
- ~cctx ()
- {
- LZ4F_errorCode_t e (LZ4F_freeCompressionContext (ctx));
- assert (!LZ4F_isError (e));
- }
- };
-
- struct dctx
- {
- LZ4F_dctx* ctx;
-
- operator LZ4F_dctx* () const {return ctx;};
-
- dctx ()
- {
- if (LZ4F_isError (LZ4F_createDecompressionContext (&ctx, LZ4F_VERSION)))
- throw bad_alloc ();
- }
-
- ~dctx ()
- {
- LZ4F_errorCode_t e (LZ4F_freeDecompressionContext (ctx));
- assert (!LZ4F_isError (e));
- }
- };
-
static inline size_t
block_size (LZ4F_blockSizeID_t id)
{
@@ -132,161 +94,316 @@ namespace butl
throw_exception (LZ4F_getErrorCode (r));
}
- // Return the compressed size.
+ // compression
//
- uint64_t
- compress (ofdstream& os, ifdstream& is,
- int level,
- int block_id,
- optional<uint64_t> content_size)
+
+ compressor::
+ ~compressor ()
+ {
+ if (LZ4F_cctx* ctx = static_cast<LZ4F_cctx*> (ctx_))
+ {
+ LZ4F_errorCode_t e (LZ4F_freeCompressionContext (ctx));
+ assert (!LZ4F_isError (e));
+ }
+ }
+
+ inline void compressor::
+ init_preferences (void* vp) const
+ {
+ LZ4F_preferences_t* p (static_cast<LZ4F_preferences_t*> (vp));
+
+ p->autoFlush = 1;
+ p->favorDecSpeed = 0;
+ p->compressionLevel = level_;
+ p->frameInfo.blockMode = LZ4F_blockLinked;
+ p->frameInfo.blockSizeID = static_cast<LZ4F_blockSizeID_t> (block_id_);
+ p->frameInfo.blockChecksumFlag = LZ4F_noBlockChecksum;
+ p->frameInfo.contentChecksumFlag = LZ4F_contentChecksumEnabled;
+ p->frameInfo.contentSize = content_size_
+ ? static_cast<unsigned long long> (*content_size_)
+ : 0;
+ }
+
+ void compressor::
+ begin (int level,
+ int block_id,
+ optional<uint64_t> content_size)
{
assert (block_id >= 4 && block_id <= 7);
+ level_ = level;
+ block_id_ = block_id;
+ content_size_ = content_size;
+
LZ4F_preferences_t prefs = LZ4F_INIT_PREFERENCES;
- prefs.autoFlush = 1;
- prefs.favorDecSpeed = 0;
- prefs.compressionLevel = level;
- prefs.frameInfo.blockMode = LZ4F_blockLinked;
- prefs.frameInfo.blockSizeID = static_cast<LZ4F_blockSizeID_t> (block_id);
- prefs.frameInfo.blockChecksumFlag = LZ4F_noBlockChecksum;
- prefs.frameInfo.contentChecksumFlag = LZ4F_contentChecksumEnabled;
- prefs.frameInfo.contentSize =
- content_size ? static_cast<unsigned long long> (*content_size) : 0;
+ init_preferences (&prefs);
// Input/output buffer capacities.
//
- size_t ic (block_size (prefs.frameInfo.blockSizeID));
- size_t oc;
-
- // Input/output buffers.
+ // To be binary compatible with the lz4 utility we have to compress
+ // files that fit into the block with a single *_compressFrame() call
+ // instead of *_compressBegin()/*_compressUpdate(). And to determine the
+ // output buffer capacity we must use *_compressFrameBound() instead of
+ // *_compressBound(). The problem is, at this stage (before filling the
+ // input buffer), we don't know which case it will be.
//
- unique_ptr<char[]> ibg (new char[ic]); char* ib (ibg.get ());
- unique_ptr<char[]> obg; char* ob;
-
- // Read into the input buffer returning the number of bytes read and
- // updating the total read and the eof flag.
+ // However, in our case (autoFlush=1), *Bound() < *FrameBound() and so
+ // we can always use the latter at the cost of slight overhead. Also,
+ // using *FrameBound() allows us to call *Begin() and *Update() without
+ // flushing the buffer in between (this insight is based on studying the
+ // implementation of the *Bound() functions).
//
- // Note that we could try to do direct fd read/write but that would
- // complicate things quite a bit (error handling, stream state, etc).
+ // Actually, we can use content_size (we can get away with much smaller
+ // buffers for small inputs). We just need to verify the caller is not
+ // lying to us (failed that, we may end up with strange error like
+ // insufficient output buffer space).
//
- uint64_t it (0);
- bool eof (false);
- auto read = [&is, ib, ic, &it, &eof] () -> size_t
- {
- eof = butl::eof (is.read (ib, ic));
- size_t n (static_cast<size_t> (is.gcount ()));
- it += n;
- return n;
- };
+ ic = block_size (prefs.frameInfo.blockSizeID);
- // Write the specified number of bytes from the output buffer updating
- // the total written.
- //
- uint64_t ot (0);
- auto write = [&os, &ob, &ot] (size_t n)
+ if (content_size_ && *content_size_ < ic)
{
- os.write (ob, static_cast<streamsize> (n));
- ot += n;
- };
+ // This is nuanced: we need to add an extra byte in order to detect
+ // EOF.
+ //
+ ic = static_cast<size_t> (*content_size_) + 1;
+ }
+
+ oc = LZ4F_compressFrameBound (ic, &prefs);
+
+ begin_ = true;
+ }
+
+ void compressor::
+ next (bool end)
+ {
+ LZ4F_cctx* ctx;
// Unlike the decompression case below, compression cannot fail due to
// invalid content. So any LZ4F_*() function failure is either due to a
// programming bug or argument inconsistencies (e.g., content size does
// not match actual).
- // To be binary compatible with the lz4 utility we have to compress
- // files that fit into the block with a single LZ4F_compressFrame()
- // call.
- //
- size_t in (read ());
- size_t on;
-
- if (eof && in < ic) // Should be really <= but that's not lz4-compatible.
+ if (begin_)
{
- oc = LZ4F_compressFrameBound (in, &prefs);
- obg.reset ((ob = new char[oc]));
-
- on = LZ4F_compressFrame (ob, oc, ib, in, &prefs);
- if (LZ4F_isError (on))
- throw_exception (on);
+ begin_ = false;
- write (on);
+ LZ4F_preferences_t prefs = LZ4F_INIT_PREFERENCES;
+ init_preferences (&prefs);
- // Verify specified and actual content sizes match.
+ // If we've allocated smaller buffers based on content_size_, then
+ // verify the input size matches what's promised.
//
- // LZ4F_compressFrame() does not fail if it doesn't match instead
- // replacing it with the actual value.
+ // Note also that LZ4F_compressFrame() does not fail if it doesn't
+ // match instead replacing it with the actual value.
//
- if (content_size && *content_size != it)
- throw_exception (LZ4F_ERROR_frameSize_wrong);
- }
- else
- {
- cctx ctx;
-
- oc = LZ4F_compressBound (ic, &prefs);
- obg.reset ((ob = new char[oc]));
-
- // Write the header.
- //
- on = LZ4F_compressBegin (ctx, ob, oc, &prefs);
- if (LZ4F_isError (on))
- throw_exception (on);
-
- write (on);
+ size_t bs (block_size (prefs.frameInfo.blockSizeID));
+ if (content_size_ && *content_size_ < bs)
+ {
+ if (!end || in != *content_size_)
+ throw_exception (LZ4F_ERROR_frameSize_wrong);
+ }
- // Keep compressing, writing, and reading chunks of content.
+ // Must be < for lz4 compatibility (see EOF nuance above for the
+ // likely reason).
//
- for (;;)
+ if (end && in < bs)
{
- on = LZ4F_compressUpdate (ctx, ob, oc, ib, in, nullptr);
+ on = LZ4F_compressFrame (ob, oc, ib, in, &prefs);
if (LZ4F_isError (on))
throw_exception (on);
- if (on != 0) // LZ4F_compressUpdate() may just buffer the data.
- write (on);
+ return;
+ }
+ else
+ {
+ if (LZ4F_isError (
+ LZ4F_createCompressionContext (&ctx, LZ4F_VERSION)))
+ throw bad_alloc ();
+
+ ctx_ = ctx;
- if (eof)
- break;
+ // Write the header.
+ //
+ on = LZ4F_compressBegin (ctx, ob, oc, &prefs);
+ if (LZ4F_isError (on))
+ throw_exception (on);
- in = read ();
+ // Fall through.
}
+ }
+ else
+ {
+ ctx = static_cast<LZ4F_cctx*> (ctx_);
+ on = 0;
+ }
- // Write the end marker.
- //
+ size_t n;
+
+ n = LZ4F_compressUpdate (ctx, ob + on, oc - on, ib, in, nullptr);
+ if (LZ4F_isError (n))
+ throw_exception (n);
+
+ on += n;
+
+ // Write the end marker.
+ //
+ if (end)
+ {
// Note that this call also verifies specified and actual content
// sizes match.
//
- on = LZ4F_compressEnd (ctx, ob, oc, nullptr);
- if (LZ4F_isError (on))
- throw_exception (on);
+ n = LZ4F_compressEnd (ctx, ob + on, oc - on, nullptr);
+ if (LZ4F_isError (n))
+ throw_exception (n);
+
+ on += n;
+ }
+ }
+
+ uint64_t
+ compress (ofdstream& os, ifdstream& is,
+ int level,
+ int block_id,
+ optional<uint64_t> content_size)
+ {
+ compressor c;
+
+ // Input/output buffer guards.
+ //
+ unique_ptr<char[]> ibg;
+ unique_ptr<char[]> obg;
+
+ // First determine required buffer capacities.
+ //
+ c.begin (level, block_id, content_size);
+
+ ibg.reset ((c.ib = new char[c.ic]));
+ obg.reset ((c.ob = new char[c.oc]));
+
+ // Read into the input buffer updating the eof flag.
+ //
+ // Note that we could try to do direct fd read/write but that would
+ // complicate things quite a bit (error handling, stream state, etc).
+ //
+ bool eof (false);
+ auto read = [&is, &c, &eof] ()
+ {
+ eof = butl::eof (is.read (c.ib, c.ic));
+ c.in = static_cast<size_t> (is.gcount ());
+ };
+
+ // Write from the output buffer updating the total written.
+ //
+ uint64_t ot (0);
+ auto write = [&os, &c, &ot] ()
+ {
+ os.write (c.ob, static_cast<streamsize> (c.on));
+ ot += c.on;
+ };
+
+ // Keep reading, compressing, and writing chunks of content.
+ //
+ while (!eof)
+ {
+ read ();
- write (on);
+ c.next (eof);
+
+ if (c.on != 0) // next() may just buffer the data.
+ write ();
}
return ot;
}
- uint64_t
- decompress (ofdstream& os, ifdstream& is)
+ // decompression
+ //
+
+ static_assert (sizeof (decompressor::hb) == LZ4F_HEADER_SIZE_MAX,
+ "LZ4 header size mismatch");
+
+ decompressor::
+ decompressor ()
+ : hn (0), in (0), on (0)
+ {
+ LZ4F_dctx* ctx;
+
+ if (LZ4F_isError (LZ4F_createDecompressionContext (&ctx, LZ4F_VERSION)))
+ throw bad_alloc ();
+
+ ctx_ = ctx;
+ }
+
+ decompressor::
+ ~decompressor ()
+ {
+ LZ4F_errorCode_t e (
+ LZ4F_freeDecompressionContext (static_cast<LZ4F_dctx*> (ctx_)));
+ assert (!LZ4F_isError (e));
+ }
+
+ size_t decompressor::
+ begin ()
+ {
+ LZ4F_dctx* ctx (static_cast<LZ4F_dctx*> (ctx_));
+
+ LZ4F_frameInfo_t info = LZ4F_INIT_FRAMEINFO;
+
+ // Input hint and end as signalled by the LZ4F_*() functions.
+ //
+ size_t h, e;
+
+ h = LZ4F_getFrameInfo (ctx, &info, hb, &(e = hn));
+ if (LZ4F_isError (h))
+ throw_exception (h);
+
+ // Use the block size for the output buffer capacity and compressed
+ // bound plus the header size for the input. The expectation is that
+ // LZ4F_decompress() should never hint for more than that.
+ //
+ oc = block_size (info.blockSizeID);
+ ic = LZ4F_compressBound (oc, nullptr) + LZ4F_BLOCK_HEADER_SIZE;
+
+ assert (h <= ic);
+
+ // Move over whatever is left in the header buffer to be beginning.
+ //
+ hn -= e;
+ memmove (hb, hb + e, hn);
+
+ return h;
+ }
+
+ size_t decompressor::
+ next ()
{
- // The LZ4F_*() decompression functions return a hint of how much data
- // they want on the next call. So the plan is to allocate the input
- // buffer large enough to hold anything that can be asked for and then
- // fill it in in the asked chunks. This way we avoid having to shift the
- // unread data, etc.
+ LZ4F_dctx* ctx (static_cast<LZ4F_dctx*> (ctx_));
+
+ size_t h, e;
+
+ // Note that LZ4F_decompress() verifies specified and actual content
+ // sizes match (similar to compression).
//
- dctx ctx;
+ h = LZ4F_decompress (ctx, ob, &(on = oc), ib, &(e = in), nullptr);
+ if (LZ4F_isError (h))
+ throw_exception (h);
- // Input/output buffer capacities and sizes.
+ // We expect LZ4F_decompress() to consume what it asked for.
//
- size_t ic, oc;
- size_t in, on;
+ assert (e == in && h <= ic);
+
+ return h;
+ }
- // Input/output buffers.
+ uint64_t
+ decompress (ofdstream& os, ifdstream& is)
+ {
+ decompressor d;
+
+ // Input/output buffer guards.
//
- unique_ptr<char[]> ibg; char* ib;
- unique_ptr<char[]> obg; char* ob;
+ unique_ptr<char[]> ibg;
+ unique_ptr<char[]> obg;
// Read into the specified buffer returning the number of bytes read and
// updating the eof flag.
@@ -309,73 +426,46 @@ namespace butl
// the total written.
//
uint64_t ot (0);
- auto write = [&os, &ob, &ot] (size_t n)
+ auto write = [&os, &ot] (char* b, size_t n)
{
- os.write (ob, static_cast<streamsize> (n));
+ os.write (b, static_cast<streamsize> (n));
ot += n;
};
- // Input hint and end as signalled by the LZ4F_*() functions.
- //
- size_t ih, ie;
+ size_t h; // Input hint.
- // Read the header.
+ // First read in the header and allocate the buffers.
//
- LZ4F_frameInfo_t info = LZ4F_INIT_FRAMEINFO;
- {
- char hb[LZ4F_HEADER_SIZE_MAX];
- in = read (hb, sizeof (hb));
+ d.hn = read (d.hb, sizeof (d.hb));
+ h = d.begin ();
- ih = LZ4F_getFrameInfo (ctx, &info, hb, &(ie = in));
- if (LZ4F_isError (ih))
- throw_exception (ih);
+ ibg.reset ((d.ib = new char[d.ic]));
+ obg.reset ((d.ob = new char[d.oc]));
- // Use the block size for the output buffer capacity and compressed
- // bound plus the header size for the input. The expectation is that
- // LZ4F_decompress() should never hint for more than that.
- //
- oc = block_size (info.blockSizeID);
- ic = LZ4F_compressBound (oc, nullptr) + LZ4F_BLOCK_HEADER_SIZE;
-
- assert (ih <= ic);
-
- ibg.reset ((ib = new char[ic]));
- obg.reset ((ob = new char[oc]));
+ // Copy over whatever is left in the header buffer and read up to
+ // the hinted size.
+ //
+ memcpy (d.ib, d.hb, (d.in = d.hn));
- // Copy over whatever is left in the header buffer and read up to
- // the hinted size.
- //
- in -= ie;
- memcpy (ib, hb + ie, in);
- in += read (ib + in, ih - in);
- }
+ if (h > d.in)
+ d.in += read (d.ib + d.in, h - d.in);
// Keep decompressing, writing, and reading chunks of compressed
// content.
//
- // Note that LZ4F_decompress() verifies specified and actual content
- // sizes match (similar to compression).
- //
for (;;)
{
- ih = LZ4F_decompress (ctx, ob, &(on = oc), ib, &(ie = in), nullptr);
- if (LZ4F_isError (ih))
- throw_exception (ih);
-
- // We expect LZ4F_decompress() to consume what it asked for.
- //
- assert (ie == in);
+ h = d.next ();
- write (on);
+ write (d.ob, d.on);
- if (ih == 0)
+ if (h == 0)
break;
if (eof)
throw invalid_argument ("incomplete compressed content");
- assert (ih <= ic);
- in = read (ib, ih);
+ d.in = read (d.ib, h);
}
return ot;
diff --git a/libbutl/lz4.hxx b/libbutl/lz4.hxx
index bf679c5..98175c1 100644
--- a/libbutl/lz4.hxx
+++ b/libbutl/lz4.hxx
@@ -4,6 +4,7 @@
#pragma once
#include <cstdint>
+#include <cstddef>
#include <libbutl/optional.mxx>
#include <libbutl/fdstream.mxx>
@@ -14,16 +15,14 @@ namespace butl
{
namespace lz4
{
- //@@ TODO: allow (re-)using external buffers, contexts?
-
// Read the content from the input stream, compress it using the specified
// compression level and block size, and write the compressed content to
// the output stream. If content size is specified, then include it into
// the compressed content header. Return the compressed content size.
//
// This function may throw std::bad_alloc as well as exceptions thrown by
- // fdstream read/write functions. It may also throw invalid_argument in
- // case of argument inconsistencies (e.g., content size does not match
+ // fdstream read/write functions. It may also throw std::invalid_argument
+ // in case of argument inconsistencies (e.g., content size does not match
// actual) with what() returning the error description. The input stream
// is expected to throw on badbit (but not failbit). The output stream is
// expected to throw on badbit or failbit.
@@ -52,14 +51,75 @@ namespace butl
int block_size_id,
optional<std::uint64_t> content_size);
+ // Low-level iterative compression API.
+ //
+ // This API may throw std::bad_alloc in case of memory allocation errors
+ // and std::invalid_argument in case of argument inconsistencies (e.g.,
+ // content size does not match actual) with what() returning the error
+ // description.
+ //
+ // See the implementation of the compress() function above for usage
+ // example.
+ //
+ // @@ TODO: reset support.
+ //
+ struct LIBBUTL_SYMEXPORT compressor
+ {
+ // Buffer, current size (part filled with data), and capacity.
+ //
+ char* ib; std::size_t in, ic; // Input.
+ char* ob; std::size_t on, oc; // Output.
+
+ // As a first step call begin(). This function sets the required input
+ // and output buffer capacities (ic, oc).
+ //
+ // The caller normally allocates the input and output buffers and fills
+ // the input buffer.
+ //
+ void
+ begin (int compression_level,
+ int block_size_id,
+ optional<std::uint64_t> content_size);
+
+ // Then call next() to compress the next chunk of input passing true on
+ // reaching EOF. Note that the input buffer should be filled to capacity
+ // unless end is true and the output buffer must be flushed before each
+ // subsequent call to next().
+ //
+ void
+ next (bool end);
+
+ // Not copyable or movable.
+ //
+ compressor (const compressor&) = delete;
+ compressor (compressor&&) = delete;
+ compressor& operator= (const compressor&) = delete;
+ compressor& operator= (compressor&&) = delete;
+
+ // Implementation details.
+ //
+ compressor (): ctx_ (nullptr) {}
+ ~compressor ();
+
+ public:
+ void
+ init_preferences (void*) const;
+
+ void* ctx_;
+ int level_;
+ int block_id_;
+ optional<std::uint64_t> content_size_;
+ bool begin_;
+ };
+
// Read the compressed content from the input stream, decompress it, and
// write the decompressed content to the output stream. Return the
// decompressed content size.
//
// This function may throw std::bad_alloc as well as exceptions thrown by
- // fdstream read/write functions. It may also throw invalid_argument if
- // the compressed content is invalid with what() returning the error
+ // fdstream read/write functions. It may also throw std::invalid_argument
+ // if the compressed content is invalid with what() returning the error
// description. The input stream is expected to throw on badbit (but not
// failbit). The output stream is expected to throw on badbit or failbit.
//
@@ -67,10 +127,72 @@ namespace butl
// mode.
//
// Note that this function does not require the input stream to reach EOF
- // at the end of compressed content. So if you have this a requirement,
- // you will need to enforce it yourself.
+ // at the end of compressed content. So if you have this requirement, you
+ // will need to enforce it yourself.
//
LIBBUTL_SYMEXPORT std::uint64_t
decompress (ofdstream&, ifdstream&);
+
+ // Low-level iterative decompression API.
+ //
+ // This API may throw std::bad_alloc in case of memory allocation errors
+ // and std::invalid_argument if the compressed content is invalid with
+ // what() returning the error description.
+ //
+ // See the implementation of the decompress() function above for usage
+ // example.
+ //
+ // The LZ4F_*() decompression functions return a hint of how much data
+ // they want on the next call. So the plan is to allocate the input
+ // buffer large enough to hold anything that can be asked for and then
+ // fill it in in the asked chunks. This way we avoid having to shift the
+ // unread data around.
+ //
+ // @@ TODO: reset support.
+ //
+ struct LIBBUTL_SYMEXPORT decompressor
+ {
+ // Buffer, current size (part filled with data), and capacity.
+ //
+ char hb[19]; std::size_t hn ; // Header.
+ char* ib; std::size_t in, ic; // Input.
+ char* ob; std::size_t on, oc; // Output.
+
+ // As a first step, fill in the header buffer and call begin(). This
+ // function sets the required input and output buffer capacities (ic,
+ // oc) and the number of bytes left in the header buffer (hn) and
+ // returns the number of bytes expected by the following call to next().
+ //
+ // The caller normally allocates the input and output buffers, copies
+ // remaining header buffer data over to the input buffer, and then fills
+ // in the remainder of the input buffer up to what's expected by the
+ // call to next().
+ //
+ std::size_t
+ begin ();
+
+ // Then call next() to decompress the next chunk of input. This function
+ // returns the number of bytes expected by the following call to next()
+ // or 0 if no further input is expected. Note that the output buffer
+ // must be flushed before each subsequent call to next().
+ //
+ std::size_t
+ next ();
+
+ // Not copyable or movable.
+ //
+ decompressor (const decompressor&) = delete;
+ decompressor (decompressor&&) = delete;
+ decompressor& operator= (const decompressor&) = delete;
+ decompressor& operator= (decompressor&&) = delete;
+
+ // Implementation details.
+ //
+ decompressor ();
+ ~decompressor ();
+
+ public:
+ void* ctx_;
+ };
}
}