Documentation
start
AsynchronousIterativeAlgorithms.start — Function
start(algorithm, problem_constructor, stopat; kwargs...)
start(algorithm, distributed_problem, stopat; kwargs...)
Solve the distributed problem returned by problem_constructor (or referenced by distributed_problem) using the algorithm until the stopat conditions are reached.
Arguments
- algorithm::AbstractAlgorithm{Q,A}: subtypes AbstractAlgorithm{Q,A} and implements its functor calls
- problem_constructor::Function: for each pid in {pids ⋃ current pid}, calling problem_constructor(pid) on process pid should return that process's assigned problem
- distributed_problem::DistributedObject: for each pid in {pids ⋃ current pid}, distributed_problem should reference process pid's assigned problem on pid
- stopat::NamedTuple: you can specify any of the following
  - iteration::Int64: maximum number of iterations
  - epoch::Int64: maximum number of epochs (an epoch passes when all workers have answered at least once)
  - time::Float64: maximum elapsed time (in seconds)
  - other custom stopping conditions that you have specified by implementing stopnow
Keywords
- saveat=NamedTuple(): when to record query iterates (::Q), iterations, epochs, timestamps (and other custom values specified by implementing progress). Specified with any of the following
  - iteration::Int64: save every iteration-th iteration (must be > 0)
  - epoch::Int64: save every epoch-th epoch (must be > 0)
  - other custom saving conditions that you have specified by implementing savenow
- save_answers=false: if true, answer iterates (::A) are recorded along with the pids of the workers that sent them
- pids=workers(): pids of the active workers; you can start a non-distributed (and necessarily synchronous) version of your algorithm with pids=[1]
- synchronous=false: if synchronous=true, the central node waits for all workers to answer before making a step
- resilience=0: number of workers allowed to fail before the execution is stopped
- verbose=1: if > 0, a progress bar is displayed (implement progress and/or showvalues to customize the display)
Returns
- NamedTuple: a record of the queries and the iterations, epochs, timestamps at which they were recorded, as well as the answer_count of each worker. Additionally,
  - if save_answers=true, a record of the answers and the answers_origin
  - other custom values you have specified by implementing report
Throws
- ArgumentError: if the arguments don't match the specifications.
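As a rough illustration of the stopat and saveat arguments described above, the sketch below builds them as plain Julia NamedTuples. The numeric values are arbitrary, and the commented-out start call assumes an algorithm and a problem_constructor defined elsewhere.

```julia
# Each field of stopat is a maximum, as documented above.
# time is documented as Float64, so write 42.0 rather than 42.
stopat = (iteration=1000, epoch=100, time=42.0)

# A NamedTuple with a single field needs a trailing comma;
# without it, (epoch=100) is just an assignment that evaluates to 100.
only_epochs = (epoch=100,)

# Record the query iterate once every 10 iterations.
saveat = (iteration=10,)

println(typeof(only_epochs) <: NamedTuple)  # prints "true"

# Hypothetical call, not executed here (needs an algorithm and a problem):
# history = start(algorithm, problem_constructor, stopat; saveat=saveat, save_answers=true)
```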
AbstractAlgorithm
The algorithm you pass to start should subtype AbstractAlgorithm{Q,A}.
AsynchronousIterativeAlgorithms.AbstractAlgorithm — Type
AbstractAlgorithm{Q,A}
To be compatible with start, types subtyping AbstractAlgorithm should be callable with the following signatures:
- (algorithm::AbstractAlgorithm{Q,A})(problem::Any)::Q where {Q,A}: the initialization step that creates the first query iterate
- (algorithm::AbstractAlgorithm{Q,A})(q::Q, problem::Any)::A where {Q,A}: the answer step performed by the workers when they receive a query q::Q from the central node
- (algorithm::AbstractAlgorithm{Q,A})(a::A, worker::Int64, problem::Any)::Q where {Q,A}: the query step performed by the central node when receiving an answer a::A from a worker
- when start takes the keyword synchronous=true, (algorithm::AbstractAlgorithm{Q,A})(as::Vector{A}, workers::Vector{Int64}, problem::Any)::Q where {Q,A}: the query step performed by the central node when receiving the answers as::Vector{A} respectively from the workers
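The four call signatures listed above could be implemented along the following lines. This is only a sketch: GradientDescent, QuadraticProblem and gradient are hypothetical names chosen for illustration, and the package is assumed to be installed. The steps are called by hand here to show how data flows; in a real run start would call them.

```julia
using AsynchronousIterativeAlgorithms
const AIA = AsynchronousIterativeAlgorithms

# Hypothetical problem: minimize 0.5 * ||x - target||^2
struct QuadraticProblem
    target::Vector{Float64}
end
gradient(p::QuadraticProblem, x) = x .- p.target

# Here Q = A = Vector{Float64}
mutable struct GradientDescent <: AIA.AbstractAlgorithm{Vector{Float64},Vector{Float64}}
    stepsize::Float64
    q::Vector{Float64}  # last query iterate
    GradientDescent(stepsize::Float64) = new(stepsize, Float64[])
end

# Initialization step: creates the first query iterate
(gd::GradientDescent)(problem::Any) = (gd.q = zeros(length(problem.target)))

# Answer step, performed by a worker receiving the query q
(gd::GradientDescent)(q::Vector{Float64}, problem::Any) =
    gd.stepsize .* gradient(problem, q)

# Query step, performed by the central node on a single answer
function (gd::GradientDescent)(a::Vector{Float64}, worker::Int64, problem::Any)
    gd.q = gd.q .- a
    return gd.q
end

# Query step used with synchronous=true: one answer per worker
function (gd::GradientDescent)(as::Vector{Vector{Float64}}, workers::Vector{Int64}, problem::Any)
    gd.q = gd.q .- sum(as) ./ length(as)
    return gd.q
end

# Calling the steps by hand
problem = QuadraticProblem([1.0, 2.0])
gd = GradientDescent(0.5)
q0 = gd(problem)         # first query
a = gd(q0, problem)      # answer computed by a worker
q1 = gd(a, 2, problem)   # new query after worker 2 answered

# Hypothetical distributed run, not executed here (the definitions above
# would need to be wrapped in @everywhere after adding workers):
# history = start(gd, pid -> QuadraticProblem([1.0, 2.0]), (epoch=100,))
```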
Customization of start's execution
AsynchronousIterativeAlgorithms.stopnow — Function
stopnow(::AbstractAlgorithm, stopat::NamedTuple) = false
Define this method on your algorithm<:AbstractAlgorithm to add a stopping criterion: return true if your stopping condition has been reached.
AsynchronousIterativeAlgorithms.savenow — Function
savenow(::AbstractAlgorithm, saveat::NamedTuple) = false
Define this method on your algorithm<:AbstractAlgorithm to add saving stops: return true if your saving condition has been reached.
AsynchronousIterativeAlgorithms.savevalues — Function
savevalues(::AbstractAlgorithm) = nothing
Define this method on your algorithm<:AbstractAlgorithm. It will be called at each iteration where savenow returns true: store some values on your algorithm object (don't forget to define report to retrieve what you stored).
AsynchronousIterativeAlgorithms.report — Function
report(::AbstractAlgorithm) = NamedTuple()
Define this method on your algorithm<:AbstractAlgorithm to add custom values to the results returned by start: return a NamedTuple with those values, making sure not to reuse the field names queries, answers, iterations, epochs, timestamps, answers_origin, answer_count.
AsynchronousIterativeAlgorithms.progress — Function
progress(::AbstractAlgorithm, stopat::NamedTuple) = 0.
Define this method on your algorithm<:AbstractAlgorithm to change the display of the progress bar: return how close the current step is to reaching your stopping requirement on a scale of 0 to 1.
AsynchronousIterativeAlgorithms.showvalues — Function
showvalues(::AbstractAlgorithm) = Tuple{Symbol, Any}[]
Define this method on your algorithm<:AbstractAlgorithm to add values to be displayed below the progress bar when verbose>1: return a vector of Tuple{Symbol, Any} with those values.
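The customization functions above might be combined as in the following sketch. TrackedDescent and the step_norm field of stopat and saveat are hypothetical names, and the sketch assumes, as the descriptions above suggest, that start passes custom fields of stopat and saveat through to these methods. Only the central query step is defined; the initialization and answer steps are omitted for brevity.

```julia
using LinearAlgebra: norm
using AsynchronousIterativeAlgorithms
const AIA = AsynchronousIterativeAlgorithms

mutable struct TrackedDescent <: AIA.AbstractAlgorithm{Vector{Float64},Vector{Float64}}
    q::Vector{Float64}
    last_step_norm::Float64       # norm of the last answer applied
    saved_norms::Vector{Float64}  # values stored by savevalues
    TrackedDescent(n::Int) = new(zeros(n), Inf, Float64[])
end

# Central query step: remember how large the last step was
function (alg::TrackedDescent)(a::Vector{Float64}, worker::Int64, problem::Any)
    alg.last_step_norm = norm(a)
    alg.q = alg.q .- a
    return alg.q
end

# Stop once the last step is small enough (custom stopat field)
AIA.stopnow(alg::TrackedDescent, stopat::NamedTuple) =
    haskey(stopat, :step_norm) && alg.last_step_norm <= stopat.step_norm

# Save whenever the last step is below a custom saveat threshold
AIA.savenow(alg::TrackedDescent, saveat::NamedTuple) =
    haskey(saveat, :step_norm) && alg.last_step_norm <= saveat.step_norm

# Store a value on the algorithm object each time a save happens
AIA.savevalues(alg::TrackedDescent) = (push!(alg.saved_norms, alg.last_step_norm); nothing)

# Expose the stored values in the results, under a field name that is not reserved
AIA.report(alg::TrackedDescent) = (step_norms = alg.saved_norms,)

# Fraction of the way to the custom stopping requirement, clamped to [0, 1]
AIA.progress(alg::TrackedDescent, stopat::NamedTuple) =
    haskey(stopat, :step_norm) ? clamp(stopat.step_norm / alg.last_step_norm, 0.0, 1.0) : 0.0

# Extra value shown under the progress bar
AIA.showvalues(alg::TrackedDescent) = Tuple{Symbol,Any}[(:step_norm, alg.last_step_norm)]
```

With these definitions, a call such as start(alg, problem_constructor, (epoch=100, step_norm=1e-6)) would be the intended usage; it is not executed here.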
Algorithm wrappers
The following two algorithms already subtype AbstractAlgorithm{Q,A} and are ready to use in start.
AsynchronousIterativeAlgorithms.AggregationAlgorithm — Type
AggregationAlgorithm(arg; kwarg)::AbstractAlgorithm
Distributed algorithm that writes: q_j <- query(aggregate([answer(q_i) for i in connected]))
Where a "connected" worker is a worker that has answered at least once. (Not memory optimized: length(pids) answers are stored on the central worker at all times)
Argument
algorithm<:AbstractAlgorithm{Q,A} which should define the following (where const AIA = AsynchronousIterativeAlgorithms)
- AIA.initialize(algorithm, problem::Any)::Q: step that creates the first query iterate
- AIA.aggregate(algorithm, as::Vector{A}, workers::Vector{Int64})::AggregatedA where A: step performed by the central node when receiving the answers as::Vector{A} from the workers
- AIA.query(algorithm, agg::AggregatedA, problem::Any)::Q: step producing a query from the aggregated answer agg::AggregatedA, performed by the central node
- AIA.answer(algorithm, q::Q, problem::Any)::A: step performed by the workers when they receive a query q::Q from the central node
Keyword
- pids=workers(): pids of the active workers
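An algorithm to be wrapped by AggregationAlgorithm could define the four steps listed above roughly as follows. MedianDescent is a hypothetical type, the coordinate-wise median is just one possible aggregation rule, and the problem is assumed to carry a target field. The wrapping call at the end is left as a comment because it is meant to run on a cluster with workers.

```julia
using Statistics: median
using AsynchronousIterativeAlgorithms
const AIA = AsynchronousIterativeAlgorithms

mutable struct MedianDescent <: AIA.AbstractAlgorithm{Vector{Float64},Vector{Float64}}
    stepsize::Float64
    q::Vector{Float64}
    MedianDescent(stepsize::Float64) = new(stepsize, Float64[])
end

# First query iterate
AIA.initialize(alg::MedianDescent, problem::Any) =
    (alg.q = zeros(length(problem.target)))

# Worker step: a scaled gradient of 0.5 * ||q - target||^2
AIA.answer(alg::MedianDescent, q::Vector{Float64}, problem::Any) =
    alg.stepsize .* (q .- problem.target)

# Central step: coordinate-wise median of the connected workers' answers
AIA.aggregate(alg::MedianDescent, as::Vector{Vector{Float64}}, workers::Vector{Int64}) =
    [median([a[i] for a in as]) for i in eachindex(first(as))]

# Central step: build the next query from the aggregated answer
function AIA.query(alg::MedianDescent, agg::Vector{Float64}, problem::Any)
    alg.q = alg.q .- agg
    return alg.q
end

# Hypothetical usage, not executed here:
# algorithm = AggregationAlgorithm(MedianDescent(1.0); pids=workers())
# history = start(algorithm, problem_constructor, (epoch=100,))
```

The median is used here only to show an aggregation that is not an average; for plain weighted averaging, AveragingAlgorithm avoids storing one answer per worker.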
AsynchronousIterativeAlgorithms.AveragingAlgorithm — Type
AveragingAlgorithm(arg; kwarg)::AbstractAlgorithm
Distributed algorithm that writes: q_j <- query(weighted_average([answer(q_i) for i in connected]))
Where a "connected" worker is a worker that has answered at least once. (Memory optimized: only the equivalent of one answer is stored on the central worker at all times)
Argument
algorithm<:AbstractAlgorithm{Q,A} which should define the following (where const AIA = AsynchronousIterativeAlgorithms)
- AIA.initialize(algorithm, problem::Any)::Q: step that creates the first query iterate
- AIA.query(algorithm, a::A, problem::Any)::Q: step producing a query from the averaged answer, performed by the central node
- AIA.answer(algorithm, q::Q, problem::Any)::A: step performed by the workers when they receive a query q::Q from the central node
Keyword
- pids=workers(): pids of the active workers
- weights=ones(length(pids)): weights of each pid in the weighted average
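To make the weighted average over connected workers concrete, here is a small stand-alone arithmetic sketch. It does not use the package: weighted_average is a hypothetical helper, it stores every answer for clarity (unlike the memory-optimized wrapper), and it assumes the average is normalized by the weights of the connected workers only.

```julia
# Hypothetical helper, not part of the package.
# answers maps a worker pid to its latest answer; only connected workers appear in it.
function weighted_average(answers::Dict{Int,Vector{Float64}}, weights::Dict{Int,Float64})
    total = sum(weights[pid] for pid in keys(answers))
    return sum(weights[pid] .* a for (pid, a) in answers) ./ total
end

weights = Dict(2 => 1.0, 3 => 3.0, 4 => 1.0)
answers = Dict{Int,Vector{Float64}}()

# Only worker 2 has answered so far: the average is its answer
answers[2] = [2.0, 0.0]
first_avg = weighted_average(answers, weights)

# Worker 3 connects: (1 * [2, 0] + 3 * [4, 4]) / (1 + 3) = [3.5, 3.0]
answers[3] = [4.0, 4.0]
second_avg = weighted_average(answers, weights)
println(second_avg)
```

Worker 4 has a weight but has not answered yet, so under the stated assumption it does not contribute to either average.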