from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import (
ChatAgentMessage,
ChatAgentResponse,
ChatAgentChunk,
ChatContext
)
class CustomerSupportAgent(ChatAgent):
def __init__(self):
"""에이전트 초기화: 모델, 검색 인덱스, 도구 설정"""
import mlflow.deployments
self.llm_client = mlflow.deployments.get_deploy_client("databricks")
self.vector_index = self._init_vector_search()
self.tools = self._init_tools()
def predict(
self,
messages: list[ChatAgentMessage],
context: ChatContext = None
) -> ChatAgentResponse:
"""동기 응답: 전체 답변을 한 번에 반환합니다"""
# 1. 사용자의 최신 메시지 추출
user_message = messages[-1].content
# 2. 관련 문서 검색 (RAG)
docs = self._search_documents(user_message)
# 3. 프롬프트 구성
system_prompt = self._build_system_prompt(docs)
# 4. LLM 호출
response = self.llm_client.predict(
endpoint="databricks-meta-llama-3-3-70b-instruct",
inputs={
"messages": [
{"role": "system", "content": system_prompt},
*[{"role": m.role, "content": m.content} for m in messages]
],
"temperature": 0.1,
"max_tokens": 1000
}
)
answer = response["choices"][0]["message"]["content"]
# 5. 응답 반환
return ChatAgentResponse(
messages=[ChatAgentMessage(role="assistant", content=answer)]
)
def predict_stream(
self,
messages: list[ChatAgentMessage],
context: ChatContext = None
):
"""스트리밍 응답: 토큰 단위로 점진적으로 반환합니다"""
# SSE(Server-Sent Events) 방식으로 토큰을 하나씩 전달
for chunk in self._stream_llm_response(messages):
yield ChatAgentChunk(
delta=ChatAgentMessage(role="assistant", content=chunk)
)
def _search_documents(self, query: str) -> list:
"""Vector Search에서 관련 문서 검색"""
results = self.vector_index.similarity_search(
query_text=query,
columns=["content", "title", "source"],
num_results=5
)
return results["result"]["data_array"]
def _build_system_prompt(self, docs: list) -> str:
"""검색된 문서를 기반으로 시스템 프롬프트 구성"""
context = "\n\n".join([f"[{d[1]}]\n{d[0]}" for d in docs])
return f"""당신은 고객 지원 전문가입니다. 다음 문서를 참고하여 정확하게 답변해 주세요.
문서에 없는 내용은 "확인 후 답변드리겠습니다"라고 안내해 주세요.
참고 문서:
{context}"""