static
int notify_thread_pipe(int wpipe)
{
- int ret;
+ ssize_t ret;
- do {
- ret = write(wpipe, "!", 1);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != 1) {
+ ret = lttng_write(wpipe, "!", 1);
+ if (ret < 1) {
PERROR("write poll pipe");
}
- return ret;
+ return (int) ret;
}
/*
static
void *thread_dispatcher(void *data)
{
- int ret, err = -1;
+ int err = -1;
+ ssize_t ret;
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
* so we can be assured that the data will be read at some point in
* time or wait to the end of the world :)
*/
- do {
- ret = write(live_relay_cmd_pipe[1], relay_cmd,
- sizeof(*relay_cmd));
- } while (ret < 0 && errno == EINTR);
+ ret = lttng_write(live_relay_cmd_pipe[1], relay_cmd,
+ sizeof(*relay_cmd));
free(relay_cmd);
- if (ret < 0 || ret != sizeof(struct relay_command)) {
+ if (ret < sizeof(struct relay_command)) {
PERROR("write cmd pipe");
goto error;
}
return ret;
}
+/*
+ * Open index file using a given viewer stream.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int open_index(struct relay_viewer_stream *stream)
+{
+ int ret;
+ char fullpath[PATH_MAX];
+ struct lttng_packet_index_file_hdr hdr;
+
+ if (stream->tracefile_size > 0) {
+ /* For now we don't support on-disk ring buffer. */
+ ret = -1;
+ goto end;
+ }
+
+ ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s"
+ DEFAULT_INDEX_FILE_SUFFIX, stream->path_name,
+ stream->channel_name);
+ if (ret < 0) {
+ PERROR("snprintf index path");
+ goto error;
+ }
+
+ DBG("Opening index file %s in read only", fullpath);
+ ret = open(fullpath, O_RDONLY);
+ if (ret < 0) {
+ if (errno == ENOENT) {
+ ret = -ENOENT;
+ goto error;
+ } else {
+ PERROR("opening index in read-only");
+ }
+ goto error;
+ }
+ stream->index_read_fd = ret;
+ DBG("Opening index file %s in read only, (fd: %d)", fullpath, ret);
+
+ ret = lttng_read(stream->index_read_fd, &hdr, sizeof(hdr));
+ if (ret < sizeof(hdr)) {
+ PERROR("Reading index header");
+ goto error;
+ }
+ if (strncmp(hdr.magic, INDEX_MAGIC, sizeof(hdr.magic)) != 0) {
+ ERR("Invalid header magic");
+ ret = -1;
+ goto error;
+ }
+ if (be32toh(hdr.index_major) != INDEX_MAJOR ||
+ be32toh(hdr.index_minor) != INDEX_MINOR) {
+ ERR("Invalid header version");
+ ret = -1;
+ goto error;
+ }
+ ret = 0;
+
+error:
+end:
+ return ret;
+}
+
/*
* Allocate and init a new viewer_stream.
*
* Returns 0 on success or a negative value on error.
*/
static
-int init_viewer_stream(struct relay_stream *stream)
+int init_viewer_stream(struct relay_stream *stream, int seek_last)
{
int ret;
struct relay_viewer_stream *viewer_stream;
viewer_stream->tracefile_count = stream->tracefile_count;
viewer_stream->metadata_flag = stream->metadata_flag;
+ if (seek_last && viewer_stream->total_index_received > 0) {
+ ret = open_index(viewer_stream);
+ if (ret < 0) {
+ goto error;
+ }
+ ret = lseek(viewer_stream->index_read_fd,
+ viewer_stream->total_index_received *
+ sizeof(struct lttng_packet_index),
+ SEEK_CUR);
+ if (ret < 0) {
+ goto error;
+ }
+ viewer_stream->last_sent_index =
+ viewer_stream->total_index_received;
+ }
+
/*
* This is to avoid a race between the initialization of this object and
* the close of the given stream. If the stream is unable to find this
struct lttng_ht_node_u64 *node64;
struct lttng_ht_iter iter;
struct relay_session *session;
+ int seek_last = 0;
assert(cmd);
assert(sessions_ht);
/* Default behaviour. */
break;
case VIEWER_SEEK_LAST:
- /* TODO */
+ seek_last = 1;
break;
default:
ERR("Wrong seek parameter");
vstream = live_find_viewer_stream_by_id(stream->stream_handle);
if (!vstream) {
- ret = init_viewer_stream(stream);
+ ret = init_viewer_stream(stream, seek_last);
if (ret < 0) {
goto end_unlock;
}
return ret;
}
-/*
- * Open index file using a given viewer stream.
- *
- * Return 0 on success or else a negative value.
- */
-static int open_index(struct relay_viewer_stream *stream)
-{
- int ret;
- char fullpath[PATH_MAX];
- struct lttng_packet_index_file_hdr hdr;
-
- if (stream->tracefile_size > 0) {
- /* For now we don't support on-disk ring buffer. */
- ret = -1;
- goto end;
- } else {
- ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR
- "/%s" DEFAULT_INDEX_FILE_SUFFIX,
- stream->path_name, stream->channel_name);
- if (ret < 0) {
- PERROR("snprintf index path");
- goto error;
- }
- }
-
- DBG("Opening index file %s in read only", fullpath);
- ret = open(fullpath, O_RDONLY);
- if (ret < 0) {
- if (errno == ENOENT) {
- ret = ENOENT;
- goto error;
- } else {
- PERROR("opening index in read-only");
- }
- goto error;
- }
- stream->index_read_fd = ret;
- DBG("Opening index file %s in read only, (fd: %d)", fullpath, ret);
-
- do {
- health_code_update();
- ret = read(stream->index_read_fd, &hdr, sizeof(hdr));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("Reading index header");
- goto error;
- }
- if (strncmp(hdr.magic, INDEX_MAGIC, sizeof(hdr.magic)) != 0) {
- ERR("Invalid header magic");
- ret = -1;
- goto error;
- }
- if (be32toh(hdr.index_major) != INDEX_MAJOR ||
- be32toh(hdr.index_minor) != INDEX_MINOR) {
- ERR("Invalid header version");
- ret = -1;
- goto error;
- }
- ret = 0;
-
-error:
-end:
- return ret;
-}
-
/*
* Get viewer stream from stream id.
*
/* First time, we open the index file */
if (vstream->index_read_fd < 0) {
ret = open_index(vstream);
- if (ret == ENOENT) {
+ if (ret == -ENOENT) {
/*
* The index is created only when the first data packet arrives, it
* might not be ready at the beginning of the session
viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
}
- do {
- health_code_update();
- ret = read(vstream->index_read_fd, &packet_index,
- sizeof(packet_index));
- } while (ret < 0 && errno == EINTR);
+ ret = lttng_read(vstream->index_read_fd, &packet_index,
+ sizeof(packet_index));
if (ret < sizeof(packet_index)) {
PERROR("Relay reading index file");
viewer_index.status = htobe32(VIEWER_INDEX_ERR);
PERROR("lseek");
goto error;
}
- read_len = read(stream->read_fd, data, len);
- if (read_len < (ssize_t) len) {
+ read_len = lttng_read(stream->read_fd, data, len);
+ if (read_len < len) {
PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
stream->read_fd, be64toh(get_packet_info.offset));
goto error;
goto error;
}
- read_len = read(stream->read_fd, data, len);
- if (read_len < (ssize_t) len) {
+ read_len = lttng_read(stream->read_fd, data, len);
+ if (read_len < len) {
PERROR("Relay reading metadata file");
goto error;
}
goto error;
}
- do {
- health_code_update();
- ret = read(fd, relay_connection, sizeof(*relay_connection));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret < sizeof(*relay_connection)) {
+ ret = lttng_read(fd, relay_connection, sizeof(*relay_connection));
+ if (ret < sizeof(*relay_connection)) {
PERROR("read relay cmd pipe");
goto error_read;
}
/* connection closed */
if (ret <= 0) {
cleanup_poll_connection(&events, pollfd);
- del_connection( relay_connections_ht, &iter,
+ del_connection(relay_connections_ht, &iter,
relay_connection);
DBG("Viewer control connection closed with %d",
pollfd);
return ret;
}
-void live_stop_threads()
+void live_stop_threads(void)
{
int ret;
void *status;