diff --git a/inc/azure_uamqp_c/session.h b/inc/azure_uamqp_c/session.h index fd46647d..e154d0ed 100644 --- a/inc/azure_uamqp_c/session.h +++ b/inc/azure_uamqp_c/session.h @@ -58,6 +58,7 @@ MU_DEFINE_ENUM(SESSION_STATE, SESSION_STATE_VALUES) MU_DEFINE_ENUM(SESSION_SEND_TRANSFER_RESULT, SESSION_SEND_TRANSFER_RESULT_VALUES) typedef void(*LINK_ENDPOINT_FRAME_RECEIVED_CALLBACK)(void* context, AMQP_VALUE performative, uint32_t frame_payload_size, const unsigned char* payload_bytes); + typedef void(*ON_LINK_ENDPOINT_DESTROYED_CALLBACK)(LINK_ENDPOINT_HANDLE handle, void* context); typedef void(*ON_SESSION_STATE_CHANGED)(void* context, SESSION_STATE new_session_state, SESSION_STATE previous_session_state); typedef void(*ON_SESSION_FLOW_ON)(void* context); typedef bool(*ON_LINK_ATTACHED)(void* context, LINK_ENDPOINT_HANDLE new_link_endpoint, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target, fields properties); @@ -74,6 +75,7 @@ MU_DEFINE_ENUM(SESSION_SEND_TRANSFER_RESULT, SESSION_SEND_TRANSFER_RESULT_VALUES MOCKABLE_FUNCTION(, int, session_begin, SESSION_HANDLE, session); MOCKABLE_FUNCTION(, int, session_end, SESSION_HANDLE, session, const char*, condition_value, const char*, description); MOCKABLE_FUNCTION(, LINK_ENDPOINT_HANDLE, session_create_link_endpoint, SESSION_HANDLE, session, const char*, name); + MOCKABLE_FUNCTION(, void, session_set_link_endpoint_callback, LINK_ENDPOINT_HANDLE, link_endpoint, ON_LINK_ENDPOINT_DESTROYED_CALLBACK, on_link_endpoint_destroyed, void*, context); MOCKABLE_FUNCTION(, void, session_destroy_link_endpoint, LINK_ENDPOINT_HANDLE, link_endpoint); MOCKABLE_FUNCTION(, int, session_start_link_endpoint, LINK_ENDPOINT_HANDLE, link_endpoint, ON_ENDPOINT_FRAME_RECEIVED, frame_received_callback, ON_SESSION_STATE_CHANGED, on_session_state_changed, ON_SESSION_FLOW_ON, on_session_flow_on, void*, context); MOCKABLE_FUNCTION(, int, session_send_flow, LINK_ENDPOINT_HANDLE, link_endpoint, FLOW_HANDLE, flow); diff --git a/src/link.c b/src/link.c index 075510f7..95b24fd0 100644 --- a/src/link.c +++ b/src/link.c @@ -701,6 +701,20 @@ static void on_send_complete(void* context, IO_SEND_RESULT send_result) } } +static void on_link_endpoint_destroyed(LINK_ENDPOINT_HANDLE handle, void* context) +{ + if (context != NULL) + { + LINK_INSTANCE* link = (LINK_INSTANCE*)context; + + if (link->link_endpoint == handle) + { + // In this case the link endpoint has been destroyed by the session, and link should not try to free it. + link->link_endpoint = NULL; + } + } +} + LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQP_VALUE source, AMQP_VALUE target) { LINK_INSTANCE* result = (LINK_INSTANCE*)calloc(1, sizeof(LINK_INSTANCE)); @@ -779,6 +793,12 @@ LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQ free(result); result = NULL; } + else + { + // This ensures link.c gets notified if the link endpoint is destroyed + // by uamqp (due to a DETACH from the hub, e.g.) to prevent a double free. + session_set_link_endpoint_callback(result->link_endpoint, on_link_endpoint_destroyed, result); + } } } } @@ -883,6 +903,9 @@ void link_destroy(LINK_HANDLE link) link->on_link_state_changed = NULL; (void)link_detach(link, true, NULL, NULL, NULL); + // This is required to suppress the link destroyed callback, since this function is + // actively requesting that (in the following line). + session_set_link_endpoint_callback(link->link_endpoint, NULL, NULL); session_destroy_link_endpoint(link->link_endpoint); amqpvalue_destroy(link->source); amqpvalue_destroy(link->target); diff --git a/src/session.c b/src/session.c index fe83dde2..58165a3a 100644 --- a/src/session.c +++ b/src/session.c @@ -27,6 +27,8 @@ typedef struct LINK_ENDPOINT_INSTANCE_TAG void* callback_context; SESSION_HANDLE session; LINK_ENDPOINT_STATE link_endpoint_state; + ON_LINK_ENDPOINT_DESTROYED_CALLBACK on_link_endpoint_destroyed_callback; + void* on_link_endpoint_destroyed_context; } LINK_ENDPOINT_INSTANCE; typedef struct SESSION_INSTANCE_TAG @@ -104,6 +106,16 @@ static void remove_link_endpoint(LINK_ENDPOINT_HANDLE link_endpoint) static void free_link_endpoint(LINK_ENDPOINT_HANDLE link_endpoint) { + // The link endpoint handle can be managed by both uamqp and the upper layer. + // uamqp may destroy a link endpoint if a DETACH is received from the remote endpoint, + // so in this case the upper layer must be notified so it does not attempt to destroy the link endpoint as well. + // Ref counting would not suffice to address this situation as uamqp does not destroy link endpoints by itself + // every time, only when receiving a DETACH. + if (link_endpoint->on_link_endpoint_destroyed_callback != NULL) + { + link_endpoint->on_link_endpoint_destroyed_callback(link_endpoint, link_endpoint->on_link_endpoint_destroyed_context); + } + if (link_endpoint->name != NULL) { free(link_endpoint->name); @@ -1121,6 +1133,8 @@ LINK_ENDPOINT_HANDLE session_create_link_endpoint(SESSION_HANDLE session, const result->link_endpoint_state = LINK_ENDPOINT_STATE_NOT_ATTACHED; name_length = strlen(name); result->name = (char*)malloc(name_length + 1); + result->on_link_endpoint_destroyed_callback = NULL; + result->on_link_endpoint_destroyed_context = NULL; if (result->name == NULL) { /* Codes_S_R_S_SESSION_01_045: [If allocating memory for the link endpoint fails, session_create_link_endpoint shall fail and return NULL.] */ @@ -1160,6 +1174,15 @@ LINK_ENDPOINT_HANDLE session_create_link_endpoint(SESSION_HANDLE session, const return result; } +void session_set_link_endpoint_callback(LINK_ENDPOINT_HANDLE link_endpoint, ON_LINK_ENDPOINT_DESTROYED_CALLBACK on_link_endpoint_destroyed, void* context) +{ + if (link_endpoint != NULL) + { + link_endpoint->on_link_endpoint_destroyed_callback = on_link_endpoint_destroyed; + link_endpoint->on_link_endpoint_destroyed_context = context; + } +} + void session_destroy_link_endpoint(LINK_ENDPOINT_HANDLE link_endpoint) { if (link_endpoint != NULL)