Arcane  v3.14.10.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
PointToPointSerializerMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2022 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* PointToPointSerializerMng.cc (C) 2000-2020 */
9/* */
10/* Communications point à point par des 'ISerializer'. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arccore/message_passing/PointToPointSerializerMng.h"
15
16#include "arccore/message_passing/BasicSerializeMessage.h"
18#include "arccore/message_passing/ISerializeMessageList.h"
19#include "arccore/base/NotImplementedException.h"
20#include "arccore/base/FatalErrorException.h"
21
22/*---------------------------------------------------------------------------*/
23/*---------------------------------------------------------------------------*/
24
26{
27using internal::BasicSerializeMessage;
28
29/*---------------------------------------------------------------------------*/
30/*---------------------------------------------------------------------------*/
31
33{
34 public:
36 : m_message_passing_mng(mpm), m_rank(mpm->commRank())
37 {
38 m_message_list = mpCreateSerializeMessageListRef(mpm);
39 }
40 public:
41 void addMessage(Ref<ISerializeMessage> message)
42 {
43 message->setStrategy(m_strategy);
44 m_pending_messages.add(message);
45 }
46 void processPendingMessages()
47 {
48 for( auto& x : m_pending_messages ){
49 m_message_list->addMessage(x.get());
50 m_waiting_messages.add(x);
51 }
52 m_pending_messages.clear();
53 }
54 Integer waitMessages(eWaitType wt,const std::function<void(ISerializeMessage*)>& functor)
55 {
56 processPendingMessages();
57 Integer n = m_message_list->waitMessages(wt);
58 UniqueArray<Ref<ISerializeMessage>> new_waiting_messages;
59 for( auto& x : m_waiting_messages ){
60 if (x->finished()){
61 functor(x.get());
62 }
63 else
64 new_waiting_messages.add(x);
65 }
66 m_waiting_messages.clear();
67 m_waiting_messages.copy(new_waiting_messages);
68 return n;
69 }
70 bool hasMessages() const { return !m_pending_messages.empty() || !m_waiting_messages.empty(); }
71 public:
72 IMessagePassingMng* m_message_passing_mng;
73 MessageRank m_rank;
74 Ref<ISerializeMessageList> m_message_list;
77 private:
78 UniqueArray<Ref<ISerializeMessage>> m_pending_messages;
79 UniqueArray<Ref<ISerializeMessage>> m_waiting_messages;
80};
81
82/*---------------------------------------------------------------------------*/
83/*---------------------------------------------------------------------------*/
84
85PointToPointSerializerMng::
86PointToPointSerializerMng(IMessagePassingMng* mpm)
87: m_p(new Impl(mpm))
88{
89}
90
91/*---------------------------------------------------------------------------*/
92/*---------------------------------------------------------------------------*/
93
94PointToPointSerializerMng::
95~PointToPointSerializerMng()
96{
97 delete m_p;
98}
99
100/*---------------------------------------------------------------------------*/
101/*---------------------------------------------------------------------------*/
102
104messagePassingMng() const
105{
106 return m_p->m_message_passing_mng;
107}
108
109/*---------------------------------------------------------------------------*/
110/*---------------------------------------------------------------------------*/
111
114{
115 m_p->processPendingMessages();
116}
117
118/*---------------------------------------------------------------------------*/
119/*---------------------------------------------------------------------------*/
120
122waitMessages(eWaitType wt,std::function<void(ISerializeMessage*)> functor)
123{
124 return m_p->waitMessages(wt,functor);
125}
126
127/*---------------------------------------------------------------------------*/
128/*---------------------------------------------------------------------------*/
129
131hasMessages() const
132{
133 return m_p->hasMessages();
134}
135
136/*---------------------------------------------------------------------------*/
137/*---------------------------------------------------------------------------*/
138
140setDefaultTag(MessageTag default_tag)
141{
142 if (hasMessages())
143 ARCCORE_FATAL("Can not call setDefaultTag() if hasMessages()==true");
144 m_p->m_tag = default_tag;
145}
146
147/*---------------------------------------------------------------------------*/
148/*---------------------------------------------------------------------------*/
149
152{
153 if (hasMessages())
154 ARCCORE_FATAL("Can not call setStrategy() if hasMessages()==true");
155 m_p->m_strategy = strategy;
156}
157
158/*---------------------------------------------------------------------------*/
159/*---------------------------------------------------------------------------*/
160
162addSendMessage(MessageRank receiver_rank)
163{
164 auto x = BasicSerializeMessage::create(m_p->m_rank,receiver_rank,m_p->m_tag,MsgSend);
165 m_p->addMessage(x);
166 return x;
167}
168
169/*---------------------------------------------------------------------------*/
170/*---------------------------------------------------------------------------*/
171
174{
175 auto x = BasicSerializeMessage::create(m_p->m_rank,sender_rank,m_p->m_tag,MsgReceive);
176 m_p->addMessage(x);
177 return x;
178}
179
180/*---------------------------------------------------------------------------*/
181/*---------------------------------------------------------------------------*/
182
185{
186 auto x = BasicSerializeMessage::create(m_p->m_rank,message_id);
187 m_p->addMessage(x);
188 return x;
189}
190
191/*---------------------------------------------------------------------------*/
192/*---------------------------------------------------------------------------*/
193
194} // End namespace Arcane::MessagePassing
195
196/*---------------------------------------------------------------------------*/
197/*---------------------------------------------------------------------------*/
Liste des fonctions d'échange de message.
void add(ConstReferenceType val)
Ajoute l'élément val à la fin du tableau.
Interface du gestionnaire des échanges de messages.
virtual Int32 commRank() const =0
Rang de cette instance dans le communicateur.
void processPendingMessages()
Envoie les messages de la liste qui ne l'ont pas encore été.
bool hasMessages() const
Indique s'il reste des messages qui ne sont pas encore terminés.
IMessagePassingMng * messagePassingMng() const
Gestionnaire de message associé
Ref< ISerializeMessage > addSendMessage(MessageRank receiver_rank)
Créé message de sérialisation en envoi.
Ref< ISerializeMessage > addReceiveMessage(MessageRank sender_rank)
Créé un message de sérialisation en réception.
void setDefaultTag(MessageTag default_tag)
Tag par défaut utilisé pour les messages.
void setStrategy(ISerializeMessage::eStrategy strategy)
Stratégie utilisée pour les messages.
Integer waitMessages(eWaitType wt, std::function< void(ISerializeMessage *)> functor)
Attend que les messages aient terminé leur exécution.
static MessageTag defaultTag()
Tag par défaut pour les messages de sérialisation.
Référence à une instance.
Vecteur 1D de données avec sémantique par valeur (style STL).
Espace de nommage contenant les types et déclarations qui gèrent le mécanisme de parallélisme par éch...
Ref< ISerializeMessageList > mpCreateSerializeMessageListRef(IMessagePassingMng *pm)
Créé une liste de messages de sérialisation.
Definition Messages.cc:256
Int32 Integer
Type représentant un entier.