I was asked to build an ETL pipeline in Azure. This pipeline should:

  1. read the ORC files submitted by the vendor to ADLS,
  2. parse the PARAMS field of the ORC structure, which stores a JSON document, and add its content to the output as two new fields (KEY, VALUE),
  3. write the output to the Azure SQL database.

The problem is that different types of records use different JSON structures. I do not want to write a custom expression for each class of JSON structure (there would be hundreds of them). Rather, I'm looking for a generic mechanism that can parse them regardless of the structure of the input JSON.

At the moment, to fulfill this requirement, I'm using the built-in ADF connector for ORC. The process in its current design:

  1. Use a copy activity that reads the ORC files and moves the data to the Azure SQL database.
  2. Use the following T-SQL statement, as part of a stored procedure executed after step 1, to parse the content of the PARAMS field:

    SELECT uuid,
           AttrName = a1.[key] +
                      COALESCE('.' + a2.[key], '') +
                      COALESCE('.' + a3.[key], '') +
                      COALESCE('.' + a4.[key], ''),
           AttrValue = COALESCE(a4.value, a3.value, a2.value, a1.value)
    FROM ORC.EventsSnapshot_RawData
         OUTER APPLY OPENJSON(params) a1
         OUTER APPLY
         (
             SELECT [key], value, type
             FROM OPENJSON(a1.value)
             WHERE ISJSON(a1.value) = 1
         ) a2
         OUTER APPLY
         (
             SELECT [key], value, type
             FROM OPENJSON(a2.value)
             WHERE ISJSON(a2.value) = 1
         ) a3
         OUTER APPLY
         (
             SELECT [key], value, type
             FROM OPENJSON(a3.value)
             WHERE ISJSON(a3.value) = 1
         ) a4;

The number of required OUTER APPLY statements is determined up front by counting the occurrences of "[" in the PARAMS value; that number is then used to dynamically generate the SQL, which is executed via sp_executesql.
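A minimal Python sketch of that dynamic generation, for illustration only: the original builds the statement in T-SQL and runs it via sp_executesql, and this sketch shows just the string-building logic. `build_flatten_sql` and `max_depth` are hypothetical helpers; the table and column names are copied from the query above. As a swapped-in alternative to counting "[" characters, `max_depth` parses a document and measures how deeply objects and arrays are nested, which equals the number of OPENJSON levels (a1 to aN) the query needs.

```python
import json


def max_depth(value) -> int:
    """Nesting depth of JSON objects/arrays; scalars count as 0."""
    if isinstance(value, dict):
        return 1 + max((max_depth(v) for v in value.values()), default=0)
    if isinstance(value, list):
        return 1 + max((max_depth(v) for v in value), default=0)
    return 0


def build_flatten_sql(levels: int, table: str = "ORC.EventsSnapshot_RawData") -> str:
    """Generate the OUTER APPLY query with `levels` OPENJSON levels (a1..aN)."""
    if levels < 1:
        raise ValueError("levels must be >= 1")
    # AttrName: a1.[key] + COALESCE('.' + a2.[key], '') + ...
    name_expr = " + ".join(
        ["a1.[key]"] + [f"COALESCE('.' + a{i}.[key], '')" for i in range(2, levels + 1)]
    )
    # AttrValue: deepest non-NULL value wins. T-SQL's COALESCE needs at
    # least two arguments, so a single level is special-cased.
    if levels == 1:
        value_expr = "a1.value"
    else:
        value_expr = "COALESCE(" + ", ".join(f"a{i}.value" for i in range(levels, 0, -1)) + ")"
    lines = [
        "SELECT uuid,",
        f"       AttrName = {name_expr},",
        f"       AttrValue = {value_expr}",
        f"FROM {table}",
        "     OUTER APPLY OPENJSON(params) a1",
    ]
    # Each further level only descends into values that are JSON themselves.
    for i in range(2, levels + 1):
        lines.append(
            f"     OUTER APPLY (SELECT [key], value, type FROM OPENJSON(a{i-1}.value) "
            f"WHERE ISJSON(a{i-1}.value) = 1) a{i}"
        )
    return "\n".join(lines) + ";"


# Trimmed versions of Sample1 and Sample2 from the question.
samples = [
    '{"owner": {"ownergeography": {"city": "abc"}}, "time": 1589297903717}',
    '{"correlationIds": [{"campaignId": "iXyS4z811Rax"}], "variantId": 1278915}',
]
levels = max(max_depth(json.loads(s)) for s in samples)
print(levels)  # 3
print(build_flatten_sql(levels))
```

The generated string would then go to sp_executesql as before. Note that one deeply nested record forces every record through the deepest query; the extra OUTER APPLY levels simply return NULLs for shallow records.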

Unfortunately, this approach is quite slow: for 11 MM (million) records it takes ca. 3.5 hours to finish.

Someone suggested using Databricks. OK, so I:

  1. created a notebook with the following Python code to read the ORC files from ADLS and materialize them as a Databricks table:

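        # Read every ORC file under the input folder into one DataFrame
        # (`spark` is the SparkSession that Databricks notebooks predefine)
        orcfile = "/mnt/adls/.../Input/*.orc"
        eventDf = spark.read.orc(orcfile)
        #spark.sql("drop table if exists ORC.Events_RawData")
        # Materialize the raw events as a table in the ORC database
        eventDf.write.mode("overwrite").saveAsTable("ORC.Events_Raw")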
    
  2. Now I'm trying to find code that gives the same result as the T-SQL OPENJSON queries. I started with Python code that uses recursion to parse the PARAMS attribute; however, it is even slower than the T-SQL version.
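For comparison, a minimal plain-Python sketch of a generic flattener that produces the same KEY/VALUE shape as the OUTER APPLY query: nested keys joined with '.', array elements named by their index, and JSON null kept as NULL (None). It is not the author's code: `flatten_params` and `_to_text` are hypothetical names, and numbers are rendered with Python's `str()`, which may not match OPENJSON's text for every float. It uses an explicit stack instead of recursion, which avoids Python's recursion limit but is not by itself a fix for the speed problem.

```python
import json
from typing import Any, Iterator, Optional, Tuple


def _to_text(value: Any) -> Optional[str]:
    """Render a JSON scalar roughly the way OPENJSON returns it."""
    if value is None:
        return None  # JSON null stays NULL
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)  # strings, numbers, and empty {} / []


def flatten_params(params: str) -> Iterator[Tuple[str, Optional[str]]]:
    """Yield (KEY, VALUE) pairs for every leaf of a JSON document of any shape."""
    # Each stack entry is (name built so far, JSON value); the root has no name.
    stack = [(None, json.loads(params))]
    while stack:
        name, value = stack.pop()
        if isinstance(value, dict) and value:
            children = list(value.items())
        elif isinstance(value, list) and value:
            children = list(enumerate(value))
        else:
            if name is not None:
                yield name, _to_text(value)
            continue
        # Push in reverse so that pops come back in document order.
        for key, child in reversed(children):
            child_name = str(key) if name is None else f"{name}.{key}"
            stack.append((child_name, child))


sample = '{"modifiedBy": {"clientApiKey": "xxx", "userId": null}, "campaignType": "Mobile push"}'
for key, value in flatten_params(sample):
    print(key, value)
# modifiedBy.clientApiKey xxx
# modifiedBy.userId None
# campaignType Mobile push
```

In Databricks, one untested option would be to wrap such a function in a PySpark UDF (`pyspark.sql.functions.udf` returning an `ArrayType` of `StructType`) and `explode` the result. Python UDFs move every row between the JVM and Python, so timing it against the T-SQL version first would be wise.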

Can you please suggest the correct way to achieve this goal, i.e. converting the PARAMS attribute into KEY/VALUE attributes in a generic way?

[EDIT] Below are sample JSON structures that need to be standardized into the expected structure:

Sample1

{
    "correlationId": "c3xOeEEQQCCA9sEx7-u6FA",
    "eventCreateTime": "2020-05-12T15:38:23.717Z",
    "time": 1589297903717,
    "owner": {
        "ownergeography": {
            "city": "abc",
            "country": "abc"
        },
        "ownername": {
            "firstname": "abc",
            "lastname": "def"
        },
        "clientApiKey": "xxxxx",
        "businessProfileApiKey": null,
        "userId": null
    },
    "campaignType": "Mobile push"
}

Sample2

{
    "correlationIds": [
        {
            "campaignId": "iXyS4z811Rax",
            "correlationId": "b316233807ac68675f37787f5dd83871"
        }
    ],
    "variantId": 1278915,
    "utmCampaign": "",
    "ua.os.major": "8"
}

Sample3

{
    "correlationId": "ls7XmuuiThWzktUeewqgWg",
    "eventCreateTime": "2020-05-12T12:40:20.786Z",
    "time": 1589287220786,
    "modifiedBy": {
        "clientId": null,
        "clientApiKey": "xxx",
        "businessProfileApiKey": null,
        "userId": null
    },
    "campaignType": "Mobile push"
}

Sample expected output (Spark DataFrame): [image not included]

  • Would you please provide a reduced but realistic example of your JSON (ideally two or three examples, in order to see the different types/various structures)? And please add the expected output (which should presumably be the same for any input). Commented Apr 1, 2020 at 7:42
  • @Shnugo Sorry for the delay. I've edited the post to add the sample JSON values I need to process.
    – kamilzet_
    Commented May 14, 2020 at 10:04
  • Hi, because I think this might be something people look for more often, I decided to share this approach as a self-answered question. Please follow this link to find a better-formatted version of my answer below... Commented May 14, 2020 at 15:21

Answer

Well, this is your get all and everything approach :-)

First, we create a table variable and fill it with your samples to simulate your issue (please try to provide this yourself next time).

DECLARE @table TABLE(ID INT IDENTITY, AnyJSON NVARCHAR(MAX));
INSERT INTO @table VALUES
(N' {
    "correlationId": "c3xOeEEQQCCA9sEx7-u6FA",
    "eventCreateTime": "2020-05-12T15:38:23.717Z",
    "time": 1589297903717,
    "owner": {
        "ownergeography": {
            "city": "abc",
            "country": "abc"
        },
        "ownername": {
            "firstname": "abc",
            "lastname": "def"
        },
        "clientApiKey": "xxxxx",
        "businessProfileApiKey": null,
        "userId": null
    },
    "campaignType": "Mobile push"
}')
,(N'{
    "correlationIds": [
        {
            "campaignId": "iXyS4z811Rax",
            "correlationId": "b316233807ac68675f37787f5dd83871"
        }
    ],
    "variantId": 1278915,
    "utmCampaign": "",
    "ua.os.major": "8"
}')
,(N'{
    "correlationId": "ls7XmuuiThWzktUeewqgWg",
    "eventCreateTime": "2020-05-12T12:40:20.786Z",
    "time": 1589287220786,
    "modifiedBy": {
        "clientId": null,
        "clientApiKey": "xxx",
        "businessProfileApiKey": null,
        "userId": null
    },
    "campaignType": "Mobile push"
}');

--The query

WITH recCTE AS
(
    SELECT ID
          ,CAST(1 AS BIGINT) AS ObjectIndex
          ,CAST(N'000' COLLATE DATABASE_DEFAULT AS NVARCHAR(MAX)) SortString
          ,1 AS NestLevel
          ,CAST(CONCAT(N'Root-',ID,'.') COLLATE DATABASE_DEFAULT AS NVARCHAR(MAX)) AS JsonPath
          ,CAST(N'$' COLLATE DATABASE_DEFAULT AS NVARCHAR(MAX)) AS JsonKey
          ,CAST(AnyJSON COLLATE DATABASE_DEFAULT AS NVARCHAR(MAX)) AS JsonValue 
          ,CAST(CASE WHEN ISJSON(AnyJSON)=1 THEN AnyJSON COLLATE DATABASE_DEFAULT ELSE NULL END AS NVARCHAR(MAX)) AS NestedJSON 
    FROM @table t

    UNION ALL

    SELECT r.ID
          ,ROW_NUMBER() OVER(ORDER BY (SELECT NULL))
          ,CAST(CONCAT(r.SortString,STR(ROW_NUMBER() OVER(ORDER BY (SELECT NULL)),3)) AS NVARCHAR(MAX))
          ,r.NestLevel+1
          ,CAST(CONCAT(r.JsonPath, A.[key] + N'.') COLLATE DATABASE_DEFAULT AS NVARCHAR(MAX))
          ,CAST(A.[key] COLLATE DATABASE_DEFAULT AS NVARCHAR(MAX))
          ,r.JsonValue  COLLATE DATABASE_DEFAULT
          ,CAST(A.[value] COLLATE DATABASE_DEFAULT AS NVARCHAR(MAX))
    FROM recCTE r
    CROSS APPLY OPENJSON(r.NestedJSON) A
    WHERE ISJSON(r.NestedJSON)=1
)
SELECT ID
      ,JsonPath
      ,JsonKey
      ,NestedJSON AS JsonValue
FROM recCTE 
WHERE ISJSON(NestedJSON)=0
ORDER BY recCTE.ID,SortString;

The result

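+---+----------------------------------------+-----------------+----------------------------------+
|ID | JsonPath                               | JsonKey         | JsonValue                        |
+---+----------------------------------------+-----------------+----------------------------------+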
| 1 | Root-1.correlationId.                  | correlationId   | c3xOeEEQQCCA9sEx7-u6FA           |
+---+----------------------------------------+-----------------+----------------------------------+
| 1 | Root-1.eventCreateTime.                | eventCreateTime | 2020-05-12T15:38:23.717Z         |
+---+----------------------------------------+-----------------+----------------------------------+
| 1 | Root-1.time.                           | time            | 1589297903717                    |
+---+----------------------------------------+-----------------+----------------------------------+
| 1 | Root-1.owner.ownergeography.city.      | city            | abc                              |
+---+----------------------------------------+-----------------+----------------------------------+
| 1 | Root-1.owner.ownergeography.country.   | country         | abc                              |
+---+----------------------------------------+-----------------+----------------------------------+
| 1 | Root-1.owner.ownername.firstname.      | firstname       | abc                              |
+---+----------------------------------------+-----------------+----------------------------------+
| 1 | Root-1.owner.ownername.lastname.       | lastname        | def                              |
+---+----------------------------------------+-----------------+----------------------------------+
| 1 | Root-1.owner.clientApiKey.             | clientApiKey    | xxxxx                            |
+---+----------------------------------------+-----------------+----------------------------------+
| 1 | Root-1.campaignType.                   | campaignType    | Mobile push                      |
+---+----------------------------------------+-----------------+----------------------------------+
| 2 | Root-2.correlationIds.0.campaignId.    | campaignId      | iXyS4z811Rax                     |
+---+----------------------------------------+-----------------+----------------------------------+
| 2 | Root-2.correlationIds.0.correlationId. | correlationId   | b316233807ac68675f37787f5dd83871 |
+---+----------------------------------------+-----------------+----------------------------------+
| 2 | Root-2.variantId.                      | variantId       | 1278915                          |
+---+----------------------------------------+-----------------+----------------------------------+
| 2 | Root-2.utmCampaign.                    | utmCampaign     |                                  |
+---+----------------------------------------+-----------------+----------------------------------+
| 2 | Root-2.ua.os.major.                    | ua.os.major     | 8                                |
+---+----------------------------------------+-----------------+----------------------------------+
| 3 | Root-3.correlationId.                  | correlationId   | ls7XmuuiThWzktUeewqgWg           |
+---+----------------------------------------+-----------------+----------------------------------+
| 3 | Root-3.eventCreateTime.                | eventCreateTime | 2020-05-12T12:40:20.786Z         |
+---+----------------------------------------+-----------------+----------------------------------+
| 3 | Root-3.time.                           | time            | 1589287220786                    |
+---+----------------------------------------+-----------------+----------------------------------+
| 3 | Root-3.modifiedBy.clientApiKey.        | clientApiKey    | xxx                              |
+---+----------------------------------------+-----------------+----------------------------------+
| 3 | Root-3.campaignType.                   | campaignType    | Mobile push                      |
+---+----------------------------------------+-----------------+----------------------------------+

The idea in short:

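  • We use a recursive CTE to walk down the JSON.
  • The query tests every fragment ([value] coming from OPENJSON) for being valid JSON.
  • If the fragment is valid JSON, the recursion walks deeper and deeper.
  • The column SortString is needed to get the final sort order.
  • Keys with a JSON null value (e.g. businessProfileApiKey, userId) do not show up in the result, because ISJSON(NULL) returns NULL and the final WHERE ISJSON(NestedJSON)=0 filters those rows out.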
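To make these steps concrete, here is a plain-Python sketch that mimics what the recursive CTE does: one anchor row per document, then every object/array row is expanded level by level (the recursive member), only scalar, non-null rows are kept at the end, and the result is sorted by ID and SortString. `walk_like_rec_cte` is a hypothetical name, and `f"{n:>3}"` stands in for `STR(n, 3)` (right-justified, three characters) as long as there are fewer than 1000 siblings. Two assumptions differ from T-SQL: the sketch only descends into real objects/arrays, whereas ISJSON would also accept a string value whose text happens to be JSON, and it numbers children in document order, which `ROW_NUMBER() OVER(ORDER BY (SELECT NULL))` does not formally guarantee.

```python
import json
from typing import Any, List, Tuple


def _scalar_text(value: Any) -> str:
    # Roughly the text OPENJSON returns for a scalar value.
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def walk_like_rec_cte(docs: List[Tuple[int, str]]) -> List[Tuple[int, str, str, str]]:
    """Return (ID, JsonPath, JsonKey, JsonValue) rows, mimicking recCTE."""
    # Anchor member: one row per document whose root is an object or array.
    # Working row layout: (ID, SortString, JsonPath, JsonKey, parsed value)
    level = []
    for doc_id, text in docs:
        root = json.loads(text)
        if isinstance(root, (dict, list)):
            level.append((doc_id, "000", f"Root-{doc_id}.", "$", root))

    kept = []
    while level:  # each pass corresponds to one round of the recursive member
        next_level = []
        for doc_id, sort_string, path, key, value in level:
            if isinstance(value, dict):
                children = list(value.items())
            elif isinstance(value, list):
                children = list(enumerate(value))
            else:
                # Final filter: scalar rows survive, JSON null (NULL) is dropped.
                if value is not None:
                    kept.append((doc_id, sort_string, path, key, _scalar_text(value)))
                continue
            # CROSS APPLY OPENJSON: number the children 1..n, STR(n, 3)-style.
            for n, (child_key, child) in enumerate(children, start=1):
                next_level.append((
                    doc_id,
                    sort_string + f"{n:>3}",
                    f"{path}{child_key}.",
                    str(child_key),
                    child,
                ))
        level = next_level

    kept.sort(key=lambda row: (row[0], row[1]))  # ORDER BY ID, SortString
    return [(doc_id, path, key, value) for doc_id, _, path, key, value in kept]
```

Fed with the third sample as document 3, this should return the five rows shown for ID 3 in the result table, with the null-valued keys left out. The fixed-width, right-justified number is what keeps "  9" sorting before " 10" in the string comparison.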

Come back if you have any open questions.

  • @Shnugo I'm just a bit worried about the performance of your solution. Currently, using my "hardcoded" approach with 4x CROSS APPLY OPENJSON(AnyJSON) on a table with ca. 3.8 MM records, I was able to produce 108 MM output records in less than 1h37m. I will definitely test your approach to collect reference execution statistics. Let's see what we get.
    – kamilzet_
    Commented May 14, 2020 at 15:27
  • @kamilzet_, sure, the performance won't be great... A recursive CTE is nothing more than hidden RBAR (row by agonizing row), a kind of loop... The word "iterative" would fit better anyway... However, my approach is not limited in depth. In the end, yours and mine might be pretty much the same under the hood... Commented May 14, 2020 at 15:36
  • @kamilzet_, in the linked answer I added some corrections/enhancements. In particular, the JSON path now correctly reflects arrays. If you do not need all of this in your output, you could shorten my approach. And be aware that your approach might return rows in any sort order; this could mix up JSONs with multiple nested objects. Commented May 14, 2020 at 16:03
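The dotted paths in the result above cannot tell an array index from a key ("correlationIds.0"), nor a key that contains dots ("ua.os.major") from three nested keys ua, os, major. A small sketch of building array-aware, quoted paths instead, in the `$.key[0]` and `$."key with special characters"` style of SQL Server's JSON path syntax. `to_json_path` is a hypothetical helper, not the code from the linked answer; it quotes every key that is not a plain identifier, which is more conservative than strictly necessary, and escaping embedded quotes JSON-style is an assumption.

```python
import re
from typing import List, Union

# Keys made only of letters, digits and underscores (not starting with a digit)
# are written as .key; everything else is quoted, to stay on the safe side.
_PLAIN_KEY = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def to_json_path(steps: List[Union[str, int]]) -> str:
    """Build a $-rooted path: str steps are object keys, int steps array indexes."""
    path = "$"
    for step in steps:
        if isinstance(step, int):
            path += f"[{step}]"  # array element
        elif _PLAIN_KEY.fullmatch(step):
            path += f".{step}"  # simple object key
        else:
            # Assumption: embedded backslashes/quotes are escaped JSON-style.
            escaped = step.replace("\\", "\\\\").replace('"', '\\"')
            path += f'."{escaped}"'
    return path


print(to_json_path(["correlationIds", 0, "campaignId"]))  # $.correlationIds[0].campaignId
print(to_json_path(["ua.os.major"]))                     # $."ua.os.major"
print(to_json_path(["ua", "os", "major"]))               # $.ua.os.major
```

With paths like these, the two cases no longer collapse into the same string, and a path such as `$."ua.os.major"` should be usable with JSON_VALUE to fetch a single value (worth verifying on the SQL Server version in use).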
