Skip to content
Snippets Groups Projects
Select Git revision
  • 6bbb6b3f02e19fcc894b67ff61641cede36223e4
  • master default protected
  • pymilter-1.0.4
  • pymilter-1.0.3
  • pymilter-1.0.2
  • pymilter-1.0.1
  • pymilter-1_0
  • milter-0_8_18
  • pymilter-0_9_8
  • pymilter-0_9_7
  • pymilter-0_9_6
  • pymilter-0_9_5
  • pymilter-0_9_4
  • pymilter-0_9_2
  • pymilter-0_9_1
  • pymilter-0_9_0
  • pymilter-0_8_12
  • pymilter-0_8_11
  • pymilter-0_8_10
  • pymilter-0_8_9
  • milter-0_8_8
  • milter-0_8_7
22 results

testmime.py

Blame
  • mime.py 15.50 KiB
    # $Log$
    # Revision 1.4  2005/06/17 01:49:39  customdesigned
    # Handle zip within zip.
    #
    # Revision 1.3  2005/06/02 15:00:17  customdesigned
    # Configure banned extensions.  Scan zipfile option with test case.
    #
    # Revision 1.2  2005/06/02 04:18:55  customdesigned
    # Update copyright notices after reading article on /.
    #
    # Revision 1.1.1.4  2005/05/31 18:23:49  customdesigned
    # Development changes since 0.7.2
    #
    # Revision 1.62  2005/02/14 22:31:17  stuart
    # _parseparam replacement not needed for python2.4
    #
    # Revision 1.61  2005/02/12 02:11:11  stuart
    # Pass unit tests with python2.4.
    #
    # Revision 1.60  2005/02/11 18:34:14  stuart
    # Handle garbage after quote in boundary.
    #
    # Revision 1.59  2005/02/10 01:10:59  stuart
    # Fixed MimeMessage.ismodified()
    #
    # Revision 1.58  2005/02/10 00:56:49  stuart
    # Runs with python2.4.  Defang not working correctly - more work needed.
    #
    # Revision 1.57  2004/11/20 16:37:52  stuart
    # fix regex for splitting header and body
    #
    # Revision 1.56  2004/11/09 20:33:51  stuart
    # Recognize more dynamic PTR variations.
    #
    # Revision 1.55  2004/10/06 21:39:20  stuart
    # Handle message attachments with boundary errors by not parsing them
    # until needed.
    #
    # Revision 1.54  2004/08/18 01:59:46  stuart
    # Handle mislabeled multipart messages
    #
    # Revision 1.53  2004/04/24 22:53:20  stuart
    # Rename some local variables to avoid shadowing builtins
    #
    # Revision 1.52  2004/04/24 22:47:13  stuart
    # Convert header values to str
    #
    # Revision 1.51  2004/03/25 03:19:10  stuart
    # Correctly defang rfc822 attachments when boundary specified with
    # content-type message/rfc822.
    #
    # Revision 1.50  2003/10/15 22:01:00  stuart
    # Test for and work around email bug with encoded filenames.
    #
    # Revision 1.49  2003/09/04 18:48:13  stuart
    # Support python-2.2.3
    #
    # Revision 1.48  2003/09/02 00:27:27  stuart
    # Should have full milter based dspam support working
    #
    # Revision 1.47  2003/08/26 06:08:18  stuart
    # Use new python boolean since we now require 2.2.2
    #
    # Revision 1.46  2003/08/26 05:01:38  stuart
    # Release 0.6.0
    #
    # Revision 1.45  2003/08/26 04:01:24  stuart
    # Use new email module for parsing mail.  Still need mime module to
    # provide various bug fixes to email module, and maintain some compatibility
    # with old milter code.
    #
    
    # This module provides a "defang" function to replace naughty attachments
    # with a warning message.
    
    # Author: Stuart D. Gathman <stuart@bmsi.com>
    # Copyright 2001,2002,2003,2004,2005 Business Management Systems, Inc.
    # This code is under the GNU General Public License.  See COPYING for details.
    
    import StringIO
    import socket
    import Milter
    import zipfile
    
    import email
    import email.Message
    from email.Message import Message
    from email.Generator import Generator
    from email.Utils import quote
    from email import Utils
    from email.Parser import Parser
    from email import Errors
    
    from types import ListType,StringType
    
    def zipnames(txt):
      fp =  StringIO.StringIO(txt)
      zipf = zipfile.ZipFile(fp,'r')
      names = []
      for nm in zipf.namelist():
        names.append(('zipname',nm))
        if nm.lower().endswith('.zip'):
          names += zipnames(zipf.read(nm))
      return names
    
    class MimeGenerator(Generator):
        def _dispatch(self, msg):
            # Get the Content-Type: for the message, then try to dispatch to
            # self._handle_<maintype>_<subtype>().  If there's no handler for the
            # full MIME type, then dispatch to self._handle_<maintype>().  If
            # that's missing too, then dispatch to self._writeBody().
            main = msg.get_content_maintype()
    	if msg.is_multipart() and main.lower() != 'multipart':
    	  self._handle_multipart(msg)
    	else:
    	  Generator._dispatch(self,msg)
    
    def unquote(s):
        """Remove quotes from a string."""
        if len(s) > 1:
            if s.startswith('"'):
    	  if s.endswith('"'):
                s = s[1:-1]
    	  else: # remove garbage after trailing quote
    	    try: s = s[1:s[1:].index('"')+1]
    	    except:
    	      return s
    	  return s.replace('\\\\', '\\').replace('\\"', '"')
            if s.startswith('<') and s.endswith('>'):
    	  return s[1:-1]
        return s
    
    from types import TupleType
    
    def _unquotevalue(value):
      if isinstance(value, TupleType):
          return value[0], value[1], unquote(value[2])
      else:
          return unquote(value)
    
    #email.Message._unquotevalue = _unquotevalue
    
    from email.Message import _parseparam
    
    # Enhance email.Message 
    # - Provide a headerchange event for integration with Milter
    #   Headerchange attribute can be assigned a function to be called when
    #   changing headers.  The signature is:
    #	headerchange(msg,name,value) -> None
    # - Track modifications to headers of body or any part independently
    
    class MimeMessage(Message):
      """Version of email.Message.Message compatible with old mime module
      """
      def __init__(self,fp=None,seekable=1):
        Message.__init__(self)
        self.headerchange = None
        self.submsg = None
        self.modified = False
    
      def get_param(self, param, failobj=None, header='content-type', unquote=True):
        val = Message.get_param(self,param,failobj,header,unquote)
        if val != failobj and param == 'boundary' and unquote:
          # unquote boundaries an extra time, test case testDefang5
          return _unquotevalue(val)
        return val
    
      getfilename = Message.get_filename
      ismultipart = Message.is_multipart
      getheaders = Message.get_all
      gettype = Message.get_content_type
      getparam = Message.get_param
    
      def getparams(self): return self.get_params([])
    
      def getname(self):
        return self.get_param('name')
    
      def getnames(self,scan_zip=False):
        """Return a list of (attr,name) pairs of attributes that IE might
           interpret as a name - and hence decide to execute this message."""
        names = []
        for attr,val in self._get_params_preserve([],'content-type'):
          if isinstance(val, TupleType):
    	  # It's an RFC 2231 encoded parameter
    	  newvalue = _unquotevalue(val)
    	  if val[0]:
    	    val =  unicode(newvalue[2], newvalue[0])
    	  else:
    	    val = unicode(newvalue[2])
          else:
    	  val = _unquotevalue(val.strip())
          names.append((attr,val))
        names += [("filename",self.get_filename())]
        if scan_zip:
          for key,name in tuple(names):	# copy by converting to tuple
    	if name and name.lower().endswith('.zip'):
    	  txt = self.get_payload(decode=True)
    	  if txt.strip():
    	    names += zipnames(txt)
        return names
    
      def ismodified(self):
        "True if this message or a subpart has been modified."
        if not self.is_multipart():
          if isinstance(self.submsg,Message):
            return self.submsg.ismodified()
          return self.modified
        if self.modified: return True
        for i in self.get_payload():
          if i.ismodified(): return True
        return False
    
      def dump(self,file,unixfrom=False):
        "Write this message (and all subparts) to a file"
        g = MimeGenerator(file)
        g.flatten(self,unixfrom=unixfrom)
    
      def as_string(self, unixfrom=False):
          "Return the entire formatted message as a string."
          fp = StringIO.StringIO()
          self.dump(fp,unixfrom=unixfrom)
          return fp.getvalue()
    
      def getencoding(self):
        return self.get('content-transfer-encoding',None)
    
      # Decode body to stream according to transfer encoding, return encoding name
      def decode(self,filt):
        try:
          filt.write(self.get_payload(decode=True))
        except:
          pass
        return self.getencoding()
    
      def get_payload_decoded(self):
        return self.get_payload(decode=True)
    
      def __setitem__(self, name, value):
        rc = Message.__setitem__(self,name,value)
        self.modified = True
        if self.headerchange: self.headerchange(self,name,str(value))
        return rc
    
      def __delitem__(self, name):
        if self.headerchange: self.headerchange(self,name,None)
        rc = Message.__delitem__(self,name)
        self.modified = True
        return rc
    
      def get_payload(self,i=None,decode=False):
        msg = self.submsg
        if isinstance(msg,Message) and msg.ismodified():
          self.set_payload([msg])
        return Message.get_payload(self,i,decode)
    
      def set_payload(self, val, charset=None):
        self.modified = True
        try:
          val.seek(0)
          val = val.read()
        except: pass
        Message.set_payload(self,val,charset)
        self.submsg = None
    
      def get_submsg(self):
        t = self.get_content_type().lower()
        if t == 'message/rfc822' or t.startswith('multipart/'):
          if not self.submsg:
            txt = self.get_payload()
    	if type(txt) == str:
    	  txt = self.get_payload(decode=True)
    	  self.submsg = email.message_from_string(txt,MimeMessage)
    	  for part in self.submsg.walk():
    	    part.modified = False
    	else:
    	  self.submsg = txt[0]
          return self.submsg
        return None
    
    def message_from_file(fp):
      msg = email.message_from_file(fp,MimeMessage)
      for part in msg.walk():
        part.modified = False
      assert not msg.ismodified()
      return msg
    
    extlist = ''.join("""
    ade,adp,asd,asx,asp,bas,bat,chm,cmd,com,cpl,crt,dll,exe,hlp,hta,inf,ins,isp,js,
    jse,lnk,mdb,mde,msc,msi,msp,mst,ocx,pcd,pif,reg,scr,sct,shs,url,vb,vbe,vbs,wsc,
    wsf,wsh 
    """.split())
    bad_extensions = map(lambda x:'.' + x,extlist.split(','))
    
    def check_ext(name):
      "Check a name for dangerous Winblows extensions."
      if not name: return name
      lname = name.lower()
      for ext in bad_extensions:
        if lname.endswith(ext): return name
      return None
    
    virus_msg = """This message appeared to contain a virus.
    It was originally named '%s', and has been removed.
    A copy of your original message was saved as '%s:%s'.
    See your administrator.
    """
    
    def check_name(msg,savname=None,ckname=check_ext,scan_zip=False):
      "Replace attachment with a warning if its name is suspicious."
      try:
        for key,name in msg.getnames(scan_zip):
          badname = ckname(name)
          if badname:
            if key == 'zipname':
              badname = msg.get_filename()
    	break
        else:
          return Milter.CONTINUE
      except zipfile.BadZipfile:
        # a ZIP that is not a zip is very suspicious
        badname = msg.get_filename()
      hostname = socket.gethostname()
      msg.set_payload(virus_msg % (badname,hostname,savname))
      del msg["content-type"]
      del msg["content-disposition"]
      del msg["content-transfer-encoding"]
      name = "WARNING.TXT"
      msg["Content-Type"] = "text/plain; name="+name
      return Milter.CONTINUE
    
    import email.Iterators
    
    def check_attachments(msg,check):
      """Scan attachments.
    msg	MimeMessage
    check	function(MimeMessage): int
    	Return CONTINUE, REJECT, ACCEPT
      """
      if msg.is_multipart():
        for i in msg.get_payload():
          rc = check_attachments(i,check)
          if rc != Milter.CONTINUE: return rc
        return Milter.CONTINUE
      return check(msg)
    
    # save call context for Python without nested_scopes
    class _defang:
    
      def __init__(self,scan_html=True):
        self.scan_html = scan_html
    
      def _chk_name(self,msg):
        rc = check_name(msg,self._savname,self._check,self.scan_zip)
        if self.scan_html:
          check_html(msg,self._savname)	# remove scripts from HTML
        if self.scan_rfc822:
          msg = msg.get_submsg()
          if isinstance(msg,Message):
            return check_attachments(msg,self._chk_name)
        return rc
    
      def __call__(self,msg,savname=None,check=check_ext,scan_rfc822=True,
      		scan_zip=False):
        """Compatible entry point.
        Replace all attachments with dangerous names."""
        self._savname = savname
        self._check = check
        self.scan_rfc822 = scan_rfc822
        self.scan_zip = scan_zip
        check_attachments(msg,self._chk_name)
        if msg.ismodified():
          return True
        return False
    
    # emulate old defang function
    defang = _defang()
    
    import sgmllib
    
    import re
    declname = re.compile(r'[a-zA-Z][-_.a-zA-Z0-9]*\s*')
    declstringlit = re.compile(r'(\'[^\']*\'|"[^"]*")\s*')
    
    class SGMLFilter(sgmllib.SGMLParser):
      """Parse HTML and pass through all constructs unchanged.  It is intended for
         derived classes to implement exceptional processing for selected cases.
      """
      def __init__(self,out):
        sgmllib.SGMLParser.__init__(self)
        self.out = out
    
      def handle_comment(self,comment):
        self.out.write("<!--%s-->" % comment)
    
      def unknown_starttag(self,tag,attr):
        if hasattr(self,"get_starttag_text"):
          self.out.write(self.get_starttag_text())
        else:
          self.out.write("<%s" % tag)
          for (key,val) in attr:
            self.out.write(' %s="%s"' % (key,val))
          self.out.write('>')
    
      def handle_data(self,data):
        self.out.write(data)
    
      def handle_entityref(self,ref):
        self.out.write("&%s;" % ref)
    
      def handle_charref(self,ref):
        self.out.write("&#%s;" % ref)
          
      def unknown_endtag(self,tag):
        self.out.write("</%s>" % tag)
    
      def handle_special(self,data):
        self.out.write("<!%s>" % data)
    
      def write(self,buf):
        "Act like a writer.  Why doesn't SGMLParser do this by default?"
        self.feed(buf)
    
      # Python-2.1 sgmllib rejects illegal declarations.  Since various Microsoft
      # products accept and output them, we need to pass them through -
      # at least until we discover that MS will execute them.
      # sgmlop-1.1 will not use this method, but calls handle_special to
      # do what we want.
      def parse_declaration(self, i):
          rawdata = self.rawdata
          n = len(rawdata)
          j = i + 2
          while j < n:
    	  c = rawdata[j]
    	  if c == ">":
    	      # end of declaration syntax
    	      self.handle_special(rawdata[i+2:j])
    	      return j + 1
    	  if c in "\"'":
    	      m = declstringlit.match(rawdata, j)
    	      if not m:
    		  # incomplete or an error?
    		  return -1
    	      j = m.end()
    	  elif c in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ":
    	      m = declname.match(rawdata, j)
    	      if not m:
    		  # incomplete or an error?
    		  return -1
    	      j = m.end()
    	  else:
    	      j += 1
          # end of buffer between tokens
          return -1
    
    class HTMLScriptFilter(SGMLFilter):
      "Remove scripts from an HTML document."
      def __init__(self,out):
        SGMLFilter.__init__(self,out)
        self.ignoring = 0
        self.modified = False
        self.msg = "<!-- WARNING: embedded script removed -->"
      def start_script(self,unused):
        self.ignoring += 1
        self.modified = True
        self.out.write(self.msg)
      def end_script(self):
        self.ignoring -= 1
      def handle_data(self,data):
        if not self.ignoring: SGMLFilter.handle_data(self,data)
      def handle_comment(self,comment):
        if not self.ignoring: SGMLFilter.handle_comment(self,comment)
    
    def check_html(msg,savname=None):
      "Remove scripts from HTML attachments."
      msgtype = msg.get_content_type().lower()
      # check for more MSIE braindamage
      if msgtype == 'application/octet-stream':
        for (attr,name) in msg.getnames():
          if name and name.lower().endswith(".htm"):
    	msgtype = 'text/html'
      if msgtype == 'text/html':
        out = StringIO.StringIO()
        htmlfilter = HTMLScriptFilter(out)
        try:
          htmlfilter.write(msg.get_payload(decode=True))
          htmlfilter.close()
        #except sgmllib.SGMLParseError:
        except:
          #mimetools.copyliteral(msg.get_payload(),open('debug.out','w')
          htmlfilter.close()
          hostname = socket.gethostname()
          msg.set_payload(
      "An HTML attachment could not be parsed.  The original is saved as '%s:%s'"
          % (hostname,savname))
          del msg["content-type"]
          del msg["content-disposition"]
          del msg["content-transfer-encoding"]
          name = "WARNING.TXT"
          msg["Content-Type"] = "text/plain; name="+name
          return Milter.CONTINUE
        if htmlfilter.modified:
          msg.set_payload(out)	# remove embedded scripts
          del msg["content-transfer-encoding"]
          email.Encoders.encode_quopri(msg)
      return Milter.CONTINUE
    
    if __name__ == '__main__':
      import sys
      def _list_attach(msg):
        t = msg.get_content_type()
        p = msg.get_payload(decode=True)
        print msg.get_filename(),msg.get_content_type(),type(p)
        msg = msg.get_submsg()
        if isinstance(msg,Message):
          return check_attachments(msg,_list_attach)
        return Milter.CONTINUE
    
      for fname in sys.argv[1:]:
        fp = open(fname)
        msg = message_from_file(fp)
        email.Iterators._structure(msg)
        check_attachments(msg,_list_attach)