Skip to content

Commit

Permalink
Merge pull request fluent#4433 from daipom/ci-fix-unstable-tests-of-o…
Browse files Browse the repository at this point in the history
…ut_forward

CI: Fix unstable tests of out_forward
  • Loading branch information
ashie authored Mar 13, 2024
2 parents d55f444 + 5d18f35 commit bb0fd3f
Showing 1 changed file with 25 additions and 37 deletions.
62 changes: 25 additions & 37 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1248,27 +1248,22 @@ def plugin_id_for_test?
target_input_driver = create_target_input_driver(conf: target_config)
output_conf = config
d = create_driver(output_conf)
d.instance_start

begin
chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil))
mock.proxy(d.instance).socket_create_tcp(TARGET_HOST, @target_port,
linger_timeout: anything,
send_timeout: anything,
recv_timeout: anything,
connect_timeout: anything
) { |sock| mock(sock).close.once; sock }.twice
chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil))
mock.proxy(d.instance).socket_create_tcp(TARGET_HOST, @target_port,
linger_timeout: anything,
send_timeout: anything,
recv_timeout: anything,
connect_timeout: anything
) { |sock| mock(sock).close.once; sock }.twice

target_input_driver.run(timeout: 15) do
d.run(shutdown: false) do
node = d.instance.nodes.first
2.times do
node.send_data('test', chunk) rescue nil
end
target_input_driver.run(timeout: 15) do
d.run do
node = d.instance.nodes.first
2.times do
node.send_data('test', chunk) rescue nil
end
end
ensure
d.instance_shutdown
end
end

Expand All @@ -1282,7 +1277,6 @@ def plugin_id_for_test?
port #{@target_port}
</server>
])
d.instance_start
assert_nothing_raised { d.run }
end

Expand All @@ -1294,33 +1288,28 @@ def plugin_id_for_test?
keepalive_timeout 2
]
d = create_driver(output_conf)
d.instance_start

begin
chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil))
mock.proxy(d.instance).socket_create_tcp(TARGET_HOST, @target_port,
linger_timeout: anything,
send_timeout: anything,
recv_timeout: anything,
connect_timeout: anything
) { |sock| mock(sock).close.once; sock }.once
chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil))
mock.proxy(d.instance).socket_create_tcp(TARGET_HOST, @target_port,
linger_timeout: anything,
send_timeout: anything,
recv_timeout: anything,
connect_timeout: anything
) { |sock| mock(sock).close.once; sock }.once

target_input_driver.run(timeout: 15) do
d.run(shutdown: false) do
node = d.instance.nodes.first
2.times do
node.send_data('test', chunk) rescue nil
end
target_input_driver.run(timeout: 15) do
d.run do
node = d.instance.nodes.first
2.times do
node.send_data('test', chunk) rescue nil
end
end
ensure
d.instance_shutdown
end
end

test 'create timer of purging obsolete sockets' do
output_conf = config + %[keepalive true]
d = create_driver(output_conf)
@d = d = create_driver(output_conf)

mock(d.instance).timer_execute(:out_forward_heartbeat_request, 1).once
mock(d.instance).timer_execute(:out_forward_keep_alived_socket_watcher, 5).once
Expand All @@ -1336,7 +1325,6 @@ def plugin_id_for_test?
keepalive_timeout 2
]
d = create_driver(output_conf)
d.instance_start

chunk = Fluent::Plugin::Buffer::MemoryChunk.new(Fluent::Plugin::Buffer::Metadata.new(nil, nil, nil))
mock.proxy(d.instance).socket_create_tcp(TARGET_HOST, @target_port,
Expand Down

0 comments on commit bb0fd3f

Please sign in to comment.