Source code for chains.links.packet_meta

"""PacketMeta, Use DPKT to pull out packet information and convert those
   attributes to a dictionary based output."""
import datetime
import dpkt

# Local imports
from chains.links import link
from chains.utils import file_utils, data_utils

[docs]class PacketMeta(link.Link): """PacketMeta, Use DPKT to pull out packet information and convert those attributes to a dictionary based output.""" def __init__(self): """Initialize PacketMeta Class""" # Call super class init super(PacketMeta, self).__init__() # Set my output self.output_stream = self.packet_meta_data()
[docs] def packet_meta_data(self): """Pull out the metadata about each packet from the input_stream Args: None Returns: generator (dictionary): a generator that contains packet meta data in the form of a dictionary""" # For each packet in the pcap process the contents for item in self.input_stream: # Output object output = {} # Grab the fields I need timestamp = item['timestamp'] buf = item['raw_buf'] # Print out the timestamp in UTC output['timestamp'] = datetime.datetime.utcfromtimestamp(timestamp) # Unpack the Ethernet frame (mac src/dst, ethertype) eth = dpkt.ethernet.Ethernet(buf) output['eth'] = {'src': eth.src, 'dst': eth.dst, 'type':eth.type, 'len': len(eth)} # Grab packet data packet = eth.data # Packet Type ('EtherType') (IP, ARP, PPPoE, IP6... see http://en.wikipedia.org/wiki/EtherType) if hasattr(packet, 'data'): output['packet'] = {'type': packet.__class__.__name__, 'data': packet.data} else: output['packet'] = {'type': None, 'data': None} # It this an IP packet? if output['packet']['type'] == 'IP': # Pull out fragment information (flags and offset all packed into off field, so use bitmasks) df = bool(packet.off & dpkt.ip.IP_DF) mf = bool(packet.off & dpkt.ip.IP_MF) offset = packet.off & dpkt.ip.IP_OFFMASK # Pulling out src, dst, length, fragment info, TTL, checksum and Protocol output['packet'].update({'src':packet.src, 'dst':packet.dst, 'p': packet.p, 'len':packet.len, 'ttl':packet.ttl, 'df':df, 'mf': mf, 'offset': offset, 'checksum': packet.sum}) # Is this an IPv6 packet? elif output['packet']['type'] == 'IP6': # Pulling out the IP6 fields output['packet'].update({'src':packet.src, 'dst':packet.dst, 'p': packet.p, 'len':packet.plen, 'ttl':packet.hlim}) # If the packet isn't IP or IPV6 just pack it as a dictionary else: output['packet'].update(data_utils.make_dict(packet)) # For the transport layer we're going to set the transport to None. and # hopefully a 'link' upstream will manage the transport functionality output['transport'] = None # For the application layer we're going to set the application to None. and # hopefully a 'link' upstream will manage the application functionality output['application'] = None # All done yield output
[docs]def test(): """Test for PacketMeta class""" from chains.sources import packet_streamer import pprint # Create a PacketStreamer and set its output to PacketMeta input data_path = file_utils.relative_dir(__file__, '../../data/http.pcap') streamer = packet_streamer.PacketStreamer(iface_name=data_path, max_packets=50) meta = PacketMeta() meta.link(streamer) for item in meta.output_stream: pprint.pprint(item)
if __name__ == '__main__': test()