struct lttng_consumer_stream *stream = NULL;
/* Negative keys are lookup failures */
- if (key < 0)
+ if (key < 0) {
return NULL;
+ }
rcu_read_lock();
struct lttng_consumer_channel *channel = NULL;
/* Negative keys are lookup failures */
- if (key < 0)
+ if (key < 0) {
return NULL;
+ }
rcu_read_lock();
if (stream->mmap_base != NULL) {
ret = munmap(stream->mmap_base, stream->mmap_len);
if (ret != 0) {
- perror("munmap");
+ PERROR("munmap");
}
}
break;
}
rcu_read_unlock();
- if (!--stream->chan->refcount) {
+ uatomic_dec(&stream->chan->refcount);
+ if (!uatomic_read(&stream->chan->refcount)
+ && !uatomic_read(&stream->chan->nb_init_streams)) {
free_chan = stream->chan;
}
-
call_rcu(&stream->node.head, consumer_free_stream);
end:
consumer_data.need_update = 1;
pthread_mutex_unlock(&consumer_data.lock);
- if (free_chan)
+ if (free_chan) {
consumer_del_channel(free_chan);
+ }
}
struct lttng_consumer_stream *consumer_allocate_stream(
uid_t uid,
gid_t gid,
int net_index,
- int metadata_flag)
+ int metadata_flag,
+ int *alloc_ret)
{
struct lttng_consumer_stream *stream;
int ret;
stream = zmalloc(sizeof(*stream));
if (stream == NULL) {
- perror("malloc struct lttng_consumer_stream");
+ PERROR("malloc struct lttng_consumer_stream");
+ *alloc_ret = -ENOMEM;
goto end;
}
+
+ /*
+ * Get stream's channel reference. Needed when adding the stream to the
+ * global hash table.
+ */
stream->chan = consumer_find_channel(channel_key);
if (!stream->chan) {
- perror("Unable to find channel key");
- goto end;
+ *alloc_ret = -ENOENT;
+ ERR("Unable to find channel for stream %d", stream_key);
+ goto error;
}
stream->chan->refcount++;
stream->key = stream_key;
stream->cpu = stream->chan->cpucount++;
ret = lttng_ustconsumer_allocate_stream(stream);
if (ret) {
- free(stream);
- return NULL;
+ *alloc_ret = -EINVAL;
+ goto error;
}
break;
default:
ERR("Unknown consumer_data type");
- assert(0);
- goto end;
+ *alloc_ret = -EINVAL;
+ goto error;
}
- DBG("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, out_fd %d, net_seq_idx %d)",
- stream->path_name, stream->key,
- stream->shm_fd,
- stream->wait_fd,
- (unsigned long long) stream->mmap_len,
- stream->out_fd,
+
+ /*
+ * When nb_init_streams reaches 0, we don't need to trigger any action in
+ * terms of destroying the associated channel, because the action that
+ * causes the count to become 0 also causes a stream to be added. The
+ * channel deletion will thus be triggered by the following removal of this
+ * stream.
+ */
+ if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
+ uatomic_dec(&stream->chan->nb_init_streams);
+ }
+
+ DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
+ " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
+ stream->shm_fd, stream->wait_fd,
+ (unsigned long long) stream->mmap_len, stream->out_fd,
stream->net_seq_idx);
-end:
return stream;
+
+error:
+ free(stream);
+end:
+ return NULL;
}
/*
if (channel->mmap_base != NULL) {
ret = munmap(channel->mmap_base, channel->mmap_len);
if (ret != 0) {
- perror("munmap");
+ PERROR("munmap");
}
}
if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
int channel_key,
int shm_fd, int wait_fd,
uint64_t mmap_len,
- uint64_t max_sb_size)
+ uint64_t max_sb_size,
+ unsigned int nb_init_streams)
{
struct lttng_consumer_channel *channel;
int ret;
channel = zmalloc(sizeof(*channel));
if (channel == NULL) {
- perror("malloc struct lttng_consumer_channel");
+ PERROR("malloc struct lttng_consumer_channel");
goto end;
}
channel->key = channel_key;
channel->mmap_len = mmap_len;
channel->max_sb_size = max_sb_size;
channel->refcount = 0;
+ channel->nb_init_streams = nb_init_streams;
lttng_ht_node_init_ulong(&channel->node, channel->key);
switch (consumer_data.type) {
if (errno == EINTR) {
goto restart;
}
- perror("Poll error");
+ PERROR("Poll error");
goto exit;
}
if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
ret = write(ctx->consumer_should_quit[1], "4", 1);
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
- perror("write consumer quit");
+ PERROR("write consumer quit");
}
}
ctx = zmalloc(sizeof(struct lttng_consumer_local_data));
if (ctx == NULL) {
- perror("allocating context");
+ PERROR("allocating context");
goto error;
}
ret = pipe(ctx->consumer_poll_pipe);
if (ret < 0) {
- perror("Error creating poll pipe");
+ PERROR("Error creating poll pipe");
goto error_poll_pipe;
}
/* set read end of the pipe to non-blocking */
ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
if (ret < 0) {
- perror("fcntl O_NONBLOCK");
+ PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
}
/* set write end of the pipe to non-blocking */
ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
if (ret < 0) {
- perror("fcntl O_NONBLOCK");
+ PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
}
ret = pipe(ctx->consumer_should_quit);
if (ret < 0) {
- perror("Error creating recv pipe");
+ PERROR("Error creating recv pipe");
goto error_quit_pipe;
}
ret = pipe(ctx->consumer_thread_pipe);
if (ret < 0) {
- perror("Error creating thread pipe");
+ PERROR("Error creating thread pipe");
goto error_thread_pipe;
}
static void consumer_del_metadata_stream(struct lttng_consumer_stream *stream)
{
int ret;
- struct lttng_consumer_channel *free_chan = NULL;
struct consumer_relayd_sock_pair *relayd;
assert(stream);
/* Atomically decrement channel refcount since other threads can use it. */
uatomic_dec(&stream->chan->refcount);
- if (!uatomic_read(&stream->chan->refcount)) {
- free_chan = stream->chan;
- }
-
- if (free_chan) {
- consumer_del_channel(free_chan);
+ if (!uatomic_read(&stream->chan->refcount)
+ && !uatomic_read(&stream->chan->nb_init_streams)) {
+ /* Go for channel deletion! */
+ consumer_del_channel(stream->chan);
}
free(stream);
close(ctx->consumer_metadata_pipe[0]);
continue;
} else if (revents & LPOLLIN) {
- stream = zmalloc(sizeof(struct lttng_consumer_stream));
- if (stream == NULL) {
- PERROR("zmalloc metadata consumer stream");
- goto error;
- }
-
do {
- /* Get the stream and add it to the local hash table */
- ret = read(pollfd, stream,
- sizeof(struct lttng_consumer_stream));
+ /* Get the stream pointer received */
+ ret = read(pollfd, &stream, sizeof(stream));
} while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret < sizeof(struct lttng_consumer_stream)) {
+ if (ret < 0 ||
+ ret < sizeof(struct lttng_consumer_stream *)) {
PERROR("read metadata stream");
- free(stream);
/*
* Let's continue here and hope we can still work
* without stopping the consumer. XXX: Should we?
/* allocate for all fds + 1 for the consumer_poll_pipe */
pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
- perror("pollfd malloc");
+ PERROR("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
local_stream = zmalloc((consumer_data.stream_count + 1) *
sizeof(struct lttng_consumer_stream));
if (local_stream == NULL) {
- perror("local_stream malloc");
+ PERROR("local_stream malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
if (errno == EINTR) {
goto restart;
}
- perror("Poll error");
+ PERROR("Poll error");
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
goto end;
} else if (num_rdy == 0) {
ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
if (ret < 0) {
- perror("fcntl O_NONBLOCK");
+ PERROR("fcntl O_NONBLOCK");
goto end;
}
}
ret = fcntl(sock, F_SETFL, O_NONBLOCK);
if (ret < 0) {
- perror("fcntl O_NONBLOCK");
+ PERROR("fcntl O_NONBLOCK");
goto end;
}