View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jeffrey Rosenwald and Jan Wielemaker
    4    E-mail:        jeffrose@acm.org
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2012-2013, Jeffrey Rosenwald
    7		   2018-2020, CWI Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(udp_broadcast,
   37          [ udp_broadcast_initialize/2,         % +IPAddress, +Options
   38            udp_broadcast_close/1,		% +Scope
   39
   40            udp_peer_add/2,                     % +Scope, +IP
   41            udp_peer_del/2,                     % +Scope, ?IP
   42            udp_peer/2                          % +Scope, -IP
   43          ]).   44:- autoload(library(apply),[maplist/2,maplist/3]).   45:- autoload(library(backcomp),[thread_at_exit/1]).   46:- autoload(library(broadcast),
   47	    [broadcast_request/1,broadcast/1,listening/3,listen/3]).   48:- use_module(library(debug),[debug/3]).   49:- autoload(library(error),
   50	    [must_be/2,syntax_error/1,domain_error/2,existence_error/2]).   51:- autoload(library(option),[option/3]).   52:- autoload(library(socket),
   53	    [ tcp_close_socket/1,
   54	      udp_socket/1,
   55	      tcp_bind/2,
   56	      tcp_getopt/2,
   57	      tcp_setopt/2,
   58	      udp_receive/4,
   59	      udp_send/4
   60	    ]).   61
   62
   63% :- debug(udp(broadcast)).
   64
   65/** <module> A UDP broadcast proxy
   66
   67SWI-Prolog's broadcast library provides a  means   that  may  be used to
   68facilitate publish and subscribe communication regimes between anonymous
   69members of a community of interest.  The   members  of the community are
   70however, necessarily limited to a  single   instance  of Prolog. The UDP
   71broadcast library removes that restriction.   With  this library loaded,
   72any member on your local IP subnetwork that also has this library loaded
   73may hear and respond to your broadcasts.
   74
   75This library support three styles of networking as described below. Each
   76of these networks have their own   advantages  and disadvantages. Please
   77study the literature to understand the consequences.
   78
   79  $ broadcast :
   80  Broadcast messages are sent to the LAN subnet. The broadcast
   81  implementation uses two UDP ports: a public to address the whole
   82  group and a private one to address a specific node.  Broadcasting
   83  is generally a good choice if the subnet is small and traffic is
   84  low.
   85
   86  $ unicast :
   87  Unicast sends copies of packages to known peers.  Unicast networks
   88  can easily be routed.  The unicast version uses a single UDP port
   89  per node.  Unicast is generally a good choice for a small party,
   90  in particular if the peers are in different networks.
   91
   92  $ multicast :
   93  Multicast is like broadcast, but it can be configured to
   94  work accross networks and may work more efficiently on VLAN networks.
   95  Like the broadcast setup, two UDP ports are used.  Multicasting can
   96  in general deliver the most efficient LAN and WAN networks, but
   97  requires properly configured routing between the peers.
   98
   99After initialization and, in the case   of  a _unicast_ network managing
  100the  set  of  peers,   communication    happens   through   broadcast/1,
  101broadcast_request/1 and listen/1,2,3.
  102
  103A broadcast/1 or broadcast_request/1 of the   shape  udp(Scope, Term) or
  104udp(Scope, Term, TimeOut) is forwarded over the UDP network to all peers
  105that joined the same `Scope`.  To   prevent  the  potential for feedback
  106loops, only the plain `Term`  is   broadcasted  locally.  The timeout is
  107optional. It specifies the amount to time  to wait for replies to arrive
  108in response to a  broadcast_request/1.  The   default  period  is  0.250
  109seconds. The timeout is ignored for broadcasts.
  110
  111An example of three separate processes   cooperating in the same _scope_
  112called `peers`:
  113
  114==
  115Process A:
  116
  117   ?- listen(number(X), between(1, 5, X)).
  118   true.
  119
  120   ?-
  121
  122Process B:
  123
  124   ?- listen(number(X), between(7, 9, X)).
  125   true.
  126
  127   ?-
  128
  129Process C:
  130
  131   ?- findall(X, broadcast_request(udp(peers, number(X))), Xs).
  132   Xs = [1, 2, 3, 4, 5, 7, 8, 9].
  133
  134   ?-
  135==
  136
  137It is also  possible  to  carry  on   a  private  dialog  with  a single
  138responder. To do this, you supply a   compound of the form, Term:PortId,
  139to a UDP scoped broadcast/1 or  broadcast_request/1, where PortId is the
  140ip-address and port-id of  the  intended   listener.  If  you  supply an
  141unbound variable, PortId, to broadcast_request, it  will be unified with
  142the address of the listener  that  responds   to  Term.  You  may send a
  143directed broadcast to a specific member by simply providing this address
  144in a similarly structured compound  to   a  UDP  scoped broadcast/1. The
  145message is sent via unicast to that member   only by way of the member's
  146broadcast listener. It is received by  the   listener  just as any other
  147broadcast would be. The listener does not know the difference.
  148
  149For example, in order to discover who responded with a particular value:
  150
  151==
  152Host B Process 1:
  153
  154   ?- listen(number(X), between(1, 5, X)).
  155   true.
  156
  157   ?-
  158
  159Host A Process 1:
  160
  161
  162   ?- listen(number(X), between(7, 9, X)).
  163   true.
  164
  165   ?-
  166
  167Host A Process 2:
  168
  169   ?- listen(number(X), between(1, 5, X)).
  170   true.
  171
  172   ?- bagof(X, broadcast_request(udp(peers,number(X):From,1)), Xs).
  173   From = ip(192, 168, 1, 103):34855,
  174   Xs = [7, 8, 9] ;
  175   From = ip(192, 168, 1, 103):56331,
  176   Xs = [1, 2, 3, 4, 5] ;
  177   From = ip(192, 168, 1, 104):3217,
  178   Xs = [1, 2, 3, 4, 5].
  179==
  180
  181All incomming trafic is handled  by  a   single  thread  with  the alias
  182`udp_inbound_proxy`. This thread also performs  the internal dispatching
  183using broadcast/1 and broadcast_request/1. Future   versions may provide
  184for handling these requests in separate threads.
  185
  186
  187## Caveats {#udp-broadcase-caveats}
  188
  189While the implementation is mostly transparent, there are some important
  190and subtle differences that must be taken into consideration:
  191
  192    * UDP broadcast requires an initialization step in order to
  193    launch the broadcast listener proxy. See
  194    udp_broadcast_initialize/2.
  195
  196    * Prolog's broadcast_request/1 is nondet. It sends the request,
  197    then evaluates the replies synchronously, backtracking as needed
  198    until a satisfactory reply is received. The remaining potential
  199    replies are not evaluated.  With UDP, all peers will send all
  200    answers to the query.  The receiver may however stop listening.
  201
  202    * A UDP broadcast/1 is completely asynchronous.
  203
  204    * A  UDP broadcast_request/1 is partially synchronous. A
  205    broadcast_request/1 is sent, then the sender balks for a period of
  206    time (default: 250 ms) while the replies are collected. Any reply
  207    that is received after this period is silently discarded. A
  208    optional second argument is provided so that a sender may specify
  209    more (or less) time for replies.
  210
  211    * Replies are presented to the user as a choice point on arrival,
  212    until the broadcast request timer finally expires. This
  213    allows traffic to propagate through the system faster and provides
  214    the requestor with the opportunity to terminate a broadcast request
  215    early if desired, by simply cutting choice points.
  216
  217    * Please beware that broadcast request transactions remain active
  218    and resources consumed until broadcast_request finally fails on
  219    backtracking, an uncaught exception occurs, or until choice points
  220    are cut. Failure to properly manage this will likely result in
  221    chronic exhaustion of UDP sockets.
  222
  223    * If a listener is connected to a generator that always succeeds
  224    (e.g. a random number generator), then the broadcast request will
  225    never terminate and trouble is bound to ensue.
  226
  227    * broadcast_request/1 with =|udp_subnet|= scope is _not_ reentrant.
  228    If a listener performs a broadcast_request/1 with UDP scope
  229    recursively, then disaster looms certain. This caveat does not apply
  230    to a UDP scoped broadcast/1, which can safely be performed from a
  231    listener context.
  232
  233    * UDP broadcast's capacity is not infinite. While it can tolerate
  234    substantial bursts of activity, it is designed for short bursts of
  235    small messages. Unlike TIPC, UDP is unreliable and has no QOS
  236    protections. Congestion is likely to cause trouble in the form of
  237    non-Byzantine failure. That is, late, lost (e.g. infinitely late),
  238    or duplicate datagrams. Caveat emptor.
  239
  240    * A UDP broadcast_request/1 term that is grounded is considered to
  241    be a broadcast only. No replies are collected unless the there is at
  242    least one unbound variable to unify.
  243
  244    * A UDP broadcast/1 always succeeds, even if there are no
  245    listeners.
  246
  247    * A UDP broadcast_request/1 that receives no replies will fail.
  248
  249    * Replies may be coming from many different places in the network
  250    (or none at all). No ordering of replies is implied.
  251
  252    * Prolog terms are sent to others after first converting them to
  253    atoms using term_string/3.  Serialization does not deal with cycles,
  254    attributes or sharing.   The hook udp_term_string_hook/3 may be
  255    defined to change the message serialization and support different
  256    message formats and/or encryption.
  257
  258    * The broadcast model is based on anonymity and a presumption of
  259    trust--a perfect recipe for compromise. UDP is an Internet protocol.
  260    A UDP broadcast listener exposes a public port, which is
  261    static and shared by all listeners, and a private port, which is
  262    semi-static and unique to the listener instance. Both can be seen
  263    from off-cluster nodes and networks. Usage of this module exposes
  264    the node and consequently, the cluster to significant security
  265    risks. So have a care when designing your application. You must talk
  266    only to those who share and contribute to your concerns using a
  267    carefully prescribed protocol.
  268
  269    * UDP broadcast categorically and silently ignores all message
  270    traffic originating from or terminating on nodes that are not
  271    members of the local subnet. This security measure only keeps honest
  272    people honest!
  273
  274@author    Jeffrey Rosenwald (JeffRose@acm.org), Jan Wielemaker
  275@license   BSD-2
  276@see       tipc.pl
  277*/
  278
  279:- multifile
  280    udp_term_string_hook/3,                     % +Scope, ?Term, ?String
  281    udp_unicast_join_hook/3,                    % +Scope, +From, +Data
  282    black_list/1.                               % +Term
  283
  284:- meta_predicate
  285    safely(0),
  286    safely_det(0).  287
  288safely(Predicate) :-
  289    Err = error(_,_),
  290    catch(Predicate, Err,
  291          print_message_fail(Err)).
  292
  293safely_det(Predicate) :-
  294    Err = error(_,_),
  295    catch(Predicate, Err,
  296          print_message_fail(Err)),
  297    !.
  298safely_det(_).
  299
  300print_message_fail(Term) :-
  301    print_message(error, Term),
  302    fail.
  303
  304udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :-
  305    IPAddress = ip(A1, A2, A3, A4),
  306    Subnet = ip(S1, S2, S3, S4),
  307    BroadcastAddress = ip(B1, B2, B3, B4),
  308
  309    B1 is A1 \/ (S1 xor 255),
  310    B2 is A2 \/ (S2 xor 255),
  311    B3 is A3 \/ (S3 xor 255),
  312    B4 is A4 \/ (S4 xor 255).
  313
  314%!  udp_broadcast_service(?Scope, ?Address) is nondet.
  315%
  316%   provides the UDP broadcast address for   a  given Scope. At present,
  317%   only one scope is supported, =|udp_subnet|=.
  318
  319%!  udp_scope(?ScopeName, ?ScopeDef)
  320
  321:- dynamic
  322    udp_scope/2,
  323    udp_scope_peer/2.  324:- volatile
  325    udp_scope/2,
  326    udp_scope_peer/2.  327%
  328%  Here's a UDP proxy to Prolog's broadcast library
  329%
  330%  A sender may extend a broadcast  to  a   subnet  of  a UDP network by
  331%  specifying a =|udp_subnet|= scoping qualifier   in his/her broadcast.
  332%  The qualifier has the effect of  selecting the appropriate multi-cast
  333%  address for the transmission. Thus,  the   sender  of the message has
  334%  control over the scope of his/her traffic on a per-message basis.
  335%
  336%  All in-scope listeners receive the   broadcast and simply rebroadcast
  337%  the message locally. All broadcast replies, if any, are sent directly
  338%  to the sender via the port-id that   was received with the broadcast.
  339%
  340%  Each listener exposes two UDP ports,  a   shared  public port that is
  341%  bound to a well-known port number and   a  private port that uniquely
  342%  indentifies the listener. Broadcasts are received  on the public port
  343%  and replies are  sent  on  the   private  port.  Directed  broadcasts
  344%  (unicasts) are received on the private port   and replies are sent on
  345%  the private port.
  346
  347%  Thread 1 listens for directed traffic on the private port.
  348%
  349
  350:- dynamic
  351    udp_private_socket/3,                       % Port, Socket, FileNo
  352    udp_public_socket/4,                        % Scope, Port, Socket, FileNo
  353    udp_closed/1.				% Scope
  354
  355udp_inbound_proxy(Master) :-
  356    thread_at_exit(inbound_proxy_died),
  357    make_private_socket,
  358    thread_send_message(Master, udp_inbound_ready),
  359    udp_inbound_proxy_loop.
  360
  361udp_inbound_proxy_loop :-
  362    forall(udp_scope(Scope, ScopeData),
  363           make_public_socket(ScopeData, Scope)),
  364    retractall(udp_closed(_)),
  365    findall(FileNo, udp_socket_file_no(FileNo), FileNos),
  366    catch(dispatch_inbound(FileNos),
  367          E, dispatch_exception(E)),
  368    udp_inbound_proxy_loop.
  369
  370dispatch_exception(E) :-
  371    E = error(_,_),
  372    !,
  373    print_message(warning, E).
  374dispatch_exception(_).
  375
  376
  377%!  make_private_socket is det.
  378%
  379%   Create our private socket. This socket is used for messages that are
  380%   directed to me. Note that we only  need this for broadcast networks.
  381%   If we use a unicast network we use   our public port to contact this
  382%   specific server.
  383
  384make_private_socket :-
  385    udp_private_socket(_Port, S, _F),
  386    !,
  387    (   (   udp_scope(Scope, broadcast(_,_,_))
  388        ;   udp_scope(Scope, multicast(_,_))
  389        ),
  390        \+ udp_closed(Scope)
  391    ->  true
  392    ;   tcp_close_socket(S),
  393        retractall(udp_private_socket(_,_,_))
  394    ).
  395make_private_socket :-
  396    udp_scope(_, broadcast(_,_,_)),
  397    !,
  398    udp_socket(S),
  399    tcp_bind(S, Port),
  400    tcp_getopt(S, file_no(F)),
  401    tcp_setopt(S, broadcast),
  402    assertz(udp_private_socket(Port, S, F)).
  403make_private_socket :-
  404    udp_scope(_, multicast(_,_)),
  405    !,
  406    udp_socket(S),
  407    tcp_bind(S, Port),
  408    tcp_getopt(S, file_no(F)),
  409    assertz(udp_private_socket(Port, S, F)).
  410make_private_socket.
  411
  412%!  make_public_socket(+ScopeData, +Scope)
  413%
  414%   Create the public port Scope.
  415
  416make_public_socket(_, Scope) :-
  417    udp_public_socket(Scope, _Port, S, _),
  418    !,
  419    (   udp_closed(Scope)
  420    ->  tcp_close_socket(S),
  421        retractall(udp_public_socket(Scope, _, _, _))
  422    ;   true
  423    ).
  424make_public_socket(broadcast(_SubNet, _Broadcast, Port), Scope) :-
  425    udp_socket(S),
  426    tcp_setopt(S, reuseaddr),
  427    tcp_bind(S, Port),
  428    tcp_getopt(S, file_no(F)),
  429    assertz(udp_public_socket(Scope, Port, S, F)).
  430make_public_socket(multicast(Group, Port), Scope) :-
  431    udp_socket(S),
  432    tcp_setopt(S, reuseaddr),
  433    tcp_bind(S, Port),
  434    tcp_setopt(S, ip_add_membership(Group)),
  435    tcp_getopt(S, file_no(F)),
  436    assertz(udp_public_socket(Scope, Port, S, F)).
  437make_public_socket(unicast(Port), Scope) :-
  438    udp_socket(S),
  439    tcp_bind(S, Port),
  440    tcp_getopt(S, file_no(F)),
  441    assertz(udp_public_socket(Scope, Port, S, F)).
  442
  443udp_socket_file_no(FileNo) :-
  444    udp_private_socket(_,_,FileNo).
  445udp_socket_file_no(FileNo) :-
  446    udp_public_socket(_,_,_,FileNo).
  447
  448%!  dispatch_inbound(+FileNos)
  449%
  450%   Dispatch inbound traffic. This loop   uses  wait_for_input/3 to wait
  451%   for one or more UDP sockets and   dispatches  the requests using the
  452%   internal broadcast service. For an  incomming broadcast _request_ we
  453%   send the reply only to the  requester   and  therefore we must use a
  454%   socket that is not in broadcast mode.
  455
  456dispatch_inbound(FileNos) :-
  457    debug(udp(broadcast), 'Waiting for ~p', [FileNos]),
  458    wait_for_input(FileNos, Ready, infinite),
  459    debug(udp(broadcast), 'Ready: ~p', [Ready]),
  460    maplist(dispatch_ready, Ready),
  461    dispatch_inbound(FileNos).
  462
  463dispatch_ready(FileNo) :-
  464    udp_private_socket(_Port, Private, FileNo),
  465    !,
  466    udp_receive(Private, Data, From, [max_message_size(65535)]),
  467    debug(udp(broadcast), 'Inbound on private port', []),
  468    (   in_scope(Scope, From),
  469        udp_term_string(Scope, Term, Data) % only accept valid data
  470    ->  ld_dispatch(Private, Term, From, Scope)
  471    ;   true
  472    ).
  473dispatch_ready(FileNo) :-
  474    udp_public_socket(Scope, _PublicPort, Public, FileNo),
  475    !,
  476    udp_receive(Public, Data, From, [max_message_size(65535)]),
  477    debug(udp(broadcast), 'Inbound on public port from ~p for scope ~p',
  478          [From, Scope]),
  479    (   in_scope(Scope, From),
  480        udp_term_string(Scope, Term, Data) % only accept valid data
  481    ->  (   udp_scope(Scope, unicast(_))
  482        ->  ld_dispatch(Public, Term, From, Scope)
  483        ;   udp_private_socket(_PrivatePort, Private, _FileNo),
  484            ld_dispatch(Private, Term, From, Scope)
  485        )
  486    ;   udp_scope(Scope, unicast(_)),
  487        udp_term_string(Scope, Term, Data),
  488        unicast_out_of_scope_request(Scope, From, Term)
  489    ->  true
  490    ;   true
  491    ).
  492
  493in_scope(Scope, Address) :-
  494    udp_scope(Scope, ScopeData),
  495    in_scope(ScopeData, Scope, Address),
  496    !.
  497in_scope(Scope, From) :-
  498    debug(udp(broadcast), 'Out-of-scope ~p datagram from ~p',
  499          [Scope, From]),
  500    fail.
  501
  502in_scope(broadcast(Subnet, Broadcast, _PublicPort), _Scope, IP:_FromPort) :-
  503    udp_broadcast_address(IP, Subnet, Broadcast).
  504in_scope(multicast(_Group, _Port), _Scope, _From).
  505in_scope(unicast(_PublicPort), Scope, IP:_) :-
  506    udp_peer(Scope, IP:_).
  507
  508
  509%!  ld_dispatch(+PrivateSocket, +Term, +From, +Scope)
  510%
  511%   Locally dispatch Term received from From. If it concerns a broadcast
  512%   request, send the replies to PrivateSocket   to  From. The multifile
  513%   hook black_list/1 can be used to ignore certain messages.
  514
  515ld_dispatch(_S, Term, From, _Scope) :-
  516    debug(udp(broadcast), 'ld_dispatch(~p) from ~p', [Term, From]),
  517    fail.
  518ld_dispatch(_S, Term, _From, _Scope) :-
  519    blacklisted(Term), !.
  520ld_dispatch(S, request(Key, Term), From, Scope) :-
  521    !,
  522    forall(safely(broadcast_request(Term)),
  523           safely((udp_term_string(Scope, reply(Key,Term), Message),
  524                   udp_send(S, Message, From, [])))).
  525ld_dispatch(_S, send(Term), _From, _Scope) :-
  526    !,
  527    safely_det(broadcast(Term)).
  528ld_dispatch(_S, reply(Key, Term), From, _Scope) :-
  529    (   reply_queue(Key, Queue)
  530    ->  safely(thread_send_message(Queue, Term:From))
  531    ;   true
  532    ).
  533
  534blacklisted(send(Term))      :- black_list(Term).
  535blacklisted(request(_,Term)) :- black_list(Term).
  536blacklisted(reply(_,Term))   :- black_list(Term).
  537
  538
  539%!  reload_udp_proxy
  540%
  541%   Update the UDP relaying proxy service.   The proxy consists of three
  542%   forwarding mechanisms:
  543%
  544%     - Listen on our _scope_.  If any messages are received, hand them
  545%       to udp_broadcast/3 to be broadcasted to _scope_ or sent to a
  546%       specific recipient.
  547%     - Listen on the _scope_ public port. Incomming messages are
  548%       relayed to the internal broadcast mechanism and replies are sent
  549%       to from our private socket.
  550%     - Listen on our private port and reply using the same port.
  551
  552reload_udp_proxy :-
  553    reload_outbound_proxy,
  554    reload_inbound_proxy.
  555
  556reload_outbound_proxy :-
  557    listening(udp_broadcast, udp(_,_), _),
  558    !.
  559reload_outbound_proxy :-
  560    listen(udp_broadcast, udp(Scope,Message),
  561           udp_broadcast(Message, Scope, 0.25)),
  562    listen(udp_broadcast, udp(Scope,Message,Timeout),
  563           udp_broadcast(Message, Scope, Timeout)),
  564    listen(udp_broadcast, udp_subnet(Message),  % backward compatibility
  565           udp_broadcast(Message, subnet, 0.25)),
  566    listen(udp_broadcast, udp_subnet(Message,Timeout),
  567           udp_broadcast(Message, subnet, Timeout)).
  568
  569reload_inbound_proxy :-
  570    catch(thread_signal(udp_inbound_proxy, throw(udp_reload)),
  571          error(existence_error(thread, _),_),
  572          fail),
  573    !.
  574reload_inbound_proxy :-
  575    thread_self(Me),
  576    thread_create(udp_inbound_proxy(Me), _,
  577                  [ alias(udp_inbound_proxy),
  578                    detached(true)
  579                  ]),
  580    thread_get_message(Me, udp_inbound_ready, [timeout(10)]).
  581
  582inbound_proxy_died :-
  583    thread_self(Self),
  584    thread_property(Self, status(Status)),
  585    (   catch(recreate_proxy(Status), _, fail)
  586    ->  print_message(informational,
  587                      httpd_restarted_worker(Self))
  588    ;   done_status_message_level(Status, Level),
  589        print_message(Level,
  590                      httpd_stopped_worker(Self, Status))
  591    ).
  592
  593recreate_proxy(exception(Error)) :-
  594    recreate_on_error(Error),
  595    reload_inbound_proxy.
  596
  597recreate_on_error('$aborted').          % old
  598recreate_on_error(unwind(abort)).
  599recreate_on_error(time_limit_exceeded).
  600
  601done_status_message_level(true, silent) :- !.
  602done_status_message_level(exception('$aborted'), silent) :- !.
  603done_status_message_level(exception(unwind(abort)), silent) :- !.
  604done_status_message_level(_, informational).
  605
  606
  607%!  udp_broadcast_close(+Scope)
  608%
  609%   Close a UDP broadcast scope.
  610
  611udp_broadcast_close(Scope) :-
  612    udp_scope(Scope, _ScopeData),
  613    !,
  614    assert(udp_closed(Scope)),
  615    reload_udp_proxy.
  616udp_broadcast_close(_).
  617
  618
  619%!  udp_broadcast(+What, +Scope, +TimeOut)
  620%
  621%   Send a broadcast request to my UDP peers in Scope. What is either of
  622%   the shape `Term:Address` to send Term to a specific address or query
  623%   the address from which term is answered or it is a plain `Term`.
  624%
  625%   If `Term` is  nonground,  it  is   considered  is  a  _request_ (see
  626%   broadcast_request/1) and the predicate  succeeds   for  each  answer
  627%   received within TimeOut seconds. If Term is ground it is considered
  628%   an asynchronous broadcast and udp_broadcast/3 is deterministic.
  629
  630udp_broadcast(Term:To, Scope, _Timeout) :-
  631    ground(Term), ground(To),           % broadcast to single listener
  632    !,
  633    udp_basic_broadcast(send(Term), Scope, single(To)).
  634udp_broadcast(Term, Scope, _Timeout) :-
  635    ground(Term),                       % broadcast to all listeners
  636    !,
  637    udp_basic_broadcast(send(Term), Scope, broadcast).
  638udp_broadcast(Term:To, Scope, Timeout) :-
  639    ground(To),                         % request to single listener
  640    !,
  641    setup_call_cleanup(
  642        request_queue(Id, Queue),
  643        ( udp_basic_broadcast(request(Id, Term), Scope, single(To)),
  644          udp_br_collect_replies(Queue, Timeout, Term:To)
  645        ),
  646        destroy_request_queue(Queue)).
  647udp_broadcast(Term:From, Scope, Timeout) :-
  648    !,                                  % request to all listeners, collect sender
  649    setup_call_cleanup(
  650        request_queue(Id, Queue),
  651        ( udp_basic_broadcast(request(Id, Term), Scope, broadcast),
  652          udp_br_collect_replies(Queue, Timeout, Term:From)
  653        ),
  654        destroy_request_queue(Queue)).
  655udp_broadcast(Term, Scope, Timeout) :-  % request to all listeners
  656    udp_broadcast(Term:_, Scope, Timeout).
  657
  658:- dynamic
  659    reply_queue/2.  660
  661request_queue(Id, Queue) :-
  662    Id is random(1<<63),
  663    message_queue_create(Queue),
  664    asserta(reply_queue(Id, Queue)).
  665
  666destroy_request_queue(Queue) :-         % leave queue to GC
  667    retractall(reply_queue(_, Queue)).
  668
  669
  670%!  udp_basic_broadcast(+Term, +Dest) is multi.
  671%
  672%   Create a UDP private socket and use it   to send Term to Address. If
  673%   Address is our broadcast address, set the socket in broadcast mode.
  674%
  675%   This predicate succeeds with a choice   point. Committing the choice
  676%   point closes S.
  677%
  678%   @arg Dest is one of single(Target) or `broadcast`.
  679
  680udp_basic_broadcast(Term, Scope, Dest) :-
  681    debug(udp(broadcast), 'UDP proxy outbound ~p to ~p', [Term, Dest]),
  682    udp_term_string(Scope, Term, String),
  683    udp_send_message(Dest, String, Scope).
  684
  685udp_send_message(single(Address), String, Scope) :-
  686    (   udp_scope(Scope, unicast(_))
  687    ->  udp_public_socket(Scope, _Port, S, _)
  688    ;   udp_private_socket(_Port, S, _F)
  689    ),
  690    safely(udp_send(S, String, Address, [])).
  691udp_send_message(broadcast, String, Scope) :-
  692    (   udp_scope(Scope, unicast(_))
  693    ->  udp_public_socket(Scope, _Port, S, _),
  694        forall(udp_peer(Scope, Address),
  695               ( debug(udp(broadcast), 'Unicast to ~p', [Address]),
  696                 safely(udp_send(S, String, Address, []))))
  697    ;   udp_scope(Scope, broadcast(_SubNet, Broadcast, Port))
  698    ->  udp_private_socket(_PrivatePort, S, _F),
  699        udp_send(S, String, Broadcast:Port, [])
  700    ;   udp_scope(Scope, multicast(Group, Port))
  701    ->  udp_private_socket(_PrivatePort, S, _F),
  702        udp_send(S, String, Group:Port, [])
  703    ).
  704
  705% ! udp_br_collect_replies(+Queue, +TimeOut, -TermAndFrom) is nondet.
  706%
  707%   Collect replies on Socket for  TimeOut   seconds.  Succeed  for each
  708%   received message.
  709
  710udp_br_collect_replies(Queue, Timeout, Reply) :-
  711    get_time(Start),
  712    Deadline is Start+Timeout,
  713    repeat,
  714       (   thread_get_message(Queue, Reply,
  715                              [ deadline(Deadline)
  716                              ])
  717       ->  true
  718       ;   !,
  719           fail
  720       ).
  721
  722%!  udp_broadcast_initialize(+IPAddress, +Options) is semidet.
  723%
  724%   Initialized UDP broadcast bridge. IPAddress is the IP address on the
  725%   network we want to broadcast on.  IP addresses are terms ip(A,B,C,D)
  726%   or an atom or string of the format =|A.B.C.D|=.   Options processed:
  727%
  728%     - scope(+ScopeName)
  729%     Name of the scope.  Default is `subnet`.
  730%     - subnet_mask(+SubNet)
  731%     Subnet to broadcast on.  This uses the same syntax as IPAddress.
  732%     Default classifies the network as class A, B or C depending on
  733%     the the first octet and applies the default mask.
  734%     - port(+Port)
  735%     Public port to use.  Default is 20005.
  736%     - method(+Method)
  737%     Method to send a message to multiple peers.  One of
  738%       - broadcast
  739%       Use UDP broadcast messages to the LAN.  This is the
  740%       default
  741%       - multicast
  742%       Use UDP multicast messages.  This can be used on WAN networks,
  743%       provided the intermediate routers understand multicast.
  744%       - unicast
  745%       Send the messages individually to all registered peers.
  746%
  747%   For compatibility reasons Options may be the subnet mask.
  748
  749udp_broadcast_initialize(IP, Options) :-
  750    with_mutex(udp_broadcast,
  751               udp_broadcast_initialize_sync(IP, Options)).
  752
  753udp_broadcast_initialize_sync(IP, Options) :-
  754    nonvar(Options),
  755    Options = ip(_,_,_,_),
  756    !,
  757    udp_broadcast_initialize(IP, [subnet_mask(Options)]).
  758udp_broadcast_initialize_sync(IP, Options) :-
  759    to_ip4(IP, IPAddress),
  760    option(method(Method), Options, broadcast),
  761    must_be(oneof([broadcast, multicast, unicast]), Method),
  762    udp_broadcast_initialize_sync(Method, IPAddress, Options),
  763    reload_udp_proxy.
  764
  765udp_broadcast_initialize_sync(broadcast, IPAddress, Options) :-
  766    option(subnet_mask(Subnet), Options, _),
  767    mk_subnet(Subnet, IPAddress, Subnet4),
  768    option(port(Port), Options, 20005),
  769    option(scope(Scope), Options, subnet),
  770
  771    udp_broadcast_address(IPAddress, Subnet4, Broadcast),
  772    udp_broadcast_close(Scope),
  773    assertz(udp_scope(Scope, broadcast(Subnet4, Broadcast, Port))).
  774udp_broadcast_initialize_sync(unicast, _IPAddress, Options) :-
  775    option(port(Port), Options, 20005),
  776    option(scope(Scope), Options, subnet),
  777    udp_broadcast_close(Scope),
  778    assertz(udp_scope(Scope, unicast(Port))).
  779udp_broadcast_initialize_sync(multicast, IPAddress, Options) :-
  780    option(port(Port), Options, 20005),
  781    option(scope(Scope), Options, subnet),
  782    udp_broadcast_close(Scope),
  783    multicast_address(IPAddress),
  784    assertz(udp_scope(Scope, multicast(IPAddress, Port))).
  785
  786to_ip4(Atomic, ip(A,B,C,D)) :-
  787    atomic(Atomic),
  788    !,
  789    (   split_string(Atomic, ".", "", Strings),
  790        maplist(number_string, [A,B,C,D], Strings)
  791    ->  true
  792    ;   syntax_error(illegal_ip_address)
  793    ).
  794to_ip4(IP, IP).
  795
  796mk_subnet(Var, IP, Subnet) :-
  797    var(Var),
  798    !,
  799    (   default_subnet(IP, Subnet)
  800    ->  true
  801    ;   domain_error(ip_with_subnet, IP)
  802    ).
  803mk_subnet(Subnet, _, Subnet4) :-
  804    to_ip4(Subnet, Subnet4).
  805
  806%!  default_subnet(+IP, -NetWork)
  807%
  808%   Determine the default network address from an IP address. This
  809%   classifies the network as class A, B or C.
  810%
  811%   @see https://docs.oracle.com/cd/E19504-01/802-5753/planning3-78185/index.html
  812
  813default_subnet(ip(A,_,_,_), ip(A,0,0,0)) :-
  814    between(0,127, A), !.
  815default_subnet(ip(A,B,_,_), ip(A,B,0,0)) :-
  816    between(128,191, A), !.
  817default_subnet(ip(A,B,C,_), ip(A,B,C,0)) :-
  818    between(192,223, A), !.
  819
  820multicast_address(ip(A,_,_,_)) :-
  821    between(224, 239, A),
  822    !.
  823multicast_address(IP) :-
  824    domain_error(multicast_network, IP).
  825
  826
  827		 /*******************************
  828		 *          UNICAST PEERS	*
  829		 *******************************/
  830
  831%!  udp_peer_add(+Scope, +Address) is det.
  832%!  udp_peer_del(+Scope, ?Address) is det.
  833%!  udp_peer(?Scope, ?Address) is nondet.
  834%
  835%   Manage and query the set  of  known   peers  for  a unicast network.
  836%   Address is either a term  IP:Port  or   a  plain  IP address. In the
  837%   latter case the default port registered with the scope is used.
  838%
  839%   @arg Address has canonical form ip(A,B,C,D):Port.
  840
  841udp_peer_add(Scope, Address) :-
  842    must_be(ground, Address),
  843    peer_address(Address, Scope, Canonical),
  844    (   udp_scope_peer(Scope, Canonical)
  845    ->  true
  846    ;   assertz(udp_scope_peer(Scope, Canonical))
  847    ).
  848
  849udp_peer_del(Scope, Address) :-
  850    peer_address(Address, Scope, Canonical),
  851    retractall(udp_scope_peer(Scope, Canonical)).
  852
  853udp_peer(Scope, IPAddress) :-
  854    udp_scope_peer(Scope, IPAddress).
  855
  856peer_address(IP:Port, _Scope, IPAddress:Port) :-
  857    !,
  858    to_ip4(IP, IPAddress).
  859peer_address(IP, Scope, IPAddress:Port) :-
  860    (   udp_scope(Scope, unicast(Port))
  861    ->  true
  862    ;   existence_error(udp_scope, Scope)
  863    ),
  864    to_ip4(IP, IPAddress).
  865
  866
  867
  868		 /*******************************
  869		 *             HOOKS		*
  870		 *******************************/
  871
  872%!  udp_term_string_hook(+Scope, +Term, -String) is det.
  873%!  udp_term_string_hook(+Scope, -Term, +String) is semidet.
  874%
  875%   Hook  for  serializing  the  message    Term.   The  default  writes
  876%   =|%prolog\n|=, followed by the Prolog term  in quoted notation while
  877%   ignoring operators. This hook may use alternative serialization such
  878%   as fast_term_serialized/2, use  library(ssl)   to  realise encrypted
  879%   messages, etc.
  880%
  881%   @arg Scope is the scope for which the message is broadcasted.  This
  882%   can be used to use different serialization for different scopes.
  883%   @arg Term encapsulates the term broadcasted by the application as
  884%   follows:
  885%
  886%     - send(ApplTerm)
  887%       Is sent by broadcast(udp(Scope, ApplTerm))
  888%     - request(Id,ApplTerm)
  889%       Is sent by broadcast_request/1, where Id is a unique large
  890%       (64 bit) integer.
  891%     - reply(Id,ApplTerm)
  892%       Is sent to reply on a broadcast_request/1 request that has
  893%       been received.  Arguments are the same as above.
  894%
  895%   @throws The hook may throw udp(invalid_message) to stop processing
  896%   the message.
  897
  898%!  udp_term_string(+Scope, +Term, -String) is det.
  899%!  udp_term_string(+Scope, -Term, +String) is semidet.
  900%
  901%   Serialize an arbitrary Prolog  term  as   a  string.  The  string is
  902%   prefixed by a magic key to ensure   we only accept messages that are
  903%   meant for us.
  904%
  905%   In mode (+,-), Term is written with the options ignore_ops(true) and
  906%   quoted(true).
  907%
  908%   This predicate first calls  udp_term_string_hook/3.
  909
  910udp_term_string(Scope, Term, String) :-
  911    catch(udp_term_string_hook(Scope, Term, String), udp(Error), true),
  912    !,
  913    (   var(Error)
  914    ->  true
  915    ;   Error == invalid_message
  916    ->  fail
  917    ;   throw(udp(Error))
  918    ).
  919udp_term_string(_Scope, Term, String) :-
  920    (   var(String)
  921    ->  format(string(String), '%-prolog-\n~W',
  922               [ Term,
  923                 [ ignore_ops(true),
  924                   quoted(true)
  925                 ]
  926               ])
  927    ;   sub_string(String, 0, _, _, '%-prolog-\n'),
  928        term_string(Term, String,
  929                    [ syntax_errors(quiet)
  930                    ])
  931    ).
  932
  933%!  unicast_out_of_scope_request(+Scope, +From, +Data) is semidet.
  934
  935%!  udp_unicast_join_hook(+Scope, +From, +Data) is semidet.
  936%
  937%   This multifile hook is called if an   UDP package is received on the
  938%   port of the unicast network identified by  Scope. From is the origin
  939%   IP and port and Data is  the   message  data that is deserialized as
  940%   defined for the scope (see udp_term_string/3).
  941%
  942%   This hook is intended to initiate a  new node joining the network of
  943%   peers. We could in theory also  omit   the  in-scope  test and use a
  944%   normal broadcast to join. Using a different channal however provides
  945%   a basic level of security. A   possibe  implementation is below. The
  946%   first fragment is a hook  added  to   the  server,  the  second is a
  947%   predicate added to a client and the   last  initiates the request in
  948%   the client. The excanged term (join(X)) can   be  used to exchange a
  949%   welcome handshake.
  950%
  951%
  952%   ```
  953%   :- multifile udp_broadcast:udp_unicast_join_hook/3.
  954%   udp_broadcast:udp_unicast_join_hook(Scope, From, join(welcome)) :-
  955%       udp_peer_add(Scope, From),
  956%   ```
  957%
  958%   ```
  959%   join_request(Scope, Address, Reply) :-
  960%       udp_peer_add(Scope, Address),
  961%       broadcast_request(udp(Scope, join(X))).
  962%   ```
  963%
  964%   ```
  965%   ?- join_request(myscope, "1.2.3.4":10001, Reply).
  966%   Reply = welcome.
  967%   ```
  968
  969unicast_out_of_scope_request(Scope, From, send(Term)) :-
  970    udp_unicast_join_hook(Scope, From, Term).
  971unicast_out_of_scope_request(Scope, From, request(Key, Term)) :-
  972    udp_unicast_join_hook(Scope, From, Term),
  973    udp_public_socket(Scope, _Port, Socket, _FileNo),
  974    safely((udp_term_string(Scope, reply(Key,Term), Message),
  975            udp_send(Socket, Message, From, [])))