Arcane  v3.15.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
HyodaTcp.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2022 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*****************************************************************************
8 * HyodaTcp.cc (C) 2012~2016 *
9 *****************************************************************************/
#include <errno.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
14
15#include "arcane/IMesh.h"
16#include "arcane/IApplication.h"
17#include "arcane/IParallelMng.h"
18#include "arcane/IVariableMng.h"
19#include "arcane/IApplication.h"
20#include "arcane/FactoryService.h"
21#include "arcane/ServiceFinder2.h"
22#include "arcane/SharedVariable.h"
23#include "arcane/CommonVariables.h"
24#include "arcane/utils/ScopedPtr.h"
25#include "arcane/AbstractService.h"
26#include "arcane/utils/ApplicationInfo.h"
27#include "arcane/utils/ValueConvert.h"
29#include "arcane/utils/PlatformUtils.h"
30#include "arcane/utils/IOnlineDebuggerService.h"
31#include "arcane/ITransferValuesParallelOperation.h"
32#include "arcane/VariableCollection.h"
33
34#include "arcane/hyoda/HyodaArc.h"
35#include "arcane/hyoda/HyodaTcp.h"
// Size in bytes of the stack buffers used to build the handshake and the
// variable-list packets. Parenthesized so that the macro expands safely
// inside any arithmetic expression.
#define VARIABLE_PACKET_MAX_LENGTH (4*1024)
37
38
39/*---------------------------------------------------------------------------*/
40/*---------------------------------------------------------------------------*/
41
42ARCANE_BEGIN_NAMESPACE
43
44/*---------------------------------------------------------------------------*/
45/*---------------------------------------------------------------------------*/
46
47
48/******************************************************************************
49 * HyodaTcp
50 *****************************************************************************/
51HyodaTcp::
52HyodaTcp(Hyoda *_hyoda,ISubDomain *sd,ITraceMng *tm,Integer adrs,
53 Integer port, Integer pyld, bool break_at_startup)
54: TraceAccessor(tm)
55, hyoda(_hyoda)
56, m_sub_domain(sd)
57, m_payload(pyld)
58, m_sockfd(0)
59, m_servaddr(nullptr)
60, m_nfds(0)
61, m_fds(nullptr)
62{
63 ARCANE_UNUSED(break_at_startup);
64 if ((m_servaddr=(sockaddr_in *)malloc(sizeof(struct sockaddr_in)))==NULL)
65 ARCANE_FATAL("Could not allocate data space for sockaddr");
66 if ((m_fds=(pollfd *)malloc(sizeof(struct pollfd)))==NULL)
67 ARCANE_FATAL("Could not allocate data space for pollfd");
68 // Il y en a qu'un qui tente la connection
69 if (m_sub_domain->parallelMng()->commRank()!=0) return;
70 // internet protocol, pseudo protocol number
71 if ((m_sockfd=socket(PF_INET, SOCK_STREAM, 0))<0)
72 fatal()<<"[HyodaTcp::HyodaTcp] Error creating socket !";
73 memset(m_servaddr, 0, sizeof(struct sockaddr_in));
74 m_servaddr->sin_family=AF_INET;
75 m_servaddr->sin_port=htons(port);
76 //inet_aton("127.0.0.1", &m_servaddr->sin_addr);
77 m_servaddr->sin_addr.s_addr=adrs;
78 debug() << "[HyodaTcp::HyodaTcp] \33[7m @ " << m_servaddr->sin_addr.s_addr << " ?\33[m";
79 if (checkTcpError(connect(m_sockfd, (struct sockaddr *) m_servaddr, sizeof(struct sockaddr_in)))!=0)
80 fatal()<<"[HyodaTcp::HyodaTcp] Error connecting to socket!";
81 // Poll preparation
82 debug()<<"[HyodaTcp::HyodaTcp] \33[7mConnected!\33[m";
83 memset(m_fds, 0, sizeof(struct pollfd));
84 m_fds->fd=m_sockfd;
85 m_fds->events=POLLIN;
86 m_nfds=1;
87 // Un packet de handshaking est envoyé
88 handshake();
89 // Puis un packet des variables
90 getVariableCollectionAndSendItToHost();
91}
92
93HyodaTcp::
94~HyodaTcp()
95{
96 ::free(m_servaddr);
97 ::free(m_fds);
98}
99
100/******************************************************************************
101 * disconect
102 *****************************************************************************/
103void HyodaTcp::disconect(void){
104 debug()<<"\33[7m[HyodaTcp::disconect]\33[m";
105 //#warning HyodaTcp close should be done while disconnecting
106 //if (m_sockfd!=0) ::close(m_sockfd);
107}
108
109// ******************************************************************************
110// * Envoi des information de connection
111// * BaseForm[Hash["handshake", "CRC32"], 16] = e73b2e9c
112// *****************************************************************************
113void HyodaTcp::handshake(void){
114 int of7=4+4; // Header + taille
115 char pkt[VARIABLE_PACKET_MAX_LENGTH];
116
117 debug() << "[HyodaTcp::handshake]";
118 // On écrit le header 'HandShake' de QHyodaTcpSwitch
119 *(unsigned int*)&pkt[0]=0xe73b2e9cul;
120
121 // On pousse notre pid
122 *(unsigned int*)&pkt[of7]=platform::getProcessId();
123 debug() << "[HyodaTcp::handshake] pid=" << platform::getProcessId();//<<", pid in packet="<<*(unsigned int*)&pkt[of7];
124 of7+=4;
125
126 // On pousse notre hostname
127 // On écrit le nombre de caractères du hostname
128 String hostname=platform::getHostName();//.localstr();
129 *(unsigned int*)&pkt[of7]=hostname.len()+1;
130 if (sprintf(&pkt[of7+4],"%s%c",hostname.localstr(),'\0')!=(hostname.len()+1))
131 fatal()<<"Error pushing hostname into packet !";
132 debug() << "[HyodaTcp::handshake] hostname=" << platform::getHostName();
133 of7+=4+hostname.len()+1;
134
135 // On trouve la ligne de commande, que l'on soit broadcasté ou pas
136 String hyoda_bridge_broadcast_original_cmd=platform::getEnvironmentVariable("BRIDGE_BROADCAST_ORIGINAL_CMD");
137 if (hyoda_bridge_broadcast_original_cmd!=NULL){
138 debug() << "[HyodaTcp::handshake] Bridged from:"<<hyoda_bridge_broadcast_original_cmd;
139 *(unsigned int*)&pkt[of7]=hyoda_bridge_broadcast_original_cmd.len()+1;
140 if (sprintf(&pkt[of7+4], "%s%c",hyoda_bridge_broadcast_original_cmd.localstr(),'\0')
141 !=(hyoda_bridge_broadcast_original_cmd.len()+1))
142 fatal()<<"Error pushing hyoda_bridge_broadcast_original_cmd into packet !";
143 of7+=hyoda_bridge_broadcast_original_cmd.len()+1;
144 }else{
145 const ApplicationInfo& app_info = hyoda->application()->applicationInfo();
146 //int argc = *app_info.commandLineArgc();
147 char** argv = *app_info.commandLineArgv();
148 //for(int i=0;i<argc;++i) debug()<<argv[i];
149 *(unsigned int*)&pkt[of7]=strlen(argv[0])+1;
150 if (sprintf(&pkt[of7+4],"%s%c",argv[0],'\0')!=(strlen(argv[0])+1))
151 fatal()<<"Error pushing commandLineArgv into packet !";
152 debug() << "[HyodaTcp::handshake] command line:"<<argv[0];
153 of7+=strlen(argv[0])+1;
154 }
155 of7+=4;
156
157 // On récupère le SLURM_ID
158 String slurm_job_id=platform::getEnvironmentVariable("SLURM_JOB_ID");
159 if (slurm_job_id!=NULL){
160 Integer sjobid=0;
161 if (!builtInGetValue(sjobid,slurm_job_id)){
162 if (m_sub_domain->parallelMng()->commRank()==0) debug()<<"\33[7m[Hyoda] slurm_job_id="<<sjobid<<"\33[m";
163 debug() << "[HyodaTcp::handshake] SLURM_JOB_ID=" << sjobid;
164 *(unsigned int*)&pkt[of7]=sjobid;
165 }else{
166 debug() << "[HyodaTcp::handshake] SLURM_JOB_ID but no builtInGetValue";
167 *(unsigned int*)&pkt[of7]=0;
168 }
169 }else{
170 debug() << "[HyodaTcp::handshake] not Slurm'ed";
171 *(unsigned int*)&pkt[of7]=0;
172 }
173 of7+=4;
174
175 // On pousse la taille du paquet
176 *(unsigned int*)&pkt[4]=of7;
177
178 // On envoi le packet
179 send(pkt,of7);
180
181 // Et on attend la réponse de QHyoda
182 waitForAcknowledgment(); // venant du QHyodaTcp::Sleeping
183 //waitForAcknowledgment(); // puis du QHyodaTcp::HandShake
184}
185
186
187/******************************************************************************
188 * Récupération de la liste des variables affichables
189 *****************************************************************************/
190void HyodaTcp::
191getVariableCollectionAndSendItToHost(void)
192{
193 int varPacketOffset=4+4;
194 char varPacket[VARIABLE_PACKET_MAX_LENGTH];
195 VariableCollection variables = m_sub_domain->variableMng()->usedVariables();
196 // On écrit le 'VariableName' QHyodaTcpSwitch
197 *(unsigned int*)&varPacket[0]=0xca6cd6f0ul;
198 debug()<< "Variables count=" << variables.count();
199 for(VariableCollection::Enumerator ivar(variables); ++ivar; ){
200 IVariable* var = *ivar;
201 debug() << "[HyodaTcp::getVariableCollectionAndSendItToHost]"
202 << "Focusing variable"
203 << "name=" << var->name().localstr();
204 // Pas de références, pas de variable
205 if (var->nbReference()==0) {debug() << "No nbReference"; continue;}
206 // Pas sur le bon support, pas de variable
207 if (var->itemKind()!=IK_Node &&
208 var->itemKind()!=IK_Cell &&
209 var->itemKind()!=IK_Face &&
210 var->itemKind() != IK_Particle) continue;
211 // Pas réclamée en tant que PostProcess'able, pas de variable
212 if (var->itemKind()!=IK_Particle && (!var->hasTag("PostProcessing")))
213 {debug() << "No PostProcessing"; continue;}
214 // Pas de type non supportés
215 //if (var->dataType()>=DT_String) continue;
216 if (var->dataType()!=DT_Real) continue;
217 debug() << "[HyodaTcp::getVariableCollectionAndSendItToHost] Found variable"
218 << " name=" <<var->name().localstr()
219 << ", dataType=" <<dataTypeName(var->dataType())
220 << ", fullName=" <<var->fullName()
221 << ", family=" <<var->itemFamilyName()
222 << ", isUsed=" << var->isUsed()
223 << ", nbReference=" << var->nbReference()
224 << ", dimension=" << var->dimension()
225 << ", isPartial=" <<var->isPartial()
226 << ", var.hasTag(\"PostProcessing\")"<<var->hasTag("PostProcessing");
227 // On écrit le nombre de caractères du nom de la variable
228 //debug() << "[getVariableCollectionAndSendItToHost] var->name().len()+1="<<var->name().len()+1;
229 *(unsigned int*)&varPacket[varPacketOffset]=var->name().len()+1;
230 // On écrit le nom de la variable
231 debug() << "[HyodaTcp::getVariableCollectionAndSendItToHost] \33[7mVariable: "<<var->name()<<"\33[m";
232 if (sprintf(&varPacket[varPacketOffset+4],"%s%c",var->name().localstr(),'\0')!=(var->name().len()+1))
233 fatal()<<"Error pushing variable name into packet !";
234 ARCANE_ASSERT(varPacketOffset+4+var->name().len()+1<VARIABLE_PACKET_MAX_LENGTH, ("VARIABLE_PACKET_MAX_LENGTH"));
235 varPacketOffset+=4+var->name().len()+1;
236 //debug() << "number_of_characters_printed="<<number_of_characters_printed;
237 //for(int i=0;i<varPacketHeaderLen;++i) debug()<<"\tvariablePacket["<<i<<"]="<<varPacketHeader[i];
238 }
239 // On pousse la taille du paquet
240 *(unsigned int*)&varPacket[4]=varPacketOffset;
241 //debug() << "[getVariableCollectionAndSendItToHost] varPacketLength="<<varPacketOffset<<"o";
242 if (m_sub_domain->parallelMng()->commRank()==0)
243 send(varPacket,varPacketOffset);
244 waitForAcknowledgment(); // venant du QHyodaTcp::VariableNameleeping
245}
246
247
248
249
250/******************************************************************************
251 * send
252 *****************************************************************************/
253void HyodaTcp::
254send(const void *data, size_t nleft)
255{
256 size_t payload = m_payload;
257 size_t offset = 0;
258 debug() << "\33[7m[HyodaTcp::send] sending "<< nleft << " bytes of data, payload=" <<m_payload << "\33[m";
259 while (nleft>0){
260 if (nleft<payload) payload=nleft;
261 if (wData(m_sockfd, (unsigned char*)data+offset, payload)!=payload)
262 fatal()<<"Error sending into socket !";
263 nleft -= payload;
264 offset += payload;
265 }
266 ARCANE_ASSERT(nleft==0, ("[send] nleft!=0"));
267 debug() << "\33[7m[HyodaTcp::send] done\33[m";
268 ::fsync(m_sockfd);
269}
270
271
272/******************************************************************************
273 * Write a line to a socket
274 *****************************************************************************/
275ssize_t HyodaTcp::
276wData(int sockd, const void *vptr, size_t n)
277{
278 size_t nleft=n;
279 ssize_t nwritten;
280 const char *buffer=(char*)vptr;
281 while (nleft>0) {
282 if ((nwritten = write(sockd, buffer, nleft))<=0){
283 if (errno==EINTR) nwritten=0;
284 else return -1;
285 }
286 nleft -= nwritten;
287 buffer += nwritten;
288 }
289 return n;
290}
291
292
293// ******************************************************************************
294// * recv
295// * The timeout argument specifies an upper limit on the time for which poll()
296// * will block, in milliseconds.
297// * Specifying a negative value in timeout means an infinite timeout.
298// *****************************************************************************
299void HyodaTcp::recvPov(double *pov){
300 recvPacket((char*)pov, 8*(1+3+1+1), -1);
301 debug() << "\33[7m[HyodaTcp::recvPov] ok\33[m";
302 //sendAcknowledgmentPacket();
303}
304
305void HyodaTcp::recvPov(double *pov, int ms_timeout){
306 recvPacket((char*)pov, 8*(1+3+1+1), ms_timeout);
307 debug() <<"\33[7m[HyodaTcp::recv] pov "
308 << " scale=" << pov[0]
309 << " rot_x=" << pov[1]
310 << " rot_y=" << pov[2]
311 << " rot_z=" << pov[3]
312 << " idx=" << pov[4]
313 << " plg=" << pov[5]
314 <<"\33[m";
315}
316
317void HyodaTcp::sendAcknowledgmentPacket(void){
318 // Il faut au moins 8o pour que le Sleeping state se déclenche
319 char ack[8];
320 *(unsigned int*)&ack[0]=0x3e9ff203ul;
321 *(unsigned int*)&ack[4]=0;
322 debug() << "\33[7m[HyodaTcp::sendAcknowledgmentPacket] ...\33[m";
323 send(ack, 8);
324 debug() << "\33[7m[HyodaTcp::sendAcknowledgmentPacket] !\33[m";
325}
326
327void HyodaTcp::waitForAcknowledgment(void){
328 char ack[4];
329 debug() << "\33[7m[HyodaTcp::waitForAcknowledgment] ?\33[m";
330 recvPacket((char*)&ack[0], 4, -1);
331 debug() << "\33[7m[HyodaTcp::waitForAcknowledgment] !\33[m";
332 if ((*(unsigned int*)&ack[0]) != 0x3e9ff203ul)
333 fatal() << "HyodaTcp::waitForAcknowledgment, ack[0]="<<*(unsigned int*)&ack[0];
334 debug() << "\33[7m[HyodaTcp::waitForAcknowledgment] ok\33[m";
335}
336
337void HyodaTcp::recvPacket(char *pov, int maxSize, int ms_timeout){
338 int returned_events;
339 debug() << "\33[7m[HyodaTcp::recvPacket] ...\33[m";
340 do{
341 returned_events=poll(m_fds, m_nfds, ms_timeout);
342 } while(returned_events==-1 && errno == EINTR);
343 if (returned_events==-1) fatal()<<"Error polling socket !";
344 if (returned_events==0) debug()<<"Timeout polling socket !";
345 if (!(m_fds->revents && POLLIN)) return;
346 rData(m_sockfd, (char*)&pov[0], maxSize);
347 debug() << "\33[7m[HyodaTcp::recvPacket] !\33[m";
348}
349
350
351/******************************************************************************
352 * Read a line from a socket
353 *****************************************************************************/
354ssize_t HyodaTcp::rData(int sockd, void *vptr, size_t maxlen){
355 ssize_t n, rc;
356 char c, *buffer=(char*)vptr;
357 for(n=0; n<maxlen; n++){
358 if ((rc = read(sockd, &c, 1))==1){
359 //debug() << "got char '" << c <<"'";
360 *buffer++ = c;
361 }
362 else if (rc==0) { // end of file
363 //debug() << "eof";
364 if (n==0){
365 //debug() << "n==0, returning 0";
366 return 0;
367 }
368 else break;
369 }
370 else {
371 if (errno==EINTR){
372 //debug() << "EINTR, continue";
373 continue; // The call was interrupted by a signal before any data was read.
374 }
375 //debug() << "returning -1";
376 return -1;
377 }
378 }
379 // Attention, dans le cas exact, il ne faut pas déborder!
380 //*buffer = 0;
381 return n;
382}
383
384
385/******************************************************************************
386 * checkTcpError
387 *****************************************************************************/
388int HyodaTcp::checkTcpError(int error){
389 if (error>=0) return error;
390 switch (errno) {
391 case EACCES:
392 debug() << "\33[7m" << "EACCES" << "\33[m:" << "Write permission is denied on the socket.";
393 break;
394 case EPERM:
395 debug() << "\33[7m" << "EPERM" << "\33[m:"
396 << "The user tried to connect to a broadcast address without having the socket broadcast flag enabled"
397 << "or the connection request failed because of a local firewall rule.";
398 break;
399 case EADDRINUSE:
400 debug() << "\33[7m" << "EADDRINUSE" << "\33[m:"
401 << "Local address is already in use.";
402 break;
403 case EAFNOSUPPORT:
404 debug() << "\33[7m" << "EAFNOSUPPORT" << "\33[m:"
405 <<"The passed address didn't have the correct address family in its sa_family field.";
406 break;
407 case EADDRNOTAVAIL:
408 debug() << "\33[7m" << "EADDRNOTAVAIL" << "\33[m:"
409 << "Non-existent interface was requested or the requested address was not local.";
410 break;
411 case EALREADY:
412 debug() << "\33[7m" << "EALREADY" << "\33[m:"
413 << "The socket is non-blocking and a previous connection attempt has not yet been completed.";
414 break;
415 case EBADF:
416 debug() << "\33[7m" << "EBADF" << "\33[m:"
417 << " The file descriptor is not a valid index in the descriptor table.";
418 break;
419 case ECONNREFUSED:
420 debug() << "\33[7m" << "ECONNREFUSED" << "\33[m:"
421 << "No one listening on the remote address.";
422 break;
423 case EFAULT:
424 debug() << "\33[7m" << "EFAULT" << "\33[m:"
425 <<"The socket structure address is outside the user's address space.";
426 break;
427 case EINPROGRESS:
428 debug() << "\33[7m" << "EINPROGRESS" << "\33[m:"
429 <<"The socket is non-blocking and the connection cannot be completed immediately.";
430 break;
431 case EINTR:
432 debug() << "\33[7m" << "EINTR" << "\33[m:"
433 << "The system call was interrupted by a signal that was caught.";
434 break;
435 case EISCONN:
436 debug() << "\33[7m" << "EISCONN" << "\33[m:"
437 <<"The socket is already connected.";
438 break;
439 case ENETUNREACH:
440 debug() << "\33[7m" << "ENETUNREACH" << "\33[m:"
441 << "Network is unreachable.";
442 break;
443 case ENOTSOCK:
444 debug() << "\33[7m" << "ENOTSOCK" << "\33[m:"
445 << "The file descriptor is not associated with a socket.";
446 break;
447 case ETIMEDOUT:
448 debug() << "\33[7m" << "ETIMEDOUT" << "\33[m:"
449 <<"Timeout while attempting connection.";
450 break;
451 default: debug()<<"## UNKNOWN ERROR CODE error="<<error<<", errno="<<errno;
452 }
453 return error;
454}
455
456
457/*---------------------------------------------------------------------------*/
458/*---------------------------------------------------------------------------*/
459
460ARCANE_END_NAMESPACE
461
462/*---------------------------------------------------------------------------*/
463/*---------------------------------------------------------------------------*/
Fichier de configuration d'Arcane.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
const char * dataTypeName(eDataType type)
Nom du type de donnée.
Definition DataTypes.cc:70
Int32 Integer
Type représentant un entier.