Implement channel fd monitoring thread for UST
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
d14d33bf
AM
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
3bd1e081
MD
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
d14d33bf
AM
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
17 */
18
19#define _GNU_SOURCE
20#include <assert.h>
f02e1e8a 21#include <lttng/ust-ctl.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
dbb5dfe6 28#include <sys/stat.h>
3bd1e081 29#include <sys/types.h>
77c7c900 30#include <inttypes.h>
3bd1e081 31#include <unistd.h>
ffe60014 32#include <urcu/list.h>
0857097f 33
990570ed 34#include <common/common.h>
10a8a223 35#include <common/sessiond-comm/sessiond-comm.h>
00e2e675 36#include <common/relayd/relayd.h>
dbb5dfe6 37#include <common/compat/fcntl.h>
10a8a223
DG
38
39#include "ust-consumer.h"
3bd1e081
MD
40
41extern struct lttng_consumer_global_data consumer_data;
42extern int consumer_poll_timeout;
43extern volatile int consumer_quit;
44
45/*
ffe60014
DG
46 * Free channel object and all streams associated with it. This MUST be used
47 * only and only if the channel has _NEVER_ been added to the global channel
48 * hash table.
3bd1e081 49 */
ffe60014 50static void destroy_channel(struct lttng_consumer_channel *channel)
3bd1e081 51{
ffe60014
DG
52 struct lttng_consumer_stream *stream, *stmp;
53
54 assert(channel);
55
56 DBG("UST consumer cleaning stream list");
57
58 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
59 send_node) {
60 cds_list_del(&stream->send_node);
61 ustctl_destroy_stream(stream->ustream);
62 free(stream);
63 }
64
65 /*
66 * If a channel is available meaning that was created before the streams
67 * were, delete it.
68 */
69 if (channel->uchan) {
70 lttng_ustconsumer_del_channel(channel);
71 }
72 free(channel);
73}
3bd1e081
MD
74
75/*
ffe60014 76 * Add channel to internal consumer state.
3bd1e081 77 *
ffe60014 78 * Returns 0 on success or else a negative value.
3bd1e081 79 */
ffe60014
DG
80static int add_channel(struct lttng_consumer_channel *channel,
81 struct lttng_consumer_local_data *ctx)
3bd1e081
MD
82{
83 int ret = 0;
84
ffe60014
DG
85 assert(channel);
86 assert(ctx);
87
88 if (ctx->on_recv_channel != NULL) {
89 ret = ctx->on_recv_channel(channel);
90 if (ret == 0) {
d8ef542d 91 ret = consumer_add_channel(channel, ctx);
ffe60014
DG
92 } else if (ret < 0) {
93 /* Most likely an ENOMEM. */
94 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
95 goto error;
96 }
97 } else {
d8ef542d 98 ret = consumer_add_channel(channel, ctx);
3bd1e081
MD
99 }
100
d88aee68 101 DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
ffe60014
DG
102
103error:
3bd1e081
MD
104 return ret;
105}
106
107/*
ffe60014
DG
108 * Allocate and return a consumer channel object.
109 */
110static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
111 const char *pathname, const char *name, uid_t uid, gid_t gid,
d88aee68 112 int relayd_id, uint64_t key, enum lttng_event_output output)
ffe60014
DG
113{
114 assert(pathname);
115 assert(name);
116
117 return consumer_allocate_channel(key, session_id, pathname, name, uid, gid,
118 relayd_id, output);
119}
120
121/*
122 * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the
123 * error value if applicable is set in it else it is kept untouched.
3bd1e081 124 *
ffe60014 125 * Return NULL on error else the newly allocated stream object.
3bd1e081 126 */
ffe60014
DG
127static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
128 struct lttng_consumer_channel *channel,
129 struct lttng_consumer_local_data *ctx, int *_alloc_ret)
130{
131 int alloc_ret;
132 struct lttng_consumer_stream *stream = NULL;
133
134 assert(channel);
135 assert(ctx);
136
137 stream = consumer_allocate_stream(channel->key,
138 key,
139 LTTNG_CONSUMER_ACTIVE_STREAM,
140 channel->name,
141 channel->uid,
142 channel->gid,
143 channel->relayd_id,
144 channel->session_id,
145 cpu,
146 &alloc_ret,
147 channel->type);
148 if (stream == NULL) {
149 switch (alloc_ret) {
150 case -ENOENT:
151 /*
152 * We could not find the channel. Can happen if cpu hotplug
153 * happens while tearing down.
154 */
155 DBG3("Could not find channel");
156 break;
157 case -ENOMEM:
158 case -EINVAL:
159 default:
160 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
161 break;
162 }
163 goto error;
164 }
165
166 stream->chan = channel;
167
168error:
169 if (_alloc_ret) {
170 *_alloc_ret = alloc_ret;
171 }
172 return stream;
173}
174
175/*
176 * Send the given stream pointer to the corresponding thread.
177 *
178 * Returns 0 on success else a negative value.
179 */
180static int send_stream_to_thread(struct lttng_consumer_stream *stream,
181 struct lttng_consumer_local_data *ctx)
182{
183 int ret, stream_pipe;
184
185 /* Get the right pipe where the stream will be sent. */
186 if (stream->metadata_flag) {
187 stream_pipe = ctx->consumer_metadata_pipe[1];
188 } else {
189 stream_pipe = ctx->consumer_data_pipe[1];
190 }
191
192 do {
193 ret = write(stream_pipe, &stream, sizeof(stream));
194 } while (ret < 0 && errno == EINTR);
195 if (ret < 0) {
196 PERROR("Consumer write %s stream to pipe %d",
197 stream->metadata_flag ? "metadata" : "data", stream_pipe);
198 }
199
200 return ret;
201}
202
203/*
204 * Search for a relayd object related to the stream. If found, send the stream
205 * to the relayd.
206 *
207 * On success, returns 0 else a negative value.
208 */
209static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
210{
211 int ret = 0;
212 struct consumer_relayd_sock_pair *relayd;
213
214 assert(stream);
215
216 relayd = consumer_find_relayd(stream->net_seq_idx);
217 if (relayd != NULL) {
218 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
219 /* Add stream on the relayd */
220 ret = relayd_add_stream(&relayd->control_sock, stream->name,
221 stream->chan->pathname, &stream->relayd_stream_id);
222 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
223 if (ret < 0) {
224 goto error;
225 }
d88aee68
DG
226 } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
227 ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
ffe60014
DG
228 stream->net_seq_idx);
229 ret = -1;
230 goto error;
231 }
232
233error:
234 return ret;
235}
236
d88aee68
DG
237/*
238 * Create streams for the given channel using liblttng-ust-ctl.
239 *
240 * Return 0 on success else a negative value.
241 */
ffe60014
DG
242static int create_ust_streams(struct lttng_consumer_channel *channel,
243 struct lttng_consumer_local_data *ctx)
244{
245 int ret, cpu = 0;
246 struct ustctl_consumer_stream *ustream;
247 struct lttng_consumer_stream *stream;
248
249 assert(channel);
250 assert(ctx);
251
252 /*
253 * While a stream is available from ustctl. When NULL is returned, we've
254 * reached the end of the possible stream for the channel.
255 */
256 while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
257 int wait_fd;
258
749d339a 259 wait_fd = ustctl_stream_get_wait_fd(ustream);
ffe60014
DG
260
261 /* Allocate consumer stream object. */
262 stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
263 if (!stream) {
264 goto error_alloc;
265 }
266 stream->ustream = ustream;
267 /*
268 * Store it so we can save multiple function calls afterwards since
269 * this value is used heavily in the stream threads. This is UST
270 * specific so this is why it's done after allocation.
271 */
272 stream->wait_fd = wait_fd;
273
274 /*
275 * Order is important this is why a list is used. On error, the caller
276 * should clean this list.
277 */
278 cds_list_add_tail(&stream->send_node, &channel->streams.head);
279
280 ret = ustctl_get_max_subbuf_size(stream->ustream,
281 &stream->max_sb_size);
282 if (ret < 0) {
283 ERR("ustctl_get_max_subbuf_size failed for stream %s",
284 stream->name);
285 goto error;
286 }
287
288 /* Do actions once stream has been received. */
289 if (ctx->on_recv_stream) {
290 ret = ctx->on_recv_stream(stream);
291 if (ret < 0) {
292 goto error;
293 }
294 }
295
d88aee68 296 DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
ffe60014
DG
297 stream->name, stream->key, stream->relayd_stream_id);
298
299 /* Set next CPU stream. */
300 channel->streams.count = ++cpu;
d88aee68
DG
301
302 /* Keep stream reference when creating metadata. */
303 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
304 channel->metadata_stream = stream;
305 }
ffe60014
DG
306 }
307
308 return 0;
309
310error:
311error_alloc:
312 return ret;
313}
314
315/*
316 * Create an UST channel with the given attributes and send it to the session
317 * daemon using the ust ctl API.
318 *
319 * Return 0 on success or else a negative value.
320 */
321static int create_ust_channel(struct ustctl_consumer_channel_attr *attr,
322 struct ustctl_consumer_channel **chanp)
323{
324 int ret;
325 struct ustctl_consumer_channel *channel;
326
327 assert(attr);
328 assert(chanp);
329
330 DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
331 "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
332 "switch_timer_interval: %u, read_timer_interval: %u, "
333 "output: %d, type: %d", attr->overwrite, attr->subbuf_size,
334 attr->num_subbuf, attr->switch_timer_interval,
335 attr->read_timer_interval, attr->output, attr->type);
336
337 channel = ustctl_create_channel(attr);
338 if (!channel) {
339 ret = -1;
340 goto error_create;
341 }
342
343 *chanp = channel;
344
345 return 0;
346
347error_create:
348 return ret;
349}
350
d88aee68
DG
351/*
352 * Send a single given stream to the session daemon using the sock.
353 *
354 * Return 0 on success else a negative value.
355 */
ffe60014
DG
356static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
357{
358 int ret;
359
360 assert(stream);
361 assert(sock >= 0);
362
d88aee68 363 DBG2("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
ffe60014
DG
364
365 /* Send stream to session daemon. */
366 ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
367 if (ret < 0) {
368 goto error;
369 }
370
ffe60014
DG
371error:
372 return ret;
373}
374
375/*
376 * Send channel to sessiond.
377 *
d88aee68 378 * Return 0 on success or else a negative value.
ffe60014
DG
379 */
380static int send_sessiond_channel(int sock,
381 struct lttng_consumer_channel *channel,
382 struct lttng_consumer_local_data *ctx, int *relayd_error)
383{
384 int ret;
385 struct lttng_consumer_stream *stream;
386
387 assert(channel);
388 assert(ctx);
389 assert(sock >= 0);
390
391 DBG("UST consumer sending channel %s to sessiond", channel->name);
392
393 /* Send channel to sessiond. */
394 ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
395 if (ret < 0) {
396 goto error;
397 }
398
d8ef542d
MD
399 ret = ustctl_channel_close_wakeup_fd(channel->uchan);
400 if (ret < 0) {
401 goto error;
402 }
403
ffe60014
DG
404 /* The channel was sent successfully to the sessiond at this point. */
405 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
406 /* Try to send the stream to the relayd if one is available. */
407 ret = send_stream_to_relayd(stream);
408 if (ret < 0) {
409 /*
410 * Flag that the relayd was the problem here probably due to a
411 * communicaton error on the socket.
412 */
413 if (relayd_error) {
414 *relayd_error = 1;
415 }
416 goto error;
417 }
418
419 /* Send stream to session daemon. */
420 ret = send_sessiond_stream(sock, stream);
421 if (ret < 0) {
422 goto error;
423 }
424 }
425
426 /* Tell sessiond there is no more stream. */
427 ret = ustctl_send_stream_to_sessiond(sock, NULL);
428 if (ret < 0) {
429 goto error;
430 }
431
432 DBG("UST consumer NULL stream sent to sessiond");
433
434 return 0;
435
436error:
437 return ret;
438}
439
440/*
441 * Creates a channel and streams and add the channel it to the channel internal
442 * state. The created stream must ONLY be sent once the GET_CHANNEL command is
443 * received.
444 *
445 * Return 0 on success or else, a negative value is returned and the channel
446 * MUST be destroyed by consumer_del_channel().
447 */
448static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
449 struct lttng_consumer_channel *channel,
450 struct ustctl_consumer_channel_attr *attr)
3bd1e081
MD
451{
452 int ret;
453
ffe60014
DG
454 assert(ctx);
455 assert(channel);
456 assert(attr);
457
458 /*
459 * This value is still used by the kernel consumer since for the kernel,
460 * the stream ownership is not IN the consumer so we need to have the
461 * number of left stream that needs to be initialized so we can know when
462 * to delete the channel (see consumer.c).
463 *
464 * As for the user space tracer now, the consumer creates and sends the
465 * stream to the session daemon which only sends them to the application
466 * once every stream of a channel is received making this value useless
467 * because we they will be added to the poll thread before the application
468 * receives them. This ensures that a stream can not hang up during
469 * initilization of a channel.
470 */
471 channel->nb_init_stream_left = 0;
472
473 /* The reply msg status is handled in the following call. */
474 ret = create_ust_channel(attr, &channel->uchan);
475 if (ret < 0) {
476 goto error;
3bd1e081
MD
477 }
478
d8ef542d
MD
479 channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
480
481 if (ret < 0) {
482 goto error;
483 }
484
ffe60014
DG
485 /* Open all streams for this channel. */
486 ret = create_ust_streams(channel, ctx);
487 if (ret < 0) {
488 goto error;
489 }
490
491error:
3bd1e081
MD
492 return ret;
493}
494
d88aee68
DG
495/*
496 * Send all stream of a channel to the right thread handling it.
497 *
498 * On error, return a negative value else 0 on success.
499 */
500static int send_streams_to_thread(struct lttng_consumer_channel *channel,
501 struct lttng_consumer_local_data *ctx)
502{
503 int ret = 0;
504 struct lttng_consumer_stream *stream, *stmp;
505
506 assert(channel);
507 assert(ctx);
508
509 /* Send streams to the corresponding thread. */
510 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
511 send_node) {
512 /* Sending the stream to the thread. */
513 ret = send_stream_to_thread(stream, ctx);
514 if (ret < 0) {
515 /*
516 * If we are unable to send the stream to the thread, there is
517 * a big problem so just stop everything.
518 */
519 goto error;
520 }
521
522 /* Remove node from the channel stream list. */
523 cds_list_del(&stream->send_node);
524 }
525
526error:
527 return ret;
528}
529
530/*
531 * Write metadata to the given channel using ustctl to convert the string to
532 * the ringbuffer.
533 *
534 * Return 0 on success else a negative value.
535 */
536static int push_metadata(struct lttng_consumer_channel *metadata,
537 const char *metadata_str, uint64_t target_offset, uint64_t len)
538{
539 int ret;
540
541 assert(metadata);
542 assert(metadata_str);
543
544 DBG("UST consumer writing metadata to channel %s", metadata->name);
545
546 assert(target_offset == metadata->contig_metadata_written);
547 ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str, len);
548 if (ret < 0) {
549 ERR("ustctl write metadata fail with ret %d, len %ld", ret, len);
550 goto error;
551 }
552 metadata->contig_metadata_written += len;
553
554 ustctl_flush_buffer(metadata->metadata_stream->ustream, 1);
555
556error:
557 return ret;
558}
559
560/*
561 * Close metadata stream wakeup_fd using the given key to retrieve the channel.
562 *
563 * Return 0 on success else an LTTng error code.
564 */
565static int close_metadata(uint64_t chan_key)
566{
567 int ret;
568 struct lttng_consumer_channel *channel;
569
570 DBG("UST consumer close metadata key %lu", chan_key);
571
572 channel = consumer_find_channel(chan_key);
573 if (!channel) {
574 ERR("UST consumer close metadata %lu not found", chan_key);
575 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
576 goto error;
577 }
578
579 ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
580 if (ret < 0) {
581 ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
582 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
583 goto error;
584 }
585
586error:
587 return ret;
588}
589
590/*
591 * RCU read side lock MUST be acquired before calling this function.
592 *
593 * Return 0 on success else an LTTng error code.
594 */
595static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
596{
597 int ret;
598 struct lttng_consumer_channel *metadata;
599
600 DBG("UST consumer setup metadata key %lu", key);
601
602 metadata = consumer_find_channel(key);
603 if (!metadata) {
604 ERR("UST consumer push metadata %" PRIu64 " not found", key);
605 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
606 goto error;
607 }
608
609 /*
610 * Send metadata stream to relayd if one available. Availability is
611 * known if the stream is still in the list of the channel.
612 */
613 if (cds_list_empty(&metadata->streams.head)) {
614 ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
615 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
616 goto error;
617 }
618
619 /* Send metadata stream to relayd if needed. */
620 ret = send_stream_to_relayd(metadata->metadata_stream);
621 if (ret < 0) {
622 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
623 goto error;
624 }
625
626 ret = send_streams_to_thread(metadata, ctx);
627 if (ret < 0) {
628 /*
629 * If we are unable to send the stream to the thread, there is
630 * a big problem so just stop everything.
631 */
632 ret = LTTCOMM_CONSUMERD_FATAL;
633 goto error;
634 }
635 /* List MUST be empty after or else it could be reused. */
636 assert(cds_list_empty(&metadata->streams.head));
637
638 ret = 0;
639
640error:
641 return ret;
642}
643
4cbc1a04
DG
644/*
645 * Receive command from session daemon and process it.
646 *
647 * Return 1 on success else a negative value or 0.
648 */
3bd1e081
MD
649int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
650 int sock, struct pollfd *consumer_sockpoll)
651{
652 ssize_t ret;
f50f23d9 653 enum lttng_error_code ret_code = LTTNG_OK;
3bd1e081 654 struct lttcomm_consumer_msg msg;
ffe60014 655 struct lttng_consumer_channel *channel = NULL;
3bd1e081
MD
656
657 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
658 if (ret != sizeof(msg)) {
173af62f
DG
659 DBG("Consumer received unexpected message size %zd (expects %zu)",
660 ret, sizeof(msg));
ffe60014 661 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
3be74084
DG
662 /*
663 * The ret value might 0 meaning an orderly shutdown but this is ok
664 * since the caller handles this.
665 */
3bd1e081
MD
666 return ret;
667 }
668 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
f50f23d9
DG
669 /*
670 * Notify the session daemon that the command is completed.
671 *
672 * On transport layer error, the function call will print an error
673 * message so handling the returned code is a bit useless since we
674 * return an error code anyway.
675 */
676 (void) consumer_send_status_msg(sock, ret_code);
3bd1e081
MD
677 return -ENOENT;
678 }
679
3f8e211f 680 /* relayd needs RCU read-side lock */
b0b335c8
MD
681 rcu_read_lock();
682
3bd1e081 683 switch (msg.cmd_type) {
00e2e675
DG
684 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
685 {
f50f23d9 686 /* Session daemon status message are handled in the following call. */
7735ef9e
DG
687 ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
688 msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
46e6455f 689 &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
00e2e675
DG
690 goto end_nosignal;
691 }
173af62f
DG
692 case LTTNG_CONSUMER_DESTROY_RELAYD:
693 {
a6ba4fe1 694 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
173af62f
DG
695 struct consumer_relayd_sock_pair *relayd;
696
a6ba4fe1 697 DBG("UST consumer destroying relayd %" PRIu64, index);
173af62f
DG
698
699 /* Get relayd reference if exists. */
a6ba4fe1 700 relayd = consumer_find_relayd(index);
173af62f 701 if (relayd == NULL) {
3448e266 702 DBG("Unable to find relayd %" PRIu64, index);
f50f23d9 703 ret_code = LTTNG_ERR_NO_CONSUMER;
173af62f
DG
704 }
705
a6ba4fe1
DG
706 /*
707 * Each relayd socket pair has a refcount of stream attached to it
708 * which tells if the relayd is still active or not depending on the
709 * refcount value.
710 *
711 * This will set the destroy flag of the relayd object and destroy it
712 * if the refcount reaches zero when called.
713 *
714 * The destroy can happen either here or when a stream fd hangs up.
715 */
f50f23d9
DG
716 if (relayd) {
717 consumer_flag_relayd_for_destroy(relayd);
718 }
719
d88aee68 720 goto end_msg_sessiond;
173af62f 721 }
3bd1e081
MD
722 case LTTNG_CONSUMER_UPDATE_STREAM:
723 {
3f8e211f 724 rcu_read_unlock();
7ad0a0cb 725 return -ENOSYS;
3bd1e081 726 }
6d805429 727 case LTTNG_CONSUMER_DATA_PENDING:
53632229 728 {
3be74084 729 int ret, is_data_pending;
6d805429 730 uint64_t id = msg.u.data_pending.session_id;
ca22feea 731
6d805429 732 DBG("UST consumer data pending command for id %" PRIu64, id);
ca22feea 733
3be74084 734 is_data_pending = consumer_data_pending(id);
ca22feea
DG
735
736 /* Send back returned value to session daemon */
3be74084
DG
737 ret = lttcomm_send_unix_sock(sock, &is_data_pending,
738 sizeof(is_data_pending));
ca22feea 739 if (ret < 0) {
3be74084 740 DBG("Error when sending the data pending ret code: %d", ret);
ca22feea 741 }
f50f23d9
DG
742
743 /*
744 * No need to send back a status message since the data pending
745 * returned value is the response.
746 */
ca22feea 747 break;
53632229 748 }
ffe60014
DG
749 case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
750 {
751 int ret;
752 struct ustctl_consumer_channel_attr attr;
753
754 /* Create a plain object and reserve a channel key. */
755 channel = allocate_channel(msg.u.ask_channel.session_id,
756 msg.u.ask_channel.pathname, msg.u.ask_channel.name,
757 msg.u.ask_channel.uid, msg.u.ask_channel.gid,
758 msg.u.ask_channel.relayd_id, msg.u.ask_channel.key,
759 (enum lttng_event_output) msg.u.ask_channel.output);
760 if (!channel) {
761 goto end_channel_error;
762 }
763
764 /* Build channel attributes from received message. */
765 attr.subbuf_size = msg.u.ask_channel.subbuf_size;
766 attr.num_subbuf = msg.u.ask_channel.num_subbuf;
767 attr.overwrite = msg.u.ask_channel.overwrite;
768 attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
769 attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
770 memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
771
772 /* Translate the event output type to UST. */
773 switch (channel->output) {
774 case LTTNG_EVENT_SPLICE:
775 /* Splice not supported so fallback on mmap(). */
776 case LTTNG_EVENT_MMAP:
777 default:
778 attr.output = CONSUMER_CHANNEL_MMAP;
779 break;
780 };
781
782 /* Translate and save channel type. */
783 switch (msg.u.ask_channel.type) {
784 case LTTNG_UST_CHAN_PER_CPU:
785 channel->type = CONSUMER_CHANNEL_TYPE_DATA;
786 attr.type = LTTNG_UST_CHAN_PER_CPU;
787 break;
788 case LTTNG_UST_CHAN_METADATA:
789 channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
790 attr.type = LTTNG_UST_CHAN_METADATA;
791 break;
792 default:
793 assert(0);
794 goto error_fatal;
795 };
796
797 ret = ask_channel(ctx, sock, channel, &attr);
798 if (ret < 0) {
799 goto end_channel_error;
800 }
801
802 /*
803 * Add the channel to the internal state AFTER all streams were created
804 * and successfully sent to session daemon. This way, all streams must
805 * be ready before this channel is visible to the threads.
806 */
807 ret = add_channel(channel, ctx);
808 if (ret < 0) {
809 goto end_channel_error;
810 }
811
812 /*
813 * Channel and streams are now created. Inform the session daemon that
814 * everything went well and should wait to receive the channel and
815 * streams with ustctl API.
816 */
817 ret = consumer_send_status_channel(sock, channel);
818 if (ret < 0) {
819 /*
820 * There is probably a problem on the socket so the poll will get
821 * it and clean everything up.
822 */
823 goto end_nosignal;
824 }
825
826 break;
827 }
828 case LTTNG_CONSUMER_GET_CHANNEL:
829 {
830 int ret, relayd_err = 0;
d88aee68 831 uint64_t key = msg.u.get_channel.key;
ffe60014 832 struct lttng_consumer_channel *channel;
ffe60014
DG
833
834 channel = consumer_find_channel(key);
835 if (!channel) {
836 ERR("UST consumer get channel key %lu not found", key);
837 ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
838 goto end_msg_sessiond;
839 }
840
841 /* Inform sessiond that we are about to send channel and streams. */
842 ret = consumer_send_status_msg(sock, LTTNG_OK);
843 if (ret < 0) {
844 /* Somehow, the session daemon is not responding anymore. */
845 goto end_nosignal;
846 }
847
848 /* Send everything to sessiond. */
849 ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
850 if (ret < 0) {
851 if (relayd_err) {
852 /*
853 * We were unable to send to the relayd the stream so avoid
854 * sending back a fatal error to the thread since this is OK
855 * and the consumer can continue its work.
856 */
857 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
858 goto end_msg_sessiond;
859 }
860 /*
861 * The communicaton was broken hence there is a bad state between
862 * the consumer and sessiond so stop everything.
863 */
864 goto error_fatal;
865 }
866
d88aee68
DG
867 ret = send_streams_to_thread(channel, ctx);
868 if (ret < 0) {
869 /*
870 * If we are unable to send the stream to the thread, there is
871 * a big problem so just stop everything.
872 */
873 goto error_fatal;
ffe60014 874 }
ffe60014
DG
875 /* List MUST be empty after or else it could be reused. */
876 assert(cds_list_empty(&channel->streams.head));
877
d88aee68
DG
878 goto end_msg_sessiond;
879 }
880 case LTTNG_CONSUMER_DESTROY_CHANNEL:
881 {
882 uint64_t key = msg.u.destroy_channel.key;
883 struct lttng_consumer_channel *channel;
884
885 channel = consumer_find_channel(key);
886 if (!channel) {
887 ERR("UST consumer get channel key %lu not found", key);
888 ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
889 goto end_msg_sessiond;
ffe60014
DG
890 }
891
d88aee68
DG
892 destroy_channel(channel);
893
894 goto end_msg_sessiond;
ffe60014 895 }
d88aee68
DG
896 case LTTNG_CONSUMER_CLOSE_METADATA:
897 {
898 int ret;
899
900 ret = close_metadata(msg.u.close_metadata.key);
901 if (ret != 0) {
902 ret_code = ret;
903 }
904
905 goto end_msg_sessiond;
906 }
907 case LTTNG_CONSUMER_PUSH_METADATA:
ffe60014
DG
908 {
909 int ret;
d88aee68
DG
910 uint64_t len = msg.u.push_metadata.len;
911 uint64_t target_offset = msg.u.push_metadata.target_offset;
912 uint64_t key = msg.u.push_metadata.key;
ffe60014 913 struct lttng_consumer_channel *channel;
d88aee68 914 char *metadata_str;
ffe60014 915
d88aee68 916 DBG("UST consumer push metadata key %lu of len %lu", key, len);
ffe60014
DG
917
918 channel = consumer_find_channel(key);
919 if (!channel) {
d88aee68 920 ERR("UST consumer push metadata %lu not found", key);
ffe60014 921 ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
ffe60014
DG
922 }
923
d88aee68
DG
924 metadata_str = zmalloc(len * sizeof(char));
925 if (!metadata_str) {
926 PERROR("zmalloc metadata string");
927 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
928 goto end_msg_sessiond;
929 }
930
931 /* Tell session daemon we are ready to receive the metadata. */
ffe60014
DG
932 ret = consumer_send_status_msg(sock, LTTNG_OK);
933 if (ret < 0) {
934 /* Somehow, the session daemon is not responding anymore. */
d88aee68
DG
935 goto error_fatal;
936 }
937
938 /* Wait for more data. */
939 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
940 goto end_nosignal;
941 }
942
943 /* Receive metadata string. */
944 ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
945 if (ret < 0) {
946 /* Session daemon is dead so return gracefully. */
ffe60014
DG
947 goto end_nosignal;
948 }
d88aee68
DG
949
950 ret = push_metadata(channel, metadata_str, target_offset, len);
951 free(metadata_str);
952 if (ret < 0) {
953 /* Unable to handle metadata. Notify session daemon. */
954 ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
955 goto end_msg_sessiond;
956 }
957
958 goto end_msg_sessiond;
959 }
960 case LTTNG_CONSUMER_SETUP_METADATA:
961 {
962 int ret;
963
964 ret = setup_metadata(ctx, msg.u.setup_metadata.key);
965 if (ret) {
966 ret_code = ret;
967 }
968 goto end_msg_sessiond;
ffe60014 969 }
3bd1e081
MD
970 default:
971 break;
972 }
3f8e211f 973
3bd1e081 974end_nosignal:
b0b335c8 975 rcu_read_unlock();
4cbc1a04
DG
976
977 /*
978 * Return 1 to indicate success since the 0 value can be a socket
979 * shutdown during the recv() or send() call.
980 */
981 return 1;
ffe60014
DG
982
983end_msg_sessiond:
984 /*
985 * The returned value here is not useful since either way we'll return 1 to
986 * the caller because the session daemon socket management is done
987 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
988 */
989 (void) consumer_send_status_msg(sock, ret_code);
990 rcu_read_unlock();
991 return 1;
992end_channel_error:
993 if (channel) {
994 /*
995 * Free channel here since no one has a reference to it. We don't
996 * free after that because a stream can store this pointer.
997 */
998 destroy_channel(channel);
999 }
1000 /* We have to send a status channel message indicating an error. */
1001 ret = consumer_send_status_channel(sock, NULL);
1002 if (ret < 0) {
1003 /* Stop everything if session daemon can not be notified. */
1004 goto error_fatal;
1005 }
1006 rcu_read_unlock();
1007 return 1;
1008error_fatal:
1009 rcu_read_unlock();
1010 /* This will issue a consumer stop. */
1011 return -1;
3bd1e081
MD
1012}
1013
ffe60014
DG
1014/*
1015 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
1016 * compiled out, we isolate it in this library.
1017 */
1018int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
1019 unsigned long *off)
3bd1e081 1020{
ffe60014
DG
1021 assert(stream);
1022 assert(stream->ustream);
b5c5fc29 1023
ffe60014 1024 return ustctl_get_mmap_read_offset(stream->ustream, off);
3bd1e081
MD
1025}
1026
ffe60014
DG
1027/*
1028 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
1029 * compiled out, we isolate it in this library.
1030 */
1031void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream)
d056b477 1032{
ffe60014
DG
1033 assert(stream);
1034 assert(stream->ustream);
1035
1036 return ustctl_get_mmap_base(stream->ustream);
d056b477
MD
1037}
1038
ffe60014
DG
1039/*
1040 * Take a snapshot for a specific fd
1041 *
1042 * Returns 0 on success, < 0 on error
1043 */
1044int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081 1045{
ffe60014
DG
1046 assert(stream);
1047 assert(stream->ustream);
1048
1049 return ustctl_snapshot(stream->ustream);
3bd1e081
MD
1050}
1051
ffe60014
DG
1052/*
1053 * Get the produced position
1054 *
1055 * Returns 0 on success, < 0 on error
1056 */
1057int lttng_ustconsumer_get_produced_snapshot(
1058 struct lttng_consumer_stream *stream, unsigned long *pos)
3bd1e081 1059{
ffe60014
DG
1060 assert(stream);
1061 assert(stream->ustream);
1062 assert(pos);
7a57cf92 1063
ffe60014
DG
1064 return ustctl_snapshot_get_produced(stream->ustream, pos);
1065}
7a57cf92 1066
ffe60014
DG
1067/*
1068 * Called when the stream signal the consumer that it has hang up.
1069 */
1070void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
1071{
1072 assert(stream);
1073 assert(stream->ustream);
2c1dd183 1074
ffe60014
DG
1075 ustctl_flush_buffer(stream->ustream, 0);
1076 stream->hangup_flush_done = 1;
1077}
ee77a7b0 1078
ffe60014
DG
1079void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
1080{
1081 assert(chan);
1082 assert(chan->uchan);
e316aad5 1083
ffe60014 1084 ustctl_destroy_channel(chan->uchan);
3bd1e081
MD
1085}
1086
1087void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
1088{
ffe60014
DG
1089 assert(stream);
1090 assert(stream->ustream);
d41f73b7 1091
ffe60014
DG
1092 ustctl_destroy_stream(stream->ustream);
1093}
d41f73b7
MD
1094
1095int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
1096 struct lttng_consumer_local_data *ctx)
1097{
1d4dfdef 1098 unsigned long len, subbuf_size, padding;
d41f73b7
MD
1099 int err;
1100 long ret = 0;
d41f73b7 1101 char dummy;
ffe60014
DG
1102 struct ustctl_consumer_stream *ustream;
1103
1104 assert(stream);
1105 assert(stream->ustream);
1106 assert(ctx);
d41f73b7 1107
ffe60014
DG
1108 DBG2("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd,
1109 stream->name);
1110
1111 /* Ease our life for what's next. */
1112 ustream = stream->ustream;
d41f73b7
MD
1113
1114 /* We can consume the 1 byte written into the wait_fd by UST */
effcf122 1115 if (!stream->hangup_flush_done) {
c617c0c6
MD
1116 ssize_t readlen;
1117
effcf122
MD
1118 do {
1119 readlen = read(stream->wait_fd, &dummy, 1);
87dc6a9c 1120 } while (readlen == -1 && errno == EINTR);
effcf122
MD
1121 if (readlen == -1) {
1122 ret = readlen;
1123 goto end;
1124 }
d41f73b7
MD
1125 }
1126
d41f73b7 1127 /* Get the next subbuffer */
ffe60014 1128 err = ustctl_get_next_subbuf(ustream);
d41f73b7 1129 if (err != 0) {
1d4dfdef 1130 ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
d41f73b7
MD
1131 /*
1132 * This is a debug message even for single-threaded consumer,
1133 * because poll() have more relaxed criterions than get subbuf,
1134 * so get_subbuf may fail for short race windows where poll()
1135 * would issue wakeups.
1136 */
1137 DBG("Reserving sub buffer failed (everything is normal, "
ffe60014 1138 "it is due to concurrency) [ret: %d]", err);
d41f73b7
MD
1139 goto end;
1140 }
ffe60014 1141 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
1d4dfdef 1142 /* Get the full padded subbuffer size */
ffe60014 1143 err = ustctl_get_padded_subbuf_size(ustream, &len);
effcf122 1144 assert(err == 0);
1d4dfdef
DG
1145
1146 /* Get subbuffer data size (without padding) */
ffe60014 1147 err = ustctl_get_subbuf_size(ustream, &subbuf_size);
1d4dfdef
DG
1148 assert(err == 0);
1149
1150 /* Make sure we don't get a subbuffer size bigger than the padded */
1151 assert(len >= subbuf_size);
1152
1153 padding = len - subbuf_size;
d41f73b7 1154 /* write the subbuffer to the tracefile */
1d4dfdef 1155 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding);
91dfef6e
DG
1156 /*
1157 * The mmap operation should write subbuf_size amount of data when network
1158 * streaming or the full padding (len) size when we are _not_ streaming.
1159 */
d88aee68
DG
1160 if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
1161 (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
d41f73b7 1162 /*
91dfef6e 1163 * Display the error but continue processing to try to release the
c5c45efa
DG
1164 * subbuffer. This is a DBG statement since any unexpected kill or
1165 * signal, the application gets unregistered, relayd gets closed or
1166 * anything that affects the buffer lifetime will trigger this error.
1167 * So, for the sake of the user, don't print this error since it can
1168 * happen and it is OK with the code flow.
d41f73b7 1169 */
c5c45efa 1170 DBG("Error writing to tracefile "
91dfef6e
DG
1171 "(ret: %zd != len: %lu != subbuf_size: %lu)",
1172 ret, len, subbuf_size);
d41f73b7 1173 }
ffe60014 1174 err = ustctl_put_next_subbuf(ustream);
effcf122 1175 assert(err == 0);
d41f73b7
MD
1176end:
1177 return ret;
1178}
1179
ffe60014
DG
1180/*
1181 * Called when a stream is created.
1182 */
d41f73b7
MD
1183int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1184{
1185 int ret;
ffe60014 1186 char full_path[PATH_MAX];
d41f73b7
MD
1187
1188 /* Opening the tracefile in write mode */
d88aee68 1189 if (stream->net_seq_idx != (uint64_t) -1ULL) {
ffe60014 1190 goto end;
d41f73b7
MD
1191 }
1192
ffe60014
DG
1193 ret = snprintf(full_path, sizeof(full_path), "%s/%s",
1194 stream->chan->pathname, stream->name);
1195 if (ret < 0) {
1196 PERROR("snprintf on_recv_stream");
1197 goto error;
1198 }
1199
1200 ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC,
1201 S_IRWXU | S_IRWXG | S_IRWXO, stream->uid, stream->gid);
1202 if (ret < 0) {
1203 PERROR("open stream path %s", full_path);
c869f647
DG
1204 goto error;
1205 }
ffe60014 1206 stream->out_fd = ret;
c869f647 1207
ffe60014 1208end:
d41f73b7
MD
1209 /* we return 0 to let the library handle the FD internally */
1210 return 0;
1211
1212error:
1213 return ret;
1214}
ca22feea
DG
1215
1216/*
1217 * Check if data is still being extracted from the buffers for a specific
4e9a4686
DG
1218 * stream. Consumer data lock MUST be acquired before calling this function
1219 * and the stream lock.
ca22feea 1220 *
6d805429 1221 * Return 1 if the traced data are still getting read else 0 meaning that the
ca22feea
DG
1222 * data is available for trace viewer reading.
1223 */
6d805429 1224int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
ca22feea
DG
1225{
1226 int ret;
1227
1228 assert(stream);
ffe60014 1229 assert(stream->ustream);
ca22feea 1230
6d805429 1231 DBG("UST consumer checking data pending");
c8f59ee5 1232
ffe60014 1233 ret = ustctl_get_next_subbuf(stream->ustream);
ca22feea
DG
1234 if (ret == 0) {
1235 /* There is still data so let's put back this subbuffer. */
ffe60014 1236 ret = ustctl_put_subbuf(stream->ustream);
ca22feea 1237 assert(ret == 0);
6d805429 1238 ret = 1; /* Data is pending */
4e9a4686 1239 goto end;
ca22feea
DG
1240 }
1241
6d805429
DG
1242 /* Data is NOT pending so ready to be read. */
1243 ret = 0;
ca22feea 1244
6efae65e
DG
1245end:
1246 return ret;
ca22feea 1247}
d88aee68
DG
1248
1249/*
1250 * Close every metadata stream wait fd of the metadata hash table. This
1251 * function MUST be used very carefully so not to run into a race between the
1252 * metadata thread handling streams and this function closing their wait fd.
1253 *
1254 * For UST, this is used when the session daemon hangs up. Its the metadata
1255 * producer so calling this is safe because we are assured that no state change
1256 * can occur in the metadata thread for the streams in the hash table.
1257 */
1258void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
1259{
1260 int ret;
1261 struct lttng_ht_iter iter;
1262 struct lttng_consumer_stream *stream;
1263
1264 assert(metadata_ht);
1265 assert(metadata_ht->ht);
1266
1267 DBG("UST consumer closing all metadata streams");
1268
1269 rcu_read_lock();
1270 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
1271 node.node) {
1272 int fd = stream->wait_fd;
1273
1274 /*
1275 * Whatever happens here we have to continue to try to close every
1276 * streams. Let's report at least the error on failure.
1277 */
1278 ret = ustctl_stream_close_wakeup_fd(stream->ustream);
1279 if (ret) {
1280 ERR("Unable to close metadata stream fd %d ret %d", fd, ret);
1281 }
1282 DBG("Metadata wait fd %d closed", fd);
1283 }
1284 rcu_read_unlock();
1285}
d8ef542d
MD
1286
1287void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
1288{
1289 int ret;
1290
1291 ret = ustctl_stream_close_wakeup_fd(stream->ustream);
1292 if (ret < 0) {
1293 ERR("Unable to close wakeup fd");
1294 }
1295}
This page took 0.090665 seconds and 4 git commands to generate.