Skip to content

(feat) Add metadata tools for Couchbase MCP server #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions src/mcp_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import timedelta
from typing import Any
from typing import Any, Dict, List
from mcp.server.fastmcp import FastMCP, Context
from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
@@ -10,6 +10,13 @@
from contextlib import asynccontextmanager
from typing import AsyncIterator

# Import the helper functions from meta.py
from .meta import (
_get_cluster_info,
_get_bucket_info,
_list_fts_indexes,
_list_n1ql_indexes,
)

MCP_SERVER_NAME = "couchbase"

@@ -86,7 +93,66 @@ async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
mcp = FastMCP(MCP_SERVER_NAME, lifespan=app_lifespan)


# Tools
# --- Metadata Tools ---

@mcp.tool()
def get_cluster_info(ctx: Context) -> Dict[str, Any]:
"""Get diagnostic information about the Couchbase cluster."""
cluster = ctx.request_context.lifespan_context.cluster
if not cluster:
raise ValueError("Cluster connection not available in context.")
try:
return _get_cluster_info(cluster)
except Exception as e:
logger.error(f"Tool error getting cluster info: {type(e).__name__} - {e}")
# Re-raise to signal failure to the MCP framework/caller
raise
Comment on lines +99 to +109
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider extracting the cluster availability check and exception handling into a reusable helper function, as this pattern is repeated in multiple tool functions. This would reduce code duplication and improve maintainability. For example, you could have a function that takes a context and returns the cluster, raising an exception if it's not available, and then use that function in each tool.

Suggested change
def get_cluster_info(ctx: Context) -> Dict[str, Any]:
"""Get diagnostic information about the Couchbase cluster."""
cluster = ctx.request_context.lifespan_context.cluster
if not cluster:
raise ValueError("Cluster connection not available in context.")
try:
return _get_cluster_info(cluster)
except Exception as e:
logger.error(f"Tool error getting cluster info: {type(e).__name__} - {e}")
# Re-raise to signal failure to the MCP framework/caller
raise
def get_cluster_from_context(ctx: Context) -> Cluster:
cluster = ctx.request_context.lifespan_context.cluster
if not cluster:
raise ValueError("Cluster connection not available in context.")
return cluster
@mcp.tool()
def get_cluster_info(ctx: Context) -> Dict[str, Any]:
"""Get diagnostic information about the Couchbase cluster."""
cluster = get_cluster_from_context(ctx)
try:
return _get_cluster_info(cluster)
except Exception as e:
logger.error(f"Tool error getting cluster info: {type(e).__name__} - {e}")
# Re-raise to signal failure to the MCP framework/caller
raise



@mcp.tool()
def get_bucket_info(ctx: Context) -> Dict[str, Any]:
"""Get configuration settings for the current Couchbase bucket."""
bucket = ctx.request_context.lifespan_context.bucket
if not bucket:
raise ValueError("Bucket connection not available in context.")
try:
return _get_bucket_info(bucket)
except Exception as e:
logger.error(f"Tool error getting bucket info: {type(e).__name__} - {e}")
raise

Comment on lines +113 to +123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider extracting the bucket availability check and exception handling into a reusable helper function, as this pattern is repeated in multiple tool functions. This would reduce code duplication and improve maintainability. For example, you could have a function that takes a context and returns the bucket, raising an exception if it's not available, and then use that function in each tool.

Suggested change
def get_bucket_info(ctx: Context) -> Dict[str, Any]:
"""Get configuration settings for the current Couchbase bucket."""
bucket = ctx.request_context.lifespan_context.bucket
if not bucket:
raise ValueError("Bucket connection not available in context.")
try:
return _get_bucket_info(bucket)
except Exception as e:
logger.error(f"Tool error getting bucket info: {type(e).__name__} - {e}")
raise
def get_bucket_from_context(ctx: Context) -> Any:
bucket = ctx.request_context.lifespan_context.bucket
if not bucket:
raise ValueError("Bucket connection not available in context.")
return bucket
@mcp.tool()
def get_bucket_info(ctx: Context) -> Dict[str, Any]:
"""Get configuration settings for the current Couchbase bucket."""
bucket = get_bucket_from_context(ctx)
try:
return _get_bucket_info(bucket)
except Exception as e:
logger.error(f"Tool error getting bucket info: {type(e).__name__} - {e}")
raise


