Processing MongoDB JSON Data with Custom UDTF
When working with nested JSON structures synchronized from MongoDB in DataWorks, a User-Defined Table Function (UDTF) can transform the hierarchical data into a flattened, relational format. Consider this JSON structure containing premise and point details:
{
  "id": 0,
  "premiseDetails": [
    {
      "premiseId": 0,
      "price": 0,
      "pointDetails": [
        {
          "code": "",
          "network": 0,
          "unitId": 0,
          "unitState": true,
          "success": false,
          "time": "",
          "info": ""
        }
      ]
    }
  ]
}
Developing the JSON Parser UDTF
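When this data is synchronized from MongoDB, the content is stored in columns such as id and premise_details. Judging by the cleanup rules in the code, the premise_details value arrives in a form resembling the toString() output of org.bson.Document (Document{{key=value, ...}}) rather than as strict JSON. The UDTF below therefore first rewrites that text into valid JSON and then flattens the nested structure.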
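Before looking at the full UDTF, the string-cleanup step can be tried on its own. The sketch below is a hypothetical standalone harness (the class name DocumentStringNormalizer and the sample input are assumptions, with the sample inferred from the rewrite rules rather than taken from real synced data). It applies the same literal rewrites the UDTF uses and prints the resulting JSON, which makes it easy to check a real column value locally before deploying.

```java
// Hypothetical standalone harness: applies the UDTF's literal rewrite chain
// to a sample Document-style string and prints the resulting JSON.
public class DocumentStringNormalizer {

    // Same rewrites as the UDTF; String.replace treats every pattern literally.
    // Caveat: values containing '=', ',', '#', '@', spaces or the word
    // "Document" are corrupted by these rules.
    static String normalize(String raw) {
        return raw.replace("=", "\":\"")
                .replace("Document", "")
                .replace("{{", "{\"")
                .replace("}}", "\"}")
                .replace("]\"},", "]}#")
                .replace("},{", "}#{")
                .replace("},", "}@")
                .replace(",", "\",\"")
                .replace("#", ",")
                .replace("@", ",")
                .replace("pointDetails\":\"", "pointDetails\":")
                .replace("}]\"}]", "}]}]")
                .replace(" ", "");
    }

    public static void main(String[] args) {
        // Assumed shape of one synced premise_details value.
        String raw = "[Document{{premiseId=0, price=0, pointDetails=[Document{{code=, network=0, "
                + "unitId=0, unitState=true, success=false, time=, info=}}]}}]";
        // prints [{"premiseId":"0","price":"0","pointDetails":[{"code":"","network":"0","unitId":"0","unitState":"true","success":"false","time":"","info":""}]}]
        System.out.println(normalize(raw));
    }
}
```

Every value comes out quoted as a string, which matches the UDTF's all-string output columns.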
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

// One string input (the synced premise_details value) -> eight string columns:
// premiseId, price, code, unitId, network, success, time, info
@Resolve("string->string,string,string,string,string,string,string,string")
public class JsonDataFlattener extends UDTF {

    // Returns the field as a string, or "" when it is missing or JSON null.
    private static String getString(JsonObject obj, String key) {
        return (obj.has(key) && !obj.get(key).isJsonNull()) ? obj.get(key).getAsString() : "";
    }

    @Override
    public void process(Object[] inputObjects) throws UDFException {
        String jsonInput = (String) inputObjects[0];
        if (jsonInput == null || jsonInput.isEmpty()) {
            return; // NULL or empty input: nothing to flatten
        }
        // Rewrite the Document{{key=value, ...}} text into JSON. All patterns are
        // literal, so the non-regex String.replace is used (with replaceAll, an
        // unescaped "{" such as "\\{\\{{" throws PatternSyntaxException).
        // '#' and '@' temporarily mark object boundaries so the comma-quoting
        // step leaves them alone. Values containing '=', ',', '#', '@', spaces
        // or the word "Document" are corrupted by these rules.
        jsonInput = jsonInput.replace("=", "\":\"")
                .replace("Document", "")
                .replace("{{", "{\"")
                .replace("}}", "\"}")
                .replace("]\"},", "]}#")
                .replace("},{", "}#{")
                .replace("},", "}@")
                .replace(",", "\",\"")
                .replace("#", ",")
                .replace("@", ",")
                .replace("pointDetails\":\"", "pointDetails\":") // nested array, not a string
                .replace("}]\"}]", "}]}]")
                .replace(" ", "");

        JsonArray rootArray = new JsonParser().parse(jsonInput).getAsJsonArray();
        for (JsonElement premiseElement : rootArray) {
            JsonObject premiseObject = premiseElement.getAsJsonObject();
            String premiseIdentifier = getString(premiseObject, "premiseId");
            String costValue = getString(premiseObject, "price");
            if (!premiseObject.has("pointDetails") || !premiseObject.get("pointDetails").isJsonArray()) {
                continue; // no point details: this premise emits no rows
            }
            for (JsonElement pointElement : premiseObject.getAsJsonArray("pointDetails")) {
                JsonObject pointObject = pointElement.getAsJsonObject();
                // One output row per point; argument order must match @Resolve.
                forward(premiseIdentifier, costValue,
                        getString(pointObject, "code"),
                        getString(pointObject, "unitId"),
                        getString(pointObject, "network"),
                        getString(pointObject, "success"),
                        getString(pointObject, "time"),
                        getString(pointObject, "info"));
            }
        }
    }
}
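This UDTF turns each element of the nested pointDetails arrays into its own output row, repeating the parent premise's premiseId and price on every row. A missing or null price or point field is emitted as an empty string, and every row has eight string columns: premiseId, price, code, unitId, network, success, time, and info. Note that unitState is present in the source data but is not emitted, and a premise whose pointDetails array is empty produces no rows.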
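The row semantics described above can be checked without the MaxCompute SDK or Gson. The following is a minimal stdlib-only sketch under stated assumptions: Premise and Point are hypothetical stand-ins for the parsed JSON objects, and collecting rows into a list plays the role of forward(). It mirrors the UDTF's nested loops, so a premise with two points yields two rows and a premise with no points yields none.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Stdlib-only sketch of the UDTF's row semantics. Premise and Point are
// hypothetical stand-ins, not MaxCompute or Gson types.
public class FlattenSketch {

    static final class Point {
        final String code, unitId, network, success, time, info;

        Point(String code, String unitId, String network, String success, String time, String info) {
            this.code = code;
            this.unitId = unitId;
            this.network = network;
            this.success = success;
            this.time = time;
            this.info = info;
        }
    }

    static final class Premise {
        final String premiseId, price;
        final List<Point> points;

        Premise(String premiseId, String price, List<Point> points) {
            this.premiseId = premiseId;
            this.price = price;
            this.points = points;
        }
    }

    // One row per (premise, point) pair; parent fields are repeated on every
    // row and columns follow the UDTF's @Resolve output order.
    static List<String[]> flatten(List<Premise> premises) {
        List<String[]> rows = new ArrayList<>();
        for (Premise premise : premises) {
            for (Point point : premise.points) {
                rows.add(new String[] {premise.premiseId, premise.price, point.code, point.unitId,
                        point.network, point.success, point.time, point.info});
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Premise> premises = Arrays.asList(
                new Premise("1", "10", Arrays.asList(
                        new Point("A", "100", "1", "true", "", ""),
                        new Point("B", "101", "0", "false", "", ""))),
                new Premise("2", "20", Collections.<Point>emptyList())); // emits no rows
        // prints 1,10,A,100,1,true,, then 1,10,B,101,0,false,,
        for (String[] row : flatten(premises)) {
            System.out.println(String.join(",", row));
        }
    }
}
```

Because the loops behave like an inner join between a premise and its points, premises without point details disappear from the output; if they need to be kept, the UDTF would have to forward an extra row with empty point columns.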