diff --git a/Milter/pyip6.py b/Milter/pyip6.py new file mode 100644 index 0000000000000000000000000000000000000000..8cf8835d46cbaa0d07eb6fb2580a85a043cad13f --- /dev/null +++ b/Milter/pyip6.py @@ -0,0 +1,117 @@ +"""Pure Python IP6 parsing and formatting + +Copyright (c) 2006 Stuart Gathman <stuart@bmsi.com> + +This module is free software, and you may redistribute it and/or modify +it under the same terms as Python itself, so long as this copyright message +and disclaimer are retained in their original form. +""" +import struct +#from spf import RE_IP4 +import re +PAT_IP4 = r'\.'.join([r'(?:\d|[1-9]\d|1\d\d|2[0-4]\d|25[0-5])']*4) +RE_IP4 = re.compile(PAT_IP4+'$') + +def inet_ntop(s): + """ + Convert ip6 address to standard hex notation. + + Examples: + + >>> inet_ntop(struct.pack("!HHHHHHHH",0,0,0,0,0,0xFFFF,0x0102,0x0304)) + '::FFFF:1.2.3.4' + + >>> inet_ntop(struct.pack("!HHHHHHHH",0x1234,0x5678,0,0,0,0,0x0102,0x0304)) + '1234:5678::102:304' + + >>> inet_ntop(struct.pack("!HHHHHHHH",0,0,0,0x1234,0x5678,0,0x0102,0x0304)) + '::1234:5678:0:102:304' + + >>> inet_ntop(struct.pack("!HHHHHHHH",0x1234,0x5678,0,0x0102,0x0304,0,0,0)) + '1234:5678:0:102:304::' + + >>> inet_ntop(struct.pack("!HHHHHHHH",0,0,0,0,0,0,0,0)) + '::' + """ + # convert to 8 words + a = struct.unpack("!HHHHHHHH",s) + n = (0,0,0,0,0,0,0,0) # null ip6 + if a == n: return '::' + # check for ip4 mapped + if a[:5] == (0,0,0,0,0) and a[5] in (0,0xFFFF): + ip4 = '.'.join([str(i) for i in struct.unpack("!BBBB",s[12:])]) + if a[5]: + return "::FFFF:" + ip4 + return "::" + ip4 + # find index of longest sequence of 0 + for l in (7,6,5,4,3,2,1): + e = n[:l] + for i in range(9-l): + if a[i:i+l] == e: + if i == 0: + return ':'+':%x'*(8-l) % a[l:] + if i == 8 - l: + return '%x:'*(8-l) % a[:-l] + ':' + return '%x:'*i % a[:i] + ':%x'*(8-l-i) % a[i+l:] + return "%x:%x:%x:%x:%x:%x:%x:%x" % a + +def inet_pton(p): + """ + Convert ip6 standard hex notation to ip6 address. + + Examples: + + >>> struct.unpack('!HHHHHHHH',inet_pton('::')) + (0, 0, 0, 0, 0, 0, 0, 0) + + >>> struct.unpack('!HHHHHHHH',inet_pton('::1234')) + (0, 0, 0, 0, 0, 0, 0, 4660) + + >>> struct.unpack('!HHHHHHHH',inet_pton('1234::')) + (4660, 0, 0, 0, 0, 0, 0, 0) + + >>> struct.unpack('!HHHHHHHH',inet_pton('1234::5678')) + (4660, 0, 0, 0, 0, 0, 0, 22136) + + >>> struct.unpack('!HHHHHHHH',inet_pton('::FFFF:1.2.3.4')) + (0, 0, 0, 0, 0, 65535, 258, 772) + + >>> struct.unpack('!HHHHHHHH',inet_pton('1.2.3.4')) + (0, 0, 0, 0, 0, 65535, 258, 772) + + >>> try: inet_pton('::1.2.3.4.5') + ... except ValueError,x: print x + ::1.2.3.4.5 + """ + if p == '::': + return '\0'*16 + s = p + m = RE_IP4.search(s) + try: + if m: + pos = m.start() + ip4 = [int(i) for i in s[pos:].split('.')] + if not pos: + return struct.pack('!QLBBBB',0,65535,*ip4) + s = s[:pos]+'%x%02x:%x%02x'%tuple(ip4) + a = s.split('::') + if len(a) == 2: + l,r = a + if not l: + r = r.split(':') + return struct.pack('!HHHHHHHH', + *[0]*(8-len(r)) + [int(s,16) for s in r]) + if not r: + l = l.split(':') + return struct.pack('!HHHHHHHH', + *[int(s,16) for s in l] + [0]*(8-len(l))) + l = l.split(':') + r = r.split(':') + return struct.pack('!HHHHHHHH', + *[int(s,16) for s in l] + [0]*(8-len(l)-len(r)) + + [int(s,16) for s in r]) + if len(a) == 1: + return struct.pack('!HHHHHHHH', + *[int(s,16) for s in a[0].split(':')]) + except ValueError: pass + raise ValueError,p diff --git a/Milter/utils.py b/Milter/utils.py index 93f8645a475fa8e9427c310f7099ebdc3b4aef84..5fa3f8807eb6d00162ba7497a458d747bbffadac 100644 --- a/Milter/utils.py +++ b/Milter/utils.py @@ -11,17 +11,48 @@ from email.Header import decode_header #import email.Utils import rfc822 -ip4re = re.compile(r'^[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*$') +PAT_IP4 = r'\.'.join([r'(?:\d|[1-9]\d|1\d\d|2[0-4]\d|25[0-5])']*4) +ip4re = re.compile(PAT_IP4+'$') +ip6re = re.compile( '(?:%(hex4)s:){6}%(ls32)s$' + '|::(?:%(hex4)s:){5}%(ls32)s$' + '|(?:%(hex4)s)?::(?:%(hex4)s:){4}%(ls32)s$' + '|(?:(?:%(hex4)s:){0,1}%(hex4)s)?::(?:%(hex4)s:){3}%(ls32)s$' + '|(?:(?:%(hex4)s:){0,2}%(hex4)s)?::(?:%(hex4)s:){2}%(ls32)s$' + '|(?:(?:%(hex4)s:){0,3}%(hex4)s)?::%(hex4)s:%(ls32)s$' + '|(?:(?:%(hex4)s:){0,4}%(hex4)s)?::%(ls32)s$' + '|(?:(?:%(hex4)s:){0,5}%(hex4)s)?::%(hex4)s$' + '|(?:(?:%(hex4)s:){0,6}%(hex4)s)?::$' + % { + 'ls32': r'(?:[0-9a-f]{1,4}:[0-9a-f]{1,4}|%s)'%PAT_IP4, + 'hex4': r'[0-9a-f]{1,4}' + }, re.IGNORECASE) # from spf.py def addr2bin(str): """Convert a string IPv4 address into an unsigned integer.""" - return struct.unpack("!L", socket.inet_aton(str))[0] + try: + return struct.unpack("!L", socket.inet_aton(str))[0] + except socket.error: + raise socket.error("Invalid IP4 address: "+str) + +def bin2long6(str): + """Convert binary IP6 address into an unsigned Python long integer.""" + h, l = struct.unpack("!QQ", str) + return h << 64 | l + +if hasattr(socket,'has_ipv6') and socket.has_ipv6: + def inet_ntop(s): + return socket.inet_ntop(socket.AF_INET6,s) + def inet_pton(s): + return socket.inet_pton(socket.AF_INET6,s) +else: + from pyip6 import inet_ntop, inet_pton MASK = 0xFFFFFFFFL +MASK6 = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFL -def cidr(i,n): - return ~(MASK >> n) & MASK & i +def cidr(i,n,mask=MASK): + return ~(mask >> n) & mask & i def iniplist(ipaddr,iplist): """Return whether ip is in cidr list @@ -31,8 +62,17 @@ def iniplist(ipaddr,iplist): True >>> iniplist('192.168.0.45',['192.168.0.*']) True + >>> iniplist('2001:610:779:0:223:6cff:fe9a:9cf3',['127.0.0.1','172.20.1.0/24','2001:610:779::/48']) + True + >>> iniplist('2G01:610:779:0:223:6cff:fe9a:9cf3',['127.0.0.1','172.20.1.0/24','2001:610:779::/48']) + True """ - ipnum = addr2bin(ipaddr) + if ip4re.match(ipaddr): + ipnum = addr2bin(ipaddr) + elif ip6re.match(ipaddr): + ipnum = bin2long6(inet_pton(ipaddr)) + else: + raise ValueError('Invalid ip syntax:'+ipaddr) for pat in iplist: p = pat.split('/',1) if ip4re.match(p[0]): @@ -42,6 +82,13 @@ def iniplist(ipaddr,iplist): n = 32 if cidr(addr2bin(p[0]),n) == cidr(ipnum,n): return True + elif ip6re.match(p[0]): + if len(p) > 1: + n = int(p[1]) + else: + n = 128 + if cidr(bin2long6(inet_pton(p[0])),n,MASK6) == cidr(ipnum,n,MASK6): + return True elif fnmatchcase(ipaddr,pat): return True return False