diff --git a/Makefile.am b/Makefile.am index a348f3f4b..c72ad4d03 100644 --- a/Makefile.am +++ b/Makefile.am @@ -53,6 +53,7 @@ shared_sources = common/FileUtil.cpp \ common/Unit.cpp \ common/UnitHTTP.cpp \ common/Util.cpp \ + net/DelaySocket.cpp \ net/Socket.cpp if ENABLE_SSL shared_sources += net/Ssl.cpp @@ -162,6 +163,7 @@ shared_headers = common/Common.hpp \ common/SigUtil.hpp \ common/security.h \ common/SpookyV2.h \ + net/DelaySocket.hpp \ net/ServerSocket.hpp \ net/Socket.hpp \ net/WebSocketHandler.hpp \ diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp new file mode 100644 index 000000000..52539e7da --- /dev/null +++ b/net/DelaySocket.cpp @@ -0,0 +1,264 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "config.h" + +#include "net/DelaySocket.hpp" + +class Delayer; + +// FIXME: TerminatingPoll ? +static SocketPoll DelayPoll("delay_poll"); + +/// Reads from fd, delays that and then writes to _dest. +class DelaySocket : public Socket { + int _delayMs; + bool _closed; + bool _stopPoll; + bool _waitForWrite; + std::weak_ptr _dest; + + const size_t WindowSize = 64 * 1024; + + struct DelayChunk { + std::chrono::steady_clock::time_point _sendTime; + std::vector _data; + DelayChunk(int delayMs) + { + _sendTime = std::chrono::steady_clock::now() + + std::chrono::milliseconds(delayMs); + } + bool isError() { return _data.size() == 0; } + private: + DelayChunk(); + }; + + size_t _chunksSize; + std::vector> _chunks; +public: + DelaySocket(int delayMs, int fd) : + Socket (fd), _delayMs(delayMs), _closed(false), + _stopPoll(false), _waitForWrite(false), + _chunksSize(0) + { +// setSocketBufferSize(Socket::DefaultSendBufferSize); + } + void setDestination(const std::weak_ptr &dest) + { + _dest = dest; + } + + void dumpState(std::ostream& os) override + { + os << "\tfd: " << getFD() + << "\n\tqueue: " << _chunks.size() << "\n"; + auto now = std::chrono::steady_clock::now(); + for (auto &chunk : _chunks) + { + os << "\t\tin: " << + std::chrono::duration_cast( + chunk->_sendTime - now).count() << "ms - " + << chunk->_data.size() << "bytes\n"; + } + } + + // FIXME - really need to propagate 'noDelay' etc. + // have a debug only lookup of delayed sockets for this case ? + + int getPollEvents(std::chrono::steady_clock::time_point now, + int &timeoutMaxMs) override + { + auto dest = _dest.lock(); + + bool bOtherIsWriteBlocked = !dest || dest->_waitForWrite; + bool bWeAreReadBlocked = _chunksSize >= WindowSize; + + if (_chunks.size() > 0 && (!bOtherIsWriteBlocked || !bWeAreReadBlocked)) + { + int remainingMs = std::chrono::duration_cast( + (*_chunks.begin())->_sendTime - now).count(); + if (remainingMs < timeoutMaxMs) + std::cerr << "#" << getFD() << " reset timeout max to " << remainingMs + << "ms from " << timeoutMaxMs << "ms " + << "owb: " << bOtherIsWriteBlocked << " rb: " + << bWeAreReadBlocked << "\n"; + timeoutMaxMs = std::min(timeoutMaxMs, remainingMs); + } + + if (_stopPoll) + return -1; + + int events = 0; + + if (!bWeAreReadBlocked) + events |= POLLIN; + + // NB. controlled by the other socket. + if (_waitForWrite) + events |= POLLOUT; + + return events; + } + + void pushCloseChunk(bool bErrorSocket) + { + // socket in error state ? don't keep polling it. + _stopPoll |= bErrorSocket; + _chunks.push_back(std::make_shared(_delayMs)); + } + + HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) override + { + auto dest = _dest.lock(); + + if (events & POLLIN) + { + auto chunk = std::make_shared(_delayMs); + + char buf[64 * 1024]; + ssize_t len; + size_t toRead = std::min(sizeof(buf), WindowSize - _chunksSize); + if (_closed) + { // get last data before async close + toRead = sizeof (buf); + } + do { + len = ::read(getFD(), buf, toRead); + } while (len < 0 && errno == EINTR); + + if (len >= 0) + { + std::cerr << "#" << getFD() << " read " << len + << " to queue: " << _chunks.size() << "\n"; + chunk->_data.insert(chunk->_data.end(), &buf[0], &buf[len]); + _chunksSize += len; + _chunks.push_back(chunk); + } + else if (errno != EAGAIN && errno != EWOULDBLOCK) + { + std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n"; + pushCloseChunk(true); + } + } + + if (_closed) + { + std::cerr << "#" << getFD() << " closing\n"; + dumpState(std::cerr); + if (dest) + { + std::cerr << "\t#" << dest->getFD() << " closing linked\n"; + dest->dumpState(std::cerr); + dest->pushCloseChunk(false); + _dest.reset(); + } + return HandleResult::SOCKET_CLOSED; + } + + // Write if we have delayed enough. + if (dest && _chunks.size() > 0) + { + std::shared_ptr chunk = *_chunks.begin(); + if (std::chrono::duration_cast( + now - chunk->_sendTime).count() >= 0) + { + dest->_waitForWrite = false; + + if (chunk->_data.size() == 0) + { // delayed error or close + std::cerr << "#" << getFD() << " handling delayed close with " << _chunksSize << "bytes left\n"; + _closed = true; + return HandleResult::CONTINUE; + } + + ssize_t len; + do { + len = ::write(dest->getFD(), &chunk->_data[0], chunk->_data.size()); + } while (len < 0 && errno == EINTR); + + if (len < 0) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + dest->_waitForWrite = true; + std::cerr << "#" << dest->getFD() << " full - waiting for write ultimately from fd #" << getFD() << "\n"; + } + else + { + std::cerr << "#" << dest->getFD() << " failed onwards write " << len << "bytes of " + << chunk->_data.size() << " ultimately from fd #" << getFD() + << " queue: " << _chunks.size() << " error " << strerror(errno) << "\n"; + dest->pushCloseChunk(false); + } + } + else + { + std::cerr << "#" << dest->getFD() << " written onwards " << len << "bytes of " + << chunk->_data.size() << " ultimately from fd #" << getFD() + << " queue: " << _chunks.size() << "\n"; + if (len > 0) + { + chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len); + _chunksSize -= len; + } + + if (chunk->_data.size() == 0) + _chunks.erase(_chunks.begin(), _chunks.begin() + 1); + } + } + } + + // FIXME: ideally we could avoid polling & delay _closed state etc. + if (events & (POLLERR | POLLHUP | POLLNVAL)) + { + std::cerr << "#" << getFD() << " error events: " << events << "\n"; + pushCloseChunk(true); + } + return HandleResult::CONTINUE; + } +}; + +/// Delayer: +/// +/// Some terminology: +/// physical socket (DelaySocket's own fd) - what we accepted. +/// internalFd - the internal side of the socket-pair +/// delayFd - what we hand on to our un-suspecting wrapped socket +/// which looks like an external socket - but delayed. +namespace Delay { + int create(int delayMs, int physicalFd) + { + int pair[2]; + int rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0, pair); + assert (rc == 0); + int internalFd = pair[0]; + int delayFd = pair[1]; + + auto physical = std::make_shared(delayMs, physicalFd); + auto internal = std::make_shared(delayMs, internalFd); + physical->setDestination(internal); + internal->setDestination(physical); + + DelayPoll.startThread(); + DelayPoll.insertNewSocket(physical); + DelayPoll.insertNewSocket(internal); + + return delayFd; + } + void dumpState(std::ostream &os) + { + if (DelayPoll.isAlive()) + { + os << "Delay poll:\n"; + DelayPoll.dumpState(os); + } + } +} + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/net/DelaySocket.hpp b/net/DelaySocket.hpp new file mode 100644 index 000000000..df423b523 --- /dev/null +++ b/net/DelaySocket.hpp @@ -0,0 +1,27 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#ifndef INCLUDED_DELAY_SOCKET_HPP +#define INCLUDED_DELAY_SOCKET_HPP + +#include + +/// Simulates network latency for local debugging. +/// +/// We are lifecycle managed internally based on the physical / +/// delayFd lifecycle. +namespace Delay +{ + int create(int delayMs, int physicalFd); + void dumpState(std::ostream &os); +}; + +#endif + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/net/Socket.hpp b/net/Socket.hpp index 8c07e38d3..0e616ddf4 100644 --- a/net/Socket.hpp +++ b/net/Socket.hpp @@ -84,7 +84,7 @@ public: virtual HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) = 0; /// manage latency issues around packet aggregation - void setNoDelay() + virtual void setNoDelay() { const int val = 1; setsockopt (_fd, IPPROTO_TCP, TCP_NODELAY, @@ -182,7 +182,7 @@ public: virtual void dumpState(std::ostream&) {} /// Set the thread-id we're bound to - void setThreadOwner(const std::thread::id &id) + virtual void setThreadOwner(const std::thread::id &id) { if (id != _owner) { @@ -518,8 +518,17 @@ private: for (size_t i = 0; i < size; ++i) { - _pollFds[i].fd = _pollSockets[i]->getFD(); - _pollFds[i].events = _pollSockets[i]->getPollEvents(now, timeoutMaxMs); + int events = _pollSockets[i]->getPollEvents(now, timeoutMaxMs); + if (events < 0) // timeout on dead socket + { + _pollFds[i].fd = _wakeup[0]; + _pollFds[i].events = 0; + } + else + { + _pollFds[i].fd = _pollSockets[i]->getFD(); + _pollFds[i].events = events; + } _pollFds[i].revents = 0; } diff --git a/wsd/LOOLWSD.cpp b/wsd/LOOLWSD.cpp index a0529380a..f1bb28ea1 100644 --- a/wsd/LOOLWSD.cpp +++ b/wsd/LOOLWSD.cpp @@ -100,6 +100,7 @@ #if ENABLE_SSL # include "SslSocket.hpp" #endif +#include "DelaySocket.hpp" #include "Storage.hpp" #include "TraceFile.hpp" #include "Unit.hpp" @@ -167,6 +168,9 @@ int MasterPortNumber = DEFAULT_MASTER_PORT_NUMBER; //TODO: Move to a more sensible namespace. static bool DisplayVersion = false; +/// Funky latency simulation basic delay (ms) +static int SimulatedLatencyMs = 150; + // Tracks the set of prisoners / children waiting to be used. static std::mutex NewChildrenMutex; static std::condition_variable NewChildrenCV; @@ -2113,17 +2117,32 @@ private: class PlainSocketFactory : public SocketFactory { - std::shared_ptr create(const int fd) override + std::shared_ptr create(const int physicalFd) override { - return StreamSocket::create(fd, std::unique_ptr{ new ClientRequestDispatcher }); + int fd = physicalFd; + + if (SimulatedLatencyMs > 0) + fd = Delay::create(SimulatedLatencyMs, physicalFd); + + std::shared_ptr socket = + StreamSocket::create( + fd, std::unique_ptr{ + new ClientRequestDispatcher }); + + return socket; } }; #if ENABLE_SSL class SslSocketFactory : public SocketFactory { - std::shared_ptr create(const int fd) override + std::shared_ptr create(const int physicalFd) override { + int fd = physicalFd; + + if (SimulatedLatencyMs > 0) + fd = Delay::create(SimulatedLatencyMs, physicalFd); + return StreamSocket::create(fd, std::unique_ptr{ new ClientRequestDispatcher }); } }; @@ -2133,6 +2152,7 @@ class PrisonerSocketFactory : public SocketFactory { std::shared_ptr create(const int fd) override { + // No local delay. return StreamSocket::create(fd, std::unique_ptr{ new PrisonerRequestDispatcher }); } }; @@ -2207,6 +2227,9 @@ public: os << "Admin poll:\n"; Admin::instance().dumpState(os); + // If we have any delaying work going on. + Delay::dumpState(os); + os << "Document Broker polls " << "[ " << DocBrokers.size() << " ]:\n"; for (auto &i : DocBrokers)