35
36:- module(swish_data_source,
37 [ data_source/2, 38 data_record/2, 39 record/2, 40 data_property/2, 41 data_row/2, 42 data_row/4, 43 data_dump/3, 44
45 data_flush/1, 46 'data assert'/1, 47 'data materialized'/3, 48 'data failed'/2 49 ]). 50:- use_module(library(error)). 51:- use_module(library(lists)). 52:- use_module(library(settings)). 53:- use_module(library(solution_sequences)). 54:- use_module(library(pengines)). 55
56:- setting(max_memory, integer, 8000,
57 "Max memory used for cached data store (Mb)"). 58
59
66
67:- meta_predicate
68 data_source(:, +),
69 data_record(:, -),
70 record(:, -),
71 data_row(:, -),
72 data_row(:, +, +, -),
73 data_dump(:, +, -),
74 data_property(:, -). 75
76:- multifile
77 source/2. 78
79
80 83
84:- dynamic
85 data_source_db/3, 86 data_signature_db/2, 87 data_materialized/5, 88 data_last_access/3. 89
%!  'data assert'(+Clause) is det.
%
%   Add Clause as the last clause of its predicate using assertz/1.
%   Exported hook; presumably called by data source plugins while
%   they load rows -- confirm against the plugins.

'data assert'(Clause) :-
    assertz(Clause).
92
104
%!  'data materialized'(+Hash, +Signature, +SourceVersion) is semidet.
%
%   Record that the data identified by Hash has been loaded.  Stores
%   Signature in data_signature_db/2 and a data_materialized/5 fact
%   holding the completion time, SourceVersion and the CPU and wall
%   time consumed since the start statistics were saved in the global
%   variable '$data_source_materalize'.  Fails if that global variable
%   is not set to a stats/2 term.

'data materialized'(Hash, Signature, SourceVersion) :-
    statistics(cputime, CPUNow),
    get_time(Finished),
    % Start time and CPU usage as saved by materialize_sync/2.
    nb_current('$data_source_materalize', stats(Started, CPUStart)),
    WallUsed is Finished - Started,
    CPUUsed is CPUNow - CPUStart,
    assertz(data_signature_db(Hash, Signature)),
    assertz(data_materialized(Hash, Finished, SourceVersion,
                              CPUUsed, WallUsed)).
113
%!  'data failed'(+Hash, +Signature) is det.
%
%   Remove all clauses of the predicate that has the same name and
%   arity as Signature.  Hash is ignored.

'data failed'(_Hash, Signature) :-
    functor(Signature, Functor, ColumnCount),
    % Most general head for the predicate, so that retractall/1
    % matches every clause.
    functor(AnyRow, Functor, ColumnCount),
    retractall(AnyRow).
118
123
%!  data_source(:Id, +Source) is det.
%
%   Make Id refer to Source in the calling module.  Source is
%   identified by its variant SHA1 hash.  If a source with that hash
%   is already registered, only the module-local '$data'(Id, Hash)
%   link is added (unless it already exists).  Otherwise Source is
%   validated using valid_source/1 and registered together with a
%   fresh mutex before the link is added.
%
%   @error existence_error(data_source, Source) if no source/2 hook
%          accepts Source.

data_source(Module:Id, Source) :-
    variant_sha1(Source, Hash),
    (   data_source_db(Hash, Source, _)
    ->  (   clause(Module:'$data'(Id, Hash), true)
        ->  true
        ;   assertz(Module:'$data'(Id, Hash))
        )
    ;   valid_source(Source),
        mutex_create(Mutex),
        assertz(data_source_db(Hash, Source, Mutex)),
        assertz(Module:'$data'(Id, Hash))
    ).
138
147
%!  record(:Id, -Record) is nondet.
%
%   Short alias for data_record/2.

record(Dataset, Dict) :-
    data_record(Dataset, Dict).
150
%!  data_record(:Id, -Record) is nondet.
%
%   True when Record is a dict, tagged Id, that represents a row of
%   the data set Id.  The data is materialized first if needed.
%
%   @error existence_error(dataset, Id) if Id is not known in the
%          calling module.

data_record(Module:Dataset, Dict) :-
    data_hash(Module:Dataset, Hash),
    materialize(Hash),
    data_signature_db(Hash, Signature),
    data_record(Signature, Dataset, Dict, RowGoal),
    % Calling the generic head enumerates the stored rows and fills
    % the values shared with Dict.
    call(RowGoal).
