Skip to content

Commit 6aa4d23

Browse files
authored
Merge pull request #181 from poissoncorp/RDBC-700
RDBC-700 Synchronize v5.2 Python to v5.2 Java
2 parents 48908be + 78dc9c4 commit 6aa4d23

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+601
-218
lines changed

ravendb/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,9 @@
177177
from ravendb.documents.session.document_info import DocumentInfo
178178
from ravendb.documents.session.document_session import DocumentSession
179179
from ravendb.documents.session.entity_to_json import EntityToJson
180-
from ravendb.documents.session.in_memory_document_session_operations import InMemoryDocumentSessionOperations
180+
from ravendb.documents.session.document_session_operations.in_memory_document_session_operations import (
181+
InMemoryDocumentSessionOperations,
182+
)
181183
from ravendb.documents.session.loaders.include import IncludeBuilder, IncludeBuilderBase, QueryIncludeBuilder
182184
from ravendb.documents.session.loaders.loaders import (
183185
LoaderWithInclude,

ravendb/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class Metadata:
1616
LAST_MODIFIED = "@last-modified"
1717
CHANGE_VECTOR = "@change-vector"
1818
EXPIRES = "@expires"
19+
REFRESH = "@refresh"
1920
ALL_DOCUMENTS_COLLECTION = "@all_docs"
2021
EMPTY_COLLECTION = "@empty"
2122
NESTED_OBJECT_TYPES = "@nested-object-types"

ravendb/documents/bulk_insert_operation.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,25 @@ def error_on_request_start(self, exception: Exception):
6262
self.output_stream_mock.set_exception(exception)
6363

6464
class _BulkInsertCommand(RavenCommand[requests.Response]):
65-
def __init__(self, key: int, buffer_exposer: BulkInsertOperation._BufferExposer, node_tag: str):
65+
def __init__(
66+
self,
67+
key: int,
68+
buffer_exposer: BulkInsertOperation._BufferExposer,
69+
node_tag: str,
70+
skip_overwrite_if_unchanged: bool,
71+
):
6672
super().__init__(requests.Response)
6773
self._buffer_exposer = buffer_exposer
6874
self._key = key
6975
self._selected_node_tag = node_tag
7076
self.use_compression = False
77+
self._skip_overwrite_if_unchanged = skip_overwrite_if_unchanged
7178

7279
def create_request(self, node: ServerNode) -> requests.Request:
7380
return requests.Request(
7481
"POST",
75-
f"{node.url}/databases/{node.database}/bulk_insert?id={self._key}",
82+
f"{node.url}/databases/{node.database}/bulk_insert?id={self._key}"
83+
f"&skipOverwriteIfUnchanged={'true' if self._skip_overwrite_if_unchanged else 'false'}",
7684
data=self._buffer_exposer.send_data(),
7785
)
7886

@@ -88,7 +96,7 @@ def send(self, session: requests.Session, request: requests.Request) -> requests
8896
except Exception as e:
8997
self._buffer_exposer.error_on_request_start(e)
9098

91-
def __init__(self, database: str = None, store: "DocumentStore" = None):
99+
def __init__(self, database: str = None, store: "DocumentStore" = None, options: BulkInsertOptions = None):
92100
self.use_compression = False
93101

94102
self._ongoing_bulk_insert_execute_task: Optional[Future] = None
@@ -103,6 +111,9 @@ def __init__(self, database: str = None, store: "DocumentStore" = None):
103111
self._conventions = store.conventions
104112
if not database or database.isspace():
105113
self._throw_no_database()
114+
115+
self._use_compression = options.use_compression if options else False
116+
self._options = options or BulkInsertOptions()
106117
self._request_executor = store.get_request_executor(database)
107118

108119
self._enqueue_current_buffer_async = Future()
@@ -232,6 +243,7 @@ def store_as(
232243

233244
self._write_document(entity, metadata)
234245
self._write_string_no_escape("}")
246+
# todo: self._flush_if_needed() - causes error - https://issues.hibernatingrhinos.com/issue/RDBC-701
235247
except Exception as e:
236248
self._handle_errors(key, e)
237249
finally:
@@ -336,7 +348,7 @@ def _get_exception_from_operation(self) -> Optional[BulkInsertAbortedException]:
336348
def _start_executing_bulk_insert_command(self) -> None:
337349
try:
338350
bulk_command = BulkInsertOperation._BulkInsertCommand(
339-
self._operation_id, self._buffer_exposer, self._node_tag
351+
self._operation_id, self._buffer_exposer, self._node_tag, self._options.skip_overwrite_if_unchanged
340352
)
341353
bulk_command.use_compression = self.use_compression
342354

@@ -441,3 +453,9 @@ def attachments_for(self, key: str) -> BulkInsertOperation.AttachmentsBulkInsert
441453
raise ValueError("Document id cannot be None or empty.")
442454

443455
return BulkInsertOperation.AttachmentsBulkInsert(self, key)
456+
457+
458+
class BulkInsertOptions:
459+
def __init__(self, use_compression: bool = None, skip_overwrite_if_unchanged: bool = None):
460+
self.use_compression = use_compression
461+
self.skip_overwrite_if_unchanged = skip_overwrite_if_unchanged

ravendb/documents/commands/batches.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
if TYPE_CHECKING:
2424
from ravendb.documents.conventions import DocumentConventions
2525
from ravendb.documents.operations.patch import PatchRequest
26-
from ravendb.documents.session.in_memory_document_session_operations import InMemoryDocumentSessionOperations
26+
from ravendb.documents.session.document_session_operations.in_memory_document_session_operations import (
27+
InMemoryDocumentSessionOperations,
28+
)
2729

2830

2931
class CommandType(Enum):
@@ -270,11 +272,15 @@ def serialize(self, conventions: DocumentConventions) -> dict:
270272

271273

272274
class DeleteCommandData(CommandData):
273-
def __init__(self, key: str, change_vector: str):
274-
super(DeleteCommandData, self).__init__(key=key, command_type=CommandType.DELETE)
275+
def __init__(self, key: str, change_vector: str, original_change_vector: str = None):
276+
super(DeleteCommandData, self).__init__(key=key, command_type=CommandType.DELETE, change_vector=change_vector)
277+
self.original_change_vector = original_change_vector
275278

276279
def serialize(self, conventions: DocumentConventions) -> dict:
277-
return {"Id": self.key, "ChangeVector": self.change_vector, "Type": CommandType.DELETE}
280+
data = {"Id": self.key, "ChangeVector": self.change_vector, "Type": CommandType.DELETE}
281+
if self.original_change_vector is not None:
282+
data.update({"OriginalChangeVector": self.original_change_vector})
283+
return data
278284

279285

280286
class PutCommandDataBase(CommandData):

ravendb/documents/commands/crud.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def create_request(self, node: ServerNode) -> requests.Request:
9090
url = f"{node.url}/databases/{node.database}/docs?id={Utils.escape(self.__key, True, False)}"
9191
request = requests.Request("HEAD", url)
9292
if self.__change_vector is not None:
93-
request.headers["If-None-Match"] = self.__change_vector
93+
request.headers[constants.Headers.IF_NONE_MATCH] = self.__change_vector
9494
return request
9595

9696
def process_response(self, cache: HttpCache, response: requests.Response, url) -> ResponseDisposeHandling:
@@ -387,7 +387,7 @@ def create_request(self, node: ServerNode) -> requests.Request:
387387
f"{node.url}/databases/{node.database}"
388388
f"/attachments?id={Utils.quote_key(self.__document_id)}"
389389
f"&name={Utils.quote_key(self.__name)}",
390-
{"If-None-Match": self.__change_vector} if self.__change_vector else None,
390+
{constants.Headers.IF_NONE_MATCH: self.__change_vector} if self.__change_vector else None,
391391
)
392392

393393
def process_response(self, cache: HttpCache, response: requests.Response, url) -> http.ResponseDisposeHandling:
@@ -435,7 +435,7 @@ def __init__(self, key: str, change_vector: str):
435435
def create_request(self, node: ServerNode) -> requests.Request:
436436
url = f"{node.url}/databases/{node.database}/docs?id={Utils.quote_key(self.__key)}"
437437
request = requests.Request("GET", url)
438-
request.headers["If-None-Match"] = f'"{self.__change_vector}"'
438+
request.headers[constants.Headers.IF_NONE_MATCH] = f'"{self.__change_vector}"'
439439

440440
return request
441441

ravendb/documents/commands/multi_get.py

Lines changed: 17 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -115,36 +115,6 @@ def __init__(self, request_executor: RequestExecutor, commands: List[GetRequest]
115115
def create_request(self, node: ServerNode) -> Optional[requests.Request]:
116116
self.__base_url = f"{node.url}/databases/{node.database}"
117117
url = self.__base_url + "/multi_get"
118-
# todo: aggressive caching
119-
# if self.__maybe_read_all_from_cache(self.__request_executor.aggressive_caching):
120-
# self.aggressively_cached = True
121-
# return None
122-
#
123-
# aggressive_cache_options: AggressiveCacheOptions = self.__request_executor.aggressive_caching
124-
# if aggressive_cache_options and aggressive_cache_options.mode == AggressiveCacheMode.TRACK_CHANGES:
125-
# self.result = []
126-
# for command in self.__commands:
127-
# if not command.can_cache_aggressively:
128-
# break
129-
# cache_key = self.__get_cache_key(command)[0]
130-
# cached_item, _, cached_ref = self.__http_cache.get(cache_key, "", "")
131-
# cached_item: ReleaseCacheItem
132-
# if (
133-
# cached_ref is None
134-
# or cached_item.age > aggressive_cache_options.duration
135-
# or cached_item.might_have_been_modified
136-
# ):
137-
# break
138-
# get_response = GetResponse()
139-
# get_response.result = cached_ref
140-
# get_response.status_code = http.HTTPStatus.NOT_MODIFIED
141-
# self.result.append(get_response)
142-
#
143-
# if len(self.result) == len(self.__commands):
144-
# return None
145-
#
146-
# self.result = None
147-
148118
request = requests.Request("POST", url)
149119

150120
request.data = {
@@ -211,7 +181,6 @@ def __get_cache_key(self, command: GetRequest) -> (str, str):
211181
req_url = self.__base_url + command.url_and_query
212182
return command.method + "-" + req_url if command.method else req_url, req_url
213183

214-
# todo: make sure json parses correctly down there
215184
def set_response_raw(self, response: requests.Response, stream: bytes) -> None:
216185
try:
217186
try:
@@ -224,7 +193,7 @@ def set_response_raw(self, response: requests.Response, stream: bytes) -> None:
224193

225194
for get_response in self.read_responses(response_temp):
226195
command = self.__commands[i]
227-
self.__maybe_set_cache(get_response, command)
196+
self.__maybe_set_cache(get_response, command, i)
228197

229198
if self.__cached is not None and get_response.status_code == http.HTTPStatus.NOT_MODIFIED:
230199
cloned_response = GetResponse()
@@ -241,8 +210,11 @@ def set_response_raw(self, response: requests.Response, stream: bytes) -> None:
241210
except Exception as e:
242211
self._throw_invalid_response(e)
243212

244-
def __maybe_set_cache(self, get_response: GetResponse, command: GetRequest):
213+
def __maybe_set_cache(self, get_response: GetResponse, command: GetRequest, cached_index: int):
245214
if get_response.status_code == http.HTTPStatus.NOT_MODIFIED:
215+
# if not modified - update age
216+
if self.__cached is not None:
217+
self.__cached.values[cached_index][0].not_modified()
246218
return
247219

248220
cache_key = self.__get_cache_key(command)[0]
@@ -281,7 +253,19 @@ def is_read_request(self) -> bool:
281253
return False
282254

283255
def close_cache(self):
256+
# If _cached is not null - it means that the client approached with this multitask request to node
257+
# and the request failed and now client tries to send it to another node.
258+
284259
if self.__cached is not None:
285260
self.__cached.close()
286261

287262
self.__cached = None
263+
# The client sends the commands.
264+
# Some of which could be saved in cache with a response
265+
# that includes the change vector that received from the old fallen node.
266+
# The client can't use those responses because their URLs are different
267+
# (include the IP and port of the old node), because of that the client
268+
# needs to get those docs again from the new node.
269+
270+
for command in self.__commands:
271+
command.headers.remove(constants.Headers.IF_NONE_MATCH)

ravendb/documents/commands/subscriptions.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@ def __init__(
6363
certificate: Optional[str] = None,
6464
urls: Optional[List[str]] = None,
6565
node_tag: Optional[str] = None,
66+
server_id: Optional[str] = None,
6667
):
6768
self.port = port
6869
self.url = url
6970
self.certificate = certificate
7071
self.urls = urls
7172
self.node_tag = node_tag
73+
self.server_id = server_id
7274

7375
@classmethod
7476
def from_json(cls, json_dict: Dict) -> TcpConnectionInfo:
@@ -78,6 +80,7 @@ def from_json(cls, json_dict: Dict) -> TcpConnectionInfo:
7880
json_dict.get("Certificate", None),
7981
json_dict.get("Urls", None),
8082
json_dict.get("NodeTag", None),
83+
json_dict.get("ServerId", None),
8184
)
8285

8386

ravendb/documents/conventions.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def __init__(self):
4040
self.throw_if_query_page_size_is_not_set = False
4141
self._send_application_identifier = True
4242
self._save_enums_as_integers: Optional[bool] = None
43+
self._disable_atomic_document_writes_in_cluster_wide_transaction: Optional[bool] = None
4344

4445
# Configuration
4546
self.json_default_method = DocumentConventions.json_default
@@ -198,6 +199,15 @@ def read_balance_behavior(self, value: ReadBalanceBehavior):
198199
def send_application_identifier(self) -> bool:
199200
return self._send_application_identifier
200201

202+
@property
203+
def disable_atomic_document_writes_in_cluster_wide_transaction(self) -> bool:
204+
return self._disable_atomic_document_writes_in_cluster_wide_transaction
205+
206+
@disable_atomic_document_writes_in_cluster_wide_transaction.setter
207+
def disable_atomic_document_writes_in_cluster_wide_transaction(self, value: bool):
208+
self.__assert_not_frozen()
209+
self._disable_atomic_document_writes_in_cluster_wide_transaction = value
210+
201211
@staticmethod
202212
def json_default(o):
203213
if o is None:

0 commit comments

Comments
 (0)