b4163877e7
All call sites already effectively asked for an unsigned return type, including: * The ThreadPool ctor took an nWorkers argument of type sal_Int32, but internally stores that as std::size_t mnMaxWorkers. * ZipOutputStream::reduceScheduledThreadTasksToGivenNumberOrLess apparently benefits from an unsigned nThreadTasks parameter, getting rid of various casts in its implementation that were necessary to silence signed vs. unsigned comparison warnings. The only drawback is that comphelper::ThreadPool::getPreferredConcurrency() * 4 in package/source/zippackage/ZipPackageStream.cxx would now silently wrap around instead of causing UB on overflow (which could be detected with appropriate tools). Ideally, it would use some o3tl::saturating_mul if we had that, so add a TODO comment for now. While std::thread::hardware_concurrency returns unsigned, it looked more natural to go with std::size_t here, as some call sites already used that (see above), so the implementation of ThreadPool::getPreferredConcurrency is a natural place to hide clamping std::thread::hardware_concurrency() to std::size_t (in the unlikely case that std::size_t is of smaller rank than unsigned). This required addition of o3tl::clamp_to_unsigned in o3tl/safeint.hxx. Change-Id: I0a04a8b32e63ebfeb39f924c4b38520455a6fb38 Reviewed-on: https://gerrit.libreoffice.org/c/core/+/135309 Tested-by: Jenkins Reviewed-by: Stephan Bergmann <sbergman@redhat.com>
114 lines
3.6 KiB
C++
114 lines
3.6 KiB
C++
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
|
|
/*
|
|
* This file is part of the LibreOffice project.
|
|
*
|
|
* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
*/
|
|
|
|
#ifndef INCLUDED_COMPHELPER_THREADPOOL_HXX
|
|
#define INCLUDED_COMPHELPER_THREADPOOL_HXX
|
|
|
|
#include <sal/config.h>
|
|
#include <rtl/ref.hxx>
|
|
#include <comphelper/comphelperdllapi.h>
|
|
#include <mutex>
|
|
#include <condition_variable>
|
|
#include <cstddef>
|
|
#include <vector>
|
|
#include <memory>
|
|
|
|
namespace comphelper
|
|
{
|
|
/// Forward declaration only: this header refers to ThreadTaskTag exclusively
/// through std::shared_ptr, so the full definition is not needed here.
class ThreadTaskTag;
|
|
|
|
class COMPHELPER_DLLPUBLIC ThreadTask
|
|
{
|
|
friend class ThreadPool;
|
|
friend struct std::default_delete<ThreadTask>;
|
|
std::shared_ptr<ThreadTaskTag> mpTag;
|
|
|
|
/// execute this task
|
|
void exec();
|
|
protected:
|
|
/// override to get your task performed by the pool
|
|
virtual void doWork() = 0;
|
|
/// once pushed ThreadTasks are destroyed by the pool
|
|
virtual ~ThreadTask() {}
|
|
public:
|
|
ThreadTask(std::shared_ptr<ThreadTaskTag> pTag);
|
|
};
|
|
|
|
/// A very basic thread-safe thread pool implementation
|
|
class COMPHELPER_DLLPUBLIC ThreadPool final
|
|
{
|
|
public:
|
|
/// returns a pointer to a shared pool with optimal thread
|
|
/// count for the CPU
|
|
static ThreadPool& getSharedOptimalPool();
|
|
|
|
static std::shared_ptr<ThreadTaskTag> createThreadTaskTag();
|
|
|
|
static bool isTaskTagDone(const std::shared_ptr<ThreadTaskTag>&);
|
|
|
|
/// returns a configurable max-concurrency
|
|
/// limit to avoid spawning an unnecessarily
|
|
/// large number of threads on high-core boxes.
|
|
/// MAX_CONCURRENCY env. var. controls the cap.
|
|
static std::size_t getPreferredConcurrency();
|
|
|
|
ThreadPool( std::size_t nWorkers );
|
|
~ThreadPool();
|
|
|
|
/// push a new task onto the work queue
|
|
void pushTask( std::unique_ptr<ThreadTask> pTask);
|
|
|
|
/** Wait until all queued tasks associated with the tag are completed
|
|
@param bJoin - if set call joinThreadsIfIdle() at the end
|
|
*/
|
|
void waitUntilDone(const std::shared_ptr<ThreadTaskTag>&, bool bJoin = true);
|
|
|
|
/// join all threads if there are no tasks presently.
|
|
void joinThreadsIfIdle();
|
|
|
|
/// return true if there are no queued or worked-on tasks
|
|
bool isIdle() const { return maTasks.empty() && mnBusyWorkers == 0; };
|
|
|
|
/// return the number of live worker threads
|
|
sal_Int32 getWorkerCount() const { return mnMaxWorkers; }
|
|
|
|
/// wait until all work is completed, then join all threads
|
|
void shutdown();
|
|
|
|
private:
|
|
ThreadPool(const ThreadPool&) = delete;
|
|
ThreadPool& operator=(const ThreadPool&) = delete;
|
|
|
|
class ThreadWorker;
|
|
friend class ThreadWorker;
|
|
|
|
/** Pop a work task
|
|
@param bWait - if set wait until task present or termination
|
|
@return a new task to perform, or NULL if list empty or terminated
|
|
*/
|
|
std::unique_ptr<ThreadTask> popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait );
|
|
void shutdownLocked(std::unique_lock<std::mutex>&);
|
|
void incBusyWorker();
|
|
void decBusyWorker();
|
|
|
|
/// signalled when all in-progress tasks are complete
|
|
std::mutex maMutex;
|
|
std::condition_variable maTasksChanged;
|
|
bool mbTerminate;
|
|
std::size_t const mnMaxWorkers;
|
|
std::size_t mnBusyWorkers;
|
|
std::vector< std::unique_ptr<ThreadTask> > maTasks;
|
|
std::vector< rtl::Reference< ThreadWorker > > maWorkers;
|
|
};
|
|
|
|
} // namespace comphelper
|
|
|
|
#endif // INCLUDED_COMPHELPER_THREADPOOL_HXX
|
|
|
|
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|