I was requested to build an ETL pipeline in Azure. This pipeline should
- read ORC file submitted by the vendor to ADLS
- parse the PARAMS field, existing in the ORC structure, where JSON structure is stored, and add it as two new fields (KEY, VALUE) to the output
- write the output to the Azure SQL database
The problem is, that there are different types of JSONs structures used by the different types of records. I do not want to write a custom expression per each of the class of JSON struct (there would be like hundreds of them). Rather, I'm looking for a generic mechanism, that will be able to parse them apart of the type of the input JSON structure.
At the moment, to fulfill this requirement, I was using the ADF built-in connector for ORC. The process in its current design:
- Use a copy activity that reads ORC and moves data to Azure SQL database
Use the following TSQL statement as part of stored procedure executed after the 1. to parse the PARAMS field content
SELECT uuid, AttrName = a1.[key] + COALESCE('.' + a2.[key], '') + COALESCE('.' + a3.[key], '') + COALESCE('.' + a4.[key], ''), AttrValue = COALESCE(a4.value, a3.value, a2.value, a1.value) FROM ORC.EventsSnapshot_RawData OUTER APPLY OPENJSON(params) a1 OUTER APPLY ( SELECT [key], value, type FROM OPENJSON(a1.value) WHERE ISJSON(a1.value) = 1 ) a2 OUTER APPLY ( SELECT [key], value, type FROM OPENJSON(a2.value) WHERE ISJSON(a2.value) = 1 ) a3 OUTER APPLY ( SELECT [key], value, type FROM OPENJSON(a3.value) WHERE ISJSON(a3.value) = 1 ) a4
The number of required OUTER APPLY statements is determined at the beginning by counting occurrences of "[" in the PARAMS field value and then used to dynamically generate the SQL executed via sp_executesql
Unfortunately, this approach is quite inefficient in terms of execution time, as for 11 MM of records it takes c.a. 3.5 hours to finish
Someone suggested me to use Data Bricks. Ok, so I:
created the notebook with the following python code to read ORC from ADLS and materialize it to Data Bricks table
orcfile = "/mnt/adls/.../Input/*.orc" eventDf = spark.read.orc(orcfile) #spark.sql("drop table if exists ORC.Events_RawData") eventDf.write.mode("overwrite").saveAsTable("ORC.Events_Raw")
- now I'm trying to find out a code that would give the result I get from TSQL OPENJSONs. I started with Python code that utilizes recursion to parse the PARAMS attribute, however, it is even more inefficient than TSQL in terms of execution speed.
Can you please suggest me the correct way of achieving the goal, i.e. converting the PARAMS attribute to KEY, VALUE attributes in a generic way?
[EDIT] Please find below a sample JSON structures that needs to be standarized into the expected structure
Sample1
{
"correlationId": "c3xOeEEQQCCA9sEx7-u6FA",
"eventCreateTime": "2020-05-12T15:38:23.717Z",
"time": 1589297903717,
"owner": {
"ownergeography": {
"city": "abc",
"country": "abc"
},
"ownername": {
"firstname": "abc",
"lastname": "def"
},
"clientApiKey": "xxxxx",
"businessProfileApiKey": null,
"userId": null
},
"campaignType": "Mobile push"
}
Sample2
{
"correlationIds": [
{
"campaignId": "iXyS4z811Rax",
"correlationId": "b316233807ac68675f37787f5dd83871"
}
],
"variantId": 1278915,
"utmCampaign": "",
"ua.os.major": "8"
}
Sample3
{
"correlationId": "ls7XmuuiThWzktUeewqgWg",
"eventCreateTime": "2020-05-12T12:40:20.786Z",
"time": 1589287220786,
"modifiedBy": {
"clientId": null,
"clientApiKey": "xxx",
"businessProfileApiKey": null,
"userId": null
},
"campaignType": "Mobile push"
}