From 2cc57fd9cfbb07720cf0b588f0561fa6840e5d7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Y=C3=BCce=20Tekol?= Date: Wed, 2 Mar 2022 10:17:27 +0300 Subject: [PATCH] Removed MultiMap AddEntryListener method (#745) --- examples/multimap/event-listener/main.go | 140 ----------------- .../multi_map_add_entry_listener_codec.go | 81 ---------- ...lti_map_add_entry_listener_to_key_codec.go | 83 ---------- .../multi_map_remove_entry_listener_codec.go | 56 ------- multi_map_it_test.go | 147 +----------------- proxy_multi_map.go | 108 ------------- 6 files changed, 1 insertion(+), 614 deletions(-) delete mode 100644 examples/multimap/event-listener/main.go delete mode 100644 internal/proto/codec/multi_map_add_entry_listener_codec.go delete mode 100644 internal/proto/codec/multi_map_add_entry_listener_to_key_codec.go delete mode 100644 internal/proto/codec/multi_map_remove_entry_listener_codec.go diff --git a/examples/multimap/event-listener/main.go b/examples/multimap/event-listener/main.go deleted file mode 100644 index 2199f59f5..000000000 --- a/examples/multimap/event-listener/main.go +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright (c) 2008-2022, Hazelcast, Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License") - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "context" - "fmt" - "log" - - hz "github.com/hazelcast/hazelcast-go-client" - "github.com/hazelcast/hazelcast-go-client/types" -) - -func ObserveListenerIncludeValueOnly(ctx context.Context, m *hz.MultiMap, myHandler func(*hz.EntryNotified)) { - fmt.Println("--ObserveListenerIncludeValueOnly: start") - // In this observation, we will observe EntryAdded, EntryRemoved actions included their value. - // For this observation, it is needed to register our listener to given MultiMap. - listenerConfig := hz.MultiMapEntryListenerConfig{IncludeValue: true} - listenerConfig.NotifyEntryAdded(true) - listenerConfig.NotifyEntryRemoved(true) - // Add our continuous entry listener to the given MultiMap. - subscriptionID, err := m.AddEntryListener(ctx, listenerConfig, myHandler) - if err != nil { - panic(err) - } - // Initialize entries to put to the given MultiMap. - myEntries := []types.Entry{ - {Key: "my-key", Value: "my-value1"}, - {Key: "my-key", Value: "my-value2"}, - {Key: "my-key", Value: "my-value3"}, - {Key: "my-another-key", Value: "my-another-value1"}, - {Key: "my-another-key", Value: "my-another-value2"}, - } - for _, entry := range myEntries { - _, err := m.Put(ctx, entry.Key, entry.Value) - if err != nil { - panic(err) - } - } - // Then, remove same entries to observe EntryAdded and EntryRemoved actions. - for _, entry := range myEntries { - _, err := m.RemoveEntry(ctx, entry.Key, entry.Value) - if err != nil { - panic(err) - } - } - // Remove entry listener from the given MultiMap. - if err := m.RemoveEntryListener(ctx, subscriptionID); err != nil { - panic(err) - } - fmt.Println("--ObserveListenerIncludeValueOnly: end") - // If you observe the output, you can clearly see that given myHandler works fine and notifies for each of "my-key" put and remove operations. -} - -func ObserveListenerOnKey(ctx context.Context, m *hz.MultiMap, myHandler func(*hz.EntryNotified)) { - fmt.Println("--ObserveListenerOnKey: start") - // The key that we will listen on later. - myAwesomeKey := "my-awesome-key" - // In this observation, we will observe EntryAllCleared action and also EntryAdded, EntryRemoved actions for on a certain key. - // For this observation, it is needed to register our listener to the given MultiMap. - listenerConfig := hz.MultiMapEntryListenerConfig{IncludeValue: true, Key: myAwesomeKey} - listenerConfig.NotifyEntryAllCleared(true) - listenerConfig.NotifyEntryAdded(true) - // Add our continuous entry listener to the given MultiMap. - subscriptionID, err := m.AddEntryListener(ctx, listenerConfig, myHandler) - if err != nil { - panic(err) - } - // These are the entries to observe that we only interested in the key which we defined above. - // Events related to dummy key should not be notified and handled. - myEntries := []types.Entry{ - {Key: myAwesomeKey, Value: "my-awesome-value1"}, - {Key: myAwesomeKey, Value: "my-awesome-value2"}, - {Key: myAwesomeKey, Value: "my-awesome-value3"}, - {Key: "my-dummy-key", Value: "my-dummy-value"}, - } - // Put my observation entries to the given MultiMap. - for _, entry := range myEntries { - _, err := m.Put(ctx, entry.Key, entry.Value) - if err != nil { - panic(err) - } - } - // Trigger a clear event on given MultiMap as well. - if err := m.Clear(ctx); err != nil { - panic(err) - } - // Remove entry listener from the given MultiMap. - if err := m.RemoveEntryListener(ctx, subscriptionID); err != nil { - panic(err) - } - fmt.Println("--ObserveListenerOnKey: end") - // If you observe the output, you can clearly see that myHandler only handled myAwesomeKey related events - // then listener ignores "my-dummy-key" related event. -} - -func main() { - ctx := context.TODO() - // Let's start a new hazelcast client with default config. - client, err := hz.StartNewClient(ctx) - if err != nil { - log.Fatal(err) - } - // Disconnect client from the cluster. - defer client.Shutdown(ctx) - // Request an instance of a MultiMap. - m, err := client.GetMultiMap(ctx, "my-MultiMap") - if err != nil { - log.Fatal(err) - } - // Define a handler with supported EntryEventType for the upcoming events. - myHandler := func(event *hz.EntryNotified) { - switch event.EventType { - case hz.EntryAdded: - fmt.Printf("MultiMap: %s, (key: %v, value: %v) was added.\n", event.MapName, event.Key, event.Value) - case hz.EntryRemoved: - fmt.Printf("MultiMap: %s, (key: %v, value: %v) was removed.\n", event.MapName, event.Key, event.OldValue) - case hz.EntryAllCleared: - fmt.Printf("MultiMap: %s was cleared.\n", event.MapName) - } - } - // Observation on configuration setting with IncludeValue only. - ObserveListenerIncludeValueOnly(ctx, m, myHandler) - // Observation on configuration setting on a specific Key with IncludeValue. - ObserveListenerOnKey(ctx, m, myHandler) -} diff --git a/internal/proto/codec/multi_map_add_entry_listener_codec.go b/internal/proto/codec/multi_map_add_entry_listener_codec.go deleted file mode 100644 index f0a8863ab..000000000 --- a/internal/proto/codec/multi_map_add_entry_listener_codec.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package codec - -import ( - "github.com/hazelcast/hazelcast-go-client/internal/proto" - "github.com/hazelcast/hazelcast-go-client/internal/serialization" - "github.com/hazelcast/hazelcast-go-client/types" -) - -const ( - // hex: 0x020E00 - MultiMapAddEntryListenerCodecRequestMessageType = int32(134656) - // hex: 0x020E01 - MultiMapAddEntryListenerCodecResponseMessageType = int32(134657) - - // hex: 0x020E02 - MultiMapAddEntryListenerCodecEventEntryMessageType = int32(134658) - - MultiMapAddEntryListenerCodecRequestIncludeValueOffset = proto.PartitionIDOffset + proto.IntSizeInBytes - MultiMapAddEntryListenerCodecRequestLocalOnlyOffset = MultiMapAddEntryListenerCodecRequestIncludeValueOffset + proto.BooleanSizeInBytes - MultiMapAddEntryListenerCodecRequestInitialFrameSize = MultiMapAddEntryListenerCodecRequestLocalOnlyOffset + proto.BooleanSizeInBytes - - MultiMapAddEntryListenerResponseResponseOffset = proto.ResponseBackupAcksOffset + proto.ByteSizeInBytes - MultiMapAddEntryListenerEventEntryEventTypeOffset = proto.PartitionIDOffset + proto.IntSizeInBytes - MultiMapAddEntryListenerEventEntryUuidOffset = MultiMapAddEntryListenerEventEntryEventTypeOffset + proto.IntSizeInBytes - MultiMapAddEntryListenerEventEntryNumberOfAffectedEntriesOffset = MultiMapAddEntryListenerEventEntryUuidOffset + proto.UuidSizeInBytes -) - -// Adds an entry listener for this multimap. The listener will be notified for all multimap add/remove/update/evict events. - -func EncodeMultiMapAddEntryListenerRequest(name string, includeValue bool, localOnly bool) *proto.ClientMessage { - clientMessage := proto.NewClientMessageForEncode() - clientMessage.SetRetryable(false) - - initialFrame := proto.NewFrameWith(make([]byte, MultiMapAddEntryListenerCodecRequestInitialFrameSize), proto.UnfragmentedMessage) - FixSizedTypesCodec.EncodeBoolean(initialFrame.Content, MultiMapAddEntryListenerCodecRequestIncludeValueOffset, includeValue) - FixSizedTypesCodec.EncodeBoolean(initialFrame.Content, MultiMapAddEntryListenerCodecRequestLocalOnlyOffset, localOnly) - clientMessage.AddFrame(initialFrame) - clientMessage.SetMessageType(MultiMapAddEntryListenerCodecRequestMessageType) - clientMessage.SetPartitionId(-1) - - EncodeString(clientMessage, name) - - return clientMessage -} - -func DecodeMultiMapAddEntryListenerResponse(clientMessage *proto.ClientMessage) types.UUID { - frameIterator := clientMessage.FrameIterator() - initialFrame := frameIterator.Next() - - return FixSizedTypesCodec.DecodeUUID(initialFrame.Content, MultiMapAddEntryListenerResponseResponseOffset) -} - -func HandleMultiMapAddEntryListener(clientMessage *proto.ClientMessage, handleEntryEvent func(key *serialization.Data, value *serialization.Data, oldValue *serialization.Data, mergingValue *serialization.Data, eventType int32, uuid types.UUID, numberOfAffectedEntries int32)) { - messageType := clientMessage.Type() - frameIterator := clientMessage.FrameIterator() - if messageType == MultiMapAddEntryListenerCodecEventEntryMessageType { - initialFrame := frameIterator.Next() - eventType := FixSizedTypesCodec.DecodeInt(initialFrame.Content, MultiMapAddEntryListenerEventEntryEventTypeOffset) - uuid := FixSizedTypesCodec.DecodeUUID(initialFrame.Content, MultiMapAddEntryListenerEventEntryUuidOffset) - numberOfAffectedEntries := FixSizedTypesCodec.DecodeInt(initialFrame.Content, MultiMapAddEntryListenerEventEntryNumberOfAffectedEntriesOffset) - key := CodecUtil.DecodeNullableForData(frameIterator) - value := CodecUtil.DecodeNullableForData(frameIterator) - oldValue := CodecUtil.DecodeNullableForData(frameIterator) - mergingValue := CodecUtil.DecodeNullableForData(frameIterator) - handleEntryEvent(key, value, oldValue, mergingValue, eventType, uuid, numberOfAffectedEntries) - return - } -} diff --git a/internal/proto/codec/multi_map_add_entry_listener_to_key_codec.go b/internal/proto/codec/multi_map_add_entry_listener_to_key_codec.go deleted file mode 100644 index 871f62ffd..000000000 --- a/internal/proto/codec/multi_map_add_entry_listener_to_key_codec.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package codec - -import ( - "github.com/hazelcast/hazelcast-go-client/internal/proto" - "github.com/hazelcast/hazelcast-go-client/internal/serialization" - "github.com/hazelcast/hazelcast-go-client/types" -) - -const ( - // hex: 0x020D00 - MultiMapAddEntryListenerToKeyCodecRequestMessageType = int32(134400) - // hex: 0x020D01 - MultiMapAddEntryListenerToKeyCodecResponseMessageType = int32(134401) - - // hex: 0x020D02 - MultiMapAddEntryListenerToKeyCodecEventEntryMessageType = int32(134402) - - MultiMapAddEntryListenerToKeyCodecRequestIncludeValueOffset = proto.PartitionIDOffset + proto.IntSizeInBytes - MultiMapAddEntryListenerToKeyCodecRequestLocalOnlyOffset = MultiMapAddEntryListenerToKeyCodecRequestIncludeValueOffset + proto.BooleanSizeInBytes - MultiMapAddEntryListenerToKeyCodecRequestInitialFrameSize = MultiMapAddEntryListenerToKeyCodecRequestLocalOnlyOffset + proto.BooleanSizeInBytes - - MultiMapAddEntryListenerToKeyResponseResponseOffset = proto.ResponseBackupAcksOffset + proto.ByteSizeInBytes - MultiMapAddEntryListenerToKeyEventEntryEventTypeOffset = proto.PartitionIDOffset + proto.IntSizeInBytes - MultiMapAddEntryListenerToKeyEventEntryUuidOffset = MultiMapAddEntryListenerToKeyEventEntryEventTypeOffset + proto.IntSizeInBytes - MultiMapAddEntryListenerToKeyEventEntryNumberOfAffectedEntriesOffset = MultiMapAddEntryListenerToKeyEventEntryUuidOffset + proto.UuidSizeInBytes -) - -// Adds the specified entry listener for the specified key.The listener will be notified for all -// add/remove/update/evict events for the specified key only. - -func EncodeMultiMapAddEntryListenerToKeyRequest(name string, key *serialization.Data, includeValue bool, localOnly bool) *proto.ClientMessage { - clientMessage := proto.NewClientMessageForEncode() - clientMessage.SetRetryable(false) - - initialFrame := proto.NewFrameWith(make([]byte, MultiMapAddEntryListenerToKeyCodecRequestInitialFrameSize), proto.UnfragmentedMessage) - FixSizedTypesCodec.EncodeBoolean(initialFrame.Content, MultiMapAddEntryListenerToKeyCodecRequestIncludeValueOffset, includeValue) - FixSizedTypesCodec.EncodeBoolean(initialFrame.Content, MultiMapAddEntryListenerToKeyCodecRequestLocalOnlyOffset, localOnly) - clientMessage.AddFrame(initialFrame) - clientMessage.SetMessageType(MultiMapAddEntryListenerToKeyCodecRequestMessageType) - clientMessage.SetPartitionId(-1) - - EncodeString(clientMessage, name) - EncodeData(clientMessage, key) - - return clientMessage -} - -func DecodeMultiMapAddEntryListenerToKeyResponse(clientMessage *proto.ClientMessage) types.UUID { - frameIterator := clientMessage.FrameIterator() - initialFrame := frameIterator.Next() - - return FixSizedTypesCodec.DecodeUUID(initialFrame.Content, MultiMapAddEntryListenerToKeyResponseResponseOffset) -} - -func HandleMultiMapAddEntryListenerToKey(clientMessage *proto.ClientMessage, handleEntryEvent func(key *serialization.Data, value *serialization.Data, oldValue *serialization.Data, mergingValue *serialization.Data, eventType int32, uuid types.UUID, numberOfAffectedEntries int32)) { - messageType := clientMessage.Type() - frameIterator := clientMessage.FrameIterator() - if messageType == MultiMapAddEntryListenerToKeyCodecEventEntryMessageType { - initialFrame := frameIterator.Next() - eventType := FixSizedTypesCodec.DecodeInt(initialFrame.Content, MultiMapAddEntryListenerToKeyEventEntryEventTypeOffset) - uuid := FixSizedTypesCodec.DecodeUUID(initialFrame.Content, MultiMapAddEntryListenerToKeyEventEntryUuidOffset) - numberOfAffectedEntries := FixSizedTypesCodec.DecodeInt(initialFrame.Content, MultiMapAddEntryListenerToKeyEventEntryNumberOfAffectedEntriesOffset) - key := CodecUtil.DecodeNullableForData(frameIterator) - value := CodecUtil.DecodeNullableForData(frameIterator) - oldValue := CodecUtil.DecodeNullableForData(frameIterator) - mergingValue := CodecUtil.DecodeNullableForData(frameIterator) - handleEntryEvent(key, value, oldValue, mergingValue, eventType, uuid, numberOfAffectedEntries) - return - } -} diff --git a/internal/proto/codec/multi_map_remove_entry_listener_codec.go b/internal/proto/codec/multi_map_remove_entry_listener_codec.go deleted file mode 100644 index af1478cff..000000000 --- a/internal/proto/codec/multi_map_remove_entry_listener_codec.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package codec - -import ( - "github.com/hazelcast/hazelcast-go-client/internal/proto" - "github.com/hazelcast/hazelcast-go-client/types" -) - -const ( - // hex: 0x020F00 - MultiMapRemoveEntryListenerCodecRequestMessageType = int32(134912) - // hex: 0x020F01 - MultiMapRemoveEntryListenerCodecResponseMessageType = int32(134913) - - MultiMapRemoveEntryListenerCodecRequestRegistrationIdOffset = proto.PartitionIDOffset + proto.IntSizeInBytes - MultiMapRemoveEntryListenerCodecRequestInitialFrameSize = MultiMapRemoveEntryListenerCodecRequestRegistrationIdOffset + proto.UuidSizeInBytes - - MultiMapRemoveEntryListenerResponseResponseOffset = proto.ResponseBackupAcksOffset + proto.ByteSizeInBytes -) - -// Removes the specified entry listener. If there is no such listener added before, this call does no change in the -// cluster and returns false. - -func EncodeMultiMapRemoveEntryListenerRequest(name string, registrationId types.UUID) *proto.ClientMessage { - clientMessage := proto.NewClientMessageForEncode() - clientMessage.SetRetryable(true) - - initialFrame := proto.NewFrameWith(make([]byte, MultiMapRemoveEntryListenerCodecRequestInitialFrameSize), proto.UnfragmentedMessage) - FixSizedTypesCodec.EncodeUUID(initialFrame.Content, MultiMapRemoveEntryListenerCodecRequestRegistrationIdOffset, registrationId) - clientMessage.AddFrame(initialFrame) - clientMessage.SetMessageType(MultiMapRemoveEntryListenerCodecRequestMessageType) - clientMessage.SetPartitionId(-1) - - EncodeString(clientMessage, name) - - return clientMessage -} - -func DecodeMultiMapRemoveEntryListenerResponse(clientMessage *proto.ClientMessage) bool { - frameIterator := clientMessage.FrameIterator() - initialFrame := frameIterator.Next() - - return FixSizedTypesCodec.DecodeBoolean(initialFrame.Content, MultiMapRemoveEntryListenerResponseResponseOffset) -} diff --git a/multi_map_it_test.go b/multi_map_it_test.go index 09ba9cb03..e75203205 100644 --- a/multi_map_it_test.go +++ b/multi_map_it_test.go @@ -18,9 +18,7 @@ package hazelcast_test import ( "context" - "fmt" "sync" - "sync/atomic" "testing" "time" @@ -125,6 +123,7 @@ func TestMultiMap_Remove(t *testing.T) { } func TestMultiMap_RemoveEntry(t *testing.T) { + it.SkipIf(t, "hz < 4.1") it.MultiMapTester(t, func(t *testing.T, m *hz.MultiMap) { ctx := context.Background() targetValue := "value" @@ -300,69 +299,6 @@ func TestMultiMap_Size(t *testing.T) { }) } -func TestMultiMap_EntryNotifiedEvent(t *testing.T) { - it.MultiMapTester(t, func(t *testing.T, m *hz.MultiMap) { - ctx := context.Background() - const totalCallCount = int32(100) - callCount := int32(0) - handler := func(event *hz.EntryNotified) { - if event.EventType == hz.EntryAdded { - atomic.AddInt32(&callCount, 1) - } - } - listenerConfig := hz.MultiMapEntryListenerConfig{ - IncludeValue: true, - } - listenerConfig.NotifyEntryAdded(true) - subscriptionID, err := m.AddEntryListener(ctx, listenerConfig, handler) - if err != nil { - t.Fatal(err) - } - for i := 0; i < int(totalCallCount); i++ { - key := fmt.Sprintf("key-%d", i) - value := fmt.Sprintf("value-%d", i) - it.MustValue(m.Put(ctx, key, value)) - } - it.Eventually(t, func() bool { - return atomic.LoadInt32(&callCount) == totalCallCount - }) - atomic.StoreInt32(&callCount, 0) - if err := m.RemoveEntryListener(ctx, subscriptionID); err != nil { - t.Fatal(err) - } - for i := 0; i < int(totalCallCount); i++ { - key := fmt.Sprintf("key-%d", i) - value := fmt.Sprintf("value-%d", i) - it.MustValue(m.Put(ctx, key, value)) - } - if !assert.Equal(t, int32(0), atomic.LoadInt32(&callCount)) { - t.FailNow() - } - }) -} - -func TestMultiMap_EntryNotifiedEventToKey(t *testing.T) { - it.MultiMapTester(t, func(t *testing.T, m *hz.MultiMap) { - ctx := context.Background() - callCount := int32(0) - handler := func(event *hz.EntryNotified) { - atomic.AddInt32(&callCount, 1) - } - listenerConfig := hz.MultiMapEntryListenerConfig{ - IncludeValue: true, - Key: "k1", - } - listenerConfig.NotifyEntryAdded(true) - if _, err := m.AddEntryListener(ctx, listenerConfig, handler); err != nil { - t.Fatal(err) - } - it.MustValue(m.Put(ctx, "k1", "v1")) - it.Eventually(t, func() bool { - return atomic.LoadInt32(&callCount) == int32(1) - }) - }) -} - func TestMultiMap_Destroy(t *testing.T) { it.MultiMapTester(t, func(t *testing.T, m *hz.MultiMap) { ctx := context.Background() @@ -559,87 +495,6 @@ func TestMultiMap_ValueCount(t *testing.T) { }) } -func TestMultiMap_MultiMapEntryListener(t *testing.T) { - it.MultiMapTester(t, func(t *testing.T, m *hz.MultiMap) { - ctx := context.Background() - key := "k1" - cases := []struct { - listenerName string - event hz.EntryEventType - setConf func(*hz.MultiMapEntryListenerConfig) - triggerEvent func() - }{ - { - listenerName: "EntryAdded", - event: hz.EntryAdded, - setConf: func(conf *hz.MultiMapEntryListenerConfig) { - conf.NotifyEntryAdded(true) - }, - triggerEvent: func() { - ok, err := m.Put(ctx, key, "testValue") - if err != nil { - t.Fatal(err) - } - assert.True(t, ok) - }, - }, - { - listenerName: "EntryRemoved", - event: hz.EntryRemoved, - setConf: func(conf *hz.MultiMapEntryListenerConfig) { - conf.NotifyEntryRemoved(true) - }, - triggerEvent: func() { - it.MustBool(m.Put(ctx, key, "testValue")) - val, err := m.Remove(ctx, key) - if err != nil { - t.Fatal(err) - } - assert.Equal(t, []interface{}{"testValue"}, val) - }, - }, - { - listenerName: "EntryAllCleared", - event: hz.EntryAllCleared, - setConf: func(conf *hz.MultiMapEntryListenerConfig) { - conf.NotifyEntryAllCleared(true) - }, - triggerEvent: func() { - it.MustBool(m.Put(ctx, key, "testValue")) - err := m.Clear(ctx) - if err != nil { - t.Fatal(err) - } - }, - }, - } - success := false - for _, testcase := range cases { - t.Run(testcase.listenerName, func(t *testing.T) { - listenerConfig := hz.MultiMapEntryListenerConfig{ - IncludeValue: true, - Key: key, - } - testcase.setConf(&listenerConfig) - subsID, err := m.AddEntryListener(ctx, listenerConfig, func(event *hz.EntryNotified) { - assert.Equal(t, testcase.event, event.EventType) - assert.False(t, success) - success = true - }) - if err != nil { - t.Fatal(err) - } - testcase.triggerEvent() - it.Eventually(t, func() bool { - return success - }) - success = false - it.Must(m.RemoveEntryListener(ctx, subsID)) - }) - } - }) -} - func TestMultiMap_NonExistentKey(t *testing.T) { it.MultiMapTester(t, func(t *testing.T, m *hz.MultiMap) { ctx := context.Background() diff --git a/proxy_multi_map.go b/proxy_multi_map.go index 448d7970c..6206faa21 100644 --- a/proxy_multi_map.go +++ b/proxy_multi_map.go @@ -22,7 +22,6 @@ import ( "github.com/hazelcast/hazelcast-go-client/internal/proto" "github.com/hazelcast/hazelcast-go-client/internal/proto/codec" - "github.com/hazelcast/hazelcast-go-client/internal/serialization" "github.com/hazelcast/hazelcast-go-client/types" ) @@ -31,43 +30,6 @@ MultiMap is a distributed map. Hazelcast Go client enables you to perform operations like reading and writing from/to a Hazelcast MultiMap with methods like Get and Put. For details, see https://docs.hazelcast.com/hazelcast/latest/data-structures/multimap.html -Listening for MultiMap Entry Events - -The first step of listening to entry-based events is creating an instance of MultiMapEntryListenerConfig. -MultiMapEntryListenerConfig contains options to filter the events by key and has an option to include the value of the entry, not just the key. -You should also choose which type of events you want to receive. -In the example below, a listener configuration for added and updated entries is created. -Entries only with key "somekey": - - entryListenerConfig := hazelcast.MultiMapEntryListenerConfig{ - Key: "somekey", - IncludeValue: true, - } - entryListenerConfig.NotifyEntryAdded(true) - entryListenerConfig.NotifyEntryUpdated(true) - m, err := client.GetMap(ctx, "somemap") - -After creating the configuration, the second step is adding an event listener and a handler to act on received events: - - subscriptionID, err := m.AddEntryListener(ctx, entryListenerConfig, func(event *hazelcast.EntryNotified) { - switch event.EventType { - case hazelcast.EntryAdded: - fmt.Println("Entry Added:", event.Value) - case hazelcast.EntryRemoved: - fmt.Println("Entry Removed:", event.Value) - case hazelcast.EntryUpdated: - fmt.Println("Entry Updated:", event.Value) - case hazelcast.EntryEvicted: - fmt.Println("Entry Remove:", event.Value) - case hazelcast.EntryLoaded: - fmt.Println("Entry Loaded:", event.Value) - } - }) - -Adding an event listener returns a subscription ID, which you can later use to remove the listener: - - err = m.RemoveEntryListener(ctx, subscriptionID) - Using Locks You can lock entries in a MultiMap. @@ -113,16 +75,6 @@ func (m *MultiMap) NewLockContext(ctx context.Context) context.Context { return context.WithValue(ctx, lockIDKey{}, lockID(m.refIDGen.NextID())) } -// AddEntryListener adds a continuous entry listener to this multi-map. -func (m *MultiMap) AddEntryListener(ctx context.Context, config MultiMapEntryListenerConfig, handler EntryNotifiedHandler) (types.UUID, error) { - return m.addEntryListener(ctx, config.IncludeValue, m.smart, config.Key, func(event *EntryNotified) { - if int32(event.EventType)&config.flags == 0 { - return - } - handler(event) - }) -} - // Clear deletes all entries one by one and fires related events. func (m *MultiMap) Clear(ctx context.Context) error { request := codec.EncodeMultiMapClearRequest(m.name) @@ -376,11 +328,6 @@ func (m *MultiMap) RemoveEntry(ctx context.Context, key interface{}, value inter return codec.DecodeMultiMapRemoveEntryResponse(response), nil } -// RemoveEntryListener removes the specified entry listener. -func (m *MultiMap) RemoveEntryListener(ctx context.Context, subscriptionID types.UUID) error { - return m.listenerBinder.Remove(ctx, subscriptionID) -} - // Size returns the number of entries in this multi-map. func (m *MultiMap) Size(ctx context.Context) (int, error) { request := codec.EncodeMultiMapSizeRequest(m.name) @@ -429,24 +376,6 @@ func (m *MultiMap) Unlock(ctx context.Context, key interface{}) error { } } -func (m *MultiMap) addEntryListener(ctx context.Context, includeValue, localOnly bool, key interface{}, handler EntryNotifiedHandler) (types.UUID, error) { - var err error - var keyData *serialization.Data - if key != nil { - if keyData, err = m.validateAndSerialize(key); err != nil { - return types.UUID{}, err - } - } - subscriptionID := types.NewUUID() - addRequest := m.makeListenerRequest(keyData, includeValue, localOnly) - listenerHandler := func(msg *proto.ClientMessage) { - m.makeListenerDecoder(msg, keyData, m.makeEntryNotifiedListenerHandler(handler)) - } - removeRequest := codec.EncodeMultiMapRemoveEntryListenerRequest(m.name, subscriptionID) - err = m.listenerBinder.Add(ctx, subscriptionID, addRequest, removeRequest, listenerHandler) - return subscriptionID, err -} - func (m *MultiMap) lock(ctx context.Context, key interface{}, ttl int64) error { lid := extractLockID(ctx) if keyData, err := m.validateAndSerialize(key); err != nil { @@ -487,40 +416,3 @@ func (m *MultiMap) tryLock(ctx context.Context, key interface{}, lease int64, ti } } } - -func (m *MultiMap) makeListenerRequest(keyData *serialization.Data, includeValue, localOnly bool) *proto.ClientMessage { - if keyData != nil { - return codec.EncodeMultiMapAddEntryListenerToKeyRequest(m.name, keyData, includeValue, localOnly) - } - return codec.EncodeMultiMapAddEntryListenerRequest(m.name, includeValue, localOnly) -} - -func (m *MultiMap) makeListenerDecoder(msg *proto.ClientMessage, keyData *serialization.Data, handler entryNotifiedHandler) { - if keyData != nil { - codec.HandleMultiMapAddEntryListenerToKey(msg, handler) - return - } - codec.HandleMultiMapAddEntryListener(msg, handler) -} - -// MultiMapEntryListenerConfig contains configuration for a multi-map entry listener. -type MultiMapEntryListenerConfig struct { - Key interface{} - flags int32 - IncludeValue bool -} - -// NotifyEntryAdded enables receiving an entry event when an entry is added. -func (c *MultiMapEntryListenerConfig) NotifyEntryAdded(enable bool) { - flagsSetOrClear(&c.flags, int32(EntryAdded), enable) -} - -// NotifyEntryRemoved enables receiving an entry event when an entry is removed. -func (c *MultiMapEntryListenerConfig) NotifyEntryRemoved(enable bool) { - flagsSetOrClear(&c.flags, int32(EntryRemoved), enable) -} - -// NotifyEntryAllCleared enables receiving an entry event when all entries are cleared. -func (c *MultiMapEntryListenerConfig) NotifyEntryAllCleared(enable bool) { - flagsSetOrClear(&c.flags, int32(EntryAllCleared), enable) -}