Skip to content

Commit 1949aa8

Browse files
committed
rewrite plan
1 parent 076ba4c commit 1949aa8

File tree

16 files changed

+277
-230
lines changed

16 files changed

+277
-230
lines changed

pgdog/src/frontend/client/query_engine/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,7 @@ impl QueryEngine {
134134
context.prepared_statements,
135135
&mut self.rewrite_state,
136136
);
137-
match rewrite.execute() {
138-
Ok(ast) => context.ast = Some(ast),
139-
Err(rewrite::Error::EmptyQuery) => (),
140-
Err(err) => return Err(err.into()),
141-
}
137+
context.ast = rewrite.execute()?;
142138
}
143139
}
144140

pgdog/src/frontend/client_request.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,17 @@ impl ClientRequest {
140140
Ok(None)
141141
}
142142

143+
/// Get mutable reference to parameters, if any.
144+
pub fn parameters_mut(&mut self) -> Result<Option<&mut Bind>, Error> {
145+
for message in self.messages.iter_mut() {
146+
if let ProtocolMessage::Bind(bind) = message {
147+
return Ok(Some(bind));
148+
}
149+
}
150+
151+
Ok(None)
152+
}
153+
143154
/// Get all CopyData messages.
144155
pub fn copy_data(&self) -> Result<Vec<CopyData>, Error> {
145156
let mut rows = vec![];

pgdog/src/frontend/router/rewrite/context.rs

Lines changed: 19 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
33
use pg_query::protobuf::{ParseResult, RawStmt};
44

5-
use super::{output::RewriteActionKind, stats::RewriteStats, Error, RewriteAction, StepOutput};
5+
use super::{
6+
output::RewriteActionKind, stats::RewriteStats, Error, RewriteAction, RewritePlan, StepOutput,
7+
};
68
use crate::net::{Bind, Parse, ProtocolMessage, Query};
79

810
#[derive(Debug, Clone)]
@@ -12,30 +14,23 @@ pub struct Context<'a> {
1214
original: &'a ParseResult,
1315
// If an in-place rewrite was done, the statement is saved here.
1416
rewrite: Option<ParseResult>,
15-
/// Original bind message, if any.
16-
bind: Option<&'a Bind>,
17-
/// Bind rewritten.
18-
rewrite_bind: Option<Bind>,
1917
/// Additional messages to add to the request.
2018
result: Vec<RewriteAction>,
2119
/// Extended protocol.
2220
parse: Option<&'a Parse>,
21+
/// Rewrite plan.
22+
plan: RewritePlan,
2323
}
2424

2525
impl<'a> Context<'a> {
2626
/// Create new input.
27-
pub(super) fn new(
28-
original: &'a ParseResult,
29-
bind: Option<&'a Bind>,
30-
parse: Option<&'a Parse>,
31-
) -> Self {
27+
pub(super) fn new(original: &'a ParseResult, parse: Option<&'a Parse>) -> Self {
3228
Self {
3329
original,
34-
bind,
3530
rewrite: None,
36-
rewrite_bind: None,
3731
result: vec![],
3832
parse,
33+
plan: RewritePlan::default(),
3934
}
4035
}
4136

@@ -46,32 +41,12 @@ impl<'a> Context<'a> {
4641

4742
/// We are rewriting an extended protocol request.
4843
pub fn extended(&self) -> bool {
49-
self.parse().is_some() || self.bind().is_some()
44+
self.parse().is_some()
5045
}
5146

52-
/// Get the Bind message, if set.
53-
pub fn bind(&'a self) -> Option<&'a Bind> {
54-
if let Some(ref rewrite_bind) = self.rewrite_bind {
55-
Some(rewrite_bind)
56-
} else {
57-
self.bind
58-
}
59-
}
60-
61-
/// Take the Bind message for modification.
62-
/// Don't forget to return it.
63-
#[must_use]
64-
pub fn bind_take(&mut self) -> Option<Bind> {
65-
if self.rewrite_bind.is_none() {
66-
self.rewrite_bind = self.bind.cloned();
67-
}
68-
69-
self.rewrite_bind.take()
70-
}
71-
72-
/// Put the bind message back.
73-
pub fn bind_put(&mut self, bind: Option<Bind>) {
74-
self.rewrite_bind = bind;
47+
/// Get reference to rewrite plan for modification.
48+
pub fn plan(&mut self) -> &mut RewritePlan {
49+
&mut self.plan
7550
}
7651

7752
/// Get the original (or modified) statement.
@@ -121,31 +96,19 @@ impl<'a> Context<'a> {
12196
Ok(StepOutput::NoOp)
12297
} else {
12398
let mut stats = RewriteStats::default();
124-
let bind = self.rewrite_bind.take();
12599
let ast = self.rewrite.take().ok_or(Error::NoRewrite)?;
126100
let stmt = ast.deparse()?;
127-
let extended = self.extended();
128101
let mut parse = self.parse().cloned();
129102

130103
let mut actions = self.result;
131104

132-
if extended {
133-
if let Some(mut parse) = parse.take() {
134-
parse.set_query(&stmt);
135-
actions.push(RewriteAction {
136-
message: parse.into(),
137-
action: RewriteActionKind::Replace,
138-
});
139-
stats.parse += 1;
140-
}
141-
142-
if let Some(bind) = bind {
143-
actions.push(RewriteAction {
144-
message: bind.into(),
145-
action: RewriteActionKind::Replace,
146-
});
147-
stats.bind += 1;
148-
}
105+
if let Some(mut parse) = parse.take() {
106+
parse.set_query(&stmt);
107+
actions.push(RewriteAction {
108+
message: parse.into(),
109+
action: RewriteActionKind::Replace,
110+
});
111+
stats.parse += 1;
149112
} else {
150113
actions.push(RewriteAction {
151114
message: Query::new(stmt.clone()).into(),
@@ -159,6 +122,7 @@ impl<'a> Context<'a> {
159122
ast,
160123
actions,
161124
stats,
125+
plan: self.plan.freeze(),
162126
})
163127
}
164128
}

pgdog/src/frontend/router/rewrite/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,7 @@ pub enum Error {
3131

3232
#[error("parser: {0}")]
3333
Parser(#[from] crate::frontend::router::parser::Error),
34+
35+
#[error("no active rewrite plan set")]
36+
NoActiveRewritePlan,
3437
}

pgdog/src/frontend/router/rewrite/insert_split/mod.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ use pg_query::{
33
Node, NodeEnum,
44
};
55

6-
use crate::net::Bind;
7-
86
use super::*;
97

108
#[derive(Default)]
@@ -39,20 +37,19 @@ impl RewriteModule for InsertSplitRewrite {
3937
let mut new_insert = proto_insert.clone();
4038
let mut new_select = proto_select.clone();
4139
let mut new_values = values.clone();
42-
let mut new_bind = Bind::default();
4340

4441
// Rewrite the parameter references
4542
// and create new Bind message for each INSERT statement.
4643
if let Some(NodeEnum::List(list)) = new_values.node.as_mut() {
4744
for value in list.items.iter_mut() {
48-
if let Some(NodeEnum::ParamRef(param)) = value.node.as_mut() {
49-
let parameter = input
50-
.bind()
51-
.and_then(|bind| bind.parameter(param.number as usize - 1).ok())
52-
.flatten();
53-
if let Some(parameter) = parameter {
54-
param.number = new_bind.add_existing(parameter)?;
55-
}
45+
if let Some(NodeEnum::ParamRef(_)) = value.node.as_mut() {
46+
// let parameter = input
47+
// .bind()
48+
// .and_then(|bind| bind.parameter(param.number as usize - 1).ok())
49+
// .flatten();
50+
// if let Some(parameter) = parameter {
51+
// param.number = new_bind.add_existing(parameter)?;
52+
// }
5653
}
5754
}
5855
}
@@ -69,7 +66,7 @@ impl RewriteModule for InsertSplitRewrite {
6966
..Default::default()
7067
}],
7168
};
72-
inserts.push((result, new_bind));
69+
inserts.push(result);
7370
}
7471
}
7572
}

pgdog/src/frontend/router/rewrite/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod error;
1111
pub mod insert_split;
1212
pub mod interface;
1313
pub mod output;
14+
pub mod plan;
1415
pub mod prepared;
1516
pub mod request;
1617
pub mod state;
@@ -21,6 +22,7 @@ pub use context::Context;
2122
pub use error::Error;
2223
pub use interface::RewriteModule;
2324
pub use output::{RewriteAction, StepOutput};
25+
pub use plan::{ImmutableRewritePlan, RewritePlan, UniqueIdPlan};
2426
pub use request::RewriteRequest;
2527
pub use state::RewriteState;
2628

pgdog/src/frontend/router/rewrite/output.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use pg_query::protobuf::ParseResult;
22

33
use super::stats::RewriteStats;
4+
use super::ImmutableRewritePlan;
45
use crate::{frontend::ClientRequest, net::ProtocolMessage};
56

67
use std::mem::discriminant;
@@ -53,6 +54,7 @@ pub enum StepOutput {
5354
ast: ParseResult,
5455
stmt: String,
5556
stats: RewriteStats,
57+
plan: ImmutableRewritePlan,
5658
},
5759
}
5860

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use std::{ops::Deref, sync::Arc};
2+
3+
use super::Error;
4+
use crate::{
5+
net::{Bind, Datum},
6+
unique_id::UniqueId,
7+
};
8+
9+
#[derive(Debug, Clone, Default)]
10+
pub struct UniqueIdPlan {
11+
/// Parameter number.
12+
pub(super) param_ref: i32,
13+
}
14+
15+
#[derive(Debug, Clone, Default)]
16+
pub struct RewritePlan {
17+
/// How many unique IDs to add to the Bind message.
18+
pub(super) unique_ids: Vec<UniqueIdPlan>,
19+
}
20+
21+
#[derive(Debug, Clone, Default)]
22+
pub struct ImmutableRewritePlan {
23+
/// Compiled rewrite plan, that cannot be modified further.
24+
pub(super) plan: Arc<RewritePlan>,
25+
}
26+
27+
impl Deref for ImmutableRewritePlan {
28+
type Target = RewritePlan;
29+
30+
fn deref(&self) -> &Self::Target {
31+
&self.plan
32+
}
33+
}
34+
35+
impl RewritePlan {
36+
/// Apply rewrite plan to Bind message.
37+
///
38+
/// N.B. this isn't idempotent, run this only once.
39+
///
40+
pub fn apply_bind(&self, bind: &mut Bind) -> Result<(), Error> {
41+
for unique_id in &self.unique_ids {
42+
let id = UniqueId::generator()?.next_id();
43+
let counter = bind.add_parameter(Datum::Bigint(id))?;
44+
// Params should be added to plan in order.
45+
// This validates it.
46+
if counter != unique_id.param_ref {
47+
return Err(Error::ParameterCountMismatch);
48+
}
49+
}
50+
51+
Ok(())
52+
}
53+
54+
/// Freeze rewrite plan, without any more modifications allowed.
55+
pub fn freeze(self) -> ImmutableRewritePlan {
56+
ImmutableRewritePlan {
57+
plan: Arc::new(self),
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)