Azure Databricks-Architecture and its related components

Prashanth Kumar
18 min read · Apr 28, 2020

Azure Databricks-Architecture

Azure Databricks Unified Analytics Platform is the result of a joint product/engineering effort between Databricks and Microsoft. It’s available as a managed first-party service on Azure Public Cloud. Along with one-click setup (manual/automated), managed clusters (including Delta), and collaborative workspaces, the platform has native integration with other Azure first-party services, such as Azure Blob Storage, Azure Data Lake Store (Gen1/Gen2), Azure SQL Data Warehouse, Azure Cosmos DB, Azure Event Hubs, and Azure Data Factory, and the list keeps growing.

At a high level, the architecture consists of two main components:

  • The Control Plane — A management layer that resides in a Microsoft-managed Azure subscription and consists of services such as cluster manager, web application, jobs service, etc. Each service has its own mechanism to isolate the processing, metadata, and resources based on a workspace identifier, which is then used to execute every request.
  • The Data Plane — Consists of a locked virtual network (Azure VNET) that’s created in a customer-managed Azure subscription. All clusters are created in that VNET, and any data processing is done on data residing in customer-managed sources.

Bring Your Own VNET

Even though the default-deployment mode works for many, a number of enterprise customers want more control over the service network configuration to comply with internal cloud/data governance policies and/or adhere to external regulations, and/or do networking customizations, such as:

  • Connect Azure Databricks clusters to other Azure data services securely using Azure Service Endpoints
  • Connect Azure Databricks clusters to data sources deployed in private/co-located data centers (on-premises)
  • Restrict outbound traffic from Azure Databricks clusters to specific Azure data services and/or external endpoints only
  • Configure Azure Databricks clusters to use custom DNS
  • Configure a custom CIDR range for the Azure Databricks clusters
  • And more

Azure AD Integration with Databricks

Single Sign-On

Single sign-on (SSO) in the form of Azure Active Directory-backed login is available in Azure Databricks for all customers. See Azure Active Directory Seamless Single Sign-On.

Azure Databricks Cost Controls

This topic describes suggested best practices under different scenarios for Databricks cluster usage and allocation on Azure cloud infrastructure. The suggestions balance usability and cost management.

Terminology and Databricks features

o Instance hour

o On-demand and spot instances

o Autoscaling

o Start cluster

o Auto Termination

Instance hour

On most cloud providers, one instance running for one hour is one instance hour. Databricks charges for usage based on the Databricks Unit (DBU), a unit of processing capability per hour. One DBU is the processing capability of one hour of one r3.xlarge (memory optimized) or one c3.2xlarge (compute optimized) AWS instance. More details can be found at Databricks Pricing.

For example, if you create a Databricks cluster with one driver node and 3 worker nodes of type r3.xlarge and run the cluster for 2 hours, you compute the DBU as follows:

· Total instance hours = total number of nodes (1 + 3) * number of hours (2) = 8

· Total charge = AWS cost for 8 instance hours of r3.xlarge + 8 DBU cost

Databricks bills by the second, so fractional DBU costs are charged.
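The arithmetic above can be written out as a short Scala sketch. The object name, the function names, and both hourly rates are placeholders invented for illustration, not real prices. Only the 4 nodes for 2 hours figure and the "one r3.xlarge hour equals one DBU" definition come from the text.

```scala
// Minimal sketch of the instance-hour and DBU arithmetic described above.
// DbuCost and its rates are hypothetical; they are not real Databricks or AWS prices.
object DbuCost {
  // Instance hours = (driver + workers) * hours the cluster ran.
  def instanceHours(workers: Int, hours: Double, drivers: Int = 1): Double =
    (drivers + workers) * hours

  // Total charge = cloud provider cost for the instance hours + DBU cost.
  // dbuPerInstanceHour defaults to 1.0, matching the r3.xlarge definition above.
  def totalCharge(
      instanceHrs: Double,
      instanceRatePerHour: Double, // hypothetical cloud price per instance hour
      dbuRate: Double,             // hypothetical price per DBU
      dbuPerInstanceHour: Double = 1.0
  ): Double =
    instanceHrs * instanceRatePerHour + instanceHrs * dbuPerInstanceHour * dbuRate
}

// 1 driver + 3 workers for 2 hours gives 8 instance hours.
val instanceHrs = DbuCost.instanceHours(workers = 3, hours = 2)
// With the placeholder rates 0.25 and 0.50 this gives 8 * 0.25 + 8 * 0.50 = 6.0.
val charge = DbuCost.totalCharge(instanceHrs, instanceRatePerHour = 0.25, dbuRate = 0.50)
```

Because billing is per second, the same functions also work with a fractional number of hours.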

On-demand and spot instances

Amazon has two tiers of EC2 instances: on-demand and spot. For on-demand instances you pay for compute capacity by the second with no long-term commitments. Spot instances allow you to bid on spare Amazon EC2 computing capacity and choose the maximum price you are willing to pay. Spot pricing changes in real-time based on the supply and demand on AWS compute capacity. If the current spot market price is above the max bid price, the spot instances are terminated. Since spot instances are often available at a discount compared to on-demand pricing, for the same budget you can significantly reduce the cost of running your applications, grow your application’s compute capacity, and increase throughput.

Databricks supports creating clusters using a combination of on-demand and spot instances (with custom spot price) allowing you to tailor your cluster according to your use cases.

For example, the preceding configurations specify that the driver node and 4 worker nodes should be launched as on-demand instances and the remaining 4 workers should be launched as spot instances (bidding at 100% of the on-demand price).

We recommend launching the cluster so that the Spark driver is on an on-demand instance, which allows saving the state of the cluster even after losing spot instance nodes. If you choose to use all spot instances (including the driver), any cached data or table will be deleted when you lose the driver instance due to changes in the spot market.

Another important setting to note is the option Spot fall back to On-demand. If you are running a hybrid cluster (that is, a mix of on-demand and spot instances), and if spot instance acquisition fails or you lose the spot instances, Databricks falls back to using on-demand instances and provides you with the desired capacity. Without this option, you will lose the capacity provided by the spot instances for the cluster causing delay or failure of your workload. We recommend setting the mix of on-demand and spot instances in your cluster based on the criticality, tolerance to delays and failures due to loss of instances, and cost sensitivity for each type of use case.

Tip

You can use the Amazon Spot Bid Advisor to determine a suitable bid price for your instance type and region.

Autoscaling

Autoscaling automatically adds and removes worker nodes in response to changing workloads to optimize resource usage. With autoscaling enabled, Databricks automatically chooses the appropriate number of workers required to run your Spark job. Autoscaling makes it easier to achieve high cluster utilization because you do not need to provision the cluster to exactly match the workload. This can offer two advantages:

  1. The workloads can run faster compared to running a constant sized under-provisioned cluster.
  2. You can reduce overall costs compared to a statically sized cluster.

When you enable autoscaling, you can specify the minimum and maximum number of nodes in the cluster.

The preceding configuration enables the autoscaling feature with the cluster varying from 8–20 nodes, of which 5 (including the driver node) are on-demand and the rest spot instances.
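The node mix described above can be sketched in plain Scala. This is only an illustration of the arithmetic, not how Databricks implements autoscaling. NodeMix and ClusterSizing are hypothetical names, and the sketch assumes the 8 to 20 range counts every node, including the driver.

```scala
// Sketch of the node mix described above: the first N nodes (driver included)
// are on-demand, every node beyond that is a spot instance.
// NodeMix and ClusterSizing are hypothetical names used only for this illustration.
final case class NodeMix(onDemand: Int, spot: Int)

object ClusterSizing {
  // Clamp the requested size into the autoscaling range [minNodes, maxNodes],
  // then split it into on-demand and spot nodes.
  def nodeMix(requested: Int, minNodes: Int, maxNodes: Int, firstOnDemand: Int): NodeMix = {
    val total = math.max(minNodes, math.min(maxNodes, requested))
    val onDemand = math.min(firstOnDemand, total)
    NodeMix(onDemand, total - onDemand)
  }
}

// A light workload is held at the minimum of 8 nodes: 5 on-demand, 3 spot.
val smallest = ClusterSizing.nodeMix(requested = 3, minNodes = 8, maxNodes = 20, firstOnDemand = 5)
// A heavy workload is capped at the maximum of 20 nodes: 5 on-demand, 15 spot.
val largest = ClusterSizing.nodeMix(requested = 50, minNodes = 8, maxNodes = 20, firstOnDemand = 5)
```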

Start cluster

The Start cluster feature allows restarting previously terminated clusters while retaining their original configuration (cluster ID, number of instances, type of instances, spot versus on-demand mix, IAM role, libraries to be installed, and so on). You can restart a cluster:

· In the terminated cluster list

· In the cluster detail page

Auto Termination

Auto termination lets you define and use idle conditions (say idle for three hours) to terminate a cluster for cost savings.

Spark Cluster

Azure Databricks clusters provide a unified platform for various use cases such as running production ETL pipelines, streaming analytics, ad-hoc analytics, and machine learning.

Azure Databricks has two types of clusters: interactive and job. You use interactive clusters to analyze data collaboratively with interactive notebooks. You use job clusters to run fast and robust automated jobs.

With Azure Databricks, you can easily create and manage clusters using the UI, the CLI, and by invoking the Clusters API.

Cluster Node Types

A cluster consists of one driver node and worker nodes. You can pick separate cloud provider instance types for the driver and worker nodes, although by default the driver node uses the same instance type as the worker node. Different families of instance types fit different use cases, such as memory-intensive or compute-intensive workloads.

Worker node

Azure Databricks workers run the Spark executors and other services required for the proper functioning of the clusters. When you distribute your workload with Spark, all of the distributed processing happens on workers.

Tip

To run a Spark job, you need at least one worker. If a cluster has zero workers, you can run non-Spark commands on the driver, but Spark commands will fail.

Azure Databricks maps cluster node instance types to compute units known as DBUs. See the instance type pricing page for a list of the supported instance types and their corresponding DBUs. For instance provider information, see Azure instance type specifications and pricing.

Azure Databricks will always provide one year’s deprecation notice before ceasing support for an instance type.

Master Node:

The machine on which the Spark Standalone cluster manager runs is called the Master Node.

Executor node

Executors are worker nodes’ processes in charge of running individual tasks in a given Spark job. They are launched at the beginning of a Spark application and typically run for the entire lifetime of an application. Once they have run the task they send the results to the driver. They also provide in-memory storage for RDDs that are cached by user programs through Block Manager.

APPLICATION EXECUTION FLOW

With this in mind, when you submit an application to the cluster with spark-submit this is what happens internally:

1. A standalone application starts and instantiates a SparkContext instance (and it is only then when you can call the application a driver).

2. The driver program asks the cluster manager for resources to launch executors.

3. The cluster manager launches executors.

4. The driver process runs through the user application. Depending on the actions and transformations over RDDs, tasks are sent to executors.

5. Executors run the tasks and save the results.

6. If any worker crashes, its tasks will be sent to different executors to be processed again. The book “Learning Spark: Lightning-Fast Big Data Analysis” discusses Spark and fault tolerance in more detail.

Driver node

The driver maintains state information of all notebooks attached to the cluster. The driver node is also responsible for maintaining the SparkContext and interpreting all the commands you run from a notebook or a library on the cluster. The driver node also runs the Apache Spark master that coordinates with the Spark executors.

The default value of the driver node type is the same as the worker node type. You can choose a larger driver node type with more memory if you are planning to collect() a lot of data from Spark workers and analyze them in the notebook.

Tip

Since the driver node maintains all of the state information of the notebooks attached, make sure to detach unused notebooks from the driver.

Spark Context:


· SparkContext is the entry point of Spark functionality. The most important step of any Spark driver application is to create a SparkContext. It allows your Spark application to access the Spark cluster with the help of a resource manager, which can be one of three: Spark Standalone, YARN, or Apache Mesos.

· To create a SparkContext, you first create a SparkConf. The SparkConf holds the configuration parameters that the Spark driver application passes to SparkContext. Some of these parameters define properties of the Spark driver application, while others are used by Spark to allocate resources on the cluster, such as the number, memory size, and cores of the executors running on the worker nodes.
In short, it guides how to access the Spark cluster. After creating a SparkContext object, we can invoke functions such as textFile, sequenceFile, and parallelize. The master it connects to can be local, yarn-client, a Mesos URL, or a Spark URL.
Once the SparkContext is created, it can be used to create RDDs, broadcast variables, and accumulators, to access Spark services, and to run jobs. All of this can be done until the SparkContext is stopped.
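As a rough illustration of the SparkConf to SparkContext flow described above, here is a minimal word count sketch. It assumes Spark 2.x or later on the classpath and a local master, and it is meant to be run as a standalone application. In spark-shell or a Databricks notebook a context already exists as sc, so you would not create a second one. The object name WordCountSketch is made up for this example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object; assumes Spark is on the classpath.
object WordCountSketch {
  // Counts words in a small in-memory collection and returns word -> count.
  def run(lines: Seq[String]): Map[String, Int] = {
    // SparkConf carries the application name and master URL to SparkContext.
    val conf = new SparkConf().setAppName("WordCountSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      sc.parallelize(lines)          // create an RDD from a local collection
        .flatMap(_.split("\\s+"))    // split each line into words
        .filter(_.nonEmpty)
        .map(word => (word, 1))
        .reduceByKey(_ + _)          // sum the counts per word
        .collect()                   // action: bring the results back to the driver
        .toMap
    } finally {
      sc.stop() // release the context so another one can be created in this JVM
    }
  }
}
```

Calling WordCountSketch.run(Seq("a b a", "b c")) should return a map with a and b counted twice and c once.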

To Summarize:

A SparkContext is a conduit to access all Spark functionality; only a single SparkContext exists per JVM. The Spark driver program uses it to connect to and communicate with the cluster manager and to submit Spark jobs, and it knows which resource manager to communicate with (in a Spark cluster your resource manager can be YARN, Mesos, or Standalone). SparkContext allows you to set Spark configuration parameters. Through SparkContext, the driver can also access other contexts such as SQLContext, HiveContext, and StreamingContext to program Spark.

A Spark context comes with many useful methods for creating RDDs and loading data, and it is the main interface for accessing the Spark runtime.

Additional notes:

The Spark context is created either when you run the Spark shell, in which case the SparkContext is already preconfigured, or through the Spark API used by your Spark application.

Once the driver’s started, it configures an instance of SparkContext. When running a Spark REPL shell, the shell is the driver program. Your Spark context is already preconfigured and available as a sc variable. When running a standalone Spark application by submitting a jar file, or by using Spark API from another program, your Spark application starts and configures the Spark context.

There can be only one Spark context per JVM.

NOTE Although the configuration option spark.driver.allowMultipleContexts exists, it’s misleading because usage of multiple Spark contexts is discouraged. This option’s used only for Spark internal tests and we recommend you don’t use that option in your user programs. If you do, you may get unexpected results while running more than one Spark context in a single JVM.

Spark Session

Spark session is the unified entry point of a Spark application from Spark 2.0 onward. It provides a way to interact with Spark’s various functionality with fewer constructs. Instead of having a Spark context, Hive context, and SQL context, all of it is now encapsulated in a Spark session.

Some History….

Prior to Spark 2.0, SparkContext was the entry point of any Spark application. It was used to access all Spark features and needed a SparkConf, which held all the cluster configuration and parameters, to create a SparkContext object. We could primarily create just RDDs using SparkContext, and we had to create specific contexts for any other Spark interactions: SQLContext for SQL, HiveContext for Hive, and StreamingContext for streaming. In a nutshell, Spark session is a combination of all these different contexts. Internally, Spark session creates a new SparkContext for all the operations, and all the above-mentioned contexts can be accessed using the SparkSession object.

How do I create a Spark session?

A Spark Session can be created using a builder pattern.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SparkSessionExample")
  .master("local[4]")
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .enableHiveSupport()
  .getOrCreate()

The Spark session builder returns an existing Spark session if one has already been created; otherwise it creates a new one and assigns it as the global default. Note that enableHiveSupport here is similar to creating a HiveContext: all it does is enable access to the Hive metastore, Hive SerDes, and Hive UDFs.

Note that, we don’t have to create a spark session object when using spark-shell. It is already created for us with the variable spark.

scala> spark
res1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2bd158ea

We can access spark context and other contexts using the spark session object.

scala> spark.sparkContext
res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@6803b02d

scala> spark.sqlContext
res3: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@74037f9b

Accessing Spark’s configuration:

We can still access Spark’s configuration through the Spark session, in the same way as we did with SparkConf.
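A minimal sketch of reading and writing configuration through the session follows, assuming Spark 2.x or later in local mode. The object name ConfAccessSketch and the chosen setting value are illustrative only. In a notebook or spark-shell, getOrCreate() returns the session that already exists, so the application name may differ from the one set here.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example object; assumes Spark is on the classpath.
object ConfAccessSketch {
  // Returns (shuffle partitions setting, application name).
  def run(): (String, String) = {
    val spark = SparkSession.builder()
      .appName("ConfAccessSketch")
      .master("local[2]")
      .getOrCreate()

    // Runtime SQL options are read and written through spark.conf.
    spark.conf.set("spark.sql.shuffle.partitions", "8")
    val shufflePartitions = spark.conf.get("spark.sql.shuffle.partitions")

    // Settings fixed at startup remain visible through the underlying SparkConf.
    val appName = spark.sparkContext.getConf.get("spark.app.name")
    (shufflePartitions, appName)
  }
}
```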

Cluster deployment methods

For most people starting out they will be running in local mode with Spark running on their local machine. If you use Spark on the Azure Data Science VM this is what you will get.

Note here that the Driver, Worker, Executor, and Master run in different locations depending on the type of cluster deployment.

Anatomy of a Spark Application

A Spark application contains several components, all of which exist whether you are running Spark on a single machine or across a cluster of hundreds or thousands of nodes.

The components of a Spark application are the Driver, the Master, the Cluster Manager, and the Executors. I will explain the components in the following sections.

All of the Spark components, including the driver, master, and executor processes, run in Java virtual machines (JVMs). A JVM is a cross-platform runtime engine that can execute instructions compiled into Java bytecode. Scala, which Spark is written in, compiles into bytecode and runs on JVMs.

A Spark application usually comprises Spark operations, which can be either transformations or actions on your data sets using Spark’s RDD, DataFrame, or Dataset APIs.

For example, in your Spark app, if you invoke an action, such as collect() or take() on your DataFrame or Dataset, the action will create a job. A job will then be decomposed into single or multiple stages; stages are further divided into individual tasks; and tasks are units of execution that the Spark driver’s scheduler ships to Spark Executors on the Spark worker nodes to execute in your cluster. Often multiple tasks will run in parallel on the same executor, each processing its unit of partitioned dataset in its memory.

Graphic walkthrough:

Your Spark application calls the collect() action on your dataframe or dataset

This action causes Job #1 to be created

The spark driver breaks the job down into stages

Note that all stages run serially except stages 3 & 4 which can run in parallel

Each stage is broken down into tasks

Tasks are scheduled and distributed by the Spark Driver among the Spark Executors on the Spark Workers.

Workspaces

A Workspace is an environment for accessing all of your Azure Databricks assets. A Workspace organizes notebooks, libraries, dashboards, and experiments into folders and provides access to data objects and computational resources. By default, the Workspace and all of its contents are available to all users, but each user also has a private home folder that is not shared. You can control who can view, edit, and run objects in the Workspace by enabling Workspace access control.

You can create and manage the Workspace using the UI, the CLI, and by invoking the Workspace API. This topic focuses on performing Workspace tasks using the UI. For the other methods, see Databricks CLI and Workspace API.

· Folders

o Special folders

o Folder actions

· Objects

o Move an object

o Delete an object

o Restore an object

· Recently used objects

· Search the Workspace

Notebooks

A notebook is a web-based interface to a document that contains runnable code, visualizations, and narrative text. Notebooks are one interface for interacting with Azure Databricks.

· Managing Notebooks

o Create a notebook

o Delete a notebook

o Control access to a notebook

o Notebook external formats

o Notebooks and clusters

o Schedule a notebook

o Distributing notebooks

· Using Notebooks

o Develop notebooks

o Run notebooks

o Manage notebook state and results

o Version control

· Notebook Workflows

o API

o Example

o Pass structured data

o Handle errors

o Run multiple notebooks concurrently

· Dashboards

o Dashboards notebook

o Create a scheduled job to refresh a dashboard

o View a specific dashboard version

· Widgets

o Widget types

o Widget API

o Configure widget settings

o Widgets in dashboards

o Use widgets with %run

· Package Cells

o Package Cells in Notebooks

· Databricks Advisor

o View advice

o Advice settings

Package Cells

To use custom Scala classes and objects defined within notebooks reliably in Spark and across notebook sessions, you should define classes in package cells. A package cell is a cell that is compiled when it is run. A package cell has no visibility with respect to the rest of the notebook. You can think of it as a separate Scala file.

Jobs

A job is a way of running a notebook or JAR either immediately or on a scheduled basis. You can create and run jobs using the UI, the CLI, and by invoking the Jobs API. Similarly, you can monitor job run results in the UI, using the CLI, by querying the API, and through email alerts.

RDD - Resilient Distributed Datasets

RDD has been the primary user-facing API in Spark since its inception. At its core, an RDD is an immutable distributed collection of elements of your data, partitioned across nodes in your cluster, that can be operated on in parallel with a low-level API that offers transformations and actions.

5 Reasons to Use RDDs

  1. You want low-level transformations and actions and control over your dataset;
  2. Your data is unstructured, such as media streams or streams of text;
  3. You want to manipulate your data with functional programming constructs rather than domain-specific expressions;
  4. You don’t care about imposing a schema, such as columnar format, while processing or accessing data attributes by name or column; and
  5. You can forgo some optimization and performance benefits available with DataFrames and Datasets for structured and semi-structured data.

Transformations

Source Databricks

In Spark, the core data structures are immutable, meaning they cannot be changed once created. This might seem like a strange concept at first: if you cannot change it, how are you supposed to use it? In order to “change” a DataFrame, you instruct Spark how you would like to modify the DataFrame you have into the one that you want. These instructions are called transformations.

Transformations are the core of how you will be expressing your business logic using Spark. There are two types of transformations, those that specify narrow dependencies and those that specify wide dependencies.

Transformations consisting of narrow dependencies (we’ll call them narrow transformations) are those where each input partition will contribute to only one output partition.

A wide dependency (or wide transformation) has input partitions contributing to many output partitions. You will often hear this referred to as a shuffle, where Spark exchanges partitions across the cluster. With narrow transformations, Spark automatically performs an operation called pipelining: if we specify multiple filters on DataFrames, they are all performed in memory. The same cannot be said for shuffles. When we perform a shuffle, Spark writes the results to disk. You’ll see a lot of discussion of shuffle optimization across the web because it’s an important topic, but for now all you need to understand is that there are two kinds of transformations.
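One way to see the difference is to inspect the dependencies Spark records for each RDD. The sketch below uses the RDD API rather than DataFrames because RDDs expose their dependencies directly. It assumes Spark 2.x or later in local mode, and DependencySketch is a hypothetical name.

```scala
import org.apache.spark.{NarrowDependency, ShuffleDependency}
import org.apache.spark.sql.SparkSession

// Hypothetical example object; assumes Spark is on the classpath.
object DependencySketch {
  // Returns (filter is a narrow dependency, reduceByKey is a shuffle dependency).
  def run(): (Boolean, Boolean) = {
    val spark = SparkSession.builder()
      .appName("DependencySketch")
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext

    val numbers = sc.parallelize(1 to 100, numSlices = 4)

    // Narrow: each input partition feeds exactly one output partition.
    val evens = numbers.filter(_ % 2 == 0)

    // Wide: rows with the same key must be brought together, which needs a shuffle.
    val countsByLastDigit = evens.map(n => (n % 10, 1)).reduceByKey(_ + _)

    val filterIsNarrow =
      evens.dependencies.head.isInstanceOf[NarrowDependency[_]]
    val reduceIsShuffle =
      countsByLastDigit.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]
    (filterIsNarrow, reduceIsShuffle)
  }
}
```

Nothing is computed here: both filter and reduceByKey are transformations, so the sketch only examines the lineage Spark has recorded.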

Actions

Actions compute a result based on an RDD.

The result can be returned to the driver program or written to a file.

The RDD is recomputed each time an action is invoked, unless it has been cached or persisted.
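The recomputation behaviour can be observed with an accumulator that counts how often a map function runs. This is a sketch under stated assumptions: Spark 2.x or later, local mode, no task retries, and enough memory for the cached partitions. Accumulator updates inside transformations are not guaranteed to be applied exactly once in general, and RecomputeSketch is a hypothetical name.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example object; assumes Spark is on the classpath.
object RecomputeSketch {
  // Returns how many times the map function ran across two count() actions.
  def mapCalls(cache: Boolean): Long = {
    val spark = SparkSession.builder()
      .appName("RecomputeSketch")
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext
    val calls = sc.longAccumulator("mapCalls")

    // The accumulator is bumped once per element each time the map is evaluated.
    val mapped = sc.parallelize(1 to 10, 2).map { n => calls.add(1); n * 2 }
    val rdd = if (cache) mapped.cache() else mapped

    rdd.count() // first action: runs the map over all 10 elements
    rdd.count() // second action: reruns the map unless the RDD is cached
    calls.value
  }
}
```

Under those assumptions, mapCalls(cache = false) reports 20 map calls for two actions over 10 elements, while mapCalls(cache = true) reports 10, because the second action reads the cached partitions.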

DataFrames

Source Databricks

A DataFrame is the most common Structured API and simply represents a table of data with rows and columns. The list of columns and the types in those columns is called the schema. A simple analogy would be a spreadsheet with named columns. The fundamental difference is that while a spreadsheet sits on one computer in one specific location, a Spark DataFrame can span thousands of computers. The reason for putting the data on more than one computer should be intuitive: either the data is too large to fit on one machine or it would simply take too long to perform that computation on one machine.

The DataFrame concept is not unique to Spark. R and Python both have similar concepts. However, Python/R DataFrames (with some exceptions) exist on one machine rather than multiple machines. This limits what you can do with a given DataFrame in Python and R to the resources that exist on that specific machine. However, since Spark has language interfaces for both Python and R, it’s quite easy to convert Pandas (Python) DataFrames to Spark DataFrames and R DataFrames to Spark DataFrames (in R).

Datasets

Source Databricks

Datasets are a type-safe version of Spark’s structured API for Java and Scala. This API is not available in Python and R, because those are dynamically typed languages, but it is a powerful tool for writing large applications in Scala and Java.

Recall that DataFrames are a distributed collection of objects of type Row, which can hold various types of tabular data. The Dataset API allows users to assign a Java class to the records inside a DataFrame, and manipulate it as a collection of typed objects, similar to a Java ArrayList or Scala Seq. The APIs available on Datasets are type-safe, meaning that you cannot accidentally view the objects in a Dataset as being of another class than the class you put in initially. This makes Datasets especially attractive for writing large applications where multiple software engineers must interact through well-defined interfaces.

The Dataset class is parametrized with the type of object contained inside: Dataset<T> in Java and Dataset[T] in Scala. As of Spark 2.0, the types T supported are all classes following the JavaBean pattern in Java, and case classes in Scala. These types are restricted because Spark needs to be able to automatically analyze the type T and create an appropriate schema for the tabular data inside your Dataset.

To summarize:

Starting in Spark 2.0, the DataFrame API is merged with the Dataset API, unifying data processing capabilities across all libraries. Because of this unification, developers now have fewer concepts to learn or remember, and work with a single high-level and type-safe API called Dataset. Conceptually, the Spark DataFrame is an alias for Dataset[Row], a collection of generic objects, where a Row is a generic untyped JVM object. Dataset, by contrast, is a collection of strongly-typed JVM objects, dictated by a case class you define in Scala or a class in Java.
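The relationship between the two APIs can be sketched as follows, assuming Spark 2.x or later in local mode. Person and DatasetSketch are hypothetical names introduced for this example, as is the sample data.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Case class that defines the schema of the typed Dataset (hypothetical example type).
final case class Person(name: String, age: Long)

// Hypothetical example object; assumes Spark is on the classpath.
object DatasetSketch {
  // Returns the names of the adults in a tiny sample data set.
  def run(): Seq[String] = {
    val spark = SparkSession.builder()
      .appName("DatasetSketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._ // brings encoders and toDF into scope

    // Untyped: a DataFrame is a Dataset[Row]; columns are referenced by name.
    val df: DataFrame = Seq(("Ann", 34L), ("Bob", 17L)).toDF("name", "age")

    // Typed: the same rows viewed as Person objects, checked at compile time.
    val people: Dataset[Person] = df.as[Person]

    people.filter(_.age >= 18).map(_.name).collect().toSeq
  }
}
```

With the typed view, a misspelled field such as _.agee fails at compile time, whereas a misspelled column name on the DataFrame only fails when the query is analyzed at runtime.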

Catalyst Optimizer

At the core of Spark SQL is the Catalyst optimizer, which leverages advanced programming language features (e.g. Scala’s pattern matching and quasiquotes) in a novel way to build an extensible query optimizer.

Catalyst is based on functional programming constructs in Scala and designed with these two key purposes:

  • Easily add new optimization techniques and features to Spark SQL
  • Enable external developers to extend the optimizer (e.g. adding data source specific rules, support for new data types, etc.)

Catalyst contains a general library for representing trees and applying rules to manipulate them. On top of this framework, it has libraries specific to relational query processing (e.g., expressions, logical query plans), and several sets of rules that handle different phases of query execution: analysis, logical optimization, physical planning, and code generation to compile parts of queries to Java bytecode. For the latter, it uses another Scala feature, quasiquotes, that makes it easy to generate code at runtime from composable expressions. Catalyst also offers several public extension points, including external data sources and user-defined types. As well, Catalyst supports both rule-based and cost-based optimization.
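The tree-plus-rules idea can be illustrated with a toy example in plain Scala. The types below are invented for this sketch and are not Spark's own classes. They only show how pattern matching expresses a rewrite rule, here constant folding, over an expression tree.

```scala
// Toy expression tree and one rewrite rule, in the spirit of Catalyst's
// tree-plus-rules design. These types are illustrative, not Spark's classes.
sealed trait Expr
final case class Literal(value: Int) extends Expr
final case class Attribute(name: String) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

object ConstantFolding {
  // Apply the rule bottom-up: rewrite the children first, then the node itself.
  def fold(expr: Expr): Expr = expr match {
    case Add(l, r) =>
      (fold(l), fold(r)) match {
        case (Literal(a), Literal(b)) => Literal(a + b) // both sides are constants
        case (fl, fr)                 => Add(fl, fr)    // keep the node, with folded children
      }
    case other => other // literals and attributes are left unchanged
  }
}

// x + (1 + 2) is rewritten to x + 3.
val folded = ConstantFolding.fold(Add(Attribute("x"), Add(Literal(1), Literal(2))))
```

To see the plans produced by the real optimizer for a DataFrame, you can call explain(true) on it.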

Spark Packages

http://spark-packages.org is a community package index to track the growing number of open source packages and libraries that work with Apache Spark. Spark Packages makes it easy for users to find, discuss, rate, and install packages for any version of Spark and makes it easy for developers to contribute packages.

Spark Packages features integrations with various data sources, management tools, higher-level domain-specific libraries, machine learning algorithms, code samples, and other Spark content. Example packages include Spark-CSV (which is now included in Spark 2.0) and Spark ML integration packages including GraphFrames and TensorFrames.

Databricks Runtime for Machine Learning

Azure Databricks provides Databricks Runtime for Machine Learning (Databricks Runtime ML), a machine learning runtime that contains multiple popular libraries, including TensorFlow, PyTorch, Keras, and XGBoost. It also supports distributed training using Horovod. Databricks Runtime ML provides a ready-to-go environment for machine learning and data science, freeing you from having to install and configure these libraries on your cluster.

· MLlib: Machine learning library consisting of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, and dimensionality reduction, as well as underlying optimization primitives.

· GraphX: Graphs and graph computation for a broad scope of use cases from cognitive analytics to data exploration.


Written by Prashanth Kumar

IT professional with 20+ years experience, feel free to contact me at: Prashanth.kumar.ms@outlook.com
