Skip to content

Commit

Permalink
1. Improved logging system.
Browse files Browse the repository at this point in the history
2. A more friendly scanning experience (Halo).
3. Used `CompanyIdentfiers.csv` to parse the manufacturer name in
   HCI_Read_Remote_Version_Information_Complete.
  • Loading branch information
x committed Sep 14, 2021
1 parent 01df94f commit 8773455
Show file tree
Hide file tree
Showing 13 changed files with 2,818 additions and 64 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ bthci>=0.0.19
btatt>=0.0.2
btgatt>=0.0.2
btsmp>=0.0.2
pyclui>=0.0.8
pyclui>=0.0.10
scapy>=2.4.5
docopt>=0.6.2
pybluez>=0.23
Expand Down
20 changes: 10 additions & 10 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3

import sys
import os
import sys
import shutil
import logging
from pathlib import Path
Expand All @@ -13,10 +13,10 @@

logger = Logger(__name__, logging.INFO)

PROJECT_ROOT = os.path.abspath(Path(__file__).parent)
PROJECT_ROOT = Path(__file__).parent
PROJECT_NAME = 'bluescan'

sys.path.insert(0, PROJECT_ROOT+'/src')
sys.path.insert(0, str(PROJECT_ROOT/'src'))
from bluescan import VERSION


Expand All @@ -37,11 +37,11 @@ class MyClean(clean):
def run(self):
super().run()
dirs = [
os.path.join(PROJECT_ROOT, 'build', 'bdist.linux-x86_64'), # 不直接删除 build 目录,因为 yotta 也会使用该目录。
os.path.join(PROJECT_ROOT, 'build', 'lib'),
os.path.join(PROJECT_ROOT, 'dist'),
os.path.join(PROJECT_ROOT, 'src', PROJECT_NAME+'.egg-info'),
os.path.join(PROJECT_ROOT, 'src', PROJECT_NAME, '__pycache__')
PROJECT_ROOT/'build'/'bdist.linux-x86_64', # 不直接删除 build 目录,因为 yotta 也会使用该目录。
PROJECT_ROOT/'build'/'lib',
PROJECT_ROOT/'dist',
PROJECT_ROOT/'src'/(PROJECT_NAME+'.egg-info'),
PROJECT_ROOT/'src'/PROJECT_NAME/'__pycache__'
]

for d in dirs:
Expand All @@ -61,13 +61,13 @@ def run(self):
]
},
package_data={
"bluescan": ["res/*.txt"]
"bluescan": ['res/*.txt', 'res/*.csv']
},
#scripts=['src/bluescan/bluescan.py'],

install_requires=[
'bthci>=0.0.19', 'btatt>=0.0.2', 'btgatt>=0.0.2', 'btsmp>=0.0.2',
'pyclui>=0.0.8', 'scapy>=2.4.5', 'docopt>=0.6.2', 'pybluez>=0.23',
'pyclui>=0.0.10', 'scapy>=2.4.5', 'docopt>=0.6.2', 'pybluez>=0.23',
'bluepy>=1.3.0', 'pyserial>=3.5', 'dbus-python>=1.2.16', 'PyGObject>=3.40.1',
],
python_requires='>=3.9',
Expand Down
6 changes: 4 additions & 2 deletions src/bluescan/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

logger = Logger(__name__, logging.INFO)

VERSION = '0.6.5'
VERSION = '0.6.6'
PKG_ROOT = Path(__file__).parent

LE_DEVS_SCAN_RESULT_CACHE = PKG_ROOT/'res'/'le_devs_scan_result.cache'

# https://www.bluetooth.com/specifications/assigned-numbers/service-discovery/
# Table 2: Service Class Profile Identifiers
#
Expand Down Expand Up @@ -61,7 +63,7 @@ def __init__(self, iface='hci0'):
try:
self.hci_bdaddr = HCI(iface).read_bdaddr()['BD_ADDR'].upper()
except Exception as e:
logger.error("{}".format(e))
logger.error(str(e))
exit(1)


Expand Down
17 changes: 9 additions & 8 deletions src/bluescan/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from bluetooth.btcommon import BluetoothError

