从零开始学习比特币(五):P2P网络建立的流程之套接字的读取和发送

尚力财经 197 0

从这一节开始,我们将开始解释P2P网络是如何在比特币系统中建立的。还记得上次讲解比特币系统启动第12步的时候,我们提到几个线程相关的进程很重要吗?基于此,本节进行详细说明。

P2P网络的建立是在上一篇文章的系统启动第12步最后时刻调用CConnman:Start方法开始的。

这一部分的内容在net.cpp、net_processing.cpp等文件中

接下来开始讲解各个线程的具体处理。

该线程主要用于处理套接字的读取和发送,调用系统提供的相关底层函数进行处理,将接收到的消息转换为CNetMessage类型对象,保存在节点的vRecvMsg集合中,将待发送的消息从vSendMsg集合中取出进行发送。

线程在net.cpp文件的第1155行中定义。先来具体解读一下。

这个方法的主体是一个while循环,只要interruptNet变量为空,它就会一直循环下去。

遍历所有节点列表,将未断开的节点设置为断开。

如果(!fNetworkActive) { //断开(CNode* pnode : vNodes) { if(!pnode-fDisconnect){ log print(BC log:NET,'网络不活动,丢弃对等=%d 'pnode-GetId());pnode-fDisconnect=true;}} }

遍历所有节点列表。如果节点已经断开连接,请执行以下处理:

首先,从vNodes集合中删除当前节点。然后,调用CloseSocketDisconnect方法关闭当前节点的套接字并清理。该方法在内部将fDisconnect变量设置为true如果当前套接字有效,则调用CloseSocket方法关闭套接字。closeSocket方法调用win32系统的closesocket方法,其他系统调用Close关闭套接字,然后将套接字设置为INVALID_SOCKET。具体方法如下:

?` ` void CNode:CloseSocketDisconnect(){ f disconnect=true;LOCK(cs _ h socket);if (hSocket!=INVALID _ SOCKET){ log print(BC log:NET,'断开对等端=%d 'id);close socket(h socket);}}然后,调用释演法将节点的引用数nRefCount减一。最后,将当前节点放入vNodesDisconnected集合中。

遍历所有断开连接的节点的集合vNodesDisconnected。如果当前节点nRefCount的引用数小于等于0,则进行如下处理:

调用宏TRY_LOCK获取cs_inventory lock。如果可以获取,则获取cs_vSend锁。如果可以获得,则将变量fDelete设置为true。如果变量fDelete为true,则从vNodesDisconnected集合中移除当前节点,然后调用DeleteNode方法删除该节点。让我们解释一下DeleteNode方法。在方法内部,首先将变量fUpdateConnectionTime设置为true,然后调用消息处理接口NetEventsInterface的FinalizeNode方法,最后处理节点。如果变量fUpdateConnectionTime在FinalizeNode方法中设置为true,则调用地址管理器对象的Connected方法进行处理。最后,删除当前节点。

代码如下:

` void cconnman:delete node(cnode * pnode){ assert(pnode);bool 尚力财经小编2022 fUpdateConnectionTime=false;m _ msg proc-finalize node(pnode-GetId(),fUpdateConnectionTime);if(fUpdateConnectionTime){ addr man。已连接(pnode-addr);}删除pnode}

FinalizeNode方法是一个纯虚函数,由net_processing.cpp文件中PeerLogicValidation对象的FinalizeNode方法实现。接下来,我们来看看这个函数的处理。

设置参数fUpdateConnectionTime默认为真。调用State方法获取节点状态对象的指针。 根据节点ID,该方法从mapNodeState集合中查找并返回相应的节点状态对象。如果它不存在,则返回一个空指针。如果我们已经在这个对等节点上同步了块头,也就是说,节点状态对象的fSyncStarted属性为true,那么变量nSyncStarted被设置为减一。如果对等节点的bad评分,即节点状态对象的nMisbehavior属性等于0,对等节点已经建立了完整的连接,即节点状态对象的fCurrentlyConnected属性为true,则将变量fUpdateConnectionTime设置为true。遍历status对象的vBlocksInFlight集合并删除所有条目。调用EraseOrphansFor方法删除与此对等节点关联的孤立事务。该方法在内部遍历mapOrphanTransactions集合,如果当前孤立事务来自指定的对等节点,则调用EraseOrphanTx方法将其删除。

后者根据事务的哈希ID搜索对应于mapOrphanTransactions集的孤立事务。如果找不到,它将返回。如果找到,则遍历事务的所有输入,如果当前输入指向的父输出不在mapOrphanTransactionsByPrev集合中,则处理下一个;否则,从返回的集合中移尚力财经小编2022除指定的孤立事务,并且如果指向的父输出现在为空,则从mapOrphanTransactionsByPrev集合中删除指向的父输出。从mapOrphanTransactions中删除指定的孤立事务。

` int static EraseOrphanTx(uint 256 hash)EXCLUSIVE _ LOCKS _ requireD(g _ cs _ orbonds){ STD:map:iterator it=maporphantransactions . find(hash);if(it==maporphantransactions . end())返回0;for(const CTX in txin:it-second . tx-vin){ STD:setauto it prev=maporphantransactionsbyprev . find(txin . prev out);if(it prev==maporphantransactionsbyprev . end())继续;it prev-second . erase(it);if(it prev-second . empty())maporphantransactionsbyprev . erase(it prev);} maporphantransactions . erase(it);返回1;}从优先级下载块的对等节点变量nPreferredDownload中减去status对象对应的fprefereddownload属性。此属性是一个布尔值,它指示该节点是否是优先下载块。这里C布尔值自动转换为整数值,真值转换为1,假值转换为0。下载块中的节点数由nPeersWithValidatedDownloads变量处理。如果节点状态对象的nBlocksInFlightValidHeaders属性不等于0,尚力财经小编2022则下载该块的节点数为负1,否则为负0。接下来,处理变量G _ outbound _ peers _ with _ protect _ from _ disconnect,它表示出站对等节点的数量。代码很简单,不做解释。从节点状态集合mapNodeState中删除指定的节点ID。{ LOCK(cs _ vNodes);vNodesSize=vnodes . size();} if(vNodesSize!=nPrevNodeCount){ nPrevNodeCount=vnode ssize;if(client interface)client interface-notifynumbconnectionschanged(nPrevNodeCount);}

生成相关的时间变量和三个fd_set集合来处理接收的数据、发送的数据和数据错误,然后将集合初始化为空集。

结构timeval超时;time out . TV _ sec=0;timeout.tv _ usec=50000//轮询pnode的频率-vSendfd _ set fdsetRecv;fd _ set fdsetSendfd _ set fdsetErrorFD _ ZERO(fdsetRecv);FD _ ZERO(fdsetSend);FD _ ZERO(fdsetError);

遍历当前处于监听状态的Socket vhListenSocket集合,调用FD_SET方法,将当前socket保存在fdsetRecv集合中,然后调用标准库的max方法保存最大的socket,将变量have_fds设置为true。

for(const listen socket hListenSocket:vhListenSocket){ FD _ SET(hListenSocket . socket,fdsetRecv);hSocketMax=std:max(hSocketMax,hlistensocket . socket);have _ fds=true}

遍历所有节点,设置相关变量。

for(CNode * pnode:vNodes){ bool select _ recv=!pnode-fPauseRecv;布尔select _ send { LOCK(pnode-cs _ vSend);select_send=!pnode-vsendmsg。empty();} LOCK(pnode-cs _ h socket);if(pnode-h SOCKET==INVALID _ SOCKET)继续;FD_SET(pnode-hSocket,fdsetError);hSocketMax=std:max(hSocketMax,pnode-hSocket);have _ FDS=true if(select _ send){ FD _ SET(pnode-h socket,fdsetSend);继续;} if(select _ recv){ FD _ SET(pnode-h socket,fdsetRecv);}}

调用挑选方法进行连接。int nSelect=select(have_fds?hSocketMax 1 : 0,fdsetRecv,fdsetSend,fdsetError,超时);

接下来,检查挑选调用是否出错。if

if(n select==SOCKET _ ERROR){ if(have _ FDS){ int nErr=WSAGetLastError();LogPrintf('套接字选择错误“%s ”,网络错误字符串(nErr));对于(无符号int I=0;i=hSocketMaxi ) FD_SET(i,fdsetRecv);} FD _ ZERO(fdsetSend);FD _ ZERO(fdsetError);如果(!中断网络。sleep _ for(STD:chrono:毫秒(timeout.tv_usec/1000)))返回;}

6 .接下来,接收新的连接遍历所有监听的套接字,如果当前套接字有效,并且套接字在在接收数据的集合中,即新的连接请求进来,则调用 AcceptConnection 方法进行处理。

for (const ListenSocket& hListenSocket : vhListenSocket){ if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv)) { AcceptConnection(hListenSocket); }}

AcceptConnection 方法处理远程对等节点的连接逻辑,具体如下:

调用 accept 方法,接受客户端连接。SOCKET hSocket = accept(hListenSocket.socket, (struct sockaddr*)&sockaddr, &len);计算最大的入站连接数int nMaxInbound = nMaxConnections - (nMaxOutbound + nMaxFeeler);如果连接成功,那么调用地址对象的 SetSockAddr 方法来保存保存对等节点的地址。if (hSocket != INVALID_SOCKET) { if (!addr.SetSockAddr((const struct sockaddr*)&sockaddr)) { LogPrintf("Warning: Unknown socket family "); }}遍历对等节点列表,如果当前对等节点属于入站类型,则变量 nInbound 加 1。{ LOCK(cs_vNodes); for (const CNode* pnode : vNodes) { if (pnode->fInbound) nInbound++; }}如果连接不成功,则直接退出。if (hSocket == INVALID_SOCKET){ int nErr = WSAGetLastError(); if (nErr != WSAEWOULDBLOCK) LogPrintf("socket error accept failed: %s ", NetworkErrorString(nErr)); return;}如果网络是不活跃的,则关闭套接字并返回。if (!fNetworkActive) { LogPrintf("connection from %s dropped: not accepting new connections ", addr.ToString()); CloseSocket(hSocket); return;}如果是不可连接的,则关闭套接字并返回。if (!IsSelectableSocket(hSocket)){ LogPrintf("connection from %s dropped: non-selectable socket ", addr.ToString()); CloseSocket(hSocket); return;}

IsSelectableSocket 方法是一个内联方法,如果 Win32 则直接返回真,否则如果 select 函数返回值小于 FD_SETSIZE 则返回真,否则返回假。如果入站数量已经达到最大的入站数量,则调用 AttemptToEvictConnection 方法,找到要退出的连接。如果找不到则关闭套接字并返回。if (nInbound >= nMaxInbound){ if (!AttemptToEvictConnection()) { LogPrint(BCLog::NET, "failed to find an eviction candidate - connection dropped (full) "); CloseSocket(hSocket); return; }}生成节点对象,并进行相关设置,然后加入节点集合中 vNodes。NodeId id = GetNewNodeId(); uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); CAddress addr_bind = GetBindAddress(hSocket); CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, addr_bind, "", true); pnode->AddRef(); pnode->fWhitelisted = whitelisted; m_msgproc->InitializeNode(pnode); LogPrint(BCLog::NET, "connection from %s accepted ", addr.ToString()); { LOCK(cs_vNodes); vNodes.push_back(pnode); }

