Intermediate commit, will not actually do it like this

My comment in 1b0230e4df was misleading, even if
doing this for Unix only, if we had done it as I was first thinking, we would
have needed to pass sockets between processes. But we won't, we will instead
keep all the connections to the clients in the same master process, and pass
on the WebSocketg protocol as such to child processes. That way the child
processes can go away when idle, or crash, without the connections
breaking. Or something.
This commit is contained in:
Tor Lillqvist 2015-03-16 14:37:32 +02:00
parent 50e36c7ca5
commit bd35c1aeb8
5 changed files with 199 additions and 11 deletions

View file

@ -7,11 +7,21 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#include <iostream>
#include <Poco/Net/Socket.h>
#include "MigratorySocket.hpp"
MigratorySocket::MigratorySocket(const Poco::Net::Socket& socket)
using Poco::Net::Socket;
MigratorySocket::MigratorySocket(const Socket& socket) :
Socket(socket)
{
}
MigratorySocket::MigratorySocket(Poco::Net::SocketImpl* pImpl) :
Socket(pImpl)
{
}
@ -19,4 +29,9 @@ MigratorySocket::~MigratorySocket()
{
}
poco_socket_t MigratorySocket::sockfd() const
{
return Socket::sockfd();
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */

View file

@ -19,8 +19,12 @@ public:
/// The argument is the actual socket to be transported.
MigratorySocket(const Poco::Net::Socket& socket);
MigratorySocket(Poco::Net::SocketImpl* pImpl);
virtual ~MigratorySocket();
poco_socket_t sockfd() const;
private:
/// Socket that is to be migrated
Poco::Net::Socket _socket;

View file

@ -7,6 +7,12 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#ifndef _WIN32
#include <sys/socket.h>
#endif
#include <iostream>
#include <Poco/Net/Socket.h>
#include <Poco/Net/StreamSocketImpl.h>
@ -26,7 +32,7 @@ MigratorySocketTransport MigratorySocketTransport::create()
return MigratorySocketTransport(sockets);
}
MigratorySocketTransport::MigratorySocketTransport(poco_socket_t sockets[2]):
MigratorySocketTransport::MigratorySocketTransport(poco_socket_t sockets[2]) :
Socket(new StreamSocketImpl(sockets[0])),
_thisIsParent(true)
{
@ -34,7 +40,7 @@ MigratorySocketTransport::MigratorySocketTransport(poco_socket_t sockets[2]):
_sockets[1] = sockets[1];
}
MigratorySocketTransport::MigratorySocketTransport(std::string string):
MigratorySocketTransport::MigratorySocketTransport(std::string string) :
Socket(new StreamSocketImpl(std::stoi(string))),
_thisIsParent(false)
{
@ -42,20 +48,70 @@ MigratorySocketTransport::MigratorySocketTransport(std::string string):
MigratorySocketTransport::~MigratorySocketTransport()
{
poco_closesocket(_sockets[1]);
}
std::string MigratorySocketTransport::string()
{
return std::to_string(sockfd());
return std::to_string(_sockets[1]);
}
void MigratorySocketTransport::send(MigratorySocket socket)
void MigratorySocketTransport::send(const MigratorySocket& socket)
{
#ifndef _WIN32
struct msghdr msg = {0};
struct cmsghdr *cmsg;
char buf[CMSG_SPACE(sizeof(int))];
int *fdptr;
msg.msg_control = buf;
msg.msg_controllen = sizeof buf;
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int) * 1);
fdptr = (int *) CMSG_DATA(cmsg);
*fdptr = socket.sockfd();
msg.msg_controllen = cmsg->cmsg_len;
std::cout << "socket.sockfd()=" << socket.sockfd() << std::endl;
std::cout << "sockfd()=" << sockfd() << std::endl;
if (sendmsg(sockfd(), &msg, 0) == -1)
{
std::cerr << "sendmsg() failed: " << strerror(errno) << std::endl;
}
#else
#error Add Windows implementation
#endif
}
MigratorySocket MigratorySocketTransport::receive()
{
return MigratorySocket(Socket());
struct msghdr msg = {0};
struct cmsghdr *cmsg;
char buf[CMSG_SPACE(sizeof(int))];
msg.msg_control = buf;
msg.msg_controllen = sizeof buf;
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int) * 1);
msg.msg_controllen = cmsg->cmsg_len;
std::cout << "sockfd()=" << sockfd() << std::endl;
if (recvmsg(sockfd(), &msg, 0) == -1)
{
std::cerr << "recvmsg() failed: " << strerror(errno) << std::endl;
}
return MigratorySocket(new StreamSocketImpl((poco_socket_t)(*((int *) CMSG_DATA(cmsg)))));
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */

View file

@ -31,7 +31,7 @@ public:
std::string string();
/// Parent side: Send a socket to the child
void send(MigratorySocket socket);
void send(const MigratorySocket& socket);
/// Child side: Receive a socket sent from an ancestor process
MigratorySocket receive();

View file

@ -7,9 +7,19 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#include <unistd.h>
#include <iostream>
#include <Poco/Net/ServerSocket.h>
#include <Poco/Net/SocketAddress.h>
#include <Poco/Net/SocketStream.h>
#include <Poco/Net/StreamSocket.h>
#include <Poco/Net/TCPServer.h>
#include <Poco/Net/TCPServerConnection.h>
#include <Poco/Net/TCPServerConnectionFactory.h>
#include <Poco/Process.h>
#include <Poco/Timespan.h>
#include <Poco/Util/Option.h>
#include <Poco/Util/OptionSet.h>
#include <Poco/Util/ServerApplication.h>
@ -17,11 +27,95 @@
#include "MigratorySocket.hpp"
#include "MigratorySocketTransport.hpp"
using Poco::Net::ServerSocket;
using Poco::Net::Socket;
using Poco::Net::SocketAddress;
using Poco::Net::SocketOutputStream;
using Poco::Net::StreamSocket;
using Poco::Net::TCPServer;
using Poco::Net::TCPServerConnection;
using Poco::Process;
using Poco::ProcessHandle;
using Poco::Runnable;
using Poco::Thread;
using Poco::Timespan;
using Poco::Util::Application;
using Poco::Util::Option;
using Poco::Util::OptionSet;
using Poco::Util::ServerApplication;
class EchoConnection : public TCPServerConnection
{
public:
EchoConnection(const StreamSocket& socket, MigratorySocketTransport& transport) :
TCPServerConnection(socket),
_transport(transport)
{
}
void run() override
{
StreamSocket& ss = socket();
SocketOutputStream(ss) << "Connected to thread " << Thread::current()->id() << ". Enter lines to be echoed. End with an empty line." << std::endl;
_transport.send(MigratorySocket(ss));
Thread::sleep(10000);
}
private:
MigratorySocketTransport _transport;
};
class ServerConnectionFactory : public Poco::Net::TCPServerConnectionFactory
{
public:
ServerConnectionFactory(MigratorySocketTransport& transport) :
_transport(transport)
{
};
virtual TCPServerConnection* createConnection(const StreamSocket& socket) override
{
return new EchoConnection(socket, _transport);
}
private:
MigratorySocketTransport _transport;
};
namespace
{
#if 0
while (true)
{
try
{
char buffer[256];
int n = ss.receiveBytes(buffer, sizeof(buffer));
std::cout << "Got " << n << " bytes" << std::endl;
if (n == 2 && buffer[0] == '\r' && buffer[1] == '\n')
break;
ss.sendBytes(buffer, n);
}
catch (Poco::Exception& exc)
{
std::cerr << "ServerConnection: " << exc.displayText() << std::endl;
}
}
#endif
void ChildProcess(MigratorySocketTransport transport)
{
while (true)
{
if (transport.poll(Timespan::DAYS, Socket::SELECT_READ))
{
std::cout << "New socket incoming" << std::endl;
MigratorySocket socket(transport.receive());
}
}
}
}
class SocketTransportTest: public Poco::Util::ServerApplication
{
@ -39,17 +133,36 @@ class SocketTransportTest: public Poco::Util::ServerApplication
{
MigratorySocketTransport transport(MigratorySocketTransport::create());
#if 0
// Let' leave testing this until we need that functionality
Process::Args kidArgs;
kidArgs.push_back("--child");
kidArgs.push_back(transport.string());
ProcessHandle kid = Process::launch("./sockettransporttest", kidArgs);
#endif
Poco::Net::Socket socket;
MigratorySocket migrant(socket);
pid_t pid = fork();
if (pid == 0)
{
MigratorySocketTransport kidEndOfTransport(transport.string());
transport.close(); // Closes the parent end
transport.send(migrant);
migrant.close();
ChildProcess(kidEndOfTransport);
exit(0);
}
TCPServer srv(new ServerConnectionFactory(transport));
std::cout <<
"Server listening on port " << srv.socket().address().port() << "." << std::endl <<
"Please connect to it with one or more telnet sessions." << std::endl;
srv.start();
waitForTerminationRequest();
srv.stop();
return Application::EXIT_OK;
}