/usr/lib/python2.7/dist-packages/social/backends/weixin.py is in python-social-auth 1:0.2.21+dfsg-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
# author:duoduo3369@gmail.com https://github.com/duoduo369
"""
Weixin OAuth2 backend
"""
import urllib
from requests import HTTPError
from social.backends.oauth import BaseOAuth2
from social.exceptions import AuthCanceled, AuthUnknownError
class WeixinOAuth2(BaseOAuth2):
    """Weixin OAuth2 authentication backend.

    Authorizes through the ``qrconnect`` endpoint and exchanges the code at
    the ``sns/oauth2/access_token`` endpoint. The request parameters are
    built by the overrides below because this provider expects the client
    credentials under the names ``appid`` and ``secret``.
    """
    name = 'weixin'
    ID_KEY = 'openid'
    AUTHORIZATION_URL = 'https://open.weixin.qq.com/connect/qrconnect'
    ACCESS_TOKEN_URL = 'https://api.weixin.qq.com/sns/oauth2/access_token'
    ACCESS_TOKEN_METHOD = 'POST'
    REDIRECT_STATE = False
    EXTRA_DATA = [
        ('nickname', 'username'),
        ('headimgurl', 'profile_image_url'),
    ]

    def get_user_details(self, response):
        """Return user details from Weixin. API URL is:
        https://api.weixin.qq.com/sns/userinfo

        The username is taken from the ``domain`` field when the
        ``DOMAIN_AS_USERNAME`` setting is truthy, otherwise from
        ``nickname``. Missing fields default to an empty string.
        """
        if self.setting('DOMAIN_AS_USERNAME'):
            username = response.get('domain', '')
        else:
            username = response.get('nickname', '')
        return {
            'username': username,
            'profile_image_url': response.get('headimgurl', '')
        }

    def user_data(self, access_token, *args, **kwargs):
        """Fetch the user profile from the ``sns/userinfo`` endpoint.

        ``kwargs['response']`` must be the access-token response; its
        ``openid`` is sent together with ``access_token``. A missing
        ``response`` or ``openid`` raises ``KeyError``.

        Returns the decoded JSON payload. When a ``nickname`` is present
        and looks like mis-decoded UTF-8, it is repaired in place.
        """
        data = self.get_json('https://api.weixin.qq.com/sns/userinfo', params={
            'access_token': access_token,
            'openid': kwargs['response']['openid']
        })
        nickname = data.get('nickname')
        if nickname:
            # The nickname can arrive as UTF-8 bytes that were decoded one
            # byte per character (presumably because the response is parsed
            # as Latin-1 -- TODO confirm against the HTTP layer). Encoding
            # back to Latin-1 recovers those bytes so they can be decoded
            # properly. If the text is already correct (it contains
            # characters outside Latin-1, or the bytes are not valid UTF-8)
            # the round trip fails and the value is kept unchanged instead
            # of being mangled or aborting the login.
            try:
                data['nickname'] = nickname.encode('latin-1').decode('utf-8')
            except UnicodeError:
                pass
        return data

    def auth_params(self, state=None):
        """Return the query parameters for the authorization request.

        Always contains ``appid`` and ``redirect_uri``; ``state`` and
        ``response_type`` are added only when the backend enables them.
        """
        # Only the application id belongs in the authorization URL.
        appid, _ = self.get_key_and_secret()
        params = {
            'appid': appid,
            'redirect_uri': self.get_redirect_uri(state)
        }
        if self.STATE_PARAMETER and state:
            params['state'] = state
        if self.RESPONSE_TYPE:
            params['response_type'] = self.RESPONSE_TYPE
        return params

    def auth_complete_params(self, state=None):
        """Return the parameters for exchanging the code for a token."""
        appid, secret = self.get_key_and_secret()
        return {
            'grant_type': 'authorization_code',  # request auth code
            'code': self.data.get('code', ''),  # server response code
            'appid': appid,
            'secret': secret,
            'redirect_uri': self.get_redirect_uri(state)
        }

    def refresh_token_params(self, token, *args, **kwargs):
        """Return the parameters for refreshing an access token."""
        appid, secret = self.get_key_and_secret()
        return {
            'refresh_token': token,
            'grant_type': 'refresh_token',
            'appid': appid,
            'secret': secret
        }

    def auth_complete(self, *args, **kwargs):
        """Complete the login process; must return the user instance.

        Raises ``AuthCanceled`` when the token request fails with HTTP 400
        or when the token response contains an ``errcode`` field, and
        ``AuthUnknownError`` when a ``KeyError`` occurs while requesting
        the token. Other HTTP errors propagate unchanged.
        """
        self.process_error(self.data)
        try:
            response = self.request_access_token(
                self.ACCESS_TOKEN_URL,
                data=self.auth_complete_params(self.validate_state()),
                headers=self.auth_headers(),
                method=self.ACCESS_TOKEN_METHOD
            )
        except HTTPError as err:
            if err.response.status_code == 400:
                raise AuthCanceled(self, response=err.response)
            raise
        except KeyError:
            raise AuthUnknownError(self)
        # An ``errcode`` key in the token response is treated as a
        # cancelled authentication.
        if 'errcode' in response:
            raise AuthCanceled(self)
        self.process_error(response)
        return self.do_auth(response['access_token'], response=response,
                            *args, **kwargs)
