Add a module to distribute messages to many analysis modules
[lttv.git] / lttv / lttv / sync / event_processing_lttng_standard.c
index 218e7abf33751c99d772314ed73eb0d0998c1273..b79ca4af27d3cab4545eb79b8dc037d0280ef8c8 100644 (file)
@@ -27,8 +27,9 @@
 #include <netinet/in.h>
 #include <stdint.h>
 #include <stdlib.h>
+#include <string.h>
 
-#include "sync_chain_lttv.h"
+#include "sync_chain.h"
 #include "event_processing_lttng_common.h"
 
 #include "event_processing_lttng_standard.h"
@@ -131,7 +132,8 @@ static void initProcessingLTTVStandard(SyncState* const syncState, LttvTracesetC
        }
 
        registerHooks(processingData->hookListList, traceSetContext,
-               &processEventLTTVStandard, syncState);
+               &processEventLTTVStandard, syncState,
+               syncState->matchingModule->canMatch);
 }
 
 
@@ -244,10 +246,21 @@ static void printProcessingStatsLTTVStandard(SyncState* const syncState)
        printf("\treceived frames: %d\n", processingData->stats->totRecv);
        printf("\treceived frames that are IP: %d\n",
                processingData->stats->totRecvIp);
-       printf("\treceived and processed packets that are TCP: %d\n",
-               processingData->stats->totInE);
-       printf("\tsent packets that are TCP: %d\n",
-               processingData->stats->totOutE);
+       if (syncState->matchingModule->canMatch[TCP])
+       {
+               printf("\treceived and processed packets that are TCP: %d\n",
+                       processingData->stats->totRecvTCP);
+       }
+       if (syncState->matchingModule->canMatch[UDP])
+       {
+               printf("\treceived and processed packets that are UDP: %d\n",
+                       processingData->stats->totRecvUDP);
+       }
+       if (syncState->matchingModule->canMatch[TCP])
+       {
+               printf("\tsent packets that are TCP: %d\n",
+                       processingData->stats->totOutE);
+       }
 
        if (syncState->matchingModule->printMatchingStats != NULL)
        {
@@ -357,8 +370,9 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData)
        LttvTraceHook* traceHook;
        LttvTracefileContext* tfc;
        LttEvent* event;
-       LttTime time;
        LttCycleCount tsc;
+       LttTime time;
+       WallTime wTime;
        LttTrace* trace;
        unsigned long traceNum;
        struct marker_info* info;
@@ -367,19 +381,21 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData)
 
        traceHook= (LttvTraceHook*) hookData;
        tfc= (LttvTracefileContext*) callData;
+       trace= tfc->t_context->t;
        syncState= (SyncState*) traceHook->hook_data;
        processingData= (ProcessingDataLTTVStandard*) syncState->processingData;
        event= ltt_tracefile_get_event(tfc->tf);
-       time= ltt_event_time(event);
-       tsc= ltt_event_cycle_count(event);
-       trace= tfc->t_context->t;
        info= marker_get_info_from_id(tfc->tf->mdata, event->event_id);
+       tsc= ltt_event_cycle_count(event);
+       time= ltt_event_time(event);
+       wTime.seconds= time.tv_sec;
+       wTime.nanosec= time.tv_nsec;
 
        g_assert(g_hash_table_lookup_extended(processingData->traceNumTable,
                        trace, NULL, (gpointer*) &traceNum));
 
        g_debug("XXXX process event: time: %ld.%09ld trace: %ld (%p) name: %s ",
-               (long) time.tv_sec, time.tv_nsec, traceNum, trace,
+               time.tv_sec, time.tv_nsec, traceNum, trace,
                g_quark_to_string(info->name));
 
        if (info->name == LTT_EVENT_DEV_XMIT_EXTENDED)
@@ -394,6 +410,11 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData)
                        return FALSE;
                }
 
+               if (!syncState->matchingModule->canMatch[TCP])
+               {
+                       return FALSE;
+               }
+
                if (syncState->stats)
                {
                        processingData->stats->totOutE++;
@@ -401,18 +422,20 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData)
 
                outE= malloc(sizeof(Event));
                outE->traceNum= traceNum;
