libreoffice-online/net/DelaySocket.cpp
Ashod Nakashian 42a7607489 wsd: avoid static SocketPoll
The lifetime management of static objects
is extremely unpredictable and depends on
many variables outside of our control or
even reliable reproducibility.

Complex static objects that own threads
and other objects are doubly problematic
because of their dependency and/or
interaction with other objects.

Here we replace the static DelayPoll
instance with one we control its lifetime
in the LOOLWSD main body, such that it
is destroyed properly.

Specifically, DelayPoll's dtor  was
accessing Poco's Logging subsystem out of
order. That is, after Poco had been
destroyed.

Another advantage to this approach is that
we don't even create the DelayPoll at all
if we don't need it. The onus now is on
the user of DelayPoll to make sure they
create a Delay object with a long-enough
lifetime to encompase it use.

For completeness, here is the stacktrace:

    terminate called after throwing an instance of 'std::bad_alloc'
      what():  std::bad_alloc

    Program received signal SIGABRT, Aborted.
    __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
    51      ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
    (gdb) bt
    #0  __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
    #1  0x00007ffff613f801 in __GI_abort () at abort.c:79
    #2  0x00007ffff6d51957 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
    #3  0x00007ffff6d57ae6 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
    #4  0x00007ffff6d56b49 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
    #5  0x00007ffff6d574b8 in __gxx_personality_v0 () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
    #6  0x00007ffff671f573 in ?? () from /lib/x86_64-linux-gnu/libgcc_s.so.1
    #7  0x00007ffff671fad1 in _Unwind_RaiseException () from /lib/x86_64-linux-gnu/libgcc_s.so.1
    #8  0x00007ffff6d57d47 in __cxa_throw () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
    #9  0x00007ffff6d582dc in operator new(unsigned long) () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
    #10 0x00005555556b5927 in void std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >::_M_construct<char*>(char*, char*, std::forward_iterator_tag) [clone .isra.41] ()
    #11 0x0000555555982a14 in Poco::Message::Message(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, Poco::Message::Priority) ()
    #12 0x000055555585a909 in SocketPoll::~SocketPoll (this=0x555555d10f60 <DelayPoll>, __in_chrg=<optimized out>) at net/Socket.cpp:145
    #13 0x00007ffff6142041 in __run_exit_handlers (status=0, listp=0x7ffff64ea718 <__exit_funcs>, run_list_atexit=run_list_atexit@entry=true, run_dtors=run_dtors@entry=true) at exit.c:108
    #14 0x00007ffff614213a in __GI_exit (status=<optimized out>) at exit.c:139
    #15 0x0000555555752d78 in LOOLWSD::innerInitialize (this=<optimized out>, self=...) at wsd/LOOLWSD.cpp:1213
    #16 0x0000555555788b07 in LOOLWSD::initialize (this=<optimized out>, self=...) at wsd/LOOLWSD.hpp:432
    #17 0x00005555558dd7e4 in Poco::Util::Application::run() ()
    #18 0x00005555556b6574 in main (argc=2, argv=0x7fffffffe528) at wsd/LOOLWSD.cpp:4276

Change-Id: Ifda55fe869fa6734b9c2490da4497d2551ac21c1
Signed-off-by: Ashod Nakashian <ashod.nakashian@collabora.co.uk>
2021-03-13 16:44:09 -05:00

294 lines
9.8 KiB
C++

