Module field_reports.exec_proxy

Expand source code
# -*- coding: utf-8 -*-
from six import raise_from
import subprocess
from field_reports.proxy import *

class ExecProxy(Proxy):
    def __init__(self, exe_path, cwd, loglevel, logout):
        self.exe_path = exe_path
        self.cwd = cwd
        self.loglevel = loglevel
        self.logout = logout

    def version(self):
        try:
            proc = subprocess.Popen(
                [self.exe_path, 'version'],
                cwd=self.cwd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            out, err = proc.communicate()
            self.logout.write(err.decode('utf-8'))
            if proc.returncode != 0:
                raise RuntimeError("Exit Code = {0}".format(proc.returncode))
            return out.decode().rstrip()
        except Exception as exn:
            raise_from(ReportsError(self._exn_message(exn)), exn)

    def render(self, param):
        try:
            proc = subprocess.Popen(
                [self.exe_path, 'render', '-l' + str(self.loglevel), '-', '-'],
                cwd=self.cwd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            out, err = proc.communicate(self.to_jbytes(param))
            self.logout.write(err.decode('utf-8'))
            if proc.returncode != 0:
                raise RuntimeError("Exit Code = {0}".format(proc.returncode))
            return out
        except Exception as exn:
            raise_from(ReportsError(self._exn_message(exn)), exn)

    def parse(self, pdf):
        import json
        try:
            proc = subprocess.Popen(
                [self.exe_path, 'parse', '-'],
                cwd=self.cwd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            out, err = proc.communicate(pdf)
            self.logout.write(err.decode('utf-8'))
            if proc.returncode != 0:
                raise RuntimeError("Exit Code = {0}".format(proc.returncode))
            return json.loads(out.decode('utf-8'))
        except Exception as exn:
            raise_from(ReportsError(self._exn_message(exn)), exn)
    
    def _exn_message(self, exn):
        return "Process terminated abnormally: {0}.".format(exn)

Classes

class ExecProxy (exe_path, cwd, loglevel, logout)
Expand source code
class ExecProxy(Proxy):
    def __init__(self, exe_path, cwd, loglevel, logout):
        self.exe_path = exe_path
        self.cwd = cwd
        self.loglevel = loglevel
        self.logout = logout

    def version(self):
        try:
            proc = subprocess.Popen(
                [self.exe_path, 'version'],
                cwd=self.cwd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            out, err = proc.communicate()
            self.logout.write(err.decode('utf-8'))
            if proc.returncode != 0:
                raise RuntimeError("Exit Code = {0}".format(proc.returncode))
            return out.decode().rstrip()
        except Exception as exn:
            raise_from(ReportsError(self._exn_message(exn)), exn)

    def render(self, param):
        try:
            proc = subprocess.Popen(
                [self.exe_path, 'render', '-l' + str(self.loglevel), '-', '-'],
                cwd=self.cwd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            out, err = proc.communicate(self.to_jbytes(param))
            self.logout.write(err.decode('utf-8'))
            if proc.returncode != 0:
                raise RuntimeError("Exit Code = {0}".format(proc.returncode))
            return out
        except Exception as exn:
            raise_from(ReportsError(self._exn_message(exn)), exn)

    def parse(self, pdf):
        import json
        try:
            proc = subprocess.Popen(
                [self.exe_path, 'parse', '-'],
                cwd=self.cwd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            out, err = proc.communicate(pdf)
            self.logout.write(err.decode('utf-8'))
            if proc.returncode != 0:
                raise RuntimeError("Exit Code = {0}".format(proc.returncode))
            return json.loads(out.decode('utf-8'))
        except Exception as exn:
            raise_from(ReportsError(self._exn_message(exn)), exn)
    
    def _exn_message(self, exn):
        return "Process terminated abnormally: {0}.".format(exn)

Ancestors

Inherited members