Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Authentication 

2 

3* User management 

4* Identity provider attribute handling 

5* Authorization 

6""" 

7 

8from flask import request, session, abort 

9from control.utils import ( 

10 pick as G, 

11 utf8FromLatin1, 

12 serverprint, 

13 shiftRegional, 

14 E, 

15 AT, 

16 BLANK, 

17 WHYPHEN, 

18) 

19from config import Config as C, Names as N 

20from control.html import HtmlElements as H 

21from control.perm import ( 

22 AUTH, 

23 UNAUTH, 

24 COORD, 

25 OFFICE, 

26 ROOT, 

27 SYSTEM, 

28) 

29 

# Shorthands for the sections of the configuration that this module reads.
CB, CW, CP = C.base, C.web, C.perm

# Debug switches: DEBUG_AUTH guards all `serverprint` calls in this module.
DEBUG = CB.debug
DEBUG_AUTH = G(DEBUG, N.auth)

# How the identity provider attributes reach the server and which ones matter.
# TRANSPORT_ATTRIBUTES is compared with N.ajp and N.http in Auth.checkLogin;
# SHIB_KEY is the attribute whose presence signals an identity provider session;
# ATTRIBUTES maps incoming attribute names to keys in the user record.
TRANSPORT_ATTRIBUTES = CB.transportAttributes
SHIB_KEY = CB.shibKey
ATTRIBUTES = CB.attributes

# Requests with a declared content length above this limit are refused (400).
LIMIT_JSON = CW.limitJson

# Character icons taken from the `unknown` web configuration,
# for country, user and group respectively.
Qc, Qu, Qg = (
    H.icon(CW.unknown[kind], asChar=True) for kind in (N.country, N.user, N.group)
)

45 

46 

class Auth:
    """Deal with user Authentication.

    Facilitates the login/logout process of users.
    Maintains the attributes that the DARIAH Identity Provider supplies about users.
    """

    def __init__(self, db, regime):
        """## Initialization

        Include a handle to `control.db.Db` into the
        attributes.

        Parameters
        ----------
        db: object
            See below.
        regime: string
            Compared with `N.development`: if equal, the server is considered
            to run in development mode, otherwise in production mode.
        """

        self.db = db
        """*object* The `control.db.Db` singleton

        Provides methods to retrieve user
        info from the database and store user info there.
        """

        permissionGroupInv = db.permissionGroupInv

        # determine production or devel
        self.isDevel = regime == N.development
        """*boolean* Whether the server runs in production or in development.

        In production we use the DARIAH Identity provider,
        while in development we use a simple, console-based way of
        logging a few test users in.
        """

        self.authority = N.local if self.isDevel else N.DARIAH
        """*string* The name of the authority that identifies users.

        In production it is "DARIAH", which stands for the DARIAH Identity Provider.
        In development it is "local".
        """

        self.authId = G(permissionGroupInv, AUTH)
        """*string* The groupId of the `auth` permission group.
        """

        self.authUser = {N.group: self.authId}
        """*dict* Info of the `auth` permission group.
        """

        self.unauthId = G(permissionGroupInv, UNAUTH)
        """*string* The groupId of the `public` permission group.
        """

        self.unauthUser = {N.group: self.unauthId}
        """*dict* Info of the `public` permission group.
        """

        self.user = {}
        """*dict* The attributes of the currently logged in user."""

    def clearUser(self):
        """Forgets the currently logged in user.

        The attributes in the `user` attribute will be cleared and attributes
        for an unauthenticated user will take their place.

        The `user` dict is modified in place, so references to it stay valid.
        """

        user = self.user
        user.clear()
        user.update(self.unauthUser)

    def getUser(self, eppn, email=None, mayCreate=False):
        """Find a user in the database.

        This is called to get extra information for an authenticated user
        from the database.
        The resulting data will be stored in the `user` attribute of Auth.

        !!! caution
            Even if the user can be found, the attribute `mayLogin`
            might be false, in which case it will be prevented to log in that user.

        !!! tip
            When assigning reviewers, office users may select people who are not yet
            known to the contrib tool by specifying their email address.
            When such users log in for the first time, their `eppn` and other
            attributes become known, and are merged into a record in the user table.

        Parameters
        ----------
        eppn: string
            The unique identifier of a user as assigned by the DARIAH identity provider.
        email: string, optional `None`
            New users may not have an eppn, but might already be present in the
            user table by their email.
            If so, the email address can be used to look up the user.
        mayCreate: boolean, optional `False`
            If a user cannot be found, they will be created if this flag is `True`.
            This is relevant for situation where a new user has been authenticated
            by the identity provider.

        Returns
        -------
        boolean
            Whether a user was authenticated and logged in.
            The attributes retrieved from the database will be merged into
            the `user` attribute.
            If no user was logged in, the `user` attribute will be filled with
            info that says that the current user is the public and nothing more.
            This holds for every `False` result.
        """

        user = self.user
        db = self.db
        authority = self.authority
        authId = self.authId

        if eppn is None and email is None:
            # without any identifying attribute there is nobody to look up,
            # and we must never create an anonymous user record
            self.clearUser()
            if DEBUG_AUTH:
                serverprint("LOGIN: no eppn and no email to identify a user")
            return False

        def isMatch(record):
            # Records must come from the same authority.
            # With an eppn we match on eppn only; without one we match
            # on email, but only against records that have no eppn yet.
            if G(record, N.authority) != authority:
                return False
            recordEppn = G(record, N.eppn)
            if eppn is not None:
                return recordEppn == eppn
            return (
                email is not None
                and recordEppn is None
                and G(record, N.email) == email
            )

        userFound = [record for record in db.user.values() if isMatch(record)]
        user.clear()

        if len(userFound) > 1:
            self.clearUser()
            if DEBUG_AUTH:
                serverprint(f"LOGIN: multiple matches in user DB: {eppn} / {email}")
            return False

        if userFound and not G(userFound[0], N.mayLogin, default=True):
            self.clearUser()
            if DEBUG_AUTH:
                serverprint(f"LOGIN: existing user may not login: {eppn} / {email}")
            return False

        user.update({N.eppn: eppn, N.authority: authority})
        if email:
            user[N.email] = email

        if userFound:
            user.update(userFound[0])
            if DEBUG_AUTH:
                serverprint(f"LOGIN: existing user: {eppn} / {email}")
        elif mayCreate:
            db.insertUser(user)
            if DEBUG_AUTH:
                serverprint(f"LOGIN: new user: {eppn} / {email}")
        else:
            # do not leave the eppn/authority/email of a rejected user behind
            self.clearUser()
            if DEBUG_AUTH:
                serverprint(f"LOGIN: may not create new user: {eppn} / {email}")
            return False

        # new users do not have yet group information
        group = user.setdefault(N.group, authId)

        groupRep = G(G(db.permissionGroup, group), N.rep)
        result = groupRep != UNAUTH
        if not result:
            # the record puts this user in the public group: treat as not logged in
            self.clearUser()
        if DEBUG_AUTH:
            if result:
                serverprint(f"LOGIN: user authenticated: {eppn} / {email}")
            else:
                serverprint(f"LOGIN: user not authenticated: {eppn} / {email}")
        return result

    def wrapTestUsers(self):
        """Present a widget to select a test user for login.

        !!! caution
            In production this will do nothing.
            Only in development mode one can select a test user.

        Returns
        -------
        string
            In production: the empty string `E`.
            In development: the result of `H.join` over one login link per
            distinct eppn of the users with authority `local`,
            followed by an input field for logging in by email address.
        """
        if not self.isDevel:
            return E

        db = self.db

        # distinct eppns, in the order in which the user table yields them
        testEppns = dict.fromkeys(
            record[N.eppn]
            for record in db.user.values()
            if N.eppn in record and G(record, N.authority) == N.local
        )
        loginLinks = [
            H.div(H.a(u, href=f"/login?eppn={u}", cls="button small"))
            for u in testEppns
        ]
        emailBox = H.div(
            H.input(
                E,
                placeholder="email",
                onchange="window.location.href=`/login?email=${this.value}`",
            )
        )
        return H.join(loginLinks + [emailBox])

    def _authEnv(self):
        """Collect the attributes that come with the current request.

        Where they are read from depends on `TRANSPORT_ATTRIBUTES`:

        * `N.ajp`: the entries of the request environment whose key starts with
          `AJP_`, with that prefix stripped;
        * `N.http`: the request headers;
        * anything else: the complete request environment.

        Returns
        -------
        dict
            Keys are lower-cased, values are passed through `utf8FromLatin1`.
        """

        if TRANSPORT_ATTRIBUTES == N.ajp:
            return {
                k[4:].lower(): utf8FromLatin1(v)
                for (k, v) in request.environ.items()
                if k.startswith("""AJP_""")
            }

        pairs = (
            request.headers
            if TRANSPORT_ATTRIBUTES == N.http
            else request.environ.items()
        )
        return {k.lower(): utf8FromLatin1(v) for (k, v) in pairs}

    def _loginDevel(self):
        """Log a test user in, based on the `eppn` or `email` request argument.

        Returns
        -------
        boolean
            Whether a user has been logged in.
        """

        user = self.user
        unauthUser = self.unauthUser

        if DEBUG_AUTH:
            serverprint("LOGIN: start authentication in development mode")

        eppn = G(request.args, N.eppn)
        if eppn is not None:
            result = self.getUser(eppn, mayCreate=False)
            if DEBUG_AUTH:
                if result:
                    serverprint("LOGIN: authentication successful")
                else:
                    serverprint("LOGIN: authentication failed")
            return result

        # no eppn given: derive one from the part before the @ of the email
        email = G(request.args, N.email) or E
        if AT in email:
            eppn = email.split(AT, maxsplit=1)[0]
            if eppn:
                if DEBUG_AUTH:
                    serverprint(f"LOGIN: authentication succeeded: {eppn} / {email}")
                return self.getUser(eppn, email=email, mayCreate=True)

        user.update(unauthUser)
        if DEBUG_AUTH:
            serverprint("LOGIN: authentication failed: no eppn, no email")
        return False

    def _loginShibboleth(self, authEnv):
        """Log a user in, based on the attributes sent by the identity provider.

        Parameters
        ----------
        authEnv: dict
            The attributes of the current request, as delivered by `_authEnv`.

        Returns
        -------
        boolean
            Whether a user has been logged in.
        """

        db = self.db
        user = self.user
        unauthUser = self.unauthUser

        if DEBUG_AUTH:
            serverprint("LOGIN: start authentication with shibboleth")

        if not G(authEnv, SHIB_KEY):
            user.update(unauthUser)
            if DEBUG_AUTH:
                serverprint("LOGIN: No shibboleth session found:")
                serverprint("LOGIN: authentication failed")
            return False

        eppn = G(authEnv, N.eppn)
        email = G(authEnv, N.mail)
        isUser = self.getUser(eppn, email=email, mayCreate=True)
        if DEBUG_AUTH:
            serverprint("""LOGIN: shibboleth session found:""")
            serverprint(f"""LOGIN: eppn = "{eppn}" """)
            serverprint(f"""LOGIN: email = "{email}" """)
            serverprint(f"""LOGIN: isUser = "{isUser}" """)
        if not isUser:
            # the user is refused because the database says (s)he may not login
            self.clearUser()
            if DEBUG_AUTH:
                serverprint("LOGIN: authentication failed")
            return False

        # process the attributes provided by the identity server
        # they may have been changed after the last login
        dirty = False
        for (envKey, toolKey) in ATTRIBUTES.items():
            if envKey not in authEnv:
                continue
            val = G(authEnv, envKey, default=E)
            if G(user, toolKey) != val:
                user[toolKey] = val
                dirty = True
        if dirty:
            # only touch the database if something really changed
            db.updateUser(user)
            if DEBUG_AUTH:
                serverprint(f"LOGIN: user data updated for {eppn}/{email}")
        if DEBUG_AUTH:
            serverprint("LOGIN: authentication successful")
        return True

    def checkLogin(self):
        """Checks for a currently logged in user and sets `user` accordingly.

        This happens after a login action and is meant to adapt the `user` attribute
        to a newly logged-in user.

        Requests that declare a content length above `LIMIT_JSON`
        are aborted with status 400.

        Returns
        -------
        boolean
            Whether an authenticated user has just logged in.
        """

        contentLength = request.content_length
        if contentLength is not None and contentLength > LIMIT_JSON:
            abort(400)

        authEnv = self._authEnv()
        if DEBUG_AUTH:
            serverprint("LOGIN: auth environment/headers")
            for (k, v) in authEnv.items():
                serverprint(f"LOGIN: ATTRIBUTE {k} = {v}")

        # start from the unauthenticated state, whatever happens next
        self.clearUser()

        if self.isDevel:
            return self._loginDevel()
        return self._loginShibboleth(authEnv)

    def countryRep(self, user=None):
        """Provide a short representation of the country of a user.

        Parameters
        ----------
        user: dict, optional `None`
            The user whose country must be represented.
            If absent, the currently logged in user will be taken.

        Returns
        -------
        string
            The representation consists of the 2-letter country code plus
            a derived two letter unicode character combination that will
            be turned into a flag of that country.
            If no country code is known, only the icon for an unknown
            country is returned.
        """

        if user is None:
            user = self.user

        countryInfo = G(self.db.country, G(user, N.country))
        iso = G(countryInfo, N.iso, default=E)
        flag = shiftRegional(iso) if iso else Qc
        return iso + flag

    def groupRep(self, user=None):
        """Provide a string representation of the permission group of a user.

        Parameters
        ----------
        user: dict, optional `None`
            The user whose group must be represented.
            If absent, the currently logged in user will be taken.

        Returns
        -------
        string
            The `rep` of the permission group of the user.
            If the user has no group or the group has no `rep`,
            the result is `UNAUTH`.
        """

        if user is None:
            user = self.user

        group = G(user, N.group)
        if group is None:
            return UNAUTH

        return G(G(self.db.permissionGroup, group), N.rep) or UNAUTH

    @staticmethod
    def _fullName(user):
        """The `name` of a user, or else first name and last name joined by a blank."""

        name = G(user, N.name) or E
        if not name:
            firstName = G(user, N.firstName) or E
            lastName = G(user, N.lastName) or E
            name = firstName + (BLANK if firstName and lastName else E) + lastName
        return name

    def identity(self, user=None, markup=True, withRole=False):
        """Provide a string representation of the identity of a user.

        !!! note
            Care will be taken that to unauthenticated users only
            limited information about users will be shown.

        Parameters
        ----------
        user: dict, optional `None`
            The user whose identity must be represented.
            If absent, the currently logged in user will be taken.
        markup: boolean, optional `True`
            Only inspected when `db.test` is set and the user has neither
            eppn nor email: if it is `None` the result is empty, otherwise
            it is the icon for an unknown user.
        withRole: boolean, optional `False`
            If `True` and the currently logged in user is authenticated,
            the role of `user` (looked up in the configured roles)
            is prepended to the result.

        Returns
        -------
        string
        """

        if user is None:
            user = self.user

        db = self.db
        # NOTE: this shortcut is keyed on db.test, not on self.isDevel
        if db.test:
            # NOTE(review): `markup` is documented as a boolean, so `is None`
            # never holds for the documented values -- confirm the intent.
            return G(
                user,
                N.eppn,
                default=G(user, N.email, default=(E if markup is None else Qu)),
            )

        name = self._fullName(user)
        group = self.groupRep()  # the power of the currently logged in user!
        isAuth = group != UNAUTH
        org = G(user, N.org) or E
        orgRep = f""" ({org})""" if org else E
        email = (G(user, N.email) or E) if isAuth else E
        authority = (G(user, N.authority) or E) if isAuth else E
        authorityRep = f"""{WHYPHEN}{authority}""" if authority else E
        eppn = (G(user, N.eppn) or E) if isAuth else E

        countryShort = self.countryRep(user=user)

        permGroupRep = E
        if isAuth and withRole:
            userGroup = self.groupRep(user=user)
            permGroupRep = G(CP.roles, userGroup, E)
            if permGroupRep:
                permGroupRep = f"{permGroupRep}: "

        # the best available designation: name, then email, then eppn
        if name:
            who = f"""{name}{orgRep}"""
        elif email:
            who = f"""{email}{orgRep}"""
        elif eppn:
            who = f"""{eppn}{authorityRep}"""
        else:
            who = Qu

        return permGroupRep + who + """ from """ + countryShort

    def credentials(self):
        """Provide a string representation of the identity and permissions of a user.

        This is used to present the currently logged in user on the interface.

        !!! note
            Care will be taken that to unauthenticated users only
            limited information about users will be shown.

        Returns
        -------
        string
            identity
        string
            group description
        """

        db = self.db
        user = self.user

        group = self.groupRep()
        groupDesc = G(db.permissionGroupDesc, group) or Qg
        if group == COORD:
            # coordinators are shown together with their country
            groupDesc += f"-{self.countryRep()}"

        if group == UNAUTH:
            return (N.Guest, groupDesc)

        return (self.identity(user), groupDesc)

    def nameEmail(self, user=None):
        """Provide a string representation of the name and email of the user.

        !!! note
            The email address is only given for users whose own
            permission group is not the unauthenticated one.

        Parameters
        ----------
        user: dict, optional `None`
            The user whose identity must be represented.
            If absent, the currently logged in user will be taken.

        Returns
        -------
        tuple
            The name and the email address, both as string.
        """

        if user is None:
            user = self.user

        name = self._fullName(user)
        isAuth = self.groupRep(user=user) != UNAUTH
        email = (G(user, N.email) or E) if isAuth else E
        return (name, email)

    def authenticate(self, login=False):
        """Verify the authenticated status of the current user.

        This function is called for every request that requires authentication.
        Whether a user is authenticated or not depends on whether a session for
        that user is present. And that depends on whether the identity provider
        has sent attributes (eppn and others) to the server.

        The data in the `user` attribute will be cleared if there is
        no authenticated user. Subsequent methods that ask for the uid of
        the current user will get nothing if there is no authenticated user.
        If there is an authenticated user, and `login=False`, his/her data
        are looked up again in the user table by the eppn stored in the session.

        Parameters
        ----------
        login: boolean, optional `False`
            Pass `True` in order to verify/update a user that has just logged in.
            The data in the `user` attribute will be updated with his/her
            data. The user table in the database will be updated if the
            identity provider has given updated attributes for that user.

        Returns
        -------
        boolean
            Whether the current user is authenticated.
        """

        user = self.user

        # if login=True we want to log the user in
        # if login=False we only want the current user information

        if login:
            session.pop(N.eppn, None)
            if self.checkLogin():
                # in this case there is an eppn
                session[N.eppn] = G(user, N.eppn)
                return True
            return False

        eppn = G(session, N.eppn)
        if eppn and self.getUser(eppn, mayCreate=False):
            return True

        self.clearUser()
        return False

    def deauthenticate(self):
        """Log out the current user.

        Returns
        -------
        void
            If there is a logged in authenticated user, his/her data will be
            cleared and the session will be deleted.
        """

        self.clearUser()
        session.pop(N.eppn, None)

    def authenticated(self):
        """Is the current user authenticated?

        Returns
        -------
        boolean
        """

        return self.groupRep() != UNAUTH

    def coordinator(self, countryId=None):
        """Is the current user a national coordinator?

        !!! note
            On the overview page, we display contributions of many countries.
            If a National Coordinator is logged in, (s)he will see the contributions
            of his/her country in greater detail, but not those of other countries.

        Parameters
        ----------
        countryId: optional `None`
            If passed, it is the country of which the currently logged in
            user is supposed to be National Coordinator; it is compared
            with the `country` value in the user record.
            Otherwise, the country of the logged in user will be used.

        Returns
        -------
        boolean
        """

        if self.groupRep() != COORD:
            return False
        return countryId is None or G(self.user, N.country) == countryId

    def officeuser(self):
        """Is the current user a backoffice user?

        Returns
        -------
        boolean
        """

        return self.groupRep() == OFFICE

    def superuser(self):
        """Is the current user a super user?

        Superusers are backoffice users, sysadmins and root.

        Returns
        -------
        boolean
        """

        return self.groupRep() in {OFFICE, SYSTEM, ROOT}

    def sysadmin(self):
        """Is the current user a system administrator?

        Returns
        -------
        boolean
        """

        return self.groupRep() in {SYSTEM, ROOT}

    def country(self):
        """The full country record of the currently logged in user.

        !!! hint
            This function is used to get the country on the Sidebar.

        Returns
        -------
        dict
            Empty if the country of the user is not in the country table.
        """

        countryId = G(self.user, N.country)
        return G(self.db.country, countryId, default={})

704 db = self.db 

705 user = self.user 

706 country = db.country 

707 

708 countryId = G(user, N.country) 

709 return G(country, countryId, default={})