I have a requirement to watch a location on any kind of filesystem and move a resource whenever it is ready. To do this I have the following code:
import asyncio
import logging
from abc import ABC, abstractmethod
from asyncio import AbstractEventLoop, CancelledError, Future, sleep
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, auto
from os.path import split
from typing import Optional, Callable, Coroutine, List, Any, Dict
from uuid import uuid4

from asyncpool import AsyncPool
from watchdog.events import FileSystemEventHandler, FileSystemEvent as FSEvent
from watchdog.observers.polling import PollingObserver
class FileSystemResourceType(Enum):
    """The kinds of object that can be found at a path on a filesystem."""

    # Values are spelled out; they are the same numbers auto() hands out (1..8).
    UNKNOWN = 1
    DIRECTORY = 2
    FILE = 3
    CHARACTER = 4
    BLOCK_SPECIAL_FILE = 5
    FIFO = 6
    SOCKET = 7
    SYMLINK = 8
@dataclass
class FileSystemResource:
    """A single object (file, directory, link, ...) located on some filesystem.

    Any of the timestamps may be None, because some filesystems offer no way
    to retrieve them.

    Args:
        path: The path to the resource.
        name: The last path component (for /path/to/file.png, name=file.png).
        is_dir: Whether the resource is a directory.
        size: The size of the resource in bytes.
        type: The kind of resource.
        accessed: The last time the resource was accessed.
        created: When the resource was created.
        metadata_changed: When the resource's metadata last changed
            (presumably the filesystem's ctime - confirm).
        modified: When the resource was last modified.
    """

    path: str
    name: str
    is_dir: bool
    size: int
    type: FileSystemResourceType
    accessed: Optional[datetime] = None
    created: Optional[datetime] = None
    metadata_changed: Optional[datetime] = None
    modified: Optional[datetime] = None

    @property
    def parent(self) -> str:
        """Path of the containing location: everything before the final component."""
        head, _tail = split(self.path)
        return head

    def __hash__(self) -> int:
        # Identity for hashing purposes is the path alone.
        path_hash = hash(self.path)
        return path_hash
@dataclass(frozen=True)  # frozen: an event cannot be modified after creation
class FileSystemEvent:
    """Something that happened to a resource on a filesystem.

    Args:
        event_type: What kind of thing happened to the resource.
        file_system: The filesystem the resource lives on.
        path: Path of the resource, relative to the filesystem. See IFileSystem
            for what "relative" means.
        resource: Snapshot of the resource taken when the event occurred. Only
            set if the resource existed at that moment; events such as DELETED
            imply there is no resource any more.
    """

    event_type: FilesystemEventType
    file_system: IFileSystem  # Abstract file system
    path: str
    resource: Optional[FileSystemResource]
class IFileSystemEventProducer(ABC):
    """Watches one directory and produces an event for each change it sees."""

    @abstractmethod
    def subscribe(self, on_event_cb: Callable[[FileSystemEvent], Any]):
        """Register a callback to receive this watcher's directory events.

        Args:
            on_event_cb: Invoked with every event received. Whatever it returns
                is not used.
        """
