Initial DelaySocket goodness.

This commit is contained in:
Michael Meeks 2017-03-23 17:14:51 +00:00
parent 1e0deae3df
commit 1c7f94045a
5 changed files with 332 additions and 7 deletions

View file

@ -53,6 +53,7 @@ shared_sources = common/FileUtil.cpp \
common/Unit.cpp \
common/UnitHTTP.cpp \
common/Util.cpp \
net/DelaySocket.cpp \
net/Socket.cpp
if ENABLE_SSL
shared_sources += net/Ssl.cpp
@ -162,6 +163,7 @@ shared_headers = common/Common.hpp \
common/SigUtil.hpp \
common/security.h \
common/SpookyV2.h \
net/DelaySocket.hpp \
net/ServerSocket.hpp \
net/Socket.hpp \
net/WebSocketHandler.hpp \

264
net/DelaySocket.cpp Normal file
View file

@ -0,0 +1,264 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
/*
* This file is part of the LibreOffice project.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#include "config.h"
#include "net/DelaySocket.hpp"
class Delayer;
// FIXME: TerminatingPoll ?
static SocketPoll DelayPoll("delay_poll");
/// Reads from fd, delays that and then writes to _dest.
class DelaySocket : public Socket {
int _delayMs;
bool _closed;
bool _stopPoll;
bool _waitForWrite;
std::weak_ptr<DelaySocket> _dest;
const size_t WindowSize = 64 * 1024;
struct DelayChunk {
std::chrono::steady_clock::time_point _sendTime;
std::vector<char> _data;
DelayChunk(int delayMs)
{
_sendTime = std::chrono::steady_clock::now() +
std::chrono::milliseconds(delayMs);
}
bool isError() { return _data.size() == 0; }
private:
DelayChunk();
};
size_t _chunksSize;
std::vector<std::shared_ptr<DelayChunk>> _chunks;
public:
DelaySocket(int delayMs, int fd) :
Socket (fd), _delayMs(delayMs), _closed(false),
_stopPoll(false), _waitForWrite(false),
_chunksSize(0)
{
// setSocketBufferSize(Socket::DefaultSendBufferSize);
}
void setDestination(const std::weak_ptr<DelaySocket> &dest)
{
_dest = dest;
}
void dumpState(std::ostream& os) override
{
os << "\tfd: " << getFD()
<< "\n\tqueue: " << _chunks.size() << "\n";
auto now = std::chrono::steady_clock::now();
for (auto &chunk : _chunks)
{
os << "\t\tin: " <<
std::chrono::duration_cast<std::chrono::milliseconds>(
chunk->_sendTime - now).count() << "ms - "
<< chunk->_data.size() << "bytes\n";
}
}
// FIXME - really need to propagate 'noDelay' etc.
// have a debug only lookup of delayed sockets for this case ?
int getPollEvents(std::chrono::steady_clock::time_point now,
int &timeoutMaxMs) override
{
auto dest = _dest.lock();
bool bOtherIsWriteBlocked = !dest || dest->_waitForWrite;
bool bWeAreReadBlocked = _chunksSize >= WindowSize;
if (_chunks.size() > 0 && (!bOtherIsWriteBlocked || !bWeAreReadBlocked))
{
int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>(
(*_chunks.begin())->_sendTime - now).count();
if (remainingMs < timeoutMaxMs)
std::cerr << "#" << getFD() << " reset timeout max to " << remainingMs
<< "ms from " << timeoutMaxMs << "ms "
<< "owb: " << bOtherIsWriteBlocked << " rb: "
<< bWeAreReadBlocked << "\n";
timeoutMaxMs = std::min(timeoutMaxMs, remainingMs);
}
if (_stopPoll)
return -1;
int events = 0;
if (!bWeAreReadBlocked)
events |= POLLIN;
// NB. controlled by the other socket.
if (_waitForWrite)
events |= POLLOUT;
return events;
}
void pushCloseChunk(bool bErrorSocket)
{
// socket in error state ? don't keep polling it.
_stopPoll |= bErrorSocket;
_chunks.push_back(std::make_shared<DelayChunk>(_delayMs));
}
HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) override
{
auto dest = _dest.lock();
if (events & POLLIN)
{
auto chunk = std::make_shared<DelayChunk>(_delayMs);
char buf[64 * 1024];
ssize_t len;
size_t toRead = std::min(sizeof(buf), WindowSize - _chunksSize);
if (_closed)
{ // get last data before async close
toRead = sizeof (buf);
}
do {
len = ::read(getFD(), buf, toRead);
} while (len < 0 && errno == EINTR);
if (len >= 0)
{
std::cerr << "#" << getFD() << " read " << len
<< " to queue: " << _chunks.size() << "\n";
chunk->_data.insert(chunk->_data.end(), &buf[0], &buf[len]);
_chunksSize += len;
_chunks.push_back(chunk);
}
else if (errno != EAGAIN && errno != EWOULDBLOCK)
{
std::cerr << "#" << getFD() << " error : " << errno << " " << strerror(errno) << "\n";
pushCloseChunk(true);
}
}
if (_closed)
{
std::cerr << "#" << getFD() << " closing\n";
dumpState(std::cerr);
if (dest)
{
std::cerr << "\t#" << dest->getFD() << " closing linked\n";
dest->dumpState(std::cerr);
dest->pushCloseChunk(false);
_dest.reset();
}
return HandleResult::SOCKET_CLOSED;
}
// Write if we have delayed enough.
if (dest && _chunks.size() > 0)
{
std::shared_ptr<DelayChunk> chunk = *_chunks.begin();
if (std::chrono::duration_cast<std::chrono::milliseconds>(
now - chunk->_sendTime).count() >= 0)
{
dest->_waitForWrite = false;
if (chunk->_data.size() == 0)
{ // delayed error or close
std::cerr << "#" << getFD() << " handling delayed close with " << _chunksSize << "bytes left\n";
_closed = true;
return HandleResult::CONTINUE;
}
ssize_t len;
do {
len = ::write(dest->getFD(), &chunk->_data[0], chunk->_data.size());
} while (len < 0 && errno == EINTR);
if (len < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
dest->_waitForWrite = true;
std::cerr << "#" << dest->getFD() << " full - waiting for write ultimately from fd #" << getFD() << "\n";
}
else
{
std::cerr << "#" << dest->getFD() << " failed onwards write " << len << "bytes of "
<< chunk->_data.size() << " ultimately from fd #" << getFD()
<< " queue: " << _chunks.size() << " error " << strerror(errno) << "\n";
dest->pushCloseChunk(false);
}
}
else
{
std::cerr << "#" << dest->getFD() << " written onwards " << len << "bytes of "
<< chunk->_data.size() << " ultimately from fd #" << getFD()
<< " queue: " << _chunks.size() << "\n";
if (len > 0)
{
chunk->_data.erase(chunk->_data.begin(), chunk->_data.begin() + len);
_chunksSize -= len;
}
if (chunk->_data.size() == 0)
_chunks.erase(_chunks.begin(), _chunks.begin() + 1);
}
}
}
// FIXME: ideally we could avoid polling & delay _closed state etc.
if (events & (POLLERR | POLLHUP | POLLNVAL))
{
std::cerr << "#" << getFD() << " error events: " << events << "\n";
pushCloseChunk(true);
}
return HandleResult::CONTINUE;
}
};
/// Delayer:
///
/// Some terminology:
/// physical socket (DelaySocket's own fd) - what we accepted.
/// internalFd - the internal side of the socket-pair
/// delayFd - what we hand on to our un-suspecting wrapped socket
/// which looks like an external socket - but delayed.
namespace Delay {
int create(int delayMs, int physicalFd)
{
int pair[2];
int rc = socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0, pair);
assert (rc == 0);
int internalFd = pair[0];
int delayFd = pair[1];
auto physical = std::make_shared<DelaySocket>(delayMs, physicalFd);
auto internal = std::make_shared<DelaySocket>(delayMs, internalFd);
physical->setDestination(internal);
internal->setDestination(physical);
DelayPoll.startThread();
DelayPoll.insertNewSocket(physical);
DelayPoll.insertNewSocket(internal);
return delayFd;
}
void dumpState(std::ostream &os)
{
if (DelayPoll.isAlive())
{
os << "Delay poll:\n";
DelayPoll.dumpState(os);
}
}
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */

27
net/DelaySocket.hpp Normal file
View file

@ -0,0 +1,27 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
/*
* This file is part of the LibreOffice project.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#ifndef INCLUDED_DELAY_SOCKET_HPP
#define INCLUDED_DELAY_SOCKET_HPP
#include <Socket.hpp>
/// Simulates network latency for local debugging.
///
/// We are lifecycle managed internally based on the physical /
/// delayFd lifecycle.
namespace Delay
{
int create(int delayMs, int physicalFd);
void dumpState(std::ostream &os);
};
#endif
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */

View file

@ -84,7 +84,7 @@ public:
virtual HandleResult handlePoll(std::chrono::steady_clock::time_point now, int events) = 0;
/// manage latency issues around packet aggregation
void setNoDelay()
virtual void setNoDelay()
{
const int val = 1;
setsockopt (_fd, IPPROTO_TCP, TCP_NODELAY,
@ -182,7 +182,7 @@ public:
virtual void dumpState(std::ostream&) {}
/// Set the thread-id we're bound to
void setThreadOwner(const std::thread::id &id)
virtual void setThreadOwner(const std::thread::id &id)
{
if (id != _owner)
{
@ -518,8 +518,17 @@ private:
for (size_t i = 0; i < size; ++i)
{
_pollFds[i].fd = _pollSockets[i]->getFD();
_pollFds[i].events = _pollSockets[i]->getPollEvents(now, timeoutMaxMs);
int events = _pollSockets[i]->getPollEvents(now, timeoutMaxMs);
if (events < 0) // timeout on dead socket
{
_pollFds[i].fd = _wakeup[0];
_pollFds[i].events = 0;
}
else
{
_pollFds[i].fd = _pollSockets[i]->getFD();
_pollFds[i].events = events;
}
_pollFds[i].revents = 0;
}

View file

@ -100,6 +100,7 @@
#if ENABLE_SSL
# include "SslSocket.hpp"
#endif
#include "DelaySocket.hpp"
#include "Storage.hpp"
#include "TraceFile.hpp"
#include "Unit.hpp"
@ -167,6 +168,9 @@ int MasterPortNumber = DEFAULT_MASTER_PORT_NUMBER;
//TODO: Move to a more sensible namespace.
static bool DisplayVersion = false;
/// Funky latency simulation basic delay (ms)
static int SimulatedLatencyMs = 150;
// Tracks the set of prisoners / children waiting to be used.
static std::mutex NewChildrenMutex;
static std::condition_variable NewChildrenCV;
@ -2113,17 +2117,32 @@ private:
class PlainSocketFactory : public SocketFactory
{
std::shared_ptr<Socket> create(const int fd) override
std::shared_ptr<Socket> create(const int physicalFd) override
{
return StreamSocket::create<StreamSocket>(fd, std::unique_ptr<SocketHandlerInterface>{ new ClientRequestDispatcher });
int fd = physicalFd;
if (SimulatedLatencyMs > 0)
fd = Delay::create(SimulatedLatencyMs, physicalFd);
std::shared_ptr<Socket> socket =
StreamSocket::create<StreamSocket>(
fd, std::unique_ptr<SocketHandlerInterface>{
new ClientRequestDispatcher });
return socket;
}
};
#if ENABLE_SSL
class SslSocketFactory : public SocketFactory
{
std::shared_ptr<Socket> create(const int fd) override
std::shared_ptr<Socket> create(const int physicalFd) override
{
int fd = physicalFd;
if (SimulatedLatencyMs > 0)
fd = Delay::create(SimulatedLatencyMs, physicalFd);
return StreamSocket::create<SslStreamSocket>(fd, std::unique_ptr<SocketHandlerInterface>{ new ClientRequestDispatcher });
}
};
@ -2133,6 +2152,7 @@ class PrisonerSocketFactory : public SocketFactory
{
std::shared_ptr<Socket> create(const int fd) override
{
// No local delay.
return StreamSocket::create<StreamSocket>(fd, std::unique_ptr<SocketHandlerInterface>{ new PrisonerRequestDispatcher });
}
};
@ -2207,6 +2227,9 @@ public:
os << "Admin poll:\n";
Admin::instance().dumpState(os);
// If we have any delaying work going on.
Delay::dumpState(os);
os << "Document Broker polls "
<< "[ " << DocBrokers.size() << " ]:\n";
for (auto &i : DocBrokers)