Testing

The module luisy.testing provides helpers to test pipelines end-to-end for test scenarios.

Run a pipeline

This is how a test case may look like:

from luisy.testing import LuisyTestCase

from my_project.tasks import (
    MyFinalTask,
    MyRawTask,
    MyOtherRawTask,
)


class TestMyPipeline(LuisyTestCase):

   def test_success(self):
       self.assertSuccess(
           task=MyFinalTask(a=1, b=2),
           existing_outputs=[
               (MyRawTask(a=1), {'some_data': 1}),
               (MyRawTask(a=2), {'some_data': 2}),
               (MyOtherRaw(a=2), df_test),
           ]
        )

   def test_fail(self):
       self.assertFail(
           task=MyFinalTask(a=1, b=2),
           existing_outputs=[
               (MyRawTask(a=1), {'some_data': 1}),
           ]
        )

   def test_missing(self):
       self.assertMissing(
           task=MyFinalTask(a=1, b=2),
           existing_outputs=[
               (MyRawTask(a=1), {'some_data': 1}),
           ]
        )

   def test_output(self):
       task_output = self.run_pipeline(
           task=MyFinalTask(a=1, b=2),
           existing_outputs=[
               (MyRawTask(a=1), {'some_data': 1}),
               (MyRawTask(a=2), {'some_data': 2}),
               (MyOtherRaw(a=2), df_test),
           ]
        )

        # Performe some asserts on task_output

Using luisy.testing.LuisyTestCase.run_pipeline(), the user can specify a task she would like to run and can provide outputs of some tasks that may be needed during execution. Here, the user can provide the output objects of the tasks as python objects using existing_outputs, which is a list of tuples containing the task and the regarding output.

Test the execution summary

The most prominent test examples involve incorrect runs. For those, we would like to know which tasks fail. For this the helper luisy.testing.LuisyTestCase.get_execution_summary() can be used to get the summary of the run:

class TestMyPipeline(LuisyTestCase):

   def test_summary(self):
       summary = self.get_execution_summary(
           task=MyFinalTask(a=1, b=2),
           existing_outputs=[
               (MyRawTask(a=1), {'some_data': 1}),
           ]
        )

     self.assertEquals(
         summary['upstream_missing_dependency'],
         {MyFinalTask(a=1, b=2)}
     )

The returned summary is a dict holding the status of the tasks to be runned which then can be asserted by the user.

Example

Consider the following pipeline

import pandas as pd
import luisy

@luisy.raw
@luisy.csv_output(sep=';')
class RawTask(luisy.ExternalTask):
    a = luigi.IntParameter(default=2)

    def get_file_name(self):
        return f"some_export_{self.a}"

@luisy.interim
@luisy.requires(RawTask)
class InterimTask(luisy.Task):
    a = luigi.IntParameter(default=2)

    def run(self):
        df = self.input().read()
        df['C'] = (df*self.a).sum(axis=1)

        self.write(df)

@luisy.final
@luisy.requires(InterimTask)
class FinalTask(luisy.Task):

    def run(self):
        df = self.input().read()
        df = df.transpose()
        self.write(df)

A testcase for FinalTask which just does a transpose may look like this:

class TestFinalTask(LuisyTestCase):

    def test_run(self):

        df_test = pd.DataFrame(data={'A': [1, 2], 'B': [3, 4]})
        existing_outputs = [
            (InterimTask(a=1), df_test)
        ]

        df = self.run_pipeline(FinalTask(a=1), existing_outputs=existing_outputs)
        pd.testing.assert_frame_equal(
            df,
            df_test.transpose()
        )

Testing the pipeline from RawTask to FinalTask, the user only has to give a valid output for the RawTask:

class TestFinalTask(LuisyTestCase):

   def test_run(self):
       df_test = pd.DataFrame(data={'A': [1, 2], 'B': [3, 4]})

       df = self.run_pipeline(
           task=FinalTask(a=1),
           existing_outputs=[
               (RawTask(a=1), df_test)
           ]
       )
       pd.testing.assert_frame_equal(
           df,
           pd.DataFrame(
               data={0: [1, 3, 4], 1: [2, 4, 6]},
               index=['A', 'B', 'C']
           )
       )