From f8353c612fb655348fdd69ff942a04560d8d55dc Mon Sep 17 00:00:00 2001 From: Karen Arutyunov Date: Tue, 5 Nov 2024 12:11:22 +0200 Subject: Retry database operations on recoverable failures in ci_start class functions --- mod/ci-common.cxx | 470 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 290 insertions(+), 180 deletions(-) (limited to 'mod/ci-common.cxx') diff --git a/mod/ci-common.cxx b/mod/ci-common.cxx index c7dfdb9..cc361d2 100644 --- a/mod/ci-common.cxx +++ b/mod/ci-common.cxx @@ -541,6 +541,7 @@ namespace brep const basic_mark&, const basic_mark* trace, odb::core::database& db, + size_t retry, tenant_service&& service, duration notify_interval, duration notify_delay, @@ -551,107 +552,135 @@ namespace brep assert (mode == duplicate_tenant_mode::fail || !service.id.empty ()); assert (!transaction::has_current ()); + build_tenant t; duplicate_tenant_result r (duplicate_tenant_result::created); - transaction tr (db.begin ()); - - // Unless we are in the 'fail on duplicate' mode, check if this service - // type/id pair is already in use and, if that's the case, either ignore - // it or reassign this service to a new tenant, canceling the old one. - // - if (mode != duplicate_tenant_mode::fail) + for (string request_id;;) { - using query = query; - - shared_ptr t ( - db.query_one (query::service.id == service.id && - query::service.type == service.type)); - if (t != nullptr) + try { - // Reduce the replace_archived mode to the replace or ignore mode. + transaction tr (db.begin ()); + + // Unless we are in the 'fail on duplicate' mode, check if this + // service type/id pair is already in use and, if that's the case, + // either ignore it or reassign this service to a new tenant, + // canceling the old one. // - if (mode == duplicate_tenant_mode::replace_archived) + if (mode != duplicate_tenant_mode::fail) { - mode = (t->archived - ? duplicate_tenant_mode::replace - : duplicate_tenant_mode::ignore); + using query = query; + + shared_ptr t ( + db.query_one (query::service.id == service.id && + query::service.type == service.type)); + if (t != nullptr) + { + // Reduce the replace_archived mode to the replace or ignore mode. + // + if (mode == duplicate_tenant_mode::replace_archived) + { + mode = (t->archived + ? duplicate_tenant_mode::replace + : duplicate_tenant_mode::ignore); + } + + // Bail out in the ignore mode and cancel the tenant in the + // replace mode. + // + if (mode == duplicate_tenant_mode::ignore) + return make_pair (move (t->id), duplicate_tenant_result::ignored); + + assert (mode == duplicate_tenant_mode::replace); + + if (t->unloaded_timestamp) + { + db.erase (t); + } + else + { + t->service = nullopt; + t->archived = true; + db.update (t); + } + + r = duplicate_tenant_result::replaced; + } } - // Bail out in the ignore mode and cancel the tenant in the replace - // mode. + // Generate the request id. // - if (mode == duplicate_tenant_mode::ignore) - return make_pair (move (t->id), duplicate_tenant_result::ignored); - - assert (mode == duplicate_tenant_mode::replace); - - if (t->unloaded_timestamp) + if (request_id.empty ()) + try { - db.erase (t); + request_id = uuid::generate ().string (); } - else + catch (const system_error& e) { - t->service = nullopt; - t->archived = true; - db.update (t); + error << "unable to generate request id: " << e; + return nullopt; } - r = duplicate_tenant_result::replaced; - } - } - - // Generate the request id. - // - string request_id; - - try - { - request_id = uuid::generate ().string (); - } - catch (const system_error& e) - { - error << "unable to generate request id: " << e; - return nullopt; - } - - // Use the generated request id if the tenant service id is not specified. - // - if (service.id.empty ()) - service.id = request_id; - - build_tenant t (move (request_id), - move (service), - system_clock::now () - notify_interval + notify_delay, - notify_interval); - // Note that in contrast to brep-load, we know that the tenant id is - // unique and thus we don't try to remove a tenant with such an id. - // There is also not much reason to assume that we may have switched - // from the single-tenant mode here and remove the respective tenant, - // unless we are in the tenant-service functionality development mode. - // + // Use the generated request id if the tenant service id is not + // specified. + // + if (service.id.empty ()) + service.id = request_id; + + t = build_tenant (move (request_id), + move (service), + system_clock::now () - notify_interval + notify_delay, + notify_interval); + + // Note that in contrast to brep-load, we know that the tenant id is + // unique and thus we don't try to remove a tenant with such an id. + // There is also not much reason to assume that we may have switched + // from the single-tenant mode here and remove the respective tenant, + // unless we are in the tenant-service functionality development mode. + // #ifdef BREP_CI_TENANT_SERVICE_UNLOADED - cstrings ts ({""}); + cstrings ts ({""}); - db.erase_query ( - query::id.tenant.in_range (ts.begin (), ts.end ())); + db.erase_query ( + query::id.tenant.in_range (ts.begin (), ts.end ())); - db.erase_query ( - query::id.tenant.in_range (ts.begin (), ts.end ())); + db.erase_query ( + query::id.tenant.in_range (ts.begin (), ts.end ())); - db.erase_query ( - query::id.tenant.in_range (ts.begin (), ts.end ())); + db.erase_query ( + query::id.tenant.in_range (ts.begin (), ts.end ())); - db.erase_query ( - query::id.in_range (ts.begin (), ts.end ())); + db.erase_query ( + query::id.in_range (ts.begin (), ts.end ())); #endif - db.persist (t); + db.persist (t); - tr.commit (); + tr.commit (); - if (trace != nullptr) - *trace << "unloaded CI request " << t.id << " for service " - << t.service->id << ' ' << t.service->type << " is created"; + if (trace != nullptr) + *trace << "unloaded CI request " << t.id << " for service " + << t.service->id << ' ' << t.service->type << " is created"; + + // Bail out if we have successfully erased, updated, or persisted the + // tenant object. + // + break; + } + catch (const odb::recoverable& e) + { + // If no more retries left, don't re-throw odb::recoverable not to + // retry at the upper level. + // + if (retry-- == 0) + throw runtime_error (e.what ()); + + // Prepare for the next iteration. + // + request_id = move (t.id); + service = move (*t.service); + r = duplicate_tenant_result::created; + } + } return make_pair (move (t.id), r); } @@ -661,51 +690,69 @@ namespace brep const basic_mark& warn, const basic_mark* trace, odb::core::database& db, + size_t retry, tenant_service&& service, const repository_location& repository) const { using namespace odb::core; string request_id; + + for (;;) { - assert (!transaction::has_current ()); + try + { + assert (!transaction::has_current ()); - transaction tr (db.begin ()); + transaction tr (db.begin ()); - using query = query; + using query = query; - shared_ptr t ( - db.query_one (query::service.id == service.id && - query::service.type == service.type)); + shared_ptr t ( + db.query_one (query::service.id == service.id && + query::service.type == service.type)); - if (t == nullptr) - { - error << "unable to find tenant for service " << service.id << ' ' - << service.type; + if (t == nullptr) + { + error << "unable to find tenant for service " << service.id << ' ' + << service.type; - return nullopt; - } - else if (t->archived) - { - error << "tenant " << t->id << " for service " << service.id << ' ' - << service.type << " is already archived"; + return nullopt; + } + else if (t->archived) + { + error << "tenant " << t->id << " for service " << service.id << ' ' + << service.type << " is already archived"; - return nullopt; - } - else if (!t->unloaded_timestamp) - { - error << "tenant " << t->id << " for service " << service.id << ' ' - << service.type << " is already loaded"; + return nullopt; + } + else if (!t->unloaded_timestamp) + { + error << "tenant " << t->id << " for service " << service.id << ' ' + << service.type << " is already loaded"; - return nullopt; - } + return nullopt; + } - t->unloaded_timestamp = nullopt; - db.update (t); + t->unloaded_timestamp = nullopt; + db.update (t); - tr.commit (); + tr.commit (); - request_id = move (t->id); + request_id = move (t->id); + + // Bail out if we have successfully updated the tenant object. + // + break; + } + catch (const odb::recoverable& e) + { + // If no more retries left, don't re-throw odb::recoverable not to + // retry at the upper level. + // + if (retry-- == 0) + throw runtime_error (e.what ()); + } } assert (options_ != nullptr); // Shouldn't be called otherwise. @@ -739,6 +786,7 @@ namespace brep const basic_mark&, const basic_mark* trace, odb::core::database& db, + size_t retry, const string& type, const string& id) const { @@ -746,34 +794,57 @@ namespace brep assert (!transaction::has_current ()); - transaction tr (db.begin ()); + optional r; - using query = query; + for (;;) + { + try + { + transaction tr (db.begin ()); - shared_ptr t ( - db.query_one (query::service.id == id && - query::service.type == type)); - if (t == nullptr) - return nullopt; + using query = query; - optional r (move (t->service)); + shared_ptr t ( + db.query_one (query::service.id == id && + query::service.type == type)); + if (t == nullptr) + return nullopt; - if (t->unloaded_timestamp) - { - db.erase (t); - } - else - { - t->service = nullopt; - t->archived = true; - db.update (t); - } + r = move (t->service); - tr.commit (); + if (t->unloaded_timestamp) + { + db.erase (t); + } + else + { + t->service = nullopt; + t->archived = true; + db.update (t); + } - if (trace != nullptr) - *trace << "CI request " << t->id << " for service " << id << ' ' << type - << " is canceled"; + tr.commit (); + + if (trace != nullptr) + *trace << "CI request " << t->id << " for service " << id << ' ' + << type << " is canceled"; + + // Bail out if we have successfully updated or erased the tenant + // object. + // + break; + } + catch (const odb::recoverable& e) + { + // If no more retries left, don't re-throw odb::recoverable not to + // retry at the upper level. + // + if (retry-- == 0) + throw runtime_error (e.what ()); + + r = nullopt; // Prepare for the next iteration. + } + } return r; } @@ -784,30 +855,50 @@ namespace brep const basic_mark* trace, const string& reason, odb::core::database& db, + size_t retry, const string& tid) const { using namespace odb::core; assert (!transaction::has_current ()); - transaction tr (db.begin ()); + for (;;) + { + try + { + transaction tr (db.begin ()); - shared_ptr t (db.find (tid)); + shared_ptr t (db.find (tid)); - if (t == nullptr) - return false; + if (t == nullptr) + return false; - if (t->unloaded_timestamp) - { - db.erase (t); - } - else if (!t->archived) - { - t->archived = true; - db.update (t); - } + if (t->unloaded_timestamp) + { + db.erase (t); + } + else if (!t->archived) + { + t->archived = true; + db.update (t); + } - tr.commit (); + tr.commit (); + + // Bail out if we have successfully updated or erased the tenant + // object. + // + break; + } + catch (const odb::recoverable& e) + { + // If no more retries left, don't re-throw odb::recoverable not to + // retry at the upper level. + // + if (retry-- == 0) + throw runtime_error (e.what ()); + } + } if (trace != nullptr) *trace << "CI request " << tid << " is canceled: " @@ -820,61 +911,80 @@ namespace brep optional ci_start:: rebuild (odb::core::database& db, + size_t retry, const build_id& id, function (const tenant_service&, build_state)> uf) const { using namespace odb::core; - //@@@ Should we not retry transactions (here and in other functions that - // update)? + build_state s; - // NOTE: don't forget to update build_force::handle() if changing anything - // here. - // - transaction t (db.begin ()); - - package_build pb; - if (!db.query_one (query::build::id == id, - pb) || - pb.archived) + for (;;) { - return nullopt; - } - - const shared_ptr& b (pb.build); - build_state s (b->state); - - if (s != build_state::queued) - { - force_state force (s == build_state::built - ? force_state::forced - : force_state::forcing); - - if (b->force != force) - { - b->force = force; - db.update (b); - } - - if (uf != nullptr) + try { - shared_ptr t (db.load (b->tenant)); + // NOTE: don't forget to update build_force::handle() if changing + // anything here. + // + transaction t (db.begin ()); - assert (t->service); + package_build pb; + if (!db.query_one (query::build::id == id, + pb) || + pb.archived) + { + return nullopt; + } - tenant_service& ts (*t->service); + const shared_ptr& b (pb.build); + s = b->state; - if (optional data = uf (ts, s)) + if (s != build_state::queued) { - ts.data = move (*data); - db.update (t); + force_state force (s == build_state::built + ? force_state::forced + : force_state::forcing); + + if (b->force != force) + { + b->force = force; + db.update (b); + } + + if (uf != nullptr) + { + shared_ptr t (db.load (b->tenant)); + + assert (t->service); + + tenant_service& ts (*t->service); + + if (optional data = uf (ts, s)) + { + ts.data = move (*data); + db.update (t); + } + } } + + t.commit (); + + // Bail out if we have successfully updated the build and tenant + // objects. + // + break; + } + catch (const odb::recoverable& e) + { + // If no more retries left, don't re-throw odb::recoverable not to + // retry at the upper level. + // + if (retry-- == 0) + throw runtime_error (e.what ()); } } - t.commit (); - return s; } -- cgit v1.1