Getting Started With The TypeScript SDK

The Node.js client SDK for https://www.deepstructure.io

Introduction

DeepStructure makes it easy to create powerful data pipelines backed by SQL databases. A Workflow pipes together multiple WorkflowComponents, connecting them with database tables. The output from one component becomes the input to another. Imagine the cells in a spreadsheet automatically updating in as new data is entered--components in a workflow operate in a similar way.

A workflow's components are stand-alone transformers, accepting data (from one table), modifying it (e.g., "summarize with an LLM"), and storing the result (in another table). Components are reusable and can be altered via configuration updates without writing new code.

Programmatically, we define our data pipelines using classes like Workflow, Table, and various WorkflowComponents like SummarizationComponent and RestApiComponent.

Quickstart

Installation

Install the DeepStructure SDK:

npm install @deepstructure/sdk

Creating a Simple Application

The SDK can be used in both JavaScript (CommonJS or ESM) and TypeScript projects. For example, to create a DeepStructure application named app.mjs (or set { "type": "module" } in your package.json):

import { Application, SQLite, SummarizationComponent } from "@deepstructure/sdk";

// Create and export an Application instance
export const application = new Application();

// Use the database automatically created for each app
const db = application.database;

// Define tables to store values in the workflow
const documents = db.table("documents");
const summaries = db.table("summaries");

// Define a WorkflowComponent
const summary = new SummarizationComponent({
    input: documents,
    output: summaries,
    prompt: "Summarize the text to a maximum of 140 characters",
});

// Add the workflow to the application
application.addWorkflow(documents.pipe(summary));

This example creates a simple application that reads data from a documents table, summarizes the text using an LLM, and stores the result in a summaries table.

Application

Creating a DeepStructure application requires an Application instance. The Application instance lets us define one or more Workflows (see below), then build a file we can deploy to DeepStructure. We create an Application instance like this:

import { Application } from "@deepstructure/sdk";

const app = new Application();

We'll use this app as we begin to define our workflows below.

An Application can also include options when created, including:

  • environment: an optional object containing environment variables to use when the application is deployed. NOTE: in most situations, defining environment variables and secrets using the ds command line tool is the preferred method, and the Application's environment property is useful for testing or quick experiments.
const app = new Application({
    environment: {
        OPENAI_API_KEY: process.env.OPENAI_API_KEY,
        LOG_LEVEL: "debug",
    },
});

Working with Databases

When the Application is created, it will use a Postgres connection string URI (DS_DB) to create a new Postgres database connection. By default, all deployed DeepStructure applications have their own Postgres database created automatically. You can use this database instance when you need to access database objects (e.g., Table).

The data we use in an Application is always written to intermediate database tables. These tables form part of a workflow's structure, since data is piped from one component to another via a Table.

Table

A Table is created or accessed using the application database's .table() method:

const usersTable = application.database.table("users");

Various table configuration options can also be specified. For example, in the following code we define a table named data, which includes a foreign key named query_id and also specify that the primary key is a UUID:

const table = application.database.table("data", {
    foreign_key_column: "query_id",
    primary_key_uuid: true,
});

As just demonstrated, the Tables used in a workflow can be explicitly defined by the user. However, you can also have your Application implicitly create them for you. Implicit tables work well when you need tables between components in a workflow that aren't shared or referenced in other parts of your application. For example: if you need to have multiple workflows share a table, or are using a custom sql statement to create a view or otherwise pull data from previous tables, you'll want to explicitly name and control these tables yourself. However, in the common case, where you are simply connecting two components, letting the application manage the tables for you makes more sense.

const application = new Application()
    // ...
    .pipe(new UrlScraperComponent())
    // A new table in the default database will be created between these two components
    .pipe(
        new SummarizationComponent({
            prompt: "Summarize the text in fewer than 200 words",
            output: modelOutputTable,
        })
    );

Workflow Components

Workflow components transform data flowing through a workflow. For example: having an LLM process text to produce new data (e.g., summarize a document) or extracting information (e.g., find all URLs in a piece of text).

All components work in a similar way, allowing configuration data to be updated in order to control the specific outcome of the transformation:

import { SummarizationComponent } from "@deepstructure/sdk";

const summary = new SummarizationComponent({
    prompt: "Summarize the text to a maximum of 140 characters",
});

The SummarizationComponent allows us to specify a prompt. Many components also look for environment variables. For example, the SummarizationComponent expects an OPENAI_API_KEY environment variable to be set. A component's configuration comes from a mix of optional and required values passed to their constructor's and through environment variables. If a configuration option for a component is missing, an error will be thrown when the component is used in a workflow.

Most WorkflowComponents also expect Table instances via their input and output properties (NOTE: in the case that you are using implicit tables, these can be omitted):

const documents = application.database.table("documents");

const summary = new SummarizationComponent({
    // Explicitly define the `input` table used by this SummarizationComponent
    input: documents,
    prompt: "Summarize the text to a maximum of 140 characters",
});

Here we have defined our SummarizationComponent and explicitly set its input to be the documents table in our SQLite database. We can also implicitly connect components to tables as part of a Workflow.

WorkflowComponent and Table

A Workflow is created by connecting WorkflowComponents and Tables. We do this with the .pipe() method, which exists on all Table and WorkflowComponent instances (NOTE: .pipe() is also available on Workflow instances).

We could rewrite our example above to connect an input table to our SummarizationComponent:

const documents = application.database.table("documents");

const summary = new SummarizationComponent({
    prompt: "Summarize the text to a maximum of 140 characters",
});

// Implicitly define the `input` table used by the SummarizationComponent
const workflow = documents.pipe(summary);

In the final line above, documents.pipe(summary) connects the documents table to the summary component as its input. We could also add another table afterward, in order to define the component's output:

const workflow = documents.pipe(summary).pipe(sqlite.table("summaries"));

The code above connects the documents and summaries tables to the summary component via its input and output properties. If desired, users can also make this more explicit in the definition of the component. The following code has the same outcome:

const summary = new SummarizationComponent({
    input: documents,
    output: sqlite.table("summaries"),
    prompt: "Summarize the text to a maximum of 140 characters",
});

Both methods of defining the data connections between components are common.

Connecting WorkflowComponents

We've just seen how to connect a Table to a WorkflowComponent, and we can use these same ideas to connect two WorkflowComponents via tables:

const modelOutputTable = db.table("model_output");

const workflow = new RestApiComponent({
    input: modelOutputTable,
    route: "/summarize",
    postParams: ["value"],
})
    .pipe(db.table("user_input"))
    .pipe(
        new SummarizationComponent({
            prompt: "Summarize the text in fewer than 200 words",
            output: modelOutputTable,
        })
    );

Here we create two components: a RestApiComponent and a SummarizationComponent. The RestApiComponent will listen for clients to POST content to a given URL, then pipe that data to the SummarizationComponent via the user_input table. When the summarization is complete, the resulting summary is piped to the model_output table.

Wrapping a Workflow with a RestApiComponent

In the previous example, the RestApiComponent waits for the summary and returns it to the user. It's also possible to express this workflow pattern using the Workflow's .wrap() method. When a workflow's data needs to start and end at the same component (e.g., a workflow that is triggered by, and responds via a REST API endpoint), we can wrap the workflow with a component, thus joining its two ends in a loop.

Consider the following example, where the workflow moves through multiple components until it is ultimately wrapped by a RestApiComponent:

const workflow = .pipe(new UrlScraperComponent())
    .pipe(db.table("scraped"))
    .pipe(
        new ChatCompletionComponent({
            model: "gpt-4",
        })
    )
    .pipe(db.table("chat_response"))
    .wrap(
        new RestApiComponent({
            route: "/query",
            output: db.table("rest_output"),
            postParams: ["value"],
        })
    );

In this example the workflow has no initial input table. Instead, it is wrapped with a RestApiComponent, which will supply the input. The workflow begins when the REST API receives a value, which will be fed into the input of the UrlScraperComponent. The data will flow through the workflow until it ends in the input table of the RestApiComponent, completing the loop. Because wrap() forms a loop, a Workflow should only be wrapped once.

Unlike wrap(), which can only be called on a Workflow, the .pipe() method can be used on any Table, WorkflowComponent, or Workflow to create a new Workflow that connects everything into a single flow. Each connection in the workflow requires database tables to be set via input and output (either implicitly or explicitly).

NOTE: when wrapping a workflow, the component passed to .wrap() must define an output table, which will then be connected to the first component as input. With this approach, it's common to not set an input table on the first component, knowing it will come from the final, wrapped component's output.

Wrapping a Workflow with .wrapOutput()

A workflow can also be "wrapped" by using .wrapOutput() as the last method called. This uses the output table of the final (i.e. preceding) component to feed back into the input of the first component:

const workflow = new RestApiComponent({
        route: "/query",
        postParams: ["value"],
    })
    .pipe(...)
    .wrapOutput();

The .wrapOutput() method is often easier to implement than .wrap().

Array-like Operations

In addition to using .pipe() and .wrap(), a number of common Array-like transformations are available for Tables, WorkflowComponents, and Workflows:

  • stream(): turns an Array into a set of individual, streamable items. The stream() function is used to create a batch, which the other functions below can then operate on one-by-one.
  • forEach(): runs a provided function on the value, but passes it through to the Workflow unmodified
  • map(): transform data values using a provided function
  • filter(): selectively include values that satisfy a specified condition
  • reduce(): aggregate values into a single output value (e.g., total a series of numbers) using a provided reducer function
  • collect(): collects (i.e., joins) separate elements into a single array
  • flat(): like collect() but also flattens multiple arrays into a single array
  • flatMap(): applies a mapping function to each item and flattens the result.

These methods make it easy to transform and control the flow of data through workflows. For example:

const workflow = ...
    .stream()
    .filter((record: DataRecord) => value.isActive)
    .map((record: DataRecord) => ({ ...record, updatedAt: new Date().toISOString() }))
    .pipe(...)

Naming a Workflow

A Workflow can be named. This name is helpful for observing workflow runs, debugging, or when accessing API methods for a given workflow. There are two ways set the name:

First, while building a workflow and piping components together, the .withName() method can be used, which sets the workflow's name and returns the Workflow:

const workflow = .pipe(new UrlScraperComponent())
    .pipe(db.table("scraped"))
    .pipe(
        new ChatCompletionComponent({
            model: "gpt-4",
        })
    )
    .pipe(db.table("chat_response"))
    .wrap(
        new RestApiComponent({
            route: "/query",
            output: db.table("rest_output"),
            postParams: ["value"],
        })
    )
    .withName("chat-workflow");

Second, after you've created a Workflow, you can use its .name property to set a value:

const workflow = .pipe(new UrlScraperComponent())
    .pipe(db.table("scraped"))
    .pipe(
        new ChatCompletionComponent({
            model: "gpt-4",
        })
    )
    .pipe(db.table("chat_response"))
    .wrap(
        new RestApiComponent({
            route: "/query",
            output: db.table("rest_output"),
            postParams: ["value"],
        })
    );

workflow.name = "chat-workflow";

Both methods of setting the name can be used.

Running Workflows

Once we've defined our Workflows, we can add them to our DeepStructure app:

const workflow = new RestApiComponent({
    input: modelOutputTable,
    route: "/summarize",
    postParams: ["value"],
})
    .pipe(db.table("user_input"))
    .pipe(
        new SummarizationComponent({
            prompt: "Summarize the text in fewer than 200 words",
            output: modelOutputTable,
        })
    );

// Include this workflow in our app
app.addWorkflow(workflow);

We can get information from our Application about the .workflows, .components and .databases it contains. For example, to get a list of all databases used in the workflow(s):

app.addWorkflow(workflow1, workflow2);
const databases = app.databases;
// databases contains all databases used in workflow1 and workflow2 [db, db2, db3, ...]

Using .pipe() with a targetTable

In some cases, it's necessary to pipe data into a component, but not use the input. For example, the BM25Component has both input and content tables. In order to override the usual destination of .pipe() you can pass an options object as the second argument:

// ...
.pipe(new ChunkerComponent({ documentIdColumnName: "document_id" }))
.pipe(chunksTable)
.pipe(new BM25Component(), { targetTable: "content" })
// ...

Here, the output of the ChunkerComponent is piped into the content table of the BM25Component, not the output table.

This is not usually necessary, but certain components have multiple "ports" where data can be connected, and each will specify this in the documentation.

If you try to .pipe() into a targetTable that does not exist on the component, an error will be thrown.

Building an Application for Deployment

In order to be deployed, an Application must be built to produce JSON. The JSON represents all of the Workflows, WorkflowComponents and their configurations, as well as the Databases and Tables that will be used:

const workflow = db
    .table("input_data")
    .pipe(componentOne)
    .pipe(...);

export application = new Application();
app.addWorkflow(workflow);

We can now use the ds command line tool to build (i.e., ds build) or build-and-deploy (ds deploy) our application.