-               outE->time= tsc;
+               outE->cpuTime= tsc;
+               outE->wallTime= wTime;
                outE->type= TCP;
+               outE->copy= &copyTCPEvent;
                outE->destroy= &destroyTCPEvent;
                outE->event.tcpEvent= malloc(sizeof(TCPEvent));
                outE->event.tcpEvent->direction= OUT;
                outE->event.tcpEvent->segmentKey= malloc(sizeof(SegmentKey));
                outE->event.tcpEvent->segmentKey->connectionKey.saddr=
-                       ltt_event_get_unsigned(event, lttv_trace_get_hook_field(traceHook,
-                                       3));
+                       htonl(ltt_event_get_unsigned(event,
+                                       lttv_trace_get_hook_field(traceHook, 3)));
                outE->event.tcpEvent->segmentKey->connectionKey.daddr=
-                       ltt_event_get_unsigned(event, lttv_trace_get_hook_field(traceHook,
-                                       4));
+                       htonl(ltt_event_get_unsigned(event,
+                                       lttv_trace_get_hook_field(traceHook, 4)));
                outE->event.tcpEvent->segmentKey->tot_len=
                        ltt_event_get_unsigned(event, lttv_trace_get_hook_field(traceHook,
                                        5));
@@ -468,8 +491,10 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData)
 
                        inE= malloc(sizeof(Event));
                        inE->traceNum= traceNum;
-                       inE->time= tsc;
+                       inE->cpuTime= tsc;
+                       inE->wallTime= wTime;
                        inE->event.tcpEvent= NULL;
+                       inE->copy= &copyEvent;
                        inE->destroy= &destroyEvent;
 
                        skb= (void*) (long) ltt_event_get_long_unsigned(event,
@@ -498,13 +523,13 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData)
                if (inE == NULL)
                {
                        // This should only happen in case of lost events
-                       g_debug("No matching pending receive event found\n");
+                       g_warning("No matching pending receive event found");
                }
                else
                {
                        if (syncState->stats)
                        {
-                               processingData->stats->totInE++;
+                               processingData->stats->totRecvTCP++;
                        }
 
                        // If it's there, remove it and proceed with a receive event
@@ -512,15 +537,16 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData)
 
                        inE->type= TCP;
                        inE->event.tcpEvent= malloc(sizeof(TCPEvent));
+                       inE->copy= &copyTCPEvent;
                        inE->destroy= &destroyTCPEvent;
                        inE->event.tcpEvent->direction= IN;
                        inE->event.tcpEvent->segmentKey= malloc(sizeof(SegmentKey));
                        inE->event.tcpEvent->segmentKey->connectionKey.saddr=
-                               ltt_event_get_unsigned(event,
-                                       lttv_trace_get_hook_field(traceHook, 1));
+                               htonl(ltt_event_get_unsigned(event,
+                                               lttv_trace_get_hook_field(traceHook, 1)));
                        inE->event.tcpEvent->segmentKey->connectionKey.daddr=
-                               ltt_event_get_unsigned(event,
-                                       lttv_trace_get_hook_field(traceHook, 2));
+                               htonl(ltt_event_get_unsigned(event,
+                                               lttv_trace_get_hook_field(traceHook, 2)));
                        inE->event.tcpEvent->segmentKey->tot_len=
                                ltt_event_get_unsigned(event,
                                        lttv_trace_get_hook_field(traceHook, 3));
@@ -557,29 +583,82 @@ static gboolean processEventLTTVStandard(void* hookData, void* callData)
 
                        syncState->matchingModule->matchEvent(syncState, inE);
 
-                       g_debug("Input event %p for skb %p done\n", inE, skb);
+                       g_debug("TCP input event %p for skb %p done\n", inE, skb);
                }
        }
