X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=0005b1039cb27d6ced5dfac3537e72bbf737fce0;hp=6830084e8c5a959f7407f80e0a35061089a738a2;hb=10a5031171c7bca5b4498c871b119e5a88b6a3fb;hpb=da009f2c0baef56316db8087abd1867f99969374 diff --git a/src/common/consumer.c b/src/common/consumer.c index 6830084e8..0005b1039 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -301,8 +301,11 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, send_node) { cds_list_del(&stream->send_node); - lttng_ustconsumer_del_stream(stream); - free(stream); + /* + * Once a stream is added to this list, the buffers were created so + * we have a guarantee that this call will succeed. + */ + consumer_stream_destroy(stream, NULL); } lttng_ustconsumer_del_channel(channel); break; @@ -312,25 +315,6 @@ void consumer_del_channel(struct lttng_consumer_channel *channel) goto end; } - /* Empty no monitor streams list. */ - if (!channel->monitor) { - struct lttng_consumer_stream *stream, *stmp; - - /* - * So, these streams are not visible to any data thread. This is why we - * close and free them because they were never added to any data - * structure apart from this one. - */ - cds_list_for_each_entry_safe(stream, stmp, - &channel->stream_no_monitor_list.head, no_monitor_node) { - cds_list_del(&stream->no_monitor_node); - /* Close everything in that stream. */ - consumer_stream_close(stream); - /* Free the ressource. */ - consumer_stream_free(stream); - } - } - rcu_read_lock(); iter.iter.node = &channel->node.node; ret = lttng_ht_del(consumer_data.channel_ht, &iter); @@ -693,6 +677,66 @@ error: return relayd; } +/* + * Find a relayd and send the stream + * + * Returns 0 on success, < 0 on error + */ +int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, + char *path) +{ + int ret = 0; + struct consumer_relayd_sock_pair *relayd; + + assert(stream); + assert(stream->net_seq_idx != -1ULL); + assert(path); + + /* The stream is not metadata. Get relayd reference if exists. */ + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd != NULL) { + /* Add stream on the relayd */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_add_stream(&relayd->control_sock, stream->name, + path, &stream->relayd_stream_id, + stream->chan->tracefile_size, stream->chan->tracefile_count); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + uatomic_inc(&relayd->refcount); + } else { + ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.", + stream->key, stream->net_seq_idx); + ret = -1; + goto end; + } + + DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64, + stream->name, stream->key, stream->net_seq_idx); + +end: + rcu_read_unlock(); + return ret; +} + +/* + * Find a relayd and close the stream + */ +void close_relayd_stream(struct lttng_consumer_stream *stream) +{ + struct consumer_relayd_sock_pair *relayd; + + /* The stream is not metadata. Get relayd reference if exists. */ + rcu_read_lock(); + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd) { + consumer_stream_relayd_close(stream, relayd); + } + rcu_read_unlock(); +} + /* * Handle stream for relayd transmission if the stream applies for network * streaming where the net sequence index is set. @@ -816,7 +860,6 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->wait_fd = -1; CDS_INIT_LIST_HEAD(&channel->streams.head); - CDS_INIT_LIST_HEAD(&channel->stream_no_monitor_list.head); DBG("Allocated channel (key %" PRIu64 ")", channel->key) @@ -856,7 +899,7 @@ end: pthread_mutex_unlock(&consumer_data.lock); if (!ret && channel->wait_fd != -1 && - channel->metadata_stream == NULL) { + channel->type == CONSUMER_CHANNEL_TYPE_DATA) { notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD); } return ret; @@ -2744,7 +2787,6 @@ restart: lttng_poll_del(&events, chan->wait_fd); ret = lttng_ht_del(channel_ht, &iter); assert(ret == 0); - assert(cds_list_empty(&chan->streams.head)); consumer_close_channel_streams(chan); /* Release our own refcount */ @@ -3017,8 +3059,9 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, /* Not found. Allocate one. */ relayd = consumer_allocate_relayd_sock_pair(net_seq_idx); if (relayd == NULL) { - ret_code = LTTCOMM_CONSUMERD_ENOMEM; ret = -ENOMEM; + ret_code = LTTCOMM_CONSUMERD_ENOMEM; + goto error; } else { relayd->sessiond_session_id = (uint64_t) sessiond_id; relayd_created = 1; @@ -3037,23 +3080,23 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, } /* First send a status message before receiving the fds. */ - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0 || ret_code != LTTNG_OK) { + ret = consumer_send_status_msg(sock, LTTNG_OK); + if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto error; + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL); + goto error_nosignal; } /* Poll on consumer socket. */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); ret = -EINTR; - goto error; + goto error_nosignal; } /* Get relayd socket from session daemon */ ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { - ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD; ret = -1; fd = -1; /* Just in case it gets set with an invalid value. */ @@ -3067,18 +3110,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, * issue when reaching the fd limit. */ lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); - - /* - * This code path MUST continue to the consumer send status message so - * we can send the error to the thread expecting a reply. The above - * call will make everything stop. - */ - } - - /* We have the fds without error. Send status back. */ - ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0 || ret_code != LTTNG_OK) { - /* Somehow, the session daemon is not responding anymore. */ + ret_code = LTTCOMM_CONSUMERD_ERROR_RECV_FD; goto error; } @@ -3090,6 +3122,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, ret = lttcomm_create_sock(&relayd->control_sock.sock); /* Handle create_sock error. */ if (ret < 0) { + ret_code = LTTCOMM_CONSUMERD_ENOMEM; goto error; } /* @@ -3128,6 +3161,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, */ (void) relayd_close(&relayd->control_sock); (void) relayd_close(&relayd->data_sock); + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; goto error; } @@ -3138,6 +3172,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, ret = lttcomm_create_sock(&relayd->data_sock.sock); /* Handle create_sock error. */ if (ret < 0) { + ret_code = LTTCOMM_CONSUMERD_ENOMEM; goto error; } /* @@ -3159,6 +3194,7 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, default: ERR("Unknown relayd socket type (%d)", sock_type); ret = -1; + ret_code = LTTCOMM_CONSUMERD_FATAL; goto error; } @@ -3166,6 +3202,14 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, sock_type == LTTNG_STREAM_CONTROL ? "control" : "data", relayd->net_seq_idx, fd); + /* We successfully added the socket. Send status back. */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL); + goto error_nosignal; + } + /* * Add relayd socket pair to consumer data hashtable. If object already * exists or on error, the function gracefully returns. @@ -3176,6 +3220,11 @@ int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, return 0; error: + if (consumer_send_status_msg(sock, ret_code) < 0) { + lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL); + } + +error_nosignal: /* Close received socket if valid. */ if (fd >= 0) { if (close(fd)) {