Fix: remove enable consumer from test
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
CommitLineData
3bd1e081
MD
1/*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
d14d33bf
AM
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
3bd1e081
MD
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
d14d33bf
AM
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
3bd1e081
MD
17 */
18
19#define _GNU_SOURCE
20#include <assert.h>
f02e1e8a 21#include <lttng/ust-ctl.h>
3bd1e081
MD
22#include <poll.h>
23#include <pthread.h>
24#include <stdlib.h>
25#include <string.h>
26#include <sys/mman.h>
27#include <sys/socket.h>
dbb5dfe6 28#include <sys/stat.h>
3bd1e081 29#include <sys/types.h>
77c7c900 30#include <inttypes.h>
3bd1e081 31#include <unistd.h>
ffe60014 32#include <urcu/list.h>
0857097f 33
990570ed 34#include <common/common.h>
10a8a223 35#include <common/sessiond-comm/sessiond-comm.h>
00e2e675 36#include <common/relayd/relayd.h>
dbb5dfe6 37#include <common/compat/fcntl.h>
10a8a223
DG
38
39#include "ust-consumer.h"
3bd1e081
MD
40
41extern struct lttng_consumer_global_data consumer_data;
42extern int consumer_poll_timeout;
43extern volatile int consumer_quit;
44
45/*
ffe60014
DG
46 * Free channel object and all streams associated with it. This MUST be used
47 * only and only if the channel has _NEVER_ been added to the global channel
48 * hash table.
3bd1e081 49 */
ffe60014 50static void destroy_channel(struct lttng_consumer_channel *channel)
3bd1e081 51{
ffe60014
DG
52 struct lttng_consumer_stream *stream, *stmp;
53
54 assert(channel);
55
56 DBG("UST consumer cleaning stream list");
57
58 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
59 send_node) {
60 cds_list_del(&stream->send_node);
61 ustctl_destroy_stream(stream->ustream);
62 free(stream);
63 }
64
65 /*
66 * If a channel is available meaning that was created before the streams
67 * were, delete it.
68 */
69 if (channel->uchan) {
70 lttng_ustconsumer_del_channel(channel);
71 }
72 free(channel);
73}
3bd1e081
MD
74
75/*
ffe60014 76 * Add channel to internal consumer state.
3bd1e081 77 *
ffe60014 78 * Returns 0 on success or else a negative value.
3bd1e081 79 */
ffe60014
DG
80static int add_channel(struct lttng_consumer_channel *channel,
81 struct lttng_consumer_local_data *ctx)
3bd1e081
MD
82{
83 int ret = 0;
84
ffe60014
DG
85 assert(channel);
86 assert(ctx);
87
88 if (ctx->on_recv_channel != NULL) {
89 ret = ctx->on_recv_channel(channel);
90 if (ret == 0) {
91 ret = consumer_add_channel(channel);
92 } else if (ret < 0) {
93 /* Most likely an ENOMEM. */
94 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
95 goto error;
96 }
97 } else {
98 ret = consumer_add_channel(channel);
3bd1e081
MD
99 }
100
d88aee68 101 DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
ffe60014
DG
102
103error:
3bd1e081
MD
104 return ret;
105}
106
107/*
ffe60014
DG
108 * Allocate and return a consumer channel object.
109 */
110static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
111 const char *pathname, const char *name, uid_t uid, gid_t gid,
d88aee68 112 int relayd_id, uint64_t key, enum lttng_event_output output)
ffe60014
DG
113{
114 assert(pathname);
115 assert(name);
116
117 return consumer_allocate_channel(key, session_id, pathname, name, uid, gid,
118 relayd_id, output);
119}
120
121/*
122 * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the
123 * error value if applicable is set in it else it is kept untouched.
3bd1e081 124 *
ffe60014 125 * Return NULL on error else the newly allocated stream object.
3bd1e081 126 */
ffe60014
DG
127static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
128 struct lttng_consumer_channel *channel,
129 struct lttng_consumer_local_data *ctx, int *_alloc_ret)
130{
131 int alloc_ret;
132 struct lttng_consumer_stream *stream = NULL;
133
134 assert(channel);
135 assert(ctx);
136
137 stream = consumer_allocate_stream(channel->key,
138 key,
139 LTTNG_CONSUMER_ACTIVE_STREAM,
140 channel->name,
141 channel->uid,
142 channel->gid,
143 channel->relayd_id,
144 channel->session_id,
145 cpu,
146 &alloc_ret,
147 channel->type);
148 if (stream == NULL) {
149 switch (alloc_ret) {
150 case -ENOENT:
151 /*
152 * We could not find the channel. Can happen if cpu hotplug
153 * happens while tearing down.
154 */
155 DBG3("Could not find channel");
156 break;
157 case -ENOMEM:
158 case -EINVAL:
159 default:
160 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
161 break;
162 }
163 goto error;
164 }
165
166 stream->chan = channel;
167
168error:
169 if (_alloc_ret) {
170 *_alloc_ret = alloc_ret;
171 }
172 return stream;
173}
174
175/*
176 * Send the given stream pointer to the corresponding thread.
177 *
178 * Returns 0 on success else a negative value.
179 */
180static int send_stream_to_thread(struct lttng_consumer_stream *stream,
181 struct lttng_consumer_local_data *ctx)
182{
183 int ret, stream_pipe;
184
185 /* Get the right pipe where the stream will be sent. */
186 if (stream->metadata_flag) {
187 stream_pipe = ctx->consumer_metadata_pipe[1];
188 } else {
189 stream_pipe = ctx->consumer_data_pipe[1];
190 }
191
192 do {
193 ret = write(stream_pipe, &stream, sizeof(stream));
194 } while (ret < 0 && errno == EINTR);
195 if (ret < 0) {
196 PERROR("Consumer write %s stream to pipe %d",
197 stream->metadata_flag ? "metadata" : "data", stream_pipe);
198 }
199
200 return ret;
201}
202
203/*
204 * Search for a relayd object related to the stream. If found, send the stream
205 * to the relayd.
206 *
207 * On success, returns 0 else a negative value.
208 */
209static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
210{
211 int ret = 0;
212 struct consumer_relayd_sock_pair *relayd;
213
214 assert(stream);
215
216 relayd = consumer_find_relayd(stream->net_seq_idx);
217 if (relayd != NULL) {
218 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
219 /* Add stream on the relayd */
220 ret = relayd_add_stream(&relayd->control_sock, stream->name,
221 stream->chan->pathname, &stream->relayd_stream_id);
222 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
223 if (ret < 0) {
224 goto error;
225 }
d88aee68
DG
226 } else if (stream->net_seq_idx != (uint64_t) -1ULL) {
227 ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.",
ffe60014
DG
228 stream->net_seq_idx);
229 ret = -1;
230 goto error;
231 }
232
233error:
234 return ret;
235}
236
d88aee68
DG
237/*
238 * Create streams for the given channel using liblttng-ust-ctl.
239 *
240 * Return 0 on success else a negative value.
241 */
ffe60014
DG
242static int create_ust_streams(struct lttng_consumer_channel *channel,
243 struct lttng_consumer_local_data *ctx)
244{
245 int ret, cpu = 0;
246 struct ustctl_consumer_stream *ustream;
247 struct lttng_consumer_stream *stream;
248
249 assert(channel);
250 assert(ctx);
251
252 /*
253 * While a stream is available from ustctl. When NULL is returned, we've
254 * reached the end of the possible stream for the channel.
255 */
256 while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
257 int wait_fd;
258
259 wait_fd = ustctl_get_wait_fd(ustream);
260
261 /* Allocate consumer stream object. */
262 stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
263 if (!stream) {
264 goto error_alloc;
265 }
266 stream->ustream = ustream;
267 /*
268 * Store it so we can save multiple function calls afterwards since
269 * this value is used heavily in the stream threads. This is UST
270 * specific so this is why it's done after allocation.
271 */
272 stream->wait_fd = wait_fd;
273
274 /*
275 * Order is important this is why a list is used. On error, the caller
276 * should clean this list.
277 */
278 cds_list_add_tail(&stream->send_node, &channel->streams.head);
279
280 ret = ustctl_get_max_subbuf_size(stream->ustream,
281 &stream->max_sb_size);
282 if (ret < 0) {
283 ERR("ustctl_get_max_subbuf_size failed for stream %s",
284 stream->name);
285 goto error;
286 }
287
288 /* Do actions once stream has been received. */
289 if (ctx->on_recv_stream) {
290 ret = ctx->on_recv_stream(stream);
291 if (ret < 0) {
292 goto error;
293 }
294 }
295
d88aee68 296 DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
ffe60014
DG
297 stream->name, stream->key, stream->relayd_stream_id);
298
299 /* Set next CPU stream. */
300 channel->streams.count = ++cpu;
d88aee68
DG
301
302 /* Keep stream reference when creating metadata. */
303 if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
304 channel->metadata_stream = stream;
305 }
ffe60014
DG
306 }
307
308 return 0;
309
310error:
311error_alloc:
312 return ret;
313}
314
315/*
316 * Create an UST channel with the given attributes and send it to the session
317 * daemon using the ust ctl API.
318 *
319 * Return 0 on success or else a negative value.
320 */
321static int create_ust_channel(struct ustctl_consumer_channel_attr *attr,
322 struct ustctl_consumer_channel **chanp)
323{
324 int ret;
325 struct ustctl_consumer_channel *channel;
326
327 assert(attr);
328 assert(chanp);
329
330 DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
331 "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
332 "switch_timer_interval: %u, read_timer_interval: %u, "
333 "output: %d, type: %d", attr->overwrite, attr->subbuf_size,
334 attr->num_subbuf, attr->switch_timer_interval,
335 attr->read_timer_interval, attr->output, attr->type);
336
337 channel = ustctl_create_channel(attr);
338 if (!channel) {
339 ret = -1;
340 goto error_create;
341 }
342
343 *chanp = channel;
344
345 return 0;
346
347error_create:
348 return ret;
349}
350
d88aee68
DG
351/*
352 * Send a single given stream to the session daemon using the sock.
353 *
354 * Return 0 on success else a negative value.
355 */
ffe60014
DG
356static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
357{
358 int ret;
359
360 assert(stream);
361 assert(sock >= 0);
362
d88aee68 363 DBG2("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
ffe60014
DG
364
365 /* Send stream to session daemon. */
366 ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
367 if (ret < 0) {
368 goto error;
369 }
370
371 ret = ustctl_stream_close_wakeup_fd(stream->ustream);
372 if (ret < 0) {
373 goto error;
374 }
375
376error:
377 return ret;
378}
379
380/*
381 * Send channel to sessiond.
382 *
d88aee68 383 * Return 0 on success or else a negative value.
ffe60014
DG
384 */
385static int send_sessiond_channel(int sock,
386 struct lttng_consumer_channel *channel,
387 struct lttng_consumer_local_data *ctx, int *relayd_error)
388{
389 int ret;
390 struct lttng_consumer_stream *stream;
391
392 assert(channel);
393 assert(ctx);
394 assert(sock >= 0);
395
396 DBG("UST consumer sending channel %s to sessiond", channel->name);
397
398 /* Send channel to sessiond. */
399 ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
400 if (ret < 0) {
401 goto error;
402 }
403
404 /* The channel was sent successfully to the sessiond at this point. */
405 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
406 /* Try to send the stream to the relayd if one is available. */
407 ret = send_stream_to_relayd(stream);
408 if (ret < 0) {
409 /*
410 * Flag that the relayd was the problem here probably due to a
411 * communicaton error on the socket.
412 */
413 if (relayd_error) {
414 *relayd_error = 1;
415 }
416 goto error;
417 }
418
419 /* Send stream to session daemon. */
420 ret = send_sessiond_stream(sock, stream);
421 if (ret < 0) {
422 goto error;
423 }
424 }
425
426 /* Tell sessiond there is no more stream. */
427 ret = ustctl_send_stream_to_sessiond(sock, NULL);
428 if (ret < 0) {
429 goto error;
430 }
431
432 DBG("UST consumer NULL stream sent to sessiond");
433
434 return 0;
435
436error:
437 return ret;
438}
439
440/*
441 * Creates a channel and streams and add the channel it to the channel internal
442 * state. The created stream must ONLY be sent once the GET_CHANNEL command is
443 * received.
444 *
445 * Return 0 on success or else, a negative value is returned and the channel
446 * MUST be destroyed by consumer_del_channel().
447 */
448static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
449 struct lttng_consumer_channel *channel,
450 struct ustctl_consumer_channel_attr *attr)
3bd1e081
MD
451{
452 int ret;
453
ffe60014
DG
454 assert(ctx);
455 assert(channel);
456 assert(attr);
457
458 /*
459 * This value is still used by the kernel consumer since for the kernel,
460 * the stream ownership is not IN the consumer so we need to have the
461 * number of left stream that needs to be initialized so we can know when
462 * to delete the channel (see consumer.c).
463 *
464 * As for the user space tracer now, the consumer creates and sends the
465 * stream to the session daemon which only sends them to the application
466 * once every stream of a channel is received making this value useless
467 * because we they will be added to the poll thread before the application
468 * receives them. This ensures that a stream can not hang up during
469 * initilization of a channel.
470 */
471 channel->nb_init_stream_left = 0;
472
473 /* The reply msg status is handled in the following call. */
474 ret = create_ust_channel(attr, &channel->uchan);
475 if (ret < 0) {
476 goto error;
3bd1e081
MD
477 }
478
ffe60014
DG
479 /* Open all streams for this channel. */
480 ret = create_ust_streams(channel, ctx);
481 if (ret < 0) {
482 goto error;
483 }
484
485error:
3bd1e081
MD
486 return ret;
487}
488
d88aee68
DG
489/*
490 * Send all stream of a channel to the right thread handling it.
491 *
492 * On error, return a negative value else 0 on success.
493 */
494static int send_streams_to_thread(struct lttng_consumer_channel *channel,
495 struct lttng_consumer_local_data *ctx)
496{
497 int ret = 0;
498 struct lttng_consumer_stream *stream, *stmp;
499
500 assert(channel);
501 assert(ctx);
502
503 /* Send streams to the corresponding thread. */
504 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
505 send_node) {
506 /* Sending the stream to the thread. */
507 ret = send_stream_to_thread(stream, ctx);
508 if (ret < 0) {
509 /*
510 * If we are unable to send the stream to the thread, there is
511 * a big problem so just stop everything.
512 */
513 goto error;
514 }
515
516 /* Remove node from the channel stream list. */
517 cds_list_del(&stream->send_node);
518 }
519
520error:
521 return ret;
522}
523
524/*
525 * Write metadata to the given channel using ustctl to convert the string to
526 * the ringbuffer.
527 *
528 * Return 0 on success else a negative value.
529 */
530static int push_metadata(struct lttng_consumer_channel *metadata,
531 const char *metadata_str, uint64_t target_offset, uint64_t len)
532{
533 int ret;
534
535 assert(metadata);
536 assert(metadata_str);
537
538 DBG("UST consumer writing metadata to channel %s", metadata->name);
539
540 assert(target_offset == metadata->contig_metadata_written);
541 ret = ustctl_write_metadata_to_channel(metadata->uchan, metadata_str, len);
542 if (ret < 0) {
543 ERR("ustctl write metadata fail with ret %d, len %ld", ret, len);
544 goto error;
545 }
546 metadata->contig_metadata_written += len;
547
548 ustctl_flush_buffer(metadata->metadata_stream->ustream, 1);
549
550error:
551 return ret;
552}
553
554/*
555 * Close metadata stream wakeup_fd using the given key to retrieve the channel.
556 *
557 * Return 0 on success else an LTTng error code.
558 */
559static int close_metadata(uint64_t chan_key)
560{
561 int ret;
562 struct lttng_consumer_channel *channel;
563
564 DBG("UST consumer close metadata key %lu", chan_key);
565
566 channel = consumer_find_channel(chan_key);
567 if (!channel) {
568 ERR("UST consumer close metadata %lu not found", chan_key);
569 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
570 goto error;
571 }
572
573 ret = ustctl_stream_close_wakeup_fd(channel->metadata_stream->ustream);
574 if (ret < 0) {
575 ERR("UST consumer unable to close fd of metadata (ret: %d)", ret);
576 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
577 goto error;
578 }
579
580error:
581 return ret;
582}
583
584/*
585 * RCU read side lock MUST be acquired before calling this function.
586 *
587 * Return 0 on success else an LTTng error code.
588 */
589static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
590{
591 int ret;
592 struct lttng_consumer_channel *metadata;
593
594 DBG("UST consumer setup metadata key %lu", key);
595
596 metadata = consumer_find_channel(key);
597 if (!metadata) {
598 ERR("UST consumer push metadata %" PRIu64 " not found", key);
599 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
600 goto error;
601 }
602
603 /*
604 * Send metadata stream to relayd if one available. Availability is
605 * known if the stream is still in the list of the channel.
606 */
607 if (cds_list_empty(&metadata->streams.head)) {
608 ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
609 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
610 goto error;
611 }
612
613 /* Send metadata stream to relayd if needed. */
614 ret = send_stream_to_relayd(metadata->metadata_stream);
615 if (ret < 0) {
616 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
617 goto error;
618 }
619
620 ret = send_streams_to_thread(metadata, ctx);
621 if (ret < 0) {
622 /*
623 * If we are unable to send the stream to the thread, there is
624 * a big problem so just stop everything.
625 */
626 ret = LTTCOMM_CONSUMERD_FATAL;
627 goto error;
628 }
629 /* List MUST be empty after or else it could be reused. */
630 assert(cds_list_empty(&metadata->streams.head));
631
632 ret = 0;
633
634error:
635 return ret;
636}
637
4cbc1a04
DG
638/*
639 * Receive command from session daemon and process it.
640 *
641 * Return 1 on success else a negative value or 0.
642 */
3bd1e081
MD
643int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
644 int sock, struct pollfd *consumer_sockpoll)
645{
646 ssize_t ret;
f50f23d9 647 enum lttng_error_code ret_code = LTTNG_OK;
3bd1e081 648 struct lttcomm_consumer_msg msg;
ffe60014 649 struct lttng_consumer_channel *channel = NULL;
3bd1e081
MD
650
651 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
652 if (ret != sizeof(msg)) {
173af62f
DG
653 DBG("Consumer received unexpected message size %zd (expects %zu)",
654 ret, sizeof(msg));
ffe60014 655 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
3be74084
DG
656 /*
657 * The ret value might 0 meaning an orderly shutdown but this is ok
658 * since the caller handles this.
659 */
3bd1e081
MD
660 return ret;
661 }
662 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
f50f23d9
DG
663 /*
664 * Notify the session daemon that the command is completed.
665 *
666 * On transport layer error, the function call will print an error
667 * message so handling the returned code is a bit useless since we
668 * return an error code anyway.
669 */
670 (void) consumer_send_status_msg(sock, ret_code);
3bd1e081
MD
671 return -ENOENT;
672 }
673
3f8e211f 674 /* relayd needs RCU read-side lock */
b0b335c8
MD
675 rcu_read_lock();
676
3bd1e081 677 switch (msg.cmd_type) {
00e2e675
DG
678 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
679 {
f50f23d9 680 /* Session daemon status message are handled in the following call. */
7735ef9e
DG
681 ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
682 msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
46e6455f 683 &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
00e2e675
DG
684 goto end_nosignal;
685 }
173af62f
DG
686 case LTTNG_CONSUMER_DESTROY_RELAYD:
687 {
a6ba4fe1 688 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
173af62f
DG
689 struct consumer_relayd_sock_pair *relayd;
690
a6ba4fe1 691 DBG("UST consumer destroying relayd %" PRIu64, index);
173af62f
DG
692
693 /* Get relayd reference if exists. */
a6ba4fe1 694 relayd = consumer_find_relayd(index);
173af62f 695 if (relayd == NULL) {
3448e266 696 DBG("Unable to find relayd %" PRIu64, index);
f50f23d9 697 ret_code = LTTNG_ERR_NO_CONSUMER;
173af62f
DG
698 }
699
a6ba4fe1
DG
700 /*
701 * Each relayd socket pair has a refcount of stream attached to it
702 * which tells if the relayd is still active or not depending on the
703 * refcount value.
704 *
705 * This will set the destroy flag of the relayd object and destroy it
706 * if the refcount reaches zero when called.
707 *
708 * The destroy can happen either here or when a stream fd hangs up.
709 */
f50f23d9
DG
710 if (relayd) {
711 consumer_flag_relayd_for_destroy(relayd);
712 }
713
d88aee68 714 goto end_msg_sessiond;
173af62f 715 }
3bd1e081
MD
716 case LTTNG_CONSUMER_UPDATE_STREAM:
717 {
3f8e211f 718 rcu_read_unlock();
7ad0a0cb 719 return -ENOSYS;
3bd1e081 720 }
6d805429 721 case LTTNG_CONSUMER_DATA_PENDING:
53632229 722 {
3be74084 723 int ret, is_data_pending;
6d805429 724 uint64_t id = msg.u.data_pending.session_id;
ca22feea 725
6d805429 726 DBG("UST consumer data pending command for id %" PRIu64, id);
ca22feea 727
3be74084 728 is_data_pending = consumer_data_pending(id);
ca22feea
DG
729
730 /* Send back returned value to session daemon */
3be74084
DG
731 ret = lttcomm_send_unix_sock(sock, &is_data_pending,
732 sizeof(is_data_pending));
ca22feea 733 if (ret < 0) {
3be74084 734 DBG("Error when sending the data pending ret code: %d", ret);
ca22feea 735 }
f50f23d9
DG
736
737 /*
738 * No need to send back a status message since the data pending
739 * returned value is the response.
740 */
ca22feea 741 break;
53632229 742 }
ffe60014
DG
743 case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
744 {
745 int ret;
746 struct ustctl_consumer_channel_attr attr;
747
748 /* Create a plain object and reserve a channel key. */
749 channel = allocate_channel(msg.u.ask_channel.session_id,
750 msg.u.ask_channel.pathname, msg.u.ask_channel.name,
751 msg.u.ask_channel.uid, msg.u.ask_channel.gid,
752 msg.u.ask_channel.relayd_id, msg.u.ask_channel.key,
753 (enum lttng_event_output) msg.u.ask_channel.output);
754 if (!channel) {
755 goto end_channel_error;
756 }
757
758 /* Build channel attributes from received message. */
759 attr.subbuf_size = msg.u.ask_channel.subbuf_size;
760 attr.num_subbuf = msg.u.ask_channel.num_subbuf;
761 attr.overwrite = msg.u.ask_channel.overwrite;
762 attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
763 attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
764 memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
765
766 /* Translate the event output type to UST. */
767 switch (channel->output) {
768 case LTTNG_EVENT_SPLICE:
769 /* Splice not supported so fallback on mmap(). */
770 case LTTNG_EVENT_MMAP:
771 default:
772 attr.output = CONSUMER_CHANNEL_MMAP;
773 break;
774 };
775
776 /* Translate and save channel type. */
777 switch (msg.u.ask_channel.type) {
778 case LTTNG_UST_CHAN_PER_CPU:
779 channel->type = CONSUMER_CHANNEL_TYPE_DATA;
780 attr.type = LTTNG_UST_CHAN_PER_CPU;
781 break;
782 case LTTNG_UST_CHAN_METADATA:
783 channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
784 attr.type = LTTNG_UST_CHAN_METADATA;
785 break;
786 default:
787 assert(0);
788 goto error_fatal;
789 };
790
791 ret = ask_channel(ctx, sock, channel, &attr);
792 if (ret < 0) {
793 goto end_channel_error;
794 }
795
796 /*
797 * Add the channel to the internal state AFTER all streams were created
798 * and successfully sent to session daemon. This way, all streams must
799 * be ready before this channel is visible to the threads.
800 */
801 ret = add_channel(channel, ctx);
802 if (ret < 0) {
803 goto end_channel_error;
804 }
805
806 /*
807 * Channel and streams are now created. Inform the session daemon that
808 * everything went well and should wait to receive the channel and
809 * streams with ustctl API.
810 */
811 ret = consumer_send_status_channel(sock, channel);
812 if (ret < 0) {
813 /*
814 * There is probably a problem on the socket so the poll will get
815 * it and clean everything up.
816 */
817 goto end_nosignal;
818 }
819
820 break;
821 }
822 case LTTNG_CONSUMER_GET_CHANNEL:
823 {
824 int ret, relayd_err = 0;
d88aee68 825 uint64_t key = msg.u.get_channel.key;
ffe60014 826 struct lttng_consumer_channel *channel;
ffe60014
DG
827
828 channel = consumer_find_channel(key);
829 if (!channel) {
830 ERR("UST consumer get channel key %lu not found", key);
831 ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
832 goto end_msg_sessiond;
833 }
834
835 /* Inform sessiond that we are about to send channel and streams. */
836 ret = consumer_send_status_msg(sock, LTTNG_OK);
837 if (ret < 0) {
838 /* Somehow, the session daemon is not responding anymore. */
839 goto end_nosignal;
840 }
841
842 /* Send everything to sessiond. */
843 ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
844 if (ret < 0) {
845 if (relayd_err) {
846 /*
847 * We were unable to send to the relayd the stream so avoid
848 * sending back a fatal error to the thread since this is OK
849 * and the consumer can continue its work.
850 */
851 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
852 goto end_msg_sessiond;
853 }
854 /*
855 * The communicaton was broken hence there is a bad state between
856 * the consumer and sessiond so stop everything.
857 */
858 goto error_fatal;
859 }
860
d88aee68
DG
861 ret = send_streams_to_thread(channel, ctx);
862 if (ret < 0) {
863 /*
864 * If we are unable to send the stream to the thread, there is
865 * a big problem so just stop everything.
866 */
867 goto error_fatal;
ffe60014 868 }
ffe60014
DG
869 /* List MUST be empty after or else it could be reused. */
870 assert(cds_list_empty(&channel->streams.head));
871
d88aee68
DG
872 goto end_msg_sessiond;
873 }
874 case LTTNG_CONSUMER_DESTROY_CHANNEL:
875 {
876 uint64_t key = msg.u.destroy_channel.key;
877 struct lttng_consumer_channel *channel;
878
879 channel = consumer_find_channel(key);
880 if (!channel) {
881 ERR("UST consumer get channel key %lu not found", key);
882 ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
883 goto end_msg_sessiond;
ffe60014
DG
884 }
885
d88aee68
DG
886 destroy_channel(channel);
887
888 goto end_msg_sessiond;
ffe60014 889 }
d88aee68
DG
890 case LTTNG_CONSUMER_CLOSE_METADATA:
891 {
892 int ret;
893
894 ret = close_metadata(msg.u.close_metadata.key);
895 if (ret != 0) {
896 ret_code = ret;
897 }
898
899 goto end_msg_sessiond;
900 }
901 case LTTNG_CONSUMER_PUSH_METADATA:
ffe60014
DG
902 {
903 int ret;
d88aee68
DG
904 uint64_t len = msg.u.push_metadata.len;
905 uint64_t target_offset = msg.u.push_metadata.target_offset;
906 uint64_t key = msg.u.push_metadata.key;
ffe60014 907 struct lttng_consumer_channel *channel;
d88aee68 908 char *metadata_str;
ffe60014 909
d88aee68 910 DBG("UST consumer push metadata key %lu of len %lu", key, len);
ffe60014
DG
911
912 channel = consumer_find_channel(key);
913 if (!channel) {
d88aee68 914 ERR("UST consumer push metadata %lu not found", key);
ffe60014 915 ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
ffe60014
DG
916 }
917
d88aee68
DG
918 metadata_str = zmalloc(len * sizeof(char));
919 if (!metadata_str) {
920 PERROR("zmalloc metadata string");
921 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
922 goto end_msg_sessiond;
923 }
924
925 /* Tell session daemon we are ready to receive the metadata. */
ffe60014
DG
926 ret = consumer_send_status_msg(sock, LTTNG_OK);
927 if (ret < 0) {
928 /* Somehow, the session daemon is not responding anymore. */
d88aee68
DG
929 goto error_fatal;
930 }
931
932 /* Wait for more data. */
933 if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
934 goto end_nosignal;
935 }
936
937 /* Receive metadata string. */
938 ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
939 if (ret < 0) {
940 /* Session daemon is dead so return gracefully. */
ffe60014
DG
941 goto end_nosignal;
942 }
d88aee68
DG
943
944 ret = push_metadata(channel, metadata_str, target_offset, len);
945 free(metadata_str);
946 if (ret < 0) {
947 /* Unable to handle metadata. Notify session daemon. */
948 ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
949 goto end_msg_sessiond;
950 }
951
952 goto end_msg_sessiond;
953 }
954 case LTTNG_CONSUMER_SETUP_METADATA:
955 {
956 int ret;
957
958 ret = setup_metadata(ctx, msg.u.setup_metadata.key);
959 if (ret) {
960 ret_code = ret;
961 }
962 goto end_msg_sessiond;
ffe60014 963 }
3bd1e081
MD
964 default:
965 break;
966 }
3f8e211f 967
3bd1e081 968end_nosignal:
b0b335c8 969 rcu_read_unlock();
4cbc1a04
DG
970
971 /*
972 * Return 1 to indicate success since the 0 value can be a socket
973 * shutdown during the recv() or send() call.
974 */
975 return 1;
ffe60014
DG
976
977end_msg_sessiond:
978 /*
979 * The returned value here is not useful since either way we'll return 1 to
980 * the caller because the session daemon socket management is done
981 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
982 */
983 (void) consumer_send_status_msg(sock, ret_code);
984 rcu_read_unlock();
985 return 1;
986end_channel_error:
987 if (channel) {
988 /*
989 * Free channel here since no one has a reference to it. We don't
990 * free after that because a stream can store this pointer.
991 */
992 destroy_channel(channel);
993 }
994 /* We have to send a status channel message indicating an error. */
995 ret = consumer_send_status_channel(sock, NULL);
996 if (ret < 0) {
997 /* Stop everything if session daemon can not be notified. */
998 goto error_fatal;
999 }
1000 rcu_read_unlock();
1001 return 1;
1002error_fatal:
1003 rcu_read_unlock();
1004 /* This will issue a consumer stop. */
1005 return -1;
3bd1e081
MD
1006}
1007
ffe60014
DG
1008/*
1009 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
1010 * compiled out, we isolate it in this library.
1011 */
1012int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
1013 unsigned long *off)
3bd1e081 1014{
ffe60014
DG
1015 assert(stream);
1016 assert(stream->ustream);
b5c5fc29 1017
ffe60014 1018 return ustctl_get_mmap_read_offset(stream->ustream, off);
3bd1e081
MD
1019}
1020
ffe60014
DG
1021/*
1022 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
1023 * compiled out, we isolate it in this library.
1024 */
1025void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream)
d056b477 1026{
ffe60014
DG
1027 assert(stream);
1028 assert(stream->ustream);
1029
1030 return ustctl_get_mmap_base(stream->ustream);
d056b477
MD
1031}
1032
ffe60014
DG
1033/*
1034 * Take a snapshot for a specific fd
1035 *
1036 * Returns 0 on success, < 0 on error
1037 */
1038int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
3bd1e081 1039{
ffe60014
DG
1040 assert(stream);
1041 assert(stream->ustream);
1042
1043 return ustctl_snapshot(stream->ustream);
3bd1e081
MD
1044}
1045
ffe60014
DG
1046/*
1047 * Get the produced position
1048 *
1049 * Returns 0 on success, < 0 on error
1050 */
1051int lttng_ustconsumer_get_produced_snapshot(
1052 struct lttng_consumer_stream *stream, unsigned long *pos)
3bd1e081 1053{
ffe60014
DG
1054 assert(stream);
1055 assert(stream->ustream);
1056 assert(pos);
7a57cf92 1057
ffe60014
DG
1058 return ustctl_snapshot_get_produced(stream->ustream, pos);
1059}
7a57cf92 1060
ffe60014
DG
1061/*
1062 * Called when the stream signal the consumer that it has hang up.
1063 */
1064void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
1065{
1066 assert(stream);
1067 assert(stream->ustream);
2c1dd183 1068
ffe60014
DG
1069 ustctl_flush_buffer(stream->ustream, 0);
1070 stream->hangup_flush_done = 1;
1071}
ee77a7b0 1072
ffe60014
DG
1073void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
1074{
1075 assert(chan);
1076 assert(chan->uchan);
e316aad5 1077
ffe60014 1078 ustctl_destroy_channel(chan->uchan);
3bd1e081
MD
1079}
1080
1081void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
1082{
ffe60014
DG
1083 assert(stream);
1084 assert(stream->ustream);
d41f73b7 1085
ffe60014
DG
1086 ustctl_destroy_stream(stream->ustream);
1087}
d41f73b7
MD
1088
1089int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
1090 struct lttng_consumer_local_data *ctx)
1091{
1d4dfdef 1092 unsigned long len, subbuf_size, padding;
d41f73b7
MD
1093 int err;
1094 long ret = 0;
d41f73b7 1095 char dummy;
ffe60014
DG
1096 struct ustctl_consumer_stream *ustream;
1097
1098 assert(stream);
1099 assert(stream->ustream);
1100 assert(ctx);
d41f73b7 1101
ffe60014
DG
1102 DBG2("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd,
1103 stream->name);
1104
1105 /* Ease our life for what's next. */
1106 ustream = stream->ustream;
d41f73b7
MD
1107
1108 /* We can consume the 1 byte written into the wait_fd by UST */
effcf122 1109 if (!stream->hangup_flush_done) {
c617c0c6
MD
1110 ssize_t readlen;
1111
effcf122
MD
1112 do {
1113 readlen = read(stream->wait_fd, &dummy, 1);
87dc6a9c 1114 } while (readlen == -1 && errno == EINTR);
effcf122
MD
1115 if (readlen == -1) {
1116 ret = readlen;
1117 goto end;
1118 }
d41f73b7
MD
1119 }
1120
d41f73b7 1121 /* Get the next subbuffer */
ffe60014 1122 err = ustctl_get_next_subbuf(ustream);
d41f73b7 1123 if (err != 0) {
1d4dfdef 1124 ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
d41f73b7
MD
1125 /*
1126 * This is a debug message even for single-threaded consumer,
1127 * because poll() have more relaxed criterions than get subbuf,
1128 * so get_subbuf may fail for short race windows where poll()
1129 * would issue wakeups.
1130 */
1131 DBG("Reserving sub buffer failed (everything is normal, "
ffe60014 1132 "it is due to concurrency) [ret: %d]", err);
d41f73b7
MD
1133 goto end;
1134 }
ffe60014 1135 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
1d4dfdef 1136 /* Get the full padded subbuffer size */
ffe60014 1137 err = ustctl_get_padded_subbuf_size(ustream, &len);
effcf122 1138 assert(err == 0);
1d4dfdef
DG
1139
1140 /* Get subbuffer data size (without padding) */
ffe60014 1141 err = ustctl_get_subbuf_size(ustream, &subbuf_size);
1d4dfdef
DG
1142 assert(err == 0);
1143
1144 /* Make sure we don't get a subbuffer size bigger than the padded */
1145 assert(len >= subbuf_size);
1146
1147 padding = len - subbuf_size;
d41f73b7 1148 /* write the subbuffer to the tracefile */
1d4dfdef 1149 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding);
91dfef6e
DG
1150 /*
1151 * The mmap operation should write subbuf_size amount of data when network
1152 * streaming or the full padding (len) size when we are _not_ streaming.
1153 */
d88aee68
DG
1154 if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
1155 (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
d41f73b7 1156 /*
91dfef6e 1157 * Display the error but continue processing to try to release the
c5c45efa
DG
1158 * subbuffer. This is a DBG statement since any unexpected kill or
1159 * signal, the application gets unregistered, relayd gets closed or
1160 * anything that affects the buffer lifetime will trigger this error.
1161 * So, for the sake of the user, don't print this error since it can
1162 * happen and it is OK with the code flow.
d41f73b7 1163 */
c5c45efa 1164 DBG("Error writing to tracefile "
91dfef6e
DG
1165 "(ret: %zd != len: %lu != subbuf_size: %lu)",
1166 ret, len, subbuf_size);
d41f73b7 1167 }
ffe60014 1168 err = ustctl_put_next_subbuf(ustream);
effcf122 1169 assert(err == 0);
d41f73b7
MD
1170end:
1171 return ret;
1172}
1173
ffe60014
DG
1174/*
1175 * Called when a stream is created.
1176 */
d41f73b7
MD
1177int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
1178{
1179 int ret;
ffe60014 1180 char full_path[PATH_MAX];
d41f73b7
MD
1181
1182 /* Opening the tracefile in write mode */
d88aee68 1183 if (stream->net_seq_idx != (uint64_t) -1ULL) {
ffe60014 1184 goto end;
d41f73b7
MD
1185 }
1186
ffe60014
DG
1187 ret = snprintf(full_path, sizeof(full_path), "%s/%s",
1188 stream->chan->pathname, stream->name);
1189 if (ret < 0) {
1190 PERROR("snprintf on_recv_stream");
1191 goto error;
1192 }
1193
1194 ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC,
1195 S_IRWXU | S_IRWXG | S_IRWXO, stream->uid, stream->gid);
1196 if (ret < 0) {
1197 PERROR("open stream path %s", full_path);
c869f647
DG
1198 goto error;
1199 }
ffe60014 1200 stream->out_fd = ret;
c869f647 1201
ffe60014 1202end:
d41f73b7
MD
1203 /* we return 0 to let the library handle the FD internally */
1204 return 0;
1205
1206error:
1207 return ret;
1208}
ca22feea
DG
1209
1210/*
1211 * Check if data is still being extracted from the buffers for a specific
4e9a4686
DG
1212 * stream. Consumer data lock MUST be acquired before calling this function
1213 * and the stream lock.
ca22feea 1214 *
6d805429 1215 * Return 1 if the traced data are still getting read else 0 meaning that the
ca22feea
DG
1216 * data is available for trace viewer reading.
1217 */
6d805429 1218int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
ca22feea
DG
1219{
1220 int ret;
1221
1222 assert(stream);
ffe60014 1223 assert(stream->ustream);
ca22feea 1224
6d805429 1225 DBG("UST consumer checking data pending");
c8f59ee5 1226
ffe60014 1227 ret = ustctl_get_next_subbuf(stream->ustream);
ca22feea
DG
1228 if (ret == 0) {
1229 /* There is still data so let's put back this subbuffer. */
ffe60014 1230 ret = ustctl_put_subbuf(stream->ustream);
ca22feea 1231 assert(ret == 0);
6d805429 1232 ret = 1; /* Data is pending */
4e9a4686 1233 goto end;
ca22feea
DG
1234 }
1235
6d805429
DG
1236 /* Data is NOT pending so ready to be read. */
1237 ret = 0;
ca22feea 1238
6efae65e
DG
1239end:
1240 return ret;
ca22feea 1241}
d88aee68
DG
1242
1243/*
1244 * Close every metadata stream wait fd of the metadata hash table. This
1245 * function MUST be used very carefully so not to run into a race between the
1246 * metadata thread handling streams and this function closing their wait fd.
1247 *
1248 * For UST, this is used when the session daemon hangs up. Its the metadata
1249 * producer so calling this is safe because we are assured that no state change
1250 * can occur in the metadata thread for the streams in the hash table.
1251 */
1252void lttng_ustconsumer_close_metadata(struct lttng_ht *metadata_ht)
1253{
1254 int ret;
1255 struct lttng_ht_iter iter;
1256 struct lttng_consumer_stream *stream;
1257
1258 assert(metadata_ht);
1259 assert(metadata_ht->ht);
1260
1261 DBG("UST consumer closing all metadata streams");
1262
1263 rcu_read_lock();
1264 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
1265 node.node) {
1266 int fd = stream->wait_fd;
1267
1268 /*
1269 * Whatever happens here we have to continue to try to close every
1270 * streams. Let's report at least the error on failure.
1271 */
1272 ret = ustctl_stream_close_wakeup_fd(stream->ustream);
1273 if (ret) {
1274 ERR("Unable to close metadata stream fd %d ret %d", fd, ret);
1275 }
1276 DBG("Metadata wait fd %d closed", fd);
1277 }
1278 rcu_read_unlock();
1279}
This page took 0.090358 seconds and 4 git commands to generate.