WrapperTask and ConcatenationTask
---------------------------------

Triggering multiple Task instances
##################################

Luigi and luisy use specific parameters to build unique pipelines. The same
pipeline can be executed multiple times with different parameters, each run
producing a different output. Unlike ordinary Python attributes, task
parameters are declared in the class scope.

The upcoming pipeline demonstrates the usage of parameters in luisy. In
contrast to luigi, luisy only needs the decorators
:py:class:`~luigi.util.inherits` and :py:func:`~luisy.decorators.requires` to
pass parameters between tasks. This keeps the code clean and prevents a lot of
*boilerplate code*.

As always, we start with a :py:class:`~luisy.tasks.base.ExternalTask`. This
time we assume we have exported some `.parquet` files to our system into the
`/raw` directory. The files are stored in separate folders, where each folder
contains the data of one month in parquet format:

.. code-block:: text

    /
    └──
        └── raw
            ├── Partition=2021-01
            │   ├── part-0001.parquet
            │   ├── part-0002.parquet
            │   └── part-0003.parquet
            ├── Partition=2021-02
            │   └── ...
            ├── ...
            ├── Partition=2021-09
            │   └── ...
            └── Partition=2021-10
                └── ...
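
For local experimentation, such a layout can be produced with a few lines of
pandas. This is only a hypothetical sketch to generate dummy data; the folder
layout and the column names (`ProcessingDate`, `ID`) mirror the ones used
later in this tutorial:

.. code-block:: python

    import os

    import pandas as pd

    # Hypothetical sketch: create the directory layout above with dummy data.
    for month in ['2021-01', '2021-02', '2021-03']:
        folder = f'raw/Partition={month}'
        os.makedirs(folder, exist_ok=True)
        df = pd.DataFrame({'ProcessingDate': [f'{month}-15'], 'ID': [1]})
        # Writing parquet requires a parquet engine such as pyarrow.
        df.to_parquet(os.path.join(folder, 'part-0001.parquet'))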

This directory structure can be handled with the following
:py:class:`~luisy.tasks.base.ExternalTask`. Note that we use the
:py:func:`~luisy.decorators.parquetdir_output` decorator to tell luisy that we
want to parse multiple `.parquet` files. The only code we need to write is the
dynamic folder name `Partition=...`, which takes the `month` parameter that is
passed to the task. On execution, luisy then checks whether the directory of
the requested month exists:

.. code-block:: python

    import luisy

    @luisy.raw
    @luisy.parquetdir_output
    class ExportDirectory(luisy.ExternalTask):
        """
        Input directory with (multiple) parquet files
        """
        month = luisy.MonthParameter()

        def get_folder_name(self):
            # get_month_str is a helper (defined elsewhere) that renders
            # the month as a 'YYYY-MM' string.
            return f"Partition={get_month_str(self.month)}"

With the ExternalTask set up, we can now use a
:py:class:`~luisy.tasks.base.Task` to read these parquet files and store them
as a DataFrame in a pickle file for further processing. We keep working on a
monthly basis so that we can make use of the parallelization of
:py:mod:`luigi`. This can be useful when you do some computations on the data,
like dropping NaN values, for example.

The parameter `month` does not need to be defined again in this task, because
it is derived from `ExportDirectory` through the decorator
:py:func:`~luisy.decorators.requires`. The data we read is saved in the
`/interim` directory, because it is no longer raw data. The filename of our
task is automatically generated by :py:func:`~luisy.decorators.auto_filename`.
This decorator is applied by default to every
:py:class:`luisy.tasks.base.Task`, resulting in the filename
`'MergeExport_month=<month>.pkl'`. So if this task is called multiple times
with different values for the parameter `month`, we get dynamic filenames. The
`output_type` is defined by an output decorator; in our case the file is saved
as a pickle file (`'.pkl'`). See more about output decorators at
:doc:`Decorators <../decorators>`.

.. code-block:: python

    import logging

    logger = logging.getLogger(__name__)

    @luisy.interim
    @luisy.requires(ExportDirectory)
    class MergeExport(luisy.Task):
        """
        Merges all input parquet files of the same month into one pickle file
        """

        def run(self):
            logger.info(f'Start merge of month "{get_month_str(self.month)}"')
            df = self.input().read()
            logger.info(f'Export to "{self.output().path}"')
            self.write(df)

With this task implemented, we can process the data however we like by simply
requiring the task above. The following task, for example, removes NaN values
and selects some columns of the DataFrame. The columns are defined as a
:py:class:`~luigi.parameter.ListParameter` and can therefore be set like any
other luisy parameter:

.. code-block:: python

    @luisy.requires(MergeExport)
    @luisy.interim
    class SomeDataSelection(luisy.Task):

        column_names = luisy.ListParameter(default=None)

        def run(self):
            df = self.input().read()
            df = df.dropna()
            if self.column_names is not None:
                # ListParameter values are tuples; pandas expects a list
                # of column labels.
                df = df[list(self.column_names)]
            self.write(df)

Concatenation Task
##################

To finish up the pipeline above, we finally want to merge all data into one
file. We do this by applying a
:py:class:`~luisy.tasks.base.ConcatenationTask`, which requires
`SomeDataSelection` tasks for multiple months. A
:py:class:`~luisy.tasks.base.ConcatenationTask` needs its required tasks to
have a DataFrame saved in their output, because the data is merged with
:py:func:`~pandas.concat`.
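
For intuition, here is a minimal sketch of what such a concatenation amounts
to, assuming each required task has written a plain
:py:class:`~pandas.DataFrame` (the exact merge options luisy uses internally
may differ):

.. code-block:: python

    import pandas as pd

    # Hedged sketch: stack the monthly outputs into a single DataFrame,
    # just as the ConcatenationTask does with its requirements' outputs.
    df_jan = pd.DataFrame({'ID': [1, 2]})
    df_feb = pd.DataFrame({'ID': [3]})
    df_all = pd.concat([df_jan, df_feb], ignore_index=True)

The actual task then only needs to declare which monthly tasks it requires: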

.. code-block:: python

    from datetime import datetime

    import luigi
    from dateutil import rrule

    @luisy.inherits(SomeDataSelection)
    @luisy.final
    @luisy.auto_filename(excl_none=True)
    class MonthConcatenator(luisy.ConcatenationTask):

        month_start = luigi.MonthParameter(default=luigi.date_interval.Month(2021, 1))
        month_stop = luigi.MonthParameter(default=luigi.date_interval.Month(2021, 10))

        def month_iterator(self, date_start, date_stop):
            """
            Iterates through the given months
            """
            for dt in rrule.rrule(rrule.MONTHLY, dtstart=date_start, until=date_stop):
                yield datetime(dt.year, dt.month, 1)

        def requires(self):
            return [
                self.clone(SomeDataSelection, month=month)
                for month in self.month_iterator(self.month_start, self.month_stop)
            ]

WrapperTask
###########

As an alternative to the ConcatenationTask, we can keep the data separate
without concatenating it. This often makes sense when you work with a huge
amount of data that does not fit into local memory. In that case we don't want
a ConcatenationTask or anything else that merges our data into one huge file;
instead, we can use a WrapperTask as a trigger for our pipeline. WrapperTasks
don't have a :py:meth:`~luisy.tasks.base.Task.run` method implemented and only
use :py:meth:`~luisy.tasks.base.Task.requires`, so they have no output either.
The advantage of a WrapperTask is that it provides a single trigger for
dynamic dependencies like the month parameter above:

.. code-block:: python

    @luisy.inherits(SomeDataSelection)
    @luisy.final
    class MonthWrapper(luisy.WrapperTask):

        month_start = luigi.MonthParameter(default=luigi.date_interval.Month(2021, 1))
        month_stop = luigi.MonthParameter(default=luigi.date_interval.Month(2021, 10))

        def month_iterator(self, date_start, date_stop):
            """
            Iterates through the given months
            """
            for dt in rrule.rrule(rrule.MONTHLY, dtstart=date_start, until=date_stop):
                yield datetime(dt.year, dt.month, 1)

        def requires(self):
            return [
                self.clone(SomeDataSelection, month=month)
                for month in self.month_iterator(self.month_start, self.month_stop)
            ]

Dynamic Dependencies
####################

Because of the dynamic dependencies in the tasks above, we need to write the
:py:meth:`luisy.Task.requires` method ourselves. To achieve this, we define
two new parameters in our final tasks, :py:attr:`month_start` and
:py:attr:`month_stop`. Using these parameters we can execute our pipeline for
different timespans. Through the :py:class:`~luisy.decorators.inherits`
decorator we clone all parameters (:py:attr:`column_names` and
:py:attr:`month`) from the parent task :py:class:`SomeDataSelection`.

Usually we don't want the :py:class:`~luisy.tasks.base.WrapperTask` and the
:py:class:`~luisy.tasks.base.ConcatenationTask` to inherit dynamic parameters
(like :py:attr:`month`), because they are irrelevant in upstream tasks (the
information is contained in :py:attr:`month_start` and :py:attr:`month_stop`).
This could be achieved by not using the
:py:meth:`~luisy.tasks.base.Task.clone` method and calling the task's
constructor directly, but that would require us to pass all other parameters
into the constructor manually. In our case we want to use
:py:meth:`~luisy.tasks.base.Task.clone` to inherit the parameter
:py:attr:`column_names`. Since the inherited :py:attr:`month` parameter is
never set on the concatenator itself, we tell the
:py:func:`~luisy.decorators.auto_filename` decorator to exclude parameters
with the value `None` from the filename, which then creates the filename
`'MonthConcatenator_month_start=2021-01_month_stop=2021-10_column_names=A-B-C'`.
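
To make the clone semantics concrete, here is a small, hypothetical
illustration (not part of the pipeline). It assumes the inherited
:py:attr:`month` parameter is optional on the concatenator, as the
`excl_none=True` setting suggests: :py:meth:`~luisy.tasks.base.Task.clone`
copies all parameters that both task classes share and overrides only those
passed explicitly.

.. code-block:: python

    from datetime import date

    # Hypothetical illustration of clone(): shared parameters are copied,
    # explicitly passed ones are overridden. month_start/month_stop fall
    # back to their class defaults here.
    task = MonthConcatenator(column_names=('ProcessingDate', 'ID'))
    child = task.clone(SomeDataSelection, month=date(2021, 1, 1))

    assert child.column_names == ('ProcessingDate', 'ID')  # copied by clone()
    assert child.month == date(2021, 1, 1)                 # set explicitly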

Triggering different executions
###############################

With the code above, we can now run the pipeline with different parameters to
produce different results. Collecting all observations from 2021-01 to 2021-10
of the columns 'ProcessingDate' and 'ID' in one file is triggered by:

.. code:: bash

    luisy [project_name].[module] MonthConcatenator --month-start='2021-01' --month-stop='2021-10' --column-names='["ProcessingDate", "ID"]'

By changing the parameters after the first execution, we can adjust the
output:

.. code:: bash

    luisy [project_name].[module] MonthConcatenator --month-start='2021-01' --month-stop='2021-05' --column-names='["ProcessingDate", "ID", "ProcessResult"]'

This run additionally contains the ProcessResult column in the final DataFrame
and only considers data between January and May. If we don't want the results
to be merged into one file, we use the WrapperTask defined above:

.. code:: bash

    luisy [project_name].[module] MonthWrapper --month-start='2021-01' --month-stop='2021-10' --column-names='["ProcessingDate", "ID"]'
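
The pipeline can also be triggered from Python. Since luisy tasks are luigi
tasks under the hood, a minimal sketch using luigi's scheduler could look like
this, assuming no luisy-specific setup (such as cloud synchronisation) is
required:

.. code-block:: python

    import luigi

    # Minimal sketch: run the wrapper with the default month range.
    # month_start and month_stop fall back to their class defaults here.
    luigi.build(
        [MonthWrapper(column_names=('ProcessingDate', 'ID'))],
        local_scheduler=True,
    )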