4ec7128470
It's more robust to restart soffice after every file; even if it takes more time, overall it's better than having to prune all the files that are invalid or cause loops before running convwatch. Change-Id: I7f7155f71bb2522ae48182aa1b5ca61fc47ae4d5
455 lines
15 KiB
Python
455 lines
15 KiB
Python
# -*- tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4 -*-
|
|
#
|
|
# This file is part of the LibreOffice project.
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
#
|
|
|
|
import getopt
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import uuid
|
|
import datetime
|
|
import traceback
|
|
import threading
|
|
try:
|
|
from urllib.parse import quote
|
|
except ImportError:
|
|
from urllib import quote
|
|
|
|
try:
|
|
import pyuno
|
|
import uno
|
|
import unohelper
|
|
except ImportError:
|
|
print("pyuno not found: try to set PYTHONPATH and URE_BOOTSTRAP variables")
|
|
print("PYTHONPATH=/installation/opt/program")
|
|
print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
|
|
raise
|
|
|
|
try:
|
|
from com.sun.star.document import XDocumentEventListener
|
|
except ImportError:
|
|
print("UNO API class not found: try to set URE_BOOTSTRAP variable")
|
|
print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
|
|
raise
|
|
|
|
### utilities ###
|
|
|
|
def log(*args):
    """Print ``args`` on standard output, separated by single spaces.

    The stream is flushed after every call, so a message is never left
    waiting in the output buffer.
    """
    print(*args, sep=" ", end="\n", flush=True)
|
|
|
|
def partition(list, pred):
    """Split the elements of ``list`` into two lists according to ``pred``.

    Returns a tuple ``(left, right)``: ``left`` holds the elements for which
    ``pred(element)`` is true, ``right`` holds all the others.  Both lists
    keep the elements in their original relative order.
    """
    accepted, rejected = [], []
    for element in list:
        # choose the destination first, then append exactly once
        bucket = accepted if pred(element) else rejected
        bucket.append(element)
    return (accepted, rejected)
|
|
|
|
def filelist(dir, suffix):
    """List the regular files directly inside ``dir`` that end in ``suffix``.

    ``suffix`` is compared with the extension reported by os.path.splitext(),
    so it includes the leading dot (e.g. ".odt").  Each returned path is
    ``dir`` (with a trailing "/" added if missing) followed by the file name.

    Raises Exception if ``dir`` is the empty string.
    """
    if len(dir) == 0:
        raise Exception("filelist: empty directory")
    prefix = dir if dir[-1] == "/" else dir + "/"
    matches = []
    for name in os.listdir(prefix):
        path = prefix + name
        if os.path.splitext(path)[1] != suffix:
            continue
        if os.path.isfile(path):
            matches.append(path)
    return matches
|
|
|
|
def getFiles(dirs, suffix):
    """Collect the files ending in ``suffix`` from every directory in ``dirs``.

    Each directory is listed with filelist(); the per-directory results are
    concatenated in the order in which the directories are given.
    """
    return [path
            for directory in dirs
            for path in filelist(directory, suffix)]
