1/* Part of SWISH 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2015-2020, VU University Amsterdam 7 CWI Amsterdam 8 SWI-Prolog Solutions b.v. 9 All rights reserved. 10 11 Redistribution and use in source and binary forms, with or without 12 modification, are permitted provided that the following conditions 13 are met: 14 15 1. Redistributions of source code must retain the above copyright 16 notice, this list of conditions and the following disclaimer. 17 18 2. Redistributions in binary form must reproduce the above copyright 19 notice, this list of conditions and the following disclaimer in 20 the documentation and/or other materials provided with the 21 distribution. 22 23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 26 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 27 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 28 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 29 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 30 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 31 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 32 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 33 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 POSSIBILITY OF SUCH DAMAGE. 
35*/ 36 37:- module(gitty_driver_files, 38 [ gitty_open/2, % +Store, +Options 39 gitty_close/1, % +Store 40 gitty_file/4, % +Store, ?Name, ?Ext, ?Hash 41 42 gitty_update_head/5, % +Store, +Name, +OldCommit, +NewCommit 43 % +DataHash 44 delete_head/2, % +Store, +Name 45 set_head/3, % +Store, +Name, +Hash 46 store_object/4, % +Store, +Hash, +Header, +Data 47 delete_object/2, % +Store, +Hash 48 49 gitty_hash/2, % +Store, ?Hash 50 load_plain_commit/3, % +Store, +Hash, -Meta 51 load_object/5, % +Store, +Hash, -Data, -Type, -Size 52 gitty_object_file/3, % +Store, +Hash, -File 53 54 repack_objects/2, % +Store, +Options 55 pack_objects/6, % +Store, +Objs, +Packs, +PackDir, 56 % -File, +Opts 57 unpack_packs/1, % +Store 58 unpack_pack/2, % +Store, +PackFile 59 60 attach_pack/2, % +Store, +PackFile 61 gitty_fsck/1, % +Store 62 fsck_pack/1, % +PackFile 63 load_object_from_pack/4, % +Hash, -Data, -Type, -Size 64 65 gitty_rescan/1 % Store 66 ]). 67:- use_module(library(apply)). 68:- use_module(library(zlib)). 69:- use_module(library(filesex)). 70:- use_module(library(lists)). 71:- use_module(library(apply)). 72:- use_module(library(error)). 73:- use_module(library(debug)). 74:- use_module(library(zlib)). 75:- use_module(library(hash_stream)). 76:- use_module(library(option)). 77:- use_module(library(dcg/basics)). 78:- use_module(library(redis)). 79:- use_module(library(redis_streams)). 80:- use_module(gitty, [is_gitty_hash/1]). 81 82:- use_module(swish_redis).
% Per-store cached state.  All of it is dynamic; the volatile
% declaration below keeps these facts out of saved states.
:- dynamic
    head/4,                     % Store, Name, Ext, Hash
    store/2,                    % Store, Updated
    commit/3,                   % Store, Hash, Meta
    heads_input_stream_cache/2, % Store, Stream
    pack_object/6,              % Hash, Type, Size, Offset, Store,PackFile
    attached_packs/1,           % Store
    attached_pack/2,            % Store, PackFile
    redis_db/3.                 % Store, DB, Prefix

:- volatile
    head/4,
    store/2,
    commit/3,
    heads_input_stream_cache/2,
    pack_object/6,
    attached_packs/1,
    attached_pack/2.

% Hook called for each object by the pack consistency check
% (check_object/6, used by gitty_fsck/1).
:- multifile
    gitty:check_object/4.

% enable/disable syncing remote servers running on the same file store.
% This facility requires shared access to files and thus doesn't work on
% Windows.

:- if(current_prolog_flag(windows, true)).
remote_sync(false).
:- else.
remote_sync(true).
:- endif.
%!  gitty_open(+Store, +Options) is det.
%
%   Open the store.  If Options contains redis(DB), the heads of Store
%   are kept in the Redis database DB using the key prefix from the
%   option redis_prefix(Prefix), which defaults to `swish`.  In that
%   case a detached thread scans the store in the background.  Without
%   a redis(DB) option this is a no-op.

gitty_open(Store, Options) :-
    (   option(redis(RedisDB), Options)
    ->  option(redis_prefix(KeyPrefix), Options, swish),
        asserta(redis_db(Store, RedisDB, KeyPrefix)),
        thread_create(gitty_scan(Store), _Thread, [detached(true)])
    ;   true
    ).
%!  gitty_close(+Store) is det.
%
%   Close access to Store.  Closes the cached heads input stream and
%   removes the cached heads, the scan time stamp and the pack index of
%   Store.  The attached_pack/2 and attached_packs/1 markers are removed
%   together with the pack_object/6 facts.  Otherwise a later access
%   would consider the packs still attached and never reload their
%   index, which would make all packed objects invisible.

gitty_close(Store) :-
    (   retract(heads_input_stream_cache(Store, In))
    ->  close(In)
    ;   true
    ),
    retractall(head(Store,_,_,_)),
    retractall(store(Store,_)),
    retractall(pack_object(_,_,_,_,Store,_)),
    retractall(attached_pack(Store,_)),
    retractall(attached_packs(Store)).
%!  gitty_file(+Store, ?Head, ?Ext, ?Hash) is nondet.
%
%   True when Hash is the current head commit of the file Head, which
%   has extension Ext.  Ensures the store is scanned first.  Uses Redis
%   if the store was opened with a redis(DB) option and the in-memory
%   head/4 relation otherwise.

gitty_file(Store, Head, Ext, Hash) :-
    (   redis_db(Store, _DB, _Prefix)
    ->  gitty_scan(Store),
        redis_file(Store, Head, Ext, Hash)
    ;   gitty_scan(Store),
        head(Store, Head, Ext, Hash)
    ).
%!  load_plain_commit(+Store, +Hash, -Meta) is semidet.
%
%   Load commit object Hash and parse its text into the term Meta.
%   Parsed commits are cached in commit/3.  The cache is updated while
%   holding the gitty_commit_cache mutex.

load_plain_commit(Store, Hash, Meta) :-
    must_be(atom, Store),
    must_be(atom, Hash),
    commit(Store, Hash, Meta),
    !.
load_plain_commit(Store, Hash, Meta) :-
    load_object(Store, Hash, Text, _Type, _Size),
    term_string(Parsed, Text, []),
    with_mutex(gitty_commit_cache,
               assert_cached_commit(Store, Hash, Parsed)),
    Meta = Parsed.

%   assert_cached_commit(+Store, +Hash, +Meta)
%
%   Add Meta to the commit cache unless an entry for Hash exists.  An
%   existing entry must be a variant of Meta.

assert_cached_commit(Store, Hash, Meta) :-
    (   commit(Store, Hash, Known)
    ->  assertion(Known =@= Meta)
    ;   assertz(commit(Store, Hash, Meta))
    ).
%!  store_object(+Store, +Hash, +Hdr, +Data) is det.
%
%   Save object Hash as a gzip-compressed plain file holding Hdr
%   followed by Data.  Does nothing if the object is in an attached
%   pack or if its plain file already exists.

store_object(Store, Hash, Hdr, Data) :-
    (   pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
    ->  true
    ;   gitty_object_file(Store, Hash, Path),
        with_mutex(gitty_file, exists_or_create(Path, FileOut)),
        (   nonvar(FileOut)
        ->  setup_call_cleanup(
                zopen(FileOut, GzOut, [format(gzip)]),
                format(GzOut, '~s~s', [Hdr, Data]),
                close(GzOut))
        ;   true
        )
    ).

