Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions compute_worker/compute_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,16 @@ def run_wrapper(run_args):
run_args.update(secret=str(run_args["secret"]))
logger.info(f"Received run arguments: \n {colorize_run_args(json.dumps(run_args))}")
run = Run(run_args)

state = run._fetch_submission_state()
if state and state.get("is_terminal"):
logger.warning(
f"Submission {run.submission_id} is already in terminal state "
f"'{state.get('status')}' (worker_attempt_count={state.get('worker_attempt_count')}). "
f"Skipping redelivered task to avoid duplicate execution."
)
return {"skipped": True, "reason": "already_terminal", "state": state}

try:
run.prepare()
run.start()
Expand Down Expand Up @@ -629,6 +639,33 @@ def _update_submission(self, data):
)
raise SubmissionException("Failure updating submission data.")

def _fetch_submission_state(self):
    """Probe the API for the current submission state on entry.

    Sends a GET to ``<submissions_api_url>/submissions/<id>/worker_state/``
    with the submission secret as the ``secret`` query parameter.

    Returns:
        The decoded JSON object (a ``dict``) when the API answers with
        HTTP 200 and a JSON-object body. ``None`` on any failure: request
        error, non-200 status, a body that is not JSON, or JSON that is not
        an object. Returning ``None`` lets the caller fall back to the
        legacy "just run it" behaviour, so a transient API blip never
        blocks a legitimate execution.
    """
    url = f"{self.submissions_api_url}/submissions/{self.submission_id}/worker_state/"
    try:
        resp = self.requests_session.get(
            url, params={"secret": self.secret}, timeout=30,
        )
    except Exception:
        # Deliberately broad: this probe is best-effort and must never abort the run.
        logger.exception("worker_state fetch failed; proceeding with run anyway")
        return None
    if resp.status_code != 200:
        logger.warning(
            f"worker_state fetch returned {resp.status_code}; proceeding with run anyway"
        )
        return None
    try:
        state = resp.json()
    except Exception:
        logger.exception("worker_state response was not JSON; proceeding with run anyway")
        return None
    if not isinstance(state, dict):
        # Callers use ``state.get(...)``; a JSON list/string/number would raise
        # AttributeError there and turn a harmless probe into a failed run.
        logger.warning(
            "worker_state response was not a JSON object (got %s); proceeding with run anyway",
            type(state).__name__,
        )
        return None
    return state

def _update_status(self, status, extra_information=None):
# Update submission status
if status not in SubmissionStatus.AVAILABLE_STATUSES:
Expand Down
11 changes: 11 additions & 0 deletions src/apps/api/serializers/submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,17 @@ def update(self, submission, validated_data):
# Task of a submission cannot be updated
if "task" in validated_data:
raise PermissionDenied("Task of a submission cannot be update")

TERMINAL = (Submission.FINISHED, Submission.FAILED, Submission.CANCELLED)
if submission.status in TERMINAL:
incoming_status = validated_data.get('status')
if incoming_status and incoming_status != submission.status:
logger.warning(
"Ignoring redelivered PATCH on terminal submission %s "
"(current=%s, incoming=%s)",
submission.pk, submission.status, incoming_status,
)
return submission

# Update status if it is there in validated data
if "status" in validated_data:
Expand Down
62 changes: 55 additions & 7 deletions src/apps/api/views/submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,26 @@ def check_object_permissions(self, request, obj):
return
if self.request and self.request.method in ('POST', 'PUT', 'PATCH'):
dir(self.request)
TERMINAL = (Submission.FINISHED, Submission.FAILED, Submission.CANCELLED)
obj_is_terminal = obj.status in TERMINAL
# Set hostname of submission
if "status_details" in self.request.data.keys():
# Check ingestion hostname
if request.data['status_details'].find('ingestion_hostname') != -1:
hostname = request.data['status_details'].replace('ingestion_hostname-', '')
obj.ingestion_worker_hostname = hostname
obj.save()
if not obj_is_terminal:
obj.ingestion_worker_hostname = hostname
obj.save()
# Check scoring hostname
if request.data['status_details'].find('scoring_hostname') != -1:
hostname = request.data['status_details'].replace('scoring_hostname-', '')
obj.scoring_worker_hostname = hostname
obj.save()
if not obj_is_terminal:
obj.scoring_worker_hostname = hostname
obj.save()

# check if type is in request data. type can have the following values
# - Docker_Image_Pull_Fail
Expand Down Expand Up @@ -198,6 +206,38 @@ def create(self, request, *args, **kwargs):
raise ValidationError('You do not have participant permissions for this group')
return super(SubmissionViewSet, self).create(request, *args, **kwargs)

