基于Agent开发跨平台任务调度系统的实践

云信安装大师
90
AI 质量分
27 4 月, 2025
5 分钟阅读
0 阅读

基于Agent开发跨平台任务调度系统的实践

引言

在现代分布式系统中,任务调度是一个核心需求。本文将介绍如何使用Agent模式开发一个跨平台的任务调度系统,包括Agent端和服务端(MCP Server)的实现。这种架构特别适合需要在不同操作系统上执行任务的场景。

准备工作

环境要求

  • Python 3.7+
  • pip包管理工具
  • 基本的网络环境(Agent和Server能互相通信)
  • 了解基本的REST API概念

需要安装的包

代码片段
# Agent端
pip install psutil requests schedule

# Server端
pip install flask flask-restful psutil

系统架构设计

我们的系统由两部分组成:
1. Agent:运行在目标机器上,负责接收和执行任务
2. MCP Server:任务控制中心,负责任务的调度和状态跟踪

代码片段
[流程图]
MCP Server <---> Agent1 (Windows)
           <---> Agent2 (Linux)
           <---> Agent3 (MacOS)

Agent实现

Agent核心代码

代码片段
import os
import sys
import time
import json
import psutil
import requests
import schedule
from threading import Thread

class TaskAgent:
    def __init__(self, server_url, agent_id):
        self.server_url = server_url
        self.agent_id = agent_id
        self.running_tasks = {}

    def register(self):
        """向服务器注册当前Agent"""
        data = {
            'agent_id': self.agent_id,
            'os': os.name,
            'hostname': os.uname().nodename if hasattr(os, 'uname') else os.environ.get('COMPUTERNAME', 'unknown'),
            'cpu_count': psutil.cpu_count(),
            'memory': round(psutil.virtual_memory().total / (1024**3), 2)  # GB为单位
        }

        try:
            resp = requests.post(
                f"{self.server_url}/register",
                json=data,
                timeout=5
            )
            return resp.status_code == 200
        except Exception as e:
            print(f"注册失败: {str(e)}")
            return False

    def fetch_tasks(self):
        """从服务器获取待执行任务"""
        try:
            resp = requests.get(
                f"{self.server_url}/tasks/{self.agent_id}",
                timeout=5
            )
            if resp.status_code == 200:
                return resp.json()
            return []
        except Exception as e:
            print(f"获取任务失败: {str(e)}")
            return []

    def execute_task(self, task):
        """执行单个任务"""
        task_id = task['task_id']
        command = task['command']

        print(f"开始执行任务 {task_id}: {command}")

        try:
            # 记录任务开始时间
            start_time = time.time()

            # 执行命令并捕获输出(兼容跨平台)
            if os.name == 'nt':  # Windows
                import subprocess
                result = subprocess.run(command, shell=True, 
                                      capture_output=True, text=True)
                output = result.stdout + result.stderr
                exit_code = result.returncode

            else:  # Unix-like系统
                import subprocess
                result = subprocess.run(command, shell=True, 
                                      executable='/bin/bash',
                                      capture_output=True, text=True)
                output = result.stdout + result.stderr
                exit_code = result.returncode

            # 计算执行时间(秒)
            duration = round(time.time() - start_time, 2)

            return {
                'task_id': task_id,
                'status': 'completed',
                'exit_code': exit_code,
                'output': output,
                'duration': duration,
                'agent_id': self.agent_id,
                'timestamp': int(time.time())
            }

        except Exception as e:
            return {
                'task_id': task_id,
                'status': 'failed',
                'error': str(e),
                'agent_id': self.agent_id,
                'timestamp': int(time.time())
            }

    def report_result(self, result):
        """向服务器报告任务结果"""
        try:
            resp = requests.post(
                f"{self.server_url}/report",
                json=result,
                timeout=5
            )
            return resp.status_code == 200

        except Exception as e:
            print(f"报告结果失败: {str(e)}")
            return False

    def run_pending_tasks(self):
        """执行待处理任务并报告结果"""
        tasks = self.fetch_tasks()

        for task in tasks:
            if task['task_id'] not in self.running_tasks:

                # Windows和Linux/Mac的命令可能需要不同处理方式示例:
                if os.name == 'nt' and not task.get('windows_command'):
                    print(f"警告: Windows系统但未提供windows_command")
                    continue

                    # Linux/Mac下检查命令是否存在?

                    # TODO: 可以添加更多预处理检查...

                    result = self.execute_task(task)
                    self.report_result(result)

                    if result['status'] == 'completed':
                        self.running_tasks[task['task_id']] = True

    def heartbeat(self):
        """定期发送心跳包"""
        while True:
            try:
               requests.post(
                   f"{self.server_url}/heartbeat",
                   json={'agent_id': self.agent_id},
                   timeout=3  
               )
               time.sleep(30)   # 每30秒发送一次心跳

           except Exception as e:
               print(f"心跳发送失败: {str(e)}")
               time.sleep(5)   # 失败后等待更短时间重试

    def start(self):
       """启动Agent服务"""
       if not self.register():
           print("初始注册失败,退出...")
           sys.exit(1)

       # 启动心跳线程  
       Thread(target=self.heartbeat, daemon=True).start()

       # 设置定时任务检查(每分钟检查一次新任务)
       schedule.every(1).minutes.do(self.run_pending_tasks)

       print(f"Agent {self.agent_id}已启动,等待任务...")

       while True:
           schedule.run_pending()
           time.sleep(1)

