static struct lttng_ht *metadata_ht;
static struct lttng_ht *data_ht;
-/*
- * This hash table contains the mapping between the session id of the sessiond
- * and the relayd session id. Element of the ht are indexed by sessiond_id.
- *
- * Node can be added when a relayd communication is opened in the sessiond
- * thread.
- *
- * Note that a session id of the session daemon is unique to a tracing session
- * and not to a domain session. However, a domain session has one consumer
- * which forces the 1-1 mapping between a consumer and a domain session (ex:
- * UST). This means that we can't have duplicate in this ht.
- */
-static struct lttng_ht *relayd_session_id_ht;
-
/*
* Notify a thread pipe to poll back again. This usually means that some global
* state has changed so we just send back the thread in a poll wait call.
{
int ret;
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
if (relayd == NULL) {
return;
DBG("Consumer destroy and close relayd socket pair");
- lttng_ht_lookup(relayd_session_id_ht,
- (void *)((unsigned long) relayd->session_id), &iter);
- node = lttng_ht_iter_get_node_ulong(&iter);
- if (node != NULL) {
- /* We assume the relayd is being or is destroyed */
- return;
- }
-
- ret = lttng_ht_del(relayd_session_id_ht, &iter);
- if (ret != 0) {
- /* We assume the relayd is being or is destroyed */
- return;
- }
-
iter.iter.node = &relayd->node.node;
ret = lttng_ht_del(consumer_data.relayd_ht, &iter);
if (ret != 0) {
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
data_hdr.data_size = htobe32(data_size);
data_hdr.padding_size = htobe32(padding);
- data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
+ /*
+ * Note that net_seq_num below is assigned with the *current* value of
+ * next_net_seq_num and only after that the next_net_seq_num will be
+ * increment. This is why when issuing a command on the relayd using
+ * this next value, 1 should always be substracted in order to compare
+ * the last seen sequence number on the relayd side to the last sent.
+ */
+ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
/* Other fields are zeroed previously */
ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
goto error;
}
+ ++stream->next_net_seq_num;
+
/* Set to go on data socket */
outfd = relayd->data_sock.fd;
}
do {
ret = write(ctx->consumer_should_quit[1], "4", 1);
} while (ret < 0 && errno == EINTR);
- if (ret < 0) {
+ if (ret < 0 || ret != 1) {
PERROR("write consumer quit");
}
do {
ret = write(fd, (void *) &hdr, sizeof(hdr));
} while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("write metadata stream id");
+ if (ret < 0 || ret != sizeof(hdr)) {
+ /*
+ * This error means that the fd's end is closed so ignore the perror
+ * not to clubber the error output since this can happen in a normal
+ * code path.
+ */
+ if (errno != EPIPE) {
+ PERROR("write metadata stream id");
+ }
+ DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
+ /*
+ * Set ret to a negative value because if ret != sizeof(hdr), we don't
+ * handle writting the missing part so report that as an error and
+ * don't lie to the caller.
+ */
+ ret = -1;
goto end;
}
DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
} while (ret < 0 && errno == EINTR);
DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
if (ret < 0) {
- PERROR("Error in file write");
+ /*
+ * This is possible if the fd is closed on the other side (outfd)
+ * or any write problem. It can be verbose a bit for a normal
+ * execution if for instance the relayd is stopped abruptly. This
+ * can happen so set this to a DBG statement.
+ */
+ DBG("Error in file write mmap");
if (written == 0) {
written = ret;
}
DBG("Metadata main loop started");
while (1) {
- lttng_poll_reset(&events);
-
- nb_fd = LTTNG_POLL_GETNB(&events);
-
/* Only the metadata pipe is set */
- if (nb_fd == 0 && consumer_quit == 1) {
+ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
goto end;
}
restart:
- DBG("Metadata poll wait with %d fd(s)", nb_fd);
+ DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
ret = lttng_poll_wait(&events, -1);
DBG("Metadata event catched in thread");
if (ret < 0) {
goto error;
}
+ nb_fd = ret;
+
/* From here, the event is a metadata wait fd */
for (i = 0; i < nb_fd; i++) {
revents = LTTNG_POLL_GETEV(&events, i);
if (stream == NULL) {
/* Check for deleted streams. */
validate_endpoint_status_metadata_stream(&events);
- continue;
+ goto restart;
}
DBG("Adding metadata stream %d to poll set",
*/
pthread_mutex_lock(&consumer_data.lock);
if (consumer_data.need_update) {
- if (pollfd != NULL) {
- free(pollfd);
- pollfd = NULL;
- }
- if (local_stream != NULL) {
- free(local_stream);
- local_stream = NULL;
- }
+ free(pollfd);
+ pollfd = NULL;
+
+ free(local_stream);
+ local_stream = NULL;
/* allocate for all fds + 1 for the consumer_data_pipe */
pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
* array update over low-priority reads.
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
- size_t pipe_readlen;
+ ssize_t pipe_readlen;
DBG("consumer_data_pipe wake up");
/* Consume 1 byte of pipe data */
pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
sizeof(new_stream));
} while (pipe_readlen == -1 && errno == EINTR);
+ if (pipe_readlen < 0) {
+ PERROR("read consumer data pipe");
+ /* Continue so we can at least handle the current stream(s). */
+ continue;
+ }
/*
* If the stream is NULL, just ignore it. It's also possible that
}
end:
DBG("polling thread exiting");
- if (pollfd != NULL) {
- free(pollfd);
- pollfd = NULL;
- }
- if (local_stream != NULL) {
- free(local_stream);
- local_stream = NULL;
- }
+ free(pollfd);
+ free(local_stream);
/*
* Close the write side of the pipe so epoll_wait() in
/* Blocking call, waiting for transmission */
sock = lttcomm_accept_unix_sock(client_socket);
- if (sock <= 0) {
+ if (sock < 0) {
WARN("On accept");
goto end;
}
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- relayd_session_id_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
}
/*
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
unsigned int sessiond_id)
{
- int fd = -1, ret = -1;
+ int fd = -1, ret = -1, relayd_created = 0;
enum lttng_error_code ret_code = LTTNG_OK;
struct consumer_relayd_sock_pair *relayd;
- struct consumer_relayd_session_id *relayd_id_node;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
if (relayd == NULL) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+ ret = -1;
goto error;
}
+ relayd->sessiond_session_id = (uint64_t) sessiond_id;
+ relayd_created = 1;
}
/* Poll on consumer socket. */
relayd->control_sock.fd = fd;
/*
- * Create a session on the relayd and store the returned id. No need to
- * grab the socket lock since the relayd object is not yet visible.
+ * Create a session on the relayd and store the returned id. Lock the
+ * control socket mutex if the relayd was NOT created before.
*/
+ if (!relayd_created) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ }
ret = relayd_create_session(&relayd->control_sock,
- &relayd->session_id);
- if (ret < 0) {
- goto error;
+ &relayd->relayd_session_id);
+ if (!relayd_created) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
-
- /* Set up a relayd session id node. */
- relayd_id_node = zmalloc(sizeof(struct consumer_relayd_session_id));
- if (!relayd_id_node) {
- PERROR("zmalloc relayd id node");
+ if (ret < 0) {
goto error;
}
- relayd_id_node->relayd_id = relayd->session_id;
- relayd_id_node->sessiond_id = (uint64_t) sessiond_id;
-
- /* Indexed by session id of the session daemon. */
- lttng_ht_node_init_ulong(&relayd_id_node->node,
- relayd_id_node->sessiond_id);
- rcu_read_lock();
- lttng_ht_add_unique_ulong(relayd_session_id_ht, &relayd_id_node->node);
- rcu_read_unlock();
-
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
break;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
+ ret = -1;
goto error;
}
PERROR("close received socket");
}
}
+
+ if (relayd_created) {
+ /* We just want to cleanup. Ignore ret value. */
+ (void) relayd_close(&relayd->control_sock);
+ (void) relayd_close(&relayd->data_sock);
+ free(relayd);
+ }
+
return ret;
}
return ret;
}
+/*
+ * Search for a relayd associated to the session id and return the reference.
+ *
+ * A rcu read side lock MUST be acquire before calling this function and locked
+ * until the relayd object is no longer necessary.
+ */
+static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
+{
+ struct lttng_ht_iter iter;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+
+ /* Iterate over all relayd since they are indexed by net_seq_idx. */
+ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+ node.node) {
+ /*
+ * Check by sessiond id which is unique here where the relayd session
+ * id might not be when having multiple relayd.
+ */
+ if (relayd->sessiond_session_id == id) {
+ /* Found the relayd. There can be only one per id. */
+ goto found;
+ }
+ }
+
+ return NULL;
+
+found:
+ return relayd;
+}
+
/*
* Check if for a given session id there is still data needed to be extract
* from the buffers.
struct lttng_ht_iter iter;
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
- struct consumer_relayd_sock_pair *relayd;
+ struct consumer_relayd_sock_pair *relayd = NULL;
int (*data_pending)(struct lttng_consumer_stream *);
DBG("Consumer data pending command on session id %" PRIu64, id);
/* Ease our life a bit */
ht = consumer_data.stream_list_ht;
+ relayd = find_relayd_by_session_id(id);
+ if (relayd) {
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_begin_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ /* Communication error thus the relayd so no data pending. */
+ goto data_not_pending;
+ }
+ }
+
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
ht->match_fct, (void *)((unsigned long) id),
/* If this call fails, the stream is being used hence data pending. */
ret = stream_try_lock(stream);
if (!ret) {
- goto data_not_pending;
+ goto data_pending;
}
/*
ret = data_pending(stream);
if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
+ goto data_pending;
}
}
/* Relayd check */
- if (stream->net_seq_idx != -1) {
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (!relayd) {
- /*
- * At this point, if the relayd object is not available for the
- * given stream, it is because the relayd is being cleaned up
- * so every stream associated with it (for a session id value)
- * are or will be marked for deletion hence no data pending.
- */
- pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
- }
-
+ if (relayd) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
if (stream->metadata_flag) {
- ret = relayd_quiescent_control(&relayd->control_sock);
+ ret = relayd_quiescent_control(&relayd->control_sock,
+ stream->relayd_stream_id);
} else {
ret = relayd_data_pending(&relayd->control_sock,
- stream->relayd_stream_id, stream->next_net_seq_num);
+ stream->relayd_stream_id,
+ stream->next_net_seq_num - 1);
}
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
+ goto data_pending;
}
}
pthread_mutex_unlock(&stream->lock);
}
+ if (relayd) {
+ unsigned int is_data_inflight = 0;
+
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_end_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id, &is_data_inflight);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto data_not_pending;
+ }
+ if (is_data_inflight) {
+ goto data_pending;
+ }
+ }
+
/*
- * Finding _no_ node in the hash table means that the stream(s) have been
- * removed thus data is guaranteed to be available for analysis from the
- * trace files. This is *only* true for local consumer and not network
- * streaming.
+ * Finding _no_ node in the hash table and no inflight data means that the
+ * stream(s) have been removed thus data is guaranteed to be available for
+ * analysis from the trace files.
*/
+data_not_pending:
/* Data is available to be read by a viewer. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
return 0;
-data_not_pending:
+data_pending:
/* Data is still being extracted from buffers. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();