start adding LGPL headers
[ust.git] / ustd / ustd.c
CommitLineData
/* Copyright (C) 2009  Pierre-Marc Fournier
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
17
3796af9b
PMF
18#define _GNU_SOURCE
19
20#include <sys/types.h>
21#include <sys/shm.h>
688760ef
PMF
22#include <fcntl.h>
23#include <unistd.h>
3a7b90de 24#include <pthread.h>
a3cdd4a7 25#include <signal.h>
3796af9b
PMF
26
27#include <stdlib.h>
28#include <stdio.h>
29#include <string.h>
a3cdd4a7
PMF
30#include <errno.h>
31#include <assert.h>
3796af9b 32
0b0cd937 33#include "ustd.h"
3796af9b
PMF
34#include "localerr.h"
35#include "ustcomm.h"
36
3a7b90de
PMF
/* Return codes of get_subbuffer(). A negative return value means error. */
#define GET_SUBBUF_OK 1		/* got a subbuffer successfully */
#define GET_SUBBUF_DONE 0	/* buffer is finished, it won't produce data anymore */
#define GET_SUBBUF_DIED 2	/* the traced application died */

/* Return codes of put_subbuffer(). A negative return value means error. */
#define PUT_SUBBUF_OK 1		/* subbuffer was released successfully */
#define PUT_SUBBUF_DIED 0	/* the traced application died */
#define PUT_SUBBUF_PUSHED 2	/* the application refused the put; the reader was pushed */
49
/* Poll, without blocking, for a pending SIGPIPE and consume it if present.
 *
 * This is only meaningful while SIGPIPE is blocked in the calling thread
 * (main() blocks it with sigprocmask()), so that the signal stays pending
 * instead of being delivered.
 *
 * Return value: 1 = a SIGPIPE was pending (it has now been consumed)
 *               0 = no SIGPIPE pending
 *              -1 = error (already reported with perror())
 */
int test_sigpipe(void)
{
	sigset_t sigset;
	int result;

	result = sigemptyset(&sigset);
	if(result == -1) {
		perror("sigemptyset");
		return -1;
	}
	result = sigaddset(&sigset, SIGPIPE);
	if(result == -1) {
		perror("sigaddset");
		return -1;
	}

	/* A zero timeout makes sigtimedwait() a pure poll. Being interrupted
	 * by an unrelated signal handler is not an error: just poll again.
	 */
	do {
		result = sigtimedwait(&sigset, NULL, &(struct timespec){0,0});
	} while(result == -1 && errno == EINTR);

	if(result == SIGPIPE) {
		/* received sigpipe */
		return 1;
	}
	if(result == -1 && errno == EAGAIN) {
		/* no signal received */
		return 0;
	}
	if(result == -1) {
		perror("sigtimedwait");
		return -1;
	}

	/* Only SIGPIPE is in the set, so no other signal number can come back.
	 * Still return a value so that builds with NDEBUG do not fall off the
	 * end of a non-void function.
	 */
	assert(0);
	return -1;
}
83
688760ef
PMF
84int get_subbuffer(struct buffer_info *buf)
85{
86 char *send_msg;
87 char *received_msg;
88 char *rep_code;
89 int retval;
90 int result;
91
92 asprintf(&send_msg, "get_subbuffer %s", buf->name);
3bb56863 93 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
8cefc145 94 free(send_msg);
a3cdd4a7
PMF
95 if(test_sigpipe()) {
96 WARN("process %d destroyed before we could connect to it", buf->pid);
97 return GET_SUBBUF_DONE;
98 }
99 else if(result < 0) {
3bb56863 100 ERR("get_subbuffer: ustcomm_send_request failed");
688760ef
PMF
101 return -1;
102 }
8cefc145
PMF
103 else if(result == 0) {
104 DBG("app died while being traced");
105 return GET_SUBBUF_DIED;
106 }
688760ef
PMF
107
108 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
3a7b90de 109 if(result != 2 && result != 1) {
688760ef
PMF
110 ERR("unable to parse response to get_subbuffer");
111 return -1;
112 }
3a7b90de
PMF
113
114 DBG("received msg is %s", received_msg);
688760ef
PMF
115
116 if(!strcmp(rep_code, "OK")) {
117 DBG("got subbuffer %s", buf->name);
8cefc145 118 retval = GET_SUBBUF_OK;
688760ef 119 }
3a7b90de 120 else if(nth_token_is(received_msg, "END", 0) == 1) {
8cefc145 121 return GET_SUBBUF_DONE;
3a7b90de 122 }
688760ef 123 else {
3a7b90de
PMF
124 DBG("error getting subbuffer %s", buf->name);
125 retval = -1;
688760ef
PMF
126 }
127
3a7b90de
PMF
128 /* FIMXE: free correctly the stuff */
129 free(received_msg);
688760ef
PMF
130 free(rep_code);
131 return retval;
132}
133
134int put_subbuffer(struct buffer_info *buf)
135{
136 char *send_msg;
137 char *received_msg;
138 char *rep_code;
139 int retval;
140 int result;
141
142 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
3bb56863 143 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
688760ef
PMF
144 if(result < 0) {
145 ERR("put_subbuffer: send_message failed");
146 return -1;
147 }
148 free(send_msg);
149
150 result = sscanf(received_msg, "%as", &rep_code);
151 if(result != 1) {
152 ERR("unable to parse response to put_subbuffer");
153 return -1;
154 }
155 free(received_msg);
156
157 if(!strcmp(rep_code, "OK")) {
158 DBG("subbuffer put %s", buf->name);
a3cdd4a7 159 retval = PUT_SUBBUF_OK;
688760ef
PMF
160 }
161 else {
a3cdd4a7
PMF
162 DBG("put_subbuffer: received error, we were pushed");
163 return PUT_SUBBUF_PUSHED;
688760ef
PMF
164 }
165
166 free(rep_code);
167 return retval;
168}
169
a3cdd4a7
PMF
/* This write is patient because it restarts if it was incomplete or
 * interrupted by a signal.
 *
 * Return value: count when everything was written. Otherwise the value of
 * the write() call that stopped the transfer (-1 with errno set, or 0),
 * even if some bytes had already been written by earlier calls.
 */
