API Reference

class firexapp.application.ExitSignalHandler(app: FireXBaseApp)[source]

Bases: object

first_warning = '\nExiting due to signal %s'
last_warning = "\nFINE! We'll stop. But you might have leaked a celery instance or a broker instance."
second_warning = '\nWe know! Have a little patience for crying out loud!'
class firexapp.application.FireXBaseApp(submit_app=None, info_app=None)[source]

Bases: object

create_arg_parser(description=None) ArgumentParser[source]
main_error_exit_handler(reason=None, run_revoked=False)[source]
run(sys_argv=None)[source]
exception firexapp.application.JsonContentNotList[source]

Bases: Exception

firexapp.application.get_app_task(task_short_name: str, all_tasks=None)[source]
firexapp.application.get_app_tasks(tasks, all_tasks=None)[source]
firexapp.application.get_args_from_json(json_file: str) List[str][source]
firexapp.application.get_args_from_json_from_all_args(all_args: List[str]) List[str][source]
firexapp.application.import_microservices(plugins_files=None, imports: tuple | None = None) [][source]
firexapp.application.main()[source]
class firexapp.celery_manager.CeleryManager(plugins=None, logs_dir=None, worker_log_level='debug', cap_concurrency=None, app='firexapp.engine', env=None, broker=None)[source]

Bases: object

static cap_cpu_count(count, cap_concurrency)[source]
property celery_bin
celery_bin_name = 'celery'
property celery_logs_dir
property celery_pids_dir
extract_errors_from_celery_logs(celery_log_file, max_errors=20)[source]
find_all_procs()[source]
static find_procs(pid_file)[source]
static get_celery_logs_dir(logs_dir)[source]
static get_celery_pids_dir(logs_dir)[source]
classmethod get_pid(logs_dir, workername, hostname='runner--azerasq-project-33192579-concurrent-0')[source]
classmethod get_pid_file(logs_dir, workername, hostname='runner--azerasq-project-33192579-concurrent-0')[source]
classmethod get_pid_from_file(pid_file)[source]
static get_plugins_env(plugins)[source]
static get_worker_and_host(workername, hostname)[source]
classmethod get_worker_log_file(logs_dir, worker_and_host)[source]
static get_worker_logs_dir(logs_dir)[source]
classmethod get_worker_pids(logs_dir, hostname, workernames)[source]
kill_all_forked(pid_file)[source]
classmethod log(msg, header=None, level=10)[source]
shutdown(timeout=60)[source]
start(workername, queues=None, wait=True, timeout=900, concurrency=None, worker_log_level=None, app=None, cap_concurrency=None, cwd=None, soft_time_limit=None, autoscale: tuple | None = None, detach: bool = True)[source]
classmethod terminate(pid, timeout=60)[source]
update_env(env)[source]
wait_for_shutdown(timeout=15)[source]
wait_until_active(pid_file, stdout_file, workername, timeout=900)[source]
property workers_logs_dir
exception firexapp.celery_manager.CeleryWorkerStartFailed[source]

Bases: Exception

firexapp.common.delimit2list(str_to_split, delimiters=(',', ';', '|', ' ')) [][source]
firexapp.common.dict2str(mydict, sort=False, sep='    ', usevrepr=True, line_prefix='')[source]
firexapp.common.find(keys, input_dict)[source]
firexapp.common.find_procs(name, cmdline_regex=None, cmdline_contains=None)[source]
firexapp.common.get_available_port(so_reuseport=True)[source]
firexapp.common.poll_until_dir_empty(dir_path, timeout=15)[source]
firexapp.common.poll_until_existing_file_not_empty(file_path, timeout=10)[source]
firexapp.common.poll_until_file_exist(file_path, timeout=10)[source]
firexapp.common.poll_until_file_not_empty(file_path, timeout=10)[source]
firexapp.common.poll_until_path_exist(path, timeout=10)[source]
firexapp.common.proc_matches(proc_info, pname, cmdline_regex, cmdline_contains)[source]
firexapp.common.qualify_firex_bin(bin_name)[source]
firexapp.common.render_template(template_str, template_args)[source]
firexapp.common.select_env_vars(env_names)[source]
firexapp.common.silent_mkdir(path, exist_ok=True, **kwargs)[source]
firexapp.common.wait_until(predicate, timeout, sleep_for, *args, **kwargs)[source]
firexapp.common.wait_until_pid_not_exist(pid, timeout=7, sleep_for=1)[source]
class firexapp.discovery.PkgVersionInfo(pkg=None, version=None, commit=None)[source]

