Use more appropriate return type for ThreadPool::getPreferredConcurrency

All call sites already effectively asked for an unsigned return type, including:
* The ThreadPool ctor took an nWorkers argument of type sal_Int32, but
  internally stores that as std::size_t mnMaxWorkers.
* ZipOutputStream::reduceScheduledThreadTasksToGivenNumberOrLess apparently
  benefits from an unsigned nThreadTasks parameter, getting rid of various casts
  in its implementation that were necessary to silence signed vs. unsigned
  comparison warnings.

The only drawback is that

  comphelper::ThreadPool::getPreferredConcurrency() * 4

in package/source/zippackage/ZipPackageStream.cxx would now silently wrap around
instead of causing UB on overflow (which could be detected with appropriate
tools).  Ideally, it would use some o3tl::saturating_mul if we had that, so add
a TODO comment for now.

While std:🧵:hardware_concurrency returns unsigned, it looked more natural
to go with std::size_t here, as some call sites already used that (see above),
so the implementation of ThreadPool::getPreferredConcurrency is a natural place
to hide clamping std:🧵:hardware_concurrency() to std::size_t (in the
unlikely case that std::size_t is of smaller rank than unsigned).

This required addition of o3tl::clamp_to_unsigned in o3tl/safeint.hxx.

Change-Id: I0a04a8b32e63ebfeb39f924c4b38520455a6fb38
Reviewed-on: https://gerrit.libreoffice.org/c/core/+/135309
Tested-by: Jenkins
Reviewed-by: Stephan Bergmann <sbergman@redhat.com>
This commit is contained in:
Stephan Bergmann 2022-06-02 15:14:56 +02:00
parent 86922db292
commit b4163877e7
7 changed files with 34 additions and 17 deletions

View file

@ -17,6 +17,7 @@
#include <stdlib.h>
#include <atomic>
#include <cstddef>
#include <thread>
#include <mutex>
@ -42,7 +43,7 @@ void ThreadPoolTest::testPreferredConcurrency()
{
// Check default.
auto nThreads = comphelper::ThreadPool::getPreferredConcurrency();
sal_Int32 nExpected = 4; // UTs are capped to 4.
std::size_t nExpected = 4; // UTs are capped to 4.
CPPUNIT_ASSERT_MESSAGE("Expected no more than 4 threads", nExpected >= nThreads);
#ifndef _WIN32
@ -51,7 +52,7 @@ void ThreadPoolTest::testPreferredConcurrency()
setenv("MAX_CONCURRENCY", std::to_string(nThreads).c_str(), true);
nThreads = comphelper::ThreadPool::getPreferredConcurrency();
CPPUNIT_ASSERT_MESSAGE("Expected no more than hardware threads",
nThreads <= static_cast<sal_Int32>(std::thread::hardware_concurrency()));
nThreads <= std::thread::hardware_concurrency());
// Revert and check. Again, nothing should change.
unsetenv("MAX_CONCURRENCY");

View file

