Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate logging to application/features/. #38

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
28 changes: 24 additions & 4 deletions application/features/Audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
from .Connection import Connection
from .. import app
from ..utils import find_free_port, get_headers_dict_from_str, local_auth
import logging.config

logger = logging.getLogger(__name__)

AUDIO_CONNECTIONS = {}

Expand All @@ -57,13 +60,16 @@ def __del__(self):
super().__del__()

def connect(self, *args, **kwargs):
logger.debug("Audio: Establishing Audio connection")
return super().connect(*args, **kwargs)

def launch_audio(self):
try:
logger.debug("Audio: Launching Audio connection. Forwarding request to 127.0.0.1, port 0.")
self.transport = self.client.get_transport()
self.remote_port = self.transport.request_port_forward('127.0.0.1', 0)
except Exception as e:
logger.warning("Audio: exception raised during launch audio: {}".format(e))
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
return False, str(e)

self.id = uuid.uuid4().hex
Expand All @@ -83,11 +89,12 @@ def handleConnected(self):
headers = get_headers_dict_from_str(headers)
if not local_auth(headers=headers, abort_func=self.close):
# local auth failure
logger.warning("AudioWebSocket: Local Authentication Failure")
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
return

audio_id = self.request.path[1:]
if audio_id not in AUDIO_CONNECTIONS:
print(f'AudioWebSocket: Requested audio_id={audio_id} does not exist.')
logger.warning(f'AudioWebSocket: Requested audio_id={audio_id} does not exist.')
self.close()
return

Expand All @@ -103,26 +110,31 @@ def handleConnected(self):
f'module-null-sink sink_name={sink_name} '
exit_status, _, stdout, _ = self.audio.exec_command_blocking(load_module_command)
if exit_status != 0:
print(f'AudioWebSocket: audio_id={audio_id}: unable to load pactl module-null-sink sink_name={sink_name}')
logger.warning(f'AudioWebSocket: audio_id={audio_id}: unable to load pactl module-null-sink sink_name={sink_name}')
return
load_module_stdout_lines = stdout.readlines()
logger.debug("AudioWebSocket: Load Module: {}".format(load_module_stdout_lines))
self.module_id = int(load_module_stdout_lines[0])

keep_launching_ffmpeg = True

def ffmpeg_launcher():
logger.debug("AudioWebSocket: ffmpeg_launcher thread started")
# TODO: support requesting audio format from the client
launch_ffmpeg_command = f'killall ffmpeg; ffmpeg -f pulse -i "{sink_name}.monitor" ' \
f'-ac 2 -acodec pcm_s16le -ar 44100 -f s16le "tcp://127.0.0.1:{self.audio.remote_port}"'
# keep launching if the connection is not accepted in the writer() below
while keep_launching_ffmpeg:
logger.debug(f"AudioWebSocket: Launch ffmpeg: {launch_ffmpeg_command}")
_, ffmpeg_stdout, _ = self.audio.client.exec_command(launch_ffmpeg_command)
ffmpeg_stdout.channel.recv_exit_status()
# if `ffmpeg` launches successfully, `ffmpeg_stdout.channel.recv_exit_status` should not return
logger.debug("AudioWebSocket: ffmpeg_launcher thread ended")
junhaoliao marked this conversation as resolved.
Show resolved Hide resolved

ffmpeg_launcher_thread = threading.Thread(target=ffmpeg_launcher)

def writer():
logger.debug("AudioWebSocket: writer thread started")
channel = self.audio.transport.accept(FFMPEG_LOAD_TIME * TRY_FFMPEG_MAX_COUNT)

nonlocal keep_launching_ffmpeg
Expand All @@ -138,14 +150,17 @@ def writer():
while True:
data = channel.recv(AUDIO_BUFFER_SIZE)
if not data:
logger.debug("AudioWebSocket: Close audio socket connection")
self.close()
break
buffer += data
if len(buffer) >= AUDIO_BUFFER_SIZE:
compressed = zlib.compress(buffer, level=4)
logger.debug(f"AudioWebSocket: Send compressed message of size {len(compressed)}")
self.sendMessage(compressed)
# print(len(compressed) / len(buffer) * 100)
buffer = b''
logger.debug("AudioWebSocket: write thread ended")

writer_thread = threading.Thread(target=writer)

Expand All @@ -155,8 +170,10 @@ def writer():
def handleClose(self):
if self.module_id is not None:
# unload the module before leaving
logger.debug(f"AudioWebSocket: Unload module {self.module_id}")
self.audio.client.exec_command(f'pactl unload-module {self.module_id}')

logger.debug(f"AudioWebSocket: End audio socket {self.audio.id} connection")
del AUDIO_CONNECTIONS[self.audio.id]
del self.audio

Expand All @@ -166,18 +183,21 @@ def handleClose(self):
# if we are in debug mode, run the server in the second round
if not app.debug or os.environ.get("WERKZEUG_RUN_MAIN") == "true":
AUDIO_PORT = find_free_port()
print("AUDIO_PORT =", AUDIO_PORT)
# print("AUDIO_PORT =", AUDIO_PORT)
logger.debug("Audio: Audio port {}".format(AUDIO_PORT))

if os.environ.get('SSL_CERT_PATH') is None:
logger.debug("Audio: SSL Certification Path not set. Generating self-signing certificate")
# no certificate provided, generate self-signing certificate
audio_server = SimpleSSLWebSocketServer('127.0.0.1', AUDIO_PORT, AudioWebSocket,
ssl_context=generate_adhoc_ssl_context())
else:
logger.debug("Audio: SSL Certification Path exists")
import ssl

audio_server = SimpleSSLWebSocketServer('0.0.0.0', AUDIO_PORT, AudioWebSocket,
certfile=os.environ.get('SSL_CERT_PATH'),
keyfile=os.environ.get('SSL_KEY_PATH'),
version=ssl.PROTOCOL_TLS)

threading.Thread(target=audio_server.serveforever, daemon=True).start()
threading.Thread(target=audio_server.serveforever, daemon=True).start()
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
34 changes: 33 additions & 1 deletion application/features/Connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@
import paramiko
import select

import logging.config

logger = logging.getLogger(__name__)

class ForwardServerHandler(socketserver.BaseRequestHandler):
def handle(self):
junhaoliao marked this conversation as resolved.
Show resolved Hide resolved
self.server: ForwardServer
try:
logger.debug("Connection: Open forward server channel")
chan = self.server.ssh_transport.open_channel(
"direct-tcpip",
("127.0.0.1", self.server.chain_port),
Expand All @@ -49,6 +53,14 @@ def handle(self):
("127.0.0.1", self.server.chain_port),
)
)
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
logger.debug(
"Connected! Tunnel open %r -> %r -> %r"
% (
self.request.getpeername(),
chan.getpeername(),
("127.0.0.1", self.server.chain_port),
)
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
)
IreneLime marked this conversation as resolved.
Show resolved Hide resolved

try:
while True:
Expand All @@ -67,6 +79,7 @@ def handle(self):
print(e)
IreneLime marked this conversation as resolved.
Show resolved Hide resolved

try:
logger.debug("Connection: Close forward server channel")
chan.close()
self.server.shutdown()
except Exception as e:
Expand Down Expand Up @@ -102,6 +115,9 @@ def __del__(self):
def _client_connect(self, client: paramiko.SSHClient,
host, username,
password=None, key_filename=None, private_key_str=None):
if self._jump_channel != None:
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
logger.debug("Connection: Connection initialized through Jump Channel")
logger.debug(f"Connection: Connecting to {username}@{host}")
junhaoliao marked this conversation as resolved.
Show resolved Hide resolved
if password is not None:
client.connect(host, username=username, password=password, timeout=15, sock=self._jump_channel)
elif key_filename is not None:
Expand All @@ -128,13 +144,16 @@ def _init_jump_channel(self, host, username, **auth_methods):

self._jump_client = paramiko.SSHClient()
self._jump_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
logger.debug(f"Connection: Initialize Jump Client for connection to {username}@remote.ecf.utoronto.ca")
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
self._client_connect(self._jump_client, 'remote.ecf.utoronto.ca', username, **auth_methods)
logger.debug(f"Connection: Open Jump channel connection to {host} at port 22")
junhaoliao marked this conversation as resolved.
Show resolved Hide resolved
self._jump_channel = self._jump_client.get_transport().open_channel('direct-tcpip',
(host, 22),
('127.0.0.1', 22))

def connect(self, host: str, username: str, **auth_methods):
junhaoliao marked this conversation as resolved.
Show resolved Hide resolved
try:
logger.debug(f"Connection: Connection attempt to {username}@{host}")
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
self._init_jump_channel(host, username, **auth_methods)
self._client_connect(self.client, host, username, **auth_methods)
except Exception as e:
Expand All @@ -145,6 +164,7 @@ def connect(self, host: str, username: str, **auth_methods):
self.host = host
self.username = username

logger.debug(f"Connection: Successfully connected to {username}@{host}")
junhaoliao marked this conversation as resolved.
Show resolved Hide resolved
return True, ''

@staticmethod
Expand All @@ -160,9 +180,11 @@ def ssh_keygen(key_filename=None, key_file_obj=None, public_key_comment=''):

# save the private key
if key_filename is not None:
logger.debug(f"Connection: RSA SSH private key written to {key_filename}")
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
rsa_key.write_private_key_file(key_filename)
elif key_file_obj is not None:
rsa_key.write_private_key(key_file_obj)
logger.debug(f"Connection: RSA SSH private key written to {key_file_obj}")
junhaoliao marked this conversation as resolved.
Show resolved Hide resolved
else:
raise ValueError('Neither key_filename nor key_file_obj is provided.')

Expand Down Expand Up @@ -192,6 +214,7 @@ def save_keys(self, key_filename=None, key_file_obj=None, public_key_comment='')
"mkdir -p ~/.ssh && chmod 700 ~/.ssh && echo '%s' >> ~/.ssh/authorized_keys" % pub_key)
if exit_status != 0:
return False, "Connection::save_keys: unable to save public key; Check for disk quota and permissions with any conventional SSH clients. "
logger.debug("Connection: Public ssh key saved to remove server ~/.ssh/authorized_keys")

return True, ""

Expand All @@ -217,22 +240,28 @@ def exec_command_blocking_large(self, command):
return '\n'.join(stdout) + '\n' + '\n'.join(stderr)

def _port_forward_thread(self, local_port, remote_port):
logger.debug("Connection: Port forward thread started")
forward_server = ForwardServer(("", local_port), ForwardServerHandler)

forward_server.ssh_transport = self.client.get_transport()
forward_server.chain_port = remote_port

forward_server.serve_forever()
forward_server.server_close()
logger.debug("Connection: Port forward thread ended")

def port_forward(self, *args):
forwarding_thread = threading.Thread(target=self._port_forward_thread, args=args)
forwarding_thread.start()

def is_eecg(self):
if 'eecg' in self.host:
logger.debug("Connection: Target host is eecg")
return 'eecg' in self.host
IreneLime marked this conversation as resolved.
Show resolved Hide resolved

def is_ecf(self):
if 'ecf' in self.host:
logger.debug("Connection: Target host is ecf")
return 'ecf' in self.host
IreneLime marked this conversation as resolved.
Show resolved Hide resolved

def is_uoft(self):
Expand All @@ -256,6 +285,9 @@ def is_load_high(self):

my_pts_count = len(output) - 1 # -1: excluding the `uptime` output

logger.debug(f"Connection: pts count: {pts_count}; my pts count: {my_pts_count}")
logger.debug(f"Connection: load sum: {load_sum}")
junhaoliao marked this conversation as resolved.
Show resolved Hide resolved

if pts_count > my_pts_count: # there are more terminals than mine
return True
elif load_sum > 1.0:
Expand All @@ -265,4 +297,4 @@ def is_load_high(self):
# it is considered a high load
return True

return False
return False
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
19 changes: 17 additions & 2 deletions application/features/SFTP.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
from paramiko.sftp_client import SFTPClient

from .Connection import Connection
import logging.config
IreneLime marked this conversation as resolved.
Show resolved Hide resolved

logger = logging.getLogger(__name__)

class SFTP(Connection):
def __init__(self):
Expand All @@ -41,11 +43,13 @@ def __del__(self):
super().__del__()

def connect(self, *args, **kwargs):
logger.debug("SFTP: Establishing SFTP connection")
status, reason = super().connect(*args, **kwargs)
if not status:
return status, reason

try:
logger.debug("SFTP: Open SFTP client connection")
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
self.sftp = self.client.open_sftp()
self.sftp.chdir(".")
except Exception as e:
Expand All @@ -59,6 +63,7 @@ def ls(self, path=""):
self.sftp.chdir(path)
cwd = self.sftp.getcwd()
attrs = self.sftp.listdir_attr(cwd)
logger.debug(f"SFTP: ls {cwd}: {attrs}")
IreneLime marked this conversation as resolved.
Show resolved Hide resolved

file_list = []
# TODO: should support uid and gid later
Expand Down Expand Up @@ -100,9 +105,11 @@ def _zip_dir_recurse(self, z, parent, file):
mode = self.sftp.stat(fullpath).st_mode
if stat.S_ISREG(mode):
# print(fullpath, 'is file')
logger.debug(f"SFTP: {fullpath} is a file")
z.write_iter(fullpath, self.dl_generator(fullpath))
elif stat.S_ISDIR(mode):
# print(fullpath, 'is dir')
logger.debug(f"SFTP: {fullpath} is a directory")
junhaoliao marked this conversation as resolved.
Show resolved Hide resolved
# TODO: support writing an empty directory if len(dir_ls)==0
# That will involve modifying the zipstream library
dir_ls = self.sftp.listdir(fullpath)
Expand All @@ -116,10 +123,12 @@ def _zip_dir_recurse(self, z, parent, file):
return

