diff --git a/.gitignore b/.gitignore
index ad3d7f1..d789939 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,12 @@ hs_err_pid*
 /.project
 /.settings/
 /bin/
+*.DS_Store
+.idea
+*.iml
+target/*
+project/target/*
+project/project/*
+target/*.*
+/.cache-main
+/project/
\ No newline at end of file
diff --git a/README.md b/README.md
index 3998f23..c4810ce 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,5 @@
 This repository contains the Java labs as well as their Scala and Python ports of the code used in Manning Publication’s **[Spark in Action, 2nd edition](https://www.manning.com/books/spark-in-action-second-edition?a_aid=jgp)**, by Jean-Georges Perrin.
 
----
-
 # Spark in Action, 2nd edition – Java, Python, and Scala code for chapter 14
 
 Chapter 14 is about **extending data transformation with UDFs** (user defined functions).
@@ -30,16 +28,79 @@ Attempts at using polymorphism with UDFs.
 Passing an entire column to a UDF.
 
-
 ## Datasets
 
 Dataset(s) used in this chapter:
 
 * South Dublin (Republic of Ireland) County Council's [libraries](https://data.smartdublin.ie/dataset/libraries).
 
-## Notes
+The `OpenedLibrariesApp` application does the following:
+
+1. It acquires a session (a `SparkSession`).
+2. It asks Spark to load (ingest) a dataset in CSV format.
+3. Spark stores the contents in a dataframe; the application then uses a custom UDF to check whether each library is open at a given date and time.
+
+## Running the lab in Java
+
+For information on running the Java lab, see chapter 1 in [Spark in Action, 2nd edition](http://jgp.net/sia).
+
+## Running the lab using PySpark
+
+Prerequisites:
+
+You will need:
+ * `git`.
+ * Apache Spark (please refer to Appendix P, 'Spark in production: installation and a few tips').
+
+1. Clone this project:
+
+```
+git clone https://github.com/jgperrin/net.jgp.books.spark.ch14
+```
+
+2. Go to the lab in the Python directory:
-1. All examples are in Java. Examples in Python and Scala are coming.
+
+```
+cd net.jgp.books.spark.ch14/src/main/python/lab200_library_open/
+```
+
+3. Execute the following spark-submit command to run the application:
+
+```
+spark-submit openedLibrariesApp.py
+```
+
+## Running the lab in Scala
+
+Prerequisites:
+
+You will need:
+ * `git`.
+ * Apache Spark (please refer to Appendix P, 'Spark in production: installation and a few tips').
+
+1. Clone this project:
+
+```
+git clone https://github.com/jgperrin/net.jgp.books.spark.ch14
+```
+
+2. Go to the project directory: `cd net.jgp.books.spark.ch14`.
+
+3. Package the application using sbt:
+
+```
+sbt clean assembly
+```
+
+4. Run the Scala application using spark-submit, as shown below:
+
+```
+spark-submit --class net.jgp.books.spark.ch14.lab200_library_open.OpenedLibrariesScalaApp target/scala-2.12/SparkInAction2-Chapter14-assembly-1.0.0.jar
+```
+
+## Notes
+ 1. [Java] Due to renaming the packages to match Java standards more closely, this project is not in sync with the book's MEAP prior to v10 (published in April 2019).
+ 2. [Scala, Python] As of MEAP v14, we have introduced Scala and Python examples (published in October 2019).
+
 ---
 
 Follow me on Twitter to get updates about the book and Apache Spark: [@jgperrin](https://twitter.com/jgperrin). Join the book's community on [Facebook](https://facebook.com/sparkinaction/) or in [Manning's live site](https://forums.manning.com/forums/spark-in-action-second-edition?a_aid=jgp).
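All the labs in this chapter follow the same register-then-call pattern: a plain function is wrapped or registered as a UDF, then invoked either through the dataframe API or through a Spark SQL query. As a quick orientation, here is a minimal, self-contained PySpark sketch of that pattern; the `to_upper` function and the sample data are illustrative only and are not part of the lab code.

```
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("UDF pattern sketch") \
    .master("local[*]").getOrCreate()

df = spark.createDataFrame([("Dublin",), ("Tallaght",)], ["name"])

# Any plain Python function can back a UDF.
def to_upper(s):
    return s.upper() if s is not None else None

# Dataframe API (lab200 style): wrap the function with F.udf, then call it on columns.
to_upper_udf = F.udf(to_upper, StringType())
df.withColumn("upper", to_upper_udf(F.col("name"))).show()

# Spark SQL API (lab210 style): register the same function under a SQL-visible name.
spark.udf.register("to_upper_sql", to_upper, StringType())
df.createOrReplaceTempView("places")
spark.sql("SELECT name, to_upper_sql(name) AS upper FROM places").show()

spark.stop()
```

Declaring the return type explicitly matters: as labs 911 and 912 demonstrate, a mismatch between the declared type and what the function actually returns only shows up at runtime, as null results or exceptions.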
diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..9e875af --- /dev/null +++ b/build.sbt @@ -0,0 +1,21 @@ +name := "SparkInAction2-Chapter14" + +version := "1.0.0" + +scalaVersion := "2.12.10" + +val sparkVersion = "3.0.0" + +resolvers ++= Seq( + "apache-snapshots" at "http://repository.apache.org/snapshots/" +) + +libraryDependencies ++= Seq( + "org.apache.spark" %% "spark-core" % sparkVersion, + "org.apache.spark" %% "spark-sql" % sparkVersion +) + +assemblyMergeStrategy in assembly := { + case PathList("META-INF", xs @ _*) => MergeStrategy.discard + case x => MergeStrategy.first +} diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..c0bab04 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.2.8 diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..c9c0554 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10") \ No newline at end of file diff --git a/src/main/python/lab200_library_open/openedLibrariesApp.py b/src/main/python/lab200_library_open/openedLibrariesApp.py new file mode 100644 index 0000000..ea3c330 --- /dev/null +++ b/src/main/python/lab200_library_open/openedLibrariesApp.py @@ -0,0 +1,121 @@ +""" + Custom UDF to check if in range. + + @author rambabu.posa +""" +import os +from datetime import datetime +from pyspark.sql import (SparkSession, functions as F) +from pyspark.sql.types import (StructType,StructField, + StringType,IntegerType, BooleanType) + +def get_absolute_file_path(path, filename): + current_dir = os.path.dirname(__file__) + relative_path = "{}{}".format(path, filename) + absolute_file_path = os.path.join(current_dir, relative_path) + return absolute_file_path + +def create_dataframe(spark): + schema = StructType([ + StructField('id', IntegerType(), False), + StructField('date_str', StringType(), False) + ]) + rows = [ + (1, "2019-03-11 14:30:00"), + (2, "2019-04-27 16:00:00"), + (3, "2020-01-26 05:00:00") + ] + df = spark.createDataFrame(rows, schema) + df = df.withColumn("date", F.to_timestamp(F.col("date_str"))) \ + .drop("date_str") + return df + +def is_open(hoursMon, hoursTue, hoursWed, hoursThu, hoursFri, hoursSat, hoursSun, dateTime): + if(dateTime.weekday() == 0): + hours = hoursMon + elif(dateTime.weekday() == 1): + hours = hoursTue + elif(dateTime.weekday() == 2): + hours = hoursWed + elif(dateTime.weekday() == 3): + hours = hoursThu + elif(dateTime.weekday() == 4): + hours = hoursFri + elif(dateTime.weekday() == 5): + hours = hoursSat + elif(dateTime.weekday() == 6): + hours = hoursSun + + time = dateTime.time() + print('Opening hours = {}, actual time = {} '.format(hours, time)) + + if hours.lower() == 'closed': + return False + else: + start_time_in_hours = datetime.strptime(hours.split("-")[0], '%H:%M').time() + end_time_in_hours = datetime.strptime(hours.split("-")[1][0:5], '%H:%M').time() + start_time_in_secs = start_time_in_hours.hour*3600 + start_time_in_hours.minute*60 + end_time_in_secs = end_time_in_hours.hour*3600 + end_time_in_hours.minute*60 + actual_time_in_secs = dateTime.time().hour*3600 + dateTime.time().minute*60 + if actual_time_in_secs>=start_time_in_secs and actual_time_in_secs <= end_time_in_secs: + return True + else: + return False + +def main(spark): + path = '../../../../data/south_dublin_libraries/' + filename = "sdlibraries.csv" + absolute_file_path = get_absolute_file_path(path, filename) + librariesDf = spark.read.format("csv") \ + 
.option("header", True) \ + .option("inferSchema", True) \ + .option("encoding", "cp1252") \ + .load(absolute_file_path) \ + .drop("Administrative_Authority", "Address1", "Address2", "Town", "Postcode", + "County", "Phone", "Email", "Website", "Image", "WGS84_Latitude", "WGS84_Longitude") + + librariesDf.show(truncate=False) + librariesDf.printSchema() + + dateTimeDf = create_dataframe(spark).drop("id") + dateTimeDf.show(truncate=False) + dateTimeDf.printSchema() + + df = librariesDf.crossJoin(dateTimeDf) + df.createOrReplaceTempView("libraries") + df.show(truncate=False) + + # this is how to register an UDF for DataFrame and Dataset APIs + open = F.udf(is_open, BooleanType()) + # To register UDF to use in Spark SQL API + # spark.udf.register("is_open", is_open, BooleanType()) + + # Using the dataframe API + finalDf = df.withColumn("open", + open(F.col("Opening_Hours_Monday"), + F.col("Opening_Hours_Tuesday"), + F.col("Opening_Hours_Wednesday"), + F.col("Opening_Hours_Thursday"), + F.col("Opening_Hours_Friday"), + F.col("Opening_Hours_Saturday"), + F.lit("Closed"), + F.col("date"))) \ + .drop("Opening_Hours_Monday") \ + .drop("Opening_Hours_Tuesday") \ + .drop("Opening_Hours_Wednesday") \ + .drop("Opening_Hours_Thursday") \ + .drop("Opening_Hours_Friday") \ + .drop("Opening_Hours_Saturday") + + finalDf.show() + finalDf.printSchema() + + +if __name__ == '__main__': + # Creates a session on a local master + spark = SparkSession.builder.appName("Custom UDF to check if in range") \ + .master("local[*]").getOrCreate() + # Comment this line to see full log + spark.sparkContext.setLogLevel('error') + main(spark) + spark.stop() \ No newline at end of file diff --git a/src/main/python/lab210_library_open_sql/openedLibrariesSqlApp.py b/src/main/python/lab210_library_open_sql/openedLibrariesSqlApp.py new file mode 100644 index 0000000..717d8b1 --- /dev/null +++ b/src/main/python/lab210_library_open_sql/openedLibrariesSqlApp.py @@ -0,0 +1,115 @@ +""" + Custom UDF to check if in range. 
+ + @author rambabu.posa +""" +import os +from datetime import datetime +from pyspark.sql import (SparkSession, functions as F) +from pyspark.sql.types import (StructType,StructField, + StringType,IntegerType, BooleanType) + +def get_absolute_file_path(path, filename): + current_dir = os.path.dirname(__file__) + relative_path = "{}{}".format(path, filename) + absolute_file_path = os.path.join(current_dir, relative_path) + return absolute_file_path + +def create_dataframe(spark): + schema = StructType([ + StructField('id', IntegerType(), False), + StructField('date_str', StringType(), False) + ]) + rows = [ + (1, "2019-03-11 14:30:00"), + (2, "2019-04-27 16:00:00"), + (3, "2020-01-26 05:00:00") + ] + df = spark.createDataFrame(rows, schema) + df = df.withColumn("date", F.to_timestamp(F.col("date_str"))) \ + .drop("date_str") + return df + +def is_open(hoursMon, hoursTue, hoursWed, hoursThu, hoursFri, hoursSat, hoursSun, dateTime): + if(dateTime.weekday() == 0): + hours = hoursMon + elif(dateTime.weekday() == 1): + hours = hoursTue + elif(dateTime.weekday() == 2): + hours = hoursWed + elif(dateTime.weekday() == 3): + hours = hoursThu + elif(dateTime.weekday() == 4): + hours = hoursFri + elif(dateTime.weekday() == 5): + hours = hoursSat + elif(dateTime.weekday() == 6): + hours = hoursSun + + time = dateTime.time() + print('Opening hours = {}, actual time = {} '.format(hours, time)) + + if hours.lower() == 'closed': + return False + else: + start_time_in_hours = datetime.strptime(hours.split("-")[0], '%H:%M').time() + end_time_in_hours = datetime.strptime(hours.split("-")[1][0:5], '%H:%M').time() + start_time_in_secs = start_time_in_hours.hour*3600 + start_time_in_hours.minute*60 + end_time_in_secs = end_time_in_hours.hour*3600 + end_time_in_hours.minute*60 + actual_time_in_secs = dateTime.time().hour*3600 + dateTime.time().minute*60 + if actual_time_in_secs>=start_time_in_secs and actual_time_in_secs <= end_time_in_secs: + return True + else: + return False + +def main(spark): + path = '../../../../data/south_dublin_libraries/' + filename = "sdlibraries.csv" + absolute_file_path = get_absolute_file_path(path, filename) + librariesDf = spark.read.format("csv") \ + .option("header", True) \ + .option("inferSchema", True) \ + .option("encoding", "cp1252") \ + .load(absolute_file_path) \ + .drop("Administrative_Authority", "Address1", "Address2", "Town", "Postcode", + "County", "Phone", "Email", "Website", "Image", "WGS84_Latitude", "WGS84_Longitude") + + librariesDf.show(truncate=False) + librariesDf.printSchema() + + dateTimeDf = create_dataframe(spark).drop("id") + dateTimeDf.show(truncate=False) + dateTimeDf.printSchema() + + df = librariesDf.crossJoin(dateTimeDf) + df.createOrReplaceTempView("libraries") + df.show(truncate=False) + + # this is how to register an UDF for DataFrame and Dataset APIs + #is_open = F.udf('is_open', BooleanType()) + # To register UDF to use in Spark SQL API + spark.udf.register("is_open", is_open, BooleanType()) + + sqlQuery = """ + SELECT Council_ID, Name, date, + is_open(Opening_Hours_Monday, Opening_Hours_Tuesday, + Opening_Hours_Wednesday, Opening_Hours_Thursday, + Opening_Hours_Friday, Opening_Hours_Saturday, 'closed', date) as open + FROM libraries + """ + + # Using SQL + finalDf = spark.sql(sqlQuery) + + finalDf.show() + finalDf.printSchema() + + +if __name__ == '__main__': + # Creates a session on a local master + spark = SparkSession.builder.appName("Custom UDF to check if in range") \ + .master("local[*]").getOrCreate() + # Comment this line to see full 
log
+    spark.sparkContext.setLogLevel('error')
+    main(spark)
+    spark.stop()
\ No newline at end of file
diff --git a/src/main/python/lab900_in_range/inCustomRangeApp.py b/src/main/python/lab900_in_range/inCustomRangeApp.py
new file mode 100644
index 0000000..dd92b31
--- /dev/null
+++ b/src/main/python/lab900_in_range/inCustomRangeApp.py
@@ -0,0 +1,64 @@
+"""
+ Custom UDF to check if in range.
+
+ @author rambabu.posa
+"""
+from datetime import datetime
+from pyspark.sql import (SparkSession, functions as F)
+from pyspark.sql.types import (StructType, StructField,
+                               StringType, BooleanType)
+
+def createDataframe(spark: SparkSession):
+    schema = StructType([
+        StructField('id', StringType(), False),
+        StructField('time', StringType(), False),
+        StructField('range', StringType(), False)
+    ])
+
+    rows = [
+        ("id1", "2019-03-11 05:00:00", "00h00-07h30;23h30-23h59"),
+        ("id2", "2019-03-11 09:00:00", "00h00-07h30;23h30-23h59"),
+        ("id3", "2019-03-11 10:30:00", "00h00-07h30;23h30-23h59")
+    ]
+    return spark.createDataFrame(rows, schema)
+
+def in_range(range, event):
+    range_list = range.split(';')
+    for item in range_list:
+        start_time_in_hours = datetime.strptime(item.split("-")[0], '%Hh%M').time()
+        end_time_in_hours = datetime.strptime(item.split("-")[1][0:5], '%Hh%M').time()
+        start_time_in_secs = start_time_in_hours.hour*3600 + start_time_in_hours.minute*60
+        end_time_in_secs = end_time_in_hours.hour*3600 + end_time_in_hours.minute*60
+        if start_time_in_secs <= event <= end_time_in_secs:
+            return True
+    # Only report False once every range in the list has been checked
+    return False
+
+def main(spark):
+    df = createDataframe(spark)
+    df.show(truncate=False)
+
+    df = df.withColumn("date", F.date_format(F.col("time"), "yyyy-MM-dd HH:mm:ss.SSSS")) \
+        .withColumn("h", F.hour(F.col("date"))) \
+        .withColumn("m", F.minute(F.col("date"))) \
+        .withColumn("s", F.second(F.col("date"))) \
+        .withColumn("event", F.expr("h*3600 + m*60 +s")) \
+        .drop("date","h","m","s")
+
+    df.show(truncate=False)
+
+    inRange = F.udf(in_range, BooleanType())
+
+    df = df.withColumn("between", inRange(F.col("range"), F.col("event")))
+
+    df.show(truncate=False)
+
+
+if __name__ == '__main__':
+    # Creates a session on a local master
+    spark = SparkSession.builder.appName("Custom UDF to check if in range") \
+        .master("local[*]").getOrCreate()
+    # Comment this line to see full log
+    spark.sparkContext.setLogLevel('error')
+    main(spark)
+    spark.stop()
\ No newline at end of file
diff --git a/src/main/python/lab910_addition/additionApp.py b/src/main/python/lab910_addition/additionApp.py
new file mode 100644
index 0000000..61e254f
--- /dev/null
+++ b/src/main/python/lab910_addition/additionApp.py
@@ -0,0 +1,48 @@
+"""
+ Additions via UDF.
+ + @author rambabu.posa +""" +from pyspark.sql import (SparkSession, functions as F) +from pyspark.sql.types import (StructType,StructField, + StringType, IntegerType, BooleanType) + +def createDataframe(spark: SparkSession): + schema = StructType([ + StructField('fname', StringType(), False), + StructField('lname', StringType(), False), + StructField('score1', IntegerType(), False), + StructField('score2', IntegerType(), False) + ]) + + rows = [ + ("Jean-Georges", "Perrin", 123, 456), + ("Jacek", "Laskowski", 147, 758), + ("Holden", "Karau", 258, 369) + ] + return spark.createDataFrame(rows, schema) + + +def main(spark): + df = createDataframe(spark) + df.show(truncate=False) + + add_string = F.udf(lambda a,b: a+b, StringType()) + add_int = F.udf(lambda a,b: a+b, IntegerType()) + + df = df.withColumn("concat", + add_string(F.col("fname"), F.col("lname"))) + df.show(truncate=False) + + df = df.withColumn("score", + add_int(F.col("score1"), F.col("score2"))) + df.show(truncate=False) + +if __name__ == '__main__': + # Creates a session on a local master + spark = SparkSession.builder.appName("Addition") \ + .master("local[*]").getOrCreate() + # Comment this line to see full log + spark.sparkContext.setLogLevel('error') + main(spark) + spark.stop() \ No newline at end of file diff --git a/src/main/python/lab911_addition_fail1/additionFail1App.py b/src/main/python/lab911_addition_fail1/additionFail1App.py new file mode 100644 index 0000000..1901d19 --- /dev/null +++ b/src/main/python/lab911_addition_fail1/additionFail1App.py @@ -0,0 +1,54 @@ +""" + Additions via UDF. + + @author rambabu.posa +""" +from pyspark.sql import (SparkSession, functions as F) +from pyspark.sql.types import (StructType,StructField, + StringType, IntegerType) + +def createDataframe(spark: SparkSession): + schema = StructType([ + StructField('fname', StringType(), False), + StructField('lname', StringType(), False), + StructField('score1', IntegerType(), False), + StructField('score2', IntegerType(), False) + ]) + + rows = [ + ("Jean-Georges", "Perrin", 123, 456), + ("Jacek", "Laskowski", 147, 758), + ("Holden", "Karau", 258, 369) + ] + return spark.createDataFrame(rows, schema) + +def add_string(a:str, b:str) -> str: + return a+b + +def add_int(a:int, b:int) -> int: + return a+b + +def main(spark): + df = createDataframe(spark) + df.show(truncate=False) + + add = F.udf(add_string, StringType()) + add = F.udf(add_int, IntegerType()) + + df = df.withColumn("concat", + add(F.col("fname"), F.col("lname"))) + df.show(truncate=False) + # Here we can observe this new 'contact' column is null + + df = df.withColumn("score", + add(F.col("score1"), F.col("score2"))) + df.show(truncate=False) + +if __name__ == '__main__': + # Creates a session on a local master + spark = SparkSession.builder.appName("Addition") \ + .master("local[*]").getOrCreate() + # Comment this line to see full log + spark.sparkContext.setLogLevel('error') + main(spark) + spark.stop() \ No newline at end of file diff --git a/src/main/python/lab912_addition_fail2/additionFail2App.py b/src/main/python/lab912_addition_fail2/additionFail2App.py new file mode 100644 index 0000000..086d460 --- /dev/null +++ b/src/main/python/lab912_addition_fail2/additionFail2App.py @@ -0,0 +1,55 @@ +""" + Additions via UDF. 
+ + @author rambabu.posa +""" +from pyspark.sql import (SparkSession, functions as F) +from pyspark.sql.types import (StructType,StructField, + StringType, IntegerType) + +def createDataframe(spark: SparkSession): + schema = StructType([ + StructField('fname', StringType(), False), + StructField('lname', StringType(), False), + StructField('score1', IntegerType(), False), + StructField('score2', IntegerType(), False) + ]) + + rows = [ + ("Jean-Georges", "Perrin", 123, 456), + ("Jacek", "Laskowski", 147, 758), + ("Holden", "Karau", 258, 369) + ] + return spark.createDataFrame(rows, schema) + +def add_string(a:str, b:str) -> str: + return a+b + +def add_int(a:int, b:int) -> int: + return a-b + +def main(spark): + df = createDataframe(spark) + df.show(truncate=False) + + add = F.udf(add_string, StringType()) + add = F.udf(add_int, StringType()) + + # Here we can observe this error + # TypeError: unsupported operand type(s) for -: 'str' and 'str' + df = df.withColumn("concat", + add(F.col("fname"), F.col("lname"))) + df.show(truncate=False) + + df = df.withColumn("score", + add(F.col("score1"), F.col("score2"))) + df.show(truncate=False) + +if __name__ == '__main__': + # Creates a session on a local master + spark = SparkSession.builder.appName("Addition") \ + .master("local[*]").getOrCreate() + # Comment this line to see full log + spark.sparkContext.setLogLevel('error') + main(spark) + spark.stop() \ No newline at end of file diff --git a/src/main/python/lab920_column_as_parameter/columnAdditionApp.py b/src/main/python/lab920_column_as_parameter/columnAdditionApp.py new file mode 100644 index 0000000..4975b2b --- /dev/null +++ b/src/main/python/lab920_column_as_parameter/columnAdditionApp.py @@ -0,0 +1,58 @@ +""" + Column Additions via UDF. + + @author rambabu.posa +""" +from pyspark.sql import (SparkSession, functions as F) +from pyspark.sql.types import (StructType,StructField, IntegerType) + +def createDataframe(spark: SparkSession): + schema = StructType([ + StructField('c0', IntegerType(), False), + StructField('c1', IntegerType(), False), + StructField('c2', IntegerType(), False), + StructField('c3', IntegerType(), False), + StructField('c4', IntegerType(), False), + StructField('c5', IntegerType(), False), + StructField('c6', IntegerType(), False), + StructField('c7', IntegerType(), False) + ]) + + rows = [ + (1, 2, 4, 8,16, 32, 64, 128), + (0, 0, 0, 0,0, 0, 0, 0), + (1, 1, 1, 1,1, 1, 1, 1) + ] + return spark.createDataFrame(rows, schema) + +def column_addition(col): + res = 0 + for value in col: + res = res + value + + return res + +def main(spark): + df = createDataframe(spark) + df.show(truncate=False) + + add_col = F.udf(column_addition, IntegerType()) + + cols =[df.c0, df.c1, df.c2, df.c3, + df.c4, df.c5, df.c6, df.c7] + + col = F.array(*cols) + + df = df.withColumn("sum", add_col(col)) + + df.show(truncate=False) + + +if __name__ == '__main__': + # Creates a session on a local master + spark = SparkSession.builder.appName("Column addition") \ + .master("local[*]").getOrCreate() + # Comment this line to see full log + spark.sparkContext.setLogLevel('error') + main(spark) + spark.stop() \ No newline at end of file diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab200_library_open/IsOpenScalaService.scala b/src/main/scala/net/jgp/books/spark/ch14/lab200_library_open/IsOpenScalaService.scala new file mode 100644 index 0000000..1ebf164 --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab200_library_open/IsOpenScalaService.scala @@ -0,0 +1,68 @@ +package 
net.jgp.books.spark.ch14.lab200_library_open + +import java.sql.Timestamp +import java.util.Calendar + +trait IsOpenScalaService { + + def isOpen(hoursMon: String, hoursTue: String, hoursWed: String, + hoursThu: String, hoursFri: String, hoursSat: String, + hoursSun: String, dateTime: Timestamp): Boolean +} + +object IsOpenScalaService extends IsOpenScalaService { + def isOpen(hoursMon: String, hoursTue: String, hoursWed: String, + hoursThu: String, hoursFri: String, hoursSat: String, + hoursSun: String, dateTime: Timestamp): Boolean = { + // get the day of the week + val cal = Calendar.getInstance + cal.setTimeInMillis(dateTime.getTime) + val day = cal.get(Calendar.DAY_OF_WEEK) + println(s"Day of the week: ${day}") + val hours = day match { + case Calendar.MONDAY => + hoursMon + + case Calendar.TUESDAY => + hoursTue + + case Calendar.WEDNESDAY => + hoursWed + + case Calendar.THURSDAY => + hoursThu + + case Calendar.FRIDAY => + hoursFri + + case Calendar.SATURDAY => + hoursSat + + case _ => // Sunday + hoursSun + } + + // quick return + if (hours.compareToIgnoreCase("closed") == 0) + return false + // check if in interval + val event = cal.get(Calendar.HOUR_OF_DAY) * 3600 + cal.get(Calendar.MINUTE) * 60 + cal.get(Calendar.SECOND) + val ranges = hours.split(" and ") + for (i <- 0 until ranges.length) { + println(s"Processing range #${i}: ${ranges(i)}") + val operningHours = ranges(i).split("-") + + val start = Integer.valueOf(operningHours(0).substring(0, 2)) * 3600 + +Integer.valueOf(operningHours(0).substring(3, 5)) * 60 + + val end = Integer.valueOf(operningHours(1).substring(0, 2)) * 3600 + +Integer.valueOf(operningHours(1).substring(3, 5)) * 60 + + println(s"Checking between ${start} and ${end}") + if (event >= start && event <= end) + return true + } + false + + } +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab200_library_open/IsOpenScalaUdf.scala b/src/main/scala/net/jgp/books/spark/ch14/lab200_library_open/IsOpenScalaUdf.scala new file mode 100644 index 0000000..60e8191 --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab200_library_open/IsOpenScalaUdf.scala @@ -0,0 +1,25 @@ +package net.jgp.books.spark.ch14.lab200_library_open + +import java.sql.Timestamp +import org.apache.spark.sql.api.java.UDF8 + +/** + * The UDF code itself provides the plumbing between the service code and + * the application code. + * + * @author rambabu.posa + * + */ +@SerialVersionUID(-216751L) +class IsOpenScalaUdf extends + UDF8[String, String, String, String, String, String, String, Timestamp, Boolean] { + + @throws[Exception] + override def call(hoursMon: String, hoursTue: String, + hoursWed: String, hoursThu: String, + hoursFri: String, hoursSat: String, + hoursSun: String, dateTime: Timestamp): Boolean = + IsOpenScalaService.isOpen(hoursMon, hoursTue, hoursWed, + hoursThu, hoursFri, hoursSat, hoursSun, dateTime) + +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab200_library_open/OpenedLibrariesScalaApp.scala b/src/main/scala/net/jgp/books/spark/ch14/lab200_library_open/OpenedLibrariesScalaApp.scala new file mode 100644 index 0000000..5885ed7 --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab200_library_open/OpenedLibrariesScalaApp.scala @@ -0,0 +1,97 @@ +package net.jgp.books.spark.ch14.lab200_library_open + +import java.util.ArrayList + +import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession, functions => F} + +/** + * Custom UDF to check if in range. 
+ * + * @author rambabu.posa + */ +object OpenedLibrariesScalaApp { + + /** + * main() is your entry point to the application. + * + * @param args + */ + def main(args: Array[String]): Unit = { + + /** + * The processing code. + */ + // Creates a session on a local master + val spark: SparkSession = SparkSession.builder + .appName("Custom UDF to check if in range") + .master("local[*]") + .getOrCreate + + spark.udf.register("isOpen", new IsOpenScalaUdf, DataTypes.BooleanType) + + val librariesDf = spark.read + .format("csv") + .option("header", true) + .option("inferSchema", true) + .option("encoding", "cp1252") + .load("data/south_dublin_libraries/sdlibraries.csv") + .drop("Administrative_Authority") + .drop("Address1") + .drop("Address2") + .drop("Town") + .drop("Postcode") + .drop("County") + .drop("Phone") + .drop("Email") + .drop("Website") + .drop("Image") + .drop("WGS84_Latitude") + .drop("WGS84_Longitude") + + librariesDf.show(false) + librariesDf.printSchema() + + val dateTimeDf = createDataframe(spark) + dateTimeDf.show(false) + dateTimeDf.printSchema() + + val df = librariesDf.crossJoin(dateTimeDf) + df.show(false) + + // Using the dataframe API + val finalDf = df.withColumn("open", + F.callUDF("isOpen", F.col("Opening_Hours_Monday"), + F.col("Opening_Hours_Tuesday"), F.col("Opening_Hours_Wednesday"), + F.col("Opening_Hours_Thursday"), F.col("Opening_Hours_Friday"), + F.col("Opening_Hours_Saturday"), F.lit("Closed"), F.col("date"))) + .drop("Opening_Hours_Monday") + .drop("Opening_Hours_Tuesday") + .drop("Opening_Hours_Wednesday") + .drop("Opening_Hours_Thursday") + .drop("Opening_Hours_Friday") + .drop("Opening_Hours_Saturday") + + finalDf.show() + + // Using SQL + + spark.stop + + } + + private def createDataframe(spark: SparkSession): Dataset[Row] = { + val schema: StructType = DataTypes.createStructType(Array[StructField]( + DataTypes.createStructField("date_str", DataTypes.StringType, false))) + + val rows = new ArrayList[Row] + rows.add(RowFactory.create("2019-03-11 14:30:00")) + rows.add(RowFactory.create("2019-04-27 16:00:00")) + rows.add(RowFactory.create("2020-01-26 05:00:00")) + + spark.createDataFrame(rows, schema) + .withColumn("date", F.to_timestamp(F.col("date_str"))) + .drop("date_str") + } + +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab210_library_open_sql/OpenedLibrariesSqlScalaApp.scala b/src/main/scala/net/jgp/books/spark/ch14/lab210_library_open_sql/OpenedLibrariesSqlScalaApp.scala new file mode 100644 index 0000000..a7a699f --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab210_library_open_sql/OpenedLibrariesSqlScalaApp.scala @@ -0,0 +1,82 @@ +package net.jgp.books.spark.ch14.lab210_library_open_sql + +import java.util.ArrayList + +import net.jgp.books.spark.ch14.lab200_library_open.IsOpenScalaUdf +import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession, functions => F} + +/** + * Custom UDF to check if in range. + * + * @author rambabu.posa + */ +object OpenedLibrariesSqlScalaApp { + + /** + * main() is your entry point to the application. + * + * @param args + */ + def main(args: Array[String]): Unit = { + + /** + * The processing code. 
+ */ + // Creates a session on a local master + val spark: SparkSession = SparkSession.builder + .appName("Custom UDF to check if in range") + .master("local[*]") + .getOrCreate + + spark.udf.register("isOpen", new IsOpenScalaUdf, DataTypes.BooleanType) + + val librariesDf = spark.read + .format("csv") + .option("header", true) + .option("inferSchema", true) + .option("encoding", "cp1252") + .load("data/south_dublin_libraries/sdlibraries.csv") + .drop("Administrative_Authority", "Address1", "Address2", "Town", "Postcode", + "County", "Phone", "Email", "Website", "Image", "WGS84_Latitude", "WGS84_Longitude") + + librariesDf.show(false) + librariesDf.printSchema() + + val dateTimeDf = createDataframe(spark) + dateTimeDf.show(false) + dateTimeDf.printSchema() + + val df = librariesDf.crossJoin(dateTimeDf) + df.createOrReplaceTempView("libraries") + df.show(false) + + val sqlQuery = "SELECT Council_ID, Name, date, " + + "isOpen(Opening_Hours_Monday, Opening_Hours_Tuesday, " + + "Opening_Hours_Wednesday, Opening_Hours_Thursday, " + + "Opening_Hours_Friday, Opening_Hours_Saturday, 'closed', date) AS open" + + " FROM libraries " + + // Using SQL + val finalDf = spark.sql(sqlQuery) + + finalDf.show() + + spark.stop + } + + private def createDataframe(spark: SparkSession): Dataset[Row] = { + val schema: StructType = DataTypes.createStructType(Array[StructField]( + DataTypes.createStructField("date_str", DataTypes.StringType, false))) + + val rows = new ArrayList[Row] + rows.add(RowFactory.create("2019-03-11 14:30:00")) + rows.add(RowFactory.create("2019-04-27 16:00:00")) + rows.add(RowFactory.create("2020-01-26 05:00:00")) + + spark.createDataFrame(rows, schema) + .withColumn("date", F.to_timestamp(F.col("date_str"))) + .drop("date_str") + } + +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab900_in_range/InCustomRangeScalaApp.scala b/src/main/scala/net/jgp/books/spark/ch14/lab900_in_range/InCustomRangeScalaApp.scala new file mode 100644 index 0000000..e2e1408 --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab900_in_range/InCustomRangeScalaApp.scala @@ -0,0 +1,70 @@ +package net.jgp.books.spark.ch14.lab900_in_range + +import java.util.ArrayList + +import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession,functions=>F} + +/** + * Custom UDF to check if in range. + * + * @author rambabu.posa + */ +object InCustomRangeScalaApp { + + /** + * main() is your entry point to the application. + * + * @param args + */ + def main(args: Array[String]): Unit = { + + /** + * The processing code. 
+ */ + // Creates a session on a local master + val spark: SparkSession = SparkSession.builder + .appName("Custom UDF to check if in range") + .master("local[*]") + .getOrCreate + + spark.udf.register("inRange", new InRangeScalaUdf, DataTypes.BooleanType) + + var df: Dataset[Row] = createDataframe(spark) + df.show(false) + + df = df + .withColumn("date", F.date_format(F.col("time"), "yyyy-MM-dd HH:mm:ss.SSSS")) + .withColumn("h", F.hour(F.col("date"))) + .withColumn("m", F.minute(F.col("date"))) + .withColumn("s", F.second(F.col("date"))) + .withColumn("event", F.expr("h*3600 + m*60 +s")) + .drop("date","h","m","s") + + df.show(false) + + df = df + .withColumn("between", + F.callUDF("inRange", F.col("range"), F.col("event"))) + + df.show(false) + + spark.stop + + } + + private def createDataframe(spark: SparkSession): Dataset[Row] = { + val schema: StructType = DataTypes.createStructType(Array[StructField]( + DataTypes.createStructField("id", DataTypes.StringType, false), + DataTypes.createStructField("time", DataTypes.StringType, false), + DataTypes.createStructField("range", DataTypes.StringType, false))) + + val rows = new ArrayList[Row] + rows.add(RowFactory.create("id1", "2019-03-11 05:00:00", "00h00-07h30;23h30-23h59")) + rows.add(RowFactory.create("id2", "2019-03-11 09:00:00", "00h00-07h30;23h30-23h59")) + rows.add(RowFactory.create("id3", "2019-03-11 10:30:00", "00h00-07h30;23h30-23h59")) + + spark.createDataFrame(rows, schema) + } + +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab900_in_range/InRangeScalaService.scala b/src/main/scala/net/jgp/books/spark/ch14/lab900_in_range/InRangeScalaService.scala new file mode 100644 index 0000000..347acf1 --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab900_in_range/InRangeScalaService.scala @@ -0,0 +1,33 @@ +package net.jgp.books.spark.ch14.lab900_in_range + + +trait InRangeScalaService { + + @throws[Exception] + def call(range: String, event: Integer): Boolean + +} + +object InRangeScalaService extends InRangeScalaService { + + @throws[Exception] + override def call(range: String, event: Integer): Boolean = { + println(s"-> call(${range}, ${event})") + val ranges = range.split(";") + for (i <- 0 until ranges.length) { + println(s"Processing range #${i}: ${ranges(i)}") + val hours = ranges(i).split("-") + + val start = Integer.valueOf(hours(0).substring(0, 2)) * 3600 + + Integer.valueOf(hours(0).substring(3)) * 60 + + val end = Integer.valueOf(hours(1).substring(0, 2)) * 3600 + + Integer.valueOf(hours(1).substring(3)) * 60 + + println(s"Checking between ${start} and ${end}") + if (event >= start && event <= end) return true + } + false + } + +} \ No newline at end of file diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab900_in_range/InRangeScalaUdf.scala b/src/main/scala/net/jgp/books/spark/ch14/lab900_in_range/InRangeScalaUdf.scala new file mode 100644 index 0000000..63f785a --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab900_in_range/InRangeScalaUdf.scala @@ -0,0 +1,17 @@ +package net.jgp.books.spark.ch14.lab900_in_range + +import org.apache.spark.sql.api.java.UDF2 + +/** + * The UDF code itself provides the plumbing between the service code and + * the application code. 
+ * + * @author rambabu.posa + * + */ +@SerialVersionUID(-21621751L) +class InRangeScalaUdf extends UDF2[String, Integer, Boolean] { + + def call(range: String, event: Integer): Boolean = + InRangeScalaService.call(range, event) +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab910_addition/AdditionScalaApp.scala b/src/main/scala/net/jgp/books/spark/ch14/lab910_addition/AdditionScalaApp.scala new file mode 100644 index 0000000..56b2d31 --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab910_addition/AdditionScalaApp.scala @@ -0,0 +1,67 @@ +package net.jgp.books.spark.ch14.lab910_addition + +import java.util.ArrayList + +import org.apache.spark.sql.types.{DataTypes, StructField} +import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession, functions => F} + +/** + * Additions via UDF. + * + * @author rambabu.posa + */ +object AdditionScalaApp { + + /** + * main() is your entry point to the application. + * + * @param args + */ + def main(args: Array[String]): Unit = { + + /** + * The processing code. + */ + // Creates a session on a local master + val spark: SparkSession = SparkSession.builder + .appName("Addition") + .master("local[*]") + .getOrCreate + + spark.udf.register("add_int", new IntegerAdditionScalaUdf, DataTypes.IntegerType) + spark.udf.register("add_string", new StringAdditionScalaUdf, DataTypes.StringType) + + var df = createDataframe(spark) + df.show(false) + + df = df + .withColumn("concat", + F.callUDF("add_string", F.col("fname"), F.col("lname"))) + + df.show(false) + + df = df + .withColumn("score", + F.callUDF("add_int", F.col("score1"), F.col("score2"))) + + df.show(false) + + spark.stop + } + + private def createDataframe(spark: SparkSession): Dataset[Row] = { + val schema = DataTypes.createStructType(Array[StructField]( + DataTypes.createStructField("fname", DataTypes.StringType, false), + DataTypes.createStructField("lname", DataTypes.StringType, false), + DataTypes.createStructField("score1", DataTypes.IntegerType, false), + DataTypes.createStructField("score2", DataTypes.IntegerType, false))) + + val rows = new ArrayList[Row] + rows.add(RowFactory.create("Jean-Georges", "Perrin", int2Integer(123), int2Integer(456))) + rows.add(RowFactory.create("Jacek", "Laskowski", int2Integer(147), int2Integer(758))) + rows.add(RowFactory.create("Holden", "Karau", int2Integer(258), int2Integer(369))) + + spark.createDataFrame(rows, schema) + } + +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab910_addition/AdditionScalaStyleApp.scala b/src/main/scala/net/jgp/books/spark/ch14/lab910_addition/AdditionScalaStyleApp.scala new file mode 100644 index 0000000..a0bf21c --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab910_addition/AdditionScalaStyleApp.scala @@ -0,0 +1,67 @@ +package net.jgp.books.spark.ch14.lab910_addition + +import java.util.ArrayList + +import org.apache.spark.sql.types.{DataTypes, StructField} +import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession, functions => F} + +/** + * Additions via UDF. + * + * @author rambabu.posa + */ +object AdditionScalaStyleApp { + + /** + * main() is your entry point to the application. + * + * @param args + */ + def main(args: Array[String]): Unit = { + + /** + * The processing code. 
+ */ + // Creates a session on a local master + val spark: SparkSession = SparkSession.builder + .appName("Addition") + .master("local[*]") + .getOrCreate + + spark.udf.register("add_int", (a:Int,b:Int) => a+b) + spark.udf.register("add_string", (a:String,b:String) => a+b) + + var df = createDataframe(spark) + df.show(false) + + df = df + .withColumn("concat", + F.callUDF("add_string", F.col("fname"), F.col("lname"))) + + df.show(false) + + df = df + .withColumn("score", + F.callUDF("add_int", F.col("score1"), F.col("score2"))) + + df.show(false) + + spark.stop + } + + private def createDataframe(spark: SparkSession): Dataset[Row] = { + val schema = DataTypes.createStructType(Array[StructField]( + DataTypes.createStructField("fname", DataTypes.StringType, false), + DataTypes.createStructField("lname", DataTypes.StringType, false), + DataTypes.createStructField("score1", DataTypes.IntegerType, false), + DataTypes.createStructField("score2", DataTypes.IntegerType, false))) + + val rows = new ArrayList[Row] + rows.add(RowFactory.create("Jean-Georges", "Perrin", int2Integer(123), int2Integer(456))) + rows.add(RowFactory.create("Jacek", "Laskowski", int2Integer(147), int2Integer(758))) + rows.add(RowFactory.create("Holden", "Karau", int2Integer(258), int2Integer(369))) + + spark.createDataFrame(rows, schema) + } + +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab910_addition/IntegerAdditionScalaUdf.scala b/src/main/scala/net/jgp/books/spark/ch14/lab910_addition/IntegerAdditionScalaUdf.scala new file mode 100644 index 0000000..d007731 --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab910_addition/IntegerAdditionScalaUdf.scala @@ -0,0 +1,13 @@ +package net.jgp.books.spark.ch14.lab910_addition + +import org.apache.spark.sql.api.java.UDF2 + +@SerialVersionUID(-2162134L) +class IntegerAdditionScalaUdf extends UDF2[Int, Int, Int] { + + @throws[Exception] + override def call(t1: Int, t2: Int): Int = + t1 + t2 + +} + diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab910_addition/StringAdditionScalaUdf.scala b/src/main/scala/net/jgp/books/spark/ch14/lab910_addition/StringAdditionScalaUdf.scala new file mode 100644 index 0000000..f7a3fb3 --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab910_addition/StringAdditionScalaUdf.scala @@ -0,0 +1,12 @@ +package net.jgp.books.spark.ch14.lab910_addition + +import org.apache.spark.sql.api.java.UDF2 + +@SerialVersionUID(-2162134L) +class StringAdditionScalaUdf extends UDF2[String, String, String] { + + @throws[Exception] + override def call(t1: String, t2: String): String = + t1 + t2 + +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab911_addition_fail1/AdditionFail1ScalaApp.scala b/src/main/scala/net/jgp/books/spark/ch14/lab911_addition_fail1/AdditionFail1ScalaApp.scala new file mode 100644 index 0000000..dcb396a --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab911_addition_fail1/AdditionFail1ScalaApp.scala @@ -0,0 +1,70 @@ +package net.jgp.books.spark.ch14.lab911_addition_fail1 + +import java.util.ArrayList + +import org.apache.spark.sql.functions.{callUDF, col} +import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession} + +/** + * Additions via UDF. + * + * @author rambabu.posa + */ +object AdditionFail1ScalaApp { + + /** + * main() is your entry point to the application. + * + * @param args + */ + def main(args: Array[String]): Unit = { + + /** + * The processing code. 
+ */ + // Creates a session on a local master + val spark: SparkSession = SparkSession.builder + .appName("Addition") + .master("local[*]") + .getOrCreate + + spark.udf.register("add", new IntegerAdditionScalaUdf, DataTypes.IntegerType) + spark.udf.register("add", new StringAdditionScalaUdf, DataTypes.StringType) + + var df = createDataframe(spark) + df.show(false) + + df = df.withColumn("concat", callUDF("add", col("fname"), col("lname"))) + df.show(false) + + // The next operation will fail with an error: + // Exception in thread "main" org.apache.spark.SparkException: Failed to + // execute user defined function($anonfun$261: (int, int) => string) + + df = df + .withColumn("score", + callUDF("add", col("score1"), col("score2"))) + + df.show(false) + + spark.stop + } + + private def createDataframe(spark: SparkSession): Dataset[Row] = { + val schema: StructType = DataTypes.createStructType(Array[StructField]( + DataTypes.createStructField("fname", DataTypes.StringType, false), + DataTypes.createStructField("lname", DataTypes.StringType, false), + DataTypes.createStructField("score1", DataTypes.IntegerType, false), + DataTypes.createStructField("score2", DataTypes.IntegerType, false))) + + val rows = new ArrayList[Row] + rows.add(RowFactory.create("Jean-Georges", "Perrin", int2Integer(123), int2Integer(456))) + rows.add(RowFactory.create("Jacek", "Laskowski", int2Integer(147), int2Integer(758))) + rows.add(RowFactory.create("Holden", "Karau", int2Integer(258), int2Integer(369))) + + spark.createDataFrame(rows, schema) + } + + +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab911_addition_fail1/IntegerAdditionScalaUdf.scala b/src/main/scala/net/jgp/books/spark/ch14/lab911_addition_fail1/IntegerAdditionScalaUdf.scala new file mode 100644 index 0000000..3c60fad --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab911_addition_fail1/IntegerAdditionScalaUdf.scala @@ -0,0 +1,13 @@ +package net.jgp.books.spark.ch14.lab911_addition_fail1 + +import org.apache.spark.sql.api.java.UDF2 + +@SerialVersionUID(-2162134L) +class IntegerAdditionScalaUdf extends UDF2[Int, Int, Int] { + + @throws[Exception] + override def call(t1: Int, t2: Int): Int = + t1 + t2 + +} + diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab911_addition_fail1/StringAdditionScalaUdf.scala b/src/main/scala/net/jgp/books/spark/ch14/lab911_addition_fail1/StringAdditionScalaUdf.scala new file mode 100644 index 0000000..6e7da24 --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab911_addition_fail1/StringAdditionScalaUdf.scala @@ -0,0 +1,12 @@ +package net.jgp.books.spark.ch14.lab911_addition_fail1 + +import org.apache.spark.sql.api.java.UDF2 + +@SerialVersionUID(-2162134L) +class StringAdditionScalaUdf extends UDF2[String, String, String] { + + @throws[Exception] + override def call(t1: String, t2: String): String = + t1 + t2 + +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab912_addition_fail2/AdditionFail2ScalaApp.scala b/src/main/scala/net/jgp/books/spark/ch14/lab912_addition_fail2/AdditionFail2ScalaApp.scala new file mode 100644 index 0000000..ae7f2ea --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab912_addition_fail2/AdditionFail2ScalaApp.scala @@ -0,0 +1,72 @@ +package net.jgp.books.spark.ch14.lab912_addition_fail2 + +import java.util.ArrayList + +import org.apache.spark.sql.functions.{callUDF, col} +import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession} + +/** + * Additions via UDF. 
+ * + * @author rambabu.posa + */ +object AdditionFail2ScalaApp { + + /** + * main() is your entry point to the application. + * + * @param args + */ + def main(args: Array[String]): Unit = { + + /** + * The processing code. + */ + // Creates a session on a local master + val spark: SparkSession = SparkSession.builder + .appName("Addition") + .master("local[*]") + .getOrCreate + + spark.udf.register("add", new IntegerAdditionScalaUdf, DataTypes.StringType) // Same return type + + spark.udf.register("add", new StringAdditionScalaUdf, DataTypes.StringType) + + var df = createDataframe(spark) + df.show(false) + + df = df + .withColumn("concat", + callUDF("add", col("fname"), col("lname"))) + + df.show(false) + + // The next operation will fail with an error: + // Exception in thread "main" org.apache.spark.SparkException: Failed to + // execute user defined function($anonfun$261: (int, int) => string) + + df = df + .withColumn("score", + callUDF("add", col("score1"), col("score2"))) + + df.show(false) + + } + + private def createDataframe(spark: SparkSession): Dataset[Row] = { + val schema: StructType = DataTypes.createStructType(Array[StructField]( + DataTypes.createStructField("fname", DataTypes.StringType, false), + DataTypes.createStructField("lname", DataTypes.StringType, false), + DataTypes.createStructField("score1", DataTypes.IntegerType, false), + DataTypes.createStructField("score2", DataTypes.IntegerType, false))) + + val rows = new ArrayList[Row] + rows.add(RowFactory.create("Jean-Georges", "Perrin", int2Integer(123), int2Integer(456))) + rows.add(RowFactory.create("Jacek", "Laskowski", int2Integer(147), int2Integer(758))) + rows.add(RowFactory.create("Holden", "Karau", int2Integer(258), int2Integer(369))) + + spark.createDataFrame(rows, schema) + } + +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab912_addition_fail2/IntegerAdditionScalaUdf.scala b/src/main/scala/net/jgp/books/spark/ch14/lab912_addition_fail2/IntegerAdditionScalaUdf.scala new file mode 100644 index 0000000..d430639 --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab912_addition_fail2/IntegerAdditionScalaUdf.scala @@ -0,0 +1,17 @@ +package net.jgp.books.spark.ch14.lab912_addition_fail2 + +import org.apache.spark.sql.api.java.UDF2 + +/** + * Return type to String + * + * @author rambabu.posa + */ +@SerialVersionUID(-2162134L) +class IntegerAdditionScalaUdf extends UDF2[Int, Int, String] { + + @throws[Exception] + override def call(t1: Int, t2: Int): String = + (t1 + t2).toString + +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab912_addition_fail2/StringAdditionScalaUdf.scala b/src/main/scala/net/jgp/books/spark/ch14/lab912_addition_fail2/StringAdditionScalaUdf.scala new file mode 100644 index 0000000..fbf4d42 --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab912_addition_fail2/StringAdditionScalaUdf.scala @@ -0,0 +1,17 @@ +package net.jgp.books.spark.ch14.lab912_addition_fail2 + +import org.apache.spark.sql.api.java.UDF2 + +/** + * Concatenate two strings. 
+ * + * @author rambabu.posa + */ +@SerialVersionUID(-2162134L) +class StringAdditionScalaUdf extends UDF2[String, String, String] { + + @throws[Exception] + override def call(t1: String, t2: String): String = + t1 + t2 + +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab920_column_as_parameter/ColumnAdditionScalaApp.scala b/src/main/scala/net/jgp/books/spark/ch14/lab920_column_as_parameter/ColumnAdditionScalaApp.scala new file mode 100644 index 0000000..51dade7 --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab920_column_as_parameter/ColumnAdditionScalaApp.scala @@ -0,0 +1,74 @@ +package net.jgp.books.spark.ch14.lab920_column_as_parameter + +import java.util.ArrayList + +import org.apache.spark.sql.functions.{array, callUDF} +import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.apache.spark.sql._ +import scala.collection.mutable + +/** + * Column Additions via UDF. + * + * @author rambabu.posa + */ +object ColumnAdditionScalaApp { + private val COL_COUNT = 8 + + /** + * main() is your entry point to the application. + * + * @param args + */ + def main(args: Array[String]): Unit = { + + /** + * The processing code. + */ + // Creates a session on a local master + val spark: SparkSession = SparkSession.builder + .appName("Column addition") + .master("local[*]") + .getOrCreate + + spark.udf.register("add", new ColumnAdditionScalaUdf, DataTypes.IntegerType) + + var df = createDataframe(spark) + df.show(false) + + var cols = List[Column]() + for (i <- 0 until COL_COUNT) { + cols = cols :+ df.col("c" + i) + } + + val col = array(cols:_*) + + df = df.withColumn("sum", callUDF("add", col)) + df.show(false) + + spark.stop + } + + private def createDataframe(spark: SparkSession): Dataset[Row] = { + val schema: StructType = DataTypes.createStructType(Array[StructField]( + DataTypes.createStructField("c0", DataTypes.IntegerType, false), + DataTypes.createStructField("c1", DataTypes.IntegerType, false), + DataTypes.createStructField("c2", DataTypes.IntegerType, false), + DataTypes.createStructField("c3", DataTypes.IntegerType, false), + DataTypes.createStructField("c4", DataTypes.IntegerType, false), + DataTypes.createStructField("c5", DataTypes.IntegerType, false), + DataTypes.createStructField("c6", DataTypes.IntegerType, false), + DataTypes.createStructField("c7", DataTypes.IntegerType, false))) + + val rows = new ArrayList[Row] + rows.add(RowFactory.create(int2Integer(1), int2Integer(2), int2Integer(4), int2Integer(8), + int2Integer(16), int2Integer(32), int2Integer(64), int2Integer(128))) + rows.add(RowFactory.create(int2Integer(0), int2Integer(0), int2Integer(0), int2Integer(0), + int2Integer(0), int2Integer(0), int2Integer(0), int2Integer(0))) + rows.add(RowFactory.create(int2Integer(1), int2Integer(1), int2Integer(1), int2Integer(1), + int2Integer(1), int2Integer(1), int2Integer(1), int2Integer(1))) + + spark.createDataFrame(rows, schema) + } + +} diff --git a/src/main/scala/net/jgp/books/spark/ch14/lab920_column_as_parameter/ColumnAdditionScalaUdf.scala b/src/main/scala/net/jgp/books/spark/ch14/lab920_column_as_parameter/ColumnAdditionScalaUdf.scala new file mode 100644 index 0000000..ac4215e --- /dev/null +++ b/src/main/scala/net/jgp/books/spark/ch14/lab920_column_as_parameter/ColumnAdditionScalaUdf.scala @@ -0,0 +1,18 @@ +package net.jgp.books.spark.ch14.lab920_column_as_parameter + +import org.apache.spark.sql.api.java.UDF1 +import scala.collection.Seq + +@SerialVersionUID(8331L) +class ColumnAdditionScalaUdf extends UDF1[Seq[Int], Int] { 
+ + @throws[Exception] + override def call(t1: Seq[Int]): Int = { + var res = 0 + for (value <- t1) { + res += value + } + res + } + +}
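A closing note on the lab911 and lab912 variants above: Spark keeps one function per registered name, so registering `add` twice does not create an overload; the second registration simply replaces the first, and the surviving return type applies to every call. Below is a minimal PySpark sketch of that behaviour; the lambdas and the sample row are illustrative only, not taken from the labs.

```
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType

spark = SparkSession.builder.appName("UDF registration sketch") \
    .master("local[*]").getOrCreate()

# The second register() call replaces the first: only the IntegerType variant survives.
spark.udf.register("add", lambda a, b: a + b, StringType())
spark.udf.register("add", lambda a, b: a + b, IntegerType())

df = spark.createDataFrame([("Jean-Georges", "Perrin", 123, 456)],
                           ["fname", "lname", "score1", "score2"])
df.createOrReplaceTempView("people")

# The numeric addition works; the concatenation comes back null because the string
# returned by the lambda no longer matches the declared IntegerType (compare lab911).
spark.sql("SELECT add(score1, score2) AS score, add(fname, lname) AS concat FROM people").show()

spark.stop()
```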