X7ROOT File Manager
Current Path:
/opt/alt/python311/lib/python3.11/site-packages/loguru
opt
/
alt
/
python311
/
lib
/
python3.11
/
site-packages
/
loguru
/
📁
..
📄
__init__.py
(626 B)
📄
__init__.pyi
(14.1 KB)
📁
__pycache__
📄
_asyncio_loop.py
(597 B)
📄
_better_exceptions.py
(19.53 KB)
📄
_colorama.py
(1.63 KB)
📄
_colorizer.py
(14.26 KB)
📄
_contextvars.py
(321 B)
📄
_ctime_functions.py
(1.5 KB)
📄
_datetime.py
(3.67 KB)
📄
_defaults.py
(2.86 KB)
📄
_error_interceptor.py
(1.08 KB)
📄
_file_sink.py
(14.19 KB)
📄
_filters.py
(618 B)
📄
_get_frame.py
(458 B)
📄
_handler.py
(12.34 KB)
📄
_locks_machinery.py
(1.28 KB)
📄
_logger.py
(93.72 KB)
📄
_recattrs.py
(2.79 KB)
📄
_simple_sinks.py
(3.65 KB)
📄
_string_parsers.py
(4.6 KB)
📄
py.typed
(0 B)
Editing: _simple_sinks.py
import asyncio import logging import weakref from ._asyncio_loop import get_running_loop, get_task_loop class StreamSink: def __init__(self, stream): self._stream = stream self._flushable = callable(getattr(stream, "flush", None)) self._stoppable = callable(getattr(stream, "stop", None)) self._completable = asyncio.iscoroutinefunction(getattr(stream, "complete", None)) def write(self, message): self._stream.write(message) if self._flushable: self._stream.flush() def stop(self): if self._stoppable: self._stream.stop() def tasks_to_complete(self): if not self._completable: return [] return [self._stream.complete()] class StandardSink: def __init__(self, handler): self._handler = handler def write(self, message): record = message.record message = str(message) exc = record["exception"] record = logging.getLogger().makeRecord( record["name"], record["level"].no, record["file"].path, record["line"], message, (), (exc.type, exc.value, exc.traceback) if exc else None, record["function"], {"extra": record["extra"]}, ) if exc: record.exc_text = "\n" self._handler.handle(record) def stop(self): self._handler.close() def tasks_to_complete(self): return [] class AsyncSink: def __init__(self, function, loop, error_interceptor): self._function = function self._loop = loop self._error_interceptor = error_interceptor self._tasks = weakref.WeakSet() def write(self, message): try: loop = self._loop or get_running_loop() except RuntimeError: return coroutine = self._function(message) task = loop.create_task(coroutine) def check_exception(future): if future.cancelled() or future.exception() is None: return if not self._error_interceptor.should_catch(): raise future.exception() self._error_interceptor.print(message.record, exception=future.exception()) task.add_done_callback(check_exception) self._tasks.add(task) def stop(self): for task in self._tasks: task.cancel() def tasks_to_complete(self): # To avoid errors due to "self._tasks" being mutated while iterated, the # "tasks_to_complete()" method must be protected by the same lock as "write()" (which # happens to be the handler lock). However, the tasks must not be awaited while the lock is # acquired as this could lead to a deadlock. Therefore, we first need to collect the tasks # to complete, then return them so that they can be awaited outside of the lock. return [self._complete_task(task) for task in self._tasks] async def _complete_task(self, task): loop = get_running_loop() if get_task_loop(task) is not loop: return try: await task except Exception: pass # Handled in "check_exception()" def __getstate__(self): state = self.__dict__.copy() state["_tasks"] = None return state def __setstate__(self, state): self.__dict__.update(state) self._tasks = weakref.WeakSet() class CallableSink: def __init__(self, function): self._function = function def write(self, message): self._function(message) def stop(self): pass def tasks_to_complete(self): return []
Upload File
Create Folder