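// Parallel row means: each MPI rank averages a share of the rows in data.csv; rank 0 writes the means to result.csv.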
#include <algorithm>
#include <cstdio>
#include <fstream>
#include <iostream>
#include <limits>
#include <string>
#include <vector>

#include <mpi.h>
using namespace std;
// Reads a comma-separated file of numbers and returns it as a 2-D array
// (one inner vector per line, one element per field).
//
// Fields are split on ',' and converted with std::stod. Blank or
// whitespace-only lines (including a lone "\r" left by CRLF line endings)
// are skipped, because std::stod on an empty string throws and would abort
// the whole read.
//
// Returns an empty result when the file cannot be opened.
// Throws std::invalid_argument / std::out_of_range (from std::stod) when a
// field is not a parsable number, e.g. the empty field in "1,,2".
std::vector<std::vector<double>> read_csv(const std::string& filename) {
    std::vector<std::vector<double>> data;

    std::ifstream file(filename);
    if (!file.is_open()) {
        return data;
    }

    std::string line;
    while (std::getline(file, line)) {
        // Drop the carriage return that getline leaves behind for CRLF files.
        if (!line.empty() && line.back() == '\r') {
            line.pop_back();
        }
        // Nothing to parse on a blank line.
        if (line.find_first_not_of(" \t") == std::string::npos) {
            continue;
        }

        data.emplace_back();
        std::vector<double>& row = data.back();

        // Walk the line with a moving start offset instead of erasing the
        // consumed prefix, which would re-copy the tail for every field.
        std::string::size_type field_start = 0;
        while (true) {
            const std::string::size_type comma = line.find(',', field_start);
            if (comma == std::string::npos) {
                // Last (or only) field on the line.
                row.push_back(std::stod(line.substr(field_start)));
                break;
            }
            row.push_back(std::stod(line.substr(field_start, comma - field_start)));
            field_start = comma + 1;
        }
    }
    // The stream is closed by its destructor.
    return data;
}
// Returns the arithmetic mean of the values in `row`.
//
// The row is taken by const reference so callers do not pay for a copy of
// the whole row on every call. An empty row has no defined mean; NaN is
// returned explicitly rather than relying on a division of 0.0 by zero.
double mean(const std::vector<double>& row) {
    if (row.empty()) {
        return std::numeric_limits<double>::quiet_NaN();
    }
    double sum = 0.0;
    for (const double value : row) {
        sum += value;
    }
    return sum / static_cast<double>(row.size());
}
// Entry point. Every rank reads data.csv, computes the mean of its share of
// the rows, and rank 0 gathers all means and writes them, one per line and
// in row order, to result.csv. Rank 0 also prints the elapsed wall time.
//
// Returns 0 on success, 1 (on rank 0) if result.csv could not be opened.
int main(int argc, char** argv) {
    // Initialize MPI before anything else: MPI_Wtime is not among the
    // routines the MPI standard allows to be called before MPI_Init.
    MPI_Init(&argc, &argv);
    const double start_time = MPI_Wtime();

    int size = 0;
    int rank = 0;
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Every rank reads the whole CSV file.
    const std::vector<std::vector<double>> data = read_csv("data.csv");
    const int total_rows = static_cast<int>(data.size());
    if (rank == 0 && total_rows == 0) {
        std::cerr << "warning: data.csv is missing or contains no rows\n";
    }

    // Block distribution with a rounded-up chunk size, so the trailing
    // total_rows % size rows are processed instead of being dropped.
    // Ranks past the end of the data get an empty range.
    const int rows_per_proc = (total_rows + size - 1) / size;
    const int first_row = std::min(rank * rows_per_proc, total_rows);
    const int last_row = std::min(first_row + rows_per_proc, total_rows);

    // Each rank fills a fixed-size slot (zero-padded when it has fewer rows)
    // so a plain MPI_Gather with a uniform count can be used. With this
    // layout, index i of the gathered buffer is the mean of row i.
    std::vector<double> local_means(
        static_cast<std::vector<double>::size_type>(rows_per_proc), 0.0);
    for (int i = first_row; i < last_row; ++i) {
        local_means[i - first_row] = mean(data[i]);
    }

    // Only the root needs a receive buffer. data() is used instead of
    // &v[0] because the vectors may be empty.
    std::vector<double> all_means;
    if (rank == 0) {
        all_means.resize(
            static_cast<std::vector<double>::size_type>(rows_per_proc) * size);
    }
    // No separate barrier is needed: the root does not return from the
    // gather until every rank's contribution has arrived.
    MPI_Gather(local_means.data(), rows_per_proc, MPI_DOUBLE,
               all_means.data(), rows_per_proc, MPI_DOUBLE,
               0, MPI_COMM_WORLD);

    int exit_code = 0;
    if (rank == 0) {
        {
            // Write the results to a new CSV file; the scope closes (and
            // flushes) the file before the end time is taken.
            std::ofstream file("result.csv");
            if (!file) {
                std::cerr << "error: cannot open result.csv for writing\n";
                exit_code = 1;
            } else {
                // Only the first total_rows entries are real; the rest is padding.
                for (int i = 0; i < total_rows; ++i) {
                    file << all_means[i] << '\n';
                }
            }
        }
        const double end_time = MPI_Wtime();
        std::printf("时间:%f秒\n", end_time - start_time);
    }

    // Shut down the MPI environment.
    MPI_Finalize();
    return exit_code;
}