Skip to content

graph, store: Avoid using to_jsonb when looking up a single entity #5372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ axum = "0.7.5"
chrono = "0.4.38"
clap = { version = "4.5.4", features = ["derive", "env"] }
derivative = "2.2.0"
diesel = { version = "2.2.4", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid"] }
diesel = { version = "2.2.4", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid", "i-implement-a-third-party-backend-and-opt-into-breaking-changes"] }
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
diesel-dynamic-schema = "0.2.1"
diesel-dynamic-schema = { version = "0.2.1", features = ["postgres"] }
diesel_derives = "2.1.4"
diesel_migrations = "2.1.0"
graph = { path = "./graph" }
Expand Down
24 changes: 22 additions & 2 deletions graph/src/data/store/scalar/bigdecimal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use diesel::deserialize::FromSqlRow;
use diesel::expression::AsExpression;
use num_bigint;
use num_bigint::{self, ToBigInt};
use num_traits::FromPrimitive;
use serde::{self, Deserialize, Serialize};
use stable_hash::{FieldAddress, StableHash};
Expand All @@ -10,8 +10,8 @@ use std::fmt::{self, Display, Formatter};
use std::ops::{Add, Div, Mul, Sub};
use std::str::FromStr;

use crate::anyhow::anyhow;
use crate::runtime::gas::{Gas, GasSizeOf};

use old_bigdecimal::BigDecimal as OldBigDecimal;
pub use old_bigdecimal::ToPrimitive;

Expand Down Expand Up @@ -60,6 +60,26 @@ impl BigDecimal {
self.0.as_bigint_and_exponent()
}

/// Return `true` if this decimal has no fractional part. Delegates to
/// the wrapped `OldBigDecimal`.
pub fn is_integer(&self) -> bool {
self.0.is_integer()
}

/// Convert this `BigDecimal` to a `BigInt` if it is an integer, and
/// return an error if it is not. Also return an error if the integer
/// would use too many digits as defined by `BigInt::new`
pub fn to_bigint(&self) -> Result<BigInt, anyhow::Error> {
// Reject values with a fractional part up front so the conversion
// below never silently truncates
if !self.is_integer() {
return Err(anyhow!(
"Cannot convert non-integer `BigDecimal` to `BigInt`: {:?}",
self
));
}
// `OldBigDecimal::to_bigint` returns `Option`, but for an integral
// value it should never be `None`; the error here guards against
// that impossible case rather than panicking
let bi = self.0.to_bigint().ok_or_else(|| {
anyhow!("The implementation of `to_bigint` for `OldBigDecimal` always returns `Some`")
})?;
// `BigInt::new` enforces the digit limit and produces the final error
// if the value is too large
BigInt::new(bi)
}

pub fn digits(&self) -> u64 {
self.0.digits()
}
Expand Down
8 changes: 8 additions & 0 deletions graph/src/data/store/scalar/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use diesel::deserialize::FromSql;
use diesel::pg::PgValue;
use diesel::serialize::ToSql;
use hex;
use serde::{self, Deserialize, Serialize};
Expand Down Expand Up @@ -115,3 +117,9 @@ impl ToSql<diesel::sql_types::Binary, diesel::pg::Pg> for Bytes {
<_ as ToSql<diesel::sql_types::Binary, _>>::to_sql(self.as_slice(), &mut out.reborrow())
}
}

/// Decode a Postgres `bytea` value into `Bytes`, going through the
/// stock `Vec<u8>` deserialization.
impl FromSql<diesel::sql_types::Binary, diesel::pg::Pg> for Bytes {
    fn from_sql(value: PgValue) -> diesel::deserialize::Result<Self> {
        let raw = <Vec<u8> as FromSql<diesel::sql_types::Binary, _>>::from_sql(value)?;
        Ok(Bytes::from(raw))
    }
}
9 changes: 9 additions & 0 deletions graph/src/data/store/scalar/timestamp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use chrono::{DateTime, Utc};
use diesel::deserialize::FromSql;
use diesel::pg::PgValue;
use diesel::serialize::ToSql;
use serde::{self, Deserialize, Serialize};
use stable_hash::StableHash;
Expand Down Expand Up @@ -107,3 +109,10 @@ impl GasSizeOf for Timestamp {
Some(Gas::new(std::mem::size_of::<Timestamp>().saturating_into()))
}
}

