9471fce06e
Change-Id: I44e546defb278bb5217ed028dcaebc9fb2d23f0b Reviewed-on: https://gerrit.libreoffice.org/c/core/+/171020 Tested-by: Jenkins Tested-by: Ilmari Lauhakangas <ilmari.lauhakangas@libreoffice.org> Reviewed-by: Ilmari Lauhakangas <ilmari.lauhakangas@libreoffice.org>
464 lines
15 KiB
Python
464 lines
15 KiB
Python
# -*- tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4 -*-
|
|
#
|
|
# This file is part of the LibreOffice project.
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
#
|
|
# Conversion watch, initially intended to detect if document layout changed since the last time it was run.
|
|
#
|
|
# Print a set of docs, compare the pdf against the old run and highlight the differences
|
|
#
|
|
|
|
import getopt
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import uuid
|
|
import datetime
|
|
import traceback
|
|
import threading
|
|
try:
|
|
from urllib.parse import quote
|
|
except ImportError:
|
|
from urllib import quote
|
|
|
|
try:
|
|
import pyuno
|
|
import uno
|
|
import unohelper
|
|
except ImportError:
|
|
print("pyuno not found: try to set PYTHONPATH and URE_BOOTSTRAP variables")
|
|
print("PYTHONPATH=/installation/opt/program")
|
|
print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
|
|
raise
|
|
|
|
try:
|
|
from com.sun.star.document import XDocumentEventListener
|
|
except ImportError:
|
|
print("UNO API class not found: try to set URE_BOOTSTRAP variable")
|
|
print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
|
|
raise
|
|
|
|
### utilities ###
|
|
|
|
def log(*args):
|
|
print(*args, flush=True)
|
|
|
|
def partition(list, pred):
|
|
left = []
|
|
right = []
|
|
for e in list:
|
|
if pred(e):
|
|
left.append(e)
|
|
else:
|
|
right.append(e)
|
|
return (left, right)
|
|
|
|
def filelist(dir, suffix):
|
|
if len(dir) == 0:
|
|
raise Exception("filelist: empty directory")
|
|
if not(dir[-1] == "/"):
|
|
dir += "/"
|
|
files = [dir + f for f in os.listdir(dir)]
|
|
# log(files)
|
|
return [f for f in files
|
|
if os.path.isfile(f) and os.path.splitext(f)[1] == suffix]
|
|
|
|
def getFiles(dirs, suffix):
|
|
files = []
|
|
for dir in dirs:
|
|
files += filelist(dir, suffix)
|
|
return files
|
|
|
|
### UNO utilities ###
|
|
|
|
class OfficeConnection:
|
|
def __init__(self, args):
|
|
self.args = args
|
|
self.soffice = None
|
|
self.socket = None
|
|
self.xContext = None
|
|
def setUp(self):
|
|
(method, sep, rest) = self.args["--soffice"].partition(":")
|
|
if sep != ":":
|
|
raise Exception("soffice parameter does not specify method")
|
|
if method == "path":
|
|
self.socket = "pipe,name=pytest" + str(uuid.uuid1())
|
|
try:
|
|
userdir = self.args["--userdir"]
|
|
except KeyError:
|
|
raise Exception("'path' method requires --userdir")
|
|
if not(userdir.startswith("file://")):
|
|
raise Exception("--userdir must be file URL")
|
|
self.soffice = self.bootstrap(rest, userdir, self.socket)
|
|
elif method == "connect":
|
|
self.socket = rest
|
|
else:
|
|
raise Exception("unsupported connection method: " + method)
|
|
self.xContext = self.connect(self.socket)
|
|
|
|
def bootstrap(self, soffice, userdir, socket):
|
|
argv = [ soffice, "--accept=" + socket + ";urp",
|
|
"-env:UserInstallation=" + userdir,
|
|
"--quickstart=no",
|
|
"--norestore", "--nologo", "--headless" ]
|
|
if "--valgrind" in self.args:
|
|
argv.append("--valgrind")
|
|
return subprocess.Popen(argv)
|
|
|
|
def connect(self, socket):
|
|
xLocalContext = uno.getComponentContext()
|
|
xUnoResolver = xLocalContext.ServiceManager.createInstanceWithContext(
|
|
"com.sun.star.bridge.UnoUrlResolver", xLocalContext)
|
|
url = "uno:" + socket + ";urp;StarOffice.ComponentContext"
|
|
log("OfficeConnection: connecting to: " + url)
|
|
while True:
|
|
try:
|
|
xContext = xUnoResolver.resolve(url)
|
|
return xContext
|
|
# except com.sun.star.connection.NoConnectException
|
|
except pyuno.getClass("com.sun.star.connection.NoConnectException"):
|
|
log("NoConnectException: sleeping...")
|
|
time.sleep(1)
|
|
|
|
def tearDown(self):
|
|
if self.soffice:
|
|
if self.xContext:
|
|
try:
|
|
log("tearDown: calling terminate()...")
|
|
xMgr = self.xContext.ServiceManager
|
|
xDesktop = xMgr.createInstanceWithContext(
|
|
"com.sun.star.frame.Desktop", self.xContext)
|
|
xDesktop.terminate()
|
|
log("...done")
|
|
# except com.sun.star.lang.DisposedException:
|
|
except pyuno.getClass("com.sun.star.beans.UnknownPropertyException"):
|
|
log("caught UnknownPropertyException")
|
|
pass # ignore, also means disposed
|
|
except pyuno.getClass("com.sun.star.lang.DisposedException"):
|
|
log("caught DisposedException")
|
|
pass # ignore
|
|
else:
|
|
self.soffice.terminate()
|
|
ret = self.soffice.wait()
|
|
self.xContext = None
|
|
self.socket = None
|
|
self.soffice = None
|
|
if ret != 0:
|
|
raise Exception("Exit status indicates failure: " + str(ret))
|
|
# return ret
|
|
|
|
class WatchDog(threading.Thread):
|
|
def __init__(self, connection):
|
|
threading.Thread.__init__(self, name="WatchDog " + connection.socket)
|
|
self.connection = connection
|
|
def run(self):
|
|
try:
|
|
if self.connection.soffice: # not possible for "connect"
|
|
self.connection.soffice.wait(timeout=120) # 2 minutes?
|
|
except subprocess.TimeoutExpired:
|
|
log("WatchDog: TIMEOUT -> killing soffice")
|
|
self.connection.soffice.terminate() # actually killing oosplash...
|
|
self.connection.xContext = None
|
|
log("WatchDog: killed soffice")
|
|
|
|
class PerTestConnection:
|
|
def __init__(self, args):
|
|
self.args = args
|
|
self.connection = None
|
|
self.watchdog = None
|
|
def getContext(self):
|
|
return self.connection.xContext
|
|
def setUp(self):
|
|
assert(not(self.connection))
|
|
def preTest(self):
|
|
conn = OfficeConnection(self.args)
|
|
conn.setUp()
|
|
self.connection = conn
|
|
self.watchdog = WatchDog(self.connection)
|
|
self.watchdog.start()
|
|
def postTest(self):
|
|
if self.connection:
|
|
try:
|
|
self.connection.tearDown()
|
|
finally:
|
|
self.connection = None
|
|
self.watchdog.join()
|
|
def tearDown(self):
|
|
assert(not(self.connection))
|
|
|
|
class PersistentConnection:
|
|
def __init__(self, args):
|
|
self.args = args
|
|
self.connection = None
|
|
def getContext(self):
|
|
return self.connection.xContext
|
|
def setUp(self):
|
|
conn = OfficeConnection(self.args)
|
|
conn.setUp()
|
|
self.connection = conn
|
|
def preTest(self):
|
|
assert(self.connection)
|
|
def postTest(self):
|
|
assert(self.connection)
|
|
def tearDown(self):
|
|
if self.connection:
|
|
try:
|
|
self.connection.tearDown()
|
|
finally:
|
|
self.connection = None
|
|
|
|
def simpleInvoke(connection, test):
|
|
try:
|
|
connection.preTest()
|
|
test.run(connection.getContext())
|
|
finally:
|
|
connection.postTest()
|
|
|
|
def retryInvoke(connection, test):
|
|
tries = 5
|
|
while tries > 0:
|
|
try:
|
|
tries -= 1
|
|
try:
|
|
connection.preTest()
|
|
test.run(connection.getContext())
|
|
return
|
|
finally:
|
|
connection.postTest()
|
|
except KeyboardInterrupt:
|
|
raise # Ctrl+C should work
|
|
except Exception:
|
|
log("retryInvoke: caught exception")
|
|
raise Exception("FAILED retryInvoke")
|
|
|
|
def runConnectionTests(connection, invoker, tests):
|
|
try:
|
|
connection.setUp()
|
|
failed = []
|
|
for test in tests:
|
|
try:
|
|
invoker(connection, test)
|
|
except KeyboardInterrupt:
|
|
raise # Ctrl+C should work
|
|
except Exception:
|
|
failed.append(test.file)
|
|
estr = traceback.format_exc()
|
|
log("... FAILED with exception:\n" + estr)
|
|
return failed
|
|
finally:
|
|
connection.tearDown()
|
|
|
|
class EventListener(XDocumentEventListener,unohelper.Base):
|
|
def __init__(self):
|
|
self.layoutFinished = False
|
|
def documentEventOccured(self, event):
|
|
# log(str(event.EventName))
|
|
if event.EventName == "OnLayoutFinished":
|
|
self.layoutFinished = True
|
|
def disposing(event):
|
|
pass
|
|
|
|
def mkPropertyValue(name, value):
|
|
return uno.createUnoStruct("com.sun.star.beans.PropertyValue",
|
|
name, 0, value, 0)
|
|
|
|
### tests ###
|
|
|
|
def loadFromURL(xContext, url):
|
|
xDesktop = xContext.ServiceManager.createInstanceWithContext(
|
|
"com.sun.star.frame.Desktop", xContext)
|
|
props = [("Hidden", True), ("ReadOnly", True)] # FilterName?
|
|
loadProps = tuple([mkPropertyValue(name, value) for (name, value) in props])
|
|
xListener = EventListener()
|
|
xGEB = xContext.getValueByName(
|
|
"/singletons/com.sun.star.frame.theGlobalEventBroadcaster")
|
|
xGEB.addDocumentEventListener(xListener)
|
|
xDoc = None
|
|
try:
|
|
xDoc = xDesktop.loadComponentFromURL(url, "_blank", 0, loadProps)
|
|
log("...loadComponentFromURL done")
|
|
if xDoc is None:
|
|
raise Exception("No document loaded?")
|
|
time_ = 0
|
|
while time_ < 30:
|
|
if xListener.layoutFinished:
|
|
return xDoc
|
|
log("delaying...")
|
|
time_ += 1
|
|
time.sleep(1)
|
|
log("timeout: no OnLayoutFinished received")
|
|
return xDoc
|
|
except Exception:
|
|
if xDoc:
|
|
log("CLOSING")
|
|
xDoc.close(True)
|
|
raise
|
|
finally:
|
|
if xListener:
|
|
xGEB.removeDocumentEventListener(xListener)
|
|
|
|
def printDoc(xContext, xDoc, url):
|
|
props = [ mkPropertyValue("FileName", url) ]
|
|
# xDoc.print(props)
|
|
uno.invoke(xDoc, "print", (tuple(props),)) # damn, that's a keyword!
|
|
busy = True
|
|
while busy:
|
|
log("printing...")
|
|
time.sleep(1)
|
|
prt = xDoc.getPrinter()
|
|
for value in prt:
|
|
if value.Name == "IsBusy":
|
|
busy = value.Value
|
|
log("...done printing")
|
|
|
|
class LoadPrintFileTest:
|
|
def __init__(self, file, prtsuffix):
|
|
self.file = file
|
|
self.prtsuffix = prtsuffix
|
|
def run(self, xContext):
|
|
start = datetime.datetime.now()
|
|
log("Time: " + str(start) + " Loading document: " + self.file)
|
|
xDoc = None
|
|
try:
|
|
if os.name == 'nt' and self.file[1] == ':':
|
|
url = "file:///" + self.file[0:2] + quote(self.file[2:])
|
|
else:
|
|
url = "file://" + quote(self.file)
|
|
xDoc = loadFromURL(xContext, url)
|
|
log("loadFromURL in: " + str(datetime.datetime.now() - start))
|
|
printDoc(xContext, xDoc, url + self.prtsuffix)
|
|
finally:
|
|
if xDoc:
|
|
xDoc.close(True)
|
|
end = datetime.datetime.now()
|
|
log("...done with: " + self.file + " in: " + str(end - start))
|
|
|
|
def runLoadPrintFileTests(opts, dirs, suffix, reference):
|
|
if reference:
|
|
prtsuffix = ".pdf.reference"
|
|
else:
|
|
prtsuffix = ".pdf"
|
|
files = getFiles(dirs, suffix)
|
|
tests = (LoadPrintFileTest(file, prtsuffix) for file in files)
|
|
# connection = PersistentConnection(opts)
|
|
connection = PerTestConnection(opts)
|
|
failed = runConnectionTests(connection, simpleInvoke, tests)
|
|
print("all printed: FAILURES: " + str(len(failed)))
|
|
for fail in failed:
|
|
print(fail)
|
|
return failed
|
|
|
|
def mkImages(file, resolution):
|
|
argv = [ "gs", "-r" + resolution, "-sOutputFile=" + file + ".%04d.jpeg",
|
|
"-dNOPROMPT", "-dNOPAUSE", "-dBATCH", "-sDEVICE=jpeg", file ]
|
|
subprocess.check_call(argv)
|
|
|
|
def mkAllImages(dirs, suffix, resolution, reference, failed):
|
|
if reference:
|
|
prtsuffix = ".pdf.reference"
|
|
else:
|
|
prtsuffix = ".pdf"
|
|
for dir in dirs:
|
|
files = filelist(dir, suffix)
|
|
log(files)
|
|
for f in files:
|
|
if f in failed:
|
|
log("Skipping failed: " + f)
|
|
else:
|
|
mkImages(f + prtsuffix, resolution)
|
|
|
|
def identify(imagefile):
|
|
argv = ["identify", "-format", "%k", imagefile]
|
|
process = subprocess.Popen(argv, stdout=subprocess.PIPE)
|
|
result, _ = process.communicate()
|
|
if process.wait() != 0:
|
|
raise Exception("identify failed")
|
|
if result.partition(b"\n")[0] != b"1":
|
|
log("identify result: " + result.decode('utf-8'))
|
|
log("DIFFERENCE in " + imagefile)
|
|
|
|
def compose(refimagefile, imagefile, diffimagefile):
|
|
argv = [ "composite", "-compose", "difference",
|
|
refimagefile, imagefile, diffimagefile ]
|
|
subprocess.check_call(argv)
|
|
|
|
def compareImages(file):
|
|
allimages = [f for f in filelist(os.path.dirname(file), ".jpeg")
|
|
if f.startswith(file)]
|
|
# refimages = [f for f in filelist(os.path.dirname(file), ".jpeg")
|
|
# if f.startswith(file + ".reference")]
|
|
# log("compareImages: allimages:" + str(allimages))
|
|
(refimages, images) = partition(sorted(allimages),
|
|
lambda f: f.startswith(file + ".pdf.reference"))
|
|
# log("compareImages: images" + str(images))
|
|
for (image, refimage) in zip(images, refimages):
|
|
compose(image, refimage, image + ".diff")
|
|
identify(image + ".diff")
|
|
if (len(images) != len(refimages)):
|
|
log("DIFFERENT NUMBER OF IMAGES FOR: " + file)
|
|
|
|
def compareAllImages(dirs, suffix):
|
|
log("compareAllImages...")
|
|
for dir in dirs:
|
|
files = filelist(dir, suffix)
|
|
# log("compareAllImages:" + str(files))
|
|
for f in files:
|
|
compareImages(f)
|
|
log("...compareAllImages done")
|
|
|
|
|
|
def parseArgs(argv):
|
|
(optlist,args) = getopt.getopt(argv[1:], "hr",
|
|
["help", "soffice=", "userdir=", "reference", "valgrind"])
|
|
# print optlist
|
|
return (dict(optlist), args)
|
|
|
|
def usage():
|
|
message = """usage: {program} [option]... [directory]..."
|
|
-h | --help: print usage information
|
|
-r | --reference: generate new reference files (otherwise: compare)
|
|
--soffice=method:location
|
|
specify soffice instance to connect to
|
|
supported methods: 'path', 'connect'
|
|
--userdir=URL specify user installation directory for 'path' method
|
|
--valgrind pass --valgrind to soffice for 'path' method"""
|
|
print(message.format(program = os.path.basename(sys.argv[0])))
|
|
|
|
def checkTools():
|
|
try:
|
|
subprocess.check_output(["gs", "--version"])
|
|
except Exception:
|
|
print("Cannot execute 'gs'. Please install ghostscript.")
|
|
sys.exit(1)
|
|
try:
|
|
subprocess.check_output(["composite", "-version"])
|
|
subprocess.check_output(["identify", "-version"])
|
|
except Exception:
|
|
print("Cannot execute 'composite' or 'identify'.")
|
|
print("Please install ImageMagick.")
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
checkTools()
|
|
(opts,args) = parseArgs(sys.argv)
|
|
if len(args) == 0:
|
|
usage()
|
|
sys.exit(1)
|
|
if "-h" in opts or "--help" in opts:
|
|
usage()
|
|
sys.exit()
|
|
elif "--soffice" in opts:
|
|
reference = "-r" in opts or "--reference" in opts
|
|
failed = runLoadPrintFileTests(opts, args, ".odt", reference)
|
|
mkAllImages(args, ".odt", "200", reference, failed)
|
|
if not(reference):
|
|
compareAllImages(args, ".odt")
|
|
else:
|
|
usage()
|
|
sys.exit(1)
|
|
|
|
# vim: set shiftwidth=4 softtabstop=4 expandtab:
|