%   exists_or_create(+Path, -Out) is det.
%
%   If Path exists, leave Out unbound.  Otherwise create the directory
%   of Path and open Path for writing (UTF-8, with a write lock).

exists_or_create(Path, Out) :-
    (   exists_file(Path)
    ->  true
    ;   file_directory_name(Path, Dir),
        make_directory_path(Dir),
        open(Path, write, Out, [encoding(utf8), lock(write)])
    ).

:- if(\+current_predicate(ensure_directory/1)).
% in Library as of SWI-Prolog 9.1.20
ensure_directory(Dir) :-
    (   exists_directory(Dir)
    ->  true
    ;   make_directory(Dir)
    ).
:- endif.
%   store_object_raw(+Store, +Hash, +Bytes, -New) is det.
%
%   Save object Hash from its raw (compressed) file content Bytes.  New
%   is `true` if a plain file was written.  It is `false` if the object
%   is in an attached pack or its file already exists.

store_object_raw(Store, Hash, _Bytes, false) :-
    pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
    !.
store_object_raw(Store, Hash, Bytes, New) :-
    gitty_object_file(Store, Hash, Path),
    with_mutex(gitty_file, exists_or_create(Path, Out)),
    (   nonvar(Out)
    ->  call_cleanup(( set_stream(Out, type(binary)),
                       write(Out, Bytes)
                     ),
                     close(Out)),
        New = true
    ;   New = false
    ).
%!  load_object(+Store, +Hash, -Data, -Type, -Size) is semidet.
%
%   Load object Hash.  Tries an attached pack first, then the plain
%   file.  For a Redis-backed store it finally asks the other nodes for
%   the object and retries the plain file.

load_object(Store, Hash, Data, Type, Size) :-
    (   load_object_from_pack(Hash, D, T, S)
    ->  Data = D, Type = T, Size = S
    ;   load_object_file(Store, Hash, D, T, S)
    ->  Data = D, Type = T, Size = S
    ;   redis_db(Store, _DB, _Prefix),
        redis_replicate_get(Store, Hash),
        load_object_file(Store, Hash, Data, Type, Size)
    ).

%   load_object_file(+Store, +Hash, -Data, -Type, -Size) is semidet.
%
%   Read object Hash from its gzip-compressed plain file.  Fails if the
%   file does not exist.

load_object_file(Store, Hash, Data, Type, Size) :-
    gitty_object_file(Store, Hash, File),
    exists_file(File),
    !,
    setup_call_cleanup(
        gzopen(File, read, Stream, [encoding(utf8)]),
        read_object(Stream, Data, Type, Size),
        close(Stream)).
%   load_object_raw(+Store, +Hash, -Bytes) is semidet.
%
%   Bytes is the compressed file representation of object Hash.  For a
%   packed object it is recreated from the decoded data; otherwise the
%   plain file is read as binary.  Fails if the object is not found.

load_object_raw(Store, Hash, Bytes) :-
    (   load_object_from_pack(Hash, Data, Type, Size)
    ->  object_bytes(Type, Size, Data, Bytes)
    ;   gitty_object_file(Store, Hash, Path),
        exists_file(Path)
    ->  setup_call_cleanup(
            open(Path, read, In, [type(binary)]),
            read_string(In, _Length, Bytes),
            close(In))
    ).
%   object_bytes(+Type, +Size, +Data, -Bytes) is det.
%
%   Bytes is an octet string holding the gzip-compressed object text
%   "Type Size\0Data", i.e., the content of a plain object file.

object_bytes(Type, Size, Data, Bytes) :-
    setup_call_cleanup(
        new_memory_file(MemFile),
        ( gzip_object_to_memory_file(MemFile, Type, Size, Data),
          memory_file_to_string(MemFile, Bytes, octet)
        ),
        free_memory_file(MemFile)).

%   gzip_object_to_memory_file(+MemFile, +Type, +Size, +Data)
%
%   Write the compressed object into MemFile.  The text is written as
%   UTF-8 into the gzip stream; the memory file itself holds octets.

gzip_object_to_memory_file(MemFile, Type, Size, Data) :-
    setup_call_cleanup(
        open_memory_file(MemFile, write, Raw, [encoding(octet)]),
        setup_call_cleanup(
            zopen(Raw, Gz, [format(gzip), close_parent(false)]),
            ( set_stream(Gz, encoding(utf8)),
              format(Gz, '~w ~d\u0000~w', [Type, Size, Data])
            ),
            close(Gz)),
        close(Raw)).
%   load_object_header(+Store, +Hash, -Type, -Size) is det.
%
%   Read only the type and size header of the plain object file Hash.

load_object_header(Store, Hash, Type, Size) :-
    gitty_object_file(Store, Hash, File),
    setup_call_cleanup(
        gzopen(File, read, Stream, [encoding(utf8)]),
        read_object_hdr(Stream, Type, Size),
        close(Stream)).

%   read_object(+In, -Data, -Type, -Size) is det.
%
%   Read an object in the format "Type Size\0Data" from In.

read_object(In, Data, Type, Size) :-
    read_object_hdr(In, Type, Size),
    read_string(In, _Length, Data).

%   read_object_hdr(+In, -Type, -Size)
%
%   Read the "Type Size" header up to the NUL separator.

read_object_hdr(In, Type, Size) :-
    get_code(In, First),
    read_hdr(First, In, HdrCodes),
    phrase((nonblanks(TypeCodes), " ", integer(Size)), HdrCodes),
    atom_codes(Type, TypeCodes).

%   read_hdr(+C, +In, -Codes)
%
%   Codes are C and the following codes of In up to, but not including,
%   the first code =< 0.  That code is the NUL separator or end of file
%   (-1).

read_hdr(C, In, Codes) :-
    (   C > 0
    ->  Codes = [C|Rest],
        get_code(In, Next),
        read_hdr(Next, In, Rest)
    ;   Codes = []
    ).
%!  gitty_rescan(?Store) is det.
%
%   Forget that Store has been scanned, so the next access rebuilds
%   its heads.

gitty_rescan(Store) :-
    retractall(store(Store, _ScanTime)).
@tbd Possibly we need to maintain a cached version of this index to avoid having to open all objects of the gitty store.
%!  gitty_scan(+Store) is det.
%
%   Make sure the heads of Store are known.  If Store was scanned
%   before (store/2 holds), only process remote updates.  Otherwise do
%   the initial scan while holding the `gitty` mutex.

gitty_scan(Store) :-
    store(Store, _),
    !,
    remote_updates(Store).
gitty_scan(Store) :-
    with_mutex(gitty, gitty_scan_sync(Store)).

% latest(Name, Hash, Time): per-thread scratch relation used to collect
% the most recent commit of each file while scanning the objects.
:- thread_local
    latest/3.

%   gitty_scan_sync(+Store)
%
%   Initial scan, run under the `gitty` mutex.  The first clause
%   re-checks store/2 because the scan may have completed while this
%   thread waited for the mutex.
%
%   - Redis-backed stores only ensure the Redis heads hash exists.
%   - With remote syncing enabled, heads are restored from the shared
%     heads file.
%   - Otherwise heads are computed by reading all objects.

gitty_scan_sync(Store) :-
    store(Store, _),
    !.