@ -11,6 +11,7 @@
#include <com/sun/star/uno/Exception.hpp>
#include <config_options.h>
#include <o3tl/safeint.hxx>
#include <sal/config.h>
#include <sal/log.hxx>
#include <salhelper/thread.hxx>
@ -18,6 +19,7 @@
#include <memory>
#include <thread>
#include <chrono>
#include <cstddef>
#include <comphelper/debuggerinfo.hxx>
#include <utility>
@ -91,7 +93,7 @@ public:
}
};
ThreadPool::ThreadPool(sal_Int32 nWorkers)
ThreadPool::ThreadPool(std::size_t nWorkers)
: mbTerminate(true)
, mnMaxWorkers(nWorkers)
, mnBusyWorkers(0)
@ -116,7 +118,7 @@ std::shared_ptr< ThreadPool >& GetStaticThreadPool()
static std::shared_ptr< ThreadPool > POOL =
[]()
{
const sal_Int32 nThreads = ThreadPool::getPreferredConcurrency();
const std::size_t nThreads = ThreadPool::getPreferredConcurrency();
return std::make_shared< ThreadPool >( nThreads );
}();
return POOL;
@ -129,21 +131,22 @@ ThreadPool& ThreadPool::getSharedOptimalPool()
return *GetStaticThreadPool();
}
sal_Int32 ThreadPool::getPreferredConcurrency()
std::size_t ThreadPool::getPreferredConcurrency()
{
static sal_Int32 ThreadCount = []()
static std::size_t ThreadCount = []()
{
const sal_Int32 nHardThreads = std::max(std::thread::hardware_concurrency(), 1U);
sal_Int32 nThreads = nHardThreads;
const std::size_t nHardThreads = o3tl::clamp_to_unsigned<std::size_t>(
std::max(std::thread::hardware_concurrency(), 1U));
std::size_t nThreads = nHardThreads;
const char *pEnv = getenv("MAX_CONCURRENCY");
if (pEnv != nullptr)
{
// Override with user/admin preference.
nThreads = rtl_str_toInt32(pEnv, 10);
nThreads = o3tl::clamp_to_unsigned<std::size_t>(rtl_str_toInt32(pEnv, 10));
}
nThreads = std::min(nHardThreads, nThreads);
return std::max<sal_Int32>(nThreads, 1);
return std::max<std::size_t>(nThreads, 1);
}();
return ThreadCount;

View file

@ -15,6 +15,7 @@
#include <comphelper/comphelperdllapi.h>
#include <mutex>
#include <condition_variable>
#include <cstddef>
#include <vector>
#include <memory>
@ -55,9 +56,9 @@ public:
/// limit to avoid spawning an unnecessarily
/// large number of threads on high-core boxes.
/// MAX_CONCURRENCY env. var. controls the cap.
static sal_Int32 getPreferredConcurrency();
static std::size_t getPreferredConcurrency();
ThreadPool( sal_Int32 nWorkers );
ThreadPool( std::size_t nWorkers );
~ThreadPool();
/// push a new task onto the work queue

View file

@ -208,6 +208,16 @@ make_unsigned(T value)
return value;
}
template<typename T1, typename T2> constexpr std::enable_if_t<std::is_unsigned_v<T1>, T1>
clamp_to_unsigned(T2 value) {
if constexpr (std::is_unsigned_v<T2>) {
return value <= std::numeric_limits<T1>::max() ? value : std::numeric_limits<T1>::max();
} else {
static_assert(std::is_signed_v<T2>);
return value < 0 ? 0 : clamp_to_unsigned<T1>(make_unsigned(value));
}
}
// An implicit conversion from T2 to T1, useful in places where an explicit conversion from T2 to
// T1 is needed (e.g., in list initialization, if the implicit conversion would be narrowing) but
// tools like -fsanitize=implicit-conversion should still be able to detect truncation:

View file

@ -25,6 +25,7 @@
#include "ByteChucker.hxx"
#include <comphelper/threadpool.hxx>
#include <cstddef>
#include <vector>
struct ZipEntry;
@ -85,7 +86,7 @@ private:
public:
void reduceScheduledThreadTasksToGivenNumberOrLess(
sal_Int32 nThreadTasks);
std::size_t nThreadTasks);
const std::shared_ptr<comphelper::ThreadTaskTag>& getThreadTaskTag() const { return mpThreadTaskTag; }
};

View file

@ -143,13 +143,13 @@ void ZipOutputStream::consumeFinishedScheduledThreadTaskEntries()
m_aEntries = aNonFinishedEntries;
}
void ZipOutputStream::reduceScheduledThreadTasksToGivenNumberOrLess(sal_Int32 nThreadTasks)
void ZipOutputStream::reduceScheduledThreadTasksToGivenNumberOrLess(std::size_t nThreadTasks)
{
while(static_cast< sal_Int32 >(m_aEntries.size()) > nThreadTasks)
while(m_aEntries.size() > nThreadTasks)
{
consumeFinishedScheduledThreadTaskEntries();
if(static_cast< sal_Int32 >(m_aEntries.size()) > nThreadTasks)
if(m_aEntries.size() > nThreadTasks)
{
osl::Thread::wait(std::chrono::microseconds(100));
}

View file

@ -57,6 +57,7 @@
#include <PackageConstants.hxx>
#include <algorithm>
#include <cstddef>
using namespace com::sun::star::packages::zip::ZipConstants;
using namespace com::sun::star::packages::zip;
@ -771,7 +772,7 @@ bool ZipPackageStream::saveChild(
// cores and allow 4-times the amount for having the queue well filled. The
// 2nd parameter is the time to wait between cleanups in 10th of a second.
// Both values may be added to the configuration settings if needed.
static sal_Int32 nAllowedTasks(comphelper::ThreadPool::getPreferredConcurrency() * 4);
static std::size_t nAllowedTasks(comphelper::ThreadPool::getPreferredConcurrency() * 4); //TODO: overflow
rZipOut.reduceScheduledThreadTasksToGivenNumberOrLess(nAllowedTasks);
// Start a new thread task deflating this zip entry