830d5e26441fe368b975b2c8b75fe44dd5e93060
[ust.git] / ustd / ustd.c
1 /*
2 * Copyright (C) 2009 Pierre-Marc Fournier
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 #define _GNU_SOURCE
19
#include <sys/types.h>
#include <sys/shm.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>

#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
29
30 #include "localerr.h"
31 #include "ustcomm.h"
32
33 struct list_head buffers = LIST_HEAD_INIT(buffers);
34
35 struct buffer_info {
36 char *name;
37 pid_t pid;
38
39 int shmid;
40 void *mem;
41 int memlen;
42
43 int n_subbufs;
44 int subbuf_size;
45
46 int file_fd; /* output file */
47
48 struct list_head list;
49
50 long consumed_old;
51 };
52
53 /* return value: 0 = subbuffer is finished, it won't produce data anymore
54 * 1 = got subbuffer successfully
55 * <0 = error
56 */
57
58 int get_subbuffer(struct buffer_info *buf)
59 {
60 char *send_msg;
61 char *received_msg;
62 char *rep_code;
63 int retval;
64 int result;
65
66 asprintf(&send_msg, "get_subbuffer %s", buf->name);
67 result = send_message(buf->pid, send_msg, &received_msg);
68 if(result < 0) {
69 ERR("get_subbuffer: send_message failed");
70 return -1;
71 }
72 free(send_msg);
73
74 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
75 if(result != 2 && result != 1) {
76 ERR("unable to parse response to get_subbuffer");
77 return -1;
78 }
79
80 DBG("received msg is %s", received_msg);
81
82 if(!strcmp(rep_code, "OK")) {
83 DBG("got subbuffer %s", buf->name);
84 retval = 1;
85 }
86 else if(nth_token_is(received_msg, "END", 0) == 1) {
87 return 0;
88 }
89 else {
90 DBG("error getting subbuffer %s", buf->name);
91 retval = -1;
92 }
93
94 /* FIMXE: free correctly the stuff */
95 free(received_msg);
96 free(rep_code);
97 return retval;
98 }
99
100 int put_subbuffer(struct buffer_info *buf)
101 {
102 char *send_msg;
103 char *received_msg;
104 char *rep_code;
105 int retval;
106 int result;
107
108 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
109 result = send_message(buf->pid, send_msg, &received_msg);
110 if(result < 0) {
111 ERR("put_subbuffer: send_message failed");
112 return -1;
113 }
114 free(send_msg);
115
116 result = sscanf(received_msg, "%as", &rep_code);
117 if(result != 1) {
118 ERR("unable to parse response to put_subbuffer");
119 return -1;
120 }
121 free(received_msg);
122
123 if(!strcmp(rep_code, "OK")) {
124 DBG("subbuffer put %s", buf->name);
125 retval = 1;
126 }
127 else {
128 ERR("invalid response to put_subbuffer");
129 }
130
131 free(rep_code);
132 return retval;
133 }
134
/*
 * Write all count bytes of buf to fd, calling write() again after short
 * writes and after interruptions by a signal (EINTR).
 *
 * Returns count once everything has been written. If write() returns 0 or
 * fails with any other error, that return value (0 or -1) is passed back
 * and the bytes already written are not reported.
 */
