How to build an end-to-end data pipeline on Databricks

August 14, 2024

Nicolás Colmenares

Full Stack developer

In today’s digital era, the ability to seamlessly transform raw data into actionable insights is a game-changer. Imagine you're leading a growing company, where every strategic move depends on having the right data at the right time. Yet, your data is dispersed across different systems, unstructured and difficult to integrate. Every attempt to make sense of it feels like navigating a maze, and the pressure to deliver insights grows by the day.

This was the reality for many organizations, such as Airbnb, which needed to manage and analyze vast amounts of data from millions of listings, bookings, and user interactions to improve their services and user experience. It was like trying to piece together a complex puzzle without all the pieces. But what if you could streamline this entire process, making your data not just manageable, but also a driving force for innovation?

This is where data pipelines come in. These pipelines are designed to streamline the flow of data, from raw source files to structured outputs ready for analysis, helping organizations transform chaos into clarity. With a well-built pipeline, not only can you integrate diverse data sources, but you can also automate the extraction, transformation, and loading (ETL) of data, making it easy to unlock its true potential.

In this article, we will walk you through the process of creating a data pipeline using Databricks. From data ingestion to transformation and analysis, you’ll learn how to build a pipeline that automates workflows, ensures data quality, and turns your raw data into a valuable asset.

Understanding Data Pipelines and ETL Workflows

A data pipeline encompasses all the steps needed to move data from a source store, transform it according to specific requirements, and save it in a target store, turning raw data into a format usable for analysis. A common example of a data pipeline is the Extract, Transform, and Load (ETL) workflow: data is extracted from source stores, transformed to ensure quality (for example, by removing duplicate records or unneeded fields and by deriving new fields), and loaded into a data warehouse or data lake for further use.
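
To make the three stages concrete, here is a minimal plain-Python sketch of one ETL run. It is an illustration only, not how Databricks implements ETL: an in-memory tab-separated string stands in for a source file, a Python list stands in for the target table, and the column names (including the internal_note field that gets dropped) are hypothetical.

```python
import csv
import io

# Extract source: an in-memory tab-separated "file" (hypothetical columns and values)
RAW_SOURCE = (
    "song_id\tartist_name\ttitle\tduration\tinternal_note\n"
    "S1\tArtist A\tSong One\t210.5\tx\n"
    "S1\tArtist A\tSong One\t210.5\tx\n"  # duplicate record
    "S2\tArtist B\tSong Two\t185.0\ty\n"
)


def extract(text):
    """Parse tab-separated text into a list of dicts keyed by the header row."""
    return list(csv.DictReader(io.StringIO(text), delimiter="\t"))


def transform(rows):
    """Remove duplicates, drop an unneeded field, and derive a new field."""
    seen = set()
    result = []
    for row in rows:
        if row["song_id"] in seen:
            continue  # skip records whose song_id was already seen
        seen.add(row["song_id"])
        clean = {k: v for k, v in row.items() if k != "internal_note"}  # drop a field
        clean["duration"] = float(clean["duration"])
        clean["duration_min"] = round(clean["duration"] / 60, 2)  # create a new field
        result.append(clean)
    return result


def load(rows, target):
    """Append the transformed rows to the target store (a list stands in for a table)."""
    target.extend(rows)
    return target


warehouse_table = load(transform(extract(RAW_SOURCE)), [])
print(warehouse_table)
```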

Requirements to follow the steps

To follow along, you need access to a Databricks workspace and permission to create a cluster.

Step-by-Step Guide:

  1. Create a cluster: Once you’re logged in, you’ll see the Databricks workspace. Go to the Compute section and click ‘Create compute’ or ‘Create cluster’, depending on your Databricks version.

Enter a name (here we used ‘Demo-data-pipeline’) and click the ‘Create’ button.
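
If you prefer to script this step, the same cluster can be described as a JSON request body for the Databricks Clusters API (POST /api/2.0/clusters/create). The sketch below only builds and prints that body. The spark_version and node_type_id values are placeholders you would replace with values available in your workspace, and the worker count and auto-termination setting are assumptions chosen for a small demo.

```python
import json

# Hypothetical request body for POST /api/2.0/clusters/create.
# Placeholder values must be replaced with ones valid in your workspace and cloud.
cluster_spec = {
    "cluster_name": "Demo-data-pipeline",
    "spark_version": "<runtime-version>",  # placeholder: a Databricks Runtime key your workspace offers
    "node_type_id": "<node-type>",         # placeholder: a cloud-specific instance type
    "num_workers": 1,                      # assumption: one worker is enough for the sample data
    "autotermination_minutes": 60,         # assumption: stop the cluster after an idle hour
}

print(json.dumps(cluster_spec, indent=2))
```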

  2. Ingest the data: Go to the Workspace section, create a notebook, and double-click its title to rename it (here we used ‘Ingest data’). On the right side, make sure the notebook is attached to the cluster you created earlier.

Add the following code to load the data you want to work with into a table for later processing, and click ‘Run Cell’ to run it:

from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField

# Define variables used in the code below
file_path = "/databricks-datasets/songs/data-001/"
table_name = "raw_song_data"
checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/raw_song_data"

schema = StructType(
  [
    StructField("artist_id", StringType(), True),
    StructField("artist_lat", DoubleType(), True),
    StructField("artist_long", DoubleType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("duration", DoubleType(), True),
    StructField("end_of_fade_in", DoubleType(), True),
    StructField("key", IntegerType(), True),
    StructField("key_confidence", DoubleType(), True),
    StructField("loudness", DoubleType(), True),
    StructField("release", StringType(), True),
    StructField("song_hotnes", DoubleType(), True),
    StructField("song_id", StringType(), True),
    StructField("start_of_fade_out", DoubleType(), True),
    StructField("tempo", DoubleType(), True),
    StructField("time_signature", DoubleType(), True),
    StructField("time_signature_confidence", DoubleType(), True),
    StructField("title", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("partial_sequence", IntegerType(), True)
  ]
)

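# Incrementally ingest the tab-separated files with Auto Loader (the "cloudFiles" source).
# spark is the SparkSession that Databricks notebooks provide automatically.
# The checkpoint records which files have already been loaded, and availableNow=True
# processes all files available when the query starts, then stops the stream.
(spark.readStream
  .format("cloudFiles")
  .schema(schema)
  .option("cloudFiles.format", "csv")
  .option("sep", "\t")
  .load(file_path)
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name)
)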
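
The checkpointLocation and availableNow=True options are what make this cell safe to rerun: Auto Loader records the files it has already ingested in the checkpoint, and an availableNow trigger processes everything new and then stops. The plain-Python model below sketches that behavior under simplifying assumptions (a dictionary mapping file names to rows stands in for the source directory, and a set stands in for the checkpoint). It is not Auto Loader's actual implementation.

```python
def run_available_now(source_files, checkpoint, table):
    """Ingest every file not yet in the checkpoint, record it, and return the new file names."""
    new_files = [name for name in sorted(source_files) if name not in checkpoint]
    for name in new_files:
        table.extend(source_files[name])  # append the file's rows to the target table
        checkpoint.add(name)              # remember that this file has been processed
    return new_files


checkpoint = set()
table = []
files = {"part-00000": ["row1", "row2"], "part-00001": ["row3"]}

print(run_available_now(files, checkpoint, table))  # first run ingests both files
files["part-00002"] = ["row4"]                       # a new file lands in the source directory
print(run_available_now(files, checkpoint, table))  # second run ingests only the new file
print(run_available_now(files, checkpoint, table))  # third run finds nothing new
```

Because the third run returns an empty list, rerunning the model adds no duplicate rows, which mirrors why rerunning the ingest notebook does not reload files that are already in the table.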
  3. Prepare the data: Create a new notebook to prepare your data for analysis, and make sure to select SQL as its language (here we named it ‘Prepare data’):

Add a query that creates a new table with only the columns you want to work with from the raw table created earlier, and click ‘Run Cell’ to execute it:

-- Target table holding only the columns needed for analysis
CREATE OR REPLACE TABLE
  song_data (
    artist_id STRING,
    artist_name STRING,
    duration DOUBLE,
    release STRING,
    tempo DOUBLE,
    time_signature DOUBLE,
    title STRING,
    year DOUBLE,
    processed_time TIMESTAMP
  );

-- Copy those columns from raw_song_data and record when each row was processed
INSERT INTO
  song_data
SELECT
  artist_id,
  artist_name,
  duration,
  release,
  tempo,
  time_signature,
  title,
  year,
  current_timestamp()
FROM
  raw_song_data
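
Conceptually, this INSERT INTO ... SELECT statement is a projection plus one derived column. The plain-Python sketch below models it, assuming raw rows arrive as dictionaries: it keeps only the columns listed in the SELECT, converts year to a float because the target column is declared DOUBLE, and gives every row the same processed_time, since current_timestamp() returns one value per query. The sample row and fixed timestamp are hypothetical test values.

```python
from datetime import datetime, timezone

# Columns kept by the prepare step (the same list as the SELECT above)
KEEP = ["artist_id", "artist_name", "duration", "release", "tempo",
        "time_signature", "title", "year"]


def prepare(raw_rows, now=None):
    """Keep only the KEEP columns, cast year to float, and add processed_time."""
    # One timestamp for the whole batch, like current_timestamp() within one query
    processed_time = now if now is not None else datetime.now(timezone.utc)
    prepared = []
    for raw in raw_rows:
        row = {col: raw.get(col) for col in KEEP}
        if row["year"] is not None:
            row["year"] = float(row["year"])  # target column is declared DOUBLE
        row["processed_time"] = processed_time
        prepared.append(row)
    return prepared


# Hypothetical raw row: "loudness" exists in raw_song_data but is not selected
raw_rows = [{"artist_id": "AR1", "artist_name": "Artist A", "duration": 210.5,
             "loudness": -7.1, "release": "Demo", "tempo": 120.0,
             "time_signature": 4.0, "title": "Song One", "year": 1999}]
fixed_time = datetime(2024, 8, 14, tzinfo=timezone.utc)
prepared_rows = prepare(raw_rows, now=fixed_time)
print(prepared_rows[0])
```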
  4. Query the transformed data: Create another notebook to add and run queries that analyze the loaded data. Which queries you write depends on your needs.
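
As an illustration, two questions you might ask of song_data are which artists released the most songs in a given year, and which songs in 4/4 time have a tempo between 100 and 140 BPM. In SQL these would be a GROUP BY artist_name, year with COUNT(*) (skipping rows whose year is 0) and a WHERE time_signature = 4 AND tempo BETWEEN 100 AND 140 filter. The plain-Python sketch below computes the same results over a few hypothetical rows; treating a year of 0 as unknown is an assumption about the dataset.

```python
from collections import Counter

# Hypothetical prepared rows; in Databricks these would come from the song_data table
songs = [
    {"artist_name": "Artist A", "title": "One", "year": 2005.0, "tempo": 120.0, "time_signature": 4.0},
    {"artist_name": "Artist A", "title": "Two", "year": 2005.0, "tempo": 95.0, "time_signature": 3.0},
    {"artist_name": "Artist B", "title": "Three", "year": 2005.0, "tempo": 128.0, "time_signature": 4.0},
    {"artist_name": "Artist B", "title": "Four", "year": 0.0, "tempo": 110.0, "time_signature": 4.0},
]


def songs_per_artist_year(rows):
    """Count songs per (artist, year), skipping year 0 (assumed unknown), most songs first."""
    counts = Counter((r["artist_name"], r["year"]) for r in rows if r["year"] > 0)
    return counts.most_common()


def dj_candidates(rows, low=100.0, high=140.0):
    """Titles in 4/4 time with a tempo between low and high BPM (inclusive, like SQL BETWEEN)."""
    return [r["title"] for r in rows
            if r["time_signature"] == 4 and low <= r["tempo"] <= high]


print(songs_per_artist_year(songs))
print(dj_candidates(songs))
```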

5. Create a Databricks job to run the pipeline: A Databricks job automates your data pipeline by running ingestion, preparation, and analysis as tasks in a single workflow.

  1. Access Workflows: Click the Workflows icon in the sidebar and select "Create Job," or click "New" and choose "Job."
  2. Configure the Job:
    • Enter a job name
    • For the first task, enter a name and set the type to “Notebook”.
    • Select the data ingestion notebook, choose the cluster you created, and click “Create”.
  3. Add More Tasks:
    • Click "Add Task" to add a new task, name it, set its type to "Notebook," select the data preparation notebook, and choose the cluster. Click "Create."
    • Repeat this process for the analysis task, selecting the data analysis notebook.
  4. Run the Workflow: Click "Run Now" to execute the job.
  5. View Results: Once the workflow completes, click the final data analysis task to see the output and query results.
  6. Schedule the job: To run the notebooks as production scripts on a regular cadence, add a scheduled trigger to the job:
    1. Click Workflows in the sidebar.
    2. In the “Name” column, click the job name. The side panel displays the “Job details”.
    3. Click “Add trigger” in the “Job details” panel and select “Scheduled” in “Trigger type”.
    4. Specify the period, starting time, and time zone and click “Save”.
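
The job built in the steps above can also be expressed as a request body for the Databricks Jobs API (POST /api/2.1/jobs/create), which is handy for keeping the pipeline definition in version control. The sketch below only builds and prints the JSON. The notebook directory, cluster ID, job name, and the name of the analysis notebook are hypothetical placeholders, and the daily 06:00 UTC schedule is an assumed example. Each depends_on entry makes a task wait for the previous one, mirroring the ingest, prepare, analyze order.

```python
import json

# Hypothetical placeholders: replace with your own workspace path and cluster ID
NOTEBOOK_DIR = "/Users/<your-user>/"
CLUSTER_ID = "<cluster-id>"


def notebook_task(key, notebook, depends_on=None):
    """Build one notebook task that runs on the existing cluster."""
    task = {
        "task_key": key,
        "notebook_task": {"notebook_path": NOTEBOOK_DIR + notebook},
        "existing_cluster_id": CLUSTER_ID,
    }
    if depends_on:
        task["depends_on"] = [{"task_key": depends_on}]  # wait for the upstream task
    return task


job_spec = {
    "name": "Demo-data-pipeline-job",  # hypothetical job name
    "tasks": [
        notebook_task("ingest", "Ingest data"),
        notebook_task("prepare", "Prepare data", depends_on="ingest"),
        notebook_task("analyze", "Analyze data", depends_on="prepare"),  # hypothetical notebook name
    ],
    # Daily at 06:00 UTC; Quartz fields are: second minute hour day-of-month month day-of-week
    "schedule": {
        "quartz_cron_expression": "0 0 6 * * ?",
        "timezone_id": "UTC",
        "pause_status": "UNPAUSED",
    },
}

print(json.dumps(job_spec, indent=2))
```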

Conclusion

Databricks makes managing data a lot easier by turning a tangled mess into clear, actionable insights. Instead of wrestling with scattered and unstructured data, you can use Databricks to bring everything together smoothly. This means less time fighting with data and more time using it to drive your business forward. With Databricks, what used to be a headache becomes a powerful tool for innovation.
