# Cosdata Python SDK
A Python SDK for interacting with the Cosdata Vector Database.
## Installation
```bash
pip install cosdata-sdk
```
## Quick Start
```python
from cosdata import Client # Import the Client class
# Initialize the client (all parameters are optional)
client = Client(
    host="http://127.0.0.1:8443",  # Default host
    username="admin",              # Default username
    password="admin",              # Default password
    verify=False                   # SSL verification
)
# Create a collection
collection = client.create_collection(
    name="my_collection",
    dimension=768,                 # Vector dimension
    description="My vector collection"
)
# Create an index (all parameters are optional)
index = collection.create_index(
    distance_metric="cosine",      # Default: cosine
    num_layers=7,                  # Default: 7
    max_cache_size=1000,           # Default: 1000
    ef_construction=512,           # Default: 512
    ef_search=256,                 # Default: 256
    neighbors_count=32,            # Default: 32
    level_0_neighbors_count=64     # Default: 64
)
# Generate some vectors (example with random data)
import numpy as np
def generate_random_vector(id: int, dimension: int) -> dict:
    values = np.random.uniform(-1, 1, dimension).tolist()
    return {
        "id": f"vec_{id}",
        "dense_values": values,
        "document_id": f"doc_{id//10}",  # Group vectors into documents
        "metadata": {                    # Optional metadata
            "created_at": "2024-03-20",
            "category": "example"
        }
    }
# Generate and insert vectors
vectors = [generate_random_vector(i, 768) for i in range(100)]
# Add vectors using a transaction
with collection.transaction() as txn:
    # Single vector upsert (creates or updates)
    txn.upsert_vector(vectors[0])
    # Single vector create (only for new vectors)
    txn.create_vector(vectors[1])
    # Batch upsert for remaining vectors
    txn.batch_upsert_vectors(vectors[2:], max_workers=8, max_retries=3)
# Add vectors using streaming operations (immediate availability)
# Single vector upsert - returns immediately with result
result = collection.stream_upsert(vectors[0])
print(f"Stream upsert result: {result}")
# Multiple vectors upsert - returns immediately with result
result = collection.stream_upsert(vectors[1:])
print(f"Stream batch upsert result: {result}")
# Delete a vector using streaming operations
result = collection.stream_delete("vec_0")
print(f"Stream delete result: {result}")
# Search for similar vectors
results = collection.search.dense(
    query_vector=vectors[0]["dense_values"],  # Use first vector as query
    top_k=5,                                  # Number of nearest neighbors
    return_raw_text=True
)
# Fetch a specific vector
vector = collection.vectors.get("vec_1")
# Get collection information
collection_info = collection.get_info()
print(f"Collection info: {collection_info}")
# List all collections
print("Available collections:")
for coll in client.collections():
    print(f" - {coll.name}")
# Version management
current_version = collection.versions.get_current()
print(f"Current version: {current_version}")
```
## 🧩 Embedding Generation (Optional Convenience Feature)
Cosdata SDK provides a convenience utility for generating embeddings using [cosdata-fastembed](https://github.com/cosdata/cosdata-fastembed). This is optional—if you already have your own embeddings, you can use those directly. If you want to generate embeddings in Python, you can use the following utility:
```python
from cosdata.embedding import embed_texts
texts = [
    "Cosdata makes vector search easy!",
    "This is a test of the embedding utility."
]
embeddings = embed_texts(texts, model_name="thenlper/gte-base")  # Specify any supported model
```
- See the [cosdata-fastembed supported models list](https://github.com/cosdata/cosdata-fastembed#supported-models) for available model names and dimensions.
- The output is a list of lists (one embedding per input text), ready to upsert into your collection.
- If `cosdata-fastembed` is not installed, a helpful error will be raised.
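To go from raw texts to upsert-ready records, you can pair each text with its embedding. A minimal sketch (the record fields follow the Quick Start example above; the `to_records` helper is illustrative, not part of the SDK, and the commented-out upsert assumes a live Cosdata instance):

```python
from typing import Any, Dict, List

def to_records(texts: List[str], embeddings: List[List[float]],
               doc_id: str = "doc_0") -> List[Dict[str, Any]]:
    """Pair each text with its embedding as an upsert-ready vector dict."""
    return [
        {
            "id": f"vec_{i}",
            "dense_values": emb,
            "document_id": doc_id,
            "text": text,  # stored text lets return_raw_text=True surface it
        }
        for i, (text, emb) in enumerate(zip(texts, embeddings))
    ]

# Tiny stand-in embeddings; real ones would come from embed_texts(texts)
records = to_records(["hello", "world"], [[0.1, 0.2], [0.3, 0.4]])
# with collection.transaction() as txn:
#     txn.batch_upsert_vectors(records)
```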
## Methods
### embed_texts
- `embed_texts(texts: List[str], model_name: str = "BAAI/bge-small-en-v1.5") -> List[List[float]]`
- Generates embeddings for a list of texts using cosdata-fastembed. Returns a list of embedding vectors (as plain Python lists). Raises ImportError if cosdata-fastembed is not installed.
Example:
```python
from cosdata.embedding import embed_texts
embeddings = embed_texts(["hello world"], model_name="thenlper/gte-base")
```
## API Reference
### Client
The main client for interacting with the Vector Database API.
```python
client = Client(
    host="http://127.0.0.1:8443",  # Optional
    username="admin",              # Optional
    password="admin",              # Optional
    verify=False                   # Optional
)
```
Methods:
- `create_collection(...) -> Collection`
- Returns a `Collection` object. Collection info can be accessed via `collection.get_info()`:
```python
{
    "name": str,
    "description": str,
    "dense_vector": {"enabled": bool, "dimension": int},
    "sparse_vector": {"enabled": bool},
    "tf_idf_options": {"enabled": bool}
}
```
- `collections() -> List[Collection]`
- Returns a list of `Collection` objects.
- `get_collection(name: str) -> Collection`
- Returns a `Collection` object for the given name.
- `list_collections() -> List[Dict[str, Any]]`
- Returns a list of collection information dictionaries.
- `indexes` - Access to client-level index management (see Indexes section below)
### Collection
The Collection class provides access to all collection-specific operations.
```python
collection = client.create_collection(
    name="my_collection",
    dimension=768,
    description="My collection"
)
```
Methods:
- `create_index(...) -> Index`
- Returns an `Index` object. Index info can be fetched (if implemented) as:
```python
{
    "dense": {...},
    "sparse": {...},
    "tf-idf": {...}
}
```
- `create_sparse_index(name: str, quantization: int = 64, sample_threshold: int = 1000) -> Index`
- Creates a sparse index for the collection.
- `create_tf_idf_index(name: str, sample_threshold: int = 1000, k1: float = 1.2, b: float = 0.75) -> Index`
- Creates a TF-IDF index for the collection.
- `get_index(name: str) -> Index`
- Returns an `Index` object for the given name.
- `get_info() -> dict`
- Returns collection metadata as above.
- `delete() -> None`
- Deletes the collection.
- `load() -> None`
- Loads the collection into memory.
- `unload() -> None`
- Unloads the collection from memory.
- `create_transaction() -> Transaction`
- Creates a new transaction for this collection.
- `transaction() -> Transaction` (context manager)
- Creates a transaction with automatic commit/abort.
- `stream_upsert(vectors: Union[Dict[str, Any], List[Dict[str, Any]]]) -> Dict[str, Any]`
- Upserts vectors without an explicit transaction; data is searchable as soon as the call returns.
- `stream_delete(vector_id: str) -> Dict[str, Any]`
- Immediate vector deletion.
- `neighbors(vector_id: str) -> Dict[str, Any]`
- Fetch neighbors for a given vector ID.
- `set_version(version: str) -> Dict[str, Any]`
- Set the current version of the collection.
- `indexing_status() -> Dict[str, Any]`
- Get the indexing status of this collection.
- `loaded(client) -> List[Dict[str, Any]]` (class method)
- Get a list of all loaded collections.
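`load()`, `unload()`, and the `loaded()` class method can be combined to manage which collections sit in memory. A sketch of a small helper that checks whether a collection appears in the loaded list (the `is_loaded` helper and the `"name"` key in each entry are assumptions about the response shape, not documented SDK behavior):

```python
from typing import Any, Dict, List

def is_loaded(loaded_info: List[Dict[str, Any]], name: str) -> bool:
    """Return True if a collection name appears in the loaded-collections list.

    Assumes each entry is a dict carrying a "name" key (assumed shape).
    """
    return any(entry.get("name") == name for entry in loaded_info)

# Sample data standing in for Collection.loaded(client):
sample = [{"name": "my_collection"}, {"name": "other"}]
print(is_loaded(sample, "my_collection"))  # True
# if not is_loaded(Collection.loaded(client), "my_collection"):
#     collection.load()
```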
### Indexes
Index management is handled directly through the Collection object.
```python
# Create a dense index
dense_index = collection.create_index(
    distance_metric="cosine",
    num_layers=7
)
# Create a sparse index
sparse_index = collection.create_sparse_index(
    name="my_sparse_index",
    quantization=64
)
# Create a TF-IDF index
tf_idf_index = collection.create_tf_idf_index(
    name="my_tf_idf_index",
    sample_threshold=1000
)
# Get an existing index
index = collection.get_index("my_index")
# Delete an index
index.delete()
```
Methods:
- `create_index(distance_metric: str = "cosine", num_layers: int = 7, max_cache_size: int = 1000, ef_construction: int = 512, ef_search: int = 256, neighbors_count: int = 32, level_0_neighbors_count: int = 64) -> Index`
- Creates a dense vector index for the collection.
- `create_sparse_index(name: str, quantization: int = 64, sample_threshold: int = 1000) -> Index`
- Creates a sparse index for the collection.
- `create_tf_idf_index(name: str, sample_threshold: int = 1000, k1: float = 1.2, b: float = 0.75) -> Index`
- Creates a TF-IDF index for the collection.
- `get_index(name: str) -> Index`
- Get an existing index by name.
### Index
The Index class represents an index in a collection.
```python
index = collection.get_index("my_index")
```
Methods:
- `delete() -> None`
- Deletes this index.
### Transaction
The Transaction class provides methods for vector operations with clear semantics.
```python
with collection.transaction() as txn:
    txn.upsert_vector(vector)  # Single vector (create or update)
    txn.batch_upsert_vectors(vectors, max_workers=8, max_retries=3)  # Multiple vectors, with parallelism and retries
```
Methods:
- `upsert_vector(vector: Dict[str, Any]) -> None`
- **Creates a new vector or updates an existing one.** Use this when you want the vector to exist regardless of whether it already does.
- `delete_vector(vector_id: str) -> None`
- Deletes a vector by ID in the transaction.
- `batch_upsert_vectors(vectors: List[Dict[str, Any]], max_workers: Optional[int] = None, max_retries: int = 3) -> None`
- `vectors`: List of vector dictionaries to upsert
- `max_workers`: Number of threads to use for parallel upserts (default: all available CPU threads)
- `max_retries`: Number of times to retry a failed batch (default: 3)
- `commit() -> None`
- Commits the transaction.
- `abort() -> None`
- Aborts the transaction.
- `get_status() -> Dict[str, Any]`
- Gets the status of the transaction.
- `poll_completion(target_status: str = 'complete', max_attempts: int = 10, sleep_interval: float = 1.0) -> Tuple[str, bool]`
- Polls transaction status until target status is reached or max attempts exceeded.
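The context-manager form commits or aborts automatically. When you need explicit control, `create_transaction()` supports the same lifecycle by hand; a sketch (the `upsert_all` wrapper is illustrative, not part of the SDK):

```python
from typing import Any, Dict, List

def upsert_all(collection, vectors: List[Dict[str, Any]]) -> bool:
    """Manual transaction lifecycle: commit on success, abort on any error."""
    txn = collection.create_transaction()
    try:
        for v in vectors:
            txn.upsert_vector(v)
        txn.commit()
        return True
    except Exception:
        txn.abort()
        return False
```

With `with collection.transaction() as txn:` this commit/abort bookkeeping happens for you.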
### Transaction Status Polling
The Transaction class provides methods for monitoring transaction status and polling for completion.
```python
# Create a transaction
with collection.transaction() as txn:
    # Get current transaction status
    status = txn.get_status()
    print(f"Transaction status: {status}")

    # Upsert some vectors
    txn.upsert_vector(vector)

    # Poll for completion with custom parameters
    final_status, success = txn.poll_completion(
        target_status="complete",
        max_attempts=20,
        sleep_interval=2.0
    )

    if success:
        print(f"Transaction completed with status: {final_status}")
    else:
        print(f"Transaction may not have completed. Final status: {final_status}")
```
Methods:
- `get_status(collection_name: str = None, transaction_id: str = None) -> str`
- Get the current status of this transaction (or another, if specified)
- Returns transaction status as a string
- Throws exceptions for API errors with descriptive messages
- Parameters:
- `collection_name`: Name of the collection (default: this transaction's collection)
- `transaction_id`: ID of the transaction to check (default: this transaction's ID)
- `poll_completion(target_status: str = 'complete', max_attempts: int = 10, sleep_interval: float = 1.0, collection_name: str = None, transaction_id: str = None) -> tuple[str, bool]`
- Poll transaction status until target status is reached or max attempts exceeded
- Returns tuple of `(final_status, success_boolean)`
- Configurable polling parameters for different use cases
- Provides real-time progress feedback via console output
- Parameters:
- `target_status`: Target status to wait for (default: 'complete')
- `max_attempts`: Maximum number of polling attempts (default: 10)
- `sleep_interval`: Time to sleep between attempts in seconds (default: 1.0)
- `collection_name`: Name of the collection (default: this transaction's collection)
- `transaction_id`: Transaction ID to poll (default: this transaction's ID)
### Streaming Operations (Implicit Transactions)
The streaming operations provide immediate vector availability optimized for streaming scenarios. These methods use implicit transactions that prioritize data availability over batch processing efficiency.
**Design Philosophy:**
- **Optimized for streaming scenarios** where individual records must become immediately searchable
- **Serves real-time monitoring systems, live content feeds, and streaming analytics**
- **Prioritizes data availability over batch processing efficiency**
- **Automatic transaction management** - no client-managed transaction boundaries
- **System automatically handles batching and version allocation**
- **Abstracts transactional complexity while preserving append-only semantics**
```python
# Single vector stream upsert - immediately available for search
vector = {
    "id": "vector-1",
    "document_id": "doc-123",
    "dense_values": [0.1, 0.2, 0.3, 0.4, 0.5],
    "metadata": {"category": "technology"},
    "text": "Sample text content"
}
result = collection.stream_upsert(vector)
print(f"Vector immediately available: {result}")

# Multiple vectors stream upsert
vectors = [vector1, vector2, vector3]
result = collection.stream_upsert(vectors)
print(f"All vectors immediately available: {result}")

# Single vector stream delete
result = collection.stream_delete("vector-1")
print(f"Vector immediately deleted: {result}")
```
Methods:
- `stream_upsert(vectors: Union[Dict[str, Any], List[Dict[str, Any]]]) -> Dict[str, Any]`
- Upsert vectors with immediate availability
- Returns response data immediately
- Accepts single vector dict or list of vector dicts
- `stream_delete(vector_id: str) -> Dict[str, Any]`
- Delete a vector with immediate effect
- Returns response data immediately
- Accepts single vector ID
### Search
The Search class provides methods for vector similarity search.
```python
results = collection.search.dense(
    query_vector=vector,
    top_k=5,
    return_raw_text=True
)
```
Methods:
- `dense(query_vector: List[float], top_k: int = 5, return_raw_text: bool = False) -> dict`
- Returns:
```python
{
    "results": [
        {
            "id": str,
            "document_id": str,
            "score": float,
            "text": str | None
        },
        ...
    ]
}
```
- `batch_dense(queries: List[Dict[str, List[float]]], top_k: int = 5, return_raw_text: bool = False) -> List[dict]`
- Batch dense vector search. Each query must contain a "vector" field.
- `sparse(query_terms: List[dict], top_k: int = 5, early_terminate_threshold: float = 0.0, return_raw_text: bool = False) -> dict`
- Same structure as above.
- `batch_sparse(query_terms_list: List[List[dict]], top_k: int = 5, early_terminate_threshold: float = 0.0, return_raw_text: bool = False) -> List[dict]`
- Batch sparse vector search.
- `text(query_text: str, top_k: int = 5, return_raw_text: bool = False) -> dict`
- Same structure as above.
- `batch_text(query_texts: List[str], top_k: int = 5, return_raw_text: bool = False) -> List[dict]`
- Batch text search.
- `hybrid_search(queries: dict) -> dict`
- Hybrid search combining dense and sparse queries.
- `batch_tf_idf_search(queries: List[str], top_k: int = 10, return_raw_text: bool = False) -> List[dict]`
- Batch TF-IDF search.
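The batch variants take a list of query payloads, which is plain dictionary work to build. The field names below follow the shapes described above (a `"vector"` field per dense query, `{"index": ..., "value": ...}` terms per sparse query); the commented-out calls assume a live collection:

```python
# Dense batch: one {"vector": [...]} payload per query
dense_queries = [
    {"vector": [0.1, 0.2, 0.3]},
    {"vector": [0.4, 0.5, 0.6]},
]
# results = collection.search.batch_dense(dense_queries, top_k=5)

# Sparse batch: one list of {"index", "value"} terms per query
sparse_queries = [
    [{"index": 1, "value": 0.5}, {"index": 7, "value": 0.2}],
    [{"index": 3, "value": 0.9}],
]
# results = collection.search.batch_sparse(sparse_queries, top_k=5)

# Text batch: plain strings
text_queries = ["vector databases", "similarity search"]
# results = collection.search.batch_text(text_queries, top_k=5)
```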
### Vectors
The Vectors class provides methods for vector operations.
```python
vector = collection.vectors.get("vec_1")
exists = collection.vectors.exists("vec_1")
```
Methods:
- `get(vector_id: str) -> Vector`
- Returns a `Vector` dataclass object with attributes:
```python
vector.id: str
vector.document_id: Optional[str]
vector.dense_values: Optional[List[float]]
vector.sparse_indices: Optional[List[int]]
vector.sparse_values: Optional[List[float]]
vector.text: Optional[str]
```
- `get_by_document_id(document_id: str) -> List[Vector]`
- Returns a list of `Vector` objects as above.
- `exists(vector_id: str) -> bool`
- Returns `True` if the vector exists, else `False`.
### Versions
The Versions class provides methods for version management.
```python
current_version = collection.versions.get_current()
all_versions = collection.versions.list()
```
Methods:
- `list() -> dict`
- Returns:
```python
{
    "versions": [
        {
            "version_number": int,
            "vector_count": int
        },
        ...
    ],
    "current_version": int
}
```
- `get_current() -> Version`
- Returns a `Version` dataclass object with attributes:
```python
version.version_number: int
version.vector_count: int
```
- `get(version_number: int) -> Version`
- Same as above.
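Given the `list()` response shape above, finding the entry for the current version is a small lookup. A sketch against that documented shape (the `current_entry` helper is illustrative, not part of the SDK):

```python
from typing import Any, Dict

def current_entry(listing: Dict[str, Any]) -> Dict[str, Any]:
    """Return the versions-list entry matching current_version."""
    current = listing["current_version"]
    return next(v for v in listing["versions"]
                if v["version_number"] == current)

# Sample response shaped like collection.versions.list():
sample = {
    "versions": [
        {"version_number": 1, "vector_count": 100},
        {"version_number": 2, "vector_count": 150},
    ],
    "current_version": 2,
}
print(current_entry(sample))  # {'version_number': 2, 'vector_count': 150}
```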
# Usage Examples
## Basic Usage
```python
from cosdata import Client
client = Client(host="http://localhost:8443", username="admin", password="admin")
collection = client.get_collection("my_collection")
```
## Get Collection Indexing Status
Get the current indexing status of a collection, including progress and statistics. Useful for monitoring background indexing operations.
```python
status = collection.indexing_status()
print("Indexing status:", status)
```
## List Loaded Collections
Retrieve a list of all collections currently loaded in memory. This is helpful for understanding which collections are ready for fast access.
```python
loaded = Collection.loaded(client)  # Collection must be imported from the SDK first
print("Loaded collections:", loaded)
```
## Create Sparse Index
Create a sparse index for your collection to enable efficient sparse vector search. You can specify the index name and optional parameters.
```python
result = collection.create_sparse_index("my_sparse_index")
print("Sparse index creation result:", result)
```
## Hybrid Search
Perform a hybrid search that combines dense and sparse vector queries. This is useful for advanced retrieval scenarios where you want to leverage both types of features.
```python
hybrid_query = {
    "dense_query": [0.1, 0.2, ...],
    "sparse_query": [{"index": 1, "value": 0.5}],
    "top_k": 5
}
results = collection.search.hybrid_search(hybrid_query)
print("Hybrid search results:", results)
```
## Batch TF-IDF Search
Run a batch of TF-IDF (text) searches in a single call. This is efficient for evaluating multiple queries at once.
```python
batch_queries = ["text query 1", "text query 2"]
results = collection.search.batch_tf_idf_search(batch_queries, top_k=3)
print("Batch TF-IDF results:", results)
```
## Fetch Vector Neighbors
Retrieve the nearest neighbors for a given vector ID in your collection. Useful for similarity search and recommendations.
```python
neighbors = collection.neighbors("vector_id")
print("Neighbors:", neighbors)
```
## Set Current Version
Set the current active version of a collection. This is important for versioned data management and switching between different dataset states.
```python
set_result = collection.set_version("version_id")
print("Set current version result:", set_result)
```
## Delete Vector via Streaming Endpoint
Delete a vector by its ID using the streaming endpoint. This is a fast way to remove vectors without managing explicit transactions.
```python
collection.stream_delete("vector_id")
print("Deleted vector via streaming endpoint")
```
Raw data
{
"_id": null,
"home_page": null,
"name": "cosdata-sdk",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": "cosdata, database, embeddings, similarity-search, vector",
"author": null,
"author_email": "Cosdata <contact@cosdata.com>",
"download_url": "https://files.pythonhosted.org/packages/9c/0c/502fd77caf04167b87beece462a12ffffb98d6c70c36685b195474ca2adf/cosdata_sdk-0.2.4.tar.gz",
"platform": null,
"description": "# Cosdata Python SDK\n\nA Python SDK for interacting with the Cosdata Vector Database.\n\n## Installation\n\n```bash\npip install cosdata-sdk\n```\n\n## Quick Start\n\n```python\nfrom cosdata import Client # Import the Client class\n\n# Initialize the client (all parameters are optional)\nclient = Client(\n host=\"http://127.0.0.1:8443\", # Default host\n username=\"admin\", # Default username\n password=\"admin\", # Default password\n verify=False # SSL verification\n)\n\n# Create a collection\ncollection = client.create_collection(\n name=\"my_collection\",\n dimension=768, # Vector dimension\n description=\"My vector collection\"\n)\n\n# Create an index (all parameters are optional)\nindex = collection.create_index(\n distance_metric=\"cosine\", # Default: cosine\n num_layers=10, # Default: 10\n max_cache_size=1000, # Default: 1000\n ef_construction=128, # Default: 128\n ef_search=64, # Default: 64\n neighbors_count=32, # Default: 32\n level_0_neighbors_count=64 # Default: 64\n)\n\n# Generate some vectors (example with random data)\nimport numpy as np\n\ndef generate_random_vector(id: int, dimension: int) -> dict:\n values = np.random.uniform(-1, 1, dimension).tolist()\n return {\n \"id\": f\"vec_{id}\",\n \"dense_values\": values,\n \"document_id\": f\"doc_{id//10}\", # Group vectors into documents\n \"metadata\": { # Optional metadata\n \"created_at\": \"2024-03-20\",\n \"category\": \"example\"\n }\n }\n\n# Generate and insert vectors\nvectors = [generate_random_vector(i, 768) for i in range(100)]\n\n# Add vectors using a transaction\nwith collection.transaction() as txn:\n # Single vector upsert (creates or updates)\n txn.upsert_vector(vectors[0])\n # Single vector create (only for new vectors)\n txn.create_vector(vectors[1])\n # Batch upsert for remaining vectors\n txn.batch_upsert_vectors(vectors[2:], max_workers=8, max_retries=3)\n\n# Add vectors using streaming operations (immediate availability)\n# Single vector upsert - returns 
immediately with result\nresult = collection.stream_upsert(vectors[0])\nprint(f\"Stream upsert result: {result}\")\n\n# Multiple vectors upsert - returns immediately with result\nresult = collection.stream_upsert(vectors[1:])\nprint(f\"Stream batch upsert result: {result}\")\n\n# Delete vectors using streaming operations\nresult = collection.stream_delete(\"vector-1\")\nprint(f\"Stream delete result: {result}\")\n\n# Add vectors using streaming operations (immediate availability)\n# Single vector upsert - returns immediately with result\nresult = collection.stream_upsert(vectors[0])\nprint(f\"Stream upsert result: {result}\")\n\n# Multiple vectors upsert - returns immediately with result\nresult = collection.stream_upsert(vectors[1:])\nprint(f\"Stream batch upsert result: {result}\")\n\n# Delete vectors using streaming operations\nresult = collection.stream_delete(\"vector-1\")\nprint(f\"Stream delete result: {result}\")\n\n# Search for similar vectors\nresults = collection.search.dense(\n query_vector=vectors[0][\"dense_values\"], # Use first vector as query\n top_k=5, # Number of nearest neighbors\n return_raw_text=True\n)\n\n# Fetch a specific vector\nvector = collection.vectors.get(\"vec_1\")\n\n# Get collection information\ncollection_info = collection.get_info()\nprint(f\"Collection info: {collection_info}\")\n\n# List all collections\nprint(\"Available collections:\")\nfor coll in client.collections():\n print(f\" - {coll.name}\")\n\n# Version management\ncurrent_version = collection.versions.get_current()\nprint(f\"Current version: {current_version}\")\n```\n\n## \ud83e\udde9 Embedding Generation (Optional Convenience Feature)\n\nCosdata SDK provides a convenience utility for generating embeddings using [cosdata-fastembed](https://github.com/cosdata/cosdata-fastembed). This is optional\u2014if you already have your own embeddings, you can use those directly. 
If you want to generate embeddings in Python, you can use the following utility:\n\n```python\nfrom cosdata.embedding import embed_texts\n\ntexts = [\n \"Cosdata makes vector search easy!\",\n \"This is a test of the embedding utility.\"\n]\nembeddings = embed_texts(texts, model_name=\"thenlper/gte-base\") # Specify any supported model\n```\n\n- See the [cosdata-fastembed supported models list](https://github.com/cosdata/cosdata-fastembed#supported-models) for available model names and dimensions.\n- The output is a list of lists (one embedding per input text), ready to upsert into your collection.\n- If `cosdata-fastembed` is not installed, a helpful error will be raised.\n\n## Methods\n\n### embed_texts\n\n- `embed_texts(texts: List[str], model_name: str = \"BAAI/bge-small-en-v1.5\") -> List[List[float]]`\n - Generates embeddings for a list of texts using cosdata-fastembed. Returns a list of embedding vectors (as plain Python lists). Raises ImportError if cosdata-fastembed is not installed.\n\n Example:\n ```python\n from cosdata.embedding import embed_texts\n embeddings = embed_texts([\"hello world\"], model_name=\"thenlper/gte-base\")\n ```\n\n## API Reference\n\n### Client\n\nThe main client for interacting with the Vector Database API.\n\n```python\nclient = Client(\n host=\"http://127.0.0.1:8443\", # Optional\n username=\"admin\", # Optional\n password=\"admin\", # Optional\n verify=False # Optional\n)\n```\n\nMethods:\n- `create_collection(...) -> Collection`\n - Returns a `Collection` object. 
Collection info can be accessed via `collection.get_info()`:\n ```python\n {\n \"name\": str,\n \"description\": str,\n \"dense_vector\": {\"enabled\": bool, \"dimension\": int},\n \"sparse_vector\": {\"enabled\": bool},\n \"tf_idf_options\": {\"enabled\": bool}\n }\n ```\n- `collections() -> List[Collection]`\n - Returns a list of `Collection` objects.\n- `get_collection(name: str) -> Collection`\n - Returns a `Collection` object for the given name.\n- `list_collections() -> List[Dict[str, Any]]`\n - Returns a list of collection information dictionaries.\n- `indexes` - Access to client-level index management (see Indexes section below)\n\n### Collection\n\nThe Collection class provides access to all collection-specific operations.\n\n```python\ncollection = client.create_collection(\n name=\"my_collection\",\n dimension=768,\n description=\"My collection\"\n)\n```\n\nMethods:\n- `create_index(...) -> Index`\n - Returns an `Index` object. Index info can be fetched (if implemented) as:\n ```python\n {\n \"dense\": {...},\n \"sparse\": {...},\n \"tf-idf\": {...}\n }\n ```\n- `create_sparse_index(name: str) -> Index`\n - Creates a sparse index for the collection.\n- `create_tf_idf_index(name: str, sample_threshold: int = 1000, k1: float = 1.2, b: float = 0.75) -> Index`\n - Creates a TF-IDF index for the collection.\n- `get_index(name: str) -> Index`\n - Returns an `Index` object for the given name.\n- `get_info() -> dict`\n - Returns collection metadata as above.\n- `delete() -> None`\n - Deletes the collection.\n- `load() -> None`\n - Loads the collection into memory.\n- `unload() -> None`\n - Unloads the collection from memory.\n- `create_transaction() -> Transaction`\n - Creates a new transaction for this collection.\n- `transaction() -> Transaction` (context manager)\n - Creates a transaction with automatic commit/abort.\n- `stream_upsert(vectors: Union[Dict[str, Any], List[Dict[str, Any]]]) -> Dict[str, Any]`\n - Immediate vector upsert with immediate 
availability.\n- `stream_delete(vector_id: str) -> Dict[str, Any]`\n - Immediate vector deletion.\n- `neighbors(vector_id: str) -> Dict[str, Any]`\n - Fetch neighbors for a given vector ID.\n- `set_version(version: str) -> Dict[str, Any]`\n - Set the current version of the collection.\n- `indexing_status() -> Dict[str, Any]`\n - Get the indexing status of this collection.\n- `loaded(client) -> List[Dict[str, Any]]` (class method)\n - Get a list of all loaded collections.\n\n### Indexes\n\nIndex management is handled directly through the Collection object.\n\n```python\n# Create a dense index\ndense_index = collection.create_index(\n distance_metric=\"cosine\",\n num_layers=7\n)\n\n# Create a sparse index\nsparse_index = collection.create_sparse_index(\n name=\"my_sparse_index\",\n quantization=64\n)\n\n# Create a TF-IDF index\ntf_idf_index = collection.create_tf_idf_index(\n name=\"my_tf_idf_index\",\n sample_threshold=1000\n)\n\n# Get an existing index\nindex = collection.get_index(\"my_index\")\n\n# Delete an index\nindex.delete()\n```\n\nMethods:\n- `create_index(distance_metric: str = \"cosine\", num_layers: int = 7, max_cache_size: int = 1000, ef_construction: int = 512, ef_search: int = 256, neighbors_count: int = 32, level_0_neighbors_count: int = 64) -> Index`\n - Creates a dense vector index for the collection.\n- `create_sparse_index(name: str, quantization: int = 64, sample_threshold: int = 1000) -> Index`\n - Creates a sparse index for the collection.\n- `create_tf_idf_index(name: str, sample_threshold: int = 1000, k1: float = 1.2, b: float = 0.75) -> Index`\n - Creates a TF-IDF index for the collection.\n- `get_index(name: str) -> Index`\n - Get an existing index by name.\n\n### Index\n\nThe Index class represents an index in a collection.\n\n```python\nindex = collection.get_index(\"my_index\")\n```\n\nMethods:\n- `delete() -> None`\n - Deletes this index.\n\n### Transaction\n\nThe Transaction class provides methods for vector operations with clear 
semantics.

```python
with collection.transaction() as txn:
    txn.upsert_vector(vector)  # Single vector (create or update)
    txn.batch_upsert_vectors(vectors, max_workers=8, max_retries=3)  # Multiple vectors, with parallelism and retries
```

Methods:
- `upsert_vector(vector: Dict[str, Any]) -> None`
  - **Creates or updates** a vector. Use this when you want to ensure the vector exists regardless of whether it already does.
- `create_vector(vector: Dict[str, Any]) -> None`
  - **Creates** a new vector; intended only for vectors that do not already exist (see the Quick Start example).
- `delete_vector(vector_id: str) -> None`
  - Deletes a vector by ID in the transaction.
- `batch_upsert_vectors(vectors: List[Dict[str, Any]], max_workers: Optional[int] = None, max_retries: int = 3) -> None`
  - `vectors`: List of vector dictionaries to upsert
  - `max_workers`: Number of threads to use for parallel upserts (default: all available CPU threads)
  - `max_retries`: Number of times to retry a failed batch (default: 3)
- `commit() -> None`
  - Commits the transaction.
- `abort() -> None`
  - Aborts the transaction.
- `get_status() -> Dict[str, Any]`
  - Gets the status of the transaction.
- `poll_completion(target_status: str = 'complete', max_attempts: int = 10, sleep_interval: float = 1.0) -> Tuple[str, bool]`
  - Polls transaction status until the target status is reached or max attempts are exceeded.

### Transaction Status Polling

The Transaction class provides methods for monitoring transaction status and polling for completion.

```python
# Create a transaction
with collection.transaction() as txn:
    # Get current transaction status
    status = txn.get_status()
    print(f"Transaction status: {status}")

    # Upsert some vectors
    txn.upsert_vector(vector)

    # Poll for completion with custom parameters
    final_status, success = txn.poll_completion(
        target_status="complete",
        max_attempts=20,
        sleep_interval=2.0
    )

    if success:
        print(f"Transaction completed with status: {final_status}")
    else:
        print(f"Transaction may not have completed. Final status: {final_status}")
```

Methods:
- `get_status(collection_name: str = None, transaction_id: str = None) -> str`
  - Gets the current status of this transaction (or another, if specified)
  - Returns the transaction status as a string
  - Throws exceptions for API errors with descriptive messages
  - Parameters:
    - `collection_name`: Name of the collection (default: this transaction's collection)
    - `transaction_id`: ID of the transaction to check (default: this transaction's ID)
- `poll_completion(target_status: str = 'complete', max_attempts: int = 10, sleep_interval: float = 1.0, collection_name: str = None, transaction_id: str = None) -> Tuple[str, bool]`
  - Polls transaction status until the target status is reached or max attempts are exceeded
  - Returns a tuple of `(final_status, success_boolean)`
  - Polling parameters are configurable for different use cases
  - Provides real-time progress feedback via console output
  - Parameters:
    - `target_status`: Target status to wait for (default: 'complete')
    - `max_attempts`: Maximum number of polling attempts (default: 10)
    - `sleep_interval`: Time to sleep between attempts in seconds (default: 1.0)
    - `collection_name`: Name of the collection (default: this transaction's collection)
    - `transaction_id`: Transaction ID to poll (default: this transaction's ID)

### Streaming Operations (Implicit Transactions)

The streaming operations provide immediate vector availability optimized for streaming scenarios.
These methods use implicit transactions that prioritize data availability over batch processing efficiency.

**Design Philosophy:**
- Optimized for streaming scenarios where individual records must become immediately searchable
- Serves real-time monitoring systems, live content feeds, and streaming analytics
- Prioritizes data availability over batch processing efficiency
- Automatic transaction management - no client-managed transaction boundaries
- The system handles batching and version allocation automatically
- Abstracts transactional complexity while preserving append-only semantics

```python
# Single vector stream upsert - immediately available for search
vector = {
    "id": "vector-1",
    "document_id": "doc-123",
    "dense_values": [0.1, 0.2, 0.3, 0.4, 0.5],
    "metadata": {"category": "technology"},
    "text": "Sample text content"
}
result = collection.stream_upsert(vector)
print(f"Vector immediately available: {result}")

# Multiple vectors stream upsert
vectors = [vector1, vector2, vector3]
result = collection.stream_upsert(vectors)
print(f"All vectors immediately available: {result}")

# Single vector stream delete
result = collection.stream_delete("vector-1")
print(f"Vector immediately deleted: {result}")
```

Methods:
- `stream_upsert(vectors: Union[Dict[str, Any], List[Dict[str, Any]]]) -> Dict[str, Any]`
  - Upserts vectors with immediate availability
  - Returns response data immediately
  - Accepts a single vector dict or a list of vector dicts
- `stream_delete(vector_id: str) -> Dict[str, Any]`
  - Deletes a vector with immediate effect
  - Returns response data immediately
  - Accepts a single vector ID

### Search

The Search class provides methods for vector similarity search.

```python
results = collection.search.dense(
    query_vector=vector,
    top_k=5,
    return_raw_text=True
)
```

Methods:
- `dense(query_vector: List[float], top_k: int = 5, return_raw_text:
bool = False) -> dict`
  - Returns:

    ```python
    {
        "results": [
            {
                "id": str,
                "document_id": str,
                "score": float,
                "text": str | None
            },
            ...
        ]
    }
    ```
- `batch_dense(queries: List[Dict[str, List[float]]], top_k: int = 5, return_raw_text: bool = False) -> List[dict]`
  - Batch dense vector search. Each query must contain a "vector" field.
- `sparse(query_terms: List[dict], top_k: int = 5, early_terminate_threshold: float = 0.0, return_raw_text: bool = False) -> dict`
  - Same result structure as above.
- `batch_sparse(query_terms_list: List[List[dict]], top_k: int = 5, early_terminate_threshold: float = 0.0, return_raw_text: bool = False) -> List[dict]`
  - Batch sparse vector search.
- `text(query_text: str, top_k: int = 5, return_raw_text: bool = False) -> dict`
  - Same result structure as above.
- `batch_text(query_texts: List[str], top_k: int = 5, return_raw_text: bool = False) -> List[dict]`
  - Batch text search.
- `hybrid_search(queries: dict) -> dict`
  - Hybrid search combining dense and sparse queries.
- `batch_tf_idf_search(queries: List[str], top_k: int = 10, return_raw_text: bool = False) -> List[dict]`
  - Batch TF-IDF search.

### Vectors

The Vectors class provides methods for vector operations.

```python
vector = collection.vectors.get("vec_1")
exists = collection.vectors.exists("vec_1")
```

Methods:
- `get(vector_id: str) -> Vector`
  - Returns a `Vector` dataclass object with attributes:

    ```python
    vector.id: str
    vector.document_id: Optional[str]
    vector.dense_values: Optional[List[float]]
    vector.sparse_indices: Optional[List[int]]
    vector.sparse_values: Optional[List[float]]
    vector.text: Optional[str]
    ```
- `get_by_document_id(document_id: str) -> List[Vector]`
  - Returns a list of `Vector` objects as above.
- `exists(vector_id: str) -> bool`
  - Returns `True` if the vector exists, else `False`.

### Versions

The Versions class provides methods for version
management.

```python
current_version = collection.versions.get_current()
all_versions = collection.versions.list()
```

Methods:
- `list() -> dict`
  - Returns:

    ```python
    {
        "versions": [
            {
                "version_number": int,
                "vector_count": int
            },
            ...
        ],
        "current_version": int
    }
    ```
- `get_current() -> Version`
  - Returns a `Version` dataclass object with attributes:

    ```python
    version.version_number: int
    version.vector_count: int
    ```
- `get(version_number: int) -> Version`
  - Same as above.

# Usage Examples

## Basic Usage

```python
from cosdata import Client

client = Client(host="http://localhost:8443", username="admin", password="admin")
collection = client.get_collection("my_collection")
```

## Get Collection Indexing Status
Get the current indexing status of a collection, including progress and statistics. Useful for monitoring background indexing operations.
```python
status = collection.indexing_status()
print("Indexing status:", status)
```

## List Loaded Collections
Retrieve a list of all collections currently loaded in memory. This is helpful for understanding which collections are ready for fast access.
```python
# `loaded` is a class method, so Collection must be imported from the cosdata package
loaded = Collection.loaded(client)
print("Loaded collections:", loaded)
```

## Create Sparse Index
Create a sparse index for your collection to enable efficient sparse vector search. You can specify the index name and optional parameters.
```python
result = collection.create_sparse_index("my_sparse_index")
print("Sparse index creation result:", result)
```

## Hybrid Search
Perform a hybrid search that combines dense and sparse vector queries.
This is useful for advanced retrieval scenarios where you want to leverage both types of features.
```python
hybrid_query = {
    "dense_query": [0.1, 0.2, ...],
    "sparse_query": [{"index": 1, "value": 0.5}],
    "top_k": 5
}
results = collection.search.hybrid_search(hybrid_query)
print("Hybrid search results:", results)
```

## Batch TF-IDF Search
Run a batch of TF-IDF (text) searches in a single call. This is efficient for evaluating multiple queries at once.
```python
batch_queries = ["text query 1", "text query 2"]
results = collection.search.batch_tf_idf_search(batch_queries, top_k=3)
print("Batch TF-IDF results:", results)
```

## Fetch Vector Neighbors
Retrieve the nearest neighbors for a given vector ID in your collection. Useful for similarity search and recommendations.
```python
neighbors = collection.neighbors("vector_id")
print("Neighbors:", neighbors)
```

## Set Current Version
Set the current active version of a collection. This is important for versioned data management and for switching between different dataset states.
```python
set_result = collection.set_version("version_id")
print("Set current version result:", set_result)
```

## Delete Vector via Streaming Endpoint
Delete a vector by its ID using the streaming endpoint. This is a fast way to remove vectors without managing explicit transactions.
```python
collection.stream_delete("vector_id")
print("Deleted vector via streaming endpoint")
```
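## Chunk Large Upserts
`batch_upsert_vectors` accepts the whole list at once, but for very large datasets you may prefer to feed a transaction in fixed-size batches. The `chunked` helper below is not part of the SDK; it is a small, hypothetical utility sketch.

```python
from typing import Any, Dict, Iterator, List

def chunked(vectors: List[Dict[str, Any]], size: int = 500) -> Iterator[List[Dict[str, Any]]]:
    """Yield successive fixed-size batches from a list of vector dicts."""
    for start in range(0, len(vectors), size):
        yield vectors[start:start + size]

# Usage sketch (assumes `collection` and `vectors` exist as in the examples above):
# with collection.transaction() as txn:
#     for batch in chunked(vectors, size=500):
#         txn.batch_upsert_vectors(batch, max_workers=8, max_retries=3)
```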
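## Build Sparse Query Terms
Sparse and hybrid queries take terms in the `[{"index": ..., "value": ...}]` shape shown above. An index-to-weight mapping can be converted with a helper like this one (a hypothetical convenience, not an SDK function):

```python
from typing import Dict, List

def to_sparse_terms(weights: Dict[int, float]) -> List[dict]:
    """Convert an {index: value} mapping into the [{"index": i, "value": v}]
    list shape expected by sparse and hybrid search queries."""
    # Sort by index for deterministic output and drop zero weights.
    return [{"index": i, "value": v} for i, v in sorted(weights.items()) if v != 0.0]

terms = to_sparse_terms({3: 0.5, 1: 0.25, 7: 0.0})
# terms == [{"index": 1, "value": 0.25}, {"index": 3, "value": 0.5}]

# Usage sketch (assumes a live collection):
# results = collection.search.sparse(query_terms=terms, top_k=5)
```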
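## Generic Status Polling
`poll_completion` loops on the transaction status until the target status appears or attempts run out, returning `(final_status, success)`. The same pattern is easy to reuse for other long-running operations such as `indexing_status`; the `poll_until` helper below is a hypothetical sketch of that loop, not an SDK API.

```python
import time
from typing import Callable, Tuple

def poll_until(
    get_status: Callable[[], str],
    target_status: str = "complete",
    max_attempts: int = 10,
    sleep_interval: float = 1.0,
) -> Tuple[str, bool]:
    """Poll a status callable until it reports target_status, or give up after
    max_attempts, mirroring poll_completion's (final_status, success) shape."""
    status = ""
    for attempt in range(max_attempts):
        status = get_status()
        if status == target_status:
            return status, True
        if attempt < max_attempts - 1:  # no need to sleep after the final attempt
            time.sleep(sleep_interval)
    return status, False

# Usage sketch against a transaction (assumes a live server):
# final_status, ok = poll_until(lambda: txn.get_status(), max_attempts=20, sleep_interval=2.0)
```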
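## Validate Vector Payloads Client-Side
The vector dicts used throughout these examples share a fixed set of fields (`id`, `document_id`, `dense_values`, `sparse_indices`, `sparse_values`, `metadata`, `text`). Catching a typo before an upsert saves a round trip; the validator below is a hypothetical client-side check, not an SDK feature.

```python
from typing import Any, Dict, List

REQUIRED_FIELDS = {"id"}
OPTIONAL_FIELDS = {"document_id", "dense_values", "sparse_indices",
                   "sparse_values", "metadata", "text"}

def validate_vector(vector: Dict[str, Any]) -> List[str]:
    """Return a list of problems with a vector payload; empty means it looks OK."""
    problems = []
    for field in sorted(REQUIRED_FIELDS - set(vector)):
        problems.append(f"missing required field: {field}")
    for field in sorted(set(vector) - REQUIRED_FIELDS - OPTIONAL_FIELDS):
        problems.append(f"unknown field: {field}")
    dense = vector.get("dense_values")
    if dense is not None and not all(isinstance(x, (int, float)) for x in dense):
        problems.append("dense_values must contain only numbers")
    return problems

print(validate_vector({"id": "vec_1", "dense_values": [0.1, 0.2]}))  # []
print(validate_vector({"dense_vals": [0.1]}))  # missing id, unknown field
```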
## Project Links

- Version: 0.2.4 (requires Python >= 3.9)
- Homepage: https://github.com/cosdata/cosdata-sdk-python
- Issues: https://github.com/cosdata/cosdata-sdk-python/issues
- Repository: https://github.com/cosdata/cosdata-sdk-python.git