This guide demonstrates how to integrate Mirror SDK’s encryption capabilities with popular vector databases.

ChromaDB Integration

Basic Setup

from mirror_sdk.core.mirror_core import MirrorSDK, MirrorConfig
from mirror_sdk.core.models import VectorData
import chromadb

# Build the Mirror SDK from configuration read from the environment
config = MirrorConfig.from_env()
sdk = MirrorSDK(config)

# Create the ChromaDB client and a cosine-space collection for the documents
client = chromadb.Client()
collection = client.create_collection(
    metadata={"hnsw:space": "cosine"},
    name="secure_documents",
)

Secure Operations

# Wrap the embedding in VectorData and encrypt it with the SDK
vector_data = VectorData(vector=embedding, id="doc_1")
encrypted = sdk.vectax.encrypt(vector_data)

# Store the ciphertext as the embedding; the IV and auth hash travel as metadata
collection.add(
    ids=["doc_1"],
    embeddings=[encrypted.ciphertext],
    documents=["Document content"],
    metadatas=[dict(iv=encrypted.iv, auth_hash=encrypted.auth_hash)],
)

# Secure search: the query embedding is encrypted the same way before querying
encrypted_query = sdk.vectax.encrypt(VectorData(vector=query_embedding, id="query"))
results = collection.query(query_embeddings=[encrypted_query.ciphertext], n_results=2)

Pinecone Integration

Setup & Operations

import pinecone
from mirror_sdk.core.models import VectorData

# Connect to Pinecone and open the index that will hold the ciphertexts
pinecone.init(api_key="your-key", environment="env")
index = pinecone.Index("secure-index")

# Encrypt the embedding, then upsert a single (id, values, metadata) tuple
vector_data = VectorData(vector=embedding, id="vec1")
encrypted = sdk.vectax.encrypt(vector_data)

index.upsert([
    (
        "vec1",
        encrypted.ciphertext,
        dict(
            iv=encrypted.iv,
            auth_hash=encrypted.auth_hash,
            metadata="additional metadata",
        ),
    ),
])

# Query with the encrypted form of the query vector
encrypted_query = sdk.vectax.encrypt(VectorData(vector=query_vector, id="query"))
results = index.query(vector=encrypted_query.ciphertext, top_k=5, include_metadata=True)

Weaviate Integration

Schema Setup

import weaviate

client = weaviate.Client("http://localhost:8080")

# Class definition for encrypted vectors. "vectorizer" is "none" because the
# vector is passed in explicitly when each object is created.
schema = {
    "class": "SecureDocument",
    "vectorizer": "none",
    "properties": [
        {"name": property_name, "dataType": [data_type]}
        for property_name, data_type in (
            ("content", "text"),
            ("iv", "string"),
            ("auth_hash", "string"),
        )
    ],
}
client.schema.create_class(schema)

Secure Operations

# Encrypt the embedding and create the object with the ciphertext as its vector
vector_data = VectorData(vector=embedding, id="doc1")
encrypted = sdk.vectax.encrypt(vector_data)

client.data_object.create(
    class_name="SecureDocument",
    data_object=dict(
        content="Document content",
        iv=encrypted.iv,
        auth_hash=encrypted.auth_hash,
    ),
    vector=encrypted.ciphertext,
)

# Search: encrypt the query vector and run a near-vector query on the ciphertext
encrypted_query = sdk.vectax.encrypt(VectorData(vector=query_vector, id="query"))
result = (
    client.query
    .get("SecureDocument", ["content"])
    .with_near_vector(dict(vector=encrypted_query.ciphertext))
    .with_limit(5)
    .do()
)

Milvus Integration

Collection Setup

from pymilvus import Collection, CollectionSchema, DataType, FieldSchema, connections
import numpy as np

# Open the default Milvus connection before touching any collection, in the
# same way the other database sections initialise their clients first.
# Host and port are placeholders for a local Milvus instance; adjust as needed.
connections.connect(alias="default", host="localhost", port="19530")

# Define fields for encrypted vectors: an integer primary key, the ciphertext
# stored as a float vector, and the encryption metadata and content as strings.
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="encrypted_vector", dtype=DataType.FLOAT_VECTOR, dim=1536),
    FieldSchema(name="iv", dtype=DataType.VARCHAR, max_length=200),
    FieldSchema(name="auth_hash", dtype=DataType.VARCHAR, max_length=200),
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=2000),
]
schema = CollectionSchema(fields=fields, description="Secure documents")
collection = Collection("secure_docs", schema)

