The SparkR package lets you work with distributed data frames on top of a Spark cluster. These allow you to run operations such as selection, filtering and aggregation on very large datasets (a short sketch follows the links below).
SparkR overview
SparkR package documentation
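As a quick sketch of what such operations look like, assume a Spark DataFrame df with a numeric column col1 and a grouping column col2 has already been created (both names are purely illustrative; the setup steps are described below):
library(SparkR)
# select a single column:
head(select(df, df$col1))
# keep only rows matching a condition:
head(filter(df, df$col1 > 10))
# group by one column and aggregate another:
head(agg(groupBy(df, df$col2), avg(df$col1)))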
To start working with Spark's distributed data frames, you must connect your R program to an existing Spark cluster.
library(SparkR)
sc <- sparkR.init() # connection to Spark context
sqlContext <- sparkRSQL.init(sc) # connection to SQL context
Here you can find information on how to connect your IDE to a Spark cluster.
There is an Apache Spark introduction topic with installation instructions. Basically, you can run a Spark cluster locally via Java (see the instructions) or use (non-free) cloud services (e.g. Microsoft Azure [topic site], IBM).
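As a minimal sketch (the cluster URL is a placeholder; adjust it to your setup), both a local installation and a remote cluster are addressed through the master argument of sparkR.init:
library(SparkR)
# run Spark locally, using all available cores:
sc <- sparkR.init(master = "local[*]", appName = "mySparkRApp")
# or connect to an existing standalone cluster (placeholder URL):
# sc <- sparkR.init(master = "spark://my-cluster:7077", appName = "mySparkRApp")
sqlContext <- sparkRSQL.init(sc)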
What:
Caching can optimize computation in Spark. Caching stores data in memory and is a special case of persistence. Here you can read what happens when you cache an RDD in Spark.
Why:
Basically, caching saves an intermediate result of your original data, usually obtained after transformations. So when you use the cached RDD, the already transformed data is accessed from memory without recomputing the earlier transformations.
How:
Here is an example of how to quickly access large data (here a 3 GB csv file) from in-memory storage when it is accessed more than once:
library(SparkR)
# next line is needed for direct csv import:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"')
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
# loading 3 GB big csv file:
train <- read.df(sqlContext, "/train.csv", source = "com.databricks.spark.csv", inferSchema = "true")
cache(train)
system.time(head(train))
# output: time elapsed: 125 s. This first action triggers the caching at this point.
system.time(head(train))
# output: time elapsed: 0.2 s (!!)
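Once the cached data is no longer needed, it can be dropped from memory again; as a small sketch (persist and unpersist are part of the SparkR DataFrame API):
# choose a storage level explicitly instead of cache(), if desired:
# persist(train, "MEMORY_AND_DISK")
# free the memory once the cached DataFrame is no longer needed:
unpersist(train)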
The simplest way to create a Spark DataFrame is to convert a local R data frame, here the built-in mtcars dataset:
mtrdd <- createDataFrame(sqlContext, mtcars)
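To inspect such a Spark DataFrame or pull it back into a regular R data frame, head and collect from the SparkR API can be used, for example:
head(mtrdd)                 # first rows, returned as a local R data frame
local_df <- collect(mtrdd)  # collects the whole distributed DataFrame into R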
For csv files, you need to add the spark-csv package to the environment before initializing the Spark context:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"') # adds the spark-csv package needed for csv import
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
Then you can load the csv file either by inferring the data schema from the data in the columns:
train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", inferSchema = "true")
Or by specifying the data schema beforehand:
customSchema <- structType(
structField("margin", "integer"),
structField("gross", "integer"),
structField("name", "string"))
train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", schema = customSchema)
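Either way, it is worth verifying that the columns were read with the intended types; a short check using printSchema from the SparkR API:
# print the column names and data types of the loaded DataFrame:
printSchema(train)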