# AI Prishtina Milvus Client
<div align="center">

**A comprehensive, production-ready Python client library for Milvus vector database operations**
[Python 3.9+](https://python.org) · [PyPI](https://badge.fury.io/py/ai-prishtina-milvus-client) · [Downloads](https://pepy.tech/project/ai-prishtina-milvus-client) · [License](LICENSE) · [CI](https://github.com/ai-prishtina/milvus-client) · [Coverage](https://codecov.io) · [Docs](https://docs.ai-prishtina.com)
*Designed for AI and machine learning applications requiring high-performance vector operations*
</div>
---
## ☕ Support This Project
If you find this project helpful, please consider supporting it:
[Buy Me a Coffee](https://coff.ee/albanmaxhuni)
---
## 🚀 **Features**
### **Core Capabilities**
- 🔄 **Async/Await Support** - High-performance asynchronous operations
- 🗄️ **Comprehensive Vector Operations** - Full CRUD operations with advanced querying
- 🔒 **Enterprise Security** - Built-in authentication, encryption, and access control
- ☁️ **Multi-Cloud Storage** - AWS S3, Google Cloud Storage, Azure Blob integration
- 📊 **Real-time Streaming** - Kafka integration for live data processing
- 📈 **Monitoring & Metrics** - Prometheus integration with custom metrics
- 🎯 **Multi-modal Support** - Text, image, audio, and custom embeddings
- ⚡ **Batch Processing** - Optimized for large-scale operations
- 🛡️ **Robust Error Handling** - Automatic retries and graceful degradation
### **Advanced Features**
- 🔍 **Intelligent Indexing** - Support for IVF, HNSW, ANNOY, and custom indexes
- 🌐 **Distributed Processing** - Horizontal scaling and load balancing
- 💾 **Smart Caching** - Redis integration for performance optimization
- 🔄 **Data Synchronization** - Cross-service data consistency
- 📦 **Backup & Recovery** - Automated disaster recovery procedures
- 🎛️ **Configuration Management** - Environment-based configuration
- 📝 **Comprehensive Logging** - Structured logging with multiple outputs
---
## 📦 **Installation**
### **Requirements**
- Python 3.9 or higher
- Milvus 2.3.0 or higher
### **Install from PyPI**
```bash
pip install ai-prishtina-milvus-client
```
### **Install from Source**
```bash
git clone https://github.com/ai-prishtina/milvus-client.git
cd milvus-client
pip install -e .
```
### **Development Installation**
```bash
git clone https://github.com/ai-prishtina/milvus-client.git
cd milvus-client
pip install -e ".[dev]"
```
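
After installing, a quick import check confirms the package is available (a minimal smoke test; it only verifies the install, not connectivity):

```python
# Smoke test: the client class should be importable after installation
from ai_prishtina_milvus_client import AsyncMilvusClient

print(AsyncMilvusClient.__name__)  # -> AsyncMilvusClient
```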
---
## ⚡ **Quick Start**
### **Basic Usage**
```python
import asyncio
import numpy as np
from ai_prishtina_milvus_client import AsyncMilvusClient

async def main():
    # Initialize client with configuration
    config = {
        "host": "localhost",
        "port": 19530,
        "secure": False,
        "timeout": 30
    }

    client = AsyncMilvusClient(config)

    try:
        # Connect to Milvus
        await client.connect()
        print("✅ Connected to Milvus")

        # Create a collection for image embeddings
        collection_name = "image_embeddings"
        await client.create_collection(
            collection_name=collection_name,
            dimension=512,  # Common for image embeddings
            index_type="HNSW",
            metric_type="COSINE",
            description="Image embeddings collection"
        )
        print(f"✅ Created collection: {collection_name}")

        # Generate sample embeddings
        num_vectors = 1000
        vectors = np.random.rand(num_vectors, 512).tolist()
        metadata = [
            {
                "image_id": f"img_{i:06d}",
                "category": np.random.choice(["person", "animal", "object"]),
                "confidence": np.random.uniform(0.8, 1.0),
                "timestamp": "2024-01-01T00:00:00Z"
            }
            for i in range(num_vectors)
        ]

        # Insert vectors with metadata
        ids = await client.insert(
            collection_name=collection_name,
            vectors=vectors,
            metadata=metadata
        )
        print(f"✅ Inserted {len(ids)} vectors")

        # Wait for indexing to complete
        await asyncio.sleep(2)

        # Perform similarity search
        query_vector = np.random.rand(512).tolist()
        results = await client.search(
            collection_name=collection_name,
            query_vectors=[query_vector],
            top_k=5,
            search_params={"ef": 64}  # HNSW parameter
        )

        print(f"🔍 Found {len(results[0])} similar images:")
        for i, result in enumerate(results[0]):
            metadata = result.get("metadata", {})
            print(f"  {i+1}. ID: {result['id']}, "
                  f"Distance: {result['distance']:.4f}, "
                  f"Category: {metadata.get('category', 'unknown')}")

    finally:
        await client.disconnect()
        print("✅ Disconnected from Milvus")

if __name__ == "__main__":
    asyncio.run(main())
```
---
## 📚 **Comprehensive Examples**
### **1. Text Embeddings with Transformers**
```python
import asyncio
from sentence_transformers import SentenceTransformer
from ai_prishtina_milvus_client import AsyncMilvusClient

async def text_similarity_example():
    """Example: Text similarity search using sentence transformers."""

    # Initialize sentence transformer
    model = SentenceTransformer('all-MiniLM-L6-v2')

    # Sample documents
    documents = [
        "The quick brown fox jumps over the lazy dog",
        "Machine learning is a subset of artificial intelligence",
        "Python is a popular programming language for data science",
        "Vector databases enable efficient similarity search",
        "Natural language processing helps computers understand text"
    ]

    # Generate embeddings
    embeddings = model.encode(documents).tolist()

    # Initialize Milvus client
    client = AsyncMilvusClient({
        "host": "localhost",
        "port": 19530
    })

    try:
        await client.connect()

        # Create collection for text embeddings
        collection_name = "text_embeddings"
        await client.create_collection(
            collection_name=collection_name,
            dimension=384,  # all-MiniLM-L6-v2 dimension
            index_type="IVF_FLAT",
            metric_type="COSINE"
        )

        # Insert documents with embeddings
        metadata = [
            {
                "text": doc,
                "length": len(doc),
                "word_count": len(doc.split())
            }
            for doc in documents
        ]

        ids = await client.insert(
            collection_name=collection_name,
            vectors=embeddings,
            metadata=metadata
        )

        # Search for similar documents
        query = "What is machine learning?"
        query_embedding = model.encode([query]).tolist()

        results = await client.search(
            collection_name=collection_name,
            query_vectors=query_embedding,
            top_k=3,
            output_fields=["text", "word_count"]
        )

        print(f"Query: {query}")
        print("Similar documents:")
        for result in results[0]:
            print(f"  - {result['metadata']['text']}")
            print(f"    Similarity: {1 - result['distance']:.4f}")

    finally:
        await client.disconnect()

asyncio.run(text_similarity_example())
```
### **2. Batch Processing with Progress Tracking**
```python
import asyncio
import numpy as np
from typing import List, Dict, Any
from ai_prishtina_milvus_client import AsyncMilvusClient

async def batch_processing_example():
    """Example: Large-scale batch processing with progress tracking."""

    client = AsyncMilvusClient({
        "host": "localhost",
        "port": 19530
    })

    try:
        await client.connect()

        # Create collection for large dataset
        collection_name = "large_dataset"
        await client.create_collection(
            collection_name=collection_name,
            dimension=256,
            index_type="IVF_SQ8",
            metric_type="L2"
        )

        # Process data in batches
        total_vectors = 100000
        batch_size = 1000

        print(f"Processing {total_vectors} vectors in batches of {batch_size}")

        for batch_idx in range(0, total_vectors, batch_size):
            end_idx = min(batch_idx + batch_size, total_vectors)
            current_batch_size = end_idx - batch_idx

            # Generate batch data
            vectors = np.random.rand(current_batch_size, 256).tolist()
            metadata = [
                {
                    "batch_id": batch_idx // batch_size,
                    "item_id": batch_idx + i,
                    "category": f"cat_{(batch_idx + i) % 10}",
                    "score": np.random.uniform(0, 1)
                }
                for i in range(current_batch_size)
            ]

            # Insert batch
            ids = await client.insert(
                collection_name=collection_name,
                vectors=vectors,
                metadata=metadata
            )

            # Progress tracking
            progress = (end_idx / total_vectors) * 100
            print(f"Progress: {progress:.1f}% - Inserted batch {batch_idx//batch_size + 1}")

            # Optional: Add delay to prevent overwhelming the system
            await asyncio.sleep(0.1)

        print("✅ Batch processing completed")

        # Verify total count
        count = await client.count(collection_name)
        print(f"Total vectors in collection: {count}")

    finally:
        await client.disconnect()

asyncio.run(batch_processing_example())
```
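
For higher throughput you can overlap batch inserts instead of awaiting them one at a time. Below is a minimal sketch that bounds the number of in-flight inserts with a semaphore; it assumes the same `client` and collection as above, and the concurrency limit of 4 is illustrative, not a recommendation:

```python
import asyncio
import numpy as np

async def insert_batches_concurrently(client, collection_name, total=100000,
                                      batch_size=1000, max_in_flight=4):
    # Bound concurrency so we don't flood the server with parallel inserts
    semaphore = asyncio.Semaphore(max_in_flight)

    async def insert_one(start):
        end = min(start + batch_size, total)
        vectors = np.random.rand(end - start, 256).tolist()
        async with semaphore:
            await client.insert(collection_name=collection_name, vectors=vectors)

    # Launch one task per batch; the semaphore serializes the excess
    await asyncio.gather(*(insert_one(s) for s in range(0, total, batch_size)))
```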
### **3. Multi-modal Search with Metadata Filtering**
```python
import asyncio
import numpy as np
from ai_prishtina_milvus_client import AsyncMilvusClient

async def multimodal_search_example():
    """Example: Multi-modal search with complex metadata filtering."""

    client = AsyncMilvusClient({
        "host": "localhost",
        "port": 19530
    })

    try:
        await client.connect()

        # Create collection for multi-modal data
        collection_name = "multimodal_content"
        await client.create_collection(
            collection_name=collection_name,
            dimension=768,  # Common for multi-modal embeddings
            index_type="HNSW",
            metric_type="COSINE"
        )

        # Insert multi-modal content
        content_types = ["text", "image", "audio", "video"]
        categories = ["education", "entertainment", "news", "sports"]

        vectors = []
        metadata = []

        for i in range(1000):
            vectors.append(np.random.rand(768).tolist())
            metadata.append({
                "content_id": f"content_{i:06d}",
                "content_type": np.random.choice(content_types),
                "category": np.random.choice(categories),
                "duration": np.random.randint(10, 3600),  # seconds
                "quality": np.random.choice(["HD", "4K", "SD"]),
                "language": np.random.choice(["en", "es", "fr", "de"]),
                "upload_date": f"2024-{np.random.randint(1,13):02d}-{np.random.randint(1,29):02d}",
                "views": np.random.randint(100, 1000000),
                "rating": np.random.uniform(1.0, 5.0)
            })

        ids = await client.insert(
            collection_name=collection_name,
            vectors=vectors,
            metadata=metadata
        )

        # Wait for indexing
        await asyncio.sleep(3)

        # Complex search with metadata filtering
        query_vector = np.random.rand(768).tolist()

        # Search for high-quality educational videos in English
        results = await client.search(
            collection_name=collection_name,
            query_vectors=[query_vector],
            top_k=10,
            filter_expression='content_type == "video" and category == "education" and language == "en" and quality in ["HD", "4K"] and rating > 4.0',
            output_fields=["content_id", "content_type", "category", "quality", "rating"]
        )

        print("🎯 Search Results: High-quality educational videos in English")
        for i, result in enumerate(results[0]):
            meta = result["metadata"]
            print(f"  {i+1}. {meta['content_id']} - {meta['quality']} - Rating: {meta['rating']:.2f}")

        # Aggregate search - find popular content by category
        categories_stats = {}
        for category in categories:
            cat_results = await client.search(
                collection_name=collection_name,
                query_vectors=[query_vector],
                top_k=100,
                filter_expression=f'category == "{category}"',
                output_fields=["views", "rating"]
            )

            if cat_results[0]:
                avg_views = np.mean([r["metadata"]["views"] for r in cat_results[0]])
                avg_rating = np.mean([r["metadata"]["rating"] for r in cat_results[0]])
                categories_stats[category] = {
                    "avg_views": avg_views,
                    "avg_rating": avg_rating,
                    "count": len(cat_results[0])
                }

        print("\n📊 Category Statistics:")
        for category, stats in categories_stats.items():
            print(f"  {category.title()}: {stats['count']} items, "
                  f"Avg Views: {stats['avg_views']:.0f}, "
                  f"Avg Rating: {stats['avg_rating']:.2f}")

    finally:
        await client.disconnect()

asyncio.run(multimodal_search_example())
```
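
If you only need per-category counts rather than per-item fields, the `count()` method documented below accepts a `filter_expression`, which avoids pulling back 100 search results per category. A sketch against the same collection:

```python
# Count items per category directly (runs inside an async function)
for category in ["education", "entertainment", "news", "sports"]:
    n = await client.count(
        collection_name="multimodal_content",
        filter_expression=f'category == "{category}"'
    )
    print(f"{category}: {n} items")
```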
### **4. Real-time Streaming with Kafka Integration**
```python
import asyncio
import json
import numpy as np
from ai_prishtina_milvus_client import AsyncMilvusClient
from ai_prishtina_milvus_client.streaming import KafkaStreamProcessor, StreamMessage

async def streaming_example():
    """Example: Real-time vector streaming with Kafka."""

    # Configuration
    milvus_config = {
        "host": "localhost",
        "port": 19530
    }

    kafka_config = {
        "bootstrap_servers": ["localhost:9092"],
        "topic": "vector_stream",
        "group_id": "milvus_consumer"
    }

    # Initialize stream processor
    stream_processor = KafkaStreamProcessor(
        milvus_config=milvus_config,
        stream_config=kafka_config
    )

    try:
        # Set up collection for streaming data
        client = AsyncMilvusClient(milvus_config)
        await client.connect()

        collection_name = "streaming_vectors"
        await client.create_collection(
            collection_name=collection_name,
            dimension=128,
            index_type="IVF_FLAT",
            metric_type="L2"
        )

        # Producer: Send vectors to Kafka
        async def produce_vectors():
            for i in range(100):
                # Simulate real-time data
                vector = np.random.rand(128).tolist()
                metadata = {
                    "timestamp": "2024-01-01T00:00:00Z",
                    "source": "sensor_data",
                    "device_id": f"device_{i % 10}",
                    "batch_id": i // 10
                }

                message = StreamMessage(
                    vectors=[vector],
                    metadata=[metadata],
                    operation="insert",
                    collection=collection_name
                )

                await stream_processor.produce_message("vector_stream", message)
                print(f"📤 Sent vector {i+1}/100")

                # Simulate real-time delay
                await asyncio.sleep(0.1)

        # Consumer: Process vectors from Kafka
        async def consume_vectors():
            async for message in stream_processor.consume_messages("vector_stream"):
                try:
                    # Process the stream message
                    result = await stream_processor.process_message(message)
                    print(f"📥 Processed {len(result)} vectors")

                except Exception as e:
                    print(f"❌ Error processing message: {e}")

        # Run producer and consumer concurrently; the consumer loop never
        # ends on its own, so cancel it once the producer has finished
        consumer_task = asyncio.create_task(consume_vectors())
        await produce_vectors()
        consumer_task.cancel()

    finally:
        await stream_processor.close()
        await client.disconnect()

# Note: This example requires Kafka to be running
# asyncio.run(streaming_example())
```
### **5. Advanced Security and Monitoring**
```python
import asyncio
import numpy as np
from ai_prishtina_milvus_client import AsyncMilvusClient
from ai_prishtina_milvus_client.security import SecurityManager
from ai_prishtina_milvus_client.monitoring import MetricsCollector

async def security_monitoring_example():
    """Example: Advanced security and monitoring features."""

    # Initialize security manager
    security_config = {
        "encryption_key": "your-encryption-key",
        "enable_rbac": True,
        "token_expiry": 3600
    }

    security_manager = SecurityManager(config=security_config)

    # Initialize metrics collector
    metrics_config = {
        "prometheus_gateway": "localhost:9091",
        "job_name": "milvus_client",
        "enable_system_metrics": True
    }

    metrics_collector = MetricsCollector(config=metrics_config)

    # Initialize client with security and monitoring
    client = AsyncMilvusClient({
        "host": "localhost",
        "port": 19530,
        "security_manager": security_manager,
        "metrics_collector": metrics_collector
    })

    try:
        # Create user and authenticate
        await security_manager.create_user(
            username="data_scientist",
            password="secure_password",
            roles=["read", "write"]
        )

        auth_token = await security_manager.authenticate(
            "data_scientist",
            "secure_password"
        )

        # Connect with authentication
        await client.connect(auth_token=auth_token)

        # Create collection with encryption
        collection_name = "secure_collection"
        await client.create_collection(
            collection_name=collection_name,
            dimension=256,
            index_type="HNSW",
            metric_type="COSINE",
            enable_encryption=True
        )

        # Insert data with automatic metrics collection
        vectors = [np.random.rand(256).tolist() for _ in range(1000)]
        metadata = [
            {
                "user_id": security_manager.encrypt_data(f"user_{i}"),
                "sensitive_data": security_manager.encrypt_data(f"data_{i}"),
                "public_info": f"info_{i}"
            }
            for i in range(1000)
        ]

        # Metrics are automatically collected during operations
        with metrics_collector.timer("insert_operation"):
            ids = await client.insert(
                collection_name=collection_name,
                vectors=vectors,
                metadata=metadata
            )

        # Search with metrics
        query_vector = np.random.rand(256).tolist()

        with metrics_collector.timer("search_operation"):
            results = await client.search(
                collection_name=collection_name,
                query_vectors=[query_vector],
                top_k=10
            )

        # Decrypt sensitive data from results
        for result in results[0]:
            meta = result["metadata"]
            if "user_id" in meta:
                decrypted_user = security_manager.decrypt_data(meta["user_id"])
                print(f"User: {decrypted_user}, Distance: {result['distance']:.4f}")

        # Export metrics
        await metrics_collector.push_metrics()

        # Get performance statistics
        stats = metrics_collector.get_stats()
        print("\n📈 Performance Stats:")
        print(f"  Insert operations: {stats.get('insert_count', 0)}")
        print(f"  Search operations: {stats.get('search_count', 0)}")
        print(f"  Average insert time: {stats.get('avg_insert_time', 0):.3f}s")
        print(f"  Average search time: {stats.get('avg_search_time', 0):.3f}s")

    finally:
        await client.disconnect()

asyncio.run(security_monitoring_example())
```
---
## 🔧 **Configuration**
### **Basic Configuration**
```python
from ai_prishtina_milvus_client import AsyncMilvusClient

# Simple configuration
config = {
    "host": "localhost",
    "port": 19530,
    "secure": False,
    "timeout": 30
}

client = AsyncMilvusClient(config)
```
### **Advanced Configuration**
```python
from ai_prishtina_milvus_client import AsyncMilvusClient, MilvusConfig

# Advanced configuration with all options
config = MilvusConfig(
    # Connection settings
    host="localhost",
    port=19530,
    secure=True,
    timeout=60,

    # Authentication
    username="admin",
    password="password",
    token="auth_token",

    # Performance settings
    pool_size=10,
    max_retries=3,
    retry_delay=1.0,

    # Validation settings
    validate_vectors=True,
    normalize_vectors=True,
    max_vector_dimension=2048,

    # Monitoring settings
    enable_metrics=True,
    metrics_port=8080,

    # Logging settings
    log_level="INFO",
    log_file="milvus_client.log"
)

client = AsyncMilvusClient(config)
```
### **Environment Variables**
```bash
# Set environment variables
export MILVUS_HOST=localhost
export MILVUS_PORT=19530
export MILVUS_USERNAME=admin
export MILVUS_PASSWORD=password
export MILVUS_SECURE=true
export MILVUS_TIMEOUT=60
```
```python
# Load from environment
from ai_prishtina_milvus_client import AsyncMilvusClient
# Automatically loads from environment variables
client = AsyncMilvusClient.from_env()
```
---
## 📖 **API Reference**
### **AsyncMilvusClient**
#### **Connection Management**
```python
# Connect to Milvus
await client.connect(auth_token=None)
# Disconnect from Milvus
await client.disconnect()
# Check connection status
is_connected = await client.is_connected()
# Get server info
info = await client.get_server_info()
```
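
These calls compose into a simple reconnect guard before issuing operations (a minimal sketch using only the methods above; runs inside an async function):

```python
# Reconnect if the connection has dropped, then confirm the server responds
if not await client.is_connected():
    await client.connect()

info = await client.get_server_info()
print(info)
```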
#### **Collection Operations**
```python
# Create collection
await client.create_collection(
    collection_name: str,
    dimension: int,
    index_type: str = "IVF_FLAT",
    metric_type: str = "L2",
    description: str = None,
    enable_encryption: bool = False
)

# List collections
collections = await client.list_collections()

# Check if collection exists
exists = await client.has_collection(collection_name)

# Get collection info
info = await client.describe_collection(collection_name)

# Drop collection
await client.drop_collection(collection_name)

# Get collection statistics
stats = await client.get_collection_stats(collection_name)
```
#### **Vector Operations**
```python
# Insert vectors
ids = await client.insert(
    collection_name: str,
    vectors: List[List[float]],
    metadata: List[Dict] = None,
    partition_name: str = None
)

# Search vectors
results = await client.search(
    collection_name: str,
    query_vectors: List[List[float]],
    top_k: int = 10,
    search_params: Dict = None,
    filter_expression: str = None,
    output_fields: List[str] = None,
    partition_names: List[str] = None
)

# Get vectors by IDs
vectors = await client.get(
    collection_name: str,
    ids: List[int],
    output_fields: List[str] = None
)

# Delete vectors
await client.delete(
    collection_name: str,
    filter_expression: str
)

# Count vectors
count = await client.count(
    collection_name: str,
    filter_expression: str = None
)
```
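
For reference, here is how the signatures above look with concrete arguments (the collection name, vectors, and filter values are illustrative; runs inside an async function):

```python
# Insert two 4-dimensional vectors with metadata
ids = await client.insert(
    collection_name="demo",
    vectors=[[0.1, 0.2, 0.3, 0.4], [0.5, 0.6, 0.7, 0.8]],
    metadata=[{"tag": "a"}, {"tag": "b"}]
)

# Filtered search returning only the "tag" field
results = await client.search(
    collection_name="demo",
    query_vectors=[[0.1, 0.2, 0.3, 0.4]],
    top_k=5,
    filter_expression='tag == "a"',
    output_fields=["tag"]
)

# Delete everything matching a filter
await client.delete(collection_name="demo", filter_expression='tag == "b"')
```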
#### **Index Operations**
```python
# Create index
await client.create_index(
    collection_name: str,
    field_name: str = "vector",
    index_params: Dict = None
)

# Drop index
await client.drop_index(
    collection_name: str,
    field_name: str = "vector"
)

# Get index info
index_info = await client.describe_index(
    collection_name: str,
    field_name: str = "vector"
)
```
#### **Partition Operations**
```python
# Create partition
await client.create_partition(
    collection_name: str,
    partition_name: str
)

# List partitions
partitions = await client.list_partitions(collection_name)

# Drop partition
await client.drop_partition(
    collection_name: str,
    partition_name: str
)
```
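
Partitions combine with the `partition_name` and `partition_names` parameters shown under Vector Operations; a sketch routing writes and reads to a single partition (names are illustrative; runs inside an async function):

```python
# Create a partition, write into it, then search only within it
await client.create_partition(collection_name="demo", partition_name="2024_q1")

await client.insert(
    collection_name="demo",
    vectors=vectors,
    partition_name="2024_q1"
)

results = await client.search(
    collection_name="demo",
    query_vectors=[query_vector],
    top_k=10,
    partition_names=["2024_q1"]
)
```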
### **Streaming Operations**
```python
from ai_prishtina_milvus_client.streaming import KafkaStreamProcessor

# Initialize stream processor
processor = KafkaStreamProcessor(
    milvus_config=milvus_config,
    stream_config=kafka_config
)

# Produce message
await processor.produce_message(topic, message)

# Consume messages
async for message in processor.consume_messages(topic):
    result = await processor.process_message(message)
```
### **Security Operations**
```python
from ai_prishtina_milvus_client.security import SecurityManager
# Initialize security manager
security = SecurityManager(config=security_config)
# User management
await security.create_user(username, password, roles)
await security.delete_user(username)
await security.update_user_roles(username, roles)
# Authentication
token = await security.authenticate(username, password)
await security.validate_token(token)
# Data encryption
encrypted = security.encrypt_data(data)
decrypted = security.decrypt_data(encrypted)
```
### **Monitoring Operations**
```python
from ai_prishtina_milvus_client.monitoring import MetricsCollector
# Initialize metrics collector
metrics = MetricsCollector(config=metrics_config)
# Collect metrics
with metrics.timer("operation_name"):
    # Your operation here
    pass
# Custom metrics
metrics.increment_counter("custom_counter")
metrics.set_gauge("custom_gauge", value)
metrics.record_histogram("custom_histogram", value)
# Export metrics
await metrics.push_metrics()
stats = metrics.get_stats()
```
---
## 🎯 **Best Practices**
### **Performance Optimization**
```python
# 1. Use batch operations for large datasets
batch_size = 1000
for i in range(0, len(vectors), batch_size):
    batch_vectors = vectors[i:i+batch_size]
    batch_metadata = metadata[i:i+batch_size]
    await client.insert(collection_name, batch_vectors, batch_metadata)

# 2. Choose appropriate index types
# - IVF_FLAT: Good balance of speed and accuracy
# - HNSW: Best for high-accuracy searches
# - IVF_SQ8: Memory-efficient for large datasets

# 3. Use connection pooling for high concurrency
config = {
    "pool_size": 20,  # Adjust based on your needs
    "max_retries": 3
}

# 4. Implement proper error handling
# (assumes MilvusException is exported by the library and `logger` is configured)
try:
    results = await client.search(collection_name, query_vectors, top_k=10)
except MilvusException as e:
    logger.error(f"Milvus operation failed: {e}")
    # Implement fallback logic
```
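
Beyond the client's built-in `max_retries`, you can layer application-level retries with exponential backoff for transient failures. A generic sketch (the exception type to catch depends on your deployment; catching bare `Exception` here is for illustration only):

```python
import asyncio

async def search_with_backoff(client, collection_name, query_vectors,
                              attempts=3, base_delay=0.5):
    for attempt in range(attempts):
        try:
            return await client.search(collection_name, query_vectors, top_k=10)
        except Exception:
            if attempt == attempts - 1:
                raise  # Out of retries: propagate the error
            # Back off 0.5s, 1s, 2s, ... before retrying
            await asyncio.sleep(base_delay * 2 ** attempt)
```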
### **Memory Management**
```python
# 1. Process large datasets in chunks
async def process_large_dataset(vectors, chunk_size=10000):
    for i in range(0, len(vectors), chunk_size):
        chunk = vectors[i:i+chunk_size]
        await client.insert(collection_name, chunk)
        # Allow garbage collection
        del chunk

# 2. Use generators for streaming data
async def vector_generator():
    for data in large_dataset:
        yield process_data(data)

# 3. Clean up resources
async with AsyncMilvusClient(config) as client:
    # Operations here
    pass  # Automatic cleanup
```
### **Security Best Practices**
```python
# 1. Use environment variables for sensitive data
import os

config = {
    "host": os.getenv("MILVUS_HOST"),
    "username": os.getenv("MILVUS_USERNAME"),
    "password": os.getenv("MILVUS_PASSWORD")
}

# 2. Enable encryption for sensitive collections
await client.create_collection(
    collection_name="sensitive_data",
    dimension=768,
    enable_encryption=True
)

# 3. Implement proper access control
security_manager = SecurityManager(config={
    "enable_rbac": True,
    "token_expiry": 3600
})

# 4. Validate input data
from ai_prishtina_milvus_client.validation import VectorValidator

validator = VectorValidator(dimension=768, normalize=True)
validated_vectors = validator.validate(vectors)
```
---
## 🔍 **Troubleshooting**
### **Common Issues**
#### **Connection Problems**
```python
# Issue: Connection timeout
# Solution: Increase timeout and check network
config = {
    "host": "localhost",
    "port": 19530,
    "timeout": 60  # Increase timeout
}

# Issue: Authentication failed
# Solution: Check credentials and permissions
try:
    await client.connect()
except AuthenticationError:
    # Check username/password
    # Verify user permissions
    raise
```
#### **Performance Issues**
```python
# Issue: Slow search performance
# Solution: Optimize index parameters
index_params = {
    "index_type": "HNSW",
    "params": {
        "M": 16,               # Increase for better recall
        "efConstruction": 200  # Increase for better quality
    }
}

# Issue: High memory usage
# Solution: Use memory-efficient index
index_params = {
    "index_type": "IVF_SQ8",  # Memory-efficient
    "params": {"nlist": 1024}
}
```
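
To apply either parameter set, pass it to the `create_index` call documented in the API reference, dropping any existing index first (a sketch; runs inside an async function):

```python
# Rebuild the vector index with the tuned parameters
await client.drop_index(collection_name="demo", field_name="vector")
await client.create_index(
    collection_name="demo",
    field_name="vector",
    index_params={
        "index_type": "HNSW",
        "params": {"M": 16, "efConstruction": 200}
    }
)
```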
#### **Data Issues**
```python
# Issue: Vector dimension mismatch
# Solution: Validate dimensions before insert
if len(vector) != expected_dimension:
    raise ValueError(f"Expected dimension {expected_dimension}, got {len(vector)}")

# Issue: Invalid metadata
# Solution: Validate metadata schema
from pydantic import BaseModel

class VectorMetadata(BaseModel):
    id: str
    category: str
    timestamp: str

validated_metadata = [VectorMetadata(**meta) for meta in metadata]
```
### **Debugging**
```python
# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)
# Use client debugging features
client = AsyncMilvusClient(config, debug=True)
# Monitor operations
from ai_prishtina_milvus_client.monitoring import MetricsCollector
metrics = MetricsCollector(config={"enable_debug": True})
```
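
To capture the library's logs without turning on global DEBUG output, you can scope the level to the package logger (this assumes the library logs under its import name, which is the usual Python convention):

```python
import logging

# Root logging stays at INFO; only the client library emits DEBUG records
logging.basicConfig(level=logging.INFO)
logging.getLogger("ai_prishtina_milvus_client").setLevel(logging.DEBUG)
```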
---
## 🧪 **Testing**
### **Unit Tests**
```bash
# Run unit tests
pytest tests/unit/ -v
# Run with coverage
pytest tests/unit/ --cov=ai_prishtina_milvus_client --cov-report=html
```
### **Integration Tests**
```bash
# Start Docker services
docker-compose up -d
# Run integration tests
pytest tests/integration/ -v
# Run specific test categories
pytest tests/integration/ -m "not slow"
```
### **Performance Tests**
```bash
# Run performance benchmarks
python tests/performance/benchmark.py
# Run load tests
python tests/performance/load_test.py --vectors=100000 --concurrent=10
```
---
## 📚 **Documentation**
### **API Documentation**
- [Complete API Reference](https://docs.ai-prishtina.com/milvus-client/api/)
- [Configuration Guide](https://docs.ai-prishtina.com/milvus-client/config/)
- [Examples Repository](https://github.com/ai-prishtina/milvus-client-examples)
### **Tutorials**
- [Getting Started Guide](https://docs.ai-prishtina.com/milvus-client/tutorials/getting-started/)
- [Advanced Features](https://docs.ai-prishtina.com/milvus-client/tutorials/advanced/)
- [Production Deployment](https://docs.ai-prishtina.com/milvus-client/tutorials/production/)
### **Community**
- [GitHub Discussions](https://github.com/ai-prishtina/milvus-client/discussions)
- [Stack Overflow](https://stackoverflow.com/questions/tagged/ai-prishtina-milvus)
- [Discord Community](https://discord.gg/ai-prishtina)
---
## 🤝 **Contributing**
We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.
### **Development Setup**
```bash
# Clone repository
git clone https://github.com/ai-prishtina/milvus-client.git
cd milvus-client
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install development dependencies
pip install -e ".[dev]"
# Install pre-commit hooks
pre-commit install
# Run tests
pytest
```
### **Contribution Guidelines**
- Follow [PEP 8](https://pep8.org/) style guidelines
- Add tests for new features
- Update documentation for API changes
- Use meaningful commit messages
- Create pull requests against the `develop` branch
---
## 📄 **License**
This project is dual-licensed:
### **Open Source License (AGPL-3.0)**
For open-source projects, research, and educational use, this software is licensed under the GNU Affero General Public License v3.0 (AGPL-3.0). This means:
- ✅ Free to use for open-source projects
- ✅ Must open-source your entire application if you distribute it
- ✅ Perfect for research and educational purposes
- ✅ Community-driven development
### **Commercial License**
For commercial use, proprietary applications, or when you cannot comply with AGPL-3.0 requirements:
- 🏢 **Commercial applications** and SaaS products
- 🔒 **Proprietary software** without open-source requirements
- 🚀 **Enterprise support** and custom development
- 📞 **Priority support** and SLA guarantees
**Need a commercial license?** Contact: [alban.q.maxhuni@gmail.com](mailto:alban.q.maxhuni@gmail.com)
See the [LICENSE](LICENSE) file for complete details.
---
## 👨‍💻 **Author**
**Alban Maxhuni, PhD**
Email: [alban.q.maxhuni@gmail.com](mailto:alban.q.maxhuni@gmail.com) | [info@albanmaxhuni.com](mailto:info@albanmaxhuni.com)
---
## 🙏 **Acknowledgments**
- [Milvus](https://milvus.io/) - The open-source vector database
- [PyMilvus](https://github.com/milvus-io/pymilvus) - Official Python SDK
- [Sentence Transformers](https://www.sbert.net/) - For embedding examples
- [FastAPI](https://fastapi.tiangolo.com/) - For API integration examples
---
<div align="center">
**⭐ Star this repository if you find it helpful!**

[Report Bug](https://github.com/ai-prishtina/milvus-client/issues) • [Request Feature](https://github.com/ai-prishtina/milvus-client/issues)
</div>