Arcane  v3.15.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
SharedMemoryMessageQueue.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SharedMemoryMessageQueue.cc (C) 2000-2024 */
9/* */
10/* Implémentation d'une file de messages en mémoire partagée. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/FatalErrorException.h"
15#include "arcane/utils/NotImplementedException.h"
16#include "arcane/utils/NotSupportedException.h"
17#include "arcane/utils/ArgumentException.h"
18#include "arcane/utils/ITraceMng.h"
19#include "arcane/utils/ValueConvert.h"
20
21#include "arcane/utils/internal/MemoryResourceMng.h"
22
23#include "arcane/parallel/thread/SharedMemoryMessageQueue.h"
24#include "arcane/parallel/thread/IAsyncQueue.h"
25
26#include "arcane/core/ISerializeMessage.h"
27
28// Macro pour afficher des messages pour debug
29#define TRACE_DEBUG(format_str,...) \
30 if (m_is_debug && m_trace_mng){ \
31 m_trace_mng->info() << String::format(format_str,__VA_ARGS__); \
32 m_trace_mng->flush();\
33 }
34
35/*---------------------------------------------------------------------------*/
36/*---------------------------------------------------------------------------*/
37
39{
40
41/*---------------------------------------------------------------------------*/
42/*---------------------------------------------------------------------------*/
55{
56 SendBufferInfo send_info = sender->sendBufferInfo();
57 ReceiveBufferInfo receive_info = this->receiveBufferInfo();
58
59 const ISerializer* send_serializer = send_info.serializer();
62 if (!send_serializer)
63 ARCANE_FATAL("No send serializer for receive serializer");
65 return;
66 }
67
68 ByteConstSpan send_span = send_info.memoryBuffer();
69 ByteSpan receive_span = receive_info.memoryBuffer();
70 Int64 send_size = send_span.size();
71 Int64 receive_size = receive_span.size();
73 ARCANE_FATAL("Not enough memory for receiving message receive={0} send={1}",
75
77}
78
79/*---------------------------------------------------------------------------*/
80/*---------------------------------------------------------------------------*/
81
82void SharedMemoryMessageRequest::
83destroy()
84{
85 if (m_is_destroyed)
86 ARCANE_FATAL("Request already destroyed");
87 m_is_destroyed = true;
88 // Commenter pour debug.
89 delete this;
90}
91
92/*---------------------------------------------------------------------------*/
93/*---------------------------------------------------------------------------*/
99{
100 public:
102 : m_async_queue(IAsyncQueue::createQueue()){}
104 {
105 delete m_async_queue;
106 }
107 public:
108 void push(SharedMemoryMessageRequest* v)
109 {
110 m_async_queue->push(v);
111 }
113 {
114 return reinterpret_cast<SharedMemoryMessageRequest*>(m_async_queue->pop());
115 }
117 {
118 return reinterpret_cast<SharedMemoryMessageRequest*>(m_async_queue->tryPop());
119 }
120 private:
121 IAsyncQueue* m_async_queue;
122};
123
124/*---------------------------------------------------------------------------*/
125/*---------------------------------------------------------------------------*/
126
127/*---------------------------------------------------------------------------*/
128/*---------------------------------------------------------------------------*/
141{
142 static MessageTag SERIALIZER_TAG() { return MessageTag(125); }
143
144 public:
145
147
148 public:
149
150 MessageRank rank() const { return m_rank; }
151 void setTraceMng(ITraceMng* tm) { m_trace_mng = tm; }
153 void testRequest(SharedMemoryMessageRequest* tmr);
154 void waitRequestAvailable();
155 void checkRequestAvailable();
157 MessageId probe(const PointToPointMessageInfo& message);
158 MessageSourceInfo legacyProbe(const PointToPointMessageInfo& message);
159
160 public:
161
163 addReceive(Int64 request_id,const PointToPointMessageInfo& message,ReceiveBufferInfo buf);
165 addSend(Int64 request_id,const PointToPointMessageInfo& message,SendBufferInfo buf);
166
167 private:
168
169 SharedMemoryMessageQueue* m_master_queue = nullptr;
170 MessageRank m_rank;
174 RequestAsyncQueue m_async_message_queue;
175 ITraceMng* m_trace_mng = nullptr;
176 bool m_is_debug = false;
177 bool m_is_allow_null_rank_for_any_source = true;
178
179 private:
180
182 bool _checkSendDone(SharedMemoryMessageRequest* tmr_send);
183 bool _checkRecvDone(SharedMemoryMessageRequest* tmr_recv);
184 void _checkRequestDone(SharedMemoryMessageRequest* tmr);
187 _getMatchingSendRequest(MessageRank recv_dest,MessageRank recv_orig,MessageTag tag);
188 void _testOrWaitRequestAvailable(bool is_blocking);
190 _createReceiveRequest(Int64 request_id,MessageRank dest,MessageTag tag,
193 _createSendRequest(Int64 request_id,MessageRank orig,MessageTag tag,
195};
196
197/*---------------------------------------------------------------------------*/
198/*---------------------------------------------------------------------------*/
199
200SharedMemoryMessageQueue::SubQueue::
202: m_master_queue(master_queue)
203, m_rank(rank)
204{
205 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCCORE_ALLOW_NULL_RANK_FOR_MPI_ANY_SOURCE", true))
206 m_is_allow_null_rank_for_any_source = v.value() != 0;
207}
208
209/*---------------------------------------------------------------------------*/
210/*---------------------------------------------------------------------------*/
211
212/*---------------------------------------------------------------------------*/
213/*---------------------------------------------------------------------------*/
214
215SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
216_createReceiveRequest(Int64 request_id,MessageRank dest,
218{
219 auto* tmr = new SharedMemoryMessageRequest(this,request_id,m_rank,dest,tag,receive_buffer);
220 return tmr;
221}
222
223/*---------------------------------------------------------------------------*/
224/*---------------------------------------------------------------------------*/
225
226SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
227_createSendRequest(Int64 request_id,MessageRank orig,
228 MessageTag tag,SendBufferInfo send_buffer)
229{
230 SubQueue* queue = m_master_queue->_getSubQueue(orig);
231 auto* tmr = new SharedMemoryMessageRequest(queue,request_id,orig,m_rank,tag,send_buffer);
232 return tmr;
233}
234
235/*---------------------------------------------------------------------------*/
236/*---------------------------------------------------------------------------*/
237
238SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
239addReceive(Int64 request_id,const PointToPointMessageInfo& message,ReceiveBufferInfo buf)
240{
241 SharedMemoryMessageRequest* tmr = nullptr;
242 if (message.isRankTag()){
243 MessageTag tag = message.tag();
244 MessageRank dest = message.destinationRank();
245 tmr = _createReceiveRequest(request_id,dest,tag,buf);
246 TRACE_DEBUG("** ADD RECV queue={0} id={1} ORIG={2} DEST={3} tag={4} tmr={5} size={6} serializer={7}",
247 this,request_id,m_rank,dest,tag,tmr,buf.memoryBuffer().size(),buf.serializer());
248 }
249 else if (message.isMessageId()){
250 MessageId message_id = message.messageId();
251 MessageId::SourceInfo si = message_id.sourceInfo();
252 MessageRank dest = si.rank();
253 MessageTag tag = si.tag();
254
255 // On connait la requête 'send' qui matche celle ci. Il faut donc
256 // la positionner dès maintenant dans \a tmr pour être sur qu'on utilisera la bonne.
257 // Pour cela, cherche le send correspondant à l'id de notre requête
258 Int64 req_id = (size_t)message_id;
259 SharedMemoryMessageRequest* send_request = nullptr;
260 for( Integer i=0, n=m_send_requests.size(); i<n; ++i )
261 if (m_send_requests[i]->id()==req_id){
262 send_request = m_send_requests[i];
263 break;
264 }
265 if (!send_request)
266 ARCANE_FATAL("Can not find matching send request from MessageId");
267
268 tmr = _createReceiveRequest(request_id,dest,tag,buf);
269 TRACE_DEBUG("** ADD RECV FromMessageId queue={0} id={1} ORIG={2} DEST={3} tag={4} tmr={5} size={6} serializer={7}",
270 this,request_id,m_rank,dest,tag,tmr,buf.memoryBuffer().size(),buf.serializer());
271 tmr->setMatchingSendRequest(send_request);
272 }
273 else
274 ARCANE_THROW(NotSupportedException,"Invalid 'MessageInfo'");
275
276 m_recv_requests.add(tmr);
277 return tmr;
278}
279
280/*---------------------------------------------------------------------------*/
281/*---------------------------------------------------------------------------*/
282
283SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
284addSend(Int64 request_id,const PointToPointMessageInfo& message,SendBufferInfo buf)
285{
286 MessageTag tag = message.tag();
287 if (tag.isNull())
288 ARCANE_THROW(ArgumentException,"null tag");
289 MessageRank orig = message.emiterRank();
290 auto* tmr = _createSendRequest(request_id,orig,tag,buf);
291 m_async_message_queue.push(tmr);
292 TRACE_DEBUG("** ADD SEND queue={0} ORIG={1} DEST={2} tag={3} size={4} tmr={5} serializer={6}",
293 this,orig,m_rank,tag,buf.memoryBuffer().size(),tmr,buf.serializer());
294 return tmr;
295}
296
297/*---------------------------------------------------------------------------*/
298/*---------------------------------------------------------------------------*/
299
300void SharedMemoryMessageQueue::SubQueue::
301_checkRequestDone(SharedMemoryMessageRequest* tmr)
302{
303 if (tmr->isDone())
304 ARCANE_FATAL("Can not check already done request");
305
306 if (tmr->isRecv())
307 _checkRecvDone(tmr);
308 else
309 _checkSendDone(tmr);
310}
311
312/*---------------------------------------------------------------------------*/
313/*---------------------------------------------------------------------------*/
321{
322 if (tmr->isDone()){
323 if (tmr->isRecv())
324 _removeRequest(tmr,m_recv_requests);
325 else
326 _removeRequest(tmr,m_done_requests);
327 }
328}
329
330/*---------------------------------------------------------------------------*/
331/*---------------------------------------------------------------------------*/
332
333void SharedMemoryMessageQueue::SubQueue::
334_testOrWaitRequestAvailable(bool is_blocking)
335{
337 if (is_blocking)
338 sq = m_async_message_queue.pop();
339 else
340 sq = m_async_message_queue.tryPop();
341 if (sq){
342 if (sq->orig()==m_rank)
343 m_done_requests.add(sq);
344 else
345 m_send_requests.add(sq);
346 }
347}
348
349/*---------------------------------------------------------------------------*/
350/*---------------------------------------------------------------------------*/
351
352void SharedMemoryMessageQueue::SubQueue::
353waitRequestAvailable()
354{
355 // Bloque tant qu'on n'a pas recu de message.
356 _testOrWaitRequestAvailable(true);
357}
358
359/*---------------------------------------------------------------------------*/
360/*---------------------------------------------------------------------------*/
361
362void SharedMemoryMessageQueue::SubQueue::
363checkRequestAvailable()
364{
365 _testOrWaitRequestAvailable(false);
366}
367
368/*---------------------------------------------------------------------------*/
369/*---------------------------------------------------------------------------*/
370
371void SharedMemoryMessageQueue::SubQueue::
372wait(SharedMemoryMessageRequest* tmr)
373{
374 if (tmr->queue()!=this)
375 ARCANE_FATAL("Bad queue");
376 TRACE_DEBUG("**** WAIT MESSAGE tmr={0} rank={1} recv?={2} dest={3}"
377 " nb_send={4} nb_done={5}",tmr,m_rank,tmr->isRecv(),
378 tmr->dest(),m_send_requests.size(),m_done_requests.size());
379 while (!tmr->isDone()){
380 _checkRequestDone(tmr);
381
382 if (!tmr->isDone()){
383 // Bloque tant qu'on n'a pas recu de message.
384 waitRequestAvailable();
385 }
386 }
387
388 _cleanupRequestIfDone(tmr);
389}
390
391/*---------------------------------------------------------------------------*/
392/*---------------------------------------------------------------------------*/
393
394void SharedMemoryMessageQueue::SubQueue::
395testRequest(SharedMemoryMessageRequest* tmr)
396{
397 if (tmr->queue()!=this)
398 ARCANE_FATAL("Bad queue");
399
400 _checkRequestDone(tmr);
401 _cleanupRequestIfDone(tmr);
402}
403
404/*---------------------------------------------------------------------------*/
405/*---------------------------------------------------------------------------*/
406
407void SharedMemoryMessageQueue::SubQueue::
408_removeRequest(SharedMemoryMessageRequest* tmr,Array<SharedMemoryMessageRequest*>& requests)
409{
410 for( Integer i=0, n=requests.size(); i<n; ++i ){
411 SharedMemoryMessageRequest* tmr2 = requests[i];
412 if (tmr==tmr2){
413 requests.remove(i);
414 return;
415 }
416 }
417 ARCANE_FATAL("Can not remove request");
418}
419
420/*---------------------------------------------------------------------------*/
421/*---------------------------------------------------------------------------*/
422
423bool SharedMemoryMessageQueue::SubQueue::
424_checkSendDone(SharedMemoryMessageRequest* tmr_send)
425{
426 for( SharedMemoryMessageRequest* tmr : m_done_requests ){
427 if (tmr==tmr_send){
428 tmr_send->setDone(true);
429 return true;
430 }
431 }
432 return false;
433}
434
435/*---------------------------------------------------------------------------*/
436/*---------------------------------------------------------------------------*/
437/*
438 * \brief Vérifie qu'un message d'envoie correspond à la requête \a tmr_recv.
439 *
440 * Regarde si une requête dans la liste des messages envoyés correspond à
441 * \a tmr_recv. Cela est le cas si le couple origine/destination convient
442 * ou si la destination est A_NULL_RANK (ce qui correspond à MPI_ANY_SOURCE
443 * dans le cas MPI).
444 * Il est important de prendre les requêtes dans l'ordre d'arrivée pour se
445 * conformer à la norme MPI.
446 *
447 * \retval true si une requête correspondante a été trouvée.
448 * \retval false sinon.
449 */
450SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
451_getMatchingSendRequest(MessageRank recv_dest,MessageRank recv_orig,MessageTag tag)
452{
453 bool is_any_tag = tag.isNull();
454 bool is_any_dest = recv_dest.isNull() || recv_dest.isAnySource();
455 if (recv_dest.isNull() && !m_is_allow_null_rank_for_any_source)
456 ARCANE_FATAL("Can not use probe() with null rank. Use MessageRank::anySourceRank() instead");
457 for( Integer j=0, n=m_send_requests.size(); j<n; ++j ){
458 SharedMemoryMessageRequest* tmr_send = m_send_requests[j];
459 TRACE_DEBUG("CHECK RECV DONE id={7} tmr_send={0} recv_dest={1}"
460 " recv_orig={2} send_dest={3} send_orig={4} request={5}/{6}\n",
461 tmr_send,recv_dest,recv_orig,
462 tmr_send->dest(),tmr_send->orig(),j,n,m_rank);
463 if (recv_orig==tmr_send->dest()){
464 bool is_rank_ok = (recv_dest==tmr_send->orig()) || (is_any_dest && m_rank==recv_orig);
465 bool is_tag_ok = (is_any_tag || tmr_send->tag()==tag);
466 if (is_rank_ok && is_tag_ok) {
467 return tmr_send;
468 }
469 }
470 }
471 return nullptr;
472}
473
474/*---------------------------------------------------------------------------*/
475/*---------------------------------------------------------------------------*/
476
477bool SharedMemoryMessageQueue::SubQueue::
478_checkRecvDone(SharedMemoryMessageRequest* tmr_recv)
479{
480 // Regarde si le send est déjà associé à notre requête. C'est le cas
481 // si on a utiliser un appel à probe().
482 auto* tmr_send = tmr_recv->matchingSendRequest();
483 if (!tmr_send)
484 tmr_send = _getMatchingSendRequest(tmr_recv->dest(),tmr_recv->orig(),tmr_recv->tag());
485 if (tmr_send){
486 tmr_recv->setSource(tmr_send->orig());
487 tmr_recv->copyFromSender(tmr_send);
488 tmr_recv->setDone(true);
489 tmr_send->queue()->m_async_message_queue.push(tmr_send);
490 _removeRequest(tmr_send,m_send_requests);
491 return true;
492 }
493 return false;
494}
495
496/*---------------------------------------------------------------------------*/
497/*---------------------------------------------------------------------------*/
498
499void SharedMemoryMessageQueue::SubQueue::
500waitSome(ArrayView<Request> requests,ArrayView<bool> requests_done,
501 bool is_non_blocking)
502{
503 Integer nb_request = requests.size();
504 requests_done.fill(false);
505 bool one_request_done = false;
506 TRACE_DEBUG("** WAIT SOME REQUEST rank={0} nb_request={1}\n",m_rank,nb_request);
507 while (!one_request_done){
508 if (is_non_blocking)
509 checkRequestAvailable();
510 bool has_valid_request = false;
511 for( Integer i=0; i<nb_request; ++i ){
512 Request request = requests[i];
513 if (!request.isValid())
514 continue;
515 SharedMemoryMessageRequest* tmr = requests[i];
516 if (!tmr)
517 continue;
518 this->testRequest(tmr);
519 if (tmr->isDone()){
520 one_request_done = true;
521 requests_done[i] = true;
522 tmr->destroy();
523 if (requests[i].hasSubRequest())
524 ARCANE_THROW(NotImplementedException,"handling of sub requests");
525 requests[i].reset();
526 }
527 has_valid_request = true;
528 }
529 // Si au moins une requête est terminée, sort de la boucle.
530 if (one_request_done)
531 break;
532 // Si ici, aucune requête n'a abouti. On bloque jusqu'à ce qu'il y ait
533 // un message dans la file sauf s'il n'y a aucune requête valide
534 // (Ceci est possible si toutes les requêtes de la liste sont des
535 // requêtes nulles).
536 if (!has_valid_request)
537 break;
538 // En mode non bloquant, on a déjà testé en début de boucle si
539 // une requête est disponible. Si on est ici on peut sortir directement
540 // sinon on va bloquer.
541 if (is_non_blocking)
542 break;
543 TRACE_DEBUG("** WAIT REQUEST AVAILABLE rank={0}",m_rank);
544 waitRequestAvailable();
545 }
546}
547
548/*---------------------------------------------------------------------------*/
549/*---------------------------------------------------------------------------*/
550
551MP::MessageId SharedMemoryMessageQueue::SubQueue::
552probe(const MP::PointToPointMessageInfo& message)
553{
554 TRACE_DEBUG("Probe rank={0} nb_send={1} nb_receive={2} queue={3} is_valid={4}",
555 m_rank,m_send_requests.size(),m_recv_requests.size(),this,message.isValid());
556 if (!message.isValid())
557 return MessageId();
558
559 // Il faut avoir initialisé le message avec un couple (rang/tag).
560 if (!message.isRankTag())
561 ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
562
563 MessageRank rank = message.destinationRank();
564 MessageTag tag = message.tag();
565 bool is_blocking = message.isBlocking();
566 if (is_blocking)
567 ARCANE_THROW(NotImplementedException,"blocking probe");
568
569 // TODO: regarder pour mettre une sécurité anti-bouclage
570 // TODO: il faudrait vérifier que si on appelle deux fois cette
571 // méthode avec les mêmes informations on ne récupère pas le même message.
572 // Lorsque ce sera aussi le cas il faudra modifier legacyProbe() en
573 // conséquence.
574 for(;;){
575 _testOrWaitRequestAvailable(is_blocking);
576 auto* req = _getMatchingSendRequest(rank,m_rank,tag);
577 if (req){
578 // TODO: Vérifier que la réquête est bien avec un buffer et pas un
579 // 'ISerializer'.
580 Int64 send_size = req->sendBufferInfo().memoryBuffer().size();
581 MessageId::SourceInfo si(req->orig(),req->tag(),send_size);
582 return MessageId(si,(size_t)req->id());
583 }
584 if (!is_blocking)
585 // En non bloquant, sort de la boucle même si on n'a pas de requête.
586 break;
587 }
588 return {};
589}
590
591/*---------------------------------------------------------------------------*/
592/*---------------------------------------------------------------------------*/
593
594MP::MessageSourceInfo SharedMemoryMessageQueue::SubQueue::
595legacyProbe(const MP::PointToPointMessageInfo& message)
596{
597 // Fait un probe normal mais ne conserve pas l'information du message.
598 // NOTE: cela fonctionne car probe() peut retourner plusieurs fois le même
599 // message. Lorsque ce ne sera plus le cas il faudra modifier cela.
600 MP::MessageId message_id = probe(message);
601 if (message_id.isValid())
602 return message_id.sourceInfo();
603 return {};
604}
605
606/*---------------------------------------------------------------------------*/
607/*---------------------------------------------------------------------------*/
608
609/*---------------------------------------------------------------------------*/
610/*---------------------------------------------------------------------------*/
611
612SharedMemoryMessageQueue::
613~SharedMemoryMessageQueue()
614{
615 for( SubQueue* sq : m_sub_queues )
616 delete sq;
617}
618
619/*---------------------------------------------------------------------------*/
620/*---------------------------------------------------------------------------*/
621
622void SharedMemoryMessageQueue::
623init(Integer nb_thread)
624{
625 m_atomic_request_id = 1;
626 m_nb_thread = nb_thread;
627 Integer nb_queue = nb_thread;
628 m_sub_queues.resize(nb_queue);
629 for( Integer i=0; i<nb_queue; ++i ){
630 m_sub_queues[i] = new SubQueue(this,MessageRank(i));
631 }
632}
633
634/*---------------------------------------------------------------------------*/
635/*---------------------------------------------------------------------------*/
636
637SharedMemoryMessageQueue::SubQueue* SharedMemoryMessageQueue::
638_getSourceSubQueue(const MP::PointToPointMessageInfo& message)
639{
640 MessageRank orig = message.emiterRank();
641 if (orig.isNull())
642 ARCANE_THROW(ArgumentException,"null message.sourceRank()");
643 return _getSubQueue(orig);
644}
645
646/*---------------------------------------------------------------------------*/
647/*---------------------------------------------------------------------------*/
648
649SharedMemoryMessageQueue::SubQueue* SharedMemoryMessageQueue::
650_getDestinationSubQueue(const MP::PointToPointMessageInfo& message)
651{
652 MessageRank dest = message.destinationRank();
653 if (dest.isNull())
654 ARCANE_THROW(ArgumentException,"null message.destinationRank()");
655 return _getSubQueue(dest);
656}
657
658/*---------------------------------------------------------------------------*/
659/*---------------------------------------------------------------------------*/
660
661void SharedMemoryMessageQueue::
662waitAll(ArrayView<Request> requests)
663{
664 Integer nb_request = requests.size();
665 // Pour ne pas que cela bloque, il faut que les requêtes soient faites
666 // par ordre croissant des files, et les 'receive' D'ABORD !!
667 UniqueArray<SharedMemoryMessageRequest*> sorted_requests(nb_request);
668 for( Integer i=0; i<nb_request; ++i ){
669 sorted_requests[i] = (SharedMemoryMessageRequest*)requests[i].requestAsVoidPtr();
670 }
671 std::sort(std::begin(sorted_requests),std::end(sorted_requests),
672 SharedMemoryMessageRequest::SortFunctor(m_nb_thread));
673
674#if 0
675 for( Integer i=0; i<nb_request; ++i ){
676 SharedMemoryMessageRequest* tmr = sorted_requests[i];
677 cout << String::format("** WAIT FOR REQUEST tmr={0}\n",tmr);
678 }
679#endif
680
681 for( Integer i=0; i<nb_request; ++i ){
682 SharedMemoryMessageRequest* tmr = sorted_requests[i];
683 if (tmr){
684 SubQueue* sub_queue = tmr->queue();
685 sub_queue->wait(tmr);
686 tmr->destroy();
687 }
688 if (requests[i].hasSubRequest())
689 ARCANE_THROW(NotImplementedException,"handling of sub requests");
690 requests[i].reset();
691 }
692}
693
694/*---------------------------------------------------------------------------*/
695/*---------------------------------------------------------------------------*/
696
697void SharedMemoryMessageQueue::
698waitSome(Int32 rank,ArrayView<Parallel::Request> requests,
699 ArrayView<bool> requests_done,bool is_non_blocking)
700{
701 requests_done.fill(false);
702 auto sub_queue = _getSubQueue(MessageRank(rank));
703 sub_queue->waitSome(requests,requests_done,is_non_blocking);
704}
705
706/*---------------------------------------------------------------------------*/
707/*---------------------------------------------------------------------------*/
708
709auto SharedMemoryMessageQueue::
710addReceive(const PointToPointMessageInfo& message,ReceiveBufferInfo buf) -> Request
711{
712 auto* sq = _getSourceSubQueue(message);
713 return _request(sq->addReceive(_getNextRequestId(),message,buf));
714}
715
716/*---------------------------------------------------------------------------*/
717/*---------------------------------------------------------------------------*/
718
719auto SharedMemoryMessageQueue::
720addSend(const PointToPointMessageInfo& message,SendBufferInfo buf) -> Request
721{
722 auto* sq = _getDestinationSubQueue(message);
723 return _request(sq->addSend(_getNextRequestId(),message,buf));
724}
725
726/*---------------------------------------------------------------------------*/
727/*---------------------------------------------------------------------------*/
728
729auto SharedMemoryMessageQueue::
730probe(const MP::PointToPointMessageInfo& message) -> MessageId
731{
732 auto* sq = _getSourceSubQueue(message);
733 return sq->probe(message);
734}
735
736/*---------------------------------------------------------------------------*/
737/*---------------------------------------------------------------------------*/
738
739auto SharedMemoryMessageQueue::
740legacyProbe(const MP::PointToPointMessageInfo& message) -> MessageSourceInfo
741{
742 auto* sq = _getSourceSubQueue(message);
743 return sq->legacyProbe(message);
744}
745
746/*---------------------------------------------------------------------------*/
747/*---------------------------------------------------------------------------*/
748
749void SharedMemoryMessageQueue::
750setTraceMng(Int32 rank,ITraceMng* tm)
751{
752 _getSubQueue(MessageRank(rank))->setTraceMng(tm);
753}
754
755/*---------------------------------------------------------------------------*/
756/*---------------------------------------------------------------------------*/
757
758auto SharedMemoryMessageQueue::
759_request(SharedMemoryMessageRequest* tmr) -> Request
760{
761 return Request(0,this,tmr);
762}
763
764/*---------------------------------------------------------------------------*/
765/*---------------------------------------------------------------------------*/
766
767} // End namespace Arcane::MessagePassing
768
769/*---------------------------------------------------------------------------*/
770/*---------------------------------------------------------------------------*/
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
Definition Lima.cc:149
static void genericCopy(ConstMemoryView from, MutableMemoryView to)
Copie générique utilisant platform::getDataMemoryRessourceMng()
File asynchrone permettant d'échanger des informations entre threads.
Definition IAsyncQueue.h:32
virtual void * pop()=0
Récupère la première valeur de la file et bloque s'il n'y en a pas.
virtual void push(void *v)=0
Ajoute v dans la file.
virtual void * tryPop()=0
Récupère la première valeur s'il y en. Retourne nullptr sinon.
Informations des buffers de réception.
File pour les messages d'un rang en mémoire partagée.
void _cleanupRequestIfDone(SharedMemoryMessageRequest *tmr)
Nettoyage de la requête tmr si elle est finie.
File de messages entre les rangs partagés par un SharedMemoryParallelMng.
void copyFromSender(SharedMemoryMessageRequest *sender)
Copie dans le message de réception les informations du message d'envoi.
Interface du gestionnaire de traces.
MessageSourceInfo sourceInfo() const
Informations sur la source du message;.
Definition MessageId.h:153
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
MessageRank emiterRank() const
Rang de l'émetteur du message.
MessageRank destinationRank() const
Rang de la destination du message.
bool isBlocking() const
Indique si le message est bloquant.
bool isRankTag() const
Vrai si l'instance a été créée avec un couple (rank,tag). Dans ce cas rank() et tag() sont valides.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Definition Parallel.h:94
Int32 Integer
Type représentant un entier.