Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions .github/workflows/pr_integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,80 @@ jobs:
run: make test-python-integration
- name: Minimize uv cache
run: uv cache prune --ci

mcp-feature-server-runtime:
if:
((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) ||
(github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) &&
github.repository == 'feast-dev/feast'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
ref: refs/pull/${{ github.event.pull_request.number }}/merge
submodules: recursive
persist-credentials: false
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
architecture: x64
- name: Install the latest version of uv
uses: astral-sh/setup-uv@v5
with:
enable-cache: true
- name: Install dependencies
run: make install-python-dependencies-ci
- name: Start feature server (MCP HTTP)
run: |
cd examples/mcp_feature_store
uv run python -m feast.cli.cli serve --host 127.0.0.1 --port 6566 --workers 1 --no-access-log &
SERVER_PID=$!
echo $SERVER_PID > /tmp/feast_server_pid
for i in $(seq 1 60); do
kill -0 "$SERVER_PID" || { echo "server died"; exit 1; }
if curl -fsS http://127.0.0.1:6566/health >/dev/null; then
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the server crashes immediately after backgrounding, this loop burns 60 seconds before failing. Add kill -0 "$SERVER_PID" || { echo "server died"; exit 1; } at the top of the loop body.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

break
fi
sleep 1
done
curl -fsS http://127.0.0.1:6566/health >/dev/null
- name: Validate MCP endpoint
run: |
rm -f /tmp/mcp_headers /tmp/mcp_headers2 /tmp/mcp_body2

curl -sS -D /tmp/mcp_headers -o /dev/null --max-time 10 \
-X POST \
-H "Accept: application/json, text/event-stream" \
-H "Content-Type: application/json" \
-H "mcp-protocol-version: 2025-03-26" \
--data '{}' \
http://127.0.0.1:6566/mcp

SESSION_ID=$(grep -i "^mcp-session-id:" /tmp/mcp_headers | head -1 | awk '{print $2}' | tr -d '\r')
if [ -z "${SESSION_ID}" ]; then
cat /tmp/mcp_headers || true
exit 1
fi

curl -sS -D /tmp/mcp_headers2 -o /tmp/mcp_body2 --max-time 10 \
-X POST \
-H "Accept: application/json, text/event-stream" \
-H "Content-Type: application/json" \
-H "mcp-protocol-version: 2025-03-26" \
-H "mcp-session-id: ${SESSION_ID}" \
--data '{}' \
http://127.0.0.1:6566/mcp || true

grep -Eq "^HTTP/.* 400" /tmp/mcp_headers2
grep -Eiq "^content-type: application/json" /tmp/mcp_headers2
grep -Eiq "^mcp-session-id: ${SESSION_ID}" /tmp/mcp_headers2
- name: Stop feature server
if: always()
run: |
if [ -f /tmp/feast_server_pid ]; then
kill "$(cat /tmp/feast_server_pid)" || true
fi
- name: Minimize uv cache
if: always()
run: uv cache prune --ci
3 changes: 3 additions & 0 deletions docs/getting-started/genai.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,13 @@ Feast supports the Model Context Protocol (MCP), which enables AI agents and app
type: mcp
enabled: true
mcp_enabled: true
mcp_transport: http
mcp_server_name: "feast-feature-store"
mcp_server_version: "1.0.0"
```

By default, Feast uses the SSE-based MCP transport (`mcp_transport: sse`). Streamable HTTP (`mcp_transport: http`) is recommended for improved compatibility with some MCP clients.

### How It Works

The MCP integration uses the `fastapi_mcp` library to automatically transform your Feast feature server's FastAPI endpoints into MCP-compatible tools. When you enable MCP support:
Expand Down
51 changes: 51 additions & 0 deletions docs/reference/feature-servers/mcp-feature-server.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# MCP Feature Server

## Overview

Feast can expose the Python Feature Server as an MCP (Model Context Protocol) server using `fastapi_mcp`. When enabled, MCP clients can discover and call Feast tools such as online feature retrieval.

## Installation

```bash
pip install feast[mcp]
```

## Configuration

Add an MCP `feature_server` block to your `feature_store.yaml`:

```yaml
feature_server:
type: mcp
enabled: true
mcp_enabled: true
mcp_transport: http
mcp_server_name: "feast-feature-store"
mcp_server_version: "1.0.0"
```

### mcp_transport

`mcp_transport` controls how MCP is mounted into the Feature Server:

- `sse`: SSE-based transport. This is the default for backward compatibility.
- `http`: Streamable HTTP transport. This is recommended for improved compatibility with some MCP clients.

If `mcp_transport: http` is configured but your installed `fastapi_mcp` version does not support Streamable HTTP mounting, Feast will fail fast with an error asking you to upgrade `fastapi_mcp` (or reinstall `feast[mcp]`).

## Endpoints

MCP is mounted at:

- `/mcp`

## Connecting an MCP client

Use your MCP client’s “HTTP” configuration and point it to the Feature Server base URL. For example, if your Feature Server runs at `http://localhost:6566`, use:

- `http://localhost:6566/mcp`

## Troubleshooting

- If you see a deprecation warning about `mount()` at runtime, upgrade `fastapi_mcp` and use `mcp_transport: http` or `mcp_transport: sse`.
- If your MCP client has intermittent connectivity issues with `mcp_transport: sse`, switch to `mcp_transport: http`.
3 changes: 2 additions & 1 deletion examples/mcp_feature_store/feature_store.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ feature_server:
type: mcp
enabled: true
mcp_enabled: true # Enable MCP support - defaults to false
mcp_transport: http
mcp_server_name: "feast-feature-store"
mcp_server_version: "1.0.0"
feature_logging:
enabled: false

entity_key_serialization_version: 3
entity_key_serialization_version: 3
16 changes: 15 additions & 1 deletion sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ async def websocket_endpoint(websocket: WebSocket):

def _add_mcp_support_if_enabled(app, store: "feast.FeatureStore"):
"""Add MCP support to the FastAPI app if enabled in configuration."""
mcp_transport_not_supported_error = None
try:
# Check if MCP is enabled in feature server config
if (
Expand All @@ -702,7 +703,16 @@ def _add_mcp_support_if_enabled(app, store: "feast.FeatureStore"):
and store.config.feature_server.type == "mcp"
and getattr(store.config.feature_server, "mcp_enabled", False)
):
from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app
try:
from feast.infra.mcp_servers.mcp_server import (
McpTransportNotSupportedError,
add_mcp_support_to_app,
)

mcp_transport_not_supported_error = McpTransportNotSupportedError
except ImportError as e:
logger.error(f"Error checking/adding MCP support: {e}")
return

mcp_server = add_mcp_support_to_app(app, store, store.config.feature_server)

Expand All @@ -713,6 +723,10 @@ def _add_mcp_support_if_enabled(app, store: "feast.FeatureStore"):
else:
logger.debug("MCP support is not enabled in feature server configuration")
except Exception as e:
if mcp_transport_not_supported_error and isinstance(
e, mcp_transport_not_supported_error
):
raise
logger.error(f"Error checking/adding MCP support: {e}")
# Don't fail the entire server if MCP fails to initialize

Expand Down
5 changes: 2 additions & 3 deletions sdk/python/feast/infra/mcp_servers/mcp_config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Literal, Optional
from typing import Literal

from pydantic import StrictBool, StrictStr

Expand All @@ -20,8 +20,7 @@ class McpFeatureServerConfig(BaseFeatureServerConfig):
# MCP server version
mcp_server_version: StrictStr = "1.0.0"

# Optional MCP transport configuration
mcp_transport: Optional[StrictStr] = None
mcp_transport: Literal["sse", "http"] = "sse"

# The endpoint definition for transformation_service (inherited from base)
transformation_service_endpoint: StrictStr = "localhost:6566"
35 changes: 31 additions & 4 deletions sdk/python/feast/infra/mcp_servers/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""

import logging
from typing import Optional
from typing import Literal, Optional

from feast.feature_store import FeatureStore

Expand All @@ -26,6 +26,10 @@
FastApiMCP = None


class McpTransportNotSupportedError(RuntimeError):
pass


def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastApiMCP"]:
"""Add MCP support to the FastAPI app if enabled in configuration."""
if not MCP_AVAILABLE:
Expand All @@ -40,8 +44,29 @@ def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastAp
description="Feast Feature Store MCP Server - Access feature store data and operations through MCP",
)