class LocalPollingFileSystemEventProducer(IFileSystemEventProducer):
    """An event producer using watchdog polling to make it work on samba shares.

    Please note that each watching location is run on its own thread, so
    callbacks will occur in that thread, not on the subscriber's thread.
    The observer is started by the constructor, so events that arrive before
    the first ``subscribe`` call have nobody to be delivered to.
    """

    class CallbackEventHandler(FileSystemEventHandler):
        """Turns raw watchdog events into FileSystemEvent objects and fans them out."""

        def __init__(
            self,
            base_path: str,
            filesystem: IFileSystem,
            logger: ILogger,
        ):
            """Create the handler.

            Args:
                base_path: The watched path; stripped from the front of event paths.
                filesystem: Used to check for and stat the resource behind an event.
                logger: Receives a warning for event types that are not recognised.
            """
            self._event_callbacks: List[Callable[[FileSystemEvent], Any]] = []
            self._base_path = base_path
            self._fs = filesystem
            self._logger = logger

        def add_callback(self, on_event_cb: Callable[[FileSystemEvent], Any]):
            """Add a callback that will be invoked with every translated event."""
            self._event_callbacks.append(on_event_cb)

        def _relative_path(self, src_path: str) -> str:
            """Return src_path without the leading base path, using forward slashes."""
            # Only a leading base path is removed. A blanket str.replace would
            # also delete the same text if it appeared again deeper in the path.
            if src_path.startswith(self._base_path):
                src_path = src_path[len(self._base_path):]
            return src_path.replace("\\", "/")

        def on_any_event(self, event: FSEvent):
            """Translate a watchdog event and hand it to every registered callback.

            Events on the base path itself and events whose type is not a
            member of FilesystemEventType are dropped.
            """
            if event.src_path == self._base_path:
                # Ignore events on the basepath. Unsure why these are even given.
                return
            try:
                # Pylance associates event.event_type with None as internally it
                # is instantiated as event_type=None. It should be a string.
                event_type = FilesystemEventType[event.event_type.upper()]  # type: ignore
            except KeyError:
                self._logger.warning(
                    f"Directory Watcher returned unknown event: {event.event_type}"
                )
                return
            # HACK: This doesn't really work as we don't know if it should be
            # relative, nor do we know where the filesystem is. But because the
            # filesystem can't be used to produce the event in the first place
            # this is the workaround we have to use.
            relative_path = self._relative_path(event.src_path)
            # The resource is only attached when it still exists (e.g. not for
            # a deletion).
            info = None
            if self._fs.exists(relative_path):
                info = self._fs.stat(relative_path)
            directory_event = FileSystemEvent(event_type, self._fs, relative_path, info)
            for cb in self._event_callbacks:
                cb(directory_event)

    def __init__(
        self,
        path: str,
        file_system: IFileSystem,
        logger: ILogger,
    ):
        """Start polling ``path`` immediately.

        Args:
            path: The directory to watch.
            file_system: The filesystem events are resolved against.
            logger: Passed to the event handler for warnings.
        """
        self._observer = PollingObserver()
        self._event_handler = self.CallbackEventHandler(path, file_system, logger)
        self._observer.schedule(self._event_handler, path)
        self._observer.start()

    async def stop(self):
        """Stop the observer and wait for its thread to finish.

        NOTE(review): nothing is awaited here, so the join blocks the calling
        event loop until the observer thread exits.
        """
        self._observer.stop()
        self._observer.join()

    def subscribe(
        self,
        on_event_cb: Callable[[FileSystemEvent], Any],
    ):
        """Subscribe to the events of the watched directory.

        Args:
            on_event_cb: Called (on the observer's thread) with each event; its
                return value is not used.
        """
        self._event_handler.add_callback(on_event_cb)
class IResourceStableTrackerEventProducer(ABC):
    """Tells subscribers when a tracked resource has stopped being changed.

    (Yes, the name is awkward; this is a hard thing to name.) The interface has
    one job: take in a resource to track and work out whether it is still being
    changed, so that it can be worked with once it no longer is.

    TODO: Question whether being an event producer itself is a violation of
    SRP... Seems so overboard though.
    """

    @abstractmethod
    def track_resource(self, resource: FileSystemResource, fs: IFileSystem):
        """Start tracking a resource to find out when it is no longer being altered.

        Args:
            resource: The resource to track.
            fs: The filesystem the resource is on.

        Raises:
            NotFound: If the resource does not exist on the filesystem.
        """

    @abstractmethod
    def cancel_track(self, path: str, fs: IFileSystem):
        """Cancel the tracking currently running for a resource.

        Args:
            path: Path of the resource on the filesystem. A path is taken rather
                than a resource because there is no resource once it has been
                deleted.
            fs: The filesystem the resource exists on.
        """

    @abstractmethod
    def subscribe(
        self, resource_stable_cb: Callable[[FileSystemResource, IFileSystem], None]
    ):
        """Subscribe a callback to be told about resources that are ready.

        Args:
            resource_stable_cb: The callback to subscribe.
        """
