return (int) ret;
}
+/*
+ * Cleanup the stream list of a channel. Those streams are not yet globally
+ * visible
+ */
+static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
+{
+ struct lttng_consumer_stream *stream, *stmp;
+
+ assert(channel);
+
+ /* Delete streams that might have been left in the stream list. */
+ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+ send_node) {
+ cds_list_del(&stream->send_node);
+ /*
+ * Once a stream is added to this list, the buffers were created so we
+ * have a guarantee that this call will succeed. Setting the monitor
+ * mode to 0 so we don't lock nor try to delete the stream from the
+ * global hash table.
+ */
+ stream->monitor = 0;
+ consumer_stream_destroy(stream, NULL);
+ }
+}
+
/*
* Find a stream. The consumer_data.lock must be locked during this
* call.
{
int ret;
struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream, *stmp;
DBG("Consumer delete channel key %" PRIu64, channel->key);
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&channel->lock);
- /* Delete streams that might have been left in the stream list. */
- cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
- send_node) {
- cds_list_del(&stream->send_node);
- /*
- * Once a stream is added to this list, the buffers were created so
- * we have a guarantee that this call will succeed.
- */
- consumer_stream_destroy(stream, NULL);
- }
+ /* Destroy streams that might have been left in the stream list. */
+ clean_channel_stream_list(channel);
if (channel->live_timer_enabled == 1) {
consumer_timer_live_stop(channel);
{
unsigned long mmap_offset;
void *mmap_base;
- ssize_t ret = 0, written = 0;
+ ssize_t ret = 0;
off_t orig_offset = stream->out_fd_offset;
/* Default is on the disk */
int outfd = stream->out_fd;
case LTTNG_CONSUMER_KERNEL:
mmap_base = stream->mmap_base;
ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
- if (ret != 0) {
+ if (ret < 0) {
+ ret = -errno;
PERROR("tracer ctl get_mmap_read_offset");
- written = -errno;
goto end;
}
break;
mmap_base = lttng_ustctl_get_mmap_base(stream);
if (!mmap_base) {
ERR("read mmap get mmap base for stream %s", stream->name);
- written = -EPERM;
+ ret = -EPERM;
goto end;
}
ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
if (ret != 0) {
PERROR("tracer ctl get_mmap_read_offset");
- written = ret;
+ ret = -EINVAL;
goto end;
}
break;
}
ret = write_relayd_stream_header(stream, netlen, padding, relayd);
- if (ret >= 0) {
- /* Use the returned socket. */
- outfd = ret;
+ if (ret < 0) {
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ /* Use the returned socket. */
+ outfd = ret;
- /* Write metadata stream id before payload */
- if (stream->metadata_flag) {
- ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
- if (ret < 0) {
- written = ret;
- /* Socket operation failed. We consider the relayd dead */
- if (ret == -EPIPE || ret == -EINVAL) {
- relayd_hang_up = 1;
- goto write_error;
- }
- goto end;
- }
- }
- } else {
- /* Socket operation failed. We consider the relayd dead */
- if (ret == -EPIPE || ret == -EINVAL) {
+ /* Write metadata stream id before payload */
+ if (stream->metadata_flag) {
+ ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
+ if (ret < 0) {
relayd_hang_up = 1;
goto write_error;
}
- /* Else, use the default set before which is the filesystem. */
}
} else {
/* No streaming, we have to set the len with the full padding */
* amount written.
*/
if (ret < 0) {
- written = -errno;
- } else {
- written = ret;
+ ret = -errno;
}
+ relayd_hang_up = 1;
/* Socket operation failed. We consider the relayd dead */
- if (errno == EPIPE || errno == EINVAL) {
+ if (errno == EPIPE || errno == EINVAL || errno == EBADF) {
/*
* This is possible if the fd is closed on the other side
* (outfd) or any write problem. It can be verbose a bit for a
* abruptly. This can happen so set this to a DBG statement.
*/
DBG("Consumer mmap write detected relayd hang up");
- relayd_hang_up = 1;
- goto write_error;
+ } else {
+ /* Unhandled error, print it and stop function right now. */
+ PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
}
-
- /* Unhandled error, print it and stop function right now. */
- PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
- goto end;
+ goto write_error;
}
stream->output_written += ret;
- written = ret;
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
}
rcu_read_unlock();
- return written;
+ return ret;
}
/*
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- /*
- * Note: a mutex is taken internally within
- * liblttng-ust-ctl to protect timer wakeup_fd
- * use from concurrent close.
- */
- lttng_ustconsumer_close_stream_wakeup(stream);
+ if (stream->metadata_flag) {
+ /* Safe and protected by the stream lock. */
+ lttng_ustconsumer_close_metadata(stream->chan);
+ } else {
+ /*
+ * Note: a mutex is taken internally within
+ * liblttng-ust-ctl to protect timer wakeup_fd
+ * use from concurrent close.
+ */
+ lttng_ustconsumer_close_stream_wakeup(stream);
+ }
break;
default:
ERR("Unknown consumer_data type");
break;
case CONSUMER_CHANNEL_DEL:
{
- struct lttng_consumer_stream *stream, *stmp;
+ /*
+ * This command should never be called if the channel
+ * has streams monitored by either the data or metadata
+ * thread. The consumer only notify this thread with a
+ * channel del. command if it receives a destroy
+ * channel command from the session daemon that send it
+ * if a command prior to the GET_CHANNEL failed.
+ */
rcu_read_lock();
chan = consumer_find_channel(key);
iter.iter.node = &chan->wait_fd_node.node;
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
- consumer_close_channel_streams(chan);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- /* Delete streams that might have been left in the stream list. */
- cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
- send_node) {
- health_code_update();
-
- cds_list_del(&stream->send_node);
- lttng_ustconsumer_del_stream(stream);
- uatomic_sub(&stream->chan->refcount, 1);
- assert(&chan->refcount);
- free(stream);
- }
+ health_code_update();
+ /* Destroy streams that might have been left in the stream list. */
+ clean_channel_stream_list(chan);
break;
default:
ERR("Unknown consumer_data type");
lttng_poll_del(&events, chan->wait_fd);
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
+
+ /*
+ * This will close the wait fd for each stream associated to
+ * this channel AND monitored by the data/metadata thread thus
+ * will be clean by the right thread.
+ */
consumer_close_channel_streams(chan);
/* Release our own refcount */