An introduction to meta-pipelines

Naren Srinivasan

11/19/2018

knitr::opts_chunk$set(
    eval = TRUE
  )
library(analysisPipelines)

Introduction

The meta-pipeline construct allows users to export a pipeline created for a particular use case as a general analysis flow, which can then be applied to a different dataset with a different set of parameters. With an ordinary pipeline, the data can change as long as it retains the same schema, but the set of parameters for the functions stays the same. With a meta-pipeline, only the analysis flow, function dependencies and so on are retained; the specific parameters for each function can be set differently for a new use case.
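The distinction between a fixed analysis flow and replaceable parameters can be pictured with a small base R sketch. It is only a toy model and says nothing about how analysisPipelines stores a meta-pipeline internally; the names `steps`, `flow`, `runFlow`, `paramsIris` and `paramsTooth` are hypothetical.

```r
# Toy model only: NOT the internal representation used by analysisPipelines.
# The functions that can appear in a flow (hypothetical helper list)
steps <- list(
  getColor = function(color) color,
  getColumnName = function(columnName) columnName
)

# The flow is just the ordered sequence of step names
flow <- c("getColor", "getColumnName")

# Two parameter sets for the same flow, one per use case
paramsIris <- list(
  getColor = list(color = "blue"),
  getColumnName = list(columnName = "Sepal.Length")
)
paramsTooth <- list(
  getColor = list(color = "green"),
  getColumnName = list(columnName = "len")
)

# Run every step of the flow with the arguments supplied for that step
runFlow <- function(flow, params) {
  out <- lapply(flow, function(step) do.call(steps[[step]], params[[step]]))
  names(out) <- flow
  out
}

irisRun <- runFlow(flow, paramsIris)
toothRun <- runFlow(flow, paramsTooth)
```

Here `flow` plays the role of the reusable part, while `paramsIris` and `paramsTooth` are the parts that change from one use case to the next.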

The objective of a meta-pipeline is to define and execute reusable analysis flows.

Using meta-pipelines

Creating a meta-pipeline

Through this package, a meta-pipeline is created by exporting an existing pipeline. As noted in the introduction, the export retains the analysis flow and the function dependencies.

In the example below, we first create a pipeline, similar to the one described in the other vignettes.

pipeline <- AnalysisPipeline(input = iris)
getColor <- function(color){
  return(color)
}

getColumnName <- function(columnName){
  return(columnName)
}

registerFunction(functionName = "getColor", isDataFunction = FALSE, firstArgClass = "character")
## INFO [2019-01-03 13:12:32] ||  Function 'getColor' was registered successfully  ||
registerFunction(functionName = "getColumnName", isDataFunction = FALSE, firstArgClass = "character")
## INFO [2019-01-03 13:12:32] ||  Function 'getColumnName' was registered successfully  ||
getRegistry()
## # A tibble: 16 x 7
##    functionName heading engine exceptionHandli… userDefined isDataFunction
##    <chr>        <chr>   <chr>  <chr>            <lgl>       <lgl>         
##  1 univarCatDi… Univar… r      genericPipeline… FALSE       TRUE          
##  2 outlierPlot  Univar… r      genericPipeline… FALSE       TRUE          
##  3 multiVarOut… Multiv… r      genericPipeline… FALSE       TRUE          
##  4 ignoreCols   Ignore… r      genericPipeline… FALSE       TRUE          
##  5 getFeatures… ""      r      genericPipeline… FALSE       TRUE          
##  6 getTargetFo… ""      r      genericPipeline… FALSE       TRUE          
##  7 castKafkaSt… Cast K… spark… genericPipeline… FALSE       TRUE          
##  8 convertKafk… Conver… spark… genericPipeline… FALSE       TRUE          
##  9 bivariatePl… Bivari… r      genericPipeline… TRUE        TRUE          
## 10 filterData   ""      r      genericPipeline… TRUE        TRUE          
## 11 summarizeDa… ""      r      genericPipeline… TRUE        TRUE          
## 12 plotLine     ""      r      genericPipeline… TRUE        TRUE          
## 13 plotSummary  ""      r      genericPipeline… TRUE        TRUE          
## 14 sampleFunct… Sample  r      sampleException  TRUE        TRUE          
## 15 getColor     ""      r      genericPipeline… TRUE        FALSE         
## 16 getColumnNa… ""      r      genericPipeline… TRUE        FALSE         
## # ... with 1 more variable: firstArgClass <chr>

We then generate an output from the pipeline to validate that it works properly. Generating output is not required in order to define a meta-pipeline.

pipeline %>>% getColor(color = "blue") %>>% getColumnName(columnName = "Sepal.Length") %>>%
      univarCatDistPlots(uniCol = "Species", priColor = ~f1, optionalPlots = 0, storeOutput = TRUE) %>>%
      outlierPlot(method = "iqr", columnName = ~f2, cutoffValue = 0.01, priColor = ~f1 , optionalPlots = 0) -> complexPipeline

complexPipeline %>>% getPipeline
## # A tibble: 4 x 6
##   id    operation      heading              parameters outAsIn storeOutput
##   <chr> <chr>          <chr>                <list>     <lgl>   <lgl>      
## 1 1     getColor       ""                   <list [1]> FALSE   FALSE      
## 2 2     getColumnName  ""                   <list [1]> FALSE   FALSE      
## 3 3     univarCatDist… Univariate Distribu… <list [4]> FALSE   TRUE       
## 4 4     outlierPlot    Univariate Outlier   <list [6]> FALSE   FALSE
complexPipeline %>>% prepExecution -> complexPipeline
## INFO [2019-01-03 13:12:32] ||  Pipeline Prep. STARTED  ||
## INFO [2019-01-03 13:12:32] ||  Pipeline Prep. COMPLETE. Time taken : 0.0202386379241943 seconds||
complexPipeline %>>% generateOutput -> op
## INFO [2019-01-03 13:12:32] ||  Engine Assessment for pipeline STARTED  ||
## INFO [2019-01-03 13:12:32] ||  Engine Assessment COMPLETE. Time taken : 0.0132045745849609 seconds||
## INFO [2019-01-03 13:12:32] ||  Pipeline Execution STARTED  ||
## INFO [2019-01-03 13:12:32] ||  Executing Batch Number : 1/2 containing functions 'getColor, getColumnName' ||
## INFO [2019-01-03 13:12:32] ||  Function ID '1' named 'getColor' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:32] ||  Function ID '1' named 'getColor' COMPLETED. Time taken : 0.00377798080444336 seconds  ||
## INFO [2019-01-03 13:12:32] ||  Function ID '2' named 'getColumnName' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:32] ||  Function ID '2' named 'getColumnName' COMPLETED. Time taken : 0.00416803359985352 seconds  ||
## INFO [2019-01-03 13:12:32] ||  Batch Number 1/2 COMPLETE. Time taken : 0.0216243267059326 seconds  ||
## INFO [2019-01-03 13:12:32] ||  Executing Batch Number : 2/2 containing functions 'univarCatDistPlots, outlierPlot' ||
## INFO [2019-01-03 13:12:32] ||  Cleared intermediate outputs which are not required  ||
## INFO [2019-01-03 13:12:32] ||  Function ID '3' named 'univarCatDistPlots' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:32] ||  Function ID '3' named 'univarCatDistPlots' COMPLETED. Time taken : 0.0113174915313721 seconds  ||
## INFO [2019-01-03 13:12:32] ||  Function ID '4' named 'outlierPlot' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:32] ||  Function ID '4' named 'outlierPlot' COMPLETED. Time taken : 0.01088547706604 seconds  ||
## INFO [2019-01-03 13:12:32] ||  Batch Number 2/2 COMPLETE. Time taken : 0.0379235744476318 seconds  ||
## INFO [2019-01-03 13:12:32] ||  Performing final garbage cleaning and collection of outputs  ||
## INFO [2019-01-03 13:12:32] ||  Pipeline Execution COMPLETE. Time taken : 0.0759339332580566 seconds||
op %>>% getOutputById("3")

Exporting and reusing for a different case

Once a pipeline has been created, whether batch or streaming, it can be exported using the exportAsMetaPipeline method. This returns an object of class MetaAnalysisPipeline, which stores the required information.

The meta-pipeline can be visualized in the same way as a normal pipeline object, by calling the visualizePipeline method on the MetaAnalysisPipeline object.

complexPipeline %>>% exportAsMetaPipeline -> complexMetaPipeline

# complexMetaPipeline %>>% visualizePipeline

Setting the new parameters

The next step in using the meta-pipeline is to create another pipeline with a different set of parameters. To do this, the user first exports the pipeline prototype, which contains the set of functions used in the pipeline and their respective arguments.

The pipeline prototype is exported as an object of class proto from the ‘proto’ package. A proto object is a thin skin over environments, with usability advantages such as using methods like names to get the names of the objects it contains, and using the ‘$’ operator to refer to specific objects. The aim of using this class is to provide an easy-to-use interface for setting the new values of the arguments.

The pipeline prototype has a nested structure. The first level is a list of objects representing the functions in the pipeline; a specific function is referred to by its name. The second level is the list of arguments for each of those functions, again referred to by name.

The new values of the parameters can be set by using the ‘$’ operator to refer to the values. By default, the exported pipeline prototype contains the parameter values defined in the original pipeline. The user therefore only needs to change the values that differ for the new use case, which may be some or all of the parameters.
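Since a proto object is built on an environment, the nested access pattern described above can be sketched with plain base R environments. This is only a stand-in: `protoLike` is a hypothetical object made with `new.env()`, not an actual pipeline prototype, and it does not reproduce any proto-specific behaviour.

```r
# Hypothetical stand-in for a pipeline prototype, built from plain environments
protoLike <- new.env()

# First level: one entry per function in the pipeline
protoLike$getColor <- new.env()
protoLike$getColumnName <- new.env()

# Second level: the arguments of each function, holding the original values
protoLike$getColor$color <- "blue"
protoLike$getColumnName$columnName <- "Sepal.Length"

# List what is stored at each level
functionNames <- ls(protoLike)
colorArgs <- ls(protoLike$getColor)

# Environments are modified in place: a change made through another
# reference to the same object is visible through the original name
alias <- protoLike
alias$getColor$color <- "green"
updatedColor <- protoLike$getColor$color
```

The in-place behaviour is why assigning with ‘$’ is enough to update an environment-based object; there is no need to reassign the result to the original name.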

In the following example, we reconfigure the pipeline for use with the ‘ToothGrowth’ dataset.

pipelineProto <- getPipelinePrototype(complexMetaPipeline)
str(pipelineProto)
## proto object 
##  $ outlierPlot       :proto object  
##  $ getColumnName     :proto object  
##  $ univarCatDistPlots:proto object  
##  $ getColor          :proto object
# Setting new parameters for the ToothGrowth dataset
pipelineProto$getColor$color <- "green"
pipelineProto$getColumnName$columnName <- "len"
pipelineProto$univarCatDistPlots$uniCol <- "supp"

#complexMetaPipeline %>>% visualizePipeline
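Because the new parameter values are column names, it can help to confirm that they exist in the new dataset before building the pipeline instance. The following base R check is a suggested sanity step rather than part of the package workflow; `newParams` is a hypothetical helper list that mirrors the values set on the prototype above.

```r
# Hypothetical helper list mirroring the column names set on the prototype
newParams <- list(columnName = "len", uniCol = "supp")

# Every referenced column should be present in the new input data
colsPresent <- all(unlist(newParams) %in% names(datasets::ToothGrowth))

# The outlier column should be numeric and the distribution column categorical
lenIsNumeric <- is.numeric(datasets::ToothGrowth[[newParams$columnName]])
suppIsFactor <- is.factor(datasets::ToothGrowth[[newParams$uniCol]])

stopifnot(colsPresent, lenIsNumeric, suppIsFactor)
```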

Execution

Once the parameters have been set, a new executable pipeline object can be created by calling the createPipelineInstance method and passing it the meta-pipeline object and the pipeline prototype. This creates a pipeline object with the usual properties.

We set the input of the pipeline object to the ToothGrowth dataset and then execute it to generate the output.

complexMetaPipeline %>>% createPipelineInstance(pipelineProto) -> newPipelineObj

newPipelineObj %>>% setInput(input = ToothGrowth) -> newPipelineObj

newPipelineObj %>>% generateOutput %>>% getOutputById("3")
## INFO [2019-01-03 13:12:32] ||  Engine Assessment for pipeline STARTED  ||
## INFO [2019-01-03 13:12:32] ||  Engine Assessment COMPLETE. Time taken : 0.0137145519256592 seconds||
## INFO [2019-01-03 13:12:32] ||  Pipeline Prep. STARTED  ||
## INFO [2019-01-03 13:12:32] ||  Pipeline Prep. COMPLETE. Time taken : 0.00865006446838379 seconds||
## INFO [2019-01-03 13:12:32] ||  Pipeline Execution STARTED  ||
## INFO [2019-01-03 13:12:32] ||  Executing Batch Number : 1/2 containing functions 'getColor, getColumnName' ||
## INFO [2019-01-03 13:12:32] ||  Function ID '1' named 'getColor' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:32] ||  Function ID '1' named 'getColor' COMPLETED. Time taken : 0.00568485260009766 seconds  ||
## INFO [2019-01-03 13:12:32] ||  Function ID '2' named 'getColumnName' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:32] ||  Function ID '2' named 'getColumnName' COMPLETED. Time taken : 0.00609040260314941 seconds  ||
## INFO [2019-01-03 13:12:32] ||  Batch Number 1/2 COMPLETE. Time taken : 0.0257868766784668 seconds  ||
## INFO [2019-01-03 13:12:32] ||  Executing Batch Number : 2/2 containing functions 'univarCatDistPlots, outlierPlot' ||
## INFO [2019-01-03 13:12:32] ||  Cleared intermediate outputs which are not required  ||
## INFO [2019-01-03 13:12:32] ||  Function ID '3' named 'univarCatDistPlots' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:32] ||  Function ID '3' named 'univarCatDistPlots' COMPLETED. Time taken : 0.0120668411254883 seconds  ||
## INFO [2019-01-03 13:12:32] ||  Function ID '4' named 'outlierPlot' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:32] ||  Function ID '4' named 'outlierPlot' COMPLETED. Time taken : 0.0106847286224365 seconds  ||
## INFO [2019-01-03 13:12:32] ||  Batch Number 2/2 COMPLETE. Time taken : 0.0378713607788086 seconds  ||
## INFO [2019-01-03 13:12:32] ||  Performing final garbage cleaning and collection of outputs  ||
## INFO [2019-01-03 13:12:32] ||  Pipeline Execution COMPLETE. Time taken : 0.0797655582427979 seconds||

Saving and loading meta-pipelines

Similar to pipelines, meta-pipelines can be saved and loaded using the savePipeline method and the loadMetaPipeline function. As with pipelines, when a meta-pipeline is loaded, it overwrites the existing registry with the registry stored with the meta-pipeline.

complexMetaPipeline %>>% savePipeline("metaPipeline.RDS")

# Checking if the registry is updated
getC <- function(color){
  return(color)
}

getCol <- function(columnName){
  return(columnName)
}

registerFunction(functionName = "getC", isDataFunction = FALSE, firstArgClass = "character")
registerFunction(functionName = "getCol", isDataFunction = FALSE, firstArgClass = "character")

getRegistry()
loadMetaPipeline(path = "metaPipeline.RDS") -> loadedMetaPipeline
getRegistry()

pipelineProtoLoaded <- getPipelinePrototype(loadedMetaPipeline)
str(pipelineProtoLoaded)

pipelineProtoLoaded$getColor$color <- "green"
pipelineProtoLoaded$getColumnName$columnName <- "Sepal.Length"
pipelineProtoLoaded$univarCatDistPlots$uniCol <- "Species"

loadedMetaPipeline %>>% createPipelineInstance(pipelineProtoLoaded) -> newPipelineObjLoaded

newPipelineObjLoaded %>>% setInput(input = iris) %>>%
                        generateOutput %>>% getOutputById("3")
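The registry overwrite described in this section can be pictured with a toy base R model using `saveRDS` and `readRDS`. It is a sketch under the assumption that a saved object carries its own copy of the registry; the names `registry` and `bundle` are hypothetical and the code does not reflect the actual file layout used by savePipeline or loadMetaPipeline.

```r
# Toy model only: NOT the file layout used by savePipeline / loadMetaPipeline.
# A "registry" of known function names at the time of saving
registry <- c("getColor", "getColumnName")

# Bundle the flow together with the registry and write it to disk
bundle <- list(flow = c("getColor", "getColumnName"), registry = registry)
path <- tempfile(fileext = ".RDS")
saveRDS(bundle, path)

# The session registry changes after the save
registry <- c(registry, "getC", "getCol")
registryBeforeLoad <- registry

# Loading replaces the session registry with the saved one,
# so the later registrations are no longer present
loaded <- readRDS(path)
registry <- loaded$registry

unlink(path)
```

In this model, functions registered after the save would need to be registered again after loading.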