gitty_scan_sync(Store) :-
    redis_db(Store, _, _),
    !,
    gitty_attach_packs(Store),
    redis_ensure_heads(Store),
    get_time(Now),
    assertz(store(Store, Now)).
:- if(remote_sync(true)).
gitty_scan_sync(Store) :-
    remote_sync(true),
    !,
    gitty_attach_packs(Store),
    restore_heads_from_remote(Store).
:- endif.
gitty_scan_sync(Store) :-
    gitty_attach_packs(Store),
    read_heads_from_objects(Store).
%   read_heads_from_objects(+Store)
%
%   Establish the head(Store,File,Ext,Hash) relation by reading all
%   objects and adding a fact for the most recent commit.  Finally,
%   record the scan time in store/2.

read_heads_from_objects(Store) :-
    gitty_scan_latest(Store),
    forall(retract(latest(File, Commit, _Time)),
           assert_head(Store, File, Commit)),
    get_time(ScannedAt),
    assertz(store(Store, ScannedAt)).

%   assert_head(+Store, +Name, +Hash)
%
%   Add a head/4 fact for Name; Ext is derived from the file name.

assert_head(Store, Name, Hash) :-
    file_name_extension(_Base, Ext, Name),
    assertz(head(Store, Name, Ext, Hash)).
%   gitty_scan_latest(+Store)
%
%   Remove the head/4 facts of Store and all latest/3 facts.  Then
%   enumerate all commit objects and leave one latest(Name, Hash, Time)
%   fact per file name, holding its commit with the highest time.
%   Objects whose parsed commit does not contain `name` and `time` keys
%   are skipped.

gitty_scan_latest(Store) :-
    retractall(head(Store, _, _, _)),
    retractall(latest(_, _, _)),
    forall(( gitty_hash(Store, Hash),
             load_object(Store, Hash, Text, commit, _Size),
             term_string(Commit, Text, []),
             _{name:Name, time:Time} :< Commit
           ),
           (   latest(Name, _, Seen),
               Seen > Time
           ->  true
           ;   retractall(latest(Name, _, _)),
               assertz(latest(Name, Hash, Time))
           )).
%!  gitty_hash(+Store, ?Hash) is nondet.
%
%   True when Hash is an object in Store, either in an attached pack or
%   as a plain file.  With Hash unbound, enumerate the packed objects
%   followed by the plain-file objects; an object that is in both
%   places is enumerated twice.  With Hash bound, test whether the
%   object exists.

gitty_hash(Store, Hash) :-
    (   var(Hash)
    ->  (   gitty_attach_packs(Store),
            pack_object(Hash, _, _, _, Store, _)
        ;   gitty_file_object(Store, Hash)
        )
    ;   gitty_attach_packs(Store),
        pack_object(Hash, _, _, _, Store, _)
    ->  true
    ;   gitty_object_file(Store, Hash, File),
        exists_file(File)
    ).

%   gitty_file_object(+Store, -Hash) is nondet.
%
%   Enumerate the hashes of all plain-file objects in Store, stored as
%   Store/XX/YY/<36 chars>.

gitty_file_object(Store, Hash) :-
    access_file(Store, exist),
    gitty_hash_subdir(Store, Part1, Dir1),
    gitty_hash_subdir(Dir1, Part2, Dir2),
    directory_files(Dir2, Files),
    member(Rest, Files),
    atom_length(Rest, 36),
    atomic_list_concat([Part1, Part2, Rest], Hash).

%   gitty_hash_subdir(+Parent, -Name, -Path) is nondet.
%
%   Name is a two-character entry of Parent other than '..', and Path is
%   its full path.  '.' is excluded because its length is 1.

gitty_hash_subdir(Parent, Name, Path) :-
    directory_files(Parent, Entries),
    member(Name, Entries),
    Name \== '..',
    atom_length(Name, 2),
    directory_file_path(Parent, Name, Path).
%!  delete_object(+Store, +Hash)
%
%   Delete the plain file that holds object Hash.

delete_object(Store, Hash) :-
    gitty_object_file(Store, Hash, ObjectFile),
    delete_file(ObjectFile).
%!  gitty_object_file(+Store, +Hash, -Path) is det.
%
%   Path is the plain file of object Hash: the first two characters of
%   the hash name the top directory, the next two a subdirectory, and
%   the remainder the file.  Path is an atom.

gitty_object_file(Store, Hash, Path) :-
    sub_string(Hash, 0, 2, _, Level1),
    sub_string(Hash, 2, 2, _, Level2),
    sub_string(Hash, 4, _, 0, Name),
    atomic_list_concat([Store, Level1, Level2, Name], /, Path).


                 /*******************************
                 *           SYNCING            *
                 *******************************/
%!  gitty_update_head(+Store, +Name, +OldCommit, +NewCommit, +DataHash)
%
%   Update the head of Name from OldCommit to NewCommit.  OldCommit is
%   `-` if Name is new and thus has no head yet.  DataHash is only
%   passed on to the Redis backend.
%
%   This operation can fail because another writer has updated the
%   head. This can both be in-process or another process.  For the
%   file-based backend, a conflict raises one of these exceptions:
%
%     - error(gitty(file_exists(Name)), _) if OldCommit is `-` but Name
%       already has a head.
%     - error(gitty(not_at_head(Name, OldCommit)), _) if the head of
%       Name is not OldCommit.
%
%   With remote syncing enabled, the update is also appended as a
%   head(Name, OldCommit, NewCommit) term to the shared heads file.

gitty_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
    redis_db(Store, _, _),
    !,
    redis_update_head(Store, Name, OldCommit, NewCommit, DataHash).
gitty_update_head(Store, Name, OldCommit, NewCommit, _) :-
    with_mutex(gitty,
               gitty_update_head_sync(Store, Name, OldCommit, NewCommit)).

:- if(remote_sync(true)).
gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
    remote_sync(true),
    !,
    % Holding the heads file open (exclusively locked) for the whole
    % update serialises writers across processes.
    setup_call_cleanup(
        heads_output_stream(Store, HeadsOut),
        gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut),
        close(HeadsOut)).
:- endif.
gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit).

gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut) :-
    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit),
    format(HeadsOut, '~q.~n', [head(Name, OldCommit, NewCommit)]).

gitty_update_head_sync2(Store, Name, OldCommit, NewCommit) :-
    gitty_scan(Store),                  % fetch remote changes
    (   OldCommit == (-)
    ->  (   head(Store, Name, _, _)
        ->  throw(error(gitty(file_exists(Name)), _))
        ;   assert_head(Store, Name, NewCommit)
        )
    ;   (   retract(head(Store, Name, _, OldCommit))
        ->  assert_head(Store, Name, NewCommit)
        ;   throw(error(gitty(not_at_head(Name, OldCommit)), _))
        )
    ).
% last_remote_sync(Store, Time): get_time/1 time stamp of the last time
% the shared heads file of Store was processed.
:- dynamic
    last_remote_sync/2.

:- if(remote_sync(false)).
% Remote syncing is disabled (Windows): nothing to do.
remote_updates(_) :-
    remote_sync(false),
    !.
:- endif.
% Process updates that other processes appended to the shared heads
% file, at most once per second.  The up-to-date check is repeated
% under the `gitty` mutex so only one thread does the work.
remote_updates(Store) :-
    remote_up_to_data(Store),
    !.
remote_updates(Store) :-
    with_mutex(gitty, remote_updates_sync(Store)).

remote_updates_sync(Store) :-
    remote_up_to_data(Store),
    !.
remote_updates_sync(Store) :-
    retractall(last_remote_sync(Store, _)),
    get_time(Now),
    asserta(last_remote_sync(Store, Now)),
    remote_update(Store).

% True if the heads file was processed less than one second ago.
% (The name means "up to date".)
remote_up_to_data(Store) :-
    last_remote_sync(Store, Last),
    get_time(Now),
    Now-Last < 1.

remote_update(Store) :-
    remote_updates(Store, List),
    maplist(update_head(Store), List).

% Apply a head(Name, OldCommit, NewCommit) record from the heads file.
% It is applied only if it is consistent with the local view: either
% OldCommit is `-` and Name has no head, or Name is at OldCommit.  Other
% terms are silently ignored by the second clause.
update_head(Store, head(Name, OldCommit, NewCommit)) :-
    (   OldCommit == (-)
    ->  \+ head(Store, Name, _, _)
    ;   retract(head(Store, Name, _, OldCommit))
    ),
    !,
    assert_head(Store, Name, NewCommit).
update_head(_, _).
%   remote_updates(+Store, -List) is det.
%
%   List holds the terms read from the heads file of Store since the
%   previous call.  The input stream is cached by heads_input_stream/2
%   and opened with eof_action(reset), so every call continues where
%   the previous one stopped.

remote_updates(Store, List) :-
    heads_input_stream(Store, Stream),
    setup_call_cleanup(
        % NOTE(review): saves/restores SWI-Prolog's input context around
        % read/2; presumably so reading is safe during file loading.
        '$push_input_context'(gitty_sync),
        read_new_terms(Stream, List),
        '$pop_input_context').

%   read_new_terms(+Stream, -Terms)
%
%   Read terms from Stream until end_of_file.

read_new_terms(Stream, Terms) :-
    read(Stream, First),
    read_new_terms(First, Stream, Terms).

read_new_terms(end_of_file, _, List) :-
    !,
    List = [].
read_new_terms(Term, Stream, [Term|More]) :-
    read(Stream, Term2),
    read_new_terms(Term2, Stream, More).

%   heads_output_stream(+Store, -Out)
%
%   Open the heads file of Store for appending, with an exclusive lock.

heads_output_stream(Store, Out) :-
    heads_file(Store, HeadsFile),
    open(HeadsFile, append, Out,
         [ encoding(utf8),
           lock(exclusive)
         ]).

%   heads_input_stream(+Store, -Stream)
%
%   Stream is the cached input stream on the heads file.  If no stream
%   is cached yet, open one.  If opening raises an error,
%   create_heads_file/1 creates the file and fails, making between/3
%   retry the open once.  After two failed attempts this predicate
%   fails, unless creating the file itself raises an error.

heads_input_stream(Store, Stream) :-
    heads_input_stream_cache(Store, Stream0),
    !,
    Stream = Stream0.
heads_input_stream(Store, Stream) :-
    heads_file(Store, File),
    between(1, 2, _),
    catch(open(File, read, In,
               [ encoding(utf8),
                 eof_action(reset)
               ]),
          _,
          create_heads_file(Store)),
    !,
    assert(heads_input_stream_cache(Store, In)),
    Stream = In.

%   create_heads_file(+Store)
%
%   Create the heads file if it does not exist, by opening it for
%   appending and closing it again.  Always fails, so that
%   heads_input_stream/2 retries opening the file.

create_heads_file(Store) :-
    call_cleanup(
        heads_output_stream(Store, Out),
        close(Out)),
    fail.                               % always fail!

%   heads_file(+Store, -HeadsFile)
%
%   HeadsFile is Store/ref/head.  The directories Store and Store/ref
%   are created if they do not exist.

heads_file(Store, HeadsFile) :-
    ensure_directory(Store),
    directory_file_path(Store, ref, RefDir),
    ensure_directory(RefDir),
    directory_file_path(RefDir, head, HeadsFile).
%   restore_heads_from_remote(+Store)
%
%   Initialise the heads of Store from the shared heads file and record
%   the scan time.  If the file does not exist or cannot be restored,
%   compute the heads from the objects and write a fresh heads file.

restore_heads_from_remote(Store) :-
    heads_file(Store, File),
    exists_file(File),
    setup_call_cleanup(
        open(File, read, In, [encoding(utf8)]),
        restore_heads(Store, In),
        close(In)),
    !,
    get_time(Now),
    assertz(store(Store, Now)).
restore_heads_from_remote(Store) :-
    read_heads_from_objects(Store),
    heads_file(Store, File),
    setup_call_cleanup(
        open(File, write, Out, [encoding(utf8)]),
        save_heads(Store, Out),
        close(Out)),
    !.

%   restore_heads(+Store, +In)
%
%   Process the heads file: an epoch(_) term followed by head(File, Old,
%   Hash) terms.  Each head term makes Hash the head of File, so later
%   terms override earlier ones.  Fails if the first term is not
%   epoch/1 or if a term other than head/3 is found.

restore_heads(Store, In) :-
    read(In, Term0),
    Term0 = epoch(_),
    read(In, Term1),
    restore_heads(Term1, In, Store).

restore_heads(end_of_file, _, _) :- !.
restore_heads(head(File, _, Hash), In, Store) :-
    retractall(head(Store, File, _, _)),
    assert_head(Store, File, Hash),
    read(In, Term),
    restore_heads(Term, In, Store).

%   save_heads(+Store, +Out)
%
%   Write epoch(Now) followed by a head(File, -, Hash) term for each
%   current head of Store.

save_heads(Store, Out) :-
    get_time(Now),
    format(Out, 'epoch(~0f).~n~n', [Now]),
    forall(head(Store, File, _, Hash),
           format(Out, '~q.~n', [head(File, -, Hash)])).
%!  delete_head(+Store, +Head) is det.
%
%   Remove the head of the file Head, in Redis or in the local head/4
%   relation.

delete_head(Store, Head) :-
    (   redis_db(Store, _DB, _Prefix)
    ->  redis_delete_head(Store, Head)
    ;   retractall(head(Store, Head, _Ext, _Hash))
    ).
%!  set_head(+Store, +File, +Hash) is det.
%
%   Unconditionally make Hash the head of File.  For the local store,
%   the new head/4 fact is added before the old one is removed.
%   NOTE(review): presumably this is so that a head for File is
%   visible at all times.

set_head(Store, File, Hash) :-
    (   redis_db(Store, _DB, _Prefix)
    ->  redis_set_head(Store, File, Hash)
    ;   file_name_extension(_Base, Ext, File),
        (   head(Store, File, _, Current)
        ->  (   Current == Hash
            ->  true
            ;   asserta(head(Store, File, Ext, Hash)),
                retractall(head(Store, File, _, Current))
            )
        ;   asserta(head(Store, File, Ext, Hash))
        )
    ).


                 /*******************************
                 *            PACKS             *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

<pack file> := <header>
               <file>*
<header>    := "gitty(Version).\n" <object>* "end_of_header.\n"
<object>    := obj(Hash, Type, Size, FileSize)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

pack_version(1).
:- debug(gitty(pack)).

%!  repack_objects(+Store, +Options) is det.
%
%   Put the plain-file objects of Store into a new pack if there are at
%   least min_files(Count) of them (default 1,000).  Existing packs
%   smaller than small_pack(Bytes) (default 10,000,000) are merged into
%   the new pack.

repack_objects(Store, Options) :-
    option(min_files(MinFiles), Options, 1_000),
    findall(Object, gitty_file_object(Store, Object), Objects),
    length(Objects, NewFiles),
    debug(gitty(pack), 'Found ~D file objects', [NewFiles]),
    (   NewFiles < MinFiles
    ->  debug(gitty(pack), 'Nothing to do', [])
    ;   pack_files(Store, ExistingPacks),
        option(small_pack(MaxSize), Options, 10_000_000),
        include(small_file(MaxSize), ExistingPacks, SmallPacks),
        (   debugging(gitty(pack))
        ->  length(SmallPacks, SmallCount),
            debug(gitty(pack), 'Found ~D small packs', [SmallCount])
        ;   true
        ),
        directory_file_path(Store, pack, PackDir),
        make_directory_path(PackDir),
        pack_objects(Store, Objects, SmallPacks, PackDir, _PackFile, Options)
    ).

%   small_file(+MaxSize, +File) is semidet.
%
%   True if File is smaller than MaxSize bytes.

small_file(MaxSize, File) :-
    size_file(File, Bytes),
    Bytes < MaxSize.
%!  pack_objects(+Store, +Objects, +Packs, +PackDir, -PackFile, +Options)
%
%   Combine the plain-file Objects and the objects of the existing
%   Packs into a new pack in PackDir.  PackFile is the path of the
%   resulting pack, named pack-<SHA1>.pack after the SHA1 of its
%   content.  If Objects is empty and there is exactly one pack, there
%   is nothing to do and PackFile is that pack.  Runs under the
%   `gitty_pack` mutex.

pack_objects(Store, Objects, Packs, PackDir, PackFile, Options) :-
    with_mutex(gitty_pack,
               pack_objects_sync(Store, Objects, Packs, PackDir,
                                 PackFile, Options)).

% Nothing to repack.  Return the pack itself (not a singleton list), so
% PackFile is always a file path.
pack_objects_sync(_Store, [], [Pack], _, Pack, _) :-
    !.
pack_objects_sync(Store, Objects, Packs, PackDir, PackFilePath, Options) :-
    length(Objects, ObjCount),
    length(Packs, PackCount),
    debug(gitty(pack), 'Repacking ~D objects and ~D packs',
          [ObjCount, PackCount]),
    maplist(object_info(Store), Objects, FileInfo),
    maplist(pack_info(Store), Packs, PackInfo),
    append([FileInfo|PackInfo], Info0),
    sort(1, @<, Info0, Info),           % remove possible duplicates
    (   debugging(gitty(pack))
    ->  (   PackCount > 0
        ->  length(Info, FinalObjCount),
            debug(gitty(pack), 'Total ~D objects', [FinalObjCount])
        ;   true
        )
    ;   true
    ),
    % Write to a temporary file first; its final name depends on the
    % hash of the content, which is only known after writing.
    directory_file_path(PackDir, 'pack-create', TmpPack),
    setup_call_cleanup(
        ( open(TmpPack, write, Out0, [type(binary), lock(write)]),
          open_hash_stream(Out0, Out, [algorithm(sha1)])
        ),
        ( write_signature(Out),
          maplist(write_header(Out), Info),
          format(Out, 'end_of_header.~n', []),
          maplist(add_file(Out, Store), Info),
          stream_hash(Out, SHA1)
        ),
        close(Out)),
    format(atom(PackFile), 'pack-~w.pack', [SHA1]),
    directory_file_path(PackDir, PackFile, PackFilePath),
    rename_file(TmpPack, PackFilePath),
    debug(gitty(pack), 'Attaching ~p', [PackFilePath]),
    attach_pack(Store, PackFilePath),
    remove_objects_after_pack(Store, Objects, Options),
    % If the new pack has the same content (and thus name) as an
    % existing pack, that pack must not be removed.
    delete(Packs, PackFilePath, RmPacks),
    remove_repacked_packs(Store, RmPacks, Options),
    debug(gitty(pack), 'Packing completed', []).

%   object_info(+Store, +Object, -Info)
%
%   Info is obj(Object, Type, Size, FileSize).  Type and Size come from
%   the object header; FileSize is the size of the compressed file.

object_info(Store, Object, obj(Object, Type, Size, FileSize)) :-
    gitty_object_file(Store, Object, File),
    load_object_header(Store, Object, Type, Size),
    size_file(File, FileSize).
%   pack_info(+Store, +PackFile, -Objects)
%
%   Attach PackFile and unify Objects with the obj/4 terms from its
%   header.

pack_info(Store, PackFile, Objects) :-
    attach_pack(Store, PackFile),
    pack_read_header(PackFile, _Version, _DataOffset, Objects).

%   write_signature(+Out)
%
%   Write the gitty(Version) line that starts a pack.

write_signature(Out) :-
    pack_version(V),
    format(Out, "gitty(~d).~n", [V]).

%   write_header(+Out, +Obj)
%
%   Write one obj(Hash, Type, Size, FileSize) index line.

write_header(Out, obj(Object, Type, Size, FileSize)) :-
    format(Out, 'obj(~q,~q,~d,~d).~n', [Object, Type, Size, FileSize]).
%   add_file(+Out, +Store, +Obj)
%
%   Append the compressed bytes of an object to the pack stream Out.
%   The plain file is used if it exists.  Otherwise the FileSize bytes
%   are copied from the pack that currently holds the object.

add_file(Out, Store, obj(Object, Type, Size, FileSize)) :-
    gitty_object_file(Store, Object, File),
    (   exists_file(File)
    ->  setup_call_cleanup(
            open(File, read, In, [type(binary)]),
            copy_stream_data(In, Out),
            close(In))
    ;   pack_object(Object, Type, Size, Offset, Store, PackFile),
        setup_call_cleanup(
            open(PackFile, read, PackIn, [type(binary)]),
            ( seek(PackIn, Offset, bof, Offset),
              copy_stream_data(PackIn, Out, FileSize)
            ),
            close(PackIn))
    ).
%!  gitty_fsck(+Store)
%
%   Check the integrity of every pack file of Store.

gitty_fsck(Store) :-
    pack_files(Store, Packs),
    maplist(fsck_pack, Packs).
%!  fsck_pack(+File)
%
%   Validate a pack file.  First verify that the SHA1 of its content
%   matches the hash in its name, then check every object it contains.

fsck_pack(File) :-
    debug(gitty(pack), 'fsck ~p', [File]),
    check_pack_hash(File),
    debug(gitty(pack), 'fsck ~p: checking objects', [File]),
    check_pack_objects(File),
    debug(gitty(pack), 'fsck ~p: done', [File]).

%   check_pack_hash(+File)
%
%   File must be named pack-<SHA1>.pack.  The SHA1 of the file content
%   is compared to <SHA1> using assertion/1.

check_pack_hash(File) :-
    file_base_name(File, Base),
    file_name_extension(Plain, Ext, Base),
    must_be(oneof([pack]), Ext),
    atom_concat('pack-', Hash, Plain),
    setup_call_cleanup(
        ( open(File, read, In0, [type(binary)]),
          open_hash_stream(In0, In, [algorithm(sha1)])
        ),
        ( setup_call_cleanup(
              open_null_stream(Null),
              copy_stream_data(In, Null),
              close(Null)),
          stream_hash(In, SHA1)
        ),
        close(In)),
    assertion(Hash == SHA1).

%   check_pack_objects(+PackFile)
%
%   Read the pack index and check the objects in order.  The data of
%   each object starts where its predecessor ends (offset + FileSize).

check_pack_objects(PackFile) :-
    setup_call_cleanup(
        open(PackFile, read, In, [type(binary)]),
        (  read_header(In, Version, DataOffset, Objects),
           set_stream(In, encoding(utf8)),
           foldl(check_object(In, PackFile, Version), Objects, DataOffset, _)
        ),
        close(In)).

%   check_object(+In, +PackFile, +Version, +Obj, +Offset0, -Offset)
%
%   Check the object that should start at Offset0; Offset is where the
%   next object starts.  The stream is repositioned, with a warning, if
%   it is not at Offset0.  The object is decompressed and read.  Read
%   errors and an unexpected end position are reported, Type and Size
%   are compared to the index, and the gitty:check_object/4 hook is
%   called.  An object that cannot be read is reported and skipped.

check_object(In, PackFile, _Version,
             obj(Object, Type, Size, FileSize),
             Offset0, Offset) :-
    Offset is Offset0+FileSize,
    byte_count(In, Here),
    (   Here == Offset0
    ->  true
    ;   print_message(warning, pack(reposition(Here, Offset0))),
        seek(In, Offset0, bof, Offset0)
    ),
    (   setup_call_cleanup(
            zopen(In, In2, [multi_part(false), close_parent(false)]),
            catch(read_object(In2, Data, _0RType, _0RSize), E,
                  ( print_message(error,
                                  gitty(PackFile, fsck(read_object(Object, E)))),
                    fail)),
            close(In2))
    ->  byte_count(In, End),
        (   End == Offset
        ->  true
        ;   print_message(error,
                          gitty(PackFile, fsck(object_end(Object, End,
                                                          Offset0, Offset,
                                                          Data))))
        ),
        assertion(Type == _0RType),
        assertion(Size == _0RSize),
        gitty:check_object(Object, Data, Type, Size)
    ;   true
    ).
%   gitty_attach_packs(+Store)
%
%   Attach all pack files of Store once.  The check is repeated under
%   the gitty_attach_packs mutex so that only one thread does the work.

gitty_attach_packs(Store) :-
    (   attached_packs(Store)
    ->  true
    ;   with_mutex(gitty_attach_packs,
                   gitty_attach_packs_sync(Store))
    ).

gitty_attach_packs_sync(Store) :-
    (   attached_packs(Store)
    ->  true
    ;   pack_files(Store, PackFiles),
        maplist(attach_pack(Store), PackFiles),
        asserta(attached_packs(Store))
    ).

%   pack_files(+Store, -Packs) is det.
%
%   Packs are the paths of the *.pack files in Store/pack.  Packs is []
%   if that directory does not exist.

pack_files(Store, Packs) :-
    directory_file_path(Store, pack, PackDir),
    (   exists_directory(PackDir)
    ->  directory_files(PackDir, Entries),
        convlist(is_pack(PackDir), Entries, Packs)
    ;   Packs = []
    ).

is_pack(PackDir, File, Path) :-
    file_name_extension(_Base, pack, File),
    directory_file_path(PackDir, File, Path).
%!  attach_pack(+Store, +PackFile)
%
%   Load the index of PackFile, adding a pack_object/6 fact for every
%   object in it.  Does nothing if the pack is already attached.

attach_pack(Store, PackFile) :-
    attached_pack(Store, PackFile),
    !.
attach_pack(Store, PackFile) :-
    retractall(pack_object(_,_,_,_,_,PackFile)),
    pack_read_header(PackFile, Version, DataOffset, Objects),
    foldl(assert_object(Store, PackFile, Version), Objects, DataOffset, _),
    assertz(attached_pack(Store, PackFile)).

%   pack_read_header(+PackFile, -Version, -DataOffset, -Objects)
%
%   Read the header of PackFile; see read_header/4.

pack_read_header(PackFile, Version, DataOffset, Objects) :-
    setup_call_cleanup(
        open(PackFile, read, In, [type(binary)]),
        read_header(In, Version, DataOffset, Objects),
        close(In)).

%   read_header(+In, -Version, -DataOffset, -Objects)
%
%   Read a pack header: gitty(Version), obj/4 terms up to end_of_header,
%   and a newline.  DataOffset is the byte offset where the object data
%   starts.
%
%   @error domain_error(gitty_pack_file, Signature) if the pack does not
%          start with a gitty(Version) term.  The culprit is the term
%          actually read, rather than an unbound variable.

read_header(In, Version, DataOffset, Objects) :-
    read(In, Signature),
    (   Signature = gitty(Version)
    ->  true
    ;   domain_error(gitty_pack_file, Signature)
    ),
    read(In, Term),
    read_index(Term, In, Objects),
    get_code(In, Code),
    assertion(Code == 0'\n),
    byte_count(In, DataOffset).

read_index(end_of_header, _, []) :-
    !.
read_index(Object, In, [Object|T]) :-
    read(In, Term2),
    read_index(Term2, In, T).

%   assert_object(+Store, +Pack, +Version, +Obj, +Offset0, -Offset)
%
%   foldl/4 step.  Objects are stored back to back, so the next object
%   starts at Offset0+FileSize.

assert_object(Store, Pack, _Version,
              obj(Object, Type, Size, FileSize),
              Offset0, Offset) :-
    Offset is Offset0+FileSize,
    assertz(pack_object(Object, Type, Size, Offset0, Store, Pack)).
%   detach_pack(+Store, +Pack)
%
%   Forget the index of Pack and its attached status.

detach_pack(Store, Pack) :-
    retractall(pack_object(_Hash, _Type, _Size, _Offset, Store, Pack)),
    retractall(attached_pack(Store, Pack)).
%!  load_object_from_pack(+Hash, -Data, -Type, -Size) is nondet.
%
%   Load object Hash from a pack that holds it according to
%   pack_object/6.  Succeeds once for every pack that contains Hash.

load_object_from_pack(Hash, Data, Type, Size) :-
    pack_object(Hash, Type, Size, Offset, _Store, PackFile),
    setup_call_cleanup(
        open(PackFile, read, Stream, [type(binary)]),
        read_object_at(Stream, Offset, Data, Type, Size),
        close(Stream)).

%   read_object_at(+In, +Offset, -Data, -Type, -Size)
%
%   Read the compressed object that starts at byte Offset of In.

read_object_at(In, Offset, Data, Type, Size) :-
    seek(In, Offset, bof, Offset),
    read_object_here(In, Data, Type, Size).

%   read_object_here(+In, -Data, -Type, -Size)
%
%   Decompress and read a single gzip member at the current position of
%   In.  The encoding of In is UTF-8 while reading and is restored
%   afterwards.  close_parent(false) keeps In open.

read_object_here(In, Data, Type, Size) :-
    stream_property(In, encoding(SavedEnc)),
    setup_call_cleanup(
        ( set_stream(In, encoding(utf8)),
          zopen(In, Gz, [multi_part(false), close_parent(false)])
        ),
        read_object(Gz, Data, Type, Size),
        ( close(Gz),
          set_stream(In, encoding(SavedEnc))
        )).
%!  unpack_packs(+Store)
%
%   Turn every pack of Store back into plain files.  Store is resolved
%   to an absolute, readable directory first.

unpack_packs(Store) :-
    absolute_file_name(Store, StoreDir,
                       [ file_type(directory),
                         access(read)
                       ]),
    pack_files(StoreDir, PackFiles),
    maplist(unpack_pack(StoreDir), PackFiles).
%!  unpack_pack(+Store, +PackFile)
%
%   Write every object of PackFile as a plain file, unless that file
%   already exists.  Then detach and delete the pack.

unpack_pack(Store, PackFile) :-
    pack_read_header(PackFile, _Version, DataOffset, Objects),
    setup_call_cleanup(
        open(PackFile, read, In, [type(binary)]),
        foldl(create_file(Store, In), Objects, DataOffset, _),
        close(In)),
    detach_pack(Store, PackFile),
    delete_file(PackFile).

%   create_file(+Store, +In, +Obj, +Offset0, -Offset)
%
%   foldl/4 step.  Copy the FileSize raw bytes at Offset0 of the pack
%   stream In into the plain file of the object, unless that file
%   exists.  Offset is where the next object starts.

create_file(Store, In, obj(Object, _Type, _Size, FileSize), Offset0, Offset) :-
    Offset is Offset0+FileSize,
    gitty_object_file(Store, Object, Path),
    with_mutex(gitty_file, exists_or_recreate(Path, Out)),
    (   var(Out)
    ->  true
    ;   % Out is already open: close it even if seek/4 or the copy
        % raises an error.  (Using seek/4 as the setup of
        % setup_call_cleanup/3 leaked Out when the seek failed.)
        call_cleanup(
            ( seek(In, Offset0, bof, Offset0),
              copy_stream_data(In, Out, FileSize)
            ),
            close(Out))
    ).

%   exists_or_recreate(+Path, -Out)
%
%   If Path exists, leave Out unbound.  Otherwise create its directory
%   and open Path as a binary, write-locked output stream.

exists_or_recreate(Path, _Out) :-
    exists_file(Path),
    !.
exists_or_recreate(Path, Out) :-
    file_directory_name(Path, Dir),
    make_directory_path(Dir),
    open(Path, write, Out, [type(binary), lock(write)]).
%   remove_objects_after_pack(+Store, +Objects, +Options)
%
%   Delete the plain files of the packed Objects.  Unless the option
%   prune_empty_directories(true) (the default) is overruled, also
%   remove the directories of Store that became empty.

remove_objects_after_pack(Store, Objects, Options) :-
    debug(gitty(pack), 'Deleting plain files', []),
    maplist(delete_object(Store), Objects),
    (   \+ option(prune_empty_directories(true), Options, true)
    ->  true
    ;   debug(gitty(pack), 'Pruning empty directories', []),
        prune_empty_directories(Store)
    ).
%   remove_repacked_packs(+Store, +Packs, +Options)
%
%   Detach and delete the packs whose objects were merged into a new
%   pack.

remove_repacked_packs(Store, Packs, Options) :-
    maplist(remove_pack(Store, Options), Packs).

remove_pack(Store, _Options, PackFile) :-
    detach_pack(Store, PackFile),
    delete_file(PackFile).
%   prune_empty_directories(+Dir)
%
%   Remove the empty directories below Dir.  Dir itself (level 0) is
%   never removed.

prune_empty_directories(Dir) :-
    prune_empty_directories(Dir, 0).

prune_empty_directories(Dir, Level) :-
    directory_files(Dir, AllFiles),
    exclude(hidden, AllFiles, Files),
    (   Files == [],
        Level > 0
    ->  delete_directory_async(Dir)
    ;   convlist(prune_empty_directories(Dir, Level), Files, Left),
        (   Left == [],
            Level > 0
        ->  delete_directory_async(Dir)
        ;   true
        )
    ).

% The special directory entries, written as quoted atoms.
hidden('.').
hidden('..').

%   prune_empty_directories(+Parent, +Level, +File, -Left)
%
%   convlist/3 step.  Subdirectories are pruned recursively and always
%   dropped, so Left only collects the plain files of Parent.  A parent
%   with remaining non-empty subdirectories may therefore be tried for
%   deletion; delete_directory_async2/1 tolerates that.

prune_empty_directories(Parent, Level0, File, _) :-
    directory_file_path(Parent, File, Path),
    exists_directory(Path),
    !,
    Level is Level0 + 1,
    prune_empty_directories(Path, Level),
    fail.
prune_empty_directories(_, _, File, File).

% Delete Dir under the gitty_file mutex, which is also held while
% object files are created.  (Despite its name, this is synchronous.)
delete_directory_async(Dir) :-
    with_mutex(gitty_file, delete_directory_async2(Dir)).

% Ignore the error if Dir has vanished or is not empty; re-throw
% anything else.
delete_directory_async2(Dir) :-
    catch(delete_directory(Dir), E,
          (   \+ exists_directory(Dir)
          ->  true
          ;   \+ empty_directory(Dir)
          ->  true
          ;   throw(E)
          )).

empty_directory(Dir) :-
    directory_files(Dir, AllFiles),
    exclude(hidden, AllFiles, []).


                 /*******************************
                 *       REDIS PRIMITIVES       *
                 *******************************/

