After more than two months of hard work, live555 is finally running multi-threaded and has passed sustained stress testing.

live555 has been evolving for over a decade, and both the author's persistent dedication and the open-source ecosystem behind the project deserve real respect. live555 is the entry point of choice for most people in the video-surveillance industry, especially on embedded and Linux systems, where it is widely used thanks to its compatibility and stability.

Over those years of iteration, however, many developers have repeatedly asked the author, Ross, for multi-threading and IPv6 support, and he has never attempted either, probably because both would require substantial changes to live555's architecture. To stay safe he chose small changes, stability, and gradual iteration. The result performs reliably, but the number of streams it can serve is limited, and the inability to work multi-threaded has always been a hurdle. The few blog posts on multi-threading live555 that can be found online are much alike: create an independent UsageEnvironment and TaskScheduler per thread, and let those threads divide the work among themselves. I took the same approach: create several worker threads, build a UsageEnvironment and TaskScheduler inside each, and have each run its own event loop.

This post is a starting point: it outlines the main idea, and follow-up posts will try to cover the design and development process more completely.
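The thread-per-loop pattern just described can be sketched in portable C++ without any live555 code. The names below (`WorkerLoop`, `runDemo`) are illustrative only, not live555 API; in the real port, a `BasicTaskScheduler`/`BasicUsageEnvironment` pair plus `doEventLoop()` play the roles of the queue and the `run()` loop, and the key property is the same: each loop owns its state, so no locking is needed inside a loop.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// One event loop per worker thread; nothing is shared between loops, which is
// exactly why each live555 worker gets its own UsageEnvironment/TaskScheduler.
class WorkerLoop {
public:
    WorkerLoop() : stop_(false), thread_([this] { run(); }) {}
    ~WorkerLoop() {
        { std::lock_guard<std::mutex> lk(m_); stop_ = true; }
        cv_.notify_one();
        thread_.join();                          // drains remaining tasks, then exits
    }
    void post(std::function<void()> task) {      // schedule work on this loop
        { std::lock_guard<std::mutex> lk(m_); q_.push(std::move(task)); }
        cv_.notify_one();
    }
private:
    void run() {                                 // the "doEventLoop" of this sketch
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(m_);
                cv_.wait(lk, [this] { return stop_ || !q_.empty(); });
                if (q_.empty()) return;          // stop requested and queue drained
                task = std::move(q_.front()); q_.pop();
            }
            task();                              // runs on the worker thread
        }
    }
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> q_;
    bool stop_;
    std::thread thread_;
};

int runDemo() {
    std::vector<std::unique_ptr<WorkerLoop>> workers;  // one loop per channel
    for (int i = 0; i < 4; ++i) workers.emplace_back(new WorkerLoop);
    std::atomic<int> handled{0};
    for (int ch = 0; ch < 4; ++ch)
        workers[ch]->post([&handled] { ++handled; });  // each channel on its own thread
    workers.clear();                                   // join all loops
    return handled.load();
}
```

Calling `runDemo()` returns 4: every task runs on its channel's own thread, and the destructor drains each queue before joining.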
Goal
Modify live555 to be multi-threaded: each channel maps to one worker thread, and that worker thread handles the channel independently.
Main modification points
Supporting multiple threads mainly involves changes to the following classes:
- GenericMediaServer
- RTSPServer
GenericMediaServer.cpp
In the GenericMediaServer constructor, create the worker threads; their number is the maximum number of channels supported:
```cpp
GenericMediaServer::GenericMediaServer(UsageEnvironment& env, int ourSocketV4, int ourSocketV6,
                                       Port ourPort, unsigned reclamationSeconds)
  : Medium(env),
    fServerSocket4(ourSocketV4), fServerSocket6(ourSocketV6),
    fServerPort(ourPort), fReclamationSeconds(reclamationSeconds),
    fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
    fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),
    fClientSessions(HashTable::create(STRING_HASH_KEYS)) {
  ignoreSigPipeOnSocket(fServerSocket4); // so that clients on the same host that are killed don't also kill us
  ignoreSigPipeOnSocket(fServerSocket6); // ditto

#ifdef LIVE_MULTI_THREAD_ENABLE
  InitMutex(&mutexClientConnection);
  memset(&multiThreadCore, 0x00, sizeof(MultiThread_CORE_T));
  multiThreadCore.threadNum = MAX_DEFAULT_MULTI_THREAD_NUM;
  multiThreadCore.threadTask = new LIVE_THREAD_TASK_T[multiThreadCore.threadNum];
  memset(&multiThreadCore.threadTask[0], 0x00, sizeof(LIVE_THREAD_TASK_T) * multiThreadCore.threadNum);
  for (int i = 0; i < multiThreadCore.threadNum; i++) {
    char szName[36] = {0};
    sprintf(szName, "worker thread %d", i);
    multiThreadCore.threadTask[i].id = i;
    multiThreadCore.threadTask[i].extPtr = this;
    // Each worker thread owns its own scheduler and environment:
    multiThreadCore.threadTask[i].pSubScheduler = BasicTaskScheduler::createNew();
    multiThreadCore.threadTask[i].pSubEnv
      = BasicUsageEnvironment::createNew(*multiThreadCore.threadTask[i].pSubScheduler, i + 1, szName);
    CreateOSThread(&multiThreadCore.threadTask[i].osThread, __OSThread_Proc,
                   (void*)&multiThreadCore.threadTask[i]);
  }
#endif

  // Arrange to handle connections from others:
  env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket4, incomingConnectionHandler4, this);
  env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket6, incomingConnectionHandler6, this);
}
```
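The post does not show `__OSThread_Proc`, but given the constructor above, each worker presumably just parks inside its own environment's event loop. Below is a compilable sketch with stand-in types: `StubScheduler`, `ThreadTask`, and `workerProc` are my own stand-ins, where the real code would use the port's `LIVE_THREAD_TASK_T` and call `pSubEnv->taskScheduler().doEventLoop(&watchVariable)`, which returns once the watch variable becomes non-zero.

```cpp
#include <chrono>
#include <cstdio>
#include <thread>

// Stand-in for BasicTaskScheduler: the loop below mirrors what
// TaskScheduler::doEventLoop(&watchVariable) does in live555.
struct StubScheduler {
    void doEventLoop(char volatile* watchVariable) {
        while (*watchVariable == 0) {
            // real code: poll sockets, fire delayed tasks, run triggered events
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
};

struct ThreadTask {            // hypothetical slice of LIVE_THREAD_TASK_T
    int id;
    char volatile stopFlag;    // the watch variable used to leave the loop
    StubScheduler* pSubScheduler;
};

// Role of the blog's __OSThread_Proc: each worker simply lives inside its
// own scheduler's event loop until told to stop.
void* workerProc(void* arg) {
    ThreadTask* task = static_cast<ThreadTask*>(arg);
    std::printf("worker %d entering its event loop\n", task->id);
    task->pSubScheduler->doEventLoop(&task->stopFlag);  // blocks here
    std::printf("worker %d leaving\n", task->id);
    return nullptr;
}

bool demoStartStop() {
    StubScheduler sched;
    ThreadTask task{0, 0, &sched};
    std::thread t(workerProc, &task);  // CreateOSThread in the blog's code
    task.stopFlag = 1;                 // setting the watch variable ends the loop
    t.join();
    return true;                       // reached only once the worker has exited
}
```

The `volatile char` watch variable is live555's own idiom for stopping `doEventLoop()`; a from-scratch design would use `std::atomic` instead.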
Accepting client connections
Client connections are accepted following the original flow.

Dispatching client requests
When a client's DESCRIBE command arrives, find a free channel in the channel list, associate the client with that channel, remove the socket from the main thread, and let the worker thread take over all further handling of that socket. When a later client requests a channel that already exists, the main thread hands the request straight to the worker thread that owns it.

Note: the main thread's work ends here; it must not go on to call lookupServerMediaSession.

In the worker thread, call lookupServerMediaSession immediately after taking over the client's socket. Inside that function, the URL suffix is handed back to the calling application through a callback, and the application decides whether the channel exists: if not, it returns failure; if so, it starts pulling the stream from the device, fills in the media information, and returns success. The library then creates the corresponding media session and replies to the client, after which the rest of the RTSP exchange proceeds as usual.

Note: when creating the media session, pass in the worker thread's UsageEnvironment; never use the main thread's envir().
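In stock live555, the only scheduler calls documented as safe from another thread are `TaskScheduler::createEventTrigger()` and `triggerEvent()`, so the handover described above has to funnel through some such wakeup. Here is a self-contained sketch of that handover, using a queue plus condition variable in place of live555's event trigger; `ChannelWorker`, `adoptSocket`, and `demoHandover` are illustrative names I made up, and plain `int`s stand in for socket descriptors.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <set>
#include <thread>

// Sketch of the DESCRIBE-time handover: the main thread stops watching the
// client socket and passes the descriptor to a worker, which registers it
// with its *own* scheduler. The notify below corresponds to triggerEvent();
// everything else must happen on the worker's own loop.
class ChannelWorker {
public:
    ChannelWorker() : stop_(false), th_([this] { loop(); }) {}
    ~ChannelWorker() {
        { std::lock_guard<std::mutex> lk(m_); stop_ = true; }
        cv_.notify_one();
        th_.join();
    }
    // Called from the MAIN thread, after it has done the equivalent of
    // envir().taskScheduler().disableBackgroundHandling(fd):
    void adoptSocket(int fd) {
        { std::lock_guard<std::mutex> lk(m_); pending_.push(fd); }
        cv_.notify_one();          // ~ triggerEvent(): wake the worker loop
    }
    bool owns(int fd) {
        std::lock_guard<std::mutex> lk(m_);
        return owned_.count(fd) != 0;
    }
private:
    void loop() {
        std::unique_lock<std::mutex> lk(m_);
        for (;;) {
            cv_.wait(lk, [this] { return stop_ || !pending_.empty(); });
            while (!pending_.empty()) {
                int fd = pending_.front(); pending_.pop();
                owned_.insert(fd);  // real code: setBackgroundHandling(fd, ...)
                                    // on the worker's scheduler, then
                                    // lookupServerMediaSession on this thread
            }
            if (stop_) return;
        }
    }
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<int> pending_;
    std::set<int> owned_;
    bool stop_;
    std::thread th_;
};

bool demoHandover() {
    ChannelWorker worker;
    worker.adoptSocket(42);        // main thread hands the client socket over
    for (int i = 0; i < 1000 && !worker.owns(42); ++i)
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    return worker.owns(42);        // true once the worker has registered it
}
```

The essential invariant is the one the two notes above state: after `adoptSocket()`, the main thread never touches the descriptor again, and all further work (including the media-session creation) uses the worker's environment.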
```cpp
int RTSPServer::RTSPClientConnection
::handleCmd_DESCRIBE(UsageEnvironment* pEnv, char const* urlPreSuffix, char const* urlSuffix,
                     char const* fullRequestStr, LIVE_THREAD_TASK_T** pThreadTask) {
  int handleCmdRet = 0;
  ServerMediaSession* session = NULL;
  char* sdpDescription = NULL;
  char* rtspURL = NULL;
  do {
    char urlTotalSuffix[2*RTSP_PARAM_STRING_MAX];
        // enough space for urlPreSuffix/urlSuffix'\0'
    urlTotalSuffix[0] = '\0';
    if (urlPreSuffix[0] != '\0') {
      strcat(urlTotalSuffix, urlPreSuffix);
      strcat(urlTotalSuffix, "/");
    }
    strcat(urlTotalSuffix, urlSuffix);

    if (!authenticationOK("DESCRIBE", urlTotalSuffix, fullRequestStr)) break;

    // We should really check that the request contains an "Accept:" #####
    // for "application/sdp", because that's what we're sending back #####

    _TRACE(TRACE_LOG_DEBUG, "handleCmd_DESCRIBE socket[%d]\n", this->fOurSocket);

#ifdef LIVE_MULTI_THREAD_ENABLE
    // If we are still on the main thread (its environment has id 1000),
    // look up the channel for this request:
    if (pEnv->GetEnvirId() == 1000) {
      fOurServer.LockClientConnection(); // Lock
      UsageEnvironment* pChEnv = fOurServer.GetEnvBySuffix(urlSuffix, this, pThreadTask);
      if (NULL == pChEnv) {
        fOurServer.UnlockClientConnection(); // Unlock
        handleCmdRet = -1;
        this->assignSink = False;
        this->pEnv = NULL;
        handleCmd_notFound();
        break;
      } else {
        _TRACE(TRACE_LOG_DEBUG, "associating socket[%d] with [%s]\n",
               this->fOurSocket, pChEnv->GetEnvirName());
        // Move the socket from the main thread to the worker thread:
        envir().taskScheduler().disableBackgroundHandling(fOurSocket);
        fOurServer.UnlockClientConnection(); // Unlock
        return 1000; // tell the caller the worker thread now owns this socket
      }
    }
#endif

    // Begin by looking up the "ServerMediaSession" object for the specified
    // "urlTotalSuffix" (this runs in the worker thread):
    session = fOurServer.lookupServerMediaSession(pEnv, 1, this, urlTotalSuffix);
    if (session == NULL) {
      _TRACE(TRACE_LOG_DEBUG, "socket[%d] in [%s]: source not ready [%s]\n",
             this->fOurSocket, pEnv->GetEnvirName(), urlTotalSuffix);
      this->assignSink = False;
      this->pEnv = NULL;
      handleCmdRet = -1;
      handleCmd_notFound();
      break;
    }
    session->incrementReferenceCount();

    // Then, assemble a SDP description for this session:
    sdpDescription = session->generateSDPDescription(fOurIPVer);
    if (sdpDescription == NULL) {
      // This usually means that a file name that was specified for a
      // "ServerMediaSubsession" does not exist.
      setRTSPResponse("404 File Not Found, Or In Incorrect Format");
      break;
    }
    unsigned sdpDescriptionSize = strlen(sdpDescription);

    // Also, generate our RTSP URL, for the "Content-Base:" header
    // (which is necessary to ensure that the correct URL gets used in
    // subsequent "SETUP" requests):
    rtspURL = fOurRTSPServer.rtspURL(session, fOurIPVer, fClientInputSocket);

    snprintf((char*)fResponseBuffer, sizeof fResponseBuffer,
             "RTSP/1.0 200 OK\r\nCSeq: %s\r\n"
             "%s"
             "Content-Base: %s/\r\n"
             "Content-Type: application/sdp\r\n"
             "Content-Length: %d\r\n\r\n"
             "%s",
             fCurrentCSeq,
             dateHeader(),
             rtspURL,
             sdpDescriptionSize,
             sdpDescription);
  } while (0);

  if (session != NULL) {
    // Set the flag first (before the session can be removed below), so that
    // later clients of this channel get a fast response:
    session->SetStreamStatus(1);
    // Decrement its reference count, now that we're done using it:
    session->decrementReferenceCount();
    if (session->referenceCount() == 0 && session->deleteWhenUnreferenced()) {
      fOurServer.removeServerMediaSession(pEnv, session, True);
    }
  }

  delete[] sdpDescription;
  delete[] rtspURL;
  return handleCmdRet;
}
```
After more than two months, the multi-threading problem is finally solved. I am recording it here; discussion is welcome.

Feedback from user testing so far:
- continuous stress test for 8 days;
- 30 cameras connected;
- clients repeatedly starting and stopping playback;
- memory, CPU, and the process all remained very stable!

live555 discussion
live555 QQ discussion group: 475947825