View source with raw comments or as raw
    1/*  Part of SWISH
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2015-2020, VU University Amsterdam
    7                              CWI Amsterdam
    8                              SWI-Prolog Solutions b.v.
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(gitty_driver_files,
   38          [ gitty_open/2,               % +Store, +Options
   39            gitty_close/1,              % +Store
   40            gitty_file/4,               % +Store, ?Name, ?Ext, ?Hash
   41
   42            gitty_update_head/5,        % +Store, +Name, +OldCommit, +NewCommit
   43					% +DataHash
   44            delete_head/2,              % +Store, +Name
   45            set_head/3,                 % +Store, +Name, +Hash
   46            store_object/4,             % +Store, +Hash, +Header, +Data
   47            delete_object/2,            % +Store, +Hash
   48
   49            gitty_hash/2,               % +Store, ?Hash
   50            load_plain_commit/3,        % +Store, +Hash, -Meta
   51            load_object/5,              % +Store, +Hash, -Data, -Type, -Size
   52            gitty_object_file/3,        % +Store, +Hash, -File
   53
   54            repack_objects/2,           % +Store, +Options
   55            pack_objects/6,             % +Store, +Objs, +Packs, +PackDir,
   56                                        % -File, +Opts
   57            unpack_packs/1,             % +Store
   58            unpack_pack/2,              % +Store, +PackFile
   59
   60            attach_pack/2,              % +Store, +PackFile
   61            gitty_fsck/1,               % +Store
   62            fsck_pack/1,                % +PackFile
   63            load_object_from_pack/4,    % +Hash, -Data, -Type, -Size
   64
   65            gitty_rescan/1              % Store
   66          ]).   67:- use_module(library(apply)).   68:- use_module(library(zlib)).   69:- use_module(library(filesex)).   70:- use_module(library(lists)).   71:- use_module(library(apply)).   72:- use_module(library(error)).   73:- use_module(library(debug)).   74:- use_module(library(zlib)).   75:- use_module(library(hash_stream)).   76:- use_module(library(option)).   77:- use_module(library(dcg/basics)).   78:- use_module(library(redis)).   79:- use_module(library(redis_streams)).   80:- use_module(gitty, [is_gitty_hash/1]).   81
   82:- use_module(swish_redis).

Gitty plain files driver

This version of the driver uses plain files to store the gitty data. It consists of a nested directory structure with files named after the hash. Objects and hash computation is the same as for git. The heads (files) are computed on startup by scanning all objects. There is a file ref/head that is updated if a head is updated. Other clients can watch this file and update their notion of the head. This implies that the store can handle multiple clients that can access a shared file system, optionally shared using NFS from different machines.

The store is simple and robust. The main disadvantages are long startup times as the store holds more objects and relatively high disk usage due to rounding the small objects to disk allocation units.

bug
- Shared access does not work on Windows. */
  102:- dynamic
  103    head/4,                             % Store, Name, Ext, Hash
  104    store/2,                            % Store, Updated
  105    commit/3,                           % Store, Hash, Meta
  106    heads_input_stream_cache/2,         % Store, Stream
  107    pack_object/6,                      % Hash, Type, Size, Offset, Store,PackFile
  108    attached_packs/1,                   % Store
  109    attached_pack/2,                    % Store, PackFile
  110    redis_db/3.                         % Store, DB, Prefix
  111
  112:- volatile
  113    head/4,
  114    store/2,
  115    commit/3,
  116    heads_input_stream_cache/2,
  117    pack_object/6,
  118    attached_packs/1,
  119    attached_pack/2.  120
  121:- multifile
  122    gitty:check_object/4.  123
  124% enable/disable syncing remote servers running on  the same file store.
  125% This facility requires shared access to files and thus doesn't work on
  126% Windows.
  127
  128:- if(current_prolog_flag(windows, true)).  129remote_sync(false).
  130:- else.  131remote_sync(true).
  132:- endif.
 gitty_open(+Store, +Options) is det
Driver specific initialization. Handles setting up a Redis connection when requested. This processes:
redis(+DB)
Name of the redis DB to connect to. See redis_server/3.
redis_prefix(+Prefix)
Prefix for all keys. This can be used to host multiple SWISH servers on the same redis cluster. Default is swish.
  145gitty_open(Store, Options) :-
  146    option(redis(DB), Options),
  147    !,
  148    option(redis_prefix(Prefix), Options, swish),
  149    asserta(redis_db(Store, DB, Prefix)),
  150    thread_create(gitty_scan(Store), _, [detached(true)]).
  151gitty_open(_, _).
 gitty_close(+Store) is det
Close resources associated with a store.
  158gitty_close(Store) :-
  159    (   retract(heads_input_stream_cache(Store, In))
  160    ->  close(In)
  161    ;   true
  162    ),
  163    retractall(head(Store,_,_,_)),
  164    retractall(store(Store,_)),
  165    retractall(pack_object(_,_,_,_,Store,_)).
 gitty_file(+Store, ?File, ?Ext, ?Head) is nondet
True when File entry in the gitty store and Head is the HEAD revision.
  173gitty_file(Store, Head, Ext, Hash) :-
  174    redis_db(Store, _, _),
  175    !,
  176    gitty_scan(Store),
  177    redis_file(Store, Head, Ext, Hash).
  178gitty_file(Store, Head, Ext, Hash) :-
  179    gitty_scan(Store),
  180    head(Store, Head, Ext, Hash).
 load_plain_commit(+Store, +Hash, -Meta:dict) is semidet
Load the commit data as a dict. Loaded commits are cached in commit/3. Note that only adding a fact to the cache is synchronized. This means that during a race situation we may load the same object multiple times from disk, but this is harmless while a lock around the whole predicate serializes loading different objects, which is not needed.
  191load_plain_commit(Store, Hash, Meta) :-
  192    must_be(atom, Store),
  193    must_be(atom, Hash),
  194    commit(Store, Hash, Meta),
  195    !.
  196load_plain_commit(Store, Hash, Meta) :-
  197    load_object(Store, Hash, String, _, _),
  198    term_string(Meta0, String, []),
  199    with_mutex(gitty_commit_cache,
  200               assert_cached_commit(Store, Hash, Meta0)),
  201    Meta = Meta0.
  202
  203assert_cached_commit(Store, Hash, Meta) :-
  204    commit(Store, Hash, Meta0),
  205    !,
  206    assertion(Meta0 =@= Meta).
  207assert_cached_commit(Store, Hash, Meta) :-
  208    assertz(commit(Store, Hash, Meta)).
 store_object(+Store, +Hash, +Header:string, +Data:string) is det
