diff --git a/Project.toml b/Project.toml index a5a3112e6..e10d64ac1 100644 --- a/Project.toml +++ b/Project.toml @@ -5,15 +5,20 @@ version = "1.13.1" [deps] ArnoldiMethod = "ec485272-7323-5ecc-a04f-4719b315124d" DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" -Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" Inflate = "d25df0c9-e2be-5dd7-82c8-3ad0b3e990b9" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" -SharedArrays = "1a1011a3-84de-559e-8e89-a11a2f7dc383" SimpleTraits = "699a6c99-e7fa-54fc-8d76-47d257e15c1d" SparseArrays = "2f01184e-e22b-5df5-ae63-d93ebab69eaf" Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2" +[weakdeps] +Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" +SharedArrays = "1a1011a3-84de-559e-8e89-a11a2f7dc383" + +[extensions] +GraphsSharedArraysExt = "SharedArrays" + [compat] ArnoldiMethod = "0.4" Distributed = "1" @@ -25,4 +30,4 @@ SharedArrays = "1" SimpleTraits = "0.9.1" SparseArrays = "1" Statistics = "1" -julia = "1.10" \ No newline at end of file +julia = "1.10" diff --git a/ext/GraphsSharedArraysExt.jl b/ext/GraphsSharedArraysExt.jl new file mode 100644 index 000000000..6526cace0 --- /dev/null +++ b/ext/GraphsSharedArraysExt.jl @@ -0,0 +1,170 @@ +module GraphsSharedArraysExt + +using Graphs +using SharedArrays: SharedArrays, SharedMatrix, SharedVector, sdata +using SharedArrays.Distributed: @distributed +using Random: shuffle + +# betweenness +function Graphs.Parallel.distr_betweenness_centrality( + g::AbstractGraph, + vs=vertices(g), + distmx::AbstractMatrix=weights(g); + normalize=true, + endpoints=false, +)::Vector{Float64} + n_v = nv(g) + k = length(vs) + isdir = is_directed(g) + + # Parallel reduction + + betweenness = @distributed (+) for s in vs + temp_betweenness = zeros(n_v) + if degree(g, s) > 0 # this might be 1? + state = Graphs.dijkstra_shortest_paths( + g, s, distmx; allpaths=true, trackvertices=true + ) + if endpoints + Graphs._accumulate_endpoints!(temp_betweenness, state, g, s) + else + Graphs._accumulate_basic!(temp_betweenness, state, g, s) + end + end + temp_betweenness + end + + Graphs._rescale!(betweenness, n_v, normalize, isdir, k) + + return betweenness +end + +# closeness +function Graphs.Parallel.distr_closeness_centrality( + g::AbstractGraph, distmx::AbstractMatrix=weights(g); normalize=true +)::Vector{Float64} + n_v = Int(nv(g)) + closeness = SharedVector{Float64}(n_v) + fill!(closeness, 0.0) + + @sync @distributed for u in vertices(g) + if degree(g, u) == 0 # no need to do Dijkstra here + closeness[u] = 0.0 + else + d = Graphs.dijkstra_shortest_paths(g, u, distmx).dists + δ = filter(x -> x != typemax(x), d) + σ = sum(δ) + l = length(δ) - 1 + if σ > 0 + closeness[u] = l / σ + if normalize + n = l * 1.0 / (n_v - 1) + closeness[u] *= n + end + else + closeness[u] = 0.0 + end + end + end + return sdata(closeness) +end + +# radiality +function Graphs.Parallel.distr_radiality_centrality(g::AbstractGraph)::Vector{Float64} + n_v = nv(g) + vs = vertices(g) + n = ne(g) + meandists = SharedVector{Float64}(Int(n_v)) + maxdists = SharedVector{Float64}(Int(n_v)) + + @sync @distributed for i in 1:n_v + d = Graphs.dijkstra_shortest_paths(g, vs[i]) + maxdists[i] = maximum(d.dists) + meandists[i] = sum(d.dists) / (n_v - 1) + nothing + end + dmtr = maximum(maxdists) + radialities = collect(meandists) + return ((dmtr + 1) .- radialities) ./ dmtr +end + +# stress +function Graphs.Parallel.distr_stress_centrality( + g::AbstractGraph, vs=vertices(g) +)::Vector{Int64} + n_v = nv(g) + k = length(vs) + isdir = is_directed(g) + + # Parallel reduction + stress = @distributed (+) for s in vs + temp_stress = zeros(Int64, n_v) + if degree(g, s) > 0 # this might be 1? + state = Graphs.dijkstra_shortest_paths(g, s; allpaths=true, trackvertices=true) + Graphs._stress_accumulate_basic!(temp_stress, state, g, s) + end + temp_stress + end + return stress +end + +# generate_reduce +function Graphs.Parallel.distr_generate_reduce( + g::AbstractGraph{T}, gen_func::Function, comp::Comp, reps::Integer +) where {T<:Integer,Comp} + # Type assert required for type stability + min_set::Vector{T} = @distributed ((x, y) -> comp(x, y) ? x : y) for _ in 1:reps + gen_func(g) + end + return min_set +end + +# eccentricity +function Graphs.Parallel.distr_eccentricity( + g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g) +) where {T<:Number} + vlen = length(vs) + eccs = SharedVector{T}(vlen) + @sync @distributed for i in 1:vlen + local d = Graphs.dijkstra_shortest_paths(g, vs[i], distmx) + eccs[i] = maximum(d.dists) + end + d = sdata(eccs) + maximum(d) == typemax(T) && @warn("Infinite path length detected") + return d +end + +# dijkstra shortest paths +function Graphs.Parallel.distr_dijkstra_shortest_paths( + g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g) +) where {T<:Number} where {U} + n_v = nv(g) + r_v = length(sources) + + # TODO: remove `Int` once julialang/#23029 / #23032 are resolved + dists = SharedMatrix{T}(Int(r_v), Int(n_v)) + parents = SharedMatrix{U}(Int(r_v), Int(n_v)) + + @sync @distributed for i in 1:r_v + state = Graphs.dijkstra_shortest_paths(g, sources[i], distmx) + dists[i, :] = state.dists + parents[i, :] = state.parents + end + + result = Graphs.Parallel.MultipleDijkstraState(sdata(dists), sdata(parents)) + return result +end + +# random greedy color +function Graphs.Parallel.distr_random_greedy_color( + g::AbstractGraph{T}, reps::Integer +) where {T<:Integer} + best = @distributed (Graphs.best_color) for i in 1:reps + seq = shuffle(vertices(g)) + Graphs.perm_greedy_color(g, seq) + end + + return convert(Graphs.Coloring{T}, best) +end + +end diff --git a/src/Parallel/Parallel.jl b/src/Parallel/Parallel.jl index 6dc931231..3917dde96 100644 --- a/src/Parallel/Parallel.jl +++ b/src/Parallel/Parallel.jl @@ -2,9 +2,7 @@ module Parallel using Graphs using Graphs: sample, AbstractPathState, JohnsonState, BellmanFordState, FloydWarshallState -using Distributed: @distributed using Base.Threads: @threads, nthreads, Atomic, atomic_add!, atomic_cas! -using SharedArrays: SharedMatrix, SharedVector, sdata using ArnoldiMethod: LM, SR, LR, partialschur, partialeigen using Random: AbstractRNG, shuffle import SparseArrays: sparse diff --git a/src/Parallel/centrality/betweenness.jl b/src/Parallel/centrality/betweenness.jl index 14a7a5a8a..77bd739dc 100644 --- a/src/Parallel/centrality/betweenness.jl +++ b/src/Parallel/centrality/betweenness.jl @@ -4,7 +4,7 @@ function betweenness_centrality( distmx::AbstractMatrix=weights(g); normalize=true, endpoints=false, - parallel=:distributed, + parallel=:threads, ) return if parallel == :distributed distr_betweenness_centrality( @@ -23,7 +23,7 @@ function betweenness_centrality( distmx::AbstractMatrix=weights(g); normalize=true, endpoints=false, - parallel=:distributed, + parallel=:threads, rng::Union{Nothing,AbstractRNG}=nothing, seed::Union{Nothing,Integer}=nothing, ) @@ -39,37 +39,10 @@ function betweenness_centrality( end end -function distr_betweenness_centrality( - g::AbstractGraph, - vs=vertices(g), - distmx::AbstractMatrix=weights(g); - normalize=true, - endpoints=false, -)::Vector{Float64} - n_v = nv(g) - k = length(vs) - isdir = is_directed(g) - - # Parallel reduction - - betweenness = @distributed (+) for s in vs - temp_betweenness = zeros(n_v) - if degree(g, s) > 0 # this might be 1? - state = Graphs.dijkstra_shortest_paths( - g, s, distmx; allpaths=true, trackvertices=true - ) - if endpoints - Graphs._accumulate_endpoints!(temp_betweenness, state, g, s) - else - Graphs._accumulate_basic!(temp_betweenness, state, g, s) - end - end - temp_betweenness - end - - Graphs._rescale!(betweenness, n_v, normalize, isdir, k) - - return betweenness +function distr_betweenness_centrality(args...; kwargs...) + return error( + "`parallel = :distributed` requested, but SharedArrays or Distributed is not loaded" + ) end function distr_betweenness_centrality( diff --git a/src/Parallel/centrality/closeness.jl b/src/Parallel/centrality/closeness.jl index 920421211..2259ea0aa 100644 --- a/src/Parallel/centrality/closeness.jl +++ b/src/Parallel/centrality/closeness.jl @@ -1,8 +1,5 @@ function closeness_centrality( - g::AbstractGraph, - distmx::AbstractMatrix=weights(g); - normalize=true, - parallel=:distributed, + g::AbstractGraph, distmx::AbstractMatrix=weights(g); normalize=true, parallel=:threads ) return if parallel == :distributed distr_closeness_centrality(g, distmx; normalize=normalize) @@ -11,33 +8,10 @@ function closeness_centrality( end end -function distr_closeness_centrality( - g::AbstractGraph, distmx::AbstractMatrix=weights(g); normalize=true -)::Vector{Float64} - n_v = Int(nv(g)) - closeness = SharedVector{Float64}(n_v) - fill!(closeness, 0.0) - - @sync @distributed for u in vertices(g) - if degree(g, u) == 0 # no need to do Dijkstra here - closeness[u] = 0.0 - else - d = Graphs.dijkstra_shortest_paths(g, u, distmx).dists - δ = filter(x -> x != typemax(x), d) - σ = sum(δ) - l = length(δ) - 1 - if σ > 0 - closeness[u] = l / σ - if normalize - n = l * 1.0 / (n_v - 1) - closeness[u] *= n - end - else - closeness[u] = 0.0 - end - end - end - return sdata(closeness) +function distr_closeness_centrality(args...; kwargs...) + return error( + "`parallel = :distributed` requested, but SharedArrays or Distributed is not loaded" + ) end function threaded_closeness_centrality( diff --git a/src/Parallel/centrality/radiality.jl b/src/Parallel/centrality/radiality.jl index e10fbe81b..ace98d454 100644 --- a/src/Parallel/centrality/radiality.jl +++ b/src/Parallel/centrality/radiality.jl @@ -1,4 +1,4 @@ -function radiality_centrality(g::AbstractGraph; parallel=:distributed) +function radiality_centrality(g::AbstractGraph; parallel=:threads) return if parallel == :distributed distr_radiality_centrality(g) else @@ -6,22 +6,10 @@ function radiality_centrality(g::AbstractGraph; parallel=:distributed) end end -function distr_radiality_centrality(g::AbstractGraph)::Vector{Float64} - n_v = nv(g) - vs = vertices(g) - n = ne(g) - meandists = SharedVector{Float64}(Int(n_v)) - maxdists = SharedVector{Float64}(Int(n_v)) - - @sync @distributed for i in 1:n_v - d = Graphs.dijkstra_shortest_paths(g, vs[i]) - maxdists[i] = maximum(d.dists) - meandists[i] = sum(d.dists) / (n_v - 1) - nothing - end - dmtr = maximum(maxdists) - radialities = collect(meandists) - return ((dmtr + 1) .- radialities) ./ dmtr +function distr_radiality_centrality(args...; kwargs...) + return error( + "`parallel = :distributed` requested, but SharedArrays or Distributed is not loaded" + ) end function threaded_radiality_centrality(g::AbstractGraph)::Vector{Float64} diff --git a/src/Parallel/centrality/stress.jl b/src/Parallel/centrality/stress.jl index 58844af8d..7d717aa11 100644 --- a/src/Parallel/centrality/stress.jl +++ b/src/Parallel/centrality/stress.jl @@ -1,4 +1,4 @@ -function stress_centrality(g::AbstractGraph, vs=vertices(g); parallel=:distributed) +function stress_centrality(g::AbstractGraph, vs=vertices(g); parallel=:threads) return if parallel == :distributed distr_stress_centrality(g, vs) else @@ -9,7 +9,7 @@ end function stress_centrality( g::AbstractGraph, k::Integer; - parallel=:distributed, + parallel=:threads, rng::Union{Nothing,AbstractRNG}=nothing, seed::Union{Nothing,Integer}=nothing, ) @@ -21,21 +21,10 @@ function stress_centrality( end end -function distr_stress_centrality(g::AbstractGraph, vs=vertices(g))::Vector{Int64} - n_v = nv(g) - k = length(vs) - isdir = is_directed(g) - - # Parallel reduction - stress = @distributed (+) for s in vs - temp_stress = zeros(Int64, n_v) - if degree(g, s) > 0 # this might be 1? - state = Graphs.dijkstra_shortest_paths(g, s; allpaths=true, trackvertices=true) - Graphs._stress_accumulate_basic!(temp_stress, state, g, s) - end - temp_stress - end - return stress +function distr_stress_centrality(args...; kwargs...) + return error( + "`parallel = :distributed` requested, but SharedArrays or Distributed is not loaded" + ) end function threaded_stress_centrality(g::AbstractGraph, vs=vertices(g))::Vector{Int64} diff --git a/src/Parallel/distance.jl b/src/Parallel/distance.jl index f16df61ea..5e5bf5c13 100644 --- a/src/Parallel/distance.jl +++ b/src/Parallel/distance.jl @@ -4,7 +4,7 @@ function eccentricity( g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g); - parallel::Symbol=:distributed, + parallel::Symbol=:threads, ) where {T<:Number} return if parallel === :threads threaded_eccentricity(g, vs, distmx) @@ -19,18 +19,10 @@ function eccentricity( end end -function distr_eccentricity( - g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g) -) where {T<:Number} - vlen = length(vs) - eccs = SharedVector{T}(vlen) - @sync @distributed for i in 1:vlen - local d = Graphs.dijkstra_shortest_paths(g, vs[i], distmx) - eccs[i] = maximum(d.dists) - end - d = sdata(eccs) - maximum(d) == typemax(T) && @warn("Infinite path length detected") - return d +function distr_eccentricity(args...; kwargs...) + return error( + "`parallel = :distributed` requested, but SharedArrays or Distributed is not loaded" + ) end function threaded_eccentricity( @@ -46,9 +38,7 @@ function threaded_eccentricity( return eccs end -function eccentricity( - g::AbstractGraph, distmx::AbstractMatrix; parallel::Symbol=:distributed -) +function eccentricity(g::AbstractGraph, distmx::AbstractMatrix; parallel::Symbol=:threads) return eccentricity(g, vertices(g), distmx; parallel) end diff --git a/src/Parallel/shortestpaths/dijkstra.jl b/src/Parallel/shortestpaths/dijkstra.jl index 7091e782b..4aa6804e3 100644 --- a/src/Parallel/shortestpaths/dijkstra.jl +++ b/src/Parallel/shortestpaths/dijkstra.jl @@ -21,7 +21,7 @@ function dijkstra_shortest_paths( g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g); - parallel::Symbol=:distributed, + parallel::Symbol=:threads, ) where {T<:Number} where {U} return if parallel === :threads threaded_dijkstra_shortest_paths(g, sources, distmx) @@ -56,22 +56,8 @@ function threaded_dijkstra_shortest_paths( return result end -function distr_dijkstra_shortest_paths( - g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g) -) where {T<:Number} where {U} - n_v = nv(g) - r_v = length(sources) - - # TODO: remove `Int` once julialang/#23029 / #23032 are resolved - dists = SharedMatrix{T}(Int(r_v), Int(n_v)) - parents = SharedMatrix{U}(Int(r_v), Int(n_v)) - - @sync @distributed for i in 1:r_v - state = Graphs.dijkstra_shortest_paths(g, sources[i], distmx) - dists[i, :] = state.dists - parents[i, :] = state.parents - end - - result = MultipleDijkstraState(sdata(dists), sdata(parents)) - return result +function distr_dijkstra_shortest_paths(args...; kwargs...) + return error( + "`parallel = :distributed` requested, but SharedArrays or Distributed is not loaded" + ) end diff --git a/src/Parallel/traversals/greedy_color.jl b/src/Parallel/traversals/greedy_color.jl index b6ebf1880..1b6d7a430 100644 --- a/src/Parallel/traversals/greedy_color.jl +++ b/src/Parallel/traversals/greedy_color.jl @@ -1,5 +1,5 @@ function random_greedy_color( - g::AbstractGraph{T}, reps::Integer; parallel::Symbol=:distributed + g::AbstractGraph{T}, reps::Integer; parallel::Symbol=:threads ) where {T<:Integer} return if parallel === :threads threaded_random_greedy_color(g, reps) @@ -25,20 +25,14 @@ function threaded_random_greedy_color(g::AbstractGraph{T}, reps::Integer) where return convert(Graphs.Coloring{T}, best) end -function distr_random_greedy_color(g::AbstractGraph{T}, reps::Integer) where {T<:Integer} - best = @distributed (Graphs.best_color) for i in 1:reps - seq = shuffle(vertices(g)) - Graphs.perm_greedy_color(g, seq) - end - - return convert(Graphs.Coloring{T}, best) +function distr_random_greedy_color(args...; kwargs...) + return error( + "`parallel = :distributed` requested, but SharedArrays or Distributed is not loaded" + ) end function greedy_color( - g::AbstractGraph{U}; - sort_degree::Bool=false, - reps::Integer=1, - parallel::Symbol=:distributed, + g::AbstractGraph{U}; sort_degree::Bool=false, reps::Integer=1, parallel::Symbol=:threads ) where {U<:Integer} return if sort_degree Graphs.degree_greedy_color(g) diff --git a/src/Parallel/utils.jl b/src/Parallel/utils.jl index fe763ecdc..85847db33 100644 --- a/src/Parallel/utils.jl +++ b/src/Parallel/utils.jl @@ -22,14 +22,10 @@ end Distributed implementation of [`generate_reduce`](@ref). """ -function distr_generate_reduce( - g::AbstractGraph{T}, gen_func::Function, comp::Comp, reps::Integer -) where {T<:Integer,Comp} - # Type assert required for type stability - min_set::Vector{T} = @distributed ((x, y) -> comp(x, y) ? x : y) for _ in 1:reps - gen_func(g) - end - return min_set +function distr_generate_reduce(args...; kwargs...) + return error( + "`parallel = :distributed` requested, but SharedArrays or Distributed is not loaded" + ) end """ diff --git a/test/parallel/runtests.jl b/test/parallel/runtests.jl index 76805ac5b..c9cc7ee43 100644 --- a/test/parallel/runtests.jl +++ b/test/parallel/runtests.jl @@ -1,5 +1,6 @@ using Graphs using Graphs.Parallel +using SharedArrays # to trigger extension loading using Base.Threads: @threads, Atomic @test length(description()) > 1