start implementing sharing of buffer info
[ust.git] / ustd / ustd.c
1 /*
2 * Copyright (C) 2009 Pierre-Marc Fournier
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 #define _GNU_SOURCE
19
#include <sys/types.h>
#include <sys/shm.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>

#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include "localerr.h"
#include "ustcomm.h"
32
33 struct list_head buffers = LIST_HEAD_INIT(buffers);
34
35 struct buffer_info {
36 char *name;
37 pid_t pid;
38 struct ustcomm_connection conn;
39
40 int shmid;
41 int bufstruct_shmid;
42
43 /* the buffer memory */
44 void *mem;
45 /* buffer size */
46 int memlen;
47 /* number of subbuffers in buffer */
48 int n_subbufs;
49 /* size of each subbuffer */
50 int subbuf_size;
51
52 /* the buffer information struct */
53 void *bufstruct_mem;
54
55 int file_fd; /* output file */
56
57 struct list_head list;
58
59 long consumed_old;
60 };
61
62 /* return value: 0 = subbuffer is finished, it won't produce data anymore
63 * 1 = got subbuffer successfully
64 * <0 = error
65 */
66
67 #define GET_SUBBUF_OK 1
68 #define GET_SUBBUF_DONE 0
69 #define GET_SUBBUF_DIED 2
70
71 int get_subbuffer(struct buffer_info *buf)
72 {
73 char *send_msg;
74 char *received_msg;
75 char *rep_code;
76 int retval;
77 int result;
78
79 asprintf(&send_msg, "get_subbuffer %s", buf->name);
80 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
81 free(send_msg);
82 if(result < 0) {
83 ERR("get_subbuffer: ustcomm_send_request failed");
84 return -1;
85 }
86 else if(result == 0) {
87 DBG("app died while being traced");
88 return GET_SUBBUF_DIED;
89 }
90
91 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
92 if(result != 2 && result != 1) {
93 ERR("unable to parse response to get_subbuffer");
94 return -1;
95 }
96
97 DBG("received msg is %s", received_msg);
98
99 if(!strcmp(rep_code, "OK")) {
100 DBG("got subbuffer %s", buf->name);
101 retval = GET_SUBBUF_OK;
102 }
103 else if(nth_token_is(received_msg, "END", 0) == 1) {
104 return GET_SUBBUF_DONE;
105 }
106 else {
107 DBG("error getting subbuffer %s", buf->name);
108 retval = -1;
109 }
110
111 /* FIMXE: free correctly the stuff */
112 free(received_msg);
113 free(rep_code);
114 return retval;
115 }
116
117 int put_subbuffer(struct buffer_info *buf)
118 {
119 char *send_msg;
120 char *received_msg;
121 char *rep_code;
122 int retval;
123 int result;
124
125 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
126 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
127 if(result < 0) {
128 ERR("put_subbuffer: send_message failed");
129 return -1;
130 }
131 free(send_msg);
132
133 result = sscanf(received_msg, "%as", &rep_code);
134 if(result != 1) {
135 ERR("unable to parse response to put_subbuffer");
136 return -1;
137 }
138 free(received_msg);
139
140 if(!strcmp(rep_code, "OK")) {
141 DBG("subbuffer put %s", buf->name);
142 retval = 1;
143 }
144 else {
145 ERR("invalid response to put_subbuffer");
146 }
147
148 free(rep_code);
149 return retval;
150 }
151
/*
 * Write all of buf to fd, calling write() again after short writes and
 * after interruptions by a signal (EINTR).
 *
 * Returns count once everything has been written. If write() returns 0 or
 * fails with an error other than EINTR, that return value (0 or -1) is
 * passed back, even if part of the data was already written.
 */
ssize_t patient_write(int fd, const void *buf, size_t count)
{
	const char *bufc = (const char *) buf;
	ssize_t result;

	for (;;) {
		result = write(fd, bufc, count);
		if (result < 0 && errno == EINTR) {
			/* Interrupted before anything was written: just retry. */
			continue;
		}
		if (result <= 0) {
			return result;
		}
		count -= (size_t) result;
		bufc += result;

		if (count == 0) {
			break;
		}
	}

	return bufc - (const char *) buf;
}
172
/*
 * Placeholder for fetching a subbuffer once the traced app has died.
 * It does not look at buf and always returns 0.
 */
