Python でシリアルポートを連結
~ Preparing for sniffing at serial port(s) ~
2022-05-05 作成 福島
TOP > tips > python-comleak

高機能なイーサネットスイッチには、シャドウポート (ポートミラーリングとも) と呼ばれる機能があります。
これは、トラフィックを横流しして無駄なデータや不正なデータを分析・検知するものです。
ここでは、シリアルポートに同様の機能を持たせます。


1. シリアルモジュールのインストールとプログラム作成

1-1. シリアルモジュールをインストールする。
 >_ Windows PowerShell 
Windows PowerShell
Copyright (C) Microsoft Corporation. All rights reserved.

新しいクロスプラットフォームの PowerShell をお試しください https://aka.ms/pscore6

PS C:\Users\who> pip3 install pyserial 
PS C:\Users\who> exit 
1-2. プログラムを作成する。
comleak.py
#!/usr/bin/python3
# -*- coding: utf8 -*-
# comleak.py v1.0 written by fuku@rouge.gr.jp
#
# Usage: comleak.py SerialDevice1 SerialDevice2 [..]
#   ex(Windows): python3 comleak.py COM4 COM10 COM12
#   ex(Linux): comleak.py /dev/ttyS3 /dev/ttyS9 /dev/ttyS11
#   ひとつのシリアルポートに出力されたデータを他のシリアルポートにも流す。
#

import sys, time
import serial

argv = sys.argv     # コマンドライン引数を取得する。

# シリアルデバイス名のリストを作成する。
deviceList = []
for s in argv[1:]:
  if not s in deviceList:
    deviceList.append(s)
deviceCnt = len(deviceList)

# 下記の場合は中断する。
# ・デバイスの数が不足している
# ・デバイス名に重複がある
if deviceCnt < 2 or deviceCnt != len(argv) - 1:
  print('Usage: comleak.py <Ser0> <Ser1> [..]')
  exit()

# 指定されたシリアルポートをすべて開く。
ser = {}
for s in deviceList:
  try:
    port = serial.Serial(port=s, baudrate=115200, write_timeout=0.05)
    ser[port] = b''
  except:
    print("Error: %s can't open." % s)
    exit()

ports = ser.keys()  # 開いたシリアルデバイスをリスト化する。

try:    # ^C をキャッチするため try にする。
  while True:
    # 全シリアルデバイスを読み込む。(データが到来していれば)
    for p in ports:
      ser[p] = p.read_all()

    # 到来したデータを全デバイスに出力する。(自デバイスを除く)
    for p in ports:
      data = ser[p]
      if data != b'':
        for q in ports:
          if q != p:
            try:
              q.write(data)
            except serial.serialutil.SerialTimeoutException:
              pass  # 読み込む準備が出来ていないデバイスはスキップ*する。

    time.sleep(0.1)   # 100ms 待つ。
except KeyboardInterrupt:
  pass

for p in ports:
  p.close()
* 読み込む準備が出来てないシリアルポートがある場合、書き込みのタイムアウトを待つため動作が遅くなる。


2. プログラムの実行と動作確認

2-1. com0com を設定する。
仮想ペアシリアルポート名
Virtual Port Pair 0COM10 ←結合→ COM11
Virtual Port Pair 1COM20 ←結合→ COM21
Virtual Port Pair 2COM30 ←結合→ COM31
use Ports class はすべてチェックを入れる。
(チェックが外れたままだと正常に動作しない)

2-2. プログラムを実行する。
comleak.py を実行し、COM10, COM20, COM30 に流れるデータを共有する。
 >_ Windows PowerShell 
Windows PowerShell
Copyright (C) Microsoft Corporation. All rights reserved.

新しいクロスプラットフォームの PowerShell をお試しください https://aka.ms/pscore6

PS C:\Users\who> python3 .\comleak.py COM10 COM20 COM30 
プログラムを終了する場合は ^C をタイプして中断する。
2-3. 動作確認をする。
下記 操作画面 A, B, C を同時に表示し、
操作画面 A から送信したデータが 操作画面 B, C で受信できることを確認する。

· 操作画面 A (COM11 からデータを送信する)
 >_ Windows PowerShell 
Windows PowerShell
Copyright (C) Microsoft Corporation. All rights reserved.

新しいクロスプラットフォームの PowerShell をお試しください https://aka.ms/pscore6

PS C:\Users\who> $com = New-Object System.IO.Ports.SerialPort COM11 
PS C:\Users\who> $com.open() 
PS C:\Users\who> $com.writeline("Hello") 
PS C:\Users\who> $com.close() 

· 操作画面 B (COM21 でデータを受信する)
 >_ Windows PowerShell 
Windows PowerShell
Copyright (C) Microsoft Corporation. All rights reserved.

新しいクロスプラットフォームの PowerShell をお試しください https://aka.ms/pscore6

PS C:\Users\who> $com = New-Object System.IO.Ports.SerialPort COM21 
PS C:\Users\who> $com.open() 
PS C:\Users\who> $com.readline() 
Hello       ← A から送信されたデータ
PS C:\Users\who> $com.close() 

· 操作画面 C (COM31 でデータを受信する)
 >_ Windows PowerShell 
Windows PowerShell
Copyright (C) Microsoft Corporation. All rights reserved.

新しいクロスプラットフォームの PowerShell をお試しください https://aka.ms/pscore6

PS C:\Users\who> $com = New-Object System.IO.Ports.SerialPort COM31 
PS C:\Users\who> $com.open() 
PS C:\Users\who> $com.readline() 
Hello       ← A から送信されたデータ
PS C:\Users\who> $com.close()