Dit is een oude revisie van het document!
Inhoud
LangGraph Basis principes
🗂️ Terug naar start
🦜♻️🗂️ LangGraph start
🦜♻️💫 LangGraph Tests
LangGraph Instaleren
pip install --quiet -U langgraph
1️⃣ Kernbegrippen (bronnen):
Video: Aan het begin van deze video worden de Kernbegrippen van LangGraph keurig op een rijtje gezet en uitgebreid besproken
🔸 Verder wordt in deze video ook Google gebruikt om te zoeken op het web en een Python web-scraper (BeautifulSoup) om de content van een webpagina binnen te halen ✨
💡Hieronder de onderdelen zoals deze in het begin van de video worden beschreven
Simple Graph (als voorbeeld van de code hieronder)
Vervolg video: aanpassingen op hier boven genoemde video✨
State
De State is een dictionary met informatie over het proces
De State word gelezen en gevuld door de Graph
Doel van de State is om alle activiteiten van de Agent bij te houden (te registreren)
Definieer eerst de State (toestand) van de Graph (grafiek).
Het toestandsschema dient als invoerschema voor alle knooppunten en randen in de grafiek.
Laten we de TypedDict-klasse van de typmodule van Python gebruiken als ons schema, dat typehints biedt voor de sleutels.
from typing_extensions import TypedDict class State(TypedDict): graph_state: str
Node
In LangGraph Nodes kunnen agenten zijn maar ook Tools
Nodes zijn gewoon python-functies.
🔸 Het eerste positionele argument is de state, zoals hierboven gedefinieerd.
🔸 Omdat de state een TypedDict is met schema zoals hierboven gedefinieerd, kan elke node toegang krijgen tot de sleutel, graph_state, met state['graph_state'].
🔸 Elke node retourneert een nieuwe waarde van de state-sleutel graph_state.
🔸 Standaard overschrijft de nieuwe waarde die door elke node wordt geretourneerd de vorige state-waarde.
def node_1(state): print("---Node 1---") return {"graph_state": state['graph_state'] +" I am"} def node_2(state): print("---Node 2---") return {"graph_state": state['graph_state'] +" happy!"} def node_3(state): print("---Node 3---") return {"graph_state": state['graph_state'] +" sad!"}
Edge
Edges verbinden de nodes.
Normale Edges worden gebruikt als u bijvoorbeeld altijd van node_1 naar node_2 wilt gaan.
Voorwaardelijke Edges worden gebruikt als u optioneel tussen nodes wilt routeren.
Voorwaardelijke Edges worden geïmplementeerd als functies die het volgende node retourneren dat u wilt bezoeken op basis van een bepaalde logica.
import random from typing import Literal def decide_mood(state) -> Literal["node_2", "node_3"]: # Often, we will use state to decide on the next node to visit user_input = state['graph_state'] # Here, let's just do a 50 / 50 split between nodes 2, 3 if random.random() < 0.5: # 50% of the time, we return Node 2 return "node_2" # 50% of the time, we return Node 3 return "node_3"
Graphs
Een Graph bestaat uit Nodes en Edges
Een Graph leest en schrijft van en naar een State
De eenvoudigste graph (grafiek) - Coab
De StateGraph-klasse is de grafiekklasse die we kunnen gebruiken.
Eerst initialiseren we een StateGraph met de State-klasse die we hierboven hebben gedefinieerd.
Vervolgens voegen we onze knooppunten en randen toe.
🔸 We gebruiken de START-knoop (node), een speciaal knooppunt dat gebruikersinvoer naar de grafiek stuurt, om aan te geven waar onze grafiek moet beginnen.
🔸 De END-knoop (node) is een speciaal knooppunt (node) dat een eindknooppunt vertegenwoordigt.
Tot slot compileren we onze graph (grafiek) om een paar basiscontroles op de grafiekstructuur uit te voeren.
from typing import TypedDict from langgraph.graph import StateGraph, START, END # Build graph builder = StateGraph(State) builder.add_node("node_1", node_1) builder.add_node("node_2", node_2) builder.add_node("node_3", node_3) # Logic builder.add_edge(START, "node_1") builder.add_conditional_edges("node_1", decide_mood) builder.add_edge("node_2", END) builder.add_edge("node_3", END) # Add graph = builder.compile()
💡 Variant met Mens in de Loop Vraag aan gebruiker of we verder moeten gaan of niet✨
Graph Invocation
De gecompileerde grafiek implementeert het uitvoerbare protocol.
Dit biedt een standaardmanier om LangChain-componenten uit te voeren.
invoke is een van de standaardmethoden in deze interface.
De invoer is een woordenboek {“graph_state”: “Hi, this is lance.”}, dat de beginwaarde voor onze grafiekstatusdict instelt.
Wanneer invoke wordt aangeroepen, start de grafiek met de uitvoering vanaf het START-knooppunt.
Het gaat door de gedefinieerde knooppunten (node_1, node_2, node_3) in volgorde.
De voorwaardelijke rand gaat van knooppunt 1 naar knooppunt 2 of 3 met behulp van een 50/50-beslissingsregel.
Elke knooppuntfunctie ontvangt de huidige status en retourneert een nieuwe waarde, die de grafiekstatus overschrijft.
De uitvoering gaat door totdat het END-knooppunt is bereikt.
graph.invoke({"graph_state" : "Hi, this is Lance."})
Output:
---Node 1--- ---Node 3--- {'graph_state': 'Hi, this is Lance. I am sad!'}
invoke voert de hele grafiek synchroon uit.
Dit wacht tot elke stap is voltooid voordat het naar de volgende gaat.
Het retourneert de uiteindelijke status van de grafiek nadat alle knooppunten zijn uitgevoerd.
In dit geval retourneert het de status nadat node_3 is voltooid:
{'graph_state': 'Hi, this is Lance. I am sad!'}
Schematisch Overzicht Basis
2️⃣ Mens in de Loop
binnen LangGraph kan een interrupt worden toegevoegd wanneer een menselijke tussenkomst vereist is. Een interrupt kan ook worden toegevoegd voor bepaalde tools waarbij wordt aangenomen dat een tool niet kan worden uitgevoerd voordat een of meer mensen goedkeuring hebben gegeven.
Daarom kan voor bepaalde tools een menselijke interrupt worden toegevoegd, of binnen de grafiekomgeving van LangGraph kan een knooppunt waar menselijke tussenkomst vereist is als zodanig worden gedefinieerd. De HITL kan voorafgaand aan of na een bepaald knooppunt worden geplaatst, zodat de menselijke betrokkenheid kan zijn om goedkeuring te verlenen of een transactie na de interactie te controleren.
user_approval = input(“Do you want to go to Step 3? (yes/no): “)
from typing import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import MemorySaver class State(TypedDict): input: str def step_1(state): print("---Step 1---") pass def step_2(state): print("---Step 2---") pass def step_3(state): print("---Step 3---") pass builder = StateGraph(State) builder.add_node("step_1", step_1) builder.add_node("step_2", step_2) builder.add_node("step_3", step_3) builder.add_edge(START, "step_1") builder.add_edge("step_1", "step_2") builder.add_edge("step_2", "step_3") builder.add_edge("step_3", END) # Set up memory memory = MemorySaver() # Add graph = builder.compile(checkpointer=memory, interrupt_before=["step_3"])
3️⃣ Human-in-the-Loop Met GUI ✨
Uitgebreidere versie Human in The Loop - Agent met GUI ✨
Met Streamlit GUI
Het volgende basis principe is in zijn totaliteit terug te vinden op deze pagina. Ik heb het een beetje uit elkaar getrokken en voor mezelf wat anders ingedeeld en beschreven maar de basis is het zelfde.
🔸GUI met Streamlit
🔸Mens in de Loop
Incl. Tavily Zoekmachine
import os import uuid import json from typing import Optional from langchain_openai import ChatOpenAI from langgraph.prebuilt import ToolExecutor from langgraph.checkpoint.memory import MemorySaver from IPython.display import Image, display from typing import Annotated from typing_extensions import TypedDict from langgraph.graph.message import add_messages from langgraph.prebuilt import ToolExecutor from langchain_core.messages import ToolMessage from langgraph.prebuilt import ToolInvocation from langgraph.graph import END, StateGraph from langchain_core.messages import AIMessage from langchain_core.messages import HumanMessage from langgraph.checkpoint.sqlite import SqliteSaver from langchain_core.tools import tool from langchain_core.messages import AIMessage import streamlit as st from langchain_community.tools.tavily_search import TavilySearchResults ## Environment Variable OPENAI_API_KEY="sk-proj-xxxxx" # https://platform.openai.com/account/api-keys TAVILY_API_KEY="tvly-xxxx" # https://tavily.com/account/api-keys os.environ['OPENAI_API_KEY'] = OPENAI_API_KEY os.environ['TAVILY_API_KEY'] = TAVILY_API_KEY #define model (gpt-4o) - try different models model = ChatOpenAI(model='gpt-4o') class State(TypedDict): messages: Annotated[list, add_messages]
## define two tools- internet search and simply add @tool def add(x,y): "adding two numbers" return x+y tools = [TavilySearchResults(max_results=1), add] # tool_executor = ToolExecutor(tools) model = model.bind_tools(tools)
# Define the function that determines whether to continue or not def should_continue(state): messages = state["messages"] last_message = messages[-1] # If there is no function call, then we finish if not last_message.tool_calls: return "end" # Otherwise if there is, we continue else: return "continue" # Define the function that calls the model def call_model(state): messages = state["messages"] response = model.invoke(messages) # We return a list, because this will get added to the existing list return {"messages": [response]} # Define the function to execute tools def call_tool(state): messages = state["messages"] # Based on the continue condition # we know the last message involves a function call last_message = messages[-1] # We construct an ToolInvocation from the function_call tool_call = last_message.tool_calls[0] action = ToolInvocation( tool=tool_call["name"], tool_input=tool_call["args"], ) # We call the tool_executor and get back a response response = tool_executor.invoke(action) # We use the response to create a ToolMessage tool_message = ToolMessage( content=str(response), name=action.tool, tool_call_id=tool_call["id"] ) # We return a list, because this will get added to the existing list return {"messages": [tool_message]}
# Define a new graph workflow = StateGraph(State) # Define the two nodes we will cycle between workflow.add_node("agent", call_model) workflow.add_node("action", call_tool) # Set the entrypoint as `agent` # This means that this node is the first one called workflow.set_entry_point("agent") # We now add a conditional edge workflow.add_conditional_edges( # First, we define the start node. We use `agent`. # This means these are the edges taken after the `agent` node is called. "agent", # Next, we pass in the function that will determine which node is called next. should_continue, # Finally we pass in a mapping. # The keys are strings, and the values are other nodes. # END is a special node marking that the graph should finish. # What will happen is we will call `should_continue`, and then the output of that # will be matched against the keys in this mapping. # Based on which one it matches, that node will then be called. { # If `tools`, then we call the tool node. "continue": "action", # Otherwise we finish. "end": END, }, ) # We now add a normal edge from `tools` to `agent`. # This means that after `tools` is called, `agent` node is called next. workflow.add_edge("action", "agent")
Onderstaande is nog beetje onbekend terein voor mij en ik weet dus nog niet wat ik hier mee moet:
De volgende twee regels zijn cruciaal voor het Human-in-the-Loop-framework. Om persistentie te garanderen, moet u een controlepunt opnemen bij het compileren van de grafiek, wat nodig is om interrupts te ondersteunen. We gebruiken SqliteSaver voor een in-memory SQLite-database om de status op te slaan. Om consistent te interrupten voor een specifiek knooppunt, moeten we de naam van het knooppunt opgeven aan de compile-methode:
memory = SqliteSaver.from_conn_string(":memory:") app = workflow.compile(checkpointer=memory, interrupt_before=["action"])
# Helper function to construct message asking for verification def generate_verification_message(message: AIMessage) -> None: """Generate "verification message" from message with tool calls.""" serialized_tool_calls = json.dumps( message.tool_calls, indent=2, ) return AIMessage( content=( "I plan to invoke the following tools, do you approve?\n\n" "Type 'y' if you do, anything else to stop.\n\n" f"{serialized_tool_calls}" ), id=message.id, ) # Helper function to stream output from the graph def stream_app_catch_tool_calls(inputs, thread) -> Optional[AIMessage]: """Stream app, catching tool calls.""" tool_call_message = None for event in app.stream(inputs, thread, stream_mode="values"): message = event["messages"][-1] if isinstance(message, AIMessage) and message.tool_calls: tool_call_message = message else: #print(message) message.pretty_print() if isinstance(message, AIMessage): st.write(message.content) return tool_call_message
st.title('Human in The Loop - Agent') user_input = st.text_input("Enter your question:", key="input1") #if st.button("Submit Question"): if user_input: thread = {"configurable": {"thread_id": "5"}} #inputs = [HumanMessage(content="what's the weather in sf now?")] inputs = [HumanMessage(content=user_input)] # for event in app.stream({"messages": inputs}, thread, stream_mode="values"): # event["messages"][-1].pretty_print() tool_call_message = stream_app_catch_tool_calls( {"messages": inputs}, thread, ) # tool name: tool_name=tool_call_message.tool_calls[-1]['name'] #st.write(tool_call_message.tool_calls[-1]) st.write(f":blue[tool invoked]: {tool_name} ") st.write(":green[Please approve the tool picked up by the agent - select either 'yes' or 'no' ]") verification_message = generate_verification_message(tool_call_message) #verification_message.pretty_print() #st.write(verification_message) #human_input=input("Please provide your response") human_input = st.text_input('Please provide your response', key='keyname') if human_input: input_message = HumanMessage(human_input) # if input_message.content == "exit": # break #st.write(input_message) #input_message.pretty_print() # First we update the state with the verification message and the input message. # note that `generate_verification_message` sets the message ID to be the same # as the ID from the original tool call message. Updating the state with this # message will overwrite the previous tool call. snapshot = app.get_state(thread) snapshot.values["messages"] += [verification_message, input_message] if input_message.content == "yes": tool_call_message.id = str(uuid.uuid4()) # If verified, we append the tool call message to the state # and resume execution. snapshot.values["messages"] += [tool_call_message] app.update_state(thread, snapshot.values, as_node="agent") tool_call_message = stream_app_catch_tool_calls(None, thread) else: # Otherwise, resume execution from the input message. app.update_state(thread, snapshot.values, as_node="__start__") tool_call_message = stream_app_catch_tool_calls(None, thread)
4️⃣ Taakgerichte dialoogsystemen creëren
Bron: Creating Task-Oriented Dialog systems with LangGraph and LangChain
from langgraph.graph.message import add_messages class StateSchema(TypedDict): messages: Annotated[list, add_messages]
config = {"configurable": {"thread_id": str(uuid.uuid4())}} while True: user = input("User (q/Q to quit): ") if user in {"q", "Q"}: print("AI: Byebye") break output = None for output in graph.stream( {"messages": [HumanMessage(content=user)]}, config=config, stream_mode="updates" ): last_message = next(iter(output.values()))["messages"][-1] last_message.pretty_print() if output and "prompt" in output: print("Done!")
rompt_system_task = """Your job is to gather information from the user about the User Story they need to create. You should obtain the following information from them: - Objective: the goal of the user story. should be concrete enough to be developed in 2 weeks. - Success criteria the sucess criteria of the user story - Plan_of_execution: the plan of execution of the initiative - Deliverables: the deliverables of the initiative If you are not able to discern this info, ask them to clarify! Do not attempt to wildly guess. Whenever the user responds to one of the criteria, evaluate if it is detailed enough to be a criterion of a User Story. If not, ask questions to help the user better detail the criterion. Do not overwhelm the user with too many questions at once; ask for the information you need in a way that they do not have to write much in each response. Always remind them that if they do not know how to answer something, you can help them. After you are able to discern all the information, call the relevant tool."""
def domain_state_tracker(messages): return [SystemMessage(content=prompt_system_task)] + messages
import os from dotenv import load_dotenv, find_dotenv from langchain_openai import AzureChatOpenAI from langchain_core.pydantic_v1 import BaseModel from typing import List, Literal, Annotated _ = load_dotenv(find_dotenv()) # read local .env file llm = AzureChatOpenAI(azure_deployment=os.environ.get("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"), openai_api_version="2023-09-01-preview", openai_api_type="azure", openai_api_key=os.environ.get('AZURE_OPENAI_API_KEY'), azure_endpoint=os.environ.get('AZURE_OPENAI_ENDPOINT'), temperature=0) prompt_system_task = """Your job is to gather information from the user about the User Story they need to create. You should obtain the following information from them: - Objective: the goal of the user story. should be concrete enough to be developed in 2 weeks. - Success criteria the sucess criteria of the user story - Plan_of_execution: the plan of execution of the initiative If you are not able to discern this info, ask them to clarify! Do not attempt to wildly guess. Whenever the user responds to one of the criteria, evaluate if it is detailed enough to be a criterion of a User Story. If not, ask questions to help the user better detail the criterion. Do not overwhelm the user with too many questions at once; ask for the information you need in a way that they do not have to write much in each response. Always remind them that if they do not know how to answer something, you can help them. After you are able to discern all the information, call the relevant tool.""" class UserStoryCriteria(BaseModel): """Instructions on how to prompt the LLM.""" objective: str success_criteria: str plan_of_execution: str llm_with_tool = llm.bind_tools([UserStoryCriteria])
from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages class StateSchema(TypedDict): messages: Annotated[list, add_messages] workflow = StateGraph(StateSchema)
def domain_state_tracker(messages): return [SystemMessage(content=prompt_system_task)] + messages def call_llm(state: StateSchema): """ talk_to_user node function, adds the prompt_system_task to the messages, calls the LLM and returns the response """ messages = domain_state_tracker(state["messages"]) response = llm_with_tool.invoke(messages) return {"messages": [response]}
workflow.add_node("talk_to_user", call_llm)
workflow.add_edge(START, "talk_to_user")
def finalize_dialogue(state: StateSchema): """ Add a tool message to the history so the graph can see that it`s time to create the user story """ return { "messages": [ ToolMessage( content="Prompt generated!", tool_call_id=state["messages"][-1].tool_calls[0]["id"], ) ] } workflow.add_node("finalize_dialogue", finalize_dialogue)
prompt_generate_user_story = """Based on the following requirements, write a good user story: {reqs}""" def build_prompt_to_generate_user_story(messages: list): tool_call = None other_msgs = [] for m in messages: if isinstance(m, AIMessage) and m.tool_calls: #tool_calls is from the OpenAI API tool_call = m.tool_calls[0]["args"] elif isinstance(m, ToolMessage): continue elif tool_call is not None: other_msgs.append(m) return [SystemMessage(content=prompt_generate_user_story.format(reqs=tool_call))] + other_msgs def call_model_to_generate_user_story(state): messages = build_prompt_to_generate_user_story(state["messages"]) response = llm.invoke(messages) return {"messages": [response]} workflow.add_node("create_user_story", call_model_to_generate_user_story)
def define_next_action(state) -> Literal["finalize_dialogue", END]: messages = state["messages"] if isinstance(messages[-1], AIMessage) and messages[-1].tool_calls: return "finalize_dialogue" else: return END workflow.add_conditional_edges("talk_to_user", define_next_action)
workflow.add_edge("finalize_dialogue", "create_user_story") workflow.add_edge("create_user_story", END)
memory = MemorySaver() graph = workflow.compile(checkpointer=memory) config = {"configurable": {"thread_id": str(uuid.uuid4())}} while True: user = input("User (q/Q to quit): ") if user in {"q", "Q"}: print("AI: Byebye") break output = None for output in graph.stream( {"messages": [HumanMessage(content=user)]}, config=config, stream_mode="updates" ): last_message = next(iter(output.values()))["messages"][-1] last_message.pretty_print() if output and "create_user_story" in output: print("User story created!")
Bron: Creating Task-Oriented Dialog systems with LangGraph and LangChain
5️⃣ Snelle generatie op basis van gebruikersvereisten
Bron: Prompt Generation from User Requirements
Gegevens verzamelen met een Agent en als gegevens verzameld zijn actie ondernemen.
Prompt Generation from User Requirements: In this example we will create a chat bot that helps a user generate a prompt. It will first collect requirements from the user, and then will generate the prompt (and refine it based on user input). These are split into two separate states, and the LLM decides when to transition between them.
pip install -U langgraph langchain_openai
import getpass import os def _set_env(var: str): if not os.environ.get(var): os.environ[var] = getpass.getpass(f"{var}: ") _set_env("OPENAI_API_KEY")
from typing import List from langchain_core.messages import SystemMessage from langchain_openai import ChatOpenAI from pydantic import BaseModel
template = """Your job is to get information from a user about what type of prompt template they want to create. You should get the following information from them: - What the objective of the prompt is - What variables will be passed into the prompt template - Any constraints for what the output should NOT do - Any requirements that the output MUST adhere to If you are not able to discern this info, ask them to clarify! Do not attempt to wildly guess. After you are able to discern all the information, call the relevant tool.""" def get_messages_info(messages): return [SystemMessage(content=template)] + messages class PromptInstructions(BaseModel): """Instructions on how to prompt the LLM.""" objective: str variables: List[str] constraints: List[str] requirements: List[str] llm = ChatOpenAI(temperature=0) llm_with_tool = llm.bind_tools([PromptInstructions]) def info_chain(state): messages = get_messages_info(state["messages"]) response = llm_with_tool.invoke(messages) return {"messages": [response]}
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage # New system prompt prompt_system = """Based on the following requirements, write a good prompt template: {reqs}""" # Function to get the messages for the prompt # Will only get messages AFTER the tool call def get_prompt_messages(messages: list): tool_call = None other_msgs = [] for m in messages: if isinstance(m, AIMessage) and m.tool_calls: tool_call = m.tool_calls[0]["args"] elif isinstance(m, ToolMessage): continue elif tool_call is not None: other_msgs.append(m) return [SystemMessage(content=prompt_system.format(reqs=tool_call))] + other_msgs def prompt_gen_chain(state): messages = get_prompt_messages(state["messages"]) response = llm.invoke(messages) return {"messages": [response]}
from typing import Literal from langgraph.graph import END def get_state(state) -> Literal["add_tool_message", "info", "__end__"]: messages = state["messages"] if isinstance(messages[-1], AIMessage) and messages[-1].tool_calls: return "add_tool_message" elif not isinstance(messages[-1], HumanMessage): return END return "info"
from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import StateGraph, START from langgraph.graph.message import add_messages from typing import Annotated from typing_extensions import TypedDict class State(TypedDict): messages: Annotated[list, add_messages] memory = MemorySaver() workflow = StateGraph(State) workflow.add_node("info", info_chain) workflow.add_node("prompt", prompt_gen_chain) @workflow.add_node def add_tool_message(state: State): return { "messages": [ ToolMessage( content="Prompt generated!", tool_call_id=state["messages"][-1].tool_calls[0]["id"], ) ] } workflow.add_conditional_edges("info", get_state) workflow.add_edge("add_tool_message", "prompt") workflow.add_edge("prompt", END) workflow.add_edge(START, "info") graph = workflow.compile(checkpointer=memory)
from IPython.display import Image, display display(Image(graph.get_graph().draw_mermaid_png()))
USING THE GRAPH:
import uuid config = {"configurable": {"thread_id": str(uuid.uuid4())}} while True: user = input("User (q/Q to quit): ") print(f"User (q/Q to quit): {user}") if user in {"q", "Q"}: print("AI: Byebye") break output = None for output in graph.stream( {"messages": [HumanMessage(content=user)]}, config=config, stream_mode="updates" ): last_message = next(iter(output.values()))["messages"][-1] last_message.pretty_print() if output and "prompt" in output: print("Done!")
Bron: Prompt Generation from User Requirements
Specials
Hands on LangGraph — Building a multi agent assistant
AI Agents With Human In The Loop
Implementing Human-in-the-Loop with LangGraph
Cursus: Introduction to LangGraph
Links
Cursus LangGraph in de LangChain acaddemy
Cursus: Introduction to LangGraph✨
Langgraph Home
Building language agents as graphs
THE BEST Tool for AI Agent Workflows - LangGraph FULL Guide met streamlit GUI
LangChain Academy
Cursus: Introduction to LangGraph✨
Tutorials:
LangGraph: A Comprehensive Guide for Beginners
Building Tool-Calling Conversational AI with LangChain and LangGraph: A Beginner’s Guide
AI Agents With Human In The Loop
LangGraph Agents By LangChain (ook met Human In The Loop)
Implementing Human-in-the-Loop with LangGraph (incl. tools)
Building Stateful Applications with LangGraph (incl tools)
Hands on LangGraph — Building a multi agent assistant