Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

"""Authentication

*   User management
*   Identity provider attribute handling
*   Authorization
"""

7 

from urllib.parse import quote

from flask import request, session, abort

from control.utils import (
    pick as G,
    utf8FromLatin1,
    serverprint,
    shiftRegional,
    E,
    AT,
    BLANK,
    WHYPHEN,
)
from config import Config as C, Names as N
from control.html import HtmlElements as H
from control.perm import (
    AUTH,
    UNAUTH,
    COORD,
    OFFICE,
    ROOT,
    SYSTEM,
)

29 

# Shorthands for the two configuration sections used in this module.
CB = C.base
CW = C.web

# Debug switches; DEBUG_AUTH turns on the LOGIN: messages printed below.
DEBUG = CB.debug
DEBUG_AUTH = G(DEBUG, N.auth)

# How the identity provider's attributes reach us and which of them we keep.
TRANSPORT_ATTRIBUTES = CB.transportAttributes
SHIB_KEY = CB.shibKey
ATTRIBUTES = CB.attributes

# Upper bound on the content length of a login request.
LIMITS = CW.limits
LIMIT_JSON = G(LIMITS, N.json, default=1_000_000)

# Placeholder characters used when a country, user or group is unknown.
Qc, Qu, Qg = (
    H.icon(CW.unknown[kind], asChar=True) for kind in (N.country, N.user, N.group)
)

45 

46 

