From 31726418d0f8f77040aaa0f9f5d0c5bb36105b6f Mon Sep 17 00:00:00 2001 From: Karen Arutyunov Date: Fri, 7 Oct 2016 16:08:22 +0300 Subject: Support ifdstream non-blocking mode --- butl/fdstream | 34 +++++++-- butl/fdstream.cxx | 172 +++++++++++++++++++++++++++++++++++++++------- tests/fdstream/driver.cxx | 93 +++++++++++++++++++++++-- 3 files changed, 266 insertions(+), 33 deletions(-) diff --git a/butl/fdstream b/butl/fdstream index 49b3d4d..2d7fae5 100644 --- a/butl/fdstream +++ b/butl/fdstream @@ -34,11 +34,15 @@ namespace butl // - char only // - input or output but not both // - no support for put back + // - non-blocking file descriptor is supported only by showmanyc() function + // and only on POSIX // - throws ios::failure in case of open()/read()/write()/close() errors // - exception mask has at least badbit // - after catching an exception caused by badbit the stream is no longer // used // - not movable, though can be easily supported + // - passing to constructor -1 file descriptor is valid and results in the + // creation of an unopened object // class LIBBUTL_EXPORT fdbuf: public std::basic_streambuf { @@ -46,7 +50,7 @@ namespace butl virtual ~fdbuf (); fdbuf () = default; - fdbuf (int fd) {open (fd);} + fdbuf (int fd) {if (fd != -1) open (fd);} fdbuf (const fdbuf&) = delete; fdbuf& operator= (const fdbuf&) = delete; @@ -60,6 +64,9 @@ namespace butl bool is_open () const {return fd_ != -1;} + int + fd () const {return fd_;} + public: using int_type = std::basic_streambuf::int_type; using traits_type = std::basic_streambuf::traits_type; @@ -96,6 +103,7 @@ namespace butl private: int fd_ = -1; char buf_[8192]; + bool non_blocking_ = false; }; // File stream mode. @@ -109,11 +117,21 @@ namespace butl // you may want not to "offend" the other end by closing your end before // reading all the data. // + // The blocking/non_blocking flags determine whether the IO operation should + // block or return control if currently there is no data to read or no room + // to write. Only the istream::readsome() function supports the semantics of + // non-blocking operations. We also only support this on POSIX (Windows does + // not provide means for the non-blocking reading from a file descriptor so + // these flags are noop there). IO stream operations other than readsome() + // are illegal for non_blocking mode and result in the badbit being set. + // enum class fdstream_mode: std::uint16_t { - text = 0x01, - binary = 0x02, - skip = 0x04 + text = 0x01, + binary = 0x02, + skip = 0x04, + blocking = 0x08, + non_blocking = 0x10 }; inline fdstream_mode operator& (fdstream_mode, fdstream_mode); @@ -149,6 +167,10 @@ namespace butl fdstream_base (int fd): buf_ (fd) {} fdstream_base (int, fdstream_mode); + public: + int + fd () const {return buf_.fd ();} + protected: fdbuf buf_; }; @@ -243,7 +265,7 @@ namespace butl // // throw failed (); // - class LIBBUTL_EXPORT ifdstream: fdstream_base, public std::istream + class LIBBUTL_EXPORT ifdstream: public fdstream_base, public std::istream { public: // Create an unopened object with iostate = badbit | failbit (we cannot @@ -323,7 +345,7 @@ namespace butl // (std::uncaught_exception() == true). This is enforced with assert() in // the ofdstream destructor. // - class LIBBUTL_EXPORT ofdstream: fdstream_base, public std::ostream + class LIBBUTL_EXPORT ofdstream: public fdstream_base, public std::ostream { public: // Create an unopened object with iostate = badbit | failbit (we cannot diff --git a/butl/fdstream.cxx b/butl/fdstream.cxx index ac5aa79..8e6f6c9 100644 --- a/butl/fdstream.cxx +++ b/butl/fdstream.cxx @@ -5,8 +5,9 @@ #include #ifndef _WIN32 -# include // open(), O_* -# include // close(), read(), write(), lseek(), ssize_t +# include // open(), O_*, fcntl() +# include // close(), read(), write(), lseek(), ssize_t, + // STD*_FILENO # include // writev(), iovec # include // S_I* # include // off_t @@ -72,6 +73,16 @@ namespace butl open (int fd) { close (); + +#ifndef _WIN32 + int flags (fcntl (fd, F_GETFL)); + + if (flags == -1) + throw_ios_failure (errno); + + non_blocking_ = (flags & O_NONBLOCK) == O_NONBLOCK; +#endif + fd_ = fd; setg (buf_, buf_, buf_); setp (buf_, buf_ + sizeof (buf_) - 1); // Keep space for overflow's char. @@ -92,7 +103,36 @@ namespace butl streamsize fdbuf:: showmanyc () { - return is_open () ? static_cast (egptr () - gptr ()) : -1; + if (!is_open ()) + return -1; + + streamsize n (egptr () - gptr ()); + + if (n > 0) + return n; + +#ifndef _WIN32 + if (non_blocking_) + { + ssize_t n (read (fd_, buf_, sizeof (buf_))); + + if (n == -1) + { + if (errno == EAGAIN || errno == EINTR) + return 0; + + throw_ios_failure (errno); + } + + if (n == 0) // EOF. + return -1; + + setg (buf_, buf_, buf_ + n); + return n; + } +#endif + + return 0; } fdbuf::int_type fdbuf:: @@ -102,6 +142,14 @@ namespace butl if (is_open ()) { + // The underflow() function interface doesn't support the non-blocking + // semantics as it must return either the next character or EOF. In the + // future we may implement the blocking behavior for a non-blocking file + // descriptor. + // + if (non_blocking_) + throw_ios_failure (ENOTSUP); + if (gptr () < egptr () || load ()) r = traits_type::to_int_type (*gptr ()); } @@ -112,6 +160,10 @@ namespace butl bool fdbuf:: load () { + // Doesn't handle blocking mode and so should not be called. + // + assert (!non_blocking_); + #ifndef _WIN32 ssize_t n (read (fd_, buf_, sizeof (buf_))); #else @@ -132,6 +184,14 @@ namespace butl if (is_open () && c != traits_type::eof ()) { + // The overflow() function interface doesn't support the non-blocking + // semantics since being unable to serialize the character is supposed + // to be an error. In the future we may implement the blocking behavior + // for a non-blocking file descriptor. + // + if (non_blocking_) + throw_ios_failure (ENOTSUP); + // Store last character in the space we reserved in open(). Note // that pbump() doesn't do any checks. // @@ -148,7 +208,18 @@ namespace butl int fdbuf:: sync () { - return is_open () && save () ? 0 : -1; + if (!is_open ()) + return -1; + + // The sync() function interface doesn't support the non-blocking + // semantics since it should either completely sync the data or fail. In + // the future we may implement the blocking behavior for a non-blocking + // file descriptor. + // + if (non_blocking_) + throw_ios_failure (ENOTSUP); + + return save () ? 0 : -1; } bool fdbuf:: @@ -183,6 +254,16 @@ namespace butl streamsize fdbuf:: xsputn (const char_type* s, streamsize sn) { + // The xsputn() function interface doesn't support the non-blocking + // semantics since the only excuse not to fully serialize the data is + // encountering EOF (the default behaviour is defined as a sequence of + // sputc() calls which stops when either sn characters are written or a + // call would have returned EOF). In the future we may implement the + // blocking behavior for a non-blocking file descriptor. + // + if (non_blocking_) + throw_ios_failure (ENOTSUP); + // To avoid futher 'signed/unsigned comparison' compiler warnings. // size_t n (static_cast (sn)); @@ -298,19 +379,31 @@ namespace butl #endif } + inline static bool + flag (fdstream_mode m, fdstream_mode flag) + { + return (m & flag) == flag; + } + + inline static int + mode (int fd, fdstream_mode m) + { + if (fd != -1 && + (flag (m, fdstream_mode::text) || + flag (m, fdstream_mode::binary) || + flag (m, fdstream_mode::blocking) || + flag (m, fdstream_mode::non_blocking))) + fdmode (fd, m); + + return fd; + } + // fdstream_base // fdstream_base:: fdstream_base (int fd, fdstream_mode m) - : fdstream_base (fd) // Delegate. + : fdstream_base (mode (fd, m)) // Delegate. { - // Note that here we rely on fdstream_base() (and fdbuf() which it calls) - // to not read from the file. - // - if (fd != -1 && - ((m & fdstream_mode::text) == fdstream_mode::text || - (m & fdstream_mode::binary) == fdstream_mode::binary)) - fdmode (fd, m); } static fdopen_mode @@ -631,27 +724,54 @@ namespace butl } fdstream_mode - fdmode (int, fdstream_mode) + fdmode (int fd, fdstream_mode m) { - return fdstream_mode::binary; + int flags (fcntl (fd, F_GETFL)); + + if (flags == -1) + throw_ios_failure (errno); + + if (flag (m, fdstream_mode::blocking) || + flag (m, fdstream_mode::non_blocking)) + { + m &= fdstream_mode::blocking | fdstream_mode::non_blocking; + + // Should be exactly one blocking mode flag specified. + // + if (m != fdstream_mode::blocking && m != fdstream_mode::non_blocking) + throw invalid_argument ("invalid blocking mode"); + + int new_flags ( + m == fdstream_mode::non_blocking + ? flags | O_NONBLOCK + : flags & ~O_NONBLOCK); + + if (fcntl (fd, F_SETFL, new_flags) == -1) + throw_ios_failure (errno); + } + + return fdstream_mode::binary | + ((flags & O_NONBLOCK) == O_NONBLOCK + ? fdstream_mode::non_blocking + : fdstream_mode::blocking); } fdstream_mode - stdin_fdmode (fdstream_mode) + stdin_fdmode (fdstream_mode m) { - return fdstream_mode::binary; + return fdmode (STDIN_FILENO, m); } fdstream_mode - stdout_fdmode (fdstream_mode) + stdout_fdmode (fdstream_mode m) { - return fdstream_mode::binary; + return fdmode (STDOUT_FILENO, m); } fdstream_mode - stderr_fdmode (fdstream_mode) + stderr_fdmode (fdstream_mode m) { - return fdstream_mode::binary; + return fdmode (STDERR_FILENO, m); } #else @@ -704,6 +824,11 @@ namespace butl // Should be exactly one translation flag specified. // + // It would have been natural not to change translation mode if none of + // text or binary flags are passed. Unfortunatelly there is no (easy) way + // to obtain the current mode for the file descriptor without setting a + // new one. This is why not specifying one of the modes is an error. + // if (m != fdstream_mode::binary && m != fdstream_mode::text) throw invalid_argument ("invalid translation mode"); @@ -711,9 +836,10 @@ namespace butl if (r == -1) throw_ios_failure (errno); - return (r & _O_BINARY) == _O_BINARY - ? fdstream_mode::binary - : fdstream_mode::text; + return fdstream_mode::blocking | + ((r & _O_BINARY) == _O_BINARY + ? fdstream_mode::binary + : fdstream_mode::text); } fdstream_mode diff --git a/tests/fdstream/driver.cxx b/tests/fdstream/driver.cxx index 2e3bac5..f77e2d5 100644 --- a/tests/fdstream/driver.cxx +++ b/tests/fdstream/driver.cxx @@ -2,6 +2,11 @@ // copyright : Copyright (c) 2014-2016 Code Synthesis Ltd // license : MIT; see accompanying LICENSE file +#ifndef _WIN32 +# include +# include // this_thread::sleep_for() +#endif + #include #include #include @@ -13,6 +18,7 @@ #include #include +#include #include #include #include @@ -90,13 +96,55 @@ read_time (const path& p, const T& s, size_t n) int main (int argc, const char* argv[]) { - if (!(argc == 1 || (argc == 2 && argv[1] == string ("-v")))) + bool v (false); + bool child (false); + + int i (1); + for (; i != argc; ++i) { - cerr << "usage: " << argv[0] << " [-v]" << endl; - return 1; + string a (argv[i]); + if (a == "-c") + child = true; + else if (a == "-v") + v = true; + else + { + cerr << "usage: " << argv[0] << " [-v] [-c]" << endl; + return 1; + } + } + + // To test non-blocking reading from ifdstream the test program launches + // itself as a child process with -c option and roundtrips a string through + // it. The child must write the string in chunks with some delays to make + // sure the parent reads in chunks as well. + // + if (child) + { + cin.exceptions (ios_base::badbit); + cout.exceptions (ios_base::failbit | ios_base::badbit | ios_base::eofbit); + + string s; + getline (cin, s, '\0'); + + size_t n (10); + for (size_t i (0); i < s.size (); i += n) + { + cout.write (s.c_str () + i, min (n, s.size () - i)); + cout.flush (); + + // @@ MINGW GCC 4.9 doesn't implement this_thread. If ifdstream + // non-blocking read will ever be implemented use Win32 Sleep() + // instead. + // +#ifndef _WIN32 + this_thread::sleep_for (chrono::milliseconds (50)); +#endif + } + + return 0; } - bool v (argc == 2); dir_path td (dir_path::temp_directory () / dir_path ("butl-fdstream")); // Recreate the temporary directory (that possibly exists from the previous @@ -351,6 +399,43 @@ main (int argc, const char* argv[]) { } + // Test non-blocking reading. + // + try + { + const char* args[] = {argv[0], "-c", nullptr}; + process pr (args, -1, -1); + + ofdstream os (pr.out_fd); + ifdstream is (pr.in_ofd, fdstream_mode::non_blocking); + + const string s ( + "0123456789\nABCDEFGHIJKLMNOPQRSTUVWXYZ\nabcdefghijklmnopqrstuvwxyz"); + + os << s; + os.close (); + + string r; + char buf[3]; + while (!is.eof ()) + { + streamsize n (is.readsome (buf, sizeof (buf))); + r.append (buf, n); + } + + is.close (); + + assert (r == s); + } + catch (const ios::failure&) + { + assert (false); + } + catch (const process_error&) + { + assert (false); + } + #else // Check translation modes. -- cgit v1.1