|
|
|
|
### UNO utilities ###
|
|
|
|
class OfficeConnection:
    """Connection to one soffice instance, optionally started by this script.

    ``args`` is the option dictionary of the script.  Its "--soffice" entry
    has the form ``method:location``:

    * ``path:<soffice executable>`` starts a new soffice process, which
      additionally requires "--userdir" to be a file URL, and connects to it
      through a uniquely named pipe;
    * ``connect:<connection string>`` connects to an instance that is already
      running; such an instance is not terminated by tearDown().
    """

    def __init__(self, args):
        self.args = args
        self.soffice = None   # subprocess.Popen of the started soffice, if any
        self.socket = None    # connection string, e.g. "pipe,name=pytest..."
        self.xContext = None  # remote component context once connected

    def setUp(self):
        """Start soffice if the "path" method is requested, then connect.

        Raises Exception if "--soffice" does not have the ``method:location``
        form, names an unsupported method, or if the "path" method is used
        without a "--userdir" that is a file URL.
        """
        (method, sep, rest) = self.args["--soffice"].partition(":")
        if sep != ":":
            raise Exception("soffice parameter does not specify method")
        if method == "path":
            self.socket = "pipe,name=pytest" + str(uuid.uuid1())
            try:
                userdir = self.args["--userdir"]
            except KeyError:
                raise Exception("'path' method requires --userdir")
            if not(userdir.startswith("file://")):
                raise Exception("--userdir must be file URL")
            self.soffice = self.bootstrap(rest, userdir, self.socket)
        elif method == "connect":
            self.socket = rest
        else:
            raise Exception("unsupported connection method: " + method)
        self.xContext = self.connect(self.socket)

    def bootstrap(self, soffice, userdir, socket):
        """Start the ``soffice`` executable and return its subprocess.Popen.

        The process is started headless, accepting connections on ``socket``
        and using ``userdir`` as its user installation.  "--valgrind" is
        passed on if it is present in the script options.
        """
        argv = [ soffice, "--accept=" + socket + ";urp",
                "-env:UserInstallation=" + userdir,
                "--quickstart=no",
                "--norestore", "--nologo", "--headless" ]
        if "--valgrind" in self.args:
            argv.append("--valgrind")
        return subprocess.Popen(argv)

    def connect(self, socket):
        """Resolve and return the remote component context behind ``socket``.

        Retries once per second while the connection is refused.  If soffice
        was started by bootstrap() and that process has exited in the
        meantime, an Exception is raised instead of retrying forever.
        """
        xLocalContext = uno.getComponentContext()
        xUnoResolver = xLocalContext.ServiceManager.createInstanceWithContext(
                "com.sun.star.bridge.UnoUrlResolver", xLocalContext)
        url = "uno:" + socket + ";urp;StarOffice.ComponentContext"
        log("OfficeConnection: connecting to: " + url)
        while True:
            try:
                xContext = xUnoResolver.resolve(url)
                return xContext
            except pyuno.getClass("com.sun.star.connection.NoConnectException"):
                # A process that is gone will never accept the connection,
                # so give up rather than sleeping in this loop indefinitely.
                if self.soffice is not None:
                    status = self.soffice.poll()
                    if status is not None:
                        raise Exception(
                            "soffice exited before accepting a connection,"
                            " exit status: " + str(status))
                log("NoConnectException: sleeping...")
                time.sleep(1)

    def tearDown(self):
        """Shut down the soffice process started by setUp(), if there is one.

        With a live context the office is asked to terminate through its
        Desktop service; without one the process is terminated directly.
        The connection attributes are reset afterwards.

        Raises Exception if the process exits with a non-zero status.
        """
        if self.soffice:
            if self.xContext:
                try:
                    log("tearDown: calling terminate()...")
                    xMgr = self.xContext.ServiceManager
                    xDesktop = xMgr.createInstanceWithContext(
                            "com.sun.star.frame.Desktop", self.xContext)
                    xDesktop.terminate()
                    log("...done")
                except pyuno.getClass("com.sun.star.beans.UnknownPropertyException"):
                    log("caught UnknownPropertyException")
                    pass # ignore, also means disposed
                except pyuno.getClass("com.sun.star.lang.DisposedException"):
                    log("caught DisposedException")
                    pass # ignore
            else:
                self.soffice.terminate()
            ret = self.soffice.wait()
            self.xContext = None
            self.socket = None
            self.soffice = None
            if ret != 0:
                raise Exception("Exit status indicates failure: " + str(ret))
|
|
|
|
class WatchDog(threading.Thread):
    """Thread that terminates the soffice of a connection if it runs too long.

    The thread waits up to 120 seconds for the soffice process of
    ``connection`` to exit.  If the process is still running after that, it
    is terminated and the connection's ``xContext`` is reset to None.
    """

    def __init__(self, connection):
        threading.Thread.__init__(self, name="WatchDog " + connection.socket)
        self.connection = connection

    def run(self):
        # Work on a snapshot of the process object: the connection resets its
        # "soffice" attribute to None when it is torn down, which may happen
        # on another thread while this one is still waiting.
        soffice = self.connection.soffice
        if not soffice: # not possible for "connect"
            return
        try:
            soffice.wait(timeout=120) # 2 minutes?
        except subprocess.TimeoutExpired:
            log("WatchDog: TIMEOUT -> killing soffice")
            soffice.terminate() # actually killing oosplash...
            self.connection.xContext = None
            log("WatchDog: killed soffice")
