Skip to content
Snippets Groups Projects
Commit dc3c8794 authored by Jan Philipp Timme's avatar Jan Philipp Timme
Browse files

Allow rough configuration to take place

parent 358ec62a
No related branches found
No related tags found
No related merge requests found
# pymilter-suspicious-from
This is a very rough python milter implementation which is supposed to parse `From:` header values and mark suspicious ones.
```
From: "Totally Official <totally.official@example.com>" <nope-its-fake@fake.example.net>
```
import logging
log_level = logging.INFO
milter_socket = "inet:7777@127.0.0.1"
milter_timeout = 60
import time
import sys import sys
import logging import logging
import Milter import Milter
import re
from email.header import decode_header from email.header import decode_header
from email.utils import getaddresses from email.utils import getaddresses
import re
import config
# Basic logger that also logs to stdout # Basic logger that also logs to stdout
# TODO: Improve this a lot. # TODO: Improve this a lot.
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) logger.setLevel(config.log_level)
handler = logging.StreamHandler(sys.stdout) handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG) handler.setLevel(config.log_level)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter) handler.setFormatter(formatter)
logger.addHandler(handler) logger.addHandler(handler)
# Rough regex to fetch domain values from address-like text
address_domain_regex = re.compile('.*@(?P<domain>[\.\w-]+)') address_domain_regex = re.compile('.*@(?P<domain>[\.\w-]+)')
def get_decoded_header(value): def get_decoded_header(value):
"""Use python builtins to decode from header properly."""
decoded_header_items = decode_header(value) decoded_header_items = decode_header(value)
decoded_header_value = '' decoded_header_value = ''
for item in decoded_header_items: for item in decoded_header_items:
...@@ -37,6 +39,7 @@ def get_decoded_header(value): ...@@ -37,6 +39,7 @@ def get_decoded_header(value):
def normalizeRawFromHeader(value): def normalizeRawFromHeader(value):
"""Clean up linebreaks and spaces that are not needed."""
return value.replace('\n', '').replace('\r', '').strip() return value.replace('\n', '').replace('\r', '').strip()
...@@ -53,6 +56,8 @@ class SuspiciousFrom(Milter.Base): ...@@ -53,6 +56,8 @@ class SuspiciousFrom(Milter.Base):
logger.debug(f"({self.id}) Instanciated.") logger.debug(f"({self.id}) Instanciated.")
def reset(self): def reset(self):
"""It looks like one milter instance can reach eom hook multiple times.
This allows to re-use an instance in a more clean way."""
self.final_result = Milter.ACCEPT self.final_result = Milter.ACCEPT
self.new_headers = [] self.new_headers = []
...@@ -69,6 +74,7 @@ class SuspiciousFrom(Milter.Base): ...@@ -69,6 +74,7 @@ class SuspiciousFrom(Milter.Base):
if data[0] == '': if data[0] == '':
logger.info(f"({self.id}) No label in from header, OK!") logger.info(f"({self.id}) No label in from header, OK!")
self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - No label specified'}) self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - No label specified'})
self.new_headers.append({'name': 'X-From-Suspicious', 'value': 'NO'})
else: else:
label_domain = getDomainFromValue(data[0]) label_domain = getDomainFromValue(data[0])
address_domain = getDomainFromValue(data[1]) address_domain = getDomainFromValue(data[1])
...@@ -78,12 +84,15 @@ class SuspiciousFrom(Milter.Base): ...@@ -78,12 +84,15 @@ class SuspiciousFrom(Milter.Base):
if label_domain.lower() == address_domain.lower(): if label_domain.lower() == address_domain.lower():
logger.info(f"({self.id}) Label domain '{label_domain}' matches address domain '{address_domain}'. Good!") logger.info(f"({self.id}) Label domain '{label_domain}' matches address domain '{address_domain}'. Good!")
self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - Label domain matches address domain'}) self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - Label domain matches address domain'})
self.new_headers.append({'name': 'X-From-Suspicious', 'value': 'NO'})
else: else:
logger.info(f"({self.id}) Label domain '{label_domain}' did NOT match address domain '{address_domain}'. BAD!") logger.info(f"({self.id}) Label domain '{label_domain}' did NOT match address domain '{address_domain}'. BAD!")
self.new_headers.append({'name': 'X-From-Checked', 'value': 'FAIL - Label domain does NOT match address domain'}) self.new_headers.append({'name': 'X-From-Checked', 'value': 'FAIL - Label domain does NOT match address domain'})
self.new_headers.append({'name': 'X-From-Suspicious', 'value': 'YES'})
else: else:
logger.info(f"({self.id}) No domain found in label. Good!") logger.info(f"({self.id}) No domain found in label. Good!")
self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - No domain found in label.'}) self.new_headers.append({'name': 'X-From-Checked', 'value': 'OK - No domain found in label.'})
self.new_headers.append({'name': 'X-From-Suspicious', 'value': 'NO'})
# Use continue here, so we can reach eom hook. # Use continue here, so we can reach eom hook.
# TODO: Log and react if multiple From-headers are found? # TODO: Log and react if multiple From-headers are found?
return Milter.CONTINUE return Milter.CONTINUE
...@@ -100,13 +109,11 @@ class SuspiciousFrom(Milter.Base): ...@@ -100,13 +109,11 @@ class SuspiciousFrom(Milter.Base):
def main(): def main():
# TODO: Move this into configuration of some sort. # TODO: Move this into configuration of some sort.
milter_socket = "inet:7777@127.0.0.1"
milter_timeout = 60
Milter.factory = SuspiciousFrom Milter.factory = SuspiciousFrom
logger.info(f"Starting Milter.") logger.info(f"Starting Milter.")
# This call blocks the main thread. # This call blocks the main thread.
# TODO: Improve handling CTRL+C # TODO: Improve handling CTRL+C
Milter.runmilter("SuspiciousFromMilter", milter_socket, milter_timeout, rmsock=False) Milter.runmilter("SuspiciousFromMilter", config.milter_socket, config.milter_timeout, rmsock=False)
logger.info(f"Milter finished running.") logger.info(f"Milter finished running.")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment