1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker and Sean Charles 4 E-mail: jan@swi-prolog.org and <sean at objitsu dot com> 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2013-2020, Sean Charles 7 SWI-Prolog Solutions b.v. 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34 35 NOTE 36 37 The original code was subject to the MIT licence and written by 38 Sean Charles. Re-licenced to standard SWI-Prolog BSD-2 with 39 permission from Sean Charles. 
40*/ 41 42:- module(redis, 43 [ redis_server/3, % +Alias, +Address, +Options 44 redis_connect/1, % -Connection 45 redis_connect/3, % -Connection, +Host, +Port 46 redis_disconnect/1, % +Connection 47 redis_disconnect/2, % +Connection, +Options 48 % Queries 49 redis/1, % +Request 50 redis/2, % +Connection, +Request 51 redis/3, % +Connection, +Request, -Reply 52 % High level queries 53 redis_get_list/3, % +Redis, +Key, -List 54 redis_get_list/4, % +Redis, +Key, +ChunkSize, -List 55 redis_set_list/3, % +Redis, +Key, +List 56 redis_get_hash/3, % +Redis, +Key, -Data:dict 57 redis_set_hash/3, % +Redis, +Key, +Data:dict 58 redis_scan/3, % +Redis, -LazyList, +Options 59 redis_sscan/4, % +Redis, +Set, -LazyList, +Options 60 redis_hscan/4, % +Redis, +Hash, -LazyList, +Options 61 redis_zscan/4, % +Redis, +Set, -LazyList, +Options 62 % Publish/Subscribe 63 redis_subscribe/4, % +Redis, +Channels, -Id, +Options 64 redis_subscribe/2, % +Id, +Channels 65 redis_unsubscribe/2, % +Id, +Channels 66 redis_current_subscription/2, % ?Id,?Channels 67 redis_write/2, % +Redis, +Command 68 redis_read/2, % +Redis, -Reply 69 % Building blocks 70 redis_array_dict/3, % ?Array, ?Tag, ?Dict 71 % Admin stuff 72 redis_property/2, % +Reply, ?Property 73 redis_current_command/2, % +Redis,?Command 74 redis_current_command/3 % +Redis, +Command, -Properties 75 ]). 76:- autoload(library(socket), [tcp_connect/3]). 77:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]). 78:- autoload(library(broadcast), [broadcast/1]). 79:- autoload(library(error), 80 [ must_be/2, 81 instantiation_error/1, 82 uninstantiation_error/1, 83 existence_error/2 84 ]). 85:- autoload(library(lazy_lists), [lazy_list/2]). 86:- autoload(library(lists), [append/3, member/2]). 87:- autoload(library(option), [merge_options/3, option/2, option/3]). 88:- autoload(library(pairs), [group_pairs_by_key/2]). 89:- use_module(library(debug), [debug/3, assertion/1]). 90:- use_module(library(settings), [setting/4, setting/2]). 
91 92:- use_foreign_library(foreign(redis4pl)). 93 94:- setting(max_retry_count, nonneg, 8640, % one day 95 "Max number of retries"). 96:- setting(max_retry_wait, number, 10, 97 "Max time to wait between recovery attempts"). 98 99:- predicate_options(redis_server/3, 3, 100 [ pass_to(redis:redis_connect/3, 3) 101 ]). 102:- predicate_options(redis_connect/3, 3, 103 [ reconnect(boolean), 104 user(atom), 105 password(atomic), 106 version(between(2,3)) 107 ]). 108:- predicate_options(redis_disconnect/2, 2, 109 [ force(boolean) 110 ]). 111:- predicate_options(redis_scan/3, 3, 112 [ match(atomic), 113 count(nonneg), 114 type(atom) 115 ]). 116% Actually not passing, but the same 117:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 118:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 119:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).
140:- dynamic server/3. 141 142:- dynamic ( connection/2 % ServerName, Stream 143 ) as volatile.
default
points at localhost:6379
with no connect options. The default
server is used for redis/1 and redis/2 and may be changed using this
predicate. Options are described with redis_connect/3.
Connections established this way are by default automatically
reconnected if the connection is lost for some reason unless a
reconnect(false)
option is specified.
%!  redis_server(+Alias, +Address, +Options) is det.
%
%   Store Address and Options under Alias in the server/3 table. Alias
%   must be ground. Any entry previously stored for Alias is removed
%   first, so a new registration replaces the old one.

redis_server(Alias, Address, Options) :-
    must_be(ground, Alias),
    Entry = server(Alias, Address, Options),
    retractall(server(Alias, _, _)),
    asserta(Entry).

%   Initial table content: alias `default` refers to localhost:6379
%   and carries no options.

server(default, localhost:6379, []).
redis_connect(+Address,
-Connection, +Options)
. redis_connect/1 is equivalent to
redis_connect(localhost:6379, Connection, [])
. Options:
true
, try to reconnect to the service when the connection
seems lost. Default is true
for connections specified using
redis_server/3 and false
for explicitly opened connections. version(3)
and password(Password)
are specified, these
are used to authenticate using the HELLO command.3
, the HELLO command is used to upgrade the protocol. Instead of using these predicates, redis/2 and redis/3 are normally used with a server name argument registered using redis_server/3. These predicates are meant for creating a temporary parallel connection or using a connection with a blocking call.
%!  redis_connect(-Connection) is det.
%
%   Connect to the server registered under the alias `default`,
%   passing no extra options.

redis_connect(Conn) :-
    redis_connect(default, Conn, []).

%!  redis_connect(+Address, -Connection, +Options) is det.
%!  redis_connect(-Connection, +Host, +Port) is det.
%
%   Open a connection. Three calling conventions are recognised, in
%   this order:
%
%     1. First argument unbound and the other two ground: the
%        arguments are read as (Connection, Host, Port) and the
%        call is redirected to Host:Port with no options.
%     2. First argument is an atom: it names a server in the
%        server/3 table. The registered options act as defaults for
%        Options. An unknown name raises
%        existence_error(redis_server, Name).
%     3. Anything else is used as the address itself.
%
%   In cases 2 and 3 an address(Address) option is prepended to the
%   option list passed to do_connect/4.

