diff --git a/wsd/ClientSession.hpp b/wsd/ClientSession.hpp index 97f5591e3..51b810a18 100644 --- a/wsd/ClientSession.hpp +++ b/wsd/ClientSession.hpp @@ -162,7 +162,6 @@ private: std::unique_ptr _wopiFileInfo; SenderQueue> _senderQueue; - std::thread _senderThread; std::atomic _stop; }; diff --git a/wsd/SenderQueue.cpp b/wsd/SenderQueue.cpp deleted file mode 100644 index f614dd014..000000000 --- a/wsd/SenderQueue.cpp +++ /dev/null @@ -1,161 +0,0 @@ -/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */ -/* - * This file is part of the LibreOffice project. - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - */ - -#include "config.h" - -#include "SenderQueue.hpp" - -#include - -#include -#include - -SenderQueue SenderQueue::TheQueue; -SenderThreadPool SenderThreadPool::ThePool; - -bool SenderThreadPool::dispatchItem(const size_t timeoutMs) -{ - SendItem item; - if (SenderQueue::instance().waitDequeue(item, timeoutMs)) - { - auto session = item.Session.lock(); - if (session) - { - // Make sure we have extra threads before potentially getting stuck. - checkAndGrow(); - - try - { - IdleCountGuard guard(_idleThreadCount); - - const std::vector& data = item.Data->data(); - if (item.Data->isBinary()) - { - return session->sendBinaryFrame(data.data(), data.size()); - } - else - { - return session->sendTextFrame(data.data(), data.size()); - } - } - catch (const std::exception& ex) - { - LOG_ERR("Failed to send tile to " << session->getName() << ": " << ex.what()); - } - } - else - { - LOG_WRN("Discarding send data for expired session."); - } - } - - return false; -} - -std::shared_ptr SenderThreadPool::createThread() -{ - if (!stopping()) - { - std::shared_ptr data(std::make_shared()); - std::thread thread([this, data]{ threadFunction(data); }); - data->swap(thread); - return data; - } - - return nullptr; -} - -void SenderThreadPool::checkAndGrow() -{ - auto queueSize = SenderQueue::instance().size(); - if (_idleThreadCount <= 1 && queueSize > 1) - { - std::lock_guard lock(_mutex); - - // Check again, in case rebalancing already did the trick. - queueSize = SenderQueue::instance().size(); - if (_idleThreadCount <= 1 && queueSize > 1 && - _maxThreadCount > _threads.size() && !stopping()) - { - LOG_TRC("SenderThreadPool: growing. Cur: " << _threads.size() << ", Max: " << _maxThreadCount << - ", Idles: " << _idleThreadCount << ", Q: " << queueSize); - - // We have room to grow. - auto newThreadData = createThread(); - if (newThreadData) - { - _threads.push_back(newThreadData); - } - } - } -} - -bool SenderThreadPool::rebalance() -{ - std::unique_lock lock(_mutex, std::defer_lock); - if (!lock.try_lock()) - { - // A sibling is likely rebalancing. - return false; - } - - const auto threadCount = _threads.size(); - LOG_DBG("SenderThreadPool: rebalancing " << threadCount << " threads."); - - // First cleanup the non-joinables. - for (int i = _threads.size() - 1; i >= 0; --i) - { - if (!_threads[i]->joinable()) - { - _threads.erase(_threads.begin() + i); - } - } - - const auto threadCountNew = _threads.size(); - LOG_DBG("SenderThreadPool: removed " << threadCount - threadCountNew << - " dead threads to have " << threadCountNew << "."); - - while (_threads.size() < _optimalThreadCount && !stopping()) - { - auto newThreadData = createThread(); - if (newThreadData) - { - _threads.push_back(newThreadData); - } - } - - // Need to reduce? - LOG_DBG("SenderThreadPool: threads: " << _threads.size()); - return _threads.size() > _optimalThreadCount; -} - -void SenderThreadPool::threadFunction(const std::shared_ptr& data) -{ - LOG_DBG("SenderThread started"); - ++_idleThreadCount; - - while (!stopping()) - { - if (!dispatchItem(HousekeepIdleIntervalMs) && !stopping()) - { - // We timed out. Seems we have more threads than work. - if (rebalance()) - { - // We've been considered expendable. - LOG_DBG("SenderThread marked to die"); - break; - } - } - } - - data->detach(); - LOG_DBG("SenderThread finished"); -} - -/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/wsd/SenderQueue.hpp b/wsd/SenderQueue.hpp index 28996112a..6debf5956 100644 --- a/wsd/SenderQueue.hpp +++ b/wsd/SenderQueue.hpp @@ -188,108 +188,6 @@ private: std::atomic _stop; }; -/// Pool of sender threads. -/// These are dedicated threads that only dequeue from -/// the SenderQueue and send to the target Session's WS. -/// This pool has long-running threads that grow -/// only on congention and shrink otherwise. -class SenderThreadPool final -{ -public: - SenderThreadPool() : - _optimalThreadCount(std::min(2U, std::thread::hardware_concurrency())), - _maxThreadCount(_optimalThreadCount), - _idleThreadCount(0), - _stop(false) - { - LOG_INF("Creating SenderThreadPool with " << _optimalThreadCount << " optimal threads."); - for (size_t i = 0; i < _optimalThreadCount; ++i) - { - _threads.push_back(createThread()); - } - } - - ~SenderThreadPool() - { - // Stop us and the queue. - stop(); - //SenderQueue::instance().stop(); - - for (const auto& threadData : _threads) - { - if (threadData && threadData->joinable()) - { - threadData->join(); - } - } - } - - void stop() { _stop = true; } - bool stopping() const { return _stop || TerminationFlag; } - - void incMaxThreadCount() { ++_maxThreadCount; } - void decMaxThreadCount() { --_maxThreadCount; } - -private: - - /// Count idle threads safely. - /// Decrements count on ctor, and increments on dtor. - class IdleCountGuard final - { - public: - IdleCountGuard(std::atomic& var) : - _var(var) - { - --_var; - } - - ~IdleCountGuard() - { - ++_var; - } - - private: - std::atomic& _var; - }; - - typedef std::thread ThreadData; - - /// Dequeue a SendItem and send it. - bool dispatchItem(const size_t timeoutMs); - - /// Create a new thread and add to the pool. - std::shared_ptr createThread(); - - /// Rebalance the number of threads. - /// Returns true if we need to reduce the threads. - bool rebalance(); - - /// Grow the pool if congestion is detected. - void checkAndGrow(); - - /// The worker thread entry function. - void threadFunction(const std::shared_ptr& data); - -private: - /// A minimum of 2, but ideally as many as cores. - const size_t _optimalThreadCount; - - /// Never exceed this number of threads. - size_t _maxThreadCount; - - /// The number of threads not sending data. - std::atomic _idleThreadCount; - - /// Stop condition to take the pool down. - std::atomic _stop; - - std::vector> _threads; - mutable std::mutex _mutex; - - /// How often to do housekeeping when we idle. - static constexpr size_t HousekeepIdleIntervalMs = 60000; -}; - #endif /* vim:set shiftwidth=4 softtabstop=4 expandtab: */