diff --git a/.sqlx/query-462ce7087bb17b87b12e21f38005ab653827f8f934c42e1c83aad0e54e64a1d2.json b/.sqlx/query-462ce7087bb17b87b12e21f38005ab653827f8f934c42e1c83aad0e54e64a1d2.json new file mode 100644 index 00000000..0272b883 --- /dev/null +++ b/.sqlx/query-462ce7087bb17b87b12e21f38005ab653827f8f934c42e1c83aad0e54e64a1d2.json @@ -0,0 +1,62 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE subscription\n SET modification_date_time = now(),\n client_name = $2,\n program_id = $3,\n object_operations = $4\n WHERE id = $1\n AND ($5::text IS NULL OR client_id = $5)\n RETURNING\n id,\n created_date_time,\n modification_date_time,\n client_id,\n client_name,\n program_id,\n object_operations\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "created_date_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "modification_date_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "client_id", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "client_name", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "program_id", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "object_operations", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Text", + "Jsonb", + "Text" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + true + ] + }, + "hash": "462ce7087bb17b87b12e21f38005ab653827f8f934c42e1c83aad0e54e64a1d2" +} diff --git a/.sqlx/query-92403bcb08bdc1dce6d00710d275a1e2e5fc3e81cead46fdfd8f497cb73a3c21.json b/.sqlx/query-92403bcb08bdc1dce6d00710d275a1e2e5fc3e81cead46fdfd8f497cb73a3c21.json new file mode 100644 index 00000000..5274d70c --- /dev/null +++ b/.sqlx/query-92403bcb08bdc1dce6d00710d275a1e2e5fc3e81cead46fdfd8f497cb73a3c21.json @@ -0,0 +1,59 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM subscription\n WHERE id = $1\n AND ($2::text IS NULL OR client_id = $2)\n RETURNING\n id,\n created_date_time,\n modification_date_time,\n client_id,\n client_name,\n program_id,\n object_operations\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "created_date_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "modification_date_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "client_id", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "client_name", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "program_id", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "object_operations", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + true + ] + }, + "hash": "92403bcb08bdc1dce6d00710d275a1e2e5fc3e81cead46fdfd8f497cb73a3c21" +} diff --git a/.sqlx/query-9b3e5690d65d00e1ebd48aafe52d8b468890fd946f54278c51ffa6573afcd1e1.json b/.sqlx/query-9b3e5690d65d00e1ebd48aafe52d8b468890fd946f54278c51ffa6573afcd1e1.json new file mode 100644 index 00000000..e51abc47 --- /dev/null +++ b/.sqlx/query-9b3e5690d65d00e1ebd48aafe52d8b468890fd946f54278c51ffa6573afcd1e1.json @@ -0,0 +1,59 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n id,\n created_date_time,\n modification_date_time,\n client_id,\n client_name,\n program_id,\n object_operations\n FROM subscription\n WHERE id = $1\n AND ($2::text IS NULL OR client_id = $2)\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "created_date_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "modification_date_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "client_id", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "client_name", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "program_id", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "object_operations", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + true + ] + }, + "hash": "9b3e5690d65d00e1ebd48aafe52d8b468890fd946f54278c51ffa6573afcd1e1" +} diff --git a/.sqlx/query-b6693aca2cc4399a3341ab52a85f3c2823ffcc7452afc5405a05cce56d99154d.json b/.sqlx/query-b6693aca2cc4399a3341ab52a85f3c2823ffcc7452afc5405a05cce56d99154d.json deleted file mode 100644 index 2ecf9de3..00000000 --- a/.sqlx/query-b6693aca2cc4399a3341ab52a85f3c2823ffcc7452afc5405a05cce56d99154d.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT vp.program_id, v.ven_name\n FROM ven_program vp\n JOIN ven v ON v.id = vp.ven_id\n WHERE vp.program_id = ANY ($1)\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "program_id", - "type_info": "Text" - }, - { - "ordinal": 1, - "name": "ven_name", - "type_info": "Text" - } - ], - "parameters": { - "Left": [ - "TextArray" - ] - }, - "nullable": [ - false, - false - ] - }, - "hash": "b6693aca2cc4399a3341ab52a85f3c2823ffcc7452afc5405a05cce56d99154d" -} diff --git a/.sqlx/query-c2b1203977cf1ca599d3d67d91f679193136a59760f747a34bded1bb725dd1ec.json b/.sqlx/query-c2b1203977cf1ca599d3d67d91f679193136a59760f747a34bded1bb725dd1ec.json new file mode 100644 index 00000000..5a77fc17 --- /dev/null +++ b/.sqlx/query-c2b1203977cf1ca599d3d67d91f679193136a59760f747a34bded1bb725dd1ec.json @@ -0,0 +1,61 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO subscription (\n id,\n created_date_time,\n modification_date_time,\n client_id,\n client_name,\n program_id,\n object_operations\n )\n VALUES (gen_random_uuid(), now(), now(), $1, $2::text, $3, $4)\n RETURNING\n id,\n created_date_time,\n modification_date_time,\n client_id,\n client_name,\n program_id,\n object_operations\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "created_date_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "modification_date_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "client_id", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "client_name", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "program_id", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "object_operations", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Text", + "Jsonb" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + true + ] + }, + "hash": "c2b1203977cf1ca599d3d67d91f679193136a59760f747a34bded1bb725dd1ec" +} diff --git a/.sqlx/query-e36a438849b5a47e6f2bc5304712aa2aab563bf5850d77ce0d58f75845c63366.json b/.sqlx/query-e36a438849b5a47e6f2bc5304712aa2aab563bf5850d77ce0d58f75845c63366.json new file mode 100644 index 00000000..3085951b --- /dev/null +++ b/.sqlx/query-e36a438849b5a47e6f2bc5304712aa2aab563bf5850d77ce0d58f75845c63366.json @@ -0,0 +1,63 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n id,\n created_date_time,\n modification_date_time,\n client_id,\n client_name,\n program_id,\n object_operations\n FROM subscription\n WHERE ($1::text IS NULL OR client_id = $1)\n AND ($2::text IS NULL OR client_name = $2)\n AND ($3::text IS NULL OR program_id = $3 OR program_id IS NULL)\n AND ($4::text IS NULL OR jsonb_path_exists(\n object_operations,\n '$[*].objects[*] ? (@ == $obj)',\n jsonb_build_object('obj', $4)\n ))\n ORDER BY created_date_time\n OFFSET $5 LIMIT $6\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "created_date_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 2, + "name": "modification_date_time", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "client_id", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "client_name", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "program_id", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "object_operations", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Text", + "Text", + "Int8", + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + true + ] + }, + "hash": "e36a438849b5a47e6f2bc5304712aa2aab563bf5850d77ce0d58f75845c63366" +} diff --git a/Cargo.lock b/Cargo.lock index 0f41c084..20491072 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -140,6 +140,7 @@ checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" dependencies = [ "axum-core", "axum-macros", + "base64", "bytes", "form_urlencoded", "futures-util", @@ -158,8 +159,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -571,6 +574,12 @@ dependencies = [ "syn", ] +[[package]] +name = "data-encoding" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" + [[package]] name = "der" version = "0.7.10" @@ -3114,6 +3123,18 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "tokio-tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -3246,6 +3267,23 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.2", + "sha1", + "thiserror 2.0.18", + "utf-8", +] + [[package]] name = "typenum" version = "1.19.0" @@ -3316,6 +3354,12 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" diff --git a/fixtures/subscriptions.sql b/fixtures/subscriptions.sql new file mode 100644 index 00000000..dd363957 --- /dev/null +++ b/fixtures/subscriptions.sql @@ -0,0 +1,22 @@ +INSERT INTO subscription (id, + created_date_time, + modification_date_time, + client_id, + client_name, + program_id, + object_operations) + +VALUES ('subscription-1', + '2024-07-25 08:31:10.776000 +00:00', + '2024-07-25 08:31:10.776000 +00:00', + 'ven-1-client-id', + 'abc', + NULL, + '[]'::jsonb), + ('subscription-2', + '2024-07-25 08:31:10.776000 +00:00', + '2024-07-25 08:31:10.776000 +00:00', + 'ven-1-client-id', + 'abc', + NULL, + '[]'::jsonb); diff --git a/migrations/20260218122421_add_subscriptions.sql b/migrations/20260218122421_add_subscriptions.sql new file mode 100644 index 00000000..1b07c760 --- /dev/null +++ b/migrations/20260218122421_add_subscriptions.sql @@ -0,0 +1,12 @@ +create table subscription +( + id text not null + constraint subscription_pk + primary key, + created_date_time timestamptz not null, + modification_date_time timestamptz not null, + client_id text not null references ven (client_id), + client_name text not null, + program_id text, + object_operations jsonb +); diff --git a/openleadr-vtn/Cargo.toml b/openleadr-vtn/Cargo.toml index b74c179e..5ac47a1e 100644 --- a/openleadr-vtn/Cargo.toml +++ b/openleadr-vtn/Cargo.toml @@ -19,7 +19,7 @@ serde.workspace = true serde_json.workspace = true reqwest.workspace = true -axum.workspace = true +axum = { workspace = true, features = ["ws"] } axum-extra.workspace = true tokio = { workspace = true, features = ["full"] } tower-http = { workspace = true } diff --git a/openleadr-vtn/src/api/mod.rs b/openleadr-vtn/src/api/mod.rs index 2f1cd5c3..79525156 100644 --- a/openleadr-vtn/src/api/mod.rs +++ b/openleadr-vtn/src/api/mod.rs @@ -16,6 +16,7 @@ pub(crate) mod event; pub(crate) mod program; pub(crate) mod report; pub(crate) mod resource; +pub(crate) mod subscription; #[cfg(feature = "internal-oauth")] pub(crate) mod user; pub(crate) mod ven; diff --git a/openleadr-vtn/src/api/resource.rs b/openleadr-vtn/src/api/resource.rs index 9957411f..965844db 100644 --- a/openleadr-vtn/src/api/resource.rs +++ b/openleadr-vtn/src/api/resource.rs @@ -4,13 +4,15 @@ use axum::{ extract::{Path, State}, Json, }; -use openleadr_wire::ven::VenId; use reqwest::StatusCode; use serde::Deserialize; use tracing::{info, trace}; use validator::Validate; -use openleadr_wire::resource::{BlResourceRequest, Resource, ResourceId, ResourceRequest}; +use openleadr_wire::{ + resource::{BlResourceRequest, Resource, ResourceId, ResourceRequest}, + ven::VenId, +}; use crate::{ api::{AppResponse, TargetQueryParams, ValidatedJson, ValidatedQuery}, diff --git a/openleadr-vtn/src/api/subscription.rs b/openleadr-vtn/src/api/subscription.rs new file mode 100644 index 00000000..aec3bc31 --- /dev/null +++ b/openleadr-vtn/src/api/subscription.rs @@ -0,0 +1,395 @@ +use std::{collections::HashMap, sync::Arc}; + +use axum::{ + extract::{Path, State}, + Json, +}; +use openleadr_wire::{ + program::ProgramId, + subscription::{Subscription, SubscriptionId, SubscriptionRequest}, + ObjectType, +}; +use reqwest::StatusCode; +use serde::Deserialize; +use tokio::sync::Mutex; +use tracing::{error, info, trace}; +use validator::Validate; + +use crate::{ + api::{AppResponse, ValidatedJson, ValidatedQuery}, + data_source::SubscriptionCrud, + error::AppError, + jwt::{Scope, User}, + state::AppState, +}; + +pub(crate) struct NotifierState { + subscriptions: Mutex>, +} + +impl NotifierState { + pub(crate) async fn load_from_storage( + storage: &dyn SubscriptionCrud, + ) -> Result { + let subscriptions = storage + .retrieve_all( + &QueryParams { + program_id: None, + client_name: None, + objects: None, + skip: 0, + limit: i64::MAX, + }, + &None, + ) + .await?; + + Ok(Self { + subscriptions: Mutex::new( + subscriptions + .into_iter() + .map(|subscription| (subscription.id.clone(), subscription)) + .collect(), + ), + }) + } +} + +pub async fn get_all( + State(subscription_source): State>, + ValidatedQuery(query_params): ValidatedQuery, + User(user): User, +) -> AppResponse> { + trace!(?query_params); + + // FIXME update retrieve_all implementation when removing this + if query_params.objects.as_deref().unwrap_or(&[]).len() > 1 { + let error = "Tried to filter subscriptions by multiple object types. \ + This is not allowed in the current version of openLEADR as the specification \ + is not quite clear about if this should require all or any of the object types \ + to apply to subscriptions. If you have a use case for either option, please \ + open an issue on GitHub."; + error!("{}", error); + return Err(AppError::BadRequest(error)); + } + + let resources = if user.scope.contains(Scope::ReadAll) { + subscription_source + .retrieve_all(&query_params, &None) + .await? + } else if user.scope.contains(Scope::ReadVenObjects) { + subscription_source + .retrieve_all(&query_params, &Some(user.client_id()?)) + .await? + } else { + return Err(AppError::Forbidden( + "Missing 'read_all' or 'read_ven_objects' scope", + )); + }; + + trace!( + client_id = user.sub, + "retrieved {} resources", + resources.len() + ); + + Ok(Json(resources)) +} + +pub async fn get( + State(subscription_source): State>, + Path(id): Path, + User(user): User, +) -> AppResponse { + let subscription = if user.scope.contains(Scope::ReadAll) { + subscription_source.retrieve(&id, &None).await? + } else if user.scope.contains(Scope::ReadVenObjects) { + subscription_source + .retrieve(&id, &Some(user.client_id()?)) + .await? + } else { + return Err(AppError::Forbidden( + "Missing 'read_all' or 'read_ven_objects' scope", + )); + }; + + trace!( + %subscription.id, + subscription.program_id=?subscription.content.program_id, + client_id = user.sub, + "subscription retrieved" + ); + + Ok(Json(subscription)) +} + +pub async fn add( + State(subscription_source): State>, + State(app_state): State, + User(user): User, + ValidatedJson(new_subscription): ValidatedJson, +) -> Result<(StatusCode, Json), AppError> { + let client_id = user.client_id()?; + + let subscription = if user.scope.contains(Scope::WriteSubscriptions) { + subscription_source + .create(new_subscription, &Some(client_id)) + .await? + } else { + return Err(AppError::Forbidden("Missing 'write_vens' scope")); + }; + + app_state + .notifier + .subscriptions + .lock() + .await + .insert(subscription.id.clone(), subscription.clone()); + + info!( + %subscription.id, + subscription.program_id=?subscription.content.program_id, + client_id = user.sub, + "resource added" + ); + + Ok((StatusCode::CREATED, Json(subscription))) +} + +pub async fn edit( + State(subscription_source): State>, + State(app_state): State, + Path(id): Path, + User(user): User, + ValidatedJson(update): ValidatedJson, +) -> AppResponse { + let subscription = if user.scope.contains(Scope::WriteSubscriptions) { + subscription_source + .update(&id, update, &Some(user.client_id()?)) + .await? + } else { + return Err(AppError::Forbidden("Missing 'write_subscriptions' scope")); + }; + + app_state + .notifier + .subscriptions + .lock() + .await + .insert(subscription.id.clone(), subscription.clone()); + + info!( + %subscription.id, + subscription.program_id=?subscription.content.program_id, + client_id = user.sub, + "resource updated" + ); + + Ok(Json(subscription)) +} + +pub async fn delete( + State(subscription_source): State>, + State(app_state): State, + Path(id): Path, + User(user): User, +) -> AppResponse { + let subscription = if user.scope.contains(Scope::WriteSubscriptions) { + subscription_source + .delete(&id, &Some(user.client_id()?)) + .await? + } else { + return Err(AppError::Forbidden("Missing 'write_subscriptions' scope")); + }; + + app_state + .notifier + .subscriptions + .lock() + .await + .remove(&subscription.id); + + info!(%id, client_id = user.sub, "deleted subscription"); + + Ok(Json(subscription)) +} + +#[derive(Deserialize, Validate, Debug)] +#[serde(rename_all = "camelCase")] +pub struct QueryParams { + #[serde(rename = "programID")] + pub(crate) program_id: Option, + #[validate(length(min = 1, max = 128))] + pub(crate) client_name: Option, + #[validate(length(min = 0, max = 6))] + pub(crate) objects: Option>, + #[serde(default)] + #[validate(range(min = 0))] + pub(crate) skip: i64, + #[validate(range(min = 1, max = 50))] + #[serde(default = "get_50")] + pub(crate) limit: i64, +} + +fn get_50() -> i64 { + 50 +} + +#[cfg(test)] +mod test { + use axum::body::Body; + use openleadr_wire::{problem::Problem, subscription::Subscription}; + use reqwest::{Method, StatusCode}; + use sqlx::PgPool; + + use crate::{api::test::ApiTest, jwt::Scope}; + + #[sqlx::test(fixtures("vens", "users"))] + async fn empty_object_operations_not_allowed(db: PgPool) { + let server = ApiTest::new( + db, + "ven-1-client-id", + vec![Scope::WriteSubscriptions, Scope::ReadAll], + ) + .await; + + let (status, _) = server + .request::( + Method::POST, + "/subscriptions", + Body::from(r#"{"clientName": "ven-1-name", "objectOperations": []}"#), + ) + .await; + assert_eq!(status, StatusCode::BAD_REQUEST); + } + + #[sqlx::test(fixtures("vens", "users"))] + async fn get_many(db: PgPool) { + let server = ApiTest::new( + db, + "ven-1-client-id", + vec![Scope::WriteSubscriptions, Scope::ReadAll], + ) + .await; + + let (status, _) = server + .request::( + Method::POST, + "/subscriptions", + Body::from( + r#"{ + "clientName": "myClient", + "objectOperations": [{ + "mechanism": "WEBSOCKET", + "operations": ["CREATE", "UPDATE"], + "objects": ["EVENT", "PROGRAM"] + }] + }"#, + ), + ) + .await; + assert_eq!(status, StatusCode::CREATED); + + let (status, _) = server + .request::( + Method::POST, + "/subscriptions", + Body::from( + r#"{ + "clientName": "myClient", + "programId": "PROGRAM-100", + "objectOperations": [{ + "mechanism": "WEBSOCKET", + "operations": ["CREATE", "UPDATE"], + "objects": ["EVENT", "RESOURCE"] + }] + }"#, + ), + ) + .await; + assert_eq!(status, StatusCode::CREATED); + + let (status, subscriptions) = server + .request::>(Method::GET, "/subscriptions", Body::empty()) + .await; + assert_eq!(status, StatusCode::OK); + assert_eq!(subscriptions.len(), 2); + + let (status, subscriptions) = server + .request::>( + Method::GET, + "/subscriptions?objects=EVENT", + Body::empty(), + ) + .await; + assert_eq!(status, StatusCode::OK); + assert_eq!(subscriptions.len(), 2); + + let (status, subscriptions) = server + .request::>( + Method::GET, + "/subscriptions?objects=RESOURCE", + Body::empty(), + ) + .await; + assert_eq!(status, StatusCode::OK); + assert_eq!(subscriptions.len(), 1); + + let (status, subscriptions) = server + .request::>( + Method::GET, + "/subscriptions?objects=REPORT", + Body::empty(), + ) + .await; + assert_eq!(status, StatusCode::OK); + assert_eq!(subscriptions.len(), 0); + + let (status, _) = server + .request::(Method::GET, "/subscriptions?objects=INVALID", Body::empty()) + .await; + assert_eq!(status, StatusCode::BAD_REQUEST); + + let (status, subscriptions) = server + .request::>( + Method::GET, + "/subscriptions?programID=PROGRAM-100", + Body::empty(), + ) + .await; + assert_eq!(status, StatusCode::OK); + // subscriptions without program id filter also match + assert_eq!(subscriptions.len(), 2); + + let (status, subscriptions) = server + .request::>( + Method::GET, + "/subscriptions?programID=PROGRAM-999999", + Body::empty(), + ) + .await; + assert_eq!(status, StatusCode::OK); + assert_eq!(subscriptions.len(), 2); + } + + #[sqlx::test(fixtures("vens", "users"))] + async fn get_many_multiple_object_types_not_allowed(db: PgPool) { + let server = ApiTest::new( + db, + "ven-1-client-id", + vec![Scope::WriteSubscriptions, Scope::ReadAll], + ) + .await; + + let (status, _) = server + .request::( + Method::GET, + "/subscriptions?objects=PROGRAM&objects=RESOURCE", + Body::empty(), + ) + .await; + assert_eq!(status, StatusCode::BAD_REQUEST); + } + + // FIXME add edit and delete tests +} diff --git a/openleadr-vtn/src/data_source/mod.rs b/openleadr-vtn/src/data_source/mod.rs index 0fd27109..a823a071 100644 --- a/openleadr-vtn/src/data_source/mod.rs +++ b/openleadr-vtn/src/data_source/mod.rs @@ -9,6 +9,7 @@ use openleadr_wire::{ program::{ProgramId, ProgramRequest}, report::{ReportId, ReportRequest}, resource::{BlResourceRequest, Resource, ResourceId}, + subscription::{Subscription, SubscriptionId, SubscriptionRequest}, target::Target, ven::{BlVenRequest, Ven, VenId}, ClientId, Event, Program, Report, @@ -122,6 +123,18 @@ pub trait ResourceCrud: { } +pub trait SubscriptionCrud: + Crud< + Type = Subscription, + Id = SubscriptionId, + NewType = SubscriptionRequest, + Error = AppError, + Filter = crate::api::subscription::QueryParams, + PermissionFilter = Option, +> +{ +} + #[derive(Serialize, Deserialize, PartialEq, Debug)] pub struct UserDetails { pub(crate) id: String, @@ -181,6 +194,7 @@ pub trait DataSource: Send + Sync + 'static { fn vens(&self) -> Arc; fn ven_object_privacy(&self) -> Arc; fn resources(&self) -> Arc; + fn subscriptions(&self) -> Arc; #[cfg(feature = "internal-oauth")] fn auth(&self) -> Arc; fn connection_active(&self) -> bool; diff --git a/openleadr-vtn/src/data_source/postgres/mod.rs b/openleadr-vtn/src/data_source/postgres/mod.rs index 0618aa68..51213fab 100644 --- a/openleadr-vtn/src/data_source/postgres/mod.rs +++ b/openleadr-vtn/src/data_source/postgres/mod.rs @@ -6,7 +6,7 @@ use crate::{ data_source::{ postgres::{ event::PgEventStorage, program::PgProgramStorage, report::PgReportStorage, - ven::PgVenStorage, + subscription::PgSubscriptionStorage, ven::PgVenStorage, }, DataSource, EventCrud, ProgramCrud, ReportCrud, ResourceCrud, VenCrud, }, @@ -25,6 +25,7 @@ mod event; mod program; mod report; mod resource; +mod subscription; #[cfg(feature = "internal-oauth")] mod user; mod ven; @@ -59,6 +60,10 @@ impl DataSource for PostgresStorage { Arc::::new(self.db.clone().into()) } + fn subscriptions(&self) -> Arc { + Arc::::new(self.db.clone().into()) + } + #[cfg(feature = "internal-oauth")] fn auth(&self) -> Arc { Arc::::new(self.db.clone().into()) diff --git a/openleadr-vtn/src/data_source/postgres/subscription.rs b/openleadr-vtn/src/data_source/postgres/subscription.rs new file mode 100644 index 00000000..fd45ef73 --- /dev/null +++ b/openleadr-vtn/src/data_source/postgres/subscription.rs @@ -0,0 +1,312 @@ +use crate::{ + api::subscription::QueryParams, + data_source::{Crud, SubscriptionCrud}, + error::AppError, +}; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use openleadr_wire::{ + subscription::{Subscription, SubscriptionId, SubscriptionRequest}, + ClientId, +}; +use sqlx::PgPool; +use tracing::{error, trace, warn}; + +impl SubscriptionCrud for PgSubscriptionStorage {} + +pub(crate) struct PgSubscriptionStorage { + db: PgPool, +} + +impl From for PgSubscriptionStorage { + fn from(db: PgPool) -> Self { + Self { db } + } +} + +#[derive(Debug)] +pub(crate) struct PostgresSubscription { + id: String, + created_date_time: DateTime, + modification_date_time: DateTime, + client_id: String, + client_name: String, + program_id: Option, + object_operations: serde_json::Value, + //targets: Vec, +} + +impl TryFrom for Subscription { + type Error = AppError; + + #[tracing::instrument(name = "TryFrom for Subscription")] + fn try_from(value: PostgresSubscription) -> Result { + let object_operations = serde_json::from_value(value.object_operations) + .inspect_err(|err| { + error!( + ?err, + "Failed to deserialize JSON from DB to `Vec`" + ) + }) + .map_err(AppError::SerdeJsonInternalServerError)?; + + Ok(Self { + id: value.id.parse()?, + created_date_time: value.created_date_time, + modification_date_time: value.modification_date_time, + client_id: value.client_id.parse()?, + content: SubscriptionRequest { + client_name: value.client_name, + program_id: value + .program_id + .map(|program_id| program_id.parse()) + .transpose()?, + object_operations, + }, + }) + } +} + +#[async_trait] +impl Crud for PgSubscriptionStorage { + type Type = Subscription; + type Id = SubscriptionId; + type NewType = SubscriptionRequest; + type Error = AppError; + type Filter = QueryParams; + type PermissionFilter = Option; + + async fn create( + &self, + new: Self::NewType, + client_id: &Self::PermissionFilter, + ) -> Result { + let subscription: Subscription = sqlx::query_as!( + PostgresSubscription, + r#" + INSERT INTO subscription ( + id, + created_date_time, + modification_date_time, + client_id, + client_name, + program_id, + object_operations + ) + VALUES (gen_random_uuid(), now(), now(), $1, $2::text, $3, $4) + RETURNING + id, + created_date_time, + modification_date_time, + client_id, + client_name, + program_id, + object_operations + "#, + client_id + .as_ref() + .expect("subscription create requires client id") + .as_str(), + new.client_name, + new.program_id.as_ref().map(|id| id.as_str()), + serde_json::to_value(new.object_operations).map_err(AppError::SerdeJsonBadRequest)?, + ) + .fetch_one(&self.db) + .await? + .try_into()?; + + Ok(subscription) + } + + async fn retrieve( + &self, + id: &Self::Id, + client_id: &Self::PermissionFilter, + ) -> Result { + let subscription = sqlx::query_as!( + PostgresSubscription, + r#" + SELECT + id, + created_date_time, + modification_date_time, + client_id, + client_name, + program_id, + object_operations + FROM subscription + WHERE id = $1 + AND ($2::text IS NULL OR client_id = $2) + "#, + id.as_str(), + client_id as _ + ) + .fetch_one(&self.db) + .await? + .try_into()?; + + Ok(subscription) + } + + async fn retrieve_all( + &self, + filter: &Self::Filter, + client_id: &Self::PermissionFilter, + ) -> Result, Self::Error> { + let res = sqlx::query_as!( + PostgresSubscription, + r#" + SELECT + id, + created_date_time, + modification_date_time, + client_id, + client_name, + program_id, + object_operations + FROM subscription + WHERE ($1::text IS NULL OR client_id = $1) + AND ($2::text IS NULL OR client_name = $2) + AND ($3::text IS NULL OR program_id = $3 OR program_id IS NULL) + AND ($4::text IS NULL OR jsonb_path_exists( + object_operations, + '$[*].objects[*] ? (@ == $obj)', + jsonb_build_object('obj', $4) + )) + ORDER BY created_date_time + OFFSET $5 LIMIT $6 + "#, + client_id as _, + filter.client_name, + filter + .program_id + .as_ref() + .map(|program_id| program_id.as_str()), + // We check that at most one object is present in the api module + filter.objects.as_ref().map(|objects| objects[0].as_str()), + filter.skip, + filter.limit, + ) + .fetch_all(&self.db) + .await? + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?; + + trace!("retrieved {} subscriptions", res.len()); + + Ok(res) + } + + async fn update( + &self, + id: &Self::Id, + new: Self::NewType, + client_id: &Self::PermissionFilter, + ) -> Result { + let subscription: Subscription = sqlx::query_as!( + PostgresSubscription, + r#" + UPDATE subscription + SET modification_date_time = now(), + client_name = $2, + program_id = $3, + object_operations = $4 + WHERE id = $1 + AND ($5::text IS NULL OR client_id = $5) + RETURNING + id, + created_date_time, + modification_date_time, + client_id, + client_name, + program_id, + object_operations + "#, + id.as_str(), + new.client_name, + new.program_id.as_ref().map(|id| id.as_str()), + serde_json::to_value(&new.object_operations).map_err(AppError::SerdeJsonBadRequest)?, + client_id as _ + ) + .fetch_one(&self.db) + .await? + .try_into()?; + + Ok(subscription) + } + + async fn delete( + &self, + id: &Self::Id, + client_id: &Self::PermissionFilter, + ) -> Result { + Ok(sqlx::query_as!( + PostgresSubscription, + r#" + DELETE FROM subscription + WHERE id = $1 + AND ($2::text IS NULL OR client_id = $2) + RETURNING + id, + created_date_time, + modification_date_time, + client_id, + client_name, + program_id, + object_operations + "#, + id.as_str(), + client_id as _ + ) + .fetch_one(&self.db) + .await? + .try_into()?) + } +} + +#[cfg(test)] +#[cfg(feature = "live-db-test")] +mod test { + use crate::{ + api::subscription::QueryParams, + data_source::{postgres::subscription::PgSubscriptionStorage, Crud}, + }; + use sqlx::PgPool; + + impl Default for QueryParams { + fn default() -> Self { + Self { + program_id: None, + client_name: None, + objects: None, + skip: 0, + limit: 50, + } + } + } + + #[sqlx::test(fixtures("users", "vens", "resources", "subscriptions"))] // FIXME remove unnecessary fixtures + async fn retrieve_all(db: PgPool) { + let repo = PgSubscriptionStorage::from(db.clone()); + + let subscription = repo + .retrieve_all( + &QueryParams::default(), + &Some("ven-1-client-id".parse().unwrap()), + ) + .await + .unwrap(); + assert_eq!(subscription.len(), 2); + + // Ensure a client cannot see subscriptions of another client + let subscription = repo + .retrieve_all( + &QueryParams::default(), + &Some("ven-2-client-id".parse().unwrap()), + ) + .await + .unwrap(); + assert_eq!(subscription.len(), 0); + } +} diff --git a/openleadr-vtn/src/state.rs b/openleadr-vtn/src/state.rs index 3cef106a..f8c3b786 100644 --- a/openleadr-vtn/src/state.rs +++ b/openleadr-vtn/src/state.rs @@ -1,12 +1,13 @@ #[cfg(feature = "internal-oauth")] use crate::api::auth; +use crate::data_source::SubscriptionCrud; #[cfg(feature = "internal-oauth")] use crate::{api::user, data_source::AuthSource}; #[cfg(feature = "internal-oauth")] use axum::routing::{delete, post}; use crate::{ - api::{event, healthcheck, program, report, resource, ven}, + api::{event, healthcheck, program, report, resource, subscription, ven}, data_source::{ DataSource, EventCrud, ProgramCrud, ReportCrud, ResourceCrud, VenCrud, VenObjectPrivacy, }, @@ -45,6 +46,7 @@ use tracing::{info, warn}; pub struct AppState { pub storage: Arc, pub jwt_manager: Arc, + pub(crate) notifier: Arc, } #[derive(Debug, Default, Copy, Clone)] @@ -274,9 +276,14 @@ impl AppState { OAuthType::External => external_oauth_from_env(key_type).await, }; + let notifier = subscription::NotifierState::load_from_storage(&*storage.subscriptions()) + .await + .expect("failed to retrieve subscriptions from database"); + Self { storage: Arc::new(storage), jwt_manager: Arc::new(jwt_manager), + notifier: Arc::new(notifier), } } @@ -311,6 +318,16 @@ impl AppState { .put(resource::edit) .delete(resource::delete), ) + .route( + "/subscriptions", + get(subscription::get_all).post(subscription::add), + ) + .route( + "/subscriptions/{id}", + get(subscription::get) + .put(subscription::edit) + .delete(subscription::delete), + ) .route("/auth/server", get(auth_server_handler)); #[cfg(feature = "internal-oauth")] { @@ -403,8 +420,21 @@ impl FromRef for Arc { } } +impl FromRef for Arc { + fn from_ref(state: &AppState) -> Self { + state.storage.subscriptions() + } +} + #[cfg(test)] mod test { + use openleadr_wire::{ + subscription::{Subscription, SubscriptionId, SubscriptionRequest}, + ClientId, + }; + + use crate::data_source::Crud; + use super::*; struct MockDataSource {} @@ -433,6 +463,10 @@ mod test { unimplemented!() } + fn subscriptions(&self) -> Arc { + Arc::new(MockSubscriptionSource) + } + #[cfg(feature = "internal-oauth")] fn auth(&self) -> Arc { unimplemented!() @@ -443,6 +477,61 @@ mod test { } } + struct MockSubscriptionSource; + + #[async_trait::async_trait] + impl Crud for MockSubscriptionSource { + type Type = Subscription; + type Id = SubscriptionId; + type NewType = SubscriptionRequest; + type Error = AppError; + type Filter = subscription::QueryParams; + type PermissionFilter = Option; + + async fn create( + &self, + _new: Self::NewType, + _client_id: &Self::PermissionFilter, + ) -> Result { + unimplemented!() + } + + async fn retrieve( + &self, + _id: &Self::Id, + _client_id: &Self::PermissionFilter, + ) -> Result { + unimplemented!() + } + + async fn retrieve_all( + &self, + _filter: &Self::Filter, + _client_id: &Self::PermissionFilter, + ) -> Result, Self::Error> { + Ok(vec![]) + } + + async fn update( + &self, + _id: &Self::Id, + _new: Self::NewType, + _client_id: &Self::PermissionFilter, + ) -> Result { + unimplemented!() + } + + async fn delete( + &self, + _id: &Self::Id, + _client_id: &Self::PermissionFilter, + ) -> Result { + unimplemented!() + } + } + + impl SubscriptionCrud for MockSubscriptionSource {} + mod state_from_env_var { use super::*; use serial_test::serial; diff --git a/openleadr-wire/src/lib.rs b/openleadr-wire/src/lib.rs index 0cf81fd9..0d5b0ff1 100644 --- a/openleadr-wire/src/lib.rs +++ b/openleadr-wire/src/lib.rs @@ -142,7 +142,7 @@ impl Display for Identifier { } } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "UPPERCASE")] pub enum ObjectType { Program, @@ -153,6 +153,19 @@ pub enum ObjectType { Resource, } +impl ObjectType { + pub fn as_str(self) -> &'static str { + match self { + ObjectType::Program => "PROGRAM", + ObjectType::Event => "EVENT", + ObjectType::Report => "REPORT", + ObjectType::Subscription => "SUBSCRIPTION", + ObjectType::Ven => "VEN", + ObjectType::Resource => "RESOURCE", + } + } +} + /// An ISO 8601 formatted duration #[derive(Clone, Debug, PartialEq)] pub struct Duration(iso8601_duration::Duration); diff --git a/openleadr-wire/src/subscription.rs b/openleadr-wire/src/subscription.rs index 7f623280..394923ff 100644 --- a/openleadr-wire/src/subscription.rs +++ b/openleadr-wire/src/subscription.rs @@ -9,8 +9,8 @@ use serde_with::{serde_as, skip_serializing_none}; use validator::Validate; use crate::{ - program::ProgramId, resource::Resource, Event, Identifier, IdentifierError, ObjectType, - Program, Report, Ven, + program::ProgramId, resource::Resource, ClientId, Event, Identifier, IdentifierError, + ObjectType, Program, Report, Ven, }; /// Server provided representation of subscription @@ -25,6 +25,7 @@ pub struct Subscription { /// datetime in ISO 8601 format #[serde(with = "crate::serde_rfc3339")] pub modification_date_time: DateTime, + pub client_id: ClientId, #[serde(flatten)] #[validate(nested)] pub content: SubscriptionRequest, @@ -39,6 +40,7 @@ pub struct Subscription { #[serde(rename_all = "camelCase")] pub struct SubscriptionRequest { /// User generated identifier, may be VEN identifier provisioned out-of-band. + #[serde(deserialize_with = "crate::string_within_range_inclusive::<1, 128, _>")] pub client_name: String, /// ID attribute of the program object this subscription is associated with. @@ -46,6 +48,7 @@ pub struct SubscriptionRequest { pub program_id: Option, /// list of objects and operations to subscribe to. + #[validate(length(min = 1, max = 15))] pub object_operations: Vec, // /// A list of target objects. Used by server to filter notifications. // #[serde(default)] @@ -76,7 +79,7 @@ pub struct SubscriptionObjectOperation { pub bearer_token: Option, } -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "UPPERCASE")] pub enum Operation { Create, @@ -167,6 +170,17 @@ impl AnyObject { AnyObject::Resource(resource) => resource.id.0.clone(), } } + + pub fn kind(&self) -> ObjectType { + match self { + AnyObject::Program(_) => ObjectType::Program, + AnyObject::Report(_) => ObjectType::Report, + AnyObject::Event(_) => ObjectType::Event, + AnyObject::Subscription(_) => ObjectType::Subscription, + AnyObject::Ven(_) => ObjectType::Ven, + AnyObject::Resource(_) => ObjectType::Resource, + } + } } /// Provides details of each notifier binding supported