office-gobmx/bin/convwatch.py

409 lines
13 KiB
Python

# -*- tab-width: 4; indent-tabs-mode: nil; py-indent-offset: 4 -*-
#
# This file is part of the LibreOffice project.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
import getopt
import os
import subprocess
import sys
import time
import uuid
try:
from urllib.parse import quote
except ImportError:
from urllib import quote
try:
import pyuno
import uno
import unohelper
except ImportError:
print("pyuno not found: try to set PYTHONPATH and URE_BOOTSTRAP variables")
print("PYTHONPATH=/installation/opt/program")
print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
raise
try:
from com.sun.star.document import XDocumentEventListener
except ImportError:
print("UNO API class not found: try to set URE_BOOTSTRAP variable")
print("URE_BOOTSTRAP=file:///installation/opt/program/fundamentalrc")
raise
### utilities ###
def partition(list, pred):
    """Split the elements of an iterable into two lists using a predicate.

    Returns a tuple (matching, rest): elements for which pred(element) is
    truthy are collected in the first list, all remaining elements in the
    second. The relative order of the elements is kept in both lists.
    """
    matching, rest = [], []
    for item in list:
        # pred is evaluated exactly once per element
        bucket = matching if pred(item) else rest
        bucket.append(item)
    return (matching, rest)
def filelist(dir, suffix):
    """Return the regular files directly inside dir that end in suffix.

    dir may be given with or without a trailing "/"; the returned paths are
    always built as dir + "/" + name. suffix is compared against the
    extension reported by os.path.splitext, so it includes the leading dot
    (for example ".odt"). Sub-directories are not descended into.

    Raises Exception if dir is an empty string.
    """
    if not len(dir):
        raise Exception("filelist: empty directory")
    prefix = dir if dir[-1] == "/" else dir + "/"
    matches = []
    for name in os.listdir(prefix):
        path = prefix + name
        if os.path.isfile(path) and os.path.splitext(path)[1] == suffix:
            matches.append(path)
    return matches
def getFiles(dirs, suffix):
    """Collect the files with the given suffix from every directory in dirs.

    The result is one flat list: the filelist() output of each directory,
    concatenated in the order the directories were given.
    """
    collected = []
    for directory in dirs:
        collected.extend(filelist(directory, suffix))
    return collected
### UNO utilities ###
class OfficeConnection:
    """A UNO connection to an soffice process.

    args is the option dictionary; the keys read here are "--soffice"
    (required, of the form "method:location"), "--userdir" (required for the
    'path' method) and "--valgrind" (optional flag).

    With the 'path' method an soffice process is spawned and later shut down
    by tearDown(); with the 'connect' method an already running instance is
    contacted and tearDown() leaves it alone.
    """

    def __init__(self, args):
        self.args = args
        self.soffice = None   # subprocess.Popen of the spawned office, if any
        self.socket = None    # UNO connection string used for connect()
        self.xContext = None  # remote component context once connected

    def setUp(self):
        """Start or locate soffice as requested by --soffice and connect to it.

        Raises Exception if --soffice has no "method:" prefix, names an
        unsupported method, or if the 'path' method is used without a
        --userdir that is a file:// URL.
        """
        (method, sep, rest) = self.args["--soffice"].partition(":")
        if sep != ":":
            raise Exception("soffice parameter does not specify method")
        if method == "path":
            # unique pipe name so that parallel runs do not collide
            socket = "pipe,name=pytest" + str(uuid.uuid1())
            try:
                userdir = self.args["--userdir"]
            except KeyError:
                raise Exception("'path' method requires --userdir")
            if not userdir.startswith("file://"):
                raise Exception("--userdir must be file URL")
            self.soffice = self.bootstrap(rest, userdir, socket)
        elif method == "connect":
            socket = rest
        else:
            raise Exception("unsupported connection method: " + method)
        self.socket = socket
        self.xContext = self.connect(socket)

    def bootstrap(self, soffice, userdir, socket):
        """Spawn a headless soffice accepting URP connections on socket.

        Returns the subprocess.Popen object of the started process.
        """
        argv = [soffice, "--accept=" + socket + ";urp",
                "-env:UserInstallation=" + userdir,
                "--quickstart=no", "--nofirststartwizard",
                "--norestore", "--nologo", "--headless"]
        if "--valgrind" in self.args:
            argv.append("--valgrind")
        return subprocess.Popen(argv)

    def connect(self, socket):
        """Resolve the remote component context, retrying once per second.

        Retries for as long as the resolver raises NoConnectException.
        Raises Exception if the office process spawned by bootstrap() has
        exited in the meantime, since it can then never accept a connection.
        """
        xLocalContext = uno.getComponentContext()
        xUnoResolver = xLocalContext.ServiceManager.createInstanceWithContext(
            "com.sun.star.bridge.UnoUrlResolver", xLocalContext)
        url = "uno:" + socket + ";urp;StarOffice.ComponentContext"
        print("OfficeConnection: connecting to: " + url)
        noConnect = pyuno.getClass(
            "com.sun.star.connection.NoConnectException")
        while True:
            # Same assumption as tearDown(): the Popen'ed process lives as
            # long as the office does, so a finished process means failure.
            if self.soffice and self.soffice.poll() is not None:
                raise Exception("soffice has stopped.")
            try:
                return xUnoResolver.resolve(url)
            except noConnect:
                print("NoConnectException: sleeping...")
                time.sleep(1)

    def tearDown(self):
        """Shut down the office spawned by setUp(), if there is one.

        Does nothing when no process was spawned ('connect' method).
        Raises Exception if the office process exits with a non-zero status.
        """
        if not self.soffice:
            return
        if self.xContext:
            try:
                print("tearDown: calling terminate()...")
                xMgr = self.xContext.ServiceManager
                xDesktop = xMgr.createInstanceWithContext(
                    "com.sun.star.frame.Desktop", self.xContext)
                xDesktop.terminate()
                print("...done")
            except pyuno.getClass(
                    "com.sun.star.beans.UnknownPropertyException"):
                print("caught UnknownPropertyException")
                # ignore, also means disposed
            except pyuno.getClass("com.sun.star.lang.DisposedException"):
                print("caught DisposedException")
                # ignore
        else:
            # never connected: no way to ask politely, so signal the process
            self.soffice.terminate()
        ret = self.soffice.wait()
        self.xContext = None
        self.socket = None
        self.soffice = None
        if ret != 0:
            raise Exception("Exit status indicates failure: " + str(ret))
class PerTestConnection:
    """Connection policy that opens a fresh OfficeConnection for every test.

    The connection is created in preTest() and torn down in postTest();
    setUp() and tearDown() only check that no connection is left over.
    """

    def __init__(self, args):
        self.connection = None
        self.args = args

    def getContext(self):
        """Return the component context of the current connection."""
        return self.connection.xContext

    def setUp(self):
        assert not self.connection

    def preTest(self):
        """Create and set up the connection used by the next test."""
        fresh = OfficeConnection(self.args)
        fresh.setUp()
        # only remember the connection once it has been set up successfully
        self.connection = fresh

    def postTest(self):
        """Tear down the current connection, if any, and forget it."""
        current = self.connection
        if not current:
            return
        try:
            current.tearDown()
        finally:
            self.connection = None

    def tearDown(self):
        assert not self.connection
class PersistentConnection:
    """Connection policy that shares one OfficeConnection across all tests.

    The connection is created in setUp() and torn down in tearDown();
    preTest() and postTest() only check that it exists.
    """

    def __init__(self, args):
        self.connection = None
        self.args = args

    def getContext(self):
        """Return the component context of the shared connection."""
        return self.connection.xContext

    def setUp(self):
        """Create and set up the connection shared by all tests."""
        office = OfficeConnection(self.args)
        office.setUp()
        # only remember the connection once it has been set up successfully
        self.connection = office

    def preTest(self):
        assert self.connection

    def postTest(self):
        assert self.connection

    def tearDown(self):
        """Tear down the shared connection, if any, and forget it."""
        office = self.connection
        if not office:
            return
        try:
            office.tearDown()
        finally:
            self.connection = None
def simpleInvoke(connection, test):
    """Run a single test once, bracketed by the connection's per-test hooks.

    connection.postTest() is called even if preTest() or the test itself
    raises; the exception then propagates to the caller.
    """
    try:
        connection.preTest()
        context = connection.getContext()
        test.run(context)
    finally:
        connection.postTest()
def retryInvoke(connection, test):
    """Run a single test, retrying up to five times if it raises.

    Every attempt is bracketed by connection.preTest() / postTest().
    Returns as soon as one attempt succeeds. Raises Exception("FAILED
    retryInvoke") when all attempts have failed.

    Only Exception subclasses trigger a retry; KeyboardInterrupt and
    SystemExit propagate immediately so that Ctrl+C and deliberate exits
    keep working.
    """
    maxTries = 5
    for _ in range(maxTries):
        try:
            try:
                connection.preTest()
                test.run(connection.getContext())
                return
            finally:
                connection.postTest()
        except Exception as e:
            # report what went wrong instead of silently retrying
            print("retryInvoke: caught exception: " + str(e))
    raise Exception("FAILED retryInvoke")
def runConnectionTests(connection, invoker, tests):
    """Run every test through invoker, inside one connection life cycle.

    connection.setUp() is called first and connection.tearDown() is always
    called last, also when setUp() or one of the tests raises. invoker is
    called as invoker(connection, test) for each test in order; the first
    exception aborts the remaining tests.
    """
    try:
        connection.setUp()
        for current in tests:
            invoker(connection, current)
    finally:
        connection.tearDown()
class EventListener(XDocumentEventListener, unohelper.Base):
    """Document event listener that records when layout has finished.

    layoutFinished starts as False and becomes True once an event named
    "OnLayoutFinished" has been received.
    """

    def __init__(self):
        self.layoutFinished = False

    def documentEventOccured(self, event):
        # the method name (including its spelling) is what the listener
        # interface calls; do not "correct" it
        if event.EventName == "OnLayoutFinished":
            self.layoutFinished = True

    def disposing(self, event):
        # nothing to release; "self" is required so that the call made on
        # the listener instance does not fail with a TypeError
        pass
def mkPropertyValue(name, value):
    """Build a com.sun.star.beans.PropertyValue struct for name and value.

    The two remaining constructor arguments are passed as 0.
    """
    structName = "com.sun.star.beans.PropertyValue"
    # presumably the Handle and State members of the struct -- confirm
    # against the PropertyValue IDL
    handle = 0
    state = 0
    return uno.createUnoStruct(structName, name, handle, value, state)
### tests ###
def loadFromURL(xContext, url):
    """Load the document at url hidden and read-only; wait for its layout.

    After loading, waits up to 30 seconds (polling once per second) for an
    "OnLayoutFinished" document event. The document is returned either way;
    a timeout is only reported on stdout.

    Raises Exception if no document was loaded. If anything fails after the
    document has been loaded, the document is closed before the exception
    propagates.
    """
    xDesktop = xContext.ServiceManager.createInstanceWithContext(
        "com.sun.star.frame.Desktop", xContext)
    props = [("Hidden", True), ("ReadOnly", True)]  # FilterName?
    loadProps = tuple([mkPropertyValue(name, value)
                       for (name, value) in props])
    xListener = EventListener()
    xGEB = xContext.ServiceManager.createInstanceWithContext(
        "com.sun.star.frame.GlobalEventBroadcaster", xContext)
    # register before loading so that the layout event cannot be missed
    xGEB.addDocumentEventListener(xListener)
    # must be bound before the try block: the handler below inspects it even
    # when loadComponentFromURL itself raises
    xDoc = None
    try:
        xDoc = xDesktop.loadComponentFromURL(url, "_blank", 0, loadProps)
        if xDoc is None:
            # fail right away instead of waiting for an event that cannot
            # arrive
            raise Exception("No document loaded?")
        time_ = 0
        while time_ < 30:
            if xListener.layoutFinished:
                return xDoc
            print("delaying...")
            time_ += 1
            time.sleep(1)
        print("timeout: no OnLayoutFinished received")
        return xDoc
    except BaseException:
        # cleanup only; the original exception is always re-raised
        if xDoc:
            print("CLOSING")
            xDoc.close(True)
        raise
    finally:
        xGEB.removeDocumentEventListener(xListener)
