Sync babeltrace lttng-live functions
authorJulien Desfossez <jdesfossez@efficios.com>
Fri, 7 Feb 2014 19:20:15 +0000 (14:20 -0500)
committerJulien Desfossez <jdesfossez@efficios.com>
Fri, 7 Feb 2014 19:20:15 +0000 (14:20 -0500)
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
src/liblttng-live.c
src/liblttng-live.h
src/network-live.c

index 076c4391d55c64b0ca3d01fd9642dba80f8249f4..117b851b436173cbf06b073c84811ddb94637bc0 100644 (file)
 #include <babeltrace/ctf/events-internal.h>
 /*
 #include <formats/ctf/events-private.h>
-replaced with 
+replaced with
 */
 #include "network-live.h"
+#include "lttngtop.h"
 
 #include "liblttng-live.h"
 #include "lttng-viewer-abi.h"
-#include "lttngtop.h"
 
 /*
  * Memory allocation zeroed
@@ -67,14 +67,18 @@ replaced with
        ((type) (a) > (type) (b) ? (type) (a) : (type) (b))
 #endif
 
-int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
-               int port)
+static void ctf_live_packet_seek(struct bt_stream_pos *stream_pos,
+               size_t index, int whence);
+static void add_traces(gpointer key, gpointer value, gpointer user_data);
+static int del_traces(gpointer key, gpointer value, gpointer user_data);
+
+int lttng_live_connect_viewer(struct lttng_live_ctx *ctx)
 {
        struct hostent *host;
        struct sockaddr_in server_addr;
        int ret;
 
-       host = gethostbyname(hostname);
+       host = gethostbyname(ctx->relay_hostname);
        if (!host) {
                ret = -1;
                goto end;
@@ -87,7 +91,7 @@ int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
        }
 
        server_addr.sin_family = AF_INET;
-       server_addr.sin_port = htons(port);
+       server_addr.sin_port = htons(ctx->port);
        server_addr.sin_addr = *((struct in_addr *) host->h_addr);
        bzero(&(server_addr.sin_zero), 8);
 
@@ -161,14 +165,82 @@ error:
        return ret;
 }
 
+static
+void free_session_list(GPtrArray *session_list)
+{
+       int i;
+       struct lttng_live_relay_session *relay_session;
+
+       for (i = 0; i < session_list->len; i++) {
+               relay_session = g_ptr_array_index(session_list, i);
+               free(relay_session->name);
+               free(relay_session->hostname);
+       }
+       g_ptr_array_free(session_list, TRUE);
+}
+
+static
+void print_session_list(GPtrArray *session_list, const char *path)
+{
+       int i;
+       struct lttng_live_relay_session *relay_session;
+
+       for (i = 0; i < session_list->len; i++) {
+               relay_session = g_ptr_array_index(session_list, i);
+               fprintf(stdout, "%s/host/%s/%s (timer = %u, "
+                               "%u stream(s), %u client(s) connected)\n",
+                               path, relay_session->hostname,
+                               relay_session->name, relay_session->timer,
+                               relay_session->streams, relay_session->clients);
+       }
+}
+
+static
+void update_session_list(GPtrArray *session_list, char *hostname,
+               char *session_name, uint32_t streams, uint32_t clients,
+               uint32_t timer)
+{
+       int i, found = 0;
+       struct lttng_live_relay_session *relay_session;
+
+       for (i = 0; i < session_list->len; i++) {
+               relay_session = g_ptr_array_index(session_list, i);
+               if ((strncmp(relay_session->hostname, hostname, NAME_MAX) == 0) &&
+                               strncmp(relay_session->name,
+                                       session_name, NAME_MAX) == 0) {
+                       relay_session->streams += streams;
+                       if (relay_session->clients < clients)
+                               relay_session->clients = clients;
+                       found = 1;
+                       break;
+               }
+       }
+       if (found)
+               return;
+
+       relay_session = g_new0(struct lttng_live_relay_session, 1);
+       relay_session->hostname = strndup(hostname, NAME_MAX);
+       relay_session->name = strndup(session_name, NAME_MAX);
+       relay_session->clients = clients;
+       relay_session->streams = streams;
+       relay_session->timer = timer;
+       g_ptr_array_add(session_list, relay_session);
+}
+
 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_list_sessions list;
        struct lttng_viewer_session lsession;
-       int i, ret;
+       int i, ret, sessions_count, print_list = 0;
        ssize_t ret_len;
-       int sessions_count;
+       uint64_t session_id;
+       GPtrArray *session_list = NULL;
+
+       if (strlen(ctx->session_name) == 0) {
+               print_list = 1;
+               session_list = g_ptr_array_new();
+       }
 
        cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
        cmd.data_size = 0;
@@ -195,8 +267,6 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
        assert(ret_len == sizeof(list));
 
        sessions_count = be32toh(list.sessions_count);
-       fprintf(stdout, "%u active session(s)%c\n", sessions_count,
-                       sessions_count > 0 ? ':' : ' ');
        for (i = 0; i < sessions_count; i++) {
                do {
                        ret_len = recv(ctx->control_sock, &lsession, sizeof(lsession), 0);
@@ -209,14 +279,30 @@ int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path)
                assert(ret_len == sizeof(lsession));
                lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
                lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
+               session_id = be64toh(lsession.id);
+
+               if (print_list) {
+                       update_session_list(session_list,
+                                       lsession.hostname,
+                                       lsession.session_name,
+                                       be32toh(lsession.streams),
+                                       be32toh(lsession.clients),
+                                       be32toh(lsession.live_timer));
+               } else {
+                       if ((strncmp(lsession.session_name, ctx->session_name,
+                               NAME_MAX) == 0) && (strncmp(lsession.hostname,
+                                       ctx->traced_hostname, NAME_MAX) == 0)) {
+                               printf_verbose("Reading from session %" PRIu64 "\n",
+                                               session_id);
+                               g_array_append_val(ctx->session_ids,
+                                               session_id);
+                       }
+               }
+       }
 
-               fprintf(stdout, "%s/%" PRIu64 " : %s on host %s (timer = %u, "
-                               "%u stream(s), %u client(s) connected)\n",
-                               path, be64toh(lsession.id),
-                               lsession.session_name, lsession.hostname,
-                               be32toh(lsession.live_timer),
-                               be32toh(lsession.streams),
-                               be32toh(lsession.clients));
+       if (print_list) {
+               print_session_list(session_list, path);
+               free_session_list(session_list);
        }
 
        ret = 0;
@@ -328,7 +414,7 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
                goto end;
        }
 
-       ctx->session->stream_count = be32toh(rp.streams_count);
+       ctx->session->stream_count += be32toh(rp.streams_count);
        /*
         * When the session is created but not started, we do an active wait
         * until it starts. It allows the viewer to start processing the trace
@@ -409,6 +495,44 @@ error:
        return ret;
 }
 
+static
+int ask_new_streams(struct lttng_live_ctx *ctx)
+{
+       int i, ret = 0;
+       uint64_t id;
+
+restart:
+       for (i = 0; i < ctx->session_ids->len; i++) {
+               id = g_array_index(ctx->session_ids, uint64_t, i);
+               ret = lttng_live_get_new_streams(ctx, id);
+               printf_verbose("Asking for new streams returns %d\n",
+                               ret);
+               if (ret < 0) {
+                       if (ret == -LTTNG_VIEWER_NEW_STREAMS_HUP) {
+                               printf_verbose("Session %" PRIu64 " closed\n",
+                                               id);
+                               /*
+                                * The streams have already been closed during
+                                * the reading, so we only need to get rid of
+                                * the trace in our internal table of sessions.
+                                */
+                               g_array_remove_index(ctx->session_ids, i);
+                               /*
+                                * We can't continue iterating on the g_array
+                                * after a remove, we have to start again.
+                                */
+                               goto restart;
+                       } else {
+                               ret = -1;
+                               goto end;
+                       }
+               }
+       }
+
+end:
+       return ret;
+}
+
 static
 int get_data_packet(struct lttng_live_ctx *ctx,
                struct ctf_stream_pos *pos,
@@ -485,6 +609,13 @@ int get_data_packet(struct lttng_live_ctx *ctx,
                        ret = 0;
                        goto end;
                }
+               if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+                       ret = ask_new_streams(ctx);
+                       if (ret < 0)
+                               goto error;
+                       g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
+                                       ctx->bt_ctx);
+               }
                fprintf(stderr, "[error] get_data_packet: error\n");
                ret = -1;
                goto end;
