Extracting data from Salesforce in near real time using OCI Data Flow — Part 1
Oracle Cloud Infrastructure Data Flow (OCI Data Flow) is a fully managed service for running Apache Spark applications. It lets developers focus on their applications by providing a ready-to-use runtime environment for them, and it has a simple user interface with API support for integration with applications and workflows. You don't need to spend any time on the underlying infrastructure, cluster provisioning, or software installation. [1]
Salesforce is one of the most popular CRMs. Very often, we have customers who want to consolidate data from different sources in a data lake or data warehouse. In this blog, we will see an example of how to capture data from the Salesforce Accounts table and load it into an OCI Streaming (Kafka) topic.
Prerequisites
- OCI Data Flow policies created (a sketch of typical policy statements follows this list);
- OCI Streaming stream pool and stream (Kafka topic) created;
- Docker installed.
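As a rough sketch of the first prerequisite (the group name dataflow-users and the compartment placeholder are assumptions, and your tenancy may need additional statements; check the Data Flow documentation [1] for the exact policies your setup requires), the IAM statements could look like this:
ALLOW GROUP dataflow-users TO MANAGE dataflow-family IN COMPARTMENT <compartment name>
ALLOW GROUP dataflow-users TO MANAGE objects IN COMPARTMENT <compartment name>
ALLOW GROUP dataflow-users TO USE stream-family IN COMPARTMENT <compartment name>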
How to extract data from Salesforce?
Simple-Salesforce is a basic Salesforce.com REST API client built for Python 3.6, 3.7, 3.8, 3.9, 3.10, and 3.11. [2]
Using this package, we can integrate Salesforce data with other sources. In this example, we use the Account table from Salesforce, but the code can be adapted to work with other tables.
Example:
from simple_salesforce import Salesforce
# Salesforce credentials
sf = Salesforce(username='<username>', password='<password>', security_token='<security token>')
def fetch_updated_records(last_extracted_timestamp):
soql_query = f"SELECT Id, Name, LastModifiedDate FROM Account WHERE LastModifiedDate > {last_extracted_timestamp}"
result = sf.query_all(query=soql_query)
return result.get('records', [])
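A quick usage sketch (the call and the printed fields below are only illustrative; to target another object, you just change the SOQL, for example FROM Contact instead of FROM Account):
# Hypothetical first run: fetch everything modified since a dummy timestamp
records = fetch_updated_records('2000-01-01T00:00:00Z')
for record in records:
    # Each record is a dict returned by simple-salesforce ('attributes' holds Salesforce metadata)
    print(record['Id'], record['Name'], record['LastModifiedDate'])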
What does the code do?
We start the process by retrieving data from Salesforce accounts, applying a filter based on the last modified date (LastModifiedDate). During the initial run of the code, the query covers all records modified after '2000-01-01T00:00:00Z'.
Next, we convert the output of the Salesforce query into a Spark DataFrame using the 'records_to_dataframe' function, which also checks that the records have the expected structure before the actual data load.
In the main function, we retrieve all records based on the last timestamp and insert the new data into the topic. The entire process runs in an infinite loop, with a 10-second wait between each iteration of the fetch function. If real-time updates are not required, you can remove the "while True:" loop. The code includes the necessary configuration for connecting to OCI Streaming; you just need to replace the placeholder values with your environment-specific information.
**This code is just an example; I'm not responsible for its use.**
#This code is provided "as is," without warranty of any kind,
#express or implied, including but not limited to the warranties of
#merchantability, fitness for a particular purpose, and non-infringement.
#In no event shall the contributors or copyright holders be liable for
#any claim, damages, or other liability, whether in an action of contract,
#tort, or otherwise, arising from, out of, or in connection with the
#software or the use or other dealings in the software.
#Users are solely responsible for the use of this code and assume all
#risks associated with its use. The contributors do not provide any support
#or maintenance for the code, and users should not expect updates or bug fixes. It is the responsibility of users to review and test the code thoroughly before deployment.
#By using this code, you agree to comply with all applicable laws and
#regulations. The contributors disclaim any and all liability
#for the use of this code and do not guarantee its fitness
#for any particular purpose.
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from simple_salesforce import Salesforce
import os
import argparse
from datetime import datetime, timedelta
from salesforceCredentials import username,password,security_token
import time
# Salesforce credentials
sf = Salesforce(username=username, password=password, security_token=security_token)
parser = argparse.ArgumentParser()
parser.add_argument('--auth-type', default='PLAIN')
parser.add_argument('--bootstrap-port', default='9092')
parser.add_argument('--bootstrap-server', default='cell-1.streaming.<OCI Region>.oci.oraclecloud.com')
parser.add_argument('--checkpoint-location', default='oci://<bucketname>@<namespace>/<checkpoint folder>')
parser.add_argument('--encryption', default='SASL_SSL')
parser.add_argument('--ocid')
parser.add_argument('--output-location', default='oci://<bucketname>@<namespace>/<output folder>')
parser.add_argument('--output-mode', default='file')
parser.add_argument('--stream-password',default='<Token>')
parser.add_argument('--raw-stream', default='<Topic name>')
parser.add_argument('--stream-username',default='<tenancy name>/<username>/<streampool OCID>')
args = parser.parse_args()
if args.bootstrap_server is None:
    args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
if args.raw_stream is None:
    args.raw_stream = os.environ.get('RAW_STREAM')
if args.stream_username is None:
    args.stream_username = os.environ.get('STREAM_USERNAME')
if args.stream_password is None:
    args.stream_password = os.environ.get('STREAM_PASSWORD')
assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
assert args.output_location is not None, "Output location (--output-location) must be set"
assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"
if args.ocid is not None:
    jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
    args.auth_type = 'OCI-RSA-SHA256'
else:
    jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
# Function to fetch updated records from Salesforce
def fetch_updated_records(last_extracted_timestamp):
soql_query = f"SELECT Id, Name, LastModifiedDate FROM Account WHERE LastModifiedDate > {last_extracted_timestamp}"
result = sf.query_all(query=soql_query)
return result.get('records', [])
# Function to convert Salesforce records to PySpark DataFrame
def records_to_dataframe(records):
    spark = SparkSession.builder.appName("SalesforceApp") \
        .getOrCreate()
    #spark.sparkContext.setLogLevel('ERROR')
    # Check if records is not empty and has the expected structure
    if records and isinstance(records, list) and isinstance(records[0], dict):
        # Extract values from each record
        data = [(record.get("Id", ""), record.get("Name", ""), record.get("LastModifiedDate", "")) for record in records]
        schema = ["Id", "Name", "LastModifiedDate"]
        return spark.createDataFrame(data, schema=schema)
    else:
        # If records are not in the expected format, return an empty DataFrame
        # (an explicit DDL schema is used so Spark does not try to infer types from empty data)
        return spark.createDataFrame([], schema="Id STRING, Name STRING, LastModifiedDate STRING")
# Main streaming ingestion process
def main():
    # Dummy last extracted timestamp
    last_extracted_timestamp = '2000-01-01T00:00:00Z'
    while True:
        # Fetch updated records from Salesforce
        records = fetch_updated_records(last_extracted_timestamp)
        if records:
            # Convert records to DataFrame
            df = records_to_dataframe(records)
            # Write to Kafka
            df_transformed = df.selectExpr("CAST(Id AS STRING) AS key", "CAST(Name AS STRING) AS value", "CAST(LastModifiedDate AS TIMESTAMP) AS timestamp")
            # Kafka configuration
            df_transformed \
                .write \
                .format("kafka") \
                .option("kafka.bootstrap.servers", '{}:{}'.format(args.bootstrap_server, args.bootstrap_port)) \
                .option("kafka.enable.idempotence", "false") \
                .option("kafka.sasl.jaas.config", jaas_template.format(
                    username=args.stream_username, password=args.stream_password, ocid=args.ocid
                )) \
                .option('kafka.sasl.mechanism', args.auth_type) \
                .option('kafka.security.protocol', args.encryption) \
                .option("topic", args.raw_stream) \
                .option("kafka.max.partition.fetch.bytes", "1048576") \
                .option("checkpointLocation", args.checkpoint_location) \
                .mode("append") \
                .save()
            # Update the last extraction timestamp
            last_extracted_timestamp = max([record["LastModifiedDate"] for record in records])
        # Sleep for a specified interval before the next iteration
        time.sleep(10)
if __name__ == "__main__":
    # Start the streaming ingestion process
    main()
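The script reads its Salesforce login details from a small salesforceCredentials.py module placed next to it (the module and variable names come from the import above); a minimal sketch, with placeholder values you would replace, could be:
# salesforceCredentials.py
username = '<username>'
password = '<password>'
security_token = '<security token>'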
How to deploy the code and dependencies into OCI Data Flow?
Deploying the code is quite easy: you just need to upload your code to OCI Object Storage.
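If you prefer the command line over the console upload, one option with the OCI CLI (assuming the CLI is already configured; the bucket and file names below are placeholders) would be:
oci os object put --bucket-name <bucket_name> --file <your_python_file>.py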
To package all the dependencies, you need Docker installed. Create a folder:
sudo mkdir /opt/dataflow
Add these two files with the Python and Java dependencies:
#java
vi packages.txt
org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1
https://repo1.maven.org/maven2/org/apache/spark/spark-sql_2.12/3.2.1/spark-sql_2.12-3.2.1.jar
#python
vi requirements.txt
simple_salesforce==1.12.5
urllib3==1.26.16
Run this command:
#In my case, I'm using Python 3.8
#Check the documentation links in case you are running the code on a different platform
sudo docker run --platform linux/amd64 --rm -v $(pwd):/opt/dataflow --pull always -it phx.ocir.io/oracle/dataflow/dependency-packager:latest -p 3.8
The command above generates two files, archive.zip and version.txt. Your version file should be similar to this one:
build_platform=linux/amd64
os=oraclelinux:7-slim
dependency_packager=2.1.0
python=3.8.11
29 Python packages installed
Package Version
------------------ ------------
attrs 23.1.0
certifi 2023.11.17
cffi 1.16.0
charset-normalizer 3.3.2
cryptography 41.0.7
distlib 0.3.7
filelock 3.12.2
idna 3.6
isodate 0.6.1
lxml 4.9.3
more-itertools 10.1.0
pendulum 2.1.2
pip 23.2.1
platformdirs 3.10.0
pycparser 2.21
PyJWT 2.8.0
python-dateutil 2.8.2
pytz 2023.3.post1
pytzdata 2020.1
requests 2.31.0
requests-file 1.5.1
requests-toolbelt 1.0.0
setuptools 68.1.2
simple-salesforce 1.12.5
six 1.16.0
urllib3 1.26.16
virtualenv 20.24.3
wheel 0.41.2
zeep 4.2.1
Packages installed:
14 Java packages installed
com.google.code.findbugs_jsr305-3.0.0.jar
commons-logging_commons-logging-1.1.3.jar
org.apache.commons_commons-pool2-2.6.2.jar
org.apache.hadoop_hadoop-client-api-3.3.1.jar
org.apache.hadoop_hadoop-client-runtime-3.3.1.jar
org.apache.htrace_htrace-core4-4.1.0-incubating.jar
org.apache.kafka_kafka-clients-2.8.0.jar
org.apache.spark_spark-sql-kafka-0-10_2.12-3.2.1.jar
org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.2.1.jar
org.lz4_lz4-java-1.7.1.jar
org.slf4j_slf4j-api-1.7.30.jar
org.spark-project.spark_unused-1.0.0.jar
org.xerial.snappy_snappy-java-1.1.8.4.jar
spark-sql_2.12-3.2.1.jar
Creating an OCI Data Flow Application
Before creating the application, we need to upload our archive.zip file into an Object Storage bucket (the same one used for the code).
Now, go to the OCI Console, open the menu in the top left corner (hamburger menu), and navigate to Analytics & AI → Data Flow. Click Create application:
Just configure it according to your requirements:
Under Application configuration, select the bucket where you stored your code and then the Python file. For the archive.zip, insert oci://<bucket_name>@<namespace>/archive.zip
Then click Create and run the application.
I hope you enjoyed the first part of this blog. In part 2, we will delve into integrating the data from Salesforce into Oracle DB using OCI GoldenGate.
References
1 — https://docs.oracle.com/en-us/iaas/data-flow/using/home.htm
2 — https://pypi.org/project/simple-salesforce/
3 — https://docs.oracle.com/en-us/iaas/data-flow/using/third-party-provide-archive.htm
4 — https://docs.oracle.com/en-us/iaas/Content/API/Concepts/apisigningkey.htm#Required_Keys_and_OCIDs