From 42d9950f8642f26fede523d0a2ae6b1b21a589a4 Mon Sep 17 00:00:00 2001
From: AybH26 <58746253+AybH26@users.noreply.github.com>
Date: Thu, 18 Jun 2026 17:47:29 +0200
Subject: [PATCH] fix(compute_worker): make submission lifecycle idempotent
 against broker redelivery

---
 compute_worker/compute_worker.py              | 37 +++++++++++
 src/apps/api/serializers/submissions.py       | 11 ++++
 src/apps/api/views/submissions.py             | 66 ++++++++++++++++---
 .../0063_submission_worker_attempt_count.py   | 18 ++++++
 src/apps/competitions/models.py               |  3 +
 5 files changed, 125 insertions(+), 10 deletions(-)
 create mode 100644 src/apps/competitions/migrations/0063_submission_worker_attempt_count.py

diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py
index 243dcd16d..3c999dc63 100644
--- a/compute_worker/compute_worker.py
+++ b/compute_worker/compute_worker.py
@@ -310,6 +310,16 @@ def run_wrapper(run_args):
     run_args.update(secret=str(run_args["secret"]))
     logger.info(f"Received run arguments: \n {colorize_run_args(json.dumps(run_args))}")
     run = Run(run_args)
+
+    state = run._fetch_submission_state()
+    if state and state.get("is_terminal"):
+        logger.warning(
+            f"Submission {run.submission_id} is already in terminal state "
+            f"'{state.get('status')}' (worker_attempt_count={state.get('worker_attempt_count')}). "
+            f"Skipping redelivered task to avoid duplicate execution."
+        )
+        return {"skipped": True, "reason": "already_terminal", "state": state}
+
     try:
         run.prepare()
         run.start()
@@ -629,6 +639,33 @@ def _update_submission(self, data):
             )
             raise SubmissionException("Failure updating submission data.")
 
+    def _fetch_submission_state(self):
+        """Probe the API for the current submission state on entry.
+
+        Returns a dict with at least ``status`` and ``is_terminal`` when the
+        API is reachable. Returns ``None`` on any error: the caller falls
+        back to the legacy "just run it" behaviour so we never block a
+        legitimate execution because of a transient API blip.
+        """
+        url = f"{self.submissions_api_url}/submissions/{self.submission_id}/worker_state/"
+        try:
+            resp = self.requests_session.get(
+                url, params={"secret": self.secret}, timeout=30,
+            )
+        except Exception:
+            logger.exception("worker_state fetch failed; proceeding with run anyway")
+            return None
+        if resp.status_code != 200:
+            logger.warning(
+                f"worker_state fetch returned {resp.status_code}; proceeding with run anyway"
+            )
+            return None
+        try:
+            return resp.json()
+        except Exception:
+            logger.exception("worker_state response was not JSON; proceeding with run anyway")
+            return None
+
     def _update_status(self, status, extra_information=None):
         # Update submission status
         if status not in SubmissionStatus.AVAILABLE_STATUSES:
diff --git a/src/apps/api/serializers/submissions.py b/src/apps/api/serializers/submissions.py
index 9c91737ca..4e4b8c124 100644
--- a/src/apps/api/serializers/submissions.py
+++ b/src/apps/api/serializers/submissions.py
@@ -172,6 +172,17 @@ def update(self, submission, validated_data):
         # Task of a submission cannot be updated
         if "task" in validated_data:
             raise PermissionDenied("Task of a submission cannot be update")
+
+        TERMINAL = (Submission.FINISHED, Submission.FAILED, Submission.CANCELLED)
+        if submission.status in TERMINAL:
+            incoming_status = validated_data.get('status')
+            if incoming_status and incoming_status != submission.status:
+                logger.warning(
+                    "Ignoring redelivered PATCH on terminal submission %s "
+                    "(current=%s, incoming=%s)",
+                    submission.pk, submission.status, incoming_status,
+                )
+                return submission
 
         # Update status if it is there in validated data
         if "status" in validated_data:
diff --git a/src/apps/api/views/submissions.py b/src/apps/api/views/submissions.py
index bfd6f8889..e04f79d76 100644
--- a/src/apps/api/views/submissions.py
+++ b/src/apps/api/views/submissions.py
@@ -42,6 +42,9 @@ def check_object_permissions(self, request, obj):
             return
         if self.request and self.request.method in ('POST', 'PUT', 'PATCH'):
             dir(self.request)
+            # A redelivered task must not overwrite what a finished run recorded.
+            TERMINAL = (Submission.FINISHED, Submission.FAILED, Submission.CANCELLED)
+            obj_is_terminal = obj.status in TERMINAL
             # Set hostname of submission
             if "status_details" in self.request.data.keys():
                 # Check ingestion hostname
@@ -49,11 +52,13 @@ def check_object_permissions(self, request, obj):
                     hostname = request.data['status_details'].replace('ingestion_hostname-', '')
