consumerd: move address computation from on_read_subbuffer_mmap
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
b3530820 4 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3bd1e081 5 *
d14d33bf
AM
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
3bd1e081
MD
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
d14d33bf
AM
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
18 */
19
7775df52 20#include <stdint.h>
6c1c0768 21#define _LGPL_SOURCE
3bd1e081 22#include <assert.h>
3bd1e081
MD
23#include <poll.h>
24#include <pthread.h>
25#include <stdlib.h>
26#include <string.h>
27#include <sys/mman.h>
28#include <sys/socket.h>
29#include <sys/types.h>
77c7c900 30#include <inttypes.h>
3bd1e081 31#include <unistd.h>
dbb5dfe6 32#include <sys/stat.h>
3bd1e081 33
51a9e1c7 34#include <bin/lttng-consumerd/health-consumerd.h>
990570ed 35#include <common/common.h>
10a8a223 36#include <common/kernel-ctl/kernel-ctl.h>
10a8a223 37#include <common/sessiond-comm/sessiond-comm.h>
00e2e675 38#include <common/sessiond-comm/relayd.h>
dbb5dfe6 39#include <common/compat/fcntl.h>
f263b7fd 40#include <common/compat/endian.h>
acdb9057 41#include <common/pipe.h>
00e2e675 42#include <common/relayd/relayd.h>
fe4477ee 43#include <common/utils.h>
c8fea79c 44#include <common/consumer/consumer-stream.h>
309167d2 45#include <common/index/index.h>
c8fea79c 46#include <common/consumer/consumer-timer.h>
e5148e25 47#include <common/optional.h>
0857097f 48
10a8a223 49#include "kernel-consumer.h"
3bd1e081
MD
50
51extern struct lttng_consumer_global_data consumer_data;
52extern int consumer_poll_timeout;
3bd1e081 53
3bd1e081
MD
54/*
55 * Take a snapshot for a specific fd
56 *
57 * Returns 0 on success, < 0 on error
58 */
ffe60014 59int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081
MD
60{
61 int ret = 0;
62 int infd = stream->wait_fd;
63
64 ret = kernctl_snapshot(infd);
d2d2f190
JD
65 /*
66 * -EAGAIN is not an error, it just means that there is no data to
67 * be read.
68 */
69 if (ret != 0 && ret != -EAGAIN) {
5a510c9f 70 PERROR("Getting sub-buffer snapshot.");
3bd1e081
MD
71 }
72
73 return ret;
74}
75
e9404c27
JG
76/*
77 * Sample consumed and produced positions for a specific fd.
78 *
79 * Returns 0 on success, < 0 on error.
80 */
81int lttng_kconsumer_sample_snapshot_positions(
82 struct lttng_consumer_stream *stream)
83{
84 assert(stream);
85
86 return kernctl_snapshot_sample_positions(stream->wait_fd);
87}
88
3bd1e081
MD
89/*
90 * Get the produced position
91 *
92 * Returns 0 on success, < 0 on error
93 */
ffe60014 94int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
3bd1e081
MD
95 unsigned long *pos)
96{
97 int ret;
98 int infd = stream->wait_fd;
99
100 ret = kernctl_snapshot_get_produced(infd, pos);
101 if (ret != 0) {
5a510c9f 102 PERROR("kernctl_snapshot_get_produced");
3bd1e081
MD
103 }
104
105 return ret;
106}
107
07b86b52
JD
108/*
109 * Get the consumerd position
110 *
111 * Returns 0 on success, < 0 on error
112 */
113int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
114 unsigned long *pos)
115{
116 int ret;
117 int infd = stream->wait_fd;
118
119 ret = kernctl_snapshot_get_consumed(infd, pos);
120 if (ret != 0) {
5a510c9f 121 PERROR("kernctl_snapshot_get_consumed");
07b86b52
JD
122 }
123
124 return ret;
125}
126
7775df52
JG
127static
128int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
129 const char **addr)
130{
131 int ret;
132 unsigned long mmap_offset;
133 const char *mmap_base = stream->mmap_base;
134
135 ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
136 if (ret < 0) {
137 PERROR("Failed to get mmap read offset");
138 goto error;
139 }
140
141 *addr = mmap_base + mmap_offset;
142error:
143 return ret;
144}
145
07b86b52
JD
146/*
147 * Take a snapshot of all the stream of a channel
b0226bd4 148 * RCU read-side lock must be held across this function to ensure existence of
e5148e25 149 * channel. The channel lock must be held by the caller.
07b86b52
JD
150 *
151 * Returns 0 on success, < 0 on error
152 */
f0f048c9
JG
153static int lttng_kconsumer_snapshot_channel(
154 struct lttng_consumer_channel *channel,
155 uint64_t key, char *path, uint64_t relayd_id,
156 uint64_t nb_packets_per_stream,
5c786ded 157 struct lttng_consumer_local_data *ctx)
07b86b52
JD
158{
159 int ret;
07b86b52
JD
160 struct lttng_consumer_stream *stream;
161
6a00837f 162 DBG("Kernel consumer snapshot channel %" PRIu64, key);
07b86b52
JD
163
164 rcu_read_lock();
165
07b86b52
JD
166 /* Splice is not supported yet for channel snapshot. */
167 if (channel->output != CONSUMER_CHANNEL_MMAP) {
7318a78f
JG
168 ERR("Unsupported output type for channel \"%s\": mmap output is required to record a snapshot",
169 channel->name);
07b86b52
JD
170 ret = -1;
171 goto end;
172 }
173
10a50311 174 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
923333cd 175 unsigned long consumed_pos, produced_pos;
9ce5646a
MD
176
177 health_code_update();
178
07b86b52
JD
179 /*
180 * Lock stream because we are about to change its state.
181 */
182 pthread_mutex_lock(&stream->lock);
183
e5148e25
JG
184 assert(channel->trace_chunk);
185 if (!lttng_trace_chunk_get(channel->trace_chunk)) {
186 /*
187 * Can't happen barring an internal error as the channel
188 * holds a reference to the trace chunk.
189 */
190 ERR("Failed to acquire reference to channel's trace chunk");
191 ret = -1;
192 goto end_unlock;
193 }
194 assert(!stream->trace_chunk);
195 stream->trace_chunk = channel->trace_chunk;
196
29decac3
DG
197 /*
198 * Assign the received relayd ID so we can use it for streaming. The streams
199 * are not visible to anyone so this is OK to change it.
200 */
07b86b52
JD
201 stream->net_seq_idx = relayd_id;
202 channel->relayd_id = relayd_id;
203 if (relayd_id != (uint64_t) -1ULL) {
10a50311 204 ret = consumer_send_relayd_stream(stream, path);
07b86b52
JD
205 if (ret < 0) {
206 ERR("sending stream to relayd");
207 goto end_unlock;
208 }
07b86b52 209 } else {
e5148e25
JG
210 ret = consumer_stream_create_output_files(stream,
211 false);
07b86b52 212 if (ret < 0) {
07b86b52
JD
213 goto end_unlock;
214 }
e5148e25
JG
215 DBG("Kernel consumer snapshot stream (%" PRIu64 ")",
216 stream->key);
07b86b52
JD
217 }
218
f22dd891 219 ret = kernctl_buffer_flush_empty(stream->wait_fd);
07b86b52 220 if (ret < 0) {
f22dd891
MD
221 /*
222 * Doing a buffer flush which does not take into
223 * account empty packets. This is not perfect
224 * for stream intersection, but required as a
225 * fall-back when "flush_empty" is not
226 * implemented by lttng-modules.
227 */
228 ret = kernctl_buffer_flush(stream->wait_fd);
229 if (ret < 0) {
230 ERR("Failed to flush kernel stream");
231 goto end_unlock;
232 }
07b86b52
JD
233 goto end_unlock;
234 }
235
236 ret = lttng_kconsumer_take_snapshot(stream);
237 if (ret < 0) {
238 ERR("Taking kernel snapshot");
239 goto end_unlock;
240 }
241
242 ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos);
243 if (ret < 0) {
244 ERR("Produced kernel snapshot position");
245 goto end_unlock;
246 }
247
248 ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos);
249 if (ret < 0) {
250 ERR("Consumerd kernel snapshot position");
251 goto end_unlock;
252 }
253
d07ceecd
MD
254 consumed_pos = consumer_get_consume_start_pos(consumed_pos,
255 produced_pos, nb_packets_per_stream,
256 stream->max_sb_size);
5c786ded 257
0fdaf1ed 258 while ((long) (consumed_pos - produced_pos) < 0) {
07b86b52
JD
259 ssize_t read_len;
260 unsigned long len, padded_len;
7775df52 261 const char *subbuf_addr;
07b86b52 262
9ce5646a
MD
263 health_code_update();
264
07b86b52
JD
265 DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos);
266
267 ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos);
268 if (ret < 0) {
32af2c95 269 if (ret != -EAGAIN) {
07b86b52
JD
270 PERROR("kernctl_get_subbuf snapshot");
271 goto end_unlock;
272 }
273 DBG("Kernel consumer get subbuf failed. Skipping it.");
274 consumed_pos += stream->max_sb_size;
ddc93ee4 275 stream->chan->lost_packets++;
07b86b52
JD
276 continue;
277 }
278
279 ret = kernctl_get_subbuf_size(stream->wait_fd, &len);
280 if (ret < 0) {
281 ERR("Snapshot kernctl_get_subbuf_size");
29decac3 282 goto error_put_subbuf;
07b86b52
JD
283 }
284
285 ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len);
286 if (ret < 0) {
287 ERR("Snapshot kernctl_get_padded_subbuf_size");
29decac3 288 goto error_put_subbuf;
07b86b52
JD
289 }
290
7775df52
JG
291 ret = get_current_subbuf_addr(stream, &subbuf_addr);
292 if (ret) {
293 goto error_put_subbuf;
294 }
295
296 read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
297 stream, subbuf_addr, len,
309167d2 298 padded_len - len, NULL);
07b86b52 299 /*
29decac3
DG
300 * We write the padded len in local tracefiles but the data len
301 * when using a relay. Display the error but continue processing
302 * to try to release the subbuffer.
07b86b52
JD
303 */
304 if (relayd_id != (uint64_t) -1ULL) {
305 if (read_len != len) {
306 ERR("Error sending to the relay (ret: %zd != len: %lu)",
307 read_len, len);
308 }
309 } else {
310 if (read_len != padded_len) {
311 ERR("Error writing to tracefile (ret: %zd != len: %lu)",
312 read_len, padded_len);
313 }
314 }
315
316 ret = kernctl_put_subbuf(stream->wait_fd);
317 if (ret < 0) {
318 ERR("Snapshot kernctl_put_subbuf");
319 goto end_unlock;
320 }
321 consumed_pos += stream->max_sb_size;
322 }
323
324 if (relayd_id == (uint64_t) -1ULL) {
fdf9986c
MD
325 if (stream->out_fd >= 0) {
326 ret = close(stream->out_fd);
327 if (ret < 0) {
328 PERROR("Kernel consumer snapshot close out_fd");
329 goto end_unlock;
330 }
331 stream->out_fd = -1;
07b86b52 332 }
07b86b52
JD
333 } else {
334 close_relayd_stream(stream);
335 stream->net_seq_idx = (uint64_t) -1ULL;
336 }
e5148e25
JG
337 lttng_trace_chunk_put(stream->trace_chunk);
338 stream->trace_chunk = NULL;
07b86b52
JD
339 pthread_mutex_unlock(&stream->lock);
340 }
341
342 /* All good! */
343 ret = 0;
344 goto end;
345
29decac3
DG
346error_put_subbuf:
347 ret = kernctl_put_subbuf(stream->wait_fd);
348 if (ret < 0) {
349 ERR("Snapshot kernctl_put_subbuf error path");
350 }
07b86b52
JD
351end_unlock:
352 pthread_mutex_unlock(&stream->lock);
353end:
354 rcu_read_unlock();
355 return ret;
356}
357
358/*
359 * Read the whole metadata available for a snapshot.
b0226bd4 360 * RCU read-side lock must be held across this function to ensure existence of
e5148e25 361 * metadata_channel. The channel lock must be held by the caller.
07b86b52
JD
362 *
363 * Returns 0 on success, < 0 on error
364 */
e5148e25
JG
365static int lttng_kconsumer_snapshot_metadata(
366 struct lttng_consumer_channel *metadata_channel,
b0226bd4
MD
367 uint64_t key, char *path, uint64_t relayd_id,
368 struct lttng_consumer_local_data *ctx)
07b86b52 369{
d771f832
DG
370 int ret, use_relayd = 0;
371 ssize_t ret_read;
07b86b52 372 struct lttng_consumer_stream *metadata_stream;
d771f832
DG
373
374 assert(ctx);
07b86b52
JD
375
376 DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s",
377 key, path);
378
379 rcu_read_lock();
380
07b86b52
JD
381 metadata_stream = metadata_channel->metadata_stream;
382 assert(metadata_stream);
e5148e25 383
c55fe3e3 384 pthread_mutex_lock(&metadata_stream->lock);
e5148e25
JG
385 assert(metadata_channel->trace_chunk);
386 assert(metadata_stream->trace_chunk);
07b86b52 387
d771f832 388 /* Flag once that we have a valid relayd for the stream. */
e2039c7a 389 if (relayd_id != (uint64_t) -1ULL) {
d771f832
DG
390 use_relayd = 1;
391 }
392
393 if (use_relayd) {
10a50311 394 ret = consumer_send_relayd_stream(metadata_stream, path);
e2039c7a 395 if (ret < 0) {
c55fe3e3 396 goto error_snapshot;
e2039c7a 397 }
e2039c7a 398 } else {
e5148e25
JG
399 ret = consumer_stream_create_output_files(metadata_stream,
400 false);
e2039c7a 401 if (ret < 0) {
c55fe3e3 402 goto error_snapshot;
e2039c7a 403 }
07b86b52 404 }
07b86b52 405
d771f832 406 do {
9ce5646a
MD
407 health_code_update();
408
e5148e25 409 ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx);
d771f832 410 if (ret_read < 0) {
56591bac 411 if (ret_read != -EAGAIN) {
6a00837f 412 ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
d771f832 413 ret_read);
c55fe3e3
JG
414 ret = ret_read;
415 goto error_snapshot;
07b86b52 416 }
d771f832 417 /* ret_read is negative at this point so we will exit the loop. */
07b86b52
JD
418 continue;
419 }
d771f832 420 } while (ret_read >= 0);
07b86b52 421
d771f832
DG
422 if (use_relayd) {
423 close_relayd_stream(metadata_stream);
424 metadata_stream->net_seq_idx = (uint64_t) -1ULL;
425 } else {
fdf9986c
MD
426 if (metadata_stream->out_fd >= 0) {
427 ret = close(metadata_stream->out_fd);
428 if (ret < 0) {
429 PERROR("Kernel consumer snapshot metadata close out_fd");
430 /*
431 * Don't go on error here since the snapshot was successful at this
432 * point but somehow the close failed.
433 */
434 }
435 metadata_stream->out_fd = -1;
e5148e25
JG
436 lttng_trace_chunk_put(metadata_stream->trace_chunk);
437 metadata_stream->trace_chunk = NULL;
e2039c7a 438 }
e2039c7a
JD
439 }
440
07b86b52 441 ret = 0;
c55fe3e3
JG
442error_snapshot:
443 pthread_mutex_unlock(&metadata_stream->lock);
cf53a8a6
JD
444 cds_list_del(&metadata_stream->send_node);
445 consumer_stream_destroy(metadata_stream, NULL);
446 metadata_channel->metadata_stream = NULL;
07b86b52
JD
447 rcu_read_unlock();
448 return ret;
449}
450
1803a064
MD
451/*
452 * Receive command from session daemon and process it.
453 *
454 * Return 1 on success else a negative value or 0.
455 */
3bd1e081
MD
456int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
457 int sock, struct pollfd *consumer_sockpoll)
458{
459 ssize_t ret;
0c759fc9 460 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3bd1e081
MD
461 struct lttcomm_consumer_msg msg;
462
9ce5646a
MD
463 health_code_update();
464
3bd1e081
MD
465 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
466 if (ret != sizeof(msg)) {
1803a064 467 if (ret > 0) {
c6857fcf 468 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
1803a064
MD
469 ret = -1;
470 }
3bd1e081
MD
471 return ret;
472 }
9ce5646a
MD
473
474 health_code_update();
475
84382d49
MD
476 /* Deprecated command */
477 assert(msg.cmd_type != LTTNG_CONSUMER_STOP);
3bd1e081 478
9ce5646a
MD
479 health_code_update();
480
b0b335c8
MD
481 /* relayd needs RCU read-side protection */
482 rcu_read_lock();
483
3bd1e081 484 switch (msg.cmd_type) {
00e2e675
DG
485 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
486 {
f50f23d9 487 /* Session daemon status message are handled in the following call. */
2527bf85 488 consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
7735ef9e 489 msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
d3e2ba59 490 &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
2527bf85 491 msg.u.relayd_sock.relayd_session_id);
00e2e675
DG
492 goto end_nosignal;
493 }
3bd1e081
MD
494 case LTTNG_CONSUMER_ADD_CHANNEL:
495 {
496 struct lttng_consumer_channel *new_channel;
e43c41c5 497 int ret_recv;
e5148e25 498 const uint64_t chunk_id = msg.u.channel.chunk_id.value;
3bd1e081 499
9ce5646a
MD
500 health_code_update();
501
f50f23d9
DG
502 /* First send a status message before receiving the fds. */
503 ret = consumer_send_status_msg(sock, ret_code);
504 if (ret < 0) {
505 /* Somehow, the session daemon is not responding anymore. */
1803a064 506 goto error_fatal;
f50f23d9 507 }
9ce5646a
MD
508
509 health_code_update();
510
d88aee68 511 DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
3bd1e081 512 new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
e5148e25
JG
513 msg.u.channel.session_id,
514 msg.u.channel.chunk_id.is_set ?
515 &chunk_id : NULL,
516 msg.u.channel.pathname,
517 msg.u.channel.name,
1624d5b7
JD
518 msg.u.channel.relayd_id, msg.u.channel.output,
519 msg.u.channel.tracefile_size,
1950109e 520 msg.u.channel.tracefile_count, 0,
ecc48a90 521 msg.u.channel.monitor,
d7ba1388 522 msg.u.channel.live_timer_interval,
3d071855 523 NULL, NULL);
3bd1e081 524 if (new_channel == NULL) {
f73fabfd 525 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
3bd1e081
MD
526 goto end_nosignal;
527 }
ffe60014 528 new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams;
95a1109b
JD
529 switch (msg.u.channel.output) {
530 case LTTNG_EVENT_SPLICE:
531 new_channel->output = CONSUMER_CHANNEL_SPLICE;
532 break;
533 case LTTNG_EVENT_MMAP:
534 new_channel->output = CONSUMER_CHANNEL_MMAP;
535 break;
536 default:
537 ERR("Channel output unknown %d", msg.u.channel.output);
538 goto end_nosignal;
539 }
ffe60014
DG
540
541 /* Translate and save channel type. */
542 switch (msg.u.channel.type) {
543 case CONSUMER_CHANNEL_TYPE_DATA:
544 case CONSUMER_CHANNEL_TYPE_METADATA:
545 new_channel->type = msg.u.channel.type;
546 break;
547 default:
548 assert(0);
549 goto end_nosignal;
550 };
551
9ce5646a
MD
552 health_code_update();
553
3bd1e081 554 if (ctx->on_recv_channel != NULL) {
e43c41c5
JD
555 ret_recv = ctx->on_recv_channel(new_channel);
556 if (ret_recv == 0) {
557 ret = consumer_add_channel(new_channel, ctx);
558 } else if (ret_recv < 0) {
3bd1e081
MD
559 goto end_nosignal;
560 }
561 } else {
e43c41c5 562 ret = consumer_add_channel(new_channel, ctx);
3bd1e081 563 }
e9404c27
JG
564 if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret) {
565 int monitor_start_ret;
566
567 DBG("Consumer starting monitor timer");
94d49140
JD
568 consumer_timer_live_start(new_channel,
569 msg.u.channel.live_timer_interval);
e9404c27
JG
570 monitor_start_ret = consumer_timer_monitor_start(
571 new_channel,
572 msg.u.channel.monitor_timer_interval);
573 if (monitor_start_ret < 0) {
574 ERR("Starting channel monitoring timer failed");
575 goto end_nosignal;
576 }
577
94d49140 578 }
e43c41c5 579
9ce5646a
MD
580 health_code_update();
581
e43c41c5 582 /* If we received an error in add_channel, we need to report it. */
821fffb2 583 if (ret < 0) {
1803a064
MD
584 ret = consumer_send_status_msg(sock, ret);
585 if (ret < 0) {
586 goto error_fatal;
587 }
e43c41c5
JD
588 goto end_nosignal;
589 }
590
3bd1e081
MD
591 goto end_nosignal;
592 }
593 case LTTNG_CONSUMER_ADD_STREAM:
594 {
dae10966
DG
595 int fd;
596 struct lttng_pipe *stream_pipe;
00e2e675 597 struct lttng_consumer_stream *new_stream;
ffe60014 598 struct lttng_consumer_channel *channel;
c80048c6 599 int alloc_ret = 0;
3bd1e081 600
ffe60014
DG
601 /*
602 * Get stream's channel reference. Needed when adding the stream to the
603 * global hash table.
604 */
605 channel = consumer_find_channel(msg.u.stream.channel_key);
606 if (!channel) {
607 /*
608 * We could not find the channel. Can happen if cpu hotplug
609 * happens while tearing down.
610 */
d88aee68 611 ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key);
e462382a 612 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
ffe60014
DG
613 }
614
9ce5646a
MD
615 health_code_update();
616
f50f23d9
DG
617 /* First send a status message before receiving the fds. */
618 ret = consumer_send_status_msg(sock, ret_code);
1803a064 619 if (ret < 0) {
d771f832 620 /* Somehow, the session daemon is not responding anymore. */
e0e7757d 621 goto error_add_stream_fatal;
1803a064 622 }
9ce5646a
MD
623
624 health_code_update();
625
0c759fc9 626 if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
d771f832 627 /* Channel was not found. */
e0e7757d 628 goto error_add_stream_nosignal;
f50f23d9
DG
629 }
630
d771f832 631 /* Blocking call */
9ce5646a
MD
632 health_poll_entry();
633 ret = lttng_consumer_poll_socket(consumer_sockpoll);
634 health_poll_exit();
84382d49 635 if (ret) {
e0e7757d 636 goto error_add_stream_fatal;
3bd1e081 637 }
00e2e675 638
9ce5646a
MD
639 health_code_update();
640
00e2e675 641 /* Get stream file descriptor from socket */
f2fc6720
MD
642 ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
643 if (ret != sizeof(fd)) {
f73fabfd 644 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
e0e7757d 645 goto end;
3bd1e081 646 }
3bd1e081 647
9ce5646a
MD
648 health_code_update();
649
f50f23d9
DG
650 /*
651 * Send status code to session daemon only if the recv works. If the
652 * above recv() failed, the session daemon is notified through the
653 * error socket and the teardown is eventually done.
654 */
655 ret = consumer_send_status_msg(sock, ret_code);
656 if (ret < 0) {
657 /* Somehow, the session daemon is not responding anymore. */
e0e7757d 658 goto error_add_stream_nosignal;
f50f23d9
DG
659 }
660
9ce5646a
MD
661 health_code_update();
662
e5148e25 663 pthread_mutex_lock(&channel->lock);
ffe60014
DG
664 new_stream = consumer_allocate_stream(channel->key,
665 fd,
ffe60014 666 channel->name,
ffe60014
DG
667 channel->relayd_id,
668 channel->session_id,
e5148e25 669 channel->trace_chunk,
ffe60014
DG
670 msg.u.stream.cpu,
671 &alloc_ret,
4891ece8 672 channel->type,
e5148e25 673 channel->monitor);
3bd1e081 674 if (new_stream == NULL) {
c80048c6
MD
675 switch (alloc_ret) {
676 case -ENOMEM:
677 case -EINVAL:
678 default:
679 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
680 break;
c80048c6 681 }
e5148e25 682 pthread_mutex_unlock(&channel->lock);
e0e7757d 683 goto error_add_stream_nosignal;
3bd1e081 684 }
d771f832 685
ffe60014
DG
686 new_stream->chan = channel;
687 new_stream->wait_fd = fd;
2c42aa5c
JG
688 ret = kernctl_get_max_subbuf_size(new_stream->wait_fd,
689 &new_stream->max_sb_size);
690 if (ret < 0) {
691 pthread_mutex_unlock(&channel->lock);
692 ERR("Failed to get kernel maximal subbuffer size");
e0e7757d 693 goto error_add_stream_nosignal;
2c42aa5c
JG
694 }
695
d9a2e16e
JD
696 consumer_stream_update_channel_attributes(new_stream,
697 channel);
07b86b52
JD
698 switch (channel->output) {
699 case CONSUMER_CHANNEL_SPLICE:
700 new_stream->output = LTTNG_EVENT_SPLICE;
a2361a61
JD
701 ret = utils_create_pipe(new_stream->splice_pipe);
702 if (ret < 0) {
e5148e25 703 pthread_mutex_unlock(&channel->lock);
e0e7757d 704 goto error_add_stream_nosignal;
a2361a61 705 }
07b86b52
JD
706 break;
707 case CONSUMER_CHANNEL_MMAP:
708 new_stream->output = LTTNG_EVENT_MMAP;
709 break;
710 default:
711 ERR("Stream output unknown %d", channel->output);
e5148e25 712 pthread_mutex_unlock(&channel->lock);
e0e7757d 713 goto error_add_stream_nosignal;
07b86b52 714 }
00e2e675 715
a0c83db9
DG
716 /*
717 * We've just assigned the channel to the stream so increment the
07b86b52
JD
718 * refcount right now. We don't need to increment the refcount for
719 * streams in no monitor because we handle manually the cleanup of
720 * those. It is very important to make sure there is NO prior
721 * consumer_del_stream() calls or else the refcount will be unbalanced.
a0c83db9 722 */
07b86b52
JD
723 if (channel->monitor) {
724 uatomic_inc(&new_stream->chan->refcount);
725 }
9d9353f9 726
fb3a43a9
DG
727 /*
728 * The buffer flush is done on the session daemon side for the kernel
729 * so no need for the stream "hangup_flush_done" variable to be
730 * tracked. This is important for a kernel stream since we don't rely
731 * on the flush state of the stream to read data. It's not the case for
732 * user space tracing.
733 */
734 new_stream->hangup_flush_done = 0;
735
9ce5646a
MD
736 health_code_update();
737
e5148e25 738 pthread_mutex_lock(&new_stream->lock);
633d0084
DG
739 if (ctx->on_recv_stream) {
740 ret = ctx->on_recv_stream(new_stream);
741 if (ret < 0) {
e5148e25
JG
742 pthread_mutex_unlock(&new_stream->lock);
743 pthread_mutex_unlock(&channel->lock);
d771f832 744 consumer_stream_free(new_stream);
e0e7757d 745 goto error_add_stream_nosignal;
fb3a43a9 746 }
633d0084 747 }
9ce5646a
MD
748 health_code_update();
749
07b86b52
JD
750 if (new_stream->metadata_flag) {
751 channel->metadata_stream = new_stream;
752 }
753
2bba9e53
DG
754 /* Do not monitor this stream. */
755 if (!channel->monitor) {
5eecee74 756 DBG("Kernel consumer add stream %s in no monitor mode with "
6dc3064a 757 "relayd id %" PRIu64, new_stream->name,
5eecee74 758 new_stream->net_seq_idx);
10a50311 759 cds_list_add(&new_stream->send_node, &channel->streams.head);
e5148e25
JG
760 pthread_mutex_unlock(&new_stream->lock);
761 pthread_mutex_unlock(&channel->lock);
e0e7757d 762 goto end_add_stream;
6dc3064a
DG
763 }
764
e1b71bdc
DG
765 /* Send stream to relayd if the stream has an ID. */
766 if (new_stream->net_seq_idx != (uint64_t) -1ULL) {
194ee077
DG
767 ret = consumer_send_relayd_stream(new_stream,
768 new_stream->chan->pathname);
e1b71bdc 769 if (ret < 0) {
e5148e25
JG
770 pthread_mutex_unlock(&new_stream->lock);
771 pthread_mutex_unlock(&channel->lock);
e1b71bdc 772 consumer_stream_free(new_stream);
e0e7757d 773 goto error_add_stream_nosignal;
e1b71bdc 774 }
001b7e62
MD
775
776 /*
777 * If adding an extra stream to an already
778 * existing channel (e.g. cpu hotplug), we need
779 * to send the "streams_sent" command to relayd.
780 */
781 if (channel->streams_sent_to_relayd) {
782 ret = consumer_send_relayd_streams_sent(
783 new_stream->net_seq_idx);
784 if (ret < 0) {
e5148e25
JG
785 pthread_mutex_unlock(&new_stream->lock);
786 pthread_mutex_unlock(&channel->lock);
e0e7757d 787 goto error_add_stream_nosignal;
001b7e62
MD
788 }
789 }
e2039c7a 790 }
e5148e25
JG
791 pthread_mutex_unlock(&new_stream->lock);
792 pthread_mutex_unlock(&channel->lock);
e2039c7a 793
50f8ae69 794 /* Get the right pipe where the stream will be sent. */
633d0084 795 if (new_stream->metadata_flag) {
66d583dc 796 consumer_add_metadata_stream(new_stream);
dae10966 797 stream_pipe = ctx->consumer_metadata_pipe;
3bd1e081 798 } else {
66d583dc 799 consumer_add_data_stream(new_stream);
dae10966 800 stream_pipe = ctx->consumer_data_pipe;
50f8ae69
DG
801 }
802
66d583dc 803 /* Visible to other threads */
5ab66908
MD
804 new_stream->globally_visible = 1;
805
9ce5646a
MD
806 health_code_update();
807
dae10966 808 ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream));
50f8ae69 809 if (ret < 0) {
dae10966 810 ERR("Consumer write %s stream to pipe %d",
50f8ae69 811 new_stream->metadata_flag ? "metadata" : "data",
dae10966 812 lttng_pipe_get_writefd(stream_pipe));
5ab66908
MD
813 if (new_stream->metadata_flag) {
814 consumer_del_stream_for_metadata(new_stream);
815 } else {
816 consumer_del_stream_for_data(new_stream);
817 }
e0e7757d 818 goto error_add_stream_nosignal;
3bd1e081 819 }
00e2e675 820
02d02e31
JD
821 DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64,
822 new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id);
e0e7757d 823end_add_stream:
3bd1e081 824 break;
e0e7757d
JG
825error_add_stream_nosignal:
826 goto end_nosignal;
827error_add_stream_fatal:
828 goto error_fatal;
3bd1e081 829 }
a4baae1b
JD
830 case LTTNG_CONSUMER_STREAMS_SENT:
831 {
832 struct lttng_consumer_channel *channel;
833
834 /*
835 * Get stream's channel reference. Needed when adding the stream to the
836 * global hash table.
837 */
838 channel = consumer_find_channel(msg.u.sent_streams.channel_key);
839 if (!channel) {
840 /*
841 * We could not find the channel. Can happen if cpu hotplug
842 * happens while tearing down.
843 */
844 ERR("Unable to find channel key %" PRIu64,
845 msg.u.sent_streams.channel_key);
e462382a 846 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
a4baae1b
JD
847 }
848
849 health_code_update();
850
851 /*
852 * Send status code to session daemon.
853 */
854 ret = consumer_send_status_msg(sock, ret_code);
f261ad0a 855 if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
a4baae1b 856 /* Somehow, the session daemon is not responding anymore. */
6ec577e9 857 goto error_streams_sent_nosignal;
a4baae1b
JD
858 }
859
860 health_code_update();
861
862 /*
863 * We should not send this message if we don't monitor the
864 * streams in this channel.
865 */
866 if (!channel->monitor) {
6ec577e9 867 goto end_error_streams_sent;
a4baae1b
JD
868 }
869
870 health_code_update();
871 /* Send stream to relayd if the stream has an ID. */
872 if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) {
873 ret = consumer_send_relayd_streams_sent(
874 msg.u.sent_streams.net_seq_idx);
875 if (ret < 0) {
6ec577e9 876 goto error_streams_sent_nosignal;
a4baae1b 877 }
001b7e62 878 channel->streams_sent_to_relayd = true;
a4baae1b 879 }
6ec577e9 880end_error_streams_sent:
a4baae1b 881 break;
6ec577e9
JG
882error_streams_sent_nosignal:
883 goto end_nosignal;
a4baae1b 884 }
3bd1e081
MD
885 case LTTNG_CONSUMER_UPDATE_STREAM:
886 {
3f8e211f
DG
887 rcu_read_unlock();
888 return -ENOSYS;
889 }
890 case LTTNG_CONSUMER_DESTROY_RELAYD:
891 {
a6ba4fe1 892 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
3f8e211f
DG
893 struct consumer_relayd_sock_pair *relayd;
894
a6ba4fe1 895 DBG("Kernel consumer destroying relayd %" PRIu64, index);
3f8e211f
DG
896
897 /* Get relayd reference if exists. */
a6ba4fe1 898 relayd = consumer_find_relayd(index);
3f8e211f 899 if (relayd == NULL) {
3448e266 900 DBG("Unable to find relayd %" PRIu64, index);
e462382a 901 ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
3bd1e081 902 }
3f8e211f 903
a6ba4fe1
DG
904 /*
905 * Each relayd socket pair has a refcount of stream attached to it
906 * which tells if the relayd is still active or not depending on the
907 * refcount value.
908 *
909 * This will set the destroy flag of the relayd object and destroy it
910 * if the refcount reaches zero when called.
911 *
912 * The destroy can happen either here or when a stream fd hangs up.
913 */
f50f23d9
DG
914 if (relayd) {
915 consumer_flag_relayd_for_destroy(relayd);
916 }
917
9ce5646a
MD
918 health_code_update();
919
f50f23d9
DG
920 ret = consumer_send_status_msg(sock, ret_code);
921 if (ret < 0) {
922 /* Somehow, the session daemon is not responding anymore. */
1803a064 923 goto error_fatal;
f50f23d9 924 }
3f8e211f 925
3f8e211f 926 goto end_nosignal;
3bd1e081 927 }
6d805429 928 case LTTNG_CONSUMER_DATA_PENDING:
53632229 929 {
c8f59ee5 930 int32_t ret;
6d805429 931 uint64_t id = msg.u.data_pending.session_id;
c8f59ee5 932
6d805429 933 DBG("Kernel consumer data pending command for id %" PRIu64, id);
c8f59ee5 934
6d805429 935 ret = consumer_data_pending(id);
c8f59ee5 936
9ce5646a
MD
937 health_code_update();
938
c8f59ee5
DG
939 /* Send back returned value to session daemon */
940 ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
941 if (ret < 0) {
6d805429 942 PERROR("send data pending ret code");
1803a064 943 goto error_fatal;
c8f59ee5 944 }
f50f23d9
DG
945
946 /*
947 * No need to send back a status message since the data pending
948 * returned value is the response.
949 */
c8f59ee5 950 break;
53632229 951 }
6dc3064a
DG
952 case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
953 {
b0226bd4
MD
954 struct lttng_consumer_channel *channel;
955 uint64_t key = msg.u.snapshot_channel.key;
956
957 channel = consumer_find_channel(key);
958 if (!channel) {
959 ERR("Channel %" PRIu64 " not found", key);
960 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
07b86b52 961 } else {
e5148e25 962 pthread_mutex_lock(&channel->lock);
b0226bd4
MD
963 if (msg.u.snapshot_channel.metadata == 1) {
964 ret = lttng_kconsumer_snapshot_metadata(channel, key,
965 msg.u.snapshot_channel.pathname,
966 msg.u.snapshot_channel.relayd_id, ctx);
967 if (ret < 0) {
968 ERR("Snapshot metadata failed");
969 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
970 }
971 } else {
972 ret = lttng_kconsumer_snapshot_channel(channel, key,
973 msg.u.snapshot_channel.pathname,
974 msg.u.snapshot_channel.relayd_id,
975 msg.u.snapshot_channel.nb_packets_per_stream,
976 ctx);
977 if (ret < 0) {
978 ERR("Snapshot channel failed");
979 ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
980 }
07b86b52 981 }
e5148e25 982 pthread_mutex_unlock(&channel->lock);
07b86b52 983 }
9ce5646a
MD
984 health_code_update();
985
6dc3064a
DG
986 ret = consumer_send_status_msg(sock, ret_code);
987 if (ret < 0) {
988 /* Somehow, the session daemon is not responding anymore. */
989 goto end_nosignal;
990 }
991 break;
992 }
07b86b52
JD
993 case LTTNG_CONSUMER_DESTROY_CHANNEL:
994 {
995 uint64_t key = msg.u.destroy_channel.key;
996 struct lttng_consumer_channel *channel;
997
998 channel = consumer_find_channel(key);
999 if (!channel) {
1000 ERR("Kernel consumer destroy channel %" PRIu64 " not found", key);
e462382a 1001 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
07b86b52
JD
1002 }
1003
9ce5646a
MD
1004 health_code_update();
1005
07b86b52
JD
1006 ret = consumer_send_status_msg(sock, ret_code);
1007 if (ret < 0) {
1008 /* Somehow, the session daemon is not responding anymore. */
09d3dbc0 1009 goto end_destroy_channel;
07b86b52
JD
1010 }
1011
9ce5646a
MD
1012 health_code_update();
1013
15dc512a
DG
1014 /* Stop right now if no channel was found. */
1015 if (!channel) {
09d3dbc0 1016 goto end_destroy_channel;
15dc512a
DG
1017 }
1018
07b86b52
JD
1019 /*
1020 * This command should ONLY be issued for channel with streams set in
1021 * no monitor mode.
1022 */
1023 assert(!channel->monitor);
1024
1025 /*
1026 * The refcount should ALWAYS be 0 in the case of a channel in no
1027 * monitor mode.
1028 */
1029 assert(!uatomic_sub_return(&channel->refcount, 1));
1030
1031 consumer_del_channel(channel);
09d3dbc0 1032end_destroy_channel:
07b86b52
JD
1033 goto end_nosignal;
1034 }
fb83fe64
JD
1035 case LTTNG_CONSUMER_DISCARDED_EVENTS:
1036 {
66ab32be
JD
1037 ssize_t ret;
1038 uint64_t count;
fb83fe64
JD
1039 struct lttng_consumer_channel *channel;
1040 uint64_t id = msg.u.discarded_events.session_id;
1041 uint64_t key = msg.u.discarded_events.channel_key;
1042
e5742757
MD
1043 DBG("Kernel consumer discarded events command for session id %"
1044 PRIu64 ", channel key %" PRIu64, id, key);
1045
fb83fe64
JD
1046 channel = consumer_find_channel(key);
1047 if (!channel) {
1048 ERR("Kernel consumer discarded events channel %"
1049 PRIu64 " not found", key);
66ab32be 1050 count = 0;
e5742757 1051 } else {
66ab32be 1052 count = channel->discarded_events;
fb83fe64
JD
1053 }
1054
fb83fe64
JD
1055 health_code_update();
1056
1057 /* Send back returned value to session daemon */
66ab32be 1058 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
fb83fe64
JD
1059 if (ret < 0) {
1060 PERROR("send discarded events");
1061 goto error_fatal;
1062 }
1063
1064 break;
1065 }
1066 case LTTNG_CONSUMER_LOST_PACKETS:
1067 {
66ab32be
JD
1068 ssize_t ret;
1069 uint64_t count;
fb83fe64
JD
1070 struct lttng_consumer_channel *channel;
1071 uint64_t id = msg.u.lost_packets.session_id;
1072 uint64_t key = msg.u.lost_packets.channel_key;
1073
e5742757
MD
1074 DBG("Kernel consumer lost packets command for session id %"
1075 PRIu64 ", channel key %" PRIu64, id, key);
1076
fb83fe64
JD
1077 channel = consumer_find_channel(key);
1078 if (!channel) {
1079 ERR("Kernel consumer lost packets channel %"
1080 PRIu64 " not found", key);
66ab32be 1081 count = 0;
e5742757 1082 } else {
66ab32be 1083 count = channel->lost_packets;
fb83fe64
JD
1084 }
1085
fb83fe64
JD
1086 health_code_update();
1087
1088 /* Send back returned value to session daemon */
66ab32be 1089 ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
fb83fe64
JD
1090 if (ret < 0) {
1091 PERROR("send lost packets");
1092 goto error_fatal;
1093 }
1094
1095 break;
1096 }
b3530820
JG
1097 case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
1098 {
1099 int channel_monitor_pipe;
1100
1101 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1102 /* Successfully received the command's type. */
1103 ret = consumer_send_status_msg(sock, ret_code);
1104 if (ret < 0) {
1105 goto error_fatal;
1106 }
1107
1108 ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
1109 1);
1110 if (ret != sizeof(channel_monitor_pipe)) {
1111 ERR("Failed to receive channel monitor pipe");
1112 goto error_fatal;
1113 }
1114
1115 DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
1116 ret = consumer_timer_thread_set_channel_monitor_pipe(
1117 channel_monitor_pipe);
1118 if (!ret) {
1119 int flags;
1120
1121 ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1122 /* Set the pipe as non-blocking. */
1123 ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
1124 if (ret == -1) {
1125 PERROR("fcntl get flags of the channel monitoring pipe");
1126 goto error_fatal;
1127 }
1128 flags = ret;
1129
1130 ret = fcntl(channel_monitor_pipe, F_SETFL,
1131 flags | O_NONBLOCK);
1132 if (ret == -1) {
1133 PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
1134 goto error_fatal;
1135 }
1136 DBG("Channel monitor pipe set as non-blocking");
1137 } else {
1138 ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
1139 }
1140 ret = consumer_send_status_msg(sock, ret_code);
1141 if (ret < 0) {
1142 goto error_fatal;
1143 }
1144 break;
1145 }
b99a8d42
JD
1146 case LTTNG_CONSUMER_ROTATE_CHANNEL:
1147 {
e96d66b4
MD
1148 struct lttng_consumer_channel *channel;
1149 uint64_t key = msg.u.rotate_channel.key;
b99a8d42 1150
e96d66b4 1151 DBG("Consumer rotate channel %" PRIu64, key);
b99a8d42 1152
e96d66b4
MD
1153 channel = consumer_find_channel(key);
1154 if (!channel) {
1155 ERR("Channel %" PRIu64 " not found", key);
1156 ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
1157 } else {
1158 /*
1159 * Sample the rotate position of all the streams in this channel.
1160 */
1161 ret = lttng_consumer_rotate_channel(channel, key,
e96d66b4
MD
1162 msg.u.rotate_channel.relayd_id,
1163 msg.u.rotate_channel.metadata,
e96d66b4
MD
1164 ctx);
1165 if (ret < 0) {
1166 ERR("Rotate channel failed");
1167 ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
1168 }
b99a8d42 1169
e96d66b4
MD
1170 health_code_update();
1171 }
b99a8d42
JD
1172 ret = consumer_send_status_msg(sock, ret_code);
1173 if (ret < 0) {
1174 /* Somehow, the session daemon is not responding anymore. */
812e4d26 1175 goto error_rotate_channel;
b99a8d42 1176 }
e96d66b4
MD
1177 if (channel) {
1178 /* Rotate the streams that are ready right now. */
1179 ret = lttng_consumer_rotate_ready_streams(
1180 channel, key, ctx);
1181 if (ret < 0) {
1182 ERR("Rotate ready streams failed");
1183 }
b99a8d42 1184 }
b99a8d42 1185 break;
812e4d26
JG
1186error_rotate_channel:
1187 goto end_nosignal;
b99a8d42 1188 }
e5148e25 1189 case LTTNG_CONSUMER_INIT:
00fb02ac 1190 {
e5148e25
JG
1191 ret_code = lttng_consumer_init_command(ctx,
1192 msg.u.init.sessiond_uuid);
00fb02ac 1193 health_code_update();
00fb02ac
JD
1194 ret = consumer_send_status_msg(sock, ret_code);
1195 if (ret < 0) {
1196 /* Somehow, the session daemon is not responding anymore. */
1197 goto end_nosignal;
1198 }
1199 break;
1200 }
e5148e25 1201 case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
d88744a4 1202 {
e5148e25 1203 const struct lttng_credentials credentials = {
0ebdafe0
JG
1204 .uid = msg.u.create_trace_chunk.credentials.value.uid,
1205 .gid = msg.u.create_trace_chunk.credentials.value.gid,
e5148e25
JG
1206 };
1207 const bool is_local_trace =
1208 !msg.u.create_trace_chunk.relayd_id.is_set;
1209 const uint64_t relayd_id =
1210 msg.u.create_trace_chunk.relayd_id.value;
1211 const char *chunk_override_name =
1212 *msg.u.create_trace_chunk.override_name ?
1213 msg.u.create_trace_chunk.override_name :
1214 NULL;
1215 LTTNG_OPTIONAL(struct lttng_directory_handle) chunk_directory_handle =
1216 LTTNG_OPTIONAL_INIT;
d88744a4 1217
e5148e25
JG
1218 /*
1219 * The session daemon will only provide a chunk directory file
1220 * descriptor for local traces.
1221 */
1222 if (is_local_trace) {
1223 int chunk_dirfd;
19990ed5 1224
e5148e25
JG
1225 /* Acnowledge the reception of the command. */
1226 ret = consumer_send_status_msg(sock,
1227 LTTCOMM_CONSUMERD_SUCCESS);
1228 if (ret < 0) {
1229 /* Somehow, the session daemon is not responding anymore. */
1230 goto end_nosignal;
1231 }
82528808 1232
e5148e25
JG
1233 ret = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
1234 if (ret != sizeof(chunk_dirfd)) {
1235 ERR("Failed to receive trace chunk directory file descriptor");
1236 goto error_fatal;
1237 }
82528808 1238
e5148e25
JG
1239 DBG("Received trace chunk directory fd (%d)",
1240 chunk_dirfd);
1241 ret = lttng_directory_handle_init_from_dirfd(
1242 &chunk_directory_handle.value,
1243 chunk_dirfd);
1244 if (ret) {
1245 ERR("Failed to initialize chunk directory handle from directory file descriptor");
1246 if (close(chunk_dirfd)) {
1247 PERROR("Failed to close chunk directory file descriptor");
1248 }
1249 goto error_fatal;
1250 }
1251 chunk_directory_handle.is_set = true;
82528808
JG
1252 }
1253
e5148e25
JG
1254 ret_code = lttng_consumer_create_trace_chunk(
1255 !is_local_trace ? &relayd_id : NULL,
1256 msg.u.create_trace_chunk.session_id,
1257 msg.u.create_trace_chunk.chunk_id,
0ebdafe0
JG
1258 (time_t) msg.u.create_trace_chunk
1259 .creation_timestamp,
e5148e25 1260 chunk_override_name,
0ebdafe0
JG
1261 msg.u.create_trace_chunk.credentials.is_set ?
1262 &credentials :
1263 NULL,
e5148e25
JG
1264 chunk_directory_handle.is_set ?
1265 &chunk_directory_handle.value :
1266 NULL);
82528808 1267
e5148e25
JG
1268 if (chunk_directory_handle.is_set) {
1269 lttng_directory_handle_fini(
1270 &chunk_directory_handle.value);
d88744a4 1271 }
e5148e25 1272 goto end_msg_sessiond;
d88744a4 1273 }
e5148e25 1274 case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
a1ae2ea5 1275 {
6bbcff33
JG
1276 enum lttng_trace_chunk_command_type close_command =
1277 msg.u.close_trace_chunk.close_command.value;
e5148e25
JG
1278 const uint64_t relayd_id =
1279 msg.u.close_trace_chunk.relayd_id.value;
41b23598
MD
1280 struct lttcomm_consumer_close_trace_chunk_reply reply;
1281 char path[LTTNG_PATH_MAX];
e5148e25
JG
1282
1283 ret_code = lttng_consumer_close_trace_chunk(
1284 msg.u.close_trace_chunk.relayd_id.is_set ?
6bbcff33
JG
1285 &relayd_id :
1286 NULL,
e5148e25
JG
1287 msg.u.close_trace_chunk.session_id,
1288 msg.u.close_trace_chunk.chunk_id,
6bbcff33
JG
1289 (time_t) msg.u.close_trace_chunk.close_timestamp,
1290 msg.u.close_trace_chunk.close_command.is_set ?
1291 &close_command :
41b23598
MD
1292 NULL, path);
1293 reply.ret_code = ret_code;
1294 reply.path_length = strlen(path) + 1;
1295 ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
1296 if (ret != sizeof(reply)) {
1297 goto error_fatal;
1298 }
1299 ret = lttcomm_send_unix_sock(sock, path, reply.path_length);
1300 if (ret != reply.path_length) {
1301 goto error_fatal;
1302 }
1303 goto end_nosignal;
fc181d72 1304 }
e5148e25 1305 case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
fc181d72 1306 {
e5148e25
JG
1307 const uint64_t relayd_id =
1308 msg.u.trace_chunk_exists.relayd_id.value;
1309
1310 ret_code = lttng_consumer_trace_chunk_exists(
1311 msg.u.trace_chunk_exists.relayd_id.is_set ?
1312 &relayd_id : NULL,
1313 msg.u.trace_chunk_exists.session_id,
1314 msg.u.trace_chunk_exists.chunk_id);
1315 goto end_msg_sessiond;
a1ae2ea5 1316 }
3bd1e081 1317 default:
3f8e211f 1318 goto end_nosignal;
3bd1e081 1319 }
3f8e211f 1320
3bd1e081 1321end_nosignal:
4cbc1a04
DG
1322 /*
1323 * Return 1 to indicate success since the 0 value can be a socket
1324 * shutdown during the recv() or send() call.
1325 */
e0e7757d
JG
1326 ret = 1;
1327 goto end;
1328error_fatal:
1329 /* This will issue a consumer stop. */
1330 ret = -1;
1331 goto end;
e5148e25
JG
1332end_msg_sessiond:
1333 /*
1334 * The returned value here is not useful since either way we'll return 1 to
1335 * the caller because the session daemon socket management is done
1336 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1337 */
1338 ret = consumer_send_status_msg(sock, ret_code);
1339 if (ret < 0) {
1340 goto error_fatal;
1341 }
e0e7757d
JG
1342 ret = 1;
1343end:
e5148e25 1344 health_code_update();
1803a064 1345 rcu_read_unlock();
e0e7757d 1346 return ret;
3bd1e081 1347}
d41f73b7 1348
309167d2
JD
1349/*
1350 * Populate index values of a kernel stream. Values are set in big endian order.
1351 *
1352 * Return 0 on success or else a negative value.
1353 */
50adc264 1354static int get_index_values(struct ctf_packet_index *index, int infd)
309167d2
JD
1355{
1356 int ret;
641e8873
JG
1357 uint64_t packet_size, content_size, timestamp_begin, timestamp_end,
1358 events_discarded, stream_id, stream_instance_id,
1359 packet_seq_num;
309167d2 1360
641e8873 1361 ret = kernctl_get_timestamp_begin(infd, &timestamp_begin);
309167d2
JD
1362 if (ret < 0) {
1363 PERROR("kernctl_get_timestamp_begin");
1364 goto error;
1365 }
309167d2 1366
641e8873 1367 ret = kernctl_get_timestamp_end(infd, &timestamp_end);
309167d2
JD
1368 if (ret < 0) {
1369 PERROR("kernctl_get_timestamp_end");
1370 goto error;
1371 }
309167d2 1372
641e8873 1373 ret = kernctl_get_events_discarded(infd, &events_discarded);
309167d2
JD
1374 if (ret < 0) {
1375 PERROR("kernctl_get_events_discarded");
1376 goto error;
1377 }
309167d2 1378
641e8873 1379 ret = kernctl_get_content_size(infd, &content_size);
309167d2
JD
1380 if (ret < 0) {
1381 PERROR("kernctl_get_content_size");
1382 goto error;
1383 }
309167d2 1384
641e8873 1385 ret = kernctl_get_packet_size(infd, &packet_size);
309167d2
JD
1386 if (ret < 0) {
1387 PERROR("kernctl_get_packet_size");
1388 goto error;
1389 }
309167d2 1390
641e8873 1391 ret = kernctl_get_stream_id(infd, &stream_id);
309167d2
JD
1392 if (ret < 0) {
1393 PERROR("kernctl_get_stream_id");
1394 goto error;
1395 }
309167d2 1396
641e8873 1397 ret = kernctl_get_instance_id(infd, &stream_instance_id);
234cd636 1398 if (ret < 0) {
f0b03c22
MD
1399 if (ret == -ENOTTY) {
1400 /* Command not implemented by lttng-modules. */
641e8873 1401 stream_instance_id = -1ULL;
f0b03c22
MD
1402 } else {
1403 PERROR("kernctl_get_instance_id");
1404 goto error;
1405 }
234cd636 1406 }
234cd636 1407
641e8873 1408 ret = kernctl_get_sequence_number(infd, &packet_seq_num);
234cd636 1409 if (ret < 0) {
f0b03c22
MD
1410 if (ret == -ENOTTY) {
1411 /* Command not implemented by lttng-modules. */
641e8873 1412 packet_seq_num = -1ULL;
f0b03c22
MD
1413 ret = 0;
1414 } else {
1415 PERROR("kernctl_get_sequence_number");
1416 goto error;
1417 }
234cd636
JD
1418 }
1419 index->packet_seq_num = htobe64(index->packet_seq_num);
1420
641e8873
JG
1421 *index = (typeof(*index)) {
1422 .offset = index->offset,
1423 .packet_size = htobe64(packet_size),
1424 .content_size = htobe64(content_size),
1425 .timestamp_begin = htobe64(timestamp_begin),
1426 .timestamp_end = htobe64(timestamp_end),
1427 .events_discarded = htobe64(events_discarded),
1428 .stream_id = htobe64(stream_id),
1429 .stream_instance_id = htobe64(stream_instance_id),
1430 .packet_seq_num = htobe64(packet_seq_num),
1431 };
1432
309167d2
JD
1433error:
1434 return ret;
1435}
94d49140
JD
1436/*
1437 * Sync metadata meaning request them to the session daemon and snapshot to the
1438 * metadata thread can consumer them.
1439 *
1440 * Metadata stream lock MUST be acquired.
1441 *
1442 * Return 0 if new metadatda is available, EAGAIN if the metadata stream
1443 * is empty or a negative value on error.
1444 */
1445int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata)
1446{
1447 int ret;
1448
1449 assert(metadata);
1450
1451 ret = kernctl_buffer_flush(metadata->wait_fd);
1452 if (ret < 0) {
1453 ERR("Failed to flush kernel stream");
1454 goto end;
1455 }
1456
1457 ret = kernctl_snapshot(metadata->wait_fd);
1458 if (ret < 0) {
32af2c95 1459 if (ret != -EAGAIN) {
94d49140
JD
1460 ERR("Sync metadata, taking kernel snapshot failed.");
1461 goto end;
1462 }
1463 DBG("Sync metadata, no new kernel metadata");
1464 /* No new metadata, exit. */
1465 ret = ENODATA;
1466 goto end;
1467 }
1468
1469end:
1470 return ret;
1471}
309167d2 1472
fb83fe64
JD
1473static
1474int update_stream_stats(struct lttng_consumer_stream *stream)
1475{
1476 int ret;
1477 uint64_t seq, discarded;
1478
1479 ret = kernctl_get_sequence_number(stream->wait_fd, &seq);
1480 if (ret < 0) {
f0b03c22
MD
1481 if (ret == -ENOTTY) {
1482 /* Command not implemented by lttng-modules. */
1483 seq = -1ULL;
c8eabe73 1484 stream->sequence_number_unavailable = true;
f0b03c22
MD
1485 } else {
1486 PERROR("kernctl_get_sequence_number");
1487 goto end;
1488 }
fb83fe64
JD
1489 }
1490
1491 /*
1492 * Start the sequence when we extract the first packet in case we don't
1493 * start at 0 (for example if a consumer is not connected to the
1494 * session immediately after the beginning).
1495 */
1496 if (stream->last_sequence_number == -1ULL) {
1497 stream->last_sequence_number = seq;
1498 } else if (seq > stream->last_sequence_number) {
1499 stream->chan->lost_packets += seq -
1500 stream->last_sequence_number - 1;
1501 } else {
1502 /* seq <= last_sequence_number */
1503 ERR("Sequence number inconsistent : prev = %" PRIu64
1504 ", current = %" PRIu64,
1505 stream->last_sequence_number, seq);
1506 ret = -1;
1507 goto end;
1508 }
1509 stream->last_sequence_number = seq;
1510
1511 ret = kernctl_get_events_discarded(stream->wait_fd, &discarded);
1512 if (ret < 0) {
1513 PERROR("kernctl_get_events_discarded");
1514 goto end;
1515 }
1516 if (discarded < stream->last_discarded_events) {
1517 /*
83f4233d
MJ
1518 * Overflow has occurred. We assume only one wrap-around
1519 * has occurred.
fb83fe64
JD
1520 */
1521 stream->chan->discarded_events += (1ULL << (CAA_BITS_PER_LONG - 1)) -
1522 stream->last_discarded_events + discarded;
1523 } else {
1524 stream->chan->discarded_events += discarded -
1525 stream->last_discarded_events;
1526 }
1527 stream->last_discarded_events = discarded;
1528 ret = 0;
1529
1530end:
1531 return ret;
1532}
1533
93ec662e
JD
1534/*
1535 * Check if the local version of the metadata stream matches with the version
1536 * of the metadata stream in the kernel. If it was updated, set the reset flag
1537 * on the stream.
1538 */
1539static
1540int metadata_stream_check_version(int infd, struct lttng_consumer_stream *stream)
1541{
1542 int ret;
1543 uint64_t cur_version;
1544
1545 ret = kernctl_get_metadata_version(infd, &cur_version);
1546 if (ret < 0) {
f0b03c22
MD
1547 if (ret == -ENOTTY) {
1548 /*
1549 * LTTng-modules does not implement this
1550 * command.
1551 */
1552 ret = 0;
1553 goto end;
1554 }
93ec662e
JD
1555 ERR("Failed to get the metadata version");
1556 goto end;
1557 }
1558
1559 if (stream->metadata_version == cur_version) {
1560 ret = 0;
1561 goto end;
1562 }
1563
1564 DBG("New metadata version detected");
1565 stream->metadata_version = cur_version;
1566 stream->reset_metadata_flag = 1;
1567 ret = 0;
1568
1569end:
1570 return ret;
1571}
1572
d41f73b7
MD
1573/*
1574 * Consume data on a file descriptor and write it on a trace file.
e5148e25 1575 * The stream and channel locks must be held by the caller.
d41f73b7 1576 */
4078b776 1577ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
e5148e25 1578 struct lttng_consumer_local_data *ctx)
d41f73b7 1579{
1d4dfdef 1580 unsigned long len, subbuf_size, padding;
02d02e31 1581 int err, write_index = 1, rotation_ret;
4078b776 1582 ssize_t ret = 0;
d41f73b7 1583 int infd = stream->wait_fd;
b2d8a465 1584 struct ctf_packet_index index = {};
d41f73b7
MD
1585
1586 DBG("In read_subbuffer (infd : %d)", infd);
309167d2 1587
02d02e31
JD
1588 /*
1589 * If the stream was flagged to be ready for rotation before we extract the
1590 * next packet, rotate it now.
1591 */
1592 if (stream->rotate_ready) {
1593 DBG("Rotate stream before extracting data");
e5148e25 1594 rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
02d02e31
JD
1595 if (rotation_ret < 0) {
1596 ERR("Stream rotation error");
1597 ret = -1;
1598 goto error;
1599 }
1600 }
1601
d41f73b7
MD
1602 /* Get the next subbuffer */
1603 err = kernctl_get_next_subbuf(infd);
1604 if (err != 0) {
d41f73b7
MD
1605 /*
1606 * This is a debug message even for single-threaded consumer,
1607 * because poll() have more relaxed criterions than get subbuf,
1608 * so get_subbuf may fail for short race windows where poll()
1609 * would issue wakeups.
1610 */
1611 DBG("Reserving sub buffer failed (everything is normal, "
1612 "it is due to concurrency)");
32af2c95 1613 ret = err;
02d02e31 1614 goto error;
d41f73b7
MD
1615 }
1616
1d4dfdef
DG
1617 /* Get the full subbuffer size including padding */
1618 err = kernctl_get_padded_subbuf_size(infd, &len);
1619 if (err != 0) {
5a510c9f 1620 PERROR("Getting sub-buffer len failed.");
8265f19e
MD
1621 err = kernctl_put_subbuf(infd);
1622 if (err != 0) {
32af2c95 1623 if (err == -EFAULT) {
5a510c9f 1624 PERROR("Error in unreserving sub buffer\n");
32af2c95 1625 } else if (err == -EIO) {
8265f19e 1626 /* Should never happen with newer LTTng versions */
5a510c9f 1627 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
8265f19e 1628 }
32af2c95 1629 ret = err;
02d02e31 1630 goto error;
8265f19e 1631 }
32af2c95 1632 ret = err;
02d02e31 1633 goto error;
1d4dfdef
DG
1634 }
1635
1c20f0e2 1636 if (!stream->metadata_flag) {
309167d2
JD
1637 ret = get_index_values(&index, infd);
1638 if (ret < 0) {
8265f19e
MD
1639 err = kernctl_put_subbuf(infd);
1640 if (err != 0) {
32af2c95 1641 if (err == -EFAULT) {
5a510c9f 1642 PERROR("Error in unreserving sub buffer\n");
32af2c95 1643 } else if (err == -EIO) {
8265f19e 1644 /* Should never happen with newer LTTng versions */
5a510c9f 1645 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
8265f19e 1646 }
32af2c95 1647 ret = err;
02d02e31 1648 goto error;
8265f19e 1649 }
02d02e31 1650 goto error;
309167d2 1651 }
fb83fe64
JD
1652 ret = update_stream_stats(stream);
1653 if (ret < 0) {
7b87473d
MD
1654 err = kernctl_put_subbuf(infd);
1655 if (err != 0) {
1656 if (err == -EFAULT) {
1657 PERROR("Error in unreserving sub buffer\n");
1658 } else if (err == -EIO) {
1659 /* Should never happen with newer LTTng versions */
1660 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
1661 }
1662 ret = err;
02d02e31 1663 goto error;
7b87473d 1664 }
02d02e31 1665 goto error;
fb83fe64 1666 }
1c20f0e2
JD
1667 } else {
1668 write_index = 0;
93ec662e
JD
1669 ret = metadata_stream_check_version(infd, stream);
1670 if (ret < 0) {
7b87473d
MD
1671 err = kernctl_put_subbuf(infd);
1672 if (err != 0) {
1673 if (err == -EFAULT) {
1674 PERROR("Error in unreserving sub buffer\n");
1675 } else if (err == -EIO) {
1676 /* Should never happen with newer LTTng versions */
1677 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
1678 }
1679 ret = err;
02d02e31 1680 goto error;
7b87473d 1681 }
02d02e31 1682 goto error;
93ec662e 1683 }
309167d2
JD
1684 }
1685
ffe60014 1686 switch (stream->chan->output) {
07b86b52 1687 case CONSUMER_CHANNEL_SPLICE:
1d4dfdef
DG
1688 /*
1689 * XXX: The lttng-modules splice "actor" does not handle copying
1690 * partial pages hence only using the subbuffer size without the
1691 * padding makes the splice fail.
1692 */
1693 subbuf_size = len;
1694 padding = 0;
1695
1696 /* splice the subbuffer to the tracefile */
91dfef6e 1697 ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size,
309167d2 1698 padding, &index);
91dfef6e
DG
1699 /*
1700 * XXX: Splice does not support network streaming so the return value
1701 * is simply checked against subbuf_size and not like the mmap() op.
1702 */
1d4dfdef
DG
1703 if (ret != subbuf_size) {
1704 /*
1705 * display the error but continue processing to try
1706 * to release the subbuffer
1707 */
1708 ERR("Error splicing to tracefile (ret: %zd != len: %lu)",
1709 ret, subbuf_size);
309167d2 1710 write_index = 0;
1d4dfdef
DG
1711 }
1712 break;
07b86b52 1713 case CONSUMER_CHANNEL_MMAP:
7775df52
JG
1714 {
1715 const char *subbuf_addr;
1716
1d4dfdef
DG
1717 /* Get subbuffer size without padding */
1718 err = kernctl_get_subbuf_size(infd, &subbuf_size);
1719 if (err != 0) {
5a510c9f 1720 PERROR("Getting sub-buffer len failed.");
8265f19e
MD
1721 err = kernctl_put_subbuf(infd);
1722 if (err != 0) {
32af2c95 1723 if (err == -EFAULT) {
5a510c9f 1724 PERROR("Error in unreserving sub buffer\n");
32af2c95 1725 } else if (err == -EIO) {
8265f19e 1726 /* Should never happen with newer LTTng versions */
5a510c9f 1727 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
8265f19e 1728 }
32af2c95 1729 ret = err;
02d02e31 1730 goto error;
8265f19e 1731 }
32af2c95 1732 ret = err;
02d02e31 1733 goto error;
1d4dfdef 1734 }
47e81c02 1735
7775df52
JG
1736 ret = get_current_subbuf_addr(stream, &subbuf_addr);
1737 if (ret) {
1738 goto error_put_subbuf;
1739 }
1740
1d4dfdef
DG
1741 /* Make sure the tracer is not gone mad on us! */
1742 assert(len >= subbuf_size);
1743
1744 padding = len - subbuf_size;
1745
1746 /* write the subbuffer to the tracefile */
7775df52
JG
1747 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream,
1748 subbuf_addr,
1749 subbuf_size,
309167d2 1750 padding, &index);
91dfef6e
DG
1751 /*
1752 * The mmap operation should write subbuf_size amount of data when
1753 * network streaming or the full padding (len) size when we are _not_
1754 * streaming.
1755 */
d88aee68
DG
1756 if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
1757 (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
1d4dfdef 1758 /*
91dfef6e 1759 * Display the error but continue processing to try to release the
2336629e
DG
1760 * subbuffer. This is a DBG statement since this is possible to
1761 * happen without being a critical error.
1d4dfdef 1762 */
2336629e 1763 DBG("Error writing to tracefile "
91dfef6e
DG
1764 "(ret: %zd != len: %lu != subbuf_size: %lu)",
1765 ret, len, subbuf_size);
309167d2 1766 write_index = 0;
1d4dfdef
DG
1767 }
1768 break;
7775df52 1769 }
1d4dfdef
DG
1770 default:
1771 ERR("Unknown output method");
56591bac 1772 ret = -EPERM;
d41f73b7 1773 }
7775df52 1774error_put_subbuf:
d41f73b7
MD
1775 err = kernctl_put_next_subbuf(infd);
1776 if (err != 0) {
32af2c95 1777 if (err == -EFAULT) {
5a510c9f 1778 PERROR("Error in unreserving sub buffer\n");
32af2c95 1779 } else if (err == -EIO) {
d41f73b7 1780 /* Should never happen with newer LTTng versions */
5a510c9f 1781 PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
d41f73b7 1782 }
32af2c95 1783 ret = err;
02d02e31 1784 goto error;
d41f73b7
MD
1785 }
1786
309167d2 1787 /* Write index if needed. */
1c20f0e2 1788 if (!write_index) {
02d02e31 1789 goto rotate;
1c20f0e2
JD
1790 }
1791
94d49140
JD
1792 if (stream->chan->live_timer_interval && !stream->metadata_flag) {
1793 /*
1794 * In live, block until all the metadata is sent.
1795 */
c585821b
MD
1796 pthread_mutex_lock(&stream->metadata_timer_lock);
1797 assert(!stream->missed_metadata_flush);
1798 stream->waiting_on_metadata = true;
1799 pthread_mutex_unlock(&stream->metadata_timer_lock);
1800
94d49140 1801 err = consumer_stream_sync_metadata(ctx, stream->session_id);
c585821b
MD
1802
1803 pthread_mutex_lock(&stream->metadata_timer_lock);
1804 stream->waiting_on_metadata = false;
1805 if (stream->missed_metadata_flush) {
1806 stream->missed_metadata_flush = false;
1807 pthread_mutex_unlock(&stream->metadata_timer_lock);
1808 (void) consumer_flush_kernel_index(stream);
1809 } else {
1810 pthread_mutex_unlock(&stream->metadata_timer_lock);
1811 }
94d49140 1812 if (err < 0) {
02d02e31 1813 goto error;
94d49140
JD
1814 }
1815 }
1816
1c20f0e2
JD
1817 err = consumer_stream_write_index(stream, &index);
1818 if (err < 0) {
02d02e31 1819 goto error;
309167d2
JD
1820 }
1821
02d02e31
JD
1822rotate:
1823 /*
1824 * After extracting the packet, we check if the stream is now ready to be
1825 * rotated and perform the action immediately.
1826 */
1827 rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
1828 if (rotation_ret == 1) {
e5148e25 1829 rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
02d02e31
JD
1830 if (rotation_ret < 0) {
1831 ERR("Stream rotation error");
1832 ret = -1;
1833 goto error;
1834 }
1835 } else if (rotation_ret < 0) {
1836 ERR("Checking if stream is ready to rotate");
1837 ret = -1;
1838 goto error;
1839 }
1840
1841error:
d41f73b7
MD
1842 return ret;
1843}
1844
1845int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1846{
1847 int ret;
ffe60014
DG
1848
1849 assert(stream);
1850
2bba9e53 1851 /*
e5148e25
JG
1852 * Don't create anything if this is set for streaming or if there is
1853 * no current trace chunk on the parent channel.
2bba9e53 1854 */
e5148e25
JG
1855 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
1856 stream->chan->trace_chunk) {
1857 ret = consumer_stream_create_output_files(stream, true);
1858 if (ret) {
fe4477ee
JD
1859 goto error;
1860 }
ffe60014 1861 }
d41f73b7 1862
d41f73b7
MD
1863 if (stream->output == LTTNG_EVENT_MMAP) {
1864 /* get the len of the mmap region */
1865 unsigned long mmap_len;
1866
1867 ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len);
1868 if (ret != 0) {
ffe60014 1869 PERROR("kernctl_get_mmap_len");
d41f73b7
MD
1870 goto error_close_fd;
1871 }
1872 stream->mmap_len = (size_t) mmap_len;
1873
ffe60014
DG
1874 stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ,
1875 MAP_PRIVATE, stream->wait_fd, 0);
d41f73b7 1876 if (stream->mmap_base == MAP_FAILED) {
ffe60014 1877 PERROR("Error mmaping");
d41f73b7
MD
1878 ret = -1;
1879 goto error_close_fd;
1880 }
1881 }
1882
1883 /* we return 0 to let the library handle the FD internally */
1884 return 0;
1885
1886error_close_fd:
2f225ce2 1887 if (stream->out_fd >= 0) {
d41f73b7
MD
1888 int err;
1889
1890 err = close(stream->out_fd);
1891 assert(!err);
2f225ce2 1892 stream->out_fd = -1;
d41f73b7
MD
1893 }
1894error:
1895 return ret;
1896}
1897
ca22feea
DG
1898/*
1899 * Check if data is still being extracted from the buffers for a specific
4e9a4686
DG
1900 * stream. Consumer data lock MUST be acquired before calling this function
1901 * and the stream lock.
ca22feea 1902 *
6d805429 1903 * Return 1 if the traced data are still getting read else 0 meaning that the
ca22feea
DG
1904 * data is available for trace viewer reading.
1905 */
6d805429 1906int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream)
ca22feea
DG
1907{
1908 int ret;
1909
1910 assert(stream);
1911
873b9e9a
MD
1912 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
1913 ret = 0;
1914 goto end;
1915 }
1916
ca22feea
DG
1917 ret = kernctl_get_next_subbuf(stream->wait_fd);
1918 if (ret == 0) {
1919 /* There is still data so let's put back this subbuffer. */
1920 ret = kernctl_put_subbuf(stream->wait_fd);
1921 assert(ret == 0);
6d805429 1922 ret = 1; /* Data is pending */
4e9a4686 1923 goto end;
ca22feea
DG
1924 }
1925
6d805429
DG
1926 /* Data is NOT pending and ready to be read. */
1927 ret = 0;
ca22feea 1928
6efae65e
DG
1929end:
1930 return ret;
ca22feea 1931}
This page took 0.207356 seconds and 4 git commands to generate.