import json
import logging
import time

import requests

logger = logging.getLogger(__name__)


class FeishuClient:
    """Small client for a handful of Feishu Open Platform REST endpoints.

    Every public method is best-effort: transport or decoding failures are
    logged and reported to the caller as ``None`` instead of being raised.
    Successful calls return the decoded JSON body unchanged, so callers are
    expected to inspect it themselves.
    """

    # Seconds shaved off the server-reported token lifetime so that a cached
    # token is renewed slightly before it would actually lapse.
    _TOKEN_REFRESH_MARGIN = 60

    def __init__(self, app_id, app_secret, timeout=10):
        """Store credentials and connection settings.

        Args:
            app_id: Application ID sent to the token endpoint.
            app_secret: Application secret sent to the token endpoint.
            timeout: Value forwarded as ``timeout=`` to every
                ``requests.post`` call (seconds, or a
                ``(connect, read)`` tuple). Without it a stalled
                connection would block the caller indefinitely.
        """
        self.app_id = app_id
        self.app_secret = app_secret
        self.base_url = "https://open.feishu.cn/open-apis"
        self.timeout = timeout
        self._app_access_token = None
        # time.monotonic() deadline after which the cached token is stale.
        self._token_expires_at = 0.0

    def get_token(self, force_refresh=False):
        """Return an app access token, reusing the cached one while valid.

        Args:
            force_refresh: When true, skip the cache and always request a
                new token.

        Returns:
            The token string, or ``None`` if the request failed or the
            response did not contain an ``app_access_token``.
        """
        if (
            not force_refresh
            and self._app_access_token
            and time.monotonic() < self._token_expires_at
        ):
            return self._app_access_token

        # Whatever was cached is stale (or being replaced) from here on.
        self._app_access_token = None
        self._token_expires_at = 0.0

        url = f"{self.base_url}/auth/v3/app_access_token/internal"
        payload = {"app_id": self.app_id, "app_secret": self.app_secret}
        try:
            resp = requests.post(url, json=payload, timeout=self.timeout)
            resp.raise_for_status()
            data = resp.json()
            token = data.get("app_access_token")
            lifetime = data.get("expire")
            code = data.get("code")
            msg = data.get("msg")
        except Exception as e:
            logger.error("Failed to get Feishu token: %s", e)
            return None

        if not token:
            # The HTTP call succeeded but the body carried no token; surface
            # the service's own error fields instead of failing silently.
            logger.error(
                "Failed to get Feishu token: no app_access_token in "
                "response (code=%s, msg=%s)",
                code,
                msg,
            )
            return None

        try:
            ttl = float(lifetime)
        except (TypeError, ValueError):
            # No usable lifetime reported: do not cache, fetch again next time.
            ttl = 0.0

        self._app_access_token = token
        self._token_expires_at = time.monotonic() + max(
            ttl - self._TOKEN_REFRESH_MARGIN, 0.0
        )
        return token

    def _post_with_token(self, path, payload, failure_message, params=None):
        """POST ``payload`` as JSON to ``base_url + path`` with a bearer token.

        Returns the decoded JSON body, or ``None`` when no token could be
        obtained or the request/decoding raised (logged with
        ``failure_message`` as prefix). The HTTP status is intentionally not
        checked so that JSON error bodies still reach the caller.
        """
        token = self.get_token()
        if not token:
            return None

        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        try:
            resp = requests.post(
                f"{self.base_url}{path}",
                params=params,
                json=payload,
                headers=headers,
                timeout=self.timeout,
            )
            return resp.json()
        except Exception as e:
            logger.error("%s: %s", failure_message, e)
            return None

    def send_message(self, receive_id_type, receive_id, msg_type, content):
        """Send a message through the ``/im/v1/messages`` endpoint.

        Args:
            receive_id_type: Sent as the ``receive_id_type`` query parameter.
            receive_id: Recipient identifier placed in the request body.
            msg_type: Message type placed in the request body.
            content: Message content. A ``dict`` is serialised with
                ``json.dumps``; any other value is sent as given.

        Returns:
            The decoded JSON response, or ``None`` on failure.
        """
        payload = {
            "receive_id": receive_id,
            "msg_type": msg_type,
            "content": json.dumps(content) if isinstance(content, dict) else content,
        }
        return self._post_with_token(
            "/im/v1/messages",
            payload,
            "Failed to send Feishu message",
            # Passed via params so the value is URL-encoded by requests.
            params={"receive_id_type": receive_id_type},
        )

    def create_group(self, name, description, user_ids):
        """Create a group (chat) and add users in one call.

        Args:
            name: Group name.
            description: Group description.
            user_ids: List of open_ids, sent as ``user_id_list``.

        Returns:
            The decoded JSON response, or ``None`` on failure.
        """
        payload = {
            "name": name,
            "description": description,
            "user_id_list": user_ids,
        }
        result = self._post_with_token(
            "/im/v1/chats", payload, "Failed to create Feishu group"
        )
        if result is not None:
            logger.debug("create_group response: %s", result)
        return result

    def get_user_ids_by_emails(self, emails):
        """Look up user IDs for a list of email addresses.

        Posts ``{"emails": emails}`` to ``/contact/v3/users/batch_get_id``.

        Returns:
            The decoded JSON response (not just the IDs), or ``None`` on
            failure.
        """
        return self._post_with_token(
            "/contact/v3/users/batch_get_id",
            {"emails": emails},
            "Failed to fetch user IDs",
        )

    def send_card(self, receive_id, card_data):
        """Send an interactive card message.

        A ``receive_id`` starting with ``"oc_"`` is addressed as a
        ``chat_id``; anything else is addressed as an ``open_id``.

        Returns:
            Whatever ``send_message`` returns.
        """
        receive_id_type = "chat_id" if receive_id.startswith("oc_") else "open_id"
        return self.send_message(
            receive_id_type=receive_id_type,
            receive_id=receive_id,
            msg_type="interactive",
            content=card_data,
        )