django jwt认证 Python中实现JWT认证的完整指南 django登
目录
- 引言
- 一、JWT 是什么?为什么需要它?
- 传统 session 与 JWT 对比
- 二、JWT 的结构解析
- 三、Python 中实现 JWT
- 1. 安装 PyJWT 包
- 2. 生成 JWT
- 3. 验证 JWT
- 4. 错误处理大全
- 四、高质量应用场景
- 1. 双令牌体系(Access + Refresh)
- 详细说明表格:
- 异常处理补充表:
- 2. 与 FastAPI/Django 集成
- 五、安全最佳操作
- 六、性能优化技巧
- 算法性能比较表(执行时刻,越小越好)
- 详细对比表格:
- 七、完整示例:FastAPI 实现
- 小编归纳一下
引言
JSON Web Tokens (JWT) 是现代 Web 开发中广泛使用的身份验证机制。这篇文章小编将用生动的方式带你全面了解 JWT 在 Python 中的实现,包括生成、验证和各种相关技巧,通过丰富的比喻、表格和流程图帮助你彻底掌握 JWT。
一、JWT 是什么?为什么需要它?
想象你去游乐园,入园时会得到一个手环。这个手环:
- 包含信息:显示你的门票类型(VIP/普通)
- 防伪设计:有独特图案难以伪造
- 有效期:只在当天有效
JWT 就是这样的数字手环:
游乐园手环 | JWT 令牌 |
---|---|
门票类型 | 用户角色 |
入园时刻 | 签发时刻(iat) |
闭园时刻 | 过期时刻(exp) |
防伪标记 | 数字签名 |
手环材质 | 加密算法 |
传统 session 与 JWT 对比
特性 | Session | JWT |
---|---|---|
存储位置 | 服务器 | 客户端 |
扩展性 | 需要共享session | 天然无情形 |
跨域支持 | 需要配置 | 原生支持 |
移动端友好度 | 需处理cookie | 直接使用header |
安全性 | 依赖cookie安全 | 依赖token存储方式 |
典型场景 | 传统Web应用 | API/移动应用/微服务 |
二、JWT 的结构解析
一个 JWT 看起来像这样:xxxxx.yyyyy.zzzzz
就像三明治分三层:
Header(面包上层)
“alg”: “HS256”, // 签名算法(HMAC SHA256) “typ”: “JWT” // 类型标识}
Payload(馅料)
“sub”: “user123”, // 主题(用户ID) “name”: “张三”, “admin”: true, “iat”: 1516239022 // 签发时刻}
Signature(面包下层+防伪标记)
HMACSHA256( base64UrlEncode(header) + “.” + base64UrlEncode(payload), 密钥)
生成流程:
[Header] → base64编码 → xxxxx[Payload] → base64编码 → yyyyy[xxxxx.yyyyy + 密钥] → 签名算法 → zzzzz最终令牌:xxxxx.yyyyy.zzzzz
三、Python 中实现 JWT
1. 安装 PyJWT 包
pip install pyjwt
2. 生成 JWT
import jwtimport datetimefrom datetime import timezone 建议从环境变量读取SECRET_KEY = “your_super_secret_key”def generate_jwt(user_id: str, username: str, role: str) -> str: “””生成JWT令牌””” payload = “sub”: user_id, 标准字段:主题 “name”: username, “role”: role, “iat”: datetime.datetime.now(tz=timezone.utc), 签发时刻 “exp”: datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(hours=1) 过期时刻 } return jwt.encode(payload, SECRET_KEY, algorithm=”HS256″)
参数详解表:
参数 | 类型 | 必填 | 说明 | 示例 |
---|---|---|---|---|
payload | dict | 是 | 负载数据 | “sub”: “user123”} |
key | str | 是 | 签名密钥 | “secret” |
algorithm | str | 否 | 签名算法 | “HS256” |
headers | dict | 否 | 额外头部 | “kid”: “key1”} |
标准声明字段(建议):
字段 | 全称 | 说明 | 示例 |
---|---|---|---|
sub | Subject | 主题(用户ID) | “user123” |
exp | Expiration | 过期时刻 | 1735689600 |
iat | Issued At | 签发时刻 | 1735686000 |
aud | Audience | 接收方 | “mobile-app” |
iss | Issuer | 签发者 | “auth-server” |
3. 验证 JWT
def verify_jwt(token: str) -> dict: “””验证JWT令牌””” try: payload = jwt.decode( token, SECRET_KEY, algorithms=[“HS256″], audience=”your-app”, 验证接收方 issuer=”auth-server” 验证签发方 ) return payload except jwt.PyJWTError as e: print(f”Token验证失败: type(e).__name__}: e}”) return None
验证流程示意图:
4. 错误处理大全
from fastapi import HTTPException, statusdef validate_token(token: str): try: return jwt.decode(token, SECRET_KEY, algorithms=[“HS256″]) except jwt.ExpiredSignatureError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=”Token已过期”, headers=”WWW-Authenticate”: “Bearer”} ) except jwt.InvalidTokenError as e: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=f”无效Token: str(e)}”, headers=”WWW-Authenticate”: “Bearer”} )
异常类型表:
异常类 | 触发条件 | HTTP情形码 | 处理建议 |
---|---|---|---|
ExpiredSignatureError | Token过期 | 401 | 提示重新登录 |
InvalidSignatureError | 签名无效 | 401 | 拒绝访问 |
InvalidTokenError | 通用错误 | 401 | 记录日志 |
MissingRequiredClaimError | 缺少必要声明 | 400 | 返回错误详情 |
InvalidIssuerError | 签发者不匹配 | 403 | 审计日志 |
四、高质量应用场景
1. 双令牌体系(Access + Refresh)
下面内容是基于你的 JWT 认证流程整理的表格表示:
步骤 | 流程节点 | 条件/判断 | 动作/响应 | 备注 |
---|---|---|---|---|
1 | 用户登录 | 提交账号密码 | 体系验证凭证 | |
2 | 验证结局 | 验证成功 | 生成: &8211; access_token(15分钟) &8211; refresh_token(7天) | |
验证失败 | 返回错误信息 | HTTP 401 | ||
3 | 返回响应 | 返回双token给客户端 | ||
4 | API访问 | 携带access_token | 验证token有效性 | |
5 | 验证结局 | access_token有效 | 正常返回请求数据 | HTTP 200 |
access_token无效 | 检查是否存在refresh_token | |||
6 | 刷新检查 | 存在有效refresh_token | 发放新access_token | HTTP 200 + 新token |
无有效refresh_token | 要求重新登录 | HTTP 401 |
详细说明表格:
阶段 | 条件分支 | 体系行为 | 客户端响应 | HTTP情形码 |
---|---|---|---|---|
登录阶段 | ||||
1.1 | 凭证正确 | 生成双token | 接收: &8211; access_token &8211; refresh_token | 200 |
1.2 | 凭证错误 | 终止流程 | 收到错误提示 | 401 |
API访问阶段 | ||||
2.1 | access_token有效 | 处理请求 | 获取正常数据 | 200 |
2.2 | access_token过期 | 检查refresh_token | ||
Token刷新阶段 | ||||
3.1 | refresh_token有效 | 发放新access_token | 获取新token | 200 |
3.2 | refresh_token无效 | 终止流程 | 要求重新登录 | 401 |
异常处理补充表:
异常情况 | 体系处理 | 客户端表现 |
---|---|---|
access_token格式错误 | 直接拒绝请求 | 收到"无效token"错误 |
refresh_token过期 | 清除客户端存储 | 跳转登录页面 |
连续使用过期refresh_token | 标记为安全事件 | 强制登出所有设备 |
实现代码:
def generate_token_pair(user_id: str): “””生成令牌对””” access_payload = “sub”: user_id, “type”: “access”, “exp”: datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(minutes=15) } refresh_payload = “sub”: user_id, “type”: “refresh”, “exp”: datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(days=7) } access_token = jwt.encode(access_payload, SECRET_KEY, algorithm=”HS256″) refresh_token = jwt.encode(refresh_payload, SECRET_KEY, algorithm=”HS256″) return “access_token”: access_token, “refresh_token”: refresh_token }def refresh_access_token(refresh_token: str): “””使用refresh_token获取新access_token””” try: payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[“HS256”]) if payload.get(“type”) != “refresh”: raise HTTPException(status_code=400, detail=”无效的refresh token类型”) new_payload = “sub”: payload[“sub”], “type”: “access”, “exp”: datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(minutes=15) } return jwt.encode(new_payload, SECRET_KEY, algorithm=”HS256″) except jwt.PyJWTError as e: raise HTTPException(status_code=401, detail=f”refresh token无效: str(e)}”)
2. 与 FastAPI/Django 集成
FastAPI 示例:
from fastapi import Depends, FastAPI, HTTPExceptionfrom fastapi.security import HTTPBearer, HTTPAuthorizationCredentialssecurity = HTTPBearer()app = FastAPI()async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): token = credentials.credentials try: payload = jwt.decode(token, SECRET_KEY, algorithms=[“HS256″]) return payload except jwt.PyJWTError: raise HTTPException(status_code=401, detail=”无效或过期的token”)@app.get(“/protected”)async def protected_route(user: dict = Depends(get_current_user)): return “message”: f”无论兄弟们好, user[‘sub’]}!”, “user_data”: user}
Django 示例:
from django.http import JsonResponsefrom functools import wrapsdef jwt_required(view_func): @wraps(view_func) def wrapper(request, args, kwargs): auth_header = request.headers.get(‘Authorization’) if not auth_header or not auth_header.startswith(‘Bearer ‘): return JsonResponse(“error”: “未提供token”}, status=401) token = auth_header.split(‘ ‘)[1] try: request.user = jwt.decode(token, SECRET_KEY, algorithms=[“HS256”]) return view_func(request, args, kwargs) except jwt.ExpiredSignatureError: return JsonResponse(“error”: “token已过期”}, status=401) except jwt.InvalidTokenError: return JsonResponse(“error”: “无效token”}, status=401) return wrapper@jwt_requireddef protected_view(request): return JsonResponse(“data”: “受保护的内容”, “user”: request.user})
五、安全最佳操作
密钥管理:
- 使用环境变量存储密钥
import osSECRET_KEY = os.getenv(“JWT_SECRET_KEY”, “fallback-secret”)
- 定期轮换密钥(密钥版本控制)
keys = “2023”: “old-secret”, “2024”: “current-secret”}
增强安全措施:
def generate_secure_token(user, ip): “””生成带IP绑定的token””” payload = “sub”: user.id, “ip”: ip, 绑定客户端IP “jti”: str(uuid.uuid4()) 唯一标识防止重放 } return jwt.encode(payload, SECRET_KEY, algorithm=”HS256″)def verify_secure_token(token, ip): payload = jwt.decode(token, SECRET_KEY, algorithms=[“HS256”]) if payload.get(“ip”) != ip: raise ValueError(“IP地址不匹配”) return payload
黑名单实现:
from redis import Redisredis = Redis(host=’localhost’, port=6379)def revoke_token(jti: str, expire_in: int): “””将token加入黑名单””” redis.setex(f”blacklist:jti}”, expire_in, “revoked”)def is_revoked(jti: str) -> bool: “””检查是否在黑名单””” return bool(redis.exists(f”blacklist:jti}”))
六、性能优化技巧
选择更快的算法:
算法性能比较表(执行时刻,越小越好)
算法类型 | 性能指标(单位:ms) | 备注 |
---|---|---|
HS256 | 1.2 | 对称加密算法 |
RS256 | 3.8 | RSA 非对称加密 |
ES256 | 4.1 | ECDSA 非对称加密 |
详细对比表格:
特性对比 | HS256 | RS256 | ES256 |
---|---|---|---|
算法类型 | 对称加密 | 非对称加密 | 非对称加密 |
密钥管理 | 共享密钥 | 公钥/私钥 | 公钥/私钥 |
签名速度 | &x26a1;&xfe0f; 1.2ms | &x1f422; 3.8ms | &x1f422; 4.1ms |
验证速度 | &x26a1;&xfe0f; 最快 | &x1f422; 慢 | &x1f422; 最慢 |
安全性 | 中等 | 高 | 最高 |
适用场景 | 内部服务 | 公开API | 金融级应用 |
减少payload大致:
不推荐 – payload过大payload = user.__dict__, “iat”: …, “exp”: …} 推荐 – 只存储必要信息payload = “sub”: user.id, “role”: user.role, “iat”: …, “exp”: …}
异步签名验证:
import asynciofrom jwt import PyJWTasync def async_verify(token: str): loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: jwt.decode(token, SECRET_KEY, algorithms=[“HS256”]) )
七、完整示例:FastAPI 实现
from fastapi import FastAPI, Depends, HTTPException, statusfrom fastapi.security import HTTPBearer, HTTPAuthorizationCredentialsfrom pydantic import BaseModelimport jwtimport datetimefrom datetime import timezonefrom typing import Optionalapp = FastAPI()security = HTTPBearer() 配置SECRET_KEY = “your-secret-key-here” 生产环境应从环境变量获取ALGORITHM = “HS256″ACCESS_TOKEN_EXPIRE = datetime.timedelta(minutes=15)REFRESH_TOKEN_EXPIRE = datetime.timedelta(days=7) 模拟数据库fake_users_db = “johndoe”: “username”: “johndoe”, “password”: “secret123”, “role”: “user” }, “admin”: “username”: “admin”, “password”: “admin123”, “role”: “admin” }}class Token(BaseModel): access_token: str r