Buffer an unlimited (eek, scary) number of messages per fake socket

Matches the behaviour of real sockets in a real Online server better.
Seems to get rid of the occasional hang problem.

I had already some time ago suspected that such a change would fix
that problem, but unfortunately my first attempt missed one crucial
detail, so it didn't, and I spent days looking for other ways to fix
the problem instead, in vain.

Probably I should add some sanity limit on the number of buffered
messages, though... But if such a sanity limit then would be hit, most
likely much else is totally broken already anyway.

Change-Id: Ice43057814ee5abd85b2935ffaa91765845a515a
This commit is contained in:
Tor Lillqvist 2018-10-23 14:50:23 +03:00
parent 60a3733859
commit decf78ff2e

View file

@ -50,7 +50,7 @@ struct FakeSocketPair
int connectingFd;
bool shutdown[2];
bool readable[2];
std::vector<char> buffer[2];
std::vector<std::vector<char>> buffer[2];
FakeSocketPair()
{
@ -219,11 +219,10 @@ static bool checkForPoll(std::vector<FakeSocketPair>& fds, struct pollfd *pollfd
retval = true;
}
}
// With our trivial single-message buffering, a socket is writable if the peer socket is
// open and not readable.
// With multiple buffers, a socket is always writable unless the peer is closed or shut down
if (pollfds[i].events & POLLOUT)
{
if (fds[pollfds[i].fd/2].fd[N] != -1 && !fds[pollfds[i].fd/2].readable[N] && !fds[pollfds[i].fd/2].shutdown[N])
if (fds[pollfds[i].fd/2].fd[N] != -1 && !fds[pollfds[i].fd/2].shutdown[N])
{
pollfds[i].revents |= POLLOUT;
retval = true;
@ -464,9 +463,9 @@ ssize_t fakeSocketAvailableDataLength(int fd)
return -1;
}
loggingBuffer << "FakeSocket Available data on #" << fd << ": " << pair.buffer[K].size() << flush();
loggingBuffer << "FakeSocket Available data on #" << fd << ": " << pair.buffer[K][0].size() << flush();
return pair.buffer[K].size();
return pair.buffer[K][0].size();
}
ssize_t fakeSocketRead(int fd, void *buf, size_t nbytes)
@ -507,8 +506,9 @@ ssize_t fakeSocketRead(int fd, void *buf, size_t nbytes)
return -1;
}
// These sockets are record-oriented! It won't work to read less than the whole buffer.
ssize_t result = pair.buffer[K].size();
// These sockets are record-oriented. It won't work to read less than the whole record in turn
// to be read.
ssize_t result = pair.buffer[K][0].size();
if (nbytes < result)
{
loggingBuffer << "FakeSocket EAGAIN: Read from #" << fd << ", " << nbytes << (nbytes == 1 ? " byte" : " bytes") << flush();
@ -516,12 +516,16 @@ ssize_t fakeSocketRead(int fd, void *buf, size_t nbytes)
return -1;
}
memmove(buf, pair.buffer[K].data(), result);
pair.buffer[K].resize(0);
if (pair.buffer[K].size() == 0)
return 0;
memmove(buf, pair.buffer[K][0].data(), result);
pair.buffer[K].erase(pair.buffer[K].begin());
// If peer is closed or shut down, we continue to be readable
if (pair.fd[N] == -1 || pair.shutdown[N])
pair.readable[K] = true;
else
else if (pair.buffer[K].size() == 0)
pair.readable[K] = false;
theCV.notify_all();
@ -564,15 +568,8 @@ ssize_t fakeSocketWrite(int fd, const void *buf, size_t nbytes)
return -1;
}
if (pair.readable[N])
{
loggingBuffer << "FakeSocket EAGAIN: Write to #" << fd << ", " << nbytes << (nbytes == 1 ? " byte" : " bytes") << flush();
errno = EAGAIN;
return -1;
}
pair.buffer[N].resize(nbytes);
memmove(pair.buffer[N].data(), buf, nbytes);
pair.buffer[N].emplace_back(std::vector<char>(nbytes));
memmove(pair.buffer[N].back().data(), buf, nbytes);
pair.readable[N] = true;
theCV.notify_all();