X-Git-Url: https://git.lttng.org/?p=lttngtop.git;a=blobdiff_plain;f=src%2Fnetwork-live.c;h=4f5caef1b18ba7d87a561c78535a407442960037;hp=46a5b4d2d0c317cd80de5be78c527b24c7871452;hb=3bac1a1f2f88bc39f1f2eb09efe508f4c59d4428;hpb=38e240608abcec56b7bee5a076e3992c835de834 diff --git a/src/network-live.c b/src/network-live.c index 46a5b4d..4f5caef 100644 --- a/src/network-live.c +++ b/src/network-live.c @@ -34,9 +34,9 @@ #include #include -#include "lttng-viewer.h" -#include "ctf-index.h" +#include "lttng-viewer-abi.h" #include "network-live.h" +#include "lttng-live-comm.h" #include #include @@ -49,793 +49,210 @@ #include #include #include +#include /* - * Memory allocation zeroed + * hostname parameter needs to hold NAME_MAX chars. */ -#define zmalloc(x) calloc(1, x) -/* FIXME : completely arbitrary */ -#define mmap_size 524288 - -static int control_sock; -struct live_session *session; - -struct viewer_stream { - uint64_t id; - uint64_t ctf_trace_id; - void *mmap_base; - int fd; - int metadata_flag; - int first_read; - char path[PATH_MAX]; -}; - -struct live_session { - struct viewer_stream *streams; - uint64_t live_timer_interval; - uint64_t stream_count; -}; - static -int connect_viewer(char *hostname) +int parse_url(const char *path, struct lttng_live_ctx *ctx) { - struct hostent *host; - struct sockaddr_in server_addr; - int ret; - - host = gethostbyname(hostname); - if (!host) { - ret = -1; - goto end; - } - - if ((control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - perror("Socket"); - ret = -1; - goto end; - } - - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(5344); - server_addr.sin_addr = *((struct in_addr *) host->h_addr); - bzero(&(server_addr.sin_zero), 8); + char remain[3][NAME_MAX]; + int ret = -1, proto, proto_offset = 0; + size_t path_len = strlen(path); - if (connect(control_sock, (struct sockaddr *) &server_addr, - sizeof(struct sockaddr)) == -1) { - perror("Connect"); - ret = -1; + /* + * Since sscanf API does not allow easily checking string length + * against a size defined by a macro. Test it beforehand on the + * input. We know the output is always <= than the input length. + */ + if (path_len > NAME_MAX) { goto end; } - - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(5345); - server_addr.sin_addr = *((struct in_addr *) host->h_addr); - bzero(&(server_addr.sin_zero), 8); - - ret = 0; - -end: - return ret; -} - -static -int establish_connection(void) -{ - struct lttng_viewer_cmd cmd; - struct lttng_viewer_connect connect; - int ret; - - cmd.cmd = htobe32(VIEWER_CONNECT); - cmd.data_size = sizeof(connect); - cmd.cmd_version = 0; - - connect.major = htobe32(2); - connect.minor = htobe32(4); - connect.type = htobe32(VIEWER_CLIENT_COMMAND); - - do { - ret = send(control_sock, &cmd, sizeof(cmd), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error sending cmd\n"); - goto error; - } - do { - ret = send(control_sock, &connect, sizeof(connect), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error sending version\n"); - goto error; - } - - do { - ret = recv(control_sock, &connect, sizeof(connect), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error receiving version\n"); - goto error; - } - fprintf(stderr, " - Received viewer session ID : %" PRIu64 "\n", - be64toh(connect.viewer_session_id)); - fprintf(stderr, " - Received version : %u.%u\n", be32toh(connect.major), - be32toh(connect.minor)); - - ret = 0; - -error: - return ret; -} - -int list_sessions(void) -{ - struct lttng_viewer_cmd cmd; - struct lttng_viewer_list_sessions list; - struct lttng_viewer_session lsession; - int i, ret; - int first_session = 0; - - cmd.cmd = htobe32(VIEWER_LIST_SESSIONS); - cmd.data_size = 0; - cmd.cmd_version = 0; - - do { - ret = send(control_sock, &cmd, sizeof(cmd), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error sending cmd\n"); - goto error; - } - - do { - ret = recv(control_sock, &list, sizeof(list), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error receiving session list\n"); - goto error; - } - - fprintf(stderr, " - %u active session(s)\n", be32toh(list.sessions_count)); - for (i = 0; i < be32toh(list.sessions_count); i++) { - do { - ret = recv(control_sock, &lsession, sizeof(lsession), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error receiving session\n"); - goto error; - } - fprintf(stderr, " - %" PRIu64 " : %s on host %s (timer = %u, " - "%u client(s) connected)\n", - be64toh(lsession.id), lsession.session_name, - lsession.hostname, be32toh(lsession.live_timer), - be32toh(lsession.clients)); - if (first_session <= 0) { - first_session = be64toh(lsession.id); - } - } - - /* I know, type mismatch */ - ret = (int) first_session; - -error: - return ret; -} - -static -int attach_session(int id, int begin) -{ - struct lttng_viewer_cmd cmd; - struct lttng_viewer_attach_session_request rq; - struct lttng_viewer_attach_session_response rp; - struct lttng_viewer_stream stream; - int ret, i; - - cmd.cmd = htobe32(VIEWER_ATTACH_SESSION); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; - - rq.session_id = htobe64(id); - if (begin) { - rq.seek = htobe32(VIEWER_SEEK_BEGINNING); + ret = sscanf(path, "net%d://", &proto); + if (ret < 1) { + proto = 4; + /* net:// */ + proto_offset = strlen("net://"); } else { - rq.seek = htobe32(VIEWER_SEEK_LAST); - } - - do { - ret = send(control_sock, &cmd, sizeof(cmd), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error sending cmd\n"); - goto error; - } - do { - ret = send(control_sock, &rq, sizeof(rq), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error sending attach request\n"); - goto error; - } - - do { - ret = recv(control_sock, &rp, sizeof(rp), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error receiving attach response\n"); - goto error; + /* net4:// or net6:// */ + proto_offset = strlen("netX://"); } - fprintf(stderr, " - session attach response : %u\n", be32toh(rp.status)); - if (be32toh(rp.status) != VIEWER_ATTACH_OK) { - ret = 1; + if (proto_offset > path_len) { goto end; } - - session->stream_count = be32toh(rp.streams_count); - fprintf(stderr, " - Waiting for %" PRIu64 " streams\n", session->stream_count); - session->streams = zmalloc(session->stream_count * - sizeof(struct viewer_stream)); - if (!session->streams) { - ret = -1; - goto error; - } - - for (i = 0; i < be32toh(rp.streams_count); i++) { - do { - ret = recv(control_sock, &stream, sizeof(stream), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error receiving stream\n"); - goto error; - } - fprintf(stderr, " - stream %" PRIu64 " : %s/%s\n", - be64toh(stream.id), stream.path_name, - stream.channel_name); - session->streams[i].id = be64toh(stream.id); - - session->streams[i].ctf_trace_id = be64toh(stream.ctf_trace_id); - session->streams[i].first_read = 1; - session->streams[i].mmap_base = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE, - MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - if (session->streams[i].mmap_base == MAP_FAILED) { - fprintf(stderr, "mmap error\n"); - ret = -1; - goto error; - } - - if (be32toh(stream.metadata_flag)) { - session->streams[i].metadata_flag = 1; - unlink("testlivetrace"); - mkdir("testlivetrace", S_IRWXU | S_IRWXG); - snprintf(session->streams[i].path, - sizeof(session->streams[i].path), - "testlivetrace/%s", - stream.channel_name); - ret = open(session->streams[i].path, - O_WRONLY | O_CREAT | O_TRUNC, - S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); + /* TODO : parse for IPv6 as well */ + /* Parse the hostname or IP */ + ret = sscanf(&path[proto_offset], "%[a-zA-Z.0-9%-]%s", + ctx->relay_hostname, remain[0]); + if (ret == 2) { + /* Optional port number */ + switch (remain[0][0]) { + case ':': + ret = sscanf(remain[0], ":%d%s", &ctx->port, remain[1]); + /* Optional session ID with port number */ + if (ret == 2) { + ret = sscanf(remain[1], "/%s", remain[2]); + /* Accept 0 or 1 (optional) */ + if (ret < 0) { + goto end; + } + } + break; + case '/': + /* Optional session ID */ + ret = sscanf(remain[0], "/%s", remain[2]); + /* Accept 0 or 1 (optional) */ if (ret < 0) { - goto error; + goto end; } - session->streams[i].fd = ret; + break; + default: + fprintf(stderr, "[error] wrong delimitor : %c\n", + remain[0][0]); + ret = -1; + goto end; } } - ret = 0; -end: -error: - return ret; -} - -#if 0 -/* useful debug */ -static -void dump_packet_index(struct lttng_packet_index *index) -{ - printf(" - index : %lu, %lu, %lu, %lu, %lu, %lu, %lu\n", - be64toh(index->offset), - be64toh(index->packet_size), - be64toh(index->content_size), - be64toh(index->timestamp_begin), - be64toh(index->timestamp_end), - be64toh(index->events_discarded), - be64toh(index->stream_id)); -} -#endif + if (ctx->port < 0) + ctx->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT; -static -int get_data_packet(int id, uint64_t offset, - uint64_t len) -{ - struct lttng_viewer_cmd cmd; - struct lttng_viewer_get_packet rq; - struct lttng_viewer_trace_packet rp; - int ret; - - cmd.cmd = htobe32(VIEWER_GET_PACKET); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; - - rq.stream_id = htobe64(session->streams[id].id); - /* Already in big endian. */ - rq.offset = offset; - rq.len = htobe32(len); - fprintf(stderr, " - get_packet "); - - do { - ret = send(control_sock, &cmd, sizeof(cmd), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error sending cmd\n"); - goto error; - } - do { - ret = send(control_sock, &rq, sizeof(rq), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error sending get_data_packet request\n"); - goto error; - } - do { - ret = recv(control_sock, &rp, sizeof(rp), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error receiving data response\n"); - goto error; - } - rp.flags = be32toh(rp.flags); - - switch (be32toh(rp.status)) { - case VIEWER_GET_PACKET_OK: - fprintf(stderr, "OK\n"); - break; - case VIEWER_GET_PACKET_RETRY: - fprintf(stderr, "RETRY\n"); - ret = -1; - goto end; - case VIEWER_GET_PACKET_ERR: - if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { - fprintf(stderr, "NEW_METADATA\n"); - ret = 0; - goto end; - } - fprintf(stderr, "ERR\n"); - ret = -1; - goto end; - default: - fprintf(stderr, "UNKNOWN\n"); - ret = -1; + if (strlen(remain[2]) == 0) { + printf_verbose("Connecting to hostname : %s, port : %d, " + "proto : IPv%d\n", + ctx->relay_hostname, ctx->port, proto); + ret = 0; goto end; } - - len = be32toh(rp.len); - fprintf(stderr, " - writing %" PRIu64" bytes to tracefile\n", len); - if (len <= 0) { + ret = sscanf(remain[2], "host/%[a-zA-Z.0-9%-]/%s", + ctx->traced_hostname, ctx->session_name); + if (ret != 2) { + fprintf(stderr, "[error] Format : " + "net:///host//\n"); goto end; } - if (len > mmap_size) { - fprintf(stderr, "mmap_size not big enough\n"); - ret = -1; - goto error; - } - - do { - ret = recv(control_sock, session->streams[id].mmap_base, len, MSG_WAITALL); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error receiving trace packet\n"); - goto error; - } + printf_verbose("Connecting to hostname : %s, port : %d, " + "traced hostname : %s, session name : %s, " + "proto : IPv%d\n", + ctx->relay_hostname, ctx->port, ctx->traced_hostname, + ctx->session_name, proto); + ret = 0; end: -error: return ret; } -/* - * Return number of metadata bytes written or a negative value on error. - */ -static -int get_new_metadata(int id) +static int lttng_live_open_trace_read(const char *path) { - struct lttng_viewer_cmd cmd; - struct lttng_viewer_get_metadata rq; - struct lttng_viewer_metadata_packet rp; - int ret; - uint64_t i; - char *data = NULL; - uint64_t len = 0; - int metadata_stream_id = -1; + int ret = 0; + struct lttng_live_ctx *ctx; - cmd.cmd = htobe32(VIEWER_GET_METADATA); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; + ctx = g_new0(struct lttng_live_ctx, 1); + ctx->session = g_new0(struct lttng_live_session, 1); - /* find the metadata stream for this ctf_trace */ - for (i = 0; i < session->stream_count; i++) { - if (session->streams[i].metadata_flag && - session->streams[i].ctf_trace_id == - session->streams[id].ctf_trace_id) { - metadata_stream_id = i; - break; - } - } - if (metadata_stream_id < 0) { - fprintf(stderr, "No metadata stream found\n"); - ret = -1; - goto error; - } + /* We need a pointer to the context from the packet_seek function. */ + ctx->session->ctx = ctx; - rq.stream_id = htobe64(session->streams[metadata_stream_id].id); - fprintf(stderr, " - get_metadata "); + /* HT to store the CTF traces. */ + ctx->session->ctf_traces = g_hash_table_new(g_direct_hash, + g_direct_equal); + ctx->port = -1; + ctx->session_ids = g_array_new(FALSE, TRUE, sizeof(uint64_t)); - do { - ret = send(control_sock, &cmd, sizeof(cmd), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error sending cmd\n"); - goto error; - } - do { - ret = send(control_sock, &rq, sizeof(rq), 0); - } while (ret < 0 && errno == EINTR); + ret = parse_url(path, ctx); if (ret < 0) { - fprintf(stderr, "Error sending get_metadata request\n"); - goto error; - } - do { - ret = recv(control_sock, &rp, sizeof(rp), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error receiving metadata response\n"); - goto error; - } - switch (be32toh(rp.status)) { - case VIEWER_METADATA_OK: - fprintf(stderr, "OK\n"); - break; - case VIEWER_NO_NEW_METADATA: - fprintf(stderr, "NO NEW\n"); - ret = -1; - goto end; - case VIEWER_METADATA_ERR: - fprintf(stderr, "ERR\n"); - ret = -1; - goto end; - default: - fprintf(stderr, "UNKNOWN\n"); - ret = -1; - goto end; + goto end_free; } - len = be64toh(rp.len); - fprintf(stderr, " - writing %" PRIu64" bytes to metadata\n", len); - if (len <= 0) { - goto end; + ret = lttng_live_connect_viewer(ctx); + if (ret < 0) { + fprintf(stderr, "[error] Connection failed\n"); + goto end_free; } + printf_verbose("LTTng-live connected to relayd\n"); - data = zmalloc(len); - if (!data) { - perror("relay data zmalloc"); - goto error; - } - do { - ret = recv(control_sock, data, len, MSG_WAITALL); - } while (ret < 0 && errno == EINTR); + ret = lttng_live_establish_connection(ctx); if (ret < 0) { - fprintf(stderr, "Error receiving trace packet\n"); - free(data); - goto error; + goto end_free; } - do { - ret = write(session->streams[metadata_stream_id].fd, data, len); - } while (ret < 0 && errno == EINTR); + printf_verbose("Listing sessions\n"); + ret = lttng_live_list_sessions(ctx, path); if (ret < 0) { - free(data); - goto error; + fprintf(stderr, "[error] List error\n"); + goto end_free; } - free(data); - /* FIXME : bad */ - ret = (int) len; -end: -error: + if (ctx->session_ids->len > 0) + lttng_live_read(ctx); + +end_free: + g_hash_table_destroy(ctx->session->ctf_traces); + g_free(ctx->session); + g_free(ctx->session->streams); + g_free(ctx); return ret; } -/* - * Get one index for a stream. - */ -int get_next_index(int id, struct packet_index *index) +static +struct bt_trace_descriptor *lttng_live_open_trace(const char *path, int flags, + void (*packet_seek)(struct bt_stream_pos *pos, size_t index, + int whence), FILE *metadata_fp) { - struct lttng_viewer_cmd cmd; - struct lttng_viewer_get_next_index rq; - struct lttng_viewer_index rp; - int ret; + struct ctf_text_stream_pos *pos; - cmd.cmd = htobe32(VIEWER_GET_NEXT_INDEX); - cmd.data_size = sizeof(rq); - cmd.cmd_version = 0; - - fprintf(stderr, " - get next index for stream %" PRIu64 "\n", - session->streams[id].id); - rq.stream_id = htobe64(session->streams[id].id); - -retry: - do { - ret = send(control_sock, &cmd, sizeof(cmd), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error sending cmd\n"); - goto error; - } - do { - ret = send(control_sock, &rq, sizeof(rq), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error sending get_next_index request\n"); - goto error; - } - do { - ret = recv(control_sock, &rp, sizeof(rp), 0); - } while (ret < 0 && errno == EINTR); - if (ret < 0) { - fprintf(stderr, "Error receiving index response\n"); - goto error; - } - fprintf(stderr, " - reply : %u ", be32toh(rp.status)); - - rp.flags = be32toh(rp.flags); - - switch (be32toh(rp.status)) { - case VIEWER_INDEX_INACTIVE: - fprintf(stderr, "(INACTIVE)\n"); - memset(index, 0, sizeof(struct packet_index)); - index->timestamp_end = be64toh(rp.timestamp_end); - break; - case VIEWER_INDEX_OK: - fprintf(stderr, "(OK), need metadata update : %u\n", - rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA); - index->offset = be64toh(rp.offset); - index->packet_size = be64toh(rp.packet_size); - index->content_size = be64toh(rp.content_size); - index->timestamp_begin = be64toh(rp.timestamp_begin); - index->timestamp_end = be64toh(rp.timestamp_end); - index->events_discarded = be64toh(rp.events_discarded); - - if (rp.flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { - fprintf(stderr, "NEW METADATA NEEDED\n"); - ret = get_new_metadata(id); - if (ret < 0) { - goto error; - } - } + switch (flags & O_ACCMODE) { + case O_RDONLY: + /* OK */ break; - case VIEWER_INDEX_RETRY: - fprintf(stderr, "(RETRY)\n"); - sleep(1); - goto retry; - case VIEWER_INDEX_HUP: - fprintf(stderr, "(HUP)\n"); - session->streams[id].id = -1ULL; - session->streams[id].fd = -1; - break; - case VIEWER_INDEX_ERR: - fprintf(stderr, "(ERR)\n"); - ret = -1; + case O_RDWR: + fprintf(stderr, "[error] lttng live plugin cannot be used as output plugin.\n"); goto error; default: - fprintf(stderr, "SHOULD NOT HAPPEN\n"); - ret = -1; + fprintf(stderr, "[error] Incorrect open flags.\n"); goto error; } + pos = g_new0(struct ctf_text_stream_pos, 1); + pos->parent.rw_table = NULL; + pos->parent.event_cb = NULL; + pos->parent.trace = &pos->trace_descriptor; + lttng_live_open_trace_read(path); + return &pos->trace_descriptor; + error: - return ret; + return NULL; } -void ctf_live_packet_seek(struct bt_stream_pos *stream_pos, size_t index, - int whence) +static +int lttng_live_close_trace(struct bt_trace_descriptor *td) { - struct ctf_stream_pos *pos; - struct ctf_file_stream *file_stream; - struct packet_index packet_index; - int ret; - - pos = ctf_pos(stream_pos); - file_stream = container_of(pos, struct ctf_file_stream, pos); - - fprintf(stderr, "BT GET_NEXT_INDEX %d\n", pos->fd); - ret = get_next_index(pos->fd, &packet_index); - if (ret < 0) { - fprintf(stderr, "get_next_index failed\n"); - return; - } - - pos->packet_size = packet_index.packet_size; - pos->content_size = packet_index.content_size; - pos->mmap_base_offset = 0; - pos->offset = 0; - if (packet_index.offset == EOF) { - pos->offset = EOF; - } else { - pos->offset = 0; - } - - file_stream->parent.cycles_timestamp = packet_index.timestamp_end; - file_stream->parent.real_timestamp = ctf_get_real_timestamp( - &file_stream->parent, packet_index.timestamp_end); - - if (pos->packet_size == 0) { - goto end; - } - - fprintf(stderr, "BT GET_DATA_PACKET\n"); - ret = get_data_packet(pos->fd, be64toh(packet_index.offset), - packet_index.packet_size / CHAR_BIT); - if (ret < 0) { - fprintf(stderr, "get_data_packet failed"); - return; - } - - fprintf(stderr, "BT MMAP %d\n", pos->fd); - fprintf(stderr, "packet_size : %lu, offset %lu, content_size %lu, timestamp_end : %lu, real : %lu\n", - packet_index.packet_size, - packet_index.offset, - packet_index.content_size, - packet_index.timestamp_end, - ctf_get_real_timestamp( - &file_stream->parent, packet_index.timestamp_end)); - if (!pos->base_mma) { - pos->base_mma = zmalloc(sizeof(*pos->base_mma)); - if (!pos->base_mma) { - fprintf(stderr, "alloc pos->base_mma\n"); - return; - } - } - - mmap_align_set_addr(pos->base_mma, session->streams[pos->fd].mmap_base); - if (pos->base_mma == MAP_FAILED) { - perror("Error mmaping"); - return; - } - - /* update trace_packet_header and stream_packet_context */ - if (pos->prot != PROT_WRITE && file_stream->parent.trace_packet_header) { - /* Read packet header */ - ret = generic_rw(&pos->parent, &file_stream->parent.trace_packet_header->p); - assert(!ret); - } - if (pos->prot != PROT_WRITE && file_stream->parent.stream_packet_context) { - /* Read packet context */ - ret = generic_rw(&pos->parent, &file_stream->parent.stream_packet_context->p); - assert(!ret); - } - -end: - return; + struct ctf_text_stream_pos *pos = + container_of(td, struct ctf_text_stream_pos, + trace_descriptor); + free(pos); + return 0; } -int open_trace(struct bt_context **bt_ctx) -{ - struct bt_mmap_stream *new_mmap_stream; - struct bt_mmap_stream_list mmap_list; - FILE *metadata_fp = NULL; - int i; - int ret = 0; - - *bt_ctx = bt_context_create(); - BT_INIT_LIST_HEAD(&mmap_list.head); - - for (i = 0; i < session->stream_count; i++) { - int total_metadata = 0; - - if (!session->streams[i].metadata_flag) { - new_mmap_stream = zmalloc(sizeof(struct bt_mmap_stream)); - /* - * The FD is unused when we handle manually the - * packet seek, so we store here the ID of the - * stream in our stream list to be able to use it - * later. - */ - new_mmap_stream->fd = i; - bt_list_add(&new_mmap_stream->list, &mmap_list.head); - } else { - /* Get all possible metadata before starting */ - do { - ret = get_new_metadata(i); - if (ret > 0) { - total_metadata += ret; - } - } while (ret > 0 || total_metadata == 0); - metadata_fp = fopen(session->streams[i].path, "r"); - } - } - - if (!metadata_fp) { - fprintf(stderr, "No metadata stream opened\n"); - goto end; - } - - ret = bt_context_add_trace(*bt_ctx, NULL, "ctf", - ctf_live_packet_seek, &mmap_list, metadata_fp); - if (ret < 0) { - fprintf(stderr, "Error adding trace\n"); - goto end; - } - - /* - begin_pos.type = BT_SEEK_BEGIN; - iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL); - while ((event = bt_ctf_iter_read_event(iter)) != NULL) { - if (!skip) { - ret = sout->parent.event_cb(&sout->parent, event->parent->stream); - if (ret) { - fprintf(stderr, "[error] Writing event failed.\n"); - goto end; - } - } - - ret = bt_iter_next(bt_ctf_get_iter(iter)); - if (ret < 0) { - goto end; - } else if (ret == EAGAIN) { - skip = 1; - continue; - } - skip = 0; - } - */ - -end: - return ret; -} +static +struct bt_format lttng_live_format = { + .open_trace = lttng_live_open_trace, + .close_trace = lttng_live_close_trace, +}; -int setup_network_live(char *hostname, int begin) +static +void __attribute__((constructor)) lttng_live_init(void) { int ret; - int session_id; - session = zmalloc(sizeof(struct live_session)); - if (!session) { - goto error; - } - - ret = connect_viewer(hostname); - if (ret < 0) { - goto error; - } - fprintf(stderr, "* Connected\n"); - - fprintf(stderr, "* Establish connection and version check\n"); - ret = establish_connection(); - if (ret < 0) { - goto error; - } - - fprintf(stderr, "* List sessions\n"); - ret = list_sessions(); - if (ret < 0) { - fprintf(stderr, "* List error\n"); - goto error; - } else if (ret == 0) { - fprintf(stderr, "* No session to attach to, exiting\n"); - ret = 0; - goto end; - } - session_id = ret; - - do { - fprintf(stderr, "* Attach session %d\n", ret); - ret = attach_session(session_id, begin); - if (ret < 0) { - goto error; - } - } while (session->stream_count == 0); - -end: - return 0; + lttng_live_format.name = g_quark_from_static_string("lttng-live"); + ret = bt_register_format(<tng_live_format); + assert(!ret); +} -error: - free(session->streams); - fprintf(stderr, "* Exiting %d\n", ret); - return ret; +static +void __attribute__((destructor)) lttng_live_exit(void) +{ + bt_unregister_format(<tng_live_format); }