/usr/share/pyshared/social_auth/views.py is in python-django-social-auth 0.7.23-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""Views
Notes:
* Some views are exempt from the CSRF token check because they rely
on third-party providers that (if using POST) won't send the CSRF
token back. Django's CSRF protection therefore does not cover those
views.
"""
from urllib2 import quote
from django.http import HttpResponseRedirect, HttpResponse
from django.contrib.auth import login, REDIRECT_FIELD_NAME
from django.contrib.auth.decorators import login_required
from django.contrib import messages
from django.views.decorators.csrf import csrf_exempt
from social_auth.utils import sanitize_redirect, setting, \
backend_setting, clean_partial_pipeline
from social_auth.decorators import dsa_view, disconnect_view
# Last-resort redirect target used by the views below when neither the request
# nor a backend-specific setting supplies one. The second argument to
# setting() is presumably the fallback value -- confirm in social_auth.utils.
DEFAULT_REDIRECT = setting(
    'SOCIAL_AUTH_LOGIN_REDIRECT_URL',
    setting('LOGIN_REDIRECT_URL')
)

# Default target when authentication fails or the account is inactive.
LOGIN_ERROR_URL = setting(
    'LOGIN_ERROR_URL',
    setting('LOGIN_URL')
)

# Session key under which auth_complete() looks for partial pipeline data.
PIPELINE_KEY = setting(
    'SOCIAL_AUTH_PARTIAL_PIPELINE_KEY',
    'partial_pipeline'
)
@dsa_view(setting('SOCIAL_AUTH_COMPLETE_URL_NAME', 'socialauth_complete'))
def auth(request, backend):
    """Begin the social authentication flow for ``backend``.

    All of the work is delegated to auth_process(). The decorator is given
    the URL name of the completion view; how dsa_view uses it is not visible
    in this module.
    """
    response = auth_process(request, backend)
    return response
@csrf_exempt
@dsa_view()
def complete(request, backend, *args, **kwargs):
    """Handle the provider callback that finishes authentication.

    Override this view if transaction management doesn't suit your needs.

    An already authenticated user is routed to associate_complete() (link a
    new social account); anyone else goes through complete_process() (log in).

    The view is CSRF-exempt because providers that POST back cannot supply
    the site's CSRF token, so nothing in this view checks the request origin.
    NOTE(review): the callback's authenticity presumably rests on checks made
    inside the backend, which are not visible here -- confirm per backend.
    """
    if request.user.is_authenticated():
        handler = associate_complete
    else:
        handler = complete_process
    return handler(request, backend, *args, **kwargs)
@login_required
def associate_complete(request, backend, *args, **kwargs):
    """Finish associating a social account with the logged-in user.

    Returns a redirect to the error URL when auth_complete() yields nothing,
    passes through any HttpResponse it yields, and otherwise redirects to the
    stored ``next`` value, the backend's new-association URL or
    DEFAULT_REDIRECT, in that order.
    """
    # Read (not remove) the redirect target stored in the session by
    # auth_process(); it was sanitized there unless
    # SOCIAL_AUTH_SANITIZE_REDIRECTS is disabled.
    stored_next = request.session.get(REDIRECT_FIELD_NAME, '')
    result = auth_complete(request, backend, request.user, *args, **kwargs)

    # Falsy result is tested first, exactly as the original branch order.
    if not result:
        error_url = backend_setting(backend, 'LOGIN_ERROR_URL',
                                    LOGIN_ERROR_URL)
        return HttpResponseRedirect(error_url)

    # The backend may hand back a ready-made response (e.g. a pipeline step).
    if isinstance(result, HttpResponse):
        return result

    target = stored_next or \
        backend_setting(backend,
                        'SOCIAL_AUTH_NEW_ASSOCIATION_REDIRECT_URL') or \
        DEFAULT_REDIRECT
    return HttpResponseRedirect(target)
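@login_required
@dsa_view()
@disconnect_view
def disconnect(request, backend, association_id=None):
    """Disconnect the given backend from the currently logged-in user.

    After disconnecting, redirects to the request's ``next`` value, the
    backend's SOCIAL_AUTH_DISCONNECT_REDIRECT_URL or DEFAULT_REDIRECT, in
    that order.

    The ``next`` value comes straight from the request and is untrusted.
    It is passed through sanitize_redirect() (unless
    SOCIAL_AUTH_SANITIZE_REDIRECTS is disabled), the same treatment
    auth_process() gives it, so an off-site URL is not used verbatim as the
    redirect target.

    NOTE(review): this is a state-changing action; whether it is restricted
    to POST depends on disconnect_view, which is not visible here -- confirm.
    """
    backend.disconnect(request.user, association_id)

    redirect = request.REQUEST.get(REDIRECT_FIELD_NAME, '')
    if redirect and setting('SOCIAL_AUTH_SANITIZE_REDIRECTS', True):
        # A rejected value comes back falsy and falls through to the
        # configured targets below.
        redirect = sanitize_redirect(request.get_host(), redirect)

    url = redirect or \
        backend_setting(backend, 'SOCIAL_AUTH_DISCONNECT_REDIRECT_URL') or \
        DEFAULT_REDIRECT
    return HttpResponseRedirect(url)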
def auth_process(request, backend):
    """Start authentication against the social backend.

    Stores the configured extra fields and the ``next`` target in the
    session, clears any partial pipeline state, then either redirects to the
    backend's auth URL or returns the backend's HTML.
    """
    if request.method == 'POST':
        params = request.POST
    else:
        params = request.GET

    # Values are copied from the request as-is, so whatever later reads them
    # back from the session must treat them as untrusted input.
    stored_fields = setting('SOCIAL_AUTH_FIELDS_STORED_IN_SESSION', [])
    for name in stored_fields:
        if name not in params:
            continue
        request.session[name] = params[name]

    if REDIRECT_FIELD_NAME in params:
        # User-supplied ``next``: sanitized against the current host unless
        # SOCIAL_AUTH_SANITIZE_REDIRECTS is turned off. With it off, the raw
        # value is stored and later used as a redirect target.
        target = params[REDIRECT_FIELD_NAME]
        if setting('SOCIAL_AUTH_SANITIZE_REDIRECTS', True):
            target = sanitize_redirect(request.get_host(), target)
        request.session[REDIRECT_FIELD_NAME] = target or DEFAULT_REDIRECT

    # Drop stale partial pipeline info before a new flow begins.
    clean_partial_pipeline(request)

    if not backend.uses_redirect:
        return HttpResponse(backend.auth_html(),
                            content_type='text/html;charset=UTF-8')
    return HttpResponseRedirect(backend.auth_url())
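def complete_process(request, backend, *args, **kwargs):
    """Finish the login flow for a user who is not yet authenticated.

    Returns any HttpResponse produced by auth_complete() unchanged. On
    success the user is logged in, session expiry and the last-used backend
    are recorded, and the user is redirected to the new-user URL, the
    ``next`` target, the backend's login redirect URL or DEFAULT_REDIRECT.
    Inactive users and failed logins are sent to the configured error URLs,
    with the ``next`` target appended as a query parameter.

    The ``next`` target stored in the session was already handled by
    auth_process(). The fallback read from the request is untrusted, so it is
    passed through sanitize_redirect() (unless SOCIAL_AUTH_SANITIZE_REDIRECTS
    is disabled) before it can become a redirect target.
    """
    # Read the redirect target before login() touches the session.
    redirect_value = request.session.get(REDIRECT_FIELD_NAME, '')
    if not redirect_value:
        redirect_value = request.REQUEST.get(REDIRECT_FIELD_NAME, '')
        if redirect_value and setting('SOCIAL_AUTH_SANITIZE_REDIRECTS', True):
            # A rejected value comes back falsy; keep a string either way.
            redirect_value = sanitize_redirect(request.get_host(),
                                               redirect_value) or ''

    user = auth_complete(request, backend, *args, **kwargs)

    if isinstance(user, HttpResponse):
        return user

    if not user and request.user.is_authenticated():
        # Never redirect to an empty location; fall back to the default.
        return HttpResponseRedirect(redirect_value or DEFAULT_REDIRECT)

    msg = None
    if user:
        if getattr(user, 'is_active', True):
            # catch is_new flag before login() might reset the instance
            is_new = getattr(user, 'is_new', False)
            login(request, user)
            # user.social_user is the used UserSocialAuth instance defined
            # in authenticate process
            social_user = user.social_user
            if redirect_value:
                request.session[REDIRECT_FIELD_NAME] = redirect_value

            if setting('SOCIAL_AUTH_SESSION_EXPIRATION', True):
                # Set session expiration date if present and not disabled by
                # setting. Use last social-auth instance for current provider,
                # users can associate several accounts with a same provider.
                expiration = social_user.expiration_datetime()
                if expiration:
                    try:
                        request.session.set_expiry(expiration)
                    except OverflowError:
                        # Handle django time zone overflow, set default expiry.
                        request.session.set_expiry(None)

            # store last login backend name in session
            request.session['social_auth_last_login_backend'] = \
                social_user.provider

            # A new account goes to the new-users page when one is defined.
            new_user_redirect = backend_setting(
                backend, 'SOCIAL_AUTH_NEW_USER_REDIRECT_URL')
            if new_user_redirect and is_new:
                url = new_user_redirect
            else:
                url = redirect_value or \
                    backend_setting(backend,
                                    'SOCIAL_AUTH_LOGIN_REDIRECT_URL') or \
                    DEFAULT_REDIRECT
        else:
            msg = setting('SOCIAL_AUTH_INACTIVE_USER_MESSAGE', None)
            url = backend_setting(backend, 'SOCIAL_AUTH_INACTIVE_USER_URL',
                                  LOGIN_ERROR_URL)
    else:
        msg = setting('LOGIN_ERROR_MESSAGE', None)
        url = backend_setting(backend, 'LOGIN_ERROR_URL', LOGIN_ERROR_URL)

    if msg:
        messages.error(request, msg)

    # Forward the (sanitized) ``next`` target to the page we redirect to, so
    # that page can continue to it; the value is URL-quoted before appending.
    if redirect_value and redirect_value != url:
        redirect_value = quote(redirect_value)
        if '?' in url:
            url += '&%s=%s' % (REDIRECT_FIELD_NAME, redirect_value)
        else:
            url += '?%s=%s' % (REDIRECT_FIELD_NAME, redirect_value)
    return HttpResponseRedirect(url)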
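def auth_complete(request, backend, user=None, *args, **kwargs):
    """Complete auth process. Return authenticated user or None.

    A ``user`` that is not authenticated is ignored. If the session holds
    partial pipeline data it is removed from the session and, when it belongs
    to this backend, the pipeline is resumed from the stored index. Otherwise
    the backend's own auth_complete() is called.
    """
    if user and not user.is_authenticated():
        user = None

    if request.session.get(PIPELINE_KEY):
        data = request.session.pop(PIPELINE_KEY)
        # Build the pipeline kwargs in a separate dict. Adding 'user' to
        # ``kwargs`` itself would make the fall-through call below pass
        # ``user`` twice and raise TypeError when the stored pipeline belongs
        # to another backend.
        pipeline_kwargs = kwargs.copy()
        if user:
            pipeline_kwargs['user'] = user
        idx, xargs, xkwargs = backend.from_session_dict(data, request=request,
                                                        *args,
                                                        **pipeline_kwargs)
        # Only resume a pipeline that was started by this same backend.
        if 'backend' in xkwargs and \
                xkwargs['backend'].name == backend.AUTH_BACKEND.name:
            return backend.continue_pipeline(pipeline_index=idx,
                                             *xargs, **xkwargs)
    return backend.auth_complete(user=user, request=request, *args, **kwargs)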