At Craft AI, we build a new product so data scientists can code and push, quickly and easily, in production, their machine learning algorithms. Our purpose is to make life easier for data scientists, for example, we handle data storage in a nice way so data scientists do not have to bother with saving and loading data from a database.
For this purpose, we use a PostgreSQL client written in Rust. Since we have a runtime with Tokio, the Deadpool-postgres crate was chosen.
As the scaling capability of our product is a central question we wanted to use a pool instead of using one connection at a time.
We wrote this article describing the first steps for implementing Postgresql pool connection in Rust with the Tokio Runtime. There might still be room for improvement but to the best of our knowledge, this is the only article that shows an example with Json and Uuid columns, with transactions with a retry mechanism.
That’s why we share this article.
Furthermore, we welcome any reviews on this article. That would be a great opportunity for us to improve it.
- To be fairly comfortable with Rust and its new paradigms such as: Ownership, Borrowchecker, Lifetime. (>= v1.56.0)
- To already have a basic experience with the Tokio Runtime. (>= v1.12.0)
- To have basic knowledge of PostgreSQL. (>= v9)
Setting up Pool, connection and runtime
Let’s start with the installation of the required crates:
Deadpool-postgres is needed for the pool connection. Obviously postgres is required too and we also need the tokio runtime.
The first step is to establish the connection to the database.
For this purpose we need to retrieve database configuration:
And then create the Pool connection client:
And finally the service with Tokio runtime and a messaging manager (Kafka for instance but it could also be RabbitMQ), where each message received through Kafka is handled in an asynchronous task.
Notice: as it is explained in the documentation, cloning the `pool` is cheap, so we do it for each message.
Now that everything is set up, we can dive into the hard part of the code: database queries.
A very useful and tiny helper function is to get a connection from the pool.
How we handle errors is not this article subject, here we will just raise an error with a String as the error message.
For purposes of illustration, we will use a simple table called people.
Each record has an UUID, a name (Text) and a data field (Json):
At each insertion, the id (UUID) is automatically generated.
So now, we need to update our Cargo.toml file with all the required crates:
We need serde and serde_json crates for serialization and deserialization for the Json column and the uuid crate for the uuid type.
This is the struct for the Json column “data”:
And then the struct for the record:
Finally, the From trait implementation for our struct:
I haven’t found a better way to deserialize the Json column other than with the `serde_json::value::Value` and then with the `serde_json::from_value` method.
I will let the reader correctly handle the error if any, instead of using the `unwrap()` method.
Create a record
To create a record, it is straightforward:
Now, we are ready to read records in the database:
To update a row or delete a row, it is pretty straightforward.
Add multiple rows at once
According to the rust postgres crate author, there are two ways to handle this: we can either prepare the query and populate it with our data or use the COPY query.
Here we chose to prepare the query. For the COPY query method, I let the reader click on the link above.
First, we need to create the query string. Since we use parameterized queries, we need to index each element passed to the query.
Caution, index starts at 1 and not 0.
So at the end we have:
Transaction with retry
When we use transaction, there can be two issues:
- Lock: when the request needs to access to a table but this table is already locked by another request
- Deadlock: when several requests need to access several tables and try to lock them at the same time
We are very lucky because a clean error is returned for each of the previous cases.
Hence, this very helpful helper :
All the SqlState can be found here.
We tried to write a high level function to query the database with the retry mechanism, but actually it is not as simple as that. Instead, we will use this helper function in a very procedural way.
If you need to use another isolation level, you can use the `build_transaction` method :
For more details, you can read the documentation here.
To dive deeper into this topic, there is the Rust forum where you can ask any question: https://users.rust-lang.org/
I hope this will help you.
We welcome any suggestions to improve this article. Feel free to share your experience as well.