int get_subbuffer_died(struct buffer_info *buf)
{
	(void) buf;	/* not used yet */

	return 0;
}
177
178 //int ltt_do_get_subbuf(struct rchan_buf *buf, struct ltt_channel_buf_struct *ltt_buf, long *pconsumed_old)
179 //{
180 // struct ltt_channel_buf_struct *ltt_buf = buf->bufstruct_mem;
181 //
182 ////ust// struct ltt_channel_struct *ltt_channel = (struct ltt_channel_struct *)buf->chan->private_data;
183 // long consumed_old, consumed_idx, commit_count, write_offset;
184 // consumed_old = atomic_long_read(&ltt_buf->consumed);
185 // consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
186 // commit_count = local_read(&ltt_buf->commit_count[consumed_idx]);
187 // /*
188 // * Make sure we read the commit count before reading the buffer
189 // * data and the write offset. Correct consumed offset ordering
190 // * wrt commit count is insured by the use of cmpxchg to update
191 // * the consumed offset.
192 // */
193 // smp_rmb();
194 // write_offset = local_read(&ltt_buf->offset);
195 // /*
196 // * Check that the subbuffer we are trying to consume has been
197 // * already fully committed.
198 // */
199 // if (((commit_count - buf->chan->subbuf_size)
200 // & ltt_channel->commit_count_mask)
201 // - (BUFFER_TRUNC(consumed_old, buf->chan)
202 // >> ltt_channel->n_subbufs_order)
203 // != 0) {
204 // return -EAGAIN;
205 // }
206 // /*
207 // * Check that we are not about to read the same subbuffer in
208 // * which the writer head is.
209 // */
210 // if ((SUBBUF_TRUNC(write_offset, buf->chan)
211 // - SUBBUF_TRUNC(consumed_old, buf->chan))
212 // == 0) {
213 // return -EAGAIN;
214 // }
215 //
216 // *pconsumed_old = consumed_old;
217 // return 0;
218 //}
219
220 void *consumer_thread(void *arg)
221 {
222 struct buffer_info *buf = (struct buffer_info *) arg;
223 int result;
224 int died = 0;
225
226 for(;;) {
227 /* get the subbuffer */
228 if(died == 0) {
229 result = get_subbuffer(buf);
230 if(result == -1) {
231 ERR("error getting subbuffer");
232 continue;
233 }
234 else if(result == GET_SUBBUF_DONE) {
235 /* this is done */
236 break;
237 }
238 else if(result == GET_SUBBUF_DIED) {
239 died = 1;
240 }
241 }
242 if(died == 1) {
243 result = get_subbuffer_died(buf);
244 if(result <= 0) {
245 break;
246 }
247 }
248
249 /* write data to file */
250 result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
251 if(result == -1) {
252 PERROR("write");
253 /* FIXME: maybe drop this trace */
254 }
255
256 /* put the subbuffer */
257 if(died == 0) {
258 result = put_subbuffer(buf);
259 if(result == -1) {
260 ERR("error putting subbuffer");
261 break;
262 }
263 }
264 else {
265 // result = put_subbuffer_died(buf);
266 }
267 }
268
269 DBG("thread for buffer %s is stopping", buf->name);
270
271 /* FIXME: destroy, unalloc... */
272
273 return NULL;
274 }
275
276 int add_buffer(pid_t pid, char *bufname)
277 {
278 struct buffer_info *buf;
279 char *send_msg;
280 char *received_msg;
281 int result;
282 char *tmp;
283 int fd;
284 pthread_t thr;
285
286 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
287 if(buf == NULL) {
288 ERR("add_buffer: insufficient memory");
289 return -1;
290 }
291
292 buf->name = bufname;
293 buf->pid = pid;
294
295 /* connect to app */
296 result = ustcomm_connect_app(buf->pid, &buf->conn);
297 if(result) {
298 ERR("unable to connect to process");
299 return -1;
300 }
301
302 /* get shmid */
303 asprintf(&send_msg, "get_shmid %s", buf->name);
304 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
305 free(send_msg);
306 DBG("got buffer name %s", buf->name);
307
308 result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
309 if(result != 2) {
310 ERR("unable to parse response to get_shmid");
311 return -1;
312 }
313 free(received_msg);
314 DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
315
316 /* get n_subbufs */
317 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
318 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
319 free(send_msg);
320
321 result = sscanf(received_msg, "%d", &buf->n_subbufs);
322 if(result != 1) {
323 ERR("unable to parse response to get_n_subbufs");
324 return -1;
325 }
326 free(received_msg);
327 DBG("got n_subbufs %d", buf->n_subbufs);
328
329 /* get subbuf size */
330 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
331 ustcomm_send_request(&buf->conn, send_msg, &received_msg);
332 free(send_msg);
333
334 result = sscanf(received_msg, "%d", &buf->subbuf_size);
335 if(result != 1) {
336 ERR("unable to parse response to get_subbuf_size");
337 return -1;
338 }
339 free(received_msg);
340 DBG("got subbuf_size %d", buf->subbuf_size);
341
342 /* attach memory */
343 buf->mem = shmat(buf->shmid, NULL, 0);
344 if(buf->mem == (void *) 0) {
345 perror("shmat");
346 return -1;
347 }
348 DBG("successfully attached buffer memory");
349
350 buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
351 if(buf->bufstruct_mem == (void *) 0) {
352 perror("shmat");
353 return -1;
354 }
355 DBG("successfully attached buffer bufstruct memory");
356
357 /* open file for output */
358 asprintf(&tmp, "/tmp/trace/%s_0", buf->name);
359 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600);
360 if(result == -1) {
361 PERROR("open");
362 return -1;
363 }
364 buf->file_fd = fd;
365 free(tmp);
366
367 //list_add(&buf->list, &buffers);
368
369 pthread_create(&thr, NULL, consumer_thread, buf);
370
371 return 0;
372 }
373
374 int main(int argc, char **argv)
375 {
376 struct ustcomm_ustd ustd;
377 int result;
378
379 result = ustcomm_init_ustd(&ustd);
380 if(result == -1) {
381 ERR("failed to initialize socket");
382 return 1;
383 }
384
385 /* app loop */
386 for(;;) {
387 char *recvbuf;
388
389 /* check for requests on our public socket */
390 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
391 if(result == -1) {
392 ERR("error in ustcomm_ustd_recv_message");
393 continue;
394 }
395 if(result > 0) {
396 if(!strncmp(recvbuf, "collect", 7)) {
397 pid_t pid;
398 char *bufname;
399 int result;
400
401 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
402 if(result != 2) {
403 fprintf(stderr, "parsing error: %s\n", recvbuf);
404 }
405
406 result = add_buffer(pid, bufname);
407 if(result < 0) {
408 ERR("error in add_buffer");
409 continue;
410 }
411 }
412
413 free(recvbuf);
414 }
415 }
416
417 return 0;
418 }
This page took 0.037137 seconds and 4 git commands to generate.