Ben's method works, but it performs poorly in real-world applications. With his method there is no in-memory buffer at all, so you are essentially writing the output one character at a time.
To achieve the goal, you must create two types of classes.
A filebuf class derived from std::streambuf with the virtual methods sync(), overflow(), xsputn(), seekoff(), and seekpos() overridden. It also needs to have appropriate buffering.
A stream class derived from std::basic_ostream. It should have your customized filebuf as a private member, for example buf, and call std::basic_ios<CharT,Traits>::init in its constructor, like this->init(&buf). You DON'T need to define any other methods because std::basic_ostream will handle them for you. Once the initialization step is finished, this->rdbuf() will return &buf.
The underlying call stack is like the following
basic_ostream& operator<<:
rdbuf()->sputn, which will call rdbuf()->xsputn
basic_ostream& put( char_type ch )
rdbuf()->sputc, which will call rdbuf()->overflow when the put area is full
basic_ostream& write( const char_type* s, std::streamsize count)
rdbuf()->sputn, which will call rdbuf()->xsputn
pos_type tellp()
rdbuf()->pubseekoff, which will call rdbuf()->seekoff
basic_ostream& seekp( off_type off, std::ios_base::seekdir dir )
rdbuf()->pubseekoff, which will call rdbuf()->seekoff (the seekp( pos_type pos ) overload calls rdbuf()->pubseekpos, which will call rdbuf()->seekpos)
basic_ostream& flush()
rdbuf()->pubsync, which will call rdbuf()->sync
Here is an example; the full example is at https://github.com/luohancfd/mpistream
In most scenarios, you only need to change the functions open_file, close_file, seek_pos, tell_pos, and write_data. By tweaking buf_size, you can get a significant performance improvement.
// GPL v3.0 class MPI_filebuf : public std::streambuf { public: using Base = std::streambuf; using char_type = typename Base::char_type; using int_type = typename Base::int_type; using pos_type = typename Base::pos_type; using off_type = typename Base::off_type; private: static const std::streamsize buf_size = BUFSIZ; char buffer_[buf_size]; MPI_File fhw; bool opened; /** * @brief Always save one extra space in buffer_ * for overflow */ inline void reset_ptr() { setp(buffer_, buffer_ + buf_size - 1); } protected: /** * @brief For output streams, this typically results in writing the contents * of the put area into the associated sequence, i.e. flushing of the output * buffer. * * @return int Returns 0 on success, -1 otherwise. The base class * version returns 0. */ inline int sync() override { int ret = 0; if (pbase() < pptr()) { const int_type tmp = overflow(); if (traits_type::eq_int_type(tmp, traits_type::eof())) { ret = -1; } } return ret; } /** * @brief Write overflowed chars to file, derived from std::streambuf * It's user's responsibility to maintain correct sequence of * output as we are using shared file pointer * * @param ch * @return int_type Returns unspecified value not equal to Traits::eof() on * success, Traits::eof() on failure. */ inline int_type overflow(int_type ch = traits_type::eof()) override { // https://github.com/gcc-mirror/gcc/blob/master/libstdc%2B%2B-v3/include/bits/fstream.tcc int_type ret = traits_type::eof(); const bool testeof = traits_type::eq_int_type(ch, ret); if (pptr() == nullptr) { reset_ptr(); if (!testeof) { ret = sputc(ch); } } else { if (!testeof) { *pptr() = traits_type::to_char_type(ch); pbump(1); } if (write(pbase(), pptr() - pbase())) { ret = traits_type::not_eof(ch); } reset_ptr(); } return ret; } /** * @brief Writes \c count characters to the output sequence from the character * array whose first element is pointed to by \c s . 
Overwrite this function * to achieve no_buffered I/O * * @param s * @param n * @return std::streamsize */ inline std::streamsize xsputn(const char_type *s, std::streamsize n) override { std::streamsize bufavail = epptr() - pptr(); std::streamsize ret = n; // fill buffer up to "buf_size" std::streamsize nfill = std::min(n, bufavail + 1); std::copy(s, s + nfill, pptr()); pbump(nfill); // if nfill == bufavail+1, pptr() == epptr() if (nfill == bufavail + 1) { // equiv: bufavail + 1<= n if (!write(pbase(), pptr() - pbase())) { ret = -1; } else { reset_ptr(); s += nfill; n -= nfill; /* repeatedly write every chunk until there is less data than buf_size - 1 */ while (n >= buf_size - 1) { write(s, buf_size); s += buf_size; n -= buf_size; } std::copy(s, s + n, pptr()); pbump(n); } } return ret; } /** * @brief Sets the position indicator of the input and/or output * sequence relative to some other position. It will flush * the internal buffer to the file * @note This function is collective, which means seekp(), tellp() * need to be called by all processors * * @param off relative position to set the position indicator to. * @param dir defines base position to apply the relative offset to. * It can be one of the following constants: beg, cur, end * @param which * @return pos_type The resulting absolute position as defined by the position * indicator. 
*/ inline pos_type seekoff(off_type off, std::ios_base::seekdir dir, __attribute__((__unused__)) std::ios_base::openmode which = std::ios_base::out) override { int ret = pos_type(off_type(-1)); if (is_open()) { int whence; if (dir == std::ios_base::beg) whence = MPI_SEEK_SET; else if (dir == std::ios_base::cur) whence = MPI_SEEK_CUR; else whence = MPI_SEEK_END; sync(); /*!< write data to the file */ if (off != off_type(0) || whence != SEEK_CUR) { if (MPI_File_seek_shared(fhw, off, whence)) { // fail to seek return ret; } } MPI_Offset tmp; MPI_File_get_position_shared(fhw, &tmp); ret = pos_type(tmp); } return ret; } inline pos_type seekpos(pos_type pos, __attribute__((__unused__)) std::ios_base::openmode which = std::ios_base::out) override { return seekoff(off_type(pos), std::ios_base::beg); } /** * @brief Method doing the real writing. It moves the data in the * internal buffer to the file * * @param pbeg * @param nch * @return true Succeed to write * @return false Fail to write */ inline bool write(const char_type *pbeg, std::streamsize nch) { return nch == 0 || !MPI_File_write_shared(fhw, pbeg, nch, MPI_CHAR, MPI_STATUS_IGNORE); } public: MPI_filebuf() : buffer_{}, opened(false) { setp(buffer_, buffer_ + buf_size - 1); } virtual ~MPI_filebuf() override { if (opened) close(); }; /** * @brief return nullptr if fail * * @param file_name * @return MPI_filebuf* */ MPI_filebuf *open(const char file_name[]); inline bool is_open() const { return opened; } MPI_filebuf *close() { sync(); return MPI_File_close(&fhw) ? 
nullptr : this; } }; /* ---------------------------------------------------------------------- */ class mpistream : public std::basic_ostream<char> { public: // Types using Base = std::basic_ostream<char>; using int_type = typename Base::int_type; using char_type = typename Base::char_type; using pos_type = typename Base::pos_type; using off_type = typename Base::off_type; using traits_type = typename Base::traits_type; // Non-standard types: using filebuf_type = MPI_filebuf; using ostream_type = Base; private: filebuf_type filebuf; public: mpistream() : ostream_type(), filebuf() { this->init(&filebuf); } mpistream(const char file_name[]) : ostream_type(), filebuf() { this->init(&filebuf); open(file_name); } mpistream(const mpistream &) = delete; mpistream(mpistream &&__rhs) : ostream_type(std::move(__rhs)), filebuf(std::move(__rhs.filebuf)) { ostream_type::set_rdbuf(&filebuf); } ~mpistream() {} inline void open(const char file_name[]) { if (filebuf.open(file_name) == nullptr) { this->setstate(std::ios_base::failbit); } else { this->clear(); } } inline bool is_open() const { return filebuf.is_open(); } inline void close() { if (!filebuf.close()) { this->setstate(ios_base::failbit); } } };