loolwsd: improved child spawning and management
Spare child processes are now in a separate container. A FIFO that gives older instances priority to avoid using instances too young to have initialized fully. In addition, spare instances are now proactively spawned such that there is at least a minimum number of spares at any given time. Change-Id: Ibdb206d88473adb306c274f4af39798c784258a0 Reviewed-on: https://gerrit.libreoffice.org/23647 Reviewed-by: Ashod Nakashian <ashnakash@gmail.com> Tested-by: Ashod Nakashian <ashnakash@gmail.com>
This commit is contained in:
parent
2ba9bddf9a
commit
cbabd6177d
1 changed files with 87 additions and 52 deletions
|
@ -136,39 +136,32 @@ namespace
|
|||
};
|
||||
|
||||
static std::map<Process::PID, std::shared_ptr<ChildProcess>> _childProcesses;
|
||||
static std::deque<std::shared_ptr<ChildProcess>> _newChildProcesses;
|
||||
|
||||
/// Looks up a child hosting a URL, or returns an empty one.
|
||||
/// Looks up a child hosting a URL, otherwise returns an empty one.
|
||||
/// If neither exist, then returns null.
|
||||
std::shared_ptr<ChildProcess> findChild(const std::string& url)
|
||||
{
|
||||
std::shared_ptr<ChildProcess> child;
|
||||
for (const auto& it : _childProcesses)
|
||||
{
|
||||
Log::trace() << "Child [" << it.second->getPid()
|
||||
<< "] url [" << url << "]." << Log::end;
|
||||
if (it.second->getUrl() == url)
|
||||
{
|
||||
return it.second;
|
||||
}
|
||||
|
||||
if (it.second->getUrl().empty())
|
||||
{
|
||||
// Empty one, but keep going, we might find ours.
|
||||
child = it.second;
|
||||
}
|
||||
}
|
||||
|
||||
return child;
|
||||
// Try an empty one.
|
||||
if (!_newChildProcesses.empty())
|
||||
{
|
||||
auto child = _newChildProcesses.front();
|
||||
_newChildProcesses.pop_front();
|
||||
return child;
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
/// Looks up the pipe descriptor
|
||||
/// of a child. Returns -1 on error.
|
||||
int getChildPipe(const Process::PID pid)
|
||||
{
|
||||
const auto it = _childProcesses.find(pid);
|
||||
return (it != _childProcesses.end() ? it->second->getWritePipe() : -1);
|
||||
}
|
||||
|
||||
/// Removes a child process.
|
||||
/// Removes a used child process. New ones can't be removed.
|
||||
void removeChild(const Process::PID pid, const bool rude)
|
||||
{
|
||||
const auto it = _childProcesses.find(pid);
|
||||
|
@ -189,24 +182,35 @@ public:
|
|||
{
|
||||
}
|
||||
|
||||
bool createSession(const Process::PID pid, const std::string& session, const std::string& url)
|
||||
bool createSession(const std::shared_ptr<ChildProcess>& child, const std::string& session, const std::string& url)
|
||||
{
|
||||
const std::string message = "session " + session + " " + url + "\n";
|
||||
if (IoUtil::writeFIFO(getChildPipe(pid), message) < 0)
|
||||
const auto childPid = std::to_string(child->getPid());
|
||||
const auto childPipe = child->getWritePipe();
|
||||
if (IoUtil::writeFIFO(childPipe, message) < 0)
|
||||
{
|
||||
Log::error("Error sending session message to child [" + std::to_string(pid) + "].");
|
||||
Log::error("Error sending session message to child [" + childPid + "].");
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string response;
|
||||
if (_childPipeReader.readLine(response, [](){ return TerminationFlag; }) < 0)
|
||||
while (true)
|
||||
{
|
||||
Log::error("Error reading response to session message from child [" + std::to_string(pid) + "].");
|
||||
return false;
|
||||
}
|
||||
std::string response;
|
||||
if (_childPipeReader.readLine(response, [](){ return TerminationFlag; }) <= 0)
|
||||
{
|
||||
Log::error("Error reading response to session message from child [" + childPid + "].");
|
||||
return false;
|
||||
}
|
||||
|
||||
StringTokenizer tokens(response, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
|
||||
return (tokens.count() == 2 && tokens[0] == std::to_string(pid) && tokens[1] == "ok");
|
||||
StringTokenizer tokens(response, " ", StringTokenizer::TOK_IGNORE_EMPTY | StringTokenizer::TOK_TRIM);
|
||||
if (tokens.count() > 0 && tokens[0] != childPid)
|
||||
{
|
||||
// Not a response from the child in question.
|
||||
continue;
|
||||
}
|
||||
|
||||
return (tokens.count() == 2 && tokens[1] == "ok");
|
||||
}
|
||||
}
|
||||
|
||||
/// Sync ChildProcess instances with its child.
|
||||
|
@ -294,19 +298,15 @@ public:
|
|||
Log::debug("Found URL [" + url + "] hosted on child [" + childPid + "].");
|
||||
}
|
||||
|
||||
if (createSession(child->getPid(), session, url))
|
||||
if (createSession(child, session, url))
|
||||
{
|
||||
child->setUrl(url);
|
||||
_childProcesses[child->getPid()] = child;
|
||||
Log::debug("Child [" + childPid + "] now hosts [" + url + "] for session [" + session + "].");
|
||||
}
|
||||
else
|
||||
{
|
||||
Log::error("Error creating session [" + session + "] for URL [" + url + "] on child [" + childPid + "].");
|
||||
if (isEmptyChild)
|
||||
{
|
||||
// This is probably a child in bad state. Rid of it and create new.
|
||||
removeChild(child->getPid(), true);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -520,7 +520,7 @@ static int createLibreOfficeKit(const bool sharePages,
|
|||
|
||||
Log::info() << "Adding Kit #" << childCounter << ", PID: " << childPID << Log::end;
|
||||
|
||||
_childProcesses[childPID] = std::make_shared<ChildProcess>(childPID, -1, fifoWriter);
|
||||
_newChildProcesses.emplace_back(std::make_shared<ChildProcess>(childPID, -1, fifoWriter));
|
||||
return childPID;
|
||||
}
|
||||
|
||||
|
@ -751,7 +751,7 @@ int main(int argc, char** argv)
|
|||
Util::removeFile(childPath, true);
|
||||
}
|
||||
|
||||
pipeHandler.syncChildren();
|
||||
//pipeHandler.syncChildren();
|
||||
timeoutCounter = 0;
|
||||
}
|
||||
else if (pid < 0)
|
||||
|
@ -781,24 +781,43 @@ int main(int argc, char** argv)
|
|||
{
|
||||
std::lock_guard<std::mutex> lock(forkMutex);
|
||||
|
||||
const int empty = pipeHandler.syncChildren();
|
||||
const int total = _childProcesses.size();
|
||||
const auto childCount = _childProcesses.size();
|
||||
const int newChildCount = _newChildProcesses.size();
|
||||
|
||||
// Figure out how many children we need. Always create at least as many
|
||||
// as configured pre-spawn or one more than requested (whichever is larger).
|
||||
int spawn = std::max(static_cast<int>(forkCounter) + 1, numPreSpawnedChildren);
|
||||
Log::debug() << "Creating " << spawn << (spawn == 1 ? " child" : " children") << ". Current total: "
|
||||
<< total << ", Empty: " << empty << Log::end;
|
||||
do
|
||||
if (spawn > newChildCount)
|
||||
{
|
||||
if (createLibreOfficeKit(sharePages, childRoot, sysTemplate,
|
||||
loTemplate, loSubPath) < 0)
|
||||
Log::error("Error: fork failed.");
|
||||
}
|
||||
while (--spawn > 0);
|
||||
spawn -= newChildCount;
|
||||
Log::info() << "Creating " << spawn << " new child. Current total: "
|
||||
<< childCount << " + " << newChildCount << " (new) = "
|
||||
<< (childCount + newChildCount) << "." << Log::end;
|
||||
size_t newInstances = 0;
|
||||
do
|
||||
{
|
||||
if (createLibreOfficeKit(sharePages, childRoot, sysTemplate,
|
||||
loTemplate, loSubPath) < 0)
|
||||
{
|
||||
Log::error("Error: fork failed.");
|
||||
}
|
||||
else
|
||||
{
|
||||
++newInstances;
|
||||
}
|
||||
}
|
||||
while (--spawn > 0);
|
||||
|
||||
// We've done our best. If need more, retrying will bump the counter.
|
||||
forkCounter = 0;
|
||||
// We've done our best. If need more, retrying will bump the counter.
|
||||
forkCounter = (newInstances > forkCounter ? 0 : forkCounter - newInstances);
|
||||
}
|
||||
else
|
||||
{
|
||||
Log::info() << "Requested " << spawn << " new child. Current total: "
|
||||
<< childCount << " + " << newChildCount << " (new) = "
|
||||
<< (childCount + newChildCount) << ". Will not spawn yet." << Log::end;
|
||||
forkCounter = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (timeoutCounter++ == INTERVAL_PROBES)
|
||||
|
@ -809,14 +828,20 @@ int main(int argc, char** argv)
|
|||
}
|
||||
}
|
||||
|
||||
// Terminate child processes
|
||||
// Terminate child processes.
|
||||
for (auto& it : _childProcesses)
|
||||
{
|
||||
Log::info("Requesting child process " + std::to_string(it.first) + " to terminate.");
|
||||
Util::requestTermination(it.first);
|
||||
}
|
||||
|
||||
// Wait and kill child processes
|
||||
for (auto& it : _newChildProcesses)
|
||||
{
|
||||
Log::info("Requesting child process " + std::to_string(it->getPid()) + " to terminate.");
|
||||
Util::requestTermination(it->getPid());
|
||||
}
|
||||
|
||||
// Wait and kill child processes.
|
||||
for (auto& it : _childProcesses)
|
||||
{
|
||||
if (!waitForTerminationChild(it.first))
|
||||
|
@ -826,7 +851,17 @@ int main(int argc, char** argv)
|
|||
}
|
||||
}
|
||||
|
||||
for (auto& it : _newChildProcesses)
|
||||
{
|
||||
if (!waitForTerminationChild(it->getPid()))
|
||||
{
|
||||
Log::info("Forcing child process " + std::to_string(it->getPid()) + " to terminate.");
|
||||
Process::kill(it->getPid());
|
||||
}
|
||||
}
|
||||
|
||||
_childProcesses.clear();
|
||||
_newChildProcesses.clear();
|
||||
|
||||
pipeThread.join();
|
||||
close(writerNotify);
|
||||
|
|
Loading…
Reference in a new issue