Kill SenderThreadPool.
parent e16fc97c15
commit 33d1fa7cc0

3 changed files with 0 additions and 264 deletions
@@ -162,7 +162,6 @@ private:
    std::unique_ptr<WopiStorage::WOPIFileInfo> _wopiFileInfo;

    SenderQueue<std::shared_ptr<Message>> _senderQueue;
    std::thread _senderThread;
    std::atomic<bool> _stop;
};
@@ -1,161 +0,0 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
/*
 * This file is part of the LibreOffice project.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

#include "config.h"

#include "SenderQueue.hpp"

#include <algorithm>

#include <Protocol.hpp>
#include <Log.hpp>

SenderQueue SenderQueue::TheQueue;
SenderThreadPool SenderThreadPool::ThePool;

bool SenderThreadPool::dispatchItem(const size_t timeoutMs)
{
    SendItem item;
    if (SenderQueue::instance().waitDequeue(item, timeoutMs))
    {
        auto session = item.Session.lock();
        if (session)
        {
            // Make sure we have extra threads before potentially getting stuck.
            checkAndGrow();

            try
            {
                IdleCountGuard guard(_idleThreadCount);

                const std::vector<char>& data = item.Data->data();
                if (item.Data->isBinary())
                {
                    return session->sendBinaryFrame(data.data(), data.size());
                }
                else
                {
                    return session->sendTextFrame(data.data(), data.size());
                }
            }
            catch (const std::exception& ex)
            {
                LOG_ERR("Failed to send tile to " << session->getName() << ": " << ex.what());
            }
        }
        else
        {
            LOG_WRN("Discarding send data for expired session.");
        }
    }

    return false;
}

std::shared_ptr<SenderThreadPool::ThreadData> SenderThreadPool::createThread()
{
    if (!stopping())
    {
        std::shared_ptr<ThreadData> data(std::make_shared<ThreadData>());
        std::thread thread([this, data]{ threadFunction(data); });
        data->swap(thread);
        return data;
    }

    return nullptr;
}

void SenderThreadPool::checkAndGrow()
{
    auto queueSize = SenderQueue::instance().size();
    if (_idleThreadCount <= 1 && queueSize > 1)
    {
        std::lock_guard<std::mutex> lock(_mutex);

        // Check again, in case rebalancing already did the trick.
        queueSize = SenderQueue::instance().size();
        if (_idleThreadCount <= 1 && queueSize > 1 &&
            _maxThreadCount > _threads.size() && !stopping())
        {
            LOG_TRC("SenderThreadPool: growing. Cur: " << _threads.size() << ", Max: " << _maxThreadCount <<
                    ", Idles: " << _idleThreadCount << ", Q: " << queueSize);

            // We have room to grow.
            auto newThreadData = createThread();
            if (newThreadData)
            {
                _threads.push_back(newThreadData);
            }
        }
    }
}

bool SenderThreadPool::rebalance()
{
    std::unique_lock<std::mutex> lock(_mutex, std::defer_lock);
    if (!lock.try_lock())
    {
        // A sibling is likely rebalancing.
        return false;
    }

    const auto threadCount = _threads.size();
    LOG_DBG("SenderThreadPool: rebalancing " << threadCount << " threads.");

    // First cleanup the non-joinables.
    for (int i = _threads.size() - 1; i >= 0; --i)
    {
        if (!_threads[i]->joinable())
        {
            _threads.erase(_threads.begin() + i);
        }
    }

    const auto threadCountNew = _threads.size();
    LOG_DBG("SenderThreadPool: removed " << threadCount - threadCountNew <<
            " dead threads to have " << threadCountNew << ".");

    while (_threads.size() < _optimalThreadCount && !stopping())
    {
        auto newThreadData = createThread();
        if (newThreadData)
        {
            _threads.push_back(newThreadData);
        }
    }

    // Need to reduce?
    LOG_DBG("SenderThreadPool: threads: " << _threads.size());
    return _threads.size() > _optimalThreadCount;
}

void SenderThreadPool::threadFunction(const std::shared_ptr<ThreadData>& data)
{
    LOG_DBG("SenderThread started");
    ++_idleThreadCount;

    while (!stopping())
    {
        if (!dispatchItem(HousekeepIdleIntervalMs) && !stopping())
        {
            // We timed out. Seems we have more threads than work.
            if (rebalance())
            {
                // We've been considered expendable.
                LOG_DBG("SenderThread marked to die");
                break;
            }
        }
    }

    data->detach();
    LOG_DBG("SenderThread finished");
}

/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
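For reference, below is a minimal, self-contained sketch of the pattern the removed pool implemented: worker threads block on a shared queue with a timeout, do the actual send outside the lock, and treat a timed-out dequeue as the "more threads than work" signal. All names are illustrative and this is not the project's API; the real code grew and shrank via checkAndGrow()/rebalance() rather than this simplified loop.

// Illustrative sketch only (hypothetical names, not the project's API).
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class MiniSenderPool
{
public:
    explicit MiniSenderPool(std::size_t threadCount)
    {
        for (std::size_t i = 0; i < threadCount; ++i)
            _threads.emplace_back([this] { threadFunction(); });
    }

    ~MiniSenderPool()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _stop = true;
        }
        _cv.notify_all();
        for (std::thread& t : _threads)
            if (t.joinable())
                t.join();
    }

    void enqueue(std::function<void()> sendItem)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _queue.push(std::move(sendItem));
        }
        _cv.notify_one();
    }

private:
    // Dequeue and run one item; false means we timed out (i.e. we were idle).
    bool dispatchOne(std::chrono::milliseconds timeout)
    {
        std::function<void()> item;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            if (!_cv.wait_for(lock, timeout, [this] { return _stop || !_queue.empty(); }))
                return false; // Timed out: more threads than work.
            if (_stop || _queue.empty())
                return false;
            item = std::move(_queue.front());
            _queue.pop();
        }
        item(); // Do the actual send outside the lock.
        return true;
    }

    void threadFunction()
    {
        for (;;)
        {
            if (!dispatchOne(std::chrono::milliseconds(100)))
            {
                std::lock_guard<std::mutex> lock(_mutex);
                if (_stop)
                    break;
                // The removed pool called rebalance() at this point and let
                // the thread exit if it was deemed expendable.
            }
        }
    }

    bool _stop = false; // Guarded by _mutex.
    std::queue<std::function<void()>> _queue;
    std::mutex _mutex;
    std::condition_variable _cv;
    std::vector<std::thread> _threads;
};

int main()
{
    MiniSenderPool pool(2);
    for (int i = 0; i < 5; ++i)
        pool.enqueue([i] { std::cout << "sent item " << i << '\n'; });
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
}

The timeout-as-idle signal is the same design choice the removed threadFunction() made with HousekeepIdleIntervalMs, only without the grow/shrink bookkeeping.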
@@ -188,108 +188,6 @@ private:
    std::atomic<bool> _stop;
};

/// Pool of sender threads.
/// These are dedicated threads that only dequeue from
/// the SenderQueue and send to the target Session's WS.
/// This pool has long-running threads that grow
/// only on congestion and shrink otherwise.
class SenderThreadPool final
{
public:
    SenderThreadPool() :
        _optimalThreadCount(std::min(2U, std::thread::hardware_concurrency())),
        _maxThreadCount(_optimalThreadCount),
        _idleThreadCount(0),
        _stop(false)
    {
        LOG_INF("Creating SenderThreadPool with " << _optimalThreadCount << " optimal threads.");
        for (size_t i = 0; i < _optimalThreadCount; ++i)
        {
            _threads.push_back(createThread());
        }
    }

    ~SenderThreadPool()
    {
        // Stop us and the queue.
        stop();
        //SenderQueue::instance().stop();

        for (const auto& threadData : _threads)
        {
            if (threadData && threadData->joinable())
            {
                threadData->join();
            }
        }
    }

    void stop() { _stop = true; }
    bool stopping() const { return _stop || TerminationFlag; }

    void incMaxThreadCount() { ++_maxThreadCount; }
    void decMaxThreadCount() { --_maxThreadCount; }

private:

    /// Count idle threads safely.
    /// Decrements count on ctor, and increments on dtor.
    class IdleCountGuard final
    {
    public:
        IdleCountGuard(std::atomic<size_t>& var) :
            _var(var)
        {
            --_var;
        }

        ~IdleCountGuard()
        {
            ++_var;
        }

    private:
        std::atomic<size_t>& _var;
    };

    typedef std::thread ThreadData;

    /// Dequeue a SendItem and send it.
    bool dispatchItem(const size_t timeoutMs);

    /// Create a new thread and add to the pool.
    std::shared_ptr<ThreadData> createThread();

    /// Rebalance the number of threads.
    /// Returns true if we need to reduce the threads.
    bool rebalance();

    /// Grow the pool if congestion is detected.
    void checkAndGrow();

    /// The worker thread entry function.
    void threadFunction(const std::shared_ptr<ThreadData>& data);

private:
    /// A minimum of 2, but ideally as many as cores.
    const size_t _optimalThreadCount;

    /// Never exceed this number of threads.
    size_t _maxThreadCount;

    /// The number of threads not sending data.
    std::atomic<size_t> _idleThreadCount;

    /// Stop condition to take the pool down.
    std::atomic<bool> _stop;

    std::vector<std::shared_ptr<ThreadData>> _threads;
    mutable std::mutex _mutex;

    /// How often to do housekeeping when we idle.
    static constexpr size_t HousekeepIdleIntervalMs = 60000;
};

#endif

/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
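The IdleCountGuard removed above is a small RAII helper. A stand-alone sketch of the same idea, with illustrative names rather than the removed class itself, looks like this:

// Sketch of the RAII idle-counting idea (hypothetical names): mark the thread
// busy for the scope of a send and restore the count on every exit path,
// including exceptions.
#include <atomic>
#include <cassert>
#include <cstddef>

class ScopedBusyMark final
{
public:
    explicit ScopedBusyMark(std::atomic<std::size_t>& idleCount) :
        _idleCount(idleCount)
    {
        --_idleCount; // One fewer idle thread while we send.
    }

    ~ScopedBusyMark()
    {
        ++_idleCount; // Idle again, even if the send threw.
    }

private:
    std::atomic<std::size_t>& _idleCount;
};

int main()
{
    std::atomic<std::size_t> idleThreads(1);
    {
        ScopedBusyMark guard(idleThreads);
        assert(idleThreads == 0); // Busy while inside the scope.
    }
    assert(idleThreads == 1); // Restored when the guard is destroyed.
}

This is why the removed dispatchItem() constructed the guard inside its try block: the destructor restores the idle count even when sendBinaryFrame()/sendTextFrame() throws.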