projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Fix: Add output option to enable-channel command
[lttng-tools.git]
/
src
/
bin
/
lttng-relayd
/
main.c
diff --git
a/src/bin/lttng-relayd/main.c
b/src/bin/lttng-relayd/main.c
index 0f81d556dd4df73a910471756b75cf6567581a36..18e60d1efbbe40fb45c678adc149b4e3c366468e 100644
(file)
--- a/
src/bin/lttng-relayd/main.c
+++ b/
src/bin/lttng-relayd/main.c
@@
-241,6
+241,9
@@
void cleanup(void)
/* Close relay cmd pipes */
utils_close_pipe(relay_cmd_pipe);
/* Close relay cmd pipes */
utils_close_pipe(relay_cmd_pipe);
+
+ uri_free(control_uri);
+ uri_free(data_uri);
}
/*
}
/*
@@
-884,7
+887,8
@@
void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht
}
DBG("Relay deleting session %" PRIu64, cmd->session->id);
}
DBG("Relay deleting session %" PRIu64, cmd->session->id);
- free(cmd->session->sock);
+
+ lttcomm_destroy_sock(cmd->session->sock);
rcu_read_lock();
cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) {
rcu_read_lock();
cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) {
@@
-902,6
+906,8
@@
void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht
}
}
rcu_read_unlock();
}
}
rcu_read_unlock();
+
+ free(cmd->session);
}
/*
}
/*
@@
-1176,6
+1182,8
@@
static int write_padding_to_file(int fd, uint32_t size)
PERROR("write padding to file");
}
PERROR("write padding to file");
}
+ free(zeros);
+
end:
return ret;
}
end:
return ret;
}
@@
-1208,9
+1216,12
@@
int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
payload_size -= sizeof(struct lttcomm_relayd_metadata_payload);
if (data_buffer_size < data_size) {
payload_size -= sizeof(struct lttcomm_relayd_metadata_payload);
if (data_buffer_size < data_size) {
+ /* In case the realloc fails, we can free the memory */
+ char *tmp_data_ptr = data_buffer;
data_buffer = realloc(data_buffer, data_size);
if (!data_buffer) {
ERR("Allocating data buffer");
data_buffer = realloc(data_buffer, data_size);
if (!data_buffer) {
ERR("Allocating data buffer");
+ free(tmp_data_ptr);
ret = -1;
goto end;
}
ret = -1;
goto end;
}
@@
-1306,14
+1317,14
@@
end:
}
/*
}
/*
- * Check for data
availability
for a given stream id from the session daemon.
+ * Check for data
pending
for a given stream id from the session daemon.
*/
static
*/
static
-int relay_data_
available
(struct lttcomm_relayd_hdr *recv_hdr,
+int relay_data_
pending
(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_command *cmd, struct lttng_ht *streams_ht)
{
struct relay_session *session = cmd->session;
struct relay_command *cmd, struct lttng_ht *streams_ht)
{
struct relay_session *session = cmd->session;
- struct lttcomm_relayd_data_
available
msg;
+ struct lttcomm_relayd_data_
pending
msg;
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
int ret;
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
int ret;
@@
-1321,7
+1332,7
@@
int relay_data_available(struct lttcomm_relayd_hdr *recv_hdr,
struct lttng_ht_iter iter;
uint64_t last_net_seq_num, stream_id;
struct lttng_ht_iter iter;
uint64_t last_net_seq_num, stream_id;
- DBG("Data
available
command received");
+ DBG("Data
pending
command received");
if (!session || session->version_check_done == 0) {
ERR("Trying to check for data before version check");
if (!session || session->version_check_done == 0) {
ERR("Trying to check for data before version check");
@@
-1331,7
+1342,7
@@
int relay_data_available(struct lttcomm_relayd_hdr *recv_hdr,
ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), MSG_WAITALL);
if (ret < sizeof(msg)) {
ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), MSG_WAITALL);
if (ret < sizeof(msg)) {
- ERR("Relay didn't receive valid data_
available
struct size : %d", ret);
+ ERR("Relay didn't receive valid data_
pending
struct size : %d", ret);
ret = -1;
goto end_no_session;
}
ret = -1;
goto end_no_session;
}
@@
-1351,16
+1362,17
@@
int relay_data_available(struct lttcomm_relayd_hdr *recv_hdr,
stream = caa_container_of(node, struct relay_stream, stream_n);
assert(stream);
stream = caa_container_of(node, struct relay_stream, stream_n);
assert(stream);
- DBG("Data
available
for stream id %" PRIu64 " prev_seq %" PRIu64
+ DBG("Data
pending
for stream id %" PRIu64 " prev_seq %" PRIu64
" and last_seq %" PRIu64, stream_id, stream->prev_seq,
last_net_seq_num);
" and last_seq %" PRIu64, stream_id, stream->prev_seq,
last_net_seq_num);
- if (stream->prev_seq == -1UL || stream->prev_seq <= last_net_seq_num) {
- /* Data has in fact been written and is available */
- ret = 1;
- } else {
- /* Data still being streamed. */
+ /* Avoid wrapping issue */
+ if (((int64_t) (stream->prev_seq - last_net_seq_num)) <= 0) {
+ /* Data has in fact been written and is NOT pending */
ret = 0;
ret = 0;
+ } else {
+ /* Data still being streamed thus pending */
+ ret = 1;
}
end_unlock:
}
end_unlock:
@@
-1369,7
+1381,7
@@
end_unlock:
reply.ret_code = htobe32(ret);
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
if (ret < 0) {
reply.ret_code = htobe32(ret);
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
if (ret < 0) {
- ERR("Relay data
available
ret code failed");
+ ERR("Relay data
pending
ret code failed");
}
end_no_session:
}
end_no_session:
@@
-1395,7
+1407,7
@@
int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
reply.ret_code = htobe32(LTTNG_OK);
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
if (ret < 0) {
reply.ret_code = htobe32(LTTNG_OK);
ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
if (ret < 0) {
- ERR("Relay data
available
ret code failed");
+ ERR("Relay data
quiescent control
ret code failed");
}
return ret;
}
return ret;
@@
-1431,8
+1443,8
@@
int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
case RELAYD_CLOSE_STREAM:
ret = relay_close_stream(recv_hdr, cmd, streams_ht);
break;
case RELAYD_CLOSE_STREAM:
ret = relay_close_stream(recv_hdr, cmd, streams_ht);
break;
- case RELAYD_DATA_
AVAILABLE
:
- ret = relay_data_
available
(recv_hdr, cmd, streams_ht);
+ case RELAYD_DATA_
PENDING
:
+ ret = relay_data_
pending
(recv_hdr, cmd, streams_ht);
break;
case RELAYD_QUIESCENT_CONTROL:
ret = relay_quiescent_control(recv_hdr, cmd);
break;
case RELAYD_QUIESCENT_CONTROL:
ret = relay_quiescent_control(recv_hdr, cmd);
@@
-1481,9
+1493,11
@@
int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht)
data_size = be32toh(data_hdr.data_size);
if (data_buffer_size < data_size) {
data_size = be32toh(data_hdr.data_size);
if (data_buffer_size < data_size) {
+ char *tmp_data_ptr = data_buffer;
data_buffer = realloc(data_buffer, data_size);
if (!data_buffer) {
ERR("Allocating data buffer");
data_buffer = realloc(data_buffer, data_size);
if (!data_buffer) {
ERR("Allocating data buffer");
+ free(tmp_data_ptr);
ret = -1;
goto end_unlock;
}
ret = -1;
goto end_unlock;
}
@@
-1591,6
+1605,8
@@
void deferred_free_connection(struct rcu_head *head)
{
struct relay_command *relay_connection =
caa_container_of(head, struct relay_command, rcu_node);
{
struct relay_command *relay_connection =
caa_container_of(head, struct relay_command, rcu_node);
+
+ lttcomm_destroy_sock(relay_connection->sock);
free(relay_connection);
}
free(relay_connection);
}
@@
-1606,6
+1622,7
@@
void relay_del_connection(struct lttng_ht *relay_connections_ht,
if (relay_connection->type == RELAY_CONTROL) {
relay_delete_session(relay_connection, streams_ht);
}
if (relay_connection->type == RELAY_CONTROL) {
relay_delete_session(relay_connection, streams_ht);
}
+
call_rcu(&relay_connection->rcu_node,
deferred_free_connection);
}
call_rcu(&relay_connection->rcu_node,
deferred_free_connection);
}
This page took
0.038416 seconds
and
4
git commands to generate.