Bases: PkgVersionInfo

firexapp.discovery.discover_package_modules(current_path, root_path=None) [<class 'str'>][source]
firexapp.discovery.find_firex_task_bundles() [<class 'str'>][source]
firexapp.discovery.get_all_pkg_versions() [<class 'firexapp.discovery.PkgVersionInfo'>][source]
firexapp.discovery.get_all_pkg_versions_as_dict() dict[source]
firexapp.discovery.get_all_pkg_versions_str() str[source]
firexapp.discovery.get_firex_dependant_package_versions() [<class 'firexapp.discovery.PkgVersionInfo'>][source]
firexapp.discovery.get_firex_tracking_services_entry_points() [<class 'entrypoints.EntryPoint'>][source]
firexapp.discovery.loaded_firex_bundles_entry_points(path=None) Dict[EntryPoint, object][source]
firexapp.discovery.loaded_firex_core_entry_points(path=None) Dict[EntryPoint, object][source]
firexapp.discovery.loaded_firex_entry_points(path=None)[source]
firexapp.discovery.prune_duplicate_module_entry_points(entry_points) [<class 'entrypoints.EntryPoint'>][source]
class firexapp.fileregistry.FileRegistry(*args, **kwargs)[source]

Bases: object

classmethod destroy()[source]
dump_to_file(path)[source]
get_file(key, uid_or_logsdir)[source]
get_relative_path(key)[source]
static read_from_file(path)[source]
register_file(key, relative_path)[source]
static resolve_path(uid_or_logsdir, relative_path)[source]
exception firexapp.fileregistry.KeyAlreadyRegistered[source]

Bases: Exception

exception firexapp.fileregistry.KeyNotRegistered[source]

Bases: Exception

class firexapp.fileregistry.Singleton[source]

Bases: type

class firexapp.info.InfoBaseApp[source]

Bases: object

create_info_sub_parser(sub_parser)[source]
create_list_sub_parser(sub_parser)[source]
create_version_sub_parser(sub_parser)[source]
classmethod parse_task_docstring(task)[source]
static print_argument_used(plugins: str)[source]
static print_available_microservices(plugins: str)[source]
print_details(entity, plugins, all_tasks=None)[source]
print_partial_task_matches(entity, all_tasks)[source]
classmethod print_task_details(task)[source]
run_info(args)[source]
run_list(args)[source]
static version(_args)[source]
firexapp.info.get_argument_use(all_tasks) dict[source]
class firexapp.plugins.CommaDelimitedListAction(option_strings, dest, nargs=None, **kwargs)[source]

Bases: Action

exception firexapp.plugins.PluginLoadError[source]

Bases: Exception

firexapp.plugins.cdl2list(plugin_files)[source]
firexapp.plugins.create_replacement_task(original, name_postfix, sigs)[source]
firexapp.plugins.find_plugin_file(file_path)[source]
firexapp.plugins.get_active_plugins()[source]
firexapp.plugins.get_plugin_module_name(plugin_file)[source]
firexapp.plugins.get_plugin_module_names(plugin_files)[source]
firexapp.plugins.get_plugin_module_names_from_env()[source]
firexapp.plugins.get_short_name(long_name: str) str[source]
firexapp.plugins.identify_duplicate_tasks(all_tasks, priority_modules: list) [[]][source]

Returns a list of substitution. Each substitution is a list of microservices. The last will be the ‘dominant’ one. It will be the one used.

firexapp.plugins.import_plugin_file(plugin_file)[source]
firexapp.plugins.import_plugin_files(plugin_files) set[str][source]
firexapp.plugins.load_plugin_modules(plugin_files)[source]
firexapp.plugins.load_plugin_modules_from_env()[source]
firexapp.plugins.merge_plugins(*plugin_lists) [][source]

Merge comma delimited lists of plugins into a single list. Right-handed most significant plugin

firexapp.plugins.set_plugins_env(plugin_files)[source]
firexapp.engine.celery.add_items_to_conf(conf=None, **_kwargs)[source]
exception firexapp.submit.arguments.ChainArgException[source]

Bases: Exception

