Streaming Analysis Pipelines for working with Apache Spark Structured Streaming

Naren Srinivasan, Anoop S

9/11/2018

Introduction

This vignette shows how to use SparkR as an interface for running streaming Spark jobs from R with the analysisPipelines package. The main use case is implementing a pipeline on SparkR DataFrames for streaming data.

Important Note

Using Spark as an engine requires the SparkR package to be installed. SparkR is distributed natively with Apache Spark and is not distributed on CRAN. The SparkR version needs to directly map to the Spark version (hence the native distribution), and care needs to be taken to ensure that this is configured properly.

To install from GitHub, run the following command if you know the Spark version:

devtools::install_github('apache/spark@v2.x.x', subdir='R/pkg')

Alternatively, if Spark is already installed, SparkR can be installed by running the following terminal commands:

$ export SPARK_HOME=/path/to/spark/directory
$ cd $SPARK_HOME/R/lib/SparkR/
$ R -e "devtools::install('.')"
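Since the SparkR version has to map directly to the Spark version, a quick check after installation can catch a mismatch early. The following is only a sketch: `sparkRMatchesSpark` is a hypothetical helper (not part of SparkR or analysisPipelines), and the version string in the example call is an assumed value that should be replaced with the Spark version actually in use.

```r
# Hypothetical helper: TRUE only if SparkR is installed and its package
# version equals the Spark version you expect to run against.
sparkRMatchesSpark <- function(expectedSparkVersion) {
  if (!requireNamespace("SparkR", quietly = TRUE)) {
    return(FALSE)
  }
  installed <- as.character(utils::packageVersion("SparkR"))
  identical(installed, as.character(expectedSparkVersion))
}

# Example call; "2.3.1" is an assumed version, not one taken from this vignette.
sparkRMatchesSpark("2.3.1")
```

A result of FALSE means either that SparkR is not installed or that its version differs from the one passed in.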

Initialize libraries

knitr::opts_chunk$set(
    eval = FALSE
  )
# Load the packages used throughout this vignette
library(analysisPipelines)
library(SparkR)

Connect to Spark cluster

# sparkMaster and sparkPackages must be defined for your environment before this call
sparkRSessionCreateIfNotPresent(master = sparkMaster, sparkPackages = sparkPackages)
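The call above relies on `sparkMaster` and `sparkPackages`, which need to exist before the session is created. The values below are purely illustrative assumptions: a local single-threaded master, and Kafka connector coordinates for Scala 2.11 and Spark 2.3.1. Both need to be adapted to the Spark installation actually in use.

```r
# Illustrative values only; adjust them to your own installation.
sparkMaster <- "local[1]"  # assumption: a single-threaded local master

# Assumption: Kafka connector built for Scala 2.11 and Spark 2.3.1.
# The coordinates must match the Spark version that is actually installed.
sparkPackages <- c("org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1")
```

The Kafka connector is needed here because the stream in this vignette is read from Kafka.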

Streaming Analysis Pipelines using Apache Spark Structured Streaming

This example illustrates the use of pipelines for a streaming application. In this use case, streaming data is read from Kafka, aggregations are performed, and the output is written to the console.

Read stream from Kafka

Read streaming data from Kafka.

## Define these variables as per the configuration of your machine. The below example is just illustrative.

kafkaBootstrapServers <- "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092"
consumerTopic <- "topic1"
streamObj <- read.stream(source = "kafka", kafka.bootstrap.servers = kafkaBootstrapServers, subscribe = consumerTopic, startingOffsets="earliest")
printSchema(streamObj)
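The Kafka source delivers the message key and value as binary columns, so they usually have to be cast to strings and parsed before any analysis. The sketch below shows roughly what this could look like in plain SparkR. Both helper names are hypothetical and are not the package's own implementation. In Spark 2.x the parsed column appears to be named `jsontostructs(value)`, which would be consistent with the column name used in the user-defined functions of this vignette.

```r
# Hypothetical helper (not part of analysisPipelines): cast the binary
# Kafka key and value columns of a streaming SparkDataFrame to strings.
castKeyValueToString <- function(streamObj) {
  SparkR::selectExpr(streamObj, "CAST(key AS STRING)", "CAST(value AS STRING)")
}

# Hypothetical helper: parse the JSON string in the value column into a
# struct column, using a schema built with SparkR::structType().
parseJsonValue <- function(streamObj, schema) {
  SparkR::select(streamObj, SparkR::from_json(streamObj$value, schema))
}
```

Both functions only take effect when called on a streaming SparkDataFrame inside an active Spark session.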

User-defined Spark functions

Users can define their own functions and use them as part of the pipeline. These functions can cover data preparation, aggregations, casting data to a format suitable for the write stream, and so on.

# Function to convert datatype json struct to columns
convertStructToDf <- function(streamObj) {
  streamObj <- SparkR::select(streamObj,list(getField(streamObj$`jsontostructs(value)`,"bannerId"),
                                   getField(streamObj$`jsontostructs(value)`,"mobile"),
                                   getField(streamObj$`jsontostructs(value)`,"homeAppliance"),
                                   getField(streamObj$`jsontostructs(value)`,"gamingConsole"),
                                   getField(streamObj$`jsontostructs(value)`,"accessories"),
                                   getField(streamObj$`jsontostructs(value)`,"brand"),
                                   getField(streamObj$`jsontostructs(value)`,"previousPrice"),
                                   getField(streamObj$`jsontostructs(value)`,"currentPrice"),
                                   getField(streamObj$`jsontostructs(value)`,"discount"),
                                   getField(streamObj$`jsontostructs(value)`,"emi"),
                                   getField(streamObj$`jsontostructs(value)`,"crossSale"),
                                   getField(streamObj$`jsontostructs(value)`,"customerId"),
                                   getField(streamObj$`jsontostructs(value)`,"ts"),
                                   getField(streamObj$`jsontostructs(value)`,"click"),
                                   getField(streamObj$`jsontostructs(value)`,"conversion"),
                                   getField(streamObj$`jsontostructs(value)`,"age"),
                                   getField(streamObj$`jsontostructs(value)`,"income"),
                                   getField(streamObj$`jsontostructs(value)`,"maritalStatus"),
                                   getField(streamObj$`jsontostructs(value)`,"segment")))
  colnames(streamObj) <- c("bannerId","mobile","homeAppliance","gamingConsole","accessories","brand","previousPrice","currentPrice",
                           "discount","emi","crossSale","customerId","ts","click","conversion","age","income","maritalStatus","segment")
  return(streamObj)
}

# Function to cast columns as string, integer, etc
castDfColumns <- function(streamObj) {
  streamObj <- SparkR::selectExpr(streamObj, "bannerId","mobile","homeAppliance","gamingConsole","accessories","brand",
                          "CAST(previousPrice as INTEGER)","CAST(currentPrice as INTEGER)","CAST(discount as INTEGER)","emi",
                          "crossSale","customerId","ts","CAST(click as INTEGER)","CAST(conversion as INTEGER)",
                          "CAST(age as INTEGER)","CAST(income as INTEGER)","maritalStatus","segment")
  streamObj$ts <- SparkR::to_timestamp(streamObj$ts,"yyyy-MM-dd HH:mm:ss")
  return (streamObj)
}

# Function to serialize each row to a JSON string and attach a Kafka key
convertDfToKafkaKeyValuePairs <- function (streamObj, kafkaKey) {
  streamObj <- SparkR::toJSON(streamObj)
  streamObj$key <- kafkaKey
  return(streamObj)
}

# Function to summarize click stream data
globalUiMetrics <- function (streamObj) {
  ## Aggregation query
  streamObj <- SparkR::summarize(SparkR::groupBy(streamObj,streamObj$bannerId),
                         impressions=count(streamObj$customerId),
                         clicks=sum(streamObj$click),
                         conversions=sum(streamObj$conversion))
  SparkR::colnames(streamObj) <- c("banner_id","impressions","clicks","conversions")
  return (streamObj)
}
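The expression list passed to `selectExpr` in `castDfColumns` follows a simple pattern: columns that should become integers are wrapped in a `CAST`, and all others pass through unchanged. As a sketch of that pattern, the hypothetical helper `buildCastExprs` (not part of the package) builds such a list from column names, which may be easier to maintain than writing each expression by hand.

```r
# Hypothetical helper: wrap the columns listed in intColumns in a CAST
# expression and leave every other column name unchanged.
buildCastExprs <- function(columns, intColumns) {
  ifelse(columns %in% intColumns,
         sprintf("CAST(%s as INTEGER)", columns),
         columns)
}

exprs <- buildCastExprs(c("bannerId", "click", "age"), c("click", "age"))
print(exprs)
```

Assuming an active Spark session, the resulting vector could then be passed on with something like `do.call(SparkR::selectExpr, c(list(streamObj), as.list(exprs)))`.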

Define pipeline object, register user-defined functions to the pipeline object

In order to use pipelines, a pipeline object needs to be defined. Note that streaming pipelines are defined using the StreamingAnalysisPipeline function, with the stream object passed as input.

Each user-defined function needs to be registered to the pipeline object. Once registered, the function can be used to construct a pipeline, which consists of one or more functions called in a particular sequence.

# Define pipeline object
pipelineObj <- analysisPipelines::StreamingAnalysisPipeline(input = streamObj)

consumerDataSchema <- structType(structField("bannerId", "string"),
                                 structField("mobile", "string"),
                                 structField("homeAppliance", "string"),
                                 structField("gamingConsole", "string"),
                                 structField("accessories", "string"),
                                 structField("brand", "string"),
                                 structField("previousPrice", "string"),
                                 structField("currentPrice", "string"),
                                 structField("discount", "string"),
                                 structField("emi", "string"),
                                 structField("crossSale", "string"),
                                 structField("customerId", "string"),
                                 structField("ts", "string"),
                                 structField("click", "string"),
                                 structField("conversion", "string"),
                                 structField("age", "string"),
                                 structField("income", "string"),
                                 structField("maritalStatus", "string"),
                                 structField("segment", "string"))

# Register user-defined functions
registerFunction("convertStructToDf", "", functionType = "streaming", engine = "spark-structured-streaming") 
registerFunction("castDfColumns", "", functionType = "streaming", engine = "spark-structured-streaming") 
registerFunction("convertDfToKafkaKeyValuePairs", "", functionType = "streaming", engine = "spark-structured-streaming")

getRegistry()

# Define pipeline 
# Do data prep
pipelineObj %>% castKafkaStreamAsString_sparkSS() %>%
  convertKafkaValueFromJson_sparkSS(schema = consumerDataSchema, outAsIn = TRUE) %>%
  convertStructToDf_sparkSS(outAsIn = TRUE) %>%
  castDfColumns_sparkSS(outAsIn = TRUE, storeOutput = TRUE) -> pipelineObj

pipelineObj %>>% getPipeline
pipelineObj %>>% visualizePipeline

Running the pipeline and generating an output

The pipeline is run by calling the generateOutput() function. The output attribute of the pipeline object contains the resultant Spark dataframe(s).

In this example, the streaming Spark DataFrame produced by the last step of the pipeline is retrieved by its ID.

## Run pipeline
pipelineObj %>% generateOutput() -> pipelineObj

## Retrieve the stored output of the last step (ID "4")
streamObj <- pipelineObj %>>% getOutputById("4")
streamObj
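To actually see results, the retrieved streaming DataFrame has to be written to a sink. The sketch below uses SparkR's `write.stream` with the console sink. `writeToConsole` and its default timeout are hypothetical, and the "append" output mode is an assumption that fits a stream without aggregations. An aggregated stream would typically need "complete" or "update" instead.

```r
# Hypothetical helper: start a streaming query that prints each micro-batch
# to the console, let it run for a limited time, then stop it.
writeToConsole <- function(streamDf, timeoutMs = 10000) {
  query <- SparkR::write.stream(streamDf, source = "console", outputMode = "append")
  # Block until the query terminates or the timeout (in milliseconds) elapses
  SparkR::awaitTermination(query, timeoutMs)
  SparkR::stopQuery(query)
  invisible(query)
}
```

With an active Spark session and a reachable Kafka cluster, it would be called as `writeToConsole(streamObj)`.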

Supplementary note

Currently, streaming pipelines are limited to executing linear flows, as this is constrained by Apache Spark Structured Streaming. Non-linear flows can be defined but might throw execution errors at runtime. Also, streaming pipelines can be implemented using only one engine, i.e. Apache Spark Structured Streaming.