A common task in data engineering is to automate, schedule, and
monitor multiple data processing pipelines. This is called
orchestration. maestro is an R package that helps orchestrate data pipelines.
A fully realized maestro project involves the following components and actions:
Pipelines - a collection of R functions to be orchestrated, such as batch ETL jobs
Orchestrator - an R script or Quarto doc that runs the pipelines and monitors them
Scheduler - a process external to R that runs the orchestrator on a schedule (e.g., cron, Posit Connect)
Create a maestro project in an existing project or a new project using
create_maestro() or the New Project wizard in RStudio. This creates the
orchestrator script and the folder of pipelines with one sample pipeline.
Your project should look something like this:
maestro_project
├── maestro_project.Rproj
├── orchestrator.R
└── pipelines
    ├── my_pipe.R
    └── another_pipe.R
Pipelines are the jobs you want to automate, schedule, and monitor. For the most part, they’re regular R functions with a special sprinkling of comments.
A pipeline is simply an R function with decorators called maestro tags.
Maestro tags are special code comments used for communicating the
scheduling and configuration of a pipeline to the orchestrator. Let’s
take a quick look at the sample my_pipe.R:
#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-05-24
#' @maestroTz UTC
#' @maestroLogLevel INFO
my_pipe <- function() {
  # Pipeline code
}
my_pipe is a function with an empty body, so right now it won’t do
anything. The comments above it are interpreted by maestro as “this
function is scheduled to run every day starting at 2024-05-24 (00:00:00)
UTC time”.
maestroFrequency and maestroStartTime are the most important tags for scheduling. Frequency is how often you want the pipeline to run. It can be written as a single string such as hourly, daily, weekly, or biweekly, or as a number and a unit (e.g., 1 day, 3 hours).
Note that you don’t need to provide all these tags. A single maestro
tag is enough to mark a function as a pipeline. Pipelines missing tags
will use consistent defaults (e.g., if maestroFrequency is missing, the
default is 1 day/daily).
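As a minimal sketch of the point above, the two functions below carry only a subset of the tags. The function names and message text are made up for illustration; the frequency values use the two formats described earlier (a single word, and a number with a unit).

```r
# A single maestro tag is enough to mark this function as a pipeline.
# Frequency given as a single word.
#' @maestroFrequency hourly
hourly_pipe <- function() {
  message("hourly_pipe ran at ", format(Sys.time()))
}

# Frequency given as a number and a unit, plus an explicit time zone.
# Any tags left out fall back to the package defaults.
#' @maestroFrequency 3 hours
#' @maestroTz UTC
three_hour_pipe <- function() {
  message("three_hour_pipe ran at ", format(Sys.time()))
}
```

Both are ordinary R functions and can be called directly while developing them; the tags only matter once the orchestrator reads the file.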
In most use cases, the code inside my_pipe would run an ETL job
(extract data from a source, transform it, and load it into a file
system or database). In technical terms, it’s the side effect of the
code, not its return value, that is important.
Here’s a more realistic, albeit impractical, example:
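The pipeline below is a minimal sketch of that idea rather than a production job. It assumes a made-up "source" (randomly generated data) and a temporary CSV file as the destination; the column names and the output path are illustrative only.

```r
#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-05-24
#' @maestroTz UTC
#' @maestroLogLevel INFO
my_pipe <- function() {
  # Extract: stand-in for reading from a real source
  random_data <- data.frame(
    letter = sample(letters, 10),
    number = sample.int(10)
  )

  # Transform: add a derived column
  random_data$number_squared <- random_data$number^2

  # Load: write the result to a temporary file (the side effect that matters)
  out_path <- tempfile(fileext = ".csv")
  write.csv(random_data, out_path, row.names = FALSE)

  # The return value is incidental; the path is returned invisibly
  # only so the function is easy to check by hand
  invisible(out_path)
}
```

Calling my_pipe() directly is a quick way to confirm the job works before handing it to the orchestrator.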
A project with a single pipeline is ok, but maestro is more useful
when you have multiple jobs to run. You can add more pipelines to your
pipelines directory manually or with create_pipeline().
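Adding a pipeline manually just means saving another tagged function as a file in the pipelines folder. A minimal sketch of the manual route, assuming the working directory is the project root and reusing the another_pipe.R name from the project tree (the 3-hour schedule is illustrative):

```r
# Create the pipelines folder if it does not exist yet
dir.create("pipelines", showWarnings = FALSE)

# Write a new pipeline file: maestro tags followed by the function definition
writeLines(
  c(
    "#' another_pipe maestro pipeline",
    "#'",
    "#' @maestroFrequency 3 hours",
    "#' @maestroStartTime 2024-05-24",
    "#' @maestroTz UTC",
    "another_pipe <- function() {",
    "  # Pipeline code",
    "}"
  ),
  con = file.path("pipelines", "another_pipe.R")
)
```

In practice you would more likely create the file in an editor; the point is only that a pipeline is a plain .R file with tagged functions.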
The orchestrator is the process that schedules and monitors the pipelines.
The orchestrator can be an R script or a Quarto/RMarkdown doc, but here
we’ll use a regular R script. This is where you’ll run maestro
functions. The two main functions are build_schedule() and
run_schedule().
library(maestro)
schedule_table <- build_schedule()
output <- run_schedule(
  schedule_table,
  orch_frequency = "1 hour"
)
Building the schedule makes maestro look through the pipelines in the
pipelines folder and create a data.frame of schedule information. You
then pass that data.frame to run_schedule() along with how often the
orchestrator is supposed to run. It is important to tell maestro how
often it will be checking the pipelines using the orch_frequency
parameter. Here, we’re informing it that the orchestrator runs every
1 hour.
Importantly, it isn’t maestro’s job to actually run the orchestrator
this often - it’s your job to make sure it runs at that frequency
(e.g., by deploying it via cron or some cloud environment where code can
be scheduled).
The decision around how often to run the orchestrator depends on the frequencies of the pipelines in the project. The simplest guideline is to take the interval of your most frequently running pipeline and halve it. For example, if the most frequent pipeline runs every 1 day, the orchestrator should run every 12 hours.
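The halving guideline is simple arithmetic. The sketch below works it through for a hypothetical project; the pipeline names and intervals are made up, and it assumes orch_frequency accepts the same number-and-unit format as the tags.

```r
# Hypothetical pipeline intervals, in hours (names and values are illustrative)
pipeline_interval_hours <- c(my_pipe = 24, another_pipe = 72)

# Take the shortest interval (the most frequent pipeline) and halve it
orch_interval_hours <- min(pipeline_interval_hours) / 2

# Format as a number-and-unit string
orch_frequency <- paste(orch_interval_hours, "hours")
orch_frequency
#> [1] "12 hours"
```

The resulting string is what you would pass as orch_frequency to run_schedule(), and the external scheduler should then be set to trigger the orchestrator at that same interval.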