Skip to content

Commit

Permalink
Merge branch 'master' into subscription_refactor2
Browse files Browse the repository at this point in the history
  • Loading branch information
locka99 committed Apr 6, 2019
2 parents 473a79c + 3f2549a commit 679b153
Show file tree
Hide file tree
Showing 34 changed files with 301 additions and 275 deletions.
52 changes: 27 additions & 25 deletions 3rd-party/open62541/CMakelists.txt
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
cmake_minimum_required(VERSION 3.0)

project(open62541)

add_subdirectory(libopen62541)

include_directories(
./libopen62541
)

add_executable(open62541-client
./client.cpp
)

set(PLATFORM_LIBS)
if (WIN32)
set(PLATFORM_LIBS ws2_32)
endif ()

target_link_libraries(open62541-client
libopen62541
${PLATFORM_LIBS}
)

# add_executable(open62541-server)
cmake_minimum_required(VERSION 3.0)

project(open62541)

add_subdirectory(libopen62541)

include_directories(
./libopen62541
)

if (WIN32)
set(PLATFORM_LIBS ws2_32)
else ()
set(PLATFORM_LIBS)
endif ()

add_executable(open62541-client ./client.cpp)
target_link_libraries(open62541-client
libopen62541
${PLATFORM_LIBS}
)

add_executable(open62541-server ./server.cpp)
target_link_libraries(open62541-server
libopen62541
${PLATFORM_LIBS}
)
256 changes: 127 additions & 129 deletions 3rd-party/open62541/client.cpp
Original file line number Diff line number Diff line change
@@ -1,130 +1,128 @@
// MODIFIED FROM open62541 example

#ifdef _WIN32

#include <winsock2.h>
#include <windows.h>

# define UA_sleep_ms(X) Sleep(X)
#else
# include <unistd.h>
# define UA_sleep_ms(X) usleep(X * 1000)
#endif

#include <signal.h>
#include <stdlib.h>

#include "libopen62541/open62541.h"

UA_Boolean running = true;
UA_Logger logger = UA_Log_Stdout;

static void stopHandler(int sign) {
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "Received Ctrl-C");
running = 0;
}

static void
handler_currentTimeChanged(UA_Client *client, UA_UInt32 subId, void *subContext,
UA_UInt32 monId, void *monContext, UA_DataValue *value) {
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "currentTime has changed!");
if (UA_Variant_hasScalarType(&value->value, &UA_TYPES[UA_TYPES_DATETIME])) {
UA_DateTime raw_date = *(UA_DateTime *) value->value.data;
UA_DateTimeStruct dts = UA_DateTime_toStruct(raw_date);
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "date is: %02u-%02u-%04u %02u:%02u:%02u.%03u",
dts.day, dts.month, dts.year, dts.hour, dts.min, dts.sec, dts.milliSec);
}
}

static void
deleteSubscriptionCallback(UA_Client *client, UA_UInt32 subscriptionId, void *subscriptionContext) {
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "Subscription Id %u was deleted", subscriptionId);
}

static void
subscriptionInactivityCallback(UA_Client *client, UA_UInt32 subId, void *subContext) {
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "Inactivity for subscription %u", subId);
}

static void
stateCallback(UA_Client *client, UA_ClientState clientState) {
switch (clientState) {
case UA_CLIENTSTATE_DISCONNECTED:
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "The client is disconnected");
break;
case UA_CLIENTSTATE_CONNECTED:
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "A TCP connection to the server is open");
break;
case UA_CLIENTSTATE_SECURECHANNEL:
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "A SecureChannel to the server is open");
break;
case UA_CLIENTSTATE_SESSION: {
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "A session with the server is open");
/* A new session was created. We need to create the subscription. */
/* Create a subscription */
UA_CreateSubscriptionRequest request = UA_CreateSubscriptionRequest_default();
UA_CreateSubscriptionResponse response = UA_Client_Subscriptions_create(client, request,
NULL, NULL,
deleteSubscriptionCallback);

if (response.responseHeader.serviceResult == UA_STATUSCODE_GOOD)
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "Create subscription succeeded, id %u",
response.subscriptionId);
else
return;

/* Add a MonitoredItem */

char *node_id = const_cast<char *>("v1");

UA_MonitoredItemCreateRequest monRequest =
UA_MonitoredItemCreateRequest_default(UA_NODEID_STRING(2, node_id));

UA_MonitoredItemCreateResult monResponse =
UA_Client_MonitoredItems_createDataChange(client, response.subscriptionId,
UA_TIMESTAMPSTORETURN_BOTH,
monRequest, NULL, handler_currentTimeChanged, NULL);
if (monResponse.statusCode == UA_STATUSCODE_GOOD)
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND,
"Monitoring ns=2;s=v1', id %u", monResponse.monitoredItemId);
}
break;
case UA_CLIENTSTATE_SESSION_RENEWED:
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "A session with the server is open (renewed)");
/* The session was renewed. We don't need to recreate the subscription. */
break;
}
return;
}