%   redis_head_db(+Store, -DB, -Key)
%
%   DB and Key ("<Prefix>:gitty:head") of the Redis hash that maps file
%   names to their head commit.

redis_head_db(Store, DB, Key) :-
    redis_db(Store, DB, Prefix),
    string_concat(Prefix, ":gitty:head", Key).
%   redis_file(+Store, ?Name, ?Ext, ?Hash) is nondet.
%
%   Redis version of head/4.  Which clause applies depends on what is
%   bound:
%
%     - Name: a single HGET.
%     - Ext: an HSCAN matching "*.Ext".
%     - Hash: the name is read from the commit.  NOTE(review): this
%       does not verify that Hash is the current head of that name.
%     - nothing: an HGETALL.

redis_file(Store, Name, Ext, Hash) :-
    nonvar(Name),
    !,
    file_name_extension(_Base, Ext, Name),
    redis_head_db(Store, DB, HeadsKey),
    redis(DB, hget(HeadsKey, Name), Hash as atom).
redis_file(Store, Name, Ext, Hash) :-
    nonvar(Ext),
    !,
    string_concat("*.", Ext, Glob),
    redis_head_db(Store, DB, HeadsKey),
    redis_hscan(DB, HeadsKey, Entries, [match(Glob)]),
    member(NameString-HashString, Entries),
    atom_string(Name, NameString),
    atom_string(Hash, HashString).
redis_file(Store, Name, Ext, Hash) :-
    nonvar(Hash),
    !,
    load_plain_commit(Store, Hash, Commit),
    Name = Commit.name,
    file_name_extension(_Base, Ext, Name).
redis_file(Store, Name, Ext, Hash) :-
    redis_head_db(Store, DB, HeadsKey),
    redis(DB, hgetall(HeadsKey), Pairs as pairs(atom,atom)),
    member(Name-Hash, Pairs),
    file_name_extension(_Base, Ext, Name).
%   redis_ensure_heads(+Store)
%
%   If the Redis heads hash of Store does not exist, fill it from the
%   most recent commit of every file found by scanning the objects.

redis_ensure_heads(Store) :-
    redis_head_db(Store, DB, Key),
    (   redis(DB, exists(Key), 1)
    ->  true
    ;   debug(gitty(redis), 'Initializing gitty heads in ~p ...', [Key]),
        gitty_scan_latest(Store),
        forall(retract(latest(File, Commit, _Time)),
               redis(DB, hset(Key, File, Commit))),
        debug(gitty(redis), '... finished gitty heads', [])
    ).
%   redis_update_head(+Store, +Name, +OldCommit, +NewCommit, +DataHash)
%
%   Redis version of updating a head.  When OldCommit is `-`, Name is
%   created with HSETNX, so an existing head is never silently
%   overwritten by a concurrent creator.  In that case
%   error(gitty(file_exists(Name)), _) is raised, mirroring the check of
%   the file-based driver.  Otherwise the head is changed with an atomic
%   compare-and-swap, which fails if the head is no longer OldCommit.
%   On success, the commit and data objects are published for eager
%   replication.

redis_update_head(Store, Name, -, NewCommit, DataHash) :-
    !,
    redis_head_db(Store, DB, Key),
    (   redis(DB, hsetnx(Key, Name, NewCommit), 1)
    ->  true
    ;   throw(error(gitty(file_exists(Name)), _))
    ),
    publish_objects(Store, [NewCommit, DataHash]).
redis_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
    redis_head_db(Store, DB, Key),
    redis_hcas(DB, Key, Name, OldCommit, NewCommit),
    publish_objects(Store, [NewCommit, DataHash]).
%   redis_delete_head(+Store, +Head)
%
%   Remove Head from the Redis heads hash of Store.

redis_delete_head(Store, Head) :-
    redis_head_db(Store, DB, HeadsKey),
    redis(DB, hdel(HeadsKey, Head)).
%   redis_set_head(+Store, +File, +Hash)
%
%   Unconditionally set the head of File to Hash in the Redis heads
%   hash of Store.

redis_set_head(Store, File, Hash) :-
    redis_head_db(Store, DB, HeadsKey),
    redis(DB, hset(HeadsKey, File, Hash)).

                 /*******************************
                 *          REPLICATE           *
                 *******************************/
We could improve on this in two ways: (1) publish the hash in a short-lived Redis key and make other nodes check that key, which is likely to avoid many nodes sending the same object; or (2) check how many nodes are in the pool and switch to a consumer-group based approach if this number is high (in which case we are unlikely to be asked ourselves for the missing hash).
:- multifile
    swish_redis:stream/2.

% Declare the Redis stream used for eager replication by adding a clause
% to the multifile hook declared above.  Without the module qualifier
% this would define an unrelated local predicate and the stream would
% never be registered.  NOTE(review): maxlen(100) is interpreted by
% swish_redis; presumably it caps the stream length.
swish_redis:stream('gitty:replicate', [maxlen(100)]).

:- listen(http(pre_server_start(_)),
          init_replicator).

%   init_replicator
%
%   Listen to the replication stream and to messages on the swish:gitty
%   channel, and create the gitty_queue message queue used by
%   redis_replicate_get/2.

init_replicator :-
    redis_swish_stream('gitty:replicate', ReplKey),
    listen(redis(_Redis, ReplKey, _Id, Data),
           replicate(Data)),
    listen(redis(_, 'swish:gitty', Message),
           gitty_message(Message)),
    message_queue_create(_, [alias(gitty_queue)]).

:- debug(gitty(replicate)).

%   gitty_message(+Message)
%
%   Handle a message on the swish:gitty channel:
%
%     - discover(Hash): if we have the object, publish it as
%       object(Hash, Data).
%     - object(Hash, Data): store the object.  If it is new, pass Hash
%       to gitty_queue, where redis_replicate_get/2 may be waiting.

gitty_message(discover(Hash)) :-
    debug(gitty(replicate), 'Discover: ~p', [Hash]),
    store(Store, _),
    load_object_raw(Store, Hash, Data),
    debug(gitty(replicate), 'Sending object ~p', [Hash]),
    redis(swish, publish(swish:gitty, object(Hash, Data) as prolog)).
gitty_message(object(Hash, Data)) :-
    debug(gitty(replicate), 'Replicate: ~p', [Hash]),
    redis_db(Store, _DB, _Prefix),
    store_object_raw(Store, Hash, Data, New),
    debug(gitty(replicate), 'Received object ~p (new=~p)', [Hash, New]),
    (   New == true
    ->  thread_send_message(gitty_queue, Hash)
    ;   true
    ).

%   redis_replicate_get(+Store, +Hash) is semidet.
%
%   Ask the other nodes for object Hash and wait up to 10 seconds for
%   it to arrive.  Fails if Hash is not a gitty hash, if no other node
%   received the request, or on timeout.

redis_replicate_get(_Store, Hash) :-
    is_gitty_hash(Hash),
    redis(swish, publish(swish:gitty, discover(Hash) as prolog), Count),
    Count > 1,                  % If I'm alone it won't help :(
    thread_get_message(gitty_queue, Hash,
                       [ timeout(10)
                       ]).
This realizes eager replication, as opposed to the above code (redis_replicate_get/2), which performs lazy replication. Eager replication ensures the object is stored in multiple places in case the node on which it was saved dies shortly after.
Note that we also receive the object we just saved. That is unavoidable in a network where all nodes are equal.
%   publish_objects(+Store, +Hashes)
%
%   Eagerly replicate the objects Hashes by adding each of them to the
%   gitty:replicate Redis stream.

publish_objects(Store, Hashes) :-
    redis_swish_stream('gitty:replicate', StreamKey),
    maplist(publish_object(Store, StreamKey), Hashes).

%   publish_object(+Store, +Stream, +Hash)
%
%   Add the raw (compressed) object Hash to Stream as a dict with the
%   keys `hash` and `data`.

publish_object(Store, Stream, Hash) :-
    load_object_raw(Store, Hash, Raw),
    debug(gitty(replicate), 'Sending ~p to ~p', [Hash, Stream]),
    xadd(swish, Stream, _Id, _{hash:Hash, data:Raw}).
%   replicate(+Data)
%
%   Handle an entry of the replication stream by storing the raw object
%   it carries in the Redis-backed store.

replicate(Data) :-
    redis_db(Store, _DB, _Prefix),
    HashText = Data.hash,
    atom_string(Hash, HashText),
    Bytes = Data.data,
    store_object_raw(Store, Hash, Bytes, New),
    debug(gitty(replicate), 'Received object ~p (new=~p)',
          [Hash, New]).


                 /*******************************
                 *         REDIS BASICS         *
                 *******************************/
%   redis_hcas(+DB, +Hash, +Key, +Old, +New) is semidet.
%
%   Compare-and-swap on a Redis hash.  If field Key of the Redis hash
%   Hash has value Old, set it to New; otherwise fail.  The compare and
%   set run in a single Lua script, so Redis executes them atomically.
%   (The clause also gets its missing terminating full stop.)

redis_hcas(DB, Hash, Key, Old, New) :-
    redis(DB, eval("if redis.call('HGET', KEYS[1], ARGV[1]) == ARGV[2] then \c
                       redis.call('HSET', KEYS[1], ARGV[1], ARGV[3]); \c
                       return 1; \c
                    end; \c
                    return 0\c
                   ",
                   1, Hash, Key, Old, New),
          1).
Gitty plain files driver
This version of the driver uses plain files to store the gitty data. It consists of a nested directory structure with files named after the hash. Objects and hash computation are the same as for `git`. The heads (files) are computed on startup by scanning all objects. There is a file `ref/head` that is updated whenever a head is updated. Other clients can watch this file and update their notion of the head. This implies that the store can handle multiple clients that access a shared file system, optionally shared using NFS from different machines.

The store is simple and robust. The main disadvantages are long startup times as the store holds more objects and relatively high disk usage due to rounding the small objects to disk allocation units.