19
19
-define (MONITOR_CHECK , 10000 ).
20
20
% 4 KiB
21
21
-define (SIZE_BLOCK , 16#1000 ).
22
- -define (IS_OLD_STATE (S ), is_pid (S # file .db_monitor )).
23
22
-define (PREFIX_SIZE , 5 ).
24
23
-define (DEFAULT_READ_COUNT , 1024 ).
25
24
-define (WRITE_XXHASH_CHECKSUMS_KEY , {? MODULE , write_xxhash_checksums }).
39
38
40
39
% public API
41
40
-export ([open /1 , open /2 , close /1 , bytes /1 , sync /1 , truncate /2 , set_db_pid /2 ]).
42
- -export ([pread_term /2 , pread_iolist / 2 , pread_binary /2 ]).
41
+ -export ([pread_term /2 , pread_binary /2 ]).
43
42
-export ([append_binary /2 ]).
44
43
-export ([append_raw_chunk /2 , assemble_file_chunk_and_checksum /1 ]).
45
44
-export ([append_term /2 , append_term /3 ]).
@@ -137,10 +136,20 @@ append_term(Fd, Term, Options) ->
137
136
% %----------------------------------------------------------------------
138
137
139
138
append_binary (Fd , Bin ) ->
140
- ioq :call (Fd , {append_bin , assemble_file_chunk (Bin )}, erlang :get (io_priority )).
139
+ case append_binaries (Fd , [Bin ]) of
140
+ {ok , [{Pos , NumBytesWritten }]} ->
141
+ {ok , Pos , NumBytesWritten };
142
+ Error ->
143
+ Error
144
+ end .
141
145
142
146
append_raw_chunk (Fd , Chunk ) ->
143
- ioq :call (Fd , {append_bin , Chunk }, erlang :get (io_priority )).
147
+ case append_raw_chunks (Fd , [Chunk ]) of
148
+ {ok , [{Pos , NumBytesWritten }]} ->
149
+ {ok , Pos , NumBytesWritten };
150
+ Error ->
151
+ Error
152
+ end .
144
153
145
154
assemble_file_chunk (Bin ) ->
146
155
[<<0 :1 /integer , (iolist_size (Bin )):31 /integer >>, Bin ].
@@ -152,60 +161,40 @@ assemble_file_chunk_and_checksum(Bin) ->
152
161
% %----------------------------------------------------------------------
153
162
% % Purpose: Reads a term from a file that was written with append_term
154
163
% % Args: Pos, the offset into the file where the term is serialized.
155
- % % Returns: {ok, Term}
156
- % % or {error, Reason}.
164
+ % % Returns: {ok, Term}, {error, Error} or throws an error
157
165
% %----------------------------------------------------------------------
158
166
159
167
pread_term (Fd , Pos ) ->
160
- {ok , Bin } = pread_binary (Fd , Pos ),
161
- {ok , couch_compress :decompress (Bin )}.
168
+ case pread_binary (Fd , Pos ) of
169
+ {ok , Bin } -> {ok , couch_compress :decompress (Bin )};
170
+ Error -> Error
171
+ end .
162
172
163
173
% %----------------------------------------------------------------------
164
- % % Purpose: Reads a binrary from a file that was written with append_binary
174
+ % % Purpose: Reads a binary from a file that was written with append_binary
165
175
% % Args: Pos, the offset into the file where the term is serialized.
166
- % % Returns: {ok, Term}
167
- % % or {error, Reason}.
176
+ % % Returns: {ok, Binary}, {error, Error} or throws an error
168
177
% %----------------------------------------------------------------------
169
178
170
179
pread_binary (Fd , Pos ) ->
171
- {ok , L } = pread_iolist (Fd , Pos ),
172
- {ok , iolist_to_binary (L )}.
173
-
174
- pread_iolist (Fd , Pos ) ->
175
- case ioq :call (Fd , {pread_iolist , Pos }, erlang :get (io_priority )) of
176
- {ok , IoList , Checksum } ->
177
- {ok , verify_checksum (Fd , Pos , IoList , Checksum , false )};
178
- Error ->
179
- Error
180
+ case pread_binaries (Fd , [Pos ]) of
181
+ {ok , [Bin ]} -> {ok , Bin };
182
+ Error -> Error
180
183
end .
181
184
182
185
pread_terms (Fd , PosList ) ->
183
- {ok , Bins } = pread_binaries (Fd , PosList ),
184
- Terms = lists :map (
185
- fun (Bin ) ->
186
- couch_compress :decompress (Bin )
187
- end ,
188
- Bins
189
- ),
190
- {ok , Terms }.
186
+ case pread_binaries (Fd , PosList ) of
187
+ {ok , Bins } -> {ok , lists :map (fun couch_compress :decompress /1 , Bins )};
188
+ Error -> Error
189
+ end .
191
190
192
191
pread_binaries (Fd , PosList ) ->
193
- {ok , Data } = pread_iolists (Fd , PosList ),
194
- {ok , lists :map (fun erlang :iolist_to_binary /1 , Data )}.
195
-
196
- pread_iolists (Fd , PosList ) ->
192
+ ZipFun = fun (Pos , {IoList , Checksum }) ->
193
+ verify_checksum (Fd , Pos , iolist_to_binary (IoList ), Checksum , false )
194
+ end ,
197
195
case ioq :call (Fd , {pread_iolists , PosList }, erlang :get (io_priority )) of
198
- {ok , DataAndChecksums } ->
199
- Data = lists :zipwith (
200
- fun (Pos , {IoList , Checksum }) ->
201
- verify_checksum (Fd , Pos , IoList , Checksum , false )
202
- end ,
203
- PosList ,
204
- DataAndChecksums
205
- ),
206
- {ok , Data };
207
- Error ->
208
- Error
196
+ {ok , DataAndChecksums } -> {ok , lists :zipwith (ZipFun , PosList , DataAndChecksums )};
197
+ Error -> Error
209
198
end .
210
199
211
200
append_terms (Fd , Terms ) ->
@@ -221,7 +210,10 @@ append_terms(Fd, Terms, Options) ->
221
210
),
222
211
append_binaries (Fd , Bins ).
223
212
224
- append_binaries (Fd , Bins ) ->
213
+ append_raw_chunks (Fd , RawChunks ) when is_list (RawChunks ) ->
214
+ ioq :call (Fd , {append_bins , RawChunks }, erlang :get (io_priority )).
215
+
216
+ append_binaries (Fd , Bins ) when is_list (Bins ) ->
225
217
WriteBins = lists :map (fun assemble_file_chunk /1 , Bins ),
226
218
ioq :call (Fd , {append_bins , WriteBins }, erlang :get (io_priority )).
227
219
@@ -500,53 +492,28 @@ terminate(_Reason, #file{fd = nil}) ->
500
492
terminate (_Reason , # file {fd = Fd }) ->
501
493
ok = file :close (Fd ).
502
494
503
- handle_call (Msg , From , File ) when ? IS_OLD_STATE (File ) ->
504
- handle_call (Msg , From , upgrade_state (File ));
505
495
handle_call (close , _From , # file {fd = Fd } = File ) ->
506
496
{stop , normal , file :close (Fd ), File # file {fd = nil }};
507
- handle_call ({pread_iolist , Pos }, _From , File ) ->
508
- update_read_timestamp (),
509
- {LenIolist , NextPos } = read_raw_iolist_int (File , Pos , 4 ),
510
- case iolist_to_binary (LenIolist ) of
511
- % an checksum-prefixed term
512
- <<1 :1 /integer , Len :31 /integer >> ->
513
- {ChecksumAndIoList , _ } = read_raw_iolist_int (File , NextPos , Len + 16 ),
514
- {Checksum , IoList } = extract_checksum (ChecksumAndIoList ),
515
- {reply , {ok , IoList , Checksum }, File };
516
- <<0 :1 /integer , Len :31 /integer >> ->
517
- {Iolist , _ } = read_raw_iolist_int (File , NextPos , Len ),
518
- {reply , {ok , Iolist , <<>>}, File }
519
- end ;
520
497
handle_call ({pread_iolists , PosL }, _From , File ) ->
521
498
update_read_timestamp (),
522
499
LocNums1 = [{Pos , 4 } || Pos <- PosL ],
523
500
DataSizes = read_multi_raw_iolists_int (File , LocNums1 ),
524
- LocNums2 = lists :map (
525
- fun ({LenIoList , NextPos }) ->
526
- case iolist_to_binary (LenIoList ) of
527
- % a checksum-prefixed term
528
- <<1 :1 /integer , Len :31 /integer >> ->
529
- {NextPos , Len + 16 };
530
- <<0 :1 /integer , Len :31 /integer >> ->
531
- {NextPos , Len }
532
- end
533
- end ,
534
- DataSizes
535
- ),
501
+ MapFun = fun ({LenIoList , NextPos }) ->
502
+ case iolist_to_binary (LenIoList ) of
503
+ % a checksum-prefixed term
504
+ <<1 :1 /integer , Len :31 /integer >> -> {NextPos , Len + 16 };
505
+ <<0 :1 /integer , Len :31 /integer >> -> {NextPos , Len }
506
+ end
507
+ end ,
508
+ LocNums2 = lists :map (MapFun , DataSizes ),
536
509
Resps = read_multi_raw_iolists_int (File , LocNums2 ),
537
- Extracted = lists :zipwith (
538
- fun ({LenIoList , _ }, {IoList , _ }) ->
539
- case iolist_to_binary (LenIoList ) of
540
- <<1 :1 /integer , _ :31 /integer >> ->
541
- {Checksum , IoList } = extract_checksum (IoList ),
542
- {IoList , Checksum };
543
- <<0 :1 /integer , _ :31 /integer >> ->
544
- {IoList , <<>>}
545
- end
546
- end ,
547
- DataSizes ,
548
- Resps
549
- ),
510
+ ZipFun = fun ({LenIoList , _ }, {FullIoList , _ }) ->
511
+ case iolist_to_binary (LenIoList ) of
512
+ <<1 :1 /integer , _ :31 /integer >> -> extract_checksum (FullIoList );
513
+ <<0 :1 /integer , _ :31 /integer >> -> {FullIoList , <<>>}
514
+ end
515
+ end ,
516
+ Extracted = lists :zipwith (ZipFun , DataSizes , Resps ),
550
517
{reply , {ok , Extracted }, File };
551
518
handle_call (bytes , _From , # file {fd = Fd } = File ) ->
552
519
{reply , file :position (Fd , eof ), File };
@@ -576,15 +543,6 @@ handle_call({truncate, Pos}, _From, #file{fd = Fd} = File) ->
576
543
Error ->
577
544
{reply , Error , File }
578
545
end ;
579
- handle_call ({append_bin , Bin }, _From , # file {fd = Fd , eof = Pos } = File ) ->
580
- Blocks = make_blocks (Pos rem ? SIZE_BLOCK , Bin ),
581
- Size = iolist_size (Blocks ),
582
- case file :write (Fd , Blocks ) of
583
- ok ->
584
- {reply , {ok , Pos , Size }, File # file {eof = Pos + Size }};
585
- Error ->
586
- {reply , Error , reset_eof (File )}
587
- end ;
588
546
handle_call ({append_bins , Bins }, _From , # file {fd = Fd , eof = Pos } = File ) ->
589
547
{BlockResps , FinalPos } = lists :mapfoldl (
590
548
fun (Bin , PosAcc ) ->
@@ -620,11 +578,9 @@ handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
620
578
handle_call (find_header , _From , # file {fd = Fd , eof = Pos } = File ) ->
621
579
{reply , find_header (Fd , Pos div ? SIZE_BLOCK ), File }.
622
580
623
- handle_cast (close , Fd ) ->
624
- {stop , normal , Fd }.
581
+ handle_cast (Msg , # file {} = File ) ->
582
+ {stop , { invalid_cast , Msg }, File }.
625
583
626
- handle_info (Msg , File ) when ? IS_OLD_STATE (File ) ->
627
- handle_info (Msg , upgrade_state (File ));
628
584
handle_info (maybe_close , File ) ->
629
585
case is_idle (File ) of
630
586
true ->
@@ -736,65 +692,39 @@ find_newest_header(Fd, [{Location, Size} | LocationSizes]) ->
736
692
find_newest_header (Fd , LocationSizes )
737
693
end .
738
694
739
- - spec read_raw_iolist_int (# file {}, Pos :: non_neg_integer (), Len :: non_neg_integer ()) ->
740
- {Data :: iolist (), CurPos :: non_neg_integer ()}.
741
- % 0110 UPGRADE CODE
742
- read_raw_iolist_int (Fd , {Pos , _Size }, Len ) ->
743
- read_raw_iolist_int (Fd , Pos , Len );
744
- read_raw_iolist_int (# file {fd = Fd } = File , Pos , Len ) ->
745
- {Pos , TotalBytes } = get_pread_locnum (File , Pos , Len ),
746
- case catch file :pread (Fd , Pos , TotalBytes ) of
747
- {ok , <<RawBin :TotalBytes /binary >>} ->
748
- {remove_block_prefixes (Pos rem ? SIZE_BLOCK , RawBin ), Pos + TotalBytes };
749
- Else ->
750
- % This clause matches when the file we are working with got truncated
751
- % outside of CouchDB after we opened it. To find affected files, we
752
- % need to log the file path.
753
- %
754
- % Technically, this should also go into read_multi_raw_iolists_int/2,
755
- % but that doesn’t seem to be in use anywhere.
756
- {_Fd , Filepath } = get (couch_file_fd ),
757
- throw ({file_truncate_error , Else , Filepath })
758
- end .
759
-
760
- % TODO: check if this is really unused
761
695
read_multi_raw_iolists_int (# file {fd = Fd } = File , PosLens ) ->
762
- LocNums = lists :map (
763
- fun ({Pos , Len }) ->
764
- get_pread_locnum (File , Pos , Len )
765
- end ,
766
- PosLens
767
- ),
696
+ MapFun = fun ({Pos , Len }) -> get_pread_locnum (File , Pos , Len ) end ,
697
+ LocNums = lists :map (MapFun , PosLens ),
768
698
{ok , Bins } = file :pread (Fd , LocNums ),
769
- lists :zipwith (
770
- fun ({Pos , TotalBytes }, Bin ) ->
771
- <<RawBin :TotalBytes /binary >> = Bin ,
772
- {remove_block_prefixes (Pos rem ? SIZE_BLOCK , RawBin ), Pos + TotalBytes }
773
- end ,
774
- LocNums ,
775
- Bins
776
- ).
699
+ ZipFun = fun ({Pos , TotalBytes }, Bin ) ->
700
+ <<RawBin :TotalBytes /binary >> = Bin ,
701
+ {remove_block_prefixes (Pos rem ? SIZE_BLOCK , RawBin ), Pos + TotalBytes }
702
+ end ,
703
+ lists :zipwith (ZipFun , LocNums , Bins ).
777
704
778
- get_pread_locnum (File , Pos , Len ) ->
705
+ get_pread_locnum (# file {} = File , Pos , Len ) ->
706
+ # file {eof = Eof , pread_limit = PreadLimit } = File ,
779
707
BlockOffset = Pos rem ? SIZE_BLOCK ,
780
708
TotalBytes = calculate_total_read_len (BlockOffset , Len ),
781
709
case Pos + TotalBytes of
782
- Size when Size > File # file . eof ->
710
+ Size when Size > Eof ->
783
711
couch_stats :increment_counter ([pread , exceed_eof ]),
784
712
{_Fd , Filepath } = get (couch_file_fd ),
785
- throw ({read_beyond_eof , Filepath });
786
- Size when Size > File # file .pread_limit ->
713
+ ErrEof = {error , {read_beyond_eof , Filepath , Pos , Eof }},
714
+ throw ({stop , ErrEof , ErrEof , File });
715
+ Size when Size > PreadLimit ->
787
716
couch_stats :increment_counter ([pread , exceed_limit ]),
788
717
{_Fd , Filepath } = get (couch_file_fd ),
789
- throw ({exceed_pread_limit , Filepath , File # file .pread_limit });
718
+ ErrPread = {error , {exceed_pread_limit , Filepath , PreadLimit }},
719
+ throw ({stop , ErrPread , ErrPread , File });
790
720
_ ->
791
721
{Pos , TotalBytes }
792
722
end .
793
723
794
- - spec extract_checksum (iolist ()) -> {binary (), iolist ()}.
724
+ - spec extract_checksum (iolist ()) -> {iolist (), binary ()}.
795
725
extract_checksum (FullIoList ) ->
796
726
{ChecksumList , IoList } = split_iolist (FullIoList , 16 , []),
797
- {iolist_to_binary (ChecksumList ), IoList }.
727
+ {IoList , iolist_to_binary (ChecksumList )}.
798
728
799
729
calculate_total_read_len (0 , FinalLen ) ->
800
730
calculate_total_read_len (1 , FinalLen ) + 1 ;
@@ -864,36 +794,36 @@ monitored_by_pids() ->
864
794
{monitored_by , PidsAndRefs } = process_info (self (), monitored_by ),
865
795
lists :filter (fun is_pid /1 , PidsAndRefs ).
866
796
867
- verify_checksum (_Fd , _Pos , IoList , <<>>, _IsHeader ) ->
868
- IoList ;
869
- verify_checksum (Fd , Pos , IoList , Checksum , IsHeader ) ->
797
+ verify_checksum (_Fd , _Pos , << Bin / binary >> , <<>>, _IsHeader ) ->
798
+ Bin ;
799
+ verify_checksum (Fd , Pos , << Bin / binary >> , Checksum , IsHeader ) ->
870
800
% If writing xxhash checksums is enabled, check those first, then check
871
801
% legacy ones. If any legacy ones are found, bump the legacy metric. If
872
802
% generating xxhash checksums is disabled, assume most checksums would be
873
803
% legacy, so check that first, and then, in a likely case of release
874
804
% downgrade, check xxhash ones.
875
805
case generate_xxhash_checksums () of
876
806
true ->
877
- case exxhash :xxhash128 (iolist_to_binary ( IoList ) ) of
807
+ case exxhash :xxhash128 (Bin ) of
878
808
Checksum ->
879
- IoList ;
809
+ Bin ;
880
810
<<_ /binary >> ->
881
- case couch_hash :md5_hash (IoList ) of
811
+ case couch_hash :md5_hash (Bin ) of
882
812
Checksum ->
883
813
legacy_checksums_stats_update (),
884
- IoList ;
814
+ Bin ;
885
815
_ ->
886
816
report_checksum_error (Fd , Pos , IsHeader )
887
817
end
888
818
end ;
889
819
false ->
890
- case couch_hash :md5_hash (IoList ) of
820
+ case couch_hash :md5_hash (Bin ) of
891
821
Checksum ->
892
- IoList ;
822
+ Bin ;
893
823
_ ->
894
- case exxhash :xxhash128 (iolist_to_binary ( IoList ) ) of
824
+ case exxhash :xxhash128 (Bin ) of
895
825
Checksum ->
896
- IoList ;
826
+ Bin ;
897
827
<<_ /binary >> ->
898
828
report_checksum_error (Fd , Pos , IsHeader )
899
829
end
@@ -932,13 +862,6 @@ process_info(Pid) ->
932
862
update_read_timestamp () ->
933
863
put (read_timestamp , os :timestamp ()).
934
864
935
- upgrade_state (# file {db_monitor = DbPid } = File ) when is_pid (DbPid ) ->
936
- unlink (DbPid ),
937
- Ref = monitor (process , DbPid ),
938
- File # file {db_monitor = Ref };
939
- upgrade_state (State ) ->
940
- State .
941
-
942
865
get_pread_limit () ->
943
866
case config :get_integer (" couchdb" , " max_pread_size" , 0 ) of
944
867
N when N > 0 -> N ;
0 commit comments