consumer_stream_destroy(stream, ht);
}
+/*
+ * XXX naming of del vs destroy is all mixed up.
+ */
+void consumer_del_stream_for_data(struct lttng_consumer_stream *stream)
+{
+ consumer_stream_destroy(stream, data_ht);
+}
+
+void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream)
+{
+ consumer_stream_destroy(stream, metadata_ht);
+}
+
struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
uint64_t stream_key,
enum lttng_consumer_stream_state state,
/*
* Add a stream to the global list protected by a mutex.
*/
-static int add_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+int consumer_add_data_stream(struct lttng_consumer_stream *stream)
{
+ struct lttng_ht *ht = data_ht;
int ret = 0;
assert(stream);
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
rcu_read_lock();
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
+void consumer_del_data_stream(struct lttng_consumer_stream *stream)
+{
+ consumer_del_stream(stream, data_ht);
+}
+
/*
* Add relayd socket to global consumer data hashtable. RCU read side lock MUST
* be acquired before calling this.
channel->tracefile_count = tracefile_count;
channel->monitor = monitor;
pthread_mutex_init(&channel->lock, NULL);
+ pthread_mutex_init(&channel->timer_lock, NULL);
/*
* In monitor mode, the streams associated with the channel will be put in
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&channel->lock);
+ pthread_mutex_lock(&channel->timer_lock);
rcu_read_lock();
lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
end:
rcu_read_unlock();
+ pthread_mutex_unlock(&channel->timer_lock);
pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
ctx->consumer_error_socket = -1;
ctx->consumer_metadata_socket = -1;
+ pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
/* assign the callbacks */
ctx->on_buffer_ready = buffer_ready;
ctx->on_recv_channel = recv_channel;
if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
+ ret = -EPIPE;
goto end;
}
}
case LTTNG_CONSUMER_KERNEL:
mmap_base = stream->mmap_base;
ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
+ if (ret != 0) {
+ PERROR("tracer ctl get_mmap_read_offset");
+ written = -errno;
+ goto end;
+ }
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
mmap_base = lttng_ustctl_get_mmap_base(stream);
if (!mmap_base) {
ERR("read mmap get mmap base for stream %s", stream->name);
- written = -1;
+ written = -EPERM;
goto end;
}
ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
-
+ if (ret != 0) {
+ PERROR("tracer ctl get_mmap_read_offset");
+ written = ret;
+ goto end;
+ }
break;
default:
ERR("Unknown consumer_data type");
assert(0);
}
- if (ret != 0) {
- errno = -ret;
- PERROR("tracer ctl get_mmap_read_offset");
- written = ret;
- goto end;
- }
/* Handle stream on the relayd if the output is on the network */
if (relayd) {
outfd = stream->out_fd = ret;
/* Reset current size because we just perform a rotation. */
stream->tracefile_size_current = 0;
+ stream->out_fd_offset = 0;
+ orig_offset = 0;
}
stream->tracefile_size_current += len;
}
*/
DBG("Error in file write mmap");
if (written == 0) {
- written = ret;
+ written = -errno;
}
/* Socket operation failed. We consider the relayd dead */
if (errno == EPIPE || errno == EINVAL) {
if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
+ ret = -EPIPE;
goto end;
}
}
outfd = stream->out_fd = ret;
/* Reset current size because we just perform a rotation. */
stream->tracefile_size_current = 0;
+ stream->out_fd_offset = 0;
+ orig_offset = 0;
}
stream->tracefile_size_current += len;
}
end:
/*
* Nullify the stream reference so it is not used after deletion. The
- * consumer data lock MUST be acquired before being able to check for a
- * NULL pointer value.
+ * channel lock MUST be acquired before being able to check for
+ * a NULL pointer value.
*/
stream->chan->metadata_stream = NULL;
* Action done with the metadata stream when adding it to the consumer internal
* data structures to handle it.
*/
-static int add_metadata_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht)
+int consumer_add_metadata_stream(struct lttng_consumer_stream *stream)
{
+ struct lttng_ht *ht = metadata_ht;
int ret = 0;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
+ pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
/*
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
+ pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
&stream, sizeof(stream));
if (pipe_len < 0) {
- ERR("read metadata stream, ret: %ld", pipe_len);
+ ERR("read metadata stream, ret: %zd", pipe_len);
/*
* Continue here to handle the rest of the streams.
*/
DBG("Adding metadata stream %d to poll set",
stream->wait_fd);
- ret = add_metadata_stream(stream, metadata_ht);
- if (ret) {
- ERR("Unable to add metadata stream");
- /* Stream was not setup properly. Continuing. */
- consumer_del_metadata_stream(stream, NULL);
- continue;
- }
-
/* Add metadata stream to the global poll events list */
lttng_poll_add(&events, stream->wait_fd,
LPOLLIN | LPOLLPRI);
pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
&new_stream, sizeof(new_stream));
if (pipe_readlen < 0) {
- ERR("Consumer data pipe ret %ld", pipe_readlen);
+ ERR("Consumer data pipe ret %zd", pipe_readlen);
/* Continue so we can at least handle the current stream(s). */
continue;
}
continue;
}
- ret = add_stream(new_stream, data_ht);
- if (ret) {
- ERR("Consumer add stream %" PRIu64 " failed. Continuing",
- new_stream->key);
- /*
- * At this point, if the add_stream fails, it is not in the
- * hash table thus passing the NULL value here.
- */
- consumer_del_stream(new_stream, NULL);
- }
-
/* Continue to update the local streams and handle prio ones */
continue;
}