Testing Databricks SQL Notebooks: A Practical Guide
Testing notebooks may not sound glamorous, but it’s essential for building robust and scalable data pipelines. Imagine assembling IKEA furniture without instructions — everything might fall apart when you least expect it. Similarly, testing ensures your SQL notebooks run smoothly, delivering reliable and accurate data transformations.
In this article I’ll walk through how to set up a testing framework for Databricks SQL notebooks. The goal? To test individual notebooks in isolation while ensuring they work as expected in a Spark environment.
Why Test SQL Notebooks?
Databricks SQL notebooks are powerful tools for managing data transformations. However, without testing, even minor mistakes — like a mismatched schema or an incorrect join — can disrupt your pipeline and lead to hours of debugging.
Testing SQL notebooks helps you:
- Refactor confidently: Update your code without fear of breaking existing workflows.
- Catch errors early: Identify issues before they escalate.
- Maintain pipeline integrity: Ensure each notebook produces the correct output.
What’s in the Test Framework?
To demonstrate, I have created a repository that showcases a MERGE operation between bronze and silver tables in a SQL notebook, along with the corresponding test. The repository structure for this framework looks like this:
main_testing/
├── tests/
│   ├── integration_tests/
│   │   ├── conftest.py
│   │   └── test_merge_table_silver_target_table.py
│   └── run_integration_tests.py
└── src/
    └── notebooks/
        └── catalog_dev/
            ├── schema_bronze/
            │   └── create_table_bronze_source_table
            └── schema_silver/
                ├── create_table_silver_target_table
                └── merge_table_silver_target_table
How Does the Test Work?
Here’s the flow of an individual test:
- Setup: Create the test tables using the same scripts that are used to create production tables, but with different catalog, schema and table name parameters.
- Data Preparation: Insert a small set of test data into the source and target tables.
- Execute: Run the MERGE notebook using the test data.
- Validate: Compare the results in the target table against expected outcomes.
- Cleanup: Drop the test tables to ensure isolation for future tests.
Key Principles
- Automation: Tests can run as part of a CI/CD pipeline, ensuring continuous validation.
- Isolation: Each test runs in a self-contained environment, preventing interference.
- Parameterization: Catalogs, schemas, and table names are parameterized to simplify reusability (a sketch follows this list).
- Modularity: If the individual notebook contains only one operation, the corresponding test offers a level of granularity akin to unit testing in the world of data engineering. This approach ensures clarity, simplicity, and maintainability.
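To make that parameterization concrete, here is a small, hypothetical sketch (not part of the repository) showing how pytest’s parametrize could run the same merge test against several catalog and schema combinations:
import pytest

# Hypothetical catalog/schema combinations, for illustration only.
TEST_ENVIRONMENTS = [
    ("tests", "test_bronze", "test_silver"),
    ("tests_ci", "ci_bronze", "ci_silver"),
]

@pytest.mark.parametrize("catalog_name, bronze_schema_name, silver_schema_name", TEST_ENVIRONMENTS)
def test_merge_in_environment(spark, dbutils, catalog_name, bronze_schema_name, silver_schema_name):
    # The body would follow the same setup / data preparation / execute /
    # validate / cleanup flow described above and shown in full later on.
    ...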
Step-by-Step Example
1. Parameterized SQL Notebooks
We begin with parameterized notebooks for creating source and target tables:
CREATE WIDGET TEXT catalog_name DEFAULT 'dev';
CREATE WIDGET TEXT schema_name DEFAULT 'bronze';
CREATE WIDGET TEXT table_name DEFAULT 'source_table';
CREATE OR REPLACE TABLE IDENTIFIER(:catalog_name || '.' || :schema_name || '.' || :table_name)
(
id INT,
name STRING,
created_at TIMESTAMP,
updated_at TIMESTAMP,
status STRING
);
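When another notebook or a test invokes this script, the widget defaults can be overridden through the arguments map of dbutils.notebook.run. A minimal sketch, assuming an illustrative workspace path and test parameter values:
# Overriding the widget defaults at run time; the path and values below are illustrative.
create_source_notebook = "/Workspace/main_testing/src/notebooks/catalog_dev/schema_bronze/create_table_bronze_source_table"

dbutils.notebook.run(
    create_source_notebook,
    timeout_seconds=60,
    arguments={
        "catalog_name": "tests",
        "schema_name": "test_bronze",
        "table_name": "test_source_table",
    },
)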
2. Merge Notebook
The MERGE notebook performs the transformation:
MERGE INTO IDENTIFIER(:catalog_name || '.' || :silver_schema_name || '.' || :target_table_name) AS target
USING IDENTIFIER(:catalog_name || '.' || :bronze_schema_name || '.' || :source_table_name) AS source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET
target.name = source.name,
target.created_at = source.created_at,
target.updated_at = source.updated_at,
target.status = source.status
WHEN NOT MATCHED THEN
INSERT (id, name, created_at, updated_at, status)
VALUES (source.id, source.name, source.created_at, source.updated_at, source.status);
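The same parameter markers can also be exercised straight from PySpark, which is handy for quick checks while developing the notebook. This is a sketch, assuming a recent runtime (Spark 3.5+ or a current Databricks runtime) that supports named parameter markers and the IDENTIFIER clause, and illustrative table names:
# Passing named parameters to spark.sql; the catalog/schema/table values are illustrative.
row_count = spark.sql(
    """
    SELECT COUNT(*) AS cnt
    FROM IDENTIFIER(:catalog_name || '.' || :schema_name || '.' || :table_name)
    """,
    args={
        "catalog_name": "tests",
        "schema_name": "test_silver",
        "table_name": "test_target_table",
    },
).collect()[0]["cnt"]

print(f"Target table currently holds {row_count} rows")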
3. Fixtures: conftest.py
These fixtures provide a Spark session and a DBUtils instance for the tests:
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # One Spark session shared across the whole test session.
    spark = SparkSession.builder.appName('integration-tests').getOrCreate()
    return spark

@pytest.fixture
def dbutils(spark):
    # DBUtils gives tests access to dbutils.notebook.run and other utilities.
    from pyspark.dbutils import DBUtils
    return DBUtils(spark)
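Because pytest discovers conftest.py automatically, any test in tests/integration_tests/ can request these fixtures simply by naming them as arguments. A trivial smoke test (not part of the repository) illustrates the injection:
# A minimal check that the shared Spark session fixture works; illustration only.
def test_spark_session_is_available(spark):
    assert spark.sql("SELECT 1 AS ok").collect()[0]["ok"] == 1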
4. Main Entry Point: run_integration_tests.py
This notebook runs all tests in the integration_tests folder:
# Databricks notebook source
%reload_ext autoreload
%autoreload 2
# COMMAND ----------
import pytest
import os
import sys
# Integration tests folder
integration_tests_folder = "integration_tests"
# Avoid writing .pyc files on a read-only filesystem.
sys.dont_write_bytecode = True
# Run pytest on integration tests folder
retcode = pytest.main([integration_tests_folder, "-v", "-p", "no:cacheprovider", "--junit-xml", "/dbfs/tmp/integration_tests/junit.xml"])
# Fail the cell execution if there are any test failures.
assert retcode == 0, "The pytest invocation failed. See the log for details."
5. Test File: test_merge_table_silver_target_table.py
# tests/integration_tests/test_merge_table_silver_target_table.py
import os
import pytest
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.testing import assertDataFrameEqual
def test_merge_table_silver_target_table(spark, dbutils):
    """
    Test to verify the merge operation between bronze and silver tables.
    This test performs the following steps:
    1. Creates source and target tables by running respective notebooks.
    2. Inserts test data into both tables.
    3. Runs the merge notebook.
    4. Verifies that the target table has the expected data.
    5. Cleans up by dropping the created tables.
    """
    # **1. Parameterize Test Variables**
    catalog_name = "tests"
    bronze_schema_name = "test_bronze"
    silver_schema_name = "test_silver"
    source_table_name = "test_source_table"
    target_table_name = "test_target_table"

    # **2. Dynamically Calculate Root Path**
    try:
        notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
    except Exception as e:
        pytest.fail(f"Unable to retrieve notebook path from dbutils: {e}")

    # Calculate the root path relative to the current notebook
    root_path = '/Workspace' + os.path.abspath(os.path.join(notebook_path, "../.."))
    print(f"Root path: {root_path}")

    # **3. Parameterize Notebook Paths**
    create_source_notebook = os.path.join(root_path, "src/notebooks/catalog_dev/schema_bronze/create_table_bronze_source_table")
    create_target_notebook = os.path.join(root_path, "src/notebooks/catalog_dev/schema_silver/create_table_silver_target_table")
    merge_notebook_path = os.path.join(root_path, "src/notebooks/catalog_dev/schema_silver/merge_table_silver_target_table")

    # **4. Verify Notebook Existence**
    assert os.path.exists(create_source_notebook), f"Notebook does not exist: {create_source_notebook}"
    assert os.path.exists(create_target_notebook), f"Notebook does not exist: {create_target_notebook}"
    assert os.path.exists(merge_notebook_path), f"Notebook does not exist: {merge_notebook_path}"

    # **5. Define Parameters for Creating Tables**
    create_source_params = {
        "catalog_name": catalog_name,
        "schema_name": bronze_schema_name,
        "table_name": source_table_name
    }
    create_target_params = {
        "catalog_name": catalog_name,
        "schema_name": silver_schema_name,
        "table_name": target_table_name
    }

    try:
        # **6. Run Notebooks to Create Tables**
        print(f"Running create_source_notebook: {create_source_notebook}")
        dbutils.notebook.run(create_source_notebook, timeout_seconds=60, arguments=create_source_params)
        print(f"Running create_target_notebook: {create_target_notebook}")
        dbutils.notebook.run(create_target_notebook, timeout_seconds=60, arguments=create_target_params)

        # **7. Insert Test Data into Source Table**
        print("Inserting test data into source table.")
        spark.sql(f"""
            INSERT INTO {catalog_name}.{bronze_schema_name}.{source_table_name}
            VALUES
                (1, 'Alpha', current_timestamp(), current_timestamp(), 'active'),
                (2, 'Beta', current_timestamp(), current_timestamp(), 'inactive')
        """)

        # **8. Insert Test Data into Target Table**
        print("Inserting test data into target table.")
        spark.sql(f"""
            INSERT INTO {catalog_name}.{silver_schema_name}.{target_table_name}
            VALUES
                (1, 'Old Alpha', current_timestamp(), current_timestamp(), 'old_status')
        """)

        # **9. Define Parameters for Merge Notebook**
        merge_notebook_params = {
            "catalog_name": catalog_name,
            "bronze_schema_name": bronze_schema_name,
            "silver_schema_name": silver_schema_name,
            "source_table_name": source_table_name,
            "target_table_name": target_table_name
        }

        # **10. Run Merge Notebook**
        print(f"Running merge_notebook_path: {merge_notebook_path}")
        merge_result = dbutils.notebook.run(merge_notebook_path, timeout_seconds=60, arguments=merge_notebook_params)
        print(f"Merge notebook result: {merge_result}")

        # **11. Fetch Actual Data from Target Table**
        print("Fetching actual data from target table.")
        df_actual = spark.sql(f"""
            SELECT id, name, status
            FROM {catalog_name}.{silver_schema_name}.{target_table_name}
            ORDER BY id
        """)

        # **12. Define Expected Data**
        expected_data = [
            Row(id=1, name='Alpha', status='active'),
            Row(id=2, name='Beta', status='inactive')
        ]
        expected_schema = StructType([
            StructField('id', IntegerType(), True),
            StructField('name', StringType(), True),
            StructField('status', StringType(), True)
        ])
        df_expected = spark.createDataFrame(expected_data, expected_schema)

        # **13. Compare DataFrame Schemas**
        print("Comparing DataFrame schemas.")
        assert df_actual.schema == df_expected.schema, "Schemas do not match."

        # **14. Compare Actual Data with Expected Data**
        print("Asserting that actual data matches expected data.")
        assertDataFrameEqual(df_actual, df_expected)
        print("Merge notebook ran successfully and data is as expected!")
    finally:
        # **15. Cleanup: Drop Source and Target Tables**
        print("Cleaning up test environment.")
        try:
            # Drop source table
            spark.sql(f"DROP TABLE IF EXISTS {catalog_name}.{bronze_schema_name}.{source_table_name}")
            # Drop target table
            spark.sql(f"DROP TABLE IF EXISTS {catalog_name}.{silver_schema_name}.{target_table_name}")
            print("Cleanup successful.")
        except Exception as e:
            print(f"Error during cleanup: {e}")
Why This Matters
By modularizing SQL notebooks and parameterizing table names, we ensure that each operation can be tested independently. This method offers a streamlined approach to testing even the smallest components of your data pipeline while allowing for scalability in a Spark context.
Conclusion
Testing notebooks might not be as flashy as deploying them, but it’s the unsung hero of data engineering. A solid testing framework catches bugs early, ensures data accuracy, and builds confidence in your pipelines. By adopting modular, parameterized notebooks and tests like the ones shown here, you can refactor and extend your pipelines without fear of breaking them.