aboutsummaryrefslogtreecommitdiff
path: root/libbutl
diff options
context:
space:
mode:
authorBoris Kolpackov <boris@codesynthesis.com>2021-09-08 16:02:41 +0200
committerBoris Kolpackov <boris@codesynthesis.com>2021-09-09 08:34:00 +0200
commit07286ad05fc2a60a485f542340aa04ceeaa3748c (patch)
treedf47d74453aeb4b10b9746cc396db3d883f81dd0 /libbutl
parent98c4038df36fb73601c58ccd885d1c2d3703cf6e (diff)
Implement lz4::{istream,ostream}
Diffstat (limited to 'libbutl')
-rw-r--r--libbutl/lz4-stream.cxx281
-rw-r--r--libbutl/lz4-stream.hxx280
-rw-r--r--libbutl/lz4.cxx136
-rw-r--r--libbutl/lz4.hxx19
4 files changed, 667 insertions, 49 deletions
diff --git a/libbutl/lz4-stream.cxx b/libbutl/lz4-stream.cxx
new file mode 100644
index 0000000..43b0250
--- /dev/null
+++ b/libbutl/lz4-stream.cxx
@@ -0,0 +1,281 @@
+// file : libbutl/lz4-stream.cxx -*- C++ -*-
+// license : MIT; see accompanying LICENSE file
+
+#include <libbutl/lz4-stream.hxx>
+
+#include <cstring> // memcpy()
+#include <stdexcept> // invalid_argument
+
+#include <libbutl/utility.mxx> // eof()
+
+using namespace std;
+
+namespace butl
+{
+ namespace lz4
+ {
+ // istream
+ //
+
+ // Read into the specified buffer returning the number of bytes read and
+ // the eof flag.
+ //
+ pair<size_t, bool> istreambuf::
+ read (char* b, size_t c)
+ {
+   // Keep reading from the underlying stream until we either fill the
+   // requested amount or hit EOF. Return the byte count and the EOF flag.
+   //
+   // @@ TODO: would it be faster to do a direct buffer copy if input
+   //    stream is bufstreambuf-based (see sha*.cxx for code)?
+   //
+   size_t got (0);
+
+   for (;;)
+   {
+     bool done (eof (is_->read (b + got, c - got)));
+     got += static_cast<size_t> (is_->gcount ());
+
+     if (done || got == c)
+       return make_pair (got, done);
+   }
+ }
+
+ // Start decompressing the content read from the specified stream. If end
+ // is true, then once the compressed content ends verify there is no more
+ // input (see load()). Return the decompressed content size if it is
+ // recorded in the header.
+ //
+ // If reading the header, parsing it, or allocating the buffers throws,
+ // then the buffer is left in the closed state (is_open() returns false).
+ //
+ optional<uint64_t> istreambuf::
+ open (std::istream& is, bool end)
+ {
+   assert (is.exceptions () == std::istream::badbit);
+
+   is_ = &is; // Used by read() below.
+   end_ = end;
+
+   // Read in the header and allocate the buffers.
+   //
+   // What if we hit EOF here? And could begin() return 0? Turns out the
+   // answer to both questions is yes: 0-byte content compresses to 15
+   // bytes (with or without content size; 1-byte -- to 20/28 bytes). We
+   // can ignore EOF here since an attempt to read more will result in
+   // another EOF. And our load() is prepared to handle 0 hint.
+   //
+   // @@ We could end up leaving some of the input content from the header
+   //    in the input buffer which the caller will have no way of using
+   //    (e.g., in a stream of compressed contents). Doesn't look like
+   //    there is much we can do (our streams don't support putback) other
+   //    than document this limitation.
+   //
+   optional<uint64_t> r;
+
+   try
+   {
+     d_.hn = read (d_.hb, sizeof (d_.hb)).first;
+     h_ = d_.begin (&r);
+
+     ib_.reset ((d_.ib = new char[d_.ic]));
+     ob_.reset ((d_.ob = new char[d_.oc]));
+   }
+   catch (...)
+   {
+     // Don't leave a half-initialized buffer that claims to be open:
+     // underflow()/load() would otherwise use the unset hint and buffers.
+     //
+     is_ = nullptr;
+     throw;
+   }
+
+   // Copy over whatever is left in the header buffer.
+   //
+   memcpy (d_.ib, d_.hb, (d_.in = d_.hn));
+
+   // Empty get area: the first read will call underflow() and thus load().
+   //
+   setg (d_.ob, d_.ob, d_.ob);
+   return r;
+ }
+
+ void istreambuf::
+ close ()
+ {
+   // There is nothing to flush on the input side: just detach from the
+   // underlying stream so that is_open() starts returning false.
+   //
+   if (!is_open ())
+     return;
+
+   is_ = nullptr;
+ }
+
+ istreambuf::int_type istreambuf::
+ underflow ()
+ {
+   // A closed buffer always reports EOF.
+   //
+   if (!is_open ())
+     return traits_type::eof ();
+
+   // Use what is left in the get area, otherwise try to decompress more.
+   //
+   bool avail (gptr () < egptr () || load ());
+
+   return avail
+     ? traits_type::to_int_type (*gptr ())
+     : traits_type::eof ();
+ }
+
+ // Refill the get area with the next portion of decompressed data. Return
+ // true if at least one decompressed byte is now available and false
+ // otherwise (EOF).
+ //
+ // Throw invalid_argument if the underlying stream ends before the amount
+ // of input hinted by the decompressor could be read or, if end was
+ // requested in open(), if there is input after the compressed content.
+ //
+ bool istreambuf::
+ load ()
+ {
+ // Note that the first call to this function may be with h_ == 0 (see
+ // open() for details). In which case we just need to verify there is
+ // no junk after the compressed content.
+ //
+ bool r;
+
+ if (h_ == 0)
+ r = false; // EOF
+ else
+ {
+ // Note: next() may just buffer the data.
+ //
+ // So keep feeding it until it either produces some output or signals
+ // that no more input is expected (0 hint).
+ //
+ do
+ {
+ // Note that on the first call we may already have some data in the
+ // input buffer (leftover header data).
+ //
+ if (h_ > d_.in)
+ {
+ pair<size_t, bool> p (read (d_.ib + d_.in, h_ - d_.in));
+
+ d_.in += p.first;
+
+ // EOF before the hinted amount could be read.
+ //
+ if (p.second && d_.in != h_)
+ throw invalid_argument ("incomplete compressed content");
+ }
+
+ h_ = d_.next (); // Clears d_.in.
+
+ } while (d_.on == 0 && h_ != 0);
+
+ // Expose the decompressed data as the new get area and advance the
+ // logical offset by its size.
+ //
+ setg (d_.ob, d_.ob, d_.ob + d_.on);
+ off_ += d_.on;
+ r = (d_.on != 0);
+ }
+
+ // If we don't expect any more compressed content and we were asked to
+ // end the underlying input stream, then verify there is no more input.
+ //
+ if (h_ == 0 && end_)
+ {
+ end_ = false; // Only verify once.
+
+ // Junk is either unconsumed data still in the input buffer or more
+ // data available from the underlying stream.
+ //
+ if (d_.in != 0 ||
+ (!is_->eof () &&
+ is_->good () &&
+ is_->peek () != istream::traits_type::eof ()))
+ throw invalid_argument ("junk after compressed content");
+ }
+
+ return r;
+ }
+
+ // ostream
+ //
+
+ void ostreambuf::
+ write (char* b, std::size_t n)
+ {
+   // Forward the compressed bytes to the underlying output stream.
+   //
+   const streamsize sn (static_cast<streamsize> (n));
+   os_->write (b, sn);
+ }
+
+ // Start compressing into the specified stream using the specified
+ // compression level, block size id, and optional content size (all passed
+ // as is to compressor::begin()).
+ //
+ // If this function throws (for example, std::bad_alloc while allocating
+ // the buffers), then the buffer is left in the closed state so that a
+ // subsequent close(), including the one made by the destructor, is a
+ // noop.
+ //
+ void ostreambuf::
+ open (std::ostream& os,
+       int level,
+       int block_id,
+       optional<std::uint64_t> content_size)
+ {
+   assert (os.exceptions () == (std::ostream::badbit |
+                                std::ostream::failbit));
+
+   // Determine required buffer capacities.
+   //
+   c_.begin (level, block_id, content_size);
+
+   ib_.reset ((c_.ib = new char[c_.ic]));
+   ob_.reset ((c_.ob = new char[c_.oc]));
+
+   setp (c_.ib, c_.ib + c_.ic - 1); // Keep space for overflow's char.
+   end_ = false;
+
+   // Only mark ourselves open once everything is set up: nothing above
+   // writes to the stream and close() must not call save() on a partially
+   // initialized state (unset end_, no buffers).
+   //
+   os_ = &os;
+ }
+
+ void ostreambuf::
+ close ()
+ {
+   if (!is_open ())
+     return;
+
+   // Unless the end of the compressed content has already been written,
+   // compress and write out whatever is still buffered.
+   //
+   if (!end_)
+     save ();
+
+   os_ = nullptr;
+ }
+
+ // Close the buffer if it is still open. Note that close() may call save()
+ // which compresses the remaining data and writes it to the underlying
+ // stream.
+ //
+ // NOTE(review): an exception thrown by that write would escape from a
+ // destructor (presumably ending in std::terminate()), so it is better to
+ // call close() explicitly before destruction -- confirm against callers.
+ //
+ ostreambuf::
+ ~ostreambuf ()
+ {
+ close ();
+ }
+
+ ostreambuf::int_type ostreambuf::
+ overflow (int_type c)
+ {
+   // Nothing to do (and report failure) if we are closed or are asked to
+   // "write" EOF.
+   //
+   if (!is_open () || c == traits_type::eof ())
+     return traits_type::eof ();
+
+   // The put area was set up one byte short of the input buffer capacity
+   // (see open()) so there is always room for this last character. Note
+   // that pbump() doesn't do any checks.
+   //
+   *pptr () = traits_type::to_char_type (c);
+   pbump (1);
+
+   // The input buffer is now full: compress and write it out.
+   //
+   save ();
+
+   return c;
+ }
+
+ // Compress the current contents of the put area, write any compressed
+ // output to the underlying stream, and reset the put area. A put area
+ // that is not completely full is treated as the end of the content.
+ //
+ void ostreambuf::
+ save ()
+ {
+ // Number of bytes accumulated in the input buffer. Account for them in
+ // the logical offset before handing them to the compressor.
+ //
+ c_.in = pptr () - pbase ();
+ off_ += c_.in;
+
+ // We assume this is the end if the input buffer is not full.
+ //
+ end_ = (c_.in != c_.ic);
+ c_.next (end_);
+
+ if (c_.on != 0) // next() may just buffer the data.
+ write (c_.ob, c_.on);
+
+ // Reset the put area, again keeping one byte in reserve for overflow().
+ //
+ setp (c_.ib, c_.ib + c_.ic - 1);
+ }
+
+ // Write sn characters from s. Return 0 without writing anything if the
+ // buffer is closed or the end of the compressed content has already been
+ // written and sn otherwise.
+ //
+ streamsize ostreambuf::
+ xsputn (const char_type* s, streamsize sn)
+ {
+ if (!is_open () || end_)
+ return 0;
+
+ // To avoid further 'signed/unsigned comparison' compiler warnings.
+ //
+ size_t n (static_cast<size_t> (sn));
+
+ // The plan is to keep copying the data into the input buffer and
+ // calling save() (our compressor API currently has no way of avoiding
+ // the copy).
+ //
+ while (n != 0)
+ {
+ // Amount of free space in the buffer (including the extra byte
+ // we've reserved).
+ //
+ size_t an (epptr () - pptr () + 1);
+
+ // Copy as much as fits.
+ //
+ size_t m (n > an ? an : n);
+ memcpy (pptr (), s, m);
+ pbump (static_cast<int> (m));
+
+ if (n < an)
+ break; // All fitted with at least 1 byte left.
+
+ // The input buffer is completely full (so save() won't treat this as
+ // the end).
+ //
+ save ();
+
+ s += m;
+ n -= m;
+ }
+
+ return sn;
+ }
+ }
+}
diff --git a/libbutl/lz4-stream.hxx b/libbutl/lz4-stream.hxx
new file mode 100644
index 0000000..611fe37
--- /dev/null
+++ b/libbutl/lz4-stream.hxx
@@ -0,0 +1,280 @@
+// file : libbutl/lz4-stream.hxx -*- C++ -*-
+// license : MIT; see accompanying LICENSE file
+
+#pragma once
+
+#include <memory> // unique_ptr
+#include <cstddef> // size_t
+#include <cstdint> // uint64_t
+#include <utility> // move()
+#include <istream>
+#include <ostream>
+#include <cassert>
+
+#include <libbutl/lz4.hxx>
+#include <libbutl/optional.mxx>
+#include <libbutl/bufstreambuf.hxx>
+
+#include <libbutl/export.hxx>
+
+namespace butl
+{
+ namespace lz4
+ {
+ // istream
+ //
+
+ // Input stream buffer that decompresses the content read from an
+ // underlying std::istream.
+ //
+ class LIBBUTL_SYMEXPORT istreambuf: public bufstreambuf
+ {
+ public:
+ // Start decompressing from the specified stream. If end is true, then
+ // verify there is no more input after the compressed content. Return
+ // the decompressed content size, if available.
+ //
+ optional<std::uint64_t>
+ open (std::istream&, bool end);
+
+ bool
+ is_open () const {return is_ != nullptr;}
+
+ void
+ close ();
+
+ public:
+ using base = bufstreambuf;
+
+ // basic_streambuf input interface.
+ //
+ public:
+ virtual int_type
+ underflow () override;
+
+ // Direct access to the get area. Use with caution.
+ //
+ using base::gptr;
+ using base::egptr;
+ using base::gbump;
+
+ // Return the (logical) position of the next byte to be read.
+ //
+ using base::tellg;
+
+ private:
+ // Read from the underlying stream into the specified buffer returning
+ // the number of bytes read and the eof flag.
+ //
+ std::pair<std::size_t, bool>
+ read (char*, std::size_t);
+
+ // Refill the get area with decompressed data returning false on EOF.
+ //
+ bool
+ load ();
+
+ private:
+ std::istream* is_ = nullptr; // Underlying stream, NULL if not open.
+ bool end_; // Verify no input after content (set by open()).
+ decompressor d_;
+ std::unique_ptr<char[]> ib_; // Decompressor input buffer.
+ std::unique_ptr<char[]> ob_; // Decompressor output buffer.
+ std::size_t h_; // Decompressor next chunk hint.
+ };
+
+ // Typical usage:
+ //
+ // try
+ // {
+ // ifdstream ifs (..., ifdstream::badbit);
+ // lz4::istream izs (ifs, true /* end */);
+ // ... // Read from izs.
+ // }
+ // catch (const invalid_argument& e)
+ // {
+ // ... // Invalid compressed content, call e.what() for description.
+ // }
+ // catch (/* ifdstream exceptions */)
+ // {
+ // ...
+ // }
+ //
+ // See class decompressor for details on semantics and exceptions thrown.
+ //
+ // @@ TODO: get rid of badbit-only requirement.
+ // @@ TODO: re-opening support (will need decompressor reset).
+ //
+ class LIBBUTL_SYMEXPORT istream: public std::istream
+ {
+ public:
+ // Create an unopened stream with the specified exception mask (which
+ // must include badbit).
+ //
+ explicit
+ istream (iostate e = badbit | failbit)
+ : std::istream (&buf_)
+ {
+ assert (e & badbit);
+ exceptions (e);
+ }
+
+ // The underlying input stream is expected to throw on badbit but not
+ // failbit. If end is true, then on reaching the end of compressed data
+ // verify there is no more input.
+ //
+ // Note that this implementation does not support handling streams of
+ // compressed contents (end is false) that may include individual
+ // contents that uncompress to 0 bytes (see istreambuf::open()
+ // implementation for details).
+ //
+ istream (std::istream& is, bool end, iostate e = badbit | failbit)
+ : istream (e)
+ {
+ open (is, end);
+ }
+
+ // Return decompressed content size, if available.
+ //
+ optional<std::uint64_t>
+ open (std::istream& is, bool end)
+ {
+ return buf_.open (is, end);
+ }
+
+ bool
+ is_open () const
+ {
+ return buf_.is_open ();
+ }
+
+ // Signal that no further uncompressed input will be read.
+ //
+ void
+ close ()
+ {
+ return buf_.close ();
+ }
+
+ private:
+ istreambuf buf_;
+ };
+
+ // ostream
+ //
+
+ // Output stream buffer that compresses the content written to it and
+ // writes the result to an underlying std::ostream.
+ //
+ class LIBBUTL_SYMEXPORT ostreambuf: public bufstreambuf
+ {
+ public:
+ // Start compressing into the specified stream. The remaining arguments
+ // are passed as is to compressor::begin().
+ //
+ void
+ open (std::ostream&,
+ int compression_level,
+ int block_size_id,
+ optional<std::uint64_t> content_size);
+
+ bool
+ is_open () const {return os_ != nullptr;}
+
+ // Unless already done, compress and write out the remaining buffered
+ // data and then detach from the underlying stream.
+ //
+ void
+ close ();
+
+ // Call close().
+ //
+ virtual
+ ~ostreambuf () override;
+
+ public:
+ using base = bufstreambuf;
+
+ // basic_streambuf output interface.
+ //
+ // Note that syncing the input buffer before the end doesn't make much
+ // sense (it will just get buffered in the compressor). In fact, it can
+ // break our single-shot compression arrangement (for compatibility with
+ // the lz4 utility). Thus we inherit noop sync() from the base.
+ //
+ public:
+ virtual int_type
+ overflow (int_type) override;
+
+ virtual std::streamsize
+ xsputn (const char_type*, std::streamsize) override;
+
+ // Return the (logical) position of the next byte to be written.
+ //
+ using base::tellp;
+
+ private:
+ // Write the specified number of bytes to the underlying stream.
+ //
+ void
+ write (char*, std::size_t);
+
+ // Compress the contents of the put area and write out the result. A
+ // put area that is not full is treated as the end of the content.
+ //
+ void
+ save ();
+
+ private:
+ std::ostream* os_ = nullptr; // Underlying stream, NULL if not open.
+ bool end_; // End of compressed content has been written.
+ compressor c_;
+ std::unique_ptr<char[]> ib_; // Compressor input buffer.
+ std::unique_ptr<char[]> ob_; // Compressor output buffer.
+ };
+
+ // Typical usage:
+ //
+ // try
+ // {
+ // ofdstream ofs (...);
+ // lz4::ostream ozs (ofs, 4 /* 64KB */, 9, nullopt /* content_size */);
+ //
+ // ... // Write to ozs.
+ //
+ // ozs.close ();
+ // ofs.close ();
+ // }
+ // catch (/* ofdstream exceptions */)
+ // {
+ // ...
+ // }
+ //
+ // See class compressor for details on semantics and exceptions thrown.
+ //
+ // @@ TODO: re-opening support (will need compressor reset).
+ //
+ class LIBBUTL_SYMEXPORT ostream: public std::ostream
+ {
+ public:
+ // Create an unopened stream with the specified exception mask (which
+ // must include badbit).
+ //
+ explicit
+ ostream (iostate e = badbit | failbit)
+ : std::ostream (&buf_)
+ {
+ assert (e & badbit);
+ exceptions (e);
+ }
+
+ // The underlying output stream is expected to throw on badbit or
+ // failbit.
+ //
+ // See compress() for the description of the compression level, block
+ // size and content size arguments.
+ //
+ ostream (std::ostream& os,
+ int compression_level,
+ int block_size_id,
+ optional<std::uint64_t> content_size,
+ iostate e = badbit | failbit)
+ : ostream (e)
+ {
+ open (os, compression_level, block_size_id, content_size);
+ }
+
+ // Start compressing into the specified stream (see the constructor
+ // above for the argument semantics).
+ //
+ void
+ open (std::ostream& os,
+ int compression_level,
+ int block_size_id,
+ optional<std::uint64_t> content_size)
+ {
+ buf_.open (os, compression_level, block_size_id, content_size);
+ }
+
+ bool
+ is_open () const
+ {
+ return buf_.is_open ();
+ }
+
+ // Signal that no further uncompressed output will be written.
+ //
+ void
+ close ()
+ {
+ return buf_.close ();
+ }
+
+ private:
+ ostreambuf buf_;
+ };
+ }
+}
diff --git a/libbutl/lz4.cxx b/libbutl/lz4.cxx
index a28ed78..9d15203 100644
--- a/libbutl/lz4.cxx
+++ b/libbutl/lz4.cxx
@@ -28,6 +28,10 @@
#include <libbutl/utility.mxx> // eos()
+#if 0
+#include <libbutl/lz4-stream.hxx>
+#endif
+
using namespace std;
namespace butl
@@ -212,12 +216,12 @@ namespace butl
if (LZ4F_isError (on))
throw_exception (on);
+ in = 0; // All consumed.
return;
}
else
{
- if (LZ4F_isError (
- LZ4F_createCompressionContext (&ctx, LZ4F_VERSION)))
+ if (LZ4F_isError (LZ4F_createCompressionContext (&ctx, LZ4F_VERSION)))
throw bad_alloc ();
ctx_ = ctx;
@@ -239,11 +243,15 @@ namespace butl
size_t n;
- n = LZ4F_compressUpdate (ctx, ob + on, oc - on, ib, in, nullptr);
- if (LZ4F_isError (n))
- throw_exception (n);
+ if (in != 0)
+ {
+ n = LZ4F_compressUpdate (ctx, ob + on, oc - on, ib, in, nullptr);
+ if (LZ4F_isError (n))
+ throw_exception (n);
- on += n;
+ in = 0; // All consumed.
+ on += n;
+ }
// Write the end marker.
//
@@ -266,6 +274,21 @@ namespace butl
int block_id,
optional<uint64_t> content_size)
{
+#if 0
+ char buf[1024 * 3 + 7];
+ ostream cos (os, level, block_id, content_size);
+
+ for (bool e (false); !e; )
+ {
+ e = eof (is.read (buf, sizeof (buf)));
+ cos.write (buf, is.gcount ());
+ //for (streamsize i (0), n (is.gcount ()); i != n; ++i)
+ // cos.put (buf[i]);
+ }
+
+ cos.close ();
+ return content_size ? *content_size : 0;
+#else
compressor c;
// Input/output buffer guards.
@@ -314,6 +337,7 @@ namespace butl
}
return ot;
+#endif
}
// decompression
@@ -323,8 +347,17 @@ namespace butl
"LZ4 header size mismatch");
decompressor::
- decompressor ()
- : hn (0), in (0), on (0)
+ ~decompressor ()
+ {
+ if (LZ4F_dctx* ctx = static_cast<LZ4F_dctx*> (ctx_))
+ {
+ LZ4F_errorCode_t e (LZ4F_freeDecompressionContext (ctx));
+ assert (!LZ4F_isError (e));
+ }
+ }
+
+ size_t decompressor::
+ begin (optional<uint64_t>* content_size)
{
LZ4F_dctx* ctx;
@@ -332,20 +365,6 @@ namespace butl
throw bad_alloc ();
ctx_ = ctx;
- }
-
- decompressor::
- ~decompressor ()
- {
- LZ4F_errorCode_t e (
- LZ4F_freeDecompressionContext (static_cast<LZ4F_dctx*> (ctx_)));
- assert (!LZ4F_isError (e));
- }
-
- size_t decompressor::
- begin ()
- {
- LZ4F_dctx* ctx (static_cast<LZ4F_dctx*> (ctx_));
LZ4F_frameInfo_t info = LZ4F_INIT_FRAMEINFO;
@@ -357,6 +376,14 @@ namespace butl
if (LZ4F_isError (h))
throw_exception (h);
+ if (content_size != nullptr)
+ {
+ if (info.contentSize != 0)
+ *content_size = static_cast<uint64_t> (info.contentSize);
+ else
+ *content_size = nullopt;
+ }
+
// Use the block size for the output buffer capacity and compressed
// bound plus the header size for the input. The expectation is that
// LZ4F_decompress() should never hint for more than that.
@@ -391,6 +418,7 @@ namespace butl
// We expect LZ4F_decompress() to consume what it asked for.
//
assert (e == in && h <= ic);
+ in = 0; // All consumed.
return h;
}
@@ -398,13 +426,26 @@ namespace butl
uint64_t
decompress (ofdstream& os, ifdstream& is)
{
- decompressor d;
-
- // Input/output buffer guards.
+ // Write the specified number of bytes from the output buffer updating
+ // the total written.
//
- unique_ptr<char[]> ibg;
- unique_ptr<char[]> obg;
+ uint64_t ot (0);
+ auto write = [&os, &ot] (char* b, size_t n)
+ {
+ os.write (b, static_cast<streamsize> (n));
+ ot += n;
+ };
+
+#if 0
+ char buf[1024 * 3 + 7];
+ istream dis (is, true, istream::badbit);
+ for (bool e (false); !e; )
+ {
+ e = eof (dis.read (buf, sizeof (buf)));
+ write (buf, static_cast<size_t> (dis.gcount ()));
+ }
+#else
// Read into the specified buffer returning the number of bytes read and
// updating the eof flag.
//
@@ -422,20 +463,27 @@ namespace butl
return n;
};
- // Write the specified number of bytes from the output buffer updating
- // the total written.
+ decompressor d;
+
+ // Input/output buffer guards.
//
- uint64_t ot (0);
- auto write = [&os, &ot] (char* b, size_t n)
- {
- os.write (b, static_cast<streamsize> (n));
- ot += n;
- };
+ unique_ptr<char[]> ibg;
+ unique_ptr<char[]> obg;
size_t h; // Input hint.
// First read in the header and allocate the buffers.
//
+ // What if we hit EOF here? And could begin() return 0? Turns out the
+ // answer to both questions is yes: 0-byte content compresses to 15
+ // bytes (with or without content size; 1-byte -- to 20/28 bytes). We
+ // can ignore EOF here since an attempt to read more will result in
+ // another EOF. And code below is prepared to handle 0 initial hint.
+ //
+ // @@ We could end up leaving some of the input content from the
+ // header in the input buffer which the caller will have no way
+ // of using/detecting.
+ //
d.hn = read (d.hb, sizeof (d.hb));
h = d.begin ();
@@ -453,20 +501,22 @@ namespace butl
// Keep decompressing, writing, and reading chunks of compressed
// content.
//
- for (;;)
+ while (h != 0)
{
h = d.next ();
- write (d.ob, d.on);
+ if (d.on != 0) // next() may just buffer the data.
+ write (d.ob, d.on);
- if (h == 0)
- break;
-
- if (eof)
- throw invalid_argument ("incomplete compressed content");
+ if (h != 0)
+ {
+ if (eof)
+ throw invalid_argument ("incomplete compressed content");
- d.in = read (d.ib, h);
+ d.in = read (d.ib, h);
+ }
}
+#endif
return ot;
}
diff --git a/libbutl/lz4.hxx b/libbutl/lz4.hxx
index 98175c1..cfe9967 100644
--- a/libbutl/lz4.hxx
+++ b/libbutl/lz4.hxx
@@ -30,8 +30,9 @@ namespace butl
// The output and most likely the input streams must be in the binary
// mode.
//
- // Valid values for the compressions level are between 1 (fastest) and
- // 12 (best compression level).
+ // Valid values for the compression level are between 1 (fastest) and 12
+ // (best compression level) though, practically, after 9 returns are
+ // diminished.
//
// Valid block sizes and their IDs:
//
@@ -40,6 +41,10 @@ namespace butl
// 6: 1MB
// 7: 4MB
//
+ // Note that due to the underlying API limitations, 0 content size is
+ // treated as absent and it's therefore impossible to compress 0-byte
+ // content with content size.
+ //
// This function produces compressed content identical to:
//
// lz4 -z -<compression_level> -B<block_size_id> -BD [--content-size]
@@ -120,8 +125,8 @@ namespace butl
// This function may throw std::bad_alloc as well as exceptions thrown by
// fdstream read/write functions. It may also throw std::invalid_argument
// if the compressed content is invalid with what() returning the error
- // description. The input stream is expected to throw on badbit (but not
- // failbit). The output stream is expected to throw on badbit or failbit.
+ // description. The input stream is expected to throw on badbit but not
+ // failbit. The output stream is expected to throw on badbit or failbit.
//
// The input and most likely the output streams must be in the binary
// mode.
@@ -162,6 +167,8 @@ namespace butl
// function sets the required input and output buffer capacities (ic,
// oc) and the number of bytes left in the header buffer (hn) and
// returns the number of bytes expected by the following call to next().
+ // If content_size is not NULL, then it is set to the decompressed
+ // content size, if available.
//
// The caller normally allocates the input and output buffers, copies
// remaining header buffer data over to the input buffer, and then fills
@@ -169,7 +176,7 @@ namespace butl
// call to next().
//
std::size_t
- begin ();
+ begin (optional<std::uint64_t>* content_size = nullptr);
// Then call next() to decompress the next chunk of input. This function
// returns the number of bytes expected by the following call to next()
@@ -188,7 +195,7 @@ namespace butl
// Implementation details.
//
- decompressor ();
+ decompressor (): hn (0), in (0), on (0), ctx_ (nullptr) {}
~decompressor ();
public: