基本概念
Wangle中的Pipeline和Netty中的Pipeline是很相似的,既可以将它看为一种职责链模式的实现也可以看作是Handler的容器。Pipeline中的handler都是串行化执行的,前一个handler完成自己的工作之后把事件传递给下一个handler,理论上Pipeline中的所有handler都是在同一个IO线程中执行的,但是为了防止某些handler(比如序列化、编解码handler等)耗时过长,Netty中允许为某些handler指定其它线程(eventloop)异步执行,类似的功能在Wangle中也有体现,只是在实现方式上有些区别。和Netty中一个较大的区别是,Wangle中并没有专门的Channel定义,Wangle中的Pipeline兼有了Channel的角色和功能。下面分别就Pipeline、Handler和Context的顺序进行源码分析。
Pipeline
PipelineBase作为Pipeline的基类,提供了一些最为通用、核心的api实现,比如对handler的操作:addBack及其变体、addFront及其变体、remove及其变体等,下面看一下addBack的一个实现版本:
template <class H> PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) { typedef typename ContextType<H>::type Context;// 声明Conetxt类型,ContextImpl<Handler>、InboundContextImpl<Handler>、OutboundContextImpl<Handler>其中之一 // 使用Context包装Handler后,将其添加到pipeline中,Context中还持有pipeline的引用 return addHelper( std::make_shared<Context>(shared_from_this(),std::move(handler)),false);// false标识添加到尾部 }
首先,会根据要添加的handler类型定义一个Context(Context可以看成是Handler的外套,后面还会单独介绍)类型,然后根据这个Context类型创建一个Context:参数为Pipeline指针和handler,最终addHelper会将Context添加到容器管理起来:
template <class Context> PipelineBase& PipelineBase::addHelper(std::shared_ptr<Context>&& ctx,bool front) { // 先加入总的Context (std::vector<std::shared_ptr<PipelineContext>>) // 该vector种使用的是智能指针,可以保持对Context的引用 ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(),ctx); // 然后根据方向(BOTH、IN、OUT分别加入相应的vector中) // std::vector<PipelineContext*> 这里放的是Context的指针,因为引用在上面的容器中已经保持 if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) { inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(),ctx.get()); } if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) { outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(),ctx.get()); } return *this; }
Context内部包含了Pipeline、Handler,和Handler一样,Context也有方向:BOTH、IN、OUT,首先,无论Context 什么方向,都会在ctxs_容器上添加这个Context,然后会根据Context方向的不同,分别在inCtxs_和outCtxs_上添加该Context。接下来看一下这三个容器的定义:
std::vector<std::shared_ptr<PipelineContext>> ctxs_; // 所有的PipelineContext std::vector<PipelineContext*> inCtxs_; // inbound 类型的PipelineContext std::vector<PipelineContext*> outCtxs_; // outbound 类型的PipelineContext
由于handler的其他操作(addFront、remove等)都是对这三个容器的增删操作,原理一样,此处不再赘述。
PipelineBase中还提供了设置PipelineManager的接口,从字面理解,PipelineManager就是管理Pipeline的接口,其定义如下:
class PipelineManager { public: virtual ~PipelineManager() = default; virtual void deletePipeline(PipelineBase* pipeline) = 0; virtual void refreshTimeout() {}; };
其中,deletePipeline会在显示调用一个pipeline的close方法时被调用,一般用来完成该Pipeline相关的资源释放,而refreshTimeout主要在Pipeline发生读写事件时被回调,主要用来刷新Pipeline的空闲时间。因此,如果你需要监听Pipeline的delete和refresh事件,那么可以自己实现一个PipelineManager并设置到Pipeline上。
在Wangle中没有定义专门的Channel结构,其实Wangle中的Pipeline兼有Channel的功能,比如要判断一个Channel是否还处于连接状态,在Netty中代码如下:
channel.isConnected();
那么Wangle中的Pipeline并没有此类方法可供使用,怎么办呢?其实,Wangle的Pipeline提供了一个更强大的方法:getTransport,该方法可以获得一个底层的AsyncTransport,而该AsyncTransport拥有所有的底层连接信息,比如(仅列出主要接口):
class AsyncTransport : public DelayedDestruction,public AsyncSocketBase { public: typedef std::unique_ptr<AsyncTransport,Destructor> UniquePtr; virtual void close() = 0; virtual void closeNow() = 0; virtual void closeWithReset() { closeNow(); } virtual void shutdownWrite() = 0; virtual void shutdownWriteNow() = 0; virtual bool good() const = 0; virtual bool readable() const = 0; virtual bool isPending() const { return readable(); } virtual bool connecting() const = 0; virtual bool error() const = 0; virtual void attachEventBase(EventBase* eventBase) = 0; virtual void detachEventBase() = 0; virtual bool isDetachable() const = 0; virtual void setSendTimeout(uint32_t milliseconds) = 0; virtual uint32_t getSendTimeout() const = 0; virtual void getLocalAddress(SocketAddress* address) const = 0; virtual void getAddress(SocketAddress* address) const { getLocalAddress(address); } virtual void getPeerAddress(SocketAddress* address) const = 0; virtual ssl::X509UniquePtr getPeerCert() const { return nullptr; } };
至此,PipelineBase中的主要功能分析完毕。
Pipeline是PipelineBase的子类,其具体定义如下:
template <class R,class W = folly::Unit> class Pipeline : public PipelineBase { public: using Ptr = std::shared_ptr<Pipeline>; static Ptr create() { return std::shared_ptr<Pipeline>(new Pipeline()); } ~Pipeline(); // 模板方法 template <class T = R> typename std::enable_if<!std::is_same<T,folly::Unit>::value>::type read(R msg);//front_->read(std::forward<R>(msg)); --> this->handler_->read(this,std::forward<Rin>(msg)); template <class T = R> typename std::enable_if<!std::is_same<T,folly::Unit>::value>::type readEOF();//front_->readEOF(); template <class T = R> typename std::enable_if<!std::is_same<T,folly::Unit>::value>::type readException(folly::exception_wrapper e);//front_->readException(std::move(e)); template <class T = R> typename std::enable_if<!std::is_same<T,folly::Unit>::value>::type transportActive();// front_->transportActive(); template <class T = R> typename std::enable_if<!std::is_same<T,folly::Unit>::value>::type transportInactive();//front_->transportInactive(); template <class T = W> typename std::enable_if<!std::is_same<T,folly::Unit>::value,folly::Future<folly::Unit>>::type write(W msg);//back_->write(std::forward<W>(msg)); template <class T = W> typename std::enable_if<!std::is_same<T,folly::Future<folly::Unit>>::type writeException(folly::exception_wrapper e);//back_->writeException(std::move(e)); template <class T = W> typename std::enable_if<!std::is_same<T,folly::Future<folly::Unit>>::type close();//back_->close() void finalize() override; protected: Pipeline(); explicit Pipeline(bool isStatic); private: bool isStatic_{false}; InboundLink<R>* front_{nullptr};// inbound类型Context(read) OutboundLink<W>* back_{nullptr};// outbound类型Context (write) };
可以看到,Pipeline主要定义和实现了一些和Handler对应的常用方法:read、readEOF、readException、transportActive、transportInactive、write、writeException、close。同时,Pipeline还定义了两个私有成员:front_和back_,从类型可以看出这是两个不同的方向,首先看一下InboundLink定义:
template <class In> class InboundLink { public: virtual ~InboundLink() = default; virtual void read(In msg) = 0; virtual void readEOF() = 0; virtual void readException(folly::exception_wrapper e) = 0; virtual void transportActive() = 0; virtual void transportInactive() = 0; };
可以看出,InboundLink只是把Pipeline主要方法中的IN方向单独抽象出来,都是一个IN事件(输入事件),那么可想而知OutboundLink的定义:
template <class Out> class OutboundLink { public: virtual ~OutboundLink() = default; virtual folly::Future<folly::Unit> write(Out msg) = 0; virtual folly::Future<folly::Unit> writeException( folly::exception_wrapper e) = 0; virtual folly::Future<folly::Unit> close() = 0; };
的确,OutboundLink定义的都是OUT事件类型的操作。
前文在讲PipelineBase时,addBack之类的操作都是只针对那三个容器进行的,没有地方对front_链表和back_链表进行操作啊?其实,front_链表和back_链表的设置是在Pipeline的finalize中完成的:
template <class R,class W> void Pipeline<R,W>::finalize() { front_ = nullptr; if (!inCtxs_.empty()) { front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front()); for (size_t i = 0; i < inCtxs_.size() - 1; i++) { inCtxs_[i]->setNextIn(inCtxs_[i + 1]); } inCtxs_.back()->setNextIn(nullptr); } back_ = nullptr; if (!outCtxs_.empty()) { back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back()); for (size_t i = outCtxs_.size() - 1; i > 0; i--) { outCtxs_[i]->setNextOut(outCtxs_[i - 1]); } outCtxs_.front()->setNextOut(nullptr); } if (!front_) { detail::logWarningIfNotUnit<R>( "No inbound handler in Pipeline,inbound operations will throw " "std::invalid_argument"); } if (!back_) { detail::logWarningIfNotUnit<W>( "No outbound handler in Pipeline,outbound operations will throw " "std::invalid_argument"); } for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) { (*it)->attachPipeline(); } }
代码很简单,以IN方向为例,遍历inCtxs_容器,对容器中的每一个Context调用其setNextIn方法将Context组成一个单向链表front_。同理,outCtxs_最终会变为back_单向链表。最后,还会遍历Context的总容器ctxs_,为每一个Context调用attachPipeline方法,该方法主要工作就是把Context绑定到对应的Handler上(最终是Context和Handler都互相持有对方的引用),还会回调Handler的attachPipeline方法。
此处还有一个细节,Pipeline是一个模板类,具有两个模板参数template <class R,class W = folly::Unit>,分别代表Pipeline的 read(IN事件)的数据类型和write(out事件)数据类型,这些类型的设置要和Pipeline中的handler类型向匹配(后文还会详细讲解)。
下面就以Pipeline中的write方法来看一下事件的流动过程:
template <class R,class W> template <class T> typename std::enable_if < !std::is_same<T,folly::Future<folly::Unit >>::type Pipeline<R,W>::write(W msg) { if (!back_) { throw std::invalid_argument("write(): no outbound handler in Pipeline"); } return back_->write(std::forward<W>(msg)); }
Pipeline的write方法只是简单的调用back_的wirte方法,也就是OUT类型的事件会从Pipeline的最后一个Context依次向前传递(只传递给OUT类型的handler)。
Handler
Handler在继承层次上类似于Pipeline,首先有一个基类HandlerBase,其定义如下:
template <class Context> class HandlerBase { public: virtual ~HandlerBase() = default; virtual void attachPipeline(Context* /*ctx*/) {} virtual void detachPipeline(Context* /*ctx*/) {} // 获取绑定的Context Context* getContext() { if (attachCount_ != 1) { return nullptr; } CHECK(ctx_); return ctx_; } private: friend PipelineContext; // 设置PipelineContext为友元类,便于PipelineContext操作自己 uint64_t attachCount_{0}; // 绑定计数,同一个handler可以被同时绑定到不同的pipeline中 Context* ctx_{nullptr}; // 该Handler绑定的Context };
HandlerBase内部组合了一个绑定的Context指针,并提供了getContext接口用于获取这个Handler绑定的Context。
Handler作为HandlerBase的子类,它具有四个模板参数:Rin、Rout、Win、Wout,其中Rin作为Handler和Context中read方法中消息的数据类型,Rout是作为Context中fireRead方法的参数类型。同理,Win是作为Handler和Context中wirte方法的消息参数类型,而Wout是作为Context中fireWrite的消息参数类型。可以这么理解:Xout是作为以fire开头的事件方法的参数类型。
template <class Rin,class Rout = Rin,class Win = Rout,class Wout = Rin> class Handler : public HandlerBase<HandlerContext<Rout,Wout>> { public: static const HandlerDir dir = HandlerDir::BOTH; // 方向为双向 typedef Rin rin; typedef Rout rout; typedef Win win; typedef Wout wout; typedef HandlerContext<Rout,Wout> Context; // 声明该HandlerContext类型 virtual ~Handler() = default; // inbound类型事件 virtual void read(Context* ctx,Rin msg) = 0; virtual void readEOF(Context* ctx) { ctx->fireReadEOF(); } virtual void readException(Context* ctx,folly::exception_wrapper e) { ctx->fireReadException(std::move(e)); } virtual void transportActive(Context* ctx) { ctx->fireTransportActive(); } virtual void transportInactive(Context* ctx) { ctx->fireTransportInactive(); } // outbound类型事件 virtual folly::Future<folly::Unit> write(Context* ctx,Win msg) = 0; virtual folly::Future<folly::Unit> writeException(Context* ctx,folly::exception_wrapper e) { return ctx->fireWriteException(std::move(e)); } virtual folly::Future<folly::Unit> close(Context* ctx) { return ctx->fireClose(); } };
类似于Pipeline,Handler也相应的定义了inbound类型和outbound类型事件,分别对应方法:read、readEOF、readException、transportActive、transportInactive、write、writeException、close(这些方法和Pipeline中一一对应)。其中,除了read和write两个方法是纯虚接口之外,其他的方法都提供了默认实现:就是将事件进行透传(调用Context里fireXxx方法)。
同理,根据事件类型的不同,还可以进一步细分Handler类型,比如InboundHandler类型为:
// inbound类型的Handler (默认情况下读入和读出的类型是一致) template <class Rin,class Rout = Rin> class InboundHandler : public HandlerBase<InboundHandlerContext<Rout>> { public: static const HandlerDir dir = HandlerDir::IN; // 方向为输入 typedef Rin rin; typedef Rout rout; typedef folly::Unit win; typedef folly::Unit wout; typedef InboundHandlerContext<Rout> Context; // 声明inbound类型的InboundHandlerContext virtual ~InboundHandler() = default; // 纯虚函数。由子类实现 virtual void read(Context* ctx,Rin msg) = 0; // 下面的默认实现都是事件的透传 virtual void readEOF(Context* ctx) { ctx->fireReadEOF(); } virtual void readException(Context* ctx,folly::exception_wrapper e) { ctx->fireReadException(std::move(e));// std::move } virtual void transportActive(Context* ctx) { ctx->fireTransportActive(); } virtual void transportInactive(Context* ctx) { ctx->fireTransportInactive(); } };
相应的,OutboundHandler类型定义为:
// outbound类型的Handler (默认写入类型和写出类型一致,如果不一致就会产生很多的转换) template <class Win,class Wout = Win> class OutboundHandler : public HandlerBase<OutboundHandlerContext<Wout>> { public: static const HandlerDir dir = HandlerDir::OUT; // 方向为输出 typedef folly::Unit rin; typedef folly::Unit rout; typedef Win win; typedef Wout wout; typedef OutboundHandlerContext<Wout> Context; virtual ~OutboundHandler() = default; // 纯虚函数。由子类实现 virtual folly::Future<folly::Unit> write(Context* ctx,Win msg) = 0; // 下面的默认实现都是事件的透传 virtual folly::Future<folly::Unit> writeException( Context* ctx,folly::exception_wrapper e) { return ctx->fireWriteException(std::move(e)); } virtual folly::Future<folly::Unit> close(Context* ctx) { return ctx->fireClose(); } };
前文所说,Handler所有的事件方法中只有read和write是纯虚接口,这样用户每次实现自己的Handler时都需要override这两个方法(即使只是完成简单的事件透传),因此,为了方便用户编写自己的Handler,Wangle提供了HandlerAdapter,HandlerAdapter其实很简单,就是以事件透传的方式重写(override)了read个write两个方法。代码如下:
// Handler适配器 template <class R,class W = R> class HandlerAdapter : public Handler<R,R,W,W> { public: typedef typename Handler<R,W>::Context Context; // 将read事件直接进行透传 void read(Context* ctx,R msg) override { ctx->fireRead(std::forward<R>(msg)); } // 将write事件直接进行透传 folly::Future<folly::Unit> write(Context* ctx,W msg) override { return ctx->fireWrite(std::forward<W>(msg)); } };
Context
如前文所述,Pipeline中直接管理的并不是Handler,而是Context,为了便于理解,此处再把Pipeline中的addBack源码列出来:
template <class H> PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) { typedef typename ContextType<H>::type Context;// 声明Conetxt类型,false);// false标识添加到尾部 }
其中,ContextType的定义如下,它会根据Handler的类型(具体来说是方向)决定Context的类型,如果Handler是双向的,那么Context类型为ContextImpl<Handler>,如果Handler的方向为IN,那么Context类型为InboundContextImpl<Handler>,如果Handler的方向为OUT,那么Context类型为OutboundContextImpl<Handler>。
template <class Handler> struct ContextType { // template< bool B,class T,class F > // type T if B == true,F if B == false typedef typename std::conditional < Handler::dir == HandlerDir::BOTH,//如果是双向 ContextImpl<Handler>,//类型就是ContextImpl<Handler> typename std::conditional< //如果不是双向,那么还需要细分 Handler::dir == HandlerDir::IN,//如果是IN类型 InboundContextImpl<Handler>,//那么类型就是InboundContextImpl<Handler> OutboundContextImpl<Handler> //否则就是OutboundContextImpl<Handler> >::type >::type type; // Context类型 };
其实,InboundContextImpl和OutboundContextImpl都是ContextImpl的子类,ContextImpl的继承关系为:
template <class H> class ContextImpl : public HandlerContext<typename H::rout,typename H::wout>,public InboundLink<typename H::rin>,public OutboundLink<typename H::win>,public ContextImplBase<H,HandlerContext<typename H::rout,typename H::wout>>
可以看到,ContextImpl一个继承自四个父类:HandlerContext、InboundLink、OutboundLink和ContextImplBase,其中HandlerContext中主要定义了以fire开头的事件传递方法;InboundLink和OutboundLink分别定义了Handler中Inbound和Outbound类型的方法接口,还记得Pipeline中用于管理IN方向和OUT方向的两个链表:front_和back_,它们就分别是InboundLink和OutboundLink类型;ContextImplBase主要提供了Pipeline中Context在组装链表时的接口,比如:setNextIn、setNextOut,以及用于将Context绑定到handler上的attachPipeline方法。
首先来看HandlerContext基类:
// HandlerContext定义(集inbound和outbound类型于一身) // 以fire开始的方法都是Context中的事件方法 template <class In,class Out> class HandlerContext { public: virtual ~HandlerContext() = default; // inbound类型事件接口 virtual void fireRead(In msg) = 0; virtual void fireReadEOF() = 0; virtual void fireReadException(folly::exception_wrapper e) = 0; virtual void fireTransportActive() = 0; virtual void fireTransportInactive() = 0; // outbound类型事件接口 virtual folly::Future<folly::Unit> fireWrite(Out msg) = 0; virtual folly::Future<folly::Unit> fireWriteException( folly::exception_wrapper e) = 0; virtual folly::Future<folly::Unit> fireClose() = 0; virtual PipelineBase* getPipeline() = 0; virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0; std::shared_ptr<folly::AsyncTransport> getTransport() { return getPipeline()->getTransport(); } virtual void setWriteFlags(folly::WriteFlags flags) = 0; virtual folly::WriteFlags getWriteFlags() = 0; virtual void setReadBufferSettings( uint64_t minAvailable,uint64_t allocationSize) = 0; virtual std::pair<uint64_t,uint64_t> getReadBufferSettings() = 0; };
HandlerContext主要定义了以fire开头的事件传播方法:fireRead、fireReadEOF、fireReadException、fireTransportActive、fireTransportInactive、fireWrite、fireWriteException、fireClose,以及getPipeline用于获取Context绑定的Pipeline、getPipelineShared以智能指针的形式获取Pipeline、getTransport用于获取Pipeline对应的Transport。
根据事件流向的不同,Context也可以细分定义,InboundHandlerContext定义为:
// inbound 类型的InboundHandlerContext template <class In> class InboundHandlerContext { public: virtual ~InboundHandlerContext() = default; virtual void fireRead(In msg) = 0; virtual void fireReadEOF() = 0; virtual void fireReadException(folly::exception_wrapper e) = 0; virtual void fireTransportActive() = 0; virtual void fireTransportInactive() = 0; virtual PipelineBase* getPipeline() = 0; virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0; std::shared_ptr<folly::AsyncTransport> getTransport() { return getPipeline()->getTransport(); } };
同理,OutboundHandlerContext定义为:
// outbound 类型的OutboundHandlerContext template <class Out> class OutboundHandlerContext { public: virtual ~OutboundHandlerContext() = default; virtual folly::Future<folly::Unit> fireWrite(Out msg) = 0; virtual folly::Future<folly::Unit> fireWriteException( folly::exception_wrapper e) = 0; virtual folly::Future<folly::Unit> fireClose() = 0; virtual PipelineBase* getPipeline() = 0; virtual std::shared_ptr<PipelineBase> getPipelineShared() = 0; std::shared_ptr<folly::AsyncTransport> getTransport() { return getPipeline()->getTransport(); } };
如前文所述,PipelineContext主要定义了如何在Pipeline中组织Context链表的操作接口,比如setNextIn用于设置下一个IN类型的Context,setNextOut用来设置下一个OUT类型Context,具体定义如下:
class PipelineContext { public: virtual ~PipelineContext() = default; // 依附到一个pipeline中 virtual void attachPipeline() = 0; // 从pipeline中分离 virtual void detachPipeline() = 0; // 将一个HandlerContext绑定到handler上 template <class H,class HandlerContext> void attachContext(H* handler,HandlerContext* ctx) { // 只有第一次绑定的时候才会设置 if (++handler->attachCount_ == 1) { handler->ctx_ = ctx; } else { // 为何在此设置的时候就为nullptr handler->ctx_ = nullptr; } } // 设置下一个inbound类型的Context virtual void setNextIn(PipelineContext* ctx) = 0; // 设置下一个outbound类型的Context virtual void setNextOut(PipelineContext* ctx) = 0; // 获取方向(Context方向依赖于Handler方向) virtual HandlerDir getDirection() = 0; };
ContextImplBase主要实现了PipelineContext接口方法,同时它的两个成员:nextIn_和nextOut_就是链表的指针,用来串联起整个Context。
template <class H,class Context> class ContextImplBase : public PipelineContext { public: ~ContextImplBase() = default; // 获取Context绑定的Handler H* getHandler() { return handler_.get(); } // Context初始化,参数为Context所属的Pipeline weak_ptr,Context要绑定的Handler shared_ptr void initialize(std::weak_ptr<PipelineBase> pipeline,std::shared_ptr<H> handler) { pipelineWeak_ = pipeline; pipelineRaw_ = pipeline.lock().get();//裸指针 handler_ = std::move(handler); } // PipelineContext overrides void attachPipeline() override { // 如果该Context还没有被绑定 if (!attached_) { this->attachContext(handler_.get(),impl_);// 将该Context绑定到handler上 handler_->attachPipeline(impl_); // 调用Handler的attachPipeline,有具体的Handler实现 attached_ = true;//标记Context已经attached到一个pipeline中 } } // 从pipeline中分离 void detachPipeline() override { handler_->detachPipeline(impl_);// 调用Handler的detachPipeline,有具体的Handler实现 // 依附标志位为false attached_ = false; } void setNextIn(PipelineContext* ctx) override { if (!ctx) { nextIn_ = nullptr; return; } // 转成InboundLink,因为Context是InboundLink子类 auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx); if (nextIn) { nextIn_ = nextIn; } else { throw std::invalid_argument(folly::sformat( "inbound type mismatch after {}",folly::demangle(typeid(H)))); } } void setNextOut(PipelineContext* ctx) override { if (!ctx) { nextOut_ = nullptr; return; } auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx); if (nextOut) { nextOut_ = nextOut; } else { throw std::invalid_argument(folly::sformat( "outbound type mismatch after {}",folly::demangle(typeid(H)))); } } // 获取Context的方向 HandlerDir getDirection() override { return H::dir; } protected: Context* impl_; // 具体的Context实现 std::weak_ptr<PipelineBase> pipelineWeak_; // PipelineBase* pipelineRaw_; // 该Context绑定的pipeline std::shared_ptr<H> handler_; // 该Context包含的Handler InboundLink<typename H::rout>* nextIn_{nullptr}; // 下一个inbound类型的Context地址 OutboundLink<typename H::wout>* nextOut_{nullptr}; // 下一个outbound类型的Context地址 private: bool attached_{false}; // 这个Context是否已经被绑定 };
ContextImpl就是最终的Context实现,也就是要被添加到Pipeline中(比如使用addBack)的容器(ctxs_,inCtxs_,outCtxs_)的最终Context,在最后的finalize方法中还会进一步将容器中的Context组装成front_和back_单向链表。
ContextImpl的主要功能就是实现了各种事件传递方法(以fire开头的方法),以fireRead为例,这是一个IN类型的事件,由于Context中持有的Pipeline是一个weak类型的指针,因此先尝试lock,保证在事件传播阶段这个Pipeline不会销毁,然后会去调用下一个IN类型的Context的read方法。read方法是InboundLink中定义的接口(注意这里的read不是Handler中的也不是Pipeline中的),ContextImpl的也实现了这个read方法,它的功能很简单,首先还是先lock住这个Pipeline,然后直接调用Context内部包含的Handler的read方法。
template <class H> class ContextImpl : public HandlerContext<typename H::rout,typename H::wout>> { public: typedef typename H::rin Rin; typedef typename H::rout Rout; typedef typename H::win Win; typedef typename H::wout Wout; static const HandlerDir dir = HandlerDir::BOTH; explicit ContextImpl(std::weak_ptr<PipelineBase> pipeline,std::shared_ptr<H> handler) { this->impl_ = this;//实现就是自己 this->initialize(pipeline,std::move(handler));//初始化 } // For StaticPipeline ContextImpl() { this->impl_ = this; } ~ContextImpl() = default; // HandlerContext overrides // Inbound类型的事件:read事件 void fireRead(Rout msg) override { auto guard = this->pipelineWeak_.lock();// 锁住,确保一旦锁住成功,在操作期间,pipeline不会被销毁 // 如果还没有到最后 if (this->nextIn_) { // 将事件继续向下传播(传给下一个Inbound类型的Context) // 注意:这里调用的是下一个Contex的read而不是fireRead // 即调用下一个Context里面的Handler方法 this->nextIn_->read(std::forward<Rout>(msg)); } else { LOG(WARNING) << "read reached end of pipeline"; } } void fireReadEOF() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->readEOF(); } else { LOG(WARNING) << "readEOF reached end of pipeline"; } } void fireReadException(folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->readException(std::move(e)); } else { LOG(WARNING) << "readException reached end of pipeline"; } } void fireTransportActive() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->transportActive(); } } void fireTransportInactive() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->transportInactive(); } } //Outbound类型的事件传播 folly::Future<folly::Unit> fireWrite(Wout msg) override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->write(std::forward<Wout>(msg)); } else { LOG(WARNING) << "write reached end of pipeline"; // 如果到了最后,返回一个future return folly::makeFuture(); } } folly::Future<folly::Unit> fireWriteException( folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->writeException(std::move(e)); } else { LOG(WARNING) << "close reached end of pipeline"; return folly::makeFuture(); } } folly::Future<folly::Unit> fireClose() override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->close(); } else { LOG(WARNING) << "close reached end of pipeline"; return folly::makeFuture(); } } // 获取Context绑定的pipeline指针 PipelineBase* getPipeline() override { return this->pipelineRaw_; } // 获取Context绑定的pipeline引用 std::shared_ptr<PipelineBase> getPipelineShared() override { return this->pipelineWeak_.lock(); } // 设置和获取wirte标志位 void setWriteFlags(folly::WriteFlags flags) override { this->pipelineRaw_->setWriteFlags(flags); } folly::WriteFlags getWriteFlags() override { return this->pipelineRaw_->getWriteFlags(); } // 设置read缓冲区参数 minAvailable、allocationSize void setReadBufferSettings( uint64_t minAvailable,uint64_t allocationSize) override { this->pipelineRaw_->setReadBufferSettings(minAvailable,allocationSize); } std::pair<uint64_t,uint64_t> getReadBufferSettings() override { return this->pipelineRaw_->getReadBufferSettings(); } // InboundLink overrides void read(Rin msg) override { // 保证pipeline不会被删除 auto guard = this->pipelineWeak_.lock(); // 调用该Context绑定的Handler的read方法,至于事件是都需要继续传播,完全受read中的实现 this->handler_->read(this,std::forward<Rin>(msg)); } void readEOF() override { auto guard = this->pipelineWeak_.lock(); this->handler_->readEOF(this); } void readException(folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); this->handler_->readException(this,std::move(e)); } void transportActive() override { auto guard = this->pipelineWeak_.lock(); this->handler_->transportActive(this); } void transportInactive() override { auto guard = this->pipelineWeak_.lock(); this->handler_->transportInactive(this); } // OutboundLink overrides folly::Future<folly::Unit> write(Win msg) override { auto guard = this->pipelineWeak_.lock(); return this->handler_->write(this,std::forward<Win>(msg)); } folly::Future<folly::Unit> writeException( folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); return this->handler_->writeException(this,std::move(e)); } folly::Future<folly::Unit> close() override { auto guard = this->pipelineWeak_.lock(); return this->handler_->close(this); } };
同样,Context也可以根据传输方向进行细分,首先是InboundContextImpl:
template <class H> class InboundContextImpl : public InboundHandlerContext<typename H::rout>,InboundHandlerContext<typename H::rout>> { public: typedef typename H::rin Rin; typedef typename H::rout Rout; typedef typename H::win Win; typedef typename H::wout Wout; static const HandlerDir dir = HandlerDir::IN; explicit InboundContextImpl( std::weak_ptr<PipelineBase> pipeline,std::shared_ptr<H> handler) { this->impl_ = this; this->initialize(pipeline,std::move(handler)); } // For StaticPipeline InboundContextImpl() { this->impl_ = this; } ~InboundContextImpl() = default; // InboundHandlerContext overrides void fireRead(Rout msg) override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->read(std::forward<Rout>(msg)); } else { LOG(WARNING) << "read reached end of pipeline"; } } void fireReadEOF() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->readEOF(); } else { LOG(WARNING) << "readEOF reached end of pipeline"; } } void fireReadException(folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->readException(std::move(e)); } else { LOG(WARNING) << "readException reached end of pipeline"; } } void fireTransportActive() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->transportActive(); } } void fireTransportInactive() override { auto guard = this->pipelineWeak_.lock(); if (this->nextIn_) { this->nextIn_->transportInactive(); } } PipelineBase* getPipeline() override { return this->pipelineRaw_; } std::shared_ptr<PipelineBase> getPipelineShared() override { return this->pipelineWeak_.lock(); } // InboundLink overrides void read(Rin msg) override { auto guard = this->pipelineWeak_.lock(); this->handler_->read(this,std::move(e)); } void transportActive() override { auto guard = this->pipelineWeak_.lock(); this->handler_->transportActive(this); } void transportInactive() override { auto guard = this->pipelineWeak_.lock(); this->handler_->transportInactive(this); } };
其次是OutboundContextImpl:
template <class H> class OutboundContextImpl : public OutboundHandlerContext<typename H::wout>,OutboundHandlerContext<typename H::wout>> { public: typedef typename H::rin Rin; typedef typename H::rout Rout; typedef typename H::win Win; typedef typename H::wout Wout; static const HandlerDir dir = HandlerDir::OUT; explicit OutboundContextImpl( std::weak_ptr<PipelineBase> pipeline,std::move(handler)); } // For StaticPipeline OutboundContextImpl() { this->impl_ = this; } ~OutboundContextImpl() = default; // OutboundHandlerContext overrides folly::Future<folly::Unit> fireWrite(Wout msg) override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->write(std::forward<Wout>(msg)); } else { LOG(WARNING) << "write reached end of pipeline"; return folly::makeFuture(); } } folly::Future<folly::Unit> fireWriteException( folly::exception_wrapper e) override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->writeException(std::move(e)); } else { LOG(WARNING) << "close reached end of pipeline"; return folly::makeFuture(); } } folly::Future<folly::Unit> fireClose() override { auto guard = this->pipelineWeak_.lock(); if (this->nextOut_) { return this->nextOut_->close(); } else { LOG(WARNING) << "close reached end of pipeline"; return folly::makeFuture(); } } PipelineBase* getPipeline() override { return this->pipelineRaw_; } std::shared_ptr<PipelineBase> getPipelineShared() override { return this->pipelineWeak_.lock(); } // OutboundLink overrides folly::Future<folly::Unit> write(Win msg) override { auto guard = this->pipelineWeak_.lock(); return this->handler_->write(this,std::move(e)); } folly::Future<folly::Unit> close() override { auto guard = this->pipelineWeak_.lock(); return this->handler_->close(this); } };
按照惯例,还是来一张图总结一下吧:
本系列文章
Wangle源码分析:EventBaseHandler、AsyncSocketHandler