dune-common 2.9.0
Loading...
Searching...
No Matches
communicator.hh
Go to the documentation of this file.
1// -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2// vi: set et ts=4 sw=2 sts=2:
3// SPDX-FileCopyrightInfo: Copyright (C) DUNE Project contributors, see file LICENSE.md in module root
4// SPDX-License-Identifier: LicenseRef-GPL-2.0-only-with-DUNE-exception
5#ifndef DUNE_COMMUNICATOR
6#define DUNE_COMMUNICATOR
7
8#if HAVE_MPI
9
10#include <cassert>
11#include <cstddef>
12#include <iostream>
13#include <map>
14#include <type_traits>
15#include <utility>
16
17#include <mpi.h>
18
23
24namespace Dune
25{
109 struct SizeOne
110 {};
111
118 {};
119
120
126 template<class V>
128 {
140 typedef V Type;
141
147 typedef typename V::value_type IndexedType;
148
154
163 static const void* getAddress(const V& v, int index);
164
170 static int getSize(const V&, int index);
171 };
172
173 template<class K, int n> class FieldVector;
174
175 template<class B, class A> class VariableBlockVector;
176
177 template<class K, class A, int n>
179 {
181
182 typedef typename Type::B IndexedType;
183
185
186 static const void* getAddress(const Type& v, int i);
187
188 static int getSize(const Type& v, int i);
189 };
190
195 {};
196
200 template<class T>
202 {
204
205 static const IndexedType& gather(const T& vec, std::size_t i);
206
207 static void scatter(T& vec, const IndexedType& v, std::size_t i);
208
209 };
210
222 template<typename T>
223 class DatatypeCommunicator : public InterfaceBuilder
224 {
225 public:
226
230 typedef T ParallelIndexSet;
231
236
240 typedef typename RemoteIndices::GlobalIndex GlobalIndex;
241
245 typedef typename RemoteIndices::Attribute Attribute;
246
250 typedef typename RemoteIndices::LocalIndex LocalIndex;
251
255 DatatypeCommunicator();
256
260 ~DatatypeCommunicator();
261
288 template<class T1, class T2, class V>
289 void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
290
294 void forward();
295
299 void backward();
300
304 void free();
305 private:
309 constexpr static int commTag_ = 234;
310
314 const RemoteIndices* remoteIndices_;
315
316 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
317 MessageTypeMap;
318
322 MessageTypeMap messageTypes;
323
327 void* data_;
328
329 MPI_Request* requests_[2];
330
334 bool created_;
335
339 template<class V, bool FORWARD>
340 void createRequests(V& sendData, V& receiveData);
341
345 template<class T1, class T2, class V, bool send>
346 void createDataTypes(const T1& source, const T2& destination, V& data);
347
351 void sendRecv(MPI_Request* req);
352
356 struct IndexedTypeInformation
357 {
363 void build(int i)
364 {
365 length = new int[i];
366 displ = new MPI_Aint[i];
367 size = i;
368 }
369
373 void free()
374 {
375 delete[] length;
376 delete[] displ;
377 }
379 int* length;
381 MPI_Aint* displ;
387 int elements;
391 int size;
392 };
393
399 template<class V>
400 struct MPIDatatypeInformation
401 {
406 MPIDatatypeInformation(const V& data) : data_(data)
407 {}
408
414 void reserve(int proc, int size)
415 {
416 information_[proc].build(size);
417 }
424 void add(int proc, int local)
425 {
426 IndexedTypeInformation& info=information_[proc];
427 assert((info.elements)<info.size);
428 MPI_Get_address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
429 info.displ+info.elements);
430 info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
431 info.elements++;
432 }
433
438 std::map<int,IndexedTypeInformation> information_;
442 const V& data_;
443
444 };
445
446 };
447
458 {
459
460 public:
465
472 template<class Data, class Interface>
473 typename std::enable_if<std::is_same<SizeOne,typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
474 build(const Interface& interface);
475
483 template<class Data, class Interface>
484 void build(const Data& source, const Data& target, const Interface& interface);
485
514 template<class GatherScatter, class Data>
515 void forward(const Data& source, Data& dest);
516
545 template<class GatherScatter, class Data>
546 void backward(Data& source, const Data& dest);
547
573 template<class GatherScatter, class Data>
574 void forward(Data& data);
575
601 template<class GatherScatter, class Data>
602 void backward(Data& data);
603
607 void free();
608
613
614 private:
615
619 typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
620 InterfaceMap;
621
622
626 template<class Data, typename IndexedTypeFlag>
627 struct MessageSizeCalculator
628 {};
629
634 template<class Data>
635 struct MessageSizeCalculator<Data,SizeOne>
636 {
643 inline int operator()(const InterfaceInformation& info) const;
652 inline int operator()(const Data& data, const InterfaceInformation& info) const;
653 };
654
659 template<class Data>
660 struct MessageSizeCalculator<Data,VariableSize>
661 {
670 inline int operator()(const Data& data, const InterfaceInformation& info) const;
671 };
672
676 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
677 struct MessageGatherer
678 {};
679
684 template<class Data, class GatherScatter, bool send>
685 struct MessageGatherer<Data,GatherScatter,send,SizeOne>
686 {
688 typedef typename CommPolicy<Data>::IndexedType Type;
689
694 typedef GatherScatter Gatherer;
695
701 constexpr static bool forward = send;
702
710 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
711 };
712
717 template<class Data, class GatherScatter, bool send>
718 struct MessageGatherer<Data,GatherScatter,send,VariableSize>
719 {
721 typedef typename CommPolicy<Data>::IndexedType Type;
722
727 typedef GatherScatter Gatherer;
728
734 constexpr static bool forward = send;
735
743 inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
744 };
745
749 template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
750 struct MessageScatterer
751 {};
752
757 template<class Data, class GatherScatter, bool send>
758 struct MessageScatterer<Data,GatherScatter,send,SizeOne>
759 {
761 typedef typename CommPolicy<Data>::IndexedType Type;
762
767 typedef GatherScatter Scatterer;
768
774 constexpr static bool forward = send;
775
783 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
784 };
789 template<class Data, class GatherScatter, bool send>
790 struct MessageScatterer<Data,GatherScatter,send,VariableSize>
791 {
793 typedef typename CommPolicy<Data>::IndexedType Type;
794
799 typedef GatherScatter Scatterer;
800
806 constexpr static bool forward = send;
807
815 inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
816 };
817
821 struct MessageInformation
822 {
824 MessageInformation()
825 : start_(0), size_(0)
826 {}
827
835 MessageInformation(size_t start, size_t size)
836 : start_(start), size_(size)
837 {}
841 size_t start_;
845 size_t size_;
846 };
847
854 typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
855 InformationMap;
859 InformationMap messageInformation_;
863 char* buffers_[2];
867 size_t bufferSize_[2];
868
872 constexpr static int commTag_ = 0;
873
877 std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
878
879 MPI_Comm communicator_;
880
884 template<class GatherScatter, bool FORWARD, class Data>
885 void sendRecv(const Data& source, Data& target);
886
887 };
888
889#ifndef DOXYGEN
890
891 template<class V>
892 inline const void* CommPolicy<V>::getAddress(const V& v, int index)
893 {
894 return &(v[index]);
895 }
896
897 template<class V>
898 inline int CommPolicy<V>::getSize([[maybe_unused]] const V& v, [[maybe_unused]] int index)
899 {
900 return 1;
901 }
902
903 template<class K, class A, int n>
904 inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
905 {
906 return &(v[index][0]);
907 }
908
909 template<class K, class A, int n>
910 inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
911 {
912 return v[index].getsize();
913 }
914
915 template<class T>
916 inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
917 {
918 return vec[i];
919 }
920
921 template<class T>
922 inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
923 {
924 vec[i]=v;
925 }
926
927 template<typename T>
928 DatatypeCommunicator<T>::DatatypeCommunicator()
929 : remoteIndices_(0), created_(false)
930 {
931 requests_[0]=0;
932 requests_[1]=0;
933 }
934
935
936
937 template<typename T>
938 DatatypeCommunicator<T>::~DatatypeCommunicator()
939 {
940 free();
941 }
942
943 template<typename T>
944 template<class T1, class T2, class V>
945 inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
946 const T1& source, V& sendData,
947 const T2& destination, V& receiveData)
948 {
949 remoteIndices_ = &remoteIndices;
950 free();
951 createDataTypes<T1,T2,V,false>(source,destination, receiveData);
952 createDataTypes<T1,T2,V,true>(source,destination, sendData);
953 createRequests<V,true>(sendData, receiveData);
954 createRequests<V,false>(receiveData, sendData);
955 created_=true;
956 }
957
958 template<typename T>
959 void DatatypeCommunicator<T>::free()
960 {
961 if(created_) {
962 delete[] requests_[0];
963 delete[] requests_[1];
964 typedef MessageTypeMap::iterator iterator;
965 typedef MessageTypeMap::const_iterator const_iterator;
966
967 const const_iterator end=messageTypes.end();
968
969 for(iterator process = messageTypes.begin(); process != end; ++process) {
970 MPI_Datatype *type = &(process->second.first);
971 int finalized=0;
972 MPI_Finalized(&finalized);
973 if(*type!=MPI_DATATYPE_NULL && !finalized)
974 MPI_Type_free(type);
975 type = &(process->second.second);
976 if(*type!=MPI_DATATYPE_NULL && !finalized)
977 MPI_Type_free(type);
978 }
979 messageTypes.clear();
980 created_=false;
981 }
982
983 }
984
985 template<typename T>
986 template<class T1, class T2, class V, bool send>
987 void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
988 {
989
990 MPIDatatypeInformation<V> dataInfo(data);
991 this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
992
993 typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
994 const const_iterator end=this->remoteIndices_->end();
995
996 // Allocate MPI_Datatypes and deallocate memory for the type construction.
997 for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
998 IndexedTypeInformation& info=dataInfo.information_[process->first];
999 // Shift the displacement
1000 MPI_Aint base;
1001 MPI_Get_address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1002
1003 for(int i=0; i< info.elements; i++) {
1004 info.displ[i]-=base;
1005 }
1006
1007 // Create data type
1008 MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1009 MPI_Type_create_hindexed(info.elements, info.length, info.displ,
1010 MPITraits<typename CommPolicy<V>::IndexedType>::getType(), type);
1011 MPI_Type_commit(type);
1012 // Deallocate memory
1013 info.free();
1014 }
1015 }
1016
1017 template<typename T>
1018 template<class V, bool createForward>
1019 void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1020 {
1021 typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1022 int rank;
1023 static int index = createForward ? 1 : 0;
1024 int noMessages = messageTypes.size();
1025 // allocate request handles
1026 requests_[index] = new MPI_Request[2*noMessages];
1027 const MapIterator end = messageTypes.end();
1028 int request=0;
1029 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1030
1031 // Set up the requests for receiving first
1032 for(MapIterator process = messageTypes.begin(); process != end;
1033 ++process, ++request) {
1034 MPI_Datatype type = createForward ? process->second.second : process->second.first;
1035 void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1036 MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1037 }
1038
1039 // And now the send requests
1040
1041 for(MapIterator process = messageTypes.begin(); process != end;
1042 ++process, ++request) {
1043 MPI_Datatype type = createForward ? process->second.first : process->second.second;
1044 void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1045 MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1046 }
1047 }
1048
1049 template<typename T>
1050 void DatatypeCommunicator<T>::forward()
1051 {
1052 sendRecv(requests_[1]);
1053 }
1054
1055 template<typename T>
1056 void DatatypeCommunicator<T>::backward()
1057 {
1058 sendRecv(requests_[0]);
1059 }
1060
1061 template<typename T>
1062 void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1063 {
1064 int noMessages = messageTypes.size();
1065 // Start the receive calls first
1066 MPI_Startall(noMessages, requests);
1067 // Now the send calls
1068 MPI_Startall(noMessages, requests+noMessages);
1069
1070 // Wait for completion of the communication send first then receive
1071 MPI_Status* status=new MPI_Status[2*noMessages];
1072 for(int i=0; i<2*noMessages; i++)
1073 status[i].MPI_ERROR=MPI_SUCCESS;
1074
1075 int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1076 int receive = MPI_Waitall(noMessages, requests, status);
1077
1078 // Error checks
1079 int success=1, globalSuccess=0;
1080 if(send==MPI_ERR_IN_STATUS) {
1081 int rank;
1082 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1083 std::cerr<<rank<<": Error in sending :"<<std::endl;
1084 // Search for the error
1085 for(int i=noMessages; i< 2*noMessages; i++)
1086 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1087 char message[300];
1088 int messageLength;
1089 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1090 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1091 for(int j = 0; j < messageLength; j++)
1092 std::cout << message[j];
1093 }
1094 std::cerr<<std::endl;
1095 success=0;
1096 }
1097
1098 if(receive==MPI_ERR_IN_STATUS) {
1099 int rank;
1100 MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1101 std::cerr<<rank<<": Error in receiving!"<<std::endl;
1102 // Search for the error
1103 for(int i=0; i< noMessages; i++)
1104 if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1105 char message[300];
1106 int messageLength;
1107 MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1108 std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1109 for(int j = 0; j < messageLength; j++)
1110 std::cerr << message[j];
1111 }
1112 std::cerr<<std::endl;
1113 success=0;
1114 }
1115
1116 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1117
1118 delete[] status;
1119
1120 if(!globalSuccess)
1121 DUNE_THROW(CommunicationError, "A communication error occurred!");
1122
1123 }
1124
1126 {
1127 buffers_[0]=0;
1128 buffers_[1]=0;
1129 bufferSize_[0]=0;
1130 bufferSize_[1]=0;
1131 }
1132
1133 template<class Data, class Interface>
1134 typename std::enable_if<std::is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1135 BufferedCommunicator::build(const Interface& interface)
1136 {
1137 interfaces_=interface.interfaces();
1138 communicator_=interface.communicator();
1139 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1140 ::const_iterator const_iterator;
1141 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1142 const const_iterator end = interfaces_.end();
1143 int lrank;
1144 MPI_Comm_rank(communicator_, &lrank);
1145
1146 bufferSize_[0]=0;
1147 bufferSize_[1]=0;
1148
1149 for(const_iterator interfacePair = interfaces_.begin();
1150 interfacePair != end; ++interfacePair) {
1151 int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1152 int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1153 if (noSend + noRecv > 0)
1154 messageInformation_.insert(std::make_pair(interfacePair->first,
1155 std::make_pair(MessageInformation(bufferSize_[0],
1156 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1157 MessageInformation(bufferSize_[1],
1158 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1159 bufferSize_[0] += noSend;
1160 bufferSize_[1] += noRecv;
1161 }
1162
1163 // allocate the buffers
1164 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1165 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1166
1167 buffers_[0] = new char[bufferSize_[0]];
1168 buffers_[1] = new char[bufferSize_[1]];
1169 }
1170
1171 template<class Data, class Interface>
1172 void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1173 {
1174
1175 interfaces_=interface.interfaces();
1176 communicator_=interface.communicator();
1177 typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1178 ::const_iterator const_iterator;
1179 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1180 const const_iterator end = interfaces_.end();
1181
1182 bufferSize_[0]=0;
1183 bufferSize_[1]=0;
1184
1185 for(const_iterator interfacePair = interfaces_.begin();
1186 interfacePair != end; ++interfacePair) {
1187 int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1188 int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1189 if (noSend + noRecv > 0)
1190 messageInformation_.insert(std::make_pair(interfacePair->first,
1191 std::make_pair(MessageInformation(bufferSize_[0],
1192 noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1193 MessageInformation(bufferSize_[1],
1194 noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1195 bufferSize_[0] += noSend;
1196 bufferSize_[1] += noRecv;
1197 }
1198
1199 bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1200 bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1201 // allocate the buffers
1202 buffers_[0] = new char[bufferSize_[0]];
1203 buffers_[1] = new char[bufferSize_[1]];
1204 }
1205
1206 inline void BufferedCommunicator::free()
1207 {
1208 messageInformation_.clear();
1209 if(buffers_[0])
1210 delete[] buffers_[0];
1211
1212 if(buffers_[1])
1213 delete[] buffers_[1];
1214 buffers_[0]=buffers_[1]=0;
1215 }
1216
1218 {
1219 free();
1220 }
1221
1222 template<class Data>
1223 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1224 (const InterfaceInformation& info) const
1225 {
1226 return info.size();
1227 }
1228
1229
1230 template<class Data>
1231 inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1232 (const Data&, const InterfaceInformation& info) const
1233 {
1234 return operator()(info);
1235 }
1236
1237
1238 template<class Data>
1239 inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1240 (const Data& data, const InterfaceInformation& info) const
1241 {
1242 int entries=0;
1243
1244 for(size_t i=0; i < info.size(); i++)
1245 entries += CommPolicy<Data>::getSize(data,info[i]);
1246
1247 return entries;
1248 }
1249
1250
1251 template<class Data, class GatherScatter, bool FORWARD>
1252 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, [[maybe_unused]] size_t bufferSize) const
1253 {
1254 typedef typename InterfaceMap::const_iterator
1255 const_iterator;
1256
1257 int rank;
1258 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1259 const const_iterator end = interfaces.end();
1260 size_t index=0;
1261
1262 for(const_iterator interfacePair = interfaces.begin();
1263 interfacePair != end; ++interfacePair) {
1264 int size = forward ? interfacePair->second.first.size() :
1265 interfacePair->second.second.size();
1266
1267 for(int i=0; i < size; i++) {
1268 int local = forward ? interfacePair->second.first[i] :
1269 interfacePair->second.second[i];
1270 for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1271
1272#ifdef DUNE_ISTL_WITH_CHECKING
1273 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1274#endif
1275 buffer[index]=GatherScatter::gather(data, local, j);
1276 }
1277
1278 }
1279 }
1280
1281 }
1282
1283
1284 template<class Data, class GatherScatter, bool FORWARD>
1285 inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(
1286 const InterfaceMap& interfaces, const Data& data, Type* buffer, [[maybe_unused]] size_t bufferSize) const
1287 {
1288 typedef typename InterfaceMap::const_iterator
1289 const_iterator;
1290 const const_iterator end = interfaces.end();
1291 size_t index = 0;
1292
1293 int rank;
1294 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1295
1296 for(const_iterator interfacePair = interfaces.begin();
1297 interfacePair != end; ++interfacePair) {
1298 size_t size = FORWARD ? interfacePair->second.first.size() :
1299 interfacePair->second.second.size();
1300
1301 for(size_t i=0; i < size; i++) {
1302
1303#ifdef DUNE_ISTL_WITH_CHECKING
1304 assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1305#endif
1306
1307 buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1308 interfacePair->second.second[i]);
1309 }
1310 }
1311
1312 }
1313
1314
1315 template<class Data, class GatherScatter, bool FORWARD>
1316 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1317 {
1318 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1319 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1320
1321 assert(infoPair!=interfaces.end());
1322
1323 const Information& info = FORWARD ? infoPair->second.second :
1324 infoPair->second.first;
1325
1326 for(size_t i=0, index=0; i < info.size(); i++) {
1327 for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1328 GatherScatter::scatter(data, buffer[index++], info[i], j);
1329 }
1330 }
1331
1332
1333 template<class Data, class GatherScatter, bool FORWARD>
1334 inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1335 {
1336 typedef typename InterfaceMap::value_type::second_type::first_type Information;
1337 const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1338
1339 assert(infoPair!=interfaces.end());
1340
1341 const Information& info = FORWARD ? infoPair->second.second :
1342 infoPair->second.first;
1343
1344 for(size_t i=0; i < info.size(); i++) {
1345 GatherScatter::scatter(data, buffer[i], info[i]);
1346 }
1347 }
1348
1349
1350 template<class GatherScatter,class Data>
1351 void BufferedCommunicator::forward(Data& data)
1352 {
1353 this->template sendRecv<GatherScatter,true>(data, data);
1354 }
1355
1356
1357 template<class GatherScatter, class Data>
1358 void BufferedCommunicator::backward(Data& data)
1359 {
1360 this->template sendRecv<GatherScatter,false>(data, data);
1361 }
1362
1363
1364 template<class GatherScatter, class Data>
1365 void BufferedCommunicator::forward(const Data& source, Data& dest)
1366 {
1367 this->template sendRecv<GatherScatter,true>(source, dest);
1368 }
1369
1370
1371 template<class GatherScatter, class Data>
1372 void BufferedCommunicator::backward(Data& source, const Data& dest)
1373 {
1374 this->template sendRecv<GatherScatter,false>(dest, source);
1375 }
1376
1377
1378 template<class GatherScatter, bool FORWARD, class Data>
1379 void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1380 {
1381 int rank, lrank;
1382
1383 MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1384 MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1385
1386 typedef typename CommPolicy<Data>::IndexedType Type;
1387 Type *sendBuffer, *recvBuffer;
1388 size_t sendBufferSize;
1389#ifndef NDEBUG
1390 size_t recvBufferSize;
1391#endif
1392
1393 if(FORWARD) {
1394 sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1395 sendBufferSize = bufferSize_[0];
1396 recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1397#ifndef NDEBUG
1398 recvBufferSize = bufferSize_[1];
1399#endif
1400 }else{
1401 sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1402 sendBufferSize = bufferSize_[1];
1403 recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1404#ifndef NDEBUG
1405 recvBufferSize = bufferSize_[0];
1406#endif
1407 }
1408 typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1409
1410 MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1411
1412 MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1413 MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1414 /* Number of recvRequests that are not MPI_REQUEST_NULL */
1415 size_t numberOfRealRecvRequests = 0;
1416
1417 // Setup receive first
1418 typedef typename InformationMap::const_iterator const_iterator;
1419
1420 const const_iterator end = messageInformation_.end();
1421 size_t i=0;
1422 int* processMap = new int[messageInformation_.size()];
1423
1424 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1425 processMap[i]=info->first;
1426 if(FORWARD) {
1427 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1428 Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1429 if(info->second.second.size_) {
1430 MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1431 MPI_BYTE, info->first, commTag_, communicator_,
1432 recvRequests+i);
1433 numberOfRealRecvRequests += 1;
1434 } else {
1435 // Nothing to receive -> set request to inactive
1436 recvRequests[i]=MPI_REQUEST_NULL;
1437 }
1438 }else{
1439 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1440 Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1441 if(info->second.first.size_) {
1442 MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1443 MPI_BYTE, info->first, commTag_, communicator_,
1444 recvRequests+i);
1445 numberOfRealRecvRequests += 1;
1446 } else {
1447 // Nothing to receive -> set request to inactive
1448 recvRequests[i]=MPI_REQUEST_NULL;
1449 }
1450 }
1451 }
1452
1453 // now the send requests
1454 i=0;
1455 for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1456 if(FORWARD) {
1457 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1458 Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1459 assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1460 if(info->second.first.size_)
1461 MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1462 MPI_BYTE, info->first, commTag_, communicator_,
1463 sendRequests+i);
1464 else
1465 // Nothing to send -> set request to inactive
1466 sendRequests[i]=MPI_REQUEST_NULL;
1467 }else{
1468 assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1469 Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1470 if(info->second.second.size_)
1471 MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1472 MPI_BYTE, info->first, commTag_, communicator_,
1473 sendRequests+i);
1474 else
1475 // Nothing to send -> set request to inactive
1476 sendRequests[i]=MPI_REQUEST_NULL;
1477 }
1478
1479 // Wait for completion of receive and immediately start scatter
1480 i=0;
1481 //int success = 1;
1482 int finished = MPI_UNDEFINED;
1483 MPI_Status status; //[messageInformation_.size()];
1484 //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1485
1486 for(i=0; i< numberOfRealRecvRequests; i++) {
1487 status.MPI_ERROR=MPI_SUCCESS;
1488 MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1489 assert(finished != MPI_UNDEFINED);
1490
1491 if(status.MPI_ERROR==MPI_SUCCESS) {
1492 int& proc = processMap[finished];
1493 typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1494 assert(infoIter != messageInformation_.end());
1495
1496 MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1497 assert(info.start_+info.size_ <= recvBufferSize);
1498
1499 MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1500 }else{
1501 std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1502 //success=0;
1503 }
1504 }
1505
1506 MPI_Status recvStatus;
1507
1508 // Wait for completion of sends
1509 for(i=0; i< messageInformation_.size(); i++)
1510 if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1511 std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1512 //success=0;
1513 }
1514 /*
1515 int globalSuccess;
1516 MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1517
1518 if(!globalSuccess)
1519 DUNE_THROW(CommunicationError, "A communication error occurred!");
1520 */
1521 delete[] processMap;
1522 delete[] sendRequests;
1523 delete[] recvRequests;
1524
1525 }
1526
1527#endif // DOXYGEN
1528
1530}
1531
1532#endif // HAVE_MPI
1533
1534#endif
A few common exception classes.
Standard Dune debug streams.
Classes describing a distributed indexset.
#define DUNE_THROW(E, m)
Definition exceptions.hh:218
DVVerbType dvverb(std::cout)
stream for very verbose output.
Definition stdstreams.hh:95
Dune namespace.
Definition alignedallocator.hh:13
Default exception class for I/O errors.
Definition exceptions.hh:231
Flag for marking indexed data structures where data at each index is of the same size.
Definition communicator.hh:110
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition communicator.hh:118
Default policy used for communicating an indexed type.
Definition communicator.hh:128
V::value_type IndexedType
The type we get at each index with operator[].
Definition communicator.hh:147
static int getSize(const V &, int index)
Get the number of primitive elements at that index.
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition communicator.hh:153
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
V Type
The type the policy is for.
Definition communicator.hh:140
Definition communicator.hh:173
Definition communicator.hh:175
VariableBlockVector< FieldVector< K, n >, A > Type
Definition communicator.hh:180
Error thrown if there was a problem with the communication.
Definition communicator.hh:195
GatherScatter default implementation that just copies data.
Definition communicator.hh:202
static void scatter(T &vec, const IndexedType &v, std::size_t i)
CommPolicy< T >::IndexedType IndexedType
Definition communicator.hh:203
static const IndexedType & gather(const T &vec, std::size_t i)
A communicator that uses buffers to gather and scatter the data to be send or received.
Definition communicator.hh:458
void backward(Data &data)
Backward send where target and source are the same.
BufferedCommunicator()
Constructor.
~BufferedCommunicator()
Destructor.
void forward(const Data &source, Data &dest)
Send from source to target.
void free()
Free the allocated memory (i.e. buffers and message information.
std::enable_if< std::is_same< SizeOne, typenameCommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
void build(const Data &source, const Data &target, const Interface &interface)
Build the buffers and information for the communication process.
void forward(Data &data)
Forward send where target and source are the same.
Manager class for the mapping between local indices and globally unique indices.
Definition indexset.hh:218
Base class of all classes representing a communication interface.
Definition parallel/interface.hh:35
Information describing an interface.
Definition parallel/interface.hh:101
Communication interface between remote and local indices.
Definition parallel/interface.hh:209
An index present on the local process.
Definition localindex.hh:35
The indices present on remote processes.
Definition remoteindices.hh:189
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition remoteindices.hh:215
LocalIndex::Attribute Attribute
The type of the attribute.
Definition remoteindices.hh:226
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition remoteindices.hh:221
Provides classes for building the communication interface between remote indices.