diff --git a/CMakeLists.txt b/CMakeLists.txt
index ea56b6f..9a5ad24 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -32,6 +32,9 @@ if(DEFINED TEMPEST_NOREGEX)
   add_compile_definitions(TEMPEST_NOREGEX)
 endif()
 
+option(STITCHBLOBS_MPI_DEBUG "Enable StitchBlobs MPI debug helpers" OFF)
+
+
 if(WIN32)
   add_compile_definitions(_USE_MATH_DEFINES 1)
 endif()
diff --git a/quick_make_unix.sh b/quick_make_unix.sh
index b921f30..ed5ddbf 100755
--- a/quick_make_unix.sh
+++ b/quick_make_unix.sh
@@ -17,6 +17,10 @@ INSTALL_PREFIX=""  # Specify the installation directory.
 # If left blank, it defaults to the project root (TEMPEST_EXTREMES_SOURCE_DIR)
 # and final executables will be installed in TEMPEST_EXTREMES_SOURCE_DIR/bin.
 
+# Developer Configuration Options
+STITCHBLOBS_MPI_DEBUG="OFF"  # "ON" or "OFF" (enables extra MPI debug helpers in StitchBlobs)
+
+
 ./remove_depend.sh
 
 
@@ -123,6 +127,7 @@ cmake_args=(
   -DCMAKE_BUILD_TYPE="${BUILD_TYPE}"
   -DCMAKE_CXX_FLAGS_DEBUG="${OPTIMIZATION_LEVEL} ${DEBUG_FLAGS}"
   -DENABLE_MPI="${ENABLE_MPI}"
+  -DSTITCHBLOBS_MPI_DEBUG="${STITCHBLOBS_MPI_DEBUG}"
   -DCMAKE_INSTALL_PREFIX="${INSTALL_PREFIX}"
 )
 
diff --git a/src/blobs/CMakeLists.txt b/src/blobs/CMakeLists.txt
index 1312b36..e1bf544 100644
--- a/src/blobs/CMakeLists.txt
+++ b/src/blobs/CMakeLists.txt
@@ -17,8 +17,10 @@ list(APPEND PERSISTENT_BLOBS_FILES
 
 list(APPEND STITCH_BLOBS_FILES
   StitchBlobs.cpp
+  StitchBlobsMPIUtilities.h
 )
 
+
 include_directories(
   ${CMAKE_CURRENT_SOURCE_DIR}
   ${CMAKE_CURRENT_SOURCE_DIR}/../base
@@ -40,6 +42,13 @@ target_link_libraries(PersistentBlobs PUBLIC extremesbase netcdf_c++ ${MPI_CXX_LIBRARIES})
 add_executable(StitchBlobs ${STITCH_BLOBS_FILES})
 target_link_libraries(StitchBlobs PUBLIC extremesbase netcdf_c++ ${MPI_CXX_LIBRARIES})
 
+if(STITCHBLOBS_MPI_DEBUG)
+  target_compile_definitions(StitchBlobs PRIVATE STITCHBLOBS_MPI_DEBUG=1)
+else()
+  target_compile_definitions(StitchBlobs PRIVATE STITCHBLOBS_MPI_DEBUG=0)
+endif()
+
+
 # Install executables to the "bin" directory under the installation prefix.
 install(
   TARGETS BlobStats DetectBlobs PersistentBlobs StitchBlobs
diff --git a/src/blobs/StitchBlobs.cpp b/src/blobs/StitchBlobs.cpp
index 55aa5ac..b0e6857 100644
--- a/src/blobs/StitchBlobs.cpp
+++ b/src/blobs/StitchBlobs.cpp
@@ -14,9 +14,6 @@
 ///     or implied warranty.
 ///
 
-#if defined(TEMPEST_MPIOMP)
-#include <mpi.h>
-#endif
 
 #include "Constants.h"
 #include "CoordTransforms.h"
@@ -47,1890 +44,86 @@
 #include
 #include
 #include
+#include
 
-///////////////////////////////////////////////////////////////////////////////
-
-struct Tag {
-
-	///
-	///	Identifier associated with this tag.
-	///
-	int id;
-
-	///
-	///	Time index associated with this tag.
-	///
-	int time;
-
-	///
-	///	Global id associated with this tag (minimum value 1).
-	///
-	int global_id;
-
-	///
-	///	Default constructor.
-	///
-	Tag() :
-		id(0),
-		time(0),
-		global_id(0)
-	{ }
-
-	///
-	///	Value-based constructor.
-	///
-	Tag(int a_id, int a_time) :
-		id(a_id),
-		time(a_time),
-		global_id(0)
-	{ }
-
-	///
-	///	Copy constructor.
-	///
-	Tag(const Tag & tag) :
-		id(tag.id),
-		time(tag.time),
-		global_id(tag.global_id)
-	{ }
-
-	///
-	///	Comparator.
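The removed comparator below orders tags first by time and then by id. A minimal sketch of that ordering, using illustrative names (`TagKey`, `TagLess`) that are not part of this change:

```cpp
#include <tuple>

// Illustrative stand-in for the Tag fields the comparator looks at.
struct TagKey {
	int time;
	int id;
};

// Lexicographic ordering on (time, id): earlier times sort first, and ties
// on time are broken by id -- the same strict weak ordering as operator<.
inline bool TagLess(const TagKey & a, const TagKey & b) {
	return std::tie(a.time, a.id) < std::tie(b.time, b.id);
}
```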
- /// - bool operator<(const Tag & tag) const { - if (time < tag.time) { - return true; - } - if (time > tag.time) { - return false; - } - if (id < tag.id) { - return true; - } - return false; - } -}; - - - -#if defined(TEMPEST_MPIOMP) -/// -/// A class used for MPI collective communication for vecAllBlobTags -/// (gather and scatter) between processors) -/// This class will serialize the vecAllBlobTags into an 1-D array -/// and deserialize the received 1-D vecAllBlobTags into the original -/// 2-D vectors after the communication. -/// This class can also generate and return the setAllTags based on the local vecAllBlobTags. -/// -class TagCollectiveOP { - private: - /// - /// Tag for MPI communication for sending/receiving Tag - /// - int gather_tag = 107; - - /// - /// Tag for MPI communication for sending/receiving Tag index - /// - int gather_tag_index = 108; - - /// - /// The MPI Communicator - /// - MPI_Comm m_comm; - - /// - /// The MPI Datatype for Tag - /// - MPI_Datatype MPI_Tag_type; - - /// - /// The Flag that marks whether the vecAllTags are in serialized state. - /// (default with 0; 1 after calling Serialize(); False after calling Deserailize()). - /// - int serializedFlag; - - /// - /// Serialize the std::vector< std::vector> vecAllBlobTags into a 1-D array - /// and generate the index array serialVecAllBlobTags_index - /// - void Serialize() { - serialVecAllBlobTags.clear(); - serialVecAllBlobTags_index.clear(); - int curIndx = 0;//Point to the next empty slot for inserting a new vector - serialVecAllBlobTags_index.push_back(curIndx); - for (int i = 0; i < _vecAllBlobTags.size(); i++) { - for (int j = 0; j < _vecAllBlobTags[i].size(); j++) { - serialVecAllBlobTags.push_back(_vecAllBlobTags[i][j]); - curIndx++; - } - serialVecAllBlobTags_index.push_back(curIndx); - } - serializedFlag = 1; - } - - /// - /// Deserialize the the local std::vector serialVecAllBlobTags array - /// to generate the deserialSetAllTags and desirialVecAllBlobTags - /// the this.serialVecAllBlobTags and this.serialVecAllBlobTags_index will be cleared - /// after the deserialization - /// - void Deserialize() { - for (int i = 0; i < serialVecAllBlobTags_index.size() - 1; i++) { - int startIndx = serialVecAllBlobTags_index[i]; - int endIndx = std::min(serialVecAllBlobTags_index[i+1],int(serialVecAllBlobTags.size())); - std::vector curVecBlobTags; - for (int i = startIndx; i < endIndx; i++) { - curVecBlobTags.push_back(serialVecAllBlobTags[i]); - deserialSetAllTags.insert(serialVecAllBlobTags[i]); - } - desirialVecAllBlobTags.push_back(curVecBlobTags); - } - serialVecAllBlobTags.clear(); - serialVecAllBlobTags_index.clear(); - serializedFlag = 0; - } - - - - protected: - /// - /// The unexchanged original local 2-D vecAllBlobTags that needs to be serialize before communication. - /// - std::vector< std::vector> _vecAllBlobTags; - - /// - /// The serialized 1D vecAllBlobTags that needs to be sent. - /// - std::vector serialVecAllBlobTags; - - /// - /// The serialized 1D vecAllBlobTags index array that needs to be sent. - /// - std::vector serialVecAllBlobTags_index; - - /// - /// Output setAllTags based on the local desirialVecAllBlobTags. 
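The Serialize()/Deserialize() pair above packs a vector of rows into one flat buffer plus an offsets array of size rows+1, and rebuilds the rows from those two arrays after communication. A minimal sketch of that scheme, with `int` standing in for `Tag` and illustrative names throughout:

```cpp
#include <vector>

using Row = std::vector<int>;   // stand-in for std::vector<Tag>

// Flatten rows into one buffer; offsets[i]..offsets[i+1] spans row i.
static void Flatten(
	const std::vector<Row> & rows,
	std::vector<int> & flat,
	std::vector<int> & offsets
) {
	flat.clear();
	offsets.clear();
	offsets.push_back(0);
	for (const Row & row : rows) {
		flat.insert(flat.end(), row.begin(), row.end());
		offsets.push_back(static_cast<int>(flat.size()));
	}
}

// Rebuild the rows from the flat buffer and the offsets array.
static std::vector<Row> Unflatten(
	const std::vector<int> & flat,
	const std::vector<int> & offsets
) {
	std::vector<Row> rows;
	for (std::size_t i = 0; i + 1 < offsets.size(); i++) {
		rows.emplace_back(flat.begin() + offsets[i], flat.begin() + offsets[i+1]);
	}
	return rows;
}
```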
(Only used for processor 0) - /// - std::set deserialSetAllTags; - - /// - /// The deserial vecAllBlobTags after communication (Only used for processor 0) - /// - std::vector< std::vector> desirialVecAllBlobTags; - - /// - /// Vector that records the size information of vecAllBlobTags on each processors - /// - std::vector vecScatterCounts; - - /// - /// Vector that records the index information of the serial vecAllBlobTags on each processors - /// - std::vector vecScatterCounts_index; - - public: - - /// - /// Constructor that will read in std::vector< std::vector> vecAllBlobTags and MPI communicator - /// It will also create the derived MPI_Datatype for Tag and commit it. - /// - TagCollectiveOP( - MPI_Comm communicator, - const std::vector< std::vector> & vecAllBlobTags - ) { - this->_vecAllBlobTags = vecAllBlobTags; - this->m_comm = communicator; - this->serializedFlag = 0; - int rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - - // Create an MPI datatype for the Tag: - struct Tag sampleTag; - int tagFieldsCount = 3; - MPI_Datatype Tag_typesig[3] = {MPI_INT,MPI_INT,MPI_INT}; - int Tag_block_lengths[3] = {1,1,1}; - MPI_Aint Tag_displacements[3]; - - MPI_Aint base_address; - MPI_Get_address(&sampleTag, &base_address); - MPI_Get_address(&sampleTag.id, &Tag_displacements[0]); - MPI_Get_address(&sampleTag.time, &Tag_displacements[1]); - MPI_Get_address(&sampleTag.global_id, &Tag_displacements[2]); - - Tag_displacements[0] = MPI_Aint_diff(Tag_displacements[0], base_address); - Tag_displacements[1] = MPI_Aint_diff(Tag_displacements[1], base_address); - Tag_displacements[2] = MPI_Aint_diff(Tag_displacements[2], base_address); - - MPI_Type_create_struct(tagFieldsCount, Tag_block_lengths, Tag_displacements, Tag_typesig, &MPI_Tag_type); - - int result = MPI_Type_commit(&MPI_Tag_type); - if (result != MPI_SUCCESS) { - _EXCEPTION1("The MPI routine MPI_Type_commit(&MPI_Tag_type) failed (code %i)", result); - } - } - - /// - /// Destructor. - /// - ~TagCollectiveOP(){ - MPI_Type_free(&MPI_Tag_type); - } - - /// - /// The MPI gather process that will gather each local vecAllBlobTags to the processor 0. - /// - void Gather() { - int rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - - this->Serialize(); - int d = std::ceil(std::log2(size)); // here we use floor. - - for (int j = 0; j < d; j++) { - - // First send the serialVecAllBlobTags - if ((rank & (int)std::round(std::pow(2,j))) != 0) { - - // Send to (world_rank ^ pow(2,j) - int destRank = rank ^ (int)round(std::pow(2,j)); - if (destRank > size - 1) { - continue; - } - - MPI_Send (serialVecAllBlobTags.data(), serialVecAllBlobTags.size(), MPI_Tag_type, destRank, gather_tag, m_comm); - MPI_Send (serialVecAllBlobTags_index.data(), serialVecAllBlobTags_index.size(), MPI_INT, destRank, gather_tag_index, m_comm); - // Simply need to break the algorithm here (juest return, not Finalize()) - return; - - - } else { - // Receive from (world_rank ^ pow(2,j)) - MPI_Status status; - int recvCount; - int sourceRank = rank ^ (int)std::round(std::pow(2,j)); - - if (sourceRank > size - 1) { - continue; - } - - MPI_Probe(sourceRank, gather_tag, m_comm, &status); - - MPI_Get_count( &status, MPI_Tag_type, &recvCount); - std::vector recvTags; - recvTags.resize(recvCount); - MPI_Recv(recvTags.data(), recvCount, MPI_Tag_type, sourceRank, gather_tag, m_comm, &status); - - // Pack the receive Tag into the local serialVecAllBlobTags. 
- for (auto recvTag : recvTags) { - serialVecAllBlobTags.push_back(recvTag); - } - - MPI_Status status_index; - int recvCount_index; - MPI_Probe(sourceRank, gather_tag_index, m_comm, &status_index); - - MPI_Get_count( &status_index, MPI_INT, &recvCount_index); - std::vector recvTagsIndx; - recvTagsIndx.resize(recvCount_index); - MPI_Recv(recvTagsIndx.data(), recvCount_index, MPI_INT, sourceRank, gather_tag_index, m_comm, &status_index); - - - // Update the received index and then Pack the receive Tag index into the - // local serialVecAllBlobTags_index. - // Example: - // Initial: - // P0 serialVecAllBlobTags: 0, 3, 5, 7; P1 serialVecAllBlobTags: 0, 3, 5, 7 - // After Gather: - // P0 serialVecAllBlobTags: 0, 3, 5, 7, 9, 11, 13 - int serialVecAllBlobTags_index_size = serialVecAllBlobTags_index.size(); - int curLocalTagSize = serialVecAllBlobTags_index[serialVecAllBlobTags_index_size - 1]; - for (int i = 1; i < recvTagsIndx.size(); i++) { - // Update the index - int index = recvTagsIndx[i]; - index += curLocalTagSize; - serialVecAllBlobTags_index.push_back(index); - } - } - } - } - - /// - /// Return the gathered setAllTags (only called by the processor 0) - /// - std::set GetGatheredSetAllTags() { - int rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - if (rank == 0) { - if (serializedFlag == 1) { - this->Deserialize(); - } - - return deserialSetAllTags; - } else { - _EXCEPTIONT("Only processor 0 should call GetGatheredSetAllTags()."); - } - } - - /// - /// Return the gathered/scattered vecAllTags (only called by the processor 0) - /// GatheredFlag = 1 indicates that returning the vecAllBlobTags after gathering to the processor 0 - /// (in this case, vecAllBlobTags is the global vecAllBlobTags and only processor 0 can call the function) - /// GatheredFalg = 0 indicates that returning the vecAllBlobTags after scattering to each processot. 
- /// (in this case, vecAllBlobTags is the local vecAllBlobTags and all valid processor can call the function) - /// - std::vector< std::vector > GetUnserialVecAllTags(int GatheredFlag) { - int rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - if ((GatheredFlag == 1 && rank == 0) || GatheredFlag== 0) { - if (serializedFlag == 1) { - this->Deserialize(); - } - return desirialVecAllBlobTags; - } else { - _EXCEPTIONT("Only processor 0 should call GetUnserialVecAllTags()."); - } - } - - /// - /// Gather the size info of the original unexchanged vecAllBlobTags to processor 0 - /// On processor 0, the _vecAllBlobTags will be the gathered global vecAllBlobTags; - /// On other processors, the _vecAllBlobTags will be updated to the input vecAllBlobTags - /// - void GatherTagCounts(const std::vector< std::vector > & vecAllBlobTags) { - int rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - int curSize = 0; - for ( auto vecBlobTags : vecAllBlobTags) { - curSize += vecBlobTags.size(); - } - int localSize = vecAllBlobTags.size(); - vecScatterCounts.resize(size); - vecScatterCounts_index.resize(size); - - if (rank == 0) { - MPI_Gather(&curSize, 1, MPI_INT, vecScatterCounts.data(), 1, MPI_INT, 0, m_comm); - MPI_Gather(&localSize, 1, MPI_INT, vecScatterCounts_index.data(), 1, MPI_INT, 0, m_comm); - } else { - this->_vecAllBlobTags = vecAllBlobTags; - MPI_Gather(&curSize, 1, MPI_INT, NULL, 0, MPI_INT, 0,m_comm); - MPI_Gather(&localSize, 1, MPI_INT, NULL, 0, MPI_INT, 0,m_comm); - - } - } - - - /// - /// Scatter the vecAllBlobs to each processor - /// The displacement is calculated based on the vecGlobalTimes - /// On processor 0, the _vecAllBlobTags will be the reduced global vecAllBlobTags; - /// On other processors, the _vecAllBlobTags will be the original unreduced _vecAllBlobTags - /// - void Scatter(){ - int rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - - this->Serialize(); - - if (rank == 0) { - // For Tags - std::vector arrayScatterCounts(size); - std::vector arrayScatterDisplacements(size); - _ASSERT(vecScatterCounts.size() > 1); - arrayScatterCounts[0] = vecScatterCounts[0]; - arrayScatterDisplacements[0] = 0; - for (int i = 1; i < vecScatterCounts.size(); i++) { - arrayScatterCounts[i] = vecScatterCounts[i]; - arrayScatterDisplacements[i] = vecScatterCounts[i - 1] + arrayScatterDisplacements[i - 1]; - } - - // For Tags Index - std::vector arrayScatterCounts_index(size); - std::vector arrayScatterDisplacements_index(size); - _ASSERT(vecScatterCounts_index.size() > 1); - arrayScatterCounts_index[0] = vecScatterCounts_index[0]; - arrayScatterDisplacements_index[0] = 0; - for (int i = 1; i < vecScatterCounts.size(); i++) { - arrayScatterCounts_index[i] = vecScatterCounts_index[i]; - arrayScatterDisplacements_index[i] = vecScatterCounts_index[i - 1] + arrayScatterDisplacements_index[i - 1]; - } - - //-------------------Scatter--------------------------- - // For Tags - auto scatterBuffer = this->serialVecAllBlobTags; - this->serialVecAllBlobTags.clear(); - this->serialVecAllBlobTags.resize(arrayScatterCounts[rank]); - MPI_Scatterv(scatterBuffer.data(), arrayScatterCounts.data(), arrayScatterDisplacements.data(), MPI_Tag_type, - serialVecAllBlobTags.data(), arrayScatterCounts[rank], MPI_Tag_type, 0, m_comm); - - // For index - auto scatterBuffer_index = this->serialVecAllBlobTags_index; - this->serialVecAllBlobTags_index.clear(); - this->serialVecAllBlobTags_index.resize(arrayScatterCounts_index[rank]); - 
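The Scatterv bookkeeping above derives each rank's displacement as the running sum of the preceding counts (an exclusive prefix sum). A minimal sketch of that computation, with illustrative names:

```cpp
#include <vector>

// displs[i] = counts[0] + ... + counts[i-1]; displs[0] = 0.
static std::vector<int> ScatterDisplacements(const std::vector<int> & counts) {
	std::vector<int> displs(counts.size(), 0);
	for (std::size_t i = 1; i < counts.size(); i++) {
		displs[i] = displs[i - 1] + counts[i - 1];
	}
	return displs;
}
```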
MPI_Scatterv(scatterBuffer_index.data(), arrayScatterCounts_index.data(), arrayScatterDisplacements_index.data(), - MPI_INT, serialVecAllBlobTags_index.data(), arrayScatterCounts_index[rank], MPI_INT, 0, m_comm); - - } else { - int localTagSize = serialVecAllBlobTags.size(); - this->serialVecAllBlobTags.clear(); - this->serialVecAllBlobTags.resize(localTagSize); - MPI_Scatterv(NULL, NULL, NULL, MPI_Tag_type, serialVecAllBlobTags.data(), localTagSize, MPI_Tag_type, 0, m_comm); - - int localTagSize_index = serialVecAllBlobTags_index.size(); - this->serialVecAllBlobTags_index.clear(); - this->serialVecAllBlobTags_index.resize(localTagSize_index); - MPI_Scatterv(NULL, NULL, NULL, MPI_INT, serialVecAllBlobTags_index.data(), localTagSize_index, MPI_INT, 0, m_comm); - } - - // Now modify the received serialVecAllBlobTags_index for deserailization call - int prevCount = serialVecAllBlobTags_index[0]; - serialVecAllBlobTags_index[0] = 0; - for (int i = 1; i < serialVecAllBlobTags_index.size() - 1; i++ ) { - serialVecAllBlobTags_index[i] = serialVecAllBlobTags_index[i] - prevCount; - } - if (rank == 0) { - serialVecAllBlobTags_index.push_back(serialVecAllBlobTags.size()); - - } else { - serialVecAllBlobTags_index[serialVecAllBlobTags_index.size() - 1] = serialVecAllBlobTags.size(); - } - } -}; - -/// -/// Enumerator of exchange directions. -/// -typedef enum { - DIR_LEFT = 0, - DIR_RIGHT = 1 -} CommDirection; - -/// -/// A Class used for exchanging vecAllBlobsTag between processors -/// The exchange process will first update the exchanged Tags.time to the actual global time -/// And then start the exchange process. -/// -class TagExchangeOP { - private: - - /// - /// Tag for MPI communication for blob Tag - /// - int tag = 100; - - /// - /// The MPI Datatype for Tag - /// - MPI_Datatype MPI_Tag_type; - - /// - /// The MPI Communicator - /// - MPI_Comm m_comm; - - /// - /// An array of MPI_Request. - /// - std::vector MPIrequests; - - /// - /// An array of MPI_Status. - /// - std::vector MPIstatuses; - - /// - /// The tool function that uses the prefix sum algorithmn to assign global time for each Tag - /// - void UpdateTime(){ - int err, rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - this->gloablTimeIndx.resize(size); - int localEndTime = _vecAllBlobTags.size(); - int preFixTime; - - MPI_Scan(&localEndTime, &preFixTime, 1, MPI_INT, MPI_SUM, m_comm); - this->gloablTimeIndx[rank].resize(2); - this->gloablTimeIndx[rank][0] = preFixTime - localEndTime; - this->gloablTimeIndx[rank][1] = preFixTime; - - //Update Tag.time - int globalTime = this->gloablTimeIndx[rank][0]; - for (int i = 0; i < _vecAllBlobTags.size(); i++) { - _ASSERT(globalTime < gloablTimeIndx[rank][1]); - - for (int j = 0; j < _vecAllBlobTags[i].size(); j++) { - _vecAllBlobTags[i][j].time = globalTime; - } - globalTime++; - - } - } - - protected: - /// - /// The initial vecAllBlobTags that need to be exchanged - /// - std::vector< std::vector> _vecAllBlobTags; - - /// - /// The vecAllBlobTags after the exchange. it's column number is _vecAllBlobTags' column number plus 2 (left and right) - /// except for p0 and pn-1 (for these two processors, the column number is _vecAllBlobTags' column number plus 1). 
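UpdateTime() above assigns each rank a global [start, end) range of time indices by running an inclusive MPI_Scan over the per-rank count of local time steps and then subtracting the local count. A minimal sketch of that step, assuming MPI is already initialized (names are illustrative):

```cpp
#include <mpi.h>

// Inclusive prefix sum of nLocalTimes across ranks gives the exclusive end
// of this rank's global time range; subtracting nLocalTimes gives the start.
static void GlobalTimeRange(
	MPI_Comm comm,
	int nLocalTimes,
	int & iGlobalStart,   // inclusive
	int & iGlobalEnd      // exclusive
) {
	int nPrefix = 0;
	MPI_Scan(&nLocalTimes, &nPrefix, 1, MPI_INT, MPI_SUM, comm);
	iGlobalEnd = nPrefix;
	iGlobalStart = nPrefix - nLocalTimes;
}
```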
- /// - std::vector< std::vector> exchangedvecAllBlobTags; - - /// - /// The buffer for vecAllBlobTags that will be sent - /// sendTags[0] is the left vector and sendTags[1] is the right vector - /// - std::vector> sendTags; - - /// - /// The buffer for vecAllBlobTags that will be received - /// recvTags[0] is the left vector and recvTags[1] is the right vector - /// - std::vector> recvTags; - - /// - /// The array (size is nMPISize) records the start(inclusive) and end (exclusive) global time index in each processor - /// start: gloablTimeIndx[p_i][0] - /// end: gloablTimeIndx[p_i][1] - /// - std::vector> gloablTimeIndx; - - public: - - /// - /// Construct the Operator with vecAllBlobTags - /// It will contruct the this->m_comm and this->_vecAllBlobTags based on the input communicator and vecAllBlobTags - /// And also construct the derived MPI_Datatype for Tag and commit it. - /// - TagExchangeOP(MPI_Comm communicator, - const std::vector< std::vector > & vecAllBlobTags){ - this->_vecAllBlobTags = vecAllBlobTags; - this->m_comm = communicator; - //Initialize the size for the sendTags: - sendTags.resize(2); - sendTags[0].resize(_vecAllBlobTags[0].size()); - sendTags[1].resize(_vecAllBlobTags[_vecAllBlobTags.size()-1].size()); - - //Initialize the size for the recvTags: - recvTags.resize(2); - - //Create an MPI datatype for the Tag: - struct Tag sampleTag; - int tagFieldsCount = 3; - MPI_Datatype Tag_typesig[3] = {MPI_INT,MPI_INT,MPI_INT}; - int Tag_block_lengths[3] = {1,1,1}; - MPI_Aint Tag_displacements[3]; - - MPI_Aint base_address; - MPI_Get_address(&sampleTag, &base_address); - MPI_Get_address(&sampleTag.id, &Tag_displacements[0]); - MPI_Get_address(&sampleTag.time, &Tag_displacements[1]); - MPI_Get_address(&sampleTag.global_id, &Tag_displacements[2]); - Tag_displacements[0] = MPI_Aint_diff(Tag_displacements[0], base_address); - Tag_displacements[1] = MPI_Aint_diff(Tag_displacements[1], base_address); - Tag_displacements[2] = MPI_Aint_diff(Tag_displacements[2], base_address); - MPI_Type_create_struct(tagFieldsCount, Tag_block_lengths, Tag_displacements, Tag_typesig, &MPI_Tag_type); - - int result = MPI_Type_commit(&MPI_Tag_type); - if (result != MPI_SUCCESS) { - _EXCEPTION1("The MPI routine MPI_Type_commit(&MPI_Tag_type) failed (code %i)", result); - } - } - - - /// - /// Destructor - /// - ~TagExchangeOP(){ - MPI_Type_free(&MPI_Tag_type); - MPIrequests.clear(); - MPIstatuses.clear(); - } - - /// - /// Return the original unexchanged vecAllBlobTags - /// - std::vector< std::vector > GetOriginalVecAllBlobTags(){ - return _vecAllBlobTags; - } - - /// - /// Start the exchange process. 
- /// this function is non-blocking and the data values in the TagExchangeOP should not be modified - /// The exchange values are not guaranteed to be current when this function returns and need to be used with the EndExchange() - /// - void StartExchange() { - - int rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - - //First update all processors' Tag.time to the actual global time - this->UpdateTime(); - - //----------------------Send sendTags data first---------------------- - // Pack data into the send buffer - sendTags[0] = _vecAllBlobTags[0]; - sendTags[1] = _vecAllBlobTags[_vecAllBlobTags.size()-1]; - - // Send data - for (auto dir: {DIR_LEFT, DIR_RIGHT}) { - int destRank;//Destination Rank - if (dir == DIR_LEFT) { - // Sending Data to the left - if (rank == 0) { - //Rank 0 Do Nothing - destRank = -1; - continue; - } else { - destRank = rank - 1; - } - - } else { - // Sending Data to the right - if (rank == size - 1) { - //Rank n-1 Do Nothing - destRank = -1; - continue; - } else { - destRank = rank + 1; - } - } - if (destRank > size - 1) { - continue; - } - - //----------------------Send sendBlobs---------------------- - - // Only the odd number processors will send out the data - if (rank % 2 != 0) { - - MPI_Request request; - int result = MPI_Isend(sendTags[dir].data(), sendTags[dir].size(), MPI_Tag_type, - destRank, tag, m_comm, &request); - if (result != MPI_SUCCESS) { - _EXCEPTION1("The MPI routine MPI_Isend failed (code %i)", result); - } - } - } - - //----------------------Then Receive data---------------------- - for (auto dir : {DIR_LEFT, DIR_RIGHT}) { - int sourceRank = -1; - if (dir == DIR_LEFT) { - // Receive Data From the left. - // Rank 0 will not receive from the left - if (rank == 0) { - continue; - } else { - sourceRank = rank - 1; - } - - } else { - // Receive Data From the right. - // rank n-1 will not receive from the right - if (rank == size - 1) { - continue; - } else { - sourceRank = rank + 1; - } - - } - if (sourceRank > size - 1) { - continue; - } - - - - //----------------------Receive---------------------- - // Only the prime number processors will receive data - if (rank % 2 == 0) { - MPI_Status status; - MPI_Request request; - int recvCount; - - // Use a non-blocking probe to know the incoming data size - int flag = 0; - while(!flag) - { - MPI_Iprobe( sourceRank, tag, m_comm, &flag, &status ); - } - MPI_Get_count( &status, MPI_Tag_type, &recvCount ); - recvTags[dir].resize(recvCount); - - int result = - MPI_Irecv(recvTags[dir].data(), recvTags[dir].size(), MPI_Tag_type, - sourceRank, tag, m_comm, &request); - if (result != MPI_SUCCESS) { - _EXCEPTION1("The MPI routine MPI_Isend failed (code %i)", result); - } - MPIrequests.emplace_back(std::move(request)); - MPIstatuses.push_back(MPI_Status()); - } - } - } - - /// - /// End the exchange process. 
- // this function is blocking until: - // - it is safe to modify the values in the TagExchangeOP without - // affecting the exchange values for other processes - // - the exchange values can be read: they contain to up-to-date values - // from other processes - /// - void EndExchange() { - // Wait for all Irecv to complete - int rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - - int result = MPI_Waitall( MPIrequests.size(), MPIrequests.data(), MPIstatuses.data()); - if (result != MPI_SUCCESS) { - _EXCEPTION1("The MPI routine MPI_Waitall failed (code %i)", result); - } - - MPIrequests.clear(); - MPIstatuses.clear(); - - // Pack the data into the vecAllBlobTags - // Only need to pack data for the prime number processors - if (rank % 2 == 0) { - if (rank == 0) { - exchangedvecAllBlobTags.resize(_vecAllBlobTags.size() + 1); - for (int i = 0; i < exchangedvecAllBlobTags.size() - 1; i++) { - exchangedvecAllBlobTags[i] = _vecAllBlobTags[i]; - } - exchangedvecAllBlobTags[exchangedvecAllBlobTags.size() - 1] = recvTags[1]; - - } else if (rank == size - 1) { - exchangedvecAllBlobTags.resize(_vecAllBlobTags.size() + 1); - exchangedvecAllBlobTags[0] = recvTags[0]; - for (int i = 1; i < exchangedvecAllBlobTags.size(); i++) { - exchangedvecAllBlobTags[i] = _vecAllBlobTags[i -1 ]; - } - - } else { - - exchangedvecAllBlobTags.resize(_vecAllBlobTags.size() + 2); - exchangedvecAllBlobTags[0] = recvTags[0]; - for (int i = 1; i < exchangedvecAllBlobTags.size() - 1; i++) { - exchangedvecAllBlobTags[i] = _vecAllBlobTags[i-1]; - } - exchangedvecAllBlobTags[exchangedvecAllBlobTags.size() - 1] = recvTags[1]; - - } - - } else { - exchangedvecAllBlobTags = _vecAllBlobTags; - } - } - - /// - /// Return the exchanged vecAllBlobTags - /// - std::vector< std::vector> GetExchangedVecAllBlobTags(){ - return this->exchangedvecAllBlobTags; - } -}; - - -/// -/// Class for exchanging vecAllBlobBoxesDeg among processors -/// -class BlobBoxesDegExchangeOP { - private: - /// - /// Tag for MPI communication for LatlonBox - /// - int tag = 103; - - // /// - // /// The MPI Datatype for LatlonBox - // /// - // MPI_Datatype MPI_LatonBox_double_type; - - // /// - // /// The MPI Datatype for double[2] - // /// - // MPI_Datatype MPI_doubleArray; - - /// - /// The MPI Communicator - /// - MPI_Comm m_comm; - - /// - /// An array of MPI_Request. - /// - std::vector MPIrequests; - - /// - /// An array of MPI_Status. - /// - std::vector MPIstatuses; - - protected: - /// - /// The initial vecAllBlobBoxesDeg that will be sent - /// - std::vector< std::vector< LatLonBox > > _vecAllBlobBoxesDeg; - - /// - /// The vecAllBlobBoxesDeg that is after the exchange. it's column number is _vecAllBlobBoxesDegs' column number plus 2 (left and right) - /// except for p0 and pn-1 (for these two processors, the column number is _vecAllBBlobBoxesDegs' column number plus 1). 
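The ExchangeOP classes above all follow the same neighbor-exchange protocol: post nonblocking sends toward both neighbors, probe to size the receive buffers, post the receives, and complete everything with MPI_Waitall. A simplified sketch of that pattern (every rank both sends and receives here, rather than the odd-send / even-receive split used above; names are illustrative and MPI setup is assumed to happen elsewhere):

```cpp
#include <mpi.h>
#include <vector>

static void ExchangeWithNeighbors(
	MPI_Comm comm,
	const std::vector<int> & sendBuf,
	std::vector<int> & recvLeft,
	std::vector<int> & recvRight
) {
	int rank, size;
	MPI_Comm_rank(comm, &rank);
	MPI_Comm_size(comm, &size);

	const int tag = 200;
	std::vector<MPI_Request> vecRequests;

	// Post nonblocking sends to the left and right neighbors (if they exist).
	for (int neighbor : {rank - 1, rank + 1}) {
		if ((neighbor < 0) || (neighbor >= size)) continue;
		MPI_Request req;
		MPI_Isend(const_cast<int*>(sendBuf.data()), (int)sendBuf.size(),
			MPI_INT, neighbor, tag, comm, &req);
		vecRequests.push_back(req);
	}

	// Probe each neighbor to size the receive buffer, then post the receive.
	for (int neighbor : {rank - 1, rank + 1}) {
		if ((neighbor < 0) || (neighbor >= size)) continue;
		MPI_Status status;
		MPI_Probe(neighbor, tag, comm, &status);
		int nCount = 0;
		MPI_Get_count(&status, MPI_INT, &nCount);
		std::vector<int> & recvBuf = (neighbor == rank - 1) ? recvLeft : recvRight;
		recvBuf.resize(nCount);
		MPI_Request req;
		MPI_Irecv(recvBuf.data(), nCount, MPI_INT, neighbor, tag, comm, &req);
		vecRequests.push_back(req);
	}

	// Block until all sends and receives have completed.
	MPI_Waitall((int)vecRequests.size(), vecRequests.data(), MPI_STATUSES_IGNORE);
}
```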
- /// - std::vector< std::vector< LatLonBox > > exchangedvecAllBlobBoxesDeg; - - /// - /// The buffer for vecAllBlobBoxesDeg that will be sent - /// sendBlobBoxesDeg[0] is the left vector and sendBlobBoxesDeg[1] is the right vector - /// - std::vector< std::vector< LatLonBox > > sendBlobBoxesDeg; - - /// - /// The buffer for vecAllBlobBoxesDeg that will be received - /// sendBlobBoxesDeg[0] is the left vector and sendBlobBoxesDeg[1] is the right vector - /// - std::vector< std::vector< LatLonBox > > recvBlobBoxesDeg; - - public: - - /// - /// Construct the Operator with vecAllBlobBoxesDeg - /// It will contruct the this->m_comm and this->_vecAllBlobBoxesDeg based on the input communicator and vecAllBlobBoxesDeg - /// - BlobBoxesDegExchangeOP(MPI_Comm communicator, - const std::vector< std::vector< LatLonBox > > & vecAllBlobBoxesDeg){ - this->_vecAllBlobBoxesDeg = vecAllBlobBoxesDeg; - this->m_comm = communicator; - - //########################### Notes for derived MPI datatype of the LatLonBox(Hongyu Chen) ############################################################################ - //1. "Because vector holds bits instead of bools, it can't return a bool& from its indexing operator or iterator dereference" (src: https://isocpp.org/blog/2012/11/on-vectorbool) - // Therefore, creating the userdefined datatype for LatLonBox and then use vector.data() to send/recv like TagExhangeOP is not working here - //2. Now we're using the MPI_BYTE to manually calculate the send/recv buffer size in byte here, which is working currently - //3. If the program breaks again here, please consider going to the BlobUtilities.h and modify the constructer at line 232 and line 236 according to the description there. - - // //Create an MPI datatype for the LatLonBox: - // //First create the datatype for double[2] - // MPI_Type_contiguous (2, MPI_DOUBLE,&MPI_doubleArray); - // MPI_Type_commit (&MPI_doubleArray); - // //Then use this doubleArray to construct LatlonBox - // LatLonBox sampleBox; - // int LatlonBoxFieldCount = 5; - // MPI_Datatype LatlonBox_typesig[5] = {MPI_CXX_BOOL, MPI_CXX_BOOL, MPI_DOUBLE, MPI_doubleArray, MPI_doubleArray}; - // int LatlonBox_block_lengths[5] = {1,1,1,1,1}; - // MPI_Aint LatlongBox_displacements[5]; - // MPI_Aint base_address; - // MPI_Get_address(&sampleBox.is_null, &LatlongBox_displacements[0]); - // MPI_Get_address(&sampleBox.lon_periodic, &LatlongBox_displacements[1]); - // MPI_Get_address(&sampleBox.lon_width, &LatlongBox_displacements[2]); - // MPI_Get_address(&sampleBox.lon, &LatlongBox_displacements[3]); - // MPI_Get_address(&sampleBox.lat, &LatlongBox_displacements[4]); - // LatlongBox_displacements[0] = MPI_Aint_diff(LatlongBox_displacements[0], base_address); - // LatlongBox_displacements[1] = MPI_Aint_diff(LatlongBox_displacements[1], base_address); - // LatlongBox_displacements[2] = MPI_Aint_diff(LatlongBox_displacements[2], base_address); - // LatlongBox_displacements[3] = MPI_Aint_diff(LatlongBox_displacements[3], base_address); - // LatlongBox_displacements[4] = MPI_Aint_diff(LatlongBox_displacements[4], base_address); - // MPI_Type_create_struct(LatlonBoxFieldCount, LatlonBox_block_lengths, LatlongBox_displacements, LatlonBox_typesig, &MPI_LatonBox_double_type); - // MPI_Type_commit(&MPI_LatonBox_double_type); - - //########################### End Notes for derived MPI datatype of the LatLonBox(Hongyu Chen) ############################################################################ - - sendBlobBoxesDeg.resize(2); - recvBlobBoxesDeg.resize(2); - } - - - /// - /// 
Destructor. - /// - ~BlobBoxesDegExchangeOP(){ - // MPI_Type_free(&MPI_LatonBox_double_type); - // MPI_Type_free(&MPI_doubleArray); - MPIrequests.clear(); - MPIstatuses.clear(); - } - - /// - /// Start the exchange process. - /// this function is non-blocking and the data values in the BlobBoxesDegExchangeOP should not be modified - /// The exchange values are not guaranteed to be current when this function returns and need to be used with the EndExchange() - /// - void StartExchange() { - int rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - - //----------------------Send data first---------------------- - // Pack data into the send buffer - sendBlobBoxesDeg[0] = _vecAllBlobBoxesDeg[0]; - sendBlobBoxesDeg[1] = _vecAllBlobBoxesDeg[_vecAllBlobBoxesDeg.size()-1]; - - // Send data - for (auto dir: {DIR_LEFT, DIR_RIGHT}) { - int destRank;//Destination Rank - if (dir == DIR_LEFT) { - // Sending Data to the left - if (rank == 0) { - // Rank 0 Do Nothing - continue; - } else { - destRank = rank - 1; - } - - } else { - // Sending Data to the right - if (rank == size - 1) { - //Rank n-1 Do Nothing - continue; - } else { - destRank = rank + 1; - } - } - - if (destRank > size - 1) { - continue; - } - - //----------------------Send sendBlobs---------------------- - // Only the odd number processors will send out data - if (rank % 2 != 0) { - MPI_Request request; - MPI_Isend(sendBlobBoxesDeg[dir].data(), sendBlobBoxesDeg[dir].size() * sizeof(LatLonBox), MPI_BYTE, - destRank,tag , m_comm, &request); - } - } - - //----------------------Then Receive data---------------------- - for (auto dir : {DIR_LEFT, DIR_RIGHT}) { - int sourceRank; - if (dir == DIR_LEFT) { - // Receive Data From the left. - if (rank == 0) {// Rank 0 will not receive from the left - continue; - } else { - sourceRank = rank - 1; - } - - } else { - // Receive Data From the right. - if (rank == size - 1) {// rank n-1 will not receive from the right - continue; - - } else { - sourceRank = rank + 1; - } - - } - - if (sourceRank > size - 1) { - continue; - } - - // Only the even number processors will receive data - if (rank % 2 == 0) { - MPI_Status status; - MPI_Request request; - int recvCount; - // Use a non-blocking probe to know the incoming data size - int flag = 0; - while(!flag) - { - MPI_Iprobe( sourceRank, tag, m_comm, &flag, &status ); - } - MPI_Get_count( &status, MPI_BYTE, &recvCount ); - _ASSERT(recvCount % sizeof(LatLonBox) == 0); - recvBlobBoxesDeg[dir].resize(recvCount / sizeof(LatLonBox)); - MPI_Irecv(recvBlobBoxesDeg[dir].data(), recvCount,MPI_BYTE, - sourceRank, tag, m_comm, &request); - MPIrequests.emplace_back(std::move(request)); - MPIstatuses.push_back(MPI_Status()); - } - } - } - - /// - /// End the exchange process. 
- // this function is blocking until: - // - it is safe to modify the values in the BlobBoxesDegExchangeOP data without - // affecting the exchange values for other processes - // - the exchange values can be read: they contain to up-to-date values - // from other processes - /// - void EndExchange() { - // Wait for all Irecv to complete - - int result = MPI_Waitall( MPIrequests.size(), MPIrequests.data(), MPIstatuses.data()); - if (result != MPI_SUCCESS) { - _EXCEPTION1("The MPI routine MPI_Waitall failed (code %i)", result); - } - - MPIrequests.clear(); - MPIstatuses.clear(); - int rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - - // Pack the data into the vecAllBlobBoxesDeg - // The std::move() here is to avoid the possible errors in LatLonBox default copy constructor - // Only the prime number processors need to pack the data - if (rank % 2 == 0) { - if (rank == 0) { - for (int i = 0; i < _vecAllBlobBoxesDeg.size();i++) { - exchangedvecAllBlobBoxesDeg.emplace_back(std::move(_vecAllBlobBoxesDeg[i])); - } - exchangedvecAllBlobBoxesDeg.emplace_back(std::move(recvBlobBoxesDeg[1])); - - } else if (rank == size - 1) { - exchangedvecAllBlobBoxesDeg.emplace_back(std::move(recvBlobBoxesDeg[0])); - for (int i = 0; i < _vecAllBlobBoxesDeg.size(); i++) { - exchangedvecAllBlobBoxesDeg.emplace_back(std::move(_vecAllBlobBoxesDeg[i])); - } - - } else { - exchangedvecAllBlobBoxesDeg.emplace_back(std::move(recvBlobBoxesDeg[0])); - for (int i = 0; i < _vecAllBlobBoxesDeg.size(); i++) { - exchangedvecAllBlobBoxesDeg.emplace_back(std::move(_vecAllBlobBoxesDeg[i])); - } - exchangedvecAllBlobBoxesDeg.emplace_back(std::move(recvBlobBoxesDeg[1])); - } - - } else { - exchangedvecAllBlobBoxesDeg = _vecAllBlobBoxesDeg; - } - - - } - - /// - /// Return the exchanged vecAllBlobTags - /// - std::vector< std::vector< LatLonBox > > GetExchangedVecAllBlobBoxesDeg(){ - return this->exchangedvecAllBlobBoxesDeg; - } -}; - -#endif /////////////////////////////////////////////////////////////////////////////// -// Set of indicator locations stored as grid indices -typedef std::set IndicatorSet; -typedef IndicatorSet::iterator IndicatorSetIterator; -typedef IndicatorSet::const_iterator IndicatorSetConstIterator; - - -#if defined(TEMPEST_MPIOMP) - -/// -/// A Class used for exchanging vecAllBlobs between processors -/// -class BlobsExchangeOp { - private: - - /// - /// Tag for MPI communication for sending/receiving blobs - /// - int blob_tag = 101; - - /// - /// Tag for MPI communication for sending/receiving serialized index - /// - int indx_tag = 102; - - - /// - /// The MPI Communicator - /// - MPI_Comm m_comm; - - /// - /// An array of MPI_Request. - /// - std::vector MPIrequests; - - /// - /// An array of MPI_Status. - /// - std::vector MPIstatuses; - - - /// - /// Serialize the vector and generate the sendBlobsIndx array - /// - void Serialize(){ - - sendBlobs.clear();sendBlobsIndx.clear(); - - sendBlobs.resize(2);sendBlobsIndx.resize(2); - - for (auto dir : {DIR_LEFT, DIR_RIGHT}) { - int curIndx = 0;//Point to the next empty slot for inserting a new set - std::vector sendVecBlobs = (dir == DIR_LEFT)? 
_vecAllBlobs[0]:_vecAllBlobs[_vecAllBlobs.size()-1];//the vector of set that needs to be serialized - sendBlobsIndx[dir].push_back(curIndx); - - for (int i = 0; i < sendVecBlobs.size(); i++) { - IndicatorSet curSet = sendVecBlobs[i]; - for (auto it = curSet.begin(); it != curSet.end(); it++) { - sendBlobs[dir].push_back(*it); - curIndx++; - } - sendBlobsIndx[dir].push_back(curIndx);//Now it records the starting position of the next IndicatorSet - } - } - } - - /// - /// Deserialize the received vectorrecvBlobs into vector and clear the recvBlobsIndx array - /// - void Deserialize(){ - recvBlobsUnserial.clear(); - recvBlobsUnserial.resize(2); - int rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - - - for (auto dir : {DIR_LEFT, DIR_RIGHT}) { - //Rank 0 will not receive from its left, hence no deserialization. - if (rank == 0 && dir == DIR_LEFT) { - continue; - } - - //the last processor will now receive from its right, hence no deserialization - if ((rank == size - 1) && dir == DIR_RIGHT) { - continue; - } - for (int i = 0; i < recvBlobsIndx[dir].size()-1; i++){ - IndicatorSet curSet; - int startIndx = recvBlobsIndx[dir][i]; - int endIndx = std::min(recvBlobsIndx[dir][i+1],int(recvBlobs[dir].size())); - - //Deserialize each set - for (int i = startIndx; i < endIndx; i++){ - curSet.insert(recvBlobs[dir][i]); - } - - //push each set into the vector - recvBlobsUnserial[dir].push_back(curSet); - } - } - recvBlobsIndx.clear(); - sendBlobsIndx.clear(); - } - protected: - - /// - /// The initial local vecAllBlobs before the exchange - /// - std::vector> _vecAllBlobs; - - /// - /// The vecAllBlobs that is after the exchange. it's column number is _vecAllAllBlobs' column number plus 2 (left and right) - /// except for p0 and pn-1 (for these two processors, the column number is _vecAllAllBlobs' column number plus 1). - /// - std::vector> exchangedVecAllBlobs; - - /// - /// The buffer for vecBlobs that is serialized and will be sent - /// sendBlobs[0] is the left vector and sendBlobs[1] is the right vector - /// - std::vector> sendBlobs; - - - /// - /// The Array recording the starting index for each set that is serialized - /// sendBlobsIndx[0] is for the left vector and sendBlobsIndx[1] is for the right vector - /// - std::vector> sendBlobsIndx; - - /// - /// The buffer for the received serailized blobs array and need to be unserialized - /// recvBlobs[0] is for the left vector and recvBlobs[1] is for the right vector - /// - std::vector> recvBlobs; - - /// - /// The Buffer recording the starting index for each set that is in the recvBlobs - /// recvBlobsIndx[0] is for the left vector and recvBlobsIndx[1] is for the right vector - /// - std::vector> recvBlobsIndx; - - /// - /// The array recording the unserialize recvBlobs - /// recvBlobsUnserial[0] is for the left vector and recvBlobsUnserial[1] is for the right vector - /// - std::vector> recvBlobsUnserial; - - - public: - /// - /// Construct the Operator with BlobsExchangeOp - /// It will contruct the this->m_comm and this->_vecAllBlobs based on the input communicator and vecAllBlobs - /// - BlobsExchangeOp(MPI_Comm communicator, - const std::vector< std::vector > & vecAllBlobs){ - this->_vecAllBlobs = vecAllBlobs; - this->m_comm = communicator; - } - - - /// - /// Destructor for BlobsExchangeOp - /// - ~BlobsExchangeOp(){ - MPIrequests.clear(); - MPIstatuses.clear(); - - } - - - /// - /// Start the exchange process. 
- /// this function is non-blocking and the data values in the BlobsExchangeOp should not be modified - /// The exchange values are not guaranteed to be current when this function returns and need to be used with the EndExchange() - /// - void StartExchange() { - int rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - recvBlobs.resize(2); - recvBlobsIndx.resize(2); - - //Merge the received vecBlobs into the new vecAllBlobs - if (rank == 0 || rank == size - 1) { - exchangedVecAllBlobs.resize(_vecAllBlobs.size() + 1); - - }else{ - exchangedVecAllBlobs.resize(_vecAllBlobs.size() + 2); - } - - - //----------------------Send sendBlobs/sendBlobsIndx data---------------------- - for (auto dir: {DIR_LEFT, DIR_RIGHT}) { - int destRank;//Destination Rank - if (dir == DIR_LEFT) { - // Sending Data to the left - if (rank == 0) { - //Rank 0 Do Nothing - continue; - } else { - destRank = rank - 1; - } - - } else { - // Sending Data to the right - if (rank == size - 1) { - // Rank n-1 Do Nothing - continue; - } else { - destRank = rank + 1; - } - } - if (destRank > size - 1) { - continue; - - } - - // only the odd number processors will send out data - if (rank % 2 != 0) { - // First Serialize the Sending Buffer. - this->Serialize(); - - - //----------------------Send sendBlobs---------------------- - MPI_Request request; - MPI_Isend(sendBlobs[dir].data(), sendBlobs[dir].size(), MPI_INT, - destRank, blob_tag, m_comm, &request); - - //----------------------Send sendBlobsIndx---------------------- - MPI_Request indx_Request; - MPI_Isend(sendBlobsIndx[dir].data(), sendBlobsIndx[dir].size(), MPI_INT, - destRank, indx_tag, m_comm, &indx_Request); - - } - - - - } - - //----------------------Receive sendBlobs/recvBlobsIndx Data---------------------- - for (auto dir : {DIR_LEFT, DIR_RIGHT}) { - int sourceRank; - - - if (dir == DIR_LEFT) { - // Receive Data From the left. - if (rank == 0) {// Rank 0 will not receive from the left - continue; - } else { - sourceRank = rank - 1; - } - - } else { - // Receive Data From the right. 
- if (rank == size - 1) {// rank n-1 will not receive from the right - continue; - - } else { - sourceRank = rank + 1; - } - - } - if (sourceRank > size - 1) { - continue; - - } - - //----------------------Receive the serialized Blobs---------------------- - - //only the prime number processor will receive the blobs - if (rank % 2 == 0) { - MPI_Status status; - MPI_Request request; - int recvCount; - - // Use a non-blocking probe to know the incoming data size - int flag = 0; - while(!flag) - { - MPI_Iprobe( sourceRank, blob_tag, m_comm, &flag, &status ); - } - MPI_Get_count( &status, MPI_INT, &recvCount ); - recvBlobs[dir].resize(recvCount); - MPI_Irecv(recvBlobs[dir].data(), recvBlobs[dir].size(), MPI_INT, - sourceRank, blob_tag, m_comm, &request); - MPIrequests.emplace_back(std::move(request)); - MPIstatuses.push_back(MPI_Status()); - - //----------------------Receive the index info for the Blobs---------------------- - MPI_Status indxStatus; - MPI_Request indxRequest; - int indxRecvCount; - - // Use a non-blocking probe to know the incoming data size - int indxFlag = 0; - while(!indxFlag) - { - MPI_Iprobe( sourceRank, indx_tag, m_comm, &indxFlag, &indxStatus); - } - MPI_Get_count( &indxStatus, MPI_INT, &indxRecvCount); - recvBlobsIndx[dir].resize(indxRecvCount); - MPI_Irecv(recvBlobsIndx[dir].data(), recvBlobsIndx[dir].size(), MPI_INT, - sourceRank, indx_tag, m_comm, &indxRequest); - MPIrequests.emplace_back(std::move(indxRequest)); - MPIstatuses.push_back(MPI_Status()); - - } - - } - - - } - - /// - /// End the exchange process. - // this function is blocking until: - // - it is safe to modify the values in the BlobsExchangeOp data without - // affecting the exchange values for other processes - // - the exchange values can be read: they contain to up-to-date values - // from other processes - /// - void EndExchange() { - - // Wait for all Irecv to complete - int result = MPI_Waitall( MPIrequests.size(), MPIrequests.data(), MPIstatuses.data()); - if (result != MPI_SUCCESS) { - _EXCEPTION1("The MPI routine MPI_Waitall failed (code %i)", result); - } - - MPIrequests.clear(); - MPIstatuses.clear(); - int rank, size; - MPI_Comm_size(m_comm, &size); - MPI_Comm_rank(m_comm, &rank); - - // Only the even number processors need deserialize - if (rank % 2 == 0) { - this->Deserialize(); - - if (rank == 0) { - for (int i = 0; i < exchangedVecAllBlobs.size() - 1; i++) { - exchangedVecAllBlobs[i] = _vecAllBlobs[i]; - } - exchangedVecAllBlobs[exchangedVecAllBlobs.size() - 1] = recvBlobsUnserial[1]; - - } else if (rank == size - 1) { - exchangedVecAllBlobs[0] = recvBlobsUnserial[0]; - for (int i = 1; i < exchangedVecAllBlobs.size(); i++) { - exchangedVecAllBlobs[i] = _vecAllBlobs[i-1]; - } - } else { - exchangedVecAllBlobs[0] = recvBlobsUnserial[0]; - for (int i = 1; i < exchangedVecAllBlobs.size() - 1; i++) { - exchangedVecAllBlobs[i] = _vecAllBlobs[i-1]; - } - exchangedVecAllBlobs[exchangedVecAllBlobs.size() - 1] = recvBlobsUnserial[1]; - } - } else { - // For odd number processors, nothing is modified. - exchangedVecAllBlobs = _vecAllBlobs; - } - } - - /// - /// Return the exchanged VecAllBlobs - /// - std::vector> GetExchangedVecAllBlobs(){ - return this->exchangedVecAllBlobs; - } - - /// - /// Return the unexchanged VecAllBlobs - /// - std::vector> GetOriginalVecAllBlobs(){ - return this->_vecAllBlobs; - } - -}; - -/// -/// A Class used for exchanging vecGlobalTimes between processors -/// This class will update the local global time accordingly after the exchange process. 
-/// -class GlobalTimesExchangeOp { - private: - /// - /// Tag for MPI communication for vecGlobalTimes - /// - int tag = 104; - - /// - /// The MPI Communicator - /// - MPI_Comm m_comm; - - /// - /// An array of MPI_Request. - /// - std::vector MPIrequests; - - /// - /// An array of MPI_Status. - /// - std::vector MPIstatuses; - - protected: - /// - /// The initial vecGlobalTimes before exchange - /// - std::vector< std::vector
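The commented-out derived-datatype code in BlobBoxesDegExchangeOP above explains why LatLonBox data is shipped as raw bytes instead: the vectors are sent with MPI_BYTE and the element count is recovered from sizeof on the receiving side. A minimal sketch of that workaround for a trivially copyable struct (`Box` is an illustrative stand-in, not the real LatLonBox; this assumes a homogeneous set of ranks):

```cpp
#include <mpi.h>
#include <vector>

// Illustrative trivially copyable payload; not the real LatLonBox<double>.
struct Box {
	double lon[2];
	double lat[2];
};

// Send the vector contents as sizeof(Box) * n raw bytes.
static void SendBoxes(MPI_Comm comm, const std::vector<Box> & boxes, int destRank, int tag) {
	MPI_Send(const_cast<Box*>(boxes.data()),
		(int)(boxes.size() * sizeof(Box)), MPI_BYTE, destRank, tag, comm);
}

// Probe for the byte count, size the buffer from sizeof(Box), then receive.
static std::vector<Box> RecvBoxes(MPI_Comm comm, int sourceRank, int tag) {
	MPI_Status status;
	MPI_Probe(sourceRank, tag, comm, &status);
	int nBytes = 0;
	MPI_Get_count(&status, MPI_BYTE, &nBytes);
	std::vector<Box> boxes(nBytes / sizeof(Box));
	MPI_Recv(boxes.data(), nBytes, MPI_BYTE, sourceRank, tag, comm, &status);
	return boxes;
}
```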