Build fix: missing stdio.h include in signal-helper.hpp
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.cpp
CommitLineData
3bd1e081 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3bd1e081 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
3bd1e081 7 *
3bd1e081
MD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
3bd1e081
MD
11#include <poll.h>
12#include <pthread.h>
13#include <stdlib.h>
14#include <string.h>
15#include <sys/mman.h>
16#include <sys/socket.h>
17#include <sys/types.h>
77c7c900 18#include <inttypes.h>
3bd1e081 19#include <unistd.h>
dbb5dfe6 20#include <sys/stat.h>
f5ba75b4 21#include <stdint.h>
3bd1e081 22
c9e313bc
SM
23#include <bin/lttng-consumerd/health-consumerd.hpp>
24#include <common/common.hpp>
25#include <common/kernel-ctl/kernel-ctl.hpp>
26#include <common/sessiond-comm/sessiond-comm.hpp>
27#include <common/sessiond-comm/relayd.hpp>
28#include <common/compat/fcntl.hpp>
29#include <common/compat/endian.hpp>
30#include <common/pipe.hpp>
31#include <common/relayd/relayd.hpp>
32#include <common/utils.hpp>
33#include <common/consumer/consumer-stream.hpp>
34#include <common/index/index.hpp>
35#include <common/consumer/consumer-timer.hpp>
36#include <common/optional.hpp>
37#include <common/buffer-view.hpp>
38#include <common/consumer/consumer.hpp>
39#include <common/consumer/metadata-bucket.hpp>
0857097f 40
c9e313bc 41#include "kernel-consumer.hpp"
3bd1e081 42
fa29bfbf 43extern struct lttng_consumer_global_data the_consumer_data;
3bd1e081 44extern int consumer_poll_timeout;
3bd1e081 45
3bd1e081
MD
46/*
47 * Take a snapshot for a specific fd
48 *
49 * Returns 0 on success, < 0 on error
50 */
ffe60014 51int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081
MD
52{
53 int ret = 0;
54 int infd = stream->wait_fd;
55
56 ret = kernctl_snapshot(infd);
d2d2f190
JD
57 /*
58 * -EAGAIN is not an error, it just means that there is no data to
59 * be read.
60 */
61 if (ret != 0 && ret != -EAGAIN) {
5a510c9f 62 PERROR("Getting sub-buffer snapshot.");
3bd1e081
MD
63 }
64
65 return ret;
66}
67
e9404c27
JG
68/*
69 * Sample consumed and produced positions for a specific fd.
70 *
71 * Returns 0 on success, < 0 on error.
72 */
73int lttng_kconsumer_sample_snapshot_positions(
74 struct lttng_consumer_stream *stream)
75{
a0377dfe 76 LTTNG_ASSERT(stream);
e9404c27
JG
77
78 return kernctl_snapshot_sample_positions(stream->wait_fd);
79}
80
3bd1e081
MD
81/*
82 * Get the produced position
83 *
84 * Returns 0 on success, < 0 on error
85 */
ffe60014 86int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
3bd1e081
MD
87 unsigned long *pos)
88{
89 int ret;
90 int infd = stream->wait_fd;
91
92 ret = kernctl_snapshot_get_produced(infd, pos);
93 if (ret != 0) {
5a510c9f 94 PERROR("kernctl_snapshot_get_produced");
3bd1e081
MD
95 }
96
97 return ret;
98}
99
07b86b52
JD
100/*
101 * Get the consumerd position
102 *
103 * Returns 0 on success, < 0 on error
104 */
105int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
106 unsigned long *pos)
107{
108 int ret;
109 int infd = stream->wait_fd;
110
111 ret = kernctl_snapshot_get_consumed(infd, pos);
112 if (ret != 0) {
5a510c9f 113 PERROR("kernctl_snapshot_get_consumed");
07b86b52
JD
114 }
115
116 return ret;
117}
118
128708c3
JG
119static
120int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
121 const char **addr)
122{
123 int ret;
124 unsigned long mmap_offset;
97535efa 125 const char *mmap_base = (const char *) stream->mmap_base;
128708c3
JG
126
127 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
128 if (ret < 0) {
129 PERROR("Failed to get mmap read offset");
130 goto error;
131 }
132
133 *addr = mmap_base + mmap_offset;
134error:
135 return ret;
136}
137
07b86b52
JD
138/*
139 * Take a snapshot of all the stream of a channel
3eb928aa 140 * RCU read-side lock must be held across this function to ensure existence of
947bd097 141 * channel.
07b86b52
JD
142 *
143 * Returns 0 on success, < 0 on error
144 */
f72bb42f
JG
145static int lttng_kconsumer_snapshot_channel(
146 struct lttng_consumer_channel *channel,
147 uint64_t key, char *path, uint64_t relayd_id,
f46376a1 148 uint64_t nb_packets_per_stream)
07b86b52
JD
149{
150 int ret;
07b86b52
JD
151 struct lttng_consumer_stream *stream;
152
6a00837f 153 DBG("Kernel consumer snapshot channel %" PRIu64, key);
07b86b52 154
947bd097
JR
155 /* Prevent channel modifications while we perform the snapshot.*/
156 pthread_mutex_lock(&channel->lock);
157
07b86b52
JD
158 rcu_read_lock();
159
07b86b52
JD
160 /* Splice is not supported yet for channel snapshot. */
161 if (channel->output != CONSUMER_CHANNEL_MMAP) {
9381314c
JG
162 ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
163 channel->name);
07b86b52
JD
164 ret = -1;
165 goto end;
166 }
167
10a50311 168 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
923333cd 169 unsigned long consumed_pos, produced_pos;
9ce5646a
MD
170
171 health_code_update();
172
07b86b52
JD
173 /*
174 * Lock stream because we are about to change its state.
175 */
176 pthread_mutex_lock(&stream->lock);
177
a0377dfe 178 LTTNG_ASSERT(channel->trace_chunk);
d2956687
JG
179 if (!lttng_trace_chunk_get(channel->trace_chunk)) {
180 /*
181 * Can't happen barring an internal error as the channel
182 * holds a reference to the trace chunk.
183 */
184 ERR("Failed to acquire reference to channel's trace chunk");
185 ret = -1;
186 goto end_unlock;
187 }
a0377dfe 188 LTTNG_ASSERT(!stream->trace_chunk);
d2956687
JG
189 stream->trace_chunk = channel->trace_chunk;
190
29decac3
DG
191 /*
192 * Assign the received relayd ID so we can use it for streaming. The streams
193 * are not visible to anyone so this is OK to change it.
194 */
07b86b52
JD
195 stream->net_seq_idx = relayd_id;
196 channel->relayd_id = relayd_id;
197 if (relayd_id != (uint64_t) -1ULL) {
10a50311 198 ret = consumer_send_relayd_stream(stream, path);
07b86b52
JD
199 if (ret < 0) {
200 ERR("sending stream to relayd");
d119bd01 201 goto error_close_stream_output;
07b86b52 202 }
07b86b52 203 } else {
d2956687
JG
204 ret = consumer_stream_create_output_files(stream,
205 false);
07b86b52 206 if (ret < 0) {
d119bd01 207 goto error_close_stream_output;
07b86b52 208 }
d2956687
JG
209 DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
210 stream->key);
07b86b52
JD
211 }
212
f22dd891 213 ret = kernctl_buffer_flush_empty(stream->wait_fd);
07b86b52 214 if (ret < 0) {
f22dd891
MD
215 /*
216 * Doing a buffer flush which does not take into
217 * account empty packets. This is not perfect
218 * for stream intersection, but required as a
219 * fall-back when "flush_empty" is not
220 * implemented by lttng-modules.
221 */
222 ret = kernctl_buffer_flush(stream->wait_fd);
223 if (ret < 0) {
224 ERR("Failed to flush kernel stream");
d119bd01 225 goto error_close_stream_output;
f22dd891 226 }
07b86b52
JD
227 goto end_unlock;
228 }
229
230 ret = lttng_kconsumer_take_snapshot(stream);
231 if (ret < 0) {
232 ERR("Taking kernel snapshot");
d119bd01 233 goto error_close_stream_output;
07b86b52
JD
234 }
235
236 ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
237 if (ret < 0) {
238 ERR("Produced kernel snapshot position");
d119bd01 239 goto error_close_stream_output;
07b86b52
JD
240 }
241
242 ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
243 if (ret < 0) {
244 ERR("Consumerd kernel snapshot position");
d119bd01 245 goto error_close_stream_output;
07b86b52
JD
246 }
247
d07ceecd
MD
248 consumed_pos = consumer_get_consume_start_pos(consumed_pos,
249 produced_pos, nb_packets_per_stream,
250 stream->max_sb_size);
5c786ded 251
9377d830 252 while ((long) (consumed_pos - produced_pos) < 0) {
07b86b52
JD
253 ssize_t read_len;
254 unsigned long len, padded_len;
128708c3 255 const char *subbuf_addr;
fd424d99 256 struct lttng_buffer_view subbuf_view;
07b86b52 257
9ce5646a 258 health_code_update();
07b86b52
JD
259 DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
260
261 ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
262 if (ret < 0) {
32af2c95 263 if (ret != -EAGAIN) {
07b86b52 264 PERROR("kernctl_get_subbuf snapshot");
d119bd01 265 goto error_close_stream_output;
07b86b52
JD
266 }
267 DBG("Kernel consumer get subbuf failed. Skipping it.");
268 consumed_pos += stream->max_sb_size;
ddc93ee4 269 stream->chan->lost_packets++;
07b86b52
JD
270 continue;
271 }
272
273 ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
274 if (ret < 0) {
275 ERR("Snapshot kernctl_get_subbuf_size");
29decac3 276 goto error_put_subbuf;
07b86b52
JD
277 }
278
279 ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
280 if (ret < 0) {
281 ERR("Snapshot kernctl_get_padded_subbuf_size");
29decac3 282 goto error_put_subbuf;
07b86b52
JD
283 }
284
128708c3
JG
285 ret = get_current_subbuf_addr(stream, &subbuf_addr);
286 if (ret) {
287 goto error_put_subbuf;
288 }
289
fd424d99
JG
290 subbuf_view = lttng_buffer_view_init(
291 subbuf_addr, 0, padded_len);
f5ba75b4 292 read_len = lttng_consumer_on_read_subbuffer_mmap(
fd424d99 293 stream, &subbuf_view,
6f9449c2 294 padded_len - len);
07b86b52 295 /*
29decac3
DG
296 * We write the padded len in local tracefiles but the data len
297 * when using a relay. Display the error but continue processing
298 * to try to release the subbuffer.
07b86b52
JD
299 */
300 if (relayd_id != (uint64_t) -1ULL) {
301 if (read_len != len) {
302 ERR("Error sending to the relay (ret: %zd != len: %lu)",
303 read_len, len);
304 }
305 } else {
306 if (read_len != padded_len) {
307 ERR("Error writing to tracefile (ret: %zd != len: %lu)",
308 read_len, padded_len);
309 }
310 }
311
312 ret = kernctl_put_subbuf(stream->wait_fd);
313 if (ret < 0) {
314 ERR("Snapshot kernctl_put_subbuf");
d119bd01 315 goto error_close_stream_output;
07b86b52
JD
316 }
317 consumed_pos += stream->max_sb_size;
318 }
319
d119bd01 320 consumer_stream_close_output(stream);
07b86b52
JD
321 pthread_mutex_unlock(&stream->lock);
322 }
323
324 /* All good! */
325 ret = 0;
326 goto end;
327
29decac3
DG
328error_put_subbuf:
329 ret = kernctl_put_subbuf(stream->wait_fd);
330 if (ret < 0) {
331 ERR("Snapshot kernctl_put_subbuf error path");
332 }
d119bd01
JG
333error_close_stream_output:
334 consumer_stream_close_output(stream);
07b86b52
JD
335end_unlock:
336 pthread_mutex_unlock(&stream->lock);
337end:
338 rcu_read_unlock();
947bd097 339 pthread_mutex_unlock(&channel->lock);
07b86b52
JD
340 return ret;
341}
342
343/*
344 * Read the whole metadata available for a snapshot.
3eb928aa 345 * RCU read-side lock must be held across this function to ensure existence of
947bd097 346 * metadata_channel.
07b86b52
JD
347 *
348 * Returns 0 on success, < 0 on error
349 */
d2956687
JG
350static int lttng_kconsumer_snapshot_metadata(
351 struct lttng_consumer_channel *metadata_channel,
3eb928aa
MD
352 uint64_t key, char *path, uint64_t relayd_id,
353 struct lttng_consumer_local_data *ctx)
07b86b52 354{
d771f832
DG
355 int ret, use_relayd = 0;
356 ssize_t ret_read;
07b86b52 357 struct lttng_consumer_stream *metadata_stream;
d771f832 358
a0377dfe 359 LTTNG_ASSERT(ctx);
07b86b52
JD
360
361 DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
362 key, path);
363
364 rcu_read_lock();
365
07b86b52 366 metadata_stream = metadata_channel->metadata_stream;
a0377dfe 367 LTTNG_ASSERT(metadata_stream);
d2956687 368
947bd097 369 metadata_stream->read_subbuffer_ops.lock(metadata_stream);
a0377dfe
FD
370 LTTNG_ASSERT(metadata_channel->trace_chunk);
371 LTTNG_ASSERT(metadata_stream->trace_chunk);
07b86b52 372
d771f832 373 /* Flag once that we have a valid relayd for the stream. */
e2039c7a 374 if (relayd_id != (uint64_t) -1ULL) {
d771f832
DG
375 use_relayd = 1;
376 }
377
378 if (use_relayd) {
10a50311 379 ret = consumer_send_relayd_stream(metadata_stream, path);
e2039c7a 380 if (ret < 0) {
fa27abe8 381 goto error_snapshot;
e2039c7a 382 }
e2039c7a 383 } else {
d2956687
JG
384 ret = consumer_stream_create_output_files(metadata_stream,
385 false);
e2039c7a 386 if (ret < 0) {
fa27abe8 387 goto error_snapshot;
e2039c7a 388 }
07b86b52 389 }
07b86b52 390
d771f832 391 do {
9ce5646a
MD
392 health_code_update();
393
6f9449c2 394 ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
d771f832 395 if (ret_read < 0) {
6e5e3c51
MD
396 ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
397 ret_read);
398 ret = ret_read;
399 goto error_snapshot;
07b86b52 400 }
6e5e3c51 401 } while (ret_read > 0);
07b86b52 402
d771f832
DG
403 if (use_relayd) {
404 close_relayd_stream(metadata_stream);
405 metadata_stream->net_seq_idx = (uint64_t) -1ULL;
406 } else {
fdf9986c
MD
407 if (metadata_stream->out_fd >= 0) {
408 ret = close(metadata_stream->out_fd);
409 if (ret < 0) {
410 PERROR("Kernel consumer snapshot metadata close out_fd");
411 /*
412 * Don't go on error here since the snapshot was successful at this
413 * point but somehow the close failed.
414 */
415 }
416 metadata_stream->out_fd = -1;
d2956687
JG
417 lttng_trace_chunk_put(metadata_stream->trace_chunk);
418 metadata_stream->trace_chunk = NULL;
e2039c7a 419 }
e2039c7a
JD
420 }
421
07b86b52 422 ret = 0;
fa27abe8 423error_snapshot:
947bd097 424 metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
cf53a8a6
JD
425 consumer_stream_destroy(metadata_stream, NULL);
426 metadata_channel->metadata_stream = NULL;
07b86b52
JD
427 rcu_read_unlock();
428 return ret;
429}
430
1803a064
MD
431/*
432 * Receive command from session daemon and process it.
433 *
434 * Return 1 on success else a negative value or 0.
435 */
3bd1e081
MD
436int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
437 int sock, struct pollfd *consumer_sockpoll)
438{
0c5b3718 439 int ret_func;
0c759fc9 440 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3bd1e081
MD
441 struct lttcomm_consumer_msg msg;
442
9ce5646a
MD
443 health_code_update();
444
0c5b3718
SM
445 {
446 ssize_t ret_recv;
447
448 ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
449 if (ret_recv != sizeof(msg)) {
450 if (ret_recv > 0) {
451 lttng_consumer_send_error(ctx,
452 LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
453 ret_recv = -1;
454 }
455 return ret_recv;
1803a064 456 }
3bd1e081 457 }
9ce5646a
MD
458
459 health_code_update();
460
84382d49 461 /* Deprecated command */
a0377dfe 462 LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP);
3bd1e081 463
9ce5646a
MD
464 health_code_update();
465
b0b335c8
MD
466 /* relayd needs RCU read-side protection */
467 rcu_read_lock();
468
3bd1e081 469 switch (msg.cmd_type) {
00e2e675
DG
470 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
471 {
4222116f
JR
472 uint32_t major = msg.u.relayd_sock.major;
473 uint32_t minor = msg.u.relayd_sock.minor;
474 enum lttcomm_sock_proto protocol = (enum lttcomm_sock_proto)
475 msg.u.relayd_sock.relayd_socket_protocol;
476
f50f23d9 477 /* Session daemon status message are handled in the following call. */
2527bf85 478 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
4222116f
JR
479 msg.u.relayd_sock.type, ctx, sock,
480 consumer_sockpoll, msg.u.relayd_sock.session_id,
481 msg.u.relayd_sock.relayd_session_id, major,
482 minor, protocol);
00e2e675
DG
483 goto end_nosignal;
484 }
3bd1e081
MD
485 case LTTNG_CONSUMER_ADD_CHANNEL:
486 {
487 struct lttng_consumer_channel *new_channel;
afbf29db 488 int ret_send_status, ret_add_channel = 0;
d2956687 489 const uint64_t chunk_id = msg.u.channel.chunk_id.value;
3bd1e081 490
9ce5646a
MD
491 health_code_update();
492
f50f23d9 493 /* First send a status message before receiving the fds. */
0c5b3718
SM
494 ret_send_status = consumer_send_status_msg(sock, ret_code);
495 if (ret_send_status < 0) {
f50f23d9 496 /* Somehow, the session daemon is not responding anymore. */
1803a064 497 goto error_fatal;
f50f23d9 498 }
9ce5646a
MD
499
500 health_code_update();
501
d88aee68 502 DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
3bd1e081 503 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
d2956687
JG
504 msg.u.channel.session_id,
505 msg.u.channel.chunk_id.is_set ?
506 &chunk_id : NULL,
507 msg.u.channel.pathname,
508 msg.u.channel.name,
1624d5b7
JD
509 msg.u.channel.relayd_id, msg.u.channel.output,
510 msg.u.channel.tracefile_size,
1950109e 511 msg.u.channel.tracefile_count, 0,
ecc48a90 512 msg.u.channel.monitor,
d7ba1388 513 msg.u.channel.live_timer_interval,
a2814ea7 514 msg.u.channel.is_live,
3d071855 515 NULL, NULL);
3bd1e081 516 if (new_channel == NULL) {
f73fabfd 517 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
3bd1e081
MD
518 goto end_nosignal;
519 }
ffe60014 520 new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
95a1109b
JD
521 switch (msg.u.channel.output) {
522 case LTTNG_EVENT_SPLICE:
523 new_channel->output = CONSUMER_CHANNEL_SPLICE;
524 break;
525 case LTTNG_EVENT_MMAP:
526 new_channel->output = CONSUMER_CHANNEL_MMAP;
527 break;
528 default:
529 ERR("Channel output unknown %d", msg.u.channel.output);
530 goto end_nosignal;
531 }
ffe60014
DG
532
533 /* Translate and save channel type. */
534 switch (msg.u.channel.type) {
535 case CONSUMER_CHANNEL_TYPE_DATA:
536 case CONSUMER_CHANNEL_TYPE_METADATA:
97535efa 537 new_channel->type = (consumer_channel_type) msg.u.channel.type;
ffe60014
DG
538 break;
539 default:
a0377dfe 540 abort();
ffe60014
DG
541 goto end_nosignal;
542 };
543
9ce5646a
MD
544 health_code_update();
545
3bd1e081 546 if (ctx->on_recv_channel != NULL) {
0c5b3718
SM
547 int ret_recv_channel =
548 ctx->on_recv_channel(new_channel);
549 if (ret_recv_channel == 0) {
550 ret_add_channel = consumer_add_channel(
551 new_channel, ctx);
552 } else if (ret_recv_channel < 0) {
3bd1e081
MD
553 goto end_nosignal;
554 }
555 } else {
0c5b3718
SM
556 ret_add_channel =
557 consumer_add_channel(new_channel, ctx);
3bd1e081 558 }
0c5b3718
SM
559 if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA &&
560 !ret_add_channel) {
e9404c27
JG
561 int monitor_start_ret;
562
563 DBG("Consumer starting monitor timer");
94d49140
JD
564 consumer_timer_live_start(new_channel,
565 msg.u.channel.live_timer_interval);
e9404c27
JG
566 monitor_start_ret = consumer_timer_monitor_start(
567 new_channel,
568 msg.u.channel.monitor_timer_interval);
569 if (monitor_start_ret < 0) {
570 ERR("Starting channel monitoring timer failed");
571 goto end_nosignal;
572 }
94d49140 573 }
e43c41c5 574
9ce5646a
MD
575 health_code_update();
576
e43c41c5 577 /* If we received an error in add_channel, we need to report it. */
0c5b3718
SM
578 if (ret_add_channel < 0) {
579 ret_send_status = consumer_send_status_msg(
580 sock, ret_add_channel);
581 if (ret_send_status < 0) {
1803a064
MD
582 goto error_fatal;
583 }
e43c41c5
JD
584 goto end_nosignal;
585 }
586
3bd1e081
MD
587 goto end_nosignal;
588 }
589 case LTTNG_CONSUMER_ADD_STREAM:
590 {
dae10966
DG
591 int fd;
592 struct lttng_pipe *stream_pipe;
00e2e675 593 struct lttng_consumer_stream *new_stream;
ffe60014 594 struct lttng_consumer_channel *channel;
c80048c6 595 int alloc_ret = 0;
0c5b3718
SM
596 int ret_send_status, ret_poll, ret_get_max_subbuf_size;
597 ssize_t ret_pipe_write, ret_recv;
3bd1e081 598
ffe60014
DG
599 /*
600 * Get stream's channel reference. Needed when adding the stream to the
601 * global hash table.
602 */
603 channel = consumer_find_channel(msg.u.stream.channel_key);
604 if (!channel) {
605 /*
606 * We could not find the channel. Can happen if cpu hotplug
607 * happens while tearing down.
608 */
d88aee68 609 ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
e462382a 610 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
ffe60014
DG
611 }
612
9ce5646a
MD
613 health_code_update();
614
f50f23d9 615 /* First send a status message before receiving the fds. */
0c5b3718
SM
616 ret_send_status = consumer_send_status_msg(sock, ret_code);
617 if (ret_send_status < 0) {
d771f832 618 /* Somehow, the session daemon is not responding anymore. */
c5c7998f 619 goto error_add_stream_fatal;
1803a064 620 }
9ce5646a
MD
621
622 health_code_update();
623
0c759fc9 624 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
d771f832 625 /* Channel was not found. */
c5c7998f 626 goto error_add_stream_nosignal;
f50f23d9
DG
627 }
628
d771f832 629 /* Blocking call */
9ce5646a 630 health_poll_entry();
0c5b3718 631 ret_poll = lttng_consumer_poll_socket(consumer_sockpoll);
9ce5646a 632 health_poll_exit();
0c5b3718 633 if (ret_poll) {
c5c7998f 634 goto error_add_stream_fatal;
3bd1e081 635 }
00e2e675 636
9ce5646a
MD
637 health_code_update();
638
00e2e675 639 /* Get stream file descriptor from socket */
0c5b3718
SM
640 ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
641 if (ret_recv != sizeof(fd)) {
f73fabfd 642 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
0c5b3718 643 ret_func = ret_recv;
c5c7998f 644 goto end;
3bd1e081 645 }
3bd1e081 646
9ce5646a
MD
647 health_code_update();
648
f50f23d9
DG
649 /*
650 * Send status code to session daemon only if the recv works. If the
651 * above recv() failed, the session daemon is notified through the
652 * error socket and the teardown is eventually done.
653 */
0c5b3718
SM
654 ret_send_status = consumer_send_status_msg(sock, ret_code);
655 if (ret_send_status < 0) {
f50f23d9 656 /* Somehow, the session daemon is not responding anymore. */
c5c7998f 657 goto error_add_stream_nosignal;
f50f23d9
DG
658 }
659
9ce5646a
MD
660 health_code_update();
661
d2956687 662 pthread_mutex_lock(&channel->lock);
6f9449c2 663 new_stream = consumer_stream_create(
49f45573
JG
664 channel,
665 channel->key,
ffe60014 666 fd,
ffe60014 667 channel->name,
ffe60014
DG
668 channel->relayd_id,
669 channel->session_id,
d2956687 670 channel->trace_chunk,
ffe60014
DG
671 msg.u.stream.cpu,
672 &alloc_ret,
4891ece8 673 channel->type,
d2956687 674 channel->monitor);
3bd1e081 675 if (new_stream == NULL) {
c80048c6
MD
676 switch (alloc_ret) {
677 case -ENOMEM:
678 case -EINVAL:
679 default:
680 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
681 break;
c80048c6 682 }
d2956687 683 pthread_mutex_unlock(&channel->lock);
c5c7998f 684 goto error_add_stream_nosignal;
3bd1e081 685 }
d771f832 686
ffe60014 687 new_stream->wait_fd = fd;
0c5b3718
SM
688 ret_get_max_subbuf_size = kernctl_get_max_subbuf_size(
689 new_stream->wait_fd, &new_stream->max_sb_size);
690 if (ret_get_max_subbuf_size < 0) {
d05185fa
JG
691 pthread_mutex_unlock(&channel->lock);
692 ERR("Failed to get kernel maximal subbuffer size");
c5c7998f 693 goto error_add_stream_nosignal;
d05185fa
JG
694 }
695
d9a2e16e
JD
696 consumer_stream_update_channel_attributes(new_stream,
697 channel);
00e2e675 698
a0c83db9
DG
699 /*
700 * We've just assigned the channel to the stream so increment the
07b86b52
JD
701 * refcount right now. We don't need to increment the refcount for
702 * streams in no monitor because we handle manually the cleanup of
703 * those. It is very important to make sure there is NO prior
704 * consumer_del_stream() calls or else the refcount will be unbalanced.
a0c83db9 705 */
07b86b52
JD
706 if (channel->monitor) {
707 uatomic_inc(&new_stream->chan->refcount);
708 }
9d9353f9 709
fb3a43a9
DG
710 /*
711 * The buffer flush is done on the session daemon side for the kernel
712 * so no need for the stream "hangup_flush_done" variable to be
713 * tracked. This is important for a kernel stream since we don't rely
714 * on the flush state of the stream to read data. It's not the case for
715 * user space tracing.
716 */
717 new_stream->hangup_flush_done = 0;
718
9ce5646a
MD
719 health_code_update();
720
d2956687 721 pthread_mutex_lock(&new_stream->lock);
633d0084 722 if (ctx->on_recv_stream) {
0c5b3718
SM
723 int ret_recv_stream = ctx->on_recv_stream(new_stream);
724 if (ret_recv_stream < 0) {
d2956687
JG
725 pthread_mutex_unlock(&new_stream->lock);
726 pthread_mutex_unlock(&channel->lock);
d771f832 727 consumer_stream_free(new_stream);
c5c7998f 728 goto error_add_stream_nosignal;
fb3a43a9 729 }
633d0084 730 }
9ce5646a
MD
731 health_code_update();
732
07b86b52
JD
733 if (new_stream->metadata_flag) {
734 channel->metadata_stream = new_stream;
735 }
736
2bba9e53
DG
737 /* Do not monitor this stream. */
738 if (!channel->monitor) {
5eecee74 739 DBG("Kernel consumer add stream %s in no monitor mode with "
6dc3064a 740 "relayd id %" PRIu64, new_stream->name,
5eecee74 741 new_stream->net_seq_idx);
10a50311 742 cds_list_add(&new_stream->send_node, &channel->streams.head);
d2956687
JG
743 pthread_mutex_unlock(&new_stream->lock);
744 pthread_mutex_unlock(&channel->lock);
c5c7998f 745 goto end_add_stream;
6dc3064a
DG
746 }
747
e1b71bdc
DG
748 /* Send stream to relayd if the stream has an ID. */
749 if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
0c5b3718
SM
750 int ret_send_relayd_stream;
751
752 ret_send_relayd_stream = consumer_send_relayd_stream(
753 new_stream, new_stream->chan->pathname);
754 if (ret_send_relayd_stream < 0) {
d2956687
JG
755 pthread_mutex_unlock(&new_stream->lock);
756 pthread_mutex_unlock(&channel->lock);
e1b71bdc 757 consumer_stream_free(new_stream);
c5c7998f 758 goto error_add_stream_nosignal;
e1b71bdc 759 }
001b7e62
MD
760
761 /*
762 * If adding an extra stream to an already
763 * existing channel (e.g. cpu hotplug), we need
764 * to send the "streams_sent" command to relayd.
765 */
766 if (channel->streams_sent_to_relayd) {
0c5b3718
SM
767 int ret_send_relayd_streams_sent;
768
769 ret_send_relayd_streams_sent =
770 consumer_send_relayd_streams_sent(
771 new_stream->net_seq_idx);
772 if (ret_send_relayd_streams_sent < 0) {
d2956687
JG
773 pthread_mutex_unlock(&new_stream->lock);
774 pthread_mutex_unlock(&channel->lock);
c5c7998f 775 goto error_add_stream_nosignal;
001b7e62
MD
776 }
777 }
e2039c7a 778 }
d2956687
JG
779 pthread_mutex_unlock(&new_stream->lock);
780 pthread_mutex_unlock(&channel->lock);
e2039c7a 781
50f8ae69 782 /* Get the right pipe where the stream will be sent. */
633d0084 783 if (new_stream->metadata_flag) {
66d583dc 784 consumer_add_metadata_stream(new_stream);
dae10966 785 stream_pipe = ctx->consumer_metadata_pipe;
3bd1e081 786 } else {
66d583dc 787 consumer_add_data_stream(new_stream);
dae10966 788 stream_pipe = ctx->consumer_data_pipe;
50f8ae69
DG
789 }
790
66d583dc 791 /* Visible to other threads */
5ab66908
MD
792 new_stream->globally_visible = 1;
793
9ce5646a
MD
794 health_code_update();
795
0c5b3718
SM
796 ret_pipe_write = lttng_pipe_write(
797 stream_pipe, &new_stream, sizeof(new_stream));
798 if (ret_pipe_write < 0) {
dae10966 799 ERR("Consumer write %s stream to pipe %d",
50f8ae69 800 new_stream->metadata_flag ? "metadata" : "data",
dae10966 801 lttng_pipe_get_writefd(stream_pipe));
5ab66908
MD
802 if (new_stream->metadata_flag) {
803 consumer_del_stream_for_metadata(new_stream);
804 } else {
805 consumer_del_stream_for_data(new_stream);
806 }
c5c7998f 807 goto error_add_stream_nosignal;
3bd1e081 808 }
00e2e675 809
02d02e31
JD
810 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
811 new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
c5c7998f 812end_add_stream:
3bd1e081 813 break;
c5c7998f
JG
814error_add_stream_nosignal:
815 goto end_nosignal;
816error_add_stream_fatal:
817 goto error_fatal;
3bd1e081 818 }
a4baae1b
JD
819 case LTTNG_CONSUMER_STREAMS_SENT:
820 {
821 struct lttng_consumer_channel *channel;
0c5b3718 822 int ret_send_status;
a4baae1b
JD
823
824 /*
825 * Get stream's channel reference. Needed when adding the stream to the
826 * global hash table.
827 */
828 channel = consumer_find_channel(msg.u.sent_streams.channel_key);
829 if (!channel) {
830 /*
831 * We could not find the channel. Can happen if cpu hotplug
832 * happens while tearing down.
833 */
834 ERR("Unable to find channel key %" PRIu64,
835 msg.u.sent_streams.channel_key);
e462382a 836 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
a4baae1b
JD
837 }
838
839 health_code_update();
840
841 /*
842 * Send status code to session daemon.
843 */
0c5b3718
SM
844 ret_send_status = consumer_send_status_msg(sock, ret_code);
845 if (ret_send_status < 0 ||
846 ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
a4baae1b 847 /* Somehow, the session daemon is not responding anymore. */
80d5a658 848 goto error_streams_sent_nosignal;
a4baae1b
JD
849 }
850
851 health_code_update();
852
853 /*
854 * We should not send this message if we don't monitor the
855 * streams in this channel.
856 */
857 if (!channel->monitor) {
80d5a658 858 goto end_error_streams_sent;
a4baae1b
JD
859 }
860
861 health_code_update();
862 /* Send stream to relayd if the stream has an ID. */
863 if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
0c5b3718
SM
864 int ret_send_relay_streams;
865
866 ret_send_relay_streams = consumer_send_relayd_streams_sent(
a4baae1b 867 msg.u.sent_streams.net_seq_idx);
0c5b3718 868 if (ret_send_relay_streams < 0) {
80d5a658 869 goto error_streams_sent_nosignal;
a4baae1b 870 }
001b7e62 871 channel->streams_sent_to_relayd = true;
a4baae1b 872 }
80d5a658 873end_error_streams_sent:
a4baae1b 874 break;
80d5a658
JG
875error_streams_sent_nosignal:
876 goto end_nosignal;
a4baae1b 877 }
3bd1e081
MD
878 case LTTNG_CONSUMER_UPDATE_STREAM:
879 {
3f8e211f
DG
880 rcu_read_unlock();
881 return -ENOSYS;
882 }
883 case LTTNG_CONSUMER_DESTROY_RELAYD:
884 {
a6ba4fe1 885 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
3f8e211f 886 struct consumer_relayd_sock_pair *relayd;
0c5b3718 887 int ret_send_status;
3f8e211f 888
a6ba4fe1 889 DBG("Kernel consumer destroying relayd %" PRIu64, index);
3f8e211f
DG
890
891 /* Get relayd reference if exists. */
a6ba4fe1 892 relayd = consumer_find_relayd(index);
3f8e211f 893 if (relayd == NULL) {
3448e266 894 DBG("Unable to find relayd %" PRIu64, index);
e462382a 895 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
3bd1e081 896 }
3f8e211f 897
a6ba4fe1
DG
898 /*
899 * Each relayd socket pair has a refcount of stream attached to it
900 * which tells if the relayd is still active or not depending on the
901 * refcount value.
902 *
903 * This will set the destroy flag of the relayd object and destroy it
904 * if the refcount reaches zero when called.
905 *
906 * The destroy can happen either here or when a stream fd hangs up.
907 */
f50f23d9
DG
908 if (relayd) {
909 consumer_flag_relayd_for_destroy(relayd);
910 }
911
9ce5646a
MD
912 health_code_update();
913
0c5b3718
SM
914 ret_send_status = consumer_send_status_msg(sock, ret_code);
915 if (ret_send_status < 0) {
f50f23d9 916 /* Somehow, the session daemon is not responding anymore. */
1803a064 917 goto error_fatal;
f50f23d9 918 }
3f8e211f 919
3f8e211f 920 goto end_nosignal;
3bd1e081 921 }
6d805429 922 case LTTNG_CONSUMER_DATA_PENDING:
53632229 923 {
0c5b3718 924 int32_t ret_data_pending;
6d805429 925 uint64_t id = msg.u.data_pending.session_id;
0c5b3718 926 ssize_t ret_send;
c8f59ee5 927
6d805429 928 DBG("Kernel consumer data pending command for id %" PRIu64, id);
c8f59ee5 929
0c5b3718 930 ret_data_pending = consumer_data_pending(id);
c8f59ee5 931
9ce5646a
MD
932 health_code_update();
933
c8f59ee5 934 /* Send back returned value to session daemon */
0c5b3718
SM
935 ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending,
936 sizeof(ret_data_pending));
937 if (ret_send < 0) {
6d805429 938 PERROR("send data pending ret code");
1803a064 939 goto error_fatal;
c8f59ee5 940 }
f50f23d9
DG
941
942 /*
943 * No need to send back a status message since the data pending
944 * returned value is the response.
945 */
c8f59ee5 946 break;
53632229 947 }
6dc3064a
DG
948 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
949 {
3eb928aa
MD
950 struct lttng_consumer_channel *channel;
951 uint64_t key = msg.u.snapshot_channel.key;
0c5b3718 952 int ret_send_status;
3eb928aa
MD
953
954 channel = consumer_find_channel(key);
955 if (!channel) {
956 ERR("Channel %" PRIu64 " not found", key);
957 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
07b86b52 958 } else {
3eb928aa 959 if (msg.u.snapshot_channel.metadata == 1) {
0c5b3718
SM
960 int ret_snapshot;
961
962 ret_snapshot = lttng_kconsumer_snapshot_metadata(
963 channel, key,
3eb928aa 964 msg.u.snapshot_channel.pathname,
0c5b3718
SM
965 msg.u.snapshot_channel.relayd_id,
966 ctx);
967 if (ret_snapshot < 0) {
3eb928aa
MD
968 ERR("Snapshot metadata failed");
969 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
970 }
971 } else {
0c5b3718
SM
972 int ret_snapshot;
973
974 ret_snapshot = lttng_kconsumer_snapshot_channel(
975 channel, key,
3eb928aa
MD
976 msg.u.snapshot_channel.pathname,
977 msg.u.snapshot_channel.relayd_id,
0c5b3718 978 msg.u.snapshot_channel
f46376a1 979 .nb_packets_per_stream);
0c5b3718 980 if (ret_snapshot < 0) {
3eb928aa
MD
981 ERR("Snapshot channel failed");
982 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
983 }
07b86b52
JD
984 }
985 }
9ce5646a
MD
986 health_code_update();
987
0c5b3718
SM
988 ret_send_status = consumer_send_status_msg(sock, ret_code);
989 if (ret_send_status < 0) {
6dc3064a
DG
990 /* Somehow, the session daemon is not responding anymore. */
991 goto end_nosignal;
992 }
993 break;
994 }
07b86b52
JD
995 case LTTNG_CONSUMER_DESTROY_CHANNEL:
996 {
997 uint64_t key = msg.u.destroy_channel.key;
998 struct lttng_consumer_channel *channel;
0c5b3718 999 int ret_send_status;
07b86b52
JD
1000
1001 channel = consumer_find_channel(key);
1002 if (!channel) {
1003 ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
e462382a 1004 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
07b86b52
JD
1005 }
1006
9ce5646a
MD
1007 health_code_update();
1008
0c5b3718
SM
1009 ret_send_status = consumer_send_status_msg(sock, ret_code);
1010 if (ret_send_status < 0) {
07b86b52 1011 /* Somehow, the session daemon is not responding anymore. */
a9d36096 1012 goto end_destroy_channel;
07b86b52
JD
1013 }
1014
9ce5646a
MD
1015 health_code_update();
1016
15dc512a
DG
1017 /* Stop right now if no channel was found. */
1018 if (!channel) {
a9d36096 1019 goto end_destroy_channel;
15dc512a
DG
1020 }
1021
07b86b52
JD
1022 /*
1023 * This command should ONLY be issued for channel with streams set in
1024 * no monitor mode.
1025 */
a0377dfe 1026 LTTNG_ASSERT(!channel->monitor);
07b86b52
JD
1027
1028 /*
1029 * The refcount should ALWAYS be 0 in the case of a channel in no
1030 * monitor mode.
1031 */
a0377dfe 1032 LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1));
07b86b52
JD
1033
1034 consumer_del_channel(channel);
a9d36096 1035end_destroy_channel:
07b86b52
JD
1036 goto end_nosignal;
1037 }
fb83fe64
JD
1038 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1039 {
66ab32be
JD
1040 ssize_t ret;
1041 uint64_t count;
fb83fe64
JD
1042 struct lttng_consumer_channel *channel;
1043 uint64_t id = msg.u.discarded_events.session_id;
1044 uint64_t key = msg.u.discarded_events.channel_key;
1045
e5742757
MD
1046 DBG("Kernel consumer discarded events command for session id %"
1047 PRIu64 ", channel key %" PRIu64, id, key);
1048
fb83fe64
JD
1049 channel = consumer_find_channel(key);
1050 if (!channel) {
1051 ERR("Kernel consumer discarded events channel %"
1052 PRIu64 " not found", key);
66ab32be 1053 count = 0;
e5742757 1054 } else {
66ab32be 1055 count = channel->discarded_events;
fb83fe64
JD
1056 }
1057
fb83fe64
JD
1058 health_code_update();
1059
1060 /* Send back returned value to session daemon */
66ab32be 1061 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
fb83fe64
JD
1062 if (ret < 0) {
1063 PERROR("send discarded events");
1064 goto error_fatal;
1065 }
1066
1067 break;
1068 }
1069 case LTTNG_CONSUMER_LOST_PACKETS:
1070 {
66ab32be
JD
1071 ssize_t ret;
1072 uint64_t count;
fb83fe64
JD
1073 struct lttng_consumer_channel *channel;
1074 uint64_t id = msg.u.lost_packets.session_id;
1075 uint64_t key = msg.u.lost_packets.channel_key;
1076
e5742757
MD
1077 DBG("Kernel consumer lost packets command for session id %"
1078 PRIu64 ", channel key %" PRIu64, id, key);
1079
fb83fe64
JD
1080 channel = consumer_find_channel(key);
1081 if (!channel) {
1082 ERR("Kernel consumer lost packets channel %"
1083 PRIu64 " not found", key);
66ab32be 1084 count = 0;
e5742757 1085 } else {
66ab32be 1086 count = channel->lost_packets;
fb83fe64
JD
1087 }
1088
fb83fe64
JD
1089 health_code_update();
1090
1091 /* Send back returned value to session daemon */
66ab32be 1092 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
fb83fe64
JD
1093 if (ret < 0) {
1094 PERROR("send lost packets");
1095 goto error_fatal;
1096 }
1097
1098 break;
1099 }
b3530820
JG
1100 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1101 {
1102 int channel_monitor_pipe;
0c5b3718
SM
1103 int ret_send_status, ret_set_channel_monitor_pipe;
1104 ssize_t ret_recv;
b3530820
JG
1105
1106 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1107 /* Successfully received the command's type. */
0c5b3718
SM
1108 ret_send_status = consumer_send_status_msg(sock, ret_code);
1109 if (ret_send_status < 0) {
b3530820
JG
1110 goto error_fatal;
1111 }
1112
0c5b3718
SM
1113 ret_recv = lttcomm_recv_fds_unix_sock(
1114 sock, &channel_monitor_pipe, 1);
1115 if (ret_recv != sizeof(channel_monitor_pipe)) {
b3530820
JG
1116 ERR("Failed to receive channel monitor pipe");
1117 goto error_fatal;
1118 }
1119
1120 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
0c5b3718
SM
1121 ret_set_channel_monitor_pipe =
1122 consumer_timer_thread_set_channel_monitor_pipe(
1123 channel_monitor_pipe);
1124 if (!ret_set_channel_monitor_pipe) {
b3530820 1125 int flags;
0c5b3718 1126 int ret_fcntl;
b3530820
JG
1127
1128 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1129 /* Set the pipe as non-blocking. */
0c5b3718
SM
1130 ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
1131 if (ret_fcntl == -1) {
b3530820
JG
1132 PERROR("fcntl get flags of the channel monitoring pipe");
1133 goto error_fatal;
1134 }
0c5b3718 1135 flags = ret_fcntl;
b3530820 1136
0c5b3718 1137 ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
b3530820 1138 flags | O_NONBLOCK);
0c5b3718 1139 if (ret_fcntl == -1) {
b3530820
JG
1140 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1141 goto error_fatal;
1142 }
1143 DBG("Channel monitor pipe set as non-blocking");
1144 } else {
1145 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1146 }
0c5b3718
SM
1147 ret_send_status = consumer_send_status_msg(sock, ret_code);
1148 if (ret_send_status < 0) {
b3530820
JG
1149 goto error_fatal;
1150 }
1151 break;
1152 }
b99a8d42
JD
1153 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1154 {
92b7a7f8
MD
1155 struct lttng_consumer_channel *channel;
1156 uint64_t key = msg.u.rotate_channel.key;
0c5b3718 1157 int ret_send_status;
b99a8d42 1158
92b7a7f8 1159 DBG("Consumer rotate channel %" PRIu64, key);
b99a8d42 1160
92b7a7f8
MD
1161 channel = consumer_find_channel(key);
1162 if (!channel) {
1163 ERR("Channel %" PRIu64 " not found", key);
1164 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1165 } else {
1166 /*
1167 * Sample the rotate position of all the streams in this channel.
1168 */
0c5b3718
SM
1169 int ret_rotate_channel;
1170
1171 ret_rotate_channel = lttng_consumer_rotate_channel(
1172 channel, key,
f46376a1 1173 msg.u.rotate_channel.relayd_id);
0c5b3718 1174 if (ret_rotate_channel < 0) {
92b7a7f8
MD
1175 ERR("Rotate channel failed");
1176 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
1177 }
b99a8d42 1178
92b7a7f8
MD
1179 health_code_update();
1180 }
0c5b3718
SM
1181
1182 ret_send_status = consumer_send_status_msg(sock, ret_code);
1183 if (ret_send_status < 0) {
b99a8d42 1184 /* Somehow, the session daemon is not responding anymore. */
713bdd26 1185 goto error_rotate_channel;
b99a8d42 1186 }
92b7a7f8
MD
1187 if (channel) {
1188 /* Rotate the streams that are ready right now. */
0c5b3718
SM
1189 int ret_rotate;
1190
1191 ret_rotate = lttng_consumer_rotate_ready_streams(
f46376a1 1192 channel, key);
0c5b3718 1193 if (ret_rotate < 0) {
92b7a7f8
MD
1194 ERR("Rotate ready streams failed");
1195 }
b99a8d42 1196 }
b99a8d42 1197 break;
713bdd26
JG
1198error_rotate_channel:
1199 goto end_nosignal;
b99a8d42 1200 }
5f3aff8b
MD
1201 case LTTNG_CONSUMER_CLEAR_CHANNEL:
1202 {
1203 struct lttng_consumer_channel *channel;
1204 uint64_t key = msg.u.clear_channel.key;
0c5b3718 1205 int ret_send_status;
5f3aff8b
MD
1206
1207 channel = consumer_find_channel(key);
1208 if (!channel) {
1209 DBG("Channel %" PRIu64 " not found", key);
1210 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1211 } else {
0c5b3718
SM
1212 int ret_clear_channel;
1213
1214 ret_clear_channel =
1215 lttng_consumer_clear_channel(channel);
1216 if (ret_clear_channel) {
5f3aff8b 1217 ERR("Clear channel failed");
97535efa 1218 ret_code = (lttcomm_return_code) ret_clear_channel;
5f3aff8b
MD
1219 }
1220
1221 health_code_update();
1222 }
0c5b3718
SM
1223
1224 ret_send_status = consumer_send_status_msg(sock, ret_code);
1225 if (ret_send_status < 0) {
5f3aff8b
MD
1226 /* Somehow, the session daemon is not responding anymore. */
1227 goto end_nosignal;
1228 }
1229
1230 break;
1231 }
d2956687 1232 case LTTNG_CONSUMER_INIT:
00fb02ac 1233 {
0c5b3718 1234 int ret_send_status;
328c2fe7
JG
1235 lttng_uuid sessiond_uuid;
1236
1237 std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid),
1238 sessiond_uuid.begin());
0c5b3718 1239
d2956687 1240 ret_code = lttng_consumer_init_command(ctx,
328c2fe7 1241 sessiond_uuid);
00fb02ac 1242 health_code_update();
0c5b3718
SM
1243 ret_send_status = consumer_send_status_msg(sock, ret_code);
1244 if (ret_send_status < 0) {
00fb02ac
JD
1245 /* Somehow, the session daemon is not responding anymore. */
1246 goto end_nosignal;
1247 }
1248 break;
1249 }
d2956687 1250 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
d88744a4 1251 {
d2956687 1252 const struct lttng_credentials credentials = {
ff588497
JR
1253 .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
1254 .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
d2956687
JG
1255 };
1256 const bool is_local_trace =
1257 !msg.u.create_trace_chunk.relayd_id.is_set;
1258 const uint64_t relayd_id =
1259 msg.u.create_trace_chunk.relayd_id.value;
1260 const char *chunk_override_name =
1261 *msg.u.create_trace_chunk.override_name ?
1262 msg.u.create_trace_chunk.override_name :
1263 NULL;
cbf53d23 1264 struct lttng_directory_handle *chunk_directory_handle = NULL;
d88744a4 1265
d2956687
JG
1266 /*
1267 * The session daemon will only provide a chunk directory file
1268 * descriptor for local traces.
1269 */
1270 if (is_local_trace) {
1271 int chunk_dirfd;
0c5b3718
SM
1272 int ret_send_status;
1273 ssize_t ret_recv;
19990ed5 1274
d2956687 1275 /* Acknowledge the reception of the command. */
0c5b3718
SM
1276 ret_send_status = consumer_send_status_msg(
1277 sock, LTTCOMM_CONSUMERD_SUCCESS);
1278 if (ret_send_status < 0) {
d2956687
JG
1279 /* Somehow, the session daemon is not responding anymore. */
1280 goto end_nosignal;
1281 }
92816cc3 1282
0c5b3718
SM
1283 ret_recv = lttcomm_recv_fds_unix_sock(
1284 sock, &chunk_dirfd, 1);
1285 if (ret_recv != sizeof(chunk_dirfd)) {
d2956687
JG
1286 ERR("Failed to receive trace chunk directory file descriptor");
1287 goto error_fatal;
1288 }
92816cc3 1289
d2956687
JG
1290 DBG("Received trace chunk directory fd (%d)",
1291 chunk_dirfd);
cbf53d23 1292 chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
d2956687 1293 chunk_dirfd);
cbf53d23 1294 if (!chunk_directory_handle) {
d2956687
JG
1295 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1296 if (close(chunk_dirfd)) {
1297 PERROR("Failed to close chunk directory file descriptor");
1298 }
1299 goto error_fatal;
1300 }
92816cc3
JG
1301 }
1302
d2956687
JG
1303 ret_code = lttng_consumer_create_trace_chunk(
1304 !is_local_trace ? &relayd_id : NULL,
1305 msg.u.create_trace_chunk.session_id,
1306 msg.u.create_trace_chunk.chunk_id,
e5add6d0
JG
1307 (time_t) msg.u.create_trace_chunk
1308 .creation_timestamp,
d2956687 1309 chunk_override_name,
e5add6d0
JG
1310 msg.u.create_trace_chunk.credentials.is_set ?
1311 &credentials :
1312 NULL,
cbf53d23
JG
1313 chunk_directory_handle);
1314 lttng_directory_handle_put(chunk_directory_handle);
d2956687 1315 goto end_msg_sessiond;
d88744a4 1316 }
d2956687 1317 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
a1ae2ea5 1318 {
bbc4768c 1319 enum lttng_trace_chunk_command_type close_command =
97535efa 1320 (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
d2956687
JG
1321 const uint64_t relayd_id =
1322 msg.u.close_trace_chunk.relayd_id.value;
ecd1a12f
MD
1323 struct lttcomm_consumer_close_trace_chunk_reply reply;
1324 char path[LTTNG_PATH_MAX];
0c5b3718 1325 ssize_t ret_send;
d2956687
JG
1326
1327 ret_code = lttng_consumer_close_trace_chunk(
1328 msg.u.close_trace_chunk.relayd_id.is_set ?
bbc4768c
JG
1329 &relayd_id :
1330 NULL,
d2956687
JG
1331 msg.u.close_trace_chunk.session_id,
1332 msg.u.close_trace_chunk.chunk_id,
bbc4768c
JG
1333 (time_t) msg.u.close_trace_chunk.close_timestamp,
1334 msg.u.close_trace_chunk.close_command.is_set ?
1335 &close_command :
ecd1a12f
MD
1336 NULL, path);
1337 reply.ret_code = ret_code;
1338 reply.path_length = strlen(path) + 1;
0c5b3718
SM
1339 ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
1340 if (ret_send != sizeof(reply)) {
ecd1a12f
MD
1341 goto error_fatal;
1342 }
0c5b3718
SM
1343 ret_send = lttcomm_send_unix_sock(
1344 sock, path, reply.path_length);
1345 if (ret_send != reply.path_length) {
ecd1a12f
MD
1346 goto error_fatal;
1347 }
1348 goto end_nosignal;
3654ed19 1349 }
d2956687 1350 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
3654ed19 1351 {
d2956687
JG
1352 const uint64_t relayd_id =
1353 msg.u.trace_chunk_exists.relayd_id.value;
1354
1355 ret_code = lttng_consumer_trace_chunk_exists(
1356 msg.u.trace_chunk_exists.relayd_id.is_set ?
1357 &relayd_id : NULL,
1358 msg.u.trace_chunk_exists.session_id,
1359 msg.u.trace_chunk_exists.chunk_id);
1360 goto end_msg_sessiond;
a1ae2ea5 1361 }
04ed9e10
JG
1362 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
1363 {
1364 const uint64_t key = msg.u.open_channel_packets.key;
1365 struct lttng_consumer_channel *channel =
1366 consumer_find_channel(key);
1367
1368 if (channel) {
1369 pthread_mutex_lock(&channel->lock);
1370 ret_code = lttng_consumer_open_channel_packets(channel);
1371 pthread_mutex_unlock(&channel->lock);
1372 } else {
1373 WARN("Channel %" PRIu64 " not found", key);
1374 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1375 }
1376
1377 health_code_update();
1378 goto end_msg_sessiond;
1379 }
3bd1e081 1380 default:
3f8e211f 1381 goto end_nosignal;
3bd1e081 1382 }
3f8e211f 1383
3bd1e081 1384end_nosignal:
4cbc1a04
DG
1385 /*
1386 * Return 1 to indicate success since the 0 value can be a socket
1387 * shutdown during the recv() or send() call.
1388 */
0c5b3718 1389 ret_func = 1;
c5c7998f
JG
1390 goto end;
1391error_fatal:
1392 /* This will issue a consumer stop. */
0c5b3718 1393 ret_func = -1;
c5c7998f 1394 goto end;
d2956687
JG
1395end_msg_sessiond:
1396 /*
1397 * The returned value here is not useful since either way we'll return 1 to
1398 * the caller because the session daemon socket management is done
1399 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1400 */
0c5b3718
SM
1401 {
1402 int ret_send_status;
1403
1404 ret_send_status = consumer_send_status_msg(sock, ret_code);
1405 if (ret_send_status < 0) {
1406 goto error_fatal;
1407 }
d2956687 1408 }
0c5b3718
SM
1409
1410 ret_func = 1;
1411
c5c7998f 1412end:
d2956687 1413 health_code_update();
1803a064 1414 rcu_read_unlock();
0c5b3718 1415 return ret_func;
3bd1e081 1416}
d41f73b7 1417
94d49140
JD
1418/*
1419 * Sync metadata meaning request them to the session daemon and snapshot to the
1420 * metadata thread can consumer them.
1421 *
1422 * Metadata stream lock MUST be acquired.
94d49140 1423 */
577eea73
JG
1424enum sync_metadata_status lttng_kconsumer_sync_metadata(
1425 struct lttng_consumer_stream *metadata)
94d49140
JD
1426{
1427 int ret;
577eea73 1428 enum sync_metadata_status status;
94d49140 1429
a0377dfe 1430 LTTNG_ASSERT(metadata);
94d49140
JD
1431
1432 ret = kernctl_buffer_flush(metadata->wait_fd);
1433 if (ret < 0) {
1434 ERR("Failed to flush kernel stream");
577eea73 1435 status = SYNC_METADATA_STATUS_ERROR;
94d49140
JD
1436 goto end;
1437 }
1438
1439 ret = kernctl_snapshot(metadata->wait_fd);
1440 if (ret < 0) {
577eea73
JG
1441 if (errno == EAGAIN) {
1442 /* No new metadata, exit. */
1443 DBG("Sync metadata, no new kernel metadata");
1444 status = SYNC_METADATA_STATUS_NO_DATA;
1445 } else {
94d49140 1446 ERR("Sync metadata, taking kernel snapshot failed.");
577eea73 1447 status = SYNC_METADATA_STATUS_ERROR;
94d49140 1448 }
577eea73
JG
1449 } else {
1450 status = SYNC_METADATA_STATUS_NEW_DATA;
94d49140
JD
1451 }
1452
1453end:
577eea73 1454 return status;
94d49140 1455}
309167d2 1456
fb83fe64 1457static
6f9449c2
JG
1458int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
1459 struct stream_subbuffer *subbuf)
fb83fe64
JD
1460{
1461 int ret;
fb83fe64 1462
6f9449c2
JG
1463 ret = kernctl_get_subbuf_size(
1464 stream->wait_fd, &subbuf->info.data.subbuf_size);
1465 if (ret) {
fb83fe64
JD
1466 goto end;
1467 }
fb83fe64 1468
6f9449c2
JG
1469 ret = kernctl_get_padded_subbuf_size(
1470 stream->wait_fd, &subbuf->info.data.padded_subbuf_size);
1471 if (ret) {
fb83fe64
JD
1472 goto end;
1473 }
fb83fe64
JD
1474
1475end:
1476 return ret;
1477}
1478
93ec662e 1479static
6f9449c2
JG
1480int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
1481 struct stream_subbuffer *subbuf)
93ec662e
JD
1482{
1483 int ret;
93ec662e 1484
6f9449c2
JG
1485 ret = extract_common_subbuffer_info(stream, subbuf);
1486 if (ret) {
93ec662e
JD
1487 goto end;
1488 }
1489
6f9449c2
JG
1490 ret = kernctl_get_metadata_version(
1491 stream->wait_fd, &subbuf->info.metadata.version);
1492 if (ret) {
93ec662e
JD
1493 goto end;
1494 }
1495
93ec662e
JD
1496end:
1497 return ret;
1498}
1499
6f9449c2
JG
1500static
1501int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
1502 struct stream_subbuffer *subbuf)
d41f73b7 1503{
6f9449c2 1504 int ret;
d41f73b7 1505
6f9449c2
JG
1506 ret = extract_common_subbuffer_info(stream, subbuf);
1507 if (ret) {
1508 goto end;
1509 }
309167d2 1510
6f9449c2
JG
1511 ret = kernctl_get_packet_size(
1512 stream->wait_fd, &subbuf->info.data.packet_size);
1513 if (ret < 0) {
1514 PERROR("Failed to get sub-buffer packet size");
1515 goto end;
1516 }
02d02e31 1517
6f9449c2
JG
1518 ret = kernctl_get_content_size(
1519 stream->wait_fd, &subbuf->info.data.content_size);
1520 if (ret < 0) {
1521 PERROR("Failed to get sub-buffer content size");
1522 goto end;
d41f73b7
MD
1523 }
1524
6f9449c2
JG
1525 ret = kernctl_get_timestamp_begin(
1526 stream->wait_fd, &subbuf->info.data.timestamp_begin);
1527 if (ret < 0) {
1528 PERROR("Failed to get sub-buffer begin timestamp");
1529 goto end;
1d4dfdef
DG
1530 }
1531
6f9449c2
JG
1532 ret = kernctl_get_timestamp_end(
1533 stream->wait_fd, &subbuf->info.data.timestamp_end);
1534 if (ret < 0) {
1535 PERROR("Failed to get sub-buffer end timestamp");
1536 goto end;
1537 }
1538
1539 ret = kernctl_get_events_discarded(
1540 stream->wait_fd, &subbuf->info.data.events_discarded);
1541 if (ret) {
1542 PERROR("Failed to get sub-buffer events discarded count");
1543 goto end;
1544 }
1545
1546 ret = kernctl_get_sequence_number(stream->wait_fd,
1547 &subbuf->info.data.sequence_number.value);
1548 if (ret) {
1549 /* May not be supported by older LTTng-modules. */
1550 if (ret != -ENOTTY) {
1551 PERROR("Failed to get sub-buffer sequence number");
1552 goto end;
fb83fe64 1553 }
1c20f0e2 1554 } else {
6f9449c2 1555 subbuf->info.data.sequence_number.is_set = true;
309167d2
JD
1556 }
1557
6f9449c2
JG
1558 ret = kernctl_get_stream_id(
1559 stream->wait_fd, &subbuf->info.data.stream_id);
1560 if (ret < 0) {
1561 PERROR("Failed to get stream id");
1562 goto end;
1563 }
1d4dfdef 1564
6f9449c2
JG
1565 ret = kernctl_get_instance_id(stream->wait_fd,
1566 &subbuf->info.data.stream_instance_id.value);
1567 if (ret) {
1568 /* May not be supported by older LTTng-modules. */
1569 if (ret != -ENOTTY) {
1570 PERROR("Failed to get stream instance id");
1571 goto end;
1d4dfdef 1572 }
6f9449c2
JG
1573 } else {
1574 subbuf->info.data.stream_instance_id.is_set = true;
1575 }
1576end:
1577 return ret;
1578}
47e81c02 1579
6f9449c2 1580static
b6797c8e
JG
1581enum get_next_subbuffer_status get_subbuffer_common(
1582 struct lttng_consumer_stream *stream,
6f9449c2
JG
1583 struct stream_subbuffer *subbuffer)
1584{
1585 int ret;
b6797c8e 1586 enum get_next_subbuffer_status status;
6f9449c2
JG
1587
1588 ret = kernctl_get_next_subbuf(stream->wait_fd);
b6797c8e
JG
1589 switch (ret) {
1590 case 0:
1591 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1592 break;
1593 case -ENODATA:
1594 case -EAGAIN:
6e5e3c51
MD
1595 /*
1596 * The caller only expects -ENODATA when there is no data to
1597 * read, but the kernel tracer returns -EAGAIN when there is
1598 * currently no data for a non-finalized stream, and -ENODATA
1599 * when there is no data for a finalized stream. Those can be
1600 * combined into a -ENODATA return value.
1601 */
b6797c8e
JG
1602 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1603 goto end;
1604 default:
1605 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
6f9449c2
JG
1606 goto end;
1607 }
1608
1609 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
b6797c8e
JG
1610 stream, subbuffer);
1611 if (ret) {
1612 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1613 }
6f9449c2 1614end:
b6797c8e 1615 return status;
6f9449c2 1616}
128708c3 1617
6f9449c2 1618static
b6797c8e
JG
1619enum get_next_subbuffer_status get_next_subbuffer_splice(
1620 struct lttng_consumer_stream *stream,
6f9449c2
JG
1621 struct stream_subbuffer *subbuffer)
1622{
b6797c8e
JG
1623 const enum get_next_subbuffer_status status =
1624 get_subbuffer_common(stream, subbuffer);
1d4dfdef 1625
b6797c8e 1626 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
6f9449c2
JG
1627 goto end;
1628 }
1d4dfdef 1629
6f9449c2
JG
1630 subbuffer->buffer.fd = stream->wait_fd;
1631end:
b6797c8e 1632 return status;
6f9449c2 1633}
fd424d99 1634
6f9449c2 1635static
b6797c8e
JG
1636enum get_next_subbuffer_status get_next_subbuffer_mmap(
1637 struct lttng_consumer_stream *stream,
6f9449c2
JG
1638 struct stream_subbuffer *subbuffer)
1639{
1640 int ret;
b6797c8e 1641 enum get_next_subbuffer_status status;
6f9449c2
JG
1642 const char *addr;
1643
b6797c8e
JG
1644 status = get_subbuffer_common(stream, subbuffer);
1645 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
6f9449c2 1646 goto end;
128708c3 1647 }
6f9449c2
JG
1648
1649 ret = get_current_subbuf_addr(stream, &addr);
1650 if (ret) {
b6797c8e 1651 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
6f9449c2 1652 goto end;
d41f73b7 1653 }
6f9449c2
JG
1654
1655 subbuffer->buffer.buffer = lttng_buffer_view_init(
1656 addr, 0, subbuffer->info.data.padded_subbuf_size);
1657end:
b6797c8e 1658 return status;
6f9449c2
JG
1659}
1660
f5ba75b4 1661static
b6797c8e 1662enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
f5ba75b4
JG
1663 struct stream_subbuffer *subbuffer)
1664{
1665 int ret;
1666 const char *addr;
1667 bool coherent;
b6797c8e 1668 enum get_next_subbuffer_status status;
f5ba75b4
JG
1669
1670 ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
1671 &coherent);
1672 if (ret) {
1673 goto end;
1674 }
1675
1676 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
1677 stream, subbuffer);
1678 if (ret) {
1679 goto end;
1680 }
1681
1682 LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
1683
1684 ret = get_current_subbuf_addr(stream, &addr);
1685 if (ret) {
1686 goto end;
1687 }
1688
1689 subbuffer->buffer.buffer = lttng_buffer_view_init(
1690 addr, 0, subbuffer->info.data.padded_subbuf_size);
1691 DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
1692 subbuffer->info.metadata.padded_subbuf_size,
1693 coherent ? "true" : "false");
1694end:
6e5e3c51
MD
1695 /*
1696 * The caller only expects -ENODATA when there is no data to read, but
1697 * the kernel tracer returns -EAGAIN when there is currently no data
1698 * for a non-finalized stream, and -ENODATA when there is no data for a
1699 * finalized stream. Those can be combined into a -ENODATA return value.
1700 */
b6797c8e
JG
1701 switch (ret) {
1702 case 0:
1703 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1704 break;
1705 case -ENODATA:
1706 case -EAGAIN:
1707 /*
1708 * The caller only expects -ENODATA when there is no data to
1709 * read, but the kernel tracer returns -EAGAIN when there is
1710 * currently no data for a non-finalized stream, and -ENODATA
1711 * when there is no data for a finalized stream. Those can be
1712 * combined into a -ENODATA return value.
1713 */
1714 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1715 break;
1716 default:
1717 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1718 break;
6e5e3c51
MD
1719 }
1720
b6797c8e 1721 return status;
f5ba75b4
JG
1722}
1723
6f9449c2
JG
1724static
1725int put_next_subbuffer(struct lttng_consumer_stream *stream,
f46376a1 1726 struct stream_subbuffer *subbuffer __attribute__((unused)))
6f9449c2
JG
1727{
1728 const int ret = kernctl_put_next_subbuf(stream->wait_fd);
1729
1730 if (ret) {
1731 if (ret == -EFAULT) {
1732 PERROR("Error in unreserving sub buffer");
1733 } else if (ret == -EIO) {
d41f73b7 1734 /* Should never happen with newer LTTng versions */
6f9449c2 1735 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
d41f73b7 1736 }
d41f73b7
MD
1737 }
1738
6f9449c2
JG
1739 return ret;
1740}
1c20f0e2 1741
f5ba75b4
JG
1742static
1743bool is_get_next_check_metadata_available(int tracer_fd)
1744{
741e787b
JG
1745 const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL);
1746 const bool available = ret != -ENOTTY;
1747
1748 if (ret == 0) {
1749 /* get succeeded, make sure to put the subbuffer. */
1750 kernctl_put_subbuf(tracer_fd);
1751 }
1752
1753 return available;
f5ba75b4
JG
1754}
1755
091441eb
MD
1756static
1757int signal_metadata(struct lttng_consumer_stream *stream,
f46376a1 1758 struct lttng_consumer_local_data *ctx __attribute__((unused)))
091441eb
MD
1759{
1760 ASSERT_LOCKED(stream->metadata_rdv_lock);
1761 return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
1762}
1763
f5ba75b4
JG
1764static
1765int lttng_kconsumer_set_stream_ops(
6f9449c2
JG
1766 struct lttng_consumer_stream *stream)
1767{
f5ba75b4
JG
1768 int ret = 0;
1769
1770 if (stream->metadata_flag && stream->chan->is_live) {
1771 DBG("Attempting to enable metadata bucketization for live consumers");
1772 if (is_get_next_check_metadata_available(stream->wait_fd)) {
1773 DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
1774 stream->read_subbuffer_ops.get_next_subbuffer =
1775 get_next_subbuffer_metadata_check;
1776 ret = consumer_stream_enable_metadata_bucketization(
1777 stream);
1778 if (ret) {
1779 goto end;
1780 }
1781 } else {
1782 /*
1783 * The kernel tracer version is too old to indicate
1784 * when the metadata stream has reached a "coherent"
1785 * (parseable) point.
1786 *
1787 * This means that a live viewer may see an incoherent
1788 * sequence of metadata and fail to parse it.
1789 */
1790 WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
1791 metadata_bucket_destroy(stream->metadata_bucket);
1792 stream->metadata_bucket = NULL;
1793 }
091441eb
MD
1794
1795 stream->read_subbuffer_ops.on_sleep = signal_metadata;
f5ba75b4
JG
1796 }
1797
1798 if (!stream->read_subbuffer_ops.get_next_subbuffer) {
1799 if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
1800 stream->read_subbuffer_ops.get_next_subbuffer =
1801 get_next_subbuffer_mmap;
1802 } else {
1803 stream->read_subbuffer_ops.get_next_subbuffer =
1804 get_next_subbuffer_splice;
1805 }
94d49140
JD
1806 }
1807
6f9449c2
JG
1808 if (stream->metadata_flag) {
1809 stream->read_subbuffer_ops.extract_subbuffer_info =
1810 extract_metadata_subbuffer_info;
1811 } else {
1812 stream->read_subbuffer_ops.extract_subbuffer_info =
1813 extract_data_subbuffer_info;
1814 if (stream->chan->is_live) {
1815 stream->read_subbuffer_ops.send_live_beacon =
1816 consumer_flush_kernel_index;
1817 }
309167d2
JD
1818 }
1819
6f9449c2 1820 stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
f5ba75b4
JG
1821end:
1822 return ret;
d41f73b7
MD
1823}
1824
1825int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1826{
1827 int ret;
ffe60014 1828
a0377dfe 1829 LTTNG_ASSERT(stream);
ffe60014 1830
2bba9e53 1831 /*
d2956687
JG
1832 * Don't create anything if this is set for streaming or if there is
1833 * no current trace chunk on the parent channel.
2bba9e53 1834 */
d2956687
JG
1835 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
1836 stream->chan->trace_chunk) {
1837 ret = consumer_stream_create_output_files(stream, true);
1838 if (ret) {
fe4477ee
JD
1839 goto error;
1840 }
ffe60014 1841 }
d41f73b7 1842
d41f73b7
MD
1843 if (stream->output == LTTNG_EVENT_MMAP) {
1844 /* get the len of the mmap region */
1845 unsigned long mmap_len;
1846
1847 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
1848 if (ret != 0) {
ffe60014 1849 PERROR("kernctl_get_mmap_len");
d41f73b7
MD
1850 goto error_close_fd;
1851 }
1852 stream->mmap_len = (size_t) mmap_len;
1853
ffe60014
DG
1854 stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
1855 MAP_PRIVATE, stream->wait_fd, 0);
d41f73b7 1856 if (stream->mmap_base == MAP_FAILED) {
ffe60014 1857 PERROR("Error mmaping");
d41f73b7
MD
1858 ret = -1;
1859 goto error_close_fd;
1860 }
1861 }
1862
f5ba75b4
JG
1863 ret = lttng_kconsumer_set_stream_ops(stream);
1864 if (ret) {
1865 goto error_close_fd;
1866 }
6f9449c2 1867
d41f73b7
MD
1868 /* we return 0 to let the library handle the FD internally */
1869 return 0;
1870
1871error_close_fd:
2f225ce2 1872 if (stream->out_fd >= 0) {
d41f73b7
MD
1873 int err;
1874
1875 err = close(stream->out_fd);
a0377dfe 1876 LTTNG_ASSERT(!err);
2f225ce2 1877 stream->out_fd = -1;
d41f73b7
MD
1878 }
1879error:
1880 return ret;
1881}
1882
ca22feea
DG
1883/*
1884 * Check if data is still being extracted from the buffers for a specific
4e9a4686
DG
1885 * stream. Consumer data lock MUST be acquired before calling this function
1886 * and the stream lock.
ca22feea 1887 *
6d805429 1888 * Return 1 if the traced data are still getting read else 0 meaning that the
ca22feea
DG
1889 * data is available for trace viewer reading.
1890 */
6d805429 1891int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
ca22feea
DG
1892{
1893 int ret;
1894
a0377dfe 1895 LTTNG_ASSERT(stream);
ca22feea 1896
873b9e9a
MD
1897 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
1898 ret = 0;
1899 goto end;
1900 }
1901
ca22feea
DG
1902 ret = kernctl_get_next_subbuf(stream->wait_fd);
1903 if (ret == 0) {
1904 /* There is still data so let's put back this subbuffer. */
1905 ret = kernctl_put_subbuf(stream->wait_fd);
a0377dfe 1906 LTTNG_ASSERT(ret == 0);
6d805429 1907 ret = 1; /* Data is pending */
4e9a4686 1908 goto end;
ca22feea
DG
1909 }
1910
6d805429
DG
1911 /* Data is NOT pending and ready to be read. */
1912 ret = 0;
ca22feea 1913
6efae65e
DG
1914end:
1915 return ret;
ca22feea 1916}
This page took 0.196685 seconds and 4 git commands to generate.