class Auth:
    """Deal with user Authentication.

    Facilitates the login/logout process of users.
    Maintains the attributes that the DARIAH Identity Provider supplies about users.
    """

    def __init__(self, db, regime):
        """## Initialization

        Store a handle to `control.db.Db` and derive the settings that depend
        on the regime the server runs in.

        Parameters
        ----------
        db: object
            See below.
        regime: string
            The regime the server runs in.
            If it equals `N.development` the server is in development mode,
            otherwise it is in production mode.
        """

        self.db = db
        """*object* The `control.db.Db` singleton

        Provides methods to retrieve user
        info from the database and store user info there.
        """

        permissionGroupInv = db.permissionGroupInv

        # determine production or devel
        self.isDevel = regime == N.development
        """*boolean* Whether the server runs in development (as opposed to production).

        In production we use the DARIAH Identity provider,
        while in development we use a simple, console-based way of
        logging a few test users in.
        """

        self.authority = N.local if self.isDevel else N.DARIAH
        """*string* The name of the authority that identifies users.

        In production it is "DARIAH", which stands for the DARIAH Identity Provider.
        In development it is "local".
        """

        self.authId = G(permissionGroupInv, AUTH)
        """*string* The groupId of the `auth` permission group.
        """

        self.authUser = {N.group: self.authId}
        """*dict* Minimal user info that only states membership of the `auth` group.
        """

        self.unauthId = G(permissionGroupInv, UNAUTH)
        """*string* The groupId of the `public` permission group.
        """

        self.unauthUser = {N.group: self.unauthId}
        """*dict* Minimal user info that only states membership of the `public` group.
        """

        self.user = {}
        """*dict* The attributes of the currently logged in user."""

    def clearUser(self):
        """Forgets the currently logged in user.

        The attributes in the `user` attribute will be cleared and attributes
        for an unauthenticated user will take their place.
        The `user` dict is modified in place, so existing references to it
        stay valid.
        """

        user = self.user
        user.clear()
        user.update(self.unauthUser)

    def getUser(self, eppn, email=None, mayCreate=False):
        """Find a user in the database.

        This is called to get extra information for an authenticated user
        from the database.
        The resulting data will be stored in the `user` attribute of Auth.

        !!! caution
            Even if the user can be found, the attribute `mayLogin`
            might be false, in which case it will be prevented to log in that user.

        !!! tip
            When assigning reviewers, office users may select people who are not yet
            known to the contrib tool by specifying their email address.
            Such records have an email but no `eppn`.

        Parameters
        ----------
        eppn: string
            The unique identifier of a user as assigned by the DARIAH identity provider.
        email: string, optional `None`
            New users may not have an eppn, but might already be present in the
            user table by their email.
            The email address is only used for the lookup if `eppn` is `None`,
            and then only records without an eppn are considered.
        mayCreate: boolean, optional `False`
            If a user cannot be found, they will be created if this flag is `True`.
            This is relevant for situation where a new user has been authenticated
            by the identity provider.

        Returns
        -------
        boolean
            Whether a user was authenticated and logged in.
            The attributes retrieved from the database will be merged into
            the `user` attribute.
            If no user was logged in, the `user` attribute will be filled with
            info that says that the current user is the public and nothing more.
        """

        user = self.user
        db = self.db
        authority = self.authority

        # NOTE(review): when an eppn is given, a record that was pre-registered
        # by email only (no eppn) is never matched here - confirm that this is
        # the intended way of dealing with first-time logins of such users.
        userFound = [
            record
            for record in db.user.values()
            if (
                G(record, N.authority) == authority
                and (
                    (eppn is not None and G(record, N.eppn) == eppn)
                    or (
                        eppn is None
                        and email is not None
                        and G(record, N.eppn) is None
                        and G(record, N.email) == email
                    )
                )
            )
        ]
        user.clear()

        if len(userFound) > 1:
            self.clearUser()
            if DEBUG_AUTH:
                serverprint(f"LOGIN: multiple matches in user DB: {eppn} / {email}")
            return False

        record = userFound[0] if userFound else None

        if record is not None and not G(record, N.mayLogin, default=True):
            self.clearUser()
            if DEBUG_AUTH:
                serverprint(f"LOGIN: existing user may not login: {eppn} / {email}")
            return False

        user.update({N.eppn: eppn, N.authority: authority})
        if email:
            user[N.email] = email

        if record is None:
            if not mayCreate:
                # do not leave a half-filled user behind: fall back to the public
                self.clearUser()
                if DEBUG_AUTH:
                    serverprint(f"LOGIN: may not create new user: {eppn} / {email}")
                return False
            db.insertUser(user)
            if DEBUG_AUTH:
                serverprint(f"LOGIN: new user: {eppn} / {email}")
        else:
            user.update(record)
            if DEBUG_AUTH:
                serverprint(f"LOGIN: existing user: {eppn} / {email}")

        # new users do not have yet group information
        group = user.setdefault(N.group, self.authId)

        groupRep = G(G(db.permissionGroup, group), N.rep)
        result = groupRep != UNAUTH
        if DEBUG_AUTH:
            if result:
                serverprint(f"LOGIN: user authenticated: {eppn} / {email}")
            else:
                serverprint(f"LOGIN: user not authenticated: {eppn} / {email}")
        if not result:
            # a user in the public group is not logged in: keep only public info
            self.clearUser()
        return result

    def wrapTestUsers(self):
        """Present a widget to select a test user for login.

        The widget consists of a login link for every user in the database
        that has an eppn and whose authority is "local",
        followed by an input box in which an email address can be typed.

        !!! caution
            In production this will do nothing.
            Only in development mode one can select a test user.

        Returns
        -------
        string
            The HTML of the widget, or the empty string in production.
        """
        if not self.isDevel:
            return E

        db = self.db

        # a dict is used to list every eppn only once, in database order
        testUsers = {
            record[N.eppn]: record
            for record in db.user.values()
            if N.eppn in record and G(record, N.authority) == N.local
        }
        # the eppn and the email end up in a query string: they must be
        # percent-encoded, otherwise characters like + & # arrive corrupted
        return H.join(
            [
                H.div(
                    H.a(
                        u,
                        href=f"/login?eppn={quote(str(u), safe='')}",
                        cls="button small",
                    )
                )
                for u in testUsers
            ]
            + [
                H.div(
                    H.input(
                        E,
                        placeholder="email",
                        onchange=(
                            "window.location.href="
                            "`/login?email=${encodeURIComponent(this.value)}`"
                        ),
                    )
                )
            ]
        )

    @staticmethod
    def _authEnv():
        """Collect the attributes that come along with the current request.

        Where the attributes are read from depends on `TRANSPORT_ATTRIBUTES`:

        *   `ajp`: the WSGI environment entries that start with `AJP_`,
            with that prefix stripped;
        *   `http`: the request headers;
        *   anything else: the complete WSGI environment.

        Returns
        -------
        dict
            Keys are lower-cased, values are passed through `utf8FromLatin1`.
        """

        if TRANSPORT_ATTRIBUTES == N.ajp:
            prefix = """AJP_"""
            return {
                k[len(prefix):].lower(): utf8FromLatin1(v)
                for (k, v) in request.environ.items()
                if k.startswith(prefix)
            }
        if TRANSPORT_ATTRIBUTES == N.http:
            return {k.lower(): utf8FromLatin1(v) for (k, v) in request.headers}
        return {k.lower(): utf8FromLatin1(v) for (k, v) in request.environ.items()}

    def checkLogin(self):
        """Checks for a currently logged in user and sets `user` accordingly.

        This happens after a login action and is meant to adapt the `user` attribute
        to a newly logged-in user.

        In development mode the user is taken from the request arguments
        `eppn` or `email`.
        Otherwise the attributes supplied by the identity provider are inspected,
        and if they differ from what is stored for the user,
        the user record in the database is updated.

        Requests whose content length exceeds `LIMIT_JSON` are aborted
        with status 400.

        Returns
        -------
        boolean
            Whether an authenticated user has just logged in.
        """

        db = self.db
        user = self.user

        contentLength = request.content_length
        if contentLength is not None and contentLength > LIMIT_JSON:
            abort(400)

        authEnv = self._authEnv()
        if DEBUG_AUTH:
            serverprint("LOGIN: auth environment/headers")
            for (k, v) in authEnv.items():
                serverprint(f"LOGIN: ATTRIBUTE {k} = {v}")

        # from here on the current user is the public,
        # unless one of the branches below logs somebody in
        self.clearUser()

        if self.isDevel:
            if DEBUG_AUTH:
                serverprint("LOGIN: start authentication in development mode")
            eppn = G(request.args, N.eppn)

            if eppn is not None:
                result = self.getUser(eppn, mayCreate=False)
                if DEBUG_AUTH:
                    if result:
                        serverprint("LOGIN: authentication successful")
                    else:
                        serverprint("LOGIN: authentication failed")
                return result

            # no eppn given: derive one from the part of the email before the @
            email = G(request.args, N.email) or E
            if AT in email:
                eppn = email.split(AT, maxsplit=1)[0]
                if eppn:
                    if DEBUG_AUTH:
                        serverprint(
                            f"LOGIN: authentication succeeded: {eppn} / {email}"
                        )
                    return self.getUser(eppn, email=email, mayCreate=True)
            if DEBUG_AUTH:
                serverprint("LOGIN: authentication failed: no eppn, no email")
            return False

        if DEBUG_AUTH:
            serverprint("LOGIN: start authentication with shibboleth")

        if not G(authEnv, SHIB_KEY):
            if DEBUG_AUTH:
                serverprint("LOGIN: No shibboleth session found:")
                serverprint("LOGIN: authentication failed")
            return False

        eppn = G(authEnv, N.eppn)
        email = G(authEnv, N.mail)
        isUser = self.getUser(eppn, email=email, mayCreate=True)
        if DEBUG_AUTH:
            serverprint("""LOGIN: shibboleth session found:""")
            serverprint(f"""LOGIN: eppn = "{eppn}" """)
            serverprint(f"""LOGIN: email = "{email}" """)
            serverprint(f"""LOGIN: isUser = "{isUser}" """)
        if not isUser:
            # the user is refused because the database says (s)he may not login
            self.clearUser()
            if DEBUG_AUTH:
                serverprint("LOGIN: authentication failed")
            return False

        # process the attributes provided by the identity server
        # they may have been changed after the last login
        attributes = {
            toolKey: G(authEnv, envKey, default=E)
            for (envKey, toolKey) in ATTRIBUTES.items()
            if envKey in authEnv
        }
        dirty = False
        for (att, val) in attributes.items():
            if G(user, att) != val:
                user[att] = val
                dirty = True
        if dirty:
            db.updateUser(user)
            if DEBUG_AUTH:
                serverprint(f"LOGIN: user data updated for {eppn}/{email}")
        if DEBUG_AUTH:
            serverprint("LOGIN: authentication successful")
        return True

    def countryRep(self, user=None):
        """Provide a short representation of the country of a user.

        Parameters
        ----------
        user: dict, optional `None`
            The user whose country must be represented.
            If absent, the currently logged in user will be taken.

        Returns
        -------
        string
            The representation consists of the 2-letter country code plus
            a derived two letter unicode character combination that will
            be turned into a flag of that country.
            If the country code is not known, only the placeholder for
            unknown countries is returned.
        """

        if user is None:
            user = self.user

        countryId = G(user, N.country)
        countryInfo = G(self.db.country, countryId)
        # "or E" also covers an iso code that is present but None
        iso = G(countryInfo, N.iso) or E
        flag = shiftRegional(iso) if iso else Qc
        return iso + flag

    def groupRep(self, user=None):
        """Provide a string representation of the permission group of a user.

        Parameters
        ----------
        user: dict, optional `None`
            The user whose group must be represented.
            If absent, the currently logged in user will be taken.

        Returns
        -------
        string
            The representation of the group as stored in the permission group
            table, or that of the public group if the user has no group
            or the group has no representation.
        """

        if user is None:
            user = self.user

        group = G(user, N.group)
        if group is None:
            return UNAUTH

        return G(G(self.db.permissionGroup, group), N.rep) or UNAUTH

    @staticmethod
    def _fullName(user):
        """Give the name of a user, composed from first and last name if needed."""

        name = G(user, N.name) or E
        if name:
            return name
        firstName = G(user, N.firstName) or E
        lastName = G(user, N.lastName) or E
        return firstName + (BLANK if firstName and lastName else E) + lastName

    def identity(self, user=None, markup=True):
        """Provide a string representation of the identity of a user.

        !!! note
            Care will be taken that to unauthenticated users only
            limited information about users will be shown.

        Parameters
        ----------
        user: dict, optional `None`
            The user whose identity must be represented.
            If absent, the currently logged in user will be taken.
        markup: boolean, optional `True`
            Only consulted in development mode, and only when the user has
            neither eppn nor email: if it is `None`, the empty string
            is returned instead of the placeholder for unknown users.

        Returns
        -------
        string
            In development mode: the eppn of the user, or else the email.
            Otherwise: the name (or else the email, or else the eppn)
            followed by the country of the user.
        """

        if user is None:
            user = self.user

        if self.isDevel:
            # NOTE(review): the test is "is None" although the default is True,
            # so markup=False still yields the placeholder - confirm the intent.
            fallback = E if markup is None else Qu
            return G(user, N.eppn, default=G(user, N.email, default=fallback))

        name = self._fullName(user)

        # the power of the currently logged in user!
        isAuth = self.groupRep() != UNAUTH

        org = G(user, N.org) or E
        orgRep = f""" ({org})""" if org else E
        email = (G(user, N.email) or E) if isAuth else E
        authority = (G(user, N.authority) or E) if isAuth else E
        authorityRep = f"""{WHYPHEN}{authority}""" if authority else E
        eppn = (G(user, N.eppn) or E) if isAuth else E

        countryShort = self.countryRep(user=user)

        if name:
            who = f"""{name}{orgRep}"""
        elif email:
            who = f"""{email}{orgRep}"""
        elif eppn:
            who = f"""{eppn}{authorityRep}"""
        else:
            who = Qu

        return who + """ from """ + countryShort

    def credentials(self):
        """Provide a string representation of the identity and permissions of a user.

        This is used to present the currently logged in user on the interface.

        !!! note
            Care will be taken that to unauthenticated users only
            limited information about users will be shown.

        Returns
        -------
        tuple of (string, string)
            The identity of the current user (`N.Guest` for the public)
            and the description of his/her permission group.
            For national coordinators the country is appended to the
            group description.
        """

        group = self.groupRep()
        groupDesc = G(self.db.permissionGroupDesc, group) or Qg
        if group == COORD:
            country = self.countryRep()
            groupDesc += f"-{country}"

        if group == UNAUTH:
            return (N.Guest, groupDesc)

        return (self.identity(self.user), groupDesc)

    def nameEmail(self, user=None):
        """Provide the name and email of the user.

        !!! note
            The email is only given if the user in question belongs
            to an authenticated permission group.

        Parameters
        ----------
        user: dict, optional `None`
            The user whose identity must be represented.
            If absent, the currently logged in user will be taken.

        Returns
        -------
        tuple of (string, string)
            The name and the email address; each may be the empty string.
        """

        if user is None:
            user = self.user

        name = self._fullName(user)
        isAuth = self.groupRep(user=user) != UNAUTH
        email = (G(user, N.email) or E) if isAuth else E
        return (name, email)

    def authenticate(self, login=False):
        """Verify the authenticated status of the current user.

        This function is called for every request that requires authentication.
        Whether a user is authenticated or not depends on whether a session for
        that user is present. And that depends on whether the identity provider
        has sent attributes (eppn and others) to the server.

        The data in the `user` attribute will be reset to that of the public
        if there is no authenticated user. Subsequent methods that ask for the
        uid of the current user will get nothing if there is no authenticated
        user.
        If there is an authenticated user and `login=False`, his/her data
        is looked up in the database by the eppn stored in the session.

        Parameters
        ----------
        login: boolean, optional `False`
            Pass `True` in order to verify/update a user that has just logged in.
            The data in the `user` attribute will be updated with his/her
            data. The user table in the database will be updated if the
            identity provider has given updated attributes for that user.

        Returns
        -------
        boolean
            Whether the current user is authenticated.
        """

        user = self.user

        # if login=True we want to log the user in
        # if login=False we only want the current user information

        if login:
            session.pop(N.eppn, None)
            if self.checkLogin():
                # in this case there is an eppn
                session[N.eppn] = G(user, N.eppn)
                return True
            return False

        eppn = G(session, N.eppn)
        if eppn and self.getUser(eppn, mayCreate=False):
            return True

        self.clearUser()
        return False

    def deauthenticate(self):
        """Log out the current user.

        Returns
        -------
        void
            If there is a logged in authenticated user, his/her data will be
            cleared and the eppn will be removed from the session.
        """

        self.clearUser()
        session.pop(N.eppn, None)

    def authenticated(self):
        """Is the current user authenticated?

        Returns
        -------
        boolean
            Whether the group of the current user is not the public group.
        """

        return self.groupRep() != UNAUTH

    def coordinator(self, countryId=None):
        """Is the current user a national coordinator?

        !!! note
            On the overview page, we display contributions of many countries.
            If a National Coordinator is logged in, (s)he will see the contributions
            of his/her country in greater detail, but not those of other countries.

        Parameters
        ----------
        countryId: optional `None`
            If passed, it is compared to the country stored in the record of the
            currently logged in user: the answer is only `True` if they are equal.
            Otherwise, any country will do.

        Returns
        -------
        boolean
        """

        if self.groupRep() != COORD:
            return False
        return countryId is None or G(self.user, N.country) == countryId

    def officeuser(self):
        """Is the current user a backoffice user?

        Returns
        -------
        boolean
        """

        return self.groupRep() == OFFICE

    def superuser(self):
        """Is the current user a super user?

        Superusers are backoffice users, sysadmins and root.

        Returns
        -------
        boolean
        """

        return self.groupRep() in {OFFICE, SYSTEM, ROOT}

    def sysadmin(self):
        """Is the current user a system administrator?

        System administrators are sysadmins and root.

        Returns
        -------
        boolean
        """

        return self.groupRep() in {SYSTEM, ROOT}

    def country(self):
        """The full country record of the currently logged in user.

        !!! hint
            This function is used to get the country on the Sidebar.

        Returns
        -------
        dict
            The record in the country table for the country of the current user,
            or an empty dict if there is no such record.
        """

        countryId = G(self.user, N.country)
        return G(self.db.country, countryId, default={})