
I have a requirement to scan a directory on any kind of filesystem and move a resource whenever it is ready (i.e. no longer being changed). To do this I have the following code:

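from abc import ABC, abstractmethod
from asyncio import AbstractEventLoop, CancelledError, Future, sleep
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, auto
from os.path import split
from queue import Queue
from typing import Any, Callable, Dict, List, Optional
from uuid import uuid4

from asyncpool import AsyncPool
from watchdog.events import FileSystemEventHandler, FileSystemEvent as FSEvent
from watchdog.observers.polling import PollingObserver

# Project-local types not shown here: IFileSystem, ILogger, IPathHasher, FileLocked, FilesystemEventType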


class FileSystemResourceType(Enum):
    UNKNOWN = auto()
    DIRECTORY = auto()
    FILE = auto()
    CHARACTER = auto()
    BLOCK_SPECIAL_FILE = auto()
    FIFO = auto()
    SOCKET = auto()
    SYMLINK = auto()


@dataclass
class FileSystemResource:
    """Represents some object/resource on some filesystem.

    Depending on the filesystem, any datetime value may be None, as some filesystems do not provide any way to
    retrieve these values.

    Args:
        path: The path to the resource
        name: The name of the resource (e.g. for /path/to/file.png, name=file.png)
        is_dir: Whether the resource is a directory
        size: The size of the resource in bytes
        type: The type of resource
        accessed: The last time the resource was accessed.
        created: The time the resource was created.
        metadata_changed: The last time the resource's metadata changed.
        modified: The last time the resource's contents were modified.
    """

    path: str
    name: str
    is_dir: bool
    size: int
    type: FileSystemResourceType
    accessed: Optional[datetime] = None
    created: Optional[datetime] = None
    metadata_changed: Optional[datetime] = None
    modified: Optional[datetime] = None

    @property
    def parent(self) -> str:
        return split(self.path)[0]

    def __hash__(self) -> int:
        return hash(self.path)


@dataclass(frozen=True)  # Make it read-only
class FileSystemEvent:
    """Represents an event occurring on a filesystem.

    Args:
        event_type: The kind of event occurring to a resource
        file_system: The filesystem that the resource is found on
        path: The path to the resource, relative to the filesystem. See IFileSystem for relativity information
        resource: The resource at the time of the event. Some events imply that there is no longer a resource (e.g.
            DELETED). This is only populated if the resource exists at the time of the event.
    """

    event_type: FilesystemEventType
    file_system: IFileSystem  # Abstract file system
    path: str
    resource: Optional[FileSystemResource]


class IFileSystemEventProducer(ABC):
    """Responsible for watching a single directory and producing events."""

    @abstractmethod
    def subscribe(
        self,
        on_event_cb: Callable[[FileSystemEvent], Any],
    ):
        """Subscribes to receive directory events from this directory watcher. The callback's return value is ignored.

        Args:
            on_event_cb: The callback to use when an event is received
        """


class LocalPollingFileSystemEventProducer(IFileSystemEventProducer):
    """An event producer that uses watchdog's polling observer so that it also works on Samba shares.

    Please note that each watched location runs on its own thread, so callbacks will occur on that thread.
    """

    class CallbackEventHandler(FileSystemEventHandler):
        def __init__(
            self,
            base_path: str,
            filesystem: IFileSystem,
            logger: ILogger,
        ):
            self._event_callbacks: List[Callable[[FileSystemEvent], Any]] = list()
            self._base_path = base_path
            self._fs = filesystem
            self._logger = logger

            self._event_queue: Queue[FileSystemEvent] = Queue()

        def add_callback(self, on_event_cb: Callable[[FileSystemEvent], Any]):
            self._event_callbacks.append(on_event_cb)

        def on_any_event(self, event: FSEvent):
            if (
                event.src_path == self._base_path
            ):  # Ignore events on the basepath. Unsure why these are even given.
                return

            try:
                # Pylance associates event.event_type with None as internally it is instantiated as event_type=None
                # It should be a string.
                event_type = FilesystemEventType[event.event_type.upper()]  # type: ignore
            except KeyError:
                self._logger.warning(
                    f"Directory Watcher returned unknown event: {event.event_type}"
                )
                return

            # This doesn't really work, as we don't know whether the path should be relative, nor do we know where the
            # filesystem is rooted. But because I can't use the filesystem to produce the event in the first place,
            # this is the hack we have to use.
            relative_path = event.src_path.replace(self._base_path, "").replace(
                "\\", "/"
            )

            info = None
            if self._fs.exists(relative_path):
                info = self._fs.stat(relative_path)

            directory_event = FileSystemEvent(event_type, self._fs, relative_path, info)

            for cb in self._event_callbacks:
                cb(directory_event)

    def __init__(
        self,
        path: str,
        file_system: IFileSystem,
        logger: ILogger,
    ):
        self._observer = PollingObserver()
        self._event_handler = self.CallbackEventHandler(path, file_system, logger)
        self._observer.schedule(self._event_handler, path)

        self._observer.start()

    async def stop(self):
        self._observer.stop()
        self._observer.join()

    def subscribe(
        self,
        on_event_cb: Callable[[FileSystemEvent], Any],
    ):
        self._event_handler.add_callback(on_event_cb)


class IResourceStableTrackerEventProducer(ABC):
    """If you are not me reading this: yes, I know the name sucks. It's a hard thing to name. This interface has one
    job: take in a resource to track and determine whether it is still being changed. Essentially, we need something
    to tell us when a given file system resource is no longer being changed so that it can be worked with.

    TODO: Question whether being an event producer itself is a violation of SRP... Seems so overboard though.
    """

    @abstractmethod
    def track_resource(self, resource: FileSystemResource, fs: IFileSystem):
        """Tracks a resource and determines when it's no longer being altered.

        Args:
            resource: The resource to track
            fs: The filesystem that the resource is on.

        Raises:
            NotFound: If the resource does not exist on the filesystem
        """

    @abstractmethod
    def cancel_track(self, path: str, fs: IFileSystem):
        """Cancels a currently running tracking of a resource

        Args:
            path: The path to the resource on the filesystem. A path is used rather than a resource because there is
                no resource once it has been deleted.
            fs: The fs that the resource exists on.
        """

    @abstractmethod
    def subscribe(
        self, resource_stable_cb: Callable[[FileSystemResource, IFileSystem], None]
    ):
        """Subscribes the callback to receive events about files that are ready.

        Args:
            resource_stable_cb: The callback
        """


class ResourceStableTracker(IResourceStableTrackerEventProducer):
    """Purpose is to determine when a path is ready to be copied.

    I'm not a huge fan of how everything is so implicit, nor am I a fan of how we use the poll time.
    """

    def __init__(
        self,
        hash_service: IPathHasher,
        poll_time: float,
        loop: AbstractEventLoop,
        logger: ILogger,
    ):
        self._loop = loop
        self._hash_service = hash_service
        self._poll_time = poll_time
        self._logger = logger

        self._worker_pool = AsyncPool(
            loop, 10, "FileTracker", logger, self._resource_countdown
        )

        self._worker_pool.start()

        self._file_ready_cbs: List[
            Callable[[FileSystemResource, IFileSystem], None]
        ] = list()

        self._path_countdowns: Dict[str, Future] = dict()

    def track_resource(self, resource: FileSystemResource, fs: IFileSystem):
        self._loop.run_until_complete(self._add_to_pool(resource, fs))

    def cancel_track(self, path: str, fs: IFileSystem):
        if path in self._path_countdowns:
            countdown_task = self._path_countdowns[path]
            countdown_task.cancel()
            try:
                self._loop.run_until_complete(countdown_task)
            except CancelledError:
                self._logger.info(f"Successfully stopped countdown on {path}")
            self._remove_from_countdown(path)

    def subscribe(
        self, resource_stable_cb: Callable[[FileSystemResource, IFileSystem], None]
    ):
        self._file_ready_cbs.append(resource_stable_cb)

    async def _add_to_pool(self, resource: FileSystemResource, fs: IFileSystem):
        if resource.path not in self._path_countdowns:
            fut = await self._worker_pool.push(resource, fs)
            self._path_countdowns[resource.path] = fut

    async def _hash_resource(
        self, resource: FileSystemResource, fs: IFileSystem
    ) -> bytes:
        try:
            return self._hash_service.hash(resource.path, fs)
        except FileLocked:
            # If it's locked, it can't be ready for transfer, so return a unique ID to convey this.
            return str(uuid4()).encode()

    def _remove_from_countdown(self, path: str):
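        # Default of None: cancel_track and the countdown's finally block can both remove the same path
        self._path_countdowns.pop(path, None)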

    async def _resource_countdown(self, resource: FileSystemResource, fs: IFileSystem):
        try:
            last_hash = await self._hash_resource(resource, fs)

            while True:
                await sleep(self._poll_time)

                current_hash = await self._hash_resource(resource, fs)

                if last_hash == current_hash and last_hash != b"":  # Hashes are bytes, so compare with b""
                    self._logger.info(
                        f"{str(resource.path)} completely copied into directory"
                    )
                    updated_resource = fs.stat(resource.path)
                    for cb in self._file_ready_cbs:
                        cb(updated_resource, fs)
                    return

                self._logger.debug(f"{str(resource.path)} not ready")
                last_hash = current_hash
        finally:
            self._remove_from_countdown(resource.path)


class FileSystemEventResourceTrackerDecorator(IResourceStableTrackerEventProducer):
    def __init__(
        self,
        decoratee: IResourceStableTrackerEventProducer,
    ):
        self._decoratee = decoratee

    def track_resource(self, resource: FileSystemResource, fs: IFileSystem):
        self._decoratee.track_resource(resource, fs)

    def cancel_track(self, path: str, fs: IFileSystem):
        self._decoratee.cancel_track(path, fs)

    def subscribe(
        self, resource_stable_cb: Callable[[FileSystemResource, IFileSystem], None]
    ):
        self._decoratee.subscribe(resource_stable_cb)

    def handle_fs_event(self, event: FileSystemEvent):
        if event.event_type == FilesystemEventType.CLOSED:
            return

        if event.event_type in (
            FilesystemEventType.DELETED,
            FilesystemEventType.MOVED,
        ):
            self.cancel_track(event.path, event.file_system)
            return

        # Sometimes modified events occur on deletion, likely when a large file takes multiple poll times to delete.
        if not event.file_system.exists(event.path):
            return

        if event.resource:
            self.track_resource(event.resource, event.file_system)

These are used as follows in the composition root:

import asyncio
import logging

from fs import open_fs  # pyfilesystem2

logger = StandardLogger(logging.getLogger())
fs = PyFileSystemAdapter(lambda url: open_fs(url, create=True, writeable=True))  # Adapts pyfilesystem2 to IFileSystem
fs_event_producer = LocalPollingFileSystemEventProducer(config.base_path, fs, logger)
resource_tracker = FileSystemEventResourceTrackerDecorator(
    ResourceStableTracker(PartialPathHasher(), config.poll_time, asyncio.get_event_loop(), logger)
)
fs_event_producer.subscribe(resource_tracker.handle_fs_event)

I have been told before to add all relevant code, even if it's lengthy, so I have done so.

My own notes about this code:

  • I think the LocalPollingFileSystemEventProducer is kind of mediocre. I think it would be a MAJOR improvement to inject an Observer object into the class, but when I attempted this I found the Watchdog code hard to extend (admittedly I didn't try for long, as it's not a pressing matter at the moment).
  • I like the abstraction of the IResourceStableTrackerEventProducer (even though the name is terrible), but I am wondering whether it breaks the single-responsibility principle (SRP).
  • I am entirely unsure whether the adapter/decorator FileSystemEventResourceTrackerDecorator is a good way to make the tracker usable with FileSystemEvents. I do need it to be usable with them, but maybe the abstraction belongs elsewhere.
  • Overall, using the events to determine when a file is ready does not work in practically any scenario. This is because 1) FileSystemEvents differ between filesystems, 2) the FileSystemEventProducer interface makes no guarantee that it is the filesystem producing the event (i.e. it may be polling), and 3) waiting for a hash to stabilise or a lock to be released just seems better.
  • I read the excellent advice in https://stackoverflow.com/questions/9892137/windsor-pulling-transient-objects-from-the-container that logging could be done in a decorator. But how do you log things that don't pass through the interfaces or are an implementation detail, such as self._logger.debug(f"{str(resource.path)} not ready") in the ResourceStableTracker? The rest of the logging can be changed to follow the approach in that link.
  • I'm thinking of adding start and stop methods to the classes that need them (like the event producer) so that they don't produce events before everything is ready; these would be called from the composition root as needed. I think it's a bad idea to add them to the interface, as that suggests anything can call them, when really only the composition root should call start and stop once everything is wired together, and start should ONLY be responsible for starting its own class. Is this commonly a good idea?
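One way to keep start/stop out of the domain interfaces, as the last point proposes, is to give only the composition root a small lifecycle protocol. This is a sketch under assumptions: `Lifecycle` and `LifecycleManager` are hypothetical names, and `stop` is shown as synchronous even though `LocalPollingFileSystemEventProducer.stop` is currently `async`. Components start in registration order and stop in reverse, so a producer registered after its consumers cannot emit events before they are ready.

```python
from typing import List, Protocol


class Lifecycle(Protocol):
    """Seen only by the composition root; the domain interfaces stay unchanged."""

    def start(self) -> None: ...

    def stop(self) -> None: ...


class LifecycleManager:
    def __init__(self) -> None:
        self._components: List[Lifecycle] = []

    def register(self, component: Lifecycle) -> None:
        self._components.append(component)

    def start_all(self) -> None:
        # Start in registration order: register consumers before producers.
        for component in self._components:
            component.start()

    def stop_all(self) -> None:
        # Stop in reverse so producers stop emitting before their consumers stop.
        for component in reversed(self._components):
            component.stop()
```

Because `Lifecycle` is a structural `Protocol`, the concrete classes do not need to inherit from it; they only need `start` and `stop` methods.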

I have two more questions, which are a bit higher level:

  1. The FileSystemEventProducer is likely to be multithreaded, but the ResourceStableTracker (and many other users of the interface) may not be thread-safe. Is there a way to handle this problem without a leaky abstraction? At the moment, the ResourceStableTracker is not thread-safe, and it doesn't even know about threads at all.
  2. Some code uses asyncio as an implementation detail (like the ResourceStableTracker), but the interface doesn't expose any async functions. As far as I can tell, interfaces should not reveal whether an implementation is asynchronous or multithreaded, yet it can be difficult to work around these things without that knowledge. So this is somewhat the same question as 1, but for asyncio: how should I deal with a class that wants to await a task or do some IO but doesn't necessarily want to create a task?
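For question 1, one option that keeps `ResourceStableTracker` unaware of threads is to put the thread hand-off in an adapter wired in at the composition root. Producer threads only enqueue, and a single worker thread invokes the wrapped callback, so the consumer is never called concurrently. This is a minimal sketch; `SerializingCallbackAdapter` is a hypothetical name, and the sentinel-based shutdown is just one possible design.

```python
import queue
import threading
from typing import Any, Callable, Optional

_STOP = object()  # Sentinel telling the worker thread to exit


class SerializingCallbackAdapter:
    """Accepts calls from any thread; runs the wrapped callback on one worker thread."""

    def __init__(self, cb: Callable[..., Any]) -> None:
        self._cb = cb
        self._queue: "queue.Queue[Any]" = queue.Queue()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        # Calls queued before stop() are still processed, then the worker exits.
        self._queue.put(_STOP)
        if self._thread is not None:
            self._thread.join()

    def __call__(self, *args: Any) -> None:
        # Safe from any thread: queue.Queue does its own locking.
        self._queue.put(args)

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is _STOP:
                return
            self._cb(*item)
```

At the composition root this might be `adapter = SerializingCallbackAdapter(resource_tracker.handle_fs_event)`, started before `fs_event_producer.subscribe(adapter)`. A production version should also catch and log exceptions from the callback, because an uncaught exception currently ends the worker thread.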

Edit: I will add the solutions I come up with at the bottom until each question has its own solution. Then I will post my own answer if there isn't already an accepted one.

For Q2 above, I have simply wrapped the async function in a MultithreadedToAsyncAdapter, which intercepts the callback and schedules the coroutine on the event loop with asyncio.run_coroutine_threadsafe (which uses loop.call_soon_threadsafe internally). The adapter is injected at the composition root, so it is OK for it to know that the implementation uses threads. This appears to be an elegant solution that should be reusable. It looks something like this:

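import asyncio
from asyncio import AbstractEventLoop
from concurrent.futures import Future
from typing import Any, Callable, Coroutine


class MultithreadedToAsyncAdapter:
    def __init__(self, loop: AbstractEventLoop, cb: Callable[..., Coroutine[Any, Any, Any]]):
        self._loop = loop
        self._cb = cb

    def cb_adapter(self, *cb_params: Any) -> Future:  # Accepts whatever arguments the callback needs
        # Safe to call from any thread: schedules the coroutine on self._loop
        return asyncio.run_coroutine_threadsafe(self._cb(*cb_params), self._loop)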

asyncio.run_coroutine_threadsafe returns a concurrent.futures.Future that can be cancelled or waited on, so it may be a good idea to keep track of these futures (in a queue or set) to ensure any running tasks are finished or cancelled on shutdown.
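Because `asyncio.run_coroutine_threadsafe` returns a `concurrent.futures.Future`, an adapter like the one above could record those futures and wait on or cancel them at shutdown. This is a minimal sketch that assumes the event loop runs in its own thread; `TrackingThreadsafeAdapter` and `shutdown` are hypothetical names.

```python
import asyncio
import threading
from concurrent.futures import Future, wait
from typing import Any, Callable, Coroutine, Set


class TrackingThreadsafeAdapter:
    def __init__(self, loop: asyncio.AbstractEventLoop,
                 cb: Callable[..., Coroutine[Any, Any, Any]]) -> None:
        self._loop = loop
        self._cb = cb
        self._pending: Set[Future] = set()
        self._lock = threading.Lock()  # cb_adapter may be called from many threads

    def cb_adapter(self, *args: Any) -> Future:
        fut = asyncio.run_coroutine_threadsafe(self._cb(*args), self._loop)
        with self._lock:
            self._pending.add(fut)
        # Runs immediately if fut is already done, otherwise when it completes.
        fut.add_done_callback(self._discard)
        return fut

    def _discard(self, fut: Future) -> None:
        with self._lock:
            self._pending.discard(fut)

    def shutdown(self, timeout: float, cancel: bool = False) -> None:
        """Wait for in-flight coroutines, optionally requesting cancellation first.

        Must be called from outside the loop's thread, since it blocks.
        """
        with self._lock:
            pending = list(self._pending)
        if cancel:
            for fut in pending:
                # Marks the future cancelled; the underlying task is cancelled on the loop thread.
                fut.cancel()
        wait(pending, timeout=timeout)
```

Note that with `cancel=True`, `wait` returns as soon as the futures are marked cancelled, which may be slightly before the tasks' own cleanup has run on the loop.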

  • Please add imports to your code. (Commented Jul 18, 2022 at 7:21)
  • The whole module reads very Java-esque, especially due to the I-prefixed ABCs which only provide one method (why not just use a plain function?). Also some of the names, like FileSystemEventResourceTrackerDecorator, are very verbose. (Commented Jul 18, 2022 at 10:03)
  • @RichardNeumann I am purposefully trying to apply the dependency injection pattern as best I can. It has quite the learning curve. In DI, single-function interfaces don't seem to be frowned upon, but for Python I would probably agree that it's overboard. As for naming, I agree and would love any suggestion for the name :p (Commented Jul 18, 2022 at 22:17)
  • @MiguelAlorda I have added the imports; there may be some I have missed while compiling across files, though. Additionally, I have not included all the code required to run the example (namely the filesystem implementation). (Commented Jul 18, 2022 at 23:12)