ssize_t patient_write(int fd, const void *buf, size_t count)
{
	const char *bufc = (const char *) buf;
	ssize_t result;

	for(;;) {
		result = write(fd, bufc, count);
		if(result == -1 && errno == EINTR) {
			/* nothing was written by this call, just try again */
			continue;
		}
		if(result <= 0) {
			return result;
		}
		count -= (size_t) result;
		bufc += result;

		if(count == 0) {
			break;
		}
	}

	return bufc-(const char *)buf;
}
193
3a7b90de
PMF
194void *consumer_thread(void *arg)
195{
196 struct buffer_info *buf = (struct buffer_info *) arg;
197 int result;
198
199 for(;;) {
8cefc145 200 /* get the subbuffer */
0b0cd937
PMF
201 result = get_subbuffer(buf);
202 if(result == -1) {
203 ERR("error getting subbuffer");
204 continue;
3a7b90de 205 }
0b0cd937
PMF
206 else if(result == GET_SUBBUF_DONE) {
207 /* this is done */
208 break;
209 }
210 else if(result == GET_SUBBUF_DIED) {
211 finish_consuming_dead_subbuffer(buf);
212 break;
3a7b90de
PMF
213 }
214
215 /* write data to file */
216 result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
217 if(result == -1) {
218 PERROR("write");
219 /* FIXME: maybe drop this trace */
220 }
221
8cefc145 222 /* put the subbuffer */
0b0cd937
PMF
223 result = put_subbuffer(buf);
224 if(result == -1) {
a3cdd4a7
PMF
225 ERR("unknown error putting subbuffer (channel=%s)", buf->name);
226 break;
227 }
228 else if(result == PUT_SUBBUF_PUSHED) {
229 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name);
0b0cd937 230 break;
3a7b90de 231 }
a3cdd4a7
PMF
232 else if(result == PUT_SUBBUF_DIED) {
233 WARN("application died while putting subbuffer");
234 /* FIXME: probably need to skip the first subbuffer in finish_consuming_dead_subbuffer */
235 finish_consuming_dead_subbuffer(buf);
236 }
237 else if(result == PUT_SUBBUF_OK) {
238 }
3a7b90de
PMF
239 }
240
241 DBG("thread for buffer %s is stopping", buf->name);
242
8cefc145
PMF
243 /* FIXME: destroy, unalloc... */
244
3a7b90de
PMF
245 return NULL;
246}
247
248int add_buffer(pid_t pid, char *bufname)
249{
250 struct buffer_info *buf;
251 char *send_msg;
252 char *received_msg;
253 int result;
254 char *tmp;
255 int fd;
256 pthread_t thr;
a3cdd4a7 257 struct shmid_ds shmds;
3a7b90de
PMF
258
259 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
260 if(buf == NULL) {
261 ERR("add_buffer: insufficient memory");
262 return -1;
263 }
264
265 buf->name = bufname;
266 buf->pid = pid;
267
4e2a8808
PMF
268 /* connect to app */
269 result = ustcomm_connect_app(buf->pid, &buf->conn);
270 if(result) {
a3cdd4a7 271 WARN("unable to connect to process, it probably died before we were able to connect");
4e2a8808
PMF
272 return -1;
273 }
274
3a7b90de
PMF
275 /* get shmid */
276 asprintf(&send_msg, "get_shmid %s", buf->name);
a3cdd4a7 277 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de 278 free(send_msg);
a3cdd4a7
PMF
279 if(result == -1) {
280 ERR("problem in ustcomm_send_request(get_shmid)");
281 return -1;
282 }
3a7b90de 283
8cefc145
PMF
284 result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
285 if(result != 2) {
3a7b90de
PMF
286 ERR("unable to parse response to get_shmid");
287 return -1;
288 }
289 free(received_msg);
8cefc145 290 DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
3a7b90de
PMF
291
292 /* get n_subbufs */
293 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
a3cdd4a7 294 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de 295 free(send_msg);
a3cdd4a7
PMF
296 if(result == -1) {
297 ERR("problem in ustcomm_send_request(g_n_subbufs)");
298 return -1;
299 }
3a7b90de
PMF
300
301 result = sscanf(received_msg, "%d", &buf->n_subbufs);
302 if(result != 1) {
303 ERR("unable to parse response to get_n_subbufs");
304 return -1;
305 }
306 free(received_msg);
307 DBG("got n_subbufs %d", buf->n_subbufs);
308
309 /* get subbuf size */
310 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
4e2a8808 311 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
3a7b90de
PMF
312 free(send_msg);
313
314 result = sscanf(received_msg, "%d", &buf->subbuf_size);
315 if(result != 1) {
316 ERR("unable to parse response to get_subbuf_size");
317 return -1;
318 }
319 free(received_msg);
320 DBG("got subbuf_size %d", buf->subbuf_size);
321
322 /* attach memory */
323 buf->mem = shmat(buf->shmid, NULL, 0);
324 if(buf->mem == (void *) 0) {
325 perror("shmat");
326 return -1;
327 }
8cefc145
PMF
328 DBG("successfully attached buffer memory");
329
330 buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
331 if(buf->bufstruct_mem == (void *) 0) {
332 perror("shmat");
333 return -1;
334 }
335 DBG("successfully attached buffer bufstruct memory");
3a7b90de 336
a3cdd4a7
PMF
337 /* obtain info on the memory segment */
338 result = shmctl(buf->shmid, IPC_STAT, &shmds);
339 if(result == -1) {
340 perror("shmctl");
341 return -1;
342 }
343 buf->memlen = shmds.shm_segsz;
344
3a7b90de
PMF
345 /* open file for output */
346 asprintf(&tmp, "/tmp/trace/%s_0", buf->name);
347 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600);
348 if(result == -1) {
349 PERROR("open");
6cb88bc0 350 ERR("failed opening trace file %s", tmp);
3a7b90de
PMF
351 return -1;
352 }
353 buf->file_fd = fd;
354 free(tmp);
355
3a7b90de
PMF
356 pthread_create(&thr, NULL, consumer_thread, buf);
357
358 return 0;
359}
360
3796af9b
PMF
361int main(int argc, char **argv)
362{
363 struct ustcomm_ustd ustd;
364 int result;
a3cdd4a7 365 sigset_t sigset;
3796af9b
PMF
366
367 result = ustcomm_init_ustd(&ustd);
368 if(result == -1) {
369 ERR("failed to initialize socket");
370 return 1;
371 }
372
a3cdd4a7
PMF
373 result = sigemptyset(&sigset);
374 if(result == -1) {
375 perror("sigemptyset");
376 return 1;
377 }
378 result = sigaddset(&sigset, SIGPIPE);
379 if(result == -1) {
380 perror("sigaddset");
381 return 1;
382 }
383 result = sigprocmask(SIG_BLOCK, &sigset, NULL);
384 if(result == -1) {
385 perror("sigprocmask");
386 return 1;
387 }
388
688760ef 389 /* app loop */
3796af9b
PMF
390 for(;;) {
391 char *recvbuf;
392
3a7b90de 393 /* check for requests on our public socket */
688760ef
PMF
394 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
395 if(result == -1) {
396 ERR("error in ustcomm_ustd_recv_message");
397 continue;
398 }
399 if(result > 0) {
400 if(!strncmp(recvbuf, "collect", 7)) {
401 pid_t pid;
402 char *bufname;
403 int result;
3796af9b 404
688760ef
PMF
405 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
406 if(result != 2) {
407 fprintf(stderr, "parsing error: %s\n", recvbuf);
408 }
3796af9b 409
688760ef
PMF
410 result = add_buffer(pid, bufname);
411 if(result < 0) {
412 ERR("error in add_buffer");
413 continue;
414 }
3796af9b
PMF
415 }
416
688760ef 417 free(recvbuf);
3796af9b 418 }
3796af9b
PMF
419 }
420
421 return 0;
422}
This page took 0.039981 seconds and 4 git commands to generate.