Wednesday, August 14, 2019

Connect to Oracle DB Using Spark/Scala

Form a JDBC connection string
val jdbcHostname = "10.21.31.41"
val jdbcPort = 1521
val jdbcDatabase = "dbname" 
val jdbcUsername = "user1"
val jdbcPassword = "password1"
val driverClass = "oracle.jdbc.driver.OracleDriver"
val jdbcUrl = s"jdbc:oracle:thin:@$jdbcHostname:$jdbcPort/$jdbcDatabase"

Set Up connection properties
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", s"$jdbcUsername")
connectionProperties.put("password", s"$jdbcPassword")
connectionProperties.setProperty("driver", driverClass)
connectionProperties.put("fetchsize", "100000")

Form a query and create a Spark DataFrame
val query = "(select * from table1)"
val df = spark.read.jdbc(jdbcUrl, query, connectionProperties)

Note: 


  1. Ensure that the Oracle JDBC driver JAR is included in the spark-shell or spark-submit command so that the driverClass can be loaded to make the connection
  2. The fetchsize parameter improves data retrieval performance; more details here: https://docs.oracle.com/cd/A87860_01/doc/java.817/a83724/resltse5.htm
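For example (the JAR path here is illustrative, not from the original post), launching with the driver on the classpath might look like:

```shell
# Make the Oracle JDBC driver available to the driver and executors
spark-shell --jars /path/to/ojdbc6.jar

# The same flag works for a packaged application
spark-submit --jars /path/to/ojdbc6.jar --class MyApp myapp.jar
```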


Tuesday, August 13, 2019

Use JCEKS - Avoid Clear text passwords

The code snippets below create a JCEKS credential store where passwords can be saved; the store can then be referenced from Sqoop jobs and Hadoop/Spark code instead of clear-text passwords.

Create a Credential store

:~$ hadoop credential create user1.dbname.alias -value password1 -provider jceks:///expl/dl_explr/lib/dbname.jceks
user1.dbname.alias has been successfully created.
org.apache.hadoop.security.alias.JavaKeyStoreProvider has been updated.

:~$ hadoop credential create user2.dbname.alias -value password2 -provider jceks:///expl/dl_explr/lib/dbname.jceks

user2.dbname.alias has been successfully created.
org.apache.hadoop.security.alias.JavaKeyStoreProvider has been updated.

List entries in the credential store. This displays the alias names, not the actual passwords.

:~$ hadoop credential list -provider jceks:///expl/dl_explr/lib/dbname.jceks
Listing aliases for CredentialProvider: jceks:///expl/dl_explr/lib/dbname.jceks
user2.dbname.alias
user1.dbname.alias

One can delete entries from the credential store.

:~$ hadoop credential delete user1.dbname.alias -provider jceks:///expl/dl_explr/lib/dbname.jceks
You are about to DELETE the credential user1.dbname.alias from CredentialProvider jceks:///expl/dl_explr/lib/dbname.jceks. Continue?  (Y or N) Y
Deleting credential: user1.dbname.alias from CredentialProvider: jceks:///expl/dl_explr/lib/dbname.jceks
user1.dbname.alias has been successfully deleted.
org.apache.hadoop.security.alias.JavaKeyStoreProvider has been updated.

Read values from the credential store

scala> import org.apache.hadoop.security.alias.CredentialProviderFactory
import org.apache.hadoop.security.alias.CredentialProviderFactory

scala> val conf = new org.apache.hadoop.conf.Configuration()

conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml

scala> val alias = "user1.dbname.alias"

alias: String = user1.dbname.alias

scala> val jceksPath = "jceks:///expl/dl_explr/lib/dbname.jceks"

jceksPath: String = jceks:///expl/dl_explr/lib/dbname.jceks

scala> conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, jceksPath)


// getPassword returns Array[Char]

scala> val password = conf.getPassword(alias).mkString
password: String = password1
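Putting the pieces together, here is a sketch (for spark-shell, reusing the alias and connection values from these posts) of feeding the JCEKS password into the Spark JDBC read instead of a clear-text password:

```scala
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.alias.CredentialProviderFactory

val conf = new Configuration()
conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
  "jceks:///expl/dl_explr/lib/dbname.jceks")

// getPassword returns Array[Char]; convert it only when handing it to JDBC
val password = conf.getPassword("user1.dbname.alias").mkString

val connectionProperties = new Properties()
connectionProperties.put("user", "user1")
connectionProperties.put("password", password)

val df = spark.read.jdbc(
  "jdbc:oracle:thin:@10.21.31.41:1521/dbname",
  "(select * from table1)",
  connectionProperties)
```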

Saturday, August 10, 2019

SQL Stuff

Get counts of all tables in an Oracle database with one query

select table_name, to_number(extractvalue(xmltype(dbms_xmlgen.getxml('select count(*) c from '||owner||'.'||table_name)),'/ROWSET/ROW/C')) as count
from all_tables
where owner = 'MASTER'

Thursday, August 8, 2019

Handle Multi-Line Strings in Scala

While coding Spark SQL, we often encounter huge SQL queries hand-picked from the data warehouse world. We store them in a string and execute them as a DataFrame or use them for other purposes. The multiple lines, and the single quotes enclosing values within SQL clauses, make it awkward to assign the entire SQL statement to a variable. One ugly workaround is to collapse the whole query onto a single line, which hurts readability later. Below is a cleaner solution. :)

A typical string declaration in Scala:

val a = "kiran"

In Scala you create multiline strings by surrounding your text with three double quotes:

val a = """This is 
    a multiline
    String"""

A cleaner approach is to add the stripMargin method to the end of your multiline string, and begin all lines after the first line with the pipe symbol (|):

scala> val speech = """Four score and
     |                |seven years ago""".stripMargin

speech: String =
Four score and
seven years ago

If you don’t like using the | symbol, you can use any character you like with the stripMargin method:

scala> val speech = """Four score and
     |                #seven years ago""".stripMargin('#')
speech: String =
Four score and
seven years ago

Another nice feature of Scala’s multiline string syntax is that you can include single- and double-quotes without having to escape them:

scala> val a=""" This is "a multi" line 'string' syntax """
a: String = " This is "a multi" line 'string' syntax "

As shown below, escaping is needed if we don't use triple quotes.

scala> val a="This is \"a multi\" line 'string' syntax "
a: String = "This is "a multi" line 'string' syntax "

Simple, isn't it? Just use triple quotes for the string declaration and you do away with both the multi-line and escaping issues!
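Tying this back to the Spark SQL motivation above, a long query can be declared like this (the table and column names are made up for illustration):

```scala
// Multi-line SQL: readable, and the single quotes need no escaping
val query = """SELECT id, name
              |FROM users
              |WHERE status = 'active'""".stripMargin

println(query)
```

The resulting string can then be passed straight to spark.sql(query) or a JDBC call.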

Monday, July 29, 2019

Build a Scala FAT Jar Using SBT

Make a directory structure as below

~/sbt_build  - This is the root directory
  - src/main/scala/   - Place the .scala files here
  - target  - This gets created automatically once the jar is built
  - project - This gets created automatically once the jar is built
  - build.sbt - Prepare this file and place it here

Contents of build.sbt

name := "SimpleApp"

version := "1.0"
scalaVersion := "2.11.6"
val sparkVersion = "2.2.0"

libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion % "provided"
libraryDependencies += "oracle" % "ojdbc6" % "11.1.0.6"

libraryDependencies += "com.cedarsoftware" % "java-util" % "1.8.0"

Note the Oracle JDBC dependency above: ojdbc6 is not published to any public Maven repository, so that coordinate will only resolve from an internal repository. Alternatively, the ojdbc JAR can be placed in the lib directory at the project root; sbt treats JARs there as unmanaged dependencies, and any JAR placed under lib will get added to the final JAR.

~/sbt_build/lib

There are a few ways to create JAR files using sbt; here we use the "sbt assembly" plugin to create one FAT jar. To add the plugin, place a file named "plugins.sbt" in the project folder.

~/sbt_build/project/plugins.sbt

Add the below line in the plugins.sbt file
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
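Fat JARs often fail with "deduplicate" errors on conflicting META-INF files. If that happens, a common addition to build.sbt (a sketch, not part of the original build) is a merge strategy:

```scala
// Discard conflicting META-INF entries; keep the first copy of everything else
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}
```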

From the root directory, run the below
~/sbt_build$ sbt assembly

Once the build is successful, the jar file can be found at
~/sbt_build/target/scala-2.11/SimpleApp-assembly-1.0.jar

Submit the jar on the cluster to run the application

spark-submit --class SimpleApp --master yarn target/scala-2.11/SimpleApp-assembly-1.0.jar


Wednesday, July 24, 2019

Zeppelin - Additional Dependencies(Jars/Packages)

Zeppelin interpreter properties can be used to add additional JAR and package dependencies.

Upload the needed JAR files to HDFS, then add the properties in the Livy interpreter settings:

  • livy.spark.jars - a comma-separated list of JAR files available on HDFS
  • livy.spark.jars.packages - a comma-separated list of Maven coordinates. The Maven Central repository is available by default in Zeppelin; other repositories can be added for additional dependencies.
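For instance (the values below are illustrative, not from the original settings):

```
livy.spark.jars            hdfs:///libs/ojdbc6.jar,hdfs:///libs/custom-udfs.jar
livy.spark.jars.packages   com.cedarsoftware:java-util:1.8.0
```

Package coordinates use the groupId:artifactId:version form.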

(Screenshot: Zeppelin Livy interpreter settings)

Thursday, July 11, 2019

Parquet issues - Hive vs Spark

Reading and writing Parquet files behaved the same in Hive and Spark up to Spark 1.4.

After that, Spark adopted newer Parquet versions, so if one writes a Parquet file with a recent Spark version and builds an external Hive table on top of that file, errors like the one below can occur.

org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1

The Spark documentation (link here) describes workarounds for these issues.
There are two main properties:

spark.sql.parquet.writeLegacyFormat (default: false)   -- If true, data will be written in a way of Spark 1.4 and earlier. For example, decimal values will be written in Apache Parquet's fixed-length byte array format, which other systems such as Apache Hive and Apache Impala use. If false, the newer format in Parquet will be used. For example, decimals will be written in int-based format. If Parquet output is intended for use with systems that do not support this newer format, set to true.

spark.sql.hive.convertMetastoreParquet (default: true)  -- When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in support.


Set the values to true/false based on the requirement.
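For example, to write Parquet that Hive can read, the legacy format can be enabled before the write (a sketch for spark-shell; the output path is illustrative):

```scala
// Write decimals etc. in the Spark 1.4-era layout that Hive/Impala understand
spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")
df.write.mode("overwrite").parquet("/data/table1_parquet")

// Alternatively, when reading a Hive Parquet table from Spark,
// fall back to the Hive SerDe instead of Spark's built-in Parquet reader
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
```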