Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
ArcaneMainBatch.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ArcaneMainBatch.cc (C) 2000-2026 */
9/* */
10/* Batch execution management. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/Iostream.h"
15#include "arcane/utils/Ptr.h"
16#include "arcane/utils/StdHeader.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/List.h"
19#include "arcane/utils/ApplicationInfo.h"
20#include "arcane/utils/NotSupportedException.h"
21#include "arcane/utils/FatalErrorException.h"
22#include "arcane/utils/String.h"
23#include "arcane/utils/ITraceMng.h"
24#include "arcane/utils/OStringStream.h"
25#include "arcane/utils/IMemoryInfo.h"
26#include "arcane/utils/Array.h"
27#include "arcane/utils/IFunctor.h"
28#include "arcane/utils/StringBuilder.h"
29#include "arcane/utils/ScopedPtr.h"
30#include "arcane/utils/ValueConvert.h"
31#include "arcane/utils/IProcessorAffinityService.h"
32#include "arcane/utils/ArgumentException.h"
33#include "arcane/utils/CStringUtils.h"
34#include "arcane/utils/ITraceMngPolicy.h"
35#include "arcane/utils/CommandLineArguments.h"
36#include "arcane/utils/CriticalSection.h"
37#include "arccore/common/internal/ParameterListPropertyReader.h"
38#include "arccore/common/internal/Property.h"
39
40#include "arcane/impl/ArcaneMain.h"
41#include "arcane/impl/ParallelReplication.h"
42
43#include "arcane/core/IIOMng.h"
44#include "arcane/core/ICodeService.h"
45#include "arcane/core/ISession.h"
46#include "arcane/core/Timer.h"
47#include "arcane/core/ISubDomain.h"
48#include "arcane/core/IApplication.h"
49#include "arcane/core/ITimeLoopMng.h"
50#include "arcane/core/ITimeStats.h"
51#include "arcane/core/SequentialSection.h"
52#include "arcane/core/IParallelSuperMng.h"
53#include "arcane/core/ITimeHistoryMng.h"
54#include "arcane/core/IDirectExecution.h"
55#include "arcane/core/IDirectSubDomainExecuteFunctor.h"
56#include "arcane/core/ICaseMng.h"
57#include "arcane/core/ServiceFinder2.h"
58#include "arcane/core/SubDomainBuildInfo.h"
59#include "arcane/core/IParallelMng.h"
60#include "arcane/core/IMainFactory.h"
61#include "arcane/core/ApplicationBuildInfo.h"
62#include "arcane/core/CaseDatasetSource.h"
63
64#include "arcane/core/ServiceUtils.h"
65
66#include "arcane/core/IVariableMng.h"
67#include "arcane/core/VariableCollection.h"
68#include "arcane/core/internal/IVariableMngInternal.h"
69
70#include "arcane/impl/ExecutionStatsDumper.h"
71#include "arcane/impl/TimeLoopReader.h"
72
73#include "arccore/common/accelerator/internal/RunnerInternal.h"
74
75#include <thread>
76
77/*---------------------------------------------------------------------------*/
78/*---------------------------------------------------------------------------*/
79
80namespace Arcane
81{
82
83/*---------------------------------------------------------------------------*/
84/*---------------------------------------------------------------------------*/
85
88{
89 ARCANE_DECLARE_PROPERTY_CLASS(ArcaneMainBatchProperties);
90
91 public:
92
93 Int32 m_max_iteration = 0;
94 bool m_is_continue = false;
96};
97
98/*---------------------------------------------------------------------------*/
99/*---------------------------------------------------------------------------*/
100
104class ArcaneMainBatch
105: public ArcaneMain
106{
107 public:
108
110 class SessionExec
111 {
113 class SubInfo
114 {
115 public:
116
117 SubInfo()
118 : m_sub_domain(nullptr)
119 , m_time_stats(nullptr)
120 , m_want_print_stats(false)
121 {}
122 ~SubInfo()
123 {
124 // ITimeStats must be destroyed first because it uses
125 // the TimerMng of IParallelMng.
126 delete m_time_stats;
127 //delete m_rank_parallel_mng;
128 //m_world_parallel_mng.reset();
129 // The subdomain is destroyed when the session ends
130 }
131 SubInfo(const SubInfo&) = delete;
132 void operator=(const SubInfo&) = delete;
133
134 public:
135
136 Ref<IParallelMng> m_world_parallel_mng;
137 Ref<IParallelMng> m_rank_parallel_mng;
138 ISubDomain* m_sub_domain;
139 ITimeStats* m_time_stats;
140 bool m_want_print_stats;
141 };
142
143 public:
144
145 SessionExec(ArcaneMainBatch* arcane_main, ISession* session, Int32 nb_local_rank)
146 : m_arcane_main(arcane_main)
147 , m_session(session)
149 , m_direct_test_name(m_arcane_main->m_direct_test_name)
150 , m_properties(m_arcane_main->m_properties)
151 , m_code_service(m_arcane_main->m_code_service)
152 , m_sub_infos(nb_local_rank)
153 , m_direct_sub_domain_execute_functor(m_arcane_main->_directExecuteFunctor())
154 {
155 const CaseDatasetSource& dataset_source = m_arcane_main->applicationBuildInfo().caseDatasetSource();
156 m_case_file = dataset_source.fileName();
157 m_case_bytes = dataset_source.content();
158 // The sub_infos for each thread are created in executeRank()
159 m_sub_infos.fill(nullptr);
160 }
162 {
163 for (Integer i = 0, n = m_sub_infos.size(); i < n; ++i)
164 delete m_sub_infos[i];
165 }
166
167 public:
168
169 // Collective over the process threads
170 void executeRank(Int32 local_rank);
171
172 private:
173
174 IApplication* _application() { return m_arcane_main->application(); }
175
176 private:
177
178 ArcaneMainBatch* m_arcane_main;
179 ISession* m_session;
181 String m_direct_test_name;
186 UniqueArray<SubInfo*> m_sub_infos;
187 IDirectSubDomainExecuteFunctor* m_direct_sub_domain_execute_functor;
188
189 private:
190
191 void _execDirectTest(IParallelMng* pm, const String& test_name, bool is_collective);
192 void _printStats(ISubDomain* sd, ITraceMng* trace, ITimeStats* time_stat);
193 void _createAndRunSubDomain(SubInfo* sub_info, Ref<IParallelMng> pm, Ref<IParallelMng> all_replica_pm, Int32 local_rank);
194 };
195
196 class ExecFunctor
197 : public IFunctor
198 {
199 public:
200
201 ExecFunctor(SessionExec* session_exec, Int32 local_rank)
202 : m_session_exec(session_exec)
203 , m_local_rank(local_rank)
204 {
205 }
206
207 public:
208
209 void executeFunctor() override
210 {
211 m_session_exec->executeRank(m_local_rank);
212 }
213
214 private:
215
216 SessionExec* m_session_exec;
217 Int32 m_local_rank;
218 };
219
220 public:
221
223 ~ArcaneMainBatch() override;
224
225 void build() override;
226 void initialize() override;
227 int execute() override;
228 void doAbort() override;
229 bool parseArgs(StringList args) override;
230 void finalize() override;
231
232 private:
233
234 ISession* m_session = nullptr;
240 String m_direct_exec_name;
241 String m_direct_test_name;
243 SessionExec* m_session_exec = nullptr;
244
245 private:
246
247 bool _sequentialParseArgs(StringList args);
248};
249
250/*---------------------------------------------------------------------------*/
251/*---------------------------------------------------------------------------*/
252
253extern "C++" ARCANE_IMPL_EXPORT IArcaneMain*
254createArcaneMainBatch(const ApplicationInfo& app_info, IMainFactory* main_factory)
255{
256 return new ArcaneMainBatch(app_info, main_factory);
257}
258
259/*---------------------------------------------------------------------------*/
260/*---------------------------------------------------------------------------*/
261
262ArcaneMainBatch::
263ArcaneMainBatch(const ApplicationInfo& exe_info, IMainFactory* main_factory)
264: ArcaneMain(exe_info, main_factory)
265, m_init_only(false)
266, m_check_case_only(false)
267, m_has_sub_domain_threads(false)
268{
269}
270
271/*---------------------------------------------------------------------------*/
272/*---------------------------------------------------------------------------*/
273
275build()
276{
278}
279
280/*---------------------------------------------------------------------------*/
281/*---------------------------------------------------------------------------*/
282
288
289/*---------------------------------------------------------------------------*/
290/*---------------------------------------------------------------------------*/
291
292ArcaneMainBatch::
293~ArcaneMainBatch()
294{
295 // Normally finalize() should have been called to release the
296 // various objects (m_session, m_code_service, ...).
297 // If this is not the case, it is probably due to an exception and in
298 // this case we do nothing to avoid destroying objects whose internal
299 // state we do not know well.
300}
301
302/*---------------------------------------------------------------------------*/
303/*---------------------------------------------------------------------------*/
304
307{
308 if (ArcaneMain::parseArgs(args))
309 return true;
310
311 bool r = _sequentialParseArgs(args);
312 return r;
313}
314
315/*****************************************************************************
316 * The variables ARCANE_NB_SUB_DOMAIN & ARCANE_IDLE_SERVICE are less prioritized
317 * than the arguments passed to the executable.
318 *****************************************************************************/
319bool ArcaneMainBatch::
320_sequentialParseArgs(StringList args)
321{
322 ITraceMng* trace = _application()->traceMng();
323
324 String us_arcane_opt("-arcane_opt");
325 String us_init_only("init_only");
326 String us_check_case_only("check_case_only");
327 String us_continue("continue");
328 String us_max_iteration("max_iteration");
329 String us_casename("casename");
330 String us_direct_exec("direct_exec");
331 String us_direct_test("direct_test");
332 String us_direct_mesh("direct_mesh");
333 String us_tool_arg("tool_arg");
334 String us_direct_exec_mesh_arg("direct_exec_mesh_arg");
335 String us_nb_sub_domain("nb_sub_domain");
336 String us_nb_replication("nb_replication");
337 String us_idle_service("idle_service");
338
339 // Fills 'm_properties' based on command line parameters
340 // TODO: This mechanism has been available since January 2021. Eventually, it will be
341 // obsolete and the possibility of specifying options via '-arcane_opt' will be removed.
342 properties::readFromParameterList(applicationInfo().commandLineArguments().parameters(), m_properties);
343
344 CaseDatasetSource& dataset_source = _applicationBuildInfo().caseDatasetSource();
345 // Indicates if we have a dataset.
346 bool has_case_dataset_content = !(dataset_source.fileName().empty() && dataset_source.content().empty());
347 Integer nb_arg = args.count();
348 if (nb_arg < 2 && !has_case_dataset_content) {
349 trace->info() << "Usage: program input_data ; for more information: program -arcane_opt help";
350 trace->pfatal() << "No input data specified.";
351 }
352
353 StringList unknown_args;
354 StringBuilder tool_args_xml;
355 StringBuilder direct_exec_mesh_args_xml;
356 String tool_mesh;
357
358 String nb_sub_domain_str;
359 String nb_replication_str;
360 String idle_service_name = platform::getEnvironmentVariable("ARCANE_IDLE_SERVICE");
361 if (!idle_service_name.null())
362 m_properties.m_idle_service_name = idle_service_name;
363
364 for (Integer i = 1, s = nb_arg - 1; i < s; ++i) {
365 // cerr << "** ARGS ARGS " << i << ' ' << args[i] << '\n';
366 if (args[i] != us_arcane_opt) {
367 unknown_args.add(args[i]);
368 continue;
369 }
370 bool is_valid_opt = false;
371 ++i;
372 String str;
373 if (i < s)
374 str = args[i];
375 if (str == us_init_only) {
376 m_init_only = true;
377 is_valid_opt = true;
378 }
379 else if (str == us_check_case_only) {
380 m_check_case_only = true;
381 is_valid_opt = true;
382 }
383 else if (str == us_continue) {
384 m_properties.m_is_continue = true;
385 is_valid_opt = true;
386 }
387 else if (str == us_max_iteration) {
388 ++i;
389 if (i < s) {
390 m_properties.m_max_iteration = CStringUtils::toInteger(args[i].localstr());
391 //cerr << "** MAX ITER " << m_max_iteration << '\n';
392 is_valid_opt = true;
393 }
394 else
395 trace->pfatal() << "Option 'max_iteration' must specify the number of iterations";
396 }
397 // Case name.
398 else if (str == us_casename) {
399 ++i;
400 if (i < s) {
401 m_case_name = args[i];
402 is_valid_opt = true;
403 }
404 }
405 else if (str == us_direct_exec) {
406 ++i;
407 if (i < s) {
408 m_direct_exec_name = args[i];
409 //trace->info()<<"[ArcaneMainBatch] m_direct_exec_name="<<args[i];
410 is_valid_opt = true;
411 }
412 }
413 else if (str == us_direct_test) {
414 ++i;
415 if (i < s) {
416 m_direct_test_name = args[i];
417 //trace->info()<<"[ArcaneMainBatch] m_direct_test_name="<<args[i];
418 is_valid_opt = true;
419 }
420 }
421 else if (str == us_tool_arg || str == us_direct_exec_mesh_arg) {
422 ++i;
423 String arg;
424 String value;
425 if (i < s) {
426 arg = args[i];
427 }
428 ++i;
429 if (i < s) {
430 value = args[i];
431 is_valid_opt = true;
432 String to_add = String::format("<{0}>{1}</{2}>\n", arg, value, arg);
433 if (str == us_tool_arg)
434 tool_args_xml += to_add;
435 else if (str == us_direct_exec_mesh_arg)
436 direct_exec_mesh_args_xml += to_add;
437 }
438 }
439 else if (str == us_nb_sub_domain) {
440 ++i;
441 if (i < s) {
442 nb_sub_domain_str = args[i];
443 //trace->info()<<"[ArcaneMainBatch] nb_sub_domain_str="<<args[i];
444 is_valid_opt = true;
445 }
446 }
447 else if (str == us_nb_replication) {
448 ++i;
449 if (i < s) {
450 nb_replication_str = args[i];
451 //trace->info()<<"[ArcaneMainBatch] nb_sub_domain_str="<<args[i];
452 is_valid_opt = true;
453 }
454 }
455 else if (str == us_idle_service) {
456 ++i;
457 if (i < s) {
458 m_properties.m_idle_service_name = args[i];
459 //trace->info()<<"[ArcaneMainBatch] m_idle_service_name="<<args[i];
460 is_valid_opt = true;
461 }
462 }
463 if (!is_valid_opt) {
464 trace->pfatal() << "Unknown Arcane option <" << str << ">\n";
465 }
466 }
467
468 bool use_direct_test = (!m_direct_test_name.null());
469 bool use_direct_exec = (!m_direct_exec_name.null());
470
471 if (use_direct_test) {
472 }
473 else if (use_direct_exec) {
474 // In this case, the last argument of the command line is
475 // the mesh name.
476 tool_mesh = args[nb_arg - 1];
477 dataset_source.setFileName("Dummy.arc");
478 }
479 else {
480 // The case name is contained in the last argument of the command line.
481 // We take this argument unless a filename has already been set before
482 // initializing Arcane.
483 if (dataset_source.fileName().empty() && dataset_source.content().empty())
484 dataset_source.setFileName(args[nb_arg - 1]);
485 }
486
487 if (!nb_sub_domain_str.null()) {
488 Int32 nb_sub_domain = 0;
489 bool is_bad = builtInGetValue(nb_sub_domain, nb_sub_domain_str);
490 if (is_bad || nb_sub_domain <= 0) {
491 trace->pfatal() << "Invalid number of subdomains : " << nb_sub_domain;
492 }
493 trace->info() << "Use '" << nb_sub_domain << "' subdomains";
494 _applicationBuildInfo().setNbProcessusSubDomain(nb_sub_domain);
495 }
496
497 if (!nb_replication_str.null()) {
498 Int32 nb_replication = 0;
499 bool is_bad = builtInGetValue(nb_replication, nb_replication_str);
500 if (is_bad || nb_replication < 0) {
501 trace->pfatal() << "Invalid number of replication : " << nb_replication;
502 }
503 trace->info() << "Use replication of subdomains nb_replication=" << nb_replication;
504 _applicationBuildInfo().setNbReplicationSubDomain(nb_replication);
505 }
506
507 if (_applicationBuildInfo().nbReplicationSubDomain() != 0 && _applicationBuildInfo().nbProcessusSubDomain() != 0)
508 trace->pfatal() << "The subdomains number of replication and restriction options are incompatible.";
509
510 if (!use_direct_test) {
511 String case_file = dataset_source.fileName();
512 //trace->info()<<"[ArcaneMainBatch] !use_direct_test, getCodeService";
513 m_code_service = _application()->getCodeService(case_file);
514
515 if (!m_code_service) {
516 trace->info() << "The file `" << case_file << "' is not a known file type.";
517 case_file = args[nb_arg - 2];
518
519 m_code_service = _application()->getCodeService(case_file);
520 if (!m_code_service) {
521 trace->pfatal() << "File extension not valid.";
522 }
523 }
524 }
525
526 if (use_direct_exec) {
527 //trace->info()<<"[ArcaneMainBatch] use_direct_test!";
528 // Analyzes the arguments corresponding to direct execution options
529 // and builds an XML file from them.
530 StringBuilder s;
531 s += "<?xml version=\"1.0\"?>\n";
532 s += "<case codename=\"ArcaneDriver\" xml:lang=\"en\" codeversion=\"1.0\">";
533 s += " <arcane>\n";
534 s += " <title>DirectExec</title>\n";
535 s += " <description>DirectExec</description>\n";
536 s += " <timeloop>ArcaneDirectExecutionLoop</timeloop>\n";
537 s += " </arcane>\n";
538 s += " <meshes>\n";
539 s += " <mesh>\n";
540 s += String::format(" <filename>{0}</filename>\n", tool_mesh);
541 s += direct_exec_mesh_args_xml;
542 s += " </mesh>\n";
543 s += " </meshes>\n";
544 s += " <arcane-direct-execution>\n";
545 s += String::format(" <tool name='{0}'>\n", m_direct_exec_name);
546 s += tool_args_xml;
547 s += " </tool>\n";
548 s += " </arcane-direct-execution>\n";
549 s += "</case>\n";
550 dataset_source.setFileName("(None)");
551 String buf = s;
552 dataset_source.setContent(buf.utf8());
553 trace->info() << "Direct exec xml file=" << s;
554 }
555
556 if (m_code_service.get()) {
557 bool is_bad = m_code_service->parseArgs(unknown_args);
558 if (is_bad)
559 return true;
560 }
561
562 if (!unknown_args.empty()) {
563 trace->info() << "Unknown command line option: " << unknown_args[0];
564 }
565
566 return false;
567}
568
569/*---------------------------------------------------------------------------*/
570/*---------------------------------------------------------------------------*/
571
572namespace
573{
574 struct LaunchThreadInfo
575 {
576 ArcaneMainBatch* arcane_main;
577 ArcaneMainBatch::SessionExec* session_exec;
578 IApplication* application;
579 Int32 thread_index;
580 };
581} // namespace
582
583/*---------------------------------------------------------------------------*/
584/*---------------------------------------------------------------------------*/
585
586/*
587 * This function is called when a thread is created.
588 */
589void _ThreadWrapper(LaunchThreadInfo* lti)
590{
591 ArcaneMainBatch* amb = lti->arcane_main;
592 IApplication* main_app = lti->application;
593 ArcaneMainBatch::ExecFunctor functor(lti->session_exec, lti->thread_index);
594 bool clean_abort = false;
595 bool is_master = lti->thread_index == 0;
596 int r = ArcaneMain::callFunctorWithCatchedException(&functor, amb, &clean_abort, is_master);
597 if (r != 0 && !clean_abort) {
598 // The thread has finished but since it is the only one that crashed,
599 // it is possible that the others are blocked.
600 // In this case, we perform an abort to prevent blocking
601 // TODO: try to kill the other threads correctly.
602 if (main_app) {
603 IParallelSuperMng* psm = main_app->parallelSuperMng();
604 psm->tryAbort();
605 }
606 }
607}
608
609/*---------------------------------------------------------------------------*/
610/*---------------------------------------------------------------------------*/
611
613execute()
614{
615 ITraceMng* trace = _application()->traceMng();
616
617 if (m_code_service.get())
618 m_session = m_code_service->createSession();
619
620 IParallelSuperMng* psm = _application()->parallelSuperMng();
621 Int32 nb_total_rank = psm->commSize();
622 const Integer nb_wanted_sub_domain = applicationBuildInfo().nbReplicationSubDomain();
623 CaseDatasetSource& dataset_source = _applicationBuildInfo().caseDatasetSource();
624 if (nb_wanted_sub_domain > nb_total_rank)
625 ARCANE_THROW(ArgumentException, "Number of subdomain '{0}' > number of allocated cores '{1}",
626 nb_wanted_sub_domain, nb_total_rank);
627
628 Integer nb_local_rank = psm->nbLocalSubDomain();
629 trace->info() << "NB_LOCAL_RANK=" << nb_local_rank;
630 if (nb_local_rank >= 1)
632 int return_value = 0;
633
634 // Reading dataset data.
635 if (dataset_source.content().empty() && m_direct_test_name.null()) {
636 String case_file = dataset_source.fileName();
637 trace->info() << "Reading input data '" << case_file << "'";
638 IIOMng* io_mng = _application()->ioMng();
639 UniqueArray<Byte> case_bytes;
640 bool is_bad = io_mng->collectiveRead(case_file, case_bytes);
641 if (is_bad)
642 ARCANE_THROW(ParallelFatalErrorException, "Cannot read input data file '{0}'", case_file);
643 dataset_source.setContent(case_bytes);
644 }
645
646 m_session_exec = new SessionExec(this, m_session, nb_local_rank);
647
648 UniqueArray<LaunchThreadInfo> thinfo(nb_local_rank);
649 for (Integer i = 0; i < nb_local_rank; ++i) {
650 thinfo[i].arcane_main = this;
651 thinfo[i].session_exec = m_session_exec;
652 thinfo[i].application = _application();
653 thinfo[i].thread_index = i;
654 }
655
656 if (nb_local_rank > 1) {
657 UniqueArray<std::thread*> gths(nb_local_rank);
658 for (Integer i = 0; i < nb_local_rank; ++i) {
659 gths[i] = new std::thread(_ThreadWrapper, &thinfo[i]);
660 }
661 for (Integer i = 0; i < nb_local_rank; ++i) {
662 gths[i]->join();
663 delete gths[i];
664 }
665 }
666 else {
668 m_session_exec->executeRank(0);
669 }
670
671 // TODO: remove because it is useless as it always equals 0.
672 return return_value;
673}
674
675/*---------------------------------------------------------------------------*/
676/*---------------------------------------------------------------------------*/
677
678/*
679 * In mode with one subdomain per thread, this function is called
680 * by each thread (potentially concurrently) for its subdomain.
681 * \a local_rank indicates the local rank of the thread, which is between 0 and \a nb_local_sub_domain (as defined in execute()).
682 */
683void ArcaneMainBatch::SessionExec::
684executeRank(Int32 local_rank)
685{
686 // ATTENTION:
687 // This function must be reentrant...
688
689 auto sub_info = new SubInfo();
690 m_sub_infos[local_rank] = sub_info;
691
693 if (pas && m_has_sub_domain_threads) {
694 // CPU binding should only occur if requested and only if
695 // the total number of threads (across all processes)
696 // does not exceed the number of cores on the machine.
697 if (!platform::getEnvironmentVariable("ARCANE_BIND_THREADS").null()) {
698 ITraceMng* tm = _application()->traceMng();
699 tm->info() << "Binding threads";
700 pas->bindThread(local_rank);
701 }
702 }
703
704 // Creation of the parallelism manager for all allocated ranks.
705 IParallelSuperMng* psm = _application()->parallelSuperMng();
706 Ref<IParallelMng> world_pm = psm->internalCreateWorldParallelMng(local_rank);
707 sub_info->m_world_parallel_mng = world_pm;
708
709 if (!m_direct_test_name.null()) {
710 _execDirectTest(world_pm.get(), m_direct_test_name, true);
711 return;
712 }
713
714 // Checks if we want to run the calculation on a subset
715 // of the allocated resources. For now, it is only possible
716 // to choose a number of subdomains. If so, only
717 // ranks from 0 up to the desired number of subdomains minus 1 are
718 // used. Higher ranks do not have subdomains
719 // and instead use a service that implements IDirectExecution
720
721 // Creation of the execution statistics manager.
722 ITraceMng* trace = world_pm->traceMng();
723 String stat_name = "Rank";
724 stat_name = stat_name + world_pm->commRank();
725 ITimeStats* time_stat = _application()->mainFactory()->createTimeStats(world_pm->timerMng(), trace, stat_name);
726 sub_info->m_time_stats = time_stat;
727 time_stat->beginGatherStats();
728 world_pm->setTimeStats(time_stat);
729
730 Ref<IParallelMng> pm = world_pm;
731 Ref<IParallelMng> all_replica_pm = pm;
732
733 const Integer nb_wanted_sub_domain = _application()->applicationBuildInfo().nbProcessusSubDomain();
734 const Integer nb_wanted_replication = _application()->applicationBuildInfo().nbReplicationSubDomain();
735 // We are in parallel and we want fewer subdomains than allocated processes
736 if (world_pm->isParallel()) {
737 // For now, we cannot mix subdomain replication with
738 // a number of subdomains different from the number of allocated processors.
739 // TODO: when this is no longer the case, we will need to create an all_replica_pm that
740 // contains all subdomains and replicas.
741
742 if (nb_wanted_replication > 1) {
743 Int32 comm_size = world_pm->commSize();
744 Int32 nb_sub_part = comm_size / nb_wanted_replication;
745 trace->info() << "Using sub-domain replication nb_sub_part=" << nb_sub_part;
746 if ((comm_size % nb_wanted_replication) != 0)
747 ARCANE_FATAL("The number of replication '{0}' must be a common factor of the number of allocated cores '{1}",
748 nb_wanted_replication, comm_size);
749 // First, we create a communicator containing the replicas of each subdomain
750 // This communicator will therefore contain \a m_nb_wanted_replication objects
751 Ref<IParallelMng> replicate_pm;
752 trace->info() << "Building replicated parallel mng";
753 {
754 Int32UniqueArray kept_ranks(nb_wanted_replication);
755 for (Integer i_sd = 0; i_sd < nb_sub_part; ++i_sd) {
756 for (Int32 i = 0; i < nb_wanted_replication; ++i) {
757 kept_ranks[i] = i_sd + (i * nb_sub_part);
758 trace->info() << "Rank r=" << kept_ranks[i];
759 }
760 Ref<IParallelMng> new_pm = world_pm->createSubParallelMngRef(kept_ranks);
761 if (new_pm.get()) {
762 replicate_pm = new_pm;
763 replicate_pm->setTimeStats(time_stat);
764 trace->info() << " Building own replicated parallel mng";
765 }
766 else {
767 trace->info() << "!pm";
768 }
769 trace->flush();
770 }
771 }
772 if (!replicate_pm)
773 ARCANE_FATAL("Null replicated parallel mng");
774
775 // Now, we create an IParallelMng that corresponds to the set
776 // of ranks of a single replica. This IParallelMng will be assigned to
777 // the subdomain that will be created later.
778 trace->info() << "Building sub-domain parallel mng";
779 {
780 Int32UniqueArray kept_ranks(nb_sub_part);
781 for (Integer i_repl = 0; i_repl < nb_wanted_replication; ++i_repl) {
782 for (Int32 i = 0; i < nb_sub_part; ++i) {
783 kept_ranks[i] = i + (i_repl * nb_sub_part);
784 trace->info() << "Rank r=" << kept_ranks[i];
785 }
786 Ref<IParallelMng> new_pm = world_pm->createSubParallelMngRef(kept_ranks);
787 if (new_pm.get()) {
788 pm = new_pm;
789 if (nb_sub_part == 1) {
790 // We must take the sequential version to make the calculation
791 // appear sequential. This manager will be destroyed at the same time
792 // as \a new_pm
793 pm = new_pm->sequentialParallelMngRef();
794 }
795 trace->info() << "pm: setting time_stat & m_rank_parallel_mng for replica rank=" << i_repl;
796 trace->flush();
797 pm->setTimeStats(time_stat);
798 sub_info->m_rank_parallel_mng = new_pm;
799 auto pr = new ParallelReplication(i_repl, nb_wanted_replication, replicate_pm);
800 pm->setReplication(pr);
801 }
802 else {
803 trace->info() << "!pm";
804 trace->flush();
805 }
806 }
807 }
808 }
809 else if (nb_wanted_sub_domain != 0) {
810 const Int32 nb_sub_part = nb_wanted_sub_domain;
811 Int32UniqueArray kept_ranks(nb_sub_part);
812 for (Int32 i = 0; i < nb_sub_part; ++i)
813 kept_ranks[i] = i;
814 pm = world_pm->createSubParallelMngRef(kept_ranks);
815 if (pm.get()) {
816 trace->info() << "pm: setting time_stat & m_rank_parallel_mng";
817 trace->flush();
818 pm->setTimeStats(time_stat);
819 sub_info->m_rank_parallel_mng = pm;
820 all_replica_pm = pm;
821 }
822 else {
823 trace->info() << "!pm";
824 trace->flush();
825 }
826 }
827 }
828
829 bool print_stats = false;
830 ISubDomain* sub_domain = nullptr;
831
832 if (!pm) {
833 // If this is a rank that does not own a subdomain.
834 // In this case, execute the service given by 'm_idle_service_name'
835 // (if specified, otherwise do nothing)
836 trace->info() << "The rank doesn't own any subdomain!";
837 if (m_properties.m_idle_service_name.empty()) {
838 trace->info() << "No idle service specified";
839 trace->flush();
840 }
841 else {
842 trace->info() << "execDirectTest: " << m_properties.m_idle_service_name;
843 trace->flush();
844 _execDirectTest(world_pm.get(), m_properties.m_idle_service_name, false);
845 // We exit the directTest() via the broadcast(This is the end), so we must return
846 return;
847 }
848 print_stats = true;
849 }
850 else {
851 _createAndRunSubDomain(sub_info, pm, all_replica_pm, local_rank);
852 sub_domain = sub_info->m_sub_domain;
853 print_stats = sub_info->m_want_print_stats;
854 }
855
856 time_stat->endGatherStats();
857
858 if (print_stats && sub_domain) {
859 // Ensures everyone is here before stopping the profiling
860 // TODO: Since profiling is local to the process, it would be sufficient
861 // a priori to perform the barrier on the local IParallelMngs.
862 IParallelMng* pm = sub_domain->parallelMng();
863 pm->barrier();
864 if (local_rank == 0)
866 pm->barrier();
867 _printStats(sub_domain, trace, time_stat);
868
869 // We must destroy the shared memory variables here because their
870 // destruction is performed collectively.
871 // We cannot destroy all variables because some are
872 // used afterward (GlobalIteration, for example).
873 // If, one day, we put certain "Global" variables in shared memory,
874 // this part will cause problems.
875 sub_domain->variableMng()->_internalApi()->removeAllShMemVariables();
876 }
877
878 //BaseForm[Hash["This is the end", "CRC32"], 16]
879 // We inform the 'other' capabilities that they must leave now!
880 world_pm->broadcast(UniqueArray<unsigned long>(1, 0xdfeb699fl).view(), 0);
881}
882
883/*---------------------------------------------------------------------------*/
884/*---------------------------------------------------------------------------*/
885
886void ArcaneMainBatch::SessionExec::
887_createAndRunSubDomain(SubInfo* sub_info, Ref<IParallelMng> pm, Ref<IParallelMng> all_replica_pm, Int32 local_rank)
888{
889 // This is a rank that has a subdomain.
890 // It is created and execution begins.
891 SubDomainBuildInfo sdbi(pm, local_rank, all_replica_pm);
892 sdbi.setCaseFileName(m_case_file);
893 sdbi.setCaseContent(m_case_bytes);
894 ISubDomain* sub_domain = m_code_service->createAndLoadCase(m_session, sdbi);
895 sub_info->m_sub_domain = sub_domain;
896
897 ITraceMng* trace = _application()->traceMng();
898 ITraceMng* sd_trace = sub_domain->traceMng();
899 ITraceMngPolicy* trace_policy = _application()->getTraceMngPolicy();
900
901 // In case of replication, disable the output curves
902 // of the replicas.
903 trace->info() << "REPLICATION: rank=" << pm->replication()->replicationRank();
904
905 if (!pm->replication()->isMasterRank()) {
906 trace->info() << "Disable output curves for replicates.";
907 sub_domain->timeHistoryMng()->setDumpActive(false);
908 }
909
910 // TODO:
911 // Destroy the subdomain at the end of the function, but this requires
912 // modifying ISession to support the deletion
913 // of a subdomain (and then destroying ISession).
914
915 IProcessorAffinityService* pas = platform::getProcessorAffinityService();
916 if (pas) {
917 String cpu_set = pas->cpuSetString();
918 trace->info() << " CpuSet=" << cpu_set;
919 }
920
921 if (m_arcane_main->m_check_case_only) {
922 trace->info() << "Checking the input data";
923 // Initializes the time loop modules
924 {
925 TimeLoopReader stl(_application());
926 stl.readTimeLoops();
927 stl.registerTimeLoops(sub_domain);
928 stl.setUsedTimeLoop(sub_domain);
929 }
930 ICaseMng* cm = sub_domain->caseMng();
931 cm->readOptions(true);
932 }
933 else {
934 Timer init_timer(sub_domain, "InitTimer", Timer::TimerReal);
935 Timer loop_timer(sub_domain, "LoopTimer", Timer::TimerReal);
936
937 {
938 Timer::Action ts_action(sub_domain, "Init");
939 Timer::Sentry ts(&init_timer);
940
941 m_code_service->initCase(sub_domain, m_properties.m_is_continue);
942 }
943
944 if (m_properties.m_max_iteration > 0)
945 trace->info() << "Option 'max_iteration' activated with " << m_properties.m_max_iteration;
946
947 // Redirects signals.
948 // This is also done at initialization but here we might be in another
949 // thread and some libraries might have redirected signals
950 // during the init
951 {
952 CriticalSection cs(pm->threadMng());
953 ArcaneMain::redirectSignals();
954 }
955 int ret_compute_loop = 0;
956
957 IDirectExecution* direct_exec = sub_domain->directExecution();
958 if (direct_exec && direct_exec->isActive()) {
959 trace->info() << "Direct execution activated";
960 direct_exec->execute();
961 }
962 else if (m_arcane_main->m_init_only) {
963 trace->info() << "Option 'init_only' activated";
964 sub_info->m_want_print_stats = true;
965 }
966 else {
967 sub_info->m_want_print_stats = true;
968 Timer::Action ts_action(sub_domain, "Loop");
969 Timer::Sentry ts(&loop_timer);
970 // During the calculation loop, do not force the display of traces at a given level
971 // (which is done during application initialization).
972 trace_policy->setDefaultVerboseLevel(sd_trace, Trace::UNSPECIFIED_VERBOSITY_LEVEL);
973 if (m_direct_sub_domain_execute_functor) {
974 m_direct_sub_domain_execute_functor->setSubDomain(sub_domain);
975 m_direct_sub_domain_execute_functor->execute();
976 sub_domain->parallelMng()->barrier();
977 }
978 else {
979 ret_compute_loop = sub_domain->timeLoopMng()->doComputeLoop(m_properties.m_max_iteration);
980 if (ret_compute_loop < 0)
981 //TODO: DO NOT FILL THIS FUNCTION DIRECTLY BECAUSE IT DOES NOT WORK
982 // IN MULTI-THREAD
983 m_arcane_main->setErrorCode(8);
984 }
985 }
986 {
987 Real init_time = init_timer.totalTime();
988 Real loop_time = loop_timer.totalTime();
989 trace->info(0) << "TotalReel = " << (init_time + loop_time)
990 << " seconds (init: "
991 << init_time << " loop: " << loop_time << " )";
992 }
993 {
994 Timer::Action ts_action(sub_domain, "Exit");
995 trace_policy->setDefaultVerboseLevel(sd_trace, Trace::DEFAULT_VERBOSITY_LEVEL);
996 sub_domain->doExitModules();
997 }
998 }
999}
1000
1001/*---------------------------------------------------------------------------*/
1002/*---------------------------------------------------------------------------*/
1003
1004void ArcaneMainBatch::SessionExec::
1005_printStats(ISubDomain* sub_domain, ITraceMng* trace, ITimeStats* time_stat)
1006{
1007 ExecutionStatsDumper exec_dumper(trace);
1008 exec_dumper.dumpStats(sub_domain, time_stat);
1009}
1010
1011/*---------------------------------------------------------------------------*/
1012/*---------------------------------------------------------------------------*/
1013
1014void ArcaneMainBatch::SessionExec::
1015_execDirectTest(IParallelMng* world_pm, const String& test_name, bool is_collective)
1016{
1017 ITraceMng* trace = world_pm->traceMng();
1018 trace->info() << "Direct test name=" << test_name;
1019 trace->flush();
1020 ServiceFinder2T<IDirectExecution, IApplication> sf(_application(), _application());
1021 Ref<IDirectExecution> exec(sf.createReference(test_name));
1022 if (!exec) {
1023 String msg = String::format("Can not find 'IDirectExecution' service name '{0}'", test_name);
1024 if (is_collective)
1025 throw ParallelFatalErrorException(A_FUNCINFO, msg);
1026 else
1027 throw FatalErrorException(A_FUNCINFO, msg);
1028 }
1029 else {
1030 trace->info() << "Begin execution of direct service";
1031 trace->flush();
1032 }
1033 exec->setParallelMng(world_pm);
1034 exec->execute();
1035}
1036
1037/*---------------------------------------------------------------------------*/
1038/*---------------------------------------------------------------------------*/
1039
1041finalize()
1042{
1043 if (m_session) {
1044 m_session->endSession(errorCode());
1045 _application()->removeSession(m_session);
1046 delete m_session;
1047 m_session = nullptr;
1048 }
1049 m_code_service.reset();
1050 delete m_session_exec;
1051 m_session_exec = nullptr;
1052
1053 ITraceMng* tm = _application()->traceMng();
1055}
1056
1057/*---------------------------------------------------------------------------*/
1058/*---------------------------------------------------------------------------*/
1059
1061doAbort()
1062{
1063 if (m_session)
1064 m_session->doAbort();
1065 else {
1066 // To finish cleanly even if stopped before session creation
1067 // or after session destruction.
1069 if (psm)
1070 psm->tryAbort();
1071 }
1072}
1073
1074/*---------------------------------------------------------------------------*/
1075/*---------------------------------------------------------------------------*/
1076
1077template <typename V> void ArcaneMainBatchProperties::
1078_applyPropertyVisitor(V& p)
1079{
1080 auto b = p.builder();
1081
1082 p << b.addInt32("MaxIteration")
1083 .addDescription("Maximum number of iteration")
1084 .addCommandLineArgument("MaxIteration")
1085 .addGetter([](auto a) { return a.x.m_max_iteration; })
1086 .addSetter([](auto a) { a.x.m_max_iteration = a.v; });
1087
1088 p << b.addBool("Continue")
1089 .addDescription("True if continue from previous execution (restart)")
1090 .addCommandLineArgument("Continue")
1091 .addGetter([](auto a) { return a.x.m_is_continue; })
1092 .addSetter([](auto a) { a.x.m_is_continue = a.v; });
1093
1094 p << b.addString("IdleService")
1095 .addDescription("Name of the idle service for additionnal cores")
1096 .addCommandLineArgument("IdleService")
1097 .addGetter([](auto a) { return a.x.m_idle_service_name; })
1098 .addSetter([](auto a) { a.x.m_idle_service_name = a.v; });
1099}
1100
1101/*---------------------------------------------------------------------------*/
1102/*---------------------------------------------------------------------------*/
1103
1104ARCANE_REGISTER_PROPERTY_CLASS(ArcaneMainBatchProperties, ());
1105
1106/*---------------------------------------------------------------------------*/
1107/*---------------------------------------------------------------------------*/
1108
1109} // End namespace Arcane
1110
1111/*---------------------------------------------------------------------------*/
1112/*---------------------------------------------------------------------------*/
#define ARCANE_THROW(exception_class,...)
Macro for throwing an exception with formatting.
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
static void finalize(ITraceMng *tm)
Finalizes execution.
Definition Runner.cc:518
static void stopAllProfiling()
Stops all profiling activities.
Definition Runner.cc:506
CaseDatasetSource & caseDatasetSource()
Dataset source.
Application information.
Properties associated with ArcaneMain.
String m_idle_service_name
Service name for unused CPUs.
void executeFunctor() override
Executes the associated method.
Info per subdomain that must be destroyed at the end of execution.
Execution information for a session.
Ref< ICodeService > m_code_service
Code service.
bool m_has_sub_domain_threads
indicates if threads are used to manage subdomains
UniqueArray< std::byte > m_case_bytes
Content of the case dataset as an XML document.
String m_case_file
Name of the file containing the case.
const ArcaneMainBatchProperties m_properties
Execution properties.
Batch execution of a code.
String m_case_name
Case name.
void doAbort() override
Performs an abort.
Ref< ICodeService > m_code_service
Code service.
bool m_init_only
true if only initialization is performed.
ArcaneMainBatchProperties m_properties
Session.
void initialize() override
Initializes the instance. The instance is not usable until this method has been called.
int execute() override
Starts execution. This method only returns when the program exits.
bool m_check_case_only
true if only dataset verification is performed.
void finalize() override
Performs the last operations before instance destruction.
bool m_has_sub_domain_threads
indicates if threads are used to manage subdomains
void build() override
Constructs the class members. The instance is not usable until this method has been called....
bool parseArgs(StringList args) override
Parses arguments.
const ApplicationInfo & applicationInfo() const override
Executable information.
static int callFunctorWithCatchedException(IFunctor *functor, IArcaneMain *amain, bool *clean_abort, bool is_print=true)
Calls the functor functor while catching possible exceptions.
IApplication * application() const override
Application.
Definition ArcaneMain.h:332
void build() override
Constructs the class members. The instance is not usable until this method has been called....
const ApplicationBuildInfo & applicationBuildInfo() const override
Information to build the IApplication instance.
int errorCode() const override
Execution error code.
Definition ArcaneMain.h:322
void initialize() override
Initializes the instance. The instance is not usable until this method has been called.
bool parseArgs(StringList args) override
Parses arguments.
Source of a case dataset.
void setFileName(const String &name)
Sets the file name of the dataset.
String fileName() const
File name of the dataset.
void setContent(Span< const std::byte > bytes)
Sets the content of the dataset.
ByteConstSpan content() const
Content of the dataset.
Integer count() const
Number of elements in the collection.
Application interface.
virtual IParallelSuperMng * parallelSuperMng()=0
Supervisory parallelism manager.
Interface of the code management class.
Definition IArcaneMain.h:55
virtual ITraceMng * traceMng() const =0
Trace manager.
Interface of a functor to execute code directly after the creation of a subdomain without going throu...
Interface of the input/output manager.
Definition IIOMng.h:37
virtual bool collectiveRead(const String &filename, ByteArray &bytes)=0
Collective reading of a file.
Factory for Arcane classes.
Interface of the parallelism manager for a subdomain.
Abstract class of the parallelism supervisor.
virtual void tryAbort()=0
Attempts to abort.
virtual Int32 nbLocalSubDomain()=0
Number of subdomains to create locally.
virtual Int32 commSize() const =0
Returns the total number of processes used.
Interface of a CPU core affinity management service.
virtual void bindThread(Int32 cpu)=0
Constrains the current thread to stay on the core with index cpu.
Interface for a case execution session.
Definition ISession.h:38
Interface of the subdomain manager.
Definition ISubDomain.h:75
Interface managing execution time statistics.
Definition ITimeStats.h:44
virtual TraceMessage pfatal()=0
Stream for a parallel fatal error message.
virtual TraceMessage info()=0
Stream for an information message.
Exception when a 'parallel' fatal error is generated.
Reference to an instance.
bool empty() const
True if the string is empty (null or "").
Definition String.cc:317
@ TimerReal
Timer using real time.
Definition Timer.h:77
1D data vector with value semantics (STL style).
Integer toInteger(const char *str, bool *is_ok=0)
Converts the string str to an unsigned integer. If is_ok is not null, it is set to true if the conver...
IProcessorAffinityService * getProcessorAffinityService()
Service used for managing processor affinity.
String getEnvironmentVariable(const String &name)
Environment variable named name.
-- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature --
Int32 Integer
Type representing an integer.
List< String > StringList
Unicode string list.
Definition UtilsTypes.h:509
UniqueArray< Int32 > Int32UniqueArray
Dynamic 1D array of 32-bit integers.
Definition UtilsTypes.h:341
double Real
Type representing a real number.
std::int32_t Int32
Signed integer type of 32 bits.