from . import BlueScanner
from .ui import parse_cmdline
from .ui import parse_cmdline, INDENT
from .helper import find_rfkill_devid, get_microbit_devpaths
from .br_scan import BRScanner
from .le_scan import LeScanner
Expand Down Expand Up @@ -170,22 +170,23 @@ def main():
scan_result.print()
scan_result.store()
except AttributeError as e:
logger.debug("{}".format(e))
logger.debug(str(e))
except ValueError as e:
logger.error("{}".format(e))
logger.error(str(e))
exit(1)
except BluetoothError as e:
logger.error('{}'.format(e))
logger.error(str(e))
except RuntimeError as e:
logger.error('{}'.format(e))
logger.error(str(e))
except (BTLEException, ValueError) as e:
logger.error('{}'.format(e))
if 'le on' in str(e):
print(" No BLE adapter? or missing sudo ?")
logger.error(str(e) +
("\nNo BLE adapter or missing sudo?" if 'le on' in str(e)
else ""))
except KeyboardInterrupt:
if args != None and args['-i'] != None:
output = subprocess.check_output(' '.join(['hciconfig', args['-i'], 'reset']),
stderr=STDOUT, timeout=60, shell=True)
print()
logger.info("Canceled\n")


Expand Down
4 changes: 2 additions & 2 deletions src/bluescan/br_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
COMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, COMPLETE_LOCAL_NAME, \
SHORTENED_LOCAL_NAME, TX_POWER_LEVEL

from .lmp import lmp_vers
from .lmp import lmp_vers, company_identfiers
from .lmp import pp_lmp_features, pp_ext_lmp_features


Expand Down Expand Up @@ -151,7 +151,7 @@ def scan_lmp_feature(self, paddr):
print(' Version:')
print(' '*8+lmp_vers[event_params['Version']], '(LMP)')
print(' '*8+ll_vers[event_params['Version']], '(LL)')
print(' Manufacturer name:', event_params['Manufacturer_Name'])
print(' Manufacturer name:', green(company_identfiers[event_params['Manufacturer_Name']]))
print(' Subversion:', event_params['Subversion'], '\n')

