Move auto-save of document into the DocumentBroker poll more convincingly.

This commit is contained in:
Michael Meeks 2017-03-09 18:19:53 +00:00 committed by Jan Holesovsky
parent c097925e51
commit 842b261d9f
6 changed files with 84 additions and 122 deletions

View file

@ -30,7 +30,8 @@ namespace {
}
}
SocketPoll::SocketPoll(const std::string& threadName) :
SocketPoll::SocketPoll(const std::string& threadName,
bool withThread) :
_name(threadName),
_stop(false)
{
@ -45,16 +46,20 @@ SocketPoll::SocketPoll(const std::string& threadName) :
getWakeupsArray().push_back(_wakeup[1]);
}
_thread = std::thread(&SocketPoll::pollingThread, this);
if (withThread)
{
_thread = std::thread(&SocketPoll::pollingThread, this);
_owner = _thread.get_id();
}
else
_owner = std::this_thread::get_id();
}
SocketPoll::~SocketPoll()
{
stop();
if (_thread.joinable())
{
_thread.join();
}
::close(_wakeup[0]);
::close(_wakeup[1]);

View file

@ -213,7 +213,7 @@ class SocketPoll
{
public:
/// Create a socket poll, called rather infrequently.
SocketPoll(const std::string& threadName);
SocketPoll(const std::string& threadName, bool withThread = true);
~SocketPoll();
/// Stop the polling thread.
@ -245,7 +245,7 @@ public:
/// Are we running in either shutdown, or the polling thread.
bool isCorrectThread()
{
return _stop || std::this_thread::get_id() == _thread.get_id();
return _stop || std::this_thread::get_id() == _owner;
}
public:
@ -450,6 +450,7 @@ protected:
std::atomic<bool> _stop;
/// The polling thread.
std::thread _thread;
std::thread::id _owner;
};
class StreamSocket;

View file

@ -504,7 +504,7 @@ bool ClientSession::handleKitToClientMessage(const char* buffer, const int lengt
}
// Save to Storage and log result.
docBroker->save(getId(), success, result);
docBroker->saveToStorage(getId(), success, result);
return true;
}
}

View file

