Fix: trace_archive_id is not sent in add_stream command
[lttng-tools.git] / src / common / relayd / relayd.c
CommitLineData
00e2e675
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
6c1c0768 18#define _LGPL_SOURCE
00e2e675
DG
19#include <assert.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <sys/stat.h>
77c7c900 24#include <inttypes.h>
00e2e675
DG
25
26#include <common/common.h>
27#include <common/defaults.h>
f263b7fd 28#include <common/compat/endian.h>
27283937 29#include <common/compat/string.h>
00e2e675 30#include <common/sessiond-comm/relayd.h>
50adc264 31#include <common/index/ctf-index.h>
00e2e675
DG
32
33#include "relayd.h"
34
35/*
36 * Send command. Fill up the header and append the data.
37 */
6151a90f 38static int send_command(struct lttcomm_relayd_sock *rsock,
76b9afaa 39 enum lttcomm_relayd_command cmd, const void *data, size_t size,
00e2e675
DG
40 int flags)
41{
42 int ret;
43 struct lttcomm_relayd_hdr header;
44 char *buf;
45 uint64_t buf_size = sizeof(header);
46
f96e4545
MD
47 if (rsock->sock.fd < 0) {
48 return -ECONNRESET;
49 }
50
00e2e675
DG
51 if (data) {
52 buf_size += size;
53 }
54
55 buf = zmalloc(buf_size);
56 if (buf == NULL) {
57 PERROR("zmalloc relayd send command buf");
58 ret = -1;
59 goto alloc_error;
60 }
61
53efb85a 62 memset(&header, 0, sizeof(header));
00e2e675
DG
63 header.cmd = htobe32(cmd);
64 header.data_size = htobe64(size);
65
66 /* Zeroed for now since not used. */
67 header.cmd_version = 0;
68 header.circuit_id = 0;
69
70 /* Prepare buffer to send. */
71 memcpy(buf, &header, sizeof(header));
72 if (data) {
73 memcpy(buf + sizeof(header), data, size);
74 }
75
06586bbe 76 DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size);
6151a90f 77 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
00e2e675 78 if (ret < 0) {
06586bbe
JG
79 PERROR("Failed to send command %d of size %" PRIu64,
80 (int) cmd, buf_size);
8994307f 81 ret = -errno;
00e2e675
DG
82 goto error;
83 }
00e2e675
DG
84error:
85 free(buf);
86alloc_error:
87 return ret;
88}
89
90/*
91 * Receive reply data on socket. This MUST be call after send_command or else
92 * could result in unexpected behavior(s).
93 */
6151a90f 94static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
00e2e675
DG
95{
96 int ret;
97
f96e4545
MD
98 if (rsock->sock.fd < 0) {
99 return -ECONNRESET;
100 }
101
8fd623e0 102 DBG3("Relayd waiting for reply of size %zu", size);
00e2e675 103
6151a90f 104 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
20275fe8
DG
105 if (ret <= 0 || ret != size) {
106 if (ret == 0) {
107 /* Orderly shutdown. */
6151a90f 108 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
20275fe8 109 } else {
8fd623e0 110 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
6151a90f 111 rsock->sock.fd, size, ret);
20275fe8
DG
112 }
113 /* Always return -1 here and the caller can use errno. */
114 ret = -1;
00e2e675
DG
115 goto error;
116 }
117
118error:
119 return ret;
120}
121
d3e2ba59 122/*
f86f6389
JR
123 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name & hostname)
124 * have no length restriction on the sender side.
125 * Length for both payloads is stored in the msg struct. A new dynamic size
126 * payload size is introduced.
127 */
128static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
129 char *session_name, char *hostname,
130 int session_live_timer, unsigned int snapshot)
131{
132 int ret;
133 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
134 size_t session_name_len;
135 size_t hostname_len;
136 size_t msg_length;
137
138 /* The two names are sent with a '\0' delimiter between them. */
139 session_name_len = strlen(session_name) + 1;
140 hostname_len = strlen(hostname) + 1;
141
142 msg_length = sizeof(*msg) + session_name_len + hostname_len;
143 msg = zmalloc(msg_length);
144 if (!msg) {
145 PERROR("zmalloc create_session_2_11 command message");
146 ret = -1;
147 goto error;
148 }
149
150 assert(session_name_len <= UINT32_MAX);
151 msg->session_name_len = htobe32(session_name_len);
152
153 assert(hostname_len <= UINT32_MAX);
154 msg->hostname_len = htobe32(hostname_len);
155
156 if (lttng_strncpy(msg->names, session_name, session_name_len)) {
157 ret = -1;
158 goto error;
159 }
160 if (lttng_strncpy(msg->names + session_name_len, hostname, hostname_len)) {
161 ret = -1;
162 goto error;
163 }
164
165 msg->live_timer = htobe32(session_live_timer);
166 msg->snapshot = !!snapshot;
167
168 /* Send command */
169 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
170 if (ret < 0) {
171 goto error;
172 }
173error:
174 free(msg);
175 return ret;
176}
177/*
178 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
d3e2ba59
JD
179 * support the live reading capability.
180 */
181static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
42e9a27b
JR
182 char *session_name, char *hostname, int session_live_timer,
183 unsigned int snapshot)
d3e2ba59
JD
184{
185 int ret;
186 struct lttcomm_relayd_create_session_2_4 msg;
187
3a13ffd5
MD
188 if (lttng_strncpy(msg.session_name, session_name,
189 sizeof(msg.session_name))) {
246777db
MD
190 ret = -1;
191 goto error;
192 }
3a13ffd5 193 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
246777db
MD
194 ret = -1;
195 goto error;
196 }
d3e2ba59 197 msg.live_timer = htobe32(session_live_timer);
7d2f7452 198 msg.snapshot = htobe32(snapshot);
d3e2ba59
JD
199
200 /* Send command */
201 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
202 if (ret < 0) {
203 goto error;
204 }
205
206error:
207 return ret;
208}
209
210/*
211 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
212 */
42e9a27b 213static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock)
d3e2ba59
JD
214{
215 int ret;
216
217 /* Send command */
218 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
219 if (ret < 0) {
220 goto error;
221 }
222
223error:
224 return ret;
225}
226
c5b6f4f0
DG
227/*
228 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
229 * set session_id of the relayd if we have a successful reply from the relayd.
230 *
20275fe8
DG
231 * On success, return 0 else a negative value which is either an errno error or
232 * a lttng error code from the relayd.
c5b6f4f0 233 */
d3e2ba59 234int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_id,
7d2f7452
DG
235 char *session_name, char *hostname, int session_live_timer,
236 unsigned int snapshot)
c5b6f4f0
DG
237{
238 int ret;
239 struct lttcomm_relayd_status_session reply;
240
6151a90f 241 assert(rsock);
c5b6f4f0
DG
242 assert(session_id);
243
244 DBG("Relayd create session");
245
f86f6389
JR
246 if (rsock->minor < 4) {
247 /* From 2.1 to 2.3 */
248 ret = relayd_create_session_2_1(rsock);
249 } else if (rsock->minor >= 4 && rsock->minor < 11) {
250 /* From 2.4 to 2.10 */
251 ret = relayd_create_session_2_4(rsock, session_name,
252 hostname, session_live_timer, snapshot);
253 } else {
254 /* From 2.11 to ... */
255 ret = relayd_create_session_2_11(rsock, session_name,
256 hostname, session_live_timer, snapshot);
d3e2ba59
JD
257 }
258
c5b6f4f0
DG
259 if (ret < 0) {
260 goto error;
261 }
262
20275fe8 263 /* Receive response */
6151a90f 264 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c5b6f4f0
DG
265 if (ret < 0) {
266 goto error;
267 }
268
269 reply.session_id = be64toh(reply.session_id);
270 reply.ret_code = be32toh(reply.ret_code);
271
272 /* Return session id or negative ret code. */
273 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
274 ret = -1;
275 ERR("Relayd create session replied error %d", reply.ret_code);
c5b6f4f0
DG
276 goto error;
277 } else {
278 ret = 0;
279 *session_id = reply.session_id;
280 }
281
282 DBG("Relayd session created with id %" PRIu64, reply.session_id);
283
284error:
285 return ret;
286}
287
2f21a469
JR
288static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
289 const char *channel_name, const char *pathname)
290{
291 int ret;
292 struct lttcomm_relayd_add_stream msg;
293
294 memset(&msg, 0, sizeof(msg));
295 if (lttng_strncpy(msg.channel_name, channel_name,
296 sizeof(msg.channel_name))) {
297 ret = -1;
298 goto error;
299 }
300
301 if (lttng_strncpy(msg.pathname, pathname,
302 sizeof(msg.pathname))) {
303 ret = -1;
304 goto error;
305 }
306
307 /* Send command */
308 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
309 if (ret < 0) {
310 ret = -1;
311 goto error;
312 }
313 ret = 0;
314error:
315 return ret;
316}
317
318static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
319 const char *channel_name, const char *pathname,
320 uint64_t tracefile_size, uint64_t tracefile_count)
321{
322 int ret;
323 struct lttcomm_relayd_add_stream_2_2 msg;
324
325 memset(&msg, 0, sizeof(msg));
326 /* Compat with relayd 2.2 to 2.10 */
327 if (lttng_strncpy(msg.channel_name, channel_name,
328 sizeof(msg.channel_name))) {
329 ret = -1;
330 goto error;
331 }
332 if (lttng_strncpy(msg.pathname, pathname,
333 sizeof(msg.pathname))) {
334 ret = -1;
335 goto error;
336 }
337 msg.tracefile_size = htobe64(tracefile_size);
338 msg.tracefile_count = htobe64(tracefile_count);
339
340 /* Send command */
341 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
342 if (ret < 0) {
343 goto error;
344 }
345 ret = 0;
346error:
347 return ret;
348}
349
350static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
351 const char *channel_name, const char *pathname,
b00e554e
JG
352 uint64_t tracefile_size, uint64_t tracefile_count,
353 uint64_t trace_archive_id)
2f21a469
JR
354{
355 int ret;
356 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
357 size_t channel_name_len;
358 size_t pathname_len;
359 size_t msg_length;
360
361 /* The two names are sent with a '\0' delimiter between them. */
362 channel_name_len = strlen(channel_name) + 1;
363 pathname_len = strlen(pathname) + 1;
364
365 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
366 msg = zmalloc(msg_length);
367 if (!msg) {
368 PERROR("zmalloc add_stream_2_11 command message");
369 ret = -1;
370 goto error;
371 }
372
373 assert(channel_name_len <= UINT32_MAX);
374 msg->channel_name_len = htobe32(channel_name_len);
375
376 assert(pathname_len <= UINT32_MAX);
377 msg->pathname_len = htobe32(pathname_len);
378
379 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
380 ret = -1;
381 goto error;
382 }
383 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
384 ret = -1;
385 goto error;
386 }
387
388 msg->tracefile_size = htobe64(tracefile_size);
389 msg->tracefile_count = htobe64(tracefile_count);
b00e554e 390 msg->trace_archive_id = htobe64(trace_archive_id);
2f21a469
JR
391
392 /* Send command */
393 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
394 if (ret < 0) {
395 goto error;
396 }
397 ret = 0;
398error:
399 free(msg);
400 return ret;
401}
402
00e2e675
DG
403/*
404 * Add stream on the relayd and assign stream handle to the stream_id argument.
405 *
406 * On success return 0 else return ret_code negative value.
407 */
6151a90f 408int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
0f907de1 409 const char *pathname, uint64_t *stream_id,
0b50e4b3
JG
410 uint64_t tracefile_size, uint64_t tracefile_count,
411 uint64_t trace_archive_id)
00e2e675
DG
412{
413 int ret;
00e2e675
DG
414 struct lttcomm_relayd_status_stream reply;
415
416 /* Code flow error. Safety net. */
6151a90f 417 assert(rsock);
00e2e675
DG
418 assert(channel_name);
419 assert(pathname);
420
421 DBG("Relayd adding stream for channel name %s", channel_name);
422
0f907de1
JD
423 /* Compat with relayd 2.1 */
424 if (rsock->minor == 1) {
2f21a469
JR
425 /* For 2.1 */
426 ret = relayd_add_stream_2_1(rsock, channel_name, pathname);
427
428 } else if (rsock->minor > 1 && rsock->minor < 11) {
429 /* From 2.2 to 2.10 */
430 ret = relayd_add_stream_2_2(rsock, channel_name, pathname,
431 tracefile_size, tracefile_count);
0f907de1 432 } else {
2f21a469
JR
433 /* From 2.11 to ...*/
434 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
b00e554e
JG
435 tracefile_size, tracefile_count,
436 trace_archive_id);
2f21a469 437 }
0f907de1 438
2f21a469
JR
439 if (ret) {
440 ret = -1;
441 goto error;
00e2e675
DG
442 }
443
633d0084 444 /* Waiting for reply */
6151a90f 445 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
00e2e675
DG
446 if (ret < 0) {
447 goto error;
448 }
449
450 /* Back to host bytes order. */
451 reply.handle = be64toh(reply.handle);
452 reply.ret_code = be32toh(reply.ret_code);
453
454 /* Return session id or negative ret code. */
f73fabfd 455 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
456 ret = -1;
457 ERR("Relayd add stream replied error %d", reply.ret_code);
00e2e675
DG
458 } else {
459 /* Success */
460 ret = 0;
461 *stream_id = reply.handle;
462 }
463
77c7c900 464 DBG("Relayd stream added successfully with handle %" PRIu64,
2f21a469 465 reply.handle);
00e2e675
DG
466
467error:
468 return ret;
469}
470
a4baae1b
JD
471/*
472 * Inform the relay that all the streams for the current channel has been sent.
473 *
474 * On success return 0 else return ret_code negative value.
475 */
476int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
477{
478 int ret;
479 struct lttcomm_relayd_generic_reply reply;
480
481 /* Code flow error. Safety net. */
482 assert(rsock);
483
484 DBG("Relayd sending streams sent.");
485
486 /* This feature was introduced in 2.4, ignore it for earlier versions. */
487 if (rsock->minor < 4) {
488 ret = 0;
489 goto end;
490 }
491
492 /* Send command */
493 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
494 if (ret < 0) {
495 goto error;
496 }
497
498 /* Waiting for reply */
499 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
500 if (ret < 0) {
501 goto error;
502 }
503
504 /* Back to host bytes order. */
505 reply.ret_code = be32toh(reply.ret_code);
506
507 /* Return session id or negative ret code. */
508 if (reply.ret_code != LTTNG_OK) {
509 ret = -1;
510 ERR("Relayd streams sent replied error %d", reply.ret_code);
511 goto error;
512 } else {
513 /* Success */
514 ret = 0;
515 }
516
517 DBG("Relayd streams sent success");
518
519error:
520end:
521 return ret;
522}
523
00e2e675
DG
524/*
525 * Check version numbers on the relayd.
d4519fa3
JD
526 * If major versions are compatible, we assign minor_to_use to the
527 * minor version of the procotol we are going to use for this session.
00e2e675 528 *
67d5aa28
JD
529 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
530 * otherwise, or a negative value on network errors.
00e2e675 531 */
6151a90f 532int relayd_version_check(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
533{
534 int ret;
092b6259 535 struct lttcomm_relayd_version msg;
00e2e675
DG
536
537 /* Code flow error. Safety net. */
6151a90f 538 assert(rsock);
00e2e675 539
6151a90f
JD
540 DBG("Relayd version check for major.minor %u.%u", rsock->major,
541 rsock->minor);
00e2e675 542
53efb85a 543 memset(&msg, 0, sizeof(msg));
092b6259 544 /* Prepare network byte order before transmission. */
6151a90f
JD
545 msg.major = htobe32(rsock->major);
546 msg.minor = htobe32(rsock->minor);
092b6259 547
00e2e675 548 /* Send command */
6151a90f 549 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
00e2e675
DG
550 if (ret < 0) {
551 goto error;
552 }
553
20275fe8 554 /* Receive response */
6151a90f 555 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
00e2e675
DG
556 if (ret < 0) {
557 goto error;
558 }
559
560 /* Set back to host bytes order */
092b6259
DG
561 msg.major = be32toh(msg.major);
562 msg.minor = be32toh(msg.minor);
563
564 /*
565 * Only validate the major version. If the other side is higher,
566 * communication is not possible. Only major version equal can talk to each
567 * other. If the minor version differs, the lowest version is used by both
568 * sides.
092b6259 569 */
6151a90f 570 if (msg.major != rsock->major) {
d4519fa3 571 /* Not compatible */
67d5aa28 572 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
d4519fa3 573 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
6151a90f 574 msg.major, rsock->major);
092b6259 575 goto error;
00e2e675
DG
576 }
577
092b6259 578 /*
6151a90f
JD
579 * If the relayd's minor version is higher, it will adapt to our version so
580 * we can continue to use the latest relayd communication data structure.
581 * If the received minor version is higher, the relayd should adapt to us.
092b6259 582 */
6151a90f
JD
583 if (rsock->minor > msg.minor) {
584 rsock->minor = msg.minor;
d4519fa3 585 }
092b6259 586
d4519fa3
JD
587 /* Version number compatible */
588 DBG2("Relayd version is compatible, using protocol version %u.%u",
6151a90f 589 rsock->major, rsock->minor);
d4519fa3 590 ret = 0;
00e2e675
DG
591
592error:
593 return ret;
594}
595
00e2e675
DG
596/*
597 * Add stream on the relayd and assign stream handle to the stream_id argument.
598 *
599 * On success return 0 else return ret_code negative value.
600 */
6151a90f 601int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
00e2e675
DG
602{
603 int ret;
604
605 /* Code flow error. Safety net. */
6151a90f 606 assert(rsock);
00e2e675 607
77c7c900 608 DBG("Relayd sending metadata of size %zu", len);
00e2e675
DG
609
610 /* Send command */
6151a90f 611 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
00e2e675
DG
612 if (ret < 0) {
613 goto error;
614 }
615
616 DBG2("Relayd metadata added successfully");
617
618 /*
619 * After that call, the metadata data MUST be sent to the relayd so the
620 * receive size on the other end matches the len of the metadata packet
633d0084 621 * header. This is why we don't wait for a reply here.
00e2e675
DG
622 */
623
624error:
625 return ret;
626}
627
628/*
6151a90f 629 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
00e2e675 630 */
6151a90f 631int relayd_connect(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
632{
633 /* Code flow error. Safety net. */
6151a90f 634 assert(rsock);
00e2e675 635
f96e4545
MD
636 if (!rsock->sock.ops) {
637 /*
638 * Attempting a connect on a non-initialized socket.
639 */
640 return -ECONNRESET;
641 }
642
00e2e675
DG
643 DBG3("Relayd connect ...");
644
6151a90f 645 return rsock->sock.ops->connect(&rsock->sock);
00e2e675
DG
646}
647
648/*
6151a90f 649 * Close relayd socket with an allocated lttcomm_relayd_sock.
ffe60014
DG
650 *
651 * If no socket operations are found, simply return 0 meaning that everything
652 * is fine. Without operations, the socket can not possibly be opened or used.
653 * This is possible if the socket was allocated but not created. However, the
654 * caller could simply use it to store a valid file descriptor for instance
655 * passed over a Unix socket and call this to cleanup but still without a valid
656 * ops pointer.
657 *
658 * Return the close returned value. On error, a negative value is usually
659 * returned back from close(2).
00e2e675 660 */
6151a90f 661int relayd_close(struct lttcomm_relayd_sock *rsock)
00e2e675 662{
ffe60014
DG
663 int ret;
664
00e2e675 665 /* Code flow error. Safety net. */
6151a90f 666 assert(rsock);
00e2e675 667
ffe60014 668 /* An invalid fd is fine, return success. */
6151a90f 669 if (rsock->sock.fd < 0) {
ffe60014
DG
670 ret = 0;
671 goto end;
672 }
673
6151a90f 674 DBG3("Relayd closing socket %d", rsock->sock.fd);
00e2e675 675
6151a90f
JD
676 if (rsock->sock.ops) {
677 ret = rsock->sock.ops->close(&rsock->sock);
ffe60014
DG
678 } else {
679 /* Default call if no specific ops found. */
6151a90f 680 ret = close(rsock->sock.fd);
ffe60014
DG
681 if (ret < 0) {
682 PERROR("relayd_close default close");
683 }
684 }
f96e4545 685 rsock->sock.fd = -1;
ffe60014
DG
686
687end:
688 return ret;
00e2e675
DG
689}
690
691/*
692 * Send data header structure to the relayd.
693 */
6151a90f 694int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
00e2e675
DG
695 struct lttcomm_relayd_data_hdr *hdr, size_t size)
696{
697 int ret;
698
699 /* Code flow error. Safety net. */
6151a90f 700 assert(rsock);
00e2e675
DG
701 assert(hdr);
702
f96e4545
MD
703 if (rsock->sock.fd < 0) {
704 return -ECONNRESET;
705 }
706
8fd623e0 707 DBG3("Relayd sending data header of size %zu", size);
00e2e675
DG
708
709 /* Again, safety net */
710 if (size == 0) {
711 size = sizeof(struct lttcomm_relayd_data_hdr);
712 }
713
714 /* Only send data header. */
6151a90f 715 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
00e2e675 716 if (ret < 0) {
8994307f 717 ret = -errno;
00e2e675
DG
718 goto error;
719 }
720
721 /*
722 * The data MUST be sent right after that command for the receive on the
723 * other end to match the size in the header.
724 */
725
726error:
727 return ret;
728}
173af62f
DG
729
730/*
731 * Send close stream command to the relayd.
732 */
6151a90f 733int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
173af62f
DG
734 uint64_t last_net_seq_num)
735{
736 int ret;
737 struct lttcomm_relayd_close_stream msg;
738 struct lttcomm_relayd_generic_reply reply;
739
740 /* Code flow error. Safety net. */
6151a90f 741 assert(rsock);
173af62f 742
77c7c900 743 DBG("Relayd closing stream id %" PRIu64, stream_id);
173af62f 744
53efb85a 745 memset(&msg, 0, sizeof(msg));
173af62f
DG
746 msg.stream_id = htobe64(stream_id);
747 msg.last_net_seq_num = htobe64(last_net_seq_num);
748
749 /* Send command */
6151a90f 750 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
173af62f
DG
751 if (ret < 0) {
752 goto error;
753 }
754
20275fe8 755 /* Receive response */
6151a90f 756 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
173af62f
DG
757 if (ret < 0) {
758 goto error;
759 }
760
761 reply.ret_code = be32toh(reply.ret_code);
762
763 /* Return session id or negative ret code. */
f73fabfd 764 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
765 ret = -1;
766 ERR("Relayd close stream replied error %d", reply.ret_code);
173af62f
DG
767 } else {
768 /* Success */
769 ret = 0;
770 }
771
77c7c900 772 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
173af62f
DG
773
774error:
775 return ret;
776}
c8f59ee5
DG
777
778/*
779 * Check for data availability for a given stream id.
780 *
6d805429 781 * Return 0 if NOT pending, 1 if so and a negative value on error.
c8f59ee5 782 */
6151a90f 783int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
c8f59ee5
DG
784 uint64_t last_net_seq_num)
785{
786 int ret;
6d805429 787 struct lttcomm_relayd_data_pending msg;
c8f59ee5
DG
788 struct lttcomm_relayd_generic_reply reply;
789
790 /* Code flow error. Safety net. */
6151a90f 791 assert(rsock);
c8f59ee5 792
6d805429 793 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
c8f59ee5 794
53efb85a 795 memset(&msg, 0, sizeof(msg));
c8f59ee5
DG
796 msg.stream_id = htobe64(stream_id);
797 msg.last_net_seq_num = htobe64(last_net_seq_num);
798
799 /* Send command */
6151a90f 800 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
c8f59ee5
DG
801 sizeof(msg), 0);
802 if (ret < 0) {
803 goto error;
804 }
805
20275fe8 806 /* Receive response */
6151a90f 807 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
808 if (ret < 0) {
809 goto error;
810 }
811
812 reply.ret_code = be32toh(reply.ret_code);
813
814 /* Return session id or negative ret code. */
815 if (reply.ret_code >= LTTNG_OK) {
bb63afd9 816 ERR("Relayd data pending replied error %d", reply.ret_code);
c8f59ee5
DG
817 }
818
819 /* At this point, the ret code is either 1 or 0 */
820 ret = reply.ret_code;
821
6d805429 822 DBG("Relayd data is %s pending for stream id %" PRIu64,
9dd26bb9 823 ret == 1 ? "" : "NOT", stream_id);
c8f59ee5
DG
824
825error:
826 return ret;
827}
828
829/*
830 * Check on the relayd side for a quiescent state on the control socket.
831 */
6151a90f 832int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
ad7051c0 833 uint64_t metadata_stream_id)
c8f59ee5
DG
834{
835 int ret;
ad7051c0 836 struct lttcomm_relayd_quiescent_control msg;
c8f59ee5
DG
837 struct lttcomm_relayd_generic_reply reply;
838
839 /* Code flow error. Safety net. */
6151a90f 840 assert(rsock);
c8f59ee5
DG
841
842 DBG("Relayd checking quiescent control state");
843
53efb85a 844 memset(&msg, 0, sizeof(msg));
ad7051c0
DG
845 msg.stream_id = htobe64(metadata_stream_id);
846
c8f59ee5 847 /* Send command */
6151a90f 848 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
c8f59ee5
DG
849 if (ret < 0) {
850 goto error;
851 }
852
20275fe8 853 /* Receive response */
6151a90f 854 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
855 if (ret < 0) {
856 goto error;
857 }
858
859 reply.ret_code = be32toh(reply.ret_code);
860
861 /* Return session id or negative ret code. */
862 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
863 ret = -1;
864 ERR("Relayd quiescent control replied error %d", reply.ret_code);
c8f59ee5
DG
865 goto error;
866 }
867
868 /* Control socket is quiescent */
6d805429 869 return 0;
c8f59ee5
DG
870
871error:
872 return ret;
873}
f7079f67
DG
874
875/*
876 * Begin a data pending command for a specific session id.
877 */
6151a90f 878int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
f7079f67
DG
879{
880 int ret;
881 struct lttcomm_relayd_begin_data_pending msg;
882 struct lttcomm_relayd_generic_reply reply;
883
884 /* Code flow error. Safety net. */
6151a90f 885 assert(rsock);
f7079f67
DG
886
887 DBG("Relayd begin data pending");
888
53efb85a 889 memset(&msg, 0, sizeof(msg));
f7079f67
DG
890 msg.session_id = htobe64(id);
891
892 /* Send command */
6151a90f 893 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
894 if (ret < 0) {
895 goto error;
896 }
897
20275fe8 898 /* Receive response */
6151a90f 899 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
900 if (ret < 0) {
901 goto error;
902 }
903
904 reply.ret_code = be32toh(reply.ret_code);
905
906 /* Return session id or negative ret code. */
907 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
908 ret = -1;
909 ERR("Relayd begin data pending replied error %d", reply.ret_code);
f7079f67
DG
910 goto error;
911 }
912
913 return 0;
914
915error:
916 return ret;
917}
918
919/*
920 * End a data pending command for a specific session id.
921 *
922 * Return 0 on success and set is_data_inflight to 0 if no data is being
923 * streamed or 1 if it is the case.
924 */
6151a90f 925int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
f7079f67
DG
926 unsigned int *is_data_inflight)
927{
af6c30b5 928 int ret, recv_ret;
f7079f67
DG
929 struct lttcomm_relayd_end_data_pending msg;
930 struct lttcomm_relayd_generic_reply reply;
931
932 /* Code flow error. Safety net. */
6151a90f 933 assert(rsock);
f7079f67
DG
934
935 DBG("Relayd end data pending");
936
53efb85a 937 memset(&msg, 0, sizeof(msg));
f7079f67
DG
938 msg.session_id = htobe64(id);
939
940 /* Send command */
6151a90f 941 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
942 if (ret < 0) {
943 goto error;
944 }
945
20275fe8 946 /* Receive response */
6151a90f 947 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
948 if (ret < 0) {
949 goto error;
950 }
951
af6c30b5
DG
952 recv_ret = be32toh(reply.ret_code);
953 if (recv_ret < 0) {
954 ret = recv_ret;
f7079f67
DG
955 goto error;
956 }
957
af6c30b5 958 *is_data_inflight = recv_ret;
f7079f67 959
af6c30b5 960 DBG("Relayd end data pending is data inflight: %d", recv_ret);
f7079f67
DG
961
962 return 0;
963
964error:
965 return ret;
966}
1c20f0e2
JD
967
968/*
969 * Send index to the relayd.
970 */
971int relayd_send_index(struct lttcomm_relayd_sock *rsock,
50adc264 972 struct ctf_packet_index *index, uint64_t relay_stream_id,
1c20f0e2
JD
973 uint64_t net_seq_num)
974{
975 int ret;
976 struct lttcomm_relayd_index msg;
977 struct lttcomm_relayd_generic_reply reply;
978
979 /* Code flow error. Safety net. */
980 assert(rsock);
981
982 if (rsock->minor < 4) {
983 DBG("Not sending indexes before protocol 2.4");
984 ret = 0;
985 goto error;
986 }
987
988 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
989
53efb85a 990 memset(&msg, 0, sizeof(msg));
1c20f0e2
JD
991 msg.relay_stream_id = htobe64(relay_stream_id);
992 msg.net_seq_num = htobe64(net_seq_num);
993
994 /* The index is already in big endian. */
995 msg.packet_size = index->packet_size;
996 msg.content_size = index->content_size;
997 msg.timestamp_begin = index->timestamp_begin;
998 msg.timestamp_end = index->timestamp_end;
999 msg.events_discarded = index->events_discarded;
1000 msg.stream_id = index->stream_id;
1001
234cd636
JD
1002 if (rsock->minor >= 8) {
1003 msg.stream_instance_id = index->stream_instance_id;
1004 msg.packet_seq_num = index->packet_seq_num;
1005 }
1006
1c20f0e2 1007 /* Send command */
f8f3885c
MD
1008 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1009 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1010 rsock->minor),
1011 lttng_to_index_minor(rsock->major, rsock->minor)),
1012 0);
1c20f0e2
JD
1013 if (ret < 0) {
1014 goto error;
1015 }
1016
1017 /* Receive response */
1018 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1019 if (ret < 0) {
1020 goto error;
1021 }
1022
1023 reply.ret_code = be32toh(reply.ret_code);
1024
1025 /* Return session id or negative ret code. */
1026 if (reply.ret_code != LTTNG_OK) {
1027 ret = -1;
1028 ERR("Relayd send index replied error %d", reply.ret_code);
1029 } else {
1030 /* Success */
1031 ret = 0;
1032 }
1033
1034error:
1035 return ret;
1036}
93ec662e
JD
1037
1038/*
1039 * Ask the relay to reset the metadata trace file (regeneration).
1040 */
1041int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1042 uint64_t stream_id, uint64_t version)
1043{
1044 int ret;
1045 struct lttcomm_relayd_reset_metadata msg;
1046 struct lttcomm_relayd_generic_reply reply;
1047
1048 /* Code flow error. Safety net. */
1049 assert(rsock);
1050
1051 /* Should have been prevented by the sessiond. */
1052 if (rsock->minor < 8) {
1053 ERR("Metadata regeneration unsupported before 2.8");
1054 ret = -1;
1055 goto error;
1056 }
1057
1058 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1059
1060 memset(&msg, 0, sizeof(msg));
1061 msg.stream_id = htobe64(stream_id);
1062 msg.version = htobe64(version);
1063
1064 /* Send command */
1065 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1066 if (ret < 0) {
1067 goto error;
1068 }
1069
1070 /* Receive response */
1071 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1072 if (ret < 0) {
1073 goto error;
1074 }
1075
1076 reply.ret_code = be32toh(reply.ret_code);
1077
1078 /* Return session id or negative ret code. */
1079 if (reply.ret_code != LTTNG_OK) {
1080 ret = -1;
1081 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1082 } else {
1083 /* Success */
1084 ret = 0;
1085 }
1086
1087 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1088
1089error:
1090 return ret;
1091}
a1ae2ea5 1092
d73bf3d7
JD
1093int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
1094 const char *new_pathname, uint64_t new_chunk_id,
1095 uint64_t seq_num)
1096{
1097 int ret;
1098 struct lttcomm_relayd_rotate_stream *msg = NULL;
1099 struct lttcomm_relayd_generic_reply reply;
1100 size_t len;
1101 int msg_len;
1102
1103 /* Code flow error. Safety net. */
1104 assert(rsock);
1105
1106 DBG("Sending rotate stream id %" PRIu64 " command to relayd", stream_id);
1107
1108 /* Account for the trailing NULL. */
27283937 1109 len = lttng_strnlen(new_pathname, LTTNG_PATH_MAX) + 1;
d73bf3d7
JD
1110 if (len > LTTNG_PATH_MAX) {
1111 ERR("Path used in relayd rotate stream command exceeds the maximal allowed length");
1112 ret = -1;
1113 goto error;
1114 }
1115
1116 msg_len = offsetof(struct lttcomm_relayd_rotate_stream, new_pathname) + len;
1117 msg = zmalloc(msg_len);
1118 if (!msg) {
1119 PERROR("Failed to allocate relayd rotate stream command of %d bytes",
1120 msg_len);
1121 ret = -1;
1122 goto error;
1123 }
1124
1125 if (lttng_strncpy(msg->new_pathname, new_pathname, len)) {
1126 ret = -1;
1127 ERR("Failed to copy relayd rotate stream command's new path name");
1128 goto error;
1129 }
1130
1131 msg->pathname_length = htobe32(len);
1132 msg->stream_id = htobe64(stream_id);
1133 msg->new_chunk_id = htobe64(new_chunk_id);
1134 /*
1135 * The seq_num is invalid for metadata streams, but it is ignored on
1136 * the relay.
1137 */
1138 msg->rotate_at_seq_num = htobe64(seq_num);
1139
1140 /* Send command. */
1141 ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) msg, msg_len, 0);
1142 if (ret < 0) {
1143 ERR("Send rotate command");
1144 goto error;
1145 }
1146
1147 /* Receive response. */
1148 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1149 if (ret < 0) {
1150 ERR("Receive rotate reply");
1151 goto error;
1152 }
1153
1154 reply.ret_code = be32toh(reply.ret_code);
1155
1156 /* Return session id or negative ret code. */
1157 if (reply.ret_code != LTTNG_OK) {
1158 ret = -1;
1159 ERR("Relayd rotate stream replied error %d", reply.ret_code);
1160 } else {
1161 /* Success. */
1162 ret = 0;
1163 DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id);
1164 }
1165
1166error:
1167 free(msg);
1168 return ret;
1169}
1170
00fb02ac
JD
1171int relayd_rotate_rename(struct lttcomm_relayd_sock *rsock,
1172 const char *old_path, const char *new_path)
1173{
1174 int ret;
1175 struct lttcomm_relayd_rotate_rename *msg = NULL;
1176 struct lttcomm_relayd_generic_reply reply;
1177 size_t old_path_length, new_path_length;
1178 size_t msg_length;
1179
1180 /* Code flow error. Safety net. */
1181 assert(rsock);
1182
1183 DBG("Relayd rename chunk %s to %s", old_path, new_path);
1184
1185 /* The two paths are sent with a '\0' delimiter between them. */
1186 old_path_length = strlen(old_path) + 1;
1187 new_path_length = strlen(new_path) + 1;
1188
1189 msg_length = sizeof(*msg) + old_path_length + new_path_length;
1190 msg = zmalloc(msg_length);
1191 if (!msg) {
1192 PERROR("zmalloc rotate-rename command message");
1193 ret = -1;
1194 goto error;
1195 }
1196
1197 assert(old_path_length <= UINT32_MAX);
1198 msg->old_path_length = htobe32(old_path_length);
1199
1200 assert(new_path_length <= UINT32_MAX);
1201 msg->new_path_length = htobe32(new_path_length);
1202
1203 strcpy(msg->paths, old_path);
1204 strcpy(msg->paths + old_path_length, new_path);
1205
1206 /* Send command */
1207 ret = send_command(rsock, RELAYD_ROTATE_RENAME, (const void *) msg,
1208 msg_length, 0);
1209 if (ret < 0) {
1210 goto error;
1211 }
1212
1213 /* Receive response */
1214 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1215 if (ret < 0) {
1216 goto error;
1217 }
1218
1219 reply.ret_code = be32toh(reply.ret_code);
1220
1221 /* Return session id or negative ret code. */
1222 if (reply.ret_code != LTTNG_OK) {
1223 ret = -1;
1224 ERR("Relayd rotate rename replied error %d", reply.ret_code);
1225 } else {
1226 /* Success */
1227 ret = 0;
1228 }
1229
1230 DBG("Relayd rotate rename completed successfully");
1231
1232error:
1233 free(msg);
1234 return ret;
1235}
1236
d88744a4
JD
1237int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id)
1238{
1239 int ret;
1240 struct lttcomm_relayd_rotate_pending msg;
1241 struct lttcomm_relayd_rotate_pending_reply reply;
1242
1243 /* Code flow error. Safety net. */
1244 assert(rsock);
1245
1246 DBG("Querying relayd for rotate pending with chunk_id %" PRIu64,
1247 chunk_id);
1248
1249 memset(&msg, 0, sizeof(msg));
1250 msg.chunk_id = htobe64(chunk_id);
1251
1252 /* Send command */
1253 ret = send_command(rsock, RELAYD_ROTATE_PENDING, (void *) &msg,
1254 sizeof(msg), 0);
1255 if (ret < 0) {
1256 goto error;
1257 }
1258
1259 /* Receive response */
1260 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1261 if (ret < 0) {
1262 goto error;
1263 }
1264
1265 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1266
1267 /* Return session id or negative ret code. */
1268 if (reply.generic.ret_code != LTTNG_OK) {
1269 ret = -reply.generic.ret_code;
1270 ERR("Relayd rotate pending replied with error %d", ret);
1271 goto error;
1272 } else {
1273 /* No error, just rotate pending state */
1274 if (reply.is_pending == 0 || reply.is_pending == 1) {
1275 ret = reply.is_pending;
1276 DBG("Relayd rotate pending command completed successfully with result \"%s\"",
1277 ret ? "rotation pending" : "rotation NOT pending");
1278 } else {
1279 ret = -LTTNG_ERR_UNK;
1280 }
1281 }
1282
1283error:
1284 return ret;
1285}
1286
a1ae2ea5
JD
1287int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path)
1288{
1289 int ret;
1290 struct lttcomm_relayd_mkdir *msg;
1291 struct lttcomm_relayd_generic_reply reply;
1292 size_t len;
1293
1294 /* Code flow error. Safety net. */
1295 assert(rsock);
1296
1297 DBG("Relayd mkdir path %s", path);
1298
1299 len = strlen(path) + 1;
1300 msg = zmalloc(sizeof(msg->length) + len);
1301 if (!msg) {
1302 PERROR("Alloc mkdir msg");
1303 ret = -1;
1304 goto error;
1305 }
1306 msg->length = htobe32((uint32_t) len);
1307
1308 if (lttng_strncpy(msg->path, path, len)) {
1309 ret = -1;
1310 goto error;
1311 }
1312
1313 /* Send command */
1314 ret = send_command(rsock, RELAYD_MKDIR, (void *) msg,
1315 sizeof(msg->length) + len, 0);
1316 if (ret < 0) {
1317 goto error;
1318 }
1319
1320 /* Receive response */
1321 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1322 if (ret < 0) {
1323 goto error;
1324 }
1325
1326 reply.ret_code = be32toh(reply.ret_code);
1327
1328 /* Return session id or negative ret code. */
1329 if (reply.ret_code != LTTNG_OK) {
1330 ret = -1;
1331 ERR("Relayd mkdir replied error %d", reply.ret_code);
1332 } else {
1333 /* Success */
1334 ret = 0;
1335 }
1336
1337 DBG("Relayd mkdir completed successfully");
1338
1339error:
1340 free(msg);
1341 return ret;
a1ae2ea5 1342}
This page took 0.101367 seconds and 4 git commands to generate.