#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 百度VDB向量数据库连接测试脚本 测试数据库连接、基本操作和可用性 """ import pymochow from pymochow.configuration import Configuration from pymochow.auth.bce_credentials import BceCredentials from pymochow.model.schema import Schema, Field, VectorIndex, HNSWParams from pymochow.model.enum import FieldType, IndexType, MetricType from pymochow.model.table import Partition, Row import traceback import time # 数据库连接配置 ACCOUNT = 'root' API_KEY = 'vdb$yjr9ln3n0td' # 你提供的密码作为API密钥 ENDPOINT = 'http://180.76.96.191:5287' # 使用标准端口5287 def test_connection(): """测试数据库连接""" print("=" * 50) print("测试1: 数据库连接") print("=" * 50) try: # 创建配置和客户端 config = Configuration( credentials=BceCredentials(ACCOUNT, API_KEY), endpoint=ENDPOINT ) client = pymochow.MochowClient(config) print(f"✓ 成功创建客户端连接") print(f" - 账户: {ACCOUNT}") print(f" - 端点: {ENDPOINT}") return client except Exception as e: print(f"✗ 连接失败: {str(e)}") print(f"详细错误: {traceback.format_exc()}") return None def test_list_databases(client): """测试数据库列表查询""" print("\n" + "=" * 50) print("测试2: 查询数据库列表") print("=" * 50) try: # 查询数据库列表 db_list = client.list_databases() print(f"✓ 成功查询数据库列表") print(f" - 数据库数量: {len(db_list)}") if db_list: print(" - 数据库列表:") for i, db in enumerate(db_list, 1): print(f" {i}. {db.database_name}") else: print(" - 当前没有数据库") return db_list except Exception as e: print(f"✗ 查询数据库列表失败: {str(e)}") print(f"详细错误: {traceback.format_exc()}") return None def test_create_database(client): """测试创建数据库""" print("\n" + "=" * 50) print("测试3: 创建测试数据库") print("=" * 50) test_db_name = "test_db_" + str(int(time.time())) try: # 创建测试数据库 db = client.create_database(test_db_name) print(f"✓ 成功创建数据库: {test_db_name}") return db, test_db_name except Exception as e: print(f"✗ 创建数据库失败: {str(e)}") print(f"详细错误: {traceback.format_exc()}") return None, test_db_name def test_create_table(client, db, db_name): """测试创建表""" print("\n" + "=" * 50) print("测试4: 创建测试表") print("=" * 50) table_name = "test_table" try: # 定义表字段 fields = [] fields.append(Field("id", FieldType.STRING, primary_key=True, partition_key=True, auto_increment=False, not_null=True)) fields.append(Field("text", FieldType.STRING, not_null=True)) fields.append(Field("vector", FieldType.FLOAT_VECTOR, not_null=True, dimension=3)) # 定义索引 indexes = [] indexes.append( VectorIndex( index_name="vector_idx", index_type=IndexType.HNSW, field="vector", metric_type=MetricType.L2, params=HNSWParams(m=16, efconstruction=100), auto_build=True ) ) # 创建表 table = db.create_table( table_name=table_name, replication=2, # 最小副本数为2 partition=Partition(partition_num=1), # 单分区测试 schema=Schema(fields=fields, indexes=indexes) ) print(f"✓ 成功创建表: {table_name}") print(f" - 字段数量: {len(fields)}") print(f" - 索引数量: {len(indexes)}") return table, table_name except Exception as e: print(f"✗ 创建表失败: {str(e)}") print(f"详细错误: {traceback.format_exc()}") return None, table_name def test_insert_data(table): """测试插入数据""" print("\n" + "=" * 50) print("测试5: 插入测试数据") print("=" * 50) try: # 准备测试数据 rows = [ Row(id='001', text='测试文本1', vector=[0.1, 0.2, 0.3]), Row(id='002', text='测试文本2', vector=[0.4, 0.5, 0.6]), Row(id='003', text='测试文本3', vector=[0.7, 0.8, 0.9]) ] # 插入数据 table.upsert(rows) print(f"✓ 成功插入 {len(rows)} 条测试数据") # 等待一下让数据生效 time.sleep(2) return True except Exception as e: print(f"✗ 插入数据失败: {str(e)}") print(f"详细错误: {traceback.format_exc()}") return False def test_query_data(table): """测试查询数据""" print("\n" + "=" * 50) print("测试6: 查询测试数据") print("=" * 50) try: # 标量查询 primary_key = {'id': '001'} result = table.query(primary_key=primary_key, retrieve_vector=True) if result: print(f"✓ 成功查询数据") print(f" - ID: {result.get('id')}") print(f" - 文本: {result.get('text')}") print(f" - 向量: {result.get('vector')}") else: print("✗ 查询结果为空") return True except Exception as e: print(f"✗ 查询数据失败: {str(e)}") print(f"详细错误: {traceback.format_exc()}") return False def test_cleanup(client, db_name, table_name): """清理测试数据""" print("\n" + "=" * 50) print("测试7: 清理测试数据") print("=" * 50) try: # 获取数据库对象 db = client.database(db_name) # 删除表 try: db.drop_table(table_name) print(f"✓ 成功删除表: {table_name}") except Exception as e: print(f"⚠ 删除表失败: {str(e)}") # 删除数据库 try: client.drop_database(db_name) print(f"✓ 成功删除数据库: {db_name}") except Exception as e: print(f"⚠ 删除数据库失败: {str(e)}") except Exception as e: print(f"⚠ 清理过程出现错误: {str(e)}") def main(): """主测试函数""" print("百度VDB向量数据库可用性测试") print("测试配置:") print(f" - 用户名: {ACCOUNT}") print(f" - 服务器: {ENDPOINT}") print(f" - 测试时间: {time.strftime('%Y-%m-%d %H:%M:%S')}") client = None db = None db_name = None table = None table_name = None try: # 测试1: 连接 client = test_connection() if not client: print("\n❌ 数据库连接失败,无法继续测试") return # 测试2: 查询数据库列表 db_list = test_list_databases(client) # 测试3: 创建数据库 db, db_name = test_create_database(client) if not db: print("\n❌ 无法创建数据库,跳过后续测试") return # 测试4: 创建表 table, table_name = test_create_table(client, db, db_name) if not table: print("\n❌ 无法创建表,跳过后续测试") return # 测试5: 插入数据 if test_insert_data(table): # 测试6: 查询数据 test_query_data(table) except Exception as e: print(f"\n❌ 测试过程中发生未预期错误: {str(e)}") print(f"详细错误: {traceback.format_exc()}") finally: # 测试7: 清理 if client and db_name: test_cleanup(client, db_name, table_name) # 关闭连接 if client: try: client.close() print(f"\n✓ 已关闭数据库连接") except: pass print("\n" + "=" * 50) print("测试完成") print("=" * 50) if __name__ == "__main__": main()