14#include "arcane/mesh/DynamicMeshMerger.h"
15#include "arcane/mesh/DynamicMesh.h"
17#include "arcane/utils/ITraceMng.h"
18#include "arcane/utils/ScopedPtr.h"
20#include "arcane/IParallelSuperMng.h"
21#include "arcane/IParallelMng.h"
22#include "arcane/ServiceBuilder.h"
23#include "arcane/IArcaneMain.h"
24#include "arcane/IMainFactory.h"
26#include "arcane/mesh/MeshExchanger.h"
27#include "arcane/mesh/MeshExchangeMng.h"
28#include "arcane/mesh/MeshExchanger.h"
29#include "arcane/IMeshExchanger.h"
30#include "arcane/IMeshExchangeMng.h"
31#include "arcane/IItemFamilyExchanger.h"
32#include "arcane/ItemPrinter.h"
33#include "arcane/MeshVisitor.h"
35#include "arccore/base/ReferenceCounter.h"
48class DynamicMeshMergerHelper
54 :
TraceAccessor(tm), m_mesh(
mesh), m_parallel_mng(pm), m_local_rank(local_rank) {}
86class LauncherThreadInfo
89 LauncherThreadInfo() : m_mesh(nullptr), m_local_rank(-1), m_has_error(false){}
94 ReferenceCounter<ITraceMng> m_trace_mng;
95 Ref<IParallelMng> m_parallel_mng;
99_doLaunch(LauncherThreadInfo* lti)
101 Int32 local_rank = lti->m_local_rank;
102 IParallelMng* pm = lti->m_parallel_mng.get();
103 ITraceMng* tm = lti->m_trace_mng.get();
105 tm->info() <<
"BEGIN LAUNCHER rank=" << local_rank <<
"!\n";
109 tm->info() <<
"END EXCHANGE!\n";
114_Launcher(LauncherThreadInfo* lti)
116 ITraceMng* tm = lti->m_trace_mng.get();
120 catch(
const Exception& ex){
121 tm->info() <<
"FATAL: " << tm->traceId()
123 <<
" stack=" << ex.stackTrace();
124 lti->m_has_error =
true;
127 catch(
const std::exception& ex){
128 tm->info() <<
"FATAL: " << ex.what();
129 lti->m_has_error =
true;
133 tm->info() <<
"UNKNOWN FATAL";
134 lti->m_has_error =
true;
138class MergeMeshExchanger
143 MergeMeshExchanger(DynamicMesh* mesh,
Int32 local_rank)
144 : MeshExchanger(mesh,mesh->subDomain()->timeStats()), m_local_rank(local_rank){}
152 bool computeExchangeInfos()
override
154 mesh()->traceMng()->info() <<
"OVERRIDE COMPUTE EXCHANGE INFOS";
156 UniqueArray<std::set<Int32>> items_to_send;
157 Int32 nb_rank = mesh()->parallelMng()->commSize();
158 items_to_send.resize(nb_rank);
160 for( IItemFamily* family : mesh()->itemFamilies() ){
161 IItemFamilyExchanger* family_exchanger = this->findExchanger(family);
162 if (m_local_rank!=0){
163 items_to_send[0].clear();
164 ItemGroup all_items = family->allItems();
168 items_to_send[0].insert(iitem.itemLocalId());
174 family_exchanger->setExchangeItems(items_to_send);
176 family_exchanger->computeExchangeInfos();
179 _setNextPhase(ePhase::ProcessExchange);
186class MergerExchangeMng
187:
public MeshExchangeMng
190 MergerExchangeMng(DynamicMesh* mesh,
Int32 local_rank)
191 : MeshExchangeMng(mesh), m_mesh(mesh), m_local_rank(local_rank){}
193 IMeshExchanger* _createExchanger()
override
195 m_mesh->traceMng()->info() <<
"CREATE MERGE MESH_EXCHANGER";
196 MeshExchanger* ex =
new MergeMeshExchanger(m_mesh,m_local_rank);
209void DynamicMeshMergerHelper::
220 IParallelMng* old_parallel_mng = m_mesh->m_parallel_mng;
221 m_mesh->m_parallel_mng = this->m_parallel_mng;
224 info() <<
"DOING MERGE pm_rank=" << m_mesh->parallelMng()->commRank()
225 <<
" sd_part=" << m_mesh->meshPartInfo().partRank();
228 typedef Collection<DynamicMesh*> DynamicMeshCollection;
229 DynamicMeshCollection all_cascade_meshes = List<DynamicMesh*>();
230 all_cascade_meshes.add(m_mesh);
231 for(
Integer i=0;i<m_mesh->m_child_meshes.size();++i)
232 all_cascade_meshes.add(m_mesh->m_child_meshes[i]);
234 MergerExchangeMng exchange_mng(m_mesh,m_local_rank);
235 IMeshExchanger* iexchanger = exchange_mng.beginExchange();
236 IMeshExchanger* mesh_exchanger = iexchanger;
239 if (mesh_exchanger->computeExchangeInfos()){
240 pwarning() <<
"No load balance is performed";
241 exchange_mng.endExchange();
242 m_mesh->m_parallel_mng = old_parallel_mng;
247 mesh_exchanger->processExchange();
250 mesh_exchanger->removeNeededItems();
256 auto action = [](ItemGroup& group)
258 if (group.internal()->hasComputeFunctor() || group.isLocalToSubDomain())
261 group.internal()->removeSuppressedItems();
263 for( DynamicMesh* mesh : all_cascade_meshes ){
264 meshvisitor::visitGroups(mesh,action);
269 mesh_exchanger->allocateReceivedItems();
273 for( DynamicMesh* mesh : all_cascade_meshes )
274 mesh->_internalEndUpdateInit(
true);
276 mesh_exchanger->updateItemGroups();
279 for( DynamicMesh* mesh : all_cascade_meshes )
280 mesh->_computeGroupSynchronizeInfos();
283 mesh_exchanger->updateVariables();
286 for( DynamicMesh* mesh : all_cascade_meshes ){
288 bool print_info = (mesh==m_mesh);
289 mesh->_internalEndUpdateFinal(print_info);
295 mesh_exchanger->finalizeExchange();
298 exchange_mng.endExchange();
302 m_mesh->m_parallel_mng = old_parallel_mng;
308void DynamicMeshMerger::
309mergeMeshes(ConstArrayView<DynamicMesh*> meshes)
311 UniqueArray<DynamicMesh*> all_meshes;
314 all_meshes.add(m_mesh);
315 for(
auto mesh : meshes )
316 all_meshes.add(mesh);
325 Int32 nb_local_rank = all_meshes.size();
327 String message_passing_service =
"SharedMemoryParallelMngContainerFactory";
328 ServiceBuilder<IParallelMngContainerFactory> sf(am->application());
329 auto pbf = sf.createReference(message_passing_service,
SB_AllowNull);
331 ARCANE_FATAL(
"Can not find service '{0}' implementing IParallelMngContainerFactory",message_passing_service);
332 Parallel::Communicator comm = m_mesh->parallelMng()->communicator();
333 Ref<IParallelMngContainer> parallel_builder(pbf->_createParallelMngBuilder(nb_local_rank,comm));
335 IApplication* app = am->application();
336 UniqueArray<LauncherThreadInfo> launch_infos(nb_local_rank);
337 for(
Integer i=0; i<nb_local_rank; ++i ){
338 LauncherThreadInfo& lti = launch_infos[i];
340 Int32 local_rank = i;
341 Int32 mesh_part_rank = all_meshes[i]->meshPartInfo().partRank();
342 String file_suffix = String::format(
"mm_{0}",mesh_part_rank);
343 lti.m_trace_mng = app->createAndInitializeTraceMng(m_mesh->traceMng(),file_suffix);
344 ITraceMng* tm = lti.m_trace_mng.get();
345 lti.m_parallel_mng = parallel_builder->_createParallelMng(local_rank,tm);
346 tm->setTraceId(String::format(
"Exchanger part={0} pm_rank={1}",mesh_part_rank,local_rank));
348 lti.m_mesh = all_meshes[i];
349 lti.m_local_rank = local_rank;
352 UniqueArray<std::thread*> gths(nb_local_rank);
353 for(
Integer i=0; i<nb_local_rank; ++i ){
354 gths[i] =
new std::thread(_Launcher,&launch_infos[i]);
356 bool has_error =
false;
357 for(
Integer i=0; i<nb_local_rank; ++i ){
359 if (launch_infos[i].m_has_error)
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
static IArcaneMain * arcaneMain()
Interface du gestionnaire de parallélisme pour un sous-domaine.
Interface du gestionnaire de traces.
@ II_NeedRemove
L'entité doit être supprimé
Classe d'accès aux traces.
TraceAccessor(ITraceMng *m)
Construit un accesseur via le gestionnaire de trace m.
TraceMessage info() const
Flot pour un message d'information.
TraceMessage pwarning() const
Implémentation d'un maillage.
Informations pour un échange de maillage entre sous-domaines.
@ SB_AllowNull
Autorise l'absence du service.
Int32 Integer
Type représentant un entier.
std::int32_t Int32
Type entier signé sur 32 bits.