本文共 8321 字,大约阅读时间需要 27 分钟。
SRS中使用协程库state-thread(ST), 在使用时对其进行了封装,保证使用方便。这种封装方法和使用thread库比较类似。
用于创建一个永不退出的协程,生命周期和整个程序一样。使用时需要继承ISrsEndlessThreadHandler方法,并在构造函数中创建SrsEndlessThread,重写cycle方法。
使用时执行流程:
//永不退出协程的处理类class ISrsEndlessThreadHandler{public: ISrsEndlessThreadHandler(); virtual ~ISrsEndlessThreadHandler();public: virtual int cycle() = 0;public: virtual void on_thread_start(); virtual int on_before_cycle(); virtual int on_end_cycle(); virtual void on_thread_stop();};//永不退出的协程class SrsEndlessThread : public internal::ISrsThreadHandler{private: internal::SrsThread* pthread; //包含一个协程封装类 ISrsEndlessThreadHandler* handler; //协程处理类public: SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h); virtual ~SrsEndlessThread();public: virtual int start(); //启动public: virtual int cycle(); //执行的循环 virtual void on_thread_start(); virtual int on_before_cycle(); virtual int on_end_cycle(); virtual void on_thread_stop();};
用于创建一次循环的thread,在cycle函数执行完后,调用stop_loop退出。执行流程和SrsEndlessThread类似。
//执行完handler->cycle()后,调用stop_loop,退出int SrsOneCycleThread::cycle(){ int ret = handler->cycle(); pthread->stop_loop(); return ret;}
可以重复使用的thread, 其他类继承ISrsReusableThreadHandler,并包含SrsReusableThread的变量,start函数启动线程,stop函数停止线程。重写cycle函数
class ISrsReusableThreadHandler{public: ISrsReusableThreadHandler(); virtual ~ISrsReusableThreadHandler();public: virtual int cycle() = 0;public: virtual void on_thread_start(); virtual int on_before_cycle(); virtual int on_end_cycle(); virtual void on_thread_stop();};class SrsReusableThread : public internal::ISrsThreadHandler{private: internal::SrsThread* pthread; ISrsReusableThreadHandler* handler;public: SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t interval_us = 0); virtual ~SrsReusableThread();public: virtual int start(); virtual void stop();public: virtual int cid();// interface internal::ISrsThreadHandlerpublic: virtual int cycle(); virtual void on_thread_start(); virtual int on_before_cycle(); virtual int on_end_cycle(); virtual void on_thread_stop();};
//可以调用stopvoid SrsReusableThread::stop(){ pthread->stop();}
和SrsReusableThread区别是:在线程cycle里有内部循环,需要判断interrupt状态,如果内部loop想要退出线程,应该interrupt该线程。
//多了interruptvoid SrsReusableThread2::interrupt(){ pthread->stop_loop();}bool SrsReusableThread2::interrupted(){ return !pthread->can_loop();}
/* * 线程处理类,定制线程启动的回调函数 * */class ISrsThreadHandler{public: ISrsThreadHandler(); virtual ~ISrsThreadHandler();public: virtual void on_thread_start(); //线程启动 virtual int on_before_cycle(); //cycle前 virtual int cycle() = 0; //cycle virtual int on_end_cycle(); //cycle后 virtual void on_thread_stop(); //stop时}; /* * 协程的封装,作为内部使用的类 * */class SrsThread{private: st_thread_t tid; //tid int _cid; //cid bool loop; //是否支持loop bool can_run; //是否能run bool really_terminated; //是否terminate bool _joinable; //是否joinable const char* _name; //协程名字 bool disposed; //是否disposeprivate: ISrsThreadHandler* handler; //回调处理 int64_t cycle_interval_us; //循环时间uspublic: //初始化协程 SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable); virtual ~SrsThread();public: virtual int cid(); //获取cid virtual int start(); //启动线程 virtual void stop(); //暂停线程public: virtual bool can_loop(); //是否能loop virtual void stop_loop(); //停止loopprivate: virtual void dispose(); //释放 virtual void thread_cycle(); //线程循环 static void* thread_fun(void* arg); //线程循环调用的函数};
/* * 线程的构造函数 * name:函数名 * thread_handle:线程处理函数 * interval_us: 休眠时长 * joinalbe: 是否能join * */ SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable) { _name = name; handler = thread_handler; cycle_interval_us = interval_us; tid = NULL; loop = false; really_terminated = true; _cid = -1; _joinable = joinable; disposed = false; can_run = false; } //析构函数,调用stop SrsThread::~SrsThread() { stop(); } int SrsThread::cid() { return _cid; } //启动一个协程 int SrsThread::start() { int ret = ERROR_SUCCESS; if(tid) { srs_info("thread %s already running.", _name); return ret; } //创建协程,调用thread_fun if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){ ret = ERROR_ST_CREATE_CYCLE_THREAD; srs_error("st_thread_create failed. ret=%d", ret); return ret; } //是否dispose disposed = false; // we set to loop to true for thread to run. loop = true; // wait for cid to ready, for parent thread to get the cid. while (_cid < 0) { st_usleep(10 * 1000); } // now, cycle thread can run. can_run = true; return ret; } //停止一个协程 void SrsThread::stop() { if (!tid) { return; } loop = false; //loop为false, 那么不会继续执行cycle() dispose(); //释放协程 _cid = -1; can_run = false; tid = NULL; } //清理 void SrsThread::dispose() { if (disposed) { return; } st_thread_interrupt(tid); if (_joinable) { // wait the thread to exit. int ret = st_thread_join(tid, NULL); if (ret) { srs_warn("core: ignore join thread failed."); } } while (!really_terminated) { st_usleep(10 * 1000); if (really_terminated) { break; } srs_warn("core: wait thread to actually terminated"); } disposed = true; } //协程的循环 void SrsThread::thread_cycle() { int ret = ERROR_SUCCESS; _srs_context->generate_id(); //生成cid srs_info("thread %s cycle start", _name); _cid = _srs_context->get_id(); srs_assert(handler); handler->on_thread_start(); //调用handle的on_thread_start // thread is running now. really_terminated = false; // wait for cid to ready, for parent thread to get the cid. while (!can_run && loop) { st_usleep(10 * 1000); } //正在的loop,loop里执行函数为:on_before_cycle->cycle->on_end_cycle while (loop) { if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) { srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret); goto failed; } srs_info("thread %s on before cycle success", _name); if ((ret = handler->cycle()) != ERROR_SUCCESS) { if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) { srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret); } goto failed; } srs_info("thread %s cycle success", _name); if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) { srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret); goto failed; } srs_info("thread %s on end cycle success", _name); failed: if (!loop) { break; } if (cycle_interval_us != 0) { st_usleep(cycle_interval_us); } } // really terminated now. really_terminated = true; handler->on_thread_stop();//停止时的回调 srs_info("thread %s cycle finished", _name); } //协程执行的函数 void* SrsThread::thread_fun(void* arg) { SrsThread* obj = (SrsThread*)arg; srs_assert(obj); obj->thread_cycle(); //调用cycle函数 // for valgrind to detect. SrsThreadContext* ctx = dynamic_cast(_srs_context); if (ctx) { ctx->clear_cid(); } st_thread_exit(NULL); //退出协程 return NULL; }
转载地址:http://yqmxb.baihongyu.com/