|
|
|
|
class PerTestConnection:
    """Connection policy that uses a fresh OfficeConnection for every test.

    preTest() sets up a new connection and starts a WatchDog for it;
    postTest() tears the connection down again and waits for the watchdog
    thread to finish.  setUp() and tearDown() only check that no connection
    is left over.
    """

    def __init__(self, args):
        self.args = args
        self.connection = None
        self.watchdog = None

    def getContext(self):
        """Return the component context of the current connection."""
        return self.connection.xContext

    def setUp(self):
        assert not self.connection

    def preTest(self):
        office = OfficeConnection(self.args)
        office.setUp()
        # publish the connection only once it has been set up successfully
        self.connection = office
        self.watchdog = WatchDog(office)
        self.watchdog.start()

    def postTest(self):
        office = self.connection
        if not office:
            return
        try:
            office.tearDown()
        finally:
            self.connection = None
            self.watchdog.join()

    def tearDown(self):
        assert not self.connection
|
|
|
|
class PersistentConnection:
    """Connection policy that shares one OfficeConnection among all tests.

    The connection is established in setUp() and torn down in tearDown();
    preTest() and postTest() only assert that it exists.
    """

    def __init__(self, args):
        self.args = args
        self.connection = None

    def getContext(self):
        """Return the component context of the shared connection."""
        return self.connection.xContext

    def setUp(self):
        office = OfficeConnection(self.args)
        office.setUp()
        # publish the connection only once it has been set up successfully
        self.connection = office

    def preTest(self):
        assert self.connection

    def postTest(self):
        assert self.connection

    def tearDown(self):
        office = self.connection
        if not office:
            return
        try:
            office.tearDown()
        finally:
            self.connection = None
|
|
|
|
def simpleInvoke(connection, test):
    """Run ``test`` once, bracketed by the connection's per-test hooks.

    connection.postTest() is called even when preTest() or the test itself
    raises; the exception is passed on to the caller.
    """
    try:
        connection.preTest()
        context = connection.getContext()
        test.run(context)
    finally:
        connection.postTest()
|
|
|
|
def retryInvoke(connection, test):
    """Run ``test``, retrying up to five times until one attempt succeeds.

    Every attempt is bracketed by connection.preTest() and
    connection.postTest().  Returns as soon as an attempt completes without
    an exception.

    Raises Exception("FAILED retryInvoke") when all five attempts failed; the
    error of the last attempt is attached as its cause.  KeyboardInterrupt
    and SystemExit are not retried, they propagate immediately.
    """
    lastError = None
    for _ in range(5):
        try:
            try:
                connection.preTest()
                test.run(connection.getContext())
                return
            finally:
                connection.postTest()
        except Exception as error:
            # "Exception" rather than a bare except: Ctrl+C and sys.exit()
            # must end the run instead of being counted as a failed attempt.
            lastError = error
            log("retryInvoke: caught exception")
    raise Exception("FAILED retryInvoke") from lastError
|
|
|
|
def runConnectionTests(connection, invoker, tests):
    """Run every test in ``tests`` and return the files of those that failed.

    ``invoker(connection, test)`` is called for each test.  A test that
    raises an Exception is recorded by its ``file`` attribute and its
    traceback is logged; the remaining tests still run.  KeyboardInterrupt
    and SystemExit are not recorded, they abort the whole run.

    connection.setUp() is called before the first test and
    connection.tearDown() is always called at the end.
    """
    try:
        connection.setUp()
        failed = []
        for test in tests:
            try:
                invoker(connection, test)
            except Exception:
                # "Exception" rather than a bare except, so that Ctrl+C and
                # sys.exit() stop the run instead of counting as a failure.
                failed.append(test.file)
                log("... FAILED with exception:\n" + traceback.format_exc())
        return failed
    finally:
        connection.tearDown()
