Skip to content
Snippets Groups Projects
testbms.py 10.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • import unittest
    
    import doctest
    
    import Milter
    import bms
    import mime
    import rfc822
    import StringIO
    
    import email
    import sys
    
    #import pdb
    
    class TestMilter(bms.bmsMilter):
    
      def __init__(self):
        bms.bmsMilter.__init__(self)
        self.logfp = open("test/milter.log","a")
        self._delrcpt = []	# record deleted rcpts for testing
        self._addrcpt = []	# record added rcpts for testing
    
      def log(self,*msg):
        for i in msg: print >>self.logfp, i,
        print >>self.logfp
    
      def getsymval(self,name):
        if name == 'j': return 'test.milter.org'
    
        return ''
    
    
      def replacebody(self,chunk):
        if self._body:
          self._body.write(chunk)
    
          self.bodyreplaced = True
    
        else:
          raise IOError,"replacebody not called from eom()"
    
      # FIXME: rfc822 indexing does not really reflect the way chg/add header
      # work for a milter
      def chgheader(self,field,idx,value):
        if not self._body:
          raise IOError,"chgheader not called from eom()"
        self.log('chgheader: %s[%d]=%s' % (field,idx,value))
        if value == '':
          del self._msg[field]
        else:
          self._msg[field] = value
    
        self.headerschanged = True
    
    
      def addheader(self,field,value):
        if not self._body:
          raise IOError,"addheader not called from eom()"
        self.log('addheader: %s=%s' % (field,value))
        self._msg[field] = value
    
        self.headerschanged = True
    
    
      def delrcpt(self,rcpt):
        if not self._body:
          raise IOError,"delrcpt not called from eom()"
        self._delrcpt.append(rcpt)
    
      def addrcpt(self,rcpt):
        if not self._body:
          raise IOError,"addrcpt not called from eom()"
        self._addrcpt.append(rcpt)
    
      def setreply(self,rcode,xcode,msg):
        self.reply = (rcode,xcode,msg)
    
      def feedFile(self,fp,sender="spam@adv.com",rcpt="victim@lamb.com"):
        self._body = None
    
        self.bodyreplaced = False
        self.headerschanged = False
    
        self.reply = None
        msg = rfc822.Message(fp)
        rc = self.envfrom('<%s>'%sender)
        if rc != Milter.CONTINUE: return rc
        rc = self.envrcpt('<%s>'%rcpt)
        if rc != Milter.CONTINUE: return rc
        line = None
        for h in msg.headers:
          if h[:1].isspace():
    	line = line + h
    	continue
          if not line:
    	line = h
    	continue
          s = line.split(': ',1)
          if len(s) > 1: val = s[1].strip()
          else: val = ''
          rc = self.header(s[0],val)
          if rc != Milter.CONTINUE: return rc
          line = h
        if line:
          s = line.split(': ',1)
          rc = self.header(s[0],s[1])
          if rc != Milter.CONTINUE: return rc
        rc = self.eoh()
        if rc != Milter.CONTINUE: return rc
        while 1:
          buf = fp.read(8192)
          if len(buf) == 0: break
          rc = self.body(buf)
          if rc != Milter.CONTINUE: return rc
        self._msg = msg
        self._body = StringIO.StringIO()
        rc = self.eom()
        if self.bodyreplaced:
          body = self._body.getvalue()
        else:
          msg.rewindbody()
          body = msg.fp.read()
        self._body = StringIO.StringIO()
        self._body.writelines(msg.headers)
        self._body.write('\n')
        self._body.write(body)
        return rc
    
      def feedMsg(self,fname,sender="spam@adv.com",rcpt="victim@lamb.com"):
        fp = open('test/'+fname,'r')
        rc = self.feedFile(fp,sender,rcpt)
        fp.close()
        return rc
    
      def connect(self,host='localhost'):
        self._body = None
    
        self.bodyreplaced = False
    
        rc =  bms.bmsMilter.connect(self,host,1,('1.2.3.4',1234)) 
        if rc != Milter.CONTINUE and rc != Milter.ACCEPT:
          self.close()
          return rc
        rc = self.hello('spamrelay')
        if rc != Milter.CONTINUE:
          self.close()
        return rc
    
    class BMSMilterTestCase(unittest.TestCase):
    
      def testDefang(self,fname='virus1'):
        milter = TestMilter()
        rc = milter.connect('testDefang')
        self.assertEqual(rc,Milter.CONTINUE)
        rc = milter.feedMsg(fname)
        self.assertEqual(rc,Milter.ACCEPT)
        self.failUnless(milter.bodyreplaced,"Message body not replaced")
        fp = milter._body
        open('test/'+fname+".tstout","w").write(fp.getvalue())
        #self.failUnless(fp.getvalue() == open("test/virus1.out","r").read())
        fp.seek(0)
    
        msg = mime.message_from_file(fp)
    
        str = msg.get_payload(1).get_payload()
        milter.log(str)
        milter.close()
    
      # test some spams that crashed our parser
      def testParse(self,fname='spam7'):
        milter = TestMilter()
        milter.connect('testParse')
        rc = milter.feedMsg(fname)
        self.assertEqual(rc,Milter.ACCEPT)
        self.failIf(milter.bodyreplaced,"Milter needlessly replaced body.")
        fp = milter._body
        open('test/'+fname+".tstout","w").write(fp.getvalue())
        milter.connect('pro-send.com')
        rc = milter.feedMsg('spam8')
        self.assertEqual(rc,Milter.ACCEPT)
        self.failIf(milter.bodyreplaced,"Milter needlessly replaced body.")
        rc = milter.feedMsg('bounce')
        self.assertEqual(rc,Milter.ACCEPT)
        self.failIf(milter.bodyreplaced,"Milter needlessly replaced body.")
        rc = milter.feedMsg('bounce1')
        self.assertEqual(rc,Milter.ACCEPT)
        self.failIf(milter.bodyreplaced,"Milter needlessly replaced body.")
        milter.close()
    
      def testDefang2(self):
        milter = TestMilter()
        milter.connect('testDefang2')
        rc = milter.feedMsg('samp1')
        self.assertEqual(rc,Milter.ACCEPT)
        self.failIf(milter.bodyreplaced,"Milter needlessly replaced body.")
        rc = milter.feedMsg("virus3")
        self.assertEqual(rc,Milter.ACCEPT)
        self.failUnless(milter.bodyreplaced,"Message body not replaced")
        fp = milter._body
        open("test/virus3.tstout","w").write(fp.getvalue())
        #self.failUnless(fp.getvalue() == open("test/virus3.out","r").read())
        rc = milter.feedMsg("virus6")
        self.assertEqual(rc,Milter.ACCEPT)
        self.failUnless(milter.bodyreplaced,"Message body not replaced")
        self.failUnless(milter.headerschanged,"Message headers not adjusted")
        fp = milter._body
        open("test/virus6.tstout","w").write(fp.getvalue())
        milter.close()
    
      def testDefang3(self):
        milter = TestMilter()
        milter.connect('testDefang3')
        # test script removal on complex HTML attachment
        rc = milter.feedMsg('amazon')
        self.assertEqual(rc,Milter.ACCEPT)
        self.failUnless(milter.bodyreplaced,"Message body not replaced")
        fp = milter._body
        open("test/amazon.tstout","w").write(fp.getvalue())
        # test defanging Klez virus
        rc = milter.feedMsg("virus13")
        self.assertEqual(rc,Milter.ACCEPT)
        self.failUnless(milter.bodyreplaced,"Message body not replaced")
        fp = milter._body
        open("test/virus13.tstout","w").write(fp.getvalue())
        # test script removal on quoted-printable HTML attachment
        # sgmllib can't handle the <![if cond]> syntax
        rc = milter.feedMsg('spam44')
        self.assertEqual(rc,Milter.ACCEPT)
        self.failIf(milter.bodyreplaced,"Message body replaced")
        fp = milter._body
        open("test/spam44.tstout","w").write(fp.getvalue())
        milter.close()
     
      def testRFC822(self):
        milter = TestMilter()
        milter.connect('testRFC822')
        # test encoded rfc822 attachment
        #pdb.set_trace()
        rc = milter.feedMsg('test8')
        self.assertEqual(rc,Milter.ACCEPT)
    
        # python2.4 doesn't scan encoded message attachments
        if sys.hexversion < 0x02040000:
          self.failUnless(milter.bodyreplaced,"Message body not replaced")
    
        #self.failIf(milter.bodyreplaced,"Message body replaced")
        fp = milter._body
        open("test/test8.tstout","w").write(fp.getvalue())
        rc = milter.feedMsg('virus7')
        self.assertEqual(rc,Milter.ACCEPT)
        self.failUnless(milter.bodyreplaced,"Message body not replaced")
        #self.failIf(milter.bodyreplaced,"Message body replaced")
        fp = milter._body
        open("test/virus7.tstout","w").write(fp.getvalue())
    
      def testSmartAlias(self):
        milter = TestMilter()
        milter.connect('testSmartAlias')
        # test smart alias feature
    
    Stuart Gathman's avatar
    Stuart Gathman committed
        key = ('foo@example.com','baz@bat.com')
    
        bms.smart_alias[key] = ['ham@eggs.com']
        rc = milter.feedMsg('test8',key[0],key[1])
        self.assertEqual(rc,Milter.ACCEPT)
        self.failUnless(milter._delrcpt == ['<baz@bat.com>'])
        self.failUnless(milter._addrcpt == ['<ham@eggs.com>'])
    
        # python2.4 email does not decode message attachments, so script
        # is not replaced
        if sys.hexversion < 0x02040000:
          self.failUnless(milter.bodyreplaced,"Message body not replaced")
    
    
      def testBadBoundary(self):
        milter = TestMilter()
        milter.connect('testBadBoundary')
        # test rfc822 attachment with invalid boundaries
        #pdb.set_trace()
        rc = milter.feedMsg('bound')
    
        if sys.hexversion < 0x02040000:
          # python2.4 adds invalid boundaries to decects list and makes
          # payload a str
          self.assertEqual(rc,Milter.REJECT)
          self.assertEqual(milter.reply[0],'554')
    
        #self.failUnless(milter.bodyreplaced,"Message body not replaced")
        self.failIf(milter.bodyreplaced,"Message body replaced")
        fp = milter._body
        open("test/bound.tstout","w").write(fp.getvalue())
    
      def testCompoundFilename(self):
        milter = TestMilter()
        milter.connect('testCompoundFilename')
        # test rfc822 attachment with invalid boundaries
        #pdb.set_trace()
        rc = milter.feedMsg('test1')
        self.assertEqual(rc,Milter.ACCEPT)
        #self.failUnless(milter.bodyreplaced,"Message body not replaced")
        self.failIf(milter.bodyreplaced,"Message body replaced")
        fp = milter._body
        open("test/test1.tstout","w").write(fp.getvalue())
    
    
      def testFindsrs(self):
        if not bms.srs:
          import SRS
          bms.srs = SRS.new(secret='test')
        sender = bms.srs.forward('foo@bar.com','mail.example.com')
        sndr = bms.findsrs(StringIO.StringIO(
    """Received: from [1.16.33.86] (helo=mail.example.com)
    	by bastion4.mail.zen.co.uk with smtp (Exim 4.50) id 1H3IBC-00013b-O9
    	for foo@bar.com; Sat, 06 Jan 2007 20:30:17 +0000
    X-Mailer: "PyMilter-0.8.5"
    	<%s> foo
    MIME-Version: 1.0
    Content-Type: text/plain
    To: foo@bar.com
    From: postmaster@mail.example.com
    """ % sender
        ))
        self.assertEqual(sndr,'foo@bar.com')
    
    
    #  def testReject(self):
    #    "Test content based spam rejection."
    #    milter = TestMilter()
    #    milter.connect('gogo-china.com')
    #    rc = milter.feedMsg('big5');
    #    self.failUnless(rc == Milter.REJECT)
    #    milter.close();
    
    
    def suite(): 
      s = unittest.makeSuite(BMSMilterTestCase,'test')
      s.addTest(doctest.DocTestSuite(bms))
      return s
    
    
    if __name__ == '__main__':
      if len(sys.argv) > 1:
        for fname in sys.argv[1:]:
          milter = TestMilter()
          milter.connect('main')
          fp = open(fname,'r')
          rc = milter.feedFile(fp)
          fp = milter._body
          sys.stdout.write(fp.getvalue())
      else:
    
    Stuart Gathman's avatar
    Stuart Gathman committed
        #unittest.main()
        unittest.TextTestRunner().run(suite())