Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
13b8ccb
Initial dirty version of push v2
MarcelGeo Aug 8, 2025
216c3bd
tweak _do_upload running outside of class to prevent threads mix
MarcelGeo Aug 8, 2025
57c6bf4
add local changes
MarcelGeo Aug 14, 2025
88f00ea
make tests runnable with checking current version - Please update you…
MarcelGeo Aug 14, 2025
a2cfe62
Cleanup and fix getting diff from dictionary to be more safe
MarcelGeo Aug 14, 2025
054a818
fixes for missing schema
MarcelGeo Aug 15, 2025
7ba0971
cleanup
MarcelGeo Aug 15, 2025
42c34a2
Retry sync job using
MarcelGeo Aug 19, 2025
0128a06
Address comstic comments from @varmar05 and some from @wonder-sk
MarcelGeo Sep 3, 2025
df88376
fix uploading 0 size files
MarcelGeo Sep 3, 2025
28aa757
handle project info directly from v2 projects versions
MarcelGeo Sep 3, 2025
efd7ed6
Add unit tests and tunning of retry mechanism
MarcelGeo Sep 3, 2025
29292da
Update test calling count measurement
MarcelGeo Sep 5, 2025
c6dafca
add test also for v1
MarcelGeo Sep 5, 2025
e54bde8
Drop versions and permissions check
MarcelGeo Sep 9, 2025
57e854b
get rid of project info from editor
MarcelGeo Sep 9, 2025
747ea46
black
MarcelGeo Sep 9, 2025
39cea8b
Merge remote-tracking branch 'origin/master' into push-v2-integration
MarcelGeo Sep 10, 2025
b6cfa3e
temp_dir fix
MarcelGeo Sep 10, 2025
a045095
add max of 100 files
MarcelGeo Sep 11, 2025
50b6224
black
MarcelGeo Sep 11, 2025
0fc2a9e
Find just one file over limit in transaction
MarcelGeo Sep 12, 2025
607e148
black swan
MarcelGeo Sep 12, 2025
77b82e9
Close connection to make runnable test in new python and skip test fo…
MarcelGeo Sep 12, 2025
85387a5
Merge remote-tracking branch 'origin/push-v2-integration' into change…
MarcelGeo Sep 15, 2025
3041e75
Move validation to post_init
MarcelGeo Sep 16, 2025
350aedc
get rid of glob.glob
MarcelGeo Sep 16, 2025
e36ae8d
address comments without progress bar @wonder-sk
MarcelGeo Sep 17, 2025
0119b7d
First version for sync progress bar
MarcelGeo Sep 18, 2025
cf232a6
Merge remote-tracking branch 'origin/push-v2-integration' into change…
MarcelGeo Sep 18, 2025
7dbed14
fix typo and some docstrings for localProjectChanges
MarcelGeo Sep 18, 2025
92d5340
Merge remote-tracking branch 'origin/push-v2-integration' into change…
MarcelGeo Sep 18, 2025
0b585ee
move file size to consts
MarcelGeo Sep 18, 2025
2e38b84
create generator yielding function for sync project
MarcelGeo Sep 18, 2025
3d00631
Introduce generator for getting progress
MarcelGeo Sep 18, 2025
4fc877c
Merge remote-tracking branch 'origin/push-v2-integration' into change…
MarcelGeo Sep 18, 2025
7440f3f
Update docstring for sync method
MarcelGeo Sep 18, 2025
cac700e
Merge pull request #274 from MerginMaps/changes_limits
MarcelGeo Sep 24, 2025
06e815e
Reloading MerginProject instance to get fresh changes
MarcelGeo Sep 25, 2025
9486830
Cosmetic upgrades v1
MarcelGeo Sep 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions mergin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
PUSH_ATTEMPTS,
SYNC_CALLBACK_WAIT,
ClientError,
ErrorCode,
LoginError,
WorkspaceRole,
ProjectRole,
Expand Down Expand Up @@ -807,7 +806,7 @@ def download_project(self, project_path, directory, version=None):
def user_info(self):
server_type = self.server_type()
if server_type == ServerType.OLD:
resp = self.get("/v1/user/" + self.username())
resp = self.get(f"/v1/user/{self.username()}")
else:
resp = self.get("/v1/user/profile")
return json.load(resp)
Expand Down Expand Up @@ -1526,11 +1525,11 @@ def sync_project(self, project_directory, progress_callback=None):
sleep(SYNC_CALLBACK_WAIT)
current_size = job.transferred_size
progress_callback(current_size - last_size, job) # call callback with transferred size increment
last = current_size
last_size = current_size
push_project_finalize(job)
_, has_changes = get_push_changes_batch(self, mp, job.server_resp)
except ClientError as e:
if e.sync_retry and server_conflict_attempts <= PUSH_ATTEMPTS:
if e.is_retryable_sync() and server_conflict_attempts < PUSH_ATTEMPTS - 1:
# retry on conflict, e.g. when server has changes that we do not have yet
mp.log.info(
f"Restarting sync process (conflict on server) - {server_conflict_attempts + 1}/{PUSH_ATTEMPTS}"
Expand Down
18 changes: 8 additions & 10 deletions mergin/client_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from .local_changes import LocalChange, LocalChanges

from .common import UPLOAD_CHUNK_SIZE, ClientError, ErrorCode
from .common import UPLOAD_CHUNK_ATTEMPT_WAIT, UPLOAD_CHUNK_ATTEMPTS, UPLOAD_CHUNK_SIZE, ClientError, ErrorCode
from .merginproject import MerginProject
from .editor import filter_changes
from .utils import get_data_checksum
Expand Down Expand Up @@ -111,8 +111,7 @@ def upload_blocking(self):
checksum_str = get_data_checksum(data)

self.mp.log.debug(f"Uploading {self.file_path} part={self.chunk_index}")
attempts = 2
for attempt in range(attempts):
for attempt in range(UPLOAD_CHUNK_ATTEMPTS):
try:
if self.mc.server_features().get("v2_push_enabled"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since both methods have the same signature, what about having if-else with single upload_chunk method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be, but:

  • v2 api is using chunks cache, v1 not
  • there is also v1/push/cancel in v2 not.

Do we want to have such mix of concepts in one method?

# use v2 API for uploading chunks
Expand All @@ -122,8 +121,8 @@ def upload_blocking(self):
self.upload_chunk(data, checksum_str)
break # exit loop if upload was successful
except ClientError as e:
if attempt < attempts - 1:
time.sleep(5)
if attempt < UPLOAD_CHUNK_ATTEMPTS - 1:
time.sleep(UPLOAD_CHUNK_ATTEMPT_WAIT)
continue
raise

Expand Down Expand Up @@ -249,7 +248,7 @@ def create_upload_job(
)
push_start_resp = json.load(resp)
except ClientError as err:
if err.server_code not in [ErrorCode.AnotherUploadRunning.value, ErrorCode.ProjectVersionExists.value]:
if not err.is_blocking_sync():
mp.log.error("Error starting transaction: " + str(err))
mp.log.info("--- push aborted")
raise
Expand Down Expand Up @@ -415,10 +414,9 @@ def push_project_finalize(job: UploadJob):
project_info = json.load(resp)
job.server_resp = project_info
except ClientError as err:
if err.server_code in [ErrorCode.AnotherUploadRunning.value, ErrorCode.ProjectVersionExists.value]:
err.sync_retry = True
else:
job.mc.upload_chunks_cache.clear() # clear the upload chunks cache, as we are getting fatal from server
if not err.is_retryable_sync():
# clear the upload chunks cache, as we are getting fatal from server
job.mc.upload_chunks_cache.clear()
job.mp.log.error("--- push finish failed! " + str(err))
raise err
elif with_upload_of_files:
Expand Down
45 changes: 39 additions & 6 deletions mergin/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,22 @@
# there is an upper limit for chunk size on server, ideally should be requested from there once implemented
UPLOAD_CHUNK_SIZE = 10 * 1024 * 1024

# number of attempts to upload a chunk
UPLOAD_CHUNK_ATTEMPTS = 2

# seconds to wait between attempts of uploading a chunk
UPLOAD_CHUNK_ATTEMPT_WAIT = 5

# size of the log file part to send (if file is larger only this size from end will be sent)
MAX_LOG_FILE_SIZE_TO_SEND = 5 * 1024 * 1024

# number of attempts to push changes (in case of network issues etc)
PUSH_ATTEMPTS = 12
PUSH_ATTEMPTS = 10

# seconds to wait between attempts
# seconds to wait between attempts to push changes
PUSH_ATTEMPT_WAIT = 5

# seconds to wait between sync callback calls
# seconds to wait between sync callback calls
SYNC_CALLBACK_WAIT = 0.01

# default URL for submitting logs
Expand All @@ -33,7 +39,7 @@ class ErrorCode(Enum):


class ClientError(Exception):
def __init__(self, detail, url=None, server_code=None, server_response=None, http_error=None, http_method=None):
def __init__(self, detail: str, url=None, server_code=None, server_response=None, http_error=None, http_method=None):
self.detail = detail
self.url = url
self.http_error = http_error
Expand All @@ -43,8 +49,6 @@ def __init__(self, detail, url=None, server_code=None, server_response=None, htt
self.server_response = server_response

self.extra = None
# Param to mark error as candidate for retry sync process
self.sync_retry = False

def __str__(self):
string_res = f"Detail: {self.detail}\n"
Expand All @@ -60,6 +64,35 @@ def __str__(self):
string_res += f"{self.extra}\n"
return string_res

def is_rate_limit(self) -> bool:
"""Check if error is rate limit error based on server code"""
return self.http_error == 429

def is_blocking_sync(self) -> bool:
"""
Check if error is blocking sync based on server code.
Blocking sync means, that the sync is blocked by another user in server.
"""
return self.server_code in [
ErrorCode.AnotherUploadRunning.value,
ErrorCode.ProjectVersionExists.value,
]

def is_retryable_sync(self) -> bool:
# Check if error is retryable based on server code
if self.is_blocking_sync() or self.is_rate_limit():
return True

if (
self.http_error
and self.detail
and self.http_error == 400
and ("Another process" in self.detail or "Version mismatch" in self.detail)
):
return True

return False


class LoginError(Exception):
pass
Expand Down
26 changes: 26 additions & 0 deletions mergin/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pytz
import sqlite3
import glob
from unittest.mock import patch, Mock

from .. import InvalidProject
from ..client import (
Expand Down Expand Up @@ -3157,3 +3158,28 @@ def test_client_project_sync(mc):
os.path.join(project_dir_2, "base.gpkg"),
)
mc.sync_project(project_dir_2)


def test_client_project_sync_retry(mc):
test_project = "test_client_project_sync_retry"
project = API_USER + "/" + test_project
project_dir = os.path.join(TMP_DIR, test_project)
cleanup(mc, project, [project_dir])
mc.create_project(test_project)
mc.download_project(project, project_dir)
shutil.copy(os.path.join(TEST_DATA_DIR, "test.txt"), project_dir)
with patch("mergin.client.push_project_finalize", autospec=True) as mock_push_project_finalize:
mock_push_project_finalize.side_effect = ClientError("test error")
with pytest.raises(ClientError, match="test error"):
mc.sync_project(project_dir)

with patch("mergin.client.push_project_finalize") as mock_push_project_finalize, patch(
"mergin.client.PUSH_ATTEMPTS", 2
):
mock_push_project_finalize.side_effect = ClientError(
detail="",
server_code=ErrorCode.AnotherUploadRunning.value,
)
with pytest.raises(ClientError):
mc.sync_project(project_dir)
assert mock_push_project_finalize.call_count == 2
46 changes: 46 additions & 0 deletions mergin/test/test_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from ..common import ClientError, ErrorCode

def test_client_error_is_blocked_sync():
"""Test the is_blocked_sync method of ClientError."""
error = ClientError(detail="", server_code=None)
assert error.is_blocking_sync() is False
error.server_code = ErrorCode.ProjectsLimitHit.value
assert error.is_blocking_sync() is False

error.server_code = ErrorCode.AnotherUploadRunning.value
assert error.is_blocking_sync() is True
error.server_code = ErrorCode.ProjectVersionExists.value
assert error.is_blocking_sync() is True

def test_client_error_is_rate_limit():
"""Test the is_rate_limit method of ClientError."""
error = ClientError(detail="", http_error=None)
assert error.is_rate_limit() is False
error.http_error = 500
assert error.is_rate_limit() is False
error.http_error = 429
assert error.is_rate_limit() is True

def test_client_error_is_retryable_sync():
"""Test the is_retryable_sync method of ClientError."""
error = ClientError(detail="", server_code=None, http_error=None)
assert error.is_retryable_sync() is False

error.server_code = ErrorCode.ProjectsLimitHit.value
assert error.is_retryable_sync() is False
error.server_code = ErrorCode.AnotherUploadRunning.value
assert error.is_retryable_sync() is True

error.server_code = None
error.http_error = 400
error.detail = "Some other error"
assert error.is_retryable_sync() is False
error.detail = "Another process"
assert error.is_retryable_sync() is True
error.detail = "Version mismatch"
assert error.is_retryable_sync() is True

error.http_error = 500
assert error.is_retryable_sync() is False
error.http_error = 429
assert error.is_retryable_sync() is True
Loading