if __name__ == '__main__':
    SERVER_URL = "http://your-server-address:5000"
    AGENT_ID = "agent-" + str(os.getpid())   # 简单起见用进程ID作为代理ID

    agent = TaskAgent(SERVER_URL, AGENT_ID)
    agent.start()

Agent代码说明

  1. 注册机制:Agent启动时向MCP Server注册自身信息,包括硬件配置等。
  2. 心跳检测:单独的线程定期发送心跳包,让Server知道Agent在线。
  3. 命令执行:使用Python的subprocess模块跨平台执行命令。
  4. 结果上报:将命令执行结果回传给Server。

Agent部署注意事项

  1. 安全性考虑

    • Agent应该运行在受限用户权限下。
    • Server和Agent之间应该使用HTTPS通信。
    • Command参数应该进行适当的清理和验证。
  2. 资源限制

    代码片段
    # Windows下设置子进程超时示例:
    try:
        result = subprocess.run(command, timeout=60, ...) 
    except subprocess.TimeoutExpired:
        # 处理超时逻辑...
    
    # Unix下可以使用resource模块限制资源使用量  
    
  3. 日志记录

    • Agent应该记录所有执行的任务和结果。
    • 可以添加日志轮转功能防止日志文件过大。

MCP Server实现

Server核心代码

代码片段
from flask import Flask, request, jsonify 
from flask_restful import Api, Resource 
import time 
import threading 

app = Flask(__name__) 
api = Api(app) 

class MCPServer: 
    def __init__(self): 
        self.agents = {}          # {'agent-id': {'last_heartbeat': timestamp}}

class TaskManager: 
    def __init__(self): 
        self.tasks_queue = {}     # {'agent-id': [task1, task2]}

# API资源定义 

class RegisterResource(Resource): 
    def post(self): 
         data = request.get_json() 
         agent_id = data['agent_id'] 

         server.mcp_server_instance.register_agent(
             agent_id,
             data['os'],
             data['hostname'],
             data['cpu_count'],
             data['memory']
         ) 

         return {'status':'success'},200 

class TasksResource(Resource): 
     def get(self): pass  

     def post(self): pass  

api.add_resource(TasksResource,'/tasks')  

if __name__=='__main__':

     app.run(host='0.0.0.',port=5000)

MCP Server关键功能实现细节

代码片段

class MCPServer:

     ...

     def register_agent(self):

          ...

          if agent_data["os"] not in ["posix","nt"]:

               raise ValueError("Unsupported OS")

          ...

class TaskManager:

     ...

     def assign_task_to_agents_based_on_load_balancing():

          ...

          for agent in available_agents_sorted_by_cpu_load:

              if current_load_percentage < MAX_THRESHOLD_PERCENTAGE:

                  assign_task_to_this_agent()

                  break  

          else:

              raise RuntimeError("No suitable agent available")

MCP Server部署建议

1.数据库集成

建议将MongoDB或PostgreSQL用于存储:

代码片段

from pymongo import MongoClient  

client=MongoClient('mongodb://localhost:27017/')

db=client['task_scheduler']

tasks_collection=db.tasks  

agents_collection=db.active_agents  

2.认证与授权

使用JWT进行API保护:

代码片段

@app.before_request  

def check_auth_header():

     auth_header=request.headers.get('Authorization')

     if not auth_header or not validate_jwt(auth_header):

          abort(401)

3.高可用性考虑

可以使用Gunicorn+Gevent部署:

代码片段

gunicorn -w4-kgevent app:app-b0.:5000  

4.监控集成

集成Prometheus监控指标:

代码片段

from prometheus_flask_exporter import PrometheusMetrics  

metrics=PrometheusMetrics(app)

metrics.info('app_info','Application info',version='1..')

完整系统测试流程

1.启动MCP Server

代码片段

python mcp_server.py  

2.启动多个测试Agent

在不同终端中运行:

代码片段

#Windows PowerShell示例:

$env:SERVER_URL="http://localhost:5000"

python .\task_agent.py  

#Linux/Mac终端示例:

export SERVER_URL=http://localhost:5000

python3 task_agent.py  

3.通过API提交测试任务

使用curl测试:

代码片段

curl-X POST \

-H "Content-Type: application/json"\

-d'{"command":"ls-la","target_os":"posix"}'\

http://localhost:5000/tasks  

4.验证任务状态

查询已完成的任务:

代码片段

curl http://localhost:5000/tasks/completed  

常见问题解决方案(Q&A)

Q:如何确保Windows和Linux命令兼容性?

A:最佳实践是:

在创建任务时指定特定于操作系统的命令

“`json {

“windows_command”:”dir”,

“unix_command”:”ls-la”

}

“`
Q:如何处理长时间运行的任务?

A:建议方案:

在Server端实现超时机制

在Agent端使用进程组确保子进程被正确终止

Q:如何扩展支持更多操作系统?

A:扩展方法:

修改Agent的register方法上报更详细的系统信息

在Server端维护一个能力矩阵匹配适合的Agent

总结与展望本文实现的跨平台调度系统具有以下特点:

✅支持主流操作系统(Win/Linux/Mac)

✅自动负载均衡分配任务

✅完善的错误处理和监控机制未来可能的改进方向包括:

🔹容器化部署(Docker集成)

🔹更细粒度的权限控制系统(RBAC)

🔹与CI/CD管道集成希望这篇实践指南能帮助你构建自己的分布式任务调度系统!

原创 高质量