FE/backend/feishu_client.py

113 lines
3.5 KiB
Python

import requests
import json
import logging
class FeishuClient:
def __init__(self, app_id, app_secret):
self.app_id = app_id
self.app_secret = app_secret
self.base_url = "https://open.feishu.cn/open-apis"
self._app_access_token = None
def get_token(self):
url = f"{self.base_url}/auth/v3/app_access_token/internal"
payload = {
"app_id": self.app_id,
"app_secret": self.app_secret
}
try:
resp = requests.post(url, json=payload)
resp.raise_for_status()
data = resp.json()
self._app_access_token = data.get("app_access_token")
return self._app_access_token
except Exception as e:
logging.error(f"Failed to get Feishu token: {e}")
return None
def send_message(self, receive_id_type, receive_id, msg_type, content):
token = self.get_token()
if not token:
return None
url = f"{self.base_url}/im/v1/messages?receive_id_type={receive_id_type}"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"receive_id": receive_id,
"msg_type": msg_type,
"content": json.dumps(content) if isinstance(content, dict) else content
}
try:
resp = requests.post(url, json=payload, headers=headers)
return resp.json()
except Exception as e:
logging.error(f"Failed to send Feishu message: {e}")
return None
def create_group(self, name, description, user_ids):
"""
Creates a group (chat) and adds users in one call.
user_ids should be a list of open_ids.
"""
token = self.get_token()
if not token:
return None
url = f"{self.base_url}/im/v1/chats"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"name": name,
"description": description,
"user_id_list": user_ids
}
try:
resp = requests.post(url, json=payload, headers=headers)
result = resp.json()
print(f"DEBUG create_group response: {result}")
return result
except Exception as e:
logging.error(f"Failed to create Feishu group: {e}")
return None
def get_user_ids_by_emails(self, emails):
"""
Retrieves open_ids for a list of emails.
"""
token = self.get_token()
if not token:
return None
url = f"{self.base_url}/contact/v3/users/batch_get_id"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"emails": emails
}
try:
resp = requests.post(url, json=payload, headers=headers)
return resp.json()
except Exception as e:
logging.error(f"Failed to fetch user IDs: {e}")
return None
def send_card(self, receive_id, card_data):
"""
Sends an interactive card message.
"""
return self.send_message(
receive_id_type="chat_id" if receive_id.startswith("oc_") else "open_id",
receive_id=receive_id,
msg_type="interactive",
content=card_data
)