Arcane  4.1.12.0
User documentation
Loading...
Searching...
No Matches
SerializeMessageList.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SerializeMessageList.cc (C) 2000-2025 */
9/* */
10/* Serialization message list. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arccore/message_passing/internal/SerializeMessageList.h"
15
16#include "arccore/message_passing/IRequestList.h"
17#include "arccore/message_passing/BasicSerializeMessage.h"
19#include "arccore/base/NotImplementedException.h"
20#include "arccore/base/FatalErrorException.h"
21
22#include <algorithm>
23
24/*---------------------------------------------------------------------------*/
25/*---------------------------------------------------------------------------*/
26
27namespace Arcane::MessagePassing::internal
28{
29
30/*---------------------------------------------------------------------------*/
31/*---------------------------------------------------------------------------*/
32
33SerializeMessageList::
34SerializeMessageList(IMessagePassingMng* mpm)
35: m_message_passing_mng(mpm)
36, m_request_list(mpCreateRequestListRef(mpm))
37, m_message_passing_phase(timeMetricPhaseMessagePassing(mpm->timeMetricCollector()))
38{
39}
40
41/*---------------------------------------------------------------------------*/
42/*---------------------------------------------------------------------------*/
43
44void SerializeMessageList::
45addMessage(ISerializeMessage* message)
46{
47 BasicSerializeMessage* true_message = dynamic_cast<BasicSerializeMessage*>(message);
48 if (!true_message)
49 ARCCORE_FATAL("Can not convert 'ISerializeMessage' to 'BasicSerializeMessage'");
50 m_messages_to_process.add(true_message);
51}
52
53/*---------------------------------------------------------------------------*/
54/*---------------------------------------------------------------------------*/
55
56void SerializeMessageList::
57processPendingMessages()
58{
59 for (BasicSerializeMessage* sm : m_messages_to_process) {
60 PointToPointMessageInfo message_info(buildMessageInfo(sm));
61 bool is_any_source = sm->destination().isNull() || sm->destination().isAnySource();
62 if (is_any_source && !m_allow_any_rank_receive) {
63 // A probe will need to be done for this message
64 m_messages_to_probe.add({ sm, message_info });
65 }
66 else
67 _addMessage(sm, message_info);
68 sm->setIsProcessed(true);
69 }
70 m_messages_to_process.clear();
71}
72
73/*---------------------------------------------------------------------------*/
74/*---------------------------------------------------------------------------*/
75
76Integer SerializeMessageList::
77waitMessages(eWaitType wait_type)
78{
79 processPendingMessages();
80 // NOTE: It might also be necessary to perform probes() in the call
81 // to _waitMessages() because it is possible that not all messages have been posted.
82 // In this case, it would be necessary to switch to non-blocking mode until
83 // there are probes to perform.
84 while (!m_messages_to_probe.empty())
85 _doProbe();
86
87 return _waitMessages(wait_type);
88}
89
90/*---------------------------------------------------------------------------*/
91/*---------------------------------------------------------------------------*/
92
93void SerializeMessageList::
94_doProbe()
95{
96 // We must test with probe() if messages are available
97 for (ProbeInfo& p : m_messages_to_probe) {
98 //tm->info() << "CHECK PROBE msg=" << p.m_message_info << " is_done?=" << p.m_is_probe_done;
99 // Should not be 'true' but we perform the test for safety.
100 if (p.m_is_probe_done)
101 continue;
102 MessageId message_id = mpProbe(m_message_passing_mng, p.m_message_info);
103 if (message_id.isValid()) {
104 //tm->info() << "FOUND PROBE message_id=" << message_id;
105 PointToPointMessageInfo message_info(message_id, NonBlocking);
106 _addMessage(p.m_serialize_message, message_info);
107 p.m_is_probe_done = true;
108 }
109 }
110
111 // Remove probes that are finished.
112 auto k = std::remove_if(m_messages_to_probe.begin(), m_messages_to_probe.end(),
113 [](const ProbeInfo& p) { return p.m_is_probe_done; });
114 m_messages_to_probe.resize(k - m_messages_to_probe.begin());
115}
116
117/*---------------------------------------------------------------------------*/
118/*---------------------------------------------------------------------------*/
119
120Integer SerializeMessageList::
121_waitMessages(eWaitType wait_type)
122{
123 TimeMetricSentry tphase(m_message_passing_phase);
124
125 if (wait_type == WaitAll) {
126 m_request_list->wait(WaitAll);
127 // Indicates that the messages are finished
128 for (ISerializeMessage* sm : m_messages_serialize)
129 sm->setFinished(true);
130 m_request_list->clear();
131 m_messages_serialize.clear();
132 return (-1);
133 }
134
135 if (wait_type == WaitSome || wait_type == TestSome) {
136 Integer nb_request = m_request_list->size();
137 m_request_list->wait(wait_type);
138 m_remaining_serialize_messages.clear();
139 Integer nb_done = 0;
140 for (Integer i = 0; i < nb_request; ++i) {
141 BasicSerializeMessage* sm = m_messages_serialize[i];
142 if (m_request_list->isRequestDone(i)) {
143 ++nb_done;
144 sm->setFinished(true);
145 }
146 else {
147 m_remaining_serialize_messages.add(sm);
148 }
149 }
150 m_request_list->removeDoneRequests();
151 m_messages_serialize = m_remaining_serialize_messages;
152 if (nb_done == nb_request)
153 return (-1);
154 return nb_done;
155 }
156
157 ARCCORE_THROW(NotImplementedException, "waitMessage with wait_type=={0}", (int)wait_type);
158}
159
160/*---------------------------------------------------------------------------*/
161/*---------------------------------------------------------------------------*/
162
163void SerializeMessageList::
164_addMessage(BasicSerializeMessage* sm, const PointToPointMessageInfo& message_info)
165{
166 Request r;
167 ISerializer* s = sm->serializer();
168 if (sm->isSend())
169 r = mpSend(m_message_passing_mng, s, message_info);
170 else
171 r = mpReceive(m_message_passing_mng, s, message_info);
172 m_request_list->add(r);
173 m_messages_serialize.add(sm);
174}
175
176/*---------------------------------------------------------------------------*/
177/*---------------------------------------------------------------------------*/
178
179PointToPointMessageInfo SerializeMessageList::
180buildMessageInfo(ISerializeMessage* sm)
181{
182 MessageId message_id(sm->_internalMessageId());
183 if (message_id.isValid()) {
184 PointToPointMessageInfo message_info(message_id, NonBlocking);
185 message_info.setEmiterRank(sm->source());
186 return message_info;
187 }
188 return { sm->source(), sm->destination(), sm->internalTag(), NonBlocking };
189}
190
191/*---------------------------------------------------------------------------*/
192/*---------------------------------------------------------------------------*/
193
194Ref<ISerializeMessage> SerializeMessageList::
195createAndAddMessage(MessageRank destination, ePointToPointMessageType type)
196{
197 auto x = mpCreateSerializeMessage(m_message_passing_mng, destination, type);
198 addMessage(x.get());
199 return x;
200}
201
202/*---------------------------------------------------------------------------*/
203/*---------------------------------------------------------------------------*/
204
205} // namespace Arcane::MessagePassing::internal
206
207/*---------------------------------------------------------------------------*/
208/*---------------------------------------------------------------------------*/
#define ARCCORE_FATAL(...)
Macro throwing a FatalErrorException.
#define ARCCORE_THROW(exception_class,...)
Macro to throw an exception with formatting.
Brief list of message exchange functions.
Ref< ISerializeMessage > mpCreateSerializeMessage(IMessagePassingMng *pm, MessageRank target, ePointToPointMessageType type)
Creates a serialization message.
Definition Messages.cc:325
MessageId mpProbe(IMessagePassingMng *pm, const PointToPointMessageInfo &message)
Tests if a message is available.
Definition Messages.cc:220
Ref< IRequestList > mpCreateRequestListRef(IMessagePassingMng *pm)
Creates a list of requests.
Definition Messages.cc:155
Request mpReceive(IMessagePassingMng *pm, ISerializer *values, const PointToPointMessageInfo &message)
Receive message using an ISerializer.
Definition Messages.cc:290
Request mpSend(IMessagePassingMng *pm, const ISerializer *values, const PointToPointMessageInfo &message)
Send message using an ISerializer.
Definition Messages.cc:279
Int32 Integer
Type representing an integer.