}
} else {
if (!stream->closed ||
- !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
+ !(((int64_t) (stream->prev_data_seq - stream->last_net_seq_num)) >= 0)) {
(*nb_total)++;
}
if (ret < 0) {
goto error;
}
- DBG("Listening on sock %d for live", sock->fd);
+ DBG("Listening on sock %d for lttng-live", sock->fd);
ret = sock->ops->bind(sock);
if (ret < 0) {
+ PERROR("Failed to bind lttng-live socket");
goto error;
}
revents = LTTNG_POLL_GETEV(&events, i);
pollfd = LTTNG_POLL_GETFD(&events, i);
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
health_code_update();
- while (!CMM_LOAD_SHARED(live_dispatch_thread_exit)) {
+ for (;;) {
health_code_update();
/* Atomically prepare the queue futex */
futex_nto1_prepare(&viewer_conn_queue.futex);
+ if (CMM_LOAD_SHARED(live_dispatch_thread_exit)) {
+ break;
+ }
+
do {
health_code_update();
health_code_update();
+ if (session->connection_closed) {
+ /* Skip closed session */
+ continue;
+ }
+
if (count >= buf_count) {
struct lttng_viewer_session *newbuf;
uint32_t new_buf_count = buf_count << 1;
session = session_get_by_id(be64toh(request.session_id));
if (!session) {
DBG("Relay session %" PRIu64 " not found",
- be64toh(request.session_id));
+ (uint64_t) be64toh(request.session_id));
response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
goto send_reply;
}
DBG("Attach session ID %" PRIu64 " received",
- be64toh(request.session_id));
+ (uint64_t) be64toh(request.session_id));
if (session->live_timer == 0) {
DBG("Not live session");
if (closed) {
send_streams = 0;
response.streams_count = 0;
- response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
+ response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
goto send_reply;
}
struct relay_stream *rstream)
{
int ret = 0;
+ const uint32_t connection_major = rstream->trace->session->major;
+ const uint32_t connection_minor = rstream->trace->session->minor;
if (vstream->index_file) {
goto end;
ret = -ENOENT;
goto end;
}
- vstream->index_file = lttng_index_file_open(vstream->path_name,
- vstream->channel_name,
- vstream->stream->tracefile_count,
- vstream->current_tracefile_id);
+ vstream->index_file = lttng_index_file_create_from_trace_chunk_read_only(
+ rstream->trace_chunk, rstream->path_name,
+ rstream->channel_name, rstream->tracefile_size,
+ vstream->current_tracefile_id,
+ lttng_to_index_major(connection_major, connection_minor),
+ lttng_to_index_minor(connection_major, connection_minor));
if (!vstream->index_file) {
ret = -1;
}
{
int ret;
- if (trace->session->connection_closed
+ if ((trace->session->connection_closed || rstream->closed)
&& rstream->index_received_seqcount
== vstream->index_sent_seqcount) {
- /* Last index sent and session connection is closed. */
+ /*
+ * Last index sent and session connection or relay
+ * stream are closed.
+ */
index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
goto hup;
} else if (rstream->beacon_ts_end != -1ULL &&
vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
if (!vstream) {
DBG("Client requested index of unknown stream id %" PRIu64,
- be64toh(request_index.stream_id));
+ (uint64_t) be64toh(request_index.stream_id));
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
goto send_reply;
}
* overwrite caused by tracefile rotation (in association with
* unlink performed before overwrite).
*/
- if (!vstream->stream_fd) {
- char fullpath[PATH_MAX];
-
- if (vstream->stream->tracefile_count > 0) {
- ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64,
- vstream->path_name,
- vstream->channel_name,
- vstream->current_tracefile_id);
- } else {
- ret = snprintf(fullpath, PATH_MAX, "%s/%s",
- vstream->path_name,
- vstream->channel_name);
- }
+ if (!vstream->stream_file.fd) {
+ int fd;
+ char file_path[LTTNG_PATH_MAX];
+ enum lttng_trace_chunk_status status;
+
+ ret = utils_stream_file_path(rstream->path_name,
+ rstream->channel_name, rstream->tracefile_size,
+ vstream->current_tracefile_id, NULL, file_path,
+ sizeof(file_path));
if (ret < 0) {
goto error_put;
}
- ret = open(fullpath, O_RDONLY);
- if (ret < 0) {
- PERROR("Relay opening trace file");
+
+ status = lttng_trace_chunk_open_file(
+ vstream->stream_file.trace_chunk,
+ file_path, O_RDONLY, 0, &fd);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ PERROR("Failed to open trace file for viewer stream");
goto error_put;
}
- vstream->stream_fd = stream_fd_create(ret);
- if (!vstream->stream_fd) {
- if (close(ret)) {
- PERROR("close");
+ vstream->stream_file.fd = stream_fd_create(fd);
+ if (!vstream->stream_file.fd) {
+ if (close(fd)) {
+ PERROR("Failed to close viewer stream file");
}
goto error_put;
}
*/
DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64,
rstream->stream_handle,
- be64toh(packet_index.offset));
+ (uint64_t) be64toh(packet_index.offset));
viewer_index.offset = packet_index.offset;
viewer_index.packet_size = packet_index.packet_size;
viewer_index.content_size = packet_index.content_size;
static
int viewer_get_packet(struct relay_connection *conn)
{
- int ret, send_data = 0;
- char *data = NULL;
- uint32_t len = 0;
- ssize_t read_len;
+ int ret;
+ off_t lseek_ret;
+ char *reply = NULL;
struct lttng_viewer_get_packet get_packet_info;
- struct lttng_viewer_trace_packet reply;
+ struct lttng_viewer_trace_packet reply_header;
struct relay_viewer_stream *vstream = NULL;
+ uint32_t reply_size = sizeof(reply_header);
+ uint32_t packet_data_len = 0;
+ ssize_t read_len;
DBG2("Relay get data packet");
health_code_update();
/* From this point on, the error label can be reached. */
- memset(&reply, 0, sizeof(reply));
+ memset(&reply_header, 0, sizeof(reply_header));
vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
if (!vstream) {
DBG("Client requested packet of unknown stream id %" PRIu64,
- be64toh(get_packet_info.stream_id));
- reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+ (uint64_t) be64toh(get_packet_info.stream_id));
+ reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
goto send_reply_nolock;
+ } else {
+ packet_data_len = be32toh(get_packet_info.len);
+ reply_size += packet_data_len;
}
- pthread_mutex_lock(&vstream->stream->lock);
-
- len = be32toh(get_packet_info.len);
- data = zmalloc(len);
- if (!data) {
- PERROR("relay data zmalloc");
+ reply = zmalloc(reply_size);
+ if (!reply) {
+ PERROR("packet reply zmalloc");
+ reply_size = sizeof(reply_header);
goto error;
}
- ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
- SEEK_SET);
- if (ret < 0) {
- PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd,
- be64toh(get_packet_info.offset));
+ pthread_mutex_lock(&vstream->stream->lock);
+ lseek_ret = lseek(vstream->stream_file.fd->fd,
+ be64toh(get_packet_info.offset), SEEK_SET);
+ if (lseek_ret < 0) {
+ PERROR("lseek fd %d to offset %" PRIu64,
+ vstream->stream_file.fd->fd,
+ (uint64_t) be64toh(get_packet_info.offset));
goto error;
}
- read_len = lttng_read(vstream->stream_fd->fd, data, len);
- if (read_len < len) {
+ read_len = lttng_read(vstream->stream_file.fd->fd,
+ reply + sizeof(reply_header), packet_data_len);
+ if (read_len < packet_data_len) {
PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
- vstream->stream_fd->fd,
- be64toh(get_packet_info.offset));
+ vstream->stream_file.fd->fd,
+ (uint64_t) be64toh(get_packet_info.offset));
goto error;
}
- reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
- reply.len = htobe32(len);
- send_data = 1;
+ reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
+ reply_header.len = htobe32(packet_data_len);
goto send_reply;
error:
- reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+ reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
send_reply:
if (vstream) {
pthread_mutex_unlock(&vstream->stream->lock);
}
send_reply_nolock:
- reply.flags = htobe32(reply.flags);
health_code_update();
- ret = send_response(conn->sock, &reply, sizeof(reply));
- if (ret < 0) {
- goto end_free;
+ if (reply) {
+ memcpy(reply, &reply_header, sizeof(reply_header));
+ ret = send_response(conn->sock, reply, reply_size);
+ } else {
+ /* No reply to send. */
+ ret = send_response(conn->sock, &reply_header,
+ reply_size);
}
- health_code_update();
- if (send_data) {
- health_code_update();
- ret = send_response(conn->sock, data, len);
- if (ret < 0) {
- goto end_free;
- }
- health_code_update();
+ health_code_update();
+ if (ret < 0) {
+ PERROR("sendmsg of packet data failed");
+ goto end_free;
}
- DBG("Sent %u bytes for stream %" PRIu64, len,
- be64toh(get_packet_info.stream_id));
+ DBG("Sent %u bytes for stream %" PRIu64, reply_size,
+ (uint64_t) be64toh(get_packet_info.stream_id));
end_free:
- free(data);
+ free(reply);
end:
if (vstream) {
viewer_stream_put(vstream);
* find it.
*/
DBG("Client requested metadata of unknown stream id %" PRIu64,
- be64toh(request.stream_id));
+ (uint64_t) be64toh(request.stream_id));
reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
goto send_reply;
}
}
/* first time, we open the metadata file */
- if (!vstream->stream_fd) {
- char fullpath[PATH_MAX];
-
- ret = snprintf(fullpath, PATH_MAX, "%s/%s", vstream->path_name,
- vstream->channel_name);
+ if (!vstream->stream_file.fd) {
+ int fd;
+ char file_path[LTTNG_PATH_MAX];
+ enum lttng_trace_chunk_status status;
+ struct relay_stream *rstream = vstream->stream;
+
+ ret = utils_stream_file_path(rstream->path_name,
+ rstream->channel_name, rstream->tracefile_size,
+ vstream->current_tracefile_id, NULL, file_path,
+ sizeof(file_path));
if (ret < 0) {
goto error;
}
- ret = open(fullpath, O_RDONLY);
- if (ret < 0) {
- PERROR("Relay opening metadata file");
+
+ status = lttng_trace_chunk_open_file(
+ vstream->stream_file.trace_chunk,
+ file_path, O_RDONLY, 0, &fd);
+ if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ PERROR("Failed to open metadata file for viewer stream");
goto error;
}
- vstream->stream_fd = stream_fd_create(ret);
- if (!vstream->stream_fd) {
- if (close(ret)) {
- PERROR("close");
+ vstream->stream_file.fd = stream_fd_create(fd);
+ if (!vstream->stream_file.fd) {
+ if (close(fd)) {
+ PERROR("Failed to close viewer metadata file");
}
goto error;
}
goto error;
}
- read_len = lttng_read(vstream->stream_fd->fd, data, len);
+ read_len = lttng_read(vstream->stream_file.fd->fd, data, len);
if (read_len < len) {
PERROR("Relay reading metadata file");
goto error;
}
DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, len,
- be64toh(request.stream_id));
+ (uint64_t) be64toh(request.stream_id));
DBG("Metadata sent");
session = session_get_by_id(be64toh(request.session_id));
if (!session) {
DBG("Relay session %" PRIu64 " not found",
- be64toh(request.session_id));
+ (uint64_t) be64toh(request.session_id));
response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_UNK);
goto send_reply;
}
health_code_update();
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
if (ret) {
error:
lttng_poll_clean(&events);
- /* Cleanup reamaining connection object. */
+ /* Cleanup remaining connection object. */
rcu_read_lock();
cds_lfht_for_each_entry(viewer_connections_ht->ht, &iter.iter,
destroy_conn,