设计h264的rtp网络服务器,首先需要考虑的就是多路udp如何接收,如果采用多线程的模式,会导致线程上下文切换过于频繁,导致udp丢包。采用多进程的模式,占用的内存和进程资源又不好控制。所以在linux下采用epoll模型比较合适。 epoll头文件 :
#include <sys/epoll.h>接收到数据后,epoll不仅可以指明哪路fd收到了数据,还可以通过自定义结构体来指明相应的结构体收到了数据,这一点比select模型要灵活。
typedef struct Recv_Event { int fdRecv; int id; } Recv_Event;其中fdRecv是收到数据的fd,id是用来标示处理udp数据的类对象等资源的id,这样方便对不同路的udp进行处理,省去了自己写map查找fd对应的id的事。
新建epfd,定义最大20路udp的接收,描述符多少与内存空间相关,我设定在arm上设定50多万没什么问题,与select的1024个描述符相比,优势明显。
int intsize =20; int epfd = epoll_create(intsize); int op =EPOLL_CTL_ADD; struct epoll_event epv = {0}; epv.events=EPOLLIN ;其中EPOLL_CTL_ADD是向epfd增加fd的操作,EPOLLIN是epfd上有in的数据,即读入数据的事件时,返回相应的接收fd的集合。
int portCount =16; Recv_Event *pEvent[portCount] ; for(i=0;i<portCount;i++) { portid =portIndex+i; //h264Decoder[i] =new H264Decoder(portid); recvBufLength[i] =0; lastSeqNum[i] =0; len[i] =10000; sockSrv[i] = socket(AF_INET,SOCK_DGRAM, 0); addrServ[i].sin_addr.s_addr =htonl(INADDR_ANY);// inet_addr("192.168.1.198"); //addrServ.sin_addr.S_un.S_addr = htonl(INADDR_ANY);//addr4->sin_addr = ip; addrServ[i].sin_family = AF_INET; addrServ[i].sin_port = htons(portid); printf("recv port:%d\n",portid); if (bind(sockSrv[i], (struct sockaddr *)&addrServ[i], sizeof(addrServ[i]))<0) { perror("connect"); } value = 1024000; setsockopt(sockSrv[i], SOL_SOCKET, SO_RCVBUF,(char *)&value, sizeof(value)); value =0; getsockopt(sockSrv[i], SOL_SOCKET, SO_RCVBUF,(char *)&value, &valSize); printf("socket size: %d\n",value); recvBuf[i]=(unsigned char*)malloc(len[i]); pEvent[i]= (Recv_Event*)malloc(sizeof( Recv_Event)); pEvent[i]->fdRecv =sockSrv[i]; pEvent[i]->id = i; epv.data.ptr =pEvent[i]; epoll_ctl(epfd, op, sockSrv[i], &epv); }上面程序是接受16路udp的socket的初始化部分,其中h264Decoder[i] =new H264Decoder(portid);是用来处理h264码流的类对象,暂时注释掉以解耦。 setsockopt是用来设置udp的缓冲区大小,以防udp传送的视频流过大造成缓冲区溢出。 epv.data.ptr是可以指定自定义结构体。 epoll_ctl(epfd, op, sockSrv[i], &epv);是循环的将这16路socket添加到epfd的集合中,以便后续监控。
下面是16路接收数据的程序
struct epoll_event events[intsize]; while(1) { int fds = epoll_wait(epfd, events, intsize, 0); if (fds ==-1) return 0; if (fds ==0) { usleep(1); continue; } int i =0; for(i = 0; i < fds; i++) { Recv_Event *event = (Recv_Event*)events[i].data.ptr; flags = 0; len[event->id] =10000; int available; ioctl(sockSrv[event->id], FIONREAD, &available); if (available > 0 && len[event->id] >available) len[event->id] = available; recvBufLength[event->id] = recvfrom(event->fdRecv,(void *)recvBuf[event->id],len[event->id], //may be mistake. i is not right, use data.ptr is right. MSG_DONTWAIT, (struct sockaddr*)&addrClient,(socklen_t*)&length); if (recvBufLength[event->id] < 0) { continue; } // RTPFrame recvRTP(recvBuf[event->id], recvBufLength[event->id]); // if (( recvRTP.GetSequenceNumber() - lastSeqNum[event->id]) != 1) // { // printf("*****[event->id] %d,lost packet %d. recvBufLength :%d available:%d\n",event->id,( recvRTP.GetSequenceNumber() - lastSeqNum[event->id]),recvBufLength[event->id],available); // } // lastSeqNum[event->id] = recvRTP.GetSequenceNumber(); // h264Decoder[event->id]->Transcode(recvBuf[event->id],recvBufLength[event->id],flags); //printf("fds:%d,recvBufLength[i],%d\n",fds,recvBufLength[event->id]); } }epoll_event events是用来存放有数据接收的fd的集合。 fds是用来存放有数据接收的fd的数量。 int fds = epoll_wait(epfd, events, intsize, 0);是等待epfd上是否有数据接收,有1路就返回,多路就返回多路的数量和fd,保存在fds和events中。 ioctl(sockSrv[event->id], FIONREAD, &available);是用来确定接收到的数据大小,可有可无。 后面RTPFrame和h264Decoder是用来处理udp包的部分,注释掉以解耦。
for(i=0;i<portCount;i++) { free(recvBuf[i]); shutdown(sockSrv[i],SHUT_RDWR); } close(epfd);释放资源部分代码,注意epoll的fd是通过close来直接关闭即可。
下载源代码链接