Source code for chains.links.transport_meta

"""TransportMeta: Pull out transport meta data from incoming packet data"""
import dpkt

# Local imports
from chains.links import link
from chains.utils import file_utils, log_utils, data_utils


[docs]class TransportMeta(link.Link): """Pull out transport meta data from incoming packet data""" def __init__(self): """Initialize TransportMeta Class""" # Call super class init super(TransportMeta, self).__init__() # Set my output self.output_stream = self.transport_meta_data()
[docs] def transport_meta_data(self): """Pull out the transport metadata for each packet in the input_stream""" # For each packet in the pcap process the contents for item in self.input_stream: # Get the transport data and type trans_data = item['packet']['data'] trans_type = self._get_transport_type(trans_data) if trans_type and trans_data: item['transport'] = data_utils.make_dict(trans_data) item['transport']['type'] = trans_type item['transport']['flags'] = self._readable_flags(item['transport']) item['transport']['data'] = trans_data['data'] # All done yield item
@staticmethod def _get_transport_type(transport): """Give the transport as a string or None if not one""" return transport.__class__.__name__ if transport.__class__.__name__ != 'str' else None @staticmethod def _readable_flags(transport): """Method that turns bit flags into a human readable list Args: transport (dict): transport info, specifically needs a 'flags' key with bit_flags Returns: list: a list of human readable flags (e.g. ['syn_ack', 'fin', 'rst', ...] """ if 'flags' not in transport: return None _flag_list = [] flags = transport['flags'] if flags & dpkt.tcp.TH_SYN: if flags & dpkt.tcp.TH_ACK: _flag_list.append('syn_ack') else: _flag_list.append('syn') elif flags & dpkt.tcp.TH_FIN: if flags & dpkt.tcp.TH_ACK: _flag_list.append('fin_ack') else: _flag_list.append('fin') elif flags & dpkt.tcp.TH_RST: _flag_list.append('rst') elif flags & dpkt.tcp.TH_PUSH: _flag_list.append('psh') return _flag_list
[docs]def test(): """Test for TransportMeta class""" import pprint # Local imports from chains.sources import packet_streamer from chains.links import packet_meta, reverse_dns # Create a PacketStreamer, a PacketMeta, and link them to TransportMeta data_path = file_utils.relative_dir(__file__, '../../data/http.pcap') streamer = packet_streamer.PacketStreamer(iface_name=data_path, max_packets=50) meta = packet_meta.PacketMeta() rdns = reverse_dns.ReverseDNS() tmeta = TransportMeta() # Set up chain meta.link(streamer) rdns.link(meta) tmeta.link(rdns) # Print out the tags for item in tmeta.output_stream: pprint.pprint(item)
if __name__ == '__main__': test()