OpenMP实现生产者消费者问题

作业要求

使用$OpenMP$实现生产者-消费者程序:多个线程中的一部分线程是生产者,另外一部分线程是消费者。假设有$n$个生成者和$n$个文件集合,每个生产者针对一个文件读取文本,并将读出的文本行插入到一个共享的队列中。消费者从共享队列中取出文本行,并对文本行进行分词。消费者在分词时,发现一个单词就将该单词输出到$stdout$
要求:至少$4$个文件集合

具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <iostream>
#include <omp.h>
#include <queue>
#include <fstream>

// 四个文件集合,四个生产者,四个消费者
signed main()
{
omp_set_num_threads(8); // 四个生产者 + 四个消费者
std::queue<std::string> q; // 共享队列
int ok = 0; // 记录已经生产完所有消息的生产者的数量

#pragma omp parallel shared(q, ok)
{
int tid = omp_get_thread_num();
if (tid >= 1 && tid <= 4) // 如果tid在1到4之间,将其作为生产者
{
// 构造path
std::string path = "";
path += char(tid + '0');
path += ".txt";

// 打开该生产者对应的文件
std::ifstream fin;
fin.open(path, std::ios::in);

// 将读出的文本行插入共享队列
std::string str;
while (getline(fin, str))
{
#pragma omp critical
q.push(str);
}

// 关闭文件,记录当前生产者已经生产完毕
fin.close();
++ok;
}
else // 消费者
{
while (q.size() || ok != 4) // 当共享队列中还有东西,或者还有生产者没有生产完毕,消费者就不能停下
{
std::string res;
#pragma omp critical
{
if (q.size()) // 从共享队列中取出文本行
{
res = q.front();
q.pop();
}
}
// 分词过程,发现一个单词就将该单词输出到stdout
std::string word = "";
for (int i = 0; i < res.size(); ++i)
{
if (res[i] == ' ')
{
// 如果输出单词不加锁,会有单词粘黏在一起的风险
// 但是加了锁感觉这个并行程序就毫无意义了
// #pragma omp critical
std::cout << word << ' ';
word = "";
}
else
word += res[i];
}
}
}
}
return 0;
}

运行方式

1
2
g++ -o homework -fopenmp homework.cpp
./homework.exe