da97e2ac18
The pool is checked for expansion before processing items from the queue. By tracking the number of idle threads (i.e. those not currently sending data) we ensure that there is at least one idle thread before we write to the socket. If there are not enough idle threads at that point, a new thread is spawned, so long as we're below the limit. This guarantees that even if all the active threads block on the socket, we always have one more to process new data (until we reach the limit, which is as many threads as we have client connections). Technically, a single slow connection could still monopolize all the threads if there are many messages to be sent to it; for that we'd need to track and assign one thread per connection, something we don't currently do.

Change-Id: Ic8b5e064da068b37bcfa773005495b198763c31d
Reviewed-on: https://gerrit.libreoffice.org/31886
Reviewed-by: Ashod Nakashian <ashnakash@gmail.com>
Tested-by: Ashod Nakashian <ashnakash@gmail.com>
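Note: the IdleCountGuard used in dispatchItem() below is declared in SenderQueue.hpp, which is not shown here. As a minimal sketch only, such an RAII helper would look roughly like the following, assuming _idleThreadCount is a std::atomic counter (the member name comes from the usage below; the exact type and the real declaration live in the header), with <atomic> and <cstddef> included:

    // Sketch only: marks the owning thread as busy for the guard's lifetime.
    class IdleCountGuard final
    {
    public:
        IdleCountGuard(std::atomic<std::size_t>& count) :
            _count(count)
        {
            --_count; // About to do (possibly blocking) work: no longer idle.
        }

        ~IdleCountGuard()
        {
            ++_count; // Idle again, even if the send threw.
        }

    private:
        std::atomic<std::size_t>& _count;
    };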
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
/*
 * This file is part of the LibreOffice project.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

#include "SenderQueue.hpp"

#include <algorithm>

#include <Protocol.hpp>
#include <Log.hpp>

SenderQueue SenderQueue::TheQueue;
SenderThreadPool SenderThreadPool::ThePool;

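// Dequeue one item, waiting up to timeoutMs, and hand it to the session's
// sendBinaryFrame/sendTextFrame. Returns the result of the send, or false if
// nothing was sent (timeout, expired session, or send failure).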
bool SenderThreadPool::dispatchItem(const size_t timeoutMs)
{
    SendItem item;
    if (SenderQueue::instance().waitDequeue(item, timeoutMs))
    {
        auto session = item.Session.lock();
        if (session)
        {
            // Make sure we have extra threads before potentially getting stuck.
            checkAndGrow();

            try
            {
                IdleCountGuard guard(_idleThreadCount);

                const std::vector<char>& data = item.Data->data();
                if (item.Data->isBinary())
                {
                    return session->sendBinaryFrame(data.data(), data.size());
                }
                else
                {
                    return session->sendTextFrame(data.data(), data.size());
                }
            }
            catch (const std::exception& ex)
            {
                LOG_ERR("Failed to send tile to " << session->getName() << ": " << ex.what());
            }
        }
        else
        {
            LOG_WRN("Discarding send data for expired session.");
        }
    }

    return false;
}

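// Create and start a new worker thread running threadFunction().
// Returns its ThreadData, or nullptr if the pool is stopping.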
std::shared_ptr<SenderThreadPool::ThreadData> SenderThreadPool::createThread()
{
    if (!stopping())
    {
        std::shared_ptr<ThreadData> data(std::make_shared<ThreadData>());
        std::thread thread([this, data]{ threadFunction(data); });
        data->swap(thread);
        return data;
    }

    return nullptr;
}

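// Grow the pool by one thread when (nearly) all threads are busy and more than
// one item is queued, so that a blocking send cannot leave the queue unserviced.
// Growth is capped at _maxThreadCount, and the condition is re-checked under the
// lock to avoid over-spawning.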
void SenderThreadPool::checkAndGrow()
{
    auto queueSize = SenderQueue::instance().size();
    if (_idleThreadCount <= 1 && queueSize > 1)
    {
        std::lock_guard<std::mutex> lock(_mutex);

        // Check again, in case rebalancing already did the trick.
        queueSize = SenderQueue::instance().size();
        if (_idleThreadCount <= 1 && queueSize > 1 &&
            _maxThreadCount > _threads.size() && !stopping())
        {
            LOG_TRC("SenderThreadPool: growing. Cur: " << _threads.size() << ", Max: " << _maxThreadCount <<
                    ", Idles: " << _idleThreadCount << ", Q: " << queueSize);

            // We have room to grow.
            auto newThreadData = createThread();
            if (newThreadData)
            {
                _threads.push_back(newThreadData);
            }
        }
    }
}

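// Housekeeping: drop threads that have already exited (no longer joinable) and
// spawn new ones up to _optimalThreadCount. Returns true if the pool is still
// above the optimal size, i.e. the calling thread is expendable and should exit.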
bool SenderThreadPool::rebalance()
{
    std::unique_lock<std::mutex> lock(_mutex, std::defer_lock);
    if (!lock.try_lock())
    {
        // A sibling is likely rebalancing.
        return false;
    }

    const auto threadCount = _threads.size();
    LOG_DBG("SenderThreadPool: rebalancing " << threadCount << " threads.");

    // First cleanup the non-joinables.
    for (int i = _threads.size() - 1; i >= 0; --i)
    {
        if (!_threads[i]->joinable())
        {
            _threads.erase(_threads.begin() + i);
        }
    }

    const auto threadCountNew = _threads.size();
    LOG_DBG("SenderThreadPool: removed " << threadCount - threadCountNew <<
            " dead threads to have " << threadCountNew << ".");

    while (_threads.size() < _optimalThreadCount && !stopping())
    {
        auto newThreadData = createThread();
        if (newThreadData)
        {
            _threads.push_back(newThreadData);
        }
    }

    // Need to reduce?
    LOG_DBG("SenderThreadPool: threads: " << _threads.size());
    return _threads.size() > _optimalThreadCount;
}

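// Worker loop: dispatch queued items until the pool is stopping, or until an
// idle timeout plus rebalance() marks this thread as expendable.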
void SenderThreadPool::threadFunction(const std::shared_ptr<ThreadData>& data)
{
    LOG_DBG("SenderThread started");
    ++_idleThreadCount;

    while (!stopping())
    {
        if (!dispatchItem(HousekeepIdleIntervalMs) && !stopping())
        {
            // We timed out. Seems we have more threads than work.
            if (rebalance())
            {
                // We've been considered expendable.
                LOG_DBG("SenderThread marked to die");
                break;
            }
        }
    }

    data->detach();
    LOG_DBG("SenderThread finished");
}

/* vim:set shiftwidth=4 softtabstop=4 expandtab: */