. (utf8) 2/* Part of SWI-Prolog 3 4 Author: Torbjörn Lager and Jan Wielemaker 5 E-mail: J.Wielemaker@vu.nl 6 WWW: http://www.swi-prolog.org 7 Copyright (C): 2014-2020, Torbjörn Lager, 8 VU University Amsterdam 9 All rights reserved. 10 11 Redistribution and use in source and binary forms, with or without 12 modification, are permitted provided that the following conditions 13 are met: 14 15 1. Redistributions of source code must retain the above copyright 16 notice, this list of conditions and the following disclaimer. 17 18 2. Redistributions in binary form must reproduce the above copyright 19 notice, this list of conditions and the following disclaimer in 20 the documentation and/or other materials provided with the 21 distribution. 22 23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 26 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 27 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 28 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 29 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 30 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 31 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 32 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 33 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 POSSIBILITY OF SUCH DAMAGE. 35*/ 36 37:- module(pengines, 38 [ pengine_create/1, % +Options 39 pengine_ask/3, % +Pengine, :Query, +Options 40 pengine_next/2, % +Pengine. +Options 41 pengine_stop/2, % +Pengine. 
+Options 42 pengine_event/2, % -Event, +Options 43 pengine_input/2, % +Prompt, -Term 44 pengine_output/1, % +Term 45 pengine_respond/3, % +Pengine, +Input, +Options 46 pengine_debug/2, % +Format, +Args 47 pengine_self/1, % -Pengine 48 pengine_pull_response/2, % +Pengine, +Options 49 pengine_destroy/1, % +Pengine 50 pengine_destroy/2, % +Pengine, +Options 51 pengine_abort/1, % +Pengine 52 pengine_application/1, % +Application 53 current_pengine_application/1, % ?Application 54 pengine_property/2, % ?Pengine, ?Property 55 pengine_user/1, % -User 56 pengine_event_loop/2, % :Closure, +Options 57 pengine_rpc/2, % +Server, :Goal 58 pengine_rpc/3 % +Server, :Goal, +Options 59 ]).
70:- autoload(library(aggregate),[aggregate_all/3]). 71:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]). 72:- autoload(library(broadcast),[broadcast/1]). 73:- autoload(library(charsio),[open_chars_stream/2]). 74:- autoload(library(debug),[debug/1,debugging/1,debug/3,assertion/1]). 75:- autoload(library(error), 76 [ must_be/2, 77 existence_error/2, 78 permission_error/3, 79 domain_error/2 80 ]). 81:- autoload(library(filesex),[directory_file_path/3]). 82:- autoload(library(listing),[listing/1]). 83:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]). 84:- autoload(library(modules),[in_temporary_module/3]). 85:- autoload(library(occurs),[sub_term/2]). 86:- autoload(library(option), 87 [select_option/3,option/2,option/3,select_option/4]). 88:- autoload(library(prolog_stack),[print_prolog_backtrace/2]). 89:- autoload(library(sandbox),[safe_goal/1]). 90:- autoload(library(statistics),[thread_statistics/2]). 91:- autoload(library(term_to_json),[term_to_json/2]). 92:- autoload(library(thread_pool), 93 [thread_pool_create/3,thread_create_in_pool/4]). 94:- autoload(library(time),[alarm/4,call_with_time_limit/2]). 95:- autoload(library(uri), 96 [ uri_components/2, 97 uri_query_components/2, 98 uri_data/3, 99 uri_data/4, 100 uri_encoded/3 101 ]). 102:- autoload(library(http/http_client),[http_read_data/3]). 103:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]). 104:- autoload(library(http/http_dispatch), 105 [http_handler/3,http_404/2,http_reply_file/3]). 106:- autoload(library(http/http_open),[http_open/3]). 107:- autoload(library(http/http_parameters),[http_parameters/2]). 108:- autoload(library(http/http_stream),[is_cgi_stream/1]). 109:- autoload(library(http/http_wrapper),[http_peer/2]). 110 111:- use_module(library(settings),[setting/2,setting/4]). 112:- use_module(library(http/http_json), 113 [http_read_json_dict/2,reply_json/1]). 114 115:- if(exists_source(library(uuid))). 116:- autoload(library(uuid), [uuid/2]). 
:- endif.


%   Meta-argument declarations for the exported predicates.  The option
%   list of pengine_create/1 is module sensitive: its clause head is
%   pengine_create(M:Options0).
%   NOTE(review): the specifiers for pengine_rpc/3 and
%   pengine_event_loop/2 follow the upstream SWI-Prolog source; confirm.
:- meta_predicate
    pengine_create(:),
    pengine_rpc(+, +, +),
    pengine_event_loop(1, +).

%   Hooks that may be extended from other files.
:- multifile
    write_result/3,                 % +Format, +Event, +Dict
    event_to_json/3,                % +Event, -JSON, +Format
    prepare_module/3,               % +Module, +Application, +Options
    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
    authentication_hook/3,          % +Request, +Application, -User
    not_sandboxed/2.                % +User, +App

:- predicate_options(pengine_create/1, 1,
                     [ id(-atom),
                       alias(atom),
                       application(atom),
                       destroy(boolean),
                       server(atom),
                       ask(compound),
                       template(compound),
                       chunk(integer),
                       bindings(list),
                       src_list(list),
                       src_text(any),           % text
                       src_url(atom),
                       src_predicates(list)
                     ]).
:- predicate_options(pengine_ask/3, 3,
                     [ template(any),
                       chunk(integer),
                       bindings(list)
                     ]).
:- predicate_options(pengine_next/2, 2,
                     [ chunk(integer),
                       pass_to(pengine_send/3, 3)
                     ]).
:- predicate_options(pengine_stop/2, 2,
                     [ pass_to(pengine_send/3, 3)
                     ]).
%   The option list of pengine_respond/3 is its third argument
%   (+Pengine, +Input, +Options).
:- predicate_options(pengine_respond/3, 3,
                     [ pass_to(pengine_send/3, 3)
                     ]).
:- predicate_options(pengine_rpc/3, 3,
                     [ chunk(integer),
                       pass_to(pengine_create/1, 1)
                     ]).
:- predicate_options(pengine_send/3, 3,
                     [ delay(number)
                     ]).
:- predicate_options(pengine_event/2, 2,
                     [ pass_to(thread_get_message/3, 3)
                     ]).
:- predicate_options(pengine_pull_response/2, 2,
                     [ pass_to(http_open/3, 3)
                     ]).
:- predicate_options(pengine_event_loop/2, 2,
                     []).                       % not yet implemented

% :- debug(pengine(transition)).
:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.

%!  goal_expansion(+Goal, -Expanded) is semidet.
%
%   Compile-time rewrite of the marker goal `random_delay`.  If the
%   debug topic pengine(delay) is enabled while this file is loaded,
%   the marker becomes a call to do_random_delay/0; otherwise it is
%   replaced by `true`.

goal_expansion(random_delay, Expanded) :-
    (   debugging(pengine(delay))
    ->  Expanded = do_random_delay
    ;   Expanded = true
    ).

%!  do_random_delay is det.
%
%   Sleep for random(20)/1000 seconds.

do_random_delay :-
    Delay is random(20)/1000,
    sleep(Delay).


%   Internal meta predicates.  The Goal argument of solve/4 and
%   findnsols_no_empty/4 is eventually run through call/1.
%   NOTE(review): specifiers follow the upstream SWI-Prolog source;
%   pengine_event_loop/3 is not defined in this part of the file.
:- meta_predicate                       % internal meta predicates
    solve(+, ?, 0, +),
    findnsols_no_empty(+, ?, 0, -),
    pengine_event_loop(+, 1, +).
%   Remaining options are passed to http_open/3 (meaningful only for
%   non-local pengines) and thread_create/3. Note that for
%   thread_create/3 only options changing the stack-sizes can be used.
%   In particular, do not pass the `detached` or `alias` options.
%
%   Successful creation of a pengine will return an event term of the
%   following form:
%
%     - create(ID, Data)
%
%   An error will be returned if the pengine could not be created:
%
%     - error(ID, Error)
%!  pengine_create(:Options) is det.
%
%   Create a pengine.  Local source options are first normalised by
%   translate_local_sources/3.  If the resulting option list contains
%   server(BaseURL), the pengine is created remotely with the remaining
%   options; otherwise it is created locally.

pengine_create(Module:RawOptions) :-
    translate_local_sources(RawOptions, Options, Module),
    create_local_or_remote(Options).

%   Dispatch on the presence of the server(BaseURL) option.
create_local_or_remote(Options) :-
    select_option(server(BaseURL), Options, RemoteOptions),
    !,
    remote_pengine_create(BaseURL, RemoteOptions).
create_local_or_remote(Options) :-
    local_pengine_create(Options).
%   Translate the `src_predicates` and `src_list` options into
%   `src_text`. We need to do that anyway for remote pengines. For
%   local pengines, we could avoid this step, but there is very little
%   point in transferring source to a local pengine anyway as local
%   pengines can access any Prolog predicate that you make visible to
%   the application.
%
%   Multiple sources are concatenated to end up with a single
%   `src_text` option.
%!  translate_local_sources(+OptionsIn, -Options, +Module) is det.
%
%   Replace all source options in OptionsIn by at most one src_text/1
%   option that holds the concatenated source text.  Options that do
%   not denote a source are kept in their original order.

translate_local_sources(OptionsIn, Options, Module) :-
    translate_local_sources(OptionsIn, Texts, OtherOptions, Module),
    sources_option(Texts, OtherOptions, Options).

%   Prepend the (combined) source text, if any, to the other options.
sources_option(Texts, OtherOptions, Options) :-
    Texts == [],
    !,
    Options = OtherOptions.
sources_option(Texts, OtherOptions, Options) :-
    Texts = [Text],
    !,
    Options = [src_text(Text)|OtherOptions].
sources_option(Texts, OtherOptions, Options) :-
    atomics_to_string(Texts, Text),
    !,
    Options = [src_text(Text)|OtherOptions].

%!  translate_local_sources(+OptionsIn, -Sources, -Options, +Module)
%
%   Split OptionsIn into the text of each source option (Sources) and
%   the remaining options (Options).

translate_local_sources([], [], [], _).
translate_local_sources([Option|More], [Text|Texts], Rest, Module) :-
    nonvar(Option),
    translate_local_source(Option, Text, Module),
    !,
    translate_local_sources(More, Texts, Rest, Module).
translate_local_sources([Option|More], Texts, [Option|Rest], Module) :-
    translate_local_sources(More, Texts, Rest, Module).

%!  translate_local_source(+Option, -Source, +Module) is semidet.
%
%   True when Option is a source option and Source is its text.
%   Predicates are rendered with listing/1 and terms with the `~k`
%   format directive.

translate_local_source(src_predicates(PIs), Source, Module) :-
    must_be(list, PIs),
    with_output_to(string(Source),
                   maplist(list_in_module(Module), PIs)).
translate_local_source(src_list(Terms), Source, _) :-
    must_be(list, Terms),
    with_output_to(string(Source),
                   maplist(write_source_term, Terms)).
translate_local_source(src_text(Source), Source, _).

%   Emit a single term as a clause terminated by " ." and a newline.
write_source_term(Term) :-
    format('~k .~n', [Term]).

list_in_module(Module, PI) :-
    listing(Module:PI).
%   Same as pengine_send(NameOrID, Term, []).
%!  pengine_send(+NameOrID, +Term) is det.
%
%   Send Term to the pengine NameOrID using an empty option list.

pengine_send(NameOrID, Term) :-
    NoOptions = [],
    pengine_send(NameOrID, Term, NoOptions).
%   Any remaining options are passed to http_open/3.
%!  pengine_send(+NameOrID, +Event, +Options) is det.
%
%   Send Event to a pengine.  The target must be an atom.  The atom
%   `self` addresses the message queue of the calling thread; any other
%   atom is first looked up as an alias in child/2 and otherwise used
%   as the pengine id.  With the option delay(Delay), delivery is
%   scheduled through alarm/4 instead of being performed immediately.

pengine_send(Target, Event, Options) :-
    must_be(atom, Target),
    pengine_send2(Target, Event, Options).

pengine_send2(self, Event, Options) :-
    !,
    thread_self(Me),
    delay_message(queue(Me), Event, Options).
pengine_send2(NameOrID, Event, Options) :-
    (   child(NameOrID, ID)
    ->  Pengine = ID
    ;   Pengine = NameOrID
    ),
    delay_message(pengine(Pengine), Event, Options).

%   Deliver now, or after Delay seconds if delay(Delay) is given.
delay_message(Target, Event, Options) :-
    (   option(delay(Delay), Options)
    ->  alarm(Delay,
              send_message(Target, Event, Options),
              _AlarmID,
              [remove(true)])
    ;   random_delay,
        send_message(Target, Event, Options)
    ).

%   Perform the actual delivery.  A remote pengine is reached through
%   remote_pengine_send/4, a local one through its thread.  An
%   existence error is raised if the pengine is neither.
send_message(queue(Queue), Event, _) :-
    thread_send_message(Queue, pengine_request(Event)).
send_message(pengine(Pengine), Event, Options) :-
    pengine_remote(Pengine, Server),
    !,
    remote_pengine_send(Server, Pengine, Event, Options).
send_message(pengine(Pengine), Event, _) :-
    pengine_thread(Pengine, Thread),
    !,
    thread_send_message(Thread, pengine_request(Event)).
send_message(pengine(Pengine), _, _) :-
    existence_error(pengine, Pengine).
%!  pengine_request(-Request) is det.
%
%   Wait for the next pengine_request(Request) message on the queue of
%   the calling thread.  The wait is limited by the `idle_limit`
%   setting of the application of the current pengine.  If no message
%   arrives in time, Request is unified with `destroy`.

pengine_request(Request) :-
    pengine_self(Pengine),
    get_pengine_application(Pengine, Application),
    setting(Application:idle_limit, IdleLimit),
    thread_self(Me),
    wait_for_request(Me, IdleLimit, Request).

wait_for_request(Queue, IdleLimit, Request) :-
    thread_get_message(Queue, pengine_request(Request),
                       [timeout(IdleLimit)]),
    !.
wait_for_request(_, _, destroy).
%   If the message cannot be sent within the `idle_limit` setting of
%   the pengine, abort the pengine.
%!  pengine_reply(+Event) is det.
%!  pengine_reply(+Queue, +Event) is det.
%
%   Send Event as pengine_event(ID, Event) to Queue, where ID is the
%   first argument of Event.  pengine_reply/1 uses the queue returned
%   by pengine_parent/1.
%
%   Nothing is sent once the global variable
%   `pengine_idle_limit_exceeded` is `true`.  If the calling thread is
%   pengine ID and the pengine is not detached, sending is limited by
%   the `idle_limit` setting of its application.  On timeout the flag
%   is set, the thread is detached and abort/0 is called.

pengine_reply(Event) :-
    pengine_parent(Queue),
    pengine_reply(Queue, Event).

pengine_reply(_Queue, _Event0) :-
    nb_current(pengine_idle_limit_exceeded, true),
    !.
pengine_reply(Queue, Event0) :-
    arg(1, Event0, ID),
    wrap_first_answer(ID, Event0, Event),
    random_delay,
    debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
    (   pengine_self(ID),
        \+ pengine_detached(ID, _)
    ->  get_pengine_application(ID, Application),
        setting(Application:idle_limit, IdleLimit),
        debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]),
        (   thread_send_message(Queue, pengine_event(ID, Event),
                                [ timeout(IdleLimit)
                                ])
        ->  true
        ;   thread_self(Me),
            debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
                  [ID, Me]),
            % Make later replies from this thread no-ops.
            nb_setval(pengine_idle_limit_exceeded, true),
            thread_detach(Me),
            abort
        )
    ;   thread_send_message(Queue, pengine_event(ID, Event))
    ).

%!  wrap_first_answer(+ID, +Event0, -Event) is det.
%
%   If a create event for ID is stored in
%   wrap_first_answer_in_create_event/2, bind its open tail to
%   [answer(Event0)], remove the stored fact and return the create
%   event.  Otherwise Event is Event0.

wrap_first_answer(ID, Event0, CreateEvent) :-
    wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]),
    arg(1, CreateEvent, ID),
    !,
    retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])).
wrap_first_answer(_ID, Event, Event).


%!  empty_queue is det.
%
%   Discard all messages currently waiting in the parent queue and
%   report the count on debug topic pengine(abort).

empty_queue :-
    pengine_parent(Queue),
    empty_queue(Queue, 0, Discarded),
    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).

%   Remove messages without blocking (timeout(0)), counting them.
empty_queue(Queue, C0, C) :-
    thread_get_message(Queue, _Term, [timeout(0)]),
    !,
    C1 is C0+1,
    empty_queue(Queue, C1, C).
empty_queue(_, C, C).
%   Options is a list of options:
%
%     - bindings(+Bindings)
%       A list of `Name = Var` terms, providing access to the actual
%       variable names.
%
%   Any remaining options are passed to pengine_send/3.
%
%   Note that the predicate pengine_ask/3 is deterministic, even for
%   queries that have more than one solution. Also, the variables in
%   Query will not be bound. Instead, results will be returned in the
%   form of event terms.
%
%   A `success` event carries a flag that is `true` or `false`,
%   indicating whether we can expect the pengine to be able to return
%   more solutions or not, would we call pengine_next/2.
%
%   Defined in terms of pengine_send/3, like so:
%
%       pengine_ask(ID, Query, Options) :-
%           partition(pengine_ask_option, Options, AskOptions, SendOptions),
%           pengine_send(ID, ask(Query, AskOptions), SendOptions).
%!  pengine_ask(+NameOrID, +Query, +Options) is det.
%
%   Ask pengine NameOrID to solve Query.  Options accepted by
%   pengine_ask_option/1 travel inside the ask(Query, AskOptions)
%   message; all others are handed to pengine_send/3.

pengine_ask(ID, Query, Options) :-
    partition(pengine_ask_option, Options, ForAsk, ForSend),
    Message = ask(Query, ForAsk),
    pengine_send(ID, Message, ForSend).


%!  pengine_ask_option(?Option) is nondet.
%
%   True when Option is interpreted by the pengine as part of an `ask`
%   request.

pengine_ask_option(template(_)).
pengine_ask_option(chunk(_)).
pengine_ask_option(bindings(_)).
pengine_ask_option(breakpoints(_)).
%   Remaining options are passed to pengine_send/3. The result of
%   re-executing the current goal is returned to the caller's message
%   queue in the form of event terms.
%
%   Defined in terms of pengine_send/3, as follows:
%
%       pengine_next(ID, Options) :-
%           pengine_send(ID, next, Options).
%!  pengine_next(+NameOrID, +Options) is det.
%
%   Ask the pengine for more solutions.  With the option chunk(Count),
%   the request is next(Count) and the chunk option is removed before
%   the remaining options go to pengine_send/3.  Without it the request
%   is the plain atom `next`.

pengine_next(ID, Options) :-
    (   select_option(chunk(Count), Options, SendOptions)
    ->  Request = next(Count)
    ;   Request = next,
        SendOptions = Options
    ),
    pengine_send(ID, Request, SendOptions).
%   Defined in terms of pengine_send/3, like so:
%
%       pengine_stop(ID, Options) :-
%           pengine_send(ID, stop, Options).
%!  pengine_stop(+NameOrID, +Options) is det.
%
%   Send the request `stop` to the pengine through pengine_send/3.

pengine_stop(ID, Options) :-
    Request = stop,
    pengine_send(ID, Request, Options).
%!  pengine_abort(+NameOrID) is semidet.
%
%   Abort the query the pengine is running.  NameOrID is resolved
%   through child/2 if it is a known alias.  A remote pengine is
%   aborted using remote_pengine_abort/3.  For a local pengine the
%   exception `abort_query` is injected into its thread; errors from
%   thread_signal/2 are ignored.

pengine_abort(Name) :-
    (   child(Name, ID)
    ->  Pengine = ID
    ;   Pengine = Name
    ),
    abort_pengine(Pengine).

abort_pengine(Pengine) :-
    pengine_remote(Pengine, Server),
    !,
    remote_pengine_abort(Server, Pengine, []).
abort_pengine(Pengine) :-
    pengine_thread(Pengine, Thread),
    debug(pengine(abort), 'Signalling thread ~p', [Thread]),
    catch(thread_signal(Thread, throw(abort_query)), _, true).
%!  pengine_destroy(+NameOrID) is det.
%!  pengine_destroy(+NameOrID, +Options) is det.
%
%   Destroy a pengine.  With the option force(true), the pengine is
%   killed using abort/0 and pengine_destroy/2 succeeds.  Otherwise the
%   request `destroy` is sent to the pengine.  If that raises an
%   existence error for the pengine, the child/2 entries pointing at it
%   are removed instead.

pengine_destroy(ID) :-
    pengine_destroy(ID, []).

pengine_destroy(NameOrID, Options) :-
    (   option(force(true), Options)
    ->  force_destroy(NameOrID)
    ;   catch(pengine_send(NameOrID, destroy),
              error(existence_error(pengine, NameOrID), _),
              retractall(child(_,NameOrID)))
    ).

%   Signal abort/0 into the thread of the pengine, if there is one.  A
%   thread that has already disappeared is not an error.
force_destroy(NameOrID) :-
    (   child(NameOrID, Child)
    ->  ID = Child
    ;   ID = NameOrID
    ),
    (   pengine_thread(ID, Thread)
    ->  catch(thread_signal(Thread, abort),
              error(existence_error(thread, _), _), true)
    ;   true
    ).


/*================= pengines administration =======================
*/
thread(ThreadId)
remote(URL)
636:- dynamic 637 current_pengine/6, % Id, ParentId, Thread, URL, App, Destroy 638 pengine_queue/4, % Id, Queue, TimeOut, Time 639 output_queue/3, % Id, Queue, Time 640 pengine_user/2, % Id, User 641 pengine_data/2, % Id, Data 642 pengine_detached/2. % Id, Data 643:- volatile 644 current_pengine/6, 645 pengine_queue/4, 646 output_queue/3, 647 pengine_user/2, 648 pengine_data/2, 649 pengine_detached/2. 650 651:- thread_local 652 child/2. % ?Name, ?Child
%!  pengine_register_local(+Id, +Thread, +Queue, +URL, +Application, +Destroy) is det.
%
%   Record a pengine that runs in the local thread Thread as a new
%   first clause of current_pengine/6.

pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
    Record = current_pengine(Id, Queue, Thread, URL, Application, Destroy),
    asserta(Record).

%!  pengine_register_remote(+Id, +URL, +Application, +Destroy) is det.
%
%   Record a remote pengine.  The thread field is 0 and the queue is
%   the message queue of the calling thread.

pengine_register_remote(Id, URL, Application, Destroy) :-
    thread_self(Me),
    Record = current_pengine(Id, Me, 0, URL, Application, Destroy),
    asserta(Record).
%!  pengine_unregister(+Id) is det.
%
%   Remove the registration of pengine Id for the calling thread,
%   together with its pengine_user/2 and pengine_data/2 facts.  If the
%   pengine is registered with URL `http`, the queue is the message
%   queue used to send events to the HTTP workers.  It is first cleaned
%   up under the `pengine` mutex.

pengine_unregister(Id) :-
    thread_self(Me),
    destroy_http_queue(Id, Me),
    retractall(current_pengine(Id, _, Me, _, _, _)),
    retractall(pengine_user(Id, _)),
    retractall(pengine_data(Id, _)).

destroy_http_queue(Id, Thread) :-
    current_pengine(Id, Queue, Thread, http, _, _),
    !,
    with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue)).
destroy_http_queue(_, _).

%!  pengine_unregister_remote(+Id) is det.
%
%   Remove the registration of the remote pengine Id (thread field 0).

pengine_unregister_remote(Id) :-
    retractall(current_pengine(Id, _Parent, 0, _, _, _)).
%!  pengine_self(-Id) is semidet.
%
%   True when the calling thread is registered as pengine Id.

pengine_self(Id) :-
    thread_self(Me),
    current_pengine(Id, _, Me, _, _, _).

%!  pengine_parent(-Parent) is det.
%
%   Parent is the value of the global variable `pengine_parent`.

pengine_parent(Parent) :-
    nb_getval(pengine_parent, Parent).

%!  pengine_thread(+Pengine, -Thread) is semidet.
%
%   True when Pengine is registered with a thread other than 0.

pengine_thread(Pengine, Thread) :-
    current_pengine(Pengine, _, Thread, _, _, _),
    Thread \== 0,
    !.

%!  pengine_remote(+Pengine, -URL) is semidet.
%
%   True when Pengine is registered as remote (thread field 0) at URL.

pengine_remote(Pengine, URL) :-
    current_pengine(Pengine, _, 0, URL, _, _).

%!  get_pengine_application(+Pengine, -Application) is semidet.
%
%   Application is the application recorded for Pengine.

get_pengine_application(Pengine, Application) :-
    current_pengine(Pengine, _, _, _, Application, _),
    !.

%!  get_pengine_module(+Pengine, -Module) is det.
%
%   The module of a pengine is named after the pengine itself.

get_pengine_module(Pengine, Pengine).

%!  pengine_uuid(-Id) is det.
%
%   Generate an identifier for a new pengine.  If uuid/2 is available
%   at load time, a version 4 UUID is used.  Otherwise Id is the text
%   of a random non-negative integer.

:- if(current_predicate(uuid/2)).
pengine_uuid(Id) :-
    uuid(Id, [version(4)]).             % Version 4 is random.
:- else.
pengine_uuid(Id) :-
    random_id_limit(Max),
    random_between(0, Max, Num),
    atom_number(Id, Num).

%   Upper bound for the random id: max_integer-1 if that flag exists.
random_id_limit(Max) :-
    current_prolog_flag(max_integer, MaxInt),
    !,
    Max is MaxInt-1.
random_id_limit(Max) :-
    Max is 1<<128.
:- endif.
This also runs Goal if the Pengine no longer exists. This deals with Pengines terminated through destroy_or_continue/1.
:- meta_predicate protect_pengine(+, 0).

%!  protect_pengine(+Id, :Goal) is semidet.
%
%   Run Goal while holding one of 64 mutexes named `pengine_done_<N>`,
%   where N is the term hash of Id modulo 64.  Goal is run whether or
%   not the pengine still exists: the original code tested
%   pengine_thread/2 but called Goal in both branches, so the test has
%   been dropped.

protect_pengine(Id, Goal) :-
    term_hash(Id, Hash),
    LockN is Hash mod 64,
    atom_concat(pengine_done_, LockN, Lock),
    with_mutex(Lock, Goal).
%   The default application is `pengine_sandbox`. The example below
%   creates a new application `address_book` and imports the API
%   defined in the module file `address_book_api.pl` into the
%   application.
%
%       :- pengine_application(address_book).
%       :- use_module(address_book:address_book_api).
%!  pengine_application(+Application) is det.
%
%   Calling this predicate as a goal always raises a
%   context_error(nodirective, _) exception.  The directive form is
%   handled by the term_expansion/2 hook declared below.

pengine_application(Application) :-
    Culprit = pengine_application(Application),
    throw(error(context_error(nodirective, Culprit), _)).

:- multifile
    system:term_expansion/2,
    current_application/1.
%!  current_pengine_application(?Application) is nondet.
%
%   True when Application is recorded in current_application/1.

current_pengine_application(App) :-
    current_application(App).


% Default settings for all applications

:- setting(thread_pool_size, integer, 100,
           'Maximum number of pengines this application can run.').
:- setting(thread_pool_stacks, list(compound), [],
           'Maximum stack sizes for pengines this application can run.').
:- setting(slave_limit, integer, 3,
           'Maximum number of slave pengines a master pengine can create.').
:- setting(time_limit, number, 300,
           'Maximum time to wait for output').
:- setting(idle_limit, number, 300,
           'Pengine auto-destroys when idle for this time').
:- setting(safe_goal_limit, number, 10,
           'Maximum time to try proving safety of the goal').
:- setting(program_space, integer, 100_000_000,
           'Maximum memory used by predicates').
:- setting(allow_from, list(atom), [*],
           'IP addresses from which remotes are allowed to connect').
:- setting(deny_from, list(atom), [],
           'IP addresses from which remotes are NOT allowed to connect').
:- setting(debug_info, boolean, false,
           'Keep information to support source-level debugging').


%!  system:term_expansion(+Directive, -Expanded) is semidet.
%
%   Expand the directive `:- pengine_application(Application)`.
%   Application must be an atom and must not be a module that is
%   associated with a file.  The expansion records the application in
%   pengines:current_application/1 and declares one setting per entry
%   of application_setting/3 in the application module.  Each setting
%   defaults to the setting of the same name in module `pengines`.

system:term_expansion((:- pengine_application(Application)), Expanded) :-
    must_be(atom, Application),
    (   module_property(Application, file(_))
    ->  permission_error(create, pengine_application, Application)
    ;   true
    ),
    findall(Clauses,
            application_setting_clauses(Application, Clauses),
            SettingClauses),
    flatten([ pengines:current_application(Application)
            | SettingClauses
            ], Expanded).

%   Expand the setting/4 declaration of one application setting.
application_setting_clauses(Application, Clauses) :-
    application_setting(Name, Type, Comment),
    expand_term((:- setting(Application:Name, Type,
                            setting(pengines:Name),
                            Comment)),
                Clauses).

%   application_setting(?Name, ?Type, ?Comment): the settings that
%   every application inherits, in declaration order.
application_setting(thread_pool_size, integer,
                    'Maximum number of pengines this \c
                     application can run.').
application_setting(thread_pool_stacks, list(compound),
                    'Maximum stack sizes for pengines \c
                     this application can run.').
application_setting(slave_limit, integer,
                    'Maximum number of local slave pengines \c
                     a master pengine can create.').
application_setting(time_limit, number,
                    'Maximum time to wait for output').
application_setting(idle_limit, number,
                    'Pengine auto-destroys when idle for this time').
application_setting(safe_goal_limit, number,
                    'Maximum time to try proving safety of the goal').
application_setting(program_space, integer,
                    'Maximum memory used by predicates').
application_setting(allow_from, list(atom),
                    'IP addresses from which remotes are allowed \c
                     to connect').
application_setting(deny_from, list(atom),
                    'IP addresses from which remotes are NOT \c
                     allowed to connect').
application_setting(debug_info, boolean,
                    'Keep information to support source-level \c
                     debugging').

% Register default application

:- pengine_application(pengine_sandbox).
%!  pengine_property(?Pengine, ?Property) is nondet.
%
%   True when Property is a property of Pengine.  Defined properties:
%
%     - self(Id), module(Id)
%       Both equal the pengine id.
%     - alias(Name)
%       Name is the alias under which the calling thread knows the
%       pengine, as set by the `alias` option when creating the
%       pengine.
%     - thread(Thread)
%       The pengine is registered with a thread other than 0.
%     - remote(Server)
%       The pengine is registered as remote at Server.
%     - application(Application)
%     - destroy(Destroy)
%       Destroy is `true` if the pengines is destroyed automatically
%       after completing the query.
%     - parent(Queue)
%     - source(SourceID, Source)
%     - detached(When)
%
%   If both arguments are instantiated the predicate is semidet.  The
%   original first clause passed the arguments to pengine_property2/2
%   in the wrong order, so that committed path could never succeed.

pengine_property(Id, Prop) :-
    nonvar(Id), nonvar(Prop),
    pengine_property2(Prop, Id),
    !.
pengine_property(Id, Prop) :-
    pengine_property2(Prop, Id).

pengine_property2(self(Id), Id) :-
    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
pengine_property2(module(Id), Id) :-
    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
pengine_property2(alias(Alias), Id) :-
    child(Alias, Id),
    Alias \== Id.
pengine_property2(thread(Thread), Id) :-
    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy),
    Thread \== 0.
pengine_property2(remote(Server), Id) :-
    current_pengine(Id, _Parent, 0, Server, _Application, _Destroy).
pengine_property2(application(Application), Id) :-
    current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy).
pengine_property2(destroy(Destroy), Id) :-
    current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy).
pengine_property2(parent(Parent), Id) :-
    current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy).
pengine_property2(source(SourceID, Source), Id) :-
    pengine_data(Id, source(SourceID, Source)).
pengine_property2(detached(When), Id) :-
    pengine_detached(Id, When).
%!  pengine_output(+Term) is det.
%
%   Send Term to the parent as the event output(Self, Term), where Self
%   is the id of the calling pengine.

pengine_output(Term) :-
    pengine_self(Self),
    Event = output(Self, Term),
    pengine_reply(Event).
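%   The message is sent to the client as a `debug` event. On the
%   client side this results in `console.log(Message)` if there is a
%   console. The predicate pengine_rpc/3 calls
%   `debug(pengine(debug), '~w', [Message])`. The debug topic
%   pengine(debug) is enabled by default.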
%!  pengine_debug(+Format, +Args) is det.
%
%   Format a message and send it to the parent as the event
%   debug(Self, Message).  The format/3 call is first validated with
%   safe_goal/1.  If that raises an exception, the text of the
%   exception is sent instead of the formatted message.

pengine_debug(Format, Args) :-
    pengine_parent(Queue),
    pengine_self(Self),
    debug_message(Format, Args, Message),
    pengine_reply(Queue, debug(Self, Message)).

debug_message(Format, Args, Message) :-
    catch(safe_goal(format(atom(_), Format, Args)), Error, true),
    (   nonvar(Error)
    ->  message_to_string(Error, Message)
    ;   format(atom(Message), Format, Args)
    ).


/*================= Local pengine =======================
*/
%!  local_pengine_create(+Options) is det.
%
%   Create a pengine as a child of the calling thread.  The application
%   defaults to `pengine_sandbox`.  The new pengine is recorded in
%   child/2 under the `alias` option, or under its own id if no alias
%   is given.

local_pengine_create(Options) :-
    option(application(Application), Options, pengine_sandbox),
    thread_self(Parent),
    create(Parent, Id, Options, local, Application),
    option(alias(Alias), Options, Id),
    assertz(child(Alias, Id)).
:- multifile thread_pool:create_pool/1.

%!  thread_pool:create_pool(+Application) is semidet.
%
%   Hook into library(thread_pool).  If Application is a known pengine
%   application, create a thread pool of that name using the
%   `thread_pool_size` and `thread_pool_stacks` settings of the
%   application.

thread_pool:create_pool(Application) :-
    current_application(Application),
    setting(Application:thread_pool_size, Size),
    setting(Application:thread_pool_stacks, Stacks),
    thread_pool_create(Application, Size, Stacks).
%!  create(+Queue, -Child, +Options, +URL, +Application) is det.
%
%   Create a new pengine thread.  Child is generated unless it is
%   already instantiated.  For URL `local`, errors propagate to the
%   caller.  For any other URL they are reported to Queue as an
%   error(Child, Error) event.

create(Queue, Child, Options, URL, Application) :-
    (   URL = local
    ->  pengine_child_id(Child),
        create0(Queue, Child, Options, local, Application)
    ;   pengine_child_id(Child),
        catch(create0(Queue, Child, Options, URL, Application),
              Error,
              create_error(Queue, Child, Error))
    ).

pengine_child_id(Child) :-
    nonvar(Child),
    !.
pengine_child_id(Child) :-
    pengine_uuid(Child).

create_error(Queue, Child, Error) :-
    pengine_reply(Queue, error(Child, Error)).

%!  create0(+Queue, +Child, +Options, +URL, +Application) is det.
%
%   Do the work for create/5:
%
%     1. Verify that Application exists.
%     2. Unless URL is `http`, enforce the `slave_limit` setting.
%     3. Start the thread in the pool of Application, passing the
%        pengine specific options to pengine_main/3 and the others to
%        thread_create_in_pool/4.
%     4. Register the pengine and tell the new thread its id.
%     5. Unify the id(Id) option, if present, with Child.

create0(Queue, Child, Options, URL, Application) :-
    (   current_application(Application)
    ->  true
    ;   existence_error(pengine_application, Application)
    ),
    check_slave_limit(URL, Application),
    partition(pengine_create_option, Options, PengineOptions, ThreadOptions),
    thread_create_in_pool(
        Application,
        pengine_main(Queue, PengineOptions, Application), ChildThread,
        [ at_exit(pengine_done)
        | ThreadOptions
        ]),
    option(destroy(Destroy), PengineOptions, true),
    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
    thread_send_message(ChildThread, pengine_registered(Child)),
    (   option(id(Id), Options)
    ->  Id = Child
    ;   true
    ).

%   A pengine created over HTTP is _not_ a child of the HTTP server
%   thread, so the limit only applies to other URLs.
check_slave_limit(URL, _) :-
    URL == http,
    !.
check_slave_limit(_, Application) :-
    aggregate_all(count, child(_,_), Count),
    setting(Application:slave_limit, Max),
    (   Count < Max
    ->  true
    ;   throw(error(resource_error(max_pengines), _))
    ).

%!  pengine_create_option(?Option) is nondet.
%
%   True when Option is handled by the pengine itself rather than
%   passed on to thread creation.

pengine_create_option(src_text(_)).
pengine_create_option(src_url(_)).
pengine_create_option(application(_)).
pengine_create_option(destroy(_)).
pengine_create_option(ask(_)).
pengine_create_option(template(_)).
pengine_create_option(bindings(_)).
pengine_create_option(chunk(_)).
pengine_create_option(alias(_)).
pengine_create_option(user(_)).
%!  pengine_done is det.
%
%   Called from the pengine thread `at_exit` option.  Destroys child
%   pengines using pengine_destroy/1.  Cleaning up the Pengine is
%   synchronised by the `pengine_done` mutex.  See read_event/6.
%
%   If the thread ended with the exception '$aborted', the thread is
%   detached and a destroy(Pengine, abort(Pengine)) event is sent to
%   the parent.  Errors while sending it are ignored.

:- public
    pengine_done/0.

pengine_done :-
    thread_self(Me),
    notify_if_aborted(Me),
    forall(child(_Name, Child),
           pengine_destroy(Child)),
    pengine_self(Id),
    protect_pengine(Id, pengine_unregister(Id)).

notify_if_aborted(Thread) :-
    thread_property(Thread, status(exception('$aborted'))),
    thread_detach(Thread),
    pengine_self(Pengine),
    !,
    catch(pengine_reply(destroy(Pengine, abort(Pengine))),
          error(_,_), true).
notify_if_aborted(_).
:- thread_local wrap_first_answer_in_create_event/2.

%   pengine_prepare_source/2 receives its first argument module
%   qualified: its clause head is
%   pengine_prepare_source(Module:Application, Options).
:- meta_predicate
    pengine_prepare_source(:, +).

%!  pengine_main(+Parent, +Options, +Application)
%
%   Entry point of a pengine thread.  It waits for the
%   pengine_registered(Self) message, stores Parent in the global
%   variable `pengine_parent`, registers the user and then runs
%   pengine_create_and_loop/3 inside a temporary module named Self that
%   is set up by pengine_prepare_source/2.  If set-up throws
%   `prepare_source_failed`, the pengine terminates.

pengine_main(Parent, Options, Application) :-
    fix_streams,
    thread_get_message(pengine_registered(Self)),
    nb_setval(pengine_parent, Parent),
    pengine_register_user(Options),
    set_prolog_flag(mitigate_spectre, true),
    catch(in_temporary_module(
              Self,
              pengine_prepare_source(Application, Options),
              pengine_create_and_loop(Self, Application, Options)),
          prepare_source_failed,
          pengine_terminate(Self)).

%!  pengine_create_and_loop(+Self, +Application, +Options)
%
%   Emit the `create` event and enter the main loop.  With the option
%   ask(Query), the create event is not sent immediately.  It is stored
%   in wrap_first_answer_in_create_event/2 with an open tail and the
%   query is started using pengine_ask/3.  A query given as a string is
%   parsed by ask_to_term/5, together with the template if there is
%   one.  A parse error is reported and aborts the set-up.

pengine_create_and_loop(Self, Application, Options) :-
    setting(Application:slave_limit, SlaveLimit),
    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
    (   option(ask(Query0), Options)
    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
        (   string(Query0)                      % string is not callable
        ->  (   option(template(TemplateS), Options)
            ->  Ask2 = Query0-TemplateS
            ;   Ask2 = Query0
            ),
            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
                  Error, true),
            (   var(Error)
            ->  true
            ;   send_error(Error),
                throw(prepare_source_failed)
            )
        ;   Query = Query0,
            option(template(Template), Options, Query),
            option(bindings(Bindings), Options, [])
        ),
        option(chunk(Chunk), Options, 1),
        pengine_ask(Self, Query,
                    [ template(Template),
                      chunk(Chunk),
                      bindings(Bindings)
                    ])
    ;   Extra = [],
        pengine_reply(CreateEvent)
    ),
    pengine_main_loop(Self).
%!  ask_to_term(+AskSpec, +Module, -Options, -Template, -Bindings) is det.
%
%   Parse a query that is given as text.  AskSpec is either
%   `Ask-Template` or just Ask.  In the first form both texts are
%   parsed as one term so that they share variables, and Bindings is
%   limited to the named variables that occur in the template.  In the
%   second form the template is a dict with tag
%   `swish_default_template`, built from the bindings that remain after
%   excluding those matched by anon/1.

ask_to_term(AskText-TemplateText, Module, Ask, Template, Bindings) :-
    !,
    format(string(Combined), 't((~s),(~s))', [TemplateText, AskText]),
    term_string(t(Template,Ask), Combined,
                [ variable_names(AllBindings),
                  module(Module)
                ]),
    phrase(template_bindings(Template, AllBindings), Bindings).
ask_to_term(AskText, Module, Ask, Template, Bindings) :-
    term_string(Ask, AskText,
                [ variable_names(AllBindings),
                  module(Module)
                ]),
    exclude(anon, AllBindings, Bindings),
    dict_create(Template, swish_default_template, Bindings).

%!  template_bindings(+Term, +Bindings)// is det.
%
%   Emit, in left-to-right order, the binding of each variable in Term
%   that has an entry in Bindings.

template_bindings(Var, Bindings) -->
    { var(Var) }, !,
    (   { var_binding(Bindings, Var, Binding)
        }
    ->  [Binding]
    ;   []
    ).
template_bindings([Head|Tail], Bindings) -->
    !,
    template_bindings(Head, Bindings),
    template_bindings(Tail, Bindings).
template_bindings(Compound, Bindings) -->
    { compound(Compound), !,
      compound_name_arguments(Compound, _, Args)
    },
    template_bindings(Args, Bindings).
template_bindings(_, _) --> [].

%!  var_binding(+Bindings, +Var, -Binding) is semidet.
%
%   Binding is the first element of Bindings whose second argument is
%   identical (==) to Var.

var_binding([Candidate|Rest], Var, Binding) :-
    (   arg(2, Candidate, Value),
        Value == Var
    ->  Binding = Candidate
    ;   var_binding(Rest, Var, Binding)
    ).
%!  fix_streams is det.
%
%   If `current_output` is a CGI stream, make the alias refer to
%   `user_output` instead.

fix_streams :-
    fix_stream(current_output).

fix_stream(Name) :-
    (   is_cgi_stream(Name)
    ->  debug(pengine(stream), '~w is a CGI stream!', [Name]),
        set_stream(user_output, alias(Name))
    ;   true
    ).
%!  pengine_prepare_source(:Application, +Options) is det.
%
%   Set up the module of the pengine.  The qualifier of the first
%   argument is the module to prepare.  The module gets the
%   `program_space` limit of Application and imports from Application
%   instead of `user`.  The sources named in Options are then loaded.
%   Any error is reported using send_error/1 and turned into the
%   exception `prepare_source_failed`.

pengine_prepare_source(Module:Application, Options) :-
    setting(Application:program_space, SpaceLimit),
    set_module(Module:program_space(SpaceLimit)),
    delete_import_module(Module, user),
    add_import_module(Module, Application, start),
    catch(prep_module(Module, Application, Options), Error, true),
    (   var(Error)
    ->  true
    ;   send_error(Error),
        throw(prepare_source_failed)
    ).

%   Copy module flags, run all solutions of the prepare_module/3 hook
%   and process the create options with Module as source module.
prep_module(Module, Application, Options) :-
    maplist(copy_flag(Module, Application), [var_prefix]),
    forall(prepare_module(Module, Application, Options), true),
    setup_call_cleanup(
        '$set_source_module'(OldModule, Module),
        maplist(process_create_option(Module), Options),
        '$set_source_module'(OldModule)).

%!  copy_flag(+Module, +Application, +Flag) is det.
%
%   Copy the value of the module-scoped Prolog flag Flag from
%   Application to Module.  Succeeds without effect if the flag has no
%   value in Application.  The original code used the unbound variables
%   `ApplicationFlag` and `ModuleFlag` where `Application:Flag` and
%   `Module:Flag` were intended.

copy_flag(Module, Application, Flag) :-
    current_prolog_flag(Application:Flag, Value),
    !,
    set_prolog_flag(Module:Flag, Value).
copy_flag(_, _, _).

%   Load the source of a src_text/1 or src_url/1 option; ignore others.
process_create_option(Application, src_text(Text)) :-
    !,
    pengine_src_text(Text, Application).
process_create_option(Application, src_url(URL)) :-
    !,
    pengine_src_url(URL, Application).
process_create_option(_, _).
%   [Fragment of a preceding documentation comment whose beginning is
%   missing:] ... `src_text` and `src_url` options

%!  pengine_main_loop(+ID) is det.
%
%   Run guarded_main_loop/1.  If the exception `abort_query` reaches
%   this level, pengine_aborted/1 takes over.

pengine_main_loop(ID) :-
    catch(guarded_main_loop(ID),
          abort_query,
          pengine_aborted(ID)).

%!  pengine_aborted(+ID) is det.
%
%   Handle an aborted query: log on debug topic pengine(abort), discard
%   pending messages through empty_queue/0 and hand the event abort(ID)
%   to destroy_or_continue/1.

pengine_aborted(ID) :-
    thread_self(Me),
    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Me]),
    empty_queue,
    Event = abort(ID),
    destroy_or_continue(Event).
%!  guarded_main_loop(+ID) is det.
%
%   Wait for the next request while no query is active.  The request
%   `destroy` terminates the pengine and ask(Goal, Options) starts a
%   query.  Anything else is answered with a `protocol_error` event,
%   after which the loop waits again.

guarded_main_loop(ID) :-
    pengine_request(Request),
    idle_request(Request, ID).

idle_request(destroy, ID) :-
    !,
    debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
    pengine_terminate(ID).
idle_request(ask(Goal, Options), ID) :-
    !,
    debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
    ask(ID, Goal, Options).
idle_request(_, ID) :-
    debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
    pengine_reply(error(ID, error(protocol_error, _))),
    guarded_main_loop(ID).


%!  pengine_terminate(+ID) is det.
%
%   Send the event destroy(ID) and detach the calling thread.

pengine_terminate(ID) :-
    pengine_reply(destroy(ID)),
    thread_self(Me),            % Make the thread silently disappear
    thread_detach(Me).
%!  solve(+Chunk, +Template, :Goal, +ID) is det.
%
%   Solve Goal for pengine ID and report results as events.  Solutions
%   are collected Chunk at a time by findnsols_no_empty/4.  The typein
%   module is ID while the goal runs and is restored by query_done/2.
%
%     - More solutions may exist: reply success(..., true) and wait for
%       instructions in more_solutions/4.  Failing from there
%       backtracks into the goal for the next chunk.
%     - Last solutions: commit and pass success(..., false) to
%       destroy_or_continue/1.
%     - Error: commit; `abort_query` is re-thrown and other errors are
%       passed on as error(ID, Error).
%     - No (more) solutions: commit and pass on failure(ID, CPUTime).
%
%   CPUTime is measured from the value stored in the mutable term Time.

solve(Chunk, Template, Goal, ID) :-
    prolog_current_choice(Choice),
    statistics(cputime, Epoch),
    Time = time(Epoch),
    State = count(Chunk),
    nb_current('$variable_names', Bindings),
    filter_template(Template, Bindings, Template2),
    '$current_typein_module'(TypeIn),
    (   '$set_typein_module'(ID),
        call_cleanup(catch(findnsols_no_empty(State, Template2,
                                              set_projection(Goal, Bindings),
                                              Result),
                           Error, true),
                     query_done(Det, TypeIn)),
        elapsed_cpu_time(Time, CPUTime),
        (   var(Error)
        ->  projection(Projection),
            (   var(Det)
            ->  pengine_reply(success(ID, Result, Projection,
                                      CPUTime, true)),
                more_solutions(ID, Choice, State, Time)
            ;   !,                      % commit
                destroy_or_continue(success(ID, Result, Projection,
                                            CPUTime, false))
            )
        ;   !,                          % commit
            (   Error \== abort_query
            ->  destroy_or_continue(error(ID, Error))
            ;   throw(Error)
            )
        )
    ;   !,                              % commit
        elapsed_cpu_time(Time, CPUTime),
        destroy_or_continue(failure(ID, CPUTime))
    ).
solve(_, _, _, _).                      % leave a choice point

%   CPU seconds used since the time stored in the first argument of Time.
elapsed_cpu_time(Time, CPUTime) :-
    arg(1, Time, Start),
    statistics(cputime, Now),
    CPUTime is Now-Start.

%   Cleanup handler of the query: flags that the goal has finished and
%   restores the typein module.
query_done(true, TypeIn) :-
    '$set_typein_module'(TypeIn).
%!  set_projection(:Goal, +Bindings)
%
%   Call Goal with the backtrackable global variable `$variable_names`
%   set to Bindings.

set_projection(Goal, Bindings) :-
    b_setval('$variable_names', Bindings),
    call(Goal).

%!  projection(-Projection)
%
%   Projection is the list of variable names found in the global
%   variable `$variable_names`, or `[]` if that variable has no value.

projection(Names) :-
    (   nb_current('$variable_names', Bindings)
    ->  maplist(var_name, Bindings, Names)
    ;   Names = []
    ).
%!  filter_template(+Template0, +Bindings, -Template)
%
%   If Template0 is a dict tagged `swish_default_template`, build a
%   fresh dict with that tag from Bindings.  Any other template is
%   passed through unchanged.

filter_template(Template0, Bindings, Template) :-
    (   is_dict(Template0, swish_default_template)
    ->  dict_create(Template, swish_default_template, Bindings)
    ;   Template = Template0
    ).

%!  findnsols_no_empty(+N, ?Template, :Goal, -List)
%
%   As findnsols/4, but a result that is the empty list is rejected.

findnsols_no_empty(N, Template, Goal, Answers) :-
    findnsols(N, Template, Goal, Answers),
    Answers \== [].

%!  destroy_or_continue(+Event)
%
%   Report Event, whose first argument is the pengine id.  If the
%   pengine has property destroy(true), the thread is detached and
%   Event is wrapped in a destroy(ID, Event) reply.  Otherwise Event is
%   sent as is, memory is reclaimed and we resume guarded_main_loop/1.

destroy_or_continue(Event) :-
    arg(1, Event, ID),
    (   pengine_property(ID, destroy(true))
    ->  thread_self(Self),
        thread_detach(Self),
        pengine_reply(destroy(ID, Event))
    ;   pengine_reply(Event),
        garbage_collect,                % minimise our footprint
        trim_stacks,
        guarded_main_loop(ID)
    ).
%!  more_solutions(+ID, +Choice, +State, +Time)
%
%   Called after a chunk of answers has been reported while more may
%   follow (state `6` in the transition debug messages).  Waits for the
%   next request and handles it:
%
%     - `stop`
%       Report stop(ID) through destroy_or_continue/1.
%     - `next`
%       Restart the CPU timer and fail, so the caller backtracks into
%       the goal for at most the current `chunk` solutions.
%     - next(Count)
%       As `next`, but sets the new chunk-size to Count.  Count must be
%       a positive integer; anything else is a protocol error.
%     - ask(Goal, Options)
%       Discard the choice points of the running query and ask Goal.
%     - `destroy`
%       Terminate the pengine.
%
%   Any other request is answered with a `protocol_error` error event,
%   after which we wait for the next request.

more_solutions(ID, Choice, State, Time) :-
    pengine_request(Event),
    more_solutions(Event, ID, Choice, State, Time).

more_solutions(stop, ID, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
    destroy_or_continue(stop(ID)).
more_solutions(next, ID, _Choice, _State, Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
    statistics(cputime, T0),
    nb_setarg(1, Time, T0),
    fail.
more_solutions(next(Count), ID, _Choice, State, Time) :-
    integer(Count),                     % Count is client supplied: do not
    Count > 0,                          % let >/2 raise on bad input
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
    nb_setarg(1, State, Count),
    statistics(cputime, T0),
    nb_setarg(1, Time, T0),
    fail.
more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
    prolog_cut_to(Choice),
    ask(ID, Goal, Options).
more_solutions(destroy, ID, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
    pengine_terminate(ID).
more_solutions(Event, ID, Choice, State, Time) :-
    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
    pengine_reply(error(ID, error(protocol_error, _))),
    more_solutions(ID, Choice, State, Time).
chunk(N)
option.
%!  ask(+ID, +Goal, +Options)
%
%   Prepare Goal using prepare_goal/4 and solve it.  The answer
%   template comes from the template(Template) option (default: Goal
%   itself) and the chunk size from the chunk(N) option (default 1).
%   If preparing the goal raises an exception, that exception is
%   reported as an error event and we return to guarded_main_loop/1.

ask(ID, Goal, Options) :-
    catch(prepare_goal(ID, Goal, Prepared, Options), Caught, true),
    !,
    (   var(Caught)
    ->  option(template(Template), Options, Goal),
        option(chunk(ChunkSize), Options, 1),
        solve(ChunkSize, Template, Prepared, ID)
    ;   pengine_reply(error(ID, Caught)),
        guarded_main_loop(ID)
    ).
Note that expand_goal(Module:GoalIn, GoalOut)
is what we'd like
to write, but this does not work correctly if the user wishes to
expand X:Y
while interpreting X not as the module in which
to run Y. This happens in the CQL package. Possibly we should
disallow this reinterpretation?
%!  prepare_goal(+ID, +Goal0, -ModuleGoal, +Options)
%
%   Translate Goal0 into the module-qualified goal that is executed
%   for pengine ID:
%
%     1. Publish the bindings(Bindings) option (default `[]`) in the
%        global variable `$variable_names`.
%     2. Offer the goal to prepare_goal/3; if that fails the goal is
%        used unchanged.
%     3. Run expand_goal/2 with the pengine's module as source module.
%     4. Unless pengine_not_sandboxed/1 succeeds, verify the result
%        with safe_goal/1 within the application's `safe_goal_limit`.
%        Exceeding that limit is reported as
%        error(sandbox(time_limit_exceeded, Limit), _); any other
%        exception is re-thrown.

prepare_goal(ID, Goal0, Module:Goal, Options) :-
    option(bindings(Bindings), Options, []),
    b_setval('$variable_names', Bindings),
    (   prepare_goal(Goal0, Hooked, Options)
    ->  true
    ;   Hooked = Goal0
    ),
    get_pengine_module(ID, Module),
    setup_call_cleanup(
        '$set_source_module'(Saved, Module),
        expand_goal(Hooked, Goal),
        '$set_source_module'(_, Saved)),
    (   pengine_not_sandboxed(ID)
    ->  true
    ;   get_pengine_application(ID, App),
        setting(App:safe_goal_limit, Limit),
        catch(call_with_time_limit(
                  Limit,
                  safe_goal(Module:Goal)), Caught, true)
    ->  (   var(Caught)
        ->  true
        ;   Caught = time_limit_exceeded
        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
        ;   throw(Caught)
        )
    ).
not_sandboxed(User, Application)
must succeed.
%!  pengine_not_sandboxed(+ID)
%
%   True when pengine ID has a user and an application for which
%   not_sandboxed(User, Application) succeeds.  Succeeds at most once.

pengine_not_sandboxed(ID) :-
    once(( pengine_user(ID, User),
           pengine_property(ID, application(App)),
           not_sandboxed(User, App)
         )).
%!  pengine_pull_response(+Pengine, +Options)
%
%   Ask the server of a remote Pengine for its next response.  Does
%   nothing, and succeeds, when Pengine is not known as a remote
%   pengine.

pengine_pull_response(Pengine, Options) :-
    (   pengine_remote(Pengine, Server)
    ->  remote_pengine_pull_response(Server, Pengine, Options)
    ;   true
    ).
%!  pengine_input(+Prompt, -Term)
%
%   Send a prompt(Self, Prompt) event to the parent of the current
%   pengine and wait for the next request.  A request input(Input)
%   unifies Term with Input, a `destroy` request aborts the pengine
%   and anything else raises a `protocol_error`.

pengine_input(Prompt, Term) :-
    pengine_self(Me),
    pengine_parent(Owner),
    pengine_reply(Owner, prompt(Me, Prompt)),
    pengine_request(Answer),
    (   Answer = input(Value)
    ->  Term = Value
    ;   Answer == destroy
    ->  abort
    ;   throw(error(protocol_error,_))
    ).
Defined in terms of pengine_send/3, as follows:
pengine_respond(Pengine, Input, Options) :- pengine_send(Pengine, input(Input), Options).
*/
%   Wrap Input as an input(Input) event and deliver it to Pengine using
%   pengine_send/3.

pengine_respond(Pengine, Input, Options) :-
    Event = input(Input),
    pengine_send(Pengine, Event, Options).
%!  send_error(+Error)
%
%   Reply with an error event for the current pengine.  When Error
%   carries a list of stack frames in its context, the frames are
%   replaced by the printed backtrace as a string.  Blobs in the error
%   term are turned into atoms by replace_blobs/2 before sending.

send_error(error(Formal, context(prolog_stack(Frames), Message))) :-
    is_list(Frames),
    !,
    with_output_to(string(Backtrace),
                   print_prolog_backtrace(current_output, Frames)),
    pengine_self(Me),
    replace_blobs(Formal, SafeFormal),
    replace_blobs(Message, SafeMessage),
    pengine_reply(error(Me, error(SafeFormal,
                                  context(prolog_stack(Backtrace),
                                          SafeMessage)))).
send_error(Error) :-
    pengine_self(Me),
    replace_blobs(Error, SafeError),
    pengine_reply(error(Me, SafeError)).
%!  replace_blobs(+Term0, -Term)
%
%   Copy Term0 to Term, replacing every blob whose type is not `text`
%   by an atom holding its print/1 representation.
%
%   NOTE(review): presumably this also rewrites reserved symbols such
%   as `[]`, which are non-text blobs in SWI-Prolog 7 -- confirm.

replace_blobs(Blob, Atom) :-
    blob(Blob, Type), Type \== text,
    !,
    format(atom(Atom), '~p', [Blob]).
replace_blobs(Compound0, Compound) :-
    compound(Compound0),
    !,
    compound_name_arguments(Compound0, Functor, Args0),
    maplist(replace_blobs, Args0, Args),
    compound_name_arguments(Compound, Functor, Args).
replace_blobs(Term, Term).


/*================= Remote pengines =======================
*/


%!  remote_pengine_create(+BaseURL, +Options)
%
%   Create a pengine on the server at BaseURL.  Options accepted by
%   pengine_create_option/1 are posted to the server; the remaining
%   ones are passed to the HTTP request.  If there is an ask(Query)
%   option but no template, the query itself becomes the template.
%   The new pengine is recorded as child(Name, ID), registered as
%   remote if the reply is a `create` or `output` event, and the reply
%   is queued for the calling thread.

remote_pengine_create(BaseURL, Options) :-
    partition(pengine_create_option, Options, CreateOptions0, HTTPOptions),
    (   option(ask(Query), CreateOptions0),
        \+ option(template(_Template), CreateOptions0)
    ->  CreateOptions = [template(Query)|CreateOptions0]
    ;   CreateOptions = CreateOptions0
    ),
    options_to_dict(CreateOptions, PostData),
    remote_post_rec(BaseURL, create, PostData, Reply, HTTPOptions),
    arg(1, Reply, ID),
    (   option(id(WantedID), Options)
    ->  ID = WantedID
    ;   true
    ),
    option(alias(Name), Options, ID),
    assert(child(Name, ID)),
    (   (   functor(Reply, create, _)   % actually created
        ;   functor(Reply, output, _)   % compiler messages
        )
    ->  option(application(Application), CreateOptions, pengine_sandbox),
        option(destroy(Destroy), CreateOptions, true),
        pengine_register_remote(ID, BaseURL, Application, Destroy)
    ;   true
    ),
    thread_self(Queue),
    pengine_reply(Queue, Reply).

%!  options_to_dict(+Options, -Dict)
%
%   Convert the create options into the dict that is posted.  If both
%   ask(Ask) and template(Template) are present they are written
%   together, so they share variable names, and stored as strings
%   under the keys `ask` and `template`.

options_to_dict(Options, Dict) :-
    select_option(ask(Ask), Options, Options1),
    select_option(template(Template), Options1, Options2),
    !,
    no_numbered_var_in(Ask+Template),
    findall(AskString-TemplateString,
            ask_template_to_strings(Ask, Template, AskString, TemplateString),
            [ AskString-TemplateString ]),
    options_to_dict(Options2, Dict0),
    Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
options_to_dict(Options, Dict) :-
    maplist(prolog_option, Options, Pairs),
    dict_create(Dict, _, Pairs).

%!  no_numbered_var_in(+Term)
%
%   Raise a domain error if Term contains a '$VAR'(_) sub term.

no_numbered_var_in(Term) :-
    sub_term(Sub, Term),
    subsumes_term('$VAR'(_), Sub),
    !,
    domain_error(numbered_vars_free_term, Term).
no_numbered_var_in(_).

%!  ask_template_to_strings(+Ask, +Template, -AskString, -TemplateString)
%
%   Number the variables of Ask+Template and write both terms, quoted
%   and ignoring operators, into two strings.  This binds the
%   variables; options_to_dict/2 calls it inside findall/3 to undo
%   that.

ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
    numbervars(Ask+Template, 0, _),
    WriteOptions = [ numbervars(true), ignore_ops(true), quoted(true) ],
    format(string(Joined), '~W\n~W', [ Ask, WriteOptions,
                                       Template, WriteOptions
                                     ]),
    split_string(Joined, "\n", "", [AskString, TemplateString]).

%!  prolog_option(+Option0, -Option)
%
%   Options that create_option_type/2 types as `term` get their value
%   replaced by its canonical text.  Other options are unchanged.

prolog_option(Option0, Option) :-
    create_option_type(Option0, term),
    !,
    Option0 =.. [Name,Value],
    format(string(Text), '~k', [Value]),
    Option =.. [Name,Text].
prolog_option(Option, Option).

%!  create_option_type(?Option, ?Type)
%
%   Type of the value of a create option that needs conversion.

create_option_type(ask(_),         term).
create_option_type(template(_),    term).
create_option_type(application(_), atom).

%   The three predicates below perform one request on the server at
%   BaseURL for pengine ID and queue the reply for the calling thread.

remote_pengine_send(BaseURL, ID, Event, Options) :-
    remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
    thread_self(Queue),
    pengine_reply(Queue, Reply).

remote_pengine_pull_response(BaseURL, ID, Options) :-
    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
    thread_self(Queue),
    pengine_reply(Queue, Reply).

remote_pengine_abort(BaseURL, ID, Options) :-
    remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
    thread_self(Queue),
    pengine_reply(Queue, Reply).
%!  remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
%
%   Perform Action for pengine ID on Server and read the reply as a
%   Prolog term.  With Params = [event=Event] the event is posted as a
%   Prolog document; otherwise Params are added to the query string of
%   a plain request.

remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
    !,
    server_url(Server, Action, [id=ID], URL),
    http_open(URL, In,              % putting this in setup_call_cleanup/3
              [ post(prolog(Event)) % makes it impossible to interrupt.
              | Options
              ]),
    call_cleanup(
        read_prolog_reply(In, Reply),
        close(In)).
remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
    server_url(Server, Action, [id=ID|Params], URL),
    http_open(URL, In, Options),
    call_cleanup(
        read_prolog_reply(In, Reply),
        close(In)).

%!  remote_post_rec(+Server, +Action, +Data, -Reply, +Options)
%
%   Post Data as JSON to Action on Server, after probe/2 had a look at
%   the URL, and read the reply as a Prolog term.

remote_post_rec(Server, Action, Data, Reply, Options) :-
    server_url(Server, Action, [], URL),
    probe(Action, URL),
    http_open(URL, In,
              [ post(json(Data))
              | Options
              ]),
    call_cleanup(
        read_prolog_reply(In, Reply),
        close(In)).
%!  probe(+Action, +URL)
%
%   For the `create` action, first open URL with the HTTP `OPTIONS`
%   method and close the connection again.  Other actions need no
%   probing.

probe(create, URL) :-
    !,
    http_open(URL, In, [method(options)]),
    close(In).
probe(_, _).

%!  read_prolog_reply(+In, -Reply)
%
%   Read one term from In as UTF-8 and re-establish its bindings using
%   rebind_cycles/2.

read_prolog_reply(In, Reply) :-
    set_stream(In, encoding(utf8)),
    read(In, Raw),
    rebind_cycles(Raw, Reply).

%!  rebind_cycles(+Reply0, -Reply)
%
%   A reply of the shape @(Reply, Bindings) with a proper list of
%   `Var = Value` bindings is unpacked by executing the bindings.
%   Any other reply is returned as is.

rebind_cycles(@(Reply, Bindings), Reply) :-
    is_list(Bindings),
    !,
    maplist(bind, Bindings).
rebind_cycles(Reply, Reply).

bind(Value = Value).

%!  server_url(+Server, +Action, +Params, -URL)
%
%   URL is Server with `pengine/Action` added to its path and Params
%   as its query string.

server_url(Server, Action, Params, URL) :-
    uri_components(Server, Parts0),
    uri_query_components(Query, Params),
    uri_data(path, Parts0, BasePath),
    atom_concat('pengine/', Action, Local),
    directory_file_path(BasePath, Local, Path),
    uri_data(path, Parts0, Path, Parts),
    uri_data(search, Parts, Query),
    uri_components(URL, Parts).
%!  pengine_event(?Event) is det.
%!  pengine_event(?Event, +Options) is det.
%
%   Wait for the next pengine event in the message queue of the
%   calling thread.  Valid options are:
%
%     - listen(Id)
%       Only accept events that originate from pengine Id.
%     - timeout
%       Options are passed on to thread_get_message/3.  If no message
%       is obtained, Event is unified with the atom `timeout`.
%
%   Events that signal destruction of a remote pengine cause that
%   pengine to be unregistered.

pengine_event(Event) :-
    pengine_event(Event, []).

pengine_event(Event, Options) :-
    thread_self(Me),
    option(listen(Source), Options, _),
    (   thread_get_message(Me, pengine_event(Source, Event), Options)
    ->  true
    ;   Event = timeout
    ),
    update_remote_destroy(Event).

%!  update_remote_destroy(+Event)
%
%   Unregister the remote pengine that Event reports as destroyed.
%   Succeeds without effect for all other events.

update_remote_destroy(Event) :-
    (   destroy_event(Event),
        arg(1, Event, Id),
        pengine_remote(Id, _Server)
    ->  pengine_unregister_remote(Id)
    ;   true
    ).

%!  destroy_event(+Event)
%
%   True if Event implies the pengine is gone: a destroy event or a
%   create event whose immediate answer is such an event.

destroy_event(destroy(_)).
destroy_event(destroy(_,_)).
destroy_event(create(_,Features)) :-
    memberchk(answer(First), Features),
    !,
    nonvar(First),
    destroy_event(First).
ignore(call(Closure, E))
. A
closure thus acts as a handler for the event. Some events are also
treated specially:
Valid options are:
all
,
all_but_sender
or a Prolog list of NameOrIDs. [not yet
implemented]*/
%   pengine_event_loop(:Closure, +Options)
%
%   Process pengine events for as long as there are child pengines
%   (child/2).  Each event is passed to Closure by
%   pengine_process_event/4.  With the option autoforward(all), each
%   event is first sent to all children.

pengine_event_loop(Closure, Options) :-
    child(_,_),
    !,
    pengine_event(Event),
    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
    ->  forall(child(_,ID), pengine_send(ID, Event))
    ;   true
    ),
    pengine_event_loop(Event, Closure, Options).
pengine_event_loop(_, _).

% Closure is called as call(Closure, Event), i.e., with one extra
% argument.
:- meta_predicate
    pengine_process_event(+, 1, -, +).

pengine_event_loop(Event, Closure, Options) :-
    pengine_process_event(Event, Closure, Continue, Options),
    (   Continue == true
    ->  pengine_event_loop(Closure, Options)
    ;   true
    ).

%!  pengine_process_event(+Event, :Closure, -Continue, +Options)
%
%   Hand Event to Closure.  Failure of Closure is ignored, except for
%   error events: if Closure fails on an error event, all children are
%   destroyed and the error is thrown.  Some events get extra
%   treatment:
%
%     - A create event with an answer(First) feature is delivered
%       without that feature, after which First is processed.
%     - `output` and `debug` events are followed by a request for the
%       next response.
%     - `success` events are delivered without projection and time,
%       `failure` events without time.
%     - destroy(ID, Event) processes Event and then destroy(ID).
%     - destroy(ID) removes the child/2 registration of ID.
%
%   Continue is `true` for every event handled without raising.

pengine_process_event(create(ID, T), Closure, Continue, Options) :-
    debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]),
    (   select(answer(First), T, T1)
    ->  ignore(call(Closure, create(ID, T1))),
        pengine_process_event(First, Closure, Continue, Options)
    ;   ignore(call(Closure, create(ID, T))),
        Continue = true
    ).
pengine_process_event(output(ID, Msg), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]),
    ignore(call(Closure, output(ID, Msg))),
    pengine_pull_response(ID, []).
pengine_process_event(debug(ID, Msg), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]),
    ignore(call(Closure, debug(ID, Msg))),
    pengine_pull_response(ID, []).
pengine_process_event(prompt(ID, Term), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]),
    ignore(call(Closure, prompt(ID, Term))).
pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :-
    debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]),
    ignore(call(Closure, success(ID, Sol, More))).
pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
    ignore(call(Closure, failure(ID))).
pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
    (   call(Closure, error(ID, Error))
    ->  Continue = true
    ;   forall(child(_,Child), pengine_destroy(Child)),
        throw(Error)
    ).
pengine_process_event(stop(ID), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
    ignore(call(Closure, stop(ID))).
pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :-
    pengine_process_event(Event, Closure, _, Options),
    pengine_process_event(destroy(ID), Closure, Continue, Options).
pengine_process_event(destroy(ID), Closure, true, _Options) :-
    retractall(child(_,ID)),
    debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
    ignore(call(Closure, destroy(ID))).
copy_term_nat(Query, Copy), % attributes are not copied to the server call(Copy), % executed on server at URL Query = Copy.
Valid options are:
pengines:time_limit
.Remaining options (except the server option) are passed to pengine_create/1. */
%   pengine_rpc(+URL, :Query) and pengine_rpc(+URL, :Query, +Options)
%
%   Solve Query in a pengine on the server at URL, binding the
%   variables of Query on backtracking over the remote answers.  If
%   Options holds no timeout(_), the setting `time_limit` is used.
%   The pengine is destroyed when the call is finished, unless the
%   server already reported its destruction.

pengine_rpc(URL, Query) :-
    pengine_rpc(URL, Query, []).

pengine_rpc(URL, Query, M:Options0) :-
    translate_local_sources(Options0, Options1, M),
    (   option(timeout(_), Options1)
    ->  Options = Options1
    ;   setting(time_limit, Limit),
        Options = [timeout(Limit)|Options1]
    ),
    term_variables(Query, Vars),
    Template =.. [v|Vars],
    State = destroy(true),              % modified by process_event/4
    setup_call_catcher_cleanup(
        pengine_create([ ask(Query),
                         template(Template),
                         server(URL),
                         id(Id)
                       | Options
                       ]),
        wait_event(Template, State, [listen(Id)|Options]),
        Why,
        pengine_destroy_and_wait(State, Id, Why)).

%!  pengine_destroy_and_wait(+State, +Id, +Why)
%
%   Cleanup for pengine_rpc/3.  While State is still destroy(true),
%   destroy pengine Id and wait for this to be confirmed.

pengine_destroy_and_wait(destroy(true), Id, Why) :-
    !,
    debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
    pengine_destroy(Id),
    wait_destroy(Id, 10).
pengine_destroy_and_wait(_, _, Why) :-
    debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]).

%!  wait_destroy(+Id, +N)
%
%   Wait until Id is no longer a child.  We read at most N+1 events
%   from Id; if none of them is a destroy event we give up and drop
%   the registration of Id ourselves.

wait_destroy(Id, _) :-
    \+ child(_, Id),
    !.
wait_destroy(Id, N) :-
    pengine_event(Event, [listen(Id),timeout(10)]),
    !,
    (   destroy_event(Event)
    ->  retractall(child(_,Id))
    ;   succ(Left, N)
    ->  wait_destroy(Id, Left)
    ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
        pengine_unregister_remote(Id),
        retractall(child(_,Id))
    ).

%!  wait_event(?Template, +State, +Options)
%
%   Wait for the next event and process it using process_event/4.

wait_event(Template, State, Options) :-
    pengine_event(Event, Options),
    debug(pengine(event), 'Received ~p', [Event]),
    process_event(Event, Template, State, Options).

%!  process_event(+Event, ?Template, +State, +Options)
%
%   Handle an event received by pengine_rpc/3.  Answers are unified
%   with Template on backtracking, errors are thrown and failure
%   fails.  Prompts, output and debug events are dealt with, after
%   which we wait for the next event.  A destroy(ID, Event) event sets
%   the first argument of State to `false`, so the cleanup does not
%   destroy the pengine a second time.

process_event(create(_ID, Features), Template, State, Options) :-
    memberchk(answer(First), Features),
    process_event(First, Template, State, Options).
process_event(error(_ID, Error), _Template, _, _Options) :-
    throw(Error).
process_event(failure(_ID, _Time), _Template, _, _Options) :-
    fail.
process_event(prompt(ID, Prompt), Template, State, Options) :-
    pengine_rpc_prompt(ID, Prompt, Reply),
    pengine_send(ID, input(Reply)),
    wait_event(Template, State, Options).
process_event(output(ID, Term), Template, State, Options) :-
    pengine_rpc_output(ID, Term),
    pengine_pull_response(ID, Options),
    wait_event(Template, State, Options).
process_event(debug(ID, Message), Template, State, Options) :-
    debug(pengine(debug), '~w', [Message]),
    pengine_pull_response(ID, Options),
    wait_event(Template, State, Options).
process_event(success(_ID, Solutions, _Proj, _Time, false),
              Template, _, _Options) :-
    !,
    member(Template, Solutions).
process_event(success(ID, Solutions, _Proj, _Time, true),
              Template, State, Options) :-
    (   member(Template, Solutions)
    ;   pengine_next(ID, Options),
        wait_event(Template, State, Options)
    ).
process_event(destroy(ID, Event), Template, State, Options) :-
    !,
    retractall(child(_,ID)),
    nb_setarg(1, State, false),
    debug(pengine(destroy), 'State: ~p~n', [State]),
    process_event(Event, Template, State, Options).
% compatibility with older versions of the protocol.
process_event(success(ID, Solutions, Time, More),
              Template, State, Options) :-
    process_event(success(ID, Solutions, _Proj, Time, More),
                  Template, State, Options).


%!  pengine_rpc_prompt(+ID, +Prompt, -Term)
%
%   Obtain the reply to a prompt from the hook prompt/3.  If the hook
%   fails, read a term from the current input using Prompt as prompt.

pengine_rpc_prompt(ID, Prompt, Term) :-
    prompt(ID, Prompt, Answer),
    !,
    Term = Answer.
pengine_rpc_prompt(_ID, Prompt, Term) :-
    setup_call_cleanup(
        prompt(Saved, Prompt),
        read(Term),
        prompt(_, Saved)).

%!  pengine_rpc_output(+ID, +Term)
%
%   Offer Term to the hook output/2.  If the hook fails, print Term.

pengine_rpc_output(ID, Term) :-
    output(ID, Term),
    !.
pengine_rpc_output(_ID, Term) :-
    print(Term).
2088:- multifile prompt/3.
:- multifile output/2.


/*================= HTTP handlers =======================
*/

%   Declare the HTTP locations we serve and how they are served.  The
%   handlers that wait for a pengine use time_limit(infinite) because
%   pengines have their own timeout.  They also use `spawn`: many
%   clients may be waiting for some action on a pengine to complete
%   and, without spawning, these would quickly exhaust the worker
%   pool of the HTTP server.
%
%   FIXME: probably we should wait for a short time for the pengine on
%   the default worker thread.  Only if that time has expired, we can
%   call http_spawn/2 to continue waiting on a new thread.  That would
%   improve the performance and reduce the usage of threads.

:- http_handler(root(pengine),               http_404([]),
                [ id(pengines) ]).
:- http_handler(root(pengine/create),        http_pengine_create,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(root(pengine/send),          http_pengine_send,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(root(pengine/pull_response), http_pengine_pull_response,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(root(pengine/abort),         http_pengine_abort,         []).
:- http_handler(root(pengine/detach),        http_pengine_detach,        []).
:- http_handler(root(pengine/list),          http_pengine_list,          []).
:- http_handler(root(pengine/ping),          http_pengine_ping,          []).
:- http_handler(root(pengine/destroy_all),   http_pengine_destroy_all,   []).

:- http_handler(root(pengine/'pengines.js'),
                http_reply_file(library('http/web/js/pengines.js'), []), []).
:- http_handler(root(pengine/'plterm.css'),
                http_reply_file(library('http/web/css/plterm.css'), []), []).
application/json
and as
www-form-encoded
. Accepted parameters:
Parameter | Default | Comment |
---|---|---|
format | prolog | Output format |
application | pengine_sandbox | Pengine application |
chunk | 1 | Chunk-size for results |
solutions | chunked | If `all`, emit all results |
ask | - | The query |
template | - | Output template |
src_text | "" | Program |
src_url | - | Program to download |
disposition | - | Download location |
Note that solutions=all internally uses chunking to obtain the results from the pengine, but the results are combined in a single HTTP reply. This is currently only implemented by the CSV backend that is part of SWISH for downloading unbounded result sets with limited memory resources.
%   http_pengine_create(+Request)
%
%   HTTP handler that creates a pengine.  After dealing with an
%   options request, the parameters are read from the JSON document
%   if the content type starts with `application/json` and from the
%   request parameters otherwise.

http_pengine_create(Request) :-
    reply_options(Request, [post]),
    !.
http_pengine_create(Request) :-
    memberchk(content_type(ContentType), Request),
    sub_atom(ContentType, 0, _, _, 'application/json'),
    !,
    http_read_json_dict(Request, Dict),
    dict_atom_option(format, Dict, Format, prolog),
    dict_atom_option(application, Dict, Application, pengine_sandbox),
    http_pengine_create(Request, Application, Format, Dict).
http_pengine_create(Request) :-
    Optional = [optional(true)],
    OptString = [string|Optional],
    Form = [ format(Format, [default(prolog)]),
             application(Application, [default(pengine_sandbox)]),
             chunk(_, [integer, default(1)]),
             solutions(_, [oneof([all,chunked]), default(chunked)]),
             ask(_, OptString),
             template(_, OptString),
             src_text(_, OptString),
             disposition(_, OptString),
             src_url(_, Optional)
           ],
    http_parameters(Request, Form),
    form_dict(Form, Dict),
    http_pengine_create(Request, Application, Format, Dict).

%!  dict_atom_option(+Key, +Dict, -Atom, +Default)
%
%   Atom is the value for Key in Dict converted to an atom, or Default
%   if Dict has no such key.

dict_atom_option(Key, Dict, Atom, Default) :-
    (   get_dict(Key, Dict, Text)
    ->  atom_string(Atom, Text)
    ;   Atom = Default
    ).

%!  form_dict(+Form, -Dict)
%
%   Dict holds Name-Value for all parameters of Form that got a value.

form_dict(Form, Dict) :-
    form_values(Form, Pairs),
    dict_pairs(Dict, _, Pairs).

form_values([], []).
form_values([Field|Fields], Pairs) :-
    (   arg(1, Field, Value),
        nonvar(Value)
    ->  functor(Field, Name, _),
        Pairs = [Name-Value|Rest],
        form_values(Fields, Rest)
    ;   form_values(Fields, Pairs)
    ).
%!  http_pengine_create(+Request, +Application, +Format, +Dict)
%
%   Create a pengine for an existing Application after checking
%   access, wait for its first event and send that in Format.  The
%   output queue is limited to 25 messages and registered in
%   pengine_queue/4 together with the application's time limit.  For
%   an unknown application we reply with an existence error event.

http_pengine_create(Request, Application, Format, Dict) :-
    current_application(Application),
    !,
    allowed(Request, Application),
    authenticate(Request, Application, UserOptions),
    dict_to_options(Dict, Application, DictOptions),
    append(UserOptions, DictOptions, CreateOptions),
    pengine_uuid(Pengine),
    message_queue_create(Queue, [max_size(25)]),
    setting(Application:time_limit, TimeLimit),
    get_time(Created),
    asserta(pengine_queue(Pengine, Queue, TimeLimit, Created)),
    broadcast(pengine(create(Pengine, Application, CreateOptions))),
    create(Queue, Pengine, CreateOptions, http, Application),
    create_wait_and_output_result(Pengine, Queue, Format,
                                  TimeLimit, Dict),
    gc_abandoned_queues.
http_pengine_create(_Request, Application, Format, _Dict) :-
    Error = existence_error(pengine_application, Application),
    pengine_uuid(ID),
    output_result(Format, error(ID, error(Error, _))).


%!  dict_to_options(+Dict, +Application, -CreateOptions)
%
%   Translate the keys of Dict that are valid create options into an
%   option list.  The key `user` is never accepted.

dict_to_options(Dict, Application, CreateOptions) :-
    dict_pairs(Dict, _, Pairs),
    pairs_create_options(Pairs, Application, CreateOptions).

pairs_create_options([], _, []) :- !.
pairs_create_options([Name-Raw|Pairs], App, [Option|Options]) :-
    Option =.. [Name,Value],
    pengine_create_option(Option), Name \== user,
    !,
    (   create_option_type(Option, atom)
    ->  atom_string(Value, Raw)          % term creation must be done if
    ;   Value = Raw                      % we created the source and know
    ),                                   % the operators.
    pairs_create_options(Pairs, App, Options).
pairs_create_options([_|Pairs], App, Options) :-
    pairs_create_options(Pairs, App, Options).
time_limit
,
Pengine is aborted and the result is error(time_limit_exceeded,
_)
.
%!  wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit)
%
%   Wait at most TimeLimit for the next event on Queue and send it in
%   Format.  If waiting raises an exception we reply died(Pengine); if
%   no event arrives in time, time_limit_exceeded/2 takes over.  The
%   queue is destroyed if the event calls for this.

wait_and_output_result(Pengine, Queue, Format, TimeLimit) :-
    (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                 [ timeout(TimeLimit)
                                 ]),
              Caught, true)
    ->  (   var(Caught)
        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
            ignore(destroy_queue_from_http(Pengine, Event, Queue)),
            protect_pengine(Pengine, output_result(Format, Event))
        ;   output_result(Format, died(Pengine))
        )
    ;   time_limit_exceeded(Pengine, Format)
    ).
%!  create_wait_and_output_result(+Pengine, +Queue, +Format,
%!                                +TimeLimit, +Dict)
%
%   Send the result of creating Pengine.  If Dict has `solutions` set
%   to `all`, we keep asking the pengine for the `next` chunk and emit
%   each chunk as page(Page, Event) until the query is finished, the
%   pengine dies or TimeLimit is exceeded.  Dict is passed to
%   output_result/3; it may contain a `disposition` key to denote the
%   download location.  Without `solutions=all` this is the same as
%   wait_and_output_result/4.

create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
    get_dict(solutions, Dict, all),
    !,
    between(1, infinite, PageNo),
    (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                 [ timeout(TimeLimit)
                                 ]),
              Caught, true)
    ->  (   var(Caught)
        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [PageNo, Event, Queue]),
            (   destroy_queue_from_http(Pengine, Event, Queue)
            ->  !,
                protect_pengine(Pengine,
                                output_result(Format, page(PageNo, Event), Dict))
            ;   is_more_event(Event)
            ->  pengine_thread(Pengine, Worker),
                thread_send_message(Worker, pengine_request(next)),
                protect_pengine(Pengine,
                                output_result(Format, page(PageNo, Event), Dict)),
                fail                    % backtrack into between/3: next page
            ;   !,
                protect_pengine(Pengine,
                                output_result(Format, page(PageNo, Event), Dict))
            )
        ;   !, output_result(Format, died(Pengine))
        )
    ;   !, time_limit_exceeded(Pengine, Format)
    ),
    !.
create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, _Dict) :-
    wait_and_output_result(Pengine, Queue, Format, TimeLimit).

%!  is_more_event(+Event)
%
%   True if Event is a success event with more solutions pending, or
%   a create event that holds such an event as its answer.

is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
is_more_event(create(_, Features)) :-
    memberchk(answer(Event), Features),
    is_more_event(Event).
%!  time_limit_exceeded(+Pengine, +Format)
%
%   Forcefully destroy Pengine and, whatever the outcome of that, send
%   a destroy event that wraps a `time_limit_exceeded` error.

time_limit_exceeded(Pengine, Format) :-
    Event = destroy(Pengine,
                    error(Pengine, time_limit_exceeded)),
    call_cleanup(
        pengine_destroy(Pengine, [force(true)]),
        output_result(Format, Event)).
%!  destroy_queue_from_http(+ID, +Event, +Queue)
%
%   Called from the HTTP side after reading Event from Queue.  If the
%   queue is already registered in output_queue/3 it is destroyed as
%   soon as it is empty.  Otherwise, if Event is a destroy event, the
%   destruction is synchronised while holding the mutex `pengine`.
%   Fails for any other event.

destroy_queue_from_http(ID, _, Queue) :-
    output_queue(ID, Queue, _),
    !,
    destroy_queue_if_empty(Queue).
destroy_queue_from_http(ID, Event, Queue) :-
    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
    is_destroy_event(Event),
    !,
    message_queue_property(Queue, size(Pending)),
    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Pending]),
    with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).

%!  is_destroy_event(+Event)
%
%   True if Event is a destroy event or a create event that holds a
%   destroy event as its answer.

is_destroy_event(destroy(_)).
is_destroy_event(destroy(_,_)).
is_destroy_event(create(_, Features)) :-
    memberchk(answer(Event), Features),
    is_destroy_event(Event).

%!  destroy_queue_if_empty(+Queue)
%
%   Destroy Queue and forget about it in output_queue/3, unless it
%   still holds a message.

destroy_queue_if_empty(Queue) :-
    (   thread_peek_message(Queue, _)
    ->  true
    ;   retractall(output_queue(_, Queue, _)),
        message_queue_destroy(Queue)
    ).
:- dynamic
    last_gc/1.

%!  gc_abandoned_queues
%
%   If consider_queue_gc/0 agrees, destroy all queues that have been
%   registered in output_queue/3 for more than 15 minutes and record
%   the time of this run in last_gc/1.  Always succeeds.

gc_abandoned_queues :-
    consider_queue_gc,
    !,
    get_time(Now),
    (   output_queue(_, Queue, Stamp),
        Now-Stamp > 15*60,
        retract(output_queue(_, Queue, Stamp)),
        message_queue_destroy(Queue),
        fail                            % failure driven loop
    ;   retractall(last_gc(_)),
        asserta(last_gc(Now))
    ).
gc_abandoned_queues.

%!  consider_queue_gc
%
%   True if output_queue/3 has more than 100 clauses and the last
%   collection was more than 5 minutes ago or never happened.

consider_queue_gc :-
    predicate_property(output_queue(_,_,_), number_of_clauses(Count)),
    Count > 100,
    (   last_gc(Last),
        get_time(Now),
        Now-Last > 5*60
    ->  true
    ;   \+ last_gc(_)
    ).
:- dynamic output_queue_destroyed/1.

%!  sync_destroy_queue_from_http(+ID, +Queue)
%
%   Destroy Queue on behalf of the HTTP side; the caller runs this
%   under the mutex `pengine`.  Three cases are distinguished:
%
%     - Queue is registered in output_queue/3: destroy it if empty.
%     - Queue holds a pending `output` event: delay destruction by
%       registering the queue in output_queue/3.
%     - Otherwise: destroy the queue now and note this in
%       output_queue_destroyed/1.

sync_destroy_queue_from_http(ID, Queue) :-
    (   output_queue(ID, Queue, _)
    ->  destroy_queue_if_empty(Queue)
    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
    ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
              [Queue]),
        get_time(Stamp),
        asserta(output_queue(ID, Queue, Stamp))
    ;   message_queue_destroy(Queue),
        asserta(output_queue_destroyed(Queue))
    ).
%!  sync_destroy_queue_from_pengine(+ID, +Queue)
%
%   Called when the pengine is done with Queue.  If the HTTP side has
%   already destroyed the queue we only clear that note; otherwise the
%   queue is registered in output_queue/3.  In both cases the queue is
%   removed from pengine_queue/4.
%
%   NOTE(review): the original documentation indicates that this runs
%   with the mutex `pengine` held -- confirm at the call site.

sync_destroy_queue_from_pengine(ID, Queue) :-
    (   retract(output_queue_destroyed(Queue))
    ->  true
    ;   get_time(Stamp),
        asserta(output_queue(ID, Queue, Stamp))
    ),
    retractall(pengine_queue(ID, Queue, _, _)).


%   http_pengine_send(+Request)
%
%   HTTP handler that delivers an event to pengine `id` and replies
%   with the next event from that pengine in the requested `format`.
%   If the pengine does not exist we reply with an existence error
%   event; other errors while reading the event are reported as error
%   events as well.

http_pengine_send(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_send(Request) :-
    http_parameters(Request,
                    [ id(ID, [ type(atom) ]),
                      event(EventString, [optional(true)]),
                      format(Format, [default(prolog)])
                    ]),
    catch(read_event(ID, Request, Format, EventString, Event),
          Caught,
          true),
    (   var(Caught)
    ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
        (   pengine_thread(ID, Worker)
        ->  pengine_queue(ID, Queue, TimeLimit, _),
            random_delay,
            broadcast(pengine(send(ID, Event))),
            thread_send_message(Worker, pengine_request(Event)),
            wait_and_output_result(ID, Queue, Format, TimeLimit)
        ;   atom(ID)
        ->  pengine_died(Format, ID)
        ;   http_404([], Request)
        )
    ;   Caught = error(existence_error(pengine, ID), _)
    ->  pengine_died(Format, ID)
    ;   output_result(Format, error(ID, Caught))
    ).

%!  pengine_died(+Format, +Pengine)
%
%   Reply with an error event telling that Pengine does not exist.

pengine_died(Format, Pengine) :-
    Error = error(existence_error(pengine, Pengine),_),
    output_result(Format, error(Pengine, Error)).
%!  read_event(+Pengine, +Request, +Format, +EventString, -Event)
%
%   Read the event for Pengine in the context of the pengine's module
%   and adjust it for Format using fix_bindings/4.  If this fails,
%   the pengine is considered gone: posted data is discarded and an
%   existence error is raised.
%
%   NOTE(review): the original documentation refers to the
%   `pengine_done` mutex; presumably protect_pengine/2 takes care of
%   that -- confirm.

read_event(Pengine, Request, Format, EventString, Event) :-
    protect_pengine(
        Pengine,
        ( get_pengine_module(Pengine, Module),
          read_event_2(Request, EventString, Module, Event0, Bindings)
        )),
    !,
    fix_bindings(Format, Event0, Bindings, Event).
read_event(Pengine, Request, _Format, _EventString, _Event) :-
    debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
    discard_post_data(Request),
    existence_error(pengine, Pengine).
%!  read_event_2(+Request, +EventString, +Module, -Event, -Bindings)
%
%   Read the event either from the `event` parameter or as a posted
%   document.  The term is read in the context of Module and Bindings
%   holds its variable names.  Fails if there is no `event` parameter
%   and the request is not a POST.

read_event_2(_Request, EventString, Module, Event, Bindings) :-
    nonvar(EventString),
    !,
    ReadOptions = [ variable_names(Bindings),
                    module(Module)
                  ],
    term_string(Event, EventString, ReadOptions).
read_event_2(Request, _EventString, Module, Event, Bindings) :-
    option(method(post), Request),
    http_read_data(Request, Event,
                   [ content_type('application/x-prolog'),
                     module(Module),
                     variable_names(Bindings)
                   ]).
%!  discard_post_data(+Request)
%
%   If Request is a POST, read the posted data into a null stream.
%   Other requests are left alone.

discard_post_data(Request) :-
    (   option(method(post), Request)
    ->  setup_call_cleanup(
            open_null_stream(Sink),
            http_read_data(Request, _, [to(stream(Sink))]),
            close(Sink))
    ;   true
    ).
%!  fix_bindings(+Format, +Event0, +Bindings, -Event)
%
%   Generate the template for json(-s) Format from the variables in
%   the asked Goal.  Variables starting with an underscore, followed
%   by a capital letter are ignored from the template.  The resulting
%   `ask` event always carries the options template(Template),
%   chunk(Paging) (default 1) and bindings(NamedBindings).  Events
%   other than `ask` and all events for other formats are unchanged.

fix_bindings(Format,
             ask(Goal, Options0), Bindings,
             ask(Goal, NewOptions)) :-
    json_lang(Format),
    !,
    exclude(anon, Bindings, Named),
    template(Named, Template, Options0, Options1),
    select_option(chunk(Paging), Options1, Options2, 1),
    NewOptions = [ template(Template),
                   chunk(Paging),
                   bindings(Named)
                 | Options2
                 ].
fix_bindings(_, Command, _, Command).

%!  template(+Bindings, -Template, +Options0, -Options)
%
%   Template is taken from the option template(Template), which is
%   removed from the options.  Without this option, Template is a
%   dict tagged `swish_default_template` created from Bindings.

template(_, Template, Options0, Options) :-
    select_option(template(Template), Options0, Options),
    !.
template(Bindings, Template, Options, Options) :-
    dict_create(Template, swish_default_template, Bindings).

%!  anon(+Binding)
%
%   True if the name in Binding starts with an underscore that is
%   followed by a character that can start a Prolog variable.

anon(Name=_) :-
    sub_atom(Name, 0, _, _, '_'),
    sub_atom(Name, 1, 1, _, Second),
    char_type(Second, prolog_var_start).

%!  var_name(+Binding, -Name)
%
%   Name is the variable name of Binding.

var_name(Name=_, Name).
%!  json_lang(+Format)
%
%   True if Format is `json` or an atom that starts with `json-`.

json_lang(Format) :-
    (   Format = json
    ->  true
    ;   sub_atom(Format, 0, _, _, 'json-')
    ).
%   http_pengine_pull_response(+Request)
%
%   HTTP handler that sends the next event of pengine `id`.  The
%   event comes from the pengine's queue or, if the pengine is done,
%   from a queue kept in output_queue/3, which is read without
%   waiting.  Replies 404 if there is no queue for this pengine.

http_pengine_pull_response(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_pull_response(Request) :-
    http_parameters(Request,
                    [ id(ID, []),
                      format(Format, [default(prolog)])
                    ]),
    reattach(ID),
    (   (   pengine_queue(ID, Queue, TimeLimit, _)
        ->  true
        ;   output_queue(ID, Queue, _),
            TimeLimit = 0
        )
    ->  wait_and_output_result(ID, Queue, Format, TimeLimit)
    ;   http_404([], Request)
    ).
%   http_pengine_abort(+Request)
%
%   HTTP handler that aborts the query of pengine `id`.  Replies JSON
%   `true` if the pengine has a thread and 404 otherwise.

http_pengine_abort(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_abort(Request) :-
    http_parameters(Request,
                    [ id(ID, [])
                    ]),
    (   pengine_thread(ID, _Worker)
    ->  broadcast(pengine(abort(ID))),
        abort_pending_output(ID),
        pengine_abort(ID),
        reply_json(true)
    ;   http_404([], Request)
    ).
%!  http_pengine_detach(+Request)
%
%   HTTP handler that detaches the pengine given by the `id` parameter.
%   The request body is read as a JSON dict.  If the pengine has an
%   application and the request passes allowed/2 and authenticate/3:
%
%     - pengine(detach(ID)) is broadcast,
%     - pengine_detached(ID, Data) is asserted, where Data is the
%       client dict extended with the key `time` (current time),
%     - the size limit of the pengine queue is set to 1000,
%     - detached(ID) is sent to that queue and JSON `true` is replied.
%
%   Replies 404 otherwise.

http_pengine_detach(Request) :-
    reply_options(Request, [post]),
    !.
http_pengine_detach(Request) :-
    http_parameters(Request, [id(ID, [])]),
    http_read_json_dict(Request, ClientData),
    (   pengine_property(ID, application(Application)),
        allowed(Request, Application),
        authenticate(Request, Application, _)
    ->  broadcast(pengine(detach(ID))),
        get_time(Now),
        put_dict(time, ClientData, Now, Detached),
        assertz(pengine_detached(ID, Detached)),
        pengine_queue(ID, Queue, _, _),
        message_queue_set(Queue, max_size(1000)),
        pengine_reply(Queue, detached(ID)),
        reply_json(true)
    ;   http_404([], Request)
    ).

% Fallback for systems that do not provide message_queue_set/2:
% changing queue properties is silently skipped.
:- if(\+current_predicate(message_queue_set/2)).
message_queue_set(_,_).
:- endif.

%   reattach(+ID)
%
%   If the pengine ID was detached and still has a queue, forget the
%   detached state and set the queue size limit to 25.  Succeeds
%   without doing anything otherwise.

reattach(ID) :-
    retract(pengine_detached(ID, _)),
    pengine_queue(ID, Queue, _, _),
    !,
    message_queue_set(Queue, max_size(25)).
reattach(_).
%!  http_pengine_destroy_all(+Request)
%
%   HTTP handler that destroys a set of pengines.  The `ids` parameter
%   is a comma-separated list of pengine ids.  Every listed pengine
%   that is not detached is destroyed with option force(true).  The
%   reply is the JSON string "ok".  OPTIONS requests are answered by
%   reply_options/2.

http_pengine_destroy_all(Request) :-
    (   reply_options(Request, [get,post])
    ->  true
    ;   http_parameters(Request, [ids(IDsAtom, [])]),
        atomic_list_concat(IDs, ',', IDsAtom),
        forall(( member(ID, IDs),
                 \+ pengine_detached(ID, _)
               ),
               pengine_destroy(ID, [force(true)])),
        reply_json("ok")
    ).
%!  http_pengine_ping(+Request)
%
%   HTTP handler that reports on the pengine given by the `id`
%   parameter.  If the pengine has a thread whose statistics can be
%   obtained, the event `ping(Pengine, Stats)` is output, where Stats
%   is the return of thread_statistics/2.  Otherwise `died(Pengine)`
%   is output.  The output format is taken from the `format` parameter
%   (default `prolog`).

http_pengine_ping(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_ping(Request) :-
    http_parameters(Request,
                    [ id(Pengine, []),
                      format(Format, [default(prolog)])
                    ]),
    (   pengine_thread(Pengine, Thread),
        % only error(_,_) exceptions are mapped to "died"
        catch(thread_statistics(Thread, Stats), error(_,_), fail)
    ->  Event = ping(Pengine, Stats)
    ;   Event = died(Pengine)
    ),
    output_result(Format, Event).
%!  http_pengine_list(+Request)
%
%   HTTP handler that lists pengines of an application as a JSON
%   object `{pengines: [...]}`.  Parameters are `status` (only
%   `detached` is accepted, which is also the default) and
%   `application` (default `pengine_sandbox`).  The request must pass
%   allowed/2 and authenticate/3.

http_pengine_list(Request) :-
    (   reply_options(Request, [get])
    ->  true
    ;   http_parameters(Request,
                        [ status(Status, [default(detached), oneof([detached])]),
                          application(Application, [default(pengine_sandbox)])
                        ]),
        allowed(Request, Application),
        authenticate(Request, Application, _),
        findall(Pengine, listed_pengine(Application, Status, Pengine), Pengines),
        reply_json(json{pengines: Pengines})
    ).

%   listed_pengine(+Application, +Status, -State)
%
%   Enumerate detached pengines of Application.  State is a dict with
%   keys `id`, `detached` (value of the detached/1 property), `queued`
%   (current size of the pengine queue) and `stats` (thread statistics,
%   or thread{status:died} if these cannot be obtained).

listed_pengine(Application, detached, State) :-
    State = pengine{id:Id,
                    detached:Since,
                    queued:Pending,
                    stats:Stats},
    pengine_property(Id, application(Application)),
    pengine_property(Id, detached(Since)),
    pengine_queue(Id, Queue, _, _),
    message_queue_property(Queue, size(Pending)),
    (   pengine_thread(Id, Thread),
        catch(thread_statistics(Thread, Stats), _, fail)
    ->  true
    ;   Stats = thread{status:died}
    ).
%!  output_result(+Format, +EventTerm)
%
%   Write the HTTP reply for EventTerm.  Format is one of `prolog`,
%   `json` or `json-s`.
%
%   While the reply is being written, the fact
%   pengine_replying(Pengine, Thread) is present, where Pengine is the
%   first argument of EventTerm and Thread is the calling thread.  The
%   exception `pengine_abort_output` raised while writing is caught and
%   silently ends the output.

:- dynamic
    pengine_replying/2.             % +Pengine, +Thread

output_result(Format, Event) :-
    arg(1, Event, Pengine),
    thread_self(Me),
    cors_enable,                    % contingent on http:cors setting
    disable_client_cache,
    setup_call_cleanup(
        asserta(pengine_replying(Pengine, Me), Ref),
        catch(output_result(Format, Event, _{}),
              pengine_abort_output,
              true),
        erase(Ref)).

%   output_result(+Lang, +Event, +Dict)
%
%   Try, in this order: the write_result/3 hook, the built-in `prolog`
%   format, and the JSON dialects accepted by json_lang/1.  Any other
%   Lang raises a domain error `pengine_format`.

output_result(Lang, Event, Dict) :-
    write_result(Lang, Event, Dict),
    !.
output_result(prolog, Event, _) :-
    !,
    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
    WriteOptions = [ quoted(true),
                     ignore_ops(true),
                     fullstop(true),
                     blobs(portray),
                     portray_goal(portray_blob),
                     nl(true)
                   ],
    write_term(Event, WriteOptions).
output_result(Lang, Event, _) :-
    json_lang(Lang),
    !,
    (   event_term_to_json_data(Event, JSON, Lang)
    ->  reply_json(JSON)
    ;   % the conversion failed: run it again inside assertion/1
        assertion(event_term_to_json_data(Event, _, Lang))
    ).
output_result(Lang, _Event, _) :-    % FIXME: allow for non-JSON format
    domain_error(pengine_format, Lang).
%!  portray_blob(+Blob, +Options)
%
%   Write a blob as `'$BLOB'(Type)`, where Type is the blob type
%   reported by blob/2.  Options is ignored.
%   Future versions may include more info, depending on Type.

:- public portray_blob/2.               % called from write-term
portray_blob(Blob, _Options) :-
    blob(Blob, Type),
    format('~q', ['$BLOB'(Type)]).
%!  abort_pending_output(+Pengine)
%
%   Signal every thread registered in pengine_replying/2 for Pengine
%   to stop writing its reply.

abort_pending_output(Pengine) :-
    forall(pengine_replying(Pengine, Writer),
           abort_output_thread(Writer)).

%   abort_output_thread(+Thread)
%
%   Make Thread throw `pengine_abort_output`.  A thread that no longer
%   exists is silently ignored.

abort_output_thread(Thread) :-
    Gone = error(existence_error(thread, _), _),
    catch(thread_signal(Thread, throw(pengine_abort_output)),
          Gone,
          true).
%   Output formats: the core library supports `prolog` and various
%   JSON dialects. The hook event_to_json/3 can be used to refine the
%   JSON dialects. A hook must be used if a completely different
%   output format is desired.
%   NOTE(review): the start of this comment was lost; "hook" in the
%   last sentence presumably refers to write_result/3 -- confirm.

%!  disable_client_cache
%
%   Emit HTTP header lines (Cache-Control, Pragma, Expires) that tell
%   the client not to cache the reply.

disable_client_cache :-
    format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
            Pragma: no-cache\r\n\c
            Expires: 0\r\n').

%!  event_term_to_json_data(+Event, -JSON, +Lang)
%
%   Translate a pengine event term into a JSON dict.  The hook
%   event_to_json/3 is tried first.  Otherwise:
%
%     - success/5 (only for Lang `json`): dict with keys event, id,
%       time, data, more and projection.
%     - destroy(ID, Event): the nested Event is translated recursively
%       and stored under `data`.
%     - create(ID, Features): an answer(Event) feature, if present, is
%       translated recursively; the features become dict keys.
%     - error(ID, Term): `data` is the message string; code and
%       location details are added by add_error_details/3.
%     - failure(ID, Time): dict with keys event, id and time.
%     - any other term of arity 1 or 2: the functor name is the event,
%       the first argument the id and (arity 2) the second argument is
%       converted with term_to_json/2 and stored under `data`.
%
%   A duplicated, unreachable second clause for destroy(ID, Event) has
%   been removed: the first one has the same head and commits.

event_term_to_json_data(Event, JSON, Lang) :-
    event_to_json(Event, JSON, Lang),
    !.
event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
                        json{event:success, id:ID, time:Time,
                             data:Bindings, more:More, projection:Projection},
                        json) :-
    !,
    term_to_json(Bindings0, Bindings).
event_term_to_json_data(destroy(ID, Event),
                        json{event:destroy, id:ID, data:JSON},
                        Style) :-
    !,
    event_term_to_json_data(Event, JSON, Style).
event_term_to_json_data(create(ID, Features0), JSON, Style) :-
    !,
    (   select(answer(First0), Features0, Features1)
    ->  event_term_to_json_data(First0, First, Style),
        Features = [answer(First)|Features1]
    ;   Features = Features0
    ),
    dict_create(JSON, json, [event(create), id(ID)|Features]).
event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
    !,
    % Message is still unbound here; it is filled in below
    Error0 = json{event:error, id:ID, data:Message},
    add_error_details(ErrorTerm, Error0, Error),
    message_to_string(ErrorTerm, Message).
event_term_to_json_data(failure(ID, Time),
                        json{event:failure, id:ID, time:Time}, _) :-
    !.
event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
    functor(EventTerm, F, 1),
    !,
    arg(1, EventTerm, ID).
event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
    functor(EventTerm, F, 2),
    arg(1, EventTerm, ID),
    arg(2, EventTerm, Data),
    term_to_json(Data, JSON).

:- public add_error_details/3.
%!  add_error_details(+Error, +JSON0, -JSON)
%
%   Extend the dict JSON0 with error code and location information
%   derived from Error.  Also used by pengines_io.pl.

add_error_details(Error, JSON0, JSON) :-
    add_error_code(Error, JSON0, WithCode),
    add_error_location(Error, WithCode, JSON).
%!  add_error_code(+Error, +JSON0, -JSON) is det.
%
%   Add a `code` field to JSON0 if Error is an ISO error term. The
%   error code is the functor name of the formal part of the error,
%   e.g., `syntax_error`, `type_error`, etc. Some errors carry more
%   information:
%
%     - existence_error(Type, Obj), with Type an atom, also adds
%       `arg1` (Type) and `arg2` (Obj made atomic by to_atomic/2).
%
%   JSON0 is returned unchanged for any other term.

add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
    atom(Type),
    !,
    to_atomic(Obj, Value),
    put_dict(_{code:existence_error, arg1:Type, arg2:Value}, Error0, Error).
add_error_code(error(Formal, _), Error0, Error) :-
    callable(Formal),
    !,
    functor(Formal, Code, _),
    put_dict(code, Error0, Code, Error).
add_error_code(_, Error, Error).

%   to_atomic(+Obj, -Atomic)
%
%   Atoms, numbers and strings are passed unchanged; any other term is
%   converted to a string with term_string/2.
%   What to do with large integers?

to_atomic(Obj, Atomic) :-
    (   (   atom(Obj)
        ;   number(Obj)
        ;   string(Obj)
        )
    ->  Atomic = Obj
    ;   term_string(Obj, Atomic)
    ).
%!  add_error_location(+Error, +JSON0, -JSON) is det.
%
%   Add a `location` property if the error can be associated with a
%   source location. The location is an object with properties `file`
%   and `line` and, if available, the character location in the line
%   (`ch`).
%
%   A location is recognised if the error context is
%   file(Path, Line, Ch, _) with Path an atom and Line an integer.  The
%   key `ch` is omitted when Ch is -1.  JSON0 is returned unchanged in
%   all other cases.

add_error_location(error(_, file(Path, Line, Ch, _)), Term0, Term) :-
    atom(Path),
    integer(Line),
    (   Ch = -1
    ->  Location = _{file:Path, line:Line}
    ;   integer(Ch),
        Location = _{file:Path, line:Line, ch:Ch}
    ),
    !,
    put_dict(location, Term0, Location, Term).
add_error_location(_, Term, Term).
%   Hook event_to_json/3 (called as event_to_json(Event, JSON, Lang)):
%   translate events such as
%   `success(ID, Bindings, Projection, Time, More)` and
%   `output(ID, Term)` into a format suitable for processing at the
%   client side.

%:- multifile pengines:event_to_json/3.


                 /*******************************
                 *        ACCESS CONTROL        *
                 *******************************/
%!  allowed(+Request, +Application) is det.
%
%   Check whether the peer is allowed to connect.  Replies with a
%   `forbidden` header if contact is not allowed.
%
%   The peer must match the setting Application:allow_from and must
%   not match Application:deny_from.
%
%   @throws http_reply(forbidden(URI)) if access is denied.

allowed(Request, Application) :-
    setting(Application:allow_from, Allow),
    match_peer(Request, Allow),
    setting(Application:deny_from, Deny),
    \+ match_peer(Request, Deny),
    !.
allowed(Request, _Application) :-
    memberchk(request_uri(Here), Request),
    throw(http_reply(forbidden(Here))).

%   match_peer(+Request, +Allowed)
%
%   True if the peer of Request matches the list Allowed: the list
%   contains `*`, contains the peer literally, or contains a pattern
%   that matches the peer (see match_peer_pattern/2).  The empty list
%   matches nothing.

match_peer(_, Allowed) :-
    memberchk(*, Allowed),
    !.
match_peer(_, []) :- !, fail.
match_peer(Request, Allowed) :-
    http_peer(Request, Peer),
    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
    (   memberchk(Peer, Allowed)
    ->  true
    ;   member(Pattern, Allowed),
        match_peer_pattern(Pattern, Peer)
    ).

%   match_peer_pattern(+Pattern, +Peer)
%
%   True if the dotted address Peer matches Pattern.  Pattern is a
%   dotted address whose last component may be `*`, which matches any
%   remaining components (e.g. '192.168.*').

match_peer_pattern(Pattern, Peer) :-
    ip_term(Pattern, IP),
    ip_term(Peer, IP),
    !.

%   ip_term(+Text, -Term)
%
%   Term is the list of numbers of the dot-separated Text.  A final
%   `*` component leaves the tail of the list unbound.

ip_term(Peer, Pattern) :-
    split_string(Peer, ".", "", PartStrings),
    ip_pattern(PartStrings, Pattern).

ip_pattern([], []).
% split_string/4 produces strings, so the wildcard must be compared
% with the string "*"; the atom `*` would never match.
ip_pattern(["*"], _) :- !.
ip_pattern([S|T0], [N|T]) :-
    number_string(N, S),
    ip_pattern(T0, T).
%!  authenticate(+Request, +Application, -UserOptions:list) is det.
%
%   Call authentication_hook/3, returning either `[user(User)]`, `[]`
%   or an exception.  The User returned by the hook must be ground.

authenticate(Request, Application, UserOptions) :-
    (   authentication_hook(Request, Application, User)
    ->  must_be(ground, User),
        UserOptions = [user(User)]
    ;   UserOptions = []
    ).
%   Exceptions for rejecting a request (remainder of a hook
%   description whose start was lost; presumably it documents
%   authentication_hook/3 -- confirm):
%
%     - throw(http_reply(authorise(basic(Realm))))
%       Start a normal HTTP login challenge (reply 401)
%     - throw(http_reply(forbidden(Path)))
%       Reject the request using a 403 reply.

%!  pengine_register_user(+Options) is det.
%
%   If Options contains user(User), record User for the calling
%   pengine in pengine_user/2.  Does nothing otherwise.

pengine_register_user(Options) :-
    (   option(user(User), Options)
    ->  pengine_self(Me),
        asserta(pengine_user(Me, User))
    ;   true
    ).
%!  pengine_user(-User)
%
%   True when the calling pengine (pengine_self/1) has User recorded
%   in pengine_user/2.

pengine_user(User) :-
    pengine_self(Self),
    pengine_user(Self, User).
%!  reply_options(+Request, +Methods) is semidet.
%
%   Reply to an HTTP request whose method is `options`: enable CORS
%   for the given Methods and send a text/plain reply with an empty
%   body.  Fails for any other request method.

reply_options(Request, Methods) :-
    option(method(options), Request),
    !,
    cors_enable(Request, [methods(Methods)]),
    format('Content-type: text/plain\r\n'),
    format('~n').                   % empty body


                 /*******************************
                 *        COMPILE SOURCE        *
                 *******************************/
%!  pengine_src_text(+SrcText, +Module) is det.
%
%   Load the Prolog source text SrcText into Module.  The text is
%   loaded silently from a memory stream under the source id
%   `pengine://<Self>/src`, where Self is the calling pengine.  Load
%   options come from extra_load_options/2.  Afterwards the text is
%   handed to keep_source/3.

pengine_src_text(Src, Module) :-
    pengine_self(Self),
    format(atom(ID), 'pengine://~w/src', [Self]),
    extra_load_options(Self, Options),
    setup_call_cleanup(
        open_chars_stream(Src, Stream),
        load_files(Module:ID,
                   [ stream(Stream),
                     module(Module),
                     silent(true)
                   | Options
                   ]),
        close(Stream)),
    keep_source(Self, ID, Src).

%   system:'#file'(+File, +Line)
%
%   Set the file name of the stream that is currently being loaded to
%   File and switch position recording off and on again.  Line is
%   ignored.  The clause is defined in module `system`; the module
%   qualifier must be written with `:`.

system:'#file'(File, _Line) :-
    prolog_load_context(stream, Stream),
    set_stream(Stream, file_name(File)),
    set_stream(Stream, record_position(false)),
    set_stream(Stream, record_position(true)).
%!  pengine_src_url(+URL, +Module) is det.
%
%   Load Prolog source from URL into Module, reading it as UTF-8.  The
%   source id is `pengine://<Self>/url/<encoded URL>`.  If the setting
%   debug_info of the pengine's application is `false`, the code is
%   loaded directly from the HTTP stream.  Otherwise the text is first
%   read into a string, loaded from memory and handed to keep_source/3.

pengine_src_url(URL, Module) :-
    pengine_self(Self),
    uri_encoded(path, URL, Path),
    format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
    extra_load_options(Self, Options),
    % Stream is bound by whichever branch opens the input below
    LoadOptions = [ stream(Stream),
                    module(Module)
                  | Options
                  ],
    (   get_pengine_application(Self, Application),
        setting(Application:debug_info, false)
    ->  setup_call_cleanup(
            http_open(URL, Stream, []),
            ( set_stream(Stream, encoding(utf8)),
              load_files(Module:ID, LoadOptions)
            ),
            close(Stream))
    ;   setup_call_cleanup(
            http_open(URL, TempStream, []),
            ( set_stream(TempStream, encoding(utf8)),
              read_string(TempStream, _, Src)
            ),
            close(TempStream)),
        setup_call_cleanup(
            open_chars_stream(Src, Stream),
            load_files(Module:ID, LoadOptions),
            close(Stream)),
        keep_source(Self, ID, Src)
    ).


%   extra_load_options(+Pengine, -Options)
%
%   Options is [] for a pengine that is not sandboxed and
%   [sandboxed(true)] otherwise.

extra_load_options(Pengine, Options) :-
    (   pengine_not_sandboxed(Pengine)
    ->  Options = []
    ;   Options = [sandboxed(true)]
    ).


%   keep_source(+Pengine, +ID, +SrcText)
%
%   If the setting debug_info of the pengine's application is `true`,
%   remember the source text as a string in
%   pengine_data(Pengine, source(ID, String)).

keep_source(Pengine, ID, SrcText) :-
    (   get_pengine_application(Pengine, Application),
        setting(Application:debug_info, true)
    ->  to_string(SrcText, SrcString),
        assertz(pengine_data(Pengine, source(ID, SrcString)))
    ;   true
    ).

%   to_string(+Text, -String): strings are passed as is, other text is
%   converted using atom_string/2.

to_string(Text, Text) :-
    string(Text),
    !.
to_string(Text, String) :-
    atom_string(Text, String),
    !.

                 /*******************************
                 *            SANDBOX           *
                 *******************************/

% Declare these pengine predicates as safe primitives for the sandbox.

:- multifile
    sandbox:safe_primitive/1.

sandbox:safe_primitive(pengines:pengine_input(_, _)).
sandbox:safe_primitive(pengines:pengine_output(_)).
sandbox:safe_primitive(pengines:pengine_debug(_,_)).
3172 3173 3174 /******************************* 3175 * MESSAGES * 3176 *******************************/ 3177 3178prologerror_message(sandbox(time_limit_exceeded, Limit)) --> 3179 [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl, 3180 'This is normally caused by an insufficiently instantiated'-[], nl, 3181 'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl, 3182 'find all possible instantations of Var.'-[] 3183 ]
Pengines: Web Logic Programming Made Easy
The library(pengines) provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.