FastAPI 与 Aiohttp 使用心得
本文主要记录在使用 FastAPI 作为服务端框架,并以 aiohttp
作为异步客户端进行交互时的一些关键点和技巧。
1. FastAPI 请求体 (Request Body)
在 FastAPI 中,参数可以从不同的来源获取:
路径参数 (
Path
): 嵌入在 URL 路径中,如/users/{user_id}
。查询参数 (
Query
): 附加在 URL 末尾,如?name=Alice&age=30
。
当参数不适合放在 URL 中时(例如,包含复杂数据结构或敏感信息),我们应该使用请求体来传输数据。
场景示例:
假设我们需要更新一个物品,同时需要传递物品信息 (Item
)、用户信息 (User
) 以及一个重要性等级 (importance
)。Item
和 User
是复杂的 Pydantic 模型,importance
是一个简单的整数。
服务端 (FastAPI)
我们可以使用 Body()
来明确指定某个参数应从请求体中获取。
xxxxxxxxxx
# server.py
from fastapi import FastAPI, Body
from pydantic import BaseModel
from typing import Annotated
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
class User(BaseModel):
username: str
full_name: str | None = None
put("/items/{item_id}") .
async def update_item(
item_id: int,
item: Item,
user: User,
importance: Annotated[int, Body()]
):
"""
这个端点接收一个路径参数 item_id,
以及一个包含 item, user, importance 的 JSON 请求体。
"""
return {
"item_id": item_id,
"item": item,
"user": user,
"importance": importance
}
客户端 (aiohttp)
在客户端,我们需要构造一个符合服务端期望的 JSON 结构,并通过 data
参数发送。
xxxxxxxxxx
# client.py
import aiohttp
import asyncio
import json
from pydantic import BaseModel
# 在客户端侧也定义 Pydantic 模型,方便数据验证和序列化
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
class User(BaseModel):
username: str
full_name: str | None = None
async def main():
item_id = 123
my_header = {"Content-Type": "application/json"}
# 准备要发送的数据
payload = {
"item": Item(name="Awesome Gadget", price=99.9).model_dump(),
"user": User(username="johndoe", full_name="John Doe").model_dump(),
"importance": 10,
}
async with aiohttp.ClientSession() as session:
resp = await session.put(
url=f"http://127.0.0.1:8000/items/{item_id}",
headers=my_header,
data=json.dumps(payload) # 必须手动将字典序列化为 JSON 字符串
)
print("Status:", resp.status)
print("Response:", await resp.text())
if __name__ == "__main__":
asyncio.run(main())
2. 关于参数传递的注意事项
发送表单数据 (application/x-www-form-urlencoded
)
当服务端需要接收表单数据时(例如,使用 FastAPI 的 Form
),aiohttp
客户端可以直接传递一个字典给 data
参数,它会自动进行 URL 编码。
xxxxxxxxxx
# client.py
async def main():
async with aiohttp.ClientSession() as session:
# aiohttp 会自动设置 Content-Type 为 application/x-www-form-urlencoded
resp = await session.post(
url="http://127.0.0.1:8000/login/",
data={"username": "testuser", "password": "strongpassword"}
)
print(await resp.text())
asyncio.run(main())
发送 JSON 数据 (application/json
)
发送 JSON 数据有两种推荐的方式:
使用
json
参数 (推荐):aiohttp
会自动将 Python 字典序列化为 JSON 字符串,并设置正确的Content-Type: application/json
头。xxxxxxxxxx
# client.py
from pydantic import BaseModel
class PwdModel(BaseModel):
pwdA: str
async def main():
async with aiohttp.ClientSession() as session:
payload = PwdModel(pwdA="123456").model_dump()
resp = await session.post(
url="http://127.0.0.1:8000/apply_state/EpicMo",
json=payload # 直接传递字典
)
print(await resp.text())
asyncio.run(main())
使用
data
参数: 需要手动将数据序列化为 JSON 字符串,并手动设置Content-Type
头。xxxxxxxxxx
# client.py
from pydantic import BaseModel
class PwdModel(BaseModel):
pwdA: str
async def main():
headers = {"Content-Type": "application/json"}
async with aiohttp.ClientSession() as session:
# 使用 .model_dump_json() 直接获取 JSON 字符串
payload_str = PwdModel(pwdA="123456").model_dump_json()
resp = await session.post(
url="http://127.0.0.1:8000/apply_state/EpicMo",
headers=headers,
data=payload_str # 传递 JSON 字符串
)
print(await resp.text())
asyncio.run(main())
3. 文件处理
流式下载 (Streaming Download)
当处理大文件时,一次性将整个响应加载到内存中(如使用 .read()
, .json()
, .text()
)是危险的。aiohttp
提供了流式 API 来解决这个问题。
resp.content
是一个 aiohttp.StreamReader
实例,它允许我们分块读取响应内容。
xxxxxxxxxx
# client.py
async def download_large_file(session, url, filename):
async with session.get(url) as resp:
if resp.status != 200:
print(f"Failed to download. Status: {resp.status}")
return
with open(filename, 'wb') as fd:
# iter_chunked() 以指定大小的块迭代响应内容
async for chunk in resp.content.iter_chunked(8192): # 8KB per chunk
fd.write(chunk)
print(f"File '{filename}' downloaded successfully.")
# 使用方法:
# async with aiohttp.ClientSession() as session:
# await download_large_file(session, 'http://example.com/large_file.zip', 'downloaded_file.zip')
注意: 一旦开始使用 resp.content
进行流式读取,就不能再调用 .read()
, .json()
或 .text()
方法。
文件上传 (multipart/form-data
)
当表单中包含文件时,Content-Type
必须是 multipart/form-data
。aiohttp
提供了 aiohttp.FormData
对象来轻松构建这种类型的请求。
上传小文件
FormData
会自动处理 boundary
的生成和 Content-Type
头的设置。
xxxxxxxxxx
# client.py
async def upload_files():
# 1. 创建 FormData 对象
my_data = aiohttp.FormData()
# 2. 添加字段
# add_field(name, value, filename=None, content_type=None)
my_data.add_field('file1', open('test.py', 'rb'), filename='test.py', content_type='text/plain')
my_data.add_field('file2', open('image.png', 'rb'), filename='photo.png', content_type='image/png')
my_data.add_field('token', 'my-secret-token') # 普通表单字段
# 3. 发送请求
async with aiohttp.ClientSession() as session:
async with session.post(url='http://127.0.0.1:8000/upload/', data=my_data) as resp:
print("Status:", resp.status)
print("Response:", await resp.text())
asyncio.run(upload_files())
上传大文件 (流式上传)
对于非常大的文件,为了避免一次性读入内存,可以提供一个异步生成器作为 data
。aiofiles
库是实现这一点的绝佳搭档。
x# client.py
import aiofiles
async def file_sender(file_name=None):
"""一个异步生成器,分块读取文件内容。"""
async with aiofiles.open(file_name, 'rb') as f:
chunk = await f.read(64 * 1024) # 64KB per chunk
while chunk:
yield chunk
chunk = await f.read(64 * 1024)
async def stream_upload_large_file():
async with aiohttp.ClientSession() as session:
async with session.post('http://httpbin.org/post',
data=file_sender(file_name='huge_video.mp4')) as resp:
print(await resp.text())
# asyncio.run(stream_upload_large_file())
0 评论:
发表评论