欢迎光临散文网 会员登陆 & 注册

1

2023-06-16 10:47 作者:释o怀灬  | 我要投稿

#include <iostream>

#include <fstream>

#include <string>

#include <vector>

#include <chrono>

#include <mpi.h>


using namespace std;

using namespace std::chrono;


int main(int argc, char **argv) {

    MPI_Init(&argc, &argv);

    int rank, size;

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    MPI_Comm_size(MPI_COMM_WORLD, &size);


    double start_time = MPI_Wtime();


    if (rank == 0) {

        // 进程0读取CSV文件并将其行数分配给每个进程

        ifstream input("data.csv");

        vector<vector<double>> data;

        string line, value;


        while (getline(input, line)) {

            vector<double> row;

            stringstream ss(line);

            while (getline(ss, value, ',')) {

                row.push_back(stod(value));

            }

            data.push_back(row);

        }


        int data_len = data.size();

        int chunk_size = data_len / size;

        int extra = data_len % size;


        // 向其他进程广播数据长度和每个进程的数据数量

        MPI_Bcast(&data_len, 1, MPI_INT, 0, MPI_COMM_WORLD);

        MPI_Bcast(&chunk_size, 1, MPI_INT, 0, MPI_COMM_WORLD);

        MPI_Bcast(&extra, 1, MPI_INT, 0, MPI_COMM_WORLD);


        // 将数据按平均分配给其他进程

        for (int i = 1; i < size; i++) {

            int start = i * chunk_size;

            int end = (i + 1) * chunk_size;

            if (i == size - 1) end += extra;

            MPI_Send(&data[start][0], (end - start) * 4, MPI_DOUBLE, i, 0, MPI_COMM_WORLD);

        }

    } else {

        // 接收广播的数据长度和每个进程的数据数量

        int data_len, chunk_size, extra;

        MPI_Bcast(&data_len, 1, MPI_INT, 0, MPI_COMM_WORLD);

        MPI_Bcast(&chunk_size, 1, MPI_INT, 0, MPI_COMM_WORLD);

        MPI_Bcast(&extra, 1, MPI_INT, 0, MPI_COMM_WORLD);


        // 接收平均分配给它们的数据

        int start = rank * chunk_size;

        int end = (rank + 1) * chunk_size;

        if (rank == size - 1) end += extra;

        vector<vector<double>> data(end - start, vector<double>(4));

        MPI_Recv(&data[0][0], (end - start) * 4, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);


        // 对数据进行处理并计算平均数

        vector<double> result(end - start);

        for (int i = 0; i < end - start; i++) {

            double avg = 0.0;

            for (int j = 0; j < 4; j++) {

                avg += data[i][j];

            }

            result[i] = avg / 4.0;

        }


        // 将计算结果发送回进程0

        MPI_Send(&result[0], end - start, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);

    }


    if (rank == 0) {

        // 进程0接收其他进程发送的计算结果并写回文件

        vector<double> results;

        for (int i = 1; i < size; i++) {

            int start = i * chunk_size;

            int end = (i + 1) * chunk_size;

            if (i == size - 1) end += extra;

            vector<double> buf(end - start);

            MPI_Recv(&buf[0], end - start, MPI_DOUBLE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

            results.insert(results.end(), buf.begin(), buf.end());

        }


        ofstream output("data_processed.csv");

        output << "Average" << endl;

        for (double r : results) {

            output << r << endl;

        }


        // 记录从开始到结尾的时间并输出

        double end_time = MPI_Wtime();

        double elapsed_time = end_time - start_time;

        cout << "Total elapsed time: " << elapsed_time << endl;

    }


    MPI_Finalize();

    return 0;

}


1的评论 (共 条)

分享到微博请遵守国家法律