@@ -747,6 +878,13 @@ retry:
                                goto error;
                        }
                }
+               if (rp.flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
+                       ret = ask_new_streams(ctx);
+                       if (ret < 0)
+                               goto error;
+                       g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
+                                       ctx->bt_ctx);
+               }
                break;
        case LTTNG_VIEWER_INDEX_RETRY:
                printf_verbose("get_next_index: retry\n");
@@ -757,6 +895,7 @@ retry:
                viewer_stream->id = -1ULL;
                viewer_stream->fd = -1;
                index->offset = EOF;
+               ctx->session->stream_count--;
                break;
        case LTTNG_VIEWER_INDEX_ERR:
                fprintf(stderr, "[error] get_next_index: error\n");
@@ -774,6 +913,7 @@ error:
        return ret;
 }
 
+static
 void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index,
                int whence)
 {
@@ -865,7 +1005,50 @@ end:
        return;
 }
 
-static int del_traces(gpointer key, gpointer value, gpointer user_data)
+int lttng_live_create_viewer_session(struct lttng_live_ctx *ctx)
+{
+       struct lttng_viewer_cmd cmd;
+       struct lttng_viewer_create_session_response resp;
+       int ret;
+       ssize_t ret_len;
+
+       cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
+       cmd.data_size = 0;
+       cmd.cmd_version = 0;
+
+       do {
+               ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error sending cmd\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(cmd));
+
+       do {
+               ret_len = recv(ctx->control_sock, &resp, sizeof(resp), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error receiving create session reply\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(resp));
+
+       if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
+               fprintf(stderr, "[error] Error creating viewer session\n");
+               ret = -1;
+               goto error;
+       }
+       ret = 0;
+
+error:
+       return ret;
+}
+
+static
+int del_traces(gpointer key, gpointer value, gpointer user_data)
 {
        struct bt_context *bt_ctx = user_data;
        struct lttng_live_ctf_trace *trace = value;
@@ -879,7 +1062,8 @@ static int del_traces(gpointer key, gpointer value, gpointer user_data)
        return 1;
 }
 
-static void add_traces(gpointer key, gpointer value, gpointer user_data)
+static
+void add_traces(gpointer key, gpointer value, gpointer user_data)
 {
        int i, ret, total_metadata = 0;
        uint64_t metadata_len;
@@ -890,6 +1074,18 @@ static void add_traces(gpointer key, gpointer value, gpointer user_data)
        struct bt_mmap_stream_list mmap_list;
        struct lttng_live_ctx *ctx = NULL;
 
+       /*
+        * We don't know how many streams we will receive for a trace, so
+        * once we are done receiving the traces, we add all the traces
+        * received to the bt_context.
+        * We can receive streams during the attach command or the
+        * get_new_streams, so we have to make sure not to add multiple
+        * times the same traces.
+        * If a trace is already in the context, we just skip this function.
+        */
+       if (trace->in_use)
+               return;
+
        BT_INIT_LIST_HEAD(&mmap_list.head);
 
        for (i = 0; i < trace->streams->len; i++) {
@@ -925,7 +1121,20 @@ static void add_traces(gpointer key, gpointer value, gpointer user_data)
                fprintf(stderr, "[error] Error adding trace\n");
                goto end_free;
        }
+
+       if (bt_ctx->current_iterator) {
+               struct bt_trace_descriptor *td;
+               struct bt_trace_handle *handle;
+
+               handle = (struct bt_trace_handle *) g_hash_table_lookup(
+                               bt_ctx->trace_handles,
+                               (gpointer) (unsigned long) ret);
+               td = handle->td;
+               bt_iter_add_trace(bt_ctx->current_iterator, td);
+       }
+
        trace->trace_id = ret;
+       trace->in_use = 1;
 
        goto end;
 
@@ -935,41 +1144,261 @@ end:
        return;
 }
 