impl FromSql<diesel::sql_types::Timestamptz, diesel::pg::Pg> for Timestamp {
fn from_sql(value: PgValue) -> diesel::deserialize::Result<Self> {
<DateTime<Utc> as FromSql<diesel::sql_types::Timestamptz, _>>::from_sql(value)
.map(Timestamp)
}
}
18 changes: 18 additions & 0 deletions graph/src/data/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,24 @@ impl PartialEq<&str> for Word {
}
}

impl PartialEq<str> for Word {
fn eq(&self, other: &str) -> bool {
self.as_str() == other
}
}

impl PartialEq<String> for Word {
fn eq(&self, other: &String) -> bool {
self.as_str() == other
}
}

/// Mirror impl so `String == Word` works in addition to `Word == String`.
impl PartialEq<Word> for String {
    fn eq(&self, other: &Word) -> bool {
        other.as_str() == self.as_str()
    }
}

impl PartialEq<Word> for &str {
fn eq(&self, other: &Word) -> bool {
self == &other.as_str()
Expand Down
6 changes: 5 additions & 1 deletion graph/src/data_source/causality_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use diesel::{
serialize::{Output, ToSql},
sql_types::Integer,
};
use diesel_derives::AsExpression;
use std::fmt;

use crate::components::subgraph::Entity;
Expand All @@ -20,7 +21,10 @@ use crate::derive::CacheWeight;
/// This necessary for determinism because offchain data sources don't have a deterministic order of
/// execution, for example an IPFS file may become available at any point in time. The isolation
/// rules make the indexing result reproducible, given a set of available files.
#[derive(Debug, CacheWeight, Copy, Clone, PartialEq, Eq, FromSqlRow, Hash, PartialOrd, Ord)]
// `AsExpression` together with `#[diesel(sql_type = Integer)]` lets a
// `CausalityRegion` be used directly as a value in Diesel query
// expressions that expect a SQL `integer`
#[derive(
Debug, CacheWeight, Copy, Clone, PartialEq, Eq, FromSqlRow, Hash, PartialOrd, Ord, AsExpression,
)]
#[diesel(sql_type = Integer)]
pub struct CausalityRegion(i32);

impl fmt::Display for CausalityRegion {
Expand Down
22 changes: 0 additions & 22 deletions store/postgres/src/block_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,6 @@ impl<'a> BlockRangeColumn<'a> {
}
}
}

pub fn block(&self) -> BlockNumber {
match self {
BlockRangeColumn::Mutable { block, .. } => *block,
BlockRangeColumn::Immutable { block, .. } => *block,
}
}
}

impl<'a> BlockRangeColumn<'a> {
Expand Down Expand Up @@ -227,13 +220,6 @@ impl<'a> BlockRangeColumn<'a> {
}
}

pub fn column_name(&self) -> &str {
match self {
BlockRangeColumn::Mutable { .. } => BLOCK_RANGE_COLUMN,
BlockRangeColumn::Immutable { .. } => BLOCK_COLUMN,
}
}