@ -133,9 +133,11 @@ public:
virtual void pollingThread()
{
std::unique_lock<std::mutex> lk(_lock);
while (!_docBroker && !_stop)
_start_cv.wait(lk);
{
std::unique_lock<std::mutex> lk(_lock);
while (!_docBroker && !_stop)
_start_cv.wait(lk);
}
if (_docBroker)
_docBroker->pollThread();
}
@ -184,6 +186,7 @@ std::shared_ptr<DocumentBroker> DocumentBroker::create(
return docBroker;
}
// The inner heart of the DocumentBroker - our poll loop.
void DocumentBroker::pollThread()
{
// Request a kit process for this doc.
@ -207,6 +210,8 @@ void DocumentBroker::pollThread()
_childProcess->setDocumentBroker(shared_from_this());
auto last30SecCheckTime = std::chrono::steady_clock::now();
// Main polling loop goodness.
while (!_stop && !TerminationFlag && !ShutdownRequestFlag)
{
@ -243,6 +248,32 @@ void DocumentBroker::pollThread()
}
_poll->poll(5000);
if (!std::getenv("LOOL_NO_AUTOSAVE") &&
std::chrono::duration_cast<std::chrono::seconds>
(std::chrono::steady_clock::now() - last30SecCheckTime).count() >= 30)
{
LOG_TRC("Trigger an autosave ...");
autoSave(false);
last30SecCheckTime = std::chrono::steady_clock::now();
}
}
// FIXME: probably we should stop listening on
// incoming sockets here if we have any.
auto lastSaveTime = _lastSaveTime;
auto saveTimeoutStart = std::chrono::steady_clock::now();
// Save before exit.
autoSave(true);
// wait 20 seconds for a save notification and quit.
while (lastSaveTime < saveTimeoutStart &&
std::chrono::duration_cast<std::chrono::seconds>
(std::chrono::steady_clock::now() - saveTimeoutStart).count() >= 20)
{
_poll->poll(5000);
}
}
@ -467,9 +498,10 @@ bool DocumentBroker::load(std::shared_ptr<ClientSession>& session, const std::st
return true;
}
bool DocumentBroker::save(const std::string& sessionId, bool success, const std::string& result)
bool DocumentBroker::saveToStorage(const std::string& sessionId,
bool success, const std::string& result)
{
std::unique_lock<std::mutex> lock(_saveMutex);
assert(_poll->isCorrectThread());
// If save requested, but core didn't save because document was unmodified
// notify the waiting thread, if any.
@ -477,8 +509,7 @@ bool DocumentBroker::save(const std::string& sessionId, bool success, const std:
{
LOG_DBG("Save skipped as document [" << _docKey << "] was not modified.");
_lastSaveTime = std::chrono::steady_clock::now();
lock.unlock();
_saveCV.notify_all();
_poll->wakeup();
return true;
}
@ -486,8 +517,6 @@ bool DocumentBroker::save(const std::string& sessionId, bool success, const std:
if (it == _sessions.end())
{
LOG_ERR("Session with sessionId [" << sessionId << "] not found while saving docKey [" << _docKey << "].");
lock.unlock();
_saveCV.notify_all();
return false;
}
@ -503,8 +532,7 @@ bool DocumentBroker::save(const std::string& sessionId, bool success, const std:
LOG_DBG("Skipping unnecessary saving to URI [" << uri << "] with docKey [" << _docKey <<
"]. File last modified " << _lastFileModifiedTime.elapsed() / 1000000 << " seconds ago.");
_lastSaveTime = std::chrono::steady_clock::now();
lock.unlock();
_saveCV.notify_all();
_poll->wakeup();
return true;
}
@ -522,6 +550,7 @@ bool DocumentBroker::save(const std::string& sessionId, bool success, const std:
_lastFileModifiedTime = newFileModifiedTime;
_tileCache->saveLastModified(_lastFileModifiedTime);
_lastSaveTime = std::chrono::steady_clock::now();
_poll->wakeup();
// Calling getWOPIFileInfo() or getLocalFileInfo() has the side-effect of updating
// StorageBase::_fileInfo. Get the timestamp of the document as persisted in its storage
@ -542,8 +571,6 @@ bool DocumentBroker::save(const std::string& sessionId, bool success, const std:
LOG_DBG("Saved docKey [" << _docKey << "] to URI [" << uri << "] and updated tile cache. Document modified timestamp: " <<
Poco::DateTimeFormatter::format(Poco::DateTime(_documentLastModifiedTime),
Poco::DateTimeFormat::ISO8601_FORMAT));
lock.unlock();
_saveCV.notify_all();
return true;
}
else if (storageSaveResult == StorageBase::SaveResult::DISKFULL)
@ -565,15 +592,11 @@ bool DocumentBroker::save(const std::string& sessionId, bool success, const std:
it->second->sendTextFrame("error: cmd=storage kind=savefailed");
}
lock.unlock();
_saveCV.notify_all();
return false;
}
bool DocumentBroker::autoSave(const bool force, const size_t waitTimeoutMs, std::unique_lock<std::mutex>& lock)
bool DocumentBroker::autoSave(const bool force)
{
Util::assertIsLocked(lock);
if (_sessions.empty() || _storage == nullptr || !_isLoaded ||
!_childProcess->isAlive() || (!_isModified && !force))
{
@ -582,8 +605,7 @@ bool DocumentBroker::autoSave(const bool force, const size_t waitTimeoutMs, std:
return true;
}
// Remeber the last save time, since this is the predicate.
const auto lastSaveTime = _lastSaveTime;
// Remember the last save time, since this is the predicate.
LOG_TRC("Checking to autosave [" << _docKey << "].");
bool sent = false;
@ -609,26 +631,12 @@ bool DocumentBroker::autoSave(const bool force, const size_t waitTimeoutMs, std:
}
}
if (sent && waitTimeoutMs > 0)
{
LOG_TRC("Waiting for save event for [" << _docKey << "].");
_saveCV.wait_for(lock, std::chrono::milliseconds(waitTimeoutMs));
if (lastSaveTime != _lastSaveTime)
{
LOG_DBG("Successfully persisted document [" << _docKey << "] or document was not modified.");
return true;
}
return false;
}
return sent;
}
bool DocumentBroker::sendUnoSave(const bool dontSaveIfUnmodified)
{
LOG_INF("Autosave triggered for doc [" << _docKey << "].");
Util::assertIsLocked(_mutex);
std::shared_ptr<ClientSession> savingSession;
for (auto& sessionIt : _sessions)
@ -758,9 +766,12 @@ size_t DocumentBroker::addSession(std::shared_ptr<ClientSession>& session)
return count;
}
size_t DocumentBroker::removeSession(const std::string& id)
size_t DocumentBroker::removeSession(const std::string& id, bool destroyIfLast)
{
Util::assertIsLocked(_mutex);
auto guard = getLock();
if (destroyIfLast)
destroyIfLastEditor(id);
try
{
@ -1072,7 +1083,7 @@ void DocumentBroker::handleTileCombinedResponse(const std::vector<char>& payload
}
}
bool DocumentBroker::startDestroy(const std::string& id)
void DocumentBroker::destroyIfLastEditor(const std::string& id)
{
Util::assertIsLocked(_mutex);
@ -1081,7 +1092,7 @@ bool DocumentBroker::startDestroy(const std::string& id)
{
// We could be called before adding any sessions.
// For example when a socket disconnects before loading.
return false;
return;
}
// Check if the session being destroyed is the last non-readonly session or not.
@ -1103,9 +1114,9 @@ bool DocumentBroker::startDestroy(const std::string& id)
// Last view going away, can destroy.
_markToDestroy = (_sessions.size() <= 1);
_stop = true;
LOG_DBG("startDestroy on session [" << id << "] on docKey [" << _docKey <<
"], markToDestroy: " << _markToDestroy << ", lastEditableSession: " << _lastEditableSession);
return _lastEditableSession;
}
void DocumentBroker::setModified(const bool value)

View file

@ -233,19 +233,17 @@ public:
bool isLoaded() const { return _isLoaded; }
void setLoaded() { _isLoaded = true; }
/// Save the document to Storage if needs persisting.
bool save(const std::string& sesionId, bool success, const std::string& result = "");
/// Save the document to Storage if it needs persisting.
bool saveToStorage(const std::string& sesionId, bool success, const std::string& result = "");
bool isModified() const { return _isModified; }
void setModified(const bool value);
/// Save the document if the document is modified.
/// @param force when true, will force saving if there
/// has been any recent activity after the last save.
/// @param waitTimeoutMs when >0 will wait for the save to
/// complete before returning, or timeout.
/// @return true if attempts to save or it also waits
/// and receives save notification. Otherwise, false.
bool autoSave(const bool force, const size_t waitTimeoutMs, std::unique_lock<std::mutex>& lock);
bool autoSave(const bool force);
Poco::URI getPublicUri() const { return _uriPublic; }
Poco::URI getJailedUri() const { return _uriJailed; }
@ -270,7 +268,7 @@ public:
size_t queueSession(std::shared_ptr<ClientSession>& session);
/// Removes a session by ID. Returns the new number of sessions.
size_t removeSession(const std::string& id);
size_t removeSession(const std::string& id, bool destroyIfLast = false);
void addSocketToPoll(const std::shared_ptr<Socket>& socket);
@ -299,11 +297,7 @@ public:
void handleTileResponse(const std::vector<char>& payload);
void handleTileCombinedResponse(const std::vector<char>& payload);
/// Called before destroying any session.
/// This method calculates and sets important states of
/// session being destroyed. Returns true if session id
/// is the last editable session.
bool startDestroy(const std::string& id);
void destroyIfLastEditor(const std::string& id);
bool isMarkedToDestroy() const { return _markToDestroy; }
bool handleInput(const std::vector<char>& payload);
@ -407,8 +401,6 @@ private:
int _cursorWidth;
int _cursorHeight;
mutable std::mutex _mutex;
std::condition_variable _saveCV;
std::mutex _saveMutex;
std::unique_ptr<DocumentBrokerPoll> _poll;
std::atomic<bool> _stop;

View file

@ -1411,9 +1411,7 @@ static void removeDocBrokerSession(const std::shared_ptr<DocumentBroker>& docBro
auto lock = docBroker->getLock();
if (!id.empty())
{
docBroker->removeSession(id);
}
if (docBroker->getSessionsCount() == 0 || !docBroker->isAlive())
{
@ -1677,9 +1675,7 @@ private:
void onDisconnect() override
{
if (_clientSession)
{
saveDocument();
}
disposeSession();
const size_t curConnections = --LOOLWSD::NumConnections;
LOG_TRC("Disconnected connection #" << _connectionNum << " of " <<
@ -2265,7 +2261,8 @@ private:
LOG_WRN("Failed to connect DocBroker and Client Session.");
}
void saveDocument()
// this session went away - cleanup now.
void disposeSession()
{
LOG_CHECK_RET(_clientSession && "Null ClientSession instance", );
const auto docBroker = _clientSession->getDocumentBroker();
@ -2277,25 +2274,9 @@ private:
// Connection terminated. Destroy session.
LOG_DBG("Client session [" << _id << "] on docKey [" << docKey << "] terminated. Cleaning up.");
auto docLock = docBroker->getLock();
// We issue a force-save when last editable (non-readonly) session is going away
const bool forceSave = docBroker->startDestroy(_id);
if (forceSave)
{
LOG_INF("Shutdown of the last editable (non-readonly) session, saving the document before tearing down.");
}
// We need to wait until the save notification reaches us
// and Storage persists the document.
if (!docBroker->autoSave(forceSave, COMMAND_TIMEOUT_MS, docLock))
{
LOG_ERR("Auto-save before closing failed.");
}
const auto sessionsCount = docBroker->removeSession(_id);
docLock.unlock();
const auto sessionsCount = docBroker->removeSession(_id, true);
if (sessionsCount == 0)
{
// We've supposedly destroyed the last session, now cleanup.
@ -2428,9 +2409,13 @@ public:
void dumpState()
{
std::cerr << "LOOLWSDServer:\n"
<< " Ports: server " << ClientPortNumber
<< " prisoner " << MasterPortNumber << "\n"
<< " stop: " << _stop << "\n"
<< " TerminationFlag: " << TerminationFlag << "\n"
<< " isShuttingDown: " << ShutdownRequestFlag << "\n";
<< " isShuttingDown: " << ShutdownRequestFlag << "\n"
<< " NewChildren: " << NewChildren.size() << "\n"
<< " OutstandingForks: " << OutstandingForks << "\n";
std::cerr << "Server poll:\n";
_acceptPoll.dumpState();
@ -2441,7 +2426,8 @@ public:
std::cerr << "Prisoner poll:\n";
PrisonerPoll.dumpState();
std::cerr << "Document Broker polls:\n";
std::cerr << "Document Broker polls "
<< "[ " << DocBrokers.size() << " ]:\n";
for (auto &i : DocBrokers)
i.second->dumpState();
}
@ -2665,49 +2651,16 @@ int LOOLWSD::main(const std::vector<std::string>& /*args*/)
time_t startTimeSpan = time(nullptr);
#endif
// FIXME: all of this needs cleaning up and putting in the
// relevant polls.
auto last30SecCheckTime = std::chrono::steady_clock::now();
/// Something of a hack to get woken up on exit.
SocketPoll mainWait("main", false);
while (!TerminationFlag && !ShutdownRequestFlag)
{
UnitWSD::get().invokeTest();
if (TerminationFlag || handleShutdownRequest())
{
break;
}
if (!std::getenv("LOOL_NO_AUTOSAVE") &&
std::chrono::duration_cast<std::chrono::seconds>
(std::chrono::steady_clock::now() - last30SecCheckTime).count() >= 30)
{
try
{
#if 0
std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex);
cleanupDocBrokers();
for (auto& pair : DocBrokers)
{
auto docLock = pair.second->getDeferredLock();
if (doclock.try_lock())
{
pair.second->autosave(false, 0, doclock);
}
}
#endif
}
catch (const std::exception& exc)
{
LOG_ERR("Exception: " << exc.what());
}
mainWait.poll(30 * 1000 /* ms */);
last30SecCheckTime = std::chrono::steady_clock::now();
}
else
{
// Wait if we had done no work.
std::this_thread::sleep_for(std::chrono::milliseconds(CHILD_REBALANCE_INTERVAL_MS));
}
std::unique_lock<std::mutex> docBrokersLock(DocBrokersMutex);
cleanupDocBrokers();
#if ENABLE_DEBUG
if (careerSpanSeconds > 0 && time(nullptr) > startTimeSpan + careerSpanSeconds)