Interoperable analysis pipelines

Naren Srinivasan

11/13/2018

Introduction

This vignette explains how interoperable pipelines containing functions operating on different engines such as R, Spark and Python can be configured and executed through the analysisPipelines package. Currently, the package supports interoperable pipelines containing R and Spark batch functions.

If the package is new to you, it is recommended that you go through the Analysis pipelines - Core functionality and working with R data frames and functions vignette first.

Important Note

Using Spark as an engine requires the SparkR package to be installed. SparkR is distributed natively with Apache Spark and is not distributed on CRAN.

An example of an interoperable pipeline

In this vignette we demonstrate an interoperable pipeline built using the analysisPipelines package, which contains a couple of filtering/ aggregation functions performed in Spark, which is then subsequently visualized through R functions using ggplot2

Initializing a Spark connection from R and loading the data

We initialize a Spark session using the sparkRSessionCreateIfNotPresent helper function in the analysisPipelines package, which internally uses SparkR. We then read the data into the Spark session using functions in the SparkR package. In this case we read a .csv file, though SparkR can work with multiple other data sources

## Define these variables as per the configuration of your machine. This is just an example.
sparkHome <- "/Users/naren/softwares/spark-2.3.1-bin-hadoop2.7/"
sparkMaster <- "local[1]"
sparkPackages <- c("org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1")

sparkRSessionCreateIfNotPresent(sparkHome = sparkHome, master = sparkMaster, sparkPackages = sparkPackages)

inputDataset <- iris

# Replacing '.' in column names with '_' as SparkR is not able to deal with '.' in column names
colnames(inputDataset) <- gsub(".", "_", colnames(inputDataset), fixed = T)

Initializing Python connection

## Define these variables as per the configuration of your machine. This is just an example.

analysisPipelines::setPythonEnvir('python', '/Users/naren/anaconda3/bin/python')
os <- reticulate::import("os")
numpy <- reticulate::import("numpy")
pandas <- reticulate::import("pandas")
sklearn <- reticulate::import("sklearn")

reticulate::source_python(system.file("python/sampleFunctions.py", package = "analysisPipelines"))

reticulate::py_config()

Creating an analysisPipeline object

We then initialize an AnalysisPipeline, with the input dataset

pipelineObj <- AnalysisPipeline(input = inputDataset)

Registering functions to work in the Spark environment

In order to manipulate the data in the Spark environment, we define our own functions using SparkR interface functions. We then register these functions with the AnalysisPipeline object, so that they can be used in constructing a pipeline.

The getRegistry function lists all the registered functions, along with details such as which engine they should run on.

getSchema <- function(inputDataset) {
  sparkSchema <- SparkR::schema(inputDataset)
 return(sparkSchema)
}

filterData <- function(inputDataset, condition) {
  filteredData <- SparkR::filter(inputDataset, condition)
 return(filteredData)
}

registerFunction(functionName = "getSchema", engine = "spark") 
registerFunction(functionName = "filterData", engine = "spark") 


getRegistry()

Registering R functions

Similar to the Spark functions, we register some user-defined functions in R. In this case to plot a bivariate plot using ggplot2.

