# pydhcplib # Copyright (C) 2008 Mathieu Ignacio -- mignacio@april.org # # This file is part of pydhcplib. # Pydhcplib is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import operator from struct import unpack from struct import pack from dhcp_basic_packet import * from dhcp_constants import * from type_ipv4 import ipv4 from type_strlist import strlist from type_hwmac import hwmac import sys class DhcpPacket(DhcpBasicPacket): def str(self): # Process headers : printable_data = "# Header fields\n" op = self.packet_data[DhcpFields['op'][0]:DhcpFields['op'][0]+DhcpFields['op'][1]] printable_data += "op : " + DhcpFieldsName['op'][str(op[0])] + "\n" for opt in ['htype','hlen','hops','xid','secs','flags', 'ciaddr','yiaddr','siaddr','giaddr','chaddr','sname','file'] : begin = DhcpFields[opt][0] end = DhcpFields[opt][0]+DhcpFields[opt][1] data = self.packet_data[begin:end] result = '' if DhcpFieldsTypes[opt] == "int" : result = str(data[0]) elif DhcpFieldsTypes[opt] == "int2" : result = str(data[0]*256+data[1]) elif DhcpFieldsTypes[opt] == "int4" : result = str(ipv4(data).int()) elif DhcpFieldsTypes[opt] == "str" : for each in data : if each != 0 : result += chr(each) else : break elif DhcpFieldsTypes[opt] == "ipv4" : result = ipv4(data).str() elif DhcpFieldsTypes[opt] == "hwmac" : result = [] hexsym = ['0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'] for iterator in range(6) : result += [str(hexsym[data[iterator]/16]+hexsym[data[iterator]%16])] result = ':'.join(result) printable_data += opt+" : "+result + "\n" # Process options : printable_data += "# Options fields\n" for opt in self.options_data.keys(): data = self.options_data[opt] result = "" optnum = DhcpOptions[opt] if opt=='dhcp_message_type' : result = DhcpFieldsName['dhcp_message_type'][str(data[0])] elif DhcpOptionsTypes[optnum] == "char" : result = str(data[0]) elif DhcpOptionsTypes[optnum] == "16-bits" : result = str(data[0]*256+data[0]) elif DhcpOptionsTypes[optnum] == "32-bits" : result = str(ipv4(data).int()) elif DhcpOptionsTypes[optnum] == "string" : for each in data : if each != 0 : result += chr(each) else : break elif DhcpOptionsTypes[optnum] == "ipv4" : result = ipv4(data).str() elif DhcpOptionsTypes[optnum] == "ipv4+" : for i in range(0,len(data),4) : if len(data[i:i+4]) == 4 : result += ipv4(data[i:i+4]).str() + " - " elif DhcpOptionsTypes[optnum] == "char+" : if optnum == 55 : # parameter_request_list result = ','.join([DhcpOptionsList[each] for each in data]) else : result += str(data) printable_data += opt + " : " + result + "\n" return printable_data def AddLine(self,_string) : (parameter,junk,value) = _string.partition(':') parameter = parameter.strip() # If value begin with a whitespace, remove it, leave others if len(value)>0 and value[0] == ' ' : value = value[1:] value = self._OptionsToBinary(parameter,value) if value : self.SetOption(parameter,value) def _OptionsToBinary(self,parameter,value) : # Transform textual data into dhcp binary data p = parameter.strip() # 1- Search for header informations or specific parameter if p == 'op' or p == 'htype' : value = value.strip() if value.isdigit() : return [int(value)] try : value = DhcpNames[value.strip()] return [value] except KeyError : return [0] elif p == 'hlen' or p == 'hops' : try : value = int(value) return [value] except ValueError : return [0] elif p == 'secs' or p == 'flags' : try : value = ipv4(int(value)).list() except ValueError : value = [0,0,0,0] return value[2:] elif p == 'xid' : try : value = ipv4(int(value)).list() except ValueError : value = [0,0,0,0] return value elif p == 'ciaddr' or p == 'yiaddr' or p == 'siaddr' or p == 'giaddr' : try : ip = ipv4(value).list() except ValueError : ip = [0,0,0,0] return ip elif p == 'chaddr' : try: value = hwmac(value).list()+[0]*10 except ValueError,TypeError : value = [0]*16 return value elif p == 'sname' : return elif p == 'file' : return elif p == 'parameter_request_list' : value = value.strip().split(',') tmp = [] for each in value: if DhcpOptions.has_key(each) : tmp.append(DhcpOptions[each]) return tmp elif p=='dhcp_message_type' : try : return [DhcpNames[value]] except KeyError: return # 2- Search for options try : option_type = DhcpOptionsTypes[DhcpOptions[parameter]] except KeyError : return False if option_type == "ipv4" : # this is a single ip address try : binary_value = map(int,value.split(".")) except ValueError : return False elif option_type == "ipv4+" : # this is multiple ip address iplist = value.split(",") opt = [] for single in iplist : opt += (ipv4(single).list()) binary_value = opt elif option_type == "32-bits" : # This is probably a number... try : digit = int(value) binary_value = [digit>>24&0xFF,(digit>>16)&0xFF,(digit>>8)&0xFF,digit&0xFF] except ValueError : return False elif option_type == "16-bits" : try : digit = int(value) binary_value = [(digit>>8)&0xFF,digit&0xFF] except ValueError : return False elif option_type == "char" : try : digit = int(value) binary_value = [digit&0xFF] except ValueError : return False elif option_type == "bool" : if value=="False" or value=="false" or value==0 : binary_value = [0] else : binary_value = [1] elif option_type == "string" : binary_value = strlist(value).list() else : binary_value = strlist(value).list() return binary_value # FIXME: This is called from IsDhcpSomethingPacket, but is this really # needed? Or maybe this testing should be done in # DhcpBasicPacket.DecodePacket(). # Test Packet Type def IsDhcpSomethingPacket(self,type): if self.IsDhcpPacket() == False : return False if self.IsOption("dhcp_message_type") == False : return False if self.GetOption("dhcp_message_type") != type : return False return True def IsDhcpDiscoverPacket(self): return self.IsDhcpSomethingPacket([1]) def IsDhcpOfferPacket(self): return self.IsDhcpSomethingPacket([2]) def IsDhcpRequestPacket(self): return self.IsDhcpSomethingPacket([3]) def IsDhcpDeclinePacket(self): return self.IsDhcpSomethingPacket([4]) def IsDhcpAckPacket(self): return self.IsDhcpSomethingPacket([5]) def IsDhcpNackPacket(self): return self.IsDhcpSomethingPacket([6]) def IsDhcpReleasePacket(self): return self.IsDhcpSomethingPacket([7]) def IsDhcpInformPacket(self): return self.IsDhcpSomethingPacket([8]) def GetMultipleOptions(self,options=()): result = {} for each in options: result[each] = self.GetOption(each) return result def SetMultipleOptions(self,options={}): for each in options.keys(): self.SetOption(each,options[each]) # Creating Response Packet # Server-side functions # From RFC 2132 page 28/29 def CreateDhcpOfferPacketFrom(self,src): # src = discover packet self.SetOption("htype",src.GetOption("htype")) self.SetOption("xid",src.GetOption("xid")) self.SetOption("flags",src.GetOption("flags")) self.SetOption("giaddr",src.GetOption("giaddr")) self.SetOption("chaddr",src.GetOption("chaddr")) self.SetOption("ip_address_lease_time",src.GetOption("ip_address_lease_time")) self.TransformToDhcpOfferPacket() def TransformToDhcpOfferPacket(self): self.SetOption("dhcp_message_type",[2]) self.SetOption("op",[2]) self.SetOption("hlen",[6]) self.DeleteOption("secs") self.DeleteOption("ciaddr") self.DeleteOption("request_ip_address") self.DeleteOption("parameter_request_list") self.DeleteOption("client_identifier") self.DeleteOption("maximum_message_size") """ Dhcp ACK packet creation """ def CreateDhcpAckPacketFrom(self,src): # src = request or inform packet self.SetOption("htype",src.GetOption("htype")) self.SetOption("xid",src.GetOption("xid")) self.SetOption("ciaddr",src.GetOption("ciaddr")) self.SetOption("flags",src.GetOption("flags")) self.SetOption("giaddr",src.GetOption("giaddr")) self.SetOption("chaddr",src.GetOption("chaddr")) self.SetOption("ip_address_lease_time_option",src.GetOption("ip_address_lease_time_option")) self.TransformToDhcpAckPacket() def TransformToDhcpAckPacket(self): # src = request or inform packet self.SetOption("op",[2]) self.SetOption("hlen",[6]) self.SetOption("dhcp_message_type",[5]) self.DeleteOption("secs") self.DeleteOption("request_ip_address") self.DeleteOption("parameter_request_list") self.DeleteOption("client_identifier") self.DeleteOption("maximum_message_size") """ Dhcp NACK packet creation """ def CreateDhcpNackPacketFrom(self,src): # src = request or inform packet self.SetOption("htype",src.GetOption("htype")) self.SetOption("xid",src.GetOption("xid")) self.SetOption("flags",src.GetOption("flags")) self.SetOption("giaddr",src.GetOption("giaddr")) self.SetOption("chaddr",src.GetOption("chaddr")) self.TransformToDhcpNackPacket() def TransformToDhcpNackPacket(self): self.SetOption("op",[2]) self.SetOption("hlen",[6]) self.DeleteOption("secs") self.DeleteOption("ciaddr") self.DeleteOption("yiaddr") self.DeleteOption("siaddr") self.DeleteOption("sname") self.DeleteOption("file") self.DeleteOption("request_ip_address") self.DeleteOption("ip_address_lease_time_option") self.DeleteOption("parameter_request_list") self.DeleteOption("client_identifier") self.DeleteOption("maximum_message_size") self.SetOption("dhcp_message_type",[6]) """ GetClientIdentifier """ def GetClientIdentifier(self) : if self.IsOption("client_identifier") : return self.GetOption("client_identifier") return [] def GetGiaddr(self) : return self.GetOption("giaddr") def GetHardwareAddress(self) : length = self.GetOption("hlen")[0] full_hw = self.GetOption("chaddr") if length!=[] and length