cocos2d--WebSocket分析

前端之家收集整理的这篇文章主要介绍了cocos2d--WebSocket分析前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

WebSocket初始化之后,就可以send了,创建一个新的线程并且循环udpate,线程函数循环onSubThreadLoop,update发送消息给Delegate

线程函数循环onSubThreadLoop 判断是否要destory或者触发拿发送的数据

int WebSocket::onSubThreadLoop()
{
    if (_readyState == State::CLOSED || _readyState == State::CLOSING)
    {
        libwebsocket_context_destroy(_wsContext);
        // return 1 to exit the loop.
        return 1;
    }

    if (_wsContext && _readyState != State::CLOSED && _readyState != State::CLOSING)
    {
        libwebsocket_service(_wsContext,0);//触发LWS_CALLBACK_CLIENT_WRITEABLE,去拿发送到server的数据
    }

    // Sleep 50 ms
    std::this_thread::sleep_for(std::chrono::milliseconds(50));

    // return 0 to continue the loop.
    return 0;
}

//WebSocket收到消息
int WebSocket::onSocketCallback(struct libwebsocket_context *ctx,struct libwebsocket *wsi,int reason,void *user,void *in,ssize_t len)
{
    //CCLOG("socket callback for %d reason",reason);
    CCASSERT(_wsContext == nullptr || ctx == _wsContext,"Invalid context.");
    CCASSERT(_wsInstance == nullptr || wsi == nullptr || wsi == _wsInstance,"Invaild websocket instance.");

    switch (reason) 
    {
        case LWS_CALLBACK_DEL_POLL_FD:
        case LWS_CALLBACK_PROTOCOL_DESTROY:
        case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
            {
                WsMessage* msg = nullptr;
                if (reason == LWS_CALLBACK_CLIENT_CONNECTION_ERROR
                    || (reason == LWS_CALLBACK_PROTOCOL_DESTROY && _readyState == State::CONNECTING)
                    || (reason == LWS_CALLBACK_DEL_POLL_FD && _readyState == State::CONNECTING)
                    )
                {
                    msg = new (std::nothrow) WsMessage();
                    msg->what = WS_MSG_TO_UITHREAD_ERROR;
                    _readyState = State::CLOSING;  //先设置为CLOSING,下一次循环的时候,会destory,才会变成CLOSE
                }
                else if (reason == LWS_CALLBACK_PROTOCOL_DESTROY && _readyState == State::CLOSING)
                {
                    msg = new (std::nothrow) WsMessage();
                    msg->what = WS_MSG_TO_UITHREAD_CLOSE;
                }

                if (msg)
                {
                    _wsHelper->sendMessageToUIThread(msg);
                }
            }
            break;
        case LWS_CALLBACK_CLIENT_ESTABLISHED:
            {
                WsMessage* msg = new (std::nothrow) WsMessage();
                msg->what = WS_MSG_TO_UITHREAD_OPEN;
                _readyState = State::OPEN;

                /* * start the ball rolling,* LWS_CALLBACK_CLIENT_WRITEABLE will come next service */
                //每次libwebsocket_service(loop调用)之后,会触发LWS_CALLBACK_CLIENT_WRITEABLE
                libwebsocket_callback_on_writable(ctx,wsi);  
                _wsHelper->sendMessageToUIThread(msg);
            }
            break;


        case LWS_CALLBACK_CLIENT_WRITEABLE:
            {

                std::lock_guard<std::mutex> lk(_wsHelper->_subThreadWsMessageQueueMutex);

                std::list<WsMessage*>::iterator iter = _wsHelper->_subThreadWsMessageQueue->begin();

                int bytesWrite = 0;
                for (; iter != _wsHelper->_subThreadWsMessageQueue->end();)
                {
                    WsMessage* subThreadMsg = *iter;

                    if ( WS_MSG_TO_SUBTRHEAD_SENDING_STRING == subThreadMsg->what
                      || WS_MSG_TO_SUBTRHEAD_SENDING_BINARY == subThreadMsg->what)
                    {
                        Data* data = (Data*)subThreadMsg->obj;

                        const size_t c_bufferSize = WS_WRITE_BUFFER_SIZE;

                        size_t remaining = data->len - data->issued; // 有可能一次发不完,分多次
                        size_t n = std::min(remaining,c_bufferSize );
                        //fixme: the log is not thread safe
// CCLOG("[websocket:send] total: %d,sent: %d,remaining: %d,buffer size: %d",static_cast<int>(data->len),static_cast<int>(data->issued),static_cast<int>(remaining),static_cast<int>(n));

                        //数据前后加PADDING
                        unsigned char* buf = new unsigned char[LWS_SEND_BUFFER_PRE_PADDING + n + LWS_SEND_BUFFER_POST_PADDING];

                        memcpy((char*)&buf[LWS_SEND_BUFFER_PRE_PADDING],data->bytes + data->issued,n);

                        int writeProtocol;

                        if (data->issued == 0) {  //第一次发送指定writeProtocol为LWS_WRITE_TEXT或者LWS_WRITE_BINARY
                            if (WS_MSG_TO_SUBTRHEAD_SENDING_STRING == subThreadMsg->what)
                            {
                                writeProtocol = LWS_WRITE_TEXT;
                            }
                            else
                            {
                                writeProtocol = LWS_WRITE_BINARY;
                            }

                            // If we have more than 1 fragment
                            if (data->len > c_bufferSize)
                                writeProtocol |= LWS_WRITE_NO_FIN;  //说明这还不是最后部分数据
                        } else {
                            // we are in the middle of fragments
                            writeProtocol = LWS_WRITE_CONTINUATION;  //不是第一个发送的部分。
                            // and if not in the last fragment
                            if (remaining != n)
                                writeProtocol |= LWS_WRITE_NO_FIN;  //说明这还不是最后部分数据
                        }
                        //发送数据,设置writeProtocol类型,具体解析交给WebSocket去做吧。
                        bytesWrite = libwebsocket_write(wsi,&buf[LWS_SEND_BUFFER_PRE_PADDING],n,(libwebsocket_write_protocol)writeProtocol);
                        //fixme: the log is not thread safe
// CCLOG("[websocket:send] bytesWrite => %d",bytesWrite);

                        // Buffer overrun?
                        if (bytesWrite < 0)
                        {
                            break;
                        }
                        // Do we have another fragments to send?
                        else if (remaining != n) //没有全部发送,还有一些,记录以及发送的数据issued,下次跳过issued这么多数据
                        {
                            data->issued += n;
                            break;
                        }
                        // Safely done!
                        else  //说明本次的data全部发送完成,移除之
                        {
                            CC_SAFE_DELETE_ARRAY(data->bytes);
                            CC_SAFE_DELETE(data);
                            CC_SAFE_DELETE_ARRAY(buf);
                            _wsHelper->_subThreadWsMessageQueue->erase(iter++);
                            CC_SAFE_DELETE(subThreadMsg);
                        }
                    }
                }

                /* get notified as soon as we can write again */

                libwebsocket_callback_on_writable(ctx,wsi);
            }
            break;

        case LWS_CALLBACK_CLOSED:
            {
                //fixme: the log is not thread safe
// CCLOG("%s","connection closing..");

                _wsHelper->quitSubThread();

                if (_readyState != State::CLOSED)
                {
                    WsMessage* msg = new (std::nothrow) WsMessage();
                    _readyState = State::CLOSED;
                    msg->what = WS_MSG_TO_UITHREAD_CLOSE;
                    _wsHelper->sendMessageToUIThread(msg);
                }
            }
            break;

        case LWS_CALLBACK_CLIENT_RECEIVE:  //有新的数据来了
            {
                if (in && len > 0)
                {
                    // Accumulate the data (increasing the buffer as we go)
                    if (_currentDataLen == 0)
                    {
                        _currentData = new char[len];
                        memcpy (_currentData,in,len);
                        _currentDataLen = len;
                    }
                    else
                    {//分配更多的内存,并且保存之前的数据
                        char *new_data = new char [_currentDataLen + len];
                        memcpy (new_data,_currentData,_currentDataLen);
                        memcpy (new_data + _currentDataLen,len);
                        CC_SAFE_DELETE_ARRAY(_currentData);
                        _currentData = new_data;
                        _currentDataLen = _currentDataLen + len;
                    }

                    _pendingFrameDataLen = libwebsockets_remaining_packet_payload (wsi);//说明还有滞留的数据,下次才能收到

                    if (_pendingFrameDataLen > 0)
                    {
                        //CCLOG("%ld bytes of pending data to receive,consider increasing the libwebsocket rx_buffer_size value.",_pendingFrameDataLen);
                    }

                    // If no more data pending,send it to the client thread
                    if (_pendingFrameDataLen == 0)//为0,说明没有更多的数据
                    {
                        WsMessage* msg = new (std::nothrow) WsMessage();
                        msg->what = WS_MSG_TO_UITHREAD_MESSAGE;

                        char* bytes = nullptr;
                        Data* data = new (std::nothrow) Data();

                        if (lws_frame_is_binary(wsi))
                        {

                            bytes = new char[_currentDataLen];
                            data->isBinary = true;
                        }
                        else
                        {
                            bytes = new char[_currentDataLen+1];
                            bytes[_currentDataLen] = '\0';
                            data->isBinary = false;
                        }

                        memcpy(bytes,_currentDataLen);

                        data->bytes = bytes;
                        data->len = _currentDataLen;
                        msg->obj = (void*)data;

                        CC_SAFE_DELETE_ARRAY(_currentData);
                        _currentData = nullptr;
                        _currentDataLen = 0;

                        _wsHelper->sendMessageToUIThread(msg);
                    }
                }
            }
            break;
        default:
            break;

    }

    return 0;
}

}

猜你在找的Cocos2d-x相关文章