event_params = hci.read_remote_supported_features({
Expand Down
42 changes: 20 additions & 22 deletions src/bluescan/gatt_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
import pkg_resources
from bluepy.btle import Peripheral # TODO: Get out of bluepy
from bluepy.btle import BTLEException, BTLEDisconnectError
from pyclui import green, blue, yellow, red, \
INFO_INDENT, ERROR_INDENT, WARNING_INDENT
from pyclui import green, blue, yellow, red
from pyclui import Logger

import dbus
Expand Down Expand Up @@ -291,13 +290,13 @@ def __init__(self, iface: str = 'hci0', ioc: str = 'NoInputNoOutput'):
self.agent_registered = False

def register_agent_callback():
logger.info('Agent object registered')
print(INFO_INDENT, "IO capability: {}".format(self.bluescan_agent.io_capability), sep='')
logger.info('Agent object registered\n'
"IO capability: {}".format(self.bluescan_agent.io_capability))
self.agent_registered = True
def register_agent_error_callback(error):
logger.error("Failed to register agent object")
print(ERROR_INDENT, "{}".format(error), sep='')
print(ERROR_INDENT, "IO capability: {}".format(self.bluescan_agent.io_capability), sep='')
logger.error("Failed to register agent object.\n"
"{}\n"
"IO capability: {}".format(error, self.bluescan_agent.io_capability))
self.agent_registered = False
self.bluescan_agent = BluescanAgent(sys_bus, 0, ioc)
self.agent_mgr_1_iface = dbus.Interface(
Expand Down Expand Up @@ -334,8 +333,8 @@ def run_mainloop():
try:
target = Peripheral(addr, iface=self.devid, addrType=self.result.addr_type)
except BTLEDisconnectError as e:
logger.error("BTLEDisconnectError")
print(ERROR_INDENT, e, sep='')
logger.error("BTLEDisconnectError\n" +
str(e))
return self.result

# 这里 bluepy 只会发送 ATT_READ_BY_GROUP_TYPE_REQ 获取 primary service。
Expand All @@ -354,8 +353,7 @@ def run_mainloop():
try:
characteristics = service.getCharacteristics()
except BTLEException as e:
logger.warning("BTLEException")
print(WARNING_INDENT, e, sep='')
logger.warning("BTLEException\n" + str(e))
# continue

for characteristic in characteristics:
Expand All @@ -368,13 +366,13 @@ def run_mainloop():
if characteristic.supportsRead():
value = characteristic.read()
except BTLEDisconnectError as e:
logger.warning("GattScanner.scan(), BTLEDisconnectError: {}".format(e))
print(WARNING_INDENT + "Read characteristic value {} failed".format(characteristic.uuid))
print(WARNING_INDENT + yellow("The Scan results may be incomplete."))
logger.warning("GattScanner.scan(), BTLEDisconnectError: {}\n"
"Read characteristic value {} failed\n".format(e, characteristic.uuid) +
yellow("The Scan results may be incomplete."))
return self.result # TODO: 直接返回可能会浪费很多数据,打包这些数据再返回。
except BTLEException as e:
logger.warning("GattScanner.scan(), BTLEException: {}".format(e))
print(WARNING_INDENT + "Read characteristic value {} failed".format(characteristic.uuid))
logger.warning("GattScanner.scan(), BTLEException: {}\n"
"Read characteristic value {} failed".format(e, characteristic.uuid))

charac_declar_value = {
'Properties' : characteristic.propertiesToString(), # TODO: get properties binary data
Expand Down Expand Up @@ -405,11 +403,11 @@ def run_mainloop():
try:
descriptor_declar.value = descriptor.read()
except BTLEException as e:
logger.warning("GattScanner.scan(), BTLEException: {}".format(e))
print(WARNING_INDENT + "Read descriptor {} failed".format(descriptor.uuid))
logger.warning("GattScanner.scan(), BTLEException: {}\n"
"Read descriptor {} failed".format(e, descriptor.uuid))
except BrokenPipeError as e:
logger.warning("GattScanner.scan(), BrokenPipeError: {}".format(e))
print(WARNING_INDENT + "Read descriptor {} failed".format(descriptor.uuid))
logger.warning("GattScanner.scan(), BrokenPipeError: {}"
"Read descriptor {} failed".format(e, descriptor.uuid))

# Set remote device untursted
output = subprocess.check_output(' '.join(['bluetoothctl', 'untrust',
Expand All @@ -430,8 +428,8 @@ def run_mainloop():
# ' '.join(['sudo', 'systemctl', 'start', 'bluetooth.service']),
# stderr=STDOUT, timeout=60, shell=True)
except BTLEDisconnectError as e:
logger.warning("GattScanner.scan(), BTLEDisconnectError: {}".format(e))
print(WARNING_INDENT + yellow("The Scan results may be incomplete."))
logger.warning("GattScanner.scan(), BTLEDisconnectError: {}\n".format(e) +
yellow("The Scan results may be incomplete."))
finally:
if self.agent_registered:
self.agent_mgr_1_iface.UnregisterAgent(
Expand Down
24 changes: 24 additions & 0 deletions src/bluescan/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,41 @@
import logging
import re
import sys
# import pickle
import subprocess

from serial.tools.list_ports import comports

from pyclui import blue, green, yellow, red
from pyclui import Logger

# from . import LE_DEVS_SCAN_RESULT_CACHE


logger = Logger(__name__, logging.INFO)


# def determine_addr_type(self):
# """For user not provide the remote LE address type."""
# from .le_scan import LeScanner
# try:
# with open(LE_DEVS_SCAN_RESULT_CACHE, 'rb') as le_devs_scan_result_cache:
# le_devs_scan_result = pickle.load(le_devs_scan_result_cache)
# for dev_info in le_devs_scan_result.devices_info:
# if self.result.addr == dev_info.addr:
# logger.debug("determine_addr_type, {} {}".format(self.result.addr, dev_info.addr_type))
# return dev_info.addr_type
# except FileNotFoundError as e:
# logger.debug("No {} found".format(LE_DEVS_SCAN_RESULT_CACHE))

# le_devs_scan_result = LeScanner(self.iface).scan_devs()
# for dev_info in le_devs_scan_result.devices_info:
# if self.result.addr == dev_info.addr:
# return dev_info.addr_type

# raise RuntimeError("Couldn't determine the LE address type. Please provide it explicitly.")


def valid_bdaddr(addr:str) -> bool:
regexp = r'(?:[\da-fA-F]{2}:){5}[\da-fA-F]{2}'
result = re.findall(regexp, addr)
Expand Down
Loading

0 comments on commit 8773455

Please sign in to comment.