def zip_generator(self, cwd, file_list):
logger.debug(f"SFTP: zip_generator on directory: {cwd}")
self.sftp.chdir(cwd)
z = zipstream.ZipFile(compression=zipstream.ZIP_DEFLATED, allowZip64=True)

for file in file_list:
logger.debug(f"SFTP: zip_generator on file: {file}")
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
self._zip_dir_recurse(z, '', file)

return z
Expand All @@ -128,6 +137,7 @@ def rename(self, cwd, old, new):
try:
self.sftp.chdir(cwd)
self.sftp.rename(old, new)
logger.debug(f"SFTP: Rename {old} in directory {cwd} to {new}")
junhaoliao marked this conversation as resolved.
Show resolved Hide resolved
except Exception as e:
return False, repr(e)

Expand All @@ -136,9 +146,11 @@ def rename(self, cwd, old, new):
def chmod(self, path, mode, recursive):
_, _, _, stderr = self.exec_command_blocking(
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
f'chmod {"-R" if recursive else ""} {"{0:0{1}o}".format(mode, 3)} "{path}"')
logger.debug("SFTP: Change permission on " + path + " to '{0:0{1}o}'".format(mode, 3))
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
stderr_lines = stderr.readlines()
if len(stderr_lines) != 0:
print(stderr_lines)
logger.warning(f"SFTP: chmod failed due to {stderr_lines}")
# print(stderr_lines)
Copy link

@coderabbitai coderabbitai bot Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

LGTM: Improved logging for chmod operations

The addition of debug and warning log messages in the chmod method enhances the observability of permission changes and potential failures.

  1. To address the G003 and G004 warnings from the static analysis tool, consider using the logger's built-in string formatting:
logger.debug("SFTP: Change permission on %s to '%s'", path, "{0:0{1}o}".format(mode, 3))
logger.warning("SFTP: chmod failed due to %s", stderr_lines)
  1. Remove the commented-out print statement on line 153 as it's no longer needed:
- # print(stderr_lines)

These changes will improve code cleanliness and logging efficiency.

🧰 Tools
🪛 Ruff

149-149: Logging statement uses +

(G003)


152-152: Logging statement uses f-string

(G004)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. f-strings are preferred.
  2. @IreneLime Please remove the commented out print statement.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @junhaoliao, I am just wondering if I should replace it with f-string or string formatting in this case as you've mentioned f-strings are preferred in the previous comment but string formatting avoids representation evaluation.
Original Code:
logger.debug("SFTP: Change permission on " + path + " to '{0:0{1}o}'".format(mode, 3))

String formatting:
logger.debug("SFTP: Change permission on %s to '%s'", path, "{0:0{1}o}".format(mode, 3))

f-string:
logger.debug(f"SFTP: Change permission on {path} to '{mode:03o}'")

return False, 'Some files were not applied with the request mode due to permission issues.'

return True, ''
Expand All @@ -159,6 +171,7 @@ def rm(self, cwd, file_list):

counter += 1
if counter == 50:
logger.debug(f"SFTP: Execute Command {' '.join(cmd_list)}")
_, _, stderr = self.client.exec_command(" ".join(cmd_list))
stderr_lines = stderr.readlines()
if len(stderr_lines) != 0:
Expand All @@ -169,6 +182,7 @@ def rm(self, cwd, file_list):
counter = 0
cmd_list = [f'cd "{cwd}" && rm -rf']

logger.debug(f"SFTP: Execute Command {' '.join(cmd_list)}")
_, _, stderr = self.client.exec_command(" ".join(cmd_list))
stderr_lines = stderr.readlines()
if len(stderr_lines) != 0:
Expand All @@ -180,8 +194,9 @@ def rm(self, cwd, file_list):
return True, ''

def mkdir(self, cwd, name):
logger.debug(f"SFTP: Make directory {name} at {cwd}")
_, _, _, stderr = self.exec_command_blocking(f'cd "{cwd}"&& mkdir "{name}"')
stderr_lines = stderr.readlines()
if len(stderr_lines) != 0:
return False, stderr_lines[0]
return True, ''
return True, ''
IreneLime marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading