stream->key = stream_key;
stream->out_fd = -1;
stream->out_fd_offset = 0;
+ stream->output_written = 0;
stream->state = state;
stream->uid = uid;
stream->gid = gid;
channel->uid = uid;
channel->gid = gid;
channel->relayd_id = relayd_id;
- channel->output = output;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
channel->monitor = monitor;
pthread_mutex_init(&channel->lock, NULL);
pthread_mutex_init(&channel->timer_lock, NULL);
+ switch (output) {
+ case LTTNG_EVENT_SPLICE:
+ channel->output = CONSUMER_CHANNEL_SPLICE;
+ break;
+ case LTTNG_EVENT_MMAP:
+ channel->output = CONSUMER_CHANNEL_MMAP;
+ break;
+ default:
+ assert(0);
+ free(channel);
+ channel = NULL;
+ goto end;
+ }
+
/*
* In monitor mode, the streams associated with the channel will be put in
* a special list ONLY owned by this channel. So, the refcount is set to 1
outfd = stream->out_fd = ret;
/* Reset current size because we just perform a rotation. */
stream->tracefile_size_current = 0;
+ stream->out_fd_offset = 0;
+ orig_offset = 0;
}
stream->tracefile_size_current += len;
}
SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += ret;
}
+ stream->output_written += ret;
written += ret;
}
lttng_consumer_sync_trace_file(stream, orig_offset);
outfd = stream->out_fd = ret;
/* Reset current size because we just perform a rotation. */
stream->tracefile_size_current = 0;
+ stream->out_fd_offset = 0;
+ orig_offset = 0;
}
stream->tracefile_size_current += len;
}
SYNC_FILE_RANGE_WRITE);
stream->out_fd_offset += ret_splice;
}
+ stream->output_written += ret_splice;
written += ret_splice;
}
lttng_consumer_sync_trace_file(stream, orig_offset);
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- /* Just don't waste time if no returned events for the fd */
- if (!revents) {
- continue;
- }
-
if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) {
if (revents & (LPOLLERR | LPOLLHUP )) {
DBG("Metadata thread pipe hung up");
struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id)
{
int fd = -1, ret = -1, relayd_created = 0;
- enum lttng_error_code ret_code = LTTNG_OK;
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct consumer_relayd_sock_pair *relayd = NULL;
assert(ctx);
}
/* First send a status message before receiving the fds. */
- ret = consumer_send_status_msg(sock, LTTNG_OK);
+ ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
*/
ret = cds_lfht_is_node_deleted(&stream->node.node);
if (!ret) {
+ /*
+ * An empty output file is not valid. We need at least one packet
+ * generated per stream, even if it contains no event, so it
+ * contains at least one packet header.
+ */
+ if (stream->output_written == 0) {
+ pthread_mutex_unlock(&stream->lock);
+ goto data_pending;
+ }
/* Check the stream if there is data in the buffers. */
ret = data_pending(stream);
if (ret == 1) {
assert(sock >= 0);
if (!channel) {
- msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
+ msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
} else {
- msg.ret_code = LTTNG_OK;
+ msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
msg.key = channel->key;
msg.stream_count = channel->streams.count;
}