Skip to content
Snippets Groups Projects
Commit 135adc0f authored by Art's avatar Art :lizard:
Browse files

RC1.

parent 2aa97ee0
No related branches found
No related tags found
No related merge requests found
Showing
with 990 additions and 0 deletions
#!/bin/bash
# Django usually listens on port 8000 without SSL.
# Some of our projects however require SSL.
# Run this script to set up nginx SSL proxy for Django.
if [[ "$EUID" != 0 ]]; then
echo "Need to be run as root."
exit 1
fi
SITES_AVAILABLE="/etc/nginx/sites-available"
SITES_ENABLED="/etc/nginx/sites-enabled"
CONFIG_FILENAME="443-ssl-to-8000"
CERT_PUB="/etc/ssl/certs/ssl-cert-snakeoil.pem"
CERT_KEY="/etc/ssl/private/ssl-cert-snakeoil.key"
set -e # die on error
set -u # die if some var not set
if [[ ! -d "$SITES_AVAILABLE" ]] || [[ ! -d "$SITES_AVAILABLE" ]]; then
echo "Is nginx installed?"
exit 1
fi
if [[ ! -e "$CERT_PUB" ]] || [[ ! -e "$CERT_KEY" ]]; then
echo "No snakeoil certs?"
exit 1
fi
echo "Seting up site config..."
sudo cat > "$SITES_AVAILABLE/$CONFIG_FILENAME" << EOF
upstream localhost8000 {
server localhost:8000;
}
server {
listen 443;
ssl on;
ssl_certificate $CERT_PUB;
ssl_certificate_key $CERT_KEY;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
ssl_ciphers "ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-SHA384:ECDHE-RSA-AES128-SHA256:ECDHE-RSA-AES256-SHA:ECDHE-RSA-AES128-SHA:DHE-RSA-AES256-SHA256:DHE-RSA-AES128-SHA256:DHE-RSA-AES256-SHA:DHE-RSA-AES128-SHA:ECDHE-RSA-DES-CBC3-SHA:EDH-RSA-DES-CBC3-SHA:AES256-GCM-SHA384:AES128-GCM-SHA256:AES256-SHA256:AES128-SHA256:AES256-SHA:AES128-SHA:DES-CBC3-SHA:HIGH:!aNULL:!eNULL:!EXPORT:!DES:!MD5:!PSK:!RC4";
location / {
proxy_pass http://localhost8000;
proxy_set_header Host \$host;
proxy_set_header X-Forwarded-For \$remote_addr;
proxy_set_header X-Forwarded-Proto "https";
proxy_set_header REMOTE-USER \$remote_user;
}
}
EOF
set +e # stop dying on errors now
sudo ln -f -s "$SITES_AVAILABLE/$CONFIG_FILENAME" "$SITES_ENABLED/"
echo "Restarting nginx..."
sudo systemctl restart nginx
if [[ "`systemctl is-active nginx`" == "active" ]]; then
echo "http://localhost:8000 with SSL is here: https://localhost"
else
echo "Oops. Seems like nginx is dead now."
echo
systemctl status nginx | cat
echo
fi
exit
#### Minimal Intro:
- [SSO](https://lmddgtfy.net/?q=SSO): Single Sign On
- SP: Service Provider (your client)
- IdP: Identity Provider (server, Shibboleth)
- Metadata/Meta: an XML that describes SP or IdP (or another entity)
- SAML/SAML2: It's just another XML-based enterprise-grade standard that will make you cry blood
#### Integrating the app into your Django project.
- Binary dependencies: `sudo apt install libxml2-dev libxslt1-dev xmlsec1 libxmlsec1-dev pkg-config`
- Python dependencies: see `requirements.txt` or `setup.py`
- Add the app into `INSTALLED_APPS`
- Include the app's `urls.py` into the project `urls.py` `urlpatterns`
- Add to the project settings
- ```python
SP_HOST = "141.71.foo.bar" # your SP host or IP address
IDP_META_URL = "https://idp-test.it.hs-hannover.de/idp/shibboleth" # development
# IDP_META_URL = "https://idp.hs-hannover.de/idp/shibboleth" # production
```
- generate a new key pair:
- create `cert` directory:
- inside of project `settings` directory if it's a package
- next to project `settings.py` file if it's a module
- `openssl req -new -x509 -days 3650 -nodes -out sp.pem -keyout sp.key`
- `./cert/sp.key` put the private key here
- `./cert/sp.pem` put the certificate here, signing is optional
#### Development setup
- `sudo apt-get install nginx`
- run `.dev-config/nginx-https-config.sh`
- in case you already have port 443 busy, use your force to solve the issue :)
#### Integrating your project into the existing SSO infrastructure
- **Ask somebody who knows more. Seriously.**
- **Try on the test IdP before you brick the production!**
- Grab your meta
- Run your project.
- Find meta of your SP (relative path `/saml2/meta` or view name `sso-saml2-meta`)
- **if using Firefox:** use "show source code" option or you will get invalid XML
- configure the IdP:
- `SSH` to the IdP and locate the Shibboleth directory, most likely `/opt/shibboleth-idp/`
- put your meta into a new file in `./metadata/` and give it a nice verbose name
- edit `./conf/metadata-providers.xml`
- create a new `<MetadataProvider .../>` element, your best guess is to copy an existing line that belongs to some existing Django project
- `id` should be unique
- `metadataFile` should point on your new metadata
- edit `./conf/attribute-filter.xml`
- add a new `<Rule .../>` element inside of `<AttributeFilterPolicy id="releaseToDjango">`
- `value` (looks like URI) should be the `entityID` of your SP (you find it in your meta)
- `systemctl restart tomcat8 & tail -fn0 /opt/shibboleth-idp/logs/idp-warn.log` (you now have some time to grab you a coffee)
Django>=1.11,<2.0
python3-saml
setup.py 0 → 100644
import os
from setuptools import setup, find_packages
from pip.req import parse_requirements
README = open(os.path.join(os.path.dirname(__file__), "README.md")).read()
# automagic
requirements_parsed = parse_requirements(open(os.path.join(os.path.dirname(__file__), "requirements.txt")))
requirements_list = [str(r.req) for r in requirements_parsed]
# allow setup.py to be run from any path
os.chdir(os.path.normpath(os.path.join(os.path.abspath(__file__), os.pardir)))
setup(
name='django-ssoauth',
version='1.0',
packages=find_packages(),
include_package_data=True,
license='BSD',
description='SSO using SAML2',
long_description=README,
url='https://lab.it.hs-hannover.de/django/ssoauth',
author='Art Lukyanchyk',
author_email='artiom.lukyanchyk@hs-hannover.de',
zip_safe=False,
install_requires=requirements_list,
classifiers=[
'Environment :: Web Environment',
'Framework :: Django',
'Intended Audience :: Developers',
'License :: OSI Approved :: BSD License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 3.4',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
],
)
import logging
from . import checks # As for Django 1.11 it still doesn't auto-import checks >.<
assert checks
logger = logging.getLogger("ssoauth")
default_app_config = "ssoauth.apps.SSOAuthConfig"
from onelogin.saml2 import settings as onelogin_settings
from .defaults import *
from django import conf
# merge defaults with customized user settings
for setting_name in [k for k in globals().keys() if k.isupper()]:
for name in ["SSOAUTH_" + setting_name, setting_name]:
try:
globals()[setting_name] = getattr(conf.settings, name)
continue
except (KeyError, AttributeError):
pass # not set
# checks
assert SP_HOST and SP_PORT, "Need SP_HOST and SP_PORT configured in settings."
assert not SP_HOST.startswith(("127.", "::1", "localhost",)), "Too many localhosts around. Need a real hostname or IP address."
assert not SP_HOST.lower().startswith(("http:", "https:",)), "Need host name without a protocol."
# helpers
_SET_ON_RUNTIME = None and "doesn't make sense to change it"
def get_project_settings_path():
module = os.environ.get(conf.ENVIRONMENT_VARIABLE)
path = os.path.abspath(os.path.join(*module.split(".")))
if not os.path.isdir(path):
# module, not a package
path = os.path.dirname(path)
return path
def read_key(path):
path = path.format(project_settings=get_project_settings_path())
try:
with open(path, "r") as f:
return f.read()
except FileNotFoundError:
raise FileNotFoundError("SSO requires a key pair. Missing: {path}".format(path=path))
# template for OneLogin toolkit settings
ONELOGIN_SETTINGS_TEMPLATE = {
# Don't change it (you cannot in fact)
# If you really need to adjust something here, please use ONELOGIN_OVERRIDES instead
"strict": True,
"debug": conf.settings.DEBUG,
"sp": {
"entityId": _SET_ON_RUNTIME or str(),
"assertionConsumerService": {
"url": _SET_ON_RUNTIME or str(),
},
"x509cert": read_key(SP_CERT),
"privateKey": read_key(SP_KEY),
"NameIDFormat": onelogin_settings.OneLogin_Saml2_Constants.NAMEID_PERSISTENT, # otherwise Shibboleth shows warnings
},
"idp": {
"entityId": IDP_META_URL,
"x509certMulti": {
"signing": _SET_ON_RUNTIME or dict(),
"encryption": _SET_ON_RUNTIME or dict(),
},
"singleSignOnService": {
"url": _SET_ON_RUNTIME or str(),
"binding": onelogin_settings.OneLogin_Saml2_Constants.BINDING_HTTP_REDIRECT,
},
"singleLogoutService": {
"url": _SET_ON_RUNTIME or str(),
"binding": onelogin_settings.OneLogin_Saml2_Constants.BINDING_HTTP_REDIRECT,
}
},
"security": {
"nameIdEncrypted": False,
"authnRequestsSigned": True,
"logoutRequestSigned": True,
"logoutResponseSigned": False,
"signMetadata": True,
"wantMessagesSigned": True,
"wantAssertionsSigned": True,
"wantNameId": True,
"wantNameIdEncrypted": False,
"wantAssertionsEncrypted": True,
"signatureAlgorithm": "http://www.w3.org/2001/04/xmldsig-more#rsa-sha512",
"metadataCacheDuration": SP_METADATA_LIFETIME,
},
"contactPerson": SP_CONTACTS,
"organization": SP_ORGANIZATION,
}
ONELOGIN_SETTINGS_TEMPLATE.update(ONELOGIN_OVERRIDES)
ONELOGIN_SETTINGS = _SET_ON_RUNTIME
import os
"""
If you want to change something:
- optionally prefix the setting with "SSOAUTH_"
- put it into your project settings
"""
"""
Settings you want to change:
"""
# host and port, not what Django thinks, but what nginx serves
SP_HOST = None # e.g. "141.71.113.1"
SP_PORT = 443
IDP_META_URL = "https://idp.hs-hannover.de/idp/shibboleth" # test is "https://idp-test.it.hs-hannover.de/idp/shibboleth"
"""
Settings you DON'T want to change (in fact, you want to avoid even thinking about them):
"""
IDP_REQUIRED = True # die on start if cannot find IDP or parse its meta
SP_KEY = "{project_settings}/cert/sp.key"
SP_CERT = "{project_settings}/cert/sp.pem"
SP_METADATA_LIFETIME = "P20Y" # "P7D"-like (https://www.w3.org/TR/xmlschema-2/#duration)
# if you really really need to add/modify something in OneLogin settings, add it to ONELOGIN_OVERRIDES
ONELOGIN_OVERRIDES = {} # e.g.: ONELOGIN_OVERRIDES = { "strict": False }
PROJECT_NAME = os.environ.get('DJANGO_SETTINGS_MODULE').split('.')[0]
SU_GROUP_NAME = "{0}_superusers".format(PROJECT_NAME)
SU_PERM_NAME = "superuser"
STAFF_GROUP_NAME = "{0}_staff".format(PROJECT_NAME)
STAFF_PERM_NAME = "staff"
"""
Not settings... just... Eww.
"""
SP_CONTACTS = {
"technical": {
"givenName": "Service Desk",
"emailAddress": "mailto:support-it@hs-hannover.de",
},
"support": {
"givenName": "Service Desk",
"emailAddress": "mailto:support-it@hs-hannover.de",
},
"administrative": {
"givenName": "Service Desk",
"emailAddress": "mailto:support-it@hs-hannover.de",
},
}
SP_ORGANIZATION = {
"de": {
"name": "Hochschule Hannover",
"displayname": "Hochschule Hannover",
"url": "https://www.hs-hannover.de",
},
"en": {
"name": "Hannover University of Applied Sciences and Arts",
"displayname": "Hannover University of Applied Sciences and Arts",
"url": "https://www.hs-hannover.de",
},
}
from django.apps import AppConfig
from django.core import management
from django.db.models.signals import post_migrate
from . import logger
from . import sso_utils
from . import app_settings
class SSOAuthConfig(AppConfig):
name = 'ssoauth'
def ready(self, *args, **kwargs):
super().ready(*args, **kwargs)
# OneLogin settings stuff
try:
app_settings.ONELOGIN_SETTINGS = sso_utils.create_onelogin_settings(app_settings.ONELOGIN_SETTINGS_TEMPLATE)
except Exception as e:
if app_settings.IDP_REQUIRED:
raise e
else:
logger.error("SSO will not work. {ec}: {e}".format(ec=e.__class__.__name__, e=str(e),))
# default groups
post_migrate.connect(self.post_migrate_callback, sender=self)
# ...
logger.warning("Need to ensure user.has_usable_password is False")
@staticmethod
def post_migrate_callback(*args, **kwargs):
management.call_command("create_compat_groups")
from django.db import transaction
from django.db.models import Q
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from uuid import UUID
from . import models
from . import logger
from . import app_settings
import functools
import re
EXTRACT_RDN = re.compile(r"^\w+=(\w+),.+$", re.IGNORECASE) # RDN is in the first regex group
def _validate_username(username):
assert isinstance(username, str)
assert username.islower()
def get_user(uuid=None, username=None):
""" This helper just gets a user instance. """
assert bool(uuid) ^ bool(username), "Need uuid OR username"
if uuid:
assert isinstance(uuid, UUID), "Not an UUID instance"
return models.UserMapping.objects.get(uuid=uuid).user
elif username:
assert isinstance(username, str), "Seriously?"
return get_user_model().objects.get(username__iexact=username.lower())
@transaction.atomic()
def get_or_create_user(uuid, username):
"""
Returns a user.
Is able to process many weird cases like changed username or re-created user in DS.
If it dies with an exception, with 90% probability the data state is very bad.
"""
def get_user_by_uuid(uuid, username):
try:
user = get_user(uuid=uuid)
if user.username != username:
# have to do it because users might be renamed
logger.warning("Usernames mismatch. Changing from {0} to {1}.".format(user.username, username))
user.username = username
user.save()
return user
except models.UserMapping.DoesNotExist:
return None
def get_user_by_username(uuid, username):
try:
user = get_user(username=username)
except get_user_model().DoesNotExist:
return None
try:
meta = models.UserMapping.objects.get(user=user)
logger.warning("User UUID changed. Assuming re-created in DS. Automagically fixing...")
meta.uuid = uuid
meta.save()
except models.UserMapping.DoesNotExist:
logger.Error("User {username} lost their meta. Auto-fixing and hoping for the best.".format(**locals()))
models.UserMapping.objects.create(user=user, uuid=uuid)
return user
def create_user(uuid, username):
_validate_username(username)
user = get_user_model().objects.create(username=username, is_staff=False)
models.UserMapping.objects.create(user=user, uuid=uuid)
logger.info("Created user: {username} {uuid}".format(**locals()))
return user
# checks and casts
if isinstance(uuid, str):
uuid = UUID(uuid)
assert isinstance(uuid, UUID) and isinstance(username, str), "Bad arguments"
username = username.lower()
# get or create
user = get_user_by_uuid(uuid, username) # best case scenario
if not user:
user = get_user_by_username(uuid, username) # worst case scenario
if not user:
user = create_user(uuid, username) # create if not present
# sanity check
if user and user.user_permissions.all().count():
logger.error("Who assigned permissions directly to user {user}?! Removing: {perms}".format(
user=user, perms=", ".join(str(p) for p in user.user_permissions)))
user.user_permissions.clear()
return user
def update_user_data(user, surname=None, forename=None, email=None):
""" Updates User instance with data received from SSO. """
assert isinstance(user, get_user_model())
user.first_name = forename or str()
user.last_name = surname or str()
user.email = email or None
user.save()
def extract_rdn(dn):
""" Extracts group name from the DN. """
match = EXTRACT_RDN.search(dn)
assert match, "Received something weird instead of a DN: {group_dn}".format(**locals())
return match.group(1)
def set_user_groups(user, group_dn_list):
""" Updates groups for the user. """
# using Q to create ignore-case DN lookup since DS is case insensitive
q_list = [Q(sso_mapping__dn__iexact=dn) for dn in group_dn_list]
if q_list:
q_combined = functools.reduce(lambda q1, q2: q1 | q2, q_list)
groups = list(Group.objects.filter(q_combined))
else:
groups = list()
# modify user groups
if set(user.groups.all()) != set(groups):
user.groups.set(groups)
assert set(user.groups.all()) == set(groups) # dunno how relation.set() behaves, better safe than sorry
logger.info("Groups for {user} are updated to: {groups}".format(user=user, groups=", ".join(g.name for g in groups)))
logger.debug("User {user} has {g_n} group(s) based on {dn_n} DN(s): {g_names}".format(
user=user, g_n=len(groups), g_names=", ".join(str(g) for g in groups), dn_n=len(group_dn_list)))
def set_user_compat_flags(user):
is_active = True
user.is_staff = False
user.is_superuser = False
try:
group_su = Group.objects.get(name=app_settings.SU_GROUP_NAME)
group_staff = Group.objects.get(name=app_settings.STAFF_GROUP_NAME)
if group_su in user.groups.all():
logger.info("User {user} is superuser.".format(**locals()))
user.is_staff = True
user.is_superuser = True
if group_staff in user.groups.all():
logger.info("User {user} has admin access.".format(**locals()))
user.is_staff = True
except Group.DoesNotExist:
logger.error("No compat groups. Migrate or run management command create_compat_groups.")
user.save()
from django.core.checks import Tags, Error, Warning, register
from django.contrib.auth import get_user_model
from django.db.utils import OperationalError, ProgrammingError
from django import conf
from django import urls
def _get_abstract_user():
""" Cannot import when loading the module. """
from django.contrib.auth.models import AbstractUser
return AbstractUser
def _ignore_db_errors(function):
""" Needed for checks that read data. """
def wrapper(*args, **kwargs):
try:
return function(*args, **kwargs)
except (OperationalError, ProgrammingError,) as e:
return [Warning("Could not perform a check. Not migrated yet?", obj=function.__name__)]
return wrapper
@register(Tags.security)
@_ignore_db_errors
def no_passwords_stored(app_configs, **kwargs):
errors = list()
user_model = get_user_model()
users_with_password = user_model.objects.exclude(password__isnull=True).exclude(password="")
if users_with_password:
errors.append(Error(
"Some users have their password stored in the database: {}".format(", ".join(u.username for u in users_with_password)),
obj=user_model
))
for user in users_with_password:
user.password = str()
user.save()
return errors
@register(Tags.compatibility)
def compatible_user_model(app_configs, **kwargs):
errors = list()
if not issubclass(get_user_model(), _get_abstract_user()):
errors.append(Error(
"{} is probably incompatible with ssoauth.".format(get_user_model()),
obj=get_user_model(),
))
return errors
@register(Tags.security)
@_ignore_db_errors
def no_direct_user_permissions(app_configs, **kwargs):
errors = list()
qs_bad_users = get_user_model().objects.filter(user_permissions__isnull=False)
if qs_bad_users.count() is not 0:
errors.append(Error(
"Detected directly assigned permissions. Truncate the User<->Permission table. Investigate the reason. " +
"Bad users: {0}".format(", ".join(u.username for u in qs_bad_users)),
obj=get_user_model(),
))
return errors
@register(Tags.urls)
def auth_urls_configured(app_configs, **kwargs):
errors = list()
def _url_from_setting(setting_name):
# get url based on django setting name, handles most of cases of something going wrong
try:
setting_value = getattr(conf.settings, setting_name, None) # setting exists
assert urls.resolve(setting_value) # page exists
return setting_value
except Exception as e:
errors.append(Warning("{ec}: {e}".format(ec=e.__class__.__name__, e=str(e)), obj=conf.settings,))
return None
# login
login_url_needed = urls.reverse("sso-login")
login_url_current = _url_from_setting("LOGIN_URL")
if login_url_current != login_url_needed:
errors.append(Warning(
"LOGIN_URL must point on {login_url_needed}, not on {login_url_current}. Add the following to your project settings: "
"LOGIN_URL = urls.reverse_lazy(\"sso-login\")".format(**locals()),
obj=conf.settings,
))
# default landing and logout pages
for setting_name in ("LOGIN_REDIRECT_URL", "LOGOUT_REDIRECT_URL",):
if not _url_from_setting(setting_name):
errors.append(Warning("{setting_name} is not found or invalid.".format(**locals()), obj=conf.settings,))
return errors
from django.core.management.base import BaseCommand, CommandError
from django.contrib.auth.models import Group, Permission
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth import get_user_model
from ... import app_settings
from ... import logger
class Command(BaseCommand):
help = "Creates groups required for django.contrib.auth compatibility."
requires_migrations_checks = True
requires_system_checks = True
def handle(self, *args, **options):
try:
self.ensure_group_exists(app_settings.SU_GROUP_NAME, [app_settings.SU_PERM_NAME])
self.ensure_group_exists(app_settings.STAFF_GROUP_NAME, [app_settings.STAFF_PERM_NAME])
except Exception as e:
raise CommandError("Could not ensure that compatibility groups and permissions exist. {0}".format(str(e)))
@staticmethod
def ensure_group_exists(group_name, permission_names=list()):
"""
Ensures the group with the given permissions exists.
Creates the group and/or permissions if needed.
Adds permissions to the group if needed (but never removes any).
"""
group, created = Group.objects.get_or_create(name=group_name)
if created:
logger.info("Created group '{}'".format(group))
for permission_name in permission_names:
permission, created = Permission.objects.get_or_create(
codename=permission_name,
content_type_id=ContentType.objects.get_for_model(get_user_model()).id,
)
if created:
logger.info("Created permission '{}'".format(permission.codename))
if permission not in group.permissions.all():
group.permissions.add(permission)
logger.info("Added permission '{1}' to group '{0}'".format(group, permission.codename))
from django.core.management.base import BaseCommand, CommandError
from django.contrib.auth.models import Group, Permission
from ... models import GroupMapping
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth import get_user_model
from ... import app_settings
from ... import logger
class Command(BaseCommand):
help = "Modify group mapping. You can also do it from your project admin."
requires_migrations_checks = True
requires_system_checks = True
def add_arguments(self, parser):
parser.add_argument(
"action",
action="store",
choices=["list", "add", "del", "?"],
help="action",
)
parser.add_argument(
"name",
action="store",
default=None,
nargs="?",
help="local group name (in this project)",
)
parser.add_argument(
"dn",
action="store",
default=None,
nargs="?",
help="group DN in a directory as returned by Shibboleth",
)
def handle(self, *args, **options):
action, name, dn = options["action"], options["name"], options["dn"]
if action == "list":
self.print_mapping()
elif action == "add":
if not name or not dn:
return logger.error("Need group name and DN.")
self.add_mapping(name, dn)
self.print_mapping()
elif action == "del":
if not name:
return logger.error("Need group name.")
self.delete_mapping(name)
else:
self.print_help(str(), str())
def print_mapping(self):
groups = Group.objects.all()
mapped = groups.filter(sso_mapping__isnull=False)
unmapped = groups.filter(sso_mapping__isnull=True)
logger.info("There {g} groups, {m} mapped and {u} unmapped:".format(g=len(groups), m=len(mapped), u=len(unmapped)))
for group in groups.order_by("-sso_mapping", "name"):
name = group.name
try:
dn = group.sso_mapping.dn
except GroupMapping.DoesNotExist:
dn = None
logger.info(" {name:<50} {dn}".format(**locals()))
def delete_mapping(self, group_name):
mapping = self._get_mapping_for(group_name)
if mapping:
logger.info("Deleting {mapping}.".format(**locals()))
mapping.delete()
else:
logger.error("There is no mapping for {group_name}.".format(**locals()))
def add_mapping(self, group_name, dn):
# group
try:
group = Group.objects.get(name__iexact=group_name)
logger.info("Existing group {group} will be used.".format(**locals()))
except Group.DoesNotExist:
logger.info("Group {group_name} will be created.".format(**locals()))
group = Group.objects.create(name=group_name)
# mapping
existing = self._get_mapping_for(group_name)
if existing:
logger.warning("Removing existing mapping {existing}".format(**locals()))
existing.delete()
new = GroupMapping.objects.create(group=group, dn=dn)
logger.info("Created {new}.".format(**locals()))
def _get_mapping_for(self, group_name):
try:
return GroupMapping.objects.get(group__name__iexact=group_name)
except GroupMapping.DoesNotExist:
return None
# raise CommandError("Could not ensure that compatibility groups and permissions exist. {0}".format(str(e)))
# @staticmethod
# def ensure_group_exists(group_name, permission_names=list()):
# """
# Ensures the group with the given permissions exists.
# Creates the group and/or permissions if needed.
# Adds permissions to the group if needed (but never removes any).
# """
# group, created = Group.objects.get_or_create(name=group_name)
# if created:
# logger.info("Created group '{}'".format(group))
# for permission_name in permission_names:
# permission, created = Permission.objects.get_or_create(
# codename=permission_name,
# content_type_id=ContentType.objects.get_for_model(get_user_model()).id,
# )
# if created:
# logger.info("Created permission '{}'".format(permission.codename))
# if permission not in group.permissions.all():
# group.permissions.add(permission)
# logger.info("Added permission '{1}' to group '{0}'".format(group, permission.codename))
# -*- coding: utf-8 -*-
# Generated by Django 1.11.3 on 2017-08-14 16:16
from __future__ import unicode_literals
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0008_alter_user_username_max_length'),
]
operations = [
migrations.CreateModel(
name='GroupMapping',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('dn', models.CharField(max_length=1000)),
('created_on', models.DateField(auto_now_add=True)),
('group', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='sso_mapping', to='auth.Group')),
],
),
migrations.CreateModel(
name='UserMapping',
fields=[
('user', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, primary_key=True, related_name='sso_mapping', serialize=False, to=settings.AUTH_USER_MODEL)),
('uuid', models.UUIDField()),
('anonymized', models.BooleanField(default=False)),
('imported_on', models.DateField(auto_now_add=True)),
],
),
]
from django.db import models
from django import conf
class UserMapping(models.Model):
user = models.OneToOneField(conf.settings.AUTH_USER_MODEL, primary_key=True, on_delete=models.CASCADE, related_name="sso_mapping")
uuid = models.UUIDField(null=False)
anonymized = models.BooleanField(null=False, default=False)
imported_on = models.DateField(null=False, auto_now_add=True)
def __str__(self):
return "{user} <-> {uuid}".format(user=self.user, uuid=self.uuid)
class GroupMapping(models.Model):
group = models.OneToOneField("auth.Group", null=False, on_delete=models.CASCADE, related_name="sso_mapping")
dn = models.CharField(max_length=1000, null=False, blank=False)
created_on = models.DateField(null=False, auto_now_add=True)
def __str__(self):
return "{group} <-> \"{dn}\"".format(group=self.group, dn=self.dn)
import urllib.request
from xml.etree import ElementTree
from . import logger
from . import app_settings
from django import urls
from onelogin.saml2.settings import OneLogin_Saml2_Settings
from copy import copy
def get_idp_runtime_info(meta_url):
""" Grab data directly from the running IDP instance """
# fetch the meta
try:
io = urllib.request.urlopen(meta_url, timeout=10)
except Exception as e:
logger.error("Could not download IDP meta.")
raise e
# parse the xml
try:
idp_meta = ElementTree.parse(io)
except Exception as e:
logger.error("Could not parse IDP meta.")
raise e
# namespaces
namespaces = {
"ds": "http://www.w3.org/2000/09/xmldsig#",
"saml2": "urn:oasis:names:tc:SAML:2.0:metadata",
}
for prefix, uri in namespaces.items():
ElementTree.register_namespace(prefix, uri)
# get the required data
signing = idp_meta.findall(".//*[@use='signing']//ds:X509Certificate", namespaces)
encryption = idp_meta.findall(".//*[@use='encryption']//ds:X509Certificate", namespaces)
bindings_sso_redirect = idp_meta.findall(".//saml2:SingleSignOnService[@Binding='urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect']", namespaces)
bindings_slo_redirect = idp_meta.findall(".//saml2:SingleLogoutService[@Binding='urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect']", namespaces)
# log and check
logger.debug("From the IDP metadata received {s} signing certs, {e} encryption certs, {sso} SSO bindings, {slo} SLO bindings.".format(
s=len(signing), e=len(encryption), sso=len(bindings_sso_redirect), slo=len(bindings_slo_redirect),
))
assert signing and encryption, "Could not find certificates in IDP meta."
assert bindings_sso_redirect is not None, "Could not find SSO HTTP-Redirect binding."
# pack the received data
return {
"certificates": {
"signing": [e.text for e in signing],
"encryption": [e.text for e in encryption]
},
"bindings": {
"sso_redirect": bindings_sso_redirect[0].get("Location"),
"slo_redirect": bindings_slo_redirect[0].get("Location") if bindings_slo_redirect else None,
}
}
def create_onelogin_settings(template):
""" This function is intended to be run only once, on app startup. Raises exceptions. """
template = copy(template)
# SP settings
this_host = "https://{host}".format(host=app_settings.SP_HOST)
if app_settings.SP_PORT != 443:
this_host += ":{}".format(app_settings.SP_PORT)
template["sp"]["entityId"] = this_host + urls.reverse("sso-saml2-meta")
template["sp"]["assertionConsumerService"]["url"] = this_host + urls.reverse("sso-saml2-acs")
# IDP settings
idp_info = get_idp_runtime_info(app_settings.IDP_META_URL)
template["idp"]["x509certMulti"]["signing"] = idp_info["certificates"]["signing"]
template["idp"]["x509certMulti"]["encryption"] = idp_info["certificates"]["encryption"]
template["idp"]["singleSignOnService"]["url"] = idp_info["bindings"]["sso_redirect"]
template["idp"]["singleLogoutService"]["url"] = idp_info["bindings"]["slo_redirect"]
return OneLogin_Saml2_Settings(settings=template, sp_validation_only=True)
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Debug info</title>
<link rel="stylesheet" href="//fonts.googleapis.com/css?family=Roboto:300,300italic,700,700italic">
<link rel="stylesheet" href="//cdn.rawgit.com/milligram/milligram/master/dist/milligram.min.css">
<style>
.hidden {display: none !important;}
.wrapper>.container { padding-bottom: 2rem; padding-top: 2rem; }
</style>
</head>
<body>
<main class="wrapper">
<section class="container">
<h2 class="title">Actions</h2>
<form action="" method="post">
{% csrf_token %}
<div class="row">
<div class="column">
<h4 class="title">Log In</h4>
<input type="text" name="username" id="id_username" placeholder="username" autofocus>
<p><i>Log in as any user</i></p>
</div>
<div class="column">
<h4 class="title">Toggle Groups</h4>
<select name="toggle_group" class="disabled" id="id_toggle_group" onchange="this.form.submit()">
{% for value, name in form.fields.toggle_group.choices %}
<option value="{{ value }}">{% if value %}{{ name }}{% endif %}</option>
{% endfor %}
</select>
<p><i>Add or remove groups for the logged in user</i></p>
</div>
<div class="column">
<h4 class="title">Production Actions</h4>
<a class="button button-outline button-black" href="{% url "sso-login" %}">SSO Log in</a>
<a class="button button-outline button-black" href="{% url "sso-logout" %}">Log out</a>
<p><i>These actions are used in production</i></p>
</div>
</div>
<input class="hidden" type="submit">
</form>
</section>
{% for table_name, table_contents in tables %}
<br/>
<section class="container">
<h2 class="title">{{ table_name }}</h2>
<table class="pure-table">
{% for k, v in table_contents.items %}
<tr><td>{{ k }}</td><td>{{ v }}</td>
{% empty %}
<tr><td>(empty)</td></tr>
{% endfor %}
</table>
</section>
<br/>
{% endfor %}
</main>
</body>
</html>
from django.conf.urls import url
from . import views
urlpatterns = (
url(r"^login/?$", views.LogInView.as_view(), name="sso-login"),
url(r"^logout/?$", views.LogOutView.as_view(), name="sso-logout"),
url(r"^saml2/acs/?$", views.ACSAuthNView.as_view(), name="sso-saml2-acs"),
url(r"^saml2/meta/?$", views.MetadataView.as_view(), name="sso-saml2-meta"),
url(r"^d(?:ev)?/?$", views.DevView.as_view(), name="sso-dev"),
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment