X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=ada5594b791e84e166138a42ab1678a1540dcc82;hb=41ad701255719d65645effd849c06b506b6b38d4;hp=84be2db463fb0b8cd8a3a6107fd321531eef2aa9;hpb=e2039c7a8a76decaf4403d13f2ec420fe33ef2be;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 84be2db46..ada5594b7 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -109,41 +109,46 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, * * Returns 0 on success, < 0 on error */ -static -int send_relayd_stream(struct lttng_consumer_stream *stream, char *path) +static int send_relayd_stream(struct lttng_consumer_stream *stream, + char *path) { - struct consumer_relayd_sock_pair *relayd; int ret = 0; - char *stream_path; + const char *stream_path; + struct consumer_relayd_sock_pair *relayd; + + assert(stream); + assert(stream->net_seq_idx != -1ULL); if (path != NULL) { stream_path = path; } else { stream_path = stream->chan->pathname; } + /* The stream is not metadata. Get relayd reference if exists. */ rcu_read_lock(); relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd != NULL) { /* Add stream on the relayd */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_add_stream(&relayd->control_sock, - stream->name, stream_path, - &stream->relayd_stream_id, - stream->chan->tracefile_size, - stream->chan->tracefile_count); + ret = relayd_add_stream(&relayd->control_sock, stream->name, + stream_path, &stream->relayd_stream_id, + stream->chan->tracefile_size, stream->chan->tracefile_count); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { goto end; } uatomic_inc(&relayd->refcount); - } else if (stream->net_seq_idx != (uint64_t) -1ULL) { - ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", - stream->net_seq_idx); + } else { + ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.", + stream->key, stream->net_seq_idx); ret = -1; goto end; } + DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64, + stream->name, stream->key, stream->net_seq_idx); + end: rcu_read_unlock(); return ret; @@ -152,15 +157,14 @@ end: /* * Find a relayd and close the stream */ -static -void close_relayd_stream(struct lttng_consumer_stream *stream) +static void close_relayd_stream(struct lttng_consumer_stream *stream) { struct consumer_relayd_sock_pair *relayd; /* The stream is not metadata. Get relayd reference if exists. */ rcu_read_lock(); relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { + if (relayd) { consumer_stream_relayd_close(stream, relayd); } rcu_read_unlock(); @@ -204,6 +208,10 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, */ pthread_mutex_lock(&stream->lock); + /* + * Assign the received relayd ID so we can use it for streaming. The streams + * are not visible to anyone so this is OK to change it. + */ stream->net_seq_idx = relayd_id; channel->relayd_id = relayd_id; if (relayd_id != (uint64_t) -1ULL) { @@ -212,7 +220,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ERR("sending stream to relayd"); goto end_unlock; } - DBG("Stream %s sent to the relayd", stream->name); } else { ret = utils_create_stream_file(path, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, @@ -282,22 +289,21 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = kernctl_get_subbuf_size(stream->wait_fd, &len); if (ret < 0) { ERR("Snapshot kernctl_get_subbuf_size"); - goto end_unlock; + goto error_put_subbuf; } ret = kernctl_get_padded_subbuf_size(stream->wait_fd, &padded_len); if (ret < 0) { ERR("Snapshot kernctl_get_padded_subbuf_size"); - goto end_unlock; + goto error_put_subbuf; } read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, padded_len - len); /* - * We write the padded len in local tracefiles but the - * data len when using a relay. - * Display the error but continue processing to try to - * release the subbuffer. + * We write the padded len in local tracefiles but the data len + * when using a relay. Display the error but continue processing + * to try to release the subbuffer. */ if (relayd_id != (uint64_t) -1ULL) { if (read_len != len) { @@ -337,6 +343,11 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ret = 0; goto end; +error_put_subbuf: + ret = kernctl_put_subbuf(stream->wait_fd); + if (ret < 0) { + ERR("Snapshot kernctl_put_subbuf error path"); + } end_unlock: pthread_mutex_unlock(&stream->lock); end: @@ -480,13 +491,24 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.name, msg.u.channel.uid, msg.u.channel.gid, msg.u.channel.relayd_id, msg.u.channel.output, msg.u.channel.tracefile_size, - msg.u.channel.tracefile_count, + msg.u.channel.tracefile_count, 0, msg.u.channel.monitor); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; } new_channel->nb_init_stream_left = msg.u.channel.nb_init_streams; + switch (msg.u.channel.output) { + case LTTNG_EVENT_SPLICE: + new_channel->output = CONSUMER_CHANNEL_SPLICE; + break; + case LTTNG_EVENT_MMAP: + new_channel->output = CONSUMER_CHANNEL_MMAP; + break; + default: + ERR("Channel output unknown %d", msg.u.channel.output); + goto end_nosignal; + } /* Translate and save channel type. */ switch (msg.u.channel.type) { @@ -653,9 +675,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Do not monitor this stream. */ if (!channel->monitor) { - DBG("Kernel consumer add stream %s in no monitor mode with" + DBG("Kernel consumer add stream %s in no monitor mode with " "relayd id %" PRIu64, new_stream->name, - new_stream->relayd_stream_id); + new_stream->net_seq_idx); cds_list_add(&new_stream->no_monitor_node, &channel->stream_no_monitor_list.head); break;