Arcane  v3.14.10.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
SerializeMessageList.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SerializeMessageList.cc (C) 2000-2024 */
9/* */
10/* Liste de messages de sérialisation. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arccore/message_passing/SerializeMessageList.h"
15
16#include "arccore/message_passing/IRequestList.h"
17#include "arccore/message_passing/BasicSerializeMessage.h"
19#include "arccore/base/NotImplementedException.h"
20#include "arccore/base/FatalErrorException.h"
21
22#include <algorithm>
23
24/*---------------------------------------------------------------------------*/
25/*---------------------------------------------------------------------------*/
26
27namespace Arccore::MessagePassing::internal
28{
29
30/*---------------------------------------------------------------------------*/
31/*---------------------------------------------------------------------------*/
32
33SerializeMessageList::
34SerializeMessageList(IMessagePassingMng* mpm)
35: m_message_passing_mng(mpm)
36, m_request_list(mpCreateRequestListRef(mpm))
37, m_message_passing_phase(timeMetricPhaseMessagePassing(mpm->timeMetricCollector()))
38{
39}
40
41/*---------------------------------------------------------------------------*/
42/*---------------------------------------------------------------------------*/
43
44void SerializeMessageList::
45addMessage(ISerializeMessage* message)
46{
47 BasicSerializeMessage* true_message = dynamic_cast<BasicSerializeMessage*>(message);
48 if (!true_message)
49 ARCCORE_FATAL("Can not convert 'ISerializeMessage' to 'BasicSerializeMessage'");
50 m_messages_to_process.add(true_message);
51}
52
53/*---------------------------------------------------------------------------*/
54/*---------------------------------------------------------------------------*/
55
56void SerializeMessageList::
57processPendingMessages()
58{
59 for( BasicSerializeMessage* sm : m_messages_to_process ){
60 PointToPointMessageInfo message_info(buildMessageInfo(sm));
61 bool is_any_source = sm->destination().isNull() || sm->destination().isAnySource();
62 if (is_any_source && !m_allow_any_rank_receive) {
63 // Il faudra faire un probe pour ce message
64 m_messages_to_probe.add({sm,message_info});
65 }
66 else
67 _addMessage(sm,message_info);
68 sm->setIsProcessed(true);
69 }
70 m_messages_to_process.clear();
71}
72
73/*---------------------------------------------------------------------------*/
74/*---------------------------------------------------------------------------*/
75
76Integer SerializeMessageList::
77waitMessages(eWaitType wait_type)
78{
79 processPendingMessages();
80 // NOTE: il faudrait peut-être faire aussi faire des probe() dans l'appel
81 // à _waitMessages() car il est possible que tous les messages n'aient pas
82 // été posté. Dans ce cas, il faudrait passer en mode non bloquant tant
83 // qu'il y a des probe à faire
84 while(!m_messages_to_probe.empty())
85 _doProbe();
86
87 return _waitMessages(wait_type);
88}
89
90/*---------------------------------------------------------------------------*/
91/*---------------------------------------------------------------------------*/
92
93void SerializeMessageList::
94_doProbe()
95{
96 // Il faut tester avec probe() si des messages sont disponibles
97 for( ProbeInfo& p : m_messages_to_probe ){
98 //tm->info() << "CHECK PROBE msg=" << p.m_message_info << " is_done?=" << p.m_is_probe_done;
99 // Ne devrait pas être 'vrai' mais par sécurité on fait le test.
100 if (p.m_is_probe_done)
101 continue;
102 MessageId message_id = mpProbe(m_message_passing_mng,p.m_message_info);
103 if (message_id.isValid()){
104 //tm->info() << "FOUND PROBE message_id=" << message_id;
105 PointToPointMessageInfo message_info(message_id,NonBlocking);
106 _addMessage(p.m_serialize_message,message_info);
107 p.m_is_probe_done = true;
108 }
109 }
110
111 // Supprime les probes qui sont terminés.
112 auto k = std::remove_if(m_messages_to_probe.begin(),m_messages_to_probe.end(),
113 [](const ProbeInfo& p) { return p.m_is_probe_done; });
114 m_messages_to_probe.resize(k-m_messages_to_probe.begin());
115}
116
117/*---------------------------------------------------------------------------*/
118/*---------------------------------------------------------------------------*/
119
120Integer SerializeMessageList::
121_waitMessages(eWaitType wait_type)
122{
123 TimeMetricSentry tphase(m_message_passing_phase);
124
125 if (wait_type==WaitAll){
126 m_request_list->wait(WaitAll);
127 // Indique que les messages sont bien terminés
128 for( ISerializeMessage* sm : m_messages_serialize )
129 sm->setFinished(true);
130 m_request_list->clear();
131 m_messages_serialize.clear();
132 return (-1);
133 }
134
135 if (wait_type==WaitSome || wait_type==TestSome){
136 Integer nb_request = m_request_list->size();
137 m_request_list->wait(wait_type);
138 m_remaining_serialize_messages.clear();
139 Integer nb_done = 0;
140 for( Integer i=0; i<nb_request; ++i ){
141 BasicSerializeMessage* sm = m_messages_serialize[i];
142 if (m_request_list->isRequestDone(i)){
143 ++nb_done;
144 sm->setFinished(true);
145 }
146 else{
147 m_remaining_serialize_messages.add(sm);
148 }
149 }
150 m_request_list->removeDoneRequests();
151 m_messages_serialize = m_remaining_serialize_messages;
152 if (nb_done==nb_request)
153 return (-1);
154 return nb_done;
155 }
156
157 ARCCORE_THROW(NotImplementedException,"waitMessage with wait_type=={0}",(int)wait_type);
158}
159
160/*---------------------------------------------------------------------------*/
161/*---------------------------------------------------------------------------*/
162
163void SerializeMessageList::
164_addMessage(BasicSerializeMessage* sm,const PointToPointMessageInfo& message_info)
165{
166 Request r;
167 ISerializer* s = sm->serializer();
168 if (sm->isSend())
169 r = mpSend(m_message_passing_mng,s,message_info);
170 else
171 r = mpReceive(m_message_passing_mng,s,message_info);
172 m_request_list->add(r);
173 m_messages_serialize.add(sm);
174}
175
176/*---------------------------------------------------------------------------*/
177/*---------------------------------------------------------------------------*/
178
179PointToPointMessageInfo SerializeMessageList::
180buildMessageInfo(ISerializeMessage* sm)
181{
182 MessageId message_id(sm->_internalMessageId());
183 if (message_id.isValid()){
184 PointToPointMessageInfo message_info(message_id,NonBlocking);
185 message_info.setEmiterRank(sm->source());
186 return message_info;
187 }
188 return { sm->source(), sm->destination(), sm->internalTag(), NonBlocking };
189}
190
191/*---------------------------------------------------------------------------*/
192/*---------------------------------------------------------------------------*/
193
194Ref<ISerializeMessage> SerializeMessageList::
195createAndAddMessage(MessageRank destination,ePointToPointMessageType type)
196{
197 MessageRank source(m_message_passing_mng->commRank());
198 auto x = BasicSerializeMessage::create(source,destination,type);
199 addMessage(x.get());
200 return x;
201}
202
203/*---------------------------------------------------------------------------*/
204/*---------------------------------------------------------------------------*/
205
206} // End namespace Arccore::MessagePassing::internal
207
208/*---------------------------------------------------------------------------*/
209/*---------------------------------------------------------------------------*/
Liste des fonctions d'échange de message.
Informations pour envoyer/recevoir un message point à point.
Message de sérialisation utilisant un BasicSerializer.
Référence à une instance.
MessageId mpProbe(IMessagePassingMng *pm, const PointToPointMessageInfo &message)
Teste si un message est disponible.
Definition Messages.cc:207
Request mpSend(IMessagePassingMng *pm, const ISerializer *values, const PointToPointMessageInfo &message)
Message d'envoi utilisant un ISerializer.
Definition Messages.cc:266
ePointToPointMessageType
Type de message point à point.
Request mpReceive(IMessagePassingMng *pm, ISerializer *values, const PointToPointMessageInfo &message)
Message de réception utilisant un ISerializer.
Definition Messages.cc:277
Ref< IRequestList > mpCreateRequestListRef(IMessagePassingMng *pm)
Crée une liste de requêtes.
Definition Messages.cc:142
Int32 Integer
Type représentant un entier.