rBivarPlots <- function(dataset, select_var_name_1, select_var_name_2, priColor = "blue", secColor= "black") {

  numeric_cols <- unlist(getDatatype(dataset)['numeric_cols'])
  cat_cols <- unlist(getDatatype(dataset)['cat_cols'])

  if (select_var_name_1 %in% numeric_cols && select_var_name_2 %in% numeric_cols) {
    x = dataset[, select_var_name_1]
    y = dataset[, select_var_name_2]
    bivarPlot <-
      ggplot2::ggplot(dataset, ggplot2::aes(x, y)) +
      ggplot2::geom_point(color = priColor, alpha = 0.7) +
      ggplot2::geom_smooth(method = lm, color = secColor) +
      ggplot2::xlab(select_var_name_1) +
      ggplot2::ylab(select_var_name_2) + ggplot2::theme_bw() +
      ggplot2::ggtitle(paste(
        'Bivariate plot for',
        select_var_name_1,
        'and',
        select_var_name_2,
        sep = ' '
      )) +
      ggplot2::theme(
        plot.title = ggplot2::element_text(hjust = 0.5, size = 10),
        axis.text = ggplot2::element_text(size = 10),
        axis.title = ggplot2::element_text(size = 10)
      )



  } else if (select_var_name_1 %in% cat_cols &&
             select_var_name_2 %in% cat_cols) {
    new_df <- dataset %>% dplyr::group_by_(.dots=c(select_var_name_1,select_var_name_2)) %>% dplyr::summarise(n = dplyr::n())
    colfunc <- grDevices::colorRampPalette(c(priColor, "white" , secColor))
    colorvar <- length(unique(new_df[[select_var_name_2]]))
    a=as.vector(as.character(unique(new_df[[select_var_name_1]])))
    y=new_df[[select_var_name_1]]
    label=new_df[[select_var_name_2]]
    bivarPlot <-ggplot2::ggplot(new_df, ggplot2::aes(x = y, y= n, fill = label)) +
      ggplot2::geom_bar(position = "dodge", stat = "identity",alpha=0.9) +
      ggplot2::guides(fill=ggplot2::guide_legend(title=select_var_name_2)) +
      ggplot2::coord_flip()+
      ggplot2::xlab(select_var_name_1) +
      ggplot2::ylab("count") + ggplot2::theme_bw() +
      ggplot2::ggtitle(paste('Bivariate plot for',select_var_name_1,'and',select_var_name_2,sep=' '))+
      ggplot2::theme(plot.title = ggplot2::element_text(hjust = 0.5, size = 10),axis.text = ggplot2::element_text(size=10),
                     axis.title=ggplot2::element_text(size=10),legend.position="bottom",axis.text.x=ggplot2::element_text(angle=45, hjust=1))+ ggplot2::scale_fill_manual(values = colfunc(colorvar))


  } else {
    cols <- c(select_var_name_1, select_var_name_2)
    cat_col <- cols[which(cols %in% cat_cols)]
    num_col <- cols[which(cols %in% numeric_cols)]
    a = as.vector(as.character(unique(dataset[[cat_col]])))
    y = dataset[[cat_col]]
    x = dataset[[num_col]]
    bivarPlot <-
      ggplot2::ggplot(dataset, ggplot2::aes(x = y, y = x)) +
      ggplot2::geom_point(color = priColor, alpha = 0.7) +
      ggplot2::coord_flip() +
      ggplot2::xlab(cat_col) +
      ggplot2::ylab(num_col) + ggplot2::theme_bw() +
      ggplot2::ggtitle(paste(
        'Bivariate plot for',
        select_var_name_1,
        'and',
        select_var_name_2,
        sep = ' '
      )) +
      ggplot2::theme(
        plot.title = ggplot2::element_text(hjust = 0.5, size = 10),
        axis.text = ggplot2::element_text(size = 10),
        axis.title = ggplot2::element_text(size = 10)
      )
  }

  return(bivarPlot)
}

registerFunction(functionName = "rBivarPlots", engine = "r", heading = "Bivariate analysis")

getRegistry()

Registering Python functions

registerFunction("decisionTreeTrainAndTest", engine = "python", isDataFunction = F, firstArgClass = "numpy.ndarray")
getRegistry()

Interoperable pipeline containing R, Spark and Python functions

We first visualize the data without filtering:

pipelineObj %>>% rBivarPlots(select_var_name_1 = "Sepal_Length", select_var_name_2 =  "Sepal_Width", 
                     priColor = "blue", secColor = "green", storeOutput = T) -> vizWithoutFilterPipeline
vizWithoutFilterPipeline %>>% getPipeline
vizWithoutFilterPipeline %>>% assessEngineSetUp
vizWithoutFilterPipeline %>>% generateOutput -> opWithoutFilter
opWithoutFilter %>>% getOutputById(1)

We then perform filtering on one of the variables in Spark, before visualizing in R

pipelineObj %>>% filterData_spark(condition = "Species == 'setosa'") %>>% 
  rBivarPlots(select_var_name_1 = "Sepal_Length", select_var_name_2 =  "Sepal_Width", 
                     priColor = "blue", secColor = "green", outAsIn = T, storeOutput = T) -> singleFilterPipeline
singleFilterPipeline %>>% visualizePipeline

singleFilterPipeline %>>% generateOutput -> opWithFilter
opWithFilter %>>% getOutputById(2)

Finally, we show a case, where sequential filtering steps are performed in Spark, before visualizing in R, and running a decision tree model in Python.

Note, that in this case, getTargetForPyClassifcation and getTargetForPyClassification have been registered as data functions. Type conversions between R, Spark and Python for data functions are performed automatically by the package.

pipelineObj %>>% filterData_spark(condition = "Species == 'setosa' or Species == 'virginica'") %>>% 
  filterData_spark(condition = "Petal_Length > 3.7", outAsIn = T) %>>%
  rBivarPlots(select_var_name_1 = "Sepal_Length", select_var_name_2 =  "Sepal_Width", 
                     priColor = "blue", secColor = "green", outAsIn = T, storeOutput = T) %>>%
  getFeaturesForPyClassification(dataset = ~f2, featureNames = c("Sepal_Length",
                                                                 "Sepal_Width",
                                                                 "Petal_Length")) %>>% 
          getTargetForPyClassification(dataset = ~f2, targetVarName = "Species", positiveClass = "setosa") %>>%
          decisionTreeTrainAndTest_py(data = ~f4, target = ~f5, newData = ~f4, storeOutput = T) -> twoFilterPipeline

twoFilterPipeline %>>% visualizePipeline

twoFilterPipeline %>>% generateOutput -> opWith2Filters
opWith2Filters %>>% getOutputById(3)
opWith2Filters %>>% getOutputById(6)

Supplementary Note

The analysisPipelines package internally uses the SparkR package to interface with Spark. SparkR masks many typical data manipulation and processing functions from base as well as packages like dplyr. Therefore, ensure you use function scoping when calling a function.