loolwsd: DocumentStoreManager -> DocumentBroker
Renamed DocumentStoreManager to DocumentBroker and restructured the handshake process. Currently, at first client connection to a given doc a DocumentBroker is created to serve as the clearing house of all client-side activities on the document. Prime goals is loading and saving of the document, but also to guarantee race-free management of the doc. Each doc has a unique DocKey based on the URL (the path, without queries). This DocKey is used as key into a map of all DocumentBrokers. The latter is shared among MasterProcessSession instances. Change-Id: I569f2d235676e88ddc690147f3cb89faa60388c2 Reviewed-on: https://gerrit.libreoffice.org/23216 Reviewed-by: Ashod Nakashian <ashnakash@gmail.com> Tested-by: Ashod Nakashian <ashnakash@gmail.com>
This commit is contained in:
parent
8ef45a975e
commit
bb16272e11
5 changed files with 125 additions and 70 deletions
|
@ -7,8 +7,8 @@
|
|||
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
*/
|
||||
|
||||
#ifndef INCLUDED_DOCUMENTSTOREMANAGER_HPP
|
||||
#define INCLUDED_DOCUMENTSTOREMANAGER_HPP
|
||||
#ifndef INCLUDED_DOCUMENTBROKER_HPP
|
||||
#define INCLUDED_DOCUMENTBROKER_HPP
|
||||
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
|
@ -18,9 +18,11 @@
|
|||
|
||||
#include "Storage.hpp"
|
||||
|
||||
/// A DocumentStoreManager as mananged by us.
|
||||
/// DocumentBroker is responsible for setting up a document
|
||||
/// in jail and brokering loading it from Storage
|
||||
/// and saving it back.
|
||||
/// Contains URI, physical path, etc.
|
||||
class DocumentStoreManager
|
||||
class DocumentBroker
|
||||
{
|
||||
public:
|
||||
|
||||
|
@ -55,9 +57,7 @@ public:
|
|||
}
|
||||
|
||||
static
|
||||
std::shared_ptr<DocumentStoreManager> create(const std::string& uri,
|
||||
const std::string& jailRoot,
|
||||
const std::string& jailId)
|
||||
std::shared_ptr<DocumentBroker> create(const std::string& uri)
|
||||
{
|
||||
std::string decodedUri;
|
||||
Poco::URI::decode(uri, decodedUri);
|
||||
|
@ -75,17 +75,39 @@ public:
|
|||
throw std::runtime_error("Invalid URI.");
|
||||
}
|
||||
|
||||
return create(uriPublic, jailRoot, jailId);
|
||||
return create(uriPublic);
|
||||
}
|
||||
|
||||
static
|
||||
std::shared_ptr<DocumentStoreManager> create(
|
||||
const Poco::URI& uriPublic,
|
||||
const std::string& jailRoot,
|
||||
const std::string& jailId)
|
||||
std::shared_ptr<DocumentBroker> create(const Poco::URI& uriPublic)
|
||||
{
|
||||
Log::info("Creating DocumentStoreManager with uri: " + uriPublic.toString() +
|
||||
", jailRoot: " + jailRoot + ", jailId: " + jailId);
|
||||
Log::info("Creating DocumentBroker for uri: " + uriPublic.toString());
|
||||
|
||||
std::string docKey;
|
||||
Poco::URI::encode(uriPublic.getPath(), "", docKey);
|
||||
|
||||
return std::shared_ptr<DocumentBroker>(new DocumentBroker(uriPublic, docKey));
|
||||
}
|
||||
|
||||
~DocumentBroker()
|
||||
{
|
||||
Log::info("~DocumentBroker [" + _uriPublic.toString() + "] destroyed.");
|
||||
}
|
||||
|
||||
/// Loads a document from the public URI into the jail.
|
||||
bool load(const std::string& jailRoot, const std::string& jailId)
|
||||
{
|
||||
Log::debug("Loading from URI: " + _uriPublic.toString());
|
||||
|
||||
std::unique_lock<std::mutex> lock(_mutex);
|
||||
|
||||
if (_storage)
|
||||
{
|
||||
// Already loaded. Just return.
|
||||
return true;
|
||||
}
|
||||
|
||||
_jailId = jailId;
|
||||
|
||||
// The URL is the publicly visible one, not visible in the chroot jail.
|
||||
// We need to map it to a jailed path and copy the file there.
|
||||
|
@ -95,64 +117,53 @@ public:
|
|||
|
||||
Log::info("jailPath: " + jailPath.toString() + ", jailRoot: " + jailRoot);
|
||||
|
||||
auto uriJailed = uriPublic;
|
||||
std::unique_ptr<StorageBase> storage;
|
||||
if (uriPublic.isRelative() || uriPublic.getScheme() == "file")
|
||||
if (_uriPublic.isRelative() || _uriPublic.getScheme() == "file")
|
||||
{
|
||||
Log::info("Public URI [" + uriPublic.toString() + "] is a file.");
|
||||
storage.reset(new LocalStorage(jailRoot, jailPath.toString(), uriPublic.getPath()));
|
||||
const auto localPath = storage->loadStorageFileToLocal();
|
||||
uriJailed = Poco::URI(Poco::URI("file://"), localPath);
|
||||
Log::info("Public URI [" + _uriPublic.toString() + "] is a file.");
|
||||
_storage.reset(new LocalStorage(jailRoot, jailPath.toString(), _uriPublic.getPath()));
|
||||
}
|
||||
else
|
||||
{
|
||||
Log::info("Public URI [" + uriPublic.toString() +
|
||||
Log::info("Public URI [" + _uriPublic.toString() +
|
||||
"] assuming cloud storage.");
|
||||
//TODO: Configure the storage to use. For now, assume it's WOPI.
|
||||
storage.reset(new WopiStorage(jailRoot, jailPath.toString(), uriPublic.toString()));
|
||||
const auto localPath = storage->loadStorageFileToLocal();
|
||||
uriJailed = Poco::URI(Poco::URI("file://"), localPath);
|
||||
_storage.reset(new WopiStorage(jailRoot, jailPath.toString(), _uriPublic.toString()));
|
||||
}
|
||||
|
||||
auto document = std::shared_ptr<DocumentStoreManager>(new DocumentStoreManager(uriPublic, uriJailed, jailId, storage));
|
||||
|
||||
return document;
|
||||
}
|
||||
|
||||
~DocumentStoreManager()
|
||||
{
|
||||
Log::info("~DocumentStoreManager [" + _uriPublic.toString() + "] destroyed.");
|
||||
const auto localPath = _storage->loadStorageFileToLocal();
|
||||
_uriJailed = Poco::URI(Poco::URI("file://"), localPath);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool save()
|
||||
{
|
||||
Log::debug("Saving to URI: " + _uriPublic.toString());
|
||||
|
||||
assert(_storage);
|
||||
return _storage->saveLocalFileToStorage();
|
||||
}
|
||||
|
||||
Poco::URI getPublicUri() const { return _uriPublic; }
|
||||
Poco::URI getJailedUri() const { return _uriJailed; }
|
||||
std::string getJailId() const { return _jailId; }
|
||||
const std::string& getJailId() const { return _jailId; }
|
||||
const std::string& getDocKey() const { return _docKey; }
|
||||
|
||||
private:
|
||||
DocumentStoreManager(const Poco::URI& uriPublic,
|
||||
const Poco::URI& uriJailed,
|
||||
const std::string& jailId,
|
||||
std::unique_ptr<StorageBase>& storage) :
|
||||
DocumentBroker(const Poco::URI& uriPublic,
|
||||
const std::string& docKey) :
|
||||
_uriPublic(uriPublic),
|
||||
_uriJailed(uriJailed),
|
||||
_jailId(jailId),
|
||||
_storage(std::move(storage))
|
||||
_docKey(docKey)
|
||||
{
|
||||
Log::info("DocumentStoreManager [" + _uriPublic.toString() + "] created.");
|
||||
Log::info("DocumentBroker [" + _uriPublic.toString() + "] created.");
|
||||
}
|
||||
|
||||
private:
|
||||
const Poco::URI _uriPublic;
|
||||
const Poco::URI _uriJailed;
|
||||
const std::string _jailId;
|
||||
|
||||
const std::string _docKey;
|
||||
Poco::URI _uriJailed;
|
||||
std::string _jailId;
|
||||
std::unique_ptr<StorageBase> _storage;
|
||||
std::mutex _mutex;
|
||||
};
|
||||
|
||||
#endif
|
|
@ -149,8 +149,8 @@ using Poco::Util::Option;
|
|||
using Poco::Util::OptionSet;
|
||||
using Poco::Util::ServerApplication;
|
||||
|
||||
std::map<std::string, std::map<std::string, std::shared_ptr<MasterProcessSession>>> LOOLWSD::Sessions;
|
||||
std::mutex LOOLWSD::SessionsMutex;
|
||||
std::map<std::string, std::shared_ptr<DocumentBroker>> LOOLWSD::DocBrokers;
|
||||
std::mutex LOOLWSD::DocBrokersMutex;
|
||||
|
||||
/// Handles the filename part of the convert-to POST request payload.
|
||||
class ConvertToPartHandler : public PartHandler
|
||||
|
@ -505,9 +505,28 @@ private:
|
|||
// request.getCookies(cookies);
|
||||
// Log::info("Cookie: " + cookies.get("PHPSESSID", ""));
|
||||
|
||||
const auto uri = DocumentStoreManager::getUri(request.getURI());
|
||||
std::string docKey;
|
||||
Poco::URI::encode(uri.getPath(), "", docKey);
|
||||
auto docBroker = DocumentBroker::create(request.getURI());
|
||||
const auto docKey = docBroker->getDocKey();
|
||||
|
||||
{
|
||||
// This lock could become a bottleneck.
|
||||
// In that case, we can use a pool and index by publicPath.
|
||||
std::unique_lock<std::mutex> lock(LOOLWSD::DocBrokersMutex);
|
||||
|
||||
// Lookup this document.
|
||||
auto it = LOOLWSD::DocBrokers.find(docKey);
|
||||
if (it != LOOLWSD::DocBrokers.end())
|
||||
{
|
||||
// Get the DocumentBroker from the Cache.
|
||||
docBroker = it->second;
|
||||
assert(docBroker);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Set one we just created.
|
||||
LOOLWSD::DocBrokers.emplace(docKey, docBroker);
|
||||
}
|
||||
}
|
||||
|
||||
// Request a kit process for this doc.
|
||||
const std::string aMessage = "request " + id + " " + docKey + "\r\n";
|
||||
|
@ -515,7 +534,7 @@ private:
|
|||
Util::writeFIFO(LOOLWSD::BrokerWritePipe, aMessage);
|
||||
|
||||
auto ws = std::make_shared<WebSocket>(request, response);
|
||||
auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToClient, ws, nullptr);
|
||||
auto session = std::make_shared<MasterProcessSession>(id, LOOLSession::Kind::ToClient, ws, docBroker);
|
||||
|
||||
// For ToClient sessions, we store incoming messages in a queue and have a separate
|
||||
// thread that handles them. This is so that we can empty the queue when we get a
|
||||
|
@ -549,6 +568,8 @@ private:
|
|||
queue.clear();
|
||||
queue.put("eof");
|
||||
queueHandlerThread.join();
|
||||
|
||||
//TODO: Cleanup DocumentBroker.
|
||||
}
|
||||
|
||||
public:
|
||||
|
@ -635,8 +656,33 @@ public:
|
|||
|
||||
Log::debug("Child socket for SessionId: " + sessionId + ", jailId: " + jailId +
|
||||
", docKey: " + docKey + " connected.");
|
||||
|
||||
std::shared_ptr<DocumentBroker> docBroker;
|
||||
{
|
||||
// This lock could become a bottleneck.
|
||||
// In that case, we can use a pool and index by publicPath.
|
||||
std::unique_lock<std::mutex> lock(LOOLWSD::DocBrokersMutex);
|
||||
|
||||
// Lookup this document.
|
||||
auto it = LOOLWSD::DocBrokers.find(docKey);
|
||||
if (it != LOOLWSD::DocBrokers.end())
|
||||
{
|
||||
// Get the DocumentBroker from the Cache.
|
||||
docBroker = it->second;
|
||||
assert(docBroker);
|
||||
}
|
||||
else
|
||||
{
|
||||
// The client closed before we started,
|
||||
// or some early failure happened.
|
||||
Log::error("Failed to find DocumentBroker for docKey [" + docKey +
|
||||
"] while handling child connection for session [" + sessionId + "].");
|
||||
throw std::runtime_error("Invalid docKey.");
|
||||
}
|
||||
}
|
||||
|
||||
auto ws = std::make_shared<WebSocket>(request, response);
|
||||
auto session = std::make_shared<MasterProcessSession>(sessionId, LOOLSession::Kind::ToPrisoner, ws, nullptr);
|
||||
auto session = std::make_shared<MasterProcessSession>(sessionId, LOOLSession::Kind::ToPrisoner, ws, docBroker);
|
||||
|
||||
std::unique_lock<std::mutex> lock(MasterProcessSession::AvailableChildSessionMutex);
|
||||
MasterProcessSession::AvailableChildSessions.emplace(sessionId, session);
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
|
||||
#include "Auth.hpp"
|
||||
#include "Common.hpp"
|
||||
#include "DocumentBroker.hpp"
|
||||
#include "Util.hpp"
|
||||
|
||||
class MasterProcessSession;
|
||||
|
@ -50,10 +51,9 @@ public:
|
|||
static const std::string FIFO_LOOLWSD;
|
||||
static const std::string LOKIT_PIDLOG;
|
||||
|
||||
// All sessions for a given doc. The URI path (without host, port, or query) is the key.
|
||||
// The value is a map of SessionId => Session instance.
|
||||
static std::map<std::string, std::map<std::string, std::shared_ptr<MasterProcessSession>>> Sessions;
|
||||
static std::mutex SessionsMutex;
|
||||
// All DocumentBrokers by their DocKey (the URI path without host, port, or query).
|
||||
static std::map<std::string, std::shared_ptr<DocumentBroker>> DocBrokers;
|
||||
static std::mutex DocBrokersMutex;
|
||||
|
||||
static
|
||||
std::string GenSessionId()
|
||||
|
|
|
@ -33,11 +33,11 @@ std::condition_variable MasterProcessSession::AvailableChildSessionCV;
|
|||
MasterProcessSession::MasterProcessSession(const std::string& id,
|
||||
const Kind kind,
|
||||
std::shared_ptr<Poco::Net::WebSocket> ws,
|
||||
std::shared_ptr<DocumentStoreManager> docStoreManager) :
|
||||
std::shared_ptr<DocumentBroker> docBroker) :
|
||||
LOOLSession(id, kind, ws),
|
||||
_curPart(0),
|
||||
_loadPart(-1),
|
||||
_docStoreManager(docStoreManager)
|
||||
_docBroker(docBroker)
|
||||
{
|
||||
Log::info("MasterProcessSession ctor [" + getName() + "].");
|
||||
}
|
||||
|
@ -142,8 +142,7 @@ bool MasterProcessSession::_handleInput(const char *buffer, int length)
|
|||
if (object->get("commandName").toString() == ".uno:Save" &&
|
||||
object->get("success").toString() == "true")
|
||||
{
|
||||
Log::info() << getName() << " " << _docStoreManager << Log::end;
|
||||
_docStoreManager->save();
|
||||
_docBroker->save();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -809,17 +808,16 @@ void MasterProcessSession::dispatchChild()
|
|||
}
|
||||
|
||||
const auto jailRoot = Poco::Path(LOOLWSD::ChildRoot, childSession->_childId);
|
||||
auto document = DocumentStoreManager::create(_docURL, jailRoot.toString(), childSession->_childId);
|
||||
_docBroker->load(jailRoot.toString(), childSession->_childId);
|
||||
|
||||
_docStoreManager = document;
|
||||
_peer = childSession;
|
||||
childSession->_peer = shared_from_this();
|
||||
childSession->_docStoreManager = document;
|
||||
childSession->_docBroker = _docBroker;
|
||||
|
||||
std::ostringstream oss;
|
||||
oss << "load";
|
||||
oss << " url=" << document->getPublicUri().toString();
|
||||
oss << " jail=" << document->getJailedUri().toString();
|
||||
oss << " url=" << _docBroker->getPublicUri().toString();
|
||||
oss << " jail=" << _docBroker->getJailedUri().toString();
|
||||
|
||||
if (_loadPart >= 0)
|
||||
oss << " part=" + std::to_string(_loadPart);
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
|
||||
#include <Poco/Random.h>
|
||||
|
||||
#include "DocumentStoreManager.hpp"
|
||||
#include "DocumentBroker.hpp"
|
||||
#include "LOOLSession.hpp"
|
||||
#include "TileCache.hpp"
|
||||
|
||||
|
@ -22,7 +22,7 @@ public:
|
|||
MasterProcessSession(const std::string& id,
|
||||
const Kind kind,
|
||||
std::shared_ptr<Poco::Net::WebSocket> ws,
|
||||
std::shared_ptr<DocumentStoreManager> docStoreManager);
|
||||
std::shared_ptr<DocumentBroker> docBroker);
|
||||
virtual ~MasterProcessSession();
|
||||
|
||||
bool haveSeparateProcess();
|
||||
|
@ -42,7 +42,7 @@ public:
|
|||
*/
|
||||
std::string getSaveAs();
|
||||
|
||||
std::shared_ptr<DocumentStoreManager> getDocumentStoreManager() const { return _docStoreManager; }
|
||||
std::shared_ptr<DocumentBroker> getDocumentBroker() const { return _docBroker; }
|
||||
|
||||
// Sessions to pre-spawned child processes that have connected but are not yet assigned a
|
||||
// document to work on.
|
||||
|
@ -89,7 +89,7 @@ private:
|
|||
int _loadPart;
|
||||
/// Kind::ToClient instances store URLs of completed 'save as' documents.
|
||||
MessageQueue _saveAsQueue;
|
||||
std::shared_ptr<DocumentStoreManager> _docStoreManager;
|
||||
std::shared_ptr<DocumentBroker> _docBroker;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in a new issue