X-Git-Url: https://git.lttng.org/?p=lttngtop.git;a=blobdiff_plain;f=src%2Flttngtop.c;h=fd23e74da416c746cf437586ecb275d3c0628a2d;hp=2ee303a318dccd6028b9994193e3174a1eec377e;hb=c78f2cdca5c8f39eddd468a7c4ec6c373ed5495e;hpb=bb6d72a22332433cbe10ae9d88627be7637b84b1 diff --git a/src/lttngtop.c b/src/lttngtop.c index 2ee303a..fd23e74 100644 --- a/src/lttngtop.c +++ b/src/lttngtop.c @@ -37,6 +37,10 @@ #include #include #include +#include +#include +#include +#include #include "lttngtoptypes.h" #include "cputop.h" @@ -51,11 +55,26 @@ const char *opt_input_path; struct lttngtop *copy; pthread_t display_thread; pthread_t timer_thread; +pthread_t live_trace_thread; unsigned long refresh_display = 1 * NSEC_PER_SEC; unsigned long last_display_update = 0; int quit = 0; +/* LIVE */ +pthread_t thread_live_consume; +/* list of FDs available for being read with snapshots */ +struct mmap_stream_list mmap_list; +GPtrArray *lttng_consumer_stream_array; +int sessiond_metadata, consumerd_metadata; +struct lttng_consumer_local_data *ctx = NULL; +/* list of snapshots currently not consumed */ +GPtrArray *available_snapshots; +sem_t metadata_available; +FILE *metadata_fp; +int trace_opened = 0; +int metadata_ready = 0; + enum { OPT_NONE = 0, OPT_HELP, @@ -190,12 +209,12 @@ void update_perf_value(struct processtop *proc, struct cputime *cpu, } void extract_perf_counter_scope(const struct bt_ctf_event *event, - const struct definition *scope, + const struct bt_definition *scope, struct processtop *proc, struct cputime *cpu) { - struct definition const * const *list = NULL; - const struct definition *field; + struct bt_definition const * const *list = NULL; + const struct bt_definition *field; unsigned int count; struct perfcounter *perfcounter; GHashTableIter iter; @@ -230,7 +249,7 @@ end: void update_perf_counter(struct processtop *proc, const struct bt_ctf_event *event) { struct cputime *cpu; - const struct definition *scope; + const struct bt_definition *scope; cpu = get_cpu(get_cpu_id(event)); @@ -337,11 +356,6 @@ static int parse_options(int argc, char **argv) poptContext pc; int opt, ret = 0; - if (argc == 1) { - usage(stdout); - return 1; /* exit cleanly */ - } - pc = poptGetContext(NULL, argc, (const char **) argv, long_options, 0); poptReadDefaultConfig(pc, 0); @@ -358,10 +372,7 @@ static int parse_options(int argc, char **argv) } opt_input_path = poptGetArg(pc); - if (!opt_input_path) { - ret = -EINVAL; - goto end; - } + end: if (pc) { poptFreeContext(pc); @@ -451,7 +462,7 @@ end_iter: */ int bt_context_add_traces_recursive(struct bt_context *ctx, const char *path, const char *format_str, - void (*packet_seek)(struct stream_pos *pos, + void (*packet_seek)(struct bt_stream_pos *pos, size_t offset, int whence)) { FTS *tree; @@ -544,7 +555,7 @@ static int check_field_requirements(const struct bt_ctf_field_decl *const * fiel (*tid_check)++; } if (*pid_check == 0) { - if (strncmp(name, "tid", 3) == 0) + if (strncmp(name, "pid", 3) == 0) (*pid_check)++; } if (*ppid_check == 0) { @@ -633,6 +644,364 @@ int check_requirements(struct bt_context *ctx) return ret; } +void dump_snapshot() +{ +#if 0 + struct lttng_consumer_stream *iter; + unsigned long spos; + struct mmap_stream *new_snapshot; + + int ret = 0; + int i; + /* + * try lock mutex ressource courante (overrun) + * if fail : overrun + * stop trace (flush implicite avant stop) + * lttng_consumer_take_snapshot + * read timestamp packet end (use time as end pos) + * - stream_packet_context + * - reculer de 1 subbuf : pos - max_subbuff_size + * + * - position de fin (take_snapshot) + * - mov_pos_slow ( fin - max_subbuff_size) lire timestamp packet end + * - prend min(end) (activité sur tous les streams) + * + * start trace + * unlock mutex + */ + + helper_kernctl_buffer_flush(consumerd_metadata); + for (i = 0; i < lttng_consumer_stream_array->len; i++) { + iter = g_ptr_array_index(lttng_consumer_stream_array, i); + helper_kernctl_buffer_flush(helper_get_lttng_consumer_stream_wait_fd(iter)); + printf("Taking snapshot of fd : %d\n", helper_get_lttng_consumer_stream_wait_fd(iter)); + ret = helper_lttng_consumer_take_snapshot(ctx, iter); + if (ret != 0) { + ret = errno; + perror("lttng_consumer_take_snapshots"); + goto end; + } + } + for (i = 0; i < lttng_consumer_stream_array->len; i++) { + iter = g_ptr_array_index(lttng_consumer_stream_array, i); + ret = helper_lttng_consumer_get_produced_snapshot(ctx, iter, &spos); + if (ret != 0) { + ret = errno; + perror("helper_lttng_consumer_get_produced_snapshot"); + goto end; + } + while (helper_get_lttng_consumer_stream_wait_last_pos(iter) < spos) { + new_snapshot = g_new0(struct mmap_stream, 1); + new_snapshot->fd = helper_get_lttng_consumer_stream_wait_fd(iter); + new_snapshot->last_pos = helper_get_lttng_consumer_stream_wait_last_pos(iter); + fprintf(stderr,"ADDING AVAILABLE SNAPSHOT ON FD %d AT POSITION %lu\n", + new_snapshot->fd, + new_snapshot->last_pos); + g_ptr_array_add(available_snapshots, new_snapshot); + helper_set_lttng_consumer_stream_wait_last_pos(iter, + helper_get_lttng_consumer_stream_wait_last_pos(iter) + + helper_get_lttng_consumer_stream_chan_max_sb_size(iter)); + } + } + + if (!metadata_ready) { + fprintf(stderr, "BLOCKING BEFORE METADATA\n"); + sem_wait(&metadata_available); + fprintf(stderr,"OPENING TRACE\n"); + if (access("/tmp/livesession/kernel/metadata", F_OK) != 0) { + fprintf(stderr,"NO METADATA FILE, SKIPPING\n"); + return; + } + metadata_ready = 1; + metadata_fp = fopen("/tmp/livesession/kernel/metadata", "r"); + } + + +end: + return; +#endif +} + +ssize_t read_subbuffer(struct lttng_consumer_stream *kconsumerd_fd, + struct lttng_consumer_local_data *ctx) +{ + unsigned long len; + int err; + long ret = 0; + int infd = helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd); + + if (helper_get_lttng_consumer_stream_output(kconsumerd_fd) == LTTNG_EVENT_SPLICE) { + /* Get the next subbuffer */ + printf("get_next : %d\n", infd); + err = helper_kernctl_get_next_subbuf(infd); + if (err != 0) { + ret = errno; + perror("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + /* read the whole subbuffer */ + err = helper_kernctl_get_padded_subbuf_size(infd, &len); + if (err != 0) { + ret = errno; + perror("Getting sub-buffer len failed."); + goto end; + } + printf("len : %ld\n", len); + + /* splice the subbuffer to the tracefile */ + ret = helper_lttng_consumer_on_read_subbuffer_splice(ctx, kconsumerd_fd, len); + if (ret < 0) { + /* + * display the error but continue processing to try + * to release the subbuffer + */ + fprintf(stderr,"Error splicing to tracefile\n"); + } + printf("ret : %ld\n", ret); + printf("put_next : %d\n", infd); + err = helper_kernctl_put_next_subbuf(infd); + if (err != 0) { + ret = errno; + perror("Reserving sub buffer failed (everything is normal, " + "it is due to concurrency)"); + goto end; + } + sem_post(&metadata_available); + } + +end: + return 0; +} + +int on_update_fd(int key, uint32_t state) +{ + /* let the lib handle the metadata FD */ + if (key == sessiond_metadata) + return 0; + return 1; +} + +int on_recv_fd(struct lttng_consumer_stream *kconsumerd_fd) +{ + int ret; + struct mmap_stream *new_info; + size_t tmp_mmap_len; + + /* Opening the tracefile in write mode */ + if (helper_get_lttng_consumer_stream_path_name(kconsumerd_fd) != NULL) { + ret = open(helper_get_lttng_consumer_stream_path_name(kconsumerd_fd), + O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + if (ret < 0) { + perror("open"); + goto end; + } + helper_set_lttng_consumer_stream_out_fd(kconsumerd_fd, ret); + } + + if (helper_get_lttng_consumer_stream_output(kconsumerd_fd) == LTTNG_EVENT_MMAP) { + new_info = malloc(sizeof(struct mmap_stream)); + new_info->fd = helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd); + bt_list_add(&new_info->list, &mmap_list.head); + + /* get the len of the mmap region */ + ret = helper_kernctl_get_mmap_len(helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd), + &tmp_mmap_len); + if (ret != 0) { + ret = errno; + perror("helper_kernctl_get_mmap_len"); + goto end; + } + helper_set_lttng_consumer_stream_mmap_len(kconsumerd_fd, tmp_mmap_len); + + helper_set_lttng_consumer_stream_mmap_base(kconsumerd_fd, + mmap(NULL, helper_get_lttng_consumer_stream_mmap_len(kconsumerd_fd), + PROT_READ, MAP_PRIVATE, helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd), 0)); + if (helper_get_lttng_consumer_stream_mmap_base(kconsumerd_fd) == MAP_FAILED) { + perror("Error mmaping"); + ret = -1; + goto end; + } + + g_ptr_array_add(lttng_consumer_stream_array, kconsumerd_fd); + ret = 0; + } else { + consumerd_metadata = helper_get_lttng_consumer_stream_wait_fd(kconsumerd_fd); + sessiond_metadata = helper_get_lttng_consumer_stream_key(kconsumerd_fd); + ret = 0; + } + +end: + return ret; +} + +void *live_consume() +{ + struct bt_context *bt_ctx = NULL; + int ret; + + while (1) { +// dump_snapshot(); + + if (!metadata_ready) { + fprintf(stderr, "BLOCKING BEFORE METADATA\n"); + sem_wait(&metadata_available); + fprintf(stderr,"OPENING TRACE\n"); + if (access("/tmp/livesession/kernel/metadata", F_OK) != 0) { + fprintf(stderr,"NO METADATA FILE, SKIPPING\n"); + return; + } + metadata_ready = 1; + metadata_fp = fopen("/tmp/livesession/kernel/metadata", "r"); + } + + if (!trace_opened) { + bt_ctx = bt_context_create(); + ret = bt_context_add_trace(bt_ctx, NULL, "ctf", + lttngtop_ctf_packet_seek, &mmap_list, metadata_fp); + trace_opened = 1; + } + iter_trace(bt_ctx); + sleep(1); + } +} + +int setup_consumer(char *command_sock_path, pthread_t *threads, + struct lttng_consumer_local_data *ctx) +{ + int ret = 0; + + ctx = helper_lttng_consumer_create(HELPER_LTTNG_CONSUMER_KERNEL, + read_subbuffer, NULL, on_recv_fd, on_update_fd); + if (!ctx) + goto end; + + unlink(command_sock_path); + helper_lttng_consumer_set_command_sock_path(ctx, command_sock_path); + helper_lttng_consumer_init(); + + /* Create the thread to manage the receive of fd */ + ret = pthread_create(&threads[0], NULL, helper_lttng_consumer_thread_receive_fds, + (void *) ctx); + if (ret != 0) { + perror("pthread_create receive fd"); + goto end; + } + /* Create thread to manage the polling/writing of traces */ + ret = pthread_create(&threads[1], NULL, helper_lttng_consumer_thread_poll_fds, + (void *) ctx); + if (ret != 0) { + perror("pthread_create poll fd"); + goto end; + } + +end: + return ret; +} + +void *setup_live_tracing() +{ + struct lttng_domain dom; + struct lttng_channel chan; + char *channel_name = "mmapchan"; + struct lttng_event ev; + int ret = 0; + char *command_sock_path = "/tmp/consumerd_sock"; + static pthread_t threads[2]; /* recv_fd, poll */ + struct lttng_event_context kctxpid, kctxcomm, kctxppid, kctxtid; + + struct lttng_handle *handle; + + BT_INIT_LIST_HEAD(&mmap_list.head); + + lttng_consumer_stream_array = g_ptr_array_new(); + + if ((ret = setup_consumer(command_sock_path, threads, ctx)) < 0) { + fprintf(stderr,"error setting up consumer\n"); + goto end; + } + + available_snapshots = g_ptr_array_new(); + + /* setup the session */ + dom.type = LTTNG_DOMAIN_KERNEL; + + ret = system("rm -rf /tmp/livesession"); + + if ((ret = lttng_create_session("test", "/tmp/livesession")) < 0) { + fprintf(stderr,"error creating the session : %s\n", + helper_lttcomm_get_readable_code(ret)); + goto end; + } + + if ((handle = lttng_create_handle("test", &dom)) == NULL) { + fprintf(stderr,"error creating handle\n"); + goto end; + } + + if ((ret = lttng_register_consumer(handle, command_sock_path)) < 0) { + fprintf(stderr,"error registering consumer : %s\n", + helper_lttcomm_get_readable_code(ret)); + goto end; + } + + strcpy(chan.name, channel_name); + chan.attr.overwrite = 0; + chan.attr.subbuf_size = 32768; +// chan.attr.subbuf_size = 1048576; /* 1MB */ + chan.attr.num_subbuf = 4; + chan.attr.switch_timer_interval = 0; + chan.attr.read_timer_interval = 200; + chan.attr.output = LTTNG_EVENT_MMAP; + + if ((ret = lttng_enable_channel(handle, &chan)) < 0) { + fprintf(stderr,"error creating channel : %s\n", helper_lttcomm_get_readable_code(ret)); + goto end; + } + + sprintf(ev.name, "sched_switch"); + ev.type = LTTNG_EVENT_TRACEPOINT; + + //if ((ret = lttng_enable_event(handle, NULL, channel_name)) < 0) { + if ((ret = lttng_enable_event(handle, &ev, channel_name)) < 0) { + fprintf(stderr,"error enabling event : %s\n", helper_lttcomm_get_readable_code(ret)); + goto end; + } + + kctxpid.ctx = LTTNG_EVENT_CONTEXT_PID; + lttng_add_context(handle, &kctxpid, NULL, NULL); + kctxppid.ctx = LTTNG_EVENT_CONTEXT_PPID; + lttng_add_context(handle, &kctxppid, NULL, NULL); + kctxcomm.ctx = LTTNG_EVENT_CONTEXT_PROCNAME; + lttng_add_context(handle, &kctxcomm, NULL, NULL); + kctxtid.ctx = LTTNG_EVENT_CONTEXT_TID; + lttng_add_context(handle, &kctxtid, NULL, NULL); + + if ((ret = lttng_start_tracing("test")) < 0) { + fprintf(stderr,"error starting tracing : %s\n", helper_lttcomm_get_readable_code(ret)); + goto end; + } + + helper_kernctl_buffer_flush(consumerd_metadata); + + /* Create thread to manage the polling/writing of traces */ + ret = pthread_create(&thread_live_consume, NULL, live_consume, NULL); + if (ret != 0) { + perror("pthread_create"); + goto end; + } + +// pthread_cancel(live_trace_thread); + + /* block until metadata is ready */ + sem_init(&metadata_available, 0, 0); + + //init_lttngtop(); + +end: + return ret; +} + int main(int argc, char **argv) { int ret; @@ -647,31 +1016,46 @@ int main(int argc, char **argv) exit(EXIT_SUCCESS); } - init_lttngtop(); - - bt_ctx = bt_context_create(); - ret = bt_context_add_traces_recursive(bt_ctx, opt_input_path, "ctf", NULL); - if (ret < 0) { - fprintf(stderr, "[error] Opening the trace\n"); - goto end; - } - - ret = check_requirements(bt_ctx); - if (ret < 0) { - fprintf(stderr, "[error] some mandatory contexts were missing, exiting.\n"); + if (!opt_input_path) { + printf("live tracing enabled\n"); + pthread_create(&live_trace_thread, NULL, setup_live_tracing, (void *) NULL); + sleep(20); + printf("STOPPING\n"); + lttng_stop_tracing("test"); + printf("DESTROYING\n"); + lttng_destroy_session("test"); + + printf("CANCELLING\n"); + pthread_cancel(live_trace_thread); goto end; - } + } else { + init_lttngtop(); + + bt_ctx = bt_context_create(); + ret = bt_context_add_traces_recursive(bt_ctx, opt_input_path, "ctf", NULL); + if (ret < 0) { + fprintf(stderr, "[error] Opening the trace\n"); + goto end; + } - pthread_create(&display_thread, NULL, ncurses_display, (void *) NULL); - pthread_create(&timer_thread, NULL, refresh_thread, (void *) NULL); + ret = check_requirements(bt_ctx); + if (ret < 0) { + fprintf(stderr, "[error] some mandatory contexts were missing, exiting.\n"); + goto end; + } + pthread_create(&display_thread, NULL, ncurses_display, (void *) NULL); + pthread_create(&timer_thread, NULL, refresh_thread, (void *) NULL); - iter_trace(bt_ctx); + iter_trace(bt_ctx); - quit = 1; - pthread_join(display_thread, NULL); - pthread_join(timer_thread, NULL); + quit = 1; + pthread_join(display_thread, NULL); + pthread_join(timer_thread, NULL); + } end: - bt_context_put(bt_ctx); + if (bt_ctx) + bt_context_put(bt_ctx); + return 0; }