Batch Insert doesn't work #348


Open
gluonfield opened this issue Apr 11, 2025 · 0 comments

gluonfield commented Apr 11, 2025

I took the official Graphiti example and tried to insert batch data. The operation resulted in the following errors:

Connection closed
Traceback (most recent call last):
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/index_test.py", line 247, in <module>
    asyncio.run(main())
  File "/Users/wpc/.pyenv/versions/3.11.9/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/wpc/.pyenv/versions/3.11.9/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/.pyenv/versions/3.11.9/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/index_test.py", line 132, in main
    await graphiti.add_episode_bulk(episodes_batch)
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/graphiti.py", line 602, in add_episode_bulk
    raise e
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/graphiti.py", line 566, in add_episode_bulk
    (nodes, uuid_map), extracted_edges_timestamped = await semaphore_gather(
                                                     ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/helpers.py", line 96, in semaphore_gather
    return await asyncio.gather(*(_wrap_coroutine(coroutine) for coroutine in coroutines))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/helpers.py", line 94, in _wrap_coroutine
    return await coroutine
           ^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/utils/bulk_utils.py", line 185, in dedupe_nodes_bulk
    compressed_nodes, compressed_map = await compress_nodes(llm_client, nodes, uuid_map)
                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/utils/bulk_utils.py", line 311, in compress_nodes
    results = await semaphore_gather(
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/helpers.py", line 96, in semaphore_gather
    return await asyncio.gather(*(_wrap_coroutine(coroutine) for coroutine in coroutines))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/helpers.py", line 94, in _wrap_coroutine
    return await coroutine
           ^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/utils/maintenance/node_operations.py", line 391, in dedupe_node_list
    llm_response = await llm_client.generate_response(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/llm_client/openai_client.py", line 139, in generate_response
    response = await self._generate_response(messages, response_model, max_tokens)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/graphiti_core/llm_client/openai_client.py", line 104, in _generate_response
    response = await self.client.beta.chat.completions.parse(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/openai/resources/beta/chat/completions.py", line 458, in parse
    "response_format": _type_to_response_format(response_format),
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wpc/Projects/my company/vc-twin-go/python/memory/graphiti/venv/lib/python3.11/site-packages/openai/lib/_parsing/_completions.py", line 255, in type_to_response_format_param
    raise TypeError(f"Unsupported response_format type - {response_format}")
TypeError: Unsupported response_format type - None

The code that reproduces the error is effectively the official example.

import asyncio
import json
import logging
import os
from datetime import datetime, timezone
from logging import INFO

from dotenv import load_dotenv

from graphiti_core import Graphiti
from graphiti_core.graphiti import RawEpisode
from graphiti_core.nodes import EpisodeType
from graphiti_core.search.search_config_recipes import NODE_HYBRID_SEARCH_RRF

#################################################
# CONFIGURATION
#################################################
# Set up logging and environment variables for
# connecting to Neo4j database
#################################################

# Configure logging
logging.basicConfig(
    level=INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

load_dotenv()

# Neo4j connection parameters
# Make sure Neo4j Desktop is running with a local DBMS started
neo4j_uri = ...
neo4j_user = ...
neo4j_password = ...
if not neo4j_uri or not neo4j_user or not neo4j_password:
    raise ValueError("NEO4J_URI, NEO4J_USER, and NEO4J_PASSWORD must be set")


async def main():
    #################################################
    # INITIALIZATION
    #################################################
    # Connect to Neo4j and set up Graphiti indices
    # This is required before using other Graphiti
    # functionality
    #################################################

    # Initialize Graphiti with Neo4j connection
    graphiti = Graphiti(neo4j_uri, neo4j_user, neo4j_password)

    try:
        # Initialize the graph database with graphiti's indices. This only needs to be done once.
        await graphiti.build_indices_and_constraints(delete_existing=True)

        #################################################
        # ADDING EPISODES
        #################################################
        # Episodes are the primary units of information
        # in Graphiti. They can be text or structured JSON
        # and are automatically processed to extract entities
        # and relationships.
        #################################################

        # Example: Add Episodes
        # Episodes list containing both text and JSON episodes
        episodes = [
            {
                "content": "Kamala Harris is the Attorney General of California. She was previously "
                "the district attorney for San Francisco.",
                "type": EpisodeType.text,
                "description": "podcast transcript",
            },
            {
                "content": "As AG, Harris was in office from January 3, 2011 – January 3, 2017",
                "type": EpisodeType.text,
                "description": "podcast transcript",
            }
        ]

        # Add episodes to the graph
        episodes_batch: list[RawEpisode] = []
        for i, episode in enumerate(episodes[0:2]):
            episodes_batch.append(
                RawEpisode(
                    name=f"Freakonomics Radio {i}",
                    content=episode["content"],
                    source=episode["type"],
                    source_description=episode["description"],
                    reference_time=datetime.now(timezone.utc),
                )
            )

        print(episodes_batch)
        await graphiti.add_episode_bulk(episodes_batch)

    finally:
        #################################################
        # CLEANUP
        #################################################
        # Always close the connection to Neo4j when
        # finished to properly release resources
        #################################################

        # Close the connection
        await graphiti.close()
        print("\nConnection closed")


if __name__ == "__main__":
    asyncio.run(main())
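
For reference, the final TypeError in the traceback appears to be raised client-side by the OpenAI SDK while it builds the request body, before any request is sent: the structured-output helper rejects response_format=None, which appears to be what gets forwarded when no response_model is supplied. A minimal sketch that reproduces the same message outside Graphiti (hypothetical, assuming openai >= 1.x; the API key is a dummy value because the error occurs before any network call):

import asyncio

from openai import AsyncOpenAI


async def repro() -> None:
    # Dummy key: the TypeError below is raised while the request body is
    # being built, so nothing ever reaches the API.
    client = AsyncOpenAI(api_key="dummy")
    await client.beta.chat.completions.parse(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "hello"}],
        # Passing None mirrors what the traceback shows being forwarded
        # as response_format from graphiti_core's OpenAI client.
        response_format=None,
    )


asyncio.run(repro())
# TypeError: Unsupported response_format type - None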