X-Git-Url: https://git.lttng.org/?p=lttngtop.git;a=blobdiff_plain;f=src%2Flttngtop.c;h=15d44e02e08eec5660d607c6b593146646e9e4e4;hp=f78a8653580e7f050e7aac2f298ced97ac0feb19;hb=b520ab458ea26829d00fca98f4bbb90309fd30df;hpb=da401e9c85ea307f2837d9c80eba8636273d9232 diff --git a/src/lttngtop.c b/src/lttngtop.c index f78a865..15d44e0 100644 --- a/src/lttngtop.c +++ b/src/lttngtop.c @@ -37,6 +37,10 @@ #include #include #include +#include +#include +#include +#include #include "lttngtoptypes.h" #include "cputop.h" @@ -51,11 +55,26 @@ const char *opt_input_path; struct lttngtop *copy; pthread_t display_thread; pthread_t timer_thread; +pthread_t live_trace_thread; unsigned long refresh_display = 1 * NSEC_PER_SEC; unsigned long last_display_update = 0; int quit = 0; +/* LIVE */ +pthread_t thread_live_consume; +/* list of FDs available for being read with snapshots */ +struct mmap_stream_list mmap_list; +GPtrArray *lttng_consumer_stream_array; +int sessiond_metadata, consumerd_metadata; +struct lttng_consumer_local_data *ctx = NULL; +/* list of snapshots currently not consumed */ +GPtrArray *available_snapshots; +sem_t metadata_available; +FILE *metadata_fp; +int trace_opened = 0; +int metadata_ready = 0; + enum { OPT_NONE = 0, OPT_HELP, @@ -113,6 +132,29 @@ void *ncurses_display(void *p) } } +/* + * hook on each event to check the timestamp and refresh the display if + * necessary + */ +enum bt_cb_ret print_timestamp(struct bt_ctf_event *call_data, void *private_data) +{ + unsigned long timestamp; + struct tm start; + uint64_t ts_nsec_start; + + + timestamp = bt_ctf_get_timestamp(call_data); + + start = format_timestamp(timestamp); + ts_nsec_start = timestamp % NSEC_PER_SEC; + + printf("%02d:%02d:%02d.%09" PRIu64 " %s\n", start.tm_hour, + start.tm_min, start.tm_sec, ts_nsec_start, + bt_ctf_event_name(call_data)); + + return BT_CB_OK; +} + /* * hook on each event to check the timestamp and refresh the display if * necessary @@ -190,12 +232,12 @@ void update_perf_value(struct processtop *proc, struct cputime *cpu, } void extract_perf_counter_scope(const struct bt_ctf_event *event, - const struct definition *scope, + const struct bt_definition *scope, struct processtop *proc, struct cputime *cpu) { - struct definition const * const *list = NULL; - const struct definition *field; + struct bt_definition const * const *list = NULL; + const struct bt_definition *field; unsigned int count; struct perfcounter *perfcounter; GHashTableIter iter; @@ -230,7 +272,7 @@ end: void update_perf_counter(struct processtop *proc, const struct bt_ctf_event *event) { struct cputime *cpu; - const struct definition *scope; + const struct bt_definition *scope; cpu = get_cpu(get_cpu_id(event)); @@ -337,11 +379,6 @@ static int parse_options(int argc, char **argv) poptContext pc; int opt, ret = 0; - if (argc == 1) { - usage(stdout); - return 1; /* exit cleanly */ - } - pc = poptGetContext(NULL, argc, (const char **) argv, long_options, 0); poptReadDefaultConfig(pc, 0); @@ -358,10 +395,7 @@ static int parse_options(int argc, char **argv) } opt_input_path = poptGetArg(pc); - if (!opt_input_path) { - ret = -EINVAL; - goto end; - } + end: if (pc) { poptFreeContext(pc); @@ -379,6 +413,11 @@ void iter_trace(struct bt_context *bt_ctx) begin_pos.type = BT_SEEK_BEGIN; iter = bt_ctf_iter_create(bt_ctx, &begin_pos, NULL); + bt_ctf_iter_add_callback(iter, 0, NULL, 0, + print_timestamp, + NULL, NULL, NULL); + +#if 0 /* at each event check if we need to refresh */ bt_ctf_iter_add_callback(iter, 0, NULL, 0, check_timestamp, @@ -423,7 +462,7 @@ void iter_trace(struct bt_context *bt_ctx) "lttng_statedump_file_descriptor"), NULL, 0, handle_statedump_file_descriptor, NULL, NULL, NULL); - +#endif while ((event = bt_ctf_iter_read_event(iter)) != NULL) { ret = bt_iter_next(bt_ctf_get_iter(iter)); if (ret < 0) @@ -451,7 +490,7 @@ end_iter: */ int bt_context_add_traces_recursive(struct bt_context *ctx, const char *path, const char *format_str, - void (*packet_seek)(struct stream_pos *pos, + void (*packet_seek)(struct bt_stream_pos *pos, size_t offset, int whence)) { FTS *tree; @@ -633,6 +672,262 @@ int check_requirements(struct bt_context *ctx) return ret; } +ssize_t read_subbuffer(struct lttng_consumer_stream *kconsumerd_fd, + struct lttng_consumer_local_data *ctx) +{ + unsigned long len; + int err; + long ret = 0; + int infd = helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd); + + if (helper_get_lttng_consumer_stream_output(kconsumerd_fd) == LTTNG_EVENT_SPLICE) { + /* Get the next subbuffer */ + err = helper_kernctl_get_next_subbuf(infd); + if (err != 0) { + ret = errno; + perror("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + /* read the whole subbuffer */ + err = helper_kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + + /* splice the subbuffer to the tracefile */ + ret = helper_lttng_consumer_on_read_subbuffer_splice(ctx, kconsumerd_fd, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + fprintf(stderr,"Error splicing to tracefile\n"); + } + err = helper_kernctl_put_next_subbuf(infd); + if (err != 0) { + ret = errno; + perror("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + sem_post(&metadata_available); + } + +end: + return 0; +} + +int on_update_fd(int key, uint32_t state) +{ + /* let the lib handle the metadata FD */ + if (key == sessiond_metadata) + return 0; + return 1; +} + +int on_recv_fd(struct lttng_consumer_stream *kconsumerd_fd) +{ + int ret; + struct mmap_stream *new_mmap_stream; + + /* Opening the tracefile in write mode */ + if (helper_get_lttng_consumer_stream_path_name(kconsumerd_fd) != NULL) { + ret = open(helper_get_lttng_consumer_stream_path_name(kconsumerd_fd), + O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + perror("open"); + goto end; + } + helper_set_lttng_consumer_stream_out_fd(kconsumerd_fd, ret); + } + + if (helper_get_lttng_consumer_stream_output(kconsumerd_fd) == LTTNG_EVENT_MMAP) { + new_mmap_stream = malloc(sizeof(struct mmap_stream)); + new_mmap_stream->fd = helper_get_lttng_consumer_stream_wait_fd( + kconsumerd_fd); + bt_list_add(&new_mmap_stream->list, &mmap_list.head); + + g_ptr_array_add(lttng_consumer_stream_array, kconsumerd_fd); + /* keep mmap FDs internally */ + ret = 1; + } else { + consumerd_metadata = helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd); + sessiond_metadata = helper_get_lttng_consumer_stream_key(kconsumerd_fd); + ret = 0; + } + +end: + return ret; +} + +void *live_consume() +{ + struct bt_context *bt_ctx = NULL; + int ret; + + if (!metadata_ready) { + sem_wait(&metadata_available); + if (access("/tmp/livesession/kernel/metadata", F_OK) != 0) { + fprintf(stderr,"no metadata\n"); + return NULL; + } + metadata_ready = 1; + metadata_fp = fopen("/tmp/livesession/kernel/metadata", "r"); + } + + if (!trace_opened) { + bt_ctx = bt_context_create(); + ret = bt_context_add_trace(bt_ctx, NULL, "ctf", + lttngtop_ctf_packet_seek, &mmap_list, metadata_fp); + if (ret < 0) { + printf("Error adding trace\n"); + return NULL; + } + trace_opened = 1; + } + iter_trace(bt_ctx); + + return NULL; +} + +int setup_consumer(char *command_sock_path, pthread_t *threads, + struct lttng_consumer_local_data *ctx) +{ + int ret = 0; + + ctx = helper_lttng_consumer_create(HELPER_LTTNG_CONSUMER_KERNEL, + read_subbuffer, NULL, on_recv_fd, on_update_fd); + if (!ctx) + goto end; + + unlink(command_sock_path); + helper_lttng_consumer_set_command_sock_path(ctx, command_sock_path); + helper_lttng_consumer_init(); + + /* Create the thread to manage the receive of fd */ + ret = pthread_create(&threads[0], NULL, helper_lttng_consumer_thread_receive_fds, + (void *) ctx); + if (ret != 0) { + perror("pthread_create receive fd"); + goto end; + } + /* Create thread to manage the polling/writing of traces */ + ret = pthread_create(&threads[1], NULL, helper_lttng_consumer_thread_poll_fds, + (void *) ctx); + if (ret != 0) { + perror("pthread_create poll fd"); + goto end; + } + +end: + return ret; +} + +void *setup_live_tracing() +{ + struct lttng_domain dom; + struct lttng_channel chan; + char *channel_name = "mmapchan"; + struct lttng_event ev; + int ret = 0; + char *command_sock_path = "/tmp/consumerd_sock"; + static pthread_t threads[2]; /* recv_fd, poll */ + struct lttng_event_context kctxpid, kctxcomm, kctxppid, kctxtid; + + struct lttng_handle *handle; + + BT_INIT_LIST_HEAD(&mmap_list.head); + + lttng_consumer_stream_array = g_ptr_array_new(); + + if ((ret = setup_consumer(command_sock_path, threads, ctx)) < 0) { + fprintf(stderr,"error setting up consumer\n"); + goto end; + } + + available_snapshots = g_ptr_array_new(); + + /* setup the session */ + dom.type = LTTNG_DOMAIN_KERNEL; + + ret = system("rm -rf /tmp/livesession"); + + lttng_destroy_session("test"); + if ((ret = lttng_create_session("test", "/tmp/livesession")) < 0) { + fprintf(stderr,"error creating the session : %s\n", + helper_lttcomm_get_readable_code(ret)); + goto end; + } + + if ((handle = lttng_create_handle("test", &dom)) == NULL) { + fprintf(stderr,"error creating handle\n"); + goto end; + } + + if ((ret = lttng_register_consumer(handle, command_sock_path)) < 0) { + fprintf(stderr,"error registering consumer : %s\n", + helper_lttcomm_get_readable_code(ret)); + goto end; + } + + strcpy(chan.name, channel_name); + chan.attr.overwrite = 0; + chan.attr.subbuf_size = 32768; +// chan.attr.subbuf_size = 1048576; /* 1MB */ + chan.attr.num_subbuf = 4; + chan.attr.switch_timer_interval = 0; + chan.attr.read_timer_interval = 200; + chan.attr.output = LTTNG_EVENT_MMAP; + + if ((ret = lttng_enable_channel(handle, &chan)) < 0) { + fprintf(stderr,"error creating channel : %s\n", helper_lttcomm_get_readable_code(ret)); + goto end; + } + + sprintf(ev.name, "sched_switch"); + ev.type = LTTNG_EVENT_TRACEPOINT; + + //if ((ret = lttng_enable_event(handle, NULL, channel_name)) < 0) { + if ((ret = lttng_enable_event(handle, &ev, channel_name)) < 0) { + fprintf(stderr,"error enabling event : %s\n", helper_lttcomm_get_readable_code(ret)); + goto end; + } + + kctxpid.ctx = LTTNG_EVENT_CONTEXT_PID; + lttng_add_context(handle, &kctxpid, NULL, NULL); + kctxppid.ctx = LTTNG_EVENT_CONTEXT_PPID; + lttng_add_context(handle, &kctxppid, NULL, NULL); + kctxcomm.ctx = LTTNG_EVENT_CONTEXT_PROCNAME; + lttng_add_context(handle, &kctxcomm, NULL, NULL); + kctxtid.ctx = LTTNG_EVENT_CONTEXT_TID; + lttng_add_context(handle, &kctxtid, NULL, NULL); + + if ((ret = lttng_start_tracing("test")) < 0) { + fprintf(stderr,"error starting tracing : %s\n", helper_lttcomm_get_readable_code(ret)); + goto end; + } + + helper_kernctl_buffer_flush(consumerd_metadata); + + /* Create thread to manage the polling/writing of traces */ + ret = pthread_create(&thread_live_consume, NULL, live_consume, NULL); + if (ret != 0) { + perror("pthread_create"); + goto end; + } + + /* block until metadata is ready */ + sem_init(&metadata_available, 0, 0); + + //init_lttngtop(); + +end: + return NULL; +} + int main(int argc, char **argv) { int ret; @@ -647,31 +942,45 @@ int main(int argc, char **argv) exit(EXIT_SUCCESS); } - init_lttngtop(); - - bt_ctx = bt_context_create(); - ret = bt_context_add_traces_recursive(bt_ctx, opt_input_path, "ctf", NULL); - if (ret < 0) { - fprintf(stderr, "[error] Opening the trace\n"); - goto end; - } - - ret = check_requirements(bt_ctx); - if (ret < 0) { - fprintf(stderr, "[error] some mandatory contexts were missing, exiting.\n"); + if (!opt_input_path) { + pthread_create(&live_trace_thread, NULL, setup_live_tracing, (void *) NULL); + sleep(2000); + printf("STOPPING\n"); + lttng_stop_tracing("test"); + printf("DESTROYING\n"); + lttng_destroy_session("test"); + + printf("CANCELLING\n"); + pthread_cancel(live_trace_thread); goto end; - } + } else { + init_lttngtop(); + + bt_ctx = bt_context_create(); + ret = bt_context_add_traces_recursive(bt_ctx, opt_input_path, "ctf", NULL); + if (ret < 0) { + fprintf(stderr, "[error] Opening the trace\n"); + goto end; + } - pthread_create(&display_thread, NULL, ncurses_display, (void *) NULL); - pthread_create(&timer_thread, NULL, refresh_thread, (void *) NULL); + ret = check_requirements(bt_ctx); + if (ret < 0) { + fprintf(stderr, "[error] some mandatory contexts were missing, exiting.\n"); + goto end; + } + pthread_create(&display_thread, NULL, ncurses_display, (void *) NULL); + pthread_create(&timer_thread, NULL, refresh_thread, (void *) NULL); - iter_trace(bt_ctx); + iter_trace(bt_ctx); - quit = 1; - pthread_join(display_thread, NULL); - pthread_join(timer_thread, NULL); + quit = 1; + pthread_join(display_thread, NULL); + pthread_join(timer_thread, NULL); + } end: - bt_context_put(bt_ctx); + if (bt_ctx) + bt_context_put(bt_ctx); + return 0; }