Programming Guide

Warning

It’s best to follow this guide while executing the examples in a FireX installation. Take a look at the Quick Start Guide to create and validate your own FireX installation. In particular, see how to run your own code to follow the guide interactively.

This guide serves programmers new to writing FireX services by discussing and demonstrating the most common parts of the FireX API to incrementally implement a simple set of services. Knowledge of the Python programming language is assumed. Understanding everything in this guide will leave readers in a strong position to comprehend and author FireX workflows.

If you are interested in immediately viewing and executing the code from this guide, take a look at the final result in example.py.

The commits at the top of the programming-guide branch include the code and tests for all sections of the guide.

A Trivial Service

View Example Code

The simplest FireX service is a Python function with a single, FireX-specific decorator.

from getpass import getuser
from firexapp.engine.celery import app

@app.task()
def greet(name=getuser()):
    return 'Hello %s!' % name

FireX exposes all services via the CLI, so the new greet service can already be invoked and its resulting graph viewed in Flame:

firexapp submit --chain greet

View greet in Flame

With only this straightforward service definition, developers can leverage:

  • a CLI entry point

  • telemetry data exported via Kafka, including service inputs, outputs and passed/failed/running statuses for monitoring and alerting

  • workflow visualization via Flame, including call-tree hierarchy and timing breakdown

  • a multi-process and multi-host environment

  • a unique identifier (the FireX ID) per FireX invocation

  • log aggregation at various granularity, including per-service, per-host and run-wide logging

FireX strives to provide a non-intrusive API in the simplest case, expecting workflows will mostly be written in standard Python code then integrated via FireX. Read on to learn about using FireX to create more involved workflows.
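One way to act on this, shown as a minimal sketch rather than a FireX requirement, is to keep the logic in an ordinary function and let the service be a thin wrapper around it. The helper name build_greeting is hypothetical and not part of FireX. The wrapper is left as a comment so the snippet runs without a FireX installation.

```python
def build_greeting(name):
    """Plain Python logic with no FireX imports, so it can be tested anywhere."""
    return 'Hello %s!' % name


# In a FireX installation, a thin service could delegate to the helper:
#
#   @app.task()
#   def greet(name=getuser()):
#       return build_greeting(name)

print(build_greeting('Li'))
```

With this split, the formatting logic can be unit tested without submitting a FireX run.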

Composing Services

Visualizing and reasoning about large, complex workflows is where FireX shines. This section introduces the APIs available to programmatically invoke one service from another, the first step towards building involved workflows.

Blocking/synchronous Enqueue (Invoke)

View Example Code

A greet_guests service will be created to invoke greet multiple times and aggregate the results.

First, it is necessary to modify greet so that its result can be referred to by name from invoking services. The initial implementation was kept purposefully minimal; ordinarily all service definitions that return values give them names. Add returns=['greeting'] to give the return value the name greeting. Read more about returns.

@app.task(returns=['greeting'])
def greet(name=getuser()):
    return 'Hello %s!' % name

The greet_guests service can now be defined as:

@app.task(bind=True, returns=['guests_greeting'])
def greet_guests(self, guests):
    greetings = []
    for guest in guests:
        greet_signature = greet.s(name=guest)
        greet_results = self.enqueue_child_and_get_results(greet_signature)
        greetings.append(greet_results['greeting'])
    return ' '.join(greetings)

See diff with the previous code here.

Let’s go over the FireX APIs introduced by the new greet_guests service that aggregates results from its child greet services:

  • bind=True

      • Supplying bind=True to app.task makes the FireX Task instance, self, the first argument to the function definition. self provides access to data and functions made available by FireX. In this example, self is only used to enqueue (i.e. invoke) child services.

      • Read more about the uses of ‘self’ here.

  • greet.s(name=guest) (or more generally, <service_name>.s(<service arguments>))

      • Creates a Celery Signature; details can be read here.

      • Celery Signatures bind arguments to a service and can then be enqueued for eventual execution. Depending on how a signature is enqueued, the service might run immediately or need to wait for resources. Keep in mind that creating a signature does not execute the service; enqueuing the signature is what schedules the service for execution.

  • self.enqueue_child_and_get_results(<celery signature>)

      • Schedules the supplied Celery Signature for immediate execution, waiting on and returning the results. The return value is a Python dict keyed by the names listed in the returns of the invoked service. In this example, the greet service defines its return value’s name to be greeting.

      • There are several ways to enqueue child services; read more here.
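The split between creating a signature and executing it can be illustrated without FireX. The sketch below uses functools.partial as a loose analogy only; it is not how Celery implements signatures, and the calls list is a hypothetical recorder added for illustration.

```python
from functools import partial

calls = []  # records every time the stand-in service actually runs


def greet(name):
    calls.append(name)
    return {'greeting': 'Hello %s!' % name}


# Binding arguments is analogous to greet.s(name='Li'): nothing runs yet.
greet_signature = partial(greet, name='Li')
assert calls == []

# Calling the bound object stands in for enqueuing and waiting on the result.
greet_results = greet_signature()
print(greet_results['greeting'])
```

As with enqueue_child_and_get_results, the value of interest is read from the returned dict by its name.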

However, there is a detail to address before invoking greet_guests from the command line. Since greet_guests takes a list of names and the command line supplies a string, the string value of guests received from the CLI must be converted into a Python list by using @InputConverter.register and @SingleArgDecorator.

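from firexapp.submit.arguments import InputConverter
from firexkit.argument_conversion import SingleArgDecorator

@InputConverter.register
@SingleArgDecorator('guests')
def to_list(guests):
    # The CLI supplies a single comma-separated string, e.g. 'Li,Mohamed'.
    return guests.split(',')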
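The conversion itself is ordinary string handling and can be tried without FireX. The sketch below is a hypothetical, stricter variant named split_guests that also trims spaces and drops empty entries. The decorators are omitted so it runs standalone, and the extra cleanup is a suggestion rather than something the guide's converter does.

```python
def split_guests(guests):
    """Split a comma-separated CLI string, trimming spaces and dropping empties."""
    return [name.strip() for name in guests.split(',') if name.strip()]


print(split_guests('Li, Mohamed,'))
```

Whether to tolerate stray spaces or trailing commas is a design choice; the plain split(',') above keeps them as they were typed.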

The new greet_guests service can now be executed:

firexapp submit --chain greet_guests --guests Li,Mohamed

View greet_guests in Flame.

Observe that --guests was automatically made available as a command-line argument since it is an argument to the greet_guests service. FireX also automatically generates help text for your service:

firexapp info greet_guests

You can augment the info with more details, like a description of the argument, by adding a docstring to your service.

While this example chose to schedule and block on child services, it’s also possible to schedule services asynchronously. Continue on to the next section for details.

Non-Blocking/Asynchronous Enqueue (Invoke)

View Example Code

In the previous example, the result from each greet was received before the next call to greet was performed; child services were executed sequentially. If greet were a more expensive service, it would be preferable to leverage FireX as a multi-process environment by invoking all child services in parallel and then waiting for all results to become available.

@app.task(bind=True, returns=['guests_greeting'])
def greet_guests(self, guests):
    child_promises = []
    for guest in guests:
        greet_signature = greet.s(name=guest)
        child_promise = self.enqueue_child(greet_signature)
        child_promises.append(child_promise)

    self.wait_for_children()
    greetings = [promise.result['greeting'] for promise in child_promises]
    return ' '.join(greetings)

See diff with the previous code here.

Take note of the FireX APIs used to achieve parallel execution of child services:

  • self.enqueue_child(<celery signature>)

      • Unlike enqueue_child_and_get_results, the enqueue_child method schedules the supplied signature for execution asynchronously and immediately returns the newly created child result promise. It is the caller’s responsibility to extract the return value from the promise after the caller knows the result is available.

      • There are several ways to enqueue child services; read more here.

  • self.wait_for_children

      • Blocks on the completion of all child services. Once this method has returned, it’s safe to inspect the result attribute of all child result promises to retrieve the return values of the executed service.
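The same schedule-then-wait shape exists in the Python standard library, which may help readers new to promises. The sketch below is an analogy built on concurrent.futures, not FireX code: pool.submit plays the role of enqueue_child and wait plays the role of wait_for_children. Note that a standard-library future exposes result() as a method, whereas the guide reads a result attribute on the FireX promise.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def greet(name):
    return {'greeting': 'Hello %s!' % name}


def greet_guests(guests):
    with ThreadPoolExecutor() as pool:
        # Like enqueue_child: schedule each call and keep the returned handle.
        child_futures = [pool.submit(greet, guest) for guest in guests]
        # Like wait_for_children: block until every child has finished.
        wait(child_futures)
    # Results are read in submission order, so the greeting order is stable.
    return ' '.join(f.result()['greeting'] for f in child_futures)


print(greet_guests(['Li', 'Mohamed']))
```

Keeping the handles in a list, as the guide does with child_promises, is what preserves the order of the guests regardless of which child finishes first.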

Dataflow via Chaining (the ‘|’ operator)

View Example Code

The preceding examples enqueue signatures from single services. It’s also possible to build a signature that composes multiple services and executes them as a unit.

Two new services will be created to demonstrate chaining: the outer service amplified_greet_guests will chain the existing greet_guests service with a new, trivial amplify service.

@app.task(returns=['amplified_message'])
def amplify(guests_greeting):
    return guests_greeting.upper()

A chain will be created to send the guests_greeting result of the greet_guests service along to the argument named guests_greeting of the amplify service, then return the result as amplified_greeting:

@app.task(bind=True, returns=['amplified_greeting'])
def amplified_greet_guests(self, guests):

    amplified_greet_guests_chain = greet_guests.s(guests=guests) | amplify.s()

    chain_results = self.enqueue_child_and_get_results(amplified_greet_guests_chain)
    return chain_results['amplified_message']

Warning

Chains are built from signatures, not service names, so don’t forget the .s(...)!

The chain operator | is used to combine two signatures into a single chain. The greet_guests service will produce a result named guests_greeting, which is consumed as input by amplify. Notice that binding by names can lead to coupling; amplify doesn’t know about greet_guests, so why should amplify have an input argument named guests_greeting? The mapping from names present in the chain to argument names expected by a service can be reassigned, allowing amplify to have a more general input argument name, such as to_amplify.

@app.task(returns=['amplified_message'])
def amplify(to_amplify):
    return to_amplify.upper()

We can reassign the name received by amplify by changing its signature construction to amplify.s(to_amplify='@guests_greeting'), so that the chain becomes:

amplified_greet_guests_chain = greet_guests.s(guests=guests) | amplify.s(to_amplify='@guests_greeting')

View diff with previous section.

The amplified_greet_guests service can be executed:

firexapp submit --chain amplified_greet_guests --guests Li,Mohamed

View amplified_greet_guests in Flame.

With this example in mind, chaining can be discussed in more general terms. The input arguments flow from the first service to the next, with services later in the chain receiving inputs that may have been created or updated by return values produced by earlier services.

So, if you have a chain:

A | B | C

then:

  • A’s input can use: all explicit argument values specified for A.

  • B’s input can use: all argument values A can use + the return values of A + all explicit argument values of B.

  • C’s input can use: all argument values B can use + the return values of B + all explicit argument values of C.

A data context can be created to make many arguments available to all services in a chain via the InjectArgs construct:

InjectArgs | A | B | C

InjectArgs is a pseudo-service that can be used to inject a dictionary of arguments/values at the head of a chain. It can be used only once and only at the head of the chain, not between services.

See an example that uses InjectArgs here

Note

Remember: if a service updates a value with an existing name, it will override the previous value for downstream services in the chain.
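The dataflow rules above can be modelled in plain Python. The following is a simplified sketch of the name-based flow, not FireX's implementation: run_chain and its (function, return names, explicit arguments) step format are hypothetical, the injected dict stands in for InjectArgs, and a leading '@' on an explicit value is treated as a lookup of an existing name.

```python
import inspect


def run_chain(steps, injected=None):
    """Run (function, return_names, explicit_kwargs) steps over a shared context."""
    context = dict(injected or {})  # plays the role of InjectArgs
    for func, return_names, explicit in steps:
        # Explicit values of the form '@name' are looked up in the context.
        for arg, value in explicit.items():
            if isinstance(value, str) and value.startswith('@'):
                value = context[value[1:]]
            context[arg] = value
        # Each step receives only the context entries its parameters name.
        wanted = inspect.signature(func).parameters
        result = func(**{k: v for k, v in context.items() if k in wanted})
        # Named returns are merged in, overriding earlier values of that name.
        values = result if len(return_names) > 1 else (result,)
        context.update(zip(return_names, values))
    return context


def greet_guests(guests):
    return ' '.join('Hello %s!' % guest for guest in guests)


def amplify(to_amplify):
    return to_amplify.upper()


final = run_chain(
    [
        (greet_guests, ['guests_greeting'], {}),
        (amplify, ['amplified_message'], {'to_amplify': '@guests_greeting'}),
    ],
    injected={'guests': ['Li', 'Mohamed']},
)
print(final['amplified_message'])
```

In this model every name stays in the context for later steps, which is why a later return with an existing name replaces the earlier value.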

The Celery mechanics of chaining are described here.

Chaining is fundamentally a convenience for assembling involved workflows to have results available to downstream services. If any service in the chain fails, subsequent services will not be executed. The same outcome can be achieved by calling enqueue methods, extracting results, and making those results (as well as other required inputs) available to the next service in the would-be chain, then calling enqueue again. Complex workflows often have significant logic while constructing chains by conditionally assembling lower-level services.

Error Propagation

View Example Code

Thus far, no services have failed while executing; results from complete service executions have always been available. The greet leaf-node service will be modified so that it may fail; calling services can then observe failures and decide how to handle them.

@app.task(returns=['greeting'], flame=['greeting'])
def greet(name=getuser()):
    assert len(name) > 1, "Cannot greet a name with 1 or fewer characters."
    return 'Hello %s!' % name

Now, when greet_guests calls self.wait_for_children() to wait for its greet children, wait_for_children may raise a ChainInterruptedException that is caused by the AssertionError from greet. Be aware that the ChainInterruptedException will be raised after all children have completed (i.e. either produced results or raised exceptions).

greet_guests can inspect the child task failures instead of automatically raising a ChainInterruptedException by invoking self.wait_for_children(raise_exception_on_failure=False):

@app.task(...)
def greet_guests(self, guests):
    ...

    self.wait_for_children(raise_exception_on_failure=False)
    greetings = [promise.result['greeting'] for promise in child_promises if promise.successful()]

    if any(promise.failed() for promise in child_promises):
        greetings.append("And apologies to those not mentioned.")

    return ' '.join(greetings)

See diff with the previous code here.

After setting raise_exception_on_failure=False, it’s no longer safe to immediately inspect the promise.result value as a dict. Instead, it’s necessary to distinguish between successful children that have a result dict and failed children that have result set to the exception that was raised for that child service.
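The distinction between the two kinds of result can be sketched without FireX. In the stand-in below, run_child is a hypothetical helper that yields either a result dict or the exception that was raised, mirroring what the text above says a finished child holds. The isinstance checks are an illustrative alternative to the promise methods used in the guide's code.

```python
def greet(name):
    assert len(name) > 1, "Cannot greet a name with 1 or fewer characters."
    return {'greeting': 'Hello %s!' % name}


def run_child(name):
    """Stand-in for a finished child: a result dict, or the exception it raised."""
    try:
        return greet(name)
    except AssertionError as error:
        return error


child_results = [run_child(name) for name in ['Li', 'A']]

# Only successful children carry a dict that can be indexed by return name.
greetings = [r['greeting'] for r in child_results if isinstance(r, dict)]
failures = [r for r in child_results if isinstance(r, Exception)]
if failures:
    greetings.append("And apologies to those not mentioned.")

print(' '.join(greetings))
```

Indexing a failed child's result by 'greeting' would raise a TypeError, which is why the successful children are filtered first.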

The greet_guests service can be executed to purposefully make a greet service fail:

firexapp submit --chain greet_guests --guests Li,A

View greet_guests with a greet failure in Flame.

In general, calls that block on child results, such as self.enqueue_child_and_get_results and self.wait_for_children, will by default raise a ChainInterruptedException when the enqueued chain fails. Conversely, if a parent service enqueues a child asynchronously and that child fails, the parent service is not affected. It’s only when a parent is waiting on a child’s result that errors propagate and fail the parent (unless error handling, such as exception catching or raise_exception_on_failure=False, is used by the parent).

Subject Specific Guides

The simple services described in this guide should give readers an understanding of the most common FireX APIs. For more details on the topics touched upon here, refer to the subject-specific guides.

If you have feedback on this guide or questions not addressed in any of the topic-specific guides, please open an issue.