X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=libtracectl%2Ftracectl.c;h=c407be97c13f22f4e99f1a36a7a508924eda5ef4;hb=688760ef257bee85e3841e0d27ecf4e2f921ef00;hp=3c5ff859c7f5f9b15139e86ca11e2f3c4b2d3184;hpb=d0b5f2b948ed0da3f8acd10817f85f5200749121;p=ust.git diff --git a/libtracectl/tracectl.c b/libtracectl/tracectl.c index 3c5ff85..c407be9 100644 --- a/libtracectl/tracectl.c +++ b/libtracectl/tracectl.c @@ -12,6 +12,8 @@ #include "localerr.h" #include "ustcomm.h" +//#define USE_CLONE + #define UNIX_PATH_MAX 108 #define SOCKETDIR "/tmp/socks" @@ -45,7 +47,6 @@ struct trctl_msg { char payload[94]; }; -pid_t mypid; char mysocketfile[UNIX_PATH_MAX] = ""; //int pfd = -1; @@ -87,7 +88,7 @@ int consumer(void *arg) consumer_channels[i].chan = chan; snprintf(tmp, sizeof(tmp), "trace/%s_0", chan->channel_name); - result = consumer_channels[i].fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00644); + result = consumer_channels[i].fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600); if(result == -1) { perror("open"); return -1; @@ -120,54 +121,22 @@ int consumer(void *arg) sleep(1); } - -// CPRINTF("consumer: got a trace: %s with %d channels\n", trace_name, trace->nr_channels); -// -// struct ltt_channel_struct *chan = &trace->channels[0]; -// -// CPRINTF("channel 1 (%s) active=%u", chan->channel_name, chan->active & 1); - -// struct rchan *rchan = chan->trans_channel_data; -// struct rchan_buf *rbuf = rchan->buf; -// struct ltt_channel_buf_struct *lttbuf = chan->buf; -// long consumed_old; -// -// result = fd = open("trace.out", O_WRONLY | O_CREAT | O_TRUNC, 00644); -// if(result == -1) { -// perror("open"); -// return -1; -// } - -// for(;;) { -// write(STDOUT_FILENO, str, sizeof(str)); -// -// result = ltt_do_get_subbuf(rbuf, lttbuf, &consumed_old); -// if(result < 0) { -// CPRINTF("ltt_do_get_subbuf: error: %s", strerror(-result)); -// } -// else { -// CPRINTF("success!"); -// -// result = write(fd, rbuf->buf_data + (consumed_old & (2 * 4096-1)), 4096); -// ltt_do_put_subbuf(rbuf, lttbuf, consumed_old); -// } -// -// //CPRINTF("There seems to be %ld bytes available", SUBBUF_TRUNC(local_read(<tbuf->offset), rbuf->chan) - consumed_old); -// CPRINTF("Commit count %ld", local_read(<tbuf->commit_count[0])); -// -// -// sleep(1); -// } } void start_consumer(void) { +#ifdef USE_CLONE int result; result = clone(consumer, consumer_stack+sizeof(consumer_stack)-1, CLONE_FS | CLONE_FILES | CLONE_VM | CLONE_SIGHAND | CLONE_THREAD, NULL); if(result == -1) { perror("clone"); } +#else + pthread_t thread; + + pthread_create(&thread, NULL, consumer, NULL); +#endif } static void print_markers(void) @@ -219,12 +188,16 @@ void notif_cb(void) static int inform_consumer_daemon(void) { + ustcomm_request_consumer(getpid(), "metadata"); + ustcomm_request_consumer(getpid(), "ust"); } int listener_main(void *p) { int result; + DBG("LISTENER"); + for(;;) { uint32_t size; struct sockaddr_un addr; @@ -233,18 +206,16 @@ int listener_main(void *p) char trace_type[] = "ustrelay"; char *recvbuf; int len; + struct ustcomm_source src; - result = ustcomm_app_recv_message(&ustcomm_app, &recvbuf); - if(result) { + result = ustcomm_app_recv_message(&ustcomm_app, &recvbuf, &src, -1); + if(result <= 0) { WARN("error in ustcomm_app_recv_message"); continue; } DBG("received a message! it's: %s\n", recvbuf); len = strlen(recvbuf); - if(len && recvbuf[len-1] == '\n') { - recvbuf[len-1] = '\0'; - } if(!strcmp(recvbuf, "print_markers")) { print_markers(); @@ -301,20 +272,297 @@ int listener_main(void *p) return; } } + else if(nth_token_is(recvbuf, "get_shmid", 0) == 1) { + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_name; + + DBG("get_shmid"); + + channel_name = nth_token(recvbuf, 1); + if(channel_name == NULL) { + ERR("get_shmid: cannot parse channel"); + goto next_cmd; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + CPRINTF("cannot find trace!"); + return 1; + } + + for(i=0; inr_channels; i++) { + struct rchan *rchan = trace->channels[i].trans_channel_data; + struct rchan_buf *rbuf = rchan->buf; + + if(!strcmp(trace->channels[i].channel_name, channel_name)) { + char *reply; + + DBG("the shmid for the requested channel is %d", rbuf->shmid); + asprintf(&reply, "%d", rbuf->shmid); + + result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); + if(result) { + ERR("listener: get_shmid: ustcomm_send_reply failed"); + goto next_cmd; + } + + free(reply); + + break; + } + } + } + else if(nth_token_is(recvbuf, "get_n_subbufs", 0) == 1) { + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_name; + + DBG("get_n_subbufs"); + + channel_name = nth_token(recvbuf, 1); + if(channel_name == NULL) { + ERR("get_n_subbufs: cannot parse channel"); + goto next_cmd; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + CPRINTF("cannot find trace!"); + return 1; + } + + for(i=0; inr_channels; i++) { + struct rchan *rchan = trace->channels[i].trans_channel_data; + + if(!strcmp(trace->channels[i].channel_name, channel_name)) { + char *reply; + + DBG("the n_subbufs for the requested channel is %d", rchan->n_subbufs); + asprintf(&reply, "%d", rchan->n_subbufs); + + result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); + if(result) { + ERR("listener: get_n_subbufs: ustcomm_send_reply failed"); + goto next_cmd; + } + + free(reply); + + break; + } + } + } + else if(nth_token_is(recvbuf, "get_subbuf_size", 0) == 1) { + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_name; + + DBG("get_subbuf_size"); + + channel_name = nth_token(recvbuf, 1); + if(channel_name == NULL) { + ERR("get_subbuf_size: cannot parse channel"); + goto next_cmd; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + CPRINTF("cannot find trace!"); + return 1; + } + + for(i=0; inr_channels; i++) { + struct rchan *rchan = trace->channels[i].trans_channel_data; + + if(!strcmp(trace->channels[i].channel_name, channel_name)) { + char *reply; + + DBG("the subbuf_size for the requested channel is %d", rchan->subbuf_size); + asprintf(&reply, "%d", rchan->subbuf_size); + + result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); + if(result) { + ERR("listener: get_subbuf_size: ustcomm_send_reply failed"); + goto next_cmd; + } + + free(reply); + + break; + } + } + } + else if(nth_token_is(recvbuf, "load_probe_lib", 0) == 1) { + char *libfile; + + libfile = nth_token(recvbuf, 1); + + DBG("load_probe_lib loading %s", libfile); + } + else if(nth_token_is(recvbuf, "get_subbuffer", 0) == 1) { + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_name; + + DBG("get_subbuf"); + + channel_name = nth_token(recvbuf, 1); + if(channel_name == NULL) { + ERR("get_subbuf: cannot parse channel"); + goto next_cmd; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + CPRINTF("cannot find trace!"); + return 1; + } + + for(i=0; inr_channels; i++) { + struct rchan *rchan = trace->channels[i].trans_channel_data; + + if(!strcmp(trace->channels[i].channel_name, channel_name)) { + struct rchan_buf *rbuf = rchan->buf; + struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf; + char *reply; + long consumed_old=0; + + result = ltt_do_get_subbuf(rbuf, lttbuf, &consumed_old); + if(result < 0) { + DBG("ltt_do_get_subbuf: error: %s", strerror(-result)); + asprintf(&reply, "%s %ld", "UNAVAIL", 0); + } + else { + DBG("ltt_do_get_subbuf: success"); + asprintf(&reply, "%s %ld", "OK", consumed_old); + } + + result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); + if(result) { + ERR("listener: get_subbuf: ustcomm_send_reply failed"); + goto next_cmd; + } + + free(reply); + + break; + } + } + } + else if(nth_token_is(recvbuf, "put_subbuffer", 0) == 1) { + struct ltt_trace_struct *trace; + char trace_name[] = "auto"; + int i; + char *channel_name; + long consumed_old; + char *consumed_old_str; + char *endptr; + + DBG("put_subbuf"); + + channel_name = strdup_malloc(nth_token(recvbuf, 1)); + if(channel_name == NULL) { + ERR("put_subbuf_size: cannot parse channel"); + goto next_cmd; + } + consumed_old_str = strdup_malloc(nth_token(recvbuf, 2)); + if(consumed_old_str == NULL) { + ERR("put_subbuf: cannot parse consumed_old"); + goto next_cmd; + } + consumed_old = strtol(consumed_old_str, &endptr, 10); + if(*endptr != '\0') { + ERR("put_subbuf: invalid value for consumed_old"); + goto next_cmd; + } + + ltt_lock_traces(); + trace = _ltt_trace_find(trace_name); + ltt_unlock_traces(); + + if(trace == NULL) { + CPRINTF("cannot find trace!"); + return 1; + } + + for(i=0; inr_channels; i++) { + struct rchan *rchan = trace->channels[i].trans_channel_data; + + if(!strcmp(trace->channels[i].channel_name, channel_name)) { + struct rchan_buf *rbuf = rchan->buf; + struct ltt_channel_buf_struct *lttbuf = trace->channels[i].buf; + char *reply; + long consumed_old=0; + + result = ltt_do_put_subbuf(rbuf, lttbuf, consumed_old); + if(result < 0) { + WARN("ltt_do_put_subbuf: error"); + } + else { + DBG("ltt_do_put_subbuf: success"); + } + asprintf(&reply, "%s", "OK", consumed_old); + + result = ustcomm_send_reply(&ustcomm_app.server, reply, &src); + if(result) { + ERR("listener: put_subbuf: ustcomm_send_reply failed"); + goto next_cmd; + } + + free(reply); + + break; + } + } + + free(channel_name); + free(consumed_old_str); + } + else { + ERR("unable to parse message: %s", recvbuf); + } + + next_cmd: free(recvbuf); } } +static char listener_stack[16384]; + void create_listener(void) { int result; static char listener_stack[16384]; + //char *listener_stack = malloc(16384); +#ifdef USE_CLONE result = clone(listener_main, listener_stack+sizeof(listener_stack)-1, CLONE_FS | CLONE_FILES | CLONE_VM | CLONE_SIGHAND | CLONE_THREAD, NULL); if(result == -1) { perror("clone"); } +#else + pthread_t thread; + + pthread_create(&thread, NULL, listener_main, NULL); +#endif } /* The signal handler itself. Signals must be setup so there cannot be @@ -414,7 +662,18 @@ static void __attribute__((constructor(1000))) init() DBG("UST_TRACE constructor"); - mypid = getpid(); + /* Must create socket before signal handler to prevent races. + */ + result = init_socket(); + if(result == -1) { + ERR("init_socket error"); + return; + } + result = init_signal_handler(); + if(result == -1) { + ERR("init_signal_handler error"); + return; + } if(getenv("UST_TRACE")) { char trace_name[] = "auto"; @@ -457,22 +716,10 @@ static void __attribute__((constructor(1000))) init() ERR("ltt_trace_start failed"); return; } - start_consumer(); + //start_consumer(); + inform_consumer_daemon(); } - /* Must create socket before signal handler to prevent races - * on pfd variable. - */ - result = init_socket(); - if(result == -1) { - ERR("init_socket error"); - return; - } - result = init_signal_handler(); - if(result == -1) { - ERR("init_signal_handler error"); - return; - } return; @@ -502,7 +749,8 @@ static void __attribute__((destructor)) fini() } /* FIXME: wait for the consumer to be done */ - sleep(1); + DBG("waiting 5 sec for consume"); + sleep(5); destroy_socket(); }