* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <getopt.h>
#include <grp.h>
#include <urcu/uatomic.h>
#include <unistd.h>
#include <fcntl.h>
-#include <config.h>
#include <lttng/lttng.h>
#include <common/common.h>
#include <common/sessiond-comm/relayd.h>
#include <common/uri.h>
#include <common/utils.h>
-#include <common/config/config.h>
+#include <common/config/session-config.h>
#include <urcu/rculist.h>
#include "cmd.h"
/*
* config_entry_handler_cb used to handle options read from a config file.
- * See config_entry_handler_cb comment in common/config/config.h for the
+ * See config_entry_handler_cb comment in common/config/session-config.h for the
* return value conventions.
*/
static int config_entry_handler(const struct config_entry *entry, void *unused)
* Set index data from the control port to a given index object.
*/
static int set_index_control_data(struct relay_index *index,
- struct lttcomm_relayd_index *data)
+ struct lttcomm_relayd_index *data,
+ struct relay_connection *conn)
{
struct ctf_packet_index index_data;
index_data.timestamp_end = data->timestamp_end;
index_data.events_discarded = data->events_discarded;
index_data.stream_id = data->stream_id;
+
+ if (conn->minor >= 8) {
+ index->index_data.stream_instance_id = data->stream_instance_id;
+ index->index_data.packet_seq_num = data->packet_seq_num;
+ }
+
return relay_index_set_data(index, &index_data);
}
int ret = 0, send_ret;
struct relay_session *session;
struct lttcomm_relayd_status_session reply;
- char session_name[NAME_MAX];
- char hostname[HOST_NAME_MAX];
+ char session_name[LTTNG_NAME_MAX];
+ char hostname[LTTNG_HOST_NAME_MAX];
uint32_t live_timer = 0;
bool snapshot = false;
- memset(session_name, 0, NAME_MAX);
- memset(hostname, 0, HOST_NAME_MAX);
+ memset(session_name, 0, LTTNG_NAME_MAX);
+ memset(hostname, 0, LTTNG_HOST_NAME_MAX);
memset(&reply, 0, sizeof(reply));
return ret;
}
+/*
+ * relay_reset_metadata: reset a metadata stream
+ */
+static
+int relay_reset_metadata(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret, send_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_reset_metadata stream_info;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+
+ DBG("Reset metadata received");
+
+ if (!session || conn->version_check_done == 0) {
+ ERR("Trying to reset a metadata stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = conn->sock->ops->recvmsg(conn->sock, &stream_info,
+ sizeof(struct lttcomm_relayd_reset_metadata), 0);
+ if (ret < sizeof(struct lttcomm_relayd_reset_metadata)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid reset_metadata struct "
+ "size : %d", ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+ DBG("Update metadata to version %" PRIu64, be64toh(stream_info.version));
+
+ /* Unsupported for live sessions for now. */
+ if (session->live_timer != 0) {
+ ret = -1;
+ goto end;
+ }
+
+ stream = stream_get_by_id(be64toh(stream_info.stream_id));
+ if (!stream) {
+ ret = -1;
+ goto end;
+ }
+ pthread_mutex_lock(&stream->lock);
+ if (!stream->is_metadata) {
+ ret = -1;
+ goto end_unlock;
+ }
+
+ ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
+ 0, 0, -1, -1, stream->stream_fd->fd, NULL,
+ &stream->stream_fd->fd);
+ if (ret < 0) {
+ ERR("Failed to rotate metadata file %s of channel %s",
+ stream->path_name, stream->channel_name);
+ goto end_unlock;
+ }
+
+end_unlock:
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+
+end:
+ memset(&reply, 0, sizeof(reply));
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+ sizeof(struct lttcomm_relayd_generic_reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending reset metadata reply");
+ ret = send_ret;
+ }
+
+end_no_session:
+ return ret;
+}
+
/*
* relay_unknown_command: send -1 if received unknown command
*/
ERR("relay_index_get_by_id_or_create index NULL");
goto end_stream_put;
}
- if (set_index_control_data(index, &index_info)) {
+ if (set_index_control_data(index, &index_info, conn)) {
ERR("set_index_control_data error");
relay_index_put(index);
ret = -1;
case RELAYD_STREAMS_SENT:
ret = relay_streams_sent(recv_hdr, conn);
break;
+ case RELAYD_RESET_METADATA:
+ ret = relay_reset_metadata(recv_hdr, conn);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
/* Get data offset because we are about to update the index. */
data_offset = htobe64(stream->tracefile_size_current);
- DBG("handle_index_data: stream %" PRIu64 " data offset %" PRIu64,
- stream->stream_handle, stream->tracefile_size_current);
+ DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
+ stream->stream_handle, net_seq_num, stream->tracefile_size_current);
/*
* Lookup for an existing index for that stream id/sequence
stream_id = be64toh(data_hdr.stream_id);
stream = stream_get_by_id(stream_id);
if (!stream) {
+ ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id);
ret = -1;
goto end;
}
if (ret == 0) {
/* Orderly shutdown. Not necessary to print an error. */
DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Socket %d error %d", conn->sock->fd, ret);
}
ret = -1;
goto end_stream_put;
if (session->minor >= 4 && !session->snapshot) {
ret = handle_index_data(stream, net_seq_num, rotate_index);
if (ret < 0) {
+ ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ stream->stream_handle, net_seq_num, ret);
goto end_stream_unlock;
}
}
ret = write_padding_to_file(stream->stream_fd->fd,
be32toh(data_hdr.padding_size));
if (ret < 0) {
+ ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ stream->stream_handle, net_seq_num, ret);
goto end_stream_unlock;
}
stream->tracefile_size_current +=