|
|
|
|
class EventListener(XDocumentEventListener,unohelper.Base):
    """Document event listener that records when a layout has finished.

    ``layoutFinished`` starts out False and becomes True once an event named
    "OnLayoutFinished" has been received.
    """

    def __init__(self):
        self.layoutFinished = False

    def documentEventOccured(self, event):
        # NOTE(review): the spelling "Occured" is presumably dictated by the
        # XDocumentEventListener interface -- confirm before renaming.
        if event.EventName == "OnLayoutFinished":
            self.layoutFinished = True

    def disposing(self, event):
        # Nothing to release.  "self" must be declared: without it, calling
        # this method on an instance with one argument raises TypeError.
        pass
|
|
|
|
def mkPropertyValue(name, value):
    """Create a com.sun.star.beans.PropertyValue struct for ``name``/``value``.

    The struct is built from the four positional arguments
    ``name, 0, value, 0``.
    """
    structName = "com.sun.star.beans.PropertyValue"
    return uno.createUnoStruct(structName, name, 0, value, 0)
|
|
|
|
### tests ###
|
|
|
|
def loadFromURL(xContext, url):
    """Load the document at ``url`` hidden and read-only, and return it.

    After loading, waits up to 30 seconds (polling once per second) for an
    "OnLayoutFinished" document event.  If none arrives, a timeout is logged
    and the document is returned anyway.

    Raises Exception if no document was loaded.  If anything fails after the
    document has been loaded, the document is closed before the exception is
    passed on.  The temporary event listener is always removed again.
    """
    xDesktop = xContext.ServiceManager.createInstanceWithContext(
            "com.sun.star.frame.Desktop", xContext)
    settings = (("Hidden", True), ("ReadOnly", True)) # FilterName?
    loadProps = tuple(mkPropertyValue(key, setting)
                      for (key, setting) in settings)
    xListener = EventListener()
    xGEB = xContext.getValueByName(
        "/singletons/com.sun.star.frame.theGlobalEventBroadcaster")
    xGEB.addDocumentEventListener(xListener)
    xDoc = None
    try:
        xDoc = xDesktop.loadComponentFromURL(url, "_blank", 0, loadProps)
        if xDoc is None:
            raise Exception("No document loaded?")
        for _ in range(30):
            if xListener.layoutFinished:
                break
            log("delaying...")
            time.sleep(1)
        else:
            # the loop ran all 30 rounds without seeing the event
            log("timeout: no OnLayoutFinished received")
        return xDoc
    except:
        # clean up the half-loaded document, then let the error continue
        if xDoc:
            log("CLOSING")
            xDoc.close(True)
        raise
    finally:
        if xListener:
            xGEB.removeDocumentEventListener(xListener)
|
|
|
|
def printDoc(xContext, xDoc, url):
    """Print ``xDoc`` to the file named by ``url`` and wait until it is done.

    After starting the print job, the "IsBusy" value of the document's
    printer properties is polled once per second until it is false.
    ``xContext`` is not used.
    """
    printProps = (mkPropertyValue("FileName", url),)
    # The method is invoked by name instead of as xDoc.print(...), because
    # its name clashes with Python's own "print".
    uno.invoke(xDoc, "print", (printProps,))
    busy = True
    while busy:
        log("printing...")
        time.sleep(1)
        for prop in xDoc.getPrinter():
            if prop.Name == "IsBusy":
                busy = prop.Value
    log("...done printing")
|
|
|
|
class LoadPrintFileTest:
    """Test that loads one document and prints it to a file next to it.

    ``file`` is the path of the document; ``prtsuffix`` is appended to the
    document's URL to form the name of the print output.
    """

    def __init__(self, file, prtsuffix):
        self.file = file
        self.prtsuffix = prtsuffix

    def _fileUrl(self):
        """Return the file URL of the document.

        The path is made absolute first: "file://" followed by a relative
        path would turn the first path component into the host part of the
        URL.
        """
        return "file://" + quote(os.path.abspath(self.file))

    def run(self, xContext):
        """Load and print the document, logging start time and duration.

        The document is closed again even if loading or printing fails.
        """
        start = datetime.datetime.now()
        log("Time: " + str(start) + " Loading document: " + self.file)
        xDoc = None
        try:
            url = self._fileUrl()
            xDoc = loadFromURL(xContext, url)
            printDoc(xContext, xDoc, url + self.prtsuffix)
        finally:
            if xDoc:
                xDoc.close(True)
            end = datetime.datetime.now()
            log("...done with: " + self.file + " in: " + str(end - start))