-void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
+int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
 {
-       int ret, active_session = 0;
-       struct bt_context *bt_ctx;
+       struct lttng_viewer_cmd cmd;
+       struct lttng_viewer_new_streams_request rq;
+       struct lttng_viewer_new_streams_response rp;
+       struct lttng_viewer_stream stream;
+       int ret, i;
+       ssize_t ret_len;
+       uint32_t stream_count;
 
-       bt_ctx = bt_context_create();
-       if (!bt_ctx) {
-               fprintf(stderr, "[error] bt_context_create allocation\n");
+       cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
+       cmd.data_size = sizeof(rq);
+       cmd.cmd_version = 0;
+
+       memset(&rq, 0, sizeof(rq));
+       rq.session_id = htobe64(id);
+
+       do {
+               ret_len = send(ctx->control_sock, &cmd, sizeof(cmd), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error sending cmd\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(cmd));
+
+       do {
+               ret_len = send(ctx->control_sock, &rq, sizeof(rq), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error sending get_new_streams request\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(rq));
+
+       do {
+               ret_len = recv(ctx->control_sock, &rp, sizeof(rp), 0);
+       } while (ret_len < 0 && errno == EINTR);
+       if (ret_len < 0) {
+               fprintf(stderr, "[error] Error receiving get_new_streams response\n");
+               ret = ret_len;
+               goto error;
+       }
+       assert(ret_len == sizeof(rp));
+
+       switch(be32toh(rp.status)) {
+       case LTTNG_VIEWER_NEW_STREAMS_OK:
+               break;
+       case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
+               ret = 0;
+               goto end;
+       case LTTNG_VIEWER_NEW_STREAMS_HUP:
+               ret = -LTTNG_VIEWER_NEW_STREAMS_HUP;
+               goto end;
+       case LTTNG_VIEWER_NEW_STREAMS_ERR:
+               fprintf(stderr, "[error] get_new_streams error\n");
+               ret = -1;
+               goto end;
+       default:
+               fprintf(stderr, "[error] Unknown return code %u\n",
+                               be32toh(rp.status));
+               ret = -1;
                goto end;
        }
 
+       stream_count = be32toh(rp.streams_count);
+       ctx->session->stream_count += stream_count;
        /*
-        * As long as the session is active, we try to reattach to it,
-        * even if all the streams get closed.
+        * When the session is created but not started, we do an active wait
+        * until it starts. It allows the viewer to start processing the trace
+        * as soon as the session starts.
         */
-//     do {
+       if (ctx->session->stream_count == 0) {
+               ret = 0;
+               goto end;
+       }
+       printf_verbose("Waiting for %" PRIu64 " streams:\n",
+               ctx->session->stream_count);
+       ctx->session->streams = g_new0(struct lttng_live_viewer_stream,
+                       ctx->session->stream_count);
+       for (i = 0; i < stream_count; i++) {
                do {
-                       ret = lttng_live_attach_session(ctx, session_id);
-                       printf_verbose("Attaching session returns %d\n", ret);
+                       ret_len = recv(ctx->control_sock, &stream, sizeof(stream), 0);
+               } while (ret_len < 0 && errno == EINTR);
+               if (ret_len < 0) {
+                       fprintf(stderr, "[error] Error receiving stream\n");
+                       ret = ret_len;
+                       goto error;
+               }
+               assert(ret_len == sizeof(stream));
+               stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
+               stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
+
+               printf_verbose("    stream %" PRIu64 " : %s/%s\n",
+                               be64toh(stream.id), stream.path_name,
+                               stream.channel_name);
+               ctx->session->streams[i].id = be64toh(stream.id);
+               ctx->session->streams[i].session = ctx->session;
+
+               ctx->session->streams[i].first_read = 1;
+               ctx->session->streams[i].mmap_size = 0;
+
+               if (be32toh(stream.metadata_flag)) {
+                       char *path;
+
+                       path = strdup(LTTNG_METADATA_PATH_TEMPLATE);
+                       if (!path) {
+                               perror("strdup");
+                               ret = -1;
+                               goto error;
+                       }
+                       if (!mkdtemp(path)) {
+                               perror("mkdtemp");
+                               free(path);
+                               ret = -1;
+                               goto error;
+                       }
+                       ctx->session->streams[i].metadata_flag = 1;
+                       snprintf(ctx->session->streams[i].path,
+                                       sizeof(ctx->session->streams[i].path),
+                                       "%s/%s", path,
+                                       stream.channel_name);
+                       ret = open(ctx->session->streams[i].path,
+                                       O_WRONLY | O_CREAT | O_TRUNC,
+                                       S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
                        if (ret < 0) {
-                               if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
-                                       if (active_session)
-                                               goto end_free;
-                                       fprintf(stderr, "[error] Unknown "
-                                                       "session ID\n");
-                               }
-                               goto end_free;
-                       } else {
-                               active_session = 1;
+                               perror("open");
+                               free(path);
+                               goto error;
                        }
-               } while (ctx->session->stream_count == 0);
+                       ctx->session->streams[i].fd = ret;
+                       free(path);
+               }
+               ret = lttng_live_ctf_trace_assign(&ctx->session->streams[i],
+                               be64toh(stream.ctf_trace_id));
+               if (ret < 0) {
+                       goto error;
+               }
+
+       }
+       ret = 0;
+
+end:
+error:
+       return ret;
+}
 
-               g_hash_table_foreach(ctx->session->ctf_traces, add_traces, bt_ctx);
+void lttng_live_read(struct lttng_live_ctx *ctx)
+{
+       int ret, i;
+#if 0
+       struct bt_ctf_iter *iter;
+       const struct bt_ctf_event *event;
+       struct bt_iter_pos begin_pos;
+       struct bt_trace_descriptor *td_write;
+       struct bt_format *fmt_write;
+       struct ctf_text_stream_pos *sout;
+#endif
+       uint64_t id;
 
-               ret = check_requirements(bt_ctx);
+       ctx->bt_ctx = bt_context_create();
+       if (!ctx->bt_ctx) {
+               fprintf(stderr, "[error] bt_context_create allocation\n");
+               goto end;
+       }
+
+#if 0
+       fmt_write = bt_lookup_format(g_quark_from_static_string("text"));
+       if (!fmt_write) {
+               fprintf(stderr, "[error] ctf-text error\n");
+               goto end;
+       }
+
+       td_write = fmt_write->open_trace(NULL, O_RDWR, NULL, NULL);
+       if (!td_write) {
+               fprintf(stderr, "[error] Error opening output trace\n");
+               goto end_free;
+       }
+
+       sout = container_of(td_write, struct ctf_text_stream_pos,
+                       trace_descriptor);
+       if (!sout->parent.event_cb)
+               goto end_free;
+#endif
+
+       ret = lttng_live_create_viewer_session(ctx);
+       if (ret < 0) {
+               goto end_free;
+       }
+
+       for (i = 0; i < ctx->session_ids->len; i++) {
+               id = g_array_index(ctx->session_ids, uint64_t, i);
+               printf_verbose("Attaching to session %lu\n", id);
+               ret = lttng_live_attach_session(ctx, id);
+               printf_verbose("Attaching session returns %d\n", ret);
+               if (ret < 0) {
+                       if (ret == -LTTNG_VIEWER_ATTACH_UNK) {
+                               fprintf(stderr, "[error] Unknown session ID\n");
+                       }
+                       goto end_free;
+               }
+       }
+
+       /*
+        * As long as the session is active, we try to get new streams.
+        */
+#if 0
+       for (;;) {
+               int flags;
+#endif
+
+               while (!ctx->session->stream_count) {
+                       if (ctx->session_ids->len == 0)
+                               goto end_free;
+                       ret = ask_new_streams(ctx);
+                       if (ret < 0)
+                               goto end_free;
+               }
+
+               g_hash_table_foreach(ctx->session->ctf_traces, add_traces,
+                               ctx->bt_ctx);
+
+#if 0
+               begin_pos.type = BT_SEEK_BEGIN;
+               iter = bt_ctf_iter_create(ctx->bt_ctx, &begin_pos, NULL);
+               if (!iter) {
+                       fprintf(stderr, "[error] Iterator creation error\n");
+                       goto end;
+               }
+
+               for (;;) {
+                       event = bt_ctf_iter_read_event_flags(iter, &flags);
+                       if (!(flags & BT_ITER_FLAG_RETRY)) {
+                               if (!event) {
+                                       /* End of trace */
+                                       break;
+                               }
+                               ret = sout->parent.event_cb(&sout->parent,
+                                               event->parent->stream);
+                               if (ret) {
+                                       fprintf(stderr, "[error] Writing "
+                                                       "event failed.\n");
+                                       goto end_free;
+                               }
+                       }
+                       ret = bt_iter_next(bt_ctf_get_iter(iter));
+                       if (ret < 0) {
+                               goto end_free;
+                       }
+               }
+               bt_ctf_iter_destroy(iter);
+#endif
+               ret = check_requirements(ctx->bt_ctx);
                if (ret < 0) {
                        fprintf(stderr, "[error] some mandatory contexts "
                                        "were missing, exiting.\n");
@@ -982,14 +1411,16 @@ void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id)
                        pthread_create(&timer_thread, NULL, refresh_thread,
                                        (void *) NULL);
                }
-
-               iter_trace(bt_ctx);
-
-               g_hash_table_foreach_remove(ctx->session->ctf_traces, del_traces, bt_ctx);
-//     } while (active_session);
+               iter_trace(ctx->bt_ctx);
+               g_hash_table_foreach_remove(ctx->session->ctf_traces,
+                               del_traces, ctx->bt_ctx);
+               ctx->session->stream_count = 0;
+#if 0
+       }
+#endif
 
 end_free:
-       bt_context_put(bt_ctx);
+       bt_context_put(ctx->bt_ctx);
 end:
        return;
 }
index 778540d619ec81c5a39bc76866130a40fc4f5768..2638da19a9c84ae13fce96c21f43ccc72f8e672b 100644 (file)
 #define LTTNG_LIVE_MINOR                       4
 
 struct lttng_live_ctx {
+       char traced_hostname[NAME_MAX];
+       char session_name[NAME_MAX];
+       char relay_hostname[NAME_MAX];
        int control_sock;
+       int port;
        struct lttng_live_session *session;
+       struct bt_context *bt_ctx;
+       GArray *session_ids;
 };
 
 struct lttng_live_viewer_stream {
@@ -62,13 +68,23 @@ struct lttng_live_ctf_trace {
        GPtrArray *streams;
        FILE *metadata_fp;
        int trace_id;
+       int in_use;
 };
 
-int lttng_live_connect_viewer(struct lttng_live_ctx *ctx, char *hostname,
-               int port);
+/* Just used in listing. */
+struct lttng_live_relay_session {
+       uint32_t streams;
+       uint32_t clients;
+       uint32_t timer;
+       char *name;
+       char *hostname;
+};
+
+int lttng_live_connect_viewer(struct lttng_live_ctx *ctx);
 int lttng_live_establish_connection(struct lttng_live_ctx *ctx);
 int lttng_live_list_sessions(struct lttng_live_ctx *ctx, const char *path);
 int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id);
-void lttng_live_read(struct lttng_live_ctx *ctx, uint64_t session_id);
+void lttng_live_read(struct lttng_live_ctx *ctx);
+int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id);
 
 #endif /* _LTTNG_LIVE_FUNCTIONS_H */
