Revert "loolwsd: support reading long messages directly"
This reverts commit 84607b43a3
.
LOOLWebSocket piece requires a much too recent Poco.
This commit is contained in:
parent
a2058341a3
commit
4432aba25b
3 changed files with 67 additions and 49 deletions
|
@ -50,17 +50,17 @@ void SocketProcessor(const std::shared_ptr<LOOLWebSocket>& ws,
|
||||||
|
|
||||||
// Timeout given is in microseconds.
|
// Timeout given is in microseconds.
|
||||||
static const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000);
|
static const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000);
|
||||||
constexpr auto bufferSize = READ_BUFFER_SIZE * 8;
|
const auto bufferSize = READ_BUFFER_SIZE * 100;
|
||||||
|
|
||||||
int flags = 0;
|
int flags = 0;
|
||||||
int n = -1;
|
int n = -1;
|
||||||
bool stop = false;
|
bool stop = false;
|
||||||
std::vector<char> payload(bufferSize);
|
std::vector<char> payload(bufferSize);
|
||||||
Poco::Buffer<char> buffer(bufferSize);
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
ws->setReceiveTimeout(0);
|
ws->setReceiveTimeout(0);
|
||||||
|
|
||||||
|
payload.resize(0);
|
||||||
|
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
stop = stopPredicate();
|
stop = stopPredicate();
|
||||||
|
@ -79,12 +79,10 @@ void SocketProcessor(const std::shared_ptr<LOOLWebSocket>& ws,
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
payload.resize(0);
|
payload.resize(payload.capacity());
|
||||||
buffer.resize(0);
|
|
||||||
n = -1;
|
n = -1;
|
||||||
n = ws->receiveFrame(buffer, flags);
|
n = ws->receiveFrame(payload.data(), payload.capacity(), flags);
|
||||||
LOG_WRN("GOT: [" << LOOLProtocol::getAbbreviatedMessage(buffer.begin(), buffer.size()) << "]");
|
payload.resize(n > 0 ? n : 0);
|
||||||
payload.insert(payload.end(), buffer.begin(), buffer.end());
|
|
||||||
}
|
}
|
||||||
catch (const Poco::TimeoutException&)
|
catch (const Poco::TimeoutException&)
|
||||||
{
|
{
|
||||||
|
@ -101,7 +99,7 @@ void SocketProcessor(const std::shared_ptr<LOOLWebSocket>& ws,
|
||||||
|
|
||||||
assert(n > 0);
|
assert(n > 0);
|
||||||
|
|
||||||
const std::string firstLine = LOOLProtocol::getFirstLine(buffer.begin(), buffer.size());
|
const std::string firstLine = LOOLProtocol::getFirstLine(payload);
|
||||||
if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) != WebSocket::FrameFlags::FRAME_FLAG_FIN)
|
if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) != WebSocket::FrameFlags::FRAME_FLAG_FIN)
|
||||||
{
|
{
|
||||||
// One WS message split into multiple frames.
|
// One WS message split into multiple frames.
|
||||||
|
@ -109,7 +107,8 @@ void SocketProcessor(const std::shared_ptr<LOOLWebSocket>& ws,
|
||||||
LOG_WRN("SocketProcessor [" << name << "]: Receiving multi-parm frame.");
|
LOG_WRN("SocketProcessor [" << name << "]: Receiving multi-parm frame.");
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
n = ws->receiveFrame(buffer, flags);
|
char buffer[READ_BUFFER_SIZE * 10];
|
||||||
|
n = ws->receiveFrame(buffer, sizeof(buffer), flags);
|
||||||
if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)
|
if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)
|
||||||
{
|
{
|
||||||
LOG_WRN("SocketProcessor [" << name << "]: Connection closed while reading multiframe message.");
|
LOG_WRN("SocketProcessor [" << name << "]: Connection closed while reading multiframe message.");
|
||||||
|
@ -117,7 +116,7 @@ void SocketProcessor(const std::shared_ptr<LOOLWebSocket>& ws,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
payload.insert(payload.end(), buffer.begin(), buffer.end());
|
payload.insert(payload.end(), buffer, buffer + n);
|
||||||
if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) == WebSocket::FrameFlags::FRAME_FLAG_FIN)
|
if ((flags & WebSocket::FrameFlags::FRAME_FLAG_FIN) == WebSocket::FrameFlags::FRAME_FLAG_FIN)
|
||||||
{
|
{
|
||||||
// No more frames.
|
// No more frames.
|
||||||
|
@ -125,6 +124,27 @@ void SocketProcessor(const std::shared_ptr<LOOLWebSocket>& ws,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
int size = 0;
|
||||||
|
Poco::StringTokenizer tokens(firstLine, " ", Poco::StringTokenizer::TOK_IGNORE_EMPTY | Poco::StringTokenizer::TOK_TRIM);
|
||||||
|
// Check if it is a "nextmessage:" and in that case read the large
|
||||||
|
// follow-up message separately, and handle that only.
|
||||||
|
if (tokens.count() == 2 && tokens[0] == "nextmessage:" &&
|
||||||
|
LOOLProtocol::getTokenInteger(tokens[1], "size", size) && size > 0)
|
||||||
|
{
|
||||||
|
LOG_TRC("SocketProcessor [" << name << "]: Getting large message of " << size << " bytes.");
|
||||||
|
if (size > MAX_MESSAGE_SIZE)
|
||||||
|
{
|
||||||
|
LOG_ERR("SocketProcessor [" << name << "]: Large-message size (" << size << ") over limit or invalid.");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
payload.resize(size);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)
|
if (n <= 0 || (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_CLOSE)
|
||||||
{
|
{
|
||||||
|
|
|
@ -108,39 +108,6 @@ public:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Wrapper for Poco::Net::WebSocket::receiveFrame() that handles PING frames
|
|
||||||
/// (by replying with a PONG frame) and PONG frames. PONG frames are ignored.
|
|
||||||
/// Should we also factor out the handling of non-final and continuation frames into this?
|
|
||||||
int receiveFrame(Poco::Buffer<char>& buffer, int& flags)
|
|
||||||
{
|
|
||||||
#ifdef ENABLE_DEBUG
|
|
||||||
// Delay receiving the frame
|
|
||||||
std::this_thread::sleep_for(getWebSocketDelay());
|
|
||||||
#endif
|
|
||||||
// Timeout given is in microseconds.
|
|
||||||
static const Poco::Timespan waitTime(POLL_TIMEOUT_MS * 1000);
|
|
||||||
|
|
||||||
while (poll(waitTime, Poco::Net::Socket::SELECT_READ))
|
|
||||||
{
|
|
||||||
const int n = Poco::Net::WebSocket::receiveFrame(buffer, flags);
|
|
||||||
if (n > 0 && (flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PING)
|
|
||||||
{
|
|
||||||
sendFrame(buffer.begin(), n, WebSocket::FRAME_FLAG_FIN | WebSocket::FRAME_OP_PONG);
|
|
||||||
}
|
|
||||||
else if ((flags & WebSocket::FRAME_OP_BITMASK) == WebSocket::FRAME_OP_PONG)
|
|
||||||
{
|
|
||||||
// In case we do send pongs in the future.
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return n;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Wrapper for Poco::Net::WebSocket::sendFrame() that handles large frames.
|
/// Wrapper for Poco::Net::WebSocket::sendFrame() that handles large frames.
|
||||||
int sendFrame(const char* buffer, const int length, const int flags = FRAME_TEXT)
|
int sendFrame(const char* buffer, const int length, const int flags = FRAME_TEXT)
|
||||||
{
|
{
|
||||||
|
@ -150,6 +117,19 @@ public:
|
||||||
#endif
|
#endif
|
||||||
std::unique_lock<std::mutex> lock(_mutex);
|
std::unique_lock<std::mutex> lock(_mutex);
|
||||||
|
|
||||||
|
// Size after which messages will be sent preceded with
|
||||||
|
// 'nextmessage' frame to let the receiver know in advance
|
||||||
|
// the size of larger coming message. All messages up to this
|
||||||
|
// size are considered small messages.
|
||||||
|
constexpr int SMALL_MESSAGE_SIZE = READ_BUFFER_SIZE / 2;
|
||||||
|
|
||||||
|
if (length > SMALL_MESSAGE_SIZE)
|
||||||
|
{
|
||||||
|
const std::string nextmessage = "nextmessage: size=" + std::to_string(length);
|
||||||
|
Poco::Net::WebSocket::sendFrame(nextmessage.data(), nextmessage.size());
|
||||||
|
Log::debug("Message is long, sent " + nextmessage);
|
||||||
|
}
|
||||||
|
|
||||||
const int result = Poco::Net::WebSocket::sendFrame(buffer, length, flags);
|
const int result = Poco::Net::WebSocket::sendFrame(buffer, length, flags);
|
||||||
|
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
|
@ -197,7 +197,6 @@ std::vector<char> getResponseMessage(LOOLWebSocket& ws, const std::string& prefi
|
||||||
int retries = timeoutMs / 500;
|
int retries = timeoutMs / 500;
|
||||||
const Poco::Timespan waitTime(retries ? timeoutMs * 1000 / retries : timeoutMs * 1000);
|
const Poco::Timespan waitTime(retries ? timeoutMs * 1000 / retries : timeoutMs * 1000);
|
||||||
std::vector<char> response;
|
std::vector<char> response;
|
||||||
Poco::Buffer<char> buffer(READ_BUFFER_SIZE);
|
|
||||||
|
|
||||||
bool timedout = false;
|
bool timedout = false;
|
||||||
ws.setReceiveTimeout(0);
|
ws.setReceiveTimeout(0);
|
||||||
|
@ -211,10 +210,9 @@ std::vector<char> getResponseMessage(LOOLWebSocket& ws, const std::string& prefi
|
||||||
timedout = false;
|
timedout = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
response.resize(0);
|
response.resize(READ_BUFFER_SIZE);
|
||||||
buffer.resize(0);
|
int bytes = ws.receiveFrame(response.data(), response.size(), flags);
|
||||||
const int bytes = ws.receiveFrame(buffer, flags);
|
response.resize(bytes >= 0 ? bytes : 0);
|
||||||
response.insert(response.end(), buffer.begin(), buffer.end());
|
|
||||||
std::cerr << name << "Got " << LOOLProtocol::getAbbreviatedFrameDump(response.data(), bytes, flags) << std::endl;
|
std::cerr << name << "Got " << LOOLProtocol::getAbbreviatedFrameDump(response.data(), bytes, flags) << std::endl;
|
||||||
const auto message = LOOLProtocol::getFirstLine(response);
|
const auto message = LOOLProtocol::getFirstLine(response);
|
||||||
if (bytes > 0 && (flags & Poco::Net::WebSocket::FRAME_OP_BITMASK) != Poco::Net::WebSocket::FRAME_OP_CLOSE)
|
if (bytes > 0 && (flags & Poco::Net::WebSocket::FRAME_OP_BITMASK) != Poco::Net::WebSocket::FRAME_OP_CLOSE)
|
||||||
|
@ -223,6 +221,26 @@ std::vector<char> getResponseMessage(LOOLWebSocket& ws, const std::string& prefi
|
||||||
{
|
{
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
else if (LOOLProtocol::matchPrefix("nextmessage", message))
|
||||||
|
{
|
||||||
|
int size = 0;
|
||||||
|
if (LOOLProtocol::getTokenIntegerFromMessage(message, "size", size) && size > 0)
|
||||||
|
{
|
||||||
|
response.resize(size);
|
||||||
|
bytes = ws.receiveFrame(response.data(), response.size(), flags);
|
||||||
|
response.resize(bytes >= 0 ? bytes : 0);
|
||||||
|
std::cerr << name << "Got " << LOOLProtocol::getAbbreviatedFrameDump(response.data(), bytes, flags) << std::endl;
|
||||||
|
if (bytes > 0 &&
|
||||||
|
LOOLProtocol::matchPrefix(prefix, LOOLProtocol::getFirstLine(response)))
|
||||||
|
{
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
response.resize(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bytes <= 0)
|
if (bytes <= 0)
|
||||||
|
|
Loading…
Reference in a new issue