*/
#define _GNU_SOURCE
+#define _LGPL_SOURCE
#include <getopt.h>
#include <grp.h>
#include <limits.h>
#include <common/common.h>
#include <common/compat/poll.h>
#include <common/compat/socket.h>
+#include <common/compat/endian.h>
#include <common/defaults.h>
#include <common/daemonize.h>
#include <common/futex.h>
break;
case 'g':
tracing_group_name = strdup(arg);
+ if (tracing_group_name == NULL) {
+ ret = -errno;
+ PERROR("strdup");
+ goto end;
+ }
tracing_group_name_override = 1;
break;
case 'h':
if (arg) {
lttng_opt_verbose = config_parse_value(arg);
} else {
- lttng_opt_verbose += 1;
+ /* Only 3 level of verbosity (-vvv). */
+ if (lttng_opt_verbose < 3) {
+ lttng_opt_verbose += 1;
+ }
}
break;
default:
/* assign default values */
if (control_uri == NULL) {
- ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
- DEFAULT_NETWORK_CONTROL_PORT);
+ ret = asprintf(&default_address,
+ "tcp://" DEFAULT_NETWORK_CONTROL_BIND_ADDRESS ":%d",
+ DEFAULT_NETWORK_CONTROL_PORT);
if (ret < 0) {
PERROR("asprintf default data address");
goto exit;
}
}
if (data_uri == NULL) {
- ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
- DEFAULT_NETWORK_DATA_PORT);
+ ret = asprintf(&default_address,
+ "tcp://" DEFAULT_NETWORK_DATA_BIND_ADDRESS ":%d",
+ DEFAULT_NETWORK_DATA_PORT);
if (ret < 0) {
PERROR("asprintf default data address");
goto exit;
}
}
if (live_uri == NULL) {
- ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
- DEFAULT_NETWORK_VIEWER_PORT);
+ ret = asprintf(&default_address,
+ "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS ":%d",
+ DEFAULT_NETWORK_VIEWER_PORT);
if (ret < 0) {
PERROR("asprintf default viewer control address");
goto exit;
pthread_mutex_unlock(&session->viewer_ready_lock);
ret = stream_close(session, stream);
- if (!ret) {
+ if (ret || session->snapshot) {
/* Already close thus the ctf trace is being or has been destroyed. */
goto end;
}
new_conn->sock = newsock;
/* Enqueue request for the dispatcher thread. */
- cds_wfq_enqueue(&relay_conn_queue.queue, &new_conn->qnode);
+ cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail,
+ &new_conn->qnode);
/*
* Wake the dispatch queue futex. Implicit memory barrier with
- * the exchange in cds_wfq_enqueue.
+ * the exchange in cds_wfcq_enqueue.
*/
futex_nto1_wake(&relay_conn_queue.futex);
}
{
int err = -1;
ssize_t ret;
- struct cds_wfq_node *node;
+ struct cds_wfcq_node *node;
struct relay_connection *new_conn = NULL;
DBG("[thread] Relay dispatcher started");
health_code_update();
/* Dequeue commands */
- node = cds_wfq_dequeue_blocking(&relay_conn_queue.queue);
+ node = cds_wfcq_dequeue_blocking(&relay_conn_queue.head,
+ &relay_conn_queue.tail);
if (node == NULL) {
DBG("Woken up but nothing in the relay command queue");
/* Continue thread execution */
stream->session_id = session->id;
stream->index_fd = -1;
stream->read_index_fd = -1;
+ stream->ctf_stream_id = -1ULL;
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
stream->stream_handle);
end:
+ memset(&reply, 0, sizeof(reply));
reply.handle = htobe64(stream->stream_handle);
/* send the session id to the client or a negative return code on error */
if (ret < 0) {
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
stream->close_flag = 1;
session->stream_count--;
- assert(session->stream_count >= 0);
/* Check if we can close it or else the data will do it. */
try_close_stream(session, stream);
end_unlock:
rcu_read_unlock();
+ memset(&reply, 0, sizeof(reply));
if (ret < 0) {
reply.ret_code = htobe32(LTTNG_ERR_UNK);
} else {
struct lttcomm_relayd_generic_reply reply;
int ret;
+ memset(&reply, 0, sizeof(reply));
reply.ret_code = htobe32(LTTNG_ERR_UNK);
ret = conn->sock->ops->sendmsg(conn->sock, &reply,
sizeof(struct lttcomm_relayd_generic_reply), 0);
ret = htobe32(LTTNG_ERR_UNK);
}
+ memset(&reply, 0, sizeof(reply));
reply.ret_code = ret;
ret = conn->sock->ops->sendmsg(conn->sock, &reply,
sizeof(struct lttcomm_relayd_generic_reply), 0);
goto end;
}
+ memset(&reply, 0, sizeof(reply));
reply.major = RELAYD_VERSION_COMM_MAJOR;
reply.minor = RELAYD_VERSION_COMM_MINOR;
end_unlock:
rcu_read_unlock();
+ memset(&reply, 0, sizeof(reply));
reply.ret_code = htobe32(ret);
ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
if (ret < 0) {
}
rcu_read_unlock();
+ memset(&reply, 0, sizeof(reply));
reply.ret_code = htobe32(LTTNG_OK);
ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
if (ret < 0) {
}
rcu_read_unlock();
+ memset(&reply, 0, sizeof(reply));
/* All good, send back reply. */
reply.ret_code = htobe32(LTTNG_OK);
cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
node.node) {
if (stream->session_id == session_id &&
- !stream->data_pending_check_done) {
+ !stream->data_pending_check_done && !stream->terminated_flag) {
is_data_inflight = 1;
DBG("Data is still in flight for stream %" PRIu64,
stream->stream_handle);
}
rcu_read_unlock();
+ memset(&reply, 0, sizeof(reply));
/* All good, send back reply. */
reply.ret_code = htobe32(is_data_inflight);
DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
/*
- * Only flag a stream inactive when it has already received data.
+ * Only flag a stream inactive when it has already received data
+ * and no indexes are in flight.
*/
- if (stream->total_index_received > 0) {
+ if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
stream->beacon_ts_end = be64toh(index_info.timestamp_end);
}
ret = 0;
goto end_rcu_unlock;
}
index_created = 1;
+ stream->indexes_in_flight++;
}
copy_index_control_data(index, &index_info);
+ if (stream->ctf_stream_id == -1ULL) {
+ stream->ctf_stream_id = be64toh(index_info.stream_id);
+ }
if (index_created) {
/*
/* Do we have a writable ready index to write on disk. */
if (wr_index) {
- /* Starting at 2.4, create the index file if none available. */
- if (conn->minor >= 4 && stream->index_fd < 0) {
- ret = index_create_file(stream->path_name, stream->channel_name,
- relayd_uid, relayd_gid, stream->tracefile_size,
- stream->tracefile_count_current);
- if (ret < 0) {
- goto end_rcu_unlock;
- }
- stream->index_fd = ret;
- }
-
ret = relay_index_write(wr_index->fd, wr_index);
if (ret < 0) {
goto end_rcu_unlock;
}
stream->total_index_received++;
+ stream->indexes_in_flight--;
+ assert(stream->indexes_in_flight >= 0);
}
end_rcu_unlock:
rcu_read_unlock();
+ memset(&reply, 0, sizeof(reply));
if (ret < 0) {
reply.ret_code = htobe32(LTTNG_ERR_UNK);
} else {
/*
* Inform the viewer that there are new streams in the session.
*/
- uatomic_set(&conn->session->new_streams, 1);
+ if (conn->session->viewer_refcount) {
+ uatomic_set(&conn->session->new_streams, 1);
+ }
+ memset(&reply, 0, sizeof(reply));
reply.ret_code = htobe32(LTTNG_OK);
send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
if (send_ret < 0) {
goto error;
}
index_created = 1;
+ stream->indexes_in_flight++;
}
if (rotate_index || stream->index_fd < 0) {
goto error;
}
stream->total_index_received++;
+ stream->indexes_in_flight--;
+ assert(stream->indexes_in_flight >= 0);
}
error:
stream->tracefile_size, stream->tracefile_count,
relayd_uid, relayd_gid, stream->fd,
&(stream->tracefile_count_current), &stream->fd);
- stream->total_index_received = 0;
pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
if (ret < 0) {
ERR("Rotating stream output file");
connection_delete(relay_connections_ht, conn);
/* For the control socket, we try to destroy the session. */
- if (conn->type == RELAY_CONTROL) {
+ if (conn->type == RELAY_CONTROL && conn->session) {
destroy_session(conn->session, conn->sessions_ht);
}
struct lttcomm_relayd_hdr recv_hdr;
struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
+ struct relay_index *index;
DBG("[thread] Relay worker started");
if (revents & LPOLLIN) {
if (conn->type != RELAY_DATA) {
+ rcu_read_unlock();
continue;
}
}
rcu_read_unlock();
error_poll_create:
+ rcu_read_lock();
+ cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index,
+ index_n.node) {
+ health_code_update();
+ relay_index_delete(index);
+ relay_index_free_safe(index);
+ }
+ rcu_read_unlock();
lttng_ht_destroy(indexes_ht);
indexes_ht_error:
lttng_ht_destroy(relay_connections_ht);
}
/* Init relay command queue. */
- cds_wfq_init(&relay_conn_queue.queue);
+ cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail);
/* Set up max poll set size */
lttng_poll_set_max_size();