Integrations
VectorDB
Integrations
VectorDB
Vector Database Integrations with Vectax SDK
This guide demonstrates how to integrate Mirror SDK’s encryption capabilities with popular vector databases.
ChromaDB Integration
Basic Setup
from mirror_sdk.core.mirror_core import MirrorSDK, MirrorConfig
from mirror_sdk.core.models import VectorData
import chromadb
# Initialize Mirror SDK
config = MirrorConfig.from_env()
sdk = MirrorSDK(config)
# Setup ChromaDB
client = chromadb.Client()
collection = client.create_collection(
name="secure_documents",
metadata={"hnsw:space": "cosine"}
)
Secure Operations
# Encrypt vector
vector_data = VectorData(vector=embedding, id="doc_1")
encrypted = sdk.vectax.encrypt(vector_data)
# Store with encryption metadata
collection.add(
ids=["doc_1"],
embeddings=[encrypted.ciphertext],
documents=["Document content"],
metadatas=[{
"iv": encrypted.iv,
"auth_hash": encrypted.auth_hash
}]
)
# Secure search
encrypted_query = sdk.vectax.encrypt(
VectorData(vector=query_embedding, id="query")
)
results = collection.query(
query_embeddings=[encrypted_query.ciphertext],
n_results=2
)
Pinecone Integration
Setup & Operations
import pinecone
from mirror_sdk.core.models import VectorData
# Initialize Pinecone
pinecone.init(api_key="your-key", environment="env")
index = pinecone.Index("secure-index")
# Encrypt and upsert
vector_data = VectorData(vector=embedding, id="vec1")
encrypted = sdk.vectax.encrypt(vector_data)
index.upsert([
(
"vec1",
encrypted.ciphertext,
{
"iv": encrypted.iv,
"auth_hash": encrypted.auth_hash,
"metadata": "additional metadata"
}
)
])
# Query
encrypted_query = sdk.vectax.encrypt(
VectorData(vector=query_vector, id="query")
)
results = index.query(
vector=encrypted_query.ciphertext,
top_k=5,
include_metadata=True
)
Weaviate Integration
Schema Setup
import weaviate
client = weaviate.Client("http://localhost:8080")
# Create schema for encrypted vectors
schema = {
"class": "SecureDocument",
"vectorizer": "none",
"properties": [
{"name": "content", "dataType": ["text"]},
{"name": "iv", "dataType": ["string"]},
{"name": "auth_hash", "dataType": ["string"]}
]
}
client.schema.create_class(schema)
Secure Operations
# Encrypt and store
vector_data = VectorData(vector=embedding, id="doc1")
encrypted = sdk.vectax.encrypt(vector_data)
client.data_object.create(
class_name="SecureDocument",
data_object={
"content": "Document content",
"iv": encrypted.iv,
"auth_hash": encrypted.auth_hash
},
vector=encrypted.ciphertext
)
# Search
encrypted_query = sdk.vectax.encrypt(
VectorData(vector=query_vector, id="query")
)
result = (
client.query
.get("SecureDocument", ["content"])
.with_near_vector({
"vector": encrypted_query.ciphertext
})
.with_limit(5)
.do()
)
Milvus Integration
Collection Setup
from pymilvus import Collection, DataType, FieldSchema, CollectionSchema
import numpy as np
# Define fields for encrypted vectors
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
FieldSchema(name="encrypted_vector", dtype=DataType.FLOAT_VECTOR, dim=1536),
FieldSchema(name="iv", dtype=DataType.VARCHAR, max_length=200),
FieldSchema(name="auth_hash", dtype=DataType.VARCHAR, max_length=200),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=2000)
]
schema = CollectionSchema(fields=fields, description="Secure documents")
collection = Collection("secure_docs", schema)
Operations
# Insert encrypted vectors
vector_data = VectorData(vector=embedding, id="doc1")
encrypted = sdk.vectax.encrypt(vector_data)
entities = [
[0], # id
[encrypted.ciphertext], # vector
[encrypted.iv], # iv
[encrypted.auth_hash], # auth_hash
["Document content"] # content
]
collection.insert(entities)
# Search
encrypted_query = sdk.vectax.encrypt(
VectorData(vector=query_vector, id="query")
)
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
data=[encrypted_query.ciphertext],
anns_field="encrypted_vector",
param=search_params,
limit=5
)
Qdrant Integration
Setup
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
# Initialize Qdrant
client = QdrantClient("localhost", port=6333)
# Create collection for encrypted vectors
client.create_collection(
collection_name="secure_docs",
vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
)
Operations
from qdrant_client.models import PointStruct
# Encrypt and store
vector_data = VectorData(vector=embedding, id="doc1")
encrypted = sdk.vectax.encrypt(vector_data)
point = PointStruct(
id=0,
vector=encrypted.ciphertext,
payload={
"content": "Document content",
"iv": encrypted.iv,
"auth_hash": encrypted.auth_hash
}
)
client.upsert(collection_name="secure_docs", points=[point])
# Search
encrypted_query = sdk.vectax.encrypt(
VectorData(vector=query_vector, id="query")
)
results = client.search(
collection_name="secure_docs",
query_vector=encrypted_query.ciphertext,
limit=5
)
RBAC-Protected Vector Database Integration
Common RBAC Configuration
from mirror_sdk.core.models import RBACVectorData
# Define organization policies
app_policy = {
"roles": ["admin", "researcher", "analyst", "reader"],
"groups": ["research", "engineering", "product", "compliance"],
"departments": ["ai", "platform", "security"]
}
# Initialize SDK with policy
sdk.set_policy(app_policy)
ChromaDB with RBAC
class RBACChromaDB:
def __init__(self, mirror_sdk, collection):
self.sdk = mirror_sdk
self.collection = collection
def store_with_rbac(self, embedding, content, access_policy):
"""Store vector with RBAC protection"""
# Create RBAC vector data
vector_data = RBACVectorData(
vector=embedding,
id=f"doc_{hash(content)}",
access_policy=access_policy
)
# Encrypt with RBAC
encrypted = self.sdk.rbac.encrypt(vector_data)
# Store with RBAC metadata
self.collection.add(
ids=[vector_data.id],
embeddings=[encrypted.crypto.ciphertext],
documents=[content],
metadatas=[{
"encrypted_header": encrypted.encrypted_header,
"access_policy": access_policy
}]
)
def search_with_rbac(self, query_embedding, user_key):
"""Search with RBAC verification"""
# Encrypt query
query_data = RBACVectorData(
vector=query_embedding,
id="query",
access_policy={} # Query doesn't need policy
)
encrypted_query = self.sdk.rbac.encrypt(query_data)
# Search
results = self.collection.query(
query_embeddings=[encrypted_query.crypto.ciphertext],
n_results=5
)
# Verify access and decrypt
accessible_docs = []
for doc, metadata in zip(results["documents"][0], results["metadatas"][0]):
try:
self.sdk.rbac.decrypt(
metadata["encrypted_vector"],
metadata["encrypted_header"],
user_key
)
accessible_docs.append(doc)
except Exception:
continue
return accessible_docs
Pinecone with RBAC
class RBACPinecone:
def __init__(self, mirror_sdk, index):
self.sdk = mirror_sdk
self.index = index
def upsert_with_rbac(self, vectors, access_policies):
"""Upsert vectors with RBAC policies"""
records = []
for vec, policy in zip(vectors, access_policies):
vector_data = RBACVectorData(
vector=vec["embedding"],
id=vec["id"],
access_policy=policy
)
encrypted = self.sdk.rbac.encrypt(vector_data)
records.append((
vec["id"],
encrypted.crypto.ciphertext,
{
"encrypted_header": encrypted.encrypted_header,
"access_policy": policy,
**vec.get("metadata", {})
}
))
self.index.upsert(records)
def query_with_rbac(self, query_vector, user_key, top_k=5):
"""Query with RBAC access verification"""
query_data = RBACVectorData(
vector=query_vector,
id="query"
)
encrypted_query = self.sdk.rbac.encrypt(query_data)
results = self.index.query(
vector=encrypted_query.crypto.ciphertext,
top_k=top_k,
include_metadata=True
)
verified_results = []
for match in results.matches:
try:
self.sdk.rbac.decrypt(
match.metadata["encrypted_vector"],
match.metadata["encrypted_header"],
user_key
)
verified_results.append(match)
except Exception:
continue
return verified_results
Weaviate with RBAC
class RBACWeaviate:
def __init__(self, mirror_sdk, client):
self.sdk = mirror_sdk
self.client = client
def add_with_rbac(self, object_data, access_policy):
"""Add object with RBAC protection"""
vector_data = RBACVectorData(
vector=object_data["vector"],
id=object_data["id"],
access_policy=access_policy
)
encrypted = self.sdk.rbac.encrypt(vector_data)
self.client.data_object.create(
class_name="SecureDocument",
data_object={
"content": object_data["content"],
"encrypted_header": encrypted.encrypted_header,
"access_policy": access_policy
},
vector=encrypted.crypto.ciphertext
)
Qdrant with RBAC
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, models, VectorParams
from mirror_sdk.core.mirror_core import MirrorSDK, MirrorConfig
from mirror_sdk.core.models import RBACVectorData, MirrorCrypto
from mirror_sdk.core import MirrorError
from mirror_sdk.utils import encode_binary_data, decode_binary_data
logger = logging.getLogger(__name__)
class RBACQdrant:
def __init__(self, mirror_sdk, client):
self.sdk = mirror_sdk
self.client = client
# Upsert vectors with RBAC policies
# vectors is array of dictionaries with "embedding" and "id" fields
# auth_policy is a dictionary which contains RBAC for uploaded documents
def upsert_with_rbac(self, vectors, auth_policy, collection_name):
"""Upsert vectors with RBAC policies"""
for index, vec in enumerate(vectors):
vector_data = RBACVectorData(
vector=vec["embedding"],
id=index+1,
access_policy=auth_policy
)
encrypted = self.sdk.rbac.encrypt(vector_data)
# Store in Qdrant
self.client.upsert(
collection_name=collection_name,
points=[
models.PointStruct(
id=index+1,
vector=encrypted.crypto.ciphertext,
payload={
"doc_id": index+1,
"content": vec["content"],
"encrypted_header": encrypted.encrypted_header,
"encrypted_vector_metadata": encode_binary_data(
encrypted.crypto.serialize()
),
},
)
],
)
logger.info(f"Successfully stored document {vec['id']}")
# Query method with RBAC access verification
# search_policy contains RBAC for search
# user_secret_key is the secret key for the user
def query_with_rbac(self, query_vector, search_policy, user_secret_key, collection_name, top_k=5):
"""Query with RBAC access verification"""
query_data = RBACVectorData(
vector=query_vector,
id="query",
access_policy=search_policy
)
# Encrypt query with user's policy
query_data = RBACVectorData(
vector=query_vector,
id="query",
access_policy=search_policy
)
encrypted_query = self.sdk.rbac.encrypt(query_data)
results = self.client.search(
collection_name=collection_name,
query_vector=encrypted_query.crypto.ciphertext,
limit=5
)
accessible_results = []
for result in results:
try:
doc_id = result.payload["doc_id"]
# Get complete document from Qdrant
qdrant_doc = qdrant_client.retrieve(
collection_name=collection_name,
ids=[doc_id],
)[0]
# Extract crypto metadata
encrypted_vector_metadata = decode_binary_data(
qdrant_doc.payload["encrypted_vector_metadata"]
)
mirror_data = MirrorCrypto.deserialize(
encrypted_vector_metadata)
# Try to decrypt with user's key - will fail if unauthorized
try:
decrypted_result = sdk.rbac.decrypt(
crypto=mirror_data,
encrypted_header=qdrant_doc.payload["encrypted_header"],
user_secret_key=user_secret_key,
)
# Access granted
accessible_results.append({
"id": doc_id,
"content": qdrant_doc.payload["content"],
"score": result.score,
"access": "granted"
})
logger.info(f"Access granted for document {doc_id}")
except MirrorError:
# Access denied
logger.info(f"Access denied for document {doc_id}")
accessible_results.append({
"id": doc_id,
"content": "[ACCESS DENIED]",
"score": result.score,
"access": "denied"
})
except Exception as e:
logger.error(f"Error processing document {doc_id}: {str(e)}")
return accessible_results
def generate_vectors(docs):
"""Generate embeddings for the given documents."""
vectors = []
for index, doc in enumerate(docs):
vectors.append({
"id": index,
"embedding": get_embedding(doc["content"])
})
return vectors
def rbac_demo():
# Add your documents here, each document contains "id" and "content" field
documents = []
# Set overall policy for app
sdk.set_policy(access_policy)
collection_name = "rbac_demo"
# Create Collection in Qdrant with name `rbac_demo`, use Qdrant client to create collection
# Define authorized and unauthorized user contexts
auth_user_context = {
"roles": ["researcher"],
"groups": ["research"],
"departments": ["ai", "platform"],
}
unauth_user_context = {
"roles": ["reader"],
"groups": ["research"],
"departments": ["ai", "platform"],
}
# Create user keys for authorized policy and unauthorized policy
authorized_user_key = sdk.rbac.generate_user_secret_key(auth_user_context)
unauthorized_user_key = sdk.rbac.generate_user_secret_key(unauth_user_context)
rbac = RBACQdrant(sdk, qdrant_client)
# Generate Vectors array, each element containing id and embedding field
vectors = generate_vectors(documents)
# Upsert documents with RBAC auth_policy
rbac.upsert_with_rbac(vectors, auth_user_context, collection_name)
search_query = "investment strategy"
query_vector = get_embedding(search_query)
# search with authorized key
search_results = rbac.query_with_rbac(
query_vector, auth_user_context, authorized_user_key, collection_name, top_k=5)
print(search_results)
# search with unauthorized key
search_results = rbac.query_with_rbac(
query_vector, unauth_user_context, unauthorized_user_key, collection_name, top_k=5)
print(search_results)
Was this page helpful?
On this page
- ChromaDB Integration
- Basic Setup
- Secure Operations
- Pinecone Integration
- Setup & Operations
- Weaviate Integration
- Schema Setup
- Secure Operations
- Milvus Integration
- Collection Setup
- Operations
- Qdrant Integration
- Setup
- Operations
- RBAC-Protected Vector Database Integration
- Common RBAC Configuration
- ChromaDB with RBAC
- Pinecone with RBAC
- Weaviate with RBAC
- Qdrant with RBAC