b45f4b8f93
Change-Id: I71f1421debf31b8c6cd0fd75c0cb45e2563b8a5a Signed-off-by: Michael Meeks <michael.meeks@collabora.com>
432 lines
14 KiB
C++
432 lines
14 KiB
C++
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4; fill-column: 100 -*- */
|
|
/*
|
|
* Copyright the Collabora Online contributors.
|
|
*
|
|
* SPDX-License-Identifier: MPL-2.0
|
|
*
|
|
* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <math.h>
|
|
#include <chrono>
|
|
#include <cstring>
|
|
#include <unordered_map>
|
|
|
|
#include "Socket.hpp"
|
|
#include "WebSocketHandler.hpp"
|
|
#include <net/Ssl.hpp>
|
|
#if ENABLE_SSL
|
|
# include <SslSocket.hpp>
|
|
#endif
|
|
|
|
#include <TraceFile.hpp>
|
|
#include <wsd/TileDesc.hpp>
|
|
|
|
// store buckets of latency
|
|
struct Histogram {
|
|
const size_t incLowMs = 10;
|
|
const size_t maxLowMs = incLowMs * 10;
|
|
const size_t incHighMs = 100;
|
|
const size_t maxHighMs = incHighMs * 10;
|
|
size_t _items;
|
|
size_t _tooLong;
|
|
std::vector<size_t> _buckets;
|
|
|
|
Histogram() : _items(0), _tooLong(0), _buckets(20)
|
|
{
|
|
}
|
|
|
|
void addTime(size_t ms)
|
|
{
|
|
if (ms < maxLowMs)
|
|
_buckets[ms/incLowMs]++;
|
|
else if (ms < maxHighMs)
|
|
_buckets[(ms - maxLowMs) / maxHighMs]++;
|
|
else
|
|
_tooLong++;
|
|
_items++;
|
|
}
|
|
|
|
void dump(const char *legend)
|
|
{
|
|
size_t max = 0;
|
|
ssize_t firstBucket = -1;
|
|
for (size_t i = 0; i < _buckets.size(); ++i)
|
|
{
|
|
size_t n = _buckets[i];
|
|
if (n > 0 && firstBucket < 0)
|
|
firstBucket = i;
|
|
max = std::max(max, n);
|
|
}
|
|
if (firstBucket < 0 || max == 0)
|
|
return;
|
|
|
|
size_t last; // ignore
|
|
for (last = _buckets.size()-1; last > 0; --last)
|
|
if (_buckets[last] > 0)
|
|
break;
|
|
|
|
std::cout << legend << " " << _items << " items, max #: " << max << " too long: " << _tooLong << "\n";
|
|
|
|
const double chrsPerFreq = 60.0 / max;
|
|
for (size_t i = firstBucket; i <= last; ++i)
|
|
{
|
|
int chrs = ::ceil(chrsPerFreq * _buckets[i]);
|
|
int ms = i < 10 ? (incLowMs * (i+1)) : (maxLowMs + (i+1-10) * incHighMs);
|
|
std::cout << "< " << std::setw(4) << ms << " ms |" << std::string(chrs, '-') << "| " << _buckets[i] << "\n";
|
|
}
|
|
}
|
|
};
|
|
|
|
struct Stats {
|
|
Stats() :
|
|
_start(std::chrono::steady_clock::now()),
|
|
_bytesSent(0),
|
|
_bytesRecvd(0),
|
|
_tileCount(0),
|
|
_connections(0)
|
|
{
|
|
}
|
|
std::chrono::steady_clock::time_point _start;
|
|
size_t _bytesSent;
|
|
size_t _bytesRecvd;
|
|
size_t _tileCount;
|
|
size_t _connections;
|
|
Histogram _pingLatency;
|
|
Histogram _tileLatency;
|
|
|
|
// message size breakdown
|
|
struct MessageStat {
|
|
size_t size;
|
|
size_t count;
|
|
};
|
|
std::unordered_map<std::string, MessageStat> _recvd;
|
|
std::unordered_map<std::string, MessageStat> _sent;
|
|
|
|
void accumulate(std::unordered_map<std::string, MessageStat> &map,
|
|
const std::string token, size_t size)
|
|
{
|
|
auto it = map.find(token);
|
|
MessageStat st = { 0, 0 };
|
|
if (it != map.end())
|
|
st = it->second;
|
|
st.size += size;
|
|
st.count++;
|
|
map[token] = st;
|
|
}
|
|
|
|
void accumulateRecv(const std::string &token, size_t size)
|
|
{
|
|
_bytesRecvd += size;
|
|
accumulate(_recvd, token, size);
|
|
}
|
|
|
|
void accumulateSend(const char* msg, const size_t len, bool /* flush */)
|
|
{
|
|
_bytesSent += len;
|
|
size_t i;
|
|
for (i = 0; i < len && msg[i] != ' '; ++i);
|
|
accumulate(_sent, std::string(msg, std::min(i, size_t(len))), len);
|
|
}
|
|
|
|
void addConnection() { _connections++; }
|
|
|
|
void dumpMap(std::unordered_map<std::string, MessageStat> &map)
|
|
{
|
|
// how much from each command ?
|
|
std::vector<std::string> sortKeys;
|
|
size_t total = 0;
|
|
for(const auto& it : map)
|
|
{
|
|
sortKeys.push_back(it.first);
|
|
total += it.second.size;
|
|
}
|
|
std::sort(sortKeys.begin(), sortKeys.end(),
|
|
[&](const std::string &a, const std::string &b)
|
|
{ return map[a].size > map[b].size; } );
|
|
std::cout << "size\tcount\tcommand\n";
|
|
for (const auto& it : sortKeys)
|
|
{
|
|
std::cout << map[it].size << "\t"
|
|
<< map[it].count << "\t" << it << "\n";
|
|
if (map[it].size < (total / 100))
|
|
break;
|
|
}
|
|
}
|
|
|
|
void dump()
|
|
{
|
|
const auto now = std::chrono::steady_clock::now();
|
|
const size_t runMs = std::chrono::duration_cast<std::chrono::milliseconds>(now - _start).count();
|
|
std::cout << "Stress run took " << runMs << " ms\n";
|
|
std::cout << " tiles: " << _tileCount << " => TPS: " << ((_tileCount * 1000.0)/runMs) << "\n";
|
|
_pingLatency.dump("ping latency:");
|
|
_tileLatency.dump("tile latency:");
|
|
size_t recvKbps = (_bytesRecvd * 1000) / (_connections * runMs * 1024);
|
|
size_t sentKbps = (_bytesSent * 1000) / (_connections * runMs * 1024);
|
|
std::cout << " we sent " << Util::getHumanizedBytes(_bytesSent) <<
|
|
" (" << sentKbps << " kB/s) " <<
|
|
" server sent " << Util::getHumanizedBytes(_bytesRecvd) <<
|
|
" (" << recvKbps << " kB/s) to " << _connections << " connections.\n";
|
|
|
|
std::cout << "we sent:\n";
|
|
dumpMap(_sent);
|
|
|
|
std::cout << "server sent us:\n";
|
|
dumpMap(_recvd);
|
|
}
|
|
};
|
|
|
|
// Avoid a MessageHandler for now.
|
|
class StressSocketHandler : public WebSocketHandler
|
|
{
|
|
SocketPoll &_poll;
|
|
TraceFileReader _reader;
|
|
TraceFileRecord _next;
|
|
std::chrono::steady_clock::time_point _start;
|
|
std::chrono::steady_clock::time_point _nextPing;
|
|
bool _connecting;
|
|
std::string _logPre;
|
|
std::string _uri;
|
|
std::string _trace;
|
|
|
|
std::shared_ptr<Stats> _stats;
|
|
std::chrono::steady_clock::time_point _lastTile;
|
|
|
|
public:
|
|
StressSocketHandler(SocketPoll &poll, /* bad style */
|
|
const std::shared_ptr<Stats> stats,
|
|
const std::string &uri, const std::string &trace,
|
|
const int delayMs = 0) :
|
|
WebSocketHandler(true, true),
|
|
_poll(poll),
|
|
_reader(trace),
|
|
_connecting(true),
|
|
_uri(uri),
|
|
_trace(trace),
|
|
_stats(stats)
|
|
{
|
|
assert(_stats && "stats must be provided");
|
|
|
|
static std::atomic<int> number;
|
|
_logPre = "[" + std::to_string(++number) + "] ";
|
|
std::cerr << "Attempt connect to " << uri << " for trace " << _trace << "\n";
|
|
getNextRecord();
|
|
_start = std::chrono::steady_clock::now() + std::chrono::milliseconds(delayMs);
|
|
_nextPing = _start + std::chrono::milliseconds(Util::rng::getNext() % 1000);
|
|
_lastTile = _start;
|
|
}
|
|
|
|
void gotPing(WSOpCode /* code */, int pingTimeUs) override
|
|
{
|
|
_stats->_pingLatency.addTime(pingTimeUs/1000);
|
|
}
|
|
|
|
int getPollEvents(std::chrono::steady_clock::time_point now,
|
|
int64_t &timeoutMaxMicroS) override
|
|
{
|
|
if (_connecting)
|
|
{
|
|
std::cerr << _logPre << "Waiting for outbound connection to " << _uri <<
|
|
" to complete for trace " << _trace << "\n";
|
|
return POLLOUT;
|
|
}
|
|
|
|
int events = WebSocketHandler::getPollEvents(now, timeoutMaxMicroS);
|
|
|
|
if (now >= _nextPing)
|
|
{
|
|
// ping more frequently
|
|
sendPing(now, getSocket().lock());
|
|
_nextPing += std::chrono::seconds(1);
|
|
}
|
|
|
|
int64_t nextTime = -1;
|
|
while (nextTime <= 0) {
|
|
nextTime = std::chrono::duration_cast<std::chrono::microseconds>(
|
|
std::chrono::microseconds((_next.getTimestampUs() - _reader.getEpochStart()) * TRACE_MULTIPLIER)
|
|
+ _start - now).count();
|
|
if (nextTime <= 0)
|
|
{
|
|
sendTraceMessage();
|
|
events = WebSocketHandler::getPollEvents(now, timeoutMaxMicroS);
|
|
break;
|
|
}
|
|
}
|
|
|
|
// std::cerr << "next event in " << nextTime << " us\n";
|
|
if (nextTime < timeoutMaxMicroS)
|
|
timeoutMaxMicroS = nextTime;
|
|
|
|
return events;
|
|
}
|
|
|
|
bool getNextRecord()
|
|
{
|
|
bool found = false;
|
|
while (!found) {
|
|
_next = _reader.getNextRecord();
|
|
switch (_next.getDir()) {
|
|
case TraceFileRecord::Direction::Invalid:
|
|
case TraceFileRecord::Direction::Incoming:
|
|
// FIXME: need to subset output quite a bit.
|
|
found = true;
|
|
break;
|
|
default:
|
|
found = false;
|
|
break;
|
|
}
|
|
}
|
|
return _next.getDir () != TraceFileRecord::Direction::Invalid;
|
|
}
|
|
|
|
void performWrites(std::size_t capacity) override
|
|
{
|
|
if (_connecting)
|
|
std::cerr << _logPre << "Outbound websocket - connected\n";
|
|
_connecting = false;
|
|
return WebSocketHandler::performWrites(capacity);
|
|
}
|
|
|
|
void onDisconnect() override
|
|
{
|
|
std::cerr << _logPre << "Websocket " << _uri <<
|
|
" dis-connected, re-trying in 20 seconds\n";
|
|
WebSocketHandler::onDisconnect();
|
|
}
|
|
|
|
// send outgoing messages
|
|
void sendTraceMessage()
|
|
{
|
|
if (_next.getDir() == TraceFileRecord::Direction::Invalid)
|
|
return; // shutting down
|
|
|
|
std::string msg = rewriteMessage(_next.getPayload());
|
|
if (!msg.empty())
|
|
{
|
|
std::cerr << _logPre << "Send: '" << msg << "'\n";
|
|
sendMessage(msg);
|
|
}
|
|
|
|
if (!getNextRecord())
|
|
{
|
|
std::cerr << _logPre << "Shutdown\n";
|
|
shutdown();
|
|
}
|
|
}
|
|
|
|
std::string rewriteMessage(const std::string &msg)
|
|
{
|
|
const std::string firstLine = COOLProtocol::getFirstLine(msg);
|
|
StringVector tokens = StringVector::tokenize(firstLine);
|
|
|
|
std::string out = msg;
|
|
|
|
if (tokens.equals(0, "tileprocessed"))
|
|
out = ""; // we do this accurately below
|
|
|
|
else if (tokens.equals(0, "load")) {
|
|
std::string url = tokens[1];
|
|
assert(!strncmp(url.c_str(), "url=", 4));
|
|
|
|
// load url=file%3A%2F%2F%2Ftmp%2Fhello-world.odt deviceFormFactor=desktop
|
|
out = "load url=" + _uri; // already encoded
|
|
for (size_t i = 2; i < tokens.size(); ++i)
|
|
out += " " + tokens[i];
|
|
std::cerr << _logPre << "msg " << out << "\n";
|
|
}
|
|
|
|
// FIXME: translate mouse events relative to view-port etc.
|
|
return out;
|
|
}
|
|
|
|
// handle incoming messages
|
|
void handleMessage(const std::vector<char> &data) override
|
|
{
|
|
const auto now = std::chrono::steady_clock::now();
|
|
|
|
const std::string firstLine = COOLProtocol::getFirstLine(data.data(), data.size());
|
|
StringVector tokens = StringVector::tokenize(firstLine);
|
|
std::cerr << _logPre << "Got msg: " << firstLine << "\n";
|
|
|
|
_stats->accumulateRecv(tokens[0], data.size());
|
|
|
|
if (tokens.equals(0, "tile:")) {
|
|
// accumulate latencies
|
|
_stats->_tileLatency.addTime(std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastTile).count());
|
|
_stats->_tileCount++;
|
|
_lastTile = now;
|
|
|
|
// eg. tileprocessed tile=0:9216:0:3072:3072:0
|
|
TileDesc desc = TileDesc::parse(tokens);
|
|
|
|
sendMessage("tileprocessed tile=" + desc.generateID());
|
|
std::cerr << _logPre << "Sent tileprocessed tile= " + desc.generateID() << "\n";
|
|
} if (tokens.equals(0, "error:")) {
|
|
|
|
bool reconnect = false;
|
|
if (firstLine == "error: cmd=load kind=docunloading")
|
|
{
|
|
std::cerr << ": wait and try again later ...!\n";
|
|
reconnect = true;
|
|
}
|
|
else if (firstLine == "error: cmd=storage kind=documentconflict")
|
|
{
|
|
std::cerr << "Document conflict - need to resolve it first ...\n";
|
|
sendMessage("closedocument");
|
|
reconnect = true;
|
|
}
|
|
else
|
|
{
|
|
std::cerr << _logPre << "Error while processing " << _uri
|
|
<< " and trace " << _trace << ":\n"
|
|
<< "'" << firstLine << "'\n";
|
|
}
|
|
|
|
if (reconnect)
|
|
{
|
|
shutdown(true, "bye");
|
|
auto handler = std::make_shared<StressSocketHandler>(
|
|
_poll, _stats, _uri, _trace, 1000 /* delay 1 second */);
|
|
_poll.insertNewWebSocketSync(Poco::URI(_uri), handler);
|
|
return;
|
|
}
|
|
else
|
|
Util::forcedExit(EX_SOFTWARE);
|
|
}
|
|
|
|
// FIXME: implement code to send new view-ports based
|
|
// on cursor position etc.
|
|
}
|
|
|
|
/// override ProtocolHandlerInterface piece
|
|
int sendTextMessage(const char* msg, const size_t len, bool flush = false) const override
|
|
{
|
|
_stats->accumulateSend(msg, len, flush);
|
|
return WebSocketHandler::sendTextMessage(msg, len, flush);
|
|
}
|
|
|
|
static void addPollFor(SocketPoll &poll, const std::string &server,
|
|
const std::string &filePath, const std::string &tracePath,
|
|
const std::shared_ptr<Stats> &optStats)
|
|
{
|
|
assert(optStats && "optStats must be provided");
|
|
|
|
std::string file, wrap;
|
|
std::string fileabs = Poco::Path(filePath).makeAbsolute().toString();
|
|
Poco::URI::encode("file://" + fileabs, ":/?", file);
|
|
Poco::URI::encode(file, ":/?", wrap); // double encode.
|
|
std::string uri = server + "/cool/" + wrap + "/ws";
|
|
|
|
auto handler = std::make_shared<StressSocketHandler>(poll, optStats, file, tracePath);
|
|
poll.insertNewWebSocketSync(Poco::URI(uri), handler);
|
|
|
|
optStats->addConnection();
|
|
}
|
|
};
|
|
|
|
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
|