Store the actual object. The store must associate Hash with the concatenation of Hdr and Data.
  215store_object(Store, Hash, _Hdr, _Data) :-
  216    pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
  217    !.
  218store_object(Store, Hash, Hdr, Data) :-
  219    gitty_object_file(Store, Hash, Path),
  220    with_mutex(gitty_file, exists_or_create(Path, Out0)),
  221    (   var(Out0)
  222    ->  true
  223    ;   setup_call_cleanup(
  224            zopen(Out0, Out, [format(gzip)]),
  225            format(Out, '~s~s', [Hdr, Data]),
  226            close(Out))
  227    ).
  228
  229exists_or_create(Path, _Out) :-
  230    exists_file(Path),
  231    !.
  232exists_or_create(Path, Out) :-
  233    file_directory_name(Path, Dir),
  234    make_directory_path(Dir),
  235    open(Path, write, Out, [encoding(utf8), lock(write)]).
  236
  237:- if(\+current_predicate(ensure_directory/1)).  238% in Library as of SWI-Prolog 9.1.20
  239ensure_directory(Dir) :-
  240    exists_directory(Dir),
  241    !.
  242ensure_directory(Dir) :-
  243    make_directory(Dir).
  244:- endif.
 store_object_raw(+Store, +Hash, +Bytes:string, -New) is det
Store an object from raw bytes. This is used for replicating objects.
  251store_object_raw(Store, Hash, _Bytes, false) :-
  252    pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
  253    !.
  254store_object_raw(Store, Hash, Bytes, New) :-
  255    gitty_object_file(Store, Hash, Path),
  256    with_mutex(gitty_file, exists_or_create(Path, Out)),
  257    (   var(Out)
  258    ->  New = false
  259    ;   call_cleanup(
  260            ( set_stream(Out, type(binary)),
  261              write(Out, Bytes)
  262            ),
  263            close(Out)),
  264        New = true
  265    ).
 load_object(+Store, +Hash, -Data, -Type, -Size) is det
Load the given object.
  271load_object(_Store, Hash, Data, Type, Size) :-
  272    load_object_from_pack(Hash, Data0, Type0, Size0),
  273    !,
  274    f(Data0, Type0, Size0) = f(Data, Type, Size).
  275load_object(Store, Hash, Data, Type, Size) :-
  276    load_object_file(Store, Hash, Data0, Type0, Size0),
  277    !,
  278    f(Data0, Type0, Size0) = f(Data, Type, Size).
  279load_object(Store, Hash, Data, Type, Size) :-
  280    redis_db(Store, _, _),
  281    redis_replicate_get(Store, Hash),
  282    load_object_file(Store, Hash, Data, Type, Size).
  283
  284load_object_file(Store, Hash, Data, Type, Size) :-
  285    gitty_object_file(Store, Hash, Path),
  286    exists_file(Path),
  287    !,
  288    setup_call_cleanup(
  289        gzopen(Path, read, In, [encoding(utf8)]),
  290        read_object(In, Data, Type, Size),
  291        close(In)).
 load_object_raw(+Store, +Hash, -Data)
Load the compressed data for an object. Intended for replication.
  297load_object_raw(_Store, Hash, Bytes) :-
  298    load_object_from_pack(Hash, Data, Type, Size),
  299    !,
  300    object_bytes(Type, Size, Data, Bytes).
  301load_object_raw(Store, Hash, Data) :-
  302    gitty_object_file(Store, Hash, Path),
  303    exists_file(Path),
  304    !,
  305    setup_call_cleanup(
  306        open(Path, read, In, [type(binary)]),
  307        read_string(In, _, Data),
  308        close(In)).
 object_bytes(+Type, +Size, +Data, -Bytes) is det
Encode an object with the given parameters in memory.
  314object_bytes(Type, Size, Data, Bytes) :-
  315    setup_call_cleanup(
  316        new_memory_file(MF),
  317        ( setup_call_cleanup(
  318              open_memory_file(MF, write, Out, [encoding(octet)]),
  319              setup_call_cleanup(
  320                  zopen(Out, ZOut, [format(gzip), close_parent(false)]),
  321                  ( set_stream(ZOut, encoding(utf8)),
  322                    format(ZOut, '~w ~d\u0000~w', [Type, Size, Data])
  323                  ),
  324                  close(ZOut)),
  325              close(Out)),
  326          memory_file_to_string(MF, Bytes, octet)
  327        ),
  328        free_memory_file(MF)).
 load_object_header(+Store, +Hash, -Type, -Size) is det
Load the header of an object
  335load_object_header(Store, Hash, Type, Size) :-
  336    gitty_object_file(Store, Hash, Path),
  337    setup_call_cleanup(
  338        gzopen(Path, read, In, [encoding(utf8)]),
  339        read_object_hdr(In, Type, Size),
  340        close(In)).
  341
  342read_object(In, Data, Type, Size) :-
  343    read_object_hdr(In, Type, Size),
  344    read_string(In, _, Data).
  345
  346read_object_hdr(In, Type, Size) :-
  347    get_code(In, C0),
  348    read_hdr(C0, In, Hdr),
  349    phrase((nonblanks(TypeChars), " ", integer(Size)), Hdr),
  350    atom_codes(Type, TypeChars).
  351
  352read_hdr(C, In, [C|T]) :-
  353    C > 0,
  354    !,
  355    get_code(In, C1),
  356    read_hdr(C1, In, T).
  357read_hdr(_, _, []).
 gitty_rescan(?Store) is det
Update our view of the shared storage for all stores matching Store.
  364gitty_rescan(Store) :-
  365    retractall(store(Store, _)).
 gitty_scan(+Store) is det
Scan gitty store for files (entries), filling head/3. This is performed lazily at first access to the store.

