django jwt认证 Python中实现JWT认证的完整指南 django登

django jwt认证 Python中实现JWT认证的完整指南 django登

目录
  • 引言
  • 一、JWT 是什么?为什么需要它?
    • 传统 session 与 JWT 对比
  • 二、JWT 的结构解析
    • 三、Python 中实现 JWT
      • 1. 安装 PyJWT 包
      • 2. 生成 JWT
      • 3. 验证 JWT
      • 4. 错误处理大全
    • 四、高质量应用场景
      • 1. 双令牌体系(Access + Refresh)
      • 详细说明表格:
      • 异常处理补充表:
      • 2. 与 FastAPI/Django 集成
    • 五、安全最佳操作
      • 六、性能优化技巧
        • 算法性能比较表(执行时刻,越小越好)
        • 详细对比表格:
      • 七、完整示例:FastAPI 实现
        • 小编归纳一下

          引言

          JSON Web Tokens (JWT) 是现代 Web 开发中广泛使用的身份验证机制。这篇文章小编将用生动的方式带你全面了解 JWT 在 Python 中的实现,包括生成、验证和各种相关技巧,通过丰富的比喻、表格和流程图帮助你彻底掌握 JWT。

          一、JWT 是什么?为什么需要它?

          想象你去游乐园,入园时会得到一个手环。这个手环:

          • 包含信息:显示你的门票类型(VIP/普通)
          • 防伪设计:有独特图案难以伪造
          • 有效期:只在当天有效

          JWT 就是这样的数字手环:

          游乐园手环 JWT 令牌
          门票类型 用户角色
          入园时刻 签发时刻(iat)
          闭园时刻 过期时刻(exp)
          防伪标记 数字签名
          手环材质 加密算法

          传统 session 与 JWT 对比

          特性 Session JWT
          存储位置 服务器 客户端
          扩展性 需要共享session 天然无情形
          跨域支持 需要配置 原生支持
          移动端友好度 需处理cookie 直接使用header
          安全性 依赖cookie安全 依赖token存储方式
          典型场景 传统Web应用 API/移动应用/微服务

          二、JWT 的结构解析

          一个 JWT 看起来像这样:xxxxx.yyyyy.zzzzz

          就像三明治分三层:

          Header(面包上层)

          “alg”: “HS256”, // 签名算法(HMAC SHA256) “typ”: “JWT” // 类型标识}

          Payload(馅料)

          “sub”: “user123”, // 主题(用户ID) “name”: “张三”, “admin”: true, “iat”: 1516239022 // 签发时刻}

          Signature(面包下层+防伪标记)

          HMACSHA256( base64UrlEncode(header) + “.” + base64UrlEncode(payload), 密钥)

          生成流程

          [Header] → base64编码 → xxxxx[Payload] → base64编码 → yyyyy[xxxxx.yyyyy + 密钥] → 签名算法 → zzzzz最终令牌:xxxxx.yyyyy.zzzzz

          三、Python 中实现 JWT

          1. 安装 PyJWT 包

          pip install pyjwt

          2. 生成 JWT

          import jwtimport datetimefrom datetime import timezone 建议从环境变量读取SECRET_KEY = “your_super_secret_key”def generate_jwt(user_id: str, username: str, role: str) -> str: “””生成JWT令牌””” payload = “sub”: user_id, 标准字段:主题 “name”: username, “role”: role, “iat”: datetime.datetime.now(tz=timezone.utc), 签发时刻 “exp”: datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(hours=1) 过期时刻 } return jwt.encode(payload, SECRET_KEY, algorithm=”HS256″)

          参数详解表

          参数 类型 必填 说明 示例
          payload dict 负载数据 “sub”: “user123”}
          key str 签名密钥 “secret”
          algorithm str 签名算法 “HS256”
          headers dict 额外头部 “kid”: “key1”}

          标准声明字段(建议)

          字段 全称 说明 示例
          sub Subject 主题(用户ID) “user123”
          exp Expiration 过期时刻 1735689600
          iat Issued At 签发时刻 1735686000
          aud Audience 接收方 “mobile-app”
          iss Issuer 签发者 “auth-server”

          3. 验证 JWT

          def verify_jwt(token: str) -> dict: “””验证JWT令牌””” try: payload = jwt.decode( token, SECRET_KEY, algorithms=[“HS256″], audience=”your-app”, 验证接收方 issuer=”auth-server” 验证签发方 ) return payload except jwt.PyJWTError as e: print(f”Token验证失败: type(e).__name__}: e}”) return None

          验证流程示意图

          4. 错误处理大全

          from fastapi import HTTPException, statusdef validate_token(token: str): try: return jwt.decode(token, SECRET_KEY, algorithms=[“HS256″]) except jwt.ExpiredSignatureError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=”Token已过期”, headers=”WWW-Authenticate”: “Bearer”} ) except jwt.InvalidTokenError as e: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=f”无效Token: str(e)}”, headers=”WWW-Authenticate”: “Bearer”} )

          异常类型表

          异常类 触发条件 HTTP情形码 处理建议
          ExpiredSignatureError Token过期 401 提示重新登录
          InvalidSignatureError 签名无效 401 拒绝访问
          InvalidTokenError 通用错误 401 记录日志
          MissingRequiredClaimError 缺少必要声明 400 返回错误详情
          InvalidIssuerError 签发者不匹配 403 审计日志

          四、高质量应用场景

          1. 双令牌体系(Access + Refresh)

          下面内容是基于你的 JWT 认证流程整理的表格表示:

          步骤 流程节点 条件/判断 动作/响应 备注
          1 用户登录 提交账号密码 体系验证凭证
          2 验证结局 验证成功 生成: &8211; access_token(15分钟) &8211; refresh_token(7天)
          验证失败 返回错误信息 HTTP 401
          3 返回响应 返回双token给客户端
          4 API访问 携带access_token 验证token有效性
          5 验证结局 access_token有效 正常返回请求数据 HTTP 200
          access_token无效 检查是否存在refresh_token
          6 刷新检查 存在有效refresh_token 发放新access_token HTTP 200 + 新token
          无有效refresh_token 要求重新登录 HTTP 401

          详细说明表格:

          阶段 条件分支 体系行为 客户端响应 HTTP情形码
          登录阶段
          1.1 凭证正确 生成双token 接收: &8211; access_token &8211; refresh_token 200
          1.2 凭证错误 终止流程 收到错误提示 401
          API访问阶段
          2.1 access_token有效 处理请求 获取正常数据 200
          2.2 access_token过期 检查refresh_token
          Token刷新阶段
          3.1 refresh_token有效 发放新access_token 获取新token 200
          3.2 refresh_token无效 终止流程 要求重新登录 401

          异常处理补充表:

          异常情况 体系处理 客户端表现
          access_token格式错误 直接拒绝请求 收到"无效token"错误
          refresh_token过期 清除客户端存储 跳转登录页面
          连续使用过期refresh_token 标记为安全事件 强制登出所有设备

          实现代码:

          def generate_token_pair(user_id: str): “””生成令牌对””” access_payload = “sub”: user_id, “type”: “access”, “exp”: datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(minutes=15) } refresh_payload = “sub”: user_id, “type”: “refresh”, “exp”: datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(days=7) } access_token = jwt.encode(access_payload, SECRET_KEY, algorithm=”HS256″) refresh_token = jwt.encode(refresh_payload, SECRET_KEY, algorithm=”HS256″) return “access_token”: access_token, “refresh_token”: refresh_token }def refresh_access_token(refresh_token: str): “””使用refresh_token获取新access_token””” try: payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[“HS256”]) if payload.get(“type”) != “refresh”: raise HTTPException(status_code=400, detail=”无效的refresh token类型”) new_payload = “sub”: payload[“sub”], “type”: “access”, “exp”: datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(minutes=15) } return jwt.encode(new_payload, SECRET_KEY, algorithm=”HS256″) except jwt.PyJWTError as e: raise HTTPException(status_code=401, detail=f”refresh token无效: str(e)}”)

          2. 与 FastAPI/Django 集成

          FastAPI 示例

          from fastapi import Depends, FastAPI, HTTPExceptionfrom fastapi.security import HTTPBearer, HTTPAuthorizationCredentialssecurity = HTTPBearer()app = FastAPI()async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): token = credentials.credentials try: payload = jwt.decode(token, SECRET_KEY, algorithms=[“HS256″]) return payload except jwt.PyJWTError: raise HTTPException(status_code=401, detail=”无效或过期的token”)@app.get(“/protected”)async def protected_route(user: dict = Depends(get_current_user)): return “message”: f”无论兄弟们好, user[‘sub’]}!”, “user_data”: user}

          Django 示例

          from django.http import JsonResponsefrom functools import wrapsdef jwt_required(view_func): @wraps(view_func) def wrapper(request, args, kwargs): auth_header = request.headers.get(‘Authorization’) if not auth_header or not auth_header.startswith(‘Bearer ‘): return JsonResponse(“error”: “未提供token”}, status=401) token = auth_header.split(‘ ‘)[1] try: request.user = jwt.decode(token, SECRET_KEY, algorithms=[“HS256”]) return view_func(request, args, kwargs) except jwt.ExpiredSignatureError: return JsonResponse(“error”: “token已过期”}, status=401) except jwt.InvalidTokenError: return JsonResponse(“error”: “无效token”}, status=401) return wrapper@jwt_requireddef protected_view(request): return JsonResponse(“data”: “受保护的内容”, “user”: request.user})

          五、安全最佳操作

          密钥管理

          • 使用环境变量存储密钥

          import osSECRET_KEY = os.getenv(“JWT_SECRET_KEY”, “fallback-secret”)

          • 定期轮换密钥(密钥版本控制)

          keys = “2023”: “old-secret”, “2024”: “current-secret”}

          增强安全措施

          def generate_secure_token(user, ip): “””生成带IP绑定的token””” payload = “sub”: user.id, “ip”: ip, 绑定客户端IP “jti”: str(uuid.uuid4()) 唯一标识防止重放 } return jwt.encode(payload, SECRET_KEY, algorithm=”HS256″)def verify_secure_token(token, ip): payload = jwt.decode(token, SECRET_KEY, algorithms=[“HS256”]) if payload.get(“ip”) != ip: raise ValueError(“IP地址不匹配”) return payload

          黑名单实现

          from redis import Redisredis = Redis(host=’localhost’, port=6379)def revoke_token(jti: str, expire_in: int): “””将token加入黑名单””” redis.setex(f”blacklist:jti}”, expire_in, “revoked”)def is_revoked(jti: str) -> bool: “””检查是否在黑名单””” return bool(redis.exists(f”blacklist:jti}”))

          六、性能优化技巧

          选择更快的算法

          算法性能比较表(执行时刻,越小越好)

          算法类型 性能指标(单位:ms) 备注
          HS256 1.2 对称加密算法
          RS256 3.8 RSA 非对称加密
          ES256 4.1 ECDSA 非对称加密

          详细对比表格:

          特性对比 HS256 RS256 ES256
          算法类型 对称加密 非对称加密 非对称加密
          密钥管理 共享密钥 公钥/私钥 公钥/私钥
          签名速度 &x26a1;&xfe0f; 1.2ms &x1f422; 3.8ms &x1f422; 4.1ms
          验证速度 &x26a1;&xfe0f; 最快 &x1f422; 慢 &x1f422; 最慢
          安全性 中等 最高
          适用场景 内部服务 公开API 金融级应用

          减少payload大致

          不推荐 – payload过大payload = user.__dict__, “iat”: …, “exp”: …} 推荐 – 只存储必要信息payload = “sub”: user.id, “role”: user.role, “iat”: …, “exp”: …}

          异步签名验证

          import asynciofrom jwt import PyJWTasync def async_verify(token: str): loop = asyncio.get_event_loop() return await loop.run_in_executor( None, lambda: jwt.decode(token, SECRET_KEY, algorithms=[“HS256”]) )

          七、完整示例:FastAPI 实现

          from fastapi import FastAPI, Depends, HTTPException, statusfrom fastapi.security import HTTPBearer, HTTPAuthorizationCredentialsfrom pydantic import BaseModelimport jwtimport datetimefrom datetime import timezonefrom typing import Optionalapp = FastAPI()security = HTTPBearer() 配置SECRET_KEY = “your-secret-key-here” 生产环境应从环境变量获取ALGORITHM = “HS256″ACCESS_TOKEN_EXPIRE = datetime.timedelta(minutes=15)REFRESH_TOKEN_EXPIRE = datetime.timedelta(days=7) 模拟数据库fake_users_db = “johndoe”: “username”: “johndoe”, “password”: “secret123”, “role”: “user” }, “admin”: “username”: “admin”, “password”: “admin123”, “role”: “admin” }}class Token(BaseModel): access_token: str r