diff --git a/power_grid_model_c/power_grid_model/include/power_grid_model/job_adapter.hpp b/power_grid_model_c/power_grid_model/include/power_grid_model/job_adapter.hpp new file mode 100644 index 0000000000..b1d1bb1b6f --- /dev/null +++ b/power_grid_model_c/power_grid_model/include/power_grid_model/job_adapter.hpp @@ -0,0 +1,149 @@ +// SPDX-FileCopyrightText: Contributors to the Power Grid Model project +// +// SPDX-License-Identifier: MPL-2.0 + +#pragma once + +// Adapter that connects the JobDispatch to the MainModelImpl + +#include "auxiliary/dataset.hpp" +#include "job_interface.hpp" +#include "main_model_fwd.hpp" + +#include "main_core/calculation_info.hpp" +#include "main_core/update.hpp" + +namespace power_grid_model { + +template +class JobDispatchAdapter : public JobDispatchInterface> { + public: + JobDispatchAdapter(std::reference_wrapper model) : model_{std::move(model)} {} + JobDispatchAdapter(JobDispatchAdapter const& other) + : model_copy_{std::make_unique(other.model_.get())}, + model_{std::ref(*model_copy_)}, + components_to_update_{other.components_to_update_}, + update_independence_{other.update_independence_}, + independence_flags_{other.independence_flags_}, + all_scenarios_sequence_{other.all_scenarios_sequence_} {} + JobDispatchAdapter& operator=(JobDispatchAdapter const& other) { + if (this != &other) { + model_copy_ = std::make_unique(other.model_.get()); + model_ = std::ref(*model_copy_); + components_to_update_ = other.components_to_update_; + update_independence_ = other.update_independence_; + independence_flags_ = other.independence_flags_; + all_scenarios_sequence_ = other.all_scenarios_sequence_; + } + return *this; + } + JobDispatchAdapter(JobDispatchAdapter&& other) noexcept + : model_copy_{std::move(other.model_copy_)}, + model_{model_copy_ ? std::ref(*model_copy_) : std::move(other.model_)}, + components_to_update_{std::move(other.components_to_update_)}, + update_independence_{std::move(other.update_independence_)}, + independence_flags_{std::move(other.independence_flags_)}, + all_scenarios_sequence_{std::move(other.all_scenarios_sequence_)} {} + JobDispatchAdapter& operator=(JobDispatchAdapter&& other) noexcept { + if (this != &other) { + model_copy_ = std::move(other.model_copy_); + model_ = model_copy_ ? std::ref(*model_copy_) : std::move(other.model_); + components_to_update_ = std::move(other.components_to_update_); + update_independence_ = std::move(other.update_independence_); + independence_flags_ = std::move(other.independence_flags_); + all_scenarios_sequence_ = std::move(other.all_scenarios_sequence_); + } + return *this; + } + ~JobDispatchAdapter() { model_copy_.reset(); } + + private: + // Grant the CRTP base (JobDispatchInterface) access to + // JobDispatchAdapter's private members. This allows the base class template + // to call derived-class implementation details as part of the CRTP pattern. + friend class JobDispatchInterface; + + static constexpr Idx ignore_output{-1}; + + std::unique_ptr model_copy_; + std::reference_wrapper model_; + + main_core::utils::ComponentFlags components_to_update_{}; + main_core::update::independence::UpdateIndependence update_independence_{}; + main_core::utils::ComponentFlags independence_flags_{}; + std::shared_ptr> all_scenarios_sequence_; + // current_scenario_sequence_cache_ is calculated per scenario, so it is excluded from the constructors. + main_core::utils::SequenceIdx current_scenario_sequence_cache_{}; + + std::mutex calculation_info_mutex_; + + // TODO(figueroa1395): Keep calculation_fn at the adapter level only + template + requires std::invocable, MainModel&, MutableDataset const&, Idx> + void calculate_impl(Calculate&& calculation_fn, MutableDataset const& result_data, Idx scenario_idx) const { + std::forward(calculation_fn)(model_.get(), result_data, scenario_idx); + } + + template + requires std::invocable, MainModel&, MutableDataset const&, Idx> + void cache_calculate_impl(Calculate&& calculation_fn) const { + // calculate once to cache topology, ignore results, all math solvers are initialized + try { + std::forward(calculation_fn)(model_.get(), + { + false, + 1, + "sym_output", + model_.get().meta_data(), + }, + ignore_output); + } catch (SparseMatrixError const&) { // NOLINT(bugprone-empty-catch) // NOSONAR + // missing entries are provided in the update data + } catch (NotObservableError const&) { // NOLINT(bugprone-empty-catch) // NOSONAR + // missing entries are provided in the update data + } + } + + void prepare_job_dispatch_impl(ConstDataset const& update_data) { + // cache component update order where possible. + // the order for a cacheable (independent) component by definition is the same across all scenarios + components_to_update_ = model_.get().get_components_to_update(update_data); + update_independence_ = main_core::update::independence::check_update_independence( + model_.get().state(), update_data); + std::ranges::transform(update_independence_, independence_flags_.begin(), + [](auto const& comp) { return comp.is_independent(); }); + all_scenarios_sequence_ = std::make_shared>( + main_core::update::get_all_sequence_idx_map( + model_.get().state(), update_data, 0, components_to_update_, update_independence_, false)); + } + + void setup_impl(ConstDataset const& update_data, Idx scenario_idx) { + current_scenario_sequence_cache_ = main_core::update::get_all_sequence_idx_map( + model_.get().state(), update_data, scenario_idx, components_to_update_, update_independence_, true); + auto const current_scenario_sequence = get_current_scenario_sequence_view_(); + model_.get().template update_components(update_data, scenario_idx, current_scenario_sequence); + } + + void winddown_impl() { + model_.get().restore_components(get_current_scenario_sequence_view_()); + std::ranges::for_each(current_scenario_sequence_cache_, [](auto& comp_seq_idx) { comp_seq_idx.clear(); }); + } + + CalculationInfo get_calculation_info_impl() const { return model_.get().calculation_info(); } + + void thread_safe_add_calculation_info_impl(CalculationInfo const& info) { + std::lock_guard const lock{calculation_info_mutex_}; + model_.get().merge_calculation_info(info); + } + + auto get_current_scenario_sequence_view_() const { + return main_core::utils::run_functor_with_all_types_return_array([this]() { + constexpr auto comp_idx = main_core::utils::index_of_component; + if (std::get(independence_flags_)) { + return std::span{std::get(*all_scenarios_sequence_)}; + } + return std::span{std::get(current_scenario_sequence_cache_)}; + }); + } +}; +} // namespace power_grid_model diff --git a/power_grid_model_c/power_grid_model/include/power_grid_model/job_dispatch.hpp b/power_grid_model_c/power_grid_model/include/power_grid_model/job_dispatch.hpp index 05fd464f36..6cd8750789 100644 --- a/power_grid_model_c/power_grid_model/include/power_grid_model/job_dispatch.hpp +++ b/power_grid_model_c/power_grid_model/include/power_grid_model/job_dispatch.hpp @@ -4,33 +4,29 @@ #pragma once -#include "main_model_fwd.hpp" +#include "job_interface.hpp" #include "main_core/calculation_info.hpp" #include "main_core/update.hpp" -#include #include namespace power_grid_model { -template class JobDispatch { - private: - using SequenceIdxView = std::array, main_core::utils::n_types>; - +class JobDispatch { public: static constexpr Idx ignore_output{-1}; static constexpr Idx sequential{-1}; - template - requires std::invocable, MainModel&, MutableDataset const&, Idx> - static BatchParameter batch_calculation_(MainModel& model, CalculationInfo& calculation_info, - Calculate&& calculation_fn, MutableDataset const& result_data, - ConstDataset const& update_data, Idx threading = sequential) { - // if the update dataset is empty without any component - // execute one power flow in the current instance, no batch calculation is needed + // TODO(figueroa1395): remove calculation_fn dependency + // TODO(figueroa1395): add concept to Adapter template parameter + // TODO(figueroa1395): add generic template parameters for update_data and result_data + template + static BatchParameter batch_calculation(Adapter& adapter, Calculate&& calculation_fn, + MutableDataset const& result_data, ConstDataset const& update_data, + Idx threading = sequential) { if (update_data.empty()) { - std::forward(calculation_fn)(model, result_data, 0); + adapter.calculate(std::forward(calculation_fn), result_data); return BatchParameter{}; } @@ -43,89 +39,62 @@ template class JobDispatch { return BatchParameter{}; } - // calculate once to cache topology, ignore results, all math solvers are initialized - try { - calculation_fn(model, - { - false, - 1, - "sym_output", - model.meta_data(), - }, - ignore_output); - } catch (SparseMatrixError const&) { // NOLINT(bugprone-empty-catch) // NOSONAR - // missing entries are provided in the update data - } catch (NotObservableError const&) { // NOLINT(bugprone-empty-catch) // NOSONAR - // missing entries are provided in the update data - } + // calculate once to cache, ignore results + adapter.cache_calculate(std::forward(calculation_fn)); // error messages std::vector exceptions(n_scenarios, ""); - // thread-safe handling of calculation info - std::mutex calculation_info_mutex; - auto const thread_safe_add_calculation_info = [&calculation_info, - &calculation_info_mutex](CalculationInfo const& info) { - std::lock_guard const lock{calculation_info_mutex}; - main_core::merge_into(calculation_info, info); - }; - - // lambda for sub batch calculation - main_core::utils::SequenceIdx all_scenarios_sequence; - auto sub_batch = - sub_batch_calculation_(model, std::forward(calculation_fn), result_data, update_data, - all_scenarios_sequence, exceptions, thread_safe_add_calculation_info); + adapter.prepare_job_dispatch(update_data); + auto single_job = + single_thread_job(adapter, std::forward(calculation_fn), result_data, update_data, exceptions); - job_dispatch(sub_batch, n_scenarios, threading); + job_dispatch(single_job, n_scenarios, threading); handle_batch_exceptions(exceptions); return BatchParameter{}; } - template - requires std::invocable, MainModel&, MutableDataset const&, Idx> - static auto sub_batch_calculation_(MainModel const& base_model, Calculate&& calculation_fn, - MutableDataset const& result_data, ConstDataset const& update_data, - main_core::utils::SequenceIdx& all_scenarios_sequence, - std::vector& exceptions, - AddCalculationInfo&& thread_safe_add_calculation_info) { - // cache component update order where possible. - // the order for a cacheable (independent) component by definition is the same across all scenarios - auto const components_to_update = base_model.get_components_to_update(update_data); - auto const update_independence = main_core::update::independence::check_update_independence( - base_model.state(), update_data); - all_scenarios_sequence = main_core::update::get_all_sequence_idx_map( - base_model.state(), update_data, 0, components_to_update, update_independence, false); - - return [&base_model, &exceptions, &thread_safe_add_calculation_info, - calculation_fn_ = std::forward(calculation_fn), &result_data, &update_data, - &all_scenarios_sequence_ = std::as_const(all_scenarios_sequence), components_to_update, - update_independence](Idx start, Idx stride, Idx n_scenarios) { + private: + template + static auto single_thread_job(Adapter& base_adapter, Calculate&& calculation_fn, MutableDataset const& result_data, + ConstDataset const& update_data, std::vector& exceptions) { + return [&base_adapter, &exceptions, calculation_fn_ = std::forward(calculation_fn), &result_data, + &update_data](Idx start, Idx stride, Idx n_scenarios) { assert(n_scenarios <= narrow_cast(exceptions.size())); CalculationInfo thread_info; - Timer t_total(thread_info, 0000, "Total in thread"); + Timer t_total(thread_info, 0200, "Total batch calculation in thread"); - auto const copy_model_functor = [&base_model, &thread_info] { - Timer const t_copy_model_functor(thread_info, 1100, "Copy model"); - return MainModel{base_model}; + auto const copy_adapter_functor = [&base_adapter, &thread_info]() { + Timer const t_copy_adapter_functor(thread_info, 1100, "Copy model"); + return Adapter{base_adapter}; }; - auto model = copy_model_functor(); - auto current_scenario_sequence_cache = main_core::utils::SequenceIdx{}; - auto [setup, winddown] = - scenario_update_restore(model, update_data, components_to_update, update_independence, - all_scenarios_sequence_, current_scenario_sequence_cache, thread_info); + auto adapter = copy_adapter_functor(); + + auto setup = [&adapter, &update_data, &thread_info](Idx scenario_idx) { + Timer const t_update_model(thread_info, 1200, "Update model"); + adapter.setup(update_data, scenario_idx); + }; + + auto winddown = [&adapter, &thread_info]() { + Timer const t_restore_model(thread_info, 1201, "Restore model"); + adapter.winddown(); + }; + + auto recover_from_bad = [&adapter, ©_adapter_functor]() { adapter = copy_adapter_functor(); }; + + auto run = [&adapter, &calculation_fn_, &result_data, &thread_info](Idx scenario_idx) { + adapter.calculate(calculation_fn_, result_data, scenario_idx); + main_core::merge_into(thread_info, adapter.get_calculation_info()); + }; auto calculate_scenario = JobDispatch::call_with( - [&model, &calculation_fn_, &result_data, &thread_info](Idx scenario_idx) { - calculation_fn_(model, result_data, scenario_idx); - main_core::merge_into(thread_info, model.calculation_info()); - }, - std::move(setup), std::move(winddown), scenario_exception_handler(model, exceptions, thread_info), - [&model, ©_model_functor](Idx /*scenario_idx*/) { model = copy_model_functor(); }); + std::move(run), std::move(setup), std::move(winddown), + scenario_exception_handler(adapter, exceptions, thread_info), std::move(recover_from_bad)); for (Idx scenario_idx = start; scenario_idx < n_scenarios; scenario_idx += stride) { Timer const t_total_single(thread_info, 0100, "Total single calculation in thread"); @@ -133,25 +102,25 @@ template class JobDispatch { } t_total.stop(); - thread_safe_add_calculation_info(thread_info); + base_adapter.thread_safe_add_calculation_info(thread_info); }; } - template - requires std::invocable, Idx /*start*/, Idx /*stride*/, Idx /*n_scenarios*/> - static void job_dispatch(RunSubBatchFn sub_batch, Idx n_scenarios, Idx threading) { + template + requires std::invocable, Idx /*start*/, Idx /*stride*/, Idx /*n_scenarios*/> + static void job_dispatch(RunSingleJobFn single_thread_job, Idx n_scenarios, Idx threading) { // run batches sequential or parallel auto const n_thread = n_threads(n_scenarios, threading); if (n_thread == 1) { // run all in sequential - sub_batch(0, 1, n_scenarios); + single_thread_job(0, 1, n_scenarios); } else { // create parallel threads std::vector threads; threads.reserve(n_thread); for (Idx thread_number = 0; thread_number < n_thread; ++thread_number) { - // compute each sub batch with stride - threads.emplace_back(sub_batch, thread_number, n_thread, n_scenarios); + // compute each single thread job with stride + threads.emplace_back(single_thread_job, thread_number, n_thread, n_scenarios); } for (auto& thread : threads) { thread.join(); @@ -175,9 +144,9 @@ template class JobDispatch { typename RecoverFromBadFn> requires std::invocable, Args const&...> && std::invocable, Args const&...> && - std::invocable, Args const&...> && + std::invocable> && std::invocable, Args const&...> && - std::invocable, Args const&...> + std::invocable> static auto call_with(RunFn run, SetupFn setup, WinddownFn winddown, HandleExceptionFn handle_exception, RecoverFromBadFn recover_from_bad) { return [setup_ = std::move(setup), run_ = std::move(run), winddown_ = std::move(winddown), @@ -186,62 +155,23 @@ template class JobDispatch { try { setup_(args...); run_(args...); - winddown_(args...); + winddown_(); } catch (...) { handle_exception_(args...); try { - winddown_(args...); + winddown_(); } catch (...) { - recover_from_bad_(args...); + recover_from_bad_(); } } }; } - static auto scenario_update_restore( - MainModel& model, ConstDataset const& update_data, - main_core::utils::ComponentFlags const& components_to_store, - main_core::update::independence::UpdateIndependence const& do_update_cache, - main_core::utils::SequenceIdx const& all_scenario_sequence, - main_core::utils::SequenceIdx& current_scenario_sequence_cache, - CalculationInfo& info) noexcept { - main_core::utils::ComponentFlags independence_flags{}; - std::ranges::transform(do_update_cache, independence_flags.begin(), - [](auto const& comp) { return comp.is_independent(); }); - auto const scenario_sequence = [&all_scenario_sequence, ¤t_scenario_sequence_cache, - independence_flags_ = std::move(independence_flags)]() -> SequenceIdxView { - return main_core::utils::run_functor_with_all_types_return_array( - [&all_scenario_sequence, ¤t_scenario_sequence_cache, &independence_flags_]() { - constexpr auto comp_idx = main_core::utils::index_of_component; - if (std::get(independence_flags_)) { - return std::span{std::get(all_scenario_sequence)}; - } - return std::span{std::get(current_scenario_sequence_cache)}; - }); - }; - - return std::make_pair( - [&model, &update_data, scenario_sequence, ¤t_scenario_sequence_cache, &components_to_store, - do_update_cache_ = std::move(do_update_cache), &info](Idx scenario_idx) { - Timer const t_update_model(info, 1200, "Update model"); - current_scenario_sequence_cache = main_core::update::get_all_sequence_idx_map( - model.state(), update_data, scenario_idx, components_to_store, do_update_cache_, true); - - model.template update_components(update_data, scenario_idx, scenario_sequence()); - }, - [&model, scenario_sequence, ¤t_scenario_sequence_cache, &info](Idx /*scenario_idx*/) { - Timer const t_update_model(info, 1201, "Restore model"); - - model.restore_components(scenario_sequence()); - std::ranges::for_each(current_scenario_sequence_cache, - [](auto& comp_seq_idx) { comp_seq_idx.clear(); }); - }); - } - // Lippincott pattern - static auto scenario_exception_handler(MainModel& model, std::vector& messages, + template + static auto scenario_exception_handler(Adapter& adapter, std::vector& messages, CalculationInfo& info) { - return [&model, &messages, &info](Idx scenario_idx) { + return [&adapter, &messages, &info](Idx scenario_idx) { std::exception_ptr const ex_ptr = std::current_exception(); try { std::rethrow_exception(ex_ptr); @@ -250,7 +180,7 @@ template class JobDispatch { } catch (...) { messages[scenario_idx] = "unknown exception"; } - info.merge(model.calculation_info()); + info.merge(adapter.get_calculation_info()); }; } diff --git a/power_grid_model_c/power_grid_model/include/power_grid_model/job_interface.hpp b/power_grid_model_c/power_grid_model/include/power_grid_model/job_interface.hpp new file mode 100644 index 0000000000..a8bf5aab69 --- /dev/null +++ b/power_grid_model_c/power_grid_model/include/power_grid_model/job_interface.hpp @@ -0,0 +1,64 @@ +// SPDX-FileCopyrightText: Contributors to the Power Grid Model project +// +// SPDX-License-Identifier: MPL-2.0 + +#pragma once + +// batch dispatch interface class + +#include "common/calculation_info.hpp" +#include "common/common.hpp" + +#include +#include +#include +#include + +namespace power_grid_model { +template class JobDispatchInterface { + public: + template + requires requires(Adapter& adapter, Calculate&& calculation_fn, ResultDataset const& result_data, Idx pos) { + { adapter.calculate_impl(std::forward(calculation_fn), result_data, pos) } -> std::same_as; + } + void calculate(Calculate&& calculation_fn, ResultDataset const& result_data, Idx pos = 0) { + return static_cast(this)->calculate_impl(std::forward(calculation_fn), result_data, pos); + } + + template + requires requires(Adapter& adapter, Calculate&& calculation_fn) { + { adapter.cache_calculate_impl(std::forward(calculation_fn)) } -> std::same_as; + } + void cache_calculate(Calculate&& calculation_fn) { + return static_cast(this)->cache_calculate_impl(std::forward(calculation_fn)); + } + + template void prepare_job_dispatch(UpdateDataset const& update_data) { + return static_cast(this)->prepare_job_dispatch_impl(update_data); + } + + template void setup(UpdateDataset const& update_data, Idx scenario_idx) { + return static_cast(this)->setup_impl(update_data, scenario_idx); + } + + void winddown() { return static_cast(this)->winddown_impl(); } + + CalculationInfo get_calculation_info() const { + return static_cast(this)->get_calculation_info_impl(); + } + + void thread_safe_add_calculation_info(CalculationInfo const& info) { + static_cast(this)->thread_safe_add_calculation_info_impl(info); + } + + protected: + // Protected & defaulted special members — CRTP: only the derived can create/copy/move this base + JobDispatchInterface() = default; + JobDispatchInterface(const JobDispatchInterface& /*other*/) = default; + JobDispatchInterface& operator=(const JobDispatchInterface& /*other*/) = default; + JobDispatchInterface(JobDispatchInterface&& /*other*/) noexcept = default; + JobDispatchInterface& operator=(JobDispatchInterface&& /*other*/) noexcept = default; + ~JobDispatchInterface() = default; +}; + +} // namespace power_grid_model diff --git a/power_grid_model_c/power_grid_model/include/power_grid_model/main_model_impl.hpp b/power_grid_model_c/power_grid_model/include/power_grid_model/main_model_impl.hpp index 5211e6d4f9..17755fb5b0 100644 --- a/power_grid_model_c/power_grid_model/include/power_grid_model/main_model_impl.hpp +++ b/power_grid_model_c/power_grid_model/include/power_grid_model/main_model_impl.hpp @@ -10,6 +10,7 @@ #include "batch_parameter.hpp" #include "calculation_parameters.hpp" #include "container.hpp" +#include "job_adapter.hpp" #include "job_dispatch.hpp" #include "main_model_fwd.hpp" #include "topology.hpp" @@ -151,14 +152,10 @@ class MainModelImpl, ComponentLis using SequenceIdxView = std::array, main_core::utils::n_types>; using OwnedUpdateDataset = std::tuple...>; - using JobDispatcher = - JobDispatch, ComponentList>, - ComponentType...>; - - static constexpr Idx ignore_output{JobDispatcher::ignore_output}; + static constexpr Idx ignore_output{JobDispatch::ignore_output}; static constexpr Idx isolated_component{-1}; static constexpr Idx not_connected{-1}; - static constexpr Idx sequential{JobDispatcher::sequential}; + static constexpr Idx sequential{JobDispatch::sequential}; public: using Options = MainModelOptions; @@ -459,26 +456,6 @@ class MainModelImpl, ComponentLis }; } - /* - run the calculation function in batch on the provided update data. - - The calculation function should be able to run standalone. - It should output to the provided result_data if the trailing argument is not ignore_output. - - threading - < 0 sequential - = 0 parallel, use number of hardware threads - > 0 specify number of parallel threads - raise a BatchCalculationError if any of the calculations in the batch raised an exception - */ - template - requires std::invocable, MainModelImpl&, MutableDataset const&, Idx> - BatchParameter batch_calculation_(Calculate&& calculation_fn, MutableDataset const& result_data, - ConstDataset const& update_data, Idx threading = sequential) { - return JobDispatcher::batch_calculation_(*this, calculation_info_, std::forward(calculation_fn), - result_data, update_data, threading); - } - // Calculate with optimization, e.g., automatic tap changer template auto calculate(Options const& options) { auto const calculator = [this, &options] { @@ -534,10 +511,28 @@ class MainModelImpl, ComponentLis } public: - // Batch calculation, propagating the results to result_data + /* + Batch calculation, propagating the results to result_data + + Run the calculation function in batch on the provided update data. + + The calculation function should be able to run standalone. + It should output to the provided result_data if the trailing argument is not ignore_output. + + threading + < 0 sequential + = 0 parallel, use number of hardware threads + > 0 specify number of parallel threads + raise a BatchCalculationError if any of the calculations in the batch raised an exception + */ BatchParameter calculate(Options const& options, MutableDataset const& result_data, ConstDataset const& update_data) { - return batch_calculation_( + JobDispatchAdapter< + MainModelImpl, ComponentList>, + ComponentType...> + adapter{std::ref(*this)}; + return JobDispatch::batch_calculation( + adapter, [&options](MainModelImpl& model, MutableDataset const& target_data, Idx pos) { auto sub_opt = options; // copy sub_opt.err_tol = pos != ignore_output ? options.err_tol : std::numeric_limits::max(); @@ -549,6 +544,10 @@ class MainModelImpl, ComponentLis } CalculationInfo calculation_info() const { return calculation_info_; } + void merge_calculation_info(CalculationInfo const& info) { + assert(construction_complete_); + main_core::merge_into(calculation_info_, info); + } auto const& state() const { assert(construction_complete_); return state_;