Operations

# Insert encrypted vectors
vector_data = VectorData(vector=embedding, id="doc1")
encrypted = sdk.vectax.encrypt(vector_data)

# Column-ordered data: one inner list per schema field, in schema order.
entities = [
    [0],  # id
    [encrypted.ciphertext],  # vector
    [encrypted.iv],  # iv
    [encrypted.auth_hash],  # auth_hash
    ["Document content"],  # content
]
collection.insert(entities)
# Flush so the freshly inserted rows are persisted before indexing/searching.
collection.flush()

# Milvus only searches collections that are indexed on the vector field and
# loaded into memory, so do both before the first search. The index uses the
# same metric ("L2") as the search parameters below; "nprobe" is an IVF
# search parameter, hence an IVF index. nlist=128 is an example value.
collection.create_index(
    field_name="encrypted_vector",
    index_params={
        "index_type": "IVF_FLAT",
        "metric_type": "L2",
        "params": {"nlist": 128},
    },
)
collection.load()

# Search with the encrypted query vector
encrypted_query = sdk.vectax.encrypt(
    VectorData(vector=query_vector, id="query")
)
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
    data=[encrypted_query.ciphertext],
    anns_field="encrypted_vector",
    param=search_params,
    limit=5,
)

Qdrant Integration

Setup

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams

# Connect to the Qdrant server
client = QdrantClient("localhost", port=6333)

# Collection for the encrypted vectors: 1536 dimensions, cosine distance
client.create_collection(
    vectors_config=VectorParams(distance=Distance.COSINE, size=1536),
    collection_name="secure_docs",
)

Operations

from qdrant_client.models import PointStruct

# Encrypt the embedding and store the ciphertext as the point's vector
vector_data = VectorData(vector=embedding, id="doc1")
encrypted = sdk.vectax.encrypt(vector_data)

point = PointStruct(
    id=0,
    vector=encrypted.ciphertext,
    payload=dict(
        content="Document content",
        iv=encrypted.iv,
        auth_hash=encrypted.auth_hash,
    ),
)
client.upsert(collection_name="secure_docs", points=[point])

# Search with the encrypted form of the query vector
encrypted_query = sdk.vectax.encrypt(VectorData(vector=query_vector, id="query"))
results = client.search(
    collection_name="secure_docs",
    query_vector=encrypted_query.ciphertext,
    limit=5,
)

RBAC-Protected Vector Database Integration

Common RBAC Configuration

from mirror_sdk.core.models import RBACVectorData

# Organization-wide policy: the roles, groups and departments known to the app
app_policy = dict(
    roles=["admin", "researcher", "analyst", "reader"],
    groups=["research", "engineering", "product", "compliance"],
    departments=["ai", "platform", "security"],
)

# Register the policy with the SDK
sdk.set_policy(app_policy)

ChromaDB with RBAC

