14#include "arccore/message_passing/internal/SerializeMessageList.h"
16#include "arccore/message_passing/IRequestList.h"
17#include "arccore/message_passing/BasicSerializeMessage.h"
19#include "arccore/base/NotImplementedException.h"
20#include "arccore/base/FatalErrorException.h"
27namespace Arcane::MessagePassing::internal
// Constructor: stores the message passing manager pointer in
// m_message_passing_mng and initialises m_message_passing_phase from the
// manager's timeMetricCollector(). 'mpm' is dereferenced here, so it must
// not be null.
// NOTE(review): the embedded listing numbers jump from 35 to 37 and no body
// is shown; an initializer and the constructor body appear to be missing from
// this listing - confirm against the original file.
34SerializeMessageList(IMessagePassingMng* mpm)
35: m_message_passing_mng(mpm)
37, m_message_passing_phase(timeMetricPhaseMessagePassing(mpm->timeMetricCollector()))
// Registers 'message' in the list of messages awaiting processing.
// The argument is downcast to BasicSerializeMessage with dynamic_cast and the
// resulting pointer is appended to m_messages_to_process, the list consumed by
// processPendingMessages().
44void SerializeMessageList::
45addMessage(ISerializeMessage* message)
47 BasicSerializeMessage* true_message =
dynamic_cast<BasicSerializeMessage*
>(message);
// NOTE(review): embedded listing line 48 is absent, which makes this fatal
// error look unconditional. Presumably it is guarded by a check that the
// dynamic_cast above succeeded - confirm against the original file.
49 ARCCORE_FATAL(
"Can not convert 'ISerializeMessage' to 'BasicSerializeMessage'");
50 m_messages_to_process.add(true_message);
// Handles every message queued in m_messages_to_process, then empties that
// queue. Each message is either recorded in m_messages_to_probe or handed to
// _addMessage(), and is flagged as processed.
56void SerializeMessageList::
57processPendingMessages()
59 for (BasicSerializeMessage* sm : m_messages_to_process) {
60 PointToPointMessageInfo message_info(buildMessageInfo(sm));
// The message counts as "any source" when its destination rank is null or is
// the any-source rank.
61 bool is_any_source = sm->destination().isNull() || sm->destination().isAnySource();
// Any-source messages are recorded for probing (together with their message
// info) when m_allow_any_rank_receive is false.
62 if (is_any_source && !m_allow_any_rank_receive) {
64 m_messages_to_probe.add({ sm, message_info });
// NOTE(review): embedded listing lines 65-66 are absent; presumably the call
// below is the 'else' branch of the test above - confirm.
67 _addMessage(sm, message_info);
68 sm->setIsProcessed(
true);
70 m_messages_to_process.clear();
// Processes the pending messages, loops while m_messages_to_probe is not
// empty, then returns the result of _waitMessages(wait_type).
// NOTE(review): the line carrying the return type and class qualifier
// (embedded listing line 76) is absent from this listing.
77waitMessages(eWaitType wait_type)
79 processPendingMessages();
// NOTE(review): the body of this loop (embedded listing lines 85-86) is
// absent; presumably it performs the probing that eventually empties
// m_messages_to_probe - confirm.
84 while (!m_messages_to_probe.empty())
87 return _waitMessages(wait_type);
// Probes the messages recorded in m_messages_to_probe and removes the entries
// whose probe has completed.
// NOTE(review): the line carrying this method's name (embedded listing line
// 94) is absent from this listing.
93void SerializeMessageList::
97 for (ProbeInfo& p : m_messages_to_probe) {
// NOTE(review): the statement controlled by this test (embedded listing line
// 101) is absent; presumably already-probed entries are skipped - confirm.
100 if (p.m_is_probe_done)
102 MessageId message_id =
mpProbe(m_message_passing_mng, p.m_message_info);
// A valid MessageId is turned into a NonBlocking message info, passed to
// _addMessage() with the entry's message, and the entry is flagged as done.
103 if (message_id.isValid()) {
105 PointToPointMessageInfo message_info(message_id, NonBlocking);
106 _addMessage(p.m_serialize_message, message_info);
107 p.m_is_probe_done =
true;
// std::remove_if moves the entries that are not done to the front and returns
// the new logical end; the resize() then truncates the list to that length.
112 auto k = std::remove_if(m_messages_to_probe.begin(), m_messages_to_probe.end(),
113 [](
const ProbeInfo& p) { return p.m_is_probe_done; });
114 m_messages_to_probe.resize(k - m_messages_to_probe.begin());
// Waits on the requests held in m_request_list according to 'wait_type' and
// updates the tracked messages accordingly.
//  - WaitAll: waits for every request, marks every tracked message finished,
//    then clears both the request list and the message list.
//  - WaitSome / TestSome: waits with that mode, marks the messages whose
//    request is done as finished and keeps the others.
//  - Any other value reaching the end: throws NotImplementedException.
// NOTE(review): the return-type line (embedded listing line 120) and the
// return statements are absent from this listing, so the returned values
// cannot be documented from here.
121_waitMessages(eWaitType wait_type)
// NOTE(review): presumably accounts the time spent in this scope to the
// message passing phase - confirm against TimeMetricSentry.
123 TimeMetricSentry tphase(m_message_passing_phase);
125 if (wait_type == WaitAll) {
126 m_request_list->wait(WaitAll);
128 for (ISerializeMessage* sm : m_messages_serialize)
129 sm->setFinished(
true);
130 m_request_list->clear();
131 m_messages_serialize.clear();
// NOTE(review): embedded listing lines 132-133 are absent; presumably this
// branch returns here - confirm.
135 if (wait_type == WaitSome || wait_type == TestSome) {
// The request count is captured before waiting and bounds the loop below.
136 Integer nb_request = m_request_list->size();
137 m_request_list->wait(wait_type);
138 m_remaining_serialize_messages.clear();
// Request 'i' and m_messages_serialize[i] are treated as a pair: the same
// index is used for isRequestDone() and for the message lookup.
// NOTE(review): the declaration and increment of 'nb_done' (embedded listing
// lines 139 and 143) are absent from this listing.
140 for (Integer i = 0; i < nb_request; ++i) {
141 BasicSerializeMessage* sm = m_messages_serialize[i];
142 if (m_request_list->isRequestDone(i)) {
144 sm->setFinished(
true);
// NOTE(review): embedded listing lines 145-146 are absent; presumably the
// add() below belongs to the 'else' branch (request not done) - confirm.
147 m_remaining_serialize_messages.add(sm);
// Done requests are dropped from the request list and the tracked messages
// are replaced by the remaining ones.
150 m_request_list->removeDoneRequests();
151 m_messages_serialize = m_remaining_serialize_messages;
// NOTE(review): the statements following this test (embedded listing lines
// 153-154) are absent from this listing.
152 if (nb_done == nb_request)
157 ARCCORE_THROW(NotImplementedException,
"waitMessage with wait_type=={0}", (
int)wait_type);
// Posts message 'sm' using 'message_info' and starts tracking it.
// The request obtained from mpSend() or mpReceive() on the message's
// serializer is added to m_request_list and the message is appended to
// m_messages_serialize.
163void SerializeMessageList::
164_addMessage(BasicSerializeMessage* sm,
const PointToPointMessageInfo& message_info)
167 ISerializer* s = sm->serializer();
// NOTE(review): the declaration of 'r' and the condition choosing between
// send and receive (embedded listing lines 166, 168 and 170) are absent from
// this listing; presumably the choice depends on the message direction -
// confirm.
169 r =
mpSend(m_message_passing_mng, s, message_info);
171 r =
mpReceive(m_message_passing_mng, s, message_info);
// The request and the message are appended together; _waitMessages() pairs
// isRequestDone(i) with m_messages_serialize[i] using the same index.
172 m_request_list->add(r);
173 m_messages_serialize.add(sm);
// Builds the PointToPointMessageInfo describing message 'sm'.
// When the message carries a valid internal MessageId, a NonBlocking info is
// built from that id and its emitter rank is set to the message's source.
// The final statement returns a NonBlocking info built from the message's
// source, destination and internal tag.
179PointToPointMessageInfo SerializeMessageList::
180buildMessageInfo(ISerializeMessage* sm)
182 MessageId message_id(sm->_internalMessageId());
183 if (message_id.isValid()) {
184 PointToPointMessageInfo message_info(message_id, NonBlocking);
185 message_info.setEmiterRank(sm->source());
// NOTE(review): embedded listing lines 186-187 are absent; presumably the
// branch above returns 'message_info' - confirm.
188 return { sm->source(), sm->destination(), sm->internalTag(), NonBlocking };
194Ref<ISerializeMessage> SerializeMessageList::
195createAndAddMessage(MessageRank destination, ePointToPointMessageType type)
#define ARCCORE_FATAL(...)
Macro throwing a FatalErrorException.
#define ARCCORE_THROW(exception_class,...)
Macro to throw an exception with formatting.
Brief list of message exchange functions.
Ref< ISerializeMessage > mpCreateSerializeMessage(IMessagePassingMng *pm, MessageRank target, ePointToPointMessageType type)
Creates a serialization message.
MessageId mpProbe(IMessagePassingMng *pm, const PointToPointMessageInfo &message)
Tests if a message is available.
Ref< IRequestList > mpCreateRequestListRef(IMessagePassingMng *pm)
Creates a list of requests.
Request mpReceive(IMessagePassingMng *pm, ISerializer *values, const PointToPointMessageInfo &message)
Receive message using an ISerializer.
Request mpSend(IMessagePassingMng *pm, const ISerializer *values, const PointToPointMessageInfo &message)
Send message using an ISerializer.
Int32 Integer
Type representing an integer.