@tdb Possibly we need to maintain a cached version of this index to avoid having to open all objects of the gitty store.

  376gitty_scan(Store) :-
  377    store(Store, _),
  378    !,
  379    remote_updates(Store).
  380gitty_scan(Store) :-
  381    with_mutex(gitty, gitty_scan_sync(Store)).
  382
  383:- thread_local
  384    latest/3.  385
  386gitty_scan_sync(Store) :-
  387    store(Store, _),
  388    !.
  389gitty_scan_sync(Store) :-
  390    redis_db(Store, _, _),
  391    !,
  392    gitty_attach_packs(Store),
  393    redis_ensure_heads(Store),
  394    get_time(Now),
  395    assertz(store(Store, Now)).
  396:- if(remote_sync(true)).  397gitty_scan_sync(Store) :-
  398    remote_sync(true),
  399    !,
  400    gitty_attach_packs(Store),
  401    restore_heads_from_remote(Store).
  402:- endif.  403gitty_scan_sync(Store) :-
  404    gitty_attach_packs(Store),
  405    read_heads_from_objects(Store).
 read_heads_from_objects(+Store) is det
Establish the head(Store,File,Ext,Hash) relation by reading all objects and adding a fact for the most recent commit.
  412read_heads_from_objects(Store) :-
  413    gitty_scan_latest(Store),
  414    forall(retract(latest(Name, Hash, _Time)),
  415           assert_head(Store, Name, Hash)),
  416    get_time(Now),
  417    assertz(store(Store, Now)).
  418
  419assert_head(Store, Name, Hash) :-
  420    file_name_extension(_, Ext, Name),
  421    assertz(head(Store, Name, Ext, Hash)).
 gitty_scan_latest(+Store)
Scans the gitty store, extracting the latest version of each named entry.
  429gitty_scan_latest(Store) :-
  430    retractall(head(Store, _, _, _)),
  431    retractall(latest(_, _, _)),
  432    (   gitty_hash(Store, Hash),
  433        load_object(Store, Hash, Data, commit, _Size),
  434        term_string(Meta, Data, []),
  435        _{name:Name, time:Time} :< Meta,
  436        (   latest(Name, _, OldTime),
  437            OldTime > Time
  438        ->  true
  439        ;   retractall(latest(Name, _, _)),
  440            assertz(latest(Name, Hash, Time))
  441        ),
  442        fail
  443    ;   true
  444    ).
 gitty_hash(+Store, ?Hash) is nondet
True when Hash is an object in the store.
  451gitty_hash(Store, Hash) :-
  452    var(Hash),
  453    !,
  454    (   gitty_attach_packs(Store),
  455        pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
  456    ;   gitty_file_object(Store, Hash)
  457    ).
  458gitty_hash(Store, Hash) :-
  459    (   gitty_attach_packs(Store),
  460        pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
  461    ->  true
  462    ;   gitty_object_file(Store, Hash, File),
  463        exists_file(File)
  464    ).
  465
  466gitty_file_object(Store, Hash) :-
  467    access_file(Store, exist),
  468    directory_files(Store, Level0),
  469    member(E0, Level0),
  470    E0 \== '..',
  471    atom_length(E0, 2),
  472    directory_file_path(Store, E0, Dir0),
  473    directory_files(Dir0, Level1),
  474    member(E1, Level1),
  475    E1 \== '..',
  476    atom_length(E1, 2),
  477    directory_file_path(Dir0, E1, Dir),
  478    directory_files(Dir, Files),
  479    member(File, Files),
  480    atom_length(File, 36),
  481    atomic_list_concat([E0,E1,File], Hash).
 delete_object(+Store, +Hash)
Delete an existing object
  487delete_object(Store, Hash) :-
  488    gitty_object_file(Store, Hash, File),
  489    delete_file(File).
 gitty_object_file(+Store, +Hash, -Path) is det
True when Path is the file at which the object with Hash is stored.
  496gitty_object_file(Store, Hash, Path) :-
  497    sub_string(Hash, 0, 2, _, Dir0),
  498    sub_string(Hash, 2, 2, _, Dir1),
  499    sub_string(Hash, 4, _, 0, File),
  500    atomic_list_concat([Store, Dir0, Dir1, File], /, Path).
  501
  502
  503                 /*******************************
  504                 *            SYNCING           *
  505                 *******************************/
 gitty_update_head(+Store, +Name, +OldCommit, +NewCommit, +DataHash) is det
Update the head of a gitty store for Name. OldCommit is the current head and NewCommit is the new head. If Name is created, and thus there is no head, OldCommit must be -.

This operation can fail because another writer has updated the head. This can both be in-process or another process.

  517gitty_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
  518    redis_db(Store, _, _),
  519    !,
  520    redis_update_head(Store, Name, OldCommit, NewCommit, DataHash).
  521gitty_update_head(Store, Name, OldCommit, NewCommit, _) :-
  522    with_mutex(gitty,
  523               gitty_update_head_sync(Store, Name, OldCommit, NewCommit)).
  524
  525:- if(remote_sync(true)).  526gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
  527    remote_sync(true),
  528    !,
  529    setup_call_cleanup(
  530        heads_output_stream(Store, HeadsOut),
  531        gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut),
  532        close(HeadsOut)).
  533:- endif.  534gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
  535    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit).
  536
  537gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut) :-
  538    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit),
  539    format(HeadsOut, '~q.~n', [head(Name, OldCommit, NewCommit)]).
  540
  541gitty_update_head_sync2(Store, Name, OldCommit, NewCommit) :-
  542    gitty_scan(Store),              % fetch remote changes
  543    (   OldCommit == (-)
  544    ->  (   head(Store, Name, _, _)
  545        ->  throw(error(gitty(file_exists(Name),_)))
  546        ;   assert_head(Store, Name, NewCommit)
  547        )
  548    ;   (   retract(head(Store, Name, _, OldCommit))
  549        ->  assert_head(Store, Name, NewCommit)
  550        ;   throw(error(gitty(not_at_head(Name, OldCommit)), _))
  551        )
  552    ).
 remote_updates(+Store)
