Extracting data from Salesforce in near real time using OCI Data Flow and GoldenGate Stream Analytics — Part 2

Eloi Lopes
Dec 12, 2023

In part 1 of this blog, we saw how to create Python code and deploy it on OCI Data Flow (Spark) to capture data in near real time (every 10 seconds) and write it into OCI Streaming.

In this part 2, we are going to change two things. First, we change the output of the Spark data frames to generate JSON messages. Second, we add GoldenGate Stream Analytics to process events in real time from Salesforce and from other sources such as Kafka, Oracle databases, and MySQL, and to integrate with other GoldenGate products.

What is GoldenGate Stream Analytics?

Oracle GoldenGate Stream Analytics (GGSA) is a platform for building applications that filter, correlate, and process events in real time. It can be deployed on stand-alone Spark or on Hadoop YARN, which makes it a versatile, high-performance event processing engine.

As of December 12, 2023, when this blog is being published, a PaaS version of GoldenGate Stream Analytics is available under Limited Availability.

Changing the Python code from part 1

To generate JSON records, we are going to create a new function, fetch_updated_json_records, which is a modified version of the function fetch_updated_records.

#This code is provided "as is," without warranty of any kind, 
#express or implied, including but not limited to the warranties of
#merchantability, fitness for a particular purpose, and non-infringement.
#In no event shall the contributors or copyright holders be liable for
#any claim, damages, or other liability, whether in an action of contract,
#tort, or otherwise, arising from, out of, or in connection with the
#software or the use or other dealings in the software.

#Users are solely responsible for the use of this code and assume all
#risks associated with its use. The contributors do not provide any support
#or maintenance for the code, and users should not expect updates or bug fixes. It is the responsibility of users to review and test the code thoroughly before deployment.

#By using this code, you agree to comply with all applicable laws and
#regulations. The contributors disclaim any and all liability
#for the use of this code and do not guarantee its fitness
#for any particular purpose.
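import json  # needed for json.dumps below (skip if already imported)

def fetch_updated_json_records(last_extracted_timestamp):
    # `sf` is assumed to be the Salesforce client created in part 1.
    # SOQL datetime literals are written without quotes.
    soql_query = f"SELECT Id, Name, LastModifiedDate FROM Account WHERE LastModifiedDate > {last_extracted_timestamp}"
    result = sf.query_all(query=soql_query)
    records = result.get('records', [])
    # Convert records to JSON format
    json_data = json.dumps(records)
    return json_data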
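
Records returned by a simple_salesforce client (which the `query_all` call suggests is in use) typically carry an `attributes` entry with the object type and REST URL next to the queried fields. The following is a minimal sketch, assuming you would rather not send that metadata downstream. The helper names `strip_attributes` and `to_json_payload` and the sample record are hypothetical and are not part of the code from part 1.

```python
import json

def strip_attributes(records):
    """Return copies of the records without the 'attributes' metadata key."""
    return [{k: v for k, v in rec.items() if k != "attributes"} for rec in records]

def to_json_payload(records):
    """Serialize a list of record dicts to a JSON array string."""
    return json.dumps(strip_attributes(records))

# Made-up sample shaped like a Salesforce query result
sample = [
    {
        "attributes": {"type": "Account", "url": "/services/data/vXX.X/sobjects/Account/001xx"},
        "Id": "001xx",
        "Name": "Acme",
        "LastModifiedDate": "2023-12-12T10:15:30.000+0000",
    }
]

payload = to_json_payload(sample)
empty_payload = to_json_payload([])
```

An empty result serializes to the two-character string `[]`, so comparing the length of the payload with 2 is one way to tell whether any record came back.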

Change the main function to process the JSON records:


def main():
    # `args` and `jaas_template` are assumed to be defined as in part 1.
    last_extracted_timestamp = '2000-01-01T00:00:00Z'
    while True:
        # Fetch updated records from Salesforce
        records = fetch_updated_json_records(last_extracted_timestamp)
        # `records` is a JSON string; an empty result is "[]" (2 characters)
        if len(records) > 2:
            spark = SparkSession.builder.appName("SalesforceApp").getOrCreate()
            # Read JSON data into a Spark DataFrame
            df = spark.read.json(spark.sparkContext.parallelize([records]))
            # Show the DataFrame
            # df.show()
            last_modified_date_row = df.orderBy("LastModifiedDate", ascending=False).first()
            last_modified_date = last_modified_date_row['LastModifiedDate']
            # Write to Kafka: the record Id is the key, the whole row as JSON is the value
            df_transformed = df.selectExpr("CAST(Id AS STRING) AS key", "to_json(struct(*)) AS value")
            # Kafka configuration (byte sizes must be plain numbers, not expressions)
            df_transformed \
                .write \
                .format("kafka") \
                .option("kafka.bootstrap.servers", '{}:{}'.format(args.bootstrap_server,
                                                                  args.bootstrap_port)) \
                .option("kafka.enable.idempotence", "false") \
                .option("kafka.sasl.jaas.config", jaas_template.format(
                    username=args.stream_username, password=args.stream_password
                )) \
                .option('kafka.sasl.mechanism', args.auth_type) \
                .option('kafka.security.protocol', args.encryption) \
                .option("topic", args.raw_stream) \
                .option("kafka.max.partition.fetch.bytes", str(1024 * 1024)) \
                .option("checkpointLocation", args.checkpoint_location) \
                .mode("append") \
                .save()

            # Keep the first 19 characters (YYYY-MM-DDTHH:MM:SS) and append 'Z' for UTC.
            # This stays inside the `if` so it only runs when records were fetched.
            last_modified_datetime = last_modified_date[:19] + 'Z'
            last_extracted_timestamp = last_modified_datetime
        # Sleep for a specified interval before the next iteration
        time.sleep(10)
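
Slicing the first 19 characters and appending 'Z' works when the timestamp returned by Salesforce is already in UTC. As a hedged alternative, the sketch below parses the value and converts it to UTC explicitly. It assumes the timestamp looks like `2023-12-12T10:15:30.000+0000`; the function name `to_soql_utc` is hypothetical.

```python
from datetime import datetime, timezone

def to_soql_utc(salesforce_ts):
    """Convert a timestamp such as '2023-12-12T10:15:30.000+0000'
    into a SOQL datetime literal in UTC, e.g. '2023-12-12T10:15:30Z'."""
    parsed = datetime.strptime(salesforce_ts, "%Y-%m-%dT%H:%M:%S.%f%z")
    return parsed.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

already_utc = to_soql_utc("2023-12-12T10:15:30.000+0000")
with_offset = to_soql_utc("2023-12-12T10:15:30.000+0100")
```

With a non-zero offset, the explicit conversion shifts the time to UTC, which plain slicing would not do.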

Configuring Stream Analytics Pipeline

We are going to configure a pipeline in Stream Analytics. The main goal of this pipeline is to use the OCI Streaming topic as the main source and join its data with an Oracle database table. If the data matches, it is loaded into Autonomous Data Warehouse. To build this pipeline, we first need to create the connections and objects it uses, which is what the next sections cover.

Creating connection to OCI Streaming

Log in to GoldenGate Stream Analytics, click Create New Item, and select Kafka.

Provide a connection name:

Select Use Bootstrap and enter the OCI Streaming FQDN (cell-1.streaming.<region>.oci.oraclecloud.com) with port 9092.

The User Name is a string composed of: <tenancy name>/<username>/<ocid1.streampool..>

Password: <Token>
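
The connection values above map onto standard Kafka client properties. The following is a minimal sketch of how they fit together, assuming SASL_SSL with the PLAIN mechanism and port 9092 as documented for the Kafka-compatible endpoint of OCI Streaming. The function names and all sample values (tenancy, user, stream pool OCID, token) are placeholders made up for illustration.

```python
def build_streaming_username(tenancy_name, user_name, stream_pool_ocid):
    """Join the three parts of the user name with '/'."""
    return f"{tenancy_name}/{user_name}/{stream_pool_ocid}"

def build_kafka_settings(region, username, auth_token):
    """Return Kafka client properties for the OCI Streaming endpoint."""
    return {
        "bootstrap.servers": f"cell-1.streaming.{region}.oci.oraclecloud.com:9092",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            f'username="{username}" password="{auth_token}";'
        ),
    }

# Placeholder values only
username = build_streaming_username(
    "mytenancy", "jane.doe@example.com", "ocid1.streampool.oc1..example"
)
settings = build_kafka_settings("eu-frankfurt-1", username, "my-auth-token")
```

The same user name and token are what the GoldenGate Stream Analytics connection form asks for; the dictionary is only useful if you also want to test the endpoint from a Kafka client.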

Autonomous Database Connection

Click on Create New Item > Connection > Oracle Database and provide a connection name:

I’m using Autonomous Data Warehouse, so I just need to select Wallet and upload the wallet file.

Save the connection.

Creating Reference Table

A reference is a source of static data that provides contextual information about the event data. References are used to enrich data that arrives from a stream.

Click on Reference and Database Table:

Provide a reference name:

Select the database connection name. For this example, I’m not using cache.

Select the database table that contains the column(s) you want to join with the Kafka topic:

Creating Stream

A Stream is a source of continuous and dynamic data. The data can be from a wide variety of data sources such as IoT sensors, transaction or activity log files, point-of-sale devices, ATM machines, transactional databases, or information from geospatial services or social networks.

In our case, we are going to use OCI Streaming as the stream. Click on Create New Item, then Stream, and then Kafka.

Provide a stream name:

Select the OCI Stream Connection and the Topic you used to store the Salesforce data. The Data Format is JSON.

Infer Shape only works if the topic is receiving data while the inference is running. Another option is to take a sample JSON message and produce it to the topic while Infer Shape is running.
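
One way to produce such a sample message is sketched below. It assumes the third-party kafka-python package and the SASL_SSL/PLAIN settings used for OCI Streaming. The function names and sample values are hypothetical, and the fields simply follow the SOQL query from earlier, so the real messages may carry additional fields such as the Salesforce `attributes` metadata.

```python
import json

def build_sample_message(account_id, name, last_modified):
    """Return (key, value) as bytes for one sample Account message."""
    value = {"Id": account_id, "Name": name, "LastModifiedDate": last_modified}
    return account_id.encode("utf-8"), json.dumps(value).encode("utf-8")

def send_sample(bootstrap, topic, username, auth_token, key, value):
    """Publish one message to the topic (requires kafka-python)."""
    # Imported here so build_sample_message works without the package installed
    from kafka import KafkaProducer
    producer = KafkaProducer(
        bootstrap_servers=bootstrap,
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        sasl_plain_username=username,
        sasl_plain_password=auth_token,
    )
    producer.send(topic, key=key, value=value)
    producer.flush()
    producer.close()

# Made-up sample values
key, value = build_sample_message(
    "001xx000003DGb2AAG", "Acme", "2023-12-12T10:15:30.000+0000"
)
```

Calling `send_sample(...)` with your own bootstrap server, topic, user name, and token while Infer Shape is running should give it a message to inspect.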

Click Save.

Next, a popup offers to create a pipeline based on the stream you just saved:

The pipeline will start in Draft mode, and you can see the data popping in real time.

The next step is to join the OCI Streaming topic with the reference table. Right-click the stream stage, then select Add a Stage and Query:

Select the reference table (in my case SALESACCOUNT). Add the condition by ID.

You will notice that new columns, shown in different colors, were added to the Live Output.
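
GoldenGate Stream Analytics performs this join itself. The sketch below only illustrates the idea in plain Python, assuming an inner join on the ID so that events without a matching reference row are dropped. The function name `enrich`, the `REF_` prefix, and the `REGION` column are hypothetical.

```python
def enrich(events, reference_rows, key="Id"):
    """Inner-join stream events with reference rows on `key`."""
    lookup = {row[key]: row for row in reference_rows}
    enriched = []
    for event in events:
        match = lookup.get(event[key])
        if match is None:
            continue  # no matching reference row: the event is dropped
        merged = dict(event)
        for column, value in match.items():
            if column != key:
                # Prefix reference columns so they stay distinguishable
                merged[f"REF_{column}"] = value
        enriched.append(merged)
    return enriched

# Made-up sample data
events = [
    {"Id": "001A", "Name": "Acme"},
    {"Id": "001B", "Name": "Globex"},
]
reference = [{"Id": "001A", "REGION": "EMEA"}]

result = enrich(events, reference)
```

Only the event with a matching Id survives, and it gains the extra reference column, which mirrors the new columns that appear in the Live Output.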

Let’s create the target table. Right click, add a stage and Target.

From Target, select an existing target table or create a new one.

Publish the pipeline and verify that the data is being inserted into the Oracle database table.

I hope you have enjoyed part 2 of this blog.
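
A simple way to verify this is to count the rows in the target table before and after new events arrive. The sketch below is written against the generic Python DB-API, so it should work with any compliant connection (for example one opened with python-oracledb). The in-memory sqlite3 database is only a stand-in for the demo, and the table name SALESACCOUNT_TARGET is hypothetical.

```python
import re
import sqlite3  # stand-in database for the demo only

def count_rows(connection, table_name):
    """Return the number of rows in `table_name` using a DB-API connection."""
    # Table names cannot be bound as parameters, so validate before interpolating
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_$#]*", table_name):
        raise ValueError(f"unexpected table name: {table_name!r}")
    cursor = connection.cursor()
    try:
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        return cursor.fetchone()[0]
    finally:
        cursor.close()

demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE SALESACCOUNT_TARGET (ID TEXT, NAME TEXT)")
before = count_rows(demo, "SALESACCOUNT_TARGET")
demo.execute("INSERT INTO SALESACCOUNT_TARGET VALUES ('001A', 'Acme')")
after = count_rows(demo, "SALESACCOUNT_TARGET")
```

If the count grows after you update an Account in Salesforce, the pipeline is writing to the target table.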


Eloi Lopes

Opinions expressed are solely my own and do not express the views or opinions of my employer Oracle. https://www.linkedin.com/in/eloilopes/