@mcp.tool()
def list_fts_indexes(ctx: Context) -> List[Dict[str, Any]]:
"""List all Full-Text Search (FTS) indexes in the cluster."""
cluster = ctx.request_context.lifespan_context.cluster
if not cluster:
raise ValueError("Cluster connection not available in context.")
try:
return _list_fts_indexes(cluster)
except Exception as e:
logger.error(f"Tool error listing FTS indexes: {type(e).__name__} - {e}")
raise


@mcp.tool()
def list_n1ql_indexes(ctx: Context) -> List[Dict[str, Any]]:
"""List all N1QL (Query) indexes for the current bucket."""
cluster = ctx.request_context.lifespan_context.cluster
bucket = ctx.request_context.lifespan_context.bucket
if not cluster:
raise ValueError("Cluster connection not available in context.")
if not bucket:
raise ValueError("Bucket connection not available in context.")
try:
return _list_n1ql_indexes(cluster, bucket.name)
except Exception as e:
logger.error(f"Tool error listing N1QL indexes: {type(e).__name__} - {e}")
raise


# --- Existing Tools ---

@mcp.tool()
def get_scopes_and_collections_in_bucket(ctx: Context) -> dict[str, list[str]]:
"""Get the names of all scopes and collections in the bucket.
127 changes: 127 additions & 0 deletions src/meta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import logging
from typing import Any, Dict, List
from couchbase.cluster import Cluster
from couchbase.bucket import Bucket
from couchbase.management.buckets import BucketSettings
from couchbase.management.search import SearchIndex
from couchbase.management.queries import QueryIndex
from couchbase.exceptions import CouchbaseException

# Configure logging for this module
meta_logger = logging.getLogger(__name__)
meta_logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
if not meta_logger.handlers:
meta_logger.addHandler(handler)


def _get_cluster_info(cluster: Cluster) -> Dict[str, Any]:
"""Helper function to get basic cluster diagnostics."""
try:
diag = cluster.diagnostics()
# Convert diagnostics report to a simpler dict
report = {
'id': diag.id,
'sdk': diag.sdk,
'state': diag.state.name,
'endpoints': {}
}
for service_type, endpoints in diag.endpoints.items():
report['endpoints'][service_type.name] = [
{'id': ep.id, 'state': ep.state.name, 'local': ep.local, 'remote': ep.remote, 'last_activity': str(ep.last_activity)}
for ep in endpoints
]
return report
except CouchbaseException as e:
meta_logger.error(f"Error getting cluster info: {type(e).__name__} - {e}")
raise # Re-raise for the tool wrapper to handle
Comment on lines +37 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

It's good that you're catching CouchbaseException and logging it. Consider adding more context to the log message, such as the specific operation that failed. Also, ensure that the re-raised exception preserves the original traceback for debugging purposes.

Suggested change
except CouchbaseException as e:
meta_logger.error(f"Error getting cluster info: {type(e).__name__} - {e}")
raise # Re-raise for the tool wrapper to handle
meta_logger.error(f"Error getting cluster info: {type(e).__name__} - {e}", exc_info=True)
raise

except Exception as e: # Catch potential other errors during dict creation
meta_logger.error(f"Unexpected error processing cluster info: {type(e).__name__} - {e}")
raise


def _get_bucket_info(bucket: Bucket) -> Dict[str, Any]:
"""Helper function to get bucket settings."""
try:
# Need the bucket manager associated with the cluster
bm = bucket.bucket_manager()
settings = bm.get_bucket(bucket.name) # Get settings for the specific bucket instance

# Convert BucketSettings to a dictionary
return {
'name': settings.name,
'bucket_type': settings.bucket_type.value,
'ram_quota_mb': settings.ram_quota_mb,
'num_replicas': settings.num_replicas,
'replica_indexes': settings.replica_indexes,
'flush_enabled': settings.flush_enabled,
'max_ttl': settings.max_ttl,
'compression_mode': settings.compression_mode.value,
'minimum_durability_level': str(settings.minimum_durability_level), # Convert enum/object to string
'storage_backend': settings.storage_backend.value,
'eviction_policy': settings.eviction_policy.value,
'conflict_resolution_type': settings.conflict_resolution_type.value,
}
except CouchbaseException as e:
meta_logger.error(f"Error getting bucket info for {bucket.name}: {type(e).__name__} - {e}")
raise
Comment on lines +67 to +69
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

It's good that you're catching CouchbaseException and logging it. Consider adding more context to the log message, such as the specific operation that failed. Also, ensure that the re-raised exception preserves the original traceback for debugging purposes.

Suggested change
except CouchbaseException as e:
meta_logger.error(f"Error getting bucket info for {bucket.name}: {type(e).__name__} - {e}")
raise
meta_logger.error(f"Error getting bucket info for {bucket.name}: {type(e).__name__} - {e}", exc_info=True)
raise

except Exception as e: # Catch potential other errors during dict creation
meta_logger.error(f"Unexpected error processing bucket info: {type(e).__name__} - {e}")
raise


def _list_fts_indexes(cluster: Cluster) -> List[Dict[str, Any]]:
"""Helper function to list all Full-Text Search (FTS) indexes."""
try:
index_manager = cluster.search_indexes()
indexes = index_manager.get_all_indexes()
# Convert SearchIndex objects to dictionaries
result_list = []
for index in indexes:
result_list.append({
'name': index.name,
'type': index.type,
'source_name': index.source_name,
'uuid': index.uuid,
'params': index.params, # These can be complex dicts
'source_params': index.source_params,
'plan_params': index.plan_params,
'source_uuid': index.source_uuid,
})
return result_list
except CouchbaseException as e:
meta_logger.error(f"Error listing FTS indexes: {type(e).__name__} - {e}")
raise
Comment on lines +94 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

It's good that you're catching CouchbaseException and logging it. Consider adding more context to the log message, such as the specific operation that failed. Also, ensure that the re-raised exception preserves the original traceback for debugging purposes.

Suggested change
except CouchbaseException as e:
meta_logger.error(f"Error listing FTS indexes: {type(e).__name__} - {e}")
raise
meta_logger.error(f"Error listing FTS indexes: {type(e).__name__} - {e}", exc_info=True)
raise

except Exception as e: # Catch potential other errors during dict creation
meta_logger.error(f"Unexpected error processing FTS indexes: {type(e).__name__} - {e}")
raise


def _list_n1ql_indexes(cluster: Cluster, bucket_name: str) -> List[Dict[str, Any]]:
"""Helper function to list N1QL (Query) indexes for a specific bucket."""
try:
index_manager = cluster.query_indexes()
indexes = index_manager.get_all_indexes(bucket_name=bucket_name)
# Convert QueryIndex objects to dictionaries
result_list = []
for index in indexes:
result_list.append({
'name': index.name,
'keyspace': index.keyspace_id, # Corrected attribute name
'namespace': index.namespace_id, # Corrected attribute name
'keys': index.index_key, # Corrected attribute name
'condition': index.condition,
'state': index.state,
'type': index.type.value, # e.g., 'gsi'
'is_primary': index.is_primary,
'partition': index.partition
})
return result_list
except CouchbaseException as e:
meta_logger.error(f"Error listing N1QL indexes for bucket {bucket_name}: {type(e).__name__} - {e}")
raise
Comment on lines +122 to +124
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

It's good that you're catching CouchbaseException and logging it. Consider adding more context to the log message, such as the specific operation that failed. Also, ensure that the re-raised exception preserves the original traceback for debugging purposes.

Suggested change
except CouchbaseException as e:
meta_logger.error(f"Error listing N1QL indexes for bucket {bucket_name}: {type(e).__name__} - {e}")
raise
meta_logger.error(f"Error listing N1QL indexes for bucket {bucket_name}: {type(e).__name__} - {e}", exc_info=True)
raise

except Exception as e: # Catch potential other errors during dict creation
meta_logger.error(f"Unexpected error processing N1QL indexes: {type(e).__name__} - {e}")
raise