From 9ae7fa5b9b26ef4fa2bee1d48362ea384787ea65 Mon Sep 17 00:00:00 2001 From: Henry Castro Date: Sat, 9 Apr 2016 11:54:22 -0400 Subject: [PATCH] loolwsd: wait until bridge is completed --- loolwsd/LOOLWSD.cpp | 83 ++++++++++++++++++++++++++++++-- loolwsd/MasterProcessSession.cpp | 48 ------------------ loolwsd/MasterProcessSession.hpp | 2 + 3 files changed, 81 insertions(+), 52 deletions(-) diff --git a/loolwsd/LOOLWSD.cpp b/loolwsd/LOOLWSD.cpp index 9e32cbdd7..34291f87d 100644 --- a/loolwsd/LOOLWSD.cpp +++ b/loolwsd/LOOLWSD.cpp @@ -151,6 +151,12 @@ static std::mutex newChildrenMutex; static std::condition_variable newChildrenCV; static std::map> docBrokers; static std::mutex docBrokersMutex; +// Sessions to pre-spawned child processes that have connected but are not yet assigned a +// document to work on. +static std::mutex AvailableChildSessionMutex; +static std::condition_variable AvailableChildSessionCV; +static std::map> AvailableChildSessions; + static void forkChildren(const int number) { @@ -244,6 +250,51 @@ class ClientRequestHandler: public HTTPRequestHandler { private: + static bool waitBridgeCompleted(const std::shared_ptr& clientSession, + const std::shared_ptr& docBroker) + { + int retries = 5; + bool isFound = false; + + // Wait until the client has connected with a prison socket. + std::shared_ptr prisonSession; + std::unique_lock lock(AvailableChildSessionMutex); + + Log::debug() << "Waiting for client session [" << clientSession->getId() << "] to connect." << Log::end; + while (retries-- && !isFound) + { + AvailableChildSessionCV.wait_for( + lock, + std::chrono::milliseconds(3000), + [&isFound, &clientSession] + { + return (isFound = AvailableChildSessions.find(clientSession->getId()) != AvailableChildSessions.end()); + }); + + if (!isFound) + { + Log::info() << "Retrying client permission... " << retries << Log::end; + // request again new URL session + const std::string message = "request " + clientSession->getId() + " " + docBroker->getDocKey() + '\n'; + Log::trace("MasterToBroker: " + message.substr(0, message.length()-1)); + IoUtil::writeFIFO(LOOLWSD::ForKitWritePipe, message); + } + } + + if (isFound) + { + Log::debug("Waiting child session permission, done!"); + prisonSession = AvailableChildSessions[clientSession->getId()]; + AvailableChildSessions.erase(clientSession->getId()); + + clientSession->setPeer(prisonSession); + prisonSession->setPeer(clientSession); + Log::debug("Connected " + clientSession->getName() + " - " + prisonSession->getName() + "."); + } + + return isFound; + } + static void handlePostRequest(HTTPServerRequest& request, HTTPServerResponse& response, const std::string& id) { Log::info("Post request: [" + request.getURI() + "]"); @@ -295,6 +346,18 @@ private: docBroker->incSessions(); lock.unlock(); + if (!waitBridgeCompleted(session, docBroker)) + { + Log::error(session->getName() + ": Failed to connect to lokit child."); + // Let the client know we can't serve now. + response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE); + response.setContentLength(0); + response.send(); + return; + } + // Now the bridge beetween the client and kit process is connected + // Let messages flow + std::string encodedFrom; URI::encode(docBroker->getPublicUri().getPath(), "", encodedFrom); const std::string load = "load url=" + encodedFrom; @@ -495,6 +558,18 @@ private: if (wsSessionsCount == 1) session->setEditLock(true); + if (!waitBridgeCompleted(session, docBroker)) + { + Log::error(session->getName() + ": Failed to connect to child. Client cannot serve now."); + // Let the client know we can't serve now. + response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE); + response.setContentLength(0); + response.send(); + return; + } + // Now the bridge beetween the client and kit process is connected + // Let messages flow + QueueHandler handler(queue, session, "wsd_queue_" + session->getId()); Thread queueHandlerThread; @@ -747,14 +822,14 @@ public: auto ws = std::make_shared(request, response); auto session = std::make_shared(sessionId, LOOLSession::Kind::ToPrisoner, ws, docBroker, nullptr); - std::unique_lock lock(MasterProcessSession::AvailableChildSessionMutex); - MasterProcessSession::AvailableChildSessions.emplace(sessionId, session); + std::unique_lock lock(AvailableChildSessionMutex); + AvailableChildSessions.emplace(sessionId, session); Log::info() << " mapped " << session << " jailId=" << jailId << ", id=" << sessionId - << " into _availableChildSessions, size=" << MasterProcessSession::AvailableChildSessions.size() << Log::end; + << " into _availableChildSessions, size=" << AvailableChildSessions.size() << Log::end; lock.unlock(); - MasterProcessSession::AvailableChildSessionCV.notify_one(); + AvailableChildSessionCV.notify_one(); const auto uri = request.getURI(); std::ostringstream message; diff --git a/loolwsd/MasterProcessSession.cpp b/loolwsd/MasterProcessSession.cpp index 8339ab1a4..71ffe79e3 100644 --- a/loolwsd/MasterProcessSession.cpp +++ b/loolwsd/MasterProcessSession.cpp @@ -29,10 +29,6 @@ using namespace LOOLProtocol; using Poco::Path; using Poco::StringTokenizer; -std::map> MasterProcessSession::AvailableChildSessions; -std::mutex MasterProcessSession::AvailableChildSessionMutex; -std::condition_variable MasterProcessSession::AvailableChildSessionCV; - MasterProcessSession::MasterProcessSession(const std::string& id, const Kind kind, std::shared_ptr ws, @@ -761,50 +757,6 @@ void MasterProcessSession::sendCombinedTiles(const char* /*buffer*/, int /*lengt void MasterProcessSession::dispatchChild() { - int retries = 3; - bool isFound = false; - - // Wait until the child has connected with Master. - std::shared_ptr childSession; - std::unique_lock lock(AvailableChildSessionMutex); - - Log::debug() << "Waiting for child session [" << getId() << "] to connect." << Log::end; - while (retries-- && !isFound) - { - AvailableChildSessionCV.wait_for( - lock, - std::chrono::milliseconds(3000), - [&isFound, this] - { - return (isFound = AvailableChildSessions.find(getId()) != AvailableChildSessions.end()); - }); - - if (!isFound) - { - Log::info() << "Retrying child permission... " << retries << Log::end; - // request again new URL session - const std::string message = "request " + getId() + " " + _docBroker->getDocKey() + '\n'; - Log::trace("MasterToBroker: " + message.substr(0, message.length()-1)); - IoUtil::writeFIFO(LOOLWSD::ForKitWritePipe, message); - } - } - - if (!isFound) - { - Log::error(getName() + ": Failed to connect to child. Shutting down socket."); - IoUtil::shutdownWebSocket(_ws); - throw std::runtime_error("Failed to connect to child."); - } - - Log::debug("Waiting child session permission, done!"); - childSession = AvailableChildSessions[getId()]; - AvailableChildSessions.erase(getId()); - - _peer = childSession; - childSession->_peer = shared_from_this(); - childSession->_docBroker = _docBroker; - Log::debug("Connected " + getName() + " - " + childSession->getName() + "."); - std::ostringstream oss; oss << "load"; oss << " url=" << _docBroker->getPublicUri().toString(); diff --git a/loolwsd/MasterProcessSession.hpp b/loolwsd/MasterProcessSession.hpp index f2b5d22e3..e1422fc58 100644 --- a/loolwsd/MasterProcessSession.hpp +++ b/loolwsd/MasterProcessSession.hpp @@ -49,6 +49,8 @@ class MasterProcessSession final : public LOOLSession, public std::enable_shared std::shared_ptr getQueue() const { return _queue; } + void setPeer(const std::shared_ptr& peer) { _peer = peer; } + void setEditLock(const bool value) { _bEditLock = value; } bool isEditLocked() const { return _bEditLock; }