class RBACChromaDB:
    """ChromaDB wrapper that stores RBAC-encrypted vectors and filters hits by access.

    Uses the module-level names ``RBACVectorData``, ``MirrorCrypto``,
    ``MirrorError``, ``encode_binary_data``, ``decode_binary_data`` and
    ``logger`` that this guide imports/defines at module level.
    """

    def __init__(self, mirror_sdk, collection):
        self.sdk = mirror_sdk
        self.collection = collection

    def store_with_rbac(self, embedding, content, access_policy):
        """Encrypt ``embedding`` under ``access_policy`` and add it to the collection.

        Args:
            embedding: Plain embedding vector for ``content``.
            content: Document text; also used to derive the document id.
            access_policy: Policy dict handed to ``RBACVectorData``.
        """
        # Derive the id from a content digest. The built-in hash() is salted
        # per interpreter process for strings, so it would give the same
        # document a different id on every run.
        digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
        vector_data = RBACVectorData(
            vector=embedding,
            id=f"doc_{digest}",
            access_policy=access_policy,
        )

        # Encrypt with RBAC
        encrypted = self.sdk.rbac.encrypt(vector_data)

        # Store with RBAC metadata. The serialized crypto object is persisted
        # under "encrypted_vector" because search_with_rbac() needs it to
        # attempt decryption (same encoding as the Qdrant RBAC example).
        self.collection.add(
            ids=[vector_data.id],
            embeddings=[encrypted.crypto.ciphertext],
            documents=[content],
            metadatas=[{
                # NOTE(review): assumes encrypted_header is a scalar Chroma
                # accepts as a metadata value - confirm against the SDK.
                "encrypted_header": encrypted.encrypted_header,
                "encrypted_vector": encode_binary_data(
                    encrypted.crypto.serialize()
                ),
                # Chroma metadata values must be scalars, so the policy dict
                # is stored as a JSON string.
                "access_policy": json.dumps(access_policy),
            }],
        )

    def search_with_rbac(self, query_embedding, user_key, n_results=5):
        """Search the collection and return only documents ``user_key`` can decrypt.

        Args:
            query_embedding: Plain query embedding; encrypted before querying.
            user_key: User secret key passed to ``sdk.rbac.decrypt``.
            n_results: Maximum number of nearest neighbours to request.

        Returns:
            List of document strings for which decryption succeeded.
        """
        # Encrypt query
        query_data = RBACVectorData(
            vector=query_embedding,
            id="query",
            access_policy={},  # Query doesn't need policy
        )
        encrypted_query = self.sdk.rbac.encrypt(query_data)

        # Search
        results = self.collection.query(
            query_embeddings=[encrypted_query.crypto.ciphertext],
            n_results=n_results,
        )

        # Verify access: a document is accessible when decryption succeeds
        accessible_docs = []
        for doc, metadata in zip(results["documents"][0], results["metadatas"][0]):
            try:
                crypto = MirrorCrypto.deserialize(
                    decode_binary_data(metadata["encrypted_vector"])
                )
                self.sdk.rbac.decrypt(
                    crypto=crypto,
                    encrypted_header=metadata["encrypted_header"],
                    user_secret_key=user_key,
                )
            except MirrorError:
                # Decryption refused for this key: treat as "no access".
                continue
            except Exception:
                # Malformed or missing RBAC metadata: skip the hit, but make
                # the problem visible instead of silently returning nothing.
                logger.exception("Skipping result with unusable RBAC metadata")
                continue
            accessible_docs.append(doc)

        return accessible_docs

Pinecone with RBAC

class RBACPinecone:
    """Pinecone wrapper that upserts RBAC-encrypted vectors and filters matches by access.

    Uses the module-level names ``RBACVectorData``, ``MirrorCrypto``,
    ``MirrorError``, ``encode_binary_data``, ``decode_binary_data`` and
    ``logger`` that this guide imports/defines at module level.
    """

    def __init__(self, mirror_sdk, index):
        self.sdk = mirror_sdk
        self.index = index

    def upsert_with_rbac(self, vectors, access_policies):
        """Encrypt each vector under its paired policy and upsert the batch.

        Args:
            vectors: Iterable of dicts with "embedding" and "id" keys and an
                optional "metadata" dict.
            access_policies: Iterable of policy dicts, paired with ``vectors``
                by position.
        """
        records = []
        for vec, policy in zip(vectors, access_policies):
            vector_data = RBACVectorData(
                vector=vec["embedding"],
                id=vec["id"],
                access_policy=policy,
            )
            encrypted = self.sdk.rbac.encrypt(vector_data)

            metadata = {
                # Caller metadata goes first so it can never overwrite the
                # RBAC fields written below.
                **vec.get("metadata", {}),
                "encrypted_header": encrypted.encrypted_header,
                # Needed by query_with_rbac() to attempt decryption (same
                # encoding as the Qdrant RBAC example).
                # NOTE(review): check the serialized size against Pinecone's
                # per-vector metadata limit.
                "encrypted_vector": encode_binary_data(
                    encrypted.crypto.serialize()
                ),
                # Pinecone metadata does not take nested objects, so the
                # policy dict is stored as a JSON string.
                "access_policy": json.dumps(policy),
            }
            records.append((vec["id"], encrypted.crypto.ciphertext, metadata))

        # Nothing to send for an empty batch.
        if records:
            self.index.upsert(records)

    def query_with_rbac(self, query_vector, user_key, top_k=5):
        """Query the index and return only matches ``user_key`` can decrypt.

        Args:
            query_vector: Plain query vector; encrypted before querying.
            user_key: User secret key passed to ``sdk.rbac.decrypt``.
            top_k: Maximum number of matches to request.

        Returns:
            List of match objects for which decryption succeeded.
        """
        query_data = RBACVectorData(
            vector=query_vector,
            id="query",
            access_policy={},  # Same empty query policy as the ChromaDB example
        )
        encrypted_query = self.sdk.rbac.encrypt(query_data)

        results = self.index.query(
            vector=encrypted_query.crypto.ciphertext,
            top_k=top_k,
            include_metadata=True,
        )

        verified_results = []
        for match in results.matches:
            try:
                crypto = MirrorCrypto.deserialize(
                    decode_binary_data(match.metadata["encrypted_vector"])
                )
                self.sdk.rbac.decrypt(
                    crypto=crypto,
                    encrypted_header=match.metadata["encrypted_header"],
                    user_secret_key=user_key,
                )
            except MirrorError:
                # Decryption refused for this key: treat as "no access".
                continue
            except Exception:
                # Malformed or missing RBAC metadata: skip the match, but make
                # the problem visible instead of silently returning nothing.
                logger.exception("Skipping match with unusable RBAC metadata")
                continue
            verified_results.append(match)

        return verified_results

Weaviate with RBAC

class RBACWeaviate:
    """Weaviate wrapper that stores objects with RBAC-encrypted vectors."""

    def __init__(self, mirror_sdk, client):
        self.client = client
        self.sdk = mirror_sdk

    def add_with_rbac(self, object_data, access_policy):
        """Encrypt the object's vector under ``access_policy`` and create the object.

        ``object_data`` is read for its "vector", "id" and "content" keys. The
        object is created in the "SecureDocument" class with the ciphertext as
        its vector.
        """
        encrypted = self.sdk.rbac.encrypt(
            RBACVectorData(
                vector=object_data["vector"],
                id=object_data["id"],
                access_policy=access_policy,
            )
        )

        # Properties stored next to the ciphertext vector
        properties = {
            "content": object_data["content"],
            "encrypted_header": encrypted.encrypted_header,
            "access_policy": access_policy,
        }
        self.client.data_object.create(
            class_name="SecureDocument",
            data_object=properties,
            vector=encrypted.crypto.ciphertext,
        )

Qdrant with RBAC

import hashlib
import json
import logging

from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, models, VectorParams

from mirror_sdk.core.mirror_core import MirrorSDK, MirrorConfig
from mirror_sdk.core.models import RBACVectorData, MirrorCrypto
from mirror_sdk.core import MirrorError
from mirror_sdk.utils import encode_binary_data, decode_binary_data
logger = logging.getLogger(__name__)


class RBACQdrant:
    """Qdrant wrapper that stores RBAC-encrypted vectors and reports per-hit access."""

    def __init__(self, mirror_sdk, client):
        self.sdk = mirror_sdk
        self.client = client

    def upsert_with_rbac(self, vectors, auth_policy, collection_name):
        """Encrypt every vector under ``auth_policy`` and upsert it into Qdrant.

        Args:
            vectors: Sequence of dicts with an "embedding" key and an optional
                "content" key (stored as ``None`` when absent).
            auth_policy: RBAC policy dict applied to all uploaded documents.
            collection_name: Target Qdrant collection.

        Point ids are assigned by position, starting at 1.
        """
        for point_id, vec in enumerate(vectors, start=1):
            vector_data = RBACVectorData(
                vector=vec["embedding"],
                id=point_id,
                access_policy=auth_policy,
            )
            encrypted = self.sdk.rbac.encrypt(vector_data)

            # Store in Qdrant. The serialized crypto object is kept in the
            # payload so query_with_rbac() can attempt decryption later.
            self.client.upsert(
                collection_name=collection_name,
                points=[
                    models.PointStruct(
                        id=point_id,
                        vector=encrypted.crypto.ciphertext,
                        payload={
                            "doc_id": point_id,
                            "content": vec.get("content"),
                            "encrypted_header": encrypted.encrypted_header,
                            "encrypted_vector_metadata": encode_binary_data(
                                encrypted.crypto.serialize()
                            ),
                        },
                    )
                ],
            )
            # Log the id the point was actually stored under.
            logger.info(f"Successfully stored document {point_id}")

    def query_with_rbac(self, query_vector, search_policy, user_secret_key, collection_name, top_k=5):
        """Search the collection and report, per hit, whether the user may read it.

        Args:
            query_vector: Plain query vector; encrypted before searching.
            search_policy: RBAC policy dict used when encrypting the query.
            user_secret_key: Secret key of the querying user.
            collection_name: Qdrant collection to search.
            top_k: Maximum number of hits to request.

        Returns:
            List of dicts with "id", "content", "score" and "access". Hits the
            user cannot decrypt are included with content "[ACCESS DENIED]"
            and access "denied". Hits that fail for any other reason are
            logged and left out.
        """
        # Encrypt query with user's policy
        query_data = RBACVectorData(
            vector=query_vector,
            id="query",
            access_policy=search_policy,
        )
        encrypted_query = self.sdk.rbac.encrypt(query_data)

        results = self.client.search(
            collection_name=collection_name,
            query_vector=encrypted_query.crypto.ciphertext,
            limit=top_k,
        )

        accessible_results = []
        for result in results:
            # Bound up front so the error handler below can always format it.
            doc_id = None
            try:
                doc_id = result.payload["doc_id"]

                # Get complete document from Qdrant
                qdrant_doc = self.client.retrieve(
                    collection_name=collection_name,
                    ids=[doc_id],
                )[0]

                # Extract crypto metadata
                mirror_data = MirrorCrypto.deserialize(
                    decode_binary_data(
                        qdrant_doc.payload["encrypted_vector_metadata"]
                    )
                )

                # Try to decrypt with user's key - will fail if unauthorized
                try:
                    self.sdk.rbac.decrypt(
                        crypto=mirror_data,
                        encrypted_header=qdrant_doc.payload["encrypted_header"],
                        user_secret_key=user_secret_key,
                    )
                except MirrorError:
                    # Access denied
                    logger.info(f"Access denied for document {doc_id}")
                    accessible_results.append({
                        "id": doc_id,
                        "content": "[ACCESS DENIED]",
                        "score": result.score,
                        "access": "denied",
                    })
                else:
                    # Access granted
                    accessible_results.append({
                        "id": doc_id,
                        "content": qdrant_doc.payload["content"],
                        "score": result.score,
                        "access": "granted",
                    })
                    logger.info(f"Access granted for document {doc_id}")

            except Exception as e:
                logger.error(f"Error processing document {doc_id}: {str(e)}")

        return accessible_results


def generate_vectors(docs, embed=None):
    """Generate embeddings for the given documents.

    Args:
        docs: Iterable of dicts, each with a "content" key.
        embed: Optional callable mapping a content string to its embedding.
            Defaults to the module-level ``get_embedding``.

    Returns:
        One dict per document with "id" (the document's position in ``docs``),
        "embedding" and "content". "content" is carried along because
        ``RBACQdrant.upsert_with_rbac`` stores it in the point payload.
    """
    if embed is None:
        embed = get_embedding
    return [
        {
            "id": index,
            "embedding": embed(doc["content"]),
            "content": doc["content"],
        }
        for index, doc in enumerate(docs)
    ]

def rbac_demo():
    """Run the Qdrant RBAC flow end to end with an authorized and an unauthorized user.

    Relies on the module-level ``sdk``, ``app_policy``, ``QdrantClient``,
    ``RBACQdrant``, ``generate_vectors`` and ``get_embedding``.
    """
    # Add your documents here, each document contains "id" and "content" field
    documents = []

    # Set overall policy for app (the organization policy defined earlier)
    sdk.set_policy(app_policy)

    collection_name = "rbac_demo"

    # Qdrant client used for this demo; host and port match the setup section.
    qdrant_client = QdrantClient("localhost", port=6333)

    # Create Collection in Qdrant with name `rbac_demo`, use Qdrant client to create collection

    # Define authorized and unauthorized user contexts
    auth_user_context = {
        "roles": ["researcher"],
        "groups": ["research"],
        "departments": ["ai", "platform"],
    }
    unauth_user_context = {
        "roles": ["reader"],
        "groups": ["research"],
        "departments": ["ai", "platform"],
    }

    # Create user keys for authorized policy and unauthorized policy
    authorized_user_key = sdk.rbac.generate_user_secret_key(auth_user_context)
    unauthorized_user_key = sdk.rbac.generate_user_secret_key(unauth_user_context)

    rbac = RBACQdrant(sdk, qdrant_client)

    # Generate Vectors array, each element containing id and embedding field
    vectors = generate_vectors(documents)

    # Upsert documents with RBAC auth_policy
    rbac.upsert_with_rbac(vectors, auth_user_context, collection_name)

    search_query = "investment strategy"
    query_vector = get_embedding(search_query)

    # search with authorized key
    search_results = rbac.query_with_rbac(
        query_vector, auth_user_context, authorized_user_key, collection_name, top_k=5)
    print(search_results)

    # search with unauthorized key
    search_results = rbac.query_with_rbac(
        query_vector, unauth_user_context, unauthorized_user_key, collection_name, top_k=5)
    print(search_results)