Skip to content
Snippets Groups Projects
Commit 8932dc36 authored by Stuart Gathman's avatar Stuart Gathman
Browse files

Move parse_addr, iniplist, ip4re to Milter.utils

parent bda654b7
No related branches found
No related tags found
No related merge requests found
......@@ -18,6 +18,7 @@ include cid2spf.py
include spfquery.py
include test.py
include sample.py
include spfmilter.py
include test/*
include doc/*
include Milter/*.py
......
import re
import struct
import socket
from fnmatch import fnmatchcase
ip4re = re.compile(r'^[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*$')
# from spf.py
def addr2bin(str):
"Convert a string IPv4 address into an unsigned integer."
return struct.unpack("!L", socket.inet_aton(str))[0]
MASK = 0xFFFFFFFFL
def cidr(i,n):
return ~(MASK >> n) & MASK & i
def iniplist(ipaddr,iplist):
"""Return whether ip is in cidr list
>>> iniplist('66.179.26.146',['127.0.0.1','66.179.26.128/26'])
True
>>> iniplist('127.0.0.1',['127.0.0.1','66.179.26.128/26'])
True
>>> iniplist('192.168.0.45',['192.168.0.*'])
True
"""
ipnum = addr2bin(ipaddr)
for pat in iplist:
p = pat.split('/',1)
if ip4re.match(p[0]):
if len(p) > 1:
n = int(p[1])
else:
n = 32
if cidr(addr2bin(p[0]),n) == cidr(ipnum,n):
return True
elif fnmatchcase(ipaddr,pat):
return True
return False
def parse_addr(t):
"""Split email into user,domain.
>>> parse_addr('user@example.com')
['user', 'example.com']
>>> parse_addr('"user@example.com"')
['user@example.com']
>>> parse_addr('"user@bar"@example.com')
['user@bar', 'example.com']
>>> parse_addr('foo')
['foo']
"""
if t.startswith('<') and t.endswith('>'): t = t[1:-1]
if t.startswith('"'):
if t.endswith('"'): return [t[1:-1]]
pos = t.find('"@')
if pos > 0: return [t[1:pos],t[pos+2:]]
return t.split('@')
#!/usr/bin/env python
# A simple milter that has grown quite a bit.
# $Log$
# Revision 1.79 2007/01/05 21:25:40 customdesigned
# Move AddrCache to Milter package.
#
# Revision 1.78 2007/01/04 18:01:10 customdesigned
# Do plain CBV when template missing.
#
......@@ -53,6 +56,7 @@ import gc
import anydbm
import Milter.dsn as dsn
from Milter.dynip import is_dynip as dynip
from Milter.utils import iniplist,parse_addr,ip4re
from fnmatch import fnmatchcase
from email.Header import decode_header
......@@ -77,8 +81,6 @@ except: SES = None
try: import spf
except: spf = None
ip4re = re.compile(r'^[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*$')
# Sometimes, MTAs reply to our DSN. We recognize this type of reply/DSN
# and check for the original recipient SRS encoded in Message-ID.
# If found, we blacklist that recipient.
......@@ -362,25 +364,6 @@ def read_config(list):
srs_domain.add(cp.getdefault('srs','fwdomain'))
banned_users = cp.getlist('srs','banned_users')
def parse_addr(t):
"""Split email into user,domain.
>>> parse_addr('user@example.com')
['user', 'example.com']
>>> parse_addr('"user@example.com"')
['user@example.com']
>>> parse_addr('"user@bar"@example.com')
['user@bar', 'example.com']
>>> parse_addr('foo')
['foo']
"""
if t.startswith('<') and t.endswith('>'): t = t[1:-1]
if t.startswith('"'):
if t.endswith('"'): return [t[1:-1]]
pos = t.find('"@')
if pos > 0: return [t[1:pos],t[pos+2:]]
return t.split('@')
def parse_header(val):
"""Decode headers gratuitously encoded to hide the content.
"""
......@@ -480,39 +463,6 @@ class SPFPolicy(object):
policy = 'OK'
return policy
# from spf.py
def addr2bin(str):
"Convert a string IPv4 address into an unsigned integer."
return struct.unpack("!L", socket.inet_aton(str))[0]
MASK = 0xFFFFFFFFL
def cidr(i,n):
return ~(MASK >> n) & MASK & i
def iniplist(ipaddr,iplist):
"""Return whether ip is in cidr list
>>> iniplist('66.179.26.146',['127.0.0.1','66.179.26.128/26'])
True
>>> iniplist('127.0.0.1',['127.0.0.1','66.179.26.128/26'])
True
>>> iniplist('192.168.0.45',['192.168.0.*'])
True
"""
ipnum = addr2bin(ipaddr)
for pat in iplist:
p = pat.split('/',1)
if ip4re.match(p[0]):
if len(p) > 1:
n = int(p[1])
else:
n = 32
if cidr(addr2bin(p[0]),n) == cidr(ipnum,n):
return True
elif fnmatchcase(ipaddr,pat):
return True
return False
from Milter.cache import AddrCache
cbv_cache = AddrCache(renew=7)
......
......@@ -14,6 +14,8 @@ import struct
import socket
import syslog
from Milter.utils import iniplist,parse_addr
syslog.openlog('spfmilter',0,syslog.LOG_MAIL)
# list of trusted forwarder domains. An SPF record for a forwarder
......@@ -27,60 +29,7 @@ trusted_relay = []
socketname = "/var/run/milter/spfmiltersock"
#socketname = os.getenv("HOME") + "/pythonsock"
ip4re = re.compile(r'^[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*$')
# from spf.py
def addr2bin(str):
"Convert a string IPv4 address into an unsigned integer."
return struct.unpack("!L", socket.inet_aton(str))[0]
MASK = 0xFFFFFFFFL
def cidr(i,n):
return ~(MASK >> n) & MASK & i
def iniplist(ipaddr,iplist):
"""Return whether ip is in cidr list
>>> iniplist('66.179.26.146',['127.0.0.1','66.179.26.128/26'])
True
>>> iniplist('127.0.0.1',['127.0.0.1','66.179.26.128/26'])
True
>>> iniplist('192.168.0.45',['192.168.0.*'])
True
"""
ipnum = addr2bin(ipaddr)
for pat in iplist:
p = pat.split('/',1)
if ip4re.match(p[0]):
if len(p) > 1:
n = int(p[1])
else:
n = 32
if cidr(addr2bin(p[0]),n) == cidr(ipnum,n):
return True
elif fnmatchcase(ipaddr,pat):
return True
return False
def parse_addr(t):
"""Split email into user,domain.
>>> parse_addr('user@example.com')
['user', 'example.com']
>>> parse_addr('"user@example.com"')
['user@example.com']
>>> parse_addr('"user@bar"@example.com')
['user@bar', 'example.com']
>>> parse_addr('foo')
['foo']
"""
if t.startswith('<') and t.endswith('>'): t = t[1:-1]
if t.startswith('"'):
if t.endswith('"'): return [t[1:-1]]
pos = t.find('"@')
if pos > 0: return [t[1:pos],t[pos+2:]]
return t.split('@')
miltername = "pyspffilter"
class spfMilter(Milter.Milter):
"Milter to check SPF."
......@@ -221,11 +170,11 @@ if __name__ == "__main__":
Milter.set_flags(Milter.CHGHDRS + Milter.ADDHDRS)
print """To use this with sendmail, add the following to sendmail.cf:
O InputMailFilters=pyspffilter
Xpyspffilter, S=local:%s
O InputMailFilters=%s
X%s, S=local:%s
See the sendmail README for libmilter.
sample spfmilter startup""" % socketname
sample spfmilter startup""" % (miltername,miltername,socketname)
sys.stdout.flush()
Milter.runmilter("pyspffilter",socketname,240)
print "sample spfmilter shutdown"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment