Typical Data Access API Tasks

This section demonstrates the Data Access API's ability to access semi-structured data stored in Cassandra, in a structured way (SQL).

Loading Entities and Interactions

  • Load HCPs as hcps table. The table will be registered as a temporary table, available for SQL querying
  • Load Prescription interactions as prescriptions table
  • Load Beneficiaries as ben table

Initial setup

Set the access keys for S3. The S3 storage contains:

  • Source CSV files with interactions to load into the system
  • System-managed Parquet files to access interactions in Reltio Data Science

Load interactions using the Data Access API.

Example 1 (DataFrame definition):

val intz:DataFrame = framework.dataAccess
.configure("rafdemo", token)
 new InteractionDatasetBuilder()
.explode("members.Beneficiary.Id", "BenId")

Example 1 Output:

intz: org.apache.spark.sql.DataFrame = 
[Id: bigint, attributes: struct<PTNT_PAY_AMT:double,TOT_RX_CST_AMT:double>, members: struct<Beneficiary:array<struct<Id:bigint>>>, BenId: bigint, PTNT_PAY_AMT: double, TOT_RX_CST_AMT: double]

Example 2 (Print Schema):


Example 2 Output:

|-- Id: long (nullable = true)
|-- attributes: struct (nullable = true)
| |-- PTNT_PAY_AMT: double (nullable = true)
| |-- TOT_RX_CST_AMT: double (nullable = true)
|-- members: struct (nullable = true)
| |-- Beneficiary: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- Id: long (nullable = true)
|-- BenId: long (nullable = true)
|-- PTNT_PAY_AMT: double (nullable = true)
|-- TOT_RX_CST_AMT: double (nullable = true)

Multiple DataFrame Builder

Note: These examples are based on Using the Data Access API v1.0. For more information about latest API features, see the Data Access API 2.0 v2.0 section.

Example (DataFrame definition):

val list: java.util.ArrayList[DataFrame] = framework.dataAccess
.configure("rafdemo", token)
 new EntityDatasetBuilder()
.explode("attributes.Specialities.Specialty", "Specialty")
 new EntityDatasetBuilder()
val hcp: DataFrame = list.get(0)
val hco: DataFrame = list.get(1)

SQL Examples

Count total number of HCPs in the system.
Note: The nested query is needed because we previously exploded a few fields, thus just running select count() would result in incorrect results.
%sql SELECT count(*) FROM (SELECT DISTINCT id FROM hcps) as t
Counting male/female HCPs
%sql SELECT a.Gender, count(*) as count FROM (SELECT DISTINCT id, Gender FROM hcps)
            AS a GROUP BY Gender
Deleting Interactions
To delete interactions which do not have any members, we need the following:
  • An UDF function
  • DataAccess API to define a DataFrame with interactions
  • SQL to filter out data
  • DataDelete API to delete the data
Registering UDF

Registering a UDF function for sqlContext:

            		(s : WrappedArray[Array[AnyRef]]) => s.length)
Defining a DataFrame

Using DataAccess API to define DataFrame with interactions:

val df:DataFrame = af.dataAccess
Filtering Data

Using SQL to filter out data (interactions with zero length array of members):

val dfFiltered:DataFrame = sqlContext.sql("SELECT `Id`,
            members.ProviderOrg FROM interactions WHERE
Deleting Data

Using Data Delete API to delete interactions of specified type by previously defined filter:

import com.reltio.analytics.data.delete.Deleteimport
val tp = "configuration/interactionTypes/{yourInteractionType}"val
func : Delete = af.dataDelete()
.withBuilder( new DeleteInteractionBuilder()
  .ofType(tp) )