|
|
|
|
def runLoadPrintFileTests(opts, dirs, suffix, reference):
    """Load and print every ``suffix`` file in ``dirs``; return the failures.

    The print output is named after the document with ".pdf.reference"
    appended if ``reference`` is true, and ".pdf" otherwise.  Each document
    is processed over its own office connection.  The number of failed files
    and their names are printed, and the list of names is returned.
    """
    prtsuffix = ".pdf.reference" if reference else ".pdf"
    documents = getFiles(dirs, suffix)
    tests = (LoadPrintFileTest(document, prtsuffix) for document in documents)
    connection = PerTestConnection(opts)
    failed = runConnectionTests(connection, simpleInvoke, tests)
    print("all printed: FAILURES: " + str(len(failed)))
    for name in failed:
        print(name)
    return failed
|
|
|
|
def mkImages(file, resolution):
    """Run ghostscript ("gs") on ``file`` with the JPEG output device.

    The output file pattern is ``file`` + ".%04d.jpeg".  ``resolution`` is a
    string and is passed to gs as its "-r" option.

    Raises subprocess.CalledProcessError if gs exits with a non-zero status.
    """
    argv = [ "gs", "-r" + resolution, "-sOutputFile=" + file + ".%04d.jpeg",
             "-dNOPROMPT", "-dNOPAUSE", "-dBATCH", "-sDEVICE=jpeg", file ]
    # check_call() raises on failure, so its return value carries no
    # information and is not kept.
    subprocess.check_call(argv)
|
|
|
|
def mkAllImages(dirs, suffix, resolution, reference, failed):
    """Create page images for the print output of every document in ``dirs``.

    For each ``suffix`` file found in ``dirs``, mkImages() is run on the
    file name with ".pdf.reference" appended if ``reference`` is true, and
    ".pdf" otherwise.  Documents listed in ``failed`` are skipped with a
    log message.
    """
    prtsuffix = ".pdf.reference" if reference else ".pdf"
    # build the lookup once instead of scanning the list for every document
    skipped = frozenset(failed)
    for dir in dirs:
        files = filelist(dir, suffix)
        log(files)
        for f in files:
            if f in skipped:
                log("Skipping failed: " + f)
            else:
                mkImages(f + prtsuffix, resolution)
|
|
|
|
def identify(imagefile):
    """Run ImageMagick "identify" on ``imagefile`` and log a difference.

    The tool is asked for the "%k" format (presumably the number of unique
    colours -- see the ImageMagick documentation).  Unless the first line of
    its output is exactly "1", the output and a "DIFFERENCE" message are
    logged.

    Raises Exception if identify exits with a non-zero status.
    """
    command = ["identify", "-format", "%k", imagefile]
    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    (output, _) = process.communicate()
    status = process.wait()
    if status != 0:
        raise Exception("identify failed")
    firstLine = output.partition(b"\n")[0]
    if firstLine == b"1":
        return
    log("identify result: " + output.decode('utf-8'))
    log("DIFFERENCE in " + imagefile)
|
|
|
|
def compose(refimagefile, imagefile, diffimagefile):
    """Write the "difference" composite of two images to ``diffimagefile``.

    Runs the ImageMagick "composite" tool; raises
    subprocess.CalledProcessError if it exits with a non-zero status.
    """
    command = ["composite", "-compose", "difference"]
    command += [refimagefile, imagefile, diffimagefile]
    subprocess.check_call(command)