# Mount the MCP server to the FastAPI app
mcp.mount()
transport: Literal["sse", "http"] = getattr(config, "mcp_transport", "sse")
if transport == "http":
mount_http = getattr(mcp, "mount_http", None)
if mount_http is None:
raise McpTransportNotSupportedError(
"mcp_transport=http requires fastapi_mcp with FastApiMCP.mount_http(). "
"Upgrade fastapi_mcp (or install feast[mcp]) to a newer version."
)
mount_http()
elif transport == "sse":
mount_sse = getattr(mcp, "mount_sse", None)
if mount_sse is not None:
mount_sse()
else:
logger.warning(
"transport sse not supported, fallback to the deprecated mount()."
)
mcp.mount()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silent fallback to the deprecated mount() with no warning. Operator configured mcp_transport: sse but gets legacy behavior with no indication. Worth a logger.warning here.

else:
# Code should not reach here
raise McpTransportNotSupportedError(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch is unreachable — Literal["sse", "http"] in the config rejects anything else at parse time. Either remove it or add a comment that it's a defensive guard for programmatic callers.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Anarion-zuo u didn't solve it btw :)

f"Unsupported mcp_transport={transport!r}. Expected 'sse' or 'http'."
)

logger.info(
"MCP support has been enabled for the Feast feature server at /mcp endpoint"
Expand All @@ -53,6 +78,8 @@ def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastAp

return mcp

except McpTransportNotSupportedError:
raise
except Exception as e:
logger.error(f"Failed to initialize MCP integration: {e}")
logger.error(f"Failed to initialize MCP integration: {e}", exc_info=True)
return None
66 changes: 30 additions & 36 deletions sdk/python/tests/integration/test_mcp_feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from pydantic import ValidationError

from feast.feature_store import FeatureStore
from feast.infra.mcp_servers.mcp_config import McpFeatureServerConfig
Expand Down Expand Up @@ -49,7 +50,7 @@ def test_mcp_server_functionality_with_mock_store(self):
mcp_server_version="1.0.0",
)

