1## base_action.py -*- mode: python -*-
2# Imports:
3from __future__ import annotations
4
5# ##-- stdlib imports
6import datetime
7import functools as ftz
8import itertools as itz
9import logging as logmod
10import pathlib as pl
11import re
12import shutil
13import time
14import types
15from time import sleep
16
17# ##-- end stdlib imports
18
19# ##-- 3rd party imports
20import sh
21from jgdv import Mixin, Proto
22from jgdv.structs.dkey import DKey, DKeyed
23
24# ##-- end 3rd party imports
25
26# ##-- 1st party imports
27import doot
28from doot.errors import LocationError, TaskError, TaskFailed
29from doot.mixins.path_manip import PathManip_m
30
31# ##-- end 1st party imports
32
33# ##-| Local
34from ..._interface import ActionResponse_e
35from .._action import DootBaseAction
36from ..util.decorators import IOWriter
37
38# # End of Imports.
39
40# ##-- types
41# isort: off
42import abc
43import collections.abc
44from typing import TYPE_CHECKING, cast, assert_type, assert_never
45from typing import Generic, NewType
46# Protocols:
47from typing import Protocol, runtime_checkable
48# Typing Decorators:
49from typing import no_type_check, final, override, overload
50
51if TYPE_CHECKING:
52 from jgdv import Maybe
53 from typing import Final
54 from typing import ClassVar, Any, LiteralString
55 from typing import Never, Self, Literal
56 from typing import TypeGuard
57 from collections.abc import Iterable, Iterator, Callable, Generator
58 from collections.abc import Sequence, Mapping, MutableMapping, Hashable
59
60##--|
61
62# isort: on
63# ##-- end types
64
65##-- logging
66logging = logmod.getLogger(__name__)
67##-- end logging
68
[docs]
69@Mixin(PathManip_m, allow_inheritance=True)
70class IOBase(DootBaseAction):
71 pass
72
[docs]
73class AppendAction(IOBase):
74 """
75 Pre/Ap-pend data from the state to a file
76 """
77 sep = "\n--------------------\n"
78
79 @DKeyed.args
80 @DKeyed.types("sep", fallback=None)
81 @DKeyed.paths("to")
82 def __call__(self, spec, state, args, sep, to):
83 match sep:
84 case None:
85 sep = AppendAction.sep
86 case False:
87 sep = None
88 case _:
89 pass
90
91 loc = to
92 args_keys = [DKey(x) for x in args]
93 exp_args = [k.expand(spec, state, fallback=None) for k in args_keys]
94
95 if self._is_write_protected(loc):
96 raise LocationError("Tried to write a protected location", loc)
97
98 with loc.open('a') as f:
99 for arg in exp_args:
100 match arg:
101 case None:
102 continue
103 case str():
104 pass
105 case _:
106 arg = str(arg)
107
108 doot.report.wf.act("Append", "%s chars to %s" % (len(arg), loc))
109 if sep:
110 f.write(sep)
111
112 f.write(arg)
113 else:
114 # Done
115 pass
116
[docs]
117class WriteAction(IOBase):
118 """
119 Writes data from the state to a file, accessed through the
120 doot.locs object
121
122 'from' is *not* expanded.
123 """
124
125 @DKeyed.redirects("from")
126 @DKeyed.paths("to")
127 def __call__(self, spec, state, _from, to) -> dict|bool|None:
128 data = state[_from]
129 match to:
130 case None:
131 raise LocationError("Can't write to a null location")
132 case pl.Path() as x if self._is_write_protected(x):
133 raise LocationError("Tried to write a protected location", x)
134 case pl.Path() as x:
135 loc = x
136 case x:
137 raise TypeError("Didn't get an appropriate type for a location", x)
138
139 match data:
140 case None:
141 doot.report.wf.act("Write", "Nothing to Write")
142 case _ if not bool(data):
143 doot.report.wf.act("Write", "Nothing to Write")
144 case [*xs]:
145 text = "\n".join(xs)
146 loc.write_text(text)
147 doot.report.wf.act("Write", "%s chars to %s" % (len(text), loc))
148 case bytes():
149 doot.report.wf.act("Write", "%s bytes to %s" % (len(data), loc))
150 loc.write_bytes(data)
151 case str():
152 doot.report.wf.act("Write", "%s chars to %s" % (len(data), loc))
153 loc.write_text(data)
154 case _:
155 as_str = str(data)
156 doot.report.wf.act("Write", "%s chars to %s" % (len(as_str), loc))
157 loc.write_text(as_str)
158
159 return None
160
[docs]
161class ReadAction(IOBase):
162 """
163 Reads data from the doot.locs location to return for the state
164 The arguments of the action are held in self.spec
165 """
166
167 @DKeyed.paths("from")
168 @DKeyed.redirects("update_")
169 @DKeyed.types("as_bytes", fallback=False)
170 @DKeyed.types("type", check=str, fallback="read")
171 def __call__(self, spec, state, _from, _update, as_bytes, _type) -> dict|bool|None:
172 loc = _from
173 read_binary = as_bytes
174 read_lines = _type
175 doot.report.wf.act("Read", "%s into %s" % (loc, _update))
176 if read_binary:
177 with loc.open("rb") as f:
178 return { _update : f.read() }
179
180 with loc.open("r") as f:
181 match read_lines:
182 case "read":
183 return { _update : f.read() }
184 case "lines":
185 return { _update : f.readlines() }
186 case unk:
187 raise TypeError("Unknown read type", unk)
188
[docs]
189class CopyAction(IOBase):
190 """
191 copy a file somewhere
192 The arguments of the action are held in self.spec
193
194 'from' can be a string, path or list, always coerced to paths
195 Can handle filename/ext globs
196 """
197
198 @DKeyed.types("from", check=str|pl.Path|list)
199 @DKeyed.paths("to")
200 def __call__(self, spec, state, _from, to) -> dict|bool|None:
201 dest_loc = to
202 if self._is_write_protected(dest_loc):
203 raise LocationError("Tried to write a protected location", to)
204
205 match _from:
206 case str() | pl.Path():
207 expanded = [DKey[pl.Path](_from, fallback=pl.Path(_from)).expand(spec, state)]
208 case list():
209 expanded = [DKey[pl.Path](x, fallback=pl.Path(x)).expand(spec, state) for x in _from]
210 case _:
211 raise doot.errors.ActionError("Unrecognized type for copy sources", _from)
212
213 if len(expanded) > 1 and not dest_loc.is_dir():
214 raise doot.errors.ActionError("Tried to copy multiple files to a non-directory")
215
216 for arg in expanded:
217 match arg:
218 case pl.Path() if "*" in arg.name:
219 if not dest_loc.is_dir():
220 raise doot.errors.ActionError("Tried to copy multiple files to a non-directory")
221 for arg_sub in arg.parent.glob(arg.name):
222 self._validate_source(arg_sub)
223 shutil.copy2(arg_sub, dest_loc)
224 case pl.Path():
225 self._validate_source(arg)
226 shutil.copy2(arg, dest_loc)
227 case x:
228 raise TypeError("Unexpected Type attempted to be copied")
229 else:
230 return None
231
[docs]
232 def _validate_source(self, source:pl.Path) -> None:
233 match source:
234 case pl.Path() if not source.exists():
235 raise doot.errors.ActionError("Tried to copy a file that doesn't exist", source)
236 case pl.Path():
237 return
238 case _:
239 raise doot.errors.ActionError("CopyAction expected a path", source)
240
[docs]
241class MoveAction(IOBase):
242 """
243 move a file somewhere
244 The arguments of the action are held in self.spec
245 """
246
247 @DKeyed.paths("from", "to")
248 @DKeyed.types("force", check=bool, fallback=False)
249 def __call__(self, spec, state, _from, to, force) -> dict|bool|None:
250 source = _from
251 dest_loc = to
252
253 if self._is_write_protected(dest_loc):
254 raise LocationError("Tried to write a protected location", dest_loc)
255 if not source.exists():
256 raise doot.errors.ActionError("Tried to move a file that doesn't exist", source)
257 if dest_loc.exists() and not force:
258 raise doot.errors.ActionError("Tried to move a file that already exists at the destination", dest_loc)
259 if source.is_dir():
260 raise doot.errors.ActionError("Tried to move multiple files to a non-directory", source)
261
262 source.rename(dest_loc)
263 return None
264
[docs]
265class DeleteAction(IOBase):
266 """
267 delete a file / directory specified in spec.args
268 """
269
270 @DKeyed.types("recursive", "lax", check=bool, fallback=False)
271 def __call__(self, spec, state, recursive, lax):
272 rec = recursive
273 for arg in spec.args:
274 match DKey[pl.Path](arg).expand(spec, state):
275 case pl.Path() as loc:
276 pass
277 case x:
278 raise TypeError(type(x))
279 if self._is_write_protected(loc):
280 raise LocationError("Tried to write a protected location", loc)
281
282 if not loc.exists():
283 doot.report.wf.act("Delete", "Does Not Exist: %s" % loc)
284 continue
285
286 if loc.is_dir() and rec:
287 doot.report.wf.act("Delete", "Directory: %s" % loc)
288 shutil.rmtree(loc)
289 else:
290 doot.report.wf.act("Delete", "File: %s" % loc)
291 loc.unlink(missing_ok=lax)
292
[docs]
293class BackupAction(IOBase):
294 """
295 copy a file somewhere, but only if it doesn't exist at the dest, or is newer than the dest
296 The arguments of the action are held in self.spec
297 """
298
299 @DKeyed.paths("from", "to")
300 @DKeyed.types("tolerance", check=int, fallback=10_000_000)
301 @DKeyed.taskname
302 def __call__(self, spec, state, _from, to, tolerance, _name) -> dict|bool|None:
303 source_loc = _from
304 dest_loc = to
305
306 if self._is_write_protected(dest_loc):
307 raise LocationError("Tried to write a protected location", dest_loc)
308
309 # ExFat FS has lower resolution timestamps
310 # So guard by having a tolerance:
311 source_ns = source_loc.stat().st_mtime_ns
312 match dest_loc.exists():
313 case True:
314 dest_ns = dest_loc.stat().st_mtime_ns
315 case False:
316 dest_ns = 1
317 source_newer = source_ns > dest_ns
318 difference = int(max(source_ns, dest_ns) - min(source_ns, dest_ns))
319 below_tolerance = difference <= tolerance
320
321 if dest_loc.exists() and ((not source_newer) or below_tolerance):
322 return None
323
324 doot.report.wf.act("Backup", "%s -> %s" % (source_loc, dest_loc))
325 shutil.copy2(source_loc,dest_loc)
326 return None
327
[docs]
328class EnsureDirectory(IOBase):
329 """
330 ensure the directories passed as arguments exist
331 if they don't, build them
332 """
333
334 @DKeyed.args
335 def __call__(self, spec, state, args):
336 for arg in args:
337 loc = DKey[pl.Path](arg).expand(spec, state)
338 if not loc.exists():
339 doot.report.wf.act("MkDir", str(loc))
340 loc.mkdir(parents=True, exist_ok=True)
341
349
[docs]
350class SimpleFind(IOBase):
351 """
352 A Simple glob on a path
353 """
354
355 @DKeyed.paths("from")
356 @DKeyed.types("rec", fallback=False)
357 @DKeyed.expands("pattern")
358 @DKeyed.redirects("update_")
359 def __call__(self, spec, state, _from, rec, pattern, _update):
360 from_loc = _from
361 match rec:
362 case True:
363 return { _update : list(from_loc.rglob(pattern)) }
364 case False:
365 return { _update : list(from_loc.glob(pattern)) }
366
[docs]
367class TouchFileAction(IOBase):
368
369 @DKeyed.args
370 @DKeyed.types("soft", fallback=False)
371 def __call__(self, spec, state, args, soft):
372 for target in [DKey[pl.Path](x, fallback=None) for x in args]:
373 if (target_path:=target.expand(spec, state)) is None:
374 continue
375 if soft and not target_path.exists():
376 continue
377 target_path.touch()
378
[docs]
379class LinkAction(IOBase):
380 """
381 for x,y in spec.args:
382 x.expand().symlink_to(y.expand())
383
384 pass hard=True for a hardlink
385 """
386
387 @DKeyed.paths("link", "to", fallback=None)
388 @DKeyed.args
389 @DKeyed.types("force", "hard", check=bool, fallback=False)
390 def __call__(self, spec, state, link, to, args, force, hard):
391 if link is not None and to is not None:
392 self._do_link(spec, state, spec.kwargs.link, spec.kwargs.to, force, hard=hard)
393
394 for arg in spec.args:
395 match arg:
396 case [x,y]:
397 self._do_link(spec, state, x,y, force, hard=hard)
398 case {"link":x, "to":list() as ys}:
399 raise NotImplementedError()
400 case {"link":x, "to":y}:
401 self._do_link(spec, state, x,y, force, hard=hard)
402 case {"from":x, "to_rel":y}:
403 raise NotImplementedError()
404 case _:
405 raise TypeError("unrecognized link targets")
406
[docs]
407 def _do_link(self, spec, state, x, y, force, *, hard:bool=False) -> None:
408 x_key = DKey[pl.Path](x)
409 y_key = DKey[pl.Path](y)
410 x_path = x_key.expand(spec, state, symlinks=True)
411 y_path = y_key.expand(spec, state)
412 # TODO when py3.12: use follow_symlinks=False
413 if (x_path.exists() or x_path.is_symlink()) and not force:
414 logging.warn("SKIP: A Symlink already exists: %s -> %s", x_path, x_path.resolve())
415 return
416 if not y_path.exists():
417 raise doot.errors.ActionError("Link target does not exist", y_path)
418 if force and x_path.is_symlink():
419 logging.warn("Forcing New Symlink: %s", x_path)
420 x_path.unlink()
421 if hard:
422 x_path.hardlink_to(y_path)
423 doot.report.wf.act("Link", "Hard: %s -> %s" % (x_path, y_path))
424 else:
425 x_path.symlink_to(y_path)
426 doot.report.wf.act("Link", "Symbolic: %s -> %s" % (x_path, y_path))
427
[docs]
428class ListFiles(IOBase):
429 """ add a list of all files in a path (recursively) to the state """
430
431 @DKeyed.paths("from")
432 @DKeyed.redirects("update_")
433 def __call__(self, spec, state, _from, _update):
434 target = _from
435 base = target.parent
436 target = target.name
437 result = sh.fdfind("--color", "never", "-t", "f", "--base-directory", str(base), ".", target, _return_cmd=True)
438 filelist = result.stdout.decode().split("\n")
439
440 doot.report.wf.act("List", "%s files in %s" % (len(filelist), target))
441 return { _update : filelist }