Post the message to the poll thread.

Change-Id: Ibd28090a420b5396b64fdfe676bef8cf06991116
This commit is contained in:
Jan Holesovsky 2018-05-11 19:15:16 +02:00
parent 93cc4b4548
commit 6c31b7793c

View file

@ -753,6 +753,7 @@ public:
const std::string& docId, const std::string& docId,
const std::string& url, const std::string& url,
std::shared_ptr<TileQueue> tileQueue, std::shared_ptr<TileQueue> tileQueue,
SocketPoll& socketPoll,
const std::shared_ptr<WebSocketHandler>& websocketHandler) const std::shared_ptr<WebSocketHandler>& websocketHandler)
: _loKit(loKit), : _loKit(loKit),
_jailId(jailId), _jailId(jailId),
@ -760,6 +761,7 @@ public:
_docId(docId), _docId(docId),
_url(url), _url(url),
_tileQueue(std::move(tileQueue)), _tileQueue(std::move(tileQueue)),
_socketPoll(socketPoll),
_websocketHandler(websocketHandler), _websocketHandler(websocketHandler),
_docPassword(""), _docPassword(""),
_haveDocPassword(false), _haveDocPassword(false),
@ -794,6 +796,20 @@ public:
const std::string& getUrl() const { return _url; } const std::string& getUrl() const { return _url; }
/// Post the message in the correct thread.
bool postMessage(const std::shared_ptr<std::vector<char>>& message, const WSOpCode code) const
{
LOG_TRC("postMessage called with: " << getAbbreviatedMessage(message->data(), message->size()));
if (!_websocketHandler)
{
LOG_ERR("Child Doc: Bad socket while sending [" << getAbbreviatedMessage(message->data(), message->size()) << "].");
return false;
}
_socketPoll.addCallback([=] { _websocketHandler->sendMessage(message->data(), message->size(), code); });
return true;
}
bool createSession(const std::string& sessionId) bool createSession(const std::string& sessionId)
{ {
std::unique_lock<std::mutex> lock(_mutex); std::unique_lock<std::mutex> lock(_mutex);
@ -908,9 +924,8 @@ public:
LOG_INF("setDocumentPassword returned"); LOG_INF("setDocumentPassword returned");
} }
void renderTile(const std::vector<std::string>& tokens, const std::shared_ptr<WebSocketHandler>& websocketHandler) void renderTile(const std::vector<std::string>& tokens)
{ {
assert(websocketHandler && "Expected a non-null websocket.");
TileDesc tile = TileDesc::parse(tokens); TileDesc tile = TileDesc::parse(tokens);
size_t pixmapDataSize = 4 * tile.getWidth() * tile.getHeight(); size_t pixmapDataSize = 4 * tile.getWidth() * tile.getHeight();
@ -964,12 +979,12 @@ public:
if (_docWatermark) if (_docWatermark)
_docWatermark->blending(pixmap.data(), 0, 0, pixelWidth, pixelHeight, pixelWidth, pixelHeight, mode); _docWatermark->blending(pixmap.data(), 0, 0, pixelWidth, pixelHeight, pixelWidth, pixelHeight, mode);
std::vector<char> output; std::shared_ptr<std::vector<char>> output = std::make_shared<std::vector<char>>();
output.reserve(response.size() + pixmapDataSize); output->reserve(response.size() + pixmapDataSize);
output.resize(response.size()); output->resize(response.size());
std::memcpy(output.data(), response.data(), response.size()); std::memcpy(output->data(), response.data(), response.size());
if (!_pngCache.encodeBufferToPNG(pixmap.data(), tile.getWidth(), tile.getHeight(), output, mode, hash, wid, oldWireId)) if (!_pngCache.encodeBufferToPNG(pixmap.data(), tile.getWidth(), tile.getHeight(), *output, mode, hash, wid, oldWireId))
{ {
//FIXME: Return error. //FIXME: Return error.
//sendTextFrame("error: cmd=tile kind=failure"); //sendTextFrame("error: cmd=tile kind=failure");
@ -978,13 +993,12 @@ public:
return; return;
} }
LOG_TRC("Sending render-tile response (" << output.size() << " bytes) for: " << response); LOG_TRC("Sending render-tile response (" << output->size() << " bytes) for: " << response);
websocketHandler->sendMessage(output.data(), output.size(), WSOpCode::Binary); postMessage(output, WSOpCode::Binary);
} }
void renderCombinedTiles(const std::vector<std::string>& tokens, const std::shared_ptr<WebSocketHandler>& websocketHandler) void renderCombinedTiles(const std::vector<std::string>& tokens)
{ {
assert(websocketHandler && "Expected a non-null websocket.");
TileCombined tileCombined = TileCombined::parse(tokens); TileCombined tileCombined = TileCombined::parse(tokens);
auto& tiles = tileCombined.getTiles(); auto& tiles = tileCombined.getTiles();
@ -1103,12 +1117,12 @@ public:
const auto tileMsg = ADD_DEBUG_RENDERID(tileCombined.serialize("tilecombine:")) + "\n"; const auto tileMsg = ADD_DEBUG_RENDERID(tileCombined.serialize("tilecombine:")) + "\n";
LOG_TRC("Sending back painted tiles for " << tileMsg); LOG_TRC("Sending back painted tiles for " << tileMsg);
std::vector<char> response; std::shared_ptr<std::vector<char>> response = std::make_shared<std::vector<char>>();
response.resize(tileMsg.size() + output.size()); response->resize(tileMsg.size() + output.size());
std::copy(tileMsg.begin(), tileMsg.end(), response.begin()); std::copy(tileMsg.begin(), tileMsg.end(), response->begin());
std::copy(output.begin(), output.end(), response.begin() + tileMsg.size()); std::copy(output.begin(), output.end(), response->begin() + tileMsg.size());
websocketHandler->sendMessage(response.data(), response.size(), WSOpCode::Binary); postMessage(response, WSOpCode::Binary);
} }
bool sendTextFrame(const std::string& message) bool sendTextFrame(const std::string& message)
@ -1120,14 +1134,11 @@ public:
{ {
try try
{ {
if (!_websocketHandler) std::shared_ptr<std::vector<char>> message = std::make_shared<std::vector<char>>();
{ message->resize(length);
LOG_ERR("Child Doc: Bad socket while sending [" << getAbbreviatedMessage(buffer, length) << "]."); std::memcpy(message->data(), buffer, length);
return false;
}
_websocketHandler->sendMessage(buffer, length, opCode); return postMessage(message, opCode);
return true;
} }
catch (const Exception& exc) catch (const Exception& exc)
{ {
@ -1826,11 +1837,11 @@ private:
if (tokens[0] == "tile") if (tokens[0] == "tile")
{ {
renderTile(tokens, _websocketHandler); renderTile(tokens);
} }
else if (tokens[0] == "tilecombine") else if (tokens[0] == "tilecombine")
{ {
renderCombinedTiles(tokens, _websocketHandler); renderCombinedTiles(tokens);
} }
else if (LOOLProtocol::getFirstToken(tokens[0], '-') == "child") else if (LOOLProtocol::getFirstToken(tokens[0], '-') == "child")
{ {
@ -1953,6 +1964,7 @@ private:
std::shared_ptr<lok::Document> _loKitDocument; std::shared_ptr<lok::Document> _loKitDocument;
std::shared_ptr<TileQueue> _tileQueue; std::shared_ptr<TileQueue> _tileQueue;
SocketPoll& _socketPoll;
std::shared_ptr<WebSocketHandler> _websocketHandler; std::shared_ptr<WebSocketHandler> _websocketHandler;
PngCache _pngCache; PngCache _pngCache;
@ -1995,14 +2007,16 @@ class KitWebSocketHandler final : public WebSocketHandler, public std::enable_sh
std::string _socketName; std::string _socketName;
std::shared_ptr<lok::Office> _loKit; std::shared_ptr<lok::Office> _loKit;
std::string _jailId; std::string _jailId;
SocketPoll& _socketPoll;
public: public:
KitWebSocketHandler(const std::string& socketName, const std::shared_ptr<lok::Office>& loKit, const std::string& jailId) : KitWebSocketHandler(const std::string& socketName, const std::shared_ptr<lok::Office>& loKit, const std::string& jailId, SocketPoll& socketPoll) :
WebSocketHandler(/* isClient = */ true), WebSocketHandler(/* isClient = */ true),
_queue(std::make_shared<TileQueue>()), _queue(std::make_shared<TileQueue>()),
_socketName(socketName), _socketName(socketName),
_loKit(loKit), _loKit(loKit),
_jailId(jailId) _jailId(jailId),
_socketPoll(socketPoll)
{ {
} }
@ -2038,7 +2052,7 @@ protected:
if (!document) if (!document)
{ {
document = std::make_shared<Document>(_loKit, _jailId, docKey, docId, url, _queue, shared_from_this()); document = std::make_shared<Document>(_loKit, _jailId, docKey, docId, url, _queue, _socketPoll, shared_from_this());
} }
// Validate and create session. // Validate and create session.
@ -2336,7 +2350,7 @@ void lokit_main(const std::string& childRoot,
SocketPoll mainKit("kit"); SocketPoll mainKit("kit");
mainKit.insertNewWebSocketSync(uri, std::make_shared<KitWebSocketHandler>("child_ws_" + pid, loKit, jailId)); mainKit.insertNewWebSocketSync(uri, std::make_shared<KitWebSocketHandler>("child_ws_" + pid, loKit, jailId, mainKit));
LOG_INF("New kit client websocket inserted."); LOG_INF("New kit client websocket inserted.");
while (!TerminationFlag) while (!TerminationFlag)