wsd: handle async upload callback

Now we can handle the response of the storage
after the async upload is complete (or timed out).

Change-Id: I29d450646bddb07f02bb17d257e7e0fa372ce357
Signed-off-by: Ashod Nakashian <ashod.nakashian@collabora.co.uk>
This commit is contained in:
Ashod Nakashian 2020-12-28 14:58:07 -05:00 committed by Ashod Nakashian
parent 39bb098601
commit 1d0c717d0a
3 changed files with 147 additions and 54 deletions

View file

@ -1112,33 +1112,36 @@ void DocumentBroker::uploadToStorageInternal(const std::string& sessionId, bool
_uploadRequest = Util::make_unique<UploadRequest>(uriAnonym, newFileModifiedTime, it->second,
isSaveAs, isRename);
const StorageBase::AsyncUpload asyncUp = _storage->uploadLocalFileToStorageAsync(
auth, it->second->getCookies(), *_lockCtx, saveAsPath, saveAsFilename, isRename);
switch (asyncUp.state())
StorageBase::AsyncUploadCallback asyncUploadCallback =
[this](const StorageBase::AsyncUpload& asyncUp)
{
case StorageBase::AsyncUpload::State::Running:
LOG_DBG("Async upload of [" << _docKey << "] is in progress.");
return;
case StorageBase::AsyncUpload::State::Success:
LOG_TRC("onAsyncUploadCallback");
switch (asyncUp.state())
{
LOG_DBG("Successfully uploaded [" << _docKey << "], processing results.");
const StorageBase::UploadResult& uploadResult = asyncUp.result();
return handleUploadToStorageResponse(uploadResult);
case StorageBase::AsyncUpload::State::Running:
LOG_DBG("Async upload of [" << _docKey << "] is in progress.");
return;
case StorageBase::AsyncUpload::State::Complete:
{
LOG_DBG("Successfully uploaded [" << _docKey << "], processing results.");
const StorageBase::UploadResult& uploadResult = asyncUp.result();
return handleUploadToStorageResponse(uploadResult);
}
case StorageBase::AsyncUpload::State::None: // Unexpected: fallback.
case StorageBase::AsyncUpload::State::Error:
default:
break;
}
case StorageBase::AsyncUpload::State::None: // Unexpected: fallback.
case StorageBase::AsyncUpload::State::Error:
default:
break;
}
//FIXME: flag the failure so we retry.
LOG_ERR("Failed to upload [" << _docKey << "] asynchronously.");
};
LOG_ERR("Failed to upload [" << _docKey
<< "] asynchronously, will fallback to synchronous uploading.");
const StorageBase::UploadResult& uploadResult = _storage->uploadLocalFileToStorage(
auth, it->second->getCookies(), *_lockCtx, saveAsPath, saveAsFilename, isRename);
return handleUploadToStorageResponse(uploadResult);
_storage->uploadLocalFileToStorageAsync(auth, it->second->getCookies(), *_lockCtx, saveAsPath,
saveAsFilename, isRename, *_poll, asyncUploadCallback);
}
void DocumentBroker::handleUploadToStorageResponse(const StorageBase::UploadResult& uploadResult)

View file

@ -1104,15 +1104,55 @@ std::string WopiStorage::downloadDocument(const Poco::URI& uriObject, const std:
return Poco::Path(getJailPath(), getFileInfo().getFilename()).toString();
}
StorageBase::UploadResult
WopiStorage::uploadLocalFileToStorage(const Authorization& auth, const std::string& cookies,
LockContext& lockCtx, const std::string& saveAsPath,
const std::string& saveAsFilename, const bool isRename)
/// A helper class to invoke the AsyncUploadCallback
/// when it exits its scope.
/// By default it invokes the callback with a failure state.
class ScopedInvokeAsyncUploadCallback
{
public:
ScopedInvokeAsyncUploadCallback(StorageBase::AsyncUploadCallback asyncUploadCallback)
: _asyncUploadCallback(std::move(asyncUploadCallback))
, _arg(StorageBase::AsyncUpload(
StorageBase::AsyncUpload::State::Error,
StorageBase::UploadResult(StorageBase::UploadResult::Result::FAILED)))
{
}
~ScopedInvokeAsyncUploadCallback()
{
if (_asyncUploadCallback)
_asyncUploadCallback(_arg);
}
/// Set a new callback argument.
void setArg(StorageBase::AsyncUpload arg) { _arg = std::move(arg); }
private:
StorageBase::AsyncUploadCallback _asyncUploadCallback;
StorageBase::AsyncUpload _arg;
};
void WopiStorage::uploadLocalFileToStorageAsync(const Authorization& auth,
const std::string& cookies, LockContext& lockCtx,
const std::string& saveAsPath,
const std::string& saveAsFilename,
const bool isRename, SocketPoll& socketPoll,
const AsyncUploadCallback& asyncUploadCallback)
{
ProfileZone profileZone("WopiStorage::uploadLocalFileToStorage", { {"url", _fileUrl} });
// TODO: Check if this URI has write permission (canWrite = true)
// Always invoke the callback with the result of the async upload.
ScopedInvokeAsyncUploadCallback scopedInvokeCallback(asyncUploadCallback);
//TODO: replace with state machine.
if (_uploadHttpSession)
{
LOG_ERR("Upload is already in progress.");
return;
}
const bool isSaveAs = !saveAsPath.empty() && !saveAsFilename.empty();
const std::string filePath(isSaveAs ? saveAsPath : getRootFilePath());
const std::string filePathAnonym = LOOLWSD::anonymizeUrl(filePath);
@ -1121,7 +1161,10 @@ WopiStorage::uploadLocalFileToStorage(const Authorization& auth, const std::stri
if (!fileStat.good())
{
LOG_ERR("Cannot access file [" << filePathAnonym << "] to upload to wopi storage.");
return UploadResult(UploadResult::Result::FAILED, "File not found.");
scopedInvokeCallback.setArg(
AsyncUpload(AsyncUpload::State::Error,
UploadResult(UploadResult::Result::FAILED, "File not found.")));
return;
}
const std::size_t size = (fileStat.good() ? fileStat.size() : 0);
@ -1139,7 +1182,7 @@ WopiStorage::uploadLocalFileToStorage(const Authorization& auth, const std::stri
const auto startTime = std::chrono::steady_clock::now();
try
{
std::shared_ptr<http::Session> httpSession = getHttpSession(uriObject);
_uploadHttpSession = getHttpSession(uriObject);
http::Request httpRequest = initHttpRequest(uriObject, auth, cookies);
httpRequest.setVerb(http::Request::VERB_POST);
@ -1217,22 +1260,43 @@ WopiStorage::uploadLocalFileToStorage(const Authorization& auth, const std::stri
httpRequest.setBodyFile(filePath);
http::Session::FinishedCallback finishedCallback =
[=](const std::shared_ptr<http::Session>& httpSession)
{
const std::shared_ptr<const http::Response> httpResponse = httpSession->response();
_wopiSaveDuration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - startTime);
WopiUploadDetails details = { filePathAnonym,
uriAnonym,
httpResponse->statusLine().reasonPhrase(),
httpResponse->statusLine().statusCode(),
size,
isSaveAs,
isRename };
// Handle the response.
const StorageBase::UploadResult res =
handleUploadToStorageResponse(details, httpResponse->getBody());
// Fire the callback to our client (DocBroker, typically).
asyncUploadCallback(AsyncUpload(AsyncUpload::State::Complete, res));
// Retire.
_uploadHttpSession.reset(); //FIXME: use a state machine.
};
_uploadHttpSession->setFinishedHandler(finishedCallback);
LOG_DBG("Async upload request: " << httpRequest.header().toString());
// Make the request.
const std::shared_ptr<const http::Response> httpResponse
= httpSession->syncRequest(httpRequest);
_uploadHttpSession->asyncRequest(httpRequest, socketPoll);
_wopiSaveDuration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - startTime);
WopiUploadDetails details = { filePathAnonym,
uriAnonym,
httpResponse->statusLine().reasonPhrase(),
httpResponse->statusLine().statusCode(),
size,
isSaveAs,
isRename };
return handleUploadToStorageResponse(details, httpResponse->getBody());
scopedInvokeCallback.setArg(
AsyncUpload(AsyncUpload::State::Running, UploadResult(UploadResult::Result::OK)));
return;
}
catch (const Poco::Exception& ex)
{
@ -1245,7 +1309,16 @@ WopiStorage::uploadLocalFileToStorage(const Authorization& auth, const std::stri
LOG_ERR("Cannot upload file to WOPI storage uri [" + uriAnonym + "]. Error: " << ex.what());
}
return UploadResult(UploadResult::Result::FAILED, "Internal error.");
scopedInvokeCallback.setArg(AsyncUpload(
AsyncUpload::State::Error, UploadResult(UploadResult::Result::FAILED, "Internal error.")));
}
StorageBase::UploadResult WopiStorage::uploadLocalFileToStorage(const Authorization&,
const std::string&, LockContext&,
const std::string&,
const std::string&, const bool)
{
return UploadResult(UploadResult::Result::FAILED);
}
StorageBase::UploadResult
@ -1295,8 +1368,8 @@ WopiStorage::handleUploadToStorageResponse(const WopiUploadDetails& details,
}
}
LOG_INF(wopiLog << " uploaded " << details.size << " bytes from ["
<< details.filePathAnonym << "] -> [" << details.uriAnonym
LOG_INF(wopiLog << " uploaded " << details.size << " bytes in " << _wopiSaveDuration
<< " from [" << details.filePathAnonym << "] -> [" << details.uriAnonym
<< "]: " << details.httpResponseCode << ' '
<< details.httpResponseReason << ": " << responseString);
}

View file

@ -165,7 +165,7 @@ public:
Complete //< The last async upload request completed (regardless of the server's response).
};
AsyncUpload(State state, UploadResult result = UploadResult(UploadResult::Result::FAILED))
AsyncUpload(State state, UploadResult result)
: _state(state)
, _result(std::move(result))
{
@ -178,7 +178,7 @@ public:
const UploadResult& result() const { return _result; }
private:
const State _state;
State _state;
UploadResult _result;
};
@ -280,25 +280,32 @@ public:
const std::string& saveAsFilename, const bool isRename)
= 0;
/// The asynchronous upload completion callback function.
using AsyncUploadCallback = std::function<void(const AsyncUpload&)>;
/// Writes the contents of the file back to the source asynchronously, if possible.
/// @param cookies A string representing key=value pairs that are set as cookies.
/// @param savedFile When the operation was saveAs, this is the path to the file that was saved.
virtual AsyncUpload
uploadLocalFileToStorageAsync(const Authorization& auth, const std::string& cookies,
LockContext& lockCtx, const std::string& saveAsPath,
const std::string& saveAsFilename, const bool isRename)
/// @param asyncUploadCallback Used to communicate the result back to the caller.
virtual void uploadLocalFileToStorageAsync(const Authorization& auth,
const std::string& cookies, LockContext& lockCtx,
const std::string& saveAsPath,
const std::string& saveAsFilename,
const bool isRename, SocketPoll&,
const AsyncUploadCallback& asyncUploadCallback)
{
// By default do a synchronous save.
const UploadResult res = uploadLocalFileToStorage(auth, cookies, lockCtx, saveAsPath,
saveAsFilename, isRename);
return AsyncUpload(AsyncUpload::State::Complete, res);
const UploadResult res =
uploadLocalFileToStorage(auth, cookies, lockCtx, saveAsPath, saveAsFilename, isRename);
if (asyncUploadCallback)
asyncUploadCallback(AsyncUpload(AsyncUpload::State::Complete, res));
}
/// Get the progress state of an asynchronous LocalFileToStorage upload.
virtual AsyncUpload queryLocalFileToStorageAsyncUploadState()
{
// Unsupported.
return AsyncUpload(AsyncUpload::State::None);
return AsyncUpload(AsyncUpload::State::None, UploadResult(UploadResult::Result::OK));
}
/// Cancels an active asynchronous LocalFileToStorage upload.
@ -570,6 +577,12 @@ public:
const std::string& saveAsFilename,
const bool isRename) override;
void uploadLocalFileToStorageAsync(const Authorization& auth, const std::string& cookies,
LockContext& lockCtx, const std::string& saveAsPath,
const std::string& saveAsFilename, const bool isRename,
SocketPoll& socketPoll,
const AsyncUploadCallback& asyncUploadCallback) override;
/// Total time taken for making WOPI calls during saving.
std::chrono::milliseconds getWopiSaveDuration() const { return _wopiSaveDuration; }
@ -610,6 +623,10 @@ private:
// Time spend in saving the file from storage
std::chrono::milliseconds _wopiSaveDuration;
/// The http::Session used for uploading asynchronously.
std::shared_ptr<http::Session> _uploadHttpSession;
/// Whether or not to re-use cookies from the browser for the WOPI requests.
bool _reuseCookies;
};