Replace explicit rcu_read_lock/unlock with lttng::urcu::read_lock_guard
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.cpp
CommitLineData
3bd1e081 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3bd1e081 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
3bd1e081 7 *
3bd1e081
MD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
28ab034a 11#include "kernel-consumer.hpp"
3bd1e081 12
28ab034a 13#include <common/buffer-view.hpp>
c9e313bc 14#include <common/common.hpp>
c9e313bc 15#include <common/compat/endian.hpp>
28ab034a 16#include <common/compat/fcntl.hpp>
c9e313bc 17#include <common/consumer/consumer-stream.hpp>
c9e313bc 18#include <common/consumer/consumer-timer.hpp>
c9e313bc
SM
19#include <common/consumer/consumer.hpp>
20#include <common/consumer/metadata-bucket.hpp>
28ab034a
JG
21#include <common/index/index.hpp>
22#include <common/kernel-ctl/kernel-ctl.hpp>
23#include <common/optional.hpp>
24#include <common/pipe.hpp>
25#include <common/relayd/relayd.hpp>
26#include <common/sessiond-comm/relayd.hpp>
27#include <common/sessiond-comm/sessiond-comm.hpp>
56047f5a 28#include <common/urcu.hpp>
28ab034a 29#include <common/utils.hpp>
0857097f 30
28ab034a
JG
31#include <bin/lttng-consumerd/health-consumerd.hpp>
32#include <inttypes.h>
33#include <poll.h>
34#include <pthread.h>
35#include <stdint.h>
36#include <stdlib.h>
37#include <string.h>
38#include <sys/mman.h>
39#include <sys/socket.h>
40#include <sys/stat.h>
41#include <sys/types.h>
42#include <unistd.h>
3bd1e081 43
fa29bfbf 44extern struct lttng_consumer_global_data the_consumer_data;
3bd1e081 45extern int consumer_poll_timeout;
3bd1e081 46
3bd1e081
MD
47/*
48 * Take a snapshot for a specific fd
49 *
50 * Returns 0 on success, < 0 on error
51 */
ffe60014 52int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081
MD
53{
54 int ret = 0;
55 int infd = stream->wait_fd;
56
57 ret = kernctl_snapshot(infd);
d2d2f190
JD
58 /*
59 * -EAGAIN is not an error, it just means that there is no data to
60 * be read.
61 */
62 if (ret != 0 && ret != -EAGAIN) {
5a510c9f 63 PERROR("Getting sub-buffer snapshot.");
3bd1e081
MD
64 }
65
66 return ret;
67}
68
e9404c27
JG
69/*
70 * Sample consumed and produced positions for a specific fd.
71 *
72 * Returns 0 on success, < 0 on error.
73 */
28ab034a 74int lttng_kconsumer_sample_snapshot_positions(struct lttng_consumer_stream *stream)
e9404c27 75{
a0377dfe 76 LTTNG_ASSERT(stream);
e9404c27
JG
77
78 return kernctl_snapshot_sample_positions(stream->wait_fd);
79}
80
3bd1e081
MD
81/*
82 * Get the produced position
83 *
84 * Returns 0 on success, < 0 on error
85 */
28ab034a 86int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
3bd1e081
MD
87{
88 int ret;
89 int infd = stream->wait_fd;
90
91 ret = kernctl_snapshot_get_produced(infd, pos);
92 if (ret != 0) {
5a510c9f 93 PERROR("kernctl_snapshot_get_produced");
3bd1e081
MD
94 }
95
96 return ret;
97}
98
07b86b52
JD
99/*
100 * Get the consumerd position
101 *
102 * Returns 0 on success, < 0 on error
103 */
28ab034a 104int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos)
07b86b52
JD
105{
106 int ret;
107 int infd = stream->wait_fd;
108
109 ret = kernctl_snapshot_get_consumed(infd, pos);
110 if (ret != 0) {
5a510c9f 111 PERROR("kernctl_snapshot_get_consumed");
07b86b52
JD
112 }
113
114 return ret;
115}
116
28ab034a 117static int get_current_subbuf_addr(struct lttng_consumer_stream *stream, const char **addr)
128708c3
JG
118{
119 int ret;
120 unsigned long mmap_offset;
97535efa 121 const char *mmap_base = (const char *) stream->mmap_base;
128708c3
JG
122
123 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
124 if (ret < 0) {
125 PERROR("Failed to get mmap read offset");
126 goto error;
127 }
128
129 *addr = mmap_base + mmap_offset;
130error:
131 return ret;
132}
133
07b86b52
JD
134/*
135 * Take a snapshot of all the stream of a channel
3eb928aa 136 * RCU read-side lock must be held across this function to ensure existence of
947bd097 137 * channel.
07b86b52
JD
138 *
139 * Returns 0 on success, < 0 on error
140 */
28ab034a
JG
141static int lttng_kconsumer_snapshot_channel(struct lttng_consumer_channel *channel,
142 uint64_t key,
143 char *path,
144 uint64_t relayd_id,
145 uint64_t nb_packets_per_stream)
07b86b52
JD
146{
147 int ret;
07b86b52
JD
148 struct lttng_consumer_stream *stream;
149
6a00837f 150 DBG("Kernel consumer snapshot channel %" PRIu64, key);
07b86b52 151
947bd097
JR
152 /* Prevent channel modifications while we perform the snapshot.*/
153 pthread_mutex_lock(&channel->lock);
154
56047f5a 155 lttng::urcu::read_lock_guard read_lock;
07b86b52 156
07b86b52
JD
157 /* Splice is not supported yet for channel snapshot. */
158 if (channel->output != CONSUMER_CHANNEL_MMAP) {
9381314c 159 ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
28ab034a 160 channel->name);
07b86b52
JD
161 ret = -1;
162 goto end;
163 }
164
28ab034a 165 cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
923333cd 166 unsigned long consumed_pos, produced_pos;
9ce5646a
MD
167
168 health_code_update();
169
07b86b52
JD
170 /*
171 * Lock stream because we are about to change its state.
172 */
173 pthread_mutex_lock(&stream->lock);
174
a0377dfe 175 LTTNG_ASSERT(channel->trace_chunk);
d2956687
JG
176 if (!lttng_trace_chunk_get(channel->trace_chunk)) {
177 /*
178 * Can't happen barring an internal error as the channel
179 * holds a reference to the trace chunk.
180 */
181 ERR("Failed to acquire reference to channel's trace chunk");
182 ret = -1;
183 goto end_unlock;
184 }
a0377dfe 185 LTTNG_ASSERT(!stream->trace_chunk);
d2956687
JG
186 stream->trace_chunk = channel->trace_chunk;
187
29decac3
DG
188 /*
189 * Assign the received relayd ID so we can use it for streaming. The streams
190 * are not visible to anyone so this is OK to change it.
191 */
07b86b52
JD
192 stream->net_seq_idx = relayd_id;
193 channel->relayd_id = relayd_id;
194 if (relayd_id != (uint64_t) -1ULL) {
10a50311 195 ret = consumer_send_relayd_stream(stream, path);
07b86b52
JD
196 if (ret < 0) {
197 ERR("sending stream to relayd");
d119bd01 198 goto error_close_stream_output;
07b86b52 199 }
07b86b52 200 } else {
28ab034a 201 ret = consumer_stream_create_output_files(stream, false);
07b86b52 202 if (ret < 0) {
d119bd01 203 goto error_close_stream_output;
07b86b52 204 }
28ab034a 205 DBG("Kernel consumer snapshot stream (%" PRIu64 ")", stream->key);
07b86b52
JD
206 }
207
f22dd891 208 ret = kernctl_buffer_flush_empty(stream->wait_fd);
07b86b52 209 if (ret < 0) {
f22dd891
MD
210 /*
211 * Doing a buffer flush which does not take into
212 * account empty packets. This is not perfect
213 * for stream intersection, but required as a
214 * fall-back when "flush_empty" is not
215 * implemented by lttng-modules.
216 */
217 ret = kernctl_buffer_flush(stream->wait_fd);
218 if (ret < 0) {
219 ERR("Failed to flush kernel stream");
d119bd01 220 goto error_close_stream_output;
f22dd891 221 }
07b86b52
JD
222 goto end_unlock;
223 }
224
225 ret = lttng_kconsumer_take_snapshot(stream);
226 if (ret < 0) {
227 ERR("Taking kernel snapshot");
d119bd01 228 goto error_close_stream_output;
07b86b52
JD
229 }
230
231 ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
232 if (ret < 0) {
233 ERR("Produced kernel snapshot position");
d119bd01 234 goto error_close_stream_output;
07b86b52
JD
235 }
236
237 ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
238 if (ret < 0) {
239 ERR("Consumerd kernel snapshot position");
d119bd01 240 goto error_close_stream_output;
07b86b52
JD
241 }
242
28ab034a
JG
243 consumed_pos = consumer_get_consume_start_pos(
244 consumed_pos, produced_pos, nb_packets_per_stream, stream->max_sb_size);
5c786ded 245
9377d830 246 while ((long) (consumed_pos - produced_pos) < 0) {
07b86b52
JD
247 ssize_t read_len;
248 unsigned long len, padded_len;
128708c3 249 const char *subbuf_addr;
fd424d99 250 struct lttng_buffer_view subbuf_view;
07b86b52 251
9ce5646a 252 health_code_update();
07b86b52
JD
253 DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
254
255 ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
256 if (ret < 0) {
32af2c95 257 if (ret != -EAGAIN) {
07b86b52 258 PERROR("kernctl_get_subbuf snapshot");
d119bd01 259 goto error_close_stream_output;
07b86b52
JD
260 }
261 DBG("Kernel consumer get subbuf failed. Skipping it.");
262 consumed_pos += stream->max_sb_size;
ddc93ee4 263 stream->chan->lost_packets++;
07b86b52
JD
264 continue;
265 }
266
267 ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
268 if (ret < 0) {
269 ERR("Snapshot kernctl_get_subbuf_size");
29decac3 270 goto error_put_subbuf;
07b86b52
JD
271 }
272
273 ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
274 if (ret < 0) {
275 ERR("Snapshot kernctl_get_padded_subbuf_size");
29decac3 276 goto error_put_subbuf;
07b86b52
JD
277 }
278
128708c3
JG
279 ret = get_current_subbuf_addr(stream, &subbuf_addr);
280 if (ret) {
281 goto error_put_subbuf;
282 }
283
28ab034a 284 subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, padded_len);
f5ba75b4 285 read_len = lttng_consumer_on_read_subbuffer_mmap(
28ab034a 286 stream, &subbuf_view, padded_len - len);
07b86b52 287 /*
29decac3
DG
288 * We write the padded len in local tracefiles but the data len
289 * when using a relay. Display the error but continue processing
290 * to try to release the subbuffer.
07b86b52
JD
291 */
292 if (relayd_id != (uint64_t) -1ULL) {
293 if (read_len != len) {
294 ERR("Error sending to the relay (ret: %zd != len: %lu)",
28ab034a
JG
295 read_len,
296 len);
07b86b52
JD
297 }
298 } else {
299 if (read_len != padded_len) {
300 ERR("Error writing to tracefile (ret: %zd != len: %lu)",
28ab034a
JG
301 read_len,
302 padded_len);
07b86b52
JD
303 }
304 }
305
306 ret = kernctl_put_subbuf(stream->wait_fd);
307 if (ret < 0) {
308 ERR("Snapshot kernctl_put_subbuf");
d119bd01 309 goto error_close_stream_output;
07b86b52
JD
310 }
311 consumed_pos += stream->max_sb_size;
312 }
313
d119bd01 314 consumer_stream_close_output(stream);
07b86b52
JD
315 pthread_mutex_unlock(&stream->lock);
316 }
317
318 /* All good! */
319 ret = 0;
320 goto end;
321
29decac3
DG
322error_put_subbuf:
323 ret = kernctl_put_subbuf(stream->wait_fd);
324 if (ret < 0) {
325 ERR("Snapshot kernctl_put_subbuf error path");
326 }
d119bd01
JG
327error_close_stream_output:
328 consumer_stream_close_output(stream);
07b86b52
JD
329end_unlock:
330 pthread_mutex_unlock(&stream->lock);
331end:
947bd097 332 pthread_mutex_unlock(&channel->lock);
07b86b52
JD
333 return ret;
334}
335
336/*
337 * Read the whole metadata available for a snapshot.
3eb928aa 338 * RCU read-side lock must be held across this function to ensure existence of
947bd097 339 * metadata_channel.
07b86b52
JD
340 *
341 * Returns 0 on success, < 0 on error
342 */
28ab034a
JG
343static int lttng_kconsumer_snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
344 uint64_t key,
345 char *path,
346 uint64_t relayd_id,
347 struct lttng_consumer_local_data *ctx)
07b86b52 348{
d771f832
DG
349 int ret, use_relayd = 0;
350 ssize_t ret_read;
07b86b52 351 struct lttng_consumer_stream *metadata_stream;
d771f832 352
a0377dfe 353 LTTNG_ASSERT(ctx);
07b86b52 354
28ab034a 355 DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", key, path);
07b86b52 356
56047f5a 357 lttng::urcu::read_lock_guard read_lock;
07b86b52 358
07b86b52 359 metadata_stream = metadata_channel->metadata_stream;
a0377dfe 360 LTTNG_ASSERT(metadata_stream);
d2956687 361
947bd097 362 metadata_stream->read_subbuffer_ops.lock(metadata_stream);
a0377dfe
FD
363 LTTNG_ASSERT(metadata_channel->trace_chunk);
364 LTTNG_ASSERT(metadata_stream->trace_chunk);
07b86b52 365
d771f832 366 /* Flag once that we have a valid relayd for the stream. */
e2039c7a 367 if (relayd_id != (uint64_t) -1ULL) {
d771f832
DG
368 use_relayd = 1;
369 }
370
371 if (use_relayd) {
10a50311 372 ret = consumer_send_relayd_stream(metadata_stream, path);
e2039c7a 373 if (ret < 0) {
fa27abe8 374 goto error_snapshot;
e2039c7a 375 }
e2039c7a 376 } else {
28ab034a 377 ret = consumer_stream_create_output_files(metadata_stream, false);
e2039c7a 378 if (ret < 0) {
fa27abe8 379 goto error_snapshot;
e2039c7a 380 }
07b86b52 381 }
07b86b52 382
d771f832 383 do {
9ce5646a
MD
384 health_code_update();
385
6f9449c2 386 ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
d771f832 387 if (ret_read < 0) {
28ab034a 388 ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", ret_read);
6e5e3c51
MD
389 ret = ret_read;
390 goto error_snapshot;
07b86b52 391 }
6e5e3c51 392 } while (ret_read > 0);
07b86b52 393
d771f832
DG
394 if (use_relayd) {
395 close_relayd_stream(metadata_stream);
396 metadata_stream->net_seq_idx = (uint64_t) -1ULL;
397 } else {
fdf9986c
MD
398 if (metadata_stream->out_fd >= 0) {
399 ret = close(metadata_stream->out_fd);
400 if (ret < 0) {
401 PERROR("Kernel consumer snapshot metadata close out_fd");
402 /*
403 * Don't go on error here since the snapshot was successful at this
404 * point but somehow the close failed.
405 */
406 }
407 metadata_stream->out_fd = -1;
d2956687 408 lttng_trace_chunk_put(metadata_stream->trace_chunk);
cd9adb8b 409 metadata_stream->trace_chunk = nullptr;
e2039c7a 410 }
e2039c7a
JD
411 }
412
07b86b52 413 ret = 0;
fa27abe8 414error_snapshot:
947bd097 415 metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
cd9adb8b
JG
416 consumer_stream_destroy(metadata_stream, nullptr);
417 metadata_channel->metadata_stream = nullptr;
07b86b52
JD
418 return ret;
419}
420
1803a064
MD
421/*
422 * Receive command from session daemon and process it.
423 *
424 * Return 1 on success else a negative value or 0.
425 */
3bd1e081 426int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
28ab034a
JG
427 int sock,
428 struct pollfd *consumer_sockpoll)
3bd1e081 429{
0c5b3718 430 int ret_func;
0c759fc9 431 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3bd1e081
MD
432 struct lttcomm_consumer_msg msg;
433
9ce5646a
MD
434 health_code_update();
435
0c5b3718
SM
436 {
437 ssize_t ret_recv;
438
439 ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
440 if (ret_recv != sizeof(msg)) {
441 if (ret_recv > 0) {
28ab034a 442 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
0c5b3718
SM
443 ret_recv = -1;
444 }
445 return ret_recv;
1803a064 446 }
3bd1e081 447 }
9ce5646a
MD
448
449 health_code_update();
450
84382d49 451 /* Deprecated command */
a0377dfe 452 LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP);
3bd1e081 453
9ce5646a
MD
454 health_code_update();
455
b0b335c8 456 /* relayd needs RCU read-side protection */
56047f5a 457 lttng::urcu::read_lock_guard read_lock;
b0b335c8 458
3bd1e081 459 switch (msg.cmd_type) {
00e2e675
DG
460 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
461 {
4222116f
JR
462 uint32_t major = msg.u.relayd_sock.major;
463 uint32_t minor = msg.u.relayd_sock.minor;
28ab034a
JG
464 enum lttcomm_sock_proto protocol =
465 (enum lttcomm_sock_proto) msg.u.relayd_sock.relayd_socket_protocol;
4222116f 466
f50f23d9 467 /* Session daemon status message are handled in the following call. */
2527bf85 468 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
28ab034a
JG
469 msg.u.relayd_sock.type,
470 ctx,
471 sock,
472 consumer_sockpoll,
473 msg.u.relayd_sock.session_id,
474 msg.u.relayd_sock.relayd_session_id,
475 major,
476 minor,
477 protocol);
00e2e675
DG
478 goto end_nosignal;
479 }
3bd1e081
MD
480 case LTTNG_CONSUMER_ADD_CHANNEL:
481 {
482 struct lttng_consumer_channel *new_channel;
afbf29db 483 int ret_send_status, ret_add_channel = 0;
d2956687 484 const uint64_t chunk_id = msg.u.channel.chunk_id.value;
3bd1e081 485
9ce5646a
MD
486 health_code_update();
487
f50f23d9 488 /* First send a status message before receiving the fds. */
0c5b3718
SM
489 ret_send_status = consumer_send_status_msg(sock, ret_code);
490 if (ret_send_status < 0) {
f50f23d9 491 /* Somehow, the session daemon is not responding anymore. */
1803a064 492 goto error_fatal;
f50f23d9 493 }
9ce5646a
MD
494
495 health_code_update();
496
d88aee68 497 DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
cd9adb8b
JG
498 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
499 msg.u.channel.session_id,
500 msg.u.channel.chunk_id.is_set ? &chunk_id :
501 nullptr,
502 msg.u.channel.pathname,
503 msg.u.channel.name,
504 msg.u.channel.relayd_id,
505 msg.u.channel.output,
506 msg.u.channel.tracefile_size,
507 msg.u.channel.tracefile_count,
508 0,
509 msg.u.channel.monitor,
510 msg.u.channel.live_timer_interval,
511 msg.u.channel.is_live,
512 nullptr,
513 nullptr);
514 if (new_channel == nullptr) {
f73fabfd 515 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
3bd1e081
MD
516 goto end_nosignal;
517 }
ffe60014 518 new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
95a1109b
JD
519 switch (msg.u.channel.output) {
520 case LTTNG_EVENT_SPLICE:
521 new_channel->output = CONSUMER_CHANNEL_SPLICE;
522 break;
523 case LTTNG_EVENT_MMAP:
524 new_channel->output = CONSUMER_CHANNEL_MMAP;
525 break;
526 default:
527 ERR("Channel output unknown %d", msg.u.channel.output);
528 goto end_nosignal;
529 }
ffe60014
DG
530
531 /* Translate and save channel type. */
532 switch (msg.u.channel.type) {
533 case CONSUMER_CHANNEL_TYPE_DATA:
534 case CONSUMER_CHANNEL_TYPE_METADATA:
97535efa 535 new_channel->type = (consumer_channel_type) msg.u.channel.type;
ffe60014
DG
536 break;
537 default:
a0377dfe 538 abort();
ffe60014
DG
539 goto end_nosignal;
540 };
541
9ce5646a
MD
542 health_code_update();
543
cd9adb8b 544 if (ctx->on_recv_channel != nullptr) {
28ab034a 545 int ret_recv_channel = ctx->on_recv_channel(new_channel);
0c5b3718 546 if (ret_recv_channel == 0) {
28ab034a 547 ret_add_channel = consumer_add_channel(new_channel, ctx);
0c5b3718 548 } else if (ret_recv_channel < 0) {
3bd1e081
MD
549 goto end_nosignal;
550 }
551 } else {
28ab034a 552 ret_add_channel = consumer_add_channel(new_channel, ctx);
3bd1e081 553 }
28ab034a 554 if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret_add_channel) {
e9404c27
JG
555 int monitor_start_ret;
556
557 DBG("Consumer starting monitor timer");
28ab034a 558 consumer_timer_live_start(new_channel, msg.u.channel.live_timer_interval);
e9404c27 559 monitor_start_ret = consumer_timer_monitor_start(
28ab034a 560 new_channel, msg.u.channel.monitor_timer_interval);
e9404c27
JG
561 if (monitor_start_ret < 0) {
562 ERR("Starting channel monitoring timer failed");
563 goto end_nosignal;
564 }
94d49140 565 }
e43c41c5 566
9ce5646a
MD
567 health_code_update();
568
e43c41c5 569 /* If we received an error in add_channel, we need to report it. */
0c5b3718 570 if (ret_add_channel < 0) {
28ab034a 571 ret_send_status = consumer_send_status_msg(sock, ret_add_channel);
0c5b3718 572 if (ret_send_status < 0) {
1803a064
MD
573 goto error_fatal;
574 }
e43c41c5
JD
575 goto end_nosignal;
576 }
577
3bd1e081
MD
578 goto end_nosignal;
579 }
580 case LTTNG_CONSUMER_ADD_STREAM:
581 {
dae10966
DG
582 int fd;
583 struct lttng_pipe *stream_pipe;
00e2e675 584 struct lttng_consumer_stream *new_stream;
ffe60014 585 struct lttng_consumer_channel *channel;
c80048c6 586 int alloc_ret = 0;
0c5b3718
SM
587 int ret_send_status, ret_poll, ret_get_max_subbuf_size;
588 ssize_t ret_pipe_write, ret_recv;
3bd1e081 589
ffe60014
DG
590 /*
591 * Get stream's channel reference. Needed when adding the stream to the
592 * global hash table.
593 */
594 channel = consumer_find_channel(msg.u.stream.channel_key);
595 if (!channel) {
596 /*
597 * We could not find the channel. Can happen if cpu hotplug
598 * happens while tearing down.
599 */
d88aee68 600 ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
e462382a 601 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
ffe60014
DG
602 }
603
9ce5646a
MD
604 health_code_update();
605
f50f23d9 606 /* First send a status message before receiving the fds. */
0c5b3718
SM
607 ret_send_status = consumer_send_status_msg(sock, ret_code);
608 if (ret_send_status < 0) {
d771f832 609 /* Somehow, the session daemon is not responding anymore. */
c5c7998f 610 goto error_add_stream_fatal;
1803a064 611 }
9ce5646a
MD
612
613 health_code_update();
614
0c759fc9 615 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
d771f832 616 /* Channel was not found. */
c5c7998f 617 goto error_add_stream_nosignal;
f50f23d9
DG
618 }
619
d771f832 620 /* Blocking call */
9ce5646a 621 health_poll_entry();
0c5b3718 622 ret_poll = lttng_consumer_poll_socket(consumer_sockpoll);
9ce5646a 623 health_poll_exit();
0c5b3718 624 if (ret_poll) {
c5c7998f 625 goto error_add_stream_fatal;
3bd1e081 626 }
00e2e675 627
9ce5646a
MD
628 health_code_update();
629
00e2e675 630 /* Get stream file descriptor from socket */
0c5b3718
SM
631 ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
632 if (ret_recv != sizeof(fd)) {
f73fabfd 633 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
0c5b3718 634 ret_func = ret_recv;
c5c7998f 635 goto end;
3bd1e081 636 }
3bd1e081 637
9ce5646a
MD
638 health_code_update();
639
f50f23d9
DG
640 /*
641 * Send status code to session daemon only if the recv works. If the
642 * above recv() failed, the session daemon is notified through the
643 * error socket and the teardown is eventually done.
644 */
0c5b3718
SM
645 ret_send_status = consumer_send_status_msg(sock, ret_code);
646 if (ret_send_status < 0) {
f50f23d9 647 /* Somehow, the session daemon is not responding anymore. */
c5c7998f 648 goto error_add_stream_nosignal;
f50f23d9
DG
649 }
650
9ce5646a
MD
651 health_code_update();
652
d2956687 653 pthread_mutex_lock(&channel->lock);
28ab034a
JG
654 new_stream = consumer_stream_create(channel,
655 channel->key,
656 fd,
657 channel->name,
658 channel->relayd_id,
659 channel->session_id,
660 channel->trace_chunk,
661 msg.u.stream.cpu,
662 &alloc_ret,
663 channel->type,
664 channel->monitor);
cd9adb8b 665 if (new_stream == nullptr) {
c80048c6
MD
666 switch (alloc_ret) {
667 case -ENOMEM:
668 case -EINVAL:
669 default:
670 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
671 break;
c80048c6 672 }
d2956687 673 pthread_mutex_unlock(&channel->lock);
c5c7998f 674 goto error_add_stream_nosignal;
3bd1e081 675 }
d771f832 676
ffe60014 677 new_stream->wait_fd = fd;
28ab034a
JG
678 ret_get_max_subbuf_size =
679 kernctl_get_max_subbuf_size(new_stream->wait_fd, &new_stream->max_sb_size);
0c5b3718 680 if (ret_get_max_subbuf_size < 0) {
d05185fa
JG
681 pthread_mutex_unlock(&channel->lock);
682 ERR("Failed to get kernel maximal subbuffer size");
c5c7998f 683 goto error_add_stream_nosignal;
d05185fa
JG
684 }
685
28ab034a 686 consumer_stream_update_channel_attributes(new_stream, channel);
00e2e675 687
a0c83db9
DG
688 /*
689 * We've just assigned the channel to the stream so increment the
07b86b52
JD
690 * refcount right now. We don't need to increment the refcount for
691 * streams in no monitor because we handle manually the cleanup of
692 * those. It is very important to make sure there is NO prior
693 * consumer_del_stream() calls or else the refcount will be unbalanced.
a0c83db9 694 */
07b86b52
JD
695 if (channel->monitor) {
696 uatomic_inc(&new_stream->chan->refcount);
697 }
9d9353f9 698
fb3a43a9
DG
699 /*
700 * The buffer flush is done on the session daemon side for the kernel
701 * so no need for the stream "hangup_flush_done" variable to be
702 * tracked. This is important for a kernel stream since we don't rely
703 * on the flush state of the stream to read data. It's not the case for
704 * user space tracing.
705 */
706 new_stream->hangup_flush_done = 0;
707
9ce5646a
MD
708 health_code_update();
709
d2956687 710 pthread_mutex_lock(&new_stream->lock);
633d0084 711 if (ctx->on_recv_stream) {
0c5b3718
SM
712 int ret_recv_stream = ctx->on_recv_stream(new_stream);
713 if (ret_recv_stream < 0) {
d2956687
JG
714 pthread_mutex_unlock(&new_stream->lock);
715 pthread_mutex_unlock(&channel->lock);
d771f832 716 consumer_stream_free(new_stream);
c5c7998f 717 goto error_add_stream_nosignal;
fb3a43a9 718 }
633d0084 719 }
9ce5646a
MD
720 health_code_update();
721
07b86b52
JD
722 if (new_stream->metadata_flag) {
723 channel->metadata_stream = new_stream;
724 }
725
2bba9e53
DG
726 /* Do not monitor this stream. */
727 if (!channel->monitor) {
5eecee74 728 DBG("Kernel consumer add stream %s in no monitor mode with "
28ab034a
JG
729 "relayd id %" PRIu64,
730 new_stream->name,
731 new_stream->net_seq_idx);
10a50311 732 cds_list_add(&new_stream->send_node, &channel->streams.head);
d2956687
JG
733 pthread_mutex_unlock(&new_stream->lock);
734 pthread_mutex_unlock(&channel->lock);
c5c7998f 735 goto end_add_stream;
6dc3064a
DG
736 }
737
e1b71bdc
DG
738 /* Send stream to relayd if the stream has an ID. */
739 if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
0c5b3718
SM
740 int ret_send_relayd_stream;
741
28ab034a
JG
742 ret_send_relayd_stream =
743 consumer_send_relayd_stream(new_stream, new_stream->chan->pathname);
0c5b3718 744 if (ret_send_relayd_stream < 0) {
d2956687
JG
745 pthread_mutex_unlock(&new_stream->lock);
746 pthread_mutex_unlock(&channel->lock);
e1b71bdc 747 consumer_stream_free(new_stream);
c5c7998f 748 goto error_add_stream_nosignal;
e1b71bdc 749 }
001b7e62
MD
750
751 /*
752 * If adding an extra stream to an already
753 * existing channel (e.g. cpu hotplug), we need
754 * to send the "streams_sent" command to relayd.
755 */
756 if (channel->streams_sent_to_relayd) {
0c5b3718
SM
757 int ret_send_relayd_streams_sent;
758
759 ret_send_relayd_streams_sent =
28ab034a 760 consumer_send_relayd_streams_sent(new_stream->net_seq_idx);
0c5b3718 761 if (ret_send_relayd_streams_sent < 0) {
d2956687
JG
762 pthread_mutex_unlock(&new_stream->lock);
763 pthread_mutex_unlock(&channel->lock);
c5c7998f 764 goto error_add_stream_nosignal;
001b7e62
MD
765 }
766 }
e2039c7a 767 }
d2956687
JG
768 pthread_mutex_unlock(&new_stream->lock);
769 pthread_mutex_unlock(&channel->lock);
e2039c7a 770
50f8ae69 771 /* Get the right pipe where the stream will be sent. */
633d0084 772 if (new_stream->metadata_flag) {
66d583dc 773 consumer_add_metadata_stream(new_stream);
dae10966 774 stream_pipe = ctx->consumer_metadata_pipe;
3bd1e081 775 } else {
66d583dc 776 consumer_add_data_stream(new_stream);
dae10966 777 stream_pipe = ctx->consumer_data_pipe;
50f8ae69
DG
778 }
779
66d583dc 780 /* Visible to other threads */
5ab66908
MD
781 new_stream->globally_visible = 1;
782
9ce5646a
MD
783 health_code_update();
784
5c7248cd
JG
785 ret_pipe_write =
786 lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); /* NOLINT
787 sizeof
788 used on a
789 pointer.
790 */
0c5b3718 791 if (ret_pipe_write < 0) {
dae10966 792 ERR("Consumer write %s stream to pipe %d",
28ab034a
JG
793 new_stream->metadata_flag ? "metadata" : "data",
794 lttng_pipe_get_writefd(stream_pipe));
5ab66908
MD
795 if (new_stream->metadata_flag) {
796 consumer_del_stream_for_metadata(new_stream);
797 } else {
798 consumer_del_stream_for_data(new_stream);
799 }
c5c7998f 800 goto error_add_stream_nosignal;
3bd1e081 801 }
00e2e675 802
02d02e31 803 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
28ab034a
JG
804 new_stream->name,
805 fd,
806 new_stream->chan->pathname,
807 new_stream->relayd_stream_id);
808 end_add_stream:
3bd1e081 809 break;
28ab034a 810 error_add_stream_nosignal:
c5c7998f 811 goto end_nosignal;
28ab034a 812 error_add_stream_fatal:
c5c7998f 813 goto error_fatal;
3bd1e081 814 }
a4baae1b
JD
815 case LTTNG_CONSUMER_STREAMS_SENT:
816 {
817 struct lttng_consumer_channel *channel;
0c5b3718 818 int ret_send_status;
a4baae1b
JD
819
820 /*
821 * Get stream's channel reference. Needed when adding the stream to the
822 * global hash table.
823 */
824 channel = consumer_find_channel(msg.u.sent_streams.channel_key);
825 if (!channel) {
826 /*
827 * We could not find the channel. Can happen if cpu hotplug
828 * happens while tearing down.
829 */
28ab034a 830 ERR("Unable to find channel key %" PRIu64, msg.u.sent_streams.channel_key);
e462382a 831 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
a4baae1b
JD
832 }
833
834 health_code_update();
835
836 /*
837 * Send status code to session daemon.
838 */
0c5b3718 839 ret_send_status = consumer_send_status_msg(sock, ret_code);
28ab034a 840 if (ret_send_status < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
a4baae1b 841 /* Somehow, the session daemon is not responding anymore. */
80d5a658 842 goto error_streams_sent_nosignal;
a4baae1b
JD
843 }
844
845 health_code_update();
846
847 /*
848 * We should not send this message if we don't monitor the
849 * streams in this channel.
850 */
851 if (!channel->monitor) {
80d5a658 852 goto end_error_streams_sent;
a4baae1b
JD
853 }
854
855 health_code_update();
856 /* Send stream to relayd if the stream has an ID. */
857 if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
0c5b3718
SM
858 int ret_send_relay_streams;
859
28ab034a
JG
860 ret_send_relay_streams =
861 consumer_send_relayd_streams_sent(msg.u.sent_streams.net_seq_idx);
0c5b3718 862 if (ret_send_relay_streams < 0) {
80d5a658 863 goto error_streams_sent_nosignal;
a4baae1b 864 }
001b7e62 865 channel->streams_sent_to_relayd = true;
a4baae1b 866 }
28ab034a 867 end_error_streams_sent:
a4baae1b 868 break;
28ab034a 869 error_streams_sent_nosignal:
80d5a658 870 goto end_nosignal;
a4baae1b 871 }
3bd1e081
MD
872 case LTTNG_CONSUMER_UPDATE_STREAM:
873 {
3f8e211f
DG
874 return -ENOSYS;
875 }
876 case LTTNG_CONSUMER_DESTROY_RELAYD:
877 {
a6ba4fe1 878 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
3f8e211f 879 struct consumer_relayd_sock_pair *relayd;
0c5b3718 880 int ret_send_status;
3f8e211f 881
a6ba4fe1 882 DBG("Kernel consumer destroying relayd %" PRIu64, index);
3f8e211f
DG
883
884 /* Get relayd reference if exists. */
a6ba4fe1 885 relayd = consumer_find_relayd(index);
cd9adb8b 886 if (relayd == nullptr) {
3448e266 887 DBG("Unable to find relayd %" PRIu64, index);
e462382a 888 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
3bd1e081 889 }
3f8e211f 890
a6ba4fe1
DG
891 /*
892 * Each relayd socket pair has a refcount of stream attached to it
893 * which tells if the relayd is still active or not depending on the
894 * refcount value.
895 *
896 * This will set the destroy flag of the relayd object and destroy it
897 * if the refcount reaches zero when called.
898 *
899 * The destroy can happen either here or when a stream fd hangs up.
900 */
f50f23d9
DG
901 if (relayd) {
902 consumer_flag_relayd_for_destroy(relayd);
903 }
904
9ce5646a
MD
905 health_code_update();
906
0c5b3718
SM
907 ret_send_status = consumer_send_status_msg(sock, ret_code);
908 if (ret_send_status < 0) {
f50f23d9 909 /* Somehow, the session daemon is not responding anymore. */
1803a064 910 goto error_fatal;
f50f23d9 911 }
3f8e211f 912
3f8e211f 913 goto end_nosignal;
3bd1e081 914 }
6d805429 915 case LTTNG_CONSUMER_DATA_PENDING:
53632229 916 {
0c5b3718 917 int32_t ret_data_pending;
6d805429 918 uint64_t id = msg.u.data_pending.session_id;
0c5b3718 919 ssize_t ret_send;
c8f59ee5 920
6d805429 921 DBG("Kernel consumer data pending command for id %" PRIu64, id);
c8f59ee5 922
0c5b3718 923 ret_data_pending = consumer_data_pending(id);
c8f59ee5 924
9ce5646a
MD
925 health_code_update();
926
c8f59ee5 927 /* Send back returned value to session daemon */
28ab034a
JG
928 ret_send =
929 lttcomm_send_unix_sock(sock, &ret_data_pending, sizeof(ret_data_pending));
0c5b3718 930 if (ret_send < 0) {
6d805429 931 PERROR("send data pending ret code");
1803a064 932 goto error_fatal;
c8f59ee5 933 }
f50f23d9
DG
934
935 /*
936 * No need to send back a status message since the data pending
937 * returned value is the response.
938 */
c8f59ee5 939 break;
53632229 940 }
6dc3064a
DG
941 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
942 {
3eb928aa
MD
943 struct lttng_consumer_channel *channel;
944 uint64_t key = msg.u.snapshot_channel.key;
0c5b3718 945 int ret_send_status;
3eb928aa
MD
946
947 channel = consumer_find_channel(key);
948 if (!channel) {
949 ERR("Channel %" PRIu64 " not found", key);
950 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
07b86b52 951 } else {
3eb928aa 952 if (msg.u.snapshot_channel.metadata == 1) {
0c5b3718
SM
953 int ret_snapshot;
954
955 ret_snapshot = lttng_kconsumer_snapshot_metadata(
28ab034a
JG
956 channel,
957 key,
958 msg.u.snapshot_channel.pathname,
959 msg.u.snapshot_channel.relayd_id,
960 ctx);
0c5b3718 961 if (ret_snapshot < 0) {
3eb928aa
MD
962 ERR("Snapshot metadata failed");
963 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
964 }
965 } else {
0c5b3718
SM
966 int ret_snapshot;
967
968 ret_snapshot = lttng_kconsumer_snapshot_channel(
28ab034a
JG
969 channel,
970 key,
971 msg.u.snapshot_channel.pathname,
972 msg.u.snapshot_channel.relayd_id,
973 msg.u.snapshot_channel.nb_packets_per_stream);
0c5b3718 974 if (ret_snapshot < 0) {
3eb928aa
MD
975 ERR("Snapshot channel failed");
976 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
977 }
07b86b52
JD
978 }
979 }
9ce5646a
MD
980 health_code_update();
981
0c5b3718
SM
982 ret_send_status = consumer_send_status_msg(sock, ret_code);
983 if (ret_send_status < 0) {
6dc3064a
DG
984 /* Somehow, the session daemon is not responding anymore. */
985 goto end_nosignal;
986 }
987 break;
988 }
07b86b52
JD
989 case LTTNG_CONSUMER_DESTROY_CHANNEL:
990 {
991 uint64_t key = msg.u.destroy_channel.key;
992 struct lttng_consumer_channel *channel;
0c5b3718 993 int ret_send_status;
07b86b52
JD
994
995 channel = consumer_find_channel(key);
996 if (!channel) {
997 ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
e462382a 998 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
07b86b52
JD
999 }
1000
9ce5646a
MD
1001 health_code_update();
1002
0c5b3718
SM
1003 ret_send_status = consumer_send_status_msg(sock, ret_code);
1004 if (ret_send_status < 0) {
07b86b52 1005 /* Somehow, the session daemon is not responding anymore. */
a9d36096 1006 goto end_destroy_channel;
07b86b52
JD
1007 }
1008
9ce5646a
MD
1009 health_code_update();
1010
15dc512a
DG
1011 /* Stop right now if no channel was found. */
1012 if (!channel) {
a9d36096 1013 goto end_destroy_channel;
15dc512a
DG
1014 }
1015
07b86b52
JD
1016 /*
1017 * This command should ONLY be issued for channel with streams set in
1018 * no monitor mode.
1019 */
a0377dfe 1020 LTTNG_ASSERT(!channel->monitor);
07b86b52
JD
1021
1022 /*
1023 * The refcount should ALWAYS be 0 in the case of a channel in no
1024 * monitor mode.
1025 */
a0377dfe 1026 LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1));
07b86b52
JD
1027
1028 consumer_del_channel(channel);
28ab034a 1029 end_destroy_channel:
07b86b52
JD
1030 goto end_nosignal;
1031 }
fb83fe64
JD
1032 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1033 {
66ab32be
JD
1034 ssize_t ret;
1035 uint64_t count;
fb83fe64
JD
1036 struct lttng_consumer_channel *channel;
1037 uint64_t id = msg.u.discarded_events.session_id;
1038 uint64_t key = msg.u.discarded_events.channel_key;
1039
28ab034a
JG
1040 DBG("Kernel consumer discarded events command for session id %" PRIu64
1041 ", channel key %" PRIu64,
1042 id,
1043 key);
e5742757 1044
fb83fe64
JD
1045 channel = consumer_find_channel(key);
1046 if (!channel) {
28ab034a 1047 ERR("Kernel consumer discarded events channel %" PRIu64 " not found", key);
66ab32be 1048 count = 0;
e5742757 1049 } else {
66ab32be 1050 count = channel->discarded_events;
fb83fe64
JD
1051 }
1052
fb83fe64
JD
1053 health_code_update();
1054
1055 /* Send back returned value to session daemon */
66ab32be 1056 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
fb83fe64
JD
1057 if (ret < 0) {
1058 PERROR("send discarded events");
1059 goto error_fatal;
1060 }
1061
1062 break;
1063 }
1064 case LTTNG_CONSUMER_LOST_PACKETS:
1065 {
66ab32be
JD
1066 ssize_t ret;
1067 uint64_t count;
fb83fe64
JD
1068 struct lttng_consumer_channel *channel;
1069 uint64_t id = msg.u.lost_packets.session_id;
1070 uint64_t key = msg.u.lost_packets.channel_key;
1071
28ab034a
JG
1072 DBG("Kernel consumer lost packets command for session id %" PRIu64
1073 ", channel key %" PRIu64,
1074 id,
1075 key);
e5742757 1076
fb83fe64
JD
1077 channel = consumer_find_channel(key);
1078 if (!channel) {
28ab034a 1079 ERR("Kernel consumer lost packets channel %" PRIu64 " not found", key);
66ab32be 1080 count = 0;
e5742757 1081 } else {
66ab32be 1082 count = channel->lost_packets;
fb83fe64
JD
1083 }
1084
fb83fe64
JD
1085 health_code_update();
1086
1087 /* Send back returned value to session daemon */
66ab32be 1088 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
fb83fe64
JD
1089 if (ret < 0) {
1090 PERROR("send lost packets");
1091 goto error_fatal;
1092 }
1093
1094 break;
1095 }
b3530820
JG
1096 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1097 {
1098 int channel_monitor_pipe;
0c5b3718
SM
1099 int ret_send_status, ret_set_channel_monitor_pipe;
1100 ssize_t ret_recv;
b3530820
JG
1101
1102 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1103 /* Successfully received the command's type. */
0c5b3718
SM
1104 ret_send_status = consumer_send_status_msg(sock, ret_code);
1105 if (ret_send_status < 0) {
b3530820
JG
1106 goto error_fatal;
1107 }
1108
28ab034a 1109 ret_recv = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe, 1);
0c5b3718 1110 if (ret_recv != sizeof(channel_monitor_pipe)) {
b3530820
JG
1111 ERR("Failed to receive channel monitor pipe");
1112 goto error_fatal;
1113 }
1114
1115 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
0c5b3718 1116 ret_set_channel_monitor_pipe =
28ab034a 1117 consumer_timer_thread_set_channel_monitor_pipe(channel_monitor_pipe);
0c5b3718 1118 if (!ret_set_channel_monitor_pipe) {
b3530820 1119 int flags;
0c5b3718 1120 int ret_fcntl;
b3530820
JG
1121
1122 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1123 /* Set the pipe as non-blocking. */
0c5b3718
SM
1124 ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
1125 if (ret_fcntl == -1) {
b3530820
JG
1126 PERROR("fcntl get flags of the channel monitoring pipe");
1127 goto error_fatal;
1128 }
0c5b3718 1129 flags = ret_fcntl;
b3530820 1130
28ab034a 1131 ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL, flags | O_NONBLOCK);
0c5b3718 1132 if (ret_fcntl == -1) {
b3530820
JG
1133 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1134 goto error_fatal;
1135 }
1136 DBG("Channel monitor pipe set as non-blocking");
1137 } else {
1138 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1139 }
0c5b3718
SM
1140 ret_send_status = consumer_send_status_msg(sock, ret_code);
1141 if (ret_send_status < 0) {
b3530820
JG
1142 goto error_fatal;
1143 }
1144 break;
1145 }
b99a8d42
JD
1146 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1147 {
92b7a7f8
MD
1148 struct lttng_consumer_channel *channel;
1149 uint64_t key = msg.u.rotate_channel.key;
0c5b3718 1150 int ret_send_status;
b99a8d42 1151
92b7a7f8 1152 DBG("Consumer rotate channel %" PRIu64, key);
b99a8d42 1153
92b7a7f8
MD
1154 channel = consumer_find_channel(key);
1155 if (!channel) {
1156 ERR("Channel %" PRIu64 " not found", key);
1157 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1158 } else {
1159 /*
1160 * Sample the rotate position of all the streams in this channel.
1161 */
0c5b3718
SM
1162 int ret_rotate_channel;
1163
1164 ret_rotate_channel = lttng_consumer_rotate_channel(
28ab034a 1165 channel, key, msg.u.rotate_channel.relayd_id);
0c5b3718 1166 if (ret_rotate_channel < 0) {
92b7a7f8
MD
1167 ERR("Rotate channel failed");
1168 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
1169 }
b99a8d42 1170
92b7a7f8
MD
1171 health_code_update();
1172 }
0c5b3718
SM
1173
1174 ret_send_status = consumer_send_status_msg(sock, ret_code);
1175 if (ret_send_status < 0) {
b99a8d42 1176 /* Somehow, the session daemon is not responding anymore. */
713bdd26 1177 goto error_rotate_channel;
b99a8d42 1178 }
92b7a7f8
MD
1179 if (channel) {
1180 /* Rotate the streams that are ready right now. */
0c5b3718
SM
1181 int ret_rotate;
1182
28ab034a 1183 ret_rotate = lttng_consumer_rotate_ready_streams(channel, key);
0c5b3718 1184 if (ret_rotate < 0) {
92b7a7f8
MD
1185 ERR("Rotate ready streams failed");
1186 }
b99a8d42 1187 }
b99a8d42 1188 break;
28ab034a 1189 error_rotate_channel:
713bdd26 1190 goto end_nosignal;
b99a8d42 1191 }
5f3aff8b
MD
1192 case LTTNG_CONSUMER_CLEAR_CHANNEL:
1193 {
1194 struct lttng_consumer_channel *channel;
1195 uint64_t key = msg.u.clear_channel.key;
0c5b3718 1196 int ret_send_status;
5f3aff8b
MD
1197
1198 channel = consumer_find_channel(key);
1199 if (!channel) {
1200 DBG("Channel %" PRIu64 " not found", key);
1201 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1202 } else {
0c5b3718
SM
1203 int ret_clear_channel;
1204
28ab034a 1205 ret_clear_channel = lttng_consumer_clear_channel(channel);
0c5b3718 1206 if (ret_clear_channel) {
5f3aff8b 1207 ERR("Clear channel failed");
97535efa 1208 ret_code = (lttcomm_return_code) ret_clear_channel;
5f3aff8b
MD
1209 }
1210
1211 health_code_update();
1212 }
0c5b3718
SM
1213
1214 ret_send_status = consumer_send_status_msg(sock, ret_code);
1215 if (ret_send_status < 0) {
5f3aff8b
MD
1216 /* Somehow, the session daemon is not responding anymore. */
1217 goto end_nosignal;
1218 }
1219
1220 break;
1221 }
d2956687 1222 case LTTNG_CONSUMER_INIT:
00fb02ac 1223 {
0c5b3718 1224 int ret_send_status;
328c2fe7
JG
1225 lttng_uuid sessiond_uuid;
1226
28ab034a
JG
1227 std::copy(std::begin(msg.u.init.sessiond_uuid),
1228 std::end(msg.u.init.sessiond_uuid),
1229 sessiond_uuid.begin());
0c5b3718 1230
28ab034a 1231 ret_code = lttng_consumer_init_command(ctx, sessiond_uuid);
00fb02ac 1232 health_code_update();
0c5b3718
SM
1233 ret_send_status = consumer_send_status_msg(sock, ret_code);
1234 if (ret_send_status < 0) {
00fb02ac
JD
1235 /* Somehow, the session daemon is not responding anymore. */
1236 goto end_nosignal;
1237 }
1238 break;
1239 }
d2956687 1240 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
d88744a4 1241 {
d2956687 1242 const struct lttng_credentials credentials = {
28ab034a
JG
1243 .uid = LTTNG_OPTIONAL_INIT_VALUE(
1244 msg.u.create_trace_chunk.credentials.value.uid),
1245 .gid = LTTNG_OPTIONAL_INIT_VALUE(
1246 msg.u.create_trace_chunk.credentials.value.gid),
d2956687 1247 };
28ab034a
JG
1248 const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set;
1249 const uint64_t relayd_id = msg.u.create_trace_chunk.relayd_id.value;
1250 const char *chunk_override_name = *msg.u.create_trace_chunk.override_name ?
1251 msg.u.create_trace_chunk.override_name :
cd9adb8b
JG
1252 nullptr;
1253 struct lttng_directory_handle *chunk_directory_handle = nullptr;
d88744a4 1254
d2956687
JG
1255 /*
1256 * The session daemon will only provide a chunk directory file
1257 * descriptor for local traces.
1258 */
1259 if (is_local_trace) {
1260 int chunk_dirfd;
0c5b3718
SM
1261 int ret_send_status;
1262 ssize_t ret_recv;
19990ed5 1263
d2956687 1264 /* Acnowledge the reception of the command. */
28ab034a 1265 ret_send_status = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
0c5b3718 1266 if (ret_send_status < 0) {
d2956687
JG
1267 /* Somehow, the session daemon is not responding anymore. */
1268 goto end_nosignal;
1269 }
92816cc3 1270
28ab034a 1271 ret_recv = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
0c5b3718 1272 if (ret_recv != sizeof(chunk_dirfd)) {
d2956687
JG
1273 ERR("Failed to receive trace chunk directory file descriptor");
1274 goto error_fatal;
1275 }
92816cc3 1276
28ab034a
JG
1277 DBG("Received trace chunk directory fd (%d)", chunk_dirfd);
1278 chunk_directory_handle =
1279 lttng_directory_handle_create_from_dirfd(chunk_dirfd);
cbf53d23 1280 if (!chunk_directory_handle) {
d2956687
JG
1281 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1282 if (close(chunk_dirfd)) {
1283 PERROR("Failed to close chunk directory file descriptor");
1284 }
1285 goto error_fatal;
1286 }
92816cc3
JG
1287 }
1288
d2956687 1289 ret_code = lttng_consumer_create_trace_chunk(
cd9adb8b 1290 !is_local_trace ? &relayd_id : nullptr,
28ab034a
JG
1291 msg.u.create_trace_chunk.session_id,
1292 msg.u.create_trace_chunk.chunk_id,
1293 (time_t) msg.u.create_trace_chunk.creation_timestamp,
1294 chunk_override_name,
cd9adb8b 1295 msg.u.create_trace_chunk.credentials.is_set ? &credentials : nullptr,
28ab034a 1296 chunk_directory_handle);
cbf53d23 1297 lttng_directory_handle_put(chunk_directory_handle);
d2956687 1298 goto end_msg_sessiond;
d88744a4 1299 }
d2956687 1300 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
a1ae2ea5 1301 {
bbc4768c 1302 enum lttng_trace_chunk_command_type close_command =
28ab034a
JG
1303 (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
1304 const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value;
ecd1a12f
MD
1305 struct lttcomm_consumer_close_trace_chunk_reply reply;
1306 char path[LTTNG_PATH_MAX];
0c5b3718 1307 ssize_t ret_send;
d2956687
JG
1308
1309 ret_code = lttng_consumer_close_trace_chunk(
cd9adb8b 1310 msg.u.close_trace_chunk.relayd_id.is_set ? &relayd_id : nullptr,
28ab034a
JG
1311 msg.u.close_trace_chunk.session_id,
1312 msg.u.close_trace_chunk.chunk_id,
1313 (time_t) msg.u.close_trace_chunk.close_timestamp,
cd9adb8b 1314 msg.u.close_trace_chunk.close_command.is_set ? &close_command : nullptr,
28ab034a 1315 path);
ecd1a12f
MD
1316 reply.ret_code = ret_code;
1317 reply.path_length = strlen(path) + 1;
0c5b3718
SM
1318 ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
1319 if (ret_send != sizeof(reply)) {
ecd1a12f
MD
1320 goto error_fatal;
1321 }
28ab034a 1322 ret_send = lttcomm_send_unix_sock(sock, path, reply.path_length);
0c5b3718 1323 if (ret_send != reply.path_length) {
ecd1a12f
MD
1324 goto error_fatal;
1325 }
1326 goto end_nosignal;
3654ed19 1327 }
d2956687 1328 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
3654ed19 1329 {
28ab034a 1330 const uint64_t relayd_id = msg.u.trace_chunk_exists.relayd_id.value;
d2956687
JG
1331
1332 ret_code = lttng_consumer_trace_chunk_exists(
cd9adb8b 1333 msg.u.trace_chunk_exists.relayd_id.is_set ? &relayd_id : nullptr,
28ab034a
JG
1334 msg.u.trace_chunk_exists.session_id,
1335 msg.u.trace_chunk_exists.chunk_id);
d2956687 1336 goto end_msg_sessiond;
a1ae2ea5 1337 }
04ed9e10
JG
1338 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
1339 {
1340 const uint64_t key = msg.u.open_channel_packets.key;
28ab034a 1341 struct lttng_consumer_channel *channel = consumer_find_channel(key);
04ed9e10
JG
1342
1343 if (channel) {
1344 pthread_mutex_lock(&channel->lock);
1345 ret_code = lttng_consumer_open_channel_packets(channel);
1346 pthread_mutex_unlock(&channel->lock);
1347 } else {
1348 WARN("Channel %" PRIu64 " not found", key);
1349 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1350 }
1351
1352 health_code_update();
1353 goto end_msg_sessiond;
1354 }
3bd1e081 1355 default:
3f8e211f 1356 goto end_nosignal;
3bd1e081 1357 }
3f8e211f 1358
3bd1e081 1359end_nosignal:
4cbc1a04
DG
1360 /*
1361 * Return 1 to indicate success since the 0 value can be a socket
1362 * shutdown during the recv() or send() call.
1363 */
0c5b3718 1364 ret_func = 1;
c5c7998f
JG
1365 goto end;
1366error_fatal:
1367 /* This will issue a consumer stop. */
0c5b3718 1368 ret_func = -1;
c5c7998f 1369 goto end;
d2956687
JG
1370end_msg_sessiond:
1371 /*
1372 * The returned value here is not useful since either way we'll return 1 to
1373 * the caller because the session daemon socket management is done
1374 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1375 */
0c5b3718
SM
1376 {
1377 int ret_send_status;
1378
1379 ret_send_status = consumer_send_status_msg(sock, ret_code);
1380 if (ret_send_status < 0) {
1381 goto error_fatal;
1382 }
d2956687 1383 }
0c5b3718
SM
1384
1385 ret_func = 1;
1386
c5c7998f 1387end:
d2956687 1388 health_code_update();
0c5b3718 1389 return ret_func;
3bd1e081 1390}
d41f73b7 1391
94d49140
JD
1392/*
1393 * Sync metadata meaning request them to the session daemon and snapshot to the
1394 * metadata thread can consumer them.
1395 *
1396 * Metadata stream lock MUST be acquired.
94d49140 1397 */
28ab034a 1398enum sync_metadata_status lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
94d49140
JD
1399{
1400 int ret;
577eea73 1401 enum sync_metadata_status status;
94d49140 1402
a0377dfe 1403 LTTNG_ASSERT(metadata);
94d49140
JD
1404
1405 ret = kernctl_buffer_flush(metadata->wait_fd);
1406 if (ret < 0) {
1407 ERR("Failed to flush kernel stream");
577eea73 1408 status = SYNC_METADATA_STATUS_ERROR;
94d49140
JD
1409 goto end;
1410 }
1411
1412 ret = kernctl_snapshot(metadata->wait_fd);
1413 if (ret < 0) {
577eea73
JG
1414 if (errno == EAGAIN) {
1415 /* No new metadata, exit. */
1416 DBG("Sync metadata, no new kernel metadata");
1417 status = SYNC_METADATA_STATUS_NO_DATA;
1418 } else {
94d49140 1419 ERR("Sync metadata, taking kernel snapshot failed.");
577eea73 1420 status = SYNC_METADATA_STATUS_ERROR;
94d49140 1421 }
577eea73
JG
1422 } else {
1423 status = SYNC_METADATA_STATUS_NEW_DATA;
94d49140
JD
1424 }
1425
1426end:
577eea73 1427 return status;
94d49140 1428}
309167d2 1429
28ab034a
JG
1430static int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
1431 struct stream_subbuffer *subbuf)
fb83fe64
JD
1432{
1433 int ret;
fb83fe64 1434
28ab034a 1435 ret = kernctl_get_subbuf_size(stream->wait_fd, &subbuf->info.data.subbuf_size);
6f9449c2 1436 if (ret) {
fb83fe64
JD
1437 goto end;
1438 }
fb83fe64 1439
28ab034a
JG
1440 ret = kernctl_get_padded_subbuf_size(stream->wait_fd,
1441 &subbuf->info.data.padded_subbuf_size);
6f9449c2 1442 if (ret) {
fb83fe64
JD
1443 goto end;
1444 }
fb83fe64
JD
1445
1446end:
1447 return ret;
1448}
1449
28ab034a
JG
1450static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
1451 struct stream_subbuffer *subbuf)
93ec662e
JD
1452{
1453 int ret;
93ec662e 1454
6f9449c2
JG
1455 ret = extract_common_subbuffer_info(stream, subbuf);
1456 if (ret) {
93ec662e
JD
1457 goto end;
1458 }
1459
28ab034a 1460 ret = kernctl_get_metadata_version(stream->wait_fd, &subbuf->info.metadata.version);
6f9449c2 1461 if (ret) {
93ec662e
JD
1462 goto end;
1463 }
1464
93ec662e
JD
1465end:
1466 return ret;
1467}
1468
28ab034a
JG
1469static int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
1470 struct stream_subbuffer *subbuf)
d41f73b7 1471{
6f9449c2 1472 int ret;
d41f73b7 1473
6f9449c2
JG
1474 ret = extract_common_subbuffer_info(stream, subbuf);
1475 if (ret) {
1476 goto end;
1477 }
309167d2 1478
28ab034a 1479 ret = kernctl_get_packet_size(stream->wait_fd, &subbuf->info.data.packet_size);
6f9449c2
JG
1480 if (ret < 0) {
1481 PERROR("Failed to get sub-buffer packet size");
1482 goto end;
1483 }
02d02e31 1484
28ab034a 1485 ret = kernctl_get_content_size(stream->wait_fd, &subbuf->info.data.content_size);
6f9449c2
JG
1486 if (ret < 0) {
1487 PERROR("Failed to get sub-buffer content size");
1488 goto end;
d41f73b7
MD
1489 }
1490
28ab034a 1491 ret = kernctl_get_timestamp_begin(stream->wait_fd, &subbuf->info.data.timestamp_begin);
6f9449c2
JG
1492 if (ret < 0) {
1493 PERROR("Failed to get sub-buffer begin timestamp");
1494 goto end;
1d4dfdef
DG
1495 }
1496
28ab034a 1497 ret = kernctl_get_timestamp_end(stream->wait_fd, &subbuf->info.data.timestamp_end);
6f9449c2
JG
1498 if (ret < 0) {
1499 PERROR("Failed to get sub-buffer end timestamp");
1500 goto end;
1501 }
1502
28ab034a 1503 ret = kernctl_get_events_discarded(stream->wait_fd, &subbuf->info.data.events_discarded);
6f9449c2
JG
1504 if (ret) {
1505 PERROR("Failed to get sub-buffer events discarded count");
1506 goto end;
1507 }
1508
1509 ret = kernctl_get_sequence_number(stream->wait_fd,
28ab034a 1510 &subbuf->info.data.sequence_number.value);
6f9449c2
JG
1511 if (ret) {
1512 /* May not be supported by older LTTng-modules. */
1513 if (ret != -ENOTTY) {
1514 PERROR("Failed to get sub-buffer sequence number");
1515 goto end;
fb83fe64 1516 }
1c20f0e2 1517 } else {
6f9449c2 1518 subbuf->info.data.sequence_number.is_set = true;
309167d2
JD
1519 }
1520
28ab034a 1521 ret = kernctl_get_stream_id(stream->wait_fd, &subbuf->info.data.stream_id);
6f9449c2
JG
1522 if (ret < 0) {
1523 PERROR("Failed to get stream id");
1524 goto end;
1525 }
1d4dfdef 1526
28ab034a 1527 ret = kernctl_get_instance_id(stream->wait_fd, &subbuf->info.data.stream_instance_id.value);
6f9449c2
JG
1528 if (ret) {
1529 /* May not be supported by older LTTng-modules. */
1530 if (ret != -ENOTTY) {
1531 PERROR("Failed to get stream instance id");
1532 goto end;
1d4dfdef 1533 }
6f9449c2
JG
1534 } else {
1535 subbuf->info.data.stream_instance_id.is_set = true;
1536 }
1537end:
1538 return ret;
1539}
47e81c02 1540
28ab034a
JG
1541static enum get_next_subbuffer_status get_subbuffer_common(struct lttng_consumer_stream *stream,
1542 struct stream_subbuffer *subbuffer)
6f9449c2
JG
1543{
1544 int ret;
b6797c8e 1545 enum get_next_subbuffer_status status;
6f9449c2
JG
1546
1547 ret = kernctl_get_next_subbuf(stream->wait_fd);
b6797c8e
JG
1548 switch (ret) {
1549 case 0:
1550 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1551 break;
1552 case -ENODATA:
28ab034a 1553 case -EAGAIN:
6e5e3c51
MD
1554 /*
1555 * The caller only expects -ENODATA when there is no data to
1556 * read, but the kernel tracer returns -EAGAIN when there is
1557 * currently no data for a non-finalized stream, and -ENODATA
1558 * when there is no data for a finalized stream. Those can be
1559 * combined into a -ENODATA return value.
1560 */
b6797c8e
JG
1561 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1562 goto end;
1563 default:
1564 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
6f9449c2
JG
1565 goto end;
1566 }
1567
28ab034a 1568 ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer);
b6797c8e
JG
1569 if (ret) {
1570 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1571 }
6f9449c2 1572end:
b6797c8e 1573 return status;
6f9449c2 1574}
128708c3 1575
28ab034a
JG
1576static enum get_next_subbuffer_status
1577get_next_subbuffer_splice(struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer)
6f9449c2 1578{
28ab034a 1579 const enum get_next_subbuffer_status status = get_subbuffer_common(stream, subbuffer);
1d4dfdef 1580
b6797c8e 1581 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
6f9449c2
JG
1582 goto end;
1583 }
1d4dfdef 1584
6f9449c2
JG
1585 subbuffer->buffer.fd = stream->wait_fd;
1586end:
b6797c8e 1587 return status;
6f9449c2 1588}
fd424d99 1589
28ab034a
JG
1590static enum get_next_subbuffer_status get_next_subbuffer_mmap(struct lttng_consumer_stream *stream,
1591 struct stream_subbuffer *subbuffer)
6f9449c2
JG
1592{
1593 int ret;
b6797c8e 1594 enum get_next_subbuffer_status status;
6f9449c2
JG
1595 const char *addr;
1596
b6797c8e
JG
1597 status = get_subbuffer_common(stream, subbuffer);
1598 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
6f9449c2 1599 goto end;
128708c3 1600 }
6f9449c2
JG
1601
1602 ret = get_current_subbuf_addr(stream, &addr);
1603 if (ret) {
b6797c8e 1604 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
6f9449c2 1605 goto end;
d41f73b7 1606 }
6f9449c2 1607
28ab034a
JG
1608 subbuffer->buffer.buffer =
1609 lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size);
6f9449c2 1610end:
b6797c8e 1611 return status;
6f9449c2
JG
1612}
1613
28ab034a
JG
1614static enum get_next_subbuffer_status
1615get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
1616 struct stream_subbuffer *subbuffer)
f5ba75b4
JG
1617{
1618 int ret;
1619 const char *addr;
1620 bool coherent;
b6797c8e 1621 enum get_next_subbuffer_status status;
f5ba75b4 1622
28ab034a 1623 ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd, &coherent);
f5ba75b4
JG
1624 if (ret) {
1625 goto end;
1626 }
1627
28ab034a 1628 ret = stream->read_subbuffer_ops.extract_subbuffer_info(stream, subbuffer);
f5ba75b4
JG
1629 if (ret) {
1630 goto end;
1631 }
1632
1633 LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
1634
1635 ret = get_current_subbuf_addr(stream, &addr);
1636 if (ret) {
1637 goto end;
1638 }
1639
28ab034a
JG
1640 subbuffer->buffer.buffer =
1641 lttng_buffer_view_init(addr, 0, subbuffer->info.data.padded_subbuf_size);
f5ba75b4 1642 DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
28ab034a
JG
1643 subbuffer->info.metadata.padded_subbuf_size,
1644 coherent ? "true" : "false");
f5ba75b4 1645end:
6e5e3c51
MD
1646 /*
1647 * The caller only expects -ENODATA when there is no data to read, but
1648 * the kernel tracer returns -EAGAIN when there is currently no data
1649 * for a non-finalized stream, and -ENODATA when there is no data for a
1650 * finalized stream. Those can be combined into a -ENODATA return value.
1651 */
b6797c8e
JG
1652 switch (ret) {
1653 case 0:
1654 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1655 break;
1656 case -ENODATA:
1657 case -EAGAIN:
1658 /*
1659 * The caller only expects -ENODATA when there is no data to
1660 * read, but the kernel tracer returns -EAGAIN when there is
1661 * currently no data for a non-finalized stream, and -ENODATA
1662 * when there is no data for a finalized stream. Those can be
1663 * combined into a -ENODATA return value.
1664 */
1665 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1666 break;
1667 default:
1668 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1669 break;
6e5e3c51
MD
1670 }
1671
b6797c8e 1672 return status;
f5ba75b4
JG
1673}
1674
28ab034a
JG
1675static int put_next_subbuffer(struct lttng_consumer_stream *stream,
1676 struct stream_subbuffer *subbuffer __attribute__((unused)))
6f9449c2
JG
1677{
1678 const int ret = kernctl_put_next_subbuf(stream->wait_fd);
1679
1680 if (ret) {
1681 if (ret == -EFAULT) {
1682 PERROR("Error in unreserving sub buffer");
1683 } else if (ret == -EIO) {
d41f73b7 1684 /* Should never happen with newer LTTng versions */
6f9449c2 1685 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
d41f73b7 1686 }
d41f73b7
MD
1687 }
1688
6f9449c2
JG
1689 return ret;
1690}
1c20f0e2 1691
28ab034a 1692static bool is_get_next_check_metadata_available(int tracer_fd)
f5ba75b4 1693{
cd9adb8b 1694 const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, nullptr);
741e787b
JG
1695 const bool available = ret != -ENOTTY;
1696
1697 if (ret == 0) {
1698 /* get succeeded, make sure to put the subbuffer. */
1699 kernctl_put_subbuf(tracer_fd);
1700 }
1701
1702 return available;
f5ba75b4
JG
1703}
1704
28ab034a
JG
1705static int signal_metadata(struct lttng_consumer_stream *stream,
1706 struct lttng_consumer_local_data *ctx __attribute__((unused)))
091441eb
MD
1707{
1708 ASSERT_LOCKED(stream->metadata_rdv_lock);
1709 return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
1710}
1711
28ab034a 1712static int lttng_kconsumer_set_stream_ops(struct lttng_consumer_stream *stream)
6f9449c2 1713{
f5ba75b4
JG
1714 int ret = 0;
1715
1716 if (stream->metadata_flag && stream->chan->is_live) {
1717 DBG("Attempting to enable metadata bucketization for live consumers");
1718 if (is_get_next_check_metadata_available(stream->wait_fd)) {
1719 DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
1720 stream->read_subbuffer_ops.get_next_subbuffer =
28ab034a
JG
1721 get_next_subbuffer_metadata_check;
1722 ret = consumer_stream_enable_metadata_bucketization(stream);
f5ba75b4
JG
1723 if (ret) {
1724 goto end;
1725 }
1726 } else {
1727 /*
1728 * The kernel tracer version is too old to indicate
1729 * when the metadata stream has reached a "coherent"
1730 * (parseable) point.
1731 *
1732 * This means that a live viewer may see an incoherent
1733 * sequence of metadata and fail to parse it.
1734 */
1735 WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
1736 metadata_bucket_destroy(stream->metadata_bucket);
cd9adb8b 1737 stream->metadata_bucket = nullptr;
f5ba75b4 1738 }
091441eb
MD
1739
1740 stream->read_subbuffer_ops.on_sleep = signal_metadata;
f5ba75b4
JG
1741 }
1742
1743 if (!stream->read_subbuffer_ops.get_next_subbuffer) {
1744 if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
28ab034a 1745 stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_mmap;
f5ba75b4 1746 } else {
28ab034a 1747 stream->read_subbuffer_ops.get_next_subbuffer = get_next_subbuffer_splice;
f5ba75b4 1748 }
94d49140
JD
1749 }
1750
6f9449c2 1751 if (stream->metadata_flag) {
28ab034a 1752 stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info;
6f9449c2 1753 } else {
28ab034a 1754 stream->read_subbuffer_ops.extract_subbuffer_info = extract_data_subbuffer_info;
6f9449c2 1755 if (stream->chan->is_live) {
28ab034a 1756 stream->read_subbuffer_ops.send_live_beacon = consumer_flush_kernel_index;
6f9449c2 1757 }
309167d2
JD
1758 }
1759
6f9449c2 1760 stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
f5ba75b4
JG
1761end:
1762 return ret;
d41f73b7
MD
1763}
1764
1765int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1766{
1767 int ret;
ffe60014 1768
a0377dfe 1769 LTTNG_ASSERT(stream);
ffe60014 1770
2bba9e53 1771 /*
d2956687
JG
1772 * Don't create anything if this is set for streaming or if there is
1773 * no current trace chunk on the parent channel.
2bba9e53 1774 */
d2956687 1775 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
28ab034a 1776 stream->chan->trace_chunk) {
d2956687
JG
1777 ret = consumer_stream_create_output_files(stream, true);
1778 if (ret) {
fe4477ee
JD
1779 goto error;
1780 }
ffe60014 1781 }
d41f73b7 1782
d41f73b7
MD
1783 if (stream->output == LTTNG_EVENT_MMAP) {
1784 /* get the len of the mmap region */
1785 unsigned long mmap_len;
1786
1787 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
1788 if (ret != 0) {
ffe60014 1789 PERROR("kernctl_get_mmap_len");
d41f73b7
MD
1790 goto error_close_fd;
1791 }
1792 stream->mmap_len = (size_t) mmap_len;
1793
28ab034a 1794 stream->mmap_base =
cd9adb8b 1795 mmap(nullptr, stream->mmap_len, PROT_READ, MAP_PRIVATE, stream->wait_fd, 0);
d41f73b7 1796 if (stream->mmap_base == MAP_FAILED) {
ffe60014 1797 PERROR("Error mmaping");
d41f73b7
MD
1798 ret = -1;
1799 goto error_close_fd;
1800 }
1801 }
1802
f5ba75b4
JG
1803 ret = lttng_kconsumer_set_stream_ops(stream);
1804 if (ret) {
1805 goto error_close_fd;
1806 }
6f9449c2 1807
d41f73b7
MD
1808 /* we return 0 to let the library handle the FD internally */
1809 return 0;
1810
1811error_close_fd:
2f225ce2 1812 if (stream->out_fd >= 0) {
d41f73b7
MD
1813 int err;
1814
1815 err = close(stream->out_fd);
a0377dfe 1816 LTTNG_ASSERT(!err);
2f225ce2 1817 stream->out_fd = -1;
d41f73b7
MD
1818 }
1819error:
1820 return ret;
1821}
1822
ca22feea
DG
1823/*
1824 * Check if data is still being extracted from the buffers for a specific
4e9a4686
DG
1825 * stream. Consumer data lock MUST be acquired before calling this function
1826 * and the stream lock.
ca22feea 1827 *
6d805429 1828 * Return 1 if the traced data are still getting read else 0 meaning that the
ca22feea
DG
1829 * data is available for trace viewer reading.
1830 */
6d805429 1831int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
ca22feea
DG
1832{
1833 int ret;
1834
a0377dfe 1835 LTTNG_ASSERT(stream);
ca22feea 1836
873b9e9a
MD
1837 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
1838 ret = 0;
1839 goto end;
1840 }
1841
ca22feea
DG
1842 ret = kernctl_get_next_subbuf(stream->wait_fd);
1843 if (ret == 0) {
1844 /* There is still data so let's put back this subbuffer. */
1845 ret = kernctl_put_subbuf(stream->wait_fd);
a0377dfe 1846 LTTNG_ASSERT(ret == 0);
28ab034a 1847 ret = 1; /* Data is pending */
4e9a4686 1848 goto end;
ca22feea
DG
1849 }
1850
6d805429
DG
1851 /* Data is NOT pending and ready to be read. */
1852 ret = 0;
ca22feea 1853
6efae65e
DG
1854end:
1855 return ret;
ca22feea 1856}
This page took 0.196821 seconds and 4 git commands to generate.