NuPlayer从服务端获取应答消息流程

    xiaoxiao2021-03-25  88

      本文具体介绍NuPlayer获取应答消息的代码流程,流程设计到的每一个行数的详细介绍在之前的文章都有介绍。   下一篇文章介绍接收到服务端发送来的应答消息后的处理过程:   ARTSPConnection::notifyResponseListener函数完成这个流程。   

    ==> void ARTSPConnection::performConnect(const sp<AMessage> &reply, AString host, unsigned port) { struct hostent *ent = gethostbyname(host.c_str()); if (ent == NULL) { ALOGE("Unknown host %s", host.c_str()); reply->setInt32("result", -ENOENT); reply->post(); mState = DISCONNECTED; return; } mSocket = socket(AF_INET, SOCK_STREAM, 0); if (mUIDValid) { HTTPBase::RegisterSocketUserTag(mSocket, mUID, (uint32_t)*(uint32_t*) "RTSP"); HTTPBase::RegisterSocketUserMark(mSocket, mUID); } MakeSocketBlocking(mSocket, false); struct sockaddr_in remote; memset(remote.sin_zero, 0, sizeof(remote.sin_zero)); remote.sin_family = AF_INET; remote.sin_addr.s_addr = *(in_addr_t *)ent->h_addr; remote.sin_port = htons(port); int err = ::connect( mSocket, (const struct sockaddr *)&remote, sizeof(remote)); reply->setInt32("server-ip", ntohl(remote.sin_addr.s_addr)); if (err < 0) { if (errno == EINPROGRESS) { sp<AMessage> msg = new AMessage(kWhatCompleteConnection, this); msg->setMessage("reply", reply); msg->setInt32("connection-id", mConnectionID); msg->post(); return; } reply->setInt32("result", -errno); mState = DISCONNECTED; if (mUIDValid) { HTTPBase::UnRegisterSocketUserTag(mSocket); HTTPBase::UnRegisterSocketUserMark(mSocket); } close(mSocket); mSocket = -1; } else { reply->setInt32("result", OK); mState = CONNECTED; mNextCSeq = 1; //==> postReceiveReponseEvent(); } reply->post(); }  ==> void ARTSPConnection::postReceiveReponseEvent() { if (mReceiveResponseEventPending) { return; } sp<AMessage> msg = new AMessage(kWhatReceiveResponse, this); msg->post(); mReceiveResponseEventPending = true; }  ==> void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) { switch (msg->what()) { case kWhatConnect: onConnect(msg); break; case kWhatDisconnect: onDisconnect(msg); break; case kWhatCompleteConnection: onCompleteConnection(msg); break; case kWhatSendRequest: onSendRequest(msg); break; case kWhatReceiveResponse: //==> onReceiveResponse(); break; case kWhatObserveBinaryData: { CHECK(msg->findMessage("reply", &mObserveBinaryMessage)); break; } default: TRESPASS(); break; } } ==> void ARTSPConnection::onReceiveResponse() { mReceiveResponseEventPending = false; if (mState != CONNECTED) { return; } struct timeval tv; tv.tv_sec = 0; tv.tv_usec = kSelectTimeoutUs; fd_set rs; FD_ZERO(&rs); FD_SET(mSocket, &rs); int res = select(mSocket + 1, &rs, NULL, NULL, &tv); if (res == 1) { MakeSocketBlocking(mSocket, true); //==> bool success = receiveRTSPReponse(); MakeSocketBlocking(mSocket, false); if (!success) { // Something horrible, irreparable has happened. flushPendingRequests(); return; } } postReceiveReponseEvent(); } ==> bool ARTSPConnection::receiveRTSPReponse() { AString statusLine; if (!receiveLine(&statusLine)) { return false; } if (statusLine == "$") { sp<ABuffer> buffer = receiveBinaryData(); if (buffer == NULL) { return false; } if (mObserveBinaryMessage != NULL) { sp<AMessage> notify = mObserveBinaryMessage->dup(); notify->setBuffer("buffer", buffer); notify->post(); } else { ALOGW("received binary data, but no one cares."); } return true; } sp<ARTSPResponse> response = new ARTSPResponse; response->mStatusLine = statusLine; ALOGI("status: %s", response->mStatusLine.c_str()); ssize_t space1 = response->mStatusLine.find(" "); if (space1 < 0) { return false; } ssize_t space2 = response->mStatusLine.find(" ", space1 + 1); if (space2 < 0) { return false; } bool isRequest = false; if (!IsRTSPVersion(AString(response->mStatusLine, 0, space1))) { CHECK(IsRTSPVersion( AString( response->mStatusLine, space2 + 1, response->mStatusLine.size() - space2 - 1))); isRequest = true; response->mStatusCode = 0; } else { AString statusCodeStr( response->mStatusLine, space1 + 1, space2 - space1 - 1); if (!ParseSingleUnsignedLong( statusCodeStr.c_str(), &response->mStatusCode) || response->mStatusCode < 100 || response->mStatusCode > 999) { return false; } } AString line; ssize_t lastDictIndex = -1; for (;;) { if (!receiveLine(&line)) { break; } if (line.empty()) { break; } ALOGV("line: '%s'", line.c_str()); if (line.c_str()[0] == ' ' || line.c_str()[0] == '\t') { // Support for folded header values. if (lastDictIndex < 0) { // First line cannot be a continuation of the previous one. return false; } AString &value = response->mHeaders.editValueAt(lastDictIndex); value.append(line); continue; } ssize_t colonPos = line.find(":"); if (colonPos < 0) { // Malformed header line. return false; } AString key(line, 0, colonPos); key.trim(); key.tolower(); line.erase(0, colonPos + 1); lastDictIndex = response->mHeaders.add(key, line); } for (size_t i = 0; i < response->mHeaders.size(); ++i) { response->mHeaders.editValueAt(i).trim(); } unsigned long contentLength = 0; ssize_t i = response->mHeaders.indexOfKey("content-length"); if (i >= 0) { AString value = response->mHeaders.valueAt(i); if (!ParseSingleUnsignedLong(value.c_str(), &contentLength)) { return false; } } if (contentLength > 0) { response->mContent = new ABuffer(contentLength); if (receive(response->mContent->data(), contentLength) != OK) { return false; } } if (response->mStatusCode == 401) { if (mAuthType == NONE && mUser.size() > 0 && parseAuthMethod(response)) { ssize_t i; CHECK_EQ((status_t)OK, findPendingRequest(response, &i)); CHECK_GE(i, 0); sp<AMessage> reply = mPendingRequests.valueAt(i); mPendingRequests.removeItemsAt(i); AString request; CHECK(reply->findString("original-request", &request)); sp<AMessage> msg = new AMessage(kWhatSendRequest, this); msg->setMessage("reply", reply); msg->setString("request", request.c_str(), request.size()); ALOGI("re-sending request with authentication headers..."); onSendRequest(msg); return true; } } //==> return isRequest ? handleServerRequest(response) : notifyResponseListener(response); } ==> bool ARTSPConnection::notifyResponseListener( const sp<ARTSPResponse> &response) { ssize_t i; status_t err = findPendingRequest(response, &i); if (err == OK && i < 0) { // An unsolicited server response is not a problem. return true; } if (err != OK) { return false; } sp<AMessage> reply = mPendingRequests.valueAt(i); mPendingRequests.removeItemsAt(i); reply->setInt32("result", OK); reply->setObject("response", response); reply->post(); return true; }
    转载请注明原文地址: https://ju.6miu.com/read-21372.html

    最新回复(0)