Arcane  v3.16.2.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
ArcaneMainBatch.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* ArcaneMainBatch.cc (C) 2000-2025 */
9/* */
10/* Gestion de l'exécution en mode Batch. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/Iostream.h"
15#include "arcane/utils/Ptr.h"
16#include "arcane/utils/StdHeader.h"
17#include "arcane/utils/PlatformUtils.h"
18#include "arcane/utils/List.h"
19#include "arcane/utils/ApplicationInfo.h"
20#include "arcane/utils/NotSupportedException.h"
21#include "arcane/utils/FatalErrorException.h"
22#include "arcane/utils/String.h"
23#include "arcane/utils/ITraceMng.h"
24#include "arcane/utils/OStringStream.h"
25#include "arcane/utils/IMemoryInfo.h"
26#include "arcane/utils/Array.h"
27#include "arcane/utils/IFunctor.h"
28#include "arcane/utils/StringBuilder.h"
29#include "arcane/utils/ScopedPtr.h"
30#include "arcane/utils/ValueConvert.h"
31#include "arcane/utils/IProcessorAffinityService.h"
32#include "arcane/utils/ArgumentException.h"
33#include "arcane/utils/CStringUtils.h"
34#include "arcane/utils/ITraceMngPolicy.h"
35#include "arcane/utils/Property.h"
36#include "arcane/utils/ParameterListPropertyReader.h"
37#include "arcane/utils/CommandLineArguments.h"
38#include "arcane/utils/CriticalSection.h"
39
40#include "arcane/impl/ArcaneMain.h"
41#include "arcane/impl/ParallelReplication.h"
42
43#include "arcane/IIOMng.h"
44#include "arcane/ICodeService.h"
45#include "arcane/ISession.h"
46#include "arcane/Timer.h"
47#include "arcane/ISubDomain.h"
48#include "arcane/IApplication.h"
49#include "arcane/ITimeLoopMng.h"
50#include "arcane/ITimeStats.h"
51#include "arcane/SequentialSection.h"
52#include "arcane/IParallelSuperMng.h"
53#include "arcane/ITimeHistoryMng.h"
54#include "arcane/IDirectExecution.h"
55#include "arcane/IDirectSubDomainExecuteFunctor.h"
56#include "arcane/ICaseMng.h"
57#include "arcane/ServiceFinder2.h"
58#include "arcane/SubDomainBuildInfo.h"
59#include "arcane/IParallelMng.h"
60#include "arcane/IMainFactory.h"
61#include "arcane/ApplicationBuildInfo.h"
62#include "arcane/CaseDatasetSource.h"
63
64#include "arcane/ServiceUtils.h"
65
66#include "arcane/impl/ExecutionStatsDumper.h"
67#include "arcane/impl/TimeLoopReader.h"
68
69#include "arcane/accelerator/core/internal/RunnerInternal.h"
70
71#include <thread>
72
73/*---------------------------------------------------------------------------*/
74/*---------------------------------------------------------------------------*/
75
76namespace Arcane
77{
78
79/*---------------------------------------------------------------------------*/
80/*---------------------------------------------------------------------------*/
81
84{
85 ARCANE_DECLARE_PROPERTY_CLASS(ArcaneMainBatchProperties);
86 public:
87 Int32 m_max_iteration = 0;
88 bool m_is_continue = false;
90};
91
92/*---------------------------------------------------------------------------*/
93/*---------------------------------------------------------------------------*/
97class ArcaneMainBatch
98: public ArcaneMain
99{
100 public:
101
103 class SessionExec
104 {
106 class SubInfo
107 {
108 public:
109 SubInfo()
110 : m_sub_domain(nullptr), m_time_stats(nullptr), m_want_print_stats(false) {}
111 ~SubInfo()
112 {
113 // Il faut d'abord détruire la ITimeStats car il utilise
114 // les TimerMng des IParallelMng.
115 delete m_time_stats;
116 //delete m_rank_parallel_mng;
117 //m_world_parallel_mng.reset();
118 // Le sous-domaine est détruit lorsque la session se termine
119 }
120 SubInfo(const SubInfo&) = delete;
121 void operator=(const SubInfo&) = delete;
122 public:
123 Ref<IParallelMng> m_world_parallel_mng;
124 Ref<IParallelMng> m_rank_parallel_mng;
125 ISubDomain* m_sub_domain;
126 ITimeStats* m_time_stats;
127 bool m_want_print_stats;
128 };
129
130 public:
131 SessionExec(ArcaneMainBatch* arcane_main,ISession* session,Int32 nb_local_rank)
132 : m_arcane_main(arcane_main),
133 m_session(session),
135 m_direct_test_name(m_arcane_main->m_direct_test_name),
136 m_properties(m_arcane_main->m_properties),
137 m_code_service(m_arcane_main->m_code_service),
138 m_sub_infos(nb_local_rank),
139 m_direct_sub_domain_execute_functor(m_arcane_main->_directExecuteFunctor())
140 {
141 const CaseDatasetSource& dataset_source = m_arcane_main->applicationBuildInfo().caseDatasetSource();
142 m_case_file = dataset_source.fileName();
143 m_case_bytes = dataset_source.content();
144 // Les sub_infos de chaque thread sont créés executeRank()
145 m_sub_infos.fill(nullptr);
146 }
148 {
149 for( Integer i=0, n=m_sub_infos.size(); i<n; ++i )
150 delete m_sub_infos[i];
151 }
152 public:
153 // Collective sur les threads du processus
154 void executeRank(Int32 local_rank);
155 private:
156 IApplication* _application() { return m_arcane_main->application(); }
157 private:
158 ArcaneMainBatch* m_arcane_main;
159 ISession* m_session;
161 String m_direct_test_name;
166 UniqueArray<SubInfo*> m_sub_infos;
167 IDirectSubDomainExecuteFunctor* m_direct_sub_domain_execute_functor;
168 private:
169 void _execDirectTest(IParallelMng* pm,const String& test_name,bool is_collective);
170 void _printStats(ISubDomain* sd,ITraceMng* trace,ITimeStats* time_stat);
171 void _createAndRunSubDomain(SubInfo* sub_info,Ref<IParallelMng> pm,Ref<IParallelMng> all_replica_pm,Int32 local_rank);
172 };
173
174 class ExecFunctor
175 : public IFunctor
176 {
177 public:
178 ExecFunctor(SessionExec* session_exec,Int32 local_rank)
179 : m_session_exec(session_exec), m_local_rank(local_rank)
180 {
181 }
182 public:
183 void executeFunctor() override
184 {
185 m_session_exec->executeRank(m_local_rank);
186 }
187 private:
188 SessionExec* m_session_exec;
189 Int32 m_local_rank;
190 };
191
192 public:
193
195 ~ArcaneMainBatch() override;
196
197 void build() override;
198 void initialize() override;
199 int execute() override;
200 void doAbort() override;
201 bool parseArgs(StringList args) override;
202 void finalize() override;
203
204 private:
205
206 ISession* m_session = nullptr;
212 String m_direct_exec_name;
213 String m_direct_test_name;
215 SessionExec* m_session_exec = nullptr;
216
217 private:
218
219 bool _sequentialParseArgs(StringList args);
220};
221
222/*---------------------------------------------------------------------------*/
223/*---------------------------------------------------------------------------*/
224
225extern "C++" ARCANE_IMPL_EXPORT IArcaneMain*
226createArcaneMainBatch(const ApplicationInfo& app_info,IMainFactory* main_factory)
227{
228 return new ArcaneMainBatch(app_info,main_factory);
229}
230
231/*---------------------------------------------------------------------------*/
232/*---------------------------------------------------------------------------*/
233
234ArcaneMainBatch::
235ArcaneMainBatch(const ApplicationInfo& exe_info,IMainFactory* main_factory)
236: ArcaneMain(exe_info,main_factory)
237, m_init_only(false)
238, m_check_case_only(false)
239, m_has_sub_domain_threads(false)
240{
241}
242
243/*---------------------------------------------------------------------------*/
244/*---------------------------------------------------------------------------*/
245
247build()
248{
250}
251
252/*---------------------------------------------------------------------------*/
253/*---------------------------------------------------------------------------*/
254
260
261/*---------------------------------------------------------------------------*/
262/*---------------------------------------------------------------------------*/
263
264ArcaneMainBatch::
265~ArcaneMainBatch()
266{
267 // Normalement finalize() doit avoir été appelé pour libérer les
268 // différents objets (m_session, m_code_service, ...).
269 // Si ce n'est pas le cas, c'est probablement du à une exception et dans
270 // ce cas on ne fait rien pour éviter de détruire des objets dont on ne
271 // connait pas trop l'état interne.
272}
273
274/*---------------------------------------------------------------------------*/
275/*---------------------------------------------------------------------------*/
276
279{
280 if (ArcaneMain::parseArgs(args))
281 return true;
282
283 bool r = _sequentialParseArgs(args);
284 return r;
285}
286
287
288/*****************************************************************************
289 * Les variables ARCANE_NB_SUB_DOMAIN & ARCANE_IDLE_SERVICE sont moins prioritaires
290 * que les arguments passés à l'exécutable.
291 *****************************************************************************/
292bool ArcaneMainBatch::
293_sequentialParseArgs(StringList args)
294{
295 ITraceMng* trace = _application()->traceMng();
296
297 String us_arcane_opt("-arcane_opt");
298 String us_init_only("init_only");
299 String us_check_case_only("check_case_only");
300 String us_continue("continue");
301 String us_max_iteration("max_iteration");
302 String us_casename("casename");
303 String us_direct_exec("direct_exec");
304 String us_direct_test("direct_test");
305 String us_direct_mesh("direct_mesh");
306 String us_tool_arg("tool_arg");
307 String us_direct_exec_mesh_arg("direct_exec_mesh_arg");
308 String us_nb_sub_domain("nb_sub_domain");
309 String us_nb_replication("nb_replication");
310 String us_idle_service("idle_service");
311
312 // Remplit 'm_properties' en fonction des paramètres de la ligne de commande
313 // TODO: Ce mécanisme est disponible depuis janvier 2021. A terme, il faudra
314 // rendre obsolète et supprimer la possibilité de spécifier les options
315 // via '-arcane_opt'.
316 properties::readFromParameterList(applicationInfo().commandLineArguments().parameters(),m_properties);
317
318 CaseDatasetSource& dataset_source = _applicationBuildInfo().caseDatasetSource();
319 // Indique si on a un jeu de données.
320 bool has_case_dataset_content = !(dataset_source.fileName().empty() && dataset_source.content().empty());
321 Integer nb_arg = args.count();
322 if (nb_arg<2 && !has_case_dataset_content){
323 trace->info() << "Usage: programm input_data ; for more information: program -arcane_opt help";
324 trace->pfatal() << "No input data specified.";
325 }
326
327 StringList unknown_args;
328 StringBuilder tool_args_xml;
329 StringBuilder direct_exec_mesh_args_xml;
330 String tool_mesh;
331
332 String nb_sub_domain_str;
333 String nb_replication_str;
334 String idle_service_name = platform::getEnvironmentVariable("ARCANE_IDLE_SERVICE");
335 if (!idle_service_name.null())
336 m_properties.m_idle_service_name = idle_service_name;
337
338 for( Integer i=1, s=nb_arg-1; i<s; ++i ){
339 // cerr << "** ARGS ARGS " << i << ' ' << args[i] << '\n';
340 if (args[i]!=us_arcane_opt){
341 unknown_args.add(args[i]);
342 continue;
343 }
344 bool is_valid_opt = false;
345 ++i;
346 String str;
347 if (i<s)
348 str = args[i];
349 if (str==us_init_only){
350 m_init_only = true;
351 is_valid_opt = true;
352 }
353 else if (str==us_check_case_only){
354 m_check_case_only = true;
355 is_valid_opt = true;
356 }
357 else if (str==us_continue){
358 m_properties.m_is_continue = true;
359 is_valid_opt = true;
360 }
361 else if (str==us_max_iteration){
362 ++i;
363 if (i<s){
364 m_properties.m_max_iteration = CStringUtils::toInteger(args[i].localstr());
365 //cerr << "** MAX ITER " << m_max_iteration << '\n';
366 is_valid_opt = true;
367 }
368 else
369 trace->pfatal() << "Option 'max_iteration' must specify the number of iterations";
370 }
371 // Nom du cas.
372 else if (str==us_casename){
373 ++i;
374 if (i<s){
375 m_case_name = args[i];
376 is_valid_opt = true;
377 }
378 }
379 else if (str==us_direct_exec){
380 ++i;
381 if (i<s){
382 m_direct_exec_name = args[i];
383 //trace->info()<<"[ArcaneMainBatch] m_direct_exec_name="<<args[i];
384 is_valid_opt = true;
385 }
386 }
387 else if (str==us_direct_test){
388 ++i;
389 if (i<s){
390 m_direct_test_name = args[i];
391 //trace->info()<<"[ArcaneMainBatch] m_direct_test_name="<<args[i];
392 is_valid_opt = true;
393 }
394 }
395 else if (str==us_tool_arg || str==us_direct_exec_mesh_arg){
396 ++i;
397 String arg;
398 String value;
399 if (i<s){
400 arg = args[i];
401 }
402 ++i;
403 if (i<s){
404 value = args[i];
405 is_valid_opt = true;
406 String to_add = String::format("<{0}>{1}</{2}>\n",arg,value,arg);
407 if (str==us_tool_arg)
408 tool_args_xml += to_add;
409 else if (str==us_direct_exec_mesh_arg)
410 direct_exec_mesh_args_xml += to_add;
411 }
412 }
413 else if (str==us_nb_sub_domain){
414 ++i;
415 if (i<s){
416 nb_sub_domain_str = args[i];
417 //trace->info()<<"[ArcaneMainBatch] nb_sub_domain_str="<<args[i];
418 is_valid_opt = true;
419 }
420 }
421 else if (str==us_nb_replication){
422 ++i;
423 if (i<s){
424 nb_replication_str = args[i];
425 //trace->info()<<"[ArcaneMainBatch] nb_sub_domain_str="<<args[i];
426 is_valid_opt = true;
427 }
428 }
429 else if (str==us_idle_service){
430 ++i;
431 if (i<s){
432 m_properties.m_idle_service_name = args[i];
433 //trace->info()<<"[ArcaneMainBatch] m_idle_service_name="<<args[i];
434 is_valid_opt = true;
435 }
436 }
437 if (!is_valid_opt){
438 trace->pfatal() << "Unknown Arcane option <" << str << ">\n";
439 }
440 }
441
442 bool use_direct_test = (!m_direct_test_name.null());
443 bool use_direct_exec = (!m_direct_exec_name.null());
444
445 if (use_direct_test){
446 }
447 else if (use_direct_exec){
448 // Dans ce cas, le dernier argument de la ligne de commande est
449 // le nom du maillage.
450 tool_mesh = args[nb_arg-1];
451 dataset_source.setFileName("Dummy.arc");
452 }
453 else{
454 // Le nom du cas est contenu dans le dernier argument de la ligne
455 // de commande. On prend cet argument sauf si un nom de fichier
456 // a déjà été positionné avant d'initialiser Arcane.
457 if (dataset_source.fileName().empty() && dataset_source.content().empty())
458 dataset_source.setFileName(args[nb_arg-1]);
459 }
460
461 if (!nb_sub_domain_str.null()){
462 Int32 nb_sub_domain = 0;
463 bool is_bad = builtInGetValue(nb_sub_domain,nb_sub_domain_str);
464 if (is_bad || nb_sub_domain<=0){
465 trace->pfatal() << "Invalid number of subdomains : " << nb_sub_domain;
466 }
467 trace->info() << "Use '" << nb_sub_domain << "' subdomains";
468 _applicationBuildInfo().setNbProcessusSubDomain(nb_sub_domain);
469 }
470
471 if (!nb_replication_str.null()){
472 Int32 nb_replication = 0;
473 bool is_bad = builtInGetValue(nb_replication,nb_replication_str);
474 if (is_bad || nb_replication<0){
475 trace->pfatal() << "Invalid number of replication : " << nb_replication;
476 }
477 trace->info() << "Use replication of subdomains nb_replication=" << nb_replication;
478 _applicationBuildInfo().setNbReplicationSubDomain(nb_replication);
479 }
480
481 if (_applicationBuildInfo().nbReplicationSubDomain()!=0 && _applicationBuildInfo().nbProcessusSubDomain()!=0)
482 trace->pfatal() << "The subdomains number of replication and restriction options are incompatible.";
483
484 if (!use_direct_test){
485 String case_file = dataset_source.fileName();
486 //trace->info()<<"[ArcaneMainBatch] !use_direct_test, getCodeService";
487 m_code_service = _application()->getCodeService(case_file);
488
489 if (!m_code_service){
490 trace->info() << "The file `" << case_file << "' is not a known file type.";
491 case_file = args[nb_arg-2];
492
493 m_code_service = _application()->getCodeService(case_file);
494 if (!m_code_service){
495 trace->pfatal() << "File extension not valid.";
496 }
497 }
498 }
499
500 if (use_direct_exec){
501 //trace->info()<<"[ArcaneMainBatch] use_direct_test!";
502 // Analyse les arguments qui correspondent aux options d'exécution directes
503 // et construit un fichier xml à partir de la.
504 StringBuilder s;
505 s += "<?xml version=\"1.0\"?>\n";
506 s += "<case codename=\"ArcaneDriver\" xml:lang=\"en\" codeversion=\"1.0\">";
507 s += " <arcane>\n";
508 s += " <title>DirectExec</title>\n";
509 s += " <description>DirectExec</description>\n";
510 s += " <timeloop>ArcaneDirectExecutionLoop</timeloop>\n";
511 s += " </arcane>\n";
512 s += " <meshes>\n";
513 s += " <mesh>\n";
514 s += String::format(" <filename>{0}</filename>\n",tool_mesh);
515 s += direct_exec_mesh_args_xml;
516 s += " </mesh>\n";
517 s += " </meshes>\n";
518 s += " <arcane-direct-execution>\n";
519 s += String::format(" <tool name='{0}'>\n",m_direct_exec_name);
520 s += tool_args_xml;
521 s += " </tool>\n";
522 s += " </arcane-direct-execution>\n";
523 s += "</case>\n";
524 dataset_source.setFileName("(None)");
525 String buf = s;
526 dataset_source.setContent(buf.utf8());
527 trace->info() << "Direct exec xml file=" << s;
528 }
529
530 if (m_code_service.get()){
531 bool is_bad = m_code_service->parseArgs(unknown_args);
532 if (is_bad)
533 return true;
534 }
535
536 if (!unknown_args.empty()){
537 trace->info()<< "Unknown command line option: " << unknown_args[0];
538 }
539
540 return false;
541}
542
543/*---------------------------------------------------------------------------*/
544/*---------------------------------------------------------------------------*/
545
546namespace
547{
548struct LaunchThreadInfo
549{
550 ArcaneMainBatch* arcane_main;
551 ArcaneMainBatch::SessionExec* session_exec;
552 IApplication* application;
553 Int32 thread_index;
554};
555}
556
557/*---------------------------------------------------------------------------*/
558/*---------------------------------------------------------------------------*/
559/*
560 * Cette fonction est celle appelée lors de la création d'un thread.
561 */
562void
563_ThreadWrapper(LaunchThreadInfo* lti)
564{
565 ArcaneMainBatch* amb = lti->arcane_main;
566 IApplication* main_app = lti->application;
567 ArcaneMainBatch::ExecFunctor functor(lti->session_exec,lti->thread_index);
568 bool clean_abort = false;
569 bool is_master = lti->thread_index == 0;
570 int r = ArcaneMain::callFunctorWithCatchedException(&functor,amb,&clean_abort,is_master);
571 if (r!=0 && !clean_abort){
572 // Le thread est terminé mais comme il est le seul à avoir planté,
573 // il est possible que les autres soient bloqués.
574 // Dans ce cas, on fait un abort pour éviter un blocage
575 // TODO: essayer de tuer les autres threads correctement.
576 if (main_app){
577 IParallelSuperMng* psm = main_app->parallelSuperMng();
578 psm->tryAbort();
579 }
580 }
581}
582
583/*---------------------------------------------------------------------------*/
584/*---------------------------------------------------------------------------*/
585
587execute()
588{
589 ITraceMng* trace = _application()->traceMng();
590
591 if (m_code_service.get())
592 m_session = m_code_service->createSession();
593
594 IParallelSuperMng* psm = _application()->parallelSuperMng();
595 Int32 nb_total_rank = psm->commSize();
596 const Integer nb_wanted_sub_domain = applicationBuildInfo().nbReplicationSubDomain();
597 CaseDatasetSource& dataset_source = _applicationBuildInfo().caseDatasetSource();
598 if (nb_wanted_sub_domain>nb_total_rank)
599 ARCANE_THROW(ArgumentException,"Number of subdomain '{0}' > number of allocated cores '{1}",
600 nb_wanted_sub_domain,nb_total_rank);
601
602 Integer nb_local_rank = psm->nbLocalSubDomain();
603 trace->info() << "NB_LOCAL_RANK=" << nb_local_rank;
604 if (nb_local_rank>=1)
606 int return_value = 0;
607
608 // Lecture des données du jeu de données.
609 if (dataset_source.content().empty() && m_direct_test_name.null()){
610 String case_file = dataset_source.fileName();
611 trace->info() << "Reading input data '" << case_file << "'";
612 IIOMng* io_mng = _application()->ioMng();
613 UniqueArray<Byte> case_bytes;
614 bool is_bad = io_mng->collectiveRead(case_file,case_bytes);
615 if (is_bad)
616 ARCANE_THROW(ParallelFatalErrorException,"Cannot read input data file '{0}'",case_file);
617 dataset_source.setContent(case_bytes);
618 }
619
620 m_session_exec = new SessionExec(this,m_session,nb_local_rank);
621
622 UniqueArray<LaunchThreadInfo> thinfo(nb_local_rank);
623 for( Integer i=0; i<nb_local_rank; ++i ){
624 thinfo[i].arcane_main = this;
625 thinfo[i].session_exec = m_session_exec;
626 thinfo[i].application = _application();
627 thinfo[i].thread_index = i;
628 }
629
630 if (nb_local_rank>1){
631 UniqueArray<std::thread*> gths(nb_local_rank);
632 for( Integer i=0; i<nb_local_rank; ++i ){
633 gths[i] = new std::thread(_ThreadWrapper,&thinfo[i]);
634 }
635 for( Integer i=0; i<nb_local_rank; ++i ){
636 gths[i]->join();
637 delete gths[i];
638 }
639 }
640 else{
642 m_session_exec->executeRank(0);
643 }
644
645 // TODO: supprimer car inutile car vaut toujours 0.
646 return return_value;
647}
648
649/*---------------------------------------------------------------------------*/
650/*---------------------------------------------------------------------------*/
651/*
652 * En mode avec un sous-domaine par thread,cette fonction est appelée
653 * par chaque thread (de manière potentiellement concurrente) pour son sous-domaine.
654 * \a local_rank indique le rang local du thread, qui est compris
655 * entre 0 et \a nb_local_sub_domain (tèl que définit dans execute()).
656 */
657void ArcaneMainBatch::SessionExec::
658executeRank(Int32 local_rank)
659{
660 // ATTENTION:
661 // Cette fonction doit etre reentrente...
662
663 auto sub_info = new SubInfo();
664 m_sub_infos[local_rank] = sub_info;
665
667 if (pas && m_has_sub_domain_threads){
668 // Il ne faut binder les CPU que si demandé et uniquement si
669 // le nombre de threads au total (sur l'ensemble des processus)
670 // ne dépasse pas le nombre de coeur de la machine.
671 if (!platform::getEnvironmentVariable("ARCANE_BIND_THREADS").null()){
672 ITraceMng* tm = _application()->traceMng();
673 tm->info() << "Binding threads";
674 pas->bindThread(local_rank);
675 }
676 }
677
678 // Création du gestionnaire de parallélisme pour l'ensemble des rangs alloués.
679 IParallelSuperMng* psm = _application()->parallelSuperMng();
680 Ref<IParallelMng> world_pm = psm->internalCreateWorldParallelMng(local_rank);
681 sub_info->m_world_parallel_mng = world_pm;
682
683 if (!m_direct_test_name.null()){
684 _execDirectTest(world_pm.get(),m_direct_test_name,true);
685 return;
686 }
687
688 // Regarde si on souhaite exécuter le calcul sur un sous-ensemble
689 // des ressources allouées. Pour l'instant, il est uniquement possible
690 // de choisir un nombre de sous-domaine. Si c'est le cas, seuls
691 // les rangs de 0 au nombre de sous-domaine souhaité moins 1 sont
692 // utilisés. Les rangs supérieurs n'ont pas de sous-domaines
693 // et à la place utilisent un service qui implémente IDirectExecution
694
695 // Création du gestionnaire des statistiques d'exécution.
696 ITraceMng* trace = world_pm->traceMng();
697 String stat_name = "Rank";
698 stat_name = stat_name + world_pm->commRank();
699 ITimeStats* time_stat = _application()->mainFactory()->createTimeStats(world_pm->timerMng(),trace,stat_name);
700 sub_info->m_time_stats = time_stat;
701 time_stat->beginGatherStats();
702 world_pm->setTimeStats(time_stat);
703
704 Ref<IParallelMng> pm = world_pm;
705 Ref<IParallelMng> all_replica_pm = pm;
706
707 const Integer nb_wanted_sub_domain = _application()->applicationBuildInfo().nbProcessusSubDomain();
708 const Integer nb_wanted_replication = _application()->applicationBuildInfo().nbReplicationSubDomain();
709 // On est en parallèle et on souhaite moins de sous-domaines que de processus alloués
710 if (world_pm->isParallel()){
711 // Pour l'instant, on ne peut pas mélanger la réplication de sous-domaines avec
712 // un nombre de sous-domaines différent du nombre de processeurs alloués.
713 // TODO: lorsque ce ne sera plus le cas, il faudra faire un all_replica_pm qui
714 // contiendra l'ensemble des sous-domaines et des réplica.
715
716 if (nb_wanted_replication>1){
717 Int32 comm_size = world_pm->commSize();
718 Int32 nb_sub_part = comm_size / nb_wanted_replication;
719 trace->info() << "Using sub-domain replication nb_sub_part=" << nb_sub_part;
720 if ((comm_size % nb_wanted_replication)!=0)
721 ARCANE_FATAL("The number of replication '{0}' must be a common factor of the number of allocated cores '{1}",
722 nb_wanted_replication,comm_size);
723 // D'abord, on créé un communicateur contenant les réplicats de chaque sous-domaine
724 // Ce communicateur contiendra donc \a m_nb_wanted_replication objets
725 Ref<IParallelMng> replicate_pm;
726 trace->info() << "Building replicated parallel mng";
727 {
728 Int32UniqueArray kept_ranks(nb_wanted_replication);
729 for( Integer i_sd=0; i_sd<nb_sub_part; ++i_sd ){
730 for( Int32 i=0; i<nb_wanted_replication; ++i ){
731 kept_ranks[i] = i_sd + (i*nb_sub_part);
732 trace->info() << "Rank r=" << kept_ranks[i];
733 }
734 Ref<IParallelMng> new_pm = world_pm->createSubParallelMngRef(kept_ranks);
735 if (new_pm.get()){
736 replicate_pm = new_pm;
737 replicate_pm->setTimeStats(time_stat);
738 trace->info() << " Building own replicated parallel mng";
739 }
740 else{
741 trace->info()<<"!pm";
742 }
743 trace->flush();
744 }
745 }
746 if (!replicate_pm)
747 ARCANE_FATAL("Null replicated parallel mng");
748
749 // Maintenant, on créé un IParallelMng qui correspond à l'ensemble
750 // des rangs d'un même réplica. Ce IParallelMng sera assigné au
751 // sous-domaine qui sera créé par la suite.
752 trace->info() << "Building sub-domain parallel mng";
753 {
754 Int32UniqueArray kept_ranks(nb_sub_part);
755 for( Integer i_repl=0; i_repl<nb_wanted_replication; ++i_repl ){
756 for( Int32 i=0; i<nb_sub_part; ++i ){
757 kept_ranks[i] = i + (i_repl*nb_sub_part);
758 trace->info() << "Rank r=" << kept_ranks[i];
759 }
760 Ref<IParallelMng> new_pm = world_pm->createSubParallelMngRef(kept_ranks);
761 if (new_pm.get()){
762 pm = new_pm;
763 if (nb_sub_part==1){
764 // Il faut prendre la version séquentielle pour faire comme si le calcul
765 // était séquentiel. Ce gestionnaire sera détruit en même temps
766 // que \a new_pm
767 pm = new_pm->sequentialParallelMngRef();
768 }
769 trace->info()<<"pm: setting time_stat & m_rank_parallel_mng for replica rank=" << i_repl;
770 trace->flush();
771 pm->setTimeStats(time_stat);
772 sub_info->m_rank_parallel_mng = new_pm;
773 auto pr = new ParallelReplication(i_repl,nb_wanted_replication,replicate_pm);
774 pm->setReplication(pr);
775 }
776 else{
777 trace->info()<<"!pm";
778 trace->flush();
779 }
780 }
781 }
782 }
783 else if (nb_wanted_sub_domain!=0){
784 const Int32 nb_sub_part = nb_wanted_sub_domain;
785 Int32UniqueArray kept_ranks(nb_sub_part);
786 for( Int32 i=0; i<nb_sub_part; ++i )
787 kept_ranks[i] = i;
788 pm = world_pm->createSubParallelMngRef(kept_ranks);
789 if (pm.get()){
790 trace->info()<<"pm: setting time_stat & m_rank_parallel_mng";
791 trace->flush();
792 pm->setTimeStats(time_stat);
793 sub_info->m_rank_parallel_mng = pm;
794 all_replica_pm = pm;
795 }
796 else{
797 trace->info()<<"!pm";
798 trace->flush();
799 }
800 }
801 }
802
803 bool print_stats = false;
804 ISubDomain* sub_domain = nullptr;
805
806 if (!pm){
807 // Si ici, il s'agit d'un rang qui ne possède pas de sous-domaine.
808 // Dans ce cas, exécute le service donnée par 'm_idle_service_name'
809 // (si spécifié, sinon ne fait rien)
810 trace->info()<<"The rank doesn't own any subdomain!";
811 if (m_properties.m_idle_service_name.empty()){
812 trace->info() << "No idle service specified"; trace->flush();
813 }
814 else{
815 trace->info()<<"execDirectTest: "<< m_properties.m_idle_service_name;
816 trace->flush();
817 _execDirectTest(world_pm.get(),m_properties.m_idle_service_name,false);
818 // On sort de l'execute() du directTest grâce au broadcast(This is the end), il faut s'en retourner
819 return;
820 }
821 print_stats = true;
822 }
823 else {
824 _createAndRunSubDomain(sub_info,pm,all_replica_pm,local_rank);
825 sub_domain = sub_info->m_sub_domain;
826 print_stats = sub_info->m_want_print_stats;
827 }
828
829 time_stat->endGatherStats();
830
831 if (print_stats && sub_domain){
832 // S'assure que tout le monde est ici avant d'arêter le profiling
833 // TODO: Comme le profiling est local au processus, il suffirait
834 // a priori de faire la barrière sur les IParallelMng locaux.
835 IParallelMng* pm = sub_domain->parallelMng();
836 pm->barrier();
837 if (local_rank==0)
839 pm->barrier();
840 _printStats(sub_domain,trace,time_stat);
841 }
842
843 //BaseForm[Hash["This is the end", "CRC32"], 16]
844 // On informes les 'autres' capacités qu'il faut s'en aller, maintenant!
845 world_pm->broadcast(UniqueArray<unsigned long>(1,0xdfeb699fl).view(),0);
846}
847
848/*---------------------------------------------------------------------------*/
849/*---------------------------------------------------------------------------*/
850
851void ArcaneMainBatch::SessionExec::
852_createAndRunSubDomain(SubInfo* sub_info,Ref<IParallelMng> pm,Ref<IParallelMng> all_replica_pm,Int32 local_rank)
853{
854 // Il s'agit d'un rang qui a un sous-domaine.
855 // Celui-ci est créé et l'exécution commence.
856 SubDomainBuildInfo sdbi(pm,local_rank,all_replica_pm);
857 sdbi.setCaseFileName(m_case_file);
858 sdbi.setCaseContent(m_case_bytes);
859 ISubDomain* sub_domain = m_code_service->createAndLoadCase(m_session,sdbi);
860 sub_info->m_sub_domain = sub_domain;
861
862 ITraceMng* trace = _application()->traceMng();
863 ITraceMng* sd_trace = sub_domain->traceMng();
864 ITraceMngPolicy* trace_policy = _application()->getTraceMngPolicy();
865
866 // En cas de réplication, désactive les sorties courbes
867 // des réplicats.
868 trace->info() << "REPLICATION: rank=" << pm->replication()->replicationRank();
869
870 if (!pm->replication()->isMasterRank()){
871 trace->info() << "Disable output curves for replicates.";
872 sub_domain->timeHistoryMng()->setDumpActive(false);
873 }
874
875 // TODO:
876 // Détruire le sous-domaine à la fin de la fonction mais il
877 // faut pour cela modifier ISession pour supporter la suppression
878 // d'un sous-domaine (et ensuite détruire ISession).
879
880 IProcessorAffinityService* pas = platform::getProcessorAffinityService();
881 if (pas){
882 String cpu_set = pas->cpuSetString();
883 trace->info() << " CpuSet=" << cpu_set;
884 }
885
886 if (m_arcane_main->m_check_case_only){
887 trace->info() << "Checking the input data";
888 // Initialise les modules de la boucle en temps
889 {
890 TimeLoopReader stl(_application());
891 stl.readTimeLoops();
892 stl.registerTimeLoops(sub_domain);
893 stl.setUsedTimeLoop(sub_domain);
894 }
895 ICaseMng* cm = sub_domain->caseMng();
896 cm->readOptions(true);
897 }
898 else{
899 Timer init_timer(sub_domain,"InitTimer",Timer::TimerReal);
900 Timer loop_timer(sub_domain,"LoopTimer",Timer::TimerReal);
901
902 {
903 Timer::Action ts_action(sub_domain,"Init");
904 Timer::Sentry ts(&init_timer);
905
906 m_code_service->initCase(sub_domain,m_properties.m_is_continue);
907 }
908
909 if (m_properties.m_max_iteration>0)
910 trace->info() << "Option 'max_iteration' activated with " << m_properties.m_max_iteration;
911
912 // Redirige les signaux.
913 // Cela se fait aussi a l'initialisation mais ici on peut être dans un autre
914 // thread et de plus certaines bibliothèques ont pu rediriger les signaux
915 // lors de l'init
916 {
917 CriticalSection cs(pm->threadMng());
918 ArcaneMain::redirectSignals();
919 }
920 int ret_compute_loop = 0;
921
922 IDirectExecution* direct_exec = sub_domain->directExecution();
923 if (direct_exec && direct_exec->isActive()){
924 trace->info() << "Direct execution activated";
925 direct_exec->execute();
926 }
927 else if (m_arcane_main->m_init_only){
928 trace->info() << "Option 'init_only' activated";
929 sub_info->m_want_print_stats = true;
930 }
931 else{
932 sub_info->m_want_print_stats = true;
933 Timer::Action ts_action(sub_domain,"Loop");
934 Timer::Sentry ts(&loop_timer);
935 // Lors de la boucle de calcul, ne force pas l'affichage des traces à un niveau
936 // donné (ce qui est fait lors de l'initialisation de l'application.
937 trace_policy->setDefaultVerboseLevel(sd_trace,Trace::UNSPECIFIED_VERBOSITY_LEVEL);
938 if (m_direct_sub_domain_execute_functor){
939 m_direct_sub_domain_execute_functor->setSubDomain(sub_domain);
940 m_direct_sub_domain_execute_functor->execute();
941 sub_domain->parallelMng()->barrier();
942 }
943 else{
944 ret_compute_loop = sub_domain->timeLoopMng()->doComputeLoop(m_properties.m_max_iteration);
945 if (ret_compute_loop<0)
946 //TODO: NE PAS REMPLIR DIRECTEMENT CETTE FONCTION CAR CELA NE MARCHE
947 // PAS EN MULTI-THREAD
948 m_arcane_main->setErrorCode(8);
949 }
950 }
951 {
952 Real init_time = init_timer.totalTime();
953 Real loop_time = loop_timer.totalTime();
954 trace->info(0) << "TotalReel = " << (init_time+loop_time)
955 << " secondes (init: "
956 << init_time << " loop: " << loop_time << " )";
957 }
958 {
959 Timer::Action ts_action(sub_domain,"Exit");
960 trace_policy->setDefaultVerboseLevel(sd_trace,Trace::DEFAULT_VERBOSITY_LEVEL);
961 sub_domain->doExitModules();
962 }
963 }
964}
965
966/*---------------------------------------------------------------------------*/
967/*---------------------------------------------------------------------------*/
968
969void ArcaneMainBatch::SessionExec::
970_printStats(ISubDomain* sub_domain,ITraceMng* trace,ITimeStats* time_stat)
971{
972 ExecutionStatsDumper exec_dumper(trace);
973 exec_dumper.dumpStats(sub_domain,time_stat);
974}
975
976/*---------------------------------------------------------------------------*/
977/*---------------------------------------------------------------------------*/
978
979void ArcaneMainBatch::SessionExec::
980_execDirectTest(IParallelMng* world_pm,const String& test_name,bool is_collective)
981{
982 ITraceMng* trace = world_pm->traceMng();
983 trace->info() << "Direct test name=" << test_name;
984 trace->flush();
985 ServiceFinder2T<IDirectExecution,IApplication> sf(_application(),_application());
986 Ref<IDirectExecution> exec(sf.createReference(test_name));
987 if (!exec){
988 String msg = String::format("Can not find 'IDirectExecution' service name '{0}'",test_name);
989 if (is_collective)
990 throw ParallelFatalErrorException(A_FUNCINFO,msg);
991 else
992 throw FatalErrorException(A_FUNCINFO,msg);
993 }
994 else{
995 trace->info() << "Begin execution of direct service";
996 trace->flush();
997 }
998 exec->setParallelMng(world_pm);
999 exec->execute();
1000}
1001
1002/*---------------------------------------------------------------------------*/
1003/*---------------------------------------------------------------------------*/
1004
1006finalize()
1007{
1008 if (m_session){
1009 m_session->endSession(errorCode());
1010 _application()->removeSession(m_session);
1011 delete m_session;
1012 m_session = nullptr;
1013 }
1014 m_code_service.reset();
1015 delete m_session_exec;
1016 m_session_exec = nullptr;
1017
1018 ITraceMng* tm = _application()->traceMng();
1020}
1021
1022/*---------------------------------------------------------------------------*/
1023/*---------------------------------------------------------------------------*/
1024
1026doAbort()
1027{
1028 if (m_session)
1029 m_session->doAbort();
1030 else{
1031 // Pour finir proprement même si arrêt avant la création de la session
1032 // ou après la destruction de la session.
1034 if (psm)
1035 psm->tryAbort();
1036 }
1037}
1038
1039/*---------------------------------------------------------------------------*/
1040/*---------------------------------------------------------------------------*/
1041
1042template<typename V> void ArcaneMainBatchProperties::
1043_applyPropertyVisitor(V& p)
1044{
1045 auto b = p.builder();
1046
1047 p << b.addInt32("MaxIteration")
1048 .addDescription("Maximum number of iteration")
1049 .addCommandLineArgument("MaxIteration")
1050 .addGetter([](auto a) { return a.x.m_max_iteration; })
1051 .addSetter([](auto a) { a.x.m_max_iteration = a.v; });
1052
1053 p << b.addBool("Continue")
1054 .addDescription("True if continue from previous execution (restart)")
1055 .addCommandLineArgument("Continue")
1056 .addGetter([](auto a) { return a.x.m_is_continue; })
1057 .addSetter([](auto a) { a.x.m_is_continue = a.v; });
1058
1059 p << b.addString("IdleService")
1060 .addDescription("Name of the idle service for additionnal cores")
1061 .addCommandLineArgument("IdleService")
1062 .addGetter([](auto a) { return a.x.m_idle_service_name; })
1063 .addSetter([](auto a) { a.x.m_idle_service_name = a.v; });
1064}
1065
1066/*---------------------------------------------------------------------------*/
1067/*---------------------------------------------------------------------------*/
1068
1069ARCANE_REGISTER_PROPERTY_CLASS(ArcaneMainBatchProperties,());
1070
1071/*---------------------------------------------------------------------------*/
1072/*---------------------------------------------------------------------------*/
1073
1074} // End namespace Arcane
1075
1076/*---------------------------------------------------------------------------*/
1077/*---------------------------------------------------------------------------*/
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
static void finalize(ITraceMng *tm)
Finalise l'exécution.
Definition Runner.cc:516
static void stopAllProfiling()
Stoppe toutes les activités de profiling.
Definition Runner.cc:504
CaseDatasetSource & caseDatasetSource()
Source du jeu de données.
Informations sur une application.
Propriétés associées à ArcaneMain.
String m_idle_service_name
Nom du service pour les CPU non utilisés.
void executeFunctor() override
Exécute la méthode associé
Infos par sous-domaine qui doivent être détruites à la fin de l'exécution.
Informations d'exécution pour une session.
Ref< ICodeService > m_code_service
Service du code.
bool m_has_sub_domain_threads
indique si on utilise des threads pour gérer des sous-domaines
UniqueArray< std::byte > m_case_bytes
Contenu du jeu de données du cas sous forme d'un document XML.
String m_case_file
Nom du fichier contenant le cas.
const ArcaneMainBatchProperties m_properties
Propriétés d'exécution.
Exécution en mode batch d'un code.
String m_case_name
Nom du cas.
void doAbort() override
Effectue un abort.
Ref< ICodeService > m_code_service
Service du code.
bool m_init_only
true si on ne fait que l'initialisation.
ArcaneMainBatchProperties m_properties
Session.
void initialize() override
Initialise l'instance. L'instance n'est pas utilisable tant que cette méthode n'a pas été appelée.
int execute() override
Lance l'exécution. Cette méthode ne retourne que lorsqu'on quitte le programme.
bool m_check_case_only
true si on ne fait que vérifier le jeu de données.
void finalize() override
Effectue les dernières opérations avant destruction de l'instance.
bool m_has_sub_domain_threads
indique si on utilise des threads pour gérer des sous-domaines
void build() override
Construit les membres la classe. L'instance n'est pas utilisable tant que cette méthode n'a pas été a...
bool parseArgs(StringList args) override
Analyse les arguments.
const ApplicationInfo & applicationInfo() const override
Informations sur l'éxécutable.
static int callFunctorWithCatchedException(IFunctor *functor, IArcaneMain *amain, bool *clean_abort, bool is_print=true)
Appelle le fonctor functor en récupérant les éventuelles exceptions.
IApplication * application() const override
Application.
Definition ArcaneMain.h:321
void build() override
Construit les membres la classe. L'instance n'est pas utilisable tant que cette méthode n'a pas été a...
const ApplicationBuildInfo & applicationBuildInfo() const override
Informations pour construire l'instance IApplication.
int errorCode() const override
Code d'erreur de l'exécution.
Definition ArcaneMain.h:311
void initialize() override
Initialise l'instance. L'instance n'est pas utilisable tant que cette méthode n'a pas été appelée.
bool parseArgs(StringList args) override
Analyse les arguments.
Exception lorsqu'un argument est invalide.
Source d'un jeu de données d'un cas.
void setFileName(const String &name)
Positionne le nom du fichier du jeu de données.
String fileName() const
Nom du fichier du jeu de données.
void setContent(Span< const std::byte > bytes)
Positionne le contenu du jeu de données.
ByteConstSpan content() const
Contenu du jeu de données.
Integer count() const
Nombre d'éléments de la collection.
Definition Collection.h:70
Interface de l'application.
virtual IParallelSuperMng * parallelSuperMng()=0
Gestionnaire superviseur du parallélisme.
Interface de la classe de gestion du code.
Definition IArcaneMain.h:54
virtual ITraceMng * traceMng() const =0
Gestionnaire de traces.
Interface d'un fonctor pour exécuter du code directement après la création d'un sous-domaine sans pas...
Interface du gestionnaire des entrées sorties.
Definition IIOMng.h:42
virtual bool collectiveRead(const String &filename, ByteArray &bytes)=0
Lecture collective d'un fichier.
Manufacture des classes d'Arcane.
Interface du gestionnaire de parallélisme pour un sous-domaine.
Classe abstraite du superviseur de parallélisme.
virtual void tryAbort()=0
Tente de faire un abort.
virtual Int32 nbLocalSubDomain()=0
Nombre de sous-domaines à créér localement.
virtual Int32 commSize() const =0
Retourne le nombre total de process utilisés.
Interface d'un service de de trace des appels de fonctions.
virtual void bindThread(Int32 cpu)=0
Contraint le thread courant à rester sur le coeur d'indice cpu.
Interface d'une session d'exécution d'un cas.
Definition ISession.h:44
Interface du gestionnaire d'un sous-domaine.
Definition ISubDomain.h:74
Interface gérant les statistiques sur les temps d'exécution.
Definition ITimeStats.h:43
Interface du gestionnaire de traces.
virtual TraceMessage pfatal()=0
Flot pour un message d'erreur fatale parallèle.
virtual TraceMessage info()=0
Flot pour un message d'information.
Exception lorsqu'une erreur fatale 'parallèle' est générée.
Référence à une instance.
Chaîne de caractères unicode.
bool empty() const
Vrai si la chaîne est vide (nulle ou "")
Definition String.cc:315
@ TimerReal
Timer utilisant le temps réel.
Definition Timer.h:76
Vecteur 1D de données avec sémantique par valeur (style STL).
Integer toInteger(const char *str, bool *is_ok=0)
Converti la chaîne str en un entier non signé. Si is_ok n'est pas nul, il vaut true en retour si la c...
ARCCORE_BASE_EXPORT String getEnvironmentVariable(const String &name)
Variable d'environnement du nom name.
IProcessorAffinityService * getProcessorAffinityService()
Service utilisé pour la gestion de l'affinité des processeurs.
-*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
Int32 Integer
Type représentant un entier.
UniqueArray< Int32 > Int32UniqueArray
Tableau dynamique à une dimension d'entiers 32 bits.
Definition UtilsTypes.h:428
List< String > StringList
Tableau de chaînes de caractères unicode.
Definition UtilsTypes.h:596
double Real
Type représentant un réel.
std::int32_t Int32
Type entier signé sur 32 bits.