157
%!  data_record(+Signature, ?Tag, -Record, -Head) is det.
%
%   Relate a signature term Name(Key, ...) to a dict Record with tag
%   Tag and a goal Head = Name(Value, ...) such that the value for
%   each key in Record is the variable at the same position in Head.

data_record(Signature, Tag, Record, Head) :-
    Signature =.. [Functor|Columns],
    % Cells becomes a list of fresh variables, one per column.
    pairs_keys_values(ColumnCells, Columns, Cells),
    Head =.. [Functor|Cells],
    dict_pairs(Record, Tag, ColumnCells).
163
%!  data_hash(:Id, -Hash) is det.
%
%   Hash is the hash stored by the first '$data'(Id, Hash) fact in
%   the module that qualifies Id.
%
%   @error existence_error(dataset, Id) if there is no such fact.

data_hash(Module:Dataset, Hash) :-
    (   clause(Module:'$data'(Dataset, Hash), true)
    ->  true
    ;   existence_error(dataset, Dataset)
    ).
169
178
%!  data_row(:Id, -Row) is nondet.
%
%   Same as data_row(Id, all, true, Row): enumerate the header row
%   followed by all data rows of Id.

data_row(Dataset, Row) :-
    data_row(Dataset, all, true, Row).
181
%!  data_row(:Id, +Range, +Header, -Row) is nondet.
%
%   True when Row is a term Id(Cell, ...) for the data set Id.  If
%   Header is `true`, the first solution holds the column names taken
%   from the signature.  The remaining solutions are the stored rows
%   selected by Range (see range/3).
%
%   @error type_error(bool, Header) if Header is not a boolean.
%   @error existence_error(dataset, Id) if Id is unknown.

data_row(Module:Dataset, Range, Header, Row) :-
    must_be(boolean, Header),
    data_hash(Module:Dataset, Hash),
    materialize(Hash),
    data_signature_db(Hash, Signature),
    Signature =.. [_|Columns],
    same_length(Columns, Cells),
    % The rows are fetched by calling the predicate named Hash.
    RowGoal =.. [Hash|Cells],
    Row =.. [Dataset|Cells],
    (   Header == true,
        Cells = Columns
    ;   range(Range, Module:Dataset, RowGoal)
    ).
195
%!  range(+Range, :Id, :Goal) is nondet.
%
%   Call Goal, restricting its solutions according to Range:
%
%     - all
%       All solutions.
%     - From-To
%       Solutions From through To (1-based, inclusive).
%     - Limit (>= 0)
%       The first Limit solutions.
%     - Limit (< 0)
%       The last abs(Limit) solutions, based on the `rows` property
%       of Id.  If Id has fewer rows than requested, all rows are
%       returned instead of raising an error from offset/2, which
%       does not accept a negative count.

range(all, _Id, Goal) :-
    !,
    call(Goal).
range(From-To, _Id, Goal) :-
    !,
    Skip is From - 1,
    Size is To-Skip,
    limit(Size, offset(Skip, call(Goal))).
range(Limit, _Id, Goal) :-
    Limit >= 0,
    !,
    limit(Limit, call(Goal)).
range(Limit, Id, Goal) :-
    Limit < 0,
    data_property(Id, rows(Rows)),
    % Clamp to 0: asking for more trailing rows than exist means
    % "everything".
    Skip is max(0, Rows+Limit),
    offset(Skip, call(Goal)).
213
227
%!  data_dump(:Id, +Range, -Table) is det.
%
%   Table is the list of all rows produced by data_row/4 for Id and
%   Range with the header enabled, so its first element is the row
%   of column names.

data_dump(Dataset, Range, Table) :-
    findall(Line, data_row(Dataset, Range, true, Line), Table).
230
231
254
%!  data_property(:Id, ?Property) is nondet.
%
%   True when Property is a property of the (materialized) data set
%   Id.  The supported properties are those enumerated by property/1.
%
%   @error existence_error(dataset, Id) if Id is unknown.

data_property(Module:Dataset, Property) :-
    data_hash(Module:Dataset, Hash),
    materialize(Hash),
    % property/1 restricts Property to the known property names
    % before the value is computed.
    property(Property),
    property(Property, Hash).
260
%!  property(?Property) is nondet.
%
%   Enumerate the property terms supported by data_property/2.

property(columns(_)).
property(column_names(_)).
property(rows(_)).
property(hash(_)).
property(source_version(_)).
property(materialized(_)).
property(source(_)).

%!  property(?Property, +Hash) is nondet.
%
%   Compute the value of Property for the data identified by Hash.

% Number of columns: the arity of the signature.
property(columns(ColumnCount), Hash) :-
    data_signature_db(Hash, Sig),
    functor(Sig, _, ColumnCount).
