X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=86e405158098d1bf9c995681985a5c6893ae86cf;hp=823b4a28aa9df3934df86e55b09c3469857631fc;hb=f73fabfda365d22e7dd180fb1614e37c446fbd9e;hpb=630543f28cee9c85a569bebd767bb40b2564bd42 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 823b4a28a..86e405158 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -2,19 +2,18 @@ * Copyright (C) 2011 - Julien Desfossez * Mathieu Desnoyers * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only version 2 - * of the License. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #define _GNU_SOURCE @@ -26,13 +25,16 @@ #include #include #include +#include #include #include #include #include #include +#include #include +#include #include "kernel-consumer.h" @@ -40,118 +42,6 @@ extern struct lttng_consumer_global_data consumer_data; extern int consumer_poll_timeout; extern volatile int consumer_quit; -/* - * Mmap the ring buffer, read it and write the data to the tracefile. - * - * Returns the number of bytes written - */ -ssize_t lttng_kconsumer_on_read_subbuffer_mmap( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) -{ - unsigned long mmap_offset; - ssize_t ret = 0; - off_t orig_offset = stream->out_fd_offset; - int fd = stream->wait_fd; - int outfd = stream->out_fd; - - /* get the offset inside the fd to mmap */ - ret = kernctl_get_mmap_read_offset(fd, &mmap_offset); - if (ret != 0) { - errno = -ret; - perror("kernctl_get_mmap_read_offset"); - goto end; - } - - while (len > 0) { - ret = write(outfd, stream->mmap_base + mmap_offset, len); - if (ret >= len) { - len = 0; - } else if (ret < 0) { - errno = -ret; - perror("Error in file write"); - goto end; - } - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; - } - - lttng_consumer_sync_trace_file(stream, orig_offset); - - goto end; - -end: - return ret; -} - -/* - * Splice the data from the ring buffer to the tracefile. - * - * Returns the number of bytes spliced. - */ -ssize_t lttng_kconsumer_on_read_subbuffer_splice( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) -{ - ssize_t ret = 0; - loff_t offset = 0; - off_t orig_offset = stream->out_fd_offset; - int fd = stream->wait_fd; - int outfd = stream->out_fd; - - while (len > 0) { - DBG("splice chan to pipe offset %lu (fd : %d)", - (unsigned long)offset, fd); - ret = splice(fd, &offset, ctx->consumer_thread_pipe[1], NULL, len, - SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice chan to pipe ret %zd", ret); - if (ret < 0) { - errno = -ret; - perror("Error in relay splice"); - goto splice_error; - } - - ret = splice(ctx->consumer_thread_pipe[0], NULL, outfd, NULL, ret, - SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("splice pipe to file %zd", ret); - if (ret < 0) { - errno = -ret; - perror("Error in file splice"); - goto splice_error; - } - len -= ret; - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; - } - lttng_consumer_sync_trace_file(stream, orig_offset); - - goto end; - -splice_error: - /* send the appropriate error description to sessiond */ - switch(ret) { - case EBADF: - lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EBADF); - break; - case EINVAL: - lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_EINVAL); - break; - case ENOMEM: - lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ENOMEM); - break; - case ESPIPE: - lttng_consumer_send_error(ctx, CONSUMERD_SPLICE_ESPIPE); - break; - } - -end: - return ret; -} - /* * Take a snapshot for a specific fd * @@ -202,14 +92,24 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { return -ENOENT; } + /* relayd needs RCU read-side protection */ + rcu_read_lock(); + switch (msg.cmd_type) { + case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: + { + ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, + msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, + &msg.u.relayd_sock.sock); + goto end_nosignal; + } case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; @@ -220,7 +120,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.mmap_len, msg.u.channel.max_sb_size); if (new_channel == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } if (ctx->on_recv_channel != NULL) { @@ -237,21 +137,24 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_ADD_STREAM: { - struct lttng_consumer_stream *new_stream; int fd; + struct consumer_relayd_sock_pair *relayd = NULL; + struct lttng_consumer_stream *new_stream; /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } + + /* Get stream file descriptor from socket */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); + rcu_read_unlock(); return ret; } - DBG("consumer_add_stream %s (%d)", msg.u.stream.path_name, - fd); new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, fd, fd, @@ -260,48 +163,100 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.output, msg.u.stream.path_name, msg.u.stream.uid, - msg.u.stream.gid); + msg.u.stream.gid, + msg.u.stream.net_index, + msg.u.stream.metadata_flag); if (new_stream == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); - goto end; + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + goto end_nosignal; } + + /* The stream is not metadata. Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.stream.net_index); + if (relayd != NULL) { + /* Add stream on the relayd */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_add_stream(&relayd->control_sock, + msg.u.stream.name, msg.u.stream.path_name, + &new_stream->relayd_stream_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end_nosignal; + } + } else if (msg.u.stream.net_index != -1) { + ERR("Network sequence index %d unknown. Not adding stream.", + msg.u.stream.net_index); + free(new_stream); + goto end_nosignal; + } + if (ctx->on_recv_stream != NULL) { ret = ctx->on_recv_stream(new_stream); if (ret == 0) { consumer_add_stream(new_stream); } else if (ret < 0) { - goto end; + goto end_nosignal; } } else { consumer_add_stream(new_stream); } + + DBG("Kernel consumer_add_stream (%d)", fd); break; } case LTTNG_CONSUMER_UPDATE_STREAM: { - if (ctx->on_update_stream != NULL) { - ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state); - if (ret == 0) { - consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); - } else if (ret < 0) { - goto end; - } - } else { - consumer_change_stream_state(msg.u.stream.stream_key, - msg.u.stream.state); + rcu_read_unlock(); + return -ENOSYS; + } + case LTTNG_CONSUMER_DESTROY_RELAYD: + { + uint64_t index = msg.u.destroy_relayd.net_seq_idx; + struct consumer_relayd_sock_pair *relayd; + + DBG("Kernel consumer destroying relayd %" PRIu64, index); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(index); + if (relayd == NULL) { + ERR("Unable to find relayd %" PRIu64, index); + goto end_nosignal; } - break; + + /* + * Each relayd socket pair has a refcount of stream attached to it + * which tells if the relayd is still active or not depending on the + * refcount value. + * + * This will set the destroy flag of the relayd object and destroy it + * if the refcount reaches zero when called. + * + * The destroy can happen either here or when a stream fd hangs up. + */ + consumer_flag_relayd_for_destroy(relayd); + + goto end_nosignal; } default: - break; - } -end: - /* signal the poll thread */ - ret = write(ctx->consumer_poll_pipe[1], "4", 1); - if (ret < 0) { - perror("write consumer poll"); + goto end_nosignal; } + + /* + * Wake-up the other end by writing a null byte in the pipe (non-blocking). + * Important note: Because writing into the pipe is non-blocking (and + * therefore we allow dropping wakeup data, as long as there is wakeup data + * present in the pipe buffer to wake up the other end), the other end + * should perform the following sequence for waiting: + * + * 1) empty the pipe (reads). + * 2) perform update operation. + * 3) wait on the pipe (poll). + */ + do { + ret = write(ctx->consumer_poll_pipe[1], "", 1); + } while (ret < 0 && errno == EINTR); end_nosignal: + rcu_read_unlock(); return 0; } @@ -343,13 +298,15 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* splice the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len); - if (ret < 0) { + if (ret != len) { /* * display the error but continue processing to try * to release the subbuffer */ - ERR("Error splicing to tracefile"); + ERR("Error splicing to tracefile (ret: %zd != len: %lu)", + ret, len); } + break; case LTTNG_EVENT_MMAP: /* read the used subbuffer size */ @@ -361,7 +318,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); - if (ret < 0) { + if (ret != len) { /* * display the error but continue processing to try * to release the subbuffer @@ -395,7 +352,7 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) int ret; /* Opening the tracefile in write mode */ - if (stream->path_name != NULL) { + if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) { ret = run_as_open(stream->path_name, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO,