36
37:- module(gitty_driver_files,
38 [ gitty_open/2, 39 gitty_close/1, 40 gitty_file/4, 41
42 gitty_update_head/5, 43 44 delete_head/2, 45 set_head/3, 46 store_object/4, 47 delete_object/2, 48
49 gitty_hash/2, 50 load_plain_commit/3, 51 load_object/5, 52 gitty_object_file/3, 53
54 repack_objects/2, 55 pack_objects/6, 56 57 unpack_packs/1, 58 unpack_pack/2, 59
60 attach_pack/2, 61 gitty_fsck/1, 62 fsck_pack/1, 63 load_object_from_pack/4, 64
65 gitty_rescan/1 66 ]). 67:- use_module(library(apply)). 68:- use_module(library(zlib)). 69:- use_module(library(filesex)). 70:- use_module(library(lists)). 71:- use_module(library(apply)). 72:- use_module(library(error)). 73:- use_module(library(debug)). 74:- use_module(library(zlib)). 75:- use_module(library(hash_stream)). 76:- use_module(library(option)). 77:- use_module(library(dcg/basics)). 78:- use_module(library(redis)). 79:- use_module(library(redis_streams)). 80:- use_module(gitty, [is_gitty_hash/1]). 81
82:- use_module(swish_redis). 83
101
102:- dynamic
103 head/4, 104 store/2, 105 commit/3, 106 heads_input_stream_cache/2, 107 pack_object/6, 108 attached_packs/1, 109 attached_pack/2, 110 redis_db/3. 111
112:- volatile
113 head/4,
114 store/2,
115 commit/3,
116 heads_input_stream_cache/2,
117 pack_object/6,
118 attached_packs/1,
119 attached_pack/2. 120
121:- multifile
122 gitty:check_object/4. 123
127
128:- if(current_prolog_flag(windows, true)). 129remote_sync(false).
130:- else. 131remote_sync(true).
132:- endif. 133
144
145gitty_open(Store, Options) :-
146 option(redis(DB), Options),
147 !,
148 option(redis_prefix(Prefix), Options, swish),
149 asserta(redis_db(Store, DB, Prefix)),
150 thread_create(gitty_scan(Store), _, [detached(true)]).
151gitty_open(_, _).
152
153
157
158gitty_close(Store) :-
159 ( retract(heads_input_stream_cache(Store, In))
160 -> close(In)
161 ; true
162 ),
163 retractall(head(Store,_,_,_)),
164 retractall(store(Store,_)),
165 retractall(pack_object(_,_,_,_,Store,_)).
166
167
172
173gitty_file(Store, Head, Ext, Hash) :-
174 redis_db(Store, _, _),
175 !,
176 gitty_scan(Store),
177 redis_file(Store, Head, Ext, Hash).
178gitty_file(Store, Head, Ext, Hash) :-
179 gitty_scan(Store),
180 head(Store, Head, Ext, Hash).
181
190
191load_plain_commit(Store, Hash, Meta) :-
192 must_be(atom, Store),
193 must_be(atom, Hash),
194 commit(Store, Hash, Meta),
195 !.
196load_plain_commit(Store, Hash, Meta) :-
197 load_object(Store, Hash, String, _, _),
198 term_string(Meta0, String, []),
199 with_mutex(gitty_commit_cache,
200 assert_cached_commit(Store, Hash, Meta0)),
201 Meta = Meta0.
202
203assert_cached_commit(Store, Hash, Meta) :-
204 commit(Store, Hash, Meta0),
205 !,
206 assertion(Meta0 =@= Meta).
207assert_cached_commit(Store, Hash, Meta) :-
208 assertz(commit(Store, Hash, Meta)).
209
214
215store_object(Store, Hash, _Hdr, _Data) :-
216 pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
217 !.
218store_object(Store, Hash, Hdr, Data) :-
219 gitty_object_file(Store, Hash, Path),
220 with_mutex(gitty_file, exists_or_create(Path, Out0)),
221 ( var(Out0)
222 -> true
223 ; setup_call_cleanup(
224 zopen(Out0, Out, [format(gzip)]),
225 format(Out, '~s~s', [Hdr, Data]),
226 close(Out))
227 ).
228
229exists_or_create(Path, _Out) :-
230 exists_file(Path),
231 !.
232exists_or_create(Path, Out) :-
233 file_directory_name(Path, Dir),
234 make_directory_path(Dir),
235 open(Path, write, Out, [encoding(utf8), lock(write)]).
236
237:- if(\+current_predicate(ensure_directory/1)). 239ensure_directory(Dir) :-
240 exists_directory(Dir),
241 !.
242ensure_directory(Dir) :-
243 make_directory(Dir).
244:- endif. 245
250
251store_object_raw(Store, Hash, _Bytes, false) :-
252 pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
253 !.
254store_object_raw(Store, Hash, Bytes, New) :-
255 gitty_object_file(Store, Hash, Path),
256 with_mutex(gitty_file, exists_or_create(Path, Out)),
257 ( var(Out)
258 -> New = false
259 ; call_cleanup(
260 ( set_stream(Out, type(binary)),
261 write(Out, Bytes)
262 ),
263 close(Out)),
264 New = true
265 ).
266
270
271load_object(_Store, Hash, Data, Type, Size) :-
272 load_object_from_pack(Hash, Data0, Type0, Size0),
273 !,
274 f(Data0, Type0, Size0) = f(Data, Type, Size).
275load_object(Store, Hash, Data, Type, Size) :-
276 load_object_file(Store, Hash, Data0, Type0, Size0),
277 !,
278 f(Data0, Type0, Size0) = f(Data, Type, Size).
279load_object(Store, Hash, Data, Type, Size) :-
280 redis_db(Store, _, _),
281 redis_replicate_get(Store, Hash),
282 load_object_file(Store, Hash, Data, Type, Size).
283
284load_object_file(Store, Hash, Data, Type, Size) :-
285 gitty_object_file(Store, Hash, Path),
286 exists_file(Path),
287 !,
288 setup_call_cleanup(
289 gzopen(Path, read, In, [encoding(utf8)]),
290 read_object(In, Data, Type, Size),
291 close(In)).
292
296
297load_object_raw(_Store, Hash, Bytes) :-
298 load_object_from_pack(Hash, Data, Type, Size),
299 !,
300 object_bytes(Type, Size, Data, Bytes).
301load_object_raw(Store, Hash, Data) :-
302 gitty_object_file(Store, Hash, Path),
303 exists_file(Path),
304 !,
305 setup_call_cleanup(
306 open(Path, read, In, [type(binary)]),
307 read_string(In, _, Data),
308 close(In)).
309
313
314object_bytes(Type, Size, Data, Bytes) :-
315 setup_call_cleanup(
316 new_memory_file(MF),
317 ( setup_call_cleanup(
318 open_memory_file(MF, write, Out, [encoding(octet)]),
319 setup_call_cleanup(
320 zopen(Out, ZOut, [format(gzip), close_parent(false)]),
321 ( set_stream(ZOut, encoding(utf8)),
322 format(ZOut, '~w ~d\u0000~w', [Type, Size, Data])
323 ),
324 close(ZOut)),
325 close(Out)),
326 memory_file_to_string(MF, Bytes, octet)
327 ),
328 free_memory_file(MF)).
329
330
334
(Store, Hash, Type, Size) :-
336 gitty_object_file(Store, Hash, Path),
337 setup_call_cleanup(
338 gzopen(Path, read, In, [encoding(utf8)]),
339 read_object_hdr(In, Type, Size),
340 close(In)).
341
342read_object(In, Data, Type, Size) :-
343 read_object_hdr(In, Type, Size),
344 read_string(In, _, Data).
345
346read_object_hdr(In, Type, Size) :-
347 get_code(In, C0),
348 read_hdr(C0, In, Hdr),
349 phrase((nonblanks(TypeChars), " ", integer(Size)), Hdr),
350 atom_codes(Type, TypeChars).
351
352read_hdr(C, In, [C|T]) :-
353 C > 0,
354 !,
355 get_code(In, C1),
356 read_hdr(C1, In, T).
357read_hdr(_, _, []).
358
363
364gitty_rescan(Store) :-
365 retractall(store(Store, _)).
366
375
376gitty_scan(Store) :-
377 store(Store, _),
378 !,
379 remote_updates(Store).
380gitty_scan(Store) :-
381 with_mutex(gitty, gitty_scan_sync(Store)).
382
383:- thread_local
384 latest/3. 385
386gitty_scan_sync(Store) :-
387 store(Store, _),
388 !.
389gitty_scan_sync(Store) :-
390 redis_db(Store, _, _),
391 !,
392 gitty_attach_packs(Store),
393 redis_ensure_heads(Store),
394 get_time(Now),
395 assertz(store(Store, Now)).
396:- if(remote_sync(true)). 397gitty_scan_sync(Store) :-
398 remote_sync(true),
399 !,
400 gitty_attach_packs(Store),
401 restore_heads_from_remote(Store).
402:- endif. 403gitty_scan_sync(Store) :-
404 gitty_attach_packs(Store),
405 read_heads_from_objects(Store).
406
411
412read_heads_from_objects(Store) :-
413 gitty_scan_latest(Store),
414 forall(retract(latest(Name, Hash, _Time)),
415 assert_head(Store, Name, Hash)),
416 get_time(Now),
417 assertz(store(Store, Now)).
418
419assert_head(Store, Name, Hash) :-
420 file_name_extension(_, Ext, Name),
421 assertz(head(Store, Name, Ext, Hash)).
422
423
428
429gitty_scan_latest(Store) :-
430 retractall(head(Store, _, _, _)),
431 retractall(latest(_, _, _)),
432 ( gitty_hash(Store, Hash),
433 load_object(Store, Hash, Data, commit, _Size),
434 term_string(Meta, Data, []),
435 _{name:Name, time:Time} :< Meta,
436 ( latest(Name, _, OldTime),
437 OldTime > Time
438 -> true
439 ; retractall(latest(Name, _, _)),
440 assertz(latest(Name, Hash, Time))
441 ),
442 fail
443 ; true
444 ).
445
446
450
451gitty_hash(Store, Hash) :-
452 var(Hash),
453 !,
454 ( gitty_attach_packs(Store),
455 pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
456 ; gitty_file_object(Store, Hash)
457 ).
458gitty_hash(Store, Hash) :-
459 ( gitty_attach_packs(Store),
460 pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
461 -> true
462 ; gitty_object_file(Store, Hash, File),
463 exists_file(File)
464 ).
465
466gitty_file_object(Store, Hash) :-
467 access_file(Store, exist),
468 directory_files(Store, Level0),
469 member(E0, Level0),
470 E0 \== '..',
471 atom_length(E0, 2),
472 directory_file_path(Store, E0, Dir0),
473 directory_files(Dir0, Level1),
474 member(E1, Level1),
475 E1 \== '..',
476 atom_length(E1, 2),
477 directory_file_path(Dir0, E1, Dir),
478 directory_files(Dir, Files),
479 member(File, Files),
480 atom_length(File, 36),
481 atomic_list_concat([E0,E1,File], Hash).
482
486
487delete_object(Store, Hash) :-
488 gitty_object_file(Store, Hash, File),
489 delete_file(File).
490
495
496gitty_object_file(Store, Hash, Path) :-
497 sub_string(Hash, 0, 2, _, Dir0),
498 sub_string(Hash, 2, 2, _, Dir1),
499 sub_string(Hash, 4, _, 0, File),
500 atomic_list_concat([Store, Dir0, Dir1, File], /, Path).
501
502
503 506
516
517gitty_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
518 redis_db(Store, _, _),
519 !,
520 redis_update_head(Store, Name, OldCommit, NewCommit, DataHash).
521gitty_update_head(Store, Name, OldCommit, NewCommit, _) :-
522 with_mutex(gitty,
523 gitty_update_head_sync(Store, Name, OldCommit, NewCommit)).
524
525:- if(remote_sync(true)). 526gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
527 remote_sync(true),
528 !,
529 setup_call_cleanup(
530 heads_output_stream(Store, HeadsOut),
531 gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut),
532 close(HeadsOut)).
533:- endif. 534gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
535 gitty_update_head_sync2(Store, Name, OldCommit, NewCommit).
536
537gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut) :-
538 gitty_update_head_sync2(Store, Name, OldCommit, NewCommit),
539 format(HeadsOut, '~q.~n', [head(Name, OldCommit, NewCommit)]).
540
541gitty_update_head_sync2(Store, Name, OldCommit, NewCommit) :-
542 gitty_scan(Store), 543 ( OldCommit == (-)
544 -> ( head(Store, Name, _, _)
545 -> throw(error(gitty(file_exists(Name),_)))
546 ; assert_head(Store, Name, NewCommit)
547 )
548 ; ( retract(head(Store, Name, _, OldCommit))
549 -> assert_head(Store, Name, NewCommit)
550 ; throw(error(gitty(not_at_head(Name, OldCommit)), _))
551 )
552 ).
553
558
559:- dynamic
560 last_remote_sync/2. 561
562:- if(remote_sync(false)). 563remote_updates(_) :-
564 remote_sync(false),
565 !.
566:- endif. 567remote_updates(Store) :-
568 remote_up_to_data(Store),
569 !.
570remote_updates(Store) :-
571 with_mutex(gitty, remote_updates_sync(Store)).
572
573remote_updates_sync(Store) :-
574 remote_up_to_data(Store),
575 !.
576remote_updates_sync(Store) :-
577 retractall(last_remote_sync(Store, _)),
578 get_time(Now),
579 asserta(last_remote_sync(Store, Now)),
580 remote_update(Store).
581
582remote_up_to_data(Store) :-
583 last_remote_sync(Store, Last),
584 get_time(Now),
585 Now-Last < 1.
586
587remote_update(Store) :-
588 remote_updates(Store, List),
589 maplist(update_head(Store), List).
590
591update_head(Store, head(Name, OldCommit, NewCommit)) :-
592 ( OldCommit == (-)
593 -> \+ head(Store, Name, _, _)
594 ; retract(head(Store, Name, _, OldCommit))
595 ),
596 !,
597 assert_head(Store, Name, NewCommit).
598update_head(_, _).
599
606
607remote_updates(Store, List) :-
608 heads_input_stream(Store, Stream),
609 setup_call_cleanup(
610 '$push_input_context'(gitty_sync),
611 read_new_terms(Stream, List),
612 '$pop_input_context').
613
614read_new_terms(Stream, Terms) :-
615 read(Stream, First),
616 read_new_terms(First, Stream, Terms).
617
618read_new_terms(end_of_file, _, List) :-
619 !,
620 List = [].
621read_new_terms(Term, Stream, [Term|More]) :-
622 read(Stream, Term2),
623 read_new_terms(Term2, Stream, More).
624
625heads_output_stream(Store, Out) :-
626 heads_file(Store, HeadsFile),
627 open(HeadsFile, append, Out,
628 [ encoding(utf8),
629 lock(exclusive)
630 ]).
631
632heads_input_stream(Store, Stream) :-
633 heads_input_stream_cache(Store, Stream0),
634 !,
635 Stream = Stream0.
636heads_input_stream(Store, Stream) :-
637 heads_file(Store, File),
638 between(1, 2, _),
639 catch(open(File, read, In,
640 [ encoding(utf8),
641 eof_action(reset)
642 ]),
643 _,
644 create_heads_file(Store)),
645 !,
646 assert(heads_input_stream_cache(Store, In)),
647 Stream = In.
648
649create_heads_file(Store) :-
650 call_cleanup(
651 heads_output_stream(Store, Out),
652 close(Out)),
653 fail. 654
655heads_file(Store, HeadsFile) :-
656 ensure_directory(Store),
657 directory_file_path(Store, ref, RefDir),
658 ensure_directory(RefDir),
659 directory_file_path(RefDir, head, HeadsFile).
660
664
665restore_heads_from_remote(Store) :-
666 heads_file(Store, File),
667 exists_file(File),
668 setup_call_cleanup(
669 open(File, read, In, [encoding(utf8)]),
670 restore_heads(Store, In),
671 close(In)),
672 !,
673 get_time(Now),
674 assertz(store(Store, Now)).
675restore_heads_from_remote(Store) :-
676 read_heads_from_objects(Store),
677 heads_file(Store, File),
678 setup_call_cleanup(
679 open(File, write, Out, [encoding(utf8)]),
680 save_heads(Store, Out),
681 close(Out)),
682 !.
683
684restore_heads(Store, In) :-
685 read(In, Term0),
686 Term0 = epoch(_),
687 read(In, Term1),
688 restore_heads(Term1, In, Store).
689
690restore_heads(end_of_file, _, _) :- !.
691restore_heads(head(File, _, Hash), In, Store) :-
692 retractall(head(Store, File, _, _)),
693 assert_head(Store, File, Hash),
694 read(In, Term),
695 restore_heads(Term, In, Store).
696
697save_heads(Store, Out) :-
698 get_time(Now),
699 format(Out, 'epoch(~0f).~n~n', [Now]),
700 forall(head(Store, File, _, Hash),
701 format(Out, '~q.~n', [head(File, -, Hash)])).
702
703
709
710delete_head(Store, Head) :-
711 redis_db(Store, _, _),
712 !,
713 redis_delete_head(Store, Head).
714delete_head(Store, Head) :-
715 retractall(head(Store, Head, _, _)).
716
720
721set_head(Store, File, Hash) :-
722 redis_db(Store, _, _),
723 !,
724 redis_set_head(Store, File, Hash).
725set_head(Store, File, Hash) :-
726 file_name_extension(_, Ext, File),
727 ( head(Store, File, _, Hash0)
728 -> ( Hash == Hash0
729 -> true
730 ; asserta(head(Store, File, Ext, Hash)),
731 retractall(head(Store, File, _, Hash0))
732 )
733 ; asserta(head(Store, File, Ext, Hash))
734 ).
735
736
737 740
748
749pack_version(1).
750
763
764:- debug(gitty(pack)). 765
766repack_objects(Store, Options) :-
767 option(min_files(MinFiles), Options, 1_000),
768 findall(Object, gitty_file_object(Store, Object), Objects),
769 length(Objects, NewFiles),
770 debug(gitty(pack), 'Found ~D file objects', [NewFiles]),
771 ( NewFiles >= MinFiles
772 -> pack_files(Store, ExistingPacks),
773 option(small_pack(MaxSize), Options, 10_000_000),
774 include(small_file(MaxSize), ExistingPacks, PackFiles),
775 ( debugging(gitty(pack))
776 -> length(PackFiles, PackCount),
777 debug(gitty(pack), 'Found ~D small packs', [PackCount])
778 ; true
779 ),
780 directory_file_path(Store, pack, PackDir),
781 make_directory_path(PackDir),
782 pack_objects(Store, Objects, PackFiles, PackDir, _PackFile, Options)
783 ; debug(gitty(pack), 'Nothing to do', [])
784 ).
785
786small_file(MaxSize, File) :-
787 size_file(File, Size),
788 Size < MaxSize.
789
794
795pack_objects(Store, Objects, Packs, PackDir, PackFile, Options) :-
796 with_mutex(gitty_pack,
797 pack_objects_sync(Store, Objects, Packs, PackDir,
798 PackFile, Options)).
799
800pack_objects_sync(_Store, [], [Pack], _, [Pack], _) :-
801 !.
802pack_objects_sync(Store, Objects, Packs, PackDir, PackFilePath, Options) :-
803 length(Objects, ObjCount),
804 length(Packs, PackCount),
805 debug(gitty(pack), 'Repacking ~D objects and ~D packs',
806 [ObjCount, PackCount]),
807 maplist(object_info(Store), Objects, FileInfo),
808 maplist(pack_info(Store), Packs, PackInfo),
809 append([FileInfo|PackInfo], Info0),
810 sort(1, @<, Info0, Info), 811 ( debugging(gitty(pack))
812 -> ( PackCount > 0
813 -> length(Info, FinalObjCount),
814 debug(gitty(pack), 'Total ~D objects', [FinalObjCount])
815 ; true
816 )
817 ; true
818 ),
819 directory_file_path(PackDir, 'pack-create', TmpPack),
820 setup_call_cleanup(
821 ( open(TmpPack, write, Out0, [type(binary), lock(write)]),
822 open_hash_stream(Out0, Out, [algorithm(sha1)])
823 ),
824 ( write_signature(Out),
825 maplist(write_header(Out), Info),
826 format(Out, 'end_of_header.~n', []),
827 maplist(add_file(Out, Store), Info),
828 stream_hash(Out, SHA1)
829 ),
830 close(Out)),
831 format(atom(PackFile), 'pack-~w.pack', [SHA1]),
832 directory_file_path(PackDir, PackFile, PackFilePath),
833 rename_file(TmpPack, PackFilePath),
834 debug(gitty(pack), 'Attaching ~p', [PackFilePath]),
835 attach_pack(Store, PackFilePath),
836 remove_objects_after_pack(Store, Objects, Options),
837 delete(Packs, PackFilePath, RmPacks),
838 remove_repacked_packs(Store, RmPacks, Options),
839 debug(gitty(pack), 'Packing completed', []).
840
841object_info(Store, Object, obj(Object, Type, Size, FileSize)) :-
842 gitty_object_file(Store, Object, File),
843 load_object_header(Store, Object, Type, Size),
844 size_file(File, FileSize).
845
846pack_info(Store, PackFile, Objects) :-
847 attach_pack(Store, PackFile),
848 pack_read_header(PackFile, _Version, _DataOffset, Objects).
849
850write_signature(Out) :-
851 pack_version(Version),
852 format(Out, "gitty(~d).~n", [Version]).
853
(Out, obj(Object, Type, Size, FileSize)) :-
855 format(Out, 'obj(~q,~q,~d,~d).~n', [Object, Type, Size, FileSize]).
856
860
861add_file(Out, Store, obj(Object, _Type, _Size, _FileSize)) :-
862 gitty_object_file(Store, Object, File),
863 exists_file(File),
864 !,
865 setup_call_cleanup(
866 open(File, read, In, [type(binary)]),
867 copy_stream_data(In, Out),
868 close(In)).
869add_file(Out, Store, obj(Object, Type, Size, FileSize)) :-
870 pack_object(Object, Type, Size, Offset, Store, PackFile),
871 setup_call_cleanup(
872 open(PackFile, read, In, [type(binary)]),
873 ( seek(In, Offset, bof, Offset),
874 copy_stream_data(In, Out, FileSize)
875 ),
876 close(In)).
877
878
882
883gitty_fsck(Store) :-
884 pack_files(Store, PackFiles),
885 maplist(fsck_pack, PackFiles).
886
890
891fsck_pack(File) :-
892 debug(gitty(pack), 'fsck ~p', [File]),
893 check_pack_hash(File),
894 debug(gitty(pack), 'fsck ~p: checking objects', [File]),
895 check_pack_objects(File),
896 debug(gitty(pack), 'fsck ~p: done', [File]).
897
898check_pack_hash(File) :-
899 file_base_name(File, Base),
900 file_name_extension(Plain, Ext, Base),
901 must_be(oneof([pack]), Ext),
902 atom_concat('pack-', Hash, Plain),
903 setup_call_cleanup(
904 ( open(File, read, In0, [type(binary)]),
905 open_hash_stream(In0, In, [algorithm(sha1)])
906 ),
907 ( setup_call_cleanup(
908 open_null_stream(Null),
909 copy_stream_data(In, Null),
910 close(Null)),
911 stream_hash(In, SHA1)
912 ),
913 close(In)),
914 assertion(Hash == SHA1).
915
916check_pack_objects(PackFile) :-
917 setup_call_cleanup(
918 open(PackFile, read, In, [type(binary)]),
919 ( read_header(In, Version, DataOffset, Objects),
920 set_stream(In, encoding(utf8)),
921 foldl(check_object(In, PackFile, Version), Objects, DataOffset, _)
922 ),
923 close(In)).
924
925check_object(In, PackFile, _Version,
926 obj(Object, Type, Size, FileSize),
927 Offset0, Offset) :-
928 Offset is Offset0+FileSize,
929 byte_count(In, Here),
930 ( Here == Offset0
931 -> true
932 ; print_message(warning, pack(reposition(Here, Offset0))),
933 seek(In, Offset0, bof, Offset0)
934 ),
935 ( setup_call_cleanup(
936 zopen(In, In2, [multi_part(false), close_parent(false)]),
937 catch(read_object(In2, Data, _0RType, _0RSize), E,
938 ( print_message(error,
939 gitty(PackFile, fsck(read_object(Object, E)))),
940 fail)),
941 close(In2))
942 -> byte_count(In, End),
943 ( End == Offset
944 -> true
945 ; print_message(error,
946 gitty(PackFile, fsck(object_end(Object, End,
947 Offset0, Offset,
948 Data))))
949 ),
950 assertion(Type == _0RType),
951 assertion(Size == _0RSize),
952 gitty:check_object(Object, Data, Type, Size)
953 ; true
954 ).
955
956
960
961gitty_attach_packs(Store) :-
962 attached_packs(Store),
963 !.
964gitty_attach_packs(Store) :-
965 with_mutex(gitty_attach_packs,
966 gitty_attach_packs_sync(Store)).
967
968gitty_attach_packs_sync(Store) :-
969 attached_packs(Store),
970 !.
971gitty_attach_packs_sync(Store) :-
972 pack_files(Store, PackFiles),
973 maplist(attach_pack(Store), PackFiles),
974 asserta(attached_packs(Store)).
975
976pack_files(Store, Packs) :-
977 directory_file_path(Store, pack, PackDir),
978 exists_directory(PackDir),
979 !,
980 directory_files(PackDir, Files),
981 convlist(is_pack(PackDir), Files, Packs).
982pack_files(_, []).
983
984is_pack(PackDir, File, Path) :-
985 file_name_extension(_, pack, File),
986 directory_file_path(PackDir, File, Path).
987
991
992attach_pack(Store, PackFile) :-
993 attached_pack(Store, PackFile),
994 !.
995attach_pack(Store, PackFile) :-
996 retractall(pack_object(_,_,_,_,_,PackFile)),
997 pack_read_header(PackFile, Version, DataOffset, Objects),
998 foldl(assert_object(Store, PackFile, Version), Objects, DataOffset, _),
999 assertz(attached_pack(Store, PackFile)).
1000
(PackFile, Version, DataOffset, Objects) :-
1002 setup_call_cleanup(
1003 open(PackFile, read, In, [type(binary)]),
1004 read_header(In, Version, DataOffset, Objects),
1005 close(In)).
1006
(In, Version, DataOffset, Objects) :-
1008 read(In, Signature),
1009 ( Signature = gitty(Version)
1010 -> true
1011 ; domain_error(gitty_pack_file, Objects)
1012 ),
1013 read(In, Term),
1014 read_index(Term, In, Objects),
1015 get_code(In, Code),
1016 assertion(Code == 0'\n),
1017 byte_count(In, DataOffset).
1018
1019read_index(end_of_header, _, []) :-
1020 !.
1021read_index(Object, In, [Object|T]) :-
1022 read(In, Term2),
1023 read_index(Term2, In, T).
1024
1025assert_object(Store, Pack, _Version,
1026 obj(Object, Type, Size, FileSize),
1027 Offset0, Offset) :-
1028 Offset is Offset0+FileSize,
1029 assertz(pack_object(Object, Type, Size, Offset0, Store, Pack)).
1030
1034
1035detach_pack(Store, Pack) :-
1036 retractall(pack_object(_, _, _, _, Store, Pack)),
1037 retractall(attached_pack(Store, Pack)).
1038
1042
1043load_object_from_pack(Hash, Data, Type, Size) :-
1044 pack_object(Hash, Type, Size, Offset, _Store, Pack),
1045 setup_call_cleanup(
1046 open(Pack, read, In, [type(binary)]),
1047 read_object_at(In, Offset, Data, Type, Size),
1048 close(In)).
1049
1050read_object_at(In, Offset, Data, Type, Size) :-
1051 seek(In, Offset, bof, Offset),
1052 read_object_here(In, Data, Type, Size).
1053
1054read_object_here(In, Data, Type, Size) :-
1055 stream_property(In, encoding(Enc)),
1056 setup_call_cleanup(
1057 ( set_stream(In, encoding(utf8)),
1058 zopen(In, In2, [multi_part(false), close_parent(false)])
1059 ),
1060 read_object(In2, Data, Type, Size),
1061 ( close(In2),
1062 set_stream(In, encoding(Enc))
1063 )).
1064
1065
1069
1070unpack_packs(Store) :-
1071 absolute_file_name(Store, AbsStore, [file_type(directory),
1072 access(read)]),
1073 pack_files(AbsStore, Packs),
1074 maplist(unpack_pack(AbsStore), Packs).
1075
1079
1080unpack_pack(Store, PackFile) :-
1081 pack_read_header(PackFile, _Version, DataOffset, Objects),
1082 setup_call_cleanup(
1083 open(PackFile, read, In, [type(binary)]),
1084 foldl(create_file(Store, In), Objects, DataOffset, _),
1085 close(In)),
1086 detach_pack(Store, PackFile),
1087 delete_file(PackFile).
1088
1089create_file(Store, In, obj(Object, _Type, _Size, FileSize), Offset0, Offset) :-
1090 Offset is Offset0+FileSize,
1091 gitty_object_file(Store, Object, Path),
1092 with_mutex(gitty_file, exists_or_recreate(Path, Out)),
1093 ( var(Out)
1094 -> true
1095 ; setup_call_cleanup(
1096 seek(In, Offset0, bof, Offset0),
1097 copy_stream_data(In, Out, FileSize),
1098 close(Out))
1099 ).
1100
1101exists_or_recreate(Path, _Out) :-
1102 exists_file(Path),
1103 !.
1104exists_or_recreate(Path, Out) :-
1105 file_directory_name(Path, Dir),
1106 make_directory_path(Dir),
1107 open(Path, write, Out, [type(binary), lock(write)]).
1108
1109
1113
1114remove_objects_after_pack(Store, Objects, Options) :-
1115 debug(gitty(pack), 'Deleting plain files', []),
1116 maplist(delete_object(Store), Objects),
1117 ( option(prune_empty_directories(true), Options, true)
1118 -> debug(gitty(pack), 'Pruning empty directories', []),
1119 prune_empty_directories(Store)
1120 ; true
1121 ).
1122
1126
1127remove_repacked_packs(Store, Packs, Options) :-
1128 maplist(remove_pack(Store, Options), Packs).
1129
1130remove_pack(Store, _Options, Pack) :-
1131 detach_pack(Store, Pack),
1132 delete_file(Pack).
1133
1138
1139prune_empty_directories(Dir) :-
1140 prune_empty_directories(Dir, 0).
1141
1142prune_empty_directories(Dir, Level) :-
1143 directory_files(Dir, AllFiles),
1144 exclude(hidden, AllFiles, Files),
1145 ( Files == [],
1146 Level > 0
1147 -> delete_directory_async(Dir)
1148 ; convlist(prune_empty_directories(Dir, Level), Files, Left),
1149 ( Left == [],
1150 Level > 0
1151 -> delete_directory_async(Dir)
1152 ; true
1153 )
1154 ).
1155
1156hidden(.).
1157hidden(..).
1158
1159prune_empty_directories(Parent, Level0, File, _) :-
1160 directory_file_path(Parent, File, Path),
1161 exists_directory(Path),
1162 !,
1163 Level is Level0 + 1,
1164 prune_empty_directories(Path, Level),
1165 fail.
1166prune_empty_directories(_, _, File, File).
1167
1168delete_directory_async(Dir) :-
1169 with_mutex(gitty_file, delete_directory_async2(Dir)).
1170
1171delete_directory_async2(Dir) :-
1172 catch(delete_directory(Dir), E,
1173 ( \+ exists_directory(Dir)
1174 -> true
1175 ; \+ empty_directory(Dir)
1176 -> true
1177 ; throw(E)
1178 )).
1179
1180empty_directory(Dir) :-
1181 directory_files(Dir, AllFiles),
1182 exclude(hidden, AllFiles, []).
1183
1184
1185 1188
1189redis_head_db(Store, DB, Key) :-
1190 redis_db(Store, DB, Prefix),
1191 string_concat(Prefix, ":gitty:head", Key).
1192
1194
1195redis_file(Store, Name, Ext, Hash) :-
1196 nonvar(Name),
1197 !,
1198 file_name_extension(_Base, Ext, Name),
1199 redis_head_db(Store, DB, Heads),
1200 redis(DB, hget(Heads, Name), Hash as atom).
1201redis_file(Store, Name, Ext, Hash) :-
1202 nonvar(Ext),
1203 !,
1204 string_concat("*.", Ext, Pattern),
1205 redis_head_db(Store, DB, Heads),
1206 redis_hscan(DB, Heads, LazyList, [match(Pattern)]),
1207 member(NameS-HashS, LazyList),
1208 atom_string(Name, NameS),
1209 atom_string(Hash, HashS).
1210redis_file(Store, Name, Ext, Hash) :-
1211 nonvar(Hash),
1212 !,
1213 load_plain_commit(Store, Hash, Commit),
1214 Name = Commit.name,
1215 file_name_extension(_Base, Ext, Name).
1216redis_file(Store, Name, Ext, Hash) :-
1217 redis_head_db(Store, DB, Heads),
1218 redis(DB, hgetall(Heads), Pairs as pairs(atom,atom)),
1219 member(Name-Hash, Pairs),
1220 file_name_extension(_Base, Ext, Name).
1221
1226
1227redis_ensure_heads(Store) :-
1228 redis_head_db(Store, DB, Key),
1229 redis(DB, exists(Key), 1),
1230 !.
1231redis_ensure_heads(Store) :-
1232 redis_head_db(Store, DB, Key),
1233 debug(gitty(redis), 'Initializing gitty heads in ~p ...', [Key]),
1234 gitty_scan_latest(Store),
1235 forall(retract(latest(Name, Hash, _Time)),
1236 redis(DB, hset(Key, Name, Hash))),
1237 debug(gitty(redis), '... finished gitty heads', []).
1238
1240
1241redis_update_head(Store, Name, -, NewCommit, DataHash) :-
1242 !,
1243 redis_head_db(Store, DB, Key),
1244 redis(DB, hset(Key, Name, NewCommit)),
1245 publish_objects(Store, [NewCommit, DataHash]).
1246redis_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
1247 redis_head_db(Store, DB, Key),
1248 redis_hcas(DB, Key, Name, OldCommit, NewCommit),
1249 publish_objects(Store, [NewCommit, DataHash]).
1250
1254
1255redis_delete_head(Store, Head) :-
1256 redis_head_db(Store, DB, Key),
1257 redis(DB, hdel(Key, Head)).
1258
1260
1261redis_set_head(Store, File, Hash) :-
1262 redis_head_db(Store, DB, Key),
1263 redis(DB, hset(Key, File, Hash)).
1264
1265 1268
1292
1293:- multifile
1294 swish_redis:stream/2. 1295
1296swish_redis:stream('gitty:replicate', [maxlen(100)]).
1297
1298:- listen(http(pre_server_start(_)),
1299 init_replicator). 1300
1301init_replicator :-
1302 redis_swish_stream('gitty:replicate', ReplKey),
1303 listen(redis(_Redis, ReplKey, _Id, Data),
1304 replicate(Data)),
1305 listen(redis(_, 'swish:gitty', Message),
1306 gitty_message(Message)),
1307 message_queue_create(_, [alias(gitty_queue)]).
1308
1309:- debug(gitty(replicate)). 1310
1311gitty_message(discover(Hash)) :-
1312 debug(gitty(replicate), 'Discover: ~p', [Hash]),
1313 store(Store, _),
1314 load_object_raw(Store, Hash, Data),
1315 debug(gitty(replicate), 'Sending object ~p', [Hash]),
1316 redis(swish, publish(swish:gitty, object(Hash, Data) as prolog)).
1317gitty_message(object(Hash, Data)) :-
1318 debug(gitty(replicate), 'Replicate: ~p', [Hash]),
1319 redis_db(Store, _DB, _Prefix),
1320 store_object_raw(Store, Hash, Data, New),
1321 debug(gitty(replicate), 'Received object ~p (new=~p)', [Hash, New]),
1322 ( New == true
1323 -> thread_send_message(gitty_queue, Hash)
1324 ; true
1325 ).
1326
1327redis_replicate_get(_Store, Hash) :-
1328 is_gitty_hash(Hash),
1329 redis(swish, publish(swish:gitty, discover(Hash) as prolog), Count),
1330 Count > 1, 1331 thread_get_message(gitty_queue, Hash,
1332 [ timeout(10)
1333 ]).
1334
1335
1348
1349publish_objects(Store, Hashes) :-
1350 redis_swish_stream('gitty:replicate', ReplKey),
1351 maplist(publish_object(Store, ReplKey), Hashes).
1352
1353publish_object(Store, Stream, Hash) :-
1354 load_object_raw(Store, Hash, Data),
1355 debug(gitty(replicate), 'Sending ~p to ~p', [Hash, Stream]),
1356 xadd(swish, Stream, _, _{hash:Hash, data:Data}).
1357
1363
1364replicate(Data) :-
1365 redis_db(Store, _DB, _Prefix),
1366 atom_string(Hash, Data.hash),
1367 store_object_raw(Store, Hash, Data.data, _0New),
1368 debug(gitty(replicate), 'Received object ~p (new=~p)',
1369 [Hash, _0New]).
1370
1371
1372 1375
1379
1380redis_hcas(DB, Hash, Key, Old, New) :-
1381 redis(DB, eval("if redis.call('HGET', KEYS[1], ARGV[1]) == ARGV[2] then \c
1382 redis.call('HSET', KEYS[1], ARGV[1], ARGV[3]); \c
1383 return 1; \c
1384 end; \c
1385 return 0\c
1386 ",
1387 1, Hash, Key, Old, New),
1388 1)