wsd: WebSocket state is a property of the socket

This resolves the erroneous warnings of pinging
on a non-upgraded (i.e. HTTP) socket.

This was due to the fact that we moved the socket
from one SocketHandlerInterface to a WebSocketHandler
after upgrading and since the WSState was a property
of the handler, the WebSocketHandler didn't know
that the socket had already been upgraded.

Also other cosmetics and cleanups.

Change-Id: I1a88edef750117ed551d23245e49380371561422
Reviewed-on: https://gerrit.libreoffice.org/49911
Reviewed-by: Ashod Nakashian <ashnakash@gmail.com>
Tested-by: Ashod Nakashian <ashnakash@gmail.com>
This commit is contained in:
Ashod Nakashian 2018-02-11 19:14:21 -05:00 committed by Ashod Nakashian
parent 224660a0ba
commit 71f4597059
3 changed files with 46 additions and 40 deletions

View file

@ -146,7 +146,7 @@ void SocketDisposition::execute()
void WebSocketHandler::dumpState(std::ostream& os)
{
os << (_shuttingDown ? "shutd " : "alive ")
<< std::setw(5) << 1.0*_pingTimeUs/1000 << "ms ";
<< std::setw(5) << _pingTimeUs/1000. << "ms ";
if (_wsPayload.size() > 0)
Util::dumpHex(os, "\t\tws queued payload:\n", "\t\t", _wsPayload);
os << "\n";

View file

@ -283,7 +283,7 @@ protected:
setNoDelay();
_sendBufferSize = DefaultSendBufferSize;
_owner = std::this_thread::get_id();
LOG_DBG("#" << _fd << " Thread affinity set to " << _owner << ".");
LOG_DBG("#" << _fd << " Thread affinity set to " << Log::to_string(_owner) << ".");
#if ENABLE_DEBUG
if (std::getenv("LOOL_ZERO_BUFFER_SIZE"))
@ -700,10 +700,11 @@ public:
StreamSocket(const int fd, std::shared_ptr<SocketHandlerInterface> socketHandler) :
Socket(fd),
_socketHandler(std::move(socketHandler)),
_closed(false),
_shutdownSignalled(false),
_bytesSent(0),
_bytesRecvd(0)
_bytesRecvd(0),
_wsState(WSState::HTTP),
_closed(false),
_shutdownSignalled(false)
{
LOG_DBG("StreamSocket ctor #" << fd);
@ -730,6 +731,8 @@ public:
}
bool isClosed() const { return _closed; }
bool isWebSocket() const { return _wsState == WSState::WS; }
void setWebSocket() { _wsState = WSState::WS; }
/// Just trigger the async shutdown.
virtual void shutdown() override
@ -968,18 +971,20 @@ protected:
/// Client handling the actual data.
std::shared_ptr<SocketHandlerInterface> _socketHandler;
std::vector<char> _inBuffer;
std::vector<char> _outBuffer;
uint64_t _bytesSent;
uint64_t _bytesRecvd;
enum class WSState { HTTP, WS } _wsState;
/// True if we are already closed.
bool _closed;
/// True when shutdown was requested via shutdown().
bool _shutdownSignalled;
std::vector< char > _inBuffer;
std::vector< char > _outBuffer;
uint64_t _bytesSent;
uint64_t _bytesRecvd;
// To be able to access _inBuffer and _outBuffer.
// TODO we probably need accessors to the _inBuffer & _outBuffer
// instead of this many friends...

View file

@ -25,30 +25,29 @@
class WebSocketHandler : public SocketHandlerInterface
{
protected:
// The socket that owns us (we can't own it).
/// The socket that owns us (we can't own it).
std::weak_ptr<StreamSocket> _socket;
const int InitialPingDelayMs = 25;
const int PingFrequencyMs = 18 * 1000;
std::chrono::steady_clock::time_point _pingSent;
std::chrono::steady_clock::time_point _lastPingSentTime;
int _pingTimeUs;
std::vector<char> _wsPayload;
bool _shuttingDown;
enum class WSState { HTTP, WS } _wsState;
std::atomic<bool> _shuttingDown;
enum class WSFrameMask : unsigned char
struct WSFrameMask
{
Fin = 0x80,
Mask = 0x80
static const unsigned char Fin = 0x80;
static const unsigned char Mask = 0x80;
};
static const int InitialPingDelayMs = 25;
static const int PingFrequencyMs = 18 * 1000;
public:
WebSocketHandler() :
_pingSent(std::chrono::steady_clock::now()),
_lastPingSentTime(std::chrono::steady_clock::now()),
_pingTimeUs(0),
_shuttingDown(false),
_wsState(WSState::HTTP)
_shuttingDown(false)
{
}
@ -56,12 +55,11 @@ public:
WebSocketHandler(const std::weak_ptr<StreamSocket>& socket,
const Poco::Net::HTTPRequest& request) :
_socket(socket),
_pingSent(std::chrono::steady_clock::now() -
_lastPingSentTime(std::chrono::steady_clock::now() -
std::chrono::milliseconds(PingFrequencyMs) -
std::chrono::milliseconds(InitialPingDelayMs)),
_pingTimeUs(0),
_shuttingDown(false),
_wsState(WSState::HTTP)
_shuttingDown(false)
{
upgradeToWebSocket(request);
}
@ -110,7 +108,7 @@ public:
buf[0] = ((((int)statusCode) >> 8) & 0xff);
buf[1] = ((((int)statusCode) >> 0) & 0xff);
std::copy(statusMessage.begin(), statusMessage.end(), buf.begin() + 2);
const unsigned char flags = static_cast<unsigned char>(WSFrameMask::Fin)
const unsigned char flags = WSFrameMask::Fin
| static_cast<char>(WSOpCode::Close);
sendFrame(socket, buf.data(), buf.size(), flags);
@ -197,9 +195,12 @@ public:
switch (code)
{
case WSOpCode::Pong:
_pingTimeUs = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _pingSent).count();
{
_pingTimeUs = std::chrono::duration_cast<std::chrono::microseconds>
(std::chrono::steady_clock::now() - _lastPingSentTime).count();
LOG_TRC("#" << socket->getFD() << ": Pong received: " << _pingTimeUs << " microseconds");
break;
}
case WSOpCode::Ping:
LOG_ERR("#" << socket->getFD() << ": Clients should not send pings, only servers");
// drop through
@ -263,39 +264,39 @@ public:
int & timeoutMaxMs) override
{
const int timeSincePingMs =
std::chrono::duration_cast<std::chrono::milliseconds>(now - _pingSent).count();
std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastPingSentTime).count();
timeoutMaxMs = std::min(timeoutMaxMs, PingFrequencyMs - timeSincePingMs);
return POLLIN;
}
/// Send a ping message
void sendPing(std::chrono::steady_clock::time_point now,
const std::shared_ptr<Socket>& socket)
const std::shared_ptr<StreamSocket>& socket)
{
assert(socket && "Expected a valid socket instance.");
// Must not send this before we're upgraded.
if (_wsState != WSState::WS)
if (!socket->isWebSocket())
{
LOG_WRN("Attempted ping on non-upgraded websocket!");
_pingSent = now; // Pretend we sent it to avoid timing out immediately.
LOG_WRN("Attempted ping on non-upgraded websocket! #" << socket->getFD());
_lastPingSentTime = now; // Pretend we sent it to avoid timing out immediately.
return;
}
LOG_TRC("#" << socket->getFD() << ": Sending ping.");
// FIXME: allow an empty payload.
sendMessage("", 1, WSOpCode::Ping, false);
_pingSent = now;
_lastPingSentTime = now;
}
/// Do we need to handle a timeout ?
void checkTimeout(std::chrono::steady_clock::time_point now) override
{
const int timeSincePingMs =
std::chrono::duration_cast<std::chrono::milliseconds>(now - _pingSent).count();
std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastPingSentTime).count();
if (timeSincePingMs >= PingFrequencyMs)
{
const std::shared_ptr<Socket> socket = _socket.lock();
const std::shared_ptr<StreamSocket> socket = _socket.lock();
if (socket)
sendPing(now, socket);
}
@ -322,7 +323,7 @@ public:
//TODO: Support fragmented messages.
std::shared_ptr<StreamSocket> socket = _socket.lock();
return sendFrame(socket, data, len, static_cast<unsigned char>(WSFrameMask::Fin) | static_cast<unsigned char>(code), flush);
return sendFrame(socket, data, len, WSFrameMask::Fin | static_cast<unsigned char>(code), flush);
}
protected:
@ -406,7 +407,7 @@ protected:
throw std::runtime_error("Invalid socket while upgrading to WebSocket. Request: " + req.getURI());
LOG_TRC("#" << socket->getFD() << ": Upgrading to WebSocket.");
assert(_wsState == WSState::HTTP);
assert(!socket->isWebSocket());
// create our websocket goodness ...
const int wsVersion = std::stoi(req.get("Sec-WebSocket-Version", "13"));
@ -432,11 +433,11 @@ protected:
LOG_TRC("#" << socket->getFD() << ": Sending WS Upgrade response: " << res);
socket->send(res);
_wsState = WSState::WS;
socket->setWebSocket();
// No need to ping right upon connection/upgrade,
// but do reset the time to avoid pinging immediately after.
_pingSent = std::chrono::steady_clock::now();
_lastPingSentTime = std::chrono::steady_clock::now();
}
};