static char *data_buffer;
static unsigned int data_buffer_size;
-/* Global hash table that stores relay index object. */
-static struct lttng_ht *indexes_ht;
-
/* We need those values for the file/dir creation. */
static uid_t relayd_uid;
static gid_t relayd_gid;
/* Global relay viewer stream hash table. */
struct lttng_ht *viewer_streams_ht;
+/* Global hash table that stores relay index object. */
+struct lttng_ht *indexes_ht;
+
/*
* usage function on stderr
*/
free(session);
}
-static void close_stream(struct relay_stream *stream,
+/*
+ * Close a given stream. The stream is freed using a call RCU.
+ *
+ * RCU read side lock MUST be acquired. If NO close_stream_check() was called
+ * BEFORE the stream lock MUST be acquired.
+ */
+static void destroy_stream(struct relay_stream *stream,
struct lttng_ht *ctf_traces_ht)
{
int delret;
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) {
node = lttng_ht_iter_get_node_ulong(&iter);
- if (node) {
- stream = caa_container_of(node,
- struct relay_stream, stream_n);
- if (stream->session == cmd->session) {
- ret = close(stream->fd);
- if (ret < 0) {
- PERROR("close stream fd on delete session");
- }
- ret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!ret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- }
- /* Cleanup index of that stream. */
- relay_index_destroy_by_stream_id(stream->stream_handle,
- indexes_ht);
+ if (!node) {
+ continue;
}
+ stream = caa_container_of(node, struct relay_stream, stream_n);
+ if (stream->session == cmd->session) {
+ destroy_stream(stream, cmd->ctf_traces_ht);
+ }
+ /* Cleanup index of that stream. */
+ relay_index_destroy_by_stream_id(stream->stream_handle);
}
+
+ /* Make this session not visible anymore. */
iter.iter.node = &cmd->session->session_n.node;
ret = lttng_ht_del(sessions_ht, &iter);
assert(!ret);
- call_rcu(&cmd->session->rcu_node,
- deferred_free_session);
+ call_rcu(&cmd->session->rcu_node, deferred_free_session);
rcu_read_unlock();
}
stream->close_flag = 1;
if (close_stream_check(stream)) {
- close_stream(stream, cmd->ctf_traces_ht);
+ destroy_stream(stream, cmd->ctf_traces_ht);
}
end_unlock:
*/
static
int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *indexes_ht)
+ struct relay_command *cmd)
{
int ret, send_ret, index_created = 0;
struct relay_session *session = cmd->session;
uint64_t net_seq_num;
assert(cmd);
- assert(indexes_ht);
DBG("Relay receiving index");
stream->beacon_ts_end = -1ULL;
}
- index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht);
+ index = relay_index_find(stream->stream_handle, net_seq_num);
if (!index) {
/* A successful creation will add the object to the HT. */
index = relay_index_create(stream->stream_handle, net_seq_num);
* already exist, destroy back the index created, set the data in this
* object and write it on disk.
*/
- relay_index_add(index, indexes_ht, &wr_index);
+ relay_index_add(index, &wr_index);
if (wr_index) {
copy_index_control_data(wr_index, &index_info);
free(index);
stream->index_fd = ret;
}
- ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
+ ret = relay_index_write(wr_index->fd, wr_index);
if (ret < 0) {
goto end_rcu_unlock;
}
ret = relay_end_data_pending(recv_hdr, cmd);
break;
case RELAYD_SEND_INDEX:
- ret = relay_recv_index(recv_hdr, cmd, indexes_ht);
+ ret = relay_recv_index(recv_hdr, cmd);
break;
case RELAYD_UPDATE_SYNC_INFO:
default:
* relay_process_data: Process the data received on the data socket
*/
static
-int relay_process_data(struct relay_command *cmd,
- struct lttng_ht *indexes_ht)
+int relay_process_data(struct relay_command *cmd)
{
int ret = 0, rotate_index = 0, index_created = 0;
struct relay_stream *stream;
* exists, the control thread already received the data for it thus we need
* to write it on disk.
*/
- index = relay_index_find(stream_id, net_seq_num, indexes_ht);
+ index = relay_index_find(stream_id, net_seq_num);
if (!index) {
/* A successful creation will add the object to the HT. */
index = relay_index_create(stream->stream_handle, net_seq_num);
* Try to add the relay index object to the hash table. If an object
* already exist, destroy back the index created and set the data.
*/
- relay_index_add(index, indexes_ht, &wr_index);
+ relay_index_add(index, &wr_index);
if (wr_index) {
/* Copy back data from the created index. */
wr_index->fd = index->fd;
stream->index_fd = ret;
}
- ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
+ ret = relay_index_write(wr_index->fd, wr_index);
if (ret < 0) {
goto end_rcu_unlock;
}
/* Check if we need to close the FD */
if (close_stream_check(stream)) {
- close_stream(stream, cmd->ctf_traces_ht);
+ destroy_stream(stream, cmd->ctf_traces_ht);
}
end_rcu_unlock:
continue;
}
- ret = relay_process_data(relay_connection, indexes_ht);
+ ret = relay_process_data(relay_connection);
/* connection closed */
if (ret < 0) {
relay_cleanup_poll_connection(&events, pollfd);
{
struct relay_index *index;
cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
- relay_index_delete(index, indexes_ht);
+ relay_index_delete(index);
}
lttng_ht_destroy(indexes_ht);
}