First cut at moving unzipping into new thread:

XBufferedThreadedStream class buffers data in a new thread,
which will be available to be read from parent thread.

Change-Id: I62d367fa1dec23da39aba24b5c765b57707956bb
Reviewed-on: https://gerrit.libreoffice.org/38135
Tested-by: Jenkins <ci@libreoffice.org>
Reviewed-by: Michael Meeks <michael.meeks@collabora.com>
This commit is contained in:
Mohammed Abdul Azeem 2017-05-27 13:17:04 +05:30 committed by Michael Meeks
parent d21a675d3a
commit 0632208977
5 changed files with 373 additions and 34 deletions

View file

@ -31,6 +31,7 @@ $(eval $(call gb_Library_use_libraries,package2,\
sal \
sax \
ucbhelper \
salhelper \
))
$(eval $(call gb_Library_use_externals,package2,\
@ -51,6 +52,7 @@ $(eval $(call gb_Library_add_exception_objects,package2,\
package/source/zipapi/Deflater \
package/source/zipapi/Inflater \
package/source/zipapi/sha1context \
package/source/zipapi/XBufferedThreadedStream \
package/source/zipapi/XUnbufferedStream \
package/source/zipapi/ZipEnumeration \
package/source/zipapi/ZipFile \

View file

@ -27,19 +27,55 @@ namespace
public:
PackageTest() {}
virtual void setUp() override;
virtual bool load(const OUString &,
const OUString &rURL, const OUString &,
SfxFilterFlags, SotClipboardFormatId, unsigned int) override;
void test();
void testThreadedStreams();
void testBufferedThreadedStreams();
CPPUNIT_TEST_SUITE(PackageTest);
CPPUNIT_TEST(test);
CPPUNIT_TEST(testThreadedStreams);
CPPUNIT_TEST(testBufferedThreadedStreams);
CPPUNIT_TEST_SUITE_END();
private:
uno::Reference<container::XNameAccess> mxNA;
void verifyStreams( std::vector<std::vector<char>> &aBuffers );
};
void PackageTest::setUp()
{
BootstrapFixtureBase::setUp();
OUString aURL = m_directories.getURLFromSrc("/package/qa/cppunit/data/a2z.zip");
uno::Sequence<beans::NamedValue> aNVs(2);
aNVs[0].Name = "URL";
aNVs[0].Value <<= aURL;
aNVs[1].Name = "UseBufferedStream";
aNVs[1].Value <<= true;
uno::Sequence<uno::Any> aArgs(1);
aArgs[0] <<= aNVs;
uno::Reference<uno::XComponentContext> xCxt = comphelper::getProcessComponentContext();
uno::Reference<lang::XMultiComponentFactory> xSvcMgr = xCxt->getServiceManager();
uno::Reference<packages::zip::XZipFileAccess2> xZip(
xSvcMgr->createInstanceWithArgumentsAndContext(
"com.sun.star.packages.zip.ZipFileAccess", aArgs, xCxt),
uno::UNO_QUERY);
CPPUNIT_ASSERT(xZip.is());
mxNA = uno::Reference<container::XNameAccess>(xZip, uno::UNO_QUERY);
CPPUNIT_ASSERT(mxNA.is());
}
bool PackageTest::load(const OUString &,
const OUString &rURL, const OUString &,
SfxFilterFlags, SotClipboardFormatId, unsigned int)
@ -62,6 +98,20 @@ namespace
m_directories.getURLFromSrc("/package/qa/cppunit/data/"));
}
void PackageTest::verifyStreams( std::vector<std::vector<char>> &aBuffers )
{
CPPUNIT_ASSERT_EQUAL(size_t(26), aBuffers.size());
auto itBuf = aBuffers.begin();
for (char c = 'a'; c <= 'z'; ++c, ++itBuf)
{
const std::vector<char>& rBuf = *itBuf;
CPPUNIT_ASSERT_EQUAL(size_t(1048576), rBuf.size()); // 1 MB each.
for (char check : rBuf)
CPPUNIT_ASSERT_EQUAL(c, check);
}
}
// TODO : This test currently doesn't fail even when you set
// UseBufferedStream to false. Look into this and replace it with a better
// test that actually fails when the aforementioned flag is set to false.
@ -95,30 +145,6 @@ namespace
}
};
OUString aURL = m_directories.getURLFromSrc("/package/qa/cppunit/data/a2z.zip");
uno::Sequence<beans::NamedValue> aNVs(2);
aNVs[0].Name = "URL";
aNVs[0].Value <<= aURL;
aNVs[1].Name = "UseBufferedStream";
aNVs[1].Value <<= true;
uno::Sequence<uno::Any> aArgs(1);
aArgs[0] <<= aNVs;
uno::Reference<uno::XComponentContext> xCxt = comphelper::getProcessComponentContext();
uno::Reference<lang::XMultiComponentFactory> xSvcMgr = xCxt->getServiceManager();
uno::Reference<packages::zip::XZipFileAccess2> xZip(
xSvcMgr->createInstanceWithArgumentsAndContext(
"com.sun.star.packages.zip.ZipFileAccess", aArgs, xCxt),
uno::UNO_QUERY);
CPPUNIT_ASSERT(xZip.is());
uno::Reference<container::XNameAccess> xNA(xZip, uno::UNO_QUERY);
CPPUNIT_ASSERT(xNA.is());
{
comphelper::ThreadPool aPool(4);
std::shared_ptr<comphelper::ThreadTaskTag> pTag = comphelper::ThreadPool::createThreadTaskTag();
@ -132,26 +158,50 @@ namespace
aName += ".txt";
uno::Reference<io::XInputStream> xStrm;
xNA->getByName(aName) >>= xStrm;
mxNA->getByName(aName) >>= xStrm;
CPPUNIT_ASSERT(xStrm.is());
aPool.pushTask(new Worker(pTag, xStrm, *itBuf));
}
aPool.waitUntilDone(pTag);
verifyStreams( aTestBuffers );
}
}
// Verify the streams.
CPPUNIT_ASSERT_EQUAL(size_t(26), aTestBuffers.size());
itBuf = aTestBuffers.begin();
void PackageTest::testBufferedThreadedStreams()
{
std::vector<std::vector<char>> aTestBuffers(26);
auto itBuf = aTestBuffers.begin();
sal_Int32 nReadSize = 0;
for (char c = 'a'; c <= 'z'; ++c, ++itBuf)
for (char c = 'a'; c <= 'z'; ++c, ++itBuf)
{
OUString aName(c);
aName += ".txt";
uno::Reference<io::XInputStream> xStrm;
//Size of each stream is 1mb (>10000) => XBufferedThreadedStream
mxNA->getByName(aName) >>= xStrm;
CPPUNIT_ASSERT(xStrm.is());
sal_Int32 nSize = xStrm->available();
uno::Sequence<sal_Int8> aBytes;
//Read chuncks of increasing size
nReadSize += 1024;
while (nSize > 0)
{
const std::vector<char>& rBuf = *itBuf;
CPPUNIT_ASSERT_EQUAL(size_t(1048576), rBuf.size()); // 1 MB each.
for (char check : rBuf)
CPPUNIT_ASSERT_EQUAL(c, check);
sal_Int32 nBytesRead = xStrm->readBytes(aBytes, nReadSize);
const sal_Int8* p = aBytes.getArray();
const sal_Int8* pEnd = p + nBytesRead;
std::copy(p, pEnd, std::back_inserter(*itBuf));
nSize -= nBytesRead;
}
}
verifyStreams( aTestBuffers );
}
CPPUNIT_TEST_SUITE_REGISTRATION(PackageTest);