class firexapp.submit.arguments.InputConverter[source]

Bases: object

This class uses a singleton object design to store converters which parse the cli arguments. Converter functions are stored into the singleton InputConverter object by adding the @register decorator to the top of each desired function.

classmethod convert(pre_load=None, **kwargs) dict[source]

Activates conversion. kwargs provided are passed to any registered converter. This function should be called twice, and only twice. Once with initially loaded converters, and then with the secondary ones.

Parameters:

pre_load – Used for testing. preload is defaulted to None and will auto populate

classmethod instance() ConverterRegister[source]

Used for unit testing only

pre_load_was_run = False
classmethod register(*args)[source]

Registers a callable object to be run during conversion. The callable should take in kwargs, and return a dict with any changes to the input arguments, or None if no changes are necessary.

Example single argument converter:

@InputConverter.register @SingleArgDecorator(‘something’) def convert_something(arg_value):

arg_value = arg_value.upper() return arg_value

Optionally, dependencies can defined at registration:

@InputConverter.register(‘other_converter’, ‘and_another_converter’) @SingleArgDecorator(‘something’) def convert_something(arg_value):

arg_value = arg_value.upper() return arg_value

Conversion occurs on two occasions, before microservices are loaded, or after. You can explicitly mark a converter to run pre-loading or post-loading of the ALL microservices by passing True (pre) or False (post) during registration. This design is used in the spirit of failing fast, providing early failure of runs before the bulk of microservices are imported. If bool is not provided, it will register to run pre unless loading has already occurred.

@InputConverter.register(‘other_converter’, False) @SingleArgDecorator(‘something’) def convert_something(arg_value):

… return arg_value

When a conversion fails the given function can simply call raise to instruct the user how to correct their inputs.

firexapp.submit.arguments.auto_load_pydev_debugging_plugin(kwargs)[source]
firexapp.submit.arguments.convert_booleans(kwargs)[source]

Converts standard true/false/none values to bools and None

firexapp.submit.arguments.find_unused_arguments(chain_args: {}, ignore_list: [], all_tasks: [])[source]

Function to detect any arguments that are not explicitly consumed by any microservice.

Note:

This should be run AFTER all microservices have been loaded.

Parameters:
  • chain_args (dict) – The dictionary of chain args to check

  • ignore_list (list) – A list of exception arguments that are acceptable. This usually includes application args.

  • all_tasks – A list of all microservices. Usually app.tasks

Returns:

A dictionary of un-applicable arguments

firexapp.submit.arguments.get_chain_args(other_args: [])[source]

This function converts a flat list of –key value pairs into a dictionary

firexapp.submit.arguments.whitelist_arguments(argument_list: str | list)[source]

Function for adding argument keys to the global argument whitelist. Used during validation of input arguments

:param argument_list:List of argument keys to whitelist. :type argument_list: list

class firexapp.submit.console.ChainInterruptedExceptionFilter(name='')[source]

Bases: Filter

filter(record)[source]

Determine if the specified record is to be logged.

Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.

class firexapp.submit.console.DistlibWarningsFilter(name='')[source]

Bases: Filter

filter(record)[source]

Determine if the specified record is to be logged.

Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.

class firexapp.submit.console.FireXColoredConsoleFormatter(*args, **kwargs)[source]

Bases: TTYColoredFormatter

format(record)[source]

Format a message from a record object.

class firexapp.submit.console.RequeueingUndeliverableFilter(name='')[source]

Bases: Filter

filter(record)[source]

Determine if the specified record is to be logged.

Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.

class firexapp.submit.console.RetryFilter(name='')[source]

Bases: Filter

filter(record)[source]

Determine if the specified record is to be logged.

Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.

firexapp.submit.console.add_filter_to_console(log_filter)[source]
firexapp.submit.console.set_console_log_level(log_level)[source]
firexapp.submit.console.setup_console_logging(module=None, stdout_logging_level=20, console_logging_formatter='%(green)s[%(asctime)s]%(reset)s[%(hostname)s] %(log_color)s%(message)s', console_datefmt='%H:%M:%S', stderr_logging_level=40, module_logger_logging_level=None)[source]
class firexapp.submit.reporting.ReportGenerator[source]

Bases: ABC

abstract add_entry(key_name, value, priority, formatters, **extra)[source]
filter_formatters(all_formatters)[source]
formatters = ()
abstract post_run_report(root_id=None, **kwargs)[source]

