Create a team of agents to do market research for your product developement team

Category: LLM Agents
Topic: LLM Agents LLMs LangGraph

Published by Nicole on Nov 02, 2024 • 10 min read.

This post draws inspiration from the paper AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation by Wu et al., and the examples from LangGraph. In this post, you will construct a Market Research Team and learn the following:

  • How to define tools for agents. You will use the following tools:
  • Taviliy  or web search, get your API key here.
  • Exa after account login, get your API key here. To find the exact content you're looking for on the web using embeddings-based search.
  • SerpApi, after account login, get your API key to do look for existing patents.
  • Tools to access and write to a .txt file.
  • How to define utilities to help create the graph.
  • How to create a team supervisor and the team of agents.


LangGraph

You will be using LangGraph for this. LangGraph is a library designed for building stateful, multi-actor applications with LLMs, facilitating the creation of agent and multi-agent workflows. LangGraph supports the definition of flows that involve `cycles`, which are crucial for most __agentic architectures__. It is a low-level framework and provides you, therefore, fine-grained control over both the flow and state of your application. LangGraph is inspired by [Pregel](https://research.google/pubs/pub37252/) and [Apache Beam](https://beam.apache.org/). The public interface draws inspiration from [NetworkX](https://networkx.org/documentation/latest/).

Key Features

  • Cycles and Branching: Implement loops and conditionals in your apps.
  • Persistence: Automatically save state after each step in the graph. Pause and resume the graph execution at any point to support error recovery, human-in-the-loop workflows, time travel and more.
  • Human-in-the-Loop: Interrupt graph execution to approve or edit next action planned by the agent.
  • Streaming Support: Stream outputs as they are produced by each node (including token streaming).
  • Integration with LangChain: LangGraph integrates seamlessly with LangChain  and LangSmith, but does not require them.

You can find the code for all in this Google Colab notebook

I will skip the API setups, imports and installing the dependencies here, but you can find all in the Google Colab notebook

First, define a WORKING_DIRECTORY so you can store the generated file later here. 

# Define a persistent working directory
WORKING_DIRECTORY = Path("/content/working_directory")

# Ensure the working directory exists
if not WORKING_DIRECTORY.exists():
    WORKING_DIRECTORY.mkdir(parents=True)
    print(f"Created working directory: {WORKING_DIRECTORY}")
else:
    print(f"Working directory already exists: {WORKING_DIRECTORY}")

Next, create the tools for your agents. You can extend your agents toolbox as you wish. The @tool decorator is the simpliest way of defining a custom tool. Note that you need to define a docstring as the tool's description. 

Note of caution:

The document writing and file-access tools gives your agents access to your file-system, this can be unsafe.

# Load SerpAPI Search Wrapper from LangChain

from langchain_community.utilities import SerpAPIWrapper
from exa_py import Exa
import json
from typing import List


@tool("patent_search")
def patent_search(query: str) -> str:
    """Search with Google SERP API by a query to fine news about patents related to the query."""
    params = {
        "engine": "google_patents",
        "gl": "us",
        "hl": "en",
        }
    patent_search = SerpAPIWrapper(params=params, serpapi_api_key=serp_api_key)
    return patent_search.run(query)


@tool("exa_search")
def exa_search(question: str) -> str:
    """Tool using Exa's Python SDK to run semantic search and return result highlights."""
    exa = Exa(exa_api_key)

    response = exa.search_and_contents(
        question,
        type="neural",
        use_autoprompt=True,
        num_results=3,
        highlights=True
    )

    results = []
    for idx, eachResult in enumerate(response.results):
        result = {
            "Title": eachResult.title,
            "URL": eachResult.url,
            "Highlight": "".join(eachResult.highlights)
        }
        results.append(result)

    return json.dumps(results)

# Load Tavily Search Wrapper from LangChain
tavily_tool = TavilySearchResults(
    max_results= 5,
    search_depth = "advanced"
    )


########## Document Tools ##########
@tool
def create_outline(
    points: Annotated[List[str], "List of main points or sections."],
    file_name: Annotated[str, "File path to save the outline."],
) -> Annotated[str, "Path of the saved outline file."]:
    """Create and save an outline."""
    with (WORKING_DIRECTORY / file_name).open("w") as file:
        for i, point in enumerate(points):
            file.write(f"{i + 1}. {point}\n")
    return f"Outline saved to {file_name}"


@tool
def read_document(
    file_name: Annotated[str, "File path to save the document."],
    start: Annotated[Optional[int], "The start line. Default is 0"] = None,
    end: Annotated[Optional[int], "The end line. Default is None"] = None,
) -> str:
    """Read the specified document."""
    with (WORKING_DIRECTORY / file_name).open("r") as file:
        lines = file.readlines()
    if start is not None:
        start = 0
    return "\n".join(lines[start:end])


@tool
def write_document(
    content: Annotated[str, "Text content to be written into the document."],
    file_name: Annotated[str, "File path to save the document."],
) -> Annotated[str, "Path of the saved document file."]:
    """Create and save a text document."""
    with (WORKING_DIRECTORY / file_name).open("w") as file:
        file.write(content)
    return f"Document saved to {file_name}"


@tool
def edit_document(
    file_name: Annotated[str, "Path of the document to be edited."],
    inserts: Annotated[
        Dict[int, str],
        "Dictionary where key is the line number (1-indexed) and value is the text to be inserted at that line.",
    ],
) -> Annotated[str, "Path of the edited document file."]:
    """Edit a document by inserting text at specific line numbers."""

    with (WORKING_DIRECTORY / file_name).open("r") as file:
        lines = file.readlines()

    sorted_inserts = sorted(inserts.items())

    for line_number, text in sorted_inserts:
        if 1 <= line_number <= len(lines) + 1:
            lines.insert(line_number - 1, text + "\n")
        else:
            return f"Error: Line number {line_number} is out of range."

    with (WORKING_DIRECTORY / file_name).open("w") as file:
        file.writelines(lines)

    return f"Document edited and saved to {file_name}"

Now create the agents and team supervisor. 

def create_agent(
    llm: ChatOpenAI,
    tools: list,
    system_prompt: str,
) -> str:
    """Create a function-calling agent and add it to the graph."""
    system_prompt += "\nWork autonomously according to your specialty, using the tools available to you."
    " Do not ask for clarification."
    " Your other team members (and other teams) will collaborate with you with their own specialties."
    " You are chosen for a reason! You are one of the following team members: {team_members}."
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                system_prompt,
            ),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_functions_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools, handle_parsing_errors=True)
    return executor