View file

@ -0,0 +1,200 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
* This file is part of the LibreOffice project.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#include <XBufferedThreadedStream.hxx>
#include <com/sun/star/packages/zip/ZipIOException.hpp>
using namespace css::uno;
using com::sun::star::packages::zip::ZipIOException;
namespace {
class UnzippingThread: public salhelper::Thread
{
XBufferedThreadedStream &mxStream;
public:
explicit UnzippingThread(XBufferedThreadedStream &xStream): Thread("Unzipping"), mxStream(xStream) {}
private:
virtual void execute() override
{
try
{
mxStream.produce();
}
catch( const RuntimeException &e )
{
SAL_WARN("package", "RuntimeException from unbuffered Stream " << e.Message );
mxStream.saveException( new RuntimeException( e ) );
}
catch( const ZipIOException &e )
{
SAL_WARN("package", "ZipIOException from unbuffered Stream " << e.Message );
mxStream.saveException( new ZipIOException( e ) );
}
catch( const Exception &e )
{
SAL_WARN("package", "Unexpected exception " << e.Message );
mxStream.saveException( new Exception( e ) );
}
mxStream.setTerminateThread();
}
};
}
XBufferedThreadedStream::XBufferedThreadedStream(
const Reference<XInputStream>& xSrcStream )
: mxSrcStream( xSrcStream )
, mnPos(0)
, mnStreamSize( xSrcStream->available() )
, mnOffset( 0 )
, mxUnzippingThread( new UnzippingThread(*this) )
, mbTerminateThread( false )
, maSavedException( nullptr )
{
mxUnzippingThread->launch();
}
XBufferedThreadedStream::~XBufferedThreadedStream()
{
setTerminateThread();
mxUnzippingThread->join();
}
/**
* Reads from UnbufferedStream in a seperate thread and stores the buffer blocks
* in maPendingBuffers queue for further use.
*/
void XBufferedThreadedStream::produce()
{
Buffer pProducedBuffer;
std::unique_lock<std::mutex> aGuard( maBufferProtector );
do
{
if( !maUsedBuffers.empty() )
{
pProducedBuffer = maUsedBuffers.front();
maUsedBuffers.pop();
}
aGuard.unlock();
mxSrcStream->readBytes( pProducedBuffer, nBufferSize );
aGuard.lock();
maPendingBuffers.push( pProducedBuffer );
maBufferConsumeResume.notify_one();
maBufferProduceResume.wait( aGuard, [&]{return canProduce(); } );
if( mbTerminateThread )
break;
} while( hasBytes() );
}
/**
* Fetches next available block from maPendingBuffers for use in Reading thread.
*/
const Buffer& XBufferedThreadedStream::getNextBlock()
{
const sal_Int32 nBufSize = maInUseBuffer.getLength();
if( nBufSize <= 0 || mnOffset >= nBufSize )
{
std::unique_lock<std::mutex> aGuard( maBufferProtector );
if( mnOffset >= nBufSize )
maUsedBuffers.push( maInUseBuffer );
maBufferConsumeResume.wait( aGuard, [&]{return canConsume(); } );
if( maPendingBuffers.empty() )
{
maInUseBuffer = Buffer();
if( maSavedException )
throw *maSavedException;
}
else
{
maInUseBuffer = maPendingBuffers.front();
maPendingBuffers.pop();
mnOffset = 0;
if( maPendingBuffers.size() <= nBufferLowWater )
maBufferProduceResume.notify_one();
}
}
return maInUseBuffer;
}
void XBufferedThreadedStream::setTerminateThread()
{
mbTerminateThread = true;
maBufferProduceResume.notify_one();
maBufferConsumeResume.notify_one();
}
sal_Int32 SAL_CALL XBufferedThreadedStream::readBytes( Sequence< sal_Int8 >& rData, sal_Int32 nBytesToRead )
{
if( !hasBytes() )
return 0;
const sal_Int32 nAvailableSize = std::min<sal_Int32>( nBytesToRead, remainingSize() );
rData.realloc( nAvailableSize );
sal_Int32 i = 0, nPendingBytes = nAvailableSize;
while( nPendingBytes )
{
const Buffer &pBuffer = getNextBlock();
if( pBuffer.getLength() <= 0 )
{
rData.realloc( nAvailableSize - nPendingBytes );
return nAvailableSize - nPendingBytes;
}
const sal_Int32 limit = std::min<sal_Int32>( nPendingBytes, pBuffer.getLength() - mnOffset );
memcpy( &rData[i], &pBuffer[mnOffset], limit );
nPendingBytes -= limit;
mnOffset += limit;
mnPos += limit;
i += limit;
}
return nAvailableSize;
}
sal_Int32 SAL_CALL XBufferedThreadedStream::readSomeBytes( Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead )
{
return readBytes( aData, nMaxBytesToRead );
}
void SAL_CALL XBufferedThreadedStream::skipBytes( sal_Int32 nBytesToSkip )
{
if( nBytesToSkip )
{
Sequence < sal_Int8 > aSequence( nBytesToSkip );
readBytes( aSequence, nBytesToSkip );
}
}
sal_Int32 SAL_CALL XBufferedThreadedStream::available()
{
if( !hasBytes() )
return 0;
return remainingSize();
}
void SAL_CALL XBufferedThreadedStream::closeInput()
{
setTerminateThread();
mxUnzippingThread->join();
mxSrcStream->closeInput();
}
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */

View file

@ -0,0 +1,79 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
* This file is part of the LibreOffice project.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#ifndef INCLUDED_PACKAGE_SOURCE_ZIPAPI_XBUFFEREDTHREADEDSTREAM_HXX
#define INCLUDED_PACKAGE_SOURCE_ZIPAPI_XBUFFEREDTHREADEDSTREAM_HXX
#include <salhelper/thread.hxx>
#include <XUnbufferedStream.hxx>
#include <queue>
#include <vector>
#include <mutex>
#include <condition_variable>
typedef css::uno::Sequence< sal_Int8 > Buffer;
class XBufferedThreadedStream : public cppu::WeakImplHelper< css::io::XInputStream >
{
private:
const css::uno::Reference<XInputStream> mxSrcStream;
size_t mnPos; /// position in stream
size_t mnStreamSize; /// available size of stream
Buffer maInUseBuffer; /// Buffer block in use
int mnOffset; /// position in maInUseBuffer
std::queue < Buffer > maPendingBuffers; /// Buffers that are available for use
std::queue < Buffer > maUsedBuffers;
rtl::Reference< salhelper::Thread > mxUnzippingThread;
std::mutex maBufferProtector; /// mutex protecting Buffer queues.
std::condition_variable maBufferConsumeResume;
std::condition_variable maBufferProduceResume;
bool mbTerminateThread; /// indicates the failure of one of the threads
css::uno::Exception *maSavedException; /// exception caught during unzipping is saved to be thrown during reading
static const size_t nBufferLowWater = 2;
static const size_t nBufferHighWater = 4;
static const size_t nBufferSize = 32 * 1024;
const Buffer& getNextBlock();
size_t remainingSize() const { return mnStreamSize - mnPos; }
bool hasBytes() const { return mnPos < mnStreamSize; }
bool canProduce() const
{
return( mbTerminateThread || maPendingBuffers.size() < nBufferHighWater );
}
bool canConsume() const
{
return( mbTerminateThread || !maPendingBuffers.empty() );
}
public:
XBufferedThreadedStream(
const css::uno::Reference<XInputStream>& xSrcStream );
virtual ~XBufferedThreadedStream() override;
void produce();
void setTerminateThread();
void saveException( css::uno::Exception *e ) { maSavedException = e; }
// XInputStream
virtual sal_Int32 SAL_CALL readBytes( css::uno::Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead ) override;
virtual sal_Int32 SAL_CALL readSomeBytes( css::uno::Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead ) override;
virtual void SAL_CALL skipBytes( sal_Int32 nBytesToSkip ) override;
virtual sal_Int32 SAL_CALL available( ) override;
virtual void SAL_CALL closeInput( ) override;
};
#endif
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */

View file

@ -44,6 +44,7 @@
#include <ZipFile.hxx>
#include <ZipEnumeration.hxx>
#include <XUnbufferedStream.hxx>
#include <XBufferedThreadedStream.hxx>
#include <PackageConstants.hxx>
#include <EncryptedDataHeader.hxx>
#include <EncryptionData.hxx>
@ -625,7 +626,14 @@ uno::Reference< XInputStream > ZipFile::createStreamForZipEntry(
if (!mbUseBufferedStream)
return xSrcStream;
uno::Reference<io::XInputStream> xBufStream(new XBufferedStream(xSrcStream));
uno::Reference<io::XInputStream> xBufStream;
static const sal_Int32 nThreadingThreshold = 10000;
if( xSrcStream->available() > nThreadingThreshold )
xBufStream = new XBufferedThreadedStream(xSrcStream);
else
xBufStream = new XBufferedStream(xSrcStream);
return xBufStream;
}