Fix: Unexpected payload size in cmd_recv_stream_2_11
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.cpp
CommitLineData
3bd1e081 1/*
21cf9b6b 2 * Copyright (C) 2011 EfficiOS Inc.
ab5be9fa
MJ
3 * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3bd1e081 5 *
ab5be9fa 6 * SPDX-License-Identifier: GPL-2.0-only
3bd1e081 7 *
3bd1e081
MD
8 */
9
6c1c0768 10#define _LGPL_SOURCE
3bd1e081
MD
11#include <poll.h>
12#include <pthread.h>
13#include <stdlib.h>
14#include <string.h>
15#include <sys/mman.h>
16#include <sys/socket.h>
17#include <sys/types.h>
77c7c900 18#include <inttypes.h>
3bd1e081 19#include <unistd.h>
dbb5dfe6 20#include <sys/stat.h>
f5ba75b4 21#include <stdint.h>
3bd1e081 22
51a9e1c7 23#include <bin/lttng-consumerd/health-consumerd.h>
990570ed 24#include <common/common.h>
10a8a223 25#include <common/kernel-ctl/kernel-ctl.h>
10a8a223 26#include <common/sessiond-comm/sessiond-comm.h>
00e2e675 27#include <common/sessiond-comm/relayd.h>
dbb5dfe6 28#include <common/compat/fcntl.h>
f263b7fd 29#include <common/compat/endian.h>
acdb9057 30#include <common/pipe.h>
00e2e675 31#include <common/relayd/relayd.h>
fe4477ee 32#include <common/utils.h>
c8fea79c 33#include <common/consumer/consumer-stream.h>
309167d2 34#include <common/index/index.h>
c8fea79c 35#include <common/consumer/consumer-timer.h>
d2956687 36#include <common/optional.h>
6f9449c2
JG
37#include <common/buffer-view.h>
38#include <common/consumer/consumer.h>
f5ba75b4 39#include <common/consumer/metadata-bucket.h>
0857097f 40
10a8a223 41#include "kernel-consumer.h"
3bd1e081 42
fa29bfbf 43extern struct lttng_consumer_global_data the_consumer_data;
3bd1e081 44extern int consumer_poll_timeout;
3bd1e081 45
3bd1e081
MD
46/*
47 * Take a snapshot for a specific fd
48 *
49 * Returns 0 on success, < 0 on error
50 */
ffe60014 51int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081
MD
52{
53 int ret = 0;
54 int infd = stream->wait_fd;
55
56 ret = kernctl_snapshot(infd);
d2d2f190
JD
57 /*
58 * -EAGAIN is not an error, it just means that there is no data to
59 * be read.
60 */
61 if (ret != 0 && ret != -EAGAIN) {
5a510c9f 62 PERROR("Getting sub-buffer snapshot.");
3bd1e081
MD
63 }
64
65 return ret;
66}
67
e9404c27
JG
68/*
69 * Sample consumed and produced positions for a specific fd.
70 *
71 * Returns 0 on success, < 0 on error.
72 */
73int lttng_kconsumer_sample_snapshot_positions(
74 struct lttng_consumer_stream *stream)
75{
a0377dfe 76 LTTNG_ASSERT(stream);
e9404c27
JG
77
78 return kernctl_snapshot_sample_positions(stream->wait_fd);
79}
80
3bd1e081
MD
81/*
82 * Get the produced position
83 *
84 * Returns 0 on success, < 0 on error
85 */
ffe60014 86int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
3bd1e081
MD
87 unsigned long *pos)
88{
89 int ret;
90 int infd = stream->wait_fd;
91
92 ret = kernctl_snapshot_get_produced(infd, pos);
93 if (ret != 0) {
5a510c9f 94 PERROR("kernctl_snapshot_get_produced");
3bd1e081
MD
95 }
96
97 return ret;
98}
99
07b86b52
JD
100/*
101 * Get the consumerd position
102 *
103 * Returns 0 on success, < 0 on error
104 */
105int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
106 unsigned long *pos)
107{
108 int ret;
109 int infd = stream->wait_fd;
110
111 ret = kernctl_snapshot_get_consumed(infd, pos);
112 if (ret != 0) {
5a510c9f 113 PERROR("kernctl_snapshot_get_consumed");
07b86b52
JD
114 }
115
116 return ret;
117}
118
128708c3
JG
119static
120int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
121 const char **addr)
122{
123 int ret;
124 unsigned long mmap_offset;
97535efa 125 const char *mmap_base = (const char *) stream->mmap_base;
128708c3
JG
126
127 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
128 if (ret < 0) {
129 PERROR("Failed to get mmap read offset");
130 goto error;
131 }
132
133 *addr = mmap_base + mmap_offset;
134error:
135 return ret;
136}
137
07b86b52
JD
138/*
139 * Take a snapshot of all the stream of a channel
3eb928aa 140 * RCU read-side lock must be held across this function to ensure existence of
947bd097 141 * channel.
07b86b52
JD
142 *
143 * Returns 0 on success, < 0 on error
144 */
f72bb42f
JG
145static int lttng_kconsumer_snapshot_channel(
146 struct lttng_consumer_channel *channel,
147 uint64_t key, char *path, uint64_t relayd_id,
148 uint64_t nb_packets_per_stream,
5c786ded 149 struct lttng_consumer_local_data *ctx)
07b86b52
JD
150{
151 int ret;
07b86b52
JD
152 struct lttng_consumer_stream *stream;
153
6a00837f 154 DBG("Kernel consumer snapshot channel %" PRIu64, key);
07b86b52 155
947bd097
JR
156 /* Prevent channel modifications while we perform the snapshot.*/
157 pthread_mutex_lock(&channel->lock);
158
07b86b52
JD
159 rcu_read_lock();
160
07b86b52
JD
161 /* Splice is not supported yet for channel snapshot. */
162 if (channel->output != CONSUMER_CHANNEL_MMAP) {
9381314c
JG
163 ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
164 channel->name);
07b86b52
JD
165 ret = -1;
166 goto end;
167 }
168
10a50311 169 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
923333cd 170 unsigned long consumed_pos, produced_pos;
9ce5646a
MD
171
172 health_code_update();
173
07b86b52
JD
174 /*
175 * Lock stream because we are about to change its state.
176 */
177 pthread_mutex_lock(&stream->lock);
178
a0377dfe 179 LTTNG_ASSERT(channel->trace_chunk);
d2956687
JG
180 if (!lttng_trace_chunk_get(channel->trace_chunk)) {
181 /*
182 * Can't happen barring an internal error as the channel
183 * holds a reference to the trace chunk.
184 */
185 ERR("Failed to acquire reference to channel's trace chunk");
186 ret = -1;
187 goto end_unlock;
188 }
a0377dfe 189 LTTNG_ASSERT(!stream->trace_chunk);
d2956687
JG
190 stream->trace_chunk = channel->trace_chunk;
191
29decac3
DG
192 /*
193 * Assign the received relayd ID so we can use it for streaming. The streams
194 * are not visible to anyone so this is OK to change it.
195 */
07b86b52
JD
196 stream->net_seq_idx = relayd_id;
197 channel->relayd_id = relayd_id;
198 if (relayd_id != (uint64_t) -1ULL) {
10a50311 199 ret = consumer_send_relayd_stream(stream, path);
07b86b52
JD
200 if (ret < 0) {
201 ERR("sending stream to relayd");
202 goto end_unlock;
203 }
07b86b52 204 } else {
d2956687
JG
205 ret = consumer_stream_create_output_files(stream,
206 false);
07b86b52 207 if (ret < 0) {
07b86b52
JD
208 goto end_unlock;
209 }
d2956687
JG
210 DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
211 stream->key);
07b86b52
JD
212 }
213
f22dd891 214 ret = kernctl_buffer_flush_empty(stream->wait_fd);
07b86b52 215 if (ret < 0) {
f22dd891
MD
216 /*
217 * Doing a buffer flush which does not take into
218 * account empty packets. This is not perfect
219 * for stream intersection, but required as a
220 * fall-back when "flush_empty" is not
221 * implemented by lttng-modules.
222 */
223 ret = kernctl_buffer_flush(stream->wait_fd);
224 if (ret < 0) {
225 ERR("Failed to flush kernel stream");
226 goto end_unlock;
227 }
07b86b52
JD
228 goto end_unlock;
229 }
230
231 ret = lttng_kconsumer_take_snapshot(stream);
232 if (ret < 0) {
233 ERR("Taking kernel snapshot");
234 goto end_unlock;
235 }
236
237 ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
238 if (ret < 0) {
239 ERR("Produced kernel snapshot position");
240 goto end_unlock;
241 }
242
243 ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
244 if (ret < 0) {
245 ERR("Consumerd kernel snapshot position");
246 goto end_unlock;
247 }
248
d07ceecd
MD
249 consumed_pos = consumer_get_consume_start_pos(consumed_pos,
250 produced_pos, nb_packets_per_stream,
251 stream->max_sb_size);
5c786ded 252
9377d830 253 while ((long) (consumed_pos - produced_pos) < 0) {
07b86b52
JD
254 ssize_t read_len;
255 unsigned long len, padded_len;
128708c3 256 const char *subbuf_addr;
fd424d99 257 struct lttng_buffer_view subbuf_view;
07b86b52 258
9ce5646a 259 health_code_update();
07b86b52
JD
260 DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
261
262 ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
263 if (ret < 0) {
32af2c95 264 if (ret != -EAGAIN) {
07b86b52
JD
265 PERROR("kernctl_get_subbuf snapshot");
266 goto end_unlock;
267 }
268 DBG("Kernel consumer get subbuf failed. Skipping it.");
269 consumed_pos += stream->max_sb_size;
ddc93ee4 270 stream->chan->lost_packets++;
07b86b52
JD
271 continue;
272 }
273
274 ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
275 if (ret < 0) {
276 ERR("Snapshot kernctl_get_subbuf_size");
29decac3 277 goto error_put_subbuf;
07b86b52
JD
278 }
279
280 ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
281 if (ret < 0) {
282 ERR("Snapshot kernctl_get_padded_subbuf_size");
29decac3 283 goto error_put_subbuf;
07b86b52
JD
284 }
285
128708c3
JG
286 ret = get_current_subbuf_addr(stream, &subbuf_addr);
287 if (ret) {
288 goto error_put_subbuf;
289 }
290
fd424d99
JG
291 subbuf_view = lttng_buffer_view_init(
292 subbuf_addr, 0, padded_len);
f5ba75b4 293 read_len = lttng_consumer_on_read_subbuffer_mmap(
fd424d99 294 stream, &subbuf_view,
6f9449c2 295 padded_len - len);
07b86b52 296 /*
29decac3
DG
297 * We write the padded len in local tracefiles but the data len
298 * when using a relay. Display the error but continue processing
299 * to try to release the subbuffer.
07b86b52
JD
300 */
301 if (relayd_id != (uint64_t) -1ULL) {
302 if (read_len != len) {
303 ERR("Error sending to the relay (ret: %zd != len: %lu)",
304 read_len, len);
305 }
306 } else {
307 if (read_len != padded_len) {
308 ERR("Error writing to tracefile (ret: %zd != len: %lu)",
309 read_len, padded_len);
310 }
311 }
312
313 ret = kernctl_put_subbuf(stream->wait_fd);
314 if (ret < 0) {
315 ERR("Snapshot kernctl_put_subbuf");
316 goto end_unlock;
317 }
318 consumed_pos += stream->max_sb_size;
319 }
320
321 if (relayd_id == (uint64_t) -1ULL) {
fdf9986c
MD
322 if (stream->out_fd >= 0) {
323 ret = close(stream->out_fd);
324 if (ret < 0) {
325 PERROR("Kernel consumer snapshot close out_fd");
326 goto end_unlock;
327 }
328 stream->out_fd = -1;
07b86b52 329 }
07b86b52
JD
330 } else {
331 close_relayd_stream(stream);
332 stream->net_seq_idx = (uint64_t) -1ULL;
333 }
d2956687
JG
334 lttng_trace_chunk_put(stream->trace_chunk);
335 stream->trace_chunk = NULL;
07b86b52
JD
336 pthread_mutex_unlock(&stream->lock);
337 }
338
339 /* All good! */
340 ret = 0;
341 goto end;
342
29decac3
DG
343error_put_subbuf:
344 ret = kernctl_put_subbuf(stream->wait_fd);
345 if (ret < 0) {
346 ERR("Snapshot kernctl_put_subbuf error path");
347 }
07b86b52
JD
348end_unlock:
349 pthread_mutex_unlock(&stream->lock);
350end:
351 rcu_read_unlock();
947bd097 352 pthread_mutex_unlock(&channel->lock);
07b86b52
JD
353 return ret;
354}
355
356/*
357 * Read the whole metadata available for a snapshot.
3eb928aa 358 * RCU read-side lock must be held across this function to ensure existence of
947bd097 359 * metadata_channel.
07b86b52
JD
360 *
361 * Returns 0 on success, < 0 on error
362 */
d2956687
JG
363static int lttng_kconsumer_snapshot_metadata(
364 struct lttng_consumer_channel *metadata_channel,
3eb928aa
MD
365 uint64_t key, char *path, uint64_t relayd_id,
366 struct lttng_consumer_local_data *ctx)
07b86b52 367{
d771f832
DG
368 int ret, use_relayd = 0;
369 ssize_t ret_read;
07b86b52 370 struct lttng_consumer_stream *metadata_stream;
d771f832 371
a0377dfe 372 LTTNG_ASSERT(ctx);
07b86b52
JD
373
374 DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
375 key, path);
376
377 rcu_read_lock();
378
07b86b52 379 metadata_stream = metadata_channel->metadata_stream;
a0377dfe 380 LTTNG_ASSERT(metadata_stream);
d2956687 381
947bd097 382 metadata_stream->read_subbuffer_ops.lock(metadata_stream);
a0377dfe
FD
383 LTTNG_ASSERT(metadata_channel->trace_chunk);
384 LTTNG_ASSERT(metadata_stream->trace_chunk);
07b86b52 385
d771f832 386 /* Flag once that we have a valid relayd for the stream. */
e2039c7a 387 if (relayd_id != (uint64_t) -1ULL) {
d771f832
DG
388 use_relayd = 1;
389 }
390
391 if (use_relayd) {
10a50311 392 ret = consumer_send_relayd_stream(metadata_stream, path);
e2039c7a 393 if (ret < 0) {
fa27abe8 394 goto error_snapshot;
e2039c7a 395 }
e2039c7a 396 } else {
d2956687
JG
397 ret = consumer_stream_create_output_files(metadata_stream,
398 false);
e2039c7a 399 if (ret < 0) {
fa27abe8 400 goto error_snapshot;
e2039c7a 401 }
07b86b52 402 }
07b86b52 403
d771f832 404 do {
9ce5646a
MD
405 health_code_update();
406
6f9449c2 407 ret_read = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
d771f832 408 if (ret_read < 0) {
6e5e3c51
MD
409 ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
410 ret_read);
411 ret = ret_read;
412 goto error_snapshot;
07b86b52 413 }
6e5e3c51 414 } while (ret_read > 0);
07b86b52 415
d771f832
DG
416 if (use_relayd) {
417 close_relayd_stream(metadata_stream);
418 metadata_stream->net_seq_idx = (uint64_t) -1ULL;
419 } else {
fdf9986c
MD
420 if (metadata_stream->out_fd >= 0) {
421 ret = close(metadata_stream->out_fd);
422 if (ret < 0) {
423 PERROR("Kernel consumer snapshot metadata close out_fd");
424 /*
425 * Don't go on error here since the snapshot was successful at this
426 * point but somehow the close failed.
427 */
428 }
429 metadata_stream->out_fd = -1;
d2956687
JG
430 lttng_trace_chunk_put(metadata_stream->trace_chunk);
431 metadata_stream->trace_chunk = NULL;
e2039c7a 432 }
e2039c7a
JD
433 }
434
07b86b52 435 ret = 0;
fa27abe8 436error_snapshot:
947bd097 437 metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
cf53a8a6
JD
438 cds_list_del(&metadata_stream->send_node);
439 consumer_stream_destroy(metadata_stream, NULL);
440 metadata_channel->metadata_stream = NULL;
07b86b52
JD
441 rcu_read_unlock();
442 return ret;
443}
444
1803a064
MD
445/*
446 * Receive command from session daemon and process it.
447 *
448 * Return 1 on success else a negative value or 0.
449 */
3bd1e081
MD
450int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
451 int sock, struct pollfd *consumer_sockpoll)
452{
0c5b3718 453 int ret_func;
0c759fc9 454 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3bd1e081
MD
455 struct lttcomm_consumer_msg msg;
456
9ce5646a
MD
457 health_code_update();
458
0c5b3718
SM
459 {
460 ssize_t ret_recv;
461
462 ret_recv = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
463 if (ret_recv != sizeof(msg)) {
464 if (ret_recv > 0) {
465 lttng_consumer_send_error(ctx,
466 LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
467 ret_recv = -1;
468 }
469 return ret_recv;
1803a064 470 }
3bd1e081 471 }
9ce5646a
MD
472
473 health_code_update();
474
84382d49 475 /* Deprecated command */
a0377dfe 476 LTTNG_ASSERT(msg.cmd_type != LTTNG_CONSUMER_STOP);
3bd1e081 477
9ce5646a
MD
478 health_code_update();
479
b0b335c8
MD
480 /* relayd needs RCU read-side protection */
481 rcu_read_lock();
482
3bd1e081 483 switch (msg.cmd_type) {
00e2e675
DG
484 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
485 {
4222116f
JR
486 uint32_t major = msg.u.relayd_sock.major;
487 uint32_t minor = msg.u.relayd_sock.minor;
488 enum lttcomm_sock_proto protocol = (enum lttcomm_sock_proto)
489 msg.u.relayd_sock.relayd_socket_protocol;
490
f50f23d9 491 /* Session daemon status message are handled in the following call. */
2527bf85 492 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
4222116f
JR
493 msg.u.relayd_sock.type, ctx, sock,
494 consumer_sockpoll, msg.u.relayd_sock.session_id,
495 msg.u.relayd_sock.relayd_session_id, major,
496 minor, protocol);
00e2e675
DG
497 goto end_nosignal;
498 }
3bd1e081
MD
499 case LTTNG_CONSUMER_ADD_CHANNEL:
500 {
501 struct lttng_consumer_channel *new_channel;
afbf29db 502 int ret_send_status, ret_add_channel = 0;
d2956687 503 const uint64_t chunk_id = msg.u.channel.chunk_id.value;
3bd1e081 504
9ce5646a
MD
505 health_code_update();
506
f50f23d9 507 /* First send a status message before receiving the fds. */
0c5b3718
SM
508 ret_send_status = consumer_send_status_msg(sock, ret_code);
509 if (ret_send_status < 0) {
f50f23d9 510 /* Somehow, the session daemon is not responding anymore. */
1803a064 511 goto error_fatal;
f50f23d9 512 }
9ce5646a
MD
513
514 health_code_update();
515
d88aee68 516 DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
3bd1e081 517 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
d2956687
JG
518 msg.u.channel.session_id,
519 msg.u.channel.chunk_id.is_set ?
520 &chunk_id : NULL,
521 msg.u.channel.pathname,
522 msg.u.channel.name,
1624d5b7
JD
523 msg.u.channel.relayd_id, msg.u.channel.output,
524 msg.u.channel.tracefile_size,
1950109e 525 msg.u.channel.tracefile_count, 0,
ecc48a90 526 msg.u.channel.monitor,
d7ba1388 527 msg.u.channel.live_timer_interval,
a2814ea7 528 msg.u.channel.is_live,
3d071855 529 NULL, NULL);
3bd1e081 530 if (new_channel == NULL) {
f73fabfd 531 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
3bd1e081
MD
532 goto end_nosignal;
533 }
ffe60014 534 new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
95a1109b
JD
535 switch (msg.u.channel.output) {
536 case LTTNG_EVENT_SPLICE:
537 new_channel->output = CONSUMER_CHANNEL_SPLICE;
538 break;
539 case LTTNG_EVENT_MMAP:
540 new_channel->output = CONSUMER_CHANNEL_MMAP;
541 break;
542 default:
543 ERR("Channel output unknown %d", msg.u.channel.output);
544 goto end_nosignal;
545 }
ffe60014
DG
546
547 /* Translate and save channel type. */
548 switch (msg.u.channel.type) {
549 case CONSUMER_CHANNEL_TYPE_DATA:
550 case CONSUMER_CHANNEL_TYPE_METADATA:
97535efa 551 new_channel->type = (consumer_channel_type) msg.u.channel.type;
ffe60014
DG
552 break;
553 default:
a0377dfe 554 abort();
ffe60014
DG
555 goto end_nosignal;
556 };
557
9ce5646a
MD
558 health_code_update();
559
3bd1e081 560 if (ctx->on_recv_channel != NULL) {
0c5b3718
SM
561 int ret_recv_channel =
562 ctx->on_recv_channel(new_channel);
563 if (ret_recv_channel == 0) {
564 ret_add_channel = consumer_add_channel(
565 new_channel, ctx);
566 } else if (ret_recv_channel < 0) {
3bd1e081
MD
567 goto end_nosignal;
568 }
569 } else {
0c5b3718
SM
570 ret_add_channel =
571 consumer_add_channel(new_channel, ctx);
3bd1e081 572 }
0c5b3718
SM
573 if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA &&
574 !ret_add_channel) {
e9404c27
JG
575 int monitor_start_ret;
576
577 DBG("Consumer starting monitor timer");
94d49140
JD
578 consumer_timer_live_start(new_channel,
579 msg.u.channel.live_timer_interval);
e9404c27
JG
580 monitor_start_ret = consumer_timer_monitor_start(
581 new_channel,
582 msg.u.channel.monitor_timer_interval);
583 if (monitor_start_ret < 0) {
584 ERR("Starting channel monitoring timer failed");
585 goto end_nosignal;
586 }
94d49140 587 }
e43c41c5 588
9ce5646a
MD
589 health_code_update();
590
e43c41c5 591 /* If we received an error in add_channel, we need to report it. */
0c5b3718
SM
592 if (ret_add_channel < 0) {
593 ret_send_status = consumer_send_status_msg(
594 sock, ret_add_channel);
595 if (ret_send_status < 0) {
1803a064
MD
596 goto error_fatal;
597 }
e43c41c5
JD
598 goto end_nosignal;
599 }
600
3bd1e081
MD
601 goto end_nosignal;
602 }
603 case LTTNG_CONSUMER_ADD_STREAM:
604 {
dae10966
DG
605 int fd;
606 struct lttng_pipe *stream_pipe;
00e2e675 607 struct lttng_consumer_stream *new_stream;
ffe60014 608 struct lttng_consumer_channel *channel;
c80048c6 609 int alloc_ret = 0;
0c5b3718
SM
610 int ret_send_status, ret_poll, ret_get_max_subbuf_size;
611 ssize_t ret_pipe_write, ret_recv;
3bd1e081 612
ffe60014
DG
613 /*
614 * Get stream's channel reference. Needed when adding the stream to the
615 * global hash table.
616 */
617 channel = consumer_find_channel(msg.u.stream.channel_key);
618 if (!channel) {
619 /*
620 * We could not find the channel. Can happen if cpu hotplug
621 * happens while tearing down.
622 */
d88aee68 623 ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
e462382a 624 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
ffe60014
DG
625 }
626
9ce5646a
MD
627 health_code_update();
628
f50f23d9 629 /* First send a status message before receiving the fds. */
0c5b3718
SM
630 ret_send_status = consumer_send_status_msg(sock, ret_code);
631 if (ret_send_status < 0) {
d771f832 632 /* Somehow, the session daemon is not responding anymore. */
c5c7998f 633 goto error_add_stream_fatal;
1803a064 634 }
9ce5646a
MD
635
636 health_code_update();
637
0c759fc9 638 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
d771f832 639 /* Channel was not found. */
c5c7998f 640 goto error_add_stream_nosignal;
f50f23d9
DG
641 }
642
d771f832 643 /* Blocking call */
9ce5646a 644 health_poll_entry();
0c5b3718 645 ret_poll = lttng_consumer_poll_socket(consumer_sockpoll);
9ce5646a 646 health_poll_exit();
0c5b3718 647 if (ret_poll) {
c5c7998f 648 goto error_add_stream_fatal;
3bd1e081 649 }
00e2e675 650
9ce5646a
MD
651 health_code_update();
652
00e2e675 653 /* Get stream file descriptor from socket */
0c5b3718
SM
654 ret_recv = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
655 if (ret_recv != sizeof(fd)) {
f73fabfd 656 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
0c5b3718 657 ret_func = ret_recv;
c5c7998f 658 goto end;
3bd1e081 659 }
3bd1e081 660
9ce5646a
MD
661 health_code_update();
662
f50f23d9
DG
663 /*
664 * Send status code to session daemon only if the recv works. If the
665 * above recv() failed, the session daemon is notified through the
666 * error socket and the teardown is eventually done.
667 */
0c5b3718
SM
668 ret_send_status = consumer_send_status_msg(sock, ret_code);
669 if (ret_send_status < 0) {
f50f23d9 670 /* Somehow, the session daemon is not responding anymore. */
c5c7998f 671 goto error_add_stream_nosignal;
f50f23d9
DG
672 }
673
9ce5646a
MD
674 health_code_update();
675
d2956687 676 pthread_mutex_lock(&channel->lock);
6f9449c2 677 new_stream = consumer_stream_create(
49f45573
JG
678 channel,
679 channel->key,
ffe60014 680 fd,
ffe60014 681 channel->name,
ffe60014
DG
682 channel->relayd_id,
683 channel->session_id,
d2956687 684 channel->trace_chunk,
ffe60014
DG
685 msg.u.stream.cpu,
686 &alloc_ret,
4891ece8 687 channel->type,
d2956687 688 channel->monitor);
3bd1e081 689 if (new_stream == NULL) {
c80048c6
MD
690 switch (alloc_ret) {
691 case -ENOMEM:
692 case -EINVAL:
693 default:
694 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
695 break;
c80048c6 696 }
d2956687 697 pthread_mutex_unlock(&channel->lock);
c5c7998f 698 goto error_add_stream_nosignal;
3bd1e081 699 }
d771f832 700
ffe60014 701 new_stream->wait_fd = fd;
0c5b3718
SM
702 ret_get_max_subbuf_size = kernctl_get_max_subbuf_size(
703 new_stream->wait_fd, &new_stream->max_sb_size);
704 if (ret_get_max_subbuf_size < 0) {
d05185fa
JG
705 pthread_mutex_unlock(&channel->lock);
706 ERR("Failed to get kernel maximal subbuffer size");
c5c7998f 707 goto error_add_stream_nosignal;
d05185fa
JG
708 }
709
d9a2e16e
JD
710 consumer_stream_update_channel_attributes(new_stream,
711 channel);
00e2e675 712
a0c83db9
DG
713 /*
714 * We've just assigned the channel to the stream so increment the
07b86b52
JD
715 * refcount right now. We don't need to increment the refcount for
716 * streams in no monitor because we handle manually the cleanup of
717 * those. It is very important to make sure there is NO prior
718 * consumer_del_stream() calls or else the refcount will be unbalanced.
a0c83db9 719 */
07b86b52
JD
720 if (channel->monitor) {
721 uatomic_inc(&new_stream->chan->refcount);
722 }
9d9353f9 723
fb3a43a9
DG
724 /*
725 * The buffer flush is done on the session daemon side for the kernel
726 * so no need for the stream "hangup_flush_done" variable to be
727 * tracked. This is important for a kernel stream since we don't rely
728 * on the flush state of the stream to read data. It's not the case for
729 * user space tracing.
730 */
731 new_stream->hangup_flush_done = 0;
732
9ce5646a
MD
733 health_code_update();
734
d2956687 735 pthread_mutex_lock(&new_stream->lock);
633d0084 736 if (ctx->on_recv_stream) {
0c5b3718
SM
737 int ret_recv_stream = ctx->on_recv_stream(new_stream);
738 if (ret_recv_stream < 0) {
d2956687
JG
739 pthread_mutex_unlock(&new_stream->lock);
740 pthread_mutex_unlock(&channel->lock);
d771f832 741 consumer_stream_free(new_stream);
c5c7998f 742 goto error_add_stream_nosignal;
fb3a43a9 743 }
633d0084 744 }
9ce5646a
MD
745 health_code_update();
746
07b86b52
JD
747 if (new_stream->metadata_flag) {
748 channel->metadata_stream = new_stream;
749 }
750
2bba9e53
DG
751 /* Do not monitor this stream. */
752 if (!channel->monitor) {
5eecee74 753 DBG("Kernel consumer add stream %s in no monitor mode with "
6dc3064a 754 "relayd id %" PRIu64, new_stream->name,
5eecee74 755 new_stream->net_seq_idx);
10a50311 756 cds_list_add(&new_stream->send_node, &channel->streams.head);
d2956687
JG
757 pthread_mutex_unlock(&new_stream->lock);
758 pthread_mutex_unlock(&channel->lock);
c5c7998f 759 goto end_add_stream;
6dc3064a
DG
760 }
761
e1b71bdc
DG
762 /* Send stream to relayd if the stream has an ID. */
763 if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
0c5b3718
SM
764 int ret_send_relayd_stream;
765
766 ret_send_relayd_stream = consumer_send_relayd_stream(
767 new_stream, new_stream->chan->pathname);
768 if (ret_send_relayd_stream < 0) {
d2956687
JG
769 pthread_mutex_unlock(&new_stream->lock);
770 pthread_mutex_unlock(&channel->lock);
e1b71bdc 771 consumer_stream_free(new_stream);
c5c7998f 772 goto error_add_stream_nosignal;
e1b71bdc 773 }
001b7e62
MD
774
775 /*
776 * If adding an extra stream to an already
777 * existing channel (e.g. cpu hotplug), we need
778 * to send the "streams_sent" command to relayd.
779 */
780 if (channel->streams_sent_to_relayd) {
0c5b3718
SM
781 int ret_send_relayd_streams_sent;
782
783 ret_send_relayd_streams_sent =
784 consumer_send_relayd_streams_sent(
785 new_stream->net_seq_idx);
786 if (ret_send_relayd_streams_sent < 0) {
d2956687
JG
787 pthread_mutex_unlock(&new_stream->lock);
788 pthread_mutex_unlock(&channel->lock);
c5c7998f 789 goto error_add_stream_nosignal;
001b7e62
MD
790 }
791 }
e2039c7a 792 }
d2956687
JG
793 pthread_mutex_unlock(&new_stream->lock);
794 pthread_mutex_unlock(&channel->lock);
e2039c7a 795
50f8ae69 796 /* Get the right pipe where the stream will be sent. */
633d0084 797 if (new_stream->metadata_flag) {
66d583dc 798 consumer_add_metadata_stream(new_stream);
dae10966 799 stream_pipe = ctx->consumer_metadata_pipe;
3bd1e081 800 } else {
66d583dc 801 consumer_add_data_stream(new_stream);
dae10966 802 stream_pipe = ctx->consumer_data_pipe;
50f8ae69
DG
803 }
804
66d583dc 805 /* Visible to other threads */
5ab66908
MD
806 new_stream->globally_visible = 1;
807
9ce5646a
MD
808 health_code_update();
809
0c5b3718
SM
810 ret_pipe_write = lttng_pipe_write(
811 stream_pipe, &new_stream, sizeof(new_stream));
812 if (ret_pipe_write < 0) {
dae10966 813 ERR("Consumer write %s stream to pipe %d",
50f8ae69 814 new_stream->metadata_flag ? "metadata" : "data",
dae10966 815 lttng_pipe_get_writefd(stream_pipe));
5ab66908
MD
816 if (new_stream->metadata_flag) {
817 consumer_del_stream_for_metadata(new_stream);
818 } else {
819 consumer_del_stream_for_data(new_stream);
820 }
c5c7998f 821 goto error_add_stream_nosignal;
3bd1e081 822 }
00e2e675 823
02d02e31
JD
824 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
825 new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
c5c7998f 826end_add_stream:
3bd1e081 827 break;
c5c7998f
JG
828error_add_stream_nosignal:
829 goto end_nosignal;
830error_add_stream_fatal:
831 goto error_fatal;
3bd1e081 832 }
a4baae1b
JD
833 case LTTNG_CONSUMER_STREAMS_SENT:
834 {
835 struct lttng_consumer_channel *channel;
0c5b3718 836 int ret_send_status;
a4baae1b
JD
837
838 /*
839 * Get stream's channel reference. Needed when adding the stream to the
840 * global hash table.
841 */
842 channel = consumer_find_channel(msg.u.sent_streams.channel_key);
843 if (!channel) {
844 /*
845 * We could not find the channel. Can happen if cpu hotplug
846 * happens while tearing down.
847 */
848 ERR("Unable to find channel key %" PRIu64,
849 msg.u.sent_streams.channel_key);
e462382a 850 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
a4baae1b
JD
851 }
852
853 health_code_update();
854
855 /*
856 * Send status code to session daemon.
857 */
0c5b3718
SM
858 ret_send_status = consumer_send_status_msg(sock, ret_code);
859 if (ret_send_status < 0 ||
860 ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
a4baae1b 861 /* Somehow, the session daemon is not responding anymore. */
80d5a658 862 goto error_streams_sent_nosignal;
a4baae1b
JD
863 }
864
865 health_code_update();
866
867 /*
868 * We should not send this message if we don't monitor the
869 * streams in this channel.
870 */
871 if (!channel->monitor) {
80d5a658 872 goto end_error_streams_sent;
a4baae1b
JD
873 }
874
875 health_code_update();
876 /* Send stream to relayd if the stream has an ID. */
877 if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
0c5b3718
SM
878 int ret_send_relay_streams;
879
880 ret_send_relay_streams = consumer_send_relayd_streams_sent(
a4baae1b 881 msg.u.sent_streams.net_seq_idx);
0c5b3718 882 if (ret_send_relay_streams < 0) {
80d5a658 883 goto error_streams_sent_nosignal;
a4baae1b 884 }
001b7e62 885 channel->streams_sent_to_relayd = true;
a4baae1b 886 }
80d5a658 887end_error_streams_sent:
a4baae1b 888 break;
80d5a658
JG
889error_streams_sent_nosignal:
890 goto end_nosignal;
a4baae1b 891 }
3bd1e081
MD
892 case LTTNG_CONSUMER_UPDATE_STREAM:
893 {
3f8e211f
DG
894 rcu_read_unlock();
895 return -ENOSYS;
896 }
897 case LTTNG_CONSUMER_DESTROY_RELAYD:
898 {
a6ba4fe1 899 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
3f8e211f 900 struct consumer_relayd_sock_pair *relayd;
0c5b3718 901 int ret_send_status;
3f8e211f 902
a6ba4fe1 903 DBG("Kernel consumer destroying relayd %" PRIu64, index);
3f8e211f
DG
904
905 /* Get relayd reference if exists. */
a6ba4fe1 906 relayd = consumer_find_relayd(index);
3f8e211f 907 if (relayd == NULL) {
3448e266 908 DBG("Unable to find relayd %" PRIu64, index);
e462382a 909 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
3bd1e081 910 }
3f8e211f 911
a6ba4fe1
DG
912 /*
913 * Each relayd socket pair has a refcount of stream attached to it
914 * which tells if the relayd is still active or not depending on the
915 * refcount value.
916 *
917 * This will set the destroy flag of the relayd object and destroy it
918 * if the refcount reaches zero when called.
919 *
920 * The destroy can happen either here or when a stream fd hangs up.
921 */
f50f23d9
DG
922 if (relayd) {
923 consumer_flag_relayd_for_destroy(relayd);
924 }
925
9ce5646a
MD
926 health_code_update();
927
0c5b3718
SM
928 ret_send_status = consumer_send_status_msg(sock, ret_code);
929 if (ret_send_status < 0) {
f50f23d9 930 /* Somehow, the session daemon is not responding anymore. */
1803a064 931 goto error_fatal;
f50f23d9 932 }
3f8e211f 933
3f8e211f 934 goto end_nosignal;
3bd1e081 935 }
6d805429 936 case LTTNG_CONSUMER_DATA_PENDING:
53632229 937 {
0c5b3718 938 int32_t ret_data_pending;
6d805429 939 uint64_t id = msg.u.data_pending.session_id;
0c5b3718 940 ssize_t ret_send;
c8f59ee5 941
6d805429 942 DBG("Kernel consumer data pending command for id %" PRIu64, id);
c8f59ee5 943
0c5b3718 944 ret_data_pending = consumer_data_pending(id);
c8f59ee5 945
9ce5646a
MD
946 health_code_update();
947
c8f59ee5 948 /* Send back returned value to session daemon */
0c5b3718
SM
949 ret_send = lttcomm_send_unix_sock(sock, &ret_data_pending,
950 sizeof(ret_data_pending));
951 if (ret_send < 0) {
6d805429 952 PERROR("send data pending ret code");
1803a064 953 goto error_fatal;
c8f59ee5 954 }
f50f23d9
DG
955
956 /*
957 * No need to send back a status message since the data pending
958 * returned value is the response.
959 */
c8f59ee5 960 break;
53632229 961 }
6dc3064a
DG
962 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
963 {
3eb928aa
MD
964 struct lttng_consumer_channel *channel;
965 uint64_t key = msg.u.snapshot_channel.key;
0c5b3718 966 int ret_send_status;
3eb928aa
MD
967
968 channel = consumer_find_channel(key);
969 if (!channel) {
970 ERR("Channel %" PRIu64 " not found", key);
971 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
07b86b52 972 } else {
3eb928aa 973 if (msg.u.snapshot_channel.metadata == 1) {
0c5b3718
SM
974 int ret_snapshot;
975
976 ret_snapshot = lttng_kconsumer_snapshot_metadata(
977 channel, key,
3eb928aa 978 msg.u.snapshot_channel.pathname,
0c5b3718
SM
979 msg.u.snapshot_channel.relayd_id,
980 ctx);
981 if (ret_snapshot < 0) {
3eb928aa
MD
982 ERR("Snapshot metadata failed");
983 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
984 }
985 } else {
0c5b3718
SM
986 int ret_snapshot;
987
988 ret_snapshot = lttng_kconsumer_snapshot_channel(
989 channel, key,
3eb928aa
MD
990 msg.u.snapshot_channel.pathname,
991 msg.u.snapshot_channel.relayd_id,
0c5b3718
SM
992 msg.u.snapshot_channel
993 .nb_packets_per_stream,
3eb928aa 994 ctx);
0c5b3718 995 if (ret_snapshot < 0) {
3eb928aa
MD
996 ERR("Snapshot channel failed");
997 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
998 }
07b86b52
JD
999 }
1000 }
9ce5646a
MD
1001 health_code_update();
1002
0c5b3718
SM
1003 ret_send_status = consumer_send_status_msg(sock, ret_code);
1004 if (ret_send_status < 0) {
6dc3064a
DG
1005 /* Somehow, the session daemon is not responding anymore. */
1006 goto end_nosignal;
1007 }
1008 break;
1009 }
07b86b52
JD
1010 case LTTNG_CONSUMER_DESTROY_CHANNEL:
1011 {
1012 uint64_t key = msg.u.destroy_channel.key;
1013 struct lttng_consumer_channel *channel;
0c5b3718 1014 int ret_send_status;
07b86b52
JD
1015
1016 channel = consumer_find_channel(key);
1017 if (!channel) {
1018 ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
e462382a 1019 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
07b86b52
JD
1020 }
1021
9ce5646a
MD
1022 health_code_update();
1023
0c5b3718
SM
1024 ret_send_status = consumer_send_status_msg(sock, ret_code);
1025 if (ret_send_status < 0) {
07b86b52 1026 /* Somehow, the session daemon is not responding anymore. */
a9d36096 1027 goto end_destroy_channel;
07b86b52
JD
1028 }
1029
9ce5646a
MD
1030 health_code_update();
1031
15dc512a
DG
1032 /* Stop right now if no channel was found. */
1033 if (!channel) {
a9d36096 1034 goto end_destroy_channel;
15dc512a
DG
1035 }
1036
07b86b52
JD
1037 /*
1038 * This command should ONLY be issued for channel with streams set in
1039 * no monitor mode.
1040 */
a0377dfe 1041 LTTNG_ASSERT(!channel->monitor);
07b86b52
JD
1042
1043 /*
1044 * The refcount should ALWAYS be 0 in the case of a channel in no
1045 * monitor mode.
1046 */
a0377dfe 1047 LTTNG_ASSERT(!uatomic_sub_return(&channel->refcount, 1));
07b86b52
JD
1048
1049 consumer_del_channel(channel);
a9d36096 1050end_destroy_channel:
07b86b52
JD
1051 goto end_nosignal;
1052 }
fb83fe64
JD
1053 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1054 {
66ab32be
JD
1055 ssize_t ret;
1056 uint64_t count;
fb83fe64
JD
1057 struct lttng_consumer_channel *channel;
1058 uint64_t id = msg.u.discarded_events.session_id;
1059 uint64_t key = msg.u.discarded_events.channel_key;
1060
e5742757
MD
1061 DBG("Kernel consumer discarded events command for session id %"
1062 PRIu64 ", channel key %" PRIu64, id, key);
1063
fb83fe64
JD
1064 channel = consumer_find_channel(key);
1065 if (!channel) {
1066 ERR("Kernel consumer discarded events channel %"
1067 PRIu64 " not found", key);
66ab32be 1068 count = 0;
e5742757 1069 } else {
66ab32be 1070 count = channel->discarded_events;
fb83fe64
JD
1071 }
1072
fb83fe64
JD
1073 health_code_update();
1074
1075 /* Send back returned value to session daemon */
66ab32be 1076 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
fb83fe64
JD
1077 if (ret < 0) {
1078 PERROR("send discarded events");
1079 goto error_fatal;
1080 }
1081
1082 break;
1083 }
1084 case LTTNG_CONSUMER_LOST_PACKETS:
1085 {
66ab32be
JD
1086 ssize_t ret;
1087 uint64_t count;
fb83fe64
JD
1088 struct lttng_consumer_channel *channel;
1089 uint64_t id = msg.u.lost_packets.session_id;
1090 uint64_t key = msg.u.lost_packets.channel_key;
1091
e5742757
MD
1092 DBG("Kernel consumer lost packets command for session id %"
1093 PRIu64 ", channel key %" PRIu64, id, key);
1094
fb83fe64
JD
1095 channel = consumer_find_channel(key);
1096 if (!channel) {
1097 ERR("Kernel consumer lost packets channel %"
1098 PRIu64 " not found", key);
66ab32be 1099 count = 0;
e5742757 1100 } else {
66ab32be 1101 count = channel->lost_packets;
fb83fe64
JD
1102 }
1103
fb83fe64
JD
1104 health_code_update();
1105
1106 /* Send back returned value to session daemon */
66ab32be 1107 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
fb83fe64
JD
1108 if (ret < 0) {
1109 PERROR("send lost packets");
1110 goto error_fatal;
1111 }
1112
1113 break;
1114 }
b3530820
JG
1115 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1116 {
1117 int channel_monitor_pipe;
0c5b3718
SM
1118 int ret_send_status, ret_set_channel_monitor_pipe;
1119 ssize_t ret_recv;
b3530820
JG
1120
1121 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1122 /* Successfully received the command's type. */
0c5b3718
SM
1123 ret_send_status = consumer_send_status_msg(sock, ret_code);
1124 if (ret_send_status < 0) {
b3530820
JG
1125 goto error_fatal;
1126 }
1127
0c5b3718
SM
1128 ret_recv = lttcomm_recv_fds_unix_sock(
1129 sock, &channel_monitor_pipe, 1);
1130 if (ret_recv != sizeof(channel_monitor_pipe)) {
b3530820
JG
1131 ERR("Failed to receive channel monitor pipe");
1132 goto error_fatal;
1133 }
1134
1135 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
0c5b3718
SM
1136 ret_set_channel_monitor_pipe =
1137 consumer_timer_thread_set_channel_monitor_pipe(
1138 channel_monitor_pipe);
1139 if (!ret_set_channel_monitor_pipe) {
b3530820 1140 int flags;
0c5b3718 1141 int ret_fcntl;
b3530820
JG
1142
1143 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1144 /* Set the pipe as non-blocking. */
0c5b3718
SM
1145 ret_fcntl = fcntl(channel_monitor_pipe, F_GETFL, 0);
1146 if (ret_fcntl == -1) {
b3530820
JG
1147 PERROR("fcntl get flags of the channel monitoring pipe");
1148 goto error_fatal;
1149 }
0c5b3718 1150 flags = ret_fcntl;
b3530820 1151
0c5b3718 1152 ret_fcntl = fcntl(channel_monitor_pipe, F_SETFL,
b3530820 1153 flags | O_NONBLOCK);
0c5b3718 1154 if (ret_fcntl == -1) {
b3530820
JG
1155 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1156 goto error_fatal;
1157 }
1158 DBG("Channel monitor pipe set as non-blocking");
1159 } else {
1160 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1161 }
0c5b3718
SM
1162 ret_send_status = consumer_send_status_msg(sock, ret_code);
1163 if (ret_send_status < 0) {
b3530820
JG
1164 goto error_fatal;
1165 }
1166 break;
1167 }
b99a8d42
JD
1168 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1169 {
92b7a7f8
MD
1170 struct lttng_consumer_channel *channel;
1171 uint64_t key = msg.u.rotate_channel.key;
0c5b3718 1172 int ret_send_status;
b99a8d42 1173
92b7a7f8 1174 DBG("Consumer rotate channel %" PRIu64, key);
b99a8d42 1175
92b7a7f8
MD
1176 channel = consumer_find_channel(key);
1177 if (!channel) {
1178 ERR("Channel %" PRIu64 " not found", key);
1179 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1180 } else {
1181 /*
1182 * Sample the rotate position of all the streams in this channel.
1183 */
0c5b3718
SM
1184 int ret_rotate_channel;
1185
1186 ret_rotate_channel = lttng_consumer_rotate_channel(
1187 channel, key,
92b7a7f8 1188 msg.u.rotate_channel.relayd_id,
0c5b3718
SM
1189 msg.u.rotate_channel.metadata, ctx);
1190 if (ret_rotate_channel < 0) {
92b7a7f8
MD
1191 ERR("Rotate channel failed");
1192 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
1193 }
b99a8d42 1194
92b7a7f8
MD
1195 health_code_update();
1196 }
0c5b3718
SM
1197
1198 ret_send_status = consumer_send_status_msg(sock, ret_code);
1199 if (ret_send_status < 0) {
b99a8d42 1200 /* Somehow, the session daemon is not responding anymore. */
713bdd26 1201 goto error_rotate_channel;
b99a8d42 1202 }
92b7a7f8
MD
1203 if (channel) {
1204 /* Rotate the streams that are ready right now. */
0c5b3718
SM
1205 int ret_rotate;
1206
1207 ret_rotate = lttng_consumer_rotate_ready_streams(
92b7a7f8 1208 channel, key, ctx);
0c5b3718 1209 if (ret_rotate < 0) {
92b7a7f8
MD
1210 ERR("Rotate ready streams failed");
1211 }
b99a8d42 1212 }
b99a8d42 1213 break;
713bdd26
JG
1214error_rotate_channel:
1215 goto end_nosignal;
b99a8d42 1216 }
5f3aff8b
MD
1217 case LTTNG_CONSUMER_CLEAR_CHANNEL:
1218 {
1219 struct lttng_consumer_channel *channel;
1220 uint64_t key = msg.u.clear_channel.key;
0c5b3718 1221 int ret_send_status;
5f3aff8b
MD
1222
1223 channel = consumer_find_channel(key);
1224 if (!channel) {
1225 DBG("Channel %" PRIu64 " not found", key);
1226 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1227 } else {
0c5b3718
SM
1228 int ret_clear_channel;
1229
1230 ret_clear_channel =
1231 lttng_consumer_clear_channel(channel);
1232 if (ret_clear_channel) {
5f3aff8b 1233 ERR("Clear channel failed");
97535efa 1234 ret_code = (lttcomm_return_code) ret_clear_channel;
5f3aff8b
MD
1235 }
1236
1237 health_code_update();
1238 }
0c5b3718
SM
1239
1240 ret_send_status = consumer_send_status_msg(sock, ret_code);
1241 if (ret_send_status < 0) {
5f3aff8b
MD
1242 /* Somehow, the session daemon is not responding anymore. */
1243 goto end_nosignal;
1244 }
1245
1246 break;
1247 }
d2956687 1248 case LTTNG_CONSUMER_INIT:
00fb02ac 1249 {
0c5b3718
SM
1250 int ret_send_status;
1251
d2956687
JG
1252 ret_code = lttng_consumer_init_command(ctx,
1253 msg.u.init.sessiond_uuid);
00fb02ac 1254 health_code_update();
0c5b3718
SM
1255 ret_send_status = consumer_send_status_msg(sock, ret_code);
1256 if (ret_send_status < 0) {
00fb02ac
JD
1257 /* Somehow, the session daemon is not responding anymore. */
1258 goto end_nosignal;
1259 }
1260 break;
1261 }
d2956687 1262 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
d88744a4 1263 {
d2956687 1264 const struct lttng_credentials credentials = {
ff588497
JR
1265 .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid),
1266 .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid),
d2956687
JG
1267 };
1268 const bool is_local_trace =
1269 !msg.u.create_trace_chunk.relayd_id.is_set;
1270 const uint64_t relayd_id =
1271 msg.u.create_trace_chunk.relayd_id.value;
1272 const char *chunk_override_name =
1273 *msg.u.create_trace_chunk.override_name ?
1274 msg.u.create_trace_chunk.override_name :
1275 NULL;
cbf53d23 1276 struct lttng_directory_handle *chunk_directory_handle = NULL;
d88744a4 1277
d2956687
JG
1278 /*
1279 * The session daemon will only provide a chunk directory file
1280 * descriptor for local traces.
1281 */
1282 if (is_local_trace) {
1283 int chunk_dirfd;
0c5b3718
SM
1284 int ret_send_status;
1285 ssize_t ret_recv;
19990ed5 1286
d2956687 1287 /* Acnowledge the reception of the command. */
0c5b3718
SM
1288 ret_send_status = consumer_send_status_msg(
1289 sock, LTTCOMM_CONSUMERD_SUCCESS);
1290 if (ret_send_status < 0) {
d2956687
JG
1291 /* Somehow, the session daemon is not responding anymore. */
1292 goto end_nosignal;
1293 }
92816cc3 1294
0c5b3718
SM
1295 ret_recv = lttcomm_recv_fds_unix_sock(
1296 sock, &chunk_dirfd, 1);
1297 if (ret_recv != sizeof(chunk_dirfd)) {
d2956687
JG
1298 ERR("Failed to receive trace chunk directory file descriptor");
1299 goto error_fatal;
1300 }
92816cc3 1301
d2956687
JG
1302 DBG("Received trace chunk directory fd (%d)",
1303 chunk_dirfd);
cbf53d23 1304 chunk_directory_handle = lttng_directory_handle_create_from_dirfd(
d2956687 1305 chunk_dirfd);
cbf53d23 1306 if (!chunk_directory_handle) {
d2956687
JG
1307 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1308 if (close(chunk_dirfd)) {
1309 PERROR("Failed to close chunk directory file descriptor");
1310 }
1311 goto error_fatal;
1312 }
92816cc3
JG
1313 }
1314
d2956687
JG
1315 ret_code = lttng_consumer_create_trace_chunk(
1316 !is_local_trace ? &relayd_id : NULL,
1317 msg.u.create_trace_chunk.session_id,
1318 msg.u.create_trace_chunk.chunk_id,
e5add6d0
JG
1319 (time_t) msg.u.create_trace_chunk
1320 .creation_timestamp,
d2956687 1321 chunk_override_name,
e5add6d0
JG
1322 msg.u.create_trace_chunk.credentials.is_set ?
1323 &credentials :
1324 NULL,
cbf53d23
JG
1325 chunk_directory_handle);
1326 lttng_directory_handle_put(chunk_directory_handle);
d2956687 1327 goto end_msg_sessiond;
d88744a4 1328 }
d2956687 1329 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
a1ae2ea5 1330 {
bbc4768c 1331 enum lttng_trace_chunk_command_type close_command =
97535efa 1332 (lttng_trace_chunk_command_type) msg.u.close_trace_chunk.close_command.value;
d2956687
JG
1333 const uint64_t relayd_id =
1334 msg.u.close_trace_chunk.relayd_id.value;
ecd1a12f
MD
1335 struct lttcomm_consumer_close_trace_chunk_reply reply;
1336 char path[LTTNG_PATH_MAX];
0c5b3718 1337 ssize_t ret_send;
d2956687
JG
1338
1339 ret_code = lttng_consumer_close_trace_chunk(
1340 msg.u.close_trace_chunk.relayd_id.is_set ?
bbc4768c
JG
1341 &relayd_id :
1342 NULL,
d2956687
JG
1343 msg.u.close_trace_chunk.session_id,
1344 msg.u.close_trace_chunk.chunk_id,
bbc4768c
JG
1345 (time_t) msg.u.close_trace_chunk.close_timestamp,
1346 msg.u.close_trace_chunk.close_command.is_set ?
1347 &close_command :
ecd1a12f
MD
1348 NULL, path);
1349 reply.ret_code = ret_code;
1350 reply.path_length = strlen(path) + 1;
0c5b3718
SM
1351 ret_send = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
1352 if (ret_send != sizeof(reply)) {
ecd1a12f
MD
1353 goto error_fatal;
1354 }
0c5b3718
SM
1355 ret_send = lttcomm_send_unix_sock(
1356 sock, path, reply.path_length);
1357 if (ret_send != reply.path_length) {
ecd1a12f
MD
1358 goto error_fatal;
1359 }
1360 goto end_nosignal;
3654ed19 1361 }
d2956687 1362 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
3654ed19 1363 {
d2956687
JG
1364 const uint64_t relayd_id =
1365 msg.u.trace_chunk_exists.relayd_id.value;
1366
1367 ret_code = lttng_consumer_trace_chunk_exists(
1368 msg.u.trace_chunk_exists.relayd_id.is_set ?
1369 &relayd_id : NULL,
1370 msg.u.trace_chunk_exists.session_id,
1371 msg.u.trace_chunk_exists.chunk_id);
1372 goto end_msg_sessiond;
a1ae2ea5 1373 }
04ed9e10
JG
1374 case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS:
1375 {
1376 const uint64_t key = msg.u.open_channel_packets.key;
1377 struct lttng_consumer_channel *channel =
1378 consumer_find_channel(key);
1379
1380 if (channel) {
1381 pthread_mutex_lock(&channel->lock);
1382 ret_code = lttng_consumer_open_channel_packets(channel);
1383 pthread_mutex_unlock(&channel->lock);
1384 } else {
1385 WARN("Channel %" PRIu64 " not found", key);
1386 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1387 }
1388
1389 health_code_update();
1390 goto end_msg_sessiond;
1391 }
3bd1e081 1392 default:
3f8e211f 1393 goto end_nosignal;
3bd1e081 1394 }
3f8e211f 1395
3bd1e081 1396end_nosignal:
4cbc1a04
DG
1397 /*
1398 * Return 1 to indicate success since the 0 value can be a socket
1399 * shutdown during the recv() or send() call.
1400 */
0c5b3718 1401 ret_func = 1;
c5c7998f
JG
1402 goto end;
1403error_fatal:
1404 /* This will issue a consumer stop. */
0c5b3718 1405 ret_func = -1;
c5c7998f 1406 goto end;
d2956687
JG
1407end_msg_sessiond:
1408 /*
1409 * The returned value here is not useful since either way we'll return 1 to
1410 * the caller because the session daemon socket management is done
1411 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1412 */
0c5b3718
SM
1413 {
1414 int ret_send_status;
1415
1416 ret_send_status = consumer_send_status_msg(sock, ret_code);
1417 if (ret_send_status < 0) {
1418 goto error_fatal;
1419 }
d2956687 1420 }
0c5b3718
SM
1421
1422 ret_func = 1;
1423
c5c7998f 1424end:
d2956687 1425 health_code_update();
1803a064 1426 rcu_read_unlock();
0c5b3718 1427 return ret_func;
3bd1e081 1428}
d41f73b7 1429
94d49140
JD
1430/*
1431 * Sync metadata meaning request them to the session daemon and snapshot to the
1432 * metadata thread can consumer them.
1433 *
1434 * Metadata stream lock MUST be acquired.
94d49140 1435 */
577eea73
JG
1436enum sync_metadata_status lttng_kconsumer_sync_metadata(
1437 struct lttng_consumer_stream *metadata)
94d49140
JD
1438{
1439 int ret;
577eea73 1440 enum sync_metadata_status status;
94d49140 1441
a0377dfe 1442 LTTNG_ASSERT(metadata);
94d49140
JD
1443
1444 ret = kernctl_buffer_flush(metadata->wait_fd);
1445 if (ret < 0) {
1446 ERR("Failed to flush kernel stream");
577eea73 1447 status = SYNC_METADATA_STATUS_ERROR;
94d49140
JD
1448 goto end;
1449 }
1450
1451 ret = kernctl_snapshot(metadata->wait_fd);
1452 if (ret < 0) {
577eea73
JG
1453 if (errno == EAGAIN) {
1454 /* No new metadata, exit. */
1455 DBG("Sync metadata, no new kernel metadata");
1456 status = SYNC_METADATA_STATUS_NO_DATA;
1457 } else {
94d49140 1458 ERR("Sync metadata, taking kernel snapshot failed.");
577eea73 1459 status = SYNC_METADATA_STATUS_ERROR;
94d49140 1460 }
577eea73
JG
1461 } else {
1462 status = SYNC_METADATA_STATUS_NEW_DATA;
94d49140
JD
1463 }
1464
1465end:
577eea73 1466 return status;
94d49140 1467}
309167d2 1468
fb83fe64 1469static
6f9449c2
JG
1470int extract_common_subbuffer_info(struct lttng_consumer_stream *stream,
1471 struct stream_subbuffer *subbuf)
fb83fe64
JD
1472{
1473 int ret;
fb83fe64 1474
6f9449c2
JG
1475 ret = kernctl_get_subbuf_size(
1476 stream->wait_fd, &subbuf->info.data.subbuf_size);
1477 if (ret) {
fb83fe64
JD
1478 goto end;
1479 }
fb83fe64 1480
6f9449c2
JG
1481 ret = kernctl_get_padded_subbuf_size(
1482 stream->wait_fd, &subbuf->info.data.padded_subbuf_size);
1483 if (ret) {
fb83fe64
JD
1484 goto end;
1485 }
fb83fe64
JD
1486
1487end:
1488 return ret;
1489}
1490
93ec662e 1491static
6f9449c2
JG
1492int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream,
1493 struct stream_subbuffer *subbuf)
93ec662e
JD
1494{
1495 int ret;
93ec662e 1496
6f9449c2
JG
1497 ret = extract_common_subbuffer_info(stream, subbuf);
1498 if (ret) {
93ec662e
JD
1499 goto end;
1500 }
1501
6f9449c2
JG
1502 ret = kernctl_get_metadata_version(
1503 stream->wait_fd, &subbuf->info.metadata.version);
1504 if (ret) {
93ec662e
JD
1505 goto end;
1506 }
1507
93ec662e
JD
1508end:
1509 return ret;
1510}
1511
6f9449c2
JG
1512static
1513int extract_data_subbuffer_info(struct lttng_consumer_stream *stream,
1514 struct stream_subbuffer *subbuf)
d41f73b7 1515{
6f9449c2 1516 int ret;
d41f73b7 1517
6f9449c2
JG
1518 ret = extract_common_subbuffer_info(stream, subbuf);
1519 if (ret) {
1520 goto end;
1521 }
309167d2 1522
6f9449c2
JG
1523 ret = kernctl_get_packet_size(
1524 stream->wait_fd, &subbuf->info.data.packet_size);
1525 if (ret < 0) {
1526 PERROR("Failed to get sub-buffer packet size");
1527 goto end;
1528 }
02d02e31 1529
6f9449c2
JG
1530 ret = kernctl_get_content_size(
1531 stream->wait_fd, &subbuf->info.data.content_size);
1532 if (ret < 0) {
1533 PERROR("Failed to get sub-buffer content size");
1534 goto end;
d41f73b7
MD
1535 }
1536
6f9449c2
JG
1537 ret = kernctl_get_timestamp_begin(
1538 stream->wait_fd, &subbuf->info.data.timestamp_begin);
1539 if (ret < 0) {
1540 PERROR("Failed to get sub-buffer begin timestamp");
1541 goto end;
1d4dfdef
DG
1542 }
1543
6f9449c2
JG
1544 ret = kernctl_get_timestamp_end(
1545 stream->wait_fd, &subbuf->info.data.timestamp_end);
1546 if (ret < 0) {
1547 PERROR("Failed to get sub-buffer end timestamp");
1548 goto end;
1549 }
1550
1551 ret = kernctl_get_events_discarded(
1552 stream->wait_fd, &subbuf->info.data.events_discarded);
1553 if (ret) {
1554 PERROR("Failed to get sub-buffer events discarded count");
1555 goto end;
1556 }
1557
1558 ret = kernctl_get_sequence_number(stream->wait_fd,
1559 &subbuf->info.data.sequence_number.value);
1560 if (ret) {
1561 /* May not be supported by older LTTng-modules. */
1562 if (ret != -ENOTTY) {
1563 PERROR("Failed to get sub-buffer sequence number");
1564 goto end;
fb83fe64 1565 }
1c20f0e2 1566 } else {
6f9449c2 1567 subbuf->info.data.sequence_number.is_set = true;
309167d2
JD
1568 }
1569
6f9449c2
JG
1570 ret = kernctl_get_stream_id(
1571 stream->wait_fd, &subbuf->info.data.stream_id);
1572 if (ret < 0) {
1573 PERROR("Failed to get stream id");
1574 goto end;
1575 }
1d4dfdef 1576
6f9449c2
JG
1577 ret = kernctl_get_instance_id(stream->wait_fd,
1578 &subbuf->info.data.stream_instance_id.value);
1579 if (ret) {
1580 /* May not be supported by older LTTng-modules. */
1581 if (ret != -ENOTTY) {
1582 PERROR("Failed to get stream instance id");
1583 goto end;
1d4dfdef 1584 }
6f9449c2
JG
1585 } else {
1586 subbuf->info.data.stream_instance_id.is_set = true;
1587 }
1588end:
1589 return ret;
1590}
47e81c02 1591
6f9449c2 1592static
b6797c8e
JG
1593enum get_next_subbuffer_status get_subbuffer_common(
1594 struct lttng_consumer_stream *stream,
6f9449c2
JG
1595 struct stream_subbuffer *subbuffer)
1596{
1597 int ret;
b6797c8e 1598 enum get_next_subbuffer_status status;
6f9449c2
JG
1599
1600 ret = kernctl_get_next_subbuf(stream->wait_fd);
b6797c8e
JG
1601 switch (ret) {
1602 case 0:
1603 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1604 break;
1605 case -ENODATA:
1606 case -EAGAIN:
6e5e3c51
MD
1607 /*
1608 * The caller only expects -ENODATA when there is no data to
1609 * read, but the kernel tracer returns -EAGAIN when there is
1610 * currently no data for a non-finalized stream, and -ENODATA
1611 * when there is no data for a finalized stream. Those can be
1612 * combined into a -ENODATA return value.
1613 */
b6797c8e
JG
1614 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1615 goto end;
1616 default:
1617 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
6f9449c2
JG
1618 goto end;
1619 }
1620
1621 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
b6797c8e
JG
1622 stream, subbuffer);
1623 if (ret) {
1624 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1625 }
6f9449c2 1626end:
b6797c8e 1627 return status;
6f9449c2 1628}
128708c3 1629
6f9449c2 1630static
b6797c8e
JG
1631enum get_next_subbuffer_status get_next_subbuffer_splice(
1632 struct lttng_consumer_stream *stream,
6f9449c2
JG
1633 struct stream_subbuffer *subbuffer)
1634{
b6797c8e
JG
1635 const enum get_next_subbuffer_status status =
1636 get_subbuffer_common(stream, subbuffer);
1d4dfdef 1637
b6797c8e 1638 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
6f9449c2
JG
1639 goto end;
1640 }
1d4dfdef 1641
6f9449c2
JG
1642 subbuffer->buffer.fd = stream->wait_fd;
1643end:
b6797c8e 1644 return status;
6f9449c2 1645}
fd424d99 1646
6f9449c2 1647static
b6797c8e
JG
1648enum get_next_subbuffer_status get_next_subbuffer_mmap(
1649 struct lttng_consumer_stream *stream,
6f9449c2
JG
1650 struct stream_subbuffer *subbuffer)
1651{
1652 int ret;
b6797c8e 1653 enum get_next_subbuffer_status status;
6f9449c2
JG
1654 const char *addr;
1655
b6797c8e
JG
1656 status = get_subbuffer_common(stream, subbuffer);
1657 if (status != GET_NEXT_SUBBUFFER_STATUS_OK) {
6f9449c2 1658 goto end;
128708c3 1659 }
6f9449c2
JG
1660
1661 ret = get_current_subbuf_addr(stream, &addr);
1662 if (ret) {
b6797c8e 1663 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
6f9449c2 1664 goto end;
d41f73b7 1665 }
6f9449c2
JG
1666
1667 subbuffer->buffer.buffer = lttng_buffer_view_init(
1668 addr, 0, subbuffer->info.data.padded_subbuf_size);
1669end:
b6797c8e 1670 return status;
6f9449c2
JG
1671}
1672
f5ba75b4 1673static
b6797c8e 1674enum get_next_subbuffer_status get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
f5ba75b4
JG
1675 struct stream_subbuffer *subbuffer)
1676{
1677 int ret;
1678 const char *addr;
1679 bool coherent;
b6797c8e 1680 enum get_next_subbuffer_status status;
f5ba75b4
JG
1681
1682 ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
1683 &coherent);
1684 if (ret) {
1685 goto end;
1686 }
1687
1688 ret = stream->read_subbuffer_ops.extract_subbuffer_info(
1689 stream, subbuffer);
1690 if (ret) {
1691 goto end;
1692 }
1693
1694 LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
1695
1696 ret = get_current_subbuf_addr(stream, &addr);
1697 if (ret) {
1698 goto end;
1699 }
1700
1701 subbuffer->buffer.buffer = lttng_buffer_view_init(
1702 addr, 0, subbuffer->info.data.padded_subbuf_size);
1703 DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
1704 subbuffer->info.metadata.padded_subbuf_size,
1705 coherent ? "true" : "false");
1706end:
6e5e3c51
MD
1707 /*
1708 * The caller only expects -ENODATA when there is no data to read, but
1709 * the kernel tracer returns -EAGAIN when there is currently no data
1710 * for a non-finalized stream, and -ENODATA when there is no data for a
1711 * finalized stream. Those can be combined into a -ENODATA return value.
1712 */
b6797c8e
JG
1713 switch (ret) {
1714 case 0:
1715 status = GET_NEXT_SUBBUFFER_STATUS_OK;
1716 break;
1717 case -ENODATA:
1718 case -EAGAIN:
1719 /*
1720 * The caller only expects -ENODATA when there is no data to
1721 * read, but the kernel tracer returns -EAGAIN when there is
1722 * currently no data for a non-finalized stream, and -ENODATA
1723 * when there is no data for a finalized stream. Those can be
1724 * combined into a -ENODATA return value.
1725 */
1726 status = GET_NEXT_SUBBUFFER_STATUS_NO_DATA;
1727 break;
1728 default:
1729 status = GET_NEXT_SUBBUFFER_STATUS_ERROR;
1730 break;
6e5e3c51
MD
1731 }
1732
b6797c8e 1733 return status;
f5ba75b4
JG
1734}
1735
6f9449c2
JG
1736static
1737int put_next_subbuffer(struct lttng_consumer_stream *stream,
1738 struct stream_subbuffer *subbuffer)
1739{
1740 const int ret = kernctl_put_next_subbuf(stream->wait_fd);
1741
1742 if (ret) {
1743 if (ret == -EFAULT) {
1744 PERROR("Error in unreserving sub buffer");
1745 } else if (ret == -EIO) {
d41f73b7 1746 /* Should never happen with newer LTTng versions */
6f9449c2 1747 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted");
d41f73b7 1748 }
d41f73b7
MD
1749 }
1750
6f9449c2
JG
1751 return ret;
1752}
1c20f0e2 1753
f5ba75b4
JG
1754static
1755bool is_get_next_check_metadata_available(int tracer_fd)
1756{
741e787b
JG
1757 const int ret = kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL);
1758 const bool available = ret != -ENOTTY;
1759
1760 if (ret == 0) {
1761 /* get succeeded, make sure to put the subbuffer. */
1762 kernctl_put_subbuf(tracer_fd);
1763 }
1764
1765 return available;
f5ba75b4
JG
1766}
1767
091441eb
MD
1768static
1769int signal_metadata(struct lttng_consumer_stream *stream,
1770 struct lttng_consumer_local_data *ctx)
1771{
1772 ASSERT_LOCKED(stream->metadata_rdv_lock);
1773 return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
1774}
1775
f5ba75b4
JG
1776static
1777int lttng_kconsumer_set_stream_ops(
6f9449c2
JG
1778 struct lttng_consumer_stream *stream)
1779{
f5ba75b4
JG
1780 int ret = 0;
1781
1782 if (stream->metadata_flag && stream->chan->is_live) {
1783 DBG("Attempting to enable metadata bucketization for live consumers");
1784 if (is_get_next_check_metadata_available(stream->wait_fd)) {
1785 DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
1786 stream->read_subbuffer_ops.get_next_subbuffer =
1787 get_next_subbuffer_metadata_check;
1788 ret = consumer_stream_enable_metadata_bucketization(
1789 stream);
1790 if (ret) {
1791 goto end;
1792 }
1793 } else {
1794 /*
1795 * The kernel tracer version is too old to indicate
1796 * when the metadata stream has reached a "coherent"
1797 * (parseable) point.
1798 *
1799 * This means that a live viewer may see an incoherent
1800 * sequence of metadata and fail to parse it.
1801 */
1802 WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
1803 metadata_bucket_destroy(stream->metadata_bucket);
1804 stream->metadata_bucket = NULL;
1805 }
091441eb
MD
1806
1807 stream->read_subbuffer_ops.on_sleep = signal_metadata;
f5ba75b4
JG
1808 }
1809
1810 if (!stream->read_subbuffer_ops.get_next_subbuffer) {
1811 if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
1812 stream->read_subbuffer_ops.get_next_subbuffer =
1813 get_next_subbuffer_mmap;
1814 } else {
1815 stream->read_subbuffer_ops.get_next_subbuffer =
1816 get_next_subbuffer_splice;
1817 }
94d49140
JD
1818 }
1819
6f9449c2
JG
1820 if (stream->metadata_flag) {
1821 stream->read_subbuffer_ops.extract_subbuffer_info =
1822 extract_metadata_subbuffer_info;
1823 } else {
1824 stream->read_subbuffer_ops.extract_subbuffer_info =
1825 extract_data_subbuffer_info;
1826 if (stream->chan->is_live) {
1827 stream->read_subbuffer_ops.send_live_beacon =
1828 consumer_flush_kernel_index;
1829 }
309167d2
JD
1830 }
1831
6f9449c2 1832 stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
f5ba75b4
JG
1833end:
1834 return ret;
d41f73b7
MD
1835}
1836
1837int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1838{
1839 int ret;
ffe60014 1840
a0377dfe 1841 LTTNG_ASSERT(stream);
ffe60014 1842
2bba9e53 1843 /*
d2956687
JG
1844 * Don't create anything if this is set for streaming or if there is
1845 * no current trace chunk on the parent channel.
2bba9e53 1846 */
d2956687
JG
1847 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
1848 stream->chan->trace_chunk) {
1849 ret = consumer_stream_create_output_files(stream, true);
1850 if (ret) {
fe4477ee
JD
1851 goto error;
1852 }
ffe60014 1853 }
d41f73b7 1854
d41f73b7
MD
1855 if (stream->output == LTTNG_EVENT_MMAP) {
1856 /* get the len of the mmap region */
1857 unsigned long mmap_len;
1858
1859 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
1860 if (ret != 0) {
ffe60014 1861 PERROR("kernctl_get_mmap_len");
d41f73b7
MD
1862 goto error_close_fd;
1863 }
1864 stream->mmap_len = (size_t) mmap_len;
1865
ffe60014
DG
1866 stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
1867 MAP_PRIVATE, stream->wait_fd, 0);
d41f73b7 1868 if (stream->mmap_base == MAP_FAILED) {
ffe60014 1869 PERROR("Error mmaping");
d41f73b7
MD
1870 ret = -1;
1871 goto error_close_fd;
1872 }
1873 }
1874
f5ba75b4
JG
1875 ret = lttng_kconsumer_set_stream_ops(stream);
1876 if (ret) {
1877 goto error_close_fd;
1878 }
6f9449c2 1879
d41f73b7
MD
1880 /* we return 0 to let the library handle the FD internally */
1881 return 0;
1882
1883error_close_fd:
2f225ce2 1884 if (stream->out_fd >= 0) {
d41f73b7
MD
1885 int err;
1886
1887 err = close(stream->out_fd);
a0377dfe 1888 LTTNG_ASSERT(!err);
2f225ce2 1889 stream->out_fd = -1;
d41f73b7
MD
1890 }
1891error:
1892 return ret;
1893}
1894
ca22feea
DG
1895/*
1896 * Check if data is still being extracted from the buffers for a specific
4e9a4686
DG
1897 * stream. Consumer data lock MUST be acquired before calling this function
1898 * and the stream lock.
ca22feea 1899 *
6d805429 1900 * Return 1 if the traced data are still getting read else 0 meaning that the
ca22feea
DG
1901 * data is available for trace viewer reading.
1902 */
6d805429 1903int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
ca22feea
DG
1904{
1905 int ret;
1906
a0377dfe 1907 LTTNG_ASSERT(stream);
ca22feea 1908
873b9e9a
MD
1909 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
1910 ret = 0;
1911 goto end;
1912 }
1913
ca22feea
DG
1914 ret = kernctl_get_next_subbuf(stream->wait_fd);
1915 if (ret == 0) {
1916 /* There is still data so let's put back this subbuffer. */
1917 ret = kernctl_put_subbuf(stream->wait_fd);
a0377dfe 1918 LTTNG_ASSERT(ret == 0);
6d805429 1919 ret = 1; /* Data is pending */
4e9a4686 1920 goto end;
ca22feea
DG
1921 }
1922
6d805429
DG
1923 /* Data is NOT pending and ready to be read. */
1924 ret = 0;
ca22feea 1925
6efae65e
DG
1926end:
1927 return ret;
ca22feea 1928}
This page took 0.188934 seconds and 4 git commands to generate.