ust: make lttd work
[ust.git] / ustd / ustd.c
1 #define _GNU_SOURCE
2
3 #include <sys/types.h>
4 #include <sys/shm.h>
5 #include <fcntl.h>
6 #include <unistd.h>
7 #include <pthread.h>
8
9 #include <stdlib.h>
10 #include <stdio.h>
11 #include <string.h>
12
13 #include "localerr.h"
14 #include "ustcomm.h"
15
/*
 * Global list head for buffer_info entries (linked through their `list`
 * member). NOTE(review): the only list_add() call visible in this file is
 * commented out in add_buffer(), so nothing here populates it yet.
 */
struct list_head buffers = LIST_HEAD_INIT(buffers);
17
/*
 * Per-buffer state for one trace buffer being collected from a traced
 * process. One instance is allocated by add_buffer() and handed to a
 * consumer thread.
 */
struct buffer_info {
	char *name;	/* buffer name, used in every request sent to the process */
	pid_t pid;	/* process that send_message() requests are addressed to */

	int shmid;	/* id parsed from the "get_shmid" reply, passed to shmat() */
	void *mem;	/* address returned by shmat() */
	int memlen;	/* not assigned anywhere in the visible code */

	int n_subbufs;	/* value parsed from the "get_n_subbufs" reply */
	int subbuf_size;	/* value parsed from the "get_subbuf_size" reply */

	int file_fd; /* output file */

	struct list_head list;	/* node for the global `buffers` list */

	/*
	 * Value parsed from the "get_subbuffer" reply; echoed back in
	 * "put_subbuffer" and used to compute the read offset into mem.
	 */
	long consumed_old;
};
35
36 /* return value: 0 = subbuffer is finished, it won't produce data anymore
37 * 1 = got subbuffer successfully
38 * <0 = error
39 */
40
41 int get_subbuffer(struct buffer_info *buf)
42 {
43 char *send_msg;
44 char *received_msg;
45 char *rep_code;
46 int retval;
47 int result;
48
49 asprintf(&send_msg, "get_subbuffer %s", buf->name);
50 result = send_message(buf->pid, send_msg, &received_msg);
51 if(result < 0) {
52 ERR("get_subbuffer: send_message failed");
53 return -1;
54 }
55 free(send_msg);
56
57 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
58 if(result != 2 && result != 1) {
59 ERR("unable to parse response to get_subbuffer");
60 return -1;
61 }
62
63 DBG("received msg is %s", received_msg);
64
65 if(!strcmp(rep_code, "OK")) {
66 DBG("got subbuffer %s", buf->name);
67 retval = 1;
68 }
69 else if(nth_token_is(received_msg, "END", 0) == 1) {
70 return 0;
71 }
72 else {
73 DBG("error getting subbuffer %s", buf->name);
74 retval = -1;
75 }
76
77 /* FIMXE: free correctly the stuff */
78 free(received_msg);
79 free(rep_code);
80 return retval;
81 }
82
83 int put_subbuffer(struct buffer_info *buf)
84 {
85 char *send_msg;
86 char *received_msg;
87 char *rep_code;
88 int retval;
89 int result;
90
91 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
92 result = send_message(buf->pid, send_msg, &received_msg);
93 if(result < 0) {
94 ERR("put_subbuffer: send_message failed");
95 return -1;
96 }
97 free(send_msg);
98
99 result = sscanf(received_msg, "%as", &rep_code);
100 if(result != 1) {
101 ERR("unable to parse response to put_subbuffer");
102 return -1;
103 }
104 free(received_msg);
105
106 if(!strcmp(rep_code, "OK")) {
107 DBG("subbuffer put %s", buf->name);
108 retval = 1;
109 }
110 else {
111 ERR("invalid response to put_subbuffer");
112 }
113
114 free(rep_code);
115 return retval;
116 }
117
/*
 * Write exactly `count` bytes from `buf` to `fd`, calling write() again
 * after each short write until everything has been written.
 *
 * Returns the number of bytes written (== count) on success. If any
 * write() call returns 0 or -1, that value is returned immediately, even
 * if earlier calls already wrote part of the data; errno is left as set by
 * write(). Errors such as EINTR are not retried.
 */
