luisy package
Subpackages
Submodules
luisy.cli module
This module contains the entrypoint of luisy. Here we catch the CLI arguments of the user and parse them inside a function. We also separate luigi params from luisy params here. After setting up the luisy parameters in our luisy.config.Config singleton, the logic of luisy is called.
- luisy.cli.build(task, workers=1, local_scheduler=True, **args_luisy)[source]
Similar to luigi.build(), it executes a task along with its upstream requirements. It uses the hashing functionality to detect tasks that need a rerun (see the example below this entry).
- Parameters:
- Returns:
The coordinator object of the luisy/luigi build
- Return type:
luisy.luigi_interface.Coordinator
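A minimal usage sketch, assuming a project that defines a task called MyTask; the extra keyword argument shown for **args_luisy is also an assumption:

```python
# Sketch only: MyTask and the 'download' keyword argument are assumptions,
# not taken from this reference.
from luisy.cli import build
from my_project.tasks import MyTask  # hypothetical task module

coordinator = build(
    MyTask(),              # root task; its upstream requirements are resolved as well
    workers=2,             # forwarded to luigi
    local_scheduler=True,  # run with luigi's local scheduler
    download=True,         # example of an additional luisy argument (**args_luisy)
)
```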
- luisy.cli.luisy_run(argv=['doc', '-W'])[source]
Entry point of the luisy CLI.
- Parameters:
argv – Should (conceptually) be
sys.argv[1:]
- luisy.cli.parse_args(argv)[source]
Parses all luisy arguments out of the argv given on the CLI. Unknown arguments are returned separately so they can later be forwarded to luigi.
- Parameters:
argv – Should (conceptually) be
sys.argv[1:]
- Returns:
Dict of parsed luisy arguments and the rest of argv in unparsed format
- Return type:
luisy.code_inspection module
- class luisy.code_inspection.ImportReturner[source]
Bases:
NodeVisitor
Scans syntax tree for imports.
- class luisy.code_inspection.VariablesReturner[source]
Bases:
NodeVisitor
Scans the syntax tree for variable names. While visiting all nodes of the AST, all names that are loaded or stored are saved. These include FunctionDefs, ClassDefs, function arguments, and the exception handler context name.
- luisy.code_inspection.create_deps_map(requirements_dict)[source]
Map each requirement to its dependencies. Both keys and values are in a unified format, i.e., .lower().replace("-", "_") is applied.
- luisy.code_inspection.create_hashes(task_list, requirements_path=None)[source]
Produces a hash for each luisy Task in the 'task_list'. The hash changes as soon as any of the code that is inside the ClassDef of a task (or any other code used by the task) changes.
- luisy.code_inspection.get_all_deps_with_versions(package)[source]
Get names of all dependencies of 'package' using 'pipdeptree', including their required version.
- luisy.code_inspection.get_import_data(module_tree)[source]
Get imports of a module.
- Parameters:
module_tree (ast.AST) – Syntax tree of the module
- Returns:
Data about imports to the module.
- Return type:
- luisy.code_inspection.get_irregular_pypi_names()[source]
Get the PyPI name for given packages if they differ, i.e., when unifying everything to the format lower().replace("-", "_") does not yield the PyPI package name.
- luisy.code_inspection.get_node(node_name, module_tree)[source]
Find a node with the given name at the top level of a module.
- luisy.code_inspection.get_requirements_dict(path)[source]
Open the requirements file and create a dictionary which maps each package to its corresponding line in the requirements.txt. This line contains all version info and will be used for the hash later.
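A hedged illustration of the shape of the returned dict; the concrete requirements lines are assumptions:

```python
# Hypothetical content of a requirements.txt and the resulting dict; the exact
# key format (e.g., whether names are unified to lower/underscore) is an assumption.
requirements_dict = {
    "pandas": "pandas==1.5.3",
    "scikit_learn": "scikit-learn>=1.2",
}
```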
- luisy.code_inspection.get_requirements_path(task_class)[source]
Tries to automatically find the path of requirements file.
- luisy.code_inspection.get_varnames_delta(node)[source]
Find all variable names that are used in the node body but defined outside the node body.
- luisy.code_inspection.is_builtin(func)[source]
This function is work in progress and will be improved with every upcoming error.
- luisy.code_inspection.is_standard(package_name)[source]
Determines whether a package is in the python standard lib.
- luisy.code_inspection.map_imports_to_requirements(imported_packages, requirements_dict, deps_map)[source]
Given a list of packages, look whether they are in the requirements of the repository. If a package is not a direct requirement, look whether it is a dependency of a requirement.
- Parameters:
imported_packages (iterable[str]) – list of package names
requirements_dict (dict) – dict returned by …
deps_map (dict) – dict returned by create_deps_map()
- Returns:
Dictionary listing for each package, one or several lines of the requirements file that concern it.
- Return type:
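A hedged sketch of how these helpers could be chained together; the requirements path and the imported package names are assumptions:

```python
# Sketch only: the path and the package names are assumptions.
from luisy.code_inspection import (
    create_deps_map,
    get_requirements_dict,
    map_imports_to_requirements,
)

requirements_dict = get_requirements_dict("requirements.txt")  # package -> requirement line
deps_map = create_deps_map(requirements_dict)                  # requirement -> its dependencies
mapping = map_imports_to_requirements(
    ["pandas", "numpy"],  # packages imported by the inspected code
    requirements_dict,
    deps_map,
)
# 'mapping' lists, for each package, the requirement line(s) that concern it.
```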
- luisy.code_inspection.produce_node_hash(class_node)[source]
Produce a hash for an AST node, normally a ClassDef, FunctionDef, or Constant. The hash is invariant to comments, spaces, and blank lines, but not invariant to variable renaming.
- luisy.code_inspection.walk_nodes(variable_name, module_name, parent='', parent_module='')[source]
Function that recursively visits AST nodes. These nodes are either ClassDefs, FunctionDefs, or simply Name nodes. For each node:
- the source code within the node body is hashed and added to a return list
- the names of external packages used inside the node body are added to a return list
- if there are variable names coming from outside the node body but from the same package as the node, the function also visits these nodes
- Parameters:
variable_name (str) – name of the AST node: a class name, a function name, or a module variable
module_name (str) – module name of the current node
parent (str) – name of the parent node
parent_module (str) – name of the module of the parent node
- Returns:
Triple of a list of hashes, a set of strings representing the imported packages, and a list of dicts holding the visualization data collected during traversal of the AST. Each dict represents an AST node and contains the names of the node and its parent, the module names of node and parent, and the node type. This info can be used to plot the dependency graph.
- Return type:
luisy.config module
This module contains all the management of luisy's configuration. It holds a singleton that can be used anywhere in the project after it was initialized in luisy.__init__. The CLI parameters are also passed into this config in luisy.cli.luisy_run().
This singleton allows us to access parameters like working_dir, download, … anywhere in our pipelines. We no longer need to pass arguments through the pipeline to get the information into the leaves of our DAG.
- class luisy.config.Config(*args, **kwargs)[source]
Bases:
object
- check_params()[source]
Checks whether the parameters are valid and luisy is ready to initiate the luigi run. Currently, working_dir has to be set; in addition, if the user wants to use the cloud, luisy only allows runs when azure_storage_key, azure_account_name, and azure_container_name are set.
- Raises:
ValueError – When parameters are wrong or missing
- property config
Get the whole config
- Returns:
Config with all parameter values set right now
- Return type:
- property download
- get_param(param)[source]
Get param from config.
- Parameters:
param (str) – Key to parameter
- Returns:
Value of parameter
- reset()[source]
Resets the config singleton to the initial state with default parameters and environment variable values.
- update(params)[source]
Takes a dict and updates the config with all entries in that dict.
- Parameters:
params (dict) – new params that should be set
- property upload
- property working_dir
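A brief usage sketch of the singleton, assuming the instance can be obtained by calling Config(); the concrete parameter values are assumptions:

```python
# Sketch only: the parameter values are assumptions.
from luisy.config import Config

Config().update({"working_dir": "/mnt/d/data", "download": True})
Config().check_params()  # raises ValueError if parameters are wrong or missing
working_dir = Config().get_param("working_dir")
```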
- luisy.config.add_working_dir(path)[source]
Adds the working dir to a filepath.
Example
If the input is path=/my_project/raw/some_file.pkl and the working dir is /mnt/d/data, then the output is /mnt/d/data/my_project/raw/some_file.pkl.
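The same example as code, assuming the working dir is configured as above:

```python
from luisy.config import add_working_dir

# Assuming the configured working dir is /mnt/d/data:
add_working_dir("/my_project/raw/some_file.pkl")
# -> '/mnt/d/data/my_project/raw/some_file.pkl'
```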
- luisy.config.change_working_dir(path, dir_current, dir_new)[source]
Exchanges the working dir in path, replacing dir_current with dir_new.
luisy.decorators module
This module holds various decorators to be attached to Task
objects.
- luisy.decorators.auto_filename(cls_py=None, excl_params=None, excl_none=False)[source]
"Factory" for a decorator which automatically generates the basename of the outfile based on the name of the class and the luigi.Parameter objects attached to it (see the sketch below this entry). Arguments allow the user to exclude some parameters and also to exclude None values in general.
- Parameters:
cls_py (class) – class that gets decorated. This parameter is kind of a hack to allow usage of this decorator without parentheses "()". The end user can thus call @auto_filename (important for backwards compatibility), @auto_filename(excl_params=['x', 'y']), and @auto_filename(), all of which decorate the class below.
excl_params (list) – List of task parameters that should be excluded
excl_none (bool) – If True, None values are not part of the filename
- Returns:
Decorated class
- Return type:
class
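A hedged sketch of the decorator in use; MyTask and its parameters are hypothetical:

```python
# Hypothetical sketch: MyTask and its parameters are assumptions.
import luigi
import luisy
from luisy.decorators import auto_filename

@auto_filename(excl_params=["debug"], excl_none=True)
class MyTask(luisy.Task):
    country = luigi.Parameter()
    year = luigi.IntParameter(default=None)
    debug = luigi.BoolParameter(default=False)

# The outfile basename is derived from the class name and the remaining parameters:
# 'debug' is excluded explicitly, and parameters that are None are dropped.
# Plain '@auto_filename' (without parentheses) works as well.
```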
- luisy.decorators.azure_blob_storage_input(endpoint, directory, file_format='parquet', inferschema=False)[source]
- luisy.decorators.azure_blob_storage_output(endpoint, directory, file_format='parquet', inferschema=False)[source]
- luisy.decorators.csv_output(cls_py=None, **kwargs)
- luisy.decorators.feather_output(cls_py=None, **kwargs)
- luisy.decorators.final(cls_py=None)
- luisy.decorators.hdf_output(cls_py=None, **kwargs)
- luisy.decorators.interim(cls_py=None)
- luisy.decorators.json_output(cls_py=None, **kwargs)
- luisy.decorators.make_directory_output(read_file_func, regex_pattern=None)[source]
This function gives the user the ability to create their own directory outputs for external tasks. Decorating an ExternalTask with this function allows the user to use luisy.ExternalTask.output().read() in the task that requires the external task. The files will be passed one by one as a generator when calling task.input().read() (see the sketch below this entry).
- Parameters:
read_file_func – function read_file(file) in which the user defines how their data is loaded. Return the file content in this function. To skip specific incoming files, make sure that read_file_func returns None for them.
regex_pattern (str) – regex string that can be passed. All files found in the directory will be checked against this regex and only parsed if they match.
- Returns:
Decorator that can be attached to
ExternalTask
- Return type:
func
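A hedged sketch of a directory output for an external task, as referenced above; the task name and the reading logic are assumptions:

```python
# Sketch only: RawJsonFiles and the reading logic are assumptions.
import json
import luisy
from luisy.decorators import make_directory_output

def read_file(file):
    # Defines how each file in the directory is loaded; returning None skips the file.
    with open(file) as f:
        return json.load(f)

@make_directory_output(read_file, regex_pattern=r".*\.json$")
class RawJsonFiles(luisy.ExternalTask):
    ...

# A task that requires RawJsonFiles can then iterate over the file contents
# one by one via self.input().read().
```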
- luisy.decorators.pickle_output(cls_py=None, **kwargs)
- luisy.decorators.project_name(project_name)[source]
Sets the project_name of the task to project_name to overwrite the default behavior of
luisy.Task
- Parameters:
project_name (str) – Name of the project
- luisy.decorators.raw(cls_py=None)
- class luisy.decorators.requires(*tasks_to_require, as_dict=False)[source]
Bases:
object
A backwards-compatible extension to luigi.util.requires which allows accessing the requirements in a dict-like fashion (see the sketch below this entry).
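A hedged sketch of the dict-like access; the tasks and the key scheme are assumptions:

```python
# Hypothetical sketch: TaskA, TaskB, Combine, and the key scheme are assumptions.
import luisy
from luisy.decorators import requires

class TaskA(luisy.Task):
    ...

class TaskB(luisy.Task):
    ...

@requires(TaskA, TaskB, as_dict=True)
class Combine(luisy.Task):
    def run(self):
        data_a = self.input()["TaskA"].read()  # assumed: keys derived from the task names
        data_b = self.input()["TaskB"].read()
```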
- luisy.decorators.xlsx_output(cls_py=None, **kwargs)
luisy.default_params module
luisy.file_system module
- class luisy.file_system.AzureContainer(account_name='', key=None, container_name='')[source]
Bases:
object
Abstraction for an Azure container.
- Parameters:
- download(source, dest)[source]
Downloads the content of the remote directory to the local directory.
- Parameters:
- Raises:
ValueError – If the local folder already exists
luisy.hashes module
- class luisy.hashes.HashMapping(hashes, local=True, project_name=None)[source]
Bases:
object
Maps generic file identifiers to hashes of the tasks. To work in different working-dirs, the file identifiers are the paths to the file with the working-dir removed.
- Parameters:
hashes (dict) – Dict where the keys are file identifiers and the values are hashes of the tasks creating the respective files.
local (bool) – Whether the files of the hashes are located locally or remotely (i.e., the cloud).
project_name (str) – Identifier of the project. Typically the string marked by the decorator
luisy.decorators.project_name()
- clean_up()[source]
Checks for all existing hashes whether their outfiles are present. Hashes whose outfiles are missing are removed.
- static compute_from_tasks(tasks, project_name)[source]
Computes the mapping from a dict of tasks.
- Parameters:
- Returns:
The mapping of the hashes
- Return type:
- exists(filepath)[source]
Tests whether the output behind filepath exists, either locally or in the cloud.
- classmethod load_from_cloud(project_name)[source]
Loads the mapping from the cloud version of .luisy.hash in the cloud storage of the project.
- Parameters:
project_name (str) – Name of the project.
- Returns:
The mapping of the hashes
- Return type:
- classmethod load_from_local(project_name)[source]
Loads the mapping from the local .luisy.hash file available in the working-dir of the project.
- Parameters:
project_name (str) – Name of the project.
- Returns:
The mapping of the hashes
- Return type:
- class luisy.hashes.HashSynchronizer(tasks, root_task)[source]
Bases:
object
Class to keep the hash mappings of a luisy run synchronized. Calling HashSynchronizer.initialize() loads the following hash mappings, which are managed by this class:
- the hashes of the current luisy run
- the local .luisy.hash
- the cloud .luisy.hash
This class manages the synchronization between those hash mappings when luisy updates them. It also offers some functions that give the user information on which tasks to download/upload/rerun/…
- Parameters:
tasks (dict) – values are Tasks of current luisy run, keys are corresponding filenames.
root_task (luisy.Task) – Root task of the current luisy run (needed to get project name)
- clear_outdated_tasks()[source]
Clears the output of all tasks that are outdated locally
Note
This could also be implemented for clearing outputs in the cloud. Currently, rerun tasks in the cloud are simply overwritten when saved online.
- get_all_hash_entries_that_need_a_run()[source]
Outdated or non existing local tasks
- Returns:
Hash entries of tasks that will be refreshed in this luisy run
- Return type:
- get_download_hash_entries()[source]
Non existent local tasks that are available with their newest versions in the cloud
- Returns:
list: hashmappings that will be downloaded
- get_downloadable_hash_entries()[source]
Identifies which of the files available in the cloud have the hash needed for the local execution.
- Returns:
List of hash entries that can be downloaded from the cloud having the latest hash
- Return type:
- get_local_execution_hash_entries()[source]
Tasks that need to be executed locally because they cannot be downloaded
- Returns:
hashmappings with tasks that need to be executed locally
- Return type:
- get_new_cloud_hash_entries()[source]
Non existent tasks in the cloud
- Returns:
hash entries that do not exist in cloud yet
- Return type:
- get_new_local_hash_entries()[source]
Tasks that are non-existent on the local machine
- Returns:
hash entries that are not present in local luisy.hash
- Return type:
- get_outdated_cloud_hash_entries()[source]
Outdated tasks in cloud
- Returns:
hash entries of outdated tasks in cloud
- Return type:
- get_outdated_local_hash_entries()[source]
Outdated tasks, excluding external tasks, as external tasks should not be rerun at all
- Returns:
hash entries of tasks that are outdated (no external tasks included)
- Return type:
- get_upload_hash_entries()[source]
All outdated or non existing tasks in cloud
- Returns:
hash entries to upload
- Return type:
- initialize()[source]
Initializes all hash mappings needed in this class:
- hash_mapping_new: computed by the current pipeline
- hash_mapping_local: loaded from the local .luisy.hash file
- hash_mapping_cloud: loaded from the .luisy.hash file in the cloud
Also sets up the list of hash_entries, which are used to compare the hash mappings.
- set_files_to_download()[source]
Sets the files that need to be downloaded when luigi is executed. Files that need to be downloaded are set inside the Config() singleton and checked in the luisy.Target.exists() method.
Note
Can this be done dynamically during luigi run?
- synchronize_cloud_hashes(failed_tasks, upload_tasks=True)[source]
This function updates all online/cloud hashes to the hashes from the current run. Failed tasks can be given as an argument to prevent luisy from setting hashes that have not been uploaded. In addition, if upload_tasks is True, this method uploads all the tasks that have outdated hash entries as well.
- Parameters:
failed_tasks (list) – Failed tasks in luigi run
- class luisy.hashes.TaskHashEntry(hash_new, hash_local=None, hash_cloud=None, task=None, filename=None)[source]
Bases:
object
This class manages all available hash entries in a luisy run. Each luisy task has its own hash depending on the code version, so in one luisy run a task can have multiple hashes:
- the hash in the local .luisy.hash file
- the hash in the cloud .luisy.hash file
- the hash that was computed in the current run
Most of the methods of this class compare those hashes to give luisy an idea of what needs to be executed, downloaded, or uploaded, and which hashes need to be refreshed in the .luisy.hash file online or locally.
- Parameters:
hash_new (str) – hash of current luisy run
hash_local (str or None) – hash of the local .luisy.hash file (if available)
hash_cloud (str or None) – hash of the cloud .luisy.hash file (if available)
task (luisy.Task) – instance of the task which all the hashes belong to
filename (str) – output filename of the task (key inside the .luisy.hash file)
Note
Can this be moved inside a Task instance?
- luisy.hashes.compute_hashes(tasks, requirements_path=None)[source]
Computes the hashes for all tasks
- Parameters:
- Returns:
Dict where the keys are the outfiles of the tasks handed in and the values are the hashes.
- Return type:
Todo
Can this be moved to TaskHashEntry?
- luisy.hashes.get_upstream_tasks(task)[source]
Computes all upstream tasks of the given task by enumerating the computational DAG.
- Parameters:
task (luisy.base.Task) – A luisy task object
- Returns:
Dict where the values are the upstream tasks and the keys are the respective output files.
- Return type:
luisy.helpers module
- class luisy.helpers.RegexParam(name, default, regex_placeholder='[\\S]*')[source]
Bases:
object
Class to hold information about a Parameter, which the RegexTaskPattern class uses to create matching regex queries.
- Parameters:
- class luisy.helpers.RegexTaskPattern(task_cls, regex_placeholder='[\\S]*')[source]
Bases:
object
This class can be used to find instances of a task on disk. It uses a regex to search the user's filesystem. Each parameter is replaced with a placeholder to build the regex.
- Parameters:
task_cls (Type[luisy.Task]) – a class object of type luisy.Task. The given task should use luisy.decorators.auto_filename() to work properly.
regex_placeholder (str) – the regex which is inserted into the filename to create a matching pattern, which then searches the root directory.
Note
Only works when using @auto_filename
- build_re_splitters()[source]
The splitters are needed to extract the parameters of a file that matches the regex.
- Returns:
Keys are parameter names, values are lists of splits of the filename. This allows us to separate the parameter value from the filename.
- Return type:
- build_regex()[source]
Build a regex string from the filepath. This replaces the parameter values of the task with a regex pattern that matches these values.
- Returns:
Regex pattern that can be used to search for files of that type
- Return type:
- build_root_directory()[source]
Finds “highest” possible path that can be used as a root directory.
- Returns:
Root directory where the regex search is applied on the files inside it.
- Return type:
- get_available_files()[source]
Scans all files in the class's root_directory and then filters them on the regex_pattern of this class.
- Returns:
List of files that match the regex pattern and are in the root directory
- Return type:
- get_available_params()[source]
Searches for files in the root directory matching the regex and extracts the parameters from the filenames using the regex splits.
- Returns:
- keys are file_paths of the files found, values are a dict with param_name as
key and param_value as value.
- Return type:
- set_filename_pattern()[source]
This creates a filename pattern with parameter values filled by
RegexParam.placeholder()
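A hedged sketch of how the class might be used to discover task instances on disk; MyTask is hypothetical and must use @auto_filename, as noted above:

```python
# Sketch only: MyTask is an assumption; it must be decorated with @auto_filename.
from luisy.helpers import RegexTaskPattern

pattern = RegexTaskPattern(MyTask)
files = pattern.get_available_files()    # files matching the generated regex
params = pattern.get_available_params()  # {file_path: {param_name: param_value, ...}}
```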
- luisy.helpers.add_filter_info(filter_func)[source]
This decorator for class methods adds information to the logger about the change of a pd.DataFrame's shape before (and, if the function returns a pd.DataFrame, after) filtering the df. This tells the user which function filtered how many rows/columns. It can easily be applied to any task method like this:
    @add_filter_info
    def filter_func(df):
        return df.drop(["my_most_disliked_column"], axis=1)
- Parameters:
filter_func (cls.method) – method that takes pd.DataFrame as first argument and filters it by some operations
- Returns:
wrapper around filter_func, that adds additional information about filtered pd.DataFrame.
- luisy.helpers.get_df_from_parquet_dir(path)[source]
Reads all Parquet files from a given directory and parses them into a pandas dataframe
- Parameters:
path (str) – path where parquet files are stored
- Returns:
dataframe with all the data from parquet files
- Return type:
pd.DataFrame
- luisy.helpers.get_start_date(week)[source]
Extracts the first day of the week.
- Parameters:
week (luigi.date_interval.Week or string) – Week that is compatible with luigi or string of the form
2021-W12
- Returns:
The first day of the week
- Return type:
- luisy.helpers.get_stop_date(week)[source]
Extracts the last day of the week.
- Parameters:
week (luigi.date_interval.Week or string) – Week that is compatible with luigi or string of the form
2021-W12
- Returns:
The last day of the week
- Return type:
- luisy.helpers.remove_parent_directories(lst_with_substring_elements)[source]
Removes elements from a list that are substrings of other items in that list. E.g., for ['/mnt/d', '/mnt/d/a', '/mnt/d/b'], '/mnt/d' is removed from the list.
- Parameters:
lst_with_substring_elements (list) – list from which substring elements are removed
- Returns:
Clean list without substrings of other elements, sorted by character length
- Return type:
List
luisy.luigi_interface module
This module contains all touch points with the package luigi. We had to adapt the method run_luigi because we need to handle the system exit ourselves.
The current logic of luisy looks like this:
- Do some luisy work, like hashing every task in the new DAG built up from the given root task
- Run luigi
- Do some more luisy work, e.g., save hashes.
- class luisy.luigi_interface.COLORS[source]
Bases:
object
- BLUE = '\x1b[94m'
- END = '\x1b[0m'
- GREEN = '\x1b[92m'
- RED = '\x1b[91m'
- class luisy.luigi_interface.CmdPrinter(hash_synchronizer)[source]
Bases:
object
- class luisy.luigi_interface.Coordinator(args_luisy, root_task=None)[source]
Bases:
object
Parent class to handle different luisy modes. The basic coordinator does hash computation of a given root task and reads hashes that are found on the local filesystem in .luisy.hash file.
- Parameters:
args_luisy (dict) – Arguments for luisy
root_task (luisy.tasks.base.Task) – Root task to be executed
- class luisy.luigi_interface.HashUpdateMode(args_luisy, root_task=None)[source]
Bases:
Coordinator
The HashUpdateMode is started when the user passes --hash-update to luisy. Instead of rerunning all tasks whose hash changed in this run, this mode only updates the hashes (which already exist) in the .luisy.hash file and does not run the tasks again. This feature can be useful when refactoring your luisy pipeline. It is also possible to update the hashes in the cloud when --upload is passed to luisy.
- class luisy.luigi_interface.PipelineMode(args_luisy, args_luigi, root_task=None, run_luigi_cli=True)[source]
Bases:
Coordinator
This class manages the communication between luisy and luigi functionalities.
- Parameters:
args_luisy (dict) – Arguments for luisy
args_luigi (list) – Parsed command line arguments for luigi
root_task (luisy.tasks.base.Task) – Root task to be executed
run_luigi_cli (bool) – If true, luigi is called via the command line, otherwise luisy.testing.debug_run() is called.
- luisy.luigi_interface.run_luigi(argv)[source]
Run luigi with command line parsing. This function is taken from luigi.retcodes.run_with_retcodes(). We need to overwrite it because luigi calls sys.exit(), while luisy should keep on running to do work after luigi completes.
- Parameters:
argv – Should (conceptually) be
sys.argv[1:]
luisy.targets module
- class luisy.targets.AzureBlobStorageTarget(outdir=None, endpoint=None, directory=None, inferschema=False, file_format='parquet')[source]
Bases:
SparkTarget
- property blob_uri
- file_ending = ''
- property path
- class luisy.targets.CSVTarget(path, **kwargs)[source]
Bases:
LocalTarget
- file_ending = 'csv'
- class luisy.targets.CloudTarget(path, **kwargs)[source]
Bases:
LuisyTarget
- class luisy.targets.DeltaTableTarget(outdir=None, schema='schema', catalog='catalog', table_name=None)[source]
Bases:
SparkTarget
- exists()[source]
Checks whether the Deltatable exists.
Note
Ideally, we would call self.spark.catalog.tableExists(self.table_uri) to check whether the table exists, but this always returns False.
- file_ending = 'DeltaTable'
- property path
- remove()[source]
Remove the resource at the path specified by this FileSystemTarget.
This method is implemented by using
fs
.
- property table_uri
- class luisy.targets.DirectoryTarget(path, **kwargs)[source]
Bases:
LocalTarget
Target to read directories. The read_file method has to be filled by the user and passed to target by decorating it.
- file_ending = ''
- regex_pattern = None
- class luisy.targets.FeatherTarget(path, **kwargs)[source]
Bases:
LocalTarget
- file_ending = 'feather'
- class luisy.targets.HDFTarget(path, **kwargs)[source]
Bases:
LocalTarget
- file_ending = 'hdf'
- class luisy.targets.JSONTarget(path, **kwargs)[source]
Bases:
LocalTarget
- file_ending = 'json'
- requires_pandas = False
- class luisy.targets.LocalTarget(path, **kwargs)[source]
Bases:
LuisyTarget
- exists()[source]
Checks if the file exists. If download is set to True and the file is not available locally, it is checked whether the file is available in the cloud. If it is available there, it is downloaded.
- file_ending = None
- is_folder()[source]
Checks if the output is a folder.
Note
This may have to be improved in the future as we (silently) assume that any file has a file extension.
- remove()[source]
Remove the resource at the path specified by this FileSystemTarget.
This method is implemented by using
fs
.
- requires_pandas = True
- class luisy.targets.LuisyTarget(path=None, format=None, is_tmp=False)[source]
Bases:
LocalTarget
- Parameters:
- exists()[source]
Returns True if the path for this FileSystemTarget exists; False otherwise.
This method is implemented by using fs.
- property fs
Wrapper for access to file system operations.
Work in progress - add things as needed.
- remove()[source]
Remove the resource at the path specified by this FileSystemTarget.
This method is implemented by using
fs
.
- requires_pandas = False
- class luisy.targets.ParquetDirTarget(path, **kwargs)[source]
Bases:
LocalTarget
- file_ending = ''
- requires_pandas = False
- class luisy.targets.PickleTarget(path, **kwargs)[source]
Bases:
LocalTarget
- file_ending = 'pkl'
- class luisy.targets.SparkTarget(path, **kwargs)[source]
Bases:
CloudTarget
This abstract class is for targets working with Spark instances.
- property spark
luisy.testing module
- class luisy.testing.LuisyTestCase(methodName='runTest')[source]
Bases:
TestCase
- run_pipeline(task, existing_outputs=None, return_store=False)[source]
Method that can be used to run luigi pipelines in tests. Existing outputs can be passed to mock a specific task's output, which lets the user test without mocking task outputs manually or creating a temporary directory (see the sketch below this entry).
- Parameters:
task (luigi.Task) – Task that should be run
existing_outputs (list) – List of tuples, where the first element of each tuple is the Task object whose output should be mocked and the second element is the content of the mocked output
return_store (bool) – If true, the whole store gets returned as a dict. The store contains all the outputs after the execution of the pipeline, where the tasks are the keys and the values are their outputs. If false, only the output of the given task is returned.
- Returns:
Depending on the return_store value, a dict with the outputs of all tasks or just the output of the root_task
- Return type:
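A hedged sketch of a test using run_pipeline, as referenced above; the tasks and the mocked output are assumptions:

```python
# Hypothetical sketch: UpstreamTask, MyTask, and the mocked content are assumptions.
from luisy.testing import LuisyTestCase

class TestMyTask(LuisyTestCase):

    def test_pipeline(self):
        result = self.run_pipeline(
            task=MyTask(param="a"),
            existing_outputs=[(UpstreamTask(param="a"), {"x": 1})],
        )
        self.assertIsNotNone(result)  # replace with an assertion on the expected output
```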
- luisy.testing.create_testing_config(working_dir, storage_key='', container_name='', account_name='', mock_spark=True, **kwargs)[source]
Creates a Config object for testing purposes. Should be used when working with temporary directories.
- Parameters:
working_dir (str) – working dir for testing luisy tasks (mostly tmpdirs)
storage_key (str) – storage_key that can be set if no environ variable can be used in tests.
account_name (str) – Account name that can be set if no environ variable can be used in tests.
container_name (str) – Container name that can be set if no environ variable can be used in tests.
**kwargs – params like download or upload
- luisy.testing.debug_run(tasks, return_summary=True)[source]
This is a helper method that allows running pipelines in tests, bypassing the CLI. It also returns some useful information from the build in the form of a dict.
- Parameters:
tasks (list or luigi.Task) – List of Tasks that should be run or a single
luigi.Task
- Returns:
Dictionary with information about the executed tasks, e.g., dict['completed'] contains the tasks that have been completed in this run and dict['already_done'] contains tasks that had already been completed before running luigi
- Return type:
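A hedged sketch combining create_testing_config and debug_run inside a test with a temporary directory; MyTask is hypothetical:

```python
# Sketch only: MyTask is an assumption.
import tempfile
from luisy.testing import create_testing_config, debug_run

with tempfile.TemporaryDirectory() as tmpdir:
    create_testing_config(working_dir=tmpdir)
    summary = debug_run([MyTask(param="a")])
    completed = summary["completed"]        # tasks completed in this run
    already_done = summary["already_done"]  # tasks completed before this run
```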
luisy.visualize module
- luisy.visualize.add_children(task, graph, namefunc=<function create_label>, depth=1, depth_limit=10, unique_children=False)[source]
Recursive function that adds the children of task to a graph object. Make sure that namefunc returns unique strings for all tasks that will be included in the tree.
- Parameters:
task (luisy.tasks.base.Task) – The task
graph (networkx.DiGraph) – The directed acyclic graph
namefunc (callable) – Function that creates a string out of a task
depth (int) – Depth of the current node
depth_limit (int) – Max allowed depth of the tree
unique_children (bool) – If true, add only a subset of unique types of children for each task
- luisy.visualize.create_label(task, excl_list=(), with_id=False)[source]
Make a string label out of a luigi task. Make sure the string is unique!
- Parameters:
task (luisy.tasks.base.Task) – The task the label should be created for
excl_list (list) – List of parameter kwargs that are not included in the label
with_id (bool) – If False, do not include the hash of the task
- Returns:
Label to be used for the node
- Return type:
- luisy.visualize.make_dependency_graph(task, excl_list=(), namefunc=None, unique_children=True, depth_limit=10)[source]
Creates a pydot graph out of a task's dependencies.
- Parameters:
task (luisy.tasks.base.Task) – The task
excl_list (list) – Parameter forwarded to create_label if namefunc is None
namefunc (callable) – Function that creates a string out of a task
depth_limit (int) – Max allowed depth of the tree
unique_children (bool) – If true, add only a subset of unique types of children
- luisy.visualize.unique_types(tasks)[source]
Given a list of objects, return a subset that has each type only once.
- luisy.visualize.visualize_task(task, ax=None, unique_children=True, parameters_to_exclude=())[source]
Visualizes the dependencies of a luisy.Task.
- Parameters:
task (luisy.tasks.base.Task) – The task whose dependencies should be visualized.
ax (matplotlib.axis.Axis) – A matplotlib axis to draw in
unique_children (bool) – If true, add only subset of unique types of children
parameters_to_exclude (list) – List of names of luigi parameters to exclude from the visualization.
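A hedged usage sketch; MyTask is hypothetical:

```python
# Sketch only: MyTask is an assumption.
import matplotlib.pyplot as plt
from luisy.visualize import visualize_task

fig, ax = plt.subplots(figsize=(10, 6))
visualize_task(MyTask(param="a"), ax=ax, parameters_to_exclude=["param"])
plt.show()
```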