Source code for doot.workflow.actions.io.io

  1## base_action.py -*- mode: python -*-
  2# Imports:
  3from __future__ import annotations
  4
  5# ##-- stdlib imports
  6import datetime
  7import functools as ftz
  8import itertools as itz
  9import logging as logmod
 10import pathlib as pl
 11import re
 12import shutil
 13import time
 14import types
 15from time import sleep
 16
 17# ##-- end stdlib imports
 18
 19# ##-- 3rd party imports
 20import sh
 21from jgdv import Mixin, Proto
 22from jgdv.structs.dkey import DKey, DKeyed
 23
 24# ##-- end 3rd party imports
 25
 26# ##-- 1st party imports
 27import doot
 28from doot.errors import LocationError, TaskError, TaskFailed
 29from doot.mixins.path_manip import PathManip_m
 30
 31# ##-- end 1st party imports
 32
 33# ##-| Local
 34from ..._interface import ActionResponse_e
 35from .._action import DootBaseAction
 36from ..util.decorators import IOWriter
 37
 38# # End of Imports.
 39
 40# ##-- types
 41# isort: off
 42import abc
 43import collections.abc
 44from typing import TYPE_CHECKING, cast, assert_type, assert_never
 45from typing import Generic, NewType
 46# Protocols:
 47from typing import Protocol, runtime_checkable
 48# Typing Decorators:
 49from typing import no_type_check, final, override, overload
 50
 51if TYPE_CHECKING:
 52    from jgdv import Maybe
 53    from typing import Final
 54    from typing import ClassVar, Any, LiteralString
 55    from typing import Never, Self, Literal
 56    from typing import TypeGuard
 57    from collections.abc import Iterable, Iterator, Callable, Generator
 58    from collections.abc import Sequence, Mapping, MutableMapping, Hashable
 59
 60##--|
 61
 62# isort: on
 63# ##-- end types
 64
 65##-- logging
 66logging = logmod.getLogger(__name__)
 67##-- end logging
 68
@Mixin(PathManip_m, allow_inheritance=True)
class IOBase(DootBaseAction):
    """
      Shared base class for the IO actions defined in this module:
      DootBaseAction, with PathManip_m applied through the Mixin decorator.

      NOTE(review): the subclasses call `self._is_write_protected(...)`,
      which is presumably supplied by PathManip_m -- confirm.
    """
    pass
72
class AppendAction(IOBase):
    """
      Append data from the state onto the end of a file.

      Keys:
        args : each is turned into a DKey and expanded against spec/state.
               Anything that expands to None is skipped, non-strings are str()-ed.
        sep  : text written *before* every appended value.
               Not given (None) -> AppendAction.sep.  False -> no separator.
        to   : the file to append to.

      Raises:
        LocationError : if the destination is write protected.
    """
    sep = "\n--------------------\n"

    @DKeyed.args
    @DKeyed.types("sep", fallback=None)
    @DKeyed.paths("to")
    def __call__(self, spec, state, args, sep, to):
        # None == "not provided", False == "explicitly disabled".
        # Any other value is used exactly as given.
        if sep is None:
            sep = AppendAction.sep
        elif sep is False:
            sep = None

        # Build every key, then expand every key, before the file is touched.
        keys   = [DKey(name) for name in args]
        values = [key.expand(spec, state, fallback=None) for key in keys]

        if self._is_write_protected(to):
            raise LocationError("Tried to write a protected location", to)

        with to.open('a') as handle:
            for value in values:
                if value is None:
                    continue

                text = value if isinstance(value, str) else str(value)
                doot.report.wf.act("Append", "%s chars to %s" % (len(text), to))
                if sep:
                    handle.write(sep)

                handle.write(text)
