Merge pull request #2 from am-giordano/v020
V020
am-giordano authored Jul 31, 2022
2 parents edfb425 + c9b0869 commit f30c097
Showing 7 changed files with 131 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -3,4 +3,5 @@
/.idea/
/data/output/
/project/
/releases/
/target/
57 changes: 40 additions & 17 deletions README.md
@@ -10,10 +10,38 @@ DataFrames with flat columns interrelated by foreign keys.

## Add as a dependency to your project

sbt:
### spark-shell, pyspark, or spark-submit

```
libraryDependencies += "com.amgiordano.spark" % "spark-relational" % "0.1.0"
> $SPARK_HOME/bin/spark-shell --packages am-giordano:spark-relational:0.2.0
```

### sbt

```
resolvers += "Spark Packages Repo" at "https://repos.spark-packages.org/"
libraryDependencies += "am-giordano" % "spark-relational" % "0.2.0"
```

### Maven

```
<dependencies>
  <!-- list of dependencies -->
  <dependency>
    <groupId>am-giordano</groupId>
    <artifactId>spark-relational</artifactId>
    <version>0.2.0</version>
  </dependency>
</dependencies>
<repositories>
  <!-- list of other repositories -->
  <repository>
    <id>SparkPackagesRepo</id>
    <url>https://repos.spark-packages.org/</url>
  </repository>
</repositories>
```

## Example of use: Read-Convert-Write
@@ -30,7 +58,7 @@ scala> rs.dataFrames.foreach(item => item._2.write.option("header", "true").csv(
```

This example shows how to load a JSON file into a DataFrame, convert it into an interrelated set of flat DataFrames,
and write these in CSV files. The dataset used can be found in the GitHub repository of this package in
and write these to CSV files. The dataset used can be found in the GitHub repository of this package in
`data/input/resumes.json`.
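
Pieced together, the full flow reads as below. The read step is collapsed in this diff view, so the exact `spark.read.json` call is an assumption based on the surrounding text; the convert and write steps mirror the lines shown above.

```
import org.apache.spark.sql.SparkSession
import com.amgiordano.spark.relational.RelationalSchema

val spark = SparkSession.builder.master("local[*]").getOrCreate()

// Read: load the nested JSON documents into a single DataFrame (path taken from the README text)
val df = spark.read.json("data/input/resumes.json")

// Convert: flatten into interrelated tables, with "person" as the main entity
val rs = RelationalSchema(df, "person")

// Write: one CSV directory per generated table, keyed by table name
for ((tableName, table) <- rs.dataFrames) {
  table.write.option("header", "true").csv(s"data/output/resumes/$tableName")
}
```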

Each document in this dataset has a complex structure with nested objects and arrays.
@@ -70,18 +98,16 @@ the DataFrame and a name for the main entity of the dataset, "person":

```
scala> import com.amgiordano.spark.relational.RelationalSchema
scala> val rs = new RelationalSchema(df, "person")
scala> val rs = RelationalSchema(df, "person")
```

Now we can look at each of the tables:

```
scala> rs.dataFrames.foreach(
| item => {
| println(item._1) // First element of the item is the table name
| item._2.show // Second element of the item is the DataFrame
| }
| )
scala> for ((tableName, df) <- rs.dataFrames) {
| println(tableName)
| df.show
| }
```

Output:
@@ -131,13 +157,10 @@ experience!!technologies
+--------------------------------+------------------+--------------+------------------------------+---------------------------------+
```

Finally, let's write each DataFrame as a CSV file:
Finally, let's write each DataFrame to a CSV file:

```
scala> rs.dataFrames.foreach(
| item => item._2
| .write
| .option("header", "true")
| .csv(s"data/output/resumes/${item._1}")
| )
scala> for ((tableName, df) <- rs.dataFrames) {
| df.write.option("header", "true").csv(s"data/output/resumes/$tableName")
| }
```
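
Because the generated tables are related by foreign keys (each child table repeats its parent's `!!__id__` column, as the test expectations further down show for the `main` entity), the flat CSVs can be recombined with ordinary joins. A hedged sketch follows; the child table name `experience` and the key column `person!!__id__` are assumptions inferred from the output above, not confirmed by the commit.

```
// Assumed names: "person" is the main entity table, "experience" a table produced
// from a nested array; the shared key follows the <entity>!!__id__ pattern.
val person = rs.dataFrames("person")
val experience = rs.dataFrames("experience")

// Re-attach each experience row to its owning person via the foreign key
val experienceWithPerson = experience.join(person, Seq("person!!__id__"))
experienceWithPerson.show()
```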
8 changes: 6 additions & 2 deletions build.sbt
@@ -1,6 +1,10 @@
name := "spark-relational"
version := "0.1.1"
version := "0.2.0"
organization := "com.amgiordano.spark"
scalaVersion := "2.12.15"
publishMavenStyle := true
libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "3.3.0"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % "3.3.0",
"org.scalatest" %% "scalatest" % "3.3.0-SNAP3"
)
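
A side note on the dependency change above: `build.sbt` is itself Scala, and `%%` appends the project's Scala binary version to the artifact name, so with `scalaVersion := "2.12.15"` the two spellings below resolve to the same artifact.

```
// Equivalent for Scala 2.12.x: `%%` expands "spark-sql" to "spark-sql_2.12"
libraryDependencies += "org.apache.spark" %  "spark-sql_2.12" % "3.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql"      % "3.3.0"
```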
27 changes: 27 additions & 0 deletions mkartifact.sh
@@ -0,0 +1,27 @@
username=am-giordano

version=$(grep 'version :=' build.sbt | sed 's/version := "\(.*\)"/\1/')
group_id=$(grep 'organization :=' build.sbt | sed 's/organization := "\(.*\)"/\1/')
artifact_id=$(grep 'name :=' build.sbt | sed 's/name := "\(.*\)"/\1/')
scala_version=$(grep 'scalaVersion :=' build.sbt | sed 's/scalaVersion := "\(.*\)\..*"/\1/')
target_prefix=target/scala-"$scala_version"/"$artifact_id"_"$scala_version"-"$version"
jar="$artifact_id"-"$version".jar
pom="$artifact_id"-"$version".pom

sbt publishLocal

cp "$target_prefix".jar "$jar"
cp "$target_prefix".pom "$pom"

echo "JAR STRUCTURE"
jar tf "$jar"

echo "POM CONTENTS"
cat "$pom"

sed -ibackup "s/$group_id/$username/" "$pom"
sed -ibackup "s/${artifact_id}_$scala_version/$artifact_id/" "$pom"

zip "releases/$artifact_id-$version.zip" "$jar" "$pom"

rm "$jar" "$pom" "$pom"backup
4 changes: 2 additions & 2 deletions RelationalSchema.scala
@@ -6,7 +6,7 @@ import org.apache.spark.sql.functions.{col, monotonically_increasing_id}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class RelationalSchema(dfMain: DataFrame, mainEntityName: String = "main") {
case class RelationalSchema(dfMain: DataFrame, mainEntityName: String = "main") {

  type TableMap = mutable.LinkedHashMap[String, DataFrame]
  type TripletBuffer = ArrayBuffer[(String, DataFrame, Array[String])]
@@ -20,7 +20,7 @@ class RelationalSchema(dfMain: DataFrame, mainEntityName: String = "main") {
  while (toProcess.nonEmpty) {
    var (entityName, df, foreignKeys) = toProcess.remove(0)
    while (dataFrames.keySet.contains(entityName)) entityName += "_"
    val tab = new Tabulator(entityName, insertIndex(df, entityName), foreignKeys)
    val tab = Tabulator(entityName, insertIndex(df, entityName), foreignKeys)
    val (dfNew, fromArrayTriplets) = tab.tabulate()
    dataFrames.update(entityName, dfNew)
    toProcess ++= fromArrayTriplets
2 changes: 1 addition & 1 deletion Tabulator.scala
@@ -4,7 +4,7 @@ import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StructType}

class Tabulator(entityName: String, var df: DataFrame, foreignKeys: Array[String]) {
case class Tabulator(entityName: String, var df: DataFrame, foreignKeys: Array[String]) {

  type TripletArray = Array[(String, DataFrame, Array[String])]

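The `class` to `case class` change in both files is what lets callers, including the README example above, drop the `new` keyword: a case class gets a synthesized companion `apply`. A minimal illustration with a hypothetical class, not taken from the commit:

```
// Hypothetical example: the companion apply makes `new` optional
case class Flattener(entityName: String = "main")

val before = new Flattener("person") // still compiles
val after  = Flattener("person")     // companion apply; no `new` needed
```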
54 changes: 54 additions & 0 deletions RelationalSchemaSuite.scala
@@ -0,0 +1,54 @@
package com.amgiordano.spark.relational

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.Assertion
import org.scalatest.funsuite.AnyFunSuite

class RelationalSchemaSuite extends AnyFunSuite {

  val spark: SparkSession = SparkSession.builder.master("local[1]").getOrCreate
  spark.sparkContext.setLogLevel("WARN")
  import spark.implicits._

  def assertSameRS(inputStrings: Seq[String], expectedSchema: Map[String, Seq[String]]): Assertion = {
    val dfInput = spark.read.json(inputStrings.toDS)
    val rs = RelationalSchema(dfInput)
    assert(rs.dataFrames.forall(item => assertSameDataFrames(item._2, spark.read.json(expectedSchema(item._1).toDS))))
  }

  def assertSameDataFrames(dfActual: DataFrame, dfExpected: DataFrame): Boolean = {
    val aCols = dfActual.columns.sorted
    val eCols = dfExpected.columns.sorted
    if (aCols sameElements eCols) aCols.forall(c => values(dfActual, c) sameElements values(dfExpected, c)) else false
  }

  def values(df: DataFrame, colName: String): Array[Any] = df.select(colName).collect.map(_(0))

  test("Flat document") {
    assertSameRS(Seq("{'a': 0}"), Map("main" -> Seq("{'main!!__id__': 0, 'a': 0}")))
  }

  test("With object") {
    assertSameRS(Seq("{'a': {'b': 0}}"), Map("main" -> Seq("{'main!!__id__': 0, 'a!!b': 0}")))
  }

  test("With array") {
    assertSameRS(
      Seq("{'a': [0]}"),
      Map(
        "main" -> Seq("{'main!!__id__': 0}"),
        "a" -> Seq("{'a!!__id__': 0, 'main!!__id__': 0, 'a': 0}")
      )
    )
  }

  test("With array of objects") {
    assertSameRS(
      Seq("{'a': [{'b': 0}]}"),
      Map(
        "main" -> Seq("{'main!!__id__': 0}"),
        "a" -> Seq("{'a!!__id__': 0, 'main!!__id__': 0, 'a!!b': 0}")
      )
    )
  }
}
