博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mooon-agent接收状态机代码摘要
阅读量:6561 次
发布时间:2019-06-24

本文共 5383 字,大约阅读时间需要 17 分钟。

  • recv_machine.h

点击(此处)折叠或打开

  1. #ifndef MOOON_AGENT_RECV_MACHINE_H
  2. #define MOOON_AGENT_RECV_MACHINE_H
  3. #include <agent/message.h>
  4. AGENT_NAMESPACE_BEGIN
  5. class CAgentThread;
  6. class CRecvMachine
  7. {
  8. private:
  9.     /***
  10.       * 接收状态值
  11.       */
  12.     typedef enum recv_state_t
  13.     {
  14.         rs_header, /** 接收消息头状态 */
  15.         rs_body /** 接收消息体状态 */
  16.     }TRecvState;
  17.     /***
  18.       * 接收状态上下文
  19.       */
  20.     struct RecvStateContext
  21.     {
  22.         const char* buffer; /** 当前的数据buffer */
  23.         size_t buffer_size; /** 当前的数据字节数 */
  24.         
  25.         RecvStateContext(const char* buf=NULL, size_t buf_size=0)
  26.          :buffer(buf)
  27.          ,buffer_size(buf_size)
  28.         {
  29.         }
  30.         
  31.         RecvStateContext(const RecvStateContext& other)
  32.          :buffer(other.buffer)
  33.          ,buffer_size(other.buffer_size)
  34.         {
  35.         }
  36.         
  37.         RecvStateContext& operator =(const RecvStateContext& other)
  38.         {
  39.             buffer = other.buffer;
  40.             buffer_size = other.buffer_size;
  41.             return *this;
  42.         }
  43.     };
  44.     
  45. public:
  46.     CRecvMachine(CAgentThread* thread);
  47.     util::handle_result_t work(const char* buffer, size_t buffer_size);
  48.     void reset();
  49.     
  50. private:
  51.     void set_next_state(recv_state_t next_state)
  52.     {
  53.         _recv_state = next_state;
  54.         _finished_size = 0;
  55.     }
  56.     
  57.     util::handle_result_t handle_header(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);
  58.     util::handle_result_t handle_body(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);
  59.     util::handle_result_t handle_error(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx);
  60.        
  61. private:
  62.     CAgentThread* _thread; /** 需要通过CAgentThread取得CProcessorManager */
  63.     agent_message_header_t _header; /** 消息头,这个大小是固定的 */
  64.     recv_state_t _recv_state; /** 当前的接收状态 */
  65.     size_t _finished_size; /** 当前状态已经接收到的字节数,注意不是总的已经接收到的字节数,只针对当前状态 */
  66. };
  67. AGENT_NAMESPACE_END
  68. #endif // MOOON_AGENT_RECV_MACHINE_H
  • recv_machine.cpp