def agent_node(state, agent, name):
    result = agent.invoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}


def create_team_supervisor(llm: ChatOpenAI, system_prompt, members) -> str:
    """An LLM-based router."""
    options = ["FINISH"] + members
    function_def = {
        "name": "route",
        "description": "Select the next role.",
        "parameters": {
            "title": "routeSchema",
            "type": "object",
            "properties": {
                "next": {
                    "title": "Next",
                    "anyOf": [
                        {"enum": options},
                    ],
                },
            },
            "required": ["next"],
        },
    }
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            (
                "system",
                "Given the conversation above, who should act next?"
                " Or should we FINISH? Select one of: {options}",
            ),
        ]
    ).partial(options=str(options), team_members=", ".join(members))
    return (
        prompt
        | llm.bind_functions(functions=[function_def], function_call="route")
        | JsonOutputFunctionsParser()
    )

To create the agents you can also combine different LLMs to do your tasks. This might be beneficial because for specific tasks some LLMs might perform better than others or be less costly than others. First you have to define the graph state. And then you can create your agents which can access the tools created before.

State: A shared data structure that represents the current snapshot of an application. It can be any Python type, but is typically a TypedDict or Pydantic BaseModel.

# Define team graph state
class ResearchTeamState(TypedDict):
    # This tracks the team's conversation internally
    messages: Annotated[List[BaseMessage], operator.add]
    # This provides each worker with context on the others' skill sets
    team_members: str
    # This is how the supervisor tells langgraph who has to work next
    next: str
    # This tracks the shared directory state
    current_files: str

Next, you have a helper function that will run before each worker agent begins their task. This function ensures that the agents are more aware of the current state of the working directory.

def prelude(state):
    """
    A helper function that prepares the state for each worker agent by ensuring the working directory exists
    and listing the files present in it.

    Args:
        state (dict): The current state to be updated with information about the files in the working directory.

    Returns:
        dict: The updated state with a key "current_files" containing a message about the files in the directory.
              If no files are present, the message will indicate that no files have been written.
    """

    written_files = []
    if not WORKING_DIRECTORY.exists():
        WORKING_DIRECTORY.mkdir(parents=True)
    try:
        written_files = [
            f.relative_to(WORKING_DIRECTORY) for f in WORKING_DIRECTORY.rglob("*")
            if f.is_file()
        ]
    except Exception as e:
        print(f"Error reading files: {e}")
    if not written_files:
        return {**state, "current_files": "No files written."}
    return {
        **state,
        "current_files": "\nBelow are files your team has written to the directory:\n"
        + "\n".join([f" - {f}" for f in written_files]),
    }

Now you can initialize your LLM. You can also use another LLM, for instance via Fireworks works seemlessly with this LangGraph setting instead of OpenAI. For this, you just need to get the API key and if you want to use, for instance, the Llama 3.1 70B model, you how have to initialize your LLM as follows:

