UN-3266 [FEAT] Async Executor Backend for Prompt Studio #1849
harini-venkataraman wants to merge 91 commits into main from feat/execution-backend
Conversation
Conflicts resolved:
- docker-compose.yaml: Use main's dedicated dashboard_metric_events queue for worker-metrics
- PromptCard.jsx: Keep tool_id matching condition from our async socket feature
- PromptRun.jsx: Merge useEffect import from main with our branch
- ToolIde.jsx: Keep fire-and-forget socket approach (spinner waits for socket event)
- SocketMessages.js: Keep both session-store and socket-custom-tool imports + updateCusToolMessages dep
- SocketContext.js: Keep simpler path-based socket connection approach
- usePromptRun.js: Keep Celery fire-and-forget with socket delivery over polling
- setupProxy.js: Accept main's deletion (migrated to Vite)
```python
# Check if highlight data should be removed using configuration registry
# Ensure workflow identification keys are always in item metadata
organization = api.organization if api else None
org_id = str(organization.organization_id) if organization else ""
```
I don’t think this should be allowed when the organization is missing. Also, how does this behave with an empty org_id?
cc: @vishnuszipstack
```python
) -> None:
    """Inject per-model usage breakdown into item['result']['metadata']."""
    inner_result = item.get("result")
    if not isinstance(inner_result, dict):
```
NIT: improve/enhance `ExecutionResponse` by adding a DTO for `result`.
```python
    )
    return APIExecutionResponseSerializer(result).data


@staticmethod
```
Hope there is no structure change of `result` here. Can you please add the model/sample in the description, or alongside the `ExecutionResponse` class?
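As a starting point, a result DTO could look like the following sketch. Field names here are assumptions based on the serializer usage above, not the actual `ExecutionResponse` schema:

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class ExecutionResultDTO:
    """Hypothetical DTO pinning down the shape of ExecutionResponse.result."""

    status: str = "pending"
    output: Any = None
    metadata: dict[str, Any] = field(default_factory=dict)
    error: str | None = None

    def to_dict(self) -> dict[str, Any]:
        # asdict gives a plain dict suitable for a serializer / JSON response
        return asdict(self)


dto = ExecutionResultDTO(status="completed", output={"answer": "42"})
print(dto.to_dict()["status"])  # completed
```

Documenting the shape this way (or with a sample payload in the class docstring) would answer the "structure change" question directly in code.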
```python
_worker_app: Celery | None = None


class _WorkerDispatchCelery(Celery):
```
Why Celery here? We already moved it out of the backend. What do these methods do here?
@muhammad-ali-e The backend Celery worker handles fire-and-forget callback tasks that run after the executor worker finishes. Here's the flow:
```
Backend dispatches task → Executor Worker (does the heavy lifting)
        ↓ (Celery link/link_error)
Backend Callback Task (lightweight)
  ├── ORM writes (persist results to DB)
  └── WebSocket push (notify frontend in real-time)
```
Why these run on the backend (not the executor worker):
- They need Django ORM access (database models, services) — the executor worker doesn't have Django loaded
- They need the Socket.IO emitter to push real-time updates to the frontend
- They're lightweight — just DB writes + WebSocket emit, no heavy computation
- Keeps the executor worker stateless and focused on execution only
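The flow above can be sketched without a broker as a plain-Python simulation. `simulate_dispatch_with_callback` is an illustrative stand-in, not the PR's `dispatch_with_callback`; real Celery wiring uses `task.apply_async(..., link=..., link_error=...)`:

```python
from typing import Any, Callable


def simulate_dispatch_with_callback(
    task: Callable[..., Any],
    args: tuple,
    link: Callable[[Any], None],
    link_error: Callable[[Exception], None],
) -> None:
    """Model Celery's link/link_error: run the 'executor' task, then invoke
    the lightweight backend callback with its result or its error."""
    try:
        result = task(*args)  # heavy lifting (executor worker side)
    except Exception as exc:
        link_error(exc)       # errback: e.g. clear stuck flags, emit failure
    else:
        link(result)          # success callback: ORM write + WebSocket push


events: list = []
simulate_dispatch_with_callback(
    task=lambda doc: f"indexed:{doc}",
    args=("doc-1",),
    link=lambda res: events.append(f"saved {res}"),
    link_error=lambda exc: events.append(f"failed {exc}"),
)
print(events)  # ['saved indexed:doc-1']
```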
```python
    the action.
    """
    profile_manager_owner = profile_manager.created_by
    if profile_manager_owner is None:
```
Is this `created_by` a default value? When would it be None?
```python
global _worker_app
if _worker_app is not None:
    return _worker_app
```
Unsynchronised singleton initialisation — race condition under concurrent requests
`get_worker_celery_app()` uses the classic double-check-without-lock pattern:

```python
if _worker_app is not None:
    return _worker_app
```

Under gunicorn with threaded workers (or any multi-threaded Django deployment), two threads can simultaneously see `_worker_app is None` and both proceed to create a new `_WorkerDispatchCelery` instance. The second assignment overwrites the first (last-writer-wins), so each thread may end up holding a reference to a *different* object than what ends up in the module global. This is benign in practice because both instances are configured identically, but it is wasteful and could cause subtle issues if Celery connection pools are per-instance.
The idiomatic Python fix is to use a module-level lock:
```python
import threading

_worker_app: Celery | None = None
_worker_app_lock = threading.Lock()


def get_worker_celery_app() -> Celery:
    global _worker_app
    if _worker_app is not None:
        return _worker_app
    with _worker_app_lock:
        if _worker_app is None:  # re-check inside lock
            ...
            _worker_app = app
    return _worker_app
```
Path: `backend/backend/worker_celery.py` (lines 61–63)
Code review: found 1 issue.

`unstract/backend/prompt_studio/prompt_studio_core_v2/views.py` lines 364–424, 426–505, 507–596 in 1c58eb9
```python
def _is_safe_public_url(url: str) -> bool:
    """Validate webhook URL for SSRF protection.

    Only allows HTTPS and blocks private/loopback/internal addresses.
    """
    try:
        p = urlparse(url)
        if p.scheme not in ("https",):
            return False
        host = p.hostname or ""
        if host in ("localhost",):
            return False

        addrs = _resolve_host_addresses(host)
        if not addrs:
            return False

        for addr in addrs:
            try:
                ip = ipaddress.ip_address(addr)
            except ValueError:
                return False
            if (
                ip.is_private
                or ip.is_loopback
                or ip.is_link_local
                or ip.is_reserved
                or ip.is_multicast
            ):
                return False
        return True
    except Exception:
        return False
```
DNS rebinding (TOCTOU) bypasses SSRF protection
`_is_safe_public_url` resolves the webhook hostname via DNS at *validation time*, but the actual HTTP request to the webhook (via `postprocess_data`) happens *afterwards*. An attacker who controls a DNS server can perform a DNS rebinding attack:

1. During validation: `attacker.com` → `1.2.3.4` (a public IP) — validation passes.
2. Attacker flips DNS TTL to 0 and rebinds `attacker.com` → `169.254.169.254` (AWS metadata), `10.0.0.1` (internal service), etc.
3. During the actual HTTP request: the OS re-resolves `attacker.com` and connects to the internal address.

The current `is_private` / `is_loopback` / etc. checks are bypassed entirely because they only guard the validation-time resolution. This is a real risk in a multi-tenant environment where arbitrary webhook URLs can be registered.

The standard mitigation is to make the HTTP request through a socket-level wrapper that re-validates the resolved IP *at connection time* — for example by overriding the socket's `connect()` to check the destination address immediately before the connection is established, or by using a library such as [urllib3-pyOpenSSL](https://github.com/urllib3/urllib3) with a custom `ProxyManager`, or setting a fixed DNS-resolved IP on the `requests` session host header.
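One pragmatic variant of the "fixed DNS-resolved IP" approach can be sketched as follows. This is an illustrative helper, not code from this PR; `pin_validated_ip` is a hypothetical name, and a production version must also handle TLS SNI/certificate verification against the original hostname (e.g. with requests-toolbelt's `HostHeaderSSLAdapter`):

```python
import ipaddress
from urllib.parse import urlparse, urlunparse


def _is_public_ip(addr: str) -> bool:
    ip = ipaddress.ip_address(addr)
    return not (
        ip.is_private or ip.is_loopback or ip.is_link_local
        or ip.is_reserved or ip.is_multicast
    )


def pin_validated_ip(url: str, resolved_ip: str):
    """Rewrite the URL to connect to the already-validated IP, sending the
    original hostname in the Host header. The OS never re-resolves the name
    between validation and request, defeating DNS rebinding."""
    if not _is_public_ip(resolved_ip):
        raise ValueError(f"refusing non-public address {resolved_ip}")
    p = urlparse(url)
    netloc = resolved_ip if p.port is None else f"{resolved_ip}:{p.port}"
    pinned = urlunparse(p._replace(netloc=netloc))
    return pinned, {"Host": p.hostname}


pinned_url, headers = pin_validated_ip("https://attacker.com/hook", "1.2.3.4")
print(pinned_url, headers)  # https://1.2.3.4/hook {'Host': 'attacker.com'}
```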
Path: `workers/executor/executors/answer_prompt.py` (lines 44–76)

Signed-off-by: harini-venkataraman <115449948+harini-venkataraman@users.noreply.github.com>
@claude review
```python
DocumentIndexingService.set_document_indexing(
    org_id=org_id, user_id=user_id, doc_id_key=doc_id_key
)
```
set_document_indexing not rolled back on broker failure
DocumentIndexingService.set_document_indexing(...) is called at the end of build_index_payload (before dispatch_with_callback is called in the view). If dispatch_with_callback subsequently raises — for example, because the broker is unavailable, the Celery app is not configured, or any other exception — the document is permanently left in the "indexing in progress" state. The ide_index_error errback is never invoked because no task was dispatched, so there is no mechanism to clear the stuck flag.
The view code does not wrap dispatch_with_callback in a try/except that would call DocumentIndexingService.remove_document_indexing(...) on failure. The result is that the user sees an infinite "indexing in progress" indicator and cannot re-index the document without manual DB intervention.
To fix this, either:

1. Move `set_document_indexing` to after successful dispatch (wrap the dispatch, set the flag only on success), or
2. Add a `try/except` around `dispatch_with_callback` in the view that calls `remove_document_indexing` on error.
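Option 2 might look like the following sketch. The service and dispatcher here are stubs standing in for the real `DocumentIndexingService` and Celery dispatch; only the try/except shape mirrors the suggestion:

```python
class FakeIndexingService:
    """Stand-in for DocumentIndexingService with an in-memory flag store."""

    def __init__(self):
        self.flags = set()

    def set_document_indexing(self, doc_id_key):
        self.flags.add(doc_id_key)

    def remove_document_indexing(self, doc_id_key):
        self.flags.discard(doc_id_key)


def index_document(service, dispatch, doc_id_key):
    service.set_document_indexing(doc_id_key)
    try:
        dispatch(doc_id_key)  # may raise if the broker is unavailable
    except Exception:
        # Roll back the 'indexing in progress' flag so the UI doesn't hang.
        service.remove_document_indexing(doc_id_key)
        raise


def broken_dispatch(doc_id_key):
    raise ConnectionError("broker down")


svc = FakeIndexingService()
try:
    index_document(svc, broken_dispatch, "doc-1")
except ConnectionError:
    pass
print(svc.flags)  # set() — flag cleared on dispatch failure
```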
Path: `backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py` (lines 392–395)
```python
cb_kwargs = {
    "log_events_id": log_events_id,
    "request_id": request_id,
    "org_id": org_id,
    "operation": "single_pass_extraction",
    "run_id": run_id,
    "document_id": document_id,
    "tool_id": tool_id,
    "prompt_ids": [str(p.prompt_id) for p in prompts],
    "is_single_pass": True,
}
```
Missing `profile_manager_id` in `single_pass_extraction` `cb_kwargs`
`build_single_pass_payload` does not include `profile_manager_id` in its `cb_kwargs` (lines 855–865). When `ide_prompt_complete` processes this callback it reads:

```python
profile_manager_id = cb.get("profile_manager_id")  # always None for single-pass
```

and passes `profile_manager_id=None` to `OutputManagerHelper.handle_prompt_output_update`. Depending on how that helper uses the field, single-pass outputs may not be correctly associated with the profile manager, producing a different storage behavior than the `fetch_response` path (which always passes the explicit `profile_manager_id`).
More concretely, when ide_prompt_error fires for a single-pass failure, the emitted error event also lacks profile_manager_id. The frontend's handleFailed falls through to the broad clearPromptStatusById(promptId) fallback, which clears ALL doc/profile status combinations for those prompts — not just the one that was actually running. This means an error in one single-pass run would cancel the loading spinners for unrelated concurrent runs.
Consider adding the default profile's `profile_id` to `cb_kwargs`:

```python
cb_kwargs = {
    ...
    "profile_manager_id": str(default_profile.profile_id),
    ...
}
```
Path: `backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py` (lines 855–865)

```js
const onResult = useCallback(
  (payload) => {
    try {
      const msg = payload?.data || payload;
      const { status, operation, result, error, ...extra } = msg;

      if (status === "completed") {
        handleCompleted(operation, result);
      } else if (status === "failed") {
        handleFailed(operation, error, extra);
      }
    } catch (err) {
      setAlertDetails(
        handleException(err, "Failed to process prompt studio result"),
      );
    }
  },
  [handleCompleted, handleFailed, setAlertDetails, handleException],
);
```
Socket result event not scoped to the current tool — multi-tab state corruption
prompt_studio_result events are emitted to the log_events_id Socket.IO room, which is per-user-session, not per-tool or per-tab. If a user has two Prompt Studio tools open simultaneously in separate tabs (both sharing the same Socket.IO connection and log_events_id), a result from Tool A's execution will be received and processed by Tab B's usePromptStudioSocket listener as well.
In `handleCompleted("fetch_response", result)`:

```js
updatePromptOutputState(data, false); // writes Tool A's outputs into Tab B's store
clearResultStatuses(data); // tries to clear statuses using Tool A's prompt IDs
```

`updatePromptOutputState` in Tab B would overwrite prompt output state with data belonging to Tool A's prompts. This can cause phantom outputs to appear under the wrong tool and leave Tab B in an inconsistent state.

The socket event payload (`_emit_result` in `tasks.py`) does not include a `tool_id` field, so the frontend has no way to discard irrelevant events. Consider adding `tool_id` (or `custom_tool_id`) to the emitted payload and filtering it in `onResult`:
```js
const onResult = useCallback((payload) => {
  const msg = payload?.data || payload;
  if (msg.tool_id && msg.tool_id !== details?.tool_id) return; // ignore events for other tools
  ...
}, [..., details?.tool_id]);
```
Path: `frontend/src/hooks/usePromptStudioSocket.js` (lines 131–149)

```js
const clearResultStatuses = useCallback(
  (data) => {
    if (!Array.isArray(data)) {
      return;
    }
    data.forEach((item) => {
      const promptId = item?.prompt_id;
      const docId = item?.document_manager;
      const profileId = item?.profile_manager;
      if (promptId && docId && profileId) {
        const statusKey = generateApiRunStatusId(docId, profileId);
        removePromptStatus(promptId, statusKey);
      }
    });
  },
  [removePromptStatus],
);
```
clearResultStatuses spinner-clearing may permanently fail
clearResultStatuses derives the status key from item.profile_manager on the result data items. The status was originally stored using a profileId taken directly from the queue item string — a raw UUID string. For clearResultStatuses to match and call removePromptStatus, item.profile_manager in the result data must be the exact same UUID string.
If OutputManagerHelper.handle_prompt_output_update returns serialized objects where profile_manager is an integer PK, a nested object, or null, the condition if (promptId && docId && profileId) will be false, removePromptStatus will never be called, and the loading spinner for the prompt will remain active forever. The user would be unable to re-run the prompt without a page refresh.
The old polling path avoided this by explicitly removing the status with the IDs already available in the callback closure. The new socket path has no such explicit fallback.
Consider including prompt_ids, document_id, and profile_manager_id in the socket event payload (they are already present in cb_kwargs) so the frontend can always do a direct cleanup regardless of the result data format, rather than relying on parsing the ORM-serialized result items.
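On the backend side, that suggestion could be sketched as a payload builder that carries the dispatch-time identifiers through to the socket event (illustrative only; the actual `_emit_result` signature in `tasks.py` may differ):

```python
def build_result_payload(cb_kwargs: dict, status: str, result) -> dict:
    """Carry dispatch-time identifiers through to the socket event so the
    frontend can clear spinners directly, without parsing ORM-serialized
    result items for UUIDs that may have been re-serialized differently."""
    return {
        "status": status,
        "operation": cb_kwargs.get("operation"),
        "result": result,
        # Identifiers already present in cb_kwargs at dispatch time:
        "tool_id": cb_kwargs.get("tool_id"),
        "prompt_ids": cb_kwargs.get("prompt_ids", []),
        "document_id": cb_kwargs.get("document_id"),
        "profile_manager_id": cb_kwargs.get("profile_manager_id"),
    }


payload = build_result_payload(
    {
        "operation": "fetch_response",
        "tool_id": "t-1",
        "prompt_ids": ["p-1"],
        "document_id": "d-1",
        "profile_manager_id": "pm-1",
    },
    status="completed",
    result=[{"prompt_id": "p-1"}],
)
print(payload["tool_id"], payload["prompt_ids"])  # t-1 ['p-1']
```

With these fields present, `clearResultStatuses` can fall back to the payload's own IDs whenever the result items lack usable UUID strings.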
Path: `frontend/src/hooks/usePromptStudioSocket.js` (lines 28–44)

…t drift
- Remove redundant inline `import uuid as _uuid` in views.py (use module-level uuid)
- URL-encode DB_USER in worker_celery.py result backend connection string
- Remove misleading task_queues=[Queue("executor")] from dispatch-only Celery app
- Remove dead `if not tool:` guards after objects.get() (already raises DoesNotExist)
- Move profile_manager/default_profile null checks before first dereference
- Reorder ProfileManager.objects.get before mark_document_indexed in tasks.py
- Handle ProfileManager.DoesNotExist as warning, not hard failure
- Wrap PostHog analytics in try/catch so failures don't block prompt execution
- Handle pending-indexing 200 response in usePromptRun.js (clear RUNNING status)
- Reset formData when metadata is missing in ConfigureDs.jsx
- Fix test_should_skip_extraction tests: function now takes 1 arg (outputs only)
- Fix agentic routing tests: mock X2Text.process, remove stale platform_helper kwarg
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
```js
  const statusKey = generateApiRunStatusId(docId, profileId);
  removePromptStatus(promptId, statusKey);
  setAlertDetails({
    type: "info",
    content:
      res?.data?.message || "Document is being indexed. Please wait.",
  });
  return;
}

// Timeout safety net: clear stale status if socket event never arrives.
setTimeout(() => {
  const statusKey = generateApiRunStatusId(docId, profileId);
  const current = usePromptRunStatusStore.getState().promptRunStatus;
  if (
```
5-minute timeout can falsely cancel a later re-run of the same prompt
The setTimeout closure captures promptId, docId, and profileId from run N. When it fires 5 minutes later it reads the current store state and checks whether current?.[promptId]?.[statusKey] === PROMPT_RUN_API_STATUSES.RUNNING. If the user triggered another run of the same (promptId, docId, profileId) combination (run N+1) within that 5-minute window, the timer from run N will see run N+1's RUNNING state, clear it, and display a spurious "timed out" warning — even though run N+1 may still be processing and will later receive a valid socket event. The result is a permanently stuck spinner for run N+1 (status cleared by the stale timer) while the socket result arrives and tries to call clearResultStatuses on an already-removed entry.
Mitigation: tag each dispatch with a unique `runId` and include it in the status store key, so the timeout only clears the *specific* run it corresponds to:

```js
const runNonce = generateUUID();
addPromptStatus(promptId, statusKey, PROMPT_RUN_API_STATUSES.RUNNING, runNonce);
setTimeout(() => {
  const current = usePromptRunStatusStore.getState().promptRunStatus;
  if (current?.[promptId]?.[statusKey]?.nonce === runNonce) {
    removePromptStatus(promptId, statusKey);
    setAlertDetails({ type: "warning", content: "Prompt execution timed out. Please try again." });
  }
}, SOCKET_TIMEOUT_MS);
```
This is a comment left during a code review.
Path: frontend/src/hooks/usePromptRun.js
Line: 56-70
Comment:
**5-minute timeout can falsely cancel a later re-run of the same prompt**
The `setTimeout` closure captures `promptId`, `docId`, and `profileId` from run N. When it fires 5 minutes later it reads the *current* store state and checks whether `current?.[promptId]?.[statusKey] === PROMPT_RUN_API_STATUSES.RUNNING`. If the user triggered another run of the same `(promptId, docId, profileId)` combination (run N+1) within that 5-minute window, the timer from run N will see run N+1's `RUNNING` state, clear it, and display a spurious "timed out" warning — even though run N+1 may still be processing and will later receive a valid socket event. The result is a permanently stuck spinner for run N+1 (status cleared by the stale timer) while the socket result arrives and tries to call `clearResultStatuses` on an already-removed entry.
Mitigation: tag each dispatch with a unique `runId` and include it in the status store key, so the timeout only clears the *specific* run it corresponds to:
```js
const runNonce = generateUUID();
addPromptStatus(promptId, statusKey, PROMPT_RUN_API_STATUSES.RUNNING, runNonce);
setTimeout(() => {
const current = usePromptRunStatusStore.getState().promptRunStatus;
if (current?.[promptId]?.[statusKey]?.nonce === runNonce) {
removePromptStatus(promptId, statusKey);
setAlertDetails({ type: "warning", content: "Prompt execution timed out. Please try again." });
}
}, SOCKET_TIMEOUT_MS);
```
How can I resolve this? If you propose a fix, please make it concise.| ) | ||
| return str(platform_key.key) | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Phase 5B — Payload builders for fire-and-forget dispatch | ||
| # ------------------------------------------------------------------ |
default_profile dereferenced before null guard in build_index_payload
build_index_payload calls ProfileManager.get_default_llm_profile(tool), then immediately passes the result to validate_adapter_status and validate_profile_manager_owner_access without any null check. If no default LLM profile is configured for the tool, get_default_llm_profile returns None and both validators will raise AttributeError deep inside the helper, surfacing as an opaque 500 error instead of the intended DefaultProfileError.
The same defensiveness present in `build_single_pass_payload` (`if not default_profile: raise DefaultProfileError()`) should be applied here:

```python
default_profile = ProfileManager.get_default_llm_profile(tool)
if not default_profile:
    raise DefaultProfileError()
PromptStudioHelper.validate_adapter_status(default_profile)
PromptStudioHelper.validate_profile_manager_owner_access(default_profile)
```
Path: `backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py` (lines 297–302)
```python
    Raises:
        FilenameMissingError: _description_
    Args:
        request (HttpRequest)

    Returns:
        Response
    """
    custom_tool = self.get_object()
    tool_id: str = str(custom_tool.tool_id)
    document_id: str = request.data.get(ToolStudioPromptKeys.DOCUMENT_ID)
    id: str = request.data.get(ToolStudioPromptKeys.ID)
    prompt_id: str = request.data.get(ToolStudioPromptKeys.ID)
    run_id: str = request.data.get(ToolStudioPromptKeys.RUN_ID)
    profile_manager: str = request.data.get(ToolStudioPromptKeys.PROFILE_MANAGER_ID)
    profile_manager_id: str = request.data.get(
        ToolStudioPromptKeys.PROFILE_MANAGER_ID
    )
```
HubSpot first-run analytics event silently dropped in async path
The old sync fetch_response path tracked output_count_before and called notify_hubspot_event(user, "PROMPT_RUN", is_first_for_org=..., ...) to fire a business analytics event on the first prompt run for an organisation. The new async path removes both the count query and the notification call entirely with no comment or TODO.
If this is intentional (e.g., to be re-added once the async path is stable), a comment noting this would prevent it from being permanently lost. If it is unintentional, first-run HubSpot events will silently stop firing for any organisation that has async_prompt_execution enabled, skewing adoption metrics.
Path: `backend/prompt_studio/prompt_studio_core_v2/views.py` (lines 430–443)
Frontend Lint Report (Biome): ✅ All checks passed! No linting or formatting issues found.
Test Results Summary
- Runner Tests - Full Report
- SDK1 Tests - Full Report
```python
dp = ProfileManager.get_default_llm_profile(tool)
monitor_llm = str(dp.llm.id)
if challenge_llm_instance:
    challenge_llm = str(challenge_llm_instance.id)
else:
    dp = ProfileManager.get_default_llm_profile(tool)
    challenge_llm = str(dp.llm.id)
return monitor_llm, challenge_llm
```
**`AttributeError` on `None` default profile in `_resolve_llm_ids`**

When `tool.monitor_llm` or `tool.challenge_llm` is `None` (not explicitly set), the code falls through to `ProfileManager.get_default_llm_profile(tool)`. If that also returns `None` (no default profile configured), the next line `str(dp.llm.id)` immediately raises `AttributeError: 'NoneType' object has no attribute 'llm'`. This surfaces as an opaque 500 error rather than the expected `DefaultProfileError`.

This can happen when a user creates a Prompt Studio tool, sets a per-prompt profile manager, but has never configured a default tool-level profile AND has not set explicit `monitor_llm`/`challenge_llm` adapters.

Note that `_resolve_llm_ids` is called in `build_fetch_response_payload` **before** the `if not profile_manager: raise DefaultProfileError()` guard (line 536), so a missing default profile causes an `AttributeError` that bypasses the intended error handling entirely.
Prompt To Fix With AI
Path: backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
Line: 484-491
```suggestion
if monitor_llm_instance:
monitor_llm = str(monitor_llm_instance.id)
else:
dp = ProfileManager.get_default_llm_profile(tool)
if not dp:
raise DefaultProfileError()
monitor_llm = str(dp.llm.id)
if challenge_llm_instance:
challenge_llm = str(challenge_llm_instance.id)
else:
dp = ProfileManager.get_default_llm_profile(tool)
if not dp:
raise DefaultProfileError()
challenge_llm = str(dp.llm.id)
```
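The two `else` branches in the suggestion are identical except for the instance they fall back from, so they could also collapse into one guarded helper. A minimal self-contained sketch of that refactor follows; the `_Obj` stubs stand in for the real ORM models, and the helper name is hypothetical.

```python
# Sketch: resolve an adapter ID, falling back to the tool's default
# profile and raising DefaultProfileError when neither is configured,
# instead of hitting AttributeError on a None profile.


class DefaultProfileError(Exception):
    pass


class _Obj:  # minimal stand-in for model instances with .id / .llm
    def __init__(self, **kw):
        self.__dict__.update(kw)


def _resolve_adapter_id(instance, tool, get_default):
    """Return str(adapter id), preferring the explicit instance."""
    if instance is not None:
        return str(instance.id)
    dp = get_default(tool)
    if dp is None:
        raise DefaultProfileError()
    return str(dp.llm.id)


# Usage: explicit adapter wins; a missing default raises the intended
# DefaultProfileError rather than an opaque AttributeError.
explicit = _Obj(id=7)
default_profile = _Obj(llm=_Obj(id=42))

a = _resolve_adapter_id(explicit, tool=None, get_default=lambda t: None)
b = _resolve_adapter_id(None, tool=None, get_default=lambda t: default_profile)
try:
    _resolve_adapter_id(None, tool=None, get_default=lambda t: None)
    raised = False
except DefaultProfileError:
    raised = True
```

Deduplicating also guarantees both the monitor and challenge paths keep the same error behaviour if the fallback logic changes later.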
Add `PSKeys.LLM_USAGE_REASON` to `usage_kwargs` in `_handle_summarize()` so summarization costs appear under `summarize_llm` in API response metadata.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
```python
PromptStudioHelper.dynamic_extractor(
    profile_manager=default_profile,
    file_path=doc_path,
    org_id=org_id,
    document_id=document_id,
    run_id=run_id,
    enable_highlight=tool.enable_highlight,
)
```
**`dynamic_extractor` blocks Django worker in async path**

`build_single_pass_payload` calls `PromptStudioHelper.dynamic_extractor(...)` synchronously before returning the `ExecutionContext`. Because `build_single_pass_payload` is called directly from the `single_pass_extraction` view (inside the HTTP request–response cycle), this blocking document-extraction call — which can be a long-running x2text adapter operation on a cache miss — ties up a Django worker thread for potentially minutes, directly contradicting the PR's stated goal of freeing Django workers immediately.

The comment `# Extract (blocking, usually cached)` acknowledges this, but "usually cached" is not a correctness guarantee: first-run documents, cache invalidations, or simply a cache miss will exhibit the same blocking behavior the async architecture was designed to eliminate.

The extraction step should either be moved into the executor worker (as part of the Celery task), or preceded by a cache check that returns an early `HTTP 202` / queued status if the extracted text is not already available, rather than extracting inline in the view's call stack.
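The cache-first alternative could take roughly this shape. This is a hedged sketch: the cache dict and enqueue function are hypothetical stand-ins for the real extraction cache and a Celery chain.

```python
# Sketch: check the extraction cache in the view and return an early
# "queued" response on a miss instead of extracting inline, so the
# Django worker is never blocked on a long x2text extraction.

extracted_text_cache = {}   # doc_path -> extracted text (stand-in)
queued = []                 # tasks handed to the executor worker


def enqueue_extract_then_run(doc_path):
    queued.append(doc_path)  # real code: Celery chain(extract, answer)
    return {"status": 202, "detail": "queued"}


def single_pass_payload(doc_path):
    text = extracted_text_cache.get(doc_path)
    if text is None:
        # Cache miss: hand the whole job to the worker, do not block.
        return enqueue_extract_then_run(doc_path)
    return {"status": 200, "extracted_text": text}


extracted_text_cache["a.pdf"] = "hello"
hit = single_pass_payload("a.pdf")
miss = single_pass_payload("b.pdf")
```

On a hit the view keeps today's fast path; only misses pay the extra queue hop, which is exactly the case where inline extraction would have blocked anyway.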
Prompt To Fix With AI
Path: backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
Line: 766-773

- Route `_handle_structure_pipeline` to `_handle_single_pass_extraction` when `is_single_pass=True` (was always calling `_handle_answer_prompt`)
- Delegate `_handle_single_pass_extraction` to cloud plugin via `ExecutorRegistry`, falling back to `_handle_answer_prompt` if plugin not installed

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
```python
@action(detail=True, methods=["get"])
def task_status(
    self, request: HttpRequest, pk: Any = None, task_id: str = None
) -> Response:
    """Poll the status of an async Prompt Studio task.

    Task IDs now point to executor worker tasks dispatched via the
    worker-v2 Celery app. Both apps share the same PostgreSQL
    result backend, so we use the worker app to look up results.

    Args:
        request (HttpRequest)
        pk: Primary key of the CustomTool (for permission check)
        task_id: Celery task ID returned by the 202 response

    Returns:
        Response with {task_id, status} and optionally result or error
    """
    from celery.result import AsyncResult

    from backend.worker_celery import get_worker_celery_app

    # Verify the user has access to this tool (triggers permission check)
    self.get_object()

    result = AsyncResult(task_id, app=get_worker_celery_app())
    if not result.ready():
        return Response({"task_id": task_id, "status": "processing"})
    if result.successful():
```
**`task_status` returns raw executor result — data shape differs from socket event**

The endpoint looks up the executor Celery task (on the `celery_executor_legacy` queue) and returns its raw `result`. However, the actual prompt output that the frontend cares about is produced by the **callback task** (`ide_prompt_complete`) — which calls `OutputManagerHelper.handle_prompt_output_update()` and then emits the processed data via Socket.IO.

The `task_status` endpoint therefore returns the unprocessed `ExecutionResult.to_dict()` from the executor worker (containing raw LLM output and metadata), while the socket event delivers the ORM-serialised `PromptStudioOutputManager` records that the frontend can actually render.

If any client uses `task_status` as a polling fallback expecting the same data shape as the socket event (e.g., for recovery after a missed socket connection), it will receive incompatible data, silently rendering nothing or throwing a client-side error.

Consider either:
- Tracking the **callback task ID** instead of the executor task ID, so the polling endpoint returns the ORM-processed result, or
- Clearly documenting in the response that `result` is a raw executor payload and not the rendered output, and ensuring the frontend never tries to use it for output rendering.
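The first option could look roughly like this. The sketch is hypothetical: the in-memory stores stand in for the Celery result backend, and the hard-coded IDs stand in for the real executor/callback task IDs (e.g., a frozen callback signature's ID).

```python
# Sketch of option 1: return the *callback* task id from the 202
# response, so that polling resolves the ORM-processed output rather
# than the raw executor payload. Stores/ids below are stand-ins.

callback_results = {}  # callback_task_id -> processed (ORM-shaped) output


def dispatch_with_callback(executor_payload):
    executor_task_id = "exec-1"   # real: worker_app.send_task(...).id
    callback_task_id = "cb-1"     # real: frozen callback signature id
    return executor_task_id, callback_task_id


def view_dispatch(payload):
    _, callback_task_id = dispatch_with_callback(payload)
    # Hand the callback id to the client, not the executor id.
    return {"status": 202, "task_id": callback_task_id}


def task_status(task_id):
    if task_id not in callback_results:
        return {"task_id": task_id, "status": "processing"}
    return {
        "task_id": task_id,
        "status": "completed",
        "result": callback_results[task_id],
    }


resp = view_dispatch({"op": "answer_prompt"})
pending = task_status(resp["task_id"])
callback_results["cb-1"] = {"prompt_output": "rendered"}
done = task_status("cb-1")
```

With this shape, the polling fallback and the socket event describe the same processed record, so a client that misses the socket event can recover cleanly.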
Prompt To Fix With AI
Path: backend/prompt_studio/prompt_studio_core_v2/views.py
Line: 592-620
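If the second option is preferred, the raw payload can at least be wrapped in an envelope whose field name makes the shape unmistakable. A minimal sketch, with hypothetical field names:

```python
# Sketch of option 2: an explicit envelope for the polling endpoint so a
# client cannot mistake the raw executor payload for the rendered,
# socket-delivered output. Field names are illustrative only.


def task_status_payload(task_id, ready, successful, raw_result=None, error=None):
    if not ready:
        return {"task_id": task_id, "status": "processing"}
    if successful:
        return {
            "task_id": task_id,
            "status": "completed",
            # Raw ExecutionResult dict -- NOT the rendered output that
            # the socket event carries; clients must not render this.
            "raw_executor_result": raw_result,
        }
    return {"task_id": task_id, "status": "failed", "error": error}


pending = task_status_payload("t1", ready=False, successful=False)
done = task_status_payload(
    "t1", ready=True, successful=True, raw_result={"output": "x"}
)
```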

What
Introduces a pluggable executor system that replaces Docker-container-based tool execution with Celery worker tasks, and migrates the Prompt Studio IDE to an async execution model using Socket.IO for result delivery. Gated behind the `async_prompt_execution` feature flag for safe rollout.

Why
The existing architecture has several limitations:
How
Backend (65 files)
- `index_document`, `fetch_response`, `single_pass_extraction` now return HTTP 202 (accepted) with a `task_id` instead of blocking. Gated by `async_prompt_execution` feature flag — old sync path preserved as fallback
- Callback tasks (`backend/prompt_studio/prompt_studio_core_v2/tasks.py`): `ide_index_complete`, `ide_prompt_complete`, `ide_prompt_error` etc. run on `prompt_studio_callback` queue, perform ORM writes via `OutputManagerHelper`, and emit `prompt_studio_result` Socket.IO events
- Worker Celery app (`backend/backend/worker_celery.py`): a second Celery app instance that coexists with Django's Celery app, configured to route tasks to executor workers
- `prompt_studio_helper.py` rewrite: removed `PromptTool` HTTP calls entirely. New `build_index_payload()`, `build_fetch_response_payload()`, `build_single_pass_payload()` methods construct `ExecutionContext` objects with all ORM data pre-loaded
- Removed `backend/backend/workers/`, `file_execution_tasks.py`, `celery_task.py` (old in-process workers)

Workers (70 files, ~19,500 new lines)
- Executor worker (`workers/executor/`): new `WorkerType.EXECUTOR` Celery worker with `LegacyExecutor` handling all operations: `extract`, `index`, `answer_prompt`, `single_pass_extraction`, `summarize`, `agentic_extraction`, `structure_pipeline`
- `BaseExecutor` → `ExecutorRegistry` (class-decorator self-registration) → `ExecutionOrchestrator` → `ExecutionDispatcher` (Celery `send_task`)
- `ExecutorToolShim`: lightweight stand-in for `BaseTool` that satisfies SDK1 adapter interfaces without Docker context
- Structure tool task (`workers/file_processing/structure_tool_task.py`): Celery-native replacement for Docker-based `StructureTool.run()` with profile overrides, smart table detection, and output file management

SDK1 (22 files)
- Execution module (`unstract/sdk1/src/unstract/sdk1/execution/`): `ExecutionContext`, `ExecutionResult` (serializable DTOs for Celery JSON transport), `ExecutionDispatcher` (`dispatch()` + `dispatch_with_callback()`), `BaseExecutor`, `ExecutorRegistry`

Frontend (275 files)
- `usePromptStudioSocket` hook listens for `prompt_studio_result` Socket.IO events. `usePromptRun` rewritten from polling to fire-and-forget. `PromptRun.jsx` conditionally renders async or sync path based on feature flag

Docker / Infrastructure
- New services: `worker-executor-v2`, `worker-prompt-studio-callback`, `worker-metrics`
- `workers-v2` services moved from opt-in (`profiles: [workers-v2]`) to default

Architecture Change
Can this PR break any existing features? If yes, please list possible items. If no, please explain why.
Yes, potential breaking changes — mitigated by feature flag:
Prompt Studio IDE async path — gated by the `async_prompt_execution` feature flag. When the flag is OFF (default), all 3 endpoints (`index_document`, `fetch_response`, `single_pass_extraction`) use the old sync path returning HTTP 200. No behavior change for existing users.

Review Guidelines
This PR touches 441 files across backend, frontend, workers, and SDK1. Below is a structured review path to navigate it efficiently.
Code Structure Overview
Recommended Review Order
Review in dependency order — each layer builds on the previous:
1. `execution/context.py`, `result.py`, `dispatcher.py`, `registry.py`: are `to_dict()`/`from_dict()` round-trips correct? Is the `Operation` enum complete? Queue naming (`celery_executor_{name}`).
2. `executor/tasks.py`, `executor/worker.py`: `execute_extraction` — retry policy, error handling, log correlation.
3. `executors/legacy_executor.py` (focus on `_OPERATION_MAP` + `execute()`)
4. `answer_prompt.py`, `index.py`, `retrieval.py`: do `executor_params` match what `build_*_payload()` sends? Lazy import pattern (`_get_prompt_deps()`, `_get_indexing_deps()`).
5. `views.py` lines 351–583: `dispatch_with_callback` usage with correct callback task names and queue.
6. `prompt_studio_helper.py` (`build_index_payload`, `build_fetch_response_payload`, `build_single_pass_payload`): `executor_params`? Key compatibility with executor handlers.
7. `tasks.py` (callback tasks): `ide_prompt_complete` — ORM writes via `OutputManagerHelper`. Socket.IO emission shape. Error callback cleanup. State store setup/teardown.
8. `usePromptRun.js`, `usePromptStudioSocket.js`, `PromptRun.jsx`: `_emit_result()`. Timeout handling. Status cleanup on failure.
9. `docker/docker-compose.yaml`: `worker-executor-v2`, `worker-prompt-studio-callback`. Removed old workers. Queue bindings.
10. `workers/tests/test_sanity_phase*.py`

Data Flow (End-to-End)
Known Code Duplication
- `views.py` — 3 view actions follow `build_payload → get_dispatcher → dispatch_with_callback → return 202`
- `tasks.py` — callback tasks `ide_index_complete` and `ide_prompt_complete` follow the same structure: extract kwargs → setup state → check result → ORM work → emit → cleanup
- `tasks.py` — legacy tasks `run_index_document`, `run_fetch_response`, `run_single_pass_extraction` kept alongside new callback tasks

Files Safe to Skim
- `workers/tests/` — 24 test files, ~10,000 lines. Well-structured but high volume. Focus on `test_sanity_phase2.py` (full Celery chain) and `test_sanity_phase4.py` (IDE payload compatibility) as representative examples.
- `workers/executor/executors/retrievers/` — 7 retriever implementations. All follow the same pattern. Reviewing one (`simple.py`) covers the pattern.
- Docs (`architecture-*.md`, `phase*.md`) — reference material, not code.
Relevant Docs
- `architecture-executor-system.md`, `architecture-flow-diagram.md`, `architecture-sequence-diagrams.md` in repo root
- `architecture-migration-phases.md`
- `rollout-plan.md`

Related Issues or PRs
Dependencies Versions / Env Variables
New env variables:
- `FLIPT_SERVICE_AVAILABLE` (default: `false`)

Notes on Testing
- `cd workers && uv run pytest -v` — 490+ tests (444 in `workers/tests/` + extras)
- `cd unstract/sdk1 && uv run pytest -v` — 146+ tests
- `cd backend && python -m pytest prompt_studio/prompt_studio_core_v2/test_tasks.py -v`
- Manual: enable the flag (`async_prompt_execution=true`), trigger prompt runs in IDE, verify Socket.IO events deliver results via Network → WS → Messages tab

Screenshots
N/A (primarily backend/worker architecture change; frontend UX unchanged when feature flag is off)
Checklist
I have read and understood the Contribution Guidelines.