Skip to content

Commit c662ecd

Browse files
Added pg_binary_protocol attribute to derive send and receive functions for PostgresType (#2068)
This PR generally addresses issues such as #2060, #1446, #1364, and pksunkara/pgx_ulid#27, and supersedes PR #887. - [x] Introduces the generation of the `SEND` and `RECEIVE` bindings. - [x] Introduces the generation of the `*_recv` and `*_send` functions in rust, based on the existing `serde_cbor`-based methods [`cbor_encode`](https://github.com/pgcentralfoundation/pgrx/blob/231acce2448ce7df2b9f01cb0962077122a0cf83/pgrx/src/datum/varlena.rs#L380-L397) and [`cbor_decode`](https://github.com/pgcentralfoundation/pgrx/blob/231acce2448ce7df2b9f01cb0962077122a0cf83/pgrx/src/datum/varlena.rs#L399-L409) - [x] Introduces the `#[pg_binary_protocol]` decorator to be used alongside the `PostgresType` derive This PR is not API breaking, as even if people have created functions with naming clashes they would still need to add the attribute on the interested structs. Many thanks to @YohDeadfall and @zommiommy for their help in understanding better pgrx internals and postgres's binary protocol. ## The generated Rust & SQL Given a custom rust type such as: ```rust #[derive(serde::Serialize, serde::Deserialize, pgrx::PostgresType)] #[pg_binary_protocol] pub struct PositiveU32 { pub field: i32, } ``` The resulting generated rust bindings look like: ```rust #[doc(hidden)] #[::pgrx::pgrx_macros::pg_extern(immutable, strict, parallel_safe)] pub fn positiveu32_recv( internal: ::pgrx::datum::Internal, ) -> PositiveU32 { let buf = unsafe { internal.get_mut::<::pgrx::pg_sys::StringInfoData>().unwrap() }; let mut serialized = ::pgrx::StringInfo::new(); serialized.push_bytes(&[0u8; ::pgrx::pg_sys::VARHDRSZ]); serialized.push_bytes(unsafe { core::slice::from_raw_parts( buf.data as *const u8, buf.len as usize ) }); let size = serialized.len(); let varlena = serialized.into_char_ptr(); unsafe{ ::pgrx::set_varsize_4b(varlena as *mut ::pgrx::pg_sys::varlena, size as i32); buf.cursor = buf.len; ::pgrx::datum::cbor_decode(varlena as *mut ::pgrx::pg_sys::varlena) } } #[doc(hidden)] #[::pgrx::pgrx_macros::pg_extern(immutable, strict, parallel_safe)] pub fn positiveu32_send(input: PositiveU32) -> Vec<u8> { use ::pgrx::datum::{FromDatum, IntoDatum}; let Some(datum): Option<::pgrx::pg_sys::Datum> = input.into_datum() else { ::pgrx::error!("Datum of type `{}` is unexpectedly NULL.", stringify!(#name)); }; unsafe { let Some(serialized): Option<Vec<u8>> = FromDatum::from_datum(datum, false) else { ::pgrx::error!("Failed to CBOR-serialize Datum to type `{}`.", stringify!(#name)); }; serialized } } ``` And the associated generated SQL looks like: ```sql /* <begin connected objects> */ /* This file is auto generated by pgrx. The ordering of items is not stable, it is driven by a dependency graph. */ /* </end connected objects> */ /* <begin connected objects> */ -- utils/diesel-pgrx/example_extension/src/lib.rs:16 -- example_extension::PositiveU32 CREATE TYPE PositiveU32; -- utils/diesel-pgrx/example_extension/src/lib.rs:16 -- example_extension::positiveu32_in CREATE FUNCTION "positiveu32_in"( "input" cstring /* core::option::Option<&core::ffi::c_str::CStr> */ ) RETURNS PositiveU32 /* core::option::Option<example_extension::PositiveU32> */ IMMUTABLE PARALLEL SAFE LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'positiveu32_in_wrapper'; -- utils/diesel-pgrx/example_extension/src/lib.rs:16 -- example_extension::positiveu32_out CREATE FUNCTION "positiveu32_out"( "input" PositiveU32 /* example_extension::PositiveU32 */ ) RETURNS cstring /* alloc::ffi::c_str::CString */ IMMUTABLE STRICT PARALLEL SAFE LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'positiveu32_out_wrapper'; -- utils/diesel-pgrx/example_extension/src/lib.rs:16 -- example_extension::positiveu32_recv CREATE FUNCTION "positiveu32_recv"( "internal" internal /* pgrx::datum::internal::Internal */ ) RETURNS PositiveU32 /* example_extension::PositiveU32 */ IMMUTABLE STRICT PARALLEL SAFE LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'positiveu32_recv_wrapper'; -- utils/diesel-pgrx/example_extension/src/lib.rs:16 -- example_extension::positiveu32_send CREATE FUNCTION "positiveu32_send"( "input" PositiveU32 /* example_extension::PositiveU32 */ ) RETURNS bytea /* alloc::vec::Vec<u8> */ IMMUTABLE STRICT PARALLEL SAFE LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'positiveu32_send_wrapper'; -- utils/diesel-pgrx/example_extension/src/lib.rs:16 -- example_extension::PositiveU32 CREATE TYPE PositiveU32 ( INTERNALLENGTH = variable, INPUT = positiveu32_in, /* example_extension::positiveu32_in */ OUTPUT = positiveu32_out, /* example_extension::positiveu32_out */ RECEIVE = positiveu32_recv, /* example_extension::positiveu32_recv */ SEND = positiveu32_send, /* example_extension::positiveu32_send */ STORAGE = extended ); /* </end connected objects> */ ``` ## On Diesel traits I use [`diesel`](https://docs.rs/diesel/latest/diesel/) extensively in my projects, and it uses the binary protocol to send data to PostgreSQL. This PR makes it possible to use Postgres's binary protocol with `pgrx` type, so I can now implement diesel's [`ToSql`](https://docs.rs/diesel/latest/diesel/serialize/trait.ToSql.html) and [`FromSql`](https://docs.rs/diesel/latest/diesel/deserialize/trait.FromSql.html) traits as follows: ```rust #[derive( Debug, diesel::FromSqlRow, diesel::AsExpression, serde::Serialize, serde::Deserialize, pgrx::PostgresType )] #[pg_binary_protocol] #[diesel(sql_type = MyCustomTypeDiesel)] pub struct MyCustomType { pub field1: String, pub field2: i32, } #[derive( Debug, Clone, Copy, Default, diesel::query_builder::QueryId, diesel::sql_types::SqlType, )] #[diesel(postgres_type(name = "my_custom_type"))] pub struct MyCustomTypeDiesel; impl diesel::serialize::ToSql<MyCustomTypeDiesel, diesel::pg::Pg> for MyCustomType { fn to_sql<'b>(&'b self, out: &mut diesel::serialize::Output<'b, '_, diesel::pg::Pg>) -> diesel::serialize::Result { use std::io::Write; serde_cbor::to_writer(out, self)?; Ok(diesel::serialize::IsNull::No) } } impl diesel::deserialize::FromSql<MyCustomTypeDiesel, diesel::pg::Pg> for MyCustomType { fn from_sql(raw: diesel::pg::PgValue<'_>) -> diesel::deserialize::Result<Self> { Ok(serde_cbor::from_slice(raw.as_bytes())?) } } ``` *P.S. Sorry for the very small commits, but I could only figure out how to compile pgrx inside a Docker and therefore to test it in there I had to push every single small test edit.* *P.P.S I have also tried to add support for fixed-size types, but there is too much varlena-specific methods at this time and the amount of code it would be needed for that makes it necessarily a separate PR*
1 parent 6ce5ec2 commit c662ecd

File tree

15 files changed

+306
-10
lines changed

15 files changed

+306
-10
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
- **Easy Custom Types**
5454
+ `#[derive(PostgresType)]` to use a Rust struct as a Postgres type
5555
- By default, represented as a CBOR-encoded object in-memory/on-disk, and JSON as human-readable
56+
- Supports `#[pg_binary_protocol]` to generate binary protocol send/recv functions
5657
- Provide custom in-memory/on-disk/human-readable representations
5758
+ `#[derive(PostgresEnum)]` to use a Rust enum as a Postgres enum
5859
+ Composite types supported with the `pgrx::composite_type!("Sample")` macro

articles/forging-sql-from-rust.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ These `sql` files must be generated from data within the Rust code. The SQL gene
9797
- Create 'Shell types', in & out functions, and 'Base types' for each `#[derive(PostgresType)]` marked type.
9898
```rust
9999
#[derive(PostgresType, Serialize, Deserialize, Debug, Eq, PartialEq)]
100+
#[pg_binary_protocol]
100101
pub struct Animals {
101102
names: Vec<String>,
102103
age_lookup: HashMap<i32, String>,
@@ -140,6 +141,7 @@ These `sql` files must be generated from data within the Rust code. The SQL gene
140141
Eq, PartialEq, Ord, Hash, PartialOrd,
141142
PostgresType, Serialize, Deserialize
142143
)]
144+
#[pg_binary_protocol]
143145
pub struct Thing(String);
144146
```
145147
```sql

pgrx-macros/src/lib.rs

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -698,7 +698,7 @@ fn impl_postgres_enum(ast: DeriveInput) -> syn::Result<proc_macro2::TokenStream>
698698
stream.extend(quote! {
699699
impl ::pgrx::datum::FromDatum for #enum_ident {
700700
#[inline]
701-
unsafe fn from_polymorphic_datum(datum: ::pgrx::pg_sys::Datum, is_null: bool, typeoid: ::pgrx::pg_sys::Oid) -> Option<#enum_ident> {
701+
unsafe fn from_polymorphic_datum(datum: ::pgrx::pg_sys::Datum, is_null: bool, _typeoid: ::pgrx::pg_sys::Oid) -> Option<#enum_ident> {
702702
if is_null {
703703
None
704704
} else {
@@ -781,6 +781,7 @@ Optionally accepts the following attributes:
781781
782782
* `inoutfuncs(some_in_fn, some_out_fn)`: Define custom in/out functions for the type.
783783
* `pgvarlena_inoutfuncs(some_in_fn, some_out_fn)`: Define custom in/out functions for the `PgVarlena` of this type.
784+
* `pg_binary_protocol`: Use the binary protocol for this type.
784785
* `pgrx(alignment = "<align>")`: Derive Postgres alignment from Rust type. One of `"on"`, or `"off"`.
785786
* `sql`: Same arguments as [`#[pgrx(sql = ..)]`](macro@pgrx).
786787
*/
@@ -789,6 +790,7 @@ Optionally accepts the following attributes:
789790
attributes(
790791
inoutfuncs,
791792
pgvarlena_inoutfuncs,
793+
pg_binary_protocol,
792794
bikeshed_postgres_type_manually_impl_from_into_datum,
793795
requires,
794796
pgrx
@@ -806,6 +808,9 @@ fn impl_postgres_type(ast: DeriveInput) -> syn::Result<proc_macro2::TokenStream>
806808
let has_lifetimes = generics.lifetimes().next();
807809
let funcname_in = Ident::new(&format!("{name}_in").to_lowercase(), name.span());
808810
let funcname_out = Ident::new(&format!("{name}_out").to_lowercase(), name.span());
811+
let funcname_recv = Ident::new(&format!("{name}_recv").to_lowercase(), name.span());
812+
let funcname_send = Ident::new(&format!("{name}_send").to_lowercase(), name.span());
813+
809814
let mut args = parse_postgres_type_args(&ast.attrs);
810815
let mut stream = proc_macro2::TokenStream::new();
811816

@@ -825,7 +830,9 @@ fn impl_postgres_type(ast: DeriveInput) -> syn::Result<proc_macro2::TokenStream>
825830
}
826831
}
827832

