#include <common/sessiond-comm/relayd.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/string-utils/format.hpp>
+#include <common/urcu.hpp>
#include <common/uri.hpp>
#include <common/utils.hpp>
for (i = 0; i < (sizeof(long_options) / sizeof(struct option)) - 1; i++) {
/* Ignore if entry name is not fully matched. */
- if (strcmp(entry->name, long_options[i].name)) {
+ if (strcmp(entry->name, long_options[i].name) != 0) {
continue;
}
* the data will be read at some point in time
* or wait to the end of the world :)
*/
- ret = lttng_write(relay_conn_pipe[1], &new_conn, sizeof(new_conn));
+ ret = lttng_write(
+ relay_conn_pipe[1], &new_conn, sizeof(new_conn)); /* NOLINT
+ sizeof
+ used
+ on a
+ pointer.
+ */
if (ret < 0) {
PERROR("write connection pipe");
connection_put(new_conn);
* session lock.
*/
pthread_mutex_lock(&session->lock);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_list_for_each_entry_rcu(stream, &session->recv_list, recv_node)
{
stream_publish(stream);
}
- rcu_read_unlock();
/*
* Inform the viewer that there are new streams in the session.
* to iterate over all streams to find the one associated with
* the right session_id.
*/
- rcu_read_lock();
- cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
- if (!stream_get(stream)) {
- continue;
- }
- if (stream->trace->session->id == msg.session_id) {
- pthread_mutex_lock(&stream->lock);
- stream->data_pending_check_done = false;
- pthread_mutex_unlock(&stream->lock);
- DBG("Set begin data pending flag to stream %" PRIu64,
- stream->stream_handle);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
+ if (!stream_get(stream)) {
+ continue;
+ }
+
+ if (stream->trace->session->id == msg.session_id) {
+ pthread_mutex_lock(&stream->lock);
+ stream->data_pending_check_done = false;
+ pthread_mutex_unlock(&stream->lock);
+ DBG("Set begin data pending flag to stream %" PRIu64,
+ stream->stream_handle);
+ }
+
+ stream_put(stream);
}
- stream_put(stream);
}
- rcu_read_unlock();
memset(&reply, 0, sizeof(reply));
/* All good, send back reply. */
* Iterate over all streams to see if the begin data pending
* flag is set.
*/
- rcu_read_lock();
- cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
- if (!stream_get(stream)) {
- continue;
- }
- if (stream->trace->session->id != msg.session_id) {
- stream_put(stream);
- continue;
- }
- pthread_mutex_lock(&stream->lock);
- if (!stream->data_pending_check_done) {
- uint64_t stream_seq;
+ {
+ lttng::urcu::read_lock_guard read_lock;
- if (session_streams_have_index(conn->session)) {
- /*
- * Ensure that both the index and stream data have been
- * flushed up to the requested point.
- */
- stream_seq =
- std::min(stream->prev_data_seq, stream->prev_index_seq);
- } else {
- stream_seq = stream->prev_data_seq;
+ cds_lfht_for_each_entry (relay_streams_ht->ht, &iter.iter, stream, node.node) {
+ if (!stream_get(stream)) {
+ continue;
}
- if (!stream->closed ||
- !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
- is_data_inflight = 1;
- DBG("Data is still in flight for stream %" PRIu64,
- stream->stream_handle);
- pthread_mutex_unlock(&stream->lock);
+
+ if (stream->trace->session->id != msg.session_id) {
stream_put(stream);
- break;
+ continue;
+ }
+
+ pthread_mutex_lock(&stream->lock);
+ if (!stream->data_pending_check_done) {
+ uint64_t stream_seq;
+
+ if (session_streams_have_index(conn->session)) {
+ /*
+ * Ensure that both the index and stream data have been
+ * flushed up to the requested point.
+ */
+ stream_seq = std::min(stream->prev_data_seq,
+ stream->prev_index_seq);
+ } else {
+ stream_seq = stream->prev_data_seq;
+ }
+
+ if (!stream->closed ||
+ !(((int64_t) (stream_seq - stream->last_net_seq_num)) >= 0)) {
+ is_data_inflight = 1;
+ DBG("Data is still in flight for stream %" PRIu64,
+ stream->stream_handle);
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+ break;
+ }
}
+
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
}
- pthread_mutex_unlock(&stream->lock);
- stream_put(stream);
}
- rcu_read_unlock();
memset(&reply, 0, sizeof(reply));
/* All good, send back reply. */
if (revents & LPOLLIN) {
struct relay_connection *conn;
- ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
+ ret = lttng_read(relay_conn_pipe[0],
+ &conn,
+ sizeof(conn)); /* NOLINT sizeof used on a
+ pointer. */
if (ret < 0) {
goto error;
}
exit:
error:
/* Cleanup remaining connection object. */
- rcu_read_lock();
- cds_lfht_for_each_entry (relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
- health_code_update();
+ {
+ lttng::urcu::read_lock_guard read_lock;
- session_abort(destroy_conn->session);
+ cds_lfht_for_each_entry (
+ relay_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) {
+ health_code_update();
- /*
- * No need to grab another ref, because we own
- * destroy_conn.
- */
- relay_thread_close_connection(&events, destroy_conn->sock->fd, destroy_conn);
+ session_abort(destroy_conn->session);
+
+ /*
+ * No need to grab another ref, because we own
+ * destroy_conn.
+ */
+ relay_thread_close_connection(
+ &events, destroy_conn->sock->fd, destroy_conn);
+ }
}
- rcu_read_unlock();
(void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
error_poll_create: