The Node.js client SDK for https://www.deepstructure.io
## Introduction
DeepStructure makes it easy to create powerful data pipelines backed by SQL databases. A `Workflow` pipes together multiple `WorkflowComponent`s, connecting them with database tables. The output from one component becomes the input to another. Imagine the cells in a spreadsheet automatically updating as new data is entered; components in a workflow operate in a similar way.
A workflow's components are stand-alone transformers, accepting data (from one table), modifying it (e.g., "summarize with an LLM"), and storing the result (in another table). Components are reusable and can be altered via configuration updates without writing new code.
Programmatically, we define our data pipelines using classes like `Workflow`, `Table`, and various `WorkflowComponent`s like `SummarizationComponent` and `RestApiComponent`.
## Quickstart
### Installation
Install the DeepStructure SDK:
```sh
npm install @deepstructure/sdk
```
### Creating a Simple Application
The SDK can be used in both JavaScript (CommonJS or ESM) and TypeScript projects. For example, to create a DeepStructure application, add the following to a file named `app.mjs` (or set `{ "type": "module" }` in your `package.json`):
```js
import { Application, SummarizationComponent } from "@deepstructure/sdk";

// Create and export an Application instance
export const application = new Application();

// Use the database automatically created for each app
const db = application.database;

// Define tables to store values in the workflow
const documents = db.table("documents");
const summaries = db.table("summaries");

// Define a WorkflowComponent
const summary = new SummarizationComponent({
  input: documents,
  output: summaries,
  prompt: "Summarize the text to a maximum of 140 characters",
});

// Add the workflow to the application
application.addWorkflow(documents.pipe(summary));
```
This example creates a simple application that reads data from a `documents` table, summarizes the text using an LLM, and stores the result in a `summaries` table.
## Application
Creating a DeepStructure application requires an `Application` instance. The `Application` instance lets us define one or more `Workflow`s (see below), then build a file we can deploy to DeepStructure. We create an `Application` instance like this:
```js
import { Application } from "@deepstructure/sdk";

const app = new Application();
```
We'll use this `app` as we begin to define our workflows below.
An `Application` can also include options when created, including:

- `environment`: an optional object containing environment variables to use when the application is deployed. NOTE: in most situations, defining environment variables and secrets using the `ds` command line tool is the preferred method; the `Application`'s `environment` property is useful for testing or quick experiments.
```js
const app = new Application({
  environment: {
    OPENAI_API_KEY: process.env.OPENAI_API_KEY,
    LOG_LEVEL: "debug",
  },
});
```
### Working with Databases
When the `Application` is created, it will use a Postgres connection string URI (`DS_DB`) to create a new Postgres database connection. By default, all deployed DeepStructure applications have their own Postgres database created automatically. You can use this `database` instance when you need to access database objects (e.g., `Table`).
The data we use in an `Application` is always written to intermediate database tables. These tables form part of a workflow's structure, since data is piped from one component to another via a `Table`.
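For example, a minimal sketch of getting at the app's database and one of its tables (assuming `DS_DB` is set to a Postgres connection string in your environment; the `events` table name is illustrative):

```js
import { Application } from "@deepstructure/sdk";

// The Application connects to the database described by the DS_DB URI
const application = new Application();

// The automatically created database backs all of the app's tables
const db = application.database;
const events = db.table("events");
```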
## Table
A `Table` is created or accessed using the application database's `.table()` method:
```js
const usersTable = application.database.table("users");
```
Various table configuration options can also be specified. For example, in the following code we define a table named `data`, which includes a foreign key named `query_id`, and also specify that the primary key is a UUID:
```js
const table = application.database.table("data", {
  foreign_key_column: "query_id",
  primary_key_uuid: true,
});
```
As just demonstrated, the `Table`s used in a workflow can be explicitly defined by the user. However, you can also have your `Application` implicitly create them for you. Implicit tables work well when you need tables between components in a workflow that aren't shared or referenced in other parts of your application. Conversely, if you need multiple workflows to share a table, or are using a custom `sql` statement to create a view or otherwise pull data from previous tables, you'll want to explicitly name and control these tables yourself. In the common case, where you are simply connecting two components, letting the application manage the tables for you makes more sense.
```js
const application = new Application()
  // ...
  .pipe(new UrlScraperComponent())
  // A new table in the default database will be created between these two components
  .pipe(
    new SummarizationComponent({
      prompt: "Summarize the text in fewer than 200 words",
      output: modelOutputTable,
    })
  );
```
## Workflow Components
Workflow components transform data flowing through a workflow. For example: having an LLM process text to produce new data (e.g., summarizing a document) or extracting information (e.g., finding all URLs in a piece of text).
All components work in a similar way, allowing configuration data to be updated in order to control the specific outcome of the transformation:
```js
import { SummarizationComponent } from "@deepstructure/sdk";

const summary = new SummarizationComponent({
  prompt: "Summarize the text to a maximum of 140 characters",
});
```
The `SummarizationComponent` allows us to specify a `prompt`. Many components also look for environment variables. For example, the `SummarizationComponent` expects an `OPENAI_API_KEY` environment variable to be set. A component's configuration comes from a mix of optional and required values passed to its constructor and through environment variables. If a configuration option for a component is missing, an error will be thrown when the component is used in a workflow.
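For quick experiments, one way to supply that key is the `Application` `environment` option shown earlier (for deployments, the `ds` CLI remains the preferred way to manage secrets):

```js
import { Application, SummarizationComponent } from "@deepstructure/sdk";

// Pass the key through from the local environment for testing
const application = new Application({
  environment: { OPENAI_API_KEY: process.env.OPENAI_API_KEY },
});

// The component reads OPENAI_API_KEY from the environment at run time
const summary = new SummarizationComponent({
  prompt: "Summarize the text to a maximum of 140 characters",
});
```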
Most `WorkflowComponent`s also expect `Table` instances via their `input` and `output` properties (NOTE: if you are using implicit tables, these can be omitted):
```js
const documents = application.database.table("documents");

const summary = new SummarizationComponent({
  // Explicitly define the `input` table used by this SummarizationComponent
  input: documents,
  prompt: "Summarize the text to a maximum of 140 characters",
});
```
Here we have defined our `SummarizationComponent` and explicitly set its `input` to be the `documents` table in our database. We can also implicitly connect components to tables as part of a `Workflow`.
### `WorkflowComponent` and `Table`
A `Workflow` is created by connecting `WorkflowComponent`s and `Table`s. We do this with the `.pipe()` method, which exists on all `Table` and `WorkflowComponent` instances (NOTE: `.pipe()` is also available on `Workflow` instances).
We could rewrite our example above to connect an `input` table to our `SummarizationComponent`:
```js
const documents = application.database.table("documents");

const summary = new SummarizationComponent({
  prompt: "Summarize the text to a maximum of 140 characters",
});

// Implicitly define the `input` table used by the SummarizationComponent
const workflow = documents.pipe(summary);
```
In the final line above, `documents.pipe(summary)` connects the `documents` table to the `summary` component as its `input`. We could also add another table afterward, in order to define the component's `output`:
```js
const workflow = documents.pipe(summary).pipe(application.database.table("summaries"));
```
The code above connects the `documents` and `summaries` tables to the `summary` component via its `input` and `output` properties. If desired, users can also make this more explicit in the definition of the component. The following code has the same outcome:
```js
const summary = new SummarizationComponent({
  input: documents,
  output: application.database.table("summaries"),
  prompt: "Summarize the text to a maximum of 140 characters",
});
```
Both methods of defining the data connections between components are common.
### Connecting `WorkflowComponent`s

We've just seen how to connect a `Table` to a `WorkflowComponent`, and we can use these same ideas to connect two `WorkflowComponent`s via tables:
```js
const modelOutputTable = db.table("model_output");

const workflow = new RestApiComponent({
  input: modelOutputTable,
  route: "/summarize",
  postParams: ["value"],
})
  .pipe(db.table("user_input"))
  .pipe(
    new SummarizationComponent({
      prompt: "Summarize the text in fewer than 200 words",
      output: modelOutputTable,
    })
  );
```
Here we create two components: a `RestApiComponent` and a `SummarizationComponent`. The `RestApiComponent` will listen for clients to `POST` content to a given URL, then pipe that data to the `SummarizationComponent` via the `user_input` table. When the summarization is complete, the resulting summary is piped to the `model_output` table.
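As a rough sketch of calling this endpoint from a client (the base URL and the JSON request/response shape are assumptions; check your deployment's details):

```js
// Hypothetical client call: POST a "value" to the RestApiComponent's route
const res = await fetch("https://your-app.example.com/summarize", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ value: "A long article to summarize..." }),
});

console.log(await res.json()); // the summary produced by the workflow
```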
### Wrapping a Workflow with a `RestApiComponent`
In the previous example, the `RestApiComponent` waits for the summary and returns it to the user. It's also possible to express this workflow pattern using the `Workflow`'s `.wrap()` method. When a workflow's data needs to start and end at the same component (e.g., a workflow that is triggered by, and responds via, a REST API endpoint), we can wrap the workflow with a component, thus joining its two ends in a loop.

Consider the following example, where the workflow moves through multiple components until it is ultimately wrapped by a `RestApiComponent`:
```js
const workflow = new UrlScraperComponent()
  .pipe(db.table("scraped"))
  .pipe(
    new ChatCompletionComponent({
      model: "gpt-4",
    })
  )
  .pipe(db.table("chat_response"))
  .wrap(
    new RestApiComponent({
      route: "/query",
      output: db.table("rest_output"),
      postParams: ["value"],
    })
  );
```
In this example the workflow has no initial `input` table. Instead, it is wrapped with a `RestApiComponent`, which will supply the `input`. The workflow begins when the REST API receives a value, which will be fed into the `input` of the `UrlScraperComponent`. The data will flow through the workflow until it ends in the `input` table of the `RestApiComponent`, completing the loop. Because `wrap()` forms a loop, a `Workflow` should only be wrapped once.
Unlike `wrap()`, which can only be called on a `Workflow`, the `.pipe()` method can be used on any `Table`, `WorkflowComponent`, or `Workflow` to create a new `Workflow` that connects everything into a single flow. Each connection in the workflow requires database tables to be set via `input` and `output` (either implicitly or explicitly).
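For instance, a minimal sketch of a chain that starts from a `Table` (reusing the `SummarizationComponent` from earlier; the table names are illustrative):

```js
// .pipe() is available on Table, WorkflowComponent, and Workflow instances,
// so a chain can start from any of them; implicit tables connect the rest.
const workflow = db
  .table("raw_text")
  .pipe(new SummarizationComponent({ prompt: "Summarize in one sentence" }))
  .pipe(db.table("one_liners"));
```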
NOTE: when wrapping a workflow, the component passed to `.wrap()` must define an `output` table, which will then be connected to the first component as `input`. With this approach, it's common to not set an `input` table on the first component, knowing it will come from the final, wrapped component's output.
### Wrapping a Workflow with `.wrapOutput()`
A workflow can also be "wrapped" by using `.wrapOutput()` as the last method called. This uses the `output` table of the final (i.e., preceding) component to feed back into the `input` of the first component:
```js
const workflow = new RestApiComponent({
  route: "/query",
  postParams: ["value"],
})
  .pipe(...)
  .wrapOutput();
```
The `.wrapOutput()` method is often easier to use than `.wrap()`.
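For example, a fuller sketch (reusing components from earlier examples; the table names are illustrative):

```js
// The SummarizationComponent's output table feeds back into the
// RestApiComponent as its input, closing the loop without .wrap().
const workflow = new RestApiComponent({
  route: "/query",
  postParams: ["value"],
})
  .pipe(db.table("user_input"))
  .pipe(
    new SummarizationComponent({
      prompt: "Summarize the text in fewer than 200 words",
      output: db.table("model_output"),
    })
  )
  .wrapOutput();
```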
## Array-like Operations
In addition to using `.pipe()` and `.wrap()`, a number of common Array-like transformations are available for `Table`s, `WorkflowComponent`s, and `Workflow`s:
- `stream()`: turns an Array into a set of individual, streamable items. The `stream()` function is used to create a batch, which the other functions below can then operate on one-by-one.
- `forEach()`: runs a provided function on the value, but passes it through to the Workflow unmodified
- `map()`: transforms data values using a provided function
- `filter()`: selectively includes values that satisfy a specified condition
- `reduce()`: aggregates values into a single output value (e.g., totals a series of numbers) using a provided reducer function
- `collect()`: collects (i.e., joins) separate elements into a single array
- `flat()`: like `collect()` but also flattens multiple arrays into a single array
- `flatMap()`: applies a mapping function to each item and flattens the result.
These methods make it easy to transform and control the flow of data through workflows. For example:
```ts
const workflow = ...
  .stream()
  .filter((record: DataRecord) => record.isActive)
  .map((record: DataRecord) => ({ ...record, updatedAt: new Date().toISOString() }))
  .pipe(...)
```
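As a further sketch, an aggregation pipeline (assuming `reduce()` takes a reducer function and an initial value, like `Array.prototype.reduce`; the table names are illustrative):

```js
// stream() splits a batch into items; filter()/map() act per item;
// reduce() aggregates the items back into a single value.
const workflow = db
  .table("scores")
  .stream()
  .filter((record) => record.score != null)
  .map((record) => record.score)
  .reduce((total, score) => total + score, 0)
  .pipe(db.table("score_totals"));
```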
## Naming a Workflow
A `Workflow` can be named. This name is helpful for observing workflow runs, debugging, or when accessing API methods for a given workflow. There are two ways to set the name:
First, while building a workflow and piping components together, the `.withName()` method can be used, which sets the workflow's name and returns the `Workflow`:
```js
const workflow = new UrlScraperComponent()
  .pipe(db.table("scraped"))
  .pipe(
    new ChatCompletionComponent({
      model: "gpt-4",
    })
  )
  .pipe(db.table("chat_response"))
  .wrap(
    new RestApiComponent({
      route: "/query",
      output: db.table("rest_output"),
      postParams: ["value"],
    })
  )
  .withName("chat-workflow");
```
Second, after you've created a `Workflow`, you can use its `.name` property to set a value:
```js
const workflow = new UrlScraperComponent()
  .pipe(db.table("scraped"))
  .pipe(
    new ChatCompletionComponent({
      model: "gpt-4",
    })
  )
  .pipe(db.table("chat_response"))
  .wrap(
    new RestApiComponent({
      route: "/query",
      output: db.table("rest_output"),
      postParams: ["value"],
    })
  );

workflow.name = "chat-workflow";
```
Both methods of setting the name can be used.
## Running Workflows
Once we've defined our `Workflow`s, we can add them to our DeepStructure `app`:
```js
const workflow = new RestApiComponent({
  input: modelOutputTable,
  route: "/summarize",
  postParams: ["value"],
})
  .pipe(db.table("user_input"))
  .pipe(
    new SummarizationComponent({
      prompt: "Summarize the text in fewer than 200 words",
      output: modelOutputTable,
    })
  );

// Include this workflow in our app
app.addWorkflow(workflow);
```
We can get information from our `Application` about the `.workflows`, `.components`, and `.databases` it contains. For example, to get a list of all databases used in the workflow(s):
```js
app.addWorkflow(workflow1, workflow2);

const databases = app.databases;
// databases contains all databases used in workflow1 and workflow2: [db, db2, db3, ...]
```
## Using `.pipe()` with a `targetTable`
In some cases, it's necessary to pipe data into a component, but not use the `input`. For example, the `BM25Component` has both `input` and `content` tables. In order to override the usual destination of `.pipe()`, you can pass an `options` object as the second argument:
```js
// ...
  .pipe(new ChunkerComponent({ documentIdColumnName: "document_id" }))
  .pipe(chunksTable)
  .pipe(new BM25Component(), { targetTable: "content" })
// ...
```
Here, the `output` of the `ChunkerComponent` is piped into the `content` table of the `BM25Component`, not its `input` table.
This is not usually necessary, but certain components have multiple "ports" where data can be connected, and each will specify this in the documentation.
If you try to `.pipe()` into a `targetTable` that does not exist on the component, an error will be thrown.
## Building an Application for Deployment
In order to be deployed, an `Application` must be built to produce JSON. The JSON represents all of the `Workflow`s, `WorkflowComponent`s, and their configurations, as well as the `Database`s and `Table`s that will be used:
```js
const workflow = db
  .table("input_data")
  .pipe(componentOne)
  .pipe(...);

export const application = new Application();
application.addWorkflow(workflow);
```
We can now use the `ds` command line tool to build (i.e., `ds build`) or build-and-deploy (`ds deploy`) our application.
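For example, from the project directory:

```sh
ds build   # build: produces the deployable JSON
ds deploy  # build and deploy in one step
```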