X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=f47c498777531ef6702b41d34b153abba3320e1e;hp=4a7ba04516e442aae6b824d61923bffd1f652e1c;hb=9d9353f936717527863fda5afcbb89aa459f0852;hpb=a6ba4fe1a8217fd5cb9e286b4d88a9252c0d5d06 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 4a7ba0451..f47c49877 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -34,7 +34,9 @@ #include #include #include +#include #include +#include #include "kernel-consumer.h" @@ -47,8 +49,7 @@ extern volatile int consumer_quit; * * Returns 0 on success, < 0 on error */ -int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream) +int lttng_kconsumer_take_snapshot(struct lttng_consumer_stream *stream) { int ret = 0; int infd = stream->wait_fd; @@ -67,9 +68,7 @@ int lttng_kconsumer_take_snapshot(struct lttng_consumer_local_data *ctx, * * Returns 0 on success, < 0 on error */ -int lttng_kconsumer_get_produced_snapshot( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, +int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos) { int ret; @@ -88,14 +87,23 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { ssize_t ret; + enum lttng_error_code ret_code = LTTNG_OK; struct lttcomm_consumer_msg msg; ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_CMD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { + /* + * Notify the session daemon that the command is completed. + * + * On transport layer error, the function call will print an error + * message so handling the returned code is a bit useless since we + * return an error code anyway. + */ + (void) consumer_send_status_msg(sock, ret_code); return -ENOENT; } @@ -105,41 +113,98 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { + /* Session daemon status message are handled in the following call. */ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index, msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock); + &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; + int ret_recv; - DBG("consumer_add_channel %d", msg.u.channel.channel_key); + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, - -1, -1, - msg.u.channel.mmap_len, - msg.u.channel.max_sb_size); + msg.u.channel.session_id, msg.u.channel.pathname, + msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid, + msg.u.channel.relayd_id, msg.u.channel.output, + msg.u.channel.tracefile_size, + msg.u.channel.tracefile_count); if (new_channel == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } + new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams; + + /* Translate and save channel type. */ + switch (msg.u.channel.type) { + case CONSUMER_CHANNEL_TYPE_DATA: + case CONSUMER_CHANNEL_TYPE_METADATA: + new_channel->type = msg.u.channel.type; + break; + default: + assert(0); + goto end_nosignal; + }; + if (ctx->on_recv_channel != NULL) { - ret = ctx->on_recv_channel(new_channel); - if (ret == 0) { - consumer_add_channel(new_channel); - } else if (ret < 0) { + ret_recv = ctx->on_recv_channel(new_channel); + if (ret_recv == 0) { + ret = consumer_add_channel(new_channel, ctx); + } else if (ret_recv < 0) { goto end_nosignal; } } else { - consumer_add_channel(new_channel); + ret = consumer_add_channel(new_channel, ctx); + } + + /* If we received an error in add_channel, we need to report it. */ + if (ret != 0) { + consumer_send_status_msg(sock, ret); + goto end_nosignal; } + goto end_nosignal; } case LTTNG_CONSUMER_ADD_STREAM: { int fd; + struct lttng_pipe *stream_pipe; struct consumer_relayd_sock_pair *relayd = NULL; struct lttng_consumer_stream *new_stream; + struct lttng_consumer_channel *channel; + int alloc_ret = 0; + + /* + * Get stream's channel reference. Needed when adding the stream to the + * global hash table. + */ + channel = consumer_find_channel(msg.u.stream.channel_key); + if (!channel) { + /* + * We could not find the channel. Can happen if cpu hotplug + * happens while tearing down. + */ + ERR("Unable to find channel key %" PRIu64, msg.u.stream.channel_key); + ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND; + } + + /* First send a status message before receiving the fds. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0 || ret_code != LTTNG_OK) { + /* + * Somehow, the session daemon is not responding anymore or the + * channel was not found. + */ + goto end_nosignal; + } /* block */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { @@ -150,58 +215,109 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get stream file descriptor from socket */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { - lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD); + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); rcu_read_unlock(); return ret; } - new_stream = consumer_allocate_stream(msg.u.stream.channel_key, - msg.u.stream.stream_key, - fd, fd, - msg.u.stream.state, - msg.u.stream.mmap_len, - msg.u.stream.output, - msg.u.stream.path_name, - msg.u.stream.uid, - msg.u.stream.gid, - msg.u.stream.net_index, - msg.u.stream.metadata_flag); + /* + * Send status code to session daemon only if the recv works. If the + * above recv() failed, the session daemon is notified through the + * error socket and the teardown is eventually done. + */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + new_stream = consumer_allocate_stream(channel->key, + fd, + LTTNG_CONSUMER_ACTIVE_STREAM, + channel->name, + channel->uid, + channel->gid, + channel->relayd_id, + channel->session_id, + msg.u.stream.cpu, + &alloc_ret, + channel->type); if (new_stream == NULL) { - lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR); + switch (alloc_ret) { + case -ENOMEM: + case -EINVAL: + default: + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); + break; + } goto end_nosignal; } + new_stream->chan = channel; + new_stream->wait_fd = fd; + + /* Metadata chan refcount is increment in add_metadata_stream */ + if (new_stream->chan->type != CONSUMER_CHANNEL_TYPE_METADATA) { + /* Update channel refcount */ + uatomic_inc(&new_stream->chan->refcount); + } + + /* + * The buffer flush is done on the session daemon side for the kernel + * so no need for the stream "hangup_flush_done" variable to be + * tracked. This is important for a kernel stream since we don't rely + * on the flush state of the stream to read data. It's not the case for + * user space tracing. + */ + new_stream->hangup_flush_done = 0; /* The stream is not metadata. Get relayd reference if exists. */ - relayd = consumer_find_relayd(msg.u.stream.net_index); + relayd = consumer_find_relayd(new_stream->net_seq_idx); if (relayd != NULL) { /* Add stream on the relayd */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_add_stream(&relayd->control_sock, - msg.u.stream.name, msg.u.stream.path_name, - &new_stream->relayd_stream_id); + new_stream->name, new_stream->chan->pathname, + &new_stream->relayd_stream_id, + new_stream->chan->tracefile_size, + new_stream->chan->tracefile_count); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + consumer_del_stream(new_stream, NULL); goto end_nosignal; } - } else if (msg.u.stream.net_index != -1) { - ERR("Network sequence index %d unknown. Not adding stream.", - msg.u.stream.net_index); - free(new_stream); + } else if (new_stream->net_seq_idx != (uint64_t) -1ULL) { + ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", + new_stream->net_seq_idx); + consumer_del_stream(new_stream, NULL); goto end_nosignal; } - if (ctx->on_recv_stream != NULL) { + if (ctx->on_recv_stream) { ret = ctx->on_recv_stream(new_stream); - if (ret == 0) { - consumer_add_stream(new_stream); - } else if (ret < 0) { + if (ret < 0) { + consumer_del_stream(new_stream, NULL); goto end_nosignal; } + } + + /* Get the right pipe where the stream will be sent. */ + if (new_stream->metadata_flag) { + stream_pipe = ctx->consumer_metadata_pipe; } else { - consumer_add_stream(new_stream); + stream_pipe = ctx->consumer_data_pipe; + } + + ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); + if (ret < 0) { + ERR("Consumer write %s stream to pipe %d", + new_stream->metadata_flag ? "metadata" : "data", + lttng_pipe_get_writefd(stream_pipe)); + consumer_del_stream(new_stream, NULL); + goto end_nosignal; } - DBG("Kernel consumer_add_stream (%d)", fd); + DBG("Kernel consumer ADD_STREAM %s (fd: %d) with relayd id %" PRIu64, + new_stream->name, fd, new_stream->relayd_stream_id); break; } case LTTNG_CONSUMER_UPDATE_STREAM: @@ -219,8 +335,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get relayd reference if exists. */ relayd = consumer_find_relayd(index); if (relayd == NULL) { - ERR("Unable to find relayd %" PRIu64, index); - goto end_nosignal; + DBG("Unable to find relayd %" PRIu64, index); + ret_code = LTTNG_ERR_NO_CONSUMER; } /* @@ -233,31 +349,51 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * * The destroy can happen either here or when a stream fd hangs up. */ - consumer_flag_relayd_for_destroy(relayd); + if (relayd) { + consumer_flag_relayd_for_destroy(relayd); + } + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } goto end_nosignal; } + case LTTNG_CONSUMER_DATA_PENDING: + { + int32_t ret; + uint64_t id = msg.u.data_pending.session_id; + + DBG("Kernel consumer data pending command for id %" PRIu64, id); + + ret = consumer_data_pending(id); + + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); + if (ret < 0) { + PERROR("send data pending ret code"); + } + + /* + * No need to send back a status message since the data pending + * returned value is the response. + */ + break; + } default: goto end_nosignal; } - /* - * Wake-up the other end by writing a null byte in the pipe (non-blocking). - * Important note: Because writing into the pipe is non-blocking (and - * therefore we allow dropping wakeup data, as long as there is wakeup data - * present in the pipe buffer to wake up the other end), the other end - * should perform the following sequence for waiting: - * - * 1) empty the pipe (reads). - * 2) perform update operation. - * 3) wait on the pipe (poll). - */ - do { - ret = write(ctx->consumer_poll_pipe[1], "", 1); - } while (ret < 0 && errno == EINTR); end_nosignal: rcu_read_unlock(); - return 0; + + /* + * Return 1 to indicate success since the 0 value can be a socket + * shutdown during the recv() or send() call. + */ + return 1; } /* @@ -266,7 +402,7 @@ end_nosignal: ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { - unsigned long len; + unsigned long len, subbuf_size, padding; int err; ssize_t ret = 0; int infd = stream->wait_fd; @@ -275,6 +411,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* Get the next subbuffer */ err = kernctl_get_next_subbuf(infd); if (err != 0) { + ret = err; /* * This is a debug message even for single-threaded consumer, * because poll() have more relaxed criterions than get subbuf, @@ -286,60 +423,92 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto end; } - switch (stream->output) { - case LTTNG_EVENT_SPLICE: - /* read the whole subbuffer */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - errno = -ret; - perror("Getting sub-buffer len failed."); - goto end; - } + /* Get the full subbuffer size including padding */ + err = kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + errno = -err; + perror("Getting sub-buffer len failed."); + ret = err; + goto end; + } - /* splice the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, len); - if (ret != len) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error splicing to tracefile (ret: %zd != len: %lu)", - ret, len); - } + switch (stream->chan->output) { + case LTTNG_EVENT_SPLICE: - break; - case LTTNG_EVENT_MMAP: - /* read the used subbuffer size */ - err = kernctl_get_padded_subbuf_size(infd, &len); - if (err != 0) { - errno = -ret; - perror("Getting sub-buffer len failed."); - goto end; - } - /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len); - if (ret != len) { - /* - * display the error but continue processing to try - * to release the subbuffer - */ - ERR("Error writing to tracefile"); - } - break; - default: - ERR("Unknown output method"); - ret = -1; + /* + * XXX: The lttng-modules splice "actor" does not handle copying + * partial pages hence only using the subbuffer size without the + * padding makes the splice fail. + */ + subbuf_size = len; + padding = 0; + + /* splice the subbuffer to the tracefile */ + ret = lttng_consumer_on_read_subbuffer_splice(ctx, stream, subbuf_size, + padding); + /* + * XXX: Splice does not support network streaming so the return value + * is simply checked against subbuf_size and not like the mmap() op. + */ + if (ret != subbuf_size) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + ERR("Error splicing to tracefile (ret: %zd != len: %lu)", + ret, subbuf_size); + } + break; + case LTTNG_EVENT_MMAP: + /* Get subbuffer size without padding */ + err = kernctl_get_subbuf_size(infd, &subbuf_size); + if (err != 0) { + errno = -err; + perror("Getting sub-buffer len failed."); + ret = err; + goto end; + } + + /* Make sure the tracer is not gone mad on us! */ + assert(len >= subbuf_size); + + padding = len - subbuf_size; + + /* write the subbuffer to the tracefile */ + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, + padding); + /* + * The mmap operation should write subbuf_size amount of data when + * network streaming or the full padding (len) size when we are _not_ + * streaming. + */ + if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) || + (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) { + /* + * Display the error but continue processing to try to release the + * subbuffer + */ + ERR("Error writing to tracefile " + "(ret: %zd != len: %lu != subbuf_size: %lu)", + ret, len, subbuf_size); + } + break; + default: + ERR("Unknown output method"); + ret = -1; } err = kernctl_put_next_subbuf(infd); if (err != 0) { - errno = -ret; + errno = -err; if (errno == EFAULT) { perror("Error in unreserving sub buffer\n"); } else if (errno == EIO) { /* Should never happen with newer LTTng versions */ perror("Reader has been pushed by the writer, last sub-buffer corrupted."); } + + ret = -err; goto end; } @@ -351,18 +520,18 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) { int ret; - /* Opening the tracefile in write mode */ - if (strlen(stream->path_name) > 0 && stream->net_seq_idx == -1) { - ret = run_as_open(stream->path_name, - O_WRONLY|O_CREAT|O_TRUNC, - S_IRWXU|S_IRWXG|S_IRWXO, + assert(stream); + + /* Don't create anything if this is set for streaming. */ + if (stream->net_seq_idx == (uint64_t) -1ULL) { + ret = utils_create_stream_file(stream->chan->pathname, stream->name, + stream->chan->tracefile_size, stream->tracefile_count_current, stream->uid, stream->gid); if (ret < 0) { - ERR("Opening %s", stream->path_name); - perror("open"); goto error; } stream->out_fd = ret; + stream->tracefile_size_current = 0; } if (stream->output == LTTNG_EVENT_MMAP) { @@ -372,15 +541,15 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) ret = kernctl_get_mmap_len(stream->wait_fd, &mmap_len); if (ret != 0) { errno = -ret; - perror("kernctl_get_mmap_len"); + PERROR("kernctl_get_mmap_len"); goto error_close_fd; } stream->mmap_len = (size_t) mmap_len; - stream->mmap_base = mmap(NULL, stream->mmap_len, - PROT_READ, MAP_PRIVATE, stream->wait_fd, 0); + stream->mmap_base = mmap(NULL, stream->mmap_len, PROT_READ, + MAP_PRIVATE, stream->wait_fd, 0); if (stream->mmap_base == MAP_FAILED) { - perror("Error mmaping"); + PERROR("Error mmaping"); ret = -1; goto error_close_fd; } @@ -400,3 +569,32 @@ error: return ret; } +/* + * Check if data is still being extracted from the buffers for a specific + * stream. Consumer data lock MUST be acquired before calling this function + * and the stream lock. + * + * Return 1 if the traced data are still getting read else 0 meaning that the + * data is available for trace viewer reading. + */ +int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream) +{ + int ret; + + assert(stream); + + ret = kernctl_get_next_subbuf(stream->wait_fd); + if (ret == 0) { + /* There is still data so let's put back this subbuffer. */ + ret = kernctl_put_subbuf(stream->wait_fd); + assert(ret == 0); + ret = 1; /* Data is pending */ + goto end; + } + + /* Data is NOT pending and ready to be read. */ + ret = 0; + +end: + return ret; +}