This could runs in the context of __main__ if –sync, other in the context of celery. So the instance cannot be assumed be the same as in pre_run_report()

static pre_run_report(**kwarg)[source]

This runs in the context of __main__

class firexapp.submit.reporting.ReportersRegistry[source]

Bases: object

classmethod get_generators()[source]
classmethod post_run_report(results, kwargs)[source]
classmethod pre_run_report(kwargs)[source]
firexapp.submit.reporting.report(key_name=None, priority=-1, **formatters)[source]

Use this decorator to indicate what returns to include in the report and how to format it

class firexapp.submit.submit.AdjustCeleryConcurrency(option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=False, help=None, metavar=None)[source]

Bases: Action

exception firexapp.submit.submit.FireXReturnCodeException(error_msg, firex_returncode)[source]

Bases: Exception

class firexapp.submit.submit.JsonFileAction(option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=False, help=None, metavar=None)[source]

Bases: Action

class firexapp.submit.submit.OptionalBoolean(option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=False, help=None, metavar=None)[source]

Bases: Action

class firexapp.submit.submit.SubmitBaseApp(submission_tmp_file=None)[source]

Bases: object

DEFAULT_MICROSERVICE = None
SUBMISSION_LOGGING_FORMATTER = '[%(asctime)s %(levelname)s] %(message)s'
check_for_failures(root_task_result_promise, unsuccessful_services)[source]
convert_chain_args(chain_args) dict[source]
copy_submission_log()[source]
create_submit_parser(sub_parser)[source]
del_parser_attributes()[source]
dump_environ()[source]
static error_banner(err_msg, banner_title='ERROR', logf=<bound method Logger.error of <Logger firexapp.submit.submit (WARNING)>>)[source]
format_results_str(chain_results)[source]
graceful_exit_on_failure(failure_caption: str)[source]
init_file_logging()[source]
install_configs: FireXInstallConfigs
static log_firex_pkgs_versions()[source]
log_preamble()[source]

Overridable method to allow a firex application to log on startup

static log_results(results_str)[source]
main_error_exit_handler(chain_details=None, reason=None, run_revoked=False)[source]
process_other_chain_args(args, other_args) {}[source]
process_sync(root_task_result_promise, chain_args)[source]
resolve_install_configs_args(args_from_first_pass: ~argparse.Namespace, other_args_from_first_pass: list) -> (<class 'argparse.Namespace'>, <class 'list'>)[source]
run_submit(args, others)[source]
self_destruct(chain_details=None, reason=None, run_revoked=False)[source]
set_broker_in_app()[source]
start_broker(args)[source]
start_celery(args, plugins)[source]
start_engine(args, chain_args, uid) {}[source]
start_tracking_services(args, **chain_args) {}[source]
store_parser_attributes(arg_parser, submit_parser)[source]
submit(args_from_first_pass: Namespace, other_args_from_first_pass: list)[source]
classmethod validate_argument_applicability(chain_args, args, all_tasks)[source]
wait_tracking_services_pred(service_predicate, description, timeout) None[source]
wait_tracking_services_release_console_ready(timeout=5) None[source]
wait_tracking_services_task_ready(timeout=5) None[source]
firexapp.submit.submit.celery_worker_ready(sender, **_kwargs)[source]
firexapp.submit.submit.format_unsuccessful_services(unsuccessful_services)[source]
firexapp.submit.submit.get_firex_id_from_output(cmd_output: str) str[source]
firexapp.submit.submit.get_log_dir_from_output(cmd_output: str) str[source]
firexapp.submit.submit.get_unsuccessful_items(list_of_tasks, filters=None)[source]
firexapp.submit.submit.safe_create_completed_run_json(uid, chain_result, run_revoked, chain_args)[source]
firexapp.submit.submit.safe_create_initial_run_json(**kwargs)[source]
class firexapp.submit.tracking_service.TrackingService[source]

Bases: ABC

