OCI DataFlow — Invoking OCI Resources

Introduction
OCI DataFlow is a managed Apache Spark service that helps with distributed data processing. In this article, we will look at invoking other OCI services from within a DataFlow application.
Use Case
While running a DataFlow application, we might need to access Object Storage using the OCI SDK to read or store documents. Similarly, we might need to access OCI Streaming to consume or push messages. Note that, instead of using Spark APIs to perform these operations, we will use the OCI SDK to access these OCI resources.
IAM Policies
We will need a dynamic group whose matching rule covers DataFlow runs. Make sure to add conditions as needed to assign fine-grained permissions; a compartment-scoped variant is shown after the basic rule below.
ALL { resource.type='dataflowrun' }
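For example (a hedged variant; the compartment OCID below is a placeholder), the matching rule can be scoped to runs in a specific compartment:
ALL { resource.type='dataflowrun', resource.compartment.id='<compartment_OCID>' }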
Add a policy that refers to this dynamic group. This policy allows the DataFlow run to read and write objects in OCI Object Storage. If the application also needs to create or delete objects, you may need the manage verb instead of use.
ALLOW dynamic-group <DG_NAME> to use objects in compartment <compartment>
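If the application also consumes from or publishes to OCI Streaming, as in the use case above, additional policies along these lines would be needed (a sketch; adjust the resource types and verbs to what the application actually does):
ALLOW dynamic-group <DG_NAME> to use stream-push in compartment <compartment>
ALLOW dynamic-group <DG_NAME> to use stream-pull in compartment <compartment>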
Authentication & Authorization
We can use either Instance Principal (via a delegation token) or Resource Principal to get authorized access to OCI resources. We will show both options in this article.
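At a glance (a minimal sketch; the complete, runnable applications are shown in the sections that follow), the two approaches differ only in how the request signer is built:

import oci

# Placeholder for illustration; in the full example below this path is read
# from the Spark configuration that DataFlow sets for the run.
token_path = "<delegation_token_path>"

# Instance principal: build the signer from the delegation token DataFlow mounts for the run.
with open(token_path) as fd:
    delegation_token = fd.read()
ip_signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
    delegation_token=delegation_token
)

# Resource principal: DataFlow provides the environment for this signer when
# 'Resource principal' is enabled on the application.
rp_signer = oci.auth.signers.get_resource_principals_signer()

# Either signer can then be passed to any OCI client, for example:
client = oci.object_storage.ObjectStorageClient(config={}, signer=rp_signer)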
Adding dependencies
We need to add the oci Python SDK as a dependency for our application. Dependencies are supplied to DataFlow by packaging them into an archive.zip file.
Refer to this page for how to create the archive.zip file: https://docs.oracle.com/en-us/iaas/data-flow/using/third-party-provide-archive.htm
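As a rough sketch of what that involves (the exact workflow and the packager tool are described on the page above): the dependency packager reads a requirements.txt listing the Python packages to bundle, which for this article only needs the OCI SDK, and produces an archive.zip that you upload to Object Storage and reference in the application's archive configuration.

# requirements.txt (hypothetical minimal contents for this article)
oci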
DataFlow Application using Instance Principal
Here is sample code that uses Instance Principal to access OCI Object Storage; the same mechanism can be used to create a client for any OCI resource. Observe that we create the signer using 'oci.auth.signers.InstancePrincipalsDelegationTokenSigner' with the delegation token provided to the DataFlow run.
import oci
import os
import sys

from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext


# Helper Functions
def get_token_path(spark):
    """Return the delegation token path set by Data Flow, or None when running locally."""
    token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
    token_path = spark.sparkContext.getConf().get(token_key)
    return token_path


def get_authenticated_client(token_path, client):
    import oci
    import os

    if token_path is None:
        # You are running locally, so use our API Key.
        config = oci.config.from_file()
        authenticated_client = client(config)
    else:
        # You are running in Data Flow, so use our Delegation Token.
        with open(token_path) as fd:
            delegation_token = fd.read()
        signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
            delegation_token=delegation_token
        )
        authenticated_client = client(config={}, signer=signer)
    return authenticated_client


def get_dataflow_spark_session(
    app_name="DataFlow", file_location=None, profile_name=None, spark_config={}
):
    """
    Get a Spark session in a way that supports running locally or in Data Flow.
    """
    spark_builder = SparkSession.builder.appName(app_name)

    # Add in extra configuration.
    for key, val in spark_config.items():
        spark_builder.config(key, val)

    # Create the Spark session.
    session = spark_builder.getOrCreate()
    return session


def main():
    try:
        spark_session = get_dataflow_spark_session()

        # Get our IAM signer.
        token_path = get_token_path(spark_session)
        print(token_path)

        # Get an object storage client.
        object_storage_client = get_authenticated_client(
            token_path, oci.object_storage.ObjectStorageClient
        )
        namespace = object_storage_client.get_namespace().data
        print(namespace)

        # Read an object from the bucket.
        obj = object_storage_client.get_object(namespace, "df-basic", "test1.txt")
        print(obj)
        print(obj.data.content)

        # Write an object to the bucket.
        my_data = b"Hello, World!"
        obj = object_storage_client.put_object(
            namespace,
            "df-basic",
            "new_file.txt",
            my_data)
    except Exception as e:
        print("Error in dataflow run!")
        print(e)


if __name__ == "__main__":
    main()
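Because get_authenticated_client takes the client class as a parameter, the same mechanism can create clients for other OCI services. For example (a sketch, not part of the original sample):

stream_admin_client = get_authenticated_client(token_path, oci.streaming.StreamAdminClient)

Clients that require a per-stream service endpoint, such as oci.streaming.StreamClient, would need a small variation of the helper that passes service_endpoint through to the client constructor.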
DataFlow Application using Resource Principal
To use Resource Principal, we need to enable 'Resource principal' under the advanced options while creating the DataFlow application.

import oci
import os
import sys

from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext


def get_authenticated_client(token_path, client):
    import oci
    import os

    if token_path is None:
        # You are running locally, so use our API Key.
        config = oci.config.from_file()
        authenticated_client = client(config)
    else:
        # You are running in Data Flow, so use our Delegation Token.
        with open(token_path) as fd:
            delegation_token = fd.read()
        signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
            delegation_token=delegation_token
        )
        authenticated_client = client(config={}, signer=signer)
    return authenticated_client


def get_dataflow_spark_session(
    app_name="DataFlow", file_location=None, profile_name=None, spark_config={}
):
    """
    Get a Spark session in a way that supports running locally or in Data Flow.
    """
    spark_builder = SparkSession.builder.appName(app_name)

    # Add in extra configuration.
    for key, val in spark_config.items():
        spark_builder.config(key, val)

    # Create the Spark session.
    session = spark_builder.getOrCreate()
    return session


def main():
    try:
        spark_session = get_dataflow_spark_session()

        # Get our IAM signer using the resource principal of the DataFlow run.
        signer_token = oci.auth.signers.get_resource_principals_signer()

        # Get an object storage client.
        object_storage_client = oci.object_storage.ObjectStorageClient(
            config={}, signer=signer_token
        )
        namespace = object_storage_client.get_namespace().data
        print(namespace)

        # Read an object from the bucket.
        obj = object_storage_client.get_object(namespace, "df-basic", "test1.txt")
        print(obj)
        print(obj.data.content)

        # Write an object to the bucket.
        my_data = b"Hello, World!"
        obj = object_storage_client.put_object(
            namespace,
            "df-basic",
            "new_file.txt",
            my_data)
    except Exception as e:
        print("Error in dataflow run!")
        print(e)


if __name__ == "__main__":
    main()
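The same resource principal signer works with other OCI clients as well. For instance, to push messages to OCI Streaming as described in the use case (a hedged sketch, not part of the original sample; the stream OCID and messages endpoint are placeholders you would look up for your stream):

import base64
import oci

# Resource principal signer, as in the example above.
signer = oci.auth.signers.get_resource_principals_signer()

# StreamClient talks to the stream's messages endpoint, so it must be passed explicitly.
stream_client = oci.streaming.StreamClient(
    config={},
    service_endpoint="<messages_endpoint_of_your_stream>",
    signer=signer,
)

# Messages are sent as base64-encoded key/value pairs.
entry = oci.streaming.models.PutMessagesDetailsEntry(
    key=base64.b64encode(b"my-key").decode(),
    value=base64.b64encode(b"Hello from DataFlow!").decode(),
)
result = stream_client.put_messages(
    "<stream_OCID>",
    oci.streaming.models.PutMessagesDetails(messages=[entry]),
)
print(result.data)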