Skip to content

Commit

Permalink
improve torrent codes handling and download pieces
Browse files Browse the repository at this point in the history
  • Loading branch information
st3llaris committed Jun 12, 2024
1 parent 8d906dc commit 0cd34d7
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 81 deletions.
4 changes: 3 additions & 1 deletion lib/rutorrent/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def self.start
torrent_files = prompt.multi_select(Rutorrent::MESSAGES[:instructions],
format_torrent_files_with_size(decoded_torrent))

torrent_files = format_torrent_files_with_size(decoded_torrent, show_byte_size: false) if torrent_files.empty?
torrent_files = format_torrent_files_with_size(decoded_torrent, show_byte_size: false)

Rutorrent::Downloader.new(decoded_torrent, torrent_files).start
end
Expand All @@ -31,6 +31,8 @@ def self.format_torrent_files_with_size(torrent, show_byte_size: true)
index = torrent["info"]["files"].find_index { |f| f["path"].join("/") == file }
"#{file} (#{ByteSize.new(torrent["info"]["files"][index]["length"])})"
end
else
formatted_files
end

formatted_files
Expand Down
64 changes: 39 additions & 25 deletions lib/rutorrent/file_saver.rb
Original file line number Diff line number Diff line change
@@ -1,43 +1,57 @@
require "digest/sha1"

require "digest"
require "pry"
class FileSaver
attr_reader :expected_hashes, :pieces, :total_pieces, :piece_length, :file_path, :file
attr_reader :expected_hashes, :total_pieces, :piece_length, :file_path, :file, :pieces, :received_sizes, :downloaded,
:length

def initialize(file_path, piece_length, total_pieces, expected_hashes)
def initialize(file_path, piece_length, total_pieces, expected_hashes, length)
@file_path = file_path
@file = File.open(file_path, "wb")
@file = File.open(file_path, "wb+")
@piece_length = piece_length
@total_pieces = total_pieces
@expected_hashes = expected_hashes
@pieces = Array.new(total_pieces) { "" }
@received_sizes = Array.new(total_pieces, 0)
@count = 0
@downloaded = false
@length = length
end

def save_block(piece_index, begin_offset, block_data)
@pieces[piece_index] ||= ""
@pieces[piece_index][begin_offset, block_data.length] = block_data
def save_block(piece_index, _begin_offset, _block_data, pieces)
return unless piece_complete?(piece_index, pieces)

@file.seek(piece_index * @piece_length)
@file.write(@pieces[piece_index])
verify_piece(piece_index)
@pieces[piece_index] = nil
verify_and_save_piece(piece_index, pieces)
end

def save_piece_to_file(piece_index)
@file.seek(piece_index * piece_length)
@file.write(pieces[piece_index])
puts "Verified and saved piece #{piece_index}"
def piece_complete?(piece_index, pieces)
piece_size = piece_index == @total_pieces - 1 ? expected_final_piece_size : @piece_length
pieces[piece_index]&.blocks&.sum { |block| block[:block_data].size } == piece_size
end

def expected_final_piece_size
@total_length - (@piece_length * (@total_pieces - 1))
def verify_and_save_piece(piece_index, pieces)
piece_data = pieces[piece_index]
piece_data.blocks.each do |block_hash|
if @file.size >= length
@file.close
@downloaded = true
break
end

hash = Digest::SHA1.digest(block_hash[:block_data])

if hash.size == @expected_hashes[piece_index].size
@file.seek(@count * block_hash[:block_data].size)
@count += piece_index
@file.write(block_hash[:block_data])
puts "Verified and saved piece #{piece_index}, offset #{block_hash[:begin_offset]}"
else
puts "Piece #{piece_index} failed hash check"
end
end
end

def verify_piece(piece_index)
piece_data = @pieces[piece_index]
piece_hash = Digest::SHA1.digest(piece_data)
expected_hash = @expected_hashes[piece_index]

return save_piece_to_file(piece_index)
def expected_final_piece_size
total_length = (@piece_length * (@total_pieces - 1)) + @expected_hashes.last.size
total_length - (@piece_length * (@total_pieces - 1))
end

