Arcane  4.1.12.0
User documentation
Loading...
Searching...
No Matches
PointToPointSerializerMng.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* PointToPointSerializerMng.cc (C) 2000-2025 */
9/* */
10/* Point-to-point communications using 'ISerializer'. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arccore/message_passing/PointToPointSerializerMng.h"
15
16#include "arccore/message_passing/BasicSerializeMessage.h"
18#include "arccore/message_passing/ISerializeMessageList.h"
19#include "arccore/base/NotImplementedException.h"
20#include "arccore/base/FatalErrorException.h"
21
22/*---------------------------------------------------------------------------*/
23/*---------------------------------------------------------------------------*/
24
26{
28
29/*---------------------------------------------------------------------------*/
30/*---------------------------------------------------------------------------*/
31
33{
34 public:
35
// Builds the implementation object for the message passing manager `mpm`:
// stores the pointer, caches mpm->commRank() in m_rank and creates the
// serialize message list via mpCreateSerializeMessageListRef(mpm).
// `mpm` is dereferenced unconditionally, so it must not be null.
36 Impl(IMessagePassingMng* mpm)
37 : m_message_passing_mng(mpm)
38 , m_rank(mpm->commRank())
39 {
40 m_message_list = mpCreateSerializeMessageListRef(mpm);
41 }
42
43 public:
44
// Registers `message` for a later exchange: applies the current strategy
// (m_strategy) to it and appends it to the pending list. The message is not
// handed to m_message_list here.
45 void addMessage(Ref<ISerializeMessage> message)
46 {
47 message->setStrategy(m_strategy);
48 m_pending_messages.add(message);
49 }
// NOTE(review): the declaration line of this method (listing line 50) is
// missing from this listing; presumably this is processPendingMessages(),
// which the owning class forwards to -- confirm against the real file.
// Hands every pending message to m_message_list, records it in the waiting
// list, then empties the pending list.
51 {
52 for (auto& x : m_pending_messages) {
53 m_message_list->addMessage(x.get());
54 m_waiting_messages.add(x);
55 }
56 m_pending_messages.clear();
57 }
// Waits on m_message_list with wait type `wt`, then calls `functor` on every
// waiting message whose finished() is true and drops it from the waiting
// list; unfinished messages are kept for a later call.
// Returns the value returned by m_message_list->waitMessages(wt).
// NOTE(review): listing line 60 (the first statement of the body) is absent
// from this listing -- check the real file before relying on this summary.
58 Integer waitMessages(eWaitType wt, const std::function<void(ISerializeMessage*)>& functor)
59 {
61 Integer n = m_message_list->waitMessages(wt);
// Unfinished messages are gathered in a separate array, which then replaces
// the content of m_waiting_messages.
62 UniqueArray<Ref<ISerializeMessage>> new_waiting_messages;
63 for (auto& x : m_waiting_messages) {
64 if (x->finished()) {
65 functor(x.get());
66 }
67 else
68 new_waiting_messages.add(x);
69 }
70 m_waiting_messages.clear();
71 m_waiting_messages.copy(new_waiting_messages);
72 return n;
73 }
// True while the pending list or the waiting list still holds a message.
74 bool hasMessages() const { return !m_pending_messages.empty() || !m_waiting_messages.empty(); }
75
76 public:
77
// Message passing manager passed to the constructor.
78 IMessagePassingMng* m_message_passing_mng;
// Value of commRank() of that manager, captured at construction.
79 MessageRank m_rank;
// Message list created in the constructor by mpCreateSerializeMessageListRef().
80 Ref<ISerializeMessageList> m_message_list;
// NOTE(review): m_tag and m_strategy are used elsewhere in this file, but
// their declarations (listing lines 81-82) are missing from this listing.
83
84 private:
85
// Messages added through addMessage() and not yet given to m_message_list.
86 UniqueArray<Ref<ISerializeMessage>> m_pending_messages;
// Messages given to m_message_list and not yet removed by waitMessages().
87 UniqueArray<Ref<ISerializeMessage>> m_waiting_messages;
88};
89
90/*---------------------------------------------------------------------------*/
91/*---------------------------------------------------------------------------*/
92
// Creates the manager for `mpm`. All state lives in an Impl instance allocated
// with `new` here and deleted in the destructor.
93PointToPointSerializerMng::
94PointToPointSerializerMng(IMessagePassingMng* mpm)
95: m_p(new Impl(mpm))
96{
97}
98
99/*---------------------------------------------------------------------------*/
100/*---------------------------------------------------------------------------*/
101
// Deletes the Impl instance allocated by the constructor.
102PointToPointSerializerMng::
103~PointToPointSerializerMng()
104{
105 delete m_p;
106}
107
108/*---------------------------------------------------------------------------*/
109/*---------------------------------------------------------------------------*/
110
// Returns the message passing manager stored in the implementation object.
// NOTE(review): the return-type / class-qualifier line (listing line 111) is
// missing from this listing.
112messagePassingMng() const
113{
114 return m_p->m_message_passing_mng;
115}
116
117/*---------------------------------------------------------------------------*/
118/*---------------------------------------------------------------------------*/
119
// NOTE(review): the signature lines of this function (listing lines 120-121)
// are missing from this listing. The body only forwards to
// Impl::processPendingMessages().
122{
123 m_p->processPendingMessages();
124}
125
126/*---------------------------------------------------------------------------*/
127/*---------------------------------------------------------------------------*/
128
// Forwards to Impl::waitMessages() with the wait type `wt` and the callback
// `functor`, and returns its result.
// NOTE(review): the return-type / class-qualifier line (listing line 129) is
// missing from this listing.
130waitMessages(eWaitType wt, std::function<void(ISerializeMessage*)> functor)
131{
132 return m_p->waitMessages(wt, functor);
133}
134
135/*---------------------------------------------------------------------------*/
136/*---------------------------------------------------------------------------*/
137
// Forwards to Impl::hasMessages() and returns its result.
// NOTE(review): the return-type / class-qualifier line (listing line 138) is
// missing from this listing.
139hasMessages() const
140{
141 return m_p->hasMessages();
142}
143
144/*---------------------------------------------------------------------------*/
145/*---------------------------------------------------------------------------*/
146
// Stores `default_tag` in the implementation object (m_tag).
// Calls ARCCORE_FATAL if hasMessages() is true, i.e. the tag can only be
// changed while no message is pending or waiting.
// NOTE(review): the return-type / class-qualifier line (listing line 147) is
// missing from this listing.
148setDefaultTag(MessageTag default_tag)
149{
150 if (hasMessages())
151 ARCCORE_FATAL("Can not call setDefaultTag() if hasMessages()==true");
152 m_p->m_tag = default_tag;
153}
154
155/*---------------------------------------------------------------------------*/
156/*---------------------------------------------------------------------------*/
157
// NOTE(review): the signature lines of this function (listing lines 158-159)
// are missing from this listing; the error message below names it
// setStrategy().
// Stores `strategy` in the implementation object (m_strategy).
// Calls ARCCORE_FATAL if hasMessages() is true.
160{
161 if (hasMessages())
162 ARCCORE_FATAL("Can not call setStrategy() if hasMessages()==true");
163 m_p->m_strategy = strategy;
164}
165
166/*---------------------------------------------------------------------------*/
167/*---------------------------------------------------------------------------*/
168
// Creates a MsgSend message via BasicSerializeMessage::create() from this
// instance's rank (m_rank) to `receiver_rank`, using the current tag (m_tag),
// registers it with Impl::addMessage() and returns it.
// NOTE(review): the return-type / class-qualifier line (listing line 169) is
// missing from this listing.
170addSendMessage(MessageRank receiver_rank)
171{
172 auto x = BasicSerializeMessage::create(m_p->m_rank, receiver_rank, m_p->m_tag, MsgSend);
173 m_p->addMessage(x);
174 return x;
175}
176
177/*---------------------------------------------------------------------------*/
178/*---------------------------------------------------------------------------*/
179
// NOTE(review): the signature lines of this function (listing lines 180-181)
// are missing from this listing; presumably addReceiveMessage(MessageRank
// sender_rank) -- confirm against the real file.
// Creates a MsgReceive message via BasicSerializeMessage::create() with
// m_rank, `sender_rank` and the current tag (m_tag), registers it with
// Impl::addMessage() and returns it.
182{
183 auto x = BasicSerializeMessage::create(m_p->m_rank, sender_rank, m_p->m_tag, MsgReceive);
184 m_p->addMessage(x);
185 return x;
186}
187
188/*---------------------------------------------------------------------------*/
189/*---------------------------------------------------------------------------*/
190
// NOTE(review): the signature lines of this function (listing lines 191-192)
// are missing from this listing; presumably an addReceiveMessage() overload
// taking `message_id` -- confirm against the real file.
// Creates a message via BasicSerializeMessage::create(m_rank, message_id),
// registers it with Impl::addMessage() and returns it.
193{
194 auto x = BasicSerializeMessage::create(m_p->m_rank, message_id);
195 m_p->addMessage(x);
196 return x;
197}
198
199/*---------------------------------------------------------------------------*/
200/*---------------------------------------------------------------------------*/
201
202} // End namespace Arcane::MessagePassing
203
204/*---------------------------------------------------------------------------*/
205/*---------------------------------------------------------------------------*/
#define ARCCORE_FATAL(...)
Macro throwing a FatalErrorException.
Brief list of message exchange functions.
void clear()
Removes the elements from the array.
void add(ConstReferenceType val)
Adds element val to the end of the array.
static MessageTag defaultTag()
Default tag for serialization messages.
Interface of the message passing manager.
virtual Int32 commRank() const =0
Rank of this instance in the communicator.
IMessagePassingMng * messagePassingMng() const
Associated message manager.
bool hasMessages() const
Indicates if there are remaining messages that have not yet finished.
void setStrategy(ISerializeMessage::eStrategy strategy)
Strategy used for messages.
Integer waitMessages(eWaitType wt, std::function< void(ISerializeMessage *)> functor)
Waits for the messages to finish execution.
Ref< ISerializeMessage > addSendMessage(MessageRank receiver_rank)
Creates a sending serialization message.
void setDefaultTag(MessageTag default_tag)
Default tag used for messages.
Ref< ISerializeMessage > addReceiveMessage(MessageRank sender_rank)
Creates a receiving serialization message.
void processPendingMessages()
Sends the messages from the list that have not yet been processed.
Reference to an instance.
1D data vector with value semantics (STL style).
Declarations of types and methods used by message exchange mechanisms.
Ref< ISerializeMessageList > mpCreateSerializeMessageListRef(IMessagePassingMng *pm)
Creates a serialization message list.
Definition Messages.cc:269
Int32 Integer
Type representing an integer.