# a2a_server.py — 간단한 A2A 서버 예시 (FastAPI 기반)
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
# Agent Card 제공
@app.get("/.well-known/agent.json")
async def agent_card():
return {
"name": "데이터 분석 Agent",
"description": "SQL 쿼리 기반 데이터 분석을 수행합니다",
"url": "https://data-agent.example.com",
"version": "1.0.0",
"capabilities": {"streaming": False},
"skills": [{
"id": "analyze_data",
"name": "데이터 분석",
"description": "자연어 질문을 SQL로 변환하여 데이터를 분석합니다",
"inputModes": ["text/plain"],
"outputModes": ["application/json"]
}]
}
class TaskRequest(BaseModel):
jsonrpc: str = "2.0"
method: str
params: dict
id: str
# Task 수신 및 처리
@app.post("/")
async def handle_task(request: TaskRequest):
if request.method == "tasks/send":
# Task 처리 로직
user_message = request.params["message"]["parts"][0]["text"]
# LLM + SQL 실행으로 분석 수행
result = await analyze_with_llm(user_message)
return {
"jsonrpc": "2.0",
"id": request.id,
"result": {
"id": request.params.get("id", "task-001"),
"status": {"state": "completed"},
"artifacts": [{
"parts": [{"type": "text", "text": result}]
}]
}
}