14#include "arcane/mesh/DynamicMeshMerger.h"
15#include "arcane/mesh/DynamicMesh.h"
17#include "arcane/utils/ITraceMng.h"
18#include "arcane/utils/ScopedPtr.h"
20#include "arcane/core/IParallelSuperMng.h"
21#include "arcane/core/IParallelMng.h"
22#include "arcane/core/ServiceBuilder.h"
23#include "arcane/core/IArcaneMain.h"
24#include "arcane/core/IMainFactory.h"
26#include "arcane/mesh/MeshExchanger.h"
27#include "arcane/mesh/MeshExchangeMng.h"
28#include "arcane/mesh/MeshExchanger.h"
29#include "arcane/core/IMeshExchanger.h"
30#include "arcane/core/IMeshExchangeMng.h"
31#include "arcane/core/IItemFamilyExchanger.h"
32#include "arcane/core/ItemPrinter.h"
33#include "arcane/core/MeshVisitor.h"
35#include "arccore/base/ReferenceCounter.h"
48class DynamicMeshMergerHelper
58 , m_local_rank(local_rank)
95 class LauncherThreadInfo
110 ReferenceCounter<ITraceMng> m_trace_mng;
111 Ref<IParallelMng> m_parallel_mng;
115 _doLaunch(LauncherThreadInfo* lti)
117 Int32 local_rank = lti->m_local_rank;
118 IParallelMng* pm = lti->m_parallel_mng.get();
119 ITraceMng* tm = lti->m_trace_mng.get();
121 tm->info() <<
"BEGIN LAUNCHER rank=" << local_rank <<
"!\n";
125 tm->info() <<
"END EXCHANGE!\n";
130 _Launcher(LauncherThreadInfo* lti)
132 ITraceMng* tm = lti->m_trace_mng.get();
136 catch (
const Exception& ex) {
137 tm->info() <<
"FATAL: " << tm->traceId()
139 <<
" stack=" << ex.stackTrace();
140 lti->m_has_error =
true;
143 catch (
const std::exception& ex) {
144 tm->info() <<
"FATAL: " << ex.what();
145 lti->m_has_error =
true;
149 tm->info() <<
"UNKNOWN FATAL";
150 lti->m_has_error =
true;
154 class MergeMeshExchanger
160 MergeMeshExchanger(DynamicMesh* mesh,
Int32 local_rank)
161 : MeshExchanger(mesh, mesh->subDomain()->timeStats())
162 , m_local_rank(local_rank)
173 bool computeExchangeInfos()
override
175 mesh()->traceMng()->info() <<
"OVERRIDE COMPUTE EXCHANGE INFOS";
177 UniqueArray<std::set<Int32>> items_to_send;
178 Int32 nb_rank = mesh()->parallelMng()->commSize();
179 items_to_send.resize(nb_rank);
181 for (IItemFamily* family : mesh()->itemFamilies()) {
182 IItemFamilyExchanger* family_exchanger = this->findExchanger(family);
183 if (m_local_rank != 0) {
184 items_to_send[0].clear();
185 ItemGroup all_items = family->allItems();
189 items_to_send[0].insert(iitem.itemLocalId());
195 family_exchanger->setExchangeItems(items_to_send);
197 family_exchanger->computeExchangeInfos();
200 _setNextPhase(ePhase::ProcessExchange);
209 class MergerExchangeMng
214 MergerExchangeMng(DynamicMesh* mesh,
Int32 local_rank)
215 : MeshExchangeMng(mesh)
217 , m_local_rank(local_rank)
222 IMeshExchanger* _createExchanger()
override
224 m_mesh->traceMng()->info() <<
"CREATE MERGE MESH_EXCHANGER";
225 MeshExchanger* ex =
new MergeMeshExchanger(m_mesh, m_local_rank);
240void DynamicMeshMergerHelper::
251 IParallelMng* old_parallel_mng = m_mesh->m_parallel_mng;
252 m_mesh->m_parallel_mng = this->m_parallel_mng;
255 info() <<
"DOING MERGE pm_rank=" << m_mesh->parallelMng()->commRank()
256 <<
" sd_part=" << m_mesh->meshPartInfo().partRank();
259 typedef Collection<DynamicMesh*> DynamicMeshCollection;
260 DynamicMeshCollection all_cascade_meshes = List<DynamicMesh*>();
261 all_cascade_meshes.add(m_mesh);
262 for (
Integer i = 0; i < m_mesh->m_child_meshes.size(); ++i)
263 all_cascade_meshes.add(m_mesh->m_child_meshes[i]);
265 MergerExchangeMng exchange_mng(m_mesh, m_local_rank);
266 IMeshExchanger* iexchanger = exchange_mng.beginExchange();
267 IMeshExchanger* mesh_exchanger = iexchanger;
270 if (mesh_exchanger->computeExchangeInfos()) {
271 pwarning() <<
"No load balance is performed";
272 exchange_mng.endExchange();
273 m_mesh->m_parallel_mng = old_parallel_mng;
278 mesh_exchanger->processExchange();
281 mesh_exchanger->removeNeededItems();
287 auto action = [](ItemGroup& group) {
288 if (group.internal()->hasComputeFunctor() || group.isLocalToSubDomain())
291 group.internal()->removeSuppressedItems();
293 for (DynamicMesh* mesh : all_cascade_meshes) {
294 meshvisitor::visitGroups(mesh, action);
299 mesh_exchanger->allocateReceivedItems();
303 for (DynamicMesh* mesh : all_cascade_meshes)
304 mesh->_internalEndUpdateInit(
true);
306 mesh_exchanger->updateItemGroups();
309 for (DynamicMesh* mesh : all_cascade_meshes)
310 mesh->_computeGroupSynchronizeInfos();
313 mesh_exchanger->updateVariables();
316 for (DynamicMesh* mesh : all_cascade_meshes) {
318 bool print_info = (mesh == m_mesh);
319 mesh->_internalEndUpdateFinal(print_info);
325 mesh_exchanger->finalizeExchange();
328 exchange_mng.endExchange();
332 m_mesh->m_parallel_mng = old_parallel_mng;
338void DynamicMeshMerger::
339mergeMeshes(ConstArrayView<DynamicMesh*> meshes)
341 UniqueArray<DynamicMesh*> all_meshes;
344 all_meshes.add(m_mesh);
345 for (
auto mesh : meshes)
346 all_meshes.add(mesh);
355 Int32 nb_local_rank = all_meshes.size();
357 String message_passing_service =
"SharedMemoryParallelMngContainerFactory";
358 ServiceBuilder<IParallelMngContainerFactory> sf(am->application());
359 auto pbf = sf.createReference(message_passing_service,
SB_AllowNull);
361 ARCANE_FATAL(
"Can not find service '{0}' implementing IParallelMngContainerFactory", message_passing_service);
362 Parallel::Communicator comm = m_mesh->parallelMng()->communicator();
363 Parallel::Communicator machine_comm = m_mesh->parallelMng()->machineCommunicator();
364 Ref<IParallelMngContainer> parallel_builder(pbf->_createParallelMngBuilder(nb_local_rank, comm, machine_comm));
366 IApplication* app = am->application();
367 UniqueArray<LauncherThreadInfo> launch_infos(nb_local_rank);
368 for (
Integer i = 0; i < nb_local_rank; ++i) {
369 LauncherThreadInfo& lti = launch_infos[i];
371 Int32 local_rank = i;
372 Int32 mesh_part_rank = all_meshes[i]->meshPartInfo().partRank();
373 String file_suffix = String::format(
"mm_{0}", mesh_part_rank);
374 lti.m_trace_mng = app->createAndInitializeTraceMng(m_mesh->traceMng(), file_suffix);
375 ITraceMng* tm = lti.m_trace_mng.get();
376 lti.m_parallel_mng = parallel_builder->_createParallelMng(local_rank, tm);
377 tm->setTraceId(String::format(
"Exchanger part={0} pm_rank={1}", mesh_part_rank, local_rank));
379 lti.m_mesh = all_meshes[i];
380 lti.m_local_rank = local_rank;
383 UniqueArray<std::thread*> gths(nb_local_rank);
384 for (
Integer i = 0; i < nb_local_rank; ++i) {
385 gths[i] =
new std::thread(_Launcher, &launch_infos[i]);
387 bool has_error =
false;
388 for (
Integer i = 0; i < nb_local_rank; ++i) {
390 if (launch_infos[i].m_has_error)
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
static IArcaneMain * arcaneMain()
Interface of the parallelism manager for a subdomain.
@ II_NeedRemove
The entity must be removed.
TraceAccessor(ITraceMng *m)
Constructs an accessor via the trace manager m.
TraceMessage info() const
Flow for an information message.
TraceMessage pwarning() const
Implementation of a mesh.
Interface for the mesh exchange manager between subdomains.
Information for a mesh exchange between sub-domains.
@ SB_AllowNull
Allows the service to be absent.
Int32 Integer
Type representing an integer.
std::int32_t Int32
Signed integer type of 32 bits.