repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6 docs: add automaton and perps launch sources (16d ago)
| #1 | import queue |
| #2 | from typing import Any, Union |
| #3 | |
| #4 | from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler |
| #5 | from langchain.schema import LLMResult |
| #6 | |
#: Special item placed on the queue to signal the end of the stream.
STOP_ITEM = "[END]"
| #11 | |
| #12 | |
class StreamingStdOutCallbackHandlerYield(StreamingStdOutCallbackHandler):
    """
    Callback handler that forwards generated tokens into a queue.

    Every new token is put on the queue as it arrives, and ``STOP_ITEM`` is
    put on the queue when the LLM run ends or errors.
    For a consumer of that queue, see the :func:`generate` function below.
    """

    #: The queue that receives the tokens as they are generated.
    q: queue.Queue

    def __init__(self, q: queue.Queue) -> None:
        """
        Set up the callback handler.
        q: The queue that receives the tokens as they are generated.
        """
        super().__init__()
        self.q = q

    def on_llm_start(self, serialized: dict[str, Any], prompts: list[str], **kwargs: Any) -> None:
        """Empty the queue when the LLM starts running."""
        token_queue = self.q
        # Hold the queue's own lock while clearing its underlying container.
        with token_queue.mutex:
            token_queue.queue.clear()

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Enqueue a new LLM token. Only available when streaming is enabled."""
        self.q.put(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Enqueue the stop item when the LLM ends running."""
        self.q.put(STOP_ITEM)

    def on_llm_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
        """Enqueue a ``<ErrorType>: <message>`` string, then the stop item."""
        error_name = type(error).__name__
        description = "%s: %s" % (error_name, str(error))
        # The error text goes first so it is delivered before the stream ends.
        for item in (description, STOP_ITEM):
            self.q.put(item)
| #49 | |
| #50 | |
def generate(rq: queue.Queue):
    """
    Yield items taken from the queue until the stop item (or ``None``) is seen.

    Each item is fetched with a blocking ``rq.get()``. The stop item itself
    is not yielded.

    Usage example:
    ```
    def askQuestion(callback_fn: StreamingStdOutCallbackHandlerYield):
        llm = OpenAI(streaming=True, callbacks=[callback_fn])
        return llm.invoke(prompt="Write a poem about a tree.")

    @app.route("/", methods=["GET"])
    def generate_output():
        q = Queue()
        callback_fn = StreamingStdOutCallbackHandlerYield(q)
        threading.Thread(target=askQuestion, args=(callback_fn,)).start()
        return Response(generate(q), mimetype="text/event-stream")
    ```
    """
    while True:
        item = rq.get()
        end_of_stream = item == STOP_ITEM or item is None
        if end_of_stream:
            return
        yield item
| #74 |