Implementing UDTF Functions in Alibaba DataWorks for JSON Data Processing

Processing MongoDB JSON Data with Custom UDTF

When working with nested JSON structures from MongoDB in DataWorks, a User-Defined Table Function (UDTF) can transform the hierarchical data into a flattened, relational format. Consider this JSON structure, in which each document holds a list of premise details and each premise holds a list of point details:

{
  "id": 0,
  "premiseDetails": [
    {
      "premiseId": 0,
      "price": 0,
      "pointDetails": [
        {
          "code": "",
          "network": 0,
          "unitId": 0,
          "unitState": true,
          "success": false,
          "time": "",
          "info": ""
        }
      ]
    }
  ]
}
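
To make the target shape concrete, the sketch below flattens an in-memory stand-in for this document using only plain Java collections. The class name FlattenSketch, the helper names, and the sample values are illustrative assumptions; the column order simply follows the eight fields extracted by the UDTF later in this article. It shows the row-per-point idea only and is not the UDTF itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: plain Java collections stand in for the parsed document.
class FlattenSketch {

    // Returns the value as a string, or "" when the key is absent or null.
    static String text(Map<String, Object> node, String key) {
        Object value = node.get(key);
        return value == null ? "" : String.valueOf(value);
    }

    // Emits one row per pointDetails entry, repeating the parent premise columns.
    @SuppressWarnings("unchecked")
    static List<String[]> flatten(List<Map<String, Object>> premiseDetails) {
        List<String[]> rows = new ArrayList<>();
        for (Map<String, Object> premise : premiseDetails) {
            Object points = premise.get("pointDetails");
            if (!(points instanceof List)) {
                continue; // no point list, so no rows for this premise
            }
            for (Map<String, Object> point : (List<Map<String, Object>>) points) {
                rows.add(new String[] {
                    text(premise, "premiseId"), text(premise, "price"),
                    text(point, "code"), text(point, "unitId"),
                    text(point, "network"), text(point, "success"),
                    text(point, "time"), text(point, "info")
                });
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Made-up sample values; "time" and "info" are left out on purpose.
        Map<String, Object> p1 = Map.of("code", "A1", "network", 0, "unitId", 7, "success", false);
        Map<String, Object> p2 = Map.of("code", "B2", "network", 1, "unitId", 8, "success", true);
        Map<String, Object> premise =
                Map.of("premiseId", 1, "price", 100, "pointDetails", List.of(p1, p2));
        for (String[] row : flatten(List.of(premise))) {
            System.out.println(String.join("|", row));
        }
    }
}
```

One premise with two points yields two rows, each repeating the premise columns; fields that are absent come out as empty strings.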

Developing the JSON Parser UDTF

After data synchronization from MongoDB, the document content is stored in table columns such as id and premise_details. The following UDTF takes the premise_details string as input, normalizes it into parseable JSON, and flattens the nested structure:

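import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

// Input: one string column. Output: eight string columns.
@Resolve("string->string,string,string,string,string,string,string,string")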
public class JsonDataFlattener extends UDTF {
    @Override
    public void process(Object[] inputObjects) throws UDFException {
        String jsonInput = (String) inputObjects[0];
        if (jsonInput == null) {
            return; // a NULL input column produces no rows
        }
        
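        // Rewrite the synchronized Document{{key=value, ...}} text as JSON.
        // '#' and '@' temporarily mark the commas that separate objects, so the
        // generic "," -> "\",\"" step only quotes the commas between fields.
        // Note: the last step strips every space, including spaces inside values.
        jsonInput = jsonInput.replaceAll("=", "\":\"")
                .replaceAll("Document", "")
                .replaceAll("\\{\\{", "{\"")
                .replaceAll("\\}\\}", "\"}")
                .replaceAll("]\"},", "]}#")
                .replaceAll("\\},\\{", "}#{")
                .replaceAll("\\},", "}@")
                .replaceAll(",", "\",\"")
                .replaceAll("#", ",")
                .replaceAll("@", ",")
                .replaceAll("pointDetails\":\"", "pointDetails\":")
                .replaceAll("}]\"}]", "}]}]")
                .replaceAll(" ", "");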

        JsonParser jsonProcessor = new JsonParser();
        JsonArray rootArray = jsonProcessor.parse(jsonInput).getAsJsonArray();

        if (rootArray != null) {
            for (JsonElement premiseElement : rootArray) {
                JsonObject premiseObject = premiseElement.getAsJsonObject();
                String premiseIdentifier = premiseObject.get("premiseId").getAsString();
                String costValue = "";
                
                if (premiseObject.has("price") && !premiseObject.get("price").isJsonNull()) {
                    costValue = premiseObject.get("price").getAsString();
                }

                JsonArray pointArray = premiseObject.get("pointDetails").getAsJsonArray();
                for (JsonElement pointElement : pointArray) {
                    JsonObject pointObject = pointElement.getAsJsonObject();
                    String pointCode = "";
                    String unitIdentifier = "";
                    String connectivityStatus = "";
                    String selectionStatus = "";
                    String timestamp = "";
                    String additionalInfo = "";

                    if (pointObject.has("code") && !pointObject.get("code").isJsonNull()) {
                        pointCode = pointObject.get("code").getAsString();
                    }

                    if (pointObject.has("unitId") && !pointObject.get("unitId").isJsonNull()) {
                        unitIdentifier = pointObject.get("unitId").getAsString();
                    }

                    if (pointObject.has("network") && !pointObject.get("network").isJsonNull()) {
                        connectivityStatus = pointObject.get("network").getAsString();
                    }

                    if (pointObject.has("success") && !pointObject.get("success").isJsonNull()) {
                        selectionStatus = pointObject.get("success").getAsString();
                    }

                    if (pointObject.has("time") && !pointObject.get("time").isJsonNull()) {
                        timestamp = pointObject.get("time").getAsString();
                    }

                    if (pointObject.has("info") && !pointObject.get("info").isJsonNull()) {
                        additionalInfo = pointObject.get("info").getAsString();
                    }

                    forward(premiseIdentifier, costValue, pointCode, unitIdentifier,
                            connectivityStatus, selectionStatus, timestamp, additionalInfo);
                }
            }
        }
    }
}

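This UDTF turns the nested JSON arrays into individual rows. It emits one row per pointDetails entry, carrying the parent premiseId and price alongside the point's code, unitId, network, success, time, and info; unitState is not extracted. Conditional checks substitute an empty string for any missing or null field except premiseId, which is read unconditionally, so every row has eight string columns.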
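
The cleanup chain is easier to follow when run on a concrete input. The sketch below applies the same substitutions in the same order, written as literal String.replace calls so that no regex escaping is involved. It assumes the synchronized column holds the toString form of a list of BSON documents (Document{{key=value, ...}}); that input format is inferred from the substitution rules rather than stated in this article, and the class name and sample values are hypothetical.

```java
// Hypothetical helper mirroring the cleanup chain of the UDTF, without Gson or ODPS.
class DocumentStringNormalizer {

    // Applies the same substitutions, in the same order, as literal replacements.
    static String normalize(String raw) {
        return raw.replace("=", "\":\"")          // key=value  -> key":"value
                .replace("Document", "")          // drop the type prefix
                .replace("{{", "{\"")             // open object and first key quote
                .replace("}}", "\"}")             // close last value quote and object
                .replace("]\"},", "]}#")          // premise separator -> '#'
                .replace("},{", "}#{")            // object separator without space
                .replace("},", "}@")              // remaining object separators -> '@'
                .replace(",", "\",\"")            // quote around field separators
                .replace("#", ",")                // restore object separators
                .replace("@", ",")
                .replace("pointDetails\":\"", "pointDetails\":") // unquote array start
                .replace("}]\"}]", "}]}]")        // unquote array end at the tail
                .replace(" ", "");                // strip all spaces
    }

    public static void main(String[] args) {
        // Made-up sample in the assumed Document{{...}} format.
        String raw = "[Document{{premiseId=1, price=100, pointDetails="
                + "[Document{{code=A1, info=ok}}, Document{{code=B2, info=}}]}}]";
        System.out.println(normalize(raw));
    }
}
```

Because the chain is purely textual, values that themselves contain a comma, an equals sign, '#', '@', or a space (for example a timestamp such as "2024-01-01 10:00:00") would be altered or would break the resulting JSON. If the synchronization job can be configured to write valid JSON directly, parsing that output would likely be more robust.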

Tags: DataWorks, UDTF, JSON Processing, MongoDB, Data Transformation

Posted on Sun, 10 May 2026 06:29:39 +0000 by SharkBait