1. Introduction
With the spark-csv package, a Spark application can load a csv file directly into a data frame, so the package is widely used.
Mahler's Symphony No. 5 will be performed by Eliahu Inbal and the Konzerthausorchester Berlin in Tokyo tonight. I arrived at the hall an hour before the concert started, so I came up with a few fun quiz questions about creating data frames from csv files without the package above :)
This article uses Spark 1.6.3.
2. Fun Quiz
In this chapter I show the source code that uses the spark-csv package, and then a fun little quiz :) Before starting the quiz, we need to prepare the following csv file.
・ test_data.csv
20170228,scheme1,BermudanCallableSwap,JPY,21261339422
20170228,scheme2,BermudanCallableSwap,JPY,22759109989
20170228,scheme3,BermudanCallableSwap,JPY,21405741891
...
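If you want to try the quiz yourself, one minimal way to create this test file from plain R is shown below. This is only a sketch: it writes just the three sample rows above and assumes the same file path that is used later in this article.
・ create test_data.csv (optional, for reproduction)
rows <- c("20170228,scheme1,BermudanCallableSwap,JPY,21261339422",
          "20170228,scheme2,BermudanCallableSwap,JPY,22759109989",
          "20170228,scheme3,BermudanCallableSwap,JPY,21405741891")
writeLines(rows, "C:/dev/R/test_data.csv")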
2.1. Use Spark-CSV Package
First, here is the program that uses the spark-csv package. There are two important points: one is setting 'SPARKR_SUBMIT_ARGS' so that the spark-csv package is loaded, and the other is reading the csv file directly into a data frame.
'SPARKR_SUBMIT_ARGS' is set and the csv file is loaded as follows.
・ set 'SPARKR_SUBMIT_ARGS' (com.databricks:spark-csv)
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.11:1.5.0" "sparkr-shell"')
・ load a csv file as data frame
df <- read.df(sqlContext, inputFilePath, source = "com.databricks.spark.csv", schema = schema)
The full source code is as follows.
・ source code
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.11:1.5.0" "sparkr-shell"')
# init SparkR
sc <- sparkR.init(appName = "visualization", master = "local[*]")
sqlContext <- sparkRSQL.init(sc)
# parameter
inputFilePath <- "C:/dev/R/test_data.csv"
# schema of data frame
schema <- structType(
  structField(x = "date", type = "integer", nullable = FALSE),
  structField(x = "scheme_number", type = "string", nullable = FALSE),
  structField(x = "product", type = "string", nullable = FALSE),
  structField(x = "currency", type = "string", nullable = FALSE),
  structField(x = "sensitivity", type = "double", nullable = FALSE))
# load a csv file as data frame
df <- read.df(sqlContext, inputFilePath, source = "com.databricks.spark.csv", schema = schema)
showDF(df)
2.2. Quiz
Next, here is a simple quiz. Enjoy!! :)
・ quiz
If you don't use the spark-csv package, how do you create a data frame with the following schema from test_data.csv?
・ schema
date,scheme_number,product,currency,sensitivity
・ test_data.csv
20170228,scheme1,BermudanCallableSwap,JPY,21261339422
20170228,scheme2,BermudanCallableSwap,JPY,22759109989
20170228,scheme3,BermudanCallableSwap,JPY,21405741891
...
The time limit is 5 minutes :)
3. Answer
The answer is as follows. It's a bit roundabout, but there is some fun in it :)
You must prepare a record splitter.
・ record splitter
splitRecord <- function(record) {
  # this function runs on the workers, so set the locale there as well
  Sys.setlocale("LC_ALL", "C")
  # split one csv line into its fields
  part <- strsplit(record, ",")[[1]]
  # convert each field to the type declared in the schema
  list(column1 = as.integer(part[1]),
       column2 = part[2],
       column3 = part[3],
       column4 = part[4],
       column5 = as.double(part[5]))
}
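As a quick sanity check on the driver (no Spark needed), you can call the splitter on one of the sample rows. Note that the names column1 ... column5 are only placeholders; the real column names come from the schema when the data frame is created.
・ local check of the record splitter (optional)
splitRecord("20170228,scheme1,BermudanCallableSwap,JPY,21261339422")
# returns a list roughly like:
#   $column1 = 20170228, $column2 = "scheme1", $column3 = "BermudanCallableSwap",
#   $column4 = "JPY", $column5 = 21261339422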
Then you make an RDD from the text file, split each record with the splitter, and convert the result to a data frame. The RDD API (textFile and lapply on an RDD) is not exported by SparkR 1.6, so it is accessed with the ::: operator.
・ load a csv file and create data frame
# load a csv file
orgData <- SparkR:::textFile(sc, inputFilePath)
# split data
splitData <- SparkR:::lapply(orgData, splitRecord)
# create data frame
df <- SparkR:::createDataFrame(sqlContext, splitData, schema)
The full source code is as follows :)
・ source code
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
# init SparkR
sc <- sparkR.init(appName = "visualization", master = "local[*]")
sqlContext <- sparkRSQL.init(sc)
# parameter
inputFilePath <- "C:/dev/R/test_data.csv"
# schema of data frame
schema <- structType(
  structField(x = "date", type = "integer", nullable = FALSE),
  structField(x = "scheme_number", type = "string", nullable = FALSE),
  structField(x = "product", type = "string", nullable = FALSE),
  structField(x = "currency", type = "string", nullable = FALSE),
  structField(x = "sensitivity", type = "double", nullable = FALSE))
# record splitter
splitRecord <- function(record) {
  Sys.setlocale("LC_ALL", "C")
  part <- strsplit(record, ",")[[1]]
  list(column1 = as.integer(part[1]),
       column2 = part[2],
       column3 = part[3],
       column4 = part[4],
       column5 = as.double(part[5]))
}
# load a csv file
orgData <- SparkR:::textFile(sc, inputFilePath)
# split data
splitData <- SparkR:::lapply(orgData, splitRecord)
# create data frame
df <- SparkR:::createDataFrame(sqlContext, splitData, schema)
showDF(df)
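By the way, if the csv file is small enough to fit in the driver's memory, another way to avoid the spark-csv package is to read the file with base R and hand the resulting local data.frame to createDataFrame. This is only a sketch for comparison (it assumes the same file path and column types as above), not the intended answer to the quiz.
・ alternative: base R read.csv + createDataFrame (small files only)
# read the file on the driver with base R (no Spark involved yet)
localDF <- read.csv(inputFilePath, header = FALSE,
                    col.names = c("date", "scheme_number", "product",
                                  "currency", "sensitivity"),
                    colClasses = c("integer", "character", "character",
                                   "character", "numeric"))
# distribute the local data.frame as a Spark data frame
df2 <- createDataFrame(sqlContext, localDF)
showDF(df2)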
4. Conclusion
・ Gustav Mahler Symphony No. 5
・ Richard Wagner Tristan and Isolde: Prelude and Liebestod
・ Konzerthausorchester Berlin, cond. Eliahu Inbal
・ Tokyo