/// Output the qualified name of the block range column
pub fn name(&self, out: &mut AstPass<Pg>) {
match self {
Expand Down Expand Up @@ -280,14 +266,6 @@ impl<'a> BlockRangeColumn<'a> {
}
}

/// Output the name of the block range column without the table prefix
pub(crate) fn bare_name(&self, out: &mut AstPass<Pg>) {
match self {
BlockRangeColumn::Mutable { .. } => out.push_sql(BLOCK_RANGE_COLUMN),
BlockRangeColumn::Immutable { .. } => out.push_sql(BLOCK_COLUMN),
}
}

/// Output an expression that matches all rows that have been changed
/// after `block` (inclusive)
pub(crate) fn changed_since<'b>(&'b self, out: &mut AstPass<'_, 'b, Pg>) -> QueryResult<()> {
Expand Down
6 changes: 6 additions & 0 deletions store/postgres/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,12 @@ impl Borrow<str> for Namespace {
}
}

/// Borrow the namespace's inner name as a `&str` from a `&Namespace`,
/// mirroring the `Borrow<str> for Namespace` impl above so that keyed
/// lookups work with references too.
impl Borrow<str> for &Namespace {
fn borrow(&self) -> &str {
// NOTE(review): relies on deref coercion from the inner field to
// `&str`; presumably `Namespace` wraps a `String` — confirm at the
// struct definition
&self.0
}
}

/// A marker that an `i32` references a deployment. Values of this type hold
/// the primary key from the `deployment_schemas` table
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, AsExpression, FromSqlRow)]
Expand Down
67 changes: 59 additions & 8 deletions store/postgres/src/relational.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,20 @@ mod ddl_tests;
#[cfg(test)]
mod query_tests;

pub(crate) mod dsl;
pub(crate) mod index;
mod prune;
mod rollup;
pub(crate) mod value;

use diesel::deserialize::FromSql;
use diesel::pg::Pg;
use diesel::serialize::{Output, ToSql};
use diesel::sql_types::Text;
use diesel::{connection::SimpleConnection, Connection};
use diesel::{debug_query, sql_query, OptionalExtension, PgConnection, QueryResult, RunQueryDsl};
use diesel::{
debug_query, sql_query, OptionalExtension, PgConnection, QueryDsl, QueryResult, RunQueryDsl,
};
use graph::blockchain::BlockTime;
use graph::cheap_clone::CheapClone;
use graph::components::store::write::{RowGroup, WriteChunk};
Expand All @@ -50,6 +54,7 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use crate::relational::value::{FromOidRow, OidRow};
use crate::relational_queries::{
ConflictingEntitiesData, ConflictingEntitiesQuery, FindChangesQuery, FindDerivedQuery,
FindPossibleDeletionsQuery, ReturnedEntityData,
Expand All @@ -58,10 +63,10 @@ use crate::{
primary::{Namespace, Site},
relational_queries::{
ClampRangeQuery, EntityData, EntityDeletion, FilterCollection, FilterQuery, FindManyQuery,
FindQuery, InsertQuery, RevertClampQuery, RevertRemoveQuery,
InsertQuery, RevertClampQuery, RevertRemoveQuery,
},
};
use graph::components::store::DerivedEntityQuery;
use graph::components::store::{AttributeNames, DerivedEntityQuery};
use graph::data::store::{Id, IdList, IdType, BYTES_SCALAR};
use graph::data::subgraph::schema::POI_TABLE;
use graph::prelude::{
Expand Down Expand Up @@ -172,6 +177,12 @@ impl From<String> for SqlName {
}
}

impl From<SqlName> for Word {
fn from(name: SqlName) -> Self {
Word::from(name.0)
}
}

impl fmt::Display for SqlName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
Expand All @@ -184,6 +195,12 @@ impl Borrow<str> for &SqlName {
}
}

/// Compare a `SqlName` against a bare string slice by content.
impl PartialEq<str> for SqlName {
    fn eq(&self, other: &str) -> bool {
        self.0.as_str() == other
    }
}