/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#include <config.h>
#include <net/DelaySocket.hpp>
#include <memory>
#include <mutex>
#define DELAY_LOG(X) std::cerr << X << '\n';
std::shared_ptr<TerminatingPoll> Delay::DelayPoll;
std::once_flag Delay::DelayPollOnceFlag;
/// Reads from fd, delays that and then writes to _dest.
class DelaySocket : public Socket {
int _delayMs;
enum State { ReadWrite, // normal socket
EofFlushWrites, // finish up writes and close
Closed };
State _state;
std::shared_ptr<DelaySocket> _dest; // our writing twin.
const size_t WindowSize = 64 * 1024;
/// queued up data - sent to us by our opposite twin.
struct WriteChunk {
private:
std::chrono::steady_clock::time_point _sendTime;
std::vector<char> _data;
public:
WriteChunk(int delayMs)
{
_sendTime = std::chrono::steady_clock::now() +
std::chrono::milliseconds(delayMs);
}
bool isError() { return _data.size() == 0; }
std::chrono::steady_clock::time_point getSendTime() const { return _sendTime; }
std::vector<char>& getData() { return _data; }
private:
WriteChunk();
};
std::vector<std::shared_ptr<WriteChunk>> _chunks;
public:
DelaySocket(int delayMs, int fd) :
Socket (fd), _delayMs(delayMs),
_state(ReadWrite)
{
// setSocketBufferSize(Socket::DefaultSendBufferSize);
}
void setDestination(const std::shared_ptr<DelaySocket> &dest)
{
_dest = dest;
}
void dumpState(std::ostream& os) override
{
os << "\tfd: " << getFD()
<< "\n\tqueue: " << _chunks.size() << '\n';
auto now = std::chrono::steady_clock::now();
for (auto &chunk : _chunks)
{
os << "\t\tin: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(chunk->getSendTime() - now)
<< " - " << chunk->getData().size() << "bytes\n";
}
}
// FIXME - really need to propagate 'noDelay' etc.
// have a debug only lookup of delayed sockets for this case ?
int getPollEvents(std::chrono::steady_clock::time_point now,
int64_t &timeoutMaxMicroS) override
{
if (_chunks.size() > 0)
{
int64_t remainingMicroS = std::chrono::duration_cast<std::chrono::microseconds>(
(*_chunks.begin())->getSendTime() - now).count();
if (remainingMicroS < timeoutMaxMicroS)
DELAY_LOG('#' << getFD() << " reset timeout max to " << remainingMicroS
<< "us from " << timeoutMaxMicroS << "us\n");
timeoutMaxMicroS = std::min(timeoutMaxMicroS, remainingMicroS);
}
if (_chunks.size() > 0 &&
now > (*_chunks.begin())->getSendTime())
return POLLIN | POLLOUT;
else
return POLLIN;
}
void pushCloseChunk()
{
_chunks.push_back(std::make_shared<WriteChunk>(_delayMs));
}
void changeState(State newState)
{
switch (newState)
{
case ReadWrite:
assert (false);
break;
case EofFlushWrites:
assert (_state == ReadWrite);
assert (_dest);
_dest->pushCloseChunk();
_dest = nullptr;
break;
case Closed:
if (_dest && _state == ReadWrite)
_dest->pushCloseChunk();
_dest = nullptr;
shutdown();
break;
}
DELAY_LOG('#' << getFD() << " changed to state " << newState << '\n');
_state = newState;
}
void handlePoll(SocketDisposition &disposition,
std::chrono::steady_clock::time_point now, int events) override
{
if (_state == ReadWrite && (events & POLLIN))
{
auto chunk = std::make_shared<WriteChunk>(_delayMs);
char buf[64 * 1024];
ssize_t len;
size_t toRead = sizeof(buf); //std::min(sizeof(buf), WindowSize - _chunksSize);
do {
len = ::read(getFD(), buf, toRead);
} while (len < 0 && errno == EINTR);
if (len == 0) // EOF.
changeState(EofFlushWrites);
else if (len >= 0)
{
DELAY_LOG('#' << getFD() << " read " << len
<< " to queue: " << _chunks.size() << '\n');
chunk->getData().insert(chunk->getData().end(), &buf[0], &buf[len]);
if (_dest)
_dest->_chunks.push_back(chunk);
else
assert("no destination for data" && false);
}
else if (errno != EAGAIN && errno != EWOULDBLOCK)
{
DELAY_LOG('#' << getFD() << " error : " << Util::symbolicErrno(errno) << ": " << strerror(errno) << '\n');
changeState(Closed); // FIXME - propagate the error ?
}
}
if (_chunks.size() == 0)
{
if (_state == EofFlushWrites)
changeState(Closed);
}
else // Write if we have delayed enough.
{
std::shared_ptr<WriteChunk> chunk = *_chunks.begin();
if (std::chrono::duration_cast<std::chrono::milliseconds>(
now - chunk->getSendTime()).count() >= 0)
{
if (chunk->getData().size() == 0)
{ // delayed error or close
DELAY_LOG('#' << getFD() << " handling delayed close\n");
changeState(Closed);
}
else
{
ssize_t len;
do {
len = ::write(getFD(), &chunk->getData()[0], chunk->getData().size());
} while (len < 0 && errno == EINTR);
if (len < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
DELAY_LOG('#' << getFD() << " full - waiting for write\n");
}
else
{
DELAY_LOG('#' << getFD() << " failed onwards write "
<< len << "bytes of "
<< chunk->getData().size()
<< " queue: " << _chunks.size() << " error: "
<< Util::symbolicErrno(errno) << ": " << strerror(errno) << '\n');
changeState(Closed);
}
}
else
{
DELAY_LOG('#' << getFD() << " written onwards " << len << "bytes of "
<< chunk->getData().size()
<< " queue: " << _chunks.size() << '\n');
if (len > 0)
chunk->getData().erase(chunk->getData().begin(), chunk->getData().begin() + len);
if (chunk->getData().size() == 0)
_chunks.erase(_chunks.begin(), _chunks.begin() + 1);
}
}
}
}
if (events & (POLLERR | POLLHUP | POLLNVAL))
{
DELAY_LOG('#' << getFD() << " error events: " << events << '\n');
changeState(Closed);
}
if (_state == Closed)
disposition.setClosed();
}
};
/// Delayer:
///
/// Some terminology:
/// physical socket (DelaySocket's own fd) - what we accepted.
/// internalFd - the internal side of the socket-pair
/// delayFd - what we hand on to our un-suspecting wrapped socket
/// which looks like an external socket - but delayed.
Delay::Delay(std::size_t latencyMs)
{
if (latencyMs)
{
// This will be called exactly once, and all
// competing threads (if more than one) will
// block until it returns.
std::call_once(DelayPollOnceFlag, []() {
DelayPoll = std::make_shared<TerminatingPoll>("delay_poll");
DelayPoll->startThread(); // Start the thread.
});
}
}
Delay::~Delay() { DelayPoll.reset(); }
int Delay::create(int delayMs, int physicalFd)
{
auto delayPoll = DelayPoll;
if (delayPoll && delayPoll->isAlive())
{
int pair[2];
int rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0, pair);
assert(rc == 0);
(void)rc;
int internalFd = pair[0];
int delayFd = pair[1];
auto physical = std::make_shared<DelaySocket>(delayMs, physicalFd);
auto internal = std::make_shared<DelaySocket>(delayMs, internalFd);
physical->setDestination(internal);
internal->setDestination(physical);
delayPoll->insertNewSocket(physical);
delayPoll->insertNewSocket(internal);
LOG_TRC("Created DelaySockets with physicalFd: " << physicalFd
<< " and internalFd: " << internalFd);
return delayFd;
}
else
{
LOG_ERR("Failed to create DelaySockets for physicalFd: " << physicalFd
<< ". No DelayPoll exists.");
}
return -1;
}
void Delay::dumpState(std::ostream& os)
{
auto delayPoll = DelayPoll;
if (delayPoll && delayPoll->isAlive())
{
os << "Delay poll:\n";
delayPoll->dumpState(os);
}
else
{
os << "Delay poll: doesn't exist.\n";
}
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */