ustd: specify ust component name for build
[ust.git] / ustd / ustd.c
CommitLineData
c39c72ee 1/* Copyright (C) 2009 Pierre-Marc Fournier
1f8b0dff 2 *
c39c72ee
PMF
3 * This library is free software; you can redistribute it and/or
4 * modify it under the terms of the GNU Lesser General Public
5 * License as published by the Free Software Foundation; either
6 * version 2.1 of the License, or (at your option) any later version.
1f8b0dff 7 *
c39c72ee 8 * This library is distributed in the hope that it will be useful,
1f8b0dff 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
c39c72ee
PMF
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 * Lesser General Public License for more details.
1f8b0dff 12 *
c39c72ee
PMF
13 * You should have received a copy of the GNU Lesser General Public
14 * License along with this library; if not, write to the Free Software
15 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
1f8b0dff
PMF
16 */
17
3796af9b
PMF
18#define _GNU_SOURCE
19
#include <sys/types.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <signal.h>

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <assert.h>

#include "ustd.h"
#include "localerr.h"
#include "ustcomm.h"
36
3a7b90de
PMF
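/* return values of get_subbuffer():
 *   GET_SUBBUF_DONE (0) = subbuffer is finished, it won't produce data anymore
 *   GET_SUBBUF_OK (1)   = got subbuffer successfully
 *   GET_SUBBUF_DIED (2) = the application died while being traced
 *   <0                  = error
 */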
3796af9b 41
8cefc145
PMF
/* Non-error return codes of get_subbuffer(); errors are reported as -1.
 * OK:   a subbuffer was obtained and can be written out.
 * DONE: the buffer will not produce data anymore.
 * DIED: the request to the application returned 0 (logged as "app died
 *       while being traced").
 */
42#define GET_SUBBUF_OK 1
43#define GET_SUBBUF_DONE 0
44#define GET_SUBBUF_DIED 2
45
a3cdd4a7
PMF
/* Non-error return codes of put_subbuffer(); errors are reported as -1.
 * OK:     the application answered "OK".
 * DIED:   consumer_thread() reacts to this code by calling
 *         finish_consuming_dead_subbuffer().
 * PUSHED: the application answered with something other than "OK";
 *         consumer_thread() reports this as a buffer overflow.
 */
46#define PUT_SUBBUF_OK 1
47#define PUT_SUBBUF_DIED 0
48#define PUT_SUBBUF_PUSHED 2
49
c97d4437
PMF
/* NOTE(review): sock_path is not referenced anywhere else in this file;
 * presumably it is read from another translation unit -- confirm before
 * removing it or making it static.
 */
50char *sock_path=NULL;
51
a3cdd4a7
PMF
/* Check, without blocking, whether a SIGPIPE is pending.
 *
 * The check is a sigtimedwait() with a zero timeout, so a pending SIGPIPE
 * is consumed by this call. main() blocks SIGPIPE so that it stays
 * pending until it is collected here.
 *
 * return value:  1 = a SIGPIPE was pending (it has been consumed)
 *                0 = no SIGPIPE was pending
 *               -1 = error
 */
int test_sigpipe(void)
{
	sigset_t sigset;
	int result;

	result = sigemptyset(&sigset);
	if(result == -1) {
		perror("sigemptyset");
		return -1;
	}
	result = sigaddset(&sigset, SIGPIPE);
	if(result == -1) {
		perror("sigaddset");
		return -1;
	}

	/* An interruption by an unrelated signal handler is not an error:
	 * simply ask again.
	 */
	do {
		result = sigtimedwait(&sigset, NULL, &(struct timespec){0,0});
	} while(result == -1 && errno == EINTR);

	if(result == -1 && errno == EAGAIN) {
		/* no signal received */
		return 0;
	}
	else if(result == -1) {
		perror("sigtimedwait");
		return -1;
	}
	else if(result == SIGPIPE) {
		/* received sigpipe */
		return 1;
	}

	/* sigset only contains SIGPIPE, so no other signal number can be
	 * returned. Still return a value so that a build with NDEBUG does
	 * not fall off the end of a non-void function.
	 */
	assert(0);
	return -1;
}
85
688760ef
PMF
86int get_subbuffer(struct buffer_info *buf)
87{
88 char *send_msg;
89 char *received_msg;
90 char *rep_code;
91 int retval;
92 int result;
93
94 asprintf(&send_msg, "get_subbuffer %s", buf->name);
3bb56863 95 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
8cefc145 96 free(send_msg);
a3cdd4a7
PMF
97 if(test_sigpipe()) {
98 WARN("process %d destroyed before we could connect to it", buf->pid);
99 return GET_SUBBUF_DONE;
100 }
101 else if(result < 0) {
3bb56863 102 ERR("get_subbuffer: ustcomm_send_request failed");
688760ef
PMF
103 return -1;
104 }
8cefc145
PMF
105 else if(result == 0) {
106 DBG("app died while being traced");
107 return GET_SUBBUF_DIED;
108 }
688760ef
PMF
109
110 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
3a7b90de 111 if(result != 2 && result != 1) {
688760ef
PMF
112 ERR("unable to parse response to get_subbuffer");
113 return -1;
114 }
3a7b90de
PMF
115
116 DBG("received msg is %s", received_msg);
688760ef
PMF
117
118 if(!strcmp(rep_code, "OK")) {
119 DBG("got subbuffer %s", buf->name);
8cefc145 120 retval = GET_SUBBUF_OK;
688760ef 121 }
3a7b90de 122 else if(nth_token_is(received_msg, "END", 0) == 1) {
8cefc145 123 return GET_SUBBUF_DONE;
3a7b90de 124 }
688760ef 125 else {
3a7b90de
PMF
126 DBG("error getting subbuffer %s", buf->name);
127 retval = -1;
688760ef
PMF
128 }
129
3a7b90de
PMF
130 /* FIMXE: free correctly the stuff */
131 free(received_msg);
688760ef
PMF
132 free(rep_code);
133 return retval;
134}
135
136int put_subbuffer(struct buffer_info *buf)
137{
138 char *send_msg;
139 char *received_msg;
140 char *rep_code;
141 int retval;
142 int result;
143
144 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
3bb56863 145 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
688760ef
PMF
146 if(result < 0) {
147 ERR("put_subbuffer: send_message failed");
148 return -1;
149 }
150 free(send_msg);
151
152 result = sscanf(received_msg, "%as", &rep_code);
153 if(result != 1) {
154 ERR("unable to parse response to put_subbuffer");
155 return -1;
156 }
157 free(received_msg);
158
159 if(!strcmp(rep_code, "OK")) {
160 DBG("subbuffer put %s", buf->name);
a3cdd4a7 161 retval = PUT_SUBBUF_OK;
688760ef
PMF
162 }
163 else {
a3cdd4a7
PMF
164 DBG("put_subbuffer: received error, we were pushed");
165 return PUT_SUBBUF_PUSHED;
688760ef
PMF
166 }
167
168 free(rep_code);
169 return retval;
170}
171
a3cdd4a7
PMF
/* This write is patient because it restarts if it was incomplete or if it
 * was interrupted by a signal before anything was written.
 *
 * fd:    file descriptor to write to
 * buf:   data to write
 * count: number of bytes to write
 *
 * return value: count when everything was written; otherwise the value of
 *               the write() call that stopped the transfer (-1 with errno
 *               set on error, 0 if write() wrote nothing)
 */
ssize_t patient_write(int fd, const void *buf, size_t count)
{
	const char *bufc = (const char *) buf;
	/* ssize_t, not int: write() may report more than INT_MAX bytes */
	ssize_t result;

	for(;;) {
		result = write(fd, bufc, count);
		if(result == -1 && errno == EINTR) {
			/* nothing was written, try again */
			continue;
		}
		if(result <= 0) {
			return result;
		}
		count -= result;
		bufc += result;

		if(count == 0) {
			break;
		}
	}

	return bufc-(const char *)buf;
}
195
3a7b90de
PMF
196void *consumer_thread(void *arg)
197{
198 struct buffer_info *buf = (struct buffer_info *) arg;
199 int result;
200
201 for(;;) {
8cefc145 202 /* get the subbuffer */
0b0cd937
PMF
203 result = get_subbuffer(buf);
204 if(result == -1) {
205 ERR("error getting subbuffer");
206 continue;
3a7b90de 207 }
0b0cd937
PMF
208 else if(result == GET_SUBBUF_DONE) {
209 /* this is done */
210 break;
211 }
212 else if(result == GET_SUBBUF_DIED) {
213 finish_consuming_dead_subbuffer(buf);
214 break;
3a7b90de
PMF
215 }
216
217 /* write data to file */
218 result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
219 if(result == -1) {
220 PERROR("write");
221 /* FIXME: maybe drop this trace */
222 }
223
8cefc145 224 /* put the subbuffer */
0b0cd937
PMF
225 result = put_subbuffer(buf);
226 if(result == -1) {
a3cdd4a7
PMF
227 ERR("unknown error putting subbuffer (channel=%s)", buf->name);
228 break;
229 }
230 else if(result == PUT_SUBBUF_PUSHED) {
231 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name);
0b0cd937 232 break;
3a7b90de 233 }
a3cdd4a7
PMF
234 else if(result == PUT_SUBBUF_DIED) {
235 WARN("application died while putting subbuffer");
236 /* FIXME: probably need to skip the first subbuffer in finish_consuming_dead_subbuffer */
237 finish_consuming_dead_subbuffer(buf);
238 }
239 else if(result == PUT_SUBBUF_OK) {
240 }
3a7b90de
PMF
241 }
242
243 DBG("thread for buffer %s is stopping", buf->name);
244
8cefc145
PMF
245 /* FIXME: destroy, unalloc... */
246
3a7b90de
PMF
247 return NULL;
248}
249
72ebd39a
PMF
/* Make sure that the directory dir exists, creating it (mode 0777, subject
 * to the umask) if it does not. Parent directories are not created.
 *
 * return value:  0 = dir exists and is a directory
 *               -1 = dir could not be created, or the path exists but is
 *                    not a directory
 */
int create_dir_if_needed(char *dir)
{
	struct stat st;

	if(mkdir(dir, 0777) == 0) {
		return 0;
	}
	if(errno != EEXIST) {
		perror("mkdir");
		return -1;
	}

	/* EEXIST only says that something is there; it is also reported for
	 * regular files, devices and dangling symlinks, none of which can
	 * hold the trace files.
	 */
	if(stat(dir, &st) == -1) {
		perror("stat");
		return -1;
	}
	if(!S_ISDIR(st.st_mode)) {
		fprintf(stderr, "%s exists but is not a directory\n", dir);
		return -1;
	}

	return 0;
}
263
3a7b90de
PMF
264int add_buffer(pid_t pid, char *bufname)
265{
266 struct buffer_info *buf;
267 char *send_msg;
268 char *received_msg;
269 int result;
270 char *tmp;
271 int fd;
272 pthread_t thr;
a3cdd4a7 273 struct shmid_ds shmds;
3a7b90de
PMF
274
275 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
276 if(buf == NULL) {
277 ERR("add_buffer: insufficient memory");
278 return -1;
279 }
280
281 buf->name = bufname;
282 buf->pid = pid;
283
4e2a8808
PMF
284 /* connect to app */
285 result = ustcomm_connect_app(buf->pid, &buf->conn);
286 if(result) {
a3cdd4a7 287 WARN("unable to connect to process, it probably died before we were able to connect");
4e2a8808
PMF
288 return -1;
289 }
290
3a7b90de
PMF
291 /* get shmid */
292 asprintf(&send_msg, "get_shmid %s", buf->name);
a3cdd4a7 293 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de 294 free(send_msg);
a3cdd4a7
PMF
295 if(result == -1) {
296 ERR("problem in ustcomm_send_request(get_shmid)");
297 return -1;
298 }
3a7b90de 299
8cefc145
PMF
300 result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
301 if(result != 2) {
3a7b90de
PMF
302 ERR("unable to parse response to get_shmid");
303 return -1;
304 }
305 free(received_msg);
8cefc145 306 DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
3a7b90de
PMF
307
308 /* get n_subbufs */
309 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
a3cdd4a7 310 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de 311 free(send_msg);
a3cdd4a7
PMF
312 if(result == -1) {
313 ERR("problem in ustcomm_send_request(g_n_subbufs)");
314 return -1;
315 }
3a7b90de
PMF
316
317 result = sscanf(received_msg, "%d", &buf->n_subbufs);
318 if(result != 1) {
319 ERR("unable to parse response to get_n_subbufs");
320 return -1;
321 }
322 free(received_msg);
323 DBG("got n_subbufs %d", buf->n_subbufs);
324
325 /* get subbuf size */
326 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
4e2a8808 327 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de
PMF
328 free(send_msg);
329
330 result = sscanf(received_msg, "%d", &buf->subbuf_size);
331 if(result != 1) {
332 ERR("unable to parse response to get_subbuf_size");
333 return -1;
334 }
335 free(received_msg);
336 DBG("got subbuf_size %d", buf->subbuf_size);
337
338 /* attach memory */
339 buf->mem = shmat(buf->shmid, NULL, 0);
340 if(buf->mem == (void *) 0) {
341 perror("shmat");
342 return -1;
343 }
8cefc145
PMF
344 DBG("successfully attached buffer memory");
345
346 buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
347 if(buf->bufstruct_mem == (void *) 0) {
348 perror("shmat");
349 return -1;
350 }
351 DBG("successfully attached buffer bufstruct memory");
3a7b90de 352
a3cdd4a7
PMF
353 /* obtain info on the memory segment */
354 result = shmctl(buf->shmid, IPC_STAT, &shmds);
355 if(result == -1) {
356 perror("shmctl");
357 return -1;
358 }
359 buf->memlen = shmds.shm_segsz;
360
3a7b90de 361 /* open file for output */
72ebd39a
PMF
362 result = create_dir_if_needed(USTD_DEFAULT_TRACE_PATH);
363 if(result == -1) {
364 ERR("could not create directory %s", USTD_DEFAULT_TRACE_PATH);
365 return -1;
366 }
367
368 asprintf(&tmp, "%s/%u", USTD_DEFAULT_TRACE_PATH, buf->pid);
369 result = create_dir_if_needed(tmp);
370 if(result == -1) {
371 ERR("could not create directory %s", tmp);
372 free(tmp);
373 return -1;
374 }
375 free(tmp);
376
377 asprintf(&tmp, "%s/%u/%s_0", USTD_DEFAULT_TRACE_PATH, buf->pid, buf->name);
3a7b90de
PMF
378 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600);
379 if(result == -1) {
380 PERROR("open");
6cb88bc0 381 ERR("failed opening trace file %s", tmp);
3a7b90de
PMF
382 return -1;
383 }
384 buf->file_fd = fd;
385 free(tmp);
386
3a7b90de
PMF
387 pthread_create(&thr, NULL, consumer_thread, buf);
388
389 return 0;
390}
391
3796af9b
PMF
392int main(int argc, char **argv)
393{
394 struct ustcomm_ustd ustd;
395 int result;
a3cdd4a7 396 sigset_t sigset;
3796af9b
PMF
397
398 result = ustcomm_init_ustd(&ustd);
399 if(result == -1) {
400 ERR("failed to initialize socket");
401 return 1;
402 }
403
a3cdd4a7
PMF
404 result = sigemptyset(&sigset);
405 if(result == -1) {
406 perror("sigemptyset");
407 return 1;
408 }
409 result = sigaddset(&sigset, SIGPIPE);
410 if(result == -1) {
411 perror("sigaddset");
412 return 1;
413 }
414 result = sigprocmask(SIG_BLOCK, &sigset, NULL);
415 if(result == -1) {
416 perror("sigprocmask");
417 return 1;
418 }
419
688760ef 420 /* app loop */
3796af9b
PMF
421 for(;;) {
422 char *recvbuf;
423
3a7b90de 424 /* check for requests on our public socket */
688760ef
PMF
425 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
426 if(result == -1) {
427 ERR("error in ustcomm_ustd_recv_message");
428 continue;
429 }
430 if(result > 0) {
431 if(!strncmp(recvbuf, "collect", 7)) {
432 pid_t pid;
433 char *bufname;
434 int result;
3796af9b 435
688760ef
PMF
436 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
437 if(result != 2) {
438 fprintf(stderr, "parsing error: %s\n", recvbuf);
439 }
3796af9b 440
688760ef
PMF
441 result = add_buffer(pid, bufname);
442 if(result < 0) {
443 ERR("error in add_buffer");
444 continue;
445 }
3796af9b
PMF
446 }
447
688760ef 448 free(recvbuf);
3796af9b 449 }
3796af9b
PMF
450 }
451
452 return 0;
453}
This page took 0.041884 seconds and 4 git commands to generate.