ssize_t patient_write(int fd, const void *buf, size_t count)
{
	const char *bufc = (const char *) buf;
	ssize_t result;

	for (;;) {
		result = write(fd, bufc, count);
		if (result < 0 && errno == EINTR) {
			/* Nothing was transferred; simply try again. */
			continue;
		}
		if (result <= 0) {
			return result;
		}
		count -= (size_t) result;
		bufc += result;

		if (count == 0) {
			break;
		}
	}

	return bufc - (const char *) buf;
}
155
156 void *consumer_thread(void *arg)
157 {
158 struct buffer_info *buf = (struct buffer_info *) arg;
159 int result;
160
161 for(;;) {
162 result = get_subbuffer(buf);
163 if(result == -1) {
164 ERR("error getting subbuffer");
165 continue;
166 }
167 if(result == 0) {
168 /* this is done */
169 break;
170 }
171
172 /* write data to file */
173 result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
174 if(result == -1) {
175 PERROR("write");
176 /* FIXME: maybe drop this trace */
177 }
178
179 result = put_subbuffer(buf);
180 if(result == -1) {
181 ERR("error putting subbuffer");
182 break;
183 }
184 }
185
186 DBG("thread for buffer %s is stopping", buf->name);
187
188 return NULL;
189 }
190
191 int add_buffer(pid_t pid, char *bufname)
192 {
193 struct buffer_info *buf;
194 char *send_msg;
195 char *received_msg;
196 int result;
197 char *tmp;
198 int fd;
199 pthread_t thr;
200
201 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
202 if(buf == NULL) {
203 ERR("add_buffer: insufficient memory");
204 return -1;
205 }
206
207 buf->name = bufname;
208 buf->pid = pid;
209
210 /* get shmid */
211 asprintf(&send_msg, "get_shmid %s", buf->name);
212 send_message(pid, send_msg, &received_msg);
213 free(send_msg);
214 DBG("got buffer name %s", buf->name);
215
216 result = sscanf(received_msg, "%d", &buf->shmid);
217 if(result != 1) {
218 ERR("unable to parse response to get_shmid");
219 return -1;
220 }
221 free(received_msg);
222 DBG("got shmid %d", buf->shmid);
223
224 /* get n_subbufs */
225 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
226 send_message(pid, send_msg, &received_msg);
227 free(send_msg);
228
229 result = sscanf(received_msg, "%d", &buf->n_subbufs);
230 if(result != 1) {
231 ERR("unable to parse response to get_n_subbufs");
232 return -1;
233 }
234 free(received_msg);
235 DBG("got n_subbufs %d", buf->n_subbufs);
236
237 /* get subbuf size */
238 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
239 send_message(pid, send_msg, &received_msg);
240 free(send_msg);
241
242 result = sscanf(received_msg, "%d", &buf->subbuf_size);
243 if(result != 1) {
244 ERR("unable to parse response to get_subbuf_size");
245 return -1;
246 }
247 free(received_msg);
248 DBG("got subbuf_size %d", buf->subbuf_size);
249
250 /* attach memory */
251 buf->mem = shmat(buf->shmid, NULL, 0);
252 if(buf->mem == (void *) 0) {
253 perror("shmat");
254 return -1;
255 }
256 DBG("successfully attached memory");
257
258 /* open file for output */
259 asprintf(&tmp, "/tmp/trace/%s_0", buf->name);
260 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600);
261 if(result == -1) {
262 PERROR("open");
263 return -1;
264 }
265 buf->file_fd = fd;
266 free(tmp);
267
268 //list_add(&buf->list, &buffers);
269
270 pthread_create(&thr, NULL, consumer_thread, buf);
271
272 return 0;
273 }
274
275 int main(int argc, char **argv)
276 {
277 struct ustcomm_ustd ustd;
278 int result;
279
280 result = ustcomm_init_ustd(&ustd);
281 if(result == -1) {
282 ERR("failed to initialize socket");
283 return 1;
284 }
285
286 /* app loop */
287 for(;;) {
288 char *recvbuf;
289
290 /* check for requests on our public socket */
291 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
292 if(result == -1) {
293 ERR("error in ustcomm_ustd_recv_message");
294 continue;
295 }
296 if(result > 0) {
297 if(!strncmp(recvbuf, "collect", 7)) {
298 pid_t pid;
299 char *bufname;
300 int result;
301
302 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
303 if(result != 2) {
304 fprintf(stderr, "parsing error: %s\n", recvbuf);
305 }
306
307 result = add_buffer(pid, bufname);
308 if(result < 0) {
309 ERR("error in add_buffer");
310 continue;
311 }
312 }
313
314 free(recvbuf);
315 }
316 }
317
318 return 0;
319 }
This page took 0.035846 seconds and 3 git commands to generate.