Tests: Fix nprocesses applications shutdown
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
d14d33bf
AM
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
3bd1e081
MD
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
d14d33bf
AM
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
17 */
18
19#define _GNU_SOURCE
20#include <assert.h>
f02e1e8a 21#include <lttng/ust-ctl.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
dbb5dfe6 28#include <sys/stat.h>
3bd1e081 29#include <sys/types.h>
77c7c900 30#include <inttypes.h>
3bd1e081 31#include <unistd.h>
ffe60014 32#include <urcu/list.h>
0857097f 33
990570ed 34#include <common/common.h>
10a8a223 35#include <common/sessiond-comm/sessiond-comm.h>
00e2e675 36#include <common/relayd/relayd.h>
dbb5dfe6 37#include <common/compat/fcntl.h>
10a8a223
DG
38
39#include "ust-consumer.h"
3bd1e081
MD
40
41extern struct lttng_consumer_global_data consumer_data;
42extern int consumer_poll_timeout;
43extern volatile int consumer_quit;
44
45/*
ffe60014
DG
46 * Free channel object and all streams associated with it. This MUST be used
47 * only and only if the channel has _NEVER_ been added to the global channel
48 * hash table.
3bd1e081 49 */
ffe60014 50static void destroy_channel(struct lttng_consumer_channel *channel)
3bd1e081 51{
ffe60014
DG
52 struct lttng_consumer_stream *stream, *stmp;
53
54 assert(channel);
55
56 DBG("UST consumer cleaning stream list");
57
58 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
59 send_node) {
60 cds_list_del(&stream->send_node);
61 ustctl_destroy_stream(stream->ustream);
62 free(stream);
63 }
64
65 /*
66 * If a channel is available meaning that was created before the streams
67 * were, delete it.
68 */
69 if (channel->uchan) {
70 lttng_ustconsumer_del_channel(channel);
71 }
72 free(channel);
73}
3bd1e081
MD
74
75/*
ffe60014 76 * Add channel to internal consumer state.
3bd1e081 77 *
ffe60014 78 * Returns 0 on success or else a negative value.
3bd1e081 79 */
ffe60014
DG
80static int add_channel(struct lttng_consumer_channel *channel,
81 struct lttng_consumer_local_data *ctx)
3bd1e081
MD
82{
83 int ret = 0;
84
ffe60014
DG
85 assert(channel);
86 assert(ctx);
87
88 if (ctx->on_recv_channel != NULL) {
89 ret = ctx->on_recv_channel(channel);
90 if (ret == 0) {
d8ef542d 91 ret = consumer_add_channel(channel, ctx);
ffe60014
DG
92 } else if (ret < 0) {
93 /* Most likely an ENOMEM. */
94 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
95 goto error;
96 }
97 } else {
d8ef542d 98 ret = consumer_add_channel(channel, ctx);
3bd1e081
MD
99 }
100
d88aee68 101 DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
ffe60014
DG
102
103error:
3bd1e081
MD
104 return ret;
105}
106
107/*
ffe60014
DG
108 * Allocate and return a consumer channel object.
109 */
110static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
111 const char *pathname, const char *name, uid_t uid, gid_t gid,
d88aee68 112 int relayd_id, uint64_t key, enum lttng_event_output output)
ffe60014
DG
113{
114 assert(pathname);
115 assert(name);
116
117 return consumer_allocate_channel(key, session_id, pathname, name, uid, gid,
118 relayd_id, output);
119}
120
121/*
122 * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the
123 * error value if applicable is set in it else it is kept untouched.
3bd1e081 124 *
ffe60014 125 * Return NULL on error else the newly allocated stream object.
3bd1e081 126 */
ffe60014
DG
127static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
128 struct lttng_consumer_channel *channel,
129 struct lttng_consumer_local_data *ctx, int *_alloc_ret)
130{
131 int alloc_ret;
132 struct lttng_consumer_stream *stream = NULL;
133
134 assert(channel);
135 assert(ctx);
136
137 stream = consumer_allocate_stream(channel->key,
138 key,
139 LTTNG_CONSUMER_ACTIVE_STREAM,
140 channel->name,
141 channel->uid,
142 channel->gid,
143 channel->relayd_id,
144 channel->session_id,
145 cpu,
146 &alloc_ret,
147 channel->type);
148 if (stream == NULL) {
149 switch (alloc_ret) {
150 case -ENOENT:
151 /*
152 * We could not find the channel. Can happen if cpu hotplug
153 * happens while tearing down.
154 */
155 DBG3("Could not find channel");
156 break;
157 case -ENOMEM:
158 case -EINVAL:
159 default:
160 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
161 break;
162 }
163 goto error;
164 }
165
166 stream->chan = channel;
167
168error:
169 if (_alloc_ret) {
170 *_alloc_ret = alloc_ret;
171 }
172 return stream;
173}
174
175/*
176 * Send the given stream pointer to the corresponding thread.
177 *
178 * Returns 0 on success else a negative value.
179 */
180static int send_stream_to_thread(struct lttng_consumer_stream *stream,
181 struct lttng_consumer_local_data *ctx)
182{
183 int ret, stream_pipe;
184
185 /* Get the right pipe where the stream will be sent. */
186 if (stream->metadata_flag) {
187 stream_pipe = ctx->consumer_metadata_pipe[1];
188 } else {
189 stream_pipe = ctx->consumer_data_pipe[1];
190 }
191
192 do {
193 ret = write(stream_pipe, &stream, sizeof(stream));
194 } while (ret < 0 && errno == EINTR);
195 if (ret < 0) {
196 PERROR("Consumer write %s stream to pipe %d",
197 stream->metadata_flag ? "metadata" : "data", stream_pipe);
198 }
199
200 return ret;
201}
202
203/*
204 * Search for a relayd object related to the stream. If found, send the stream
205 * to the relayd.
206 *
207 * On success, returns 0 else a negative value.
208 */
209static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
210{
211 int ret = 0;
212 struct consumer_relayd_sock_pair *relayd;
213
214 assert(stream);
215
216 relayd = consumer_find_relayd(stream->net_seq_idx);
217 if (relayd != NULL) {
218 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
219 /* Add stream on the relayd */
220 ret = relayd_add_stream(&relayd->control_sock, stream->name,
221 stream->chan->pathname, &stream->relayd_stream_id);
222 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
223 if (ret < 0) {
224 goto error;
225 }
d88aee68
DG
226 } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
227 ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
ffe60014
DG
228 stream->net_seq_idx);
229 ret = -1;
230 goto error;
231 }
232
233error:
234 return ret;
235}
236
d88aee68
DG
237/*
238 * Create streams for the given channel using liblttng-ust-ctl.
239 *
240 * Return 0 on success else a negative value.
241 */
ffe60014
DG
242static int create_ust_streams(struct lttng_consumer_channel *channel,
243 struct lttng_consumer_local_data *ctx)
244{
245 int ret, cpu = 0;
246 struct ustctl_consumer_stream *ustream;
247 struct lttng_consumer_stream *stream;
248
249 assert(channel);
250 assert(ctx);
251
252 /*
253 * While a stream is available from ustctl. When NULL is returned, we've
254 * reached the end of the possible stream for the channel.
255 */
256 while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
257 int wait_fd;
258
749d339a 259 wait_fd = ustctl_stream_get_wait_fd(ustream);
ffe60014
DG
260
261 /* Allocate consumer stream object. */
262 stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
263 if (!stream) {
264 goto error_alloc;
265 }
266 stream->ustream = ustream;
267 /*
268 * Store it so we can save multiple function calls afterwards since
269 * this value is used heavily in the stream threads. This is UST
270 * specific so this is why it's done after allocation.
271 */
272 stream->wait_fd = wait_fd;
273
274 /*
275 * Order is important this is why a list is used. On error, the caller
276 * should clean this list.
277 */
278 cds_list_add_tail(&stream->send_node, &channel->streams.head);
279
280 ret = ustctl_get_max_subbuf_size(stream->ustream,
281 &stream->max_sb_size);
282 if (ret < 0) {
283 ERR("ustctl_get_max_subbuf_size failed for stream %s",
284 stream->name);
285 goto error;
286 }
287
288 /* Do actions once stream has been received. */
289 if (ctx->on_recv_stream) {
290 ret = ctx->on_recv_stream(stream);
291 if (ret < 0) {
292 goto error;
293 }
294 }
295
d88aee68 296 DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
ffe60014
DG
297 stream->name, stream->key, stream->relayd_stream_id);
298
299 /* Set next CPU stream. */
300 channel->streams.count = ++cpu;
d88aee68
DG
301
302 /* Keep stream reference when creating metadata. */
303 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
304 channel->metadata_stream = stream;
305 }
ffe60014
DG
306 }
307
308 return 0;
309
310error:
311error_alloc:
312 return ret;
313}
314
315/*
316 * Create an UST channel with the given attributes and send it to the session
317 * daemon using the ust ctl API.
318 *
319 * Return 0 on success or else a negative value.
320 */
321static int create_ust_channel(struct ustctl_consumer_channel_attr *attr,
322 struct ustctl_consumer_channel **chanp)
323{
324 int ret;
325 struct ustctl_consumer_channel *channel;
326
327 assert(attr);
328 assert(chanp);
329
330 DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
331 "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
332 "switch_timer_interval: %u, read_timer_interval: %u, "
333 "output: %d, type: %d", attr->overwrite, attr->subbuf_size,
334 attr->num_subbuf, attr->switch_timer_interval,
335 attr->read_timer_interval, attr->output, attr->type);
336
337 channel = ustctl_create_channel(attr);
338 if (!channel) {
339 ret = -1;
340 goto error_create;
341 }
342
343 *chanp = channel;
344
345 return 0;
346
347error_create:
348 return ret;
349}
350
d88aee68
DG
351/*
352 * Send a single given stream to the session daemon using the sock.
353 *
354 * Return 0 on success else a negative value.
355 */
ffe60014
DG
356static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
357{
358 int ret;
359
360 assert(stream);
361 assert(sock >= 0);
362
d88aee68 363 DBG2("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
ffe60014
DG
364
365 /* Send stream to session daemon. */
366 ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
367 if (ret < 0) {
368 goto error;
369 }
370
ffe60014
DG
371error:
372 return ret;
373}
374
375/*
376 * Send channel to sessiond.
377 *
d88aee68 378 * Return 0 on success or else a negative value.
ffe60014
DG
379 */
380static int send_sessiond_channel(int sock,
381 struct lttng_consumer_channel *channel,
382 struct lttng_consumer_local_data *ctx, int *relayd_error)
383{
384 int ret;
385 struct lttng_consumer_stream *stream;
386
387 assert(channel);
388 assert(ctx);
389 assert(sock >= 0);
390
391 DBG("UST consumer sending channel %s to sessiond", channel->name);
392
393 /* Send channel to sessiond. */
394 ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
395 if (ret < 0) {
396 goto error;
397 }
398
d8ef542d
MD
399 ret = ustctl_channel_close_wakeup_fd(channel->uchan);
400 if (ret < 0) {
401 goto error;
402 }
403
ffe60014
DG
404 /* The channel was sent successfully to the sessiond at this point. */
405 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
406 /* Try to send the stream to the relayd if one is available. */
407 ret = send_stream_to_relayd(stream);
408 if (ret < 0) {
409 /*
410 * Flag that the relayd was the problem here probably due to a
411 * communicaton error on the socket.
412 */
413 if (relayd_error) {
414 *relayd_error = 1;
415 }
416 goto error;
417 }
418
419 /* Send stream to session daemon. */
420 ret = send_sessiond_stream(sock, stream);
421 if (ret < 0) {
422 goto error;
423 }
424 }
425
426 /* Tell sessiond there is no more stream. */
427 ret = ustctl_send_stream_to_sessiond(sock, NULL);
428 if (ret < 0) {
429 goto error;
430 }
431
432 DBG("UST consumer NULL stream sent to sessiond");
433
434 return 0;
435
436error:
437 return ret;
438}
439
440/*
441 * Creates a channel and streams and add the channel it to the channel internal
442 * state. The created stream must ONLY be sent once the GET_CHANNEL command is
443 * received.
444 *
445 * Return 0 on success or else, a negative value is returned and the channel
446 * MUST be destroyed by consumer_del_channel().
447 */
448static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
449 struct lttng_consumer_channel *channel,
450 struct ustctl_consumer_channel_attr *attr)
3bd1e081
MD
451{
452 int ret;
453
ffe60014
DG
454 assert(ctx);
455 assert(channel);
456 assert(attr);
457
458 /*
459 * This value is still used by the kernel consumer since for the kernel,
460 * the stream ownership is not IN the consumer so we need to have the
461 * number of left stream that needs to be initialized so we can know when
462 * to delete the channel (see consumer.c).
463 *
464 * As for the user space tracer now, the consumer creates and sends the
465 * stream to the session daemon which only sends them to the application
466 * once every stream of a channel is received making this value useless
467 * because we they will be added to the poll thread before the application
468 * receives them. This ensures that a stream can not hang up during
469 * initilization of a channel.
470 */
471 channel->nb_init_stream_left = 0;
472
473 /* The reply msg status is handled in the following call. */
474 ret = create_ust_channel(attr, &channel->uchan);
475 if (ret < 0) {
476 goto error;
3bd1e081
MD
477 }
478
d8ef542d
MD
479 channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);
480
481 if (ret < 0) {
482 goto error;
483 }
484
ffe60014
DG
485 /* Open all streams for this channel. */
486 ret = create_ust_streams(channel, ctx);
487 if (ret < 0) {
488 goto error;
489 }
490
491error:
3bd1e081
MD
492 return ret;
493}
494
d88aee68
DG
495/*
496 * Send all stream of a channel to the right thread handling it.
497 *
498 * On error, return a negative value else 0 on success.
499 */
500static int send_streams_to_thread(struct lttng_consumer_channel *channel,
501 struct lttng_consumer_local_data *ctx)
502{
503 int ret = 0;
504 struct lttng_consumer_stream *stream, *stmp;
505
506 assert(channel);
507 assert(ctx);
508
509 /* Send streams to the corresponding thread. */
510 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
511 send_node) {
512 /* Sending the stream to the thread. */
513 ret = send_stream_to_thread(stream, ctx);
514 if (ret < 0) {
515 /*
516 * If we are unable to send the stream to the thread, there is
517 * a big problem so just stop everything.
518 */
519 goto error;
520 }
521
522 /* Remove node from the channel stream list. */
523 cds_list_del(&stream->send_node);
524 }
525
526error:
527 return ret;
528}
529
530/*
531 * Write metadata to the given channel using ustctl to convert the string to
532 * the ringbuffer.
533 *
534 * Return 0 on success else a negative value.
535 */
536static int push_metadata(struct lttng_consumer_channel *metadata,
537 const char *metadata_str, uint64_t target_offset, uint64_t len)
538{
539 int ret;
540
541 assert(metadata);
542 assert(metadata_str);
543
544 DBG("UST consumer writing metadata to channel %s", metadata->name);
545
546 assert(target_offset == metadata->contig_metadata_written);
547 ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str, len);
548 if (ret < 0) {
549 ERR("ustctl write metadata fail with ret %d, len %ld", ret, len);
550 goto error;
551 }
552 metadata->contig_metadata_written += len;
553
554 ustctl_flush_buffer(metadata->metadata_stream->ustream, 1);
555
556error:
557 return ret;
558}
559
7972aab2
DG
560/*
561 * Flush channel's streams using the given key to retrieve the channel.
562 *
563 * Return 0 on success else an LTTng error code.
564 */
565static int flush_channel(uint64_t chan_key)
566{
567 int ret = 0;
568 struct lttng_consumer_channel *channel;
569 struct lttng_consumer_stream *stream;
570 struct lttng_ht *ht;
571 struct lttng_ht_iter iter;
572
573 DBG("UST consumer flush channel key %lu", chan_key);
574
575 channel = consumer_find_channel(chan_key);
576 if (!channel) {
577 ERR("UST consumer flush channel %lu not found", chan_key);
578 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
579 goto error;
580 }
581
582 ht = consumer_data.stream_per_chan_id_ht;
583
584 /* For each stream of the channel id, flush it. */
585 rcu_read_lock();
586 cds_lfht_for_each_entry_duplicate(ht->ht,
587 ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
588 &channel->key, &iter.iter, stream, node_channel_id.node) {
589 ustctl_flush_buffer(stream->ustream, 1);
590 }
591 rcu_read_unlock();
592
593error:
594 return ret;
595}
596
d88aee68
DG
597/*
598 * Close metadata stream wakeup_fd using the given key to retrieve the channel.
599 *
600 * Return 0 on success else an LTTng error code.
601 */
602static int close_metadata(uint64_t chan_key)
603{
604 int ret;
605 struct lttng_consumer_channel *channel;
606
607 DBG("UST consumer close metadata key %lu", chan_key);
608
609 channel = consumer_find_channel(chan_key);
610 if (!channel) {
611 ERR("UST consumer close metadata %lu not found", chan_key);
612 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
613 goto error;
614 }
615
616 ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
617 if (ret < 0) {
618 ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
619 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
620 goto error;
621 }
622
623error:
624 return ret;
625}
626
627/*
628 * RCU read side lock MUST be acquired before calling this function.
629 *
630 * Return 0 on success else an LTTng error code.
631 */
632static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
633{
634 int ret;
635 struct lttng_consumer_channel *metadata;
636
637 DBG("UST consumer setup metadata key %lu", key);
638
639 metadata = consumer_find_channel(key);
640 if (!metadata) {
641 ERR("UST consumer push metadata %" PRIu64 " not found", key);
642 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
643 goto error;
644 }
645
646 /*
647 * Send metadata stream to relayd if one available. Availability is
648 * known if the stream is still in the list of the channel.
649 */
650 if (cds_list_empty(&metadata->streams.head)) {
651 ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
652 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
653 goto error;
654 }
655
656 /* Send metadata stream to relayd if needed. */
657 ret = send_stream_to_relayd(metadata->metadata_stream);
658 if (ret < 0) {
659 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
660 goto error;
661 }
662
663 ret = send_streams_to_thread(metadata, ctx);
664 if (ret < 0) {
665 /*
666 * If we are unable to send the stream to the thread, there is
667 * a big problem so just stop everything.
668 */
669 ret = LTTCOMM_CONSUMERD_FATAL;
670 goto error;
671 }
672 /* List MUST be empty after or else it could be reused. */
673 assert(cds_list_empty(&metadata->streams.head));
674
675 ret = 0;
676
677error:
678 return ret;
679}
680
4cbc1a04
DG
681/*
682 * Receive command from session daemon and process it.
683 *
684 * Return 1 on success else a negative value or 0.
685 */
3bd1e081
MD
686int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
687 int sock, struct pollfd *consumer_sockpoll)
688{
689 ssize_t ret;
f50f23d9 690 enum lttng_error_code ret_code = LTTNG_OK;
3bd1e081 691 struct lttcomm_consumer_msg msg;
ffe60014 692 struct lttng_consumer_channel *channel = NULL;
3bd1e081
MD
693
694 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
695 if (ret != sizeof(msg)) {
173af62f
DG
696 DBG("Consumer received unexpected message size %zd (expects %zu)",
697 ret, sizeof(msg));
ffe60014 698 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
3be74084
DG
699 /*
700 * The ret value might 0 meaning an orderly shutdown but this is ok
701 * since the caller handles this.
702 */
3bd1e081
MD
703 return ret;
704 }
705 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
f50f23d9
DG
706 /*
707 * Notify the session daemon that the command is completed.
708 *
709 * On transport layer error, the function call will print an error
710 * message so handling the returned code is a bit useless since we
711 * return an error code anyway.
712 */
713 (void) consumer_send_status_msg(sock, ret_code);
3bd1e081
MD
714 return -ENOENT;
715 }
716
3f8e211f 717 /* relayd needs RCU read-side lock */
b0b335c8
MD
718 rcu_read_lock();
719
3bd1e081 720 switch (msg.cmd_type) {
00e2e675
DG
721 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
722 {
f50f23d9 723 /* Session daemon status message are handled in the following call. */
7735ef9e
DG
724 ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
725 msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
46e6455f 726 &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
00e2e675
DG
727 goto end_nosignal;
728 }
173af62f
DG
729 case LTTNG_CONSUMER_DESTROY_RELAYD:
730 {
a6ba4fe1 731 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
173af62f
DG
732 struct consumer_relayd_sock_pair *relayd;
733
a6ba4fe1 734 DBG("UST consumer destroying relayd %" PRIu64, index);
173af62f
DG
735
736 /* Get relayd reference if exists. */
a6ba4fe1 737 relayd = consumer_find_relayd(index);
173af62f 738 if (relayd == NULL) {
3448e266 739 DBG("Unable to find relayd %" PRIu64, index);
f50f23d9 740 ret_code = LTTNG_ERR_NO_CONSUMER;
173af62f
DG
741 }
742
a6ba4fe1
DG
743 /*
744 * Each relayd socket pair has a refcount of stream attached to it
745 * which tells if the relayd is still active or not depending on the
746 * refcount value.
747 *
748 * This will set the destroy flag of the relayd object and destroy it
749 * if the refcount reaches zero when called.
750 *
751 * The destroy can happen either here or when a stream fd hangs up.
752 */
f50f23d9
DG
753 if (relayd) {
754 consumer_flag_relayd_for_destroy(relayd);
755 }
756
d88aee68 757 goto end_msg_sessiond;
173af62f 758 }
3bd1e081
MD
759 case LTTNG_CONSUMER_UPDATE_STREAM:
760 {
3f8e211f 761 rcu_read_unlock();
7ad0a0cb 762 return -ENOSYS;
3bd1e081 763 }
6d805429 764 case LTTNG_CONSUMER_DATA_PENDING:
53632229 765 {
3be74084 766 int ret, is_data_pending;
6d805429 767 uint64_t id = msg.u.data_pending.session_id;
ca22feea 768
6d805429 769 DBG("UST consumer data pending command for id %" PRIu64, id);
ca22feea 770
3be74084 771 is_data_pending = consumer_data_pending(id);
ca22feea
DG
772
773 /* Send back returned value to session daemon */
3be74084
DG
774 ret = lttcomm_send_unix_sock(sock, &is_data_pending,
775 sizeof(is_data_pending));
ca22feea 776 if (ret < 0) {
3be74084 777 DBG("Error when sending the data pending ret code: %d", ret);
ca22feea 778 }
f50f23d9
DG
779
780 /*
781 * No need to send back a status message since the data pending
782 * returned value is the response.
783 */
ca22feea 784 break;
53632229 785 }
ffe60014
DG
786 case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
787 {
788 int ret;
789 struct ustctl_consumer_channel_attr attr;
790
791 /* Create a plain object and reserve a channel key. */
792 channel = allocate_channel(msg.u.ask_channel.session_id,
793 msg.u.ask_channel.pathname, msg.u.ask_channel.name,
794 msg.u.ask_channel.uid, msg.u.ask_channel.gid,
795 msg.u.ask_channel.relayd_id, msg.u.ask_channel.key,
796 (enum lttng_event_output) msg.u.ask_channel.output);
797 if (!channel) {
798 goto end_channel_error;
799 }
800
801 /* Build channel attributes from received message. */
802 attr.subbuf_size = msg.u.ask_channel.subbuf_size;
803 attr.num_subbuf = msg.u.ask_channel.num_subbuf;
804 attr.overwrite = msg.u.ask_channel.overwrite;
805 attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
806 attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
7972aab2 807 attr.chan_id = msg.u.ask_channel.chan_id;
ffe60014
DG
808 memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
809
810 /* Translate the event output type to UST. */
811 switch (channel->output) {
812 case LTTNG_EVENT_SPLICE:
813 /* Splice not supported so fallback on mmap(). */
814 case LTTNG_EVENT_MMAP:
815 default:
816 attr.output = CONSUMER_CHANNEL_MMAP;
817 break;
818 };
819
820 /* Translate and save channel type. */
821 switch (msg.u.ask_channel.type) {
822 case LTTNG_UST_CHAN_PER_CPU:
823 channel->type = CONSUMER_CHANNEL_TYPE_DATA;
824 attr.type = LTTNG_UST_CHAN_PER_CPU;
825 break;
826 case LTTNG_UST_CHAN_METADATA:
827 channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
828 attr.type = LTTNG_UST_CHAN_METADATA;
829 break;
830 default:
831 assert(0);
832 goto error_fatal;
833 };
834
835 ret = ask_channel(ctx, sock, channel, &attr);
836 if (ret < 0) {
837 goto end_channel_error;
838 }
839
840 /*
841 * Add the channel to the internal state AFTER all streams were created
842 * and successfully sent to session daemon. This way, all streams must
843 * be ready before this channel is visible to the threads.
844 */
845 ret = add_channel(channel, ctx);
846 if (ret < 0) {
847 goto end_channel_error;
848 }
849
850 /*
851 * Channel and streams are now created. Inform the session daemon that
852 * everything went well and should wait to receive the channel and
853 * streams with ustctl API.
854 */
855 ret = consumer_send_status_channel(sock, channel);
856 if (ret < 0) {
857 /*
858 * There is probably a problem on the socket so the poll will get
859 * it and clean everything up.
860 */
861 goto end_nosignal;
862 }
863
864 break;
865 }
866 case LTTNG_CONSUMER_GET_CHANNEL:
867 {
868 int ret, relayd_err = 0;
d88aee68 869 uint64_t key = msg.u.get_channel.key;
ffe60014 870 struct lttng_consumer_channel *channel;
ffe60014
DG
871
872 channel = consumer_find_channel(key);
873 if (!channel) {
874 ERR("UST consumer get channel key %lu not found", key);
875 ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
876 goto end_msg_sessiond;
877 }
878
879 /* Inform sessiond that we are about to send channel and streams. */
880 ret = consumer_send_status_msg(sock, LTTNG_OK);
881 if (ret < 0) {
882 /* Somehow, the session daemon is not responding anymore. */
883 goto end_nosignal;
884 }
885
886 /* Send everything to sessiond. */
887 ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
888 if (ret < 0) {
889 if (relayd_err) {
890 /*
891 * We were unable to send to the relayd the stream so avoid
892 * sending back a fatal error to the thread since this is OK
893 * and the consumer can continue its work.
894 */
895 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
896 goto end_msg_sessiond;
897 }
898 /*
899 * The communicaton was broken hence there is a bad state between
900 * the consumer and sessiond so stop everything.
901 */
902 goto error_fatal;
903 }
904
d88aee68
DG
905 ret = send_streams_to_thread(channel, ctx);
906 if (ret < 0) {
907 /*
908 * If we are unable to send the stream to the thread, there is
909 * a big problem so just stop everything.
910 */
911 goto error_fatal;
ffe60014 912 }
ffe60014
DG
913 /* List MUST be empty after or else it could be reused. */
914 assert(cds_list_empty(&channel->streams.head));
915
d88aee68
DG
916 goto end_msg_sessiond;
917 }
918 case LTTNG_CONSUMER_DESTROY_CHANNEL:
919 {
920 uint64_t key = msg.u.destroy_channel.key;
921 struct lttng_consumer_channel *channel;
922
923 channel = consumer_find_channel(key);
924 if (!channel) {
925 ERR("UST consumer get channel key %lu not found", key);
926 ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
927 goto end_msg_sessiond;
ffe60014
DG
928 }
929
d88aee68
DG
930 destroy_channel(channel);
931
932 goto end_msg_sessiond;
ffe60014 933 }
d88aee68
DG
934 case LTTNG_CONSUMER_CLOSE_METADATA:
935 {
936 int ret;
937
938 ret = close_metadata(msg.u.close_metadata.key);
939 if (ret != 0) {
940 ret_code = ret;
941 }
942
943 goto end_msg_sessiond;
944 }
7972aab2
DG
945 case LTTNG_CONSUMER_FLUSH_CHANNEL:
946 {
947 int ret;
948
949 ret = flush_channel(msg.u.flush_channel.key);
950 if (ret != 0) {
951 ret_code = ret;
952 }
953
954 goto end_msg_sessiond;
955 }
d88aee68 956 case LTTNG_CONSUMER_PUSH_METADATA:
ffe60014
DG
957 {
958 int ret;
d88aee68
DG
959 uint64_t len = msg.u.push_metadata.len;
960 uint64_t target_offset = msg.u.push_metadata.target_offset;
961 uint64_t key = msg.u.push_metadata.key;
ffe60014 962 struct lttng_consumer_channel *channel;
d88aee68 963 char *metadata_str;
ffe60014 964
d88aee68 965 DBG("UST consumer push metadata key %lu of len %lu", key, len);
ffe60014
DG
966
967 channel = consumer_find_channel(key);
968 if (!channel) {
d88aee68 969 ERR("UST consumer push metadata %lu not found", key);
ffe60014 970 ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
7972aab2 971 goto end_msg_sessiond;
ffe60014
DG
972 }
973
d88aee68
DG
974 metadata_str = zmalloc(len * sizeof(char));
975 if (!metadata_str) {
976 PERROR("zmalloc metadata string");
977 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
978 goto end_msg_sessiond;
979 }
980
981 /* Tell session daemon we are ready to receive the metadata. */
ffe60014
DG
982 ret = consumer_send_status_msg(sock, LTTNG_OK);
983 if (ret < 0) {
984 /* Somehow, the session daemon is not responding anymore. */
d88aee68
DG
985 goto error_fatal;
986 }
987
988 /* Wait for more data. */
989 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
990 goto end_nosignal;
991 }
992
993 /* Receive metadata string. */
994 ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
995 if (ret < 0) {
996 /* Session daemon is dead so return gracefully. */
ffe60014
DG
997 goto end_nosignal;
998 }
d88aee68
DG
999
1000 ret = push_metadata(channel, metadata_str, target_offset, len);
1001 free(metadata_str);
1002 if (ret < 0) {
1003 /* Unable to handle metadata. Notify session daemon. */
1004 ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
1005 goto end_msg_sessiond;
1006 }
1007
1008 goto end_msg_sessiond;
1009 }
1010 case LTTNG_CONSUMER_SETUP_METADATA:
1011 {
1012 int ret;
1013
1014 ret = setup_metadata(ctx, msg.u.setup_metadata.key);
1015 if (ret) {
1016 ret_code = ret;
1017 }
1018 goto end_msg_sessiond;
ffe60014 1019 }
3bd1e081
MD
1020 default:
1021 break;
1022 }
3f8e211f 1023
3bd1e081 1024end_nosignal:
b0b335c8 1025 rcu_read_unlock();
4cbc1a04
DG
1026
1027 /*
1028 * Return 1 to indicate success since the 0 value can be a socket
1029 * shutdown during the recv() or send() call.
1030 */
1031 return 1;
ffe60014
DG
1032
1033end_msg_sessiond:
1034 /*
1035 * The returned value here is not useful since either way we'll return 1 to
1036 * the caller because the session daemon socket management is done
1037 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
1038 */
1039 (void) consumer_send_status_msg(sock, ret_code);
1040 rcu_read_unlock();
1041 return 1;
1042end_channel_error:
1043 if (channel) {
1044 /*
1045 * Free channel here since no one has a reference to it. We don't
1046 * free after that because a stream can store this pointer.
1047 */
1048 destroy_channel(channel);
1049 }
1050 /* We have to send a status channel message indicating an error. */
1051 ret = consumer_send_status_channel(sock, NULL);
1052 if (ret < 0) {
1053 /* Stop everything if session daemon can not be notified. */
1054 goto error_fatal;
1055 }
1056 rcu_read_unlock();
1057 return 1;
1058error_fatal:
1059 rcu_read_unlock();
1060 /* This will issue a consumer stop. */
1061 return -1;
3bd1e081
MD
1062}
1063
ffe60014
DG
1064/*
1065 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
1066 * compiled out, we isolate it in this library.
1067 */
1068int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
1069 unsigned long *off)
3bd1e081 1070{
ffe60014
DG
1071 assert(stream);
1072 assert(stream->ustream);
b5c5fc29 1073
ffe60014 1074 return ustctl_get_mmap_read_offset(stream->ustream, off);
3bd1e081
MD
1075}
1076
ffe60014
DG
1077/*
1078 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
1079 * compiled out, we isolate it in this library.
1080 */
1081void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream)
d056b477 1082{
ffe60014
DG
1083 assert(stream);
1084 assert(stream->ustream);
1085
1086 return ustctl_get_mmap_base(stream->ustream);
d056b477
MD
1087}
1088
ffe60014
DG
1089/*
1090 * Take a snapshot for a specific fd
1091 *
1092 * Returns 0 on success, < 0 on error
1093 */
1094int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081 1095{
ffe60014
DG
1096 assert(stream);
1097 assert(stream->ustream);
1098
1099 return ustctl_snapshot(stream->ustream);
3bd1e081
MD
1100}
1101
ffe60014
DG
1102/*
1103 * Get the produced position
1104 *
1105 * Returns 0 on success, < 0 on error
1106 */
1107int lttng_ustconsumer_get_produced_snapshot(
1108 struct lttng_consumer_stream *stream, unsigned long *pos)
3bd1e081 1109{
ffe60014
DG
1110 assert(stream);
1111 assert(stream->ustream);
1112 assert(pos);
7a57cf92 1113
ffe60014
DG
1114 return ustctl_snapshot_get_produced(stream->ustream, pos);
1115}
7a57cf92 1116
ffe60014
DG
1117/*
1118 * Called when the stream signal the consumer that it has hang up.
1119 */
1120void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
1121{
1122 assert(stream);
1123 assert(stream->ustream);
2c1dd183 1124
ffe60014
DG
1125 ustctl_flush_buffer(stream->ustream, 0);
1126 stream->hangup_flush_done = 1;
1127}
ee77a7b0 1128
ffe60014
DG
1129void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
1130{
1131 assert(chan);
1132 assert(chan->uchan);
e316aad5 1133
ffe60014 1134 ustctl_destroy_channel(chan->uchan);
3bd1e081
MD
1135}
1136
1137void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
1138{
ffe60014
DG
1139 assert(stream);
1140 assert(stream->ustream);
d41f73b7 1141
ffe60014
DG
1142 ustctl_destroy_stream(stream->ustream);
1143}
d41f73b7
MD
1144
1145int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
1146 struct lttng_consumer_local_data *ctx)
1147{
1d4dfdef 1148 unsigned long len, subbuf_size, padding;
d41f73b7
MD
1149 int err;
1150 long ret = 0;
d41f73b7 1151 char dummy;
ffe60014
DG
1152 struct ustctl_consumer_stream *ustream;
1153
1154 assert(stream);
1155 assert(stream->ustream);
1156 assert(ctx);
d41f73b7 1157
ffe60014
DG
1158 DBG2("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd,
1159 stream->name);
1160
1161 /* Ease our life for what's next. */
1162 ustream = stream->ustream;
d41f73b7
MD
1163
1164 /* We can consume the 1 byte written into the wait_fd by UST */
effcf122 1165 if (!stream->hangup_flush_done) {
c617c0c6
MD
1166 ssize_t readlen;
1167
effcf122
MD
1168 do {
1169 readlen = read(stream->wait_fd, &dummy, 1);
87dc6a9c 1170 } while (readlen == -1 && errno == EINTR);
effcf122
MD
1171 if (readlen == -1) {
1172 ret = readlen;
1173 goto end;
1174 }
d41f73b7
MD
1175 }
1176
d41f73b7 1177 /* Get the next subbuffer */
ffe60014 1178 err = ustctl_get_next_subbuf(ustream);
d41f73b7 1179 if (err != 0) {
1d4dfdef 1180 ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
d41f73b7
MD
1181 /*
1182 * This is a debug message even for single-threaded consumer,
1183 * because poll() have more relaxed criterions than get subbuf,
1184 * so get_subbuf may fail for short race windows where poll()
1185 * would issue wakeups.
1186 */
1187 DBG("Reserving sub buffer failed (everything is normal, "
ffe60014 1188 "it is due to concurrency) [ret: %d]", err);
d41f73b7
MD
1189 goto end;
1190 }
ffe60014 1191 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
1d4dfdef 1192 /* Get the full padded subbuffer size */
ffe60014 1193 err = ustctl_get_padded_subbuf_size(ustream, &len);
effcf122 1194 assert(err == 0);
1d4dfdef
DG
1195
1196 /* Get subbuffer data size (without padding) */
ffe60014 1197 err = ustctl_get_subbuf_size(ustream, &subbuf_size);
1d4dfdef
DG
1198 assert(err == 0);
1199
1200 /* Make sure we don't get a subbuffer size bigger than the padded */
1201 assert(len >= subbuf_size);
1202
1203 padding = len - subbuf_size;
d41f73b7 1204 /* write the subbuffer to the tracefile */
1d4dfdef 1205 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding);
91dfef6e
DG
1206 /*
1207 * The mmap operation should write subbuf_size amount of data when network
1208 * streaming or the full padding (len) size when we are _not_ streaming.
1209 */
d88aee68
DG
1210 if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
1211 (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
d41f73b7 1212 /*
91dfef6e 1213 * Display the error but continue processing to try to release the
c5c45efa
DG
1214 * subbuffer. This is a DBG statement since any unexpected kill or
1215 * signal, the application gets unregistered, relayd gets closed or
1216 * anything that affects the buffer lifetime will trigger this error.
1217 * So, for the sake of the user, don't print this error since it can
1218 * happen and it is OK with the code flow.
d41f73b7 1219 */
c5c45efa 1220 DBG("Error writing to tracefile "
91dfef6e
DG
1221 "(ret: %zd != len: %lu != subbuf_size: %lu)",
1222 ret, len, subbuf_size);
d41f73b7 1223 }
ffe60014 1224 err = ustctl_put_next_subbuf(ustream);
effcf122 1225 assert(err == 0);
d41f73b7
MD
1226end:
1227 return ret;
1228}
1229
ffe60014
DG
1230/*
1231 * Called when a stream is created.
1232 */
d41f73b7
MD
1233int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1234{
1235 int ret;
ffe60014 1236 char full_path[PATH_MAX];
d41f73b7
MD
1237
1238 /* Opening the tracefile in write mode */
d88aee68 1239 if (stream->net_seq_idx != (uint64_t) -1ULL) {
ffe60014 1240 goto end;
d41f73b7
MD
1241 }
1242
ffe60014
DG
1243 ret = snprintf(full_path, sizeof(full_path), "%s/%s",
1244 stream->chan->pathname, stream->name);
1245 if (ret < 0) {
1246 PERROR("snprintf on_recv_stream");
1247 goto error;
1248 }
1249
1250 ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC,
1251 S_IRWXU | S_IRWXG | S_IRWXO, stream->uid, stream->gid);
1252 if (ret < 0) {
1253 PERROR("open stream path %s", full_path);
c869f647
DG
1254 goto error;
1255 }
ffe60014 1256 stream->out_fd = ret;
c869f647 1257
ffe60014 1258end:
d41f73b7
MD
1259 /* we return 0 to let the library handle the FD internally */
1260 return 0;
1261
1262error:
1263 return ret;
1264}
ca22feea
DG
1265
1266/*
1267 * Check if data is still being extracted from the buffers for a specific
4e9a4686
DG
1268 * stream. Consumer data lock MUST be acquired before calling this function
1269 * and the stream lock.
ca22feea 1270 *
6d805429 1271 * Return 1 if the traced data are still getting read else 0 meaning that the
ca22feea
DG
1272 * data is available for trace viewer reading.
1273 */
6d805429 1274int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
ca22feea
DG
1275{
1276 int ret;
1277
1278 assert(stream);
ffe60014 1279 assert(stream->ustream);
ca22feea 1280
6d805429 1281 DBG("UST consumer checking data pending");
c8f59ee5 1282
ffe60014 1283 ret = ustctl_get_next_subbuf(stream->ustream);
ca22feea
DG
1284 if (ret == 0) {
1285 /* There is still data so let's put back this subbuffer. */
ffe60014 1286 ret = ustctl_put_subbuf(stream->ustream);
ca22feea 1287 assert(ret == 0);
6d805429 1288 ret = 1; /* Data is pending */
4e9a4686 1289 goto end;
ca22feea
DG
1290 }
1291
6d805429
DG
1292 /* Data is NOT pending so ready to be read. */
1293 ret = 0;
ca22feea 1294
6efae65e
DG
1295end:
1296 return ret;
ca22feea 1297}
d88aee68
DG
1298
1299/*
1300 * Close every metadata stream wait fd of the metadata hash table. This
1301 * function MUST be used very carefully so not to run into a race between the
1302 * metadata thread handling streams and this function closing their wait fd.
1303 *
1304 * For UST, this is used when the session daemon hangs up. Its the metadata
1305 * producer so calling this is safe because we are assured that no state change
1306 * can occur in the metadata thread for the streams in the hash table.
1307 */
1308void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
1309{
1310 int ret;
1311 struct lttng_ht_iter iter;
1312 struct lttng_consumer_stream *stream;
1313
1314 assert(metadata_ht);
1315 assert(metadata_ht->ht);
1316
1317 DBG("UST consumer closing all metadata streams");
1318
1319 rcu_read_lock();
1320 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
1321 node.node) {
1322 int fd = stream->wait_fd;
1323
1324 /*
1325 * Whatever happens here we have to continue to try to close every
1326 * streams. Let's report at least the error on failure.
1327 */
1328 ret = ustctl_stream_close_wakeup_fd(stream->ustream);
1329 if (ret) {
1330 ERR("Unable to close metadata stream fd %d ret %d", fd, ret);
1331 }
1332 DBG("Metadata wait fd %d closed", fd);
1333 }
1334 rcu_read_unlock();
1335}
d8ef542d
MD
1336
1337void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
1338{
1339 int ret;
1340
1341 ret = ustctl_stream_close_wakeup_fd(stream->ustream);
1342 if (ret < 0) {
1343 ERR("Unable to close wakeup fd");
1344 }
1345}
This page took 0.155431 seconds and 4 git commands to generate.