DBG("Consumer delete channel key %" PRIu64, channel->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
call_rcu(&channel->node.head, free_channel_rcu);
end:
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
}
uint64_t session_id,
int cpu,
int *alloc_ret,
- enum consumer_channel_type type)
+ enum consumer_channel_type type,
+ unsigned int monitor)
{
int ret;
struct lttng_consumer_stream *stream;
stream->gid = gid;
stream->net_seq_idx = relayd_id;
stream->session_id = session_id;
+ stream->monitor = monitor;
+ stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
pthread_mutex_init(&stream->lock, NULL);
/* If channel is the metadata, flag this stream as metadata. */
struct lttng_ht *ht)
{
int ret = 0;
- struct consumer_relayd_sock_pair *relayd;
assert(stream);
assert(ht);
DBG3("Adding consumer stream %" PRIu64, stream->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
rcu_read_lock();
*/
lttng_ht_add_u64(consumer_data.stream_list_ht, &stream->node_session_id);
- /* Check and cleanup relayd */
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- uatomic_inc(&relayd->refcount);
- }
-
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
* in terms of destroying the associated channel, because the action that
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->timer_lock);
+ pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
goto end;
}
uatomic_inc(&relayd->refcount);
+ stream->sent_to_relayd = 1;
} else {
ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.",
stream->key, stream->net_seq_idx);
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
channel->monitor = monitor;
+ pthread_mutex_init(&channel->lock, NULL);
+ pthread_mutex_init(&channel->timer_lock, NULL);
/*
* In monitor mode, the streams associated with the channel will be put in
struct lttng_ht_iter iter;
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
+ pthread_mutex_lock(&channel->timer_lock);
rcu_read_lock();
lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
end:
rcu_read_unlock();
+ pthread_mutex_unlock(&channel->timer_lock);
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
if (!ret && channel->wait_fd != -1 &&
struct lttng_consumer_local_data *ctx),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
- int (*update_stream)(int stream_key, uint32_t state))
+ int (*update_stream)(uint64_t stream_key, uint32_t state))
{
int ret;
struct lttng_consumer_local_data *ctx;
ctx->consumer_error_socket = -1;
ctx->consumer_metadata_socket = -1;
+ pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
/* assign the callbacks */
ctx->on_buffer_ready = buffer_ready;
ctx->on_recv_channel = recv_channel;
}
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
switch (consumer_data.type) {
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
+ if (stream->monitor) {
+ /* close the write-side in close_metadata */
+ ret = close(stream->ust_metadata_poll_pipe[0]);
+ if (ret < 0) {
+ PERROR("Close UST metadata read-side poll pipe");
+ }
+ }
lttng_ustconsumer_del_stream(stream);
break;
default:
end:
/*
* Nullify the stream reference so it is not used after deletion. The
- * consumer data lock MUST be acquired before being able to check for a
- * NULL pointer value.
+ * channel lock MUST be acquired before being able to check for
+ * a NULL pointer value.
*/
stream->chan->metadata_stream = NULL;
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
if (free_chan) {
struct lttng_ht *ht)
{
int ret = 0;
- struct consumer_relayd_sock_pair *relayd;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
/*
node = lttng_ht_iter_get_node_u64(&iter);
assert(!node);
- /* Find relayd and, if one is found, increment refcount. */
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
- uatomic_inc(&relayd->refcount);
- }
-
/*
* When nb_init_stream_left reaches 0, we don't need to trigger any action
* in terms of destroying the associated channel, because the action that
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
+ pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
DBG("Metadata available on fd %d", pollfd);
assert(stream->wait_fd == pollfd);
- len = ctx->on_buffer_ready(stream, ctx);
+ do {
+ len = ctx->on_buffer_ready(stream, ctx);
+ /*
+ * We don't check the return value here since if we get
+ * a negative len, it means an error occured thus we
+ * simply remove it from the poll set and free the
+ * stream.
+ */
+ } while (len > 0);
+
/* It's ok to have an unavailable sub-buffer */
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
/* Clean up stream from consumer and free it. */
lttng_poll_del(&events, stream->wait_fd);
consumer_del_metadata_stream(stream, metadata_ht);
- } else if (len > 0) {
- stream->data_read = 1;
}
}
goto end;
}
- local_stream = zmalloc(sizeof(struct lttng_consumer_stream));
+ local_stream = zmalloc(sizeof(struct lttng_consumer_stream *));
+ if (local_stream == NULL) {
+ PERROR("local_stream malloc");
+ goto end;
+ }
while (1) {
high_prio = 0;
int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll,
- struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
+ struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id)
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttng_error_code ret_code = LTTNG_OK;
ret_code = LTTCOMM_CONSUMERD_ENOMEM;
goto error;
} else {
- relayd->sessiond_session_id = (uint64_t) sessiond_id;
+ relayd->sessiond_session_id = sessiond_id;
relayd_created = 1;
}
return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
}
+
+/*
+ * Using a maximum stream size with the produced and consumed position of a
+ * stream, computes the new consumed position to be as close as possible to the
+ * maximum possible stream size.
+ *
+ * If maximum stream size is lower than the possible buffer size (produced -
+ * consumed), the consumed_pos given is returned untouched else the new value
+ * is returned.
+ */
+unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
+ unsigned long produced_pos, uint64_t max_stream_size)
+{
+ if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
+ /* Offset from the produced position to get the latest buffers. */
+ return produced_pos - max_stream_size;
+ }
+
+ return consumed_pos;
+}