From 43b92007c6c2a360b9924e1bbe58e81c4dc6e38f Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Wed, 8 Sep 2021 16:02:41 +0200 Subject: Implement lz4::{istream,ostream} --- libbutl/lz4-stream.cxx | 276 +++++++++++++++++++++++++++++++++++++++++++++++++ libbutl/lz4-stream.hxx | 241 ++++++++++++++++++++++++++++++++++++++++++ libbutl/lz4.cxx | 98 +++++++++++++----- libbutl/lz4.hxx | 4 +- 4 files changed, 592 insertions(+), 27 deletions(-) create mode 100644 libbutl/lz4-stream.cxx create mode 100644 libbutl/lz4-stream.hxx (limited to 'libbutl') diff --git a/libbutl/lz4-stream.cxx b/libbutl/lz4-stream.cxx new file mode 100644 index 0000000..d2f8c94 --- /dev/null +++ b/libbutl/lz4-stream.cxx @@ -0,0 +1,276 @@ +// file : libbutl/lz4-stream.cxx -*- C++ -*- +// license : MIT; see accompanying LICENSE file + +#include + +#include // memcpy() +#include // invalid_argument + +#include // eof() + +using namespace std; + +namespace butl +{ + namespace lz4 + { + // istream + // + + // Read into the specified buffer returning the number of bytes read and + // the eof flag. + // + pair istreambuf:: + read (char* b, size_t c) + { + size_t n (0); + bool e (false); + + // @@ TODO: would it be faster to do a direct buffer copy if input + // stream is bufstreabuf-based (see sha*.cxx for code)? + do + { + e = eof (is_->read (b + n, c - n)); + n += static_cast (is_->gcount ()); + } + while (!e && n != c); + + return make_pair (n, e); + } + + void istreambuf:: + open (std::istream& is, bool end) + { + assert (is.exceptions () == std::istream::badbit); + + is_ = &is; + end_ = end; + + // Read in the header and allocate the buffers. + // + // What if we hit EOF here? And could begin() return 0? Turns out the + // answer to both questions is yes: 0-byte content compresses to 15 + // bytes (with or without content size; 1-byte -- to 20/28 bytes). We + // can ignore EOF here since an attempt to read more will result in + // another EOF. And our load() is prepared to handle 0 hint. 
+ // + // @@ We could end up leaving some of the input content from the header + // in the input buffer which the caller will have no way of using + // (e.g., in a stream of compressed contents). Doesn't look like + // there is much we can do (our streams don't support putback) other + // than document this limitation. + // + d_.hn = read (d_.hb, sizeof (d_.hb)).first; + h_ = d_.begin (); + + ib_.reset ((d_.ib = new char[d_.ic])); + ob_.reset ((d_.ob = new char[d_.oc])); + + // Copy over whatever is left in the header buffer. + // + memcpy (d_.ib, d_.hb, (d_.in = d_.hn)); + + setg (d_.ob, d_.ob, d_.ob); + } + + void istreambuf:: + close () + { + if (is_open ()) + { + is_ = nullptr; + } + } + + istreambuf::int_type istreambuf:: + underflow () + { + int_type r (traits_type::eof ()); + + if (is_open ()) + { + if (gptr () < egptr () || load ()) + r = traits_type::to_int_type (*gptr ()); + } + + return r; + } + + bool istreambuf:: + load () + { + // Note that the first call to this function may be with h_ == 0 (see + // open() for details). In which case we just need to verify there is + // no junk after the compressed content. + // + bool r; + + if (h_ == 0) + r = false; // EOF + else + { + // Note: next() may just buffer the data. + // + do + { + // Note that on the first call we may already have some data in the + // input buffer (leftover header data). + // + if (h_ > d_.in) + { + pair p (read (d_.ib + d_.in, h_ - d_.in)); + + d_.in += p.first; + + if (p.second && d_.in != h_) + throw invalid_argument ("incomplete compressed content"); + } + + h_ = d_.next (); // Clears d_.in. + + } while (d_.on == 0 && h_ != 0); + + setg (d_.ob, d_.ob, d_.ob + d_.on); + off_ += d_.on; + r = (d_.on != 0); + } + + // If we don't expect any more compressed content and we were asked to + // end the underlying input stream, then verify there is no more input. 
+ // + if (h_ == 0 && end_) + { + if (d_.in != 0 || + (!is_->eof () && + is_->good () && + is_->peek () != istream::traits_type::eof ())) + throw invalid_argument ("junk after compressed content"); + } + + return r; + } + + // ostream + // + + void ostreambuf:: + write (char* b, std::size_t n) + { + os_->write (b, static_cast (n)); + } + + void ostreambuf:: + open (std::ostream& os, + int level, + int block_id, + optional content_size) + { + assert (os.exceptions () == (std::ostream::badbit | + std::ostream::failbit)); + + os_ = &os; + + // Determine required buffer capacities. + // + c_.begin (level, block_id, content_size); + + ib_.reset ((c_.ib = new char[c_.ic])); + ob_.reset ((c_.ob = new char[c_.oc])); + + setp (c_.ib, c_.ib + c_.ic - 1); // Keep space for overflow's char. + end_ = false; + } + + void ostreambuf:: + close () + { + if (is_open ()) + { + if (!end_) + save (); + + os_ = nullptr; + } + } + + ostreambuf:: + ~ostreambuf () + { + close (); + } + + ostreambuf::int_type ostreambuf:: + overflow (int_type c) + { + int_type r (traits_type::eof ()); + + if (is_open () && c != traits_type::eof ()) + { + // Store last character in the space we reserved in open(). Note + // that pbump() doesn't do any checks. + // + *pptr () = traits_type::to_char_type (c); + pbump (1); + + save (); + r = c; + } + + return r; + } + + void ostreambuf:: + save () + { + c_.in = pptr () - pbase (); + off_ += c_.in; + + // We assume this is the end if the input buffer is not full. + // + end_ = (c_.in != c_.ic); + c_.next (end_); + + if (c_.on != 0) // next() may just buffer the data. + write (c_.ob, c_.on); + + setp (c_.ib, c_.ib + c_.ic - 1); + } + + streamsize ostreambuf:: + xsputn (const char_type* s, streamsize sn) + { + if (!is_open () || end_) + return 0; + + // To avoid further 'signed/unsigned comparison' compiler warnings. 
+ // + size_t n (static_cast (sn)); + + // The plan is to keep copying the data into the input buffer and + // calling save() (our compressor API currently has no way of avoiding + // the copy). + // + while (n != 0) + { + // Amount of free space in the buffer (including the extra byte + // we've reserved). + // + size_t an (epptr () - pptr () + 1); + + size_t m (n > an ? an : n); + memcpy (pptr (), s, m); + pbump (static_cast (m)); + + if (n < an) + break; // All fitted with at least 1 byte left. + + save (); + + s += m; + n -= m; + } + + return sn; + } + } +} diff --git a/libbutl/lz4-stream.hxx b/libbutl/lz4-stream.hxx new file mode 100644 index 0000000..bcf40f1 --- /dev/null +++ b/libbutl/lz4-stream.hxx @@ -0,0 +1,241 @@ +// file : libbutl/lz4-stream.hxx -*- C++ -*- +// license : MIT; see accompanying LICENSE file + +#pragma once + +#include // unique_ptr +#include // size_t +#include // uint64_t +#include // move() +#include +#include +#include + +#include +#include +#include + +#include + +namespace butl +{ + namespace lz4 + { + // istream + // + + class LIBBUTL_SYMEXPORT istreambuf: public bufstreambuf + { + public: + void + open (std::istream&, bool end); + + bool + is_open () const {return is_ != nullptr;} + + void + close (); + + public: + using base = bufstreambuf; + + // basic_streambuf input interface. + // + public: + virtual int_type + underflow () override; + + // Direct access to the get area. Use with caution. + // + using base::gptr; + using base::egptr; + using base::gbump; + + // Return the (logical) position of the next byte to be read. + // + using base::tellg; + + private: + std::pair + read (char*, std::size_t); + + bool + load (); + + private: + std::istream* is_ = nullptr; + bool end_; + decompressor d_; + std::unique_ptr ib_; // Decompressor input buffer. + std::unique_ptr ob_; // Decompressor output buffer. + std::size_t h_; // Decompressor next chunk hint. + }; + + // @@ TODO: doc exceptions. 
+ // @@ TODO: re-opening support (will need decompressor reset). + // + class LIBBUTL_SYMEXPORT istream: public std::istream + { + public: + explicit + istream (iostate e = badbit | failbit) + : std::istream (&buf_) + { + assert (e & badbit); + exceptions (e); + } + + // The underlying input stream is expected to throw on badbit but not + // failbit. If end is true, then on reaching the end of compressed + // data verify there is no more input data. + // + // Note that this implementation does not support handling streams of + // compressed contents (end is false) that may include individual + // contents that uncompress to 0 bytes (see istreambuf::open() + // implementation for details). + // + istream (std::istream& is, bool end, iostate e = badbit | failbit) + : istream (e) + { + open (is, end); + } + + void + open (std::istream& is, bool end) + { + buf_.open (is, end); + } + + bool + is_open () const + { + return buf_.is_open (); + } + + // Signal that no further uncompressed input will be read. + // + void + close () + { + return buf_.close (); + } + + private: + istreambuf buf_; + }; + + // ostream + // + + class LIBBUTL_SYMEXPORT ostreambuf: public bufstreambuf + { + public: + void + open (std::ostream&, + int compression_level, + int block_size_id, + optional content_size); + + bool + is_open () const {return os_ != nullptr;} + + void + close (); + + virtual + ~ostreambuf () override; + + public: + using base = bufstreambuf; + + // basic_streambuf output interface. + // + // Note that syncing the input buffer before the end doesn't make much + // sense (it will just get buffered in the compressor). In fact, it can + // break our single-shot compression arrangement (for compatibility with + // the lz4 utility). Thus we inherit noop sync() from the base. 
+ // + public: + virtual int_type + overflow (int_type) override; + + virtual std::streamsize + xsputn (const char_type*, std::streamsize) override; + + // Return the (logical) position of the next byte to be written. + // + using base::tellp; + + private: + void + write (char*, std::size_t); + + void + save (); + + private: + std::ostream* os_ = nullptr; + bool end_; + compressor c_; + std::unique_ptr ib_; // Compressor input buffer. + std::unique_ptr ob_; // Compressor output buffer. + }; + + // @@ TODO: doc exceptions. + // @@ TODO: re-opening support (will need compressor reset). + // + class LIBBUTL_SYMEXPORT ostream: public std::ostream + { + public: + explicit + ostream (iostate e = badbit | failbit) + : std::ostream (&buf_) + { + assert (e & badbit); + exceptions (e); + } + + // The underlying output stream is expected to throw on badbit or + // failbit. + // + // See compress() for the description of the compression level, block + // size and content size arguments. + // + ostream (std::ostream& os, + int compression_level, + int block_size_id, + optional content_size, + iostate e = badbit | failbit) + : ostream (e) + { + open (os, compression_level, block_size_id, content_size); + } + + void + open (std::ostream& os, + int compression_level, + int block_size_id, + optional content_size) + { + buf_.open (os, compression_level, block_size_id, content_size); + } + + bool + is_open () const + { + return buf_.is_open (); + } + + // Signal that no further uncompressed output will be written. + // + void + close () + { + return buf_.close (); + } + + private: + ostreambuf buf_; + }; + } +} diff --git a/libbutl/lz4.cxx b/libbutl/lz4.cxx index a28ed78..386971a 100644 --- a/libbutl/lz4.cxx +++ b/libbutl/lz4.cxx @@ -28,6 +28,10 @@ #include // eos() +#if 1 +#include +#endif + using namespace std; namespace butl @@ -212,6 +216,7 @@ namespace butl if (LZ4F_isError (on)) throw_exception (on); + in = 0; // All consumed. 
return; } else @@ -239,11 +244,15 @@ namespace butl size_t n; - n = LZ4F_compressUpdate (ctx, ob + on, oc - on, ib, in, nullptr); - if (LZ4F_isError (n)) - throw_exception (n); + if (in != 0) + { + n = LZ4F_compressUpdate (ctx, ob + on, oc - on, ib, in, nullptr); + if (LZ4F_isError (n)) + throw_exception (n); - on += n; + in = 0; // All consumed. + on += n; + } // Write the end marker. // @@ -266,6 +275,21 @@ namespace butl int block_id, optional content_size) { +#if 1 + char buf[1024 * 3 + 7]; + ostream cos (os, level, block_id, content_size); + + for (bool e (false); !e; ) + { + e = eof (is.read (buf, sizeof (buf))); + cos.write (buf, is.gcount ()); + //for (streamsize i (0), n (is.gcount ()); i != n; ++i) + // cos.put (buf[i]); + } + + cos.close (); + return 0; +#else compressor c; // Input/output buffer guards. @@ -314,6 +338,7 @@ namespace butl } return ot; +#endif } // decompression @@ -391,6 +416,7 @@ namespace butl // We expect LZ4F_decompress() to consume what it asked for. // assert (e == in && h <= ic); + in = 0; // All consumed. return h; } @@ -398,13 +424,26 @@ namespace butl uint64_t decompress (ofdstream& os, ifdstream& is) { - decompressor d; - - // Input/output buffer guards. + // Write the specified number of bytes from the output buffer updating + // the total written. // - unique_ptr ibg; - unique_ptr obg; + uint64_t ot (0); + auto write = [&os, &ot] (char* b, size_t n) + { + os.write (b, static_cast (n)); + ot += n; + }; + +#if 1 + char buf[1024 * 3 + 7]; + istream dis (is, true, istream::badbit); + for (bool e (false); !e; ) + { + e = eof (dis.read (buf, sizeof (buf))); + write (buf, static_cast (dis.gcount ())); + } +#else // Read into the specified buffer returning the number of bytes read and // updating the eof flag. // @@ -422,20 +461,27 @@ namespace butl return n; }; - // Write the specified number of bytes from the output buffer updating - // the total written. + decompressor d; + + // Input/output buffer guards. 
// - uint64_t ot (0); - auto write = [&os, &ot] (char* b, size_t n) - { - os.write (b, static_cast (n)); - ot += n; - }; + unique_ptr ibg; + unique_ptr obg; size_t h; // Input hint. // First read in the header and allocate the buffers. // + // What if we hit EOF here? And could begin() return 0? Turns out the + // answer to both questions is yes: 0-byte content compresses to 15 + // bytes (with or without content size; 1-byte -- to 20/28 bytes). We + // can ignore EOF here since an attempt to read more will result in + // another EOF. And code below is prepared to handle 0 initial hint. + // + // @@ We could end up leaving some of the input content from the + // header in the input buffer which the caller will have no way + // of using/detecting. + // d.hn = read (d.hb, sizeof (d.hb)); h = d.begin (); @@ -453,20 +499,22 @@ namespace butl // Keep decompressing, writing, and reading chunks of compressed // content. // - for (;;) + while (h != 0) { h = d.next (); - write (d.ob, d.on); - - if (h == 0) - break; + if (d.on != 0) // next() may just buffer the data. + write (d.ob, d.on); - if (eof) - throw invalid_argument ("incomplete compressed content"); + if (h != 0) + { + if (eof) + throw invalid_argument ("incomplete compressed content"); - d.in = read (d.ib, h); + d.in = read (d.ib, h); + } } +#endif return ot; } diff --git a/libbutl/lz4.hxx b/libbutl/lz4.hxx index 98175c1..3b8bcc4 100644 --- a/libbutl/lz4.hxx +++ b/libbutl/lz4.hxx @@ -120,8 +120,8 @@ namespace butl // This function may throw std::bad_alloc as well as exceptions thrown by // fdstream read/write functions. It may also throw std::invalid_argument // if the compressed content is invalid with what() returning the error - // description. The input stream is expected to throw on badbit (but not - // failbit). The output stream is expected to throw on badbit or failbit. + // description. The input stream is expected to throw on badbit but not + // failbit. 
The output stream is expected to throw on badbit or failbit. // // The input and most likely the output streams must be in the binary // mode. -- cgit v1.1