loolwsd: wait until bridge is completed

This commit is contained in:
Henry Castro 2016-04-09 11:54:22 -04:00
parent 21e6ac23ea
commit 9ae7fa5b9b
3 changed files with 81 additions and 52 deletions

View file

@ -151,6 +151,12 @@ static std::mutex newChildrenMutex;
static std::condition_variable newChildrenCV;
static std::map<std::string, std::shared_ptr<DocumentBroker>> docBrokers;
static std::mutex docBrokersMutex;
// Sessions to pre-spawned child processes that have connected but are not yet assigned a
// document to work on.
static std::mutex AvailableChildSessionMutex;
static std::condition_variable AvailableChildSessionCV;
static std::map<std::string, std::shared_ptr<MasterProcessSession>> AvailableChildSessions;
static void forkChildren(const int number)
{
@ -244,6 +250,51 @@ class ClientRequestHandler: public HTTPRequestHandler
{
private:
static bool waitBridgeCompleted(const std::shared_ptr<MasterProcessSession>& clientSession,
const std::shared_ptr<DocumentBroker>& docBroker)
{
int retries = 5;
bool isFound = false;
// Wait until the client has connected with a prison socket.
std::shared_ptr<MasterProcessSession> prisonSession;
std::unique_lock<std::mutex> lock(AvailableChildSessionMutex);
Log::debug() << "Waiting for client session [" << clientSession->getId() << "] to connect." << Log::end;
while (retries-- && !isFound)
{
AvailableChildSessionCV.wait_for(
lock,
std::chrono::milliseconds(3000),
[&isFound, &clientSession]
{
return (isFound = AvailableChildSessions.find(clientSession->getId()) != AvailableChildSessions.end());
});
if (!isFound)
{
Log::info() << "Retrying client permission... " << retries << Log::end;
// request again new URL session
const std::string message = "request " + clientSession->getId() + " " + docBroker->getDocKey() + '\n';
Log::trace("MasterToBroker: " + message.substr(0, message.length()-1));
IoUtil::writeFIFO(LOOLWSD::ForKitWritePipe, message);
}
}
if (isFound)
{
Log::debug("Waiting child session permission, done!");
prisonSession = AvailableChildSessions[clientSession->getId()];
AvailableChildSessions.erase(clientSession->getId());
clientSession->setPeer(prisonSession);
prisonSession->setPeer(clientSession);
Log::debug("Connected " + clientSession->getName() + " - " + prisonSession->getName() + ".");
}
return isFound;
}
static void handlePostRequest(HTTPServerRequest& request, HTTPServerResponse& response, const std::string& id)
{
Log::info("Post request: [" + request.getURI() + "]");
@ -295,6 +346,18 @@ private:
docBroker->incSessions();
lock.unlock();
if (!waitBridgeCompleted(session, docBroker))
{
Log::error(session->getName() + ": Failed to connect to lokit child.");
// Let the client know we can't serve now.
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
response.setContentLength(0);
response.send();
return;
}
// Now the bridge beetween the client and kit process is connected
// Let messages flow
std::string encodedFrom;
URI::encode(docBroker->getPublicUri().getPath(), "", encodedFrom);
const std::string load = "load url=" + encodedFrom;
@ -495,6 +558,18 @@ private:
if (wsSessionsCount == 1)
session->setEditLock(true);
if (!waitBridgeCompleted(session, docBroker))
{
Log::error(session->getName() + ": Failed to connect to child. Client cannot serve now.");
// Let the client know we can't serve now.
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE);
response.setContentLength(0);
response.send();
return;
}
// Now the bridge beetween the client and kit process is connected
// Let messages flow
QueueHandler handler(queue, session, "wsd_queue_" + session->getId());
Thread queueHandlerThread;
@ -747,14 +822,14 @@ public:
auto ws = std::make_shared<WebSocket>(request, response);
auto session = std::make_shared<MasterProcessSession>(sessionId, LOOLSession::Kind::ToPrisoner, ws, docBroker, nullptr);
std::unique_lock<std::mutex> lock(MasterProcessSession::AvailableChildSessionMutex);
MasterProcessSession::AvailableChildSessions.emplace(sessionId, session);
std::unique_lock<std::mutex> lock(AvailableChildSessionMutex);
AvailableChildSessions.emplace(sessionId, session);
Log::info() << " mapped " << session << " jailId=" << jailId << ", id=" << sessionId
<< " into _availableChildSessions, size=" << MasterProcessSession::AvailableChildSessions.size() << Log::end;
<< " into _availableChildSessions, size=" << AvailableChildSessions.size() << Log::end;
lock.unlock();
MasterProcessSession::AvailableChildSessionCV.notify_one();
AvailableChildSessionCV.notify_one();
const auto uri = request.getURI();
std::ostringstream message;

View file

@ -29,10 +29,6 @@ using namespace LOOLProtocol;
using Poco::Path;
using Poco::StringTokenizer;
std::map<std::string, std::shared_ptr<MasterProcessSession>> MasterProcessSession::AvailableChildSessions;
std::mutex MasterProcessSession::AvailableChildSessionMutex;
std::condition_variable MasterProcessSession::AvailableChildSessionCV;
MasterProcessSession::MasterProcessSession(const std::string& id,
const Kind kind,
std::shared_ptr<Poco::Net::WebSocket> ws,
@ -761,50 +757,6 @@ void MasterProcessSession::sendCombinedTiles(const char* /*buffer*/, int /*lengt
void MasterProcessSession::dispatchChild()
{
int retries = 3;
bool isFound = false;
// Wait until the child has connected with Master.
std::shared_ptr<MasterProcessSession> childSession;
std::unique_lock<std::mutex> lock(AvailableChildSessionMutex);
Log::debug() << "Waiting for child session [" << getId() << "] to connect." << Log::end;
while (retries-- && !isFound)
{
AvailableChildSessionCV.wait_for(
lock,
std::chrono::milliseconds(3000),
[&isFound, this]
{
return (isFound = AvailableChildSessions.find(getId()) != AvailableChildSessions.end());
});
if (!isFound)
{
Log::info() << "Retrying child permission... " << retries << Log::end;
// request again new URL session
const std::string message = "request " + getId() + " " + _docBroker->getDocKey() + '\n';
Log::trace("MasterToBroker: " + message.substr(0, message.length()-1));
IoUtil::writeFIFO(LOOLWSD::ForKitWritePipe, message);
}
}
if (!isFound)
{
Log::error(getName() + ": Failed to connect to child. Shutting down socket.");
IoUtil::shutdownWebSocket(_ws);
throw std::runtime_error("Failed to connect to child.");
}
Log::debug("Waiting child session permission, done!");
childSession = AvailableChildSessions[getId()];
AvailableChildSessions.erase(getId());
_peer = childSession;
childSession->_peer = shared_from_this();
childSession->_docBroker = _docBroker;
Log::debug("Connected " + getName() + " - " + childSession->getName() + ".");
std::ostringstream oss;
oss << "load";
oss << " url=" << _docBroker->getPublicUri().toString();

View file

@ -49,6 +49,8 @@ class MasterProcessSession final : public LOOLSession, public std::enable_shared
std::shared_ptr<BasicTileQueue> getQueue() const { return _queue; }
void setPeer(const std::shared_ptr<MasterProcessSession>& peer) { _peer = peer; }
void setEditLock(const bool value) { _bEditLock = value; }
bool isEditLocked() const { return _bEditLock; }