X-Git-Url: https://git.lttng.org/?p=lttngtop.git;a=blobdiff_plain;f=src%2Flttngtop.c;h=bae4fad217b95bc8dc0c474e81a00c394d47fd86;hp=f15eea5518f4119c896114da3e4936f8caf860e7;hb=6112d0d4436698a4e5625b89309d7ac8864ebef1;hpb=16b22a0fe150f4923b9902e802bbf28bacce8d0e diff --git a/src/lttngtop.c b/src/lttngtop.c index f15eea5..bae4fad 100644 --- a/src/lttngtop.c +++ b/src/lttngtop.c @@ -40,6 +40,7 @@ #include #include #include +#include #include "lttngtoptypes.h" #include "cputop.h" @@ -54,16 +55,25 @@ const char *opt_input_path; struct lttngtop *copy; pthread_t display_thread; pthread_t timer_thread; +pthread_t live_trace_thread; unsigned long refresh_display = 1 * NSEC_PER_SEC; unsigned long last_display_update = 0; int quit = 0; /* LIVE */ +pthread_t thread_live_consume; /* list of FDs available for being read with snapshots */ struct mmap_stream_list mmap_list; GPtrArray *lttng_consumer_stream_array; int sessiond_metadata, consumerd_metadata; +struct lttng_consumer_local_data *ctx = NULL; +/* list of snapshots currently not consumed */ +GPtrArray *available_snapshots; +sem_t metadata_available; +FILE *metadata_fp; +int trace_opened = 0; +int metadata_ready = 0; enum { OPT_NONE = 0, @@ -122,6 +132,45 @@ void *ncurses_display(void *p) } } +/* FIXME : TMP */ +struct tm ts_format_timestamp(uint64_t timestamp) +{ + struct tm tm; + uint64_t ts_sec = 0, ts_nsec; + time_t time_s; + + ts_nsec = timestamp; + ts_sec += ts_nsec / NSEC_PER_SEC; + ts_nsec = ts_nsec % NSEC_PER_SEC; + + time_s = (time_t) ts_sec; + + localtime_r(&time_s, &tm); + + return tm; +} + +/* + * hook on each event to check the timestamp and refresh the display if + * necessary + */ +enum bt_cb_ret print_timestamp(struct bt_ctf_event *call_data, void *private_data) +{ + unsigned long timestamp; + struct tm start; + uint64_t ts_nsec_start, ts_nsec_end; + + + timestamp = bt_ctf_get_timestamp(call_data); + + start = ts_format_timestamp(timestamp); + ts_nsec_start = timestamp % NSEC_PER_SEC; + +// printf("%02d:%02d:%02d.%09" PRIu64 "\n", start.tm_hour, start.tm_min, start.tm_sec, ts_nsec_start); + + return BT_CB_OK; +} + /* * hook on each event to check the timestamp and refresh the display if * necessary @@ -380,6 +429,11 @@ void iter_trace(struct bt_context *bt_ctx) begin_pos.type = BT_SEEK_BEGIN; iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL); + bt_ctf_iter_add_callback(iter, 0, NULL, 0, + print_timestamp, + NULL, NULL, NULL); + +#if 0 /* at each event check if we need to refresh */ bt_ctf_iter_add_callback(iter, 0, NULL, 0, check_timestamp, @@ -424,7 +478,7 @@ void iter_trace(struct bt_context *bt_ctx) "lttng_statedump_file_descriptor"), NULL, 0, handle_statedump_file_descriptor, NULL, NULL, NULL); - +#endif while ((event = bt_ctf_iter_read_event(iter)) != NULL) { ret = bt_iter_next(bt_ctf_get_iter(iter)); if (ret < 0) @@ -634,6 +688,84 @@ int check_requirements(struct bt_context *ctx) return ret; } +void dump_snapshot() +{ +#if 0 + struct lttng_consumer_stream *iter; + unsigned long spos; + struct mmap_stream *new_snapshot; + + int ret = 0; + int i; + /* + * try lock mutex ressource courante (overrun) + * if fail : overrun + * stop trace (flush implicite avant stop) + * lttng_consumer_take_snapshot + * read timestamp packet end (use time as end pos) + * - stream_packet_context + * - reculer de 1 subbuf : pos - max_subbuff_size + * + * - position de fin (take_snapshot) + * - mov_pos_slow ( fin - max_subbuff_size) lire timestamp packet end + * - prend min(end) (activité sur tous les streams) + * + * start trace + * unlock mutex + */ + + helper_kernctl_buffer_flush(consumerd_metadata); + for (i = 0; i < lttng_consumer_stream_array->len; i++) { + iter = g_ptr_array_index(lttng_consumer_stream_array, i); + helper_kernctl_buffer_flush(helper_get_lttng_consumer_stream_wait_fd(iter)); + printf("Taking snapshot of fd : %d\n", helper_get_lttng_consumer_stream_wait_fd(iter)); + ret = helper_lttng_consumer_take_snapshot(ctx, iter); + if (ret != 0) { + ret = errno; + perror("lttng_consumer_take_snapshots"); + goto end; + } + } + for (i = 0; i < lttng_consumer_stream_array->len; i++) { + iter = g_ptr_array_index(lttng_consumer_stream_array, i); + ret = helper_lttng_consumer_get_produced_snapshot(ctx, iter, &spos); + if (ret != 0) { + ret = errno; + perror("helper_lttng_consumer_get_produced_snapshot"); + goto end; + } + while (helper_get_lttng_consumer_stream_wait_last_pos(iter) < spos) { + new_snapshot = g_new0(struct mmap_stream, 1); + new_snapshot->fd = helper_get_lttng_consumer_stream_wait_fd(iter); + new_snapshot->last_pos = helper_get_lttng_consumer_stream_wait_last_pos(iter); + fprintf(stderr,"ADDING AVAILABLE SNAPSHOT ON FD %d AT POSITION %lu\n", + new_snapshot->fd, + new_snapshot->last_pos); + g_ptr_array_add(available_snapshots, new_snapshot); + helper_set_lttng_consumer_stream_wait_last_pos(iter, + helper_get_lttng_consumer_stream_wait_last_pos(iter) + + helper_get_lttng_consumer_stream_chan_max_sb_size(iter)); + } + } + + if (!metadata_ready) { + fprintf(stderr, "BLOCKING BEFORE METADATA\n"); + sem_wait(&metadata_available); + fprintf(stderr,"OPENING TRACE\n"); + if (access("/tmp/livesession/kernel/metadata", F_OK) != 0) { + fprintf(stderr,"NO METADATA FILE, SKIPPING\n"); + return; + } + metadata_ready = 1; + metadata_fp = fopen("/tmp/livesession/kernel/metadata", "r"); + } + + +end: + return; +#endif +} + ssize_t read_subbuffer(struct lttng_consumer_stream *kconsumerd_fd, struct lttng_consumer_local_data *ctx) { @@ -679,7 +811,7 @@ ssize_t read_subbuffer(struct lttng_consumer_stream *kconsumerd_fd, "it is due to concurrency)"); goto end; } -// sem_post(&metadata_available); + sem_post(&metadata_available); } end: @@ -700,7 +832,6 @@ int on_recv_fd(struct lttng_consumer_stream *kconsumerd_fd) struct mmap_stream *new_info; size_t tmp_mmap_len; - printf("Receiving %d\n", helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd)); /* Opening the tracefile in write mode */ if (helper_get_lttng_consumer_stream_path_name(kconsumerd_fd) != NULL) { ret = open(helper_get_lttng_consumer_stream_path_name(kconsumerd_fd), @@ -726,7 +857,6 @@ int on_recv_fd(struct lttng_consumer_stream *kconsumerd_fd) goto end; } helper_set_lttng_consumer_stream_mmap_len(kconsumerd_fd, tmp_mmap_len); - printf("mmap len : %ld\n", tmp_mmap_len); helper_set_lttng_consumer_stream_mmap_base(kconsumerd_fd, mmap(NULL, helper_get_lttng_consumer_stream_mmap_len(kconsumerd_fd), @@ -738,6 +868,7 @@ int on_recv_fd(struct lttng_consumer_stream *kconsumerd_fd) } g_ptr_array_add(lttng_consumer_stream_array, kconsumerd_fd); + /* keep mmap FDs internally */ ret = 1; } else { consumerd_metadata = helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd); @@ -749,6 +880,40 @@ end: return ret; } +void *live_consume() +{ + struct bt_context *bt_ctx = NULL; + int ret; + + while (1) { +// dump_snapshot(); + + if (!metadata_ready) { + fprintf(stderr, "BLOCKING BEFORE METADATA\n"); + sem_wait(&metadata_available); + fprintf(stderr,"OPENING TRACE\n"); + if (access("/tmp/livesession/kernel/metadata", F_OK) != 0) { + fprintf(stderr,"NO METADATA FILE, SKIPPING\n"); + return NULL; + } + metadata_ready = 1; + metadata_fp = fopen("/tmp/livesession/kernel/metadata", "r"); + } + + if (!trace_opened) { + bt_ctx = bt_context_create(); + ret = bt_context_add_trace(bt_ctx, NULL, "ctf", + lttngtop_ctf_packet_seek, &mmap_list, metadata_fp); + if (ret < 0) { + printf("Error adding trace\n"); + return NULL; + } + trace_opened = 1; + } + iter_trace(bt_ctx); + sleep(1); + } +} int setup_consumer(char *command_sock_path, pthread_t *threads, struct lttng_consumer_local_data *ctx) @@ -768,14 +933,14 @@ int setup_consumer(char *command_sock_path, pthread_t *threads, ret = pthread_create(&threads[0], NULL, helper_lttng_consumer_thread_receive_fds, (void *) ctx); if (ret != 0) { - perror("pthread_create"); + perror("pthread_create receive fd"); goto end; } /* Create thread to manage the polling/writing of traces */ ret = pthread_create(&threads[1], NULL, helper_lttng_consumer_thread_poll_fds, (void *) ctx); if (ret != 0) { - perror("pthread_create"); + perror("pthread_create poll fd"); goto end; } @@ -783,19 +948,16 @@ end: return ret; } -int setup_live_tracing() +void *setup_live_tracing() { struct lttng_domain dom; struct lttng_channel chan; char *channel_name = "mmapchan"; - struct lttng_consumer_local_data *ctx = NULL; struct lttng_event ev; int ret = 0; char *command_sock_path = "/tmp/consumerd_sock"; static pthread_t threads[2]; /* recv_fd, poll */ struct lttng_event_context kctxpid, kctxcomm, kctxppid, kctxtid; - /* list of snapshots currently not consumed */ - GPtrArray *available_snapshots; struct lttng_handle *handle; @@ -833,9 +995,9 @@ int setup_live_tracing() } strcpy(chan.name, channel_name); - chan.attr.overwrite = 1; -// chan.attr.subbuf_size = 32768; - chan.attr.subbuf_size = 1048576; /* 1MB */ + chan.attr.overwrite = 0; + chan.attr.subbuf_size = 32768; +// chan.attr.subbuf_size = 1048576; /* 1MB */ chan.attr.num_subbuf = 4; chan.attr.switch_timer_interval = 0; chan.attr.read_timer_interval = 200; @@ -870,18 +1032,23 @@ int setup_live_tracing() } helper_kernctl_buffer_flush(consumerd_metadata); - sleep(10); - lttng_stop_tracing("test"); - lttng_destroy_session("test"); + /* Create thread to manage the polling/writing of traces */ + ret = pthread_create(&thread_live_consume, NULL, live_consume, NULL); + if (ret != 0) { + perror("pthread_create"); + goto end; + } + +// pthread_cancel(live_trace_thread); /* block until metadata is ready */ - //sem_init(&metadata_available, 0, 0); + sem_init(&metadata_available, 0, 0); //init_lttngtop(); end: - return ret; + return NULL; } int main(int argc, char **argv) @@ -900,7 +1067,15 @@ int main(int argc, char **argv) if (!opt_input_path) { printf("live tracing enabled\n"); - setup_live_tracing(); + pthread_create(&live_trace_thread, NULL, setup_live_tracing, (void *) NULL); + sleep(20); + printf("STOPPING\n"); + lttng_stop_tracing("test"); + printf("DESTROYING\n"); + lttng_destroy_session("test"); + + printf("CANCELLING\n"); + pthread_cancel(live_trace_thread); goto end; } else { init_lttngtop();