Implement basic buffering.

The socket now buffers input, and output, updates its poll record too.
We pass a simple message from client to server and back using lamers HTTP.
Sub-classed ClientSocket to provide a simple message handler.
   not very convinced by templatization here, but made it consistent.
   more ideal to have some virtual socket pieces.
This commit is contained in:
Michael Meeks 2017-02-14 23:45:24 +00:00 committed by Jan Holesovsky
parent 8f1ae36c8a
commit 92364f3ebf
3 changed files with 89 additions and 96 deletions

View file

@ -82,7 +82,8 @@ public:
std::cerr << "try to get response\n";
std::istream& responseStream = session->receiveResponse(response);
std::cerr << "Got response '" << responseStream << "'\n";
std::string result(std::istreambuf_iterator<char>(responseStream), {});
std::cerr << "Got response '" << result << "'\n";
}
catch (const Poco::Exception &e)
{

View file

@ -24,6 +24,34 @@
constexpr int PortNumber = 9191;
class SimpleResponseClient : public ClientSocket
{
public:
SimpleResponseClient(const int fd) :
ClientSocket(fd)
{
}
virtual void handleIncomingMessage() override
{
std::cerr << "message had size " << _inBuffer.size() << "\n";
std::ostringstream oss;
oss << "HTTP/1.1 200 OK\r\n"
<< "Date: Once, Upon a time GMT\r\n" // Mon, 27 Jul 2009 12:28:53 GMT
<< "Server: madeup string (Linux)\r\n"
<< "Content-Length: " << _inBuffer.size() << "\r\n"
<< "Content-Type: text/plain\r\n"
<< "Connection: Closed\r\n"
<< "\r\n"
;
std::string str = oss.str();
_outBuffer.insert(_outBuffer.end(), str.begin(), str.end());
// append the content we got:
_outBuffer.insert(_outBuffer.end(), _inBuffer.begin(), _inBuffer.end());
_inBuffer.clear();
}
};
/// Handles non-blocking socket event polling.
/// Only polls on N-Sockets and invokes callback and
/// doesn't manage buffers or client data.
@ -102,7 +130,7 @@ public:
/// Insert a new socket to be polled.
/// Sockets are removed only when the handler return false.
void insertNewSocket(const std::shared_ptr<ClientSocket>& newSocket)
void insertNewSocket(const std::shared_ptr<T>& newSocket)
{
std::lock_guard<std::mutex> lock(_mutex);
@ -138,8 +166,7 @@ private:
for (size_t i = 0; i < size; ++i)
{
_pollFds[i].fd = _pollSockets[i]->getFD();
//TODO: Get from the socket.
_pollFds[i].events = POLLIN | POLLOUT;
_pollFds[i].events = _pollSockets[i]->getPollEvents();
_pollFds[i].revents = 0;
}
@ -153,10 +180,10 @@ private:
/// main-loop wakeup pipe
int _wakeup[2];
/// The sockets we're controlling
std::vector<std::shared_ptr<ClientSocket>> _pollSockets;
std::vector<std::shared_ptr<T>> _pollSockets;
/// Protects _newSockets
std::mutex _mutex;
std::vector<std::shared_ptr<ClientSocket>> _newSockets;
std::vector<std::shared_ptr<T>> _newSockets;
/// The fds to poll.
std::vector<pollfd> _pollFds;
};
@ -197,39 +224,7 @@ private:
Poco::Net::SocketAddress addr("127.0.0.1", PortNumber);
void client(const int timeoutMs)
{
const auto client = std::make_shared<ClientSocket>();
if (!client->connect(addr, timeoutMs) && errno != EINPROGRESS)
{
const std::string msg = "Failed to call connect. (errno: ";
throw std::runtime_error(msg + std::strerror(errno) + ")");
}
std::cout << "Connected " << client->getFD() << std::endl;
client->send("1", 1);
int sent = 1;
while (sent > 0 && client->pollRead(5000))
{
char buf[1024];
const int recv = client->recv(buf, sizeof(buf));
if (recv <= 0)
{
perror("recv");
break;
}
else
{
const std::string msg = std::string(buf, recv);
const int num = stoi(msg);
const std::string new_msg = std::to_string(num + 1);
sent = client->send(new_msg.data(), new_msg.size());
}
}
}
void server(SocketPoll<ClientSocket>& poller)
void server(SocketPoll<SimpleResponseClient>& poller)
{
// Start server.
auto server = std::make_shared<ServerSocket>();
@ -250,7 +245,7 @@ void server(SocketPoll<ClientSocket>& poller)
{
if (server->pollRead(30000))
{
std::shared_ptr<ClientSocket> clientSocket = server->accept();
std::shared_ptr<SimpleResponseClient> clientSocket = server->accept<SimpleResponseClient>();
if (!clientSocket)
{
const std::string msg = "Failed to accept. (errno: ";
@ -264,44 +259,21 @@ void server(SocketPoll<ClientSocket>& poller)
}
/// Poll client sockets and do IO.
void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop)
void pollAndComm(SocketPoll<SimpleResponseClient>& poller, std::atomic<bool>& stop)
{
while (!stop)
{
poller.poll(5000, [](const std::shared_ptr<ClientSocket>& socket, const int events)
poller.poll(5000, [](const std::shared_ptr<SimpleResponseClient>& socket, const int events)
{
if (events & POLLIN)
{
char buf[1024];
const int recv = socket->recv(buf, sizeof(buf));
if (recv <= 0)
{
perror("recv");
return false;
}
socket->readIncomingData();
if (events & POLLOUT)
{
const std::string msg = std::string(buf, recv);
const int num = stoi(msg);
if ((num % (1<<16)) == 1)
{
std::cout << "Client #" << socket->getFD() << ": " << msg << std::endl;
}
const std::string new_msg = std::to_string(num + 1);
const int sent = socket->send(new_msg.data(), new_msg.size());
if (sent != static_cast<int>(new_msg.size()))
{
perror("send");
return false;
}
}
else
{
// Normally we'd buffer the response, but for now...
std::cerr << "Client #" << socket->getFD()
<< ": ERROR - socket not ready for write." << std::endl;
}
if (events & POLLOUT)
socket->writeOutgoingData();
if (events & (POLLHUP | POLLERR | POLLNVAL))
{
// FIXME - close and remove the socket ...
}
return true;
@ -309,17 +281,10 @@ void pollAndComm(SocketPoll<ClientSocket>& poller, std::atomic<bool>& stop)
}
}
int main(int argc, const char**)
int main(int, const char**)
{
if (argc > 1)
{
// We are now the client application.
client(0);
return 0;
}
// Used to poll client sockets.
SocketPoll<ClientSocket> poller;
SocketPoll<SimpleResponseClient> poller;
// Start the client polling thread.
Thread threadPoll([&poller](std::atomic<bool>& stop)

View file

@ -223,25 +223,51 @@ public:
// Now check if we connected, not, or not yet.
return (getError() == 0 || errno == EINPROGRESS);
}
/// Send data to our peer.
/// Returns the number of bytes sent, -1 on error.
int send(const void* buf, const size_t len)
protected:
std::vector< unsigned char > _inBuffer;
std::vector< unsigned char > _outBuffer;
public:
void readIncomingData()
{
// Don't SIGPIPE when the other end closes.
const int rc = ::send(getFD(), buf, len, MSG_NOSIGNAL);
return rc;
ssize_t len;
unsigned char buf[4096];
do {
len = ::read(getFD(), buf, sizeof(buf));
} while (len < 0 && errno == EINTR);
if (len > 0)
{
assert (len < ssize_t(sizeof(buf)));
_inBuffer.insert(_inBuffer.end(), &buf[0], &buf[len]);
handleIncomingMessage();
}
// else poll will handle errors.
}
/// Receive data from our peer.
/// Returns the number of bytes received, -1 on error,
/// and 0 when the peer has performed an orderly shutdown.
int recv(void* buf, const size_t len)
void writeOutgoingData()
{
const int rc = ::recv(getFD(), buf, len, 0);
return rc;
assert (_outBuffer.size() > 0);
ssize_t len;
do {
len = ::write(getFD(), &_outBuffer[0], _outBuffer.size());
} while (len < 0 && errno == EINTR);
if (len > 0)
{
_outBuffer.erase(_outBuffer.begin(),
_outBuffer.begin() + len);
}
// else poll will handle errors
}
int getPollEvents()
{
int pollFor = POLLIN | POLLPRI;
if (_outBuffer.size() > 0)
pollFor |= POLLOUT;
return pollFor;
}
virtual void handleIncomingMessage() = 0;
protected:
ClientSocket(const int fd) :
Socket(fd)
@ -284,12 +310,13 @@ public:
/// Accepts an incoming connection (Servers only).
/// Does not retry on error.
/// Returns a valid Socket shared_ptr on success only.
std::shared_ptr<ClientSocket> accept()
template <typename T>
std::shared_ptr<T> accept()
{
// Accept a connection (if any) and set it to non-blocking.
// We don't care about the client's address, so ignored.
const int rc = ::accept4(getFD(), nullptr, nullptr, SOCK_NONBLOCK);
return std::shared_ptr<ClientSocket>(rc != -1 ? new ClientSocket(rc) : nullptr);
return std::shared_ptr<T>(rc != -1 ? new T(rc) : nullptr);
}
};