Skip to content
Snippets Groups Projects
Commit 755f3edb authored by Stuart D. Gathman's avatar Stuart D. Gathman
Browse files

Use binary files for email. Still some issues to work out with payloads.

parent bae79a4f
No related branches found
No related tags found
No related merge requests found
......@@ -4,9 +4,9 @@
from __future__ import print_function
import mime
try:
from StringIO import StringIO
from io import BytesIO
except:
from io import StringIO
from StringIO import StringIO as BytesIO
import Milter
Milter.NOREPLY = Milter.CONTINUE
......@@ -155,20 +155,20 @@ class TestBase(object):
rc = self.eoh()
if rc != Milter.CONTINUE: return rc
header,body = msg.as_bytes().split(b'\n\n',1)
bfp = StringIO(body)
bfp = BytesIO(body)
while 1:
buf = bfp.read(8192)
if len(buf) == 0: break
rc = self.body(buf)
if rc != Milter.CONTINUE: return rc
self._msg = msg
self._body = StringIO()
self._body = BytesIO()
rc = self.eom()
if self._bodyreplaced:
body = self._body.getvalue()
self._body = StringIO()
self._body = BytesIO()
self._body.write(header)
self._body.write('\n\n')
self._body.write(b'\n\n')
self._body.write(body)
return rc
......@@ -177,7 +177,7 @@ class TestBase(object):
# @param sender MAIL FROM
# @param rcpts RCPT TO, multiple recipients may be supplied
def feedMsg(self,fname,sender="spam@adv.com",*rcpts):
with open('test/'+fname,'r') as fp:
with open('test/'+fname,'rb') as fp:
return self.feedFile(fp,sender,*rcpts)
## Call the connect and helo callbacks.
......
......@@ -212,11 +212,11 @@ def parse_header(val):
for s,enc in h:
if enc:
try:
u.append(unicode(s,enc,'replace'))
u.append(s.decode(enc,'replace'))
except LookupError:
u.append(unicode(s))
u.append(s.decode())
else:
u.append(unicode(s))
u.append(s.decode())
u = u''.join(u)
for enc in ('us-ascii','iso-8859-1','utf-8'):
try:
......
......@@ -95,16 +95,20 @@
from __future__ import print_function
try:
from StringIO import StringIO
from io import BytesIO, StringIO
except:
from io import StringIO
from StringIO import StringIO
BytesIO = StringIO
import socket
import Milter
import zipfile
import email
from email.message import Message
from email.generator import Generator
try:
from email.generator import BytesGenerator
except:
from email.generator import Generator as BytesGenerator
from email.utils import quote
if not getattr(Message,'as_bytes',None):
......@@ -113,7 +117,7 @@ if not getattr(Message,'as_bytes',None):
## Return a list of filenames in a zip file.
# Embedded zip files are recursively expanded.
def zipnames(txt):
fp = StringIO(txt)
fp = BytesIO(txt)
zipf = zipfile.ZipFile(fp,'r')
names = []
for nm in zipf.namelist():
......@@ -124,7 +128,7 @@ def zipnames(txt):
## Fix multipart handling in email.Generator.
#
class MimeGenerator(Generator):
class MimeGenerator(BytesGenerator):
def _dispatch(self, msg):
# Get the Content-Type: for the message, then try to dispatch to
# self._handle_<maintype>_<subtype>(). If there's no handler for the
......@@ -134,7 +138,7 @@ class MimeGenerator(Generator):
if msg.is_multipart() and main.lower() != 'multipart':
self._handle_multipart(msg)
else:
Generator._dispatch(self,msg)
BytesGenerator._dispatch(self,msg)
def unquote(s):
"""Remove quotes from a string."""
......@@ -237,9 +241,9 @@ class MimeMessage(Message):
g = MimeGenerator(file)
g.flatten(self,unixfrom=unixfrom)
def as_string(self, unixfrom=False):
def as_bytes(self, unixfrom=False):
"Return the entire formatted message as a string."
fp = StringIO()
fp = BytesIO()
self.dump(fp,unixfrom=unixfrom)
return fp.getvalue()
......@@ -300,7 +304,7 @@ class MimeMessage(Message):
return None
def message_from_file(fp):
msg = email.message_from_file(fp,MimeMessage)
msg = email.message_from_binary_file(fp,MimeMessage)
for part in msg.walk():
part.modified = False
assert not msg.ismodified()
......@@ -509,7 +513,7 @@ def check_html(msg,savname=None):
htmlfilter.close()
#except sgmllib.SGMLParseError:
except:
#mimetools.copyliteral(msg.get_payload(),open('debug.out','w')
mimetools.copyliteral(msg.get_payload(),open('debug.out','wb'))
htmlfilter.close()
hostname = socket.gethostname()
msg.set_payload(
......@@ -539,7 +543,7 @@ if __name__ == '__main__':
return Milter.CONTINUE
for fname in sys.argv[1:]:
fp = open(fname)
fp = open(fname,'rb')
msg = message_from_file(fp)
email.iterators._structure(msg)
check_attachments(msg,_list_attach)
......@@ -8,9 +8,9 @@ from __future__ import print_function
import sys
import os
try:
from StringIO import StringIO
from io import BytesIO
except:
from io import StringIO
from StringIO import StringIO as BytesIO
import mime
import Milter
import tempfile
......@@ -41,7 +41,7 @@ class sampleMilter(Milter.Milter):
def envfrom(self,f,*str):
"start of MAIL transaction"
self.log("mail from",f,str)
self.fp = StringIO()
self.fp = BytesIO()
self.tempname = None
self.mailfrom = f
self.bodysize = 0
......@@ -97,12 +97,12 @@ class sampleMilter(Milter.Milter):
if lname in ('subject','x-mailer'):
self.log('%s: %s' % (name,val))
if self.fp:
self.fp.write("%s: %s\n" % (name,val)) # add header to buffer
self.fp.write(("%s: %s\n" % (name,val)).encode()) # add header to buffer
return Milter.CONTINUE
def eoh(self):
if not self.fp: return Milter.TEMPFAIL # not seen by envfrom
self.fp.write("\n")
self.fp.write(b'\n')
self.fp.seek(0)
# copy headers to a temp file for scanning the body
headers = self.fp.getvalue()
......@@ -144,7 +144,7 @@ class sampleMilter(Milter.Milter):
msg.dump(out)
out.seek(0)
msg = mime.message_from_file(out)
fp = StringIO(msg.as_bytes().split(b'\n\n',1)[1])
fp = BytesIO(msg.as_bytes().split(b'\n\n',1)[1])
while 1:
buf = fp.read(8192)
if len(buf) == 0: break
......
......@@ -62,18 +62,18 @@ class MimeTestCase(unittest.TestCase):
self.failUnless(plist[0] == 'name="Jim&amp;amp;Girlz.jpg"')
def testParse(self,fname='samp1'):
msg = mime.message_from_file(open('test/'+fname,"r"))
msg = mime.message_from_file(open('test/'+fname,"rb"))
self.failUnless(msg.ismultipart())
parts = msg.get_payload()
self.failUnless(len(parts) == 2)
txt1 = parts[0].get_payload()
self.failUnless(txt1.rstrip() == samp1_txt1,txt1)
msg = mime.message_from_file(open('test/missingboundary',"r"))
msg = mime.message_from_file(open('test/missingboundary',"rb"))
# should get no exception as long as we don't try to parse
# message attachments
mime.defang(msg,scan_rfc822=False)
msg.dump(open('test/missingboundary.out','w'))
msg = mime.message_from_file(open('test/missingboundary',"r"))
msg.dump(open('test/missingboundary.out','wb'))
msg = mime.message_from_file(open('test/missingboundary',"rb"))
try:
mime.defang(msg)
# python 2.4 doesn't get exceptions on missing boundaries, and
......@@ -85,12 +85,12 @@ class MimeTestCase(unittest.TestCase):
def testDefang(self,vname='virus1',part=1,
fname='LOVE-LETTER-FOR-YOU.TXT.vbs'):
msg = mime.message_from_file(open('test/'+vname,"r"))
msg = mime.message_from_file(open('test/'+vname,"rb"))
mime.defang(msg,scan_zip=True)
self.failUnless(msg.ismodified(),"virus not removed")
oname = vname + '.out'
msg.dump(open('test/'+oname,"w"))
msg = mime.message_from_file(open('test/'+oname,"r"))
msg.dump(open('test/'+oname,"wb"))
msg = mime.message_from_file(open('test/'+oname,"rb"))
txt2 = msg.get_payload()
if type(txt2) == list:
txt2 = txt2[part].get_payload()
......@@ -110,11 +110,11 @@ class MimeTestCase(unittest.TestCase):
# virus6 has no parts - the virus is directly inline
def testDefang6(self,vname="virus6",fname='FAX20.exe'):
msg = mime.message_from_file(open('test/'+vname,"r"))
msg = mime.message_from_file(open('test/'+vname,"rb"))
mime.defang(msg)
oname = vname + '.out'
msg.dump(open('test/'+oname,"w"))
msg = mime.message_from_file(open('test/'+oname,"r"))
msg.dump(open('test/'+oname,"wb"))
msg = mime.message_from_file(open('test/'+oname,"rb"))
self.failIf(msg.ismultipart())
txt2 = msg.get_payload()
self.failUnless(txt2 == mime.virus_msg % \
......@@ -123,11 +123,11 @@ class MimeTestCase(unittest.TestCase):
# honey virus has a sneaky ASP payload which is parsed correctly
# by email package in python-2.2.2, but not by mime.MimeMessage or 2.2.1
def testDefang7(self,vname="honey",fname='story[1].scr'):
msg = mime.message_from_file(open('test/'+vname,"r"))
msg = mime.message_from_file(open('test/'+vname,"rb"))
mime.defang(msg)
oname = vname + '.out'
msg.dump(open('test/'+oname,"w"))
msg = mime.message_from_file(open('test/'+oname,"r"))
msg.dump(open('test/'+oname,"wb"))
msg = mime.message_from_file(open('test/'+oname,"rb"))
parts = msg.get_payload()
txt2 = parts[1].get_payload()
txt3 = parts[2].get_payload()
......@@ -138,7 +138,7 @@ class MimeTestCase(unittest.TestCase):
('story[1].asp',hostname,None),txt3)
def testParse2(self,fname="spam7"):
msg = mime.message_from_file(open('test/'+fname,"r"))
msg = mime.message_from_file(open('test/'+fname,"rb"))
self.failUnless(msg.ismultipart())
parts = msg.get_payload()
self.failUnless(len(parts) == 2)
......@@ -148,11 +148,11 @@ class MimeTestCase(unittest.TestCase):
def testZip(self,vname="zip1",fname='zip.zip'):
self.testDefang(vname,1,'zip.zip')
# test scan_zip flag
msg = mime.message_from_file(open('test/'+vname,"r"))
msg = mime.message_from_file(open('test/'+vname,"rb"))
mime.defang(msg,scan_zip=False)
self.failIf(msg.ismodified())
# test ignoring empty zip (often found in DSNs)
msg = mime.message_from_file(open('test/zip2','r'))
msg = mime.message_from_file(open('test/zip2','rb'))
mime.defang(msg,scan_zip=True)
self.failIf(msg.ismodified())
# test corrupt zip (often an EXE named as a ZIP)
......@@ -177,10 +177,10 @@ class MimeTestCase(unittest.TestCase):
def testCheckAttach(self,fname="test1"):
# test1 contains a very long filename
msg = mime.message_from_file(open('test/'+fname,'r'))
msg = mime.message_from_file(open('test/'+fname,'rb'))
mime.defang(msg,scan_zip=True)
self.failIf(msg.ismodified())
msg = mime.message_from_file(open('test/test2','r'))
msg = mime.message_from_file(open('test/test2','rb'))
rc = mime.check_attachments(msg,self._chk_attach)
self.assertEquals(self.filename,"7501'S FOR TWO GOLDEN SOURCES SHIPMENTS FOR TAX & DUTY PURPOSES ONLY.PDF")
self.assertEquals(rc,Milter.CONTINUE)
......@@ -205,7 +205,7 @@ if __name__ == '__main__':
unittest.main()
else:
for fname in sys.argv[1:]:
fp = open(fname,'r')
fp = open(fname,'rb')
msg = mime.message_from_file(fp)
mime.defang(msg,scan_zip=True)
print(msg.as_string())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment