Coverage for control/auth.py : 56%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
"""Authentication

*   User management
*   Identity provider attribute handling
*   Authorization
"""
from urllib.parse import quote

from flask import request, session, abort
from control.utils import (
    pick as G,
    utf8FromLatin1,
    serverprint,
    shiftRegional,
    E,
    AT,
    BLANK,
    WHYPHEN,
)
from config import Config as C, Names as N
from control.html import HtmlElements as H
from control.perm import (
    AUTH,
    UNAUTH,
    COORD,
    OFFICE,
    ROOT,
    SYSTEM,
)
# Shorthands for the configuration sections used in this module.
CB = C.base
CW = C.web
CP = C.perm

# Debug switches: DEBUG_AUTH is the `auth` entry picked from the debug settings;
# when truthy, the login flow reports its steps through `serverprint`.
DEBUG = CB.debug
DEBUG_AUTH = G(DEBUG, N.auth)

# How and under which names the identity provider attributes reach the server.
# TRANSPORT_ATTRIBUTES is compared against `N.ajp` and `N.http` in `Auth.checkLogin`.
TRANSPORT_ATTRIBUTES = CB.transportAttributes
# Key in the auth environment whose truthy value signals a shibboleth session.
SHIB_KEY = CB.shibKey
# Mapping from environment key to the attribute name stored on the user.
ATTRIBUTES = CB.attributes

# Requests with a declared content length above this limit are refused.
LIMIT_JSON = CW.limitJson

# Placeholder icons (rendered as characters) for an unknown country, user and group.
Qc = H.icon(CW.unknown[N.country], asChar=True)
Qu = H.icon(CW.unknown[N.user], asChar=True)
Qg = H.icon(CW.unknown[N.group], asChar=True)
class Auth:
    """Deal with user Authentication.

    Facilitates the login/logout process of users.
    Maintains the attributes that the DARIAH Identity Provider supplies about users.
    """

    def __init__(self, db, regime):
        """## Initialization

        Include a handle to `control.db.Db` into the
        attributes.

        Parameters
        ----------
        db: object
            See below.
        regime: string
            The regime the server runs in.
            It is compared with `N.development` to decide whether
            we are in development mode.
        """

        self.db = db
        """*object* The `control.db.Db` singleton

        Provides methods to retrieve user
        info from the database and store user info there.
        """

        permissionGroupInv = db.permissionGroupInv

        # determine production or devel
        self.isDevel = regime == N.development
        """*boolean* Whether the server runs in development (`True`) or not.

        In production we use the DARIAH Identity provider,
        while in development we use a simple, console-based way of
        logging a few test users in.
        """

        self.authority = N.local if self.isDevel else N.DARIAH
        """*string* The name of the authority that identifies users.

        In production it is "DARIAH", which stands for the DARIAH Identity Provider.
        In development it is "local".
        """

        self.authId = G(permissionGroupInv, AUTH)
        """*string* The groupId of the `auth` permission group.
        """

        self.authUser = {N.group: self.authId}
        """*dict* Info of the `auth` permission group.
        """

        self.unauthId = G(permissionGroupInv, UNAUTH)
        """*string* The groupId of the `public` permission group.
        """

        self.unauthUser = {N.group: self.unauthId}
        """*dict* Info of the `public` permission group.
        """

        self.user = {}
        """*dict* The attributes of the currently logged in user."""

    def clearUser(self):
        """Forgets the currently logged in user.

        The attributes in the `user` attribute will be cleared and attributes
        for an unauthenticated user will take their place.
        The `user` dict is modified in place, it is not replaced.
        """

        user = self.user
        user.clear()
        user.update(self.unauthUser)

    def getUser(self, eppn, email=None, mayCreate=False):
        """Find a user in the database.

        This is called to get extra information for an authenticated user
        from the database.
        The resulting data will be stored in the `user` attribute of Auth.

        !!! caution
            Even if the user can be found, the attribute `mayLogin`
            might be false, in which case it will be prevented to log in that user.

        !!! tip
            When assigning reviewers, office users may select people who are not yet
            known to the contrib tool by specifying their email address.
            When such users log in for the first time, their `eppn` and other
            attributes become known, and are merged into a record in the user table.

        Parameters
        ----------
        eppn: string
            The unique identifier of a user as assigned by the DARIAH identity provider.
        email: string, optional `None`
            New users may not have an eppn, but might already be present in the
            user table by their email.
            If so, the email address can be used to look up the user.
            The email is only used for the lookup if `eppn` is `None`.
        mayCreate: boolean, optional `False`
            If a user cannot be found, they will be created if this flag is `True`.
            This is relevant for situation where a new user has been authenticated
            by the identity provider.

        Returns
        -------
        boolean
            Whether a user was authenticated and logged in.
            The attributes retrieved from the database will be merged into
            the `user` attribute.
            If the lookup is refused (no identifying attribute, multiple matches,
            a user that may not login, or an unknown user that may not be created),
            the `user` attribute will be filled with
            info that says that the current user is the public and nothing more.
        """

        user = self.user
        db = self.db
        authority = self.authority
        authId = self.authId

        # a record matches on eppn, or, only when no eppn is given,
        # on email provided the record itself has no eppn yet
        userFound = [
            record
            for record in db.user.values()
            if (
                G(record, N.authority) == authority
                and (
                    (eppn is not None and G(record, N.eppn) == eppn)
                    or (
                        eppn is None
                        and email is not None
                        and G(record, N.eppn) is None
                        and G(record, N.email) == email
                    )
                )
            )
        ]
        user.clear()

        # without any identifying attribute nobody can be looked up,
        # and creating a record for nobody would pollute the user table
        if eppn is None and email is None:
            self.clearUser()
            if DEBUG_AUTH:
                serverprint("LOGIN: no eppn and no email given")
            return False

        if len(userFound) > 1:
            self.clearUser()
            if DEBUG_AUTH:
                serverprint(f"LOGIN: multiple matches in user DB: {eppn} / {email}")
            return False

        existing = userFound[0] if userFound else None

        if existing is not None and not G(existing, N.mayLogin, default=True):
            self.clearUser()
            if DEBUG_AUTH:
                serverprint(f"LOGIN: existing user may not login: {eppn} / {email}")
            return False

        user.update({N.eppn: eppn, N.authority: authority})
        if email:
            user[N.email] = email

        if existing is None:
            if not mayCreate:
                # leave the public identity behind, not a half-filled user
                self.clearUser()
                if DEBUG_AUTH:
                    serverprint(f"LOGIN: may not create new user: {eppn} / {email}")
                return False
            db.insertUser(user)
            if DEBUG_AUTH:
                serverprint(f"LOGIN: new user: {eppn} / {email}")
        else:
            # the stored record wins over the values passed in
            user.update(existing)
            if DEBUG_AUTH:
                serverprint(f"LOGIN: existing user: {eppn} / {email}")

        # new users do not have yet group information
        group = user.setdefault(N.group, authId)

        groupRep = G(G(db.permissionGroup, group), N.rep)
        result = groupRep != UNAUTH
        if DEBUG_AUTH:
            if result:
                serverprint(f"LOGIN: user authenticated: {eppn} / {email}")
            else:
                serverprint(f"LOGIN: user not authenticated: {eppn} / {email}")
        return result

    def wrapTestUsers(self):
        """Present a widget to select a test user for login.

        The widget has a login link for every user record with an eppn
        and the `local` authority, plus an input field to login by email.
        The eppn and email values are URL-encoded before they end up
        in the query string.

        !!! caution
            In production this will do nothing.
            Only in development mode one can select a test user.

        Returns
        -------
        string
            The empty string `E` outside development mode,
            otherwise the joined HTML of the widget.
        """
        if not self.isDevel:
            return E

        db = self.db

        testUsers = {
            record[N.eppn]: record
            for record in db.user.values()
            if N.eppn in record and G(record, N.authority) == N.local
        }
        userLinks = [
            H.div(
                H.a(
                    u,
                    href=f"/login?eppn={quote(str(u), safe='')}",
                    cls="button small",
                )
            )
            for u in testUsers
        ]
        emailInput = H.div(
            H.input(
                E,
                placeholder="email",
                # encode, otherwise a `+` in the address is received as a space
                onchange=(
                    "window.location.href="
                    "`/login?email=${encodeURIComponent(this.value)}`"
                ),
            )
        )
        return H.join(userLinks + [emailInput])

    @staticmethod
    def _authEnvironment():
        """Collect the attributes of the request that carry authentication info.

        Depending on `TRANSPORT_ATTRIBUTES` they are taken from the `AJP_` prefixed
        keys of the request environment (prefix stripped), from the request headers,
        or from the complete request environment.
        Keys are lowercased, values are passed through `utf8FromLatin1`.
        """

        if TRANSPORT_ATTRIBUTES == N.ajp:
            return {
                k[4:].lower(): utf8FromLatin1(v)
                for (k, v) in request.environ.items()
                if k.startswith("AJP_")
            }
        if TRANSPORT_ATTRIBUTES == N.http:
            return {k.lower(): utf8FromLatin1(v) for (k, v) in request.headers}
        return {k.lower(): utf8FromLatin1(v) for (k, v) in request.environ.items()}

    def _checkLoginDevel(self):
        """Log a user in on the basis of the `eppn` or `email` request argument."""

        if DEBUG_AUTH:
            serverprint("LOGIN: start authentication in development mode")

        eppn = G(request.args, N.eppn)
        if eppn is None:
            # no eppn: derive one from the part of the email before the @
            email = G(request.args, N.email) or E
            if AT in email:
                eppn = email.split(AT, maxsplit=1)[0]
                if eppn:
                    if DEBUG_AUTH:
                        serverprint(
                            f"LOGIN: authentication succeeded: {eppn} / {email}"
                        )
                    return self.getUser(eppn, email=email, mayCreate=True)
            self.clearUser()
            if DEBUG_AUTH:
                serverprint("LOGIN: authentication failed: no eppn, no email")
            return False

        result = self.getUser(eppn, mayCreate=False)
        if DEBUG_AUTH:
            if result:
                serverprint("LOGIN: authentication successful")
            else:
                serverprint("LOGIN: authentication failed")
        return result

    def _checkLoginShibboleth(self, authEnv):
        """Log a user in on the basis of the attributes in `authEnv`."""

        db = self.db
        user = self.user

        if DEBUG_AUTH:
            serverprint("LOGIN: start authentication with shibboleth")

        if not G(authEnv, SHIB_KEY):
            self.clearUser()
            if DEBUG_AUTH:
                serverprint("LOGIN: No shibboleth session found:")
                serverprint("LOGIN: authentication failed")
            return False

        eppn = G(authEnv, N.eppn)
        email = G(authEnv, N.mail)
        isUser = self.getUser(eppn, email=email, mayCreate=True)
        if DEBUG_AUTH:
            serverprint("""LOGIN: shibboleth session found:""")
            serverprint(f"""LOGIN: eppn = "{eppn}" """)
            serverprint(f"""LOGIN: email = "{email}" """)
            serverprint(f"""LOGIN: isUser = "{isUser}" """)
        if not isUser:
            # the user is refused because the database says (s)he may not login
            self.clearUser()
            if DEBUG_AUTH:
                serverprint("LOGIN: authentication failed")
            return False

        # process the attributes provided by the identity server
        # they may have been changed after the last login
        attributes = {
            toolKey: G(authEnv, envKey, default=E)
            for (envKey, toolKey) in ATTRIBUTES.items()
            if envKey in authEnv
        }
        dirty = False
        for (att, val) in attributes.items():
            if G(user, att) != val:
                user[att] = val
                dirty = True
        if dirty:
            db.updateUser(user)
            if DEBUG_AUTH:
                serverprint(f"LOGIN: user data updated for {eppn}/{email}")
        if DEBUG_AUTH:
            serverprint("LOGIN: authentication successful")
        return True

    def checkLogin(self):
        """Checks for a currently logged in user and sets `user` accordingly.

        This happens after a login action and is meant to adapt the `user` attribute
        to a newly logged-in user.

        Requests whose declared content length exceeds `LIMIT_JSON` are aborted
        with status 400 before anything else happens.

        In development mode the user is taken from the request arguments,
        otherwise from the attributes of a shibboleth session.

        Returns
        -------
        boolean
            Whether an authenticated user has just logged in.
        """

        contentLength = request.content_length
        if contentLength is not None and contentLength > LIMIT_JSON:
            abort(400)

        authEnv = self._authEnvironment()
        if DEBUG_AUTH:
            serverprint("LOGIN: auth environment/headers")
            for (k, v) in authEnv.items():
                serverprint(f"LOGIN: ATTRIBUTE {k} = {v}")

        # start from the public user, whatever happens next
        self.clearUser()

        if self.isDevel:
            return self._checkLoginDevel()
        return self._checkLoginShibboleth(authEnv)

    def countryRep(self, user=None):
        """Provide a short representation of the country of a user.

        Parameters
        ----------
        user: dict, optional `None`
            The user whose country must be represented.
            If absent, the currently logged in user will be taken.

        Returns
        -------
        string
            The representation consists of the 2-letter country code plus
            a derived two letter unicode character combination that will
            be turned into a flag of that country.
            If no iso code is known for the country, the result is just
            the icon for an unknown country.
        """

        db = self.db
        country = db.country

        if user is None:
            user = self.user

        countryId = G(user, N.country)
        countryInfo = G(country, countryId)
        iso = G(countryInfo, N.iso, default=E)
        flag = shiftRegional(iso) if iso else Qc
        return iso + flag

    def groupRep(self, user=None):
        """Provide a string representation of the permission group of a user.

        Parameters
        ----------
        user: dict, optional `None`
            The user whose group must be represented.
            If absent, the currently logged in user will be taken.

        Returns
        -------
        string
            The `rep` of the permission group of the user,
            or `UNAUTH` if the user has no group or the group has no `rep`.
        """

        if user is None:
            user = self.user

        group = G(user, N.group)
        if group is None:
            return UNAUTH

        db = self.db
        return G(G(db.permissionGroup, group), N.rep) or UNAUTH

    @staticmethod
    def _fullName(user):
        """The `name` of a user, or else first name and last name joined by a blank."""

        name = G(user, N.name) or E
        if name:
            return name

        firstName = G(user, N.firstName) or E
        lastName = G(user, N.lastName) or E
        return firstName + (BLANK if firstName and lastName else E) + lastName

    def identity(self, user=None, markup=True, withRole=False):
        """Provide a string representation of the identity of a user.

        !!! note
            Care will be taken that to unauthenticated users only
            limited information about users will be shown:
            email, authority and eppn are left out if the currently logged in
            user is not authenticated.
            This does not hold if the database is in test mode (`db.test`):
            then the eppn (or else the email) is returned as is.

        Parameters
        ----------
        user: dict, optional `None`
            The user whose identity must be represented.
            If absent, the currently logged in user will be taken.
        markup: boolean, optional `True`
            Only consulted in test mode, when the user has neither eppn nor email:
            if it is `None` the empty string is returned, otherwise the icon
            for an unknown user.
        withRole: boolean, optional `False`
            Whether to prepend the role of the user, as found in `CP.roles`.
            Only honoured if the currently logged in user is authenticated.

        Returns
        -------
        string
        """

        if user is None:
            user = self.user

        db = self.db
        if db.test:
            return G(
                user,
                N.eppn,
                default=G(user, N.email, default=(E if markup is None else Qu)),
            )

        name = self._fullName(user)
        group = self.groupRep()  # the power of the currently logged in user!
        isAuth = group != UNAUTH
        org = G(user, N.org) or E
        orgRep = f""" ({org})""" if org else E
        email = (G(user, N.email) or E) if isAuth else E
        authority = (G(user, N.authority) or E) if isAuth else E
        authorityRep = f"""{WHYPHEN}{authority}""" if authority else E
        eppn = (G(user, N.eppn) or E) if isAuth else E

        countryShort = self.countryRep(user=user)

        permGroupRep = E
        if isAuth and withRole:
            userGroup = self.groupRep(user=user)
            permGroupRep = G(CP.roles, userGroup, E)
            if permGroupRep:
                permGroupRep = f"{permGroupRep}: "

        # the best identification available: name, then email, then eppn
        if name:
            who = f"""{name}{orgRep}"""
        elif email:
            who = f"""{email}{orgRep}"""
        elif eppn:
            who = f"""{eppn}{authorityRep}"""
        else:
            who = Qu

        return permGroupRep + who + """ from """ + countryShort

    def credentials(self):
        """Provide a string representation of the identity and permissions of a user.

        This is used to present the currently logged in user on the interface.

        !!! note
            Care will be taken that to unauthenticated users only
            limited information about users will be shown.

        Returns
        -------
        string
            identity; this is `N.Guest` for an unauthenticated user
        string
            group description; for national coordinators the country
            representation is appended
        """

        db = self.db
        user = self.user

        group = self.groupRep()
        permissionGroupDesc = db.permissionGroupDesc
        groupDesc = G(permissionGroupDesc, group) or Qg
        if group == COORD:
            country = self.countryRep()
            groupDesc += f"-{country}"

        if group == UNAUTH:
            return (N.Guest, groupDesc)

        identityRep = self.identity(user)

        return (identityRep, groupDesc)

    def nameEmail(self, user=None):
        """Provide the name and email of the user.

        !!! note
            The email is left out if the user in question is
            not in an authenticated group.

        Parameters
        ----------
        user: dict, optional `None`
            The user whose identity must be represented.
            If absent, the currently logged in user will be taken.

        Returns
        -------
        tuple
            `(name, email)`, both strings, possibly empty.
        """

        if user is None:
            user = self.user

        name = self._fullName(user)
        group = self.groupRep(user=user)
        isAuth = group != UNAUTH
        email = (G(user, N.email) or E) if isAuth else E
        return (name, email)

    def authenticate(self, login=False):
        """Verify the authenticated status of the current user.

        This function is called for every request that requires authentication.
        Whether a user is authenticated or not depends on whether a session for
        that user is present. And that depends on whether the identity provider
        has sent attributes (eppn and others) to the server.

        The data in the `user` attribute will be reset to that of the public
        if there is no authenticated user. Subsequent methods that ask for the
        uid of the current user will get nothing if there is no authenticated user.

        If `login=False`, the eppn stored in the session is used to look up the
        user in the database by means of `getUser`, without creating new users.

        Parameters
        ----------
        login: boolean, optional `False`
            Pass `True` in order to verify/update a user that has just logged in.
            The eppn in the session is discarded first and `checkLogin` is run.
            The data in the `user` attribute will be updated with his/her
            data. The user table in the database will be updated if the
            identity provider has given updated attributes for that user.
            On success the eppn of the user is stored in the session.

        Returns
        -------
        boolean
            Whether the current user is authenticated.
        """

        user = self.user

        # if login=True we want to log the user in
        # if login=False we only want the current user information

        if login:
            session.pop(N.eppn, None)
            if self.checkLogin():
                # in this case there is an eppn
                session[N.eppn] = G(user, N.eppn)
                return True
            return False

        eppn = G(session, N.eppn)
        if eppn and self.getUser(eppn, mayCreate=False):
            return True

        self.clearUser()
        return False

    def deauthenticate(self):
        """Log out the current user.

        Returns
        -------
        void
            If there is a logged in authenticated user, his/her data will be
            cleared and the eppn will be removed from the session.
        """

        self.clearUser()
        session.pop(N.eppn, None)

    def authenticated(self):
        """Is the current user authenticated?

        Returns
        -------
        boolean
        """

        return self.groupRep() != UNAUTH

    def coordinator(self, countryId=None):
        """Is the current user a national coordinator?

        !!! note
            On the overview page, we display contributions of many countries.
            If a National Coordinator is logged in, (s)he will see the contributions
            of his/her country in greater detail, but not those of other countries.

        Parameters
        ----------
        countryId: optional `None`
            If passed, it is the id of the country of which the currently logged in
            user is supposed to be National Coordinator; it is compared with
            the `country` value of the user.
            Otherwise, the country is not checked.

        Returns
        -------
        boolean
        """

        isCoord = self.groupRep() == COORD
        uCountry = G(self.user, N.country)
        return isCoord and (countryId is None or uCountry == countryId)

    def officeuser(self):
        """Is the current user a backoffice user?

        Returns
        -------
        boolean
        """

        return self.groupRep() == OFFICE

    def superuser(self):
        """Is the current user a super user?

        Superusers are backoffice users, sysadmins and root.

        Returns
        -------
        boolean
        """

        return self.groupRep() in {OFFICE, SYSTEM, ROOT}

    def sysadmin(self):
        """Is the current user a system administrator?

        System administrators are sysadmins and root.

        Returns
        -------
        boolean
        """

        return self.groupRep() in {SYSTEM, ROOT}

    def country(self):
        """The full country record of the currently logged in user.

        !!! hint
            This function is used to get the country on the Sidebar.

        Returns
        -------
        dict
            The record in `db.country` for the country of the user,
            or an empty dict if there is none.
        """

        countryId = G(self.user, N.country)
        return G(self.db.country, countryId, default={})