Arcane  v3.16.0.0
Documentation utilisateur
Chargement...
Recherche...
Aucune correspondance
PointToPointSerializerMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* PointToPointSerializerMng.cc (C) 2000-2025 */
9/* */
10/* Communications point à point par des 'ISerializer'. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arccore/message_passing/PointToPointSerializerMng.h"
15
16#include "arccore/message_passing/BasicSerializeMessage.h"
18#include "arccore/message_passing/ISerializeMessageList.h"
19#include "arccore/base/NotImplementedException.h"
20#include "arccore/base/FatalErrorException.h"
21
22/*---------------------------------------------------------------------------*/
23/*---------------------------------------------------------------------------*/
24
26{
28
29/*---------------------------------------------------------------------------*/
30/*---------------------------------------------------------------------------*/
31
33{
34 public:
35 Impl(IMessagePassingMng* mpm)
36 : m_message_passing_mng(mpm), m_rank(mpm->commRank())
37 {
38 m_message_list = mpCreateSerializeMessageListRef(mpm);
39 }
40 public:
41 void addMessage(Ref<ISerializeMessage> message)
42 {
43 message->setStrategy(m_strategy);
44 m_pending_messages.add(message);
45 }
46 void processPendingMessages()
47 {
48 for( auto& x : m_pending_messages ){
49 m_message_list->addMessage(x.get());
50 m_waiting_messages.add(x);
51 }
52 m_pending_messages.clear();
53 }
54 Integer waitMessages(eWaitType wt,const std::function<void(ISerializeMessage*)>& functor)
55 {
56 processPendingMessages();
57 Integer n = m_message_list->waitMessages(wt);
58 UniqueArray<Ref<ISerializeMessage>> new_waiting_messages;
59 for( auto& x : m_waiting_messages ){
60 if (x->finished()){
61 functor(x.get());
62 }
63 else
64 new_waiting_messages.add(x);
65 }
66 m_waiting_messages.clear();
67 m_waiting_messages.copy(new_waiting_messages);
68 return n;
69 }
70 bool hasMessages() const { return !m_pending_messages.empty() || !m_waiting_messages.empty(); }
71 public:
72 IMessagePassingMng* m_message_passing_mng;
73 MessageRank m_rank;
74 Ref<ISerializeMessageList> m_message_list;
77 private:
78 UniqueArray<Ref<ISerializeMessage>> m_pending_messages;
79 UniqueArray<Ref<ISerializeMessage>> m_waiting_messages;
80};
81
82/*---------------------------------------------------------------------------*/
83/*---------------------------------------------------------------------------*/
84
85PointToPointSerializerMng::
86PointToPointSerializerMng(IMessagePassingMng* mpm)
87: m_p(new Impl(mpm))
88{
89}
90
91/*---------------------------------------------------------------------------*/
92/*---------------------------------------------------------------------------*/
93
94PointToPointSerializerMng::
95~PointToPointSerializerMng()
96{
97 delete m_p;
98}
99
100/*---------------------------------------------------------------------------*/
101/*---------------------------------------------------------------------------*/
102
104messagePassingMng() const
105{
106 return m_p->m_message_passing_mng;
107}
108
109/*---------------------------------------------------------------------------*/
110/*---------------------------------------------------------------------------*/
111
114{
115 m_p->processPendingMessages();
116}
117
118/*---------------------------------------------------------------------------*/
119/*---------------------------------------------------------------------------*/
120
122waitMessages(eWaitType wt,std::function<void(ISerializeMessage*)> functor)
123{
124 return m_p->waitMessages(wt,functor);
125}
126
127/*---------------------------------------------------------------------------*/
128/*---------------------------------------------------------------------------*/
129
131hasMessages() const
132{
133 return m_p->hasMessages();
134}
135
136/*---------------------------------------------------------------------------*/
137/*---------------------------------------------------------------------------*/
138
140setDefaultTag(MessageTag default_tag)
141{
142 if (hasMessages())
143 ARCCORE_FATAL("Can not call setDefaultTag() if hasMessages()==true");
144 m_p->m_tag = default_tag;
145}
146
147/*---------------------------------------------------------------------------*/
148/*---------------------------------------------------------------------------*/
149
152{
153 if (hasMessages())
154 ARCCORE_FATAL("Can not call setStrategy() if hasMessages()==true");
155 m_p->m_strategy = strategy;
156}
157
158/*---------------------------------------------------------------------------*/
159/*---------------------------------------------------------------------------*/
160
162addSendMessage(MessageRank receiver_rank)
163{
164 auto x = BasicSerializeMessage::create(m_p->m_rank,receiver_rank,m_p->m_tag,MsgSend);
165 m_p->addMessage(x);
166 return x;
167}
168
169/*---------------------------------------------------------------------------*/
170/*---------------------------------------------------------------------------*/
171
174{
175 auto x = BasicSerializeMessage::create(m_p->m_rank,sender_rank,m_p->m_tag,MsgReceive);
176 m_p->addMessage(x);
177 return x;
178}
179
180/*---------------------------------------------------------------------------*/
181/*---------------------------------------------------------------------------*/
182
185{
186 auto x = BasicSerializeMessage::create(m_p->m_rank,message_id);
187 m_p->addMessage(x);
188 return x;
189}
190
191/*---------------------------------------------------------------------------*/
192/*---------------------------------------------------------------------------*/
193
194} // End namespace Arcane::MessagePassing
195
196/*---------------------------------------------------------------------------*/
197/*---------------------------------------------------------------------------*/
Liste des fonctions d'échange de message.
void clear()
Supprime les éléments du tableau.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
static MessageTag defaultTag()
Tag par défaut pour les messages de sérialisation.
Interface du gestionnaire des échanges de messages.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
IMessagePassingMng * messagePassingMng() const
Gestionnaire de message associé
bool hasMessages() const
Indique s'il reste des messages qui ne sont pas encore terminés.
void setStrategy(ISerializeMessage::eStrategy strategy)
Stratégie utilisée pour les messages.
Integer waitMessages(eWaitType wt, std::function< void(ISerializeMessage *)> functor)
Attend que les messages aient terminé leur exécution.
Ref< ISerializeMessage > addSendMessage(MessageRank receiver_rank)
Créé message de sérialisation en envoi.
void setDefaultTag(MessageTag default_tag)
Tag par défaut utilisé pour les messages.
Ref< ISerializeMessage > addReceiveMessage(MessageRank sender_rank)
Créé un message de sérialisation en réception.
void processPendingMessages()
Envoie les messages de la liste qui ne l'ont pas encore été.
Référence à une instance.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Definition Parallel.h:50
Ref< ISerializeMessageList > mpCreateSerializeMessageListRef(IMessagePassingMng *pm)
Créé une liste de messages de sérialisation.
Definition Messages.cc:267
Int32 Integer
Type représentant un entier.