|
|
|
|
def compareImages(file):
    """Compare the page images of ``file`` with its reference page images.

    All ".jpeg" files in the directory of ``file`` whose path starts with
    ``file`` are considered.  Those starting with ``file`` +
    ".pdf.reference" are the reference images, the others the current ones.
    Both groups are sorted and compared pairwise: a difference image named
    after the current image plus ".diff" is composed and then checked with
    identify().  A differing number of images is logged.
    """
    candidates = filelist(os.path.dirname(file), ".jpeg")
    related = sorted(f for f in candidates if f.startswith(file))
    refprefix = file + ".pdf.reference"
    (refimages, images) = partition(related,
                                    lambda f: f.startswith(refprefix))
    for (image, refimage) in zip(images, refimages):
        diffimage = image + ".diff"
        compose(image, refimage, diffimage)
        identify(diffimage)
    if len(refimages) != len(images):
        log("DIFFERENT NUMBER OF IMAGES FOR: " + file)
|
|
|
|
def compareAllImages(dirs, suffix):
    """Run compareImages() for every ``suffix`` file found in ``dirs``.

    The start and the end of the comparison are logged.
    """
    log("compareAllImages...")
    for directory in dirs:
        for document in filelist(directory, suffix):
            compareImages(document)
    log("...compareAllImages done")
|
|
|
|
|
|
def parseArgs(argv):
    """Parse the command line ``argv`` (including the program name).

    Returns a tuple ``(options, args)``: ``options`` maps every option that
    was given (e.g. "-r" or "--soffice") to its value, which is the empty
    string for options without an argument; ``args`` is the list of the
    remaining, non-option arguments.

    Raises getopt.GetoptError for unknown or incomplete options.
    """
    longOptions = ["help", "soffice=", "userdir=", "reference", "valgrind"]
    (optlist, args) = getopt.getopt(argv[1:], "hr", longOptions)
    options = {}
    for (name, value) in optlist:
        # an option given more than once keeps its last value
        options[name] = value
    return (options, args)
|
|
|
|
def usage():
    """Print the command line help, using the base name of sys.argv[0]."""
    # The first line used to end in a stray double quote that was printed
    # as part of the usage text.
    message = """usage: {program} [option]... [directory]...
-h | --help: print usage information
-r | --reference: generate new reference files (otherwise: compare)
--soffice=method:location
specify soffice instance to connect to
supported methods: 'path', 'connect'
--userdir=URL specify user installation directory for 'path' method
--valgrind pass --valgrind to soffice for 'path' method"""
    print(message.format(program = os.path.basename(sys.argv[0])))
|
|
|
|
def checkTools():
    """Exit with status 1 unless ghostscript and ImageMagick can be run.

    "gs", "composite" and "identify" are each started once with their
    version option.  If one of them cannot be started or exits with a
    non-zero status, a hint is printed and the script exits.
    """
    # Only "could not run the tool" is handled here; a bare except would
    # also report Ctrl+C as a missing tool.
    toolErrors = (OSError, subprocess.CalledProcessError)
    try:
        subprocess.check_output(["gs", "--version"])
    except toolErrors:
        print("Cannot execute 'gs'. Please install ghostscript.")
        sys.exit(1)
    try:
        subprocess.check_output(["composite", "-version"])
        subprocess.check_output(["identify", "-version"])
    except toolErrors:
        print("Cannot execute 'composite' or 'identify'.")
        print("Please install ImageMagick.")
        sys.exit(1)
|
|
|
|
if __name__ == "__main__":
    # Script entry point: print the documents in the given directories and,
    # unless new reference files are being generated, compare the resulting
    # page images with the reference images.
    try:
        (opts,args) = parseArgs(sys.argv)
    except getopt.GetoptError as error:
        # unknown or incomplete option: explain instead of a traceback
        print(error)
        usage()
        sys.exit(1)
    # Help is handled first, so that it works without any directory
    # argument and without the external tools being installed.
    if "-h" in opts or "--help" in opts:
        usage()
        sys.exit()
    if len(args) == 0 or "--soffice" not in opts:
        usage()
        sys.exit(1)
    checkTools()
    reference = "-r" in opts or "--reference" in opts
    failed = runLoadPrintFileTests(opts, args, ".odt", reference)
    mkAllImages(args, ".odt", "200", reference, failed)
    if not(reference):
        compareAllImages(args, ".odt")
|
|
|
|
# vim:set shiftwidth=4 softtabstop=4 expandtab:
|