Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Things that do not find a more logical place. 

2 

3* Utitility functions 

4* Character constants 

5""" 

6 

7import sys 

8import json 

9from json import JSONEncoder 

10from bson.objectid import ObjectId 

11 

12from base64 import b64encode, b64decode 

13from datetime import datetime as dt 

14from flask import request 

15 

16 

17REGION_SHIFT = 0x1F1E6 - ord("A") 

18"""Offset of the Unicode position where flag symbols start w.r.t. to `'A'`.""" 

19 

20ISO_DTP = """%Y-%m-%dT%H:%M:%S.%f""" 

21ISO_DT = """%Y-%m-%dT%H:%M:%S""" 

22ISO_D = """%Y-%m-%d""" 

23 

24E = "" 

25BLANK = " " 

26COMMA = "," 

27COLON = ":" 

28DOT = "." 

29PIPE = "|" 

30T = "T" 

31Z = "Z" 

32AT = "@" 

33EURO = "€" 

34MINONE = "-1" 

35ZERO = "0" 

36ONE = "1" 

37TWO = "2" 

38THREE = "3" 

39SLASH = "/" 

40LOW = "_" 

41AMP = "&" 

42LT = "<" 

43APOS = "'" 

44QUOT = '"' 

45DOLLAR = "$" 

46Q = "?" 

47S = "s" 

48 

49NL = "\n" 

50TAB = "\t" 

51LINE_SEP = "§" 

52MIDDLE_DOT = "\u00b7" 

53 

54PLUS = "+" 

55MIN = "-" 

56HYPHEN = "-" 

57WHYPHEN = " - " 

58ELLIPS = "..." 

59ON = " on " 

60 

61NBSP = "&#xa;" 

62 

63LATIN1 = "latin1" 

64UTF8 = "utf8" 

65 

66EMPTY_DATE = "1900-01-01T00:00:00Z" 

67 

68ITER = "__iter__" 

69 

70 

71class MongoJSONEncoder(JSONEncoder): 

72 def __init__(self, *args, **kwargs): 

73 super().__init__(*args, **kwargs) 

74 

75 def default(self, obj): 

76 if isinstance(obj, dt): 

77 return obj.isoformat() 

78 elif isinstance(obj, ObjectId): 

79 return str(obj) 

80 return JSONEncoder.default(self, obj) 

81 

82 

83mjson = MongoJSONEncoder(ensure_ascii=False).encode 

84 

85 

86def mktsv(data): 

87 if data is None: 

88 return "" 

89 

90 allHeaders = set() 

91 for row in data: 

92 allHeaders |= set(row) 

93 allHeaders = sorted(allHeaders) 

94 

95 lines = ["\t".join(allHeaders)] 

96 for row in data: 

97 values = [] 

98 for field in allHeaders: 

99 value = row.get(field, "") 

100 if type(value) in {list, tuple}: 

101 value = MIDDLE_DOT.join(str(v) for v in value) 

102 else: 

103 value = str(value) 

104 value = value.replace("\n", LINE_SEP).replace("\t", " ") 

105 values.append(value) 

106 lines.append("\t".join(values)) 

107 return "\n".join(lines) 

108 

109 

110def factory(name, Base, Deriveds): 

111 """Find the base or derived class by registered name. 

112 

113 Parameters 

114 ---------- 

115 Base: class 

116 Start the lookup here. 

117 Deriveds: iterable of (name, class) 

118 A list of derived classes with their names. 

119 

120 Returns 

121 ------- 

122 class 

123 """ 

124 

125 Derived = Base 

126 for (nm, NmCl) in Deriveds: 

127 if nm == name: 

128 Derived = NmCl 

129 break 

130 

131 return Derived 

132 

133 

134def utf8FromLatin1(s): 

135 """Get Unicode from a latin1 string. 

136 

137 !!! hint 

138 Needed to process the values of environment variables, in particular 

139 those from the identity provider.. 

140 

141 Parameters 

142 ---------- 

143 s: string(latin1) 

144 

145 Returns 

146 ------- 

147 string(utf8) 

148 """ 

149 return str(bytes(s, encoding=LATIN1), encoding=UTF8) 

150 

151 

152def bencode(s): 

153 """Serialize a complex data structure into a plain ASCII string. 

154 

155 !!! hint 

156 Needed to pass the original value into an edit widget, so that the Javascript 

157 has a way to know whether an edited value is dirty or not. 

158 

159 Parameters 

160 ---------- 

161 s: Python value 

162 

163 Returns 

164 ------- 

165 string(ascii) 

166 """ 

167 

168 return b64encode(json.dumps(s, separators=(COMMA, COLON)).encode()).decode() 

169 

170 

171def bdecode(s): 

172 """Interpets a serialized value as a Python value. 

173 

174 Parameters 

175 ---------- 

176 s: string(ascii) 

177 

178 Returns 

179 ------- 

180 Python value. 

181 """ 

182 

183 return json.loads(b64decode(s.encode()).decode()) 

184 

185 

186def cap1(s): 

187 """The first letter capitalized. 

188 

189 Parameters 

190 ---------- 

191 s: string 

192 

193 Returns 

194 ------- 

195 string 

196 """ 

197 

198 return E if not s else s[0].upper() + s[1:] 

199 

200 

201def shiftRegional(iso): 

202 """Transpose iso country code into flag. 

203 

204 By shifting the 2-letter iso country code with a fixed offset, 

205 we get two Unicode characters that browsers know to render as a flag symbol 

206 for that country. 

207 

208 Parameters 

209 ---------- 

210 iso: string 

211 2-letter iso country code. 

212 

213 Returns 

214 ------- 

215 flag:string 

