Juejin · AI · Aug 15
Mirix Image Memory Management

The Mirix project is an Electron-based desktop application: supply a Gemini API Key and you can try its core feature, image memory. The app leverages the multimodal capability of an LLM (such as Gemini), in particular its image understanding, to parse screenshots of the screen or of specific application windows. Captured images are preprocessed and filtered by similarity comparison, then sent to the backend and uploaded to Google Cloud. The image information is combined with text messages and buffered through a temporary message-accumulation mechanism; once the accumulated content reaches a threshold, the memory-processing flow is triggered. Finally, the information is sent to the meta memory agent, which updates the system's memory stores according to the user's activity and the content, giving the system an effective memory and understanding of visual information.

✨ **Image capture and preprocessing**: The Mirix Electron desktop app captures screenshots of the screen or of a specified window and runs a similarity comparison on the frontend to filter out redundant frames. Valid screenshots are sent to the backend for further processing. This is the first step in ingesting visual information.

☁️ **Cloud upload and processing**: Captured image files are uploaded to Google Cloud, specifically when a Gemini model is used. The backend receives the frontend's message request containing image URIs and uses the `UploadManager` class to upload the files asynchronously, so the image data is stored safely and reliably.

⏳ **Temporary message accumulation and memory triggering**: The app buffers captured images and related text in a temporary message accumulator. When the number of accumulated messages reaches a preset threshold (`TEMPORARY_MESSAGE_LIMIT`, 20 by default) or under specific trigger conditions, the system calls the `absorb_content_into_memory` method to start the memory-processing flow.

🧠 **Meta memory agent and memory updates**: The accumulated content is assembled into a single message and sent to the "meta memory agent". This agent analyzes the content and the user's interactions and decides which memory types need updating (episodic memory, procedural memory, knowledge vault, and so on), enabling deep understanding and effective management of visual information.

🚀 **Applying the LLM's multimodal capability**: At Mirix's core is the multimodal capability of an LLM (such as Gemini), especially its image understanding. The LLM parses image content and turns it into information usable for semantic understanding and memory storage; this is the key technique behind the image-memory feature.

The mirix project ships an Electron desktop app; provide a Gemini API key and you can try it out. Here we focus on how mirix implements image memory. The heart of it is the LLM's multimodal capability (e.g. Gemini's image understanding): the LLM's interpretation of the image is then put through the usual text-memory processing such as semantic understanding.

Requesting LLM image understanding

1. Image capture and preprocessing

Images are first captured by the frontend's screen-monitoring feature. In ScreenshotMonitor.js:

- startMonitoring: captures screenshots of the screen or of a specific application window
- processScreenshot: compares each new screenshot against the previous one and filters out images that are too similar (a sketch of this filter follows the request examples below)
- sendScreenshotToBackend: sends valid image paths to the backend by calling the send_streaming_message endpoint

@ScreenshotMonitor.js

const requestData = {
  // message: message,
  image_uris: [screenshotFile.path],
  // voice_files: voiceFiles.length > 0 ? voiceFiles : null, // COMMENTED OUT
  memorizing: true, // This is the key difference from chat
  is_screen_monitoring: true // Indicate this request is from screen monitoring
};

For comparison, the chat flow builds a similar request in @chatWindow.js:

const requestData = {
  message: messageText || null,
  image_uris: imageUris,
  memorizing: false, // the key flag
  is_screen_monitoring: isScreenMonitoring
};
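
The similarity filter inside processScreenshot isn't shown in the excerpts above. A minimal sketch of the idea in Python (the real implementation lives in JavaScript in ScreenshotMonitor.js; the downscale size and threshold here are made-up values for illustration):

from PIL import Image

def is_similar(path_a: str, path_b: str, threshold: float = 0.99) -> bool:
    """Return True when two screenshots are nearly identical."""
    size = (64, 64)  # downscale so the comparison is cheap and noise-tolerant
    a = Image.open(path_a).convert("L").resize(size)
    b = Image.open(path_b).convert("L").resize(size)
    # Fraction of pixels whose grayscale values are close
    same = sum(abs(x - y) <= 8 for x, y in zip(a.getdata(), b.getdata()))
    return same / (size[0] * size[1]) >= threshold

# Only forward a screenshot to the backend when it differs enough:
# if not is_similar(previous_path, current_path): sendScreenshotToBackend(...)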

2. Image upload and processing

The frontend handles the screenshot preprocessing, then calls this endpoint to send images to the backend:

@app.post("/send_streaming_message")
async def send_streaming_message_endpoint(request: MessageRequest):
    # Build the prompt
    agent.update_chat_agent_system_prompt(request.is_screen_monitoring)
    message_queue = queue.Queue()
    loop = asyncio.get_event_loop()
    response = await loop.run_in_executor(
        None,  # Use default ThreadPoolExecutor
        lambda: agent.send_message(
            message=request.message,
            image_uris=request.image_uris,
            sources=request.sources,  # Pass sources to agent
            voice_files=request.voice_files,  # Pass raw voice files
            memorizing=request.memorizing,
            display_intermediate_message=display_intermediate_message
        )
    )

The backend receives the request:

def send_message(self,
                 message=None,
                 images=None,
                 image_uris=None,
                 sources=None,
                 voice_files=None,
                 memorizing=False,
                 delete_after_upload=False,
                 specific_timestamps=None,
                 display_intermediate_message=None,
                 force_absorb_content=False,
                 async_upload=True):
    # Add the message to the temporary message queue
    self.temp_message_accumulator.add_message(
        {
            'message': message,
            'image_uris': image_uris,
            'sources': sources,
            'voice_files': voice_files,
        },
        timestamp,
        delete_after_upload=delete_after_upload,
        async_upload=async_upload
    )
    # If the queue doesn't need to be processed yet, return right away.
    ready_messages = self.temp_message_accumulator.should_absorb_content()
    # Processing is covered in part 3
    if force_absorb_content or ready_messages:
        t1 = time.time()
        # Pass the ready messages to absorb_content_into_memory if available
        if ready_messages:
            self.temp_message_accumulator.absorb_content_into_memory(self.agent_states, ready_messages)
        else:
            # Force absorb with whatever is available
            self.temp_message_accumulator.absorb_content_into_memory(self.agent_states)
        t2 = time.time()
        self.clear_old_screenshots()

When a Gemini model is used, the images must be uploaded to Google Cloud (for non-Gemini models no upload is needed; the image is passed inline as an encoding). This is handled by the UploadManager class:

def add_message(self, full_message, timestamp, delete_after_upload=True, async_upload=True):
    # Handle image uploads
    if self.needs_upload and self.upload_manager is not None:
        if 'image_uris' in full_message and full_message['image_uris']:
            if async_upload:
                image_file_ref_placeholders = [self.upload_manager.upload_file_async(image_uri, timestamp)
                                               for image_uri in full_message['image_uris']]
            else:
                image_file_ref_placeholders = [self.upload_manager.upload_file(image_uri, timestamp)
                                               for image_uri in full_message['image_uris']]

Uploading the image to Google Cloud:

def upload_file(self, filename, timestamp):
    """Legacy synchronous upload method"""
    placeholder = self.upload_file_async(filename, timestamp)
    return self.wait_for_upload(placeholder, timeout=10)  # Reduced timeout since individual uploads timeout at 5s

def _upload_single_file(self, upload_uuid, filename, timestamp, compressed_file):
    # Upload a single file to Google Cloud
    file_ref = self.google_client.files.upload(file=upload_file)
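
The placeholder-based async upload can be pictured as a thin wrapper around a thread pool. A sketch under that assumption (a Future stands in for the placeholder object; the real UploadManager also compresses files and tracks timestamps):

from concurrent.futures import ThreadPoolExecutor, Future

class UploadManagerSketch:
    def __init__(self, google_client):
        self.google_client = google_client
        self.pool = ThreadPoolExecutor(max_workers=4)

    def upload_file_async(self, filename: str, timestamp) -> Future:
        # Kick off the upload on a worker thread and return immediately,
        # so add_message never blocks on network I/O.
        return self.pool.submit(self.google_client.files.upload, file=filename)

    def wait_for_upload(self, placeholder: Future, timeout: int = 10):
        # Block until the file reference is available (or time out).
        return placeholder.result(timeout=timeout)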

At this point the upload is finished; parsing of the images happens asynchronously, gated by temp_message_accumulator.should_absorb_content.

3. Batch-processing temporary memory

The system uses a temporary message-accumulation mechanism: once the accumulated content reaches a certain count (TEMPORARY_MESSAGE_LIMIT), the next incoming message triggers memory processing:

def send_message(self, ...):
    # Upload images ...
    # Ordering guarantee: messages are processed in chronological order; if any
    # message is still uploading, processing of later messages stops to
    # preserve that order.
    # Async support: for models that require uploads, a message is only
    # processed after its upload has completed.
    # Threshold control: the memory absorb (summarize) step only fires once the
    # number of ready messages reaches TEMPORARY_MESSAGE_LIMIT (default 20).
    ready_messages = self.temp_message_accumulator.should_absorb_content()
    if force_absorb_content or ready_messages:
        t1 = time.time()
        # Pass the ready messages to absorb_content_into_memory if available
        if ready_messages:
            self.temp_message_accumulator.absorb_content_into_memory(self.agent_states, ready_messages)
        else:
            # Force absorb with whatever is available
            self.temp_message_accumulator.absorb_content_into_memory(self.agent_states)
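
should_absorb_content itself isn't excerpted; a simplified sketch of the threshold logic, assuming a plain list-backed buffer (the real accumulator also holds back messages whose uploads haven't finished, to preserve chronological order):

TEMPORARY_MESSAGE_LIMIT = 20  # default batch size before a memory absorb

class TempMessageAccumulatorSketch:
    def __init__(self):
        self.pending = []  # (timestamp, message) pairs, oldest first

    def add_message(self, message, timestamp):
        self.pending.append((timestamp, message))

    def should_absorb_content(self):
        """Return the ready batch once the threshold is reached, else None."""
        if len(self.pending) >= TEMPORARY_MESSAGE_LIMIT:
            ready, self.pending = self.pending, []
            return ready
        return None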

Next, the accumulated messages are processed:

# Build a single message containing all the accumulated content (_build_memory_message)
def absorb_content_into_memory(self, agent_states, ready_messages=None):
    message = self._build_memory_message(ready_to_process, voice_content)
    message, user_message_added = self._add_user_conversation_to_message(message)
    # Add system instruction for meta memory manager
    if user_message_added:
        system_message = "[System Message] As the meta memory manager, analyze the provided content and the conversations between the user and the chat agent. Based on what the user is doing, determine which memory should be updated (episodic, procedural, knowledge vault, semantic, core, and resource)."
    else:
        system_message = "[System Message] As the meta memory manager, analyze the provided content. Based on the content, determine what memories need to be updated (episodic, procedural, knowledge vault, semantic, core, and resource)"
    response, agent_type = self._send_to_meta_memory_agent(message, set(list(self.uri_to_create_time.keys())), agent_states)

# Send the message to the meta memory agent (_send_to_meta_memory_agent)
def _send_to_meta_memory_agent(self, message, existing_file_uris, agent_states):
    """Send the processed content to the meta memory agent."""
    payloads = {
        'message': message,
        'existing_file_uris': existing_file_uris,
        'chaining': CHAINING_FOR_MEMORY_UPDATE,
        'message_queue': self.message_queue
    }
    response, agent_type = self.message_queue.send_message_in_queue(
        self.client, agent_states.meta_memory_agent_state.id, payloads, 'meta_memory'
    )
    return response, agent_type

Adding to the message queue:

def send_message_in_queue(self, client, agent_id, kwargs, agent_type='chat'):
    """
    Queue a message to be sent to a specific agent type.

    Args:
        client: The mirix client instance
        agent_id: The ID of the agent to send the message to
        kwargs: Arguments to pass to client.send_message
        agent_type: Type of agent to send message to

    Returns:
        Tuple of (response, agent_type)
    """
    message_uuid = uuid.uuid4()
    with self._message_queue_lock:
        self.message_queue[message_uuid] = {
            'kwargs': kwargs,
            'started': False,
            'finished': False,
            'type': agent_type,
        }
    response = client.send_message(
        agent_id=agent_id,
        role='user',
        **self.message_queue[message_uuid]['kwargs']
    )
    with self._message_queue_lock:
        self.message_queue[message_uuid]['finished'] = True
        del self.message_queue[message_uuid]
    return response, agent_type

Sending the message to the corresponding agent:

def send_message(
    self,
    message: str | list[dict],
    role: str,
    name: Optional[str] = None,
    agent_id: Optional[str] = None,
    agent_name: Optional[str] = None,
    stream_steps: bool = False,
    stream_tokens: bool = False,
    force_response: bool = False,
    existing_file_uris: Optional[List[str]] = None,
    extra_messages: Optional[List[dict]] = None,
    display_intermediate_message: any = None,
    chaining: Optional[bool] = None,
    message_queue: Optional[any] = None,
    retrieved_memories: Optional[dict] = None,
) -> MirixResponse:
    """
    Send a message to an agent

    Args:
        message (str): Message to send
        role (str): Role of the message
        agent_id (str): ID of the agent
        name (str): Name of the sender
        stream (bool): Stream the response (default: `False`)
        extra_messages (str): Extra message to send. It will be inserted before the last message
        chaining (bool): Whether to enable chaining for this message

    Returns:
        response (MirixResponse): Response from the agent
    """
    usage = self.server.send_messages(
        actor=self.server.user_manager.get_user_by_id(self.user.id),
        agent_id=agent_id,
        input_messages=input_messages,
        force_response=force_response,
        display_intermediate_message=display_intermediate_message,
        chaining=chaining,
        existing_file_uris=existing_file_uris,
        extra_messages=extra_messages,
        message_queue=message_queue
    )
    # format messages
    messages = self.interface.to_list()
    mirix_messages = []
    for m in messages:
        mirix_messages += m.to_mirix_message()
    return MirixResponse(messages=mirix_messages, usage=usage)
def send_messages(
    self,
    actor: User,
    agent_id: str,
    input_messages: List[MessageCreate],
    interface: Union[AgentInterface, None] = None,  # needed for responses
    metadata: Optional[dict] = None,  # Pass through metadata to interface
    put_inner_thoughts_first: bool = True,
    display_intermediate_message: callable = None,
    force_response: bool = False,
    chaining: Optional[bool] = True,
    existing_file_uris: Optional[List[str]] = None,
    extra_messages: Optional[List[dict]] = None,
    message_queue: Optional[any] = None,
    retrieved_memories: Optional[dict] = None,
) -> MirixUsageStatistics:
    """Send a list of messages to the agent."""
    # Store metadata in interface if provided
    if metadata and hasattr(interface, "metadata"):
        interface.metadata = metadata
    # Run the agent state forward
    return self._step(
        actor=actor,
        agent_id=agent_id,
        input_messages=input_messages,
        interface=interface,
        force_response=force_response,
        put_inner_thoughts_first=put_inner_thoughts_first,
        display_intermediate_message=display_intermediate_message,
        chaining=chaining,
        existing_file_uris=existing_file_uris,
        extra_messages=extra_messages,
        message_queue=message_queue,
        retrieved_memories=retrieved_memories
    )

def _step(
    self,
    actor: User,
    agent_id: str,
    input_messages: Union[Message, List[Message]],
    interface: Union[AgentInterface, None] = None,  # needed for getting responses
    put_inner_thoughts_first: bool = True,
    existing_file_uris: Optional[List[str]] = None,
    force_response: bool = False,
    display_intermediate_message: any = None,
    chaining: Optional[bool] = None,
    extra_messages: Optional[List[dict]] = None,
    message_queue: Optional[any] = None,
    retrieved_memories: Optional[dict] = None,
) -> MirixUsageStatistics:
    """Send the input message through the agent"""
    mirix_agent = None
    try:
        mirix_agent = self.load_agent(agent_id=agent_id, interface=interface, actor=actor)
        logger.debug(f"Starting agent step")
        usage_stats = mirix_agent.step(
            input_messages=input_messages,
            chaining=effective_chaining,
            max_chaining_steps=self.max_chaining_steps,
            stream=token_streaming,
            skip_verify=True,
            metadata=metadata,
            force_response=force_response,
            existing_file_uris=existing_file_uris,
            display_intermediate_message=display_intermediate_message,
            put_inner_thoughts_first=put_inner_thoughts_first,
            extra_messages=extra_messages,
            message_queue=message_queue,
        )
    finally:
        logger.debug("Calling step_yield()")
        if mirix_agent:
            mirix_agent.interface.step_yield()
    return usage_stats

At this point the message finally reaches the mirix_agent:

# Generate the topic
def step(
    self,
    input_messages: Union[Message, List[Message]],
    chaining: bool = True,
    max_chaining_steps: Optional[int] = None,
    extra_messages: Optional[List[dict]] = None,
    **kwargs,
) -> MirixUsageStatistics:
    # Use LLMClient to extract topics
    llm_client = LLMClient.create(
        llm_config=self.agent_state.llm_config,
        put_inner_thoughts_first=True,
    )
    # Call the LLM to extract the topic; this only happens on the first step (step_count == 0)
    if llm_client:
        response = llm_client.send_llm_request(
            messages=temporary_messages,
            tools=functions,
            stream=False,
            force_tool_call='update_topic',
        )
    # Step 1: add user message
    if isinstance(messages, Message):
        messages = [messages]
    step_response = self.inner_step(
        first_input_messge=first_input_message,
        messages=next_input_message,
        extra_messages=extra_message_objects,
        initial_message_count=initial_message_count,
        chaining=chaining,
        **kwargs,
    )

# Call the LLM
def inner_step(...):
    # Step 0: get in-context messages and the raw system prompt
    in_context_messages = self.agent_manager.get_in_context_messages(agent_id=self.agent_state.id, actor=self.user)
    raw_system = in_context_messages[0].content[0].text
    # Step 1: add user message
    messages = [messages]
    # Step 2: send the conversation and available functions to the LLM
    response = self._get_ai_reply(
        message_sequence=input_message_sequence,
        first_message=first_message,
        stream=stream,
        step_count=step_count,
        put_inner_thoughts_first=put_inner_thoughts_first,
        existing_file_uris=existing_file_uris,
    )
    # Step 3: check if LLM wanted to call a function
    # (if yes) Step 4: call the function
    # (if yes) Step 5: send the info on the function call and function response to LLM
    all_response_messages = []
    for response_choice in response.choices:
        response_message = response_choice.message
        tmp_response_messages, continue_chaining, function_failed = self._handle_ai_response(
            first_input_messge,  # give the last message to the function so that other agents can see it through function calls
            response_message,
            existing_file_uris=existing_file_uris,
            # TODO this is kind of hacky, find a better way to handle this
            # the only time we set up message creation ahead of time is when streaming is on
            response_message_id=response.id if stream else None,
            force_response=force_response,
            retrieved_memories=retrieved_memories,
            display_intermediate_message=display_intermediate_message,
            return_memory_types_without_update=return_memory_types_without_update,
            message_queue=message_queue,
            chaining=chaining
        )
        all_response_messages.extend(tmp_response_messages)

# Send the LLM request
def _get_ai_reply(...):
    response = llm_client.send_llm_request(
        messages=message_sequence,
        tools=allowed_functions,
        stream=stream,
        force_tool_call=force_tool_call,
        get_input_data_for_debugging=get_input_data_for_debugging,
        existing_file_uris=existing_file_uris,
    )

# Handle the LLM response
def _handle_ai_response(...):
    # Step 2: check if LLM wanted to call a function
    if response_message.function_call or (response_message.tool_calls is not None and len(response_message.tool_calls) > 0):
        if response_message.function_call:
            raise DeprecationWarning(response_message)
        assert response_message.tool_calls is not None and len(response_message.tool_calls) > 0
        # Generate UUIDs for tool calls if needed
        if override_tool_call_id or response_message.function_call:
            self.logger.warning("Overriding the tool call can result in inconsistent tool call IDs during streaming")
            for tool_call in response_message.tool_calls:
                tool_call.id = get_tool_call_id()  # needs to be a string for JSON
        else:
            for tool_call in response_message.tool_calls:
                assert tool_call.id is not None  # should be defined

That completes the whole send path; next comes the concrete send implementation.

4. LLM request implementation

def send_llm_request(
    self,
    messages: List[Message],
    tools: Optional[List[dict]] = None,
    stream: bool = False,
    force_tool_call: Optional[str] = None,
    get_input_data_for_debugging: bool = False,
    existing_file_uris: Optional[List[str]] = None,
) -> ChatCompletionResponse:
    """
    Issues a request to the downstream model endpoint and parses response.
    """
    request_data = self.build_request_data(messages, self.llm_config, tools, force_tool_call, existing_file_uris=existing_file_uris)
    response_data = self.request(request_data)
    # Normalize the response format; response_data is the raw payload returned by the LLM.
    chat_completion_data = self.convert_response_to_chat_completion(response_data, messages)
    return chat_completion_data
def request(self, request_data: dict) -> dict:
    """
    Performs underlying request to llm and returns raw response.
    """
    # print("[google_ai request]", json.dumps(request_data, indent=2))
    # Check for database-stored API key first, fall back to model_settings
    override_key = ProviderManager().get_gemini_override_key()
    api_key = str(override_key) if override_key else str(model_settings.gemini_api_key)
    url, headers = get_gemini_endpoint_and_headers(
        base_url=str(self.llm_config.model_endpoint),
        model=self.llm_config.model,
        api_key=api_key,
        key_in_header=True,
        generate_content=True,
    )
    return make_post_request(url, headers, request_data)

Note that the Gemini endpoint is called via the plain HTTP API rather than the SDK:

def make_post_request(url: str, headers: dict[str, str], data: dict[str, Any]) -> dict[str, Any]:
    # Send the request out via requests
    response = requests.post(url, headers=headers, json=data)
    response_data = response.json()  # parse the JSON body
    return response_data

At this point the request has been sent to the LLM; what remains is to wait for the response and process it.

Handling the LLM response

Once the content reaches the meta memory agent, the LLM processes it:

- The system adds an instruction asking the LLM to analyze the content and determine which memory types need updating
- The LLM analyzes the uploaded images, text, and voice content and extracts the important information
- Based on the analysis, the system stores the relevant information in the appropriate memory components (episodic memory, procedural memory, semantic memory, and so on)

Receiving the data

After _get_ai_reply (called from inner_step) returns the response:

def _handle_ai_response():
    # Step 2: check if the LLM wanted to call a function
    if response_message.function_call or (response_message.tool_calls is not None and len(response_message.tool_calls) > 0):
        # Generate UUIDs for tool calls if needed
        if override_tool_call_id or response_message.function_call:
            self.logger.warning("Overriding the tool call can result in inconsistent tool call IDs during streaming")
            for tool_call in response_message.tool_calls:
                tool_call.id = get_tool_call_id()  # needs to be a string for JSON
        else:
            for tool_call in response_message.tool_calls:
                assert tool_call.id is not None  # should be defined
" 需要更新记忆的场景if function_name == 'trigger_memory_update':    function_args["user_message"] = {'message': convert_message_to_input_message(input_message),                                      'existing_file_uris': existing_file_uris,                                     'retrieved_memories': retrieved_memories,                                     'chaining': CHAINING_FOR_MEMORY_UPDATE}    if message_queue is not None:        function_args["user_message"]['message_queue'] = message_queueelif function_name == 'trigger_memory_update_with_instruction':    function_args["user_message"] = {'existing_file_uris': existing_file_uris,                                     'retrieved_memories': retrieved_memories}" 其他各种各样的异常处理之后,终于可以调用 tool 了function_response = self.execute_tool_and_persist_state(function_name, function_args,                                                                             target_mirix_tool,                                                                             display_intermediate_message=display_intermediate_message)    

The system calls execute_tool_and_persist_state to run the tool:

def execute_tool_and_persist_state(self, function_name: str, function_args: dict, target_mirix_tool: Tool,
                                   display_intermediate_message: Optional[Callable] = None) -> str:
    """
    Execute tool modifications and persist the state of the agent.
    Note: only some agent state modifications will be persisted, such as data in the AgentState ORM and block data
    """
    # TODO: add agent manager here
    orig_memory_str = self.agent_state.memory.compile()
    # TODO: need to have an AgentState object that actually has full access to the block data
    # this is because the sandbox tools need to be able to access block.value to edit this data
    try:
        if function_name in ['episodic_memory_insert', 'episodic_memory_replace', 'list_memory_within_timerange']:
            key = "items" if function_name == 'episodic_memory_insert' else 'new_items'
            if key in function_args:
                # Need to change the timezone into UTC timezone
                for item in function_args[key]:
                    if 'occurred_at' in item:
                        item['occurred_at'] = convert_timezone_to_utc(item['occurred_at'], self.user_manager.get_user_by_id(self.user.id).timezone)
        # A normal chat takes this path
        if function_name in ['search_in_memory', 'list_memory_within_timerange']:
            function_args['timezone_str'] = self.user_manager.get_user_by_id(self.user.id).timezone
        if target_mirix_tool.tool_type == ToolType.MIRIX_CORE:
            # base tools are allowed to access the `Agent` object and run on the database
            # Look up the tool
            callable_func = get_function_from_module(MIRIX_CORE_TOOL_MODULE_NAME, function_name)
            function_args["self"] = self  # need to attach self to arg since it's dynamically linked
            if function_name in ['send_message', 'send_intermediate_message']:
                agent_state_copy = self.agent_state.__deepcopy__()
                function_args["agent_state"] = agent_state_copy  # need to attach the agent state since it's dynamically linked
            # Invoke the tool
            function_response = callable_func(**function_args)
            if function_name in ['send_message', 'send_intermediate_message']:
                self.update_topic_if_changed(agent_state_copy.topic)
            if function_name == 'send_intermediate_message':
                # send intermediate message to the user
                if display_intermediate_message:
                    display_intermediate_message("response", function_args['message'])
        elif target_mirix_tool.tool_type == ToolType.MIRIX_MEMORY_CORE:
            callable_func = get_function_from_module(MIRIX_MEMORY_TOOL_MODULE_NAME, function_name)
            if function_name in ['core_memory_append', 'core_memory_rewrite']:
                agent_state_copy = self.agent_state.__deepcopy__()
                function_args["agent_state"] = agent_state_copy
            if function_name in ['check_episodic_memory', 'check_semantic_memory']:
                function_args['timezone_str'] = self.user_manager.get_user_by_id(self.user.id).timezone
            function_args["self"] = self
            function_response = callable_func(**function_args)
            if function_name in ['core_memory_append', 'core_memory_rewrite']:
                self.update_memory_if_changed(agent_state_copy.memory)
    return function_response

Tool-call scenarios

1. First the update_topic tool is called, and the LLM returns:

'functionCall': {
    'name': 'update_topic',
    'args': {
        'inner_thoughts': 'The user wants to learn how to make coffee and is asking for a tutorial.',
        'topic': 'coffee making;tutorial'
    }
}

2. The next llm_request call returns search_in_memory:

'functionCall': {
    'name': 'search_in_memory',
    'args': {
        'memory_type': 'resource',
        'query': 'coffee making tutorial',
        'inner_thoughts': 'Searching for resources related to coffee making tutorials to provide a comprehensive guide.',
        'search_field': 'content',
        'search_method': 'bm25'
    }
}

Executing the search_in_memory tool @base.py:

def search_in_memory(self: "Agent", memory_type: str, query: str, search_field: str, search_method: str, timezone_str: str) -> Optional[str]:
    """
    Choose which memory to search. All memory types support multiple search methods with different performance characteristics. Most of the time, you should search over 'details' for episodic memory and semantic memory, 'content' for resource memory (but for resource memory, `embedding` is not supported for the content field, so you have to use other search methods), and 'description' for procedural memory. This is because these fields have the richest information and are more likely to contain the keywords/query. You can always start from a thorough search over the whole memory by setting memory_type to 'all' and search_field to 'null', and then narrow down to specific fields and specific memories.

    Args:
        memory_type: The type of memory to search in. It should be chosen from the following: "episodic", "resource", "procedural", "knowledge_vault", "semantic", "all". Here "all" means searching in all the memories.
        query: The keywords/query used to search in the memory.
        search_field: The field to search in the memory. It should be chosen from the attributes of the corresponding memory. For "episodic" memory, it can be 'summary', 'details'; for "resource" memory, it can be 'summary', 'content'; for "procedural" memory, it can be 'summary', 'steps'; for "knowledge_vault", it can be 'secret_value', 'caption'; for "semantic" memory, it can be 'name', 'summary', 'details'. For "all", it should be "null" as the system will search all memories with default fields.
        search_method: The method to search in the memory. Choose from:
            - 'bm25': BM25 ranking-based full-text search (fast and effective for keyword-based searches)
            - 'embedding': Vector similarity search using embeddings (most powerful, good for conceptual matches)

    Returns:
        str: Query result string
    """
    # The resource memory we are after
    if memory_type == 'resource' or memory_type == 'all':
        resource_memories = self.resource_memory_manager.list_resources(
            agent_state=self.agent_state,
            query=query,
            search_field=search_field if search_field != 'null' else ('summary' if search_method == 'embedding' else 'content'),
            search_method=search_method,
            limit=10,
            timezone_str=timezone_str,
        )
        formatted_results_resource = [{'memory_type': 'resource', 'id': x.id, 'resource_type': x.resource_type, 'summary': x.summary, 'content': x.content} for x in resource_memories]
        if memory_type == 'resource':
            return formatted_results_resource, len(formatted_results_resource)
    if memory_type == 'procedural' or memory_type == 'all':
        ...  # procedural search elided
    if memory_type == 'knowledge_vault' or memory_type == 'all':
        ...  # knowledge vault search elided
    if memory_type == 'semantic' or memory_type == 'all':
        ...  # semantic search elided
    return (formatted_results_from_episodic + formatted_results_resource + formatted_results_procedural
            + formatted_results_knowledge_vault + formatted_results_semantic,
            len(formatted_results_from_episodic) + len(formatted_results_resource) + len(formatted_results_procedural)
            + len(formatted_results_knowledge_vault) + len(formatted_results_semantic))

The lookup function @mirix/services/resource_memory_manager.py:

@update_timezone
@enforce_types
def list_resources(self,
                   agent_state: AgentState,
                   query: str = '',
                   embedded_text: Optional[List[float]] = None,
                   search_field: str = 'content',
                   search_method: str = 'string_match',
                   limit: Optional[int] = 50,
                   timezone_str: str = None) -> List[PydanticResourceMemoryItem]:
    base_query = select(
        ResourceMemoryItem.id.label("id"),
        ResourceMemoryItem.title.label("title"),
        ResourceMemoryItem.summary.label("summary"),
        ResourceMemoryItem.content.label("content"),
        ResourceMemoryItem.summary_embedding.label("summary_embedding"),
        ResourceMemoryItem.embedding_config.label("embedding_config"),
        ResourceMemoryItem.created_at.label("created_at"),
        ResourceMemoryItem.resource_type.label("resource_type"),
        ResourceMemoryItem.organization_id.label("organization_id"),
        ResourceMemoryItem.metadata_.label("metadata_"),
        ResourceMemoryItem.last_modify.label("last_modify"),
        ResourceMemoryItem.tree_path.label("tree_path"),
    )
    # ... other search_method branches elided ...
    elif search_method == 'bm25':
        # BM25 full-text search branch
        ...

BM25 (Best Matching 25) is a ranking function widely used in information retrieval to score how relevant a document is to a query. It is based on the probabilistic retrieval model and improves on the classic TF-IDF (term frequency-inverse document frequency) approach.

The BM25 algorithm rests on the following key ideas (see the scoring sketch after this list):

- Term frequency (TF): the more often a term occurs in a document, the more relevant that document is to queries containing the term.
- Inverse document frequency (IDF): the more documents a term appears in, the less it matters.
- Document length normalization: longer documents are more likely to contain query terms by chance, so scores are normalized by document length.
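
For concreteness, here is a compact, self-contained Okapi BM25 scorer showing how those three ideas combine (k1 and b are the usual free parameters; this is illustrative, not Mirix's actual implementation):

import math

def bm25_score(query_terms, doc_terms, corpus, k1=1.5, b=0.75):
    avgdl = sum(len(d) for d in corpus) / len(corpus)  # average document length
    score = 0.0
    for term in query_terms:
        tf = doc_terms.count(term)                     # term frequency
        df = sum(1 for d in corpus if term in d)       # document frequency
        idf = math.log((len(corpus) - df + 0.5) / (df + 0.5) + 1)
        # Length normalization: b controls how strongly long docs are penalized
        denom = tf + k1 * (1 - b + b * len(doc_terms) / avgdl)
        score += idf * tf * (k1 + 1) / denom
    return score

corpus = [["coffee", "making", "tutorial"], ["tea", "brewing", "guide"]]
print(bm25_score(["coffee", "tutorial"], corpus[0], corpus))  # higher = more relevant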
3. Based on the search results, the LLM is called again; it returns send_message, i.e. it sends the message content to the user:
'functionCall': {
    'name': 'send_message',
    'args': {
        'topic': 'coffee making tutorial',
        'inner_thoughts': "I need to create a coffee making tutorial since I don't have one in my resource memory.",
        'message': 'OK, drawing on my existing knowledge, here is a coffee-making tutorial for you: XXXXX.\n\nI hope you find it helpful!'
    }
}
" @mirix/functions/function_sets/base.pydef send_message(self: "Agent",  agent_state: "AgentState", message: str, topic: str = None) -> Optional[str]:    """    Sends a message to the human user. Meanwhile, whenever this function is called, the agent needs to include the `topic` of the current focus. It can be the same as before, it can also be updated when the agent is focusing on something different.    Args:        message (str): Message contents. All unicode (including emojis) are supported.        topic (str): The focus of the agent right now. It is used to track the most recent topic in the conversation and will be used to retrieve the relevant memories from each memory component.     Returns:        Optional[str]: None is always returned as this function does not produce a response.    """    # FIXME passing of msg_obj here is a hack, unclear if guaranteed to be the correct reference    self.interface.assistant_message(message)  # , msg_obj=self._messages[-1])    agent_state.topic = topic    return None

The memory-update scenario

The system uses a thread pool to call the individual memory agents in parallel:

" 触发 memory 更新走这里def trigger_memory_update(self: "Agent", user_message: object, memory_types: List[str]) -> Optional[str]:    """    Choose which memory to update. This function will trigger another memory agent which is specifically in charge of handling the corresponding memory to update its memory. Trigger all necessary memory updates at once. Put the explanations in the `internal_monologue` field.    Args:        memory_types (List[str]): The types of memory to update. It should be chosen from the following: "core", "episodic", "resource", "procedural", "knowledge_vault", "semantic". For instance, ['episodic', 'resource'].            Returns:        Optional[str]: None is always returned as this function does not produce a response.    """    from mirix import create_client    client = create_client()    agents = client.list_agents()    if 'message_queue' in user_message:                from concurrent.futures import ThreadPoolExecutor, as_completed        from tqdm import tqdm        import time        # Use multi-processing approach similar to _send_to_memory_agents_separately        message_queue = user_message['message_queue']                # Map memory types to agent types        memory_type_to_agent_type = {            "core": "core_memory_agent",            "episodic": "episodic_memory_agent",             "resource": "resource_memory_agent",            "procedural": "procedural_memory_agent",            "knowledge_vault": "knowledge_vault_agent",            "semantic": "semantic_memory_agent"        }                # Filter to only supported memory types        valid_agent_types = []        for memory_type in memory_types:            if memory_type in memory_type_to_agent_type:                valid_agent_types.append(memory_type_to_agent_type[memory_type])            else:                raise ValueError(f"Memory type '{memory_type}' is not supported. 
Please choose from 'core', 'episodic', 'resource', 'procedural', 'knowledge_vault', 'semantic'.")                if user_message['message'][-1]['text'].startswith('[System Message]'):            user_message['message'] = user_message['message'][:-1]            user_message['message'].append([                {'type': 'text', 'text': "[System Message] Interpret the provided content, according to what the user is doing, extract the important information matching your memory type and save it into the memory."}            ])        # Prepare payloads for message queue        payloads = {            'message': user_message['message'],            'existing_file_uris': user_message.get('existing_file_uris', set()),            'chaining': user_message.get('chaining', False),            'message_queue': message_queue,            'retrieved_memories': user_message.get('retrieved_memories', None)        }        responses = []        overall_start = time.time()                        responses = []        overall_start = time.time()                with ThreadPoolExecutor(max_workers=6) as pool:            futures = [                pool.submit(self.message_queue.send_message_in_queue,                            self.client, self.message_queue._get_agent_id_for_type(agent_states, agent_type), payloads, agent_type)                 for agent_type in memory_agent_types            ]                        for future in tqdm(as_completed(futures), total=len(futures)):                response, agent_type = future.result()                responses.append(response)                overall_end = time.time()        self.logger.info(f'Time taken to trigger memory updates: {overall_end - overall_start:.2f} seconds.')                response_message = f'[System Message] Memory updates have been triggered for the following memory types: {memory_types}. '        response_message += f'Total time taken: {overall_end - overall_start:.2f} seconds.'        return response_message

Each memory agent performs the corresponding storage operation. Episodic memory, for example:

def episodic_memory_insert(self: "Agent", items: List[EpisodicEventForLLM]):
    """
    The tool to update episodic memory. The item being inserted into the episodic memory is an event that happened either to the user or to the assistant.

    Args:
        items (array): List of episodic memory items to insert.

    Returns:
        Optional[str]: None is always returned as this function does not produce a response.
    """
    for item in items:
        self.episodic_memory_manager.insert_event(
            agent_state=self.agent_state,
            timestamp=item['occurred_at'],
            event_type=item['event_type'],
            actor=item['actor'],
            summary=item['summary'],
            details=item['details'],
            organization_id=self.user.organization_id,
            tree_path=item.get('tree_path')
        )
    response = "Events inserted! Now you need to check if there are repeated events shown in the system prompt."
    return response

Finally, each Memory Manager persists the data to the database:

class EpisodicMemoryManager:
    def insert_event(self,
                     agent_state: AgentState,
                     event_type: str,
                     timestamp: datetime,
                     actor: str,
                     details: str,
                     summary: str,
                     organization_id: str,
                     tree_path: Optional[List[str]] = None) -> PydanticEpisodicEvent:
        ...

    def create_episodic_memory(self, episodic_memory: PydanticEpisodicEvent) -> PydanticEpisodicEvent:
        episodic_memory_item = EpisodicEvent(**episodic_memory_dict)
        episodic_memory_item.create(session)

To finish, the system calls finish_memory_update to wrap up the whole memory-update process:

def finish_memory_update(self: "Agent"):
    """
    Finish the memory update process. This function should be called after all memory updates have been completed.

    Returns:
        Optional[str]: None is always returned as this function does not produce a response.
    """
    return None

Google Gemini demo

Image understanding via the SDK

Sample SDK request code:

from google import genai

client = genai.Client(api_key="api-key")
# sample.jpg is the Star History chart discussed below
my_file = client.files.upload(file="sample.jpg")
response = client.models.generate_content(
    model="gemini-2.5-flash",
    contents=[my_file, "Caption this image in Chinese."],
)
print(response.text)

The resulting caption is shown below (translated from the Chinese output); it is quite accurate:

The chart, titled "Star History", shows the GitHub star growth of four projects from roughly April 2023 to July 2025.

The key points:

- microsoft/autogen (red curve): explosive growth from October 2023 onward; by July 2025 it has passed 40k stars, making it the fastest-growing and most-starred project in the chart.
- camel-ai/camel (yellow curve): present from the start of the chart, it grows steadily to roughly 13k stars by July 2025.
- camel-ai/owl (blue curve): a sharp jump after April 2025, quickly overtaking camel-ai/camel to reach nearly 18k stars by July 2025.
- microsoft/magentic-ui (pink curve): the newest entrant, climbing quickly from June 2025, but still under 10k stars (around 7k) when the chart ends.

Overall, the chart reflects how the popularity and community attention of these GitHub open-source projects shifted over time, with microsoft/autogen showing remarkable growth and user adoption.

HTTP invocation

A sample Gemini HTTP API request:

curl "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key=GEMINI_API_KEY" \  -H 'Content-Type: application/json' \  -X POST \  -d '{    "contents": [      {        "parts": [          {            "text": "Explain how AI works in a few words"          }        ]      }    ]  }'

Gemini function calling:

curl --location -g --request POST 'https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key=' \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "contents": [
      {
        "role": "user",
        "parts": [
          {
            "text": "What'\''s the temperature in London?"
          }
        ]
      }
    ],
    "tools": [
      {
        "functionDeclarations": [
          {
            "name": "get_current_temperature",
            "description": "Gets the current temperature for a given location.",
            "parameters": {
              "type": "object",
              "properties": {
                "location": {
                  "type": "string",
                  "description": "The city name, e.g. San Francisco"
                }
              },
              "required": ["location"]
            }
          }
        ]
      }
    ]
  }'

Summary

The complete MIRIX pipeline for turning images into memories:

- Image capture: the frontend captures screenshots or application windows
- Similarity filtering: duplicate content is filtered out via image-similarity comparison
- Upload handling: images are uploaded to the cloud (e.g. Google Cloud); models other than Gemini skip the upload and pass the image encoding directly in the LLM call
- Content accumulation: content is buffered until a threshold count is reached
- Content conversion: images and text are converted into structured messages
- Model invocation: the model is first called to generate a topic, then called again for the step
- Memory dispatch: content is dispatched to the different types of memory agents
- Storage: each agent stores the content according to its type
- Cleanup: temporary files and state are cleaned up

This pipeline ensures the system can effectively turn visual information into retrievable, usable memories, while the batching mechanism keeps it efficient.

References

ai.google.dev/gemini-api/…