Watch for remote updates to the store. We only do this if we did not do so the last second.
  559:- dynamic
  560    last_remote_sync/2.  561
  562:- if(remote_sync(false)).  563remote_updates(_) :-
  564    remote_sync(false),
  565    !.
  566:- endif.  567remote_updates(Store) :-
  568    remote_up_to_data(Store),
  569    !.
  570remote_updates(Store) :-
  571    with_mutex(gitty, remote_updates_sync(Store)).
  572
  573remote_updates_sync(Store) :-
  574    remote_up_to_data(Store),
  575    !.
  576remote_updates_sync(Store) :-
  577    retractall(last_remote_sync(Store, _)),
  578    get_time(Now),
  579    asserta(last_remote_sync(Store, Now)),
  580    remote_update(Store).
  581
  582remote_up_to_data(Store) :-
  583    last_remote_sync(Store, Last),
  584    get_time(Now),
  585    Now-Last < 1.
  586
  587remote_update(Store) :-
  588    remote_updates(Store, List),
  589    maplist(update_head(Store), List).
  590
  591update_head(Store, head(Name, OldCommit, NewCommit)) :-
  592    (   OldCommit == (-)
  593    ->  \+ head(Store, Name, _, _)
  594    ;   retract(head(Store, Name, _, OldCommit))
  595    ),
  596    !,
  597    assert_head(Store, Name, NewCommit).
  598update_head(_, _).
 remote_updates(+Store, -List) is det
Find updates from other gitties on the same filesystem. Note that we have to push/pop the input context to avoid creating a notion of an input context which possibly relate messages incorrectly to the sync file.
  607remote_updates(Store, List) :-
  608    heads_input_stream(Store, Stream),
  609    setup_call_cleanup(
  610        '$push_input_context'(gitty_sync),
  611        read_new_terms(Stream, List),
  612        '$pop_input_context').
  613
  614read_new_terms(Stream, Terms) :-
  615    read(Stream, First),
  616    read_new_terms(First, Stream, Terms).
  617
  618read_new_terms(end_of_file, _, List) :-
  619    !,
  620    List = [].
  621read_new_terms(Term, Stream, [Term|More]) :-
  622    read(Stream, Term2),
  623    read_new_terms(Term2, Stream, More).
  624
  625heads_output_stream(Store, Out) :-
  626    heads_file(Store, HeadsFile),
  627    open(HeadsFile, append, Out,
  628         [ encoding(utf8),
  629           lock(exclusive)
  630         ]).
  631
  632heads_input_stream(Store, Stream) :-
  633    heads_input_stream_cache(Store, Stream0),
  634    !,
  635    Stream = Stream0.
  636heads_input_stream(Store, Stream) :-
  637    heads_file(Store, File),
  638    between(1, 2, _),
  639    catch(open(File, read, In,
  640               [ encoding(utf8),
  641                 eof_action(reset)
  642               ]),
  643          _,
  644          create_heads_file(Store)),
  645    !,
  646    assert(heads_input_stream_cache(Store, In)),
  647    Stream = In.
  648
  649create_heads_file(Store) :-
  650    call_cleanup(
  651        heads_output_stream(Store, Out),
  652        close(Out)),
  653    fail.                                   % always fail!
  654
  655heads_file(Store, HeadsFile) :-
  656    ensure_directory(Store),
  657    directory_file_path(Store, ref, RefDir),
  658    ensure_directory(RefDir),
  659    directory_file_path(RefDir, head, HeadsFile).
 restore_heads_from_remote(Store)
Restore the known heads by reading the remote sync file.
  665restore_heads_from_remote(Store) :-
  666    heads_file(Store, File),
  667    exists_file(File),
  668    setup_call_cleanup(
  669        open(File, read, In, [encoding(utf8)]),
  670        restore_heads(Store, In),
  671        close(In)),
  672    !,
  673    get_time(Now),
  674    assertz(store(Store, Now)).
  675restore_heads_from_remote(Store) :-
  676    read_heads_from_objects(Store),
  677    heads_file(Store, File),
  678    setup_call_cleanup(
  679        open(File, write, Out, [encoding(utf8)]),
  680        save_heads(Store, Out),
  681        close(Out)),
  682    !.
  683
  684restore_heads(Store, In) :-
  685    read(In, Term0),
  686    Term0 = epoch(_),
  687    read(In, Term1),
  688    restore_heads(Term1, In, Store).
  689
  690restore_heads(end_of_file, _, _) :- !.
  691restore_heads(head(File, _, Hash), In, Store) :-
  692    retractall(head(Store, File, _, _)),
  693    assert_head(Store, File, Hash),
  694    read(In, Term),
  695    restore_heads(Term, In, Store).
  696
  697save_heads(Store, Out) :-
  698    get_time(Now),
  699    format(Out, 'epoch(~0f).~n~n', [Now]),
  700    forall(head(Store, File, _, Hash),
  701           format(Out, '~q.~n', [head(File, -, Hash)])).
 delete_head(+Store, +Head) is det
Delete Head from Store. Used by gitty_fsck/1 to remove heads that have no commits. Should we forward this to remotes, or should they do their own thing?
  710delete_head(Store, Head) :-
  711    redis_db(Store, _, _),
  712    !,
  713    redis_delete_head(Store, Head).
  714delete_head(Store, Head) :-
  715    retractall(head(Store, Head, _, _)).
 set_head(+Store, +File, +Hash) is det
Set the head of the given File to Hash
  721set_head(Store, File, Hash) :-
  722    redis_db(Store, _, _),
  723    !,
  724    redis_set_head(Store, File, Hash).
  725set_head(Store, File, Hash) :-
  726    file_name_extension(_, Ext, File),
  727    (   head(Store, File, _, Hash0)
  728    ->  (   Hash == Hash0
  729        ->  true
  730        ;   asserta(head(Store, File, Ext, Hash)),
  731            retractall(head(Store, File, _, Hash0))
  732        )
  733    ;   asserta(head(Store, File, Ext, Hash))
  734    ).
  735
  736
  737                 /*******************************
  738                 *            PACKS             *
  739                 *******************************/
  740
  741/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  742
  743<pack file> := <header>
  744               <file>*
  745<header>    := "gitty(Version).\n" <object>* "end_of_header.\n"
  746<object>    := obj(Hash, Type, Size, FileSize)
  747- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
  748
  749pack_version(1).
 repack_objects(+Store, +Options) is det
Repack objects of Store for reduced disk usage and enhanced performance. By default this picks up all file objects of the store and all existing small pack files. Options:
small_pack(+Bytes)
Consider all packs with less than Bytes as small and repack them. Default 10Mb
min_files(+Count)
Do not repack if there are less than Count new files. Default 1,000.
  764:- debug(gitty(pack)).  765
  766repack_objects(Store, Options) :-
  767    option(min_files(MinFiles), Options, 1_000),
  768    findall(Object, gitty_file_object(Store, Object), Objects),
  769    length(Objects, NewFiles),
  770    debug(gitty(pack), 'Found ~D file objects', [NewFiles]),
  771    (   NewFiles >= MinFiles
  772    ->  pack_files(Store, ExistingPacks),
  773        option(small_pack(MaxSize), Options, 10_000_000),
  774        include(small_file(MaxSize), ExistingPacks, PackFiles),
  775        (   debugging(gitty(pack))
  776        ->  length(PackFiles, PackCount),
  777            debug(gitty(pack), 'Found ~D small packs', [PackCount])
  778        ;   true
  779        ),
  780        directory_file_path(Store, pack, PackDir),
  781        make_directory_path(PackDir),
  782        pack_objects(Store, Objects, PackFiles, PackDir, _PackFile, Options)
  783    ;   debug(gitty(pack), 'Nothing to do', [])
  784    ).
  785
  786small_file(MaxSize, File) :-
  787    size_file(File, Size),
  788    Size < MaxSize.
 pack_objects(+Store, +Objects, +Packs, +PackDir, -PackFile, +Options) is det
Pack the given objects and pack files into a new pack.
  795pack_objects(Store, Objects, Packs, PackDir, PackFile, Options) :-
  796    with_mutex(gitty_pack,
  797               pack_objects_sync(Store, Objects, Packs, PackDir,
  798                                 PackFile, Options)).
  799
  800pack_objects_sync(_Store, [], [Pack], _, [Pack], _) :-
  801    !.
  802pack_objects_sync(Store, Objects, Packs, PackDir, PackFilePath, Options) :-
  803    length(Objects, ObjCount),
  804    length(Packs, PackCount),
  805    debug(gitty(pack), 'Repacking ~D objects and ~D packs',
  806          [ObjCount, PackCount]),
  807    maplist(object_info(Store), Objects, FileInfo),
  808    maplist(pack_info(Store), Packs, PackInfo),
  809    append([FileInfo|PackInfo], Info0),
  810    sort(1, @<, Info0, Info),           % remove possible duplicates
  811    (   debugging(gitty(pack))
  812    ->  (   PackCount > 0
  813        ->  length(Info, FinalObjCount),
  814            debug(gitty(pack), 'Total ~D objects', [FinalObjCount])
  815        ;   true
  816        )
  817    ;   true
  818    ),
  819    directory_file_path(PackDir, 'pack-create', TmpPack),
  820    setup_call_cleanup(
  821        (   open(TmpPack, write, Out0, [type(binary), lock(write)]),
  822            open_hash_stream(Out0, Out, [algorithm(sha1)])
  823        ),
  824        (   write_signature(Out),
  825            maplist(write_header(Out), Info),
  826            format(Out, 'end_of_header.~n', []),
  827            maplist(add_file(Out, Store), Info),
  828            stream_hash(Out, SHA1)
  829        ),
  830        close(Out)),
  831    format(atom(PackFile), 'pack-~w.pack', [SHA1]),
  832    directory_file_path(PackDir, PackFile, PackFilePath),
  833    rename_file(TmpPack, PackFilePath),
  834    debug(gitty(pack), 'Attaching ~p', [PackFilePath]),
  835    attach_pack(Store, PackFilePath),
  836    remove_objects_after_pack(Store, Objects, Options),
  837    delete(Packs, PackFilePath, RmPacks),
  838    remove_repacked_packs(Store, RmPacks, Options),
  839    debug(gitty(pack), 'Packing completed', []).
  840
  841object_info(Store, Object, obj(Object, Type, Size, FileSize)) :-
  842    gitty_object_file(Store, Object, File),
  843    load_object_header(Store, Object, Type, Size),
  844    size_file(File, FileSize).
  845
  846pack_info(Store, PackFile, Objects) :-
  847    attach_pack(Store, PackFile),
  848    pack_read_header(PackFile, _Version, _DataOffset, Objects).
  849
  850write_signature(Out) :-
  851    pack_version(Version),
  852    format(Out, "gitty(~d).~n", [Version]).
  853
  854write_header(Out, obj(Object, Type, Size, FileSize)) :-
  855    format(Out, 'obj(~q,~q,~d,~d).~n', [Object, Type, Size, FileSize]).
 add_file(+Out, +Store, +Object) is det
Add Object from Store to the pack stream Out.
  861add_file(Out, Store, obj(Object, _Type, _Size, _FileSize)) :-
  862    gitty_object_file(Store, Object, File),
  863    exists_file(File),
  864    !,
  865    setup_call_cleanup(
  866        open(File, read, In, [type(binary)]),
  867        copy_stream_data(In, Out),
  868        close(In)).
  869add_file(Out, Store, obj(Object, Type, Size, FileSize)) :-
  870    pack_object(Object, Type, Size, Offset, Store, PackFile),
  871    setup_call_cleanup(
  872        open(PackFile, read, In, [type(binary)]),
  873        (   seek(In, Offset, bof, Offset),
  874            copy_stream_data(In, Out, FileSize)
  875        ),
  876        close(In)).
 gitty_fsck(+Store) is det
Validate all packs associated with Store
  883gitty_fsck(Store) :-
  884    pack_files(Store, PackFiles),
  885    maplist(fsck_pack, PackFiles).
 fsck_pack(+File) is det
