import streamlit as st
import tempfile
import os
import shutil
import re
import json
import uuid
import datetime
import requests
import time
import base64
import hashlib
import random
import glob
import tkinter as tk
import gc
from tkinter import filedialog
from collections import defaultdict

# --- CONFIGURATION ---
st.set_page_config(page_title="RAG Pro v6.2 (Polish Embeddings)", page_icon="🇵🇱", layout="wide")

# Local llama.cpp-style server exposing an OpenAI-compatible API.
API_BASE_URL = "http://localhost:8081/v1"
API_KEY = "sk-no-key-required"
DB_ROOT_DIR = "./saved_knowledge_bases"
CHATS_ROOT_DIR = "./chat_histories"
EPHEMERAL_ROOT = "./ephemeral_temps"
PROMPTS_FILE = os.path.join(DB_ROOT_DIR, "prompts_library.json")

# Make sure every on-disk root exists before anything touches it.
for p in [DB_ROOT_DIR, CHATS_ROOT_DIR, EPHEMERAL_ROOT]:
    os.makedirs(p, exist_ok=True)

# --- DEFAULT PROMPTS (user-facing text stays in Polish) ---
DEFAULT_PROMPTS = {
    "Domyślny Ekspert": """Jesteś ekspertem. Odpowiadaj na pytania używając WYŁĄCZNIE dostarczonego Kontekstu.
Każdy fragment w Kontekście jest oznaczony źródłem [PLIK: ...].
BARDZO WAŻNE: Nie mieszaj informacji z różnych plików.
""",
    "Asystent Ogólny": """Jesteś pomocnym asystentem AI. Odpowiadaj na pytania użytkownika zgodnie ze swoją najlepszą wiedzą.
Analizuj zdjęcia, jeśli zostaną przesłane.
""",
    "Kucharz Thermomix": """Jesteś ekspertem kulinarnym. Twoim zadaniem jest precyzyjne podawanie przepisów z bazy.
Uważaj, aby nie pomylić instrukcji z różnych dań. Cytuj kroki dokładnie tak, jak są w pliku źródłowym.
"""
}


# --- PROMPT LIBRARY MANAGER ---
def load_prompt_library():
    """Load the prompt library from disk, seeding it with defaults on first run.

    Returns:
        dict: prompt name -> prompt text. Falls back to DEFAULT_PROMPTS on
        any read or parse error (corrupt library must not break the app).
    """
    if not os.path.exists(PROMPTS_FILE):
        with open(PROMPTS_FILE, "w", encoding="utf-8") as f:
            json.dump(DEFAULT_PROMPTS, f, ensure_ascii=False, indent=2)
        return DEFAULT_PROMPTS
    try:
        with open(PROMPTS_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, json.JSONDecodeError):
        # Unreadable or corrupt library: degrade gracefully to the built-ins.
        return DEFAULT_PROMPTS


def save_prompt_library(library):
    """Persist the prompt library dict to PROMPTS_FILE as pretty-printed JSON."""
    with open(PROMPTS_FILE, "w", encoding="utf-8") as f:
        json.dump(library, f, ensure_ascii=False, indent=2)


# --- SESSION STATE INITIALISATION ---
# Streamlit re-runs this script on every interaction; seed defaults only once.
_SESSION_DEFAULTS = {
    "vectorstore": None,
    "messages": [],
    "active_db_name": "GENERAL_CHAT",
    "current_system_prompt": DEFAULT_PROMPTS["Domyślny Ekspert"],
    "ephemeral_paths": [],
    "current_ephemeral_path": None,
    "selected_prompt_name": "Domyślny Ekspert",
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default
if "current_chat_id" not in st.session_state:
    # Fresh random id per new session (not a static default).
    st.session_state.current_chat_id = str(uuid.uuid4())

# --- LANGCHAIN IMPORTS (kept after Streamlit setup, matching the original layout) ---
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter, MarkdownTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_openai import ChatOpenAI
from langchain_core.documents import Document
from langchain_core.messages import HumanMessage, SystemMessage


# --- HELPERS ---
def cleanup_old_ephemeral_folders():
    """Delete every stale temp vector-DB folder except the currently active one."""
    if not os.path.exists(EPHEMERAL_ROOT):
        return
    for entry in os.listdir(EPHEMERAL_ROOT):
        folder = os.path.join(EPHEMERAL_ROOT, entry)
        if not os.path.isdir(folder):
            continue
        if folder == st.session_state.current_ephemeral_path:
            continue  # never delete the DB the session is still using
        try:
            shutil.rmtree(folder)
        except OSError:
            pass  # best-effort cleanup; a locked folder is retried next run


def get_server_context_limit(base_url):
    """Query the llama.cpp server (/props) for its context window.

    Returns the reported n_ctx, or 4096 when the server is absent or slow
    (deliberately short 0.5 s timeout so app startup is not blocked).
    """
    try:
        response = requests.get(base_url.replace("/v1", "/props"), timeout=0.5)
        if response.status_code == 200:
            return response.json().get("default_generation_settings", {}).get("n_ctx", 4096)
    except requests.RequestException:
        pass  # fall through to the default
    return 4096


def clean_reasoning_tags(text):
    """Strip <think>...</think> reasoning blocks from a model response.

    BUGFIX: the previous pattern r'.*?' can only match empty strings, so
    re.sub removed nothing and reasoning tags were never stripped. The tag
    delimiters (evidently lost from the source) were restored here.
    """
    return re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL).strip()