-                    obj.ingestion_worker_hostname = hostname
-                    obj.save()
+                    if not obj_is_terminal:
+                        obj.ingestion_worker_hostname = hostname
+                        obj.save()
                 # Check socring hostname
                 if request.data['status_details'].find('scoring_hostname') != -1:
                     hostname = request.data['status_details'].replace('scoring_hostname-', '')
-                    obj.scoring_worker_hostname = hostname
-                    obj.save()
+                    if not obj_is_terminal:
+                        obj.scoring_worker_hostname = hostname
+                        obj.save()
 
                 # check if type is in request data. type can have the following values
                 # - Docker_Image_Pull_Fail
@@ -198,6 +203,38 @@ def create(self, request, *args, **kwargs):
                 raise ValidationError('You do not have participant permissions for this group')
         return super(SubmissionViewSet, self).create(request, *args, **kwargs)
 
+    @action(detail=True, methods=['get'], permission_classes=[AllowAny], url_path='worker_state')
+    def worker_state(self, request, pk=None):
+        """Lightweight state lookup for compute_workers.
+
+        Authenticated via the submission secret (query param). Returns whether
+        the submission is in a terminal state so a redelivered worker can
+        short-circuit instead of re-executing. Each call atomically bumps
+        ``worker_attempt_count`` to provide an audit trail of redeliveries.
+        """
+        from django.db.models import F
+        submission = get_object_or_404(Submission, pk=pk)
+        secret = request.query_params.get('secret')
+        try:
+            if not secret or uuid.UUID(secret) != submission.secret:
+                raise PermissionDenied('Submission secrets do not match')
+        except (TypeError, ValueError):
+            raise PermissionDenied('Invalid secret')
+        TERMINAL = (Submission.FINISHED, Submission.FAILED, Submission.CANCELLED)
+        is_terminal = submission.status in TERMINAL
+        Submission.objects.filter(pk=submission.pk).update(
+            worker_attempt_count=F('worker_attempt_count') + 1,
+        )
+        submission.refresh_from_db(fields=['worker_attempt_count'])
+        return Response({
+            'id': submission.id,
+            'status': submission.status,
+            'is_terminal': is_terminal,
+            'has_scoring_result': bool(submission.scoring_result),
+            'has_prediction_result': bool(submission.prediction_result),
+            'worker_attempt_count': submission.worker_attempt_count,
+        })
+
     def destroy(self, request, *args, **kwargs):
         """
         - If user is neither owner nor admin, user cannot delete the submission
@@ -593,13 +630,22 @@ def upload_submission_scores(request, submission_pk):
     for column_key, score in request.data.get("scores").items():
         if column_key not in competition_columns:
             continue
-        score = SubmissionScore.objects.create(
-            score=score,
-            column=Column.objects.get(leaderboard=submission.phase.leaderboard, key=column_key)
-        )
-        submission.scores.add(score)
+        column = Column.objects.get(leaderboard=submission.phase.leaderboard, key=column_key)
+        # Idempotent score write. A redelivered worker would otherwise
+        # add a second SubmissionScore for the same (submission, column),
+        # which both inflates the leaderboard and crashes calculate_scores()
+        # on .get(column=column).
+        existing = submission.scores.filter(column=column).first()
+        if existing is not None:
+            existing.score = score
+            existing.save(update_fields=['score'])
+            score_obj = existing
+        else:
+            score_obj = SubmissionScore.objects.create(score=score, column=column)
+            submission.scores.add(score_obj)
         if submission.parent:
-            submission.parent.scores.add(score)
+            if not submission.parent.scores.filter(column=column).exists():
+                submission.parent.scores.add(score_obj)
             submission.parent.calculate_scores()
         else:
             submission.calculate_scores()
diff --git a/src/apps/competitions/migrations/0063_submission_worker_attempt_count.py b/src/apps/competitions/migrations/0063_submission_worker_attempt_count.py
new file mode 100644
index 000000000..f6765d07e
--- /dev/null
+++ b/src/apps/competitions/migrations/0063_submission_worker_attempt_count.py
@@ -0,0 +1,18 @@
+# Generated by Django 5.2.14 on 2026-06-18 13:01
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("competitions", "0062_idempotencyrecord"),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name="submission",
+            name="worker_attempt_count",
+            field=models.PositiveIntegerField(default=0),
+        ),
+    ]
diff --git a/src/apps/competitions/models.py b/src/apps/competitions/models.py
index 87b0b5513..70a5570b4 100644
--- a/src/apps/competitions/models.py
+++ b/src/apps/competitions/models.py
@@ -492,6 +492,9 @@ class Submission(models.Model):
     ingestion_worker_hostname = models.CharField(max_length=255, blank=True, null=True)
     # Scoring hostname
     scoring_worker_hostname = models.CharField(max_length=255, blank=True, null=True)
+    # Incremented every time a worker claims this submission from the broker.
+    # Used to detect redeliveries (M6) and provide an audit trail.
+    worker_attempt_count = models.PositiveIntegerField(default=0)
     queue = models.ForeignKey('queues.Queue', on_delete=models.SET_NULL, null=True, blank=True, related_name='submissions')
     is_migrated = models.BooleanField(default=False)
 