X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=lttv%2Flttv%2Fsync%2Fevent_processing_lttv_standard.c;fp=lttv%2Flttv%2Fsync%2Fevent_processing_lttv_standard.c;h=cef9f2b970f2a5f8712bb6b7355045ae931ff76a;hb=70407e861d8430dbe06cc52e6fe4ed5c9cd0872a;hp=0000000000000000000000000000000000000000;hpb=d26f04742e44ea6a45a5034a521c5948bc9361a4;p=lttv.git diff --git a/lttv/lttv/sync/event_processing_lttv_standard.c b/lttv/lttv/sync/event_processing_lttv_standard.c new file mode 100644 index 00000000..cef9f2b9 --- /dev/null +++ b/lttv/lttv/sync/event_processing_lttv_standard.c @@ -0,0 +1,582 @@ +/* This file is part of the Linux Trace Toolkit viewer + * Copyright (C) 2009 Benjamin Poirier + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License Version 2 as + * published by the Free Software Foundation; + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, + * MA 02111-1307, USA. + */ + +#define _ISOC99_SOURCE + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include +#include +#include +#include + +#include "sync_chain.h" +#include "event_processing_lttv_common.h" + +#include "event_processing_lttv_standard.h" + + +#ifndef g_info +#define g_info(format...) g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, format) +#endif + + +// Functions common to all matching modules +static void initProcessingLTTVStandard(SyncState* const syncState, + LttvTracesetContext* const traceSetContext); +static void destroyProcessingLTTVStandard(SyncState* const syncState); + +static void finalizeProcessingLTTVStandard(SyncState* const syncState); +static void printProcessingStatsLTTVStandard(SyncState* const syncState); + +// Functions specific to this module +static void registerProcessingLTTVStandard() __attribute__((constructor (102))); +static gboolean processEventLTTVStandard(void* hookData, void* callData); +static void partialDestroyProcessingLTTVStandard(SyncState* const syncState); + + +static ProcessingModule processingModuleLTTVStandard = { + .name= "LTTV-standard", + .initProcessing= &initProcessingLTTVStandard, + .destroyProcessing= &destroyProcessingLTTVStandard, + .finalizeProcessing= &finalizeProcessingLTTVStandard, + .printProcessingStats= &printProcessingStatsLTTVStandard, +}; + + + +/* + * Processing Module registering function + */ +static void registerProcessingLTTVStandard() +{ + g_queue_push_tail(&processingModules, &processingModuleLTTVStandard); + + createQuarks(); +} + + +/* + * Allocate and initialize data structures for synchronizing a traceset. + * Register event hooks. + * + * Args: + * syncState: container for synchronization data. + * This function allocates these processingData members: + * traceNumTable + * pendingRecv + * hookListList + * stats + * traceSetContext: set of LTTV traces + */ +static void initProcessingLTTVStandard(SyncState* const syncState, LttvTracesetContext* + const traceSetContext) +{ + unsigned int i; + ProcessingDataLTTVStandard* processingData; + + processingData= malloc(sizeof(ProcessingDataLTTVStandard)); + syncState->processingData= processingData; + processingData->traceSetContext= traceSetContext; + + if (syncState->stats) + { + processingData->stats= calloc(1, sizeof(ProcessingStatsLTTVStandard)); + } + else + { + processingData->stats= NULL; + } + + processingData->traceNumTable= g_hash_table_new(&g_direct_hash, NULL); + processingData->hookListList= g_array_sized_new(FALSE, FALSE, + sizeof(GArray*), syncState->traceNb); + processingData->pendingRecv= malloc(sizeof(GHashTable*) * + syncState->traceNb); + + for(i= 0; i < syncState->traceNb; i++) + { + g_hash_table_insert(processingData->traceNumTable, + processingData->traceSetContext->traces[i]->t, (gpointer) i); + } + + for(i= 0; i < syncState->traceNb; i++) + { + processingData->pendingRecv[i]= g_hash_table_new_full(&g_direct_hash, + NULL, NULL, &gdnDestroyNetEvent); + } + + registerHooks(processingData->hookListList, traceSetContext, + syncState->traceNb, &processEventLTTVStandard, syncState); +} + + +/* + * Call the partial processing destroyer, obtain and adjust the factors from + * downstream + * + * Args: + * syncState container for synchronization data. + */ +static void finalizeProcessingLTTVStandard(SyncState* const syncState) +{ + unsigned int i; + GArray* factors; + double minOffset, minDrift; + unsigned int refFreqTrace; + ProcessingDataLTTVStandard* processingData; + + processingData= (ProcessingDataLTTVStandard*) syncState->processingData; + + partialDestroyProcessingLTTVStandard(syncState); + + factors= syncState->matchingModule->finalizeMatching(syncState); + + /* The offsets are adjusted so the lowest one is 0. This is done because + * of a Lttv specific limitation: events cannot have negative times. By + * having non-negative offsets, events cannot be moved backwards to + * negative times. + */ + minOffset= 0; + for (i= 0; i < syncState->traceNb; i++) + { + minOffset= MIN(g_array_index(factors, Factors, i).offset, minOffset); + } + + for (i= 0; i < syncState->traceNb; i++) + { + g_array_index(factors, Factors, i).offset-= minOffset; + } + + /* Because the timestamps are corrected at the TSC level (not at the + * LttTime level) all trace frequencies must be made equal. We choose to + * use the frequency of the system with the lowest drift + */ + minDrift= INFINITY; + refFreqTrace= 0; + for (i= 0; i < syncState->traceNb; i++) + { + if (g_array_index(factors, Factors, i).drift < minDrift) + { + minDrift= g_array_index(factors, Factors, i).drift; + refFreqTrace= i; + } + } + g_assert(syncState->traceNb == 0 || minDrift != INFINITY); + + // Write the factors to the LttTrace structures + for (i= 0; i < syncState->traceNb; i++) + { + LttTrace* t; + Factors* traceFactors; + + t= processingData->traceSetContext->traces[i]->t; + traceFactors= &g_array_index(factors, Factors, i); + + t->drift= traceFactors->drift; + t->offset= traceFactors->offset; + t->start_freq= + processingData->traceSetContext->traces[refFreqTrace]->t->start_freq; + t->freq_scale= + processingData->traceSetContext->traces[refFreqTrace]->t->freq_scale; + t->start_time_from_tsc = + ltt_time_from_uint64(tsc_to_uint64(t->freq_scale, t->start_freq, + t->drift * t->start_tsc + t->offset)); + } + + g_array_free(factors, TRUE); + + lttv_traceset_context_compute_time_span(processingData->traceSetContext, + &processingData->traceSetContext->time_span); + + g_debug("traceset start %ld.%09ld end %ld.%09ld\n", + processingData->traceSetContext->time_span.start_time.tv_sec, + processingData->traceSetContext->time_span.start_time.tv_nsec, + processingData->traceSetContext->time_span.end_time.tv_sec, + processingData->traceSetContext->time_span.end_time.tv_nsec); + + return; +} + + +/* + * Print statistics related to processing and downstream modules. Must be + * called after finalizeProcessing. + * + * Args: + * syncState container for synchronization data. + */ +static void printProcessingStatsLTTVStandard(SyncState* const syncState) +{ + unsigned int i; + ProcessingDataLTTVStandard* processingData; + + if (!syncState->stats) + { + return; + } + + processingData= (ProcessingDataLTTVStandard*) syncState->processingData; + + printf("LTTV processing stats:\n"); + printf("\treceived frames: %d\n", processingData->stats->totRecv); + printf("\treceived frames that are IP: %d\n", + processingData->stats->totRecvIp); + printf("\treceived and processed packets that are TCP: %d\n", + processingData->stats->totInE); + printf("\tsent packets that are TCP: %d\n", + processingData->stats->totOutE); + + if (syncState->matchingModule->printMatchingStats != NULL) + { + syncState->matchingModule->printMatchingStats(syncState); + } + + printf("Resulting synchronization factors:\n"); + for (i= 0; i < syncState->traceNb; i++) + { + LttTrace* t; + + t= processingData->traceSetContext->traces[i]->t; + + printf("\ttrace %u drift= %g offset= %g (%f) start time= %ld.%09ld\n", + i, t->drift, t->offset, (double) tsc_to_uint64(t->freq_scale, + t->start_freq, t->offset) / NANOSECONDS_PER_SECOND, + t->start_time_from_tsc.tv_sec, t->start_time_from_tsc.tv_nsec); + } +} + + +/* + * Unregister event hooks. Deallocate processingData. + * + * Args: + * syncState: container for synchronization data. + * This function deallocates these processingData members: + * stats + */ +static void destroyProcessingLTTVStandard(SyncState* const syncState) +{ + ProcessingDataLTTVStandard* processingData; + + processingData= (ProcessingDataLTTVStandard*) syncState->processingData; + + if (processingData == NULL) + { + return; + } + + partialDestroyProcessingLTTVStandard(syncState); + + if (syncState->stats) + { + free(processingData->stats); + } + + free(syncState->processingData); + syncState->processingData= NULL; +} + + +/* + * Unregister event hooks. Deallocate some of processingData. + * + * This function can be called right after the events have been processed to + * free some data structures that are not needed for finalization. + * + * Args: + * syncState: container for synchronization data. + * This function deallocates these members: + * traceNumTable + * hookListList + * pendingRecv + */ +static void partialDestroyProcessingLTTVStandard(SyncState* const syncState) +{ + unsigned int i; + ProcessingDataLTTVStandard* processingData; + + processingData= (ProcessingDataLTTVStandard*) syncState->processingData; + + if (processingData == NULL || processingData->traceNumTable == NULL) + { + return; + } + + g_hash_table_destroy(processingData->traceNumTable); + processingData->traceNumTable= NULL; + + for(i= 0; i < syncState->traceNb; i++) + { + + g_debug("Cleaning up pendingRecv list\n"); + g_hash_table_destroy(processingData->pendingRecv[i]); + } + free(processingData->pendingRecv); + + unregisterHooks(processingData->hookListList, + processingData->traceSetContext, syncState->traceNb); +} + + +/* + * Lttv hook function that will be called for network events + * + * Args: + * hookData: LttvTraceHook* for the type of event that generated the call + * callData: LttvTracefileContext* at the moment of the event + * + * Returns: + * FALSE Always returns FALSE, meaning to keep processing hooks for + * this event + */ +static gboolean processEventLTTVStandard(void* hookData, void* callData) +{ + LttvTraceHook* traceHook; + LttvTracefileContext* tfc; + LttEvent* event; + LttTime time; + LttCycleCount tsc; + LttTrace* trace; + unsigned long traceNum; + struct marker_info* info; + SyncState* syncState; + ProcessingDataLTTVStandard* processingData; + + traceHook= (LttvTraceHook*) hookData; + tfc= (LttvTracefileContext*) callData; + syncState= (SyncState*) traceHook->hook_data; + processingData= (ProcessingDataLTTVStandard*) syncState->processingData; + event= ltt_tracefile_get_event(tfc->tf); + time= ltt_event_time(event); + tsc= ltt_event_cycle_count(event); + trace= tfc->t_context->t; + info= marker_get_info_from_id(tfc->tf->mdata, event->event_id); + + g_assert(g_hash_table_lookup_extended(processingData->traceNumTable, + trace, NULL, (gpointer*) &traceNum)); + + g_debug("XXXX process event: time: %ld.%09ld trace: %ld (%p) name: %s ", + (long) time.tv_sec, time.tv_nsec, traceNum, trace, + g_quark_to_string(info->name)); + + if (info->name == LTT_EVENT_DEV_XMIT) + { + NetEvent* outE; + + if (!ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 1)) == ETH_P_IP || + !ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 2)) == IPPROTO_TCP) + { + return FALSE; + } + + if (syncState->stats) + { + processingData->stats->totOutE++; + } + + outE= malloc(sizeof(NetEvent)); + outE->packetKey= malloc(sizeof(PacketKey)); + + outE->traceNum= traceNum; + outE->tsc= tsc; + outE->skb= NULL; + outE->packetKey->connectionKey.saddr= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 3)); + outE->packetKey->connectionKey.daddr= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 4)); + outE->packetKey->tot_len= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 5)); + outE->packetKey->ihl= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 6)); + outE->packetKey->connectionKey.source= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 7)); + outE->packetKey->connectionKey.dest= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 8)); + outE->packetKey->seq= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 9)); + outE->packetKey->ack_seq= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 10)); + outE->packetKey->doff= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 11)); + outE->packetKey->ack= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 12)); + outE->packetKey->rst= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 13)); + outE->packetKey->syn= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 14)); + outE->packetKey->fin= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 15)); + + syncState->matchingModule->matchEvent(syncState, outE, OUT); + + g_debug("Output event done\n"); + } + else if (info->name == LTT_EVENT_DEV_RECEIVE) + { + guint32 protocol; + + if (syncState->stats) + { + processingData->stats->totRecv++; + } + + protocol= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 1)); + + if (protocol == ETH_P_IP) + { + NetEvent* inE; + + if (syncState->stats) + { + processingData->stats->totRecvIp++; + } + + inE= malloc(sizeof(NetEvent)); + + inE->traceNum= traceNum; + inE->tsc= tsc; + inE->skb= (void*) (long) ltt_event_get_long_unsigned(event, + lttv_trace_get_hook_field(traceHook, 0)); + inE->packetKey= NULL; + + g_hash_table_insert(processingData->pendingRecv[traceNum], + inE->skb, inE); + + g_debug("Adding inE %p for skb %p to pendingRecv\n", inE, inE->skb); + } + else + { + g_debug("\n"); + } + } + else if (info->name == LTT_EVENT_TCPV4_RCV) + { + NetEvent* inE; + void* skb; + + // Search pendingRecv for an event with the same skb + skb= (void*) (long) ltt_event_get_long_unsigned(event, + lttv_trace_get_hook_field(traceHook, 0)); + + inE= (NetEvent*) + g_hash_table_lookup(processingData->pendingRecv[traceNum], skb); + if (inE == NULL) + { + // This should only happen in case of lost events + g_debug("No matching pending receive event found\n"); + } + else + { + if (syncState->stats) + { + processingData->stats->totInE++; + } + + // If it's there, remove it and proceed with a receive event + g_hash_table_steal(processingData->pendingRecv[traceNum], skb); + + inE->packetKey= malloc(sizeof(PacketKey)); + + inE->packetKey->connectionKey.saddr= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 1)); + inE->packetKey->connectionKey.daddr= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 2)); + inE->packetKey->tot_len= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 3)); + inE->packetKey->ihl= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 4)); + inE->packetKey->connectionKey.source= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 5)); + inE->packetKey->connectionKey.dest= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 6)); + inE->packetKey->seq= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 7)); + inE->packetKey->ack_seq= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 8)); + inE->packetKey->doff= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 9)); + inE->packetKey->ack= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 10)); + inE->packetKey->rst= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 11)); + inE->packetKey->syn= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 12)); + inE->packetKey->fin= ltt_event_get_unsigned(event, + lttv_trace_get_hook_field(traceHook, 13)); + + syncState->matchingModule->matchEvent(syncState, inE, IN); + + g_debug("Input event %p for skb %p done\n", inE, skb); + } + } + else if (info->name == LTT_EVENT_PKFREE_SKB) + { + gboolean result; + void* skb; + + // Search pendingRecv for an event with the same skb + skb= (void*) (long) ltt_event_get_long_unsigned(event, + lttv_trace_get_hook_field(traceHook, 0)); + + result= g_hash_table_remove(processingData->pendingRecv[traceNum], + skb); + if (result == FALSE) + { + g_debug("No matching pending receive event found, \"shaddow" + "skb\" %p\n", skb); + } + else + { + g_debug("Non-TCP skb %p\n", skb); + } + } + else if (info->name == LTT_EVENT_NETWORK_IPV4_INTERFACE) + { + char* name; + guint64 address; + gint64 up; + char addressString[17]; + + address= ltt_event_get_long_unsigned(event, + lttv_trace_get_hook_field(traceHook, 1)); + up= ltt_event_get_long_int(event, lttv_trace_get_hook_field(traceHook, + 2)); + /* name must be the last field to get or else copy the string, see the + * doc for ltt_event_get_string() + */ + name= ltt_event_get_string(event, lttv_trace_get_hook_field(traceHook, + 0)); + + convertIP(addressString, address); + + g_debug("name \"%s\" address %s up %lld\n", name, addressString, up); + } + else + { + g_assert_not_reached(); + } + + return FALSE; +}