1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2006-2015, University of Amsterdam 7 VU University Amsterdam 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34*/ 35 36:- module(rdf_persistency, 37 [ rdf_attach_db/2, % +Directory, +Options 38 rdf_detach_db/0, % +Detach current Graph 39 rdf_current_db/1, % -Directory 40 rdf_persistency/2, % +Graph, +Bool 41 rdf_flush_journals/1, % +Options 42 rdf_persistency_property/1, % ?Property 43 rdf_journal_file/2, % ?Graph, ?JournalFile 44 rdf_snapshot_file/2, % ?Graph, ?SnapshotFile 45 rdf_db_to_file/2 % ?Graph, ?FileBase 46 ]). 
47:- use_module(library(semweb/rdf_db), 48 [ rdf_graph/1, rdf_unload_graph/1, rdf_statistics/1, 49 rdf_load_db/1, rdf_retractall/4, rdf_create_graph/1, 50 rdf_assert/4, rdf_update/5, rdf_monitor/2, rdf/4, 51 rdf_save_db/2, rdf_atom_md5/3, rdf_current_ns/2, 52 rdf_register_ns/3 53 ]). 54 55:- autoload(library(apply),[maplist/2,maplist/3,partition/4,exclude/3]). 56:- autoload(library(debug),[debug/3]). 57:- autoload(library(error), 58 [permission_error/3,must_be/2,domain_error/2]). 59:- autoload(library(filesex), 60 [directory_file_path/3,make_directory_path/1]). 61:- autoload(library(lists),[select/3,append/3]). 62:- autoload(library(option),[option/2,option/3]). 63:- autoload(library(readutil),[read_file_to_terms/3]). 64:- autoload(library(socket),[gethostname/1]). 65:- autoload(library(thread),[concurrent/3]). 66:- autoload(library(uri),[uri_encoded/3]).
% State of the persistency layer.  All of these predicates are dynamic
% and volatile.
:- volatile
    rdf_directory/1,
    rdf_lock/2,
    rdf_option/1,
    source_journal_fd/2,
    file_base_db/2.
:- dynamic
    rdf_directory/1,                % Absolute path
    rdf_lock/2,                     % Dir, Lock
    rdf_option/1,                   % Defined options
    source_journal_fd/2,            % DB, JournalFD
    file_base_db/2.                 % FileBase, DB

% no_agc/1 calls its argument as a goal, so the argument specifier
% must be 0.
:- meta_predicate
    no_agc(0).

% `auto` is listed because the documentation of rdf_attach_db/2 names
% it as the default access mode.
:- predicate_options(rdf_attach_db/2, 2,
                     [ access(oneof([auto,read_write,read_only])),
                       concurrency(positive_integer),
                       max_open_journals(positive_integer),
                       silent(oneof([true,false,brief])),
                       log_nested_transactions(boolean)
                     ]).
%!  rdf_attach_db(+Directory, +Options) is det.
%
%   Attach the RDF store to the persistent files kept in Directory.
%
%   Options:
%
%     * access(+AccessMode)
%       One of `auto` (default), `read_write` or `read_only`.
%       Read-only access implies that the RDF store is not locked.
%       It is read at startup and all modifications to the data are
%       temporary. The default `auto` mode is `read_write` if the
%       directory is writeable and the lock can be acquired.
%       Otherwise it reverts to `read_only`.
%     * concurrency(+Jobs)
%       Number of concurrent jobs used to load the database. If not
%       provided it is taken from the Prolog flag `cpu_count`.
%     * max_open_journals(+Count)
%       Maximum number of journals kept open.
%     * directory_levels(+Count)
%       Number of intermediate directory levels used for storing
%       the graph files.
%     * silent(+BoolOrBrief)
%       If `true` (default `false`), do not print informational
%       messages. Finally, if `brief` it will show minimal
%       feedback.
%       NOTE(review): verbosity/1 falls back to `brief` rather than
%       `false` when the option is absent -- confirm which is meant.
%     * log_nested_transactions(+Boolean)
%       If `true`, nested log transactions are added to the journal
%       information. By default (`false`), no log-term is added
%       for nested transactions.
%
%   Whenever the store falls back to read-only mode, the option
%   access(read_only) is recorded so that
%   rdf_persistency_property(access(read_only)) reports it.

rdf_attach_db(DirSpec, Options) :-
    option(access(read_only), Options),
    !,
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(read),
                         file_type(directory)
                       ]),
    rdf_attach_db_ro(Directory, Options).
rdf_attach_db(DirSpec, Options) :-
    option(access(read_write), Options),
    !,
    rdf_attach_db_rw(DirSpec, Options).
rdf_attach_db(DirSpec, Options) :-          % auto mode, existing directory
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(exist),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   access_file(Directory, write)
    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
        (   var(E)
        ->  true
        ;   E = error(permission_error(lock, rdf_db, _), _)
        ->  print_message(warning, E),
            print_message(warning, rdf(read_only)),
            rdf_attach_db(DirSpec, [access(read_only)|Options])
        ;   throw(E)
        )
    ;   % Message terms must have the shape error(Formal, Context).
        print_message(warning,
                      error(permission_error(write, directory, Directory), _)),
        print_message(warning, rdf(read_only)),
        rdf_attach_db_ro(Directory, [access(read_only)|Options])
    ).
rdf_attach_db(DirSpec, Options) :-          % auto mode, no such directory
    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
    (   var(E)
    ->  true
    ;   print_message(warning, E),
        print_message(warning, rdf(read_only)),
        rdf_attach_db(DirSpec, [access(read_only)|Options])
    ).


%!  rdf_attach_db_rw(+DirSpec, +Options)
%
%   Attach in read/write mode. If DirSpec resolves to a writable
%   directory, the directory is locked, the options are recorded, the
%   database is loaded with the monitor stopped and the monitor is
%   started afterwards. Otherwise an attempt is made to create the
%   directory. If that fails too, an existence or permission error
%   is raised.

rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(write),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   rdf_directory(Directory)
    ->  true                                % update settings?
    ;   rdf_detach_db,
        mkdir(Directory),
        lock_db(Directory),
        assert(rdf_directory(Directory)),
        assert_options(Options),
        stop_monitor,                       % make sure not to register load
        no_agc(load_db),
        at_halt(rdf_detach_db),
        start_monitor
    ).
rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ solutions(all)
                       ]),
    (   exists_directory(Directory)
    ->  access_file(Directory, write)
    ;   catch(make_directory(Directory), _, fail)
    ),
    !,
    rdf_attach_db(Directory, Options).
rdf_attach_db_rw(DirSpec, _) :-             % Generate an existence or
    absolute_file_name(DirSpec,             % permission error
                       Directory,
                       [ access(exist),
                         file_type(directory)
                       ]),
    permission_error(write, directory, Directory).
%!  rdf_attach_db_ro(+Directory, +Options)
%
%   Attach Directory without locking it: detach any current database,
%   record the directory and options and load the database with the
%   monitor stopped. The monitor is not started afterwards.

rdf_attach_db_ro(Directory, Options) :-
    rdf_detach_db,
    assert(rdf_directory(Directory)),
    assert_options(Options),
    stop_monitor,           % make sure not to register load
    no_agc(load_db).


%!  assert_options(+Options:list)
%
%   Validate each known option using the goal supplied by
%   option_type/2 and record it as rdf_option/1. Options for which
%   option_type/2 has no clause are ignored.
%
%   @error type_error or domain_error (from must_be/2) if a known
%          option carries an invalid value.

assert_options([]).
assert_options([H|T]) :-
    (   option_type(H, Check)
    ->  call(Check),                    % raises if the value is invalid
        assert(rdf_option(H))
    ;   true                            % ignore options we do not understand
    ),
    assert_options(T).

%!  option_type(?Option, -Check)
%
%   Check is the goal that validates the value of Option. The access
%   mode `auto` is accepted because rdf_attach_db/2 documents it as
%   the default.

option_type(concurrency(X),             must_be(positive_integer, X)).
option_type(max_open_journals(X),       must_be(positive_integer, X)).
option_type(directory_levels(X),        must_be(positive_integer, X)).
option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
option_type(log_nested_transactions(X), must_be(boolean, X)).
option_type(access(X),                  must_be(oneof([auto,
                                                       read_write,
                                                       read_only]), X)).
%!  rdf_persistency_property(?Property) is nondet.
%
%   True when Property is a property of the attached persistent
%   store. Every option recorded when the store was attached is a
%   property. In particular, rdf_persistency_property(access(read_only))
%   is true iff the database is mounted in read-only mode. In
%   addition, the following property is supported:
%
%     * directory(Dir)
%       Dir is the directory the store is attached to.
%
%   Enumerates all properties if Property is unbound. Otherwise it
%   succeeds at most once.

rdf_persistency_property(Property) :-
    (   var(Property)
    ->  rdf_persistency_property_(Property)
    ;   once(rdf_persistency_property_(Property))
    ).

rdf_persistency_property_(Option) :-
    rdf_option(Option).
rdf_persistency_property_(directory(Directory)) :-
    rdf_directory(Directory).
%!  no_agc(:Goal)
%
%   Run Goal with the Prolog flag `agc_margin` set to 0. The value
%   the flag had on entry is restored when Goal finishes, whether by
%   success, failure or exception.

no_agc(Goal) :-
    current_prolog_flag(agc_margin, SavedMargin),
    setup_call_cleanup(
        set_prolog_flag(agc_margin, 0),
        Goal,
        set_prolog_flag(agc_margin, SavedMargin)).
%!  rdf_detach_db is det.
%
%   Detach from the persistent store: stop the monitor, close all
%   open journals and, if a directory is attached, save the prefix
%   table, forget the recorded options and journal streams and
%   release the lock.
%
%   The prefix table is not written when the store was attached with
%   access(read_only), as read-only mode must not modify the store.
%   A failure to write it is reported as a warning so that the
%   remaining cleanup (forgetting options, unlocking) still runs.

rdf_detach_db :-
    debug(halt, 'Detaching RDF database', []),
    stop_monitor,
    close_journals,
    (   retract(rdf_directory(Dir))
    ->  debug(halt, 'DB Directory: ~w', [Dir]),
        (   rdf_option(access(read_only))   % test before options are retracted
        ->  true
        ;   catch(save_prefixes(Dir), E,
                  print_message(warning, E))
        ),
        retractall(rdf_option(_)),
        retractall(source_journal_fd(_,_)),
        unlock_db(Dir)
    ;   true
    ).
%!  rdf_current_db(?Directory) is semidet.
%
%   True when Directory is the directory of the currently attached
%   persistent store. Only the first recorded directory is
%   considered.

rdf_current_db(Directory) :-
    once(rdf_directory(Attached)),
    Attached = Directory.
%!  rdf_flush_journals(+Options)
%
%   Flush the journals of the known graphs by calling create_db/1 for
%   each of them. Options:
%
%     * graph(+Graph)
%       Only flush the journal of Graph. Default is all graphs
%       enumerated by rdf_graph/1.
%     * min_size(+KB)
%       Leave journals that are smaller than KB kilobytes alone.

rdf_flush_journals(Options) :-
    option(graph(Graph), Options, _),
    forall(rdf_graph(Graph),
           rdf_flush_journal(Graph, Options)).

%!  rdf_flush_journal(+Graph, +Options)
%
%   Flush the journal of a single graph. Nothing happens if the
%   journal file does not exist or is smaller than the min_size(KB)
%   option. Both the existence test and the size test use the
%   absolute path produced by db_file/2.

rdf_flush_journal(Graph, Options) :-
    db_files(Graph, _SnapshotFile, JournalFile),
    db_file(JournalFile, File),
    (   \+ exists_file(File)
    ->  true
    ;   memberchk(min_size(KB), Options),
        size_file(File, Size),          % JournalFile is only the base name
        Size / 1024 < KB
    ->  true
    ;   create_db(Graph)
    ).

                 /*******************************
                 *             LOAD             *
                 *******************************/
%!  load_db
%
%   Load the database from the attached directory: load the saved
%   prefixes, find all snapshot and journal files, unload the graphs
%   they describe, load the snapshots and then the journals. Finally
%   a summary is printed holding the number of graphs, the number of
%   triples added and the CPU and wall time used.

load_db :-
    rdf_directory(Dir),
    concurrency(Jobs),
    cpu_stat_key(Jobs, StatKey),
    get_time(Wall0),
    statistics(StatKey, T0),
    load_prefixes(Dir),
    verbosity(Silent),
    find_dbs(Dir, Graphs, SnapShots, Journals),
    length(Graphs, GraphCount),
    maplist(rdf_unload_graph, Graphs),
    rdf_statistics(triples(Triples0)),
    load_sources(snapshots, SnapShots, Silent, Jobs),
    load_sources(journals, Journals, Silent, Jobs),
    rdf_statistics(triples(Triples1)),
    statistics(StatKey, T1),
    get_time(Wall1),
    T is T1 - T0,
    Wall is Wall1 - Wall0,
    Triples is Triples1 - Triples0,     % evaluate; the message wants a number
    message_level(Silent, Level),
    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).

%!  load_sources(+Type, +Sources, +Silent, +Jobs)
%
%   Load Sources, a list of db/5 terms, using at most Jobs concurrent
%   workers. Type (`snapshots` or `journals`) is only used in the
%   progress message, which is printed at the level implied by
%   Silent.

load_sources(_, [], _, _) :- !.
load_sources(Type, Sources, Silent, Jobs) :-
    length(Sources, Count),
    RunJobs is min(Count, Jobs),
    message_level(Silent, Level),
    print_message(Level, rdf(restoring(Type, Count, RunJobs))),
    make_goals(Sources, Silent, 1, Count, Goals),
    concurrent(RunJobs, Goals, []).
%!  make_goals(+DBs, +Silent, +Index, +Total, -Goals)
%
%   Goals is a list holding one load_source(DB, Silent, I, Total)
%   goal for each element of DBs, where I counts up from Index.

make_goals([], _, _, _, []).
make_goals([Source|Sources], Silent, Nth, Total,
           [load_source(Source, Silent, Nth, Total)|Goals]) :-
    Next is Nth + 1,
    make_goals(Sources, Silent, Next, Total, Goals).

%!  verbosity(-Silent)
%
%   Silent is the value of the recorded silent(_) option. Without
%   such an option it is `true` if the Prolog flag `verbose` is
%   `silent` and `brief` otherwise.

verbosity(Silent) :-
    (   rdf_option(silent(Silent))
    ->  true
    ;   current_prolog_flag(verbose, silent)
    ->  Silent = true
    ;   Silent = brief
    ).
%!  concurrency(-Jobs)
%
%   Jobs is the number of concurrent jobs to use for loading. It is
%   taken from the recorded concurrency(_) option, else from the
%   Prolog flag `cpu_count` if that is positive, else it is 1.

concurrency(Jobs) :-
    (   rdf_option(concurrency(Jobs))
    ->  true
    ;   current_prolog_flag(cpu_count, Jobs),
        Jobs > 0
    ->  true
    ;   Jobs = 1
    ).

%!  cpu_stat_key(+Jobs, -StatKey)
%
%   StatKey is the statistics/2 key used to measure CPU time:
%   `cputime` when running a single job and `process_cputime`
%   otherwise.

cpu_stat_key(1, cputime) :- !.
cpu_stat_key(_, process_cputime).
%!  find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize)
%
%   Scan the directory Dir for database files. Graphs is the sorted
%   list of graph names found. SnapBySize and JournalBySize are the
%   snapshot and journal files as sorted lists of terms
%
%       db(Size, Ext, DB, DBFile, Depth)
%
%   If consider_reindex_db/3 restructures the directory, the scan is
%   repeated.

find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
    directory_files(Dir, Entries),
    phrase(scan_db_files(Entries, Dir, '.', 0), DBs),
    maplist(db_graph, DBs, GraphList),
    sort(GraphList, Graphs),
    (   consider_reindex_db(Dir, Graphs, DBs)
    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
    ;   partition(db_is_snapshot, DBs, SnapList, JournalList),
        sort(SnapList, SnapBySize),
        sort(JournalList, JournalBySize)
    ).

%!  consider_reindex_db(+Dir, +Graphs, +Scanned) is semidet.
%
%   Succeeds after calling reindex_db/2 if the directory layout must
%   change: either the scanned files do not all live at the same
%   depth or more levels are needed for the number of graphs. If the
%   current (uniform) depth suffices, it is recorded as the
%   directory_levels(_) option and the predicate fails.

consider_reindex_db(Dir, Graphs, Scanned) :-
    length(Graphs, GraphCount),
    GraphCount > 0,
    Needed is floor(log(GraphCount)/log(256)),
    (   maplist(depth_db(Current), Scanned)     % all files at one depth
    ->  (   Needed > Current
        ->  true
        ;   retractall(rdf_option(directory_levels(_))),
            assertz(rdf_option(directory_levels(Current))),
            fail
        )
    ;   true
    ),
    reindex_db(Dir, Needed).

% Accessors for the fields of db(Size, Ext, DB, DBFile, Depth).

db_is_snapshot(DBTerm) :-
    arg(2, DBTerm, trp).

db_graph(DBTerm, Graph) :-
    arg(3, DBTerm, Graph).

db_file_name(DBTerm, File) :-
    arg(4, DBTerm, File).

depth_db(Depth, DBTerm) :-
    arg(5, DBTerm, Depth).
%!  scan_db_files(+Files, +Dir, +Prefix, +Depth)// is det.
%
%   Produces a list of terms db(Size, Ext, DB, AbsFile, Depth) for
%   all recognised RDF database files. Files are the entries of the
%   directory Prefix, which is relative to the database directory
%   Dir. AbsFile is the path of the file with Dir prepended.
%   Sub-directories that are not symbolic links are scanned
%   recursively with Depth incremented.

scan_db_files([], _, _, _) -->
    [].
scan_db_files([Nofollow|T], Dir, Prefix, Depth) -->
    { nofollow(Nofollow) },
    !,
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([File|T], Dir, Prefix, Depth) -->
    { file_name_extension(Base, Ext, File),
      db_extension(Ext),
      !,
      rdf_db_to_file(DB, Base),
      directory_file_path(Prefix, File, DBFile),
      directory_file_path(Dir, DBFile, AbsFile),
      size_file(AbsFile, Size)
    },
    [ db(Size, Ext, DB, AbsFile, Depth) ],
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([D|T], Dir, Prefix, Depth) -->
    { directory_file_path(Prefix, D, SubD),
      directory_file_path(Dir, SubD, AbsD),
      exists_directory(AbsD),
      \+ read_link(AbsD, _, _),    % Do not follow links
      !,
      directory_files(AbsD, SubFiles),
      SubDepth is Depth + 1
    },
    scan_db_files(SubFiles, Dir, SubD, SubDepth),
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([_|T], Dir, Prefix, Depth) -->
    scan_db_files(T, Dir, Prefix, Depth).

% Directory entries that must never be descended into. The atoms
% must be quoted.
nofollow('.').
nofollow('..').

% File name extensions of snapshot and journal files.
db_extension(trp).
db_extension(jrn).

:- public load_source/4.                % called through make_goals/5

%!  load_source(+DB, +Silent, +Nth, +Total)
%
%   Load a single snapshot or journal file described by the db/5
%   term DB and print progress messages at the level implied by
%   Silent. Nth and Total are passed on in the final `done` message.

load_source(DB, Silent, Nth, Total) :-
    db_file_name(DB, File),
    db_graph(DB, Graph),
    message_level(Silent, Level),
    graph_triple_count(Graph, Count0),
    statistics(cputime, T0),
    (   db_is_snapshot(DB)
    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
        rdf_load_db(File)
    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
        load_journal(File, Graph)
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    graph_triple_count(Graph, Count1),
    Count is Count1 - Count0,
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count, Nth, Total)))).


%!  graph_triple_count(+Graph, -Count)
%
%   Count is the number of triples in Graph, or 0 if no statistics
%   are available for Graph.

graph_triple_count(Graph, Count) :-
    rdf_statistics(triples_by_graph(Graph, Count)),
    !.
graph_triple_count(_, 0).
%!  attach_graph(+Graph, +Options) is det.
%
%   Reload Graph from its persistent files: remove all triples of
%   Graph, load its snapshot if one exists and replay its journal if
%   one exists. Options:
%
%     * silent(+BoolOrBrief)
%       Controls progress messages. With `true`, messages are
%       printed at level `silent`. Default is `false`.

attach_graph(Graph, Options) :-
    option(silent(Silent), Options, false),
    message_level(Silent, Level),
    db_files(Graph, SnapshotFile, JournalFile),
    rdf_retractall(_,_,_,Graph),
    statistics(cputime, T0),
    print_message(Level, rdf(restore(Silent, Graph))),
    db_file(SnapshotFile, AbsSnapShot),
    (   exists_file(AbsSnapShot)
    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
        rdf_load_db(AbsSnapShot)
    ;   true
    ),
    db_file(JournalFile, AbsJournal),   % load_journal/2 opens the path as is
    (   exists_file(AbsJournal)
    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
        load_journal(AbsJournal, Graph)
    ;   true
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    (   rdf_statistics(triples_by_graph(Graph, Count))
    ->  true
    ;   Count = 0
    ),
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count)))).

%!  message_level(+Silent, -Level)
%
%   Level is the print_message/2 level for progress messages:
%   `silent` if Silent is `true` and `informational` otherwise.

message_level(true, silent) :- !.
message_level(_, informational).


                 /*******************************
                 *         LOAD JOURNAL         *
                 *******************************/
%!  load_journal(+File:atom, +DB:atom) is det.
%
%   Create the graph DB and replay the journal File into it. File
%   is read as UTF-8 and is closed when done.
%
%   @error type_error(journal_term, Term) if the journal holds a term
%          that process_journal_term/2 does not handle.

load_journal(File, DB) :-
    rdf_create_graph(DB),
    setup_call_cleanup(
        open(File, read, Stream, [encoding(utf8)]),
        ( read(Stream, First),
          process_journal(First, Stream, DB)
        ),
        close(Stream)).

% process_journal(+Term, +Stream, +DB): handle Term and keep reading
% terms from Stream until end of file.

process_journal(end_of_file, _, _) :- !.
process_journal(Term, Stream, DB) :-
    (   process_journal_term(Term, DB)
    ->  true
    ;   throw(error(type_error(journal_term, Term), _))
    ),
    read(Stream, Next),
    process_journal(Next, Stream, DB).

% process_journal_term(+Term, +DB): apply a single journal record to
% the graph DB.  Bookkeeping records are accepted and ignored.

process_journal_term(assert(S,P,O), DB) :-
    rdf_assert(S,P,O,DB).
process_journal_term(assert(S,P,O,Line), DB) :-
    rdf_assert(S,P,O,DB:Line).
process_journal_term(retract(S,P,O), DB) :-
    rdf_retractall(S,P,O,DB).
process_journal_term(retract(S,P,O,Line), DB) :-
    rdf_retractall(S,P,O,DB:Line).
process_journal_term(update(S,P,O,Action), DB) :-
    (   rdf_update(S,P,O,DB, Action)
    ->  true
    ;   print_message(warning, rdf(update_failed(S,P,O,Action)))
    ).
process_journal_term(start(_), _).              % journal open/close
process_journal_term(end(_), _).
process_journal_term(begin(_), _).              % logged transaction (compatibility)
process_journal_term(end, _).
process_journal_term(begin(_,_,_,_), _).        % logged transaction (current)
process_journal_term(end(_,_,_), _).


                 /*******************************
                 *        CREATE JOURNAL        *
                 *******************************/

:- dynamic
    blocked_db/2,                   % DB, Reason
    transaction_message/3,          % Nesting, Time, Message
    transaction_db/3.               % Nesting, DB, Id
%!  rdf_persistency(+DB, +Bool)
%
%   Specify whether the graph DB is persistent. Switching to `false`
%   kills the persistent state. Switching to `true` creates it.
%   Setting the state a graph already has is a no-op.
%
%   @error type_error if DB is not an atom or Bool is not a boolean.

rdf_persistency(DB, Bool) :-
    must_be(atom, DB),
    must_be(boolean, Bool),
    (   Bool == false
    ->  (   blocked_db(DB, persistency)
        ->  true
        ;   assert(blocked_db(DB, persistency)),
            delete_db(DB)
        )
    ;   retract(blocked_db(DB, persistency))
    ->  create_db(DB)
    ;   true
    ).
% Extend the multifile predicate rdf_db:property_of_graph/2 with the
% property persistent(State).  State is `false` if persistency was
% switched off for Graph and `true` otherwise.  The clause head must
% be module-qualified to match the multifile declaration.

:- multifile
    rdf_db:property_of_graph/2.

rdf_db:property_of_graph(persistent(State), Graph) :-
    (   blocked_db(Graph, persistency)
    ->  State = false
    ;   State = true
    ).
%!  start_monitor is det.
%!  stop_monitor is det.
%
%   Register monitor/1 with rdf_monitor/2. Starting passes the mask
%   `-assert(load)`; stopping passes `-all`.

start_monitor :-
    rdf_monitor(monitor, [-assert(load)]).

stop_monitor :-
    rdf_monitor(monitor, [-all]).
%!  monitor(+Term) is semidet.
%
%   Handle an rdf_monitor/2 callback to keep the journals up to
%   date. According to the original documentation, the monitor calls
%   that come from rdf_db.pl that deal with database changes are
%   serialized. They do come from different threads though.
%
%   Changes to a graph for which blocked_db/2 holds are not
%   journalled; the callback fails in that case.

monitor(Msg) :-
    debug(monitor, 'Monitor: ~p~n', [Msg]),
    fail.
monitor(assert(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(assert(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O)]),
    sync_journal(DB, Fd).
monitor(update(S,P,O,DB:Line,Action)) :-
    !,
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)           % moved to another graph
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB:Line))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),       % as for update without line info
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(update(S,P,O,DB,Action)) :-
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(load(BE, _DumpFileURI)) :-
    (   BE = end(Graphs)
    ->  sync_loaded_graphs(Graphs)
    ;   true
    ).
monitor(create_graph(Graph)) :-
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Fd),
    open_transaction(Graph, Fd),
    sync_journal(Graph, Fd).
monitor(reset) :-
    forall(rdf_graph(Graph), delete_db(Graph)).
                                        % TBD: Remove empty directories?

monitor(transaction(BE, Id)) :-
    monitor_transaction(Id, BE).

%!  monitor_transaction(+Id, +BeginOrEnd)
%
%   Handle the begin(Nesting) and end(Nesting) events of a
%   transaction identified by Id. The load_journal, parse and unload
%   transactions block journalling of the affected graph while they
%   run; parse and unload then save or delete the graph files when
%   they end. The log transactions record the message that
%   open_transaction/2 writes to the journals.

monitor_transaction(load_journal(DB), begin(_)) :-
    !,
    assert(blocked_db(DB, journal)).
monitor_transaction(load_journal(DB), end(_)) :-
    !,
    retractall(blocked_db(DB, journal)).

monitor_transaction(parse(URI), begin(_)) :-
    !,
    (   blocked_db(URI, persistency)
    ->  true
    ;   assert(blocked_db(URI, parse))
    ).
monitor_transaction(parse(URI), end(_)) :-
    !,
    (   retract(blocked_db(URI, parse))
    ->  create_db(URI)
    ;   true
    ).
monitor_transaction(unload(DB), begin(_)) :-
    !,
    (   blocked_db(DB, persistency)
    ->  true
    ;   assert(blocked_db(DB, unload))
    ).
monitor_transaction(unload(DB), end(_)) :-
    !,
    (   retract(blocked_db(DB, unload))
    ->  delete_db(DB)
    ;   true
    ).
monitor_transaction(log(Msg), begin(N)) :-
    !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)).
monitor_transaction(log(_), end(N)) :-
    check_nested(N),
    retract(transaction_message(N, _, _)),
    !,
    findall(DB:Id, retract(transaction_db(N, DB, Id)), DBs),
    end_transactions(DBs, N).
monitor_transaction(log(Msg, DB), begin(N)) :-
    !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd).
monitor_transaction(log(Msg, _DB), end(N)) :-
    monitor_transaction(log(Msg), end(N)).
%!  check_nested(+Level) is semidet.
%
%   True if a log transaction at nesting Level must be logged, i.e.
%   Level is 0 or the option log_nested_transactions(true) is
%   defined.

check_nested(Level) :-
    (   Level = 0
    ->  true
    ;   rdf_option(log_nested_transactions(true))
    ).
%!  open_transaction(+DB, +Fd) is det.
%
%   Add a begin(Id, Level, Time, Message) term if a transaction
%   involves DB. Id is an incremental integer, where each database
%   has its own counter. Level is the nesting level, Time a floating
%   point timestamp and Message the message provided as argument to
%   the log message.
%
%   Nothing is written if there is no active log transaction or if
%   the begin-term for DB at this nesting level was already written.
%   Time is rounded to 1/100 of a second.