ssize_t patient_write(int fd, const void *buf, size_t count)
{
	const char *bufc = (const char *) buf;
	/* ssize_t, not int: write() may report more than INT_MAX bytes. */
	ssize_t result;

	/* do/while so that count == 0 still issues one write(), as before. */
	do {
		result = write(fd, bufc, count);
		if(result <= 0) {
			return result;
		}
		count -= (size_t) result;
		bufc += result;
	} while(count > 0);

	return bufc - (const char *) buf;
}
138
139 void *consumer_thread(void *arg)
140 {
141 struct buffer_info *buf = (struct buffer_info *) arg;
142 int result;
143
144 for(;;) {
145 result = get_subbuffer(buf);
146 if(result == -1) {
147 ERR("error getting subbuffer");
148 continue;
149 }
150 if(result == 0) {
151 /* this is done */
152 break;
153 }
154
155 /* write data to file */
156 result = patient_write(buf->file_fd, buf->mem + (buf->consumed_old & (buf->n_subbufs * buf->subbuf_size-1)), buf->subbuf_size);
157 if(result == -1) {
158 PERROR("write");
159 /* FIXME: maybe drop this trace */
160 }
161
162 result = put_subbuffer(buf);
163 if(result == -1) {
164 ERR("error putting subbuffer");
165 break;
166 }
167 }
168
169 DBG("thread for buffer %s is stopping", buf->name);
170
171 return NULL;
172 }
173
174 int add_buffer(pid_t pid, char *bufname)
175 {
176 struct buffer_info *buf;
177 char *send_msg;
178 char *received_msg;
179 int result;
180 char *tmp;
181 int fd;
182 pthread_t thr;
183
184 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
185 if(buf == NULL) {
186 ERR("add_buffer: insufficient memory");
187 return -1;
188 }
189
190 buf->name = bufname;
191 buf->pid = pid;
192
193 /* get shmid */
194 asprintf(&send_msg, "get_shmid %s", buf->name);
195 send_message(pid, send_msg, &received_msg);
196 free(send_msg);
197 DBG("got buffer name %s", buf->name);
198
199 result = sscanf(received_msg, "%d", &buf->shmid);
200 if(result != 1) {
201 ERR("unable to parse response to get_shmid");
202 return -1;
203 }
204 free(received_msg);
205 DBG("got shmid %d", buf->shmid);
206
207 /* get n_subbufs */
208 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
209 send_message(pid, send_msg, &received_msg);
210 free(send_msg);
211
212 result = sscanf(received_msg, "%d", &buf->n_subbufs);
213 if(result != 1) {
214 ERR("unable to parse response to get_n_subbufs");
215 return -1;
216 }
217 free(received_msg);
218 DBG("got n_subbufs %d", buf->n_subbufs);
219
220 /* get subbuf size */
221 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
222 send_message(pid, send_msg, &received_msg);
223 free(send_msg);
224
225 result = sscanf(received_msg, "%d", &buf->subbuf_size);
226 if(result != 1) {
227 ERR("unable to parse response to get_subbuf_size");
228 return -1;
229 }
230 free(received_msg);
231 DBG("got subbuf_size %d", buf->subbuf_size);
232
233 /* attach memory */
234 buf->mem = shmat(buf->shmid, NULL, 0);
235 if(buf->mem == (void *) 0) {
236 perror("shmat");
237 return -1;
238 }
239 DBG("successfully attached memory");
240
241 /* open file for output */
242 asprintf(&tmp, "/tmp/trace/%s_0", buf->name);
243 result = fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC, 00600);
244 if(result == -1) {
245 PERROR("open");
246 return -1;
247 }
248 buf->file_fd = fd;
249 free(tmp);
250
251 //list_add(&buf->list, &buffers);
252
253 pthread_create(&thr, NULL, consumer_thread, buf);
254
255 return 0;
256 }
257
258 int main(int argc, char **argv)
259 {
260 struct ustcomm_ustd ustd;
261 int result;
262
263 result = ustcomm_init_ustd(&ustd);
264 if(result == -1) {
265 ERR("failed to initialize socket");
266 return 1;
267 }
268
269 /* app loop */
270 for(;;) {
271 char *recvbuf;
272
273 /* check for requests on our public socket */
274 result = ustcomm_ustd_recv_message(&ustd, &recvbuf, NULL, 100);
275 if(result == -1) {
276 ERR("error in ustcomm_ustd_recv_message");
277 continue;
278 }
279 if(result > 0) {
280 if(!strncmp(recvbuf, "collect", 7)) {
281 pid_t pid;
282 char *bufname;
283 int result;
284
285 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
286 if(result != 2) {
287 fprintf(stderr, "parsing error: %s\n", recvbuf);
288 }
289
290 result = add_buffer(pid, bufname);
291 if(result < 0) {
292 ERR("error in add_buffer");
293 continue;
294 }
295 }
296
297 free(recvbuf);
298 }
299 }
300
301 return 0;
302 }
This page took 0.035939 seconds and 4 git commands to generate.