点击(此处)折叠或打开

  1. #include "recv_machine.h"
  2. #include "agent_thread.h"
  3. AGENT_NAMESPACE_BEGIN
  4. CRecvMachine::CRecvMachine(CAgentThread* thread)
  5.  :_thread(thread)
  6. {
  7.     set_next_state(rs_header);
  8. }
  9. // 状态机入口函数
  10. // 状态机工作原理:-> rs_header -> rs_body -> rs_header
  11. // -> rs_header -> rs_error -> rs_header
  12. // -> rs_header -> rs_body -> rs_error -> rs_header
  13. // 参数说明:
  14. // buffer - 本次收到的数据,注意不是总的
  15. // buffer_size - 本次收到的数据字节数
  16. util::handle_result_t CRecvMachine::work(const char* buffer, size_t buffer_size)
  17. {
  18.     RecvStateContext next_ctx(buffer, buffer_size);
  19.     util::handle_result_t hr = util::handle_continue;
  20.     
  21.     // 状态机循环条件为:util::handle_continue == hr
  22.     while (util::handle_continue == hr)
  23.     {
  24.         RecvStateContext cur_ctx(next_ctx);
  25.         
  26.         switch (_recv_state)
  27.         {
  28.         case rs_header:
  29.             hr = handle_header(cur_ctx, &next_ctx);
  30.             break;
  31.         case rs_body:
  32.             hr = handle_body(cur_ctx, &next_ctx);
  33.             break;
  34.         default:
  35.             hr = handle_error(cur_ctx, &next_ctx);
  36.             break;
  37.         }
  38.     }
  39.         
  40.     return hr;
  41. }
  42. void CRecvMachine::reset()
  43. {
  44.     set_next_state(rs_header);
  45. }
  46. // 处理消息头部
  47. // 参数说明:
  48. // cur_ctx - 当前上下文,
  49. // cur_ctx.buffer为当前收到的数据buffer,包含了消息头,但也可能包含了消息体。
  50. // cur_ctx.buffer_size为当前收到字节数
  51. // next_ctx - 下一步上下文,
  52. // 由于cur_ctx.buffer可能包含了消息体,所以在一次接收receive动作后,
  53. // 会涉及到消息头和消息体两个状态,这里的next_ctx实际为下一步handle_body的cur_ctx
  54. util::handle_result_t CRecvMachine::handle_header(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)
  55. {
  56.     if (_finished_size + cur_ctx.buffer_size < sizeof(agent_message_header_t))
  57.     {
  58.         memcpy(reinterpret_cast<char*>(&_header) + _finished_size
  59.               ,cur_ctx.buffer
  60.               ,cur_ctx.buffer_size);
  61.               
  62.         _finished_size += cur_ctx.buffer_size;
  63.         return util::handle_continue;
  64.     }
  65.     else
  66.     {
  67.         size_t need_size = sizeof(agent_message_header_t) - _finished_size;
  68.         memcpy(reinterpret_cast<char*>(&_header) + _finished_size
  69.               ,cur_ctx.buffer
  70.               ,need_size);
  71.               
  72.         // TODO: Check header here
  73.         
  74.         size_t remain_size = cur_ctx.buffer_size - need_size;
  75.         if (remain_size > 0)
  76.         {
  77.             next_ctx->buffer = cur_ctx.buffer + need_size;
  78.             next_ctx->buffer_size = cur_ctx.buffer_size - need_size;
  79.         }
  80.         
  81.         // 只有当包含消息体时,才需要进行状态切换,
  82.         // 否则维持rs_header状态不变
  83.         if (_header.size > 0)
  84.         {
  85.             // 切换状态
  86.             set_next_state(rs_body);
  87.         }
  88.         else
  89.         {
  90.             CProcessorManager* processor_manager = _thread->get_processor_manager();
  91.             if (!processor_manager->on_message(_header, 0, NULL, 0))
  92.             {
  93.                 return util::handle_error;
  94.             }
  95.         }
  96.                 
  97.         return (remain_size > 0)
  98.               ? util::handle_continue // 控制work过程是否继续循环
  99.               : util::handle_finish;
  100.     }
  101. }
  102. // 处理消息体
  103. // 参数说明:
  104. // cur_ctx - 当前上下文,
  105. // cur_ctx.buffer为当前收到的数据buffer,包含了消息体,但也可能包含了消息头。
  106. // cur_ctx.buffer_size为当前收到字节数
  107. // next_ctx - 下一步上下文,
  108. // 由于cur_ctx.buffer可能包含了消息头,所以在一次接收receive动作后,
  109. // 会涉及到消息头和消息体两个状态,这里的next_ctx实际为下一步handle_header的cur_ctx
  110. util::handle_result_t CRecvMachine::handle_body(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)
  111. {
  112.     CProcessorManager* processor_manager = _thread->get_processor_manager();
  113.     
  114.     if (_finished_size + cur_ctx.buffer_size < _header.size)
  115.     {
  116.         if (!processor_manager->on_message(_header, _finished_size, cur_ctx.buffer, cur_ctx.buffer_size))
  117.         {
  118.             return util::handle_error;
  119.         }
  120.         
  121.         _finished_size += cur_ctx.buffer_size;
  122.         return util::handle_continue;
  123.     }
  124.     else
  125.     {
  126.         size_t need_size = _header.size - _finished_size;
  127.         if (!processor_manager->on_message(_header, _finished_size, cur_ctx.buffer, need_size))
  128.         {
  129.             return util::handle_error;
  130.         }
  131.         
  132.         // 切换状态
  133.         set_next_state(rs_header);
  134.         
  135.         size_t remain_size = cur_ctx.buffer_size - need_size;
  136.         if (remain_size > 0)
  137.         {
  138.             next_ctx->buffer = cur_ctx.buffer + need_size;
  139.             next_ctx->buffer_size = cur_ctx.buffer_size - need_size;
  140.             return util::handle_continue;
  141.         }
  142.         return util::handle_finish;
  143.     }
  144. }
  145. util::handle_result_t CRecvMachine::handle_error(const RecvStateContext& cur_ctx, RecvStateContext* next_ctx)
  146. {
  147.     //AGENT_LOG_ERROR("Network error.\n");
  148.     set_next_state(rs_header); // 无条件切换到rs_header,这个时候应当断开连接重连接
  149.     return util::handle_error;
  150. }
  151. AGENT_NAMESPACE_END

转载于:https://www.cnblogs.com/aquester/archive/2012/07/24/9891822.html

你可能感兴趣的文章
如何在Linux命令行中创建以及展示演示稿
查看>>
FutureTask——另一种闭锁的实现
查看>>
Android和MVC
查看>>
Linux 用户和用户组管理
查看>>
tomcat架构分析(valve源码导读)
查看>>
spring中InitializingBean接口使用理解(转)
查看>>
基于php5.5使用PHPMailer-5.2发送邮件
查看>>
InstallShield 2012 Spring新功能试用(16): Suite/Advanced UI 或 Advanced UI安装程序能在安装时进行输入合法性校验与反馈...
查看>>
C#面试宝典
查看>>
基金项目的英文
查看>>
《软件性能测试与LoadRunner实战教程》喜马拉雅有声图书上线
查看>>
ios 字典转模型
查看>>
正在编译转换: 未能找到元数据文件 EntityFramework.dll
查看>>
Java类集
查看>>
K-Means聚类算法的原理及实现【转】
查看>>
类的生命周期
查看>>
php apache用户写文件夹权限设置
查看>>
003-诠释 Java 工程师【一】
查看>>
浅析rune数据类型
查看>>
普通用户开启AUTOTRACE 功能
查看>>