博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SRS源码分析-协程相关类
阅读量:2378 次
发布时间:2019-05-10

本文共 8321 字,大约阅读时间需要 27 分钟。

SRS中使用协程库state-thread(ST), 在使用时对其进行了封装,保证使用方便。这种封装方法和使用thread库比较类似。

SrsEndlessThread

用于创建一个永不退出的协程,生命周期和整个程序一样。使用时需要继承ISrsEndlessThreadHandler方法,并在构造函数中创建SrsEndlessThread,重写cycle方法。

使用时执行流程:

SRSEndlessThread使用时执行流程图

//永不退出协程的处理类class ISrsEndlessThreadHandler{public:    ISrsEndlessThreadHandler();    virtual ~ISrsEndlessThreadHandler();public:    virtual int cycle() = 0;public:    virtual void on_thread_start();    virtual int on_before_cycle();    virtual int on_end_cycle();    virtual void on_thread_stop();};//永不退出的协程class SrsEndlessThread : public internal::ISrsThreadHandler{private:    internal::SrsThread* pthread; //包含一个协程封装类    ISrsEndlessThreadHandler* handler; //协程处理类public:    SrsEndlessThread(const char* n, ISrsEndlessThreadHandler* h);    virtual ~SrsEndlessThread();public:    virtual int start(); //启动public:    virtual int cycle(); //执行的循环    virtual void on_thread_start();    virtual int on_before_cycle();    virtual int on_end_cycle();    virtual void on_thread_stop();};

SrsOneCycleThread

用于创建一次循环的thread,在cycle函数执行完后,调用stop_loop退出。执行流程和SrsEndlessThread类似。

//执行完handler->cycle()后,调用stop_loop,退出int SrsOneCycleThread::cycle(){    int ret = handler->cycle();    pthread->stop_loop();    return ret;}

SrsReusableThread

可以重复使用的thread, 其他类继承ISrsReusableThreadHandler,并包含SrsReusableThread的变量,start函数启动线程,stop函数停止线程。重写cycle函数

class ISrsReusableThreadHandler{public:    ISrsReusableThreadHandler();    virtual ~ISrsReusableThreadHandler();public:    virtual int cycle() = 0;public:    virtual void on_thread_start();    virtual int on_before_cycle();    virtual int on_end_cycle();    virtual void on_thread_stop();};class SrsReusableThread : public internal::ISrsThreadHandler{private:    internal::SrsThread* pthread;    ISrsReusableThreadHandler* handler;public:    SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, int64_t interval_us = 0);    virtual ~SrsReusableThread();public:    virtual int start();    virtual void stop();public:    virtual int cid();// interface internal::ISrsThreadHandlerpublic:    virtual int cycle();    virtual void on_thread_start();    virtual int on_before_cycle();    virtual int on_end_cycle();    virtual void on_thread_stop();};
//可以调用stopvoid SrsReusableThread::stop(){    pthread->stop();}

SrsReusableThread2

和SrsReusableThread区别是:在线程cycle里有内部循环,需要判断interrupt状态,如果内部loop想要退出线程,应该interrupt该线程。

//多了interruptvoid SrsReusableThread2::interrupt(){    pthread->stop_loop();}bool SrsReusableThread2::interrupted(){    return !pthread->can_loop();}

SrsThread