Validate the integrity of the pack file File.
  891fsck_pack(File) :-
  892    debug(gitty(pack), 'fsck ~p', [File]),
  893    check_pack_hash(File),
  894    debug(gitty(pack), 'fsck ~p: checking objects', [File]),
  895    check_pack_objects(File),
  896    debug(gitty(pack), 'fsck ~p: done', [File]).
  897
  898check_pack_hash(File) :-
  899    file_base_name(File, Base),
  900    file_name_extension(Plain, Ext, Base),
  901    must_be(oneof([pack]), Ext),
  902    atom_concat('pack-', Hash, Plain),
  903    setup_call_cleanup(
  904        (   open(File, read, In0, [type(binary)]),
  905            open_hash_stream(In0, In, [algorithm(sha1)])
  906        ),
  907        (   setup_call_cleanup(
  908                open_null_stream(Null),
  909                copy_stream_data(In, Null),
  910                close(Null)),
  911            stream_hash(In, SHA1)
  912        ),
  913        close(In)),
  914    assertion(Hash == SHA1).
  915
  916check_pack_objects(PackFile) :-
  917    setup_call_cleanup(
  918        open(PackFile, read, In, [type(binary)]),
  919        (  read_header(In, Version, DataOffset, Objects),
  920           set_stream(In, encoding(utf8)),
  921           foldl(check_object(In, PackFile, Version), Objects, DataOffset, _)
  922        ),
  923        close(In)).
  924
  925check_object(In, PackFile, _Version,
  926             obj(Object, Type, Size, FileSize),
  927             Offset0, Offset) :-
  928    Offset is Offset0+FileSize,
  929    byte_count(In, Here),
  930    (   Here == Offset0
  931    ->  true
  932    ;   print_message(warning, pack(reposition(Here, Offset0))),
  933        seek(In, Offset0, bof, Offset0)
  934    ),
  935    (   setup_call_cleanup(
  936            zopen(In, In2, [multi_part(false), close_parent(false)]),
  937            catch(read_object(In2, Data, _0RType, _0RSize), E,
  938                  ( print_message(error,
  939                                  gitty(PackFile, fsck(read_object(Object, E)))),
  940                    fail)),
  941            close(In2))
  942    ->  byte_count(In, End),
  943        (   End == Offset
  944        ->  true
  945        ;   print_message(error,
  946                          gitty(PackFile, fsck(object_end(Object, End,
  947                                                          Offset0, Offset,
  948                                                          Data))))
  949        ),
  950        assertion(Type == _0RType),
  951        assertion(Size == _0RSize),
  952        gitty:check_object(Object, Data, Type, Size)
  953    ;   true
  954    ).
 gitty_attach_packs(+Store) is det
Attach all packs for Store
  961gitty_attach_packs(Store) :-
  962    attached_packs(Store),
  963    !.
  964gitty_attach_packs(Store) :-
  965    with_mutex(gitty_attach_packs,
  966               gitty_attach_packs_sync(Store)).
  967
  968gitty_attach_packs_sync(Store) :-
  969    attached_packs(Store),
  970    !.
  971gitty_attach_packs_sync(Store) :-
  972    pack_files(Store, PackFiles),
  973    maplist(attach_pack(Store), PackFiles),
  974    asserta(attached_packs(Store)).
  975
  976pack_files(Store, Packs) :-
  977    directory_file_path(Store, pack, PackDir),
  978    exists_directory(PackDir),
  979    !,
  980    directory_files(PackDir, Files),
  981    convlist(is_pack(PackDir), Files, Packs).
  982pack_files(_, []).
  983
  984is_pack(PackDir, File, Path) :-
  985    file_name_extension(_, pack, File),
  986    directory_file_path(PackDir, File, Path).
 attach_pack(+Store, +PackFile)
