"""Local WebSocket bridge for placing MT5 bulk orders from a browser form. Run: python layering/server.py """ from __future__ import annotations import asyncio import glob import importlib.util import json import logging import math import os import subprocess import sys import time from dataclasses import asdict, dataclass from typing import Any def _auto_install_requirements_if_needed() -> None: dependency_map = { "MetaTrader5": "MetaTrader5>=5.0.45", "websockets": "websockets>=12.0", } missing = [module for module in dependency_map if importlib.util.find_spec(module) is None] if not missing: return print(f"[bootstrap] Missing packages: {', '.join(missing)}") pip_specs = [dependency_map[module] for module in missing] print(f"[bootstrap] Installing: {', '.join(pip_specs)}") def _run(cmd: list[str]) -> tuple[bool, str]: try: subprocess.run(cmd, check=True) return True, "" except FileNotFoundError as exc: return False, f"command not found: {exc}" except subprocess.CalledProcessError as exc: return False, f"exit code {exc.returncode}" except Exception as exc: return False, str(exc) install_commands = [ [sys.executable, "-m", "pip", "install", *pip_specs], ["py", "-m", "pip", "install", *pip_specs], ["python", "-m", "pip", "install", *pip_specs], ] for cmd in install_commands: ok, _ = _run(cmd) if ok: print("[bootstrap] Dependencies installed successfully") return print("[bootstrap] pip install failed, trying to bootstrap pip with ensurepip") ensurepip_commands = [ [sys.executable, "-m", "ensurepip", "--upgrade"], ["py", "-m", "ensurepip", "--upgrade"], ["python", "-m", "ensurepip", "--upgrade"], ] ensurepip_ok = False for cmd in ensurepip_commands: ok, _ = _run(cmd) if ok: ensurepip_ok = True break if not ensurepip_ok: print("[bootstrap] Failed to bootstrap pip (ensurepip unavailable)") print("[bootstrap] Install dependencies manually if imports still fail") return for cmd in install_commands: ok, err = _run(cmd) if ok: print("[bootstrap] Dependencies installed successfully after ensurepip") return print(f"[bootstrap] Failed to install dependencies automatically: {err}") print("[bootstrap] Please install MetaTrader5 and websockets manually") _auto_install_requirements_if_needed() try: import MetaTrader5 as mt5 # type: ignore[import-not-found] except Exception: # pragma: no cover mt5 = None import websockets try: from websockets.legacy.server import WebSocketServerProtocol except Exception: # pragma: no cover from websockets.server import WebSocketServerProtocol HOST = "127.0.0.1" PORT = 8765 SERVER_PY_VERSION = "v1-b9c12c6" RECONNECT_COOLDOWN_SECONDS = 10.0 TERMINAL_WARNING_COOLDOWN_SECONDS = 30.0 @dataclass class BulkOrderRequest: symbol: str side: str volume: float count: int order_kind: str = "MARKET" pending_price: float | None = None sl: float | None = None tp: float | None = None sl_pips: float | None = None tp_pips: float | None = None deviation: int = 20 magic: int = 606060 delay_ms: int = 120 comment: str = "LayeringWebApp" @dataclass class BreakEvenRequest: symbol: str | None = None side: str = "ALL" magic: int | None = None offset_pips: float | None = None target_sl_price: float | None = None @dataclass class TradeStatusRequest: symbol: str | None = None side: str = "ALL" magic: int | None = None @dataclass class ClosePositionsRequest: symbol: str | None = None side: str = "ALL" magic: int | None = None profit_filter: str = "ALL" close_percent: float = 100.0 deviation: int = 20 class MT5BulkExecutor: def __init__(self) -> None: self.connected = False env_path = ( os.getenv("MT5_PATH") or os.getenv("MCP_TERMINAL_PATH") or os.getenv("MT5_TERMINAL_PATH") or "" ).strip() self.mt5_path = self._resolve_mt5_path(env_path) self.mt5_login = self._parse_int_env("MT5_LOGIN") self.mt5_password = (os.getenv("MT5_PASSWORD") or "").strip() self.mt5_server = (os.getenv("MT5_SERVER") or "").strip() self.last_reconnect_attempt_ts = 0.0 self.last_terminal_warning_ts = 0.0 self.last_terminal_launch_ts = 0.0 @staticmethod def _parse_int_env(name: str) -> int | None: raw = (os.getenv(name) or "").strip() if not raw: return None try: return int(raw) except ValueError: return None @staticmethod def _resolve_mt5_path(explicit_path: str) -> str: if explicit_path: return explicit_path common_paths = [ r"C:\Program Files\MetaTrader 5\terminal64.exe", r"C:\Program Files (x86)\MetaTrader 5\terminal64.exe", ] for path in common_paths: if os.path.exists(path): return path discovered = [] discovered.extend(glob.glob(r"C:\Program Files\*\terminal64.exe")) discovered.extend(glob.glob(r"C:\Program Files (x86)\*\terminal64.exe")) if not discovered: return "" # Prefer paths that explicitly include MetaTrader naming. discovered.sort() for path in discovered: upper = path.upper() if "METATRADER" in upper or "MT5" in upper: return path return discovered[0] @staticmethod def _is_terminal_not_found(error: Any) -> bool: return isinstance(error, tuple) and len(error) > 0 and int(error[0]) == -4 def _format_mt5_error(self, prefix: str) -> str: err = mt5.last_error() if mt5 is not None else "MetaTrader5 unavailable" if self._is_terminal_not_found(err): details = [ f"{prefix}: {err}", "Ensure your MT5 desktop terminal is running and logged in on this machine.", ] if self.mt5_path: details.append(f"Configured MT5 path: {self.mt5_path}") else: details.append("Set MT5_PATH to your terminal64.exe path.") return " ".join(details) return f"{prefix}: {err}" def _launch_terminal_if_needed(self) -> bool: now = time.time() if now - self.last_terminal_launch_ts < 30.0: return False if not self.mt5_path or not os.path.exists(self.mt5_path): return False try: subprocess.Popen([self.mt5_path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) self.last_terminal_launch_ts = now logging.info("Started MT5 terminal process: %s", self.mt5_path) return True except Exception as exc: logging.warning("Failed to start MT5 terminal process: %s", exc) return False def _login_if_configured(self) -> tuple[bool, str]: if mt5 is None: return False, "MetaTrader5 package is not installed" account = mt5.account_info() if account is not None: return True, "already logged in" if self.mt5_login is None or not self.mt5_password or not self.mt5_server: return False, "MT5 terminal not logged in and MT5_LOGIN/MT5_PASSWORD/MT5_SERVER are not configured" if not mt5.login(login=self.mt5_login, password=self.mt5_password, server=self.mt5_server): return False, f"mt5.login failed: {mt5.last_error()}" if mt5.account_info() is None: return False, "mt5.login succeeded but account_info() is still None" return True, "logged in" def _initialize_mt5(self) -> tuple[bool, str]: if mt5 is None: return False, "MetaTrader5 package is not installed" attempts: list[tuple[dict[str, Any], str]] = [] if self.mt5_path: attempts.append(({"path": self.mt5_path}, f"path={self.mt5_path}")) attempts.append(({}, "default")) errors: list[str] = [] for kwargs, label in attempts: if mt5.initialize(**kwargs): # MT5 terminal can require a brief warm-up before API calls are stable. for _ in range(3): if mt5.terminal_info() is not None: return True, "connected" time.sleep(0.6) return True, "connected" err = mt5.last_error() errors.append(f"{label}: {err}") if self._is_terminal_not_found(err) and self._launch_terminal_if_needed(): time.sleep(3.0) if mt5.initialize(**kwargs): for _ in range(3): if mt5.terminal_info() is not None: return True, "connected" time.sleep(0.6) return True, "connected" errors.append(f"{label} (after launch): {mt5.last_error()}") hint = "" if not self.mt5_path: hint = " Set MT5_PATH to your terminal64.exe path if auto-discovery fails." return False, f"mt5.initialize failed ({'; '.join(errors)}).{hint}" def connect(self, force: bool = False) -> tuple[bool, str]: if mt5 is None: return False, "MetaTrader5 package is not installed" if self.connected and mt5.terminal_info() is not None and mt5.account_info() is not None: return True, "already connected" now = time.time() if not force and now - self.last_reconnect_attempt_ts < RECONNECT_COOLDOWN_SECONDS: wait_s = int(RECONNECT_COOLDOWN_SECONDS - (now - self.last_reconnect_attempt_ts)) + 1 return False, f"MT5 reconnect cooldown active ({wait_s}s). Keep terminal open/logged in and retry." self.last_reconnect_attempt_ts = now self.connected = False mt5.shutdown() ok, reason = self._initialize_mt5() if not ok: return False, reason login_ok, login_reason = self._login_if_configured() if not login_ok: return False, login_reason self.connected = True return True, "connected" def shutdown(self) -> None: if mt5 is not None: mt5.shutdown() self.connected = False def _positions_get(self, symbol_filter: str | None = None, allow_forced_reconnect: bool = False): if mt5 is None: return None positions = mt5.positions_get(symbol=symbol_filter) if symbol_filter else mt5.positions_get() if positions is not None: return positions err = mt5.last_error() if self._is_terminal_not_found(err): now = time.time() if now - self.last_terminal_warning_ts >= TERMINAL_WARNING_COOLDOWN_SECONDS: logging.warning("positions_get failed with terminal not found; reconnecting and retrying once") self.last_terminal_warning_ts = now self.connected = False mt5.shutdown() ok, _ = self.connect(force=allow_forced_reconnect) if ok: return mt5.positions_get(symbol=symbol_filter) if symbol_filter else mt5.positions_get() return None @staticmethod def _as_dict(result: Any) -> dict[str, Any]: if hasattr(result, "_asdict"): return result._asdict() # type: ignore[no-any-return] return {k: getattr(result, k) for k in dir(result) if not k.startswith("_")} @staticmethod def _pick_filling_mode(symbol: str): if mt5 is None: return "RETURN" info = mt5.symbol_info(symbol) if info is None: return mt5.ORDER_FILLING_RETURN mode = int(getattr(info, "filling_mode", 0)) if mode & 1: return mt5.ORDER_FILLING_FOK if mode & 2: return mt5.ORDER_FILLING_IOC if mode & 4: return mt5.ORDER_FILLING_RETURN return mt5.ORDER_FILLING_RETURN def place_bulk(self, req: BulkOrderRequest) -> dict[str, Any]: if req.count <= 0: return {"ok": False, "error": "count must be > 0"} if req.volume <= 0: return {"ok": False, "error": "volume must be > 0"} side = req.side.strip().upper() if side not in {"BUY", "SELL"}: return {"ok": False, "error": "side must be BUY or SELL"} order_kind = req.order_kind.strip().upper() allowed_order_kinds = {"MARKET", "BUY_LIMIT", "SELL_LIMIT", "BUY_STOP", "SELL_STOP"} if order_kind not in allowed_order_kinds: return {"ok": False, "error": "order_kind must be MARKET, BUY_LIMIT, SELL_LIMIT, BUY_STOP, or SELL_STOP"} is_pending = order_kind != "MARKET" if is_pending and req.pending_price is None: return {"ok": False, "error": "pending_price is required for pending orders"} if req.sl_pips is not None and req.sl_pips <= 0: return {"ok": False, "error": "sl_pips must be > 0"} if req.tp_pips is not None and req.tp_pips <= 0: return {"ok": False, "error": "tp_pips must be > 0"} if mt5 is None: return { "ok": False, "error": "MetaTrader5 not available in this environment", "hint": "Run this on your MT5 Windows machine with MetaTrader5 installed", } ok, reason = self.connect(force=True) if not ok: return {"ok": False, "error": reason} symbol = req.symbol.strip() info = mt5.symbol_info(symbol) if info is None: return {"ok": False, "error": f"Symbol not found: {symbol}"} if not info.visible and not mt5.symbol_select(symbol, True): return {"ok": False, "error": f"Cannot select symbol: {symbol}"} point = float(getattr(info, "point", 0.0)) if point <= 0: return {"ok": False, "error": f"Invalid point size for symbol: {symbol}"} digits = int(getattr(info, "digits", 5)) if order_kind == "MARKET": order_type = mt5.ORDER_TYPE_BUY if side == "BUY" else mt5.ORDER_TYPE_SELL order_side = side elif order_kind == "BUY_LIMIT": order_type = mt5.ORDER_TYPE_BUY_LIMIT order_side = "BUY" elif order_kind == "SELL_LIMIT": order_type = mt5.ORDER_TYPE_SELL_LIMIT order_side = "SELL" elif order_kind == "BUY_STOP": order_type = mt5.ORDER_TYPE_BUY_STOP order_side = "BUY" else: order_type = mt5.ORDER_TYPE_SELL_STOP order_side = "SELL" results: list[dict[str, Any]] = [] success_count = 0 for i in range(req.count): if is_pending: price = float(req.pending_price or 0.0) request: dict[str, Any] = { "action": mt5.TRADE_ACTION_PENDING, "symbol": symbol, "volume": float(req.volume), "type": order_type, "price": float(price), "deviation": int(req.deviation), "magic": int(req.magic), "comment": req.comment, "type_time": mt5.ORDER_TIME_GTC, "type_filling": self._pick_filling_mode(symbol), } else: tick = mt5.symbol_info_tick(symbol) if tick is None: results.append({"index": i + 1, "ok": False, "error": "No symbol tick"}) continue price = tick.ask if side == "BUY" else tick.bid request = { "action": mt5.TRADE_ACTION_DEAL, "symbol": symbol, "volume": float(req.volume), "type": order_type, "price": float(price), "deviation": int(req.deviation), "magic": int(req.magic), "comment": req.comment, "type_time": mt5.ORDER_TIME_GTC, "type_filling": self._pick_filling_mode(symbol), } sl_value = req.sl tp_value = req.tp # Absolute SL/TP prices take priority. If missing, derive price from pip distance. if sl_value is None and req.sl_pips is not None: distance = float(req.sl_pips) * point sl_value = price - distance if order_side == "BUY" else price + distance if tp_value is None and req.tp_pips is not None: distance = float(req.tp_pips) * point tp_value = price + distance if order_side == "BUY" else price - distance if sl_value is not None: request["sl"] = round(float(sl_value), digits) if tp_value is not None: request["tp"] = round(float(tp_value), digits) response = mt5.order_send(request) if response is None: results.append( { "index": i + 1, "ok": False, "error": "order_send returned None", "request": request, } ) else: retcode = int(getattr(response, "retcode", -1)) success_codes = {int(mt5.TRADE_RETCODE_DONE)} placed_code = getattr(mt5, "TRADE_RETCODE_PLACED", None) if placed_code is not None: success_codes.add(int(placed_code)) was_ok = retcode in success_codes if was_ok: success_count += 1 results.append( { "index": i + 1, "ok": was_ok, "retcode": retcode, "request": request, "response": self._as_dict(response), } ) if i < req.count - 1 and req.delay_ms > 0: time.sleep(req.delay_ms / 1000.0) return { "ok": success_count == req.count, "symbol": symbol, "side": order_side, "order_kind": order_kind, "requested": req.count, "successful": success_count, "failed": req.count - success_count, "results": results, "request": asdict(req), "timestamp": int(time.time()), } def move_positions_to_breakeven(self, req: BreakEvenRequest) -> dict[str, Any]: side_filter = req.side.strip().upper() if side_filter not in {"ALL", "BUY", "SELL"}: return {"ok": False, "error": "side must be ALL, BUY, or SELL"} if req.offset_pips is None and req.target_sl_price is None: return {"ok": False, "error": "Provide offset_pips or target_sl_price"} if req.offset_pips is not None and req.target_sl_price is not None: return {"ok": False, "error": "Use only one: offset_pips or target_sl_price"} if req.offset_pips is not None and req.offset_pips < 0: return {"ok": False, "error": "offset_pips must be >= 0"} if mt5 is None: return { "ok": False, "error": "MetaTrader5 not available in this environment", "hint": "Run this on your MT5 Windows machine with MetaTrader5 installed", } ok, reason = self.connect(force=True) if not ok: return {"ok": False, "error": reason} symbol_filter = req.symbol.strip() if req.symbol else None positions = self._positions_get(symbol_filter, allow_forced_reconnect=True) if positions is None: return {"ok": False, "error": self._format_mt5_error("positions_get failed")} results: list[dict[str, Any]] = [] targeted = 0 modified = 0 for pos in positions: pos_side = "BUY" if int(pos.type) == mt5.POSITION_TYPE_BUY else "SELL" if side_filter != "ALL" and pos_side != side_filter: continue pos_magic = int(getattr(pos, "magic", 0)) if req.magic is not None and pos_magic != req.magic: continue symbol = str(pos.symbol) info = mt5.symbol_info(symbol) if info is None: results.append( { "ticket": int(pos.ticket), "symbol": symbol, "ok": False, "error": "symbol_info unavailable", } ) continue point = float(getattr(info, "point", 0.0)) digits = int(getattr(info, "digits", 5)) if point <= 0: results.append( { "ticket": int(pos.ticket), "symbol": symbol, "ok": False, "error": "invalid symbol point", } ) continue targeted += 1 open_price = float(pos.price_open) existing_sl = float(getattr(pos, "sl", 0.0) or 0.0) existing_tp = float(getattr(pos, "tp", 0.0) or 0.0) if req.target_sl_price is not None: new_sl = float(req.target_sl_price) else: offset_price = float(req.offset_pips or 0.0) * point new_sl = open_price + offset_price if pos_side == "BUY" else open_price - offset_price new_sl = round(new_sl, digits) # Avoid loosening risk: only move SL when it improves protection. if pos_side == "BUY": should_update = existing_sl == 0.0 or new_sl > existing_sl else: should_update = existing_sl == 0.0 or new_sl < existing_sl if not should_update: results.append( { "ticket": int(pos.ticket), "symbol": symbol, "side": pos_side, "ok": True, "updated": False, "reason": "existing SL is already better than breakeven target", "existing_sl": existing_sl, "target_sl": new_sl, } ) continue request = { "action": mt5.TRADE_ACTION_SLTP, "symbol": symbol, "position": int(pos.ticket), "sl": new_sl, "tp": existing_tp, } response = mt5.order_send(request) if response is None: results.append( { "ticket": int(pos.ticket), "symbol": symbol, "side": pos_side, "ok": False, "updated": False, "error": "order_send returned None", "request": request, } ) continue retcode = int(getattr(response, "retcode", -1)) was_ok = retcode == mt5.TRADE_RETCODE_DONE if was_ok: modified += 1 results.append( { "ticket": int(pos.ticket), "symbol": symbol, "side": pos_side, "ok": was_ok, "updated": was_ok, "retcode": retcode, "request": request, "response": self._as_dict(response), } ) return { "ok": True, "action": "move_breakeven", "requested_filters": { "symbol": symbol_filter, "side": side_filter, "magic": req.magic, "offset_pips": req.offset_pips, "target_sl_price": req.target_sl_price, }, "targeted_positions": targeted, "updated_positions": modified, "results": results, "timestamp": int(time.time()), } def get_positions_status(self, req: TradeStatusRequest) -> dict[str, Any]: side_filter = req.side.strip().upper() if side_filter not in {"ALL", "BUY", "SELL"}: return {"ok": False, "error": "side must be ALL, BUY, or SELL"} if mt5 is None: return { "ok": False, "error": "MetaTrader5 not available in this environment", "hint": "Run this on your MT5 Windows machine with MetaTrader5 installed", } ok, reason = self.connect() if not ok: symbol_filter = req.symbol.strip() if req.symbol else None return { "ok": True, "type": "trade_status", "mt5_connected": False, "warning": reason, "filters": { "symbol": symbol_filter, "side": side_filter, "magic": req.magic, }, "count": 0, "buy_count": 0, "sell_count": 0, "total_volume": 0.0, "total_profit": 0.0, "positions": [], "timestamp": int(time.time()), } symbol_filter = req.symbol.strip() if req.symbol else None positions = self._positions_get(symbol_filter, allow_forced_reconnect=False) if positions is None: return {"ok": False, "error": self._format_mt5_error("positions_get failed")} filtered_positions: list[dict[str, Any]] = [] buy_count = 0 sell_count = 0 total_volume = 0.0 total_profit = 0.0 for pos in positions: pos_side = "BUY" if int(pos.type) == mt5.POSITION_TYPE_BUY else "SELL" if side_filter != "ALL" and pos_side != side_filter: continue pos_magic = int(getattr(pos, "magic", 0)) if req.magic is not None and pos_magic != req.magic: continue volume = float(getattr(pos, "volume", 0.0) or 0.0) profit = float(getattr(pos, "profit", 0.0) or 0.0) if pos_side == "BUY": buy_count += 1 else: sell_count += 1 total_volume += volume total_profit += profit filtered_positions.append( { "ticket": int(pos.ticket), "symbol": str(pos.symbol), "side": pos_side, "magic": pos_magic, "volume": volume, "open_price": float(getattr(pos, "price_open", 0.0) or 0.0), "current_price": float(getattr(pos, "price_current", 0.0) or 0.0), "sl": float(getattr(pos, "sl", 0.0) or 0.0), "tp": float(getattr(pos, "tp", 0.0) or 0.0), "profit": profit, "swap": float(getattr(pos, "swap", 0.0) or 0.0), "commission": float(getattr(pos, "commission", 0.0) or 0.0), "time": int(getattr(pos, "time", 0) or 0), } ) return { "ok": True, "type": "trade_status", "filters": { "symbol": symbol_filter, "side": side_filter, "magic": req.magic, }, "count": len(filtered_positions), "buy_count": buy_count, "sell_count": sell_count, "total_volume": round(total_volume, 4), "total_profit": round(total_profit, 2), "positions": filtered_positions, "timestamp": int(time.time()), } def close_positions(self, req: ClosePositionsRequest) -> dict[str, Any]: side_filter = req.side.strip().upper() if side_filter not in {"ALL", "BUY", "SELL"}: return {"ok": False, "error": "side must be ALL, BUY, or SELL"} profit_filter = req.profit_filter.strip().upper() if profit_filter not in {"ALL", "PROFIT", "LOSS"}: return {"ok": False, "error": "profit_filter must be ALL, PROFIT, or LOSS"} if req.close_percent <= 0 or req.close_percent > 100: return {"ok": False, "error": "close_percent must be > 0 and <= 100"} if mt5 is None: return { "ok": False, "error": "MetaTrader5 not available in this environment", "hint": "Run this on your MT5 Windows machine with MetaTrader5 installed", } ok, reason = self.connect(force=True) if not ok: return {"ok": False, "error": reason} symbol_filter = req.symbol.strip() if req.symbol else None positions = self._positions_get(symbol_filter, allow_forced_reconnect=True) if positions is None: return {"ok": False, "error": self._format_mt5_error("positions_get failed")} filtered_positions = [] for pos in positions: pos_side = "BUY" if int(pos.type) == mt5.POSITION_TYPE_BUY else "SELL" if side_filter != "ALL" and pos_side != side_filter: continue pos_magic = int(getattr(pos, "magic", 0)) if req.magic is not None and pos_magic != req.magic: continue pos_profit = float(getattr(pos, "profit", 0.0) or 0.0) if profit_filter == "PROFIT" and pos_profit <= 0: continue if profit_filter == "LOSS" and pos_profit >= 0: continue filtered_positions.append(pos) filtered_positions.sort(key=lambda p: int(getattr(p, "time", 0) or 0)) if not filtered_positions: return { "ok": True, "action": "close_positions", "requested_filters": { "symbol": symbol_filter, "side": side_filter, "magic": req.magic, "profit_filter": profit_filter, "close_percent": req.close_percent, }, "targeted_positions": 0, "selected_to_close": 0, "closed_positions": 0, "results": [], "timestamp": int(time.time()), } selected_count = max(1, math.ceil(len(filtered_positions) * (req.close_percent / 100.0))) selected_count = min(selected_count, len(filtered_positions)) selected_positions = filtered_positions[:selected_count] results: list[dict[str, Any]] = [] closed_positions = 0 for pos in selected_positions: symbol = str(pos.symbol) pos_side = "BUY" if int(pos.type) == mt5.POSITION_TYPE_BUY else "SELL" close_type = mt5.ORDER_TYPE_SELL if pos_side == "BUY" else mt5.ORDER_TYPE_BUY tick = mt5.symbol_info_tick(symbol) if tick is None: results.append( { "ticket": int(pos.ticket), "symbol": symbol, "side": pos_side, "ok": False, "error": "No symbol tick", } ) continue close_price = float(tick.bid if pos_side == "BUY" else tick.ask) request = { "action": mt5.TRADE_ACTION_DEAL, "symbol": symbol, "position": int(pos.ticket), "volume": float(getattr(pos, "volume", 0.0) or 0.0), "type": close_type, "price": close_price, "deviation": int(req.deviation), "magic": int(getattr(pos, "magic", 0) or 0), "comment": "LayeringClose", "type_time": mt5.ORDER_TIME_GTC, "type_filling": self._pick_filling_mode(symbol), } response = mt5.order_send(request) if response is None: results.append( { "ticket": int(pos.ticket), "symbol": symbol, "side": pos_side, "ok": False, "closed": False, "error": "order_send returned None", "request": request, } ) continue retcode = int(getattr(response, "retcode", -1)) done_codes = {int(mt5.TRADE_RETCODE_DONE)} done_partial = getattr(mt5, "TRADE_RETCODE_DONE_PARTIAL", None) if done_partial is not None: done_codes.add(int(done_partial)) was_ok = retcode in done_codes if was_ok: closed_positions += 1 results.append( { "ticket": int(pos.ticket), "symbol": symbol, "side": pos_side, "ok": was_ok, "closed": was_ok, "retcode": retcode, "request": request, "response": self._as_dict(response), } ) return { "ok": True, "action": "close_positions", "requested_filters": { "symbol": symbol_filter, "side": side_filter, "magic": req.magic, "profit_filter": profit_filter, "close_percent": req.close_percent, "deviation": req.deviation, }, "targeted_positions": len(filtered_positions), "selected_to_close": selected_count, "closed_positions": closed_positions, "results": results, "timestamp": int(time.time()), } EXECUTOR = MT5BulkExecutor() def parse_bulk_request(payload: dict[str, Any]) -> BulkOrderRequest: return BulkOrderRequest( symbol=str(payload.get("symbol", "XAUUSD")), side=str(payload.get("side", "BUY")), order_kind=str(payload.get("order_kind", "MARKET")), pending_price=float(payload["pending_price"]) if payload.get("pending_price") not in (None, "") else None, volume=float(payload.get("volume", 0.01)), count=int(payload.get("count", 1)), sl=float(payload["sl"]) if payload.get("sl") not in (None, "") else None, tp=float(payload["tp"]) if payload.get("tp") not in (None, "") else None, sl_pips=float(payload["sl_pips"]) if payload.get("sl_pips") not in (None, "") else None, tp_pips=float(payload["tp_pips"]) if payload.get("tp_pips") not in (None, "") else None, deviation=int(payload.get("deviation", 20)), magic=int(payload.get("magic", 606060)), delay_ms=int(payload.get("delay_ms", 120)), comment=str(payload.get("comment", "LayeringWebApp")), ) def parse_breakeven_request(payload: dict[str, Any]) -> BreakEvenRequest: symbol = str(payload.get("symbol", "")).strip() or None magic_raw = payload.get("magic") magic = None if magic_raw not in (None, ""): magic = int(magic_raw) return BreakEvenRequest( symbol=symbol, side=str(payload.get("side", "ALL")), magic=magic, offset_pips=float(payload["offset_pips"]) if payload.get("offset_pips") not in (None, "") else None, target_sl_price=float(payload["target_sl_price"]) if payload.get("target_sl_price") not in (None, "") else None, ) def parse_trade_status_request(payload: dict[str, Any]) -> TradeStatusRequest: symbol = str(payload.get("symbol", "")).strip() or None magic_raw = payload.get("magic") magic = None if magic_raw not in (None, ""): magic = int(magic_raw) return TradeStatusRequest( symbol=symbol, side=str(payload.get("side", "ALL")), magic=magic, ) def parse_close_positions_request(payload: dict[str, Any]) -> ClosePositionsRequest: symbol = str(payload.get("symbol", "")).strip() or None magic_raw = payload.get("magic") magic = None if magic_raw not in (None, ""): magic = int(magic_raw) return ClosePositionsRequest( symbol=symbol, side=str(payload.get("side", "ALL")), magic=magic, profit_filter=str(payload.get("profit_filter", "ALL")), close_percent=float(payload.get("close_percent", 100.0)), deviation=int(payload.get("deviation", 20)), ) async def send_json(ws: WebSocketServerProtocol, message: dict[str, Any]) -> None: await ws.send(json.dumps(message, default=str)) async def handle_connection(ws: WebSocketServerProtocol) -> None: await send_json( ws, { "type": "hello", "ok": True, "message": "Connected to local layering bridge", "server_py_version": SERVER_PY_VERSION, "server": {"host": HOST, "port": PORT}, }, ) async for raw in ws: try: data = json.loads(raw) except json.JSONDecodeError: await send_json(ws, {"type": "error", "ok": False, "error": "Invalid JSON"}) continue action = str(data.get("action", "")).strip().lower() if action == "ping": await send_json(ws, {"type": "pong", "ok": True, "ts": int(time.time())}) continue if action == "place_bulk": try: request = parse_bulk_request(data.get("payload", {})) except (TypeError, ValueError) as exc: await send_json( ws, { "type": "bulk_result", "ok": False, "error": f"Invalid payload: {exc}", }, ) continue loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, EXECUTOR.place_bulk, request) await send_json(ws, {"type": "bulk_result", **result}) continue if action == "move_breakeven": try: request = parse_breakeven_request(data.get("payload", {})) except (TypeError, ValueError) as exc: await send_json( ws, { "type": "breakeven_result", "ok": False, "error": f"Invalid payload: {exc}", }, ) continue loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, EXECUTOR.move_positions_to_breakeven, request) await send_json(ws, {"type": "breakeven_result", **result}) continue if action == "trade_status": try: request = parse_trade_status_request(data.get("payload", {})) except (TypeError, ValueError) as exc: await send_json( ws, { "type": "trade_status", "ok": False, "error": f"Invalid payload: {exc}", }, ) continue loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, EXECUTOR.get_positions_status, request) await send_json(ws, result) continue if action == "close_positions": try: request = parse_close_positions_request(data.get("payload", {})) except (TypeError, ValueError) as exc: await send_json( ws, { "type": "close_result", "ok": False, "error": f"Invalid payload: {exc}", }, ) continue loop = asyncio.get_running_loop() result = await loop.run_in_executor(None, EXECUTOR.close_positions, request) await send_json(ws, {"type": "close_result", **result}) continue await send_json( ws, { "type": "error", "ok": False, "error": f"Unsupported action: {action}", "supported_actions": ["ping", "place_bulk", "move_breakeven", "trade_status", "close_positions"], }, ) async def main() -> None: logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logging.info("server.py version: %s", SERVER_PY_VERSION) if EXECUTOR.mt5_path: logging.info("MT5 terminal path: %s", EXECUTOR.mt5_path) else: logging.warning("MT5 terminal path not resolved. Set MT5_PATH to terminal64.exe if needed") if EXECUTOR.mt5_login is not None and EXECUTOR.mt5_password and EXECUTOR.mt5_server: logging.info("MT5 credential login is enabled for account %s on server %s", EXECUTOR.mt5_login, EXECUTOR.mt5_server) else: logging.info("MT5 credential login is disabled; relying on terminal GUI login session") async with websockets.serve(handle_connection, HOST, PORT, max_size=5_000_000): logging.info("Layering WebSocket server listening on ws://%s:%d", HOST, PORT) await asyncio.Future() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: pass finally: EXECUTOR.shutdown()