impl FromSql<Text, Pg> for SqlName {
fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result<Self> {
<String as FromSql<Text, Pg>>::from_sql(bytes).map(|s| SqlName::verbatim(s))
Expand Down Expand Up @@ -361,9 +378,11 @@ impl Layout {
}

let table_name = SqlName::verbatim(POI_TABLE.to_owned());
let nsp = catalog.site.namespace.clone();
Table {
object: poi_type.to_owned(),
qualified_name: SqlName::qualified_name(&catalog.site.namespace, &table_name),
nsp,
name: table_name,
columns,
// The position of this table in all the tables for this layout; this
Expand Down Expand Up @@ -469,11 +488,19 @@ impl Layout {
key: &EntityKey,
block: BlockNumber,
) -> Result<Option<Entity>, StoreError> {
let table = self.table_for_entity(&key.entity_type)?;
FindQuery::new(table.as_ref(), key, block)
.get_result::<EntityData>(conn)
let table = self.table_for_entity(&key.entity_type)?.dsl_table();
let columns = table.selected_columns::<Entity>(&AttributeNames::All, None)?;

let query = table
.select_cols(&columns)
.filter(table.id_eq(&key.entity_id))
.filter(table.at_block(block))
.filter(table.belongs_to_causality_region(key.causality_region));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beautiful!


query
.get_result::<OidRow>(conn)
.optional()?
.map(|entity_data| entity_data.deserialize_with_layout(self, None))
.map(|row| Entity::from_oid_row(row, &self.input_schema, &columns))
.transpose()
}

Expand Down Expand Up @@ -1348,6 +1375,21 @@ impl Column {
})
}

/// Build a synthetic `Column` that does not correspond to a declared
/// schema attribute: the SQL name is taken verbatim from `name`, the
/// GraphQL field type is derived from `column_type`, and all
/// schema-driven flags (fulltext, reference, prefix comparison) are off.
pub fn pseudo_column(name: &str, column_type: ColumnType) -> Column {
    let sql_name = SqlName::verbatim(name.to_string());
    Column {
        // the field name mirrors the SQL name exactly, no snake-casing
        field: Word::from(sql_name.as_str()),
        field_type: q::Type::NamedType(column_type.to_string()),
        name: sql_name,
        column_type,
        fulltext_fields: None,
        is_reference: false,
        use_prefix_comparison: false,
    }
}

fn new_fulltext(def: &FulltextDefinition) -> Result<Column, StoreError> {
SqlName::check_valid_identifier(&def.name, "attribute")?;
let sql_name = SqlName::from(def.name.as_str());
Expand Down Expand Up @@ -1440,6 +1482,9 @@ pub struct Table {
/// `Stats_hour`, not the overall aggregation type `Stats`.
pub object: EntityType,

/// The namespace in which the table lives
nsp: Namespace,

/// The name of the database table for this type ('thing'), snakecased
/// version of `object`
pub name: SqlName,
Expand Down Expand Up @@ -1494,10 +1539,11 @@ impl Table {
.collect::<Result<Vec<Column>, StoreError>>()?;
let qualified_name = SqlName::qualified_name(&catalog.site.namespace, &table_name);
let immutable = defn.is_immutable();

let nsp = catalog.site.namespace.clone();
let table = Table {
object: defn.cheap_clone(),
name: table_name,
nsp,
qualified_name,
// Default `is_account_like` to `false`; the caller should call
// `refresh` after constructing the layout, but that requires a
Expand All @@ -1516,6 +1562,7 @@ impl Table {
pub fn new_like(&self, namespace: &Namespace, name: &SqlName) -> Arc<Table> {
let other = Table {
object: self.object.clone(),
nsp: self.nsp.clone(),
name: name.clone(),
qualified_name: SqlName::qualified_name(namespace, name),
columns: self.columns.clone(),
Expand Down Expand Up @@ -1590,6 +1637,10 @@ impl Table {
&crate::block_range::BLOCK_RANGE_COLUMN_SQL
}
}

/// Wrap this table in the type-safe query-building DSL from the `dsl`
/// module; the returned value borrows `self`
pub fn dsl_table(&self) -> dsl::Table<'_> {
dsl::Table::new(self)
}
}

#[derive(Clone)]
Expand Down
Loading
Loading