def printDoc(xContext, xDoc, url):
    """Print xDoc to the file named by url and wait until printing is done.

    Polls the document's printer properties once per second and returns when
    the "IsBusy" property is no longer truthy. xContext is not used.
    """
    printArgs = (mkPropertyValue("FileName", url),)
    # "print" cannot be called as xDoc.print(...), hence the generic invoke
    uno.invoke(xDoc, "print", (printArgs,))
    stillBusy = True
    while stillBusy:
        print("printing...")
        time.sleep(1)
        for prop in xDoc.getPrinter():
            if prop.Name == "IsBusy":
                stillBusy = prop.Value
    print("...done printing")
class LoadPrintFileTest:
    """Test that loads one document and prints it to a file next to it.

    file is the path of the document; prtsuffix is appended to the
    document's URL to form the name of the print output.
    """

    def __init__(self, file, prtsuffix):
        self.file = file
        self.prtsuffix = prtsuffix

    def run(self, xContext):
        """Load self.file, print it, and always close the document again."""
        print("Loading document: " + self.file)
        # must be bound before the try block: the finally clause inspects it
        # even when loading fails, and must not mask the real error
        xDoc = None
        try:
            path = self.file
            if not os.path.isabs(path):
                # "file://" + relative path would turn the first path
                # component into the host part of the URL
                path = os.path.join(os.getcwd(), path)
            url = "file://" + quote(path)
            xDoc = loadFromURL(xContext, url)
            printDoc(xContext, xDoc, url + self.prtsuffix)
        finally:
            if xDoc:
                xDoc.close(True)
        print("...done with: " + self.file)
def runLoadPrintFileTests(opts, dirs, suffix, reference):
    """Load and print every file with the given suffix found in dirs.

    The print output of each file gets the suffix ".pdf.reference" when
    reference is truthy and ".pdf" otherwise. All files are processed over
    one persistent office connection configured by opts.
    """
    prtsuffix = ".pdf.reference" if reference else ".pdf"
    documents = getFiles(dirs, suffix)
    tests = (LoadPrintFileTest(path, prtsuffix) for path in documents)
    # a PerTestConnection(opts) could be used here instead
    connection = PersistentConnection(opts)
    runConnectionTests(connection, simpleInvoke, tests)
def mkImages(file, resolution):
    """Render file to JPEG images with ghostscript ("gs").

    The images are written to file + ".NNNN.jpeg" (a 4-digit counter);
    resolution is passed to gs as its "-r" value and must be a string.
    Raises subprocess.CalledProcessError if gs exits with a non-zero status.
    """
    outputPattern = file + ".%04d.jpeg"
    command = ["gs", "-r" + resolution, "-sOutputFile=" + outputPattern]
    command += ["-dNOPROMPT", "-dNOPAUSE", "-dBATCH", "-sDEVICE=jpeg", file]
    subprocess.check_call(command)
def mkAllImages(dirs, suffix, resolution, reference):
    """Render the print output of every matching file in dirs to images.

    For each file with the given suffix, the file named file + ".pdf"
    (or file + ".pdf.reference" when reference is truthy) is passed to
    mkImages() together with resolution.
    """
    prtsuffix = ".pdf.reference" if reference else ".pdf"
    for directory in dirs:
        found = filelist(directory, suffix)
        print(found)
        for path in found:
            mkImages(path + prtsuffix, resolution)
def identify(imagefile):
    """Report on stdout if the difference image is not uniform.

    Runs ImageMagick's "identify -format %k" on imagefile and treats any
    first output line other than "1" as a difference.

    Raises Exception if identify exits with a non-zero status.
    """
    argv = ["identify", "-format", "%k", imagefile]
    process = subprocess.Popen(argv, stdout=subprocess.PIPE)
    result, _ = process.communicate()
    # communicate() has already waited for the process to finish
    if process.returncode != 0:
        raise Exception("identify failed")
    if result.partition(b"\n")[0] != b"1":
        # result is bytes; decode it so that the concatenation also works on
        # Python 3 instead of raising TypeError
        print("identify result: " + result.decode("utf-8", "replace"))
        print("DIFFERENCE in " + imagefile)
def compose(refimagefile, imagefile, diffimagefile):
    """Write the difference of two images to diffimagefile.

    Runs ImageMagick's "composite -compose difference" on the two input
    files. Raises subprocess.CalledProcessError if composite exits with a
    non-zero status.
    """
    command = ["composite", "-compose", "difference"]
    command += [refimagefile, imagefile, diffimagefile]
    subprocess.check_call(command)
