Documentation
start
AsynchronousIterativeAlgorithms.start — Function
start(algorithm, problem_constructor, stopat; kwargs...)
start(algorithm, distributed_problem, stopat; kwargs...)
Solve the distributed problem returned by problem_constructor (or referenced by distributed_problem) using the algorithm until the stopat conditions are reached.
Arguments
- algorithm::AbstractAlgorithm{Q,A}: subtypes AbstractAlgorithm{Q,A} and implements its functor calls
- problem_constructor::Function: for each pid in {pids ⋃ current pid}, calling problem_constructor(pid) on process pid should return that process's assigned problem
- distributed_problem::DistributedObject: for each pid in {pids ⋃ current pid}, distributed_problem should reference process pid's assigned problem on pid
- stopat::NamedTuple: you can specify any of the following
  - iteration::Int64: maximum number of iterations
  - epoch::Int64: maximum number of epochs (an epoch passes when all workers have answered at least once)
  - time::Float64: maximum elapsed time (in seconds)
  - other custom stopping conditions that you have specified by implementing stopnow
Keywords
- saveat=NamedTuple(): when to record query iterates (::Q), iterations, epochs, timestamps (and other custom values specified by implementing progress). Specified with any of the following
  - iteration::Int64: save every iteration > 0
  - epoch::Int64: save every epoch > 0
  - other custom saving conditions that you have specified by implementing savenow
- save_answers=false: if true, answer iterates (::A) are recorded along with the pids of the workers that sent them
- pids=workers(): pids of the active workers; you can start a non-distributed (and necessarily synchronous) version of your algorithm with pids=[1]
- synchronous=false: if synchronous=true, the central node waits for all workers to answer before making a step
- resilience=0: number of workers allowed to fail before the execution is stopped
- verbose=1: if > 0, a progress bar is displayed (implement progress and/or showvalues to customize the display)
Returns
- NamedTuple: a record of the queries and the iterations, epochs, timestamps at which they were recorded, as well as an answer_count for each worker. Additionally:
  - if save_answers=true, a record of the answers and the answers_origin
  - other custom values you have specified by implementing report
Throws
ArgumentError: if the arguments don't match the specifications.
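As an illustration of the call described above, here is a minimal sketch of how start might be invoked with a problem constructor, a stopat tuple and a few keywords. The names ScalarProblem, ScalarGD, problem_constructor and run_scalar_gd are hypothetical and exist only for this example; the addprocs line is left commented out, so as written everything lives on the current process.

```julia
using Distributed
# addprocs(4)   # assumption: uncomment to run on 4 local worker processes
@everywhere using AsynchronousIterativeAlgorithms

@everywhere begin
    # Hypothetical per-process problem: minimize (x - target)^2 in one dimension
    struct ScalarProblem
        target::Float64
    end
    # Called as problem_constructor(pid) on process pid
    problem_constructor(pid) = ScalarProblem(Float64(pid))

    # Hypothetical algorithm: queries and answers are both Float64
    mutable struct ScalarGD <: AbstractAlgorithm{Float64,Float64}
        stepsize::Float64
        q::Float64
    end
    # Initialization step: first query iterate
    (alg::ScalarGD)(problem::Any) = alg.q
    # Answer step (workers): a scaled gradient of (q - target)^2
    (alg::ScalarGD)(q::Float64, problem::Any) = alg.stepsize * 2 * (q - problem.target)
    # Asynchronous query step (central node)
    (alg::ScalarGD)(a::Float64, worker::Int64, problem::Any) = (alg.q -= a)
    # Synchronous query step (central node)
    (alg::ScalarGD)(as::Vector{Float64}, workers::Vector{Int64}, problem::Any) = (alg.q -= sum(as))
end

# Wrapped in a function so that loading this file does not start a run
function run_scalar_gd()
    stopat = (iteration=1000, time=10.0)   # stop at whichever is reached first
    saveat = (iteration=100,)
    start(ScalarGD(0.1, 0.0), problem_constructor, stopat;
          saveat=saveat, save_answers=true)
end
```

Calling run_scalar_gd() should return the NamedTuple described under Returns; because save_answers=true is passed, it is expected to also carry answers and answers_origin.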
AbstractAlgorithm
The algorithm you pass to start should subtype AbstractAlgorithm{Q,A}.
AsynchronousIterativeAlgorithms.AbstractAlgorithm — Type
AbstractAlgorithm{Q,A}
To be compatible with start, types subtyping AbstractAlgorithm should be callable with the following signatures:
- (algorithm::AbstractAlgorithm{Q,A})(problem::Any)::Q where {Q,A}: the initialization step that creates the first query iterate
- (algorithm::AbstractAlgorithm{Q,A})(q::Q, problem::Any)::A where {Q,A}: the answer step performed by the workers when they receive a query q::Q from the central node
- (algorithm::AbstractAlgorithm{Q,A})(a::A, worker::Int64, problem::Any)::Q where {Q,A}: the query step performed by the central node when receiving an answer a::A from a worker
- when start takes the keyword synchronous=true, (algorithm::AbstractAlgorithm{Q,A})(as::Vector{A}, workers::Vector{Int64}, problem::Any)::Q where {Q,A}: the query step performed by the central node when receiving the answers as::Vector{A} respectively from the workers
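The four signatures above can be exercised by hand, without start, to check that an algorithm behaves as intended. The following is a sketch under assumptions: SimpleGD and the problem layout (a NamedTuple with a field c) are hypothetical, and the calls at the bottom only imitate the order in which the central node and workers would invoke the steps.

```julia
using AsynchronousIterativeAlgorithms

# Hypothetical algorithm: gradient descent on f(x) = ‖x - c‖² / 2
mutable struct SimpleGD <: AbstractAlgorithm{Vector{Float64},Vector{Float64}}
    stepsize::Float64
    current::Vector{Float64}   # last query iterate
    SimpleGD(stepsize) = new(stepsize, Float64[])
end

# Initialization step: create the first query iterate
(gd::SimpleGD)(problem::Any) = (gd.current = zeros(length(problem.c)))

# Answer step, performed by a worker: a scaled gradient at q
(gd::SimpleGD)(q::Vector{Float64}, problem::Any) = gd.stepsize .* (q .- problem.c)

# Query step, asynchronous case: one answer from one worker
(gd::SimpleGD)(a::Vector{Float64}, worker::Int64, problem::Any) = (gd.current -= a)

# Query step, synchronous case: one answer per worker, averaged here
(gd::SimpleGD)(as::Vector{Vector{Float64}}, workers::Vector{Int64}, problem::Any) =
    (gd.current -= sum(as) ./ length(as))

# Local dry run imitating the call order (worker ids 2 and 3 are made up)
problem = (c = [1.0, 2.0],)
gd = SimpleGD(0.5)
q0 = gd(problem)                 # initialization
a0 = gd(q0, problem)             # a worker answers the first query
q1 = gd(a0, 2, problem)          # central node, asynchronous step
q2 = gd([gd(q1, problem), gd(q1, problem)], [2, 3], problem)  # synchronous step
```

In a distributed run, the type and its methods would need to be defined on every process, for instance inside an @everywhere block from Distributed.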
Customization of start's execution
AsynchronousIterativeAlgorithms.stopnow — Function
stopnow(::AbstractAlgorithm, stopat::NamedTuple) = false
Define this method on your algorithm<:AbstractAlgorithm to add a stopping criterion: return true if your stopping condition has been reached.
AsynchronousIterativeAlgorithms.savenow — Function
savenow(::AbstractAlgorithm, saveat::NamedTuple) = false
Define this method on your algorithm<:AbstractAlgorithm to add saving stops: return true if your saving condition has been reached.
AsynchronousIterativeAlgorithms.savevalues — Function
savevalues(::AbstractAlgorithm) = nothing
Define this method on your algorithm<:AbstractAlgorithm. It will be called at each iteration where savenow returns true: store some values on your algorithm object (don't forget to define report to retrieve what you stored).
AsynchronousIterativeAlgorithms.report — Function
report(::AbstractAlgorithm) = NamedTuple()
Define this method on your algorithm<:AbstractAlgorithm to add custom values to the results returned by start: return a NamedTuple with those values, making sure not to reuse the field names queries, answers, iterations, epochs, timestamps, answers_origin, answer_count.
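The three saving hooks above (savenow, savevalues, report) are meant to be used together. The sketch below assumes a hypothetical algorithm TrackedGD and a hypothetical custom key gap in saveat; the last lines call the hooks by hand, in the order the descriptions above suggest start would, to show what ends up in the report.

```julia
using LinearAlgebra
using AsynchronousIterativeAlgorithms
const AIA = AsynchronousIterativeAlgorithms

# Hypothetical algorithm that remembers the size of its last update
mutable struct TrackedGD <: AbstractAlgorithm{Vector{Float64},Vector{Float64}}
    stepsize::Float64
    current::Vector{Float64}
    gap::Float64              # norm of the last update
    gaps::Vector{Float64}     # values stored by savevalues
    TrackedGD(stepsize) = new(stepsize, Float64[], Inf, Float64[])
end

(gd::TrackedGD)(problem::Any) = (gd.current = zeros(length(problem.c)))
(gd::TrackedGD)(q::Vector{Float64}, problem::Any) = gd.stepsize .* (q .- problem.c)
function (gd::TrackedGD)(a::Vector{Float64}, worker::Int64, problem::Any)
    gd.gap = norm(a)
    gd.current -= a
end

# Custom saving condition: save when the gap is below saveat.gap (hypothetical key)
AIA.savenow(gd::TrackedGD, saveat::NamedTuple) =
    haskey(saveat, :gap) && gd.gap <= saveat.gap

# Store the current gap on the algorithm object each time a save is triggered
AIA.savevalues(gd::TrackedGD) = (push!(gd.gaps, gd.gap); nothing)

# Expose the stored values under a field name not used by start
AIA.report(gd::TrackedGD) = (gaps = gd.gaps,)

# Manual walk-through of one step followed by the saving hooks
problem = (c = [3.0, 4.0],)
gd = TrackedGD(1.0)
q = gd(problem)
gd(gd(q, problem), 2, problem)               # the update has norm 5.0
AIA.savenow(gd, (gap = 10.0,)) && AIA.savevalues(gd)
```

With these definitions, passing saveat=(gap=10.0,) to start would be expected to add a gaps field to the returned NamedTuple.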
AsynchronousIterativeAlgorithms.progress — Function
progress(::AbstractAlgorithm, stopat::NamedTuple) = 0.
Define this method on your algorithm<:AbstractAlgorithm to change the display of the progress bar: return how close the current step is to reaching your stopping requirement on a scale of 0 to 1.
AsynchronousIterativeAlgorithms.showvalues — Function
showvalues(::AbstractAlgorithm) = Tuple{Symbol, Any}[]
Define this method on your algorithm<:AbstractAlgorithm to add values to be displayed below the progress bar when verbose>1: return a vector of Tuple{Symbol, Any} with those values.
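A custom stopping criterion usually comes with a matching progress measure and a displayed value. The sketch below assumes a hypothetical scalar algorithm GapGD and a hypothetical stopat key gap; it defines stopnow, progress and showvalues and then evaluates them by hand after a single update.

```julia
using AsynchronousIterativeAlgorithms
const AIA = AsynchronousIterativeAlgorithms

# Hypothetical scalar algorithm tracking the size of its last update
mutable struct GapGD <: AbstractAlgorithm{Float64,Float64}
    stepsize::Float64
    q::Float64
    gap::Float64
end

(gd::GapGD)(problem::Any) = gd.q
(gd::GapGD)(q::Float64, problem::Any) = gd.stepsize * (q - problem.c)
function (gd::GapGD)(a::Float64, worker::Int64, problem::Any)
    gd.gap = abs(a)
    gd.q -= a
end

# Stop once the last update is no larger than stopat.gap (hypothetical key)
AIA.stopnow(gd::GapGD, stopat::NamedTuple) =
    haskey(stopat, :gap) && gd.gap <= stopat.gap

# Progress on a 0 to 1 scale: ratio of the target gap to the current gap
function AIA.progress(gd::GapGD, stopat::NamedTuple)
    haskey(stopat, :gap) || return 0.0
    gd.gap <= stopat.gap ? 1.0 : stopat.gap / gd.gap
end

# Values listed below the progress bar
AIA.showvalues(gd::GapGD) = Tuple{Symbol,Any}[(:gap, gd.gap)]

# Manual check after one update
problem = (c = 4.0,)
gd = GapGD(0.5, 0.0, Inf)
gd(gd(gd(problem), problem), 2, problem)   # the update is -2.0, so gap = 2.0
```

With these definitions, a call such as start(GapGD(0.5, 0.0, Inf), problem_constructor, (epoch=100, gap=1e-4)) would be expected to stop on whichever condition is met first, where problem_constructor is whatever constructor you already use.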
Algorithm wrappers
The two following algorithms already subtype AbstractAlgorithm{Q,A} and are ready to use in start.
AsynchronousIterativeAlgorithms.AggregationAlgorithm — Type
AggregationAlgorithm(arg; kwarg)::AbstractAlgorithm
Distributed algorithm that writes:
q_j <- query(aggregate([answer(q_i) for i in connected]))
where a "connected" worker is a worker that has answered at least once. (Not memory optimized: length(pids) answers are stored on the central worker at all times.)
Argument
- algorithm<:AbstractAlgorithm{Q,A}, which should define the following (where const AIA = AsynchronousIterativeAlgorithms)
  - AIA.initialize(algorithm, problem::Any)::Q: step that creates the first query iterate
  - AIA.aggregate(algorithm, as::Vector{A}, workers::Vector{Int64})::AggregatedA where A: step performed by the central node when receiving the answers as::Vector{A} from the workers
  - AIA.query(algorithm, agg::AggregatedA, problem::Any)::Q: step producing a query from the aggregated answer agg::AggregatedA, performed by the central node
  - AIA.answer(algorithm, q::Q, problem::Any)::A: step performed by the workers when they receive a query q::Q from the central node
Keyword
- pids=workers(): pids of the active workers
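As a sketch of what an inner algorithm for AggregationAlgorithm might look like, the example below defines the four methods listed above for a hypothetical type MeanOfSteps, whose aggregation is a plain mean. The problem layout (a NamedTuple with a field c) and the helper make_aggregated are assumptions made for illustration; the wrapper is built inside a function so that nothing is constructed when the definitions are loaded.

```julia
using Distributed, Statistics
using AsynchronousIterativeAlgorithms
const AIA = AsynchronousIterativeAlgorithms

# Hypothetical inner algorithm: each worker takes a gradient step on its own
# quadratic ‖q - c‖² / 2, and the central node averages the latest answers
struct MeanOfSteps <: AbstractAlgorithm{Vector{Float64},Vector{Float64}}
    q1::Vector{Float64}    # first query iterate
    stepsize::Float64
end

AIA.initialize(alg::MeanOfSteps, problem::Any) = alg.q1
AIA.aggregate(alg::MeanOfSteps, as::Vector{Vector{Float64}}, connected::Vector{Int64}) = mean(as)
AIA.query(alg::MeanOfSteps, agg::Vector{Float64}, problem::Any) = agg
AIA.answer(alg::MeanOfSteps, q::Vector{Float64}, problem::Any) =
    q .- alg.stepsize .* (q .- problem.c)

# Wrap the inner algorithm so it can be passed to start
make_aggregated(pids=workers()) =
    AggregationAlgorithm(MeanOfSteps(zeros(2), 0.5); pids=pids)

# Manual check of the four steps with two made-up worker problems
inner = MeanOfSteps(zeros(2), 0.5)
p2, p3 = (c = [2.0, 0.0],), (c = [0.0, 4.0],)
q = AIA.initialize(inner, p2)
a2, a3 = AIA.answer(inner, q, p2), AIA.answer(inner, q, p3)
agg = AIA.aggregate(inner, [a2, a3], [2, 3])
q_next = AIA.query(inner, agg, p2)
```

In a distributed run, the definitions would need to be evaluated on every process (for instance with @everywhere), after which make_aggregated() could be passed to start in place of a hand-written functor algorithm.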
AsynchronousIterativeAlgorithms.AveragingAlgorithm — Type
AveragingAlgorithm(arg; kwarg)::AbstractAlgorithm
Distributed algorithm that writes:
q_j <- query(weighted_average([answer(q_i) for i in connected]))
where a "connected" worker is a worker that has answered at least once. (Memory optimized: only the equivalent of one answer is stored on the central worker at all times.)
Argument
- algorithm<:AbstractAlgorithm{Q,A}, which should define the following (where const AIA = AsynchronousIterativeAlgorithms)
  - AIA.initialize(algorithm, problem::Any)::Q: step that creates the first query iterate
  - AIA.query(algorithm, a::A, problem::Any)::Q: step producing a query from the averaged answer, performed by the central node
  - AIA.answer(algorithm, q::Q, problem::Any)::A: step performed by the workers when they receive a query q::Q from the central node
Keyword
- pids=workers(): pids of the active workers
- weights=ones(length(pids)): weights of each pid in the weighted average
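For AveragingAlgorithm only three methods are needed, since the wrapper performs the weighted average itself. The sketch below uses a hypothetical inner type AveragedStep and a hypothetical helper make_averaged that forwards the two documented keywords; the averaging is not reproduced here, and the manual check only covers the user-defined steps.

```julia
using Distributed
using AsynchronousIterativeAlgorithms
const AIA = AsynchronousIterativeAlgorithms

# Hypothetical inner algorithm: workers answer with a gradient step on
# ‖q - c‖² / 2, and the averaged answer is used directly as the next query
struct AveragedStep <: AbstractAlgorithm{Vector{Float64},Vector{Float64}}
    q1::Vector{Float64}    # first query iterate
    stepsize::Float64
end

AIA.initialize(alg::AveragedStep, problem::Any) = alg.q1
AIA.query(alg::AveragedStep, a::Vector{Float64}, problem::Any) = a
AIA.answer(alg::AveragedStep, q::Vector{Float64}, problem::Any) =
    q .- alg.stepsize .* (q .- problem.c)

# Wrap the inner algorithm; by default every pid gets the same weight
make_averaged(pids=workers(); weights=ones(length(pids))) =
    AveragingAlgorithm(AveragedStep(zeros(2), 0.5); pids=pids, weights=weights)

# Manual check of the user-defined steps on a made-up problem
inner = AveragedStep(zeros(2), 0.5)
problem = (c = [2.0, 4.0],)
q = AIA.initialize(inner, problem)
a = AIA.answer(inner, q, problem)
q_next = AIA.query(inner, a, problem)
```

Unequal weights could be supplied as, say, make_averaged(weights=[2.0, 1.0]) for two workers, provided the vector has one entry per pid.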