&new_stream->relayd_stream_id);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
+ consumer_del_stream(new_stream, NULL);
goto end_nosignal;
}
} else if (msg.u.stream.net_index != -1) {
ERR("Network sequence index %d unknown. Not adding stream.",
msg.u.stream.net_index);
- free(new_stream);
+ consumer_del_stream(new_stream, NULL);
goto end_nosignal;
}
- /* Send stream to the metadata thread */
- if (new_stream->metadata_flag) {
- if (ctx->on_recv_stream) {
- ret = ctx->on_recv_stream(new_stream);
- if (ret < 0) {
- goto end_nosignal;
- }
+ if (ctx->on_recv_stream) {
+ ret = ctx->on_recv_stream(new_stream);
+ if (ret < 0) {
+ consumer_del_stream(new_stream, NULL);
+ goto end_nosignal;
}
+ }
+ /* Send stream to the metadata thread */
+ if (new_stream->metadata_flag) {
do {
- ret = write(ctx->consumer_metadata_pipe[1], new_stream,
- sizeof(struct lttng_consumer_stream));
+ ret = write(ctx->consumer_metadata_pipe[1], &new_stream,
+ sizeof(new_stream));
} while (ret < 0 && errno == EINTR);
if (ret < 0) {
PERROR("write metadata pipe");
+ consumer_del_stream(new_stream, NULL);
}
} else {
- if (ctx->on_recv_stream) {
- ret = ctx->on_recv_stream(new_stream);
- if (ret < 0) {
- goto end_nosignal;
- }
+ ret = consumer_add_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add stream %d failed. Continuing",
+ new_stream->key);
+ consumer_del_stream(new_stream, NULL);
+ goto end_nosignal;
}
- consumer_add_stream(new_stream);
}
DBG("Kernel consumer_add_stream (%d)", fd);