Исходный код mldev_dvc.dvc_stage

# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://gitlab.com/mlrep/mldev/-/blob/master/NOTICE.md

"""
DVC integration module
======================

This module allows to use MLDev together with DVC.

See ``Stage`` class that implements a stage which uses DVC to set up dependencies.

"""

from mldev.experiment import *
from mldev.yaml_loader import stage_context
from mldev import *
from mldev.utils import *


[документация]@experiment_tag() class Stage(object): """ Main class for DVC pipeline stages. Converts ``script`` to a single command line and creates a stage in DVC in ``prepare``. Reproduces the stage via DVC ``repro`` in ``run``. Adds ``inputs`` as input dependencies and ``outputs`` as results and places them under version control. If ``MLDEV_NO_COMMIT`` feature is not set also makes a git commit with DVC's files. Accepts the following parameters: :param inputs: lists files and folders that this stage depends upon :param outputs: lists files and folders that this stage produces and that will be added to version control :param params: parameters for the commands being invoked :param env: additional environmental variables to pass to commands :param script: a list of commands to invoke """ def __init__(self, name="", params={}, env={}, inputs={}, outputs={}, script=[]): super().__init__() self.prepared = False self.name = name self.params = params self.env = env self.inputs = inputs self.outputs = outputs self.script = script def __repr__(self): with stage_context(self): return str(self.__dict__) def __call__(self, name, *args, experiment={}, **kwargs): with stage_context(self): stage_name = name if name else self.name logger.debug(f"{self.__class__.__name__}({stage_name}).__call__()") if stage_name: self.run(stage_name) dvc_verbose = "" git_verbose = "" if MLDevSettings().get_value("LOG_LEVEL") <= LOG_DEBUG: dvc_verbose = "-v" git_verbose = "export GIT_TRACE=true && " if not MLDevSettings().is_feature("MLDEV_NOCOMMIT"): # TODO dvc has a bug - it ignores -q parameter # capture and discard the output if log level is high capture = not (MLDevSettings().get_value("LOG_LEVEL") <= LOG_INFO) exec_command(f"dvc push {dvc_verbose}", capture=capture) exec_command(f'{git_verbose} git add ./dvc.lock ./dvc.yaml && git commit -m ' '"(mldev) data config lock" || true')
[документация] def prepare(self, stage_name): if stage_name: with stage_context(self): no_commit = "" dvc_verbose = "-q" if MLDevSettings().is_feature("MLDEV_NOCOMMIT"): no_commit = "--no-commit" if MLDevSettings().get_value("LOG_LEVEL") <= LOG_DEBUG: dvc_verbose = "-v" script = prepare_experiment_command(' && '.join([str(s) for s in self.script]), env=self.env) s = f"dvc {dvc_verbose} run {no_commit} --no-exec -f -n {stage_name}{self._build_dependency_string()} " +\ '"{}"'.format(shell_escape(script, shell_symbols='\"')) logger.info(f"Prepare ({stage_name}): " + s) # TODO dvc has a bug - it ignores -q parameter # capture and discard the output if log level is high capture = not (MLDevSettings().get_value("LOG_LEVEL") <= LOG_INFO) exec_command(s, capture=capture) self.prepared = True
[документация] def run(self, stage_name): with stage_context(self): run_cache = "" no_commit = "" dvc_verbose = "-q" if MLDevSettings().get_value("LOG_LEVEL") < LOG_INFO: dvc_verbose = "-v" if MLDevSettings().is_feature("FORCE_RUN"): run_cache = "--no-run-cache --force" if MLDevSettings().is_feature("MLDEV_NOCOMMIT"): no_commit = "--no-commit" s = f"dvc {dvc_verbose} repro {run_cache} {no_commit} {stage_name}" logger.info(f"Run ({stage_name}): " + s) # TODO dvc has a bug - it ignores -q parameter # capture and discard the output if log level is high # capture = not (MLDevSettings().get_value("LOG_LEVEL") <= LOG_INFO) capture = False # contains user program output exec_command(s, capture=capture)
def _add_file_deps(self, deps, inputs=True, create_deps=True): result = "" key = "-d" if inputs else "-o" for file_path in deps: if isinstance(file_path, FilePath): for file in file_path.get_files(start=os.path.curdir): result += f" {key} {shell_quote(file)}" if not os.path.exists(file) and create_deps: is_dir = file.endswith(os.path.sep) touch_file(file, is_dir=is_dir) if len(file_path.files) == 0: result += f" {key} {shell_quote(file_path.get_path(start=os.path.curdir))}" if not os.path.exists(str(file_path.path)) and create_deps: touch_file(str(file_path.path), is_dir=True) else: result += f" {key} {shell_quote(file_path)}" return result def _build_dependency_string(self): input_deps = "" output_deps = "" if self.inputs: input_deps += self._add_file_deps(self.inputs, inputs=True) if self.outputs: output_deps += self._add_file_deps(self.outputs, inputs=False) return " ".join([input_deps, output_deps])