Michael Meeks f845ac08af bgsave: have a single source for InputProcessing enable & disable.
Somehow this state can get confused in a bgsave process:

   Kit Document:
         inputProcessingEnabled: false
     Poll [kit] with 1 socket - wakeup rfd: 39 wfd: 45
             fd        events        rbuffered        wbuffered        rtotal        wtotal
             52        0x1        process             0             0         r:    825

'process' should read 'ignore' for disabled input.

Signed-off-by: Michael Meeks <>
Change-Id: I787eebe6fda3ae1b527d7605b8813fa764e81890
2024-04-18 17:19:19 +01:00

1056 lines
37 KiB

/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
/*
 * Copyright the Collabora Online contributors.
 *
 * SPDX-License-Identifier: MPL-2.0
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */
#pragma once
#include "common/Protocol.hpp"
#include "common/Common.hpp"
#include "common/Log.hpp"
#include "common/Unit.hpp"
#include "common/Util.hpp"
#include "Socket.hpp"
#include <net/HttpRequest.hpp>
#include <Poco/Net/HTTPResponse.h>
#include <chrono>
#include <memory>
#include <vector>
/// WebSocket protocol handler layered on a StreamSocket: it parses incoming
/// frames out of the socket's input buffer and frames outgoing messages into
/// the socket's output buffer.
class WebSocketHandler : public ProtocolHandlerInterface
/// The socket that owns us (we can't own it).
std::weak_ptr<StreamSocket> _socket;
/// Reference point for ping timing; updated when a ping/pong is sent and
/// when the connection is marked as upgraded.
std::chrono::steady_clock::time_point _lastPingSentTime;
/// Microseconds between _lastPingSentTime and the last ping/pong received.
int _pingTimeUs;
/// Whether outgoing frames are masked; only ever true for client instances.
bool _isMasking;
/// Tracks whether a fragmented data message is currently being received.
bool _inFragmentBlock;
/// The security key. Meaningful only for clients.
const std::string _key;
unsigned char _lastFlags; //< The flags in the last frame.
/// Accumulates the payload of the data message being received.
std::vector<char> _wsPayload;
/// Set when our Close frame is sent, so that it is not sent more than once.
std::atomic<bool> _shuttingDown;
/// True for the client role, false for the server role (WebSocket perspective).
const bool _isClient;
// Last member.
/// The UnitBase instance. We capture it here since
/// this is our instance, but the test framework
/// has a single global instance via UnitWSD::get().
UnitBase* const _unit;
/// Bit masks for the two fixed bytes of a WebSocket frame header.
struct WSFrameMask
{
    /// FIN bit: top bit of the first header byte (final fragment of a message).
    static constexpr unsigned char Fin = 0x80;
    /// MASK bit: top bit of the second header byte (payload is masked).
    static constexpr unsigned char Mask = 0x80;
};
/// Presumably the delay before the first ping after construction.
/// NOTE(review): its use is not visible in the constructor as shown -- confirm.
static constexpr std::chrono::microseconds InitialPingDelayMicroS = std::chrono::milliseconds(25);
/// Interval between pings; compared against the time elapsed since _lastPingSentTime.
static constexpr std::chrono::microseconds PingFrequencyMicroS = std::chrono::seconds(18);
/// Perform upgrade ourselves, or select a client web socket.
/// Parameters:
/// isClient: the instance should behave like a client (true) or like a server (false)
/// (from websocket perspective)
/// isMasking: a client should mask (true) or not (false) outgoing frames
WebSocketHandler(bool isClient, [[maybe_unused]] bool isMasking)
// Back-date the last ping by one ping period so that the first ping is due soon.
// NOTE(review): this initializer ends on a dangling '+' and has no leading ':';
// the remaining operand (and possibly other initializers) look lost -- confirm.
_lastPingSentTime(std::chrono::steady_clock::now() -
std::chrono::microseconds(PingFrequencyMicroS) +
, _pingTimeUs(0)
// Masking only applies to clients, regardless of what the caller asked for.
, _isMasking(isClient && isMasking)
, _inFragmentBlock(false)
// Only clients need a handshake key; servers keep it empty.
, _key(isClient ? generateKey() : std::string())
, _lastFlags(0)
, _isClient(isClient)
// Capture the unit-test instance only when running under the test framework.
, _unit(UnitBase::isUnitTesting() ? &UnitBase::get() : nullptr)
/// Upgrades itself to a websocket directly.
/// Parameters:
/// socket: the TCP socket which received the upgrade request
/// request: the HTTP upgrade request to WebSocket
template <typename T>
WebSocketHandler(const std::shared_ptr<StreamSocket>& socket, const T& request)
: WebSocketHandler(/*isClient=*/false, /*isMasking=*/false)
if (!socket)
throw std::runtime_error("Invalid socket while upgrading to WebSocket.");
_socket = socket;
// As a server, respond with 101 protocol-upgrade.
upgradeToWebSocket(socket, request);
/// Status codes sent to peer on shutdown.
/// Values follow the close codes defined by RFC 6455, section 7.4.1.
enum class StatusCodes : unsigned short
{
    /// Normal closure; the default used when shutting down.
    NORMAL_CLOSE = 1000,
    /// The endpoint is going away; used by shutdown(goingAway = true).
    ENDPOINT_GOING_AWAY = 1001,
    RESERVED = 1004,
};
/// Accessor for the Web-Socket Security Key generated for this instance
/// (empty for server instances).
const std::string& getWebSocketKey() const
{
    return _key;
}
/// Accessor for the first header byte (flags and opcode) of the last received WS frame.
unsigned char lastFlags() const
{
    return _lastFlags;
}
/// Create a WebSocket connection to the given @host
/// and @port and add the socket to @poll.
/// Fills in the upgrade headers on @req (Host, Date, User-Agent, Connection,
/// Upgrade, Sec-WebSocket-Version, Sec-WebSocket-Key) and sends it.
/// Returns true when the request was sent, false on connect or send failure.
bool wsRequest(http::Request& req, const std::string& host, const std::string& port,
bool isSecure, SocketPoll& poll)
const std::string hostAndPort = host + ':' + port;
LOG_TRC("Web-Socket request: " << hostAndPort);
// This handler is handed to net::connect() together with the target.
auto socket = net::connect(host, port, isSecure, shared_from_this());
if (!socket)
LOG_ERR("Failed to connect to " << host << ':' << port);
return false;
req.set("Host", hostAndPort); // Make sure the host is set.
req.set("Date", Util::getHttpTimeNow());
req.set("User-Agent", http::getAgentString());
req.set("Connection", "Upgrade");
req.set("Upgrade", "websocket");
req.set("Sec-WebSocket-Version", "13");
// The key generated at construction; the server's accept token is checked against it.
req.set("Sec-WebSocket-Key", getWebSocketKey());
// NOTE(review): @poll is never referenced in the lines shown, although the
// header comment says the socket is added to it -- confirm nothing was lost here.
if (socket->send(req))
return true;
LOG_ERR("Failed to make WebSocket request");
return false;
/// Implementation of the ProtocolHandlerInterface.
void onConnect(const std::shared_ptr<StreamSocket>& socket) override
LOG_ASSERT_MSG(socket, "Invalid socket passed to WebSocketHandler::onConnect");
_socket = socket;
LOG_TRC("Connected to WS Handler " << this);
/// Sends WS Close frame to the peer.
void sendCloseFrame(const StatusCodes statusCode = StatusCodes::NORMAL_CLOSE,
const std::string& statusMessage = std::string())
std::shared_ptr<StreamSocket> socket = _socket.lock();
if (!socket)
LOG_ERR("No socket associated with WebSocketHandler " << this
<< " to send Close Frame to");
if (socket->isClosed())
LOG_DBG("Socket is closed. Cannot send Close Frame");
// Don't send close-frame more than once.
if (!_shuttingDown)
LOG_TRC("Shutdown websocket, code: " << static_cast<unsigned>(statusCode)
<< ", message: " << statusMessage);
_shuttingDown = true;
if (!Util::isMobileApp())
const size_t len = statusMessage.size();
std::vector<char> buf(2 + len);
buf[0] = ((((int)statusCode) >> 8) & 0xff);
buf[1] = ((((int)statusCode) >> 0) & 0xff);
std::copy(statusMessage.begin(), statusMessage.end(), buf.begin() + 2);
const unsigned char flags = WSFrameMask::Fin | static_cast<char>(WSOpCode::Close);
sendFrame(socket,, buf.size(), flags);
void shutdown(bool goingAway, const std::string &statusMessage) override
shutdown(goingAway ? WebSocketHandler::StatusCodes::ENDPOINT_GOING_AWAY :
WebSocketHandler::StatusCodes::NORMAL_CLOSE, statusMessage);
void getIOStats(uint64_t &sent, uint64_t &recv) override
std::shared_ptr<StreamSocket> socket = getSocket().lock();
if (socket)
socket->getIOStats(sent, recv);
sent = 0;
recv = 0;
/// Shuts the websocket down: sends our Close frame (once) and resets the
/// per-connection receive state.
/// statusCode / statusMessage: what to put in the Close frame.
/// hardShutdown: request a forced close after the Close frame is written.
/// NOTE(review): the statements that ignore further input and close the
/// socket are not visible in this excerpt -- confirm against the full file.
void shutdown(const StatusCodes statusCode = StatusCodes::NORMAL_CLOSE,
const std::string& statusMessage = std::string(),
bool hardShutdown = false)
std::shared_ptr<StreamSocket> socket = _socket.lock();
if (socket)
LOG_TRC("Shutdown: Closing Connection");
// Skip the Close frame when one was already sent.
if (!_shuttingDown)
sendCloseFrame(statusCode, statusMessage);
assert(socket->getInBuffer().empty() &&
"Socket buffer must be empty after ignoreInput");
// force close after writing this message
// NOTE(review): the statement guarded by this condition appears to be
// missing; the resets below are presumably unconditional -- confirm.
if (hardShutdown)
_inFragmentBlock = false;
_shuttingDown = false;
/// Don't wait for the remote Websocket to handshake with us; go down fast.
void shutdownAfterWriting()
shutdown(WebSocketHandler::StatusCodes::NORMAL_CLOSE, std::string(),
true /* hard async shutdown & close */);
/// Parses at most one WebSocket frame from the socket's input buffer.
/// Control frames (ping/pong/close) are handled here; data frame payloads
/// are accumulated in _wsPayload until the final fragment arrives.
/// Returns false when more data is needed, true when a frame was consumed
/// (or rejected), so the caller may try again on the remaining buffer.
/// NOTE(review): block delimiters, else/break/return-only lines and what look
/// like conditional-compilation branches are not visible in this excerpt;
/// only the two bounds fixes marked below differ from the text as found.
bool handleTCPStream(const std::shared_ptr<StreamSocket>& socket)
assert(socket && "Expected a valid socket instance.");
// websocket fun !
const size_t len = socket->getInBuffer().size();
if (len == 0)
return false; // avoid logging.
if (len < 2) // partial read
LOG_TRC("Still incomplete WebSocket message, have " << len << " bytes");
return false;
unsigned char *p = reinterpret_cast<unsigned char*>(&socket->getInBuffer()[0]);
// Byte 0: FIN bit + opcode. Byte 1: MASK bit + 7-bit length (or length marker).
_lastFlags = p[0];
const bool fin = _lastFlags & 0x80;
const WSOpCode code = static_cast<WSOpCode>(_lastFlags & 0x0f);
const bool hasMask = p[1] & 0x80;
size_t payloadLen = p[1] & 0x7f;
size_t headerLen = 2;
// normally - 7 bit length.
if (payloadLen == 126) // 2 byte length
if (len < 2 + 2)
LOG_TRC("Still incomplete WebSocket message, have " << len << " bytes");
return false;
payloadLen = (((unsigned)p[2]) << 8) | ((unsigned)p[3]);
headerLen += 2;
else if (payloadLen == 127) // 8 byte length
if (len < 2 + 8)
LOG_TRC("Still incomplete WebSocket message, have " << len << " bytes");
return false;
// Big-endian 64-bit length: p[2] is the most significant byte.
payloadLen = ((((uint64_t)p[9]) << 0) + (((uint64_t)p[8]) << 8) +
(((uint64_t)p[7]) << 16) + (((uint64_t)p[6]) << 24) +
(((uint64_t)p[5]) << 32) + (((uint64_t)p[4]) << 40) +
(((uint64_t)p[3]) << 48) + (((uint64_t)p[2]) << 56));
// FIXME: crop read length to remove top / sign bits.
headerLen += 8;
unsigned char *mask = nullptr;
if (hasMask)
// The 4-byte masking key follows the length field.
mask = p + headerLen;
headerLen += 4;
// Written so that headerLen + payloadLen is never computed before the check.
if (headerLen > len || payloadLen > len - headerLen)
{ // partial read wait for more data.
LOG_TRC("Still incomplete WebSocket frame, have "
<< len << " bytes, frame is " << payloadLen + headerLen << " bytes");
return false;
if (hasMask && _isClient)
LOG_ERR("Servers should not send masked frames. Only clients");
return true;
LOG_TRC("Incoming WebSocket data of "
<< len << " bytes: "
<< Util::stringifyHexLine(socket->getInBuffer(), 0, std::min((size_t)32, len)));
unsigned char *data = p + headerLen;
if (isControlFrame(code))
//Process control frames
std::vector<char> ctrlPayload;
readPayload(data, payloadLen, mask, ctrlPayload);
socket->getInBuffer().eraseFirst(headerLen + payloadLen);
LOG_TRC("Incoming WebSocket frame code "
<< static_cast<unsigned>(code) << ", fin? " << fin << ", mask? " << hasMask
<< ", payload length: " << payloadLen
<< ", residual socket data: " << socket->getInBuffer().size() << " bytes");
// All control frames MUST NOT be fragmented and MUST have a payload length of 125 bytes or less
if (!fin)
LOG_ERR("A control frame cannot be fragmented");
return true;
if (payloadLen > 125)
LOG_ERR("The payload length of a control frame must not exceed 125 bytes");
return true;
switch (code)
case WSOpCode::Pong:
if (_isClient)
LOG_WRN("Servers should not send pongs, only clients");
_pingTimeUs = std::chrono::duration_cast<std::chrono::microseconds>
(std::chrono::steady_clock::now() - _lastPingSentTime).count();
LOG_TRC("Pong received: " << _pingTimeUs << " microseconds");
gotPing(code, _pingTimeUs);
case WSOpCode::Ping:
if (!_isClient)
LOG_DBG("Clients should not send pings, only servers");
const auto now = std::chrono::steady_clock::now();
_pingTimeUs = std::chrono::duration_cast<std::chrono::microseconds>
(now - _lastPingSentTime).count();
// data() is valid on an empty vector, whereas &ctrlPayload[0] is not;
// a ping may legitimately carry no payload.
sendPong(now, ctrlPayload.data(), payloadLen, socket);
gotPing(code, _pingTimeUs);
case WSOpCode::Close:
std::string message;
StatusCodes statusCode = StatusCodes::NORMAL_CLOSE;
if (!_shuttingDown)
// Peer-initiated shutdown must be echoed.
// Otherwise, this is the echo to _our_ shutdown message, which we should ignore.
// The status code takes two bytes; a 1-byte body is malformed and must
// not be indexed at [1]. Such a body is treated as carrying no code.
if (ctrlPayload.size() >= 2)
statusCode = static_cast<StatusCodes>((((uint64_t)(unsigned char)ctrlPayload[0]) << 8) +
(((uint64_t)(unsigned char)ctrlPayload[1]) << 0));
if (ctrlPayload.size() > 2)
message.assign(&ctrlPayload[2], &ctrlPayload[2] + ctrlPayload.size() - 2);
LOG_TRC("Peer initiated socket shutdown. Code: "
<< static_cast<int>(statusCode));
shutdown(statusCode, message);
return true;
LOG_ERR("Received unknown control code");
return true;
// Check data frames for errors
if (_inFragmentBlock)
if (code != WSOpCode::Continuation)
LOG_ERR("A fragment that is not the first fragment of a message must have the opcode "
"equal to 0");
return true;
else if (code == WSOpCode::Continuation)
LOG_ERR("An unfragmented message or the first fragment of a fragmented message must "
"have the opcode different than 0");
return true;
//Process data frame
readPayload(data, payloadLen, mask, _wsPayload);
// NOTE(review): the next four statements redeclare p/headerLen/payloadLen and
// take the whole buffer as payload; presumably an alternative build branch -- confirm.
unsigned char * const p = reinterpret_cast<unsigned char*>(&socket->getInBuffer()[0]);
_wsPayload.insert(_wsPayload.end(), p, p + len);
const size_t headerLen = 0;
const size_t payloadLen = len;
socket->eraseFirstInputBytes(headerLen + payloadLen);
"Incoming WebSocket frame code "
<< static_cast<unsigned>(code) << ", fin? " << fin << ", mask? " << hasMask
<< ", payload length: " << payloadLen
<< ", residual socket data: " << socket->getInBuffer().size()
<< " bytes, unmasked data: " +
Util::stringifyHexLine(_wsPayload, 0, std::min((size_t)32, _wsPayload.size())));
if (fin)
// If is final fragment then process the accumulated message.
catch (const Poco::Exception& ex)
LOG_ERR("Error during handleMessage: " << ex.displayText());
catch (const std::exception& exception)
LOG_ERR("Error during handleMessage: " << exception.what());
catch (...)
LOG_ERR("Error during handleMessage");
_inFragmentBlock = false;
_inFragmentBlock = true;
// If is not final fragment then wait for next fragment.
return true;
catch (const std::exception& exception)
LOG_ERR("Error during handleMessage: " << exception.what());
catch (...)
LOG_ERR("Error during handleMessage");
return true;
/// Implementation of the ProtocolHandlerInterface.
/// Entry point for newly received data: parses as many complete frames as
/// the input buffer holds, for as long as input processing is enabled.
/// NOTE(review): the statements guarded by the conditions below (marking the
/// socket as a websocket, the client upgrade call and its try block, the
/// early return without a socket) are not visible in this excerpt -- confirm.
virtual void handleIncomingMessage(SocketDisposition&) override
std::shared_ptr<StreamSocket> socket = _socket.lock();
if (Util::isMobileApp())
// No separate "upgrade" is going on
if (socket && !socket->isWebSocket())
if (!socket)
LOG_ERR("No socket associated with WebSocketHandler " << this);
// A client that has not completed the upgrade handshake yet.
else if (_isClient && !socket->isWebSocket())
catch (const std::exception& ex)
LOG_DBG("handleClientUpgrade exception caught: " << ex.what());
// handleTCPStream() returns true while it keeps consuming frames.
while (socket->processInputEnabled() && handleTCPStream(socket))
; // might have multiple messages in the accumulated buffer.
int getPollEvents([[maybe_unused]] std::chrono::steady_clock::time_point now,
[[maybe_unused]] int64_t& timeoutMaxMicroS) override
if (!_isClient)
const auto timeSincePingMicroS
= std::chrono::duration_cast<std::chrono::microseconds>(now - _lastPingSentTime);
= std::min(timeoutMaxMicroS, (int64_t)(PingFrequencyMicroS - timeSincePingMicroS).count());
int events = POLLIN;
if (_msgHandler && _msgHandler->hasQueuedMessages())
events |= POLLOUT;
return events;
/// Send a ping message
void sendPingOrPong(std::chrono::steady_clock::time_point now,
const char* data, const size_t len,
const WSOpCode code,
const std::shared_ptr<StreamSocket>& socket)
assert(socket && "Expected a valid socket instance.");
// Must not send this before we're upgraded.
if (!socket->isWebSocket())
LOG_WRN("Attempted ping on non-upgraded websocket! #" << socket->getFD());
_lastPingSentTime = now; // Pretend we sent it to avoid timing out immediately.
LOG_TRC("Sending " << (const char*)(code == WSOpCode::Ping ? " ping" : "pong"));
// FIXME: allow an empty payload.
sendMessage(data, len, code, false);
_lastPingSentTime = now;
void sendPing(std::chrono::steady_clock::time_point now,
const std::shared_ptr<StreamSocket>& socket)
// assert(!_isClient);
sendPingOrPong(now, "", 1, WSOpCode::Ping, socket);
void sendPong(std::chrono::steady_clock::time_point now,
const char* data, const size_t len,
const std::shared_ptr<StreamSocket>& socket)
sendPingOrPong(now, data, len, WSOpCode::Pong, socket);
/// Do we need to handle a timeout ?
void checkTimeout([[maybe_unused]] std::chrono::steady_clock::time_point now) override
if (_isClient)
const auto timeSincePingMicroS
= std::chrono::duration_cast<std::chrono::microseconds>(now - _lastPingSentTime);
if (timeSincePingMicroS >= PingFrequencyMicroS)
const std::shared_ptr<StreamSocket> socket = _socket.lock();
if (socket)
sendPing(now, socket);
/// Implementation of the ProtocolHandlerInterface.
/// NOTE(review): the statement guarded by the check below is not visible in
/// this excerpt; presumably the call is forwarded to _msgHandler -- confirm.
void performWrites(std::size_t capacity) override
if (_msgHandler)
/// Implementation of the ProtocolHandlerInterface.
/// NOTE(review): the statement guarded by the check below is not visible in
/// this excerpt; presumably the call is forwarded to _msgHandler -- confirm.
void onDisconnect() override
if (_msgHandler)
/// Sends a WebSocket Text message.
int sendMessage(const std::string& msg) const
return sendTextMessage(msg.c_str(), msg.size());
/// Sends a character-array literal as a WebSocket Text message.
template <std::size_t N> int sendMessage(const char (&msg)[N]) const
{
    // The array length counts the null-terminator, which is not sent.
    constexpr std::size_t textLength = N - 1;
    return sendTextMessage(msg, textLength);
}
/// Implementation of the ProtocolHandlerInterface.
int sendTextMessage(const char* msg, const size_t len, bool flush = false) const override
return sendMessage(msg, len, WSOpCode::Text, flush);
/// Implementation of the ProtocolHandlerInterface.
int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override
return sendMessage(data, len, WSOpCode::Binary, flush);
/// Sends a WebSocket message of WPOpCode type.
/// Returns the number of bytes written (including frame overhead) on success,
/// 0 for closed socket, and -1 for other errors.
int sendMessage(const char* data, const size_t len, const WSOpCode code, const bool flush) const
if (UnitBase::isUnitTesting() && !Util::isFuzzing())
int unitReturn = -1;
if (_unit->filterSendWebSocketMessage(data, len, code, flush, unitReturn))
return unitReturn;
//TODO: Support fragmented messages.
std::shared_ptr<StreamSocket> socket = _socket.lock();
return sendFrame(socket, data, len, WSFrameMask::Fin | static_cast<unsigned char>(code), flush);
virtual bool processInputEnabled() const override
std::shared_ptr<StreamSocket> socket = _socket.lock();
if (socket)
return socket->processInputEnabled();
return true;
/// Builds a websocket frame based on data and flags received as parameters.
/// The frame is output in 'out' parameter
void buildFrame(const char* data, const uint64_t len, unsigned char flags, Buffer &out) const
int slen = 0;
char scratch[16];
// All unfragmented frames must have the Fin bit.
scratch[slen++] = WSFrameMask::Fin | flags;
int maskFlag = _isMasking ? 0x80 : 0;
if (len < 126)
scratch[slen++] = (char)(len | maskFlag);
else if (len <= 0xffff)
scratch[slen++] = (char)(126 | maskFlag);
scratch[slen++] = static_cast<char>((len >> 8) & 0xff);
scratch[slen++] = static_cast<char>((len >> 0) & 0xff);
scratch[slen++] = (char)(127 | maskFlag);
scratch[slen++] = static_cast<char>((len >> 56) & 0xff);
scratch[slen++] = static_cast<char>((len >> 48) & 0xff);
scratch[slen++] = static_cast<char>((len >> 40) & 0xff);
scratch[slen++] = static_cast<char>((len >> 32) & 0xff);
scratch[slen++] = static_cast<char>((len >> 24) & 0xff);
scratch[slen++] = static_cast<char>((len >> 16) & 0xff);
scratch[slen++] = static_cast<char>((len >> 8) & 0xff);
scratch[slen++] = static_cast<char>((len >> 0) & 0xff);
assert(slen <= static_cast<int>(sizeof(scratch)));
out.append(scratch, slen);
if (_isMasking)
{ // flip some top bits - perhaps it helps.
char mask[4];
mask[0] = static_cast<char>(0x81);
mask[1] = static_cast<char>(0x76);
mask[2] = static_cast<char>(0x81);
mask[3] = static_cast<char>(0x76);
out.append(mask, 4);
// copy and mask the data
char copy[16384];
ssize_t i = 0, toSend;
while (true)
toSend = std::min(sizeof(copy), len - i);
if (toSend == 0)
for (ssize_t j = 0; j < toSend; ++j, ++i)
copy[j] = data[i] ^ mask[i%4];
out.append(copy, toSend);
// Copy the data.
out.append(data, len);
/// Sends a WebSocket frame given the data, length, and flags.
/// Text frames are validated as UTF-8 first; invalid data is dumped to
/// stderr and trips an assert.
/// Returns the number of bytes written (including frame overhead) on success,
/// 0 for closed/invalid socket, and -1 for other errors.
/// NOTE(review): block delimiters, else lines, the actual flush/write calls
/// and what look like conditional-compilation branches are not visible in
/// this excerpt -- confirm against the full file.
int sendFrame(const std::shared_ptr<StreamSocket>& socket, const char* data, const uint64_t len,
[[maybe_unused]] unsigned char flags, bool flush = true) const
// Empty payloads are rejected here, as are null sockets and null data.
if (!socket || data == nullptr || len == 0)
return -1;
if (socket->isClosed())
LOG_DBG("Socket is closed. Cannot send WS frame");
return 0;
Buffer& out = socket->getOutBuffer();
LOG_TRC("WebSocketHandler: Writing " << len << " bytes to #" << socket->getFD()
<< " in addition to " << out.size()
<< " bytes buffered");
// The low nibble of the flags byte is the opcode.
if ((flags & 0xf) == (int)WSOpCode::Text) // utf8 validate
// An offset below len marks the position of the first invalid byte.
size_t offset = Util::isValidUtf8((unsigned char*)data, len);
if (offset < len)
std::string hex, raw;
// Short messages are dumped whole ...
if (len < 256)
raw = std::string(data, len);
hex = "whole string:" + Util::dumpHex(raw);
// ... longer ones are cropped around the offending byte.
// 64 bytes before & after ...
size_t cropstart, croplen;
if (offset < 64)
cropstart = 0;
cropstart = offset - 64;
croplen = std::min<size_t>(len - cropstart, 128);
assert (cropstart + croplen <= len);
raw = std::string(data + cropstart, croplen);
hex = "msg: "+ COOLProtocol::getAbbreviatedMessage(data, len) +
" string region error at byte " + std::to_string(offset - cropstart) + ": " + Util::dumpHex(raw);
std::cerr << "attempting to send invalid UTF-8 message '" << raw << "' "
<< " error at offset " << std::hex << "0x" << offset << std::dec
<< " bytes, " << hex << "\n";
assert("invalid utf-8 - check Message::detectType()" && false);
// This would generate huge amounts of "instant" Trace Events. Is that what we want? If so,
// it would be good to include in the args some identificating information about the sender
// and recipient of the frame (and what message it is related to, if any).
// TraceEvent::emitInstantEvent("WebSocketHandler::sendFrame", { { "length", std::to_string(len) } });
// Framed path: the return value is the growth of the output buffer.
const size_t oldSize = out.size();
buildFrame(data, len, flags, out);
// Return the number of bytes we wrote to the *buffer*.
const size_t size = out.size() - oldSize;
// NOTE(review): 'size' is declared a second time below; presumably the two
// declarations belong to alternative build branches -- confirm.
// We ignore the flush parameter and always flush in the MOBILEAPP case because there is no
// WebSocket framing, we put the messages as such into the FakeSocket queue.
flush = true;
out.append(data, len);
const size_t size = out.size();
assert(size >= len && "Expected to have data in outBuffer to send");
if (flush || _shuttingDown)
// Retry if we are shutting down and failed.
// This is particularly relevant when we simulate socket error
// during unit-tests. Dropping WS frames results in random test failures.
// But more important is to flush the data we have before closing the socket.
// There is a FIXME item in Session::shutdown specifically to address this case.
// When we terminte a client's connection in DocumentBroker::finalRemoveSession,
// we send the close frame and close the socket via Socket::closeConnection(),
// which is called immediately after *this* function (see shutdown() above).
// So, a common scenario is when we want to shutdown all clients. The stack
// trace looks like this:
// WebSocketHandler::sendFrame at ./net/WebSocketHandler.hpp:678 (this function)
// WebSocketHandler::sendCloseFrame at ./net/WebSocketHandler.hpp:149
// WebSocketHandler::shutdown at ./net/WebSocketHandler.hpp:175
// WebSocketHandler::shutdown at ./net/WebSocketHandler.hpp:155
// Session::shutdown at common/Session.cpp:235
// Session::shutdownGoingAway at ./common/Session.hpp:152 (this will close the socket)
// DocumentBroker::shutdownClients at wsd/DocumentBroker.cpp:2386
// DocumentBroker::terminateChild at wsd/DocumentBroker.cpp:2421
// The proper fix is to flag the socket(s) for shutdown, but continue
// polling until we completly flush the buffered data, then we close
// the socket in question. This isn't possible in the above scenario,
// and a proper fix is to modify DocumentBroker's poll to take this
// flushing into account (note that currently terminateChild is called
// *after* the poll loop exists). This will be done in a follow up later.
// For now, we just do a second write, and hope for the best.
if (_shuttingDown && !out.empty())
if (!out.empty())
LOG_WRN("Shutting down but "
<< out.size()
<< " bytes couldn't be flushed and still remain in the output buffer");
return size;
/// Control opcodes are Close and everything ordered after it.
bool isControlFrame(WSOpCode code) const
{
    return !(code < WSOpCode::Close);
}
/// Appends @dataLen bytes from @data to @payload.
/// When @mask is non-null it points at the 4-byte masking key and each byte
/// is XOR-ed with mask[i % 4] on the way; otherwise bytes are copied as-is.
/// An empty input leaves @payload untouched.
void readPayload(unsigned char *data, size_t dataLen, unsigned char* mask, std::vector<char>& payload)
{
    if (dataLen == 0)
        return;

    if (mask)
    {
        // Grow once, then unmask straight into the new tail.
        const size_t end = payload.size();
        payload.resize(end + dataLen);
        char* wsData = &payload[end];
        for (size_t i = 0; i < dataLen; ++i)
            *wsData++ = static_cast<char>(data[i] ^ mask[i % 4]);
    }
    else
    {
        payload.insert(payload.end(), data, data + dataLen);
    }
}
/// To be overridden to handle the websocket messages the way you need.
/// data: the payload of one complete message.
/// NOTE(review): the statement guarded by the check below is not visible in
/// this excerpt; presumably the message is forwarded to _msgHandler -- confirm.
virtual void handleMessage(const std::vector<char> &data)
if (_msgHandler)
const std::weak_ptr<StreamSocket>& getSocket() const
return _socket;
/// Acts on the socket, if it still exists.
/// NOTE(review): the statement guarded by the check below is not visible in
/// this excerpt; presumably the socket is told to ignore input -- confirm.
void ignoreInput()
std::shared_ptr<StreamSocket> socket = _socket.lock();
if (socket)
/// Implementation of the ProtocolHandlerInterface.
void dumpState(std::ostream& os) const override;
/// Produces the client key that is sent as Sec-WebSocket-Key.
static std::string generateKey();
/// Derives the Sec-WebSocket-Accept token that corresponds to @key.
static std::string computeAccept(const std::string &key);
/// Upgrade the http(s) connection to a websocket.
/// Server side only: reads the Sec-WebSocket-* headers from @req and builds
/// the 101 Switching Protocols response with the matching accept token.
/// NOTE(review): sending the response, marking the socket as a websocket and
/// the statement guarded by the COOL_ZERO_BUFFER_SIZE check are not visible
/// in this excerpt -- confirm against the full file.
template <typename T>
void upgradeToWebSocket(const std::shared_ptr<StreamSocket>& socket,
[[maybe_unused]] const T& req)
assert(socket && "Must have a valid socket");
LOG_TRC("Upgrading to WebSocket");
assert(!_isClient && "Accepting upgrade requests are done by servers only.");
// create our websocket goodness ...
// Missing headers fall back to the defaults given here.
const int wsVersion = std::stoi(req.get("Sec-WebSocket-Version", "13"));
const std::string wsKey = req.get("Sec-WebSocket-Key", "");
const std::string wsProtocol = req.get("Sec-WebSocket-Protocol", "chat");
// FIXME: other sanity checks ...
LOG_INF("WebSocket version: " << wsVersion << ", key: [" << wsKey << "], protocol: ["
<< wsProtocol << ']');
if (std::getenv("COOL_ZERO_BUFFER_SIZE"))
http::Response httpResponse(http::StatusCode::SwitchingProtocols, socket->getFD());
httpResponse.set("Upgrade", "websocket");
httpResponse.set("Connection", "Upgrade");
// The accept token is derived from the key the client sent.
httpResponse.set("Sec-WebSocket-Accept", computeAccept(wsKey));
LOG_TRC("Sending WS Upgrade response: " << httpResponse.header().toString());
// Handle incoming upgrade to full socket as client WS.
// Parses the server's HTTP response from the input buffer and accepts the
// upgrade only for a 101 status with matching Upgrade/Connection headers and
// an accept token equal to computeAccept(_key).
// NOTE(review): several expressions here are visibly incomplete (empty first
// arguments, an unterminated http::Response construction) and the
// accept/reject/consume actions are not visible -- confirm against the full file.
void handleClientUpgrade(const std::shared_ptr<StreamSocket>& socket)
assert(socket && "socket must be valid");
assert(_isClient && "Upgrade handshakes are finished by clients.");
Buffer& data = socket->getInBuffer();
LOG_TRC("Incoming client websocket upgrade response: " << std::string(,
// Consume the incoming data by parsing and processing the body.
http::Response response(
if (response.statusLine().statusCode() == http::StatusCode::SwitchingProtocols &&
Util::iequal(response.get("Upgrade"), "websocket") &&
Util::iequal(response.get("Connection", ""), "Upgrade") &&
response.get("Sec-WebSocket-Accept", "") == computeAccept(_key))
LOG_TRC("Accepted incoming websocket response");
LOG_ERR("Server returned invalid accept token during handshake. Disconnecting");
// Negative: parse error; positive: data was consumed; zero: need more data.
const int64_t read = response.readData(, data.size());
if (read < 0)
// Error: Interrupt the transfer.
LOG_ERR("Error in client websocket upgrade response. Disconnecting");
if (read > 0)
// Remove consumed data.
// Nothing to do, not enough data to parse.
assert(read == 0 && "Need more more data to parse.");
/// Called once the connection is upgraded; restarts the ping timer.
/// NOTE(review): @socket is only asserted on in the lines shown; a call that
/// marks the socket as a websocket may be missing -- confirm.
void setWebSocket(const std::shared_ptr<StreamSocket>& socket)
assert(socket && "Must have a valid socket");
// No need to ping right upon connection/upgrade,
// but do reset the time to avoid pinging immediately after.
_lastPingSentTime = std::chrono::steady_clock::now();
/// Implementation of the ProtocolHandlerInterface.
/// enable: the requested input-processing state.
/// NOTE(review): the statement guarded by the check below is not visible in
/// this excerpt; presumably the request is forwarded to the socket -- confirm.
virtual void enableProcessInput(bool enable = true) override
std::shared_ptr<StreamSocket> socket = _socket.lock();
if (socket)
/// Hook invoked after a ping or pong frame is received, with the frame's
/// opcode and the measured ping time in microseconds.
/// NOTE(review): the body is not visible in this excerpt.
virtual void gotPing(WSOpCode /* code */, int /* pingTimeUs */)
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */