一、项目总结三步骤
- 项目生命周期为基准线、分析要有层次感、不要想到什么说什么。
- 这条基准线上,负责的是哪一块,做了什么。
- 举例说明项目中遇到的问题及怎么解决的。
二、项目需求分析
管理员
1 注册
2 登录
3 上传视频
4 删除视频
5 发布公告
用户
1 注册
2 登录
3 冲会员
4 查看视频
5 下载免费视频
6 下载收费视频
7 查看观影记录
8 查看公告
三、搭建框架
层级结构:客户端 服务端 数据库
客户端:
基于tcp连接的套接字程序
管理员视图
注册、登录、上传视频、删除视频、发布公告
用户视图
注册、登录、购买vip、查看视频、下载免费视频、下载收费视频、查看下载记录、查看公告
服务端:
tcpserver:基于多线程实现并发的套接字通信 解决粘包问题
interface:admin_interface、user_interface、common_interface
models类和ORM框架:models类中的四张表继承ORM框架中的基类model
数据库:
创建四张表:user、movie、notice、download_record
四、ORM框架分析
# 优点:让一个不懂数据库操作的小白也能够简单快速操作数据库实现相应功能
# 缺点:sql封装固定,不利于SQL查询优化
# 对象关系映射
# 类 >>> 数据库的表
# 对象 >>> 表的一条条的记录
# 对象获取属性或方法 >>> 记录的字段对应的值
# 一张表有字段,字段又有字段名,字段类型,字段是否是主键,字段的默认值
class Field(object):
pass
# 为了在定义的时候更加方便 通过继承Field定义具体的字段类型
class StringField(Field):
pass
class IntegerField(Field):
pass
class Models(dict):
pass
def __getattr__(self,item):
return self.get(item)
def __setattr__(self,key,value)
self[key] = value
# 查询
def select(self,**kwargs):
# select * from userinfo
# select * from userinfo where id = 1
# 新增
def save(self):
# insert into userinfo(name,password) values('jason','123')
# 修改:是基于已经存在了的数据进行修改操作
def update(self):
# update userinfo set name='jason',password='234' where id = 1
"""
(******)
hasattr
getattr
setattr
"""
# 元类拦截类的创建过程 使它具备表的特性
class ModelsMetaClass(type):
def __new__(cls,class_name,class_bases,class_attrs):
# 只拦截模型表的创建表
if class_name == 'Models':
return type.__new__(cls,calss_bases,class_attrs)
table_name = class_attrs.get('table_name',class_name)
primary_key = None
mappings = {}
for k,v in class_attrs.items():
if isinstance(v,Field):
mappings[k] = v
if v.primary:
if primary_key:
raise TypeError('主键重复')
primary_key = v.name
for k in mappings.keys():
class_attrs.pop(k)
if not primary_key:
raise TypeError('必须要有一个主键')
class_attrs['table_name'] = table_name
class_attrs['primary_key'] = primary_key
class_attrs['mappings'] = mappings
return type.__new__(cls,class_attrs)
五、数据库设计
表结构定义好以后,数据怎么操作,逻辑代码怎么实现就清晰了
用户表:包含管理员和普通用户信息 user--->id,name,password,is_locked,is_vip,user_type,register_time
电影表:movie---->id,path,is_free,is_delete,user_id,create_time,file_md5
公告表:notice--->id,title,content,user_id
下载记录表:download_record---->id,movie_id,create_time
create table user(
id int auto_increment primary key,name varchar(255),password varchar(255),is_locked int not null default 0,is_vip int not null default 0,user_type varchar(255),register_time varchar(255)
)engine=Innodb charset='utf8';
create table movie(
id int auto_increment primary key,name varchar(64),path varchar(255),is_free int not null default 1,is_delete int not null default 0,create_time varchar(255),file_md5 varchar(255),user_id int
)engine=Innodb charset='utf8';
create table notice(
id int auto_increment primary key,title varchar(255),content varchar(255),user_id int,create_time varchar(255)
)engine=Innodb charset='utf8';
create table download_record(
id int auto_increment primary key,movie_id int,create_time varchar(255)
)engine=Innodb charset='utf8';
六、项目中各个功能模块分析
管理员:
1、注册功能
客户端
1-1、选择每个功能之前都需要都需要需要连接上服务器,即需要一个socket对象,每个函数传一个client
1-2、密码在传递过程中不能是明文吧,需要加密,可选择hashlib中md5加密,定义一个公共方法咯
1-3、定义一个发送和接收的公共方法、这里要注意的是在这个方法有一个关键字形参、用于传输文件,默认为None
1-4、考虑一个问题,发送的字典中要包含哪些数据、对于注册这个问题,包含服务器端用于标识的type的功能类型、
用户名、密码(要加密)、还有用户类型"user_type"(admin或者user)这里是admin类型
1-5、接收得到的字典back_dic又包含那些数据,常见的就flag和msg,后续的功能中有返回列表类型的
服务端
1-6、首先就是基于多线程实现并发的套接字程序,子线程working函数中会先接收到客户端发来的字典(用到json、struct模块)
1-7、有个问题是有什么便利的方法将接收到的字典recv_dic 和与客户端建立连接的socket对象conn 交给接口层中相应的功能进
行操作数据库,那就定义一个分发函数dispatch(recv_dic,conn),然后判断recv_dic["type]类型和全局func_dic字典中进行
比对,去执行与之对应的函数,如果传过来的类型不存在func_dic字典中,那就自定义一个字典back_dic(包含flag和msg数据)
调用服务端公共发送数据方法返回给客户端
1-8、咱们不知不觉就来到了服务端注册接口了,意味着可以操作数据库啦,就需要用到ORM框架和db目录中models模块中与表一一对应
的类、这四个类都是根据事先在数据库中定义好的字段进行创建的,不要写错了,字段和类型。这四个类都继承了ORM框架的基类
modle,所以可是直接点就可以调用ORM框架中基类中方法,select方法是类方法,得到的是一个列表套对象,还有save方法,用于保存
,还有一个update方法用于更新,那咱们回过头来
1-9、注册功能拿到的recv_dic中可以拿到注册的用户名,得到用户名后使用user_data = models.User.select(name=name )进行判断要注册的
用户是否存在,若果存在老规矩back_dic(flag为False,msg为注册失败)返回去,不存在那咋整,还能咋整保存到数据库user表中呗,那
怎么保存呀,name,password,user_type,is_locked和is_vip都有默认值,register_time注册时间的话写个方法 time.strftime("%Y-%m-%d %X")
这样不就全搞定了,什么数据都拿到了,那就用models.User()把这些数据搞进去创建得到一个对象,对象调用save方法进行方法就ojbk了,不急还有
要记得通知客户端,老规矩back_dic字典,调用公共发送方法,注册大功告成
登录
客户端
2-1、在注册功能该项目的总体框架都已经打通了任督二脉,我的乖乖,那登录功能需要考虑一个问题,客户端如果登陆成功,是不是需要标记一下登陆状态
,老规矩在全局定义一个字典,把返回的字典中一个session存到全局字典cookie中,解决了ojbk,
2-2、发送字典send_dic中type类型修改为login,密码的话照样发送密文,然后over了
服务器
2-3、还记得tcpserver模块中的全局func_dic字典吗?强大的地方来了,刚刚只是写了一个注册的映射接口,现在来了一个login类型,那咋整,就往里加一个
login的映射方法,还可以直接拿到recv_dic和conn,任督二脉打通了就是强,哦还有注册和登录都是管理员和普通用户的公共方法,所以放到common_interface
中,其实放哪都一样只要能找到就行啦 哈哈
2-4、你要登陆,逻辑点在哪里,首先我要判断你这货存不存在呀,不存在登陆个屁呀,淡定淡定,哈哈,上面说过select方法得到的是列表,别给老子忘了,列表里面
放的是一个个对象,models中User类调用select方法根据name=recv_dic["name"]得到user_list,如果user_list存在,那就取零号位就拿到user_obj用户对象
2-5、拿到user_obj对象点表中的字段属性判断其类型和接收的recv_dic字典中类型和密码是否一致,一致的话便可以得到一个back_dic字典了,老规矩包含flag和msg
2-6、重点来了,这里可能有带你绕,请无关人员速速离开,要返回的back_dic字典中需要添加一个session添加到字典中,这个session是用户登陆成功之后生成的一个
随机字符串,咱这里也是用hashlib,这里要保证生成的字符串是唯一的,这里需要加盐,加一个当前cpu执行代码的时间 time.clock()
2-7、,服务端怎么校验用户的登陆问题,考虑两个问题,第一个问题服务端需要保存session,第二个问题当用户退出之后将该用户对应的键值删除?
那我们如何判断用户走了,运行到哪一段代码就标记用户走了呢,我们可不可以通过addr就可以定位哪一个用户断开了,找到当前用对应的数据删除,数据保存形式
{‘addr’:[session,user_id]} 将这个东西存在哪里呢,可以放在全局,但我们这里把他存到Tcpsever目录下user_data模块中live_user['addr’']=[session,user_id]
那问题又来,怎么拿到add,第一种思路给每一个函数都添加addr参数,但是这个addr参数只是login函数用到,其他函数都没用到,这样第一种思路很不合理,第二种思路
可以通过working中接收到的recv_dic字典添加recv_dic["addr"] = str(addr) 再传给每一个函数,在login函数中user_data.live_user[recv_dic["addr"]] = [session,user_obj.id]
有考虑一个问题,因为多线程要操作公共数据user_data中的live_user字典,就会出现数据错乱,所以要加锁,那这个锁在那里产生呢?我们要在tcpsever全局中产生mutex = Lock()
在这里产生,但是不能在这里用,因为会出现循环导入问题,tcpserver导入common_interface,在common_interface中又用到tcpserver中的锁,相互导入就出现循环导入,解决办法,
将锁保存到user_data中 user_data.mutex = mutex,在login中给user_data.live_user[recv_dic["addr"]] = [session,user_obj.id]加锁,直接导入user_data就可以使用到锁啦
还没完在tcpserver中 用户退出(try...except.(下面的执行的代码就表示其中一个线程断开)..)就要删除user_data.live_user.pop(str(addr)) ,这里也是公共方法需要
加锁user_data.mutex.acquire()和user_data.mutex.release()
2-8、下面的功能都需要先登录才能操作,这里来个装饰器功能:校验客户端发过来的随机字符串,如果有这个随机字符串那就正常执行函数,如果没有返回请先登录的提示,意味着客户端
发送的字典要带着session过来,装饰器inner(*args,**kwargs)中args=(recv_dic,conn) kwargs={} 拿到客户端发过来的随机字符串与服务器的数据进行比对 vlues=[session,user_id]
for vlues in user_data.live_user.vlues(): if args[0].get("session") == v[0]:将对应的user_id放入recv_dic中,以便后续使用args[0]["user_id"]=vlues[1] break
以上for循环不一定能找到,for循环只是单单的判断session,然后将user_id放到接收字典recv_dic中,那被装饰的函数到底执不执行,if args[0].get("user_id"): func(*args,**kwargs)
else: back_dic ={"flag"False,"msg":"请先登录"} 然后调用返回函数send_back(back_dic,args[1])
3、上传视频
客户端
3-1、查看有哪些影片需要上传的,即获取所有视频
3-2、判断影片是否存在才能上传,那应该怎么判断是个问题,我们能不能对上传的视频文件进行hashlib,自定义被hash的数据可以在文件开头,1/3,2/3,末尾-10然后得到md5值
发送字典类型"check_movie",包含"session","file_md5",得到字典back_dic,如果视频不存在那要输入is_free,是否免费,然后在发字典send_dic,该字典类型为"upload_movie",还包含
"session"、"file_name"、 "file_size"、"file_md5",这里调用公共收发方法是要给文件file传参了,把上传文件路径传过去
服务端
3-3、还记得tcpserver模块中的全局func_dic字典吗?加上"check_movie"和"upload_movie"映射,映射函数全都加上装饰器
3-4、"check_movie"比较简单,只是查看要上传视频的file_md5是否在数据库,注意数据库中存的只是文件地址而已,不是真实的视频文件
3-5、这里为了避免上传的视频名字是一样的但是内容不一样,所以文件名应该尽量取的唯一,所以给传来的file_name加上一个随机字符串,就直接调用之前定义的 get_session方法即可
3-6、这里要拼接文件的存放路径了,根据file_size循环写入文件
3-7、生成一个 movie_obj 电影对象,调用save方法保存,然后返回back_dic说明上传成功
4、删除视频
客户端
4-1、先查询出所有没有被删除的电影列表,即send_dic字典中"type"为'get_movie_list' 和'movie_type'为"all",返回的电影列表可以全部是收费,全部是免费,收费免费都有,这里需要注意的是获取所有视频列表考
虑的不周全,如果单从管理员角度要获得所有视频不考虑用户获取收费或者免费的视频,会出现一些代码冗余,所以在获取所有视频这个功能要判断传过来的的movie_type是all、free、charge
4-2、拿到所有视频列表movie_list,该列表的格式[电影名称,是否免费收费,电影id]发送字典send_dic中"type"为"delete_movie"和delete_movie_id'为movie_list[choice-1][2]
服务端
4-3、还记得tcpserver模块中的全局func_dic字典吗?加上'get_movie_list'和"delete_movie"映射,映射函数全都加上装饰器
4-4、删除电影不是真的删除,只是找到每一个电影对象,然后点is_delete属性改为1即可,所以get_movie_list方法会先获得所有对象列表,遍历列表得到每一个对象,对每一个对象的is_delete属性进行判断,注意还要判断
ecv_dic['movie_type'],这里是“all”类型,满足的全部添加到一个返回的列表中back_movie_list,然后返回给客户端
4-5、delete_movie方法的话 movie_list = models.Movie.select(id=recv_dic.get("delete_movie_id"))然后对列表去索引得到一个电影对象,然后修改movie_obj.is_delete,然后调用update()方法更新,然后返回back_cic
5、发布公告
客户端
5-1 公告包含title和content 发送的字典send_dic包含"type"为"release_notice"、"session"、"title"、"content"
服务端、
5-2、这里需要知道接受的字典recv_dic是包含user_id字段的,要写入表notice时用到
5-3、也是创建表notice对象,然后调用save方法保存
普通用户
1、注册
直接调用公共注册方法
2、登录
直接调用公共登录,在全局添加user_dic中保存session和is_vip
3、购买会员
客户端
3-1、判断全局user_dic['is_vip']可知道是否是会员
3-2、如果不是的话,让用户选择是否购买会员,购买的话最后要修改全局
服务端
3-3、根据recv_dic["user_id"]判断是哪一个用户要购买会员,得到的对象点is_vip属性修改为1,调用update(0方法保存
4、查看所有视频
客户端
4-1、发送字典send_dic里面的type为'get_movie_list','movie_type为'all'
服务器
4-2、直接调用之前写好的get_movie_list方法即可 这和管理员中删除视频就先获取所有视频
5、下载免费电影
客户端
5-1、先列出所有免费电影,和上个功能差不多,只是'movie_type'改为'free'
5-2、再发送字典send_dic中'type'为'download_movie' 'movie_id'为movie_list[choice-1][2]
5-3、接受得到的字典back_dic中有一个wait_time 打印可能是0或者30秒 拼接下载的路径,循环写入文件
服务端
5-4、id=recv_dic.get('movie_id')来得到电影列表movie_list,然后索引取值得到电影对象
5-5 id=recv_dic['user_id']来得到用户列表索引取得用户对象user_obj
5-6、下载电影的话先判断使用是否是vip,vip的话不需要等待30秒 不是的话需要等待30秒
5-7、更新下载记录到down_record表中
5-8、循环发送文件
5-9、发送字典back_dic
6、下载收费电影
客户端
6-1、针对普通用户和vip用户下载收费视频收费标准不一样(5元 10元)
6-2、发送字典send_dic 中还是'get_movie_list'但是电影类型为收费'movie_type':'charge'
6-3、剩下功能和下载免费电影差不多
服务器
同上
7、查看下载记录
客户端
7-1、发送字典send_dic 中的类型'check_download_record'
7-2、接受字典back_dic进行判断即可
服务端
7-3、还记得tcpserver模块中的全局func_dic字典吗?加上'check_download_record'的映射方法
7-4、要查看下载记录 先根据用户id得到一个记录列表,循环该列表得到的是每一个记录对象
7-5、根据每一个对象点movie_id 和电影id判断得到电影列表,索引取值得到各个对象
7-6、把每一个对象的名字添加到一个自定义的列表中,用于返回给客户端
8、查看公告
客户端
8-1、发送字典send_dic 中的类型'check_notice'
8-2、接受字典back_dic进行判断即可
服务端
8-3、还记得tcpserver模块中的全局func_dic字典吗?加上'check_notice'的映射方法
8-4、Notice类调用select方法得到公告列表
8-5、列表存在的话 遍历该列表得到每一个对象,返回字典中保存对象点title,点content进行返回
七、项目中遇到的问题及怎么解决的
- 校验登陆问题(服务端必须校验,客户端无所谓)
- 获取所有视频列表考虑的不周全,如果单从管理员角度要获得所有视频不考虑用户获取收费或者免费的视频,会出现一些代码冗余,所以在获取所有视频这个功能要判断传过来的的movie_type是all、free、charge
- 服务端怎样标识客户端问题:cookie保存到客户端、session保存到服务器user_data文件中
- 从客户端到数据库一顿操作打通以后遇到最多的问题有字段打错了
八、客户端代码框架
8.1 conf
8.1.1 setting
import os
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
UPDATE_MOVIE = os.path.join(BASE_DIR,'update_movie')
DOWNLOAD_MOVIE_DIR = os.path.join(BASE_DIR,'download_movie')
8.2 core
8.2.1 src
from core import admin,user
func_dic = {"1": admin.admin_view,"2": user.user_view}
def run():
while True:
print("""
1、管理员视图
2、普通用户视图
q、退出
""")
choice = input("please choice your number>>:").strip()
if choice == 'q': break
if choice not in func_dic:
print("choice err not in range")
continue
func_dic.get(choice)()
8.2.2 admin
import os
from Tcpclient import tcpclient
from lib import common
from conf import setting
user_info = {
"session": None
}
def register(client):
while True:
name = input("please input your name>>:").strip()
password = input("please input your password>>:").strip()
re_password = input("please agan input your password>>:").strip()
if password != re_password:
print("两次密码不一致")
continue
send_dic = {"type": "register","name": name,"password": common.get_md5(password),"user_type": "admin"}
back_dic = common.send_back(send_dic,client)
if back_dic["flag"]:
print(back_dic["msg"])
break
else:
print(back_dic["msg"])
def login(client):
while True:
name = input("please input your name>>:").strip()
password = input("please input your password>>:").strip()
send_dic = {"type": "login",client)
if back_dic["flag"]:
print(back_dic["msg"])
user_info["session"] = back_dic["session"]
break
else:
print(back_dic["msg"])
def update_movie(client):
"""
思路:
1、是不是要先获取有哪些可以上传的视频
2、选择好要上传的视频后是不是还要判断服务器存不存在,存在了就不需要上传了
3、那就需要校验视频文件,可自定义校验规则
4、循环上传
:param client:
:return:
"""
while True:
movie_list = common.get_movie()
if not movie_list:
print("暂无影片上传")
return
for i,m in enumerate(movie_list,start=1):
print("%s:%s" % (i,m))
choice = input("please input your choice>>: ").strip()
if choice == 'q': break
if choice.isdigit():
choice = int(choice)
if choice in range(1,len(movie_list) + 1):
file_path = os.path.join(setting.UPDATE_MOVIE,movie_list[choice - 1])
file_md5 = common.get_file_md5(file_path)
send_dic = {"type": "check_movie","session": user_info["session"],"file_md5": file_md5}
back_dic = common.send_back(send_dic,client)
if back_dic["flag"]:
# 如果可以上传,那标识上传免费还是收费
is_free = input("上传的影片是否免费(y/n)>>:").strip()
is_free = 1 if is_free == 'y' else 0
file_name = movie_list[choice - 1]
send_dic = {"type": "update_movie","is_free": is_free,"file_name": file_name,"file_md5": file_md5,"file_size": os.path.getsize(file_path)}
back_dic = common.send_back(send_dic,client,file_path)
if back_dic["flag"]:
print(back_dic["msg"])
return
else:
print(back_dic["msg"])
else:
print(back_dic["msg"])
else:
print("choice not in range")
else:
print("input choice must be a number !")
def delete_movie(client):
"""
思路:
1、先从服务器获取所有视频
2、要删除的发给服务器
:param client:
:return:
"""
send_dic = {"type": "get_movie_list","movie_type": "all"}
back_dic = common.send_back(send_dic,client)
if back_dic["flag"]:
"""
服务器的get_movie_list会返回一个电影列表,列表里面为[电影名,收费或免费,电影id]
"""
movie_list = back_dic["movie_list"]
for i,start=1):
print("%s:%s-%s" % (i,m[0],m[1]))
choice = input("input your delete movie>>:").strip()
if choice.isdigit():
choice = int(choice)
if choice in range(1,len(movie_list) + 1):
send_dic = {"type": "delete_movie","movie_id": movie_list[choice - 1][2]}
back_dic = common.send_back(send_dic,client)
if back_dic['flag']:
print(back_dic['msg'])
return
else:
print(back_dic['msg'])
else:
print('choice noe in range')
else:
print(back_dic['msg'])
def release_notice(client):
while True:
title = input("please input title>>:").strip()
content = input("please input content>>:").strip()
send_dic = {"type": "release_notice","title": title,"content": content}
back_dic = common.send_back(send_dic,client)
if back_dic["flag"]:
print(back_dic["msg"])
break
else:
print(back_dic['msg'])
break
func_dic = {
"1": register,"2": login,"3": update_movie,"4": delete_movie,"5": release_notice
}
def admin_view():
client = tcpclient.get_client()
while True:
print("""
1、注册
2、登录
3、上传电影
4、删除电影
5、发布公告
""")
choice = input("please choice your number>>:").strip()
if choice == 'q': break
if choice not in func_dic:
print("choice err not in range")
continue
func_dic.get(choice)(client)
8.2.3 user
import os
import time
from Tcpclient import tcpclient
from conf import setting
from lib import common
user_info = {
"session": None,"is_vip": None
}
def register(client):
while True:
name = input("please input your name>>:").strip()
password = input("please input your password>>:").strip()
re_password = input("please agan input your password>>:").strip()
if password != re_password:
print("两次密码不一致")
continue
send_dic = {"type": "register","user_type": "user"}
back_dic = common.send_back(send_dic,client)
if back_dic["flag"]:
print(back_dic["msg"])
user_info["session"] = back_dic["session"]
user_info["is_vip"] = back_dic["is_vip"]
break
else:
print(back_dic["msg"])
def buy_vip(client):
while True:
buy_vip = input("是否购买会员(y/n)>>:").strip()
if buy_vip == 'q': break
if buy_vip not in ['y','n']:
print("输入有误")
continue
if buy_vip == 'y':
send_dic = {"type": "buy_vip","session": user_info["session"]}
back_dic = common.send_back(send_dic,client)
if back_dic["flag"]:
print(back_dic["msg"])
break
else:
print(back_dic["msg"])
else:
print("欢迎下次购买")
break
def check_movie(client):
send_dic = {"type": "get_movie_list",client)
if back_dic["flag"]:
movie_list = back_dic["movie_list"]
for i,m[1]))
else:
print(back_dic["msg"])
def download_free_movie(client):
send_dic = {"type": "get_movie_list","movie_type": "free"}
back_dic = common.send_back(send_dic,m[1]))
while True:
choice = input("请选择要下载的电影编号>>:").strip()
if choice == 'q': break
if choice.isdigit():
choice = int(choice)
if choice in range(1,len(movie_list) + 1):
send_dic = {"type": "download_movie","movie_id": movie_list[choice - 1][2],"movie_type": "free"}
back_dic = common.send_back(send_dic,client)
if back_dic["flag"]:
print("请等待》》》")
time.sleep(back_dic["wait_time"])
file_path = os.path.join(setting.DOWNLOAD_MOVIE_DIR,back_dic["file_name"])
recv_size = 0
with open(file_path,'wb') as f:
while recv_size < back_dic["file_size"]:
data = client.recv(1024)
f.write(data)
recv_size += len(data)
print("下载成功")
return
else:
print(back_dic["msg"])
else:
print("choice not in range")
else:
print("choice must be number")
else:
print(back_dic["msg"])
def download_charge_movie(client):
if user_info["is_vip"]:
charge = input("请支付10元(y/n)>>:").strip()
else:
charge = input('请支付20元(y/n)>>:').strip()
if charge != "y":
print("慢走 不送")
return
send_dic = {"type": "get_movie_list","movie_type": "charge"}
back_dic = common.send_back(send_dic,'wb') as f:
while recv_size < back_dic["file_size"]:
data = client.recv(1024)
f.write(data)
recv_size += len(data)
print("下载成功")
return
else:
print(back_dic["msg"])
else:
print("choice not in range")
else:
print("choice must be number")
else:
print(back_dic["msg"])
def download_movie_record(client):
"""
思路:当前登录的用户需要查看自己的观影记录,需要得到电影名
:param client:
:return:
"""
send_dic = {"type": "download_movie_record","session": user_info["session"]}
back_dic = common.send_back(send_dic,client)
if back_dic["flag"]:
back_record_list = back_dic['back_record_list']
for m in back_record_list:
print(m)
else:
print(back_dic['msg'])
def check_notice(client):
"""
查看公告思路:
:param client:
:return:
"""
send_dic = {"type": "check_notice",client)
if back_dic["flag"]:
back_record_list = back_dic['back_notice_list']
for m in back_record_list:
print(m)
else:
print(back_dic['msg'])
func_dic = {
"1": register,"3": buy_vip,"4": check_movie,"5": download_free_movie,"6": download_charge_movie,"7": download_movie_record,"8": check_notice
}
def user_view():
client = tcpclient.get_client()
while True:
print("""
1、注册
2、登录
3、购买会员
4、查看所有电影
5、下载免费电影
6、下载收费电影
7、查看观影记录
8、查看公告
""")
choice = input("please choice your number>>:").strip()
if choice == 'q': break
if choice not in func_dic:
print("choice err not in range")
continue
func_dic.get(choice)(client)
8.3 lib
8.3.1 common
import hashlib
import json
import os
import struct
from conf import setting
def send_back(send_dic,file=None):
json_bytes = json.dumps(send_dic).encode("utf-8")
client.send(struct.pack('i',len(json_bytes)))
client.send(json_bytes)
if file:
with open(file,'rb') as f:
for line in f:
client.send(line)
recv_len = struct.unpack('i',client.recv(4))[0]
recv_dic = json.loads(client.recv(recv_len).decode("utf-8"))
return recv_dic
def get_md5(password):
md = hashlib.md5()
md.update(password.encode("utf-8"))
return md.hexdigest()
def get_movie():
movie_list = os.listdir(setting.UPDATE_MOVIE)
return movie_list
def get_file_md5(path):
md = hashlib.md5()
file_size = os.path.getsize(path)
file_list = [0,file_size // 3,(file_size // 3) * 2,file_size - 10]
with open(path,"rb") as f:
for line in file_list:
f.seek(line)
md.update(f.read(10))
return md.hexdigest()
8.4 Tcpclient
8.4.1 tcpclient
import socket
def get_client():
client = socket.socket()
client.connect(("127.0.0.1",1688))
return client
8.5 start
8.5.1 start
import os,sys
from core import src
sys.path.append(os.path.dirname(__file__))
if __name__ == '__main__':
src.run()
九、服务端框架
9.1 conf
9.1.1 setting
import os
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
MOVIE_DIR = os.path.join(BASE_DIR,'movie_dir')
9.2 db
9.2.1 models
from orm_pool.orm import Models,StringField,IntegerField
class User(Models):
table_name = 'user'
id = IntegerField("id",primary_key=True)
name = StringField("name")
password = StringField("password")
is_locked = IntegerField("is_locked",default=0)
is_vip = IntegerField("is_vip",default=0)
user_type = StringField("user_type")
register_time = StringField("register_time")
class Movie(Models):
table_name = "movie"
id = IntegerField("id",primary_key=True)
name = StringField("name",column_type="varchar(64)")
path = StringField("path")
is_free = IntegerField("is_free")
is_delete = IntegerField("is_delete",default=0)
create_time = StringField("create_time")
user_id = IntegerField("user_id")
file_md5 = StringField("file_md5")
class Notice(Models):
table_name = "notice"
id = IntegerField("id",primary_key=True)
title = StringField("title")
content = StringField("content")
user_id = IntegerField("user_id")
create_time = StringField("create_time")
class DownloadRecord(Models):
table_name = "download_record"
id = IntegerField("id",primary_key=True)
user_id = IntegerField("user_id")
movie_id = IntegerField("movie_id")
create_time = StringField("create_time")
9.3 interface
9.3.1 admin_interface
import os
from db import models
from lib import common
from conf import setting
@common.login_auth
def check_movie(recv_dic,conn):
movie_data = models.Movie.select(file_md5=recv_dic["file_md5"])
if movie_data:
back_dic = {"flag": False,"msg": "该电影已存在"}
else:
back_dic = {"flag": True,"msg": "可以上传"}
common.send_back(back_dic,conn)
@common.login_auth
def update_movie(recv_dic,conn):
file_name = common.get_session(recv_dic["file_name"]) + recv_dic["file_name"]
file_path = os.path.join(setting.MOVIE_DIR,file_name)
print(recv_dic)
recv_size = 0
with open(file_path,'wb') as f:
while recv_size < recv_dic["file_size"]:
data = conn.recv(1024)
f.write(data)
recv_size += len(data)
movie_obj = models.Movie(name=file_name,path=file_path,is_free=recv_dic.get("is_free"),is_delete=0,create_time=common.get_time(),user_id=recv_dic.get("user_id"),file_md5=recv_dic.get("file_md5"))
movie_obj.save()
back_dic = {"flag": True,"msg": "上传成功"}
common.send_back(back_dic,conn)
@common.login_auth
def delete_movie(recv_dic,conn):
movie_obj = models.Movie.select(id=recv_dic.get('movie_id'))[0]
movie_obj.is_delete = 1
movie_obj.update()
back_dic = {"flag": True,"msg": "删除成功"}
common.send_back(back_dic,conn)
@common.login_auth
def release_notice(recv_dic,conn):
title = recv_dic["title"]
content = recv_dic["content"]
user_id = recv_dic["user_id"]
create_time = common.get_time()
notice_obj = models.Notice(title=title,content=content,user_id=user_id,create_time=create_time)
notice_obj.save()
back_dic = {"flag": True,"msg": "发布成功"}
common.send_back(back_dic,conn)
9.3.2 common_interface
from db import models
from lib import common
from Tcpserver import user_data
def register(recv_dic,conn):
user_list = models.User.select(name=recv_dic["name"])
if user_list:
back_dic = {"flag": False,"msg": "用户已存在"}
common.send_back(back_dic,conn)
return
user_obj = models.User(name=recv_dic["name"],password=recv_dic["password"],is_locked=0,is_vip=0,user_type=recv_dic["user_type"],register_time=common.get_time())
user_obj.save()
back_dic = {"flag": True,"msg": "注册成功"}
common.send_back(back_dic,conn)
def login(recv_dic,conn):
user_list = models.User.select(name=recv_dic["name"])
if user_list:
user_obj = user_list[0]
if user_obj.user_type == recv_dic["user_type"]:
if user_obj.password == recv_dic["password"]:
back_dic = {"flag": True,"msg": "登陆成功","is_vip": user_obj.is_vip}
# 获取每个用户的唯一随机字符串,用于标识每个用户
session = common.get_session(user_obj.name)
back_dic["session"] = session
# 服务端要记录正在登录的客户端,将数据user_data文件live_user字典中,放在为了更好的标识
# 每一个客户,字典的key为recv_dic["addr"] -----他是一个元组包含ip和端口,值的话是一个列表
# 保存每一个session和用户id
# 因为时公共数据,且并发会造成数据错乱,咱们给他来个锁
user_data.mutex.acquire()
user_data.live_user[recv_dic["addr"]] = [session,user_obj.id]
user_data.mutex.release()
else:
back_dic = {"flag": False,"msg": "密码不正确"}
else:
back_dic = {"flag": False,"msg": "用户类型不对"}
else:
back_dic = {"flag": False,"msg": "用户不存在"}
common.send_back(back_dic,conn)
@common.login_auth
def get_movie_list(recv_dic,conn):
"""
要给调用者返回相应的电影列表:all、free、charge
:param recv_dic:
:param conn:
:return:
"""
movie_list = models.Movie.select()
if movie_list:
back_movie_list = []
for movie_obj in movie_list:
if not movie_obj.is_delete:
if recv_dic["movie_type"] == "all":
back_movie_list.append([movie_obj.name,'免费' if movie_obj.is_free else '收费',movie_obj.id])
elif recv_dic["movie_type"] == "free":
if movie_obj.is_free:
back_movie_list.append([movie_obj.name,'免费',movie_obj.id])
else:
if not movie_obj.is_free:
back_movie_list.append([movie_obj.name,'收费',movie_obj.id])
if back_movie_list:
back_dic = {"flag": True,"movie_list": back_movie_list}
else:
back_dic = {"flag": False,"msg": "暂无影片"}
else:
back_dic = {"flag": False,"msg": "暂无影片"}
common.send_back(back_dic,conn)
9.3.3 user_interface
import os
from conf import setting
from db import models
from lib import common
@common.login_auth
def buy_vip(recv_dic,conn):
user_obj = models.User.select(id=recv_dic['user_id'])[0]
if user_obj.is_vip:
back_dic = {"flag": False,"msg": "您已经是会员啦"}
else:
user_obj.is_vip = 1
user_obj.save()
back_dic = {"flag": True,"msg": "购买成功"}
common.send_back(back_dic,conn)
@common.login_auth
def download_movie(recv_dic,conn):
"""
下载电影功能:普通用户下载需要等待30秒 VIP用下载不需要等待
:param recv_dic:
:param conn:
:return:
"""
movie_list = models.Movie.select(id=recv_dic["movie_id"])
if movie_list:
movie_obj = movie_list[0]
file_path = movie_obj.path
user_obj = models.User.select(id=recv_dic["user_id"])[0]
wait_time = 0
if recv_dic["movie_type"] == "free":
if user_obj.is_vip:
wait_time = 0
else:
wait_time = 30
back_dic = {"flag": True,"file_name": movie_obj.name,"file_size": os.path.getsize(file_path),"wait_time": wait_time}
download_record = models.DownloadRecord(user_id=user_obj.id,movie_id=movie_obj.id,create_time=common.get_time())
download_record.save()
common.send_back(back_dic,conn)
with open(file_path,'rb') as f:
for line in f:
conn.send(line)
else:
back_dic = {"flag": False,"msg": "暂无影片"}
common.send_back(back_dic,conn)
@common.login_auth
def download_movie_record(recv_dic,conn):
record_list = models.DownloadRecord.select(user_id=recv_dic["user_id"])
back_record_list = []
if record_list:
for m in record_list:
movie_obj = models.Movie.select(id=m.movie_id)[0]
back_record_list.append(movie_obj.name)
back_dic = {"flag": True,"back_record_list": back_record_list}
else:
back_dic = {"flag": False,"msg": "暂无下载记录"}
common.send_back(back_dic,conn)
@common.login_auth
def check_notice(recv_dic,conn):
notice_list = models.Notice.select()
back_notice_list = []
if notice_list:
for notice_obj in notice_list:
back_notice_list.append([notice_obj.title,notice_obj.content])
back_dic = {"flag": True,"back_notice_list": back_notice_list}
else:
back_dic = {"flag": False,conn)
9.4 lib
9.4.1 common
import hashlib
import json
import struct
import time
from functools import wraps
from Tcpserver import user_data
def send_back(back_dic,conn):
json_bytes = json.dumps(back_dic).encode("utf-8")
conn.send(struct.pack('i',len(json_bytes)))
conn.send(json_bytes)
def get_time():
return time.strftime("%Y-%m-%d %X")
def get_session(name):
# 为了保证每个用户的随机字符串是唯一的,不仅要对每一个用户名加密还要加上cpu执行时机----加盐
md = hashlib.md5()
md.update(str(time.clock()).encode("utf-8"))
md.update(name.encode("utf-8"))
return md.hexdigest()
def login_auth(func):
@wraps(func)
def inner(*args,**kwargs):
# args=(recv_dic,conn)
# 登录了以后服务端user_data文件中live_user就有存在用户登陆的数据,如果没有登陆就没有数据,可以为此
# 来作为判断是否登录的依据
for values in user_data.live_user.values():
# values = [session,user_id]
if args[0]["session"] == values[0]:
args[0]["user_id"] = values[1]
break
if args[0].get("user_id"):
func(*args,**kwargs)
else:
back_dic = {"flag": False,"msg": "请先登录"}
send_back(back_dic,args[1])
return inner
9.5 orm_pool
9.5.1 db_pool
from DBUtils.PooledDB import PooledDB
import pyMysqL
POOL = PooledDB(
creator=pyMysqL,# 使用链接数据库的模块
maxconnections=6,# 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2,# 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5,# 链接池中最多闲置的链接,0和None不限制
maxshared=3,# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pyMysqL和MysqLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True,# 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None,# 一个链接最多被重复使用的次数,None表示无限制
setsession=
[],# 开始会话前执行的命令列表。如:["set datestyle to ...","set time zone ..."]
ping=0,# ping MysqL服务端,检查是否服务可用。# 如:0 = None = never,1 = default = whenever it is requested,2 = when a cursor is created,4 = when a query is executed,7 = always
host='127.0.0.1',port=3306,user='root',password='123',database='youku',charset='utf8',autocommit='True')
9.5.2 MysqL_singleton
import pyMysqL
from orm_pool import db_pool
class MysqL(object):
def __init__(self):
self.conn = db_pool.POOL.connection()
self.cursor = self.conn.cursor(pyMysqL.cursors.DictCursor)
def close(self):
self.cursor.close()
self.conn.close()
def select(self,sql,args=None):
self.cursor.execute(sql,args)
res = self.cursor.fetchall() # 列表套字典
return res
def execute(self,args):
try:
self.cursor.execute(sql,args)
except BaseException as e:
print(e)
9.5.3 orm
from orm_pool.MysqL_singleton import MysqL
# 定义字段类
class Field(object):
def __init__(self,column_type,primary_key,default):
self.name = name
self.column_type = column_type
self.primary_key = primary_key
self.default = default
# 定义具体的字段
class StringField(Field):
def __init__(self,column_type='varchar(255)',primary_key=False,default=None):
super().__init__(name,default)
class IntegerField(Field):
def __init__(self,column_type='int',default)
class ModelMetaClass(type):
def __new__(cls,class_attrs):
# 我仅仅只想拦截模型表的类的创建过程
if class_name == 'Models':
return type.__new__(cls,class_attrs)
# 给类放表名,主键字段,所有字段
table_name = class_attrs.get('table_name',class_name)
# 定义一个存储主键的变量
primary_key = None
# 定义一个字典用来存储用户自定义的表示表的所有字段信息
mappings = {}
# for循环当前类的名称空间
for k,Field):
mappings[k] = v
if v.primary_key:
if primary_key:
raise TypeError("主键只能有一个")
primary_key = v.name
# 将重复的键值对删除
for k in mappings.keys():
class_attrs.pop(k)
if not primary_key:
raise TypeError('必须要有一个主键')
# 将处理好的数据放入class_attrs中
class_attrs['table_name'] = table_name
class_attrs['primary_key'] = primary_key
class_attrs['mappings'] = mappings
return type.__new__(cls,class_attrs)
class Models(dict,Metaclass=ModelMetaClass):
def __init__(self,**kwargs):
super().__init__(**kwargs)
def __getattr__(self,item):
return self.get(item,'没有该键值对')
def __setattr__(self,value):
self[key] = value
# 查询方法
@classmethod
def select(cls,**kwargs):
ms = MysqL()
# select * from userinfo
if not kwargs:
sql = 'select * from %s' % cls.table_name
res = ms.select(sql)
else:
# select * from userinfo where id = 1
k = list(kwargs.keys())[0]
v = kwargs.get(k)
sql = 'select * from %s where %s=?' % (cls.table_name,k)
# select * from userinfo where id = ?
sql = sql.replace('?','%s') # select * from userinfo where id = %s
res = ms.select(sql,v)
if res:
return [cls(**r) for r in res] # 将数据库的一条数据映射成类的对象
# 新增方法
def save(self):
ms = MysqL()
# insert into userinfo(name,'123')
# insert into %s(%s) values(?)
fields = [] # [name,password]
values = []
args = []
for k,v in self.mappings.items():
if not v.primary_key: # 将id字段去除 因为新增一条数据 id是自动递增的不需要你传
fields.append(v.name)
args.append('?')
values.append(getattr(self,v.name))
# insert into userinfo(name,password) values(?,?)
sql = "insert into %s(%s) values(%s)" % (
self.table_name,','.join(fields),'.join(args))
# insert into userinfo(name,?)
sql = sql.replace('?','%s')
ms.execute(sql,values)
# 修改方法:基于已经存在了的数据进行一个修改操作
def update(self):
ms = MysqL()
# update userinfo set name='jason',password='123' where id = 1
fields = [] # [name,password]
values = []
pr = None
for k,v in self.mappings.items():
if v.primary_key:
pr = getattr(self,v.name,v.default)
else:
fields.append(v.name + '=?')
values.append(getattr(self,v.default))
sql = 'update %s set %s where %s = %s' % (
self.table_name,self.primary_key,pr)
# update userinfo set name='?',password='?' where id = 1
sql = sql.replace('?',values)
# if __name__ == '__main__':
# class Teacher(Models):
# table_name = 'teacher'
# tid = IntegerField(name='tid',primary_key=True)
# tname = StringField(name='tname')
# obj = Teacher(tname='jason老师')
# obj.save()
# res = Teacher.select()
# for r in res:
# print(r.tname)
# print(res)
# res = Teacher.select(tid=1)
# teacher_obj = res[0]
# teacher_obj.tname = 'jason老师'
# teacher_obj.update()
# res1 = Teacher.select()
# print(res1)
# class User(Models):
# table_name = 'User'
# id = IntegerField(name='id',primary_key=True)
# name = StringField(name='name')
# password = StringField(name='password')
# print(User.primary_key)
# print(User.mappings)
# obj = User(name='jason')
# print(obj.table_name)
# print(obj.primary_key)
# print(obj.mappings)
9.6 Tcpserver
9.6.1 tcpserver
import json
import socket
import struct
import traceback
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from Tcpserver import user_data
from interface import common_interface,admin_interface,user_interface
from lib import common
pool = ThreadPoolExecutor(20)
#全局产生锁,为了避免循环导入问题,将产生的锁放到user_data中
mutex = Lock()
user_data.mutex = mutex
func_dic = {
"register": common_interface.register,"login": common_interface.login,"check_movie": admin_interface.check_movie,"update_movie": admin_interface.update_movie,"get_movie_list": common_interface.get_movie_list,"delete_movie": admin_interface.delete_movie,"release_notice": admin_interface.release_notice,"buy_vip": user_interface.buy_vip,"download_movie": user_interface.download_movie,"download_movie_record": user_interface.download_movie_record,"check_notice": user_interface.check_notice
}
def get_server():
server = socket.socket()
server.bind(("127.0.0.1",1688))
server.listen(5)
while True:
conn,addr = server.accept()
pool.submit(working,conn,addr)
def working(conn,addr):
while True:
try:
recv_header = conn.recv(4)
recv_bytes = conn.recv(struct.unpack('i',recv_header)[0])
recv_dic = json.loads(recv_bytes.decode("utf-8"))
recv_dic["addr"] = str(addr)
dispatch(recv_dic,conn)
except Exception as e:
traceback.print_exc()
conn.close()
# 当用户断开以后,服务器就无需保存表示他的数据
user_data.mutex.acquire()
user_data.live_user.pop(str(addr))
user_data.mutex.release()
break
def dispatch(recv_dic,conn):
if recv_dic.get("type") in func_dic:
func_dic.get(recv_dic["type"])(recv_dic,conn)
else:
back_dic = {"flag": False,"msg": "类型不合法"}
common.send_back(back_dic,conn)
9.6.2 user_data
live_user={}
mutex=None
9.7 start
9.7.1 start
import os,sys
from Tcpserver import tcpserver
sys.path.append(os.path.dirname(__file__))
if __name__ == '__main__':
tcpserver.get_server()