OCI DataFlow — Invoking OCI Resources

Raghuveer Bhandarkar
4 min read · Feb 8, 2024
Source: Data flow icons created by Alfian Dwi Hartanto (Flaticon)

Introduction

OCI DataFlow is a managed Apache Spark service for distributed data processing. This article looks at invoking other OCI services from within a DataFlow application.

Use Case

While running a DataFlow application, we might need to access Object Storage through the OCI SDK to read or store documents. Similarly, we might need to access OCI Streaming to consume or publish messages. Note that instead of using Spark APIs to perform these operations, we will use the OCI SDK to access these OCI resources.

IAM Policies

We will need a dynamic group that matches the DataFlow run. Make sure to add conditions as needed to assign fine-grained permissions; a compartment-scoped example follows the basic rule below.

ALL { resource.type='dataflowrun' }
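
For example, a rule that only matches runs in a specific compartment (the compartment OCID below is a placeholder) could look like:

ALL { resource.type='dataflowrun', resource.compartment.id='<compartment_ocid>' }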

Add a policy that refers to this dynamic group. This policy allows DataFlow to read and overwrite objects in OCI Object Storage.

ALLOW dynamic-group <DG_NAME> to use objects in compartment <compartment>
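
Note that the use verb on objects covers reading and overwriting existing objects; if the application also needs to create new objects (as the put_object call in the samples below does) or delete them, the policy likely needs the manage verb instead:

ALLOW dynamic-group <DG_NAME> to manage objects in compartment <compartment>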

Authentication & Authorization

We can use either an instance principal (via a delegation token) or a resource principal to get authorized access to OCI resources. We will show both options in this article.

Adding dependencies

We need to add the oci Python SDK as a dependency for our application. We can add dependencies by creating an archive.zip.

Refer to this page for how to create an archive.zip file: https://docs.oracle.com/en-us/iaas/data-flow/using/third-party-provide-archive.htm
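
As a minimal sketch, the oci package goes into a requirements.txt, which Oracle's Dependency Packager container then turns into an archive.zip (the image name and flags below follow the documentation linked above and may change over time):

# requirements.txt
oci

# Build archive.zip in the current directory
docker run --rm -v $(pwd):/opt/dataflow -it phx.ocir.io/oracle/dataflow/dependency-packager:latest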

DataFlow Application using Instance Principal

Here is sample code that uses an instance principal to access OCI Object Storage. We can create a client for any OCI service using this mechanism.

The code is adapted from the oracle-dataflow-samples GitHub repo listed in the references, which shows how to create a client using an instance principal. Observe that we create the signer using 'oci.auth.signers.InstancePrincipalsDelegationTokenSigner'.

import oci

from pyspark.sql import SparkSession


# Helper Functions
def get_token_path(spark):
    """Return the delegation token path set by Data Flow, or None when running locally."""
    token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
    token_path = spark.sparkContext.getConf().get(token_key)
    return token_path


def get_authenticated_client(token_path, client):
    """Build an OCI SDK client that works both locally and in Data Flow."""
    if token_path is None:
        # You are running locally, so use our API Key.
        config = oci.config.from_file()
        authenticated_client = client(config)
    else:
        # You are running in Data Flow, so use our Delegation Token.
        with open(token_path) as fd:
            delegation_token = fd.read()
        signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
            delegation_token=delegation_token
        )
        authenticated_client = client(config={}, signer=signer)
    return authenticated_client


def get_dataflow_spark_session(app_name="DataFlow", spark_config={}):
    """Get a Spark session in a way that supports running locally or in Data Flow."""
    spark_builder = SparkSession.builder.appName(app_name)

    # Add in extra configuration.
    for key, val in spark_config.items():
        spark_builder.config(key, val)

    # Create the Spark session.
    session = spark_builder.getOrCreate()
    return session


def main():
    try:
        spark_session = get_dataflow_spark_session()

        # Get our IAM signer.
        token_path = get_token_path(spark_session)
        print(token_path)

        # Get an Object Storage client.
        object_storage_client = get_authenticated_client(
            token_path, oci.object_storage.ObjectStorageClient
        )

        namespace = object_storage_client.get_namespace().data
        print(namespace)

        # Read an object from the bucket.
        obj = object_storage_client.get_object(namespace, "df-basic", "test1.txt")
        print(obj)
        print(obj.data.content)

        # Write an object to Object Storage.
        my_data = b"Hello, World!"
        obj = object_storage_client.put_object(namespace, "df-basic", "new_file.txt", my_data)
    except Exception as e:
        print("Error in DataFlow run!")
        print(e)


if __name__ == "__main__":
    main()
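
Since spark.hadoop.fs.oci.client.auth.delegationTokenPath is only set when the application runs inside DataFlow, the same script also runs locally, falling back to the API key in ~/.oci/config.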

DataFlow Application using Resource Principal

To use a Resource Principal, we need to enable 'Resource Principal' in the advanced options while creating the DataFlow application. The code no longer needs the delegation token; we obtain a signer directly from the resource principal.

import oci

from pyspark.sql import SparkSession


def get_dataflow_spark_session(app_name="DataFlow", spark_config={}):
    """Get a Spark session in a way that supports running locally or in Data Flow."""
    spark_builder = SparkSession.builder.appName(app_name)

    # Add in extra configuration.
    for key, val in spark_config.items():
        spark_builder.config(key, val)

    # Create the Spark session.
    session = spark_builder.getOrCreate()
    return session


def main():
    try:
        spark_session = get_dataflow_spark_session()

        # Get our IAM signer from the resource principal.
        signer = oci.auth.signers.get_resource_principals_signer()

        # Get an Object Storage client.
        object_storage_client = oci.object_storage.ObjectStorageClient(
            config={}, signer=signer
        )

        namespace = object_storage_client.get_namespace().data
        print(namespace)

        # Read an object from the bucket.
        obj = object_storage_client.get_object(namespace, "df-basic", "test1.txt")
        print(obj)
        print(obj.data.content)

        # Write an object to Object Storage.
        my_data = b"Hello, World!"
        obj = object_storage_client.put_object(namespace, "df-basic", "new_file.txt", my_data)
    except Exception as e:
        print("Error in DataFlow run!")
        print(e)


if __name__ == "__main__":
    main()
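
The same mechanism works for the other service in our use case, OCI Streaming. Below is a minimal sketch of publishing a message with a resource principal signer; the stream OCID and messages endpoint are hypothetical placeholders you would replace with values from your own stream.

import base64

import oci

# Hypothetical placeholders -- substitute values from your own stream.
STREAM_OCID = "ocid1.stream.oc1..<unique_id>"
MESSAGES_ENDPOINT = "https://cell-1.streaming.<region>.oci.oraclecloud.com"

signer = oci.auth.signers.get_resource_principals_signer()

# StreamClient needs the stream pool's messages endpoint explicitly.
stream_client = oci.streaming.StreamClient(
    config={}, service_endpoint=MESSAGES_ENDPOINT, signer=signer
)

# Message keys and values must be base64-encoded strings.
entry = oci.streaming.models.PutMessagesDetailsEntry(
    key=base64.b64encode(b"key1").decode(),
    value=base64.b64encode(b"Hello from DataFlow!").decode(),
)
result = stream_client.put_messages(
    STREAM_OCID,
    oci.streaming.models.PutMessagesDetails(messages=[entry]),
)
print(result.data)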

References

  1. https://docs.oracle.com/en-us/iaas/data-flow/data-flow-tutorial/use-your-delegation-token/overview.htm
  2. https://github.com/oracle-samples/oracle-dataflow-samples
