#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
+#include <inttypes.h>
#include <unistd.h>
#include <common/common.h>
return -ENOENT;
}
- /* relayd need RCU read-side lock */
+ /* relayd needs RCU read-side lock */
rcu_read_lock();
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
- int fd;
- struct consumer_relayd_sock_pair *relayd;
-
- DBG("UST Consumer adding relayd socket");
-
- /* Get relayd reference if exists. */
- relayd = consumer_find_relayd(msg.u.relayd_sock.net_index);
- if (relayd == NULL) {
- /* Not found. Allocate one. */
- relayd = consumer_allocate_relayd_sock_pair(
- msg.u.relayd_sock.net_index);
- if (relayd == NULL) {
- lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
- goto end_nosignal;
- }
- }
-
- /* Poll on consumer socket. */
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
- return -EINTR;
- }
-
- /* Get relayd socket from session daemon */
- ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
- if (ret != sizeof(fd)) {
- lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
- goto end_nosignal;
- }
-
- /* Copy socket information and received FD */
- switch (msg.u.relayd_sock.type) {
- case LTTNG_STREAM_CONTROL:
- /* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->control_sock, &msg.u.relayd_sock.sock);
- ret = lttcomm_create_sock(&relayd->control_sock);
- if (ret < 0) {
- goto end_nosignal;
- }
-
- /* Close the created socket fd which is useless */
- close(relayd->control_sock.fd);
-
- /* Assign new file descriptor */
- relayd->control_sock.fd = fd;
- break;
- case LTTNG_STREAM_DATA:
- /* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->data_sock, &msg.u.relayd_sock.sock);
- ret = lttcomm_create_sock(&relayd->data_sock);
- if (ret < 0) {
- goto end_nosignal;
- }
-
- /* Close the created socket fd which is useless */
- close(relayd->data_sock.fd);
-
- /* Assign new file descriptor */
- relayd->data_sock.fd = fd;
- break;
- default:
- ERR("Unknown relayd socket type");
- goto end_nosignal;
- }
-
- DBG("Consumer %s socket created successfully with net idx %d (fd: %d)",
- msg.u.relayd_sock.type == LTTNG_STREAM_CONTROL ? "control" : "data",
- relayd->net_seq_idx, fd);
-
- /*
- * Add relayd socket pair to consumer data hashtable. If object already
- * exists or on error, the function gracefully returns.
- */
- consumer_add_relayd(relayd);
-
+ ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
+ msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
+ &msg.u.relayd_sock.sock);
goto end_nosignal;
}
case LTTNG_CONSUMER_ADD_CHANNEL:
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ rcu_read_unlock();
return -EINTR;
}
ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
if (ret != sizeof(fds)) {
lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+ rcu_read_unlock();
return ret;
}
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ rcu_read_unlock();
return -EINTR;
}
ret = lttcomm_recv_fds_unix_sock(sock, fds, nb_fd);
if (ret != sizeof(fds)) {
lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+ rcu_read_unlock();
return ret;
}
msg.u.stream.metadata_flag);
if (new_stream == NULL) {
lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
- goto end;
+ goto end_nosignal;
}
/* The stream is not metadata. Get relayd reference if exists. */
&new_stream->relayd_stream_id);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- goto end;
+ goto end_nosignal;
}
} else if (msg.u.stream.net_index != -1) {
ERR("Network sequence index %d unknown. Not adding stream.",
msg.u.stream.net_index);
free(new_stream);
- goto end;
+ goto end_nosignal;
}
if (ctx->on_recv_stream != NULL) {
if (ret == 0) {
consumer_add_stream(new_stream);
} else if (ret < 0) {
- goto end;
+ goto end_nosignal;
}
} else {
consumer_add_stream(new_stream);
}
- DBG("UST consumer_add_stream %s (%d,%d) with relayd id %lu",
+ DBG("UST consumer_add_stream %s (%d,%d) with relayd id %" PRIu64,
msg.u.stream.path_name, fds[0], fds[1],
new_stream->relayd_stream_id);
break;
{
struct consumer_relayd_sock_pair *relayd;
- DBG("UST consumer destroying relayd %zu",
+ DBG("UST consumer destroying relayd %" PRIu64,
msg.u.destroy_relayd.net_seq_idx);
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx);
if (relayd == NULL) {
- ERR("Unable to find relayd %zu",
- msg.u.destroy_relayd.net_seq_idx);
+ ERR("Unable to find relayd %" PRIu64, msg.u.destroy_relayd.net_seq_idx);
+ goto end_nosignal;
}
/* Set destroy flag for this object */
if (uatomic_read(&relayd->refcount) == 0) {
consumer_destroy_relayd(relayd);
}
- break;
+ goto end_nosignal;
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
+ rcu_read_unlock();
return -ENOSYS;
#if 0
if (ctx->on_update_stream != NULL) {
consumer_change_stream_state(msg.u.stream.stream_key,
msg.u.stream.state);
}
-#endif
break;
+#endif
}
default:
break;
}
-end:
+
/*
- * Wake-up the other end by writing a null byte in the pipe
- * (non-blocking). Important note: Because writing into the
- * pipe is non-blocking (and therefore we allow dropping wakeup
- * data, as long as there is wakeup data present in the pipe
- * buffer to wake up the other end), the other end should
- * perform the following sequence for waiting:
+ * Wake-up the other end by writing a null byte in the pipe (non-blocking).
+ * Important note: Because writing into the pipe is non-blocking (and
+ * therefore we allow dropping wakeup data, as long as there is wakeup data
+ * present in the pipe buffer to wake up the other end), the other end
+ * should perform the following sequence for waiting:
+ *
* 1) empty the pipe (reads).
* 2) perform update operation.
* 3) wait on the pipe (poll).
ret = write(ctx->consumer_poll_pipe[1], "", 1);
} while (ret < 0 && errno == EINTR);
end_nosignal:
+ /* XXX: At some point we might want to return something else than zero */
rcu_read_unlock();
return 0;
}
* display the error but continue processing to try
* to release the subbuffer
*/
- ERR("Error writing to tracefile");
+ ERR("Error writing to tracefile (expected: %ld, got: %ld)", ret, len);
}
err = ustctl_put_next_subbuf(handle, buf);
assert(err == 0);