Source code for firexapp.testing.config_interpreter

import sys
import time
import os
import inspect
import subprocess
from datetime import datetime
from typing import Optional, List
import tempfile

from firexapp.common import wait_until
from firexapp.reporters.json_reporter import load_completion_report
from firexkit.resources import get_cloud_ci_install_config_path
from firexapp.submit.submit import get_firex_id_from_output, get_log_dir_from_output
from firexapp.submit.tracking_service import has_flame
from firexapp.submit.install_configs import load_new_install_configs, INSTALL_CONFIGS_ENV_NAME
from firexapp.testing.config_base import InterceptFlowTestConfiguration, FlowTestConfiguration


[docs] class ConfigInterpreter: execution_directory = None tmp_json_file: Optional[str] def __init__(self): self.profile = False self.coverage = False self.is_public = False self.tmp_json_file = None
[docs] @staticmethod def is_submit_command(test_config: FlowTestConfiguration): cmd = test_config.initial_firex_options() if not cmd: return False if cmd[0] in "submit": return True return cmd[0] not in ["list", "info"]
[docs] @staticmethod def is_instance_of_intercept(test_config: FlowTestConfiguration): base_classes = [cls.__name__ for cls in inspect.getmro(type(test_config))[1:]] return InterceptFlowTestConfiguration.__name__ in base_classes
[docs] @staticmethod def create_mock_file(results_folder, results_file, test_name, intercept_microservice): mock_file_name = test_name + "_mock.py" mock_file = os.path.join(results_folder, mock_file_name) intercept_task = """ import os from celery import current_app as app from firexkit.task import FireXTask # noinspection PyPep8Naming @app.task(base=FireXTask) def {0}(**kwargs): local_stuff = locals() local_stuff.update(kwargs) import pickle def str_default(o): return str(o) from helper import str2file file_path = "{1}" dir_path = os.path.dirname(file_path) if not os.path.isdir(dir_path): os.makedirs(dir_path) with open(file_path, "wb") as f: pickle.dump(local_stuff, f) if not os.path.isfile(file_path): raise Exception("Could not create result files") """ content = intercept_task.format(intercept_microservice, results_file) if not os.path.isdir(results_folder): os.mkdir(results_folder) with open(mock_file, "w") as f: f.write(content) return mock_file
[docs] def document_viewer(self, file_path: str)->str: return file_path
[docs] @staticmethod def get_intercept_results_file(flow_test_config): return os.path.join(flow_test_config.results_folder, flow_test_config.name + ".results")
[docs] @staticmethod def get_test_name(flow_test_config): return flow_test_config.__class__.__name__
[docs] def run_integration_test(self, flow_test_config, results_folder): # provide sub-folder for testsuite data flow_test_config.results_folder = os.path.join(results_folder, flow_test_config.name) os.makedirs(flow_test_config.results_folder) flow_test_config.std_out = os.path.join(flow_test_config.results_folder, flow_test_config.name + ".stdout.txt") flow_test_config.std_err = os.path.join(flow_test_config.results_folder, flow_test_config.name + ".stderr.txt") cmd = self.create_cmd(flow_test_config) self.run_executable(cmd, flow_test_config)
[docs] def create_cmd(self, flow_test_config) -> List[str]: # assemble options, adding/consolidating --external and --sync cmd = self.get_exe(flow_test_config) cmd += flow_test_config.initial_firex_options() plugins = self.collect_plugins(flow_test_config) if plugins: cmd += ["--plugins", ",".join(plugins)] submit_test = self.is_submit_command(flow_test_config) if submit_test: flow_test_config.logs_link = os.path.join(flow_test_config.results_folder, flow_test_config.name + ".logs") cmd += ["--logs_link", flow_test_config.logs_link] if getattr(flow_test_config, "sync", True): cmd += ["--sync"] if has_flame() and getattr(flow_test_config, "flame_terminate_on_complete", True): cmd += ["--flame_terminate_on_complete"] if (self.is_public # only use public CI install config for is_public runs and not getattr(flow_test_config, 'no_install_config', False) # Some tests supply install configs (via CLI or ENV) for legitimate testing purposes. and '--install_configs' not in cmd and INSTALL_CONFIGS_ENV_NAME not in os.environ): # TODO: should merge test-specific install_configs with ci-viewer configs, # since we usually want the ci URLs, even with a test's install_config specifies other stuff. cmd += ['--install_configs', get_cloud_ci_install_config_path()] if '--json_file' not in cmd: self.tmp_json_file = tempfile.mktemp(prefix='firex_json_file_') cmd += ['--json_file', self.tmp_json_file] return cmd
[docs] def get_exe(self, flow_test_config) -> List[str]: import firexapp if self.coverage and not hasattr(flow_test_config, 'no_coverage'): return ["coverage", "run", "--branch", "--append", "-m", firexapp.__name__] if self.profile: base_dir = os.path.dirname(firexapp.__file__) exe_file = os.path.join(base_dir, "__main__.py") return ["python", "-m", "cProfile", "-s", "cumtime", exe_file] return ["python", "-m", firexapp.__name__]
[docs] def collect_plugins(self, flow_test_config)->[]: # add test file and dynamically generated files to --plugins test_src_file = inspect.getfile(flow_test_config.__class__) plugins = [ os.path.realpath(test_src_file), # must be first to be imported ] submit_test = self.is_submit_command(flow_test_config) if submit_test: if self.coverage and not hasattr(flow_test_config, 'no_coverage'): # add the coverage plugin to restart celery in coverage mode from firexapp.testing import coverage_plugin plugins.append(coverage_plugin.__file__) if self.is_instance_of_intercept(flow_test_config): # create file containing mock and capture microservices intercept = flow_test_config.intercept_service() if intercept: intercept_results_file = self.get_intercept_results_file(flow_test_config) mock_file = self.create_mock_file(flow_test_config.results_folder, intercept_results_file, flow_test_config.name, intercept) plugins.append(mock_file) # test and use add_plugins=False to explicitly prevent dynamic plugins being added to the test extras = getattr(flow_test_config, "add_plugins", []) if extras: plugins += extras if extras is False: plugins = [] return plugins
[docs] def run_executable(self, cmd, flow_test_config): # print useful links test_src_file = inspect.getfile(flow_test_config.__class__) if hasattr(flow_test_config, 'logs_link'): print("\tLogs:", self.document_viewer(flow_test_config.logs_link)) print("\tTest source:", self.document_viewer(test_src_file)) print("\tStdout:", self.document_viewer(flow_test_config.std_out)) print("\tStderr:", self.document_viewer(flow_test_config.std_err)) elapsed_time = None verification_time = None # run firex try: with open(flow_test_config.std_out, 'w') as std_out_f, open(flow_test_config.std_err, 'w') as std_err_f: start_time = time.monotonic() process = subprocess.Popen(cmd, stdout=std_out_f, stderr=std_err_f, universal_newlines=True, shell=False, cwd=self.execution_directory) _, _ = process.communicate(timeout=getattr(flow_test_config, "timeout", None)) elapsed_time = time.monotonic() - start_time verification_start_time = time.monotonic() # check for expected return code expected_return = flow_test_config.assert_expected_return_code(process.returncode) if expected_return is not None: raise Exception("assert_expected_return_code should not return. It should assert if needed") if self.tmp_json_file and os.path.isfile(self.tmp_json_file): flow_test_config.run_data = load_completion_report(self.tmp_json_file) if self.is_submit_command(flow_test_config) and process.returncode == 0 and \ self.is_instance_of_intercept(flow_test_config) and \ flow_test_config.intercept_service(): # retrieve captured kwargs and validate them intercept_results_file = self.get_intercept_results_file(flow_test_config) if not os.path.isfile(intercept_results_file): raise FileNotFoundError(intercept_results_file + " was not found. Could not retrieve results.") import pickle with open(intercept_results_file, 'rb') as results_file_f: captured_options = pickle.load(results_file_f) flow_test_config.assert_expected_options(captured_options) with open(flow_test_config.std_out, 'r') as std_out_f, open(flow_test_config.std_err, 'r') as std_err_f: errors = std_err_f.read().split("\n") errors = [line for line in errors if line and not line.startswith("pydev debugger:")] flow_test_config.assert_expected_firex_output(std_out_f.read(), "\n".join(errors)) verification_time = time.monotonic() - verification_start_time except (subprocess.TimeoutExpired, KeyboardInterrupt) as e: elapsed_time = getattr(e, 'timeout', None) print("\t%s! Current wall time: %s" % (type(e).__name__, datetime.now().strftime('%c')), file=sys.stderr) verification_start_time = time.monotonic() self.cleanup_after_timeout(flow_test_config.std_out, flow_test_config.std_err) verification_time = time.monotonic() - verification_start_time raise except Exception as e: print('\tException: {}: {}'.format(type(e).__name__, e), file=sys.stderr) raise finally: self.on_test_exit(flow_test_config.std_out, flow_test_config.std_err) try: flow_test_config.cleanup() except Exception as cleanup_e: print(f'Exception during flow test cleanup: {cleanup_e}') if self.tmp_json_file and os.path.exists(self.tmp_json_file): os.unlink(self.tmp_json_file) # report on time if elapsed_time is not None: msg = "\tTime %.1fs" % elapsed_time if verification_time is not None: overhead = " +(%.1fs)" % verification_time if overhead != " +(0.0s)": msg += overhead print(msg)
[docs] def cleanup_after_timeout(self, std_out, std_err): pass
# noinspection PyMethodMayBeStatic,PyUnusedLocal
[docs] def on_test_exit(self, std_out, std_err): # Try to print to log directory to help with debugging # noinspection PyBroadException try: with open(std_out, 'r') as std_out_f: std_out_content = std_out_f.read() firex_id = get_firex_id_from_output(std_out_content) if firex_id: print("\tFireX ID: " + firex_id) if self.is_public: install_configs = load_new_install_configs(firex_id, get_log_dir_from_output(std_out_content), get_cloud_ci_install_config_path()) print(f'\tLogs URL: {install_configs.get_logs_root_url()}') print(f"\tFlame: {install_configs.run_url}") except Exception as e: print(e) pass