Skip to content

Commit

Permalink
reproduce issue with parallel access
Browse files Browse the repository at this point in the history
  • Loading branch information
totonga committed Jan 29, 2025
1 parent 71fa53b commit 6c154f8
Show file tree
Hide file tree
Showing 2 changed files with 258 additions and 130 deletions.
267 changes: 137 additions & 130 deletions external_data_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,19 @@ def Open(self, identifier: exd_api.Identifier, context: grpc.ServicerContext) ->
:raises ValueError: If file does not exist
:return exd_api.Handle: Handle to the opened file.
"""
file_path = Path(self.__get_path(identifier.url))
if not file_path.is_file():
context.abort(
grpc.StatusCode.NOT_FOUND,
f"File '{
identifier.url}' not found.",
)
with self.lock:
file_path = Path(self.__get_path(identifier.url))
if not file_path.is_file():
context.abort(
grpc.StatusCode.NOT_FOUND,
f"File '{
identifier.url}' not found.",
)

connection_id = self.__open_dwfile(identifier)
connection_id = self.__open_dwfile(identifier)

rv = exd_api.Handle(uuid=connection_id)
return rv
rv = exd_api.Handle(uuid=connection_id)
return rv

def Close(self, handle: exd_api.Handle, context: grpc.ServicerContext) -> exd_api.Empty:
"""
Expand All @@ -116,8 +117,9 @@ def Close(self, handle: exd_api.Handle, context: grpc.ServicerContext) -> exd_ap
:param context: Additional parameters from grpc.
:return exd_api.Empty: Empty object.
"""
self.__close_dwfile(handle)
return exd_api.Empty()
with self.lock:
self.__close_dwfile(handle)
return exd_api.Empty()

def GetStructure(self, structure_request: exd_api.StructureRequest, context: grpc.ServicerContext) -> exd_api.StructureResult:
"""
Expand All @@ -128,55 +130,56 @@ def GetStructure(self, structure_request: exd_api.StructureRequest, context: grp
:raises NotImplementedError: If advanced features are requested.
:return exd_api.StructureResult: The structure of the opened file.
"""
if (
structure_request.suppress_channels
or structure_request.suppress_attributes
or 0 != len(structure_request.channel_names)
):
context.abort(
grpc.StatusCode.UNIMPLEMENTED, "Method not implemented!",
)
with self.lock:
if (
structure_request.suppress_channels
or structure_request.suppress_attributes
or 0 != len(structure_request.channel_names)
):
context.abort(
grpc.StatusCode.UNIMPLEMENTED, "Method not implemented!",
)

identifier = self.connection_map[structure_request.handle.uuid]
dw_file_handle = self.__get_dw_file(structure_request.handle)
dw_group_dict = dw_get_structure(dw_file_handle)

rv = exd_api.StructureResult(identifier=identifier)
rv.name = Path(identifier.url).name
dw_info = dw_file_handle.info
rv.attributes.variables["start_time"].string_array.values.append(
dw_info.start_store_time.strftime("%Y%m%d%H%M%S%f"))
rv.attributes.variables["duration"].double_array.values.append(
dw_info.duration)
rv.attributes.variables["sample_rate"].double_array.values.append(
dw_info.sample_rate)
dw_groups = []
for key, value in dw_group_dict.items():
dw_groups.append(key)
new_group = exd_api.StructureResult.Group()
new_group.name = key
new_group.id = value["group_number"]
ch_dict = value["channel_dict"]
new_group.total_number_of_channels = len(ch_dict)
new_group.number_of_rows = ch_dict[0]["length"]

for ch_index in ch_dict:
new_channel = exd_api.StructureResult.Channel()
new_channel.name = ch_dict[ch_index]["long_name"]
new_channel.id = ch_index
new_channel.data_type = ch_dict[ch_index]["datatype"]
new_channel.unit_string = ch_dict[ch_index]["unit"]
channel_comment = ch_dict[ch_index]["description"]
# added attributes
if channel_comment is not None and "" != channel_comment:
new_channel.attributes.variables["description"].string_array.values.append(
channel_comment)
new_channel.attributes.variables["independent"].long_array.values.append(
ch_dict[ch_index]["independent"])
new_group.channels.append(new_channel)

rv.groups.append(new_group)
return rv
identifier = self.connection_map[structure_request.handle.uuid]
dw_file_handle = self.__get_dw_file(structure_request.handle)
dw_group_dict = dw_get_structure(dw_file_handle)

rv = exd_api.StructureResult(identifier=identifier)
rv.name = Path(identifier.url).name
dw_info = dw_file_handle.info
rv.attributes.variables["start_time"].string_array.values.append(
dw_info.start_store_time.strftime("%Y%m%d%H%M%S%f"))
rv.attributes.variables["duration"].double_array.values.append(
dw_info.duration)
rv.attributes.variables["sample_rate"].double_array.values.append(
dw_info.sample_rate)
dw_groups = []
for key, value in dw_group_dict.items():
dw_groups.append(key)
new_group = exd_api.StructureResult.Group()
new_group.name = key
new_group.id = value["group_number"]
ch_dict = value["channel_dict"]
new_group.total_number_of_channels = len(ch_dict)
new_group.number_of_rows = ch_dict[0]["length"]

for ch_index in ch_dict:
new_channel = exd_api.StructureResult.Channel()
new_channel.name = ch_dict[ch_index]["long_name"]
new_channel.id = ch_index
new_channel.data_type = ch_dict[ch_index]["datatype"]
new_channel.unit_string = ch_dict[ch_index]["unit"]
channel_comment = ch_dict[ch_index]["description"]
# added attributes
if channel_comment is not None and "" != channel_comment:
new_channel.attributes.variables["description"].string_array.values.append(
channel_comment)
new_channel.attributes.variables["independent"].long_array.values.append(
ch_dict[ch_index]["independent"])
new_group.channels.append(new_channel)

rv.groups.append(new_group)
return rv

def GetValues(self, values_request: exd_api.ValuesRequest, context: grpc.ServicerContext) -> exd_api.ValuesResult:
"""
Expand All @@ -187,78 +190,79 @@ def GetValues(self, values_request: exd_api.ValuesRequest, context: grpc.Service
:raises NotImplementedError: If unknown data type is accessed.
:return exd_api.ValuesResult: The chunk of bulk data.
"""
dw_file = self.__get_dw_file(values_request.handle)

# (1) get the structure create for the get_structure()
dw_structure = dw_get_structure(dw_file)
dw_group_index_dict = {}
# (2) reorganize the data to be able to address the groups by index
for key, source in dw_structure.items():
group_no = source["group_number"]
dw_group_index_dict[group_no] = {
"channel_dict": source["channel_dict"],
"group_name": key,
"indep_channel": source["indep_channel"]
}

if values_request.group_id < 0 or values_request.group_id >= len(dw_group_index_dict):
context.abort(
grpc.StatusCode.INVALID_ARGUMENT, f"Invalid group id {
values_request.group_id}!",
)

dw_group = dw_group_index_dict[values_request.group_id]

nr_of_rows = dw_group["channel_dict"][0]["length"]

if values_request.start > nr_of_rows or values_request.start < 0:
context.abort(
grpc.StatusCode.INVALID_ARGUMENT, f"Channel start index {
values_request.start} out of range!",
)

end_index = values_request.start + values_request.limit
if end_index >= nr_of_rows:
end_index = nr_of_rows

if len(values_request.channel_ids) == 0:
# empty id array means all channels of the group
for ch in dw_group["channel_dict"]:
values_request.channel_ids.append(ch)

for channel_id in values_request.channel_ids:
if channel_id < 0 or channel_id >= len(dw_group["channel_dict"]):
with self.lock:
dw_file = self.__get_dw_file(values_request.handle)

# (1) get the structure create for the get_structure()
dw_structure = dw_get_structure(dw_file)
dw_group_index_dict = {}
# (2) reorganize the data to be able to address the groups by index
for key, source in dw_structure.items():
group_no = source["group_number"]
dw_group_index_dict[group_no] = {
"channel_dict": source["channel_dict"],
"group_name": key,
"indep_channel": source["indep_channel"]
}

if values_request.group_id < 0 or values_request.group_id >= len(dw_group_index_dict):
context.abort(
grpc.StatusCode.INVALID_ARGUMENT, f"Invalid channel id {
channel_id}!",
grpc.StatusCode.INVALID_ARGUMENT, f"Invalid group id {
values_request.group_id}!",
)

rv = exd_api.ValuesResult(id=values_request.group_id)
for signal_index in values_request.channel_ids:
channel = dw_group["channel_dict"][signal_index]
chn_section = None
if signal_index == 0:
channel_next = dw_group["channel_dict"][1]
# this is the index channel, e.g. time
chn_section = dw_file[channel_next['name']].series().index
channel_datatype = channel["datatype"]
else:
chn_section = dw_file[channel['name']].series().values
channel_datatype = channel["datatype"]

# restrict the number of data per channel like requested
section = chn_section[values_request.start: end_index]
dw_group = dw_group_index_dict[values_request.group_id]

new_channel_values = exd_api.ValuesResult.ChannelValues()
new_channel_values.id = signal_index
new_channel_values.values.data_type = channel_datatype
nr_of_rows = dw_group["channel_dict"][0]["length"]

self.__assign_channel_values(
channel_datatype, section, new_channel_values)

rv.channels.append(new_channel_values)
if values_request.start > nr_of_rows or values_request.start < 0:
context.abort(
grpc.StatusCode.INVALID_ARGUMENT, f"Channel start index {
values_request.start} out of range!",
)

return rv
end_index = values_request.start + values_request.limit
if end_index >= nr_of_rows:
end_index = nr_of_rows

if len(values_request.channel_ids) == 0:
# empty id array means all channels of the group
for ch in dw_group["channel_dict"]:
values_request.channel_ids.append(ch)

for channel_id in values_request.channel_ids:
if channel_id < 0 or channel_id >= len(dw_group["channel_dict"]):
context.abort(
grpc.StatusCode.INVALID_ARGUMENT, f"Invalid channel id {
channel_id}!",
)

rv = exd_api.ValuesResult(id=values_request.group_id)
for signal_index in values_request.channel_ids:
channel = dw_group["channel_dict"][signal_index]
chn_section = None
if signal_index == 0:
channel_next = dw_group["channel_dict"][1]
# this is the index channel, e.g. time
chn_section = dw_file[channel_next['name']].series().index
channel_datatype = channel["datatype"]
else:
chn_section = dw_file[channel['name']].series().values
channel_datatype = channel["datatype"]

# restrict the number of data per channel like requested
section = chn_section[values_request.start: end_index]

new_channel_values = exd_api.ValuesResult.ChannelValues()
new_channel_values.id = signal_index
new_channel_values.values.data_type = channel_datatype

self.__assign_channel_values(
channel_datatype, section, new_channel_values)

rv.channels.append(new_channel_values)

return rv

def __assign_channel_values(self, channel_datatype, section, new_channel_values):
if channel_datatype == ods.DataTypeEnum.DT_BOOLEAN:
Expand Down Expand Up @@ -316,7 +320,7 @@ def __init__(self):
self.connect_count = 0
self.connection_map = {}
self.file_map = {}
self.lock = threading.Lock()
self.lock = threading.RLock()

def __get_id(self, identifier):
self.connect_count = self.connect_count + 1
Expand Down Expand Up @@ -344,9 +348,12 @@ def __open_dwfile(self, identifier: exd_api.Identifier) -> str:
return connection_id

def __get_dw_file(self, handle: exd_api.Handle) -> dw.DWFile:
identifier = self.connection_map[handle.uuid]
connection_url = self.__get_path(identifier.url)
return self.file_map[connection_url]["dw_file"]
with self.lock:
identifier = self.connection_map[handle.uuid]
connection_url = self.__get_path(identifier.url)
rv: dw.DWFile = self.file_map[connection_url]["dw_file"]
rv.activate()
return rv

def __close_dwfile(self, handle: exd_api.Handle):
with self.lock:
Expand Down
Loading

0 comments on commit 6c154f8

Please sign in to comment.