Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ version = "1.13.1"
[deps]
ArnoldiMethod = "ec485272-7323-5ecc-a04f-4719b315124d"
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Inflate = "d25df0c9-e2be-5dd7-82c8-3ad0b3e990b9"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
SharedArrays = "1a1011a3-84de-559e-8e89-a11a2f7dc383"
SimpleTraits = "699a6c99-e7fa-54fc-8d76-47d257e15c1d"
SparseArrays = "2f01184e-e22b-5df5-ae63-d93ebab69eaf"
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"

[weakdeps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
SharedArrays = "1a1011a3-84de-559e-8e89-a11a2f7dc383"

[extensions]
GraphsSharedArraysExt = "SharedArrays"

[compat]
ArnoldiMethod = "0.4"
Distributed = "1"
Expand All @@ -25,4 +30,4 @@ SharedArrays = "1"
SimpleTraits = "0.9"
SparseArrays = "1"
Statistics = "1"
julia = "1.10"
julia = "1.10"
170 changes: 170 additions & 0 deletions ext/GraphsSharedArraysExt.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
module GraphsSharedArraysExt

using Graphs
using SharedArrays: SharedArrays, SharedMatrix, SharedVector, sdata
using SharedArrays.Distributed: @distributed
using Random: shuffle

# betweenness
function Graphs.Parallel.distr_betweenness_centrality(
g::AbstractGraph,
vs=vertices(g),
distmx::AbstractMatrix=weights(g);
normalize=true,
endpoints=false,
)::Vector{Float64}
n_v = nv(g)
k = length(vs)
isdir = is_directed(g)

# Parallel reduction

betweenness = @distributed (+) for s in vs
temp_betweenness = zeros(n_v)
if degree(g, s) > 0 # this might be 1?
state = Graphs.dijkstra_shortest_paths(
g, s, distmx; allpaths=true, trackvertices=true
)
if endpoints
Graphs._accumulate_endpoints!(temp_betweenness, state, g, s)
else
Graphs._accumulate_basic!(temp_betweenness, state, g, s)
end
end
temp_betweenness
end

Graphs._rescale!(betweenness, n_v, normalize, isdir, k)

return betweenness
end

# closeness
function Graphs.Parallel.distr_closeness_centrality(
g::AbstractGraph, distmx::AbstractMatrix=weights(g); normalize=true
)::Vector{Float64}
n_v = Int(nv(g))
closeness = SharedVector{Float64}(n_v)
fill!(closeness, 0.0)

@sync @distributed for u in vertices(g)
if degree(g, u) == 0 # no need to do Dijkstra here
closeness[u] = 0.0
else
d = Graphs.dijkstra_shortest_paths(g, u, distmx).dists
δ = filter(x -> x != typemax(x), d)
σ = sum(δ)
l = length(δ) - 1
if σ > 0
closeness[u] = l / σ
if normalize
n = l * 1.0 / (n_v - 1)
closeness[u] *= n
end
else
closeness[u] = 0.0
end
end
end
return sdata(closeness)
end

# radiality
function Graphs.Parallel.distr_radiality_centrality(g::AbstractGraph)::Vector{Float64}
n_v = nv(g)
vs = vertices(g)
n = ne(g)
meandists = SharedVector{Float64}(Int(n_v))
maxdists = SharedVector{Float64}(Int(n_v))

@sync @distributed for i in 1:n_v
d = Graphs.dijkstra_shortest_paths(g, vs[i])
maxdists[i] = maximum(d.dists)
meandists[i] = sum(d.dists) / (n_v - 1)
nothing
end
dmtr = maximum(maxdists)
radialities = collect(meandists)
return ((dmtr + 1) .- radialities) ./ dmtr
end

# stress
function Graphs.Parallel.distr_stress_centrality(
g::AbstractGraph, vs=vertices(g)
)::Vector{Int64}
n_v = nv(g)
k = length(vs)
isdir = is_directed(g)

# Parallel reduction
stress = @distributed (+) for s in vs
temp_stress = zeros(Int64, n_v)
if degree(g, s) > 0 # this might be 1?
state = Graphs.dijkstra_shortest_paths(g, s; allpaths=true, trackvertices=true)
Graphs._stress_accumulate_basic!(temp_stress, state, g, s)
end
temp_stress
end
return stress
end

# generate_reduce
function Graphs.Parallel.distr_generate_reduce(
g::AbstractGraph{T}, gen_func::Function, comp::Comp, reps::Integer
) where {T<:Integer,Comp}
# Type assert required for type stability
min_set::Vector{T} = @distributed ((x, y) -> comp(x, y) ? x : y) for _ in 1:reps
gen_func(g)
end
return min_set
end

# eccentricity
function Graphs.Parallel.distr_eccentricity(
g::AbstractGraph, vs=vertices(g), distmx::AbstractMatrix{T}=weights(g)
) where {T<:Number}
vlen = length(vs)
eccs = SharedVector{T}(vlen)
@sync @distributed for i in 1:vlen
local d = Graphs.dijkstra_shortest_paths(g, vs[i], distmx)
eccs[i] = maximum(d.dists)
end
d = sdata(eccs)
maximum(d) == typemax(T) && @warn("Infinite path length detected")
return d
end

# dijkstra shortest paths
function Graphs.Parallel.distr_dijkstra_shortest_paths(
g::AbstractGraph{U}, sources=vertices(g), distmx::AbstractMatrix{T}=weights(g)
) where {T<:Number} where {U}
n_v = nv(g)
r_v = length(sources)

# TODO: remove `Int` once julialang/#23029 / #23032 are resolved
dists = SharedMatrix{T}(Int(r_v), Int(n_v))
parents = SharedMatrix{U}(Int(r_v), Int(n_v))

@sync @distributed for i in 1:r_v
state = Graphs.dijkstra_shortest_paths(g, sources[i], distmx)
dists[i, :] = state.dists
parents[i, :] = state.parents
end

result = Graphs.Parallel.MultipleDijkstraState(sdata(dists), sdata(parents))
return result
end

# random greedy color
function Graphs.Parallel.distr_random_greedy_color(
g::AbstractGraph{T}, reps::Integer
) where {T<:Integer}
best = @distributed (Graphs.best_color) for i in 1:reps
seq = shuffle(vertices(g))
Graphs.perm_greedy_color(g, seq)
end

return convert(Graphs.Coloring{T}, best)
end

end
2 changes: 0 additions & 2 deletions src/Parallel/Parallel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ module Parallel

using Graphs
using Graphs: sample, AbstractPathState, JohnsonState, BellmanFordState, FloydWarshallState
using Distributed: @distributed
using Base.Threads: @threads, nthreads, Atomic, atomic_add!, atomic_cas!
using SharedArrays: SharedMatrix, SharedVector, sdata
using ArnoldiMethod: LM, SR, LR, partialschur, partialeigen
using Random: AbstractRNG, shuffle
import SparseArrays: sparse
Expand Down
39 changes: 6 additions & 33 deletions src/Parallel/centrality/betweenness.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ function betweenness_centrality(
distmx::AbstractMatrix=weights(g);
normalize=true,
endpoints=false,
parallel=:distributed,
parallel=:threads,
)
return if parallel == :distributed
distr_betweenness_centrality(
Expand All @@ -23,7 +23,7 @@ function betweenness_centrality(
distmx::AbstractMatrix=weights(g);
normalize=true,
endpoints=false,
parallel=:distributed,
parallel=:threads,
rng::Union{Nothing,AbstractRNG}=nothing,
seed::Union{Nothing,Integer}=nothing,
)
Expand All @@ -39,37 +39,10 @@ function betweenness_centrality(
end
end

function distr_betweenness_centrality(
g::AbstractGraph,
vs=vertices(g),
distmx::AbstractMatrix=weights(g);
normalize=true,
endpoints=false,
)::Vector{Float64}
n_v = nv(g)
k = length(vs)
isdir = is_directed(g)

# Parallel reduction

betweenness = @distributed (+) for s in vs
temp_betweenness = zeros(n_v)
if degree(g, s) > 0 # this might be 1?
state = Graphs.dijkstra_shortest_paths(
g, s, distmx; allpaths=true, trackvertices=true
)
if endpoints
Graphs._accumulate_endpoints!(temp_betweenness, state, g, s)
else
Graphs._accumulate_basic!(temp_betweenness, state, g, s)
end
end
temp_betweenness
end

Graphs._rescale!(betweenness, n_v, normalize, isdir, k)

return betweenness
function distr_betweenness_centrality(args...; kwargs...)
return error(
"`parallel = :distributed` requested, but SharedArrays or Distributed is not loaded"
)
end

function distr_betweenness_centrality(
Expand Down
36 changes: 5 additions & 31 deletions src/Parallel/centrality/closeness.jl
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
function closeness_centrality(
g::AbstractGraph,
distmx::AbstractMatrix=weights(g);
normalize=true,
parallel=:distributed,
g::AbstractGraph, distmx::AbstractMatrix=weights(g); normalize=true, parallel=:threads
)
return if parallel == :distributed
distr_closeness_centrality(g, distmx; normalize=normalize)
Expand All @@ -11,33 +8,10 @@ function closeness_centrality(
end
end

function distr_closeness_centrality(
g::AbstractGraph, distmx::AbstractMatrix=weights(g); normalize=true
)::Vector{Float64}
n_v = Int(nv(g))
closeness = SharedVector{Float64}(n_v)
fill!(closeness, 0.0)

@sync @distributed for u in vertices(g)
if degree(g, u) == 0 # no need to do Dijkstra here
closeness[u] = 0.0
else
d = Graphs.dijkstra_shortest_paths(g, u, distmx).dists
δ = filter(x -> x != typemax(x), d)
σ = sum(δ)
l = length(δ) - 1
if σ > 0
closeness[u] = l / σ
if normalize
n = l * 1.0 / (n_v - 1)
closeness[u] *= n
end
else
closeness[u] = 0.0
end
end
end
return sdata(closeness)
function distr_closeness_centrality(args...; kwargs...)
return error(
"`parallel = :distributed` requested, but SharedArrays or Distributed is not loaded"
)
end

function threaded_closeness_centrality(
Expand Down
22 changes: 5 additions & 17 deletions src/Parallel/centrality/radiality.jl
Original file line number Diff line number Diff line change
@@ -1,27 +1,15 @@
function radiality_centrality(g::AbstractGraph; parallel=:distributed)
function radiality_centrality(g::AbstractGraph; parallel=:threads)
return if parallel == :distributed
distr_radiality_centrality(g)
else
threaded_radiality_centrality(g)
end
end

function distr_radiality_centrality(g::AbstractGraph)::Vector{Float64}
n_v = nv(g)
vs = vertices(g)
n = ne(g)
meandists = SharedVector{Float64}(Int(n_v))
maxdists = SharedVector{Float64}(Int(n_v))

@sync @distributed for i in 1:n_v
d = Graphs.dijkstra_shortest_paths(g, vs[i])
maxdists[i] = maximum(d.dists)
meandists[i] = sum(d.dists) / (n_v - 1)
nothing
end
dmtr = maximum(maxdists)
radialities = collect(meandists)
return ((dmtr + 1) .- radialities) ./ dmtr
function distr_radiality_centrality(args...; kwargs...)
return error(
"`parallel = :distributed` requested, but SharedArrays or Distributed is not loaded"
)
end

function threaded_radiality_centrality(g::AbstractGraph)::Vector{Float64}
Expand Down
23 changes: 6 additions & 17 deletions src/Parallel/centrality/stress.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
function stress_centrality(g::AbstractGraph, vs=vertices(g); parallel=:distributed)
function stress_centrality(g::AbstractGraph, vs=vertices(g); parallel=:threads)
return if parallel == :distributed
distr_stress_centrality(g, vs)
else
Expand All @@ -9,7 +9,7 @@ end
function stress_centrality(
g::AbstractGraph,
k::Integer;
parallel=:distributed,
parallel=:threads,
rng::Union{Nothing,AbstractRNG}=nothing,
seed::Union{Nothing,Integer}=nothing,
)
Expand All @@ -21,21 +21,10 @@ function stress_centrality(
end
end

function distr_stress_centrality(g::AbstractGraph, vs=vertices(g))::Vector{Int64}
n_v = nv(g)
k = length(vs)
isdir = is_directed(g)

# Parallel reduction
stress = @distributed (+) for s in vs
temp_stress = zeros(Int64, n_v)
if degree(g, s) > 0 # this might be 1?
state = Graphs.dijkstra_shortest_paths(g, s; allpaths=true, trackvertices=true)
Graphs._stress_accumulate_basic!(temp_stress, state, g, s)
end
temp_stress
end
return stress
function distr_stress_centrality(args...; kwargs...)
return error(
"`parallel = :distributed` requested, but SharedArrays or Distributed is not loaded"
)
end

function threaded_stress_centrality(g::AbstractGraph, vs=vertices(g))::Vector{Int64}
Expand Down
Loading
Loading