@dataclasses.dataclass(frozen=True)
class NodeStatus:
    """Node status object.

    Parameters
    ----------
    remote : str, optional
        The repository remote, e.g. the URL of the git repository.
    rev : str, optional
        The revision of the repository, e.g. the git commit hash.
    run_count : int
        How often this Node has been run. Only incremented when the Node is
        restarted.
    state : NodeStatusEnum
        The state of the Node.
    lazy_evaluation : bool
        Whether to load fields lazily.
    tmp_path : pathlib.Path, optional
        The temporary path when using 'use_tmp_path'.
    node : Node, optional
        The Node object.
    plugins : dict[str, ZnTrackPlugin], optional
        Active plugins. In addition to the default plugins, MLFlow or AIM
        plugins will be added here.
    group : Group, optional
        The group of the Node.
    run_time : datetime.timedelta, optional
        The total run time of the Node.
    path : pathlib.Path
        The path to the directory where the ``zntrack.json`` file is located.
    lockfile : dict, optional
        Lockfile content for the Node. When set, it is written to
        ``node-meta.json`` by ``save_node_meta``.
    fs : AbstractFileSystem, optional
        File system used to access the Node's files, e.g. via
        ``node.state.fs.open``. Defaults to a ``LocalFileSystem``.

    Attributes
    ----------
    name : str
        The name of the Node.
    nwd : pathlib.Path
        The node working directory.
    restarted : bool
        Whether the Node was restarted and has been run at least once before.
    """

    remote: str | None = None
    rev: str | None = None
    run_count: int = 0
    state: NodeStatusEnum = NodeStatusEnum.CREATED
    lazy_evaluation: bool = True
    tmp_path: pathlib.Path | None = None
    node: "Node|None" = dataclasses.field(
        default=None, repr=False, compare=False, hash=False
    )
    plugins: PLUGIN_DICT = dataclasses.field(
        default_factory=dict, compare=False, repr=False
    )
    group: Group | None = None
    run_time: datetime.timedelta | None = None
    path: pathlib.Path = dataclasses.field(default_factory=pathlib.Path)
    lockfile: dict | None = None
    fs: AbstractFileSystem | None = dataclasses.field(
        default_factory=LocalFileSystem, repr=False, compare=False, hash=False
    )
    # TODO: move node name and nwd to here as well

    @property
    def name(self) -> str:
        """The name of the Node (taken from ``node.name``)."""
        return self.node.name

    @property
    def nwd(self) -> pathlib.Path:
        """The node working directory.

        If ``tmp_path`` is set it is returned directly; otherwise the
        directory is ``path / get_nwd(node)``.
        """
        if self.tmp_path is not None:
            return self.tmp_path
        return self.path / get_nwd(self.node)

    @property
    def dvc_fs(self) -> dvc.api.DVCFileSystem:
        """Get the file system of the Node.

        A new ``DVCFileSystem`` for ``remote``/``rev`` is created on every
        access.
        """
        return dvc.api.DVCFileSystem(
            url=self.remote,
            rev=self.rev,
        )

    @property
    def restarted(self) -> bool:
        """Whether the node was restarted (``run_count`` is greater than 1)."""
        return self.run_count > 1

    @contextlib.contextmanager
    def use_tmp_path(
        self, path: pathlib.Path | None = None
    ) -> t.Iterator[pathlib.Path]:
        """Load the data for ``*_path`` into a temporary directory.

        If you can not use ``node.state.fs.open`` you can use
        this as an alternative. This will load the data into
        a temporary directory and then delete it afterwards.
        The respective paths ``node.*_path`` will be replaced
        automatically inside the context manager.

        This is only set, if either ``remote`` or ``rev`` are set.
        Otherwise, the data will be loaded from the current directory.

        Parameters
        ----------
        path : pathlib.Path, optional
            Custom temporary directory. Not implemented yet; passing a
            value raises ``NotImplementedError``.

        Yields
        ------
        pathlib.Path
            The temporary directory. If neither ``remote`` nor ``rev`` is
            set, a warning is emitted and the current working directory is
            yielded instead.

        Examples
        --------
        >>> import zntrack
        >>> from pathlib import Path
        >>>
        >>> class MyNode(zntrack.Node):
        ...     outs_path: Path = zntrack.outs_path(zntrack.nwd / "file.txt")
        ...
        ...     def run(self):
        ...         self.outs_path.parent.mkdir(parents=True, exist_ok=True)
        ...         self.outs_path.write_text("Hello World!")
        ...
        ...     @property
        ...     def data(self):
        ...         with self.state.use_tmp_path():
        ...             with open(self.outs_path) as f:
        ...                 return f.read()
        ...
        >>> # build and run the graph and make multiple commits.
        >>> # the `use_tmp_path` ensures that the correct version
        >>> # of the file is loaded in the temporary directory and
        >>> # the `self.outs_path` is updated accordingly.
        >>>
        >>> zntrack.from_rev("MyNode", rev="HEAD").data
        >>> zntrack.from_remote("MyNode", rev="HEAD~1").data

        """
        if path is not None:
            raise NotImplementedError("Custom paths are not implemented yet.")

        # This feature is only required when the node is loaded,
        # not when it is saved/executed.
        if self.remote is None and self.rev is None:
            # stacklevel=3: this generator frame -> contextlib's __enter__
            # -> the caller's ``with`` statement.
            warnings.warn(
                "The temporary path is not used when neither remote or rev are set. "
                "Consider checking for `self.state.remote` and `self.state.rev` when "
                "using `with node.state.use_tmp_path(): ...` .",
                stacklevel=3,
            )
            yield pathlib.Path.cwd()
            return

        with tempfile.TemporaryDirectory() as tmpdir:
            # The NodeStatus itself is frozen, so the temporary path is
            # stored in the Node's state dict and removed again on exit.
            self.node.__dict__["state"]["tmp_path"] = pathlib.Path(tmpdir)
            try:
                yield pathlib.Path(tmpdir)
            finally:
                self.node.__dict__["state"].pop("tmp_path")

    def get_stage(self) -> dvc.stage.Stage:
        """Access to the internal dvc.repo api.

        Returns the first DVC stage collected for this Node's name. When
        neither ``rev`` nor ``remote`` is set, the stage is additionally
        saved (``allow_missing=True``, ``run_cache=False``).
        """
        # NOTE(review): raises StopIteration if no stage matches self.name.
        stage = next(iter(self.dvc_fs.repo.stage.collect(self.name)))
        if self.rev is None and self.remote is None:
            # If we want to look at the current workspace result, we need to
            # load all the information, not just dvc.yaml
            stage.save(allow_missing=True, run_cache=False)
        return stage

    def get_stage_lock(self) -> dict:
        """Access to the internal dvc.repo api.

        Serialize the stage from ``get_stage`` into its single-stage
        lockfile entry.
        """
        stage = self.get_stage()
        return dvc.stage.serialize.to_single_stage_lockfile(stage)

    def get_stage_hash(self, include_outs: bool = False) -> str:
        """Get the hash of the stage.

        Parameters
        ----------
        include_outs : bool
            If True, hash the complete stage lock entry. Otherwise only the
            ``cmd``, ``deps`` and ``params`` entries are hashed and every
            other entry (e.g. the outputs) is ignored.
        """
        stage_lock = self.get_stage_lock()
        if not include_outs:
            stage_lock = {
                key: value
                for key, value in stage_lock.items()
                if key in ("cmd", "deps", "params")
            }
        return dict_sha256(stage_lock)

    def to_dict(self) -> dict:
        """Convert the NodeStatus to a dictionary.

        The ``node`` entry is not included in the result.
        """
        # ``dataclasses.asdict`` recurses into dataclass values and
        # deep-copies everything else. The Node is itself a dataclass (see
        # ``get_field``), so ``asdict(self)`` would read and copy every Node
        # field only for it to be discarded. Drop the reference on a shallow
        # replica first; all other entries are produced exactly as before.
        content = dataclasses.asdict(dataclasses.replace(self, node=None))
        content.pop("node")
        return content

    def get_field(self, attribute: str) -> dataclasses.Field:
        """Return the dataclass field of the Node named ``attribute``.

        Raises
        ------
        AttributeError
            If the Node has no dataclass field with that name.
        """
        for field in dataclasses.fields(self.node):
            if field.name == attribute:
                return field
        raise AttributeError(f"Unable to locate '{attribute}' in {self.node}.")

    def add_run_time(self, run_time: datetime.timedelta) -> None:
        """Add the run time to the node."""
        # Stored in the Node's state dict because this dataclass is frozen.
        # Computed from ``self.run_time`` (like ``increment_run_count``) so a
        # state dict without a "run_time" key cannot raise ``KeyError``.
        previous = self.run_time
        self.node.__dict__["state"]["run_time"] = (
            run_time if previous is None else previous + run_time
        )

    def increment_run_count(self) -> None:
        """Store ``run_count + 1`` in the Node's state dict."""
        self.node.__dict__["state"]["run_count"] = self.run_count + 1

    def set_lockfile(self, lockfile: dict) -> None:
        """Set the lockfile for the node."""
        self.node.__dict__["state"]["lockfile"] = lockfile

    def save_node_meta(self) -> None:
        """Write ``node-meta.json`` into the node working directory.

        The file always contains the Node's ``uuid``, ``run_count`` and the
        installed ``zntrack`` version. ``run_time`` (in seconds) and
        ``lockfile`` are added when set. ``package_version`` is added when
        the Node's top-level module is an installed distribution.
        """
        node_meta_content = {
            "uuid": str(self.node.uuid),
            "run_count": self.run_count,
            "zntrack_version": importlib.metadata.version("zntrack"),
        }
        if self.run_time is not None:
            node_meta_content["run_time"] = self.run_time.total_seconds()
        if self.lockfile is not None:
            node_meta_content["lockfile"] = self.lockfile
        # Best effort: e.g. Nodes defined in ``__main__`` have no package.
        with contextlib.suppress(importlib.metadata.PackageNotFoundError):
            module = self.node.__module__.split(".")[0]
            node_meta_content["package_version"] = importlib.metadata.version(
                module
            )
        self.nwd.mkdir(parents=True, exist_ok=True)
        (self.nwd / "node-meta.json").write_text(
            json.dumps(node_meta_content, indent=2)
        )

    @property
    def changed(self) -> bool:
        """Whether DVC reports the stage as changed (checked under the repo lock)."""
        stage = self.get_stage()
        with stage.repo.lock:
            return stage.changed()