X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=8416bc5a3716fb5400cbeb49ae373858a2137144;hp=0de73443e1bee5c51549e608d20b1548cb007257;hb=6a00837f8cb0431a2ad90974d67fae138ea97dd5;hpb=d8ef542d25837bdfb960e5df2a91c5d18f5ef401 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 0de73443e..8416bc5a3 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -34,7 +34,10 @@ #include #include #include +#include #include +#include +#include #include "kernel-consumer.h" @@ -81,6 +84,319 @@ int lttng_kconsumer_get_produced_snapshot(struct lttng_consumer_stream *stream, return ret; } +/* + * Get the consumerd position + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, + unsigned long *pos) +{ + int ret; + int infd = stream->wait_fd; + + ret = kernctl_snapshot_get_consumed(infd, pos); + if (ret != 0) { + errno = -ret; + perror("kernctl_snapshot_get_consumed"); + } + + return ret; +} + +/* + * Take a snapshot of all the stream of a channel + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, + uint64_t relayd_id, uint64_t max_stream_size, + struct lttng_consumer_local_data *ctx) +{ + int ret; + unsigned long consumed_pos, produced_pos; + struct lttng_consumer_channel *channel; + struct lttng_consumer_stream *stream; + + DBG("Kernel consumer snapshot channel %" PRIu64, key); + + rcu_read_lock(); + + channel = consumer_find_channel(key); + if (!channel) { + ERR("No channel found for key %" PRIu64, key); + ret = -1; + goto end; + } + + /* Splice is not supported yet for channel snapshot. */ + if (channel->output != CONSUMER_CHANNEL_MMAP) { + ERR("Unsupported output %d", channel->output); + ret = -1; + goto end; + } + + cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + /* + * Lock stream because we are about to change its state. + */ + pthread_mutex_lock(&stream->lock); + + /* + * Assign the received relayd ID so we can use it for streaming. The streams + * are not visible to anyone so this is OK to change it. + */ + stream->net_seq_idx = relayd_id; + channel->relayd_id = relayd_id; + if (relayd_id != (uint64_t) -1ULL) { + ret = consumer_send_relayd_stream(stream, path); + if (ret < 0) { + ERR("sending stream to relayd"); + goto end_unlock; + } + } else { + ret = utils_create_stream_file(path, stream->name, + stream->chan->tracefile_size, + stream->tracefile_count_current, + stream->uid, stream->gid); + if (ret < 0) { + ERR("utils_create_stream_file"); + goto end_unlock; + } + + stream->out_fd = ret; + stream->tracefile_size_current = 0; + + DBG("Kernel consumer snapshot stream %s/%s (%lu)", path, + stream->name, stream->key); + } + + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end_unlock; + } + + ret = lttng_kconsumer_take_snapshot(stream); + if (ret < 0) { + ERR("Taking kernel snapshot"); + goto end_unlock; + } + + ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos); + if (ret < 0) { + ERR("Produced kernel snapshot position"); + goto end_unlock; + } + + ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos); + if (ret < 0) { + ERR("Consumerd kernel snapshot position"); + goto end_unlock; + } + + if (stream->max_sb_size == 0) { + ret = kernctl_get_max_subbuf_size(stream->wait_fd, + &stream->max_sb_size); + if (ret < 0) { + ERR("Getting kernel max_sb_size"); + goto end_unlock; + } + } + + /* + * The original value is sent back if max stream size is larger than + * the possible size of the snapshot. Also, we asume that the session + * daemon should never send a maximum stream size that is lower than + * subbuffer size. + */ + consumed_pos = consumer_get_consumed_maxsize(consumed_pos, + produced_pos, max_stream_size); + + while (consumed_pos < produced_pos) { + ssize_t read_len; + unsigned long len, padded_len; + + DBG("Kernel consumer taking snapshot at pos %lu", consumed_pos); + + ret = kernctl_get_subbuf(stream->wait_fd, &consumed_pos); + if (ret < 0) { + if (errno != EAGAIN) { + PERROR("kernctl_get_subbuf snapshot"); + goto end_unlock; + } + DBG("Kernel consumer get subbuf failed. Skipping it."); + consumed_pos += stream->max_sb_size; + continue; + } + + ret = kernctl_get_subbuf_size(stream->wait_fd, &len); + if (ret < 0) { + ERR("Snapshot kernctl_get_subbuf_size"); + goto error_put_subbuf; + } + + ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len); + if (ret < 0) { + ERR("Snapshot kernctl_get_padded_subbuf_size"); + goto error_put_subbuf; + } + + read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, + padded_len - len); + /* + * We write the padded len in local tracefiles but the data len + * when using a relay. Display the error but continue processing + * to try to release the subbuffer. + */ + if (relayd_id != (uint64_t) -1ULL) { + if (read_len != len) { + ERR("Error sending to the relay (ret: %zd != len: %lu)", + read_len, len); + } + } else { + if (read_len != padded_len) { + ERR("Error writing to tracefile (ret: %zd != len: %lu)", + read_len, padded_len); + } + } + + ret = kernctl_put_subbuf(stream->wait_fd); + if (ret < 0) { + ERR("Snapshot kernctl_put_subbuf"); + goto end_unlock; + } + consumed_pos += stream->max_sb_size; + } + + if (relayd_id == (uint64_t) -1ULL) { + if (stream->out_fd >= 0) { + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot close out_fd"); + goto end_unlock; + } + stream->out_fd = -1; + } + } else { + close_relayd_stream(stream); + stream->net_seq_idx = (uint64_t) -1ULL; + } + pthread_mutex_unlock(&stream->lock); + } + + /* All good! */ + ret = 0; + goto end; + +error_put_subbuf: + ret = kernctl_put_subbuf(stream->wait_fd); + if (ret < 0) { + ERR("Snapshot kernctl_put_subbuf error path"); + } +end_unlock: + pthread_mutex_unlock(&stream->lock); +end: + rcu_read_unlock(); + return ret; +} + +/* + * Read the whole metadata available for a snapshot. + * + * Returns 0 on success, < 0 on error + */ +int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, + uint64_t relayd_id, struct lttng_consumer_local_data *ctx) +{ + int ret, use_relayd = 0; + ssize_t ret_read; + struct lttng_consumer_channel *metadata_channel; + struct lttng_consumer_stream *metadata_stream; + + assert(ctx); + + DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", + key, path); + + rcu_read_lock(); + + metadata_channel = consumer_find_channel(key); + if (!metadata_channel) { + ERR("Kernel snapshot metadata not found for key %" PRIu64, key); + ret = -1; + goto error; + } + + metadata_stream = metadata_channel->metadata_stream; + assert(metadata_stream); + + /* Flag once that we have a valid relayd for the stream. */ + if (relayd_id != (uint64_t) -1ULL) { + use_relayd = 1; + } + + if (use_relayd) { + ret = consumer_send_relayd_stream(metadata_stream, path); + if (ret < 0) { + goto error; + } + } else { + ret = utils_create_stream_file(path, metadata_stream->name, + metadata_stream->chan->tracefile_size, + metadata_stream->tracefile_count_current, + metadata_stream->uid, metadata_stream->gid); + if (ret < 0) { + goto error; + } + metadata_stream->out_fd = ret; + } + + do { + ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); + if (ret_read < 0) { + if (ret_read != -EPERM) { + ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)", + ret_read); + goto error; + } + /* ret_read is negative at this point so we will exit the loop. */ + continue; + } + } while (ret_read >= 0); + + if (use_relayd) { + close_relayd_stream(metadata_stream); + metadata_stream->net_seq_idx = (uint64_t) -1ULL; + } else { + if (metadata_stream->out_fd >= 0) { + ret = close(metadata_stream->out_fd); + if (ret < 0) { + PERROR("Kernel consumer snapshot metadata close out_fd"); + /* + * Don't go on error here since the snapshot was successful at this + * point but somehow the close failed. + */ + } + metadata_stream->out_fd = -1; + } + } + + ret = 0; + + cds_list_del(&metadata_stream->send_node); + consumer_stream_destroy(metadata_stream, NULL); + metadata_channel->metadata_stream = NULL; +error: + rcu_read_unlock(); + return ret; +} + +/* + * Receive command from session daemon and process it. + * + * Return 1 on success else a negative value or 0. + */ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll) { @@ -90,7 +406,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg)); if (ret != sizeof(msg)) { - lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); + if (ret > 0) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD); + ret = -1; + } return ret; } if (msg.cmd_type == LTTNG_CONSUMER_STOP) { @@ -120,24 +439,38 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, case LTTNG_CONSUMER_ADD_CHANNEL: { struct lttng_consumer_channel *new_channel; + int ret_recv; /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_fatal; } - DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key); new_channel = consumer_allocate_channel(msg.u.channel.channel_key, msg.u.channel.session_id, msg.u.channel.pathname, msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid, - msg.u.channel.relayd_id, msg.u.channel.output); + msg.u.channel.relayd_id, msg.u.channel.output, + msg.u.channel.tracefile_size, + msg.u.channel.tracefile_count, 0, + msg.u.channel.monitor); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams; + switch (msg.u.channel.output) { + case LTTNG_EVENT_SPLICE: + new_channel->output = CONSUMER_CHANNEL_SPLICE; + break; + case LTTNG_EVENT_MMAP: + new_channel->output = CONSUMER_CHANNEL_MMAP; + break; + default: + ERR("Channel output unknown %d", msg.u.channel.output); + goto end_nosignal; + } /* Translate and save channel type. */ switch (msg.u.channel.type) { @@ -151,21 +484,31 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, }; if (ctx->on_recv_channel != NULL) { - ret = ctx->on_recv_channel(new_channel); - if (ret == 0) { - consumer_add_channel(new_channel, ctx); - } else if (ret < 0) { + ret_recv = ctx->on_recv_channel(new_channel); + if (ret_recv == 0) { + ret = consumer_add_channel(new_channel, ctx); + } else if (ret_recv < 0) { goto end_nosignal; } } else { - consumer_add_channel(new_channel, ctx); + ret = consumer_add_channel(new_channel, ctx); } + + /* If we received an error in add_channel, we need to report it. */ + if (ret < 0) { + ret = consumer_send_status_msg(sock, ret); + if (ret < 0) { + goto error_fatal; + } + goto end_nosignal; + } + goto end_nosignal; } case LTTNG_CONSUMER_ADD_STREAM: { - int fd, stream_pipe; - struct consumer_relayd_sock_pair *relayd = NULL; + int fd; + struct lttng_pipe *stream_pipe; struct lttng_consumer_stream *new_stream; struct lttng_consumer_channel *channel; int alloc_ret = 0; @@ -186,15 +529,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0 || ret_code != LTTNG_OK) { - /* - * Somehow, the session daemon is not responding anymore or the - * channel was not found. - */ + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto error_fatal; + } + if (ret_code != LTTNG_OK) { + /* Channel was not found. */ goto end_nosignal; } - /* block */ + /* Blocking call */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { rcu_read_unlock(); return -EINTR; @@ -229,7 +573,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel->session_id, msg.u.stream.cpu, &alloc_ret, - channel->type); + channel->type, + channel->monitor); if (new_stream == NULL) { switch (alloc_ret) { case -ENOMEM: @@ -240,8 +585,31 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } goto end_nosignal; } + new_stream->chan = channel; new_stream->wait_fd = fd; + switch (channel->output) { + case CONSUMER_CHANNEL_SPLICE: + new_stream->output = LTTNG_EVENT_SPLICE; + break; + case CONSUMER_CHANNEL_MMAP: + new_stream->output = LTTNG_EVENT_MMAP; + break; + default: + ERR("Stream output unknown %d", channel->output); + goto end_nosignal; + } + + /* + * We've just assigned the channel to the stream so increment the + * refcount right now. We don't need to increment the refcount for + * streams in no monitor because we handle manually the cleanup of + * those. It is very important to make sure there is NO prior + * consumer_del_stream() calls or else the refcount will be unbalanced. + */ + if (channel->monitor) { + uatomic_inc(&new_stream->chan->refcount); + } /* * The buffer flush is done on the session daemon side for the kernel @@ -252,49 +620,71 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ new_stream->hangup_flush_done = 0; - /* The stream is not metadata. Get relayd reference if exists. */ - relayd = consumer_find_relayd(new_stream->net_seq_idx); - if (relayd != NULL) { - /* Add stream on the relayd */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_add_stream(&relayd->control_sock, - new_stream->name, new_stream->chan->pathname, - &new_stream->relayd_stream_id); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ctx->on_recv_stream) { + ret = ctx->on_recv_stream(new_stream); if (ret < 0) { - consumer_del_stream(new_stream, NULL); + consumer_stream_free(new_stream); goto end_nosignal; } - } else if (new_stream->net_seq_idx != (uint64_t) -1ULL) { - ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", + } + + if (new_stream->metadata_flag) { + channel->metadata_stream = new_stream; + } + + /* Do not monitor this stream. */ + if (!channel->monitor) { + DBG("Kernel consumer add stream %s in no monitor mode with " + "relayd id %" PRIu64, new_stream->name, new_stream->net_seq_idx); - consumer_del_stream(new_stream, NULL); - goto end_nosignal; + cds_list_add(&new_stream->send_node, &channel->streams.head); + break; } - if (ctx->on_recv_stream) { - ret = ctx->on_recv_stream(new_stream); + /* Send stream to relayd if the stream has an ID. */ + if (new_stream->net_seq_idx != (uint64_t) -1ULL) { + ret = consumer_send_relayd_stream(new_stream, + new_stream->chan->pathname); if (ret < 0) { - consumer_del_stream(new_stream, NULL); + consumer_stream_free(new_stream); goto end_nosignal; } } /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { - stream_pipe = ctx->consumer_metadata_pipe[1]; + ret = consumer_add_metadata_stream(new_stream); + if (ret) { + ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_stream_free(new_stream); + goto end_nosignal; + } + stream_pipe = ctx->consumer_metadata_pipe; } else { - stream_pipe = ctx->consumer_data_pipe[1]; + ret = consumer_add_data_stream(new_stream); + if (ret) { + ERR("Consumer add stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_stream_free(new_stream); + goto end_nosignal; + } + stream_pipe = ctx->consumer_data_pipe; } - do { - ret = write(stream_pipe, &new_stream, sizeof(new_stream)); - } while (ret < 0 && errno == EINTR); + /* Vitible to other threads */ + new_stream->globally_visible = 1; + + ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); if (ret < 0) { - PERROR("Consumer write %s stream to pipe %d", + ERR("Consumer write %s stream to pipe %d", new_stream->metadata_flag ? "metadata" : "data", - stream_pipe); - consumer_del_stream(new_stream, NULL); + lttng_pipe_get_writefd(stream_pipe)); + if (new_stream->metadata_flag) { + consumer_del_stream_for_metadata(new_stream); + } else { + consumer_del_stream_for_data(new_stream); + } goto end_nosignal; } @@ -338,7 +728,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_fatal; } goto end_nosignal; @@ -356,6 +746,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret)); if (ret < 0) { PERROR("send data pending ret code"); + goto error_fatal; } /* @@ -364,6 +755,68 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, */ break; } + case LTTNG_CONSUMER_SNAPSHOT_CHANNEL: + { + if (msg.u.snapshot_channel.metadata == 1) { + ret = lttng_kconsumer_snapshot_metadata(msg.u.snapshot_channel.key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, ctx); + if (ret < 0) { + ERR("Snapshot metadata failed"); + ret_code = LTTNG_ERR_KERN_META_FAIL; + } + } else { + ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key, + msg.u.snapshot_channel.pathname, + msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.max_stream_size, + ctx); + if (ret < 0) { + ERR("Snapshot channel failed"); + ret_code = LTTNG_ERR_KERN_CHAN_FAIL; + } + } + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } + case LTTNG_CONSUMER_DESTROY_CHANNEL: + { + uint64_t key = msg.u.destroy_channel.key; + struct lttng_consumer_channel *channel; + + channel = consumer_find_channel(key); + if (!channel) { + ERR("Kernel consumer destroy channel %" PRIu64 " not found", key); + ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND; + } + + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + /* + * This command should ONLY be issued for channel with streams set in + * no monitor mode. + */ + assert(!channel->monitor); + + /* + * The refcount should ALWAYS be 0 in the case of a channel in no + * monitor mode. + */ + assert(!uatomic_sub_return(&channel->refcount, 1)); + + consumer_del_channel(channel); + + goto end_nosignal; + } default: goto end_nosignal; } @@ -376,6 +829,11 @@ end_nosignal: * shutdown during the recv() or send() call. */ return 1; + +error_fatal: + rcu_read_unlock(); + /* This will issue a consumer stop. */ + return -1; } /* @@ -415,8 +873,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } switch (stream->chan->output) { - case LTTNG_EVENT_SPLICE: - + case CONSUMER_CHANNEL_SPLICE: /* * XXX: The lttng-modules splice "actor" does not handle copying * partial pages hence only using the subbuffer size without the @@ -441,7 +898,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ret, subbuf_size); } break; - case LTTNG_EVENT_MMAP: + case CONSUMER_CHANNEL_MMAP: /* Get subbuffer size without padding */ err = kernctl_get_subbuf_size(infd, &subbuf_size); if (err != 0) { @@ -501,26 +958,22 @@ end: int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) { int ret; - char full_path[PATH_MAX]; assert(stream); - ret = snprintf(full_path, sizeof(full_path), "%s/%s", - stream->chan->pathname, stream->name); - if (ret < 0) { - PERROR("snprintf on_recv_stream"); - goto error; - } - - /* Opening the tracefile in write mode */ - if (stream->net_seq_idx == (uint64_t) -1ULL) { - ret = run_as_open(full_path, O_WRONLY | O_CREAT | O_TRUNC, - S_IRWXU|S_IRWXG|S_IRWXO, stream->uid, stream->gid); + /* + * Don't create anything if this is set for streaming or should not be + * monitored. + */ + if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) { + ret = utils_create_stream_file(stream->chan->pathname, stream->name, + stream->chan->tracefile_size, stream->tracefile_count_current, + stream->uid, stream->gid); if (ret < 0) { - PERROR("open kernel stream path %s", full_path); goto error; } stream->out_fd = ret; + stream->tracefile_size_current = 0; } if (stream->output == LTTNG_EVENT_MMAP) { @@ -548,11 +1001,12 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) return 0; error_close_fd: - { + if (stream->out_fd >= 0) { int err; err = close(stream->out_fd); assert(!err); + stream->out_fd = -1; } error: return ret; @@ -572,6 +1026,11 @@ int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream) assert(stream); + if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) { + ret = 0; + goto end; + } + ret = kernctl_get_next_subbuf(stream->wait_fd); if (ret == 0) { /* There is still data so let's put back this subbuffer. */