def compareImages(file):
    """Compare the rendered images of file with its reference images.

    Looks at the ".jpeg" files in the directory of file whose path starts
    with file. Those starting with file + ".pdf.reference" are the reference
    images, the others the current images. Both groups are sorted and paired
    up in order; for each pair a difference image (current image name +
    ".diff") is composed and checked. A mismatch in the number of images is
    reported on stdout.
    """
    directory = os.path.dirname(file)
    candidates = sorted(
        f for f in filelist(directory, ".jpeg") if f.startswith(file))
    refPrefix = file + ".pdf.reference"
    (refimages, images) = partition(
        candidates, lambda f: f.startswith(refPrefix))
    for (image, refimage) in zip(images, refimages):
        diffimage = image + ".diff"
        # NOTE(review): the current image is passed in compose()'s
        # "refimagefile" position and the reference in "imagefile" -- confirm
        # that this order is intended
        compose(image, refimage, diffimage)
        identify(diffimage)
    if len(images) != len(refimages):
        print("DIFFERENT NUMBER OF IMAGES FOR: " + file)
def compareAllImages(dirs, suffix):
    """Run compareImages() on every file with the given suffix in dirs."""
    print("compareAllImages...")
    for directory in dirs:
        for path in filelist(directory, suffix):
            compareImages(path)
    print("...compareAllImages done")
def parseArgs(argv):
    """Parse the command line given as a full argv list.

    argv[0] (the program name) is skipped. Returns a tuple (options, args):
    options maps each option exactly as written on the command line (for
    example "-r" or "--soffice") to its value, which is "" for flags; args
    is the list of remaining positional arguments.

    Raises getopt.GetoptError for unknown options or missing option values.
    """
    shortOptions = "hr"
    longOptions = ["help", "soffice=", "userdir=", "reference", "valgrind"]
    optlist, positional = getopt.getopt(argv[1:], shortOptions, longOptions)
    options = {}
    for flag, value in optlist:
        options[flag] = value
    return (options, positional)
def usage():
    """Print the command line help to stdout.

    The program name shown is the base name of sys.argv[0].
    """
    # the first line used to end in a stray double quote
    message = """usage: {program} [option]... [directory]...
-h | --help: print usage information
-r | --reference: generate new reference files (otherwise: compare)
--soffice=method:location
specify soffice instance to connect to
supported methods: 'path', 'connect'
--userdir=URL specify user installation directory for 'path' method
--valgrind pass --valgrind to soffice for 'path' method"""
    print(message.format(program = os.path.basename(sys.argv[0])))
def checkTools():
    """Exit with status 1 unless the required external tools can be run.

    Checks ghostscript ("gs") and ImageMagick ("composite" and "identify")
    by asking each for its version. A tool counts as missing when it cannot
    be started or exits with a non-zero status.
    """
    # Only failures to run the tool are handled here; Ctrl+C and other
    # unrelated exceptions must not be reported as "tool missing".
    toolErrors = (OSError, subprocess.CalledProcessError)
    try:
        subprocess.check_output(["gs", "--version"])
    except toolErrors:
        print("Cannot execute 'gs'. Please install ghostscript.")
        sys.exit(1)
    try:
        subprocess.check_output(["composite", "-version"])
        subprocess.check_output(["identify", "-version"])
    except toolErrors:
        print("Cannot execute 'composite' or 'identify'.")
        print("Please install ImageMagick.")
        sys.exit(1)
if __name__ == "__main__":
    # Script entry point: print the PDFs, render them to images and, unless
    # new references are being generated, compare against the references.
    # checkTools()
    try:
        (opts, args) = parseArgs(sys.argv)
    except getopt.GetoptError as e:
        # report bad options with the usage text instead of a traceback
        print(str(e))
        usage()
        sys.exit(1)
    # asking for help is not an error, even without any directory argument
    if "-h" in opts or "--help" in opts:
        usage()
        sys.exit()
    if len(args) == 0 or "--soffice" not in opts:
        usage()
        sys.exit(1)
    reference = "-r" in opts or "--reference" in opts
    runLoadPrintFileTests(opts, args, ".odt", reference)
    mkAllImages(args, ".odt", "200", reference)
    if not reference:
        compareAllImages(args, ".odt")
# vim:set shiftwidth=4 softtabstop=4 expandtab: