#include <common/common.h>
#include <common/utils.h>
#include <common/compat/poll.h>
+#include <common/index/index.h>
#include <common/kernel-ctl/kernel-ctl.h>
#include <common/sessiond-comm/relayd.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/kernel-consumer/kernel-consumer.h>
#include <common/relayd/relayd.h>
#include <common/ust-consumer/ust-consumer.h>
+#include <common/consumer-timer.h>
#include "consumer.h"
#include "consumer-stream.h"
consumer_stream_destroy(stream, NULL);
}
+ if (channel->live_timer_enabled == 1) {
+ consumer_timer_live_stop(channel);
+ }
+
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
stream->session_id = session_id;
stream->monitor = monitor;
stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
+ stream->index_fd = -1;
pthread_mutex_init(&stream->lock, NULL);
/* If channel is the metadata, flag this stream as metadata. */
stream->metadata_flag = 1;
/* Metadata is flat out. */
strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
+ /* Live rendez-vous point. */
+ pthread_cond_init(&stream->metadata_rdv, NULL);
+ pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
} else {
/* Format stream name to <channel_name>_<cpu_number> */
ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
if (ret < 0) {
goto end;
}
+
uatomic_inc(&relayd->refcount);
stream->sent_to_relayd = 1;
} else {
uint64_t tracefile_size,
uint64_t tracefile_count,
uint64_t session_id_per_pid,
- unsigned int monitor)
+ unsigned int monitor,
+ unsigned int live_timer_interval)
{
struct lttng_consumer_channel *channel;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
channel->monitor = monitor;
+ channel->live_timer_interval = live_timer_interval;
pthread_mutex_init(&channel->lock, NULL);
pthread_mutex_init(&channel->timer_lock, NULL);
ssize_t lttng_consumer_on_read_subbuffer_mmap(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding)
+ unsigned long padding,
+ struct lttng_packet_index *index)
{
unsigned long mmap_offset;
void *mmap_base;
ret = utils_rotate_stream_file(stream->chan->pathname,
stream->name, stream->chan->tracefile_size,
stream->chan->tracefile_count, stream->uid, stream->gid,
- stream->out_fd, &(stream->tracefile_count_current));
+ stream->out_fd, &(stream->tracefile_count_current),
+ &stream->out_fd);
if (ret < 0) {
ERR("Rotating output file");
goto end;
}
- outfd = stream->out_fd = ret;
+ outfd = stream->out_fd;
+
+ if (stream->index_fd >= 0) {
+ ret = index_create_file(stream->chan->pathname,
+ stream->name, stream->uid, stream->gid,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ goto end;
+ }
+ stream->index_fd = ret;
+ }
+
/* Reset current size because we just perform a rotation. */
stream->tracefile_size_current = 0;
stream->out_fd_offset = 0;
orig_offset = 0;
}
stream->tracefile_size_current += len;
+ if (index) {
+ index->offset = htobe64(stream->out_fd_offset);
+ }
}
while (len > 0) {
ssize_t lttng_consumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding)
+ unsigned long padding,
+ struct lttng_packet_index *index)
{
ssize_t ret = 0, written = 0, ret_splice = 0;
loff_t offset = 0;
ret = utils_rotate_stream_file(stream->chan->pathname,
stream->name, stream->chan->tracefile_size,
stream->chan->tracefile_count, stream->uid, stream->gid,
- stream->out_fd, &(stream->tracefile_count_current));
+ stream->out_fd, &(stream->tracefile_count_current),
+ &stream->out_fd);
if (ret < 0) {
ERR("Rotating output file");
goto end;
}
- outfd = stream->out_fd = ret;
+ outfd = stream->out_fd;
+
+ if (stream->index_fd >= 0) {
+ ret = index_create_file(stream->chan->pathname,
+ stream->name, stream->uid, stream->gid,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ goto end;
+ }
+ stream->index_fd = ret;
+ }
+
/* Reset current size because we just perform a rotation. */
stream->tracefile_size_current = 0;
stream->out_fd_offset = 0;
orig_offset = 0;
}
stream->tracefile_size_current += len;
+ index->offset = htobe64(stream->out_fd_offset);
}
while (len > 0) {
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- /* Just don't waste time if no returned events for the fd */
- if (!revents) {
- continue;
- }
-
if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
if (revents & (LPOLLERR | LPOLLHUP )) {
DBG("Metadata thread pipe hung up");
ssize_t ret;
pthread_mutex_lock(&stream->lock);
+ if (stream->metadata_flag) {
+ pthread_mutex_lock(&stream->metadata_rdv_lock);
+ }
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
}
+ if (stream->metadata_flag) {
+ pthread_cond_broadcast(&stream->metadata_rdv);
+ pthread_mutex_unlock(&stream->metadata_rdv_lock);
+ }
pthread_mutex_unlock(&stream->lock);
return ret;
}
int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll,
- struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id)
+ struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
+ uint64_t relayd_session_id)
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttng_error_code ret_code = LTTNG_OK;
relayd->control_sock.major = relayd_sock->major;
relayd->control_sock.minor = relayd_sock->minor;
- /*
- * Create a session on the relayd and store the returned id. Lock the
- * control socket mutex if the relayd was NOT created before.
- */
- if (!relayd_created) {
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- }
- ret = relayd_create_session(&relayd->control_sock,
- &relayd->relayd_session_id);
- if (!relayd_created) {
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- }
- if (ret < 0) {
- /*
- * Close all sockets of a relayd object. It will be freed if it was
- * created at the error code path or else it will be garbage
- * collect.
- */
- (void) relayd_close(&relayd->control_sock);
- (void) relayd_close(&relayd->data_sock);
- ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
- goto error;
- }
+ relayd->relayd_session_id = relayd_session_id;
break;
case LTTNG_STREAM_DATA: