utils.py 904 B

1234567891011121314151617181920212223242526272829303132333435
  1. from concurrent.futures.thread import ThreadPoolExecutor
  2. from functools import partial
  3. from typing import Any
  4. from typing import Callable
  5. import asyncio
  6. executor = ThreadPoolExecutor(max_workers=30)
  7. async def run_async(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
  8. func_to_run = partial(func, *args, **kwargs)
  9. loop = asyncio.get_event_loop()
  10. return await loop.run_in_executor(executor, func_to_run)
  11. def resolve_dotted_name(name: str) -> Any:
  12. """
  13. import the provided dotted name
  14. >>> resolve_dotted_name('foo.bar')
  15. <object bar>
  16. :param name: dotted name
  17. """
  18. names = name.split(".")
  19. used = names.pop(0)
  20. found = __import__(used)
  21. for n in names:
  22. used += "." + n
  23. try:
  24. found = getattr(found, n)
  25. except AttributeError:
  26. __import__(used)
  27. found = getattr(found, n)
  28. return found