This is collaborative work from Abacus Insights and Beesbridge, a delivery partner for Databricks.
Faced with clinician shortages, an aging population, and stagnant health outcomes, the healthcare industry stands to benefit greatly from disruptive technologies such as artificial intelligence. However, high-quality, usable data is the lifeblood of any advanced analytics or machine learning system, and in the highly complex healthcare industry, data quality (DQ) has historically been poor, with limited standards and inconsistent implementations. Because it can directly impact a patient’s care and health outcomes, making sure healthcare data is high quality and usable couldn’t have higher stakes. As a result, healthcare companies need to ensure their foundational data is truly usable – defined as accurate, complete, timely, relevant, versatile, and use case and application agnostic – before using it to train advanced machine learning models and unlock the promise of artificial intelligence.
Abacus Insights, as a healthcare technology leader with the only data transformation platform and solutions built specifically for health plans, has a core competency and value proposition of integrating data silos and delivering usable data. To solve the data quality challenge, we introduced the concept of Data Grading, where the data is augmented with a data quality score, ensuring that downstream consumers of the data understand its degree of usability. The natural corollary of that score is Process Transparency, which allows our customers to interpret the results. This blog post will walk you through the high-level details of how we have implemented these concepts using Spark and Delta Lake on the Databricks platform.
Data grading is the outcome of data quality rules that can be applied to the raw data from the source system in the Bronze layer, as well as to the data that is transformed and enriched in the Silver layer. This section first explores the construction of a data quality rule engine that enables the business end user to interpret the DQ rules, thereby promoting process transparency. It then explores how this engine is implemented using Spark and Delta Lake on the Databricks platform.
In addition, data quality is continuously improved in Databricks’ medallion architecture as the data is curated with the help of the data quality rule engine and then published as a trusted asset to drive downstream analytics and machine learning use cases. The following diagram shows the high-level flow of data quality in the medallion architecture.
The first question that comes to mind is: why is a grammar needed to represent data quality rules? One could also build such rules in most of the popular data quality tools and libraries – after all, these tools provide rich visual interfaces and useful report outputs. However, these tools are not designed to work natively on the Databricks platform, which supports co-mingled workloads made up of batch and streaming pipelines, and they are not custom-built for Delta Lakehouse use cases. On one hand, the grammar enables the DataOps engineer to express data quality rules in a simple and intuitive manner that can be interpreted by a non-technical user. On the other hand, it also allows the generation of efficient Spark code that can be run natively using the DataFrame API alongside the data engineering pipelines.
Databricks’ platform is built on open standards like Spark and Delta that can be extended easily to support specialized use cases with other custom libraries, frameworks, or methodologies. One of the more popular examples is how Databricks brought the power of large language models (LLMs) to the platform by releasing Dolly 2.0. Similarly, we have integrated a powerful parser generator, ANTLR, with Spark and Delta Lake to build and deploy an English-like grammar for a data quality rule engine. In particular, the distributed compute and open framework of Spark enable the execution of the following end-to-end flow in a scalable manner.
The Data Quality Rule engine is implemented using the following components:
The data quality grammar is expressed as an ANTLR grammar. ANTLR is a powerful parser generator for reading, processing, executing, or translating structured text or binary files. It is widely used to build languages, tools, and frameworks. The grammar itself is made up of two parts: lexer rules and parser rules.
Lexer Grammar: Used to define the tokens that are used to build the grammar. Here is a snippet of the tokens and literals used in the grammar.
DataQualityLexer.g4
lexer grammar DataQualityLexer;
// Lexer rules
T_MUST : M U S T;
T_SHOULD : S H O U L D;
//...
T_CONTAINS : C O N T A I N S;
T_NOW : N O W;
T_CONSECUTIVE : C O N S E C U T I V E;
T_HUMAN_NAME : H U M A N '_' N A M E;
// Functions
T_LENGTH : L E N G T H;
T_COUNT : C O U N T;
//...
// Literals
L_GT : '>';
L_ID : [a-zA-Z_][a-zA-Z0-9_]*; //Identifier
// case insensitive keywords
fragment A : ('a' | 'A');
fragment B : ('b' | 'B');
//...
fragment Z : ('z' | 'Z');
Parser Grammar: Used to define the grammar rules. Here is a snippet of the rules used in the grammar.
DataQualityParser.g4
parser grammar DataQualityParser;
table_dq :
L_NL* table L_NL+ (column_dq L_NL+)* column_dq L_NL*
;
table:
L_ID # table_name
;
column_dq :
column severity_type T_NOT? dq_check
;
severity_type:
T_MUST | T_SHOULD
;
//...
// dq checks
dq_check:
T_BE T_IN lookup_check
| T_BE T_IN set_check
| T_BE value_compare
| T_MATCH pattern_check
| T_HAVE whitespace_check
| T_BE null_check
| T_BE human_check
| T_BE past_check
;
//..
Data quality rules are written using the grammar defined above. The rules are grouped together for each entity, and each rule validates an attribute of that entity.
To make the rules as succinct as possible, and yet intuitive for the business user to understand the intent of the rule, we use the following keywords in the rules:
- must and should indicate the mandatory and recommended quality requirements.
- be, not, less than, greater than, and have specify the nature of the comparison in the action.
- human_name and whitespace indicate special quality checks that are not supported by the standard comparison operators.
In this example, the rules are defined for the Person entity and validate the attributes Name, BirthDate, and Designation.
Person.dq
Person
Name must be human_name
Name must not have whitespace
BirthDate must be valid date
BirthDate should be less than now
EffDate must be greater than BirthDate
Designation must be in ['Doctor', 'Nurse', 'Pharmacist', 'Technician']
The parser is the component that parses the data quality rules and generates the rule Abstract Syntax Tree (AST). As an example, the AST for two DQ rules is represented as shown below.
Building the lexer/parser code from the ground up can be cumbersome, but ANTLR makes short work of that by generating the Lexer and Parser as well as the base Visitor (or Listener) classes in multiple target languages.
antlr4 -Dlanguage=Python3 \
-no-listener -visitor \
src/data_quality/antlr/DataQualityLexer.g4 \
src/data_quality/antlr/DataQualityParser.g4
Command to generate the lexer, parser, and visitor base code in Python
After the DQ Rule is parsed into the AST, DQ Visitor traverses the tree and incrementally adds custom behavior for each node in the tree.
The following snippet shows how the user input dq_rules is converted to an AST, which is then used by DataQualityCustomVisitor to perform data quality validation against the input df: DataFrame using the same spark: SparkSession that was used to create the dataframe.
from antlr4 import *
from data_quality.antlr.DataQualityLexer import DataQualityLexer
from data_quality.antlr.DataQualityParser import DataQualityParser
from pyspark.sql import DataFrame, SparkSession, functions as F
# ..
# ..
def apply_dq_rules(dq_rules: str, df: DataFrame, spark: SparkSession = None) -> DataFrame:
    input_stream = InputStream(dq_rules)
    lexer = DataQualityLexer(input_stream)
    stream = CommonTokenStream(lexer)
    parser = DataQualityParser(stream)
    tree = parser.table_dq()
    if parser.getNumberOfSyntaxErrors() != 0:
        print("Syntax error")
        print(tree.toStringTree(recog=parser))
        return
    visitor = DataQualityCustomVisitor(df=df, spark=spark)
    df_with_rules = visitor.visit(tree)
    return df_with_rules
The line visitor.visit(tree) in the above code starts a tree traversal that invokes the visit implementation for each node of the tree.
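The full DataQualityCustomVisitor is not reproduced in this post. As a minimal sketch, assuming the ANTLR-generated base class is named DataQualityParserVisitor and that visiting the root table_dq node returns the dataframe with the collected DQ columns attached (as apply_dq_rules above expects), the visitor might be wired up as follows:

from pyspark.sql import DataFrame, SparkSession, functions as F
from data_quality.antlr.DataQualityParser import DataQualityParser
from data_quality.antlr.DataQualityParserVisitor import DataQualityParserVisitor

# Sketch only: a visitor that subclasses the ANTLR-generated base visitor and
# collects one boolean Spark Column per DQ rule, keyed by the DQ column name.
class DataQualityCustomVisitor(DataQualityParserVisitor):
    def __init__(self, df: DataFrame, spark: SparkSession = None):
        self.df = df
        self.spark = spark
        self.dq_cols_map = {}          # DQ column name -> Spark Column expression
        self.currentDqBuilder = None   # builder for the rule currently being visited

    def visitTable_dq(self, ctx: DataQualityParser.Table_dqContext):
        # Visit every column_dq child, then attach the collected DQ columns
        self.visitChildren(ctx)
        df_with_rules = self.df
        for col_name, spark_col in self.dq_cols_map.items():
            df_with_rules = df_with_rules.withColumn(col_name, spark_col)
        return df_with_rules

    def visitColumn(self, ctx):
        # Resolve the attribute referenced in the rule to a Spark Column
        return F.col(ctx.getText())

    # visitColumn_dq, visitSeverity_type, and the individual DQ checks are shown below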
Each DQ rule line corresponds to the following visit function call.
def visitColumn_dq(self, ctx: DataQualityParser.Column_dqContext):
    # Start a new DQ Builder
    self.currentDqBuilder = DqBuilder(self.df)
    # Get the column being checked
    self.currentDqBuilder.source_column = self.visit(ctx.column())
    # MUST - severe, SHOULD - warning
    self.visitSeverity_type(ctx.severity_type())
    # continue traversal to children
    self.visitChildren(ctx.dq_check())
    if ctx.T_NOT():
        self.currentDqBuilder.dq_type = "not_" + self.currentDqBuilder.dq_type
        self.currentDqBuilder.dq_spark_col = ~self.currentDqBuilder.dq_spark_col
    # traversal is completed - construct the DQ column
    col_name = self.currentDqBuilder.get_dq_col_name()
    spark_col = self.currentDqBuilder.get_dq_spark_col()
    if col_name in self.dq_cols_map:
        raise ValueError(f"{col_name} is already taken")
    self.dq_cols_map[col_name] = spark_col
    self.currentDqBuilder = None
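The DqBuilder used above is a small helper that accumulates the pieces of a single rule: the source column, the severity, the check type, and the resulting boolean Spark expression. Its implementation is not shown in this post; a minimal sketch, assuming it also tracks the attribute name and severity in order to derive the DQ column names described later (dqs_ for must, dqw_ for should), could look like this:

from pyspark.sql import Column, DataFrame

# Sketch only: helper that accumulates one DQ rule at a time for the visitor.
class DqBuilder:
    def __init__(self, df: DataFrame):
        self.df = df
        self.source_column: Column = None     # Spark Column set from visitColumn
        self.source_column_name: str = None   # attribute name, e.g. "BirthDate" (assumed field)
        self.severity: str = None             # "must" (severe) or "should" (warning)
        self.dq_type: str = None              # e.g. "lt_now", "human_name"
        self.dq_spark_col: Column = None      # boolean Spark expression for the check

    def get_dq_col_name(self) -> str:
        # dqs_ prefix for severe (must) checks, dqw_ for warning (should) checks,
        # followed by the attribute name and the check type, e.g. dqw_BirthDate_lt_now
        prefix = "dqs" if self.severity == "must" else "dqw"
        return f"{prefix}_{self.source_column_name}_{self.dq_type}"

    def get_dq_spark_col(self) -> Column:
        return self.dq_spark_col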
The ctx.dq_check() call will invoke a visit to the appropriate DQ check node. For instance, the check BirthDate should be less than now will be handled by the following piece of code, which constructs a new DQ column on the Spark dataframe with the condition col < F.current_timestamp().
def visitPast_check(self, ctx: DataQualityParser.Past_checkContext):
    col = self.currentDqBuilder.source_column
    self.currentDqBuilder.dq_type = "lt_now"
    self.currentDqBuilder.dq_spark_col = col < F.current_timestamp()
Another DQ rule from above, Name must be human_name, will invoke the following DQ check visit method, which adds a new dataframe column with a regular expression check.
def visitHuman_check(self, ctx: DataQualityParser.Human_checkContext):
    col = self.currentDqBuilder.source_column
    self.currentDqBuilder.dq_type = "human_name"
    self.currentDqBuilder.dq_spark_col = col.rlike(r"^(?:[A-Za-z\-\']+\s?)*$")
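The remaining checks follow the same pattern. For example, the Person.dq rule Designation must be in ['Doctor', 'Nurse', 'Pharmacist', 'Technician'] maps naturally to Spark's Column.isin. The sketch below assumes the set_check rule exposes its quoted literals through an L_STRING token list; the exact accessor depends on how the full grammar defines string literals.

def visitSet_check(self, ctx: DataQualityParser.Set_checkContext):
    col = self.currentDqBuilder.source_column
    # Collect the quoted literals from the rule and strip the surrounding quotes
    # (L_STRING is an assumed lexer token for quoted values)
    allowed_values = [token.getText().strip("'") for token in ctx.L_STRING()]
    self.currentDqBuilder.dq_type = "in_set"
    self.currentDqBuilder.dq_spark_col = col.isin(allowed_values)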
Process Transparency - The data quality grammar is a declarative language that is easy to understand and can be shared with business users. The grammar is also easy to extend and can be used to define other quality checks.
Optimized Execution - The DQ rules are converted into optimized Spark code that is lazily evaluated on the same dataframe generated by the preceding data transformation steps. The traditional approach to data quality validation is to persist the ETL output and then perform the data quality checks; performing the checks inline saves this additional round trip to the persistent storage layer.
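As an illustration of that inline usage, the sketch below applies the rules as one more lazy step in a Bronze-to-Silver job. The table names, the rule file location, and the module path for apply_dq_rules are illustrative only.

from pyspark.sql import SparkSession, functions as F
from data_quality.engine import apply_dq_rules  # illustrative module path

spark = SparkSession.builder.getOrCreate()

# Illustrative Bronze table and rule file location
bronze_df = spark.read.table("bronze.person")
dq_rules = open("/dbfs/dq_rules/Person.dq").read()

# Example transformation step, with the DQ checks added lazily on the same dataframe
silver_df = bronze_df.withColumn("Name", F.trim("Name"))
silver_df = apply_dq_rules(dq_rules, silver_df, spark)

# A single write materializes both the transformed data and the DQ result columns
silver_df.write.format("delta").mode("overwrite").saveAsTable("silver.person")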
What is Data Grading? Data grading is the process of assigning a grade to each row of the dataframe based on the data quality check results. In practice, the true data grade depends on the final analytical use case. For instance, the data grade for a customer record can be based on the data quality of the customer name, address, and phone number. On the other hand, the data grade for a product record can be based on the data quality of the product name, description, and price. Since data grading is subjective to the analytical use case, it is important to retain the data quality results at the row level without filtering out any failed validations.
In the first pass, each data quality check is performed for each row of the input dataframe. The result of each check is a new boolean column added to the dataframe, whose value indicates whether the data quality check passed or failed. A data quality check is either a MUST or a SHOULD check: the MUST check is a severe check and the SHOULD check is a warning check. The column name is constructed from the severity type, the column name, and the data quality check type. For instance, the data quality check Name must be human_name will add a new column dqs_Name_human_name to the dataframe; the value of the new column will be True if the data quality check passed and False if it failed. In a similar fashion, the data quality check BirthDate should be less than now will add a new column dqw_BirthDate_lt_now to the dataframe.
| Name | BirthDate | dqs_Name_human_name | dqw_BirthDate_lt_now |
|---|---|---|---|
| John | 1982-12-01 | True | True |
| Mark | 2042-01-01 | True | False |
| Gle9 | 2012-01-01 | False | True |
The above format of storing the data quality results is not optimal: every data quality check adds another column to the dataframe, which makes the results verbose and hard for downstream consumers to read. Furthermore, the Audit-Balance-Control framework in most Data Quality Management (DQM) tools may require the total severe and warning failure counts.
The following code snippet converts the above dataframe into a more compact format that is easy to read and understand.
def flatten_dq_results(df_with_rules: DataFrame) -> DataFrame:
    # get all the columns for severe errors
    dqs_array_col = F.filter(F.array([
        F.when(F.col(c).eqNullSafe(F.lit(True)), F.lit(None)).otherwise(F.lit(c[4:]))
        for c in df_with_rules.columns if c.startswith('dqs_')
    ]), lambda x: x.isNotNull())
    # get all the columns for warning errors
    dqw_array_col = F.filter(F.array([
        F.when(F.col(c).eqNullSafe(F.lit(True)), F.lit(None)).otherwise(F.lit(c[4:]))
        for c in df_with_rules.columns if c.startswith('dqw_')
    ]), lambda x: x.isNotNull())
    dq_result = F.struct(
        F.size(dqs_array_col).alias('SevereCount'),
        F.size(dqw_array_col).alias('WarningCount'),
        dqs_array_col.alias("SevereErrors"),
        dqw_array_col.alias("WarningErrors")
    )
    df_with_rules = df_with_rules.withColumn('DataQualityResult', dq_result)
    df_clean = df_with_rules.drop(*[c for c in df_with_rules.columns if c.startswith('dqw_') or c.startswith('dqs_')])
    return df_clean
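As a usage sketch, continuing the running Person example (df is the Person dataframe and dq_rules is the content of Person.dq), the two functions compose directly on the same dataframe:

# Apply the Person.dq rules, then collapse the per-check boolean columns
# into a single DataQualityResult struct per row
df_with_rules = apply_dq_rules(dq_rules, df, spark)
df_graded = flatten_dq_results(df_with_rules)
df_graded.select("Name", "BirthDate", "DataQualityResult").show(truncate=False)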
Once the data quality results are flattened as shown below, the data grade is showcased by the counts of severe and warning occurrences, indicating the degree of the issue, while also showcasing the specific data quality issues discovered. The Abacus Data Platform uses these metrics and grades to further apply a business-context grade on the data while enriching the data from the Silver to the Gold layer.
Final table with grade and data quality results
| Name | BirthDate | DataQualityResult |
|---|---|---|
| John | 1982-12-01 | { "SevereCount": 0, "WarningCount": 0, "SevereErrors": [], "WarningErrors": [] } |
| Mark | 2042-01-01 | { "SevereCount": 0, "WarningCount": 1, "SevereErrors": [], "WarningErrors": ["BirthDate_lt_now"] } |
| Gle9 | 2012-01-01 | { "SevereCount": 1, "WarningCount": 0, "SevereErrors": ["Name_human_name"], "WarningErrors": [] } |
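Downstream, the DataQualityResult struct makes both row-level grading and the Audit-Balance-Control totals straightforward to derive. Continuing from df_graded above, the letter-grade thresholds below are purely illustrative; in practice the business context determines how the counts map to a grade.

from pyspark.sql import functions as F

# Illustrative row-level grade derived from the data quality result
df_final = df_graded.withColumn(
    "DataGrade",
    F.when(F.col("DataQualityResult.SevereCount") > 0, F.lit("C"))
     .when(F.col("DataQualityResult.WarningCount") > 0, F.lit("B"))
     .otherwise(F.lit("A"))
)

# Audit-Balance-Control style totals across the whole table
df_final.agg(
    F.sum("DataQualityResult.SevereCount").alias("TotalSevereFailures"),
    F.sum("DataQualityResult.WarningCount").alias("TotalWarningFailures")
).show()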
Now, as part of an automated pipeline, data quality rules can be applied to our data as it moves from Bronze to Silver to Gold, as a natural extension of our data pipelines. Moreover, we track alongside the data all of the data quality tests, their results, and their severity, which together define the grade, or degree of usability, of the data. Tied to our overall data governance and telemetry, this enables Abacus Insights to deliver usable data that our customers can depend on to run their business and improve patient outcomes.
Abacus Insights manages healthcare data by breaking down data silos to make a real impact for our customers and their members. Our platform standardizes data across the healthcare ecosystem by providing a highly secure unified data platform that minimizes change management and maximizes analytics enablement to reduce costs and improve outcomes. We provide a data platform that enables our customers to build a data-driven culture and drive better outcomes.
Beesbridge is a Databricks delivery partner specializing in building scalable, high-performance data infrastructure. We have built a team of solution architects and data engineers who have helped many organizations unlock the potential of their data on the Databricks platform by building innovative solutions.