JWT模块
在djangorestframework
中,有一款扩展模块可用于做JWT
认证,使用如下命令进行安装:
pip install djangorestframework-jwt
现在,就让我们开始使用它吧。
JWT配置
该模块的所有配置都会从settings.py
中进行读取,与drf
一样,它会先去读取项目全局文件夹下的settings.py
,再去读取自身的settings.py
,所以如果我们要对JWT
进行配置,则在项目全局文件夹下的settings.py
中进行配置即可:
import datetime
JWT_AUTH = {
# 配置过期时间
'JWT_EXPIRATION_DELTA': datetime.timedelta(days=7),# 配置请求头中携带token的前缀
'JWT_AUTH_HEADER_PREFIX': 'JWT',}
如果你想了解更多配置,则可查看该模块读取的默认配置文件。
from rest_framework_jwt import settings
在默认配置文件中,你可以看到如下代码,它会先去全局中找配置,再到局部中找配置:
USER_SETTINGS = getattr(settings,'JWT_AUTH',None)
auth组件
下面将介绍如何使用auth
组件与JWT
配套使用,这当然非常方便,auth
组件可以说是Django
的核心。
我打算这样做,对内置的user
表做扩展,添加头像字段,只有用户登录后才能修改头像,否则将会采用默认头像。
准备工作
首先我们需要对内置的auth_user
表做扩展,如下所示:
from django.db import models
from django.contrib.auth.models import AbstractUser
class User(AbstractUser):
avatar = models.FileField(upload_to="avatar",default="avatar/default.png")
其次配置上传文件的路径,声明media
所在的位置,以及声明我们对内置的auth_user
表做了扩展:
MEDIA_ROOT = BASE_DIR / "media"
AUTH_USER_MODEL = "app01.User"
# python manage.py makemigrations
# python manage.py migrate
最后打开资源暴露接口:
from django.contrib import admin
from django.urls import path,re_path
from django.views.static import serve
from django.conf import settings
urlpatterns = [
path('admin/',admin.site.urls),re_path(r"^media/(?P<path>.*)",serve,{"document_root": settings.MEDIA_ROOT}),]
注册API
现在,我们需要来做一个注册的API
接口,如下所示:
class Register(ViewSet):
def register(self,request,*args,**kwargs):
serializer = UserModelSerializer(data=request.data)
if serializer.is_valid():
serializer.save()
return Response(data=serializer.data,status=status.HTTP_201_CREATED)
return Response(data=serializer.errors,status=status.HTTP_401_UNAUTHORIZED)
我们可以规定register
这个方法必须是POST
请求才能访问,在url
中进行配置(ViewSet
是ViewSetMixin
的子类,所以有actions
参数):
path('register/',views.Register.as_view(actions={"post":"register"})),
由于auth_user
的密码需要密文,所以我们重写了模型序列化器的create
方法。
from rest_framework import serializers
from rest_framework.exceptions import ValidationError
from app01 import models
class UserModelSerializer(serializers.ModelSerializer):
re_password = serializers.CharField(required=True,write_only=True)
# 数据表中不存在该字段,我们自己写一个
class Meta:
model = models.User
fields = ("username","password","re_password","email")
extra_kwargs = {
"password":{"write_only":True}
}
def create(self,validated_data):
password = validated_data.get("password")
re_password = validated_data.get("re_password")
email = validated_data.get("email")
if re_password != password:
raise ValidationError("两次密码输入不一致")
if models.User.objects.filter(email=email):
raise ValidationError("邮箱已被注册")
validated_data.pop("re_password") # 删除即可,然后写入
user_obj = models.User.objects.create_user(**validated_data) # 加密创建
return user_obj
签发token
下面将实现登录接口,如果你使用了auth
组件作为扩展那么登录接口将十分的简单。
JWT
模块已经全部帮你完成了,你只需要向下面这么做:
from rest_framework_jwt.views import obtain_jwt_token # 导入视图,它都写好了的,并且会做验证
from rest_framework_jwt.views import ObtainJSONWebToken # 上面是一个变量,内部实际上是 obtain_jwt_token=ObtainJSONWebToken.as_view()
urlpatterns = [
path('login/',obtain_jwt_token),# path('login/',ObtainJSONWebToken.as_view()),]
现在,当我们向该接口发送POST
请求时,如果校验全部通过,则会发送给我们一个JWT
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjozLCJ1c2VybmFtZSI6Inl1bnlhIiwiZXhwIjoxNjA0NTYyMzcyLCJlbWFpbCI6IjIzMjNAcXEuY29tIn0._SmZ0e0mj5QVOKUftAwI3xBX4_BOw1ZNjAi94_U3mXg
JWT认证
下面我们来实现修改头像,头像必须先登录才能修改,所以要添加JWT
认证。
from rest_framework.permissions import IsAuthenticated # 导入权限
from rest_framework_jwt.authentication import JSONWebTokenAuthentication # 导入认证
class SetAvatar(ViewSet):
authentication_classes = [JSONWebTokenAuthentication] # 存储到request.user,如果只配置这个,则不登陆也能访问
permission_classes = [IsAuthenticated] # 必须已经登陆,即request.user不能是匿名用户
def set_avatar(self,**kwargs):
serializer = UserSetAvatar(instance=request.user,data=request.FILES)
if serializer.is_valid():
serializer.save()
return Response(data="修改成功",status=status.HTTP_205_RESET_CONTENT)
return Response(data="修改失败",status=status.HTTP_401_UNAUTHORIZED)
序列类如下:
class UserSetAvatar(serializers.ModelSerializer):
class Meta:
model = models.User
fields = ("avatar",)
extra_kwargs = {
"avatar":{"write_only":True},}
url
配置:
path('setavatar/',views.SetAvatar.as_view(actions={"post":"set_avatar"})),
现在,我们使用POSTMAN
来发送请求,首先要先登录,获得JWT
:
然后需要在请求头中添加JWT
认证,并且在body
体中添加新头像:
需要注意的是在添加JWT
认证时,需要在VALUE
处添加前缀JWT 随机字符串
,以这样的格式提交,这是因为settings.py
中设置了前缀。
全局使用
要在全局使用JWT
认证,方式如下,它将作用于所有视图:
REST_FRAMEWORK = {
# 认证模块
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework_jwt.authentication.JSONWebTokenAuthentication',),'DEFAULT_PERMISSION_CLASSES':{
'rest_framework.permissions.IsAuthenticated',}
}
authentication_classes = []
permission_classes = []
局部使用参见JWT
认证中的书写。
JWT认证通过返回信息定制
在上面的示例中,我们可以看见在用户登录之后,返回信息只有一个JWT
字符串,那么我们可不可以将已登录的用户名字返回呢?也是可以的。
jwt_response_payload_handler()
这个函数就是控制返回格式的,我们可以覆写它然后在settings.py
中进行配置。
如下所示:
def jwt_response_payload_handler(token,user=None,request=None):
return {
'status': 0,'msg': 'ok','data': {
'token': token,'user': UserModelSerializers(user).data
}
}
在settings.py
中进行配置,该项配置是配置在REST_FRAMEWORK
中,而不是JWT_AUTH
中,一定要注意:
REST_FRAMEWORK = {
# 配置自定义登录成功后的返回信息
'JWT_RESPONSE_PAYLOAD_HANDLER':"utils.jwt_response_payload_handler",}
最后登录完成的结果如下:
{
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjozLCJ1c2VybmFtZSI6Inl1bnlhIiwiZXhwIjoxNjA0NTY0Njk3LCJlbWFpbCI6IjIzMjNAcXEuY29tIn0.cvmM6LvoVkSQETybss3fVVGZNXT099o8U21tzDvdFe4","username": "yunya"
}
JWT认证流程源码阅读
又开始愉快的读源码环节了,那么JWT
的源码还是比较简单的,下面一起看一看。
签发流程
首先,我们来分析一下为什么只写了下面一个登录接口,甚至都没写视图,就可以完成签发。
from rest_framework_jwt.views import obtain_jwt_token # 导入视图,它都写好了的,并且会做验证
from rest_framework_jwt.views import ObtainJSONWebToken # 上面是一个变量,内部实际上是 obtain_jwt_token=ObtainJSONWebToken.as_view()
urlpatterns = [
path('login/',]
先看obtain_jwt_token
,可以发现这个代码:
obtain_jwt_token = ObtainJSONWebToken.as_view()
我们发现了一个ObtainJSONWebToken
这个类,它会执行as_view()
方法,先不管,看看它继承了谁:
@H_803_301@
它继承了JSONWebTokenAPIView
,并且该类又继承了APIView
,那么这个APIView
的视图的源码已经阅读过不下五次了,可以查看之前的文章。我们直接来看关于登录的认证,JSONWebTOkenAPIView
中实现了一个post
方法:
def post(self,**kwargs):
serializer = self.get_serializer(data=request.data) # 可以发现它有一个自带的序列化器
if serializer.is_valid(): # 直接进行验证
user = serializer.object.get('user') or request.user
token = serializer.object.get('token')
response_data = jwt_response_payload_handler(token,user,request)
response = Response(response_data)
if api_settings.JWT_AUTH_COOKIE:
expiration = (datetime.utcnow() +
api_settings.JWT_EXPIRATION_DELTA)
response.set_cookie(api_settings.JWT_AUTH_COOKIE,token,expires=expiration,httponly=True)
return response
return Response(serializer.errors,status=status.HTTP_400_BAD_REQUEST)
现在看到序列化器对吧,序列化器其实在这里:
class ObtainJSONWebToken(JSONWebTokenAPIView):
serializer_class = JSONWebTokenSerializer
这个是序列化器的源码,在__init__
方法中实现了字段:
class JSONWebTokenSerializer(Serializer):
def __init__(self,**kwargs):
super(JSONWebTokenSerializer,self).__init__(*args,**kwargs)
self.fields[self.username_field] = serializers.CharField() # username字段
self.fields['password'] = PasswordField(write_only=True) # password字段
@property
def username_field(self):
return get_username_field()
def validate(self,attrs):
credentials = {
self.username_field: attrs.get(self.username_field),'password': attrs.get('password')
}
if all(credentials.values()):
user = authenticate(**credentials) # 这里是执行认证。
if user:
if not user.is_active:
msg = _('User account is disabled.')
raise serializers.ValidationError(msg)
payload = jwt_payload_handler(user) # 拿到荷载信息
return {
'token': jwt_encode_handler(payload),# 荷载信息放进去,来生成JWT的token字符串
'user': user
}
else:
msg = _('Unable to log in with provided credentials.')
raise serializers.ValidationError(msg)
else:
msg = _('Must include "{username_field}" and "password".')
msg = msg.format(username_field=self.username_field)
raise serializers.ValidationError(msg)
也就是说,在执行JSONWebTOkenAPIView
的post()
方法时,会走一次数据库查询,根据提交的用户名和密码来拿到用户对象。并且在序列化器中,会生成token()
信息。他们都是配置好的一些函数:
jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
jwt_decode_handler = api_settings.JWT_DECODE_HANDLER
jwt_get_username_from_payload = api_settings.JWT_PAYLOAD_GET_USERNAME_HANDLER
(我们可不可以自己写一个验证类来覆盖它,然后用于多端登录呢?手机号,邮箱等都可以登录)
我们继续来看post()
方法:
def post(self,**kwargs):
serializer = self.get_serializer(data=request.data) # 可以发现它有一个自带的序列化器
if serializer.is_valid(): # 直接进行验证
user = serializer.object.get('user') or request.user # request.user等于已经登录的用户
token = serializer.object.get('token') # token等于生成的jwt随机字符串
response_data = jwt_response_payload_handler(token,request) # 设置返回信息
response = Response(response_data)
if api_settings.JWT_AUTH_COOKIE: # 未设置,不走
expiration = (datetime.utcnow() +
api_settings.JWT_EXPIRATION_DELTA)
response.set_cookie(api_settings.JWT_AUTH_COOKIE,httponly=True)
return response # 直接返回
return Response(serializer.errors,status=status.HTTP_400_BAD_REQUEST) # 没验证通过
OK
,自动签发的流程已经走完了。
大概的缕一缕,它自己有序列化器,并且在序列化器中完成了JWT
字符串的拼接,最后进行返回。
验证流程
现在我们再来看一下,当用户登录后,再次访问的验证流程,最开始肯定走认证:
authentication_classes = [JSONWebTokenAuthentication]
我们都知道,在APIView
中的dispatch()
方法中的initial()
方法中,会有下面这三条代码:
self.perform_authentication(request)
self.check_permissions(request)
self.check_throttles(request)
# 详细的这三个代码的执行步骤,尤其是认证,可以查看之前的文章
也就是先走认证,走认证时会统一执行一个叫做authenticators()
的方法。我们直接找JSONWebTokenAuthentication
中的authenticators()
方法即可。
这个类没有authenticators()
这个方法,所以它继承类谁呢?
class JSONWebTokenAuthentication(BaseJSONWebTokenAuthentication):
所以我们要找的其实是BaseJSONWebTokenAuthentication
中的authenticators()
方法。
终于,找到了:
def authenticate(self,request):
jwt_value = self.get_jwt_value(request) # 获取jwt字符串,进行解析。也就是[ JWT 字符串 ],这个字符串的内容,排除前缀,感兴趣可以看看
if jwt_value is None:
return None # 如果没有JWT验证字符串,则返回None
try: # 一系列的异常捕获,均来自于内置的jwt模块
payload = jwt_decode_handler(jwt_value) # 解码荷载部分
except jwt.ExpiredSignature:
msg = _('Signature has expired.') # 签证已过期
raise exceptions.AuthenticationFailed(msg)
except jwt.DecodeError:
msg = _('Error decoding signature.') # 解码签证时出错
raise exceptions.AuthenticationFailed(msg)
except jwt.InvalidTokenError:
raise exceptions.AuthenticationFailed() # 无效令牌
user = self.authenticate_credentials(payload) # 如果都没出错,则执行这里
return (user,jwt_value) # 返回user对象,这个会赋值给reque.user,这个会jwt字符串会赋值给request.auth
=================Request.user中关于权限认证的地方
for authenticator in self.authenticators:
try:
user_auth_tuple = authenticator.authenticate(self) # 这里会返回一个user
except exceptions.APIException:
self._not_authenticated()
raise
if user_auth_tuple is not None:
self._authenticator = authenticator
self.user,self.auth = user_auth_tuple # 进行赋值 self.user=user ,self.auth = jwt_value
return
我们来看看user
是怎么弄出来的。它是在BaseJSONWebTokenAuthentication
类中定义的:
def authenticate_credentials(self,payload):
User = get_user_model() # 返回模型!!不是记录对象 ,通过settings.py中的AUTH_USER_MODEL进行获取
username = jwt_get_username_from_payload(payload) # 获取payload中的用户名
if not username: # 如果没有用户名就抛出异常
msg = _('Invalid payload.')
raise exceptions.AuthenticationFailed(msg)
try: # 根据荷载中的用户名,在模型User中试图获取出用户的记录对象,也就是说这里会走数据库查询。
user = User.objects.get_by_natural_key(username)
except User.DoesNotExist: # 如果User这个模型不存在则抛出异常,说明
msg = _('Invalid signature.')
raise exceptions.AuthenticationFailed(msg)
if not user.is_active: # 判断账户是否被禁用
msg = _('User account is disabled.')
raise exceptions.AuthenticationFailed(msg)
return user # 返回user
不用auth组件
看完了上面的源码分析后,我们再来想一想,如果不用auth
模块该怎么办?
其实也很简单,我们自己造一个jwt
然后将token
这个随机字符串返回就好了。
认证的时候我们重写一下认证类就好了,反正都是那一套逻辑。
手动签发
我们的表中要有username
以及password
字段,使用JWT
模块中生成token
的几个函数,自己造就一个token
。
# views.py
from rest_framework_jwt.settings import api_settings
jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
from users.models import User
class LoginView(APIView): # 登录的时候签发token
authentication_classes = []
def post(self,request):
username=request.data.get('username')
password=request.data.get('password')
user=User.objects.filter(username=username,password=password).first()
if user: # 能查到,登陆成功,手动签发
payload = jwt_payload_handler(user)
token = jwt_encode_handler(payload)
return CommonResponse('100','登陆成功',data={'token':token})
else:
return CommonResponse('101','登陆失败')
如果你想实现多端登录,手机、用户名、邮箱等都能登录,这里也有一份代码,只不过配合了序列化器使用,比较复杂:
# 使用用户名,手机号,邮箱,都可以登录#
# 前端需要传的数据格式
{
"username":"lqz/1332323223/33@qq.com",# 用户名、或者手机号、或者邮箱
"password":"lqz12345"
}
# 视图
from rest_framework.views import APIView
from rest_framework.viewsets import ViewSetMixin,ViewSet
from app02 import ser
class Login2View(ViewSet): # 跟上面完全一样
def login(self,**kwargs):
# 1 需要 有个序列化的类
login_ser = ser.LoginModelSerializer(data=request.data,context={'request':request}) # context参数类似与管道,与序列化器进行连接
# 2 生成序列化类对象
# 3 调用序列号对象的is_validad
login_ser.is_valid(raise_exception=True)
token=login_ser.context.get('token') # 从管道中取出token并返回
# 4 return
return Response({'status':100,'msg':'登录成功','token':token,'username':login_ser.context.get('username')})
# 序列化类
from rest_framework import serializers
from api import models
import re
from rest_framework.exceptions import ValidationError
from rest_framework_jwt.utils import jwt_encode_handler,jwt_payload_handler
class LoginModelSerializer(serializers.ModelSerializer):
username=serializers.CharField() # 重新覆盖username字段,数据中它是unique,post,认为你保存数据,自己有校验没过
class Meta:
model=models.User
fields=['username','password']
def validate(self,attrs):
print(self.context)
# 在这写逻辑
username=attrs.get('username') # 用户名有三种方式
password=attrs.get('password')
# 通过判断,username数据不同,查询字段不一样
# 正则匹配,如果是手机号
if re.match('^1[3-9][0-9]{9}$',username):
user=models.User.objects.filter(mobile=username).first()
elif re.match('^.+@.+$',username):# 邮箱
user=models.User.objects.filter(email=username).first()
else:
user=models.User.objects.filter(username=username).first()
if user: # 存在用户
# 校验密码,因为是密文,要用check_password
if user.check_password(password):
# 签发token
payload = jwt_payload_handler(user) # 把user传入,得到payload
token = jwt_encode_handler(payload) # 把payload传入,得到token
self.context['token']=token
self.context['username']=user.username
return attrs
else:
raise ValidationError('密码错误')
else:
raise ValidationError('用户不存在')
JWT验证
验证的时候,继承BaseAuthentication
类,重写一下验证方法:
# app_auth.py
from users.models import User
class MyJSONWebTokenAuthentication(BaseAuthentication):
def authenticate(self,request):
jwt_value = get_authorization_header(request)
if not jwt_value:
raise AuthenticationFailed('Authorization 字段是必须的')
try:
payload = jwt_decode_handler(jwt_value)
except jwt.ExpiredSignature:
raise AuthenticationFailed('签名过期')
except jwt.InvalidTokenError:
raise AuthenticationFailed('非法用户')
username = jwt_get_username_from_payload(payload)
print(username)
user = User.objects.filter(username=username).first()
print(user)
return user,jwt_value
然后你的某一个视图必须登录后才可以访问,把他添加到认证中就OK
了。
from users.app_auth import JSONWebTokenAuthentication,MyJSONWebTokenAuthentication
class OrderView(APIView):
# authentication_classes = [JSONWebTokenAuthentication] # 不用默认的了,用我们自己的
authentication_classes = [MyJSONWebTokenAuthentication] # 由于不是auth_user表,所以不需要判断是否为匿名用户。
def get(self,request):
print(request.user)
return CommonResponse('100','成功',{'数据':'测试'})
最后我想说
1.注意前缀,JWT
开头,一个空格,后面是JWT
的token
字符串
2.如果你不用auth
组件,则需要手动生成JWT
的token
字符串与手动进行校验。这很麻烦,所幸JWT
这个模块给我们很多方便之处。
3.多阅读源码,有好处的