open_transaction(DB, Fd) :-
    (   transaction_message(Level, Time, Msg)
    ->  (   transaction_db(Level, DB, _)
        ->  true
        ;   next_transaction_id(DB, Id),
            assert(transaction_db(Level, DB, Id)),
            Stamp is round(Time*100)/100,
            format(Fd, '~q.~n', [begin(Id, Level, Stamp, Msg)])
        )
    ;   true
    ).
:- dynamic
    current_transaction_id/2.

%!  next_transaction_id(+DB, -Id) is det.
%
%   Id is the identifier for the next logged transaction on DB.
%   Within a session it is the previous identifier plus one. The
%   first time, the identifier of the last end(Id, _, _) term in the
%   existing journal is looked up. If there is no journal, counting
%   starts at 1.
%
%   db_files/3 only yields the base name of the journal, so the
%   existence and size tests use the absolute path from db_file/2.

next_transaction_id(DB, Id) :-
    retract(current_transaction_id(DB, Last)),
    !,
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, Id) :-
    db_files(DB, _, Journal),
    db_file(Journal, File),
    exists_file(File),
    !,
    size_file(File, Size),
    open_db(Journal, read, In, []),
    call_cleanup(iterative_expand(In, Size, Last), close(In)),
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, 1) :-
    assert(current_transaction_id(DB, 1)).

%!  iterative_expand(+In, +Size, -Last)
%
%   Last is the identifier of the last end(Id, _, _) term in the
%   journal stream In of Size bytes, or 0 if there is none. Sections
%   of growing size (1Kb, 2Kb, ...) are scanned from the end of the
%   file first. If this finds nothing the whole file is scanned.

iterative_expand(_, 0, 0) :- !.
iterative_expand(In, Size, Last) :-     % Scan growing sections from the end
    Max is floor(log(Size)/log(2)),
    between(10, Max, Step),
    Offset is -(1<<Step),
    seek(In, Offset, eof, _),
    skip(In, 10),                       % records are line-based
    read(In, T0),
    last_transaction_id(T0, In, 0, Last),
    Last > 0,
    !.
iterative_expand(In, _, Last) :-        % Scan the whole file
    seek(In, 0, bof, _),
    read(In, T0),
    last_transaction_id(T0, In, 0, Last).

%!  last_transaction_id(+Term, +In, +Id0, -Id)
%
%   Read the remaining terms from In. Id is the identifier of the
%   last end(Id, _, _) term seen, or Id0 if there is none.

