luisy package

Subpackages

Submodules

luisy.cli module

This module contains the entrypoint of luisy. Here, the user's CLI arguments are caught and parsed, and luigi parameters are separated from luisy parameters. After setting up the luisy parameters in the luisy.config.Config singleton, the luisy logic is called.

luisy.cli.build(task, workers=1, local_scheduler=True, **args_luisy)[source]

Similar to luigi.build(), it executes a task along with its upstream requirements, using the hashing functionality to detect tasks that need a rerun.

Parameters:
  • task (luisy.base.Task) – The luisy task to be executed (along with its upstream requirements).

  • workers (int) – Number of (luigi) workers

  • local_scheduler (bool) – Whether to use the local scheduler of luigi

Returns:

The coordinator object of the luisy/luigi build

Return type:

luisy.luigi_interface.Coordinator
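
A minimal usage sketch, assuming MyTask is a hypothetical luisy task defined in the user's project:

from luisy.cli import build

# MyTask is a hypothetical luisy task from your project.
coordinator = build(MyTask(), workers=2, local_scheduler=True)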

luisy.cli.disable_azure_logging()[source]
luisy.cli.luisy_run(argv=['doc', '-W'])[source]

luisy CLI Call

Parameters:

argv – Should (conceptually) be sys.argv[1:]

luisy.cli.parse_args(argv)[source]

Parses all luisy arguments out of the argv given on the CLI. Unknown arguments are returned separately so they can later be forwarded to luigi.

Parameters:

argv – Should (conceptually) be sys.argv[1:]

Returns:

Dict of parsed luisy arguments and the rest of argv in an unparsed format

Return type:

tuple

luisy.code_inspection module

class luisy.code_inspection.ImportReturner[source]

Bases: NodeVisitor

Scans syntax tree for imports.

filter_imports(node)[source]
generic_visit(node)[source]

Called if no explicit visitor function exists for a node.

star_import_in_imports(lst)[source]
exception luisy.code_inspection.RequirementFileNotFound[source]

Bases: Exception

class luisy.code_inspection.VariablesReturner[source]

Bases: NodeVisitor

Scans syntax tree for variable names. While visiting all nodes of the ast syntax tree, save all names that are loaded or stored. These include FunctionDefs, ClassDefs, Function arguments or the Exception Handler context name.

filter_variables(node)[source]
generic_visit(node)[source]

Called if no explicit visitor function exists for a node.

luisy.code_inspection.create_deps_map(requirements_dict)[source]

Map each requirement to its dependencies. Both keys and values are in unified format, i.e., after applying .lower().replace("-", "_").

luisy.code_inspection.create_hashes(task_list, requirements_path=None)[source]

Produces a hash for each luisy Task in the ‘task_list’. The hash changes as soon as any of the code that is inside the ClassDef of a task (or any other code used by the task) changes.

Parameters:
  • task_list (list[luisy.Task]) – List of task classes

  • requirements_path (str) – Location of the requirements.txt; if None, luisy tries to find it automatically

Returns:

The hash values for the tasks in hexadecimal.

Return type:

list[str]
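
A brief usage sketch, assuming MyTask is a hypothetical luisy task class; requirements_path is left at None so luisy tries to locate the requirements.txt automatically:

from luisy.code_inspection import create_hashes

# MyTask is a hypothetical luisy task class from your project.
hashes = create_hashes([MyTask])
print(hashes)  # one hexadecimal hash per task class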

luisy.code_inspection.get_all_deps(package)[source]
luisy.code_inspection.get_all_deps_with_versions(package)[source]

Get the names of all dependencies of package using pipdeptree, including their required versions.

Parameters:

package (str) – package for which we want deps.

Returns:

Dict where the keys are the package names and the values are the version constraints of all dependencies.

Return type:

dict

luisy.code_inspection.get_import_data(module_tree)[source]

Get imports of a module.

Parameters:

module_tree (ast.AST) – Syntax tree of the module

Returns:

Data about imports to the module.

Return type:

pandas.DataFrame

luisy.code_inspection.get_irregular_pypi_names()[source]

Get the PyPI name for a given package if it differs from the import name. Everything is unified to the format .lower().replace("-", "_").

luisy.code_inspection.get_node(node_name, module_tree)[source]

Find a node with given name in the upper level of a module.

luisy.code_inspection.get_requirements_dict(path)[source]

Open the requirements file and create a dictionary which maps each package to its corresponding line in the requirements.txt. This line contains all version info and will be used for the hash later.

luisy.code_inspection.get_requirements_path(task_class)[source]

Tries to automatically find the path of requirements file.

luisy.code_inspection.get_varnames_delta(node)[source]

Find all variable names that are used in the node body, but defined outside the node body.

luisy.code_inspection.is_builtin(func)[source]

This function is work in progress and will be improved with every upcoming error.

luisy.code_inspection.is_standard(package_name)[source]

Determines whether a package is in the python standard lib.

luisy.code_inspection.make_pypi_mapper()[source]
luisy.code_inspection.map_imports_to_requirements(imported_packages, requirements_dict, deps_map)[source]

Given a list of packages, check whether they are in the requirements of the repository. If a package is not a direct requirement, check whether it is a dependency of a requirement.

Parameters:
  • imported_packages (iterable[str]) – List of package names

  • requirements_dict (dict) – Dict returned by …

  • deps_map (dict) – Dict returned by …

Returns:

Dictionary listing for each package, one or several lines of the requirements file that concern it.

Return type:

pandas.DataFrame

luisy.code_inspection.produce_node_hash(class_node)[source]

Produce a hash for an AST node, normally a ClassDef, FunctionDef, or Constant. The hash is invariant to comments, spaces, and blank lines, but not invariant to variable renaming.

luisy.code_inspection.walk_nodes(variable_name, module_name, parent='', parent_module='')[source]

Function that recursively visits AST nodes. These nodes are either ClassDefs, FunctionDefs, or simple Name nodes. For each node:

  • the source code within the node body is hashed and added to a return list

  • the names of external packages used inside the node body are added to a return list

  • if there are variable names coming from outside the node body, but from the same package as the node, the function also visits these nodes

Parameters:
  • variable_name (str) – Name of the AST node; a class name, function name, or a module variable

  • module_name (str) – Module name of the current node

  • parent (str) – Name of the parent node

  • parent_module (str) – Name of the module of the parent node

Returns:

Triple of a list of hashes, a set of strings representing the imported packages, and a list of dicts holding the visualization data collected during traversal of the AST. Each dict represents an AST node and contains the names of the node and its parent, the module names of the node and its parent, and the node type. This info can be used to plot the dependency graph.

Return type:

tuple

luisy.config module

This module contains all the management of luisy's configuration. It holds a singleton that can be used anywhere in the project after it was initialized in luisy.__init__. The CLI parameters are also passed into this config in luisy.cli.luisy_run(). This singleton allows us to access parameters like working_dir, download, … anywhere in our pipelines, so we no longer need to pass arguments through the pipeline to get the information into the leaves of our DAG.

class luisy.config.Config(*args, **kwargs)[source]

Bases: object

check_params()[source]

Checks whether the parameters are valid and luisy is ready to initiate the luigi run. Currently, working_dir has to be set; additionally, if the user wants to use the cloud, luisy only allows runs when azure_storage_key, azure_account_name, and azure_container_name are set.

Raises:

ValueError – When parameters are wrong or missing

property config

Get the whole config

Returns:

Config with all parameter values set right now

Return type:

dict

property download
get_param(param)[source]

Get param from config.

Parameters:

param (str) – Key to parameter

Returns:

Value of parameter

init()[source]
reset()[source]

Resets the config singleton to the initial state with default parameters and environment variable values.

set_param(name, val)[source]

Set param in config.

Parameters:
  • name (str) – Key to parameter

  • val (object) – Value of parameter

update(params)[source]

Takes a dict and updates the config with all entries in that dict

Parameters:

params (dict) – new params that should be set

property upload
property working_dir
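
A small usage sketch based on the Config methods documented above; the parameter values are illustrative:

from luisy.config import Config

config = Config()
config.set_param('working_dir', '/mnt/d/data')
config.update({'download': True})        # set several parameters at once
print(config.get_param('working_dir'))   # -> '/mnt/d/data'
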
class luisy.config.Singleton[source]

Bases: type

luisy.config.activate_download()[source]
luisy.config.add_working_dir(path)[source]

Adds the working dir to a filepath.

Example

If the input is path=/my_project/raw/some_file.pkl and the working dir is /mnt/d/data, then the output is /mnt/d/data/my_project/raw/some_file.pkl.

Parameters:

path (str) – Path to a file without working dir

Returns:

Path to the file with working dir prepended.

Return type:

str

luisy.config.change_working_dir(path, dir_current, dir_new)[source]

Exchanges the working dir in path

Parameters:
  • path (str) – Path where working dir should be changed

  • dir_current (str) – The path of the working dir to be replaced

  • dir_new (str) – The working dir that should be inserted

Returns:

Path with updated working dir

Return type:

str

luisy.config.get_default_params(raw=True)[source]
luisy.config.pass_args(args)[source]
luisy.config.remove_working_dir(path)[source]

Removes the working dir from the filepath.

Example

If path=/data/my_project/raw/some_file.pkl, then the output is /my_project/raw/some_file.pkl

Parameters:

path (str) – Path to a file

Returns:

Path to the file where working dir has been removed.

Return type:

str

luisy.config.set_working_dir(working_dir)[source]
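
A combined sketch of the path helpers above, assuming the configured working dir is /mnt/d/data as in the add_working_dir() example:

from luisy.config import (
    add_working_dir,
    change_working_dir,
    remove_working_dir,
    set_working_dir,
)

set_working_dir('/mnt/d/data')
full = add_working_dir('/my_project/raw/some_file.pkl')
# -> '/mnt/d/data/my_project/raw/some_file.pkl'
stripped = remove_working_dir(full)
# -> '/my_project/raw/some_file.pkl'
moved = change_working_dir(full, dir_current='/mnt/d/data', dir_new='/mnt/e/data')
# -> '/mnt/e/data/my_project/raw/some_file.pkl'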

luisy.decorators module

This module holds various decorators to be attached to Task objects.

luisy.decorators.auto_filename(cls_py=None, excl_params=None, excl_none=False)[source]

Factory for a decorator which automatically generates the basename of the outfile based on the name of the class and the luigi.Parameter objects attached to it. The arguments allow the user to exclude some parameters and also to exclude None values in general.

Parameters:
  • cls_py (class) – Class that gets decorated. This parameter is kind of a hack to allow using this decorator without parentheses, so the end-user is able to call @auto_filename (important for backwards compatibility), @auto_filename(excl_params=['x', 'y']), and @auto_filename(), all having the class below decorated

  • excl_params (list) – List of task parameters that should be excluded

  • excl_none (bool) – If True, None values are not part of the filename

Returns:

Decorated class

Return type:

class
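
A hedged example of the decorator, assuming a task with two luigi parameters; the class and parameter names are illustrative:

import luigi

from luisy import Task                      # luisy.Task as referenced elsewhere in these docs
from luisy.decorators import auto_filename

@auto_filename(excl_params=['debug'], excl_none=True)
class MyTask(Task):
    year = luigi.IntParameter(default=2021)
    debug = luigi.BoolParameter(default=False)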

luisy.decorators.azure_blob_storage_input(endpoint, directory, file_format='parquet', inferschema=False)[source]
luisy.decorators.azure_blob_storage_output(endpoint, directory, file_format='parquet', inferschema=False)[source]
luisy.decorators.csv_output(cls_py=None, **kwargs)
luisy.decorators.deltatable_input(catalog=None, schema=None, table_name=None)[source]
luisy.decorators.deltatable_output(catalog=None, schema=None, table_name=None)[source]
luisy.decorators.feather_output(cls_py=None, **kwargs)
luisy.decorators.final(cls_py=None)
luisy.decorators.hdf_output(cls_py=None, **kwargs)
luisy.decorators.interim(cls_py=None)
luisy.decorators.json_output(cls_py=None, **kwargs)
luisy.decorators.make_directory_output(read_file_func, regex_pattern=None)[source]

This function gives the user the ability to create their own directory outputs for external tasks. Decorating an ExternalTask with this function allows the user to use luisy.ExternalTask.output().read() in the task that requires the external task. The files will be passed one by one as a generator when calling task.input().read().

Parameters:
  • read_file_func – Function read_file(file) in which the user defines how their data is loaded; it should return the file content. To skip specific incoming files, make sure that your read_file_func function returns None for them.

  • regex_pattern (str) – Regex string that can be passed. All files found in the directory are checked against this regex and are only parsed if they match.

Returns:

Decorator that can be attached to ExternalTask

Return type:

func
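
A sketch under assumptions: read_file and RawFiles are hypothetical, and ExternalTask refers to the luisy ExternalTask mentioned in the description above:

import pandas as pd

from luisy import ExternalTask
from luisy.decorators import make_directory_output

def read_file(file):
    # Return the parsed content of a single file; return None to skip it.
    return pd.read_csv(file)

@make_directory_output(read_file, regex_pattern=r'.*\.csv$')
class RawFiles(ExternalTask):
    pass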

luisy.decorators.parquetdir_output(cls_py=None, outdir=None)[source]
luisy.decorators.pickle_output(cls_py=None, **kwargs)
luisy.decorators.project_name(project_name)[source]

Sets the project_name of the task to project_name to overwrite the default behavior of luisy.Task

Parameters:

project_name (str) – Name of the project

luisy.decorators.raw(cls_py=None)
class luisy.decorators.requires(*tasks_to_require, as_dict=False)[source]

Bases: object

A backwards-compatible extension of luigi.util.requires which allows accessing the requirements in a dict-like fashion.
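
A hedged sketch combining @project_name and @requires on two hypothetical tasks:

from luisy import Task
from luisy.decorators import project_name, requires

@project_name('my_project')
class LoadData(Task):
    pass

@requires(LoadData, as_dict=True)
class CleanData(Task):

    def run(self):
        # The required task can be accessed in a dict-like fashion, as described
        # above; the exact access pattern depends on your task definitions.
        data = self.input()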

luisy.decorators.xlsx_output(cls_py=None, **kwargs)

luisy.default_params module

luisy.file_system module

class luisy.file_system.AzureContainer(account_name='', key=None, container_name='')[source]

Bases: object

Abstraction for an Azure container.

Parameters:
  • account_name (str) – Account name of the azure storage

  • container_name (str) – Name of the azure container

  • key (str) – Secret key of the container within the storage account

download(source, dest)[source]

Downloads the content of the remote directory to the local directory.

Parameters:
  • source (str) – Path to the directory within the container, like /path/to/data

  • dest (str) – Path to local directory.

Raises:

ValueError – If local folder exists already

exists(path)[source]

Checks if dir or file exists.

Parameters:

path (string) – Path within the container, like /path/to/folder

Returns:

True, if file / folder exists. False otherwise.

Return type:

bool

get_df(remote_path)[source]
load_json(filepath)[source]
save_json(filepath, data)[source]

luisy.hashes module

class luisy.hashes.HashMapping(hashes, local=True, project_name=None)[source]

Bases: object

Maps generic file identifiers to hashes of the tasks. To work in different working-dirs, the file identifiers are the paths to the file with the working-dir removed.

Parameters:
  • hashes (dict) – Dict where the keys are file identifiers and the values are hashes of the tasks creating the respective files.

  • local (bool) – Whether the files of the hashes are located locally or remotely (i.e., the cloud).

  • project_name (str) – Identifier of the project. Typically the string marked by the decorator luisy.decorators.project_name()

clean_up()[source]

Checks for all existing hashes whether their outfiles are there. If not, the hashes are removed.

static compute_from_tasks(tasks, project_name)[source]

Computes the mapping from a dict of tasks.

Parameters:
  • tasks (dict) – Dict where the values are luisy.Task objects and the keys are their file identifiers (i.e., the filepaths without the working-directory).

  • project_name (str) – Name of the project.

Returns:

The mapping of the hashes

Return type:

HashMapping

exists(filepath)[source]

Tests whether the output behind filepath exists, either locally or in the cloud.

static get_cloud_hash_path(project_name)[source]
get_hash(filepath)[source]
static get_local_hash_path(project_name)[source]
items()[source]
classmethod load_from_cloud(project_name)[source]

Loads the mapping from the cloud version of .luisy.hash in the cloud-storage of the project.

Parameters:

project_name (str) – Name of the project.

Returns:

The mapping of the hashes

Return type:

HashMapping

classmethod load_from_local(project_name)[source]

Loads the mapping from the local .luisy.hash file available in the working-dir of the project.

Parameters:

project_name (str) – Name of the project.

Returns:

The mapping of the hashes

Return type:

HashMapping

save()[source]

Saves the hashes either to disc (if local=True) or to the cloud storage (if local=False)

update(filepath=None, task_hash=None)[source]
class luisy.hashes.HashSynchronizer(tasks, root_task)[source]

Bases: object

Class to keep the hash mappings of a luisy run synchronized. Calling HashSynchronizer.initialize() loads the following hash mappings, which are managed by this class:

  • current run of luisy

  • local .luisy.hash

  • cloud .luisy.hash

This class manages the synchronization between those hash mappings when luisy does updates on them. It also offers some functions that give the user information on which tasks to download/upload/rerun/…

Parameters:
  • tasks (dict) – values are Tasks of current luisy run, keys are corresponding filenames.

  • root_task (luisy.Task) – Root task of the current luisy run (needed to get project name)

check_params()[source]
clear_outdated_tasks()[source]

Clears the output of all tasks that are outdated locally

Note

Cloud clearing could also be implemented here. Currently, the rerun tasks in the cloud are simply overwritten when saved online

compute_new_hashes()[source]
get_all_hash_entries_that_need_a_run()[source]

Outdated or non-existent local tasks

Returns:

Hash entries of tasks that will be refreshed in this luisy run

Return type:

list

get_download_hash_entries()[source]

Non-existent local tasks that are available in their newest versions in the cloud

Returns:

Hash mappings that will be downloaded

Return type:

list

get_downloadable_hash_entries()[source]

Identifies which of the files available in the cloud have the hash needed for the local execution.

Returns:

List of hash entries that can be downloaded from the cloud having the latest hash

Return type:

list

get_local_execution_hash_entries()[source]

Tasks that need to be executed locally because they cannot be downloaded

Returns:

hashmappings with tasks that need to be executed locally

Return type:

list

get_new_cloud_hash_entries()[source]

Non-existent tasks in the cloud

Returns:

hash entries that do not exist in cloud yet

Return type:

list

get_new_local_hash_entries()[source]

Tasks that are non-existent on the local machine

Returns:

hash entries that are not present in local luisy.hash

Return type:

list

get_outdated_cloud_hash_entries()[source]

Outdated tasks in cloud

Returns:

hash entries of outdated tasks in cloud

Return type:

list

get_outdated_local_hash_entries()[source]

Outdated tasks excluding external tasks, as external tasks should not be rerun at all

Returns:

hash entries of tasks that are outdated (no external tasks included)

Return type:

list

get_upload_hash_entries()[source]

All outdated or non-existent tasks in the cloud

Returns:

hash entries to upload

Return type:

list

initialize()[source]

Initializes all hash mappings needed in this class:

  • hash_mapping_new: computed by the current pipeline

  • hash_mapping_local: loaded from the local .luisy.hash file

  • hash_mapping_cloud: loaded from the .luisy.hash file in the cloud

Also sets up list of hash_entries, which are used to compare hashmappings.

read_cloud_hash_file()[source]
read_local_hash_file()[source]
set_files_to_download()[source]

Sets files that need to be downloaded when luigi is executed. Files that need to be downloaded will be set inside the Config() singleton and checked in the luisy.Target.exists() method.

Note

Can this be done dynamically during luigi run?

synchronize_cloud_hashes(failed_tasks, upload_tasks=True)[source]

This function updates all online/cloud hashes to the hashes from the current run. Failed tasks can be given as an argument to prevent luisy from setting hashes that have not been uploaded. In addition, if upload_tasks is True, this method uploads all the tasks that have outdated hash entries as well.

Parameters:

failed_tasks (list) – Failed tasks in luigi run

synchronize_local_hashes(failed_tasks)[source]

This function updates all local hashes to the hashes from the current run. Failed tasks can be given as an argument to prevent luisy from setting hashes that have not been executed.

Parameters:

failed_tasks (list) – Failed tasks in luigi run

class luisy.hashes.TaskHashEntry(hash_new, hash_local=None, hash_cloud=None, task=None, filename=None)[source]

Bases: object

This class manages all available hash entries in a luisy run. Each luisy task has its own hash depending on the code version, so in one luisy run a task can have multiple hashes:

  • the hash in the local .luisy.hash file

  • the hash in the cloud .luisy.hash file

  • the hash that was computed in the current run

Most of the methods of this class do comparisons between those hashes to give luisy an idea of what needs to be executed, downloaded, uploaded and which hashes need to be refreshed in .luisy.hash file online or locally.

Parameters:
  • hash_new (str) – hash of current luisy run

  • hash_local (str or None) – hash of local ‘luisy.hash’ file (if available)

  • hash_cloud (str or None) – hash of cloud ‘luisy.hash’ file (if available)

  • task (luisy.Task) – instance of task which all the hashes belong to

  • filename (str) – output filename of task (key inside luisy.hash file)

Note

Can this be moved inside a Task instance?

has_outdated_cloud_hash()[source]
Returns:

if Task has a hash saved in the cloud and this is outdated

Return type:

bool

has_outdated_local_hash()[source]
has_parquet_output()[source]
in_cloud()[source]
Returns:

if Task has a hash saved in the cloud

Return type:

bool

in_local()[source]
is_downloadable()[source]
is_external_task()[source]
needs_local_execution()[source]
needs_to_be_downloaded()[source]
luisy.hashes.compute_hashes(tasks, requirements_path=None)[source]

Computes the hashes for all tasks

Parameters:
  • tasks (dict) – Dict where the values are luisy.Task objects and the keys are the paths to their output.

  • requirements_path (str) – Path to the requirements.txt. Defaults to None and will be read off the config.

Returns:

Dict where the keys are the outfiles of the tasks handed in and the values are the hashes.

Return type:

dict

Todo

Can this be moved to TaskHashEntry?

luisy.hashes.get_upstream_tasks(task)[source]

Computes all upstream tasks of the given task by enumerating the computational DAG.

Parameters:

task (luisy.base.Task) – A luisy task object

Returns:

Dict where the values are the upstream tasks and the keys are the respective output files.

Return type:

dict
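
A brief sketch, assuming MyTask is the hypothetical root task of a pipeline:

from luisy.hashes import get_upstream_tasks

# MyTask is a hypothetical luisy task.
upstream = get_upstream_tasks(MyTask())
for outfile, upstream_task in upstream.items():
    print(outfile, upstream_task)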

luisy.helpers module

class luisy.helpers.RegexParam(name, default, regex_placeholder='[\\S]*')[source]

Bases: object

Class to hold information about a Parameter, which the RegexTaskPattern class uses to create matching regex queries.

Parameters:
  • name (str) – Parameter name in luisy.Task

  • default (object) – The parameter's original default value

  • regex_placeholder (str) – regex string which matches all possible parameter values.

property placeholder

This creates a unique placeholder, which is used for finding the location of the parameter inside a string, e.g. a filepath.

Returns:

parameter surrounded with “*”

Return type:

str

class luisy.helpers.RegexTaskPattern(task_cls, regex_placeholder='[\\S]*')[source]

Bases: object

This class can be used to find instances of a task on disk. It uses a regex to search the user's filesystem. Each param is then replaced with a placeholder to build the regex from it.

Parameters:
  • task_cls (Type[luisy.Task]) – A class object of type luisy.Task. The given task should use the luisy.decorators.auto_filename() decorator to work properly.

  • regex_placeholder (str) – This is the regex which is inserted into the filename to create a matching regex, which then searches the root directory.

Note

Only works when using @auto_filename

build_re_splitters()[source]

The splitters are needed to extract the parameters of a file that matches the regex.

Returns:

Keys are parameter names, values are a list of splits of the filename. This allows us to separate the value from the filename.

Return type:

dict

build_regex()[source]

Build a regex string from the filepath. This replaces the parameter values of the task with a regex pattern that matches these values.

Returns:

Regex pattern that can be used to search for files of that type

Return type:

str

build_root_directory()[source]

Finds “highest” possible path that can be used as a root directory.

Returns:

Root directory where the regex search is applied on the files inside it.

Return type:

str

get_available_files()[source]

Scans all files in the class's root_directory and then filters them by the regex_pattern of this class.

Returns:

List of files which match the regexpattern and are in the root directory

Return type:

list

get_available_params()[source]

Searches for files in the root directory matching the regex and extracts the parameters out of the filenames using the regex splits.

Returns:

Keys are the file paths of the files found, values are a dict with param_name as key and param_value as value.

Return type:

dict

set_filename_pattern()[source]

This creates a filename pattern with parameter values filled by RegexParam.placeholder()
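
A hedged usage sketch, assuming MyTask is a hypothetical task decorated with @auto_filename as required above:

from luisy.helpers import RegexTaskPattern

pattern = RegexTaskPattern(MyTask)       # MyTask is a hypothetical task class
files = pattern.get_available_files()    # files matching the generated regex
params = pattern.get_available_params()  # dict: filepath -> {param_name: value}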

luisy.helpers.add_filter_info(filter_func)[source]

This decorator for class methods logs the change of a pd.DataFrame's shape before (and, if the function returns a pd.DataFrame, after) filtering the df. This gives the user additional information about which function filtered how many rows / cols. It can easily be applied to any task method like this:

@add_filter_info
def filter_func(df):
    return df.drop(["my_most_disliked_column"], axis=1)

Parameters:

filter_func (cls.method) – method that takes pd.DataFrame as first argument and filters it by some operations

Returns:

Wrapper around filter_func that adds additional information about the filtered pd.DataFrame.

luisy.helpers.get_all_reqs(task, visited=None)[source]
luisy.helpers.get_df_from_parquet_dir(path)[source]

Reads all Parquet files from a given directory and parses them into a pandas dataframe

Parameters:

path (str) – path where parquet files are stored

Returns:

dataframe with all the data from parquet files

Return type:

pd.DataFrame

luisy.helpers.get_start_date(week)[source]

Extracts the first day of the week.

Parameters:

week (luigi.date_interval.Week or string) – Week that is compatible with luigi or string of the form 2021-W12

Returns:

The first day of the week

Return type:

datetime.date

luisy.helpers.get_stop_date(week)[source]

Extracts the last day of the week.

Parameters:

week (luigi.date_interval.Week or string) – Week that is compatible with luigi or string of the form 2021-W12

Returns:

The last day of the week

Return type:

datetime.date
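
For example, using the week string format mentioned above:

from luisy.helpers import get_start_date, get_stop_date

start = get_start_date('2021-W12')  # first day of the week as datetime.date
stop = get_stop_date('2021-W12')    # last day of the week as datetime.date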

luisy.helpers.get_string_repr(obj)[source]

Computes a string representation of the given object.

Parameters:

obj (object) – An arbitrary python object

Returns:

A string representation

Return type:

str

luisy.helpers.remove_parent_directories(lst_with_substring_elements)[source]

Removes elements from a list that are substrings of other items in that list. E.g. for ['/mnt/d', '/mnt/d/a', '/mnt/d/b'], '/mnt/d' is removed from the list.

Parameters:

lst_with_substring_elements (list) – List from which the substring elements are removed

Returns:

Clean list without substrings of other elements, sorted by character length

Return type:

List

luisy.helpers.wrap_in_brackets(string)[source]

luisy.luigi_interface module

This module contains all touch points with the luigi package. We had to adapt the method run_luigi because we need to handle the system exit ourselves.

The current logic of luisy looks like this:

  • Do some luisy stuff, like hashing every Task in the new DAG built up from the given root task

  • Run luigi

  • Do some luisy stuff, e.g. save hashes.

class luisy.luigi_interface.COLORS[source]

Bases: object

BLUE = '\x1b[94m'
END = '\x1b[0m'
GREEN = '\x1b[92m'
RED = '\x1b[91m'
class luisy.luigi_interface.CmdPrinter(hash_synchronizer)[source]

Bases: object

get_user_permission(message=None)[source]

Asks the user if the change applied is alright. These checks can be skipped if the parameter no_ask is set.

print_cloud_hash_changes()[source]
print_hash_changes()[source]
print_new_hashes()[source]
print_opening(message=None)[source]
print_sources()[source]
print_tasks(text, hash_entries=None)[source]
print_uploads()[source]
class luisy.luigi_interface.Coordinator(args_luisy, root_task=None)[source]

Bases: object

Parent class to handle different luisy modes. The basic coordinator does hash computation of a given root task and reads hashes that are found on the local filesystem in .luisy.hash file.

Parameters:
initialize_hash_synchronization()[source]
initialize_printer()[source]
print_approvals()[source]
run()[source]
run_coordinator()[source]
class luisy.luigi_interface.HashUpdateMode(args_luisy, root_task=None)[source]

Bases: Coordinator

The HashUpdateMode is started when the user passes --hash-update to luisy. Instead of rerunning all tasks whose hash changed in this run, this mode only updates the hashes (which already exist) in the .luisy.hash file and does not run the tasks again. This feature can be useful when refactoring your luisy pipeline. It is also possible to update the hashes in the cloud when --upload is passed to luisy.

print_approvals()[source]
run_coordinator()[source]
class luisy.luigi_interface.PipelineMode(args_luisy, args_luigi, root_task=None, run_luigi_cli=True)[source]

Bases: Coordinator

This class manages the communication between luisy and luigi functionalities.

Parameters:
finalize_luigi_run()[source]
print_approvals()[source]
run_coordinator()[source]
run_luigi()[source]
luisy.luigi_interface.get_root_task(args_luigi)[source]
luisy.luigi_interface.run_luigi(argv)[source]

Run luigi with command line parsing. This function is taken from luigi.retcodes.run_with_retcodes(). We need to overwrite this, because luigi calls sys.exit(); luisy should keep on running to do stuff after luigi completed.

Parameters:

argv – Should (conceptually) be sys.argv[1:]

luisy.targets module

class luisy.targets.AzureBlobStorageTarget(outdir=None, endpoint=None, directory=None, inferschema=False, file_format='parquet')[source]

Bases: SparkTarget

property blob_uri
exists()[source]

Checks whether the file exists in Azure Blob Storage

file_ending = ''
make_dir(path)[source]
property path
read()[source]

Read object from Azure Blob

remove()[source]

We do not remove files from Azure Blob Storage; we always overwrite them.

write(df)[source]

Write a PySpark DataFrame to Azure Blob Storage.

Parameters:

df (pyspark.DataFrame) – DataFrame that is to be stored in Azure Blob Storage

class luisy.targets.CSVTarget(path, **kwargs)[source]

Bases: LocalTarget

file_ending = 'csv'
read()[source]
write(df)[source]
class luisy.targets.CloudTarget(path, **kwargs)[source]

Bases: LuisyTarget

class luisy.targets.DeltaTableTarget(outdir=None, schema='schema', catalog='catalog', table_name=None)[source]

Bases: SparkTarget

exists()[source]

Checks whether the Deltatable exists.

Note

Ideally, we would call self.spark.catalog.tableExists(self.table_uri) to check whether the table exists, but this always returns False.

file_ending = 'DeltaTable'
make_dir(path)[source]
property path
read()[source]
remove()[source]

Remove the resource at the path specified by this FileSystemTarget.

This method is implemented by using fs.

property table_uri
write(df: DataFrame)[source]
Parameters:

df (pyspark.sql.DataFrame) – Dataframe that should be written to delta table

class luisy.targets.DirectoryTarget(path, **kwargs)[source]

Bases: LocalTarget

Target to read directories. The read_file method has to be filled by the user and passed to target by decorating it.

file_ending = ''
is_valid_file(filename)[source]

Checks if file matches given regex

Parameters:

filename (str) – File that needs to be checked against the regex

Returns:

flag if file matches regex

Return type:

bool

read()[source]
read_file(file)[source]
regex_pattern = None
write()[source]
class luisy.targets.FeatherTarget(path, **kwargs)[source]

Bases: LocalTarget

file_ending = 'feather'
read()[source]
write(df)[source]
class luisy.targets.HDFTarget(path, **kwargs)[source]

Bases: LocalTarget

file_ending = 'hdf'
read()[source]
write(df)[source]
class luisy.targets.JSONTarget(path, **kwargs)[source]

Bases: LocalTarget

file_ending = 'json'
read()[source]
requires_pandas = False
write(dct)[source]
class luisy.targets.LocalTarget(path, **kwargs)[source]

Bases: LuisyTarget

exists()[source]

Checks if the file exists. If download is set to True and the file is not available locally, it is checked whether the file is available in the cloud; if it is, it is downloaded.

file_ending = None
is_folder()[source]

Checks if the output is a folder.

Note

This may have to be improved in the future, as we (silently) assume that any file has a file extension.

make_dir(path)[source]

Creates the local path path.

remove()[source]

Remove the resource at the path specified by this FileSystemTarget.

This method is implemented by using fs.

requires_pandas = True
class luisy.targets.LuisyTarget(path=None, format=None, is_tmp=False)[source]

Bases: LocalTarget

Parameters:
  • path (str) – Path to the file

  • working_dir (str) – The working dir

  • download (bool) – Whether the file should be downloaded from the cloud if not available locally

exists()[source]

Returns True if the path for this FileSystemTarget exists; False otherwise.

This method is implemented by using fs.

property fs

Wrapper for access to file system operations.

Work in progress - add things as needed.

read()[source]
remove()[source]

Remove the resource at the path specified by this FileSystemTarget.

This method is implemented by using fs.

requires_pandas = False
write(obj)[source]
class luisy.targets.ParquetDirTarget(path, **kwargs)[source]

Bases: LocalTarget

file_ending = ''
read()[source]
requires_pandas = False
write(df)[source]
class luisy.targets.PickleTarget(path, **kwargs)[source]

Bases: LocalTarget

file_ending = 'pkl'
read()[source]
write(obj)[source]
class luisy.targets.SparkTarget(path, **kwargs)[source]

Bases: CloudTarget

This abstract class is for targets working with Spark instances.

property spark
class luisy.targets.XLSXTarget(path, **kwargs)[source]

Bases: LocalTarget

file_ending = 'xlsx'
read()[source]
write(df)[source]

luisy.testing module

class luisy.testing.DFMock(*args, **kw)[source]

Bases: Mock

mode(*args)[source]
saveAsTable(table_name)[source]
toPandas()[source]
property write
class luisy.testing.LuisyTestCase(methodName='runTest')[source]

Bases: TestCase

assertFail(task, existing_outputs=None)[source]
assertMissing(task, existing_outputs=None)[source]
assertSuccess(task, existing_outputs=None)[source]
get_execution_summary(task, existing_outputs=None)[source]
run_pipeline(task, existing_outputs=None, return_store=False)[source]

Method that can be used to run luigi pipelines. You can pass existing outputs which mock a specific task's output. This helps the user test without mocking their task's output manually or creating a temporary directory.

Parameters:
  • task (luigi.Task) – Task that should be run

  • existing_outputs (list) – List of tuples, where the first element of each tuple is the Task object whose output should be mocked and the second element is the content of the mocked task

  • return_store (bool) – If true, the whole store gets returned as a dict. The store contains all the outputs after the execution of the pipeline, where the tasks are the keys and the values are their outputs. If false, only the output of the given task is returned.

Returns:

Depending on the return_store value, a dict with the output of all tasks or just the output of the root_task

Return type:

dict or object
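
A hedged test sketch; LoadData and CleanData are hypothetical tasks, and the tuple in existing_outputs mocks LoadData's output as described above:

import pandas as pd

from luisy.testing import LuisyTestCase

class TestCleanData(LuisyTestCase):

    def test_clean_data(self):
        raw = pd.DataFrame({'a': [1, 2, 3]})   # content of the mocked output
        outputs = [(LoadData(), raw)]          # LoadData is a hypothetical task
        self.assertSuccess(CleanData(), existing_outputs=outputs)
        result = self.run_pipeline(CleanData(), existing_outputs=outputs)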

class luisy.testing.SparkMock(*args, **kw)[source]

Bases: Mock

data = {}
sql(qry)[source]
table(table_uri)[source]
property tables
luisy.testing.create_file(file, val)[source]
luisy.testing.create_testing_config(working_dir, storage_key='', container_name='', account_name='', mock_spark=True, **kwargs)[source]

Creates a Config object for testing purposes. Should be used when working with temporary directories.

Parameters:
  • working_dir (str) – working dir for testing luisy tasks (mostly tmpdirs)

  • storage_key (str) – storage_key that can be set if no environ variable can be used in tests.

  • account_name (str) – Account name that can be set if no environ variable can be used in tests.

  • container_name (str) – Container name that can be set if no environ variable can be used in tests.

  • **kwargs – params like download or upload
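
A minimal sketch, typically combined with a temporary directory; download and upload are passed via **kwargs as noted above:

import tempfile

from luisy.testing import create_testing_config

with tempfile.TemporaryDirectory() as tmpdir:
    create_testing_config(working_dir=tmpdir, download=False, upload=False)
    # ... run or test luisy tasks against the temporary working dir ...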

luisy.testing.debug_run(tasks, return_summary=True)[source]

This is a helper method which allows running pipelines in tests while bypassing the CLI. It also returns some useful information from the build in the form of a dict.

Parameters:

tasks (list or luigi.Task) – List of Tasks that should be run or a single luigi.Task

Returns:

Dictionary with information about the executed tasks, e.g. dict['completed'] contains the tasks that have been completed in this run and dict['already_done'] contains tasks that had already been completed before running luigi

Return type:

dict
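
A hedged usage sketch, assuming MyTask is a hypothetical task from the user's project:

from luisy.testing import debug_run

summary = debug_run([MyTask()])    # MyTask is a hypothetical task
print(summary['completed'])        # tasks completed in this run
print(summary['already_done'])     # tasks that were already complete before the run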

luisy.testing.get_all_dependencies(task)[source]
luisy.testing.mock_target(task, output_store)[source]

luisy.visualize module

luisy.visualize.add_children(task, graph, namefunc=<function create_label>, depth=1, depth_limit=10, unique_children=False)[source]

Recursive function that adds the children of task to a graph object. Make sure that namefunc returns unique strings for all tasks that will be included in the tree.

Parameters:
  • task (luisy.tasks.base.Task) – The task

  • graph (networkx.DiGraph) – The directed acyclic graph

  • namefunc (callable) – Function that creates a string out of a task

  • depth (int) – Depth of the current node

  • depth_limit (int) – Max allowed depth of the tree

  • unique_children (bool) – If true, add only a subset of unique types of children for each task

luisy.visualize.create_label(task, excl_list=(), with_id=False)[source]

Make a string label out of a luigi task. Make sure the string is unique!

Parameters:
  • task (luisy.tasks.base.Task) – The task the label should be created for

  • excl_list (list) – List of parameter kwargs that are not included in the label

  • with_id (bool) – If False, do not include the hash of the task

Returns:

Label to be used for the node

Return type:

str

luisy.visualize.make_dependency_graph(task, excl_list=(), namefunc=None, unique_children=True, depth_limit=10)[source]
Create a pydot graph out of a task's dependencies.

Parameters:
  • task (luisy.tasks.base.Task) – The task

  • excl_list (list) – Parameter forwarded to create_label if namefunc is None

  • namefunc (callable) – Function that creates a string out of a task

  • unique_children (bool) – If true, add only a subset of unique types of children

  • depth_limit (int) – Max allowed depth of the tree

luisy.visualize.simple_task_string(task)[source]

Create simple string label out of task.

luisy.visualize.unique_types(tasks)[source]

Given a list of objects, return a subset that has each type only once.

luisy.visualize.visualize_task(task, ax=None, unique_children=True, parameters_to_exclude=())[source]

Visualizes the dependencies of a luisy.Task.

Parameters:
  • task (luisy.tasks.base.Task) – The task whose dependencies should be visualized.

  • ax (matplotlib.axis.Axis) – A matplotlib axis to draw in

  • unique_children (bool) – If true, add only subset of unique types of children

  • parameters_to_exclude (list) – List of names of luigi parameters to exclude in the visualization.
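
A hedged sketch using matplotlib; MyTask and the excluded parameter name are illustrative:

import matplotlib.pyplot as plt

from luisy.visualize import visualize_task

fig, ax = plt.subplots(figsize=(10, 6))
visualize_task(MyTask(), ax=ax, parameters_to_exclude=['debug'])  # MyTask is hypothetical
plt.show()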

Module contents