class WeixinOAuth2APP(WeixinOAuth2):
    """
    Weixin OAuth authentication backend
    Can't use in web, only in weixin app

    Differs from ``WeixinOAuth2`` in the authorization endpoint
    (``oauth2/authorize``), in the ``#wechat_redirect`` fragment appended
    to the authorization URL, in omitting ``redirect_uri`` from the token
    request, and in skipping state validation.
    """
    name = 'weixinapp'
    ID_KEY = 'openid'
    AUTHORIZATION_URL = 'https://open.weixin.qq.com/connect/oauth2/authorize'
    ACCESS_TOKEN_URL = 'https://api.weixin.qq.com/sns/oauth2/access_token'
    ACCESS_TOKEN_METHOD = 'POST'
    REDIRECT_STATE = False

    def auth_url(self):
        """Return the authorization URL, ending in ``#wechat_redirect``.

        The query parameters are sorted by name before being encoded.
        """
        # ``urlencode`` lives in ``urllib`` on Python 2 and in
        # ``urllib.parse`` on Python 3; import whichever is available so
        # the method works on both.
        try:
            from urllib.parse import urlencode
        except ImportError:
            from urllib import urlencode

        if self.STATE_PARAMETER or self.REDIRECT_STATE:
            # Store state in session for further request validation. The state
            # value is passed as state parameter (as specified in OAuth2 spec),
            # but also added to redirect, that way we can still verify the
            # request if the provider doesn't implement the state parameter.
            # Reuse token if any.
            name = self.name + '_state'
            state = self.strategy.session_get(name)
            if state is None:
                state = self.state_token()
                self.strategy.session_set(name, state)
        else:
            state = None
        params = self.auth_params(state)
        params.update(self.get_scope_argument())
        params.update(self.auth_extra_arguments())
        query = urlencode(sorted(params.items()))
        return '{}#wechat_redirect'.format(
            self.AUTHORIZATION_URL + '?' + query
        )

    def auth_complete_params(self, state=None):
        """Return the token-exchange parameters, without ``redirect_uri``.

        ``state`` is accepted for signature compatibility and is not used.
        """
        appid, secret = self.get_key_and_secret()
        return {
            'grant_type': 'authorization_code',  # request auth code
            'code': self.data.get('code', ''),  # server response code
            'appid': appid,
            'secret': secret,
        }

    def validate_state(self):
        """Skip state validation; always returns ``None``."""
        return None

    def auth_complete(self, *args, **kwargs):
        """Complete the login process; must return the user instance.

        Delegates to ``WeixinOAuth2.auth_complete``, which picks up this
        class's ``validate_state`` and ``auth_complete_params`` overrides.
        Sharing the implementation also means an HTTP 400 on the token
        request raises ``AuthCanceled`` with the failed HTTP response
        attached, the same as in the parent backend.
        """
        return super(WeixinOAuth2APP, self).auth_complete(*args, **kwargs)
|