X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=29ef3a4ce426d1db8755796fdafffceed43525b6;hp=bc3ddc930cf1cb8a10d0cd03b9150a4385c15bed;hb=07b86b528dc279d59cdf16e6cb946c144fe773f2;hpb=fe4477ee14abb348ce9e167f8b4c09312d67de36 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index bc3ddc930..29ef3a4ce 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -34,8 +34,10 @@ #include #include #include +#include #include #include +#include #include "kernel-consumer.h" @@ -82,6 +84,326 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, return ret; } +/* + * Get the consumerd position + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + int ret; + int infd = stream->wait_fd; + + ret = kernctl_snapshot_get_consumed(infd, pos); + if (ret != 0) { + errno = -ret; + perror("kernctl_snapshot_get_consumed"); + } + + return ret; +} + +/* + * Find a relayd and send the stream + * + * Returns 0 on success, < 0 on error + */ +static +int send_relayd_stream(struct lttng_consumer_stream *stream, char *path) +{ + struct consumer_relayd_sock_pair *relayd; + int ret = 0; + char *stream_path; + + if (path != NULL) { + stream_path = path; + } else { + stream_path = stream->chan->pathname; + } + /* The stream is not metadata. Get relayd reference if exists. */ + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd != NULL) { + /* Add stream on the relayd */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_add_stream(&relayd->control_sock, + stream->name, stream_path, + &stream->relayd_stream_id, + stream->chan->tracefile_size, + stream->chan->tracefile_count); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + uatomic_inc(&relayd->refcount); + } else if (stream->net_seq_idx != (uint64_t) -1ULL) { + ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", + stream->net_seq_idx); + ret = -1; + goto end; + } + +end: + rcu_read_unlock(); + return ret; +} + +/* + * Find a relayd and close the stream + */ +static +void close_relayd_stream(struct lttng_consumer_stream *stream) +{ + struct consumer_relayd_sock_pair *relayd; + + /* The stream is not metadata. Get relayd reference if exists. */ + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd != NULL) { + consumer_stream_relayd_close(stream, relayd); + } + rcu_read_unlock(); +} + +/* + * Take a snapshot of all the stream of a channel + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, + uint64_t relayd_id, struct lttng_consumer_local_data *ctx) +{ + int ret; + unsigned long consumed_pos, produced_pos; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + + DBG("Kernel consumer snapshot channel %lu", key); + + rcu_read_lock(); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("No channel found for key %lu", key); + ret = -1; + goto end; + } + + /* Splice is not supported yet for channel snapshot. */ + if (channel->output != CONSUMER_CHANNEL_MMAP) { + ERR("Unsupported output %d", channel->output); + ret = -1; + goto end; + } + + cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head, + no_monitor_node) { + /* + * Lock stream because we are about to change its state. + */ + pthread_mutex_lock(&stream->lock); + + stream->net_seq_idx = relayd_id; + channel->relayd_id = relayd_id; + if (relayd_id != (uint64_t) -1ULL) { + ret = send_relayd_stream(stream, path); + if (ret < 0) { + ERR("sending stream to relayd"); + goto end_unlock; + } + DBG("Stream %s sent to the relayd", stream->name); + } else { + ret = utils_create_stream_file(path, stream->name, + stream->chan->tracefile_size, stream->tracefile_count_current, + stream->uid, stream->gid); + if (ret < 0) { + ERR("utils_create_stream_file"); + goto end_unlock; + } + + stream->out_fd = ret; + stream->tracefile_size_current = 0; + + DBG("Kernel consumer snapshot stream %s/%s (%lu)", path, + stream->name, stream->key); + } + + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel metadata stream"); + goto end_unlock; + } + + ret = lttng_kconsumer_take_snapshot(stream); + if (ret < 0) { + ERR("Taking kernel snapshot"); + goto end_unlock; + } + + ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos); + if (ret < 0) { + ERR("Produced kernel snapshot position"); + goto end_unlock; + } + + ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret < 0) { + ERR("Consumerd kernel snapshot position"); + goto end_unlock; + } + + if (stream->max_sb_size == 0) { + ret = kernctl_get_max_subbuf_size(stream->wait_fd, + &stream->max_sb_size); + if (ret < 0) { + ERR("Getting kernel max_sb_size"); + goto end_unlock; + } + } + + while (consumed_pos < produced_pos) { + ssize_t read_len; + unsigned long len, padded_len; + + DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos); + + ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos); + if (ret < 0) { + if (errno != EAGAIN) { + PERROR("kernctl_get_subbuf snapshot"); + goto end_unlock; + } + DBG("Kernel consumer get subbuf failed. Skipping it."); + consumed_pos += stream->max_sb_size; + continue; + } + + ret = kernctl_get_subbuf_size(stream->wait_fd, &len); + if (ret < 0) { + ERR("Snapshot kernctl_get_subbuf_size"); + goto end_unlock; + } + + ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len); + if (ret < 0) { + ERR("Snapshot kernctl_get_padded_subbuf_size"); + goto end_unlock; + } + + read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, + padded_len - len); + /* + * We write the padded len in local tracefiles but the + * data len when using a relay. + * Display the error but continue processing to try to + * release the subbuffer. + */ + if (relayd_id != (uint64_t) -1ULL) { + if (read_len != len) { + ERR("Error sending to the relay (ret: %zd != len: %lu)", + read_len, len); + } + } else { + if (read_len != padded_len) { + ERR("Error writing to tracefile (ret: %zd != len: %lu)", + read_len, padded_len); + } + } + + ret = kernctl_put_subbuf(stream->wait_fd); + if (ret < 0) { + ERR("Snapshot kernctl_put_subbuf"); + goto end_unlock; + } + consumed_pos += stream->max_sb_size; + } + + if (relayd_id == (uint64_t) -1ULL) { + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot close out_fd"); + goto end_unlock; + } + stream->out_fd = -1; + } else { + close_relayd_stream(stream); + stream->net_seq_idx = (uint64_t) -1ULL; + } + pthread_mutex_unlock(&stream->lock); + } + + /* All good! */ + ret = 0; + goto end; + +end_unlock: + pthread_mutex_unlock(&stream->lock); +end: + rcu_read_unlock(); + return ret; +} + +/* + * Read the whole metadata available for a snapshot. + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, + struct lttng_consumer_local_data *ctx) +{ + struct lttng_consumer_channel *metadata_channel; + struct lttng_consumer_stream *metadata_stream; + int ret; + + DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", + key, path); + + rcu_read_lock(); + + metadata_channel = consumer_find_channel(key); + if (!metadata_channel) { + ERR("Snapshot kernel metadata channel not found for key %lu", key); + ret = -1; + goto end; + } + + metadata_stream = metadata_channel->metadata_stream; + assert(metadata_stream); + + ret = utils_create_stream_file(path, metadata_stream->name, + metadata_stream->chan->tracefile_size, + metadata_stream->tracefile_count_current, + metadata_stream->uid, metadata_stream->gid); + if (ret < 0) { + goto end; + } + metadata_stream->out_fd = ret; + + ret = 0; + while (ret >= 0) { + ret = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); + if (ret < 0) { + if (ret != -EPERM) { + ERR("Kernel snapshot reading subbuffer"); + goto end; + } + /* "ret" is negative at this point so we will exit the loop. */ + continue; + } + } + + ret = 0; +end: + rcu_read_unlock(); + return ret; +} + +/* + * Receive command from session daemon and process it. + * + * Return 1 on success else a negative value or 0. + */ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { @@ -92,6 +414,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); + if (ret > 0) { + ret = -1; + } return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { @@ -121,21 +446,22 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; + int ret_recv; /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_fatal; } - DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, msg.u.channel.session_id, msg.u.channel.pathname, msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid, msg.u.channel.relayd_id, msg.u.channel.output, msg.u.channel.tracefile_size, - msg.u.channel.tracefile_count); + msg.u.channel.tracefile_count, + msg.u.channel.monitor); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -154,21 +480,31 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, }; if (ctx->on_recv_channel != NULL) { - ret = ctx->on_recv_channel(new_channel); - if (ret == 0) { - consumer_add_channel(new_channel, ctx); - } else if (ret < 0) { + ret_recv = ctx->on_recv_channel(new_channel); + if (ret_recv == 0) { + ret = consumer_add_channel(new_channel, ctx); + } else if (ret_recv < 0) { goto end_nosignal; } } else { - consumer_add_channel(new_channel, ctx); + ret = consumer_add_channel(new_channel, ctx); } + + /* If we received an error in add_channel, we need to report it. */ + if (ret < 0) { + ret = consumer_send_status_msg(sock, ret); + if (ret < 0) { + goto error_fatal; + } + goto end_nosignal; + } + goto end_nosignal; } case LTTNG_CONSUMER_ADD_STREAM: { - int fd, stream_pipe; - struct consumer_relayd_sock_pair *relayd = NULL; + int fd; + struct lttng_pipe *stream_pipe; struct lttng_consumer_stream *new_stream; struct lttng_consumer_channel *channel; int alloc_ret = 0; @@ -189,10 +525,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0 || ret_code != LTTNG_OK) { + if (ret < 0) { /* - * Somehow, the session daemon is not responding anymore or the - * channel was not found. + * Somehow, the session daemon is not responding + * anymore. + */ + goto error_fatal; + } + if (ret_code != LTTNG_OK) { + /* + * Channel was not found. */ goto end_nosignal; } @@ -245,6 +587,28 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } new_stream->chan = channel; new_stream->wait_fd = fd; + switch (channel->output) { + case CONSUMER_CHANNEL_SPLICE: + new_stream->output = LTTNG_EVENT_SPLICE; + break; + case CONSUMER_CHANNEL_MMAP: + new_stream->output = LTTNG_EVENT_MMAP; + break; + default: + ERR("Stream output unknown %d", channel->output); + goto end_nosignal; + } + + /* + * We've just assigned the channel to the stream so increment the + * refcount right now. We don't need to increment the refcount for + * streams in no monitor because we handle manually the cleanup of + * those. It is very important to make sure there is NO prior + * consumer_del_stream() calls or else the refcount will be unbalanced. + */ + if (channel->monitor) { + uatomic_inc(&new_stream->chan->refcount); + } /* * The buffer flush is done on the session daemon side for the kernel @@ -255,22 +619,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ new_stream->hangup_flush_done = 0; - /* The stream is not metadata. Get relayd reference if exists. */ - relayd = consumer_find_relayd(new_stream->net_seq_idx); - if (relayd != NULL) { - /* Add stream on the relayd */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_add_stream(&relayd->control_sock, - new_stream->name, new_stream->chan->pathname, - &new_stream->relayd_stream_id); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - consumer_del_stream(new_stream, NULL); - goto end_nosignal; - } - } else if (new_stream->net_seq_idx != (uint64_t) -1ULL) { - ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", - new_stream->net_seq_idx); + ret = send_relayd_stream(new_stream, NULL); + if (ret < 0) { consumer_del_stream(new_stream, NULL); goto end_nosignal; } @@ -283,20 +633,32 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } + if (new_stream->metadata_flag) { + channel->metadata_stream = new_stream; + } + + /* Do not monitor this stream. */ + if (!channel->monitor) { + DBG("Kernel consumer add stream %s in no monitor mode with" + "relayd id %" PRIu64, new_stream->name, + new_stream->relayd_stream_id); + cds_list_add(&new_stream->no_monitor_node, + &channel->stream_no_monitor_list.head); + break; + } + /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { - stream_pipe = ctx->consumer_metadata_pipe[1]; + stream_pipe = ctx->consumer_metadata_pipe; } else { - stream_pipe = ctx->consumer_data_pipe[1]; + stream_pipe = ctx->consumer_data_pipe; } - do { - ret = write(stream_pipe, &new_stream, sizeof(new_stream)); - } while (ret < 0 && errno == EINTR); + ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); if (ret < 0) { - PERROR("Consumer write %s stream to pipe %d", + ERR("Consumer write %s stream to pipe %d", new_stream->metadata_flag ? "metadata" : "data", - stream_pipe); + lttng_pipe_get_writefd(stream_pipe)); consumer_del_stream(new_stream, NULL); goto end_nosignal; } @@ -341,7 +703,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_fatal; } goto end_nosignal; @@ -359,6 +721,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); if (ret < 0) { PERROR("send data pending ret code"); + goto error_fatal; } /* @@ -367,6 +730,65 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ break; } + case LTTNG_CONSUMER_SNAPSHOT_CHANNEL: + { + if (msg.u.snapshot_channel.metadata == 1) { + ret = lttng_kconsumer_snapshot_metadata(msg.u.snapshot_channel.key, + msg.u.snapshot_channel.pathname, ctx); + if (ret < 0) { + ERR("Snapshot metadata failed"); + ret_code = LTTNG_ERR_KERN_META_FAIL; + } + } else { + ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, ctx); + if (ret < 0) { + ERR("Snapshot channel failed"); + ret_code = LTTNG_ERR_KERN_CHAN_FAIL; + } + } + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } + case LTTNG_CONSUMER_DESTROY_CHANNEL: + { + uint64_t key = msg.u.destroy_channel.key; + struct lttng_consumer_channel *channel; + + channel = consumer_find_channel(key); + if (!channel) { + ERR("Kernel consumer destroy channel %" PRIu64 " not found", key); + ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND; + } + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + /* + * This command should ONLY be issued for channel with streams set in + * no monitor mode. + */ + assert(!channel->monitor); + + /* + * The refcount should ALWAYS be 0 in the case of a channel in no + * monitor mode. + */ + assert(!uatomic_sub_return(&channel->refcount, 1)); + + consumer_del_channel(channel); + + goto end_nosignal; + } default: goto end_nosignal; } @@ -379,6 +801,11 @@ end_nosignal: * shutdown during the recv() or send() call. */ return 1; + +error_fatal: + rcu_read_unlock(); + /* This will issue a consumer stop. */ + return -1; } /* @@ -418,8 +845,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } switch (stream->chan->output) { - case LTTNG_EVENT_SPLICE: - + case CONSUMER_CHANNEL_SPLICE: /* * XXX: The lttng-modules splice "actor" does not handle copying * partial pages hence only using the subbuffer size without the @@ -444,7 +870,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ret, subbuf_size); } break; - case LTTNG_EVENT_MMAP: + case CONSUMER_CHANNEL_MMAP: /* Get subbuffer size without padding */ err = kernctl_get_subbuf_size(infd, &subbuf_size); if (err != 0) { @@ -507,8 +933,11 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) assert(stream); - /* Don't create anything if this is set for streaming. */ - if (stream->net_seq_idx == (uint64_t) -1ULL) { + /* + * Don't create anything if this is set for streaming or should not be + * monitored. + */ + if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { ret = utils_create_stream_file(stream->chan->pathname, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, stream->uid, stream->gid);