116
[docs] 117class WriteAction(IOBase): 118 """ 119 Writes data from the state to a file, accessed through the 120 doot.locs object 121 122 'from' is *not* expanded. 123 """ 124 125 @DKeyed.redirects("from") 126 @DKeyed.paths("to") 127 def __call__(self, spec, state, _from, to) -> dict|bool|None: 128 data = state[_from] 129 match to: 130 case None: 131 raise LocationError("Can't write to a null location") 132 case pl.Path() as x if self._is_write_protected(x): 133 raise LocationError("Tried to write a protected location", x) 134 case pl.Path() as x: 135 loc = x 136 case x: 137 raise TypeError("Didn't get an appropriate type for a location", x) 138 139 match data: 140 case None: 141 doot.report.wf.act("Write", "Nothing to Write") 142 case _ if not bool(data): 143 doot.report.wf.act("Write", "Nothing to Write") 144 case [*xs]: 145 text = "\n".join(xs) 146 loc.write_text(text) 147 doot.report.wf.act("Write", "%s chars to %s" % (len(text), loc)) 148 case bytes(): 149 doot.report.wf.act("Write", "%s bytes to %s" % (len(data), loc)) 150 loc.write_bytes(data) 151 case str(): 152 doot.report.wf.act("Write", "%s chars to %s" % (len(data), loc)) 153 loc.write_text(data) 154 case _: 155 as_str = str(data) 156 doot.report.wf.act("Write", "%s chars to %s" % (len(as_str), loc)) 157 loc.write_text(as_str) 158 159 return None
160
class ReadAction(IOBase):
    """
      Read a file and return its content for insertion into the state.

      Keys:
        from     : the path to read.
        update_  : redirected to the state key the content is returned under.
        as_bytes : when truthy, the file is read in binary mode and `type` is ignored.
        type     : "read" (default) for one string, "lines" for a list of lines.

      Raises:
        TypeError : for any other `type` value when reading text.
    """

    @DKeyed.paths("from")
    @DKeyed.redirects("update_")
    @DKeyed.types("as_bytes", fallback=False)
    @DKeyed.types("type", check=str, fallback="read")
    def __call__(self, spec, state, _from, _update, as_bytes, _type) -> dict|bool|None:
        doot.report.wf.act("Read", "%s into %s" % (_from, _update))

        if as_bytes:
            with _from.open("rb") as handle:
                return { _update : handle.read() }

        # Text mode: the file is opened before the read type is checked.
        with _from.open("r") as handle:
            if _type == "read":
                content = handle.read()
            elif _type == "lines":
                content = handle.readlines()
            else:
                raise TypeError("Unknown read type", _type)

        return { _update : content }
188
class CopyAction(IOBase):
    """
      Copy one or more files to a destination.

      Keys:
        from : a string, a path, or a list of them. Each is expanded to a path.
               A '*' in the final path component is treated as a glob
               over that component's parent directory.
        to   : the destination path. Has to be a directory when there is
               more than one source, or when a source is a glob.

      Raises:
        LocationError          : the destination is write protected.
        doot.errors.ActionError: bad source type, missing source, or multiple
                                 sources with a non-directory destination.
        TypeError              : a source did not expand to a path.
    """

    @DKeyed.types("from", check=str|pl.Path|list)
    @DKeyed.paths("to")
    def __call__(self, spec, state, _from, to) -> dict|bool|None:
        if self._is_write_protected(to):
            raise LocationError("Tried to write a protected location", to)

        # Normalise the sources to a list before expanding them
        if isinstance(_from, (str, pl.Path)):
            raw_sources = [_from]
        elif isinstance(_from, list):
            raw_sources = _from
        else:
            raise doot.errors.ActionError("Unrecognized type for copy sources", _from)

        sources = [DKey[pl.Path](src, fallback=pl.Path(src)).expand(spec, state) for src in raw_sources]

        if 1 < len(sources) and not to.is_dir():
            raise doot.errors.ActionError("Tried to copy multiple files to a non-directory")

        for source in sources:
            if not isinstance(source, pl.Path):
                raise TypeError("Unexpected Type attempted to be copied")

            if "*" not in source.name:
                # A single, literal file
                self._validate_source(source)
                shutil.copy2(source, to)
                continue

            # A glob, which may match many files
            if not to.is_dir():
                raise doot.errors.ActionError("Tried to copy multiple files to a non-directory")
            for found in source.parent.glob(source.name):
                self._validate_source(found)
                shutil.copy2(found, to)

        return None

    def _validate_source(self, source:pl.Path) -> None:
        """ Raise an ActionError unless `source` is a path that exists. """
        if not isinstance(source, pl.Path):
            raise doot.errors.ActionError("CopyAction expected a path", source)
        if not source.exists():
            raise doot.errors.ActionError("Tried to copy a file that doesn't exist", source)
