14#include "arccore/message_passing/SerializeMessageList.h"
16#include "arccore/message_passing/IRequestList.h"
17#include "arccore/message_passing/BasicSerializeMessage.h"
19#include "arccore/base/NotImplementedException.h"
20#include "arccore/base/FatalErrorException.h"
27namespace Arccore::MessagePassing::internal
34SerializeMessageList(IMessagePassingMng* mpm)
35: m_message_passing_mng(mpm)
37, m_message_passing_phase(timeMetricPhaseMessagePassing(mpm->timeMetricCollector()))
44void SerializeMessageList::
49 ARCCORE_FATAL(
"Can not convert 'ISerializeMessage' to 'BasicSerializeMessage'");
50 m_messages_to_process.add(true_message);
56void SerializeMessageList::
57processPendingMessages()
61 bool is_any_source = sm->destination().isNull() || sm->destination().isAnySource();
62 if (is_any_source && !m_allow_any_rank_receive) {
64 m_messages_to_probe.add({sm,message_info});
67 _addMessage(sm,message_info);
68 sm->setIsProcessed(
true);
70 m_messages_to_process.clear();
79 processPendingMessages();
84 while(!m_messages_to_probe.empty())
87 return _waitMessages(wait_type);
93void SerializeMessageList::
97 for( ProbeInfo& p : m_messages_to_probe ){
100 if (p.m_is_probe_done)
103 if (message_id.isValid()){
106 _addMessage(p.m_serialize_message,message_info);
107 p.m_is_probe_done =
true;
112 auto k = std::remove_if(m_messages_to_probe.begin(),m_messages_to_probe.end(),
113 [](
const ProbeInfo& p) { return p.m_is_probe_done; });
114 m_messages_to_probe.resize(k-m_messages_to_probe.begin());
120Integer SerializeMessageList::
121_waitMessages(eWaitType wait_type)
123 TimeMetricSentry tphase(m_message_passing_phase);
125 if (wait_type==WaitAll){
126 m_request_list->wait(WaitAll);
128 for( ISerializeMessage* sm : m_messages_serialize )
129 sm->setFinished(true);
130 m_request_list->clear();
131 m_messages_serialize.clear();
135 if (wait_type==WaitSome || wait_type==TestSome){
136 Integer nb_request = m_request_list->size();
137 m_request_list->wait(wait_type);
138 m_remaining_serialize_messages.clear();
140 for( Integer i=0; i<nb_request; ++i ){
141 BasicSerializeMessage* sm = m_messages_serialize[i];
142 if (m_request_list->isRequestDone(i)){
144 sm->setFinished(
true);
147 m_remaining_serialize_messages.add(sm);
150 m_request_list->removeDoneRequests();
151 m_messages_serialize = m_remaining_serialize_messages;
152 if (nb_done==nb_request)
157 ARCCORE_THROW(NotImplementedException,
"waitMessage with wait_type=={0}",(
int)wait_type);
163void SerializeMessageList::
164_addMessage(BasicSerializeMessage* sm,
const PointToPointMessageInfo& message_info)
167 ISerializer* s = sm->serializer();
169 r =
mpSend(m_message_passing_mng,s,message_info);
171 r =
mpReceive(m_message_passing_mng,s,message_info);
172 m_request_list->add(r);
173 m_messages_serialize.add(sm);
179PointToPointMessageInfo SerializeMessageList::
180buildMessageInfo(ISerializeMessage* sm)
182 MessageId message_id(sm->_internalMessageId());
183 if (message_id.isValid()){
184 PointToPointMessageInfo message_info(message_id,NonBlocking);
185 message_info.setEmiterRank(sm->source());
188 return { sm->source(), sm->destination(), sm->internalTag(), NonBlocking };
194Ref<ISerializeMessage> SerializeMessageList::
197 MessageRank source(m_message_passing_mng->commRank());
198 auto x = BasicSerializeMessage::create(source,destination,type);
Liste des fonctions d'échange de message.
Informations pour envoyer/recevoir un message point à point.
Request mpReceive(IMessagePassingMng *pm, ISerializer *values, const PointToPointMessageInfo &message)
Message de réception utilisant un ISerializer.
MessageId mpProbe(IMessagePassingMng *pm, const PointToPointMessageInfo &message)
Teste si un message est disponible.
Ref< IRequestList > mpCreateRequestListRef(IMessagePassingMng *pm)
Créé une liste de requêtes.
ePointToPointMessageType
Type de message point à point.
Request mpSend(IMessagePassingMng *pm, const ISerializer *values, const PointToPointMessageInfo &message)
Message d'envoi utilisant un ISerializer.
Int32 Integer
Type représentant un entier.