14#include "arcane/mesh/DynamicMeshMerger.h"
15#include "arcane/mesh/DynamicMesh.h"
17#include "arcane/utils/ITraceMng.h"
18#include "arcane/utils/ScopedPtr.h"
20#include "arcane/IParallelSuperMng.h"
21#include "arcane/IParallelMng.h"
22#include "arcane/ServiceBuilder.h"
23#include "arcane/IArcaneMain.h"
24#include "arcane/IMainFactory.h"
26#include "arcane/mesh/MeshExchanger.h"
27#include "arcane/mesh/MeshExchangeMng.h"
28#include "arcane/mesh/MeshExchanger.h"
29#include "arcane/IMeshExchanger.h"
30#include "arcane/IMeshExchangeMng.h"
31#include "arcane/IItemFamilyExchanger.h"
32#include "arcane/ItemPrinter.h"
33#include "arcane/MeshVisitor.h"
35#include "arccore/base/ReferenceCounter.h"
54 :
TraceAccessor(
tm), m_mesh(mesh), m_parallel_mng(pm), m_local_rank(local_rank) {}
// Launch context for one worker thread: one instance per local rank is filled in
// by the merger and its address is passed to the thread entry point.
// Only part of the class is visible in this excerpt.
86class LauncherThreadInfo
// Default state: no mesh, local rank -1, no error recorded.
89 LauncherThreadInfo() : m_mesh(nullptr), m_local_rank(-1), m_has_error(false){}
// Reference-counted trace manager; the launcher functions log through it.
94 ReferenceCounter<ITraceMng> m_trace_mng;
// Parallel manager handle read by the launcher through get().
95 Ref<IParallelMng> m_parallel_mng;
// Performs the work of one launch context: logs a begin message, builds a
// DynamicMeshMergerHelper from the trace manager, mesh, parallel manager and
// local rank stored in lti, then logs an end message.
// NOTE(review): the call made on the helper is on a line not visible in this excerpt.
99_doLaunch(LauncherThreadInfo* lti)
101 Int32 local_rank = lti->m_local_rank;
102 IParallelMng* pm = lti->m_parallel_mng.get();
103 ITraceMng* tm = lti->m_trace_mng.get();
105 tm->
info() <<
"BEGIN LAUNCHER rank=" << local_rank <<
"!\n";
107 DynamicMeshMergerHelper helper(lti->m_trace_mng.get(),lti->m_mesh,pm,local_rank);
109 tm->info() <<
"END EXCHANGE!\n";
// Entry point given to std::thread for one launch context.
// Each visible handler logs a message on the trace manager of the context and
// sets lti->m_has_error so the spawning code can detect the failure.
// NOTE(review): the try body is not visible in this excerpt; presumably it calls _doLaunch(lti) - confirm.
114_Launcher(LauncherThreadInfo* lti)
116 ITraceMng* tm = lti->m_trace_mng.get();
// Arcane Exception: log the trace id and the stack trace, then flag the error.
120 catch(
const Exception& ex){
121 tm->info() <<
"FATAL: " << tm->traceId()
123 <<
" stack=" << ex.stackTrace();
124 lti->m_has_error =
true;
// Standard exception: log what() and flag the error.
127 catch(
const std::exception& ex){
128 tm->info() <<
"FATAL: " << ex.what();
129 lti->m_has_error =
true;
// Last handler (its catch clause is not visible here; presumably a catch-all): log and flag the error.
133 tm->info() <<
"UNKNOWN FATAL";
134 lti->m_has_error =
true;
// MeshExchanger specialisation used for merging: it overrides
// computeExchangeInfos() to choose the items to send itself.
138class MergeMeshExchanger
139:
public MeshExchanger
// Builds the base exchanger from the mesh and the time statistics of its sub-domain,
// and stores the local rank used to decide what to send.
143 MergeMeshExchanger(DynamicMesh* mesh,
Int32 local_rank)
144 : MeshExchanger(mesh,mesh->subDomain()->timeStats()), m_local_rank(local_rank){}
// For every item family of the mesh, fills the per-rank send lists and asks the
// family exchanger to compute its exchange information, then moves to the
// ProcessExchange phase.
// NOTE(review): the return statement is not visible in this excerpt.
152 bool computeExchangeInfos()
override
154 mesh()->
traceMng()->
info() <<
"OVERRIDE COMPUTE EXCHANGE INFOS";
// One set of item local ids per destination rank.
156 UniqueArray<std::set<Int32>> items_to_send;
157 Int32 nb_rank = mesh()->parallelMng()->commSize();
158 items_to_send.resize(nb_rank);
160 for( IItemFamily* family : mesh()->itemFamilies() ){
161 IItemFamilyExchanger* family_exchanger = this->findExchanger(family);
// Only non-zero local ranks fill a send list, and only the one targeting rank 0.
162 if (m_local_rank!=0){
// Reset the list because the same array is reused for each family.
163 items_to_send[0].clear();
164 ItemGroup all_items = family->allItems();
// NOTE(review): the loop header is not visible; presumably this runs for each item of all_items.
168 items_to_send[0].insert(iitem.itemLocalId());
174 family_exchanger->setExchangeItems(items_to_send);
176 family_exchanger->computeExchangeInfos();
179 _setNextPhase(ePhase::ProcessExchange);
// MeshExchangeMng specialisation whose factory method creates a MergeMeshExchanger.
186class MergerExchangeMng
187:
public MeshExchangeMng
// Keeps the mesh and the local rank so they can be forwarded to the exchanger it creates.
190 MergerExchangeMng(DynamicMesh* mesh,
Int32 local_rank)
191 : MeshExchangeMng(mesh), m_mesh(mesh), m_local_rank(local_rank){}
// Factory override: logs, then allocates a MergeMeshExchanger for the stored mesh and rank.
// NOTE(review): the rest of the body (including the return) is not visible in this excerpt.
193 IMeshExchanger* _createExchanger()
override
195 m_mesh->traceMng()->info() <<
"CREATE MERGE MESH_EXCHANGER";
196 MeshExchanger* ex =
new MergeMeshExchanger(m_mesh,m_local_rank);
// Member function of DynamicMeshMergerHelper (its name is on a line not visible in this excerpt).
// It runs a full exchange sequence on m_mesh and its child meshes while the parallel
// manager of the mesh is temporarily replaced by the one held by this helper.
209void DynamicMeshMergerHelper::
// Save the current parallel manager of the mesh and install the helper one;
// the saved value is put back on both exit paths visible below.
220 IParallelMng* old_parallel_mng = m_mesh->m_parallel_mng;
221 m_mesh->m_parallel_mng = this->m_parallel_mng;
// Collect the mesh followed by each of its child meshes.
228 typedef Collection<DynamicMesh*> DynamicMeshCollection;
229 DynamicMeshCollection all_cascade_meshes = List<DynamicMesh*>();
230 all_cascade_meshes.add(m_mesh);
231 for(Integer i=0;i<m_mesh->m_child_meshes.size();++i)
232 all_cascade_meshes.add(m_mesh->m_child_meshes[i]);
// Start an exchange through the merge-specific exchange manager.
234 MergerExchangeMng exchange_mng(m_mesh,m_local_rank);
235 IMeshExchanger* iexchanger = exchange_mng.beginExchange();
236 IMeshExchanger* mesh_exchanger = iexchanger;
// When computeExchangeInfos() returns true: warn, close the exchange and restore the saved parallel manager.
// NOTE(review): an early return presumably follows on a line not visible here - confirm.
239 if (mesh_exchanger->computeExchangeInfos()){
240 pwarning() <<
"No load balance is performed";
241 exchange_mng.endExchange();
242 m_mesh->m_parallel_mng = old_parallel_mng;
247 mesh_exchanger->processExchange();
250 mesh_exchanger->removeNeededItems();
// Group visitor: the test below selects groups that have a compute functor or are local
// to the sub-domain; removeSuppressedItems() is called on group internals.
// NOTE(review): the statement guarded by the test is not visible; presumably those groups are skipped.
256 auto action = [](ItemGroup& group)
258 if (group.internal()->hasComputeFunctor() || group.isLocalToSubDomain())
261 group.internal()->removeSuppressedItems();
// Apply the visitor to the groups of every mesh in the list.
263 for( DynamicMesh* mesh : all_cascade_meshes ){
264 meshvisitor::visitGroups(mesh,action);
269 mesh_exchanger->allocateReceivedItems();
273 for( DynamicMesh* mesh : all_cascade_meshes )
274 mesh->_internalEndUpdateInit(true);
276 mesh_exchanger->updateItemGroups();
279 for( DynamicMesh* mesh : all_cascade_meshes )
280 mesh->_computeGroupSynchronizeInfos();
283 mesh_exchanger->updateVariables();
286 for( DynamicMesh* mesh : all_cascade_meshes ){
// The flag passed to the final update step is true only for the main mesh.
288 bool print_info = (mesh==m_mesh);
289 mesh->_internalEndUpdateFinal(print_info);
295 mesh_exchanger->finalizeExchange();
298 exchange_mng.endExchange();
// Put back the parallel manager saved at the beginning.
302 m_mesh->m_parallel_mng = old_parallel_mng;
// Merges the given meshes with m_mesh by running one launcher thread per mesh,
// each thread having its own trace manager and parallel manager.
// meshes: additional meshes; m_mesh is always placed first, so it gets local rank 0.
// NOTE(review): the end of this function is not visible in this excerpt.
308void DynamicMeshMerger::
309mergeMeshes(ConstArrayView<DynamicMesh*> meshes)
311 UniqueArray<DynamicMesh*> all_meshes;
314 all_meshes.add(m_mesh);
315 for(
auto mesh : meshes )
316 all_meshes.
add(mesh);
// One local rank per mesh.
325 Int32 nb_local_rank = all_meshes.size();
// Look up the factory service by name; SB_AllowNull makes the lookup return a null
// reference instead of failing, so the missing-service case is reported below.
327 String message_passing_service =
"SharedMemoryParallelMngContainerFactory";
328 ServiceBuilder<IParallelMngContainerFactory> sf(am->application());
329 auto pbf = sf.createReference(message_passing_service,
SB_AllowNull);
// NOTE(review): the condition guarding this fatal error is not visible; presumably a null check on pbf.
331 ARCANE_FATAL(
"Can not find service '{0}' implementing IParallelMngContainerFactory",message_passing_service);
// Container from which one parallel manager per local rank is created (comm is defined on a line not visible here).
333 Ref<IParallelMngContainer> parallel_builder(pbf->_createParallelMngBuilder(nb_local_rank,comm));
335 IApplication* app = am->application();
// Fill one launch context per local rank.
336 UniqueArray<LauncherThreadInfo> launch_infos(nb_local_rank);
337 for( Integer i=0; i<nb_local_rank; ++i ){
338 LauncherThreadInfo& lti = launch_infos[i];
340 Int32 local_rank = i;
// The trace file suffix and the trace id use the part rank of the mesh.
341 Int32 mesh_part_rank = all_meshes[i]->meshPartInfo().partRank();
342 String file_suffix = String::format(
"mm_{0}",mesh_part_rank);
343 lti.m_trace_mng = app->createAndInitializeTraceMng(m_mesh->
traceMng(),file_suffix);
344 ITraceMng* tm = lti.m_trace_mng.get();
345 lti.m_parallel_mng = parallel_builder->_createParallelMng(local_rank,tm);
346 tm->setTraceId(String::format(
"Exchanger part={0} pm_rank={1}",mesh_part_rank,local_rank));
348 lti.m_mesh = all_meshes[i];
349 lti.m_local_rank = local_rank;
// Start one thread per local rank, each running _Launcher on its own context.
// The thread objects are allocated with new.
// NOTE(review): the matching join and delete are not visible in this excerpt.
352 UniqueArray<std::thread*> gths(nb_local_rank);
353 for( Integer i=0; i<nb_local_rank; ++i ){
354 gths[i] =
new std::thread(_Launcher,&launch_infos[i]);
// Check the error flag set by the launcher of each context.
356 bool has_error =
false;
357 for( Integer i=0; i<nb_local_rank; ++i ){
359 if (launch_infos[i].m_has_error)
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
static IArcaneMain * arcaneMain()
Interface du gestionnaire de parallélisme pour un sous-domaine.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
virtual Parallel::Communicator communicator() const =0
Communicateur MPI associé à ce gestionnaire.
virtual void barrier()=0
Effectue une barrière.
@ II_NeedRemove
L'entité doit être supprimée
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Implémentation d'un maillage.
ITraceMng * traceMng() override
Gestionnaire de message associé
IParallelMng * parallelMng() override
Gestionnaire de parallélisme.
const MeshPartInfo & meshPartInfo() const override
Informations sur les parties du maillage.
void endUpdate() override
Notifie l'instance de la fin de la modification du maillage.
Interface du gestionnaire de traces.
virtual TraceMessage info()=0
Flot pour un message d'information.
Classe d'accès aux traces.
TraceMessage pwarning() const
ITraceMng * traceMng() const
Gestionnaire de trace.
TraceMessage info() const
Flot pour un message d'information.
void add(ArrayView< T > lhs, ConstArrayView< T > copy_array)
Ajoute le tableau copy_array dans l'instance.
@ SB_AllowNull
Autorise l'absence du service.