/* * CORR An implementation of the FlowMate, a protocol for partitioning * flows emerging from a busy sever and sharing bottlenecks. * * Version: @(#)corr.c 1.0 08/20/03 * * Authors: Ossama Younis, * * This program is free software; you can redistribute it and/or * modify it. */ #include // for jiffies #include #include #include // for kmalloc #include #include // Constants used for debugging (0-disable, 1-enable) #define DEBUG_COEFF 0 #define DEBUG_SAMPLES 0 #define PRINT_FLOW_ID 1 #define PRINT_FLOW_DST 2 int print_option = PRINT_FLOW_DST; /*** assuming network byte order ****/ #define firstbyte(dst) (dst)&0xFF #define secondbyte(dst) (dst>>8)&0xFF #define thirdbyte(dst) (dst>>16)&0xFF #define fourthbyte(dst) (dst>>24)&0xFF /*** global variables initialization ******/ FlowCorrelator *fcorrelator; int CorrCreated = 0; // correlator created ? int corr_mode = RTT; // RTT or FORWARD_DELAY int ignore_history = 0; /******************************************/ int howToChoose = HIGHEST_CORR; unsigned long addr_mask=0xFFFF; /* show lower 1 or 2 bytes */ int mask_bits=16; void timer_expire(unsigned long data) { GroupList *gl; printk(KERN_INFO " Correlator: Timer expires (Cluster now) -------------\n"); gl = FC_buildGroups(fcorrelator); GL_listGroups(gl); // FT_listFlows(fcorrelator->flowTable); FC_updateFlowTable(fcorrelator); mod_timer(&(fcorrelator->ctimer), jiffies + fcorrelator->win_sz); } /************************************************************ * general purpose functions * to avoid linking kernel with math library /usr/lib/libm.so ************************************************************/ /* double genExpoRV(double lambda) */ /* { */ /* double temp; */ /* temp = (1.0*(rand()+1)) / (1.0*RAND_MAX+1); */ /* return -lambda*log_(temp); */ /* } */ int floor_(double x) /* returns the floor of a real number */ { int y = x; return y; } double fabs_(double x) /* returns the absolute value of a real number */ { if (x<0) x = -x; return x; } double sqrt_(double x) /* returns the square root of a real number */ { double root=0, upper, lower; int found; if (x<0) return 0; found = 0; lower = 0; upper = 100000; while (!found) { root = lower + (upper - lower)/2; if (fabs_(root*root - x) < 0.0001) found = 1; else { if (fabs_(root*root) > x) upper = root; else lower = root; } } return root; } double log_(double x) /* returns the logarithmic value of a real number */ { double sum, powerx; int i; if (x<0.000001) return 0; sum=0; powerx = (x-1)/(x+1); for (i=1; i<100000; i++) { sum += (powerx / (2*i-1)); powerx *= (((x-1)*(x-1))/((x+1)*(x+1))); } sum*=2; return sum; } double uniform(double *seed) /* returns a uniform(0, 1) value */ { double d2p31m = 2147483647.0, d2p31 = 2147483711.0; *seed = 16807*(*seed) - floor_(16807*(*seed)/d2p31m) * d2p31m; return( fabs_((*seed / d2p31)) ); } double expon(double *dseed, double xm) /* returns an exponential R.V. with mean xm */ { return( (-(xm) * log_((double)uniform(dseed))) ); } /*********************************************** * sampleset implementation ***********************************************/ void SS_init(sampleset *ss) { SS_Reset(ss); } void SS_Reset(sampleset *ss) { ss->sample_total = 0; ss->sample_square_total = 0; ss->num_samples = 0; } double SS_mean(sampleset ss) { if (ss.num_samples > 0) return ss.sample_total / (double) ss.num_samples; else return 0; } double SS_meansquare(sampleset ss) { if (ss.num_samples > 0) return ss.sample_square_total / (double) ss.num_samples; else return 0; } int SS_add(sampleset *ss, double sample) { ss->sample_total += sample; ss->sample_square_total += sample * sample; ss->num_samples++; return ss->num_samples; } double SS_variance(sampleset ss) { double x = 0; x = SS_meansquare(ss) - SS_mean(ss) * SS_mean(ss); if (x > 0) return sqrt_(x); else return 0; } double PairwiseCorrCoeff(sampleset joinset, sampleset s1, sampleset s2) { double var1 = SS_variance(s1); double var2 = SS_variance(s2); if ((int)var1 == 0 || (int)var2 ==0) return 1; // printk(KERN_INFO " jmean=%d, s1mean=%d, s2mean=%d, s1var=%d, s2var=%d\n", // (int)(SS_mean(joinset)*10),(int)(SS_mean(s1)*10),(int)(SS_mean(s2)*10), // (int)(SS_variance(s1)*10),(int)(SS_variance(s2)*10)); return (SS_mean(joinset) - SS_mean(s1) * SS_mean(s2)) / (var1*var2); } /*************************************************** * CorrelationCoeff implementation ***************************************************/ void CorrCoeff_init(CorrelationCoeff *cc, unsigned long f1, unsigned long f2) { cc->fid1=f1; cc->fid2=f2; cc->coeffVal = 0; cc->autoSeed = 1234567; CorrCoeff_Reset(cc); } void CorrCoeff_destroy(CorrelationCoeff *cc) {} void CorrCoeff_Reset(CorrelationCoeff *cc) /* reset sample sets */ { SS_Reset(&(cc->s1)); SS_Reset(&(cc->s2)); SS_Reset(&(cc->joinset)); } int CorrCoeff_add(CorrelationCoeff *cc, double sample1, double sample2) /* returns total # of samples so far after adding a sample */ { int num_samples; if (sample1 < 0 || sample2 < 0) { printk(KERN_INFO "Sample less than 0\n"); return cc->s1.num_samples; } num_samples = SS_add(&(cc->s1), sample1); SS_add(&(cc->s2), sample2); SS_add(&(cc->joinset), sample1 * sample2); return num_samples; } int NumSamples(CorrelationCoeff *cc) { return cc->s1.num_samples; } int CorrCoeff_CCReady(CorrelationCoeff *cc) /* return true if CC will return a real number */ { return ((SS_variance(cc->s1) != 0) && (SS_variance(cc->s2) != 0) && (cc->s1.num_samples > 1)); } double CorrCoeff_CC(CorrelationCoeff *cc) /* returns the correlation coefficient */ { if (CorrCoeff_CCReady(cc) == 0) cc->coeffVal = 0; else cc->coeffVal = PairwiseCorrCoeff(cc->joinset, cc->s1, cc->s2); return cc->coeffVal; } /********************************************************** * CorrSet implementation *********************************************************/ void CorrSet_init(CorrSet *cs) { cs->crossCoeff = cs->autoCoeff = NULL; cs->sumDiff = cs->nSamples = 0; cs->next = NULL; } void CorrSet_destroy(CorrSet *cs) { if (cs->crossCoeff) { CorrCoeff_destroy(cs->crossCoeff); kfree(cs->crossCoeff); } if (cs->autoCoeff) { CorrCoeff_destroy(cs->autoCoeff); kfree(cs->autoCoeff); } } /************************************************************ * CorrelationCoeffList implementation ************************************************************/ void CCList_init(CCList * ccl) { ccl->head = ccl->tail = NULL; ccl->nItems = 0; } void CCList_add(CCList * ccl, CorrSet *cor) { if (ccl->head == NULL) { ccl->head = ccl->tail = cor; ccl->head->next = NULL; } else { ccl->tail->next = cor; cor->next = NULL; ccl->tail = cor; } ccl->nItems++; } CorrSet * CCList_getHead(CCList * ccl) { return ccl->head; } int CCList_getNumItems(CCList * ccl) { return ccl->nItems; } CorrSet *CCList_find(CCList * ccl, unsigned long f1, unsigned long f2) { CorrSet *cs = ccl->head; while (cs) { if (cs->crossCoeff->fid1==f1 && cs->crossCoeff->fid2==f2) return cs; else cs = cs->next; } return NULL; } void CCList_destroy(CCList * ccl) { CorrSet *cs; while (ccl->head) { cs = ccl->head; ccl->head = ccl->head->next; CorrSet_destroy(cs); kfree(cs); } } /************************************************** * GroupNode implementation **************************************************/ void GN_init(GroupNode *gn, int g, unsigned long f, unsigned long d) /* f is the group representative and d is its destination address */ { gn->gid = g; gn->nFlows = 1; gn->next = NULL; gn->flowsList = (unsigned long *)kmalloc(sizeof(unsigned long), GFP_ATOMIC); if (gn->flowsList) gn->flowsList[0] = f; gn->rep_fid = f; gn->rep_dst = d; } void GN_addFlow(GroupNode *gn, unsigned long fid) { unsigned long *ptr = (unsigned long *)kmalloc((gn->nFlows+1)*sizeof(int), GFP_ATOMIC); if (ptr == NULL) { printk(KERN_INFO "\n Memory reallocation problem: could not add flow \n"); return; } memcpy(ptr, gn->flowsList, (gn->nFlows)*(sizeof(int))); ptr[gn->nFlows] = fid; kfree(gn->flowsList); gn->flowsList = ptr; gn->nFlows++; } void GN_removeFlow(GroupNode *gn, unsigned long fid) { int i = 0, found = FALSE; while (inFlows && !found) { if (gn->flowsList[i] == fid) found = TRUE; else i++; } if (found == FALSE) { printk(KERN_INFO "\n GroupNode::removeFlow ... can not find flow %ld\n", fid); return; } // shift flows list up while (inFlows-1) { gn->flowsList[i] = gn->flowsList[i+1]; i++; } gn->nFlows--; } int GN_isMember(GroupNode *gn, unsigned long fid) /* checks if fid is a group member */ { int i; if (gn->nFlows==0) return FALSE; i=0; while (i < gn->nFlows) if (gn->flowsList[i] == fid) return TRUE; else i++; return FALSE; } void GN_destroy(GroupNode *gn) { kfree(gn->flowsList); } /************************************************** * GroupList implementation **************************************************/ void GL_init(GroupList *gl) { gl->head = NULL; gl->nGroups = gl->max_gid = 0; } void GL_addGroup(GroupList *gl, GroupNode *gn) { GroupNode *temp; if (gl->head==NULL) { gl->head = gn; } else { temp = gl->head; while (temp->next != NULL) temp = temp->next; temp->next = gn; } gn->next = NULL; gl->nGroups++; } void GL_deleteAllGroups(GroupList *gl) { GroupNode *temp; while (gl->head) { temp = gl->head; gl->head = gl->head->next; GN_destroy(temp); kfree(temp); } gl->nGroups = 0; } void GL_destroy(GroupList *gl) { GL_deleteAllGroups(gl); } GroupNode* GN_findGroup(GroupList *gl, unsigned long fid) /* find the group containing flow fid */ { GroupNode *temp = gl->head; while (temp) { if (GN_isMember(temp, fid)) return temp; else temp = temp->next; } return NULL; } int GL_new_gid(GroupList *gl) { return gl->max_gid++; } GroupNode * GL_joinGroups(GroupList *gl, GroupNode *g1, GroupNode*g2) { int i, g1Deleted, numFlows; unsigned long *list1, *list2; GroupNode *joined, *g, *prev; if (!g1 || !g2) return NULL; // first, prepare the new group node joined = (GroupNode *) kmalloc(sizeof(GroupNode), GFP_ATOMIC); GN_init(joined, g1->gid, g1->rep_fid, g1->rep_dst); numFlows = g1->nFlows + g2->nFlows; list1 = g1->flowsList; list2 = g2->flowsList; for (i=1; inFlows; i++) GN_addFlow(joined, list1[i]); for (i=0; inFlows; i++) GN_addFlow(joined, list2[i]); // now update the list g = prev = gl->head; g1Deleted = FALSE; while (g) { if (g1Deleted == FALSE) { if (g->gid == g1->gid) { if (g->gid == gl->head->gid) { joined->next = gl->head->next; gl->head = prev = joined; g = g->next; GN_destroy(g1); kfree(g1); g1Deleted = TRUE; } else { prev->next = joined; prev = joined; joined->next = g->next; g = g->next; GN_destroy(g1); kfree(g1); g1Deleted = TRUE; } } else { prev = g; g = g->next; } } else if (g->gid == g2->gid) { prev->next = g->next; GN_destroy(g2); kfree(g2); break; } else { prev = g; g = g->next; } } gl->nGroups--; return joined; } void GL_listGroups(GroupList *gl) /* lists groups and their member flows */ { int i, num_flows; GroupNode *gn; char *str, cc[50]; FlowInfo *fi; if (!gl) return; gn = gl->head; num_flows = 0; while (gn) { num_flows += gn->nFlows; gn = gn->next; } if (print_option == PRINT_FLOW_DST) str = (char *) kmalloc(sizeof(unsigned long) * num_flows * 4 + 100, GFP_ATOMIC); else str = (char *) kmalloc(sizeof(unsigned long) * num_flows * 3 + 100, GFP_ATOMIC); gn = gl->head; strcpy(str, " Resulting groups : "); while (gn) { strcat(str, " { "); for (i=0; inFlows; i++) { if (print_option == PRINT_FLOW_DST) { fi = FT_findFlow(fcorrelator->flowTable, gn->flowsList[i], 0, 0); sprintf(cc, "%ld.%ld, ", thirdbyte(fi->dst), fourthbyte(fi->dst)); } else sprintf(cc, "%ld, ", (unsigned long)gn->flowsList[i]); strcat(str, cc); } strcat(str, "} "); gn = gn->next; } printk(KERN_INFO "%s\n", str); kfree(str); } /************************************************** * SampleInfo implementation **************************************************/ void SI_init(SampleInfo *si, unsigned long f, unsigned long r, unsigned long t, unsigned long rt) { si->fid = f; si->recv_time = r; si->send_time = t; si->rtt = rt; si->cancelled = 0; } /*************************************************** * SampleInfoList implementation **************************************************/ void SIL_init(SampleInfoList *sil, unsigned long f) { sil->n_samples = 0; sil->fid = f; } void SIL_deleteAllItems(SampleInfoList *sil) { sil->n_samples = 0; } void SIL_append(SampleInfoList *sil, SampleInfo si) /* add a sample at the end of the list */ { if (si.recv_time < si.send_time || sil->n_samples >= MAX_NUM_SAMPLES) return; if (sil->n_samples > 0) { if (sil->sampleInfo[sil->n_samples-1].send_time > si.send_time) return; } sil->sampleInfo[sil->n_samples].fid = si.fid; sil->sampleInfo[sil->n_samples].send_time = si.send_time; sil->sampleInfo[sil->n_samples].recv_time = si.recv_time; sil->sampleInfo[sil->n_samples].rtt = si.rtt; sil->sampleInfo[sil->n_samples].cancelled = FALSE; sil->n_samples++; } void SIL_enableAllSamples(SampleInfoList *sil) /* reset "cancelled" flag of all samples */ { int i; for (i=0; in_samples; i++) sil->sampleInfo[i].cancelled = 0; } /***************************************************** * FlowInfo implementation *****************************************************/ int FI_init(FlowInfo *fi, unsigned long f, unsigned long d) { fi->fid = f; fi->dst = d; fi->gid = -1; fi->lastSampleSendTime = 0; fi->next = NULL; fi->sampleList = (SampleInfoList *) kmalloc(sizeof(SampleInfoList), GFP_ATOMIC); if (fi->sampleList == NULL) return 0; SIL_init(fi->sampleList, f); fi->ccl = (CCList *) kmalloc(sizeof(CCList), GFP_ATOMIC); if (fi->ccl == NULL) { kfree(fi->sampleList); return 0; } CCList_init(fi->ccl); return 1; } void FI_destroy(FlowInfo *fi) { if (fi->ccl) { CCList_destroy(fi->ccl); kfree(fi->ccl); fi->ccl = NULL; } if (fi->sampleList) { kfree(fi->sampleList); fi->sampleList = NULL; } } void FI_addSampleInfo(FlowInfo *fi, SampleInfo sampleInfo) { // if last send time > new time, ignore this packet arrival if (fi->lastSampleSendTime < sampleInfo.send_time && sampleInfo.send_time < sampleInfo.recv_time) { if (DEBUG_SAMPLES) printk(KERN_INFO "dst=%ld.%ld,send=%ld,recv=%ld,diff=%ld,rtt=%ld, nsamples=%d\n", thirdbyte(fi->dst), fourthbyte(fi->dst), sampleInfo.send_time, sampleInfo.recv_time, sampleInfo.recv_time-sampleInfo.send_time, sampleInfo.rtt,fi->sampleList->n_samples+1); fi->lastSampleSendTime = sampleInfo.send_time; SIL_append(fi->sampleList, sampleInfo); } } /************************************************* * FlowTable implementation *************************************************/ void FT_init(FlowTable *ft) { int i; for (i=0; ihead[i] = NULL; ft->numFlows = 0; } FlowInfo *FT_addFlow(FlowTable *ft, unsigned long fid, unsigned long dst) { unsigned long index; FlowInfo *fi, *p; int success; index = (dst>>(32-N_HASH_BITS)) & (HASH_TABLE_SIZE - 1); fi = (FlowInfo *) kmalloc(sizeof(FlowInfo), GFP_ATOMIC); if (fi) success = FI_init(fi, fid, dst); else return NULL; if (!success) { kfree(fi); return NULL; } if (ft->head[index]==NULL) { ft->head[index] = fi; } else { p = ft->head[index]; while (p->next!=NULL) p = p->next; p->next = fi; } ft->numFlows++; return fi; } void FT_removeFlow(FlowTable *ft, unsigned long fid, unsigned long dst) { unsigned long index; FlowInfo *p, *q; index = (dst>>(32-N_HASH_BITS)) & (HASH_TABLE_SIZE - 1); if (ft->head[index] != NULL) { p = ft->head[index]; if (p->fid == fid) { ft->head[index] = p->next; FI_destroy(p); kfree(p); ft->numFlows--; p = NULL; } else { q = p->next; while (q) { if (q->fid==fid) { p->next = q->next; FI_destroy(q); kfree(q); ft->numFlows--; q = NULL; return; } else { p = q; q = p->next; } } } } } FlowInfo *FT_findFlow(FlowTable *ft, unsigned long fid, unsigned long dst, int is_dst_provided) { unsigned long index; int i; FlowInfo *p; if (ft->numFlows == 0) return NULL; if (is_dst_provided == 0) { // destination not provided for (i=0; ihead[i]; while (p) { if (p->fid == fid) { return p; } else { p = p->next; } } } return NULL; } index = (dst>>(32-N_HASH_BITS)) & (HASH_TABLE_SIZE - 1); p = ft->head[index]; while (p) { if (p->fid == fid) return p; p = p->next; } return NULL; } int FT_sameGroup(FlowTable *ft, unsigned long fid1, unsigned long fid2) /* check if fid1 and fid2 belong to the same group */ { FlowInfo *flow1, *flow2; flow1 = FT_findFlow(ft, fid1, 0, 0); flow2 = FT_findFlow(ft, fid2, 0, 0); if (flow1==NULL || flow2==NULL) return FALSE; return ((flow1->gid >= 0) && (flow2->gid >= 0) && (flow1->gid == flow2->gid)); } void FT_listFlows(FlowTable *ft) /* print a list of all flows in the table */ { int i; FlowInfo *fi; char *str, cc[50]; str = (char *) kmalloc(sizeof(unsigned long)*ft->numFlows*4, GFP_ATOMIC); strcpy(str, " Flows & destinations : "); for (i=0; ihead[i]) { fi = ft->head[i]; while (fi) { if (fi->sampleList->n_samples >= MIN_THRESH) { sprintf(cc, "(%ld, %ld.%ld) ", fi->fid, thirdbyte(fi->dst), fourthbyte(fi->dst)); strcat(str, cc); } fi = fi->next; } } } printk(KERN_INFO "%s\n", str); kfree(str); } void FT_destroy(FlowTable *ft) { FlowInfo *fi; int i; for (i=0; ihead[i]) { fi = ft->head[i]; ft->head[i] = ft->head[i]->next; FI_destroy(fi); kfree(fi); fi = NULL; } } } void FT_updateTable(FlowTable *ft) /* remove all samples fom the current sample lists */ { FlowInfo *fi; int i; for (i=0; ihead[i]) { fi = ft->head[i]; while (fi) { SIL_deleteAllItems(fi->sampleList); fi = fi->next; } } // if head } // for } GroupList *FT_correlateFlows(FlowTable *ft) { int i, correlated, ngroups; double highest_corr; FlowInfo *fi=NULL, *groupRep, *rem_flow; GroupNode *gn, *chosen_gn, *prevChoice, *gnode; CorrSet *cs=NULL; GroupList *newGroups= NULL; ngroups = 0; for (i=0; ihead[i]; while (fi) { if (fi->sampleList->n_samples == 0) { rem_flow = fi; fi = fi->next; FT_removeFlow(ft, rem_flow->fid, rem_flow->dst); continue; } else if (fi->sampleList->n_samples < MIN_THRESH) { fi = fi->next; continue; } if (ngroups == 0) { ngroups++; newGroups = (GroupList *) kmalloc(sizeof(GroupList), GFP_ATOMIC); GL_init(newGroups); gn = (GroupNode *) kmalloc(sizeof(GroupNode), GFP_ATOMIC); GN_init(gn, GL_new_gid(newGroups), fi->fid, fi->dst); GL_addGroup(newGroups, gn); } else { gn = newGroups->head; if (gn==NULL) { printk(KERN_INFO "\n CorrelateFlows problem: Could not allocate memory\n"); return NULL; } correlated = FALSE; highest_corr = -100; chosen_gn = NULL; prevChoice = NULL; while (gn) { groupRep = FT_findFlow(ft, gn->rep_fid, gn->rep_dst, 1); if (groupRep == NULL) { printk(KERN_INFO "\n CorrelateFlows problem: Flow not found \n"); return NULL; } // perform correlation test of the two flows cs = NULL; correlated = FT_correlateTwoFlows(ft, groupRep, fi, &cs); if (cs==NULL) return NULL; if (DEBUG_COEFF) { printk(KERN_INFO " Corr : %ld.%ld(%d) and %ld.%ld(%d)\n", /*(groupRep->dst>>mask_bits)&addr_mask*/thirdbyte(groupRep->dst), fourthbyte(groupRep->dst), groupRep->sampleList->n_samples, /*(fi->dst>>mask_bits)&addr_mask*/thirdbyte(fi->dst), fourthbyte(fi->dst), fi->sampleList->n_samples); printk(KERN_INFO " cross= %d(%d), auto= %d(%d), corr = %s !!! \n", (int)(cs->crossCoeff->coeffVal*1000), NumSamples(cs->crossCoeff), (int)(cs->autoCoeff->coeffVal*1000), NumSamples(cs->autoCoeff), (correlated)? "yes":"no"); } if (correlated && howToChoose==HIGHEST_CORR) { if (fabs_(cs->crossCoeff->coeffVal) > highest_corr) { highest_corr = fabs_(cs->crossCoeff->coeffVal); chosen_gn = gn; } } else if (correlated && howToChoose==JOIN_METHOD) { if (!prevChoice) prevChoice = chosen_gn = gn; else { // Before merging the two groups // make sure the the rep. flows have non-negative cross coeff. // otherwise, use HIGHEST_COEFF approach CorrSet *tempCS; FlowInfo *repFlow1 = FT_findFlow(ft, prevChoice->rep_fid, prevChoice->rep_dst, 1); if (groupRep->fid < repFlow1->fid) tempCS = CCList_find(groupRep->ccl, groupRep->fid, repFlow1->fid); else tempCS = CCList_find(repFlow1->ccl, repFlow1->fid, groupRep->fid); if (tempCS->crossCoeff->coeffVal > 0) { chosen_gn = GL_joinGroups(newGroups, prevChoice, gn); highest_corr = fabs_(tempCS->crossCoeff->coeffVal); prevChoice = chosen_gn; } else { if (fabs_(cs->crossCoeff->coeffVal) > highest_corr) { highest_corr = fabs_(cs->crossCoeff->coeffVal); chosen_gn = gn; } } } } gn = gn->next; } // while gn if (chosen_gn) { GN_addFlow(chosen_gn, fi->fid); fi->gid = chosen_gn->gid; } else { gnode = (GroupNode *) kmalloc(sizeof(GroupNode), GFP_ATOMIC); GN_init(gnode, GL_new_gid(newGroups), fi->fid, fi->dst); GL_addGroup(newGroups, gnode); ngroups++; fi->gid = gnode->gid; if (howToChoose == HIGHEST_CORR) FT_regroup(ft, newGroups, gnode); } } fi = fi->next; } } return newGroups; } void FT_regroup(FlowTable *ft, GroupList *gl, GroupNode *newGN) /* check already grouped flows with a new created group representative */ { int i, correlated; FlowInfo *repFlow, *otherFlow, *orig_rep; GroupNode *gn=NULL; unsigned long *flows; CorrSet *oldCS, *cs; repFlow = FT_findFlow(ft, newGN->rep_fid, newGN->rep_dst, 1); gn = gl->head; while (gn) { if (gn->gid == newGN->gid) break; flows = gn->flowsList; for (i=1; inFlows; i++) { otherFlow = FT_findFlow(ft, flows[i], 0, 0); correlated = FALSE; cs = NULL; correlated = FT_correlateTwoFlows(ft, repFlow, otherFlow, &cs); if (cs==NULL) return; if (DEBUG_COEFF) { printk(KERN_INFO " Corr : %ld.%ld(%d) and %ld.%ld(%d)\n", /*(repFlow->dst>>mask_bits)&addr_mask*/thirdbyte(repFlow->dst), fourthbyte(repFlow->dst), repFlow->sampleList->n_samples, /*(otherFlow->dst>>mask_bits)&addr_mask*/thirdbyte(otherFlow->dst), fourthbyte(otherFlow->dst), otherFlow->sampleList->n_samples); printk(KERN_INFO " cross= %d(%d), auto= %d(%d), corr = %s --- \n", (int)(cs->crossCoeff->coeffVal*1000), NumSamples(cs->crossCoeff), (int)(cs->autoCoeff->coeffVal*1000), NumSamples(cs->autoCoeff), (correlated)? "yes":"no"); } if (correlated) { // compare with current correlation coefficient of the flow in its group orig_rep = FT_findFlow(ft, gn->rep_fid, gn->rep_dst, 1); oldCS = NULL; if (orig_rep->fid < otherFlow->fid) oldCS = CCList_find(orig_rep->ccl, orig_rep->fid, otherFlow->fid); else oldCS = CCList_find(otherFlow->ccl, otherFlow->fid, orig_rep->fid); if(oldCS == NULL) return; if (fabs_(cs->crossCoeff->coeffVal)>fabs_(oldCS->crossCoeff->coeffVal)) { // move that flow to the new group GN_removeFlow(gn, otherFlow->fid); GN_addFlow(newGN, otherFlow->fid); otherFlow->gid = newGN->gid; i--; // adjust loop index to compensate for the removed flow } } } // for i gn = gn->next; } // while gn } double FT_getAvgDist(FlowTable *ft, CorrSet *cs, SampleInfoList *si1, SampleInfoList *si2) /* get the average distance between samples (required for the correlation test) */ { unsigned long newSendTime1=0, newSendTime2=0, sendTime1=0, sendTime2=0; int index1, index2, go_f1, go_f2, needSample1, needSample2; index1 = index2 = 0; needSample1 = needSample2 = TRUE; go_f1 = go_f2 = FALSE; if (si1->n_samples == 0 || si2->n_samples == 0) return 0; while (index1 < si1->n_samples && index2 < si2->n_samples) { if (needSample1) { newSendTime1 = si1->sampleInfo[index1].send_time; index1++; needSample1 = FALSE; } if (needSample2) { newSendTime2 = si2->sampleInfo[index2].send_time; index2++; needSample2 = FALSE; } if (!go_f1 && !go_f2) { sendTime1 = newSendTime1; sendTime2 = newSendTime2; if ((sendTime1 < sendTime2)) { needSample1 = TRUE; go_f1 = TRUE; } else { needSample2 = TRUE; go_f2 = TRUE; } } else if (go_f1) { if (newSendTime1 <= sendTime2) needSample1 = TRUE; else { sendTime1 = newSendTime1; cs->sumDiff += fabs_((double)sendTime1 - (double)sendTime2); cs->nSamples++; go_f1 = go_f2 = FALSE; needSample2 = needSample2 = TRUE; } } else if (go_f2) { if ((newSendTime2 <= sendTime1)) needSample2 = TRUE; else { sendTime2 = newSendTime2; cs->sumDiff += fabs_((double)sendTime1 - (double)sendTime2); cs->nSamples++; go_f1 = go_f2 = FALSE; needSample1 = needSample2 = TRUE; } } } if (cs->nSamples > 0) return (cs->sumDiff/cs->nSamples); return 0; } void FT_CrossCorr(FlowTable *ft, SampleInfoList *si1, SampleInfoList *si2, CorrelationCoeff *cc) { unsigned long sendTime1=0, sendTime2=0, newSendTime1=0, newSendTime2=0; double newsample1=0, newsample2=0, sample1=0, sample2=0; int index1, index2; int go_f1=FALSE, go_f2=FALSE, needSample1=TRUE, needSample2=TRUE; index1 = index2 = 0; if (si1->n_samples == 0 || si2->n_samples == 0) return; while (index1 < si1->n_samples && index2 < si2->n_samples) { if (needSample1) { if (si1->sampleInfo[index1].cancelled == 1) { index1++; continue; } newSendTime1 = si1->sampleInfo[index1].send_time; if (corr_mode == FORWARD_DELAY) newsample1 = (si1->sampleInfo[index1].recv_time - si1->sampleInfo[index1].send_time)%100000; else newsample1 = si1->sampleInfo[index1].rtt; index1++; needSample1 = FALSE; } if (needSample2) { if (si2->sampleInfo[index2].cancelled == 1) { index2++; continue; } newSendTime2 = si2->sampleInfo[index2].send_time; if (corr_mode == FORWARD_DELAY) newsample2 = (si2->sampleInfo[index2].recv_time-si2->sampleInfo[index2].send_time)%100000; else newsample2 = si2->sampleInfo[index2].rtt; index2++; needSample2 = FALSE; } if (!go_f1 && !go_f2) { sample1 = newsample1; sendTime1 = newSendTime1; sample2 = newsample2; sendTime2 = newSendTime2; if ((sendTime1 < sendTime2)) { needSample1 = TRUE; go_f1 = TRUE; } else { needSample2 = TRUE; go_f2 = TRUE; } } else { if (go_f1) { if (newSendTime1 <= sendTime2) needSample1 = TRUE; else { sample1 = newsample1; sendTime1 = newSendTime1; CorrCoeff_add(cc, sample1, sample2); go_f1 = go_f2 = FALSE; needSample2 = needSample2 = TRUE; } } else if (go_f2) { if ((newSendTime2 <= sendTime1)) { needSample2 = TRUE; } else { sample2 = newsample2; sendTime2 = newSendTime2; CorrCoeff_add(cc, sample1, sample2); go_f1 = go_f2 = FALSE; needSample1 = needSample2 = TRUE; } } } } CorrCoeff_CC(cc); } void FT_AutoCorr(FlowTable *ft, SampleInfoList *sil, CorrelationCoeff *cc, double avg_t) { unsigned long sendTime1=0, sendTime2=0, sendTime; double sample, sample1=0, sample2=0; int index, needSample1, needSample2; double rval; index = 0; needSample1 = needSample2 = TRUE; while (index < sil->n_samples) { rval = uniform(&(cc->autoSeed)); sendTime=sil->sampleInfo[index].send_time; if (corr_mode == FORWARD_DELAY) sample = (sil->sampleInfo[index].recv_time-sil->sampleInfo[index].send_time)%100000; else sample = sil->sampleInfo[index].rtt; if (rval < 0.5) { if (needSample1) { sendTime1 = sendTime; sample1 = sample; needSample1 = FALSE; if (!needSample2) { if (fabs_((double)sendTime1 - (double)sendTime2) > avg_t) { CorrCoeff_add(cc, sample1, sample2); needSample1 = needSample2 = TRUE; } else { sil->sampleInfo[index].cancelled = 1; needSample1 = TRUE; } } } } else { // rval >= 0.5 if (needSample2) { sendTime2 = sendTime; sample2 = sample; needSample2 = FALSE; if (!needSample1) { if (fabs_((double)sendTime1 - (double)sendTime2) > avg_t) { CorrCoeff_add(cc, sample1, sample2); needSample1 = needSample2 = TRUE; } else { sil->sampleInfo[index].cancelled = 1; needSample2 = TRUE; } } } } index++; } CorrCoeff_CC(cc); } int FT_correlateTwoFlows(FlowTable *ft, FlowInfo *f1, FlowInfo *f2, CorrSet **cs) /* perform the correlation test for f1 and f2 */ { int state = 0; double avg_t = 0; SIL_enableAllSamples(f1->sampleList); SIL_enableAllSamples(f2->sampleList); if (f1->fid < f2->fid) (*cs) = CCList_find(f1->ccl, f1->fid, f2->fid); else (*cs) = CCList_find(f2->ccl, f2->fid, f1->fid); if (ignore_history && (*cs)) { if (f1->fid < f2->fid) { CorrCoeff_init((*cs)->crossCoeff, f1->fid, f2->fid); CorrCoeff_init((*cs)->autoCoeff, f2->fid, f2->fid); } else { CorrCoeff_init((*cs)->crossCoeff, f2->fid, f1->fid); CorrCoeff_init((*cs)->autoCoeff, f1->fid, f1->fid); } } if (!(*cs)) { (*cs) = (CorrSet *) kmalloc(sizeof(CorrSet), GFP_ATOMIC); CorrSet_init(*cs); if (f1->fid < f2->fid) { (*cs)->crossCoeff = (CorrelationCoeff *) kmalloc(sizeof(CorrelationCoeff), GFP_ATOMIC); CorrCoeff_init((*cs)->crossCoeff, f1->fid, f2->fid); (*cs)->autoCoeff = (CorrelationCoeff *) kmalloc(sizeof(CorrelationCoeff), GFP_ATOMIC); CorrCoeff_init((*cs)->autoCoeff, f2->fid, f2->fid); CCList_add(f1->ccl, (*cs)); } else { (*cs)->crossCoeff = (CorrelationCoeff *) kmalloc(sizeof(CorrelationCoeff), GFP_ATOMIC); CorrCoeff_init((*cs)->crossCoeff, f2->fid, f1->fid); (*cs)->autoCoeff = (CorrelationCoeff *) kmalloc(sizeof(CorrelationCoeff), GFP_ATOMIC); CorrCoeff_init((*cs)->autoCoeff, f1->fid, f1->fid); CCList_add(f2->ccl, (*cs)); } } if (f1->fid < f2->fid) { avg_t = FT_getAvgDist(ft, *cs, f1->sampleList, f2->sampleList); FT_AutoCorr(ft, f2->sampleList, (*cs)->autoCoeff, avg_t); FT_CrossCorr(ft, f1->sampleList, f2->sampleList, (*cs)->crossCoeff); } else { avg_t = FT_getAvgDist(ft, *cs, f2->sampleList, f1->sampleList); FT_AutoCorr(ft, f1->sampleList, (*cs)->autoCoeff, avg_t); FT_CrossCorr(ft, f2->sampleList, f1->sampleList, (*cs)->crossCoeff); } //state = ((*cs)->crossCoeff->coeffVal > 0); state = (fabs_((*cs)->crossCoeff->coeffVal) > fabs_((*cs)->autoCoeff->coeffVal)); //state &= (fabs_((*cs)->crossCoeff->coeffVal) >= 0.2); return state; } /****************************************************** * FlowCorrelator implementation ******************************************************/ void FC_init(int win) { int ret; fcorrelator = (FlowCorrelator *)kmalloc(sizeof(FlowCorrelator), GFP_ATOMIC); if (!fcorrelator) { printk(KERN_INFO "FC_init: Could not allocate memory to correlator..\n"); CorrCreated = 0; return; } fcorrelator->win_sz = win; fcorrelator->groups = (GroupList *) kmalloc(sizeof(GroupList), GFP_ATOMIC); if (!fcorrelator->groups) { kfree(fcorrelator); fcorrelator = NULL; CorrCreated = 0; return; } GL_init(fcorrelator->groups); fcorrelator->flowTable = (FlowTable *)kmalloc(sizeof(FlowTable), GFP_ATOMIC); if (!fcorrelator->flowTable) { kfree(fcorrelator->groups); kfree(fcorrelator); fcorrelator = NULL; CorrCreated = 0; return; } FT_init(fcorrelator->flowTable); /***** timer initialization *****/ init_timer(&fcorrelator->ctimer); fcorrelator->ctimer.function = &timer_expire; fcorrelator->ctimer.data = (unsigned long) fcorrelator; ret = mod_timer(&(fcorrelator->ctimer), jiffies + fcorrelator->win_sz); CorrCreated = 1; } void FC_destroy(FlowCorrelator *fc) { if (fc->groups) { GL_destroy(fc->groups); kfree(fc->groups); } if (fc->flowTable) { FT_destroy(fc->flowTable); kfree(fc->flowTable); } } void FC_gatherSamples(FlowCorrelator *fc, unsigned long fid, unsigned long dst, unsigned long recv_time, unsigned long send_time, unsigned long rtt) { FlowInfo *fi; SampleInfo p; fi = FT_findFlow(fc->flowTable, fid, dst, 1); // if (fi) // printk(KERN_INFO " Sample: dst=%d.%d, send=%d, rtt=%d\n", // /*(dst>>mask_bits)&addr_mask*/thirdbyte(dst), fourthbyte(dst), // send_time, rtt); SI_init(&p, fid, recv_time, send_time, rtt); if (fi==NULL) fi = FT_addFlow(fc->flowTable, fid, dst); if (fi) FI_addSampleInfo(fi, p); } void FC_addFlow(FlowCorrelator *fc, unsigned long fid, unsigned long dst) { FT_addFlow(fc->flowTable, fid, dst); } void FC_removeFlow(FlowCorrelator *fc, unsigned long fid, unsigned long dst) { FT_removeFlow(fc->flowTable, fid, dst); } GroupList *FC_buildGroups(FlowCorrelator *fc) { if (fc->groups!=NULL) { GL_destroy(fc->groups); kfree(fc->groups); fc->groups = NULL; } fc->groups = FT_correlateFlows(fc->flowTable); return fc->groups; } void FC_updateFlowTable(FlowCorrelator *fc) { FT_updateTable(fc->flowTable); }