Skip to content
1 change: 1 addition & 0 deletions ydb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ set(YDB_LIBRARIES
YDB-CPP-SDK::Coordination
YDB-CPP-SDK::Driver
YDB-CPP-SDK::Operation
YDB-CPP-SDK::Query
YDB-CPP-SDK::Result
YDB-CPP-SDK::Scheme
YDB-CPP-SDK::SolomonStats
Expand Down
2 changes: 0 additions & 2 deletions ydb/include/userver/ydb/table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,6 @@ class TableClient final {

void Select1();

NYdb::NTable::TExecDataQuerySettings ToExecQuerySettings(QuerySettings query_settings) const;

template <typename... Args>
PreparedArgsBuilder MakeBuilder(Args&&... args);

Expand Down
6 changes: 4 additions & 2 deletions ydb/include/userver/ydb/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <string>

#include <ydb-cpp-sdk/client/query/query.h>

#include <userver/tracing/span.hpp>
#include <userver/utils/trx_tracker.hpp>

Expand Down Expand Up @@ -69,7 +71,7 @@ class Transaction final {
// For internal use only.
Transaction(
TableClient& table_client,
NYdb::NTable::TTransaction ydb_tx,
NYdb::NQuery::TTransaction ydb_tx,
std::string name,
OperationSettings&& rollback_settings
) noexcept;
Expand All @@ -85,7 +87,7 @@ class Transaction final {
std::string name_;
impl::StatsScope stats_scope_;
tracing::Span span_;
NYdb::NTable::TTransaction ydb_tx_;
NYdb::NQuery::TTransaction ydb_tx_;
OperationSettings rollback_settings_;
bool is_active_{true};
utils::trx_tracker::TransactionLock trx_lock_;
Expand Down
15 changes: 15 additions & 0 deletions ydb/src/ydb/impl/operation_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ std::chrono::milliseconds GetBoundTimeout(std::chrono::milliseconds timeout, eng
return (std::chrono::milliseconds::zero() < timeout) ? std::min(timeout, max_timeout) : max_timeout;
}

NYdb::NQuery::EStatsMode ConvertStatsMode(NYdb::NTable::ECollectQueryStatsMode collect_query_stats) {
// Convert Table Client stats mode to Query Client stats mode
switch (collect_query_stats) {
case NYdb::NTable::ECollectQueryStatsMode::None:
return NYdb::NQuery::EStatsMode::None;
case NYdb::NTable::ECollectQueryStatsMode::Basic:
return NYdb::NQuery::EStatsMode::Basic;
case NYdb::NTable::ECollectQueryStatsMode::Full:
return NYdb::NQuery::EStatsMode::Full;
case NYdb::NTable::ECollectQueryStatsMode::Profile:
return NYdb::NQuery::EStatsMode::Profile;
}
return NYdb::NQuery::EStatsMode::None; // Safe fallback for invalid enum values
}

} // namespace ydb::impl

USERVER_NAMESPACE_END
4 changes: 4 additions & 0 deletions ydb/src/ydb/impl/operation_settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

#include <chrono>

#include <ydb-cpp-sdk/client/query/query.h>
#include <ydb-cpp-sdk/client/retry/retry.h>
#include <ydb-cpp-sdk/client/table/table.h>
#include <ydb-cpp-sdk/client/types/request_settings.h>

#include <userver/engine/deadline.hpp>
Expand All @@ -14,6 +16,8 @@ USERVER_NAMESPACE_BEGIN

namespace ydb::impl {

NYdb::NQuery::EStatsMode ConvertStatsMode(NYdb::NTable::ECollectQueryStatsMode collect_query_stats);

std::chrono::milliseconds GetBoundTimeout(std::chrono::milliseconds timeout, engine::Deadline deadline);

template <typename T>
Expand Down
40 changes: 22 additions & 18 deletions ydb/src/ydb/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ NYdb::NQuery::TTxSettings PrepareQueryTxSettings(const OperationSettings& settin
}
}

NYdb::NQuery::TExecuteQuerySettings ToExecuteQuerySettings(QuerySettings query_settings) {
// Convert QuerySettings to Query Client settings
NYdb::NQuery::TExecuteQuerySettings exec_settings;
if (query_settings.keep_in_query_cache.has_value()) {
// Query Client doesn't have KeepInQueryCache, it caches automatically
}
if (query_settings.collect_query_stats) {
exec_settings.StatsMode(impl::ConvertStatsMode(*query_settings.collect_query_stats));
}
return exec_settings;
}

} // namespace

TableClient::TableClient(
Expand Down Expand Up @@ -311,14 +323,14 @@ Transaction TableClient::Begin(utils::StringLiteral transaction_name, OperationS
Transaction TableClient::Begin(DynamicTransactionName transaction_name, OperationSettings settings) {
const Query query{"", Query::Name{"Begin"}};
impl::RequestContext context{*this, query, std::move(settings)};
auto tx_settings = PrepareTxSettings(context.settings);
auto tx_settings = PrepareQueryTxSettings(context.settings);

auto future = impl::RetryOperation(
auto future = impl::RetryQuery(
context,
[tx_settings = std::move(tx_settings),
settings = context.settings,
deadline = context.deadline](NYdb::NTable::TSession session) {
const auto exec_settings = impl::PrepareRequestSettings<NYdb::NTable::TBeginTxSettings>(settings, deadline);
deadline = context.deadline](NYdb::NQuery::TSession session) {
const auto exec_settings = impl::PrepareRequestSettings<NYdb::NQuery::TBeginTxSettings>(settings, deadline);
return session.BeginTransaction(tx_settings, exec_settings);
}
);
Expand Down Expand Up @@ -357,17 +369,17 @@ ExecuteResponse TableClient::ExecuteDataQuery(
) {
impl::RequestContext context{*this, query, std::move(settings)};

auto future = impl::RetryOperation(
auto future = impl::RetryQuery(
context,
[query,
params = std::move(builder).Build(),
exec_settings = ToExecQuerySettings(query_settings),
exec_settings = ToExecuteQuerySettings(query_settings),
settings = context.settings,
deadline = context.deadline](NYdb::NTable::TSession session) mutable {
deadline = context.deadline](NYdb::NQuery::TSession session) mutable {
impl::ApplyToRequestSettings(exec_settings, settings, deadline);
const auto tx_settings = PrepareTxSettings(settings);
const auto tx = NYdb::NTable::TTxControl::BeginTx(tx_settings).CommitTx();
return session.ExecuteDataQuery(impl::ToString(query.GetStatementView()), tx, params, exec_settings);
const auto tx_settings = PrepareQueryTxSettings(settings);
const auto tx = NYdb::NQuery::TTxControl::BeginTx(tx_settings).CommitTx();
return session.ExecuteQuery(impl::ToString(query.GetStatementView()), tx, params, exec_settings);
}
);

Expand Down Expand Up @@ -428,14 +440,6 @@ void DumpMetric(utils::statistics::Writer& writer, const TableClient& table_clie

PreparedArgsBuilder TableClient::GetBuilder() const { return PreparedArgsBuilder(table_client_->GetParamsBuilder()); }

NYdb::NTable::TExecDataQuerySettings TableClient::ToExecQuerySettings(QuerySettings query_settings) const {
NYdb::NTable::TExecDataQuerySettings exec_settings;
exec_settings.KeepInQueryCache(query_settings.keep_in_query_cache.value_or(keep_in_query_cache_));
if (query_settings.collect_query_stats) {
exec_settings.CollectQueryStats(*query_settings.collect_query_stats);
}
return exec_settings;
}
} // namespace ydb

USERVER_NAMESPACE_END
19 changes: 13 additions & 6 deletions ydb/src/ydb/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace ydb {

Transaction::Transaction(
TableClient& table_client,
NYdb::NTable::TTransaction ydb_tx,
NYdb::NQuery::TTransaction ydb_tx,
std::string name,
OperationSettings&& rollback_settings
) noexcept
Expand Down Expand Up @@ -86,7 +86,7 @@ void Transaction::Commit(OperationSettings settings) {
}

const auto commit_settings =
impl::PrepareRequestSettings<NYdb::NTable::TCommitTxSettings>(context.settings, context.deadline);
impl::PrepareRequestSettings<NYdb::NQuery::TCommitTxSettings>(context.settings, context.deadline);

auto error_guard = ErrorGuard();

Expand All @@ -107,7 +107,7 @@ void Transaction::Rollback() {
impl::RequestContext context{table_client_, kQuery, std::move(settings), impl::IsStreaming{false}, &span_};

const auto rollback_settings =
impl::PrepareRequestSettings<NYdb::NTable::TRollbackTxSettings>(context.settings, context.deadline);
impl::PrepareRequestSettings<NYdb::NQuery::TRollbackTxSettings>(context.settings, context.deadline);

[[maybe_unused]] auto error_guard = ErrorGuard();

Expand Down Expand Up @@ -145,16 +145,23 @@ ExecuteResponse Transaction::Execute(
impl::RequestContext context{table_client_, query, std::move(settings), impl::IsStreaming{false}, &span_};
auto internal_params = std::move(builder).Build();

auto exec_settings = table_client_.ToExecQuerySettings(query_settings);
// Convert QuerySettings to Query Client settings
NYdb::NQuery::TExecuteQuerySettings exec_settings;
if (query_settings.keep_in_query_cache.has_value()) {
// Query Client doesn't have KeepInQueryCache, it caches automatically
}
if (query_settings.collect_query_stats) {
exec_settings.StatsMode(impl::ConvertStatsMode(*query_settings.collect_query_stats));
}
impl::ApplyToRequestSettings(exec_settings, context.settings, context.deadline);

// Must go after PrepareExecuteSettings, because an exception from there
// leaves the transaction active.
auto error_guard = ErrorGuard();

auto execute_fut = ydb_tx_.GetSession().ExecuteDataQuery(
auto execute_fut = ydb_tx_.GetSession().ExecuteQuery(
impl::ToString(query.GetStatementView()),
NYdb::NTable::TTxControl::Tx(ydb_tx_),
NYdb::NQuery::TTxControl::Tx(ydb_tx_),
std::move(internal_params),
exec_settings
);
Expand Down
Loading