Dec 4, 2016

A Point to Note When Upgrading Apache Spark to 2.0 and Running Applications




1. Introduction

Upgrading Spark to 2.0 is well worth it: the performance of Spark 2.0's Tungsten engine is faster than that of 1.6, as described in the 'Faster' section of the following Databricks blog post.

・Technical Preview of Apache Spark 2.0 Now on Databricks
 Easier, Faster, and Smarter
 https://databricks.com/blog/2016/05/11/apache-spark-2-0-technical-preview-easier-faster-and-smarter.html

When I executed a Spark 2.0 application, the spark-submit script I had used for Spark 1.6 applications no longer worked. The purpose of this article is to show the solution.

2. Problem

My Spark version was 1.6.0. I upgraded it to 2.0.1 last week, built my application with Spark 2.0.1, deployed it, and then executed the following spark-submit command.

spark-submit --master yarn-client --class xxxxx --num-executors 8 --executor-memory 16G --executor-cores 20 yyyyy.jar ... 

But the following error occurred.

16/12/01 19:35:21 WARN datasources.DataSource: Error while looking for metadata directory.
org.apache.spark.SparkException: Unable to create database default as failed to create its directory hdfs://namenode:9000/usr/spark-2.0.1-bin-hadoop2.7/spark-warehouse

...

3. Solution

In Spark 2.0, a warehouse directory is used to store tables, and the 'spark.sql.warehouse.dir' property was added to set the warehouse location. If the default file system is a local file system, the above error doesn't occur.
My default file system was set to HDFS, so Spark tried to create hdfs://namenode:9000/usr/spark-2.0.1-bin-hadoop2.7/spark-warehouse and failed.

I added the following option to the spark-submit command and explicitly set a local directory path as the warehouse path.

--conf spark.sql.warehouse.dir=file:///c:\temp\spark-warehouse 


The full spark-submit command follows. I executed it and confirmed that 'c:\temp\spark-warehouse' was created and the job terminated successfully.

spark-submit --master yarn-client --conf spark.sql.warehouse.dir=file:///c:\temp\spark-warehouse --class xxxxx --num-executors 8 --executor-memory 16G --executor-cores 20 yyyyy.jar ... 
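
The same setting can also be applied in application code when building the SparkSession, instead of on the command line. The following is a minimal sketch; the application name is hypothetical.

・set the warehouse location in code
import org.apache.spark.sql.SparkSession

// Set the warehouse location explicitly when building the SparkSession,
// instead of passing --conf to spark-submit.
val spark = SparkSession.builder()
  .appName("WarehouseDirExample")  // hypothetical application name
  .config("spark.sql.warehouse.dir", "file:///c:/temp/spark-warehouse")
  .getOrCreate()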

4. Conclusion

If your default file system is HDFS and you execute a Spark 2.0 application, you need to explicitly set a local directory path as the warehouse path via 'spark.sql.warehouse.dir'.

Oct 30, 2016

A Design for Filling In Missing Data with Apache Spark

I considered a design for filling in missing data with Apache Spark :)



1. Introduction

The rows with id 1, 4, or 5 each have all three categories A, B, and C. However, the row with id 2 and category B doesn't exist, and the rows with id 3 and categories A and C don't exist. I needed to fill in this missing data as shown below.

The purpose of this article is to propose an effective design that achieves the result shown in the following figure.



2. The Design

The design proposed in this article consists of two parts.

2.1. Part1

It is inefficient to filter an RDD by id, check whether the RDD contains a particular row, and then fill in the missing data. To make the check and the fill-in efficient, I created an RDD of key-value pairs and grouped the elements by key, as shown in the sketch after section 2.2.


   ・Stage1: create Pair RDD and group by key
      map() -> groupByKey()


2.2. Part2

groupByKey() triggers Spark's shuffle, and the partitions are rebuilt as in the left figure below. Without groupByKey(), a worker would have to search many partitions. After groupByKey(), however, it only has to search the single partition that holds id 2, because all rows with the same id are placed in the same partition.

In addition, the checking and filling-in above are pipelined, which further reduces the execution time.


   ・Stage2: check and fill up lacking data
      map() -> flatMap()
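
The following is a minimal sketch of both stages in Scala, runnable in spark-shell. The input data, category set, and default fill value are hypothetical.

・sketch of Stage1 and Stage2
// Hypothetical input: ids 1..5; id 2 lacks category "B",
// id 3 lacks categories "A" and "C".
val rows = sc.parallelize(Seq(
  (1, "A", 1.0), (1, "B", 2.0), (1, "C", 3.0),
  (2, "A", 4.0), (2, "C", 5.0),
  (3, "B", 6.0),
  (4, "A", 7.0), (4, "B", 8.0), (4, "C", 9.0),
  (5, "A", 1.5), (5, "B", 2.5), (5, "C", 3.5)))

val categories = Seq("A", "B", "C")

// Stage1: key each row by id so that all rows with the same id
// end up in the same partition.
val grouped = rows
  .map { case (id, category, value) => (id, (category, value)) }
  .groupByKey()

// Stage2: for each id, check which categories are present and
// fill in the missing ones with a default value (0.0 here).
val filled = grouped.flatMap { case (id, pairs) =>
  val present = pairs.map(_._1).toSet
  val missing = categories.filterNot(present.contains)
  pairs.map { case (c, v) => (id, c, v) } ++ missing.map(c => (id, c, 0.0))
}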



3. Conclusion

If you want to avoid a full scan, that is, searching all partitions (which takes a long time), the design proposed in section 2 works efficiently and greatly reduces the execution time.


Sep 15, 2016

Architecture for Overnight Batch Processing and Data Scientists' Work with Apache Spark

I'm on vacation, and I considered an architecture for overnight batch processing and data scientists' work with Apache Spark :)



I. Business flow

The following is a simple business flow I assumed. After overnight batch processing, many data scientists start to work :)


II. Architecture with the use of Parquet

The data scientists also want to use the DataFrames created by the overnight batch processing, and they care about query performance. In this case, I think an architecture based on Parquet is suitable.
An example of this architecture is shown below.


DataFrame 1 is converted to Parquet 1. When data scientists want to use DataFrame 1, they load it from Parquet 1; the same goes for DataFrame 4. Not all columns are loaded, however: Parquet is a columnar storage format, so it allows loading only the columns a query actually needs.
Therefore, the performance of queries against a Parquet table is higher than that of queries against a text table.
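
For example (a sketch; the path and column names are hypothetical), a query that selects only two columns reads only those two columns from the Parquet file:

・load only selective columns
// Only the 'id' and 'value' columns are read from disk;
// the other columns in the Parquet file are skipped.
val narrow = sqlContext.read.parquet("parquetFilePath").select("id", "value")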

III. Usage of Parquet

Using Parquet is easy. We can write a DataFrame to a Parquet file as follows.

・convert a DataFrame to a Parquet file
dataFrame1.write.format("parquet").save("parquetFilePath")
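
For an overnight batch that rewrites the same path every night, a save mode can also be specified. The following is a sketch; 'overwrite' replaces the previous night's output instead of failing because the path already exists.

・convert a DataFrame to a Parquet file, overwriting the previous output
dataFrame1.write.format("parquet").mode("overwrite").save("parquetFilePath")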

We can also load a DataFrame from a Parquet file like the following.

・load a DataFrame from a Parquet file
import org.apache.spark.sql.functions.avg

val dataFrame1Loaded = sqlContext.read.parquet("parquetFilePath")
                                              // Not all columns are loaded. Only the columns the query needs are loaded.

val dataFrame5 = dataFrame1Loaded.join(dataFrame3, dataFrame1Loaded("id") === dataFrame3("id"))
  .groupBy(dataFrame3("category1"), dataFrame3("category2"))
  .agg(avg(dataFrame1Loaded("value")))


May 3, 2016

Building Apache Spark on Windows 10

I built Apache Spark on Windows 10 and checked its operation :)


I. Building

First, I downloaded the Spark source code (v1.6.0.zip) and then extracted all files.

・Apache Spark source code
https://github.com/apache/spark/releases
v1.6.0.zip

・after extracting
C:\enjyoyspace
    └─spark-1.6.0
        ├─assembly
        ├─bagel
        ├─bin
        ├─build

Next, I compiled with Scala 2.10.6 to produce a Spark package (Hadoop 2.6.0) using Apache Maven. Apache Maven is so lovable :) The change to pom.xml and the build command follow.

・changes of pom.xml
C:\enjyoyspace\spark-1.6.0\pom.xml

before:    <scala.version>2.10.5</scala.version>
after:      <scala.version>2.10.6</scala.version>

・command
cd C:\enjyoyspace\spark-1.6.0
mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package


(install Apache Maven: https://maven.apache.org/install.html)

The Spark package (Hadoop 2.6.0) was produced.

・Spark package (Hadoop 2.6.0)
C:\enjyoyspace\spark-1.6.0\assembly\target\scala-2.10\spark-assembly-1.6.0-hadoop2.6.0.jar

II. Operation Check

I checked the operation of Spark by testing an interactive program. It ran normally :)

・command
C:\enjyoyspace\spark-1.6.0>bin\spark-shell

・using Spark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.6 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.

Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val lines = sc.parallelize(List("Sqoop", "from external datastores into HDFS", "Julia", "Julia is a high-performance dynamic programming language", "JuliaCon 2016"))
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27


scala> lines.count()
res0: Long = 5

scala> val words = lines.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at flatMap at <console>:29

scala> words.count()
res1: Long = 16
scala>
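
As a further sketch (not part of the recorded session), the classic word count can be run on the same RDD:

・word count on the same RDD
// Count the occurrences of each word produced by the flatMap above.
val counts = words.map(word => (word, 1)).reduceByKey(_ + _)
counts.collect().foreach(println)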


・Spark Web UI (http://localhost:4040/jobs/)

Mar 19, 2016

Simple Performance Test of List<T> and HashSet<T> in C#

It's often said that the search performance of HashSet<T> is faster than that of List<T>, because HashSet<T> uses a hash table: List<T>.Contains scans the list in O(n) time, while HashSet<T>.Contains is an O(1) hash lookup. I surveyed the performance of the add, contains, remove, and where methods of HashSet<T> and List<T>.


I ran the following simple tests.
Expected results were obtained. The power of hash lookups is attractive :)

I. Test

・List<T>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Diagnostics;


namespace WhiteChocolate
{
    class Program
    {
        static void Main(string[] args)
        {
            Enumerable.Range(0, 10).ToList().ForEach(x =>
            {
                TestList(500000 * x);
            });
        }

        public static void TestList(int num)
        {
            var list = new List<string>();
           
            // 1. Add
            var stopWatch = Stopwatch.StartNew();
            Enumerable.Range(0, num).ToList().ForEach(x => list.Add(x.ToString()));
            stopWatch.Stop();
            Console.WriteLine(string.Format("List Add: {0} ms", stopWatch.ElapsedMilliseconds.ToString()));


            // 2. Contains
            var count = (num / 2).ToString();
            stopWatch = Stopwatch.StartNew();
            var result = list.Contains(count);
            stopWatch.Stop();
            Console.WriteLine(string.Format("List Contains: {0} ms", stopWatch.ElapsedMilliseconds.ToString()));


            // 3. Add (if not exists)
            count = (num * 2).ToString();
            stopWatch = Stopwatch.StartNew();
            if (!list.Contains(count)) list.Add(count);
            stopWatch.Stop();
            Console.WriteLine(string.Format("List Add (if not exists): {0} ms", stopWatch.ElapsedMilliseconds.ToString()));


            // 4. Remove (if exists)
            count = (num * 2).ToString();
            stopWatch = Stopwatch.StartNew();
            if (list.Contains(count)) list.Remove(count);
            stopWatch.Stop();
            Console.WriteLine(string.Format("List Remove (if exists): {0} ms", stopWatch.ElapsedMilliseconds.ToString()));


            // 5. Where
            count = (num / 4).ToString();
            stopWatch = Stopwatch.StartNew();
            foreach (var one in list.Where(x => x == count)) Console.WriteLine(one);
            stopWatch.Stop();
            Console.WriteLine(string.Format("List Where: {0} ms", stopWatch.ElapsedMilliseconds.ToString()));
        }
    }
}

・HashSet<T>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Diagnostics;


namespace StrawberryChocolate
{
    class Program
    {
        static void Main(string[] args)
        {
            Enumerable.Range(0, 10).ToList().ForEach(x =>
            {
                TestHashSet(500000 * x);
            });
        }

        public static void TestHashSet(int num)
        {
            var hashSet = new HashSet<string>();


            // 1. Add
            var stopWatch = Stopwatch.StartNew();
            Enumerable.Range(0, num).ToList().ForEach(x => hashSet.Add(x.ToString()));
            stopWatch.Stop();
            Console.WriteLine(string.Format("HashSet Add: {0} ms", stopWatch.ElapsedMilliseconds.ToString()));
           
            // 2. Contains
            var count = (num / 2).ToString();
            stopWatch = Stopwatch.StartNew();
            var result = hashSet.Contains(count);
            stopWatch.Stop();
            Console.WriteLine(string.Format("HashSet Contains: {0} ms", stopWatch.ElapsedMilliseconds.ToString()));


            // 3. Add (if not exists)
            count = (num * 2).ToString();
            stopWatch = Stopwatch.StartNew();
            if (!hashSet.Contains(count)) hashSet.Add(count);
            stopWatch.Stop();
            Console.WriteLine(string.Format("HashSet Add (if not exists): {0} ms", stopWatch.ElapsedMilliseconds.ToString()));
           
            // 4. Remove (if exists)
            count = (num * 2).ToString();
            stopWatch = Stopwatch.StartNew();
            if (hashSet.Contains(count)) hashSet.Remove(count);
            stopWatch.Stop();
            Console.WriteLine(string.Format("HashSet Remove (if exists): {0} ms", stopWatch.ElapsedMilliseconds.ToString()));


            // 5. Where
            count = (num / 4).ToString();
            stopWatch = Stopwatch.StartNew();
            foreach (var one in hashSet.Where(x => x == count)) Console.WriteLine(one);
            stopWatch.Stop();
            Console.WriteLine(string.Format("HashSet Where: {0} ms", stopWatch.ElapsedMilliseconds.ToString()));
        }
    }
}


II. Result

1. Add
run the add method, adding X string objects
X = 500000, 1000000, 1500000, 2000000, 2500000, 3000000, 3500000, 4000000, 4500000, 5000000


2. Contains
run the contains method, searching for the object at the halfway point of X string objects (a List<T> scans half the list; a HashSet<T> does a single hash lookup)
X = 500000, 1000000, 1500000, 2000000, 2500000, 3000000, 3500000, 4000000, 4500000, 5000000


3. Add (If not exists)
run the contains method, searching for an object that is not yet in the collection (so a List<T> scans all X string objects), and then run the add method
X = 500000, 1000000, 1500000, 2000000, 2500000, 3000000, 3500000, 4000000, 4500000, 5000000


4. Remove (If exists)
run the contains method, searching for the object added at the end of the collection in test 3, and then run the remove method
X = 500000, 1000000, 1500000, 2000000, 2500000, 3000000, 3500000, 4000000, 4500000, 5000000


5. Where
run the where method over all X string objects (this method is essentially a full scan :))
X = 500000, 1000000, 1500000, 2000000, 2500000, 3000000, 3500000, 4000000, 4500000, 5000000