index 278e8aa1c8a0569f7618fce1b35db0cc27dd97bb..8f2a7effd8527a8abb9c1149f9dd043843ea93fa 100644 (file)
 /*
  * hostname parameter needs to hold NAME_MAX chars.
  */
-static int parse_url(const char *path, char *hostname, int *port,
-               uint64_t *session_id)
+static
+int parse_url(const char *path, struct lttng_live_ctx *ctx)
 {
-       char remain[2][NAME_MAX];
+       char remain[3][NAME_MAX];
        int ret = -1, proto, proto_offset = 0;
        size_t path_len = strlen(path);
 
@@ -84,16 +84,15 @@ static int parse_url(const char *path, char *hostname, int *port,
        /* TODO : parse for IPv6 as well */
        /* Parse the hostname or IP */
        ret = sscanf(&path[proto_offset], "%[a-zA-Z.0-9%-]%s",
-               hostname, remain[0]);
+               ctx->relay_hostname, remain[0]);
        if (ret == 2) {
                /* Optional port number */
                switch (remain[0][0]) {
                case ':':
-                       ret = sscanf(remain[0], ":%d%s", port, remain[1]);
+                       ret = sscanf(remain[0], ":%d%s", &ctx->port, remain[1]);
                        /* Optional session ID with port number */
                        if (ret == 2) {
-                               ret = sscanf(remain[1], "/%" PRIu64,
-                                       session_id);
+                               ret = sscanf(remain[1], "/%s", remain[2]);
                                /* Accept 0 or 1 (optional) */
                                if (ret < 0) {
                                        goto end;
@@ -102,7 +101,7 @@ static int parse_url(const char *path, char *hostname, int *port,
                        break;
                case '/':
                        /* Optional session ID */
-                       ret = sscanf(remain[0], "/%" PRIu64, session_id);
+                       ret = sscanf(remain[0], "/%s", remain[2]);
                        /* Accept 0 or 1 (optional) */
                        if (ret < 0) {
                                goto end;
@@ -116,17 +115,29 @@ static int parse_url(const char *path, char *hostname, int *port,
                }
        }
 
-       if (*port < 0)
-               *port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
+       if (ctx->port < 0)
+               ctx->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
 
-       if (*session_id == -1ULL)
+       if (strlen(remain[2]) == 0) {
                printf_verbose("Connecting to hostname : %s, port : %d, "
                                "proto : IPv%d\n",
-                               hostname, *port, proto);
-       else
-               printf_verbose("Connecting to hostname : %s, port : %d, "
-                               "session id : %" PRIu64 ", proto : IPv%d\n",
-                               hostname, *port, *session_id, proto);
+                               ctx->relay_hostname, ctx->port, proto);
+               ret = 0;
+               goto end;
+       }
+       ret = sscanf(remain[2], "host/%[a-zA-Z.0-9%-]/%s",
+                       ctx->traced_hostname, ctx->session_name);
+       if (ret != 2) {
+               fprintf(stderr, "[error] Format : "
+                       "net://<hostname>/host/<traced_hostname>/<session_name>\n");
+               goto end;
+       }
+
+       printf_verbose("Connecting to hostname : %s, port : %d, "
+                       "traced hostname : %s, session name : %s, "
+                       "proto : IPv%d\n",
+                       ctx->relay_hostname, ctx->port, ctx->traced_hostname,
+                       ctx->session_name, proto);
        ret = 0;
 
 end:
@@ -135,53 +146,52 @@ end:
 
 static int lttng_live_open_trace_read(const char *path)
 {
-       char hostname[NAME_MAX];
-       int port = -1;
-       uint64_t session_id = -1ULL;
        int ret = 0;
-       struct lttng_live_ctx ctx;
+       struct lttng_live_ctx *ctx;
 
-       ctx.session = g_new0(struct lttng_live_session, 1);
+       ctx = g_new0(struct lttng_live_ctx, 1);
+       ctx->session = g_new0(struct lttng_live_session, 1);
 
        /* We need a pointer to the context from the packet_seek function. */
-       ctx.session->ctx = &ctx;
+       ctx->session->ctx = ctx;
 
        /* HT to store the CTF traces. */
-       ctx.session->ctf_traces = g_hash_table_new(g_direct_hash,
+       ctx->session->ctf_traces = g_hash_table_new(g_direct_hash,
                        g_direct_equal);
+       ctx->port = -1;
+       ctx->session_ids = g_array_new(FALSE, TRUE, sizeof(uint64_t));
 
-       ret = parse_url(path, hostname, &port, &session_id);
+       ret = parse_url(path, ctx);
        if (ret < 0) {
                goto end_free;
        }
 
-       ret = lttng_live_connect_viewer(&ctx, hostname, port);
+       ret = lttng_live_connect_viewer(ctx);
        if (ret < 0) {
                fprintf(stderr, "[error] Connection failed\n");
                goto end_free;
        }
        printf_verbose("LTTng-live connected to relayd\n");
 
-       ret = lttng_live_establish_connection(&ctx);
+       ret = lttng_live_establish_connection(ctx);
        if (ret < 0) {
                goto end_free;
        }
-
-       if (session_id == -1ULL) {
-               printf_verbose("Listing sessions\n");
-               ret = lttng_live_list_sessions(&ctx, path);
-               if (ret < 0) {
-                       fprintf(stderr, "[error] List error\n");
-                       goto end_free;
-               }
-       } else {
-               lttng_live_read(&ctx, session_id);
+       printf_verbose("Listing sessions\n");
+       ret = lttng_live_list_sessions(ctx, path);
+       if (ret < 0) {
+               fprintf(stderr, "[error] List error\n");
+               goto end_free;
        }
 
+       if (ctx->session_ids->len > 0)
+               lttng_live_read(ctx);
+
 end_free:
-       g_hash_table_destroy(ctx.session->ctf_traces);
-       g_free(ctx.session);
-       g_free(ctx.session->streams);
+       g_hash_table_destroy(ctx->session->ctf_traces);
+       g_free(ctx->session);
+       g_free(ctx->session->streams);
+       g_free(ctx);
        return ret;
 }
 
This page took 0.034091 seconds and 4 git commands to generate.