Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion getm/concurrent/buffers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ def __init__(self, name: Optional[str]=None, size: int=0, create=False):
self._did_create = True
else:
self._shared_memory = SharedMemory(name)
self._view = self._shared_memory.buf
# self._view = self._shared_memory.buf

@property
def _view(self):
return self._shared_memory.buf

@property
def size(self):
Expand Down
34 changes: 30 additions & 4 deletions getm/reader.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import io
import sys
import time
import logging
import warnings
import traceback
from math import ceil
from collections import deque
from multiprocessing import Process
Expand All @@ -12,6 +15,9 @@
from getm.concurrent import ConcurrentQueue, ConcurrentPool, SharedCircularBuffer, SharedBufferArray


logger = logging.getLogger(__name__)


class BaseURLReader(io.IOBase):
def readable(self):
return True
Expand Down Expand Up @@ -176,6 +182,13 @@ def compute_buffer_size(concurrent_downloads: int, chunk_size: int) -> int:
return buffer_size

def run(self):
try:
self._run()
except Exception as e:
logger.error(f"reader subprocesses exiting due to '{traceback.format_exc()}'")
sys.exit(1)

def _run(self):
with http_session().get(self.url, stream=True) as resp:
handle = resp.raw
start = stop = 0
Expand All @@ -193,21 +206,32 @@ def run(self):
stop += bytes_read
buf.stop = stop

def read(self, sz: int=-1):
def _check_buf(self) -> SharedCircularBuffer:
if self.exitcode:
raise RuntimeError(f"subprocesses quit with non-zero exitcode {self.exitcode}")
return self._buf

def _read(self, sz: int=-1):
if -1 == sz:
sz = self.max_read
self._buf.start = self._start
self._check_buf().start = self._start
sz = min(sz, self.max_read)
while sz > self._stop - self._start and self._stop < self.size:
self._stop = self._buf.stop
self._stop = self._check_buf().stop
sz = min(sz, self._stop - self._start)
if sz:
res = self._buf[self._start: self._start + sz]
res = self._check_buf()[self._start: self._start + sz]
self._start += len(res)
return res
else:
return memoryview(bytes())

def read(self, sz: int=-1):
try:
return self._read(sz)
except RuntimeError:
return memoryview(bytes())

def readinto(self, buff: bytearray) -> int:
d = self.read(len(buff))
bytes_read = len(d)
Expand All @@ -220,6 +244,8 @@ def close(self):
self.join(timeout=5)
self._buf.close()
super().close()
if self.exitcode:
raise RuntimeError()

def __enter__(self):
self.start()
Expand Down