forked from daniel-gonzalez-sanchez/ngsi-ld-client-tester
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcreate-iot-device-entity.py
73 lines (56 loc) · 2.63 KB
/
create-iot-device-entity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import logging
import logging.config
import yaml
import os
import json
import ngsi_ld_client
from ngsi_ld_models.models.iot_device import IotDevice
from ngsi_ld_client.models.entity import Entity
from ngsi_ld_models.models.has_sensor import HasSensor
from ngsi_ld_client.models.query_entity200_response_inner import QueryEntity200ResponseInner
from ngsi_ld_client.api_client import ApiClient as NGSILDClient
from ngsi_ld_client.configuration import Configuration as NGSILDConfiguration
from ngsi_ld_client.exceptions import ApiException
#assuming the log config file name is logging.yaml
with open('logging.yaml', 'r') as stream:
config = yaml.load(stream, Loader=yaml.FullLoader)
#read the file to logging config
logging.config.dictConfig(config)
logger = logging.getLogger(__name__)
# NGSI-LD Context Broker
BROKER_URI = os.getenv("BROKER_URI", "http://localhost:1026/ngsi-ld/v1")
# Context Catalog
CONTEXT_CATALOG_URI = os.getenv("CONTEXT_CATALOG_URI",
"http://context-catalog:8080/context.jsonld")
# Init NGSI-LD Client
configuration = NGSILDConfiguration(host=BROKER_URI)
configuration.debug = True
ngsi_ld = NGSILDClient(configuration=configuration)
ngsi_ld.set_default_header(
header_name="Link",
header_value='<{0}>; '
'rel="http://www.w3.org/ns/json-ld#context"; '
'type="application/ld+json"'.format(CONTEXT_CATALOG_URI)
)
ngsi_ld.set_default_header(
header_name="Accept",
header_value="application/json"
)
iot_device = IotDevice(
id="urn:ngsi-ld:IotDevice:1",
type="IotDevice",
name={"type":"Property", "value": "IoTDevice"},
hasSensor=HasSensor.from_dict({"type": "Relationship", "object": ["urn:ngsi-ld:TemperatureSensor:1","urn:ngsi-ld:TemperatureSensor:2"]})
#hasSensor=HasSensor.from_dict([{"type": "Relationship", "object": "urn:ngsi-ld:TemperatureSensor:1"},{"type": "Relationship", "object": "urn:ngsi-ld:HumiditySensor:2"}])
)
api_instance = ngsi_ld_client.ContextInformationProvisionApi(ngsi_ld)
entity_input = iot_device.to_dict()
logger.info("IotDevice object representation: %s\n" % entity_input)
logger.info("Entity object representation: %s\n" % Entity.from_dict(entity_input))
logger.info("QueryEntity200ResponseInner object representation: %s\n" % QueryEntity200ResponseInner.from_dict(entity_input))
query_entity_input = QueryEntity200ResponseInner.from_dict(entity_input)
try:
# Create NGSI-LD entity of type Sensor: POST /entities
api_instance.create_entity(query_entity200_response_inner=query_entity_input)
except Exception as e:
logger.exception("Exception when calling ContextInformationProvisionApi->create_entity: %s\n" % e)