I recently came across a good excuse to write another toy server in Rust. This time, rather than Axum, I wanted to try the new async Rocket. And, inspired by Matthias Endler’s zerocal, I also decided to give Shuttle a try.
The problem
At $WORK
, we have a simple ticket workflow in Linear, where tickets are automatically moved to Merged
if the pull request they are linked to merges.
However, this is not their intended final state.
Instead, we expect these tickets to go one of two places: QA Ready
(meaning there are clear instructions for testing it in our deployed environments), or Done
(where we can indeed consider the ticket completed).
The problem is, it is incredibly easy to forget to move a ticket along in the process. This can cause extra work to make sure things are not falling through the cracks.
One way to help mitigate this issue: sending reminders!
Specifically, we are looking some behavior something like the state machine below.
We only want to consider sending a reminder if a ticket’s status is Merged
.
In order to not be annoying (i.e., reminding someone to do something they are already working on), we also want to wait some amount of time.
If the ticket is no longer Merged
by the time that timer runs out, do nothing.
At time of writing, there is no way to do this natively in Linear, so how can we do it?
The better thing to do
I first tried to implement this with the Linear Zapier integration.
However, I ran into a fatal flaw: I could not find a way to implement the delayed notification.
Specifically, I attempted to create the chain of:
- Linear trigger on issue updated
- Delay N mins
- Filter on if
status==Merged
- Post a comment on the issue
However, it turns out this doesn’t work.
I’d get a comment on issue regardless of whether its status was still Merged
after the delay.
Apparently, this is expected.
For this to work, you need an additional step between (2) and (3) to refetch data to use in the filter [1].
Linear’s Zapier integration currently does not implement such a way to fetch.
Arguably, the best thing to do in this situation would be to contribute upstream to linear/linear-zapier
to add some action that would allow this refetch the ticket in question before an action was taken.
But, I didn’t do that.
What I did instead
Instead, this seemed like a prime opportunity to explore creating some more tooling with Rust! Since Linear supports webhooks and mutations via API, we should have all we need to implement this ourselves.
By using Rust, we could have a pretty lightweight, efficient server that we could easily serve with Shuttle for free.
System Design
Just because it’s a toy doesn’t mean it shouldn’t be relatively efficient. For example, it would be wasteful to just periodically poll linear for all issues and compute the diffs and updates ourselves.
Instead, thanks to Linear’s webhooks, and by saving some application state ourselves, we can get all the state we need from the webhooks alone; no need for extra calls.
Technically, we don’t need the identifier
or title
columns in the database.
But, with them, we can print more useful logs (i.e., so we can actually go search the ticket title and verify things worked).
Implementation Journey
Note that in the code blocks below, I’m omitting the use
commands for brevity.
You can see the repository itself for full details and code that compiles.
Shuttle Prerequisites
We can start with a dead-simple Rocket template using shuttle. First, we install the shuttle CLI.
$ cargo install cargo-shuttle
Then, in the directory we want this project to live, we can make a new shuttle project.
$ cargo shuttle init -t rocket --name linear-reminders linear-reminders
This gets us the basics:
// src/main.rs
use rocket::{get, routes};
#[get("/")]
fn index() -> &'static str {
"Hello, world!"
}
#[shuttle_runtime::main]
async fn main() -> shuttle_rocket::ShuttleRocket {
let rocket = rocket::build().mount("/", routes![index]);
Ok(rocket.into())
}
# Cargo.toml
[package]
name = "linear-reminders"
version = "0.1.0"
edition = "2021"
[dependencies]
rocket = "0.5.0"
shuttle-rocket = "0.42.0"
shuttle-runtime = "0.42.0"
tokio = "1.26.0"
To deploy the code, you need a shuttle account, which can be done at https://console.shuttle.rs/login
Once you’ve authed with cargo shuttle login
, you can deploy with cargo shuttle deploy
.
Then, hitting linear-reminders.shuttleapp.rs
gives you
Hello, world!
Setting up a Justfile
for fast local iteration
In order to implement this quickly (it’s a toy, after all), we want very quick dev iterations.
To do so, let’s leverage just
a convenient command runner with a Justfile
that should remind you of Makefile
s.
The key here is to leverage cargo-watch
, which watches project source code for changes, and when changes are detected, runs a series of commands.
We also use cargo-nextest
, despite us not actually setting up a test suite in this walkthrough.
If/when we do, nextest
is a very convenient test runner.
# just manual: https://github.com/casey/just
_default:
@just --list
# Runs clippy on the sources
check:
cargo clippy --locked -- -D warnings
# Sets up a watcher that lints, tests, and builds
watch:
cargo watch -qcx 'clippy --all-targets --all-features -- -D warnings' -x 'nextest run' -x 'shuttle run --release'
With this, we can run just watch
in another terminal, and every time we modify the code, we’ll see clippy
warnings, test results, and rebuild/rerun the server locally so that we can interact with it.
Adding Logging
Next, a good place to start is to add some logging so that we can get visibility when we need to.
$ cargo add tracing
Then, we can use tracing’s debug
/info
/warn
/error
macros just as we normally would, and shuttle will display them.
Adding a Database
We know we are going to need to save some application state.
Let’s reach for sqlx
to handle SQL in a lightweight, compile-time checked way.
$ cargo add sqlx -F chrono -F postgres
In normal SQLx style, we can set up a migration to create our table.
-- migrations/1_issues.sql
CREATE TABLE IF NOT EXISTS issues (
id VARCHAR PRIMARY KEY,
identifier VARCHAR NOT NULL,
title VARCHAR NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
reminded BOOLEAN NOT NULL DEFAULT FALSE
);
To do so, Shuttle supports created a shared Postgres instance just by adding a dependency and an argument.
$ cargo add shuttle-shared-db -F postgres -F sqlx
// src/main.rs
use rocket::{get, routes};
#[get("/")]
fn index() -> &'static str {
"Hello, world!"
}
struct AppState {
pool: PgPool,
}
#[shuttle_runtime::main]
async fn main(
#[shuttle_shared_db::Postgres] pool: PgPool,
) -> shuttle_rocket::ShuttleRocket {
let state = AppState { pool };
let rocket = rocket::build().mount("/", routes![index]).manage(state);
Ok(rocket.into())
}
Again, we could follow the normal SQLx way of running migrations using the CLI. We can get the connection string for Postgres by running the following.
$ cargo shuttle resource list --show-secrets
These databases are linked to linear-reminders
╭────────────────────────────┬──────────────────────────────────────────────────────────────────────────────────╮
│ Type ┆ Connection string │
╞════════════════════════════╪══════════════════════════════════════════════════════════════════════════════════╡
│ database::shared::postgres ┆ postgres://user-linear-reminders:********@db.shuttle.rs:5432/db-linear-reminders │
╰────────────────────────────┴──────────────────────────────────────────────────────────────────────────────────╯
However, for convenience (and since we don’t anticipate changing this anytime soon), we just run it on startup.
// src/main.rs
use shuttle_runtime::CustomError;
// ...
#[shuttle_runtime::main]
async fn main(
#[shuttle_shared_db::Postgres] pool: PgPool,
) -> shuttle_rocket::ShuttleRocket {
// Run single migration on startup.
pool.execute(include_str!("../migrations/1_issues.sql"))
.await
.map_err(CustomError::new)?;
info!("ran database migrations");
let rocket = rocket::build().mount("/", routes![index]);
Ok(rocket.into())
}
Adding Configuration
Next, we need a way to configure our application when we deploy it. Ideally, we don’t hard code a bunch of secrets and strings.
To do so, we’ll leverage Rocket’s built-in configuration, along with Shuttle secrets.
We’ll also use serde
to get our config into a struct and secrecy
to keep secrets from accidentally leaking.
We also want to pull in chrono
for convenient duration types.
And, just as a nice-to-have, let’s use humantime
so our configuration can be a little more human-readable.
$ cargo add serde -F derive
$ cargo add secrecy -F serde
$ cargo add chrono
$ cargo add humantime
Next, let’s define what configuration we want to take.
We need some secret keys to interact with Linear, and we also want to make the target status (Merged
, in our case), the time before reminding, and the actual message sent, all configurable.
use serde::{Deserialize, Deserializer, Serialize};
use secrecy::{ExposeSecret, SecretString};
#[derive(Deserialize, Debug, Clone)]
struct AppConfig {
linear: LinearConfig,
#[serde(deserialize_with = "deserialize_duration")]
time_to_remind: Duration,
}
#[derive(Deserialize, Debug, Clone)]
struct LinearConfig {
api_key: SecretString,
signing_key: SecretString,
target_status: String,
message: String,
}
/// Custom deserializer from humantime to std::time::Duration
fn deserialize_duration<'de, D>(deserializer: D) -> Result<std::time::Duration, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
match s.parse::<humantime::Duration>() {
Ok(duration) => Ok(duration.into()),
Err(_) => Err(serde::de::Error::custom("Invalid duration format")),
}
}
With that set, we can simply define these things in Rocket.toml
# Rocket.toml
[default]
# The amount of time to wait between an issue hiting the `target_status` and a reminder being sent.
# This is provided in "humantime" format (e.g, 15days 3hr 3min)
time_to_remind = '30min'
[default.linear]
# Your Linear personal api Key
api_key = 'insert-here'
# Your Linear webhook signing key
signing_key = 'insert-here'
# The target status to send reminders for
target_status = 'Merged'
# The content of the comment to send as the reminder.
# Must be a single line.
message = 'If this issue is QA-able, please write instructions and move to `QA Ready`. If not, mark it as `Done`. Thanks!\n\n*This is an automated message.*'
Then, these can be accessed using the AdHoc
fairing in Rocket.
use rocket::fairing::AdHoc;
#[shuttle_runtime::main]
async fn main(
#[shuttle_shared_db::Postgres] pool: PgPool,
) -> shuttle_rocket::ShuttleRocket {
// Run single migration on startup.
pool.execute(include_str!("../migrations/1_issues.sql"))
.await
.map_err(CustomError::new)?;
info!("ran database migrations");
let rocket = rocket::build()
.attach(AdHoc::config::<AppConfig>())
.mount("/", routes![index])
.manage(state);
Ok(rocket.into())
}
But, we want to be able to configure these when we deploy, and not check secrets in to version control.
So, we can set up Shuttle Secrets.toml
# Secrets.toml
'ROCKET_LINEAR.API_KEY' = 'lin_api_fillinyourkey'
'ROCKET_LINEAR.SIGNING_KEY' = 'lin_wh_fillinyourotherkey'
'ROCKET_LINEAR.TARGET_STATUS' = 'Limbo'
'ROCKET_LINEAR.MESSAGE' = 'Get out of Limbo.'
'ROCKET_TIME_TO_REMIND' = '10min'
One annoyance with this set up is that for Rocket to read in these values from the environment (12-factor-app-style), we actually need to go and set them in the environment.
#[shuttle_runtime::main]
async fn main(
#[shuttle_shared_db::Postgres] pool: PgPool,
#[shuttle_runtime::Secrets] secrets: shuttle_runtime::SecretStore,
) -> shuttle_rocket::ShuttleRocket {
// Transfer Shuttle.rs Secrets to Env Vars
if let Some(secret) = secrets.get("ROCKET_LINEAR.API_KEY") {
env::set_var("ROCKET_LINEAR.API_KEY", secret)
}
if let Some(secret) = secrets.get("ROCKET_LINEAR.SIGNING_KEY") {
env::set_var("ROCKET_LINEAR.SIGNING_KEY", secret)
}
if let Some(secret) = secrets.get("ROCKET_LINEAR.TARGET_STATUS") {
env::set_var("ROCKET_LINEAR.TARGET_STATUS", secret)
}
if let Some(secret) = secrets.get("ROCKET_LINEAR.MESSAGE") {
env::set_var("ROCKET_LINEAR.MESSAGE", secret)
}
if let Some(secret) = secrets.get("ROCKET_TIME_TO_REMIND") {
env::set_var("ROCKET_TIME_TO_REMIND", secret)
}
// Run single migration on startup.
pool.execute(include_str!("../migrations/1_issues.sql"))
.await
.map_err(CustomError::new)?;
info!("ran database migrations");
let state = AppState { pool };
let rocket = rocket::build()
.attach(AdHoc::config::<AppConfig>())
.mount("/", routes![index])
.manage(state);
}
Validating Webhooks
Now, let’s make our webhook endpoint.
When Linear sends a webhook, they send a POST
with a body that looks like the following.
{
"action": "update",
"actor": {
"id": "2e6eea91-1111-0000-9486-acea0603004e",
"name": "Luke Hsiao"
},
"createdAt": "2024-03-28T05:10:45.264Z",
"data": {
"id": "bf740309-1111-0000-a0f7-b8b26e18b33b",
"createdAt": "2024-03-23T15:32:11.774Z",
"updatedAt": "2024-03-28T05:10:45.264Z",
"number": 339,
"title": "2023 Taxes",
"priority": 2,
"estimate": 4,
"boardOrder": 0,
"sortOrder": -11061.79,
"startedAt": "2024-03-23T15:32:11.806Z",
"labelIds": [],
"teamId": "4d869526-1111-0000-92b2-2f0dc171849a",
"cycleId": "8d86d606-1111-0000-aa34-e6f8dfc00ebc",
"previousIdentifiers": [],
"creatorId": "2e6eea91-1111-0000-9486-acea0603004e",
"assigneeId": "2e6eea91-1111-0000-9486-acea0603004e",
"stateId": "478ce2a9-1111-0000-b2ee-9dbe810352f9",
"priorityLabel": "High",
"botActor": {
"id": "5c07d33f-1111-0000-8100-67908589ec45",
"type": "workflow",
"name": "Linear",
"avatarUrl": "https://static.linear.app/assets/pwa/icon_maskable_512.png"
},
"identifier": "HSI-339",
"url": "https://linear.app/hsiao/issue/HSI-339/2023-taxes",
"assignee": {
"id": "2e6eea91-1111-0000-9486-acea0603004e",
"name": "Luke Hsiao"
},
"cycle": {
"id": "8d86d606-1111-0000-aa34-e6f8dfc00ebc",
"number": 19,
"startsAt": "2024-03-25T07:00:00.000Z",
"endsAt": "2024-04-08T07:00:00.000Z"
},
"state": {
"id": "478ce2a9-1111-0000-b2ee-9dbe810352f9",
"color": "#f2c94c",
"name": "In Progress",
"type": "started"
},
"team": {
"id": "4d869526-1111-0000-92b2-2f0dc171849a",
"key": "HSI",
"name": "Hsiao"
},
"subscriberIds": [
"2e6eea91-1111-0000-9486-acea0603004e",
"233a3b9e-1111-0000-b350-4b1f85ce733b"
],
"labels": []
},
"updatedFrom": {
"updatedAt": "2024-03-28T05:10:18.275Z",
"sortOrder": 84.27,
"stateId": "3e0d1574-1111-0000-953d-42e08ad719eb"
},
"url": "https://linear.app/hsiao/issue/HSI-339/2023-taxes",
"type": "Issue",
"organizationId": "15a23696-1111-0000-ad4a-84e751d82d13",
"webhookTimestamp": 1711602645358,
"webhookId": "3f106cc1-1111-0000-83ed-238cece0b5e2"
}
In addition, they provide guidance on securing webhooks using the Linear-Signature
header and timestamp.
Specifically, they recommend three things:
- Check the SHA256 HMAC signature with the secret signing key
- Ensure that the
webhookTimestamp
is within a minute of your system time to prevent replay attacks - Confirm that the sender IP address is one of the following:
35.231.147.226
or35.243.134.228
Doing all of these is the best, but doing 3 makes local debugging a little harder, so we will leave that as an exercise to the reader.
To start, we need to define some structs to get the small subset of fields we actually care about from the request body.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(crate = "rocket::serde")]
struct Payload {
action: String,
#[serde(rename = "type")]
event_type: String,
#[serde(alias = "createdAt")]
created_at: DateTime<Utc>,
data: IssueData,
#[serde(alias = "webhookTimestamp")]
webhook_timestamp: i64,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(crate = "rocket::serde")]
struct IssueData {
id: String,
identifier: String,
title: String,
state: StateData,
#[serde(skip)]
_ignored_fields: Option<Value>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(crate = "rocket::serde")]
struct StateData {
name: String,
#[serde(skip)]
_ignored_fields: Option<Value>,
}
Next, we implement a Data Guard for Payload
that performs the bulk of the validation for us.
/// Data guard that validates integrity of the request body by comparing with a
/// signature.
const LINEAR_SIGNATURE: &str = "Linear-Signature";
#[rocket::async_trait]
impl<'r> FromData<'r> for Payload {
type Error = ();
async fn from_data(req: &'r Request<'_>, data: Data<'r>) -> data::Outcome<'r, Self> {
// Ensure header is present
let keys = req.headers().get(LINEAR_SIGNATURE).collect::<Vec<_>>();
if keys.len() != 1 {
return Outcome::Error((Status::BadRequest, ()));
}
let signature = keys[0];
// Ensure content type is right
let ct = ContentType::new("application", "json");
if req.content_type() != Some(&ct) {
return Outcome::Forward((data, Status::UnsupportedMediaType));
}
// TODO: could also verify IP address, but that makes testing harder.
// Use a configured limit with name 'json' or fallback to default.
let limit = req.limits().get("json").unwrap_or(5.kilobytes());
// Read the data into a string.
let body = match data.open(limit).into_string().await {
Ok(string) if string.is_complete() => string.into_inner(),
Ok(_) => return Outcome::Error((Status::PayloadTooLarge, ())),
Err(_) => return Outcome::Error((Status::InternalServerError, ())),
};
// We store `body` in request-local cache for long-lived borrows.
let body = request::local_cache!(req, body);
let config = match req.rocket().state::<AppConfig>() {
Some(c) => c,
None => return Outcome::Error((Status::InternalServerError, ())),
};
if !is_valid_signature(signature, body, config.linear.signing_key.expose_secret()) {
return Outcome::Error((Status::BadRequest, ()));
}
let r: Payload = match serde_json::from_str(body) {
Ok(r) => r,
Err(_) => return Outcome::Error((Status::BadRequest, ())),
};
// Prevent replay attacks
let webhook_time = match DateTime::from_timestamp(r.webhook_timestamp, 0) {
Some(t) => t,
None => return Outcome::Error((Status::BadRequest, ())),
};
let now = Utc::now();
if now.signed_duration_since(webhook_time).num_seconds() > 60 {
return Outcome::Error((Status::BadRequest, ()));
}
Outcome::Success(r)
}
}
type HmacSha256 = SimpleHmac<Sha256>;
fn is_valid_signature(signature: &str, body: &str, secret: &str) -> bool {
let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).expect("failed to create hmac");
mac.update(body.as_bytes());
let result = mac.finalize();
let expected_signature = result.into_bytes();
let encoded = hex::encode(expected_signature);
debug!(encoded=%encoded, "actual signature");
// Some might say this should be constant-time equality check
encoded == signature
}
Note that this is a suboptimal way to do this, and Rocket v0.6 should have a more optimal way.
Now, we can define our endpoint with the relevant logic.
#[post("/", format = "json", data = "<payload>")]
async fn webhook_linear(
payload: Payload,
state: &State<AppState>,
app_config: &State<AppConfig>,
) -> Result<()> {
// Do everything in one transaction
let mut transaction = state.pool.begin().await?;
if payload.data.state.name == app_config.linear.target_status {
// Use `ON CONFLICT DO NOTHING` because after the `time_to_remind`,
// we will check again, whether or not an issue was updated twice.
sqlx::query!(
"INSERT INTO issues( id, identifier, title, updated_at, reminded) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING",
&payload.data.id,
&payload.data.identifier,
&payload.data.title,
payload.created_at,
false
)
.execute(&mut *transaction)
.await?;
info!(payload=?payload, "added issue to remind");
} else if let Ok(true) = issue_in_db(&mut transaction, &payload.data.id).await {
sqlx::query!("DELETE FROM issues WHERE id = $1", &payload.data.id)
.execute(&mut *transaction)
.await?;
info!(payload=?payload, "issue is no longer {}", app_config.linear.target_status);
}
transaction.commit().await?;
Ok(())
}
async fn issue_in_db(transaction: &mut PgTransaction, id: &str) -> Result<bool> {
let r = sqlx::query!(
r#"
SELECT COUNT(*)
FROM issues
WHERE id = $1
"#,
id
)
.fetch_one(&mut **transaction)
.await?;
if r.count == Some(1) {
Ok(true)
} else {
Ok(false)
}
}
#[shuttle_runtime::main]
async fn main(
#[shuttle_shared_db::Postgres] pool: PgPool,
#[shuttle_runtime::Secrets] secrets: shuttle_runtime::SecretStore,
) -> shuttle_rocket::ShuttleRocket {
// Transfer Shuttle.rs Secrets to Env Vars
if let Some(secret) = secrets.get("ROCKET_LINEAR.API_KEY") {
env::set_var("ROCKET_LINEAR.API_KEY", secret)
}
if let Some(secret) = secrets.get("ROCKET_LINEAR.SIGNING_KEY") {
env::set_var("ROCKET_LINEAR.SIGNING_KEY", secret)
}
if let Some(secret) = secrets.get("ROCKET_LINEAR.TARGET_STATUS") {
env::set_var("ROCKET_LINEAR.TARGET_STATUS", secret)
}
if let Some(secret) = secrets.get("ROCKET_LINEAR.MESSAGE") {
env::set_var("ROCKET_LINEAR.MESSAGE", secret)
}
if let Some(secret) = secrets.get("ROCKET_TIME_TO_REMIND") {
env::set_var("ROCKET_TIME_TO_REMIND", secret)
}
// Run single migration on startup.
pool.execute(include_str!("../migrations/1_issues.sql"))
.await
.map_err(CustomError::new)?;
info!("ran database migrations");
let state = AppState { pool };
let rocket = rocket::build()
.attach(AdHoc::config::<AppConfig>())
.mount("/webhooks/linear", routes![webhook_linear])
.manage(state);
Ok(rocket.into())
}
Implementing a Postgres Queue
We have handled inserting into the database and deleting from the database if the status changed. Now, we just need a handy way to dequeue from Postgres for our background task.
async fn dequeue_issue(pool: &PgPool) -> Result<Option<(PgTransaction, Issue)>> {
let mut transaction = pool.begin().await?;
let r = sqlx::query!(
r#"
SELECT id, identifier, title, updated_at, reminded
FROM issues
WHERE reminded = FALSE
ORDER BY updated_at ASC
FOR UPDATE
SKIP LOCKED
LIMIT 1
"#,
)
.fetch_optional(&mut *transaction)
.await?;
if let Some(r) = r {
Ok(Some((
transaction,
Issue {
id: r.id,
updated_at: r.updated_at,
identifier: r.identifier,
title: r.title,
reminded: r.reminded,
},
)))
} else {
Ok(None)
}
}
The key bit here is that we make the query clever in case we have multiple workers.
Since Postgres 9.5, there is the SKIP LOCKED
clause, which allows SELECT
statements to ignore all rows that are currently locked by another concurrent operation.
FOR UPDATE
locks the rows returned by a SELECT
.
We combine them, making this a concurrency-safe queue.
This way, a worker just selects uncontested tasks for the duration of the transaction.
Posting Comments via GraphQL
Finally, we need to actually implement the background worker which will poll the queue and post comments.
We implement this by just spawning a tokio
task at startup.
Because it happens before Rocket starts, we also provide it a clone of the Postgres pool and the configuration.
#[shuttle_runtime::main]
async fn main(
#[shuttle_shared_db::Postgres] pool: PgPool,
#[shuttle_runtime::Secrets] secrets: shuttle_runtime::SecretStore,
) -> shuttle_rocket::ShuttleRocket {
// Transfer Shuttle.rs Secrets to Env Vars
if let Some(secret) = secrets.get("ROCKET_LINEAR.API_KEY") {
env::set_var("ROCKET_LINEAR.API_KEY", secret)
}
if let Some(secret) = secrets.get("ROCKET_LINEAR.SIGNING_KEY") {
env::set_var("ROCKET_LINEAR.SIGNING_KEY", secret)
}
if let Some(secret) = secrets.get("ROCKET_LINEAR.TARGET_STATUS") {
env::set_var("ROCKET_LINEAR.TARGET_STATUS", secret)
}
if let Some(secret) = secrets.get("ROCKET_LINEAR.MESSAGE") {
env::set_var("ROCKET_LINEAR.MESSAGE", secret)
}
if let Some(secret) = secrets.get("ROCKET_TIME_TO_REMIND") {
env::set_var("ROCKET_TIME_TO_REMIND", secret)
}
// Run single migration on startup.
pool.execute(include_str!("../migrations/1_issues.sql"))
.await
.map_err(CustomError::new)?;
info!("ran database migrations");
// Worker Task: periodically checks and sends the reminder comments
let worker_pool = pool.clone();
let worker_config = Config::figment()
.extract::<AppConfig>()
.expect("failed to parse app config");
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
let issue = dequeue_issue(&worker_pool).await;
if let Ok(Some((mut transaction, issue_db))) = issue {
let now = Utc::now();
if now.signed_duration_since(issue_db.updated_at)
> TimeDelta::from_std(worker_config.time_to_remind)
.expect("failed to convert Duration to TimeDelta")
{
let client = reqwest::Client::new();
// Ref: https://developers.linear.app/docs/graphql/working-with-the-graphql-api#queries-and-mutations
let body = serde_json::json!({
"query": format!(r#"mutation CommentCreate {{
commentCreate(
input: {{
body: "{}"
issueId: "{}"
}}
) {{
success
}}
}}"#, worker_config.linear.message, issue_db.id)
});
if let Ok(res) = client
.post("https://api.linear.app/graphql")
.header(
header::AUTHORIZATION,
worker_config.linear.api_key.expose_secret(),
)
.header(header::CONTENT_TYPE, "application/json")
.json(&body)
.send()
.await
{
if !res.status().is_success() {
let status = res.status();
let text = res.text().await.unwrap_or_default();
warn!(issue=?issue_db, status=?status, msg=%text, "failed to post comment, retrying later...");
continue;
}
} else {
warn!(issue=?issue_db,"failed to post comment, retrying later...");
continue;
}
if let Ok(r) = sqlx::query!(
"UPDATE issues SET reminded = TRUE WHERE id = $1",
&issue_db.id
)
.execute(&mut *transaction)
.await
{
if r.rows_affected() == 1 {
let _ = transaction.commit().await;
info!(issue=?issue_db, "sent reminder");
} else {
let _ = transaction.rollback().await;
}
}
}
}
}
});
let state = AppState { pool };
let rocket = rocket::build()
.attach(AdHoc::config::<AppConfig>())
.mount("/webhooks/linear", routes![webhook_linear])
.manage(state);
Ok(rocket.into())
}
Disabling Idle Timeout
After about a month of using this, we noticed we would occasionally miss webhooks. Looking at the logs, it seems like they were never received at all, rather than some other error (e.g., failing to parse).
We suspect this is due to Shuttle’s idle timeout.
So, as an extra step, we set --idle-minutes 0
to disable the timeout so that the project never sleeps.
We have not noticed any missed webhooks since changing this setting.
Conclusion
This results in an 18 MB server that does the job efficiently in just over 300 lines of Rust.
In fact, since we started running it, we haven’t had a single ticket that required manual nagging to push along in the process, saving attention and time.
Feel free to check it out at lukehsiao/linear-reminder
.