-       else if (info->name == LTT_EVENT_NETWORK_IPV4_INTERFACE)
+       else if (info->name == LTT_EVENT_UDPV4_RCV_EXTENDED)
        {
-               char* name;
-               guint64 address;
-               gint64 up;
-               char addressString[17];
+               Event* inE;
+               void* skb;
 
-               address= ltt_event_get_long_unsigned(event,
-                       lttv_trace_get_hook_field(traceHook, 1));
-               up= ltt_event_get_long_int(event, lttv_trace_get_hook_field(traceHook,
-                               2));
-               /* name must be the last field to get or else copy the string, see the
-                * doc for ltt_event_get_string()
-                */
-               name= ltt_event_get_string(event, lttv_trace_get_hook_field(traceHook,
-                               0));
+               // Search pendingRecv for an event with the same skb
+               skb= (void*) (long) ltt_event_get_long_unsigned(event,
+                       lttv_trace_get_hook_field(traceHook, 0));
 
-               convertIP(addressString, address);
+               inE= (Event*)
+                       g_hash_table_lookup(processingData->pendingRecv[traceNum], skb);
+               if (inE == NULL)
+               {
+                       // This should only happen in case of lost events
+                       g_warning("No matching pending receive event found");
+               }
+               else
+               {
+                       guint64 dataStart;
+
+                       if (syncState->stats)
+                       {
+                               processingData->stats->totRecvUDP++;
+                       }
+
+                       // If it's there, remove it and proceed with a receive event
+                       g_hash_table_steal(processingData->pendingRecv[traceNum], skb);
 
-               g_debug("name \"%s\" address %s up %lld\n", name, addressString, up);
+                       inE->type= UDP;
+                       inE->event.udpEvent= malloc(sizeof(UDPEvent));
+                       inE->copy= &copyUDPEvent;
+                       inE->destroy= &destroyUDPEvent;
+                       inE->event.udpEvent->direction= IN;
+                       inE->event.udpEvent->datagramKey= malloc(sizeof(DatagramKey));
+                       inE->event.udpEvent->datagramKey->saddr=
+                               htonl(ltt_event_get_unsigned(event,
+                                       lttv_trace_get_hook_field(traceHook, 1)));
+                       inE->event.udpEvent->datagramKey->daddr=
+                               htonl(ltt_event_get_unsigned(event,
+                                       lttv_trace_get_hook_field(traceHook, 2)));
+                       inE->event.udpEvent->unicast= ltt_event_get_unsigned(event,
+                               lttv_trace_get_hook_field(traceHook, 3)) == 0 ? false : true;
+                       inE->event.udpEvent->datagramKey->ulen=
+                               ltt_event_get_unsigned(event,
+                                       lttv_trace_get_hook_field(traceHook, 4));
+                       inE->event.udpEvent->datagramKey->source=
+                               ltt_event_get_unsigned(event,
+                                       lttv_trace_get_hook_field(traceHook, 5));
+                       inE->event.udpEvent->datagramKey->dest=
+                               ltt_event_get_unsigned(event,
+                                       lttv_trace_get_hook_field(traceHook, 6));
+                       dataStart= ltt_event_get_long_unsigned(event,
+                               lttv_trace_get_hook_field(traceHook, 7));
+                       g_assert_cmpuint(sizeof(inE->event.udpEvent->datagramKey->dataKey),
+                               ==, sizeof(guint64));
+                       if (inE->event.udpEvent->datagramKey->ulen - 8 >=
+                               sizeof(inE->event.udpEvent->datagramKey->dataKey))
+                       {
+                               memcpy(inE->event.udpEvent->datagramKey->dataKey, &dataStart,
+                                       sizeof(inE->event.udpEvent->datagramKey->dataKey));
+                       }
+                       else
+                       {
+                               memset(inE->event.udpEvent->datagramKey->dataKey, 0,
+                                       sizeof(inE->event.udpEvent->datagramKey->dataKey));
+                               memcpy(inE->event.udpEvent->datagramKey->dataKey, &dataStart,
+                                               inE->event.udpEvent->datagramKey->ulen - 8);
+                       }
+
+                       syncState->matchingModule->matchEvent(syncState, inE);
+
+                       g_debug("UDP input event %p for skb %p done\n", inE, skb);
+               }
        }
        else
        {
This page took 0.026359 seconds and 4 git commands to generate.