from getpass import getuser
import time
from typing import Optional
from firexkit.argument_conversion import SingleArgDecorator
from firexkit.chain import InjectArgs, returns
from firexkit.task import FireXTask, flame, flame_collapse
from firexapp.engine.celery import app
from firexapp.submit.arguments import InputConverter
from firexapp.tasks.core_tasks import CopyBogKeys
@app.task
def nop() -> None:
return
@app.task
def sleep(sleep: Optional[int] = None) -> None:
if sleep:
time.sleep(int(sleep))
return
@app.task(returns='username')
def getusername() -> str:
return getuser()
# The @app.task() makes this normal python function a FireX Service.
@app.task(returns=['greeting'], flame=['greeting'])
def greet(name: str = getuser()) -> str:
assert len(name) > 1, "Cannot greet a name with 1 or fewer characters."
return 'Hello %s!' % name
# Setting bind=True makes the first argument received by the service 'self'. It's most commonly used to invoke
# (enqueue) other services, but provides much more functionality as outlined here:
@app.task(bind=True, returns=['guests_greeting'], flame=['guests_greeting'])
def greet_guests(self: FireXTask, guests: list[str]) -> str:
child_promises = []
for guest in guests:
# Create a Celery Signature, see: https://docs.celeryproject.org/en/latest/userguide/canvas.html#signatures
greet_signature = greet.s(name=guest)
# enqueue_child by default schedules the supplied signature for execution asynchronously and immediately
# returns the newly created child result promise.
child_promise = self.enqueue_child(greet_signature)
child_promises.append(child_promise)
# We want to get all the return values (greetings) from child tasks, but we must wait first to make sure they're all
# available before inspecting them with child_promise.result[<returns_key>]
self.wait_for_children(raise_exception_on_failure=False)
# Since wait_for_children has completed, we know it's safe to inspect the results of all child task promises,
# after verifying the task was a success.
greetings = [promise.result['greeting'] for promise in child_promises if promise.successful()]
if any(promise.failed() for promise in child_promises):
greetings.append("And apologies to those not mentioned.")
return ' '.join(greetings)
[docs]
@InputConverter.register
@SingleArgDecorator('guests')
def to_list(guests: str) -> list[str]:
return guests.split(',')
@app.task(returns=['amplified_message'])
def amplify(to_amplify: str,
upper: bool = True,
surround_str: Optional[str] = None,
underline_char: Optional[str] = None,
overline_char: Optional[str] = None) -> str:
result = to_amplify
if upper:
result = to_amplify.upper()
if surround_str:
result = surround_str + result + surround_str
centerline_len = len(result)
if underline_char:
result = result + '\n' + (underline_char * centerline_len)
if overline_char:
result = (overline_char * centerline_len) + '\n' + result
return result
def _amplified_greeting_formatter(args_and_maybe_results: dict) -> str:
# Since 'amplified_greeting' is the return value name, it isn't available to the formatter when the task is first
# started. It will be available if the task completes successfully.
if 'amplified_greeting' in args_and_maybe_results:
br_as_nl = args_and_maybe_results["amplified_greeting"].replace('\n', '<br>')
return f'<h1 style="font-family: monospace;">{br_as_nl}</h1>'
# Since 'guests' is an input argument, it will always be available to the formatter, even before the service
# has completed.
return f'Planning to greet: {",".join(args_and_maybe_results["guests"])}'
@app.task(bind=True, returns=['amplified_greeting'])
@flame('*', _amplified_greeting_formatter)
@flame_collapse({'greet_guests': 'descendants'})
def amplified_greet_guests(self: FireXTask, guests: list[str]) -> str:
# Nonsense failure case to illustrate flame HTML data when the service fails (i.e. no return value present).
assert len(guests) > 1, "Only willing to amplify greeting for more than one guest."
# Create a chain that can be enqueued. The greet_guests service will produce a guests_greeting,
# which will then be delivered to amplify as its to_amplify argument.
amplified_greet_guests_chain = InjectArgs(**self.abog) | greet_guests.s() | amplify.s(to_amplify='@guests_greeting')
# Chains can be enqueued just like signatures. You can consider a signature a chain with only one service.
chain_results = self.enqueue_child_and_get_results(amplified_greet_guests_chain)
return chain_results['amplified_message']
@app.task()
@returns('job_title')
def get_springfield_power_plant_job_title(name):
username_to_title = {'Charles Montgomery Burns': 'OWNER',
'Waylon Smithers': 'EXECUTIVE ASSISTANT',
'Lenny Leonard': 'DIRECTOR',
'Homer Simpson': 'SUPERVISOR'}
return username_to_title.get(name, 'UNKNOWN')
[docs]
@InputConverter.register
@SingleArgDecorator('employee_names')
def employee_names_to_list(employee_names):
return employee_names.split(',')
@app.task(bind=True, returns=['amplified_greeting'])
def greet_springfield_power_plant_employees(self, employee_names):
names_with_titles = []
for name in employee_names:
job_title = self.enqueue_child_and_get_results(get_springfield_power_plant_job_title.s(name=name))['job_title']
names_with_titles.append(f"{job_title} {name}")
results = self.enqueue_child_and_get_results(amplified_greet_guests.s(guests=names_with_titles))
return results['amplified_greeting']
# A contrived example showing how CopyBogKeys can be used to copy values in between chained services, preventing
# data from being trampled when multiple services within a chain return values with the same name.
# Of course, in practice, it's preferable to enqueue the multiple calls to greet separately.
@app.task(bind=True, returns=['lee_greeting', 'tom_greeting'])
def greet_lee_and_tom(self: FireXTask) -> (str, str):
chain = (greet.s("Lee")
| CopyBogKeys.s({'greeting': 'lee_greeting'})
| greet.s("Tom"))
results = self.enqueue_child_and_get_results(chain)
# Lee's greeting has been copied to 'lee_greeting', so it's still accessible after Tom's greet has executed.
lee_greeting = results['lee_greeting']
# Tom's greeting still has the return name specified in the greet service.
tom_greeting = results['greeting']
return lee_greeting, tom_greeting