OSError: [WinError 10013] 以一种访问权限不允许的方式做了一个访问套接字的尝试。
报错代码位置:
foo/main.py 28行
s = socketserver.ThreadingTCPServer((config.ip,config.port),MyServer)
代码架构:
conf
--config.py
data
--data.db
foo
--main.py
--server.py
--models.py
manage.py
conf/config.py代码示例:
# Address the socket server binds to.
ip = "127.0.0.1"
# TCP port the socket server listens on.
port = 8000
manage.py代码示例:
"""Program entry point: hand command-line processing over to ``foo.main``."""
from foo import main

if __name__ == '__main__':
    # Constructing ArgvHandler parses the command line and runs the command.
    main.ArgvHandler()
foo/main.py代码示例:
"""Command-line handling for the server: parse arguments and run a command."""
import optparse
import socketserver

from .server import MyServer
from conf import config


class ArgvHandler():
    """Parse the command line and dispatch to the matching command method.

    The first positional argument names the command to run; ``start`` is
    used when no positional argument is given.
    """

    def __init__(self):
        self.op = optparse.OptionParser()
        options, args = self.op.parse_args()
        self.verify_args(options, args)

    def verify_args(self, options, args):
        """Check the start-up command and run it.

        Args:
            options: Option values parsed by ``optparse`` (currently unused).
            args: Positional arguments; ``args[0]`` is the command name.
        """
        # Fall back to "start" without mutating the caller's list.
        cmd = args[0] if args else 'start'

        # Only public callables count as commands.  ``verify_args`` itself is
        # excluded because it cannot be called without arguments.
        func = None
        if not cmd.startswith('_') and cmd != 'verify_args':
            func = getattr(self, cmd, None)

        if callable(func):
            func()
        else:
            # Report the problem instead of silently doing nothing.
            print('无效命令: {}'.format(cmd))

    def start(self):
        """Start the threaded TCP server and serve requests until stopped."""
        # Creating the server binds the socket; OSError is raised here when
        # the address cannot be used (the reported WinError 10013 came from
        # this call).  The context manager closes the listening socket when
        # serve_forever() returns or raises.
        with socketserver.ThreadingTCPServer((config.ip, config.port), MyServer) as s:
            # Announce only after the bind has succeeded.
            print('服务端正在工作')
            s.serve_forever()
foo/server.py代码示例:
"""Request handler for the socket server (imported by foo/main.py as ``.server``)."""
import socketserver
import json

from conf import config
from foo.models import DbSession, User

# NOTE(review): this single DbSession is used by every handler thread of the
# ThreadingTCPServer -- confirm that sharing one session across threads is
# acceptable, or create one session per request.
session = DbSession()

STATUS_CODE = {
    250: "Invalid cmd format, e.g: {'action':'get','filename':'test.py','size':344}",
    251: "Invalid cmd ",
    252: "Invalid auth data",
    253: "Wrong username or password",
    254: "Passed authentication",
    255: "Filename doesn't provided",
    256: "File doesn't exist on server",
    257: "ready to send file",
    258: "md5 verification",
    800: "the file exist,but not enough ,is continue? ",
    801: "the file exist !",
    802: " ready to receive datas",
    900: "md5 valdate success"
}


class MyServer(socketserver.BaseRequestHandler):
    """Handle one client connection: read JSON commands and dispatch them."""

    # Actions a client is allowed to request.  The action name comes from the
    # network, so it is checked against this list rather than looked up with
    # a bare hasattr(), which would expose every attribute of the handler.
    ACTIONS = ('auth', 'put')

    def handle(self):
        """Serve commands from the client until the connection is closed."""
        while True:
            try:
                # NOTE: assumes each command arrives in a single recv() of at
                # most 1024 bytes; TCP itself does not guarantee that framing.
                raw = self.request.recv(1024).strip()
            except ConnectionError:
                break
            if not raw:
                # Empty read: the peer closed the connection (or sent only
                # whitespace); stop instead of feeding '' to json.loads.
                break

            try:
                data = json.loads(raw.decode('utf-8'))
            except ValueError:
                # Covers both invalid UTF-8 and invalid JSON.
                print('无效命令')
                self.send_response(250)
                continue

            action = data.get('action') if isinstance(data, dict) else None
            if action not in self.ACTIONS:
                print('无效命令')
                self.send_response(251)
                continue

            # Handlers take the command fields as keyword arguments.  A
            # client-supplied "self" key would collide with the bound
            # instance, so it is dropped.
            fields = {key: value for key, value in data.items() if key != 'self'}
            getattr(self, action)(**fields)

    def auth(self, **data):
        """Check the credentials in ``data`` and reply with a status code.

        Sends 254 on success, 253 on wrong credentials and 252 when the
        username or password field is missing or not a string.
        """
        username = data.get('username')
        password = data.get('password')
        if not isinstance(username, str) or not isinstance(password, str):
            self.send_response(252)
            return

        if self.authentication(username, password):
            self.send_response(254)
        else:
            self.send_response(253)

    def authentication(self, username, password):
        """Return True if the database holds this user and the password matches.

        On success the username is remembered on the handler as ``self.user``.
        """
        user = session.session.query(User).filter_by(username=username).first()
        if user and user.verify_password(password):
            self.user = username
            return True
        return False

    def send_response(self, status_code):
        """Send ``{"status_code": status_code}`` to the client as UTF-8 JSON."""
        res = json.dumps({"status_code": status_code}).encode('utf-8')
        self.request.sendall(res)

    def put(self, **data):
        """Placeholder for the upload command; not implemented yet."""
        pass
foo/models.py代码示例:
"""Database engine, ``User`` model and session holder (SQLite via SQLAlchemy)."""
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os
from werkzeug.security import generate_password_hash, check_password_hash

# Project root is the parent of this file's directory; abspath keeps the
# database location independent of the current working directory.
basedir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
data_f = os.path.join(basedir, 'data', 'data.db')

# Bind the engine to the SQLite database file.
engine = create_engine('sqlite:///{}'.format(data_f))

# Declarative base class for the models.
Base = declarative_base()


class User(Base):
    """A user account; only a hash of the password is stored."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String)
    password_hash = Column(String)
    space = Column(Integer)
    has_space = Column(Float, default=0)

    @property
    def password(self):
        """Write-only attribute: reading it always raises AttributeError."""
        raise AttributeError('密码不具有可读性')

    @password.setter
    def password(self, password):
        # Store only the hash, never the plain-text password.
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        """Return True if ``password`` matches the stored hash."""
        if not self.password_hash:
            # No password has been set for this row; nothing can match.
            return False
        return check_password_hash(self.password_hash, password)


class DbSession:
    """Holder for the session factory and a session bound to ``engine``."""

    Session = sessionmaker(bind=engine)
    # Class attribute: the same session object is seen by every instance.
    session = Session()


if not os.path.isfile(data_f):
    # Create the tables when the database file does not exist yet.
    Base.metadata.create_all(engine)
解决结果:
第一种解决方法: