在 Twisted中,有一个全局用于实现事件循环的对象为reactor。
反应器具体的工作包括:定时任务、线程、建立网络连接、监听连接。
1、定时器简单实例
#! /usr/bin/using_reactor.py # Filename:using_reactor.py from twisted.internet import reactor import time def printTime(): print 'Current time is',time.strftime("%H:%M:%S") def stopReactor(): print "Stopping reactor" reactor.stop() reactor.callLater(1,printTime) reactor.callLater(2,printTime) reactor.callLater(3,printTime) reactor.callLater(4,printTime) reactor.callLater(5,stopReactor) print 'Running the reactor ...' reactor.run() print 'Reactor stopped.'
运行结果:
python using_reactor.py
Running the reactor ...
Current time is 17:44:58
Current time is 17:44:59
Current time is 17:45:00
Current time is 17:45:01
Stopping reactor
Reactor stopped.
运行过程:
使用reactor.callLater函数定时执行函数。reactor.callLater函数包含两个必须参数,等待的秒数,和需要调用的函数。
反应器在被告知停止之前会一直运行,直到reactor.stop()调用。
在实际应用中,reactor.callLater是常用于超时处理和定时事件。
2、协议与工厂
#! /usr/bin/using_tcp.py # Filename:using_tcp.py from twisted.internet import reactor,protocol class QuickDisconnectedProtocol(protocol.Protocol): def connectionMade(self): print "Connected to %s." %self.transport.getPeer().host self.transport.loseConnection() class BasicClientFactory(protocol.ClientFactory): protocol = QuickDisconnectedProtocol def clientConnectionLost(self,connector,reason): print 'Lost connection: %s' %reason.getErrorMessage() reactor.stop() def clientConnectionFailed(self,reason): print 'Connection Failed: %s' %reason.getErrorMessage() reactor.stop() reactor.connectTCP('www.google.com',80,BasicClientFactory()) reactor.run()
运行结果:
python using_tcp.py
Connected to 74.125.71.99.
Lost connection: Connection was closed cleanly.
运行过程:
这里有两个主要的类用于作为客户端工作,ClientFactory和Protocol。
这些类被设计成处理连接中所有可能运到的事件:成功建立连接、连接失败、连接断开、数据传送等等。
(1)协议
QuickDisconnectProtocol为自定义的Protocol,继承自protocol.Protocol。它重载了一个方法connectMade。这个方法在连接成功时运行。在reactor刚刚成功建立了连接,ClientFactory创建了QuickDisconnectProtocol的实例时。Protocol对象有一个属性叫做transport,包含了当前活动连接对象。
(2)工厂
ClientFactory的工作是管理连接事件,并且创建Protocol对象处理每一个成功的连接。一旦连接建立,Protocol对象就接管下面的工作了,包括收发数据和决定是否关闭连接。
BasicClientFactory是继承自protocol.ClientFactory的类。它首先设置了类变量protocol为QuickDisconnectProtocol。这个类的实例被创建用于管理成功的连接。
BasicClientFactory 重载了ClientFactory的两个方法,clientConnectionLost和clientConnectionFailed:
1)clientConnectionFailed在反应器建立连接失败时被调用
2)clientConnectionLost在建立的连接被关闭或断开时调用
通知反应器建立TCP连接,如调用reactor.connectTCP: reactor.connectTCP(‘www.google.com’,BasicClientFactory()),通知反应器建立一个TCP连接到服务器www.google.com的80端口,通过BasicClientFactory来管理连接。
2、具体应用
(1)被动连接工厂
server类
class server def __init__(self,config): self.nodeId = config.nodeId self.channels = {} #channels字典,nodeId 对应channel对象 def startRun(self): #这里启动监听连接 reactor.listenTCP(svrNode.port,ServerConnectionFactory(self,self.nodeId)) for nodeId in [1,2,3]:#这里应该是客户端节点列表(需要自己实现) reactor.listenTCP(svrNode.port,nodeId)) #服务器连接工厂,用来生成制造出协议类对象 class ServerConnectionFactory(protocol.ServerFactory): protocol = ServerConnection def __init__(self,host,nodeId): self.host = host self.nodeId = nodeId def buildProtocol(self,addr): p = protocol.ServerFactory.buildProtocol(self,addr) p.host = self.host p.serverId = self.nodeId p.clientId = None return p
服务器连接类
class ServerConnection(protocol.Protocol): #处理连接制造的初始化,可以在这里设置一个空的chanel,chanel会包含transfer对象,用来处理数据发送 def connectionMade(self): .... #处理连接断开后,销毁chanel def connectionLost(self,reason): ... #这里处理数据接受,并把接受的数据写到channel的缓存,如果没有建立channel,就先建立 #会在第一次接受数据时设置channel对象 #所有的channel存在于server的channels字典 #接受的数据需要验证验证码,否则就主动断开连接,使用self.transport.loseConnection(),可以自定义 #一个握手信息,在第一次接受数据时验证,以后就不用验证了。 def dataReceived(self,data): ... #在被底层删除时,需要添加的自定义处理 def __del__(self): ...
会话类
class Channel(object): #在serverConnextion(服务器接受连接协议)里面创建Channel时就要传入一个transport对象 #(protocol.Protocol的成员,来自于from twisted.internet import protocol,在twisted里面实现的协议基础类) # 设置缓冲区成员(初始是空字符串),设置server作为Channel的监听者,打包好的缓冲数据(反序列化后),就交由 #server的频道回调处理函数onChannelEvent,该函数会发送到具体服务节点,根据服务id就发送给具体的服务实例或者具体的节点id #发送给具体chanel对应的远程服务(建立连接的客户端或服务端) def __init__(self,transport): ...
(2)主动连接工厂
主动连接客户端的也类似:客户端连接协议
class ClientConnection(protocol.Protocol): #简历连接,发送握手信息(对端开始时要检验) def connectionMade(self): ... #连接断开,销毁channel def connectionLost(self,reason): ... def dataReceived(self,data): #接受数据,第一次时建立channel,数据保存在channel的缓冲区 ...
class ClientConnectionFactory(protocol.ReconnectingClientFactory): #初始化设置主机(为server),和节点id,设置重新连接延迟(5s) def __init__(self,nodeId): ... #返回客户端连接协议实例 def buildProtocol(self,addr): ... #连接断开,使用protocol.ReconnectingClientFactory.clientConnectionLost尝试重新连接 def clientConnectionLost(self,reason): ... #连接断开,使用protocol.ReconnectingClientFactory.clientConnectionFailed尝试重新连接 def clientConnectionFailed(self,reason): ...