Fix: streaming and snapshot backward compat for relayd < 2.11
[lttng-tools.git] / src / common / relayd / relayd.c
CommitLineData
00e2e675
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
6c1c0768 18#define _LGPL_SOURCE
00e2e675
DG
19#include <assert.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <sys/stat.h>
77c7c900 24#include <inttypes.h>
00e2e675
DG
25
26#include <common/common.h>
27#include <common/defaults.h>
f263b7fd 28#include <common/compat/endian.h>
27283937 29#include <common/compat/string.h>
00e2e675 30#include <common/sessiond-comm/relayd.h>
50adc264 31#include <common/index/ctf-index.h>
d2956687 32#include <common/trace-chunk.h>
c35f9726 33#include <common/string-utils/format.h>
00e2e675
DG
34
35#include "relayd.h"
36
070b6a86
MD
37static
38bool relayd_supports_chunks(const struct lttcomm_relayd_sock *sock)
39{
40 if (sock->major > 2) {
41 return true;
42 } else if (sock->major == 2 && sock->minor >= 11) {
43 return true;
44 }
45 return false;
46}
47
00e2e675
DG
48/*
49 * Send command. Fill up the header and append the data.
50 */
6151a90f 51static int send_command(struct lttcomm_relayd_sock *rsock,
76b9afaa 52 enum lttcomm_relayd_command cmd, const void *data, size_t size,
00e2e675
DG
53 int flags)
54{
55 int ret;
56 struct lttcomm_relayd_hdr header;
57 char *buf;
58 uint64_t buf_size = sizeof(header);
59
f96e4545
MD
60 if (rsock->sock.fd < 0) {
61 return -ECONNRESET;
62 }
63
00e2e675
DG
64 if (data) {
65 buf_size += size;
66 }
67
68 buf = zmalloc(buf_size);
69 if (buf == NULL) {
70 PERROR("zmalloc relayd send command buf");
71 ret = -1;
72 goto alloc_error;
73 }
74
53efb85a 75 memset(&header, 0, sizeof(header));
00e2e675
DG
76 header.cmd = htobe32(cmd);
77 header.data_size = htobe64(size);
78
79 /* Zeroed for now since not used. */
80 header.cmd_version = 0;
81 header.circuit_id = 0;
82
83 /* Prepare buffer to send. */
84 memcpy(buf, &header, sizeof(header));
85 if (data) {
86 memcpy(buf + sizeof(header), data, size);
87 }
88
06586bbe 89 DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size);
6151a90f 90 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
00e2e675 91 if (ret < 0) {
06586bbe
JG
92 PERROR("Failed to send command %d of size %" PRIu64,
93 (int) cmd, buf_size);
8994307f 94 ret = -errno;
00e2e675
DG
95 goto error;
96 }
00e2e675
DG
97error:
98 free(buf);
99alloc_error:
100 return ret;
101}
102
103/*
104 * Receive reply data on socket. This MUST be call after send_command or else
105 * could result in unexpected behavior(s).
106 */
6151a90f 107static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
00e2e675
DG
108{
109 int ret;
110
f96e4545
MD
111 if (rsock->sock.fd < 0) {
112 return -ECONNRESET;
113 }
114
8fd623e0 115 DBG3("Relayd waiting for reply of size %zu", size);
00e2e675 116
6151a90f 117 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
20275fe8
DG
118 if (ret <= 0 || ret != size) {
119 if (ret == 0) {
120 /* Orderly shutdown. */
6151a90f 121 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
20275fe8 122 } else {
8fd623e0 123 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
6151a90f 124 rsock->sock.fd, size, ret);
20275fe8
DG
125 }
126 /* Always return -1 here and the caller can use errno. */
127 ret = -1;
00e2e675
DG
128 goto error;
129 }
130
131error:
132 return ret;
133}
134
d3e2ba59 135/*
6fa5fe7c
MD
136 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name,
137 * hostname, and base_path) have no length restriction on the sender side.
f86f6389
JR
138 * Length for both payloads is stored in the msg struct. A new dynamic size
139 * payload size is introduced.
140 */
141static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
fb9a95c4 142 const char *session_name, const char *hostname,
6fa5fe7c
MD
143 const char *base_path, int session_live_timer,
144 unsigned int snapshot, uint64_t sessiond_session_id,
145 const lttng_uuid sessiond_uuid, const uint64_t *current_chunk_id,
46ef2188 146 time_t creation_time, bool session_name_contains_creation_time)
f86f6389
JR
147{
148 int ret;
149 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
150 size_t session_name_len;
151 size_t hostname_len;
6fa5fe7c 152 size_t base_path_len;
f86f6389 153 size_t msg_length;
6fa5fe7c 154 char *dst;
f86f6389 155
46ef2188
MD
156 if (!base_path) {
157 base_path = "";
158 }
159 /* The three names are sent with a '\0' delimiter between them. */
f86f6389
JR
160 session_name_len = strlen(session_name) + 1;
161 hostname_len = strlen(hostname) + 1;
6fa5fe7c 162 base_path_len = base_path ? strlen(base_path) + 1 : 0;
f86f6389 163
6fa5fe7c 164 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
f86f6389
JR
165 msg = zmalloc(msg_length);
166 if (!msg) {
167 PERROR("zmalloc create_session_2_11 command message");
168 ret = -1;
169 goto error;
170 }
171
172 assert(session_name_len <= UINT32_MAX);
173 msg->session_name_len = htobe32(session_name_len);
174
175 assert(hostname_len <= UINT32_MAX);
176 msg->hostname_len = htobe32(hostname_len);
177
6fa5fe7c
MD
178 assert(base_path_len <= UINT32_MAX);
179 msg->base_path_len = htobe32(base_path_len);
180
181 dst = msg->names;
182 if (lttng_strncpy(dst, session_name, session_name_len)) {
183 ret = -1;
184 goto error;
185 }
186 dst += session_name_len;
187 if (lttng_strncpy(dst, hostname, hostname_len)) {
f86f6389
JR
188 ret = -1;
189 goto error;
190 }
6fa5fe7c
MD
191 dst += hostname_len;
192 if (base_path && lttng_strncpy(dst, base_path, base_path_len)) {
f86f6389
JR
193 ret = -1;
194 goto error;
195 }
196
197 msg->live_timer = htobe32(session_live_timer);
198 msg->snapshot = !!snapshot;
199
658f12fa
JG
200 lttng_uuid_copy(msg->sessiond_uuid, sessiond_uuid);
201 msg->session_id = htobe64(sessiond_session_id);
46ef2188 202 msg->session_name_contains_creation_time = session_name_contains_creation_time;
f39bd140
JG
203 if (current_chunk_id) {
204 LTTNG_OPTIONAL_SET(&msg->current_chunk_id,
205 htobe64(*current_chunk_id));
206 }
207
db1da059
JG
208 msg->creation_time = htobe64((uint64_t) creation_time);
209
f86f6389
JR
210 /* Send command */
211 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
212 if (ret < 0) {
213 goto error;
214 }
215error:
216 free(msg);
217 return ret;
218}
219/*
220 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
d3e2ba59
JD
221 * support the live reading capability.
222 */
223static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
fb9a95c4
JG
224 const char *session_name, const char *hostname,
225 int session_live_timer, unsigned int snapshot)
d3e2ba59
JD
226{
227 int ret;
228 struct lttcomm_relayd_create_session_2_4 msg;
229
3a13ffd5
MD
230 if (lttng_strncpy(msg.session_name, session_name,
231 sizeof(msg.session_name))) {
246777db
MD
232 ret = -1;
233 goto error;
234 }
3a13ffd5 235 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
246777db
MD
236 ret = -1;
237 goto error;
238 }
d3e2ba59 239 msg.live_timer = htobe32(session_live_timer);
7d2f7452 240 msg.snapshot = htobe32(snapshot);
d3e2ba59
JD
241
242 /* Send command */
243 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
244 if (ret < 0) {
245 goto error;
246 }
247
248error:
249 return ret;
250}
251
252/*
253 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
254 */
42e9a27b 255static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock)
d3e2ba59
JD
256{
257 int ret;
258
259 /* Send command */
260 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
261 if (ret < 0) {
262 goto error;
263 }
264
265error:
266 return ret;
267}
268
c5b6f4f0
DG
269/*
270 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
271 * set session_id of the relayd if we have a successful reply from the relayd.
272 *
20275fe8
DG
273 * On success, return 0 else a negative value which is either an errno error or
274 * a lttng error code from the relayd.
c5b6f4f0 275 */
fb9a95c4
JG
276int relayd_create_session(struct lttcomm_relayd_sock *rsock,
277 uint64_t *relayd_session_id,
278 const char *session_name, const char *hostname,
6fa5fe7c 279 const char *base_path, int session_live_timer,
658f12fa 280 unsigned int snapshot, uint64_t sessiond_session_id,
f39bd140 281 const lttng_uuid sessiond_uuid,
db1da059 282 const uint64_t *current_chunk_id,
46ef2188 283 time_t creation_time, bool session_name_contains_creation_time)
c5b6f4f0
DG
284{
285 int ret;
286 struct lttcomm_relayd_status_session reply;
287
6151a90f 288 assert(rsock);
658f12fa 289 assert(relayd_session_id);
c5b6f4f0
DG
290
291 DBG("Relayd create session");
292
f86f6389
JR
293 if (rsock->minor < 4) {
294 /* From 2.1 to 2.3 */
295 ret = relayd_create_session_2_1(rsock);
296 } else if (rsock->minor >= 4 && rsock->minor < 11) {
297 /* From 2.4 to 2.10 */
298 ret = relayd_create_session_2_4(rsock, session_name,
299 hostname, session_live_timer, snapshot);
300 } else {
301 /* From 2.11 to ... */
302 ret = relayd_create_session_2_11(rsock, session_name,
6fa5fe7c 303 hostname, base_path, session_live_timer, snapshot,
f39bd140 304 sessiond_session_id, sessiond_uuid,
46ef2188
MD
305 current_chunk_id, creation_time,
306 session_name_contains_creation_time);
d3e2ba59
JD
307 }
308
c5b6f4f0
DG
309 if (ret < 0) {
310 goto error;
311 }
312
20275fe8 313 /* Receive response */
6151a90f 314 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c5b6f4f0
DG
315 if (ret < 0) {
316 goto error;
317 }
318
319 reply.session_id = be64toh(reply.session_id);
320 reply.ret_code = be32toh(reply.ret_code);
321
322 /* Return session id or negative ret code. */
323 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
324 ret = -1;
325 ERR("Relayd create session replied error %d", reply.ret_code);
c5b6f4f0
DG
326 goto error;
327 } else {
328 ret = 0;
658f12fa 329 *relayd_session_id = reply.session_id;
c5b6f4f0
DG
330 }
331
332 DBG("Relayd session created with id %" PRIu64, reply.session_id);
333
334error:
335 return ret;
336}
337
2f21a469
JR
338static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
339 const char *channel_name, const char *pathname)
340{
341 int ret;
342 struct lttcomm_relayd_add_stream msg;
343
344 memset(&msg, 0, sizeof(msg));
345 if (lttng_strncpy(msg.channel_name, channel_name,
346 sizeof(msg.channel_name))) {
347 ret = -1;
348 goto error;
349 }
350
351 if (lttng_strncpy(msg.pathname, pathname,
352 sizeof(msg.pathname))) {
353 ret = -1;
354 goto error;
355 }
356
357 /* Send command */
358 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
359 if (ret < 0) {
360 ret = -1;
361 goto error;
362 }
363 ret = 0;
364error:
365 return ret;
366}
367
368static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
369 const char *channel_name, const char *pathname,
370 uint64_t tracefile_size, uint64_t tracefile_count)
371{
372 int ret;
373 struct lttcomm_relayd_add_stream_2_2 msg;
374
375 memset(&msg, 0, sizeof(msg));
376 /* Compat with relayd 2.2 to 2.10 */
377 if (lttng_strncpy(msg.channel_name, channel_name,
378 sizeof(msg.channel_name))) {
379 ret = -1;
380 goto error;
381 }
382 if (lttng_strncpy(msg.pathname, pathname,
383 sizeof(msg.pathname))) {
384 ret = -1;
385 goto error;
386 }
387 msg.tracefile_size = htobe64(tracefile_size);
388 msg.tracefile_count = htobe64(tracefile_count);
389
390 /* Send command */
391 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
392 if (ret < 0) {
393 goto error;
394 }
395 ret = 0;
396error:
397 return ret;
398}
399
400static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
401 const char *channel_name, const char *pathname,
b00e554e
JG
402 uint64_t tracefile_size, uint64_t tracefile_count,
403 uint64_t trace_archive_id)
2f21a469
JR
404{
405 int ret;
406 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
407 size_t channel_name_len;
408 size_t pathname_len;
409 size_t msg_length;
410
411 /* The two names are sent with a '\0' delimiter between them. */
412 channel_name_len = strlen(channel_name) + 1;
413 pathname_len = strlen(pathname) + 1;
414
415 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
416 msg = zmalloc(msg_length);
417 if (!msg) {
418 PERROR("zmalloc add_stream_2_11 command message");
419 ret = -1;
420 goto error;
421 }
422
423 assert(channel_name_len <= UINT32_MAX);
424 msg->channel_name_len = htobe32(channel_name_len);
425
426 assert(pathname_len <= UINT32_MAX);
427 msg->pathname_len = htobe32(pathname_len);
428
429 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
430 ret = -1;
431 goto error;
432 }
433 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
434 ret = -1;
435 goto error;
436 }
437
438 msg->tracefile_size = htobe64(tracefile_size);
439 msg->tracefile_count = htobe64(tracefile_count);
348a81dc 440 msg->trace_chunk_id = htobe64(trace_archive_id);
2f21a469
JR
441
442 /* Send command */
443 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
444 if (ret < 0) {
445 goto error;
446 }
447 ret = 0;
448error:
449 free(msg);
450 return ret;
451}
452
00e2e675
DG
453/*
454 * Add stream on the relayd and assign stream handle to the stream_id argument.
455 *
070b6a86
MD
456 * Chunks are not supported by relayd prior to 2.11, but are used to
457 * internally between session daemon and consumer daemon to keep track
458 * of the channel and stream output path.
459 *
00e2e675
DG
460 * On success return 0 else return ret_code negative value.
461 */
6151a90f 462int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
0f907de1 463 const char *pathname, uint64_t *stream_id,
0b50e4b3 464 uint64_t tracefile_size, uint64_t tracefile_count,
d2956687 465 struct lttng_trace_chunk *trace_chunk)
00e2e675
DG
466{
467 int ret;
00e2e675
DG
468 struct lttcomm_relayd_status_stream reply;
469
470 /* Code flow error. Safety net. */
6151a90f 471 assert(rsock);
00e2e675
DG
472 assert(channel_name);
473 assert(pathname);
070b6a86 474 assert(trace_chunk);
00e2e675
DG
475
476 DBG("Relayd adding stream for channel name %s", channel_name);
477
0f907de1
JD
478 /* Compat with relayd 2.1 */
479 if (rsock->minor == 1) {
2f21a469
JR
480 /* For 2.1 */
481 ret = relayd_add_stream_2_1(rsock, channel_name, pathname);
482
483 } else if (rsock->minor > 1 && rsock->minor < 11) {
484 /* From 2.2 to 2.10 */
485 ret = relayd_add_stream_2_2(rsock, channel_name, pathname,
486 tracefile_size, tracefile_count);
0f907de1 487 } else {
d2956687
JG
488 enum lttng_trace_chunk_status chunk_status;
489 uint64_t chunk_id;
490
d2956687
JG
491 chunk_status = lttng_trace_chunk_get_id(trace_chunk,
492 &chunk_id);
493 assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
494
2f21a469
JR
495 /* From 2.11 to ...*/
496 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
b00e554e 497 tracefile_size, tracefile_count,
d2956687 498 chunk_id);
2f21a469 499 }
0f907de1 500
2f21a469
JR
501 if (ret) {
502 ret = -1;
503 goto error;
00e2e675
DG
504 }
505
633d0084 506 /* Waiting for reply */
6151a90f 507 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
00e2e675
DG
508 if (ret < 0) {
509 goto error;
510 }
511
512 /* Back to host bytes order. */
513 reply.handle = be64toh(reply.handle);
514 reply.ret_code = be32toh(reply.ret_code);
515
516 /* Return session id or negative ret code. */
f73fabfd 517 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
518 ret = -1;
519 ERR("Relayd add stream replied error %d", reply.ret_code);
00e2e675
DG
520 } else {
521 /* Success */
522 ret = 0;
523 *stream_id = reply.handle;
524 }
525
77c7c900 526 DBG("Relayd stream added successfully with handle %" PRIu64,
2f21a469 527 reply.handle);
00e2e675
DG
528
529error:
530 return ret;
531}
532
a4baae1b
JD
533/*
534 * Inform the relay that all the streams for the current channel has been sent.
535 *
536 * On success return 0 else return ret_code negative value.
537 */
538int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
539{
540 int ret;
541 struct lttcomm_relayd_generic_reply reply;
542
543 /* Code flow error. Safety net. */
544 assert(rsock);
545
546 DBG("Relayd sending streams sent.");
547
548 /* This feature was introduced in 2.4, ignore it for earlier versions. */
549 if (rsock->minor < 4) {
550 ret = 0;
551 goto end;
552 }
553
554 /* Send command */
555 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
556 if (ret < 0) {
557 goto error;
558 }
559
560 /* Waiting for reply */
561 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
562 if (ret < 0) {
563 goto error;
564 }
565
566 /* Back to host bytes order. */
567 reply.ret_code = be32toh(reply.ret_code);
568
569 /* Return session id or negative ret code. */
570 if (reply.ret_code != LTTNG_OK) {
571 ret = -1;
572 ERR("Relayd streams sent replied error %d", reply.ret_code);
573 goto error;
574 } else {
575 /* Success */
576 ret = 0;
577 }
578
579 DBG("Relayd streams sent success");
580
581error:
582end:
583 return ret;
584}
585
00e2e675
DG
586/*
587 * Check version numbers on the relayd.
d4519fa3
JD
588 * If major versions are compatible, we assign minor_to_use to the
589 * minor version of the procotol we are going to use for this session.
00e2e675 590 *
67d5aa28
JD
591 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
592 * otherwise, or a negative value on network errors.
00e2e675 593 */
6151a90f 594int relayd_version_check(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
595{
596 int ret;
092b6259 597 struct lttcomm_relayd_version msg;
00e2e675
DG
598
599 /* Code flow error. Safety net. */
6151a90f 600 assert(rsock);
00e2e675 601
6151a90f
JD
602 DBG("Relayd version check for major.minor %u.%u", rsock->major,
603 rsock->minor);
00e2e675 604
53efb85a 605 memset(&msg, 0, sizeof(msg));
092b6259 606 /* Prepare network byte order before transmission. */
6151a90f
JD
607 msg.major = htobe32(rsock->major);
608 msg.minor = htobe32(rsock->minor);
092b6259 609
00e2e675 610 /* Send command */
6151a90f 611 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
00e2e675
DG
612 if (ret < 0) {
613 goto error;
614 }
615
20275fe8 616 /* Receive response */
6151a90f 617 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
00e2e675
DG
618 if (ret < 0) {
619 goto error;
620 }
621
622 /* Set back to host bytes order */
092b6259
DG
623 msg.major = be32toh(msg.major);
624 msg.minor = be32toh(msg.minor);
625
626 /*
627 * Only validate the major version. If the other side is higher,
628 * communication is not possible. Only major version equal can talk to each
629 * other. If the minor version differs, the lowest version is used by both
630 * sides.
092b6259 631 */
6151a90f 632 if (msg.major != rsock->major) {
d4519fa3 633 /* Not compatible */
67d5aa28 634 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
d4519fa3 635 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
6151a90f 636 msg.major, rsock->major);
092b6259 637 goto error;
00e2e675
DG
638 }
639
092b6259 640 /*
6151a90f
JD
641 * If the relayd's minor version is higher, it will adapt to our version so
642 * we can continue to use the latest relayd communication data structure.
643 * If the received minor version is higher, the relayd should adapt to us.
092b6259 644 */
6151a90f
JD
645 if (rsock->minor > msg.minor) {
646 rsock->minor = msg.minor;
d4519fa3 647 }
092b6259 648
d4519fa3
JD
649 /* Version number compatible */
650 DBG2("Relayd version is compatible, using protocol version %u.%u",
6151a90f 651 rsock->major, rsock->minor);
d4519fa3 652 ret = 0;
00e2e675
DG
653
654error:
655 return ret;
656}
657
00e2e675
DG
658/*
659 * Add stream on the relayd and assign stream handle to the stream_id argument.
660 *
661 * On success return 0 else return ret_code negative value.
662 */
6151a90f 663int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
00e2e675
DG
664{
665 int ret;
666
667 /* Code flow error. Safety net. */
6151a90f 668 assert(rsock);
00e2e675 669
77c7c900 670 DBG("Relayd sending metadata of size %zu", len);
00e2e675
DG
671
672 /* Send command */
6151a90f 673 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
00e2e675
DG
674 if (ret < 0) {
675 goto error;
676 }
677
678 DBG2("Relayd metadata added successfully");
679
680 /*
681 * After that call, the metadata data MUST be sent to the relayd so the
682 * receive size on the other end matches the len of the metadata packet
633d0084 683 * header. This is why we don't wait for a reply here.
00e2e675
DG
684 */
685
686error:
687 return ret;
688}
689
690/*
6151a90f 691 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
00e2e675 692 */
6151a90f 693int relayd_connect(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
694{
695 /* Code flow error. Safety net. */
6151a90f 696 assert(rsock);
00e2e675 697
f96e4545
MD
698 if (!rsock->sock.ops) {
699 /*
700 * Attempting a connect on a non-initialized socket.
701 */
702 return -ECONNRESET;
703 }
704
00e2e675
DG
705 DBG3("Relayd connect ...");
706
6151a90f 707 return rsock->sock.ops->connect(&rsock->sock);
00e2e675
DG
708}
709
710/*
6151a90f 711 * Close relayd socket with an allocated lttcomm_relayd_sock.
ffe60014
DG
712 *
713 * If no socket operations are found, simply return 0 meaning that everything
714 * is fine. Without operations, the socket can not possibly be opened or used.
715 * This is possible if the socket was allocated but not created. However, the
716 * caller could simply use it to store a valid file descriptor for instance
717 * passed over a Unix socket and call this to cleanup but still without a valid
718 * ops pointer.
719 *
720 * Return the close returned value. On error, a negative value is usually
721 * returned back from close(2).
00e2e675 722 */
6151a90f 723int relayd_close(struct lttcomm_relayd_sock *rsock)
00e2e675 724{
ffe60014
DG
725 int ret;
726
00e2e675 727 /* Code flow error. Safety net. */
6151a90f 728 assert(rsock);
00e2e675 729
ffe60014 730 /* An invalid fd is fine, return success. */
6151a90f 731 if (rsock->sock.fd < 0) {
ffe60014
DG
732 ret = 0;
733 goto end;
734 }
735
6151a90f 736 DBG3("Relayd closing socket %d", rsock->sock.fd);
00e2e675 737
6151a90f
JD
738 if (rsock->sock.ops) {
739 ret = rsock->sock.ops->close(&rsock->sock);
ffe60014
DG
740 } else {
741 /* Default call if no specific ops found. */
6151a90f 742 ret = close(rsock->sock.fd);
ffe60014
DG
743 if (ret < 0) {
744 PERROR("relayd_close default close");
745 }
746 }
f96e4545 747 rsock->sock.fd = -1;
ffe60014
DG
748
749end:
750 return ret;
00e2e675
DG
751}
752
753/*
754 * Send data header structure to the relayd.
755 */
6151a90f 756int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
00e2e675
DG
757 struct lttcomm_relayd_data_hdr *hdr, size_t size)
758{
759 int ret;
760
761 /* Code flow error. Safety net. */
6151a90f 762 assert(rsock);
00e2e675
DG
763 assert(hdr);
764
f96e4545
MD
765 if (rsock->sock.fd < 0) {
766 return -ECONNRESET;
767 }
768
8fd623e0 769 DBG3("Relayd sending data header of size %zu", size);
00e2e675
DG
770
771 /* Again, safety net */
772 if (size == 0) {
773 size = sizeof(struct lttcomm_relayd_data_hdr);
774 }
775
776 /* Only send data header. */
6151a90f 777 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
00e2e675 778 if (ret < 0) {
8994307f 779 ret = -errno;
00e2e675
DG
780 goto error;
781 }
782
783 /*
784 * The data MUST be sent right after that command for the receive on the
785 * other end to match the size in the header.
786 */
787
788error:
789 return ret;
790}
173af62f
DG
791
792/*
793 * Send close stream command to the relayd.
794 */
6151a90f 795int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
173af62f
DG
796 uint64_t last_net_seq_num)
797{
798 int ret;
799 struct lttcomm_relayd_close_stream msg;
800 struct lttcomm_relayd_generic_reply reply;
801
802 /* Code flow error. Safety net. */
6151a90f 803 assert(rsock);
173af62f 804
77c7c900 805 DBG("Relayd closing stream id %" PRIu64, stream_id);
173af62f 806
53efb85a 807 memset(&msg, 0, sizeof(msg));
173af62f
DG
808 msg.stream_id = htobe64(stream_id);
809 msg.last_net_seq_num = htobe64(last_net_seq_num);
810
811 /* Send command */
6151a90f 812 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
173af62f
DG
813 if (ret < 0) {
814 goto error;
815 }
816
20275fe8 817 /* Receive response */
6151a90f 818 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
173af62f
DG
819 if (ret < 0) {
820 goto error;
821 }
822
823 reply.ret_code = be32toh(reply.ret_code);
824
825 /* Return session id or negative ret code. */
f73fabfd 826 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
827 ret = -1;
828 ERR("Relayd close stream replied error %d", reply.ret_code);
173af62f
DG
829 } else {
830 /* Success */
831 ret = 0;
832 }
833
77c7c900 834 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
173af62f
DG
835
836error:
837 return ret;
838}
c8f59ee5
DG
839
840/*
841 * Check for data availability for a given stream id.
842 *
6d805429 843 * Return 0 if NOT pending, 1 if so and a negative value on error.
c8f59ee5 844 */
6151a90f 845int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
c8f59ee5
DG
846 uint64_t last_net_seq_num)
847{
848 int ret;
6d805429 849 struct lttcomm_relayd_data_pending msg;
c8f59ee5
DG
850 struct lttcomm_relayd_generic_reply reply;
851
852 /* Code flow error. Safety net. */
6151a90f 853 assert(rsock);
c8f59ee5 854
6d805429 855 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
c8f59ee5 856
53efb85a 857 memset(&msg, 0, sizeof(msg));
c8f59ee5
DG
858 msg.stream_id = htobe64(stream_id);
859 msg.last_net_seq_num = htobe64(last_net_seq_num);
860
861 /* Send command */
6151a90f 862 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
c8f59ee5
DG
863 sizeof(msg), 0);
864 if (ret < 0) {
865 goto error;
866 }
867
20275fe8 868 /* Receive response */
6151a90f 869 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
870 if (ret < 0) {
871 goto error;
872 }
873
874 reply.ret_code = be32toh(reply.ret_code);
875
876 /* Return session id or negative ret code. */
877 if (reply.ret_code >= LTTNG_OK) {
bb63afd9 878 ERR("Relayd data pending replied error %d", reply.ret_code);
c8f59ee5
DG
879 }
880
881 /* At this point, the ret code is either 1 or 0 */
882 ret = reply.ret_code;
883
6d805429 884 DBG("Relayd data is %s pending for stream id %" PRIu64,
9dd26bb9 885 ret == 1 ? "" : "NOT", stream_id);
c8f59ee5
DG
886
887error:
888 return ret;
889}
890
891/*
892 * Check on the relayd side for a quiescent state on the control socket.
893 */
6151a90f 894int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
ad7051c0 895 uint64_t metadata_stream_id)
c8f59ee5
DG
896{
897 int ret;
ad7051c0 898 struct lttcomm_relayd_quiescent_control msg;
c8f59ee5
DG
899 struct lttcomm_relayd_generic_reply reply;
900
901 /* Code flow error. Safety net. */
6151a90f 902 assert(rsock);
c8f59ee5
DG
903
904 DBG("Relayd checking quiescent control state");
905
53efb85a 906 memset(&msg, 0, sizeof(msg));
ad7051c0
DG
907 msg.stream_id = htobe64(metadata_stream_id);
908
c8f59ee5 909 /* Send command */
6151a90f 910 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
c8f59ee5
DG
911 if (ret < 0) {
912 goto error;
913 }
914
20275fe8 915 /* Receive response */
6151a90f 916 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
917 if (ret < 0) {
918 goto error;
919 }
920
921 reply.ret_code = be32toh(reply.ret_code);
922
923 /* Return session id or negative ret code. */
924 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
925 ret = -1;
926 ERR("Relayd quiescent control replied error %d", reply.ret_code);
c8f59ee5
DG
927 goto error;
928 }
929
930 /* Control socket is quiescent */
6d805429 931 return 0;
c8f59ee5
DG
932
933error:
934 return ret;
935}
f7079f67
DG
936
937/*
938 * Begin a data pending command for a specific session id.
939 */
6151a90f 940int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
f7079f67
DG
941{
942 int ret;
943 struct lttcomm_relayd_begin_data_pending msg;
944 struct lttcomm_relayd_generic_reply reply;
945
946 /* Code flow error. Safety net. */
6151a90f 947 assert(rsock);
f7079f67
DG
948
949 DBG("Relayd begin data pending");
950
53efb85a 951 memset(&msg, 0, sizeof(msg));
f7079f67
DG
952 msg.session_id = htobe64(id);
953
954 /* Send command */
6151a90f 955 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
956 if (ret < 0) {
957 goto error;
958 }
959
20275fe8 960 /* Receive response */
6151a90f 961 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
962 if (ret < 0) {
963 goto error;
964 }
965
966 reply.ret_code = be32toh(reply.ret_code);
967
968 /* Return session id or negative ret code. */
969 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
970 ret = -1;
971 ERR("Relayd begin data pending replied error %d", reply.ret_code);
f7079f67
DG
972 goto error;
973 }
974
975 return 0;
976
977error:
978 return ret;
979}
980
981/*
982 * End a data pending command for a specific session id.
983 *
984 * Return 0 on success and set is_data_inflight to 0 if no data is being
985 * streamed or 1 if it is the case.
986 */
6151a90f 987int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
f7079f67
DG
988 unsigned int *is_data_inflight)
989{
af6c30b5 990 int ret, recv_ret;
f7079f67
DG
991 struct lttcomm_relayd_end_data_pending msg;
992 struct lttcomm_relayd_generic_reply reply;
993
994 /* Code flow error. Safety net. */
6151a90f 995 assert(rsock);
f7079f67
DG
996
997 DBG("Relayd end data pending");
998
53efb85a 999 memset(&msg, 0, sizeof(msg));
f7079f67
DG
1000 msg.session_id = htobe64(id);
1001
1002 /* Send command */
6151a90f 1003 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
1004 if (ret < 0) {
1005 goto error;
1006 }
1007
20275fe8 1008 /* Receive response */
6151a90f 1009 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
1010 if (ret < 0) {
1011 goto error;
1012 }
1013
af6c30b5
DG
1014 recv_ret = be32toh(reply.ret_code);
1015 if (recv_ret < 0) {
1016 ret = recv_ret;
f7079f67
DG
1017 goto error;
1018 }
1019
af6c30b5 1020 *is_data_inflight = recv_ret;
f7079f67 1021
af6c30b5 1022 DBG("Relayd end data pending is data inflight: %d", recv_ret);
f7079f67
DG
1023
1024 return 0;
1025
1026error:
1027 return ret;
1028}
1c20f0e2
JD
1029
1030/*
1031 * Send index to the relayd.
1032 */
1033int relayd_send_index(struct lttcomm_relayd_sock *rsock,
50adc264 1034 struct ctf_packet_index *index, uint64_t relay_stream_id,
1c20f0e2
JD
1035 uint64_t net_seq_num)
1036{
1037 int ret;
1038 struct lttcomm_relayd_index msg;
1039 struct lttcomm_relayd_generic_reply reply;
1040
1041 /* Code flow error. Safety net. */
1042 assert(rsock);
1043
1044 if (rsock->minor < 4) {
1045 DBG("Not sending indexes before protocol 2.4");
1046 ret = 0;
1047 goto error;
1048 }
1049
1050 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
1051
53efb85a 1052 memset(&msg, 0, sizeof(msg));
1c20f0e2
JD
1053 msg.relay_stream_id = htobe64(relay_stream_id);
1054 msg.net_seq_num = htobe64(net_seq_num);
1055
1056 /* The index is already in big endian. */
1057 msg.packet_size = index->packet_size;
1058 msg.content_size = index->content_size;
1059 msg.timestamp_begin = index->timestamp_begin;
1060 msg.timestamp_end = index->timestamp_end;
1061 msg.events_discarded = index->events_discarded;
1062 msg.stream_id = index->stream_id;
1063
234cd636
JD
1064 if (rsock->minor >= 8) {
1065 msg.stream_instance_id = index->stream_instance_id;
1066 msg.packet_seq_num = index->packet_seq_num;
1067 }
1068
1c20f0e2 1069 /* Send command */
f8f3885c
MD
1070 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1071 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1072 rsock->minor),
1073 lttng_to_index_minor(rsock->major, rsock->minor)),
1074 0);
1c20f0e2
JD
1075 if (ret < 0) {
1076 goto error;
1077 }
1078
1079 /* Receive response */
1080 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1081 if (ret < 0) {
1082 goto error;
1083 }
1084
1085 reply.ret_code = be32toh(reply.ret_code);
1086
1087 /* Return session id or negative ret code. */
1088 if (reply.ret_code != LTTNG_OK) {
1089 ret = -1;
1090 ERR("Relayd send index replied error %d", reply.ret_code);
1091 } else {
1092 /* Success */
1093 ret = 0;
1094 }
1095
1096error:
1097 return ret;
1098}
93ec662e
JD
1099
1100/*
1101 * Ask the relay to reset the metadata trace file (regeneration).
1102 */
1103int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1104 uint64_t stream_id, uint64_t version)
1105{
1106 int ret;
1107 struct lttcomm_relayd_reset_metadata msg;
1108 struct lttcomm_relayd_generic_reply reply;
1109
1110 /* Code flow error. Safety net. */
1111 assert(rsock);
1112
1113 /* Should have been prevented by the sessiond. */
1114 if (rsock->minor < 8) {
1115 ERR("Metadata regeneration unsupported before 2.8");
1116 ret = -1;
1117 goto error;
1118 }
1119
1120 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1121
1122 memset(&msg, 0, sizeof(msg));
1123 msg.stream_id = htobe64(stream_id);
1124 msg.version = htobe64(version);
1125
1126 /* Send command */
1127 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1128 if (ret < 0) {
1129 goto error;
1130 }
1131
1132 /* Receive response */
1133 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1134 if (ret < 0) {
1135 goto error;
1136 }
1137
1138 reply.ret_code = be32toh(reply.ret_code);
1139
1140 /* Return session id or negative ret code. */
1141 if (reply.ret_code != LTTNG_OK) {
1142 ret = -1;
1143 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1144 } else {
1145 /* Success */
1146 ret = 0;
1147 }
1148
1149 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1150
1151error:
1152 return ret;
1153}
a1ae2ea5 1154
c35f9726 1155int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
ebb29c10 1156 unsigned int stream_count, const uint64_t *new_chunk_id,
c35f9726 1157 const struct relayd_stream_rotation_position *positions)
d73bf3d7
JD
1158{
1159 int ret;
c35f9726
JG
1160 unsigned int i;
1161 struct lttng_dynamic_buffer payload;
1162 struct lttcomm_relayd_generic_reply reply = {};
1163 const struct lttcomm_relayd_rotate_streams msg = {
1164 .stream_count = htobe32((uint32_t) stream_count),
1165 .new_chunk_id = (typeof(msg.new_chunk_id)) {
1166 .is_set = !!new_chunk_id,
1167 .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
1168 },
1169 };
1170 char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
1171 const char *new_chunk_id_str;
d73bf3d7 1172
070b6a86
MD
1173 if (!relayd_supports_chunks(sock)) {
1174 DBG("Refusing to rotate remote streams: relayd does not support chunks");
1175 return 0;
1176 }
1177
c35f9726 1178 lttng_dynamic_buffer_init(&payload);
d73bf3d7 1179
c35f9726
JG
1180 /* Code flow error. Safety net. */
1181 assert(sock);
d73bf3d7 1182
c35f9726
JG
1183 if (new_chunk_id) {
1184 ret = snprintf(new_chunk_id_buf, sizeof(new_chunk_id_buf),
1185 "%" PRIu64, *new_chunk_id);
1186 if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
1187 new_chunk_id_str = "formatting error";
1188 } else {
1189 new_chunk_id_str = new_chunk_id_buf;
1190 }
1191 } else {
1192 new_chunk_id_str = "none";
d73bf3d7
JD
1193 }
1194
c35f9726
JG
1195 DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
1196 new_chunk_id_str, stream_count);
d73bf3d7 1197
c35f9726
JG
1198 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1199 if (ret) {
1200 ERR("Failed to allocate \"rotate streams\" command payload");
d73bf3d7
JD
1201 goto error;
1202 }
1203
c35f9726
JG
1204 for (i = 0; i < stream_count; i++) {
1205 const struct relayd_stream_rotation_position *position =
1206 &positions[i];
1207 const struct lttcomm_relayd_stream_rotation_position comm_position = {
1208 .stream_id = htobe64(position->stream_id),
1209 .rotate_at_seq_num = htobe64(
1210 position->rotate_at_seq_num),
1211 };
1212
1213 DBG("Rotate stream %" PRIu64 "at sequence number %" PRIu64,
1214 position->stream_id,
1215 position->rotate_at_seq_num);
1216 ret = lttng_dynamic_buffer_append(&payload, &comm_position,
1217 sizeof(comm_position));
1218 if (ret) {
1219 ERR("Failed to allocate \"rotate streams\" command payload");
1220 goto error;
1221 }
1222 }
d73bf3d7
JD
1223
1224 /* Send command. */
c35f9726
JG
1225 ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data,
1226 payload.size, 0);
d73bf3d7 1227 if (ret < 0) {
c35f9726 1228 ERR("Failed to send \"rotate stream\" command");
d73bf3d7
JD
1229 goto error;
1230 }
1231
1232 /* Receive response. */
c35f9726 1233 ret = recv_reply(sock, &reply, sizeof(reply));
d73bf3d7 1234 if (ret < 0) {
c35f9726 1235 ERR("Failed to receive \"rotate streams\" command reply");
d73bf3d7
JD
1236 goto error;
1237 }
1238
1239 reply.ret_code = be32toh(reply.ret_code);
d73bf3d7
JD
1240 if (reply.ret_code != LTTNG_OK) {
1241 ret = -1;
c35f9726 1242 ERR("Relayd rotate streams replied error %d", reply.ret_code);
d73bf3d7
JD
1243 } else {
1244 /* Success. */
1245 ret = 0;
c35f9726 1246 DBG("Relayd rotated streams successfully");
d73bf3d7
JD
1247 }
1248
1249error:
c35f9726 1250 lttng_dynamic_buffer_reset(&payload);
d73bf3d7
JD
1251 return ret;
1252}
e5add6d0
JG
1253
1254int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
1255 struct lttng_trace_chunk *chunk)
1256{
1257 int ret = 0;
1258 enum lttng_trace_chunk_status status;
1259 struct lttcomm_relayd_create_trace_chunk msg = {};
1260 struct lttcomm_relayd_generic_reply reply = {};
1261 struct lttng_dynamic_buffer payload;
1262 uint64_t chunk_id;
1263 time_t creation_timestamp;
1264 const char *chunk_name;
1265 size_t chunk_name_length;
913a542b 1266 bool overridden_name;
e5add6d0
JG
1267
1268 lttng_dynamic_buffer_init(&payload);
1269
070b6a86
MD
1270 if (!relayd_supports_chunks(sock)) {
1271 DBG("Refusing to create remote trace chunk: relayd does not support chunks");
1272 goto end;
1273 }
1274
e5add6d0
JG
1275 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1276 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1277 ret = -1;
1278 goto end;
1279 }
1280
1281 status = lttng_trace_chunk_get_creation_timestamp(
1282 chunk, &creation_timestamp);
1283 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1284 ret = -1;
1285 goto end;
1286 }
1287
1288 status = lttng_trace_chunk_get_name(
913a542b 1289 chunk, &chunk_name, &overridden_name);
e5add6d0
JG
1290 if (status != LTTNG_TRACE_CHUNK_STATUS_OK &&
1291 status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
1292 ret = -1;
1293 goto end;
1294 }
1295
913a542b 1296 chunk_name_length = overridden_name ? (strlen(chunk_name) + 1) : 0;
e5add6d0
JG
1297 msg = (typeof(msg)){
1298 .chunk_id = htobe64(chunk_id),
1299 .creation_timestamp = htobe64((uint64_t) creation_timestamp),
1300 .override_name_length = htobe32((uint32_t) chunk_name_length),
1301 };
1302
1303 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1304 if (ret) {
1305 goto end;
1306 }
1307 if (chunk_name_length) {
1308 ret = lttng_dynamic_buffer_append(
1309 &payload, chunk_name, chunk_name_length);
1310 if (ret) {
1311 goto end;
1312 }
1313 }
1314
bbc4768c
JG
1315 ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data,
1316 payload.size, 0);
e5add6d0
JG
1317 if (ret < 0) {
1318 ERR("Failed to send trace chunk creation command to relay daemon");
1319 goto end;
1320 }
1321
1322 ret = recv_reply(sock, &reply, sizeof(reply));
1323 if (ret < 0) {
1324 ERR("Failed to receive relay daemon trace chunk creation command reply");
1325 goto end;
1326 }
1327
1328 reply.ret_code = be32toh(reply.ret_code);
1329 if (reply.ret_code != LTTNG_OK) {
1330 ret = -1;
1331 ERR("Relayd trace chunk create replied error %d",
1332 reply.ret_code);
1333 } else {
1334 ret = 0;
1335 DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64,
1336 chunk_id);
1337 }
1338
1339end:
1340 lttng_dynamic_buffer_reset(&payload);
1341 return ret;
1342}
bbc4768c
JG
1343
1344int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
1345 struct lttng_trace_chunk *chunk)
1346{
1347 int ret = 0;
1348 enum lttng_trace_chunk_status status;
1349 struct lttcomm_relayd_close_trace_chunk msg = {};
1350 struct lttcomm_relayd_generic_reply reply = {};
1351 uint64_t chunk_id;
1352 time_t close_timestamp;
1353 LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
1354
070b6a86
MD
1355 if (!relayd_supports_chunks(sock)) {
1356 DBG("Refusing to close remote trace chunk: relayd does not support chunks");
1357 goto end;
1358 }
1359
bbc4768c
JG
1360 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1361 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1362 ERR("Failed to get trace chunk id");
1363 ret = -1;
1364 goto end;
1365 }
1366
1367 status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
1368 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1369 ERR("Failed to get trace chunk close timestamp");
1370 ret = -1;
1371 goto end;
1372 }
1373
1374 status = lttng_trace_chunk_get_close_command(chunk,
1375 &close_command.value);
1376 switch (status) {
1377 case LTTNG_TRACE_CHUNK_STATUS_OK:
1378 close_command.is_set = 1;
1379 break;
1380 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1381 break;
1382 default:
1383 ERR("Failed to get trace chunk close command");
1384 ret = -1;
1385 goto end;
1386 }
1387
1388 msg = (typeof(msg)){
1389 .chunk_id = htobe64(chunk_id),
1390 .close_timestamp = htobe64((uint64_t) close_timestamp),
1391 .close_command = {
1392 .value = htobe32((uint32_t) close_command.value),
1393 .is_set = close_command.is_set,
1394 },
1395 };
1396
1397 ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg),
1398 0);
1399 if (ret < 0) {
1400 ERR("Failed to send trace chunk close command to relay daemon");
1401 goto end;
1402 }
1403
1404 ret = recv_reply(sock, &reply, sizeof(reply));
1405 if (ret < 0) {
1406 ERR("Failed to receive relay daemon trace chunk close command reply");
1407 goto end;
1408 }
1409
1410 reply.ret_code = be32toh(reply.ret_code);
1411 if (reply.ret_code != LTTNG_OK) {
1412 ret = -1;
1413 ERR("Relayd trace chunk close replied error %d",
1414 reply.ret_code);
1415 } else {
1416 ret = 0;
1417 DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64,
1418 chunk_id);
1419 }
1420end:
1421 return ret;
1422}
c35f9726
JG
1423
1424int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
1425 uint64_t chunk_id, bool *chunk_exists)
1426{
1427 int ret = 0;
1428 struct lttcomm_relayd_trace_chunk_exists msg = {};
1429 struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
1430
070b6a86
MD
1431 if (!relayd_supports_chunks(sock)) {
1432 DBG("Refusing to check for trace chunk existence: relayd does not support chunks");
1433 goto end;
1434 }
1435
c35f9726
JG
1436 msg = (typeof(msg)){
1437 .chunk_id = htobe64(chunk_id),
1438 };
1439
1440 ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg),
1441 0);
1442 if (ret < 0) {
1443 ERR("Failed to send trace chunk exists command to relay daemon");
1444 goto end;
1445 }
1446
1447 ret = recv_reply(sock, &reply, sizeof(reply));
1448 if (ret < 0) {
1449 ERR("Failed to receive relay daemon trace chunk close command reply");
1450 goto end;
1451 }
1452
1453 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1454 if (reply.generic.ret_code != LTTNG_OK) {
1455 ret = -1;
1456 ERR("Relayd trace chunk close replied error %d",
1457 reply.generic.ret_code);
1458 } else {
1459 ret = 0;
1460 DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
1461 ", exists = %s", chunk_id,
1462 reply.trace_chunk_exists ? "true" : "false");
1463 *chunk_exists = !!reply.trace_chunk_exists;
1464 }
1465end:
1466 return ret;
1467}
This page took 0.189471 seconds and 4 git commands to generate.