diff --git a/Milter/__init__.py b/Milter/__init__.py index 8d8ac93af4776f389194eb70724c6f076bebee6c..e406b2e6289406524d0fd1681e07898f3044ed89 100755 --- a/Milter/__init__.py +++ b/Milter/__init__.py @@ -9,7 +9,7 @@ # This code is under the GNU General Public License. See COPYING for details. from __future__ import print_function -__version__ = '1.0.2' +__version__ = '1.0.3' import os import re @@ -419,12 +419,12 @@ class Base(object): func = getattr(self,func) syms = getattr(func,'_symlist',None) if syms is not None: - self.setsymlist(stage,syms) + self.setsymlist(stage,*syms) opts[1] = self._protocol = p & ~self.protocol_mask() opts[2] = 0 opts[3] = 0 #self.log("Negotiated:",opts) - except: + except Exception as x: # don't change anything if something went wrong return ALL_OPTS return CONTINUE diff --git a/Milter/test.py b/Milter/test.py index e73fbb2f0f5cdc1f96e589927ca6561f9d39cf21..846752e2d08723290fc682193d4c55af22c6f8a7 100644 --- a/Milter/test.py +++ b/Milter/test.py @@ -14,6 +14,7 @@ Milter.NOREPLY = Milter.CONTINUE ## Test mixin for unit testing %milter applications. # This mixin overrides many Milter.MilterBase methods # with stub versions that simply record what was done. +# @deprecated Use Milter.test.TestCtx # @since 0.9.8 class TestBase(object): @@ -138,6 +139,7 @@ class TestBase(object): raise ValueError('setsymlist limited to 5 macros by MTA') if self._symlist[stage] is not None: raise ValueError('setsymlist already called for stage:'+stage) + print('setsymlist',stage,a) self._symlist[stage] = set(a) ## Feed a file like object to the %milter. Calls envfrom, envrcpt for diff --git a/Milter/testctx.py b/Milter/testctx.py new file mode 100644 index 0000000000000000000000000000000000000000..6e79807a432a9248bc732f654a3e7dd1f12e4eea --- /dev/null +++ b/Milter/testctx.py @@ -0,0 +1,294 @@ +## @package Milter.testctx +# A test framework for milters that replaces milterContext rather +# than Milter.Base. Since miltermodule.c doesn't currently export +# a way to query callbacks set (and we might want to run without +# loading milter), we assume the callbacks set by Milter.runmilter(). + +from __future__ import print_function +from socket import AF_INET,AF_INET6 +import time +import mime +try: + from io import BytesIO +except: + from StringIO import StringIO as BytesIO +import Milter +from Milter import utils +import mime + +## Milter context for unit testing %milter applications. +# A substitute for milter.milterContext that can be passed to +# Milter.Base._setctx(). +# @since 1.0.3 +class TestCtx(object): + default_opts = [Milter.CURR_ACTS,0x1fffff,0,0] + def __init__(self,logfile='test/milter.log'): + ## Usually the Milter application derived from Milter.Base + self._priv = None + ## List of recipients deleted + self._delrcpt = [] + ## List of recipients added + self._addrcpt = [] + ## Macros defined + self._macros = { } + ## Reply codes and messages set by the %milter + self._reply = None + ## The macros returned by protocol stage + self._symlist = [ None, None, None, None, None, None, None ] + ## The message body. + self._body = None + ## True if the %milter replaced the message body. + self._bodyreplaced = False + ## True if the %milter changed any headers. + self._headerschanged = False + ## The rfc822 message object for the current email being fed to the %milter. + self._msg = None + ## The MAIL FROM for the current email being fed to the %milter + self._sender = None + ## True if the %milter changed the envelope from. + self._envfromchanged = False + ## List of recipients added + self._addrcpt = [] + ## Negotiated options + self._opts = TestCtx.default_opts + ## Last activity + self._activity = time.time() + + def getpriv(self): + return self._priv + + def setpriv(self,priv): + self._priv = priv + + def getsymval(self,name): + stage = self._stage + if stage >= 0: + syms = self._symlist[stage] + if syms is not None and name not in syms: + return syms + return self._macros.get(name,'notfound') + + def _setsymval(self,name,val): + self._macros[name] = val + + def setreply(self,rcode,xcode,*msg): + self._reply = (rcode,xcode) + msg + + def setsymlist(self,stage,macros): + if self._stage != -1: + raise RuntimeError("setsymlist may only be called from negotiate") + # Records which macros are available to getsymval() + m = macros + try: + m = m.encode('utf8') + except: pass + try: + m = m.split(b' ') + except: pass + if len(m) > 5: + raise ValueError('setsymlist limited to 5 macros by MTA') + if self._symlist[stage] is not None: + raise ValueError('setsymlist already called for stage:'+stage) + if not m: + raise ValueError('setsymlist with empty list for stage:'+stage) + self._symlist[stage] = set(m) + + def addheader(self,field,value,idx): + if not self._body: + raise IOError("addheader not called from eom()") + self._msg[field] = value + self._headerschanged = True + + def chgheader(self,field,idx,value): + if not self._body: + raise IOError("chgheader not called from eom()") + if value == '': + del self._msg[field] + else: + self._msg[field] = value + self._headerschanged = True + + def addrcpt(self,rcpt,params): + if not self._body: + raise IOError("addrcpt not called from eom()") + self._addrcpt.append((rcpt,params)) + + def delrcpt(self,rcpt): + if not self._body: + raise IOError("delrcpt not called from eom()") + self._delrcpt.append(rcpt) + + def replacebody(self,chunk): + if self._body: + self._body.write(chunk) + self._bodyreplaced = True + else: + raise IOError("replacebody not called from eom()") + + def chgfrom(self,sender,params=None): + if not self._body: + raise IOError("chgfrom not called from eom()") + self._envfromchanged = True + self._sender = sender + + def quarantine(self,reason): + raise NotImplemented + + ## Reset activity timer. + def progress(self): + self._activity = time.time() + + def _abort(self): + "What Milter sets for abort_callback" + self._priv.abort() + self._close() + + def _close(self): + Milter.close_callback(self) + + def _negotiate(self): + self._body = None + self._bodyreplaced = False + self._priv = None + self._opts = TestCtx.default_opts + self._stage = -1 + rc = Milter.negotiate_callback(self,self._opts) + if rc == Milter.ALL_OPTS: + self._opts = TestCtx.default_opts + elif rc != Milter.CONTINUE: + self._abort() + self._close() + self._protocol = self._opts[1] + return rc + + def _connect(self,host='localhost',helo='spamrelay',ip='1.2.3.4'): + rc = self._negotiate() + # FIXME: what if not CONTINUE or ALL_OPTS? + if self._protocol & Milter.P_NOCONNECT: + return Milter.CONTINUE + if utils.ip4re.match(ip): + af = AF_INET + elif utils.ip6re.match(ip): + af = AF_INET6 + else: + raise ValueError('TestCtx.connect: invalid ip address: '+ip) + self._stage = Milter.M_CONNECT + rc = Milter.connect_callback(self,host,af,ip) + self._stage = None + if rc != Milter.CONTINUE: + self._close() + return rc + return self._helo(helo) + + def _helo(self,helo): + if self._protocol & Milter.P_NOHELO: + return Milter.CONTINUE + self._stage = Milter.M_HELO + rc = self._priv.hello(helo) + self._stage = None + if rc != Milter.CONTINUE: + self._close() + return rc + + def _envfrom(self,*s): + self._sender = s[0] + if self._protocol & Milter.P_NOMAIL: + return Milter.CONTINUE + self._stage = Milter.M_ENVFROM + rc = self._priv.envfrom(*s) + self._stage = None + return rc + + def _envrcpt(self,s): + if self._protocol & Milter.P_NORCPT: + return Milter.CONTINUE + self._stage = Milter.M_ENVRCPT + rc = self._priv.envrcpt(s) + self._stage = None + return rc + + def _data(self): + if self._protocol & Milter.P_NODATA: + return Milter.CONTINUE + self._stage = Milter.M_DATA + rc = self._priv.data() + self._stage = None + return rc + + def _header(self,fld,val): + return self._priv.header(fld,val) + + def _eoh(self): + if self._protocol & Milter.P_NOEOH: + return Milter.CONTINUE + self._stage = Milter.M_EOH + rc = self._priv.eoh() + self._stage = None + return rc + + def _feed_body(self,bfp): + if self._protocol & Milter.P_NOBODY: + return Milter.CONTINUE + while True: + buf = bfp.read(8192) + if len(buf) == 0: break + rc = self._priv.body(buf) + if rc != Milter.CONTINUE: return rc + return Milter.CONTINUE + + def _eom(self): + self._body = BytesIO() + self._stage = Milter.M_EOM + rc = self._priv.eom() + self._stage = None + return rc + + ## Feed a file like object to the ctx. Calls the callbacks in + # the same sequence as libmilter. + # @param fp the file with rfc2822 message stream + # @param sender the MAIL FROM + # @param rcpt RCPT TO - additional recipients may follow + def _feedFile(self,fp,sender="spam@adv.com",rcpt="victim@lamb.com",*rcpts): + self._body = None + self._bodyreplaced = False + self._headerschanged = False + self._reply = None + msg = mime.message_from_file(fp) + self._msg = msg + # envfrom + rc = self._envfrom('<%s>'%sender) + if rc != Milter.CONTINUE: return rc + # envrcpt + for rcpt in (rcpt,) + rcpts: + rc = self._envrcpt('<%s>'%rcpt) + if rc != Milter.CONTINUE: return rc + # data + rc = self._data() + if rc != Milter.CONTINUE: return rc + # header + for h,val in msg.items(): + rc = self._header(h,val) + if rc != Milter.CONTINUE: return rc + # eoh + rc = self._eoh() + if rc != Milter.CONTINUE: return rc + # body + header,body = msg.as_bytes().split(b'\n\n',1) + rc = self._feed_body(BytesIO(body)) + if rc != Milter.CONTINUE: return rc + rc = self._eom() + if self._bodyreplaced: + body = self._body.getvalue() + self._body = BytesIO() + self._body.write(header) + self._body.write(b'\n\n') + self._body.write(body) + return rc + + ## Feed an email contained in a file to the %milter. + # This is a convenience method that invokes @link #feedFile feedFile @endlink. + # @param sender MAIL FROM + # @param rcpts RCPT TO, multiple recipients may be supplied + def _feedMsg(self,fname,sender="spam@adv.com",*rcpts): + with open('test/'+fname,'rb') as fp: + return self._feedFile(fp,sender,*rcpts) diff --git a/testsample.py b/testsample.py index b21647da2c110c82036a799a32a4673e7266a7f2..509cd360263b6538fd2375852f7bb0adc3bdb40c 100644 --- a/testsample.py +++ b/testsample.py @@ -3,6 +3,7 @@ import Milter import sample import mime from Milter.test import TestBase +from Milter.testctx import TestCtx class TestMilter(TestBase,sample.sampleMilter): def __init__(self): @@ -11,6 +12,31 @@ class TestMilter(TestBase,sample.sampleMilter): class BMSMilterTestCase(unittest.TestCase): + def testCtx(self,fname='virus1'): + ctx = TestCtx() + Milter.factory = sample.sampleMilter + ctx._setsymval('{auth_authen}','batman') + ctx._setsymval('{auth_type}','batcomputer') + ctx._setsymval('j','mailhost') + rc = ctx._connect() + self.assertTrue(rc == Milter.CONTINUE) + rc = ctx._feedMsg(fname) + milter = ctx.getpriv() +# self.assertTrue(milter.user == 'batman',"getsymval failed: "+ +# "%s != %s"%(milter.user,'batman')) + self.assertEquals(milter.user,'batman') + self.assertTrue(milter.auth_type != 'batcomputer',"setsymlist failed") + self.assertTrue(rc == Milter.ACCEPT) + self.assertTrue(ctx._bodyreplaced,"Message body not replaced") + fp = ctx._body + open('test/'+fname+".tstout","wb").write(fp.getvalue()) + #self.assertTrue(fp.getvalue() == open("test/virus1.out","r").read()) + fp.seek(0) + msg = mime.message_from_file(fp) + s = msg.get_payload(1).get_payload() + milter.log(s) + ctx._close() + def testDefang(self,fname='virus1'): milter = TestMilter() milter.setsymval('{auth_authen}','batman')