aiohttp 的详细使用指南,涵盖异步 HTTP 客户端和服务器的核心功能及示例代码:
1. 安装
bash
pip install aiohttp2. 作为异步 HTTP 客户端
基本 GET 请求
python
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
html = await fetch("https://example.com")
print(html[:200]) # 打印前 200 个字符
asyncio.run(main())异步并发请求
python
async def fetch_multiple(urls):
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
task = asyncio.create_task(session.get(url))
tasks.append(task)
responses = await asyncio.gather(*tasks)
return [await resp.text() for resp in responses]
async def main():
urls = ["https://example.com", "https://example.org"]
results = await fetch_multiple(urls)
for html in results:
print(html[:100])
asyncio.run(main())3. 作为异步 HTTP 服务器
python
from aiohttp import web
async def handle(request):
name = request.match_info.get('name', "World")
return web.Response(text=f"Hello, {name}!")
app = web.Application()
app.add_routes([web.get('/', handle), web.get('/{name}', handle)])
if __name__ == '__main__':
web.run_app(app, port=8080)4. 高级客户端功能
POST 请求与 JSON 处理
python
async def post_data(url, data):
async with aiohttp.ClientSession() as session:
async with session.post(url, json=data) as response:
return await response.json()
async def main():
data = {"key": "value"}
result = await post_data("https://httpbin.org/post", data)
print(result)
asyncio.run(main())设置超时
python
async def fetch_with_timeout(url):
timeout = aiohttp.ClientTimeout(total=10) # 总超时 10 秒
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as response:
return await response.text()5. 高级服务器功能
中间件(日志记录)
python
@web.middleware
async def logging_middleware(request, handler):
print(f"Request: {request.method} {request.path}")
response = await handler(request)
print(f"Response: {response.status}")
return response
app = web.Application(middlewares=[logging_middleware])WebSocket 服务
python
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
await ws.send_str(f"Echo: {msg.data}")
return ws6. 关键配置
连接池控制
python
connector = aiohttp.TCPConnector(limit=100) # 最大并发连接数
async with aiohttp.ClientSession(connector=connector) as session:
# 使用 session 发起请求SSL 验证禁用(仅限测试环境)
python
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
await session.get("https://example.com")7. 常见场景
调用 REST API(并发优化)
python
async def fetch_user(session, user_id):
url = f"https://api.example.com/users/{user_id}"
async with session.get(url) as response:
return await response.json()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch_user(session, i) for i in range(1, 100)]
users = await asyncio.gather(*tasks)
print(len(users)) # 99
asyncio.run(main())8. 注意事项
- 不要混合同步代码:避免在协程中使用
requests等同步库。 - 资源管理:务必使用
async with确保连接正确关闭。 - 速率限制:使用
asyncio.Semaphore控制并发量:pythonsem = asyncio.Semaphore(10) # 最大并发 10 async with sem: await session.get(url)
与 requests 的对比
| 特性 | aiohttp | requests |
|---|---|---|
| 异步支持 | ✔️ (基于 asyncio) | ❌ |
| 性能 | 高并发(非阻塞 I/O) | 单线程阻塞 |
| 适用场景 | 高频请求、微服务、实时通信 | 简单请求、脚本工具 |
通过 aiohttp,可以实现高性能的异步 HTTP 交互,适用于 Web 爬虫、API 网关、实时监控等高并发场景。