def estimate_tokens(text):
    """Rough token estimate for Polish text (~2.7 characters per token)."""
    if not text:
        return 0
    return int(len(text) / 2.7)


def encode_image_to_base64(uploaded_file):
    """Encode an uploaded image as a data: URL for multimodal chat payloads.

    Returns the "data:<mime>;base64,..." string, or None on failure
    (the error is surfaced in the UI instead of raising).
    """
    try:
        bytes_data = uploaded_file.getvalue()
        base64_str = base64.b64encode(bytes_data).decode('utf-8')
        mime_type = uploaded_file.type
        return f"data:{mime_type};base64,{base64_str}"
    except Exception as e:
        st.error(f"Błąd kodowania obrazu: {e}")
        return None


def delete_chat_callback(db_name, filename):
    """Button callback: delete one saved chat file and reset the live chat."""
    filepath = os.path.join(CHATS_ROOT_DIR, db_name, filename)
    if os.path.exists(filepath):
        os.remove(filepath)
    st.session_state.messages = []
    st.session_state.current_chat_id = str(uuid.uuid4())
    st.session_state.is_temp_chat = False
    if "last_stats" in st.session_state:
        del st.session_state.last_stats
    st.toast("Czat usunięty!")


def reset_all_chats_callback(db_name):
    """Button callback: wipe the whole chat-history folder for one base."""
    chat_dir = os.path.join(CHATS_ROOT_DIR, db_name)
    if os.path.exists(chat_dir):
        shutil.rmtree(chat_dir)
    os.makedirs(chat_dir, exist_ok=True)
    st.session_state.messages = []
    st.session_state.current_chat_id = str(uuid.uuid4())
    if "last_stats" in st.session_state:
        del st.session_state.last_stats
    st.toast("Historia wyczyszczona!")


def generate_chunk_ids(chunks):
    """Build deterministic MD5 ids (source:page:line:content) per chunk.

    Stable ids make re-indexing the same content an upsert in Chroma
    instead of creating duplicates.
    """
    ids = []
    for chunk in chunks:
        source = chunk.metadata.get("source", "unknown")
        page = str(chunk.metadata.get("page", ""))
        line = str(chunk.metadata.get("line", ""))
        raw_id = f"{source}:{page}:{line}:{chunk.page_content}"
        ids.append(hashlib.md5(raw_id.encode('utf-8')).hexdigest())
    return ids


def select_folder_dialog():
    """Open a native Tk folder picker; return the chosen path or None."""
    folder_path = None
    try:
        root = tk.Tk()
        root.withdraw()  # hide the empty Tk main window
        root.attributes('-topmost', True)  # keep the dialog above the browser
        root.update()
        folder_path = filedialog.askdirectory(master=root)
        root.destroy()
    except Exception as e:
        st.error(f"Nie udało się otworzyć okna: {e}")
    return folder_path
def _normalize_name(name):
    """Fallback product-name normaliser (trimmed, lowercased).

    NOTE(review): the original code called normalize_name()/normalize_unit(),
    which are not defined anywhere in this file, so process_and_download_list
    always failed with a NameError that was swallowed and reported as
    "Błąd przetwarzania". These fallbacks keep the feature working; real
    implementations, if defined elsewhere, still win (resolved via globals()).
    """
    return str(name).strip().lower()


def _normalize_unit(unit):
    """Fallback unit normaliser (trimmed, lowercased)."""
    return str(unit).strip().lower()


