WrapperTask and ConcatenationTask
Triggering multiple Task instances
Luigi and luisy use specific parameters to build unique pipelines: the same pipeline can be executed multiple times with different parameters, producing different outputs. Somewhat unusually, the parameters of a task are declared in the class scope.
The following pipeline demonstrates the usage of parameters in luisy. Unlike luigi, luisy only uses the decorators inherits() and requires() to pass parameters between tasks. This keeps the code clean and avoids a lot of boilerplate code.
As always, we start with a luisy.tasks.base.ExternalTask. This time we assume we have exported some .parquet files to our <working_dir>/raw directory. The files are stored in separate folders, where each folder contains the data of one month in parquet format.
/
└── raw
    ├── Partition=2021-01
    │   ├── part-0001.parquet
    │   ├── part-0002.parquet
    │   └── part-0003.parquet
    ├── Partition=2021-02
    │   └── ...
    ├── ...
    ├── Partition=2021-09
    │   └── ...
    └── Partition=2021-10
        └── ...
This directory structure can be handled with the following ExternalTask. Note that we use the parquetdir_output() decorator to tell luisy that we want to parse multiple .parquet files. The only code we need to write is the dynamic folder name Partition=<month>, which takes the month parameter that is passed to the task. On execution, luisy then checks whether the directory of the requested month exists:
import luisy


@luisy.raw
@luisy.parquetdir_output
class ExportDirectory(luisy.ExternalTask):
    """
    Input directory with (multiple) parquet files
    """
    month = luisy.MonthParameter()

    def get_folder_name(self):
        # get_month_str (defined elsewhere in the project) is assumed to
        # render the month as e.g. '2021-01'
        return f"Partition={get_month_str(self.month)}"
With the ExternalTask set up, we can now use a Task to read these parquet files and store them as a DataFrame in a pickle file for further processing. We keep working on a monthly basis so that we can make use of luigi's parallelization. This can be useful when you want to do some computations on the data, like dropping NaN values. The parameter month does not need to be defined again in this task, because it is derived from ExportDirectory via the requires() decorator.
The data we've read is now saved in the ‘<working_dir>/<project_name>/interim’ directory, because it is no longer raw data. The filename of our Task is automatically generated by auto_filename(). This decorator is applied by default to every luisy.tasks.base.Task, resulting in the filename ‘MergeExport_month=<month>.<output_type>’. So if this task is called multiple times with different values for the parameter month, we get distinct filenames. The output_type is defined by an output decorator; in our case the file is saved as a pickle file (‘.pkl’), so for March 2021 the output would be ‘MergeExport_month=2021-03.pkl’. See more about output decorators at Decorators.
import logging

logger = logging.getLogger(__name__)


@luisy.interim
@luisy.requires(ExportDirectory)
class MergeExport(luisy.Task):
    """
    Merges all input parquet files from the same month into one pickle file
    """
    def run(self):
        logger.info(f'Start merge of month "{get_month_str(self.month)}"')
        df = self.input().read()
        logger.info(f'Export to "{self.output().path}"')
        self.write(df)
With this task implemented, we can now perform any further processing on the data simply by requiring the task above. The following task, for example, removes NaN values and selects some columns of the DataFrame. The columns are defined as a ListParameter and can therefore be passed like any other luisy parameter.
@luisy.requires(MergeExport)
@luisy.interim
class SomeDataSelection(luisy.Task):
    column_names = luisy.ListParameter(default=None)

    def run(self):
        df = self.input().read()
        df = df.dropna()
        if self.column_names is not None:
            # ListParameter values arrive as a tuple, so convert to a list
            # before selecting the columns
            df = df[list(self.column_names)]
        self.write(df)
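If needed, this part of the pipeline can already be triggered for a single month from the command line, following the same CLI pattern as the examples at the end of this page (project and module names are placeholders):
luisy [project_name].[module] SomeDataSelection --month='2021-03' --column-names='["ProcessingDate", "ID"]'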
Concatenation Task
To finish up the pipeline above, we finally want to merge all data into one file. We do this by applying a ConcatenationTask, which requires SomeDataSelection tasks for multiple months. A ConcatenationTask needs its required tasks to have a DataFrame saved in their output, because the data is merged with concat().
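Conceptually, the concatenation step boils down to reading every required output and stacking the resulting DataFrames. The following minimal sketch only illustrates this idea and is not luisy's actual implementation:
import pandas as pd

def concatenate_requirements(task):
    # Sketch: read each required task's DataFrame and stack them into one
    dfs = [target.read() for target in task.input()]
    return pd.concat(dfs)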
from datetime import datetime

import luigi
from dateutil import rrule


@luisy.inherits(SomeDataSelection)
@luisy.final
@luisy.auto_filename(excl_none=True)
class MonthConcatenator(luisy.ConcatenationTask):
    month_start = luigi.MonthParameter(default=luigi.date_interval.Month(2021, 1))
    month_stop = luigi.MonthParameter(default=luigi.date_interval.Month(2021, 10))

    def month_iterator(self, date_start, date_stop):
        """
        Iterates through the given months
        """
        for dt in rrule.rrule(rrule.MONTHLY, dtstart=date_start, until=date_stop):
            yield datetime(dt.year, dt.month, 1)

    def requires(self):
        return [
            self.clone(
                SomeDataSelection,
                month=month,
            ) for month in self.month_iterator(self.month_start, self.month_stop)
        ]
WrapperTask
As an alternative to the ConcatenationTask, we can also keep the data separate without concatenating it. This often makes sense when you are working with a huge amount of data that does not fit into local memory. In that case we don't want to use a ConcatenationTask or anything else that merges our data into one huge file.
For this use case we can use a WrapperTask instead as a trigger for our pipeline. WrapperTasks don't implement a run() method and only use requires(), so they have no output either. The advantage of using WrapperTasks is to have a trigger for dynamic dependencies like the month parameter above:
@luisy.inherits(SomeDataSelection)
@luisy.final
class MonthWrapper(luisy.WrapperTask):
    month_start = luigi.MonthParameter(default=luigi.date_interval.Month(2021, 1))
    month_stop = luigi.MonthParameter(default=luigi.date_interval.Month(2021, 10))

    def month_iterator(self, date_start, date_stop):
        """
        Iterates through the given months
        """
        for dt in rrule.rrule(rrule.MONTHLY, dtstart=date_start, until=date_stop):
            yield datetime(dt.year, dt.month, 1)

    def requires(self):
        return [
            self.clone(
                SomeDataSelection,
                month=month,
            ) for month in self.month_iterator(self.month_start, self.month_stop)
        ]
Dynamic Dependencies
Due to these dynamic dependencies in the tasks above, we need to write the luisy.Task.requires() method ourselves. To achieve this, we define two new parameters in our final task, month_start and month_stop. Using these parameters we can execute our pipeline for different timespans.
Using the inherits decorator, we clone all parameters (column_names and month) from the parent task SomeDataSelection. Usually we don't want the WrapperTask and ConcatenationTask to inherit dynamic parameters (like month), because that information is already contained in month_start and month_stop. This could be avoided by not using the clone() function and calling the task's constructor directly, but that would require us to pass all other parameters into the constructor manually. In our case we want to use clone() to inherit the parameter column_names. Hence, for the ConcatenationTask, we tell the auto_filename() decorator to exclude parameters with the value None from the filename, which then creates the filename ‘MonthConcatenator_month-start=2021-01_month_stop=2021-10_column_names=A-B-C’.
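To make the difference concrete, here is a small sketch of both options as they could appear inside requires() of MonthConcatenator (the constructor variant is only shown for comparison):
# clone() copies all parameters shared with SomeDataSelection
# (here: column_names) and only overrides month
self.clone(SomeDataSelection, month=month)

# Calling the constructor directly would force us to pass every
# parameter by hand, including column_names
SomeDataSelection(month=month, column_names=self.column_names)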
Triggering different executions
Taking the code above, we are now able to run the pipeline with different parameters to get different results. Having all observations from 2021-01 to 2021-10 of the columns ‘ProcessingDate’ and ‘ID’ in one file would be triggered by:
luisy [project_name].[module] MonthConcatenator --month-start='2021-01' --month-stop='2021-10'
--column-names='["ProcessingDate", "ID"]'
By changing the parameters after the first execution, we can adjust our output:
luisy [project_name].[module] MonthConcatenator --month-start='2021-01' --month-stop='2021-05'
--column-names='["ProcessingDate", "ID", "ProcessResult"]'
This will also include the ProcessResult column in our final DataFrame and only consider data between January and May.
If we don't want the result to be merged into one file, we use the WrapperTask defined above:
luisy [project_name].[module] MonthWrapper --month-start='2021-01' --month-stop='2021-10'
--column-names='["ProcessingDate", "ID"]'