/test/services/version1/test_BeaconsHttpServiceV1.py
import json
import time
from json import JSONDecodeError
from typing import Union
import requests
from pip_services4_components.config import ConfigParams
from pip_services4_components.refer import References, Descriptor
from pip_services4_commons.reflect import PropertyReflector
from pip_services4_components.exec import Parameters
from src.data.version1 import BeaconV1, BeaconTypeV1
from src.logic.BeaconsService import BeaconsService
from src.persistence.BeaconsMemoryPersistence import BeaconsMemoryPersistence
from src.controllers.version1.BeaconsHttpServiceV1 import BeaconsHttpControllerV1
BEACON1 = BeaconV1("1", "1", BeaconTypeV1.AltBeacon, "00001", "TestBeacon1", {"type": 'Point', "coordinates": [0, 0]},
50.0)
BEACON2 = BeaconV1("2", "1", BeaconTypeV1.iBeacon, "00002", "TestBeacon2", {"type": 'Point', "coordinates": [2, 2]},
70.0)
BEACON3 = BeaconV1("3", "2", BeaconTypeV1.AltBeacon, "00003", "TestBeacon3", {"type": 'Point', "coordinates": [10, 10]},
50.0)
class TestBeaconsHttpControllerV1:
_persistence: BeaconsMemoryPersistence
_controller: BeaconsService
_service: BeaconsHttpControllerV1
@classmethod
def setup_class(cls):
cls._persistence = BeaconsMemoryPersistence()
cls._controller = BeaconsController()
cls._service = BeaconsHttpServiceV1()
cls._service.configure(ConfigParams.from_tuples(
'connection.protocol', 'http',
'connection.port', 3002,
'connection.host', 'localhost'))
references = References.from_tuples(Descriptor('beacons', 'persistence', 'memory', 'default', '1.0'),
cls._persistence,
Descriptor('beacons', 'service', 'default', 'default', '1.0'),
cls._service,
Descriptor('beacons', 'controller', 'http', 'default', '1.0'),
cls._controller)
cls._controller.set_references(references)
cls._service.set_references(references)
cls._persistence.open(None)
cls._controller.open(None)
@classmethod
def teardown_class(cls):
cls._persistence.close(None)
cls._service.close(None)
def test_crud_operations(self):
time.sleep(2)
# Create the first beacon
beacon1 = self.invoke("/v1/beacons/create_beacon",
Parameters.from_tuples("beacon", PropertyReflector.get_properties(BEACON1)))
assert beacon1 is not None
assert beacon1['id'] == BEACON1.id
assert beacon1['site_id'] == BEACON1.site_id
assert beacon1['udi'] == BEACON1.udi
assert beacon1['type'] == BEACON1.type
assert beacon1['label'] == BEACON1.label
assert beacon1['center'] is not None
# Create the second beacon
beacon2 = self.invoke("/v1/beacons/create_beacon",
Parameters.from_tuples("beacon", PropertyReflector.get_properties(BEACON2)))
assert beacon2 is not None
assert beacon2['id'] == BEACON2.id
assert beacon2['site_id'] == BEACON2.site_id
assert beacon2['udi'] == BEACON2.udi
assert beacon2['type'] == BEACON2.type
assert beacon2['label'] == BEACON2.label
assert beacon2['center'] is not None
# Get all beacons
page = self.invoke("/v1/beacons/get_beacons", Parameters.from_tuples("beacons"))
assert page is not None
assert len(page['data']) == 2
beacon1 = page['data'][0]
# Update the beacon
beacon1['label'] = "ABC"
beacon = self.invoke("/v1/beacons/update_beacon", Parameters.from_tuples("beacon", beacon1))
assert beacon is not None
assert beacon1['id'] == beacon['id']
assert "ABC" == beacon['label']
# Get beacon by udi
beacon = self.invoke("/v1/beacons/get_beacon_by_udi", Parameters.from_tuples("udi", beacon1['udi']))
assert beacon is not None
assert beacon['id'] == beacon1['id']
# Calculate position for one beacon
position = self.invoke("/v1/beacons/calculate_position",
Parameters.from_tuples("site_id", '1', "udis", ['00001']))
assert position is not None
assert "Point" == position["type"]
assert 2 == len(position["coordinates"])
assert 0 == position["coordinates"][0]
assert 0 == position["coordinates"][1]
# Delete beacon
self.invoke("/v1/beacons/delete_beacon_by_id", Parameters.from_tuples("id", beacon1['id']))
# Try to get deleted beacon
beacon = self.invoke("/v1/beacons/get_beacon_by_id", Parameters.from_tuples("id", beacon1['id']))
assert beacon is False
def invoke(self, route, entity) -> Union[bool, dict]:
params = {}
route = "http://localhost:3002" + route
response = None
timeout = 10000
# Call the service
data = json.dumps(entity)
try:
response = requests.request('POST', route, params=params, json=data, timeout=timeout)
return response.json()
except JSONDecodeError:
if response.status_code == 404:
return False