240
class MoveAction(IOBase):
    """
      Move a single file to a new location.

      Keys:
        from  : the file to move. Directories are rejected.
        to    : the destination path.
        force : when True, an existing destination is overwritten.
                Defaults to False.

      Raises:
        LocationError           : the destination is write protected.
        doot.errors.ActionError : the source is missing, the source is a directory,
                                  or the destination exists and `force` is not set.

      Returns:
        None, the state is not updated.
    """

    @DKeyed.paths("from", "to")
    @DKeyed.types("force", check=bool, fallback=False)
    def __call__(self, spec, state, _from, to, force) -> dict|bool|None:
        source   = _from
        dest_loc = to

        if self._is_write_protected(dest_loc):
            raise LocationError("Tried to write a protected location", dest_loc)
        if not source.exists():
            raise doot.errors.ActionError("Tried to move a file that doesn't exist", source)
        if dest_loc.exists() and not force:
            raise doot.errors.ActionError("Tried to move a file that already exists at the destination", dest_loc)
        if source.is_dir():
            # NOTE(review): the message talks about "multiple files", but this
            # guard fires whenever the source is a directory.
            raise doot.errors.ActionError("Tried to move multiple files to a non-directory", source)

        # Path.replace overwrites an existing target on every platform.
        # Path.rename raises FileExistsError on Windows when the target exists,
        # which made `force=True` fail there. On POSIX the two are equivalent.
        source.replace(dest_loc)
        return None
264
class DeleteAction(IOBase):
    """
      Delete the files / directories named in spec.args.

      Keys:
        recursive : when True, directories are removed with their contents.
        lax       : passed to unlink as `missing_ok`.
      Both default to False.

      Targets that do not exist are reported and skipped.

      Raises:
        TypeError     : an arg did not expand to a path.
        LocationError : a target is write protected.
    """

    @DKeyed.types("recursive", "lax", check=bool, fallback=False)
    def __call__(self, spec, state, recursive, lax):
        for arg in spec.args:
            loc = DKey[pl.Path](arg).expand(spec, state)
            if not isinstance(loc, pl.Path):
                raise TypeError(type(loc))

            if self._is_write_protected(loc):
                raise LocationError("Tried to write a protected location", loc)

            if not loc.exists():
                doot.report.wf.act("Delete", "Does Not Exist: %s" % loc)
                continue

            if not (loc.is_dir() and recursive):
                # Plain files, and directories when `recursive` is off,
                # all go through unlink.
                doot.report.wf.act("Delete", "File: %s" % loc)
                loc.unlink(missing_ok=lax)
                continue

            doot.report.wf.act("Delete", "Directory: %s" % loc)
            shutil.rmtree(loc)
292
class BackupAction(IOBase):
    """
      Copy a file to a destination, but only when the destination is missing,
      or the source is newer than it by more than `tolerance`.

      Keys:
        from      : the file to back up.
        to        : where to copy it to.
        tolerance : the smallest modification time difference, in nanoseconds
                    (it is compared against st_mtime_ns values), that counts
                    as a change. Defaults to 10_000_000.

      Raises:
        LocationError : the destination is write protected.

      Returns:
        None, the state is not updated.
    """

    @DKeyed.paths("from", "to")
    @DKeyed.types("tolerance", check=int, fallback=10_000_000)
    @DKeyed.taskname
    def __call__(self, spec, state, _from, to, tolerance, _name) -> dict|bool|None:
        if self._is_write_protected(to):
            raise LocationError("Tried to write a protected location", to)

        # ExFat FS has lower resolution timestamps,
        # so small differences are ignored using the tolerance.
        src_mtime  = _from.stat().st_mtime_ns
        dest_mtime = to.stat().st_mtime_ns if to.exists() else 1

        source_not_newer = src_mtime <= dest_mtime
        within_tolerance = abs(src_mtime - dest_mtime) <= tolerance

        if to.exists() and (source_not_newer or within_tolerance):
            # The destination is already up to date
            return None

        doot.report.wf.act("Backup", "%s -> %s" % (_from, to))
        shutil.copy2(_from, to)
        return None
327
class EnsureDirectory(IOBase):
    """
      Make sure every directory passed in the args exists,
      creating it (and any missing parents) when it does not.
    """

    @DKeyed.args
    def __call__(self, spec, state, args):
        for arg in args:
            directory = DKey[pl.Path](arg).expand(spec, state)
            if directory.exists():
                continue

            doot.report.wf.act("MkDir", str(directory))
            directory.mkdir(parents=True, exist_ok=True)
341
class UserInput(IOBase):
    """
      Ask the user for a line of input and return it for the state.

      Keys:
        prompt  : the text shown to the user. Defaults to "?::- ".
        update_ : redirected to the state key the answer is returned under.
    """

    @DKeyed.types("prompt", check=str, fallback="?::- ")
    @DKeyed.redirects("update_")
    def __call__(self, spec, state, prompt, _update):
        return { _update : input(prompt) }