redis_connect(Conn, Host, Port) :-
    var(Conn),
    ground(Host),
    ground(Port),
    !,                                  % GNU-Prolog compatibility
    redis_connect(Host:Port, Conn, []).
redis_connect(Server, Conn, Options) :-
    atom(Server),
    !,
    connect_registered_server(Server, Conn, Options).
redis_connect(Address, Conn, Options) :-
    do_connect(Address, Address, Conn, [address(Address)|Options]).

%   Connect to a server registered under the alias Server. Commits to
%   the first matching server/3 entry.

connect_registered_server(Server, Conn, Options) :-
    server(Server, Address, Defaults),
    !,
    merge_options(Options, Defaults, Merged),
    do_connect(Server, Address, Conn, [address(Address)|Merged]).
connect_registered_server(Server, _Conn, _Options) :-
    existence_error(redis_server, Server).
redis_connection(Id, Stream, Failures, Options)
%!  do_connect(+Id, +Address0, -Conn, +Options) is det.
%
%   Open a socket to Address0 and wrap it in a term
%   redis_connection(Id, Stream, 0, Options); the third argument is
%   the failure counter, which starts at zero. Options is passed
%   unchanged to tcp_connect/3. After connecting, hello/2 runs the
%   handshake requested by Options.

do_connect(Id, Address0, Conn, Options) :-
    tcp_address(Address0, SocketAddress),
    tcp_connect(SocketAddress, Socket, Options),
    Conn = redis_connection(Id, Socket, 0, Options),
    hello(Conn, Options).

%!  tcp_address(+Address0, -Address) is det.
%
%   Strip the unix(Path) wrapper from an address, yielding Path.
%   Every other address is passed through unchanged.

tcp_address(unix(SocketPath), SocketPath) :-
    !.                                  % Using an atom is ambiguous
tcp_address(Other, Other).
%!  hello(+Connection, +Options) is det.
%
%   Run the initial handshake on a fresh connection:
%
%     - If Options holds version(V) with V >= 3, send `HELLO 3`.
%       If both user(User) and password(Password) are given, they
%       are added as `HELLO 3 AUTH User Password`.
%     - Otherwise, if a password(Password) option is present, send
%       `AUTH Password`.
%     - Otherwise do nothing.

hello(Con, Options) :-
    option(version(V), Options),
    V >= 3,
    !,
    (   option(user(User), Options),
        option(password(Password), Options)
    ->  Greeting = hello(3, auth, User, Password)
    ;   Greeting = hello(3)
    ),
    redis(Con, Greeting).
hello(Con, Options) :-
    option(password(Password), Options),
    !,
    redis(Con, auth(Password)).
hello(_Con, _Options).
%!  redis_stream(+Spec, --Stream, +DoConnect) is det.
%
%   Stream is the stream for talking to the server denoted by Spec.
%   Spec is either an atom naming a registered server or a term
%   redis_connection(Id,Stream,Failures,Options). If the stream is
%   disconnected it will be reconnected.
%
%   For a server name, an existing connection/2 entry is reused. If
%   there is none and DoConnect is `true`, a new connection is opened
%   and recorded in connection/2.
%
%   @error instantiation_error if Spec is unbound
%   @error uninstantiation_error(Stream) if Stream is already bound
%   @error existence_error(redis_server, Spec) if Spec is an atom
%          with no usable connection

redis_stream(Spec, _, _) :-
    var(Spec),
    !,
    instantiation_error(Spec).
redis_stream(_, S, _) :-
    nonvar(S),
    !,
    uninstantiation_error(S).
redis_stream(ServerName, S, Connect) :-
    atom(ServerName),
    !,
    (   connection(ServerName, Known)
    ->  S = Known
    ;   Connect == true,
        server(ServerName, Address, Options)
    ->  redis_connect(Address, Connection, Options),
        redis_stream(Connection, S, false),
        asserta(connection(ServerName, S))
    ;   existence_error(redis_server, ServerName)
    ).
redis_stream(redis_connection(_,Open,_,_), S, _) :-
    Open \== (-),
    !,
    S = Open.
redis_stream(Redis, S, _) :-
    % The stream slot holds `-`: the connection was closed. Reconnect
    % to the address saved in the options and store the new stream
    % destructively in the existing connection term.
    Redis = redis_connection(Id,-,_,Options),
    option(address(Address), Options),
    do_connect(Id, Address, Fresh, Options),
    arg(2, Fresh, NewStream),
    nb_setarg(2, Redis, NewStream),
    S = NewStream.

%!  has_redis_stream(+Spec, -Stream) is semidet.
%
%   True when Spec currently has an open stream. Unlike
%   redis_stream/3 this never connects: a server alias must have a
%   connection/2 entry and a redis_connection/4 term must hold a
%   stream other than `-`.
%
%   @error instantiation_error if Spec is unbound

has_redis_stream(Spec, _) :-
    var(Spec),
    !,
    instantiation_error(Spec).
has_redis_stream(Alias, Stream) :-
    atom(Alias),
    !,
    connection(Alias, Stream).
has_redis_stream(redis_connection(_,Stream,_,_), Stream) :-
    Stream \== (-).
%!  redis_disconnect(+Connection) is det.
%!  redis_disconnect(+Connection, +Options) is det.
%
%   Close Connection. redis_disconnect/1 passes an empty option list.
%   Options:
%
%     - force(Force)
%       If `true` (default `false`), do not raise any errors if
%       Connection does not exist or closing the connection raises
%       a network or I/O related exception. This version is used
%       internally if a connection is in a broken state, either due
%       to a protocol error or a network issue.

redis_disconnect(Redis) :-
    redis_disconnect(Redis, []).

redis_disconnect(Redis, Options) :-
    option(force(true), Options),
    !,
    disconnect_forced(Redis).
redis_disconnect(Redis, _Options) :-
    redis_stream(Redis, Stream, false),
    close(Stream),
    retractall(connection(_, Stream)).

%   Forced variant. A redis_connection/4 term gets its stream closed
%   and the stream slot reset to `-`. A server alias with a live
%   stream gets the stream closed and its connection/2 entry removed.
%   Anything else is silently accepted.

disconnect_forced(Redis) :-
    Redis = redis_connection(_Id, Stream, _, _Opts),
    !,
    (   Stream == (-)
    ->  true
    ;   close(Stream, [force(true)]),
        nb_setarg(2, Redis, -)
    ).
disconnect_forced(Redis) :-
    has_redis_stream(Redis, Stream),
    !,
    close(Stream, [force(true)]),
    retractall(connection(_, Stream)).
disconnect_forced(_).
redis(Connection, Command, _)
and second, it
can be used to exploit Redis pipelines and transactions. The
second form is activated if Request is a list. In that case, each
element of the list is either a term Command -> Reply
or a simple
Command. Semantically this represents a sequence of redis/3 and
redis/2 calls. It differs in the following aspects:
multi
and the last exec
, the
commands are executed as a Redis transaction, i.e., they
are executed atomically.Procedurally, the process takes the following steps:
Command -> Reply
terms.Examples
?- redis(default, [ lpush(li,1), lpush(li,2), lrange(li,0,-1) -> List ]). List = ["2", "1"].
%!  redis(+Connection, +Request) is semidet.
%
%   Run Request on Connection. If Request is a proper list it is
%   executed as a pipeline by redis_pipeline/2. Any other request is
%   passed to redis/3 and its reply is ignored.

redis(Redis, Request) :-
    (   is_list(Request)
    ->  redis_pipeline(Redis, Request)
    ;   redis(Redis, Request, _)
    ).
"A:B:..."
. This is a common shorthand for
representing Redis keys.
Reply is either a plain term (often a variable) or a term Value as
Type
. In the latter form, Type dictates how the Redis bulk
reply is translated to Prolog. The default equals to auto
, i.e.,
as a number if the content satisfies the Prolog number syntax and
as an atom otherwise.
status(Atom)
Returned if the server replies with + Status
. Atom
is the textual value of Status converted to lower case,
e.g., status(ok)
or status(pong)
.nil
This atom is returned for a NIL/NULL value. Note that if
the reply is only nil
, redis/3 fails. The nil
value
may be embedded inside lists or maps.nil
. If Reply
as a whole would be nil
the call fails.
Redis bulk replies are translated depending on the as
Type as
explained above.
bytes
(iso_latin_1
), utf8
and text
(the
current locale translation).type_error(Type, String)
is raised.min_tagged_integer
and max_tagged_integer
, allowing
the value to be used as a dict key.auto(atom, number)
auto(atom,tagged_integer)
. This allows the value
to be used as a key for a SWI-Prolog dict.pairs
type
can also be applied to a Redis array. In this case the array
length must be even. This notably allows fetching a Redis
hash as pairs using HGETALL
using version 2 of the
Redis protocol.pairs(AsKey, AsValue)
, but convert the resulting
pair list into a SWI-Prolog dict. AsKey must convert to a
valid dict key, i.e., an atom or tagged integer. See dict_key
.dict(dict_key, AsValue)
.Here are some simple examples
?- redis(default, set(a, 42), X). X = status("OK"). ?- redis(default, get(a), X). X = "42". ?- redis(default, get(a), X as integer). X = 42. ?- redis(default, get(a), X as float). X = 42.0. ?- redis(default, set(swipl:version, 8)). true. ?- redis(default, incr(swipl:version), X). X = 9.
%!  redis(+Connection, +Command, -Reply) is semidet.
%
%   Run Command and unify the answer with Reply. Reply may be written
%   as `Value as Type`. The call fails if the value part of the reply
%   ends up being the atom `nil`.

redis(Redis, Req, Out) :-
    out_val(Out, Val),
    redis1(Redis, Req, Out),
    Val \== nil.

%   out_val(+Out, -Val): Val is the value part of the reply spec Out,
%   i.e. Value when Out is a non-variable `Value as Type`, otherwise
%   Out itself.

out_val(Out, Val) :-
    nonvar(Out),
    Out = (Val as _),
    !.
out_val(Out, Out).

%   redis1(+Redis, +Req, -Out): run redis2/3. If it raises an
%   error(_,_) exception, hand that to recover/3 with this same call
%   as the goal to retry.

redis1(Redis, Req, Out) :-
    Error = error(Formal, _),
    catch(redis2(Redis, Req, Out), Error, true),
    (   nonvar(Formal)
    ->  recover(Error, Redis, redis1(Redis, Req, Out))
    ;   true
    ).

%   redis2(+Redis, +Req, -Out): one request/reply exchange. When Redis
%   is a server alias (an atom), the exchange runs under a mutex named
%   after the alias.

redis2(Redis, Req, Out) :-
    redis_stream(Redis, S, true),
    (   atom(Redis)
    ->  with_mutex(Redis, redis_roundtrip(Redis, S, Req, Out))
    ;   redis_roundtrip(Redis, S, Req, Out)
    ).

%   Write the request and read its reply on stream S.

redis_roundtrip(Redis, S, Req, Out) :-
    redis_write_msg(S, Req),
    redis_read_stream(Redis, S, Out).
%!  redis_pipeline(+Redis, +PipeLine) is det.
%
%   Execute the list PipeLine, whose elements are either Command or
%   `Command -> Reply`. All commands are written first, the stream is
%   flushed once, and then the replies are read in order. An
%   error(_,_) exception is handed to recover/3 with the whole
%   pipeline as the goal to retry.

redis_pipeline(Redis, PipeLine) :-
    Error = error(Formal, _),
    catch(redis_pipeline2(Redis, PipeLine), Error, true),
    (   nonvar(Formal)
    ->  recover(Error, Redis, redis_pipeline(Redis, PipeLine))
    ;   true
    ).

%   For a server alias (an atom) the whole exchange runs under a mutex
%   named after the alias.

redis_pipeline2(Redis, PipeLine) :-
    redis_stream(Redis, S, true),
    (   atom(Redis)
    ->  with_mutex(Redis, redis_pipeline3(Redis, S, PipeLine))
    ;   redis_pipeline3(Redis, S, PipeLine)
    ).

redis_pipeline3(Redis, S, PipeLine) :-
    maplist(write_pipeline(S), PipeLine),
    flush_output(S),
    read_pipeline(Redis, S, PipeLine).

%   Write the command part of one pipeline element without flushing.

write_pipeline(S, Item) :-
    (   Item = (Command -> _Reply)
    ->  true
    ;   Command = Item
    ),
    redis_write_msg_no_flush(S, Command).

%   Read all replies. If reading raises an error(_,_) exception: for
%   an error accepted by reconnect_error/1 the connection is forcibly
%   closed, otherwise the stream is resynchronised. The exception is
%   rethrown in both cases.

read_pipeline(Redis, S, PipeLine) :-
    E = error(Formal,_),
    catch(read_pipeline2(Redis, S, PipeLine), E, true),
    (   var(Formal)
    ->  true
    ;   (   reconnect_error(E)
        ->  redis_disconnect(Redis, [force(true)])
        ;   resync(Redis)
        ),
        throw(E)
    ).

%   Every reply is read before any is acted upon. Pushed messages are
%   then dispatched, the first recorded error (if any) is thrown, and
%   only after that are the reply variables bound.

read_pipeline2(Redis, S, PipeLine) :-
    maplist(redis_read_msg3(S), PipeLine, Replies, Errors, Pushed),
    maplist(handle_push(Redis), Pushed),
    maplist(handle_error, Errors),
    maplist(bind_reply, PipeLine, Replies).

%   For `Command -> Reply` the Reply term is passed to
%   redis_read_msg/5; for a plain element the element itself is passed.

redis_read_msg3(S, Item, Reply, Error, Push) :-
    (   Item = (_Command -> ReplySpec)
    ->  true
    ;   ReplySpec = Item
    ),
    redis_read_msg(S, ReplySpec, Reply, Error, Push).

handle_push(Redis, Pushed) :-
    handle_push_messages(Pushed, Redis).

handle_error(Error) :-
    nonvar(Error),
    !,
    throw(Error).
handle_error(_).

bind_reply(Item, Reply) :-
    (   Item = (_Command -> Reply0)
    ->  Reply0 = Reply
    ;   true
    ).
%!  recover(+Error, +Redis, :Goal)
%
%   Error handler for the request predicates. If Error is a
%   connection-level error (see reconnect_error/1) and Redis is set to
%   reconnect automatically, the connection is forcibly closed and
%   wait_to_retry/2 decides whether another attempt is allowed. If so,
%   Goal is called and the failure/2 record for Redis is cleared once
%   Goal succeeds. In every other case Error is rethrown.
%
%   The third argument is called as a goal, so it is declared `0`.
%   The specifiers were missing from the declaration, which left it
%   syntactically invalid.

:- meta_predicate recover(+,+,0).

recover(Error, Redis, Goal) :-
    reconnect_error(Error),
    auto_reconnect(Redis),
    !,
    debug(redis(recover), '~p: got error ~p; trying to reconnect',
          [Redis, Error]),
    redis_disconnect(Redis, [force(true)]),
    (   wait_to_retry(Redis, Error)
    ->  call(Goal),
        retractall(failure(Redis, _))
    ;   throw(Error)
    ).
recover(Error, _, _) :-
    throw(Error).

%!  auto_reconnect(+Redis) is semidet.
%
%   True if Redis should be reconnected after a connection error. An
%   explicit redis_connection/4 term needs a reconnect(true) option.
%   A registered server reconnects unless its options say otherwise,
%   because the option default is `true`.

auto_reconnect(redis_connection(_,_,_,Options)) :-
    !,
    option(reconnect(true), Options).
auto_reconnect(Server) :-
    ground(Server),
    server(Server, _, Options),
    option(reconnect(true), Options, true).

%!  reconnect_error(+Error) is semidet.
%
%   True for the errors that are treated as a lost connection: any
%   socket error and an unexpected end of file while reading a reply.

reconnect_error(error(socket_error(_Code, _),_)).
reconnect_error(error(syntax_error(unexpected_eof),_)).
%!  wait_to_retry(+Redis, +Error) is semidet.
%
%   Sleep before the next reconnect attempt. With N earlier failures
%   the delay is 2^N hundredths of a second, capped at the number of
%   seconds given by the setting `max_retry_wait`. If the setting
%   `max_retry_count` is exceeded we fail and the caller signals an
%   exception.
%
%   A retry message is printed at level `warning` for the first
%   failure and at level `silent` for later ones.

:- dynamic failure/2 as volatile.

wait_to_retry(Redis, Error) :-
    redis_failures(Redis, Failures),
    setting(max_retry_count, MaxCount),
    Failures < MaxCount,
    NextFailures is Failures+1,
    redis_set_failures(Redis, NextFailures),
    setting(max_retry_wait, MaxWait),
    Centiseconds is min(MaxWait*100, 1<<Failures),
    Wait is Centiseconds/100.0,
    debug(redis(recover), '  Sleeping ~p seconds', [Wait]),
    retry_message_level(Failures, Level),
    print_message(Level, redis(retry(Redis, Failures, Wait, Error))),
    sleep(Wait).

%   redis_failures(+Redis, -Failures): number of failures so far. A
%   connection term holds the count in its third argument. A server
%   alias holds it in failure/2, and has count 0 when there is no
%   record.

redis_failures(redis_connection(_,_,Count,_), Failures) :-
    !,
    Failures = Count.
redis_failures(Server, Failures) :-
    atom(Server),
    (   failure(Server, Failures)
    ->  true
    ;   Failures = 0
    ).

%   redis_set_failures(+Redis, +Count): store the failure count,
%   destructively for a connection term and in failure/2 for a server
%   alias.

redis_set_failures(Connection, Count) :-
    compound(Connection),
    !,
    nb_setarg(3, Connection, Count).
redis_set_failures(Server, Count) :-
    atom(Server),
    retractall(failure(Server, _)),
    asserta(failure(Server, Count)).

%   Only the first failure is reported as a warning.

