Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,12 @@ hs_err_pid*
/.project
/.settings/
/bin/
*.DS_Store
.idea
*.iml
target/*
project/target/*
project/project/*
target/*.*
/.cache-main
/project/
71 changes: 66 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
This repository contains the Java labs as well as their Scala and Python ports of the code used in Manning Publication’s **[Spark in Action, 2nd edition](https://www.manning.com/books/spark-in-action-second-edition?a_aid=jgp)**, by Jean-Georges Perrin.

---

# Spark in Action, 2nd edition – Java, Python, and Scala code for chapter 14

Chapter 14 is about **extending data transformation with UDFs** (user defined functions).
Expand Down Expand Up @@ -30,16 +28,79 @@ Attempts at using polymorphism with UDFs.

Passing an entire column to a UDF.


## Datasets

Dataset(s) used in this chapter:
* South Dublin (Republic of Ireland) County Council's [libraries](https://data.smartdublin.ie/dataset/libraries).

## Notes
The `OpenedLibrariesApp` application does the following:

1. It acquires a session (a `SparkSession`).
2. It asks Spark to load (ingest) a dataset in CSV format.
3. Spark stores the contents in a dataframe, then demonstrate how to use Custom UDF to check if in range.

## Running the lab in Java

For information on running the Java lab, see chapter 1 in [Spark in Action, 2nd edition](http://jgp.net/sia).

## Running the lab using PySpark

Prerequisites:

You will need:
* `git`.
* Apache Spark (please refer Appendix P - 'Spark in production: installation and a few tips').

1. Clone this project

```
git clone https://github.com/jgperrin/net.jgp.books.spark.ch14
```

2. Go to the lab in the Python directory

1. All examples are in Java. Examples in Python and Scala are coming.
```
cd net.jgp.books.spark.ch14/src/main/python/lab200_library_open/
```

3. Execute the following spark-submit command to create a jar file to our this application

```
spark-submit openedLibrariesApp.py
```

## Running the lab in Scala

Prerequisites:

You will need:
* `git`.
* Apache Spark (please refer Appendix P - 'Spark in production: installation and a few tips').

1. Clone this project

```
git clone https://github.com/jgperrin/net.jgp.books.spark.ch14
```

2. cd net.jgp.books.spark.ch14

3. Package application using sbt command

```
sbt clean assembly
```

4. Run Spark/Scala application using spark-submit command as shown below:

```
spark-submit --class net.jgp.books.spark.ch14.lab200_library_open.OpenedLibrariesScalaApp target/scala-2.12/SparkInAction2-Chapter14-assembly-1.0.0.jar
```

## Notes
1. [Java] Due to renaming the packages to match more closely Java standards, this project is not in sync with the book's MEAP prior to v10 (published in April 2019).
2. [Scala, Python] As of MEAP v14, we have introduced Scala and Python examples (published in October 2019).

---

Follow me on Twitter to get updates about the book and Apache Spark: [@jgperrin](https://twitter.com/jgperrin). Join the book's community on [Facebook](https://facebook.com/sparkinaction/) or in [Manning's live site](https://forums.manning.com/forums/spark-in-action-second-edition?a_aid=jgp).
21 changes: 21 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name := "SparkInAction2-Chapter14"

version := "1.0.0"

scalaVersion := "2.12.10"

val sparkVersion = "3.0.0"

resolvers ++= Seq(
"apache-snapshots" at "http://repository.apache.org/snapshots/"
)

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion
)

assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
1 change: 1 addition & 0 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.2.8
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
121 changes: 121 additions & 0 deletions src/main/python/lab200_library_open/openedLibrariesApp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""
Custom UDF to check if in range.

@author rambabu.posa
"""
import os
from datetime import datetime
from pyspark.sql import (SparkSession, functions as F)
from pyspark.sql.types import (StructType,StructField,
StringType,IntegerType, BooleanType)

def get_absolute_file_path(path, filename):
current_dir = os.path.dirname(__file__)
relative_path = "{}{}".format(path, filename)
absolute_file_path = os.path.join(current_dir, relative_path)
return absolute_file_path

def create_dataframe(spark):
schema = StructType([
StructField('id', IntegerType(), False),
StructField('date_str', StringType(), False)
])
rows = [
(1, "2019-03-11 14:30:00"),
(2, "2019-04-27 16:00:00"),
(3, "2020-01-26 05:00:00")
]
df = spark.createDataFrame(rows, schema)
df = df.withColumn("date", F.to_timestamp(F.col("date_str"))) \
.drop("date_str")
return df

def is_open(hoursMon, hoursTue, hoursWed, hoursThu, hoursFri, hoursSat, hoursSun, dateTime):
if(dateTime.weekday() == 0):
hours = hoursMon
elif(dateTime.weekday() == 1):
hours = hoursTue
elif(dateTime.weekday() == 2):
hours = hoursWed
elif(dateTime.weekday() == 3):
hours = hoursThu
elif(dateTime.weekday() == 4):
hours = hoursFri
elif(dateTime.weekday() == 5):
hours = hoursSat
elif(dateTime.weekday() == 6):
hours = hoursSun

time = dateTime.time()
print('Opening hours = {}, actual time = {} '.format(hours, time))

if hours.lower() == 'closed':
return False
else:
start_time_in_hours = datetime.strptime(hours.split("-")[0], '%H:%M').time()
end_time_in_hours = datetime.strptime(hours.split("-")[1][0:5], '%H:%M').time()
start_time_in_secs = start_time_in_hours.hour*3600 + start_time_in_hours.minute*60
end_time_in_secs = end_time_in_hours.hour*3600 + end_time_in_hours.minute*60
actual_time_in_secs = dateTime.time().hour*3600 + dateTime.time().minute*60
if actual_time_in_secs>=start_time_in_secs and actual_time_in_secs <= end_time_in_secs:
return True
else:
return False

def main(spark):
path = '../../../../data/south_dublin_libraries/'
filename = "sdlibraries.csv"
absolute_file_path = get_absolute_file_path(path, filename)
librariesDf = spark.read.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.option("encoding", "cp1252") \
.load(absolute_file_path) \
.drop("Administrative_Authority", "Address1", "Address2", "Town", "Postcode",
"County", "Phone", "Email", "Website", "Image", "WGS84_Latitude", "WGS84_Longitude")

librariesDf.show(truncate=False)
librariesDf.printSchema()

dateTimeDf = create_dataframe(spark).drop("id")
dateTimeDf.show(truncate=False)
dateTimeDf.printSchema()

df = librariesDf.crossJoin(dateTimeDf)
df.createOrReplaceTempView("libraries")
df.show(truncate=False)

# this is how to register an UDF for DataFrame and Dataset APIs
open = F.udf(is_open, BooleanType())
# To register UDF to use in Spark SQL API
# spark.udf.register("is_open", is_open, BooleanType())

# Using the dataframe API
finalDf = df.withColumn("open",
open(F.col("Opening_Hours_Monday"),
F.col("Opening_Hours_Tuesday"),
F.col("Opening_Hours_Wednesday"),
F.col("Opening_Hours_Thursday"),
F.col("Opening_Hours_Friday"),
F.col("Opening_Hours_Saturday"),
F.lit("Closed"),
F.col("date"))) \
.drop("Opening_Hours_Monday") \
.drop("Opening_Hours_Tuesday") \
.drop("Opening_Hours_Wednesday") \
.drop("Opening_Hours_Thursday") \
.drop("Opening_Hours_Friday") \
.drop("Opening_Hours_Saturday")

finalDf.show()
finalDf.printSchema()


if __name__ == '__main__':
# Creates a session on a local master
spark = SparkSession.builder.appName("Custom UDF to check if in range") \
.master("local[*]").getOrCreate()
# Comment this line to see full log
spark.sparkContext.setLogLevel('error')
main(spark)
spark.stop()
115 changes: 115 additions & 0 deletions src/main/python/lab210_library_open_sql/openedLibrariesSqlApp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""
Custom UDF to check if in range.

@author rambabu.posa
"""
import os
from datetime import datetime
from pyspark.sql import (SparkSession, functions as F)
from pyspark.sql.types import (StructType,StructField,
StringType,IntegerType, BooleanType)

def get_absolute_file_path(path, filename):
current_dir = os.path.dirname(__file__)
relative_path = "{}{}".format(path, filename)
absolute_file_path = os.path.join(current_dir, relative_path)
return absolute_file_path

def create_dataframe(spark):
schema = StructType([
StructField('id', IntegerType(), False),
StructField('date_str', StringType(), False)
])
rows = [
(1, "2019-03-11 14:30:00"),
(2, "2019-04-27 16:00:00"),
(3, "2020-01-26 05:00:00")
]
df = spark.createDataFrame(rows, schema)
df = df.withColumn("date", F.to_timestamp(F.col("date_str"))) \
.drop("date_str")
return df

def is_open(hoursMon, hoursTue, hoursWed, hoursThu, hoursFri, hoursSat, hoursSun, dateTime):
if(dateTime.weekday() == 0):
hours = hoursMon
elif(dateTime.weekday() == 1):
hours = hoursTue
elif(dateTime.weekday() == 2):
hours = hoursWed
elif(dateTime.weekday() == 3):
hours = hoursThu
elif(dateTime.weekday() == 4):
hours = hoursFri
elif(dateTime.weekday() == 5):
hours = hoursSat
elif(dateTime.weekday() == 6):
hours = hoursSun

time = dateTime.time()
print('Opening hours = {}, actual time = {} '.format(hours, time))

if hours.lower() == 'closed':
return False
else:
start_time_in_hours = datetime.strptime(hours.split("-")[0], '%H:%M').time()
end_time_in_hours = datetime.strptime(hours.split("-")[1][0:5], '%H:%M').time()
start_time_in_secs = start_time_in_hours.hour*3600 + start_time_in_hours.minute*60
end_time_in_secs = end_time_in_hours.hour*3600 + end_time_in_hours.minute*60
actual_time_in_secs = dateTime.time().hour*3600 + dateTime.time().minute*60
if actual_time_in_secs>=start_time_in_secs and actual_time_in_secs <= end_time_in_secs:
return True
else:
return False

def main(spark):
path = '../../../../data/south_dublin_libraries/'
filename = "sdlibraries.csv"
absolute_file_path = get_absolute_file_path(path, filename)
librariesDf = spark.read.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.option("encoding", "cp1252") \
.load(absolute_file_path) \
.drop("Administrative_Authority", "Address1", "Address2", "Town", "Postcode",
"County", "Phone", "Email", "Website", "Image", "WGS84_Latitude", "WGS84_Longitude")

librariesDf.show(truncate=False)
librariesDf.printSchema()

dateTimeDf = create_dataframe(spark).drop("id")
dateTimeDf.show(truncate=False)
dateTimeDf.printSchema()

df = librariesDf.crossJoin(dateTimeDf)
df.createOrReplaceTempView("libraries")
df.show(truncate=False)

# this is how to register an UDF for DataFrame and Dataset APIs
#is_open = F.udf('is_open', BooleanType())
# To register UDF to use in Spark SQL API
spark.udf.register("is_open", is_open, BooleanType())

sqlQuery = """
SELECT Council_ID, Name, date,
is_open(Opening_Hours_Monday, Opening_Hours_Tuesday,
Opening_Hours_Wednesday, Opening_Hours_Thursday,
Opening_Hours_Friday, Opening_Hours_Saturday, 'closed', date) as open
FROM libraries
"""

# Using SQL
finalDf = spark.sql(sqlQuery)

finalDf.show()
finalDf.printSchema()


if __name__ == '__main__':
# Creates a session on a local master
spark = SparkSession.builder.appName("Custom UDF to check if in range") \
.master("local[*]").getOrCreate()
# Comment this line to see full log
spark.sparkContext.setLogLevel('error')
main(spark)
spark.stop()
Loading