% Column names: the arguments of the signature.
property(column_names(ColumnNames), Hash) :-
    data_signature_db(Hash, Sig),
    Sig =.. [_|ColumnNames].
% Number of rows: the clause count of the signature's predicate.
property(rows(RowCount), Hash) :-
    data_signature_db(Hash, Sig),
    predicate_property(Sig, number_of_clauses(RowCount)).
property(hash(Hash), Hash).
property(source_version(Version), Hash) :-
    data_materialized(Hash, _, Version, _, _).
property(materialized(When), Hash) :-
    data_materialized(Hash, When, _, _, _).
property(source(Source), Hash) :-
    data_source_db(Hash, Source, _).
285
292
293:- multifile
294 swish:goal_expansion/2. 295
%!  swish:goal_expansion(+Dict, -DataGoal) is semidet.
%
%   Translate a goal that is a dict whose tag names a data set of the
%   module being loaded into a call on the stored rows.  The data is
%   materialized at expansion time and the keys of Dict are matched
%   against the record for the data set using :</2.

swish:goal_expansion(Dict, swish_data_source:Head) :-
    is_dict(Dict, Tag),
    prolog_load_context(module, Module),
    clause(Module:'$data'(Tag, Hash), true),
    materialize(Hash),
    data_signature_db(Hash, Signature),
    data_record(Signature, Tag, FullRecord, Head),
    % Unify the values in Dict with the matching variables of Head.
    Dict :< FullRecord.
304
305
306 309
%!  valid_source(+Source) is det.
%
%   True if Source is handled by some clause of the multifile hook
%   source/2.
%
%   @error instantiation_error if Source is unbound.
%   @error existence_error(data_source, Source) if no hook accepts
%          Source.

valid_source(Source) :-
    must_be(nonvar, Source),
    (   source(Source, _Goal)
    ->  true
    ;   existence_error(data_source, Source)
    ).
316
330
%!  materialize(+Hash) is semidet.
%
%   Make sure the data for Hash is loaded.  If it is already
%   materialized only the access statistics are updated.  Otherwise
%   the access statistics are updated, gc_data/0 is run and the data
%   is loaded by materialize_sync/2 while holding the mutex that is
%   registered for the source.
%
%   @error type_error(atom, Hash) if Hash is not an atom.

materialize(Hash) :-
    must_be(atom, Hash),
    (   data_materialized(Hash, _When, _From, _CPU, _Wall)
    ->  update_last_access(Hash)
    ;   data_source_db(Hash, Source, Mutex),
        update_last_access(Hash),
        gc_data,
        with_mutex(Mutex, materialize_sync(Hash, Source))
    ).
341
%!  materialize_sync(+Hash, +Source) is semidet.
%
%   Load the data for Source unless Hash is already materialized.
%   The loader goal is obtained from the source/2 hook and called
%   with Hash as additional argument.  While it runs, the global
%   variable '$data_source_materalize' holds the wall time and CPU
%   time at the start.  Afterwards the predicate named by the stored
%   signature is declared public.  Called by materialize/1 while
%   holding the source's mutex.

materialize_sync(Hash, Source) :-
    (   data_materialized(Hash, _When, _From, _CPU, _Wall)
    ->  true
    ;   source(Source, Loader),
        get_time(Started),
        statistics(cputime, CPUStart),
        setup_call_cleanup(
            b_setval('$data_source_materalize', stats(Started, CPUStart)),
            call(Loader, Hash),
            nb_delete('$data_source_materalize')),
        data_signature_db(Hash, Signature),
        functor(Signature, Name, Arity),
        public(Name/Arity)
    ).
356
357
358 361
366
%!  update_last_access(+Hash) is det.
%
%   Maintain data_last_access(Hash, Time, Count).  Time is the
%   current time rounded down to a whole minute.  If a fact for this
%   minute already exists nothing changes.  Otherwise an existing
%   fact is replaced by one with the new time and an incremented
%   count, or a first fact with count 1 is created.

update_last_access(Hash) :-
    get_time(Now),
    Minute is 60*floor(Now/60),
    (   data_last_access(Hash, Minute, _)
    ->  true
    ;   clause(data_last_access(Hash, _, Count0), true, OldRef)
    ->  Count is Count0+1,
        % Add the new fact before erasing the old one.
        asserta(data_last_access(Hash, Minute, Count)),
        erase(OldRef)
    ;   asserta(data_last_access(Hash, Minute, 1))
    ).
378
%!  gc_stats(?Hash, -Stats:dict) is nondet.
%
%   Stats is a dict describing the materialized data for Hash.  It
%   holds the hash, the materialization time, the CPU and wall time
%   used, a size estimate in `bytes`, the time since the last access
%   (`last_accessed_ago`, at minute granularity) and the access
%   count.

gc_stats(Hash, Stats) :-
    data_materialized(Hash, When, _From, CPU, Wall),
    data_signature_db(Hash, Signature),
    data_last_access(Hash, LastAccess, AccessCount),
    get_time(Now),
    Ago is floor(Now/60)*60-LastAccess,
    predicate_property(Signature, number_of_clauses(RowCount)),
    functor(Signature, _, Arity),
    % Size estimate: 88 bytes per clause plus 16 bytes per argument.
    Size is (88+(16*Arity))*RowCount,
    Stats = _{ hash:Hash,
               materialized:When, cpu:CPU, wall:Wall,
               bytes:Size,
               last_accessed_ago:Ago,
               access_frequency:AccessCount
             }.
393
394
401
%!  gc_data is det.
%
%   Read the `max_memory` setting (in Mb), run gc_data/1 with the
%   corresponding number of bytes and then set the module's
%   program_space limit to that same number of bytes.

gc_data :-
    setting(max_memory, MegaBytes),
    Limit is MegaBytes*1024*1024,
    gc_data(Limit),
    set_module(program_space(Limit)).
407
%!  gc_data(+MaxSize) is det.
%
%   Flush materialized data until the program size of this module is
%   below MaxSize bytes.  Nothing is done if the module is already
%   below the limit.  Otherwise the statistics of all materialized
%   data are collected and sorted by descending `last_accessed_ago`,
%   so the data that has been unused the longest is flushed first.
%   Always succeeds, also if the limit cannot be reached.

gc_data(MaxSize) :-
    module_property(swish_data_source, program_size(Size)),
    Size < MaxSize,
    !.
gc_data(MaxSize) :-
    findall(Stat, gc_stats(_, Stat), Stats),
    sort(last_accessed_ago, >=, Stats, ByTime),
    % Failure driven loop: flush the next victim until we are below
    % the limit.
    member(Victim, ByTime),
    % The hash must be taken from the selected element; ByTime is a
    % list and applying dict access to it raises a type error.
    data_flush(Victim.hash),
    module_property(swish_data_source, program_size(Size)),
    Size < MaxSize,
    !.
gc_data(_).
421
422
426
%!  data_flush(+Hash) is semidet.
%
%   Remove the materialized data for Hash: the stored rows, the
%   signature, the materialization record and the access statistics.
%   Fails if no signature is registered for Hash.
%
%   The generic row head is built from the name and arity of the
%   signature, as 'data failed'/2 does.  This avoids constructing a
%   dict from the column names, which raises an exception if a column
%   name is duplicated or is not a valid dict key.

data_flush(Hash) :-
    data_signature_db(Hash, Signature),
    functor(Signature, Name, Arity),
    functor(Head, Name, Arity),
    retractall(Head),
    % Head is still fully general here, so this removes the stored
    % signature(s) with this name and arity.
    retractall(data_signature_db(Hash, Head)),
    retractall(data_materialized(Hash, _When1, _From, _CPU, _Wall)),
    retractall(data_last_access(Hash, _When2, _Count)).
434
435
436 439
440:- multifile
441 sandbox:safe_meta/2. 442
%!  sandbox:safe_meta(+Goal, -Called) is semidet.
%
%   Declare the public data access predicates as safe for the
%   sandbox, provided their data set argument passes safe_id/1.
%   No meta-called goals are reported (Called is the empty list).

sandbox:safe_meta(swish_data_source:data_source(Dataset,_), []) :-
    safe_id(Dataset).
sandbox:safe_meta(swish_data_source:data_record(Dataset,_), []) :-
    safe_id(Dataset).
sandbox:safe_meta(swish_data_source:record(Dataset,_), []) :-
    safe_id(Dataset).
sandbox:safe_meta(swish_data_source:data_row(Dataset,_), []) :-
    safe_id(Dataset).
sandbox:safe_meta(swish_data_source:data_row(Dataset,_,_,_), []) :-
    safe_id(Dataset).
sandbox:safe_meta(swish_data_source:data_dump(Dataset,_,_), []) :-
    safe_id(Dataset).
sandbox:safe_meta(swish_data_source:data_property(Dataset,_), []) :-
    safe_id(Dataset).
450
451safe_id(M:_) :- !, pengine_self(M).
452safe_id(_)