def close
Expand Down
66 changes: 28 additions & 38 deletions lib/rutorrent/message_code_handler.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
module Rutorrent
class MessageCodeHandler
attr_accessor :socket, :file_savers, :downloaded_pieces, :requested_pieces, :piece_availability, :mutex,
:download_complete
attr_accessor :socket, :file_savers, :downloaded_pieces, :requested_pieces, :mutex,
:download_complete, :piece_length

def initialize(socket, file_savers, downloaded_pieces, download_complete)
def initialize(socket, file_savers, downloaded_pieces, download_complete, piece_length)
@socket = socket
@file_savers = file_savers
@downloaded_pieces = downloaded_pieces
@requested_pieces = Set.new
@piece_availability = Hash.new { |hash, key| hash[key] = 0 }
@mutex = Mutex.new
@download_complete = download_complete
@piece_length = piece_length
@received_pieces = []
@pieces = []
end

def handle_choke(_payload)
Expand All @@ -21,7 +23,7 @@ def handle_choke(_payload)

def handle_unchoke(_payload)
puts "Received unchoked"
return if downloaded_pieces.size + 1 == file_savers.sum(&:total_pieces)
return if file_savers.all?(&:downloaded)

handle_request(nil, sending: true)
end
Expand Down Expand Up @@ -59,28 +61,24 @@ def handle_bitfield(payload)
def handle_request(_payload, sending: false)
if sending
puts "Sending requests"
@available_pieces.each do |_available_piece|
request_info = next_request_piece
break unless request_info
@request_length = 16_384
@available_pieces.each_with_index do |_available_piece, i|
@offset = 0
piece_index = i
length_prefix = [13].pack("N")
message_id = [6].pack("C")

piece_index = request_info[:piece_index]
next if requested_pieces.include?(piece_index)
until @offset >= @piece_length
payload = [piece_index, @offset, @request_length].pack("N*")
packet = length_prefix + message_id + payload

begin_offset = request_info[:begin_offset]
request_length = request_info[:request_length]
socket.send(packet, 0)

length_prefix = [13].pack("N")
message_id = [6].pack("C")
payload = [piece_index, begin_offset, request_length].pack("N*")
packet = length_prefix + message_id + payload
puts "Requested piece #{piece_index} from offset #{@offset}"
@offset += @request_length
end

socket.send(packet, 0)
requested_pieces.add(piece_index)
puts "Requested piece #{piece_index} from offset #{begin_offset} with length #{request_length}"
if download_complete.true?
puts "download completed"
return
end
end
else
puts "Received request"
Expand All @@ -91,33 +89,25 @@ def handle_piece(payload)
puts "Received piece"
piece_index, begin_offset = payload.unpack("N2")
block_data = payload[8..]
existing_piece = @received_pieces.find { |piece| piece_index == piece&.piece_index }
if existing_piece
existing_piece.blocks = { begin_offset: begin_offset,
block_data: block_data }
else
@received_pieces << Piece.new([{ begin_offset: begin_offset, block_data: block_data }], piece_index)
end

file_savers.each do |file_saver|
file_saver.save_block(piece_index, begin_offset, block_data)
file_saver.save_block(piece_index, begin_offset, block_data, @received_pieces)
end

mutex.synchronize do
downloaded_pieces.add(piece_index)
requested_pieces.delete(piece_index)
end

return if downloaded_pieces.size + 1 == file_savers.sum(&:total_pieces)

handle_request(nil, sending: true)
end

def handle_cancel(payload)
puts "Received cancel: #{payload.unpack1("B*")}"
end

def next_request_piece
piece_index = @available_pieces.find { |index| !requested_pieces.include?(index) }
return nil unless piece_index

begin_offset = 0
request_length = 16_384

{ piece_index: piece_index, begin_offset: begin_offset, request_length: request_length }
end
end
end
21 changes: 5 additions & 16 deletions lib/rutorrent/peer_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Rutorrent
class PeerConnection
attr_reader :peers, :info_hash, :peer_id, :torrent_files, :file_savers, :interval, :downloaded_pieces,
:pool, :torrent_info, :piece_length, :total_pieces, :pieces_hashes, :download_complete
:pool, :torrent_info, :piece_length, :total_pieces, :pieces_hashes, :download_complete, :left

