#include "consumer.h"
#include "consumer-stream.h"
+#include "consumer-testpoint.h"
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
return (int) ret;
}
+/*
+ * Cleanup the stream list of a channel. Those streams are not yet globally
+ * visible
+ */
+static void clean_channel_stream_list(struct lttng_consumer_channel *channel)
+{
+ struct lttng_consumer_stream *stream, *stmp;
+
+ assert(channel);
+
+ /* Delete streams that might have been left in the stream list. */
+ cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
+ send_node) {
+ cds_list_del(&stream->send_node);
+ /*
+ * Once a stream is added to this list, the buffers were created so we
+ * have a guarantee that this call will succeed. Setting the monitor
+ * mode to 0 so we don't lock nor try to delete the stream from the
+ * global hash table.
+ */
+ stream->monitor = 0;
+ consumer_stream_destroy(stream, NULL);
+ }
+}
+
/*
* Find a stream. The consumer_data.lock must be locked during this
* call.
return channel;
}
-static void free_stream_rcu(struct rcu_head *head)
+/*
+ * There is a possibility that the consumer does not have enough time between
+ * the close of the channel on the session daemon and the cleanup in here thus
+ * once we have a channel add with an existing key, we know for sure that this
+ * channel will eventually get cleaned up by all streams being closed.
+ *
+ * This function just nullifies the already existing channel key.
+ */
+static void steal_channel_key(uint64_t key)
{
- struct lttng_ht_node_u64 *node =
- caa_container_of(head, struct lttng_ht_node_u64, head);
- struct lttng_consumer_stream *stream =
- caa_container_of(node, struct lttng_consumer_stream, node);
+ struct lttng_consumer_channel *channel;
- free(stream);
+ rcu_read_lock();
+ channel = consumer_find_channel(key);
+ if (channel) {
+ channel->key = (uint64_t) -1ULL;
+ /*
+ * We don't want the lookup to match, but we still need to iterate on
+ * this channel when iterating over the hash table. Just change the
+ * node key.
+ */
+ channel->node.key = (uint64_t) -1ULL;
+ }
+ rcu_read_unlock();
}
static void free_channel_rcu(struct rcu_head *head)
{
int ret;
struct lttng_ht_iter iter;
- struct lttng_consumer_stream *stream, *stmp;
DBG("Consumer delete channel key %" PRIu64, channel->key);
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&channel->lock);
- /* Delete streams that might have been left in the stream list. */
- cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
- send_node) {
- cds_list_del(&stream->send_node);
- /*
- * Once a stream is added to this list, the buffers were created so
- * we have a guarantee that this call will succeed.
- */
- consumer_stream_destroy(stream, NULL);
- }
+ /* Destroy streams that might have been left in the stream list. */
+ clean_channel_stream_list(channel);
if (channel->live_timer_enabled == 1) {
consumer_timer_live_stop(channel);
/*
* Add a channel to the global list protected by a mutex.
*
- * On success 0 is returned else a negative value.
+ * Always return 0 indicating success.
*/
int consumer_add_channel(struct lttng_consumer_channel *channel,
struct lttng_consumer_local_data *ctx)
{
- int ret = 0;
- struct lttng_ht_node_u64 *node;
- struct lttng_ht_iter iter;
-
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&channel->lock);
pthread_mutex_lock(&channel->timer_lock);
- rcu_read_lock();
- lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
- node = lttng_ht_iter_get_node_u64(&iter);
- if (node != NULL) {
- /* Channel already exist. Ignore the insertion */
- ERR("Consumer add channel key %" PRIu64 " already exists!",
- channel->key);
- ret = -EEXIST;
- goto end;
- }
+ /*
+ * This gives us a guarantee that the channel we are about to add to the
+ * channel hash table will be unique. See this function comment on the why
+ * we need to steel the channel key at this stage.
+ */
+ steal_channel_key(channel->key);
+ rcu_read_lock();
lttng_ht_add_unique_u64(consumer_data.channel_ht, &channel->node);
-
-end:
rcu_read_unlock();
+
pthread_mutex_unlock(&channel->timer_lock);
pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
- if (!ret && channel->wait_fd != -1 &&
- channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
+ if (channel->wait_fd != -1 && channel->type == CONSUMER_CHANNEL_TYPE_DATA) {
notify_channel_pipe(ctx, channel, -1, CONSUMER_CHANNEL_ADD);
}
- return ret;
+
+ return 0;
}
/*
}
/*
- * Poll on the should_quit pipe and the command socket return -1 on error and
- * should exit, 0 if data is available on the command socket
+ * Poll on the should_quit pipe and the command socket return -1 on
+ * error, 1 if should exit, 0 if data is available on the command socket
*/
int lttng_consumer_poll_socket(struct pollfd *consumer_sockpoll)
{
goto restart;
}
PERROR("Poll error");
- goto exit;
+ return -1;
}
if (consumer_sockpoll[0].revents & (POLLIN | POLLPRI)) {
DBG("consumer_should_quit wake up");
- goto exit;
+ return 1;
}
return 0;
-
-exit:
- return -1;
}
/*
{
unsigned long mmap_offset;
void *mmap_base;
- ssize_t ret = 0, written = 0;
+ ssize_t ret = 0;
off_t orig_offset = stream->out_fd_offset;
/* Default is on the disk */
int outfd = stream->out_fd;
case LTTNG_CONSUMER_KERNEL:
mmap_base = stream->mmap_base;
ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
- if (ret != 0) {
+ if (ret < 0) {
+ ret = -errno;
PERROR("tracer ctl get_mmap_read_offset");
- written = -errno;
goto end;
}
break;
mmap_base = lttng_ustctl_get_mmap_base(stream);
if (!mmap_base) {
ERR("read mmap get mmap base for stream %s", stream->name);
- written = -EPERM;
+ ret = -EPERM;
goto end;
}
ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
if (ret != 0) {
PERROR("tracer ctl get_mmap_read_offset");
- written = ret;
+ ret = -EINVAL;
goto end;
}
break;
}
ret = write_relayd_stream_header(stream, netlen, padding, relayd);
- if (ret >= 0) {
- /* Use the returned socket. */
- outfd = ret;
+ if (ret < 0) {
+ relayd_hang_up = 1;
+ goto write_error;
+ }
+ /* Use the returned socket. */
+ outfd = ret;
- /* Write metadata stream id before payload */
- if (stream->metadata_flag) {
- ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
- if (ret < 0) {
- written = ret;
- /* Socket operation failed. We consider the relayd dead */
- if (ret == -EPIPE || ret == -EINVAL) {
- relayd_hang_up = 1;
- goto write_error;
- }
- goto end;
- }
- }
- } else {
- /* Socket operation failed. We consider the relayd dead */
- if (ret == -EPIPE || ret == -EINVAL) {
+ /* Write metadata stream id before payload */
+ if (stream->metadata_flag) {
+ ret = write_relayd_metadata_id(outfd, stream, relayd, padding);
+ if (ret < 0) {
relayd_hang_up = 1;
goto write_error;
}
- /* Else, use the default set before which is the filesystem. */
}
} else {
/* No streaming, we have to set the len with the full padding */
}
}
- while (len > 0) {
- ret = lttng_write(outfd, mmap_base + mmap_offset, len);
- DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
- if (ret < len) {
+ /*
+ * This call guarantee that len or less is returned. It's impossible to
+ * receive a ret value that is bigger than len.
+ */
+ ret = lttng_write(outfd, mmap_base + mmap_offset, len);
+ DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
+ if (ret < 0 || ((size_t) ret != len)) {
+ /*
+ * Report error to caller if nothing was written else at least send the
+ * amount written.
+ */
+ if (ret < 0) {
+ ret = -errno;
+ }
+ relayd_hang_up = 1;
+
+ /* Socket operation failed. We consider the relayd dead */
+ if (errno == EPIPE || errno == EINVAL || errno == EBADF) {
/*
- * This is possible if the fd is closed on the other side (outfd)
- * or any write problem. It can be verbose a bit for a normal
- * execution if for instance the relayd is stopped abruptly. This
- * can happen so set this to a DBG statement.
+ * This is possible if the fd is closed on the other side
+ * (outfd) or any write problem. It can be verbose a bit for a
+ * normal execution if for instance the relayd is stopped
+ * abruptly. This can happen so set this to a DBG statement.
*/
- DBG("Error in file write mmap");
- if (written == 0) {
- written = -errno;
- }
- /* Socket operation failed. We consider the relayd dead */
- if (errno == EPIPE || errno == EINVAL) {
- relayd_hang_up = 1;
- goto write_error;
- }
- goto end;
- } else if (ret > len) {
- PERROR("Error in file write (ret %zd > len %lu)", ret, len);
- written += ret;
- goto end;
+ DBG("Consumer mmap write detected relayd hang up");
} else {
- len -= ret;
- mmap_offset += ret;
+ /* Unhandled error, print it and stop function right now. */
+ PERROR("Error in write mmap (ret %zd != len %lu)", ret, len);
}
+ goto write_error;
+ }
+ stream->output_written += ret;
- /* This call is useless on a socket so better save a syscall. */
- if (!relayd) {
- /* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(outfd, stream->out_fd_offset, ret,
- SYNC_FILE_RANGE_WRITE);
- stream->out_fd_offset += ret;
- }
- stream->output_written += ret;
- written += ret;
+ /* This call is useless on a socket so better save a syscall. */
+ if (!relayd) {
+ /* This won't block, but will start writeout asynchronously */
+ lttng_sync_file_range(outfd, stream->out_fd_offset, len,
+ SYNC_FILE_RANGE_WRITE);
+ stream->out_fd_offset += len;
}
lttng_consumer_sync_trace_file(stream, orig_offset);
}
rcu_read_unlock();
- return written;
+ return ret;
}
/*
if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
- ret = -EPIPE;
+ written = -ret;
goto end;
}
}
/* Write metadata stream id before payload */
if (relayd) {
- int total_len = len;
+ unsigned long total_len = len;
if (stream->metadata_flag) {
/*
padding);
if (ret < 0) {
written = ret;
- /* Socket operation failed. We consider the relayd dead */
- if (ret == -EBADF) {
- WARN("Remote relayd disconnected. Stopping");
- relayd_hang_up = 1;
- goto write_error;
- }
- goto end;
+ relayd_hang_up = 1;
+ goto write_error;
}
total_len += sizeof(struct lttcomm_relayd_metadata_payload);
}
ret = write_relayd_stream_header(stream, total_len, padding, relayd);
- if (ret >= 0) {
- /* Use the returned socket. */
- outfd = ret;
- } else {
- /* Socket operation failed. We consider the relayd dead */
- if (ret == -EBADF) {
- WARN("Remote relayd disconnected. Stopping");
- relayd_hang_up = 1;
- goto write_error;
- }
- goto end;
+ if (ret < 0) {
+ written = ret;
+ relayd_hang_up = 1;
+ goto write_error;
}
+ /* Use the returned socket. */
+ outfd = ret;
} else {
/* No streaming, we have to set the len with the full padding */
len += padding;
stream->out_fd, &(stream->tracefile_count_current),
&stream->out_fd);
if (ret < 0) {
+ written = ret;
ERR("Rotating output file");
goto end;
}
stream->chan->tracefile_size,
stream->tracefile_count_current);
if (ret < 0) {
+ written = ret;
goto end;
}
stream->index_fd = ret;
SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("splice chan to pipe, ret %zd", ret_splice);
if (ret_splice < 0) {
- PERROR("Error in relay splice");
- if (written == 0) {
- written = ret_splice;
- }
ret = errno;
+ written = -ret;
+ PERROR("Error in relay splice");
goto splice_error;
}
/* Handle stream on the relayd if the output is on the network */
- if (relayd) {
- if (stream->metadata_flag) {
- size_t metadata_payload_size =
- sizeof(struct lttcomm_relayd_metadata_payload);
+ if (relayd && stream->metadata_flag) {
+ size_t metadata_payload_size =
+ sizeof(struct lttcomm_relayd_metadata_payload);
- /* Update counter to fit the spliced data */
- ret_splice += metadata_payload_size;
- len += metadata_payload_size;
- /*
- * We do this so the return value can match the len passed as
- * argument to this function.
- */
- written -= metadata_payload_size;
- }
+ /* Update counter to fit the spliced data */
+ ret_splice += metadata_payload_size;
+ len += metadata_payload_size;
+ /*
+ * We do this so the return value can match the len passed as
+ * argument to this function.
+ */
+ written -= metadata_payload_size;
}
/* Splice data out */
ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("Consumer splice pipe to file, ret %zd", ret_splice);
if (ret_splice < 0) {
- PERROR("Error in file splice");
- if (written == 0) {
- written = ret_splice;
- }
- /* Socket operation failed. We consider the relayd dead */
- if (errno == EBADF || errno == EPIPE) {
- WARN("Remote relayd disconnected. Stopping");
- relayd_hang_up = 1;
- goto write_error;
- }
ret = errno;
- goto splice_error;
+ written = -ret;
+ relayd_hang_up = 1;
+ goto write_error;
} else if (ret_splice > len) {
- errno = EINVAL;
- PERROR("Wrote more data than requested %zd (len: %lu)",
- ret_splice, len);
- written += ret_splice;
+ /*
+ * We don't expect this code path to be executed but you never know
+ * so this is an extra protection agains a buggy splice().
+ */
ret = errno;
+ written += ret_splice;
+ PERROR("Wrote more data than requested %zd (len: %lu)", ret_splice,
+ len);
goto splice_error;
+ } else {
+ /* All good, update current len and continue. */
+ len -= ret_splice;
}
- len -= ret_splice;
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
written += ret_splice;
}
lttng_consumer_sync_trace_file(stream, orig_offset);
-
- ret = ret_splice;
-
goto end;
write_error:
}
}
-void lttng_consumer_close_metadata(void)
+void lttng_consumer_close_all_metadata(void)
{
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
* because at this point we are sure that the metadata producer is
* either dead or blocked.
*/
- lttng_ustconsumer_close_metadata(metadata_ht);
+ lttng_ustconsumer_close_all_metadata(metadata_ht);
break;
default:
ERR("Unknown consumer_data type");
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht)
{
- int ret;
- struct lttng_ht_iter iter;
struct lttng_consumer_channel *free_chan = NULL;
- struct consumer_relayd_sock_pair *relayd;
assert(stream);
/*
DBG3("Consumer delete metadata stream %d", stream->wait_fd);
- if (ht == NULL) {
- /* Means the stream was allocated but not successfully added */
- goto free_stream_rcu;
- }
-
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- if (stream->mmap_base != NULL) {
- ret = munmap(stream->mmap_base, stream->mmap_len);
- if (ret != 0) {
- PERROR("munmap metadata stream");
- }
- }
- if (stream->wait_fd >= 0) {
- ret = close(stream->wait_fd);
- if (ret < 0) {
- PERROR("close kernel metadata wait_fd");
- }
- }
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- if (stream->monitor) {
- /* close the write-side in close_metadata */
- ret = close(stream->ust_metadata_poll_pipe[0]);
- if (ret < 0) {
- PERROR("Close UST metadata read-side poll pipe");
- }
- }
- lttng_ustconsumer_del_stream(stream);
- break;
- default:
- ERR("Unknown consumer_data type");
- assert(0);
- goto end;
- }
-
- rcu_read_lock();
- iter.iter.node = &stream->node.node;
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
-
- iter.iter.node = &stream->node_channel_id.node;
- ret = lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
- assert(!ret);
-
- iter.iter.node = &stream->node_session_id.node;
- ret = lttng_ht_del(consumer_data.stream_list_ht, &iter);
- assert(!ret);
- rcu_read_unlock();
-
- if (stream->out_fd >= 0) {
- ret = close(stream->out_fd);
- if (ret) {
- PERROR("close");
- }
- }
-
- /* Check and cleanup relayd */
- rcu_read_lock();
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- uatomic_dec(&relayd->refcount);
- assert(uatomic_read(&relayd->refcount) >= 0);
-
- /* Closing streams requires to lock the control socket. */
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_send_close_stream(&relayd->control_sock,
- stream->relayd_stream_id, stream->next_net_seq_num - 1);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- DBG("Unable to close stream on the relayd. Continuing");
- /*
- * Continue here. There is nothing we can do for the relayd.
- * Chances are that the relayd has closed the socket so we just
- * continue cleaning up.
- */
- }
+ /* Remove any reference to that stream. */
+ consumer_stream_delete(stream, ht);
- /* Both conditions are met, we destroy the relayd. */
- if (uatomic_read(&relayd->refcount) == 0 &&
- uatomic_read(&relayd->destroy_flag)) {
- consumer_destroy_relayd(relayd);
- }
- }
- rcu_read_unlock();
+ /* Close down everything including the relayd if one. */
+ consumer_stream_close(stream);
+ /* Destroy tracer buffers of the stream. */
+ consumer_stream_destroy_buffers(stream);
/* Atomically decrement channel refcount since other threads can use it. */
if (!uatomic_sub_return(&stream->chan->refcount, 1)
free_chan = stream->chan;
}
-end:
/*
* Nullify the stream reference so it is not used after deletion. The
- * channel lock MUST be acquired before being able to check for
- * a NULL pointer value.
+ * channel lock MUST be acquired before being able to check for a NULL
+ * pointer value.
*/
stream->chan->metadata_stream = NULL;
consumer_del_channel(free_chan);
}
-free_stream_rcu:
- call_rcu(&stream->node.head, free_stream_rcu);
+ consumer_stream_free(stream);
}
/*
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
+ if (testpoint(consumerd_thread_metadata)) {
+ goto error_testpoint;
+ }
+
health_code_update();
DBG("Thread metadata poll started");
/* Add metadata stream to the global poll events list */
lttng_poll_add(&events, stream->wait_fd,
- LPOLLIN | LPOLLPRI);
+ LPOLLIN | LPOLLPRI | LPOLLHUP);
}
/* Handle other stream */
lttng_poll_clean(&events);
end_poll:
+error_testpoint:
if (err) {
health_error();
ERR("Health error occurred in %s", __func__);
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
+ if (testpoint(consumerd_thread_data)) {
+ goto error_testpoint;
+ }
+
health_code_update();
local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
*/
(void) lttng_pipe_write_close(ctx->consumer_metadata_pipe);
+error_testpoint:
if (err) {
health_error();
ERR("Health error occurred in %s", __func__);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- /*
- * Note: a mutex is taken internally within
- * liblttng-ust-ctl to protect timer wakeup_fd
- * use from concurrent close.
- */
- lttng_ustconsumer_close_stream_wakeup(stream);
+ if (stream->metadata_flag) {
+ /* Safe and protected by the stream lock. */
+ lttng_ustconsumer_close_metadata(stream->chan);
+ } else {
+ /*
+ * Note: a mutex is taken internally within
+ * liblttng-ust-ctl to protect timer wakeup_fd
+ * use from concurrent close.
+ */
+ lttng_ustconsumer_close_stream_wakeup(stream);
+ }
break;
default:
ERR("Unknown consumer_data type");
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
+ if (testpoint(consumerd_thread_channel)) {
+ goto error_testpoint;
+ }
+
health_code_update();
channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
break;
case CONSUMER_CHANNEL_DEL:
{
- struct lttng_consumer_stream *stream, *stmp;
+ /*
+ * This command should never be called if the channel
+ * has streams monitored by either the data or metadata
+ * thread. The consumer only notify this thread with a
+ * channel del. command if it receives a destroy
+ * channel command from the session daemon that send it
+ * if a command prior to the GET_CHANNEL failed.
+ */
rcu_read_lock();
chan = consumer_find_channel(key);
iter.iter.node = &chan->wait_fd_node.node;
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
- consumer_close_channel_streams(chan);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- /* Delete streams that might have been left in the stream list. */
- cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head,
- send_node) {
- health_code_update();
-
- cds_list_del(&stream->send_node);
- lttng_ustconsumer_del_stream(stream);
- uatomic_sub(&stream->chan->refcount, 1);
- assert(&chan->refcount);
- free(stream);
- }
+ health_code_update();
+ /* Destroy streams that might have been left in the stream list. */
+ clean_channel_stream_list(chan);
break;
default:
ERR("Unknown consumer_data type");
lttng_poll_del(&events, chan->wait_fd);
ret = lttng_ht_del(channel_ht, &iter);
assert(ret == 0);
+
+ /*
+ * This will close the wait fd for each stream associated to
+ * this channel AND monitored by the data/metadata thread thus
+ * will be clean by the right thread.
+ */
consumer_close_channel_streams(chan);
/* Release our own refcount */
end_poll:
destroy_channel_ht(channel_ht);
end_ht:
+error_testpoint:
DBG("Channel poll thread exiting");
if (err) {
health_error();
assert(ctx);
assert(sockpoll);
- if (lttng_consumer_poll_socket(sockpoll) < 0) {
- ret = -1;
+ ret = lttng_consumer_poll_socket(sockpoll);
+ if (ret) {
goto error;
}
DBG("Metadata connection on client_socket");
health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
+ if (testpoint(consumerd_thread_sessiond)) {
+ goto error_testpoint;
+ }
+
health_code_update();
DBG("Creating command socket %s", ctx->consumer_command_sock_path);
consumer_sockpoll[1].fd = client_socket;
consumer_sockpoll[1].events = POLLIN | POLLPRI;
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ if (ret) {
+ if (ret > 0) {
+ /* should exit */
+ err = 0;
+ }
goto end;
}
DBG("Connection on client_socket");
* command unix socket.
*/
ret = set_metadata_socket(ctx, consumer_sockpoll, client_socket);
- if (ret < 0) {
+ if (ret) {
+ if (ret > 0) {
+ /* should exit */
+ err = 0;
+ }
goto end;
}
health_poll_entry();
ret = lttng_consumer_poll_socket(consumer_sockpoll);
health_poll_exit();
- if (ret < 0) {
+ if (ret) {
+ if (ret > 0) {
+ /* should exit */
+ err = 0;
+ }
goto end;
}
DBG("Incoming command on sock");
ret = lttng_consumer_recv_cmd(ctx, sock, consumer_sockpoll);
- if (ret == -ENOENT) {
- DBG("Received STOP command");
- goto end;
- }
if (ret <= 0) {
/*
* This could simply be a session daemon quitting. Don't output
*
* NOTE: for now, this only applies to the UST tracer.
*/
- lttng_consumer_close_metadata();
+ lttng_consumer_close_all_metadata();
/*
* when all fds have hung up, the polling thread
}
}
+error_testpoint:
if (err) {
health_error();
ERR("Health error occurred in %s", __func__);
}
/* Poll on consumer socket. */
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ if (ret) {
+ /* Needing to exit in the middle of a command: error. */
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR);
ret = -EINTR;
goto error_nosignal;
{
struct lttcomm_consumer_status_msg msg;
+ memset(&msg, 0, sizeof(msg));
msg.ret_code = ret_code;
return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
assert(sock >= 0);
+ memset(&msg, 0, sizeof(msg));
if (!channel) {
msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
} else {