Fix: relayd: create_index_file error handling
[lttng-tools.git] / src / common / relayd / relayd.c
CommitLineData
00e2e675
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
6c1c0768 18#define _LGPL_SOURCE
00e2e675
DG
19#include <assert.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <sys/stat.h>
77c7c900 24#include <inttypes.h>
00e2e675
DG
25
26#include <common/common.h>
27#include <common/defaults.h>
f263b7fd 28#include <common/compat/endian.h>
27283937 29#include <common/compat/string.h>
00e2e675 30#include <common/sessiond-comm/relayd.h>
50adc264 31#include <common/index/ctf-index.h>
d2956687 32#include <common/trace-chunk.h>
c35f9726 33#include <common/string-utils/format.h>
00e2e675
DG
34
35#include "relayd.h"
36
37/*
38 * Send command. Fill up the header and append the data.
39 */
6151a90f 40static int send_command(struct lttcomm_relayd_sock *rsock,
76b9afaa 41 enum lttcomm_relayd_command cmd, const void *data, size_t size,
00e2e675
DG
42 int flags)
43{
44 int ret;
45 struct lttcomm_relayd_hdr header;
46 char *buf;
47 uint64_t buf_size = sizeof(header);
48
f96e4545
MD
49 if (rsock->sock.fd < 0) {
50 return -ECONNRESET;
51 }
52
00e2e675
DG
53 if (data) {
54 buf_size += size;
55 }
56
57 buf = zmalloc(buf_size);
58 if (buf == NULL) {
59 PERROR("zmalloc relayd send command buf");
60 ret = -1;
61 goto alloc_error;
62 }
63
53efb85a 64 memset(&header, 0, sizeof(header));
00e2e675
DG
65 header.cmd = htobe32(cmd);
66 header.data_size = htobe64(size);
67
68 /* Zeroed for now since not used. */
69 header.cmd_version = 0;
70 header.circuit_id = 0;
71
72 /* Prepare buffer to send. */
73 memcpy(buf, &header, sizeof(header));
74 if (data) {
75 memcpy(buf + sizeof(header), data, size);
76 }
77
06586bbe 78 DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size);
6151a90f 79 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
00e2e675 80 if (ret < 0) {
06586bbe
JG
81 PERROR("Failed to send command %d of size %" PRIu64,
82 (int) cmd, buf_size);
8994307f 83 ret = -errno;
00e2e675
DG
84 goto error;
85 }
00e2e675
DG
86error:
87 free(buf);
88alloc_error:
89 return ret;
90}
91
92/*
93 * Receive reply data on socket. This MUST be call after send_command or else
94 * could result in unexpected behavior(s).
95 */
6151a90f 96static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
00e2e675
DG
97{
98 int ret;
99
f96e4545
MD
100 if (rsock->sock.fd < 0) {
101 return -ECONNRESET;
102 }
103
8fd623e0 104 DBG3("Relayd waiting for reply of size %zu", size);
00e2e675 105
6151a90f 106 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
20275fe8
DG
107 if (ret <= 0 || ret != size) {
108 if (ret == 0) {
109 /* Orderly shutdown. */
6151a90f 110 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
20275fe8 111 } else {
8fd623e0 112 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
6151a90f 113 rsock->sock.fd, size, ret);
20275fe8
DG
114 }
115 /* Always return -1 here and the caller can use errno. */
116 ret = -1;
00e2e675
DG
117 goto error;
118 }
119
120error:
121 return ret;
122}
123
d3e2ba59 124/*
6fa5fe7c
MD
125 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name,
126 * hostname, and base_path) have no length restriction on the sender side.
f86f6389
JR
127 * Length for both payloads is stored in the msg struct. A new dynamic size
128 * payload size is introduced.
129 */
130static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
fb9a95c4 131 const char *session_name, const char *hostname,
6fa5fe7c
MD
132 const char *base_path, int session_live_timer,
133 unsigned int snapshot, uint64_t sessiond_session_id,
134 const lttng_uuid sessiond_uuid, const uint64_t *current_chunk_id,
46ef2188 135 time_t creation_time, bool session_name_contains_creation_time)
f86f6389
JR
136{
137 int ret;
138 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
139 size_t session_name_len;
140 size_t hostname_len;
6fa5fe7c 141 size_t base_path_len;
f86f6389 142 size_t msg_length;
6fa5fe7c 143 char *dst;
f86f6389 144
46ef2188
MD
145 if (!base_path) {
146 base_path = "";
147 }
148 /* The three names are sent with a '\0' delimiter between them. */
f86f6389
JR
149 session_name_len = strlen(session_name) + 1;
150 hostname_len = strlen(hostname) + 1;
6fa5fe7c 151 base_path_len = base_path ? strlen(base_path) + 1 : 0;
f86f6389 152
6fa5fe7c 153 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
f86f6389
JR
154 msg = zmalloc(msg_length);
155 if (!msg) {
156 PERROR("zmalloc create_session_2_11 command message");
157 ret = -1;
158 goto error;
159 }
160
161 assert(session_name_len <= UINT32_MAX);
162 msg->session_name_len = htobe32(session_name_len);
163
164 assert(hostname_len <= UINT32_MAX);
165 msg->hostname_len = htobe32(hostname_len);
166
6fa5fe7c
MD
167 assert(base_path_len <= UINT32_MAX);
168 msg->base_path_len = htobe32(base_path_len);
169
170 dst = msg->names;
171 if (lttng_strncpy(dst, session_name, session_name_len)) {
172 ret = -1;
173 goto error;
174 }
175 dst += session_name_len;
176 if (lttng_strncpy(dst, hostname, hostname_len)) {
f86f6389
JR
177 ret = -1;
178 goto error;
179 }
6fa5fe7c
MD
180 dst += hostname_len;
181 if (base_path && lttng_strncpy(dst, base_path, base_path_len)) {
f86f6389
JR
182 ret = -1;
183 goto error;
184 }
185
186 msg->live_timer = htobe32(session_live_timer);
187 msg->snapshot = !!snapshot;
188
658f12fa
JG
189 lttng_uuid_copy(msg->sessiond_uuid, sessiond_uuid);
190 msg->session_id = htobe64(sessiond_session_id);
46ef2188 191 msg->session_name_contains_creation_time = session_name_contains_creation_time;
f39bd140
JG
192 if (current_chunk_id) {
193 LTTNG_OPTIONAL_SET(&msg->current_chunk_id,
194 htobe64(*current_chunk_id));
195 }
196
db1da059
JG
197 msg->creation_time = htobe64((uint64_t) creation_time);
198
f86f6389
JR
199 /* Send command */
200 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
201 if (ret < 0) {
202 goto error;
203 }
204error:
205 free(msg);
206 return ret;
207}
208/*
209 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
d3e2ba59
JD
210 * support the live reading capability.
211 */
212static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
fb9a95c4
JG
213 const char *session_name, const char *hostname,
214 int session_live_timer, unsigned int snapshot)
d3e2ba59
JD
215{
216 int ret;
217 struct lttcomm_relayd_create_session_2_4 msg;
218
3a13ffd5
MD
219 if (lttng_strncpy(msg.session_name, session_name,
220 sizeof(msg.session_name))) {
246777db
MD
221 ret = -1;
222 goto error;
223 }
3a13ffd5 224 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
246777db
MD
225 ret = -1;
226 goto error;
227 }
d3e2ba59 228 msg.live_timer = htobe32(session_live_timer);
7d2f7452 229 msg.snapshot = htobe32(snapshot);
d3e2ba59
JD
230
231 /* Send command */
232 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
233 if (ret < 0) {
234 goto error;
235 }
236
237error:
238 return ret;
239}
240
241/*
242 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
243 */
42e9a27b 244static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock)
d3e2ba59
JD
245{
246 int ret;
247
248 /* Send command */
249 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
250 if (ret < 0) {
251 goto error;
252 }
253
254error:
255 return ret;
256}
257
c5b6f4f0
DG
258/*
259 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
260 * set session_id of the relayd if we have a successful reply from the relayd.
261 *
20275fe8
DG
262 * On success, return 0 else a negative value which is either an errno error or
263 * a lttng error code from the relayd.
c5b6f4f0 264 */
fb9a95c4
JG
265int relayd_create_session(struct lttcomm_relayd_sock *rsock,
266 uint64_t *relayd_session_id,
267 const char *session_name, const char *hostname,
6fa5fe7c 268 const char *base_path, int session_live_timer,
658f12fa 269 unsigned int snapshot, uint64_t sessiond_session_id,
f39bd140 270 const lttng_uuid sessiond_uuid,
db1da059 271 const uint64_t *current_chunk_id,
46ef2188 272 time_t creation_time, bool session_name_contains_creation_time)
c5b6f4f0
DG
273{
274 int ret;
275 struct lttcomm_relayd_status_session reply;
276
6151a90f 277 assert(rsock);
658f12fa 278 assert(relayd_session_id);
c5b6f4f0
DG
279
280 DBG("Relayd create session");
281
f86f6389
JR
282 if (rsock->minor < 4) {
283 /* From 2.1 to 2.3 */
284 ret = relayd_create_session_2_1(rsock);
285 } else if (rsock->minor >= 4 && rsock->minor < 11) {
286 /* From 2.4 to 2.10 */
287 ret = relayd_create_session_2_4(rsock, session_name,
288 hostname, session_live_timer, snapshot);
289 } else {
290 /* From 2.11 to ... */
291 ret = relayd_create_session_2_11(rsock, session_name,
6fa5fe7c 292 hostname, base_path, session_live_timer, snapshot,
f39bd140 293 sessiond_session_id, sessiond_uuid,
46ef2188
MD
294 current_chunk_id, creation_time,
295 session_name_contains_creation_time);
d3e2ba59
JD
296 }
297
c5b6f4f0
DG
298 if (ret < 0) {
299 goto error;
300 }
301
20275fe8 302 /* Receive response */
6151a90f 303 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c5b6f4f0
DG
304 if (ret < 0) {
305 goto error;
306 }
307
308 reply.session_id = be64toh(reply.session_id);
309 reply.ret_code = be32toh(reply.ret_code);
310
311 /* Return session id or negative ret code. */
312 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
313 ret = -1;
314 ERR("Relayd create session replied error %d", reply.ret_code);
c5b6f4f0
DG
315 goto error;
316 } else {
317 ret = 0;
658f12fa 318 *relayd_session_id = reply.session_id;
c5b6f4f0
DG
319 }
320
321 DBG("Relayd session created with id %" PRIu64, reply.session_id);
322
323error:
324 return ret;
325}
326
2f21a469
JR
327static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
328 const char *channel_name, const char *pathname)
329{
330 int ret;
331 struct lttcomm_relayd_add_stream msg;
332
333 memset(&msg, 0, sizeof(msg));
334 if (lttng_strncpy(msg.channel_name, channel_name,
335 sizeof(msg.channel_name))) {
336 ret = -1;
337 goto error;
338 }
339
340 if (lttng_strncpy(msg.pathname, pathname,
341 sizeof(msg.pathname))) {
342 ret = -1;
343 goto error;
344 }
345
346 /* Send command */
347 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
348 if (ret < 0) {
349 ret = -1;
350 goto error;
351 }
352 ret = 0;
353error:
354 return ret;
355}
356
357static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
358 const char *channel_name, const char *pathname,
359 uint64_t tracefile_size, uint64_t tracefile_count)
360{
361 int ret;
362 struct lttcomm_relayd_add_stream_2_2 msg;
363
364 memset(&msg, 0, sizeof(msg));
365 /* Compat with relayd 2.2 to 2.10 */
366 if (lttng_strncpy(msg.channel_name, channel_name,
367 sizeof(msg.channel_name))) {
368 ret = -1;
369 goto error;
370 }
371 if (lttng_strncpy(msg.pathname, pathname,
372 sizeof(msg.pathname))) {
373 ret = -1;
374 goto error;
375 }
376 msg.tracefile_size = htobe64(tracefile_size);
377 msg.tracefile_count = htobe64(tracefile_count);
378
379 /* Send command */
380 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
381 if (ret < 0) {
382 goto error;
383 }
384 ret = 0;
385error:
386 return ret;
387}
388
389static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
390 const char *channel_name, const char *pathname,
b00e554e
JG
391 uint64_t tracefile_size, uint64_t tracefile_count,
392 uint64_t trace_archive_id)
2f21a469
JR
393{
394 int ret;
395 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
396 size_t channel_name_len;
397 size_t pathname_len;
398 size_t msg_length;
399
400 /* The two names are sent with a '\0' delimiter between them. */
401 channel_name_len = strlen(channel_name) + 1;
402 pathname_len = strlen(pathname) + 1;
403
404 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
405 msg = zmalloc(msg_length);
406 if (!msg) {
407 PERROR("zmalloc add_stream_2_11 command message");
408 ret = -1;
409 goto error;
410 }
411
412 assert(channel_name_len <= UINT32_MAX);
413 msg->channel_name_len = htobe32(channel_name_len);
414
415 assert(pathname_len <= UINT32_MAX);
416 msg->pathname_len = htobe32(pathname_len);
417
418 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
419 ret = -1;
420 goto error;
421 }
422 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
423 ret = -1;
424 goto error;
425 }
426
427 msg->tracefile_size = htobe64(tracefile_size);
428 msg->tracefile_count = htobe64(tracefile_count);
348a81dc 429 msg->trace_chunk_id = htobe64(trace_archive_id);
2f21a469
JR
430
431 /* Send command */
432 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
433 if (ret < 0) {
434 goto error;
435 }
436 ret = 0;
437error:
438 free(msg);
439 return ret;
440}
441
00e2e675
DG
442/*
443 * Add stream on the relayd and assign stream handle to the stream_id argument.
444 *
445 * On success return 0 else return ret_code negative value.
446 */
6151a90f 447int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
0f907de1 448 const char *pathname, uint64_t *stream_id,
0b50e4b3 449 uint64_t tracefile_size, uint64_t tracefile_count,
d2956687 450 struct lttng_trace_chunk *trace_chunk)
00e2e675
DG
451{
452 int ret;
00e2e675
DG
453 struct lttcomm_relayd_status_stream reply;
454
455 /* Code flow error. Safety net. */
6151a90f 456 assert(rsock);
00e2e675
DG
457 assert(channel_name);
458 assert(pathname);
459
460 DBG("Relayd adding stream for channel name %s", channel_name);
461
0f907de1
JD
462 /* Compat with relayd 2.1 */
463 if (rsock->minor == 1) {
2f21a469 464 /* For 2.1 */
d2956687 465 assert(!trace_chunk);
2f21a469
JR
466 ret = relayd_add_stream_2_1(rsock, channel_name, pathname);
467
468 } else if (rsock->minor > 1 && rsock->minor < 11) {
469 /* From 2.2 to 2.10 */
d2956687 470 assert(!trace_chunk);
2f21a469
JR
471 ret = relayd_add_stream_2_2(rsock, channel_name, pathname,
472 tracefile_size, tracefile_count);
0f907de1 473 } else {
d2956687
JG
474 enum lttng_trace_chunk_status chunk_status;
475 uint64_t chunk_id;
476
477 assert(trace_chunk);
478 chunk_status = lttng_trace_chunk_get_id(trace_chunk,
479 &chunk_id);
480 assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
481
2f21a469
JR
482 /* From 2.11 to ...*/
483 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
b00e554e 484 tracefile_size, tracefile_count,
d2956687 485 chunk_id);
2f21a469 486 }
0f907de1 487
2f21a469
JR
488 if (ret) {
489 ret = -1;
490 goto error;
00e2e675
DG
491 }
492
633d0084 493 /* Waiting for reply */
6151a90f 494 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
00e2e675
DG
495 if (ret < 0) {
496 goto error;
497 }
498
499 /* Back to host bytes order. */
500 reply.handle = be64toh(reply.handle);
501 reply.ret_code = be32toh(reply.ret_code);
502
503 /* Return session id or negative ret code. */
f73fabfd 504 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
505 ret = -1;
506 ERR("Relayd add stream replied error %d", reply.ret_code);
00e2e675
DG
507 } else {
508 /* Success */
509 ret = 0;
510 *stream_id = reply.handle;
511 }
512
77c7c900 513 DBG("Relayd stream added successfully with handle %" PRIu64,
2f21a469 514 reply.handle);
00e2e675
DG
515
516error:
517 return ret;
518}
519
a4baae1b
JD
520/*
521 * Inform the relay that all the streams for the current channel has been sent.
522 *
523 * On success return 0 else return ret_code negative value.
524 */
525int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
526{
527 int ret;
528 struct lttcomm_relayd_generic_reply reply;
529
530 /* Code flow error. Safety net. */
531 assert(rsock);
532
533 DBG("Relayd sending streams sent.");
534
535 /* This feature was introduced in 2.4, ignore it for earlier versions. */
536 if (rsock->minor < 4) {
537 ret = 0;
538 goto end;
539 }
540
541 /* Send command */
542 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
543 if (ret < 0) {
544 goto error;
545 }
546
547 /* Waiting for reply */
548 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
549 if (ret < 0) {
550 goto error;
551 }
552
553 /* Back to host bytes order. */
554 reply.ret_code = be32toh(reply.ret_code);
555
556 /* Return session id or negative ret code. */
557 if (reply.ret_code != LTTNG_OK) {
558 ret = -1;
559 ERR("Relayd streams sent replied error %d", reply.ret_code);
560 goto error;
561 } else {
562 /* Success */
563 ret = 0;
564 }
565
566 DBG("Relayd streams sent success");
567
568error:
569end:
570 return ret;
571}
572
00e2e675
DG
573/*
574 * Check version numbers on the relayd.
d4519fa3
JD
575 * If major versions are compatible, we assign minor_to_use to the
576 * minor version of the procotol we are going to use for this session.
00e2e675 577 *
67d5aa28
JD
578 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
579 * otherwise, or a negative value on network errors.
00e2e675 580 */
6151a90f 581int relayd_version_check(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
582{
583 int ret;
092b6259 584 struct lttcomm_relayd_version msg;
00e2e675
DG
585
586 /* Code flow error. Safety net. */
6151a90f 587 assert(rsock);
00e2e675 588
6151a90f
JD
589 DBG("Relayd version check for major.minor %u.%u", rsock->major,
590 rsock->minor);
00e2e675 591
53efb85a 592 memset(&msg, 0, sizeof(msg));
092b6259 593 /* Prepare network byte order before transmission. */
6151a90f
JD
594 msg.major = htobe32(rsock->major);
595 msg.minor = htobe32(rsock->minor);
092b6259 596
00e2e675 597 /* Send command */
6151a90f 598 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
00e2e675
DG
599 if (ret < 0) {
600 goto error;
601 }
602
20275fe8 603 /* Receive response */
6151a90f 604 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
00e2e675
DG
605 if (ret < 0) {
606 goto error;
607 }
608
609 /* Set back to host bytes order */
092b6259
DG
610 msg.major = be32toh(msg.major);
611 msg.minor = be32toh(msg.minor);
612
613 /*
614 * Only validate the major version. If the other side is higher,
615 * communication is not possible. Only major version equal can talk to each
616 * other. If the minor version differs, the lowest version is used by both
617 * sides.
092b6259 618 */
6151a90f 619 if (msg.major != rsock->major) {
d4519fa3 620 /* Not compatible */
67d5aa28 621 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
d4519fa3 622 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
6151a90f 623 msg.major, rsock->major);
092b6259 624 goto error;
00e2e675
DG
625 }
626
092b6259 627 /*
6151a90f
JD
628 * If the relayd's minor version is higher, it will adapt to our version so
629 * we can continue to use the latest relayd communication data structure.
630 * If the received minor version is higher, the relayd should adapt to us.
092b6259 631 */
6151a90f
JD
632 if (rsock->minor > msg.minor) {
633 rsock->minor = msg.minor;
d4519fa3 634 }
092b6259 635
d4519fa3
JD
636 /* Version number compatible */
637 DBG2("Relayd version is compatible, using protocol version %u.%u",
6151a90f 638 rsock->major, rsock->minor);
d4519fa3 639 ret = 0;
00e2e675
DG
640
641error:
642 return ret;
643}
644
00e2e675
DG
645/*
646 * Add stream on the relayd and assign stream handle to the stream_id argument.
647 *
648 * On success return 0 else return ret_code negative value.
649 */
6151a90f 650int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
00e2e675
DG
651{
652 int ret;
653
654 /* Code flow error. Safety net. */
6151a90f 655 assert(rsock);
00e2e675 656
77c7c900 657 DBG("Relayd sending metadata of size %zu", len);
00e2e675
DG
658
659 /* Send command */
6151a90f 660 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
00e2e675
DG
661 if (ret < 0) {
662 goto error;
663 }
664
665 DBG2("Relayd metadata added successfully");
666
667 /*
668 * After that call, the metadata data MUST be sent to the relayd so the
669 * receive size on the other end matches the len of the metadata packet
633d0084 670 * header. This is why we don't wait for a reply here.
00e2e675
DG
671 */
672
673error:
674 return ret;
675}
676
677/*
6151a90f 678 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
00e2e675 679 */
6151a90f 680int relayd_connect(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
681{
682 /* Code flow error. Safety net. */
6151a90f 683 assert(rsock);
00e2e675 684
f96e4545
MD
685 if (!rsock->sock.ops) {
686 /*
687 * Attempting a connect on a non-initialized socket.
688 */
689 return -ECONNRESET;
690 }
691
00e2e675
DG
692 DBG3("Relayd connect ...");
693
6151a90f 694 return rsock->sock.ops->connect(&rsock->sock);
00e2e675
DG
695}
696
697/*
6151a90f 698 * Close relayd socket with an allocated lttcomm_relayd_sock.
ffe60014
DG
699 *
700 * If no socket operations are found, simply return 0 meaning that everything
701 * is fine. Without operations, the socket can not possibly be opened or used.
702 * This is possible if the socket was allocated but not created. However, the
703 * caller could simply use it to store a valid file descriptor for instance
704 * passed over a Unix socket and call this to cleanup but still without a valid
705 * ops pointer.
706 *
707 * Return the close returned value. On error, a negative value is usually
708 * returned back from close(2).
00e2e675 709 */
6151a90f 710int relayd_close(struct lttcomm_relayd_sock *rsock)
00e2e675 711{
ffe60014
DG
712 int ret;
713
00e2e675 714 /* Code flow error. Safety net. */
6151a90f 715 assert(rsock);
00e2e675 716
ffe60014 717 /* An invalid fd is fine, return success. */
6151a90f 718 if (rsock->sock.fd < 0) {
ffe60014
DG
719 ret = 0;
720 goto end;
721 }
722
6151a90f 723 DBG3("Relayd closing socket %d", rsock->sock.fd);
00e2e675 724
6151a90f
JD
725 if (rsock->sock.ops) {
726 ret = rsock->sock.ops->close(&rsock->sock);
ffe60014
DG
727 } else {
728 /* Default call if no specific ops found. */
6151a90f 729 ret = close(rsock->sock.fd);
ffe60014
DG
730 if (ret < 0) {
731 PERROR("relayd_close default close");
732 }
733 }
f96e4545 734 rsock->sock.fd = -1;
ffe60014
DG
735
736end:
737 return ret;
00e2e675
DG
738}
739
740/*
741 * Send data header structure to the relayd.
742 */
6151a90f 743int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
00e2e675
DG
744 struct lttcomm_relayd_data_hdr *hdr, size_t size)
745{
746 int ret;
747
748 /* Code flow error. Safety net. */
6151a90f 749 assert(rsock);
00e2e675
DG
750 assert(hdr);
751
f96e4545
MD
752 if (rsock->sock.fd < 0) {
753 return -ECONNRESET;
754 }
755
8fd623e0 756 DBG3("Relayd sending data header of size %zu", size);
00e2e675
DG
757
758 /* Again, safety net */
759 if (size == 0) {
760 size = sizeof(struct lttcomm_relayd_data_hdr);
761 }
762
763 /* Only send data header. */
6151a90f 764 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
00e2e675 765 if (ret < 0) {
8994307f 766 ret = -errno;
00e2e675
DG
767 goto error;
768 }
769
770 /*
771 * The data MUST be sent right after that command for the receive on the
772 * other end to match the size in the header.
773 */
774
775error:
776 return ret;
777}
173af62f
DG
778
779/*
780 * Send close stream command to the relayd.
781 */
6151a90f 782int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
173af62f
DG
783 uint64_t last_net_seq_num)
784{
785 int ret;
786 struct lttcomm_relayd_close_stream msg;
787 struct lttcomm_relayd_generic_reply reply;
788
789 /* Code flow error. Safety net. */
6151a90f 790 assert(rsock);
173af62f 791
77c7c900 792 DBG("Relayd closing stream id %" PRIu64, stream_id);
173af62f 793
53efb85a 794 memset(&msg, 0, sizeof(msg));
173af62f
DG
795 msg.stream_id = htobe64(stream_id);
796 msg.last_net_seq_num = htobe64(last_net_seq_num);
797
798 /* Send command */
6151a90f 799 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
173af62f
DG
800 if (ret < 0) {
801 goto error;
802 }
803
20275fe8 804 /* Receive response */
6151a90f 805 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
173af62f
DG
806 if (ret < 0) {
807 goto error;
808 }
809
810 reply.ret_code = be32toh(reply.ret_code);
811
812 /* Return session id or negative ret code. */
f73fabfd 813 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
814 ret = -1;
815 ERR("Relayd close stream replied error %d", reply.ret_code);
173af62f
DG
816 } else {
817 /* Success */
818 ret = 0;
819 }
820
77c7c900 821 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
173af62f
DG
822
823error:
824 return ret;
825}
c8f59ee5
DG
826
827/*
828 * Check for data availability for a given stream id.
829 *
6d805429 830 * Return 0 if NOT pending, 1 if so and a negative value on error.
c8f59ee5 831 */
6151a90f 832int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
c8f59ee5
DG
833 uint64_t last_net_seq_num)
834{
835 int ret;
6d805429 836 struct lttcomm_relayd_data_pending msg;
c8f59ee5
DG
837 struct lttcomm_relayd_generic_reply reply;
838
839 /* Code flow error. Safety net. */
6151a90f 840 assert(rsock);
c8f59ee5 841
6d805429 842 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
c8f59ee5 843
53efb85a 844 memset(&msg, 0, sizeof(msg));
c8f59ee5
DG
845 msg.stream_id = htobe64(stream_id);
846 msg.last_net_seq_num = htobe64(last_net_seq_num);
847
848 /* Send command */
6151a90f 849 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
c8f59ee5
DG
850 sizeof(msg), 0);
851 if (ret < 0) {
852 goto error;
853 }
854
20275fe8 855 /* Receive response */
6151a90f 856 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
857 if (ret < 0) {
858 goto error;
859 }
860
861 reply.ret_code = be32toh(reply.ret_code);
862
863 /* Return session id or negative ret code. */
864 if (reply.ret_code >= LTTNG_OK) {
bb63afd9 865 ERR("Relayd data pending replied error %d", reply.ret_code);
c8f59ee5
DG
866 }
867
868 /* At this point, the ret code is either 1 or 0 */
869 ret = reply.ret_code;
870
6d805429 871 DBG("Relayd data is %s pending for stream id %" PRIu64,
9dd26bb9 872 ret == 1 ? "" : "NOT", stream_id);
c8f59ee5
DG
873
874error:
875 return ret;
876}
877
878/*
879 * Check on the relayd side for a quiescent state on the control socket.
880 */
6151a90f 881int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
ad7051c0 882 uint64_t metadata_stream_id)
c8f59ee5
DG
883{
884 int ret;
ad7051c0 885 struct lttcomm_relayd_quiescent_control msg;
c8f59ee5
DG
886 struct lttcomm_relayd_generic_reply reply;
887
888 /* Code flow error. Safety net. */
6151a90f 889 assert(rsock);
c8f59ee5
DG
890
891 DBG("Relayd checking quiescent control state");
892
53efb85a 893 memset(&msg, 0, sizeof(msg));
ad7051c0
DG
894 msg.stream_id = htobe64(metadata_stream_id);
895
c8f59ee5 896 /* Send command */
6151a90f 897 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
c8f59ee5
DG
898 if (ret < 0) {
899 goto error;
900 }
901
20275fe8 902 /* Receive response */
6151a90f 903 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
904 if (ret < 0) {
905 goto error;
906 }
907
908 reply.ret_code = be32toh(reply.ret_code);
909
910 /* Return session id or negative ret code. */
911 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
912 ret = -1;
913 ERR("Relayd quiescent control replied error %d", reply.ret_code);
c8f59ee5
DG
914 goto error;
915 }
916
917 /* Control socket is quiescent */
6d805429 918 return 0;
c8f59ee5
DG
919
920error:
921 return ret;
922}
f7079f67
DG
923
924/*
925 * Begin a data pending command for a specific session id.
926 */
6151a90f 927int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
f7079f67
DG
928{
929 int ret;
930 struct lttcomm_relayd_begin_data_pending msg;
931 struct lttcomm_relayd_generic_reply reply;
932
933 /* Code flow error. Safety net. */
6151a90f 934 assert(rsock);
f7079f67
DG
935
936 DBG("Relayd begin data pending");
937
53efb85a 938 memset(&msg, 0, sizeof(msg));
f7079f67
DG
939 msg.session_id = htobe64(id);
940
941 /* Send command */
6151a90f 942 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
943 if (ret < 0) {
944 goto error;
945 }
946
20275fe8 947 /* Receive response */
6151a90f 948 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
949 if (ret < 0) {
950 goto error;
951 }
952
953 reply.ret_code = be32toh(reply.ret_code);
954
955 /* Return session id or negative ret code. */
956 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
957 ret = -1;
958 ERR("Relayd begin data pending replied error %d", reply.ret_code);
f7079f67
DG
959 goto error;
960 }
961
962 return 0;
963
964error:
965 return ret;
966}
967
968/*
969 * End a data pending command for a specific session id.
970 *
971 * Return 0 on success and set is_data_inflight to 0 if no data is being
972 * streamed or 1 if it is the case.
973 */
6151a90f 974int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
f7079f67
DG
975 unsigned int *is_data_inflight)
976{
af6c30b5 977 int ret, recv_ret;
f7079f67
DG
978 struct lttcomm_relayd_end_data_pending msg;
979 struct lttcomm_relayd_generic_reply reply;
980
981 /* Code flow error. Safety net. */
6151a90f 982 assert(rsock);
f7079f67
DG
983
984 DBG("Relayd end data pending");
985
53efb85a 986 memset(&msg, 0, sizeof(msg));
f7079f67
DG
987 msg.session_id = htobe64(id);
988
989 /* Send command */
6151a90f 990 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
991 if (ret < 0) {
992 goto error;
993 }
994
20275fe8 995 /* Receive response */
6151a90f 996 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
997 if (ret < 0) {
998 goto error;
999 }
1000
af6c30b5
DG
1001 recv_ret = be32toh(reply.ret_code);
1002 if (recv_ret < 0) {
1003 ret = recv_ret;
f7079f67
DG
1004 goto error;
1005 }
1006
af6c30b5 1007 *is_data_inflight = recv_ret;
f7079f67 1008
af6c30b5 1009 DBG("Relayd end data pending is data inflight: %d", recv_ret);
f7079f67
DG
1010
1011 return 0;
1012
1013error:
1014 return ret;
1015}
1c20f0e2
JD
1016
1017/*
1018 * Send index to the relayd.
1019 */
1020int relayd_send_index(struct lttcomm_relayd_sock *rsock,
50adc264 1021 struct ctf_packet_index *index, uint64_t relay_stream_id,
1c20f0e2
JD
1022 uint64_t net_seq_num)
1023{
1024 int ret;
1025 struct lttcomm_relayd_index msg;
1026 struct lttcomm_relayd_generic_reply reply;
1027
1028 /* Code flow error. Safety net. */
1029 assert(rsock);
1030
1031 if (rsock->minor < 4) {
1032 DBG("Not sending indexes before protocol 2.4");
1033 ret = 0;
1034 goto error;
1035 }
1036
1037 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
1038
53efb85a 1039 memset(&msg, 0, sizeof(msg));
1c20f0e2
JD
1040 msg.relay_stream_id = htobe64(relay_stream_id);
1041 msg.net_seq_num = htobe64(net_seq_num);
1042
1043 /* The index is already in big endian. */
1044 msg.packet_size = index->packet_size;
1045 msg.content_size = index->content_size;
1046 msg.timestamp_begin = index->timestamp_begin;
1047 msg.timestamp_end = index->timestamp_end;
1048 msg.events_discarded = index->events_discarded;
1049 msg.stream_id = index->stream_id;
1050
234cd636
JD
1051 if (rsock->minor >= 8) {
1052 msg.stream_instance_id = index->stream_instance_id;
1053 msg.packet_seq_num = index->packet_seq_num;
1054 }
1055
1c20f0e2 1056 /* Send command */
f8f3885c
MD
1057 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1058 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1059 rsock->minor),
1060 lttng_to_index_minor(rsock->major, rsock->minor)),
1061 0);
1c20f0e2
JD
1062 if (ret < 0) {
1063 goto error;
1064 }
1065
1066 /* Receive response */
1067 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1068 if (ret < 0) {
1069 goto error;
1070 }
1071
1072 reply.ret_code = be32toh(reply.ret_code);
1073
1074 /* Return session id or negative ret code. */
1075 if (reply.ret_code != LTTNG_OK) {
1076 ret = -1;
1077 ERR("Relayd send index replied error %d", reply.ret_code);
1078 } else {
1079 /* Success */
1080 ret = 0;
1081 }
1082
1083error:
1084 return ret;
1085}
93ec662e
JD
1086
1087/*
1088 * Ask the relay to reset the metadata trace file (regeneration).
1089 */
1090int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1091 uint64_t stream_id, uint64_t version)
1092{
1093 int ret;
1094 struct lttcomm_relayd_reset_metadata msg;
1095 struct lttcomm_relayd_generic_reply reply;
1096
1097 /* Code flow error. Safety net. */
1098 assert(rsock);
1099
1100 /* Should have been prevented by the sessiond. */
1101 if (rsock->minor < 8) {
1102 ERR("Metadata regeneration unsupported before 2.8");
1103 ret = -1;
1104 goto error;
1105 }
1106
1107 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1108
1109 memset(&msg, 0, sizeof(msg));
1110 msg.stream_id = htobe64(stream_id);
1111 msg.version = htobe64(version);
1112
1113 /* Send command */
1114 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1115 if (ret < 0) {
1116 goto error;
1117 }
1118
1119 /* Receive response */
1120 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1121 if (ret < 0) {
1122 goto error;
1123 }
1124
1125 reply.ret_code = be32toh(reply.ret_code);
1126
1127 /* Return session id or negative ret code. */
1128 if (reply.ret_code != LTTNG_OK) {
1129 ret = -1;
1130 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1131 } else {
1132 /* Success */
1133 ret = 0;
1134 }
1135
1136 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1137
1138error:
1139 return ret;
1140}
a1ae2ea5 1141
c35f9726 1142int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
ebb29c10 1143 unsigned int stream_count, const uint64_t *new_chunk_id,
c35f9726 1144 const struct relayd_stream_rotation_position *positions)
d73bf3d7
JD
1145{
1146 int ret;
c35f9726
JG
1147 unsigned int i;
1148 struct lttng_dynamic_buffer payload;
1149 struct lttcomm_relayd_generic_reply reply = {};
1150 const struct lttcomm_relayd_rotate_streams msg = {
1151 .stream_count = htobe32((uint32_t) stream_count),
1152 .new_chunk_id = (typeof(msg.new_chunk_id)) {
1153 .is_set = !!new_chunk_id,
1154 .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
1155 },
1156 };
1157 char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
1158 const char *new_chunk_id_str;
d73bf3d7 1159
c35f9726 1160 lttng_dynamic_buffer_init(&payload);
d73bf3d7 1161
c35f9726
JG
1162 /* Code flow error. Safety net. */
1163 assert(sock);
d73bf3d7 1164
c35f9726
JG
1165 if (new_chunk_id) {
1166 ret = snprintf(new_chunk_id_buf, sizeof(new_chunk_id_buf),
1167 "%" PRIu64, *new_chunk_id);
1168 if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
1169 new_chunk_id_str = "formatting error";
1170 } else {
1171 new_chunk_id_str = new_chunk_id_buf;
1172 }
1173 } else {
1174 new_chunk_id_str = "none";
d73bf3d7
JD
1175 }
1176
c35f9726
JG
1177 DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
1178 new_chunk_id_str, stream_count);
d73bf3d7 1179
c35f9726
JG
1180 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1181 if (ret) {
1182 ERR("Failed to allocate \"rotate streams\" command payload");
d73bf3d7
JD
1183 goto error;
1184 }
1185
c35f9726
JG
1186 for (i = 0; i < stream_count; i++) {
1187 const struct relayd_stream_rotation_position *position =
1188 &positions[i];
1189 const struct lttcomm_relayd_stream_rotation_position comm_position = {
1190 .stream_id = htobe64(position->stream_id),
1191 .rotate_at_seq_num = htobe64(
1192 position->rotate_at_seq_num),
1193 };
1194
1195 DBG("Rotate stream %" PRIu64 "at sequence number %" PRIu64,
1196 position->stream_id,
1197 position->rotate_at_seq_num);
1198 ret = lttng_dynamic_buffer_append(&payload, &comm_position,
1199 sizeof(comm_position));
1200 if (ret) {
1201 ERR("Failed to allocate \"rotate streams\" command payload");
1202 goto error;
1203 }
1204 }
d73bf3d7
JD
1205
1206 /* Send command. */
c35f9726
JG
1207 ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data,
1208 payload.size, 0);
d73bf3d7 1209 if (ret < 0) {
c35f9726 1210 ERR("Failed to send \"rotate stream\" command");
d73bf3d7
JD
1211 goto error;
1212 }
1213
1214 /* Receive response. */
c35f9726 1215 ret = recv_reply(sock, &reply, sizeof(reply));
d73bf3d7 1216 if (ret < 0) {
c35f9726 1217 ERR("Failed to receive \"rotate streams\" command reply");
d73bf3d7
JD
1218 goto error;
1219 }
1220
1221 reply.ret_code = be32toh(reply.ret_code);
d73bf3d7
JD
1222 if (reply.ret_code != LTTNG_OK) {
1223 ret = -1;
c35f9726 1224 ERR("Relayd rotate streams replied error %d", reply.ret_code);
d73bf3d7
JD
1225 } else {
1226 /* Success. */
1227 ret = 0;
c35f9726 1228 DBG("Relayd rotated streams successfully");
d73bf3d7
JD
1229 }
1230
1231error:
c35f9726 1232 lttng_dynamic_buffer_reset(&payload);
d73bf3d7
JD
1233 return ret;
1234}
e5add6d0
JG
1235
1236int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
1237 struct lttng_trace_chunk *chunk)
1238{
1239 int ret = 0;
1240 enum lttng_trace_chunk_status status;
1241 struct lttcomm_relayd_create_trace_chunk msg = {};
1242 struct lttcomm_relayd_generic_reply reply = {};
1243 struct lttng_dynamic_buffer payload;
1244 uint64_t chunk_id;
1245 time_t creation_timestamp;
1246 const char *chunk_name;
1247 size_t chunk_name_length;
913a542b 1248 bool overridden_name;
e5add6d0
JG
1249
1250 lttng_dynamic_buffer_init(&payload);
1251
1252 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1253 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1254 ret = -1;
1255 goto end;
1256 }
1257
1258 status = lttng_trace_chunk_get_creation_timestamp(
1259 chunk, &creation_timestamp);
1260 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1261 ret = -1;
1262 goto end;
1263 }
1264
1265 status = lttng_trace_chunk_get_name(
913a542b 1266 chunk, &chunk_name, &overridden_name);
e5add6d0
JG
1267 if (status != LTTNG_TRACE_CHUNK_STATUS_OK &&
1268 status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
1269 ret = -1;
1270 goto end;
1271 }
1272
913a542b 1273 chunk_name_length = overridden_name ? (strlen(chunk_name) + 1) : 0;
e5add6d0
JG
1274 msg = (typeof(msg)){
1275 .chunk_id = htobe64(chunk_id),
1276 .creation_timestamp = htobe64((uint64_t) creation_timestamp),
1277 .override_name_length = htobe32((uint32_t) chunk_name_length),
1278 };
1279
1280 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1281 if (ret) {
1282 goto end;
1283 }
1284 if (chunk_name_length) {
1285 ret = lttng_dynamic_buffer_append(
1286 &payload, chunk_name, chunk_name_length);
1287 if (ret) {
1288 goto end;
1289 }
1290 }
1291
bbc4768c
JG
1292 ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data,
1293 payload.size, 0);
e5add6d0
JG
1294 if (ret < 0) {
1295 ERR("Failed to send trace chunk creation command to relay daemon");
1296 goto end;
1297 }
1298
1299 ret = recv_reply(sock, &reply, sizeof(reply));
1300 if (ret < 0) {
1301 ERR("Failed to receive relay daemon trace chunk creation command reply");
1302 goto end;
1303 }
1304
1305 reply.ret_code = be32toh(reply.ret_code);
1306 if (reply.ret_code != LTTNG_OK) {
1307 ret = -1;
1308 ERR("Relayd trace chunk create replied error %d",
1309 reply.ret_code);
1310 } else {
1311 ret = 0;
1312 DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64,
1313 chunk_id);
1314 }
1315
1316end:
1317 lttng_dynamic_buffer_reset(&payload);
1318 return ret;
1319}
bbc4768c
JG
1320
1321int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
1322 struct lttng_trace_chunk *chunk)
1323{
1324 int ret = 0;
1325 enum lttng_trace_chunk_status status;
1326 struct lttcomm_relayd_close_trace_chunk msg = {};
1327 struct lttcomm_relayd_generic_reply reply = {};
1328 uint64_t chunk_id;
1329 time_t close_timestamp;
1330 LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
1331
1332 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1333 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1334 ERR("Failed to get trace chunk id");
1335 ret = -1;
1336 goto end;
1337 }
1338
1339 status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
1340 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1341 ERR("Failed to get trace chunk close timestamp");
1342 ret = -1;
1343 goto end;
1344 }
1345
1346 status = lttng_trace_chunk_get_close_command(chunk,
1347 &close_command.value);
1348 switch (status) {
1349 case LTTNG_TRACE_CHUNK_STATUS_OK:
1350 close_command.is_set = 1;
1351 break;
1352 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1353 break;
1354 default:
1355 ERR("Failed to get trace chunk close command");
1356 ret = -1;
1357 goto end;
1358 }
1359
1360 msg = (typeof(msg)){
1361 .chunk_id = htobe64(chunk_id),
1362 .close_timestamp = htobe64((uint64_t) close_timestamp),
1363 .close_command = {
1364 .value = htobe32((uint32_t) close_command.value),
1365 .is_set = close_command.is_set,
1366 },
1367 };
1368
1369 ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg),
1370 0);
1371 if (ret < 0) {
1372 ERR("Failed to send trace chunk close command to relay daemon");
1373 goto end;
1374 }
1375
1376 ret = recv_reply(sock, &reply, sizeof(reply));
1377 if (ret < 0) {
1378 ERR("Failed to receive relay daemon trace chunk close command reply");
1379 goto end;
1380 }
1381
1382 reply.ret_code = be32toh(reply.ret_code);
1383 if (reply.ret_code != LTTNG_OK) {
1384 ret = -1;
1385 ERR("Relayd trace chunk close replied error %d",
1386 reply.ret_code);
1387 } else {
1388 ret = 0;
1389 DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64,
1390 chunk_id);
1391 }
1392end:
1393 return ret;
1394}
c35f9726
JG
1395
1396int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
1397 uint64_t chunk_id, bool *chunk_exists)
1398{
1399 int ret = 0;
1400 struct lttcomm_relayd_trace_chunk_exists msg = {};
1401 struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
1402
1403 msg = (typeof(msg)){
1404 .chunk_id = htobe64(chunk_id),
1405 };
1406
1407 ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg),
1408 0);
1409 if (ret < 0) {
1410 ERR("Failed to send trace chunk exists command to relay daemon");
1411 goto end;
1412 }
1413
1414 ret = recv_reply(sock, &reply, sizeof(reply));
1415 if (ret < 0) {
1416 ERR("Failed to receive relay daemon trace chunk close command reply");
1417 goto end;
1418 }
1419
1420 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1421 if (reply.generic.ret_code != LTTNG_OK) {
1422 ret = -1;
1423 ERR("Relayd trace chunk close replied error %d",
1424 reply.generic.ret_code);
1425 } else {
1426 ret = 0;
1427 DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
1428 ", exists = %s", chunk_id,
1429 reply.trace_chunk_exists ? "true" : "false");
1430 *chunk_exists = !!reply.trace_chunk_exists;
1431 }
1432end:
1433 return ret;
1434}
This page took 0.116223 seconds and 4 git commands to generate.