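// MPI program: rank 0 reads data.csv, the rows are split across ranks, the 4 values of each row are averaged, and the averages are written to data_processed.csv.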
#include <chrono>
#include <exception>
#include <fstream>
#include <iostream>
#include <limits>
#include <sstream>
#include <string>
#include <vector>

#include <mpi.h>
using namespace std;
using namespace std::chrono;
int main(int argc, char **argv) {
MPI_Init(&argc, &argv);
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
double start_time = MPI_Wtime();
if (rank == 0) {
// 进程0读取CSV文件并将其行数分配给每个进程
ifstream input("data.csv");
vector<vector<double>> data;
string line, value;
while (getline(input, line)) {
vector<double> row;
stringstream ss(line);
while (getline(ss, value, ',')) {
row.push_back(stod(value));
}
data.push_back(row);
}
int data_len = data.size();
int chunk_size = data_len / size;
int extra = data_len % size;
// 向其他进程广播数据长度和每个进程的数据数量
MPI_Bcast(&data_len, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Bcast(&chunk_size, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Bcast(&extra, 1, MPI_INT, 0, MPI_COMM_WORLD);
// 将数据按平均分配给其他进程
for (int i = 1; i < size; i++) {
int start = i * chunk_size;
int end = (i + 1) * chunk_size;
if (i == size - 1) end += extra;
MPI_Send(&data[start][0], (end - start) * 4, MPI_DOUBLE, i, 0, MPI_COMM_WORLD);
}
} else {
// 接收广播的数据长度和每个进程的数据数量
int data_len, chunk_size, extra;
MPI_Bcast(&data_len, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Bcast(&chunk_size, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Bcast(&extra, 1, MPI_INT, 0, MPI_COMM_WORLD);
// 接收平均分配给它们的数据
int start = rank * chunk_size;
int end = (rank + 1) * chunk_size;
if (rank == size - 1) end += extra;
vector<vector<double>> data(end - start, vector<double>(4));
MPI_Recv(&data[0][0], (end - start) * 4, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// 对数据进行处理并计算平均数
vector<double> result(end - start);
for (int i = 0; i < end - start; i++) {
double avg = 0.0;
for (int j = 0; j < 4; j++) {
avg += data[i][j];
}
result[i] = avg / 4.0;
}
// 将计算结果发送回进程0
MPI_Send(&result[0], end - start, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
}
if (rank == 0) {
// 进程0接收其他进程发送的计算结果并写回文件
vector<double> results;
for (int i = 1; i < size; i++) {
int start = i * chunk_size;
int end = (i + 1) * chunk_size;
if (i == size - 1) end += extra;
vector<double> buf(end - start);
MPI_Recv(&buf[0], end - start, MPI_DOUBLE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
results.insert(results.end(), buf.begin(), buf.end());
}
ofstream output("data_processed.csv");
output << "Average" << endl;
for (double r : results) {
output << r << endl;
}
// 记录从开始到结尾的时间并输出
double end_time = MPI_Wtime();
double elapsed_time = end_time - start_time;
cout << "Total elapsed time: " << elapsed_time << endl;
}
MPI_Finalize();
return 0;
}