/* * 线程处理类,定制线程启动的回调函数 * */class ISrsThreadHandler{public:    ISrsThreadHandler();    virtual ~ISrsThreadHandler();public:    virtual void on_thread_start(); //线程启动    virtual int on_before_cycle(); //cycle前    virtual int cycle() = 0; //cycle    virtual int on_end_cycle(); //cycle后    virtual void on_thread_stop(); //stop时}; /*  * 协程的封装,作为内部使用的类  * */class SrsThread{private:    st_thread_t tid; //tid    int _cid; //cid    bool loop; //是否支持loop    bool can_run; //是否能run    bool really_terminated; //是否terminate    bool _joinable; //是否joinable    const char* _name; //协程名字    bool disposed; //是否disposeprivate:    ISrsThreadHandler* handler; //回调处理    int64_t cycle_interval_us; //循环时间uspublic:     //初始化协程    SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable);    virtual ~SrsThread();public:    virtual int cid(); //获取cid    virtual int start(); //启动线程    virtual void stop(); //暂停线程public:    virtual bool can_loop(); //是否能loop    virtual void stop_loop(); //停止loopprivate:    virtual void dispose(); //释放    virtual void thread_cycle(); //线程循环    static void* thread_fun(void* arg); //线程循环调用的函数};
/*     * 线程的构造函数     * name:函数名     * thread_handle:线程处理函数     * interval_us: 休眠时长     * joinalbe: 是否能join     * */    SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable)    {        _name = name;        handler = thread_handler;        cycle_interval_us = interval_us;                tid = NULL;        loop = false;        really_terminated = true;        _cid = -1;        _joinable = joinable;        disposed = false;        can_run = false;    }    //析构函数,调用stop    SrsThread::~SrsThread()    {        stop();    }        int SrsThread::cid()    {        return _cid;    }    //启动一个协程    int SrsThread::start()    {        int ret = ERROR_SUCCESS;                if(tid) {            srs_info("thread %s already running.", _name);            return ret;        }        //创建协程,调用thread_fun        if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){            ret = ERROR_ST_CREATE_CYCLE_THREAD;            srs_error("st_thread_create failed. ret=%d", ret);            return ret;        }        //是否dispose        disposed = false;        // we set to loop to true for thread to run.        loop = true;                // wait for cid to ready, for parent thread to get the cid.        while (_cid < 0) {            st_usleep(10 * 1000);        }                // now, cycle thread can run.        can_run = true;                return ret;    }    //停止一个协程    void SrsThread::stop()    {        if (!tid) {            return;        }                loop = false; //loop为false, 那么不会继续执行cycle()                dispose(); //释放协程                _cid = -1;        can_run = false;        tid = NULL;            }    //清理    void SrsThread::dispose()    {        if (disposed) {            return;        }        st_thread_interrupt(tid);        if (_joinable) {            // wait the thread to exit.            int ret = st_thread_join(tid, NULL);            if (ret) {                srs_warn("core: ignore join thread failed.");            }        }        while (!really_terminated) {            st_usleep(10 * 1000);                        if (really_terminated) {                break;            }            srs_warn("core: wait thread to actually terminated");        }                disposed = true;    }    //协程的循环    void SrsThread::thread_cycle()    {        int ret = ERROR_SUCCESS;                _srs_context->generate_id(); //生成cid        srs_info("thread %s cycle start", _name);                _cid = _srs_context->get_id();                srs_assert(handler);        handler->on_thread_start(); //调用handle的on_thread_start                // thread is running now.        really_terminated = false;                // wait for cid to ready, for parent thread to get the cid.        while (!can_run && loop) {            st_usleep(10 * 1000);        }        //正在的loop,loop里执行函数为:on_before_cycle->cycle->on_end_cycle        while (loop) {            if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {                srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);                goto failed;            }            srs_info("thread %s on before cycle success", _name);                        if ((ret = handler->cycle()) != ERROR_SUCCESS) {                if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {                    srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);                }                goto failed;            }            srs_info("thread %s cycle success", _name);                        if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {                srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);                goto failed;            }            srs_info("thread %s on end cycle success", _name);                    failed:            if (!loop) {                break;            }            if (cycle_interval_us != 0) {                st_usleep(cycle_interval_us);            }        }                // really terminated now.        really_terminated = true;                handler->on_thread_stop();//停止时的回调        srs_info("thread %s cycle finished", _name);    }    //协程执行的函数    void* SrsThread::thread_fun(void* arg)    {        SrsThread* obj = (SrsThread*)arg;        srs_assert(obj);                obj->thread_cycle(); //调用cycle函数                // for valgrind to detect.        SrsThreadContext* ctx = dynamic_cast
(_srs_context); if (ctx) { ctx->clear_cid(); } st_thread_exit(NULL); //退出协程 return NULL; }

转载地址:http://yqmxb.baihongyu.com/

你可能感兴趣的文章
了解内核的装入地址和入口地址,vmlinux.bin与vmlinux
查看>>
内核里面对大小写字符的转换
查看>>
eclipse的奇怪的更新方式
查看>>
MIPS技术公司官方对linux的支持信息
查看>>
openflow简介
查看>>
windows7配置虚拟AP的脚本
查看>>
CompilationUnit
查看>>
日本用大数据技术预测流感 预测与分析结果精确
查看>>
我们为什么需要大数据技术?
查看>>
大数据使超级计算机迎来第二春
查看>>
多地政府探索“大数据”惠民之道 信息垄断须打破
查看>>
民营经济的支撑不可或缺
查看>>
共赢安全大数据 SIEM助企业准确识别威胁
查看>>
社会化媒体营销发展报告发布 大数据受重视
查看>>
大数据促使云计算行业向规模化发展
查看>>
融合广播或成电台发展新方向 进军大数据
查看>>
场外市场布局“大数据”
查看>>
看“大数据”如何帮我们寻找爱情
查看>>
国家统计局:利用大数据做好网购统计
查看>>
大数据揭另类爱情报告:每4对新婚有1对离婚
查看>>