return ret;
}
+/*
+ * Find a relayd and send the streams sent message
+ *
+ * Returns 0 on success, < 0 on error
+ */
+int consumer_send_relayd_streams_sent(uint64_t net_seq_idx)
+{
+ int ret = 0;
+ struct consumer_relayd_sock_pair *relayd;
+
+ assert(net_seq_idx != -1ULL);
+
+ /* The stream is not metadata. Get relayd reference if exists. */
+ rcu_read_lock();
+ relayd = consumer_find_relayd(net_seq_idx);
+ if (relayd != NULL) {
+ /* Add stream on the relayd */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_streams_sent(&relayd->control_sock);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto end;
+ }
+ } else {
+ ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.",
+ net_seq_idx);
+ ret = -1;
+ goto end;
+ }
+
+ ret = 0;
+ DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
/*
* Find a relayd and close the stream
*/
channel->uid = uid;
channel->gid = gid;
channel->relayd_id = relayd_id;
- channel->output = output;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
channel->monitor = monitor;
pthread_mutex_init(&channel->lock, NULL);
pthread_mutex_init(&channel->timer_lock, NULL);
+ switch (output) {
+ case LTTNG_EVENT_SPLICE:
+ channel->output = CONSUMER_CHANNEL_SPLICE;
+ break;
+ case LTTNG_EVENT_MMAP:
+ channel->output = CONSUMER_CHANNEL_MMAP;
+ break;
+ default:
+ assert(0);
+ free(channel);
+ channel = NULL;
+ goto end;
+ }
+
/*
* In monitor mode, the streams associated with the channel will be put in
* a special list ONLY owned by this channel. So, the refcount is set to 1
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
unsigned long padding,
- struct lttng_packet_index *index)
+ struct ctf_packet_index *index)
{
unsigned long mmap_offset;
void *mmap_base;
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
unsigned long padding,
- struct lttng_packet_index *index)
+ struct ctf_packet_index *index)
{
ssize_t ret = 0, written = 0, ret_splice = 0;
loff_t offset = 0;
uint64_t relayd_session_id)
{
int fd = -1, ret = -1, relayd_created = 0;
- enum lttng_error_code ret_code = LTTNG_OK;
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct consumer_relayd_sock_pair *relayd = NULL;
assert(ctx);
}
/* First send a status message before receiving the fds. */
- ret = consumer_send_status_msg(sock, LTTNG_OK);
+ ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_FATAL);
assert(sock >= 0);
if (!channel) {
- msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
+ msg.ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
} else {
- msg.ret_code = LTTNG_OK;
+ msg.ret_code = LTTCOMM_CONSUMERD_SUCCESS;
msg.key = channel->key;
msg.stream_count = channel->streams.count;
}