retry_message_level(0, warning) :- !.
retry_message_level(_, silent).
%!  redis(+Request)
%
%   Open a temporary connection to the `default` server, run Request
%   on it and print the reply with print/1. The connection is closed
%   again whether or not the request succeeds.

redis(Req) :-
    setup_call_cleanup(
        redis_connect(default, Conn, []),
        redis1(Conn, Req, Reply),
        redis_disconnect(Conn, [])),
    print(Reply).
%!  redis_write(+Redis, +Command) is det.
%!  redis_read(+Redis, -Reply) is det.
%
%   Write Command to, or read one Reply from, the stream of Redis.
%   The stream is obtained with redis_stream/3 in connect mode, so a
%   missing connection is established first. These two halves of a
%   request are exposed separately; pairing writes with reads is up
%   to the caller.

redis_write(Redis, Command) :-
    redis_stream(Redis, Out, true),
    redis_write_msg(Out, Command).

redis_read(Redis, Reply) :-
    redis_stream(Redis, In, true),
    redis_read_stream(Redis, In, Reply).


                 /*******************************
                 *      HIGH LEVEL ACCESS       *
                 *******************************/
LRANGE
requests. Note
that this results in O(N^2) complexity. Using a lazy list is most
useful for relatively short lists holding possibly large items.
Note that values retrieved are strings, unless the value was added
using Term as prolog
.
%!  redis_get_list(+Redis, +Key, -List) is det.
%!  redis_get_list(+Redis, +Key, +ChunkSize, -List) is det.
%
%   Fetch the Redis list stored at Key. The length is obtained with
%   LLEN. If ChunkSize is -1 (as used by redis_get_list/3) or is at
%   least the list length, the whole list is fetched at once.
%   Otherwise List is a lazy list that is extended ChunkSize elements
%   at a time.

redis_get_list(Redis, Key, List) :-
    redis_get_list(Redis, Key, -1, List).

redis_get_list(Redis, Key, Chunk, List) :-
    redis(Redis, llen(Key), Len),
    (   fetch_at_once(Chunk, Len)
    ->  (   Len == 0
        ->  List = []
        ;   Last is Len-1,
            list_range(Redis, Key, 0, Last, List)
        )
    ;   lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List)
    ).

%   True when the list is to be materialised in one request.

fetch_at_once(Chunk, Len) :-
    Chunk >= Len,
    !.
fetch_at_once(Chunk, _Len) :-
    Chunk == -1.

%   Lazy list iterator. State is s(Redis,Key,Offset,Slice,Len). The
%   offset is advanced destructively so that the next call continues
%   where this one stopped.

rlist_next(State, List, Tail) :-
    State = s(Redis,Key,Offset,Slice,Len),
    Last is Len-1,
    End is min(Last, Offset+Slice-1),
    list_range(Redis, Key, Offset, End, Elems),
    (   End =:= Last
    ->  List = Elems,
        Tail = []
    ;   NextOffset is Offset+Slice,
        nb_setarg(3, State, NextOffset),
        append(Elems, Tail, List)
    ).

% Redis LRANGE demands End > Start and returns inclusive.

list_range(DB, Key, Index, Index, [Elem]) :-
    !,
    redis(DB, lindex(Key, Index), Elem).
list_range(DB, Key, Start, End, List) :-
    redis(DB, lrange(Key, Start, End), List).
[]
, Key is deleted. Note that key values
are always strings in Redis. The same conversion rules as for
redis/1-3 apply.
%!  redis_set_list(+Redis, +Key, +List) is det.
%
%   Replace the Redis list at Key by List. The key is deleted first.
%   Unless List is `[]`, all elements are then appended with a single
%   RPUSH. The two steps are issued as separate requests.

redis_set_list(Redis, Key, List) :-
    redis(Redis, del(Key), _),
    (   List \== []
    ->  Push =.. [rpush,Key|List],
        redis(Redis, Push, _Count)
    ;   true
    ).
%!  redis_get_hash(+Redis, +Key, -Data:dict) is det.
%!  redis_set_hash(+Redis, +Key, +Data:dict) is det.
%
%   Read or write a Redis hash as a SWI-Prolog dict. Reading uses the
%   Redis `HGETALL` command. If the Redis hash is not used by
%   other (non-Prolog) applications one may also consider using the
%   `Term as prolog` syntax to store the Prolog dict as-is.
%
%   Writing deletes Key and then issues a single HSET holding all
%   fields of the dict. These are two separate requests.

redis_get_hash(Redis, Key, Dict) :-
    redis(Redis, hgetall(Key), Dict as dict(auto)).

redis_set_hash(Redis, Key, Dict) :-
    redis_array_dict(Fields, _, Dict),
    Command =.. [hset,Key|Fields],
    redis(Redis, del(Key), _),
    redis(Redis, Command, _Count).
%!  redis_array_dict(?Array, ?Tag, ?Dict) is det.
%
%   Convert between a flat array of alternating names and values and
%   a dict. The direction follows the instantiation of Array: if it
%   is non-variable, the dict is built from it; otherwise the array
%   is produced from Dict. When reading, Array may also consist of
%   Name-Value pairs.

redis_array_dict(Array, Tag, Dict) :-
    (   nonvar(Array)
    ->  array_to_pairs(Array, Pairs),
        dict_pairs(Dict, Tag, Pairs)
    ;   dict_pairs(Dict, Tag, Pairs),
        pairs_to_array(Pairs, Array)
    ).

%   array_to_pairs(+Array, -Pairs): names are converted to atoms.

array_to_pairs([], []) :-
    !.
array_to_pairs([KeyS-Value|Rest0], [Key-Value|Rest]) :-
    !,                                  % RESP3 returns a map as pairs.
    atom_string(Key, KeyS),
    array_to_pairs(Rest0, Rest).
array_to_pairs([KeyS,Value|Rest0], [Key-Value|Rest]) :-
    atom_string(Key, KeyS),
    array_to_pairs(Rest0, Rest).

%   pairs_to_array(+Pairs, -Array): names are converted to strings.

pairs_to_array([], []) :-
    !.
pairs_to_array([Key-Value|Rest0], [KeyS,Value|Rest]) :-
    atom_string(Key, KeyS),
    pairs_to_array(Rest0, Rest).
%!  redis_scan(+Redis, -LazyList, +Options) is det.
%!  redis_sscan(+Redis, +Set, -LazyList, +Options) is det.
%!  redis_hscan(+Redis, +Hash, -LazyList, +Options) is det.
%!  redis_zscan(+Redis, +Set, -LazyList, +Options) is det.
%
%   Map the Redis `SCAN`, `SSCAN`, `HSCAN` and `ZSCAN` commands
%   into a lazy list. For redis_scan/3 and redis_sscan/4 the result is
%   a list of strings. For redis_hscan/4 and redis_zscan/4, the result
%   is a list of pairs. Options processed:
%
%     - match(Pattern)
%       Adds the `MATCH` subcommand, only returning matches for
%       Pattern.
%     - count(Count)
%       Adds the `COUNT` subcommand, giving a hint to the size of the
%       chunks fetched.
%     - type(Type)
%       Adds the `TYPE` subcommand, only returning answers of the
%       indicated type.

redis_scan(Redis, LazyList, Options) :-
    scan_lazy_list(scan, Redis, Options, LazyList).

redis_sscan(Redis, Set, LazyList, Options) :-
    scan_lazy_list(sscan(Set), Redis, Options, LazyList).

redis_hscan(Redis, Hash, LazyList, Options) :-
    scan_lazy_list(hscan(Hash), Redis, Options, LazyList).

redis_zscan(Redis, Set, LazyList, Options) :-
    scan_lazy_list(zscan(Set), Redis, Options, LazyList).

%   Shared body of the four scan predicates. The iterator state is
%   s(Command,Redis,Cursor,Params) and the cursor starts at 0.

scan_lazy_list(Command, Redis, Options, LazyList) :-
    scan_options([match,count,type], Options, Parms),
    lazy_list(scan_next(s(Command,Redis,0,Parms)), LazyList).

%   scan_options(+Names, +Options, -Parms): Parms is a flat list
%   [Name,Value,...] holding those Names for which Options contains
%   Name(Value), in the order of Names.

scan_options([], _, []).
scan_options([Name|Names], Options, [Name,Value|Parms]) :-
    Opt =.. [Name,Value],
    option(Opt, Options),
    !,
    scan_options(Names, Options, Parms).
scan_options([_|Names], Options, Parms) :-
    scan_options(Names, Options, Parms).


%   Lazy list iterator. Runs the scan command with the current cursor
%   and the extra parameters. A returned cursor of 0 ends the list;
%   otherwise the new cursor is stored destructively in State.

scan_next(State, List, Tail) :-
    State = s(Command,Redis,Cursor,Params),
    Command =.. Words0,
    append(Words0, [Cursor|Params], Words),
    Request =.. Words,
    redis(Redis, Request, [NewCursor,Elems0]),
    scan_pairs(Command, Elems0, Elems),
    (   NewCursor == 0
    ->  List = Elems,
        Tail = []
    ;   nb_setarg(3, State, NewCursor),
        append(Elems, Tail, List)
    ).

%   scan_pairs(+Command, +Elems0, -Elems): HSCAN and ZSCAN results are
%   turned into Key-Value pairs; other results are returned as they
%   are.

scan_pairs(Command, List, Result) :-
    (   Command = hscan(_)
    ->  scan_pairs(List, Result)
    ;   Command = zscan(_)
    ->  scan_pairs(List, Result)
    ;   Result = List
    ).

%   scan_pairs(+List, -Pairs): accepts a flat [Key,Value,...] list as
%   well as elements that already are Key-Value pairs.

scan_pairs([], []).
scan_pairs([Key,Value|Rest0], [Key-Value|Rest]) :-
    !,
    scan_pairs(Rest0, Rest).
scan_pairs([Key-Value|Rest0], [Key-Value|Rest]) :-
    scan_pairs(Rest0, Rest).


                 /*******************************
                 *            ABOUT             *
                 *******************************/
%!  redis_current_command(+Redis, ?Command) is nondet.
%!  redis_current_command(+Redis, ?Command, -Properties) is nondet.
%
%   True when Command is known to the server, with Properties being
%   the remainder of the server's description of it. With Command
%   bound, only that command is requested using `COMMAND INFO`.
%   Otherwise the full `COMMAND` reply is fetched and its entries are
%   enumerated on backtracking, with each name converted to an atom.

redis_current_command(Redis, Command) :-
    redis_current_command(Redis, Command, _).

redis_current_command(Redis, Command, Properties) :-
    (   nonvar(Command)
    ->  redis(Redis, command(info, Command), [[_|Properties]])
    ;   redis(Redis, command, Commands),
        member([Name|Properties], Commands),
        atom_string(Command, Name)
    ).
%!  redis_property(+Redis, ?Property) is nondet.
%
%   True when Property is a property of the Redis server. Runs
%   redis(info, String) and parses the result. As this is for machine
%   usage, properties names *_human are skipped.
%
%   Each Property is a term Name(Value). Value is a number if the
%   text parses as one and a string otherwise.

redis_property(Redis, Property) :-
    redis(Redis, info, String),
    info_terms(String, Terms),
    member(Property, Terms).

%   info_terms(+Info, -Terms): split Info into lines and convert each
%   line that info_line_term/2 accepts. Other lines are dropped.

info_terms(Info, Terms) :-
    split_string(Info, "\n", "\r\n ", Lines),
    convlist(info_line_term, Lines, Terms).

%   info_line_term(+Line, -Term): the line is split at its first
%   colon. Lines without a colon, and lines whose name ends in
%   `_human`, fail.

info_line_term(Line, Term) :-
    once(sub_string(Line, Before, _, After, :)),
    sub_atom(Line, 0, Before, _, Name),
    \+ sub_atom(Name, _, _, 0, '_human'),
    sub_string(Line, _, After, 0, ValueS),
    (   number_string(Number, ValueS)
    ->  Value = Number
    ;   Value = ValueS
    ),
    Term =.. [Name,Value].


                 /*******************************
                 *          SUBSCRIBE           *
                 *******************************/
redis(Id, Channel, Data)
If redis_unsubscribe/2 removes the last subscription, the thread terminates.
To simply print the incoming messages use e.g.
?- listen(redis(_, Channel, Data), format('Channel ~p got ~p~n', [Channel,Data])). true. ?- redis_subscribe(default, test, Id, []). Id = redis_pubsub_3, ?- redis(publish(test, "Hello world")). Channel test got "Hello world" 1 true.
:- dynamic ( subscription/2,            % Id, Channel
             listening/3                % Id, Connection, Thread
           ) as volatile.

%!  redis_subscribe(+Redis, +Channels, -Id, +Options) is det.
%
%   Create a thread that subscribes to Channels and listens for
%   messages. Id identifies the subscription; it is the identifier of
%   the created thread. Options is passed to thread_create/3 with
%   detached(true) as a default.
%
%   If Redis is a server alias (an atom), the thread opens its own
%   connection with reconnect(true) and closes it when done. Otherwise
%   Redis is used as the connection.

redis_subscribe(Redis, Spec, Id, Options) :-
    channels(Spec, Channels),
    pubsub_thread_options(ThreadOptions, Options),
    subscriber_goal(Redis, Channels, Goal),
    thread_create(Goal, Thread, ThreadOptions),
    pubsub_id(Thread, Id).

%   Goal to run in the subscriber thread.

subscriber_goal(Redis, Channels, Goal) :-
    (   atom(Redis)
    ->  Goal = setup_call_cleanup(
                   redis_connect(Redis, Conn, [reconnect(true)]),
                   redis_subscribe1(Redis, Conn, Channels),
                   redis_disconnect(Conn))
    ;   Goal = redis_subscribe1(Redis, Redis, Channels)
    ).

pubsub_thread_options(ThreadOptions, Options) :-
    merge_options(Options, [detached(true)], ThreadOptions).

pubsub_id(Thread, Thread).
%pubsub_id(Thread, Id) :-
%    thread_property(Thread, id(TID)),
%    atom_concat('redis_pubsub_', TID, Id).

%   Subscribe and listen. If this raises an error(_,_) exception,
%   recover/3 is asked to re-establish the connection, using an ECHO
%   request as the probe. The channels that are registered at that
%   moment are then subscribed again.

redis_subscribe1(Redis, Conn, Channels) :-
    Error = error(Formal, _),
    catch(redis_subscribe2(Redis, Conn, Channels), Error, true),
    (   nonvar(Formal)
    ->  recover(Error, Conn, redis1(Conn, echo("reconnect"), _)),
        thread_self(Me),
        pubsub_id(Me, Id),
        findall(Channel, subscription(Id, Channel), CurrentChannels),
        redis_subscribe1(Redis, Conn, CurrentChannels)
    ;   true
    ).

redis_subscribe2(Redis, Conn, Channels) :-
    redis_subscribe3(Conn, Channels),
    redis_listen(Redis, Conn).

%   Register Channels for this thread and send the SUBSCRIBE request.
%   An exit hook is installed so that the bookkeeping is removed when
%   the thread ends.

redis_subscribe3(Conn, Channels) :-
    thread_self(Me),
    pubsub_id(Me, Id),
    prolog_listen(this_thread_exit, pubsub_clean(Id)),
    maplist(register_subscription(Id), Channels),
    redis_stream(Conn, S, true),
    Request =.. [subscribe|Channels],
    redis_write_msg(S, Request).

%   Remove all bookkeeping for subscription Id.

pubsub_clean(Id) :-
    retractall(listening(Id, _Connection, _Thread)),
    retractall(subscription(Id, _Channel)).
%!  redis_subscribe(+Id, +Channels) is det.
%!  redis_unsubscribe(+Id, +Channels) is det.
%
%   Add channels to, or remove channels from, the running
%   subscription Id. The local registration is updated first and the
%   matching SUBSCRIBE or UNSUBSCRIBE request is then written on the
%   subscription's connection.
%
%   @error existence_error(redis_pubsub, Id) if Id has no entry in
%          listening/3

redis_subscribe(Id, Spec) :-
    channels(Spec, Channels),
    pubsub_connection(Id, Connection),
    maplist(register_subscription(Id), Channels),
    pubsub_request(Connection, subscribe, Channels).

redis_unsubscribe(Id, Spec) :-
    channels(Spec, Channels),
    pubsub_connection(Id, Connection),
    maplist(unregister_subscription(Id), Channels),
    pubsub_request(Connection, unsubscribe, Channels).

%   Connection used by the listener registered as Id.

pubsub_connection(Id, Connection) :-
    (   listening(Id, Connection, _Thread)
    ->  true
    ;   existence_error(redis_pubsub, Id)
    ).

%   Write the request Verb(Channel, ...) on Connection.

pubsub_request(Connection, Verb, Channels) :-
    redis_stream(Connection, S, true),
    Request =.. [Verb|Channels],
    redis_write_msg(S, Request).
%!  redis_current_subscription(?Id, ?Channels) is nondet.
%
%   True when the subscription Id is registered for the list
%   Channels. Enumerates the registered subscriptions on
%   backtracking.

redis_current_subscription(Id, Channels) :-
    findall(Id-Channel, subscription(Id, Channel), Pairs),
    keysort(Pairs, Sorted),
    group_pairs_by_key(Sorted, Grouped),
    member(Id-Channels, Grouped).

%   channels(+Spec, -Names): Spec is a single channel or a list of
%   channels. Names is always a list of atoms.

channels(Spec, Names) :-
    (   is_list(Spec)
    ->  maplist(channel_name, Spec, Names)
    ;   Names = [Name],
        channel_name(Spec, Name)
    ).

%   channel_name(+Key, -Atom): an atom is used as it is. A term
%   A:B:... made of atoms is joined with `:` into a single atom.
%   Anything else raises type_error(redis_key, Key).

channel_name(Atom, Atom) :-
    atom(Atom),
    !.
channel_name(Key, Atom) :-
    phrase(key_parts(Key), Parts),
    !,
    atomic_list_concat(Parts, :, Atom).
