Typical Data Access API Tasks

This section shows how to use the Data Access API to query semi-structured data stored in Cassandra in a structured way, through SQL.

Loading Entities and Interactions

  • Load HCPs as the hcps table. The table is registered as a temporary table and is available for SQL queries
  • Load Prescription interactions as the prescriptions table
  • Load Beneficiaries as the ben table

Initial setup

Set the access keys for S3. The S3 storage contains:

  • Source CSV files with interactions to load into the system
  • System-managed Parquet files to access interactions in Reltio Data Science

Load interactions using the Data Access API.

Example 1 (DataFrame definition):

val intz: DataFrame = framework.dataAccess
  .configure("rafdemo", token)
  .interaction(
    new InteractionDatasetBuilder()
      .cassandra()
      .asTable("prescriptions")
      .ofType("configuration/interactionTypes/PrescriptionDrugEvent")
      .select("Id")
      .select("attributes.PTNT_PAY_AMT")
      .select("attributes.TOT_RX_CST_AMT")
      .select("members.Beneficiary")
      .explode("members.Beneficiary.Id", "BenId")
      .explode("attributes.PTNT_PAY_AMT", "PTNT_PAY_AMT")
      .explode("attributes.TOT_RX_CST_AMT", "TOT_RX_CST_AMT"))
  .build()
intz.cache

Example 1 Output:

intz: org.apache.spark.sql.DataFrame = 
[Id: bigint, attributes: struct<PTNT_PAY_AMT:double,TOT_RX_CST_AMT:double>, members: struct<Beneficiary:array<struct<Id:bigint>>>, BenId: bigint, PTNT_PAY_AMT: double, TOT_RX_CST_AMT: double]

Example 2 (Print Schema):

intz.printSchema

Example 2 Output:

root
|-- Id: long (nullable = true)
|-- attributes: struct (nullable = true)
| |-- PTNT_PAY_AMT: double (nullable = true)
| |-- TOT_RX_CST_AMT: double (nullable = true)
|-- members: struct (nullable = true)
| |-- Beneficiary: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- Id: long (nullable = true)
|-- BenId: long (nullable = true)
|-- PTNT_PAY_AMT: double (nullable = true)
|-- TOT_RX_CST_AMT: double (nullable = true)

Multiple DataFrame Builder

Note: These examples are based on the Data Access API v1.0. For more information about the latest API features, see the Data Access API v2.0 section.

Example (DataFrame definition):

val list: java.util.ArrayList[DataFrame] = framework.dataAccess
  .configure("rafdemo", token)
  .entity(
    new EntityDatasetBuilder()
      .ofType("configuration/entityTypes/HCP")
      .select("Id")
      .select("attributes")
      .explode("attributes.FirstName", "FirstName")
      .explode("attributes.LastName", "LastName")
      .explode("attributes.Gender", "Gender")
      .explode("attributes.Specialities.SpecialtyType", "SpecialtyType")
      .explode("attributes.Specialities.Specialty", "Specialty")
      .asTable("hcps"))
  .entity(
    new EntityDatasetBuilder()
      .ofType("configuration/entityTypes/HCO")
      .select("Id")
      .select("attributes")
      .asTable("hcos"))
  .build()
val hcp: DataFrame = list.get(0)
val hco: DataFrame = list.get(1)
hcp.cache
hco.cache
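
The example above reads the results by position, which suggests that build() returns the DataFrames in the same order as the .entity(...) calls. Assuming that ordering holds, the list can also be destructured in one step. The sketch below uses strings as stand-ins for DataFrames so that it runs without Spark. BuilderOrderSketch is a hypothetical name, and a Scala 2.x runtime is assumed.

```scala
import scala.collection.JavaConverters._

object BuilderOrderSketch {
  // Stand-in for the java.util.ArrayList[DataFrame] returned by build();
  // strings replace DataFrames so the sketch runs without Spark.
  val list = new java.util.ArrayList[String]()
  list.add("hcps")
  list.add("hcos")

  // Destructure in the same order as the .entity(...) calls.
  // This throws a MatchError if the list does not hold exactly two items.
  val Seq(hcp, hco) = list.asScala.toSeq
}
```

The pattern match fails fast when the number of results differs from the number of builders, which list.get(i) would only reveal on the first out-of-range access.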

SQL Examples

Counting the total number of HCPs in the system

Note: The nested query is needed because several fields were exploded when the hcps table was defined, so a single HCP can span multiple rows. Running SELECT count(*) directly on hcps would count rows rather than HCPs.

%sql SELECT count(*) FROM (SELECT DISTINCT id FROM hcps) AS t

Counting male/female HCPs

%sql SELECT a.Gender, count(*) AS count
     FROM (SELECT DISTINCT id, Gender FROM hcps) AS a
     GROUP BY a.Gender
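
To see why the nested queries matter, the following sketch models the effect of explode with plain Scala collections instead of Spark. The Hcp case class, the IDs, and the specialty values are hypothetical sample data, and the sketch assumes that explode produces one output row per array element.

```scala
// Hypothetical stand-in for an HCP entity with a multi-valued attribute.
case class Hcp(id: Long, gender: String, specialties: Seq[String])

object ExplodeCountSketch {
  val hcps: Seq[Hcp] = Seq(
    Hcp(1L, "Male", Seq("Cardiology", "Internal Medicine")),
    Hcp(2L, "Female", Seq("Oncology"))
  )

  // Assumed explode semantics: one output row per array element.
  val exploded: Seq[(Long, String, String)] =
    hcps.flatMap(h => h.specialties.map(s => (h.id, h.gender, s)))

  // Like SELECT count(*) FROM hcps: counts rows, not HCPs.
  val rowCount: Int = exploded.size

  // Like SELECT count(*) FROM (SELECT DISTINCT id FROM hcps).
  val hcpCount: Int = exploded.map(_._1).distinct.size

  // Like SELECT DISTINCT id, Gender ... GROUP BY Gender.
  val byGender: Map[String, Int] =
    exploded.map(r => (r._1, r._2)).distinct
      .groupBy(_._2)
      .map { case (gender, rows) => gender -> rows.size }

  def main(args: Array[String]): Unit =
    println(s"rows=$rowCount hcps=$hcpCount byGender=$byGender")
}
```

With this sample data, rowCount is 3 while hcpCount is 2, because the first HCP has two specialties and therefore appears in two exploded rows.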

Deleting Interactions

To delete interactions that do not have any members, you need the following:

  • A user-defined function (UDF)
  • The Data Access API, to define a DataFrame with interactions
  • SQL, to filter the data
  • The Data Delete API, to delete the data

Registering UDF

Register a UDF with sqlContext that returns the length of an array column:

import scala.collection.mutable.WrappedArray

sqlContext.udf.register("arraylen",
  (s: WrappedArray[Array[AnyRef]]) => s.length)
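
The function registered above calls s.length directly, so it would throw a NullPointerException for any row where the array column is null. Whether members can be null in your data is an assumption; the source does not say. A minimal null-safe sketch, using the hypothetical names ArrayLenSketch and safeArrayLen:

```scala
object ArrayLenSketch {
  // Returns 0 for a null or empty sequence, otherwise the element count.
  def safeArrayLen(s: Seq[Any]): Int =
    Option(s).map(_.length).getOrElse(0)

  def main(args: Array[String]): Unit = {
    println(safeArrayLen(null))          // null input is treated as empty
    println(safeArrayLen(Seq.empty))
    println(safeArrayLen(Seq("a", "b")))
  }
}
```

Treating null as length 0 means that interactions with a missing members array would also match a zero-length filter. Confirm that this is the intended behavior before using such a function to select data for deletion.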
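
Defining a DataFrame

Use the Data Access API to define a DataFrame with interactions:

// Assumption: af is the framework instance and builder is an existing
// InteractionDatasetBuilder; neither is defined in this section.
val df: DataFrame = af.dataAccess
  .dataset(
    builder
      .select("Id")
      .select("members")
      .asTable("interactions"))
  .build()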

Filtering Data

Use SQL to select the interactions whose members array has zero length:

val dfFiltered: DataFrame = sqlContext.sql(
  """SELECT `Id`, members.ProviderOrg
    |FROM interactions
    |WHERE arraylen(members.ProviderOrg) = 0""".stripMargin)

Deleting Data

Use the Data Delete API to delete interactions of the specified type that match the previously defined filter:

import com.reltio.analytics.data.delete.Delete
import com.reltio.analytics.data.delete.DeleteInteractionBuilder

val tp = "configuration/interactionTypes/{yourInteractionType}"

// Build the delete operation from the filtered DataFrame, then run it.
val func: Delete = af.dataDelete()
  .withBuilder(new DeleteInteractionBuilder()
    .fromDataFrame(dfFiltered)
    .idColumn("Id")
    .ofType(tp))
  .build()
func.delete()