Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions lib/myxql/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ defmodule MyXQL.Client do
end
end

def com_stmt_close_prepare(client, statement, close_statement_id) do
with :ok <-
send_com(client, [
{:com_stmt_close, close_statement_id},
{:com_stmt_prepare, statement}
]) do
recv_packets(client, &decode_com_stmt_prepare_response/3, :initial)
end
end

def com_stmt_execute(client, statement_id, params, cursor_type) do
with :ok <- send_com(client, {:com_stmt_execute, statement_id, params, cursor_type}) do
recv_packets(client, &decode_com_stmt_execute_response/3, :initial)
Expand Down Expand Up @@ -142,6 +152,16 @@ defmodule MyXQL.Client do
:ok
end

def send_com(client, coms) when is_list(coms) do
payload =
Enum.map(
coms,
&encode_packet(encode_com(&1), 0, @default_max_packet_size)
)

send_data(client, payload)
end

def send_com(client, com) do
payload = encode_com(com)
send_packet(client, payload, 0)
Expand Down
50 changes: 24 additions & 26 deletions lib/myxql/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule MyXQL.Connection do
prepare: :named,
queries: nil,
transaction_status: :idle,
last_ref: nil
last_query: nil
]

@impl true
Expand Down Expand Up @@ -75,11 +75,11 @@ defmodule MyXQL.Connection do
query = rename_query(state, query)

if cached_query = queries_get(state, query) do
{:ok, cached_query, %{state | last_ref: cached_query.ref}}
{:ok, cached_query, %{state | last_query: cached_query}}
else
case prepare(query, state) do
{:ok, query, state} ->
{:ok, query, %{state | last_ref: query.ref}}
{:ok, query, state}

{:error, %MyXQL.Error{mysql: %{name: :ER_UNSUPPORTED_PS}}, state} = error ->
if Keyword.get(opts, :query_type) == :binary_then_text do
Expand All @@ -104,9 +104,8 @@ defmodule MyXQL.Connection do
query.statement_id,
params,
:cursor_type_no_cursor
),
{:ok, query, result, state} <- result(result, query, state) do
{:ok, query, result, maybe_close(query, state)}
) do
result(result, query, state)
end
end

Expand Down Expand Up @@ -412,8 +411,6 @@ defmodule MyXQL.Connection do

defp queries_new(), do: :ets.new(__MODULE__, [:set, :public])

defp queries_put(_state, %Query{name: ""}), do: :ok

defp queries_put(state, %Query{cache: :reference} = query) do
try do
:ets.insert(state.queries, {cache_key(query), query.statement_id})
Expand All @@ -436,8 +433,6 @@ defmodule MyXQL.Connection do
end
end

defp queries_delete(_state, %Query{name: ""}), do: :ok

defp queries_delete(state, %Query{} = query) do
try do
:ets.delete(state.queries, cache_key(query))
Expand All @@ -448,8 +443,6 @@ defmodule MyXQL.Connection do
end
end

defp queries_get(_state, %Query{name: ""}), do: nil

defp queries_get(state, %{cache: :reference} = query) do
try do
statement_id = :ets.lookup_element(state.queries, cache_key(query), 2)
Expand All @@ -470,20 +463,33 @@ defmodule MyXQL.Connection do
defp cache_key(%MyXQL.Query{cache: :reference, ref: ref}), do: ref
defp cache_key(%MyXQL.Query{cache: :statement, statement: statement}), do: statement

defp prepare(%Query{ref: ref, statement: statement} = query, state) when is_reference(ref) do
case Client.com_stmt_prepare(state.client, statement) do
defp prepare(%Query{ref: ref} = query, state) when is_reference(ref) do
case prepare_maybe_close(query, state) do
{:ok, com_stmt_prepare_ok(statement_id: statement_id, num_params: num_params)} ->
query = %{query | num_params: num_params, statement_id: statement_id}
queries_put(state, query)
{:ok, query, state}
{:ok, query, %{state | last_query: query}}

result ->
result(result, query, state)
end
end

defp maybe_reprepare(%{ref: ref} = query, %{last_ref: ref} = state) do
{:ok, query, state}
def prepare_maybe_close(
%{ref: newref} = query,
%{prepare: :unnamed, last_query: %{ref: oldref} = last_query} = state
)
when oldref != newref do
queries_delete(state, last_query)
Client.com_stmt_close_prepare(state.client, query.statement, last_query.statement_id)
end

def prepare_maybe_close(query, state) do
Client.com_stmt_prepare(state.client, query.statement)
end

defp maybe_reprepare(%{ref: ref}, %{last_query: %{ref: ref}} = state) do
{:ok, state.last_query, state}
end

defp maybe_reprepare(query, state) do
Expand All @@ -500,17 +506,9 @@ defmodule MyXQL.Connection do
end
end

# Close unnamed queries after executing them
defp maybe_close(%Query{name: ""} = query, state), do: close(query, state)
defp maybe_close(_query, state), do: state

defp close(%{ref: ref} = query, %{last_ref: ref} = state) do
close(query, %{state | last_ref: nil})
end

defp close(query, state) do
:ok = Client.com_stmt_close(state.client, query.statement_id)
queries_delete(state, query)
state
%{state | last_query: nil}
end
end
8 changes: 7 additions & 1 deletion test/myxql/sync_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ defmodule MyXQL.SyncTest do
assert prepared_stmt_count() == 0

MyXQL.query!(conn, "SELECT 42", [], cache_statement: "42")
assert prepared_stmt_count() == 0
assert prepared_stmt_count() == 1

MyXQL.query!(conn, "SELECT 1337", [], cache_statement: "69")
assert prepared_stmt_count() == 1

MyXQL.query!(conn, "SELECT 42", [], cache_statement: "42")
assert prepared_stmt_count() == 1
end

test "do not leak statements with streams" do
Expand Down
6 changes: 5 additions & 1 deletion test/myxql_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,11 @@ defmodule MyXQLTest do
assert query == query2
{:ok, query3, _} = MyXQL.execute(pid, query, [])
assert query2.ref == query3.ref
assert query2.statement_id != query3.statement_id
assert query2.statement_id == query3.statement_id

{:ok, query4} = MyXQL.prepare(pid, "2", "SELECT 2")
assert query3.ref != query4.ref
assert query3.statement_id != query4.statement_id
end

test ":force_named" do
Expand Down