channel_name(Key, _) :-
    type_error(redis_key, Key).

%   key_parts(+Key)// produces the atoms of a `:`-separated key, left
%   to right. It fails if any component is unbound or not an atom.

key_parts(Var) -->
    { var(Var), !, fail }.
key_parts(Atom) -->
    { atom(Atom) },
    !,
    [Atom].
key_parts(Left:Right) -->
    key_parts(Left),
    key_parts(Right).




%   Bookkeeping of the channels of subscription Id. Registering a
%   channel that is already registered does nothing.

register_subscription(Id, Channel) :-
    (   subscription(Id, Channel)
    ->  true
    ;   assertz(subscription(Id, Channel))
    ).

unregister_subscription(Id, Channel) :-
    retractall(subscription(Id, Channel)).

%   Run the listen loop for the calling thread. The listening/3 record
%   exists for exactly as long as the loop runs.

redis_listen(Redis, Conn) :-
    thread_self(Me),
    pubsub_id(Me, Id),
    setup_call_cleanup(
        assertz(listening(Id, Conn, Me), Ref),
        redis_listen_loop(Redis, Id, Conn),
        erase(Ref)).

%   Read and dispatch messages for as long as Id has at least one
%   registered channel. The stream is fetched on every iteration.

redis_listen_loop(Redis, Id, Conn) :-
    redis_stream(Conn, S, true),
    (   subscription(Id, _)
    ->  redis_read_stream(Redis, S, Reply),
        redis_broadcast(Redis, Reply),
        redis_listen_loop(Redis, Id, Conn)
    ;   true
    ).

%   Dispatch a message read while listening. Subscribe confirmations
%   are ignored. A `message` is broadcast as
%   redis(Redis, Channel, Data); errors raised by listeners are
%   printed and do not end the loop. Any other message is checked for
%   its shape and logged on the debug topic redis(warning).

redis_broadcast(_, [subscribe, _Channel, _N]) :-
    !.
redis_broadcast(Redis, [message, Channel, Data]) :-
    !,
    catch(broadcast(redis(Redis, Channel, Data)),
          Error,
          print_message(error, Error)).
redis_broadcast(Redis, Message) :-
    assertion((Message = [Type, Channel, _Data],
               atom(Type),
               atom(Channel))),
    debug(redis(warning), '~p: Unknown message while listening: ~p',
          [Redis,Message]).
1175 1176 1177 /******************************* 1178 * READ/WRITE * 1179 *******************************/
nil
status(String)
true
or false
). RESP3 only.If something goes wrong, the connection is closed and an exception is raised.
%!  redis_read_stream(+Redis, +Stream, -Term) is det.
%
%   Read one reply from Stream and unify it with Term. Push messages
%   that arrive along with the reply are dispatched first.
%
%     - If reading itself raises an error(_,_) exception, the
%       connection is forcibly closed and the exception is rethrown.
%     - If the reader reports an error for the reply, the stream is
%       resynchronised with resync/1 and that error is thrown.

redis_read_stream(Redis, SI, Out) :-
    E = error(Formal,_),
    catch(redis_read_msg(SI, Out, Out0, Error, Push), E, true),
    (   nonvar(Formal)
    ->  redis_disconnect(Redis, [force(true)]),
        throw(E)
    ;   handle_push_messages(Push, Redis),
        (   nonvar(Error)
        ->  resync(Redis),
            throw(Error)
        ;   Out = Out0
        )
    ).

%   Dispatch each pushed message. If handling a message fails, that is
%   ignored; if it raises an exception, the exception is printed as a
%   warning. Processing continues with the next message either way.

handle_push_messages([], _).
handle_push_messages([Msg|Msgs], Redis) :-
    ignore(catch(handle_push_message(Msg, Redis), E,
                 print_message(warning, E))),
    handle_push_messages(Msgs, Redis).

%   Only messages tagged "pubsub" are handled; their remainder is
%   passed to redis_broadcast/2.

handle_push_message(["pubsub"|List], Redis) :-
    redis_broadcast(Redis, List).
%!  resync(+Redis) is det.
%
%   Bring the reply stream of Redis back in step after a reply could
%   not be processed. If resynchronising raises an error(_,_)
%   exception, the connection is forcibly closed and the exception is
%   rethrown.

resync(Redis) :-
    E = error(Formal,_),
    catch(do_resync(Redis), E, true),
    (   nonvar(Formal)
    ->  redis_disconnect(Redis, [force(true)]),
        throw(E)
    ;   true
    ).

%   Send an ECHO carrying a random integer below 1,000,000,000 and let
%   '$redis_resync'/2 handle the stream with that value as the marker.
%   NOTE(review): presumably the foreign predicate skips input until
%   the echoed value is seen; confirm in the C source.

do_resync(Redis) :-
    Marker is random(1_000_000_000),
    redis_stream(Redis, S, true),
    redis_write_msg(S, echo(Marker)),
    '$redis_resync'(S, Marker).
redis4pl
.
/*******************************
*           MESSAGES           *
*******************************/

%   Message hooks. The clauses must be defined in module `prolog` to
%   match the multifile declaration and to be picked up by the message
%   system, so the heads carry the `prolog:` qualifier. That qualifier
%   had been lost, leaving two unrelated local predicates.

:- multifile
    prolog:error_message//1,
    prolog:message//1.

%   Formal error term redis_error(Code, String).

prolog:error_message(redis_error(Code, String)) -->
    [ 'REDIS: ~w: ~s'-[Code, String] ].

%   Message printed by wait_to_retry/2 before sleeping. The second
%   line shows the translated text of the error that caused the retry.

prolog:message(redis(retry(_Redis, _Failures, Wait, Error))) -->
    [ 'REDIS: connection error. Retrying in ~2f seconds'-[Wait], nl ],
    [ ' '-[] ], '$messages':translate_message(Error).
Redis client
This library is a client to Redis, a popular key value store to deal with caching and communication between micro services.
In the typical use case we register the details of one or more Redis servers using redis_server/3. Subsequently, redis/2-3 is used to issue commands on the server. For example:
*/