def process_and_download_list(ai_text):
    """Extract a [DATA_START]...[DATA_END] JSON shopping list from a model
    reply, aggregate quantities per (product, unit) and render a plain-text
    table.

    Returns:
        str | None: the rendered table; None when no data block is present;
        an error string starting with "Błąd przetwarzania:" on malformed data.
    """
    # 1. Cut the JSON payload out of the model response.
    match = re.search(r'\[DATA_START\](.*?)\[DATA_END\]', ai_text, re.DOTALL)
    if not match:
        return None
    try:
        # Accept both a flat list and a {meal: [items]} dict; flatten the dict.
        raw_data = json.loads(match.group(1).strip())
        flat_list = []
        if isinstance(raw_data, dict):
            for meal in raw_data.values():
                flat_list.extend(meal)
        else:
            flat_list = raw_data
        # Prefer project-level normalisers when present; otherwise use the
        # local fallbacks (fixes the former NameError).
        norm_name = globals().get("normalize_name", _normalize_name)
        norm_unit = globals().get("normalize_unit", _normalize_unit)
        # 2. Aggregate quantities per (product, unit).
        shopping_dict = defaultdict(lambda: defaultdict(float))
        for item in flat_list:
            shopping_dict[norm_name(item['item'])][norm_unit(item['unit'])] += item['qty']
        # 3. Render a fixed-width text table (Vim friendly).
        output_lines = [f"{'PRODUKT':<30} | {'ILOŚĆ'}", "-" * 45]
        for product, units in sorted(shopping_dict.items()):
            details = []
            for unit, total in units.items():
                qty_str = f"{int(total)}" if total.is_integer() else f"{round(total, 2)}"
                details.append(f"{qty_str} {unit}" if total > 0 else f"wg uznania ({unit})")
            output_lines.append(f"{product.capitalize():<30} | {', '.join(details)}")
        return "\n".join(output_lines)
    except Exception as e:
        # Best-effort UI feature: report the problem instead of crashing the app.
        return f"Błąd przetwarzania: {str(e)}"


# --- STREAMLIT UI: shopping-list export for the last assistant message ---
if st.session_state.messages:
    last_ai_msg = st.session_state.messages[-1]
    if last_ai_msg["role"] == "assistant":
        final_list = process_and_download_list(last_ai_msg["content"])
        if final_list:
            with st.expander("📊 Zweryfikowana Lista Zakupów (Python)", expanded=True):
                st.code(final_list, language="markdown")
                # Download the rendered table as a .txt file.
                st.download_button(
                    label="💾 Pobierz listę do Vima (.txt)",
                    data=final_list,
                    file_name="zakupy.txt",
                    mime="text/plain"
                )

# 1. Build one long text blob out of the whole chat history.
full_chat_text = ""
for msg in st.session_state.messages:
    role = msg["role"].upper()
    content = msg["content"]
    # Multimodal messages store a list (text + image); keep only the text part.
    if isinstance(content, list):
        text_part = next((x["text"] for x in content if x["type"] == "text"), "")
        full_chat_text += f"{role}:\n{text_part}\n\n"
    else:
        full_chat_text += f"{role}:\n{content}\n\n"

# 2. Show it in a popover whose code block has a built-in copy button.
with st.popover("📋 Kopiuj całą rozmowę"):
    st.caption("Kliknij ikonę kopiowania w prawym górnym rogu poniższego pola:")
    st.code(full_chat_text, language=None)


# --- DB MANAGEMENT ---
@st.cache_resource
def load_llm_client():
    """Cached ChatOpenAI client pointed at the local OpenAI-compatible server."""
    return ChatOpenAI(base_url=API_BASE_URL, api_key=API_KEY, model="local-model", temperature=0.2, streaming=True)


