From 71f4597059f2606ad8394161b6eda1c51c88c8d3 Mon Sep 17 00:00:00 2001 From: Ashod Nakashian Date: Sun, 11 Feb 2018 19:14:21 -0500 Subject: [PATCH] wsd: WebSocket state is a property of the socket This resolves the erroneous warnings of pinging on a non-upgraded (i.e. HTTP) socket. This was due to the fact that we moved the socket from one SocketHandlerInterface to a WebSocketHandler after upgrading and since the WSState was a property of the handler, the WebSocketHandler didn't know that the socket had already been upgraded. Also other cosmetics and cleanups. Change-Id: I1a88edef750117ed551d23245e49380371561422 Reviewed-on: https://gerrit.libreoffice.org/49911 Reviewed-by: Ashod Nakashian Tested-by: Ashod Nakashian --- net/Socket.cpp | 2 +- net/Socket.hpp | 25 ++++++++++------- net/WebSocketHandler.hpp | 59 ++++++++++++++++++++-------------------- 3 files changed, 46 insertions(+), 40 deletions(-) diff --git a/net/Socket.cpp b/net/Socket.cpp index d0a791ec9..31ad77c6c 100644 --- a/net/Socket.cpp +++ b/net/Socket.cpp @@ -146,7 +146,7 @@ void SocketDisposition::execute() void WebSocketHandler::dumpState(std::ostream& os) { os << (_shuttingDown ? "shutd " : "alive ") - << std::setw(5) << 1.0*_pingTimeUs/1000 << "ms "; + << std::setw(5) << _pingTimeUs/1000. << "ms "; if (_wsPayload.size() > 0) Util::dumpHex(os, "\t\tws queued payload:\n", "\t\t", _wsPayload); os << "\n"; diff --git a/net/Socket.hpp b/net/Socket.hpp index bec3744de..7c8563358 100644 --- a/net/Socket.hpp +++ b/net/Socket.hpp @@ -283,7 +283,7 @@ protected: setNoDelay(); _sendBufferSize = DefaultSendBufferSize; _owner = std::this_thread::get_id(); - LOG_DBG("#" << _fd << " Thread affinity set to " << _owner << "."); + LOG_DBG("#" << _fd << " Thread affinity set to " << Log::to_string(_owner) << "."); #if ENABLE_DEBUG if (std::getenv("LOOL_ZERO_BUFFER_SIZE")) @@ -700,10 +700,11 @@ public: StreamSocket(const int fd, std::shared_ptr socketHandler) : Socket(fd), _socketHandler(std::move(socketHandler)), - _closed(false), - _shutdownSignalled(false), _bytesSent(0), - _bytesRecvd(0) + _bytesRecvd(0), + _wsState(WSState::HTTP), + _closed(false), + _shutdownSignalled(false) { LOG_DBG("StreamSocket ctor #" << fd); @@ -730,6 +731,8 @@ public: } bool isClosed() const { return _closed; } + bool isWebSocket() const { return _wsState == WSState::WS; } + void setWebSocket() { _wsState = WSState::WS; } /// Just trigger the async shutdown. virtual void shutdown() override @@ -968,18 +971,20 @@ protected: /// Client handling the actual data. std::shared_ptr _socketHandler; + std::vector _inBuffer; + std::vector _outBuffer; + + uint64_t _bytesSent; + uint64_t _bytesRecvd; + + enum class WSState { HTTP, WS } _wsState; + /// True if we are already closed. bool _closed; /// True when shutdown was requested via shutdown(). bool _shutdownSignalled; - std::vector< char > _inBuffer; - std::vector< char > _outBuffer; - - uint64_t _bytesSent; - uint64_t _bytesRecvd; - // To be able to access _inBuffer and _outBuffer. // TODO we probably need accessors to the _inBuffer & _outBuffer // instead of this many friends... diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp index dd869ebb6..d981ab4df 100644 --- a/net/WebSocketHandler.hpp +++ b/net/WebSocketHandler.hpp @@ -25,30 +25,29 @@ class WebSocketHandler : public SocketHandlerInterface { protected: - // The socket that owns us (we can't own it). + /// The socket that owns us (we can't own it). std::weak_ptr _socket; - const int InitialPingDelayMs = 25; - const int PingFrequencyMs = 18 * 1000; - std::chrono::steady_clock::time_point _pingSent; + std::chrono::steady_clock::time_point _lastPingSentTime; int _pingTimeUs; std::vector _wsPayload; - bool _shuttingDown; - enum class WSState { HTTP, WS } _wsState; + std::atomic _shuttingDown; - enum class WSFrameMask : unsigned char + struct WSFrameMask { - Fin = 0x80, - Mask = 0x80 + static const unsigned char Fin = 0x80; + static const unsigned char Mask = 0x80; }; + static const int InitialPingDelayMs = 25; + static const int PingFrequencyMs = 18 * 1000; + public: WebSocketHandler() : - _pingSent(std::chrono::steady_clock::now()), + _lastPingSentTime(std::chrono::steady_clock::now()), _pingTimeUs(0), - _shuttingDown(false), - _wsState(WSState::HTTP) + _shuttingDown(false) { } @@ -56,12 +55,11 @@ public: WebSocketHandler(const std::weak_ptr& socket, const Poco::Net::HTTPRequest& request) : _socket(socket), - _pingSent(std::chrono::steady_clock::now() - + _lastPingSentTime(std::chrono::steady_clock::now() - std::chrono::milliseconds(PingFrequencyMs) - std::chrono::milliseconds(InitialPingDelayMs)), _pingTimeUs(0), - _shuttingDown(false), - _wsState(WSState::HTTP) + _shuttingDown(false) { upgradeToWebSocket(request); } @@ -110,7 +108,7 @@ public: buf[0] = ((((int)statusCode) >> 8) & 0xff); buf[1] = ((((int)statusCode) >> 0) & 0xff); std::copy(statusMessage.begin(), statusMessage.end(), buf.begin() + 2); - const unsigned char flags = static_cast(WSFrameMask::Fin) + const unsigned char flags = WSFrameMask::Fin | static_cast(WSOpCode::Close); sendFrame(socket, buf.data(), buf.size(), flags); @@ -197,9 +195,12 @@ public: switch (code) { case WSOpCode::Pong: - _pingTimeUs = std::chrono::duration_cast(std::chrono::steady_clock::now() - _pingSent).count(); + { + _pingTimeUs = std::chrono::duration_cast + (std::chrono::steady_clock::now() - _lastPingSentTime).count(); LOG_TRC("#" << socket->getFD() << ": Pong received: " << _pingTimeUs << " microseconds"); break; + } case WSOpCode::Ping: LOG_ERR("#" << socket->getFD() << ": Clients should not send pings, only servers"); // drop through @@ -263,39 +264,39 @@ public: int & timeoutMaxMs) override { const int timeSincePingMs = - std::chrono::duration_cast(now - _pingSent).count(); + std::chrono::duration_cast(now - _lastPingSentTime).count(); timeoutMaxMs = std::min(timeoutMaxMs, PingFrequencyMs - timeSincePingMs); return POLLIN; } /// Send a ping message void sendPing(std::chrono::steady_clock::time_point now, - const std::shared_ptr& socket) + const std::shared_ptr& socket) { assert(socket && "Expected a valid socket instance."); // Must not send this before we're upgraded. - if (_wsState != WSState::WS) + if (!socket->isWebSocket()) { - LOG_WRN("Attempted ping on non-upgraded websocket!"); - _pingSent = now; // Pretend we sent it to avoid timing out immediately. + LOG_WRN("Attempted ping on non-upgraded websocket! #" << socket->getFD()); + _lastPingSentTime = now; // Pretend we sent it to avoid timing out immediately. return; } LOG_TRC("#" << socket->getFD() << ": Sending ping."); // FIXME: allow an empty payload. sendMessage("", 1, WSOpCode::Ping, false); - _pingSent = now; + _lastPingSentTime = now; } /// Do we need to handle a timeout ? void checkTimeout(std::chrono::steady_clock::time_point now) override { const int timeSincePingMs = - std::chrono::duration_cast(now - _pingSent).count(); + std::chrono::duration_cast(now - _lastPingSentTime).count(); if (timeSincePingMs >= PingFrequencyMs) { - const std::shared_ptr socket = _socket.lock(); + const std::shared_ptr socket = _socket.lock(); if (socket) sendPing(now, socket); } @@ -322,7 +323,7 @@ public: //TODO: Support fragmented messages. std::shared_ptr socket = _socket.lock(); - return sendFrame(socket, data, len, static_cast(WSFrameMask::Fin) | static_cast(code), flush); + return sendFrame(socket, data, len, WSFrameMask::Fin | static_cast(code), flush); } protected: @@ -406,7 +407,7 @@ protected: throw std::runtime_error("Invalid socket while upgrading to WebSocket. Request: " + req.getURI()); LOG_TRC("#" << socket->getFD() << ": Upgrading to WebSocket."); - assert(_wsState == WSState::HTTP); + assert(!socket->isWebSocket()); // create our websocket goodness ... const int wsVersion = std::stoi(req.get("Sec-WebSocket-Version", "13")); @@ -432,11 +433,11 @@ protected: LOG_TRC("#" << socket->getFD() << ": Sending WS Upgrade response: " << res); socket->send(res); - _wsState = WSState::WS; + socket->setWebSocket(); // No need to ping right upon connection/upgrade, // but do reset the time to avoid pinging immediately after. - _pingSent = std::chrono::steady_clock::now(); + _lastPingSentTime = std::chrono::steady_clock::now(); } };