Skip to content

Commit d6a5c22

Browse files
authored
Merge pull request #4 from ffengc/dev
cn reamde done, cn docs done
2 parents 82429ff + a9f0842 commit d6a5c22

File tree

8 files changed

+333
-6
lines changed

8 files changed

+333
-6
lines changed

README.md

Lines changed: 290 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
# 基于事件驱动的管道通信系统框架
22

3+
![](./assets/6.png)
4+
35
- [基于事件驱动的管道通信系统框架](#基于事件驱动的管道通信系统框架)
46
- [复用该Reactor模式框架的方法](#复用该reactor模式框架的方法)
57
- [项目基本框架](#项目基本框架)
68
- [项目基本信息](#项目基本信息)
9+
- [如何运行本项目?](#如何运行本项目)
710
- [不同lambda组合实验](#不同lambda组合实验)
811
- [文件目录结构](#文件目录结构)
912
- [客户端和服务端执行流程](#客户端和服务端执行流程)
@@ -16,11 +19,15 @@
1619
- [`__recver`, `__sender``__excepter`实现](#__recver-__sender和__excepter实现)
1720
- [开启写事件的关心 `enable_read_write`](#开启写事件的关心-enable_read_write)
1821
- [client和server分别提供的worker方法和callback方法](#client和server分别提供的worker方法和callback方法)
22+
- [client的worker和callback](#client的worker和callback)
23+
- [server的worker和callback](#server的worker和callback)
1924
- [设置ET模式的非阻塞](#设置et模式的非阻塞)
25+
- [基本概念](#基本概念)
26+
- [为什么ET模式一定要是非阻塞的读取才行](#为什么et模式一定要是非阻塞的读取才行)
27+
- [设置文件描述符为非阻塞](#设置文件描述符为非阻塞)
2028
- [消息结构和粘包处理-序列化与反序列化-报头实现](#消息结构和粘包处理-序列化与反序列化-报头实现)
2129
- [文件描述符的封装](#文件描述符的封装)
22-
- [`poll.hpp`多路转接的封装](#pollhpp多路转接的封装)
23-
- [`log.hpp``thread.hpp`的封装](#loghpp和threadhpp的封装)
30+
- [`poll.hpp`多路转接的封装, `log.hpp``thread.hpp`的封装](#pollhpp多路转接的封装-loghpp和threadhpp的封装)
2431
- [负值数控制](#负值数控制)
2532

2633

@@ -44,6 +51,31 @@
4451
- **封装linux中epoll的相关操作到 `./Utils/poll.hpp`中,增加代码的可读性。**
4552
- 封装该项目的核心对象`class poll_control`。本质上是一个reactor服务。客户端和服务端均可复用这个对象的代码,管理所需要的线程,和线程所对应需要做的函数回调。**这个对象我认为是本次项目的核心所在,它可以避免在客户端进程和服务端进程中,分别编写控制线程的逻辑,使得线程控制的逻辑从客户端和服务端中解耦出来,大大减少代码的冗余,大大提高了代码的二次开发潜力。具体核心实现可以见见 `./Utils/poll_control.hpp`**
4653

54+
## 如何运行本项目?
55+
56+
克隆这个仓库:
57+
```bash
58+
https://github.com/ffengc/Event-Driven-Pipeline-Communication-System-Framework
59+
```
60+
进入这个仓库:
61+
```bash
62+
cd Event-Driven-Pipeline-Communication-System-Framework;
63+
```
64+
编译:
65+
```bash
66+
make clean;make
67+
```
68+
打开第一个终端,进入server目录启动服务端:
69+
```bash
70+
cd Server; ./server 1
71+
```
72+
打开第二个终端,进入client目录启动客户端:
73+
```bash
74+
cd Client; ./client 1
75+
```
76+
77+
![](./assets/3.png)
78+
4779
## 不同lambda组合实验
4880

4981
- **[exp.md](./docs/exp.md)**
@@ -431,18 +463,270 @@ void loop_once() {
431463
432464
### 开启写事件的关心 `enable_read_write`
433465
434-
466+
```cpp
467+
void enable_read_write(connection* conn, bool readable, bool writable) {
468+
uint32_t events = (readable ? EPOLLIN : 0) | (writable ? EPOLLOUT : 0);
469+
if (!__poll.control_poll(conn->__fd, events))
470+
logMessage(ERROR, "trigger write event fail");
471+
}
472+
```
473+
这个函数会被当epoll获取到读事件后,进行回调后被调用。因为在本项目中,epoll如果获取到了读事件,就会需要把数据写到cache里,然后发送到另一条管道里,因此需要允许写事件的发生。然后前面也提到了epoll是只默认关心读事件的,因此写事件需要手动开启。
435474

436475
## client和server分别提供的worker方法和callback方法
437476

477+
### client的worker和callback
478+
479+
对于client来说,worker的工作就是按照一定规律生产数据,并传输到对应的文件描述符上。
480+
思路是非常简单的,直接实现即可,使用write把数据写到管道中去,当然,需要序列化消息和加上报头。
481+
482+
```cpp
483+
void* worker(void* args) {
484+
__thread_data* td = (__thread_data*)args;
485+
poll_control* pc = (poll_control*)td->__args;
486+
// 在这里构造Task
487+
std::random_device rd;
488+
std::mt19937 gen(rd());
489+
std::exponential_distribution<> dist(pc->__lambda); // 这里用命令行传递过来的参数
490+
size_t mesg_number = 0;
491+
while (true) {
492+
mesg_number++;
493+
double interval = dist(gen); // 生成符合负指数分布的随机数
494+
unsigned int sleepTime = static_cast<unsigned int>(std::floor(interval)); // 负指数
495+
sleep(sleepTime);
496+
// 这里要生成一条数据
497+
struct message msg;
498+
msg.mesg_number = mesg_number;
499+
msg.src_tid = pthread_self();
500+
memset(msg.data, '0', sizeof(msg.data));
501+
// 现在数据已经生成好了,现在需要发给conn,通过管道的方式,那么这个管道的fd在哪?
502+
std::cout << "generate a mesg[" << mesg_number << "], src_tid: " << msg.src_tid << std::endl;
503+
int cur_fd = pc->__worker_thread_name_fd_map[td->__name]; // 所以只需要把信息放到cur_fd的管道里面就可以了
504+
// 在把消息放进去之前,先encode一下,协议定制!
505+
std::string encoded = encode(msg) + "\n\r\n"; // "\n\r\n" 就是防止粘包的标识
506+
// 写到管道中去
507+
write(cur_fd, encoded.c_str(), encoded.size());
508+
if (mesg_number >= MESG_NUMBER) {
509+
// 最多发MESG_NUMBER条消息
510+
pc->__worker_finish_count++; // 设置退出信号
511+
break;
512+
}
513+
}
514+
return nullptr;
515+
}
516+
```
517+
518+
对于client的callback,就是epoll获取到读事件之后,把东西从cache中放到写管道的过程,并调用 `enable_read_write` 允许写事件触发。
519+
520+
```cpp
521+
void callback(connection* conn) {
522+
auto& q = conn->__tsvr->__local_cache;
523+
std::string buffer;
524+
while (!q.empty()) {
525+
// 访问队列前端的元素
526+
std::string single_msg = q.front();
527+
buffer += single_msg + "\n\r\n";
528+
q.pop();
529+
}
530+
// 此时buffer里就是要发送的数据了,发送的fd是哪个?conn->__tsvr->__connector_to_connector_fd
531+
auto send_conn = conn->__tsvr->__connection_map[conn->__tsvr->__connector_to_connector_fd];
532+
send_conn->__out_buffer += buffer;
533+
conn->__tsvr->enable_read_write(send_conn, true, true); // 允许写!
534+
}
535+
```
536+
537+
### server的worker和callback
538+
539+
server的worker就是从管道中获取事件并打印出来,callback和client基本上是一样的,只是有细微区别。对于client来说,epoll只需要往一个fd中写入数据,但是对于server来说,如结构图所示,需要往3个fd中平均写入,控制这里的逻辑非常简单,可以直接看代码,这里不再解释。
540+
438541
## 设置ET模式的非阻塞
439542

543+
这一部分更详细的解释可以参考我的个人博客:[work_reactor.html](https://ffengc.github.io/gh-blog/blogs/reactor-server/work_reactor.html)
544+
545+
### 基本概念
546+
547+
epoll有两种工作模式,水平触发(LT)和边缘触发(ET)
548+
549+
- LT模式: 如果我手里有你的数据,我就会一直通知
550+
- ET模式: 只有我手里你数据是首次到达,从无到有,从有到多(变化)的时候,我才会通知你
551+
552+
**细节:**
553+
554+
我为什么要听ET模式的?凭什么要立刻去走?我如果不取,底层再也不通知了,上层调用就 无法获取该fd的就绪事件了,无法再调用recv, 数据就丢失了。倒逼程序员,如果数据就绪, 就必须一次将本轮就绪的数据全部取走。
555+
556+
我可以暂时不处理LT中就绪的数据吗?可以! 因为我后面还有读取的机会。
557+
558+
如果LT模式,我也一次将数据取完的话,LT和ET的效率是没有区别的。
559+
560+
ET模式为什么更高效?
561+
562+
更少的返回次数(毕竟一次epoll_wait都是一次内核到用户)
563+
564+
ET模式会倒逼程序员尽快将缓冲区中的数据全部取走,应用层尽快的去走了缓冲区中的数据,那么在单位时间下,该模式下工作的服务器,就可以在一定程度上,给发送方发送一 个更大的接收窗口,所以对方就可以拥有一个工大的滑动窗 口,一次向我们发送更多的数据,提高IO吞吐。
565+
566+
### 为什么ET模式一定要是非阻塞的读取才行
567+
568+
结论:et模式一定要是非阻塞读取。为什么?
569+
570+
首先,et模式要一次全部读完!怎么才能一次读完呢?我都不知道有多少,怎么保证一次读完?所以我们要连续读,一直读!循环读!读到没有数据为止!
571+
572+
ok!读到没有数据, recv就会阻塞!这就不行了,我们不允许阻塞!
573+
574+
所以怎么办?把这个sock设置成非阻塞的sock,这种sock有个特点:一直读,读到没数据了,不阻塞!直接返回报错,报一个错误:EAGAIN。而这个EAGAIN,可以告诉我们,读完了!
575+
576+
### 设置文件描述符为非阻塞
577+
578+
可以直接调用系统调用`fcntl`
579+
580+
![](./assets/2.png)
581+
582+
```cpp
583+
static bool set_non_block_fd(int fd) { // 文件描述符设置为非阻塞的文件描述符
584+
int fl = fcntl(fd, F_GETFL);
585+
if (fl < 0)
586+
return false;
587+
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
588+
return true;
589+
}
590+
```
591+
440592
## 消息结构和粘包处理-序列化与反序列化-报头实现
441593
594+
消息结构:
595+
596+
```cpp
597+
struct message {
598+
size_t mesg_number;
599+
uint64_t src_tid; // 8个字节
600+
char data[4096]; // 4096个字节
601+
};
602+
```
603+
604+
序列化方法:
605+
606+
```cpp
607+
std::string encode(const message& msg) {
608+
std::ostringstream out;
609+
// 编码 mesg_number 和 src_tid 为十六进制字符串
610+
out << std::hex << msg.mesg_number << '|' << msg.src_tid << '|';
611+
// 编码 data,处理特殊字符
612+
for (int i = 0; i < 4096; i++) {
613+
if (std::isprint(msg.data[i]) && msg.data[i] != '%') {
614+
out << msg.data[i];
615+
} else {
616+
out << '%' << std::setw(2) << std::setfill('0') << std::hex << (unsigned int)(unsigned char)msg.data[i];
617+
}
618+
}
619+
return out.str();
620+
}
621+
```
622+
623+
反序列化方法:
624+
625+
```cpp
626+
// 反序列化
627+
bool decode(const std::string& serialized, message& msg) {
628+
std::istringstream in(serialized);
629+
std::string mesg_number_hex, tid_hex;
630+
if (!std::getline(in, mesg_number_hex, '|') || !std::getline(in, tid_hex, '|'))
631+
return false;
632+
// 解析 mesg_number
633+
std::istringstream mesg_number_stream(mesg_number_hex);
634+
mesg_number_stream >> std::hex >> msg.mesg_number;
635+
// 解析 src_tid
636+
std::istringstream tid_stream(tid_hex);
637+
tid_stream >> std::hex >> msg.src_tid;
638+
// 解析 data
639+
std::string data;
640+
std::getline(in, data); // 读取剩余部分作为 data
641+
size_t i = 0, j = 0;
642+
while (i < data.size() && j < 4096) {
643+
if (data[i] == '%' && i + 2 < data.size()) {
644+
std::istringstream hex_char(data.substr(i + 1, 2));
645+
int value;
646+
hex_char >> std::hex >> value;
647+
msg.data[j++] = static_cast<char>(value);
648+
i += 3; // 跳过 "%XX"
649+
} else {
650+
msg.data[j++] = data[i++];
651+
}
652+
}
653+
return true;
654+
}
655+
```
656+
657+
报文分割符设置为: `\n\r\n`
658+
659+
分割报文方法:
660+
```cpp
661+
std::vector<std::string> extract_messages(std::string& buffer) {
662+
std::vector<std::string> messages;
663+
std::string delimiter = "\n\r\n";
664+
size_t pos = 0;
665+
std::string token;
666+
while ((pos = buffer.find(delimiter)) != std::string::npos) {
667+
token = buffer.substr(0, pos);
668+
messages.push_back(token);
669+
buffer.erase(0, pos + delimiter.length());
670+
}
671+
return messages;
672+
}
673+
```
674+
442675
## 文件描述符的封装
443676
444-
## `poll.hpp`多路转接的封装
677+
为什么需要封装fd:
678+
679+
因为读取是非阻塞的,所以需要对报文做切割处理,因为是非阻塞读取,所以epoll在某个fd进行读取时候是会一次性读完的!读完的字节流可能含有多个报文,因此需要一个缓冲区,来做报文切割的任务,因此每一个fd都需要配套一个缓冲区。除此之外每一个fd的三种就绪事件对应的回调,也应该整合起来,因此把fd封装成 `connection` 类型。这个类型最关键的,就是三种回调方法,输入缓冲区和输出缓冲区。其余还有一些细节,比如回指指针等等。
680+
681+
封装后结构如下所示:
445682
446-
## `log.hpp`和`thread.hpp`的封装
683+
```cpp
684+
class poll_control;
685+
class connection;
686+
using func_t = std::function<void(connection*)>;
687+
using callback_t = std::function<void(connection*)>; // 业务逻辑
688+
/**
689+
* 对于client来说callback负责把cache的东西,放到发送的文件描述符中的out_buffer里去
690+
* 对于server来说callback就是把cache的东西,平均分配到3个worker线程对应的pipe_fd的out_buffer里去
691+
*/
692+
class connection {
693+
public:
694+
connection(int fd = -1)
695+
: __fd(fd)
696+
, __tsvr(nullptr) { }
697+
~connection() { }
698+
void set_callback(func_t recv_cb, func_t send_cb, func_t except_cb) {
699+
__recv_callback = recv_cb;
700+
__send_callback = send_cb;
701+
__except_callback = except_cb;
702+
}
703+
704+
public:
705+
int __fd; // io的文件描述符
706+
func_t __recv_callback;
707+
func_t __send_callback;
708+
func_t __except_callback;
709+
std::string __in_buffer; // 输入缓冲区(暂时没有处理二进制流)
710+
std::string __out_buffer; // 输出缓冲区
711+
poll_control* __tsvr; // 回指指针
712+
};
713+
```
714+
715+
## `poll.hpp`多路转接的封装, `log.hpp``thread.hpp`的封装
716+
717+
可以直接看代码,这里都是一些比较简单的封装。
718+
719+
## 负值数控制
720+
721+
使用C++11随机数生成的方法进行控制。
722+
723+
```cpp
724+
std::random_device rd;
725+
std::mt19937 gen(rd());
726+
std::exponential_distribution<> dist(pc->__lambda); // 这里用命令行传递过来的参数
727+
double interval = dist(gen); // 生成符合负指数分布的随机数
728+
unsigned int sleepTime = static_cast<unsigned int>(std::floor(interval)); // 负指数
729+
sleep(sleepTime);
730+
```
447731
448-
## 负值数控制
732+
通过这种方法可以控制负指数生成的逻辑。

assets/2.png

369 KB
Loading

assets/3.png

1.77 MB
Loading

assets/4.png

1.35 MB
Loading

assets/5.png

2.62 MB
Loading

assets/6.png

82.4 KB
Loading

docs/exp.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# 不同lambda组合实验
2+
3+
在负指数分布中,如果参数越大,则表示 C++11 生成器生成的数字越小,这意味着 `sleep()` 的时间越小,表示事件越频繁。其次,我设置的逻辑是,server 端会在 client 关闭了写端描述符后,才会退出,因此,如果 client 已经发送完了所有信息,则会关闭中间管道的文件描述符,此时,无论 server 是否读完所有数据,server 都会直接退出。其三,我设置的逻辑是,client 的每一个 worker 端在发送 10 条消息(一共 30 条信息)后,自行退出(这个数字可以在配置文件更改)。
4+
5+
**为了方便观察,在server端,我打印了pc对象的cache缓存大小,用于观察和分析。**
6+
7+
**生产速率大于消费速率时:**
8+
9+
如图所示,当生产速率大于消费速率的时候,server端中pc对象中维护的缓存一直是处于高水平状态,这个也是符合预期的,因为生产速率特别快,信息很快就从管道从客户端发送过来,因此会在服务端导致堆积。与此同时,由于生产者(客户端)在生产完特定数据之后,就会关闭中间管道的文件描述符,因此也会导致server端也会对应的关闭,因此可以观潮到,server端并没有完整读取到了30条数据。
10+
11+
![](../assets/4.png)
12+
13+
**生产速率小于消费速率:**
14+
15+
如图所示,当生产速率小于消费速率的时候,server端pc对象维护的缓存一直处于较低水平,这是因为生产速率太慢,生产完的数据会立刻被消费,因此缓存不会有太多数据堆积。除此之外,因为server端消费更快,因此可以在client关闭写端文件描述符之前完整地消费所有client发送过来的数据。
16+
17+
![](../assets/5.png)

docs/reuse.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# 框架复用
2+
3+
复用该框架,需要`./Utils``./temp`目录中的内容。
4+
5+
构造`poll_control`对象:
6+
7+
```cpp
8+
poll_control* pc = new poll_control(worker,
9+
callback, WORKER_NUMBER, worker_fds, connector_to_worker_fds, connector_to_connector_fd, lambda_arg, CLIENT);
10+
pc->dispather();
11+
```
12+
13+
其中参数含义如下所示:
14+
- `worker`: worker线程要做的事情,类型为: `void* worker(void* args);`的函数指针。构造pc对象后,会启动`WORKER_NUMBER`个线程执行worker所定义的内容。
15+
- `callback`: pc对象中维护的多路转接所要执行的回调,类型为: `void callback(connection* conn)`
16+
- `using callback_t = std::function<void(connection*)>;`
17+
- `WORKER_NUMBER`: worker线程数量,类型为整型。
18+
- `worker_fds`: 与worker线程通信的文件描述符列表,序列大小和`WORKER_NUMBER`相同。类型为: `std::vector<int>`
19+
- `connector_to_worker_fds`: pc对象中epoll和worker通信的文件描述符,也是epoll需要关心的文件描述符,序列大小和`WORKER_NUMBER`相同。类型为: `std::vector<int>`
20+
- `connector_to_connector_fd`: epoll对象关心的外界sock/fd,比如在这个项目中,`connector_to_connector_fd`就是和另一个进程进行管道通信的fd,类型为整型。
21+
- `lambda_arg`: 类型为double,是本项目特有设置的参数,具体可见[readme](../README.md),如果不需要可以删除。
22+
- `CLIENT`: 本项目特有的类型,为整型,表示当前是客户端还是服务端,具体可见[readme](../README.md),如果不需要可以删除。
23+
24+
注意:
25+
- worker线程会在构造函数调用之后启动
26+
- 多路转接服务需要手动调用 `dispather()` 转发才能启动。

0 commit comments

Comments
 (0)