class ResourceStableTracker(IResourceStableTrackerEventProducer):
    """Determines when a path is ready to be copied.

    A tracked resource is hashed, then hashed again every ``poll_time``
    seconds. Once two consecutive hashes are equal (and not empty) the
    resource is considered stable and every subscriber is called.

    I'm not a huge fan of how everything is so implicit, nor am I a fan of how
    we use the poll time.
    """

    def __init__(
        self,
        hash_service: IPathHasher,
        poll_time: float,
        loop: AbstractEventLoop,
        logger: ILogger,
    ):
        """Create the tracker and start its worker pool.

        Args:
            hash_service: Hashes a path on a filesystem; used to detect change.
            poll_time: Seconds to sleep between two hashes of one resource.
            loop: The event loop the worker pool and countdowns are driven by.
            logger: Receives progress messages.
        """
        self._loop = loop
        self._hash_service = hash_service
        self._poll_time = poll_time
        self._logger = logger
        self._worker_pool = AsyncPool(
            loop, 10, "FileTracker", logger, self._resource_countdown
        )
        self._worker_pool.start()
        self._file_ready_cbs: List[
            Callable[[FileSystemResource, IFileSystem], None]
        ] = []
        # Path -> handle of the countdown currently queued/running for it.
        self._path_countdowns: Dict[str, Future] = {}

    def track_resource(self, resource: FileSystemResource, fs: IFileSystem):
        """Queue a countdown for the resource unless its path is already tracked.

        Args:
            resource: The resource to track.
            fs: The filesystem that the resource is on.
        """
        # NOTE(review): run_until_complete raises RuntimeError if the loop is
        # already running and must not be called from a thread other than the
        # loop's own. Confirm how the loop is driven by the callers.
        self._loop.run_until_complete(self._add_to_pool(resource, fs))

    def cancel_track(self, path: str, fs: IFileSystem):
        """Cancel the countdown for ``path``; does nothing if it is not tracked.

        Args:
            path: The path to the resource on the filesystem.
            fs: The filesystem the resource exists on (not used here).
        """
        if path not in self._path_countdowns:
            return
        countdown_task = self._path_countdowns[path]
        countdown_task.cancel()
        try:
            self._loop.run_until_complete(countdown_task)
        except CancelledError:
            self._logger.info(f"Successfully stopped countdown on {path}")
        # The countdown's own cleanup may already have dropped the entry, so
        # this removal has to tolerate a path that is gone.
        self._remove_from_countdown(path)

    def subscribe(
        self, resource_stable_cb: Callable[[FileSystemResource, IFileSystem], None]
    ):
        """Subscribe a callback that is called with each resource found stable."""
        self._file_ready_cbs.append(resource_stable_cb)

    async def _add_to_pool(self, resource: FileSystemResource, fs: IFileSystem):
        """Push the resource onto the worker pool once per path."""
        if resource.path not in self._path_countdowns:
            # NOTE(review): assumes push() returns a cancellable future for the
            # queued job - confirm the pool is configured to return futures.
            fut = await self._worker_pool.push(resource, fs)
            self._path_countdowns[resource.path] = fut

    async def _hash_resource(
        self, resource: FileSystemResource, fs: IFileSystem
    ) -> bytes:
        """Hash the resource, or return a one-off random value if it is locked."""
        try:
            return self._hash_service.hash(resource.path, fs)
        except FileLocked:
            # If it's locked, it can't be ready for transfer. A fresh unique ID
            # never equals the previous or next hash, so the countdown goes on.
            return str(uuid4()).encode()

    def _remove_from_countdown(self, path: str):
        """Forget the countdown for ``path``; safe to call more than once."""
        # Both the countdown's finally-block and cancel_track end up here, so a
        # missing key is expected and must not raise.
        self._path_countdowns.pop(path, None)

    async def _resource_countdown(self, resource: FileSystemResource, fs: IFileSystem):
        """Poll the resource's hash until it stops changing, then notify subscribers."""
        try:
            last_hash = await self._hash_resource(resource, fs)
            while True:
                await sleep(self._poll_time)
                current_hash = await self._hash_resource(resource, fs)
                # Hashes are bytes, so the "no hash" case is an empty bytes
                # value; comparing against the str "" would never exclude it.
                if current_hash and last_hash == current_hash:
                    self._logger.info(
                        f"{str(resource.path)} completely copied into directory"
                    )
                    # Re-stat so subscribers see the resource as it is now, not
                    # as it was when tracking began.
                    updated_resource = fs.stat(resource.path)
                    for cb in self._file_ready_cbs:
                        cb(updated_resource, fs)
                    return
                self._logger.debug(f"{str(resource.path)} not ready")
                last_hash = current_hash
        finally:
            self._remove_from_countdown(resource.path)
class FileSystemEventResourceTrackerDecorator(IResourceStableTrackerEventProducer):
    """Wraps a tracker so that it can be driven directly by FileSystemEvents.

    The three interface methods forward to the wrapped tracker unchanged;
    ``handle_fs_event`` is the addition that maps an event onto them.
    """

    def __init__(self, decoratee: IResourceStableTrackerEventProducer):
        self._decoratee = decoratee

    def track_resource(self, resource: FileSystemResource, fs: IFileSystem):
        """Forward to the wrapped tracker."""
        self._decoratee.track_resource(resource, fs)

    def cancel_track(self, path: str, fs: IFileSystem):
        """Forward to the wrapped tracker."""
        self._decoratee.cancel_track(path, fs)

    def subscribe(
        self, resource_stable_cb: Callable[[FileSystemResource, IFileSystem], None]
    ):
        """Forward to the wrapped tracker."""
        self._decoratee.subscribe(resource_stable_cb)

    def handle_fs_event(self, event: FileSystemEvent):
        """Start or cancel tracking according to the kind of event received.

        CLOSED events are ignored, DELETED and MOVED cancel tracking of the
        event's path, and any other event starts tracking the event's resource
        provided the path still exists and the event carries a resource.
        """
        kind = event.event_type
        if kind == FilesystemEventType.CLOSED:
            return

        cancelling_kinds = (
            FilesystemEventType.DELETED,
            FilesystemEventType.MOVED,
        )
        if kind in cancelling_kinds:
            self.cancel_track(event.path, event.file_system)
            return

        # Sometimes modified events occur on deletion. Likely if it's a large
        # file that takes multiple poll times to delete. Hence the existence
        # check before anything is tracked.
        if event.file_system.exists(event.path) and event.resource:
            self.track_resource(event.resource, event.file_system)