@action(detail=True, methods=['get'], permission_classes=[AllowAny], url_path='worker_state')
def worker_state(self, request, pk=None):
    """Report a submission's current state to a compute worker.

    The endpoint is open (``AllowAny``) and is instead guarded by the
    submission secret, which the worker supplies as the ``secret`` query
    parameter. It must parse as a UUID equal to ``submission.secret``.

    Every authorised call increments ``worker_attempt_count`` in the
    database through an ``F()`` expression, then reports the refreshed
    counter together with the status that was read before the increment.

    Raises:
        Http404: no submission exists with primary key ``pk``.
        PermissionDenied: the secret is missing, malformed, or does not
            match the submission.
    """
    from django.db.models import F

    submission = get_object_or_404(Submission, pk=pk)

    # Validate the caller-supplied secret before touching any state.
    provided_secret = request.query_params.get('secret')
    try:
        secret_matches = bool(provided_secret) and uuid.UUID(provided_secret) == submission.secret
    except (TypeError, ValueError):
        raise PermissionDenied('Invalid secret')
    if not secret_matches:
        raise PermissionDenied('Submission secrets do not match')

    terminal_statuses = (Submission.FINISHED, Submission.FAILED, Submission.CANCELLED)
    is_terminal = submission.status in terminal_statuses

    # Increment in SQL so concurrent probes do not overwrite each other,
    # then reload only the counter to report its new value.
    Submission.objects.filter(pk=submission.pk).update(
        worker_attempt_count=F('worker_attempt_count') + 1,
    )
    submission.refresh_from_db(fields=['worker_attempt_count'])

    payload = {
        'id': submission.id,
        'status': submission.status,
        'is_terminal': is_terminal,
        'has_scoring_result': bool(submission.scoring_result),
        'has_prediction_result': bool(submission.prediction_result),
        'worker_attempt_count': submission.worker_attempt_count,
    }
    return Response(payload)

def destroy(self, request, *args, **kwargs):
"""
- If user is neither owner nor admin, user cannot delete the submission
Expand Down Expand Up @@ -593,21 +633,29 @@ def upload_submission_scores(request, submission_pk):
for column_key, score in request.data.get("scores").items():
if column_key not in competition_columns:
continue
score = SubmissionScore.objects.create(
score=score,
column=Column.objects.get(leaderboard=submission.phase.leaderboard, key=column_key)
)
submission.scores.add(score)
column = Column.objects.get(leaderboard=submission.phase.leaderboard, key=column_key)
# idempotent score write. A redelivered worker would otherwise
# add a second SubmissionScore for the same (submission, column),
# which both inflates the leaderboard and crashes calculate_scores()
# on .get(column=column).
existing = submission.scores.filter(column=column).first()
if existing is not None:
existing.score = score
existing.save(update_fields=['score'])
score_obj = existing
else:
score_obj = SubmissionScore.objects.create(score=score, column=column)
submission.scores.add(score_obj)
if submission.parent:
submission.parent.scores.add(score)
if not submission.parent.scores.filter(column=column).exists():
submission.parent.scores.add(score_obj)
submission.parent.calculate_scores()
else:
submission.calculate_scores()

put_on_leaderboard_by_submission_rule(request, submission_pk, submission_rule)
return Response()


@api_view(('GET',))
def can_make_submission(request, phase_id):
phase = get_object_or_404(Phase, id=phase_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.2.14 on 2026-06-18 13:01

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the ``worker_attempt_count`` column to the ``submission`` model.

    The new field is a ``PositiveIntegerField`` defaulting to ``0``.
    """

    # Must run after the migration that introduced the idempotency record.
    dependencies = [
        ("competitions", "0062_idempotencyrecord"),
    ]

    operations = [
        migrations.AddField(
            model_name="submission",
            name="worker_attempt_count",
            field=models.PositiveIntegerField(default=0),
        ),
    ]
3 changes: 3 additions & 0 deletions src/apps/competitions/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,9 @@ class Submission(models.Model):
ingestion_worker_hostname = models.CharField(max_length=255, blank=True, null=True)
# Scoring hostname
scoring_worker_hostname = models.CharField(max_length=255, blank=True, null=True)
# Incremented every time a worker claims this submission from the broker.
# Used to detect redeliveries (M6) and provide an audit trail.
worker_attempt_count = models.PositiveIntegerField(default=0)
queue = models.ForeignKey('queues.Queue', on_delete=models.SET_NULL, null=True, blank=True,
related_name='submissions')
is_migrated = models.BooleanField(default=False)
Expand Down