static char *data_buffer;
static unsigned int data_buffer_size;
-/* Global hash table that stores relay index object. */
-static struct lttng_ht *indexes_ht;
-
/* We need those values for the file/dir creation. */
static uid_t relayd_uid;
static gid_t relayd_gid;
/* Global relay stream hash table. */
struct lttng_ht *relay_streams_ht;
+/* Global relay viewer stream hash table. */
+struct lttng_ht *viewer_streams_ht;
+
+/* Global hash table that stores relay index object. */
+struct lttng_ht *indexes_ht;
+
/*
* usage function on stderr
*/
free(session);
}
+/*
+ * Close a given stream. The stream is freed using a call RCU.
+ *
+ * RCU read side lock MUST be acquired. If NO close_stream_check() was called
+ * BEFORE the stream lock MUST be acquired.
+ */
+static void destroy_stream(struct relay_stream *stream,
+ struct lttng_ht *ctf_traces_ht)
+{
+ int delret;
+ struct relay_viewer_stream *vstream;
+ struct lttng_ht_iter iter;
+
+ assert(stream);
+
+ delret = close(stream->fd);
+ if (delret < 0) {
+ PERROR("close stream");
+ }
+
+ if (stream->index_fd >= 0) {
+ delret = close(stream->index_fd);
+ if (delret < 0) {
+ PERROR("close stream index_fd");
+ }
+ }
+
+ vstream = live_find_viewer_stream_by_id(stream->stream_handle);
+ if (vstream) {
+ /*
+ * Set the last good value into the viewer stream. This is done
+ * right before the stream gets deleted from the hash table. The
+ * lookup failure on the live thread side of a stream indicates
+ * that the viewer stream index received value should be used.
+ */
+ vstream->total_index_received = stream->total_index_received;
+ }
+
+ iter.iter.node = &stream->stream_n.node;
+ delret = lttng_ht_del(relay_streams_ht, &iter);
+ assert(!delret);
+ iter.iter.node = &stream->ctf_trace_node.node;
+ delret = lttng_ht_del(ctf_traces_ht, &iter);
+ assert(!delret);
+ call_rcu(&stream->rcu_node, deferred_free_stream);
+ DBG("Closed tracefile %d from close stream", stream->fd);
+}
+
/*
* relay_delete_session: Free all memory associated with a session and
* close all the FDs
rcu_read_lock();
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, node, node) {
node = lttng_ht_iter_get_node_ulong(&iter);
- if (node) {
- stream = caa_container_of(node,
- struct relay_stream, stream_n);
- if (stream->session == cmd->session) {
- ret = close(stream->fd);
- if (ret < 0) {
- PERROR("close stream fd on delete session");
- }
- ret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!ret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- }
- /* Cleanup index of that stream. */
- relay_index_destroy_by_stream_id(stream->stream_handle,
- indexes_ht);
+ if (!node) {
+ continue;
+ }
+ stream = caa_container_of(node, struct relay_stream, stream_n);
+ if (stream->session == cmd->session) {
+ destroy_stream(stream, cmd->ctf_traces_ht);
}
+ /* Cleanup index of that stream. */
+ relay_index_destroy_by_stream_id(stream->stream_handle);
}
+
+ /* Make this session not visible anymore. */
iter.iter.node = &cmd->session->session_n.node;
ret = lttng_ht_del(sessions_ht, &iter);
assert(!ret);
- call_rcu(&cmd->session->rcu_node,
- deferred_free_session);
+ call_rcu(&cmd->session->rcu_node, deferred_free_session);
rcu_read_unlock();
}
*/
static
int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *viewer_streams_ht)
+ struct relay_command *cmd)
{
+ int ret, send_ret;
struct relay_session *session = cmd->session;
struct lttcomm_relayd_close_stream stream_info;
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
- int ret, send_ret;
- struct lttng_ht_iter iter;
DBG("Close stream received");
stream->close_flag = 1;
if (close_stream_check(stream)) {
- int delret;
- struct relay_viewer_stream *vstream;
-
- delret = close(stream->fd);
- if (delret < 0) {
- PERROR("close stream");
- }
-
- if (stream->index_fd >= 0) {
- delret = close(stream->index_fd);
- if (delret < 0) {
- PERROR("close stream index_fd");
- }
- }
-
- vstream = live_find_viewer_stream_by_id(stream->stream_handle,
- viewer_streams_ht);
- if (vstream) {
- /*
- * Set the last good value into the viewer stream. This is done
- * right before the stream gets deleted from the hash table. The
- * lookup failure on the live thread side of a stream indicates
- * that the viewer stream index received value should be used.
- */
- vstream->total_index_received = stream->total_index_received;
- }
-
- iter.iter.node = &stream->stream_n.node;
- delret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!delret);
- iter.iter.node = &stream->ctf_trace_node.node;
- delret = lttng_ht_del(cmd->ctf_traces_ht, &iter);
- assert(!delret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- DBG("Closed tracefile %d from close stream", stream->fd);
+ destroy_stream(stream, cmd->ctf_traces_ht);
}
end_unlock:
*/
static
int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
- struct relay_command *cmd, struct lttng_ht *indexes_ht)
+ struct relay_command *cmd)
{
int ret, send_ret, index_created = 0;
struct relay_session *session = cmd->session;
uint64_t net_seq_num;
assert(cmd);
- assert(indexes_ht);
DBG("Relay receiving index");
stream->beacon_ts_end = -1ULL;
}
- index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht);
+ index = relay_index_find(stream->stream_handle, net_seq_num);
if (!index) {
/* A successful creation will add the object to the HT. */
index = relay_index_create(stream->stream_handle, net_seq_num);
* already exist, destroy back the index created, set the data in this
* object and write it on disk.
*/
- relay_index_add(index, indexes_ht, &wr_index);
+ relay_index_add(index, &wr_index);
if (wr_index) {
copy_index_control_data(wr_index, &index_info);
free(index);
stream->index_fd = ret;
}
- ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
+ ret = relay_index_write(wr_index->fd, wr_index);
if (ret < 0) {
goto end_rcu_unlock;
}
ret = relay_send_version(recv_hdr, cmd, ctx->sessions_ht);
break;
case RELAYD_CLOSE_STREAM:
- ret = relay_close_stream(recv_hdr, cmd, ctx->viewer_streams_ht);
+ ret = relay_close_stream(recv_hdr, cmd);
break;
case RELAYD_DATA_PENDING:
ret = relay_data_pending(recv_hdr, cmd);
ret = relay_end_data_pending(recv_hdr, cmd);
break;
case RELAYD_SEND_INDEX:
- ret = relay_recv_index(recv_hdr, cmd, indexes_ht);
+ ret = relay_recv_index(recv_hdr, cmd);
break;
case RELAYD_UPDATE_SYNC_INFO:
default:
* relay_process_data: Process the data received on the data socket
*/
static
-int relay_process_data(struct relay_command *cmd,
- struct lttng_ht *indexes_ht)
+int relay_process_data(struct relay_command *cmd)
{
int ret = 0, rotate_index = 0, index_created = 0;
struct relay_stream *stream;
* exists, the control thread already received the data for it thus we need
* to write it on disk.
*/
- index = relay_index_find(stream_id, net_seq_num, indexes_ht);
+ index = relay_index_find(stream_id, net_seq_num);
if (!index) {
/* A successful creation will add the object to the HT. */
index = relay_index_create(stream->stream_handle, net_seq_num);
* Try to add the relay index object to the hash table. If an object
* already exist, destroy back the index created and set the data.
*/
- relay_index_add(index, indexes_ht, &wr_index);
+ relay_index_add(index, &wr_index);
if (wr_index) {
/* Copy back data from the created index. */
wr_index->fd = index->fd;
stream->index_fd = ret;
}
- ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
+ ret = relay_index_write(wr_index->fd, wr_index);
if (ret < 0) {
goto end_rcu_unlock;
}
/* Check if we need to close the FD */
if (close_stream_check(stream)) {
- int cret;
- struct lttng_ht_iter iter;
-
- cret = close(stream->fd);
- if (cret < 0) {
- PERROR("close stream process data");
- }
-
- cret = close(stream->index_fd);
- if (cret < 0) {
- PERROR("close stream index_fd");
- }
- iter.iter.node = &stream->stream_n.node;
- ret = lttng_ht_del(relay_streams_ht, &iter);
- assert(!ret);
- call_rcu(&stream->rcu_node,
- deferred_free_stream);
- DBG("Closed tracefile %d after recv data", stream->fd);
+ destroy_stream(stream, cmd->ctf_traces_ht);
}
end_rcu_unlock:
continue;
}
- ret = relay_process_data(relay_connection, indexes_ht);
+ ret = relay_process_data(relay_connection);
/* connection closed */
if (ret < 0) {
relay_cleanup_poll_connection(&events, pollfd);
&iter, relay_connection, sessions_ht);
}
}
- rcu_read_unlock();
error_poll_create:
- lttng_ht_destroy(indexes_ht);
+ {
+ struct relay_index *index;
+ cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) {
+ relay_index_delete(index);
+ }
+ lttng_ht_destroy(indexes_ht);
+ }
+ rcu_read_unlock();
indexes_ht_error:
lttng_ht_destroy(relay_connections_ht);
relay_connections_ht_error:
}
/* tables of streams indexed by stream ID */
- relay_ctx->viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
- if (!relay_ctx->viewer_streams_ht) {
+ viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
+ if (!viewer_streams_ht) {
goto exit_relay_ctx_viewer_streams;
}
PERROR("pthread_join");
goto error; /* join error, exit without cleanup */
}
- lttng_ht_destroy(relay_ctx->viewer_streams_ht);
+ lttng_ht_destroy(viewer_streams_ht);
exit_relay_ctx_viewer_streams:
lttng_ht_destroy(relay_streams_ht);