Streaming local LLM with FastAPI, Llama.cpp and Langchain
Problem background:
I have set up FastAPI with Llama.cpp and Langchain. Now I want to enable streaming in the FastAPI responses. Streaming works with Llama.cpp in my terminal, but I wasn't able to implement it with a FastAPI response.
Most tutorials focus on enabling streaming with an OpenAI model, but I am using a local LLM (quantized Mistral) with llama.cpp. I think I have to modify the CallbackHandler, but no tutorial has worked so far. Here is my code:
```python
from functools import lru_cache
import copy

from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from langchain_community.llms import LlamaCpp
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

model_path = "../modelle/mixtral-8x7b-instruct-v0.1.Q5_K_M.gguf"

prompt = """
[INST] Im folgenden bekommst du eine Aufgabe. Erledige diese anhand des User Inputs.

### Hier die Aufgabe: ###
{typescript_string}

### Hier der User Input: ###
{input}

Antwort: [/INST]
"""


def model_response_prompt():
    return PromptTemplate(template=prompt, input_variables=['input', 'typescript_string'])


def build_llm(model_path, callback=None):
    callback_manager = CallbackManager([StreamingStdOutCallbackHandler()])
    # callback_manager = CallbackManager(callback)

    n_gpu_layers = 1  # Metal set to 1 is enough; tried with several values.
    n_batch = 512  # 1024 - should be between 1 and n_ctx, consider the amount of RAM of your Apple Silicon Chip.

    llm = LlamaCpp(
        max_tokens=1000,
        n_threads=6,
        model_path=model_path,
        temperature=0.8,
        f16_kv=True,
        n_ctx=28000,
        n_gpu_layers=n_gpu_layers,
        n_batch=n_batch,
        callback_manager=callback_manager,
        verbose=True,
        top_p=0.75,
        top_k=40,
        repeat_penalty=1.1,
        streaming=True,
        model_kwargs={
            'mirostat': 2,
        },
    )
    return llm


# caching LLM
@lru_cache(maxsize=100)
def get_cached_llm():
    chat = build_llm(model_path)
    return chat


chat = get_cached_llm()

app = FastAPI(
    description="A simple API that uses Mistral or Mixtral",
    version="1.0",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def bullet_point_model():
    llm = build_llm(model_path=model_path)
    llm_chain = LLMChain(
        llm=llm,
        prompt=model_response_prompt(),
        verbose=True,
    )
    return llm_chain


@app.get('/model_response')
async def model(question: str, prompt: str):
    model = bullet_point_model()
    res = model({"typescript_string": prompt, "input": question})
    result = copy.deepcopy(res)
    return result
```
In an example notebook, I call FastAPI like this:
```python
import subprocess
import urllib.parse
import shlex

query = input("Insert your bullet points here: ")
task = input("Insert the task here: ")

# Safely encode the URL query strings
encodedquery = urllib.parse.quote(query)
encodedtask = urllib.parse.quote(task)

# Build the curl command text
command = f"curl -X 'GET' 'http://127.0.0.1:8000/model_response?question={encodedquery}&prompt={encodedtask}' -H 'accept: application/json'"
print(command)

args = shlex.split(command)
process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
print(stdout)
```
So with this code, getting responses from the API works. But I only see streaming in my terminal (I think this is because of the StreamingStdOutCallbackHandler). Only after the streaming in the terminal is complete do I get my FastAPI response.
What do I have to change so that I can stream token by token with FastAPI and a local llama.cpp model?
Solution:
I was doing the same thing and hit a similar issue: FastAPI was not streaming the response even though I was using the StreamingResponse API. I eventually got the following code to work. There are three important parts:
- Make sure to use StreamingResponse to wrap an Iterator.
- Make sure the Iterator sends a newline character \n in each streaming response.
- Make sure to use streaming APIs to connect to your LLMs. For example, the _client.chat function in my example uses httpx to connect to the REST APIs for the LLMs. If you use the requests package, it won't work, as it doesn't support streaming. (A minimal sketch of such a streaming call is shown right after this list.)
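As an illustration only (this is not the answer's actual _client implementation), a minimal sketch of a streaming httpx call could look like the following. The endpoint URL, payload shape, and the chat_stream function name are assumptions based on an Ollama-style /api/chat endpoint that returns newline-delimited JSON:

```python
import json

import httpx


def chat_stream(payload: dict):
    """Yield parsed JSON chunks from a streaming chat endpoint (hypothetical Ollama-style URL)."""
    with httpx.Client(timeout=None) as client:
        # client.stream keeps the connection open and lets us iterate over the
        # body as it arrives instead of buffering the whole response.
        with client.stream("POST", "http://localhost:11434/api/chat", json=payload) as response:
            for line in response.iter_lines():
                if line:
                    yield json.loads(line)
```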
```python
import json
from collections.abc import Iterator

from fastapi import Request
from fastapi.responses import StreamingResponse


async def chat(self, request: Request):
    """
    Generate a chat response using the requested model.
    """
    # Passing request body JSON to parameters of function _chat
    # Request body follows ollama API's chat request format for now.
    params = await request.json()
    self.logger.debug("Request data: %s", params)

    chat_response = self._client.chat(**params)

    # Always return as streaming
    if isinstance(chat_response, Iterator):
        def generate_response():
            for response in chat_response:
                yield json.dumps(response) + "\n"

        return StreamingResponse(generate_response(), media_type="application/x-ndjson")
    elif chat_response is not None:
        return json.dumps(chat_response)
```
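Applying the same pattern to the question's local llama.cpp setup, here is a hedged sketch (not part of the original answer) that streams straight from the LangChain LlamaCpp wrapper. It assumes the app, chat, and model_response_prompt objects from the question's code are in scope, and that chat.stream() yields text chunks because the model was created with streaming=True; the endpoint name and chunk format are illustrative:

```python
import json

from fastapi.responses import StreamingResponse

# app, chat and model_response_prompt come from the question's code above.

@app.get("/model_response_stream")
async def model_response_stream(question: str, prompt: str):
    # Render the same prompt template the LLMChain would use.
    full_prompt = model_response_prompt().format(typescript_string=prompt, input=question)

    def generate():
        # chat.stream() yields text chunks as llama.cpp produces them,
        # so each chunk can be written out as one NDJSON line.
        for chunk in chat.stream(full_prompt):
            yield json.dumps({"text": chunk}) + "\n"

    return StreamingResponse(generate(), media_type="application/x-ndjson")
```

To actually see the chunks arrive one by one, the client also has to read the response incrementally, for example with curl -N (no output buffering) instead of the buffered subprocess call from the example notebook.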