RTMP是Real Time Messaging Protocol(实时消息传输协议)的首字母缩写。该协议基于TCP,是一个协议族,包括RTMP基本协议及RTMPT/RTMPS/RTMPE等多种变种。RTMP是一种设计用来进行实时数据通信的网络协议,主要用来在Flash/AIR平台和支持RTMP协议的流媒体/交互服务器之间进行音视频和数据通信。
To compile type “make” with SYS=, e.g. make SYS=posix for Linux, MacOSX, Unix, etc. or make SYS=mingw for Windows. You can cross-compile for other platforms using the CROSS_COMPILE variable: make CROSS_COMPILE=arm-none-linux- INC=-I/my/cross/includes
编译完成以后,可以在librtmp目录下面找到librtmp.a的库文件。在需要使用librtmp的工程里面,将这个lib链接进去就可以了。
./Configure android-armv7 修改makefile cc ar prefix…. (edited)
编译完成以后,我们在libs目录可以看到libssl.a和libcrypto.a两个lib。在rtmp push的时候,需要这两个libs的支持。
librtmp提供了推流的API,可以在rtmp.h文件中查看所有API。我们只需要使用常用的几个API就可以将streaming推送到服务器。 - RTMP_Init()//初始化结构体 - RTMP_Free() - RTMP_Alloc() - RTMP_SetupURL()//设置rtmp server地址 - RTMP_EnableWrite()//打开可写选项,设定为推流状态 - RTMP_Connect()//建立NetConnection - RTMP_Close()//关闭连接 - RTMP_ConnectStream()//建立NetStream - RTMP_DeleteStream()//删除NetStream - RTMP_SendPacket()//发送数据
流程图
Created with Raphaël 2.1.0 Start RTMP_Init() RTMP_Alloc() RTMP_Setup() RTMP_EnableWrite() RTMP_Connect() RTMP_ConnectStream() RTMP_SendPacket() End在发送第一帧Audio和Video的时候,需要将Audio和Video的信息封装成为RTMP header,发送给rtmp server。 Audio头有4字节,包含:头部标记0xaf 0x00、 profile、channel、bitrate 信息。 Video头有16字节,包含IFrame、PFrame、AVC标识,除此之外,还需要将sps和pps放在header 里面。 RTMP协议定义了message Type,其中Type ID为8,9的消息分别用于传输音频和视频数据:
#define RTMP_PACKET_TYPE_AUDIO 0x08 #define RTMP_PACKET_TYPE_VIDEO 0x09 Audio 格式封装的源码: AAC header packet: body = (unsigned char *)malloc(4 + size); memset(body, 0, 4); body[0] = 0xaf; body[1] = 0x00; switch (profile){ case 0: body[2]|=(1<<3);//main break; case 1: body[2]|=(1<<4);//LC break; case 2: body[2]|=(1<<3);//SSR body[2]|=(1<<4); break; default: ; } switch(this->channel){ case 1: body[3]|=(1<<3);//channel1 break; case 2: body[3]|=(1<<4);//channel2 break; default: ; } switch(this->rate){ case 48000: body[2]|=(1); body[3]|=(1<<7); break; case 44100: body[2]|=(1<<1); break; case 32000: body[2]|=(1<<1); body[3]|=(1<<7); break; default: ; } sendPacket(RTMP_PACKET_TYPE_AUDIO, body, 4, 0); free(body); Video 格式封装的源码: H264 header packet: body = (unsigned char *)malloc(16 + sps_len + pps_len); this->videoFist = false; memset(body, 0, 16 + sps_len + pps_len); body[i++] = 0x17; // 1: IFrame, 7: AVC // AVC Sequence Header body[i++] = 0x00; body[i++] = 0x00; body[i++] = 0x00; body[i++] = 0x00; // AVCDecoderConfigurationRecord body[i++] = 0x01; body[i++] = sps[1]; body[i++] = sps[2]; body[i++] = sps[3]; body[i++] = 0xff; body[i++] = 0xe1; body[i++] = (sps_len >> 8) & 0xff; body[i++] = sps_len & 0xff; for (size_t j = 0; j < sps_len; j++) { body[i++] = sps[j]; } body[i++] = 0x01; body[i++] = (pps_len >> 8) & 0xff; body[i++] = pps_len & 0xff; for (size_t j = 0; j < pps_len; j++) { body[i++] = pps[j]; } sendPacket(RTMP_PACKET_TYPE_VIDEO, body, i, nTimeStamp); free(body);只有第一帧Audio和第一帧video才需要发送header信息。之后就直接发送帧数据。 发送Audio的时候,只需要在数据帧前面加上2 byte的header信息:
spec_info[0] = 0xAF; spec_info[1] = 0x01;发送Video的时候,需要在header里面标识出I P帧的信息,以及视频帧的长度信息:
body = (unsigned char *)malloc(9 + size); memset(body, 0, 9); i = 0; if (bIsKeyFrame== 0) { body[i++] = 0x17; // 1: IFrame, 7: AVC } else { body[i++] = 0x27; // 2: PFrame, 7: AVC } // AVCVIDEOPACKET body[i++] = 0x01; body[i++] = 0x00; body[i++] = 0x00; body[i++] = 0x00; // NALUs body[i++] = size >> 24 & 0xff; body[i++] = size >> 16 & 0xff; body[i++] = size >> 8 & 0xff; body[i++] = size & 0xff; memcpy(&body[i], data, size);HandShake函数在:/rtmp/rtmplib/handshack.h中。 ./rtmp.c:69:#define RTMP_SIG_SIZE 1536
/*client HandShake*/ 695 static int HandShake(RTMP * r, int FP9HandShake){ 709 uint8_t clientbuf[RTMP_SIG_SIZE + 4], *clientsig=clientbuf+4; /*C0 字段已经写入clientsig*/ 721 if (encrypted){ 722 clientsig[-1] = 0x06; /* 0x08 is RTMPE as well */ 723 offalg = 1; 724 }else //0x03代表RTMP协议的版本(客户端要求的) //数组竟然能有“-1”下标,因为clientsig指向的是clientbuf+4,所以不存在非法地址 //C0中的字段(1B) 725 clientsig[-1] = 0x03; /*准备C1字段过程略去,C1字段的数据写入clientsig中, clientsig的大小为1536个字节*/ /*1st part of shakehand .......*/ /*C ------- S*/ /*c0 C1--> */ /* <-- S0 S1*/ /*C2 --> */ /*send clientsig C0 和 C1一起发送*/ 814 if (!WriteN(r, (char *)clientsig-1, RTMP_SIG_SIZE + 1)) 815 return FALSE; /*get server response->read type, if get response type not match handshake failed*/ 817 if (ReadN(r, (char *)&type, 1) != 1) /* 0x03 or 0x06 */ 818 return FALSE; /*encrypt type = 0x06*/ /*get server response->read serversig*/ 826 if (ReadN(r, (char *)serversig, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) 827 return FALSE; /*如果是加密协议,则需要校验收到的serversig是否和发送的匹配,如果没有加密则直接发送收到的serversig*/ 968 if (!WriteN(r, (char *)reply, RTMP_SIG_SIZE)) 969 return FALSE; /*2nd part of shakehand .....*/ /*C ----- S*/ /* <-- S2*/ 972 if (ReadN(r, (char *)serversig, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) 973 return FALSE; /* compare info between serversig and clientsig*/ 1060 if (memcmp(serversig, clientsig, RTMP_SIG_SIZE) != 0) /*如果相等,则握手成功*/ }建立连接的代码位于:librtmp/rtmp.c中,定义函数:RTMP_Connect()。RTMP_Conncet()里面又分别调用了两个函数:RTMP_Connect0(), RTMP_Connect1()。RTMP_Connect0()主要进行的是socket的连接,RTMP_Connct1()进行的是RTMP相关的连接动作。
1031 int RTMP_Connect(RTMP *r, RTMPPacket *cp) 1032 { 1033 struct sockaddr_in service; 1034 if (!r->Link.hostname.av_len) 1035 return FALSE; 1036 1037 memset(&service, 0, sizeof(struct sockaddr_in)); 1038 service.sin_family = AF_INET; 1039 1040 if (r->Link.socksport) 1041 { 1042 /* Connect via SOCKS */ 1043 if (!add_addr_info(&service, &r->Link.sockshost, r->Link.socksport)) 1044 return FALSE; 1045 } 1046 else 1047 { 1048 /* Connect directly */ 1049 if (!add_addr_info(&service, &r->Link.hostname, r->Link.port)) 1050 return FALSE; 1051 } 1052 1053 if (!RTMP_Connect0(r, (struct sockaddr *)&service)) 1054 return FALSE; 1055 1056 r->m_bSendCounter = TRUE; 1057 1058 return RTMP_Connect1(r, cp); 1059 }RTMP_Connect0函数分析:
905 int RTMP_Connect0(RTMP *r, struct sockaddr * service){ /*创建socket*/ 913 r->m_sb.sb_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); /*通过socket连接到服务器地址*/ 916 if (connect(r->m_sb.sb_socket, service, sizeof(struct sockaddr)) < 0) /*如果指定了socket端口到,则进行socks Negotiate*/ 928 if (!SocksNegotiate(r)){} /*连接成功之后,返回TRUE*/ 956 return TRUE; }RTMP_Connect1函数分析: 根据不同的传输协议,选择传送数据的方式。之后进行HandShake,最后调用SendConnectPacket()送Connect packet。
int RTMP_Connect1(RTMP *r, RTMPPacket *cp) { /*if crypto use tls_conncet*/ if (r->Link.protocol & RTMP_FEATURE_SSL){ #if defined(CRYPTO) && !defined(NO_SSL) TLS_client(RTMP_TLS_ctx, r->m_sb.sb_ssl); TLS_setfd(r->m_sb.sb_ssl, r->m_sb.sb_socket); if (TLS_connect(r->m_sb.sb_ssl) < 0){...} #else return FALSE; #endif } /*if no crypto, use http post*/ if (r->Link.protocol & RTMP_FEATURE_HTTP){ HTTP_Post(r, RTMPT_OPEN, "", 1); if (HTTP_read(r, 1) != 0){...} ... } /*进行HandShake*/ if (!HandShake(r, TRUE)){...} /*握手成功之后,发送Connect Packet*/ if (!SendConnectPacket(r, cp)){...} return TRUE; }SendConnectPacket() 里面主要对RTMP信息进行打包,然后调用RTMP_SendPacket函数,将内容发送出去。
static int SendConnectPacket(RTMP *r, RTMPPacket *cp) { RTMPPacket packet; char pbuf[4096], *pend = pbuf + sizeof(pbuf); char *enc; if (cp) return RTMP_SendPacket(r, cp, TRUE); packet.m_nChannel = 0x03; /* control channel (invoke) */ packet.m_headerType = RTMP_PACKET_SIZE_LARGE; packet.m_packetType = RTMP_PACKET_TYPE_INVOKE; packet.m_nTimeStamp = 0; packet.m_nInfoField2 = 0; packet.m_hasAbsTimestamp = 0; packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av_connect); enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_OBJECT; /*encrypto 部分省略 主要就是调用AMF函数进行*/ ... packet.m_nBodySize = enc - packet.m_body; return RTMP_SendPacket(r, &packet, TRUE); }RTMP_ConnectStream()函数主要用于在NetConnection基础上面建立一个NetStream。
简单的一个逻辑判断,重点在while循环里。首先,必须要满足三个条件。其次,进入循环以后只有出错或者建立流(NetStream)完成后,才能退出循环。 有两个重要的函数:
块格式:
basic header(1-3字节)chunk msg header(0/3/7/11字节)Extended Timestamp(0/4字节)chunk data消息格式:
timestamp(3字节)msg length(3字节)msg type id(1字节,小端)msg stream id(4字节) /** * @brief 读取接收到的消息块(Chunk),存放在packet中. 对接收到的消息不做任何处理。 块的格式为: * * | basic header(1-3字节)| chunk msg header(0/3/7/11字节) | Extended Timestamp(0/4字节) | chunk data | * * 其中 basic header还可以分解为:| fmt(2位) | cs id (3 <= id <= 65599) | * RTMP协议支持65597种流,ID从3-65599。ID 0、1、2作为保留。 * id = 0,表示ID的范围是64-319(第二个字节 + 64); * id = 1,表示ID范围是64-65599(第三个字节*256 + 第二个字节 + 64); * id = 2,表示低层协议消息。 * 没有其他的字节来表示流ID。3 -- 63表示完整的流ID。 * * 一个完整的chunk msg header 还可以分解为 : * | timestamp(3字节) | msg length(3字节) | msg type id(1字节,小端) | msg stream id(4字节) | */ int RTMP_ReadPacket(RTMP *r, RTMPPacket *packet) { uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 }; // Chunk Header长度最大值为3 + 11 + 4 = 18 char *header = (char *)hbuf; // header指向从socket接收到的数据 int nSize, hSize, nToRead, nChunk; // nSize是块消息头长度,hSize是块头长度 int didAlloc = FALSE; RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d", __FUNCTION__, r->m_sb.sb_socket); // 读取1个字节存入 hbuf[0] if (ReadN(r, (char *)hbuf, 1) == 0) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header", __FUNCTION__); return FALSE; } packet->m_headerType = (hbuf[0] & 0xc0) >> 6; // 块类型fmt packet->m_nChannel = (hbuf[0] & 0x3f); // 块流ID(2 - 63) header++; // 块流ID第一个字节为0,表示块流ID占2个字节,表示ID的范围是64-319(第二个字节 + 64) if (packet->m_nChannel == 0) { // 读取接下来的1个字节存放在hbuf[1]中 if (ReadN(r, (char *)&hbuf[1], 1) != 1) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte", __FUNCTION__); return FALSE; } // 块流ID = 第二个字节 + 64 = hbuf[1] + 64 packet->m_nChannel = hbuf[1]; packet->m_nChannel += 64; header++; } // 块流ID第一个字节为1,表示块流ID占3个字节,表示ID范围是64 -- 65599(第三个字节*256 + 第二个字节 + 64) else if (packet->m_nChannel == 1){ int tmp; // 读取2个字节存放在hbuf[1]和hbuf[2]中 if (ReadN(r, (char *)&hbuf[1], 2) != 2) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte", __FUNCTION__); return FALSE; } // 块流ID = 第三个字节*256 + 第二个字节 + 64 tmp = (hbuf[2] << 8) + hbuf[1]; packet->m_nChannel = tmp + 64; RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel); header += 2; } // 块消息头(ChunkMsgHeader)有四种类型,大小分别为11、7、3、0,每个值加1 就得到该数组的值 // 块头 = BasicHeader(1-3字节) + ChunkMsgHeader + ExtendTimestamp(0或4字节) nSize = packetSize[packet->m_headerType]; // 块类型fmt为0的块,在一个块流的开始和时间戳返回的时候必须有这种块 // 块类型fmt为1、2、3的块使用与先前块相同的数据 // 关于块类型的定义,可参考官方协议:流的分块 --- 6.1.2节 if (nSize == RTMP_LARGE_HEADER_SIZE) /* if we get a full header the timestamp is absolute */ { packet->m_hasAbsTimestamp = TRUE; // 11个字节的完整ChunkMsgHeader的TimeStamp是绝对时间戳 }else if (nSize < RTMP_LARGE_HEADER_SIZE){ /* using values from the last message of this channel */ if (r->m_vecChannelsIn[packet->m_nChannel]) memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel], sizeof(RTMPPacket)); } nSize--; // 真实的ChunkMsgHeader的大小,此处减1是因为前面获取包类型的时候多加了1 // 读取nSize个字节存入header if (nSize > 0 && ReadN(r, header, nSize) != nSize){ RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. type: %x", __FUNCTION__, (unsigned int)hbuf[0]); return FALSE; } // 目前已经读取的字节数 = chunk msg header + basic header hSize = nSize + (header - (char *)hbuf); // chunk msg header为11、7、3字节,fmt类型值为0、1、2 if (nSize >= 3){ // 首部前3个字节为timestamp packet->m_nTimeStamp = AMF_DecodeInt24(header); /* RTMP_Log(RTMP_LOGDEBUG, "%s, reading RTMP packet chunk on channel %x, headersz %i, timestamp %i, abs timestamp %i", __FUNCTION__, packet.m_nChannel, nSize, packet.m_nTimeStamp, packet.m_hasAbsTimestamp); */ // chunk msg header为11或7字节,fmt类型值为0或1 if (nSize >= 6) { packet->m_nBodySize = AMF_DecodeInt24(header + 3); packet->m_nBytesRead = 0; RTMPPacket_Free(packet); if (nSize > 6) { packet->m_packetType = header[6]; // msg type id if (nSize == 11) packet->m_nInfoField2 = DecodeInt32LE(header + 7); // msg stream id,小端字节序 } } // Extend Tiemstamp,占4个字节 if (packet->m_nTimeStamp == 0xffffff){ if (ReadN(r, header + nSize, 4) != 4) { RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp", __FUNCTION__); return FALSE; } packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize); hSize += 4; } } RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)hbuf, hSize); // 如果消息长度非0,且消息数据缓冲区为空,则为之申请空间 if (packet->m_nBodySize > 0 && packet->m_body == NULL){ if (!RTMPPacket_Alloc(packet, packet->m_nBodySize)){ RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__); return FALSE; } didAlloc = TRUE; packet->m_headerType = (hbuf[0] & 0xc0) >> 6; } // 剩下的消息数据长度如果比块尺寸大,则需要分块,否则块尺寸就等于剩下的消息数据长度 nToRead = packet->m_nBodySize - packet->m_nBytesRead; nChunk = r->m_inChunkSize; if (nToRead < nChunk) nChunk = nToRead; /* Does the caller want the raw chunk? */ if (packet->m_chunk){ packet->m_chunk->c_headerSize = hSize; // 块头大小 memcpy(packet->m_chunk->c_header, hbuf, hSize); // 填充块头数据 packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead; // 块消息数据缓冲区指针 packet->m_chunk->c_chunkSize = nChunk; // 块大小 } // 读取一个块大小的数据存入块消息数据缓冲区 if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk){ RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. len: %u", __FUNCTION__, packet->m_nBodySize); return FALSE; } RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)packet->m_body + packet->m_nBytesRead, nChunk); // 更新已读数据字节个数 packet->m_nBytesRead += nChunk; /* keep the packet as ref for other packets on this channel */ // 将这个包作为通道中其他包的参考 if (!r->m_vecChannelsIn[packet->m_nChannel]) r->m_vecChannelsIn[packet->m_nChannel] = malloc(sizeof(RTMPPacket)); memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket)); // 包读取完毕 if (RTMPPacket_IsReady(packet)){ /* make packet's timestamp absolute,绝对时间戳 = 上一次绝对时间戳 + 时间戳增量 */ if (!packet->m_hasAbsTimestamp) /* timestamps seem to be always relative!! */ packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel]; // 当前绝对时间戳保存起来,供下一个包转换时间戳使用 r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp; /* reset the data from the stored packet. we keep the header since we may use it later if a new packet for this channel arrives and requests to re-use some info (small packet header) */ // 重置保存的包。保留块头数据,因为通道中新到来的包(更短的块头)可能需要使用前面块头的信息. r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL; r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0; r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE; // can only be false if we reuse header } else{ packet->m_body = NULL; /* so it won't be erased on free */ } return TRUE; }