Source code for redbot.core.utils

import asyncio
import logging
import os
import shutil
from asyncio import AbstractEventLoop, as_completed, Semaphore
from asyncio.futures import isfuture
from itertools import chain
from pathlib import Path
from typing import (
    Any,
    AsyncIterator,
    AsyncIterable,
    Awaitable,
    Callable,
    Iterable,
    Iterator,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
    Set,
    TYPE_CHECKING,
)

import discord
from fuzzywuzzy import fuzz, process

from .chat_formatting import box

if TYPE_CHECKING:
    from ..commands import Command, Context

__all__ = [
    "bounded_gather",
    "safe_delete",
    "fuzzy_command_search",
    "format_fuzzy_results",
    "deduplicate_iterables",
]

_T = TypeVar("_T")


# Benchmarked to be the fastest method.
def deduplicate_iterables(*iterables):
    """
    Returns a list of all unique items in ``iterables``, in the order they
    were first encountered.
    """
    # dict insertion order is guaranteed to be preserved in 3.6+
    return list(dict.fromkeys(chain.from_iterable(iterables)))
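
# Usage sketch (illustrative only, not part of the original module):
#
#     >>> deduplicate_iterables([1, 2, 3], [3, 4], [2, 5])
#     [1, 2, 3, 4, 5]
#
# Duplicates are dropped while the order of first occurrence is kept, because
# dict.fromkeys preserves insertion order on Python 3.6+.
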

def _fuzzy_log_filter(record):
    return record.funcName != "extractWithoutOrder"


logging.getLogger().addFilter(_fuzzy_log_filter)


def safe_delete(pth: Path):
    if pth.exists():
        for root, dirs, files in os.walk(str(pth)):
            os.chmod(root, 0o700)

            for d in dirs:
                os.chmod(os.path.join(root, d), 0o700)

            for f in files:
                os.chmod(os.path.join(root, f), 0o700)

        shutil.rmtree(str(pth), ignore_errors=True)


# https://github.com/PyCQA/pylint/issues/2717
class AsyncFilter(AsyncIterator[_T], Awaitable[List[_T]]):  # pylint: disable=duplicate-bases
    """Class returned by `async_filter`. See that function for details.

    We don't recommend instantiating this class directly.
    """

    def __init__(
        self,
        func: Callable[[_T], Union[bool, Awaitable[bool]]],
        iterable: Union[AsyncIterable[_T], Iterable[_T]],
    ) -> None:
        self.__func: Callable[[_T], Union[bool, Awaitable[bool]]] = func
        self.__iterable: Union[AsyncIterable[_T], Iterable[_T]] = iterable

        # We assign the generator strategy based on the arguments' types
        if isinstance(iterable, AsyncIterable):
            if asyncio.iscoroutinefunction(func):
                self.__generator_instance = self.__async_generator_async_pred()
            else:
                self.__generator_instance = self.__async_generator_sync_pred()
        elif asyncio.iscoroutinefunction(func):
            self.__generator_instance = self.__sync_generator_async_pred()
        else:
            raise TypeError("Must be either an async predicate, an async iterable, or both.")

    async def __sync_generator_async_pred(self) -> AsyncIterator[_T]:
        for item in self.__iterable:
            if await self.__func(item):
                yield item

    async def __async_generator_sync_pred(self) -> AsyncIterator[_T]:
        async for item in self.__iterable:
            if self.__func(item):
                yield item

    async def __async_generator_async_pred(self) -> AsyncIterator[_T]:
        async for item in self.__iterable:
            if await self.__func(item):
                yield item

    async def __flatten(self) -> List[_T]:
        return [item async for item in self]

    def __aiter__(self):
        return self

    def __await__(self):
        # Simply return the generator filled into a list
        return self.__flatten().__await__()

    def __anext__(self) -> Awaitable[_T]:
        # This will use the generator strategy set in __init__
        return self.__generator_instance.__anext__()


def async_filter(
    func: Callable[[_T], Union[bool, Awaitable[bool]]],
    iterable: Union[AsyncIterable[_T], Iterable[_T]],
) -> AsyncFilter[_T]:
    """Filter an (optionally async) iterable with an (optionally async) predicate.

    At least one of the arguments must be async.

    Parameters
    ----------
    func : Callable[[T], Union[bool, Awaitable[bool]]]
        A function or coroutine function which takes one item of ``iterable``
        as an argument, and returns ``True`` or ``False``.
    iterable : Union[AsyncIterable[_T], Iterable[_T]]
        An iterable or async iterable which is to be filtered.

    Raises
    ------
    TypeError
        If neither of the arguments is async.

    Returns
    -------
    AsyncFilter[T]
        An object which can be awaited to yield a list of the filtered items,
        or used as an async iterator to yield items one by one.

    """
    return AsyncFilter(func, iterable)


async def async_enumerate(
    async_iterable: AsyncIterable[_T], start: int = 0
) -> AsyncIterator[Tuple[int, _T]]:
    """Async iterable version of `enumerate`.

    Parameters
    ----------
    async_iterable : AsyncIterable[T]
        The iterable to enumerate.
    start : int
        The index to start from. Defaults to 0.

    Returns
    -------
    AsyncIterator[Tuple[int, T]]
        An async iterator of tuples in the form of ``(index, item)``.

    """
    async for item in async_iterable:
        yield start, item
        start += 1


async def fuzzy_command_search(
    ctx: "Context",
    term: Optional[str] = None,
    *,
    commands: Optional[Set["Command"]] = None,
    min_score: int = 80,
) -> Optional[List["Command"]]:
    """Search for commands which are similar in name to the one invoked.

    Returns a maximum of 5 commands, each of which must have a match score
    of at least ``min_score``.

    Parameters
    ----------
    ctx : `commands.Context <redbot.core.commands.Context>`
        The command invocation context.
    term : Optional[str]
        The name of the invoked command. If ``None``, `Context.invoked_with`
        will be used instead.
    commands : Optional[Set[commands.Command]]
        The commands available to choose from when doing a fuzzy match.
        When omitted, `Bot.walk_commands` will be used instead.
    min_score : int
        The minimum score for matched commands to reach. Defaults to 80.

    Returns
    -------
    Optional[List[`commands.Command <redbot.core.commands.Command>`]]
        A list of commands which were fuzzily matched with the invoked command.

    """
    if ctx.guild is not None:
        enabled = await ctx.bot.db.guild(ctx.guild).fuzzy()
    else:
        enabled = await ctx.bot.db.fuzzy()

    if not enabled:
        return

    if term is None:
        term = ctx.invoked_with

    # If the term is an alias or CC, we don't want to send a supplementary fuzzy search.
    alias_cog = ctx.bot.get_cog("Alias")
    if alias_cog is not None:
        is_alias, alias = await alias_cog.is_alias(ctx.guild, term)

        if is_alias:
            return

    customcom_cog = ctx.bot.get_cog("CustomCommands")
    if customcom_cog is not None:
        cmd_obj = customcom_cog.commandobj

        try:
            await cmd_obj.get(ctx.message, term)
        except:
            pass
        else:
            return

    # Do the scoring. `extracted` is a list of tuples in the form `(command, score)`
    extracted = process.extract(
        term, (commands or set(ctx.bot.walk_commands())), limit=5, scorer=fuzz.QRatio
    )
    if not extracted:
        return

    # Filter through the fuzzy-matched commands.
    matched_commands = []
    for command, score in extracted:
        if score < min_score:
            # Since the list is in decreasing order of score, we can exit early.
            break
        if await command.can_see(ctx):
            matched_commands.append(command)

    return matched_commands


async def format_fuzzy_results(
    ctx: "Context",
    matched_commands: List["Command"],
    *,
    embed: Optional[bool] = None,
) -> Union[str, discord.Embed]:
    """Format the result of a fuzzy command search.

    Parameters
    ----------
    ctx : `commands.Context <redbot.core.commands.Context>`
        The context in which this result is being displayed.
    matched_commands : List[`commands.Command <redbot.core.commands.Command>`]
        A list of commands which have been matched by the fuzzy search, sorted
        in order of decreasing similarity.
    embed : Optional[bool]
        Whether or not the result should be an embed. If set to ``None``, this
        will default to the result of `ctx.embed_requested`.

    Returns
    -------
    Union[str, discord.Embed]
        The formatted results.

    """
    if embed is not False and (embed is True or await ctx.embed_requested()):
        lines = []
        for cmd in matched_commands:
            lines.append(f"**{ctx.clean_prefix}{cmd.qualified_name}** {cmd.short_doc}")
        return discord.Embed(
            title="Perhaps you wanted one of these?",
            colour=await ctx.embed_colour(),
            description="\n".join(lines),
        )
    else:
        lines = []
        for cmd in matched_commands:
            lines.append(f"{ctx.clean_prefix}{cmd.qualified_name} -- {cmd.short_doc}")
        return "Perhaps you wanted one of these? " + box("\n".join(lines), lang="vhdl")


async def _sem_wrapper(sem, task):
    async with sem:
        return await task

def bounded_gather_iter(
    *coros_or_futures,
    loop: Optional[AbstractEventLoop] = None,
    limit: int = 4,
    semaphore: Optional[Semaphore] = None,
) -> Iterator[Awaitable[Any]]:
    """
    An iterator that returns tasks as they are ready, but limits the
    number of tasks running at a time.

    Parameters
    ----------
    *coros_or_futures
        The awaitables to run in a bounded concurrent fashion.
    loop : asyncio.AbstractEventLoop
        The event loop to use for the semaphore and :meth:`asyncio.as_completed`.
    limit : Optional[`int`]
        The maximum number of concurrent tasks. Used when no ``semaphore``
        is passed.
    semaphore : Optional[:class:`asyncio.Semaphore`]
        The semaphore to use for bounding tasks. If `None`, create one
        using ``loop`` and ``limit``.

    Raises
    ------
    TypeError
        When invalid parameters are passed.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    if semaphore is None:
        if not isinstance(limit, int) or limit <= 0:
            raise TypeError("limit must be an int > 0")

        semaphore = Semaphore(limit, loop=loop)

    pending = []

    for cof in coros_or_futures:
        if isfuture(cof) and cof._loop is not loop:
            raise ValueError("futures are tied to different event loops")

        cof = _sem_wrapper(semaphore, cof)
        pending.append(cof)

    return as_completed(pending, loop=loop)
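
# Usage sketch (illustrative only; ``fetch`` is an assumed coroutine function):
# at most ``limit`` wrapped awaitables hold the semaphore at once, and results
# are consumed in completion order.
#
#     async def fetch(n):
#         await asyncio.sleep(0.1)
#         return n * 2
#
#     for future in bounded_gather_iter(*(fetch(n) for n in range(20)), limit=4):
#         result = await future
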

def bounded_gather(
    *coros_or_futures,
    loop: Optional[AbstractEventLoop] = None,
    return_exceptions: bool = False,
    limit: int = 4,
    semaphore: Optional[Semaphore] = None,
) -> Awaitable[List[Any]]:
    """
    A semaphore-bounded wrapper to :meth:`asyncio.gather`.

    Parameters
    ----------
    *coros_or_futures
        The awaitables to run in a bounded concurrent fashion.
    loop : asyncio.AbstractEventLoop
        The event loop to use for the semaphore and :meth:`asyncio.gather`.
    return_exceptions : bool
        If true, gather exceptions in the result list instead of raising.
    limit : Optional[`int`]
        The maximum number of concurrent tasks. Used when no ``semaphore``
        is passed.
    semaphore : Optional[:class:`asyncio.Semaphore`]
        The semaphore to use for bounding tasks. If `None`, create one
        using ``loop`` and ``limit``.

    Raises
    ------
    TypeError
        When invalid parameters are passed.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    if semaphore is None:
        if not isinstance(limit, int) or limit <= 0:
            raise TypeError("limit must be an int > 0")

        semaphore = Semaphore(limit, loop=loop)

    tasks = (_sem_wrapper(semaphore, task) for task in coros_or_futures)

    return asyncio.gather(*tasks, loop=loop, return_exceptions=return_exceptions)
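
# Usage sketch (illustrative only; ``fetch`` as in the sketch above): behaves
# like asyncio.gather, preserving argument order in the results, but never runs
# more than ``limit`` awaitables concurrently.
#
#     results = await bounded_gather(*(fetch(n) for n in range(20)), limit=4)
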