/* Copyright (C) 2009  Pierre-Marc Fournier
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
 */
21 #include <sys/epoll.h>
23 #include <sys/types.h>
40 #define GET_SUBBUF_OK 1
41 #define GET_SUBBUF_DONE 0
42 #define GET_SUBBUF_DIED 2
44 #define PUT_SUBBUF_OK 1
45 #define PUT_SUBBUF_DIED 0
46 #define PUT_SUBBUF_PUSHED 2
47 #define PUT_SUBBUF_DONE 3
49 #define UNIX_PATH_MAX 108
51 static int get_subbuffer(struct buffer_info
*buf
)
53 struct ustcomm_header _send_hdr
, *send_hdr
;
54 struct ustcomm_header _recv_hdr
, *recv_hdr
;
55 struct ustcomm_buffer_info _send_msg
, _recv_msg
;
56 struct ustcomm_buffer_info
*send_msg
, *recv_msg
;
59 send_hdr
= &_send_hdr
;
60 recv_hdr
= &_recv_hdr
;
61 send_msg
= &_send_msg
;
62 recv_msg
= &_recv_msg
;
64 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
,
65 buf
->channel
, buf
->channel_cpu
);
70 send_hdr
->command
= GET_SUBBUFFER
;
72 result
= ustcomm_req(buf
->app_sock
, send_hdr
, (char *)send_msg
,
73 recv_hdr
, (char *)recv_msg
);
74 if ((result
< 0 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
76 DBG("app died while being traced");
77 return GET_SUBBUF_DIED
;
78 } else if (result
< 0) {
79 ERR("get_subbuffer: ustcomm_req failed");
83 if (!recv_hdr
->result
) {
84 DBG("got subbuffer %s", buf
->name
);
85 buf
->consumed_old
= recv_msg
->consumed_old
;
87 } else if (recv_hdr
->result
== -ENODATA
) {
88 DBG("For buffer %s, the trace was not found. This likely means"
89 " it was destroyed by the user.", buf
->name
);
90 return GET_SUBBUF_DIED
;
93 DBG("error getting subbuffer %s", buf
->name
);
94 return recv_hdr
->result
;
97 static int put_subbuffer(struct buffer_info
*buf
)
99 struct ustcomm_header _send_hdr
, *send_hdr
;
100 struct ustcomm_header _recv_hdr
, *recv_hdr
;
101 struct ustcomm_buffer_info _send_msg
, *send_msg
;
104 send_hdr
= &_send_hdr
;
105 recv_hdr
= &_recv_hdr
;
106 send_msg
= &_send_msg
;
108 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
,
109 buf
->channel
, buf
->channel_cpu
);
114 send_hdr
->command
= PUT_SUBBUFFER
;
115 send_msg
->consumed_old
= buf
->consumed_old
;
117 result
= ustcomm_req(buf
->app_sock
, send_hdr
, (char *)send_msg
,
119 if ((result
< 0 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
121 DBG("app died while being traced");
122 return PUT_SUBBUF_DIED
;
123 } else if (result
< 0) {
124 ERR("put_subbuffer: ustcomm_req failed");
128 if (!recv_hdr
->result
) {
129 DBG("put subbuffer %s", buf
->name
);
130 return PUT_SUBBUF_OK
;
131 } else if (recv_hdr
->result
== -ENODATA
) {
132 DBG("For buffer %s, the trace was not found. This likely means"
133 " it was destroyed by the user.", buf
->name
);
134 return PUT_SUBBUF_DIED
;
137 DBG("error getting subbuffer %s", buf
->name
);
138 return recv_hdr
->result
;
141 void decrement_active_buffers(void *arg
)
143 struct libustd_instance
*instance
= arg
;
144 pthread_mutex_lock(&instance
->mutex
);
145 instance
->active_buffers
--;
146 pthread_mutex_unlock(&instance
->mutex
);
149 static int get_pidunique(int sock
, s64
*pidunique
)
151 struct ustcomm_header _send_hdr
, *send_hdr
;
152 struct ustcomm_header _recv_hdr
, *recv_hdr
;
153 struct ustcomm_pidunique _recv_msg
, *recv_msg
;
156 send_hdr
= &_send_hdr
;
157 recv_hdr
= &_recv_hdr
;
158 recv_msg
= &_recv_msg
;
160 memset(send_hdr
, 0, sizeof(*send_hdr
));
162 send_hdr
->command
= GET_PIDUNIQUE
;
163 result
= ustcomm_req(sock
, send_hdr
, NULL
, recv_hdr
, (char *)recv_msg
);
167 if (recv_hdr
->result
< 0) {
168 ERR("App responded with error: %s", strerror(recv_hdr
->result
));
169 return recv_hdr
->result
;
172 *pidunique
= recv_msg
->pidunique
;
177 static int get_buf_shmid_pipe_fd(int sock
, struct buffer_info
*buf
,
178 int *buf_shmid
, int *buf_struct_shmid
,
181 struct ustcomm_header _send_hdr
, *send_hdr
;
182 struct ustcomm_header _recv_hdr
, *recv_hdr
;
183 struct ustcomm_buffer_info _send_msg
, *send_msg
;
184 struct ustcomm_buffer_info _recv_msg
, *recv_msg
;
185 int result
, recv_pipe_fd
;
187 send_hdr
= &_send_hdr
;
188 recv_hdr
= &_recv_hdr
;
189 send_msg
= &_send_msg
;
190 recv_msg
= &_recv_msg
;
192 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
,
193 buf
->channel
, buf
->channel_cpu
);
195 ERR("Failed to pack buffer info");
199 send_hdr
->command
= GET_BUF_SHMID_PIPE_FD
;
201 result
= ustcomm_send(sock
, send_hdr
, (char *)send_msg
);
203 ERR("Failed to send request");
206 result
= ustcomm_recv_fd(sock
, recv_hdr
, (char *)recv_msg
, &recv_pipe_fd
);
208 ERR("Failed to receive message and fd");
211 if (recv_hdr
->result
< 0) {
212 ERR("App responded with error %s", strerror(recv_hdr
->result
));
213 return recv_hdr
->result
;
216 *buf_shmid
= recv_msg
->buf_shmid
;
217 *buf_struct_shmid
= recv_msg
->buf_struct_shmid
;
218 *buf_pipe_fd
= recv_pipe_fd
;
223 static int get_subbuf_num_size(int sock
, struct buffer_info
*buf
,
224 int *subbuf_num
, int *subbuf_size
)
226 struct ustcomm_header _send_hdr
, *send_hdr
;
227 struct ustcomm_header _recv_hdr
, *recv_hdr
;
228 struct ustcomm_channel_info _send_msg
, *send_msg
;
229 struct ustcomm_channel_info _recv_msg
, *recv_msg
;
232 send_hdr
= &_send_hdr
;
233 recv_hdr
= &_recv_hdr
;
234 send_msg
= &_send_msg
;
235 recv_msg
= &_recv_msg
;
237 result
= ustcomm_pack_channel_info(send_hdr
, send_msg
,
243 send_hdr
->command
= GET_SUBBUF_NUM_SIZE
;
245 result
= ustcomm_req(sock
, send_hdr
, (char *)send_msg
,
246 recv_hdr
, (char *)recv_msg
);
251 *subbuf_num
= recv_msg
->subbuf_num
;
252 *subbuf_size
= recv_msg
->subbuf_size
;
254 return recv_hdr
->result
;
258 static int notify_buffer_mapped(int sock
, struct buffer_info
*buf
)
260 struct ustcomm_header _send_hdr
, *send_hdr
;
261 struct ustcomm_header _recv_hdr
, *recv_hdr
;
262 struct ustcomm_buffer_info _send_msg
, *send_msg
;
265 send_hdr
= &_send_hdr
;
266 recv_hdr
= &_recv_hdr
;
267 send_msg
= &_send_msg
;
269 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
,
270 buf
->channel
, buf
->channel_cpu
);
275 send_hdr
->command
= NOTIFY_BUF_MAPPED
;
277 result
= ustcomm_req(sock
, send_hdr
, (char *)send_msg
,
283 return recv_hdr
->result
;
287 struct buffer_info
*connect_buffer(struct libustd_instance
*instance
, pid_t pid
,
288 const char *channel
, int channel_cpu
)
290 struct buffer_info
*buf
;
292 struct shmid_ds shmds
;
294 buf
= (struct buffer_info
*) zmalloc(sizeof(struct buffer_info
));
296 ERR("add_buffer: insufficient memory");
300 buf
->channel
= strdup(channel
);
305 result
= asprintf(&buf
->name
, "%s_%d", channel
, channel_cpu
);
306 if (result
< 0 || buf
->name
== NULL
) {
307 goto free_buf_channel
;
310 buf
->channel_cpu
= channel_cpu
;
313 result
= ustcomm_connect_app(buf
->pid
, &buf
->app_sock
);
315 WARN("unable to connect to process, it probably died before we were able to connect");
320 result
= get_pidunique(buf
->app_sock
, &buf
->pidunique
);
322 ERR("Failed to get pidunique");
326 /* get shmid and pipe fd */
327 result
= get_buf_shmid_pipe_fd(buf
->app_sock
, buf
, &buf
->shmid
,
328 &buf
->bufstruct_shmid
, &buf
->pipe_fd
);
330 ERR("Failed to get buf_shmid and pipe_fd");
334 fstat(buf
->pipe_fd
, &temp
);
335 if (!S_ISFIFO(temp
.st_mode
)) {
336 ERR("Didn't receive a fifo from the app");
342 /* get number of subbufs and subbuf size */
343 result
= get_subbuf_num_size(buf
->app_sock
, buf
, &buf
->n_subbufs
,
346 ERR("Failed to get subbuf number and size");
351 buf
->mem
= shmat(buf
->shmid
, NULL
, 0);
352 if(buf
->mem
== (void *) 0) {
356 DBG("successfully attached buffer memory");
358 buf
->bufstruct_mem
= shmat(buf
->bufstruct_shmid
, NULL
, 0);
359 if(buf
->bufstruct_mem
== (void *) 0) {
363 DBG("successfully attached buffer bufstruct memory");
365 /* obtain info on the memory segment */
366 result
= shmctl(buf
->shmid
, IPC_STAT
, &shmds
);
369 goto shmdt_bufstruct_mem
;
371 buf
->memlen
= shmds
.shm_segsz
;
373 /* Notify the application that we have mapped the buffer */
374 result
= notify_buffer_mapped(buf
->app_sock
, buf
);
376 goto shmdt_bufstruct_mem
;
379 if(instance
->callbacks
->on_open_buffer
)
380 instance
->callbacks
->on_open_buffer(instance
->callbacks
, buf
);
382 pthread_mutex_lock(&instance
->mutex
);
383 instance
->active_buffers
++;
384 pthread_mutex_unlock(&instance
->mutex
);
389 shmdt(buf
->bufstruct_mem
);
398 close(buf
->app_sock
);
411 static void destroy_buffer(struct libustd_callbacks
*callbacks
,
412 struct buffer_info
*buf
)
416 result
= close(buf
->app_sock
);
418 WARN("problem calling ustcomm_close_app");
421 result
= shmdt(buf
->mem
);
426 result
= shmdt(buf
->bufstruct_mem
);
431 if(callbacks
->on_close_buffer
)
432 callbacks
->on_close_buffer(callbacks
, buf
);
437 int consumer_loop(struct libustd_instance
*instance
, struct buffer_info
*buf
)
439 int result
, read_result
;
442 pthread_cleanup_push(decrement_active_buffers
, instance
);
445 read_result
= read(buf
->pipe_fd
, &read_buf
, 1);
446 /* get the subbuffer */
447 if (read_result
== 1) {
448 result
= get_subbuffer(buf
);
450 ERR("error getting subbuffer");
452 } else if (result
== GET_SUBBUF_DIED
) {
453 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
456 } else if ((read_result
== -1 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
458 DBG("App died while being traced");
459 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
463 if(instance
->callbacks
->on_read_subbuffer
)
464 instance
->callbacks
->on_read_subbuffer(instance
->callbacks
, buf
);
466 /* put the subbuffer */
467 result
= put_subbuffer(buf
);
469 ERR("unknown error putting subbuffer (channel=%s)", buf
->name
);
472 else if(result
== PUT_SUBBUF_PUSHED
) {
473 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf
->name
);
476 else if(result
== PUT_SUBBUF_DIED
) {
477 DBG("application died while putting subbuffer");
478 /* Skip the first subbuffer. We are not sure it is trustable
479 * because the put_subbuffer() did not complete.
481 if(instance
->callbacks
->on_put_error
)
482 instance
->callbacks
->on_put_error(instance
->callbacks
, buf
);
484 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
487 else if(result
== PUT_SUBBUF_DONE
) {
488 /* Done with this subbuffer */
489 /* FIXME: add a case where this branch is used? Upon
490 * normal trace termination, at put_subbuf time, a
491 * special last-subbuffer code could be returned by
496 else if(result
== PUT_SUBBUF_OK
) {
500 DBG("thread for buffer %s is stopping", buf
->name
);
502 /* FIXME: destroy, unalloc... */
504 pthread_cleanup_pop(1);
/* Arguments handed to consumer_thread(); the thread frees channel and the struct. */
struct consumer_thread_args {
	pid_t pid;				/* process owning the buffer */
	const char *channel;			/* heap copy of the channel name */
	int channel_cpu;			/* cpu index of the buffer */
	struct libustd_instance *instance;	/* instance the thread works for */
};
516 void *consumer_thread(void *arg
)
518 struct buffer_info
*buf
;
519 struct consumer_thread_args
*args
= (struct consumer_thread_args
*) arg
;
523 if(args
->instance
->callbacks
->on_new_thread
)
524 args
->instance
->callbacks
->on_new_thread(args
->instance
->callbacks
);
526 /* Block signals that should be handled by the main thread. */
527 result
= sigemptyset(&sigset
);
529 PERROR("sigemptyset");
532 result
= sigaddset(&sigset
, SIGTERM
);
537 result
= sigaddset(&sigset
, SIGINT
);
542 result
= sigprocmask(SIG_BLOCK
, &sigset
, NULL
);
544 PERROR("sigprocmask");
548 buf
= connect_buffer(args
->instance
, args
->pid
,
549 args
->channel
, args
->channel_cpu
);
551 ERR("failed to connect to buffer");
555 consumer_loop(args
->instance
, buf
);
557 destroy_buffer(args
->instance
->callbacks
, buf
);
561 if(args
->instance
->callbacks
->on_close_thread
)
562 args
->instance
->callbacks
->on_close_thread(args
->instance
->callbacks
);
564 free((void *)args
->channel
);
569 int start_consuming_buffer(struct libustd_instance
*instance
, pid_t pid
,
570 const char *channel
, int channel_cpu
)
573 struct consumer_thread_args
*args
;
576 DBG("beginning of start_consuming_buffer: args: pid %d bufname %s_%d", pid
, channel
,
579 args
= (struct consumer_thread_args
*) zmalloc(sizeof(struct consumer_thread_args
));
585 args
->channel
= strdup(channel
);
586 args
->channel_cpu
= channel_cpu
;
587 args
->instance
= instance
;
588 DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s_%d",
589 args
->pid
, args
->channel
, args
->channel_cpu
);
591 result
= pthread_create(&thr
, NULL
, consumer_thread
, args
);
593 ERR("pthread_create failed");
596 result
= pthread_detach(thr
);
598 ERR("pthread_detach failed");
601 DBG("end of start_consuming_buffer: args: pid %d bufname %s_%d",
602 args
->pid
, args
->channel
, args
->channel_cpu
);
606 static void process_client_cmd(int sock
, struct ustcomm_header
*req_header
,
607 char *recvbuf
, struct libustd_instance
*instance
)
610 struct ustcomm_header _res_header
;
611 struct ustcomm_header
*res_header
= &_res_header
;
612 struct ustcomm_buffer_info
*buf_inf
;
614 DBG("Processing client command");
616 switch (req_header
->command
) {
619 buf_inf
= (struct ustcomm_buffer_info
*)recvbuf
;
620 result
= ustcomm_unpack_buffer_info(buf_inf
);
622 ERR("Couldn't unpack buffer info");
626 DBG("Going to consume buffer %s_%d in process %d",
627 buf_inf
->channel
, buf_inf
->ch_cpu
, buf_inf
->pid
);
628 result
= start_consuming_buffer(instance
, buf_inf
->pid
,
632 ERR("error in add_buffer");
636 res_header
->result
= 0;
639 res_header
->result
= 0;
640 /* Only there to force poll to return */
643 res_header
->result
= -EINVAL
;
644 WARN("unknown command: %d", req_header
->command
);
647 if (ustcomm_send(sock
, res_header
, NULL
) <= 0) {
648 ERR("couldn't send command response");
652 #define MAX_EVENTS 10
654 int libustd_start_instance(struct libustd_instance
*instance
)
656 struct ustcomm_header recv_hdr
;
657 char recv_buf
[USTCOMM_BUFFER_SIZE
];
658 struct ustcomm_sock
*epoll_sock
;
659 struct epoll_event events
[MAX_EVENTS
];
660 struct sockaddr addr
;
661 int result
, epoll_fd
, accept_fd
, nfds
, i
, addr_size
, timeout
;
663 if(!instance
->is_init
) {
664 ERR("libustd instance not initialized");
667 epoll_fd
= instance
->epoll_fd
;
673 nfds
= epoll_wait(epoll_fd
, events
, MAX_EVENTS
, timeout
);
674 if (nfds
== -1 && errno
== EINTR
) {
676 } else if (nfds
== -1) {
677 PERROR("libustd_start_instance: epoll_wait failed");
681 for (i
= 0; i
< nfds
; ++i
) {
682 epoll_sock
= (struct ustcomm_sock
*)events
[i
].data
.ptr
;
683 if (epoll_sock
== instance
->listen_sock
) {
684 addr_size
= sizeof(struct sockaddr
);
685 accept_fd
= accept(epoll_sock
->fd
,
687 (socklen_t
*)&addr_size
);
688 if (accept_fd
== -1) {
689 PERROR("libustd_start_instance: "
693 ustcomm_init_sock(accept_fd
, epoll_fd
,
694 &instance
->connections
);
696 result
= ustcomm_recv(epoll_sock
->fd
, &recv_hdr
,
699 ustcomm_del_sock(epoll_sock
, 0);
701 process_client_cmd(epoll_sock
->fd
,
709 if (instance
->quit_program
) {
710 pthread_mutex_lock(&instance
->mutex
);
711 if(instance
->active_buffers
== 0) {
712 pthread_mutex_unlock(&instance
->mutex
);
715 pthread_mutex_unlock(&instance
->mutex
);
720 if(instance
->callbacks
->on_trace_end
)
721 instance
->callbacks
->on_trace_end(instance
);
723 libustd_delete_instance(instance
);
728 /* FIXME: threads and connections !? */
729 void libustd_delete_instance(struct libustd_instance
*instance
)
731 if (instance
->is_init
) {
732 ustcomm_del_named_sock(instance
->listen_sock
, 0);
733 close(instance
->epoll_fd
);
736 pthread_mutex_destroy(&instance
->mutex
);
737 free(instance
->sock_path
);
741 /* FIXME: Do something about the fixed path length, maybe get rid
742 * of the whole concept and use a pipe?
744 int libustd_stop_instance(struct libustd_instance
*instance
, int send_msg
)
752 instance
->quit_program
= 1;
757 /* Send a message through the socket to force poll to return */
759 struct sockaddr_un addr
;
761 result
= fd
= socket(PF_UNIX
, SOCK_STREAM
, 0);
767 addr
.sun_family
= AF_UNIX
;
769 strncpy(addr
.sun_path
, instance
->sock_path
, UNIX_PATH_MAX
);
770 addr
.sun_path
[UNIX_PATH_MAX
-1] = '\0';
772 result
= connect(fd
, (struct sockaddr
*)&addr
, sizeof(addr
));
777 while(bytes
!= sizeof(msg
))
778 bytes
+= send(fd
, msg
, sizeof(msg
), 0);
785 struct libustd_instance
786 *libustd_new_instance(struct libustd_callbacks
*callbacks
,
789 struct libustd_instance
*instance
=
790 zmalloc(sizeof(struct libustd_instance
));
795 instance
->callbacks
= callbacks
;
796 instance
->quit_program
= 0;
797 instance
->is_init
= 0;
798 instance
->active_buffers
= 0;
799 pthread_mutex_init(&instance
->mutex
, NULL
);
802 instance
->sock_path
= strdup(sock_path
);
804 instance
->sock_path
= NULL
;
810 static int init_ustd_socket(struct libustd_instance
*instance
)
814 if (instance
->sock_path
) {
815 if (asprintf(&name
, "%s", instance
->sock_path
) < 0) {
816 ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)",
817 instance
->sock_path
);
823 /* Only check if socket dir exists if we are using the default directory */
824 result
= ensure_dir_exists(SOCK_DIR
);
826 ERR("Unable to create socket directory %s", SOCK_DIR
);
830 if (asprintf(&name
, "%s/%s", SOCK_DIR
, "ustd") < 0) {
831 ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)",
838 instance
->epoll_fd
= epoll_create(MAX_EVENTS
);
839 if (instance
->epoll_fd
== -1) {
840 ERR("epoll_create failed, start instance bailing");
844 /* Create the named socket */
845 instance
->listen_sock
= ustcomm_init_named_socket(name
,
847 if(!instance
->listen_sock
) {
848 ERR("error initializing named socket at %s", name
);
852 INIT_LIST_HEAD(&instance
->connections
);
859 close(instance
->epoll_fd
);
866 int libustd_init_instance(struct libustd_instance
*instance
)
869 result
= init_ustd_socket(instance
);
871 ERR("failed to initialize socket");
874 instance
->is_init
= 1;