extra_cli_arguments(arg_parser)[source]
get_pkg_version_info() PkgVersionInfo | None[source]
install_configs: FireXInstallConfigs
ready_for_tasks(**kwargs) bool[source]
ready_release_console(**kwargs) bool[source]
abstract start(args, install_configs: FireXInstallConfigs, **kwargs) {}[source]
firexapp.submit.tracking_service.get_service_name(service: TrackingService) str[source]
firexapp.submit.tracking_service.get_tracking_services() tuple[TrackingService] | None[source]
firexapp.submit.tracking_service.get_tracking_services_versions() list[PkgVersionInfo][source]
firexapp.submit.tracking_service.has_flame() bool[source]
class firexapp.submit.uid.FireXIdParts(user: str, timestamp: datetime.datetime, random_int: int)[source]

Bases: object

firex_id()[source]
random_int: int
timestamp: datetime
user: str
class firexapp.submit.uid.Uid(identifier=None)[source]

Bases: object

add_viewers(**attrs)[source]
property base_logging_dir
copy_resources()[source]
create_debug_dir()[source]
create_logs_dir()[source]
property debug_dir
debug_dirname = 'firex_internal'
classmethod get_resources_path(logs_dir)[source]
property logs_dir
property logs_url
property resources_dir
property run_data
property viewers
firexapp.submit.uid.firex_id_str(user: str, timestamp: datetime, random_int: int) str[source]
firexapp.submit.uid.get_firex_id_parts(maybe_firex_id: str) FireXIdParts | None[source]
firexapp.submit.uid.is_firex_id(maybe_firex_id: str) bool[source]
firexapp.tasks.example.employee_names_to_list(employee_names)[source]
firexapp.tasks.example.to_list(guests: str) list[str][source]
class firexapp.testing.config_base.FlowTestConfiguration[source]

Bases: object

abstract assert_expected_firex_output(cmd_output, cmd_err)[source]
abstract assert_expected_return_code(ret_value)[source]
cleanup()[source]
abstract initial_firex_options() list[source]
property name
run_data: FireXRunData | None
class firexapp.testing.config_base.InterceptFlowTestConfiguration[source]

Bases: FlowTestConfiguration

assert_expected_firex_output(cmd_output, cmd_err)[source]
abstract assert_expected_options(captured_kwargs)[source]
assert_expected_return_code(ret_value)[source]
abstract intercept_service() str[source]

Name of the microservice that will be mocked into the validator capturing the options that get compared by assertExpectedOptions()

run_data: FireXRunData | None
firexapp.testing.config_base.assert_is_bad_run(ret_value)[source]
firexapp.testing.config_base.assert_is_good_run(ret_value)[source]
firexapp.testing.config_base.discover_tests(tests, config_filter='') list[source]
firexapp.testing.config_base.import_test_configs(path) [][source]
firexapp.testing.config_base.skip_test(cls)[source]
class firexapp.testing.config_interpreter.ConfigInterpreter[source]

Bases: object

cleanup_after_timeout(std_out, std_err)[source]
collect_plugins(flow_test_config) [][source]
create_cmd(flow_test_config) List[str][source]
static create_mock_file(results_folder, results_file, test_name, intercept_microservice)[source]
document_viewer(file_path: str) str[source]
execution_directory = None
get_exe(flow_test_config) List[str][source]
static get_intercept_results_file(flow_test_config)[source]
static get_test_name(flow_test_config)[source]
static is_instance_of_intercept(test_config: FlowTestConfiguration)[source]
static is_submit_command(test_config: FlowTestConfiguration)[source]
on_test_exit(std_out, std_err)[source]
run_executable(cmd, flow_test_config)[source]
run_integration_test(flow_test_config, results_folder)[source]
tmp_json_file: str | None
firexapp.testing.coverage_plugin.find_in_stack(file_to_find) bool[source]
firexapp.testing.coverage_plugin.is_celery() bool[source]
firexapp.testing.coverage_plugin.is_running_under_coverage() bool[source]
firexapp.testing.coverage_plugin.restart_celery_under_coverage()[source]
class firexapp.testing.test_infra.FlowTestInfra(methodName='runTest')[source]

Bases: TestCase

config_interpreter = <firexapp.testing.config_interpreter.ConfigInterpreter object>
failures = 0
max_acceptable_failures = None
classmethod populate_tests()[source]
results_dir = None
setUp()[source]

Hook method for setting up the test fixture before exercising it.

tearDown()[source]

Hook method for deconstructing the test fixture after testing it.

test_configs = []
firexapp.testing.test_infra.default_main()[source]
firexapp.testing.test_infra.main(default_results_dir, default_test_dir)[source]