Utils

When writing JavaScript code for a PostgreSQL filter, you have access to the context.pgutils object. This object has the following methods:


context.pgutils.executeSQL

Caution: this method is available from all PostgreSQL filters, but it must be used only when the following packets are received: Query and Parse for request filters, and ReadyForQuery for response filters. Invoking this method in any other circumstances will result in undefined behavior.

This method must be called with a valid SQL command, which may or may not return a result set (so SELECT, INSERT, UPDATE and DELETE statements are all acceptable). When called from a request filter, this SQL is executed before the client's own SQL command, which makes it useful for fetching data before the client command runs. When called from a response filter, this SQL is executed after the ReadyForQuery packet has been received from the server. At that point the server is ready to accept a command, so the SQL is "sneaked" in before the client's next command.

Note that this method "piggybacks" on the current connection: if the current transaction is rolled back, any inserts, updates or deletes executed with this method are rolled back as well.
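The caution about which packets executeSQL may be called on can be captured in a small guard. The following is a minimal sketch; the helper canCallExecuteSQL and its direction parameter ("request" for packets sent by the client, "response" for packets sent by the server) are hypothetical and not part of Gallium Data.

```javascript
// Hypothetical helper, not part of Gallium Data: encodes the caution about
// which packets executeSQL may be called on.
// direction: "request" (client to server) or "response" (server to client)
// packetType: a packet type name, such as the value returned by
// context.packet.getPacketType()
const EXECUTE_SQL_SAFE_PACKETS = new Map([
  ["request", new Set(["Query", "Parse"])],
  ["response", new Set(["ReadyForQuery"])],
]);

function canCallExecuteSQL(direction, packetType) {
  const allowed = EXECUTE_SQL_SAFE_PACKETS.get(direction);
  return allowed !== undefined && allowed.has(packetType);
}

console.log(canCallExecuteSQL("request", "Query"));   // true
console.log(canCallExecuteSQL("response", "DataRow")); // false
```

In a filter, such a guard would be checked against the current packet type before issuing the extra SQL, so that a misconfigured filter skips the call instead of triggering undefined behavior.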

Simple example: pre-insert check

Assume that, before issuing certain insert statements, we want to check whether the row already exists in the database.

This could be done with a Query filter with the following query pattern:


This specifies that the code for this filter will only be called when the SQL command satisfies the regular expression.
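As an illustration only, the following sketch uses a hypothetical JavaScript regular expression that would select insert statements into the customers table. It is not the pattern used by the filter, and the syntax Gallium Data expects for query patterns may differ.

```javascript
// Hypothetical pattern, for illustration only: matches SQL that starts with
// "insert into customers" (case-insensitive, leading whitespace allowed).
// The \b prevents it from also matching tables such as customers_archive.
const insertCustomersPattern = /^\s*insert\s+into\s+customers\b/i;

function matchesInsertCustomers(sql) {
  return insertCustomersPattern.test(sql);
}

// The statement from this example matches, so the filter code would run
const sample = "insert into customers (id, first_name, last_name, country)\n" +
  "values\n" +
  "(1000, 'Jane', 'Doe', 'ML');";
console.log(matchesInsertCustomers(sample)); // true
```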

Now if we assume that the SQL command received from the database client looks like:

insert into customers (id, first_name, last_name, country)
values
(1000, 'Jane', 'Doe', 'ML');

then the filter code could be something like:

let sql = context.packet.getQuery();
// Extract the id from the values tuple, which is assumed to start a line: "(1000, ..."
let regexp = /^\((\d{1,6}),/m;
let matches = regexp.exec(sql);
if (matches !== null) {
    let id = matches[1];
    // Query the database to see if that row already exists
    let rs = context.pgutils.executeSQL("select id, last_name from customers where id = " + id);
    if (rs.getNumRows() > 0) {
        // The row already exists: rewrite the SQL with a different id
        log.info("customer id already exists: " + id);
        // matches.index points at "(", so the id itself starts one character later
        let idStart = matches.index + 1;
        let newSql = sql.substring(0, idStart) +
            (parseInt(id, 10) + 1000000) +
            sql.substring(idStart + id.length);
        context.packet.setQuery(newSql);
    }
}

Note that this is not perfect: between our check and the execution of the client's insert, there is a brief window during which another session could insert a row with the same id. It is nevertheless a reasonable example.
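The string manipulation in the filter is easy to get off by one, so it can help to test it in isolation. The following is a minimal sketch of the same rewrite as a standalone function; rewriteCustomerId and its offset parameter are hypothetical names, and, like the filter, it assumes the values tuple starts at the beginning of a line.

```javascript
// Standalone sketch of the id rewrite performed by the filter, so the
// substring arithmetic can be checked outside Gallium Data.
function rewriteCustomerId(sql, offset) {
  // "(" at the start of a line, then 1 to 6 digits, then a comma
  const matches = /^\((\d{1,6}),/m.exec(sql);
  if (matches === null) {
    return sql; // no recognizable id: leave the statement unchanged
  }
  const oldId = matches[1];
  const newId = parseInt(oldId, 10) + offset;
  // matches.index points at "(", so the id starts one character later
  const idStart = matches.index + 1;
  return sql.substring(0, idStart) + newId + sql.substring(idStart + oldId.length);
}

const clientSql = "insert into customers (id, first_name, last_name, country)\n" +
  "values\n" +
  "(1000, 'Jane', 'Doe', 'ML');";
// The last line of the output becomes: (1001000, 'Jane', 'Doe', 'ML');
console.log(rewriteCustomerId(clientSql, 1000000));
```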

Technical note: this code only works for immediate SQL commands, not for prepared statements. The same check is possible with prepared statements, but it is based on the Bind packet instead. This is in some ways easier, since the values sent by the client are readily available in the Bind packet and don't have to be fished out of the SQL statement, but you do need to handle both Bind and Execute packets, unless you're OK running the check on Bind.
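To show why parameter values are easier to get from a Bind packet, here is a minimal sketch that decodes them from a raw Bind message, following the message layout in the PostgreSQL frontend/backend protocol. Inside a filter you would normally use the accessors of the Bind packet object instead, which are not covered in this section; decodeBind is a hypothetical helper that works on a Node.js Buffer.

```javascript
// Minimal sketch (hypothetical helper): decode a raw PostgreSQL Bind message.
// Layout: 'B', Int32 length, portal name, statement name, parameter format
// codes, parameter values, result-column format codes (not decoded here).
function decodeBind(buf) {
  if (buf[0] !== 0x42) { // 'B'
    throw new Error("not a Bind message");
  }
  let pos = 5; // skip the type byte and the Int32 length

  // Read a null-terminated UTF-8 string starting at pos
  const readCString = () => {
    const end = buf.indexOf(0, pos);
    if (end === -1) {
      throw new Error("unterminated string");
    }
    const s = buf.toString("utf8", pos, end);
    pos = end + 1;
    return s;
  };
  const readInt16 = () => {
    const v = buf.readInt16BE(pos);
    pos += 2;
    return v;
  };

  const portal = readCString();
  const statement = readCString();

  // Zero format codes: all parameters are text; one code: applies to all;
  // otherwise one code per parameter (0 = text, 1 = binary)
  const formats = [];
  const numFormats = readInt16();
  for (let i = 0; i < numFormats; i++) {
    formats.push(readInt16());
  }

  const values = [];
  const numParams = readInt16();
  for (let i = 0; i < numParams; i++) {
    const len = buf.readInt32BE(pos);
    pos += 4;
    if (len === -1) {
      values.push(null); // SQL NULL
      continue;
    }
    const format = formats.length === 0 ? 0 : formats.length === 1 ? formats[0] : formats[i];
    const raw = buf.subarray(pos, pos + len);
    values.push(format === 0 ? raw.toString("utf8") : raw); // binary values stay as bytes
    pos += len;
  }
  return { portal, statement, values };
}
```

For a Bind carrying the text values 1000 and NULL, the decoded values are "1000" and null. Text-format values arrive as strings, so a numeric check needs an explicit conversion.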


Advanced example: record all SQL commands

In this example, we're going to record all SQL commands received from the database client and store them in a database table, along with when they were executed, by whom, how long they took to run, and how many rows and total bytes they returned. This example is included in the tutorial project as a disabled filter.

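We'll use a duplex filter, which sees packets going in both directions, because we need to know when a command starts (a request from the client) and when it ends (a response from the server). We're also going to count the number of rows and the number of bytes in the result set.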

First, we need a place to record this data. In this example, we will simply create a table in the same database that is being queried (we could use a separate database if desired with only a bit more work).

create table gallium_demo.queries (
    id serial primary key,
    sql text not null,
    status text,
    username varchar(100) not null,
    ts timestamp with time zone not null,
    exec_time int,    -- execution time in milliseconds
    num_rows int,
    num_bytes int
);

Now we're going to define a duplex filter to insert into this table whenever the database client finishes executing a command.

To do this properly, we need to understand the flow of information between the database client and the database server. All traffic is organized in packets. There are a few dozen types of packets defined, but the ones that are relevant here are Query, Execute, DataRow, ErrorResponse and ReadyForQuery.

Query is the simplest one: it is used to execute a piece of SQL, and that's it. If execution succeeds, the server returns a (possibly empty) series of DataRow packets; if it fails, it returns an ErrorResponse packet.

Execute is used to ask the database to execute a prepared statement. It returns the same thing as a Query.

DataRow represents one row in a result set. We'll keep track of how many we see, and how many bytes they represent.

ErrorResponse is sent by the database whenever something goes wrong, such as a SQL syntax error. We want to be notified of those so that we can record the error message.

ReadyForQuery is sent by the server when it's ready to accept a new command. This will be our signal that it's OK for us to send an insert to the server to record the data we have collected.
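The interplay of these packets can be simulated outside of Gallium Data. The sketch below models packets as plain JavaScript objects (an assumption made for illustration, not Gallium Data's packet API) and collects one record per command, written out when ReadyForQuery arrives.

```javascript
// Offline sketch of the duplex filter's logic. Packets are modeled as plain
// objects such as { type: "DataRow", length: 11 }.
function recordCommands(packets) {
  const records = [];
  let current = null; // plays the role of connectionContext.recordQuery
  for (const packet of packets) {
    switch (packet.type) {
      case "Query":
      case "Execute":
        // A new command starts
        current = { sql: packet.sql, numRows: 0, numBytes: 0, status: null };
        break;
      case "DataRow":
        // One row of the result set
        if (current !== null) {
          current.numRows++;
          current.numBytes += packet.length;
        }
        break;
      case "ErrorResponse":
        // The command failed: keep the message
        if (current !== null) {
          current.status = packet.message;
        }
        break;
      case "ReadyForQuery":
        // The server also sends ReadyForQuery after authentication, before any
        // command, so there may be nothing to record yet
        if (current !== null) {
          records.push(current);
          current = null;
        }
        break;
      default:
        // Other packet types are irrelevant here
        break;
    }
  }
  return records;
}
```

Unlike the filter, this sketch clears the current record once it has been written, so a ReadyForQuery that follows no new command is not recorded twice. Like the filter, it keeps only the last command when a client sends several Execute messages before a single Sync, because the server sends just one ReadyForQuery in that case.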

First, we'll create a duplex filter and configure it to be notified of the Query, Execute, DataRow, ErrorResponse and ReadyForQuery packet types.

Next, we'll add the code that does all the work. Let's look at it in chunks; the full code is at the end:

let JavaDate = Java.type("java.util.Date");
let packetType = context.packet.getPacketType();
if (packetType === 'Query' || packetType === 'Execute') {
    context.connectionContext.recordQuery = context.utils.createObject();
    context.connectionContext.recordQuery.startTime = new JavaDate();
    context.connectionContext.recordQuery.numRows = 0;
    context.connectionContext.recordQuery.numBytes = 0;
    context.connectionContext.recordQuery.status = " ";
    if (packetType === 'Query') {
        context.connectionContext.recordQuery.sql = context.packet.getQuery().replace(/'/g, "''");
    }
    else {
        context.connectionContext.recordQuery.sql = context.connectionContext.currentQuery.replace(/'/g, "''");
    }
    return;
}

The first line looks up Java's Date type. We can't use a JavaScript Date here because JavaScript objects cannot be shared across invocations of the filter; this is also why the object stored in the connection context is created with context.utils.createObject().

Next, we get the packet type, and if it is Query or Execute, we set up a new object in the connection context. We have to use the connection context rather than the context object itself, because variables defined in the context object are lost once the code has run: every invocation gets a clean context. The connection context, on the other hand, persists for the duration of the database connection, so it's a good place to store information that must survive across filter invocations.

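Finally, we get the SQL for this command. This is done differently for the two packet types: a Query packet contains the SQL text itself, whereas an Execute packet does not, so for Execute we use the connection context's currentQuery instead. In both cases, single quotes are doubled so that the SQL can later be embedded in a string literal in our insert statement.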

That takes care of Query and Execute packets.
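The replace(/'/g, "''") calls in the Query and Execute handling exist because the captured SQL is later embedded inside a string literal. The sketch below isolates that technique in a hypothetical helper, toSqlLiteral. It assumes standard_conforming_strings is on (the default since PostgreSQL 9.1), in which case backslashes are not special inside '...' literals and doubling single quotes is sufficient.

```javascript
// Sketch: turn a value into a PostgreSQL string literal by doubling embedded
// single quotes, the same transformation the filter applies with
// replace(/'/g, "''"). Assumes standard_conforming_strings = on.
function toSqlLiteral(value) {
  if (value === null || value === undefined) {
    return "NULL"; // no quotes: a real SQL NULL, not the string 'null'
  }
  return "'" + String(value).replace(/'/g, "''") + "'";
}

console.log(toSqlLiteral("select 'a' from t")); // 'select ''a'' from t'
```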


if (packetType === 'DataRow') {
    context.connectionContext.recordQuery.numRows++;
    context.connectionContext.recordQuery.numBytes += context.packet.getLength();
    return;
}

Here we handle data rows: whenever we see a row, we increment the row count and add the packet's length to the byte count for the current command.


if (packetType === "ErrorResponse") {
    context.connectionContext.recordQuery.status =
        context.packet.getErrorString().replace(/'/g, "''");
    return;
}

Here we handle errors: if the server sends an error packet, we make a note of its message.
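For reference, an ErrorResponse carries more than a message: it is a list of fields, each identified by a one-byte code (for example S for severity, C for the SQLSTATE code, M for the message). The following minimal sketch decodes the raw message as described in the PostgreSQL protocol documentation. decodeErrorResponse is a hypothetical helper working on a Node.js Buffer, and it makes no claim about exactly which of these fields getErrorString() returns.

```javascript
// Sketch: decode a raw PostgreSQL ErrorResponse message ('E') into its fields.
// Each field is a one-byte code followed by a null-terminated string; a zero
// byte ends the list.
function decodeErrorResponse(buf) {
  if (buf[0] !== 0x45) { // 'E'
    throw new Error("not an ErrorResponse message");
  }
  const fields = {};
  let pos = 5; // skip the type byte and the Int32 length
  while (pos < buf.length && buf[pos] !== 0) {
    const code = String.fromCharCode(buf[pos]);
    const end = buf.indexOf(0, pos + 1);
    if (end === -1) {
      throw new Error("unterminated field");
    }
    fields[code] = buf.toString("utf8", pos + 1, end);
    pos = end + 1;
  }
  return fields;
}
```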


// At this point, this can only be a ReadyForQuery packet.
// Nothing to record if no command has been seen yet on this connection
// (for example, the ReadyForQuery the server sends right after authentication).
if (!context.connectionContext.recordQuery) {
    return;
}
let execTime = new JavaDate().getTime() - context.connectionContext.recordQuery.startTime.getTime();
let recordSql = "insert into gallium_demo.queries " +
    "(sql, username, ts, status, exec_time, num_rows, num_bytes) " +
    "values (" +
    "'" + context.connectionContext.recordQuery.sql + "'," +
    "'" + String(context.connectionContext.userName).replace(/'/g, "''") + "'," +
    "'" + new Date().toISOString() + "'," +
    "'" + context.connectionContext.recordQuery.status + "'," +
    execTime + "," +
    context.connectionContext.recordQuery.numRows + "," +
    context.connectionContext.recordQuery.numBytes +
    ")";
let rs = context.pgutils.executeSQL(recordSql);
if (rs && rs.getErrorPacket() !== null) {
    log.error("Error running SQL: " + rs.getErrorPacket().getErrorString());
}

All other packet types have been handled, so by the time we get to this code, it can only be a ReadyForQuery packet, which means that the server is ready to accept a command. This is the perfect time for us to "sneak in" an insert to the server and record the data we've been collecting.
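Building SQL by string concatenation is easy to get subtly wrong, so it can be worth testing the construction on its own. The sketch below is a hypothetical standalone version of the insert-building step. One deliberate difference from the filter: it escapes values at the moment they are embedded rather than when they are captured, so it expects raw, unescaped sql and status values. The timestamp is passed in so that the output is predictable.

```javascript
// Hypothetical standalone version of the insert-building step.
// record: { sql, status, execTime, numRows, numBytes } with raw (unescaped) text
function buildRecordSql(record, userName, ts) {
  // Quote a value as a string literal, doubling embedded single quotes
  const q = (s) => "'" + String(s).replace(/'/g, "''") + "'";
  return "insert into gallium_demo.queries " +
    "(sql, username, ts, status, exec_time, num_rows, num_bytes) " +
    "values (" +
    [
      q(record.sql),
      q(userName),
      q(ts.toISOString()),
      q(record.status),
      Number(record.execTime), // numeric columns are not quoted
      Number(record.numRows),
      Number(record.numBytes),
    ].join(",") +
    ")";
}
```

Escaping at a single point like this makes it harder for a value to end up escaped twice, or not at all, as the code evolves.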

Full code

let JavaDate = Java.type("java.util.Date");
let packetType = context.packet.getPacketType();

// A new command starts: set up a fresh record in the connection context
if (packetType === 'Query' || packetType === 'Execute') {
    context.connectionContext.recordQuery = context.utils.createObject();
    context.connectionContext.recordQuery.startTime = new JavaDate();
    context.connectionContext.recordQuery.numRows = 0;
    context.connectionContext.recordQuery.numBytes = 0;
    if (packetType === 'Query') {
        context.connectionContext.recordQuery.sql = context.packet.getQuery().replace(/'/g, "''");
    }
    else {
        context.connectionContext.recordQuery.sql = context.connectionContext.currentQuery.replace(/'/g, "''");
    }
    context.connectionContext.recordQuery.status = " ";
    return;
}

// One row of the result set: count it and its size
if (packetType === 'DataRow') {
    context.connectionContext.recordQuery.numRows++;
    context.connectionContext.recordQuery.numBytes += context.packet.getLength();
    return;
}

// The command failed: keep the error message
if (packetType === "ErrorResponse") {
    context.connectionContext.recordQuery.status =
        context.packet.getErrorString().replace(/'/g, "''");
    return;
}

// At this point, this can only be a ReadyForQuery packet.
// Nothing to record if no command has been seen yet on this connection
// (for example, the ReadyForQuery the server sends right after authentication).
if (!context.connectionContext.recordQuery) {
    return;
}

let execTime = new JavaDate().getTime() - context.connectionContext.recordQuery.startTime.getTime();
let recordSql = "insert into gallium_demo.queries " +
    "(sql, username, ts, status, exec_time, num_rows, num_bytes) " +
    "values (" +
    "'" + context.connectionContext.recordQuery.sql + "'," +
    "'" + String(context.connectionContext.userName).replace(/'/g, "''") + "'," +
    "'" + new Date().toISOString() + "'," +
    "'" + context.connectionContext.recordQuery.status + "'," +
    execTime + "," +
    context.connectionContext.recordQuery.numRows + "," +
    context.connectionContext.recordQuery.numBytes +
    ")";

let rs = context.pgutils.executeSQL(recordSql);
if (rs && rs.getErrorPacket() !== null) {
    log.error("Error running SQL: " + rs.getErrorPacket().getErrorString());
}


The result is that, when this filter is in effect, every SQL command sent by the client is persisted in the queries table. In the real world, you would typically not persist every single statement, as that can create large amounts of data very quickly, but that's up to you.
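If you do want to be selective, one option is to decide per command whether it is worth persisting. The following is a minimal sketch of such a policy; shouldRecord, its options and their default values are all hypothetical. In the filter, a check like this could go in the ReadyForQuery branch, just before the insert is built, using the recorded status and the computed execTime.

```javascript
// Hypothetical policy, not part of the tutorial: decide whether a finished
// command is worth persisting. All thresholds are illustrative defaults.
function shouldRecord(record, options = {}) {
  const {
    minExecTimeMs = 1000, // always keep commands at least this slow
    recordErrors = true,  // always keep failed commands
    sampleRate = 0,       // fraction of the remaining commands to keep
    random = Math.random, // injectable for testing
  } = options;
  // The filter stores " " as the status when there was no error
  const failed = typeof record.status === "string" && record.status.trim() !== "";
  if (recordErrors && failed) {
    return true;
  }
  if (record.execTime >= minExecTimeMs) {
    return true;
  }
  return random() < sampleRate;
}
```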