Source code for firexapp.info

import inspect
import re
from textwrap import wrap
from celery.exceptions import NotRegistered

from firexapp.discovery import get_all_pkg_versions_str
from firexapp.plugins import plugin_support_parser
from firexapp.application import import_microservices, get_app_task


[docs] class InfoBaseApp: def __init__(self): self._list_sub_parser = None self._info_sub_parser = None
[docs] def create_list_sub_parser(self, sub_parser): list_parser = sub_parser.add_parser("list", help="Lists FireX microservices, or used arguments" " {microservices,arguments}", parents=[plugin_support_parser]) list_group = list_parser.add_mutually_exclusive_group(required=True) list_group.add_argument("--microservices", '-microservices', help="Lists all available microservices", action='store_true') list_group.add_argument("--arguments", '-arguments', help="Lists all arguments used by microservices", action='store_true') list_group.set_defaults(func=self.run_list) return list_group
[docs] def create_info_sub_parser(self, sub_parser): if not self._info_sub_parser: info_parser = sub_parser.add_parser("info", help="Lists detailed information about a microservice", parents=[plugin_support_parser]) info_parser.add_argument("entity", help="The short or long name of the microservice to be detailed, or a " "microservice argument. It can be a Python compatible regexp to " "display information about all services matching that expression.") info_parser.set_defaults(func=self.run_info) self._info_sub_parser = info_parser return self._info_sub_parser
[docs] def create_version_sub_parser(self, sub_parser): version_parser = sub_parser.add_parser("version", help="Print FireX Package Version Information", parents=[plugin_support_parser]) version_parser.set_defaults(func=self.version) return version_parser
[docs] @staticmethod def version(_args): print(get_all_pkg_versions_str())
[docs] def run_list(self, args): if args.microservices: self.print_available_microservices(args.plugins) elif args.arguments: self.print_argument_used(args.plugins)
[docs] def run_info(self, args): self.print_details(args.entity, args.plugins)
[docs] @staticmethod def print_available_microservices(plugins: str): apps = import_microservices(plugins) print() print("The following microservices are available:") services = [str(task) for task in apps] services = [task for task in services if not task.startswith('celery.')] # filter out base celery types services.sort() for service in services: print(service) pointers = [(full, apps[full].name) for full in apps if apps[full].name not in full] if pointers: print("\nPointers (override -> original):") for new, old in pointers: print(new, "->", old) print("\nUse the info sub-command for more details\n")
[docs] @staticmethod def print_argument_used(plugins: str): all_tasks = import_microservices(plugins) print() print("The following arguments are used by microservices:") usage = get_argument_use(all_tasks) for arg in sorted(usage): print(arg) print("\nUse the info sub-command for more details\n")
[docs] def print_partial_task_matches(self, entity, all_tasks): entries_found = False for task_name in sorted(all_tasks, key=lambda i: i.split('.')[-1]): # Is this even a partial match if not re.search(entity, task_name): continue try: task = get_app_task(task_name, all_tasks) except NotRegistered: continue else: if entries_found: print('\n') self.print_task_details(task) entries_found = True return entries_found
[docs] def print_details(self, entity, plugins, all_tasks=None): if not all_tasks: all_tasks = import_microservices(plugins) # Is this entity a microservice try: # Do we have a match on the full name? task = get_app_task(entity, all_tasks) except NotRegistered: # Do we have a match on the partial name if self.print_partial_task_matches(entity, all_tasks): return else: self.print_task_details(task) return # Is this entity an argument all_args = get_argument_use(all_tasks) if entity in all_args: print("Argument name: " + entity) print("Used in the following microservices:") for micro in all_args[entity]: print(micro.name) return self._info_sub_parser.exit(status=-1, message="Microservice %s was not found!" % entity)
[docs] @classmethod def parse_task_docstring(cls, task): if not task.__doc__: return None, None header = None arg_dict = None docstring = inspect.getdoc(task) match = re.search(r"^(.*)\n\s*Arguments?[^\n]*\n\s*-*(.*)", docstring, re.MULTILINE | re.DOTALL) if match: if len(match.group(1).strip()): header = match.group(1).strip() if len(match.group(2).strip()): arg_desc_str = match.group(2).strip() arg_dict = {} # Need to determine if args are indicated with prefixed '--' if re.search("^\s*--", arg_desc_str): # assume our args all start with -- arg_prefix = "(?:--)" else: # no prefix arg_prefix = "" # Go over arguments section and create dict of args/description desc = '' arg = None for line in arg_desc_str.split("\n"): line = line.strip() # blank line ends entry if not len(line): if arg: arg_dict[arg] = desc arg = None desc = '' # Look for start of new entry match = re.search(r"^" + arg_prefix + r"(\S[^:\(\r\n]+)(?:\([^)\r\n]*\))?:(?:(?:\([^)\r\n]*\))?[^\S\r\n]*([^\r\n]*))?$", line) if match: if arg: arg_dict[arg] = desc arg = match.group(1) desc = match.group(2).strip() # Remove ':' and any leading () that usually indicate type. These can appear in either order match = re.search(r"^(?:\([^)]*\)\s*)?:(?:\([^)]*\))?\s*(.+)$", desc) if match: desc = match.group(1) else: # continue prev entry if len(desc): desc += " " + line else: desc = line # store last outstanding entry if arg: arg_dict[arg] = desc else: header = docstring return header, arg_dict
[docs] @classmethod def print_task_details(cls, task): dash_length = 40 print('-' * dash_length) split_name = task.name.split(".") name = split_name[-1] path = '.'.join(split_name[0:-1]) if path: path = " (%s)" % path print("Name: " + name + path) header, arguments = cls.parse_task_docstring(task) if header: print('\n' + header) if not arguments: arguments = {} def print_arg(arg, deflt, description): max_arg_len = 25 max_desc_len = 80 - max_arg_len - len(tab) arg_str = f"{tab}{arg}" # Add default value, if present if deflt is not None: arg_str += f"(default={deflt})" if description: # Add filler or newline, depending on arg_str length if len(arg_str) >= max_arg_len: arg_str += "\n" + (' ' * max_arg_len) else: arg_str += " " * (max_arg_len - len(arg_str)) # Remove excess spacing in description description = re.sub(r" {2,}", " ", description) # Add description if len(description) < max_desc_len: arg_str += description else: arg_str += f"\n{' ' * max_arg_len}".join(wrap(description, max_desc_len)) print(arg_str) tab = ' ' * 1 print("\nArguments info") print("--------------") required_args = getattr(task, "required_args", []) cnt = 0 for chain_arg in sorted(required_args): if "self" not in chain_arg and \ "uid" not in chain_arg and \ chain_arg != 'kwargs': desc = arguments.get(chain_arg, None) print_arg(chain_arg, None, desc) cnt += 1 optional_args = getattr(task, "optional_args", {}) if len(optional_args): for chain_arg in sorted(optional_args): if chain_arg.startswith('_'): continue desc = arguments.get(chain_arg, None) default = optional_args[chain_arg] if default is None: default = "None" print_arg(chain_arg, default, desc) else: if not cnt: print(tab, "None") print('\nReturns\n-------') out = getattr(task, "return_keys", {}) if out: for chain_arg in sorted(out): desc = arguments.get(chain_arg, None) print_arg(chain_arg, None, desc) else: print(tab, "None")
[docs] def get_argument_use(all_tasks) -> dict: argument_usage = {} for _, task in all_tasks.items(): if not hasattr(task, "required_args") or not hasattr(task, "optional_args"): continue for arg in task.required_args + list(task.optional_args): if arg in argument_usage: argument_usage[arg].add(task) else: tasks = set() tasks.add(task) argument_usage[arg] = tasks return argument_usage