Load the index of Pack into memory.
  992attach_pack(Store, PackFile) :-
  993    attached_pack(Store, PackFile),
  994    !.
  995attach_pack(Store, PackFile) :-
  996    retractall(pack_object(_,_,_,_,_,PackFile)),
  997    pack_read_header(PackFile, Version, DataOffset, Objects),
  998    foldl(assert_object(Store, PackFile, Version), Objects, DataOffset, _),
  999    assertz(attached_pack(Store, PackFile)).
 1000
 1001pack_read_header(PackFile, Version, DataOffset, Objects) :-
 1002    setup_call_cleanup(
 1003        open(PackFile, read, In, [type(binary)]),
 1004        read_header(In, Version, DataOffset, Objects),
 1005        close(In)).
 1006
 1007read_header(In, Version, DataOffset, Objects) :-
 1008    read(In, Signature),
 1009    (   Signature = gitty(Version)
 1010    ->  true
 1011    ;   domain_error(gitty_pack_file, Objects)
 1012    ),
 1013    read(In, Term),
 1014    read_index(Term, In, Objects),
 1015    get_code(In, Code),
 1016    assertion(Code == 0'\n),
 1017    byte_count(In, DataOffset).
 1018
 1019read_index(end_of_header, _, []) :-
 1020    !.
 1021read_index(Object, In, [Object|T]) :-
 1022    read(In, Term2),
 1023    read_index(Term2, In, T).
 1024
 1025assert_object(Store, Pack, _Version,
 1026              obj(Object, Type, Size, FileSize),
 1027              Offset0, Offset) :-
 1028    Offset is Offset0+FileSize,
 1029    assertz(pack_object(Object, Type, Size, Offset0, Store, Pack)).
 detach_pack(+Store, +Pack) is det
Remove a pack file from the memory index.
 1035detach_pack(Store, Pack) :-
 1036    retractall(pack_object(_, _, _, _, Store, Pack)),
 1037    retractall(attached_pack(Store, Pack)).
 load_object_from_pack(+Hash, -Data, -Type, -Size) is semidet
True when Hash is in a pack and can be loaded.
 1043load_object_from_pack(Hash, Data, Type, Size) :-
 1044    pack_object(Hash, Type, Size, Offset, _Store, Pack),
 1045    setup_call_cleanup(
 1046        open(Pack, read, In, [type(binary)]),
 1047        read_object_at(In, Offset, Data, Type, Size),
 1048        close(In)).
 1049
 1050read_object_at(In, Offset, Data, Type, Size) :-
 1051    seek(In, Offset, bof, Offset),
 1052    read_object_here(In, Data, Type, Size).
 1053
 1054read_object_here(In, Data, Type, Size) :-
 1055    stream_property(In, encoding(Enc)),
 1056    setup_call_cleanup(
 1057        ( set_stream(In, encoding(utf8)),
 1058          zopen(In, In2, [multi_part(false), close_parent(false)])
 1059        ),
 1060        read_object(In2, Data, Type, Size),
 1061        ( close(In2),
 1062          set_stream(In, encoding(Enc))
 1063        )).
 unpack_packs(+Store) is det
Unpack all packs.
 1070unpack_packs(Store) :-
 1071    absolute_file_name(Store, AbsStore, [file_type(directory),
 1072                                         access(read)]),
 1073    pack_files(AbsStore, Packs),
 1074    maplist(unpack_pack(AbsStore), Packs).
 unpack_pack(+Store, +Pack) is det
Turn a pack back into a plain object files
 1080unpack_pack(Store, PackFile) :-
 1081    pack_read_header(PackFile, _Version, DataOffset, Objects),
 1082    setup_call_cleanup(
 1083        open(PackFile, read, In, [type(binary)]),
 1084        foldl(create_file(Store, In), Objects, DataOffset, _),
 1085        close(In)),
 1086    detach_pack(Store, PackFile),
 1087    delete_file(PackFile).
 1088
 1089create_file(Store, In, obj(Object, _Type, _Size, FileSize), Offset0, Offset) :-
 1090    Offset is Offset0+FileSize,
 1091    gitty_object_file(Store, Object, Path),
 1092    with_mutex(gitty_file, exists_or_recreate(Path, Out)),
 1093        (   var(Out)
 1094        ->  true
 1095        ;   setup_call_cleanup(
 1096                seek(In, Offset0, bof, Offset0),
 1097                copy_stream_data(In, Out, FileSize),
 1098                close(Out))
 1099        ).
 1100
 1101exists_or_recreate(Path, _Out) :-
 1102    exists_file(Path),
 1103    !.
 1104exists_or_recreate(Path, Out) :-
 1105    file_directory_name(Path, Dir),
 1106    make_directory_path(Dir),
 1107    open(Path, write, Out, [type(binary), lock(write)]).
 remove_objects_after_pack(+Store, +Objects, +Options) is det
Remove the indicated (file) objects from Store.
 1114remove_objects_after_pack(Store, Objects, Options) :-
 1115    debug(gitty(pack), 'Deleting plain files', []),
 1116    maplist(delete_object(Store), Objects),
 1117    (   option(prune_empty_directories(true), Options, true)
 1118    ->  debug(gitty(pack), 'Pruning empty directories', []),
 1119        prune_empty_directories(Store)
 1120    ;   true
 1121    ).
 remove_repacked_packs(+Store, +Packs, +Options)
Remove packs that have been repacked.
 1127remove_repacked_packs(Store, Packs, Options) :-
 1128    maplist(remove_pack(Store, Options), Packs).
 1129
 1130remove_pack(Store, _Options, Pack) :-
 1131    detach_pack(Store, Pack),
 1132    delete_file(Pack).
 prune_empty_directories(+Dir) is det
Prune directories that are empty below Dir. Dir itself is not removed, even if it is empty.
 1139prune_empty_directories(Dir) :-
 1140    prune_empty_directories(Dir, 0).
 1141
 1142prune_empty_directories(Dir, Level) :-
 1143    directory_files(Dir, AllFiles),
 1144    exclude(hidden, AllFiles, Files),
 1145    (   Files == [],
 1146        Level > 0
 1147    ->  delete_directory_async(Dir)
 1148    ;   convlist(prune_empty_directories(Dir, Level), Files, Left),
 1149        (   Left == [],
 1150            Level > 0
 1151        ->  delete_directory_async(Dir)
 1152        ;   true
 1153        )
 1154    ).
 1155
 1156hidden(.).
 1157hidden(..).
 1158
 1159prune_empty_directories(Parent, Level0, File, _) :-
 1160    directory_file_path(Parent, File, Path),
 1161    exists_directory(Path),
 1162    !,
 1163    Level is Level0 + 1,
 1164    prune_empty_directories(Path, Level),
 1165    fail.
 1166prune_empty_directories(_, _, File, File).
 1167
 1168delete_directory_async(Dir) :-
 1169    with_mutex(gitty_file, delete_directory_async2(Dir)).
 1170
 1171delete_directory_async2(Dir) :-
 1172    catch(delete_directory(Dir), E,
 1173          (   \+ exists_directory(Dir)
 1174          ->  true
 1175          ;   \+ empty_directory(Dir)
 1176          ->  true
 1177          ;   throw(E)
 1178          )).
 1179
 1180empty_directory(Dir) :-
 1181    directory_files(Dir, AllFiles),
 1182    exclude(hidden, AllFiles, []).
 1183
 1184
 1185		 /*******************************
 1186		 *        REDIS PRIMITIVES	*
 1187		 *******************************/
 1188
 1189redis_head_db(Store, DB, Key) :-
 1190    redis_db(Store, DB, Prefix),
 1191    string_concat(Prefix, ":gitty:head", Key).
 redis_file(+Store, ?Name, ?Ext, ?Hash)
 1195redis_file(Store, Name, Ext, Hash) :-
 1196    nonvar(Name),
 1197    !,
 1198    file_name_extension(_Base, Ext, Name),
 1199    redis_head_db(Store, DB, Heads),
 1200    redis(DB, hget(Heads, Name), Hash as atom).
 1201redis_file(Store, Name, Ext, Hash) :-
 1202    nonvar(Ext),
 1203    !,
 1204    string_concat("*.", Ext, Pattern),
 1205    redis_head_db(Store, DB, Heads),
 1206    redis_hscan(DB, Heads, LazyList, [match(Pattern)]),
 1207    member(NameS-HashS, LazyList),
 1208    atom_string(Name, NameS),
 1209    atom_string(Hash, HashS).
 1210redis_file(Store, Name, Ext, Hash) :-
 1211    nonvar(Hash),
 1212    !,
 1213    load_plain_commit(Store, Hash, Commit),
 1214    Name = Commit.name,
 1215    file_name_extension(_Base, Ext, Name).
 1216redis_file(Store, Name, Ext, Hash) :-
 1217    redis_head_db(Store, DB, Heads),
 1218    redis(DB, hgetall(Heads), Pairs as pairs(atom,atom)),
 1219    member(Name-Hash, Pairs),
 1220    file_name_extension(_Base, Ext, Name).
 redis_ensure_heads(+Store)
Ensure the redis db contains a hashmap mapping all file names to their head hashes.
 1227redis_ensure_heads(Store) :-
 1228    redis_head_db(Store, DB, Key),
 1229    redis(DB, exists(Key), 1),
 1230    !.
 1231redis_ensure_heads(Store) :-
 1232    redis_head_db(Store, DB, Key),
 1233    debug(gitty(redis), 'Initializing gitty heads in ~p ...', [Key]),
 1234    gitty_scan_latest(Store),
 1235    forall(retract(latest(Name, Hash, _Time)),
 1236           redis(DB, hset(Key, Name, Hash))),
 1237    debug(gitty(redis), '... finished gitty heads', []).
 redis_update_head(+Store, +Name, +OldCommit, +NewCommit, +DataHash)
 1241redis_update_head(Store, Name, -, NewCommit, DataHash) :-
 1242    !,
 1243    redis_head_db(Store, DB, Key),
 1244    redis(DB, hset(Key, Name, NewCommit)),
 1245    publish_objects(Store, [NewCommit, DataHash]).
 1246redis_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
 1247    redis_head_db(Store, DB, Key),
 1248    redis_hcas(DB, Key, Name, OldCommit, NewCommit),
 1249    publish_objects(Store, [NewCommit, DataHash]).
 redis_delete_head(Store, Head) is det
Unregister Head
 1255redis_delete_head(Store, Head) :-
 1256    redis_head_db(Store, DB, Key),
 1257    redis(DB, hdel(Key, Head)).
 redis_set_head(+Store, +File, +Hash) is det
 1261redis_set_head(Store, File, Hash) :-
 1262    redis_head_db(Store, DB, Key),
 1263    redis(DB, hset(Key, File, Hash)).
 1264
 1265		 /*******************************
 1266		 *           REPLICATE		*
 1267		 *******************************/
 redis_replicate_get(+Store, +Hash)
Try to get an object from another SWISH server in the network. We implement replication using the PUB/SUB protocol of Redis. This is not ideal as this route of the synchronisation is only used if for some reason this server lacks some object. This typically happens if this node is new to the cluster or has been offline for a long time. In a large cluster, most nodes will have the objects and each of them will send the object around. A consumer group based solution is not ideal either, as the message may be picked up by a node that does not have this object, after which we need the failure recovery protocol to get it right. This is particularly the case with two nodes, where we have a fair chance to have be requested for the hash we miss ourselves.

We could improve on this two ways: (1) put the hash published in a short-lived key on Redis and make others check that. That is likely to avoid many nodes sending the same object or (2) see how many nodes are in the pool and switch to a consumer group based approach if this number is high (and thus we are unlikely to be asked ourselves for the missing hash).

See also
- publish_objects/2 for the incremental replication
 1293:- multifile
 1294    swish_redis:stream/2. 1295
 1296swish_redis:stream('gitty:replicate', [maxlen(100)]).
 1297
 1298:- listen(http(pre_server_start(_)),
 1299          init_replicator). 1300
 1301init_replicator :-
 1302    redis_swish_stream('gitty:replicate', ReplKey),
 1303    listen(redis(_Redis, ReplKey, _Id, Data),
 1304           replicate(Data)),
 1305    listen(redis(_, 'swish:gitty', Message),
 1306           gitty_message(Message)),
 1307    message_queue_create(_, [alias(gitty_queue)]).
 1308
 1309:- debug(gitty(replicate)). 1310
 1311gitty_message(discover(Hash)) :-
 1312    debug(gitty(replicate), 'Discover: ~p', [Hash]),
 1313    store(Store, _),
 1314    load_object_raw(Store, Hash, Data),
 1315    debug(gitty(replicate), 'Sending object ~p', [Hash]),
 1316    redis(swish, publish(swish:gitty, object(Hash, Data) as prolog)).
 1317gitty_message(object(Hash, Data)) :-
 1318    debug(gitty(replicate), 'Replicate: ~p', [Hash]),
 1319    redis_db(Store, _DB, _Prefix),
 1320    store_object_raw(Store, Hash, Data, New),
 1321    debug(gitty(replicate), 'Received object ~p (new=~p)', [Hash, New]),
 1322    (   New == true
 1323    ->  thread_send_message(gitty_queue, Hash)
 1324    ;   true
 1325    ).
 1326
 1327redis_replicate_get(_Store, Hash) :-
 1328    is_gitty_hash(Hash),
 1329    redis(swish, publish(swish:gitty, discover(Hash) as prolog), Count),
 1330    Count > 1,                          % If I'm alone it won't help :(
 1331    thread_get_message(gitty_queue, Hash,
 1332                       [ timeout(10)
 1333                       ]).
 publish_objects(+Store, +Hashes)
Make the objects we just stored globally known. These are added to the Redis stream gitty:replicate and received by replicate/1 below.

This realized eager replication as opposed to the above code (redis_replicate_get/2) which performs lazy replication. Eager replication ensure the object is on multiple places in the event that the node on which it was saved dies shortly after.

Note that we also receive the object we just saved. That is unavoidable in a network where all nodes are equal.

 1349publish_objects(Store, Hashes) :-
 1350    redis_swish_stream('gitty:replicate', ReplKey),
 1351    maplist(publish_object(Store, ReplKey), Hashes).
 1352
 1353publish_object(Store, Stream, Hash) :-
 1354    load_object_raw(Store, Hash, Data),
 1355    debug(gitty(replicate), 'Sending ~p to ~p', [Hash, Stream]),
 1356    xadd(swish, Stream, _, _{hash:Hash, data:Data}).
 replicate(+Data) is det
Act on a message send to the gitty:replicate stream. Add the object to our store unless we already have it. Note that we receive our own objects as well.
 1364replicate(Data) :-
 1365    redis_db(Store, _DB, _Prefix),
 1366    atom_string(Hash, Data.hash),
 1367    store_object_raw(Store, Hash, Data.data, _0New),
 1368    debug(gitty(replicate), 'Received object ~p (new=~p)',
 1369          [Hash, _0New]).
 1370
 1371
 1372		 /*******************************
 1373		 *         REDIS BASICS		*
 1374		 *******************************/
 redis_hcas(+DB, +Hash, +Key, +Old, +New) is semidet
Update Hash.Key to New provided the current value is Old.
 1380redis_hcas(DB, Hash, Key, Old, New) :-
 1381    redis(DB, eval("if redis.call('HGET', KEYS[1], ARGV[1]) == ARGV[2] then \c
 1382                      redis.call('HSET', KEYS[1],  ARGV[1], ARGV[3]); \c
 1383                      return 1; \c
 1384                      end; \c
 1385                    return 0\c
 1386                   ",
 1387                   1, Hash, Key, Old, New),
 1388          1)