int main(void) {
signal(SIGINT, stopHandler); /* catches ctrl-c */

UA_ClientConfig config = UA_ClientConfig_default;
/* Set stateCallback */
config.stateCallback = stateCallback;
config.subscriptionInactivityCallback = subscriptionInactivityCallback;

UA_Client *client = UA_Client_new(config);

/* Endless loop runAsync */
while (running) {
/* if already connected, this will return GOOD and do nothing */
/* if the connection is closed/errored, the connection will be reset and then reconnected */
/* Alternatively you can also use UA_Client_getState to get the current state */
UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4855");
if (retval != UA_STATUSCODE_GOOD) {
UA_LOG_ERROR(logger, UA_LOGCATEGORY_USERLAND, "Not connected. Retrying to connect in 1 second");
/* The connect may timeout after 5 seconds (default timeout) or it may fail immediately on network errors */
/* E.g. name resolution errors or unreachable network. Thus there should be a small sleep here */
UA_sleep_ms(1000);
continue;
}

UA_Client_runAsync(client, 1000);
};

/* Clean up */
UA_Client_delete(client); /* Disconnects the client internally */
return UA_STATUSCODE_GOOD;
// MODIFIED FROM open62541 example

#ifdef _WIN32

#include <winsock2.h>
#include <windows.h>

# define UA_sleep_ms(X) Sleep(X)
#else
# include <unistd.h>
# define UA_sleep_ms(X) usleep(X * 1000)
#endif

#include <signal.h>
#include <stdlib.h>

#include "libopen62541/open62541.h"

UA_Boolean running = true;
UA_Logger logger = UA_Log_Stdout;

static void stopHandler(int sign) {
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "Received Ctrl-C");
running = 0;
}

static void
handler_valueChanged(UA_Client *client, UA_UInt32 subId, void *subContext,
UA_UInt32 monId, void *monContext, UA_DataValue *value) {
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "value has changed!");
if (UA_Variant_hasScalarType(&value->value, &UA_TYPES[UA_TYPES_INT32])) {
UA_Int32 rawValue = *(UA_Int32 *) value->value.data;
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "value is %d", rawValue);
}
}

static void
deleteSubscriptionCallback(UA_Client *client, UA_UInt32 subscriptionId, void *subscriptionContext) {
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "Subscription Id %u was deleted", subscriptionId);
}

static void
subscriptionInactivityCallback(UA_Client *client, UA_UInt32 subId, void *subContext) {
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "Inactivity for subscription %u", subId);
}

static void
stateCallback(UA_Client *client, UA_ClientState clientState) {
switch (clientState) {
case UA_CLIENTSTATE_DISCONNECTED:
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "The client is disconnected");
break;
case UA_CLIENTSTATE_CONNECTED:
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "A TCP connection to the server is open");
break;
case UA_CLIENTSTATE_SECURECHANNEL:
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "A SecureChannel to the server is open");
break;
case UA_CLIENTSTATE_SESSION: {
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "A session with the server is open");
/* A new session was created. We need to create the subscription. */
/* Create a subscription */
UA_CreateSubscriptionRequest request = UA_CreateSubscriptionRequest_default();
UA_CreateSubscriptionResponse response = UA_Client_Subscriptions_create(client, request,
NULL, NULL,
deleteSubscriptionCallback);

if (response.responseHeader.serviceResult == UA_STATUSCODE_GOOD)
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "Create subscription succeeded, id %u",
response.subscriptionId);
else
return;

/* Add a MonitoredItem */

char *node_id = const_cast<char *>("v1");

UA_MonitoredItemCreateRequest monRequest =
UA_MonitoredItemCreateRequest_default(UA_NODEID_STRING(2, node_id));

UA_MonitoredItemCreateResult monResponse =
UA_Client_MonitoredItems_createDataChange(client, response.subscriptionId,
UA_TIMESTAMPSTORETURN_BOTH,
monRequest, NULL, handler_valueChanged, NULL);
if (monResponse.statusCode == UA_STATUSCODE_GOOD)
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND,
"Monitoring ns=2;s=v1', id %u", monResponse.monitoredItemId);
}
break;
case UA_CLIENTSTATE_SESSION_RENEWED:
UA_LOG_INFO(logger, UA_LOGCATEGORY_USERLAND, "A session with the server is open (renewed)");
/* The session was renewed. We don't need to recreate the subscription. */
break;
}
return;
}

