X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=2b8098835857106a811f03761fcafe7489f1ce1f;hp=c8ba084606180790b51fc7d11ba8ea5075f019c4;hb=4cbc1a04e8ac3c1dd4f9a4dc44b56ee8430189f0;hpb=1af0e2e4d4a770e32f4fc169122b0c338bd799f6 diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index c8ba08460..2b8098835 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2,23 +2,23 @@ * Copyright (C) 2011 - Julien Desfossez * Mathieu Desnoyers * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; only version 2 - * of the License. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #define _GNU_SOURCE #include +#include #include #include #include @@ -27,11 +27,12 @@ #include #include #include +#include #include -#include #include #include +#include #include #include "ust-consumer.h" @@ -41,61 +42,14 @@ extern int consumer_poll_timeout; extern volatile int consumer_quit; /* - * Mmap the ring buffer, read it and write the data to the tracefile. - * - * Returns the number of bytes written, else negative value on error. + * Wrapper over the mmap() read offset from ust-ctl library. Since this can be + * compiled out, we isolate it in this library. */ -ssize_t lttng_ustconsumer_on_read_subbuffer_mmap( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) +int lttng_ustctl_get_mmap_read_offset(struct lttng_ust_shm_handle *handle, + struct lttng_ust_lib_ring_buffer *buf, unsigned long *off) { - unsigned long mmap_offset; - long ret = 0; - off_t orig_offset = stream->out_fd_offset; - int outfd = stream->out_fd; - - /* get the offset inside the fd to mmap */ - ret = ustctl_get_mmap_read_offset(stream->chan->handle, - stream->buf, &mmap_offset); - if (ret != 0) { - errno = -ret; - perror("ustctl_get_mmap_read_offset"); - goto end; - } - while (len > 0) { - ret = write(outfd, stream->mmap_base + mmap_offset, len); - if (ret >= len) { - len = 0; - } else if (ret < 0) { - errno = -ret; - perror("Error in file write"); - goto end; - } - /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range(outfd, stream->out_fd_offset, ret, - SYNC_FILE_RANGE_WRITE); - stream->out_fd_offset += ret; - } - - lttng_consumer_sync_trace_file(stream, orig_offset); - - goto end; - -end: - return ret; -} - -/* - * Splice the data from the ring buffer to the tracefile. - * - * Returns the number of bytes spliced. - */ -ssize_t lttng_ustconsumer_on_read_subbuffer_splice( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len) -{ - return -ENOSYS; -} + return ustctl_get_mmap_read_offset(handle, buf, off); +}; /* * Take a snapshot for a specific fd @@ -110,7 +64,7 @@ int lttng_ustconsumer_take_snapshot(struct lttng_consumer_local_data *ctx, ret = ustctl_snapshot(stream->chan->handle, stream->buf); if (ret != 0) { errno = -ret; - perror("Getting sub-buffer snapshot."); + PERROR("Getting sub-buffer snapshot."); } return ret; @@ -132,12 +86,17 @@ int lttng_ustconsumer_get_produced_snapshot( stream->buf, pos); if (ret != 0) { errno = -ret; - perror("kernctl_snapshot_get_produced"); + PERROR("kernctl_snapshot_get_produced"); } return ret; } +/* + * Receive command from session daemon and process it. + * + * Return 1 on success else a negative value or 0. + */ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { @@ -146,27 +105,43 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + DBG("Consumer received unexpected message size %zd (expects %zu)", + ret, sizeof(msg)); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { return -ENOENT; } + /* relayd needs RCU read-side lock */ + rcu_read_lock(); + switch (msg.cmd_type) { + case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: + { + ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, + msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, + &msg.u.relayd_sock.sock); + goto end_nosignal; + } case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; int fds[1]; size_t nb_fd = 1; + DBG("UST Consumer adding channel"); + /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); if (ret != sizeof(fds)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); + rcu_read_unlock(); return ret; } @@ -177,7 +152,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.mmap_len, msg.u.channel.max_sb_size); if (new_channel == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } if (ctx->on_recv_channel != NULL) { @@ -197,21 +172,28 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *new_stream; int fds[2]; size_t nb_fd = 2; + struct consumer_relayd_sock_pair *relayd = NULL; + + DBG("UST Consumer adding stream"); /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + rcu_read_unlock(); return -EINTR; } ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd); if (ret != sizeof(fds)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); + rcu_read_unlock(); return ret; } - DBG("consumer_add_stream %s (%d,%d)", msg.u.stream.path_name, - fds[0], fds[1]); + DBG("consumer_add_stream chan %d stream %d", + msg.u.stream.channel_key, + msg.u.stream.stream_key); + assert(msg.u.stream.output == LTTNG_EVENT_MMAP); - new_stream = consumer_allocate_stream(msg.u.channel.channel_key, + new_stream = consumer_allocate_stream(msg.u.stream.channel_key, msg.u.stream.stream_key, fds[0], fds[1], msg.u.stream.state, @@ -219,25 +201,80 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.output, msg.u.stream.path_name, msg.u.stream.uid, - msg.u.stream.gid); + msg.u.stream.gid, + msg.u.stream.net_index, + msg.u.stream.metadata_flag); if (new_stream == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); - goto end; + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + goto end_nosignal; + } + + /* The stream is not metadata. Get relayd reference if exists. */ + relayd = consumer_find_relayd(msg.u.stream.net_index); + if (relayd != NULL) { + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + /* Add stream on the relayd */ + ret = relayd_add_stream(&relayd->control_sock, + msg.u.stream.name, msg.u.stream.path_name, + &new_stream->relayd_stream_id); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end_nosignal; + } + } else if (msg.u.stream.net_index != -1) { + ERR("Network sequence index %d unknown. Not adding stream.", + msg.u.stream.net_index); + free(new_stream); + goto end_nosignal; } + if (ctx->on_recv_stream != NULL) { ret = ctx->on_recv_stream(new_stream); if (ret == 0) { consumer_add_stream(new_stream); } else if (ret < 0) { - goto end; + goto end_nosignal; } } else { consumer_add_stream(new_stream); } + + DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64, + msg.u.stream.path_name, fds[0], fds[1], + new_stream->relayd_stream_id); break; } + case LTTNG_CONSUMER_DESTROY_RELAYD: + { + uint64_t index = msg.u.destroy_relayd.net_seq_idx; + struct consumer_relayd_sock_pair *relayd; + + DBG("UST consumer destroying relayd %" PRIu64, index); + + /* Get relayd reference if exists. */ + relayd = consumer_find_relayd(index); + if (relayd == NULL) { + ERR("Unable to find relayd %" PRIu64, index); + goto end_nosignal; + } + + /* + * Each relayd socket pair has a refcount of stream attached to it + * which tells if the relayd is still active or not depending on the + * refcount value. + * + * This will set the destroy flag of the relayd object and destroy it + * if the refcount reaches zero when called. + * + * The destroy can happen either here or when a stream fd hangs up. + */ + consumer_flag_relayd_for_destroy(relayd); + + goto end_nosignal; + } case LTTNG_CONSUMER_UPDATE_STREAM: { + rcu_read_unlock(); return -ENOSYS; #if 0 if (ctx->on_update_stream != NULL) { @@ -251,20 +288,35 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state); } -#endif break; +#endif } default: break; } -end: - /* signal the poll thread */ - ret = write(ctx->consumer_poll_pipe[1], "4", 1); - if (ret < 0) { - perror("write consumer poll"); - } + + /* + * Wake-up the other end by writing a null byte in the pipe (non-blocking). + * Important note: Because writing into the pipe is non-blocking (and + * therefore we allow dropping wakeup data, as long as there is wakeup data + * present in the pipe buffer to wake up the other end), the other end + * should perform the following sequence for waiting: + * + * 1) empty the pipe (reads). + * 2) perform update operation. + * 3) wait on the pipe (poll). + */ + do { + ret = write(ctx->consumer_poll_pipe[1], "", 1); + } while (ret < 0 && errno == EINTR); end_nosignal: - return 0; + rcu_read_unlock(); + + /* + * Return 1 to indicate success since the 0 value can be a socket + * shutdown during the recv() or send() call. + */ + return 1; } int lttng_ustconsumer_allocate_channel(struct lttng_consumer_channel *chan) @@ -376,12 +428,12 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, assert(err == 0); /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); - if (ret < 0) { + if (ret != len) { /* * display the error but continue processing to try * to release the subbuffer */ - ERR("Error writing to tracefile"); + ERR("Error writing to tracefile (expected: %ld, got: %ld)", ret, len); } err = ustctl_put_next_subbuf(handle, buf); assert(err == 0); @@ -394,14 +446,14 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream) int ret; /* Opening the tracefile in write mode */ - if (stream->path_name != NULL) { + if (stream->path_name != NULL && stream->net_seq_idx == -1) { ret = run_as_open(stream->path_name, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO, stream->uid, stream->gid); if (ret < 0) { ERR("Opening %s", stream->path_name); - perror("open"); + PERROR("open"); goto error; } stream->out_fd = ret;