Interaction with PySpark
Note
This is an experimental feature. Expect sharp edges and bugs.
Overview
In this tutorial, we show how tasks can be executed on a spark cluster. The key difference between a Task and a SparkTask is that the objects handling the data are not pandas.DataFrame but pyspark.sql.DataFrame objects. Generally, a SparkTask creates a pyspark.sql.DataFrame from the output files of its inputs, and when saving to a luisy.targets.CloudTarget, the respective spark method is used. Here, the user has to make sure that the spark cluster has the required permissions to read from and write to the respective locations. Whenever a SparkTask writes to or reads from a luisy.targets.LocalTarget, a serialization into a single pandas.DataFrame takes place, and the user has to make sure that the data fits into memory.
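To illustrate the memory concern: persisting to a local target means that the distributed data is collected on the driver, roughly as in the following plain pyspark sketch (illustrative only, not luisy internals):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'value'])

# Collecting a distributed DataFrame into pandas: the full result must fit
# into the driver's memory
pdf = sdf.toPandas()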
Example pipeline
This is how a spark pipeline may look:
import luisy
from luisy.tasks.base import SparkTask  # assumption: adjust if your luisy version exposes SparkTask elsewhere


@luisy.deltatable_input(schema='my_schema', catalog='my_catalog', table_name='raw')
@luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='interim')
class TaskA(SparkTask):
    def run(self):
        df = self.input().read()
        df = df.drop('c')
        self.write(df)


@luisy.requires(TaskA)
@luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='final')
class TaskB(SparkTask):
    def run(self):
        df = self.input().read()
        df = df.withColumn('f', 2 * df.a)
        self.write(df)


@luisy.requires(TaskB)
@luisy.final
@luisy.pickle_output
class TaskC(SparkTask):
    def run(self):
        df = self.input().read()
        self.write(df)
Here, TaskA and TaskB read and write their data from and to delta tables and process them with spark. TaskC, however, persists its output into a pickle file, which requires luisy to serialize all the data to a pandas.DataFrame beforehand.
Note
Make sure that the delta-spark extension is installed in your spark cluster; see the delta-spark documentation for details.
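If you manage the spark session yourself rather than using a managed cluster, enabling the Delta extension follows the delta-spark quickstart; a minimal sketch (application name is illustrative):
import pyspark
from delta import configure_spark_with_delta_pip

builder = (
    pyspark.sql.SparkSession.builder.appName('luisy-delta')
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
)
# configure_spark_with_delta_pip adds the delta-spark jars to the session
spark = configure_spark_with_delta_pip(builder).getOrCreate()
The resulting session can then be attached to luisy as described in the next section.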
Running a pipeline
When the pipeline should be executed within an active python session, it can be run as follows:
from luisy.cli import build
build(TaskC(), cloud_mode=True)
In this case, the pyspark.SparkContext() is automatically propagated to luisy from the active session. Alternatively, if a special spark context has to be used, it needs to be attached to the Config first as follows:
Config().set_param('spark', some_predefined_spark_context)
For instance, this could be a spark session created via spark connect:
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://my-cluster:15002").getOrCreate()
Config().set_param('spark', spark)
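With the session attached, the build call from above stays the same:
from luisy.cli import build

build(TaskC(), cloud_mode=True)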
Be aware that all LocalTarget objects point to locations on the system of the python session in which luisy runs.
Using databricks
A convenient way to interact with pyspark clusters is through the databricks abstraction using a databricks notebook. It is also possible to connect from a local session using databricks-connect (see Trigger from remote).
Note
When using luisy in a databricks cluster, additional charges are generated for the user. The amount of expenses depends, among other things, on the cloud provider and the cluster configuration. luisy has no influence on the generated costs and we recommend monitoring cloud costs closely.
Note
The tasks themselves cannot be implemented within the notebook and need to be implemented in a standalone python package or module. Only the execution can be done via a databricks notebook.
Initial configuration
When using luisy within a databricks cluster, the databricks file system (dbfs) can be used as the local file system, allowing the pipeline to run completely in the cloud, even for non-SparkTask objects.
working_dir = "/dbfs/FileStore/my_working_dir"
Config().set_param("working_dir", working_dir)
A given pipeline can be executed as follows:
build(SomeTask(), cloud_mode=True)
Here, all SparkTask objects use the pyspark cluster of the databricks instance.
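Putting these pieces together, a databricks notebook cell that triggers a pipeline may look as follows, where my_pipeline is a placeholder for the package in which the tasks are implemented:
# The tasks themselves live in an installed python package, not in the notebook
from luisy.cli import build
from my_pipeline.tasks import SomeTask  # placeholder import

Config().set_param('working_dir', '/dbfs/FileStore/my_working_dir')
build(SomeTask(), cloud_mode=True)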
Trigger from remote
Using databricks-connect, cloud pipelines can be triggered from python sessions outside of databricks. A local proxy for the remote spark session on databricks is created in the local python session. First, databricks-connect needs to be installed:
pip install databricks-connect
Make sure that the version of databricks-connect is compatible with the spark version in the databricks cluster.
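For example, for a cluster running Databricks Runtime 14.3 one would pin a matching client version (the version shown is purely illustrative):
pip install "databricks-connect==14.3.*"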
To trigger the cloud pipelines from the local session, the following parameters need to be set:
from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.remote(
    host="https://adb-<...>.azuredatabricks.net",
    token="<your secret token>",
    cluster_id="<cluster id>",
).getOrCreate()
Config().set_param('spark', spark)
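Before triggering a pipeline, the remote session can be checked with a simple query (illustrative smoke test):
# Runs on the remote databricks cluster and prints five rows locally
spark.range(5).show()
Afterwards, pipelines are built exactly as before, e.g. with build(SomeTask(), cloud_mode=True).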
Note
Unity Catalog needs to be enabled in your databricks instance.