Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…9a0d8306bae9b94f6be55f080aa0cdb...1554d559dca6cfa4c9270ad2ab6173bcc038f165 (Branch HEAD)

Loxigen Head commit bigswitch/loxigen@1554d559dca6cfa4c9270ad2ab6173bcc038f165
commit 1554d559dca6cfa4c9270ad2ab6173bcc038f165
Merge: f11089d e6ed014
Author: Ken Chiang <[email protected]>
Date:   Thu Oct 21 10:19:10 2021 -0700

    Merge pull request floodlight#3 from kenchiang/BT-8027

    BT-8027: connection.py template

commit e6ed01486e0b83f38f93668850195d59e8ac4e74
Author: Ken Chiang <[email protected]>
Date:   Thu Oct 21 16:57:33 2021 +0000

    BT-8027
    Harmonize pyloxi3's connection.py with tap-app's valuate/connection.py,
    adding the ConnectionManager class and updating exception strings
  • Loading branch information
Jenkins committed Oct 21, 2021
1 parent ad1778e commit f90c6e6
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 35 deletions.
2 changes: 1 addition & 1 deletion loxi-revision
Original file line number Diff line number Diff line change
@@ -1 +1 @@
f11089d449a0d8306bae9b94f6be55f080aa0cdb Merge pull request #2 from kenchiang/java-update
1554d559dca6cfa4c9270ad2ab6173bcc038f165 Merge pull request #3 from kenchiang/BT-8027
94 changes: 77 additions & 17 deletions pyloxi/loxi/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import select
from threading import Condition, Lock, Thread

# uncomment to log to screen
#logging.basicConfig(level=logging.DEBUG)

DEFAULT_TIMEOUT = 1

class TransactionError(Exception):
Expand Down Expand Up @@ -52,7 +55,14 @@ def run(self):
self.logger.debug("Exited event loop")

def process_read(self):
recvd = self.sock.recv(4096)
try:
recvd = self.sock.recv(4096)
except IOError as e:
if e.errno == errno.ECONNRESET:
print('Connection reset:', e.strerror)
return
else:
raise e

self.logger.debug("Received %d bytes", len(recvd))

Expand All @@ -69,25 +79,25 @@ def process_read(self):
break

# Parse the header to get type
hdr_version, hdr_type, hdr_msglen, hdr_xid = loxi.of14.message.parse_header(buf[offset:])
hdr_version, hdr_type, hdr_length, hdr_xid = loxi.of14.message.parse_header(buf[offset:])

# Use loxi to resolve ofp of matching version
ofp = loxi.protocol(hdr_version)

# Extract the raw message bytes
if (offset + hdr_msglen) > len(buf):
if (offset + hdr_length) > len(buf):
# Not enough data for the body
break
rawmsg = buf[offset : offset + hdr_msglen]
offset += hdr_msglen
rawmsg = buf[offset : offset + hdr_length]
offset += hdr_length

msg = ofp.message.parse_message(rawmsg)
if not msg:
self.logger.warn("Could not parse message")
continue

self.logger.debug("Received message %s.%s xid %d length %d",
type(msg).__module__, type(msg).__name__, hdr_xid, hdr_msglen)
type(msg).__module__, type(msg).__name__, hdr_xid, hdr_length)

with self.rx_cv:
self.rx.append(msg)
Expand Down Expand Up @@ -145,7 +155,7 @@ def send_raw(self, buf):
self.logger.debug("Sending raw message length %d", len(buf))
with self.tx_lock:
if self.sock.sendall(buf) is not None:
raise RuntimeError("failed to send message to switch")
raise RuntimeError("Failed to send message to switch")

