Getting Started with Reltio Data Science

To work with Reltio Data Science, you need the Software Development Kit (SDK), your account information, and the steps for initializing the connection between Qubole and Reltio, creating datasets, and so on.

You can use Reltio Data Science to apply complex data and graph analytics against reliable data in Reltio Connected Cloud. This topic contains all the information you need to get started with Reltio Data Science.

Location of the SDK

The master script automatically downloads all the required SDKs and installs them on the cluster. The script is located at s3://reltio.master/scripts/prod/master-prod-script.sh.

Enabling and Accessing the SDK

Reltio Manufacturing enables and configures the Reltio Data Science SDK for the Reltio-owned Qubole account.

You must work with Reltio Manufacturing to get access to Reltio’s scripts and SDKs.

Accessing the Latest Version of the SDK

You can update to the latest version of the SDK by performing the following steps:
  1. Log in to Qubole.
  2. Navigate to Cluster.
  3. Select Cluster Node bootstrap and update the shell script with your custom changes, if any: s3://reltio.dw/qubole/scripts/hadoop/pipelines/attach-jars-pipelines-Reltio Data Science-2020111017154.sh.

Necessary Permissions

Define user roles (role name and description) whenever needed. The following system roles provide access control for Reltio Insights Service:
  • ROLE_ANALYTICS
  • ROLE_ANALYTICS_DEVELOPER
For more information, see System Roles in Reltio Insights Service.

Key Account Information

Qubole Account: The Reltio Manufacturing team provides access to the Reltio-owned Qubole account.

Reltio Refresh Token: For more information, see Obtaining an Access Token.
Note: If you are using a third-party identity provider, see Getting a Reltio Access Token from a Third Party IDP.

Initializing Connection

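Use the following code to initialize the connection between Qubole and the Reltio tenant:
import com.reltio.analytics.framework._
// Sample values: replace them with your own access token, analytics URL, and tenant ID.
val accessToken = "697b3ed4-97fb-490a-9ff8-0557747deaff"
val analyticsUrl = "https://san-01.reltio.com/analytics"
val tenantId = "rajeshm"
// Log in and keep the framework handle (af) that the rest of this topic uses.
val af = AnalyticsFramework.login(sqlContext, analyticsUrl, tenantId, accessToken)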
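
Hardcoding a token in a shared notebook makes it easy to leak. The following is a minimal sketch of one alternative, assuming the three values are supplied through environment variables. The object name ConnectionSettings and the variable names RELTIO_ANALYTICS_URL, RELTIO_TENANT_ID, and RELTIO_ACCESS_TOKEN are hypothetical and are not part of the Reltio SDK.

```scala
// Hypothetical helper: neither this object nor the variable names come from the Reltio SDK.
object ConnectionSettings {
  // Return the trimmed value of a setting, or None if it is missing or blank.
  def required(env: Map[String, String], name: String): Option[String] =
    env.get(name).map(_.trim).filter(_.nonEmpty)

  // Collect (analyticsUrl, tenantId, accessToken); None if any of the three is absent.
  def load(env: Map[String, String]): Option[(String, String, String)] =
    for {
      url    <- required(env, "RELTIO_ANALYTICS_URL")
      tenant <- required(env, "RELTIO_TENANT_ID")
      token  <- required(env, "RELTIO_ACCESS_TOKEN")
    } yield (url, tenant, token)
}
```

In a notebook, ConnectionSettings.load(sys.env) could then feed the login call shown above, with the None case reported as a configuration error instead of attempting to log in.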

Getting the Initial Schema in Qubole Dataframe

Use the following code to view the initial schema:
val hcpDf = af.entities("configuration/entityTypes/HCP")
hcpDf.printSchema

Creating Datasets

You can create your own datasets from the initial dataframe by using the respective code for the following scenarios:
  • Selecting only entity types
    val hcpDf = af.entities("configuration/entityTypes/HCP").select("Id", "Type", "attributes.FirstName", "crosswalks")
    hcpDf.show
    
  • Selecting only specific entity URIs
    val hcpDf = af.entities("configuration/entityTypes/HCP").select("Id", "Type", "attributes.FirstName", "crosswalks")
    hcpDf.createOrReplaceTempView("hcpView")
    val hcpResults = sqlContext.sql("SELECT * FROM hcpView WHERE Id IN ('0N3W9Xg', '2R7sgTG')")
    hcpResults.show
    
  • Filtering by crosswalk for entities
    val hcpDf = af.entities("configuration/entityTypes/HCP").select("Id", "Type", "attributes.FirstName", "crosswalks")
    hcpDf.createOrReplaceTempView("hcpView")
    val hcpResults = sqlContext.sql("SELECT * FROM hcpView WHERE array_contains(crosswalks.source, 'configuration/sources/NPI')")
    hcpResults.show
    
  • Counting all objects (entities, relationships, matches, etc.)
    • By type
      val hcpDf = af.entities("configuration/entityTypes/HCP").select("Id", "Type")
      hcpDf.createOrReplaceTempView("hcpView")
      val hcpResults = sqlContext.sql("SELECT COUNT(*) FROM hcpView")
      hcpResults.show
      
    • By crosswalk
      val hcpDf = af.entities("configuration/entityTypes/HCP").select("Id", "Type", "crosswalks")
      hcpDf.createOrReplaceTempView("hcpView")
      val hcpResults = sqlContext.sql("SELECT COUNT(*) FROM hcpView WHERE array_contains(crosswalks.source, 'configuration/sources/NPI')")
      hcpResults.show
      
  • Creating your dataset with selected columns (10 out of X attributes) and filters (only ov=true, and crosswalk = a, b)
    • Using nested and reference attributes
      val hcpDf = af.entities("configuration/entityTypes/HCP", false)
        .lateralView("attributes.Phone.Number" -> "PhoneNumber", "attributes.Address.AddressLine1" -> "AddressLine1")
        .select("Id", "Type", "PhoneNumber", "attributes.LastName", "AddressLine1")
      
    • Using lateral view (Explode)
      val hcpDf = af.entities("configuration/entityTypes/HCP", false)
        .lateralView("attributes.FirstName" -> "FirstName", "attributes.Address.Zip" -> "Zip")
        .select("Id", "Type", "FirstName", "attributes.LastName", "Zip")
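
To make the effect of a lateral view (explode) concrete, here is a minimal sketch in plain Scala collections. It assumes that lateralView behaves like the Spark SQL explode function, which emits one output row per element of a nested collection; this topic does not confirm that. The case classes Entity and FlatRow are hypothetical illustrations, not SDK types.

```scala
// Illustrative model only: these types are hypothetical and are not part of the SDK.
object LateralViewSketch {
  final case class Entity(id: String, lastName: String, phoneNumbers: Seq[String])
  final case class FlatRow(id: String, lastName: String, phoneNumber: String)

  // Explode: emit one flat row per nested phone number.
  // As with Spark's explode, an entity with no phone numbers produces no rows.
  def explodePhones(entities: Seq[Entity]): Seq[FlatRow] =
    for {
      entity <- entities
      phone  <- entity.phoneNumbers
    } yield FlatRow(entity.id, entity.lastName, phone)
}
```

Under this assumption, an entity with two phone numbers appears twice in the result, so row counts taken after a lateral view are counts of values, not counts of entities.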
      
Note: To handle common errors, see Using the Data Access API. For more information about building a dataset, relationships and interactions, see the following topics:

Using Incremental Data Access API

You can use the Incremental Data Access API in the following scenarios:
  • For timestamp
    val hcpDf = af.entities("configuration/entityTypes/HCO", false, "20h")
  • For events (deleted and deltawindow)
    val hcpDf = af.entities("configuration/entityTypes/HCO", false, "20h", true)
  • For Entities
    val hcpDf = af.entities("configuration/entityTypes/HCO", false, "20h")
  • For Relations
    val hasAddressDf = af.relations("configuration/relationTypes/HasAddress", false, "20h")
  • For Interactions
    val activityDf = af.interactions("configuration/interactionTypes/Activity", false, "20h")

Running Queries on Your Dataset

Reltio includes the following sample queries/notebooks:
  • To get count and URIs of all created entities, relations and interactions
    val hcpDf = af.entities("configuration/entityTypes/HCO").select("Id","createdTime")
    hcpDf.createOrReplaceTempView("hcpDf")
    %sql select count(*) from hcpDf where createdTime > CAST('2020-03-13 00:00:00' AS TIMESTAMP)
    
  • To get count and URIs of all updated entities, relations and interactions
    val hcpDf = af.entities("configuration/entityTypes/HCO").select("Id","updatedTime")
    hcpDf.createOrReplaceTempView("hcpDf")
    %sql select count(*) from hcpDf where updatedTime > CAST('2020-03-13 00:00:00' AS TIMESTAMP)
    
  • To get count and URIs of all deleted entities, relations and interactions
    val hcpDf = af.entities("configuration/entityTypes/HCO", false, null, true).select("Id","deletedTime")
    hcpDf.createOrReplaceTempView("hcpDf")
    %sql select count(*) from hcpDf where deletedTime > CAST('2020-03-13 00:00:00' AS TIMESTAMP)
    
  • To filter by OV only, Type (entity type, relation type), Crosswalk and Date/Time
    val hcpDf = af.entities("configuration/entityTypes/HCO", false).select("Id","crosswalks")
    hcpDf.createOrReplaceTempView("hcpView")
    val hcpResults = sqlContext.sql("SELECT COUNT(*) FROM hcpView WHERE array_contains(crosswalks.source, 'configuration/sources/NPI')")
    hcpResults.show
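
The crosswalk filters above use array_contains because an entity can carry several crosswalks, so crosswalks.source is a collection of source values, not a single value. The following plain-Scala sketch models that membership test. The Crosswalk and Entity case classes are hypothetical, and the value field is an assumption added for illustration; only the source field appears in this topic.

```scala
// Illustrative model only: these types are hypothetical and are not part of the SDK.
object CrosswalkFilterSketch {
  final case class Crosswalk(source: String, value: String)
  final case class Entity(id: String, crosswalks: Seq[Crosswalk])

  // Mirrors array_contains(crosswalks.source, wantedSource):
  // true if any crosswalk of the entity has the wanted source.
  def hasSource(entity: Entity, wantedSource: String): Boolean =
    entity.crosswalks.map(_.source).contains(wantedSource)

  // Mirrors SELECT COUNT(*) ... WHERE array_contains(...).
  def countBySource(entities: Seq[Entity], wantedSource: String): Int =
    entities.count(hasSource(_, wantedSource))
}
```

An entity with crosswalks from both NPI and another source is counted once, because the test asks whether the source is present, not how many times it occurs.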
    

Writing Data

You can write data using DataPersist in the following scenarios:
  • To define analytical attributes in L3, use the following code:
    "entityTypes": [
        {
            "uri": "configuration/entityTypes/Organization",
            "attributes": [],
            "analyticsAttributes": [
                {
                    "uri": "configuration/entityTypes/Organization/analyticsAttributes/MosquitoIndex",
                    "label": "Mosquito Index",
                    "name": "MosquitoIndex",
                    "type": "Int",
                    "maxOccurs": 1,
                    "faceted": true,
                    "searchable": true
                }
            ]
        }
    ]
    
  • To write into a specific entity type:

    The Data Persist API writes data to only one entityType per job. To persist data to multiple entityTypes, trigger a separate job for each entityType.

  • To write into entities with certain type of crosswalks:
    • The Data Persist API writes data to the Reltio platform based on the entityUri in the input DataFrame, not based on crosswalks.
    • You can apply a crosswalk filter on DataAccess by using the following code:
      val hcpDf = af.entities("configuration/entityTypes/HCP").select("Id", "Type", "crosswalks")
      hcpDf.createOrReplaceTempView("hcpView")
      val hcpResults = sqlContext.sql("SELECT * FROM hcpView WHERE array_contains(crosswalks.source, 'configuration/sources/NPI')")
      hcpResults.show
      
      Note: The filtered DataFrame result contains the entityUri, which can be used as input to the DataPersist API.
For more information about creating a mapping between analytical attributes and calculated fields, see Data Persist API.
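
Because the Data Persist API writes to a single entityType at a time, computed values that span several entity types have to be split before they are persisted. The following is a minimal sketch of that split in plain Scala. The Update case class and the splitByEntityType function are hypothetical, and the sketch does not call the Data Persist API itself, whose signature is not shown in this topic.

```scala
// Illustrative model only: these names are hypothetical and are not part of the SDK.
object PersistBatchingSketch {
  // One computed analytics value for one entity.
  final case class Update(entityType: String, entityId: String, value: Int)

  // Group updates by entity type so that each group can be handed to its own persist job.
  def splitByEntityType(updates: Seq[Update]): Map[String, Seq[Update]] =
    updates.groupBy(_.entityType)
}
```

Each key of the resulting map would correspond to one separately triggered job, in line with the one-entityType restriction described above.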