Deploying a MySQL 8.0 High-Availability Cluster on CentOS 8 and Integrating It with Ollama Model Services


Introduction

In modern AI application development, database high availability and seamless integration with AI model services are both essential. This article walks you through deploying a MySQL 8.0 high-availability cluster on CentOS 8 and integrating it with the Ollama open-source large language model service. This architecture is particularly well suited to applications that need to handle large volumes of structured data while also running AI inference.

Prerequisites

Environment Requirements

  • CentOS 8 servers (at least three for the MySQL cluster)
  • root or sudo privileges
  • At least 4 GB of RAM per server
  • A stable network connection (internal bandwidth of ≥1 Gbps recommended)

Software Requirements

  • MySQL 8.0
  • MySQL Shell
  • MySQL Router
  • Ollama (latest version)
  • Docker (optional, for containerized deployment)

Part 1: MySQL InnoDB Cluster Deployment

1.1 Basic System Configuration

Run the following on all nodes. The cluster steps later in this guide address the servers as node1, node2, and node3, so make sure those hostnames resolve on every machine (for example via /etc/hosts):

```bash
# Stop the firewall, or configure explicit rules instead (be careful in production)
sudo systemctl stop firewalld
sudo systemctl disable firewalld

# Put SELinux into permissive mode
sudo setenforce 0
sudo sed -i 's/^SELINUX=enforcing$/SELINUX=permissive/' /etc/selinux/config

# Install basic tools
sudo dnf install -y wget curl vim net-tools epel-release
```

1.2 Installing MySQL 8.0

Run on all nodes:

```bash
# Add the official MySQL repository
sudo rpm -Uvh https://dev.mysql.com/get/mysql80-community-release-el8-4.noarch.rpm

# Disable the default MySQL module from AppStream so the community repo packages are used
sudo dnf module disable -y mysql

# Install MySQL Server, MySQL Shell, and MySQL Router
sudo dnf install -y mysql-community-server mysql-shell mysql-router

# Start the MySQL service and enable it at boot
sudo systemctl enable --now mysqld

# Retrieve the temporary root password
sudo grep 'temporary password' /var/log/mysqld.log
```

1.3 MySQL Security Configuration

Run on all nodes:

```bash
# Run the security wizard to set the root password and remove insecure defaults
sudo mysql_secure_installation

# Test the connection with MySQL Shell (use the password you just set)
mysqlsh root@localhost --sql
```

1.4 Configuring the InnoDB Cluster

On the first node (which will become the primary), run:

```js
// Launch MySQL Shell from the command line with `mysqlsh`, then run the following:

\connect root@node1
// (you will be prompted for the password)

// Prepare every instance for InnoDB Cluster usage
dba.configureInstance('root@node1:3306', {password: 'YourPassword'})
dba.configureInstance('root@node2:3306', {password: 'YourPassword'})
dba.configureInstance('root@node3:3306', {password: 'YourPassword'})

// Create the cluster object and add the remaining instances
var cluster = dba.createCluster('ai_cluster')

cluster.addInstance('root@node2:3306', {password: 'YourPassword'})
cluster.addInstance('root@node3:3306', {password: 'YourPassword'})

// Check the cluster status
cluster.status()
```

Sample output:

```json
{
    "clusterName": "ai_cluster", 
    "defaultReplicaSet": {
        "name": "default", 
        "primary": "node1:3306", 
        "status": "OK", 
        "statusText": "Cluster is ONLINE and can tolerate up to ONE failure.", 
        "topology": {
            "node1:3306": {
                "address": "node1:3306", 
                "memberRole": "PRIMARY", 
                "mode": "R/W", 
                "status": "ONLINE"
            },
            // ...other node entries...
        }
    }
}
```
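
Besides cluster.status() in MySQL Shell, member health can also be checked programmatically. The sketch below is a minimal example that queries performance_schema.replication_group_members directly on node1; it uses the mysql-connector-python package installed later in section 2.2, and the credentials are placeholders you should replace.

```python
# cluster_health.py -- a minimal sketch, assuming node1 is reachable and the
# root password below is replaced with your own.
import mysql.connector

conn = mysql.connector.connect(host="node1", port=3306, user="root", password="YourPassword")
cur = conn.cursor()

# Group Replication exposes member state through performance_schema
cur.execute(
    "SELECT MEMBER_HOST, MEMBER_PORT, MEMBER_STATE, MEMBER_ROLE "
    "FROM performance_schema.replication_group_members"
)
for host, port, state, role in cur.fetchall():
    print(f"{host}:{port} {role} {state}")

cur.close()
conn.close()
```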

1.5 Configuring MySQL Router

Run on one of the nodes (ideally the same machine that hosts the application server):

```bash
# Bootstrap the router against the cluster (it auto-discovers the topology)
sudo mysqlrouter --bootstrap root@node1:3306 --directory /opt/mysqlrouter --user=mysqlrouter --force

# Bootstrapping generates a start script, typically at /opt/mysqlrouter/start.sh

# Start the router using the generated script
sudo /opt/mysqlrouter/start.sh &
```
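
Once the router is running, applications connect to it instead of to any individual MySQL node; by default, bootstrapping exposes port 6446 for read-write traffic and 6447 for read-only traffic. The following is a minimal sketch for confirming that connections really flow through the router to the cluster; the host, port, and credentials are placeholders, and it uses the mysql-connector-python package installed in section 2.2.

```python
# quick_router_check.py -- a minimal sketch to verify connectivity through MySQL Router.
import mysql.connector

conn = mysql.connector.connect(
    host="127.0.0.1",      # address of the MySQL Router instance
    port=6446,             # default read-write port created by --bootstrap
    user="root",
    password="YourPassword",
)

cur = conn.cursor()
# @@hostname shows which cluster member actually served the connection
cur.execute("SELECT @@hostname, @@port")
print(cur.fetchone())

cur.close()
conn.close()
```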

Part 2: Ollama Model Service Deployment and Integration

2.1 Installing and Configuring Ollama

Run on the application server:

```bash
# Install on Linux (downloads the latest version)
curl -fsSL https://ollama.com/install.sh | sh

# Manage the Ollama service with systemd
sudo systemctl enable ollama && sudo systemctl start ollama

# Pull the model you need (llama2 as an example)
ollama pull llama2

# List installed models and run a quick test to verify the installation
ollama list
ollama run llama2 "Say hello in one sentence."
```
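
Before wiring Ollama into the database layer, it helps to confirm that its HTTP API answers locally. The minimal sketch below posts a prompt to the /api/generate endpoint on Ollama's default port 11434; it assumes the llama2 pull above succeeded and that the httpx package from section 2.2 is installed.

```python
# api_smoke_test.py -- a minimal sketch that calls Ollama's local REST API directly.
import httpx

resp = httpx.post(
    "http://localhost:11434/api/generate",
    json={
        "model": "llama2",
        "prompt": "Reply with the single word: ready",
        "stream": False,          # ask for one JSON object instead of a token stream
    },
    timeout=60.0,
)
resp.raise_for_status()
print(resp.json()["response"])    # the generated text
```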

2.2 Preparing the Python Environment

```bash
# Install Python 3.9 and pip (plus a compiler toolchain for packages with C extensions)
sudo dnf install -y python39 python39-pip python39-devel gcc

# Upgrade pip and install the packages used by the integration code below
pip3.9 install --upgrade pip
pip3.9 install mysql-connector-python httpx tenacity
```

2.3 MySQL-Ollama Integration Sample Code

Database connection class (db_manager.py)

```python
import logging
import json
from typing import Any, Dict, List

from mysql.connector import pooling


class MySQLManager:
    """
    MySQL connection manager with pooling, for use against a highly
    available cluster accessed through MySQL Router.
    """

    _connection_pool = None

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger(__name__)

        if not MySQLManager._connection_pool:
            self._initialize_pool()

    def _initialize_pool(self):
        """Initialize the MySQL connection pool."""
        try:
            MySQLManager._connection_pool = pooling.MySQLConnectionPool(
                pool_name="ai_cluster_pool",
                pool_size=5,
                pool_reset_session=True,
                host=self.config['host'],      # MySQL Router address
                port=self.config['port'],      # Router port (usually 6446)
                user=self.config['user'],
                password=self.config['password'],
                database=self.config['database'],
                charset='utf8mb4',
                collation='utf8mb4_general_ci',
                autocommit=True,               # AI workloads usually want autocommit
            )
            self.logger.info("MySQL connection pool initialized")

        except Exception as e:
            self.logger.error(f"Failed to initialize MySQL connection pool: {e}")
            raise

    def get_query_results(self, query: str, params=None) -> List[Dict[str, Any]]:
        """Run a query and return the rows as a list of dictionaries."""
        connection = None
        cursor = None

        try:
            connection = MySQLManager._connection_pool.get_connection()
            cursor = connection.cursor(dictionary=True)

            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)

            results = cursor.fetchall()

            # Automatically decode JSON stored in string columns
            for row in results:
                for key, value in row.items():
                    if isinstance(value, str):
                        try:
                            row[key] = json.loads(value)
                        except (json.JSONDecodeError, TypeError):
                            pass

            return results

        except Exception as e:
            self.logger.error(f"Query execution failed: {e}")
            raise

        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()

    # ...additional database helper methods...
```
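
For reference, here is a minimal usage sketch of the class above. The connection details (a local router on port 6446, an appdb database, and an appuser account) are placeholder assumptions for illustration only.

```python
# Example usage of MySQLManager -- a minimal sketch, assuming the router runs
# locally on port 6446 and that the database "appdb" and user "appuser" exist.
from db_manager import MySQLManager

db = MySQLManager({
    "host": "127.0.0.1",
    "port": 6446,            # MySQL Router read-write port
    "user": "appuser",
    "password": "AppPassword",
    "database": "appdb",
})

rows = db.get_query_results(
    "SELECT table_name FROM information_schema.tables WHERE table_schema = %s",
    ("appdb",),
)
print(rows)
```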

Ollama service integration class (ollama_integration.py)

```python
import hashlib
import logging
import time
from typing import Any, Dict, Optional

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

from db_manager import MySQLManager   # used by AIModelOrchestrator below


class OllamaService:
    """
    Wrapper class for interacting with the Ollama model service.

    Capabilities include:
    - SQL generation: produce SQL query suggestions from natural language
    - SQL optimization: analyze existing SQL and suggest improvements
    - NLQ to SQL: convert natural-language questions into SQL queries
    - SQL explanation: explain what a SQL statement does in plain language
    - Schema analysis: provide insights about the database structure
    """

    def __init__(self, base_url: str = "http://localhost:11434"):
        self.base_url = base_url.rstrip('/')

        # HTTP client (async, since the methods below are coroutines)
        self.client = httpx.AsyncClient(timeout=60.0)

        # LLM prompt templates (adjust to your needs)
        self.prompts = {
            'sql_generation': (
                """You are an expert SQL engineer. Using the database schema and the
                user's request below, generate an efficient and safe MySQL query.

                Database schema:
                {schema_info}

                User request:
                {user_input}

                Return only the SQL statement, with no explanation."""
            ),

            'sql_explanation': (
                """Explain, in clear and simple language, what the following SQL query does:

                Query:
                {sql_query}"""
            ),

            'nlq_to_sql': (
                """Convert the following natural-language question into a MySQL query.

                Table structure:
                Tables: {tables}
                Columns:
                {columns_info}

                Question:
                "{question}"

                Return only the SQL statement."""
            ),
        }

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1))
    async def generate_response(self, model_name: str = "llama2", prompt: str = "",
                                context: Optional[list] = None) -> str:
        """
        Call the Ollama API and return the LLM response.

        Parameters:
        model_name - name of the model in Ollama (e.g. 'llama2', 'mistral')
        prompt     - the filled-in prompt text
        context    - optional context tokens returned by a previous call

        Returns:
        the generated text (str)
        """
        request_data = {
            "model": model_name,
            "prompt": prompt,
            "stream": False,
            # additional Ollama-specific parameters can go here...
        }
        if context:
            request_data["context"] = context

        try:
            response = await self.client.post(
                f"{self.base_url}/api/generate",
                json=request_data,
                timeout=60.0,
            )
            response_data = response.json()

            if response.status_code != 200 or not response_data.get("response"):
                error_msg = f"Ollama API request failed: {response.text}"
                logging.error(error_msg)
                raise ValueError(error_msg)

            return response_data["response"]

        except Exception as e:
            logging.error(f"Error calling the Ollama API: {e}")
            raise

    async def generate_sql(self, schema_info: str, natural_language_query: str) -> str:
        """
        Generate a SQL query from a natural-language description.

        Parameters:
        schema_info            - table structure information (JSON string: tables, columns, types)
        natural_language_query - the user's natural-language request

        Returns:
        the generated SQL statement (str)
        """
        prompt_template = self.prompts['sql_generation']

        filled_prompt = prompt_template.format(
            schema_info=schema_info,
            user_input=natural_language_query,
        )

        generated_text = await self.generate_response(prompt=filled_prompt)

        # Post-process to keep only the SQL itself (the LLM may append extra commentary)
        sql_lines = []
        for line in generated_text.split('\n'):
            line = line.strip()
            if line and not line.startswith('--') and not line.startswith('/*'):
                sql_lines.append(line)

        return '\n'.join(sql_lines).rstrip(';') + ';'

    async def explain_sql(self, sql_query: str) -> str:
        """
        Explain a SQL query in natural language.

        Parameters:
        sql_query - the SQL query string

        Returns:
        the explanation text (str)
        """
        prompt_template = self.prompts['sql_explanation']
        filled_prompt = prompt_template.format(sql_query=sql_query)
        return await self.generate_response(prompt=filled_prompt)

    async def optimize_query(self, sql_query: str, schema_info: str) -> dict:
        """
        Suggest optimizations for a SQL query.

        Parameters:
        sql_query   - the SQL query string
        schema_info - schema information string

        Example return format:
        {"original": "SELECT * FROM users WHERE id IN (SELECT user_id FROM orders)",
         "optimized": "SELECT u.* FROM users u JOIN orders o ON u.id = o.user_id",
         "explanation": "Rewriting as a JOIN is more efficient...",
         "performance_gain": "estimated 30% improvement"}

        This method is only a skeleton; a real implementation should be adapted to
        your business logic and the capabilities of the LLM. Ideas worth considering:
        - feed EXPLAIN output to the LLM as additional input
        - integrate dedicated tooling such as pt-index-usage
        - use multiple LLM round-trips with intermediate validation for complex cases
        - cache results to avoid recomputing identical queries
        - add cost controls so complex optimizations do not consume too many resources
        - track optimization history over time and A/B test competing rewrites
        """
        pass

    # The methods below are placeholders for further business-specific features.
    async def natural_to_SQL(self, tables_info, natural_language) -> str:
        pass

    async def validate_SQL(self, sql_statement) -> dict:
        pass

    async def get_table_recommendations(self, business_description) -> list:
        pass

    async def get_data_model_design(self, business_needs) -> dict:
        pass

    async def analyze_performance(self, sql_statement) -> dict:
        pass

    async def generate_test_data(self, schema_description, num_records) -> dict:
        pass

    async def generate_documentation(self, database_structure) -> dict:
        pass

    async def compare_datasets(self, sql_statement_a, sql_statement_b) -> dict:
        pass

    async def detect_data_patterns(self, sql_statement) -> dict:
        pass

    async def predict_trends(self, temporal_data_query) -> dict:
        pass

    async def generate_dashboard_recommendations(self, data_profile) -> dict:
        pass

    # ...additional business-specific methods can be added as needed...


class AsyncOllamaService(OllamaService):
    """Asynchronous variant of the service, built through an async factory."""

    async_client = None

    @classmethod
    async def create(cls) -> "AsyncOllamaService":
        # Async factory: create the instance and attach a dedicated async HTTP client
        instance = cls()
        instance.async_client = httpx.AsyncClient(timeout=None)
        return instance

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1))
    async def generate_response_async(self, model_name: str, prompt: str,
                                      context: Optional[list] = None) -> str:
        request_data = {"model": model_name, "prompt": prompt, "stream": False}
        if context:
            request_data["context"] = context

        try:
            response = await self.async_client.post(
                f"{self.base_url}/api/generate", json=request_data
            )
            return response.json().get("response")
        except Exception as e:
            logging.error(f"Error calling the Ollama API: {e}")
            raise

    # ...other async method implementations...


class AIModelCacheManager:
    """Cache manager for AI model responses."""

    def __init__(self):
        self.cache = {}
        self.hits = 0
        self.misses = 0

    def get_cache_key(self, model_name, prompt, context):
        # Derive a deterministic key from the request parameters
        return hashlib.md5(f"{model_name}{prompt}{context}".encode()).hexdigest()

    def get_cached_response(self, key):
        if key in self.cache:
            self.hits += 1
            value = self.cache[key]
            # TTL entries are stored as (response, expiry_timestamp)
            if isinstance(value, tuple):
                return value[0]
            return value
        else:
            self.misses += 1
            return None

    def add_to_cache(self, key, response, ttl=None):
        if ttl is None:
            self.cache[key] = response
        else:
            # Store the expiry timestamp alongside the response
            self.cache[key] = (response, time.time() + ttl)

    def clean_expired_entries(self):
        now = time.time()
        for key, value in list(self.cache.items()):
            if isinstance(value, tuple) and len(value) == 2 and value[1] < now:
                del self.cache[key]

    @property
    def hit_rate(self):
        total_requests = self.hits + max(1, self.misses)
        return round((self.hits / total_requests) * 100, 2)


class AIModelOrchestrator(OllamaService, MySQLManager):
    """High-level orchestrator that combines database access with AI capabilities."""

    def __init__(self, db_config: Dict[str, Any],
                 model_config: Optional[Dict[str, Any]] = None):
        # Initialize both parents explicitly: the Ollama HTTP client and the MySQL pool
        OllamaService.__init__(self, **(model_config or {}))
        MySQLManager.__init__(self, db_config)
```
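
To tie the pieces together, the sketch below shows one possible end-to-end flow: read the schema through the cluster, ask the model for a query, and print the result for review. It assumes the modules above are saved as db_manager.py and ollama_integration.py; the database name, account, and router address are illustrative placeholders, and any generated SQL should be reviewed before being executed against the cluster.

```python
# end_to_end_demo.py -- a minimal sketch of the full flow under the assumptions above.
import asyncio
import json

from db_manager import MySQLManager
from ollama_integration import OllamaService


async def main():
    db = MySQLManager({
        "host": "127.0.0.1", "port": 6446,
        "user": "appuser", "password": "AppPassword", "database": "appdb",
    })
    llm = OllamaService()

    # 1. Describe the schema by querying information_schema through the cluster
    schema_rows = db.get_query_results(
        "SELECT table_name, column_name, data_type "
        "FROM information_schema.columns WHERE table_schema = %s",
        ("appdb",),
    )
    schema_info = json.dumps(schema_rows, default=str)

    # 2. Ask the model to turn a natural-language request into SQL
    sql = await llm.generate_sql(schema_info, "Count how many rows each table has")
    print("Generated SQL:\n", sql)

    # 3. Review the SQL before executing it against the cluster, e.g.:
    # rows = db.get_query_results(sql)

    await llm.client.aclose()


asyncio.run(main())
```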
