- 제 목 mqtt python
- 작성자 관리자 등록일 2025-10-08/16:17 조회수 36
from machine import UART, Pin
import time
import json
import os
import neopixel
import _thread
#import datetime # 날짜와 시간 생성을 위해 import
from datetime import datetime
#======================= MQTT ==========================
MQTT_SERVER = "broker.eztrack.co.kr"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "sunlink"
MQTT_PUB_TOPIC = "/message/eztrack/pub"
MQTT_SUB_TOPIC = "/message/eztrack/sub"
MQTT_MESSAGE = "Receive my Message?" # 요청 반영
MQTT_COUNT = 0
SHIP_NAME = ""
PHONE = ""
CAPTAIN = ""
MMSI = ""
ACTTIVE_TIME = ""
PMAX = 0
DIV = "loop"
#----------------------WS2812 LED Color-------------------------
NUM_LEDS = 4
LED_PIN = 2 # 필요 시 변경 (예: 2 = GP2)
BRIGHTNESS = 150 # 0.0 ~ 1.0 (dim to bright)
BLUE = (0 , 0 , 255)
YELLOW = (255, 255, 0)
OFF = (0 , 0 , 0)
# ===================== HW: UART/LED =====================
LTE = UART(0, baudrate=115200, tx=Pin(0), rx=Pin(1))
led = Pin(25, Pin.OUT)
# ================= LTE 모듈 RESET GPIO =================
LTE_RESET_GPIO = 20 # 필요 시 변경 (예: 2 = GP2)
PWR_GPIO = 15
LTE_RESET_ACTIVE_LOW = True # 대부분 Active-Low
_RST_INACTIVE = 0 if LTE_RESET_ACTIVE_LOW else 1
_RST_ACTIVE = 1 - _RST_INACTIVE
LTE_RST = Pin(LTE_RESET_GPIO, Pin.OUT, value=_RST_INACTIVE)
#===========================================================
msg = ""
IMEI = "";
#************************ GPS *****************************
gJson = {}
sJson = {}
iJson = '''{"div":"TST"
, "MMSI":""
, "IMEI":""
, "msg":""
, "lat":0.0
, "lon":0.0
, "date":""
, "time":""
, "fix":""
, "sats":""
, "ship_name":""
, "captain":""
, "phone":""}'''
#-------------------------json----------------------------
# UART1: GPS TXD -> GPIO9 (RX), GPS RXD <- GPIO8 (TX, 선택)
GPS_UART_ID = 1
GPS_BAUD = 9600
GPS_TX_PIN = 8 # Pico TX → GPS RX (optional)
GPS_RX_PIN = 9 # Pico RX ← GPS TX
GPS_PRINT_EVERY_MS = 1000
GPS_UART = UART(GPS_UART_ID, baudrate=GPS_BAUD, tx=Pin(GPS_TX_PIN), rx=Pin(GPS_RX_PIN))
#-------------------------------------------------------------------------------------------
_gps_buf = b""
_gps_last_lat = None
_gps_last_lon = None
_gps_last_date = None
_gps_last_time = None
_gps_last_fix = None
_gps_last_sats = None
_gps_last_print = time.ticks_ms()
np = neopixel.NeoPixel(Pin(LED_PIN, Pin.OUT), NUM_LEDS)
def imei_Save(imei: str) -> bool: # 저장 성공/실패 여부를 반환하도록!
imei_file_path = 'imei.txt'
try:
with open(imei_file_path, 'w') as f:
f.write(imei.strip())
f.write("\n")
print(f"✅ 파일 '{imei_file_path}'에 IMEI 저장 성공! (덮어쓰기 됨)") # 🚨 메시지도 '저장'으로 변경!
return True
except Exception as e:
print(f"❌ 파일 '{imei_file_path}' 저장 실패: {e}")
return False
def imei_Read() -> str:
imei_str = ''
save_file = 'imei.txt'
try:
os.stat(save_file) # 파일 정보 가져오기 시도 (MicroPython 방식)
file_exists = True
print(f"야!.... '{save_file}' 파일 있다, 있어!")
except OSError:
print(f"젠장... '{save_file}' 파일 없어. 다시 확인해봐.")
return False
try:
with open(save_file, 'r') as f:
print(f"\n--- '{save_file}'의 현재 전체 내용 ---")
imei_str = f.readline()
imei_str = imei_str.strip()
return imei_str
except FileNotFoundError:
print(f"❌ '{save_file}' 파일을 찾을 수 없습니다.")
return ""
except Exception as e:
print(f"❌ 파일 읽기 중 알 수 없는 에러 발생: {e}")
def Work_Read() -> str:
global PMAX # 얘네들은 global이라고 미리 박아놔야 돼
global MQTT_COUNT
save_file = 'work.txt'
try:
# 파일이 없을 수도 있으니 통째로 try-except로 감싸는 게 좋아
with open(save_file, 'r') as f:
read_value = f.readline()
read_strip = read_value.strip()
data_dict = json.loads(read_strip)
print(f"\n--- '{save_file}'의 현재 전체 내용 ---") # 'json_file' 대신 'save_file'로 고침
print(f"PMAX: {PMAX}") # 값 확인용으로 변수 이름도 같이 출력하는 게 좋아
print(f"MQTT_COUNT: {MQTT_COUNT}")
print('-------------- json pass --------')
return data_dict
except OSError as e: # FileNotFoundError 대신 OSError로 잡는 게 안전빵임
if e.args[0] == 2: # MicroPython에서 errno 2는 ENOENT (파일이나 디렉토리가 없음)
print(f"❌ '{save_file}' 파일을 찾을 수 없습니다. (에러 코드: {e.args[0]})")
else:
print(f"❌ 파일 접근 중 알 수 없는 OSError 발생: {e}")
return False
except ValueError as e: # json.loads() 에러 잡기. 예를 들어, 파일 내용이 JSON 형식이 아닐 때
print(read_value)
print(f"❌ '{save_file}' 파일 내용이 JSON 형식이 아닙니다: {e}")
return False
except Exception as e: # 혹시 모를 다른 모든 에러를 대비
print(f"❌ 파일 읽기 중 알 수 없는 에러 발생: {e}")
return False
def Time_Read() -> str:
global PMAX # 얘네들은 global이라고 미리 박아놔야 돼
global MQTT_COUNT
save_file = 'time.txt'
try:
# 파일이 없을 수도 있으니 통째로 try-except로 감싸는 게 좋아
with open(save_file, 'r') as f:
read_value = f.readline()
read_strip = read_value.strip()
data_dict = json.loads(read_strip)
print(f"\n--- '{save_file}'의 현재 전체 내용 ---") # 'json_file' 대신 'save_file'로 고침
print(f"PMAX: {PMAX}") # 값 확인용으로 변수 이름도 같이 출력하는 게 좋아
print(f"MQTT_COUNT: {MQTT_COUNT}")
print('-------------- json pass --------')
return data_dict
except OSError as e: # FileNotFoundError 대신 OSError로 잡는 게 안전빵임
if e.args[0] == 2: # MicroPython에서 errno 2는 ENOENT (파일이나 디렉토리가 없음)
print(f"❌ '{save_file}' 파일을 찾을 수 없습니다. (에러 코드: {e.args[0]})")
else:
print(f"❌ 파일 접근 중 알 수 없는 OSError 발생: {e}")
return False
except ValueError as e: # json.loads() 에러 잡기. 예를 들어, 파일 내용이 JSON 형식이 아닐 때
print(read_value)
print(f"❌ '{save_file}' 파일 내용이 JSON 형식이 아닙니다: {e}")
return False
except Exception as e: # 혹시 모를 다른 모든 에러를 대비
print(f"❌ 파일 읽기 중 알 수 없는 에러 발생: {e}")
return False
def scale(color, brightness=1.0):
r, g, b = color
return (int(r * brightness), int(g * brightness), int(b * brightness))
def fill(color):
c = scale(color, BRIGHTNESS)
for i in range(NUM_LEDS):
np[i] = c
np.write()
# --- 스레드 중지 플래그 및 락 (대신 사용) ---
global_stop_flag = False # 스레드 중지를 위한 전역 플래그
global_stop_flag_lock = _thread.allocate_lock() # 전역 플래그 접근을 위한 락
def Worker():
print("✨ Worker 스레드 (NeoPixel 제어) 시작되었습니다! ✨")
try:
while True:
global_stop_flag_lock.acquire() # 순차적 처리 경
if global_stop_flag: # 플래그가 True이면 루프 탈출
global_stop_flag_lock.release()
break
global_stop_flag_lock.release()
fill(BLUE)
time.sleep(1)
# sleep 후 다시 플래그 확인 (안전을 위해)
global_stop_flag_lock.acquire()
if global_stop_flag:
global_stop_flag_lock.release()
break
global_stop_flag_lock.release()
fill(OFF)
time.sleep(1)
global_stop_flag_lock.acquire()
if global_stop_flag:
global_stop_flag_lock.release()
break
global_stop_flag_lock.release()
fill(YELLOW)
time.sleep(1)
global_stop_flag_lock.acquire()
if global_stop_flag:
global_stop_flag_lock.release()
break
global_stop_flag_lock.release()
fill(OFF)
time.sleep(1)
global_stop_flag_lock.acquire()
if global_stop_flag:
global_stop_flag_lock.release()
break
global_stop_flag_lock.release()
except Exception as e:
print(f"❌ Worker 스레드 오류 발생: {e} ❌")
finally:
fill(OFF) # 스레드 종료 시 NeoPixel 끄기
print("✅ Worker 스레드가 종료 신호를 받고 종료합니다. ✅")
def LTE_Module_Reset(hold_s=5, boot_wait_s=8):
print("[RESET] LTE module reset start (hold {}s)".format(hold_s))
LTE_RST.value(_RST_ACTIVE)
# _Wait_With_GPS(hold_s * 1000) # 기다리는 동안에도 GPS 서비스
LTE_RST.value(_RST_INACTIVE)
print("[RESET] released, boot wait {}s".format(boot_wait_s))
# _Wait_With_GPS(boot_wait_s * 1000)
PUBLISH_INTERVAL_S = 10 # 퍼블리시 주기(초)
reset_line = Pin(15, Pin.OUT) # 핀을 출력 모드로 설정
# ==================== 유틸/공용 함수 ===================
def Blink_LED(duration=0.05):
led.value(1); time.sleep(duration); led.value(0)
def _to_bytes(x):
if isinstance(x, (bytes, bytearray)):
return bytes(x)
return str(x).encode()
def Send_AT(cmd, expected, timeout=3000, max_retry=2):
"""
AT 명령: 최대 2회 재시도, expected(단일 패턴) 포함 시 성공.
스레드 없이, 대기 루프 중에도 GPS 서비스 계속 호출.
"""
exp = _to_bytes(expected)
attempt = 0
while attempt < max_retry:
attempt += 1
Blink_LED(0.1)
LTE.write(cmd + "\r")
t0 = time.ticks_ms()
buf = b""
while time.ticks_diff(time.ticks_ms(), t0) < timeout:
if LTE.any():
Blink_LED(0.02)
buf += LTE.read()
if exp in buf:
print("[OK-{}] {}".format(attempt, cmd))
return True
time.sleep(0.005)
print("[FAIL-{}] {} → wait 3s".format(attempt, cmd))
print("[SKIP]", cmd, "(expected:", exp, ")")
return False
# --- IMEI를 찾기 위한 함수 ---
def Get_IMEI(uart_obj, line_ending='\r', command_timeout_ms=5000):
cmd = b'AT+CGSN' + line_ending.encode('ascii') # AT+CGSN 명령어 바이트
print(f"\n[UART] -> AT 명령어 전송: {cmd.decode('ascii').strip().replace('\r', '\\r').replace('\n', '\\n')}")
uart_obj.write(cmd) # 명령어 전송
start_time_ms = time.ticks_ms() # 현재 시간 (밀리초) 기록
received_lines = []
imei_result = None
# 응답 기다리기
while time.ticks_diff(time.ticks_ms(), start_time_ms) < command_timeout_ms:
if uart_obj.any(): # 수신 버퍼에 데이터가 있다면
try:
line = uart_obj.readline().decode('utf-8').strip() # 한 줄 읽고 디코딩
except UnicodeError: # 디코딩 실패 시 Latin-1으로 재시도 (혹은 무시)
line = uart_obj.readline().decode('latin-1', 'ignore').strip()
if line: # 빈 라인이 아니면
received_lines.append(line)
print(f"[UART] <- 수신: {line}") # ✨ 여기! 수신 라인 즉시 출력! ✨
# 15자리 숫자로 된 IMEI 찾기
if len(line) == 15 and line.isdigit():
imei_result = line
print("[UART] ✔ IMEI 패턴 발견!")
break # 찾았으면 루프 종료
# 'OK' 또는 'ERROR'가 오면 응답 종료로 판단
if "OK" in line or "ERROR" in line:
break # 루프 종료
time.sleep_ms(10) # 짧게 대기
print("[UART] --- 응답 수신 종료 ---")
if imei_result:
print(f"\n[UART] ✅ 최종 획득한 IMEI: {imei_result}")
imei_value= imei_result
return imei_result
else:
print("\n[UART] ❌ IMEI를 응답에서 찾지 못했습니다.")
if not received_lines:
print(" (아무런 응답도 받지 못했습니다. 모듈 연결, 전원, Baud Rate 확인!)")
elif "ERROR" in "\n".join(received_lines):
print(" (모듈에서 'ERROR' 응답을 받았습니다. AT 명령어 또는 모듈 상태 확인!)")
else:
print(" (응답은 있었으나 15자리 IMEI 패턴을 찾지 못했습니다.)")
return None
def Send_AT_Any(cmd, expecteds, timeout=3000, max_retry=2):
"""
AT 명령: 최대 2회 재시도, expecteds 중 하나라도 매칭되면 성공.
"""
exps = expecteds if isinstance(expecteds, (list, tuple)) else [expecteds]
exps = [_to_bytes(e) for e in exps]
attempt = 0
while attempt < max_retry:
attempt += 1
Blink_LED(0.1)
LTE.write(cmd + "\r")
t0 = time.ticks_ms()
buf = b""
while time.ticks_diff(time.ticks_ms(), t0) < timeout:
if LTE.any():
Blink_LED(0.02)
buf += LTE.read()
for ex in exps:
if ex in buf:
print("[OK-{}] {}".format(attempt, cmd))
return True
# _GPS_Service()
time.sleep(0.005)
print("[FAIL-{}] {} → wait 3s".format(attempt, cmd))
# _Wait_With_GPS(3000)
print("[SKIP]", cmd, "(expected one of:", exps, ")")
return False
#print("# ================= LTE Initial ======================")
def LTE_Init_Once():
if not Send_AT("AT" , "OK"): return False
if not Send_AT("ATE0" , "OK"): return False
if not Send_AT("AT+CPIN?" , "READY"): return False
if not Send_AT("AT+CGATT?", "+CGATT: 1"):
LTE_RST.value(0)
print("[WARN] CGATT attach 실패 → skip 진행")
if not Send_AT('AT+CGDCONT=1,"IP","connect.cxn"', "OK"): return False
if not Send_AT("AT+QIACT=1", "OK"): return False
return True
#print("# ============== MQTT 한 사이클 퍼블리시 =================")
#***********************************************************
#
# SUNSCRIBE DATA PROCESS
#
#***********************************************************
def MQTT_Publish_Once(loop_cnt):
if not Send_AT('AT+QMTOPEN=0,"{}",{}'.format(MQTT_SERVER, MQTT_PORT), "+QMTOPEN: 0,0"):
return False
if not Send_AT('AT+QMTCONN=0,"{}"'.format(MQTT_CLIENT_ID), "+QMTCONN: 0,0,0"):
Send_AT("AT+QMTCLOSE=0", "+QMTCLOSE: 0,0")
return False
if not Send_AT('AT+QMTPUB=0,0,0,0,"{}"'.format(MQTT_PUB_TOPIC), ">"):
Send_AT("AT+QMTDISC=0", "+QMTDISC: 0,0")
Send_AT("AT+QMTCLOSE=0", "+QMTCLOSE: 0,0")
return False
_GPS_Service()
global MQTT_COUNT
global DIV
global PMAX
Blink_LED(0.05)
# global gJson
msg = json.dumps(gJson)
if DIV == "loop":
print(f'-----나다LOOP ---------- PUBLISH COUNT -{MQTT_COUNT}--------------------')
LTE.write(msg + "\x1A")
print("==========={MQTT_COUNT}=========")
print("================={PMAX}=========")
if DIV == "test" and MQTT_COUNT <= PMAX:
LTE.write(msg + "\x1A")
MQTT_COUNT += 1
print(f'-----나다TEST ---------- PUBLISH COUNT -{MQTT_COUNT}--------------------')
else:
DIV = 'sleep'
MQTT_COUNT = 0
# _Wait_With_GPS(2000) # 전송 후 여유
Send_AT("AT+QMTDISC=0", "+QMTDISC: 0,0")
Send_AT("AT+QMTCLOSE=0", "+QMTCLOSE: 0,0")
return True
def _Read_Until(timeout_ms=500):
t0 = time.ticks_ms()
buf = b""
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if LTE.any():
buf += LTE.read()
else:
time.sleep(0.01)
# _GPS_Service()
return buf
#***********************************************************
#
# PUBLISH DATA PROCESS
#
#***********************************************************
def Subscribe_Recv_Data(buf_bytes: bytes):
if not buf_bytes:
return False
global IDLE
global MQTT_COUNT
global PMAX
try:
full_msg_content = buf_bytes.decode("utf-8", "ignore") # 1. buf_bytes를 전체 디코드해서 'full_msg_content' 변수에 담자!
print(full_msg_content)
target_to_remove_prefix = '\r\n+QMTRECV: 0,0,"/message/eztrack/sub","' # 2. 제거하고 싶은 패턴을 정의 정확히 자름)
if full_msg_content.startswith(target_to_remove_prefix):
Org_Value = full_msg_content[len(target_to_remove_prefix):]
else:
Org_Value = full_msg_content
return False
print(Org_Value)
Org_Value = Org_Value.rstrip('"\r\n') # ⭐⭐ JSON 내부에 있는 \r\n (개행문자) 제거! ⭐⭐
json_string = Org_Value.replace('\r', '').replace('\n', '').replace('\t', '')
json_data = None # 일단 JSON 데이터 담을 변수 초기화
json_data = json.loads(json_string) # 클린된 문자열을 JSON으로 변환 시도!
if not isinstance(json_data, dict):
return False
if json_data["div"] == "Save" and json_data["imei"] != imei_Read():
print("내것이 맞다===================================")
if json_data["div"] == "Save":
print("=============== save Save file========================")
Work_Save(json_data)
if imei_Read() == False:
imei_Save(json_data["imei"])
return False
if json_data["div"] == "time":
print(json_data)
print("=============== time Save file========================")
Time_Save(json_data)
except:
print("Error json")
print(json_string)
def Time_Save( json_str ):
print("Time_save routine 이다.")
print(json_str)
save_file = 'time.txt' # JSON 전체를 저장할 파일 이름
try:
with open(save_file, 'w', encoding='utf-8') as f:
json.dump(json_str, f)
print(f"✅ 파일 '{save_file}'에 JSON 데이터 전체 저장 성공!")
return True
except OSError as e: # 파일 입출력 관련 에러
print(f"🚨 파일 '{save_file}'에 JSON 데이터 저장 중 에러 발생! ({e})")
return False
except Exception as e:
print(f"🤷♂️ JSON 파일 저장 중 예상치 못한 에러! ({e})")
print(f" > 발생 에러 상세: {e}") # 에러 내용을 더 자세히 출력
print(f" > 파일명: {save_file}, 시도된 데이터: {json_data}") # 어떤 데이터를 저장하려다 문제인지
return False # dump 함수에서 예외 발생 시 False 리턴
def Work_Save( json_str ):
print("Work_save routine 이다.")
print(json_str)
save_file = 'work.txt' # JSON 전체를 저장할 파일 이름
try:
with open(save_file, 'w', encoding='utf-8') as f:
json.dump(json_str, f)
print(f"✅ 파일 '{save_file}'에 JSON 데이터 전체 저장 성공!")
machine.reset()
return True
except OSError as e: # 파일 입출력 관련 에러
print(f"🚨 파일 '{save_file}'에 JSON 데이터 저장 중 에러 발생! ({e})")
return False
except Exception as e:
print(f"🤷♂️ JSON 파일 저장 중 예상치 못한 에러! ({e})")
print(f" > 발생 에러 상세: {e}") # 에러 내용을 더 자세히 출력
print(f" > 파일명: {save_file}, 시도된 데이터: {json_data}") # 어떤 데이터를 저장하려다 문제인지
return False # dump 함수에서 예외 발생 시 False 리턴
def MQTT_Subscribe_Window(sub_ms=50000):
need_reset = False
ok_open = Send_AT('AT+QMTOPEN=0,"{}",{}'.format(MQTT_SERVER, MQTT_PORT), b"+QMTOPEN: 0,0")
if not ok_open:
print("[AT+QMTOPEN=0] open fail"); need_reset = True
else:
# MQTT SERVER CONNECT
ok_conn = Send_AT('AT+QMTCONN=0,"{}"'.format(MQTT_CLIENT_ID), b"+QMTCONN: 0,0,0")
if not ok_conn:
print("[AT+QMTCONN=0] conn fail"); need_reset = True
Send_AT("AT+QMTCLOSE=0", b"+QMTCLOSE: 0,0")
else:
# MQTT SERVER OPEN OK SUBSCRITE
if Send_AT_Any('AT+QMTSUB=0,1,"{}",0'.format(MQTT_SUB_TOPIC),[b"OK", b"+QMTSUB:"], timeout=4000, max_retry=2):
print("[AT+QMTSUB=0,1] waiting {} ms...".format(sub_ms))
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < sub_ms:
buf = _Read_Until(250)
if buf and (b"+QMTRECV:" in buf):
Subscribe_Recv_Data(buf)
# _Wait_With_GPS(3000) # late packets
else:
print("[SUB FAIL] → will reset LTE")
need_reset = True
Send_AT("AT+QMTDISC=0" , b"+QMTDISC: 0,0")
Send_AT("AT+QMTCLOSE=0", b"+QMTCLOSE: 0,0")
if need_reset:
reset_line.value(1)
LTE_Module_Reset(hold_s=5, boot_wait_s=8)
if not LTE_Init_Once():
reset_line.value(0)
print("[RESET] LTE_Init_Once fail (retry next loop)")
def _GPS_NMEA_CheckSum_OK(s):
try:
if not s or s[0] != '$': return False
star = s.rfind('*')
if star == -1: return False
data = s[1:star]
checksum = 0
for c in data:
checksum ^= ord(c)
given = int(s[star+1:star+3], 16)
return checksum == given
except:
return True # 관대 처리
def _GPS_DM_To_Deg(dm, hemi):
if not dm: return None
try:
f = float(dm)
deg = int(f // 100)
minutes = f - deg * 100
dec = deg + minutes / 60.0
if hemi in ('S', 'W'):
dec = -dec
return dec
except:
return None
def _GPS_Parse_GPRMC(parts):
global _imei,_gps_last_lat, _gps_last_lon, _gps_last_date, _gps_last_time
if len(parts) < 10: return
status = parts[2]
raw_time = parts[1]
raw_lat, hemi_lat = parts[3], (parts[4] if len(parts) > 4 else None)
raw_lon, hemi_lon = parts[5], (parts[6] if len(parts) > 6 else None)
raw_date = parts[9]
if raw_time and len(raw_time) >= 6:
_gps_last_time = f"{raw_time[0:2]}:{raw_time[2:4]}:{raw_time[4:6]}"
if raw_date and len(raw_date) == 6:
_gps_last_date = f"20{raw_date[4:6]}-{raw_date[2:4]}-{raw_date[0:2]}"
lat = _GPS_DM_To_Deg(raw_lat, hemi_lat) if raw_lat and hemi_lat else None
lon = _GPS_DM_To_Deg(raw_lon, hemi_lon) if raw_lon and hemi_lon else None
if status == 'A':
if lat is not None: _gps_last_lat = lat
if lon is not None: _gps_last_lon = lon
def _GPS_Parse_GPGGA(parts):
global _gps_last_fix, _gps_last_sats, _gps_last_time, _gps_last_lat, _gps_last_lon
if len(parts) < 8: return
raw_time = parts[1]
if raw_time and len(raw_time) >= 6:
_gps_last_time = f"{raw_time[0:2]}:{raw_time[2:4]}:{raw_time[4:6]}"
raw_lat, hemi_lat = parts[2], (parts[3] if len(parts) > 3 else None)
raw_lon, hemi_lon = parts[4], (parts[5] if len(parts) > 5 else None)
lat = _GPS_DM_To_Deg(raw_lat, hemi_lat) if raw_lat and hemi_lat else None
lon = _GPS_DM_To_Deg(raw_lon, hemi_lon) if raw_lon and hemi_lon else None
if lat is not None: _gps_last_lat = lat
if lon is not None: _gps_last_lon = lon
try: _gps_last_fix = int(parts[6]) if parts[6] else None
except: _gps_last_fix = None
try: _gps_last_sats = int(parts[7]) if parts[7] else None
except: _gps_last_sats = None
def _GPS_Service():
"""루프 어디서든 자주 호출: 입력 누적/파싱 + 1초마다 출력"""
_Wait_With_GPS(1000)
global _gps_buf, _gps_last_print
# 1) 입력 누적 & 문장 파싱
if GPS_UART.any():
_gps_buf += GPS_UART.read()
if b'\n' in _gps_buf:
lines = _gps_buf.split(b'\n')
_gps_buf = lines[-1] # 마지막 조각 보존
for raw in lines[:-1]:
try:
s = raw.decode('ascii').strip()
except:
s = ""
if s.startswith('$') and _GPS_NMEA_CheckSum_OK(s):
body = s[1:s.find('*')] if '*' in s else s[1:]
parts = body.split(',')
head = parts[0]
if head.endswith("GPRMC") or head.endswith("GNRMC"):
_GPS_Parse_GPRMC(parts)
elif head.endswith("GPGGA") or head.endswith("GNGGA"):
_GPS_Parse_GPGGA(parts)
# 2) 1초마다 상태 출력(없어도 NA)
if time.ticks_diff(time.ticks_ms(), _gps_last_print) >= GPS_PRINT_EVERY_MS:
_gps_last_print = time.ticks_ms()
lat = _gps_last_lat
lon = _gps_last_lon
d = _gps_last_date
t = _gps_last_time
fx = _gps_last_fix
sv = _gps_last_sats
lat_s = ("{:.6f}".format(lat)) if isinstance(lat, (int, float)) else "NA"
lng_s = ("{:.6f}".format(lon)) if isinstance(lon, (int, float)) else "NA"
d = d if d else "NA"
t = t if t else "NA"
fx_s = str(fx) if fx is not None else "NA"
sv_s = str(sv) if sv is not None else "NA"
#ss = print('[GPS]', 'lat=', lat_s, 'lon=', lon_s, 'date=', d_s, 'time=', t_s, 'fix=', fx_s, 'sats=', sv_s)
global MQTT_MESSAGE
MQTT_MESSAGE ="changed message"
gJson['IMEI'] = imei_Read()
gJson['MMSI'] = '0123456789'
gJson['msg'] = 'Help>>> Sinktracking!!!'
gJson['lat'] = lat_s
gJson['lng'] = lng_s
gJson['date'] = d
gJson['time'] = t
gJson['fix'] = fx_s
gJson['sats'] = sv_s
gJson['captain'] = CAPTAIN
gJson['phone'] = PHONE
gJson['ship_name'] = SHIP_NAME
#msg = "div={},MMSI={},IMEI={},msg={},lat={},lon={},date={},time={},fix={},sats={},ship_name={}, captain={},phone={}".format("TEST","8888",IMEI,MQTT_MESSAGE,lat_s, lng_s, d, t, fx_s, sv_s, SHIP_NAME, CAPTAIN, PHONE)
#print(msg)
#MQTT_MESSAGE=MQTT_MESSAGE+lat_s
def _Wait_With_GPS(ms):
"""대기하면서 GPS 서비스도 계속 돌림"""
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < ms:
# _GPS_Service()
time.sleep(0.01)
#========================================================
#
# 부팅/메인(Main Routime)
#
#========================================================
if __name__ == '__main__':
print("main.py가 직접 실행되었습니다.")
else:
print("main.py는 모듈로 임포트되었습니다.")
LTE_RST.value(1)
print("[BOOT] LTE init...")
print ( DIV)
if DIV == 'loop' or 'test':
_thread.start_new_thread(Worker, ()) # 인자가 없으므로 빈 튜플 ()을 전달
else:
_thread
imei = imei_Read()
if imei == False:
IMEI = ""
else:
IMEI = imei
gJson = json.loads(iJson)
global gjson
MQTT_COUNT = 0
json_data = None
#-----------------------------------
# 기본정보는 save.txt 파일로 저장되어 있다
#-----------------------------------
data_dic = Work_Read()
if isinstance(data_dic, dict):
gJson['MMSI'] = data_dic['MMSI']
gJson['captain'] = data_dic['Captain']
gJson['phone'] = data_dic['Phone']
gJson['ship_name'] = data_dic['Ship_Name']
PMAX = int(data_dic["Max"])
if PMAX > 0:
DIV = 'test'
print(DIV)
print(PMAX)
print('====== pmax===========')
print("===========LTE_Init_Once====================")
while True:
MQTT_Subscribe_Window(sub_ms=50000)
ok = MQTT_Publish_Once(1)
if not ok:
print("[WARN] publish cycle failed (Send_AT 2회 정책으로 skip됨)")
for _ in range(PUBLISH_INTERVAL_S):
Blink_LED(0.01)
# _GPS_Service()
# _Wait_With_GPS(1000) # 1s
import time
import json
import os
import neopixel
import _thread
#import datetime # 날짜와 시간 생성을 위해 import
from datetime import datetime
#======================= MQTT ==========================
MQTT_SERVER = "broker.eztrack.co.kr"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "sunlink"
MQTT_PUB_TOPIC = "/message/eztrack/pub"
MQTT_SUB_TOPIC = "/message/eztrack/sub"
MQTT_MESSAGE = "Receive my Message?" # 요청 반영
MQTT_COUNT = 0
SHIP_NAME = ""
PHONE = ""
CAPTAIN = ""
MMSI = ""
ACTTIVE_TIME = ""
PMAX = 0
DIV = "loop"
#----------------------WS2812 LED Color-------------------------
NUM_LEDS = 4
LED_PIN = 2 # 필요 시 변경 (예: 2 = GP2)
BRIGHTNESS = 150 # 0.0 ~ 1.0 (dim to bright)
BLUE = (0 , 0 , 255)
YELLOW = (255, 255, 0)
OFF = (0 , 0 , 0)
# ===================== HW: UART/LED =====================
LTE = UART(0, baudrate=115200, tx=Pin(0), rx=Pin(1))
led = Pin(25, Pin.OUT)
# ================= LTE 모듈 RESET GPIO =================
LTE_RESET_GPIO = 20 # 필요 시 변경 (예: 2 = GP2)
PWR_GPIO = 15
LTE_RESET_ACTIVE_LOW = True # 대부분 Active-Low
_RST_INACTIVE = 0 if LTE_RESET_ACTIVE_LOW else 1
_RST_ACTIVE = 1 - _RST_INACTIVE
LTE_RST = Pin(LTE_RESET_GPIO, Pin.OUT, value=_RST_INACTIVE)
#===========================================================
msg = ""
IMEI = "";
#************************ GPS *****************************
gJson = {}
sJson = {}
iJson = '''{"div":"TST"
, "MMSI":""
, "IMEI":""
, "msg":""
, "lat":0.0
, "lon":0.0
, "date":""
, "time":""
, "fix":""
, "sats":""
, "ship_name":""
, "captain":""
, "phone":""}'''
#-------------------------json----------------------------
# UART1: GPS TXD -> GPIO9 (RX), GPS RXD <- GPIO8 (TX, 선택)
GPS_UART_ID = 1
GPS_BAUD = 9600
GPS_TX_PIN = 8 # Pico TX → GPS RX (optional)
GPS_RX_PIN = 9 # Pico RX ← GPS TX
GPS_PRINT_EVERY_MS = 1000
GPS_UART = UART(GPS_UART_ID, baudrate=GPS_BAUD, tx=Pin(GPS_TX_PIN), rx=Pin(GPS_RX_PIN))
#-------------------------------------------------------------------------------------------
_gps_buf = b""
_gps_last_lat = None
_gps_last_lon = None
_gps_last_date = None
_gps_last_time = None
_gps_last_fix = None
_gps_last_sats = None
_gps_last_print = time.ticks_ms()
np = neopixel.NeoPixel(Pin(LED_PIN, Pin.OUT), NUM_LEDS)
def imei_Save(imei: str) -> bool: # 저장 성공/실패 여부를 반환하도록!
imei_file_path = 'imei.txt'
try:
with open(imei_file_path, 'w') as f:
f.write(imei.strip())
f.write("\n")
print(f"✅ 파일 '{imei_file_path}'에 IMEI 저장 성공! (덮어쓰기 됨)") # 🚨 메시지도 '저장'으로 변경!
return True
except Exception as e:
print(f"❌ 파일 '{imei_file_path}' 저장 실패: {e}")
return False
def imei_Read() -> str:
imei_str = ''
save_file = 'imei.txt'
try:
os.stat(save_file) # 파일 정보 가져오기 시도 (MicroPython 방식)
file_exists = True
print(f"야!.... '{save_file}' 파일 있다, 있어!")
except OSError:
print(f"젠장... '{save_file}' 파일 없어. 다시 확인해봐.")
return False
try:
with open(save_file, 'r') as f:
print(f"\n--- '{save_file}'의 현재 전체 내용 ---")
imei_str = f.readline()
imei_str = imei_str.strip()
return imei_str
except FileNotFoundError:
print(f"❌ '{save_file}' 파일을 찾을 수 없습니다.")
return ""
except Exception as e:
print(f"❌ 파일 읽기 중 알 수 없는 에러 발생: {e}")
def Work_Read() -> str:
global PMAX # 얘네들은 global이라고 미리 박아놔야 돼
global MQTT_COUNT
save_file = 'work.txt'
try:
# 파일이 없을 수도 있으니 통째로 try-except로 감싸는 게 좋아
with open(save_file, 'r') as f:
read_value = f.readline()
read_strip = read_value.strip()
data_dict = json.loads(read_strip)
print(f"\n--- '{save_file}'의 현재 전체 내용 ---") # 'json_file' 대신 'save_file'로 고침
print(f"PMAX: {PMAX}") # 값 확인용으로 변수 이름도 같이 출력하는 게 좋아
print(f"MQTT_COUNT: {MQTT_COUNT}")
print('-------------- json pass --------')
return data_dict
except OSError as e: # FileNotFoundError 대신 OSError로 잡는 게 안전빵임
if e.args[0] == 2: # MicroPython에서 errno 2는 ENOENT (파일이나 디렉토리가 없음)
print(f"❌ '{save_file}' 파일을 찾을 수 없습니다. (에러 코드: {e.args[0]})")
else:
print(f"❌ 파일 접근 중 알 수 없는 OSError 발생: {e}")
return False
except ValueError as e: # json.loads() 에러 잡기. 예를 들어, 파일 내용이 JSON 형식이 아닐 때
print(read_value)
print(f"❌ '{save_file}' 파일 내용이 JSON 형식이 아닙니다: {e}")
return False
except Exception as e: # 혹시 모를 다른 모든 에러를 대비
print(f"❌ 파일 읽기 중 알 수 없는 에러 발생: {e}")
return False
def Time_Read() -> str:
global PMAX # 얘네들은 global이라고 미리 박아놔야 돼
global MQTT_COUNT
save_file = 'time.txt'
try:
# 파일이 없을 수도 있으니 통째로 try-except로 감싸는 게 좋아
with open(save_file, 'r') as f:
read_value = f.readline()
read_strip = read_value.strip()
data_dict = json.loads(read_strip)
print(f"\n--- '{save_file}'의 현재 전체 내용 ---") # 'json_file' 대신 'save_file'로 고침
print(f"PMAX: {PMAX}") # 값 확인용으로 변수 이름도 같이 출력하는 게 좋아
print(f"MQTT_COUNT: {MQTT_COUNT}")
print('-------------- json pass --------')
return data_dict
except OSError as e: # FileNotFoundError 대신 OSError로 잡는 게 안전빵임
if e.args[0] == 2: # MicroPython에서 errno 2는 ENOENT (파일이나 디렉토리가 없음)
print(f"❌ '{save_file}' 파일을 찾을 수 없습니다. (에러 코드: {e.args[0]})")
else:
print(f"❌ 파일 접근 중 알 수 없는 OSError 발생: {e}")
return False
except ValueError as e: # json.loads() 에러 잡기. 예를 들어, 파일 내용이 JSON 형식이 아닐 때
print(read_value)
print(f"❌ '{save_file}' 파일 내용이 JSON 형식이 아닙니다: {e}")
return False
except Exception as e: # 혹시 모를 다른 모든 에러를 대비
print(f"❌ 파일 읽기 중 알 수 없는 에러 발생: {e}")
return False
def scale(color, brightness=1.0):
r, g, b = color
return (int(r * brightness), int(g * brightness), int(b * brightness))
def fill(color):
c = scale(color, BRIGHTNESS)
for i in range(NUM_LEDS):
np[i] = c
np.write()
# --- 스레드 중지 플래그 및 락 (대신 사용) ---
global_stop_flag = False # 스레드 중지를 위한 전역 플래그
global_stop_flag_lock = _thread.allocate_lock() # 전역 플래그 접근을 위한 락
def Worker():
print("✨ Worker 스레드 (NeoPixel 제어) 시작되었습니다! ✨")
try:
while True:
global_stop_flag_lock.acquire() # 순차적 처리 경
if global_stop_flag: # 플래그가 True이면 루프 탈출
global_stop_flag_lock.release()
break
global_stop_flag_lock.release()
fill(BLUE)
time.sleep(1)
# sleep 후 다시 플래그 확인 (안전을 위해)
global_stop_flag_lock.acquire()
if global_stop_flag:
global_stop_flag_lock.release()
break
global_stop_flag_lock.release()
fill(OFF)
time.sleep(1)
global_stop_flag_lock.acquire()
if global_stop_flag:
global_stop_flag_lock.release()
break
global_stop_flag_lock.release()
fill(YELLOW)
time.sleep(1)
global_stop_flag_lock.acquire()
if global_stop_flag:
global_stop_flag_lock.release()
break
global_stop_flag_lock.release()
fill(OFF)
time.sleep(1)
global_stop_flag_lock.acquire()
if global_stop_flag:
global_stop_flag_lock.release()
break
global_stop_flag_lock.release()
except Exception as e:
print(f"❌ Worker 스레드 오류 발생: {e} ❌")
finally:
fill(OFF) # 스레드 종료 시 NeoPixel 끄기
print("✅ Worker 스레드가 종료 신호를 받고 종료합니다. ✅")
def LTE_Module_Reset(hold_s=5, boot_wait_s=8):
print("[RESET] LTE module reset start (hold {}s)".format(hold_s))
LTE_RST.value(_RST_ACTIVE)
# _Wait_With_GPS(hold_s * 1000) # 기다리는 동안에도 GPS 서비스
LTE_RST.value(_RST_INACTIVE)
print("[RESET] released, boot wait {}s".format(boot_wait_s))
# _Wait_With_GPS(boot_wait_s * 1000)
PUBLISH_INTERVAL_S = 10 # 퍼블리시 주기(초)
reset_line = Pin(15, Pin.OUT) # 핀을 출력 모드로 설정
# ==================== 유틸/공용 함수 ===================
def Blink_LED(duration=0.05):
led.value(1); time.sleep(duration); led.value(0)
def _to_bytes(x):
if isinstance(x, (bytes, bytearray)):
return bytes(x)
return str(x).encode()
def Send_AT(cmd, expected, timeout=3000, max_retry=2):
"""
AT 명령: 최대 2회 재시도, expected(단일 패턴) 포함 시 성공.
스레드 없이, 대기 루프 중에도 GPS 서비스 계속 호출.
"""
exp = _to_bytes(expected)
attempt = 0
while attempt < max_retry:
attempt += 1
Blink_LED(0.1)
LTE.write(cmd + "\r")
t0 = time.ticks_ms()
buf = b""
while time.ticks_diff(time.ticks_ms(), t0) < timeout:
if LTE.any():
Blink_LED(0.02)
buf += LTE.read()
if exp in buf:
print("[OK-{}] {}".format(attempt, cmd))
return True
time.sleep(0.005)
print("[FAIL-{}] {} → wait 3s".format(attempt, cmd))
print("[SKIP]", cmd, "(expected:", exp, ")")
return False
# --- IMEI를 찾기 위한 함수 ---
def Get_IMEI(uart_obj, line_ending='\r', command_timeout_ms=5000):
cmd = b'AT+CGSN' + line_ending.encode('ascii') # AT+CGSN 명령어 바이트
print(f"\n[UART] -> AT 명령어 전송: {cmd.decode('ascii').strip().replace('\r', '\\r').replace('\n', '\\n')}")
uart_obj.write(cmd) # 명령어 전송
start_time_ms = time.ticks_ms() # 현재 시간 (밀리초) 기록
received_lines = []
imei_result = None
# 응답 기다리기
while time.ticks_diff(time.ticks_ms(), start_time_ms) < command_timeout_ms:
if uart_obj.any(): # 수신 버퍼에 데이터가 있다면
try:
line = uart_obj.readline().decode('utf-8').strip() # 한 줄 읽고 디코딩
except UnicodeError: # 디코딩 실패 시 Latin-1으로 재시도 (혹은 무시)
line = uart_obj.readline().decode('latin-1', 'ignore').strip()
if line: # 빈 라인이 아니면
received_lines.append(line)
print(f"[UART] <- 수신: {line}") # ✨ 여기! 수신 라인 즉시 출력! ✨
# 15자리 숫자로 된 IMEI 찾기
if len(line) == 15 and line.isdigit():
imei_result = line
print("[UART] ✔ IMEI 패턴 발견!")
break # 찾았으면 루프 종료
# 'OK' 또는 'ERROR'가 오면 응답 종료로 판단
if "OK" in line or "ERROR" in line:
break # 루프 종료
time.sleep_ms(10) # 짧게 대기
print("[UART] --- 응답 수신 종료 ---")
if imei_result:
print(f"\n[UART] ✅ 최종 획득한 IMEI: {imei_result}")
imei_value= imei_result
return imei_result
else:
print("\n[UART] ❌ IMEI를 응답에서 찾지 못했습니다.")
if not received_lines:
print(" (아무런 응답도 받지 못했습니다. 모듈 연결, 전원, Baud Rate 확인!)")
elif "ERROR" in "\n".join(received_lines):
print(" (모듈에서 'ERROR' 응답을 받았습니다. AT 명령어 또는 모듈 상태 확인!)")
else:
print(" (응답은 있었으나 15자리 IMEI 패턴을 찾지 못했습니다.)")
return None
def Send_AT_Any(cmd, expecteds, timeout=3000, max_retry=2):
"""
AT 명령: 최대 2회 재시도, expecteds 중 하나라도 매칭되면 성공.
"""
exps = expecteds if isinstance(expecteds, (list, tuple)) else [expecteds]
exps = [_to_bytes(e) for e in exps]
attempt = 0
while attempt < max_retry:
attempt += 1
Blink_LED(0.1)
LTE.write(cmd + "\r")
t0 = time.ticks_ms()
buf = b""
while time.ticks_diff(time.ticks_ms(), t0) < timeout:
if LTE.any():
Blink_LED(0.02)
buf += LTE.read()
for ex in exps:
if ex in buf:
print("[OK-{}] {}".format(attempt, cmd))
return True
# _GPS_Service()
time.sleep(0.005)
print("[FAIL-{}] {} → wait 3s".format(attempt, cmd))
# _Wait_With_GPS(3000)
print("[SKIP]", cmd, "(expected one of:", exps, ")")
return False
#print("# ================= LTE Initial ======================")
def LTE_Init_Once():
if not Send_AT("AT" , "OK"): return False
if not Send_AT("ATE0" , "OK"): return False
if not Send_AT("AT+CPIN?" , "READY"): return False
if not Send_AT("AT+CGATT?", "+CGATT: 1"):
LTE_RST.value(0)
print("[WARN] CGATT attach 실패 → skip 진행")
if not Send_AT('AT+CGDCONT=1,"IP","connect.cxn"', "OK"): return False
if not Send_AT("AT+QIACT=1", "OK"): return False
return True
#print("# ============== MQTT 한 사이클 퍼블리시 =================")
#***********************************************************
#
# SUNSCRIBE DATA PROCESS
#
#***********************************************************
def MQTT_Publish_Once(loop_cnt):
if not Send_AT('AT+QMTOPEN=0,"{}",{}'.format(MQTT_SERVER, MQTT_PORT), "+QMTOPEN: 0,0"):
return False
if not Send_AT('AT+QMTCONN=0,"{}"'.format(MQTT_CLIENT_ID), "+QMTCONN: 0,0,0"):
Send_AT("AT+QMTCLOSE=0", "+QMTCLOSE: 0,0")
return False
if not Send_AT('AT+QMTPUB=0,0,0,0,"{}"'.format(MQTT_PUB_TOPIC), ">"):
Send_AT("AT+QMTDISC=0", "+QMTDISC: 0,0")
Send_AT("AT+QMTCLOSE=0", "+QMTCLOSE: 0,0")
return False
_GPS_Service()
global MQTT_COUNT
global DIV
global PMAX
Blink_LED(0.05)
# global gJson
msg = json.dumps(gJson)
if DIV == "loop":
print(f'-----나다LOOP ---------- PUBLISH COUNT -{MQTT_COUNT}--------------------')
LTE.write(msg + "\x1A")
print("==========={MQTT_COUNT}=========")
print("================={PMAX}=========")
if DIV == "test" and MQTT_COUNT <= PMAX:
LTE.write(msg + "\x1A")
MQTT_COUNT += 1
print(f'-----나다TEST ---------- PUBLISH COUNT -{MQTT_COUNT}--------------------')
else:
DIV = 'sleep'
MQTT_COUNT = 0
# _Wait_With_GPS(2000) # 전송 후 여유
Send_AT("AT+QMTDISC=0", "+QMTDISC: 0,0")
Send_AT("AT+QMTCLOSE=0", "+QMTCLOSE: 0,0")
return True
def _Read_Until(timeout_ms=500):
t0 = time.ticks_ms()
buf = b""
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if LTE.any():
buf += LTE.read()
else:
time.sleep(0.01)
# _GPS_Service()
return buf
#***********************************************************
#
# PUBLISH DATA PROCESS
#
#***********************************************************
def Subscribe_Recv_Data(buf_bytes: bytes):
if not buf_bytes:
return False
global IDLE
global MQTT_COUNT
global PMAX
try:
full_msg_content = buf_bytes.decode("utf-8", "ignore") # 1. buf_bytes를 전체 디코드해서 'full_msg_content' 변수에 담자!
print(full_msg_content)
target_to_remove_prefix = '\r\n+QMTRECV: 0,0,"/message/eztrack/sub","' # 2. 제거하고 싶은 패턴을 정의 정확히 자름)
if full_msg_content.startswith(target_to_remove_prefix):
Org_Value = full_msg_content[len(target_to_remove_prefix):]
else:
Org_Value = full_msg_content
return False
print(Org_Value)
Org_Value = Org_Value.rstrip('"\r\n') # ⭐⭐ JSON 내부에 있는 \r\n (개행문자) 제거! ⭐⭐
json_string = Org_Value.replace('\r', '').replace('\n', '').replace('\t', '')
json_data = None # 일단 JSON 데이터 담을 변수 초기화
json_data = json.loads(json_string) # 클린된 문자열을 JSON으로 변환 시도!
if not isinstance(json_data, dict):
return False
if json_data["div"] == "Save" and json_data["imei"] != imei_Read():
print("내것이 맞다===================================")
if json_data["div"] == "Save":
print("=============== save Save file========================")
Work_Save(json_data)
if imei_Read() == False:
imei_Save(json_data["imei"])
return False
if json_data["div"] == "time":
print(json_data)
print("=============== time Save file========================")
Time_Save(json_data)
except:
print("Error json")
print(json_string)
def Time_Save( json_str ):
print("Time_save routine 이다.")
print(json_str)
save_file = 'time.txt' # JSON 전체를 저장할 파일 이름
try:
with open(save_file, 'w', encoding='utf-8') as f:
json.dump(json_str, f)
print(f"✅ 파일 '{save_file}'에 JSON 데이터 전체 저장 성공!")
return True
except OSError as e: # 파일 입출력 관련 에러
print(f"🚨 파일 '{save_file}'에 JSON 데이터 저장 중 에러 발생! ({e})")
return False
except Exception as e:
print(f"🤷♂️ JSON 파일 저장 중 예상치 못한 에러! ({e})")
print(f" > 발생 에러 상세: {e}") # 에러 내용을 더 자세히 출력
print(f" > 파일명: {save_file}, 시도된 데이터: {json_data}") # 어떤 데이터를 저장하려다 문제인지
return False # dump 함수에서 예외 발생 시 False 리턴
def Work_Save( json_str ):
print("Work_save routine 이다.")
print(json_str)
save_file = 'work.txt' # JSON 전체를 저장할 파일 이름
try:
with open(save_file, 'w', encoding='utf-8') as f:
json.dump(json_str, f)
print(f"✅ 파일 '{save_file}'에 JSON 데이터 전체 저장 성공!")
machine.reset()
return True
except OSError as e: # 파일 입출력 관련 에러
print(f"🚨 파일 '{save_file}'에 JSON 데이터 저장 중 에러 발생! ({e})")
return False
except Exception as e:
print(f"🤷♂️ JSON 파일 저장 중 예상치 못한 에러! ({e})")
print(f" > 발생 에러 상세: {e}") # 에러 내용을 더 자세히 출력
print(f" > 파일명: {save_file}, 시도된 데이터: {json_data}") # 어떤 데이터를 저장하려다 문제인지
return False # dump 함수에서 예외 발생 시 False 리턴
def MQTT_Subscribe_Window(sub_ms=50000):
need_reset = False
ok_open = Send_AT('AT+QMTOPEN=0,"{}",{}'.format(MQTT_SERVER, MQTT_PORT), b"+QMTOPEN: 0,0")
if not ok_open:
print("[AT+QMTOPEN=0] open fail"); need_reset = True
else:
# MQTT SERVER CONNECT
ok_conn = Send_AT('AT+QMTCONN=0,"{}"'.format(MQTT_CLIENT_ID), b"+QMTCONN: 0,0,0")
if not ok_conn:
print("[AT+QMTCONN=0] conn fail"); need_reset = True
Send_AT("AT+QMTCLOSE=0", b"+QMTCLOSE: 0,0")
else:
# MQTT SERVER OPEN OK SUBSCRITE
if Send_AT_Any('AT+QMTSUB=0,1,"{}",0'.format(MQTT_SUB_TOPIC),[b"OK", b"+QMTSUB:"], timeout=4000, max_retry=2):
print("[AT+QMTSUB=0,1] waiting {} ms...".format(sub_ms))
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < sub_ms:
buf = _Read_Until(250)
if buf and (b"+QMTRECV:" in buf):
Subscribe_Recv_Data(buf)
# _Wait_With_GPS(3000) # late packets
else:
print("[SUB FAIL] → will reset LTE")
need_reset = True
Send_AT("AT+QMTDISC=0" , b"+QMTDISC: 0,0")
Send_AT("AT+QMTCLOSE=0", b"+QMTCLOSE: 0,0")
if need_reset:
reset_line.value(1)
LTE_Module_Reset(hold_s=5, boot_wait_s=8)
if not LTE_Init_Once():
reset_line.value(0)
print("[RESET] LTE_Init_Once fail (retry next loop)")
def _GPS_NMEA_CheckSum_OK(s):
try:
if not s or s[0] != '$': return False
star = s.rfind('*')
if star == -1: return False
data = s[1:star]
checksum = 0
for c in data:
checksum ^= ord(c)
given = int(s[star+1:star+3], 16)
return checksum == given
except:
return True # 관대 처리
def _GPS_DM_To_Deg(dm, hemi):
if not dm: return None
try:
f = float(dm)
deg = int(f // 100)
minutes = f - deg * 100
dec = deg + minutes / 60.0
if hemi in ('S', 'W'):
dec = -dec
return dec
except:
return None
def _GPS_Parse_GPRMC(parts):
global _imei,_gps_last_lat, _gps_last_lon, _gps_last_date, _gps_last_time
if len(parts) < 10: return
status = parts[2]
raw_time = parts[1]
raw_lat, hemi_lat = parts[3], (parts[4] if len(parts) > 4 else None)
raw_lon, hemi_lon = parts[5], (parts[6] if len(parts) > 6 else None)
raw_date = parts[9]
if raw_time and len(raw_time) >= 6:
_gps_last_time = f"{raw_time[0:2]}:{raw_time[2:4]}:{raw_time[4:6]}"
if raw_date and len(raw_date) == 6:
_gps_last_date = f"20{raw_date[4:6]}-{raw_date[2:4]}-{raw_date[0:2]}"
lat = _GPS_DM_To_Deg(raw_lat, hemi_lat) if raw_lat and hemi_lat else None
lon = _GPS_DM_To_Deg(raw_lon, hemi_lon) if raw_lon and hemi_lon else None
if status == 'A':
if lat is not None: _gps_last_lat = lat
if lon is not None: _gps_last_lon = lon
def _GPS_Parse_GPGGA(parts):
global _gps_last_fix, _gps_last_sats, _gps_last_time, _gps_last_lat, _gps_last_lon
if len(parts) < 8: return
raw_time = parts[1]
if raw_time and len(raw_time) >= 6:
_gps_last_time = f"{raw_time[0:2]}:{raw_time[2:4]}:{raw_time[4:6]}"
raw_lat, hemi_lat = parts[2], (parts[3] if len(parts) > 3 else None)
raw_lon, hemi_lon = parts[4], (parts[5] if len(parts) > 5 else None)
lat = _GPS_DM_To_Deg(raw_lat, hemi_lat) if raw_lat and hemi_lat else None
lon = _GPS_DM_To_Deg(raw_lon, hemi_lon) if raw_lon and hemi_lon else None
if lat is not None: _gps_last_lat = lat
if lon is not None: _gps_last_lon = lon
try: _gps_last_fix = int(parts[6]) if parts[6] else None
except: _gps_last_fix = None
try: _gps_last_sats = int(parts[7]) if parts[7] else None
except: _gps_last_sats = None
def _GPS_Service():
"""루프 어디서든 자주 호출: 입력 누적/파싱 + 1초마다 출력"""
_Wait_With_GPS(1000)
global _gps_buf, _gps_last_print
# 1) 입력 누적 & 문장 파싱
if GPS_UART.any():
_gps_buf += GPS_UART.read()
if b'\n' in _gps_buf:
lines = _gps_buf.split(b'\n')
_gps_buf = lines[-1] # 마지막 조각 보존
for raw in lines[:-1]:
try:
s = raw.decode('ascii').strip()
except:
s = ""
if s.startswith('$') and _GPS_NMEA_CheckSum_OK(s):
body = s[1:s.find('*')] if '*' in s else s[1:]
parts = body.split(',')
head = parts[0]
if head.endswith("GPRMC") or head.endswith("GNRMC"):
_GPS_Parse_GPRMC(parts)
elif head.endswith("GPGGA") or head.endswith("GNGGA"):
_GPS_Parse_GPGGA(parts)
# 2) 1초마다 상태 출력(없어도 NA)
if time.ticks_diff(time.ticks_ms(), _gps_last_print) >= GPS_PRINT_EVERY_MS:
_gps_last_print = time.ticks_ms()
lat = _gps_last_lat
lon = _gps_last_lon
d = _gps_last_date
t = _gps_last_time
fx = _gps_last_fix
sv = _gps_last_sats
lat_s = ("{:.6f}".format(lat)) if isinstance(lat, (int, float)) else "NA"
lng_s = ("{:.6f}".format(lon)) if isinstance(lon, (int, float)) else "NA"
d = d if d else "NA"
t = t if t else "NA"
fx_s = str(fx) if fx is not None else "NA"
sv_s = str(sv) if sv is not None else "NA"
#ss = print('[GPS]', 'lat=', lat_s, 'lon=', lon_s, 'date=', d_s, 'time=', t_s, 'fix=', fx_s, 'sats=', sv_s)
global MQTT_MESSAGE
MQTT_MESSAGE ="changed message"
gJson['IMEI'] = imei_Read()
gJson['MMSI'] = '0123456789'
gJson['msg'] = 'Help>>> Sinktracking!!!'
gJson['lat'] = lat_s
gJson['lng'] = lng_s
gJson['date'] = d
gJson['time'] = t
gJson['fix'] = fx_s
gJson['sats'] = sv_s
gJson['captain'] = CAPTAIN
gJson['phone'] = PHONE
gJson['ship_name'] = SHIP_NAME
#msg = "div={},MMSI={},IMEI={},msg={},lat={},lon={},date={},time={},fix={},sats={},ship_name={}, captain={},phone={}".format("TEST","8888",IMEI,MQTT_MESSAGE,lat_s, lng_s, d, t, fx_s, sv_s, SHIP_NAME, CAPTAIN, PHONE)
#print(msg)
#MQTT_MESSAGE=MQTT_MESSAGE+lat_s
def _Wait_With_GPS(ms):
"""대기하면서 GPS 서비스도 계속 돌림"""
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < ms:
# _GPS_Service()
time.sleep(0.01)
#========================================================
#
# 부팅/메인(Main Routime)
#
#========================================================
if __name__ == '__main__':
print("main.py가 직접 실행되었습니다.")
else:
print("main.py는 모듈로 임포트되었습니다.")
LTE_RST.value(1)
print("[BOOT] LTE init...")
print ( DIV)
if DIV == 'loop' or 'test':
_thread.start_new_thread(Worker, ()) # 인자가 없으므로 빈 튜플 ()을 전달
else:
_thread
imei = imei_Read()
if imei == False:
IMEI = ""
else:
IMEI = imei
gJson = json.loads(iJson)
global gjson
MQTT_COUNT = 0
json_data = None
#-----------------------------------
# 기본정보는 save.txt 파일로 저장되어 있다
#-----------------------------------
data_dic = Work_Read()
if isinstance(data_dic, dict):
gJson['MMSI'] = data_dic['MMSI']
gJson['captain'] = data_dic['Captain']
gJson['phone'] = data_dic['Phone']
gJson['ship_name'] = data_dic['Ship_Name']
PMAX = int(data_dic["Max"])
if PMAX > 0:
DIV = 'test'
print(DIV)
print(PMAX)
print('====== pmax===========')
print("===========LTE_Init_Once====================")
while True:
MQTT_Subscribe_Window(sub_ms=50000)
ok = MQTT_Publish_Once(1)
if not ok:
print("[WARN] publish cycle failed (Send_AT 2회 정책으로 skip됨)")
for _ in range(PUBLISH_INTERVAL_S):
Blink_LED(0.01)
# _GPS_Service()
# _Wait_With_GPS(1000) # 1s