A framework for pluggable business logic components.
The package can be installed by adding opus to your list of dependencies in mix.exs:
def deps do [{:opus, "~> 0.8"}] end- Each Opus pipeline module has a single entry point and returns tagged tuples
{:ok, value} | {:error, error} - A pipeline is a composition of stateless stages
- A stage returning
{:error, _}halts the pipeline - A stage may be skipped based on a condition function (
:ifand:unlessoptions) - Exceptions are converted to
{:error, error}tuples by default - An exception may be left to raise using the
:raiseoption - Each stage of the pipeline is instrumented. Metrics are captured automatically (but can be disabled).
- Errors are meaningful and predictable
defmodule ArithmeticPipeline do use Opus.Pipeline step :add_one, with: &(&1 + 1) check :even?, with: &(rem(&1, 2) == 0), error_message: :expected_an_even tee :publish_number, if: &Publisher.publishable?/1, raise: [ExternalError] step :double, if: :lucky_number? step :divide, unless: :lucky_number? step :randomize, with: &(&1 * :rand.uniform) link JSONPipeline def double(n), do: n * 2 def divide(n), do: n / 2 def lucky_number?(n) when n in 42..1337, do: true def lucky_number?(_), do: false end ArithmeticPipeline.call(41) # {:ok, 84.13436750126804}Read this blogpost to get started.
The core aspect of this library is defining pipeline modules. As in the example above you need to add use Opus.Pipeline to turn a module into a pipeline. A pipeline module is a composition of stages executed in sequence.
There are a few different types of stages for different use-cases. All stage functions, expect a single argument which is provided either from initial call/1 of the pipeline module or the return value of the previous stage.
An error value is either :error or {:error, any} and anything else is considered a success value.
This stage processes the input value and with a success value the next stage is called with that value. With an error value the pipeline is halted and an {:error, any} is returned.
This stage is intended for validations.
This stage calls the stage function and unless it returns true it halts the pipeline.
Example:
defmodule CreateUserPipeline do use Opus.Pipeline check :valid_params?, with: &match?(%{email: email} when is_bitstring(email), &1) # other stages to actually create the user endThis stage is intended for side effects, such as a notification or a call to an external system where the return value is not meaningful. It never halts the pipeline.
This stage is to link with another Opus.Pipeline module. It calls call/1 for the provided module. If the module is not an Opus.Pipeline it is ignored.
The skip macro can be used for linked pipelines. A linked pipeline may act as a true bypass, based on a condition, expressed as either :if or :unless. When skipped, none of the stages are executed and it returns the input, to be used by any next stages of the caller pipeline. A very common use-case is illustrated in the following example:
defmodule RetrieveCustomerInformation do use Opus.Pipeline check :valid_query? link FetchFromCache, if: :cacheable? link FetchFromDatabase, if: :db_backed? step :serialize endWith skip it can be written as:
defmodule RetrieveCustomerInformation do use Opus.Pipeline check :valid_query? link FetchFromCache link FetchFromDatabase step :serialize endA linked pipeline becomes:
defmodule FetchFromCache do use Opus.Pipeline skip :assert_suitable, if: :cacheable? step :retrieve_from_cache endThe behaviour of each stage can be configured with any of the available options:
:with: The function to call to fulfill this stage. It can be an Atom referring to a public function of the module, an anonymous function or a function reference.:if: Makes a stage conditional, it can be either an Atom referring to a public function of the module, an anonymous function or a function reference. For the stage to be executed, the condition must returntrue. When the stage is skipped, the input is forwarded to the next step if there's one.:unless: The opposite of the:ifoption, executes the step only when the callback function returnsfalse.:raise: A list of exceptions to not rescue. Defaults tofalsewhich converts all exceptions to{:error, %Opus.PipelineError{}}values halting the pipeline.:error_message: An error message to replace the original error when a stage fails. It can be a String or Atom, which will be used directly in place of the original message, or an anonymous function, which receives the input of the failed stage and must return the error message to be used.:retry_times: How many times to retry a failing stage, before halting the pipeline.:retry_backoff: A backoff function to provide delay values for retries. It can be an Atom referring to a public function in the module, an anonymous function or a function reference. It must return anEnumerable.tyielding at least as many numbers as theretry_times.:instrument?: A boolean which defaults totrue. Set tofalseto skip instrumentation for a stage.
defmodule ExternalApiPipeline do use Opus.Pipeline step :http_request, retry_times: 8, retry_backoff: fn -> linear_backoff(10, 30) |> cap(100) end def http_request(_input) do # code for the actual request end endThe above module, will retry be retried up to 8 times, each time applying a delay from the next value of the retry_backoff function, which returns a Stream.
All the functions from the :retry package will be available to be used in retry_backoff.
You can select the stages of a pipeline to run using call/2 with the :except and :only options.
Example:
# Runs only the stage with the :validate_params name CreateUserPipeline.call(params, only: [:validate_params] # Runs all the stages except the selected ones CreateUserPipeline.call(params, except: :send_notification) Instrumentation hooks which can be defined:
:pipeline_started: Called before a pipeline module is called:before_stage: Called before each stage:stage_skipped: Called when a conditional stage was skipped:stage_completed: Called after each stage:pipeline_completed: Called after pipeline module has returned
You can disable all instrumentation callbacks for a stage using instrument?: false.
defmodule ArithmeticPipeline do use Opus.Pipeline step :double, instrument?: false endYou can define module specific instrumentation callbacks using:
defmodule ArithmeticPipeline do use Opus.Pipeline step :double, with: &(&1 * 2) step :triple, with: &(&1 * 3) instrument :before_stage, fn %{input: input} -> IO.inspect input end # Will be called only for the matching stage instrument :stage_completed, %{stage: %{name: :triple}}, fn %{time: time} -> # send to the monitoring tool of your choice end endYou can define a default instrumentation module for all your pipelines by adding in your config/*.exs:
config :opus, :instrumentation, YourModule # but you may choose to provide a list of modules config :opus, :instrumentation, [YourModuleA, YourModuleB]An instrumentation module has to export instrument/3 functions like:
defmodule CustomInstrumentation do def instrument(:pipeline_started, %{pipeline: ArithmeticPipeline}, %{input: input}) do # publish the metrics to specific backend end def instrument(:before_stage, %{stage: %{pipeline: pipeline}}, %{input: input}) do # publish the metrics to specific backend end def instrument(:stage_completed, %{stage: %{pipeline: ArithmeticPipeline}}, %{time: time}) do # publish the metrics to specific backend end def instrument(:pipeline_completed, %{pipeline: ArithmeticPipeline}, %{result: result, time: total_time}) do # publish the metrics to specific backend end def instrument(_, _, _), do: nil endOpus includes an instrumentation module which emits events using the :telemetry library.
To enable it, change your config/config.exs with:
config :opus, :instrumentation, [Opus.Telemetry]Browse the available events here.
For instructions to integrate Opus Telemetry metrics in your Phoenix application, read this post.
You may choose to provide some common options to all the stages of a pipeline.
:raise: A list of exceptions to not rescue. When set totrue, Opus does not handle any exceptions. Defaults tofalsewhich converts all exceptions to{:error, %Opus.PipelineError{}}values halting the pipeline.:instrument?: A boolean which defaults totrue. Set tofalseto skip instrumentation for a module.
defmodule ArithmeticPipeline do use Opus.Pipeline, instrument?: false, raise: true # The pipeline opts will disable instrumentation for this module # and will not rescue exceptions from any of the stages step :double, with: &(&1 * 2) step :triple, with: &(&1 * 3) endYou may visualise your pipelines using Opus.Graph:
Opus.Graph.generate(:your_app) # => {:ok, "Graph file has been written to your_app_opus_graph.png"}❗ This feature requires the opus_graph package to be installed, add it in your mix.exs.
defp deps do {:opus_graph, "~> 0.1", only: [:dev]} endFirst make sure to add graphvix to your dependencies:
# in mix.exs defp deps do [ {:opus, "~> 0.5"}, {:graphvix, "~> 0.5", only: [:dev]} ] endThis feature uses graphviz, so make sure to have it installed. To install it:
# MacOS brew install graphviz# Debian / Ubuntu apt-get install graphvizOpus.Graph is in fact a pipeline and its visualisation is:
You can customise the visualisation:
Opus.Graph.generate(:your_app, %{filetype: :svg}) # => {:ok, "Graph file has been written to your_app_opus_graph.svg"}Read the available visualisation options here.
- Quiqup Engineering - How to Create Beautiful Pipelines with Opus
- Pagerduty - How I Centralized our Scattered Business Logic Into One Clear Pipeline for our Elixir Webhook Service
- A Slack bookmarking application in Elixir with Opus
- Opus Telemetry
Using Opus in your company / project?
Let us know by submitting an issue describing how you use it.
Copyright (c) 2018 Dimitris Zorbas, MIT License. See LICENSE.txt for further details.

