Tests: Add runner script for tests
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <assert.h>
21 #include <lttng/ust-ctl.h>
22 #include <poll.h>
23 #include <pthread.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/mman.h>
27 #include <sys/socket.h>
28 #include <sys/stat.h>
29 #include <sys/types.h>
30 #include <inttypes.h>
31 #include <unistd.h>
32 #include <urcu/list.h>
33
34 #include <common/common.h>
35 #include <common/sessiond-comm/sessiond-comm.h>
36 #include <common/relayd/relayd.h>
37 #include <common/compat/fcntl.h>
38
39 #include "ust-consumer.h"
40
41 extern struct lttng_consumer_global_data consumer_data;
42 extern int consumer_poll_timeout;
43 extern volatile int consumer_quit;
44
45 /*
46 * Free channel object and all streams associated with it. This MUST be used
47 * only and only if the channel has _NEVER_ been added to the global channel
48 * hash table.
49 */
50 static void destroy_channel(struct lttng_consumer_channel *channel)
51 {
52 struct lttng_consumer_stream *stream, *stmp;
53
54 assert(channel);
55
56 DBG("UST consumer cleaning stream list");
57
58 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
59 send_node) {
60 cds_list_del(&stream->send_node);
61 ustctl_destroy_stream(stream->ustream);
62 free(stream);
63 }
64
65 /*
66 * If a channel is available meaning that was created before the streams
67 * were, delete it.
68 */
69 if (channel->uchan) {
70 lttng_ustconsumer_del_channel(channel);
71 }
72 free(channel);
73 }
74
75 /*
76 * Add channel to internal consumer state.
77 *
78 * Returns 0 on success or else a negative value.
79 */
80 static int add_channel(struct lttng_consumer_channel *channel,
81 struct lttng_consumer_local_data *ctx)
82 {
83 int ret = 0;
84
85 assert(channel);
86 assert(ctx);
87
88 if (ctx->on_recv_channel != NULL) {
89 ret = ctx->on_recv_channel(channel);
90 if (ret == 0) {
91 ret = consumer_add_channel(channel);
92 } else if (ret < 0) {
93 /* Most likely an ENOMEM. */
94 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
95 goto error;
96 }
97 } else {
98 ret = consumer_add_channel(channel);
99 }
100
101 DBG("UST consumer channel added (key: %u)", channel->key);
102
103 error:
104 return ret;
105 }
106
107 /*
108 * Allocate and return a consumer channel object.
109 */
110 static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
111 const char *pathname, const char *name, uid_t uid, gid_t gid,
112 int relayd_id, unsigned long key, enum lttng_event_output output)
113 {
114 assert(pathname);
115 assert(name);
116
117 return consumer_allocate_channel(key, session_id, pathname, name, uid, gid,
118 relayd_id, output);
119 }
120
121 /*
122 * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the
123 * error value if applicable is set in it else it is kept untouched.
124 *
125 * Return NULL on error else the newly allocated stream object.
126 */
127 static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
128 struct lttng_consumer_channel *channel,
129 struct lttng_consumer_local_data *ctx, int *_alloc_ret)
130 {
131 int alloc_ret;
132 struct lttng_consumer_stream *stream = NULL;
133
134 assert(channel);
135 assert(ctx);
136
137 stream = consumer_allocate_stream(channel->key,
138 key,
139 LTTNG_CONSUMER_ACTIVE_STREAM,
140 channel->name,
141 channel->uid,
142 channel->gid,
143 channel->relayd_id,
144 channel->session_id,
145 cpu,
146 &alloc_ret,
147 channel->type);
148 if (stream == NULL) {
149 switch (alloc_ret) {
150 case -ENOENT:
151 /*
152 * We could not find the channel. Can happen if cpu hotplug
153 * happens while tearing down.
154 */
155 DBG3("Could not find channel");
156 break;
157 case -ENOMEM:
158 case -EINVAL:
159 default:
160 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
161 break;
162 }
163 goto error;
164 }
165
166 stream->chan = channel;
167
168 error:
169 if (_alloc_ret) {
170 *_alloc_ret = alloc_ret;
171 }
172 return stream;
173 }
174
175 /*
176 * Send the given stream pointer to the corresponding thread.
177 *
178 * Returns 0 on success else a negative value.
179 */
180 static int send_stream_to_thread(struct lttng_consumer_stream *stream,
181 struct lttng_consumer_local_data *ctx)
182 {
183 int ret, stream_pipe;
184
185 /* Get the right pipe where the stream will be sent. */
186 if (stream->metadata_flag) {
187 stream_pipe = ctx->consumer_metadata_pipe[1];
188 } else {
189 stream_pipe = ctx->consumer_data_pipe[1];
190 }
191
192 do {
193 ret = write(stream_pipe, &stream, sizeof(stream));
194 } while (ret < 0 && errno == EINTR);
195 if (ret < 0) {
196 PERROR("Consumer write %s stream to pipe %d",
197 stream->metadata_flag ? "metadata" : "data", stream_pipe);
198 }
199
200 return ret;
201 }
202
203 /*
204 * Search for a relayd object related to the stream. If found, send the stream
205 * to the relayd.
206 *
207 * On success, returns 0 else a negative value.
208 */
209 static int send_stream_to_relayd(struct lttng_consumer_stream *stream)
210 {
211 int ret = 0;
212 struct consumer_relayd_sock_pair *relayd;
213
214 assert(stream);
215
216 relayd = consumer_find_relayd(stream->net_seq_idx);
217 if (relayd != NULL) {
218 pthread_mutex_lock(&relayd->ctrl_sock_mutex);
219 /* Add stream on the relayd */
220 ret = relayd_add_stream(&relayd->control_sock, stream->name,
221 stream->chan->pathname, &stream->relayd_stream_id);
222 pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
223 if (ret < 0) {
224 goto error;
225 }
226 } else if (stream->net_seq_idx != -1) {
227 ERR("Network sequence index %d unknown. Not adding stream.",
228 stream->net_seq_idx);
229 ret = -1;
230 goto error;
231 }
232
233 error:
234 return ret;
235 }
236
237 static int create_ust_streams(struct lttng_consumer_channel *channel,
238 struct lttng_consumer_local_data *ctx)
239 {
240 int ret, cpu = 0;
241 struct ustctl_consumer_stream *ustream;
242 struct lttng_consumer_stream *stream;
243
244 assert(channel);
245 assert(ctx);
246
247 /*
248 * While a stream is available from ustctl. When NULL is returned, we've
249 * reached the end of the possible stream for the channel.
250 */
251 while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
252 int wait_fd;
253
254 wait_fd = ustctl_get_wait_fd(ustream);
255
256 /* Allocate consumer stream object. */
257 stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
258 if (!stream) {
259 goto error_alloc;
260 }
261 stream->ustream = ustream;
262 /*
263 * Store it so we can save multiple function calls afterwards since
264 * this value is used heavily in the stream threads. This is UST
265 * specific so this is why it's done after allocation.
266 */
267 stream->wait_fd = wait_fd;
268
269 /*
270 * Order is important this is why a list is used. On error, the caller
271 * should clean this list.
272 */
273 cds_list_add_tail(&stream->send_node, &channel->streams.head);
274
275 ret = ustctl_get_max_subbuf_size(stream->ustream,
276 &stream->max_sb_size);
277 if (ret < 0) {
278 ERR("ustctl_get_max_subbuf_size failed for stream %s",
279 stream->name);
280 goto error;
281 }
282
283 /* Do actions once stream has been received. */
284 if (ctx->on_recv_stream) {
285 ret = ctx->on_recv_stream(stream);
286 if (ret < 0) {
287 goto error;
288 }
289 }
290
291 DBG("UST consumer add stream %s (key: %d) with relayd id %" PRIu64,
292 stream->name, stream->key, stream->relayd_stream_id);
293
294 /* Set next CPU stream. */
295 channel->streams.count = ++cpu;
296 }
297
298 return 0;
299
300 error:
301 error_alloc:
302 return ret;
303 }
304
305 /*
306 * Create an UST channel with the given attributes and send it to the session
307 * daemon using the ust ctl API.
308 *
309 * Return 0 on success or else a negative value.
310 */
311 static int create_ust_channel(struct ustctl_consumer_channel_attr *attr,
312 struct ustctl_consumer_channel **chanp)
313 {
314 int ret;
315 struct ustctl_consumer_channel *channel;
316
317 assert(attr);
318 assert(chanp);
319
320 DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
321 "subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
322 "switch_timer_interval: %u, read_timer_interval: %u, "
323 "output: %d, type: %d", attr->overwrite, attr->subbuf_size,
324 attr->num_subbuf, attr->switch_timer_interval,
325 attr->read_timer_interval, attr->output, attr->type);
326
327 channel = ustctl_create_channel(attr);
328 if (!channel) {
329 ret = -1;
330 goto error_create;
331 }
332
333 *chanp = channel;
334
335 return 0;
336
337 error_create:
338 return ret;
339 }
340
341 static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
342 {
343 int ret;
344
345 assert(stream);
346 assert(sock >= 0);
347
348 DBG2("UST consumer sending stream %d to sessiond", stream->key);
349
350 /* Send stream to session daemon. */
351 ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
352 if (ret < 0) {
353 goto error;
354 }
355
356 ret = ustctl_stream_close_wakeup_fd(stream->ustream);
357 if (ret < 0) {
358 goto error;
359 }
360
361 error:
362 return ret;
363 }
364
365 /*
366 * Send channel to sessiond.
367 *
368 * Return 0 on success or else a negative value. On error, the channel is
369 * destroy using ustctl.
370 */
371 static int send_sessiond_channel(int sock,
372 struct lttng_consumer_channel *channel,
373 struct lttng_consumer_local_data *ctx, int *relayd_error)
374 {
375 int ret;
376 struct lttng_consumer_stream *stream;
377
378 assert(channel);
379 assert(ctx);
380 assert(sock >= 0);
381
382 DBG("UST consumer sending channel %s to sessiond", channel->name);
383
384 /* Send channel to sessiond. */
385 ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
386 if (ret < 0) {
387 goto error;
388 }
389
390 /* The channel was sent successfully to the sessiond at this point. */
391 cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
392 /* Try to send the stream to the relayd if one is available. */
393 ret = send_stream_to_relayd(stream);
394 if (ret < 0) {
395 /*
396 * Flag that the relayd was the problem here probably due to a
397 * communicaton error on the socket.
398 */
399 if (relayd_error) {
400 *relayd_error = 1;
401 }
402 goto error;
403 }
404
405 /* Send stream to session daemon. */
406 ret = send_sessiond_stream(sock, stream);
407 if (ret < 0) {
408 goto error;
409 }
410 }
411
412 /* Tell sessiond there is no more stream. */
413 ret = ustctl_send_stream_to_sessiond(sock, NULL);
414 if (ret < 0) {
415 goto error;
416 }
417
418 DBG("UST consumer NULL stream sent to sessiond");
419
420 return 0;
421
422 error:
423 return ret;
424 }
425
426 /*
427 * Creates a channel and streams and add the channel it to the channel internal
428 * state. The created stream must ONLY be sent once the GET_CHANNEL command is
429 * received.
430 *
431 * Return 0 on success or else, a negative value is returned and the channel
432 * MUST be destroyed by consumer_del_channel().
433 */
434 static int ask_channel(struct lttng_consumer_local_data *ctx, int sock,
435 struct lttng_consumer_channel *channel,
436 struct ustctl_consumer_channel_attr *attr)
437 {
438 int ret;
439
440 assert(ctx);
441 assert(channel);
442 assert(attr);
443
444 /*
445 * This value is still used by the kernel consumer since for the kernel,
446 * the stream ownership is not IN the consumer so we need to have the
447 * number of left stream that needs to be initialized so we can know when
448 * to delete the channel (see consumer.c).
449 *
450 * As for the user space tracer now, the consumer creates and sends the
451 * stream to the session daemon which only sends them to the application
452 * once every stream of a channel is received making this value useless
453 * because we they will be added to the poll thread before the application
454 * receives them. This ensures that a stream can not hang up during
455 * initilization of a channel.
456 */
457 channel->nb_init_stream_left = 0;
458
459 /* The reply msg status is handled in the following call. */
460 ret = create_ust_channel(attr, &channel->uchan);
461 if (ret < 0) {
462 goto error;
463 }
464
465 /* Open all streams for this channel. */
466 ret = create_ust_streams(channel, ctx);
467 if (ret < 0) {
468 goto error;
469 }
470
471 error:
472 return ret;
473 }
474
475 /*
476 * Receive command from session daemon and process it.
477 *
478 * Return 1 on success else a negative value or 0.
479 */
480 int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
481 int sock, struct pollfd *consumer_sockpoll)
482 {
483 ssize_t ret;
484 enum lttng_error_code ret_code = LTTNG_OK;
485 struct lttcomm_consumer_msg msg;
486 struct lttng_consumer_channel *channel = NULL;
487
488 ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
489 if (ret != sizeof(msg)) {
490 DBG("Consumer received unexpected message size %zd (expects %zu)",
491 ret, sizeof(msg));
492 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
493 /*
494 * The ret value might 0 meaning an orderly shutdown but this is ok
495 * since the caller handles this.
496 */
497 return ret;
498 }
499 if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
500 /*
501 * Notify the session daemon that the command is completed.
502 *
503 * On transport layer error, the function call will print an error
504 * message so handling the returned code is a bit useless since we
505 * return an error code anyway.
506 */
507 (void) consumer_send_status_msg(sock, ret_code);
508 return -ENOENT;
509 }
510
511 /* relayd needs RCU read-side lock */
512 rcu_read_lock();
513
514 switch (msg.cmd_type) {
515 case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
516 {
517 /* Session daemon status message are handled in the following call. */
518 ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
519 msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
520 &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
521 goto end_nosignal;
522 }
523 case LTTNG_CONSUMER_DESTROY_RELAYD:
524 {
525 uint64_t index = msg.u.destroy_relayd.net_seq_idx;
526 struct consumer_relayd_sock_pair *relayd;
527
528 DBG("UST consumer destroying relayd %" PRIu64, index);
529
530 /* Get relayd reference if exists. */
531 relayd = consumer_find_relayd(index);
532 if (relayd == NULL) {
533 DBG("Unable to find relayd %" PRIu64, index);
534 ret_code = LTTNG_ERR_NO_CONSUMER;
535 }
536
537 /*
538 * Each relayd socket pair has a refcount of stream attached to it
539 * which tells if the relayd is still active or not depending on the
540 * refcount value.
541 *
542 * This will set the destroy flag of the relayd object and destroy it
543 * if the refcount reaches zero when called.
544 *
545 * The destroy can happen either here or when a stream fd hangs up.
546 */
547 if (relayd) {
548 consumer_flag_relayd_for_destroy(relayd);
549 }
550
551 ret = consumer_send_status_msg(sock, ret_code);
552 if (ret < 0) {
553 /* Somehow, the session daemon is not responding anymore. */
554 goto end_nosignal;
555 }
556
557 goto end_nosignal;
558 }
559 case LTTNG_CONSUMER_UPDATE_STREAM:
560 {
561 rcu_read_unlock();
562 return -ENOSYS;
563 }
564 case LTTNG_CONSUMER_DATA_PENDING:
565 {
566 int ret, is_data_pending;
567 uint64_t id = msg.u.data_pending.session_id;
568
569 DBG("UST consumer data pending command for id %" PRIu64, id);
570
571 is_data_pending = consumer_data_pending(id);
572
573 /* Send back returned value to session daemon */
574 ret = lttcomm_send_unix_sock(sock, &is_data_pending,
575 sizeof(is_data_pending));
576 if (ret < 0) {
577 DBG("Error when sending the data pending ret code: %d", ret);
578 }
579
580 /*
581 * No need to send back a status message since the data pending
582 * returned value is the response.
583 */
584 break;
585 }
586 case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
587 {
588 int ret;
589 struct ustctl_consumer_channel_attr attr;
590
591 /* Create a plain object and reserve a channel key. */
592 channel = allocate_channel(msg.u.ask_channel.session_id,
593 msg.u.ask_channel.pathname, msg.u.ask_channel.name,
594 msg.u.ask_channel.uid, msg.u.ask_channel.gid,
595 msg.u.ask_channel.relayd_id, msg.u.ask_channel.key,
596 (enum lttng_event_output) msg.u.ask_channel.output);
597 if (!channel) {
598 goto end_channel_error;
599 }
600
601 /* Build channel attributes from received message. */
602 attr.subbuf_size = msg.u.ask_channel.subbuf_size;
603 attr.num_subbuf = msg.u.ask_channel.num_subbuf;
604 attr.overwrite = msg.u.ask_channel.overwrite;
605 attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
606 attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
607 memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
608
609 /* Translate the event output type to UST. */
610 switch (channel->output) {
611 case LTTNG_EVENT_SPLICE:
612 /* Splice not supported so fallback on mmap(). */
613 case LTTNG_EVENT_MMAP:
614 default:
615 attr.output = CONSUMER_CHANNEL_MMAP;
616 break;
617 };
618
619 /* Translate and save channel type. */
620 switch (msg.u.ask_channel.type) {
621 case LTTNG_UST_CHAN_PER_CPU:
622 channel->type = CONSUMER_CHANNEL_TYPE_DATA;
623 attr.type = LTTNG_UST_CHAN_PER_CPU;
624 break;
625 case LTTNG_UST_CHAN_METADATA:
626 channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
627 attr.type = LTTNG_UST_CHAN_METADATA;
628 break;
629 default:
630 assert(0);
631 goto error_fatal;
632 };
633
634 ret = ask_channel(ctx, sock, channel, &attr);
635 if (ret < 0) {
636 goto end_channel_error;
637 }
638
639 /*
640 * Add the channel to the internal state AFTER all streams were created
641 * and successfully sent to session daemon. This way, all streams must
642 * be ready before this channel is visible to the threads.
643 */
644 ret = add_channel(channel, ctx);
645 if (ret < 0) {
646 goto end_channel_error;
647 }
648
649 /*
650 * Channel and streams are now created. Inform the session daemon that
651 * everything went well and should wait to receive the channel and
652 * streams with ustctl API.
653 */
654 ret = consumer_send_status_channel(sock, channel);
655 if (ret < 0) {
656 /*
657 * There is probably a problem on the socket so the poll will get
658 * it and clean everything up.
659 */
660 goto end_nosignal;
661 }
662
663 break;
664 }
665 case LTTNG_CONSUMER_GET_CHANNEL:
666 {
667 int ret, relayd_err = 0;
668 unsigned long key = msg.u.get_channel.key;
669 struct lttng_consumer_channel *channel;
670 struct lttng_consumer_stream *stream, *stmp;
671
672 channel = consumer_find_channel(key);
673 if (!channel) {
674 ERR("UST consumer get channel key %lu not found", key);
675 ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
676 goto end_msg_sessiond;
677 }
678
679 /* Inform sessiond that we are about to send channel and streams. */
680 ret = consumer_send_status_msg(sock, LTTNG_OK);
681 if (ret < 0) {
682 /* Somehow, the session daemon is not responding anymore. */
683 goto end_nosignal;
684 }
685
686 /* Send everything to sessiond. */
687 ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
688 if (ret < 0) {
689 if (relayd_err) {
690 /*
691 * We were unable to send to the relayd the stream so avoid
692 * sending back a fatal error to the thread since this is OK
693 * and the consumer can continue its work.
694 */
695 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
696 goto end_msg_sessiond;
697 }
698 /*
699 * The communicaton was broken hence there is a bad state between
700 * the consumer and sessiond so stop everything.
701 */
702 goto error_fatal;
703 }
704
705 /* Send streams to the corresponding thread. */
706 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
707 send_node) {
708 /* Sending the stream to the thread. */
709 ret = send_stream_to_thread(stream, ctx);
710 if (ret < 0) {
711 /*
712 * If we are unable to send the stream to the thread, there is
713 * a big problem so just stop everything.
714 */
715 goto error_fatal;
716 }
717
718 /* Remove node from the channel stream list. */
719 cds_list_del(&stream->send_node);
720 }
721
722 /* List MUST be empty after or else it could be reused. */
723 assert(cds_list_empty(&channel->streams.head));
724
725 /* Inform sessiond that everything is done and OK on our side. */
726 ret = consumer_send_status_msg(sock, LTTNG_OK);
727 if (ret < 0) {
728 /* Somehow, the session daemon is not responding anymore. */
729 goto end_nosignal;
730 }
731
732 break;
733 }
734 case LTTNG_CONSUMER_DESTROY_CHANNEL:
735 {
736 int ret;
737 unsigned long key = msg.u.destroy_channel.key;
738 struct lttng_consumer_channel *channel;
739
740 DBG("UST consumer destroy channel key %lu", key);
741
742 channel = consumer_find_channel(key);
743 if (!channel) {
744 ERR("UST consumer destroy channel %lu not found", key);
745 ret_code = LTTNG_ERR_UST_CHAN_NOT_FOUND;
746 } else {
747 /* Protocol error if the stream list is NOT empty. */
748 assert(!cds_list_empty(&channel->streams.head));
749 consumer_del_channel(channel);
750 }
751
752 ret = consumer_send_status_msg(sock, LTTNG_OK);
753 if (ret < 0) {
754 /* Somehow, the session daemon is not responding anymore. */
755 goto end_nosignal;
756 }
757 }
758 default:
759 break;
760 }
761
762 end_nosignal:
763 rcu_read_unlock();
764
765 /*
766 * Return 1 to indicate success since the 0 value can be a socket
767 * shutdown during the recv() or send() call.
768 */
769 return 1;
770
771 end_msg_sessiond:
772 /*
773 * The returned value here is not useful since either way we'll return 1 to
774 * the caller because the session daemon socket management is done
775 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
776 */
777 (void) consumer_send_status_msg(sock, ret_code);
778 rcu_read_unlock();
779 return 1;
780 end_channel_error:
781 if (channel) {
782 /*
783 * Free channel here since no one has a reference to it. We don't
784 * free after that because a stream can store this pointer.
785 */
786 destroy_channel(channel);
787 }
788 /* We have to send a status channel message indicating an error. */
789 ret = consumer_send_status_channel(sock, NULL);
790 if (ret < 0) {
791 /* Stop everything if session daemon can not be notified. */
792 goto error_fatal;
793 }
794 rcu_read_unlock();
795 return 1;
796 error_fatal:
797 rcu_read_unlock();
798 /* This will issue a consumer stop. */
799 return -1;
800 }
801
802 /*
803 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
804 * compiled out, we isolate it in this library.
805 */
806 int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
807 unsigned long *off)
808 {
809 assert(stream);
810 assert(stream->ustream);
811
812 return ustctl_get_mmap_read_offset(stream->ustream, off);
813 }
814
815 /*
816 * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
817 * compiled out, we isolate it in this library.
818 */
819 void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream)
820 {
821 assert(stream);
822 assert(stream->ustream);
823
824 return ustctl_get_mmap_base(stream->ustream);
825 }
826
827 /*
828 * Take a snapshot for a specific fd
829 *
830 * Returns 0 on success, < 0 on error
831 */
832 int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
833 {
834 assert(stream);
835 assert(stream->ustream);
836
837 return ustctl_snapshot(stream->ustream);
838 }
839
840 /*
841 * Get the produced position
842 *
843 * Returns 0 on success, < 0 on error
844 */
845 int lttng_ustconsumer_get_produced_snapshot(
846 struct lttng_consumer_stream *stream, unsigned long *pos)
847 {
848 assert(stream);
849 assert(stream->ustream);
850 assert(pos);
851
852 return ustctl_snapshot_get_produced(stream->ustream, pos);
853 }
854
855 /*
856 * Called when the stream signal the consumer that it has hang up.
857 */
858 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
859 {
860 assert(stream);
861 assert(stream->ustream);
862
863 ustctl_flush_buffer(stream->ustream, 0);
864 stream->hangup_flush_done = 1;
865 }
866
867 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
868 {
869 assert(chan);
870 assert(chan->uchan);
871
872 ustctl_destroy_channel(chan->uchan);
873 }
874
875 void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
876 {
877 assert(stream);
878 assert(stream->ustream);
879
880 ustctl_destroy_stream(stream->ustream);
881 }
882
883 int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
884 struct lttng_consumer_local_data *ctx)
885 {
886 unsigned long len, subbuf_size, padding;
887 int err;
888 long ret = 0;
889 char dummy;
890 struct ustctl_consumer_stream *ustream;
891
892 assert(stream);
893 assert(stream->ustream);
894 assert(ctx);
895
896 DBG2("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd,
897 stream->name);
898
899 /* Ease our life for what's next. */
900 ustream = stream->ustream;
901
902 /* We can consume the 1 byte written into the wait_fd by UST */
903 if (!stream->hangup_flush_done) {
904 ssize_t readlen;
905
906 do {
907 readlen = read(stream->wait_fd, &dummy, 1);
908 } while (readlen == -1 && errno == EINTR);
909 if (readlen == -1) {
910 ret = readlen;
911 goto end;
912 }
913 }
914
915 /* Get the next subbuffer */
916 err = ustctl_get_next_subbuf(ustream);
917 if (err != 0) {
918 ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
919 /*
920 * This is a debug message even for single-threaded consumer,
921 * because poll() have more relaxed criterions than get subbuf,
922 * so get_subbuf may fail for short race windows where poll()
923 * would issue wakeups.
924 */
925 DBG("Reserving sub buffer failed (everything is normal, "
926 "it is due to concurrency) [ret: %d]", err);
927 goto end;
928 }
929 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
930 /* Get the full padded subbuffer size */
931 err = ustctl_get_padded_subbuf_size(ustream, &len);
932 assert(err == 0);
933
934 /* Get subbuffer data size (without padding) */
935 err = ustctl_get_subbuf_size(ustream, &subbuf_size);
936 assert(err == 0);
937
938 /* Make sure we don't get a subbuffer size bigger than the padded */
939 assert(len >= subbuf_size);
940
941 padding = len - subbuf_size;
942 /* write the subbuffer to the tracefile */
943 ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding);
944 /*
945 * The mmap operation should write subbuf_size amount of data when network
946 * streaming or the full padding (len) size when we are _not_ streaming.
947 */
948 if ((ret != subbuf_size && stream->net_seq_idx != -1) ||
949 (ret != len && stream->net_seq_idx == -1)) {
950 /*
951 * Display the error but continue processing to try to release the
952 * subbuffer. This is a DBG statement since any unexpected kill or
953 * signal, the application gets unregistered, relayd gets closed or
954 * anything that affects the buffer lifetime will trigger this error.
955 * So, for the sake of the user, don't print this error since it can
956 * happen and it is OK with the code flow.
957 */
958 DBG("Error writing to tracefile "
959 "(ret: %zd != len: %lu != subbuf_size: %lu)",
960 ret, len, subbuf_size);
961 }
962 err = ustctl_put_next_subbuf(ustream);
963 assert(err == 0);
964 end:
965 return ret;
966 }
967
968 /*
969 * Called when a stream is created.
970 */
971 int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
972 {
973 int ret;
974 char full_path[PATH_MAX];
975
976 /* Opening the tracefile in write mode */
977 if (stream->net_seq_idx != -1) {
978 goto end;
979 }
980
981 ret = snprintf(full_path, sizeof(full_path), "%s/%s",
982 stream->chan->pathname, stream->name);
983 if (ret < 0) {
984 PERROR("snprintf on_recv_stream");
985 goto error;
986 }
987
988 ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC,
989 S_IRWXU | S_IRWXG | S_IRWXO, stream->uid, stream->gid);
990 if (ret < 0) {
991 PERROR("open stream path %s", full_path);
992 goto error;
993 }
994 stream->out_fd = ret;
995
996 end:
997 /* we return 0 to let the library handle the FD internally */
998 return 0;
999
1000 error:
1001 return ret;
1002 }
1003
1004 /*
1005 * Check if data is still being extracted from the buffers for a specific
1006 * stream. Consumer data lock MUST be acquired before calling this function
1007 * and the stream lock.
1008 *
1009 * Return 1 if the traced data are still getting read else 0 meaning that the
1010 * data is available for trace viewer reading.
1011 */
1012 int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
1013 {
1014 int ret;
1015
1016 assert(stream);
1017 assert(stream->ustream);
1018
1019 DBG("UST consumer checking data pending");
1020
1021 ret = ustctl_get_next_subbuf(stream->ustream);
1022 if (ret == 0) {
1023 /* There is still data so let's put back this subbuffer. */
1024 ret = ustctl_put_subbuf(stream->ustream);
1025 assert(ret == 0);
1026 ret = 1; /* Data is pending */
1027 goto end;
1028 }
1029
1030 /* Data is NOT pending so ready to be read. */
1031 ret = 0;
1032
1033 end:
1034 return ret;
1035 }
This page took 0.086644 seconds and 4 git commands to generate.