def send(self, msg):
"""
Expand All @@ -160,7 +170,24 @@ def send(self, msg):
type(msg).__module__, type(msg).__name__, msg.xid, len(buf))
with self.tx_lock:
if self.sock.sendall(buf) is not None:
raise RuntimeError("failed to send message to switch")
raise RuntimeError("Failed to send message to switch")

def hello(self):
reply = self.recv(lambda msg: True, timeout=DEFAULT_TIMEOUT)

def request_stats_generator(self, request):
ofp = loxi.protocol(request.version)
self.send(request)
while True:
reply = self.recv(lambda msg: True, timeout=DEFAULT_TIMEOUT)
if reply is None:
raise TransactionError("No reply for %s" % type(request).__name__, None)
if not isinstance(reply, ofp.message.stats_reply):
raise TransactionError("Expected stats_reply, received %s" % type(reply).__name__, reply)
for entry in reply.entries:
yield entry
if reply.flags & ofp.const.OFPSF_REPLY_MORE == 0:
break

def transact(self, msg, timeout=DEFAULT_TIMEOUT):
"""
Expand All @@ -169,9 +196,9 @@ def transact(self, msg, timeout=DEFAULT_TIMEOUT):
self.send(msg)
reply = self.recv_xid(msg.xid, timeout)
if reply is None:
raise TransactionError("no reply for %s" % type(msg).__name__, None)
raise TransactionError("No reply for %s" % type(msg).__name__, None)
elif isinstance(reply, loxi.protocol(reply.version).message.error_msg):
raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
raise TransactionError("Received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
return reply

def transact_multipart_generator(self, msg, timeout=DEFAULT_TIMEOUT):
Expand All @@ -183,9 +210,9 @@ def transact_multipart_generator(self, msg, timeout=DEFAULT_TIMEOUT):
while not finished:
reply = self.recv_xid(msg.xid, timeout)
if reply is None:
raise TransactionError("no reply for %s" % type(msg).__name__, None)
raise TransactionError("No reply for %s" % type(msg).__name__, None)
elif not isinstance(reply, loxi.protocol(reply.version).message.stats_reply):
raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
raise TransactionError("Received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
for entry in reply.entries:
yield entry
finished = reply.flags & loxi.protocol(reply.version).OFPSF_REPLY_MORE == 0
Expand Down Expand Up @@ -231,8 +258,9 @@ def connect(ip, port=6653, daemon=True, ofp=loxi.of14):
cxn.start()

cxn.send(ofp.message.hello())
if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
raise Exception("Did not receive HELLO")
hello = cxn.recv_any()
if hello is None or hello.type != ofp.OFPT_HELLO:
cxn.logger.debug('Did not receive hello message')

return cxn

Expand All @@ -242,13 +270,45 @@ def connect_unix(path, daemon=True, ofp=loxi.of14):
"""
soc = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
soc.connect(path)
cxn = loxi.connection.Connection(soc)
cxn = Connection(soc)
cxn.daemon = daemon
cxn.logger.debug("Connected to %s", path)
cxn.start()

cxn.send(ofp.message.hello())
if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
raise Exception("Did not receive HELLO")
hello = cxn.recv_any()
if hello is None or hello.type != ofp.OFPT_HELLO:
cxn.logger.debug('Did not receive hello message')

return cxn

class ConnectionManager(object):
def __init__(self, target='localhost', port=6634, ofp=loxi.of14,
immediate=False):
self.cxn = None
self.target = target
self.port = port
self.ofp = ofp
self.immediate = immediate
def __enter__(self):
interval = 1
while self.cxn == None:
try:
self.cxn = connect(self.target, self.port, self.ofp)
except socket.error as e:
if self.immediate:
raise TransactionError('Connection: %s' % e, None)
print('%s: Retrying...' % e.strerror)
time.sleep(interval)
# exponential backoff, 16s max
interval = min(2*interval, 16)
return self
def __exit__(self, exception_type, exception_value, traceback):
if self.cxn:
self.cxn.stop()
if exception_type is not None:
if exception_type == TransactionError:
print("Received transaction error")
self.cxn.logger.debug("Received transaction error: %s",
exception_value)
return True
91 changes: 74 additions & 17 deletions pyloxi3/loxi/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import select
from threading import Condition, Lock, Thread

# uncomment to log to screen
#logging.basicConfig(level=logging.DEBUG)

DEFAULT_TIMEOUT = 1

class TransactionError(Exception):
Expand Down Expand Up @@ -52,7 +55,11 @@ def run(self):
self.logger.debug("Exited event loop")

def process_read(self):
recvd = self.sock.recv(4096)
try:
recvd = self.sock.recv(4096)
except ConnectionResetError as e:
print('Connection reset:', e.strerror)
return

self.logger.debug("Received %d bytes", len(recvd))

Expand All @@ -69,25 +76,25 @@ def process_read(self):
break

# Parse the header to get type
hdr_version, hdr_type, hdr_msglen, hdr_xid = loxi.of14.message.parse_header(buf[offset:])
hdr_version, hdr_type, hdr_length, hdr_xid = loxi.of14.message.parse_header(buf[offset:])

# Use loxi to resolve ofp of matching version
ofp = loxi.protocol(hdr_version)

# Extract the raw message bytes
if (offset + hdr_msglen) > len(buf):
if (offset + hdr_length) > len(buf):
# Not enough data for the body
break
rawmsg = buf[offset : offset + hdr_msglen]
offset += hdr_msglen
rawmsg = buf[offset : offset + hdr_length]
offset += hdr_length

msg = ofp.message.parse_message(rawmsg)
if not msg:
self.logger.warn("Could not parse message")
continue

self.logger.debug("Received message %s.%s xid %d length %d",
type(msg).__module__, type(msg).__name__, hdr_xid, hdr_msglen)
type(msg).__module__, type(msg).__name__, hdr_xid, hdr_length)

with self.rx_cv:
self.rx.append(msg)
Expand Down Expand Up @@ -145,7 +152,7 @@ def send_raw(self, buf):
self.logger.debug("Sending raw message length %d", len(buf))
with self.tx_lock:
if self.sock.sendall(buf) is not None:
raise RuntimeError("failed to send message to switch")
raise RuntimeError("Failed to send message to switch")

def send(self, msg):
"""
Expand All @@ -160,7 +167,24 @@ def send(self, msg):
type(msg).__module__, type(msg).__name__, msg.xid, len(buf))
with self.tx_lock:
if self.sock.sendall(buf) is not None:
raise RuntimeError("failed to send message to switch")
raise RuntimeError("Failed to send message to switch")

def hello(self):
reply = self.recv(lambda msg: True, timeout=DEFAULT_TIMEOUT)

def request_stats_generator(self, request):
ofp = loxi.protocol(request.version)
self.send(request)
while True:
reply = self.recv(lambda msg: True, timeout=DEFAULT_TIMEOUT)
if reply is None:
raise TransactionError("No reply for %s" % type(request).__name__, None)
if not isinstance(reply, ofp.message.stats_reply):
raise TransactionError("Expected stats_reply, received %s" % type(reply).__name__, reply)
for entry in reply.entries:
yield entry
if reply.flags & ofp.const.OFPSF_REPLY_MORE == 0:
break

def transact(self, msg, timeout=DEFAULT_TIMEOUT):
"""
Expand All @@ -169,9 +193,9 @@ def transact(self, msg, timeout=DEFAULT_TIMEOUT):
self.send(msg)
reply = self.recv_xid(msg.xid, timeout)
if reply is None:
raise TransactionError("no reply for %s" % type(msg).__name__, None)
raise TransactionError("No reply for %s" % type(msg).__name__, None)
elif isinstance(reply, loxi.protocol(reply.version).message.error_msg):
raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
raise TransactionError("Received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
return reply

def transact_multipart_generator(self, msg, timeout=DEFAULT_TIMEOUT):
Expand All @@ -183,9 +207,9 @@ def transact_multipart_generator(self, msg, timeout=DEFAULT_TIMEOUT):
while not finished:
reply = self.recv_xid(msg.xid, timeout)
if reply is None:
raise TransactionError("no reply for %s" % type(msg).__name__, None)
raise TransactionError("No reply for %s" % type(msg).__name__, None)
elif not isinstance(reply, loxi.protocol(reply.version).message.stats_reply):
raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
raise TransactionError("Received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
for entry in reply.entries:
yield entry
finished = reply.flags & loxi.protocol(reply.version).OFPSF_REPLY_MORE == 0
Expand Down Expand Up @@ -231,8 +255,9 @@ def connect(ip, port=6653, daemon=True, ofp=loxi.of14):
cxn.start()

cxn.send(ofp.message.hello())
if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
raise Exception("Did not receive HELLO")
hello = cxn.recv_any()
if hello is None or hello.type != ofp.OFPT_HELLO:
cxn.logger.debug('Did not receive hello message')

return cxn

Expand All @@ -242,13 +267,45 @@ def connect_unix(path, daemon=True, ofp=loxi.of14):
"""
soc = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
soc.connect(path)
cxn = loxi.connection.Connection(soc)
cxn = Connection(soc)
cxn.daemon = daemon
cxn.logger.debug("Connected to %s", path)
cxn.start()

cxn.send(ofp.message.hello())
if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
raise Exception("Did not receive HELLO")
hello = cxn.recv_any()
if hello is None or hello.type != ofp.OFPT_HELLO:
cxn.logger.debug('Did not receive hello message')

return cxn

class ConnectionManager(object):
def __init__(self, target='localhost', port=6634, ofp=loxi.of14,
immediate=False):
self.cxn = None
self.target = target
self.port = port
self.ofp = ofp
self.immediate = immediate
def __enter__(self):
interval = 1
while self.cxn == None:
try:
self.cxn = connect(self.target, self.port, self.ofp)
except socket.error as e:
if self.immediate:
raise TransactionError('Connection: %s' % e, None)
print('%s: Retrying...' % e.strerror)
time.sleep(interval)
# exponential backoff, 16s max
interval = min(2*interval, 16)
return self
def __exit__(self, exception_type, exception_value, traceback):
if self.cxn:
self.cxn.stop()
if exception_type is not None:
if exception_type == TransactionError:
print("Received transaction error")
self.cxn.logger.debug("Received transaction error: %s",
exception_value)
return True

0 comments on commit f90c6e6

Please sign in to comment.