from langchain_fireworks import ChatFireworks

llm = ChatFireworks(
    model="accounts/fireworks/models/llama-v3p1-70b-instruct",
    temperature=0
)

I use GPT-4o:

llm = ChatOpenAI(model="gpt-4o")

Now you can define the agents with their specific roles. Note that you have to define what their purpose is to guide the model.  

web_search_agent = create_agent(
    llm,
    [tavily_tool],
    """You are a research assistant who can search for up-to-date info using the
    Tavily Search Engine.""",
)
search_node = functools.partial(agent_node, agent=web_search_agent, name="Search")

exa_search_agent = create_agent(
    llm,
    [exa_search],
    """You are a research assistant who can search for all recent info on Exa Search
    your response should clearly articulate the key points you found.""",
)
exa_search_node = functools.partial(agent_node, agent=web_search_agent, name="ExaSearch")


patent_search_agent = create_agent(
    llm,
    [patent_search],
    """You are a market research assistant, very knowledgeable in patent research
    to find up-to-date info about patents using the Google patents API.""",
)
patent_search_node = functools.partial(agent_node, agent=web_search_agent, name="PatentSearch")

doc_writer_agent = create_agent(
    llm,
    [write_document, edit_document, read_document],
    """You are a Senior Market Research Analyst, and an highly respected
    expert in writing market research reports for your product development team.\n"""
    # The {current_files} value is populated automatically by the graph state
    "Below are files currently in your directory:\n{current_files}",
)
# Injects current directory working state before each call
context_aware_doc_writer_agent = prelude | doc_writer_agent
doc_writing_node = functools.partial(
    agent_node, agent=context_aware_doc_writer_agent, name="DocWriter"
)

research_writing_supervisor = create_team_supervisor(
    llm,
    "You are a supervisor tasked with managing a conversation between the"
    " following workers:  {team_members}. Given the following user request,"
    " respond with the worker to act next. Each worker will perform a"
    " task and respond with their results and status. When finished,"
    " respond with FINISH.",
    ["DocWriter", "Search", "ExaSearch", "PatentSearch"],
)

With all the previous steps in place, you can now define your Graph. As a guideline, here is a short explanation what nodes and edges do:

  • Nodes: Functions that encode the logic of the agents. They receive the current state as input, perform some computation or side-effect, and return an updated state.
  • Edges: Functions that determine which node to execute next based on the current state. They can be conditional branches or fixed transitions.

So in short: Nodes do the work. Edges tell what to do next.

# Create the graph
authoring_graph = StateGraph(ResearchTeamState)
authoring_graph.add_node("DocWriter", doc_writing_node)
authoring_graph.add_node("Search", search_node)
authoring_graph.add_node("ExaSearch", exa_search_node)
authoring_graph.add_node("PatentSearch", patent_search_node)
authoring_graph.add_node("supervisor", research_writing_supervisor)

# Add the edges
authoring_graph.add_edge("DocWriter", "supervisor")
authoring_graph.add_edge("PatentSearch", "supervisor")
authoring_graph.add_edge("Search", "supervisor")
authoring_graph.add_edge("ExaSearch", "supervisor")

# Add the edges where routing applies
authoring_graph.add_conditional_edges(
    "supervisor",
    lambda x: x["next"],
    {
        "DocWriter": "DocWriter",
        "Search": "Search",
        "PatentSearch": "PatentSearch",
        "ExaSearch": "ExaSearch",
        "FINISH": END,
    },
)

authoring_graph.add_edge(START, "supervisor")
chain = authoring_graph.compile()


# The following functions interoperate between the top level graph state
# and the state of the research sub-graph
# this makes it so that the states of each graph don't get intermixed
def enter_chain(message: str, members: List[str]):
    results = {
        "messages": [HumanMessage(content=message)],
        "team_members": ", ".join(members),
    }
    return results


# Reuse the enter/exit functions to wrap the graph
authoring_chain = (
    functools.partial(enter_chain, members=authoring_graph.nodes)
    | authoring_graph.compile()
)

If you call this here: 
 

display(Image(chain.get_graph().draw_mermaid_png()))

Your Graph will be displayed:

As a final step, you need to prompt your agent. 

for s in authoring_chain.stream(
    """Write a 800 words long research report white paper on semiconductor development which
    includes an executive summary at the beginning about your findings.
    Search also for relevant recent patents and include the links to it.
    IMPORTANT: Provide the links to all your sources, and then write the report to disk as .txt file.""",
    {"recursion_limit": 100},
):
    if "__end__" not in s:
        print(s)
        print("---")

If you execute this cell in the notbook the supervisor will start the tasks and the agents will generate your report and store it as a .txt file in your working directory.