C++ – CUDA Thrust-based approach to grouping packets into TCP streams
I have tcpdump (.pcap) files containing millions of captured packets, and I need to group the network packets into TCP streams.
Example: consider the following packets, each listed as no => source_ip, destination_ip, source_port, destination_port:
1 => ip1, ip2, s1, s2
2 => ip1, ip3, s3, s4
3 => ip2, ip1, s2, s1
4 => ip3, ip1, s4, s3
In the example above, packets 1 and 3 belong to one stream and packets 2 and 4 belong to another, i.e. I need to resolve the packets into [[1,3],[2,4]].
My approach:
Since (ip1, ip2, s1, s2) and (ip2, ip1, s2, s1) indicate the same stream, I decided to hash both of them and call the results forward_hash and reverse_hash. They denote packets of the same stream flowing in opposite directions.
I use an index array to keep track of the packets during replacing and sorting. After the final sort, the start and end of each run of identical hashes are extracted. These are used against the index array to obtain the packet indices that make up each stream.
keys forward_hash of each packets, count number of packets, packet_ids id of each packet corresponding each of hash thrust::device_vector<unsigned long long> d_keys(keys,(keys+count)); thrust::device_vector<unsigned long long> d_ids(packet_ids,(packet_ids+count)); // sort ids according keys thrust::sort_by_key(d_keys.begin(), d_keys.end(), d_ids.begin()); // after sorting, need find index of each hash thrust::device_vector<unsigned long long> u_keys(count); thrust::device_vector<unsigned long long> output(count); thrust::pair<thrust::device_vector<unsigned long long>::iterator, thrust::device_vector<unsigned long long>::iterator> new_end; new_end = thrust::reduce_by_key(d_keys.begin(), d_keys.end(),thrust::make_constant_iterator(1),u_keys.begin(),output.begin()); // need find starting index each hash ....
I've tried implementing a hash-table lookup of the unique forward and reverse hashes, replacing every reverse hash with the corresponding forward hash before sorting... but the performance is quite slow. Can anyone help?
Thanks
I propose an approach that first sorts the endpoints within each packet and then sorts the packets themselves.
The example code performs the following steps:
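Step 1: In order to identify packets of the same TCP stream, we need to sort the packets. Before we can do that, we need to make sure that within each packet the source and destination endpoints are sorted, so that both directions of a connection look identical. Example:
20:1 -> 10:4
becomes 10:4 -> 20:1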
Step 2: Now we can sort the packets so that packets of the same stream are grouped together. The code assumes that the input packets are sorted by time, so we apply a stable sort in order to keep that ordering within each stream.
Step 3: We need to find out where each TCP stream starts. The result of this step is a list of indices that point to the beginning of each TCP stream in the sorted packet list.
Step 4: Depending on how you need the result, you can generate additional information about the streams, such as the number of packets per stream.
Possible improvement:
If you know that the IP addresses come from a limited range, each one might be representable using 16 bits. You could then pack sender address, sender port, receiver address and receiver port into a single 64-bit integer (4 × 16 bits) to improve sort performance.
Compile and run:
nvcc -std=c++11 sort_packets.cu -o sort_packets && ./sort_packets
Output:
input data
d_src_addr: 20 10 20 20 30 30 10 20 30 20
d_src_port: 1 2 3 1 2 2 6 1 1 1
d_dst_addr: 10 20 30 10 20 20 30 10 10 10
d_dst_port: 4 2 3 4 5 5 1 4 6 4

packets after sort_within_packet
d_src_addr: 10 10 20 10 20 20 10 10 10 10
d_src_port: 4 2 3 4 5 5 6 4 6 4
d_dst_addr: 20 20 30 20 30 30 30 20 30 20
d_dst_port: 1 2 3 1 2 2 1 1 1 1

after stable_sort
d_orig_ind: 1 0 3 7 9 6 8 2 4 5

packets after stable_sort
d_src_addr: 10 10 10 10 10 10 10 20 20 20
d_src_port: 2 4 4 4 4 6 6 3 5 5
d_dst_addr: 20 20 20 20 20 30 30 30 30 30
d_dst_port: 2 1 1 1 1 1 1 3 2 2

after copy_if
d_start_indices: 0 1 5 7 8
d_stream_lengths: 1 4 2 1 2

group of streams referencing original indices
[1] [0,3,7,9] [6,8] [2] [4,5]
sort_packets.cu
#include <stdint.h>
#include <iostream>
#include <iterator>
#include <thrust/device_vector.h>
#include <thrust/host_vector.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/sort.h>
#include <thrust/sequence.h>
#include <thrust/copy.h>
#include <thrust/transform.h>
#include <thrust/functional.h>
#include <thrust/adjacent_difference.h>
#include <thrust/scatter.h>

// Prints a container prefixed with the source expression that names it.
#define printer(name) print(#name, (name))

// Prints every element of a vector-like container (e.g. thrust::device_vector
// or thrust::host_vector) on a single tab-separated line.
//
// The template parameters are spelled V/T/Args on purpose: a template
// parameter may not be redeclared within its own scope, so naming both the
// template template parameter and the function parameter `v` is ill-formed.
template <template <typename...> class V, typename T, typename... Args>
void print(const char* name, const V<T, Args...>& v)
{
    std::cout << name << ":\t";
    thrust::copy(v.begin(), v.end(), std::ostream_iterator<T>(std::cout, "\t"));
    std::cout << std::endl;
}

// One packet: (source address, source port, destination address, destination port).
typedef thrust::tuple<uint32_t, uint16_t, uint32_t, uint16_t> packet;

// Puts the two endpoints of a packet into a canonical order so that both
// directions of a connection produce the same tuple. Afterwards
// (src_addr, src_port) is never lexicographically greater than
// (dst_addr, dst_port).
//
// The port is the tie-breaker when both endpoints share an address (e.g.
// loopback traffic). Comparing addresses alone would leave A:x -> A:y and
// A:y -> A:x in different groups.
struct sort_within_packet : public thrust::unary_function<packet, packet>
{
    __host__ __device__
    packet operator()(packet p) const
    {
        const uint32_t src_addr = thrust::get<0>(p);
        const uint16_t src_port = thrust::get<1>(p);
        const uint32_t dst_addr = thrust::get<2>(p);
        const uint16_t dst_port = thrust::get<3>(p);

        const bool needs_swap =
            src_addr > dst_addr || (src_addr == dst_addr && src_port > dst_port);
        return needs_swap ? packet(dst_addr, dst_port, src_addr, src_port) : p;
    }
};

// Stencil predicate for step 3. It receives the pair
// (packet[i-1], packet[i]) of the sorted packet list and returns true when
// the two differ, i.e. when packet[i] starts a new stream.
struct find_start_indices : public thrust::unary_function<thrust::tuple<packet, packet>, bool>
{
    __host__ __device__
    bool operator()(thrust::tuple<packet, packet> p) const
    {
        return thrust::get<0>(p) != thrust::get<1>(p);
    }
};

// Builds a zip_iterator over any number of iterators.
template <typename... Iterators>
__host__ __device__
thrust::zip_iterator<thrust::tuple<Iterators...>> zip(Iterators... its)
{
    return thrust::make_zip_iterator(thrust::make_tuple(its...));
}

int main()
{
    // in this example we have 10 packets
    const int n = 10;
    // Step 3 builds the range [1, n) and the result printing reads packet 0
    // unconditionally, so at least one packet is required.
    static_assert(n > 0, "at least one packet is required");

    // demo data
    // this example uses simple "ip addresses"
    uint32_t srcaddrarray[n] = {20, 10, 20, 20, 30, 30, 10, 20, 30, 20};
    uint16_t srcportarray[n] = {1 , 2 , 3 , 1 , 2 , 2 , 6 , 1 , 1 , 1 };
    uint32_t dstaddrarray[n] = {10, 20, 30, 10, 20, 20, 30, 10, 10, 10};
    uint16_t dstportarray[n] = {4 , 2 , 3 , 4 , 5 , 5 , 1 , 4 , 6 , 4 };

    // upload data to the GPU
    thrust::device_vector<uint32_t> d_src_addr(srcaddrarray, srcaddrarray + n);
    thrust::device_vector<uint16_t> d_src_port(srcportarray, srcportarray + n);
    thrust::device_vector<uint32_t> d_dst_addr(dstaddrarray, dstaddrarray + n);
    thrust::device_vector<uint16_t> d_dst_port(dstportarray, dstportarray + n);

    // d_orig_ind[i] == i: remembers each packet's original position across the sort
    thrust::device_vector<uint32_t> d_orig_ind(n);
    thrust::sequence(d_orig_ind.begin(), d_orig_ind.end());

    std::cout << "input data" << std::endl;
    printer(d_src_addr);
    printer(d_src_port);
    printer(d_dst_addr);
    printer(d_dst_port);
    std::cout << std::endl;

    // 1. sort within packet
    auto zip_begin = zip(d_src_addr.begin(), d_src_port.begin(), d_dst_addr.begin(), d_dst_port.begin());
    auto zip_end   = zip(d_src_addr.end(),   d_src_port.end(),   d_dst_addr.end(),   d_dst_port.end());
    thrust::transform(zip_begin, zip_end, zip_begin, sort_within_packet());

    std::cout << "packets after sort_within_packet" << std::endl;
    printer(d_src_addr);
    printer(d_src_port);
    printer(d_dst_addr);
    printer(d_dst_port);
    std::cout << std::endl;

    // 2. sort packets
    // d_orig_ind is the last tuple element and takes part in the comparison,
    // so packets of one stream end up in ascending original-index order.
    thrust::stable_sort(zip(d_src_addr.begin(), d_src_port.begin(), d_dst_addr.begin(), d_dst_port.begin(), d_orig_ind.begin()),
                        zip(d_src_addr.end(),   d_src_port.end(),   d_dst_addr.end(),   d_dst_port.end(),   d_orig_ind.end()));

    std::cout << "after stable_sort" << std::endl;
    printer(d_orig_ind);
    std::cout << std::endl;

    std::cout << "packets after stable_sort" << std::endl;
    printer(d_src_addr);
    printer(d_src_port);
    printer(d_dst_addr);
    printer(d_dst_port);
    std::cout << std::endl;

    // 3. find start indices of each stream
    // d_start_indices[0] stays 0 because packet 0 always starts a stream.
    // copy_if appends every i in [1, n) for which packet[i-1] != packet[i].
    thrust::device_vector<uint32_t> d_start_indices(n);

    using namespace thrust::placeholders;
    thrust::device_vector<uint32_t>::iterator copyend = thrust::copy_if(
        thrust::make_counting_iterator(1),
        thrust::make_counting_iterator(n),
        thrust::make_transform_iterator(
            zip(zip(d_src_addr.begin(),     d_src_port.begin(),     d_dst_addr.begin(),     d_dst_port.begin()),
                zip(d_src_addr.begin() + 1, d_src_port.begin() + 1, d_dst_addr.begin() + 1, d_dst_port.begin() + 1)),
            find_start_indices()),
        d_start_indices.begin() + 1,
        _1);

    uint32_t streamcount = copyend - d_start_indices.begin();
    d_start_indices.resize(streamcount);

    std::cout << "after copy_if" << std::endl;
    printer(d_start_indices);

    // 4. generate additional information about the result and print it formatted
    // Length of stream i is start[i+1] - start[i]; the last stream runs up to n.
    thrust::device_vector<uint32_t> d_stream_lengths(streamcount + 1);
    thrust::adjacent_difference(d_start_indices.begin(), d_start_indices.end(), d_stream_lengths.begin());
    d_stream_lengths.erase(d_stream_lengths.begin());
    d_stream_lengths.back() = n - d_start_indices.back();
    printer(d_stream_lengths);

    thrust::host_vector<uint32_t> h_start_indices = d_start_indices;
    thrust::host_vector<uint32_t> h_orig_ind = d_orig_ind;

    // Walk the remaining stream starts (h_start_indices[0] is always 0) in step
    // with the packets. Check for end() before dereferencing: once the last
    // stream has begun, there is no further start index to compare against.
    auto next_start = h_start_indices.begin() + 1;

    std::cout << std::endl << "group of streams referencing original indices" << std::endl
              << "[" << h_orig_ind[0];
    for (int i = 1; i < n; ++i) {
        if (next_start != h_start_indices.end() && i == static_cast<int>(*next_start)) {
            ++next_start;
            std::cout << "]\t[";
        } else {
            std::cout << ",";
        }
        std::cout << h_orig_ind[i];
    }
    std::cout << "]" << std::endl;

    return 0;
}
Comments
Post a Comment