@@ -32,8 +32,8 @@ defmodule Mix.Sync.Lock do
32
32
# successfully link to lock_N. At that point we can clean up all
33
33
# the files, so we perform these steps:
34
34
#
35
- # * move our port_P to lock_0
36
- # * remove all the other port_P files
35
+ # * replace lock_0 content with port P
36
+ # * remove all port_P files
37
37
# * remove all lock_1+ files
38
38
#
39
39
# It is important to perform these steps in this order, to avoid
@@ -44,14 +44,14 @@ defmodule Mix.Sync.Lock do
44
44
# the port_P file will no longer exist (once lock_N is removed).
45
45
#
46
46
# Finally, note that we do not remove the lock file in `unlock/1`.
47
- # If we did that, another process could try to connect and fail
48
- # because the file would not exist, in such case the process would
49
- # assume the file is stale and needs to be replaced, therefore
50
- # possibly replacing another process who successfully links at the
51
- # empty spot. This means we effectively always leave a stale file,
52
- # however, in order to shortcut the port check for future processes,
53
- # we atomically replace the file content with port 0, to indicate
54
- # the file is stale.
47
+ # If we did that, another process could read it before removal,
48
+ # then it may connect and fail (once the socket is closed), in such
49
+ # case the process would assume the file is stale and needs to be
50
+ # replaced, therefore possibly replacing another process who
51
+ # successfully links at the empty spot. This means we effectively
52
+ # always leave a stale file, however, in order to shortcut the port
53
+ # check for future processes, we atomically replace the file content
54
+ # with port 0, to indicate the file is stale.
55
55
#
56
56
# The main caveat of using ephemeral TCP ports is that they are not
57
57
# unique. This creates a theoretical scenario where the lock holder
@@ -72,6 +72,7 @@ defmodule Mix.Sync.Lock do
72
72
@ probe_data "mixlock"
73
73
@ probe_data_size byte_size ( @ probe_data )
74
74
@ probe_timeout_ms 5_000
75
+ @ version 2
75
76
76
77
@ typedoc """
77
78
Options for `with_lock/3`.
@@ -132,7 +133,11 @@ defmodule Mix.Sync.Lock do
132
133
defp base_path do
133
134
# We include user in the dir to avoid permission conflicts across users
134
135
user = System . get_env ( "USER" , "default" )
135
- Path . join ( System . tmp_dir! ( ) , "mix_lock_#{ Base . url_encode64 ( user , padding: false ) } " )
136
+
137
+ Path . join (
138
+ System . tmp_dir! ( ) ,
139
+ "mix_lock_#{ @ version } _#{ Base . url_encode64 ( user , padding: false ) } "
140
+ )
136
141
end
137
142
138
143
defp lock_disabled? ( ) , do: System . get_env ( "MIX_OS_CONCURRENCY_LOCK" ) in ~w( 0 false)
@@ -177,7 +182,7 @@ defmodule Mix.Sync.Lock do
177
182
port_path = Path . join ( path , "port_#{ port } " )
178
183
os_pid = System . pid ( )
179
184
180
- File . write !( port_path , << port :: unsigned - integer - 32 , os_pid :: binary >> , [ :raw ] )
185
+ switch_file_create !( port_path , encode_lock_info ( port , os_pid ) )
181
186
182
187
case grab_lock ( path , port_path , 0 ) do
183
188
{ :ok , 0 } ->
@@ -186,7 +191,7 @@ defmodule Mix.Sync.Lock do
186
191
187
192
{ :ok , _n } ->
188
193
# We grabbed lock_1+, so we need to replace lock_0 and clean up
189
- take_over ( path , port_path )
194
+ take_over ( path , port , os_pid )
190
195
% { socket: socket , path: path }
191
196
192
197
{ :taken , probe_socket , os_pid } ->
@@ -253,10 +258,15 @@ defmodule Mix.Sync.Lock do
253
258
end
254
259
255
260
defp fetch_probe_port ( port_path ) do
256
- case File . read ( port_path ) do
257
- { :ok , << 0 :: unsigned - integer - 32 >> } -> { :error , :ignore }
258
- { :ok , << port :: unsigned - integer - 32 , os_pid :: binary >> } -> { :ok , port , os_pid }
259
- { :error , reason } -> { :error , reason }
261
+ case switch_file_read ( port_path ) do
262
+ { :ok , data } ->
263
+ case decode_lock_info ( data ) do
264
+ { 0 , _os_pid } -> { :error , :ignore }
265
+ { port , os_pid } -> { :ok , port , os_pid }
266
+ end
267
+
268
+ { :error , reason } ->
269
+ { :error , reason }
260
270
end
261
271
end
262
272
@@ -305,15 +315,14 @@ defmodule Mix.Sync.Lock do
305
315
end
306
316
end
307
317
308
- defp take_over ( path , port_path ) do
318
+ defp take_over ( path , port , os_pid ) do
309
319
# The operations here must happen in precise order, so if anything
310
320
# fails, we keep the files as is and the next process that grabs
311
321
# the lock will do the cleanup
312
322
313
323
lock_path = Path . join ( path , "lock_0" )
314
324
315
- # We linked to lock_N successfully, so port_path should exist
316
- File . rename! ( port_path , lock_path )
325
+ switch_file_replace! ( lock_path , encode_lock_info ( port , os_pid ) )
317
326
318
327
names = File . ls! ( path )
319
328
@@ -342,14 +351,133 @@ defmodule Mix.Sync.Lock do
342
351
end
343
352
344
353
defp unlock ( lock ) do
345
- port_path = Path . join ( lock . path , "port_0" )
346
354
lock_path = Path . join ( lock . path , "lock_0" )
347
355
348
- File . write! ( port_path , << 0 :: unsigned - integer - 32 >> , [ :raw ] )
349
- File . rename! ( port_path , lock_path )
356
+ switch_file_replace! ( lock_path , encode_lock_info ( 0 , "" ) )
350
357
after
351
358
# Closing the socket will cause the accepting process to finish
352
359
# and all accepted sockets (tied to that process) will get closed
353
360
:gen_tcp . close ( lock . socket )
354
361
end
362
+
363
+ defp encode_lock_info ( port , os_pid ) do
364
+ os_pid_size = byte_size ( os_pid )
365
+
366
+ if os_pid_size > 32 do
367
+ Mix . raise ( "unexpectedly long PID: #{ inspect ( os_pid ) } " )
368
+ end
369
+
370
+ # The info needs to have fixed size, so we pad os_pid to maximum
371
+ # of 32 bytes (we expect it to be a few bytes).
372
+ padding_size = 32 - os_pid_size
373
+ padding = :binary . copy ( << 0 >> , padding_size )
374
+
375
+ <<
376
+ port :: unsigned - integer - 32 ,
377
+ padding_size :: unsigned - integer - 8 ,
378
+ padding :: binary ,
379
+ os_pid :: binary
380
+ >>
381
+ end
382
+
383
+ defp decode_lock_info ( data ) do
384
+ <<
385
+ port :: unsigned - integer - 32 ,
386
+ padding_size :: unsigned - integer - 8 ,
387
+ _padding :: binary - size ( padding_size ) ,
388
+ os_pid :: binary
389
+ >> = data
390
+
391
+ { port , os_pid }
392
+ end
393
+
394
+ # We need a mechanism to atomically replace file content. Typically,
395
+ # we could use File.rename/2 to do that, however File.rename/2 is
396
+ # not atomic on Windows, if the destination exists [1].
397
+ #
398
+ # As an alternative approach we use a switch-file. The file content
399
+ # consists of 1 switch byte (either 0 or 1) and two content segments
400
+ # with fixed, equal lengths. The switch byte indicates which segment
401
+ # is currently active. To replace the file content, we write to the
402
+ # non-active segment and call :file.sync/1 to ensure the segment is
403
+ # persisted, then we toggle the switch byte. While we cannot write
404
+ # multiple bytes atomically (since they may reside in multiple disk
405
+ # sectors), if we toggle only a single byte, there is no intermediate
406
+ # invalid state, which gives us the atomic replace we need.
407
+ #
408
+ # Note that file content can be replaced only by a single process
409
+ # at a time.
410
+ #
411
+ # [1]: https://github.com/elixir-lang/elixir/pull/14793#issuecomment-3338665065
412
+ defp switch_file_create! ( path , content ) do
413
+ data = << 0 , content :: binary , content :: binary >>
414
+ File . write! ( path , data , [ :raw ] )
415
+ end
416
+
417
+ defp switch_file_replace! ( path , new_content ) do
418
+ file = File . open! ( path , [ :read , :write , :binary , :raw ] )
419
+
420
+ content_size = byte_size ( new_content )
421
+
422
+ << switch_byte >> = read_bytes! ( file , 1 )
423
+
424
+ try do
425
+ inactive_content_position =
426
+ case switch_byte do
427
+ 0 -> 1 + content_size
428
+ 1 -> 1
429
+ end
430
+
431
+ # Write new data
432
+ file_pwrite_sync! ( file , inactive_content_position , new_content )
433
+
434
+ # Toggle switch byte - it's a single byte so the content changes
435
+ # atomically
436
+ file_pwrite_sync! ( file , 0 , << 1 - switch_byte >> )
437
+ after
438
+ File . close ( file )
439
+ end
440
+ end
441
+
442
+ defp switch_file_read ( path ) do
443
+ with { :ok , data } <- File . read ( path ) do
444
+ << switch_byte , rest :: binary >> = data
445
+ content_size = rest |> byte_size ( ) |> div ( 2 )
446
+ << content1 :: binary - size ( ^ content_size ) , content2 :: binary - size ( ^ content_size ) >> = rest
447
+
448
+ case switch_byte do
449
+ 0 -> { :ok , content1 }
450
+ 1 -> { :ok , content2 }
451
+ end
452
+ end
453
+ end
454
+
455
+ defp read_bytes! ( file , bytes ) do
456
+ case :file . read ( file , bytes ) do
457
+ { :ok , data } ->
458
+ data
459
+
460
+ :eof ->
461
+ raise "unexpected EOF of file when reading file"
462
+
463
+ { :error , reason } ->
464
+ raise File.Error , reason: reason , action: "read file"
465
+ end
466
+ end
467
+
468
+ defp file_pwrite_sync! ( file , position , bytes ) do
469
+ case :file . pwrite ( file , position , bytes ) do
470
+ :ok ->
471
+ case :file . sync ( file ) do
472
+ :ok ->
473
+ :ok
474
+
475
+ { :error , reason } ->
476
+ raise File.Error , reason: reason , action: "sync file"
477
+ end
478
+
479
+ { :error , { _n , reason } } ->
480
+ raise File.Error , reason: reason , action: "write to file at position"
481
+ end
482
+ end
355
483
end
0 commit comments