Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import logging 

2from typing import Dict 

3 

4import requests 

5from django.core.exceptions import ImproperlyConfigured, ValidationError 

6from django.core.validators import URLValidator 

7 

8from discuss_data.dhrep import settings 

9 

10logger = logging.getLogger(__name__) 

11 

12 

13class Token: 

14 """Manage DARIAH-DE Repository token 

15 

16 API Documentation: 

17 https://repository.de.dariah.eu/doc/services/submodules/kolibri/kolibri-dhpublish-service/docs/index.html#api-documentation 

18 

19 Usage: 

20 

21 """ 

22 

23 def __init__( 

24 self, access_token: str, token_type: str, expires_in: str, scope: str 

25 ) -> None: 

26 self._access_token = access_token 

27 self._token_type = token_type 

28 self._expires_in = expires_in 

29 self._scope = scope 

30 

31 @property 

32 def access_token(self) -> str: 

33 return str(self._access_token) 

34 

35 @property 

36 def token_type(self) -> str: 

37 return str(self._token_type) 

38 

39 @property 

40 def expires_in(self) -> str: 

41 return str(self._expires_in) 

42 

43 @property 

44 def scope(self) -> str: 

45 return str(self._scope) 

46 

47 def check(self) -> bool: 

48 """check token validity 

49 

50 :return: true if token is valid, false otherwise 

51 :rtype: bool 

52 

53 """ 

54 response = requests.get( 

55 settings.STORAGE_URL + "auth/info", 

56 headers={"Authorization": self.token_type + " " + self.access_token}, 

57 ).ok 

58 

59 return response 

60 

61 def info(self) -> str: 

62 """inform about token validity 

63 

64 :return: the status of the token: "OK", "Gone" 

65 :rtype: str 

66 """ 

67 response = requests.get( 

68 settings.STORAGE_URL + "auth/info", 

69 headers={"Authorization": self.token_type + " " + self.access_token}, 

70 ).reason 

71 

72 return response 

73 

74 def to_dict(self) -> Dict[str, str]: 

75 """convert a token object to a dictionary 

76 

77 :return: the token as a dictionary 

78 :rtype: dict 

79 """ 

80 return { 

81 "access_token": self.access_token, 

82 "token_type": self.token_type, 

83 "expires_in": self.expires_in, 

84 "scope": self.scope, 

85 } 

86 

87 def __str__(self) -> str: 

88 return self.access_token 

89 

90 

91class TokenHelper: 

92 """Help managing DARIAH-DE Repository token 

93 

94 objects of this class provide access to the token configuration and provide helper 

95 functions to convert from and to Token objects 

96 

97 """ 

98 

99 def __init__(self): 

100 

101 validator = URLValidator(["https"]) 

102 try: 

103 validator(settings.PDP_URL) 

104 validator(settings.PDP_REDIRECT_URI) 

105 except ValidationError as e: 

106 logger.error(e) 

107 raise ImproperlyConfigured 

108 

109 pdp_url = settings.PDP_URL 

110 if not pdp_url.endswith("/"): 

111 pdp_url += "/" 

112 

113 self._request_url = ( 

114 pdp_url 

115 + "oauth2/oauth2/authorize?response_type=token" 

116 + "&client_id=" 

117 + settings.PDP_CLIENT_ID 

118 + "&scope=read,write" 

119 + "&redirect_uri=" 

120 + settings.PDP_REDIRECT_URI 

121 ) 

122 

123 @property 

124 def request_url(self) -> str: 

125 return str(self._request_url) 

126 

127 @staticmethod 

128 def create_token_object(token: str) -> Token: 

129 """takes a token uri fragment and returns a token object 

130 

131 :param token: token uri fragment 

132 :type token: str 

133 :raises InvalidTokenException: the given token fragment can not be converted 

134 into a Token object 

135 :return: the resulting token object 

136 :rtype: Token 

137 

138 """ 

139 

140 d = dict(x.split("=") for x in token.split("&")) 

141 try: 

142 t = Token(d["access_token"], d["token_type"], d["expires_in"], d["scope"]) 

143 except (ValueError, KeyError) as e: 

144 logger.error(e) 

145 raise InvalidTokenException("Invalid Token: " + token) 

146 

147 return t 

148 

149 @staticmethod 

150 def create_token_object_from_dict(token: Dict[str, str]) -> Token: 

151 """takes a token in dict format, e.g. from the session dictionary, 

152 and converts it to a token object 

153 

154 :param token: token dictionary 

155 :type token: str 

156 :return: the resulting Token object 

157 :rtype: Token 

158 

159 """ 

160 

161 try: 

162 t = Token( 

163 token["access_token"], 

164 token["token_type"], 

165 token["expires_in"], 

166 token["scope"], 

167 ) 

168 except (TypeError, KeyError) as e: 

169 logger.error(e) 

170 raise InvalidTokenException(e) 

171 else: 

172 return t 

173 

174 def get_token_from_session(self, request): 

175 """tries to get the token from the session dictionary 

176 

177 :param request: the view request 

178 :type request: HttpRequest 

179 

180 :return: a token object if there is one stored in the session 

181 :rtype: Token Only, if there is a token stored, None otherwise. 

182 

183 """ 

184 if request.session.get("token", None): 

185 try: 

186 t = self.create_token_object_from_dict(request.session.get("token")) 

187 

188 except InvalidTokenException: 

189 return None 

190 else: 

191 return t 

192 

193 return None 

194 

195 

196class InvalidTokenException(Exception): 

197 """Thrown when there is something wrong with the token"""