/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald and Jan Wielemaker
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2012-2013, Jeffrey Rosenwald
                   2018-2020, CWI Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(udp_broadcast,
          [ udp_broadcast_initialize/2,         % +IPAddress, +Options
            udp_broadcast_close/1,              % +Scope

            udp_peer_add/2,                     % +Scope, +IP
            udp_peer_del/2,                     % +Scope, ?IP
            udp_peer/2                          % +Scope, -IP
          ]).
:- autoload(library(apply),[maplist/2,maplist/3]).
:- autoload(library(backcomp),[thread_at_exit/1]).
:- autoload(library(broadcast),
            [broadcast_request/1,broadcast/1,listening/3,listen/3]).
:- autoload(library(debug),[debug/3]).
:- autoload(library(error),
            [must_be/2,syntax_error/1,domain_error/2,existence_error/2]).
:- autoload(library(option),[option/3]).
:- autoload(library(socket),
            [ tcp_close_socket/1,
              udp_socket/1,
              tcp_bind/2,
              tcp_getopt/2,
              tcp_setopt/2,
              udp_receive/4,
              udp_send/4
            ]).


% :- debug(udp(broadcast)).

/** <module> A UDP broadcast proxy

SWI-Prolog's broadcast library provides a means that may be used to
facilitate publish and subscribe communication regimes between anonymous
members of a community of interest.  The members of the community are,
however, necessarily limited to a single instance of Prolog. The UDP
broadcast library removes that restriction.  With this library loaded,
any member on your local IP subnetwork that also has this library loaded
may hear and respond to your broadcasts.

This library supports three styles of networking as described below. Each
of these networks has its own advantages and disadvantages. Please
study the literature to understand the consequences.

  $ broadcast :
    Broadcast messages are sent to the LAN subnet. The broadcast
    implementation uses two UDP ports: a public one to address the whole
    group and a private one to address a specific node.  Broadcasting
    is generally a good choice if the subnet is small and traffic is
    low.

  $ unicast :
    Unicast sends copies of packets to known peers.  Unicast networks
    can easily be routed.  The unicast version uses a single UDP port
    per node.  Unicast is generally a good choice for a small party,
    in particular if the peers are in different networks.

  $ multicast :
    Multicast is like broadcast, but it can be configured to
    work across networks and may work more efficiently on VLAN networks.
    Like the broadcast setup, two UDP ports are used.
    Multicasting can in general deliver the most efficient LAN and WAN
    networks, but requires properly configured routing between the peers.

After initialization and, in the case of a _unicast_ network, managing
the set of peers, communication happens through broadcast/1,
broadcast_request/1 and listen/1,2,3.

A broadcast/1 or broadcast_request/1 of the shape udp(Scope, Term) or
udp(Scope, Term, TimeOut) is forwarded over the UDP network to all peers
that joined the same `Scope`.  To prevent the potential for feedback
loops, only the plain `Term` is broadcast locally.  The timeout is
optional. It specifies the amount of time to wait for replies to arrive
in response to a broadcast_request/1.  The default period is 0.250
seconds. The timeout is ignored for broadcasts.

An example of three separate processes cooperating in the same _scope_
called `peers`:

==
Process A:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Process B:

   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Process C:

   ?- findall(X, broadcast_request(udp(peers, number(X))), Xs).
   Xs = [1, 2, 3, 4, 5, 7, 8, 9].

   ?-
==

It is also possible to carry on a private dialog with a single
responder.  To do this, you supply a compound of the form Term:PortId
to a UDP scoped broadcast/1 or broadcast_request/1, where PortId is the
ip-address and port-id of the intended listener. If you supply an
unbound variable, PortId, to broadcast_request/1, it will be unified with
the address of the listener that responds to Term.  You may send a
directed broadcast to a specific member by simply providing this address
in a similarly structured compound to a UDP scoped broadcast/1. The
message is sent via unicast to that member only by way of the member's
broadcast listener. It is received by the listener just as any other
broadcast would be.
The listener does not know the difference.

For example, in order to discover who responded with a particular value:

==
Host B Process 1:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Host A Process 1:


   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Host A Process 2:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?- bagof(X, broadcast_request(udp(peers,number(X):From,1)), Xs).
   From = ip(192, 168, 1, 103):34855,
   Xs = [7, 8, 9] ;
   From = ip(192, 168, 1, 103):56331,
   Xs = [1, 2, 3, 4, 5] ;
   From = ip(192, 168, 1, 104):3217,
   Xs = [1, 2, 3, 4, 5].
==

All incoming traffic is handled by a single thread with the alias
`udp_inbound_proxy`.  This thread also performs the internal dispatching
using broadcast/1 and broadcast_request/1.  Future versions may provide
for handling these requests in separate threads.


## Caveats {#udp-broadcase-caveats}

While the implementation is mostly transparent, there are some important
and subtle differences that must be taken into consideration:

    * UDP broadcast requires an initialization step in order to
    launch the broadcast listener proxy. See
    udp_broadcast_initialize/2.

    * Prolog's broadcast_request/1 is nondet. It sends the request,
    then evaluates the replies synchronously, backtracking as needed
    until a satisfactory reply is received. The remaining potential
    replies are not evaluated.  With UDP, all peers will send all
    answers to the query.  The receiver may however stop listening.

    * A UDP broadcast/1 is completely asynchronous.

    * A UDP broadcast_request/1 is partially synchronous. A
    broadcast_request/1 is sent, then the sender balks for a period of
    time (default: 250 ms) while the replies are collected. Any reply
    that is received after this period is silently discarded.
    An optional timeout argument, as in udp(Scope, Term, TimeOut), is
    provided so that a sender may specify more (or less) time for replies.

    * Replies are presented to the user as a choice point on arrival,
    until the broadcast request timer finally expires. This
    allows traffic to propagate through the system faster and provides
    the requestor with the opportunity to terminate a broadcast request
    early if desired, by simply cutting choice points.

    * Please beware that broadcast request transactions remain active
    and consume resources until broadcast_request/1 finally fails on
    backtracking, an uncaught exception occurs, or until choice points
    are cut. Failure to properly manage this will likely result in
    chronic exhaustion of UDP sockets.

    * If a listener is connected to a generator that always succeeds
    (e.g. a random number generator), then the broadcast request will
    never terminate and trouble is bound to ensue.

    * broadcast_request/1 with =|udp_subnet|= scope is _not_ reentrant.
    If a listener performs a broadcast_request/1 with UDP scope
    recursively, then disaster looms certain. This caveat does not apply
    to a UDP scoped broadcast/1, which can safely be performed from a
    listener context.

    * UDP broadcast's capacity is not infinite. While it can tolerate
    substantial bursts of activity, it is designed for short bursts of
    small messages. Unlike TIPC, UDP is unreliable and has no QOS
    protections. Congestion is likely to cause trouble in the form of
    non-Byzantine failure. That is, late, lost (e.g. infinitely late),
    or duplicate datagrams. Caveat emptor.

    * A UDP broadcast_request/1 term that is grounded is considered to
    be a broadcast only. No replies are collected unless there is at
    least one unbound variable to unify.

    * A UDP broadcast/1 always succeeds, even if there are no
    listeners.

    * A UDP broadcast_request/1 that receives no replies will fail.

    * Replies may be coming from many different places in the network
    (or none at all). No ordering of replies is implied.

    * Prolog terms are sent to others after first converting them to
    strings using term_string/3.  Serialization does not deal with cycles,
    attributes or sharing.   The hook udp_term_string_hook/3 may be
    defined to change the message serialization and support different
    message formats and/or encryption.

    * The broadcast model is based on anonymity and a presumption of
    trust--a perfect recipe for compromise. UDP is an Internet protocol.
    A UDP broadcast listener exposes a public port, which is
    static and shared by all listeners, and a private port, which is
    semi-static and unique to the listener instance. Both can be seen
    from off-cluster nodes and networks. Usage of this module exposes
    the node and consequently, the cluster to significant security
    risks. So have a care when designing your application. You must talk
    only to those who share and contribute to your concerns using a
    carefully prescribed protocol.

    * UDP broadcast categorically and silently ignores all message
    traffic originating from or terminating on nodes that are not
    members of the local subnet. This security measure only keeps honest
    people honest!

@author    Jeffrey Rosenwald (JeffRose@acm.org), Jan Wielemaker
@license   BSD-2
@see       tipc.pl
*/

:- multifile
    udp_term_string_hook/3,                     % +Scope, ?Term, ?String
    udp_unicast_join_hook/3,                    % +Scope, +From, +Data
    black_list/1.                               % +Term

% Both helpers take a goal, hence the 0 meta-argument specifier.
:- meta_predicate
    safely(0),
    safely_det(0).

safely(Predicate) :-
    Err = error(_,_),
    catch(Predicate, Err,
          print_message_fail(Err)).

safely_det(Predicate) :-
    Err = error(_,_),
    catch(Predicate, Err,
          print_message_fail(Err)),
    !.
safely_det(_).

print_message_fail(Term) :-
    print_message(error, Term),
    fail.
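/*
udp_broadcast_address/3, defined just below, derives the broadcast address by OR-ing each address octet with the inverted mask octet. The following standalone sketch works one instance of that arithmetic. broadcast_octet/3 and demo_broadcast_address/1 are hypothetical helper names that are not part of this module, and the mask 255.255.255.0 is an assumed example value.

```prolog
:- use_module(library(apply)).          % maplist/4

% One octet: keep the address bits and set every host bit
% (a bit that is 0 in the mask) to 1.
broadcast_octet(Address, Mask, Broadcast) :-
    Broadcast is Address \/ (Mask xor 255).

% Assumed example: host 192.168.1.103 with mask 255.255.255.0.
demo_broadcast_address(ip(B1,B2,B3,B4)) :-
    maplist(broadcast_octet,
            [192,168,1,103],
            [255,255,255,0],
            [B1,B2,B3,B4]).

% ?- demo_broadcast_address(B).
% B = ip(192, 168, 1, 255).
```

The first three mask octets are 255, so their inverse is 0 and the address octets pass through unchanged. The last mask octet is 0, so all host bits are set.
*/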

udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :-
    IPAddress = ip(A1, A2, A3, A4),
    Subnet = ip(S1, S2, S3, S4),
    BroadcastAddress = ip(B1, B2, B3, B4),

    B1 is A1 \/ (S1 xor 255),
    B2 is A2 \/ (S2 xor 255),
    B3 is A3 \/ (S3 xor 255),
    B4 is A4 \/ (S4 xor 255).

%!  udp_broadcast_service(?Scope, ?Address) is nondet.
%
%   Provides the UDP broadcast address for a given Scope. At present,
%   only one scope is supported, =|udp_subnet|=.

%!  udp_scope(?ScopeName, ?ScopeDef)

:- dynamic
    udp_scope/2,
    udp_scope_peer/2.
:- volatile
    udp_scope/2,
    udp_scope_peer/2.
%
%  Here's a UDP proxy to Prolog's broadcast library
%
%  A sender may extend a broadcast to a subnet of a UDP network by
%  specifying a =|udp_subnet|= scoping qualifier in his/her broadcast.
%  The qualifier has the effect of selecting the appropriate multi-cast
%  address for the transmission. Thus, the sender of the message has
%  control over the scope of his/her traffic on a per-message basis.
%
%  All in-scope listeners receive the broadcast and simply rebroadcast
%  the message locally. All broadcast replies, if any, are sent directly
%  to the sender via the port-id that was received with the broadcast.
%
%  Each listener exposes two UDP ports, a shared public port that is
%  bound to a well-known port number and a private port that uniquely
%  identifies the listener. Broadcasts are received on the public port
%  and replies are sent on the private port. Directed broadcasts
%  (unicasts) are received on the private port and replies are sent on
%  the private port.

%  Thread 1 listens for directed traffic on the private port.
%

:- dynamic
    udp_private_socket/3,                       % Port, Socket, FileNo
    udp_public_socket/4,                        % Scope, Port, Socket, FileNo
    udp_closed/1.                               % Scope

udp_inbound_proxy(Master) :-
    thread_at_exit(inbound_proxy_died),
    make_private_socket,
    thread_send_message(Master, udp_inbound_ready),
    udp_inbound_proxy_loop.

udp_inbound_proxy_loop :-
    forall(udp_scope(Scope, ScopeData),
           make_public_socket(ScopeData, Scope)),
    retractall(udp_closed(_)),
    findall(FileNo, udp_socket_file_no(FileNo), FileNos),
    catch(dispatch_inbound(FileNos),
          E, dispatch_exception(E)),
    udp_inbound_proxy_loop.

dispatch_exception(E) :-
    E = error(_,_),
    !,
    print_message(warning, E).
dispatch_exception(_).


%!  make_private_socket is det.
%
%   Create our private socket. This socket is used for messages that are
%   directed to me. Note that we only need this for broadcast networks.
%   If we use a unicast network we use our public port to contact this
%   specific server.

make_private_socket :-
    udp_private_socket(_Port, S, _F),
    !,
    (   (   udp_scope(Scope, broadcast(_,_,_))
        ;   udp_scope(Scope, multicast(_,_))
        ),
        \+ udp_closed(Scope)
    ->  true
    ;   tcp_close_socket(S),
        retractall(udp_private_socket(_,_,_))
    ).
make_private_socket :-
    udp_scope(_, broadcast(_,_,_)),
    !,
    udp_socket(S),
    tcp_bind(S, Port),
    tcp_getopt(S, file_no(F)),
    tcp_setopt(S, broadcast),
    assertz(udp_private_socket(Port, S, F)).
make_private_socket :-
    udp_scope(_, multicast(_,_)),
    !,
    udp_socket(S),
    tcp_bind(S, Port),
    tcp_getopt(S, file_no(F)),
    assertz(udp_private_socket(Port, S, F)).
make_private_socket.

%!  make_public_socket(+ScopeData, +Scope)
%
%   Create the public port Scope.

make_public_socket(_, Scope) :-
    udp_public_socket(Scope, _Port, S, _),
    !,
    (   udp_closed(Scope)
    ->  tcp_close_socket(S),
        retractall(udp_public_socket(Scope, _, _, _))
    ;   true
    ).
make_public_socket(broadcast(_SubNet, _Broadcast, Port), Scope) :-
    udp_socket(S),
    tcp_setopt(S, reuseaddr),
    tcp_bind(S, Port),
    tcp_getopt(S, file_no(F)),
    assertz(udp_public_socket(Scope, Port, S, F)).
make_public_socket(multicast(Group, Port), Scope) :-
    udp_socket(S),
    tcp_setopt(S, reuseaddr),
    tcp_bind(S, Port),
    tcp_setopt(S, ip_add_membership(Group)),
    tcp_getopt(S, file_no(F)),
    assertz(udp_public_socket(Scope, Port, S, F)).
make_public_socket(unicast(Port), Scope) :-
    udp_socket(S),
    tcp_bind(S, Port),
    tcp_getopt(S, file_no(F)),
    assertz(udp_public_socket(Scope, Port, S, F)).

udp_socket_file_no(FileNo) :-
    udp_private_socket(_,_,FileNo).
udp_socket_file_no(FileNo) :-
    udp_public_socket(_,_,_,FileNo).

%!  dispatch_inbound(+FileNos)
%
%   Dispatch inbound traffic. This loop uses wait_for_input/3 to wait
%   for one or more UDP sockets and dispatches the requests using the
%   internal broadcast service. For an incoming broadcast _request_ we
%   send the reply only to the requester and therefore we must use a
%   socket that is not in broadcast mode.

dispatch_inbound(FileNos) :-
    debug(udp(broadcast), 'Waiting for ~p', [FileNos]),
    wait_for_input(FileNos, Ready, infinite),
    debug(udp(broadcast), 'Ready: ~p', [Ready]),
    maplist(dispatch_ready, Ready),
    dispatch_inbound(FileNos).

dispatch_ready(FileNo) :-
    udp_private_socket(_Port, Private, FileNo),
    !,
    udp_receive(Private, Data, From, [max_message_size(65535)]),
    debug(udp(broadcast), 'Inbound on private port', []),
    (   in_scope(Scope, From),
        udp_term_string(Scope, Term, Data) % only accept valid data
    ->  ld_dispatch(Private, Term, From, Scope)
    ;   true
    ).
dispatch_ready(FileNo) :-
    udp_public_socket(Scope, _PublicPort, Public, FileNo),
    !,
    udp_receive(Public, Data, From, [max_message_size(65535)]),
    debug(udp(broadcast), 'Inbound on public port from ~p for scope ~p',
          [From, Scope]),
    (   in_scope(Scope, From),
        udp_term_string(Scope, Term, Data) % only accept valid data
    ->  (   udp_scope(Scope, unicast(_))
        ->  ld_dispatch(Public, Term, From, Scope)
        ;   udp_private_socket(_PrivatePort, Private, _FileNo),
            ld_dispatch(Private, Term, From, Scope)
        )
    ;   udp_scope(Scope, unicast(_)),
        udp_term_string(Scope, Term, Data),
        unicast_out_of_scope_request(Scope, From, Term)
    ->  true
    ;   true
    ).

in_scope(Scope, Address) :-
    udp_scope(Scope, ScopeData),
    in_scope(ScopeData, Scope, Address),
    !.
in_scope(Scope, From) :-
    debug(udp(broadcast), 'Out-of-scope ~p datagram from ~p',
          [Scope, From]),
    fail.

in_scope(broadcast(Subnet, Broadcast, _PublicPort), _Scope, IP:_FromPort) :-
    udp_broadcast_address(IP, Subnet, Broadcast).
in_scope(multicast(_Group, _Port), _Scope, _From).
in_scope(unicast(_PublicPort), Scope, IP:_) :-
    udp_peer(Scope, IP:_).


%!  ld_dispatch(+PrivateSocket, +Term, +From, +Scope)
%
%   Locally dispatch Term received from From. If it concerns a broadcast
%   request, send the replies through PrivateSocket to From. The
%   multifile hook black_list/1 can be used to ignore certain messages.

ld_dispatch(_S, Term, From, _Scope) :-
    debug(udp(broadcast), 'ld_dispatch(~p) from ~p', [Term, From]),
    fail.
ld_dispatch(_S, Term, _From, _Scope) :-
    blacklisted(Term), !.
ld_dispatch(S, request(Key, Term), From, Scope) :-
    !,
    forall(safely(broadcast_request(Term)),
           safely((udp_term_string(Scope, reply(Key,Term), Message),
                   udp_send(S, Message, From, [])))).
ld_dispatch(_S, send(Term), _From, _Scope) :-
    !,
    safely_det(broadcast(Term)).
ld_dispatch(_S, reply(Key, Term), From, _Scope) :-
    (   reply_queue(Key, Queue)
    ->  safely(thread_send_message(Queue, Term:From))
    ;   true
    ).

blacklisted(send(Term))      :- black_list(Term).
blacklisted(request(_,Term)) :- black_list(Term).
blacklisted(reply(_,Term))   :- black_list(Term).


%!  reload_udp_proxy
%
%   Update the UDP relaying proxy service.   The proxy consists of three
%   forwarding mechanisms:
%
%     - Listen on our _scope_.  If any messages are received, hand them
%       to udp_broadcast/3 to be broadcast to _scope_ or sent to a
%       specific recipient.
%     - Listen on the _scope_ public port.  Incoming messages are
%       relayed to the internal broadcast mechanism and replies are sent
%       from our private socket.
%     - Listen on our private port and reply using the same port.

reload_udp_proxy :-
    reload_outbound_proxy,
    reload_inbound_proxy.

reload_outbound_proxy :-
    listening(udp_broadcast, udp(_,_), _),
    !.
reload_outbound_proxy :-
    listen(udp_broadcast, udp(Scope,Message),
           udp_broadcast(Message, Scope, 0.25)),
    listen(udp_broadcast, udp(Scope,Message,Timeout),
           udp_broadcast(Message, Scope, Timeout)),
    listen(udp_broadcast, udp_subnet(Message),  % backward compatibility
           udp_broadcast(Message, subnet, 0.25)),
    listen(udp_broadcast, udp_subnet(Message,Timeout),
           udp_broadcast(Message, subnet, Timeout)).

reload_inbound_proxy :-
    catch(thread_signal(udp_inbound_proxy, throw(udp_reload)),
          error(existence_error(thread, _),_),
          fail),
    !.
reload_inbound_proxy :-
    thread_self(Me),
    thread_create(udp_inbound_proxy(Me), _,
                  [ alias(udp_inbound_proxy),
                    detached(true)
                  ]),
    thread_get_message(Me, udp_inbound_ready, [timeout(10)]).
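/*
reload_inbound_proxy/0 above starts the proxy thread and then blocks until that thread reports that it is ready, giving up after 10 seconds. A minimal sketch of the same start-up handshake in isolation follows. start_worker/0, worker/1, the alias demo_worker and the messages worker_ready and stop are hypothetical names used only for illustration.

```prolog
% Create a detached, aliased thread and wait (at most 10 seconds)
% for it to announce that its setup is complete.
start_worker :-
    thread_self(Me),
    thread_create(worker(Me), _,
                  [ alias(demo_worker),
                    detached(true)
                  ]),
    thread_get_message(Me, worker_ready, [timeout(10)]).

% The worker signals its creator and then waits for a stop
% message on its own queue.
worker(Master) :-
    thread_send_message(Master, worker_ready),
    thread_get_message(stop).

% ?- start_worker, thread_send_message(demo_worker, stop).
% true.
```

If the ready message does not arrive in time, thread_get_message/3 fails and so does start_worker/0, which mirrors why udp_broadcast_initialize/2 is documented as semidet.
*/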

inbound_proxy_died :-
    thread_self(Self),
    thread_property(Self, status(Status)),
    (   catch(recreate_proxy(Status), _, fail)
    ->  print_message(informational,
                      httpd_restarted_worker(Self))
    ;   done_status_message_level(Status, Level),
        print_message(Level,
                      httpd_stopped_worker(Self, Status))
    ).

recreate_proxy(exception(Error)) :-
    recreate_on_error(Error),
    reload_inbound_proxy.

recreate_on_error('$aborted').
recreate_on_error(time_limit_exceeded).

done_status_message_level(true, silent) :- !.
done_status_message_level(exception('$aborted'), silent) :- !.
done_status_message_level(_, informational).


%!  udp_broadcast_close(+Scope)
%
%   Close a UDP broadcast scope.

udp_broadcast_close(Scope) :-
    udp_scope(Scope, _ScopeData),
    !,
    assert(udp_closed(Scope)),
    reload_udp_proxy.
udp_broadcast_close(_).


%!  udp_broadcast(+What, +Scope, +TimeOut)
%
%   Send a broadcast request to my UDP peers in Scope. What is either of
%   the shape `Term:Address`, to send Term to a specific address or to
%   query the address from which Term is answered, or it is a plain
%   `Term`.
%
%   If `Term` is nonground, it is considered a _request_ (see
%   broadcast_request/1) and the predicate succeeds for each answer
%   received within TimeOut seconds. If Term is ground it is considered
%   an asynchronous broadcast and udp_broadcast/3 is deterministic.

udp_broadcast(Term:To, Scope, _Timeout) :-
    ground(Term), ground(To),           % broadcast to single listener
    !,
    udp_basic_broadcast(send(Term), Scope, single(To)).
udp_broadcast(Term, Scope, _Timeout) :-
    ground(Term),                       % broadcast to all listeners
    !,
    udp_basic_broadcast(send(Term), Scope, broadcast).
udp_broadcast(Term:To, Scope, Timeout) :-
    ground(To),                         % request to single listener
    !,
    setup_call_cleanup(
        request_queue(Id, Queue),
        ( udp_basic_broadcast(request(Id, Term), Scope, single(To)),
          udp_br_collect_replies(Queue, Timeout, Term:To)
        ),
        destroy_request_queue(Queue)).
udp_broadcast(Term:From, Scope, Timeout) :-
    !,                                  % request to all listeners, collect sender
    setup_call_cleanup(
        request_queue(Id, Queue),
        ( udp_basic_broadcast(request(Id, Term), Scope, broadcast),
          udp_br_collect_replies(Queue, Timeout, Term:From)
        ),
        destroy_request_queue(Queue)).
udp_broadcast(Term, Scope, Timeout) :-  % request to all listeners
    udp_broadcast(Term:_, Scope, Timeout).

:- dynamic
    reply_queue/2.

request_queue(Id, Queue) :-
    Id is random(1<<63),
    message_queue_create(Queue),
    asserta(reply_queue(Id, Queue)).

destroy_request_queue(Queue) :-         % leave queue to GC
    retractall(reply_queue(_, Queue)).


%!  udp_basic_broadcast(+Term, +Scope, +Dest) is semidet.
%
%   Serialize Term for Scope and send it to Dest. Unicast scopes send
%   through the public socket of the scope; broadcast and multicast
%   scopes send through the private socket.
%
%   @arg Dest is one of single(Target) or `broadcast`.

udp_basic_broadcast(Term, Scope, Dest) :-
    debug(udp(broadcast), 'UDP proxy outbound ~p to ~p', [Term, Dest]),
    udp_term_string(Scope, Term, String),
    udp_send_message(Dest, String, Scope).

udp_send_message(single(Address), String, Scope) :-
    (   udp_scope(Scope, unicast(_))
    ->  udp_public_socket(Scope, _Port, S, _)
    ;   udp_private_socket(_Port, S, _F)
    ),
    safely(udp_send(S, String, Address, [])).
udp_send_message(broadcast, String, Scope) :-
    (   udp_scope(Scope, unicast(_))
    ->  udp_public_socket(Scope, _Port, S, _),
        forall(udp_peer(Scope, Address),
               ( debug(udp(broadcast), 'Unicast to ~p', [Address]),
                 safely(udp_send(S, String, Address, []))))
    ;   udp_scope(Scope, broadcast(_SubNet, Broadcast, Port))
    ->  udp_private_socket(_PrivatePort, S, _F),
        udp_send(S, String, Broadcast:Port, [])
    ;   udp_scope(Scope, multicast(Group, Port))
    ->  udp_private_socket(_PrivatePort, S, _F),
        udp_send(S, String, Group:Port, [])
    ).

%!  udp_br_collect_replies(+Queue, +TimeOut, -TermAndFrom) is nondet.
%
%   Collect replies on Queue for TimeOut seconds. Succeed for each
%   received message.

udp_br_collect_replies(Queue, Timeout, Reply) :-
    get_time(Start),
    Deadline is Start+Timeout,
    repeat,
       (   thread_get_message(Queue, Reply,
                              [ deadline(Deadline)
                              ])
       ->  true
       ;   !,
           fail
       ).

%!  udp_broadcast_initialize(+IPAddress, +Options) is semidet.
%
%   Initialize the UDP broadcast bridge. IPAddress is the IP address on
%   the network we want to broadcast on. IP addresses are terms
%   ip(A,B,C,D) or an atom or string of the format =|A.B.C.D|=. Options
%   processed:
%
%     - scope(+ScopeName)
%       Name of the scope.  Default is `subnet`.
%     - subnet_mask(+SubNet)
%       Subnet to broadcast on.  This uses the same syntax as IPAddress.
%       Default classifies the network as class A, B or C depending on
%       the first octet and applies the default mask.
%     - port(+Port)
%       Public port to use.  Default is 20005.
%     - method(+Method)
%       Method to send a message to multiple peers.  One of
%       - broadcast
%         Use UDP broadcast messages to the LAN.  This is the
%         default.
%       - multicast
%         Use UDP multicast messages.  This can be used on WAN networks,
%         provided the intermediate routers understand multicast.
%       - unicast
%         Send the messages individually to all registered peers.
%
%   For compatibility reasons Options may be the subnet mask.

udp_broadcast_initialize(IP, Options) :-
    with_mutex(udp_broadcast,
               udp_broadcast_initialize_sync(IP, Options)).

udp_broadcast_initialize_sync(IP, Options) :-
    nonvar(Options),
    Options = ip(_,_,_,_),
    !,
    udp_broadcast_initialize(IP, [subnet_mask(Options)]).
udp_broadcast_initialize_sync(IP, Options) :-
    to_ip4(IP, IPAddress),
    option(method(Method), Options, broadcast),
    must_be(oneof([broadcast, multicast, unicast]), Method),
    udp_broadcast_initialize_sync(Method, IPAddress, Options),
    reload_udp_proxy.

udp_broadcast_initialize_sync(broadcast, IPAddress, Options) :-
    option(subnet_mask(Subnet), Options, _),
    mk_subnet(Subnet, IPAddress, Subnet4),
    option(port(Port), Options, 20005),
    option(scope(Scope), Options, subnet),

    udp_broadcast_address(IPAddress, Subnet4, Broadcast),
    udp_broadcast_close(Scope),
    assertz(udp_scope(Scope, broadcast(Subnet4, Broadcast, Port))).
udp_broadcast_initialize_sync(unicast, _IPAddress, Options) :-
    option(port(Port), Options, 20005),
    option(scope(Scope), Options, subnet),
    udp_broadcast_close(Scope),
    assertz(udp_scope(Scope, unicast(Port))).
udp_broadcast_initialize_sync(multicast, IPAddress, Options) :-
    option(port(Port), Options, 20005),
    option(scope(Scope), Options, subnet),
    udp_broadcast_close(Scope),
    multicast_address(IPAddress),
    assertz(udp_scope(Scope, multicast(IPAddress, Port))).

to_ip4(Atomic, ip(A,B,C,D)) :-
    atomic(Atomic),
    !,
    (   split_string(Atomic, ".", "", Strings),
        maplist(number_string, [A,B,C,D], Strings)
    ->  true
    ;   syntax_error(illegal_ip_address)
    ).
to_ip4(IP, IP).

mk_subnet(Var, IP, Subnet) :-
    var(Var),
    !,
    (   default_subnet(IP, Subnet)
    ->  true
    ;   domain_error(ip_with_subnet, IP)
    ).
mk_subnet(Subnet, _, Subnet4) :-
    to_ip4(Subnet, Subnet4).

%!  default_subnet(+IP, -NetWork)
%
%   Determine the default network address from an IP address. This
%   classifies the network as class A, B or C.
%
%   @see https://docs.oracle.com/cd/E19504-01/802-5753/planning3-78185/index.html

default_subnet(ip(A,_,_,_), ip(A,0,0,0)) :-
    between(0,127, A), !.
default_subnet(ip(A,B,_,_), ip(A,B,0,0)) :-
    between(128,191, A), !.
default_subnet(ip(A,B,C,_), ip(A,B,C,0)) :-
    between(192,223, A), !.

multicast_address(ip(A,_,_,_)) :-
    between(224, 239, A),
    !.
multicast_address(IP) :-
    domain_error(multicast_network, IP).


                /*******************************
                *          UNICAST PEERS       *
                *******************************/

%!  udp_peer_add(+Scope, +Address) is det.
%!  udp_peer_del(+Scope, ?Address) is det.
%!  udp_peer(?Scope, ?Address) is nondet.
%
%   Manage and query the set of known peers for a unicast network.
%   Address is either a term IP:Port or a plain IP address. In the
%   latter case the default port registered with the scope is used.
%
%   @arg Address has canonical form ip(A,B,C,D):Port.

udp_peer_add(Scope, Address) :-
    must_be(ground, Address),
    peer_address(Address, Scope, Canonical),
    (   udp_scope_peer(Scope, Canonical)
    ->  true
    ;   assertz(udp_scope_peer(Scope, Canonical))
    ).

udp_peer_del(Scope, Address) :-
    peer_address(Address, Scope, Canonical),
    retractall(udp_scope_peer(Scope, Canonical)).

udp_peer(Scope, IPAddress) :-
    udp_scope_peer(Scope, IPAddress).

peer_address(IP:Port, _Scope, IPAddress:Port) :-
    !,
    to_ip4(IP, IPAddress).
peer_address(IP, Scope, IPAddress:Port) :-
    (   udp_scope(Scope, unicast(Port))
    ->  true
    ;   existence_error(udp_scope, Scope)
    ),
    to_ip4(IP, IPAddress).



                /*******************************
                *             HOOKS            *
                *******************************/

%!  udp_term_string_hook(+Scope, +Term, -String) is det.
%!  udp_term_string_hook(+Scope, -Term, +String) is semidet.
%
%   Hook for serializing the message Term. The default writes
%   =|%-prolog-\n|=, followed by the Prolog term in quoted notation while
%   ignoring operators. This hook may use alternative serialization such
%   as fast_term_serialized/2, use library(ssl) to realise encrypted
%   messages, etc.
%
%   @arg Scope is the scope for which the message is broadcast. This
%   can be used to use different serialization for different scopes.
%   @arg Term encapsulates the term broadcast by the application as
%   follows:
%
%     - send(ApplTerm)
%       Is sent by broadcast(udp(Scope, ApplTerm))
%     - request(Id,ApplTerm)
%       Is sent by broadcast_request/1, where Id is a unique large
%       (64 bit) integer.
%     - reply(Id,ApplTerm)
%       Is sent to reply on a broadcast_request/1 request that has
%       been received.  Arguments are the same as above.
%
%   @throws The hook may throw udp(invalid_message) to stop processing
%   the message.

%!  udp_term_string(+Scope, +Term, -String) is det.
%!  udp_term_string(+Scope, -Term, +String) is semidet.
%
%   Serialize an arbitrary Prolog term as a string. The string is
%   prefixed by a magic key to ensure we only accept messages that are
%   meant for us.
%
%   In mode (+,-), Term is written with the options ignore_ops(true) and
%   quoted(true).
%
%   This predicate first calls udp_term_string_hook/3.

udp_term_string(Scope, Term, String) :-
    catch(udp_term_string_hook(Scope, Term, String), udp(Error), true),
    !,
    (   var(Error)
    ->  true
    ;   Error == invalid_message
    ->  fail
    ;   throw(udp(Error))
    ).
udp_term_string(_Scope, Term, String) :-
    (   var(String)
    ->  format(string(String), '%-prolog-\n~W',
               [ Term,
                 [ ignore_ops(true),
                   quoted(true)
                 ]
               ])
    ;   sub_string(String, 0, _, _, '%-prolog-\n'),
        term_string(Term, String,
                    [ syntax_errors(quiet)
                    ])
    ).

%!  unicast_out_of_scope_request(+Scope, +From, +Data) is semidet.

%!  udp_unicast_join_hook(+Scope, +From, +Data) is semidet.
%
%   This multifile hook is called if a UDP packet is received on the
%   port of the unicast network identified by Scope. From is the origin
%   IP and port and Data is the message data that is deserialized as
%   defined for the scope (see udp_term_string/3).
%
%   This hook is intended to initiate a new node joining the network of
%   peers. We could in theory also omit the in-scope test and use a
%   normal broadcast to join. Using a different channel however provides
%   a basic level of security. A possible implementation is below. The
%   first fragment is a hook added to the server, the second is a
%   predicate added to a client and the last initiates the request in
%   the client. The exchanged term (join(X)) can be used to exchange a
%   welcome handshake.
%
%
%   ```
%   :- multifile udp_broadcast:udp_unicast_join_hook/3.
%   udp_broadcast:udp_unicast_join_hook(Scope, From, join(welcome)) :-
%       udp_peer_add(Scope, From).
%   ```
%
%   ```
%   join_request(Scope, Address, Reply) :-
%       udp_peer_add(Scope, Address),
%       broadcast_request(udp(Scope, join(Reply))).
%   ```
%
%   ```
%   ?- join_request(myscope, "1.2.3.4":10001, Reply).
%   Reply = welcome.
%   ```

unicast_out_of_scope_request(Scope, From, send(Term)) :-
    udp_unicast_join_hook(Scope, From, Term).
unicast_out_of_scope_request(Scope, From, request(Key, Term)) :-
    udp_unicast_join_hook(Scope, From, Term),
    udp_public_socket(Scope, _Port, Socket, _FileNo),
    safely((udp_term_string(Scope, reply(Key,Term), Message),
            udp_send(Socket, Message, From, []))).
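/*
The default wire format used by udp_term_string/3 above is a `%-prolog-` marker line followed by the term in quoted, operator-free notation. Because the marker line starts with `%`, the Prolog reader treats it as a comment when the message is parsed back. A standalone sketch of that round trip follows. demo_encode/2, demo_decode/2 and demo_round_trip/2 are hypothetical names that are not part of this module.

```prolog
% Serialize Term with the marker line in front.
demo_encode(Term, String) :-
    format(string(String), '%-prolog-\n~W',
           [ Term,
             [ ignore_ops(true),
               quoted(true)
             ]
           ]).

% Accept only strings that carry the marker, then parse the term.
demo_decode(String, Term) :-
    sub_string(String, 0, _, _, "%-prolog-\n"),
    term_string(Term, String, [syntax_errors(quiet)]).

% Encode and immediately decode again.
demo_round_trip(Term, Copy) :-
    demo_encode(Term, String),
    demo_decode(String, Copy).

% ?- demo_round_trip(send(hello('World')), T).
% T = send(hello('World')).
```

A string without the marker is rejected before any parsing happens, which is how stray datagrams on the port are ignored by the default serialization.
*/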