conn->minor = be32toh(msg.minor);
}
- if (be32toh(msg.type) == VIEWER_CLIENT_COMMAND) {
+ if (be32toh(msg.type) == LTTNG_VIEWER_CLIENT_COMMAND) {
conn->type = RELAY_VIEWER_COMMAND;
- } else if (be32toh(msg.type) == VIEWER_CLIENT_NOTIFICATION) {
+ } else if (be32toh(msg.type) == LTTNG_VIEWER_CLIENT_NOTIFICATION) {
conn->type = RELAY_VIEWER_NOTIFICATION;
} else {
ERR("Unknown connection type : %u", be32toh(msg.type));
if (!session) {
DBG("Relay session %" PRIu64 " not found",
be64toh(request.session_id));
- response.status = htobe32(VIEWER_NEW_STREAMS_ERR);
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
goto send_reply;
}
if (conn->session_id == session->id) {
/* We confirmed the viewer is asking for the same session. */
send_streams = 1;
- response.status = htobe32(VIEWER_NEW_STREAMS_OK);
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
} else {
send_streams = 0;
- response.status = htobe32(VIEWER_NEW_STREAMS_ERR);
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
goto send_reply;
}
goto send_reply;
}
- ret = make_viewer_streams(session, VIEWER_SEEK_LAST, NULL, &nb_unsent,
+ ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, NULL, &nb_unsent,
&nb_created);
if (ret < 0) {
goto end_unlock;
health_code_update();
+ if (!conn->viewer_session) {
+ DBG("Client trying to attach before creating a live viewer session");
+ response.status = htobe32(LTTNG_VIEWER_ATTACH_NO_SESSION);
+ goto send_reply;
+ }
+
rcu_read_lock();
session = session_find_by_id(conn->sessions_ht,
be64toh(request.session_id));
if (!session) {
DBG("Relay session %" PRIu64 " not found",
be64toh(request.session_id));
- response.status = htobe32(VIEWER_ATTACH_UNK);
+ response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
goto send_reply;
}
session_viewer_attach(session);
if (uatomic_read(&session->viewer_refcount) > 1) {
DBG("Already a viewer attached");
- response.status = htobe32(VIEWER_ATTACH_ALREADY);
+ response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY);
session_viewer_detach(session);
goto send_reply;
} else if (session->live_timer == 0) {
DBG("Not live session");
- response.status = htobe32(VIEWER_ATTACH_NOT_LIVE);
+ response.status = htobe32(LTTNG_VIEWER_ATTACH_NOT_LIVE);
goto send_reply;
} else {
send_streams = 1;
- response.status = htobe32(VIEWER_ATTACH_OK);
+ response.status = htobe32(LTTNG_VIEWER_ATTACH_OK);
conn->session_id = session->id;
conn->session = session;
}
switch (be32toh(request.seek)) {
- case VIEWER_SEEK_BEGINNING:
- case VIEWER_SEEK_LAST:
+ case LTTNG_VIEWER_SEEK_BEGINNING:
+ case LTTNG_VIEWER_SEEK_LAST:
seek_type = be32toh(request.seek);
break;
default:
ERR("Wrong seek parameter");
- response.status = htobe32(VIEWER_ATTACH_SEEK_ERR);
+ response.status = htobe32(LTTNG_VIEWER_ATTACH_SEEK_ERR);
send_streams = 0;
goto send_reply;
}
* The viewer should not ask for index on metadata stream.
*/
if (vstream->metadata_flag) {
- viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
goto send_reply;
}
* The index is created only when the first data packet arrives, it
* might not be ready at the beginning of the session
*/
- viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
goto send_reply;
} else if (ret < 0) {
- viewer_index.status = htobe32(VIEWER_INDEX_ERR);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
goto send_reply;
}
vstream->index_read_fd = ret;
if (ret < 0) {
goto end_unlock;
} else if (ret == 1) {
- viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
viewer_stream_destroy(ctf_trace, vstream);
goto send_reply;
if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
if (rstream->beacon_ts_end != -1ULL &&
vstream->last_sent_index == rstream->total_index_received) {
- viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
goto send_reply;
*/
pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
/* No new index to send, retry later. */
- viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
goto send_reply;
}
}
} else if (rstream->close_flag && vstream->close_write_flag &&
vstream->total_index_received == vstream->last_sent_index) {
/* Last index sent and current tracefile closed in write */
- viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
viewer_stream_destroy(ctf_trace, vstream);
goto send_reply;
/*
* The file is being overwritten by the writer, we cannot * use it.
*/
- viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
pthread_mutex_unlock(&vstream->overwrite_lock);
ret = viewer_stream_rotate(vstream, rstream);
if (ret < 0) {
goto end_unlock;
} else if (ret == 1) {
- viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
viewer_stream_destroy(ctf_trace, vstream);
goto send_reply;
* The tracefile is closed in write, so we read up to EOF.
*/
if (vstream->close_write_flag == 1) {
- viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
/* Rotate on normal EOF */
ret = viewer_stream_rotate(vstream, rstream);
if (ret < 0) {
goto end_unlock;
} else if (ret == 1) {
- viewer_index.status = htobe32(VIEWER_INDEX_HUP);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
viewer_stream_delete(vstream);
viewer_stream_destroy(ctf_trace, vstream);
goto send_reply;
}
} else {
PERROR("Relay reading index file %d", vstream->index_read_fd);
- viewer_index.status = htobe32(VIEWER_INDEX_ERR);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
}
goto send_reply;
} else {
- viewer_index.status = htobe32(VIEWER_INDEX_OK);
+ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
vstream->last_sent_index++;
}
if (!ctf_trace->metadata_received ||
ctf_trace->metadata_received > ctf_trace->metadata_sent) {
- reply.status = htobe32(VIEWER_GET_PACKET_ERR);
+ reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
reply.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
goto send_reply;
}
if (ret < 0) {
goto end_unlock;
} else if (ret == 1) {
- reply.status = htobe32(VIEWER_GET_PACKET_ERR);
+ reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
reply.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
goto send_reply;
}
PERROR("lseek");
goto error;
}
- reply.status = htobe32(VIEWER_GET_PACKET_EOF);
+ reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
goto send_reply;
}
read_len = lttng_read(stream->read_fd, data, len);
be64toh(get_packet_info.offset));
goto error;
} else {
- reply.status = htobe32(VIEWER_GET_PACKET_EOF);
+ reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_EOF);
goto send_reply;
}
}
- reply.status = htobe32(VIEWER_GET_PACKET_OK);
+ reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
reply.len = htobe32(len);
send_data = 1;
goto send_reply;
error:
- reply.status = htobe32(VIEWER_GET_PACKET_ERR);
+ reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
send_reply:
reply.flags = htobe32(reply.flags);
len = ctf_trace->metadata_received - ctf_trace->metadata_sent;
if (len == 0) {
- reply.status = htobe32(VIEWER_NO_NEW_METADATA);
+ reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
goto send_reply;
}
goto error;
}
ctf_trace->metadata_sent += read_len;
- reply.status = htobe32(VIEWER_METADATA_OK);
+ reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
goto send_reply;
error:
- reply.status = htobe32(VIEWER_METADATA_ERR);
+ reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
send_reply:
health_code_update();
return ret;
}
+/*
+ * Create a viewer session.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static
+int viewer_create_session(struct relay_connection *conn)
+{
+ int ret;
+ struct lttng_viewer_create_session_response resp;
+
+ DBG("Viewer create session received");
+
+ resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_OK);
+ conn->viewer_session = zmalloc(sizeof(conn->viewer_session));
+ if (!conn->viewer_session) {
+ ERR("Allocation viewer session");
+ resp.status = htobe32(LTTNG_VIEWER_CREATE_SESSION_ERR);
+ goto send_reply;
+ }
+ CDS_INIT_LIST_HEAD(&conn->viewer_session->sessions_head);
+
+send_reply:
+ health_code_update();
+ ret = send_response(conn->sock, &resp, sizeof(resp));
+ if (ret < 0) {
+ goto end;
+ }
+ health_code_update();
+ ret = 0;
+
+end:
+ return ret;
+}
+
+
/*
* live_relay_unknown_command: send -1 if received unknown command
*/
* Make sure we've done the version check before any command other then a
* new client connection.
*/
- if (msg_value != VIEWER_CONNECT && !conn->version_check_done) {
+ if (msg_value != LTTNG_VIEWER_CONNECT && !conn->version_check_done) {
ERR("Viewer conn value %" PRIu32 " before version check", msg_value);
ret = -1;
goto end;
}
switch (msg_value) {
- case VIEWER_CONNECT:
+ case LTTNG_VIEWER_CONNECT:
ret = viewer_connect(conn);
break;
- case VIEWER_LIST_SESSIONS:
+ case LTTNG_VIEWER_LIST_SESSIONS:
ret = viewer_list_sessions(conn);
break;
- case VIEWER_ATTACH_SESSION:
+ case LTTNG_VIEWER_ATTACH_SESSION:
ret = viewer_attach_session(conn);
break;
- case VIEWER_GET_NEXT_INDEX:
+ case LTTNG_VIEWER_GET_NEXT_INDEX:
ret = viewer_get_next_index(conn);
break;
- case VIEWER_GET_PACKET:
+ case LTTNG_VIEWER_GET_PACKET:
ret = viewer_get_packet(conn);
break;
- case VIEWER_GET_METADATA:
+ case LTTNG_VIEWER_GET_METADATA:
ret = viewer_get_metadata(conn);
break;
- case VIEWER_GET_NEW_STREAMS:
+ case LTTNG_VIEWER_GET_NEW_STREAMS:
ret = viewer_get_new_streams(conn);
break;
+ case LTTNG_VIEWER_CREATE_SESSION:
+ ret = viewer_create_session(conn);
+ break;
default:
ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd));
live_relay_unknown_command(conn);