A Data Importing AI Agent

For the past few months, I've been blogging about my experience with agents. My first blogs on the subject were about using agents with Dolt. Dolt is a SQL database that supports Git-like versioning and collaboration. When you give a computer agency to modify data, you put yourself at risk. I built an agentic helpdesk to show the benefits of a version-controlled database when working with agents. I wrote about rogue agents and how Dolt would have prevented the now-infamous case of a rogue agent deleting a production database. I then wrote two blogs about agentic data collection [1], [2], which failed to produce the results I was hoping for.

After those failures, I decided to take a stab at building an agent myself that could import data into a Dolt database. Previously, I wrote a brief overview of developing an AI agent (which I highly recommend reading before this post). Here, I'll walk you through what I've learned and built so far.

The Goal

The goal when I set out was to be able to build an agent that could import data into a Dolt database using prompts like the following:

There is a MySQL compatible Dolt database server running on 127.0.0.1:3306 which you can connect to using the user 'root'
with no password. Import /path/to/csvs/people.csv into database: "families" table: "people". Only import users that are 21
or older.
There is a MySQL compatible Dolt database server running on 127.0.0.1:3306 which you can connect to using the user 'root'
with no password. Import all CSV files in /path/to/csvs/ into the database: "families". Create tables as needed.
There is a MySQL compatible Dolt database server running on 127.0.0.1:3306 which you can connect to using the user 'root'
with no password. Crawl https://en.wikipedia.org/wiki/List_of_cocktails to find cocktail recipes and import them into the
database: "cocktails" tables: "cocktails", "ingredients", "recipes", "recipe_ingredients".
There is a MySQL compatible Dolt database server running on 127.0.0.1:3306 which you can connect to using the user 'root'
with no password. Crawl https://github.com/dolthub/dolt/issues and import all issues into the database: "github_issues".
Create tables as needed.

Looking at these prompts, you can see that the agent needs to parse the prompt and determine a list of data sources to import from and a target database to import to. Then it needs to understand the structure of the data in both the sources and the target database. Finally, it needs to transform the data as needed and import it into the target.

Web Crawling / Searching

In the process of building this agent, consistent, accurate, and complete web crawling became a major pain point. I have made considerable progress on this front, but for this blog post I split the web crawling and web searching functionality off into a separate agent, which I will cover in a future post. My hope is that this agent's functionality can eventually be integrated with the web crawling / searching agent to provide a more complete data import experience. For now, this agent, and this blog post, will focus on importing data from local files.

An Overview of the Agent

There are a number of different approaches to building an agent. My initial approach was to provide the prompt and the list of tools available to the LLM, and have it generate a plan to execute. Then I would execute the plan step by step, providing the tools necessary for file and database interactions. This approach had very inconsistent results. For simple prompts, the LLM would generate a reasonable plan and execute it beautifully. But this was extremely rare, and only ever happened with the simplest of data import tasks. More often than not, the LLM wouldn't be able to execute its plan. Oftentimes things broke down because the amount of data being processed by a step was too large: the LLM wouldn't plan to process the data in batches, or otherwise break it into chunks it could handle. This approach also required the user to provide every detail in the prompt and get it right the first time.

Through trial and error, I split out common stages in the data import process, and allowed the user to provide feedback along the way. In the following sections I'll walk through the stages of the agent.

Extract Info from the Prompt and Verify Understanding of the Task

At this stage, our LLM extracts information about the data sources, the target database, and any details relevant to the data import task. Additionally, it summarizes what it understands the task to be. Next, we display that summary to the user and ask them to either approve it or abort the data import job so that they can rephrase the prompt to fix any misunderstandings.

class SourceTypeEnum(str, Enum):
    LOCAL_FILE = "local_file"
    DIRECTORY = "directory"
    FILE_AT_URL = "file_at_url"
    UNKNOWN = "unknown"

class Source(BaseModel):
    source: str
    details: str = ""
    type: SourceTypeEnum
    recursive: Optional[bool] = False

class DatabaseTypeEnum(str, Enum):
    MYSQL = "mysql"
    POSTGRESQL = "postgresql"
    UNKNOWN = "unknown"
    UNSUPPORTED = "unsupported"

class Sink(BaseModel):
    db_type: DatabaseTypeEnum
    host: str
    port: int = 3306
    database: str
    username: str
    password: str = ""
    tables: Optional[list[str]] = None

class PromptExtractionResult(BaseModel):
    sources: Optional[list[Source]] = None
    sink: Optional[Sink] = None
    error: str = ""
    summary: str = ""
    
prompt_extraction_agent = Agent(
    name="File Import Prompt Extractor",
    instructions="""You are a natural language prompt extraction assistant. You have 4 jobs that should be done in order. 
    If any of the jobs fail then you should abort processing and return a message in the error field of the response. The jobs are:
    1. Determine if the user's request is a data collection job, which takes data from one or more input sources and imports
        it into a database. If it is not you should abort processing and respond with an error message detailing why it is 
        not a data collection job.
    2. Find the sources of the data to be collected. The sources may be one or more local files, files matching some pattern 
        in a directory, or remote files at specific urls. You do not support web searches or web crawling. If an unknown
        source type is found, or a web search or crawl source is found, then you should 
        respond with an error message detailing the unsupported source type. If you can't determine the source then an error 
        message should be returned detailing why the source could not be determined. For local files the source field
        should be the file path. For files at urls the source field should be the url. For directories the source field 
        should be the directory path and the recursive field should be set to true if the prompt indicates that subdirectories 
        should be searched. If the prompt does not indicate that subdirectories should be searched then the recursive 
        field should be false. The prompt may optionally include details about the source such as rows to filter out, 
        columns to select, or specific items to search for when crawling the web. If such details are present then they 
        should be included in the details field of the source. Additionally, directory type sources may want to include 
        details on the file types or a pattern to match files in the directory in the details field.
    3. Find the destination database for the data to be imported into. The data sink will be a database server 
        and the prompt must include the host, database name, and username. The prompt may optionally include the port but if it 
        isn't then 3306 should be assumed. The prompt may optionally include a password for the user, but if it isn't then 
        you should assume the user does not need a password and should return "" for the password. The prompt may optionally 
        include specific tables to import the data into, but if it doesn't then return null for the tables. The database
        type should also be extracted from the prompt. If you are able to determine the database from the prompt then you
        should return either "mysql", "postgresql", or "unsupported". If you can't determine the database type then return "unknown".
    4. Write a brief summary of what you understand the job to be. It should reflect the sources and sink you have identified. If
        the sources or the sink are null then the summary should be empty, and an error is expected.
""",
    output_type=PromptExtractionResult
)

This is the first call to the LLM, which extracts the relevant information from the prompt. A critical piece of this call is the output_type parameter, which tells the agent what structure the response should have. By providing the output_type, we get a structured response from the LLM that we can code against.
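
A quick aside on what output_type buys us: the OpenAI Python SDK can request structured output directly by accepting a pydantic model, and that is roughly what happens under the hood here. Below is a minimal sketch; the model name, message wiring, and the assumption that the Agent wrapper maps onto this API are mine, not something shown in the code above.

from openai import OpenAI

# Assumes PromptExtractionResult (defined above) is in scope and that
# OPENAI_API_KEY is set in the environment.
client = OpenAI()

completion = client.beta.chat.completions.parse(
    model="gpt-4o-2024-08-06",  # any structured-output capable model
    messages=[
        {"role": "system", "content": "Extract the data sources and sink from the user's prompt."},
        {"role": "user", "content": "Import /path/to/csvs/people.csv into database: 'families' table: 'people'."},
    ],
    # Passing a pydantic model constrains the response to that schema.
    response_format=PromptExtractionResult,
)

# .parsed is the typed result (it can be None if the model refuses; handling omitted for brevity)
info = completion.choices[0].message.parsed
print(info.summary)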

Our instructions specify four goals for the LLM to achieve. The first is to determine if the prompt is a data import task. The second is to extract the data sources. The third is to extract the target database. The fourth is to summarize the task as the LLM understands it. If any of these steps fail, the LLM should return an error message in the error field of the response.

We run this agent, passing it the prompt provided by the user, check the result for errors, and ask the user to approve the summary of the task like so:

async def extract_info_from_prompt(prompt:str, sess) -> PromptExtractionResult:
    try:
        print("Extracting information from prompt...")
        info = await prompt_extraction_agent.run(openai_conn, prompt, sess)

        if len(info.error) > 0:
            print(f"Ran into the following error trying to run your data import job: {info.error}")
            sys.exit(1)

        print()
        print(f"This is my understanding of the job:\n{info.summary}")
        answer = input("Is this correct? (y/n)> ")
        print()

        if answer.lower() != "y":
            print("Please update your prompt and try again.")
            sys.exit(0)

        return info

    except Exception as e:
        print(f"Error extracting information from prompt: {e}")
        sys.exit(1)

Getting the Schema of the Target Database

After extracting the information from the prompt, we need to understand the target database and the data sources. By understanding the schema of the sources and destination we can determine what transformations need to be made, and how to process them. We start by getting the schema of the target database.

I initially provided tooling for our LLM to connect to the database and extract the schema. This worked OK for small databases with a few dozen tables. But what happens when the database becomes large? The number of tool calls becomes unmanageable, and it's a lot simpler, less expensive, and faster to just connect to the database directly and extract the schema ourselves. Some trial and error in the later stages of agent development showed me that CREATE TABLE statements were the best way to show the LLM the schema of the database. They provide the column names and types, along with keys and constraints, in a way that our LLM can understand. In this code, I loop over all the tables in the database and get their CREATE TABLE statements.

def mysql_connect(sink:Sink) -> MySQLConnection:
    from mysql.connector import connect

    kwargs = {
        "host":sink.host,
        "port":sink.port,
        "user":sink.username,
        "database": sink.database
    }

    if len(sink.password) > 0:
        kwargs["password"] = sink.password

    return connect(**kwargs)

def get_tables(conn:MySQLConnection) -> list[str]:
    cursor = conn.cursor()
    cursor.execute("SHOW TABLES;")
    rows = cursor.fetchall()
    cursor.close()
    tables = [row[0] for row in rows]
    return tables

def get_table_create_statements(conn:MySQLConnection, tables:list[str]) -> dict[str,str]:
    cursor = conn.cursor()
    table_schemas = {}
    for table in tables:
        cursor.execute(f"SHOW CREATE TABLE `{table}`;")
        row = cursor.fetchone()
        if row and len(row) == 2:
            table_schemas[table] = row[1]
    cursor.close()
    return table_schemas

def get_sink_schema(sink:Sink) -> dict[str,str]:
    if sink.db_type == DatabaseTypeEnum.MYSQL:
        conn = mysql_connect(sink)
        tables = get_tables(conn)
        table_schemas = get_table_create_statements(conn, tables)
        conn.close()
        return table_schemas
    else:
        raise ValueError(f"Unsupported database type: {sink.db_type}")

Here our get_sink_schema function connects to the database, gets the list of tables, and then gets the CREATE TABLE statements for each table. The result is a dictionary mapping table names to their CREATE TABLE statements. We can then add that to our context as needed.
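
For example, using the connection details from the example prompts above, pulling the sink schema into the LLM context might look like the following. This wiring is hypothetical; in the real agent the Sink comes from the prompt extraction stage rather than being constructed by hand.

import json

sink = Sink(
    db_type=DatabaseTypeEnum.MYSQL,
    host="127.0.0.1",
    port=3306,
    database="families",
    username="root",
)

sink_schema = get_sink_schema(sink)       # {table_name: "CREATE TABLE ..."}
schema_context = json.dumps(sink_schema)  # serialized for inclusion in a prompt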

Determining the Schema of the Data Sources

The next step is to try to determine the schema of the data sources. This agent is focused on importing data from local files. We need to first get a list of the files to import. Our prompts may include some natural language details about the files to import. Looking at one of the prompt examples from earlier:

There is a MySQL compatible Dolt database server running on 127.0.0.1:3306 which you can connect to using the user 'root'
with no password. Import all CSV files in /path/to/csvs/ into the database: "families". Create tables as needed.

I would expect the Source object extracted from the prompt to look like this:

{
  "source": "/path/to/csvs/",
  "details": "Import all CSV files",
  "type": "directory",
  "recursive": false
}

Because the details field is of arbitrary complexity, I don't try to parse it. Instead, I just pass it to the LLM along with the list of files found in the directory. If the recursive field is true, the list of files will include the files in subdirectories as well.

I'll start by defining the Agent which calls the LLM to exclude files based on the details provided in the prompt.

class DirectoryFiles(BaseModel):
    included_files: list[str]
    excluded_files: list[str]

file_excluding_agent = Agent(
    name="File Excluding Agent",
    instructions="""
    You are an agent that processes a list of file paths and excludes any files that should not be processed based on the
    details provided. The details may or may not include directions like "process csv files" or "ignore log files", and
    if they do then the appropriate files should be excluded. The details may also contain information related to how
    the contents of the files should be handled. Those directives you should ignore as it is not your job to read the
    files and process them. The lists of both the included and excluded files should be returned.
    """,
    output_type=DirectoryFiles
)

Our instructions tell our agent that we will provide it with a list of file paths, and we want it to exclude any files that should not be processed based on the details in the user's prompt. The output type is a DirectoryFiles object which contains two lists of strings, one for the included files and one for the excluded files.

Now that we have that defined, we can loop through all of our sources, appending to a list of local files as we go. If the source is a local file, we add it to the list directly. If the source is a remote file, we download it to a temporary directory and add the downloaded file to the list. If the source is a directory, we use the agent described above and add the included files to the list. If the source is a web crawl or search, we add a SourceSchema object noting that we can't currently determine the schema for web sources.

    local_files = []

    for source in sources:
        if source.type == SourceTypeEnum.LOCAL_FILE:
            local_files.append(source)
        elif source.type == SourceTypeEnum.CRAWL_WEBSITE or source.type == SourceTypeEnum.SEARCH_WEB:
            schemas.append(SourceSchema(source=source, info="Schema cannot be determined for web search or crawl sources", types=None, error=""))
        elif source.type == SourceTypeEnum.DIRECTORY:
            new_paths = []

            if source.recursive:
                # Recursively walk the directory and add each file as a local_file source
                for path, sub_dirs, files in os.walk(source.source):
                    for file in files:
                        new_paths.append(os.path.join(path, file))
            else:
                for entry in os.listdir(source.source):
                    full_path = os.path.join(source.source, entry)
                    if os.path.isfile(full_path):
                        new_paths.append(full_path)

            if len(new_paths) > 0:
                dir_files = await file_excluding_agent.run(conn, f"The files are: {new_paths}. The details are: {source.details}", sess)
                for file in dir_files.included_files:
                    local_files.append(Source(source=file, type=SourceTypeEnum.LOCAL_FILE, details=source.details))

        elif source.type == SourceTypeEnum.FILE_AT_URL:
            dest = os.path.join(tempDir, os.path.basename(source.source))
            response = requests.get(source.source)
            response.raise_for_status()
            with open(dest, 'wb') as f:
                f.write(response.content)

            local_files.append(Source(source=dest, type=SourceTypeEnum.LOCAL_FILE, details=source.details))

Now we have a list of local files which we need to determine the schema of. This is trickier than it was with our database. There are many different file formats, and trying to code for all of them is an arduous task. Instead, I'll give the LLM the tools it needs to be able to look at the data and determine the schema itself.

class TypeSchema(BaseModel):
  type: str
  fields: dict[str,str]

class SourceSchema(BaseModel):
  source: Source
  types: Optional[list[TypeSchema]] = None
  info: str
  error: Optional[str] = None
  
source_schema_identifier = Agent(
    name="Data Collection Source Schema Identifier",
    instructions='''
    You are an agent that identifies the schema of data being imported from a local file.
    Open and inspect the file to determine the schema. You should read a portion of the file and infer the schema based 
    on the contents of the file. The function tools 'read_file', 'read_lines', 'file_size' and 'file_exists' may be useful 
    for this. There may or may not be additional instructions provided in the details section. It's possible that the
    details section doesn't apply to this file, and if it doesn't then ignore the details and process the file as usual.
    If the file is empty or not structured in a way that you can determine the schema, then leave the `types` empty
    and provide an explanation of why in the info field. If the file is something like a csv with a single type then 
    give the type a name and supply its schema. If the file is something like a jsonl file with multiple types,
    then supply the `TypeSchema` for each type in the file and add it to the `types` field in the output.
    If there is an error reading the file then provide the error message.
    ''',
    output_type=SourceSchema,
    tools = [read_file, read_lines, file_exists]
)

So our source_schema_identifier agent is instructed to sample the file, and determine the schema based on its contents, returning a SourceSchema object. A file like a .jsonl file may contain multiple types, so the output type allows for multiple TypeSchema objects. The file may also be unstructured, or empty, so the types field is optional. If there is an error reading the file, the agent should provide the error message in the error field.
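
The file tools themselves aren't shown in this post. As a rough sketch, read_lines and file_exists might look something like the functions below; how they get registered as tools depends on the agent framework, so treat the exact shapes as assumptions.

import os

def read_lines(path: str, start_line: int = 0, max_lines: int = 100) -> str:
    """Return up to max_lines lines of a text file, starting at start_line."""
    out = []
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        for i, line in enumerate(f):
            if i < start_line:
                continue
            if len(out) >= max_lines:
                break
            out.append(line.rstrip("\n"))
    return "\n".join(out)

def file_exists(path: str) -> bool:
    """Return True if the path points at an existing regular file."""
    return os.path.isfile(path)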

Now that we have our agent defined, we can loop over all of our local files and get their schemas.

    schemas = []
    for local_file in local_files:
        prompt = f'The source path is "{local_file.source}". The details are: "{local_file.details}"'
        schema = await source_schema_identifier.run(conn, prompt, sess)

        if schema.error is not None and schema.error != "":
            raise ValueError(f"Error identifying schema for source {local_file.source}: {schema.error}")

        schemas.append(schema)

    return schemas

Questioning the User

At this point, we have a general understanding of the prompt, the target database schema, and information about the data sources. Oftentimes, this is enough to get a good result from our agent. However, this relies on the agent making correct assumptions or guessing what the user wants. We can improve the chances of success by giving our LLM the opportunity to ask the user some questions about the task, given what we already know.

class PromptFollowupQuestion(BaseModel):
    question: str
    accepted_answers: Optional[list[str]] = None

class PromptFollowupQuestionsOutput(BaseModel):
    questions: list[PromptFollowupQuestion]
    preface: str

prompt_followup_questions_agent = Agent(
    name="Data Collection Prompt Followup Question Generator",
    instructions="""
    Given an objective, and some data related to that objective generate any followup questions needed to clarify the
    user's intent, along with information that you don't currently have, but would need to accomplish the given goal. 
    If the question can be answered with a yes or no then the accepted_answers field should be set to ["yes", "no"]. If 
    the question is multiple choice, and the choices are ["1. First Option", "2. Second Option", "3. Last Option"] then 
    the accepted_answers field should be set to ["1", "2", "3"]. If the question is open ended then the accepted_answers 
    field should be null. The preface field should be a brief message to the user before the questions are presented. It 
    should be formatted so that it can be printed directly to the user. If no followup questions are needed then the 
    questions field should be an empty list and the preface should be an empty string.
    """,
    output_type=PromptFollowupQuestionsOutput
)

Here the agent is instructed to generate followup questions based on the information it has. The output type contains a list of questions, each with a list of accepted answers if applicable. The output also contains a preface which is a message to the user before the questions are presented.

async def run_followup_question_generator(conn, sess, job_info, sources, sink_schema):
    try:
        prompt = f"""
The job information extracted from the prompt is: {job_info.model_dump_json()}
The schema of the destination database is: {json.dumps(sink_schema)}
The source of import data is: {list_of_basemodels_to_json(sources)}
"""

        followup = await prompt_followup_questions_agent.run(conn, prompt, sess)

    except Exception as e:
        print(f"Error generating followup questions: {e}")
        sys.exit(1)

    question_to_answer_map = {}
    if len(followup.questions) > 0:
        print(followup.preface)

        for question in followup.questions:
            accepted_answers = []
            accepted_answers_str = ""

            if question.accepted_answers is not None:
                accepted_answers = [ans.lower() for ans in question.accepted_answers]
                accepted_answers_str = f" [{'/'.join(question.accepted_answers)}]"

            while True:
                answer = input(f"{question.question}{accepted_answers_str}\n> ")

                if question.accepted_answers is not None and len(question.accepted_answers) > 0:
                    lwr_answer = answer.lower()
                    if lwr_answer not in accepted_answers:
                        print(f"Answer must be one of: {', '.join(question.accepted_answers)}")
                        continue

                break

            question_to_answer_map[question.question] = answer

    return question_to_answer_map

This code simply runs the agent, then loops over the questions, prompting the user for answers. If the question has a list of accepted answers, it will validate the user's answer against that list. The result is a map of questions to answers, which we can add to our context as needed.
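
The list_of_basemodels_to_json helper used in the prompt above (and again in later stages) isn't shown in the post. A minimal version might simply serialize each pydantic model and join them into a JSON array:

import json
from pydantic import BaseModel

def list_of_basemodels_to_json(models: list[BaseModel]) -> str:
    # mode="json" converts enums, dates, and other types into JSON-friendly values
    return json.dumps([m.model_dump(mode="json") for m in models])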

Propose Database Changes

Before we import our data, we may need to create or modify tables in the target database. To determine the necessary tables, we'll look at the prompt along with the source schemas and the target database schema. We can then propose the necessary database changes and let the user approve them before we make any changes to the database.

class InferSinkTablesOutput(BaseModel):
    existing_tables_to_import_to: list[str] = [] # tables that already exist in the sink database to import data into
    new_tables_to_create: dict[str, str] = {} # tables that need to be created in the sink database to import data into
    table_alter_statements: list[str] = [] # alter table statements to modify existing tables in the sink database if needed
    error: Optional[str] = None  # an error message if something went wrong during the inference process
    plan: str = "" # a plan for how to import the data into the sink database once the tables are created

infer_sink_table_agent = Agent(
    name = "Sink Table Inferring Agent",
    instructions = """
You are a data import assistant that takes the initial job info, a dictionary of questions about the job and their answers,
the schemas for all data sources, and the schema for the existing sink database, and attempts to infer existing tables 
that data should be imported into, along with tables that need to be created. For tables that need to be created you need 
to provide a SQL create table statement that can be used to create the table in the sink database containing the 
appropriate schema, primary key, and foreign keys. If existing tables need to be altered a list of alter statements can 
be provided to make those changes. If you are unable to infer the tables to import into, or the schema of the tables to 
create, then you should return an error message in the error field explaining why. Then you should write a plan for 
where the data in the sources should be put in the database once the tables are created.
""",
    output_type=InferSinkTablesOutput,
    strict=False)

This agent is instructed to look at all the data we have collected so far and infer which tables we are going to import into. If the existing tables aren't suitable for the data, then the agent can provide a combination of CREATE TABLE and ALTER TABLE statements to get the database into a state where it can accept the data. Additionally, the agent should provide a plan for how to import the data once the tables are created. This plan isn't the step-by-step instructions for the data import job, but rather a high-level overview of where the different pieces of data should go in our database.

With a way to propose database changes, we need to be able to incorporate user feedback. The data passed to the last agent and the agent's response have now been incorporated into the context, so we can reference them in our instructions to the LLM.

table_creation_feedback_agent = Agent(
    name = "Sink Table Creation Feedback Agent",
    instructions = """
You are a data import assistant that looks back at the previously proposed list of tables to create, and tables to alter 
and incorporates feedback from the user. You should return an updated list of existing tables to import into, new tables to create with
their create table statements, alter table statements if needed, and an updated plan for where the data in the sources 
should be put in the database once the tables are created and the alter statements are executed.
""",
    output_type = InferSinkTablesOutput,
    strict=False
)

With our agents defined, we have the LLM make a proposal, then we loop, showing the proposal to the user and asking for feedback, until the user approves it.

async def infer_sink_tables(conn, sess, job_info, sink_schema:dict[str,str], sources:list[BaseModel], question_to_answer_map : dict[str,str]) -> tuple[str, list[str]]:
    # First Proposal
    try:
        prompt = f"""
The job information extracted from the prompt is: {job_info.model_dump_json()}
The questions and answers about the job are: {json.dumps(question_to_answer_map)}
The create table statements for the existing tables in the sink database are: {json.dumps(sink_schema)}
The source of import data is: {list_of_basemodels_to_json(sources)}
"""
        output = await infer_sink_table_agent.run(conn, prompt=prompt, session=sess)
    except Exception as e:
        print(f"Error inferring sink tables: {e}")
        sys.exit(1)

    # Loop until user approves the proposed database changes
    while True:
        if output.error is not None and len(output.error) > 0:
            print(f"Error inferring sink tables: {output.error}")
            sys.exit(1)

        # Show the proposed changes to the user
        
        if len(output.existing_tables_to_import_to) > 0:
            print(f"\nSome or all of the data will be imported into the following existing tables:")
            for table in output.existing_tables_to_import_to:
                print(f"Table: {table}")
        else:
            print("\nNo existing tables will have data imported into them.")

        if len(output.new_tables_to_create) > 0:
            print(f"\nThe following new tables need to be created:")
            for table, schema in output.new_tables_to_create.items():
                print(f"Table: {table}", f"Schema: {schema}", "", sep="\n")
        else:
            print("\nNo new tables need to be created.")

        if len(output.table_alter_statements) > 0:
            print(f"\nThe following alter table statements need to be executed:")
            for alter in output.table_alter_statements:
                print(f"  {alter}")

        print("The plan for where the source data will go once the tables are created is:")
        print(output.plan)
        print()

        # Get user feedback
        answer = ""
        while answer not in ["1", "2", "3"]:
            print(f"""Would you like to:
1. Continue with this plan to create {len(output.new_tables_to_create)} new tables and import data into {len(output.existing_tables_to_import_to)} existing tables.
2. Make changes to the import destination plan, such as changing table names or schema, adding or removing tables to create, or designating a different table to import into.
3. Exit and create the tables manually, or change the prompt to specify existing tables.""")
            answer = input("(1/2/3)> ")
            print()

        if answer == "1":
            # We have approval. Return the plan and the queries to execute
            queries = output.table_alter_statements
            for create_statement in output.new_tables_to_create.values():
                queries.append(create_statement)

            return output.plan, queries
        elif answer == "2":
            # The user wants to make changes
            try:
                # Get the user's feedback
                prompt = input("What changes would you like to make to the schema or table names?> ")
                print()
                
                # incorporate the feedback then re-enter the loop
                output = await table_creation_feedback_agent.run(conn, prompt=prompt, session=sess)
            except Exception as e:
                print(f"Error updating inferred sink tables: {e}")
                sys.exit(1)
        else:
            sys.exit(1)

This function returns both the import overview and the list of queries to execute to create or alter tables in the target database. The calling code then executes the queries like so:

    overview, queries = await infer_sink_tables(openai_conn, sess, job_info, sink_schema, source_schemas, question_to_answer_map)

    if queries is not None and len(queries) > 0:
        print("updating database")

        # Execute the queries to create/alter tables in the sink database
        exec_sink_queries(job_info.sink, queries)
        sink_schema = get_sink_schema(job_info.sink)
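
The exec_sink_queries function isn't shown in the post either. A minimal sketch that reuses the mysql_connect helper from earlier might look like this; the real implementation presumably handles other database types and errors differently.

def exec_sink_queries(sink: Sink, queries: list[str]) -> None:
    # Run the approved CREATE TABLE / ALTER TABLE statements against the sink.
    conn = mysql_connect(sink)
    cursor = conn.cursor()
    try:
        for query in queries:
            cursor.execute(query)
        conn.commit()
    finally:
        cursor.close()
        conn.close()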

Creating and Executing the Data Import Plan

We've done a lot of work up front to improve the chances of success for our data import job. Now it's time to actually import the data. The final step is to create a plan for how to import the data, let the user provide feedback again, and then execute that plan. It seems straightforward, but a problem I ran into time and again was the agent only processing a portion of the data. One issue is that the LLM has a limit on the number of tool calls it can make for a single API call. If a tool can read 1,000 rows of data and one of the files has 10,000 rows, then the LLM can't process all the data for that file in a single call.

One way I attempted to solve this problem was to allow the LLM to create a plan for the data import that included loops, so that it could process the data in batches. This is still something I'm playing with, but I haven't been able to achieve consistent results with it so far. Instead, allowing agents to write Python scripts to execute a stage of the data import process has been much more successful. I don't know how well this would work for all types of data, but it has worked well with .csv or .jsonl files containing a single type of data. More test cases are needed to see how well this approach works in general, and I imagine both approaches will need to be supported in the future. There may be additional techniques worth exploring as well.
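
To give a concrete sense of what "let the agent write and run a script" means, here is a sketch of what an execute_python_script tool (named in the tool list below) could look like. The real tool's signature, sandboxing, and error handling aren't shown in the post, so this is an assumption.

import subprocess
import sys

def execute_python_script(script_path: str, timeout_seconds: int = 600) -> str:
    """Run a generated Python script and return its output for the LLM to inspect."""
    result = subprocess.run(
        [sys.executable, script_path],
        capture_output=True,
        text=True,
        timeout=timeout_seconds,
    )
    if result.returncode != 0:
        return f"Script failed with exit code {result.returncode}:\n{result.stderr}"
    return result.stdout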

class Plan(BaseModel):
  steps: list[str]
  summary: Optional[str] = None
  
async def run_plan_agent(conn, sess, temp_dir, source_schemas:list[SourceSchema], db_schema:dict[str,str], destination_plan:str) -> Plan:
    instructions = f"""
    You are a task planning assistant that creates a step-by-step plan to move data from one or more data sources to a mysql 
    compatible database. Given information on the data sources, the schema of the destination database, and a 
    plan for where data should be moved from and to, your task is to create a detailed plan that outlines the steps needed
    to transform and transfer data from the source schema to the sink schema and insert it into the sink database. The tables 
    in the sink already exist and have the schema provided.
    
    The amount of data being processed may be very large, so consider techniques for processing the data locally using
    tools. For local file processing, you may consider writing a python script to transform or filter data to
    the destination schema and output the results to another file and processing that. You might then consider writing a
    python script to read the transformed data and insert it into the database using the credentials provided for the sink,
    and then execute that python script.
    
    The tools that will be available in each processing step are:
        - read_file - for reading local files in their entirety or in batches
        - read_lines - for reading local text files in batches of lines
        - file_write - for creating or opening local files for writing
        - file_exists - for determining if a local file exists
        - file_size - for getting the size of a local file
        - directory_exists - for determining if a local directory exists 
        - directory_list - for iterating over files in a directory 
        - read_from_url - for reading the bytes of a file in chunks
        - download_file - for downloading a file from a url to a local file
        - mysql_query - executing a query against a mysql compatible database
        - psql_query - executing a query against a postgresql compatible database
        - execute_python_script - for executing a python script in a local file
        
    A temp directory will be provided for storing files locally, including python scripts, and any files downloaded or created during processing.
    You should use this directory for any local files you need to create or download, but there is no need to copy files that are already local
    to this directory if they don't need to be transformed.
    
    The plan should include:
    1. A list of steps to be taken in order.
    2. A brief summary of the overall plan.
    """

    prompt = f"The source are {list_of_basemodels_to_json(source_schemas)}\n"
    prompt += f"\nThe database schema is {json.dumps(db_schema)}\n"
    prompt += f"\nThe plan for where data should be moved from and to is: {destination_plan}\n"
    prompt += (f"\nYou can use the temp directory at {temp_dir} for any local files you need to create or download"
               f"(but there is no need to copy files that are already local here).\n")

    try:
      plan = await Agent(
        name="Task Planning Agent",
        instructions=instructions,
        output_type=Plan,
      ).run(conn, prompt, sess)
    except Exception as e:
      print(f"Error creating plan: {e}")
      sys.exit(1)

    return plan

From here we can do the same thing we did with our database changes and provide the user the opportunity to approve the plan, alter it, or abort the data import job. Once the user approves the plan, we can execute it step by step.

    for i, step in enumerate(plan.steps):
        try:
            agent = execute_step_agent_for_step(step, i)
            print(f"Executing step {i}: {step}")
            result = await agent.run(openai_conn, step, sess)
            print(f"Step {i} result: {result}")
        except Exception as e:
            print(f"Error executing step '{step}': {e}")
            sys.exit(1)
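
execute_step_agent_for_step isn't shown in the post. Conceptually it builds a fresh agent for each plan step with the same tool set the planner was told about; a sketch along those lines (the instructions and tool wiring here are assumptions):

def execute_step_agent_for_step(step: str, index: int) -> Agent:
    # One single-purpose agent per plan step, armed with the tools listed in
    # the planning instructions above.
    return Agent(
        name=f"Step {index} Execution Agent",
        instructions=f"""
        You are executing a single step of an approved data import plan.
        Complete the following step using the tools available, then report
        whether it succeeded and what was done: {step}
        """,
        tools=[read_file, read_lines, file_write, file_exists, file_size,
               directory_exists, directory_list, read_from_url, download_file,
               mysql_query, psql_query, execute_python_script],
    )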

Before Running our First Data Import Job

There are a few things you need to do before you can run your first data import job. First, you'll need to run dolt sql-server to start a MySQL compatible database server. Before running the server, I'll create the directory data_import_tests, create the databases families and families2 (for two separate tests), and then start the server like so:

~/dev/test>mkdir data_import_tests
~/dev/test>cd data_import_tests/
~/dev/test/data_import_tests>dolt sql -q 'CREATE DATABASE families'
~/dev/test/data_import_tests>dolt sql -q 'CREATE DATABASE families2'
~/dev/test/data_import_tests>dolt sql-server --host 127.0.0.1
Starting server with Config HP="127.0.0.1:3306"|T="28800000"|R="false"|L="info"
INFO[0000] Server ready. Accepting connections.

Now that our server is running, we need to set up our Python environment. You can do this by creating a virtual environment and installing the dependencies like so:

~/dev/d0>python3 -m venv .venv
~/dev/d0>source .venv/bin/activate
(.venv) ~/dev/d0>pip install -r requirements.txt

Finally, you'll need to set the OPENAI_API_KEY environment variable to your OpenAI API key. You can do this by running

(.venv) ~/dev/d0>export OPENAI_API_KEY="your_api_key_here"

Now you should be ready to run your first data import job, like so:

(.venv) ~/dev/d0>PYTHONPATH=src python3 -m d0_file_import <PROMPT>

Some Usage Examples

I have four CSV files in ~/datasets/csv/families, with the following header rows:

~/datasets/csv/families>head -n 1 *
==> birthdays.csv <==
person_id,date_of_birth

==> marriages.csv <==
person1_id,person2_id,marriage_date,divorce_date

==> parents.csv <==
person_id,mother_id,father_id

==> people.csv <==
id,name,age

We will start with the simplest prompt from our earlier examples:

(.venv) ~/dev/d0>PYTHONPATH=src python3 -m d0_file_import 'There is MySQL compatible Dolt database server running on 127.0.0.1:3306 which you can connect to using the user 'root' with no password. Import /Users/brian/datasets/csv/families/people.csv into database: "families" table: "people". Only import users that are 21 or over.'
Extracting information from prompt...

This is my understanding of the job:
Import the local CSV file '/Users/brian/datasets/csv/families/people.csv', filtering to users 21 and over, into the 'people' table in the 'families' database on the MySQL-compatible Dolt server at 127.0.0.1:3306 using the 'root' user with no password.
Is this correct? (y/n)> y

Retrieving schema from sink database...
Investigating data sources...

To proceed with importing the CSV data into the MySQL-compatible Dolt database, I need a few more details to ensure everything matches your expectations:

What is the schema of the 'people' table in the 'families' database, or should it be created based on the CSV file structure (id: integer, name: string, age: integer)?
> create it

If the 'people' table already exists and has a different schema, how should column mismatches be handled (e.g., ignore extra columns, add missing columns, abort)?
> abort

Should existing rows in the 'people' table be replaced if 'id' values in the CSV already exist in the table? [yes/no]
> yes

Comparing data sources to the database schema...
No existing tables will have data imported into them.

The following new tables need to be created:

Table: people
Schema: CREATE TABLE people (
  id INT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  age INT NOT NULL
);

The plan for where the source data will go once the tables are created is:
Create the 'people' table in the 'families' database using the schema:
- id (integer, PRIMARY KEY)
- name (string/VARCHAR)
- age (integer)

Once the table is created, import all rows from '/Users/brian/datasets/csv/families/people.csv' where the age column is greater than or equal to 21. If any row in the CSV has an existing 'id' in the table, the import should replace that row with the new data. If a mismatch in schema is detected on import (e.g., if the table already exists but doesnt match the expected structure), abort the import job as per instructions.

Would you like to:
1. Continue with this plan to create 1 and import data into 0 existing tables.
2. Make changes to the import destination plan, such as changing table names or schema, adding or removing tables to create, or designate the different table to import into.
3. Exit and create the tables manually, or change the prompt to specify existing tables.
(1/2/3)> 1

Planning data import...
Data import plan created:
steps: 
  1. Connect to the 'families' MySQL database on 127.0.0.1:3306 using username 'root' and no password.
  2. Check if the 'people' table exists in the 'families' database. If it does, verify the schema matches: id (int, PRIMARY KEY), name (varchar(255) NOT NULL), age (int NOT NULL). If there is any schema mismatch, abort the job.
  3. If the 'people' table does not exist, create it using: CREATE TABLE `people` ( `id` int NOT NULL, `name` varchar(255) NOT NULL, `age` int NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_bin;
  4. Write a Python script in ./.d0/5110f93b-96d9-451d-b283-a284c5f120ca/filter_people_at_least_21.py. This script should read people.csv, filter for rows with age >= 21, and output a new CSV to ./.d0/5110f93b-96d9-451d-b283-a284c5f120ca/people_at_least_21.csv with the same columns (id,name,age).
  5. Execute the Python script to produce people_at_least_21.csv.
  6. Write a second Python script in ./.d0/5110f93b-96d9-451d-b283-a284c5f120ca/import_people_to_mysql.py. This script should read people_at_least_21.csv and insert or replace rows in the 'people' table of the 'families' database (use REPLACE INTO or ON DUPLICATE KEY UPDATE to update rows with existing ids).
  7. Execute the second Python script to import the filtered people into the MySQL database.
  8. Verify that all rows in the 'people' table have age >= 21 by running a SELECT query: SELECT COUNT(*) FROM people WHERE age < 21; (should return 0, otherwise report an error)."] summary="This plan filters the source CSV file for users at least 21 years of age, creating a new intermediate CSV, and then inserts or replaces corresponding rows in the 'people' table of the MySQL-compatible Dolt database, ensuring the final data matches the required schema and constraints. All processing is done locally using Python scripts, and data integrity is verified post-import.

Would you like to:
1. Execute this plan to import the data as described.
2. Make changes to the import plan.
3. Abort the data import job.
(1/2/3)> 1

Executing step: 1. Connect to the 'families' MySQL database on 127.0.0.1:3306 using username 'root' and no password.
Step 1 succeeded

Executing step: 2. Check if the 'people' table exists in the 'families' database. If it does, verify the schema matches: id (int, PRIMARY KEY), name (varchar(255) NOT NULL), age (int NOT NULL). If there is any schema mismatch, abort the job.
Step 2 succeeded

Executing step: 3. If the 'people' table does not exist, create it using: CREATE TABLE `people` ( `id` int NOT NULL, `name` varchar(255) NOT NULL, `age` int NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_bin;
Step 3 succeeded

Executing step: 4. Write a Python script in ./.d0/5110f93b-96d9-451d-b283-a284c5f120ca/filter_people_at_least_21.py. This script should read people.csv, filter for rows with age >= 21, and output a new CSV to ./.d0/5110f93b-96d9-451d-b283-a284c5f120ca/people_at_least_21.csv with the same columns (id,name,age).
Step 4 succeeded

Executing step: 5. Execute the Python script to produce people_at_least_21.csv.
Step 5 succeeded

Executing step: 6. Write a second Python script in ./.d0/5110f93b-96d9-451d-b283-a284c5f120ca/import_people_to_mysql.py. This script should read people_at_least_21.csv and insert or replace rows in the 'people' table of the 'families' database (use REPLACE INTO or ON DUPLICATE KEY UPDATE to update rows with existing ids).
Step 6 succeeded

Executing step: 7. Execute the second Python script to import the filtered people into the MySQL database.
Step 7 succeeded

Executing step: 8. Verify that all rows in the 'people' table have age >= 21 by running a SELECT query: SELECT COUNT(*) FROM people WHERE age < 21; (should return 0, otherwise report an error).
Step 8 succeeded

We've done it. I can now verify that the data was imported correctly by running a few queries against the database.

(.venv) ~/dev/d0>mysql -h127.0.0.1 -uroot -Dfamilies
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 25
Server version: 8.0.33 Dolt

Copyright (c) 2000, 2025, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> select count(*) from people;
+----------+
| count(*) |
+----------+
|       33 |
+----------+
1 row in set (0.001 sec)

mysql> select count(*) from people where age < 21;
+----------+
| count(*) |
+----------+
|        0 |
+----------+
1 row in set (0.002 sec)

mysql> exit
Bye
(.venv) ~/dev/d0>wc -l /Users/brian/datasets/csv/families/people.csv
      50 /Users/brian/datasets/csv/families/people.csv

So we can see that 33 of the 49 data rows in people.csv were imported into the people table, and all of them have an age greater than or equal to 21, as requested.

Now we'll try a more complicated prompt from our examples.

(.venv) ~/dev/d0>PYTHONPATH=src python3 -m d0_file_import "There is MySQL compatible Dolt database server running on 127.0.0.1:3306 which you can connect to using the user 'root' with no password. Import all CSV files in /Users/brian/datasets/csv/families/ into the database: "families2". Create tables as needed."
Extracting information from prompt...

This is my understanding of the job:
Import all CSV files from /Users/brian/datasets/csv/families/ into the MySQL-compatible Dolt database 'families2' at 127.0.0.1:3306 as user 'root', creating tables as needed.
Is this correct? (y/n)> y

Retrieving schema from sink database...
Investigating data sources...

Before importing the CSV files into the database, I need a few clarifications:
Should primary and foreign key constraints be set up between the tables (e.g., linking person_id in other tables to the id in people)? [yes/no]
> yes
Should the target tables overwrite any existing tables with the same name, or should import be aborted if they already exist? [overwrite/abort]
> abort
If some columns in the CSV (like divorce_date) can be null, should the corresponding table columns allow NULL values? [yes/no]
> yes

Comparing data sources to the database schema...
No existing tables will have data imported into them.

The following new tables need to be created:
Table: people
Schema: CREATE TABLE people (
    id INT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    age INT NOT NULL
);

Table: birthdays
Schema: CREATE TABLE birthdays (
    person_id INT,
    date_of_birth DATE NOT NULL,
    PRIMARY KEY (person_id),
    FOREIGN KEY (person_id) REFERENCES people(id)
);

Table: marriages
Schema: CREATE TABLE marriages (
    person1_id INT,
    person2_id INT,
    marriage_date DATE NOT NULL,
    divorce_date DATE NULL,
    PRIMARY KEY (person1_id, person2_id, marriage_date),
    FOREIGN KEY (person1_id) REFERENCES people(id),
    FOREIGN KEY (person2_id) REFERENCES people(id)
);

Table: parents
Schema: CREATE TABLE parents (
    person_id INT PRIMARY KEY,
    mother_id INT,
    father_id INT,
    FOREIGN KEY (person_id) REFERENCES people(id),
    FOREIGN KEY (mother_id) REFERENCES people(id),
    FOREIGN KEY (father_id) REFERENCES people(id)
);

The plan for where the source data will go once the tables are created is:
1. Create all four tables as specified with appropriate primary and foreign keys.
2. Import people.csv into the people table.
3. Import birthdays.csv into the birthdays table (each entry links to a person via person_id).
4. Import marriages.csv into the marriages table (each row represents two people getting married and optionally divorced).
5. Import parents.csv into the parents table (each row specifies the mother and father, by id, for a given person).
No existing tables will be modified or overwritten, and import will abort if tables of the same name already exist.

Would you like to:
1. Continue with this plan to create 4 and import data into 0 existing tables.
2. Make changes to the import destination plan, such as changing table names or schema, adding or removing tables to create, or designate the different table to import into.
3. Exit and create the tables manually, or change the prompt to specify existing tables.

This time we'll make a change to the proposed plan. Instead of creating a separate birthdays table, we'll add a date_of_birth column to the people table. As a result, we expect the updated plan to create 3 tables instead of 4, with the date_of_birth column added to the people table's schema.

(1/2/3)> 2

What changes would you like to make to the schema or table names?> instead of creating the table 'birthdays' add a date_of_birth column to the table 'people'.

No existing tables will have data imported into them.

The following new tables need to be created:
Table: people
Schema: CREATE TABLE people (
    id INT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    age INT NOT NULL,
    date_of_birth DATE
);

Table: marriages
Schema: CREATE TABLE marriages (
    person1_id INT,
    person2_id INT,
    marriage_date DATE NOT NULL,
    divorce_date DATE NULL,
    PRIMARY KEY (person1_id, person2_id, marriage_date),
    FOREIGN KEY (person1_id) REFERENCES people(id),
    FOREIGN KEY (person2_id) REFERENCES people(id)
);

Table: parents
Schema: CREATE TABLE parents (
    person_id INT PRIMARY KEY,
    mother_id INT,
    father_id INT,
    FOREIGN KEY (person_id) REFERENCES people(id),
    FOREIGN KEY (mother_id) REFERENCES people(id),
    FOREIGN KEY (father_id) REFERENCES people(id)
);

The plan for where the source data will go once the tables are created is:
1. Create the 'people' table with an additional 'date_of_birth' column.
2. Create the 'marriages' and 'parents' tables with their respective keys and foreign key constraints.
3. Import people.csv into 'people' (id, name, age). Then update 'people' using birthdays.csv to fill in date_of_birth by matching on id = person_id.
4. Import marriages.csv into 'marriages'.
5. Import parents.csv into 'parents'.
No separate 'birthdays' table is needed. Abort if any of these tables already exist.

Would you like to:
1. Continue with this plan to create 3 and import data into 0 existing tables.
2. Make changes to the import destination plan, such as changing table names or schema, adding or removing tables to create, or designate the different table to import into.
3. Exit and create the tables manually, or change the prompt to specify existing tables.
(1/2/3)> 1

Planning data import...
Data import plan created:
steps
1. Check if the tables 'people', 'marriages', and 'parents' already exist in the 'families2' database. Abort the process if any exist, as per instructions.
2. Using the provided CREATE TABLE statements, ensure the three tables (people, marriages, parents) are created exactly as specified if they do not exist.
3. Write a Python script (for example, '.d0/62aede55-73a7-43e2-add7-9bec09a9aadb/import_people.py') that reads 'people.csv' and imports its rows (id, name, age) into the 'people' table without modifying date_of_birth.
4. Write another Python script (e.g., '.d0/62aede55-73a7-43e2-add7-9bec09a9aadb/update_birthdays.py') to read 'birthdays.csv' and perform an UPDATE on 'people' to set date_of_birth for each id.
5. Write a Python script ('.d0/62aede55-73a7-43e2-add7-9bec09a9aadb/import_marriages.py') to read 'marriages.csv' and insert rows into the 'marriages' table.
6. Write a Python script ('.d0/62aede55-73a7-43e2-add7-9bec09a9aadb/import_parents.py') to read 'parents.csv' and insert rows into the 'parents' table.
7. Execute the scripts in sequence: import_people.py, update_birthdays.py, import_marriages.py, import_parents.py, ensuring that if any error occurs (such as foreign key failure), the process logs the error and aborts.']

Would you like to:
1. Execute this plan to import the data as described.
2. Make changes to the import plan.
3. Abort the data import job.
(1/2/3)> 1

Executing step 1: Check if the tables 'people', 'marriages', and 'parents' already exist in the 'families2' database. Abort the process if any exist, as per instructions.
Step 1 succeeded

Executing step 2: Using the provided CREATE TABLE statements, ensure the three tables (people, marriages, parents) are created exactly as specified if they do not exist.
Step 2 succeeded

Executing step 3: Write a Python script (for example, '.d0/62aede55-73a7-43e2-add7-9bec09a9aadb/import_people.py') that reads 'people.csv' and imports its rows (id, name, age) into the 'people' table without modifying date_of_birth.
Step 3 succeeded

Executing step 4: Write another Python script (e.g., '.d0/62aede55-73a7-43e2-add7-9bec09a9aadb/update_birthdays.py') to read 'birthdays.csv' and perform an UPDATE on 'people' to set date_of_birth for each id.
Step 4 succeeded

Executing step 5: Write a Python script ('.d0/62aede55-73a7-43e2-add7-9bec09a9aadb/import_marriages.py') to read 'marriages.csv' and insert rows into the 'marriages' table.
Step 5 succeeded

Executing step 6: Write a Python script ('.d0/62aede55-73a7-43e2-add7-9bec09a9aadb/import_parents.py') to read 'parents.csv' and insert rows into the 'parents' table.
Step 6 succeeded

Executing step 7 : Execute the scripts in sequence: import_people.py, update_birthdays.py, import_marriages.py, import_parents.py, ensuring that if any error occurs (such as foreign key failure), the process logs the error and aborts.
Step 7 succeeded

Now we can verify that the data was imported correctly by running a few queries against the database.

~>cd datasets/csv/families/
~/datasets/csv/families>wc -l *.csv
      50 birthdays.csv
      21 marriages.csv
      44 parents.csv
      50 people.csv
      165 total

~/datasets/csv/families>mysql -h127.0.0.1 -uroot -Dfamilies2
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 63
Server version: 8.0.33 Dolt

Copyright (c) 2000, 2025, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> SELECT count(*) from people;
+----------+
| count(*) |
+----------+
|       50 |
+----------+
1 row in set (0.001 sec)

mysql> SELECT count(*) from parents;
+----------+
| count(*) |
+----------+
|       44 |
+----------+
1 row in set (0.001 sec)

mysql> SELECT count(*) from marriages;
+----------+
| count(*) |
+----------+
|       21 |
+----------+
1 row in set (0.001 sec)

So it looks like all the data exists... What about the birthdays?

+----+-------------------+-----+---------------+
| id | name              | age | date_of_birth |
+----+-------------------+-----+---------------+
|  1 | Margaret Thompson |  87 | 1937-03-15    |
|  2 | Robert Thompson   |  89 | 1935-08-22    |
|  3 | Dorothy Wilson    |  84 | 1940-11-07    |
|  4 | James Wilson      |  86 | 1938-02-14    |
+----+-------------------+-----+---------------+
4 rows in set (0.003 sec)

mysql> exit
Bye

~/datasets/csv/families>head -n 5 people.csv
id,name,age
1,Margaret Thompson,87
2,Robert Thompson,89
3,Dorothy Wilson,84
4,James Wilson,86
~/datasets/csv/families>head -n 5 birthdays.csv
person_id,date_of_birth
1,1937-03-15
2,1935-08-22
3,1940-11-07
4,1938-02-14

Was it worth it?

The greatest benefit of doing this work was what I learned along the way. I don't doubt that there is great value in having an agent that is really good at importing data from any source into a database. But thus far, the results have been mixed, and there is still a lot of work to be done. Using LLMs in your programs provides some great benefits, but their non-deterministic nature and their hallucinations make it difficult to build reliable products. Every time I run the agent, I don't know what it's going to do. There may be people or companies that have solved these problems, but they seem to be keeping the secrets to themselves. Concrete examples of agent development are in short supply, and I hope that by sharing my experiences, both good and bad, others will be able to build on this work and create something truly useful.

Conclusion

So there you have it. A data import tool that can take a natural language prompt and import data from one or more sources into a Dolt database. The tool is still a work in progress, and there are many improvements to be made. There are certainly cases it doesn't handle well, and the non-deterministic nature of LLMs means it won't always produce the same results. But I believe this is a step in the right direction for making data import easier and more accessible to a wider audience. There are still many areas I want to explore and features I want to add. If you'd like to try it, come talk to us on our Discord and let us know that you'd like early access to the tool.