These are used as follows in the composition root
# Composition root: build the collaborators and wire them together.
logger = StandardLogger(logging.getLogger())
# Converts the pyfilesystem2 fs to local code
fs = PyFileSystemAdapter(lambda url: open_fs(url, create=True, writable=True))
fs_event_producer = LocalPollingFileSystemEventProducer(config.base_path, fs, logger)
resource_tracker = FileSystemEventResourceTrackerDecorator(
    ResourceStableTracker(
        PartialPathHasher(), config.poll_time, asyncio.get_event_loop(), logger
    )
)
# Filesystem events drive the tracker: every event is handed to handle_fs_event.
fs_event_producer.subscribe(resource_tracker.handle_fs_event)
I have been told before to add all relevant code, even if it's lengthy, so I have done so.
My own notes about this code:
- I think the `LocalPollingFileSystemEventProducer` is kind of average. I think it would be a MAJOR improvement to inject an Observer object into the class, but when I went about this I found the Watchdog code hard to extend (notably, I didn't try for long, as it's not a pressing matter at this moment).
- I like the abstraction of the IResourceStableTrackerEventProducer (the name is a complete bruh), but I am wondering if it does break the SRP principle.
- I am entirely unsure if the adapter/decorator in FileSystemEventResourceTrackerDecorator is a good idea to make it usable with FileSystemEvents. I do need it to be usable with them, but maybe the abstraction should be elsewhere.
- Overall, the idea of using the events to determine when a file is ready does not work in pretty much all scenarios. This is because 1) FileSystemEvents are different between filesystems. 2) The FileSystemEventProducer interface makes no guarantee its the file system making the event IE if its polling, 3) waiting for a hash or lock to be freed just seems better.
- I read the excellent advice in https://stackoverflow.com/questions/9892137/windsor-pulling-transient-objects-from-the-container that logging could be in a decorator. But how do you log things that don't run through the interfaces or are an implementation detail? See `self._logger.debug(f"{str(resource.path)} not ready")` in the `ResourceStableTracker`. The rest can be changed to follow that link.
- I'm thinking of adding a `start` and `stop` function to most classes that may need them, like the event producer, so they don't produce events before they are ready. These would be called from the composition root as needed. I think it's a bad idea to add them to the interface, as that gives the idea that anything can call them, when really only the composition root should call `start` and `stop` once everything is wired together, and `start` should ONLY be responsible for starting its own class. Is this commonly a good idea?
I have 2 more questions which are a bit higher level
- The FileSystemEventProducer is likely to be multithreaded, but the ResourceStableTracker (and many other users of the interface) may not be thread-safe. Is there a way to handle this problem without a leaky abstraction? At this moment in time, the ResourceStableTracker is not thread-safe, but it doesn't even know about threads at all.
- Some code uses asyncio as an implementation detail (like the ResourceStableTracker), but the interface doesn't expose any async functions. As far as I can tell, interfaces should not tell you about being asynchronous or multithreaded in general, yet it can be difficult to work around these things without that knowledge. So this is somewhat the same question as 1, but for asyncio: how can I deal with a class that wants to await a task or is doing some IO but doesn't necessarily want to create a task?
E: I will add solutions I come up with at the bottom until each has its own solution. Then I will make my own answer if there isn't already an accepted one.
For Q2 at the bottom, I have simply wrapped the async function in a `MultithreadedToAsyncAdapter`, which intercepts the callback and calls `loop.call_soon_threadsafe`. This is injected at the composition root, meaning it is OK to have knowledge of the fact that the implementation will use threads. This appears to be a very elegant solution that should be reusable. It looks something like:
class MultithreadedToAsyncAdapter:
    """Lets a callback fired on another thread run a coroutine on an event loop.

    ``cb_adapter`` is what gets handed to the multithreaded producer; each call
    schedules the wrapped coroutine function on the loop in a thread-safe way.
    """

    def __init__(
        self,
        loop: AbstractEventLoop,
        cb: Callable[..., Coroutine[Any, Any, Any]],
    ):
        """Create the adapter.

        Args:
            loop: The event loop the coroutine is to run on.
            cb: Coroutine function invoked with the callback's argument.
        """
        self._loop = loop
        self._cb = cb

    def cb_adapter(self, cb_params: Any):  # Whatever the cb needs to be.
        """Schedule ``cb(cb_params)`` on the loop from any thread.

        Args:
            cb_params: The argument passed on to the wrapped coroutine function.

        Returns:
            The handle produced by ``asyncio.run_coroutine_threadsafe``. It is
            returned rather than dropped so the caller can cancel the work,
            wait for it, or see an exception raised by the coroutine.
        """
        return asyncio.run_coroutine_threadsafe(self._cb(cb_params), self._loop)
As far as I can see, the handle produced from this only exposes a `cancel` function, so it may be a good idea to use a queue of some sort to ensure any running tasks are finished or cancelled on shutdown.
`I`-prefixed ABCs which only provide one method (why not just use a plain function?). Also, some of the names, like `FileSystemEventResourceTrackerDecorator`, are very verbose.