Building a Lakehouse with Oracle Data Platform

Eloi Lopes
9 min read · Mar 5, 2024


In this blog, we’ll delve into a real-world customer use case that Alex and I have been working on in recent weeks. The customer’s primary requirement was to establish a Lakehouse architecture to manage their data, with a focus on achieving near-real-time processing. While we’ll simplify some of the technical specifications for clarity, we aim to provide a comprehensive understanding of the solution implemented.

What is a Lakehouse?

A data lakehouse can be defined as a modern data platform built from a combination of a data lake and a data warehouse. More specifically, a data lakehouse takes the flexible storage of unstructured data from a data lake and the management features and tools from data warehouses, then strategically implements them together as a larger system. This integration of two unique tools brings the best of both worlds to users. [1]

What are the prerequisites for this use case?

The customer wants to read data in real time from the source systems and replicate it as is into an Object Storage bucket that we call the Raw Zone. Once the data arrives there, OCI Data Flow runs in near real time (every 10 seconds) to capture new data and merge it into the Silver Zone using Delta Lake.

The Gold Zone (Autonomous Data Warehouse) will contain different tables: some will be loaded and aggregated by OCI Data Integration, and others will be external tables connected to the Silver Zone.
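As an aside (not part of the original implementation), such an external table can be created in the Autonomous Database with DBMS_CLOUD over the parquet files that back the Silver Zone. The sketch below, driven from Python with python-oracledb, uses placeholder connection details, credential, bucket URI, and table names, and it reads the parquet data files directly rather than interpreting the Delta transaction log:

# Hedged sketch: create an ADW external table over Silver Zone parquet files.
# Credential, DSN, bucket URI and table names are hypothetical placeholders.
import oracledb

conn = oracledb.connect(user="ADMIN", password="<password>", dsn="adw_high")
with conn.cursor() as cur:
    cur.execute("""
        BEGIN
          DBMS_CLOUD.CREATE_EXTERNAL_TABLE(
            table_name      => 'TABLE_1_EXT',
            credential_name => 'OCI_CRED',
            file_uri_list   => 'https://objectstorage.<region>.oraclecloud.com/n/<namespace>/b/silver-zone/o/TABLE_1/*.parquet',
            format          => '{"type":"parquet","schema":"first"}'
          );
        END;""")
conn.close()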

What products are required for the Lakehouse solution?

Oracle GoldenGate for Oracle and Oracle GoldenGate for Big Data: to capture data in real time from the Oracle databases and load it into Object Storage.

OCI Data Flow: To create and update delta lake tables in Silver Zone.

OCI Data Integration: Necessary for loading data that does not require real-time processing and for aggregating data in the Gold Zone.

Autonomous Data Warehouse: This is where hot data is stored for the last three months, after which it is offloaded to Object Storage.

OCI Data Catalog: Optionally used for metadata management.

Implementation Phase

The first part of this implementation is to capture data in real time from the source databases. In this case, we are using an Oracle database as a source, but it could be any other database supported by Oracle GoldenGate.

Setting up GoldenGate

To achieve the required daily and monthly partitioning of data in the Object Storage, GoldenGate is employed. Configuration involves creating three connections: for the Oracle Database Cloud Service (DBCS), OCI Object Storage, and OCI GoldenGate for Big Data. Following the setup, replication from DBCS to OCI Object Storage is configured as per the provided documentation.

To create an end-to-end replication from DBCS to OCI Object Storage, follow the steps in the documentation links:

- https://docs.oracle.com/en/cloud/paas/goldengate-service/retlo/#articletitle

- https://docs.oracle.com/en/cloud/paas/goldengate-service/yvyvx/index.html#articletitle

After both GoldenGate deployments are configured, it is very important to set up the property file of GoldenGate for Big Data correctly. This property file converts the data from the trail files into daily partitioned parquet files and injects the partition value into the file path.

GoldenGate for Big Data property file to generate the parquet files:
# Properties file for Replicat TABLE_1
# OCI Event Handler Template
gg.eventhandler.oci.connectionId={will be populated automatically}
#TODO: Edit the OCI compartment OCID
gg.eventhandler.oci.compartmentID={enter here the compartment OCID}
#TODO: Edit the OCI bucket name
gg.eventhandler.oci.bucketMappingTemplate={RAW ZONE bucket}
#The File Writer Handler
gg.handlerlist=filewriter
gg.handler.filewriter.type=filewriter
gg.handler.filewriter.mode=op
gg.handler.filewriter.pathMappingTemplate=./dirout
gg.handler.filewriter.stateFileDirectory=./dirsta
gg.handler.filewriter.fileRollInterval=7m
gg.handler.filewriter.inactivityRollInterval=3s
# Create partitions based on the column you define; the first 10 characters of its value become the DAY partition
gg.handler.filewriter.partitioner.TABLE_1=DAY=${substring[${columnValue[partitioned column]}][0][10]}
gg.handler.filewriter.fileWriteActiveSuffix=.tmp
gg.handler.filewriter.finalizeAction=delete
### Avro OCF
gg.handler.filewriter.format=avro_row_ocf
gg.handler.filewriter.fileNameMappingTemplate=${groupName}_${fullyQualifiedTableName}_${currentTimestamp}.avro
gg.handler.filewriter.format.pkUpdateHandling=delete-insert
gg.handler.filewriter.format.metaColumnsTemplate=${optype},${position}
gg.handler.filewriter.format.iso8601Format=false
gg.handler.filewriter.partitionByTable=true
gg.handler.filewriter.rollOnShutdown=true
##The Parquet Event Handler
gg.handler.filewriter.eventHandler=parquet
gg.eventhandler.parquet.type=parquet
gg.eventhandler.parquet.pathMappingTemplate=./dirparquet
#Selecting the OCI Event Handler
gg.eventhandler.parquet.eventHandler=oci
gg.eventhandler.oci.type=oci
gg.eventhandler.oci.pathMappingTemplate=${fullyQualifiedTableName}
gg.eventhandler.parquet.finalizeAction=none
gg.eventhandler.parquet.fileNameMappingTemplate=${fullyQualifiedTableName}_${currentTimestamp[dd.MM.YYYY]}.parquet
gg.handler.parquet.partitioner.TABLE_1=DAY=${substring[${columnValue[partitioned column]}][0][10]}
gg.handler.parquet.partitionByTable=true
gg.classpath=$THIRD_PARTY_DIR/oci/*:$THIRD_PARTY_DIR/hadoop/*:$THIRD_PARTY_DIR/parquet/*

Repeat the steps for TABLE_2. Partitioned parquet files will be generated in the Raw Zone bucket. These files will be picked up in the next step by OCI Data Flow.
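Before wiring up OCI Data Flow, it can be useful to confirm that the DAY= prefixed parquet objects are actually landing in the Raw Zone. A minimal sketch using the OCI Python SDK, assuming a local OCI config file; the bucket name and object prefix are placeholders for your own layout:

# Minimal sketch: list the DAY=... parquet objects written by GoldenGate.
# The bucket name and prefix are placeholders for your Raw Zone layout.
import oci

config = oci.config.from_file()
object_storage = oci.object_storage.ObjectStorageClient(config)
namespace = object_storage.get_namespace().data

listing = object_storage.list_objects(namespace, "raw-zone-bucket", prefix="TABLE_1/DAY=")
for obj in listing.data.objects:
    print(obj.name)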

Setting up OCI Data Flow

OCI Data Flow covers two distinct tasks. The first script joins different tables (parquet files) and partitions them by month (YYYYMM); this process is not executed in real time. The second script reads parquet files from the Raw Zone and creates two Delta Lake tables.

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function
from random import random
from operator import add
import argparse
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, sum, to_date, from_unixtime, unix_timestamp, expr
from pyspark.sql.types import StringType

def main():

    parser = argparse.ArgumentParser()
    parser.add_argument("--parquet-table-path-table_1", required=True)
    parser.add_argument("--parquet-table-path-table_2", required=True)
    parser.add_argument("--delta-table-month-path", required=True)
    args = parser.parse_args()

    # Set up Spark.
    spark = get_dataflow_spark_session()

    parquetTablePath_df1 = args.parquet_table_path_table_1
    parquetTablePath_df2 = args.parquet_table_path_table_2
    deltaTablePathMonth = args.delta_table_month_path

    # Read parquet files
    df1 = spark.read.option("basePath", parquetTablePath_df1).parquet(parquetTablePath_df1).withColumn("time_stamp", current_timestamp())
    df2 = spark.read.option("basePath", parquetTablePath_df2).parquet(parquetTablePath_df2).withColumn("time_stamp", current_timestamp())

    df1_prefixed = df1.select([col(column).alias("df1_" + column) for column in df1.columns])
    # Rename all columns in df2 with a prefix
    df2_prefixed = df2.select([col(column).alias("df2_" + column) for column in df2.columns])

    # Join the two DataFrames on the "UUID" column
    joined_df = df1_prefixed.join(df2_prefixed, df1_prefixed["df1_UUID"] == df2_prefixed["df2_UUID"])

    # Convert the "MyDate" column to a timestamp format
    joined_df = joined_df.withColumn("df1_MyDate", from_unixtime(unix_timestamp("df1_MyDate", "dd.MM.yyyy HH:mm:ss")).cast("timestamp"))

    # Extract month and year from "MyDate" and create a new column "YearMonth"
    joined_df = joined_df.withColumn("YearMonth", expr("date_format(df1_MyDate, 'yyyyMM')"))

    joined_df.write.partitionBy("YearMonth").format("delta").mode("overwrite").save(deltaTablePathMonth)


def get_dataflow_spark_session(
    app_name="DataFlow", file_location=None, profile_name=None, spark_config={}
):
    """
    Get a Spark session in a way that supports running locally or in Data Flow.
    """
    if in_dataflow():
        spark_builder = SparkSession.builder.appName(app_name)
    else:
        # Import OCI.
        try:
            import oci
        except:
            raise Exception(
                "You need to install the OCI python library to test locally"
            )

        # Use defaults for anything unset.
        if file_location is None:
            file_location = oci.config.DEFAULT_LOCATION
        if profile_name is None:
            profile_name = oci.config.DEFAULT_PROFILE

        # Load the config file.
        try:
            oci_config = oci.config.from_file(
                file_location=file_location, profile_name=profile_name
            )
        except Exception as e:
            print("You need to set up your OCI config properly to run locally")
            raise e
        conf = SparkConf()
        conf.set("fs.oci.client.auth.tenantId", oci_config["tenancy"])
        conf.set("fs.oci.client.auth.userId", oci_config["user"])
        conf.set("fs.oci.client.auth.fingerprint", oci_config["fingerprint"])
        conf.set("fs.oci.client.auth.pemfilepath", oci_config["key_file"])
        conf.set(
            "fs.oci.client.hostname",
            "https://objectstorage.{0}.oraclecloud.com".format(oci_config["region"]),
        )
        spark_builder = SparkSession \
            .builder \
            .appName("App") \
            .config(conf=conf) \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

    # Add in extra configuration.
    for key, val in spark_config.items():
        spark_builder.config(key, val)

    # Create the Spark session.
    session = spark_builder.getOrCreate()
    return session


def in_dataflow():
    """
    Determine if we are running in OCI Data Flow by checking the environment.
    """
    if os.environ.get("HOME") == "/home/dataflow":
        return True
    return False


if __name__ == "__main__":
    main()
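As a quick sanity check, the monthly Delta table written by this script can be read back and inspected per partition. A minimal sketch, assuming the same Spark session (with the Delta extensions) and the same deltaTablePathMonth used above; the partition value is an example:

# Minimal sketch: read back the monthly Delta table and inspect its partitions.
monthly_df = spark.read.format("delta").load(deltaTablePathMonth)
monthly_df.groupBy("YearMonth").count().orderBy("YearMonth").show()
monthly_df.where("YearMonth = '202402'").show(10)  # example partition value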

The second script reads the parquet files from the Raw Zone and creates two Delta Lake tables. It runs the load function in parallel so that both tables are loaded at the same time.

AppLoadMonthData_RT

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function
import sys
from random import random
from operator import add
import argparse
import os
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, sum, to_date, from_unixtime, unix_timestamp, expr
from pyspark.sql.types import StringType
from delta.tables import *
from datetime import datetime
from multiprocessing import Process
from multiprocessing.pool import ThreadPool

def main():

    parser = argparse.ArgumentParser()
    parser.add_argument("--parquet-table-path-file_1", required=True)
    parser.add_argument("--delta-table-month-file_1-path", required=True)
    parser.add_argument("--parquet-table-path-file_2", required=True)
    parser.add_argument("--delta-table-month-file_2-path", required=True)
    args = parser.parse_args()

    file_1_path = args.parquet_table_path_file_1
    target_file_1_tables = args.delta_table_month_file_1_path
    file_2_path = args.parquet_table_path_file_2
    target_file_2_tables = args.delta_table_month_file_2_path

    # Set up Spark.
    spark = get_dataflow_spark_session()

    # Run the monthly function in parallel for both files
    parameters_list = [(spark, file_1_path, target_file_1_tables),
                       (spark, file_2_path, target_file_2_tables),
                       ]
    with ThreadPool() as pool:
        pool.map(lambda params: monthly_function(*params), parameters_list)


def monthly_function(spark, source_daily_parquet_files, target_table):

    while True:
        # Read the daily parquet files from the Raw Zone
        df_daily_files = spark.read.option("basePath", source_daily_parquet_files).parquet(source_daily_parquet_files).withColumn("time_stamp", current_timestamp())
        try:
            df_target_monthly_files = spark.read.option("basePath", target_table).parquet(target_table).withColumn("time_stamp", current_timestamp())
        except Exception:
            # If the target table doesn't exist, create an empty Delta table with the same schema
            schema = df_daily_files.schema
            df_target_monthly_files = spark.createDataFrame([], schema).withColumn("time_stamp", current_timestamp()) \
                .withColumn("YearMonth", lit(""))
            df_target_monthly_files.write.partitionBy("YearMonth").format("delta").mode("overwrite").save(target_table)

        # Convert "MyDate" to a timestamp and derive the "YearMonth" partition column
        df_daily_files = df_daily_files.withColumn("MyDate",
                                                   from_unixtime(unix_timestamp(col("MyDate"), "dd.MM.yyyy HH:mm:ss"))
                                                   .cast("timestamp"))
        df_daily_files = df_daily_files.withColumn("YearMonth", expr("date_format(MyDate, 'yyyyMM')"))

        # Read the latest processed timestamp from the Delta table
        delta_table = DeltaTable.forPath(spark, target_table)
        latest_timestamp = delta_table.toDF().agg({"time_stamp": "max"}).collect()[0][0]

        df_daily_files.printSchema()

        # Merge new and changed rows into the Delta table
        try:
            delta_table.alias("target").merge(
                source=df_daily_files.alias("source"),
                condition=f"target.UUID = source.UUID and source.MyDate > '{latest_timestamp}'"
            ).whenMatchedUpdateAll() \
             .whenNotMatchedInsertAll() \
             .execute()
        except Exception as e:
            print(f"An error occurred: {e}")
        time.sleep(60)


def get_dataflow_spark_session(
    app_name="DataFlow", file_location=None, profile_name=None, spark_config={}
):
    """
    Get a Spark session in a way that supports running locally or in Data Flow.
    """
    if in_dataflow():
        spark_builder = SparkSession.builder.appName(app_name)
    else:
        # Import OCI.
        try:
            import oci
        except:
            raise Exception(
                "You need to install the OCI python library to test locally"
            )

        # Use defaults for anything unset.
        if file_location is None:
            file_location = oci.config.DEFAULT_LOCATION
        if profile_name is None:
            profile_name = oci.config.DEFAULT_PROFILE

        # Load the config file.
        try:
            oci_config = oci.config.from_file(
                file_location=file_location, profile_name=profile_name
            )
        except Exception as e:
            print("You need to set up your OCI config properly to run locally")
            raise e
        conf = SparkConf()
        conf.set("fs.oci.client.auth.tenantId", oci_config["tenancy"])
        conf.set("fs.oci.client.auth.userId", oci_config["user"])
        conf.set("fs.oci.client.auth.fingerprint", oci_config["fingerprint"])
        conf.set("fs.oci.client.auth.pemfilepath", oci_config["key_file"])
        conf.set(
            "fs.oci.client.hostname",
            "https://objectstorage.{0}.oraclecloud.com".format(oci_config["region"]),
        )
        spark_builder = SparkSession \
            .builder \
            .appName("App") \
            .config("spark.log.level", "ERROR") \
            .config(conf=conf) \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

    # Add in extra configuration.
    for key, val in spark_config.items():
        spark_builder.config(key, val)

    # Create the Spark session.
    session = spark_builder.getOrCreate()
    return session


def in_dataflow():
    """
    Determine if we are running in OCI Data Flow by checking the environment.
    """
    if os.environ.get("HOME") == "/home/dataflow":
        return True
    return False


if __name__ == "__main__":
    main()
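Both scripts run as OCI Data Flow applications. Purely as an illustration, the sketch below shows how a run of the second application could be triggered from Python with the OCI SDK; the OCIDs and oci:// URIs are hypothetical placeholders, and runs can equally be started from the OCI console:

# Hedged sketch: start an OCI Data Flow run for the streaming application.
# Compartment/application OCIDs and oci:// URIs are placeholders.
import oci

config = oci.config.from_file()
data_flow = oci.data_flow.DataFlowClient(config)

run_details = oci.data_flow.models.CreateRunDetails(
    compartment_id="ocid1.compartment.oc1..<placeholder>",
    application_id="ocid1.dataflowapplication.oc1..<placeholder>",
    display_name="AppLoadMonthData_RT",
    arguments=[
        "--parquet-table-path-file_1", "oci://raw-zone@<namespace>/TABLE_1/",
        "--delta-table-month-file_1-path", "oci://silver-zone@<namespace>/TABLE_1/",
        "--parquet-table-path-file_2", "oci://raw-zone@<namespace>/TABLE_2/",
        "--delta-table-month-file_2-path", "oci://silver-zone@<namespace>/TABLE_2/",
    ],
)
run = data_flow.create_run(run_details).data
print("Run OCID:", run.id)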

OCI Data Integration configuration

OCI Data Integration is utilized to combine merged files in the Silver Zone, enrich them with additional information, and deliver the output to the Autonomous Database in the Gold Zone. Enrichment is optional, and if not needed, data can be loaded directly through the Data Loader Task. Integration Tasks with attached data flows are created for this purpose.

More information about tasks can be found here: https://docs.oracle.com/en-us/iaas/data-integration/using/manage-design-tasks.htm

To create a data flow, click “Create data flow” on the home page of your OCI Data Integration workspace:

A blank canvas will open. Drag the required operators from the operator list to create transformations and populate the required fields, such as source, target, and connection name.

On completion, you should have a similar view, depending on which and how many transformations you use.

The target table can be created prior to data ingestion, or you can let OCI Data Integration do it for you. To do that, click the target operator and, while populating the required fields, click “Create new data entity”. This option will first create the table structure in ADW and then load the data.

Save the data flow.

Now create an Integration Task and, in the Data Flow field, select the data flow you have just created.

Once done, publish it to your application and run it. The target table will be created in your ADW.

Conclusion

The implementation of a Lakehouse architecture requires the integration of multiple services. Oracle GoldenGate facilitates data replication from DBCS into Object Storage, while OCI Data Flow manages data partitioning and merging. OCI Data Integration plays a pivotal role in loading and enriching data into the Autonomous Database, thereby fulfilling the customer’s requirements effectively.

References

[1] https://www.oracle.com/big-data/what-is-data-lakehouse/

- https://docs.oracle.com/en/cloud/paas/goldengate-service/retlo/#articletitle

- https://docs.oracle.com/en/cloud/paas/goldengate-service/yvyvx/index.html#articletitle


Eloi Lopes

Opinions expressed are solely my own and do not express the views or opinions of my employer Oracle. https://www.linkedin.com/in/eloilopes/