static void free_stream_rcu(struct rcu_head *head)
{
struct lttng_ht_node_u64 *node =
- caa_container_of(head, struct lttng_ht_node_u64, head);
+ lttng::utils::container_of(head, <tng_ht_node_u64::head);
struct lttng_consumer_stream *stream =
- caa_container_of(node, struct lttng_consumer_stream, node);
+ lttng::utils::container_of(node, <tng_consumer_stream::node);
pthread_mutex_destroy(&stream->lock);
free(stream);
int ret;
struct lttng_consumer_stream *stream;
- stream = (lttng_consumer_stream *) zmalloc(sizeof(*stream));
+ stream = zmalloc<lttng_consumer_stream>();
if (stream == NULL) {
PERROR("malloc struct lttng_consumer_stream");
ret = -ENOMEM;
* The consumer data lock MUST be acquired.
* The stream lock MUST be acquired.
*/
-void consumer_stream_close(struct lttng_consumer_stream *stream)
+void consumer_stream_close_output(struct lttng_consumer_stream *stream)
{
- int ret;
struct consumer_relayd_sock_pair *relayd;
LTTNG_ASSERT(stream);
- switch (the_consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- if (stream->mmap_base != NULL) {
- ret = munmap(stream->mmap_base, stream->mmap_len);
- if (ret != 0) {
- PERROR("munmap");
- }
- }
-
- if (stream->wait_fd >= 0) {
- ret = close(stream->wait_fd);
- if (ret) {
- PERROR("close");
- }
- stream->wait_fd = -1;
- }
- if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
- utils_close_pipe(stream->splice_pipe);
- }
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- {
- /*
- * Special case for the metadata since the wait fd is an internal pipe
- * polled in the metadata thread.
- */
- if (stream->metadata_flag && stream->chan->monitor) {
- int rpipe = stream->ust_metadata_poll_pipe[0];
-
- /*
- * This will stop the channel timer if one and close the write side
- * of the metadata poll pipe.
- */
- lttng_ustconsumer_close_metadata(stream->chan);
- if (rpipe >= 0) {
- ret = close(rpipe);
- if (ret < 0) {
- PERROR("closing metadata pipe read side");
- }
- stream->ust_metadata_poll_pipe[0] = -1;
- }
- }
- break;
- }
- default:
- ERR("Unknown consumer_data type");
- abort();
- }
-
/* Close output fd. Could be a socket or local file at this point. */
if (stream->out_fd >= 0) {
- ret = close(stream->out_fd);
+ const auto ret = close(stream->out_fd);
if (ret) {
- PERROR("close");
+ PERROR("Failed to close stream output file descriptor");
}
+
stream->out_fd = -1;
}
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
consumer_stream_relayd_close(stream, relayd);
+ stream->net_seq_idx = -1ULL;
}
+
rcu_read_unlock();
}
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
+ if (stream->mmap_base != NULL) {
+ const auto ret = munmap(stream->mmap_base, stream->mmap_len);
+
+ if (ret != 0) {
+ PERROR("munmap");
+ }
+ }
+
+ if (stream->wait_fd >= 0) {
+ const auto ret = close(stream->wait_fd);
+
+ if (ret) {
+ PERROR("close");
+ }
+
+ stream->wait_fd = -1;
+ }
+
+ if (stream->chan->output == CONSUMER_CHANNEL_SPLICE) {
+ utils_close_pipe(stream->splice_pipe);
+ }
+
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
+ /*
+ * Special case for the metadata since the wait fd is an internal pipe
+ * polled in the metadata thread.
+ */
+ if (stream->metadata_flag && stream->chan->monitor) {
+ const auto rpipe = stream->ust_metadata_poll_pipe[0];
+
+ /*
+ * This will stop the channel timer if one and close the write side
+ * of the metadata poll pipe.
+ */
+ lttng_ustconsumer_close_metadata(stream->chan);
+ if (rpipe >= 0) {
+ const auto ret = close(rpipe);
+
+ if (ret < 0) {
+ PERROR("closing metadata pipe read side");
+ }
+
+ stream->ust_metadata_poll_pipe[0] = -1;
+ }
+ }
+
lttng_ustconsumer_del_stream(stream);
break;
default:
/* Destroy tracer buffers of the stream. */
consumer_stream_destroy_buffers(stream);
/* Close down everything including the relayd if one. */
- consumer_stream_close(stream);
+ consumer_stream_close_output(stream);
}
/*
if (stream->globally_visible) {
pthread_mutex_lock(&the_consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
+
pthread_mutex_lock(&stream->lock);
/* Remove every reference of the stream in the consumer. */
consumer_stream_delete(stream, ht);
+
destroy_close_stream(stream);
/* Update channel's refcount of the stream. */