349
class SimpleFind(IOBase):
    """
      Glob a pattern under a path, returning the matches as a list.

      Keys:
        from    : the directory to search in.
        rec     : True for a recursive glob (rglob), False (default) for a flat one.
        pattern : the expanded glob pattern.
        update_ : redirected to the state key the matches are returned under.
    """

    @DKeyed.paths("from")
    @DKeyed.types("rec", fallback=False)
    @DKeyed.expands("pattern")
    @DKeyed.redirects("update_")
    def __call__(self, spec, state, _from, rec, pattern, _update):
        if rec is True:
            matches = _from.rglob(pattern)
        elif rec is False:
            matches = _from.glob(pattern)
        else:
            # `rec` is neither True nor False: no search is run
            # and nothing is returned.
            return None

        return { _update : list(matches) }
366
class TouchFileAction(IOBase):
    """
      Touch every path passed in the args.

      Keys:
        args : each is expanded to a path. Anything that expands to None is skipped.
        soft : when truthy, only files that already exist are touched.
               Defaults to False.
    """

    @DKeyed.args
    @DKeyed.types("soft", fallback=False)
    def __call__(self, spec, state, args, soft):
        # All the keys are built before any of them are expanded
        keys = [DKey[pl.Path](x, fallback=None) for x in args]
        for key in keys:
            path = key.expand(spec, state)
            if path is None or (soft and not path.exists()):
                continue

            path.touch()
378
class LinkAction(IOBase):
    """
      for x,y in spec.args:
      x.expand().symlink_to(y.expand())

      pass hard=True for a hardlink

      Link targets can be given in two ways:
        - as the `link` and `to` keys of the spec's kwargs
        - as entries of spec.args, each either a pair [link, to]
          or a table {link=..., to=...}

      Not implemented: a table whose `to` is a list, and {from=..., to_rel=...}.

      Raises:
        NotImplementedError : for the two unimplemented arg shapes.
        TypeError           : for any other arg shape.

      NOTE(review): the linking itself is done by self._do_link, which is
      not visible in this view -- its behaviour (including what `force` does)
      is assumed from the summary above, confirm against its definition.
    """

    @DKeyed.paths("link", "to", fallback=None)
    @DKeyed.args
    @DKeyed.types("force", "hard", check=bool, fallback=False)
    def __call__(self, spec, state, link, to, args, force, hard):
        # The expanded `link` and `to` are only used as a presence check,
        # the raw kwargs are what gets handed to _do_link.
        # NOTE(review): presumably _do_link expands them itself -- confirm.
        if link is not None and to is not None:
            self._do_link(spec, state, spec.kwargs.link, spec.kwargs.to, force, hard=hard)

        # Iterates spec.args directly, the injected `args` parameter is unused.
        for arg in spec.args:
            match arg:
                case [x,y]:
                    self._do_link(spec, state, x,y, force, hard=hard)
                case {"link":x, "to":list() as ys}:
                    # Has to come before the general {link, to} case below,
                    # which would otherwise match it as well.
                    raise NotImplementedError()
                case {"link":x, "to":y}:
                    self._do_link(spec, state, x,y, force, hard=hard)
                case {"from":x, "to_rel":y}:
                    raise NotImplementedError()
                case _:
                    raise TypeError("unrecognized link targets")
427
class ListFiles(IOBase):
    """
      Add a list of all files in a path (recursively) to the state.

      Runs the external `fdfind` command, through `sh`, on the final
      component of `from`, with the parent of `from` as its base directory.

      Keys:
        from    : the path to list the files of.
        update_ : redirected to the state key the list is returned under.

      Returns:
        { update_ : list[str] }, one entry per line printed by fdfind.
        NOTE(review): the entries are presumably relative to the parent
        of `from`, because of --base-directory -- confirm.
    """

    @DKeyed.paths("from")
    @DKeyed.redirects("update_")
    def __call__(self, spec, state, _from, _update):
        base   = _from.parent
        target = _from.name
        result = sh.fdfind("--color", "never", "-t", "f", "--base-directory", str(base), ".", target, _return_cmd=True)

        # The output ends with a newline, so a bare split("\n") leaves a
        # trailing '' entry: the count was off by one, and "no files found"
        # came back as [''] instead of []. Drop the empty lines.
        filelist = [line for line in result.stdout.decode().split("\n") if line]

        doot.report.wf.act("List", "%s files in %s" % (len(filelist), target))
        return { _update : filelist }