-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Memory:实现添加 #14
Memory:实现添加 #14
Conversation
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Partition是包外可见的, 但是其上的方法确实包内可见的
2.方法命令应该站在Partiton的角度, append/getBatch(index, limit)/retrieveBatch(index, limit) - 缺少对应的单元测试
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个并发安全的测试你知道如何操作吗
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
暂时还没添加并发安全的测试
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我有点忘了 你说的是哪个并发测试场景
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
就是这个partition 的添加数据获取数据是并发安全的
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
添加上
memory/topic.go
Outdated
case 1: | ||
partitionID = partition[0] | ||
} else { | ||
default: | ||
return mqerr.ErrInvalidPartition | ||
} | ||
if partitionID < 0 || int(partitionID) >= len(t.partitions) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
确认一下, mqerr.ErrInvalidPartition 是否表示 PartitionID非法, 还是表示分区总数非法?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
分区id非法
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
补充对Consumer和ConsumerGroup的测试, 尤其是并发相关测试
// 测试调用consumer 和 producer 如果topic不存在就新建 | ||
testmq := &MQ{ | ||
topics: syncx.Map[string, *Topic]{}, | ||
} | ||
_, err := testmq.Consumer("test_topic", "group1") | ||
require.NoError(t, err) | ||
_, ok := testmq.topics.Load("test_topic") | ||
assert.Equal(t, ok, true) | ||
_, err = testmq.Producer("test_topic1") | ||
require.NoError(t, err) | ||
_, ok = testmq.topics.Load("test_topic1") | ||
assert.Equal(t, ok, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
写到针对抽象的e2e测试集中取, 分成两个测试
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
已经拆分到e2e测试中, 这里就可以删掉了
memory/mq.go
Outdated
} | ||
t, ok := m.topics.Load(topic) | ||
if !ok { | ||
t = NewTopic(topic, defaultPartitions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
在针对抽象模型的e2e测试集中添加两个测试:
- 未创建topic, 直接创建producer
- 未创建topic, 直接创建consumer
运行结果与kafka运行结果对其即可
group.partitionRecords = &partitionRecords | ||
} | ||
consumer := group.JoinGroup() | ||
t.consumerGroups.Store(groupID, group) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
测试在哪里?
memory/topic.go
Outdated
t.locker.Lock() | ||
defer t.locker.Unlock() | ||
if !t.closed { | ||
t.consumerGroups.Range(func(key string, value *ConsumerGroup) bool { | ||
value.Close() | ||
return true | ||
}) | ||
for _, producer := range t.producers { | ||
_ = producer.Close() | ||
} | ||
} | ||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
t.closed 在哪里设置为true?, 添加测试代码
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
// addMessage 往分区里面添加消息 | ||
func (t *Topic) addMessage(msg *mq.Message) error { | ||
partitionID := t.producerPartitionIDGetter.PartitionID(string(msg.Key)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Topic在closed后 是否还能添加?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
直接掉可以但我没暴露出去。用户只能通过producer去调用,producer上已经做了限制。
@@ -0,0 +1,50 @@ | |||
// Copyright 2021 ecodeclub |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
文件路径中 memory/consumerpartitionassigner/equaldivide/balancer.go, 既有assigner又有balancer.go统一一下, 测试文件也一样
// 测试调用consumer 和 producer 如果topic不存在就新建 | ||
testmq := &MQ{ | ||
topics: syncx.Map[string, *Topic]{}, | ||
} | ||
_, err := testmq.Consumer("test_topic", "group1") | ||
require.NoError(t, err) | ||
_, ok := testmq.topics.Load("test_topic") | ||
assert.Equal(t, ok, true) | ||
_, err = testmq.Producer("test_topic1") | ||
require.NoError(t, err) | ||
_, ok = testmq.topics.Load("test_topic1") | ||
assert.Equal(t, ok, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
已经拆分到e2e测试中, 这里就可以删掉了
自查清单
注意: 请完成下列自查清单中的所有自查项,完成一项勾选一项.
PR概述
关联Issue
其他内容