def initialize(peers, torrent_info, interval)
@peers = peers
Expand Down Expand Up @@ -33,13 +33,12 @@ def start!

def start_connection(socket)
perform_handshake(socket)
message_code_class = MessageCodeHandler.new(socket, file_savers, downloaded_pieces, download_complete)

message_code_class = MessageCodeHandler.new(socket, file_savers, downloaded_pieces, download_complete,
piece_length)
loop do
message = MessageReader.read_message(socket)
message_code = Constants::PEER_MESSAGES_MAPPING[message[0]]
message_code_class.send("handle_#{message_code}", message[1])
report_progress
end
end

Expand All @@ -53,16 +52,14 @@ def shutdown_pool
def initialize_file_savers
torrent_files.each_with_index.map do |torrent_file, _index|
file_path = "#{Dir.home}/Downloads/#{torrent_file}"
FileSaver.new(file_path, piece_length, total_pieces, pieces_hashes)
FileSaver.new(file_path, piece_length, total_pieces, pieces_hashes, left)
end
end

def connect_to_peer(peer)
puts "Connecting to peer #{peer.ip}:#{peer.port}"
socket = TCPSocket.new(peer.ip, peer.port)
start_connection(socket)
rescue IO::ECONNREFUSED => e
puts "Failed to connect to peer #{peer.ip}:#{peer.port} - #{e.message}"
rescue StandardError => e
puts "Something went wrong: #{e}"
ensure
Expand All @@ -81,15 +78,6 @@ def perform_handshake(socket)
raise "Info hash mismatch" if received_info_hash != info_hash

puts "Connected to peer: #{received_peer_id.unpack1("H*")}"
response
end

def report_progress
puts "Download progress: #{downloaded_pieces.size} pieces downloaded."
return unless downloaded_pieces.size + 1 == @file_savers.sum(&:total_pieces)

puts "Download complete!"
download_complete.make_true
end

def handshake
Expand All @@ -100,6 +88,7 @@ def format_pieces_from(torrent)
@piece_length = torrent["info"]["piece length"]
@pieces_hashes = torrent["info"]["pieces"].scan(/.{20}/m)
@total_pieces = @pieces_hashes.size
@left = torrent["info"]["length"]
end
end
end
28 changes: 28 additions & 0 deletions lib/rutorrent/piece.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
require "observer"

module Rutorrent
class Piece
include Observable
attr_reader :blocks, :piece_index, :buffer

def initialize(blocks, piece_index)
@blocks = blocks
@piece_index = piece_index
@piece_observer = PieceObserver.new(self)
end

def blocks=(new_element)
blocks << new_element
changed
notify_observers
end

def update_buffer
# buffer.write(blocks.last)
end

def buffer_content
buffer.string
end
end
end
14 changes: 14 additions & 0 deletions lib/rutorrent/piece_observer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module Rutorrent
class PieceObserver
attr_reader :piece

def initialize(piece)
@piece = piece
piece.add_observer(self)
end

def update
# piece.update_buffer
end
end
end
12 changes: 11 additions & 1 deletion lib/rutorrent/tracker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def connect
if torrent["announce-list"][2][0].include?("udp")
return UDPConnection.new(torrent, torrent_files, options).connect
end

if torrent["announce"].include?("http")
HTTPConnection.new(torrent, torrent_files, options).connect

Expand All @@ -28,10 +29,19 @@ def options
port: 6881,
uploaded: 0,
downloaded: 0,
left: torrent["info"]["length"],
left: torrent["info"]["length"] || left,
compact: 1,
event: 2
}
end

def left
length = []
torrent_files.map do |torrent_file|
length << torrent["info"]["files"].find { |file| file["path"][0] == torrent_file }["length"]
end

length.sum
end
end
end

0 comments on commit 0cd34d7

Please sign in to comment.