libreoffice-online/loolwsd/DocumentBroker.cpp
Tor Lillqvist 96e6ad7502 Re-think disk full handling
Instead of trying to inform the sysadmin (which we did not yet try to
do in any meaningful way), inform all connected clients (even those
editing other documents).

We use 'error: cmd=internal kind=diskfull' as the message to the
clients. The loleaflet code needs to be updated to handle that
carefully by displaying a very prominent message that tells the user
that all bets are off.

Also add a unit test for the functionality.

Document the new protocol details.

The code for this alert functionalty became a bit less elegant than I
like because of the way we include Util.cpp in the unit test 'test'
program.

Still need to add code to check for disk full in more places, not just
when saving a cached tile or font. Probably we should even actually
check for disk space on the file system(s) we use getting alarmingly
low, not just check for file writing operations that fail. Later.
2016-09-29 00:23:14 +03:00

720 lines
22 KiB
C++

/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
/*
* This file is part of the LibreOffice project.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#include "DocumentBroker.hpp"
#include "config.h"
#include <cassert>
#include <fstream>
#include <Poco/Path.h>
#include <Poco/SHA1Engine.h>
#include <Poco/StringTokenizer.h>
#include "ClientSession.hpp"
#include "Exceptions.hpp"
#include "LOOLProtocol.hpp"
#include "LOOLWSD.hpp"
#include "Log.hpp"
#include "PrisonerSession.hpp"
#include "Storage.hpp"
#include "TileCache.hpp"
#include "Unit.hpp"
using namespace LOOLProtocol;
using Poco::StringTokenizer;
void ChildProcess::socketProcessor()
{
IoUtil::SocketProcessor(_ws,
[this](const std::vector<char>& payload)
{
if (UnitWSD::get().filterChildMessage(payload))
{
return true;
}
auto docBroker = this->_docBroker.lock();
if (docBroker)
{
return docBroker->handleInput(payload);
}
Log::warn() << "Child " << this->_pid << " has no DocumentBroker to handle message: ["
<< LOOLProtocol::getAbbreviatedMessage(payload) << "]." << Log::end;
return true;
},
[]() { },
[this]() { return !!this->_stop; });
}
namespace
{
/// Returns the cache path for a given document URI.
std::string getCachePath(const std::string& uri)
{
Poco::SHA1Engine digestEngine;
digestEngine.update(uri.c_str(), uri.size());
return (LOOLWSD::Cache + "/" +
Poco::DigestEngine::digestToHex(digestEngine.digest()).insert(3, "/").insert(2, "/").insert(1, "/"));
}
}
Poco::URI DocumentBroker::sanitizeURI(const std::string& uri)
{
// The URI of the document should be url-encoded.
std::string decodedUri;
Poco::URI::decode(uri, decodedUri);
auto uriPublic = Poco::URI(decodedUri);
if (uriPublic.isRelative() || uriPublic.getScheme() == "file")
{
// TODO: Validate and limit access to local paths!
uriPublic.normalize();
}
if (uriPublic.getPath().empty())
{
throw std::runtime_error("Invalid URI.");
}
return uriPublic;
}
std::string DocumentBroker::getDocKey(const Poco::URI& uri)
{
// Keep the host as part of the key to close a potential security hole.
std::string docKey;
Poco::URI::encode(uri.getHost() + uri.getPath(), "", docKey);
return docKey;
}
DocumentBroker::DocumentBroker() :
_uriPublic(),
_docKey(),
_childRoot(),
_cacheRoot(),
_childProcess(),
_lastSaveTime(std::chrono::steady_clock::now()),
_markToDestroy(true),
_lastEditableSession(true),
_cursorPosX(0),
_cursorPosY(0),
_cursorWidth(0),
_cursorHeight(0),
_isLoaded(false),
_isModified(false),
_mutex(),
_saveMutex(),
_tileVersion(0)
{
Log::info("Empty DocumentBroker (marked to destroy) created.");
}
DocumentBroker::DocumentBroker(const Poco::URI& uriPublic,
const std::string& docKey,
const std::string& childRoot,
std::shared_ptr<ChildProcess> childProcess) :
_uriPublic(uriPublic),
_docKey(docKey),
_childRoot(childRoot),
_cacheRoot(getCachePath(uriPublic.toString())),
_childProcess(std::move(childProcess)),
_lastSaveTime(std::chrono::steady_clock::now()),
_markToDestroy(false),
_lastEditableSession(false),
_cursorPosX(0),
_cursorPosY(0),
_cursorWidth(0),
_cursorHeight(0),
_isLoaded(false),
_isModified(false),
_mutex(),
_saveMutex(),
_tileVersion(0)
{
assert(!_docKey.empty());
assert(!_childRoot.empty());
Log::info("DocumentBroker [" + _uriPublic.toString() + "] created. DocKey: [" + _docKey + "]");
}
const StorageBase::FileInfo DocumentBroker::validate(const Poco::URI& uri)
{
const auto uriString = uri.toString();
Log::info("Validating: " + uriString);
try
{
auto storage = StorageBase::create("", "", uri);
auto fileinfo = storage->getFileInfo(uri);
Log::info("After checkfileinfo: " + uriString + " -> " + fileinfo._filename);
if (!fileinfo.isValid())
{
Log::error("Invalid file info for uri " + uriString);
throw BadRequestException("Invalid URI or access denied.");
}
return fileinfo;
}
catch (const std::exception& ex)
{
Log::error("Exception while getting file info for uri " + uriString + ": " + ex.what());
throw BadRequestException("Invalid URI or access denied.");
}
}
bool DocumentBroker::load(const std::string& jailId)
{
Log::debug("Loading from URI: " + _uriPublic.toString());
std::unique_lock<std::mutex> lock(_mutex);
if (_markToDestroy)
{
// Tearing down.
return false;
}
if (_storage)
{
// Already loaded. Nothing to do.
return true;
}
_jailId = jailId;
// The URL is the publicly visible one, not visible in the chroot jail.
// We need to map it to a jailed path and copy the file there.
// user/doc/jailId
const auto jailPath = Poco::Path(JAILED_DOCUMENT_ROOT, jailId);
std::string jailRoot = getJailRoot();
Log::info("jailPath: " + jailPath.toString() + ", jailRoot: " + jailRoot);
if (LOOLWSD::NoCapsForKit)
jailRoot = jailPath.toString() + "/" + getJailRoot();
auto storage = StorageBase::create(jailRoot, jailPath.toString(), _uriPublic);
if (storage)
{
const auto fileInfo = storage->getFileInfo(_uriPublic);
_filename = fileInfo._filename;
const auto localPath = storage->loadStorageFileToLocal();
_uriJailed = Poco::URI(Poco::URI("file://"), localPath);
// Use the local temp file's timestamp.
_lastFileModifiedTime = Poco::File(storage->getLocalRootPath()).getLastModified();
_tileCache.reset(new TileCache(_uriPublic.toString(), _lastFileModifiedTime, _cacheRoot));
_storage = std::move(storage);
return true;
}
return false;
}
bool DocumentBroker::save(bool success, const std::string& result)
{
std::unique_lock<std::mutex> lock(_saveMutex);
const auto uri = _uriPublic.toString();
// If save requested, but core didn't save because document was unmodified
// notify the waiting thread, if any.
if (!success && result == "unmodified")
{
Log::debug("Save skipped as document was not modified");
_saveCV.notify_all();
return true;
}
// If we aren't destroying the last editable session just yet, and the file
// timestamp hasn't changed, skip saving.
const auto newFileModifiedTime = Poco::File(_storage->getLocalRootPath()).getLastModified();
if (!isLastEditableSession() && newFileModifiedTime == _lastFileModifiedTime)
{
// Nothing to do.
Log::debug() << "Skipping unnecessary saving to URI [" << uri
<< "]. File last modified "
<< _lastFileModifiedTime.elapsed() / 1000000
<< " seconds ago." << Log::end;
return true;
}
Log::debug("Saving to URI [" + uri + "].");
assert(_storage && _tileCache);
if (_storage->saveLocalFileToStorage())
{
_isModified = false;
_tileCache->setUnsavedChanges(false);
_lastFileModifiedTime = newFileModifiedTime;
_tileCache->saveLastModified(_lastFileModifiedTime);
_lastSaveTime = std::chrono::steady_clock::now();
Log::debug("Saved to URI [" + uri + "] and updated tile cache.");
_saveCV.notify_all();
return true;
}
Log::error("Failed to save to URI [" + uri + "].");
return false;
}
bool DocumentBroker::autoSave(const bool force, const size_t waitTimeoutMs)
{
std::unique_lock<std::mutex> lock(_mutex);
if (_sessions.empty() || _storage == nullptr || !_isLoaded ||
(!_isModified && !force))
{
// Nothing to do.
Log::trace("Nothing to autosave [" + _docKey + "].");
return true;
}
// Remeber the last save time, since this is the predicate.
const auto lastSaveTime = _lastSaveTime;
Log::trace("Checking to autosave [" + _docKey + "].");
bool sent = false;
if (force)
{
Log::trace("Sending forced save command for [" + _docKey + "].");
sent = sendUnoSave(true);
}
else if (_isModified)
{
// Find the most recent activity.
double inactivityTimeMs = std::numeric_limits<double>::max();
for (auto& sessionIt: _sessions)
{
inactivityTimeMs = std::min(sessionIt.second->getInactivityMS(), inactivityTimeMs);
}
Log::trace("Most recent activity was " + std::to_string((int)inactivityTimeMs) + " ms ago.");
const auto timeSinceLastSaveMs = getTimeSinceLastSaveMs();
Log::trace("Time since last save is " + std::to_string((int)timeSinceLastSaveMs) + " ms.");
// Either we've been idle long enough, or it's auto-save time.
if (inactivityTimeMs >= IdleSaveDurationMs ||
timeSinceLastSaveMs >= AutoSaveDurationMs)
{
Log::trace("Sending timed save command for [" + _docKey + "].");
sent = sendUnoSave(true);
}
}
if (sent && waitTimeoutMs > 0)
{
Log::trace("Waiting for save event for [" + _docKey + "].");
if (_saveCV.wait_for(lock, std::chrono::milliseconds(waitTimeoutMs)) == std::cv_status::no_timeout)
{
Log::debug("Successfully persisted document [" + _docKey + "] or document was not modified");
return true;
}
return (lastSaveTime != _lastSaveTime);
}
return sent;
}
bool DocumentBroker::sendUnoSave(const bool dontSaveIfUnmodified)
{
Log::info("Autosave triggered for doc [" + _docKey + "].");
Util::assertIsLocked(_mutex);
// Save using session holding the edit-lock (or first if multview).
for (auto& sessionIt: _sessions)
{
// Invalidate the timestamp to force persisting.
_lastFileModifiedTime.fromEpochTime(0);
// We do not want save to terminate editing mode if we are in edit mode now
std::ostringstream oss;
// arguments init
oss << "{";
// Mention DontTerminateEdit always
oss << "\"DontTerminateEdit\":"
<< "{"
<< "\"type\":\"boolean\","
<< "\"value\":true"
<< "}";
// Mention DontSaveIfUnmodified
if (dontSaveIfUnmodified)
{
oss << ","
<< "\"DontSaveIfUnmodified\":"
<< "{"
<< "\"type\":\"boolean\","
<< "\"value\":true"
<< "}";
}
// arguments end
oss << "}";
const auto saveArgs = oss.str();
Log::trace(".uno:Save arguments: " + saveArgs);
const auto command = "uno .uno:Save " + saveArgs;
sessionIt.second->handleInput(command.data(), command.size());
return true;
}
Log::error("Failed to auto-save doc [" + _docKey + "]: No valid sessions.");
return false;
}
std::string DocumentBroker::getJailRoot() const
{
assert(!_jailId.empty());
return Poco::Path(_childRoot, _jailId).toString();
}
size_t DocumentBroker::addSession(std::shared_ptr<ClientSession>& session)
{
const auto id = session->getId();
const std::string aMessage = "session " + id + " " + _docKey + "\n";
std::lock_guard<std::mutex> lock(_mutex);
// Request a new session from the child kit.
Log::debug("DocBroker to Child: " + aMessage.substr(0, aMessage.length() - 1));
_childProcess->getWebSocket()->sendFrame(aMessage.data(), aMessage.size());
auto ret = _sessions.emplace(id, session);
if (!ret.second)
{
Log::warn("DocumentBroker: Trying to add already existing session.");
}
if (session->isReadOnly())
{
Log::debug("Adding a readonly session [" + id + "]");
}
// Below values are recalculated when startDestroy() is called (before destroying the
// document). It is safe to reset their values to their defaults whenever a new session is added
_lastEditableSession = false;
_markToDestroy = false;
return _sessions.size();
}
bool DocumentBroker::connectPeers(std::shared_ptr<PrisonerSession>& session)
{
const auto id = session->getId();
std::lock_guard<std::mutex> lock(_mutex);
auto it = _sessions.find(id);
if (it != _sessions.end())
{
it->second->setPeer(session);
session->setPeer(it->second);
return true;
}
return false;
}
size_t DocumentBroker::removeSession(const std::string& id)
{
std::lock_guard<std::mutex> lock(_mutex);
auto it = _sessions.find(id);
if (it != _sessions.end())
{
_sessions.erase(it);
}
return _sessions.size();
}
void DocumentBroker::alertAllUsersOfDocument(const std::string& cmd, const std::string& kind)
{
std::lock_guard<std::mutex> lock(_mutex);
for (auto& it: _sessions)
{
it.second->sendTextFrame("error: cmd=" + cmd + " kind=" + kind);
}
}
bool DocumentBroker::handleInput(const std::vector<char>& payload)
{
const auto msg = LOOLProtocol::getAbbreviatedMessage(payload);
Log::trace("DocumentBroker got child message: [" + msg + "].");
LOOLWSD::dumpOutgoingTrace(getJailId(), "0", msg);
if (msg.find("tile:") == 0)
{
handleTileResponse(payload);
}
else if (msg.find("tilecombine:") == 0)
{
handleTileCombinedResponse(payload);
}
else if (msg.find("errortoall:") == 0)
{
StringTokenizer tokens(msg, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
assert(tokens.count() == 3);
std::string cmd, kind;
LOOLProtocol::getTokenString(tokens, "cmd", cmd);
assert(cmd != "");
LOOLProtocol::getTokenString(tokens, "kind", kind);
assert(kind != "");
Util::alertAllUsers(cmd, kind);
}
else
{
Log::error("Unexpected message: [" + msg + "].");
return false;
}
return true;
}
void DocumentBroker::invalidateTiles(const std::string& tiles)
{
std::unique_lock<std::mutex> lock(_mutex);
// Remove from cache.
_tileCache->invalidateTiles(tiles);
//TODO: Re-issue the tiles again to avoid races.
}
void DocumentBroker::handleTileRequest(TileDesc& tile,
const std::shared_ptr<ClientSession>& session)
{
std::unique_lock<std::mutex> lock(_mutex);
tile.setVersion(++_tileVersion);
const auto tileMsg = tile.serialize();
Log::trace() << "Tile request for " << tile.serialize() << Log::end;
std::unique_ptr<std::fstream> cachedTile = _tileCache->lookupTile(tile);
if (cachedTile)
{
#if ENABLE_DEBUG
const std::string response = tile.serialize("tile:") + " renderid=cached\n";
#else
const std::string response = tile.serialize("tile:") + '\n';
#endif
std::vector<char> output;
output.reserve(static_cast<size_t>(4) * tile.getWidth() * tile.getHeight());
output.resize(response.size());
std::memcpy(output.data(), response.data(), response.size());
assert(cachedTile->is_open());
cachedTile->seekg(0, std::ios_base::end);
const auto pos = output.size();
std::streamsize size = cachedTile->tellg();
output.resize(pos + size);
cachedTile->seekg(0, std::ios_base::beg);
cachedTile->read(output.data() + pos, size);
cachedTile->close();
session->sendBinaryFrame(output.data(), output.size());
return;
}
tileCache().subscribeToTileRendering(tile, session);
// Forward to child to render.
Log::debug() << "Sending render request for tile (" << tile.getPart() << ',' << tile.getTilePosX() << ',' << tile.getTilePosY() << ")." << Log::end;
const std::string request = "tile " + tile.serialize();
_childProcess->getWebSocket()->sendFrame(request.data(), request.size());
}
void DocumentBroker::handleTileCombinedRequest(TileCombined& tileCombined,
const std::shared_ptr<ClientSession>& session)
{
std::unique_lock<std::mutex> lock(_mutex);
Log::trace() << "TileCombined request for " << tileCombined.serialize() << Log::end;
// Satisfy as many tiles from the cache.
std::vector<TileDesc> tiles;
for (auto& tile : tileCombined.getTiles())
{
std::unique_ptr<std::fstream> cachedTile = _tileCache->lookupTile(tile);
if (cachedTile)
{
//TODO: Combine the response to reduce latency.
#if ENABLE_DEBUG
const std::string response = tile.serialize("tile:") + " renderid=cached\n";
#else
const std::string response = tile.serialize("tile:") + "\n";
#endif
std::vector<char> output;
output.reserve(static_cast<size_t>(4) * tile.getWidth() * tile.getHeight());
output.resize(response.size());
std::memcpy(output.data(), response.data(), response.size());
assert(cachedTile->is_open());
cachedTile->seekg(0, std::ios_base::end);
const auto pos = output.size();
std::streamsize size = cachedTile->tellg();
output.resize(pos + size);
cachedTile->seekg(0, std::ios_base::beg);
cachedTile->read(output.data() + pos, size);
cachedTile->close();
session->sendBinaryFrame(output.data(), output.size());
}
else
{
// Not cached, needs rendering.
tile.setVersion(++_tileVersion);
tileCache().subscribeToTileRendering(tile, session);
tiles.push_back(tile);
}
}
if (!tiles.empty())
{
auto newTileCombined = TileCombined::create(tiles);
newTileCombined.setVersion(++_tileVersion);
// Forward to child to render.
const auto req = newTileCombined.serialize("tilecombine");
Log::debug() << "Sending residual tilecombine: " << req << Log::end;
_childProcess->getWebSocket()->sendFrame(req.data(), req.size());
}
}
void DocumentBroker::cancelTileRequests(const std::shared_ptr<ClientSession>& session)
{
std::unique_lock<std::mutex> lock(_mutex);
const auto canceltiles = tileCache().cancelTiles(session);
if (!canceltiles.empty())
{
Log::debug() << "Forwarding canceltiles request: " << canceltiles << Log::end;
_childProcess->getWebSocket()->sendFrame(canceltiles.data(), canceltiles.size());
}
}
void DocumentBroker::handleTileResponse(const std::vector<char>& payload)
{
const std::string firstLine = getFirstLine(payload);
try
{
auto tile = TileDesc::parse(firstLine);
const auto buffer = payload.data();
const auto length = payload.size();
if (firstLine.size() < static_cast<std::string::size_type>(length) - 1)
{
tileCache().saveTileAndNotify(
tile, buffer + firstLine.size() + 1,
length - firstLine.size() - 1);
}
else
{
Log::debug() << "Render request declined for " << firstLine << Log::end;
std::unique_lock<std::mutex> tileBeingRenderedLock(tileCache().getTilesBeingRenderedLock());
tileCache().forgetTileBeingRendered(tile);
}
}
catch (const std::exception& exc)
{
Log::error("Failed to process tile response [" + firstLine + "]: " + exc.what() + ".");
//FIXME: Return error.
//sendTextFrame("error: cmd=tile kind=syntax");
}
}
void DocumentBroker::handleTileCombinedResponse(const std::vector<char>& payload)
{
const std::string firstLine = getFirstLine(payload);
Log::debug("Handling tile combined: " + firstLine);
try
{
auto tileCombined = TileCombined::parse(firstLine);
const auto buffer = payload.data();
const auto length = payload.size();
auto offset = firstLine.size() + 1;
if (firstLine.size() < static_cast<std::string::size_type>(length) - 1)
{
for (const auto& tile : tileCombined.getTiles())
{
tileCache().saveTileAndNotify(tile, buffer + offset, tile.getImgSize());
offset += tile.getImgSize();
}
}
else
{
Log::error() << "Render request failed for " << firstLine << Log::end;
std::unique_lock<std::mutex> tileBeingRenderedLock(tileCache().getTilesBeingRenderedLock());
for (const auto& tile : tileCombined.getTiles())
{
tileCache().forgetTileBeingRendered(tile);
}
}
}
catch (const std::exception& exc)
{
Log::error("Failed to process tile response [" + firstLine + "]: " + exc.what() + ".");
//FIXME: Return error.
//sendTextFrame("error: cmd=tile kind=syntax");
}
}
void DocumentBroker::startDestroy(const std::string& id)
{
std::unique_lock<std::mutex> lock(_mutex);
auto currentSession = _sessions.find(id);
assert(currentSession != _sessions.end());
// Check if session which is being destroyed is last non-readonly session
bool lastEditableSession = !currentSession->second->isReadOnly();
for (auto& it: _sessions)
{
if (it.second->getId() == id)
continue;
if (!it.second->isReadOnly())
{
lastEditableSession = false;
}
}
// Last editable session going away
_lastEditableSession = lastEditableSession;
// Last view going away, can destroy.
_markToDestroy = (_sessions.size() <= 1);
}
void DocumentBroker::setModified(const bool value)
{
_tileCache->setUnsavedChanges(value);
_isModified = value;
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */