WrapperTask and ConcatenationTask

Triggering multiple Task instances

Luigi and luisy use task-specific parameters to build unique pipelines. The same pipeline can be executed multiple times with different parameters, each run producing a different output. Unlike ordinary constructor arguments, task parameters are declared in the class scope.
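For illustration, here is a minimal sketch of a task with a class-scope parameter (the class name is made up):

import luisy

class MyMonthlyTask(luisy.Task):
    # Parameters are declared as class attributes, not constructor arguments:
    # each distinct value of `month` yields its own task instance and output.
    month = luisy.MonthParameter()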

The upcoming pipeline demonstrates the usage of parameters in luisy. Unlike luigi, luisy only uses the decorators inherits() and requires() to pass parameters between tasks. This keeps the code clean and prevents a lot of boilerplate code.

As always, we start with a luisy.tasks.base.ExternalTask. This time we assume we have exported some .parquet files to our <working_dir>/raw directory. The files are stored in separate folders, where each folder contains one month of data in parquet format.

/
└── <working_dir>
    └── raw
        ├── Partition=2021-01
        │   ├── part-0001.parquet
        │   ├── part-0002.parquet
        │   └── part-0003.parquet
        ├── Partition=2021-02
        │   └── ...
        ├── ...
        ├── Partition=2021-09
        │   └── ...
        └── Partition=2021-10
            └── ...

This directory structure can be handled with the following ExternalTask. Note that we use the parquetdir_output() decorator to tell luisy that we want to parse multiple .parquet files. The only code we need to write generates the dynamic folder name Partition=<month> from the month parameter that is passed to the task. On execution, luisy will then check whether the directory of the requested month exists:

import luisy

@luisy.raw
@luisy.parquetdir_output
class ExportDirectory(luisy.ExternalTask):
    """
    Input directory with (multiple) parquet files
    """
    month = luisy.MonthParameter()

    def get_folder_name(self):
        # get_month_str is assumed to be a helper that renders the month as 'YYYY-MM'
        return f"Partition={get_month_str(self.month)}"
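To illustrate the parameter, instantiating the task for a concrete month points its output at the matching partition folder. A hedged sketch (the exact path depends on your luisy configuration):

import datetime

task = ExportDirectory(month=datetime.date(2021, 3, 1))
print(task.output().path)  # expected to end in .../raw/Partition=2021-03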

With the ExternalTask set up, we can now use a Task to read these parquet files and store them as a DataFrame in a pickle for further processing. We will still be working on a monthly basis so that we can make use of luigi's parallelization. This is useful when you are going to do some computations on the data, like dropping NaN values. The parameter month does not need to be defined again in this task, because it is derived from ExportDirectory through the requires() decorator.

The data we have read is now saved in the '<working_dir>/<project_name>/interim' directory, because it is basically no raw data anymore. The filename of our Task is automatically generated by auto_filename(). This decorator is applied by default to every luisy.tasks.base.Task, resulting in the filename 'MergeExport_month=<month>.<output_type>'. So if this task is called multiple times with different values for the parameter month, we get distinct filenames. The output_type is defined by an output decorator; in our case the file will be saved as a pickle file ('.pkl'). See more about output decorators at Decorators.

import logging

logger = logging.getLogger(__name__)

@luisy.interim
@luisy.requires(ExportDirectory)
class MergeExport(luisy.Task):
    """
    Merges all input parquet files from the same month into one pickle file
    """

    def run(self):
        logger.info(f'Start merge of month "{get_month_str(self.month)}"')
        df = self.input().read()
        logger.info(f'Export to "{self.output().path}"')
        self.write(df)
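As a quick sanity check, the auto-generated filename can be inspected on an instantiated task. A sketch, assuming the month is rendered as '2021-01':

import datetime

task = MergeExport(month=datetime.date(2021, 1, 1))
print(task.output().path)
# something like: <working_dir>/<project_name>/interim/MergeExport_month=2021-01.pkl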

With this task implemented, we can now do anything we want with the data by simply requiring the task above. The upcoming task, for example, removes NaN values and selects some columns of the DataFrame. The columns are defined as a ListParameter and can therefore be set like any other luisy parameter.

@luisy.requires(MergeExport)
@luisy.interim
class SomeDataSelection(luisy.Task):
    column_names = luisy.ListParameter(default=None)

    def run(self):
        df = self.input().read()
        df = df.dropna()
        if self.column_names is not None:
            # ListParameter values arrive as tuples; convert for column selection
            df = df[list(self.column_names)]
        self.write(df)
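Since luisy tasks are built on luigi, a single month can also be triggered programmatically. A sketch using luigi's local scheduler, assuming luisy tasks can be scheduled like plain luigi tasks (the column names are just examples):

import datetime

import luigi

luigi.build(
    [SomeDataSelection(
        month=datetime.date(2021, 3, 1),
        column_names=['ProcessingDate', 'ID'],
    )],
    local_scheduler=True,
)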

ConcatenationTask

To finish up the pipeline above, we finally want to merge all data into one file. We do this by applying a ConcatenationTask, which requires SomeDataSelection tasks for multiple months. A ConcatenationTask needs its required tasks to have a DataFrame saved in their output, because the data is merged with concat().

from datetime import date

import luigi
from dateutil import rrule

@luisy.inherits(SomeDataSelection)
@luisy.final
@luisy.auto_filename(excl_none=True)
class MonthConcatenator(luisy.ConcatenationTask):
    # MonthParameter values are dates pointing at the first day of a month
    month_start = luigi.MonthParameter(default=date(2021, 1, 1))
    month_stop = luigi.MonthParameter(default=date(2021, 10, 1))

    def month_iterator(self, date_start, date_stop):
        """
        Iterates through the months between date_start and date_stop
        """
        for dt in rrule.rrule(rrule.MONTHLY, dtstart=date_start, until=date_stop):
            yield date(dt.year, dt.month, 1)

    def requires(self):
        return [
            self.clone(
                SomeDataSelection,
                month=month
            ) for month in self.month_iterator(self.month_start, self.month_stop)
        ]
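Conceptually, the concatenation boils down to reading every required output and stacking the DataFrames. A rough sketch of the idea, not luisy's actual implementation:

import pandas as pd

class NaiveConcatenation(luisy.Task):
    # ... requires() as in MonthConcatenator above ...

    def run(self):
        # Read each required month's DataFrame and stack them into one.
        df = pd.concat([target.read() for target in self.input()])
        self.write(df)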

WrapperTask

As an alternative to the ConcatenationTask, it is possible to keep the data separate without concatenating it. This often makes sense when you are working with a huge amount of data that does not fit into your local memory. In this case we don't want to use a ConcatenationTask or anything else that merges our data into one huge file.

In this use case we can use a WrapperTask instead to have a trigger for our pipeline. WrapperTasks don't implement a run() method, only requires(), and therefore have no output either. The advantage of using WrapperTasks is to have a trigger for your dynamic dependencies, like the month parameter above:

@luisy.inherits(SomeDataSelection)
@luisy.final
class MonthWrapper(luisy.WrapperTask):
    month_start = luigi.MonthParameter(default=date(2021, 1, 1))
    month_stop = luigi.MonthParameter(default=date(2021, 10, 1))

    def month_iterator(self, date_start, date_stop):
        """
        Iterates through the months between date_start and date_stop
        """
        for dt in rrule.rrule(rrule.MONTHLY, dtstart=date_start, until=date_stop):
            yield date(dt.year, dt.month, 1)

    def requires(self):
        return [
            self.clone(
                SomeDataSelection,
                month=month
            ) for month in self.month_iterator(self.month_start, self.month_stop)
        ]
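For reference, luigi's WrapperTask is essentially a task whose completeness is defined entirely by its requirements; luisy's WrapperTask is assumed to behave alike. Simplified from luigi's source:

import luigi
from luigi.task import flatten

class WrapperTask(luigi.Task):
    """Complete as soon as all required tasks are complete."""

    def complete(self):
        return all(r.complete() for r in flatten(self.requires()))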

Dynamic Dependencies

Due to these dynamic dependencies in the tasks above, we need to write the luisy.Task.requires() method ourselves. To achieve this, we define two new parameters, month_start and month_stop, in our final task. Using these parameters we can now execute our pipeline for different timespans.

Using the inherits decorator, we clone all params (column_names and month) from the parent task SomeDataSelection. Usually we don't want the WrapperTask and ConcatenationTask to inherit dynamic parameters (like month), because they are irrelevant in the aggregating task (the information is contained in month_start and month_stop). This could be achieved by not using the clone() function and calling the Task's constructor directly, but that would require us to pass all other params into the constructor manually. In the case above we want to use clone() to inherit the parameter column_names, as shown in the sketch below. Hence, in the case of a ConcatenationTask, we tell auto_filename() to exclude parameters with the value None from the filename, which then creates the filename 'MonthConcatenator_month_start=2021-01_month_stop=2021-10_column_names=A-B-C'.
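The difference in a nutshell, as a schematic requires() for MonthConcatenator:

def requires(self):
    months = list(self.month_iterator(self.month_start, self.month_stop))

    # With clone(), all matching parameters (here column_names) are copied from self:
    return [self.clone(SomeDataSelection, month=month) for month in months]

    # Without clone(), every shared parameter would have to be passed by hand:
    # return [
    #     SomeDataSelection(month=month, column_names=self.column_names)
    #     for month in months
    # ]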

Triggering different executions

Taking the code above, we are now able to run the pipeline with different parameters for different results. Collecting all observations from 2021-01 to 2021-10 of the columns 'ProcessingDate' and 'ID' into one file would be triggered by:

luisy [project_name].[module] MonthConcatenator --month-start='2021-01' --month-stop='2021-10' \
    --column-names='["ProcessingDate", "ID"]'

By changing the parameters after the first execution, we can adjust our output:

luisy [project_name].[module] MonthConcatenator --month-start='2021-01' --month-stop='2021-05' \
    --column-names='["ProcessingDate", "ID", "ProcessResult"]'

This will also include ProcessResult in our final DataFrame and only consider data between January and May.

If we don't want the result to be merged into one file, we use the WrapperTask we defined above:

luisy [project_name].[module] MonthWrapper --month-start='2021-01' --month-stop='2021-10' \
    --column-names='["ProcessingDate", "ID"]'