int main(void) {
signal(SIGINT, stopHandler); /* catches ctrl-c */

UA_ClientConfig config = UA_ClientConfig_default;
/* Set stateCallback */
config.stateCallback = stateCallback;
config.subscriptionInactivityCallback = subscriptionInactivityCallback;

UA_Client *client = UA_Client_new(config);

/* Endless loop runAsync */
while (running) {
/* if already connected, this will return GOOD and do nothing */
/* if the connection is closed/errored, the connection will be reset and then reconnected */
/* Alternatively you can also use UA_Client_getState to get the current state */
UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4855");
if (retval != UA_STATUSCODE_GOOD) {
UA_LOG_ERROR(logger, UA_LOGCATEGORY_USERLAND, "Not connected. Retrying to connect in 1 second");
/* The connect may timeout after 5 seconds (default timeout) or it may fail immediately on network errors */
/* E.g. name resolution errors or unreachable network. Thus there should be a small sleep here */
UA_sleep_ms(1000);
continue;
}

UA_Client_runAsync(client, 1000);
};

/* Clean up */
UA_Client_delete(client); /* Disconnects the client internally */
return UA_STATUSCODE_GOOD;
}
6 changes: 6 additions & 0 deletions 3rd-party/open62541/server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#include <iostream>

int main() {
std::cout << "Implement me\n";
return 0;
}
6 changes: 3 additions & 3 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl Client {
// Ask the server associated with the default endpoint for its list of endpoints
let endpoints = match self.get_server_endpoints() {
Result::Err(status_code) => {
error!("Can't get endpoints for server, error - {}", status_code);
error!("Cannot get endpoints for server, error - {}", status_code);
return Err(status_code);
}
Result::Ok(endpoints) => endpoints
Expand Down Expand Up @@ -449,7 +449,7 @@ impl Client {
Err(StatusCode::BadUnexpectedError)
}
} else {
error!("Can't find an endpoint that we call register server on");
error!("Cannot find an endpoint that we call register server on");
Err(StatusCode::BadUnexpectedError)
}
}
Expand Down Expand Up @@ -535,7 +535,7 @@ impl Client {
security_mode: MessageSecurityMode) -> Option<EndpointDescription>
{
if security_policy == SecurityPolicy::Unknown {
panic!("Can't match against unknown security policy");
panic!("Cannot match against unknown security policy");
}

let matching_endpoint = endpoints.iter().find(|e| {
Expand Down
2 changes: 1 addition & 1 deletion core/src/comms/chunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl Chunker {
}
}
Err(err) => {
debug!("Can't decode message {:?}, err = {:?}", object_id, err);
debug!("Cannot decode message {:?}, err = {:?}", object_id, err);
Err(StatusCode::BadServiceUnsupported)
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/comms/message_chunk_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl ChunkInfo {
let security_header = if chunk.is_open_secure_channel(&decoding_limits) {
let result = AsymmetricSecurityHeader::decode(&mut stream, &decoding_limits);
if result.is_err() {
error!("chunk_info() can't decode asymmetric security_header, {}", result.unwrap_err());
error!("chunk_info() cannot decode asymmetric security_header, {}", result.unwrap_err());
return Err(StatusCode::BadCommunicationError);
}
let security_header = result.unwrap();
Expand All @@ -67,7 +67,7 @@ impl ChunkInfo {
} else {
let result = SymmetricSecurityHeader::decode(&mut stream, &decoding_limits);
if result.is_err() {
error!("chunk_info() can't decode symmetric security_header, {}", result.unwrap_err());
error!("chunk_info() cannot decode symmetric security_header, {}", result.unwrap_err());
return Err(StatusCode::BadCommunicationError);
}
SecurityHeader::Symmetric(result.unwrap())
Expand Down
17 changes: 9 additions & 8 deletions core/src/crypto/aeskey.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ impl AesKey {
crypter.pad(false);
let result = crypter.update(src, dst);
if let Ok(count) = result {
let result = crypter.finalize(&mut dst[count..]);
if let Ok(rest) = result {
trace!("do cipher size {}", count + rest);
Ok(count + rest)
} else {
error!("Encryption error during finalize {:?}", result.unwrap_err());
Err(StatusCode::BadUnexpectedError)
}
crypter.finalize(&mut dst[count..])
.map(|rest| {
trace!("do cipher size {}", count + rest);
count + rest
})
.map_err(|e| {
error!("Encryption error during finalize {:?}", e);
StatusCode::BadUnexpectedError
})
} else {
error!("Encryption error during update {:?}", result.unwrap_err());
Err(StatusCode::BadUnexpectedError)
Expand Down
Loading

0 comments on commit 679b153

Please sign in to comment.