X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=bdf0eff7c4a2b56a72bdda1f5bd06ea35e213a98;hp=84be2db463fb0b8cd8a3a6107fd321531eef2aa9;hb=4891ece8d4eeb2645efdf1467680b037c63ab425;hpb=e2039c7a8a76decaf4403d13f2ec420fe33ef2be diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 84be2db46..bdf0eff7c 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -104,68 +104,6 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, return ret; } -/* - * Find a relayd and send the stream - * - * Returns 0 on success, < 0 on error - */ -static -int send_relayd_stream(struct lttng_consumer_stream *stream, char *path) -{ - struct consumer_relayd_sock_pair *relayd; - int ret = 0; - char *stream_path; - - if (path != NULL) { - stream_path = path; - } else { - stream_path = stream->chan->pathname; - } - /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { - /* Add stream on the relayd */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_add_stream(&relayd->control_sock, - stream->name, stream_path, - &stream->relayd_stream_id, - stream->chan->tracefile_size, - stream->chan->tracefile_count); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - goto end; - } - uatomic_inc(&relayd->refcount); - } else if (stream->net_seq_idx != (uint64_t) -1ULL) { - ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", - stream->net_seq_idx); - ret = -1; - goto end; - } - -end: - rcu_read_unlock(); - return ret; -} - -/* - * Find a relayd and close the stream - */ -static -void close_relayd_stream(struct lttng_consumer_stream *stream) -{ - struct consumer_relayd_sock_pair *relayd; - - /* The stream is not metadata. Get relayd reference if exists. */ - rcu_read_lock(); - relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { - consumer_stream_relayd_close(stream, relayd); - } - rcu_read_unlock(); -} - /* * Take a snapshot of all the stream of a channel * @@ -197,25 +135,28 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, goto end; } - cds_list_for_each_entry(stream, &channel->stream_no_monitor_list.head, - no_monitor_node) { + cds_list_for_each_entry(stream, &channel->streams.head, send_node) { /* * Lock stream because we are about to change its state. */ pthread_mutex_lock(&stream->lock); + /* + * Assign the received relayd ID so we can use it for streaming. The streams + * are not visible to anyone so this is OK to change it. + */ stream->net_seq_idx = relayd_id; channel->relayd_id = relayd_id; if (relayd_id != (uint64_t) -1ULL) { - ret = send_relayd_stream(stream, path); + ret = consumer_send_relayd_stream(stream, path); if (ret < 0) { ERR("sending stream to relayd"); goto end_unlock; } - DBG("Stream %s sent to the relayd", stream->name); } else { ret = utils_create_stream_file(path, stream->name, - stream->chan->tracefile_size, stream->tracefile_count_current, + stream->chan->tracefile_size, + stream->tracefile_count_current, stream->uid, stream->gid); if (ret < 0) { ERR("utils_create_stream_file"); @@ -231,7 +172,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = kernctl_buffer_flush(stream->wait_fd); if (ret < 0) { - ERR("Failed to flush kernel metadata stream"); + ERR("Failed to flush kernel stream"); goto end_unlock; } @@ -282,22 +223,21 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = kernctl_get_subbuf_size(stream->wait_fd, &len); if (ret < 0) { ERR("Snapshot kernctl_get_subbuf_size"); - goto end_unlock; + goto error_put_subbuf; } ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len); if (ret < 0) { ERR("Snapshot kernctl_get_padded_subbuf_size"); - goto end_unlock; + goto error_put_subbuf; } read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, padded_len - len); /* - * We write the padded len in local tracefiles but the - * data len when using a relay. - * Display the error but continue processing to try to - * release the subbuffer. + * We write the padded len in local tracefiles but the data len + * when using a relay. Display the error but continue processing + * to try to release the subbuffer. */ if (relayd_id != (uint64_t) -1ULL) { if (read_len != len) { @@ -337,6 +277,11 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = 0; goto end; +error_put_subbuf: + ret = kernctl_put_subbuf(stream->wait_fd); + if (ret < 0) { + ERR("Snapshot kernctl_put_subbuf error path"); + } end_unlock: pthread_mutex_unlock(&stream->lock); end: @@ -352,9 +297,12 @@ end: int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, struct lttng_consumer_local_data *ctx) { + int ret, use_relayd = 0; + ssize_t ret_read; struct lttng_consumer_channel *metadata_channel; struct lttng_consumer_stream *metadata_stream; - int ret; + + assert(ctx); DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", key, path); @@ -363,58 +311,69 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, metadata_channel = consumer_find_channel(key); if (!metadata_channel) { - ERR("Snapshot kernel metadata channel not found for key %lu", key); + ERR("Kernel snapshot metadata not found for key %" PRIu64, key); ret = -1; - goto end; + goto error; } metadata_stream = metadata_channel->metadata_stream; assert(metadata_stream); + /* Flag once that we have a valid relayd for the stream. */ if (relayd_id != (uint64_t) -1ULL) { - ret = send_relayd_stream(metadata_stream, path); + use_relayd = 1; + } + + if (use_relayd) { + ret = consumer_send_relayd_stream(metadata_stream, path); if (ret < 0) { - ERR("sending stream to relayd"); + goto error; } - DBG("Stream %s sent to the relayd", metadata_stream->name); } else { ret = utils_create_stream_file(path, metadata_stream->name, metadata_stream->chan->tracefile_size, metadata_stream->tracefile_count_current, metadata_stream->uid, metadata_stream->gid); if (ret < 0) { - goto end; + goto error; } metadata_stream->out_fd = ret; } - ret = 0; - while (ret >= 0) { - ret = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); - if (ret < 0) { - if (ret != -EPERM) { - ERR("Kernel snapshot reading subbuffer"); - goto end; + do { + ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); + if (ret_read < 0) { + if (ret_read != -EPERM) { + ERR("Kernel snapshot reading metadata subbuffer (ret: %ld)", + ret_read); + goto error; } - /* "ret" is negative at this point so we will exit the loop. */ + /* ret_read is negative at this point so we will exit the loop. */ continue; } - } + } while (ret_read >= 0); - if (relayd_id == (uint64_t) -1ULL) { + if (use_relayd) { + close_relayd_stream(metadata_stream); + metadata_stream->net_seq_idx = (uint64_t) -1ULL; + } else { ret = close(metadata_stream->out_fd); if (ret < 0) { - PERROR("Kernel consumer snapshot close out_fd"); - goto end; + PERROR("Kernel consumer snapshot metadata close out_fd"); + /* + * Don't go on error here since the snapshot was successful at this + * point but somehow the close failed. + */ } metadata_stream->out_fd = -1; - } else { - close_relayd_stream(metadata_stream); - metadata_stream->net_seq_idx = (uint64_t) -1ULL; } ret = 0; -end: + + cds_list_del(&metadata_stream->send_node); + consumer_stream_destroy(metadata_stream, NULL); + metadata_channel->metadata_stream = NULL; +error: rcu_read_unlock(); return ret; } @@ -480,13 +439,24 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid, msg.u.channel.relayd_id, msg.u.channel.output, msg.u.channel.tracefile_size, - msg.u.channel.tracefile_count, + msg.u.channel.tracefile_count, 0, msg.u.channel.monitor); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams; + switch (msg.u.channel.output) { + case LTTNG_EVENT_SPLICE: + new_channel->output = CONSUMER_CHANNEL_SPLICE; + break; + case LTTNG_EVENT_MMAP: + new_channel->output = CONSUMER_CHANNEL_MMAP; + break; + default: + ERR("Channel output unknown %d", msg.u.channel.output); + goto end_nosignal; + } /* Translate and save channel type. */ switch (msg.u.channel.type) { @@ -546,20 +516,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { - /* - * Somehow, the session daemon is not responding - * anymore. - */ + /* Somehow, the session daemon is not responding anymore. */ goto error_fatal; } if (ret_code != LTTNG_OK) { - /* - * Channel was not found. - */ + /* Channel was not found. */ goto end_nosignal; } - /* block */ + /* Blocking call */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { rcu_read_unlock(); return -EINTR; @@ -594,7 +559,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, channel->session_id, msg.u.stream.cpu, &alloc_ret, - channel->type); + channel->type, + channel->monitor); if (new_stream == NULL) { switch (alloc_ret) { case -ENOMEM: @@ -605,6 +571,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } goto end_nosignal; } + new_stream->chan = channel; new_stream->wait_fd = fd; switch (channel->output) { @@ -642,7 +609,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ctx->on_recv_stream) { ret = ctx->on_recv_stream(new_stream); if (ret < 0) { - consumer_del_stream(new_stream, NULL); + consumer_stream_free(new_stream); goto end_nosignal; } } @@ -653,18 +620,20 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Do not monitor this stream. */ if (!channel->monitor) { - DBG("Kernel consumer add stream %s in no monitor mode with" + DBG("Kernel consumer add stream %s in no monitor mode with " "relayd id %" PRIu64, new_stream->name, - new_stream->relayd_stream_id); - cds_list_add(&new_stream->no_monitor_node, - &channel->stream_no_monitor_list.head); + new_stream->net_seq_idx); + cds_list_add(&new_stream->send_node, &channel->streams.head); break; } - ret = send_relayd_stream(new_stream, NULL); - if (ret < 0) { - consumer_del_stream(new_stream, NULL); - goto end_nosignal; + /* Send stream to relayd if the stream has an ID. */ + if (new_stream->net_seq_idx != (uint64_t) -1ULL) { + ret = consumer_send_relayd_stream(new_stream, NULL); + if (ret < 0) { + consumer_stream_free(new_stream); + goto end_nosignal; + } } /* Get the right pipe where the stream will be sent. */ @@ -679,7 +648,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ERR("Consumer write %s stream to pipe %d", new_stream->metadata_flag ? "metadata" : "data", lttng_pipe_get_writefd(stream_pipe)); - consumer_del_stream(new_stream, NULL); + consumer_stream_free(new_stream); goto end_nosignal; } @@ -994,11 +963,12 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) return 0; error_close_fd: - { + if (stream->out_fd >= 0) { int err; err = close(stream->out_fd); assert(!err); + stream->out_fd = -1; } error: return ret;