828-
if args.is_empty() {
833+
if !args.contains(&PostgresTypeAttribute::InOutFuncs)
834+
&& !args.contains(&PostgresTypeAttribute::PgVarlenaInOutFuncs)
835+
{
829836
// assume the user wants us to implement the InOutFuncs
830837
args.insert(PostgresTypeAttribute::Default);
831838
}
@@ -937,14 +944,14 @@ fn impl_postgres_type(ast: DeriveInput) -> syn::Result<proc_macro2::TokenStream>
937944
if args.contains(&PostgresTypeAttribute::Default) {
938945
stream.extend(quote! {
939946
#[doc(hidden)]
940-
#[::pgrx::pgrx_macros::pg_extern(immutable,parallel_safe)]
947+
#[::pgrx::pgrx_macros::pg_extern(immutable, parallel_safe)]
941948
pub fn #funcname_in #generics(input: Option<&#lifetime ::core::ffi::CStr>) -> Option<#name #generics> {
942949
use ::pgrx::inoutfuncs::json_from_slice;
943950
input.map(|cstr| json_from_slice(cstr.to_bytes()).ok()).flatten()
944951
}
945952

946953
#[doc(hidden)]
947-
#[::pgrx::pgrx_macros::pg_extern (immutable,parallel_safe)]
954+
#[::pgrx::pgrx_macros::pg_extern (immutable, parallel_safe)]
948955
pub fn #funcname_out #generics(input: #name #generics) -> ::pgrx::ffi::CString {
949956
use ::pgrx::inoutfuncs::json_to_vec;
950957
let mut bytes = json_to_vec(&input).unwrap();
@@ -1000,7 +1007,57 @@ fn impl_postgres_type(ast: DeriveInput) -> syn::Result<proc_macro2::TokenStream>
10001007
});
10011008
}
10021009

1003-
let sql_graph_entity_item = sql_gen::PostgresTypeDerive::from_derive_input(ast)?;
1010+
if args.contains(&PostgresTypeAttribute::PgBinaryProtocol) {
1011+
// At this time, the `PostgresTypeAttribute` does not impact the way we generate
1012+
// the `recv` and `send` functions.
1013+
stream.extend(quote! {
1014+
#[doc(hidden)]
1015+
#[::pgrx::pgrx_macros::pg_extern(immutable, strict, parallel_safe)]
1016+
pub fn #funcname_recv #generics(
1017+
internal: ::pgrx::datum::Internal,
1018+
) -> #name #generics {
1019+
let buf = unsafe { internal.get_mut::<::pgrx::pg_sys::StringInfoData>().unwrap() };
1020+
1021+
let mut serialized = ::pgrx::StringInfo::new();
1022+
1023+
serialized.push_bytes(&[0u8; ::pgrx::pg_sys::VARHDRSZ]); // reserve space for the header
1024+
serialized.push_bytes(unsafe {
1025+
core::slice::from_raw_parts(
1026+
buf.data as *const u8,
1027+
buf.len as usize
1028+
)
1029+
});
1030+
1031+
let size = serialized.len();
1032+
let varlena = serialized.into_char_ptr();
1033+
1034+
unsafe{
1035+
::pgrx::set_varsize_4b(varlena as *mut ::pgrx::pg_sys::varlena, size as i32);
1036+
buf.cursor = buf.len;
1037+
::pgrx::datum::cbor_decode(varlena as *mut ::pgrx::pg_sys::varlena)
1038+
}
1039+
}
1040+
#[doc(hidden)]
1041+
#[::pgrx::pgrx_macros::pg_extern(immutable, strict, parallel_safe)]
1042+
pub fn #funcname_send #generics(input: #name #generics) -> Vec<u8> {
1043+
use ::pgrx::datum::{FromDatum, IntoDatum};
1044+
let Some(datum): Option<::pgrx::pg_sys::Datum> = input.into_datum() else {
1045+
::pgrx::error!("Datum of type `{}` is unexpectedly NULL.", stringify!(#name));
1046+
};
1047+
unsafe {
1048+
let Some(serialized): Option<Vec<u8>> = FromDatum::from_datum(datum, false) else {
1049+
::pgrx::error!("Failed to CBOR-serialize Datum to type `{}`.", stringify!(#name));
1050+
};
1051+
serialized
1052+
}
1053+
}
1054+
});
1055+
}
1056+
1057+
let sql_graph_entity_item = sql_gen::PostgresTypeDerive::from_derive_input(
1058+
ast,
1059+
args.contains(&PostgresTypeAttribute::PgBinaryProtocol),
1060+
)?;
10041061
sql_graph_entity_item.to_tokens(&mut stream);
10051062

10061063
Ok(stream)
@@ -1097,6 +1154,7 @@ fn impl_guc_enum(ast: DeriveInput) -> syn::Result<proc_macro2::TokenStream> {
10971154
#[derive(Debug, Hash, Ord, PartialOrd, Eq, PartialEq)]
10981155
enum PostgresTypeAttribute {
10991156
InOutFuncs,
1157+
PgBinaryProtocol,
11001158
PgVarlenaInOutFuncs,
11011159
Default,
11021160
ManualFromIntoDatum,
@@ -1112,6 +1170,9 @@ fn parse_postgres_type_args(attributes: &[Attribute]) -> HashSet<PostgresTypeAtt
11121170
"inoutfuncs" => {
11131171
categorized_attributes.insert(PostgresTypeAttribute::InOutFuncs);
11141172
}
1173+
"pg_binary_protocol" => {
1174+
categorized_attributes.insert(PostgresTypeAttribute::PgBinaryProtocol);
1175+
}
11151176
"pgvarlena_inoutfuncs" => {
11161177
categorized_attributes.insert(PostgresTypeAttribute::PgVarlenaInOutFuncs);
11171178
}

pgrx-sql-entity-graph/src/lib.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,10 @@ impl ToSql for SqlGraphEntity {
241241
in_fn_module_path,
242242
out_fn,
243243
out_fn_module_path,
244+
receive_fn,
245+
receive_fn_module_path,
246+
send_fn,
247+
send_fn_module_path,
244248
..
245249
}) = &context.graph[neighbor]
246250
else {
@@ -251,7 +255,16 @@ impl ToSql for SqlGraphEntity {
251255
&& item.full_path.ends_with(in_fn);
252256
let is_out_fn = item.full_path.starts_with(out_fn_module_path)
253257
&& item.full_path.ends_with(out_fn);
254-
is_in_fn || is_out_fn
258+
let is_receive_fn =
259+
receive_fn_module_path.as_ref().is_some_and(|receive_fn_module_path| {
260+
item.full_path.starts_with(receive_fn_module_path)
261+
}) && receive_fn
262+
.is_some_and(|receive_fn| item.full_path.ends_with(receive_fn));
263+
let is_send_fn =
264+
send_fn_module_path.as_ref().is_some_and(|send_fn_module_path| {
265+
item.full_path.starts_with(send_fn_module_path)
266+
}) && send_fn.is_some_and(|send_fn| item.full_path.ends_with(send_fn));
267+
is_in_fn || is_out_fn || is_receive_fn || is_send_fn
255268
})
256269
{
257270
Ok(String::default())

pgrx-sql-entity-graph/src/postgres_type/entity.rs

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ pub struct PostgresTypeEntity {
105105
pub in_fn_module_path: String,
106106
pub out_fn: &'static str,
107107
pub out_fn_module_path: String,
108+
pub receive_fn: Option<&'static str>,
109+
pub receive_fn_module_path: Option<String>,
110+
pub send_fn: Option<&'static str>,
111+
pub send_fn_module_path: Option<String>,
108112
pub to_sql_config: ToSqlConfigEntity,
109113
pub alignment: Option<usize>,
110114
}
@@ -152,6 +156,10 @@ impl ToSql for PostgresTypeEntity {
152156
out_fn,
153157
out_fn_module_path,
154158
in_fn,
159+
receive_fn,
160+
receive_fn_module_path,
161+
send_fn,
162+
send_fn_module_path,
155163
alignment,
156164
..
157165
}) = item_node
@@ -217,6 +225,80 @@ impl ToSql for PostgresTypeEntity {
217225
.ok_or_else(|| eyre!("Could not find out_fn graph entity."))?;
218226
let out_fn_sql = out_fn_entity.to_sql(context)?;
219227

228+
let receive_fn_graph_index_and_receive_fn_sql = receive_fn_module_path
229+
.as_ref()
230+
.zip(*receive_fn)
231+
.map(|(receive_fn_module_path, receive_fn)| {
232+
let receive_fn_module_path = if !receive_fn_module_path.is_empty() {
233+
receive_fn_module_path.clone()
234+
} else {
235+
module_path.to_string() // Presume a local
236+
};
237+
let receive_fn_path = format!(
238+
"{receive_fn_module_path}{maybe_colons}{receive_fn}",
239+
maybe_colons = if !receive_fn_module_path.is_empty() { "::" } else { "" }
240+
);
241+
242+
// Find the receive function in the context
243+
let (_, _index) = context
244+
.externs
245+
.iter()
246+
.find(|(k, _v)| k.full_path == receive_fn_path)
247+
.ok_or_else(|| eyre::eyre!("Did not find `receive_fn`: {receive_fn_path}."))?;
248+
249+
let (receive_fn_graph_index, receive_fn_entity) = context
250+
.graph
251+
.neighbors_undirected(self_index)
252+
.find_map(|neighbor| match &context.graph[neighbor] {
253+
SqlGraphEntity::Function(func) if func.full_path == receive_fn_path => {
254+
Some((neighbor, func))
255+
}
256+
_ => None,
257+
})
258+
.ok_or_else(|| eyre!("Could not find receive_fn graph entity."))?;
259+
let receive_fn_sql = receive_fn_entity.to_sql(context)?;
260+
261+
Ok::<_, eyre::Report>((receive_fn_graph_index, receive_fn_sql, receive_fn_path))
262+
})
263+
.transpose()?;
264+
265+
let send_fn_graph_index_and_send_fn_sql = send_fn_module_path
266+
.as_ref()
267+
.zip(*send_fn)
268+
.map(|(send_fn_module_path, send_fn)| {
269+
let send_fn_module_path = if !send_fn_module_path.is_empty() {
270+
send_fn_module_path.clone()
271+
} else {
272+
module_path.to_string() // Presume a local
273+
};
274+
let send_fn_path = format!(
275+
"{send_fn_module_path}{maybe_colons}{send_fn}",
276+
maybe_colons = if !send_fn_module_path.is_empty() { "::" } else { "" }
277+
);
278+
279+
// Find the send function in the context
280+
let (_, _index) = context
281+
.externs
282+
.iter()
283+
.find(|(k, _v)| k.full_path == send_fn_path)
284+
.ok_or_else(|| eyre::eyre!("Did not find `send_fn: {}`.", send_fn_path))?;
285+
286+
let (send_fn_graph_index, send_fn_entity) = context
287+
.graph
288+
.neighbors_undirected(self_index)
289+
.find_map(|neighbor| match &context.graph[neighbor] {
290+
SqlGraphEntity::Function(func) if func.full_path == send_fn_path => {
291+
Some((neighbor, func))
292+
}
293+
_ => None,
294+
})
295+
.ok_or_else(|| eyre!("Could not find send_fn graph entity."))?;
296+
let send_fn_sql = send_fn_entity.to_sql(context)?;
297+
298+
Ok::<_, eyre::Report>((send_fn_graph_index, send_fn_sql, send_fn_path))
299+
})
300+
.transpose()?;
301+
220302
let shell_type = format!(
221303
"\n\
222304
-- {file}:{line}\n\
@@ -244,6 +326,29 @@ impl ToSql for PostgresTypeEntity {
244326
})
245327
.unwrap_or_default();
246328

329+
let (receive_send_attributes, receive_send_sql) = receive_fn_graph_index_and_receive_fn_sql
330+
.zip(send_fn_graph_index_and_send_fn_sql)
331+
.map(|((receive_fn_graph_index, receive_fn_sql, receive_fn_path), (send_fn_graph_index, send_fn_sql, send_fn_path))| {
332+
let receive_fn = receive_fn.unwrap();
333+
let send_fn = send_fn.unwrap();
334+
(
335+
format! {
336+
"\
337+
\tRECEIVE = {schema_prefix_receive_fn}{receive_fn}, /* {receive_fn_path} */\n\
338+
\tSEND = {schema_prefix_send_fn}{send_fn}, /* {send_fn_path} */\n\
339+
",
340+
schema_prefix_receive_fn = context.schema_prefix_for(&receive_fn_graph_index),
341+
schema_prefix_send_fn = context.schema_prefix_for(&send_fn_graph_index),
342+
},
343+
format! {
344+
"\n\
345+
{receive_fn_sql}\n\
346+
{send_fn_sql}\n\
347+
"
348+
}
349+
)
350+
}).unwrap_or_default();
351+
247352
let materialized_type = format! {
248353
"\n\
249354
-- {file}:{line}\n\
@@ -252,14 +357,24 @@ impl ToSql for PostgresTypeEntity {
252357
\tINTERNALLENGTH = variable,\n\
253358
\tINPUT = {schema_prefix_in_fn}{in_fn}, /* {in_fn_path} */\n\
254359
\tOUTPUT = {schema_prefix_out_fn}{out_fn}, /* {out_fn_path} */\n\
360+
{receive_send_attributes}\
255361
\tSTORAGE = extended{alignment}\n\
256362
);\
257363
",
258364
schema = context.schema_prefix_for(&self_index),
259365
schema_prefix_in_fn = context.schema_prefix_for(&in_fn_graph_index),
260-
schema_prefix_out_fn = context.schema_prefix_for(&out_fn_graph_index),
366+
schema_prefix_out_fn = context.schema_prefix_for(&out_fn_graph_index)
261367
};
262368

263-
Ok(shell_type + "\n" + &in_fn_sql + "\n" + &out_fn_sql + "\n" + &materialized_type)
369+
let result = shell_type
370+
+ "\n"
371+
+ &in_fn_sql
372+
+ "\n"
373+
+ &out_fn_sql
374+
+ &receive_send_sql
375+
+ "\n"
376+
+ &materialized_type;
377+
378+
Ok(result)
264379
}
265380
}

0 commit comments

Comments
 (0)