Interaction with PySpark

Note

This is an experimental feature. Expect sharp edges and bugs.

Overview

In this tutorial, we show how tasks can be executed on a Spark cluster. The key difference between a Task and a SparkTask is that the objects handling the data are not pandas.DataFrame but pyspark.sql.DataFrame objects.

Generally, a SparkTask creates a pyspark.sql.DataFrame from the output files of its input, and when saving to a luisy.targets.CloudTarget, the respective Spark method is used. Here, the user has to make sure that the Spark cluster has the required permissions to read from and write to the respective locations. Whenever a SparkTask writes to or reads from a luisy.targets.LocalTarget, a serialization into a single pandas.DataFrame takes place and the user has to make sure that the data fits into memory.
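
The following sketch illustrates this distinction with plain pyspark; the SparkSession built here and the toPandas() call only mimic what luisy does conceptually and are not part of its API:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Inside a SparkTask, self.input().read() yields a pyspark.sql.DataFrame,
# illustrated here with a hand-made frame instead of a luisy target.
spark_df = spark.createDataFrame([(1, 2.0), (2, 4.0)], schema=['a', 'b'])

# Writing to a CloudTarget keeps the data distributed on the cluster, while
# writing to a LocalTarget collects everything on the driver, conceptually
# similar to:
pandas_df = spark_df.toPandas()  # the full dataset has to fit into memory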

Example pipeline

This is how a Spark pipeline may look:

import luisy

# SparkTask is assumed here to live in luisy.tasks.base; adjust the import
# path to your luisy version if needed.
from luisy.tasks.base import SparkTask

@luisy.deltatable_input(schema='my_schema', catalog='my_catalog', table_name='raw')
@luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='interim')
class TaskA(SparkTask):

    def run(self):
        df = self.input().read()
        df = df.drop('c')
        self.write(df)


@luisy.requires(TaskA)
@luisy.deltatable_output(schema='my_schema', catalog='my_catalog', table_name='final')
class TaskB(SparkTask):

    def run(self):
        df = self.input().read()
        df = df.withColumn('f', 2*df.a)
        self.write(df)

@luisy.requires(TaskB)
@luisy.final
@luisy.pickle_output
class TaskC(SparkTask):
    def run(self):
        # Reading yields a pyspark.sql.DataFrame; writing the pickle output
        # serializes it into a single pandas.DataFrame on the driver.
        df = self.input().read()
        self.write(df)

Here, TaskA and TaskB read their data from and write it to Delta tables and process it with Spark. TaskC, however, persists its output into a pickle file, which requires luisy to serialize all the data into a pandas.DataFrame beforehand.

Note

Make sure that the delta-spark extension is installed on your Spark cluster. See the delta-spark documentation for details.
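
On databricks clusters, Delta Lake is usually preinstalled. For a locally created session, a setup along the lines of the Delta Lake quickstart can serve as a sketch; the application name and the session built here are only illustrative:

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Enable the Delta Lake extension on a locally built SparkSession.
builder = (
    SparkSession.builder.appName('luisy-local')
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config(
        'spark.sql.catalog.spark_catalog',
        'org.apache.spark.sql.delta.catalog.DeltaCatalog',
    )
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()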

Running a pipeline

To execute the pipeline within an active Python session, run the following:

from luisy.cli import build

build(TaskC(), cloud_mode=True)

In this case, the pyspark.SparkContext() is automatically propagated to luisy from the active session. Alternatively, if a specific Spark context has to be used, it needs to be attached to the Config first as follows:

Config().set_param('spark', some_predefined_spark_context)

For instance, this could be a Spark session created via Spark Connect:

from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://my-cluster:15002").getOrCreate()
Config().set_param('spark', spark)

Be aware that all LocalTarget objects point to locations on the file system of the Python session in which luisy runs.

Using databricks

A convenient way to interact with pyspark clusters is to use the databricks abstraction through a databricks notebook. It is also possible to connect from a local session using databricks-connect (see Trigger from remote).

Note

When using luisy in a databricks cluster, additional charges are incurred by the user. The amount depends, among other things, on the cloud provider and the cluster configuration. luisy has no influence on the generated costs and we recommend monitoring cloud costs closely.

Note

The tasks themselves cannot be implemented within the notebook and need to be implemented in a standalone Python package or module. Only the execution can be done via a databricks notebook.
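
A minimal sketch of such a notebook cell could look as follows; the package name my_pipeline is hypothetical and stands for your own package installed on the cluster:

# Databricks notebook cell (sketch): the tasks live in an installed package.
from luisy.cli import build
from my_pipeline.tasks import TaskC  # hypothetical module containing the tasks

build(TaskC(), cloud_mode=True)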

Initial configuration

When using luisy within a databricks cluster, the databricks file system (DBFS) can be used as the local file system, allowing the pipeline to run completely in the cloud, even for tasks that are not SparkTask objects.

working_dir = "/dbfs/FileStore/my_working_dir"
Config().set_param("working_dir", working_dir)

A given pipeline can be executed as follows:

build(SomeTask(), cloud_mode=True)

Here, all SparkTask objects use the pyspark cluster of the databricks instance.

Trigger from remote

Using databricks-connect, cloud pipelines can be triggered from Python sessions outside of databricks. There, a local proxy for the remote Spark session of databricks is created in the local Python session. First, databricks-connect needs to be installed:

pip install databricks-connect

Make sure that the version of databricks-connect is compatible with the Spark version of the databricks cluster.

To trigger the cloud pipelines from a local session, the following connection parameters need to be set:

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.remote(
    host="https://adb-<...>.azuredatabricks.net",
    token="<your secret token>",
    cluster_id="<cluster id>",
).getOrCreate()

Config().set_param('spark', spark)
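
As a quick sanity check, the locally installed pyspark version can be compared with the Spark version reported by the remote session; this sketch is optional and not required by luisy:

import pyspark

# Compare the local client version with the Spark version of the remote cluster.
print('local pyspark:', pyspark.__version__)
print('remote spark: ', spark.version)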

Note

Unity Catalog needs to be enabled in your databricks instance.