7. 处理节点的引用数量AddRef 方法会把 nRefCount 加1。

8. 遍历所有的节点进行收发信息处理。如果读取的数量大于0,则:调用 ReceiveMsgBytes 方法,从缓冲区中读取给定数量的数据,并生成 CNetMessage 对象。如果出错,则调用 CloseSocketDisconnect 方法,关闭套接字并断开连接。

如果读取的数量等于0,即远程节点已经关闭,则:调用 CloseSocketDisconnect 方法,关闭套接字并断开连接。

如果读取的数量小于0,即读取过程中出错,则:调用 CloseSocketDisconnect 方法,关闭套接字并断开连接。

具体代码如下:

if (recvSet || errorSet){ // typical socket buffer is 8K-64K char pchBuf[0x10000]; int nBytes = 0; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) continue; nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); } if (nBytes > 0) { bool notify = false; if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); RecordBytesRecv(nBytes); if (notify) { size_t nSizeAdded = 0; auto it(pnode->vRecvMsg.begin()); for (; it != pnode->vRecvMsg.end(); ++it) { if (!it->complete()) break; nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; } { LOCK(pnode->cs_vProcessMsg); pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); pnode->nProcessQueueSize += nSizeAdded; pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; } WakeMessageHandler(); } } else if (nBytes == 0) { // socket closed gracefully if (!pnode->fDisconnect) { LogPrint(BCLog::NET, "socket closed "); } pnode->CloseSocketDisconnect(); } else if (nBytes fDisconnect) LogPrintf("socket recv error %s ", NetworkErrorString(nErr)); pnode->CloseSocketDisconnect(); } }}如果当前节点在发送集合中,则进行如下处理。if (sendSet){ LOCK(pnode->cs_vSend); size_t nBytes = SocketSendData(pnode); if (nBytes) { RecordBytesSent(nBytes); }}

SocketSendData 方法主要逻辑是遍历节点的发送消息集合 vSendMsg,然后调用 send 方法发送每一个消息,并针对发送正确与否进行处理;同时从发送消息集合 vSendMsg 对应的消息。

size_t CConnman::SocketSendData(CNode *pnode) const { auto it = pnode->vSendMsg.begin(); size_t nSentSize = 0; while (it != pnode->vSendMsg.end()) { const auto &data = *it; assert(data.size() > pnode->nSendOffset); int nBytes = 0; { LOCK(pnode->cs_hSocket); if (pnode->hSocket == INVALID_SOCKET) break; nBytes = send(pnode->hSocket, reinterpret_cast(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); } if (nBytes > 0) { pnode->nLastSend = GetSystemTimeInSeconds(); pnode->nSendBytes += nBytes; pnode->nSendOffset += nBytes; nSentSize += nBytes; if (pnode->nSendOffset == data.size()) { pnode->nSendOffset = 0; pnode->nSendSize -= data.size(); pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize; it++; } else { // could not send full message; stop sending more break; } } else { if (nBytes CloseSocketDisconnect(); } } // couldn't send anything at all break; } } if (it == pnode->vSendMsg.end()) { assert(pnode->nSendOffset == 0); assert(pnode->nSendSize == 0); } pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); return nSentSize; }

9. 接下来对节点的活跃性进行检查。10. 遍历所有节点,减少引用数。Release 方法把 nRefCount 参数减1。


我是区小白,Ulord全球社区联盟(优得社区)核心区块链技术开发者,深入研究比特币,以太坊,EOS Dash,Rsk,Java, Nodejs,PHP,Python,C++ 我希望能聚集更多区块链开发者,一起学习共同进步。

从零开始学习比特币(五):P2P网络建立的流程之套接字的读取和发送-第1张图片-欧交易所

标签: do sco

抱歉,评论功能暂时关闭!

微信号已复制,请打开微信添加咨询详情!