Source code for bmtk.utils.brain_observatory.rma_engine
import pandas as pd
import time
import requests
from .utils import infer_column_types
try:
from tqdm import tqdm
except ImportError:
from .utils import FakeTqdm as tqdm
DEFAULT_TIMEOUT = 20 * 60 # seconds
DEFAULT_CHUNKSIZE = 1024 * 10 # bytes
[docs]
class HttpEngine:
def __init__(
self,
scheme: str,
host: str,
timeout: float = DEFAULT_TIMEOUT,
chunksize: int = DEFAULT_CHUNKSIZE,
**kwargs
):
self.scheme = scheme
self.host = host
self.timeout = timeout
self.chunksize = chunksize
[docs]
def stream(self, route):
""" Makes an http request and returns an iterator over the response.
Parameters
----------
route :
the http route (under this object's host) to request against.
"""
url = self._build_url(route)
start_time = time.perf_counter()
response = requests.get(url, stream=True)
response_b = None
if "Content-length" in response.headers:
response_b = float(response.headers["Content-length"])
size_message = f"{response_b / 1024 ** 2:3.3f}MiB" if response_b is not None else "potentially large"
# logging.warning(f"downloading a {size_message} file from {url}")
progress = tqdm(unit="B", total=response_b, unit_scale=True, desc="Downloading")
for chunk in response.iter_content(self.chunksize):
if chunk: # filter out keep-alive new chunks
progress.update(len(chunk))
yield chunk
elapsed = time.perf_counter() - start_time
if elapsed > self.timeout:
raise requests.Timeout(f"Download took {elapsed} seconds, but timeout was set to {self.timeout}")
def _build_url(self, route):
return f"{self.scheme}://{self.host}/{route}"
[docs]
class RmaEngine(HttpEngine):
def __init__(
self,
scheme,
host,
rma_prefix: str = "api/v2/data",
rma_format: str = "json",
page_size: int = 5000,
**kwargs
):
super(RmaEngine, self).__init__(scheme, host, **kwargs)
self.rma_prefix = rma_prefix
self.rma_format = rma_format
self.page_size = page_size
@property
def format_query_string(self):
return f"query.{self.rma_format}"
[docs]
def add_page_params(self, url, start, count=None):
if count is None:
count = self.page_size
return f"{url},rma::options[start_row$eq{start}][num_rows$eq{count}][order$eq'id']"
[docs]
def get_rma(self, query: str):
""" Makes a paging rma query
Parameters
----------
query :
The RMA query parameters
"""
url = f"{self.scheme}://{self.host}/{self.rma_prefix}/{self.format_query_string}?{query}"
# logging.debug(url)
start_row = 0
total_rows = None
start_time = time.time()
while total_rows is None or start_row < total_rows:
current_url = self.add_page_params(url, start_row)
response_json = requests.get(current_url).json()
if not response_json["success"]:
raise Exception(response_json["msg"])
start_row += response_json["num_rows"]
if total_rows is None:
total_rows = response_json["total_rows"]
# logging.debug(f"downloaded {start_row} of {total_rows} records ({time.time() - start_time:.3f} seconds)")
yield response_json["msg"]
[docs]
def get_rma_list(self, query):
response = []
for chunk in self.get_rma(query):
response.extend(chunk)
return response
[docs]
def get_rma_tabular(self, query, try_infer_dtypes=True):
response = pd.DataFrame(self.get_rma_list(query))
if try_infer_dtypes:
response = infer_column_types(response)
return response