View source with formatted comments or as raw
    1/*  Part of SWISH
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2015-2020, VU University Amsterdam
    7                              CWI Amsterdam
    8                              SWI-Prolog Solutions b.v.
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(gitty_driver_files,
   38          [ gitty_open/2,               % +Store, +Options
   39            gitty_close/1,              % +Store
   40            gitty_file/4,               % +Store, ?Name, ?Ext, ?Hash
   41
   42            gitty_update_head/5,        % +Store, +Name, +OldCommit, +NewCommit
   43					% +DataHash
   44            delete_head/2,              % +Store, +Name
   45            set_head/3,                 % +Store, +Name, +Hash
   46            store_object/4,             % +Store, +Hash, +Header, +Data
   47            delete_object/2,            % +Store, +Hash
   48
   49            gitty_hash/2,               % +Store, ?Hash
   50            load_plain_commit/3,        % +Store, +Hash, -Meta
   51            load_object/5,              % +Store, +Hash, -Data, -Type, -Size
   52            gitty_object_file/3,        % +Store, +Hash, -File
   53
   54            repack_objects/2,           % +Store, +Options
   55            pack_objects/6,             % +Store, +Objs, +Packs, +PackDir,
   56                                        % -File, +Opts
   57            unpack_packs/1,             % +Store
   58            unpack_pack/2,              % +Store, +PackFile
   59
   60            attach_pack/2,              % +Store, +PackFile
   61            gitty_fsck/1,               % +Store
   62            fsck_pack/1,                % +PackFile
   63            load_object_from_pack/4,    % +Hash, -Data, -Type, -Size
   64
   65            gitty_rescan/1              % Store
   66          ]).   67:- use_module(library(apply)).   68:- use_module(library(zlib)).   69:- use_module(library(filesex)).   70:- use_module(library(lists)).   71:- use_module(library(apply)).   72:- use_module(library(error)).   73:- use_module(library(debug)).   74:- use_module(library(zlib)).   75:- use_module(library(hash_stream)).   76:- use_module(library(option)).   77:- use_module(library(dcg/basics)).   78:- use_module(library(redis)).   79:- use_module(library(redis_streams)).   80:- use_module(gitty, [is_gitty_hash/1]).   81
   82:- use_module(swish_redis).   83
   84/** <module> Gitty plain files driver
   85
   86This version of the driver uses plain files  to store the gitty data. It
   87consists of a nested directory  structure   with  files  named after the
    88hash. The object format and hash computation are the same as for `git`. The _heads_
   89(files) are computed on startup by scanning all objects. There is a file
   90=ref/head= that is updated if a head is updated. Other clients can watch
   91this file and update their notion  of   the  head. This implies that the
   92store can handle multiple clients that can  access a shared file system,
   93optionally shared using NFS from different machines.
   94
   95The store is simple and robust. The  main disadvantages are long startup
   96times as the store holds more objects and relatively high disk usage due
   97to rounding the small objects to disk allocation units.
   98
   99@bug    Shared access does not work on Windows.
  100*/
  101
  102:- dynamic
  103    head/4,                             % Store, Name, Ext, Hash
  104    store/2,                            % Store, Updated
  105    commit/3,                           % Store, Hash, Meta
  106    heads_input_stream_cache/2,         % Store, Stream
  107    pack_object/6,                      % Hash, Type, Size, Offset, Store,PackFile
  108    attached_packs/1,                   % Store
  109    attached_pack/2,                    % Store, PackFile
  110    redis_db/3.                         % Store, DB, Prefix
  111
  112:- volatile
  113    head/4,
  114    store/2,
  115    commit/3,
  116    heads_input_stream_cache/2,
  117    pack_object/6,
  118    attached_packs/1,
  119    attached_pack/2.  120
  121:- multifile
  122    gitty:check_object/4.  123
  124% enable/disable syncing remote servers running on  the same file store.
  125% This facility requires shared access to files and thus doesn't work on
  126% Windows.
  127
  128:- if(current_prolog_flag(windows, true)).  129remote_sync(false).
  130:- else.  131remote_sync(true).
  132:- endif.  133
  134%!  gitty_open(+Store, +Options) is det.
  135%
  136%   Driver  specific  initialization.  Handles  setting    up   a  Redis
  137%   connection when requested.  This processes:
  138%
  139%     - redis(+DB)
  140%       Name of the redis DB to connect to.  See redis_server/3.
  141%     - redis_prefix(+Prefix)
  142%       Prefix for all keys.  This can be used to host multiple
  143%       SWISH servers on the same redis cluster.  Default is `swish`.
  144
  145gitty_open(Store, Options) :-
  146    option(redis(DB), Options),
  147    !,
  148    option(redis_prefix(Prefix), Options, swish),
  149    asserta(redis_db(Store, DB, Prefix)),
  150    thread_create(gitty_scan(Store), _, [detached(true)]).
  151gitty_open(_, _).
  152
  153
  154%!  gitty_close(+Store) is det.
  155%
  156%   Close resources associated with a store.
  157
  158gitty_close(Store) :-
  159    (   retract(heads_input_stream_cache(Store, In))
  160    ->  close(In)
  161    ;   true
  162    ),
  163    retractall(head(Store,_,_,_)),
  164    retractall(store(Store,_)),
  165    retractall(pack_object(_,_,_,_,Store,_)).
  166
  167
  168%!  gitty_file(+Store, ?File, ?Ext, ?Head) is nondet.
  169%
  170%   True when File entry in the  gitty   store  and Head is the HEAD
  171%   revision.
  172
  173gitty_file(Store, Head, Ext, Hash) :-
  174    redis_db(Store, _, _),
  175    !,
  176    gitty_scan(Store),
  177    redis_file(Store, Head, Ext, Hash).
  178gitty_file(Store, Head, Ext, Hash) :-
  179    gitty_scan(Store),
  180    head(Store, Head, Ext, Hash).
  181
  182%!  load_plain_commit(+Store, +Hash, -Meta:dict) is semidet.
  183%
  184%   Load the commit data as a  dict.   Loaded  commits are cached in
  185%   commit/3.  Note  that  only  adding  a  fact  to  the  cache  is
  186%   synchronized. This means that during  a   race  situation we may
  187%   load the same object  multiple  times   from  disk,  but this is
  188%   harmless while a lock  around   the  whole  predicate serializes
  189%   loading different objects, which is not needed.
  190
  191load_plain_commit(Store, Hash, Meta) :-
  192    must_be(atom, Store),
  193    must_be(atom, Hash),
  194    commit(Store, Hash, Meta),
  195    !.
  196load_plain_commit(Store, Hash, Meta) :-
  197    load_object(Store, Hash, String, _, _),
  198    term_string(Meta0, String, []),
  199    with_mutex(gitty_commit_cache,
  200               assert_cached_commit(Store, Hash, Meta0)),
  201    Meta = Meta0.
  202
  203assert_cached_commit(Store, Hash, Meta) :-
  204    commit(Store, Hash, Meta0),
  205    !,
  206    assertion(Meta0 =@= Meta).
  207assert_cached_commit(Store, Hash, Meta) :-
  208    assertz(commit(Store, Hash, Meta)).
  209
  210%!  store_object(+Store, +Hash, +Header:string, +Data:string) is det.
  211%
  212%   Store the actual object. The store  must associate Hash with the
  213%   concatenation of Hdr and Data.
  214
  215store_object(Store, Hash, _Hdr, _Data) :-
  216    pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
  217    !.
  218store_object(Store, Hash, Hdr, Data) :-
  219    gitty_object_file(Store, Hash, Path),
  220    with_mutex(gitty_file, exists_or_create(Path, Out0)),
  221    (   var(Out0)
  222    ->  true
  223    ;   setup_call_cleanup(
  224            zopen(Out0, Out, [format(gzip)]),
  225            format(Out, '~s~s', [Hdr, Data]),
  226            close(Out))
  227    ).
  228
  229exists_or_create(Path, _Out) :-
  230    exists_file(Path),
  231    !.
  232exists_or_create(Path, Out) :-
  233    file_directory_name(Path, Dir),
  234    make_directory_path(Dir),
  235    open(Path, write, Out, [encoding(utf8), lock(write)]).
  236
  237:- if(\+current_predicate(ensure_directory/1)).  238% in Library as of SWI-Prolog 9.1.20
  239ensure_directory(Dir) :-
  240    exists_directory(Dir),
  241    !.
  242ensure_directory(Dir) :-
  243    make_directory(Dir).
  244:- endif.  245
  246%!  store_object_raw(+Store, +Hash, +Bytes:string, -New) is det.
  247%
  248%   Store an object  from  raw  bytes.   This  is  used  for replicating
  249%   objects.
  250
  251store_object_raw(Store, Hash, _Bytes, false) :-
  252    pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
  253    !.
  254store_object_raw(Store, Hash, Bytes, New) :-
  255    gitty_object_file(Store, Hash, Path),
  256    with_mutex(gitty_file, exists_or_create(Path, Out)),
  257    (   var(Out)
  258    ->  New = false
  259    ;   call_cleanup(
  260            ( set_stream(Out, type(binary)),
  261              write(Out, Bytes)
  262            ),
  263            close(Out)),
  264        New = true
  265    ).
  266
  267%!  load_object(+Store, +Hash, -Data, -Type, -Size) is det.
  268%
  269%   Load the given object.
  270
  271load_object(_Store, Hash, Data, Type, Size) :-
  272    load_object_from_pack(Hash, Data0, Type0, Size0),
  273    !,
  274    f(Data0, Type0, Size0) = f(Data, Type, Size).
  275load_object(Store, Hash, Data, Type, Size) :-
  276    load_object_file(Store, Hash, Data0, Type0, Size0),
  277    !,
  278    f(Data0, Type0, Size0) = f(Data, Type, Size).
  279load_object(Store, Hash, Data, Type, Size) :-
  280    redis_db(Store, _, _),
  281    redis_replicate_get(Store, Hash),
  282    load_object_file(Store, Hash, Data, Type, Size).
  283
  284load_object_file(Store, Hash, Data, Type, Size) :-
  285    gitty_object_file(Store, Hash, Path),
  286    exists_file(Path),
  287    !,
  288    setup_call_cleanup(
  289        gzopen(Path, read, In, [encoding(utf8)]),
  290        read_object(In, Data, Type, Size),
  291        close(In)).
  292
  293%!  load_object_raw(+Store, +Hash, -Data)
  294%
  295%   Load the compressed data for an object. Intended for replication.
  296
  297load_object_raw(_Store, Hash, Bytes) :-
  298    load_object_from_pack(Hash, Data, Type, Size),
  299    !,
  300    object_bytes(Type, Size, Data, Bytes).
  301load_object_raw(Store, Hash, Data) :-
  302    gitty_object_file(Store, Hash, Path),
  303    exists_file(Path),
  304    !,
  305    setup_call_cleanup(
  306        open(Path, read, In, [type(binary)]),
  307        read_string(In, _, Data),
  308        close(In)).
  309
  310%!  object_bytes(+Type, +Size, +Data, -Bytes) is det.
  311%
  312%   Encode an object with the given parameters in memory.
  313
  314object_bytes(Type, Size, Data, Bytes) :-
  315    setup_call_cleanup(
  316        new_memory_file(MF),
  317        ( setup_call_cleanup(
  318              open_memory_file(MF, write, Out, [encoding(octet)]),
  319              setup_call_cleanup(
  320                  zopen(Out, ZOut, [format(gzip), close_parent(false)]),
  321                  ( set_stream(ZOut, encoding(utf8)),
  322                    format(ZOut, '~w ~d\u0000~w', [Type, Size, Data])
  323                  ),
  324                  close(ZOut)),
  325              close(Out)),
  326          memory_file_to_string(MF, Bytes, octet)
  327        ),
  328        free_memory_file(MF)).
  329
  330
  331%!  load_object_header(+Store, +Hash, -Type, -Size) is det.
  332%
  333%   Load the header of an object
  334
  335load_object_header(Store, Hash, Type, Size) :-
  336    gitty_object_file(Store, Hash, Path),
  337    setup_call_cleanup(
  338        gzopen(Path, read, In, [encoding(utf8)]),
  339        read_object_hdr(In, Type, Size),
  340        close(In)).
  341
  342read_object(In, Data, Type, Size) :-
  343    read_object_hdr(In, Type, Size),
  344    read_string(In, _, Data).
  345
  346read_object_hdr(In, Type, Size) :-
  347    get_code(In, C0),
  348    read_hdr(C0, In, Hdr),
  349    phrase((nonblanks(TypeChars), " ", integer(Size)), Hdr),
  350    atom_codes(Type, TypeChars).
  351
  352read_hdr(C, In, [C|T]) :-
  353    C > 0,
  354    !,
  355    get_code(In, C1),
  356    read_hdr(C1, In, T).
  357read_hdr(_, _, []).
  358
  359%!  gitty_rescan(?Store) is det.
  360%
  361%   Update our view of the shared   storage  for all stores matching
  362%   Store.
  363
  364gitty_rescan(Store) :-
  365    retractall(store(Store, _)).
  366
  367%!  gitty_scan(+Store) is det.
  368%
  369%   Scan gitty store for files (entries),   filling  head/3. This is
  370%   performed lazily at first access to the store.
  371%
  372%   @tdb    Possibly we need to maintain a cached version of this
  373%           index to avoid having to open all objects of the gitty
  374%           store.
  375
  376gitty_scan(Store) :-
  377    store(Store, _),
  378    !,
  379    remote_updates(Store).
  380gitty_scan(Store) :-
  381    with_mutex(gitty, gitty_scan_sync(Store)).
  382
  383:- thread_local
  384    latest/3.  385
  386gitty_scan_sync(Store) :-
  387    store(Store, _),
  388    !.
  389gitty_scan_sync(Store) :-
  390    redis_db(Store, _, _),
  391    !,
  392    gitty_attach_packs(Store),
  393    redis_ensure_heads(Store),
  394    get_time(Now),
  395    assertz(store(Store, Now)).
  396:- if(remote_sync(true)).  397gitty_scan_sync(Store) :-
  398    remote_sync(true),
  399    !,
  400    gitty_attach_packs(Store),
  401    restore_heads_from_remote(Store).
  402:- endif.  403gitty_scan_sync(Store) :-
  404    gitty_attach_packs(Store),
  405    read_heads_from_objects(Store).
  406
  407%!  read_heads_from_objects(+Store) is det.
  408%
  409%   Establish the head(Store,File,Ext,Hash) relation  by reading all
  410%   objects and adding a fact for the most recent commit.
  411
  412read_heads_from_objects(Store) :-
  413    gitty_scan_latest(Store),
  414    forall(retract(latest(Name, Hash, _Time)),
  415           assert_head(Store, Name, Hash)),
  416    get_time(Now),
  417    assertz(store(Store, Now)).
  418
  419assert_head(Store, Name, Hash) :-
  420    file_name_extension(_, Ext, Name),
  421    assertz(head(Store, Name, Ext, Hash)).
  422
  423
  424%!  gitty_scan_latest(+Store)
  425%
  426%   Scans the gitty store, extracting  the   latest  version of each
  427%   named entry.
  428
  429gitty_scan_latest(Store) :-
  430    retractall(head(Store, _, _, _)),
  431    retractall(latest(_, _, _)),
  432    (   gitty_hash(Store, Hash),
  433        load_object(Store, Hash, Data, commit, _Size),
  434        term_string(Meta, Data, []),
  435        _{name:Name, time:Time} :< Meta,
  436        (   latest(Name, _, OldTime),
  437            OldTime > Time
  438        ->  true
  439        ;   retractall(latest(Name, _, _)),
  440            assertz(latest(Name, Hash, Time))
  441        ),
  442        fail
  443    ;   true
  444    ).
  445
  446
  447%!  gitty_hash(+Store, ?Hash) is nondet.
  448%
  449%   True when Hash is an object in the store.
  450
  451gitty_hash(Store, Hash) :-
  452    var(Hash),
  453    !,
  454    (   gitty_attach_packs(Store),
  455        pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
  456    ;   gitty_file_object(Store, Hash)
  457    ).
  458gitty_hash(Store, Hash) :-
  459    (   gitty_attach_packs(Store),
  460        pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
  461    ->  true
  462    ;   gitty_object_file(Store, Hash, File),
  463        exists_file(File)
  464    ).
  465
  466gitty_file_object(Store, Hash) :-
  467    access_file(Store, exist),
  468    directory_files(Store, Level0),
  469    member(E0, Level0),
  470    E0 \== '..',
  471    atom_length(E0, 2),
  472    directory_file_path(Store, E0, Dir0),
  473    directory_files(Dir0, Level1),
  474    member(E1, Level1),
  475    E1 \== '..',
  476    atom_length(E1, 2),
  477    directory_file_path(Dir0, E1, Dir),
  478    directory_files(Dir, Files),
  479    member(File, Files),
  480    atom_length(File, 36),
  481    atomic_list_concat([E0,E1,File], Hash).
  482
  483%!  delete_object(+Store, +Hash)
  484%
  485%   Delete an existing object
  486
  487delete_object(Store, Hash) :-
  488    gitty_object_file(Store, Hash, File),
  489    delete_file(File).
  490
  491%!  gitty_object_file(+Store, +Hash, -Path) is det.
  492%
  493%   True when Path is the file  at   which  the  object with Hash is
  494%   stored.
  495
  496gitty_object_file(Store, Hash, Path) :-
  497    sub_string(Hash, 0, 2, _, Dir0),
  498    sub_string(Hash, 2, 2, _, Dir1),
  499    sub_string(Hash, 4, _, 0, File),
  500    atomic_list_concat([Store, Dir0, Dir1, File], /, Path).
  501
  502
  503                 /*******************************
  504                 *            SYNCING           *
  505                 *******************************/
  506
  507%!  gitty_update_head(+Store, +Name, +OldCommit,
  508%!                    +NewCommit, +DataHash) is det.
  509%
  510%   Update the head of a gitty  store   for  Name.  OldCommit is the
  511%   current head and NewCommit is the new  head. If Name is created,
  512%   and thus there is no head, OldCommit must be `-`.
  513%
  514%   This operation can fail because another   writer has updated the
  515%   head.  This can both be in-process or another process.
  516
  517gitty_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
  518    redis_db(Store, _, _),
  519    !,
  520    redis_update_head(Store, Name, OldCommit, NewCommit, DataHash).
  521gitty_update_head(Store, Name, OldCommit, NewCommit, _) :-
  522    with_mutex(gitty,
  523               gitty_update_head_sync(Store, Name, OldCommit, NewCommit)).
  524
  525:- if(remote_sync(true)).  526gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
  527    remote_sync(true),
  528    !,
  529    setup_call_cleanup(
  530        heads_output_stream(Store, HeadsOut),
  531        gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut),
  532        close(HeadsOut)).
  533:- endif.  534gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
  535    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit).
  536
  537gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut) :-
  538    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit),
  539    format(HeadsOut, '~q.~n', [head(Name, OldCommit, NewCommit)]).
  540
  541gitty_update_head_sync2(Store, Name, OldCommit, NewCommit) :-
  542    gitty_scan(Store),              % fetch remote changes
  543    (   OldCommit == (-)
  544    ->  (   head(Store, Name, _, _)
  545        ->  throw(error(gitty(file_exists(Name),_)))
  546        ;   assert_head(Store, Name, NewCommit)
  547        )
  548    ;   (   retract(head(Store, Name, _, OldCommit))
  549        ->  assert_head(Store, Name, NewCommit)
  550        ;   throw(error(gitty(not_at_head(Name, OldCommit)), _))
  551        )
  552    ).
  553
  554%!  remote_updates(+Store)
  555%
  556%   Watch for remote updates to the store. We only do this if we did
  557%   not do so the last second.
  558
  559:- dynamic
  560    last_remote_sync/2.  561
  562:- if(remote_sync(false)).  563remote_updates(_) :-
  564    remote_sync(false),
  565    !.
  566:- endif.  567remote_updates(Store) :-
  568    remote_up_to_data(Store),
  569    !.
  570remote_updates(Store) :-
  571    with_mutex(gitty, remote_updates_sync(Store)).
  572
  573remote_updates_sync(Store) :-
  574    remote_up_to_data(Store),
  575    !.
  576remote_updates_sync(Store) :-
  577    retractall(last_remote_sync(Store, _)),
  578    get_time(Now),
  579    asserta(last_remote_sync(Store, Now)),
  580    remote_update(Store).
  581
  582remote_up_to_data(Store) :-
  583    last_remote_sync(Store, Last),
  584    get_time(Now),
  585    Now-Last < 1.
  586
  587remote_update(Store) :-
  588    remote_updates(Store, List),
  589    maplist(update_head(Store), List).
  590
  591update_head(Store, head(Name, OldCommit, NewCommit)) :-
  592    (   OldCommit == (-)
  593    ->  \+ head(Store, Name, _, _)
  594    ;   retract(head(Store, Name, _, OldCommit))
  595    ),
  596    !,
  597    assert_head(Store, Name, NewCommit).
  598update_head(_, _).
  599
  600%!  remote_updates(+Store, -List) is det.
  601%
  602%   Find updates from other gitties  on   the  same filesystem. Note
  603%   that we have to push/pop the input   context to avoid creating a
  604%   notion of an  input  context   which  possibly  relate  messages
  605%   incorrectly to the sync file.
  606
  607remote_updates(Store, List) :-
  608    heads_input_stream(Store, Stream),
  609    setup_call_cleanup(
  610        '$push_input_context'(gitty_sync),
  611        read_new_terms(Stream, List),
  612        '$pop_input_context').
  613
  614read_new_terms(Stream, Terms) :-
  615    read(Stream, First),
  616    read_new_terms(First, Stream, Terms).
  617
  618read_new_terms(end_of_file, _, List) :-
  619    !,
  620    List = [].
  621read_new_terms(Term, Stream, [Term|More]) :-
  622    read(Stream, Term2),
  623    read_new_terms(Term2, Stream, More).
  624
  625heads_output_stream(Store, Out) :-
  626    heads_file(Store, HeadsFile),
  627    open(HeadsFile, append, Out,
  628         [ encoding(utf8),
  629           lock(exclusive)
  630         ]).
  631
  632heads_input_stream(Store, Stream) :-
  633    heads_input_stream_cache(Store, Stream0),
  634    !,
  635    Stream = Stream0.
  636heads_input_stream(Store, Stream) :-
  637    heads_file(Store, File),
  638    between(1, 2, _),
  639    catch(open(File, read, In,
  640               [ encoding(utf8),
  641                 eof_action(reset)
  642               ]),
  643          _,
  644          create_heads_file(Store)),
  645    !,
  646    assert(heads_input_stream_cache(Store, In)),
  647    Stream = In.
  648
  649create_heads_file(Store) :-
  650    call_cleanup(
  651        heads_output_stream(Store, Out),
  652        close(Out)),
  653    fail.                                   % always fail!
  654
  655heads_file(Store, HeadsFile) :-
  656    ensure_directory(Store),
  657    directory_file_path(Store, ref, RefDir),
  658    ensure_directory(RefDir),
  659    directory_file_path(RefDir, head, HeadsFile).
  660
  661%!  restore_heads_from_remote(Store)
  662%
  663%   Restore the known heads by reading the remote sync file.
  664
  665restore_heads_from_remote(Store) :-
  666    heads_file(Store, File),
  667    exists_file(File),
  668    setup_call_cleanup(
  669        open(File, read, In, [encoding(utf8)]),
  670        restore_heads(Store, In),
  671        close(In)),
  672    !,
  673    get_time(Now),
  674    assertz(store(Store, Now)).
  675restore_heads_from_remote(Store) :-
  676    read_heads_from_objects(Store),
  677    heads_file(Store, File),
  678    setup_call_cleanup(
  679        open(File, write, Out, [encoding(utf8)]),
  680        save_heads(Store, Out),
  681        close(Out)),
  682    !.
  683
  684restore_heads(Store, In) :-
  685    read(In, Term0),
  686    Term0 = epoch(_),
  687    read(In, Term1),
  688    restore_heads(Term1, In, Store).
  689
  690restore_heads(end_of_file, _, _) :- !.
  691restore_heads(head(File, _, Hash), In, Store) :-
  692    retractall(head(Store, File, _, _)),
  693    assert_head(Store, File, Hash),
  694    read(In, Term),
  695    restore_heads(Term, In, Store).
  696
  697save_heads(Store, Out) :-
  698    get_time(Now),
  699    format(Out, 'epoch(~0f).~n~n', [Now]),
  700    forall(head(Store, File, _, Hash),
  701           format(Out, '~q.~n', [head(File, -, Hash)])).
  702
  703
  704%!  delete_head(+Store, +Head) is det.
  705%
  706%   Delete Head from Store. Used  by   gitty_fsck/1  to remove heads
  707%   that have no commits. Should  we   forward  this  to remotes, or
  708%   should they do their own thing?
  709
  710delete_head(Store, Head) :-
  711    redis_db(Store, _, _),
  712    !,
  713    redis_delete_head(Store, Head).
  714delete_head(Store, Head) :-
  715    retractall(head(Store, Head, _, _)).
  716
  717%!  set_head(+Store, +File, +Hash) is det.
  718%
  719%   Set the head of the given File to Hash
  720
  721set_head(Store, File, Hash) :-
  722    redis_db(Store, _, _),
  723    !,
  724    redis_set_head(Store, File, Hash).
  725set_head(Store, File, Hash) :-
  726    file_name_extension(_, Ext, File),
  727    (   head(Store, File, _, Hash0)
  728    ->  (   Hash == Hash0
  729        ->  true
  730        ;   asserta(head(Store, File, Ext, Hash)),
  731            retractall(head(Store, File, _, Hash0))
  732        )
  733    ;   asserta(head(Store, File, Ext, Hash))
  734    ).
  735
  736
  737                 /*******************************
  738                 *            PACKS             *
  739                 *******************************/
  740
  741/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  742
  743<pack file> := <header>
  744               <file>*
  745<header>    := "gitty(Version).\n" <object>* "end_of_header.\n"
  746<object>    := obj(Hash, Type, Size, FileSize)
  747- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  748
  749pack_version(1).
  750
  751%!  repack_objects(+Store, +Options) is det.
  752%
  753%   Repack  objects  of  Store  for  reduced  disk  usage  and  enhanced
  754%   performance. By default this picks up all  file objects of the store
  755%   and all existing small pack files.  Options:
  756%
  757%     - small_pack(+Bytes)
  758%     Consider all packs with less than Bytes as small and repack them.
  759%     Default 10Mb
  760%     - min_files(+Count)
  761%     Do not repack if there are less than Count new files.
  762%     Default 1,000.
  763
  764:- debug(gitty(pack)).  765
  766repack_objects(Store, Options) :-
  767    option(min_files(MinFiles), Options, 1_000),
  768    findall(Object, gitty_file_object(Store, Object), Objects),
  769    length(Objects, NewFiles),
  770    debug(gitty(pack), 'Found ~D file objects', [NewFiles]),
  771    (   NewFiles >= MinFiles
  772    ->  pack_files(Store, ExistingPacks),
  773        option(small_pack(MaxSize), Options, 10_000_000),
  774        include(small_file(MaxSize), ExistingPacks, PackFiles),
  775        (   debugging(gitty(pack))
  776        ->  length(PackFiles, PackCount),
  777            debug(gitty(pack), 'Found ~D small packs', [PackCount])
  778        ;   true
  779        ),
  780        directory_file_path(Store, pack, PackDir),
  781        make_directory_path(PackDir),
  782        pack_objects(Store, Objects, PackFiles, PackDir, _PackFile, Options)
  783    ;   debug(gitty(pack), 'Nothing to do', [])
  784    ).
  785
  786small_file(MaxSize, File) :-
  787    size_file(File, Size),
  788    Size < MaxSize.
  789
  790%!  pack_objects(+Store, +Objects, +Packs, +PackDir,
  791%!               -PackFile, +Options) is det.
  792%
  793%   Pack the given objects and pack files into a new pack.
  794
  795pack_objects(Store, Objects, Packs, PackDir, PackFile, Options) :-
  796    with_mutex(gitty_pack,
  797               pack_objects_sync(Store, Objects, Packs, PackDir,
  798                                 PackFile, Options)).
  799
  800pack_objects_sync(_Store, [], [Pack], _, [Pack], _) :-
  801    !.
  802pack_objects_sync(Store, Objects, Packs, PackDir, PackFilePath, Options) :-
  803    length(Objects, ObjCount),
  804    length(Packs, PackCount),
  805    debug(gitty(pack), 'Repacking ~D objects and ~D packs',
  806          [ObjCount, PackCount]),
  807    maplist(object_info(Store), Objects, FileInfo),
  808    maplist(pack_info(Store), Packs, PackInfo),
  809    append([FileInfo|PackInfo], Info0),
  810    sort(1, @<, Info0, Info),           % remove possible duplicates
  811    (   debugging(gitty(pack))
  812    ->  (   PackCount > 0
  813        ->  length(Info, FinalObjCount),
  814            debug(gitty(pack), 'Total ~D objects', [FinalObjCount])
  815        ;   true
  816        )
  817    ;   true
  818    ),
  819    directory_file_path(PackDir, 'pack-create', TmpPack),
  820    setup_call_cleanup(
  821        (   open(TmpPack, write, Out0, [type(binary), lock(write)]),
  822            open_hash_stream(Out0, Out, [algorithm(sha1)])
  823        ),
  824        (   write_signature(Out),
  825            maplist(write_header(Out), Info),
  826            format(Out, 'end_of_header.~n', []),
  827            maplist(add_file(Out, Store), Info),
  828            stream_hash(Out, SHA1)
  829        ),
  830        close(Out)),
  831    format(atom(PackFile), 'pack-~w.pack', [SHA1]),
  832    directory_file_path(PackDir, PackFile, PackFilePath),
  833    rename_file(TmpPack, PackFilePath),
  834    debug(gitty(pack), 'Attaching ~p', [PackFilePath]),
  835    attach_pack(Store, PackFilePath),
  836    remove_objects_after_pack(Store, Objects, Options),
  837    delete(Packs, PackFilePath, RmPacks),
  838    remove_repacked_packs(Store, RmPacks, Options),
  839    debug(gitty(pack), 'Packing completed', []).
  840
  841object_info(Store, Object, obj(Object, Type, Size, FileSize)) :-
  842    gitty_object_file(Store, Object, File),
  843    load_object_header(Store, Object, Type, Size),
  844    size_file(File, FileSize).
  845
  846pack_info(Store, PackFile, Objects) :-
  847    attach_pack(Store, PackFile),
  848    pack_read_header(PackFile, _Version, _DataOffset, Objects).
  849
  850write_signature(Out) :-
  851    pack_version(Version),
  852    format(Out, "gitty(~d).~n", [Version]).
  853
  854write_header(Out, obj(Object, Type, Size, FileSize)) :-
  855    format(Out, 'obj(~q,~q,~d,~d).~n', [Object, Type, Size, FileSize]).
  856
  857%!  add_file(+Out, +Store, +Object) is det.
  858%
  859%   Add Object from Store to the pack stream Out.
  860
  861add_file(Out, Store, obj(Object, _Type, _Size, _FileSize)) :-
  862    gitty_object_file(Store, Object, File),
  863    exists_file(File),
  864    !,
  865    setup_call_cleanup(
  866        open(File, read, In, [type(binary)]),
  867        copy_stream_data(In, Out),
  868        close(In)).
  869add_file(Out, Store, obj(Object, Type, Size, FileSize)) :-
  870    pack_object(Object, Type, Size, Offset, Store, PackFile),
  871    setup_call_cleanup(
  872        open(PackFile, read, In, [type(binary)]),
  873        (   seek(In, Offset, bof, Offset),
  874            copy_stream_data(In, Out, FileSize)
  875        ),
  876        close(In)).
  877
  878
  879%!  gitty_fsck(+Store) is det.
  880%
  881%   Validate all packs associated with Store
  882
  883gitty_fsck(Store) :-
  884    pack_files(Store, PackFiles),
  885    maplist(fsck_pack, PackFiles).
  886
  887%!  fsck_pack(+File) is det.
  888%
  889%   Validate the integrity of the pack file File.
  890
  891fsck_pack(File) :-
  892    debug(gitty(pack), 'fsck ~p', [File]),
  893    check_pack_hash(File),
  894    debug(gitty(pack), 'fsck ~p: checking objects', [File]),
  895    check_pack_objects(File),
  896    debug(gitty(pack), 'fsck ~p: done', [File]).
  897
  898check_pack_hash(File) :-
  899    file_base_name(File, Base),
  900    file_name_extension(Plain, Ext, Base),
  901    must_be(oneof([pack]), Ext),
  902    atom_concat('pack-', Hash, Plain),
  903    setup_call_cleanup(
  904        (   open(File, read, In0, [type(binary)]),
  905            open_hash_stream(In0, In, [algorithm(sha1)])
  906        ),
  907        (   setup_call_cleanup(
  908                open_null_stream(Null),
  909                copy_stream_data(In, Null),
  910                close(Null)),
  911            stream_hash(In, SHA1)
  912        ),
  913        close(In)),
  914    assertion(Hash == SHA1).
  915
  916check_pack_objects(PackFile) :-
  917    setup_call_cleanup(
  918        open(PackFile, read, In, [type(binary)]),
  919        (  read_header(In, Version, DataOffset, Objects),
  920           set_stream(In, encoding(utf8)),
  921           foldl(check_object(In, PackFile, Version), Objects, DataOffset, _)
  922        ),
  923        close(In)).
  924
  925check_object(In, PackFile, _Version,
  926             obj(Object, Type, Size, FileSize),
  927             Offset0, Offset) :-
  928    Offset is Offset0+FileSize,
  929    byte_count(In, Here),
  930    (   Here == Offset0
  931    ->  true
  932    ;   print_message(warning, pack(reposition(Here, Offset0))),
  933        seek(In, Offset0, bof, Offset0)
  934    ),
  935    (   setup_call_cleanup(
  936            zopen(In, In2, [multi_part(false), close_parent(false)]),
  937            catch(read_object(In2, Data, _0RType, _0RSize), E,
  938                  ( print_message(error,
  939                                  gitty(PackFile, fsck(read_object(Object, E)))),
  940                    fail)),
  941            close(In2))
  942    ->  byte_count(In, End),
  943        (   End == Offset
  944        ->  true
  945        ;   print_message(error,
  946                          gitty(PackFile, fsck(object_end(Object, End,
  947                                                          Offset0, Offset,
  948                                                          Data))))
  949        ),
  950        assertion(Type == _0RType),
  951        assertion(Size == _0RSize),
  952        gitty:check_object(Object, Data, Type, Size)
  953    ;   true
  954    ).
  955
  956
  957%!  gitty_attach_packs(+Store) is det.
  958%
  959%   Attach all packs for Store
  960
  961gitty_attach_packs(Store) :-
  962    attached_packs(Store),
  963    !.
  964gitty_attach_packs(Store) :-
  965    with_mutex(gitty_attach_packs,
  966               gitty_attach_packs_sync(Store)).
  967
  968gitty_attach_packs_sync(Store) :-
  969    attached_packs(Store),
  970    !.
  971gitty_attach_packs_sync(Store) :-
  972    pack_files(Store, PackFiles),
  973    maplist(attach_pack(Store), PackFiles),
  974    asserta(attached_packs(Store)).
  975
  976pack_files(Store, Packs) :-
  977    directory_file_path(Store, pack, PackDir),
  978    exists_directory(PackDir),
  979    !,
  980    directory_files(PackDir, Files),
  981    convlist(is_pack(PackDir), Files, Packs).
  982pack_files(_, []).
  983
  984is_pack(PackDir, File, Path) :-
  985    file_name_extension(_, pack, File),
  986    directory_file_path(PackDir, File, Path).
  987
  988%!  attach_pack(+Store, +PackFile)
  989%
  990%   Load the index of Pack into memory.
  991
  992attach_pack(Store, PackFile) :-
  993    attached_pack(Store, PackFile),
  994    !.
  995attach_pack(Store, PackFile) :-
  996    retractall(pack_object(_,_,_,_,_,PackFile)),
  997    pack_read_header(PackFile, Version, DataOffset, Objects),
  998    foldl(assert_object(Store, PackFile, Version), Objects, DataOffset, _),
  999    assertz(attached_pack(Store, PackFile)).
 1000
 1001pack_read_header(PackFile, Version, DataOffset, Objects) :-
 1002    setup_call_cleanup(
 1003        open(PackFile, read, In, [type(binary)]),
 1004        read_header(In, Version, DataOffset, Objects),
 1005        close(In)).
 1006
 1007read_header(In, Version, DataOffset, Objects) :-
 1008    read(In, Signature),
 1009    (   Signature = gitty(Version)
 1010    ->  true
 1011    ;   domain_error(gitty_pack_file, Objects)
 1012    ),
 1013    read(In, Term),
 1014    read_index(Term, In, Objects),
 1015    get_code(In, Code),
 1016    assertion(Code == 0'\n),
 1017    byte_count(In, DataOffset).
 1018
 1019read_index(end_of_header, _, []) :-
 1020    !.
 1021read_index(Object, In, [Object|T]) :-
 1022    read(In, Term2),
 1023    read_index(Term2, In, T).
 1024
 1025assert_object(Store, Pack, _Version,
 1026              obj(Object, Type, Size, FileSize),
 1027              Offset0, Offset) :-
 1028    Offset is Offset0+FileSize,
 1029    assertz(pack_object(Object, Type, Size, Offset0, Store, Pack)).
 1030
 1031%!  detach_pack(+Store, +Pack) is det.
 1032%
 1033%   Remove a pack file from the memory index.
 1034
 1035detach_pack(Store, Pack) :-
 1036    retractall(pack_object(_, _, _, _, Store, Pack)),
 1037    retractall(attached_pack(Store, Pack)).
 1038
 1039%!  load_object_from_pack(+Hash, -Data, -Type, -Size) is semidet.
 1040%
 1041%   True when Hash is in a pack and can be loaded.
 1042
 1043load_object_from_pack(Hash, Data, Type, Size) :-
 1044    pack_object(Hash, Type, Size, Offset, _Store, Pack),
 1045    setup_call_cleanup(
 1046        open(Pack, read, In, [type(binary)]),
 1047        read_object_at(In, Offset, Data, Type, Size),
 1048        close(In)).
 1049
 1050read_object_at(In, Offset, Data, Type, Size) :-
 1051    seek(In, Offset, bof, Offset),
 1052    read_object_here(In, Data, Type, Size).
 1053
 1054read_object_here(In, Data, Type, Size) :-
 1055    stream_property(In, encoding(Enc)),
 1056    setup_call_cleanup(
 1057        ( set_stream(In, encoding(utf8)),
 1058          zopen(In, In2, [multi_part(false), close_parent(false)])
 1059        ),
 1060        read_object(In2, Data, Type, Size),
 1061        ( close(In2),
 1062          set_stream(In, encoding(Enc))
 1063        )).
 1064
 1065
 1066%!  unpack_packs(+Store) is det.
 1067%
 1068%   Unpack all packs.
 1069
 1070unpack_packs(Store) :-
 1071    absolute_file_name(Store, AbsStore, [file_type(directory),
 1072                                         access(read)]),
 1073    pack_files(AbsStore, Packs),
 1074    maplist(unpack_pack(AbsStore), Packs).
 1075
 1076%!  unpack_pack(+Store, +Pack) is det.
 1077%
 1078%   Turn a pack back into a plain object files
 1079
 1080unpack_pack(Store, PackFile) :-
 1081    pack_read_header(PackFile, _Version, DataOffset, Objects),
 1082    setup_call_cleanup(
 1083        open(PackFile, read, In, [type(binary)]),
 1084        foldl(create_file(Store, In), Objects, DataOffset, _),
 1085        close(In)),
 1086    detach_pack(Store, PackFile),
 1087    delete_file(PackFile).
 1088
 1089create_file(Store, In, obj(Object, _Type, _Size, FileSize), Offset0, Offset) :-
 1090    Offset is Offset0+FileSize,
 1091    gitty_object_file(Store, Object, Path),
 1092    with_mutex(gitty_file, exists_or_recreate(Path, Out)),
 1093        (   var(Out)
 1094        ->  true
 1095        ;   setup_call_cleanup(
 1096                seek(In, Offset0, bof, Offset0),
 1097                copy_stream_data(In, Out, FileSize),
 1098                close(Out))
 1099        ).
 1100
 1101exists_or_recreate(Path, _Out) :-
 1102    exists_file(Path),
 1103    !.
 1104exists_or_recreate(Path, Out) :-
 1105    file_directory_name(Path, Dir),
 1106    make_directory_path(Dir),
 1107    open(Path, write, Out, [type(binary), lock(write)]).
 1108
 1109
 1110%!  remove_objects_after_pack(+Store, +Objects, +Options) is det.
 1111%
 1112%   Remove the indicated (file) objects from Store.
 1113
 1114remove_objects_after_pack(Store, Objects, Options) :-
 1115    debug(gitty(pack), 'Deleting plain files', []),
 1116    maplist(delete_object(Store), Objects),
 1117    (   option(prune_empty_directories(true), Options, true)
 1118    ->  debug(gitty(pack), 'Pruning empty directories', []),
 1119        prune_empty_directories(Store)
 1120    ;   true
 1121    ).
 1122
 1123%!  remove_repacked_packs(+Store, +Packs, +Options)
 1124%
 1125%   Remove packs that have been repacked.
 1126
 1127remove_repacked_packs(Store, Packs, Options) :-
 1128    maplist(remove_pack(Store, Options), Packs).
 1129
 1130remove_pack(Store, _Options, Pack) :-
 1131    detach_pack(Store, Pack),
 1132    delete_file(Pack).
 1133
 1134%!  prune_empty_directories(+Dir) is det.
 1135%
 1136%   Prune directories that are  empty  below   Dir.  Dir  itself  is not
 1137%   removed, even if it is empty.
 1138
 1139prune_empty_directories(Dir) :-
 1140    prune_empty_directories(Dir, 0).
 1141
 1142prune_empty_directories(Dir, Level) :-
 1143    directory_files(Dir, AllFiles),
 1144    exclude(hidden, AllFiles, Files),
 1145    (   Files == [],
 1146        Level > 0
 1147    ->  delete_directory_async(Dir)
 1148    ;   convlist(prune_empty_directories(Dir, Level), Files, Left),
 1149        (   Left == [],
 1150            Level > 0
 1151        ->  delete_directory_async(Dir)
 1152        ;   true
 1153        )
 1154    ).
 1155
 1156hidden(.).
 1157hidden(..).
 1158
 1159prune_empty_directories(Parent, Level0, File, _) :-
 1160    directory_file_path(Parent, File, Path),
 1161    exists_directory(Path),
 1162    !,
 1163    Level is Level0 + 1,
 1164    prune_empty_directories(Path, Level),
 1165    fail.
 1166prune_empty_directories(_, _, File, File).
 1167
 1168delete_directory_async(Dir) :-
 1169    with_mutex(gitty_file, delete_directory_async2(Dir)).
 1170
 1171delete_directory_async2(Dir) :-
 1172    catch(delete_directory(Dir), E,
 1173          (   \+ exists_directory(Dir)
 1174          ->  true
 1175          ;   \+ empty_directory(Dir)
 1176          ->  true
 1177          ;   throw(E)
 1178          )).
 1179
 1180empty_directory(Dir) :-
 1181    directory_files(Dir, AllFiles),
 1182    exclude(hidden, AllFiles, []).
 1183
 1184
 1185		 /*******************************
 1186		 *        REDIS PRIMITIVES	*
 1187		 *******************************/
 1188
 1189redis_head_db(Store, DB, Key) :-
 1190    redis_db(Store, DB, Prefix),
 1191    string_concat(Prefix, ":gitty:head", Key).
 1192
 1193%!  redis_file(+Store, ?Name, ?Ext, ?Hash)
 1194
 1195redis_file(Store, Name, Ext, Hash) :-
 1196    nonvar(Name),
 1197    !,
 1198    file_name_extension(_Base, Ext, Name),
 1199    redis_head_db(Store, DB, Heads),
 1200    redis(DB, hget(Heads, Name), Hash as atom).
 1201redis_file(Store, Name, Ext, Hash) :-
 1202    nonvar(Ext),
 1203    !,
 1204    string_concat("*.", Ext, Pattern),
 1205    redis_head_db(Store, DB, Heads),
 1206    redis_hscan(DB, Heads, LazyList, [match(Pattern)]),
 1207    member(NameS-HashS, LazyList),
 1208    atom_string(Name, NameS),
 1209    atom_string(Hash, HashS).
 1210redis_file(Store, Name, Ext, Hash) :-
 1211    nonvar(Hash),
 1212    !,
 1213    load_plain_commit(Store, Hash, Commit),
 1214    Name = Commit.name,
 1215    file_name_extension(_Base, Ext, Name).
 1216redis_file(Store, Name, Ext, Hash) :-
 1217    redis_head_db(Store, DB, Heads),
 1218    redis(DB, hgetall(Heads), Pairs as pairs(atom,atom)),
 1219    member(Name-Hash, Pairs),
 1220    file_name_extension(_Base, Ext, Name).
 1221
 1222%!  redis_ensure_heads(+Store)
 1223%
 1224%   Ensure the redis db contains a  hashmap   mapping  all file names to
 1225%   their head hashes.
 1226
 1227redis_ensure_heads(Store) :-
 1228    redis_head_db(Store, DB, Key),
 1229    redis(DB, exists(Key), 1),
 1230    !.
 1231redis_ensure_heads(Store) :-
 1232    redis_head_db(Store, DB, Key),
 1233    debug(gitty(redis), 'Initializing gitty heads in ~p ...', [Key]),
 1234    gitty_scan_latest(Store),
 1235    forall(retract(latest(Name, Hash, _Time)),
 1236           redis(DB, hset(Key, Name, Hash))),
 1237    debug(gitty(redis), '... finished gitty heads', []).
 1238
 1239%!  redis_update_head(+Store, +Name, +OldCommit, +NewCommit, +DataHash)
 1240
 1241redis_update_head(Store, Name, -, NewCommit, DataHash) :-
 1242    !,
 1243    redis_head_db(Store, DB, Key),
 1244    redis(DB, hset(Key, Name, NewCommit)),
 1245    publish_objects(Store, [NewCommit, DataHash]).
 1246redis_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
 1247    redis_head_db(Store, DB, Key),
 1248    redis_hcas(DB, Key, Name, OldCommit, NewCommit),
 1249    publish_objects(Store, [NewCommit, DataHash]).
 1250
 1251%!  redis_delete_head(Store, Head) is det.
 1252%
 1253%   Unregister Head
 1254
 1255redis_delete_head(Store, Head) :-
 1256    redis_head_db(Store, DB, Key),
 1257    redis(DB, hdel(Key, Head)).
 1258
 1259%!  redis_set_head(+Store, +File, +Hash) is det.
 1260
 1261redis_set_head(Store, File, Hash) :-
 1262    redis_head_db(Store, DB, Key),
 1263    redis(DB, hset(Key, File, Hash)).
 1264
 1265		 /*******************************
 1266		 *           REPLICATE		*
 1267		 *******************************/
 1268
 1269%!  redis_replicate_get(+Store, +Hash)
 1270%
 1271%   Try to get an object from another   SWISH  server in the network. We
 1272%   implement replication using the PUB/SUB protocol   of Redis. This is
 1273%   not ideal as this route of the   synchronisation is only used if for
 1274%   some reason this server lacks some object. This typically happens if
 1275%   this node is new to the cluster or has been offline for a long time.
 1276%   In a large cluster, most nodes  will   have  the objects and each of
 1277%   them will send the object around. A consumer group based solution is
 1278%   not ideal either, as the message may  be   picked  up by a node that
 1279%   does not have this object, after which  we need the failure recovery
 1280%   protocol to get it right. This  is   particularly  the case with two
%   nodes, where we have a fair chance of being asked ourselves for the
%   hash we miss.
%
%   We could improve on this in two ways: (1) put the published hash in a
 1285%   short-lived key on Redis and make others  check that. That is likely
 1286%   to avoid many nodes sending the  same   object  or  (2) see how many
 1287%   nodes are in the pool and switch  to a consumer group based approach
 1288%   if this number is  high  (and  thus   we  are  unlikely  to be asked
 1289%   ourselves for the missing hash).
 1290%
 1291%   @see publish_objects/2 for the incremental replication
 1292
 1293:- multifile
 1294    swish_redis:stream/2. 1295
 1296swish_redis:stream('gitty:replicate', [maxlen(100)]).
 1297
 1298:- listen(http(pre_server_start(_)),
 1299          init_replicator). 1300
 1301init_replicator :-
 1302    redis_swish_stream('gitty:replicate', ReplKey),
 1303    listen(redis(_Redis, ReplKey, _Id, Data),
 1304           replicate(Data)),
 1305    listen(redis(_, 'swish:gitty', Message),
 1306           gitty_message(Message)),
 1307    message_queue_create(_, [alias(gitty_queue)]).
 1308
 1309:- debug(gitty(replicate)). 1310
 1311gitty_message(discover(Hash)) :-
 1312    debug(gitty(replicate), 'Discover: ~p', [Hash]),
 1313    store(Store, _),
 1314    load_object_raw(Store, Hash, Data),
 1315    debug(gitty(replicate), 'Sending object ~p', [Hash]),
 1316    redis(swish, publish(swish:gitty, object(Hash, Data) as prolog)).
 1317gitty_message(object(Hash, Data)) :-
 1318    debug(gitty(replicate), 'Replicate: ~p', [Hash]),
 1319    redis_db(Store, _DB, _Prefix),
 1320    store_object_raw(Store, Hash, Data, New),
 1321    debug(gitty(replicate), 'Received object ~p (new=~p)', [Hash, New]),
 1322    (   New == true
 1323    ->  thread_send_message(gitty_queue, Hash)
 1324    ;   true
 1325    ).
 1326
 1327redis_replicate_get(_Store, Hash) :-
 1328    is_gitty_hash(Hash),
 1329    redis(swish, publish(swish:gitty, discover(Hash) as prolog), Count),
 1330    Count > 1,                          % If I'm alone it won't help :(
 1331    thread_get_message(gitty_queue, Hash,
 1332                       [ timeout(10)
 1333                       ]).
 1334
 1335
 1336%!  publish_objects(+Store, +Hashes)
 1337%
 1338%   Make the objects we just stored globally   known. These are added to
 1339%   the Redis stream gitty:replicate and received by replicate/1 below.
 1340%
%   This realizes eager replication, as opposed to the code above
%   (redis_replicate_get/2), which performs lazy replication. Eager
%   replication ensures the object is stored in multiple places in case
%   the node on which it was saved dies shortly after.
 1345%
 1346%   Note that we also  receive  the  object   we  just  saved.  That  is
 1347%   unavoidable in a network where all nodes are equal.
 1348
 1349publish_objects(Store, Hashes) :-
 1350    redis_swish_stream('gitty:replicate', ReplKey),
 1351    maplist(publish_object(Store, ReplKey), Hashes).
 1352
 1353publish_object(Store, Stream, Hash) :-
 1354    load_object_raw(Store, Hash, Data),
 1355    debug(gitty(replicate), 'Sending ~p to ~p', [Hash, Stream]),
 1356    xadd(swish, Stream, _, _{hash:Hash, data:Data}).
 1357
 1358%!  replicate(+Data) is det.
 1359%
 1360%   Act on a message send to the  gitty:replicate stream. Add the object
 1361%   to our store unless we already have it. Note that we receive our own
 1362%   objects as well.
 1363
 1364replicate(Data) :-
 1365    redis_db(Store, _DB, _Prefix),
 1366    atom_string(Hash, Data.hash),
 1367    store_object_raw(Store, Hash, Data.data, _0New),
 1368    debug(gitty(replicate), 'Received object ~p (new=~p)',
 1369          [Hash, _0New]).
 1370
 1371
 1372		 /*******************************
 1373		 *         REDIS BASICS		*
 1374		 *******************************/
 1375
 1376%!  redis_hcas(+DB, +Hash, +Key, +Old, +New) is semidet.
 1377%
 1378%   Update Hash.Key to New provided the current value is Old.
 1379
 1380redis_hcas(DB, Hash, Key, Old, New) :-
 1381    redis(DB, eval("if redis.call('HGET', KEYS[1], ARGV[1]) == ARGV[2] then \c
 1382                      redis.call('HSET', KEYS[1],  ARGV[1], ARGV[3]); \c
 1383                      return 1; \c
 1384                      end; \c
 1385                    return 0\c
 1386                   ",
 1387                   1, Hash, Key, Old, New),
 1388          1)