@st.cache_resource
def load_embedding_model():
    """Cached multilingual sentence-transformer (better Polish coverage)."""
    return HuggingFaceEmbeddings(model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")


def save_chat_to_file(db_name, chat_id, messages, chat_title=None):
    """Persist one chat as JSON under CHATS_ROOT_DIR/<db>/<chat_id>.json.

    The ephemeral RAM mode writes into a dedicated EPHEMERAL_HISTORY folder
    so its chats survive temp-DB rotation. Title defaults to a timestamp.
    """
    target_db_name = db_name if db_name != "EPHEMERAL_RAM_MODE" else "EPHEMERAL_HISTORY"
    chat_dir = os.path.join(CHATS_ROOT_DIR, target_db_name)
    os.makedirs(chat_dir, exist_ok=True)
    if not chat_title:
        chat_title = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    data = {"id": chat_id, "title": chat_title, "messages": messages}
    with open(os.path.join(chat_dir, f"{chat_id}.json"), "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def process_local_files_to_ephemeral(dir_paths, files_per_dir=10, recursive=False):
    """Sample .md files from local folders and index them into a fresh
    throwaway Chroma DB under EPHEMERAL_ROOT.

    Args:
        dir_paths: iterable of directory paths (whitespace is stripped).
        files_per_dir: max random sample size per folder.
        recursive: also walk subdirectories when True.

    Returns:
        (ok: bool, message: str, db_path: str | None)
    """
    unique_id = uuid.uuid4().hex[:8]
    new_db_path = os.path.join(EPHEMERAL_ROOT, f"temp_db_{unique_id}")
    cleanup_old_ephemeral_folders()
    selected_files = []
    scanned_folders_count = 0
    for d in dir_paths:
        d = d.strip()
        if not os.path.exists(d):
            if d:
                st.warning(f"Katalog nie istnieje: {d}")
            continue
        if recursive:
            for root, dirs, files in os.walk(d):
                md_in_current_dir = [os.path.join(root, f) for f in files if f.endswith(".md")]
                if md_in_current_dir:
                    scanned_folders_count += 1
                    # Cap per-folder contribution with a random sample.
                    if len(md_in_current_dir) > files_per_dir:
                        selected_files.extend(random.sample(md_in_current_dir, files_per_dir))
                    else:
                        selected_files.extend(md_in_current_dir)
        else:
            md_files = glob.glob(os.path.join(d, "*.md"))
            if md_files:
                scanned_folders_count += 1
                if len(md_files) > files_per_dir:
                    selected_files.extend(random.sample(md_files, files_per_dir))
                else:
                    selected_files.extend(md_files)
    if not selected_files:
        return False, "Nie znaleziono żadnych plików .md w podanych lokalizacjach.", None
    all_chunks = []
    errors = []
    # Large chunks on purpose (keep whole recipes together); splitter hoisted
    # out of the loop — it is stateless across files.
    splitter = MarkdownTextSplitter(chunk_size=4000, chunk_overlap=200)
    for file_path in selected_files:
        try:
            loader = TextLoader(file_path, encoding='utf-8')
            docs = loader.load()
            for doc in docs:  # renamed from `d` to stop shadowing the dir loop var
                doc.metadata["source"] = os.path.basename(file_path)
                doc.metadata["full_path"] = file_path
            all_chunks.extend(splitter.split_documents(docs))
        except Exception as e:
            errors.append(f"{os.path.basename(file_path)}: {str(e)}")
    if not all_chunks:
        return False, "Błąd przetwarzania plików (wszystkie puste lub błędy).", None
    try:
        embeddings = load_embedding_model()
        vectordb = Chroma(persist_directory=new_db_path, embedding_function=embeddings)
        chunk_ids = generate_chunk_ids(all_chunks)
        vectordb.add_documents(documents=all_chunks, ids=chunk_ids)
        return True, f"Załadowano {len(selected_files)} plików z {scanned_folders_count} folderów.\nPrzykłady: {', '.join([os.path.basename(f) for f in selected_files[:3]])}...", new_db_path
    except Exception as e:
        return False, f"Błąd ChromaDB: {e}", None


def create_or_update_database(uploaded_files, db_name, is_update=False):
    """Index uploaded .pdf/.md/.jsonl files into a persistent Chroma DB.

    Args:
        uploaded_files: one Streamlit UploadedFile or a list of them.
        db_name: subfolder of DB_ROOT_DIR to persist into.
        is_update: kept for interface compatibility; indexing is idempotent
            either way thanks to deterministic chunk ids.

    Returns:
        (ok: bool, message: str)
    """
    db_path = os.path.join(DB_ROOT_DIR, db_name)
    if not isinstance(uploaded_files, list):
        uploaded_files = [uploaded_files]
    all_chunks = []
    processed_files_count = 0
    errors = []
    for uploaded_file in uploaded_files:
        file_ext = os.path.splitext(uploaded_file.name)[1].lower()
        # Loaders need a real path, so spill the upload to a temp file.
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp_file:
            tmp_file.write(uploaded_file.getvalue())
            tmp_path = tmp_file.name
        try:
            file_chunks = []
            if file_ext == ".pdf":
                loader = PyPDFLoader(tmp_path)
                docs = loader.load()
                for doc in docs:
                    doc.metadata["source"] = uploaded_file.name
                splitter = RecursiveCharacterTextSplitter(chunk_size=4000, chunk_overlap=200)
                file_chunks = splitter.split_documents(docs)
            elif file_ext == ".md":
                loader = TextLoader(tmp_path, encoding='utf-8')
                docs = loader.load()
                for doc in docs:
                    doc.metadata["source"] = uploaded_file.name
                splitter = MarkdownTextSplitter(chunk_size=4000, chunk_overlap=200)
                file_chunks = splitter.split_documents(docs)
            elif file_ext == ".jsonl":
                # One Document per JSONL record; line number kept for chunk ids.
                raw_docs = []
                with open(tmp_path, 'r', encoding='utf-8-sig') as f:
                    for line_number, line in enumerate(f):
                        if not line.strip():
                            continue
                        try:
                            record = json.loads(line)
                        except json.JSONDecodeError:
                            continue  # skip malformed lines, keep the rest
                        raw_docs.append(Document(
                            page_content=json.dumps(record, ensure_ascii=False),
                            metadata={"source": uploaded_file.name, "line": line_number}
                        ))
                splitter = RecursiveCharacterTextSplitter(chunk_size=4000, chunk_overlap=200)
                file_chunks = splitter.split_documents(raw_docs)
            if file_chunks:
                all_chunks.extend(file_chunks)
                processed_files_count += 1
            else:
                errors.append(f"{uploaded_file.name}: Pusty.")
        except Exception as e:
            errors.append(f"{uploaded_file.name}: {str(e)}")
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
    if all_chunks:
        try:
            embeddings = load_embedding_model()
            vectordb = Chroma(persist_directory=db_path, embedding_function=embeddings)
            chunk_ids = generate_chunk_ids(all_chunks)
            vectordb.add_documents(documents=all_chunks, ids=chunk_ids)
            return True, f"Przetworzono {processed_files_count} plików."
        except Exception as e:
            return False, str(e)
    return False, "Brak danych."
# --- SIDEBAR ---
# Mode/base selection, ephemeral "mixer", DB creation, prompt manager, chat history.
# NOTE(review): indentation was reconstructed from a whitespace-mangled source;
# the nesting below is the most plausible reading — confirm against the original.
with st.sidebar:
    st.title("🎛️ Sterownia AI")
    # Probe the local server for its context window once per session.
    if "detected_ctx" not in st.session_state:
        st.session_state.detected_ctx = get_server_context_limit(API_BASE_URL)
    max_ctx_limit = st.number_input("Limit Kontekstu", 1024, 128000, st.session_state.detected_ctx, 1024)
    retrieval_k = st.number_input("Liczba fragmentów (Top-K)", min_value=1, max_value=50, value=5, step=1, help="Mniejsza liczba = mniej mieszania (np. 3-5).")
    st.markdown("---")
    # Every subfolder of DB_ROOT_DIR is a selectable persistent knowledge base.
    existing_dbs = [d for d in os.listdir(DB_ROOT_DIR) if os.path.isdir(os.path.join(DB_ROOT_DIR, d))]
    options = ["-- Wybierz --", "💬 Czat Ogólny (Bez RAG)", "⚡ Baza Ulotna (Losowy Mix)"] + existing_dbs
    selected_option = st.selectbox("📂 Wybierz Tryb/Bazę:", options)
    if selected_option == "⚡ Baza Ulotna (Losowy Mix)":
        # Ephemeral mode: sample random .md files from local folders into a temp DB.
        with st.expander("🎲 Konfiguracja Mixera", expanded=True):
            st.caption("Zarządzaj lokalizacjami wiedzy.")
            if "ephemeral_paths" not in st.session_state:
                st.session_state.ephemeral_paths = []
            col_btn, col_info = st.columns([1, 1])
            with col_btn:
                if st.button("📂 Dodaj katalog", help="Wybierz folder."):
                    # Native Tk folder picker (runs on the server machine).
                    selected_path = select_folder_dialog()
                    if selected_path:
                        if selected_path not in st.session_state.ephemeral_paths:
                            st.session_state.ephemeral_paths.append(selected_path)
                            # Keep the textarea widget state in sync with the list.
                            st.session_state.paths_textarea = "\n".join(st.session_state.ephemeral_paths)
                        st.rerun()
            with col_info:
                if st.button("❌ Czyść listę"):
                    st.session_state.ephemeral_paths = []
                    st.session_state.paths_textarea = ""
                    st.rerun()
            if "paths_textarea" not in st.session_state:
                st.session_state.paths_textarea = "\n".join(st.session_state.ephemeral_paths)
            # Paths are also hand-editable; one path per line.
            dir_paths_input = st.text_area("Wybrane ścieżki:", height=100, key="paths_textarea")
            current_text_paths = [p.strip() for p in dir_paths_input.split('\n') if p.strip()]
            if current_text_paths != st.session_state.ephemeral_paths:
                st.session_state.ephemeral_paths = current_text_paths
            c_opt1, c_opt2 = st.columns(2)
            with c_opt1:
                files_count = st.number_input("Plików/folder", min_value=1, max_value=1000, value=10, step=1)
            with c_opt2:
                is_recursive = st.checkbox("Skanuj podkatalogi", value=False)
            if st.button("🎲 Losuj i Ładuj do RAM"):
                if st.session_state.ephemeral_paths:
                    # Drop the old store reference before building a new one.
                    st.session_state.vectorstore = None
                    gc.collect()
                    with st.spinner("Losowanie i indeksowanie..."):
                        ok, msg, new_path = process_local_files_to_ephemeral(
                            st.session_state.ephemeral_paths,
                            files_per_dir=files_count,
                            recursive=is_recursive
                        )
                        if ok and new_path:
                            st.session_state.current_ephemeral_path = new_path
                            st.session_state.vectorstore = Chroma(
                                persist_directory=new_path,
                                embedding_function=load_embedding_model()
                            )
                            st.session_state.active_db_name = "EPHEMERAL_RAM_MODE"
                            st.success(msg)
                            time.sleep(1)
                            st.rerun()
                        else:
                            st.error(msg)
                else:
                    st.warning("Lista folderów jest pusta.")
    elif selected_option not in ["-- Wybierz --", "💬 Czat Ogólny (Bez RAG)"]:
        # A persistent base is selected: allow adding more files to it.
        with st.expander("📥 Douczanie bazy"):
            up_files = st.file_uploader("Dodaj pliki", key="updater", accept_multiple_files=True)
            if up_files and st.button("Indeksuj"):
                with st.spinner(f"Przetwarzanie {len(up_files)} plików..."):
                    ok, msg = create_or_update_database(up_files, selected_option, is_update=True)
                    if ok:
                        st.success(msg)
                    else:
                        st.error(msg)
    if selected_option != "⚡ Baza Ulotna (Losowy Mix)":
        # Create a brand-new persistent base from uploaded files.
        with st.expander("🆕 Nowa Stała Baza"):
            new_db_name = st.text_input("Nazwa")
            new_db_files = st.file_uploader("Pliki startowe", key="creator", accept_multiple_files=True)
            if st.button("Utwórz"):
                if new_db_name and new_db_files:
                    with st.spinner("Tworzenie..."):
                        ok, msg = create_or_update_database(new_db_files, new_db_name)
                        if ok:
                            st.success("Gotowe!")
                            time.sleep(1.5)
                            st.rerun()
                else:
                    st.warning("Podaj nazwę i pliki.")
    if selected_option != "-- Wybierz --":
        st.markdown("---")
        # Detect a mode switch: reset the chat and rebind the vector store.
        current_selection = st.session_state.get("active_selection")
        if current_selection != selected_option:
            st.session_state.active_selection = selected_option
            st.session_state.messages = []
            st.session_state.current_chat_id = str(uuid.uuid4())
            if "last_stats" in st.session_state:
                del st.session_state.last_stats
            if selected_option == "💬 Czat Ogólny (Bez RAG)":
                # Plain chat: no retrieval, general-assistant prompt.
                st.session_state.active_db_name = "GENERAL_CHAT"
                st.session_state.vectorstore = None
                st.session_state.current_system_prompt = DEFAULT_PROMPTS["Asystent Ogólny"]
                st.session_state.selected_prompt_name = "Asystent Ogólny"
            elif selected_option == "⚡ Baza Ulotna (Losowy Mix)":
                st.session_state.active_db_name = "EPHEMERAL_RAM_MODE"
                # Only override the prompt if the user had the general one active.
                if "selected_prompt_name" not in st.session_state or st.session_state.selected_prompt_name == "Asystent Ogólny":
                    st.session_state.current_system_prompt = DEFAULT_PROMPTS["Domyślny Ekspert"]
                    st.session_state.selected_prompt_name = "Domyślny Ekspert"
                # Re-attach the previously built temp DB if it still exists on disk.
                if st.session_state.current_ephemeral_path and os.path.exists(st.session_state.current_ephemeral_path):
                    st.session_state.vectorstore = Chroma(
                        persist_directory=st.session_state.current_ephemeral_path,
                        embedding_function=load_embedding_model()
                    )
                    st.toast("Załadowano bazę ulotną!")
                else:
                    st.session_state.vectorstore = None
                    st.warning("⚠️ Baza ulotna jest pusta lub wygasła! Kliknij 'Losuj'.")
            else:
                # Persistent base: open its Chroma store from disk.
                st.session_state.active_db_name = selected_option
                st.session_state.vectorstore = Chroma(
                    persist_directory=os.path.join(DB_ROOT_DIR, selected_option),
                    embedding_function=load_embedding_model()
                )
                st.session_state.current_system_prompt = DEFAULT_PROMPTS["Domyślny Ekspert"]
        if st.session_state.vectorstore is not None or selected_option == "💬 Czat Ogólny (Bez RAG)":
            # Prompt manager: pick, edit, save and delete system prompts.
            with st.expander("📝 Menedżer Promptów", expanded=True):
                prompt_lib = load_prompt_library()
                prompt_names = list(prompt_lib.keys())
                selected_p = st.selectbox(
                    "Wybierz gotowy prompt:",
                    prompt_names,
                    index=prompt_names.index(st.session_state.selected_prompt_name) if st.session_state.selected_prompt_name in prompt_names else 0,
                    key="prompt_selector"
                )
                if selected_p != st.session_state.selected_prompt_name:
                    st.session_state.selected_prompt_name = selected_p
                    st.session_state.current_system_prompt = prompt_lib[selected_p]
                    st.rerun()
                edited_prompt = st.text_area("Edytor treści:", value=st.session_state.current_system_prompt, height=200)
                if edited_prompt != st.session_state.current_system_prompt:
                    st.session_state.current_system_prompt = edited_prompt
                col_save, col_del = st.columns([2, 1])
                with col_save:
                    new_prompt_name = st.text_input("Nazwa dla nowego/aktualizacji:", value=st.session_state.selected_prompt_name)
                    if st.button("💾 Zapisz / Aktualizuj"):
                        if new_prompt_name:
                            prompt_lib[new_prompt_name] = edited_prompt
                            save_prompt_library(prompt_lib)
                            st.session_state.selected_prompt_name = new_prompt_name
                            st.success(f"Zapisano: {new_prompt_name}")
                            time.sleep(1)
                            st.rerun()
                with col_del:
                    st.write("")
                    st.write("")
                    if st.button("🗑️ Usuń"):
                        # Never delete the last remaining prompt.
                        if new_prompt_name in prompt_lib and len(prompt_lib) > 1:
                            del prompt_lib[new_prompt_name]
                            save_prompt_library(prompt_lib)
                            st.session_state.selected_prompt_name = list(prompt_lib.keys())[0]
                            st.session_state.current_system_prompt = prompt_lib[st.session_state.selected_prompt_name]
                            st.rerun()
                        else:
                            st.error("Nie można usunąć.")
        st.markdown("")
        c1, c2 = st.columns(2)
        if c1.button("➕ Nowy Czat"):
            st.session_state.messages = []
            st.session_state.current_chat_id = str(uuid.uuid4())
            st.rerun()
        if c2.button("🕵️ Temp Czat"):
            # "TEMP" id marks an incognito chat that is never saved to disk.
            st.session_state.messages = []
            st.session_state.current_chat_id = "TEMP"
            st.session_state.is_temp_chat = True
            st.rerun()
        st.subheader("Historia")
        # Ephemeral mode keeps its history in a dedicated folder.
        hist_folder_name = "EPHEMERAL_HISTORY" if selected_option == "⚡ Baza Ulotna (Losowy Mix)" else st.session_state.active_db_name
        chat_dir = os.path.join(CHATS_ROOT_DIR, hist_folder_name)
        if not os.path.exists(chat_dir):
            os.makedirs(chat_dir)
        if os.path.exists(chat_dir):
            hist_files = [f for f in os.listdir(chat_dir) if f.endswith(".json")]
            # Newest chats first (by file modification time).
            hist_files.sort(key=lambda x: os.path.getmtime(os.path.join(chat_dir, x)), reverse=True)
            if hist_files:
                with st.popover("🗑️ Opcje czyszczenia", use_container_width=True):
                    st.button("🔥 Usuń wszystko", on_click=reset_all_chats_callback, args=(hist_folder_name,))
                for f in hist_files:
                    try:
                        with open(os.path.join(chat_dir, f), "r") as jf:
                            fdata = json.load(jf)
                        col_l, col_d = st.columns([5, 1])
                        with col_l:
                            is_act = (st.session_state.get("current_chat_id") == fdata["id"])
                            title_txt = fdata.get('title', 'Bez tytułu')
                            lbl = f"🔹 {title_txt[:18]}..." if is_act else f"📂 {title_txt[:18]}..."
                            if st.button(lbl, key=f"l_{f}", use_container_width=True):
                                # Load the stored conversation into the live session.
                                st.session_state.current_chat_id = fdata["id"]
                                st.session_state.messages = fdata["messages"]
                                st.rerun()
                        with col_d:
                            st.button("🗑️", key=f"d_{f}", on_click=delete_chat_callback, args=(hist_folder_name, f))
                    except:
                        pass  # skip unreadable/corrupt history files
            else:
                st.caption("Brak historii.")

# --- MAIN ---
# Chat transcript, token stats, image attachment and the request/stream loop.
if "active_db_name" in st.session_state:
    db_name = st.session_state.active_db_name
    is_temp = st.session_state.get("current_chat_id") == "TEMP"
    has_rag = st.session_state.vectorstore is not None
    # uploader_key is bumped after each send to force a fresh (empty) uploader.
    if "uploader_key" not in st.session_state:
        st.session_state.uploader_key = 0
    mode_label = "RAG" if has_rag else "CHAT"
    if db_name == "EPHEMERAL_RAM_MODE":
        db_display_name = "⚡ Baza Ulotna"
    elif db_name == "GENERAL_CHAT":
        db_display_name = "Ogólny"
    else:
        db_display_name = db_name
    st.caption(f"🧠 Tryb: **{mode_label}** | Baza: {db_display_name} | {'🕵️ Temp' if is_temp else f'ID: {st.session_state.current_chat_id[:8]}'}")
    # Render the transcript; multimodal messages carry a list of parts.
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            content = msg["content"]
            if isinstance(content, list):
                for item in content:
                    if item.get("type") == "text":
                        st.markdown(item["text"])
                    elif item.get("type") == "image_url":
                        st.image(item["image_url"]["url"], width=300)
            else:
                st.markdown(content)
    # Token/speed stats from the previous turn, if any.
    if "last_stats" in st.session_state:
        stats = st.session_state.last_stats
        st.markdown("---")
        sc1, sc2 = st.columns([4, 1])
        with sc1:
            st.progress(min(stats["total_tok"] / max_ctx_limit, 1.0))
            rag_info = f" (w tym RAG: {stats['rag_tokens']})" if stats['rag_tokens'] > 0 else ""
            st.caption(f"📊 Tokeny: {stats['total_tok']}{rag_info} / Limit: {max_ctx_limit}")
        with sc2:
            with st.popover("⚡ Statystyki"):
                st.markdown(f"**Szybkość:** {stats['speed']:.1f} tok/s\n**Czas:** {stats['duration']:.2f} s")
    # Image attachment popover next to the chat input.
    col_tools, col_space = st.columns([1, 20])
    uploaded_image_base64 = None
    with col_tools:
        with st.popover("📎", help="Załącz zdjęcie", use_container_width=True):
            st.markdown("### Dodaj załącznik")
            img_file = st.file_uploader("Wybierz obraz", type=["jpg", "jpeg", "png"], key=f"uploader_{st.session_state.uploader_key}", label_visibility="collapsed")
            if img_file:
                st.image(img_file, width=150)
                uploaded_image_base64 = encode_image_to_base64(img_file)
                st.caption("Gotowe do wysłania")
    if user_input := st.chat_input("Napisz wiadomość..."):
        # Record and echo the user turn (with optional image part).
        if uploaded_image_base64:
            user_msg_content = [{"type": "text", "text": user_input}, {"type": "image_url", "image_url": {"url": uploaded_image_base64}}]
            st.session_state.messages.append({"role": "user", "content": user_msg_content})
            with st.chat_message("user"):
                st.markdown(user_input)
                st.image(uploaded_image_base64, width=300)
        else:
            st.session_state.messages.append({"role": "user", "content": user_input})
            with st.chat_message("user"):
                st.markdown(user_input)
        llm = load_llm_client()
        context_text = ""
        if has_rag:
            # Retrieve top-k chunks; each is tagged with its source file so the
            # prompt can forbid cross-file mixing.
            try:
                retriever = st.session_state.vectorstore.as_retriever(search_kwargs={"k": retrieval_k})
                docs = retriever.invoke(user_input)
                context_text = "\n\n".join(f"[PLIK: {d.metadata.get('source', 'unknown')}]\n{d.page_content}" for d in docs)
            except Exception as e:
                st.error(f"Błąd Retrievera: {e}")
                context_text = ""
        else:
            context_text = "BRAK (Tryb Czat Ogólny)"
        # Flatten the last 10 turns into plain text for the system prompt.
        history_text_lines = []
        for m in st.session_state.messages[-10:]:
            role = m["role"].upper()
            if isinstance(m["content"], list):
                text_part = next((x["text"] for x in m["content"] if x["type"] == "text"), "[Obraz]")
                history_text_lines.append(f"{role}: {text_part} [Załączono obraz]")
            else:
                history_text_lines.append(f"{role}: {m['content']}")
        history_text = "\n".join(history_text_lines)
        if has_rag:
            system_instructions = f"{st.session_state.current_system_prompt}\n\nKONTEKST Z BAZY:\n{context_text}\n\nHISTORIA CZATU:\n{history_text}"
        else:
            system_instructions = f"{st.session_state.current_system_prompt}\n\nHISTORIA ROZMOWY:\n{history_text}"
        messages_payload = [SystemMessage(content=system_instructions)]
        if uploaded_image_base64:
            messages_payload.append(HumanMessage(content=[{"type": "text", "text": f"Pytanie: {user_input}"}, {"type": "image_url", "image_url": {"url": uploaded_image_base64}}]))
        else:
            messages_payload.append(HumanMessage(content=f"Pytanie: {user_input}"))
        # Stream the reply, showing a cursor while tokens arrive.
        with st.chat_message("assistant"):
            placeholder = st.empty()
            full_response = ""
            start_t = time.time()
            try:
                for chunk in llm.stream(messages_payload):
                    if chunk.content:
                        full_response += chunk.content
                        placeholder.markdown(full_response + "▌")
            except Exception as e:
                st.error(f"Błąd modelu: {e}")
                full_response = "Wystąpił błąd komunikacji."
            duration = max(time.time() - start_t, 0.001)  # avoid div-by-zero in speed
            clean_resp = clean_reasoning_tags(full_response)
            placeholder.markdown(clean_resp)
        st.session_state.messages.append({"role": "assistant", "content": clean_resp})
        # Incognito chats are never written to disk.
        if not is_temp:
            save_chat_to_file(db_name, st.session_state.current_chat_id, st.session_state.messages, user_input[:30])
        # Rough token accounting for the stats bar (estimates, not exact counts).
        out_tokens = estimate_tokens(clean_resp)
        rag_tokens = estimate_tokens(context_text) if has_rag else 0
        input_est = estimate_tokens(system_instructions) + estimate_tokens(user_input)
        total_tok = input_est + out_tokens
        speed = out_tokens / duration
        st.session_state.last_stats = {
            "total_tok": total_tok,
            "rag_tokens": rag_tokens,
            "speed": speed,
            "duration": duration
        }
        # New uploader key clears the attached image for the next turn.
        st.session_state.uploader_key += 1
        st.rerun()
else:
    st.info("👈 Wybierz tryb w menu bocznym.")