216 2-letter unicode, starting from `control.utils.REGION_SHIFT`. 

217 """ 

218 

219 return E.join(chr(ord(r) + REGION_SHIFT) for r in iso) 

220 

221 

222def now(): 

223 """The current moment in time as a `datetime` value.""" 

224 

225 return dt.utcnow() 

226 

227 

228def thisYear(): 

229 """The current year as number.""" 

230 

231 return dt.utcnow().year 

232 

233 

234def serverprint(*msg): 

235 """Print a message to the console immediately.""" 

236 

237 sys.stdout.write(f"""{" ".join(msg)}{NL}""") 

238 sys.stdout.flush() 

239 

240 

241def dtm(isostr): 

242 """Get a datetime value from an ISO string representing time.""" 

243 

244 isostr = isostr.rstrip(Z) 

245 try: 

246 date = dt.strptime(isostr, ISO_DTP) 

247 except Exception: 

248 try: 

249 date = dt.strptime(isostr, ISO_DT) 

250 except Exception: 

251 try: 

252 date = dt.strptime(isostr, ISO_D) 

253 except Exception as err: 

254 return (str(err), isostr) 

255 return (E, date) 

256 

257 

258def isIterable(value): 

259 """Whether a value is a non-string iterable. 

260 

261 !!! note 

262 Strings are iterables. 

263 We want to know whether a value is a string or an iterable of strings. 

264 """ 

265 

266 return type(value) is not str and hasattr(value, ITER) 

267 

268 

269def asString(value): 

270 """Join an iterable of strings into a string. 

271 

272 And if the value is already a string, return it, and if it is `None` 

273 return the empty string. 

274 """ 

275 

276 return E if value is None else E.join(value) if isIterable(value) else value 

277 

278 

279def getLast(sequence): 

280 """Get the last element of a sequence or `None` if the sequence is empty.""" 

281 

282 return sequence[-1] if sequence else None 

283 

284 

285def pick(record, field, default=None): 

286 """Get the value for a key in a dict, or None if there is no dict. 

287 

288 !!! warning 

289 But if the value for `field` in the record is `None`, `None` will be returned. 

290 

291 Parameters 

292 ---------- 

293 record: dict | `None` 

294 `pick` should work in both cases. 

295 field: string 

296 The field in `record` we want to extract. 

297 default: mixed 

298 Default value. 

299 

300 Returns 

301 ------- 

302 value | `None` 

303 The value is the default if the record is `None`, or if the record has no 

304 `field`. 

305 Otherwise it is the value for `field` from the record. 

306 """ 

307 

308 return default if record is None else record.get(field, default) 

309 

310 

311def creators(record, creatorField, editorsField): 

312 """List all ids in two fields of a record. 

313 

314 Parameters 

315 ---------- 

316 record: dict 

317 The source record 

318 creatorField: string 

319 The name of a field with a single id value. 

320 editorsFields: string 

321 The name of a field with multiple id values. 

322 

323 Returns 

324 ------- 

325 list 

326 A sorted list of all ids encountered in those fields. 

327 """ 

328 

329 editors = set(pick(record, editorsField, default=[])) 

330 editors.add(pick(record, creatorField)) 

331 return sorted(editors) 

332 

333 

334def filterModified(modified): 

335 """Filter a provenance trail. 

336 

337 The provenance trail is a list of strings shaped as `"actor on date"` corresponding 

338 to changes in a record. 

339 

340 After filtering we retain for each day only the last modification event per person. 

341 """ 

342 

343 logicM = decomposeM(modified) 

344 chunks = perDay(logicM) 

345 thinned = thinM(chunks) 

346 return composeM(thinned) 

347 

348 

349def decomposeM(modified): 

350 """Auxiliary in provenance filtering: split an entry into name and date.""" 

351 

352 splits = [m.rsplit(ON, 1) for m in modified] 

353 return [(m[0], dtm(m[1].replace(BLANK, T))[1]) for m in splits] 

354 

355 

356def trimM(mdt, trim): 

357 """Auxiliary in provenance filtering: trim the secoonds part. 

358 

359 Parameters 

360 ---------- 

361 mdt: string 

362 Modification date in iso shape. 

363 trim: boolean 

364 Whether or not to trim the decimal parts of the seconds aways. 

365 """ 

366 

367 return str(mdt).split(BLANK)[0] if trim == 1 else str(mdt).split(DOT)[0] 

368 

369 

370def composeM(modified): 

371 """Auxiliary in provenance filtering: compose the trimmed parts.""" 

372 

373 return [f"""{m[0]}{ON}{trimM(m[1], trim)}""" for (m, trim) in reversed(modified)] 

374 

375 

376def perDay(modified): 

377 """Auxiliary in provenance filtering: chunk the trails into daily bits.""" 

378 

379 chunks = {} 

380 for m in modified: 

381 chunks.setdefault(dt.date(m[1]), []).append(m) 

382 return [chunks[date] for date in sorted(chunks)] 

383 

384 

385def thinM(chunks): 

386 """Auxiliary in provenance filtering: weed out the non-last items per day.""" 

387 

388 modified = [] 

389 nChunks = len(chunks) 

390 for (i, chunk) in enumerate(chunks): 

391 isLast = i == nChunks - 1 

392 people = {} 

393 for m in chunk: 

394 people.setdefault(m[0], []).append(m[1]) 

395 thinned = [] 

396 for (p, dates) in people.items(): 

397 thinned.append((p, sorted(dates)[-1])) 

398 for m in sorted(thinned, key=lambda x: x[1]): 

399 modified.append((m, 2 if isLast else 1)) 

400 return modified 

401 

402 

403def getq(name): 

404 return request.args.get(name, "")[0:64]