执行异步代码
Note
代码来源: bilibili-api
方便的异步转同步,使用方法如下:
from qqmusic_api import sync
from qqmusic_api.mv import MV
mv = MV("i0043gp575k")
sync(mv.get_url())
实现原理如下:
"""同步执行异步函数
代码来源: https://github.com/Nemo2011/bilibili-api
"""
import asyncio
from collections.abc import Coroutine
from concurrent.futures import ThreadPoolExecutor
from typing import Any, TypeVar
T = TypeVar("T")
def sync(coroutine: Coroutine[Any, Any, T]) -> T:
"""同步执行异步函数
请注意,每次执行都是新的 `Eventloop`
Args:
coroutine: 执行异步函数所创建的协程对象
Returns:
该协程对象的返回值
"""
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(coroutine)
else:
with ThreadPoolExecutor() as executor:
return executor.submit(asyncio.run, coroutine).result()