mock_mcp_instance = Mock()
mock_mcp_instance = Mock(spec_set=["mount_sse", "mount_http", "mount"])
mock_fast_api_mcp.return_value = mock_mcp_instance

result = add_mcp_support_to_app(mock_app, mock_store, config)
Expand All @@ -58,7 +59,7 @@ def test_mcp_server_functionality_with_mock_store(self):
self.assertIsNotNone(result)
self.assertEqual(result, mock_mcp_instance)
mock_fast_api_mcp.assert_called_once()
mock_mcp_instance.mount.assert_called_once()
mock_mcp_instance.mount_sse.assert_called_once()

@patch("feast.infra.mcp_servers.mcp_server.MCP_AVAILABLE", True)
@patch("feast.infra.mcp_servers.mcp_server.FastApiMCP")
Expand All @@ -77,7 +78,7 @@ def test_complete_mcp_setup_flow(self, mock_fast_api_mcp):
transformation_service_endpoint="localhost:6566",
)

mock_mcp_instance = Mock()
mock_mcp_instance = Mock(spec_set=["mount_sse", "mount_http", "mount"])
mock_fast_api_mcp.return_value = mock_mcp_instance

# Execute the flow
Expand All @@ -90,7 +91,7 @@ def test_complete_mcp_setup_flow(self, mock_fast_api_mcp):
name="e2e-test-server",
description="Feast Feature Store MCP Server - Access feature store data and operations through MCP",
)
mock_mcp_instance.mount.assert_called_once()
mock_mcp_instance.mount_sse.assert_called_once()
self.assertEqual(result, mock_mcp_instance)

@pytest.mark.skipif(
Expand Down Expand Up @@ -160,36 +161,29 @@ def test_feature_server_with_mcp_config(self):
def test_mcp_server_configuration_validation(self):
"""Test comprehensive MCP server configuration validation."""
# Test various configuration combinations
test_configs = [
{
"enabled": True,
"mcp_enabled": True,
"mcp_server_name": "test-server-1",
"mcp_server_version": "1.0.0",
"mcp_transport": "sse",
},
{
"enabled": True,
"mcp_enabled": True,
"mcp_server_name": "test-server-2",
"mcp_server_version": "2.0.0",
"mcp_transport": "websocket",
},
{
"enabled": False,
"mcp_enabled": False,
"mcp_server_name": "disabled-server",
"mcp_server_version": "1.0.0",
"mcp_transport": None,
},
]

for config_dict in test_configs:
config = McpFeatureServerConfig(**config_dict)
self.assertEqual(config.enabled, config_dict["enabled"])
self.assertEqual(config.mcp_enabled, config_dict["mcp_enabled"])
self.assertEqual(config.mcp_server_name, config_dict["mcp_server_name"])
self.assertEqual(
config.mcp_server_version, config_dict["mcp_server_version"]
for transport in ["sse", "http"]:
config = McpFeatureServerConfig(
enabled=True,
mcp_enabled=True,
mcp_server_name="test-server",
mcp_server_version="1.0.0",
mcp_transport=transport,
)
self.assertEqual(config.mcp_transport, transport)

config_default = McpFeatureServerConfig(
enabled=True,
mcp_enabled=True,
mcp_server_name="test-server-default",
mcp_server_version="1.0.0",
)
self.assertEqual(config_default.mcp_transport, "sse")

with self.assertRaises(ValidationError):
McpFeatureServerConfig(
enabled=True,
mcp_enabled=True,
mcp_server_name="bad-transport",
mcp_server_version="1.0.0",
mcp_transport="websocket",
)
self.assertEqual(config.mcp_transport, config_dict["mcp_transport"])
Loading
Loading