wsd: minimize wait when DocBroker terminates

Add checking up the stack to detect when we have clean queues
and buffers so we can exit fast.

Change-Id: I82c3843f816bbe869094c21f070774e6d034ac65
Reviewed-on: https://gerrit.libreoffice.org/c/online/+/90358
Reviewed-by: Michael Meeks <michael.meeks@collabora.com>
Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice@gmail.com>
This commit is contained in:
Michael Meeks 2020-03-24 12:20:41 +00:00
parent 9c2f6382d2
commit a73590d81f
4 changed files with 52 additions and 3 deletions

View file

@ -200,6 +200,11 @@ void SocketPoll::wakeupWorld()
wakeup(fd); wakeup(fd);
} }
bool ProtocolHandlerInterface::hasPendingWork() const
{
return _msgHandler && _msgHandler->hasQueuedMessages();
}
#if !MOBILEAPP #if !MOBILEAPP
void SocketPoll::insertNewWebSocketSync( void SocketPoll::insertNewWebSocketSync(

View file

@ -162,6 +162,9 @@ public:
std::chrono::steady_clock::time_point now, std::chrono::steady_clock::time_point now,
int events) = 0; int events) = 0;
/// Is all data sent, so tha we can shutdown ?
virtual bool hasPendingWork() const { return false; }
/// manage latency issues around packet aggregation /// manage latency issues around packet aggregation
void setNoDelay() void setNoDelay()
{ {
@ -290,7 +293,7 @@ public:
} }
/// Asserts in the debug builds, otherwise just logs. /// Asserts in the debug builds, otherwise just logs.
void assertCorrectThread() void assertCorrectThread() const
{ {
if (InhibitThreadChecks) if (InhibitThreadChecks)
return; return;
@ -392,6 +395,9 @@ public:
_msgHandler = msgHandler; _msgHandler = msgHandler;
} }
/// Do we have something to send ?
virtual bool hasPendingWork() const;
/// Clear all external references /// Clear all external references
virtual void dispose() { _msgHandler.reset(); } virtual void dispose() { _msgHandler.reset(); }
@ -768,6 +774,21 @@ public:
return _pollSockets.size(); return _pollSockets.size();
} }
bool hasPendingWork() const
{
assertCorrectThread();
if (_newCallbacks.size() > 0 ||
_newSockets.size() > 0)
return true;
for (auto &i : _pollSockets)
if (i->hasPendingWork())
return true;
return false;
}
const std::string& name() const { return _name; } const std::string& name() const { return _name; }
/// Start the polling thread (if desired) /// Start the polling thread (if desired)
@ -926,6 +947,14 @@ public:
return events; return events;
} }
bool hasPendingWork() const override
{
assertCorrectThread();
if (!_outBuffer.empty() || !_inBuffer.empty())
return true;
return _socketHandler && _socketHandler->hasPendingWork();
}
/// Send data to the socket peer. /// Send data to the socket peer.
void send(const char* data, const int len, const bool flush = true) void send(const char* data, const int len, const bool flush = true)
{ {

View file

@ -448,14 +448,14 @@ void DocumentBroker::pollThread()
<< ", TerminationFlag: " << SigUtil::getTerminationFlag() << ", TerminationFlag: " << SigUtil::getTerminationFlag()
<< ". Terminating child with reason: [" << _closeReason << "]."); << ". Terminating child with reason: [" << _closeReason << "].");
const auto flushStartTime = std::chrono::steady_clock::now(); const auto flushStartTime = std::chrono::steady_clock::now();
while (_poll->getSocketCount()) while (_poll->hasPendingWork() || hasDisconnectingSessions())
{ {
const auto now = std::chrono::steady_clock::now(); const auto now = std::chrono::steady_clock::now();
const int elapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(now - flushStartTime).count(); const int elapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(now - flushStartTime).count();
if (elapsedMs > flushTimeoutMs) if (elapsedMs > flushTimeoutMs)
break; break;
_poll->poll(std::min(flushTimeoutMs - elapsedMs, POLL_TIMEOUT_MS / 5)); _poll->poll(std::min(flushTimeoutMs - elapsedMs, POLL_TIMEOUT_MS / 10));
} }
LOG_INF("Finished flushing socket for doc [" << _docKey << "]. stop: " << _stop << ", continuePolling: " << LOG_INF("Finished flushing socket for doc [" << _docKey << "]. stop: " << _stop << ", continuePolling: " <<
@ -480,6 +480,18 @@ void DocumentBroker::pollThread()
LOG_INF("Finished docBroker polling thread for docKey [" << _docKey << "]."); LOG_INF("Finished docBroker polling thread for docKey [" << _docKey << "].");
} }
bool DocumentBroker::hasDisconnectingSessions() const
{
for (const auto& pair : _sessions)
{
const std::shared_ptr<ClientSession> &session = pair.second;
// need to wait around to fetch clipboards from disconnecting sessions.
if (session->inWaitDisconnected())
return true;
}
return false;
}
bool DocumentBroker::isAlive() const bool DocumentBroker::isAlive() const
{ {
if (!_stop || _poll->isAlive()) if (!_stop || _poll->isAlive())

View file

@ -442,6 +442,9 @@ private:
/// Starts the Kit <-> DocumentBroker shutdown handshake /// Starts the Kit <-> DocumentBroker shutdown handshake
void disconnectSessionInternal(const std::string& id); void disconnectSessionInternal(const std::string& id);
/// Are any of our sessions still dis-connecting ?
bool hasDisconnectingSessions() const;
/// Forward a message from child session to its respective client session. /// Forward a message from child session to its respective client session.
bool forwardToClient(const std::shared_ptr<Message>& payload); bool forwardToClient(const std::shared_ptr<Message>& payload);