Rename C++ header files to .hpp
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.cpp
CommitLineData
3bd1e081 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3bd1e081 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
3bd1e081 7 *
3bd1e081
MD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
3bd1e081
MD
11#include <poll.h>
12#include <pthread.h>
13#include <stdlib.h>
14#include <string.h>
15#include <sys/mman.h>
16#include <sys/socket.h>
17#include <sys/types.h>
77c7c900 18#include <inttypes.h>
3bd1e081 19#include <unistd.h>
dbb5dfe6 20#include <sys/stat.h>
f5ba75b4 21#include <stdint.h>
3bd1e081 22
c9e313bc
SM
23#include <bin/lttng-consumerd/health-consumerd.hpp>
24#include <common/common.hpp>
25#include <common/kernel-ctl/kernel-ctl.hpp>
26#include <common/sessiond-comm/sessiond-comm.hpp>
27#include <common/sessiond-comm/relayd.hpp>
28#include <common/compat/fcntl.hpp>
29#include <common/compat/endian.hpp>
30#include <common/pipe.hpp>
31#include <common/relayd/relayd.hpp>
32#include <common/utils.hpp>
33#include <common/consumer/consumer-stream.hpp>
34#include <common/index/index.hpp>
35#include <common/consumer/consumer-timer.hpp>
36#include <common/optional.hpp>
37#include <common/buffer-view.hpp>
38#include <common/consumer/consumer.hpp>
39#include <common/consumer/metadata-bucket.hpp>
0857097f 40
c9e313bc 41#include "kernel-consumer.hpp"
3bd1e081 42
fa29bfbf 43extern struct lttng_consumer_global_data the_consumer_data;
3bd1e081 44extern int consumer_poll_timeout;
3bd1e081 45
3bd1e081
MD
46/*
47 * Take a snapshot for a specific fd
48 *
49 * Returns 0 on success, < 0 on error
50 */
ffe60014 51int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081
MD
52{
53 int ret = 0;
54 int infd = stream->wait_fd;
55
56 ret = kernctl_snapshot(infd);
d2d2f190
JD
57 /*
58 * -EAGAIN is not an error, it just means that there is no data to
59 * be read.
60 */
61 if (ret != 0 && ret != -EAGAIN) {
5a510c9f 62 PERROR("Getting sub-buffer snapshot.");
3bd1e081
MD
63 }
64
65 return ret;
66}
67
e9404c27
JG
68/*
69 * Sample consumed and produced positions for a specific fd.
70 *
71 * Returns 0 on success, < 0 on error.
72 */
73int lttng_kconsumer_sample_snapshot_positions(
74 struct lttng_consumer_stream *stream)
75{
a0377dfe 76 LTTNG_ASSERT(stream);
e9404c27
JG
77
78 return kernctl_snapshot_sample_positions(stream->wait_fd);
79}
80
3bd1e081
MD
81/*
82 * Get the produced position
83 *
84 * Returns 0 on success, < 0 on error
85 */
ffe60014 86int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
3bd1e081
MD
87 unsigned long *pos)
88{
89 int ret;
90 int infd = stream->wait_fd;
91
92 ret = kernctl_snapshot_get_produced(infd, pos);
93 if (ret != 0) {
5a510c9f 94 PERROR("kernctl_snapshot_get_produced");
3bd1e081
MD
95 }
96
97 return ret;
98}
99
07b86b52
JD
100/*
101 * Get the consumerd position
102 *
103 * Returns 0 on success, < 0 on error
104 */
105int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
106 unsigned long *pos)
107{
108 int ret;
109 int infd = stream->wait_fd;
110
111 ret = kernctl_snapshot_get_consumed(infd, pos);
112 if (ret != 0) {
5a510c9f 113 PERROR("kernctl_snapshot_get_consumed");
07b86b52
JD
114 }
115
116 return ret;
117}
118
128708c3
JG
119static
120int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
121 const char **addr)
122{
123 int ret;
124 unsigned long mmap_offset;
97535efa 125 const char *mmap_base = (const char *) stream->mmap_base;
128708c3
JG
126
127 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
128 if (ret < 0) {
129 PERROR("Failed to get mmap read offset");
130 goto error;
131 }
132
133 *addr = mmap_base + mmap_offset;
134error:
135 return ret;
136}
137
07b86b52
JD
138/*
139 * Take a snapshot of all the stream of a channel
3eb928aa 140 * RCU read-side lock must be held across this function to ensure existence of
947bd097 141 * channel.
07b86b52
JD
142 *
143 * Returns 0 on success, < 0 on error
144 */
f72bb42f
JG
145static int lttng_kconsumer_snapshot_channel(
146 struct lttng_consumer_channel *channel,
147 uint64_t key, char *path, uint64_t relayd_id,
f46376a1 148 uint64_t nb_packets_per_stream)
07b86b52
JD
149{
150 int ret;
07b86b52
JD
151 struct lttng_consumer_stream *stream;
152
6a00837f 153 DBG("Kernel consumer snapshot channel %" PRIu64, key);
07b86b52 154
947bd097
JR
155 /* Prevent channel modifications while we perform the snapshot.*/
156 pthread_mutex_lock(&channel->lock);
157
07b86b52
JD
158 rcu_read_lock();
159
07b86b52
JD
160 /* Splice is not supported yet for channel snapshot. */
161 if (channel->output != CONSUMER_CHANNEL_MMAP) {
9381314c
JG
162 ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
163 channel->name);
07b86b52
JD
164 ret = -1;
165 goto end;
166 }
167
10a50311 168 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
923333cd 169 unsigned long consumed_pos, produced_pos;
9ce5646a
MD
170
171 health_code_update();
172
07b86b52
JD
173 /*
174 * Lock stream because we are about to change its state.
175 */
176 pthread_mutex_lock(&stream->lock);
177
a0377dfe 178 LTTNG_ASSERT(channel->trace_chunk);
d2956687
JG
179 if (!lttng_trace_chunk_get(channel->trace_chunk)) {
180 /*
181 * Can't happen barring an internal error as the channel
182 * holds a reference to the trace chunk.
183 */
184 ERR("Failed to acquire reference to channel's trace chunk");
185 ret = -1;
186 goto end_unlock;
187 }
a0377dfe 188 LTTNG_ASSERT(!stream->trace_chunk);
d2956687
JG
189 stream->trace_chunk = channel->trace_chunk;
190
29decac3
DG
191 /*
192 * Assign the received relayd ID so we can use it for streaming. The streams
193 * are not visible to anyone so this is OK to change it.
194 */
07b86b52
JD
195 stream->net_seq_idx = relayd_id;
196 channel->relayd_id = relayd_id;
197 if (relayd_id != (uint64_t) -1ULL) {
10a50311 198 ret = consumer_send_relayd_stream(stream, path);
07b86b52
JD
199 if (ret < 0) {
200 ERR("sending stream to relayd");
201 goto end_unlock;
202 }
07b86b52 203 } else {
d2956687
JG
204 ret = consumer_stream_create_output_files(stream,
205 false);
07b86b52 206 if (ret < 0) {
07b86b52
JD
207 goto end_unlock;
208 }
d2956687
JG
209 DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
210 stream->key);
07b86b52
JD
211 }
212
f22dd891 213 ret = kernctl_buffer_flush_empty(stream->wait_fd);
07b86b52 214 if (ret < 0) {
f22dd891
MD
215 /*
216 * Doing a buffer flush which does not take into
217 * account empty packets. This is not perfect
218 * for stream intersection, but required as a
219 * fall-back when "flush_empty" is not
220 * implemented by lttng-modules.
221 */
222 ret = kernctl_buffer_flush(stream->wait_fd);
223 if (ret < 0) {
224 ERR("Failed to flush kernel stream");
225 goto end_unlock;
226 }
07b86b52
JD
227 goto end_unlock;
228 }
229
230 ret = lttng_kconsumer_take_snapshot(stream);
231 if (ret < 0) {
232 ERR("Taking kernel snapshot");
233 goto end_unlock;
234 }
235
236 ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
237 if (ret < 0) {
238 ERR("Produced kernel snapshot position");
239 goto end_unlock;
240 }
241
242 ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
243 if (ret < 0) {
244 ERR("Consumerd kernel snapshot position");
245 goto end_unlock;
246 }
247
d07ceecd
MD
248 consumed_pos = consumer_get_consume_start_pos(consumed_pos,
249 produced_pos, nb_packets_per_stream,
250 stream->max_sb_size);
5c786ded 251
9377d830 252 while ((long) (consumed_pos - produced_pos) < 0) {
07b86b52
JD
253 ssize_t read_len;
254 unsigned long len, padded_len;
128708c3 255 const char *subbuf_addr;
fd424d99 256 struct lttng_buffer_view subbuf_view;
07b86b52 257
9ce5646a 258 health_code_update();
07b86b52
JD
259 DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
260
261 ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
262 if (ret < 0) {
32af2c95 263 if (ret != -EAGAIN) {
07b86b52
JD
264 PERROR("kernctl_get_subbuf snapshot");
265 goto end_unlock;
266 }
267 DBG("Kernel consumer get subbuf failed. Skipping it.");
268 consumed_pos += stream->max_sb_size;
ddc93ee4 269 stream->chan->lost_packets++;
07b86b52
JD
270 continue;
271 }
272
273 ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
274 if (ret < 0) {
275 ERR("Snapshot kernctl_get_subbuf_size");
29decac3 276 goto error_put_subbuf;
07b86b52
JD
277 }
278
279 ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
280 if (ret < 0) {
281 ERR("Snapshot kernctl_get_padded_subbuf_size");
29decac3 282 goto error_put_subbuf;
07b86b52
JD
283 }
284
128708c3
JG
285 ret = get_current_subbuf_addr(stream, &subbuf_addr);
286 if (ret) {
287 goto error_put_subbuf;
288 }
289
fd424d99
JG
290 subbuf_view = lttng_buffer_view_init(
291 subbuf_addr, 0, padded_len);
f5ba75b4 292 read_len = lttng_consumer_on_read_subbuffer_mmap(
fd424d99 293 stream, &subbuf_view,
6f9449c2 294 padded_len - len);
07b86b52 295 /*
29decac3
DG
296 * We write the padded len in local tracefiles but the data len
297 * when using a relay. Display the error but continue processing
298 * to try to release the subbuffer.
07b86b52
JD
299 */
300 if (relayd_id != (uint64_t) -1ULL) {
301 if (read_len != len) {
302 ERR("Error sending to the relay (ret: %zd != len: %lu)",
303 read_len, len);
304 }
305 } else {
306 if (read_len != padded_len) {
307 ERR("Error writing to tracefile (ret: %zd != len: %lu)",
308 read_len, padded_len);
309 }
310 }
311
312 ret = kernctl_put_subbuf(stream->wait_fd);
313 if (ret < 0) {
314 ERR("Snapshot kernctl_put_subbuf");
315 goto end_unlock;
316 }
317 consumed_pos += stream->max_sb_size;
318 }
319
320 if (relayd_id == (uint64_t) -1ULL) {
fdf9986c
MD
321 if (stream->out_fd >= 0) {
322 ret = close(stream->out_fd);
323 if (ret < 0) {
324 PERROR("Kernel consumer snapshot close out_fd");
325 goto end_unlock;
326 }
327 stream->out_fd = -1;
07b86b52 328 }
07b86b52
JD
329 } else {
330 close_relayd_stream(stream);
331 stream->net_seq_idx = (uint64_t) -1ULL;
332 }
d2956687
JG
333 lttng_trace_chunk_put(stream->trace_chunk);
334 stream->trace_chunk = NULL;
07b86b52
JD
335 pthread_mutex_unlock(&stream->lock);
336 }
337
338 /* All good! */
339 ret = 0;
340 goto end;
341
29decac3
DG
342error_put_subbuf:
343 ret = kernctl_put_subbuf(stream->wait_fd);
344 if (ret < 0) {
345 ERR("Snapshot kernctl_put_subbuf error path");
346 }
07b86b52
JD
347end_unlock:
348 pthread_mutex_unlock(&stream->lock);
349end:
350 rcu_read_unlock();
947bd097 351 pthread_mutex_unlock(&channel->lock);
07b86b52
JD
352 return ret;
353}
354
355/*
356 * Read the whole metadata available for a snapshot.
3eb928aa 357 * RCU read-side lock must be held across this function to ensure existence of
947bd097 358 * metadata_channel.
07b86b52
JD
359 *
360 * Returns 0 on success, < 0 on error
361 */
d2956687
JG
362static int lttng_kconsumer_snapshot_metadata(
363 struct lttng_consumer_channel *metadata_channel,
3eb928aa
MD
364 uint64_t key, char *path, uint64_t relayd_id,
365 struct lttng_consumer_local_data *ctx)
07b86b52 366{
d771f832
DG
367 int ret, use_relayd = 0;
368 ssize_t ret_read;
07b86b52 369 struct lttng_consumer_stream *metadata_stream;
d771f832 370
a0377dfe 371 LTTNG_ASSERT(ctx);
07b86b52
JD
372
373 DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
374 key, path);
375
376 rcu_read_lock();
377
07b86b52 378 metadata_stream = metadata_channel->metadata_stream;
a0377dfe 379 LTTNG_ASSERT(metadata_stream);
d2956687 380
947bd097 381 metadata_stream->read_subbuffer_ops.lock(metadata_stream);
a0377dfe
FD
382 LTTNG_ASSERT(metadata_channel->trace_chunk);
383 LTTNG_ASSERT(metadata_stream->trace_chunk);
07b86b52 384
d771f832 385 /* Flag once that we have a valid relayd for the stream. */
e2039c7a 386 if (relayd_id != (uint64_t) -1ULL) {
d771f832
DG
387 use_relayd = 1;
388 }
389
390 if (use_relayd) {
10a50311 391 ret = consumer_send_relayd_stream(metadata_stream, path);
e2039c7a 392 if (ret < 0) {
fa27abe8 393 goto error_snapshot;
e2039c7a 394 }
e2039c7a 395 } else {
d2956687
JG
396 ret = consumer_stream_create_output_files(metadata_stream,
397 false);
e2039c7a 398 if (ret < 0) {
fa27abe8 399 goto error_snapshot;
e2039c7a 400 }
07b86b52 401 }
07b86b52 402
d771f832 403 do {
9ce5646a
MD
404 health_code_update();
405
6f9449c2 406 ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
d771f832 407 if (ret_read < 0) {
6e5e3c51
MD
408 ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
409 ret_read);
410 ret = ret_read;
411 goto error_snapshot;
07b86b52 412 }
6e5e3c51 413 } while (ret_read > 0);
07b86b52 414
d771f832
DG
415 if (use_relayd) {
416 close_relayd_stream(metadata_stream);
417 metadata_stream->net_seq_idx = (uint64_t) -1ULL;
418 } else {
fdf9986c
MD
419 if (metadata_stream->out_fd >= 0) {
420 ret = close(metadata_stream->out_fd);
421 if (ret < 0) {
422 PERROR("Kernel consumer snapshot metadata close out_fd");
423 /*
424 * Don't go on error here since the snapshot was successful at this
425 * point but somehow the close failed.
426 */
427 }
428 metadata_stream->out_fd = -1;
d2956687
JG
429 lttng_trace_chunk_put(metadata_stream->trace_chunk);
430 metadata_stream->trace_chunk = NULL;
e2039c7a 431 }
e2039c7a
JD
432 }
433
07b86b52 434 ret = 0;
fa27abe8 435error_snapshot:
947bd097 436 metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
cf53a8a6
JD
437 consumer_stream_destroy(metadata_stream, NULL);
438 metadata_channel->metadata_stream = NULL;
07b86b52
JD
439 rcu_read_unlock();
440 return ret;
441}
442
1803a064
MD
443/*
444 * Receive command from session daemon and process it.
445 *
446 * Return 1 on success else a negative value or 0.
447 */
3bd1e081
MD
448int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
449 int sock, struct pollfd *consumer_sockpoll)
450{
0c5b3718 451 int ret_func;
0c759fc9 452 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3bd1e081
MD
453 struct lttcomm_consumer_msg msg;
454
9ce5646a
MD
455 health_code_update();
456
0c5b3718
SM
457 {
458 ssize_t ret_recv;
459
460 ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
461 if (ret_recv != sizeof(msg)) {
462 if (ret_recv > 0) {
463 lttng_consumer_send_error(ctx,
464 LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
465 ret_recv = -1;
466 }
467 return ret_recv;
1803a064 468 }
3bd1e081 469 }
9ce5646a
MD
470
471 health_code_update();
472
84382d49 473 /* Deprecated command */
a0377dfe 474 LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP);
3bd1e081 475
9ce5646a
MD
476 health_code_update();
477
b0b335c8
MD
478 /* relayd needs RCU read-side protection */
479 rcu_read_lock();
480
3bd1e081 481 switch (msg.cmd_type) {
00e2e675
DG
482 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
483 {
4222116f
JR
484 uint32_t major = msg.u.relayd_sock.major;
485 uint32_t minor = msg.u.relayd_sock.minor;
486 enum lttcomm_sock_proto protocol = (enum lttcomm_sock_proto)
487 msg.u.relayd_sock.relayd_socket_protocol;
488
f50f23d9 489 /* Session daemon status message are handled in the following call. */
2527bf85 490 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
4222116f
JR
491 msg.u.relayd_sock.type, ctx, sock,
492 consumer_sockpoll, msg.u.relayd_sock.session_id,
493 msg.u.relayd_sock.relayd_session_id, major,
494 minor, protocol);
00e2e675
DG
495 goto end_nosignal;
496 }
3bd1e081
MD
497 case LTTNG_CONSUMER_ADD_CHANNEL:
498 {
499 struct lttng_consumer_channel *new_channel;
afbf29db 500 int ret_send_status, ret_add_channel = 0;
d2956687 501 const uint64_t chunk_id = msg.u.channel.chunk_id.value;
3bd1e081 502
9ce5646a
MD
503 health_code_update();
504
f50f23d9 505 /* First send a status message before receiving the fds. */
0c5b3718
SM
506 ret_send_status = consumer_send_status_msg(sock, ret_code);
507 if (ret_send_status < 0) {
f50f23d9 508 /* Somehow, the session daemon is not responding anymore. */
1803a064 509 goto error_fatal;
f50f23d9 510 }
9ce5646a
MD
511
512 health_code_update();
513
d88aee68 514 DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
3bd1e081 515 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
d2956687
JG
516 msg.u.channel.session_id,
517 msg.u.channel.chunk_id.is_set ?
518 &chunk_id : NULL,
519 msg.u.channel.pathname,
520 msg.u.channel.name,
1624d5b7
JD
521 msg.u.channel.relayd_id, msg.u.channel.output,
522 msg.u.channel.tracefile_size,
1950109e 523 msg.u.channel.tracefile_count, 0,
ecc48a90 524 msg.u.channel.monitor,
d7ba1388 525 msg.u.channel.live_timer_interval,
a2814ea7 526 msg.u.channel.is_live,
3d071855 527 NULL, NULL);
3bd1e081 528 if (new_channel == NULL) {
f73fabfd 529 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
3bd1e081
MD
530 goto end_nosignal;
531 }
ffe60014 532 new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
95a1109b
JD
533 switch (msg.u.channel.output) {
534 case LTTNG_EVENT_SPLICE:
535 new_channel->output = CONSUMER_CHANNEL_SPLICE;
536 break;
537 case LTTNG_EVENT_MMAP:
538 new_channel->output = CONSUMER_CHANNEL_MMAP;
539 break;
540 default:
541 ERR("Channel output unknown %d", msg.u.channel.output);
542 goto end_nosignal;
543 }
ffe60014
DG
544
545 /* Translate and save channel type. */
546 switch (msg.u.channel.type) {
547 case CONSUMER_CHANNEL_TYPE_DATA:
548 case CONSUMER_CHANNEL_TYPE_METADATA:
97535efa 549 new_channel->type = (consumer_channel_type) msg.u.channel.type;
ffe60014
DG
550 break;
551 default:
a0377dfe 552 abort();
ffe60014
DG
553 goto end_nosignal;
554 };
555
9ce5646a
MD
556 health_code_update();
557
3bd1e081 558 if (ctx->on_recv_channel != NULL) {
0c5b3718
SM
559 int ret_recv_channel =
560 ctx->on_recv_channel(new_channel);
561 if (ret_recv_channel == 0) {
562 ret_add_channel = consumer_add_channel(
563 new_channel, ctx);
564 } else if (ret_recv_channel < 0) {
3bd1e081
MD
565 goto end_nosignal;
566 }
567 } else {
0c5b3718
SM
568 ret_add_channel =
569 consumer_add_channel(new_channel, ctx);
3bd1e081 570 }
0c5b3718
SM
571 if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA &&
572 !ret_add_channel) {
e9404c27
JG
573 int monitor_start_ret;
574
575 DBG("Consumer starting monitor timer");
94d49140
JD
576 consumer_timer_live_start(new_channel,
577 msg.u.channel.live_timer_interval);
e9404c27
JG
578 monitor_start_ret = consumer_timer_monitor_start(
579 new_channel,
580 msg.u.channel.monitor_timer_interval);
581 if (monitor_start_ret < 0) {
582 ERR("Starting channel monitoring timer failed");
583 goto end_nosignal;
584 }
94d49140 585 }
e43c41c5 586
9ce5646a
MD
587 health_code_update();
588
e43c41c5 589 /* If we received an error in add_channel, we need to report it. */
0c5b3718
SM
590 if (ret_add_channel < 0) {
591 ret_send_status = consumer_send_status_msg(
592 sock, ret_add_channel);
593 if (ret_send_status < 0) {
1803a064
MD
594 goto error_fatal;
595 }
e43c41c5
JD
596 goto end_nosignal;
597 }
598
3bd1e081
MD
599 goto end_nosignal;
600 }
601 case LTTNG_CONSUMER_ADD_STREAM:
602 {
dae10966
DG
603 int fd;
604 struct lttng_pipe *stream_pipe;
00e2e675 605 struct lttng_consumer_stream *new_stream;
ffe60014 606 struct lttng_consumer_channel *channel;
c80048c6 607 int alloc_ret = 0;
0c5b3718
SM
608 int ret_send_status, ret_poll, ret_get_max_subbuf_size;
609 ssize_t ret_pipe_write, ret_recv;
3bd1e081 610
ffe60014
DG
611 /*
612 * Get stream's channel reference. Needed when adding the stream to the
613 * global hash table.
614 */
615 channel = consumer_find_channel(msg.u.stream.channel_key);
616 if (!channel) {
617 /*
618 * We could not find the channel. Can happen if cpu hotplug
619 * happens while tearing down.
620 */
d88aee68 621 ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
e462382a 622 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
ffe60014
DG
623 }
624
9ce5646a
MD
625 health_code_update();
626
f50f23d9 627 /* First send a status message before receiving the fds. */
0c5b3718
SM
628 ret_send_status = consumer_send_status_msg(sock, ret_code);
629 if (ret_send_status < 0) {
d771f832 630 /* Somehow, the session daemon is not responding anymore. */
c5c7998f 631 goto error_add_stream_fatal;
1803a064 632 }
9ce5646a
MD
633
634 health_code_update();
635
0c759fc9 636 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
d771f832 637 /* Channel was not found. */
c5c7998f 638 goto error_add_stream_nosignal;
f50f23d9
DG
639 }
640
d771f832 641 /* Blocking call */
9ce5646a 642 health_poll_entry();
0c5b3718 643 ret_poll = lttng_consumer_poll_socket(consumer_sockpoll);
9ce5646a 644 health_poll_exit();
0c5b3718 645 if (ret_poll) {
c5c7998f 646 goto error_add_stream_fatal;
3bd1e081 647 }
00e2e675 648
9ce5646a
MD
649 health_code_update();
650
00e2e675 651 /* Get stream file descriptor from socket */
0c5b3718
SM
652 ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
653 if (ret_recv != sizeof(fd)) {
f73fabfd 654 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
0c5b3718 655 ret_func = ret_recv;
c5c7998f 656 goto end;
3bd1e081 657 }
3bd1e081 658
9ce5646a
MD
659 health_code_update();
660
f50f23d9
DG
661 /*
662 * Send status code to session daemon only if the recv works. If the
663 * above recv() failed, the session daemon is notified through the
664 * error socket and the teardown is eventually done.
665 */
0c5b3718
SM
666 ret_send_status = consumer_send_status_msg(sock, ret_code);
667 if (ret_send_status < 0) {
f50f23d9 668 /* Somehow, the session daemon is not responding anymore. */
c5c7998f 669 goto error_add_stream_nosignal;
f50f23d9
DG
670 }
671
9ce5646a
MD
672 health_code_update();
673
d2956687 674 pthread_mutex_lock(&channel->lock);
6f9449c2 675 new_stream = consumer_stream_create(
49f45573
JG
676 channel,
677 channel->key,
ffe60014 678 fd,
ffe60014 679 channel->name,
ffe60014
DG
680 channel->relayd_id,
681 channel->session_id,
d2956687 682 channel->trace_chunk,
ffe60014
DG
683 msg.u.stream.cpu,
684 &alloc_ret,
4891ece8 685 channel->type,
d2956687 686 channel->monitor);
3bd1e081 687 if (new_stream == NULL) {
c80048c6
MD
688 switch (alloc_ret) {
689 case -ENOMEM:
690 case -EINVAL:
691 default:
692 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
693 break;
c80048c6 694 }
d2956687 695 pthread_mutex_unlock(&channel->lock);
c5c7998f 696 goto error_add_stream_nosignal;
3bd1e081 697 }
d771f832 698
ffe60014 699 new_stream->wait_fd = fd;
0c5b3718
SM
700 ret_get_max_subbuf_size = kernctl_get_max_subbuf_size(
701 new_stream->wait_fd, &new_stream->max_sb_size);
702 if (ret_get_max_subbuf_size < 0) {
d05185fa
JG
703 pthread_mutex_unlock(&channel->lock);
704 ERR("Failed to get kernel maximal subbuffer size");
c5c7998f 705 goto error_add_stream_nosignal;
d05185fa
JG
706 }
707
d9a2e16e
JD
708 consumer_stream_update_channel_attributes(new_stream,
709 channel);
00e2e675 710
a0c83db9
DG
711 /*
712 * We've just assigned the channel to the stream so increment the
07b86b52
JD
713 * refcount right now. We don't need to increment the refcount for
714 * streams in no monitor because we handle manually the cleanup of
715 * those. It is very important to make sure there is NO prior
716 * consumer_del_stream() calls or else the refcount will be unbalanced.
a0c83db9 717 */
07b86b52
JD
718 if (channel->monitor) {
719 uatomic_inc(&new_stream->chan->refcount);
720 }
9d9353f9 721
fb3a43a9
DG
722 /*
723 * The buffer flush is done on the session daemon side for the kernel
724 * so no need for the stream "hangup_flush_done" variable to be
725 * tracked. This is important for a kernel stream since we don't rely
726 * on the flush state of the stream to read data. It's not the case for
727 * user space tracing.
728 */
729 new_stream->hangup_flush_done = 0;
730
9ce5646a
MD
731 health_code_update();
732
d2956687 733 pthread_mutex_lock(&new_stream->lock);
633d0084 734 if (ctx->on_recv_stream) {
0c5b3718
SM
735 int ret_recv_stream = ctx->on_recv_stream(new_stream);
736 if (ret_recv_stream < 0) {
d2956687
JG
737 pthread_mutex_unlock(&new_stream->lock);
738 pthread_mutex_unlock(&channel->lock);
d771f832 739 consumer_stream_free(new_stream);
c5c7998f 740 goto error_add_stream_nosignal;
fb3a43a9 741 }
633d0084 742 }
9ce5646a
MD
743 health_code_update();
744
07b86b52
JD
745 if (new_stream->metadata_flag) {
746 channel->metadata_stream = new_stream;
747 }
748
2bba9e53
DG
749 /* Do not monitor this stream. */
750 if (!channel->monitor) {
5eecee74 751 DBG("Kernel consumer add stream %s in no monitor mode with "
6dc3064a 752 "relayd id %" PRIu64, new_stream->name,
5eecee74 753 new_stream->net_seq_idx);
10a50311 754 cds_list_add(&new_stream->send_node, &channel->streams.head);
d2956687
JG
755 pthread_mutex_unlock(&new_stream->lock);
756 pthread_mutex_unlock(&channel->lock);
c5c7998f 757 goto end_add_stream;
6dc3064a
DG
758 }
759
e1b71bdc
DG
760 /* Send stream to relayd if the stream has an ID. */
761 if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
0c5b3718
SM
762 int ret_send_relayd_stream;
763
764 ret_send_relayd_stream = consumer_send_relayd_stream(
765 new_stream, new_stream->chan->pathname);
766 if (ret_send_relayd_stream < 0) {
d2956687
JG
767 pthread_mutex_unlock(&new_stream->lock);
768 pthread_mutex_unlock(&channel->lock);
e1b71bdc 769 consumer_stream_free(new_stream);
c5c7998f 770 goto error_add_stream_nosignal;
e1b71bdc 771 }
001b7e62
MD
772
773 /*
774 * If adding an extra stream to an already
775 * existing channel (e.g. cpu hotplug), we need
776 * to send the "streams_sent" command to relayd.
777 */
778 if (channel->streams_sent_to_relayd) {
0c5b3718
SM
779 int ret_send_relayd_streams_sent;
780
781 ret_send_relayd_streams_sent =
782 consumer_send_relayd_streams_sent(
783 new_stream->net_seq_idx);
784 if (ret_send_relayd_streams_sent < 0) {
d2956687
JG
785 pthread_mutex_unlock(&new_stream->lock);
786 pthread_mutex_unlock(&channel->lock);
c5c7998f 787 goto error_add_stream_nosignal;
001b7e62
MD
788 }
789 }
e2039c7a 790 }
d2956687
JG
791 pthread_mutex_unlock(&new_stream->lock);
792 pthread_mutex_unlock(&channel->lock);
e2039c7a 793
50f8ae69 794 /* Get the right pipe where the stream will be sent. */
633d0084 795 if (new_stream->metadata_flag) {
66d583dc 796 consumer_add_metadata_stream(new_stream);
dae10966 797 stream_pipe = ctx->consumer_metadata_pipe;
3bd1e081 798 } else {
66d583dc 799 consumer_add_data_stream(new_stream);
dae10966 800 stream_pipe = ctx->consumer_data_pipe;
50f8ae69
DG
801 }
802
66d583dc 803 /* Visible to other threads */
5ab66908
MD
804 new_stream->globally_visible = 1;
805
9ce5646a
MD
806 health_code_update();
807
0c5b3718
SM
808 ret_pipe_write = lttng_pipe_write(
809 stream_pipe, &new_stream, sizeof(new_stream));
810 if (ret_pipe_write < 0) {
dae10966 811 ERR("Consumer write %s stream to pipe %d",
50f8ae69 812 new_stream->metadata_flag ? "metadata" : "data",
dae10966 813 lttng_pipe_get_writefd(stream_pipe));
5ab66908
MD
814 if (new_stream->metadata_flag) {
815 consumer_del_stream_for_metadata(new_stream);
816 } else {
817 consumer_del_stream_for_data(new_stream);
818 }
c5c7998f 819 goto error_add_stream_nosignal;
3bd1e081 820 }
00e2e675 821
02d02e31
JD
822 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
823 new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
c5c7998f 824end_add_stream:
3bd1e081 825 break;
c5c7998f
JG
826error_add_stream_nosignal:
827 goto end_nosignal;
828error_add_stream_fatal:
829 goto error_fatal;
3bd1e081 830 }
a4baae1b
JD
831 case LTTNG_CONSUMER_STREAMS_SENT:
832 {
833 struct lttng_consumer_channel *channel;
0c5b3718 834 int ret_send_status;
a4baae1b
JD
835
836 /*
837 * Get stream's channel reference. Needed when adding the stream to the
838 * global hash table.
839 */
840 channel = consumer_find_channel(msg.u.sent_streams.channel_key);
841 if (!channel) {
842 /*
843 * We could not find the channel. Can happen if cpu hotplug
844 * happens while tearing down.
845 */
846 ERR("Unable to find channel key %" PRIu64,
847 msg.u.sent_streams.channel_key);
e462382a 848 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
a4baae1b
JD
849 }
850
851 health_code_update();
852
853 /*
854 * Send status code to session daemon.
855 */
0c5b3718
SM
856 ret_send_status = consumer_send_status_msg(sock, ret_code);
857 if (ret_send_status < 0 ||
858 ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
a4baae1b 859 /* Somehow, the session daemon is not responding anymore. */
80d5a658 860 goto error_streams_sent_nosignal;
a4baae1b
JD
861 }
862
863 health_code_update();
864
865 /*
866 * We should not send this message if we don't monitor the
867 * streams in this channel.
868 */
869 if (!channel->monitor) {
80d5a658 870 goto end_error_streams_sent;
a4baae1b
JD
871 }
872
873 health_code_update();
874 /* Send stream to relayd if the stream has an ID. */
875 if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
0c5b3718
SM
876 int ret_send_relay_streams;
877
878 ret_send_relay_streams = consumer_send_relayd_streams_sent(
a4baae1b 879 msg.u.sent_streams.net_seq_idx);
0c5b3718 880 if (ret_send_relay_streams < 0) {
80d5a658 881 goto error_streams_sent_nosignal;
a4baae1b 882 }
001b7e62 883 channel->streams_sent_to_relayd = true;
a4baae1b 884 }
80d5a658 885end_error_streams_sent:
a4baae1b 886 break;
80d5a658
JG
887error_streams_sent_nosignal:
888 goto end_nosignal;
a4baae1b 889 }
3bd1e081
MD
890 case LTTNG_CONSUMER_UPDATE_STREAM:
891 {
3f8e211f
DG
892 rcu_read_unlock();
893 return -ENOSYS;
894 }
895 case LTTNG_CONSUMER_DESTROY_RELAYD:
896 {
a6ba4fe1 897 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
3f8e211f 898 struct consumer_relayd_sock_pair *relayd;
0c5b3718 899 int ret_send_status;
3f8e211f 900
a6ba4fe1 901 DBG("Kernel consumer destroying relayd %" PRIu64, index);
3f8e211f
DG
902
903 /* Get relayd reference if exists. */
a6ba4fe1 904 relayd = consumer_find_relayd(index);
3f8e211f 905 if (relayd == NULL) {
3448e266 906 DBG("Unable to find relayd %" PRIu64, index);
e462382a 907 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
3bd1e081 908 }
3f8e211f 909
a6ba4fe1
DG
910 /*
911 * Each relayd socket pair has a refcount of stream attached to it
912 * which tells if the relayd is still active or not depending on the
913 * refcount value.
914 *
915 * This will set the destroy flag of the relayd object and destroy it
916 * if the refcount reaches zero when called.
917 *
918 * The destroy can happen either here or when a stream fd hangs up.
919 */
f50f23d9
DG
920 if (relayd) {
921 consumer_flag_relayd_for_destroy(relayd);
922 }
923
9ce5646a
MD
924 health_code_update();
925
0c5b3718
SM
926 ret_send_status = consumer_send_status_msg(sock, ret_code);
927 if (ret_send_status < 0) {
f50f23d9 928 /* Somehow, the session daemon is not responding anymore. */
1803a064 929 goto error_fatal;
f50f23d9 930 }
3f8e211f 931
3f8e211f 932 goto end_nosignal;
3bd1e081 933 }
6d805429 934 case LTTNG_CONSUMER_DATA_PENDING:
53632229 935 {
0c5b3718 936 int32_t ret_data_pending;
6d805429 937 uint64_t id = msg.u.data_pending.session_id;
0c5b3718 938 ssize_t ret_send;
c8f59ee5 939
6d805429 940 DBG("Kernel consumer data pending command for id %" PRIu64, id);
c8f59ee5 941
0c5b3718 942 ret_data_pending = consumer_data_pending(id);
c8f59ee5 943
9ce5646a
MD
944 health_code_update();
945
c8f59ee5 946 /* Send back returned value to session daemon */
0c5b3718
SM
947 ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending,
948 sizeof(ret_data_pending));
949 if (ret_send < 0) {
6d805429 950 PERROR("send data pending ret code");
1803a064 951 goto error_fatal;
c8f59ee5 952 }
f50f23d9
DG
953
954 /*
955 * No need to send back a status message since the data pending
956 * returned value is the response.
957 */
c8f59ee5 958 break;
53632229 959 }
6dc3064a
DG
960 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
961 {
3eb928aa
MD
962 struct lttng_consumer_channel *channel;
963 uint64_t key = msg.u.snapshot_channel.key;
0c5b3718 964 int ret_send_status;
3eb928aa
MD
965
966 channel = consumer_find_channel(key);
967 if (!channel) {
968 ERR("Channel %" PRIu64 " not found", key);
969 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
07b86b52 970 } else {
3eb928aa 971 if (msg.u.snapshot_channel.metadata == 1) {
0c5b3718
SM
972 int ret_snapshot;
973
974 ret_snapshot = lttng_kconsumer_snapshot_metadata(
975 channel, key,
3eb928aa 976 msg.u.snapshot_channel.pathname,
0c5b3718
SM
977 msg.u.snapshot_channel.relayd_id,
978 ctx);
979 if (ret_snapshot < 0) {
3eb928aa
MD
980 ERR("Snapshot metadata failed");
981 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
982 }
983 } else {
0c5b3718
SM
984 int ret_snapshot;
985
986 ret_snapshot = lttng_kconsumer_snapshot_channel(
987 channel, key,
3eb928aa
MD
988 msg.u.snapshot_channel.pathname,
989 msg.u.snapshot_channel.relayd_id,
0c5b3718 990 msg.u.snapshot_channel
f46376a1 991 .nb_packets_per_stream);
0c5b3718 992 if (ret_snapshot < 0) {
3eb928aa
MD
993 ERR("Snapshot channel failed");
994 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
995 }
07b86b52
JD
996 }
997 }
9ce5646a
MD
998 health_code_update();
999
0c5b3718
SM
1000 ret_send_status = consumer_send_status_msg(sock, ret_code);
1001 if (ret_send_status < 0) {
6dc3064a
DG
1002 /* Somehow, the session daemon is not responding anymore. */
1003 goto end_nosignal;
1004 }
1005 break;
1006 }
07b86b52
JD
1007 case LTTNG_CONSUMER_DESTROY_CHANNEL:
1008 {
1009 uint64_t key = msg.u.destroy_channel.key;
1010 struct lttng_consumer_channel *channel;
0c5b3718 1011 int ret_send_status;
07b86b52
JD
1012
1013 channel = consumer_find_channel(key);
1014 if (!channel) {
1015 ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
e462382a 1016 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
07b86b52
JD
1017 }
1018
9ce5646a
MD
1019 health_code_update();
1020
0c5b3718
SM
1021 ret_send_status = consumer_send_status_msg(sock, ret_code);
1022 if (ret_send_status < 0) {
07b86b52 1023 /* Somehow, the session daemon is not responding anymore. */
a9d36096 1024 goto end_destroy_channel;
07b86b52
JD
1025 }
1026
9ce5646a
MD
1027 health_code_update();
1028
15dc512a
DG
1029 /* Stop right now if no channel was found. */
1030 if (!channel) {
a9d36096 1031 goto end_destroy_channel;
15dc512a
DG
1032 }
1033
07b86b52
JD
1034 /*
1035 * This command should ONLY be issued for channel with streams set in
1036 * no monitor mode.
1037 */
a0377dfe 1038 LTTNG_ASSERT(!channel->monitor);
07b86b52
JD
1039
1040 /*
1041 * The refcount should ALWAYS be 0 in the case of a channel in no
1042 * monitor mode.
1043 */
a0377dfe 1044 LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1));
07b86b52
JD
1045
1046 consumer_del_channel(channel);
a9d36096 1047end_destroy_channel:
07b86b52
JD
1048 goto end_nosignal;
1049 }
fb83fe64
JD
1050 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1051 {
66ab32be
JD
1052 ssize_t ret;
1053 uint64_t count;
fb83fe64
JD
1054 struct lttng_consumer_channel *channel;
1055 uint64_t id = msg.u.discarded_events.session_id;
1056 uint64_t key = msg.u.discarded_events.channel_key;
1057
e5742757
MD
1058 DBG("Kernel consumer discarded events command for session id %"
1059 PRIu64 ", channel key %" PRIu64, id, key);
1060
fb83fe64
JD
1061 channel = consumer_find_channel(key);
1062 if (!channel) {
1063 ERR("Kernel consumer discarded events channel %"
1064 PRIu64 " not found", key);
66ab32be 1065 count = 0;
e5742757 1066 } else {
66ab32be 1067 count = channel->discarded_events;
fb83fe64
JD
1068 }
1069
fb83fe64
JD
1070 health_code_update();
1071
1072 /* Send back returned value to session daemon */
66ab32be 1073 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
fb83fe64
JD
1074 if (ret < 0) {
1075 PERROR("send discarded events");
1076 goto error_fatal;
1077 }
1078
1079 break;
1080 }
1081 case LTTNG_CONSUMER_LOST_PACKETS:
1082 {
66ab32be
JD
1083 ssize_t ret;
1084 uint64_t count;
fb83fe64
JD
1085 struct lttng_consumer_channel *channel;
1086 uint64_t id = msg.u.lost_packets.session_id;
1087 uint64_t key = msg.u.lost_packets.channel_key;
1088
e5742757
MD
1089 DBG("Kernel consumer lost packets command for session id %"
1090 PRIu64 ", channel key %" PRIu64, id, key);
1091
fb83fe64
JD
1092 channel = consumer_find_channel(key);
1093 if (!channel) {
1094 ERR("Kernel consumer lost packets channel %"
1095 PRIu64 " not found", key);
66ab32be 1096 count = 0;
e5742757 1097 } else {
66ab32be 1098 count = channel->lost_packets;
fb83fe64
JD
1099 }
1100
fb83fe64
JD
1101 health_code_update();
1102
1103 /* Send back returned value to session daemon */
66ab32be 1104 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
fb83fe64
JD
1105 if (ret < 0) {
1106 PERROR("send lost packets");
1107 goto error_fatal;
1108 }
1109
1110 break;
1111 }
b3530820
JG
1112 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1113 {
1114 int channel_monitor_pipe;
0c5b3718
SM
1115 int ret_send_status, ret_set_channel_monitor_pipe;
1116 ssize_t ret_recv;
b3530820
JG
1117
1118 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1119 /* Successfully received the command's type. */
0c5b3718
SM
1120 ret_send_status = consumer_send_status_msg(sock, ret_code);
1121 if (ret_send_status < 0) {
b3530820
JG
1122 goto error_fatal;
1123 }
1124
0c5b3718
SM
1125 ret_recv = lttcomm_recv_fds_unix_sock(
1126 sock, &channel_monitor_pipe, 1);
1127 if (ret_recv != sizeof(channel_monitor_pipe)) {
b3530820
JG
1128 ERR("Failed to receive channel monitor pipe");
1129 goto error_fatal;
1130 }
1131
1132 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
0c5b3718
SM
1133 ret_set_channel_monitor_pipe =
1134 consumer_timer_thread_set_channel_monitor_pipe(
1135 channel_monitor_pipe);
1136 if (!ret_set_channel_monitor_pipe) {
b3530820 1137 int flags;
0c5b3718 1138 int ret_fcntl;
b3530820
JG
1139
1140 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1141 /* Set the pipe as non-blocking. */
0c5b3718
SM
1142 ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
1143 if (ret_fcntl == -1) {
b3530820
JG
1144 PERROR("fcntl get flags of the channel monitoring pipe");
1145 goto error_fatal;
1146 }
0c5b3718 1147 flags = ret_fcntl;
b3530820 1148
0c5b3718 1149 ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
b3530820 1150 flags | O_NONBLOCK);
0c5b3718 1151 if (ret_fcntl == -1) {
b3530820
JG
1152 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1153 goto error_fatal;
1154 }
1155 DBG("Channel monitor pipe set as non-blocking");
1156 } else {
1157 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1158 }
0c5b3718
SM
1159 ret_send_status = consumer_send_status_msg(sock, ret_code);
1160 if (ret_send_status < 0) {
b3530820
JG
1161 goto error_fatal;
1162 }
1163 break;
1164 }
b99a8d42
JD
1165 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1166 {
92b7a7f8
MD
1167 struct lttng_consumer_channel *channel;
1168 uint64_t key = msg.u.rotate_channel.key;
0c5b3718 1169 int ret_send_status;
b99a8d42 1170
92b7a7f8 1171 DBG("Consumer rotate channel %" PRIu64, key);
b99a8d42 1172
92b7a7f8
MD
1173 channel = consumer_find_channel(key);
1174 if (!channel) {
1175 ERR("Channel %" PRIu64 " not found", key);
1176 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1177 } else {
1178 /*
1179 * Sample the rotate position of all the streams in this channel.
1180 */
0c5b3718
SM
1181 int ret_rotate_channel;
1182
1183 ret_rotate_channel = lttng_consumer_rotate_channel(
1184 channel, key,
f46376a1 1185 msg.u.rotate_channel.relayd_id);
0c5b3718 1186 if (ret_rotate_channel < 0) {
92b7a7f8
MD
1187 ERR("Rotate channel failed");
1188 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
1189 }
b99a8d42 1190
92b7a7f8
MD
1191 health_code_update();
1192 }
0c5b3718
SM
1193
1194 ret_send_status = consumer_send_status_msg(sock, ret_code);
1195 if (ret_send_status < 0) {
b99a8d42 1196 /* Somehow, the session daemon is not responding anymore. */
713bdd26 1197 goto error_rotate_channel;
b99a8d42 1198 }
92b7a7f8
MD
1199 if (channel) {
1200 /* Rotate the streams that are ready right now. */
0c5b3718
SM
1201 int ret_rotate;
1202
1203 ret_rotate = lttng_consumer_rotate_ready_streams(
f46376a1 1204 channel, key);
0c5b3718 1205 if (ret_rotate < 0) {
92b7a7f8
MD
1206 ERR("Rotate ready streams failed");
1207 }
b99a8d42 1208 }
b99a8d42 1209 break;
713bdd26
JG
1210error_rotate_channel:
1211 goto end_nosignal;
b99a8d42 1212 }
5f3aff8b
MD
1213 case LTTNG_CONSUMER_CLEAR_CHANNEL:
1214 {
1215 struct lttng_consumer_channel *channel;
1216 uint64_t key = msg.u.clear_channel.key;
0c5b3718 1217 int ret_send_status;
5f3aff8b
MD
1218
1219 channel = consumer_find_channel(key);
1220 if (!channel) {
1221 DBG("Channel %" PRIu64 " not found", key);
1222 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1223 } else {
0c5b3718
SM
1224 int ret_clear_channel;
1225
1226 ret_clear_channel =
1227 lttng_consumer_clear_channel(channel);
1228 if (ret_clear_channel) {
5f3aff8b 1229 ERR("Clear channel failed");
97535efa 1230 ret_code = (lttcomm_return_code) ret_clear_channel;
5f3aff8b
MD
1231 }
1232
1233 health_code_update();
1234 }
0c5b3718
SM
1235
1236 ret_send_status = consumer_send_status_msg(sock, ret_code);
1237 if (ret_send_status < 0) {
5f3aff8b
MD
1238 /* Somehow, the session daemon is not responding anymore. */
1239 goto end_nosignal;
1240 }
1241
1242 break;
1243 }
d2956687 1244 case LTTNG_CONSUMER_INIT:
00fb02ac 1245 {
0c5b3718
SM
1246 int ret_send_status;
1247
d2956687
JG
1248 ret_code = lttng_consumer_init_command(ctx,
1249 msg.u.init.sessiond_uuid);
00fb02ac 1250 health_code_update();
0c5b3718
SM
1251 ret_send_status = consumer_send_status_msg(sock, ret_code);
1252 if (ret_send_status < 0) {
00fb02ac
JD
1253 /* Somehow, the session daemon is not responding anymore. */
1254 goto end_nosignal;
1255 }
1256 break;
1257 }
d2956687 1258 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
d88744a4 1259 {
d2956687 1260 const struct lttng_credentials credentials = {
ff588497
JR
1261 .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
1262 .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
d2956687
JG
1263 };
1264 const bool is_local_trace =
1265 !msg.u.create_trace_chunk.relayd_id.is_set;
1266 const uint64_t relayd_id =
1267 msg.u.create_trace_chunk.relayd_id.value;
1268 const char *chunk_override_name =
1269 *msg.u.create_trace_chunk.override_name ?
1270 msg.u.create_trace_chunk.override_name :
1271 NULL;
cbf53d23 1272 struct lttng_directory_handle *chunk_directory_handle = NULL;
d88744a4 1273
d2956687
JG
1274 /*
1275 * The session daemon will only provide a chunk directory file
1276 * descriptor for local traces.
1277 */
1278 if (is_local_trace) {
1279 int chunk_dirfd;
0c5b3718
SM
1280 int ret_send_status;
1281 ssize_t ret_recv;
19990ed5 1282
d2956687 1283 /* Acnowledge the reception of the command. */
0c5b3718
SM
1284 ret_send_status = consumer_send_status_msg(
1285 sock, LTTCOMM_CONSUMERD_SUCCESS);
1286 if (ret_send_status < 0) {
d2956687
JG
1287 /* Somehow, the session daemon is not responding anymore. */
1288 goto end_nosignal;
1289 }
92816cc3 1290
0c5b3718
SM
1291 ret_recv = lttcomm_recv_fds_unix_sock(
1292 sock, &chunk_dirfd, 1);
1293 if (ret_recv != sizeof(chunk_dirfd)) {
d2956687
JG
1294 ERR("Failed to receive trace chunk directory file descriptor");
1295 goto error_fatal;
1296 }
92816cc3 1297
d2956687
JG
1298 DBG("Received trace chunk directory fd (%d)",
1299 chunk_dirfd);
cbf53d23 1300 chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
d2956687 1301 chunk_dirfd);
cbf53d23 1302 if (!chunk_directory_handle) {
d2956687
JG
1303 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1304 if (close(chunk_dirfd)) {
1305 PERROR("Failed to close chunk directory file descriptor");
1306 }
1307 goto error_fatal;
1308 }
92816cc3
JG
1309 }
1310
d2956687
JG
1311 ret_code = lttng_consumer_create_trace_chunk(
1312 !is_local_trace ? &relayd_id : NULL,
1313 msg.u.create_trace_chunk.session_id,
1314 msg.u.create_trace_chunk.chunk_id,
e5add6d0
JG
1315 (time_t) msg.u.create_trace_chunk
1316 .creation_timestamp,
d2956687 1317 chunk_override_name,
e5add6d0
JG
1318 msg.u.create_trace_chunk.credentials.is_set ?
1319 &credentials :
1320 NULL,
cbf53d23
JG
1321 chunk_directory_handle);
1322 lttng_directory_handle_put(chunk_directory_handle);
d2956687 1323 goto end_msg_sessiond;
d88744a4 1324 }
d2956687 1325 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
a1ae2ea5 1326 {
bbc4768c 1327 enum lttng_trace_chunk_command_type close_command =
97535efa 1328 (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
d2956687
JG
1329 const uint64_t relayd_id =
1330 msg.u.close_trace_chunk.relayd_id.value;
ecd1a12f
MD
1331 struct lttcomm_consumer_close_trace_chunk_reply reply;
1332 char path[LTTNG_PATH_MAX];
0c5b3718 1333 ssize_t ret_send;
d2956687
JG
1334
1335 ret_code = lttng_consumer_close_trace_chunk(
1336 msg.u.close_trace_chunk.relayd_id.is_set ?
bbc4768c
JG
1337 &relayd_id :
1338 NULL,
d2956687
JG
1339 msg.u.close_trace_chunk.session_id,
1340 msg.u.close_trace_chunk.chunk_id,
bbc4768c
JG
1341 (time_t) msg.u.close_trace_chunk.close_timestamp,
1342 msg.u.close_trace_chunk.close_command.is_set ?
1343 &close_command :
ecd1a12f
MD
1344 NULL, path);
1345 reply.ret_code = ret_code;
1346 reply.path_length = strlen(path) + 1;
0c5b3718
SM
1347 ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
1348 if (ret_send != sizeof(reply)) {
ecd1a12f
MD
1349 goto error_fatal;
1350 }
0c5b3718
SM
1351 ret_send = lttcomm_send_unix_sock(
1352 sock, path, reply.path_length);
1353 if (ret_send != reply.path_length) {
ecd1a12f
MD
1354 goto error_fatal;
1355 }
1356 goto end_nosignal;
3654ed19 1357 }
d2956687 1358 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
3654ed19 1359 {
d2956687
JG
1360 const uint64_t relayd_id =
1361 msg.u.trace_chunk_exists.relayd_id.value;
1362
1363 ret_code = lttng_consumer_trace_chunk_exists(
1364 msg.u.trace_chunk_exists.relayd_id.is_set ?
1365 &relayd_id : NULL,
1366 msg.u.trace_chunk_exists.session_id,
1367 msg.u.trace_chunk_exists.chunk_id);
1368 goto end_msg_sessiond;
a1ae2ea5 1369 }
04ed9e10
JG
1370 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
1371 {
1372 const uint64_t key = msg.u.open_channel_packets.key;
1373 struct lttng_consumer_channel *channel =
1374 consumer_find_channel(key);
1375
1376 if (channel) {
1377 pthread_mutex_lock(&channel->lock);
1378 ret_code = lttng_consumer_open_channel_packets(channel);
1379 pthread_mutex_unlock(&channel->lock);
1380 } else {
1381 WARN("Channel %" PRIu64 " not found", key);
1382 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1383 }
1384
1385 health_code_update();
1386 goto end_msg_sessiond;
1387 }
3bd1e081 1388 default:
3f8e211f 1389 goto end_nosignal;
3bd1e081 1390 }
3f8e211f 1391
3bd1e081 1392end_nosignal:
4cbc1a04
DG
1393 /*
1394 * Return 1 to indicate success since the 0 value can be a socket
1395 * shutdown during the recv() or send() call.
1396 */
0c5b3718 1397 ret_func = 1;
c5c7998f
JG
1398 goto end;
1399error_fatal:
1400 /* This will issue a consumer stop. */
0c5b3718 1401 ret_func = -1;
c5c7998f 1402 goto end;
d2956687
JG
1403end_msg_sessiond:
1404 /*
1405 * The returned value here is not useful since either way we'll return 1 to
1406 * the caller because the session daemon socket management is done
1407 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1408 */
0c5b3718
SM
1409 {
1410 int ret_send_status;
1411
1412 ret_send_status = consumer_send_status_msg(sock, ret_code);
1413 if (ret_send_status < 0) {
1414 goto error_fatal;
1415 }
d2956687 1416 }
0c5b3718
SM
1417
1418 ret_func = 1;
1419
c5c7998f 1420end:
d2956687 1421 health_code_update();
1803a064 1422 rcu_read_unlock();
0c5b3718 1423 return ret_func;
3bd1e081 1424}
d41f73b7 1425
94d49140
JD
1426/*
1427 * Sync metadata meaning request them to the session daemon and snapshot to the
1428 * metadata thread can consumer them.
1429 *
1430 * Metadata stream lock MUST be acquired.
94d49140 1431 */
577eea73
JG
1432enum sync_metadata_status lttng_kconsumer_sync_metadata(
1433 struct lttng_consumer_stream *metadata)
94d49140
JD
1434{
1435 int ret;
577eea73 1436 enum sync_metadata_status status;
94d49140 1437
a0377dfe 1438 LTTNG_ASSERT(metadata);
94d49140
JD
1439
1440 ret = kernctl_buffer_flush(metadata->wait_fd);
1441 if (ret < 0) {
1442 ERR("Failed to flush kernel stream");
577eea73 1443 status = SYNC_METADATA_STATUS_ERROR;
94d49140
JD
1444 goto end;
1445 }
1446
1447 ret = kernctl_snapshot(metadata->wait_fd);
1448 if (ret < 0) {
577eea73
JG
1449 if (errno == EAGAIN) {
1450 /* No new metadata, exit. */
1451 DBG("Sync metadata, no new kernel metadata");
1452 status = SYNC_METADATA_STATUS_NO_DATA;
1453 } else {
94d49140 1454 ERR("Sync metadata, taking kernel snapshot failed.");
577eea73 1455 status = SYNC_METADATA_STATUS_ERROR;
94d49140 1456 }
577eea73
JG
1457 } else {
1458 status = SYNC_METADATA_STATUS_NEW_DATA;
94d49140
JD
1459 }
1460
1461end:
577eea73 1462 return status;
94d49140 1463}
309167d2 1464
fb83fe64 1465static
6f9449c2
JG
1466int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
1467 struct stream_subbuffer *subbuf)
fb83fe64
JD
1468{
1469 int ret;
fb83fe64 1470
6f9449c2
JG
1471 ret = kernctl_get_subbuf_size(
1472 stream->wait_fd, &subbuf->info.data.subbuf_size);
1473 if (ret) {
fb83fe64
JD
1474 goto end;
1475 }
fb83fe64 1476
6f9449c2
JG
1477 ret = kernctl_get_padded_subbuf_size(
1478 stream->wait_fd, &subbuf->info.data.padded_subbuf_size);
1479 if (ret) {
fb83fe64
JD
1480 goto end;
1481 }
fb83fe64
JD
1482
1483end:
1484 return ret;
1485}
1486
93ec662e 1487static
6f9449c2
JG
1488int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
1489 struct stream_subbuffer *subbuf)
93ec662e
JD
1490{
1491 int ret;
93ec662e 1492
6f9449c2
JG
1493 ret = extract_common_subbuffer_info(stream, subbuf);
1494 if (ret) {
93ec662e
JD
1495 goto end;
1496 }
1497
6f9449c2
JG
1498 ret = kernctl_get_metadata_version(
1499 stream->wait_fd, &subbuf->info.metadata.version);
1500 if (ret) {
93ec662e
JD
1501 goto end;
1502 }
1503
93ec662e
JD
1504end:
1505 return ret;
1506}
1507
6f9449c2
JG
1508static
1509int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
1510 struct stream_subbuffer *subbuf)
d41f73b7 1511{
6f9449c2 1512 int ret;
d41f73b7 1513
6f9449c2
JG
1514 ret = extract_common_subbuffer_info(stream, subbuf);
1515 if (ret) {
1516 goto end;
1517 }
309167d2 1518
6f9449c2
JG
1519 ret = kernctl_get_packet_size(
1520 stream->wait_fd, &subbuf->info.data.packet_size);
1521 if (ret < 0) {
1522 PERROR("Failed to get sub-buffer packet size");
1523 goto end;
1524 }
02d02e31 1525
6f9449c2
JG
1526 ret = kernctl_get_content_size(
1527 stream->wait_fd, &subbuf->info.data.content_size);
1528 if (ret < 0) {
1529 PERROR("Failed to get sub-buffer content size");
1530 goto end;
d41f73b7
MD
1531 }
1532
6f9449c2
JG
1533 ret = kernctl_get_timestamp_begin(
1534 stream->wait_fd, &subbuf->info.data.timestamp_begin);
1535 if (ret < 0) {
1536 PERROR("Failed to get sub-buffer begin timestamp");
1537 goto end;
1d4dfdef
DG
1538 }
1539
6f9449c2
JG
1540 ret = kernctl_get_timestamp_end(
1541 stream->wait_fd, &subbuf->info.data.timestamp_end);
1542 if (ret < 0) {
1543 PERROR("Failed to get sub-buffer end timestamp");
1544 goto end;
1545 }
1546
1547 ret = kernctl_get_events_discarded(
1548 stream->wait_fd, &subbuf->info.data.events_discarded);
1549 if (ret) {
1550 PERROR("Failed to get sub-buffer events discarded count");
1551 goto end;
1552 }
1553
1554 ret = kernctl_get_sequence_number(stream->wait_fd,
1555 &subbuf->info.data.sequence_number.value);
1556 if (ret) {
1557 /* May not be supported by older LTTng-modules. */
1558 if (ret != -ENOTTY) {
1559 PERROR("Failed to get sub-buffer sequence number");
1560 goto end;
fb83fe64 1561 }
1c20f0e2 1562 } else {
6f9449c2 1563 subbuf->info.data.sequence_number.is_set = true;
309167d2
JD
1564 }
1565
6f9449c2
JG
1566 ret = kernctl_get_stream_id(
1567 stream->wait_fd, &subbuf->info.data.stream_id);
1568 if (ret < 0) {
1569 PERROR("Failed to get stream id");
1570 goto end;
1571 }
1d4dfdef 1572
6f9449c2
JG
1573 ret = kernctl_get_instance_id(stream->wait_fd,
1574 &subbuf->info.data.stream_instance_id.value);
1575 if (ret) {
1576 /* May not be supported by older LTTng-modules. */
1577 if (ret != -ENOTTY) {
1578 PERROR("Failed to get stream instance id");
1579 goto end;
1d4dfdef 1580 }
6f9449c2
JG
1581 } else {
1582 subbuf->info.data.stream_instance_id.is_set = true;
1583 }
1584end:
1585 return ret;
1586}
47e81c02 1587
6f9449c2 1588static
b6797c8e
JG
1589enum get_next_subbuffer_status get_subbuffer_common(
1590 struct lttng_consumer_stream *stream,
6f9449c2
JG
1591 struct stream_subbuffer *subbuffer)
1592{
1593 int ret;
b6797c8e 1594 enum get_next_subbuffer_status status;
6f9449c2
JG
1595
1596 ret = kernctl_get_next_subbuf(stream->wait_fd);
b6797c8e
JG
1597 switch (ret) {
1598 case 0:
1599 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1600 break;
1601 case -ENODATA:
1602 case -EAGAIN:
6e5e3c51
MD
1603 /*
1604 * The caller only expects -ENODATA when there is no data to
1605 * read, but the kernel tracer returns -EAGAIN when there is
1606 * currently no data for a non-finalized stream, and -ENODATA
1607 * when there is no data for a finalized stream. Those can be
1608 * combined into a -ENODATA return value.
1609 */
b6797c8e
JG
1610 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1611 goto end;
1612 default:
1613 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
6f9449c2
JG
1614 goto end;
1615 }
1616
1617 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
b6797c8e
JG
1618 stream, subbuffer);
1619 if (ret) {
1620 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1621 }
6f9449c2 1622end:
b6797c8e 1623 return status;
6f9449c2 1624}
128708c3 1625
6f9449c2 1626static
b6797c8e
JG
1627enum get_next_subbuffer_status get_next_subbuffer_splice(
1628 struct lttng_consumer_stream *stream,
6f9449c2
JG
1629 struct stream_subbuffer *subbuffer)
1630{
b6797c8e
JG
1631 const enum get_next_subbuffer_status status =
1632 get_subbuffer_common(stream, subbuffer);
1d4dfdef 1633
b6797c8e 1634 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
6f9449c2
JG
1635 goto end;
1636 }
1d4dfdef 1637
6f9449c2
JG
1638 subbuffer->buffer.fd = stream->wait_fd;
1639end:
b6797c8e 1640 return status;
6f9449c2 1641}
fd424d99 1642
6f9449c2 1643static
b6797c8e
JG
1644enum get_next_subbuffer_status get_next_subbuffer_mmap(
1645 struct lttng_consumer_stream *stream,
6f9449c2
JG
1646 struct stream_subbuffer *subbuffer)
1647{
1648 int ret;
b6797c8e 1649 enum get_next_subbuffer_status status;
6f9449c2
JG
1650 const char *addr;
1651
b6797c8e
JG
1652 status = get_subbuffer_common(stream, subbuffer);
1653 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
6f9449c2 1654 goto end;
128708c3 1655 }
6f9449c2
JG
1656
1657 ret = get_current_subbuf_addr(stream, &addr);
1658 if (ret) {
b6797c8e 1659 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
6f9449c2 1660 goto end;
d41f73b7 1661 }
6f9449c2
JG
1662
1663 subbuffer->buffer.buffer = lttng_buffer_view_init(
1664 addr, 0, subbuffer->info.data.padded_subbuf_size);
1665end:
b6797c8e 1666 return status;
6f9449c2
JG
1667}
1668
f5ba75b4 1669static
b6797c8e 1670enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
f5ba75b4
JG
1671 struct stream_subbuffer *subbuffer)
1672{
1673 int ret;
1674 const char *addr;
1675 bool coherent;
b6797c8e 1676 enum get_next_subbuffer_status status;
f5ba75b4
JG
1677
1678 ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
1679 &coherent);
1680 if (ret) {
1681 goto end;
1682 }
1683
1684 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
1685 stream, subbuffer);
1686 if (ret) {
1687 goto end;
1688 }
1689
1690 LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
1691
1692 ret = get_current_subbuf_addr(stream, &addr);
1693 if (ret) {
1694 goto end;
1695 }
1696
1697 subbuffer->buffer.buffer = lttng_buffer_view_init(
1698 addr, 0, subbuffer->info.data.padded_subbuf_size);
1699 DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
1700 subbuffer->info.metadata.padded_subbuf_size,
1701 coherent ? "true" : "false");
1702end:
6e5e3c51
MD
1703 /*
1704 * The caller only expects -ENODATA when there is no data to read, but
1705 * the kernel tracer returns -EAGAIN when there is currently no data
1706 * for a non-finalized stream, and -ENODATA when there is no data for a
1707 * finalized stream. Those can be combined into a -ENODATA return value.
1708 */
b6797c8e
JG
1709 switch (ret) {
1710 case 0:
1711 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1712 break;
1713 case -ENODATA:
1714 case -EAGAIN:
1715 /*
1716 * The caller only expects -ENODATA when there is no data to
1717 * read, but the kernel tracer returns -EAGAIN when there is
1718 * currently no data for a non-finalized stream, and -ENODATA
1719 * when there is no data for a finalized stream. Those can be
1720 * combined into a -ENODATA return value.
1721 */
1722 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1723 break;
1724 default:
1725 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1726 break;
6e5e3c51
MD
1727 }
1728
b6797c8e 1729 return status;
f5ba75b4
JG
1730}
1731
6f9449c2
JG
1732static
1733int put_next_subbuffer(struct lttng_consumer_stream *stream,
f46376a1 1734 struct stream_subbuffer *subbuffer __attribute__((unused)))
6f9449c2
JG
1735{
1736 const int ret = kernctl_put_next_subbuf(stream->wait_fd);
1737
1738 if (ret) {
1739 if (ret == -EFAULT) {
1740 PERROR("Error in unreserving sub buffer");
1741 } else if (ret == -EIO) {
d41f73b7 1742 /* Should never happen with newer LTTng versions */
6f9449c2 1743 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
d41f73b7 1744 }
d41f73b7
MD
1745 }
1746
6f9449c2
JG
1747 return ret;
1748}
1c20f0e2 1749
f5ba75b4
JG
1750static
1751bool is_get_next_check_metadata_available(int tracer_fd)
1752{
741e787b
JG
1753 const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL);
1754 const bool available = ret != -ENOTTY;
1755
1756 if (ret == 0) {
1757 /* get succeeded, make sure to put the subbuffer. */
1758 kernctl_put_subbuf(tracer_fd);
1759 }
1760
1761 return available;
f5ba75b4
JG
1762}
1763
091441eb
MD
1764static
1765int signal_metadata(struct lttng_consumer_stream *stream,
f46376a1 1766 struct lttng_consumer_local_data *ctx __attribute__((unused)))
091441eb
MD
1767{
1768 ASSERT_LOCKED(stream->metadata_rdv_lock);
1769 return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
1770}
1771
f5ba75b4
JG
1772static
1773int lttng_kconsumer_set_stream_ops(
6f9449c2
JG
1774 struct lttng_consumer_stream *stream)
1775{
f5ba75b4
JG
1776 int ret = 0;
1777
1778 if (stream->metadata_flag && stream->chan->is_live) {
1779 DBG("Attempting to enable metadata bucketization for live consumers");
1780 if (is_get_next_check_metadata_available(stream->wait_fd)) {
1781 DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
1782 stream->read_subbuffer_ops.get_next_subbuffer =
1783 get_next_subbuffer_metadata_check;
1784 ret = consumer_stream_enable_metadata_bucketization(
1785 stream);
1786 if (ret) {
1787 goto end;
1788 }
1789 } else {
1790 /*
1791 * The kernel tracer version is too old to indicate
1792 * when the metadata stream has reached a "coherent"
1793 * (parseable) point.
1794 *
1795 * This means that a live viewer may see an incoherent
1796 * sequence of metadata and fail to parse it.
1797 */
1798 WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
1799 metadata_bucket_destroy(stream->metadata_bucket);
1800 stream->metadata_bucket = NULL;
1801 }
091441eb
MD
1802
1803 stream->read_subbuffer_ops.on_sleep = signal_metadata;
f5ba75b4
JG
1804 }
1805
1806 if (!stream->read_subbuffer_ops.get_next_subbuffer) {
1807 if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
1808 stream->read_subbuffer_ops.get_next_subbuffer =
1809 get_next_subbuffer_mmap;
1810 } else {
1811 stream->read_subbuffer_ops.get_next_subbuffer =
1812 get_next_subbuffer_splice;
1813 }
94d49140
JD
1814 }
1815
6f9449c2
JG
1816 if (stream->metadata_flag) {
1817 stream->read_subbuffer_ops.extract_subbuffer_info =
1818 extract_metadata_subbuffer_info;
1819 } else {
1820 stream->read_subbuffer_ops.extract_subbuffer_info =
1821 extract_data_subbuffer_info;
1822 if (stream->chan->is_live) {
1823 stream->read_subbuffer_ops.send_live_beacon =
1824 consumer_flush_kernel_index;
1825 }
309167d2
JD
1826 }
1827
6f9449c2 1828 stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
f5ba75b4
JG
1829end:
1830 return ret;
d41f73b7
MD
1831}
1832
1833int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1834{
1835 int ret;
ffe60014 1836
a0377dfe 1837 LTTNG_ASSERT(stream);
ffe60014 1838
2bba9e53 1839 /*
d2956687
JG
1840 * Don't create anything if this is set for streaming or if there is
1841 * no current trace chunk on the parent channel.
2bba9e53 1842 */
d2956687
JG
1843 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
1844 stream->chan->trace_chunk) {
1845 ret = consumer_stream_create_output_files(stream, true);
1846 if (ret) {
fe4477ee
JD
1847 goto error;
1848 }
ffe60014 1849 }
d41f73b7 1850
d41f73b7
MD
1851 if (stream->output == LTTNG_EVENT_MMAP) {
1852 /* get the len of the mmap region */
1853 unsigned long mmap_len;
1854
1855 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
1856 if (ret != 0) {
ffe60014 1857 PERROR("kernctl_get_mmap_len");
d41f73b7
MD
1858 goto error_close_fd;
1859 }
1860 stream->mmap_len = (size_t) mmap_len;
1861
ffe60014
DG
1862 stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
1863 MAP_PRIVATE, stream->wait_fd, 0);
d41f73b7 1864 if (stream->mmap_base == MAP_FAILED) {
ffe60014 1865 PERROR("Error mmaping");
d41f73b7
MD
1866 ret = -1;
1867 goto error_close_fd;
1868 }
1869 }
1870
f5ba75b4
JG
1871 ret = lttng_kconsumer_set_stream_ops(stream);
1872 if (ret) {
1873 goto error_close_fd;
1874 }
6f9449c2 1875
d41f73b7
MD
1876 /* we return 0 to let the library handle the FD internally */
1877 return 0;
1878
1879error_close_fd:
2f225ce2 1880 if (stream->out_fd >= 0) {
d41f73b7
MD
1881 int err;
1882
1883 err = close(stream->out_fd);
a0377dfe 1884 LTTNG_ASSERT(!err);
2f225ce2 1885 stream->out_fd = -1;
d41f73b7
MD
1886 }
1887error:
1888 return ret;
1889}
1890
ca22feea
DG
1891/*
1892 * Check if data is still being extracted from the buffers for a specific
4e9a4686
DG
1893 * stream. Consumer data lock MUST be acquired before calling this function
1894 * and the stream lock.
ca22feea 1895 *
6d805429 1896 * Return 1 if the traced data are still getting read else 0 meaning that the
ca22feea
DG
1897 * data is available for trace viewer reading.
1898 */
6d805429 1899int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
ca22feea
DG
1900{
1901 int ret;
1902
a0377dfe 1903 LTTNG_ASSERT(stream);
ca22feea 1904
873b9e9a
MD
1905 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
1906 ret = 0;
1907 goto end;
1908 }
1909
ca22feea
DG
1910 ret = kernctl_get_next_subbuf(stream->wait_fd);
1911 if (ret == 0) {
1912 /* There is still data so let's put back this subbuffer. */
1913 ret = kernctl_put_subbuf(stream->wait_fd);
a0377dfe 1914 LTTNG_ASSERT(ret == 0);
6d805429 1915 ret = 1; /* Data is pending */
4e9a4686 1916 goto end;
ca22feea
DG
1917 }
1918
6d805429
DG
1919 /* Data is NOT pending and ready to be read. */
1920 ret = 0;
ca22feea 1921
6efae65e
DG
1922end:
1923 return ret;
ca22feea 1924}
This page took 0.190775 seconds and 4 git commands to generate.