Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Things that do not find a more logical place. 

2 

3* Utility functions 

4* Character constants 

5""" 

6 

import json
import re
import sys
from base64 import b64decode, b64encode
from datetime import datetime as dt, timezone
from json import JSONEncoder

from bson.objectid import ObjectId
from flask import request

16 

17 

REGION_SHIFT = 0x1F1E6 - ord("A")
"""Offset of the Unicode position where flag symbols start w.r.t. `'A'`."""

# strptime formats for ISO-like timestamps, from most to least precise.
ISO_DTP = """%Y-%m-%dT%H:%M:%S.%f"""
ISO_DT = """%Y-%m-%dT%H:%M:%S"""
ISO_D = """%Y-%m-%d"""

# Single characters and short tokens.
E = ""
BLANK = " "
COMMA = ","
COLON = ":"
DOT = "."
PIPE = "|"
T = "T"
Z = "Z"
AT = "@"
EURO = "€"
MINONE = "-1"
ZERO = "0"
ONE = "1"
TWO = "2"
THREE = "3"
SLASH = "/"
LOW = "_"
AMP = "&"
LT = "<"
APOS = "'"
QUOT = '"'
DOLLAR = "$"
Q = "?"
S = "s"

# Whitespace and separators.
NL = "\n"
TAB = "\t"
LINE_SEP = "§"
MIDDLE_DOT = "\u00b7"

PLUS = "+"
MIN = "-"
HYPHEN = "-"
WHYPHEN = " - "
ELLIPS = "..."
ON = " on "

# Numeric character reference for U+00A0 (no-break space).
# The former value "&#xa;" denoted U+000A, which is a line feed.
NBSP = "&#xa0;"

# Encoding names.
LATIN1 = "latin1"
UTF8 = "utf8"

EMPTY_DATE = "1900-01-01T00:00:00Z"

# Attribute name used to detect iterables.
ITER = "__iter__"

70 

71 

class MongoJSONEncoder(JSONEncoder):
    """JSON encoder that can also serialize `datetime` and `ObjectId` values.

    * `datetime` values are written in their `isoformat()` representation;
    * `ObjectId` values are written as their string representation;
    * everything else is left to the standard `JSONEncoder`.
    """

    def default(self, obj):
        """Convert values that the standard encoder cannot handle."""

        if isinstance(obj, dt):
            return obj.isoformat()
        if isinstance(obj, ObjectId):
            return str(obj)
        # Let the base class raise its usual TypeError for unknown types.
        return super().default(obj)


# Ready-made encoding function that leaves non-ASCII characters unescaped.
mjson = MongoJSONEncoder(ensure_ascii=False).encode

85 

86 

def mktsv(data):
    """Render records as tab-separated text.

    The header line consists of the sorted union of all keys that occur in
    the records. Every record becomes one line; fields that a record lacks
    are rendered as the empty string.

    List and tuple values are joined with `MIDDLE_DOT`, all other values are
    converted with `str`. Newlines inside values are replaced by `LINE_SEP`
    and tabs by a single space, so that values cannot break the row/column
    structure.

    Parameters
    ----------
    data: iterable of dict | `None`
        The records to render.

    Returns
    -------
    string
        The empty string if `data` is `None`, otherwise the header line
        followed by one line per record, separated by newlines.
    """
    if data is None:
        return ""

    # The records are traversed twice (headers, then rows). Materialize them
    # first, otherwise a one-shot iterator such as a generator would yield
    # headers but no rows.
    rows = list(data)

    allHeaders = sorted(set().union(*rows))

    lines = ["\t".join(allHeaders)]
    for row in rows:
        values = []
        for field in allHeaders:
            value = row.get(field, "")
            if isinstance(value, (list, tuple)):
                value = MIDDLE_DOT.join(str(v) for v in value)
            else:
                value = str(value)
            values.append(value.replace("\n", LINE_SEP).replace("\t", " "))
        lines.append("\t".join(values))
    return "\n".join(lines)

109 

110 

def factory(name, Base, Deriveds):
    """Select a class by its registered name, falling back to the base class.

    Parameters
    ----------
    name: string
        The registered name to look for.
    Base: class
        The class to return when no derived class is registered under `name`.
    Deriveds: iterable of (name, class)
        The derived classes with their names.

    Returns
    -------
    class
        The first class in `Deriveds` whose name equals `name`,
        or `Base` if there is none.
    """

    return next((NmCl for (nm, NmCl) in Deriveds if nm == name), Base)

133 

134 

def utf8FromLatin1(s):
    """Reinterpret a latin1-decoded string as UTF-8.

    The string is turned back into bytes using the latin1 encoding and those
    bytes are then decoded as UTF-8.

    !!! hint
        Needed to process the values of environment variables, in particular
        those from the identity provider.

    Parameters
    ----------
    s: string(latin1)

    Returns
    -------
    string(utf8)
    """
    rawBytes = bytes(s, encoding=LATIN1)
    return str(rawBytes, encoding=UTF8)

151 

152 

def bencode(s):
    """Pack a JSON-serializable value into a base64 string.

    The value is dumped as compact JSON (no whitespace after separators)
    and the resulting text is base64-encoded.

    !!! hint
        Needed to pass the original value into an edit widget, so that the Javascript
        has a way to know whether an edited value is dirty or not.

    Parameters
    ----------
    s: Python value

    Returns
    -------
    string(ascii)
    """

    compactJson = json.dumps(s, separators=(COMMA, COLON))
    encoded = b64encode(compactJson.encode())
    return encoded.decode()

170 

171 

def bdecode(s):
    """Interpret a serialized value as a Python value.

    The string is base64-decoded and the result is parsed as JSON.

    Parameters
    ----------
    s: string(ascii)

    Returns
    -------
    Python value.
    """

    jsonText = b64decode(s.encode()).decode()
    return json.loads(jsonText)

185 

186 

def cap1(s):
    """Uppercase the first character and leave the rest untouched.

    Parameters
    ----------
    s: string

    Returns
    -------
    string
        The empty string if `s` is empty or otherwise falsy.
    """

    if not s:
        return E
    first, rest = s[0], s[1:]
    return first.upper() + rest

200 

201 

def shiftRegional(iso):
    """Turn an iso country code into a flag.

    Every character of the code is moved over the fixed distance
    `REGION_SHIFT`. For a 2-letter iso country code this yields two Unicode
    characters that browsers know to render as the flag symbol of that country.

    Parameters
    ----------
    iso: string
        2-letter iso country code.

    Returns
    -------
    flag:string
        2-letter unicode, starting from `control.utils.REGION_SHIFT`.
    """

    shifted = [chr(ord(letter) + REGION_SHIFT) for letter in iso]
    return E.join(shifted)

221 

222 

def now():
    """The current moment in time as a `datetime` value.

    Returns
    -------
    datetime
        The current UTC time as a naive value (without `tzinfo`).
    """

    # `datetime.utcnow()` is deprecated; take the aware UTC time and drop the
    # timezone so that callers keep receiving the same naive UTC value.
    return dt.now(timezone.utc).replace(tzinfo=None)

227 

228 

def thisYear():
    """The current year (in UTC) as number."""

    # `datetime.utcnow()` is deprecated; the aware UTC time has the same year.
    return dt.now(timezone.utc).year

233 

234 

def debug(*msg):
    """Print a message to the std error immediately.

    Parameters
    ----------
    msg: values
        The parts of the message. Every part is converted with `str`, so
        non-string values are accepted too. The parts are joined with a
        single space and a newline is appended.
    """

    sys.stderr.write(f"""{" ".join(str(m) for m in msg)}{NL}""")
    sys.stderr.flush()

240 

241 

def serverprint(*msg):
    """Print a message to the console immediately.

    Parameters
    ----------
    msg: values
        The parts of the message. Every part is converted with `str`, so
        non-string values are accepted too. The parts are joined with a
        single space and a newline is appended.
    """

    sys.stdout.write(f"""{" ".join(str(m) for m in msg)}{NL}""")
    sys.stdout.flush()

247 

248 

def dtm(isostr):
    """Get a datetime value from an ISO string representing time.

    Trailing `Z` characters are stripped first. Then the formats `ISO_DTP`,
    `ISO_DT` and `ISO_D` are tried in that order.

    Parameters
    ----------
    isostr: string

    Returns
    -------
    tuple
        `(E, datetime)` for the first format that parses the string.
        If none does: `(message, string)` where the message comes from the
        last attempt and the string is the input without trailing `Z`s.
    """

    isostr = isostr.rstrip(Z)
    failure = None
    for fmt in (ISO_DTP, ISO_DT, ISO_D):
        try:
            return (E, dt.strptime(isostr, fmt))
        except Exception as err:
            # Remember the error; only the one of the last format is reported.
            failure = err
    return (str(failure), isostr)

264 

265 

def isIterable(value):
    """Whether a value is a non-string iterable.

    A value counts as iterable if it has an `__iter__` attribute.

    !!! note
        Strings are iterables.
        We want to know whether a value is a string or an iterable of strings.
    """

    if type(value) is str:
        return False
    return hasattr(value, ITER)

275 

276 

def asString(value):
    """Turn a value into a single string.

    * `None` becomes the empty string;
    * a non-string iterable (of strings) is concatenated without separator;
    * any other value is returned as is.
    """

    if value is None:
        return E
    if isIterable(value):
        return E.join(value)
    return value

285 

286 

def getLast(sequence):
    """Get the last element of a sequence or `None` if the sequence is empty.

    A falsy value such as `None` also yields `None`.
    """

    if not sequence:
        return None
    return sequence[-1]

291 

292 

def pick(record, field, default=None):
    """Look up a field in a record that may be `None`.

    !!! warning
        If the record has `field` with value `None`, then `None` is returned,
        not the default.

    Parameters
    ----------
    record: dict | `None`
        `pick` should work in both cases.
    field: string
        The field in `record` we want to extract.
    default: mixed
        Default value.

    Returns
    -------
    value | `None`
        The default if the record is `None` or if the record has no `field`.
        Otherwise the value for `field` from the record.
    """

    if record is None:
        return default
    return record.get(field, default)

317 

318 

def creators(record, creatorField, editorsField):
    """List all ids in two fields of a record.

    Parameters
    ----------
    record: dict
        The source record
    creatorField: string
        The name of a field with a single id value.
    editorsField: string
        The name of a field with multiple id values.

    Returns
    -------
    list
        A sorted list of all ids encountered in those fields.
        A missing or `None` creator and a missing or `None` editors field
        contribute nothing.
    """

    # `or []` covers an editors field that is present but holds `None`.
    editors = set(pick(record, editorsField, default=[]) or [])
    creator = pick(record, creatorField)
    # Do not add `None`: it is not an id and it cannot be sorted among ids.
    if creator is not None:
        editors.add(creator)
    return sorted(editors)

340 

341 

def filterModified(modified):
    """Filter a provenance trail.

    The provenance trail is a list of strings shaped as `"actor on date"` corresponding
    to changes in a record.

    After filtering we retain for each day only the last modification event per person.

    The work is done in a pipeline: the entries are decomposed, grouped
    per day, thinned, and finally composed into strings again.
    """

    return composeM(thinM(perDay(decomposeM(modified))))

355 

356 

def decomposeM(modified):
    """Auxiliary in provenance filtering: split an entry into name and date.

    Every entry is split at its last occurrence of `ON`. In the date part
    blanks are replaced by `T` before it is handed to `dtm`, of whose result
    the second member is kept.
    """

    logic = []
    for entry in modified:
        parts = entry.rsplit(ON, 1)
        isoLike = parts[1].replace(BLANK, T)
        logic.append((parts[0], dtm(isoLike)[1]))
    return logic

362 

363 

def trimM(mdt, trim):
    """Auxiliary in provenance filtering: trim a modification date.

    Parameters
    ----------
    mdt: mixed
        Modification date; its `str()` representation is what gets trimmed.
        Presumably a `datetime` - verify against the callers.
    trim: integer
        If `1`, everything from the first blank onwards is cut off.
        Otherwise everything from the first dot onwards is cut off.

    Returns
    -------
    string
    """

    text = str(mdt)
    separator = BLANK if trim == 1 else DOT
    return text.split(separator)[0]

376 

377 

def composeM(modified):
    """Auxiliary in provenance filtering: compose the trimmed parts.

    The input consists of pairs `(entry, trim)`, where `entry` holds a name
    and a date. The pairs are processed in reverse order, and each yields
    a string with the name, `ON`, and the date as trimmed by `trimM`.
    """

    trail = []
    for (entry, trim) in reversed(modified):
        trail.append(f"""{entry[0]}{ON}{trimM(entry[1], trim)}""")
    return trail

382 

383 

def perDay(modified):
    """Auxiliary in provenance filtering: chunk the trails into daily bits.

    The entries are grouped by the calendar date of their second member.
    The groups are delivered in ascending order of date; within a group the
    entries keep their original order.
    """

    byDay = {}
    for entry in modified:
        day = dt.date(entry[1])
        if day not in byDay:
            byDay[day] = []
        byDay[day].append(entry)
    return [byDay[day] for day in sorted(byDay)]

391 

392 

def thinM(chunks):
    """Auxiliary in provenance filtering: weed out the non-last items per day.

    Per chunk only the latest date of each person is retained, and the
    retained items are ordered by date. Every item is paired with a trim
    code: `2` for items in the last chunk, `1` for all other items.
    """

    lastIndex = len(chunks) - 1
    result = []
    for (i, chunk) in enumerate(chunks):
        trim = 2 if i == lastIndex else 1

        # Collect all dates per person, in order of first appearance.
        perPerson = {}
        for m in chunk:
            perPerson.setdefault(m[0], []).append(m[1])

        latest = [(person, max(dates)) for (person, dates) in perPerson.items()]
        latest.sort(key=lambda item: item[1])
        result.extend((item, trim) for item in latest)
    return result

409 

410 

# `\Z` anchors at the very end of the string. A `$` would also match just
# before a trailing newline, so that e.g. "abc\n" would pass as an id.
IDLIKE_RE = re.compile(r"^[0-9a-f]+\Z", re.S)


def isIdLike(val):
    """Whether a string consists of one or more lowercase hexadecimal digits.

    Parameters
    ----------
    val: string

    Returns
    -------
    match object | `None`
        Truthy if the whole string is id-like, `None` otherwise.
    """

    return IDLIKE_RE.match(val)

416 

417 

# `\Z` anchors at the very end of the string. A `$` would also match just
# before a trailing newline, so that e.g. "name\n" would pass as a name.
NAMELIKE_RE = re.compile(r"^[0-9a-zA-Z_]+\Z", re.S)


def isNameLike(val):
    """Whether a string consists of one or more ASCII letters, digits, underscores.

    Parameters
    ----------
    val: string

    Returns
    -------
    match object | `None`
        Truthy if the whole string is name-like, `None` otherwise.
    """

    return NAMELIKE_RE.match(val)

423 

424 

def isEmailLike(val):
    """Whether a string has the rough shape of an email address.

    The string must consist of exactly two parts separated by `@`.
    Both parts must be non-empty and, apart from the characters `_`, `-`,
    `+` and `.`, alphanumeric.

    Dots are allowed because the domain part of a real address contains them
    (`user@example.com`), and the local part often does too.

    Parameters
    ----------
    val: string

    Returns
    -------
    boolean
    """

    parts = val.split("@")
    if len(parts) != 2:
        return False
    return all(
        part.replace("_", "")
        .replace("-", "")
        .replace("+", "")
        .replace(".", "")
        .isalnum()
        for part in parts
    )

435 

436 

def isEppnLike(val):
    """Whether all `@`-separated parts of a string are simple words.

    A part is accepted if it is non-empty and, apart from the characters
    `_`, `-` and `+`, alphanumeric. Any number of `@` is allowed.

    Parameters
    ----------
    val: string

    Returns
    -------
    boolean
    """

    return all(
        part.replace("_", "").replace("-", "").replace("+", "").isalnum()
        for part in val.split("@")
    )

445 

446 

def isFileLike(val):
    """Whether all `/`-separated parts of a string are simple file names.

    A part is accepted if it is non-empty and, apart from the characters
    `_`, `-`, `+` and `.`, alphanumeric.

    Parameters
    ----------
    val: string

    Returns
    -------
    boolean
    """

    for part in val.split("/"):
        stripped = part
        for extra in ("_", "-", "+", "."):
            stripped = stripped.replace(extra, "")
        if not stripped.isalnum():
            return False
    return True

461 

462 

def isNamesLike(val):
    """Whether a string is a comma-separated list of alphanumeric words.

    Every part between the commas must be non-empty and alphanumeric.

    Parameters
    ----------
    val: string

    Returns
    -------
    boolean
    """

    return all(part.isalnum() for part in val.split(","))

471 

472 

def saveParam(v):
    """Abbreviate a value.

    Parameters
    ----------
    v: string | `None`

    Returns
    -------
    string
        The empty string for a falsy value, the value itself if it is
        shorter than 30 characters, and otherwise its first 10 and last 10
        characters with ` ... ` in between.
    """
    if not v:
        return ""
    if len(v) >= 30:
        head = v[0:10]
        tail = v[-10:]
        return f"{head} ... {tail}"
    return v

479 

480 

def getq(name):
    """Get a query parameter of the current request, capped in length.

    Parameters
    ----------
    name: string
        The name of the parameter in `request.args`.

    Returns
    -------
    string
        At most the first 64 characters of the value, or the empty string
        if the parameter is absent.
    """
    value = request.args.get(name, "")
    return value[0:64]