make Socket use Buffer for input buffer too

SteamSocket::eraseFirstInputBytes() removes from the beginning
of std::vector, which is generally slow. If the buffer becomes
too big, which it may under a load, then the function will get
slow, which in turn will likely lead to the buffer getting even
bigger because of accumulated backlog.

The Buffer class is optimized for removal at the beginning,
so use it instead of std::vector, including some API additions
for it to be an in-place replacement where it's used.

Signed-off-by: Luboš Luňák <l.lunak@collabora.com>
Change-Id: I4cf7ec56c908c7d3df391dc3f8e230ad32abb162
This commit is contained in:
Luboš Luňák 2021-10-25 18:05:54 +02:00 committed by Luboš Luňák
parent 81ea2266dd
commit 897c5df270
7 changed files with 55 additions and 18 deletions

View file

@ -30,6 +30,9 @@ public:
{
}
typedef std::vector<char>::iterator iterator;
typedef std::vector<char>::const_iterator const_iterator;
std::size_t size() const { return _size; }
bool empty() const { return _size == 0; }
@ -93,6 +96,43 @@ public:
if (_buffer.size() > 0)
Util::dumpHex(os, _buffer, legend, prefix);
}
// various std::vector API compatibility functions
void clear()
{
_buffer.clear();
_offset = 0;
_size = 0;
}
iterator begin() { return _buffer.begin() + _offset; }
const_iterator begin() const { return _buffer.begin() + _offset; }
iterator end() { return _buffer.end(); }
const_iterator end() const { return _buffer.end(); }
char operator[](int index) const { return _buffer[_offset + index]; }
char& operator[](int index) { return _buffer[_offset + index]; }
const char* data() const { return _buffer.data() + _offset; }
char* data() { return _buffer.data() + _offset; }
iterator erase(iterator first, iterator last)
{
if (first == begin())
{
eraseFirst(last - begin());
return begin();
}
iterator ret = _buffer.erase(first, last);
_size = _buffer.size() - _offset;
return ret;
}
};
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */

View file

@ -1230,14 +1230,14 @@ private:
LOG_TRC('#' << socket->getFD() << " handleIncomingMessage.");
bool close = false;
std::vector<char>& data = socket->getInBuffer();
Buffer& data = socket->getInBuffer();
// Consume the incoming data by parsing and processing the body.
const int64_t read = _response->readData(data.data(), data.size());
if (read > 0)
{
// Remove consumed data.
data.erase(data.begin(), data.begin() + read);
data.eraseFirst(read);
close = !isConnected();
}
else if (read < 0)

View file

@ -1126,7 +1126,7 @@ public:
{
assert (len <= ssize_t(sizeof(buf)));
_bytesRecvd += len;
_inBuffer.insert(_inBuffer.end(), &buf[0], &buf[len]);
_inBuffer.append(&buf[0], len);
}
// else poll will handle errors.
}
@ -1149,7 +1149,7 @@ public:
assert(len == available);
_bytesRecvd += len;
assert(_inBuffer.size() == 0);
_inBuffer.insert(_inBuffer.end(), buf.data(), buf.data() + len);
_inBuffer.append(buf.data(), len);
}
#endif
@ -1209,7 +1209,7 @@ public:
if (toErase < count)
LOG_ERR('#' << getFD() << ": attempted to remove: " << count << " which is > size: " << _inBuffer.size() << " clamped to " << toErase);
if (toErase > 0)
_inBuffer.erase(_inBuffer.begin(), _inBuffer.begin() + count);
_inBuffer.eraseFirst(count);
}
/// Compacts chunk headers away leaving just the data we want
@ -1230,10 +1230,7 @@ public:
recv = _bytesRecvd;
}
std::vector<char>& getInBuffer()
{
return _inBuffer;
}
Buffer& getInBuffer() { return _inBuffer; }
Buffer& getOutBuffer()
{
@ -1513,7 +1510,7 @@ private:
/// Client handling the actual data.
std::shared_ptr<ProtocolHandlerInterface> _socketHandler;
std::vector<char> _inBuffer;
Buffer _inBuffer;
Buffer _outBuffer;
uint64_t _bytesSent;

View file

@ -326,7 +326,7 @@ private:
std::vector<char> ctrlPayload;
readPayload(data, payloadLen, mask, ctrlPayload);
socket->getInBuffer().erase(socket->getInBuffer().begin(), socket->getInBuffer().begin() + headerLen + payloadLen);
socket->getInBuffer().eraseFirst(headerLen + payloadLen);
LOG_TRC('#' << socket->getFD() << ": Incoming WebSocket frame code " << static_cast<unsigned>(code) <<
", fin? " << fin << ", mask? " << hasMask << ", payload length: " << payloadLen <<
", residual socket data: " << socket->getInBuffer().size() << " bytes.");
@ -877,7 +877,7 @@ protected:
assert(socket && "socket must be valid");
assert(_isClient && "Upgrade handshakes are finished by clients.");
std::vector<char>& data = socket->getInBuffer();
Buffer& data = socket->getInBuffer();
LOG_TRC('#' << socket->getFD() << " Incoming client websocket upgrade response: "
<< std::string(data.data(), data.size()));

View file

@ -45,7 +45,7 @@ private:
LOG_TRC('#' << socket->getFD() << " handleIncomingMessage.");
std::vector<char>& data = socket->getInBuffer();
Buffer& data = socket->getInBuffer();
LOG_TRC('#' << socket->getFD() << " handleIncomingMessage: buffer has ["
<< std::string(data.data(), data.size()));
@ -69,7 +69,7 @@ private:
assert(read > 0 && "Must have read some data!");
// Remove consumed data.
data.erase(data.begin(), data.begin() + read);
data.eraseFirst(read);
LOG_TRC('#' << socket->getFD() << " handleIncomingMessage: removed " << read
<< " bytes to have " << data.size() << " in the buffer.");

View file

@ -77,7 +77,7 @@ private:
return;
}
std::vector<char>& in = socket->getInBuffer();
Buffer& in = socket->getInBuffer();
LOG_TRC('#' << socket->getFD() << " handling incoming " << in.size() << " bytes.");
// Find the end of the header, if any.

View file

@ -106,7 +106,7 @@ void DocumentBroker::handleProxyRequest(
bool ProxyProtocolHandler::parseEmitIncoming(
const std::shared_ptr<StreamSocket> &socket)
{
std::vector<char> &in = socket->getInBuffer();
Buffer& in = socket->getInBuffer();
#if 0 // protocol debugging.
std::stringstream oss;
@ -150,14 +150,14 @@ bool ProxyProtocolHandler::parseEmitIncoming(
// far from efficient:
std::vector<char> data;
data.insert(data.begin(), in.begin(), in.begin() + len + 1);
in.erase(in.begin(), in.begin() + len);
in.eraseFirst(len);
if (in.size() < 1 || in[0] != '\n')
{
LOG_ERR("Missing final newline");
return false;
}
in.erase(in.begin(), in.begin() + 1);
in.eraseFirst(1);
if (serial != _inSerial + 1)
LOG_ERR("Serial mismatch " << serial << " vs. " << (_inSerial + 1));