Analysis pipelines - Core functionality and working with R data frames and functions

Naren Srinivasan, Sanjay

11/13/2018

An overview of the package

In a typical data science workflow there are multiple steps involved, from data aggregation and cleaning to exploratory analysis, modeling and so on. As the data science community matures, we are seeing a variety of languages provide better capabilities for specific steps in the data science workflow. R is typically used for data transformations, statistical models, and visualizations, while Python provides more robust functions for machine learning. In addition to this, Spark provides an environment to process high-volume data - both as one-off batches and as streams.

The job of today’s data scientist is changing from one where they are married to a specific tool or language, to one where they use all of these tools for their specialized purposes. The key problem then becomes one of translation between these tools for seamless analysis. Additionally, a data scientist often needs to perform the same task repeatedly, as well as put certain analysis flows, or pipelines, into production to work on new data periodically, or to work on streaming data.

Recently, interfaces for using these various tools have been published. In terms of R packages, the reticulate package provides an interface to Python, and the SparkR and sparklyr packages provide an interface to Spark.

The analysisPipelines package uses these interfaces to enable interoperable pipelines, i.e. the ability to compose and execute a reusable data science pipeline which can contain functions to be executed in an R environment, a Python environment or a Spark environment. These pipelines can be saved and loaded, to enable batch operation as datasets get updated with new data.

The goal of the analysisPipelines package is to make the job of the data scientist easier, and to help them compose pipelines of analysis which consist of data manipulation, exploratory analysis & reporting, as well as modeling steps. The idea is for data scientists to use tools of their choice through an R interface, using this package.

Types of pipelines

This package supports both batch/repeated pipelines, as well as streaming pipelines.

For batch pipelines, the vision is to enable interoperable pipelines which execute efficiently with functions in R, Spark and Python.

For streaming pipelines, the package allows for streaming analyses through Apache Spark Structured Streaming.

Classes and implementation

The analysisPipelines package uses S4 classes and methods to implement all the core functionality. The fundamental class exposed in this package is the BaseAnalysisPipeline class on which most of the core functions are implemented. The user, however, interacts with the AnalysisPipeline and StreamingAnalysisPipeline classes for batch and streaming analysis respectively. In this vignette, we work with the AnalysisPipeline class, with functions solely in R.

Pipelining semantics

The package stays true to the tidyverse pipelining style, which also fits nicely with the idea of creating pipelines. The core mechanism in the package is to instantiate a pipeline with data, and then pipe the required functions into the object itself.

The package allows the use of either the magrittr pipe (%>%) or the pipeR pipe (%>>%).
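For instance, the two pipe operators can be used interchangeably when adding a step to a pipeline. A minimal sketch, mirroring the univarCatDistPlots call used later in this vignette:

```r
library(analysisPipelines)
library(magrittr) # provides %>%
library(pipeR)    # provides %>>%

obj <- AnalysisPipeline(input = iris)

# The same step, added with either pipe operator
objA <- obj %>%  univarCatDistPlots(uniCol = "Species", priColor = "blue",
                                    optionalPlots = 0, storeOutput = TRUE)
objB <- obj %>>% univarCatDistPlots(uniCol = "Species", priColor = "blue",
                                    optionalPlots = 0, storeOutput = TRUE)
```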

Supported engines

As of this version, the package supports functions executed in R, or in Spark through the SparkR interface, for batch pipelines. It also supports Spark Structured Streaming pipelines for streaming analyses. In subsequent releases, Python will also be supported.

Available vignettes

This package contains 5 vignettes.

Usage

Loading the package

library(analysisPipelines)

Creating an analysisPipeline object

An object of class AnalysisPipeline can be created like so:

obj <- AnalysisPipeline(input = iris)

class(obj)
## [1] "AnalysisPipeline"
## attr(,"package")
## [1] "analysisPipelines"

While initializing the object, an input dataframe can be provided on which the pipeline should work, either by providing the path to a .csv file through the filePath argument, or by providing an R dataframe available in the session through the input argument.
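For example, both initialization routes look like this (the .csv path below is hypothetical, for illustration only):

```r
library(analysisPipelines)

# Initialize with a dataframe already available in the session
obj <- AnalysisPipeline(input = iris)

# Or initialize from a .csv file on disk, through the filePath argument
# (hypothetical path shown)
objFromFile <- AnalysisPipeline(filePath = "path/to/dataset.csv")
```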

The AnalysisPipeline object has a set of getters, for retrieving various slots containing data and metadata required for pipeline execution. The most basic of them is the getInput method which retrieves the input dataframe with which the object has been initialized. If not initialized with a dataframe, the setInput method can be used to do so.

obj %>>% getInput %>>% str
## 'data.frame':    150 obs. of  5 variables:
##  $ Sepal.Length: num  5.1 4.9 4.7 4.6 5 5.4 4.6 5 4.4 4.9 ...
##  $ Sepal.Width : num  3.5 3 3.2 3.1 3.6 3.9 3.4 3.4 2.9 3.1 ...
##  $ Petal.Length: num  1.4 1.4 1.3 1.5 1.4 1.7 1.4 1.5 1.4 1.5 ...
##  $ Petal.Width : num  0.2 0.2 0.2 0.2 0.2 0.4 0.3 0.2 0.2 0.1 ...
##  $ Species     : Factor w/ 3 levels "setosa","versicolor",..: 1 1 1 1 1 1 1 1 1 1 ...
getRegistry()
## # A tibble: 8 x 7
##   functionName heading engine exceptionHandli… userDefined isDataFunction
##   <chr>        <chr>   <chr>  <chr>            <lgl>       <lgl>         
## 1 univarCatDi… Univar… r      genericPipeline… FALSE       TRUE          
## 2 outlierPlot  Univar… r      genericPipeline… FALSE       TRUE          
## 3 multiVarOut… Multiv… r      genericPipeline… FALSE       TRUE          
## 4 ignoreCols   Ignore… r      genericPipeline… FALSE       TRUE          
## 5 getFeatures… ""      r      genericPipeline… FALSE       TRUE          
## 6 getTargetFo… ""      r      genericPipeline… FALSE       TRUE          
## 7 castKafkaSt… Cast K… spark… genericPipeline… FALSE       TRUE          
## 8 convertKafk… Conver… spark… genericPipeline… FALSE       TRUE          
## # ... with 1 more variable: firstArgClass <chr>

The getRegistry function retrieves the set of functions and their metadata available for pipelining. Any AnalysisPipeline object comes with a set of pre-registered functions which can be used out-of-the-box. Of course, the user can register her own functions, to be used in the pipeline. We will look at this later on in the vignette.

There are two types of functions which can be pipelined: data functions, whose first argument is always a dataframe (the pipeline's input, or an upstream output, is passed to this argument implicitly), and non-data functions, whose arguments are independent of the input data.

Both pre-registered and user-defined functions work with the AnalysisPipeline object in the same way i.e. regardless of who writes the function, they follow the same semantics.

Creating a simple pipeline

We’ll now take a look at creating a simple pipeline with some of the pre-registered functions available in the registry. We pipe in the univarCatDistPlots function (a pre-registered utility function which generates a chart showing the distribution of a categorical variable in a dataset), simply by using the pipe or double pipe operator and providing the required additional parameters. The data on which it needs to operate is not provided, as we have already initialized the AnalysisPipeline object with the data.

Note that unless assigned to the same or another object, the pipeline does not get stored.

We can access the details of the pipeline as a tibble through the getPipeline method.

# Running univariate categorical distribution plot on the constructed object
# ?analysisPipelines::univarCatDistPlots
obj1 <- obj %>>% univarCatDistPlots(uniCol = "Species", priColor = "blue", optionalPlots = 0, storeOutput = T)
obj1 %>>% getPipeline
## # A tibble: 1 x 6
##   id    operation      heading              parameters outAsIn storeOutput
##   <chr> <chr>          <chr>                <list>     <lgl>   <lgl>      
## 1 1     univarCatDist… Univariate Distribu… <list [4]> FALSE   TRUE

We now construct a pipeline with a couple of functions, one to generate a categorical distribution plot, and another one for outliers.

# Running univariate categorical distribution plot and then 
# outlier detection on the constructed object

obj %>>% univarCatDistPlots(uniCol = "Species", priColor = "green", optionalPlots = 0) %>>% 
         outlierPlot(method = "iqr", columnName = "Sepal.Length", 
              cutoffValue = 0.01, priColor = "blue", optionalPlots = 0, storeOutput = T) -> obj2
obj2 %>>% getPipeline
## # A tibble: 2 x 6
##   id    operation      heading              parameters outAsIn storeOutput
##   <chr> <chr>          <chr>                <list>     <lgl>   <lgl>      
## 1 1     univarCatDist… Univariate Distribu… <list [4]> FALSE   FALSE      
## 2 2     outlierPlot    Univariate Outlier   <list [6]> FALSE   TRUE

Apart from the parameters required by the function itself, a couple of additional parameters can be passed to registered functions when adding them to the pipeline: outAsIn, which specifies whether the output of the previous function in the pipeline should be used as the data input of this function, and storeOutput, which specifies whether the output of this function should be stored in the object when the pipeline is executed.

Lazy evaluation

It is important to note that the functions in the pipeline are not actually called when the pipeline is defined. This can be seen by looking at the output slot of the pipeline we defined above.

length(obj1@output)
## [1] 0

The pipeline can be executed by explicitly calling the generateOutput method and assigning it to a variable.

obj1Output <- obj1 %>>% generateOutput
## INFO [2019-01-03 13:12:28] ||  Engine Assessment for pipeline STARTED  ||
## INFO [2019-01-03 13:12:29] ||  Engine Assessment COMPLETE. Time taken : 1.28892827033997 seconds||
## INFO [2019-01-03 13:12:29] ||  Pipeline Prep. STARTED  ||
## INFO [2019-01-03 13:12:29] ||  Pipeline Prep. COMPLETE. Time taken : 0.0369842052459717 seconds||
## INFO [2019-01-03 13:12:29] ||  Pipeline Execution STARTED  ||
## INFO [2019-01-03 13:12:29] ||  Executing Batch Number : 1/1 containing functions 'univarCatDistPlots' ||
## INFO [2019-01-03 13:12:29] ||  Function ID '1' named 'univarCatDistPlots' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:29] ||  Function ID '1' named 'univarCatDistPlots' COMPLETED. Time taken : 0.141370058059692 seconds  ||
## INFO [2019-01-03 13:12:29] ||  Batch Number 1/1 COMPLETE. Time taken : 0.15303111076355 seconds  ||
## INFO [2019-01-03 13:12:29] ||  Performing final garbage cleaning and collection of outputs  ||
## INFO [2019-01-03 13:12:29] ||  Pipeline Execution COMPLETE. Time taken : 0.171310663223267 seconds||
length(obj1Output@output)
## [1] 1

Observe that the length of the output list of the first object (the chunk above) remains 0 since the generated output was assigned to a new object.

length(obj1@output)
## [1] 0

A specific output can be viewed by providing the ID of the function in the pipeline to the getOutputById method. The ID can be obtained from the id column of the pipeline tibble, corresponding to the function whose output is desired.

# The index can range from 1 to length(obj@output)
obj1Output %>>% getOutputById("1")

User-defined functions

Registering your own function

You can register your own data or non-data functions by calling registerFunction. This adds the user-defined function to the registry. The registry is maintained by the package and, once registered, functions can be used across pipeline objects. The registry can be viewed by calling the getRegistry function.

# Currently registered functions
getRegistry()
## # A tibble: 8 x 7
##   functionName heading engine exceptionHandli… userDefined isDataFunction
##   <chr>        <chr>   <chr>  <chr>            <lgl>       <lgl>         
## 1 univarCatDi… Univar… r      genericPipeline… FALSE       TRUE          
## 2 outlierPlot  Univar… r      genericPipeline… FALSE       TRUE          
## 3 multiVarOut… Multiv… r      genericPipeline… FALSE       TRUE          
## 4 ignoreCols   Ignore… r      genericPipeline… FALSE       TRUE          
## 5 getFeatures… ""      r      genericPipeline… FALSE       TRUE          
## 6 getTargetFo… ""      r      genericPipeline… FALSE       TRUE          
## 7 castKafkaSt… Cast K… spark… genericPipeline… FALSE       TRUE          
## 8 convertKafk… Conver… spark… genericPipeline… FALSE       TRUE          
## # ... with 1 more variable: firstArgClass <chr>

In order to register a function, it must first be defined in the global environment, before calling registerFunction.

bivariatePlots <- function(dataset, select_var_name_1, select_var_name_2, 
                       priColor = "blue", secColor='black') {
  x=dataset[, select_var_name_1]
  y=dataset[, select_var_name_2]
  bivarPlot <- ggplot2::ggplot(dataset, ggplot2::aes(x,y)) +
    ggplot2::geom_point(color=priColor,alpha=0.7) +
    ggplot2::geom_smooth(method = lm,color=secColor) +
    ggplot2::xlab(select_var_name_1) +
    ggplot2::ylab(select_var_name_2) + 
    ggplot2::theme_bw() +
    ggplot2::ggtitle(paste('Bivariate plot for', select_var_name_1, 
                           'and', select_var_name_2, sep=' ')) +
    ggplot2::theme(plot.title = ggplot2::element_text(hjust = 0.5, size = 10), 
                   axis.text = ggplot2::element_text(size=10),
                   axis.title=ggplot2::element_text(size=10))
  return(bivarPlot)
}

registerFunction(functionName = "bivariatePlots", heading = "Bivariate Analysis")
## INFO [2019-01-03 13:12:30] ||  Function 'bivariatePlots' was registered successfully  ||

Adding the newly registered function to a pipeline

Now the newly registered user-defined function can be used as part of the pipeline, exactly as described before. For example, we add it to a pipeline which already contains some functions. The function then gets added to the end of the pipeline.

# Chaining the user-defined function to the object's pipeline where it was registered
obj2 <- obj2 %>>% 
  bivariatePlots(select_var_name_1 = 'Sepal.Length', select_var_name_2 = 'Sepal.Width', 
                 priColor = "blue", secColor = "black")

# Printing the updated pipeline
obj2 %>>% getPipeline 
## # A tibble: 3 x 6
##   id    operation      heading              parameters outAsIn storeOutput
##   <chr> <chr>          <chr>                <list>     <lgl>   <lgl>      
## 1 1     univarCatDist… Univariate Distribu… <list [4]> FALSE   FALSE      
## 2 2     outlierPlot    Univariate Outlier   <list [6]> FALSE   TRUE       
## 3 3     bivariatePlots Bivariate Analysis   <list [5]> FALSE   FALSE

The newly added function is also lazily evaluated, executing only when output generation is triggered.

obj2Output <- obj2 %>>% generateOutput()
## INFO [2019-01-03 13:12:30] ||  Engine Assessment for pipeline STARTED  ||
## INFO [2019-01-03 13:12:30] ||  Engine Assessment COMPLETE. Time taken : 0.0128231048583984 seconds||
## INFO [2019-01-03 13:12:30] ||  Pipeline Prep. STARTED  ||
## INFO [2019-01-03 13:12:30] ||  Pipeline Prep. COMPLETE. Time taken : 0.0187497138977051 seconds||
## INFO [2019-01-03 13:12:30] ||  Pipeline Execution STARTED  ||
## INFO [2019-01-03 13:12:30] ||  Executing Batch Number : 1/1 containing functions 'univarCatDistPlots, outlierPlot, bivariatePlots' ||
## INFO [2019-01-03 13:12:30] ||  Function ID '1' named 'univarCatDistPlots' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:30] ||  Function ID '1' named 'univarCatDistPlots' COMPLETED. Time taken : 0.0697989463806152 seconds  ||
## INFO [2019-01-03 13:12:30] ||  Function ID '2' named 'outlierPlot' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:30] ||  Function ID '2' named 'outlierPlot' COMPLETED. Time taken : 0.0136368274688721 seconds  ||
## INFO [2019-01-03 13:12:30] ||  Function ID '3' named 'bivariatePlots' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:30] ||  Function ID '3' named 'bivariatePlots' COMPLETED. Time taken : 0.0213360786437988 seconds  ||
## INFO [2019-01-03 13:12:30] ||  Batch Number 1/1 COMPLETE. Time taken : 0.122961521148682 seconds  ||
## INFO [2019-01-03 13:12:30] ||  Performing final garbage cleaning and collection of outputs  ||
## INFO [2019-01-03 13:12:30] ||  Pipeline Execution COMPLETE. Time taken : 0.137528419494629 seconds||
obj2Output %>>% getOutputById("1")
## $call
## # A tibble: 1 x 7
##   id    operation   heading    parameters outAsIn storeOutput dependencies
##   <chr> <chr>       <chr>      <list>     <lgl>   <lgl>       <list>      
## 1 1     univarCatD… Univariat… <list [4]> FALSE   FALSE       <chr [0]>   
## 
## $output
## [1] "The output of this function was not configured to be stored"
obj2Output %>>% getPipeline()
## # A tibble: 3 x 7
##   id    operation   heading    parameters outAsIn storeOutput dependencies
##   <chr> <chr>       <chr>      <list>     <lgl>   <lgl>       <list>      
## 1 1     univarCatD… Univariat… <list [4]> FALSE   FALSE       <chr [0]>   
## 2 2     outlierPlot Univariat… <list [6]> FALSE   TRUE        <chr [0]>   
## 3 3     bivariateP… Bivariate… <list [5]> FALSE   FALSE       <chr [0]>

Complex pipelines and formula semantics

In addition to the simple pipelines described above, more complex pipelines can also be defined. There are cases where the outputs of previous functions in the pipeline need to be used as inputs to arbitrary parameters of subsequent functions. This is apart from the data which is passed through in a more native manner, via the outAsIn argument, to subsequent functions in the pipeline.

Using formulae in pipelines

The package defines certain formula semantics to accomplish this. To illustrate how this works, we take the example of two simple user-defined functions: one which simply returns the color for a graph, and another which returns the column on which the graph should be plotted.

getColor <- function(color){
  return(color)
}

getColumnName <-function(columnName){
  return(columnName)
}

registerFunction(functionName = "getColor", isDataFunction = F, firstArgClass = "character")
## INFO [2019-01-03 13:12:30] ||  Function 'getColor' was registered successfully  ||
registerFunction(functionName = "getColumnName", isDataFunction = F, firstArgClass = "character")
## INFO [2019-01-03 13:12:30] ||  Function 'getColumnName' was registered successfully  ||
getRegistry()
## # A tibble: 11 x 7
##    functionName heading engine exceptionHandli… userDefined isDataFunction
##    <chr>        <chr>   <chr>  <chr>            <lgl>       <lgl>         
##  1 univarCatDi… Univar… r      genericPipeline… FALSE       TRUE          
##  2 outlierPlot  Univar… r      genericPipeline… FALSE       TRUE          
##  3 multiVarOut… Multiv… r      genericPipeline… FALSE       TRUE          
##  4 ignoreCols   Ignore… r      genericPipeline… FALSE       TRUE          
##  5 getFeatures… ""      r      genericPipeline… FALSE       TRUE          
##  6 getTargetFo… ""      r      genericPipeline… FALSE       TRUE          
##  7 castKafkaSt… Cast K… spark… genericPipeline… FALSE       TRUE          
##  8 convertKafk… Conver… spark… genericPipeline… FALSE       TRUE          
##  9 bivariatePl… Bivari… r      genericPipeline… TRUE        TRUE          
## 10 getColor     ""      r      genericPipeline… TRUE        FALSE         
## 11 getColumnNa… ""      r      genericPipeline… TRUE        FALSE         
## # ... with 1 more variable: firstArgClass <chr>

In the following pipeline, we pass the color and column name we want to the non-data functions we defined, and use their outputs in subsequent functions. This can be done simply by specifying a formula of the form ~fid against the argument to which we want to pass the output, where id is the ID of the function in the pipeline. For example, to pass the output of the function with ID ‘1’ as an argument to a parameter of a subsequent function, the formula ~f1 is passed to that corresponding argument.

obj %>>% getColor(color = "blue") %>>% getColumnName(columnName = "Petal.Length") %>>%
      univarCatDistPlots(uniCol = "Species", priColor = ~f1, optionalPlots = 0, storeOutput = T) %>>%
      outlierPlot(method = "iqr", columnName = ~f2, cutoffValue = 0.01, priColor = ~f1 , optionalPlots = 0) -> complexPipeline

complexPipeline %>>% getPipeline
## # A tibble: 4 x 6
##   id    operation      heading              parameters outAsIn storeOutput
##   <chr> <chr>          <chr>                <list>     <lgl>   <lgl>      
## 1 1     getColor       ""                   <list [1]> FALSE   FALSE      
## 2 2     getColumnName  ""                   <list [1]> FALSE   FALSE      
## 3 3     univarCatDist… Univariate Distribu… <list [4]> FALSE   TRUE       
## 4 4     outlierPlot    Univariate Outlier   <list [6]> FALSE   FALSE
complexPipeline %>>% generateOutput -> op
## INFO [2019-01-03 13:12:30] ||  Engine Assessment for pipeline STARTED  ||
## INFO [2019-01-03 13:12:30] ||  Engine Assessment COMPLETE. Time taken : 0.012681245803833 seconds||
## INFO [2019-01-03 13:12:30] ||  Pipeline Prep. STARTED  ||
## INFO [2019-01-03 13:12:30] ||  Pipeline Prep. COMPLETE. Time taken : 0.0110037326812744 seconds||
## INFO [2019-01-03 13:12:30] ||  Pipeline Execution STARTED  ||
## INFO [2019-01-03 13:12:30] ||  Executing Batch Number : 1/2 containing functions 'getColor, getColumnName' ||
## INFO [2019-01-03 13:12:30] ||  Function ID '1' named 'getColor' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:30] ||  Function ID '1' named 'getColor' COMPLETED. Time taken : 0.0036170482635498 seconds  ||
## INFO [2019-01-03 13:12:30] ||  Function ID '2' named 'getColumnName' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:30] ||  Function ID '2' named 'getColumnName' COMPLETED. Time taken : 0.00403928756713867 seconds  ||
## INFO [2019-01-03 13:12:30] ||  Batch Number 1/2 COMPLETE. Time taken : 0.0178816318511963 seconds  ||
## INFO [2019-01-03 13:12:30] ||  Executing Batch Number : 2/2 containing functions 'univarCatDistPlots, outlierPlot' ||
## INFO [2019-01-03 13:12:30] ||  Cleared intermediate outputs which are not required  ||
## INFO [2019-01-03 13:12:30] ||  Function ID '3' named 'univarCatDistPlots' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:30] ||  Function ID '3' named 'univarCatDistPlots' COMPLETED. Time taken : 0.0115842819213867 seconds  ||
## INFO [2019-01-03 13:12:30] ||  Function ID '4' named 'outlierPlot' STARTED on the 'r' engine ||
## INFO [2019-01-03 13:12:30] ||  Function ID '4' named 'outlierPlot' COMPLETED. Time taken : 0.0538463592529297 seconds  ||
## INFO [2019-01-03 13:12:30] ||  Batch Number 2/2 COMPLETE. Time taken : 0.0800571441650391 seconds  ||
## INFO [2019-01-03 13:12:30] ||  Performing final garbage cleaning and collection of outputs  ||
## INFO [2019-01-03 13:12:30] ||  Pipeline Execution COMPLETE. Time taken : 0.113564252853394 seconds||
op %>>% getOutputById("3")

Differences between outAsIn and formulae - When to use what

The package provides two mechanisms to pass outputs from previous functions to subsequent ones. The first is the outAsIn parameter. This is limited to data transformations: only the output of the immediately preceding function in the pipeline can be passed on, as the data input of the next. It is provided as an easy-to-use, intuitive interface for the common use case of performing a set of sequential data transformations on the input data before performing some kind of analysis. It should therefore be sufficient for simple, linear pipelines.

Formula semantics are provided to implement more complex pipelines, and are not limited to data parameters. Any type of object which is the output of a previous function can be used in a subsequent function. The typical use of these formulae is to provide parameters to certain functions in the pipeline which are the results of previous functions. Formula semantics can be used for the data parameter of data functions as well; in that case, the output of the specified function is used instead of the input data the object was instantiated with.

In essence, you can implement any kind of complex pipeline with formula semantics. The outAsIn parameter is provided as an easy-to-use shortcut for simpler linear pipelines. When a formula is specified for the first argument, the outAsIn parameter is rendered irrelevant.

exampleObj <- AnalysisPipeline(input = iris)

filterData <- function(dataset, conditionVar, val){
  condition <- paste0(conditionVar, '== "', val, '"')
  dataSub <- dataset %>>% dplyr::filter_(condition)
  return(dataSub)
}

summarizeData <- function(dataset, conditionVar){
  dataGrouped <-  dataset %>>% dplyr::group_by_(conditionVar) %>>% dplyr::summarise_all(dplyr::funs(avg = mean))
  return(dataGrouped)
}

plotLine <- function(dataset, y, x){
    p <- ggplot2::ggplot(data = dataset, ggplot2::aes_string(y = y, x = x)) + 
         ggplot2::geom_line(color = "blue")
  return(p)
}

plotSummary <- function(d, summaryVar, groupingVar){
  p <- ggplot2::ggplot(data = d, ggplot2::aes_string(y = paste0(summaryVar, "_avg"), x = groupingVar)) + 
         ggplot2::geom_col(fill = "blue")
  return(p)
}

registerFunction("filterData")
## INFO [2019-01-03 13:12:31] ||  Function 'filterData' was registered successfully  ||
registerFunction("summarizeData")
## INFO [2019-01-03 13:12:31] ||  Function 'summarizeData' was registered successfully  ||
registerFunction("plotLine")
## INFO [2019-01-03 13:12:31] ||  Function 'plotLine' was registered successfully  ||
registerFunction("plotSummary")
## INFO [2019-01-03 13:12:31] ||  Function 'plotSummary' was registered successfully  ||
exampleObj %>>%  summarizeData(conditionVar = "Species") %>>% 
                filterData(conditionVar = "Species", val = "setosa", outAsIn = F, storeOutput = F) %>>%
                plotLine(y = "Sepal.Length", x = "Sepal.Width", outAsIn = T, storeOutput = T) %>>%
                plotSummary(d = ~f1, summaryVar = "Sepal.Length", groupingVar = "Species", storeOutput = T) ->
  exampleObj

#exampleObj %>>% visualizePipeline
#exampleObj %>>% generateOutput %>>% getOutputById("4")

Visualizing pipelines

These pipelines can be visualized by calling the visualizePipeline method. This generates the whole pipeline as a network, showing the engines on which each function is run, and which outputs are stored.

complexPipeline %>>% visualizePipeline

Generating reports

The generated output can easily be rendered into a beautiful, formatted report by calling the generateReport method. The report is an HTML file which contains the stored outputs, as well as a peek into the data and the pipeline that was executed.

obj2Output %>>% generateReport(path = '~/Desktop')
objRep <- obj %>>% bivariatePlots(select_var_name_1 = 'Petal.Length',select_var_name_2 =  'Petal.Width', 
                           priColor = "blue", secColor = "black")

objRep %>>% generateReport(path = '~/Desktop')

Saving and loading pipelines

Pipelines can also be saved and loaded for re-use and execution on new data. A pipeline can be saved by calling the savePipeline function. On save, both the pipeline and the existing state of the registry are saved (including both predefined and user-defined functions).

obj2 %>>% savePipeline(path = 'pipeline.RDS')

This pipeline can be loaded and, while loading, instantiated with a new dataset of the same schema as the one used in the original pipeline (in order to prevent errors). This can be used for a variety of purposes, such as re-executing the same analysis as datasets get updated with new data.

library(analysisPipelines)
obj3 <- loadPipeline(path = 'pipeline.RDS', input = iris) 
getRegistry()
obj2 %>% getPipeline

Details of execution

This section provides an overview of how pipelines are executed, along with additional features such as logging and exception handling.

Execution mechanism

The ‘analysisPipelines’ package internally converts the pipeline defined by the user into a directed graph which captures the dependencies of each function in the pipeline on data, on other arguments, as well as on the outputs of other functions.

Topological sort and ordering

When it is required to generate the output, the pipeline is first prepped by performing a topological sort of the directed graph, identifying sets, or batches, of independent functions and a sequence of batches for execution. A later release of the package will allow for parallel execution of these independent functions.
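The batching idea can be illustrated with a small sketch. This is a simplified illustration, not the package's internal implementation; the dependency list below is a made-up example where functions 1 and 2 are independent, 3 depends on 1, and 4 depends on 2 and 3:

```r
# Each element names a function ID and lists the IDs it depends on
deps <- list(`1` = character(0), `2` = character(0),
             `3` = c("1"), `4` = c("2", "3"))

# Repeatedly peel off the functions whose dependencies have all executed;
# each pass yields one batch of mutually independent functions
batchify <- function(deps) {
  remaining <- names(deps)
  batches <- list()
  while (length(remaining) > 0) {
    ready <- remaining[vapply(remaining,
               function(f) all(!(deps[[f]] %in% remaining)), logical(1))]
    batches[[length(batches) + 1]] <- ready
    remaining <- setdiff(remaining, ready)
  }
  batches
}

batchify(deps)
# Batch 1: functions "1" and "2" (independent of each other)
# Batch 2: function "3"
# Batch 3: function "4"
```

Functions within the same batch have no dependencies on each other, which is exactly what would make them candidates for parallel execution.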

Memory management & garbage cleaning

Memory is managed efficiently by storing only those outputs which the user has explicitly specified, and by holding intermediate outputs required by subsequent functions only for as long as they are needed. Garbage cleaning is performed after the execution of each batch in order to manage memory effectively.

Type conversions

In the case of interoperable pipelines executing across multiple engines such as R, Spark and Python, conversions between data types in the different engines are minimized by identifying the optimal number of type conversions before execution starts.

Logging & Execution times

The package provides logging capabilities for execution of pipelines, as you might have noted when the output was generated in sections above. By default, logs are written to the console, but alternatively the user can specify an output file to which the logs need to be written through the setLoggerDetails function.

obj2 %>>% setLoggerDetails(target = "file") -> obj2

Logs capture errors, as well as provide information on the steps being performed, execution times and so on.

Custom exception-handling

By default, when a function is registered, a generic exception-handling function, which captures the R error message in case of an error, is registered against it in the registry. The user can instead provide a custom exception-handling function at the time of registration. This function should take one argument: the error object.

sampleFunction <- function(text){
  return(text)
}

sampleException <- function(error){
  stop("There was an error with the provided input")
}

registerFunction(functionName = 'sampleFunction', "Sample", exceptionFunction = "sampleException")
## INFO [2019-01-03 13:12:31] ||  Function 'sampleFunction' was registered successfully  ||
getRegistry()
## # A tibble: 16 x 7
##    functionName heading engine exceptionHandli… userDefined isDataFunction
##    <chr>        <chr>   <chr>  <chr>            <lgl>       <lgl>         
##  1 univarCatDi… Univar… r      genericPipeline… FALSE       TRUE          
##  2 outlierPlot  Univar… r      genericPipeline… FALSE       TRUE          
##  3 multiVarOut… Multiv… r      genericPipeline… FALSE       TRUE          
##  4 ignoreCols   Ignore… r      genericPipeline… FALSE       TRUE          
##  5 getFeatures… ""      r      genericPipeline… FALSE       TRUE          
##  6 getTargetFo… ""      r      genericPipeline… FALSE       TRUE          
##  7 castKafkaSt… Cast K… spark… genericPipeline… FALSE       TRUE          
##  8 convertKafk… Conver… spark… genericPipeline… FALSE       TRUE          
##  9 bivariatePl… Bivari… r      genericPipeline… TRUE        TRUE          
## 10 getColor     ""      r      genericPipeline… TRUE        FALSE         
## 11 getColumnNa… ""      r      genericPipeline… TRUE        FALSE         
## 12 filterData   ""      r      genericPipeline… TRUE        TRUE          
## 13 summarizeDa… ""      r      genericPipeline… TRUE        TRUE          
## 14 plotLine     ""      r      genericPipeline… TRUE        TRUE          
## 15 plotSummary  ""      r      genericPipeline… TRUE        TRUE          
## 16 sampleFunct… Sample  r      sampleException  TRUE        TRUE          
## # ... with 1 more variable: firstArgClass <chr>

Saving and loading registries

Registries can be saved and loaded like pipelines. A characteristic of the registry is that it is shared across all pipelines in a particular session. Therefore, when a registry is loaded, it overwrites the existing registry in the session.

getRegistry()
saveRegistry("./registry.RDS")
##.rs.restartR() # Run this on console to restart the R session
rm(list=ls(all=TRUE)) # Remove all objects in the R environment
getRegistry()

loadRegistry("./registry.RDS")
getRegistry()