X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=a57bf15987d28061bf56a994dfacc0ad882c6ecd;hp=c8ba084606180790b51fc7d11ba8ea5075f019c4;hb=87c1611dab332f28b4b72c6ed96cb2a2ef31d5f7;hpb=4078b776c9382a540125d810bcd7cca3a8c84bc8 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index c8ba08460..a57bf1598 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2,19 +2,18 @@ * Copyright (C) 2011 - Julien Desfossez * Mathieu Desnoyers * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only version 2 - * of the License. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #define _GNU_SOURCE @@ -32,6 +31,7 @@ #include #include +#include #include #include "ust-consumer.h" @@ -50,39 +50,102 @@ ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( struct lttng_consumer_stream *stream, unsigned long len) { unsigned long mmap_offset; - long ret = 0; + long ret = 0, written = 0; off_t orig_offset = stream->out_fd_offset; int outfd = stream->out_fd; + uint64_t metadata_id; + struct consumer_relayd_sock_pair *relayd = NULL; + + /* RCU lock for the relayd pointer */ + rcu_read_lock(); + + /* Flag that the current stream if set for network streaming. */ + if (stream->net_seq_idx != -1) { + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + ERR("UST consumer mmap(), unable to find relay for index %d", + stream->net_seq_idx); + goto end; + } + } /* get the offset inside the fd to mmap */ ret = ustctl_get_mmap_read_offset(stream->chan->handle, stream->buf, &mmap_offset); if (ret != 0) { errno = -ret; - perror("ustctl_get_mmap_read_offset"); + PERROR("ustctl_get_mmap_read_offset"); + written = ret; goto end; } + + /* Handle stream on the relayd if the output is on the network */ + if (relayd) { + if (stream->metadata_flag) { + /* Only lock if metadata since we use the control socket. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + } + + ret = consumer_handle_stream_before_relayd(stream, len); + if (ret >= 0) { + outfd = ret; + + /* Write metadata stream id before payload */ + if (stream->metadata_flag) { + metadata_id = htobe64(stream->relayd_stream_id); + do { + ret = write(outfd, (void *) &metadata_id, + sizeof(stream->relayd_stream_id)); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("write metadata stream id"); + written = ret; + goto end; + } + DBG("Metadata stream id %zu written before data", + stream->relayd_stream_id); + } + } + /* Else, use the default set before which is the filesystem. */ + } + while (len > 0) { - ret = write(outfd, stream->mmap_base + mmap_offset, len); - if (ret >= len) { - len = 0; - } else if (ret < 0) { - errno = -ret; - perror("Error in file write"); + do { + ret = write(outfd, stream->mmap_base + mmap_offset, len); + } while (ret < 0 && errno == EINTR); + if (ret < 0) { + PERROR("Error in file write"); + if (written == 0) { + written = ret; + } + goto end; + } else if (ret > len) { + PERROR("ret %ld > len %lu", ret, len); + written += ret; goto end; + } else { + len -= ret; + mmap_offset += ret; + } + DBG("UST mmap write() ret %ld (len %lu)", ret, len); + + /* This call is useless on a socket so better save a syscall. */ + if (!relayd) { + /* This won't block, but will start writeout asynchronously */ + lttng_sync_file_range(outfd, stream->out_fd_offset, ret, + SYNC_FILE_RANGE_WRITE); + stream->out_fd_offset += ret; } - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; + written += ret; } - lttng_consumer_sync_trace_file(stream, orig_offset); - goto end; - end: - return ret; + if (relayd && stream->metadata_flag) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + } + rcu_read_unlock(); + return written; } /* @@ -110,7 +173,7 @@ int lttng_ustconsumer_take_snapshot(struct lttng_consumer_local_data *ctx, ret = ustctl_snapshot(stream->chan->handle, stream->buf); if (ret != 0) { errno = -ret; - perror("Getting sub-buffer snapshot."); + PERROR("Getting sub-buffer snapshot."); } return ret; @@ -132,7 +195,7 @@ int lttng_ustconsumer_get_produced_snapshot( stream->buf, pos); if (ret != 0) { errno = -ret; - perror("kernctl_snapshot_get_produced"); + PERROR("kernctl_snapshot_get_produced"); } return ret; @@ -153,7 +216,88 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return -ENOENT; } + /* relayd need RCU read-side lock */ + rcu_read_lock(); + switch (msg.cmd_type) { + case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: + { + int fd; + struct consumer_relayd_sock_pair *relayd; + + DBG("UST Consumer adding relayd socket"); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.relayd_sock.net_index); + if (relayd == NULL) { + /* Not found. Allocate one. */ + relayd = consumer_allocate_relayd_sock_pair( + msg.u.relayd_sock.net_index); + if (relayd == NULL) { + lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + goto end_nosignal; + } + } + + /* Poll on consumer socket. */ + if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + return -EINTR; + } + + /* Get relayd socket from session daemon */ + ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); + if (ret != sizeof(fd)) { + lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + goto end_nosignal; + } + + /* Copy socket information and received FD */ + switch (msg.u.relayd_sock.type) { + case LTTNG_STREAM_CONTROL: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock); + ret = lttcomm_create_sock(&relayd->control_sock); + if (ret < 0) { + goto end_nosignal; + } + + /* Close the created socket fd which is useless */ + close(relayd->control_sock.fd); + + /* Assign new file descriptor */ + relayd->control_sock.fd = fd; + break; + case LTTNG_STREAM_DATA: + /* Copy received lttcomm socket */ + lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock); + ret = lttcomm_create_sock(&relayd->data_sock); + if (ret < 0) { + goto end_nosignal; + } + + /* Close the created socket fd which is useless */ + close(relayd->data_sock.fd); + + /* Assign new file descriptor */ + relayd->data_sock.fd = fd; + break; + default: + ERR("Unknown relayd socket type"); + goto end_nosignal; + } + + DBG("Consumer %s socket created successfully with net idx %d (fd: %d)", + msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data", + relayd->net_seq_idx, fd); + + /* + * Add relayd socket pair to consumer data hashtable. If object already + * exists or on error, the function gracefully returns. + */ + consumer_add_relayd(relayd); + + goto end_nosignal; + } case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; @@ -197,6 +341,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *new_stream; int fds[2]; size_t nb_fd = 2; + struct consumer_relayd_sock_pair *relayd = NULL; /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { @@ -208,8 +353,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, return ret; } - DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, - fds[0], fds[1]); assert(msg.u.stream.output == LTTNG_EVENT_MMAP); new_stream = consumer_allocate_stream(msg.u.channel.channel_key, msg.u.stream.stream_key, @@ -219,11 +362,33 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.output, msg.u.stream.path_name, msg.u.stream.uid, - msg.u.stream.gid); + msg.u.stream.gid, + msg.u.stream.net_index, + msg.u.stream.metadata_flag); if (new_stream == NULL) { lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); goto end; } + + /* The stream is not metadata. Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.stream.net_index); + if (relayd != NULL) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + /* Add stream on the relayd */ + ret = relayd_add_stream(&relayd->control_sock, + msg.u.stream.name, msg.u.stream.path_name, + &new_stream->relayd_stream_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + } else if (msg.u.stream.net_index != -1) { + ERR("Network sequence index %d unknown. Not adding stream.", + msg.u.stream.net_index); + free(new_stream); + goto end; + } + if (ctx->on_recv_stream != NULL) { ret = ctx->on_recv_stream(new_stream); if (ret == 0) { @@ -234,6 +399,10 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { consumer_add_stream(new_stream); } + + DBG("UST consumer_add_stream %s (%d,%d) with relayd id %lu", + msg.u.stream.path_name, fds[0], fds[1], + new_stream->relayd_stream_id); break; } case LTTNG_CONSUMER_UPDATE_STREAM: @@ -258,12 +427,22 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } end: - /* signal the poll thread */ - ret = write(ctx->consumer_poll_pipe[1], "4", 1); - if (ret < 0) { - perror("write consumer poll"); - } + /* + * Wake-up the other end by writing a null byte in the pipe + * (non-blocking). Important note: Because writing into the + * pipe is non-blocking (and therefore we allow dropping wakeup + * data, as long as there is wakeup data present in the pipe + * buffer to wake up the other end), the other end should + * perform the following sequence for waiting: + * 1) empty the pipe (reads). + * 2) perform update operation. + * 3) wait on the pipe (poll). + */ + do { + ret = write(ctx->consumer_poll_pipe[1], "", 1); + } while (ret < 0 && errno == EINTR); end_nosignal: + rcu_read_unlock(); return 0; } @@ -376,7 +555,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, assert(err == 0); /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); - if (ret < 0) { + if (ret != len) { /* * display the error but continue processing to try * to release the subbuffer @@ -394,14 +573,14 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) int ret; /* Opening the tracefile in write mode */ - if (stream->path_name != NULL) { + if (stream->path_name != NULL && stream->net_seq_idx == -1) { ret = run_as_open(stream->path_name, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO, stream->uid, stream->gid); if (ret < 0) { ERR("Opening %s", stream->path_name); - perror("open"); + PERROR("open"); goto error; } stream->out_fd = ret;