last_transaction_id(end_of_file, _, Last, Last) :- !.
last_transaction_id(end(Id, _, _), In, _, Last) :-
    !,                                  % do not retry as an ordinary term
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
last_transaction_id(_, In, Id, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
%!  end_transactions(+DBs:list(DB:Id), +Nesting) is det.
%
%   End a transaction that affected the databases DBs. In each
%   database, the transaction is ended with a term end(Id, Nesting,
%   Others), where Id and Nesting are the transaction identifier and
%   nesting (see open_transaction/2) and Others is a list of DB:Id,
%   indicating other databases affected by the transaction.

end_transactions(DBs, Nesting) :-
    end_transactions(DBs, DBs, Nesting).

% end_transactions(+Todo, +All, +Nesting): write the end-term for each
% element of Todo, where All is the complete list of affected
% databases.

end_transactions([], _, _).
end_transactions([DB:Id|Todo], All, Nesting) :-
    journal_fd(DB, Fd),
    once(select(DB:Id, All, Others)),
    format(Fd, 'end(~q, ~q, ~q).~n', [Id, Nesting, Others]),
    sync_journal(DB, Fd),
    end_transactions(Todo, All, Nesting).
%!  sync_loaded_graphs(+Graphs:list)
%
%   Called from monitor/1 when a load finishes. Runs create_db/1 on
%   every graph in Graphs.

sync_loaded_graphs(Graphs) :-
    maplist(create_db, Graphs).


                 /*******************************
                 *         JOURNAL FILES        *
                 *******************************/
%!  journal_fd(+DB, -Stream) is det.
%
%   Get an open stream to the journal of DB. If the journal is not
%   open, old journals are closed to satisfy the `max_open_journals`
%   option. Then the journal is opened in `append` mode. Journal
%   files are always encoded as UTF-8 for portability as well as to
%   ensure full coverage of Unicode.
%
%   Opening happens while holding the mutex `rdf_journal_file`. A
%   start([time(Now)]) term is written to a freshly opened journal.

journal_fd(DB, Fd) :-
    (   source_journal_fd(DB, Fd)
    ->  true
    ;   with_mutex(rdf_journal_file,
                   journal_fd_(DB, Opened)),
        Fd = Opened
    ).

% Re-check under the mutex before opening.
journal_fd_(DB, Fd) :-
    source_journal_fd(DB, Fd),
    !.
journal_fd_(DB, Fd) :-
    limit_fd_pool,
    db_files(DB, _Snapshot, Journal),
    open_db(Journal, append, Fd,
            [ close_on_abort(false)
            ]),
    time_stamp(Now),
    format(Fd, '~q.~n', [start([time(Now)])]),
    assert(source_journal_fd(DB, Fd)).  % new one at the end
%!  limit_fd_pool is det.
%
%   Make room in the pool of open journals before a new journal is
%   opened. The limit is taken from the max_open_journals(Max)
%   option and defaults to 10. Enough of the oldest journals are
%   closed to leave at most Max-1 open, so the pool holds no more
%   than Max journals once the caller has opened the new one.

limit_fd_pool :-
    predicate_property(source_journal_fd(_, _), number_of_clauses(N)),
    !,
    (   rdf_option(max_open_journals(Max))
    ->  true
    ;   Max = 10
    ),
    Close is N - Max + 1,               % +1: reserve a slot for the new journal
    forall(between(1, Close, _),
           close_oldest_journal).
limit_fd_pool.

%!  close_oldest_journal is det.
%
%   Close the journal recorded first in source_journal_fd/2, if any.

close_oldest_journal :-
    source_journal_fd(DB, _Fd),
    !,
    debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
    close_journal(DB).
close_oldest_journal.
%!  sync_journal(+DB, +Fd)
%
%   Flush the journal stream Fd, unless DB takes part in a logged
%   transaction (transaction_db/3), in which case nothing is done.

sync_journal(DB, Fd) :-
    (   transaction_db(_, DB, _)
    ->  true
    ;   flush_output(Fd)
    ).
%!  close_journal(+DB) is det.
%
%   Close the journal of DB if it is open, while holding the mutex
%   `rdf_journal_file`. An end([time(Now)]) term is written before
%   the stream is closed.

close_journal(DB) :-
    with_mutex(rdf_journal_file,
               close_journal_(DB)).

close_journal_(DB) :-
    retract(source_journal_fd(DB, Fd)),
    !,
    time_stamp(Now),
    format(Fd, '~q.~n', [end([time(Now)])]),
    close(Fd, [force(true)]).
close_journal_(_).
%!  close_journals
%
%   Close all open journals. An exception raised while closing one
%   journal is printed as an error and does not stop the others from
%   being closed.

close_journals :-
    forall(source_journal_fd(Graph, _),
           catch(close_journal(Graph),
                 Error,
                 print_message(error, Error))).
%!  create_db(+Graph)
%
%   Save Graph as a fresh snapshot. If Graph has no triples, its
%   files are deleted instead. Otherwise the journal is closed and
%   the graph is saved to `<snapshot>.new`. If saving succeeds, the
%   journal file is deleted and the new file is renamed to the
%   snapshot. If saving raises an exception, a warning is printed
%   and the partial `.new` file is removed.

create_db(Graph) :-
    (   rdf(_,_,_,Graph)
    ->  save_graph_snapshot(Graph)
    ;   debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
        delete_db(Graph)
    ).

% Write the snapshot of a non-empty graph and drop its journal.
save_graph_snapshot(Graph) :-
    debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
    close_journal(Graph),
    db_abs_files(Graph, Snapshot, Journal),
    atom_concat(Snapshot, '.new', TmpSnapshot),
    (   catch(( create_directory_levels(Snapshot),
                rdf_save_db(TmpSnapshot, Graph)
              ), Error,
              ( print_message(warning, Error),
                fail
              ))
    ->  (   exists_file(Journal)
        ->  delete_file(Journal)
        ;   true
        ),
        rename_file(TmpSnapshot, Snapshot),
        debug(rdf_persistency, 'Saved Graph ~w', [Graph])
    ;   catch(delete_file(TmpSnapshot), _, true)
    ).
%!  delete_db(+DB)
%
%   Remove the persistent files of DB while holding the mutex
%   `rdf_journal_file`: close its journal and delete the journal
%   and snapshot files that exist.

delete_db(DB) :-
    with_mutex(rdf_journal_file,
               delete_db_(DB)).

delete_db_(DB) :-
    (   close_journal_(DB),
        db_abs_files(DB, Snapshot, Journal)
    ->  (   exists_file(Journal)
        ->  delete_file(Journal)
        ;   true
        ),
        (   exists_file(Snapshot)
        ->  delete_file(Snapshot)
        ;   true
        )
    ;   true
    ).

                 /*******************************
                 *             LOCKING          *
                 *******************************/
%!  lock_db(+Dir)
%
%   Lock the database directory Dir by opening its lock file with a
%   non-blocking write lock. A locked([time(T), pid(P), host(H)])
%   term is written to the file, the lock is recorded in rdf_lock/2
%   and unlock_db/1 is registered to run at halt.
%
%   @error permission_error(lock, rdf_db, Dir) if the lock file
%          cannot be locked or opened.

lock_db(Dir) :-
    lockfile(Dir, LockFile),
    catch(open(LockFile, update, Stream, [lock(write), wait(false)]),
          error(permission_error(Access, _, _), _),
          locked_error(Access, Dir)),
    (   current_prolog_flag(pid, PID)
    ->  true
    ;   PID = 0                 % TBD: Fix in Prolog
    ),
    time_stamp(Now),
    gethostname(Host),
    Owner = locked([ time(Now),
                     pid(PID),
                     host(Host)
                   ]),
    format(Stream, '/* RDF Database is in use */~n~n', []),
    format(Stream, '~q.~n', [Owner]),
    flush_output(Stream),
    set_end_of_stream(Stream),
    assert(rdf_lock(Dir, lock(Stream, LockFile))),
    at_halt(unlock_db(Dir)).

%!  locked_error(+Access, +Dir)
%
%   Throw the permission error for a lock file that could not be
%   locked (Access is `lock`) or opened (Access is `open`). For
%   `lock`, the context is rdf_locked(Args) if the lock file holds a
%   single locked(Args) term.

locked_error(lock, Dir) :-
    lockfile(Dir, LockFile),
    (   catch(read_file_to_terms(LockFile, Terms, []), _, fail),
        Terms = [locked(Args)]
    ->  Context = rdf_locked(Args)
    ;   Context = context(_, 'Database is in use')
    ),
    throw(error(permission_error(lock, rdf_db, Dir), Context)).
locked_error(open, Dir) :-
    throw(error(permission_error(lock, rdf_db, Dir),
                context(_, 'Lock file cannot be opened'))).
%!  unlock_db(+Dir) is det.
%!  unlock_db(+Stream, +File) is det.
%
%   Release the lock recorded for Dir, if any, by closing the lock
%   stream and deleting the lock file.

unlock_db(Dir) :-
    (   retract(rdf_lock(Dir, lock(Stream, LockFile)))
    ->  unlock_db(Stream, LockFile)
    ;   true
    ).

unlock_db(Stream, LockFile) :-
    close(Stream),
    delete_file(LockFile).

                 /*******************************
                 *           FILENAMES          *
                 *******************************/

% lockfile(+Dir, -LockFile): LockFile is the file `lock` in Dir.
lockfile(Dir, LockFile) :-
    atomic_list_concat([Dir, /, lock], LockFile).

% directory_levels(-Levels): value of the directory_levels(_) option,
% 2 if the option is not set.
directory_levels(Levels) :-
    (   rdf_option(directory_levels(Levels))
    ->  true
    ;   Levels = 2
    ).

% db_file(+Base, -File): File is the path of the database file Base
% below the attached directory, using the current directory levels.
db_file(Base, File) :-
    rdf_directory(Dir),
    directory_levels(Levels),
    db_file(Dir, Base, Levels, File).

% db_file(+Dir, +Base, +Levels, -File): as db_file/2 with explicit
% directory and number of levels.
db_file(Dir, Base, Levels, File) :-
    dir_levels(Base, Levels, Segments, [Base]),
    atomic_list_concat([Dir|Segments], /, File).

% open_db(+Base, +Mode, -Stream, +Options): open the database file
% Base as UTF-8, creating the intermediate directories first.
open_db(Base, Mode, Stream, Options) :-
    db_file(Base, File),
    create_directory_levels(File),
    open(File, Mode, Stream, [encoding(utf8)|Options]).

% create_directory_levels(+File): make sure the directory of File
% exists.  Nothing to do when no intermediate levels are used.
create_directory_levels(File) :-
    (   rdf_option(directory_levels(0))
    ->  true
    ;   file_directory_name(File, Dir),
        make_directory_path(Dir)
    ).

% exists_db(+Base): true if the database file Base exists.
exists_db(Base) :-
    db_file(Base, File),
    exists_file(File).
%!  dir_levels(+File, +Levels, ?Segments, ?Tail) is det.
%
%   Segments-Tail is a difference list of Levels directory names for
%   File. The names are successive two-character slices of the hash
%   computed by rdf_atom_md5(File, 1, Hash). With 0 levels the list
%   is empty.

dir_levels(_, 0, Tail, Tail) :- !.
dir_levels(File, Levels, Segments, Tail) :-
    rdf_atom_md5(File, 1, Hash),
    create_dir_levels(Levels, 0, Hash, Segments, Tail).

% create_dir_levels(+Count, +Offset, +Hash, ?Segments, ?Tail): take
% Count two-character slices from Hash, starting at Offset.

create_dir_levels(0, _, _, Tail, Tail) :- !.
create_dir_levels(Count, Offset, Hash, [Segment|Segments], Tail) :-
    sub_atom(Hash, Offset, 2, _, Segment),
    NextOffset is Offset+2,
    Remaining is Count-1,
    create_dir_levels(Remaining, NextOffset, Hash, Segments, Tail).
%!  db_files(?DB, ?Snapshot, ?Journal)
%
%   Relate the graph name DB to the base names of its snapshot
%   (`.trp`) and journal (`.jrn`) files. Works from whichever of the
%   three arguments is bound, tried in that order. Fails if all are
%   unbound.

db_files(DB, Snapshot, Journal) :-
    (   nonvar(DB)
    ->  rdf_db_to_file(DB, Base),
        atom_concat(Base, '.trp', Snapshot),
        atom_concat(Base, '.jrn', Journal)
    ;   nonvar(Snapshot)
    ->  atom_concat(Base, '.trp', Snapshot),
        atom_concat(Base, '.jrn', Journal),
        rdf_db_to_file(DB, Base)
    ;   nonvar(Journal)
    ->  atom_concat(Base, '.jrn', Journal),
        atom_concat(Base, '.trp', Snapshot),
        rdf_db_to_file(DB, Base)
    ).

%!  db_abs_files(?DB, -Snapshot, -Journal)
%
%   As db_files/3, but Snapshot and Journal are the full paths
%   produced by db_file/2.

db_abs_files(DB, Snapshot, Journal) :-
    db_files(DB, SnapshotBase, JournalBase),
    db_file(SnapshotBase, Snapshot),
    db_file(JournalBase, Journal).
%!  rdf_journal_file(?Graph, ?Journal) is nondet.
%
%   True if Journal is the existing journal file of Graph. If Graph
%   is unbound, the graphs known to rdf_graph/1 are enumerated.

rdf_journal_file(Graph, Journal) :-
    var(Graph),
    !,
    rdf_graph(Graph),
    db_abs_files(Graph, _Snapshot, Journal),
    exists_file(Journal).
rdf_journal_file(Graph, Journal) :-
    db_abs_files(Graph, _Snapshot, Journal),
    exists_file(Journal).
%!  rdf_snapshot_file(?Graph, ?Snapshot) is nondet.
%
%   True if Snapshot is the existing snapshot file of Graph. If
%   Graph is unbound, the graphs known to rdf_graph/1 are
%   enumerated.

rdf_snapshot_file(Graph, Snapshot) :-
    var(Graph),
    !,
    rdf_graph(Graph),           % also pick the empty graphs
    db_abs_files(Graph, Snapshot, _Journal),
    exists_file(Snapshot).
rdf_snapshot_file(Graph, Snapshot) :-
    db_abs_files(Graph, Snapshot, _Journal),
    exists_file(Snapshot).
%!  rdf_db_to_file(?DB, ?FileBase) is semidet.
%
%   Relate the graph name DB to the base name of its files. Known
%   pairs are looked up in file_base_db/2. New pairs are computed by
%   url_to_filename/2 and remembered.

rdf_db_to_file(DB, File) :-
    (   file_base_db(File, DB)
    ->  true
    ;   url_to_filename(DB, File),
        assert(file_base_db(File, DB))
    ).
%!  url_to_filename(+URL, -FileName) is det.
%!  url_to_filename(-URL, +FileName) is det.
%
%   Relate a graph name to a file name that only uses US-ASCII
%   letters, digits, `_`, `+` and `%XX` escapes. If URL is atomic it
%   is encoded by url_encode//1. Otherwise the relation is delegated
%   to uri_encoded/3 using the component `path`.
%
%   NOTE(review): url_encode//1 keeps only the low 8 bits of each
%   character code, so codes above 255 appear to be encoded lossily
%   -- confirm before relying on non-Latin-1 graph names.

url_to_filename(URL, FileName) :-
    (   atomic(URL)
    ->  atom_codes(URL, Plain),
        phrase(url_encode(Encoded), Plain),
        atom_codes(FileName, Encoded)
    ;   uri_encoded(path, URL, FileName)
    ).

% url_encode(-Encoded)//: Encoded is the encoding of the input codes.
% A space becomes `+`, alphanumerical US-ASCII characters and `_`
% are copied, a newline (with optional preceding carriage return)
% becomes "%0D%0A" and anything else becomes `%` followed by two
% hexadecimal digits.

url_encode([0'+|Rest]) -->
    " ",
    !,
    url_encode(Rest).
url_encode([Code|Rest]) -->
    alphanum(Code),
    !,
    url_encode(Rest).
url_encode([Code|Rest]) -->
    no_enc_extra(Code),
    !,
    url_encode(Rest).
url_encode(Encoded) -->
    (   "\r\n"
    ;   "\n"
    ),
    !,
    { string_codes("%0D%0A", NewLine),
      append(NewLine, Rest, Encoded)
    },
    url_encode(Rest).
url_encode([]) -->
    eos,
    !.
url_encode([0'%,Hi,Lo|Rest]) -->
    [Code],
    { HiValue is (Code>>4 /\ 0xf),
      LoValue is (Code /\ 0xf),
      code_type(Hi, xdigit(HiValue)),
      code_type(Lo, xdigit(LoValue))
    },
    url_encode(Rest).

% True at the end of the input.
eos([], []).

% A US-ASCII letter or digit.
alphanum(Code) -->
    [Code],
    { Code < 128,                       % US-ASCII
      code_type(Code, alnum)
    }.

% Additional characters that are copied unencoded.
no_enc_extra(0'_) --> "_".


                 /*******************************
                 *             REINDEX          *
                 *******************************/
%!  reindex_db(+Dir, +Levels)
%
%   Move all database files below Dir to the location they must have
%   when Levels intermediate directory levels are used, and remove
%   the directories that are left empty.

reindex_db(Dir, Levels) :-
    directory_files(Dir, Files),
    reindex_files(Files, Dir, '.', 0, Levels),
    remove_empty_directories(Files, Dir).

%!  reindex_files(+Files, +Dir, +Prefix, +CLevel, +Levels)
%
%   Files are the entries of the directory Prefix (relative to Dir)
%   at depth CLevel. Database files that are not at depth Levels are
%   renamed to the path computed by db_file/4. Sub-directories that
%   are not symbolic links are processed recursively.

reindex_files([], _, _, _, _).
reindex_files([Nofollow|Files], Dir, Prefix, CLevel, Levels) :-
    nofollow(Nofollow),
    !,
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([File|Files], Dir, Prefix, CLevel, Levels) :-
    CLevel \== Levels,
    file_name_extension(_Base, Ext, File),
    db_extension(Ext),
    !,
    directory_file_path(Prefix, File, DBFile),
    directory_file_path(Dir, DBFile, OldPath),
    db_file(Dir, File, Levels, NewPath),
    debug(rdf_persistency, 'Rename ~q --> ~q', [OldPath, NewPath]),
    file_directory_name(NewPath, NewDir),
    make_directory_path(NewDir),
    rename_file(OldPath, NewPath),
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([D|Files], Dir, Prefix, CLevel, Levels) :-
    directory_file_path(Prefix, D, SubD),
    directory_file_path(Dir, SubD, AbsD),
    exists_directory(AbsD),
    \+ read_link(AbsD, _, _),           % Do not follow links
    !,
    directory_files(AbsD, SubFiles),
    CLevel2 is CLevel + 1,
    reindex_files(SubFiles, Dir, SubD, CLevel2, Levels),
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([_|Files], Dir, Prefix, CLevel, Levels) :-
    reindex_files(Files, Dir, Prefix, CLevel, Levels).


%!  remove_empty_directories(+Files, +Dir)
%
%   Files are entries of the directory Dir. Remove every entry that
%   is an empty directory (symbolic links are skipped). Directories
%   are cleaned bottom-up, so a directory that becomes empty because
%   all its sub-directories were removed is removed as well.

remove_empty_directories([], _).
remove_empty_directories([File|Files], Dir) :-
    \+ nofollow(File),
    directory_file_path(Dir, File, Path),
    exists_directory(Path),
    \+ read_link(Path, _, _),
    !,
    directory_files(Path, Content),
    exclude(nofollow, Content, RealContent),
    remove_empty_directories(RealContent, Path),
    (   directory_is_empty(Path)        % re-check after cleaning children
    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
        delete_directory(Path)
    ;   true
    ),
    remove_empty_directories(Files, Dir).
remove_empty_directories([_|Files], Dir) :-
    remove_empty_directories(Files, Dir).

% True if the directory Path has no entries besides `.` and `..`.
directory_is_empty(Path) :-
    directory_files(Path, Content),
    exclude(nofollow, Content, []).


                 /*******************************
                 *            PREFIXES          *
                 *******************************/

%!  save_prefixes(+Dir)
%
%   Write the file `prefixes.db` in Dir (UTF-8), holding one
%   prefix(Alias, URI) term for each prefix known to
%   rdf_current_ns/2.

save_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    setup_call_cleanup(open(PrefixFile, write, Out, [encoding(utf8)]),
                       write_prefixes(Out),
                       close(Out)).

write_prefixes(Out) :-
    format(Out, '% Snapshot of defined RDF prefixes~n~n', []),
    forall(rdf_current_ns(Alias, URI),
           format(Out, 'prefix(~q, ~q).~n', [Alias, URI])).
%!  load_prefixes(+Dir)
%
%   Register the prefixes stored in the UTF-8 encoded file
%   `prefixes.db` in Dir. Succeeds silently if there is no such file.

load_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], File),
    exists_file(File),
    !,
    setup_call_cleanup(
        open(File, read, Stream, [encoding(utf8)]),
        read_prefixes(Stream),
        close(Stream)).
load_prefixes(_).

%!  read_prefixes(+Stream)
%
%   Read terms from Stream up to the end of file and process each of
%   them using read_prefixes/2.

read_prefixes(Stream) :-
    read_term(Stream, Term, []),
    read_prefixes(Term, Stream).

%!  read_prefixes(+Term, +Stream)
%
%   Handle Term and continue with the remainder of Stream. A term
%   prefix(Alias, URI) is registered through rdf_register_ns/3; an
%   exception raised by the registration is printed as a warning and
%   reading continues.
%
%   @error type_error(atom, X) if Alias or URI is not an atom.
%   @error domain_error(prefix_term, Term) for any other term.

read_prefixes(end_of_file, _) :- !.
read_prefixes(prefix(Alias, URI), Stream) :-
    !,
    must_be(atom, Alias),
    must_be(atom, URI),
    catch(rdf_register_ns(Alias, URI, []),
          Error,
          print_message(warning, Error)),
    read_prefixes(Stream).
read_prefixes(Other, _) :-
    domain_error(prefix_term, Other).


                 /*******************************
                 *              UTIL            *
                 *******************************/
%!  mkdir(+Directory)
%
%   Create Directory using make_directory/1 unless it already exists.

mkdir(Directory) :-
    (   exists_directory(Directory)
    ->  true
    ;   make_directory(Directory)
    ).
%!  time_stamp(-Int:integer) is det.
%
%   Int is the current time as returned by get_time/1, rounded to
%   the nearest integer.

time_stamp(Int) :-
    get_time(Now),
    Int is round(Now).


                 /*******************************
                 *            MESSAGES          *
                 *******************************/

:- multifile
    prolog:message/3,
    prolog:message_context/3.

%   Hook for print_message/2: translate rdf(Term) using message//1.
%   The clause head must be qualified with `prolog:` to add a clause
%   to the multifile hook declared above; an unqualified head would
%   define an unrelated local predicate.

prolog:message(rdf(Term)) -->
    message(Term).

%!  message(+Term)//
%
%   Translate the argument of an rdf(Term) message. For restore/2
%   terms the first argument selects the style: `true` produces no
%   output, `brief` produces a one-line progress indicator and any
%   other value produces the verbose messages below.
%
%   The verbose clauses for specific actions come before the
%   catch-all restore(_, Graph) clause. That clause matches every
%   restore/2 term, so placing it first makes the specific clauses
%   unreachable for a caller that commits to the first translation.
%
%   NOTE(review): brief_message//1 handles done/5 while the verbose
%   clause handles done/3; a done/5 term in verbose mode ends up in
%   the catch-all clause. Confirm against the callers.

message(restoring(Type, Count, Jobs)) -->
    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
message(restore(attached(Graphs, Triples, Time/Wall))) -->
    % Guard against evaluation errors (e.g. Wall being zero).
    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
      [Graphs, Triples, Wall, Percent, Time] ].
% attach_graph/2
message(restore(true, Action)) -->
    !,
    silent_message(Action).
message(restore(brief, Action)) -->
    !,
    brief_message(Action).
message(restore(_, snapshot(_))) -->
    [ at_same_line, '(snapshot) '-[], flush ].
message(restore(_, journal(_))) -->
    [ at_same_line, '(journal) '-[], flush ].
message(restore(_, done(_, Time, Count))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
% load_source/4
message(restore(_, snapshot(G, _))) -->
    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
message(restore(_, journal(G, _))) -->
    [ 'Restoring ~p\t(journal)'-[G], flush ].
% Any other restore/2 term: the second argument is printed as the
% graph that is being restored.
message(restore(_, Graph)) -->
    [ 'Restoring ~p ... '-[Graph], flush ].
% journal handling
message(update_failed(S,P,O,Action)) -->
    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
% directory reindexing
message(reindex(Count, Depth)) -->
    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
message(reindex(Depth)) -->
    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
message(read_only) -->
    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
      'All changes to the RDF store will be lost if this process terminates.'
    ].

%   silent_message(+Action)// produces no output for any Action.

silent_message(_Action) --> [].

%   brief_message(+Action)// shows progress for done/5 actions as
%   the base name of the graph followed by "Nth of Total graphs",
%   preceded by a carriage return and filled with dots up to column
%   72. All other actions produce no output.

brief_message(done(Graph, _Time, _Count, Nth, Total)) -->
    { file_base_name(Graph, Base) },
    [ at_same_line,
      '\r~p~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
      flush
    ].
brief_message(_) --> [].


%   Hook that adds context to rdf_locked(Args) terms: report the
%   time(Time) and pid(Pid) members of Args. Fails if either member
%   is missing. Qualified with `prolog:` for the same reason as the
%   prolog:message//1 clause.

prolog:message_context(rdf_locked(Args)) -->
    { memberchk(time(Time), Args),
      memberchk(pid(Pid), Args),
      format_time(string(S), '%+', Time)
    },
    [ nl,
      'locked at ~s by process id ~w'-[S,Pid]
    ].
/** <module> RDF persistency plugin
This module provides persistency for
rdf_db.pl
based on the rdf_monitor/2 predicate to track changes to the repository. Where previous versions used autosave of the whole database using the quick-load format of rdf_db, this version is based on a quick-load file per source (4th argument of rdf/4), and journalling for edit operations. The result is safe, avoids frequent small changes to large files which makes synchronisation and backup expensive and avoids long disruption of the server doing the autosave. Only loading large files disrupts service for some time.
The persistent backup of the database is realised in a directory, using a lock file to avoid corruption due to concurrent access. Each source is represented by two files, the latest snapshot and a journal. The state is restored by loading the snapshot and replaying the journal. The predicate rdf_flush_journals/1 can be used to create fresh snapshots and delete the journals.
rdf_edit.pl
*/