.type = LTTNG_CONSUMER_UNKNOWN,
};
-/* timeout parameter, to control the polling thread grace period. */
-int consumer_poll_timeout = -1;
-
/*
* Flag to inform the polling thread to quit when all fd hung up. Updated by
* the consumer_thread_receive_fds when it notices that all fds has hung up.
volatile int consumer_quit;
/*
- * The following two hash tables are visible by all threads which are separated
- * in different source files.
- *
* Global hash table containing respectively metadata and data streams. The
* stream element in this ht should only be updated by the metadata poll thread
* for the metadata and the data poll thread for the data.
*/
-struct lttng_ht *metadata_ht;
-struct lttng_ht *data_ht;
+static struct lttng_ht *metadata_ht;
+static struct lttng_ht *data_ht;
/*
* Notify a thread pipe to poll back again. This usually means that some global
rcu_read_unlock();
}
+/*
+ * Return a channel object for the given key.
+ *
+ * RCU read side lock MUST be acquired before calling this function and
+ * protects the channel ptr.
+ */
static struct lttng_consumer_channel *consumer_find_channel(int key)
{
struct lttng_ht_iter iter;
return NULL;
}
- rcu_read_lock();
-
lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
channel = caa_container_of(node, struct lttng_consumer_channel, node);
}
- rcu_read_unlock();
-
return channel;
}
goto free_stream;
}
- pthread_mutex_lock(&stream->lock);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
end:
consumer_data.need_update = 1;
- pthread_mutex_unlock(&consumer_data.lock);
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
if (free_chan) {
consumer_del_channel(free_chan);
goto end;
}
+ rcu_read_lock();
+
/*
* Get stream's channel reference. Needed when adding the stream to the
* global hash table.
stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
(unsigned long long) stream->mmap_len, stream->out_fd,
stream->net_seq_idx, stream->session_id);
+
+ rcu_read_unlock();
return stream;
error:
+ rcu_read_unlock();
free(stream);
end:
return NULL;
DBG3("Adding consumer stream %d", stream->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->lock);
rcu_read_lock();
/* Steal stream identifier to avoid having streams with the same key */
consumer_data.need_update = 1;
rcu_read_unlock();
+ pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
/* RCU lock for the relayd pointer */
rcu_read_lock();
- pthread_mutex_lock(&stream->lock);
-
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();
return written;
/* RCU lock for the relayd pointer */
rcu_read_lock();
- pthread_mutex_lock(&stream->lock);
-
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();
return written;
goto free_stream;
}
+ pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->lock);
- pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
if (stream->mmap_base != NULL) {
}
end:
- pthread_mutex_unlock(&consumer_data.lock);
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
if (free_chan) {
consumer_del_channel(free_chan);
{
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
assert(stream);
assert(ht);
DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->lock);
/*
* From here, refcounts are updated so be _careful_ when returning an error
*/
rcu_read_lock();
+
+ /*
+ * Lookup the stream just to make sure it does not exist in our internal
+ * state. This should NEVER happen.
+ */
+ lttng_ht_lookup(ht, (void *)((unsigned long) stream->wait_fd), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ assert(!node);
+
/* Find relayd and, if one is found, increment refcount. */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
uatomic_dec(&stream->chan->nb_init_streams);
}
- /* Steal stream identifier to avoid having streams with the same key */
- consumer_steal_stream_key(stream->key, ht);
-
lttng_ht_add_unique_ulong(ht, &stream->node);
/*
rcu_read_unlock();
+ pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
* since their might be data to consume.
*/
lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
- close(ctx->consumer_metadata_pipe[0]);
+ ret = close(ctx->consumer_metadata_pipe[0]);
+ if (ret < 0) {
+ PERROR("close metadata pipe");
+ }
continue;
} else if (revents & LPOLLIN) {
do {
/* poll on the array of fds */
restart:
DBG("polling on %d fd", nb_fd + 1);
- num_rdy = poll(pollfd, nb_fd + 1, consumer_poll_timeout);
+ num_rdy = poll(pollfd, nb_fd + 1, -1);
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
/*
* only tracked fd in the poll set. The thread will take care of closing
* the read side.
*/
- close(ctx->consumer_metadata_pipe[1]);
+ ret = close(ctx->consumer_metadata_pipe[1]);
+ if (ret < 0) {
+ PERROR("close data pipe");
+ }
if (data_ht) {
destroy_data_stream_ht(data_ht);
*/
consumer_quit = 1;
- /*
- * 2s of grace period, if no polling events occur during
- * this period, the polling thread will exit even if there
- * are still open FDs (should not happen, but safety mechanism).
- */
- consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
-
/*
* Notify the data poll thread to poll back again and test the
* consumer_quit state that we just set so to quit gracefully.
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
+ ssize_t ret;
+
+ pthread_mutex_lock(&stream->lock);
+
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- return lttng_kconsumer_read_subbuffer(stream, ctx);
+ ret = lttng_kconsumer_read_subbuffer(stream, ctx);
+ break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- return lttng_ustconsumer_read_subbuffer(stream, ctx);
+ ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
+ break;
default:
ERR("Unknown consumer_data type");
assert(0);
- return -ENOSYS;
+ ret = -ENOSYS;
+ break;
}
+
+ pthread_mutex_unlock(&stream->lock);
+ return ret;
}
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
}
/* Close the created socket fd which is useless */
- close(relayd->control_sock.fd);
+ ret = close(relayd->control_sock.fd);
+ if (ret < 0) {
+ PERROR("close relayd control socket");
+ }
/* Assign new file descriptor */
relayd->control_sock.fd = fd;
}
/* Close the created socket fd which is useless */
- close(relayd->data_sock.fd);
+ ret = close(relayd->data_sock.fd);
+ if (ret < 0) {
+ PERROR("close relayd control socket");
+ }
/* Assign new file descriptor */
relayd->data_sock.fd = fd;
ht = consumer_data.stream_list_ht;
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct((void *)((unsigned long) id), 0x42UL),
+ ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
ht->match_fct, (void *)((unsigned long) id),
&iter.iter, stream, node_session_id.node) {
/* If this call fails, the stream is being used hence data pending. */