Navicat连接配置信息还原并导出文件
脚本
#!/usr/bin/env python3
# -*- coding: utf-8 -*-"""
文件名: np_recover.py
功能: Navicat 配置信息还原
作者: hou.ys
创建日期: 2025-9-9
版本: 1.0
"""# pip install pycryptodomeimport hashlib
from Crypto.Cipher import AES, Blowfish
import binasciiimport pandas as pd
import xml.etree.ElementTree as ETimport clickclass NavicatPassword:def __init__(self, version=12):self.version = versionself.aesKey = b'libcckeylibcckey'self.aesIv = b'libcciv libcciv 'self.blowString = '3DC5CA39'self.blowKey = hashlib.sha1(self.blowString.encode()).digest()self.blowIv = binascii.unhexlify('d9c7c3c8870d64bd')def encrypt(self, string):result = Falseif self.version == 11:result = self.encryptEleven(string)elif self.version == 12:result = self.encryptTwelve(string)return resultdef encryptEleven(self, string):string = string.encode()round_val = len(string) // 8leftLength = len(string) % 8result = b''currentVector = self.blowIvfor i in range(round_val):temp = self.encryptBlock(self.xorBytes(string[8*i:8*i+8], currentVector))currentVector = self.xorBytes(currentVector, temp)result += tempif leftLength:currentVector = self.encryptBlock(currentVector)result += self.xorBytes(string[8*round_val:8*round_val+leftLength], currentVector)return binascii.hexlify(result).upper().decode()def encryptBlock(self, block):cipher = Blowfish.new(self.blowKey, Blowfish.MODE_ECB)return cipher.encrypt(block)def decryptBlock(self, block):cipher = Blowfish.new(self.blowKey, Blowfish.MODE_ECB)return cipher.decrypt(block)def xorBytes(self, str1, str2):result = b''min_len = min(len(str1), len(str2))for i in range(min_len):result += bytes([str1[i] ^ str2[i]])return resultdef encryptTwelve(self, string):string = string.encode()cipher = AES.new(self.aesKey, AES.MODE_CBC, self.aesIv)encrypted = cipher.encrypt(self.pad(string))return binascii.hexlify(encrypted).upper().decode()def pad(self, s):block_size = AES.block_sizereturn s + (block_size - len(s) % block_size) * bytes([block_size - len(s) % block_size])# def decrypt(self, string):# print(string)# result = False# if self.version == 11:# result = self.decryptEleven(string)# elif self.version == 12:# result = self.decryptTwelve(string)# return resultdef decrypt(self, string):try:print(f"正在解密字符串: {string}")result = Falseif self.version == 11:result = self.decryptEleven(string)elif self.version == 12:result = self.decryptTwelve(string)else:raise ValueError(f"不支持的版本: {self.version}")print(f"解密成功: {result}")return resultexcept binascii.Error as e:print(f"十六进制解码错误: {e}, 输入: {string}")return "解密失败: 无效的十六进制格式"except ValueError as e:print(f"值错误: {e}, 输入: {string}")return f"解密失败: {e}"except Exception as e:print(f"解密过程中发生未知错误: {e}, 输入: {string}")return "解密失败: 未知错误"def decryptEleven(self, upperString):string = binascii.unhexlify(upperString.lower())round_val = len(string) // 8leftLength = len(string) % 8result = b''currentVector = self.blowIvfor i in range(round_val):encryptedBlock = string[8*i:8*i+8]temp = self.xorBytes(self.decryptBlock(encryptedBlock), currentVector)currentVector = self.xorBytes(currentVector, encryptedBlock)result += tempif leftLength:currentVector = self.encryptBlock(currentVector)result += self.xorBytes(string[8*round_val:8*round_val+leftLength], currentVector)return result.decode(errors='ignore')def decryptTwelve(self, upperString):string = binascii.unhexlify(upperString.lower())cipher = AES.new(self.aesKey, AES.MODE_CBC, self.aesIv)decrypted = cipher.decrypt(string)return self.unpad(decrypted).decode()def unpad(self, s):return s[:-s[-1]]# 从文件读取XML
def parse_xml_file(file_path):tree = ET.parse(file_path)root = tree.getroot()connections_data = []for connection in root.findall('Connection'):connection_data = connection.attribconnections_data.append(connection_data)return pd.DataFrame(connections_data)def generate_jdbc_url(df, extra_params=None):conn_type = df['ConnType']host = df['Host']port = df['Port']database = df['Database']user = df['UserName']password = df['Password']"""根据数据库类型生成JDBC URL参数:conn_type: 数据库类型 (MYSQL, POSTGRESQL, ORACLE, MARIADB, REDIS)host: 主机地址port: 端口号database: 数据库名user: 用户名password: 密码extra_params: 额外参数字典返回:JDBC URL字符串"""if extra_params is None:extra_params = {}conn_type = conn_type.upper()if conn_type in ['MYSQL', 'MARIADB']:# MySQL和MariaDB使用相同的JDBC驱动base_url = f"jdbc:mysql://{host}:{port}/"if database:base_url += str(database)params = {'useSSL': 'false','allowPublicKeyRetrieval': 'true','serverTimezone': 'UTC'}params.update(extra_params)param_str = '&'.join([f"{k}={v}" for k, v in params.items()])return f"{base_url}?{param_str}"elif conn_type == 'POSTGRESQL':# PostgreSQLbase_url = f"jdbc:postgresql://{host}:{port}/"if database:base_url += str(database)params = {# 'ApplicationName': 'navicat'}params.update(extra_params)param_str = '&'.join([f"{k}={v}" for k, v in params.items()])return f"{base_url}?{param_str}" if param_str else base_url@click.command()
@click.option('--file', '-f', default='connections.ncx', help='文件路径')
def main(file):df = parse_xml_file(file)navicatPassword = NavicatPassword(12)df['rawpwd'] = df['Password'].apply(navicatPassword.decrypt)df['url'] = df.apply(generate_jdbc_url, axis=1)df = df[['ConnType','Host','Port','UserName','Password','Database', 'rawpwd', 'url']]df.to_csv('dbinfo.csv', index=False)if __name__ == '__main__':main()
使用
python .\np_recover.py -f connections.ncx