File size: 4,013 Bytes
19d62ff
80cb919
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19d62ff
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# Centralized SFT writer (JSONL + CSV) and RAG writer
import csv
import orjson
from typing import Optional, Dict
import logging

# Module-level logger used by the writers below. The handler and level are
# only configured when the logger has no handlers yet, so running this setup
# again does not stack a second StreamHandler.
logger = logging.getLogger("schema")
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)

def sft_row(instruction: str, user_input: str, output: str, source: str, rid: str, task: str, meta: Optional[dict] = None):
    """Assemble one SFT record as a plain dict.

    The instruction/input/output triple is nested under the ``"sft"`` key;
    ``source``, ``rid`` (stored as ``"id"``) and ``task`` sit at the top
    level. A falsy ``meta`` (``None`` or an empty dict) is replaced by a
    fresh empty dict.

    Returns:
        dict with keys ``source``, ``id``, ``task``, ``sft``, ``meta``.
    """
    sft_payload = dict(instruction=instruction, input=user_input, output=output)
    record = dict(source=source, id=rid, task=task, sft=sft_payload)
    record["meta"] = meta if meta else {}
    return record

def is_valid_row(row: Dict, max_chars: int = 20000) -> bool:
    """Return True when an SFT row passes the basic sanity checks.

    A row is valid when at least one of ``sft.input`` / ``sft.output`` is
    non-empty and none of instruction, input or output is longer than
    ``max_chars``.

    Missing or ``None`` values (including a missing/``None`` ``"sft"``
    mapping) are treated as empty strings, so such rows are rejected or
    accepted on their remaining content instead of raising ``TypeError``
    from ``len(None)``.

    Args:
        row: Record with an ``"sft"`` mapping holding ``instruction``,
            ``input`` and ``output``.
        max_chars: Inclusive upper bound on the length of each field.

    Returns:
        bool: True if the row should be kept, False otherwise.
    """
    sft = row.get("sft") or {}
    # ``or ""`` folds None (key present but unset) into the empty string.
    instr, inp, out = (sft.get(key) or "" for key in ("instruction", "input", "output"))
    # basic sanity: non-empty input OR output
    if not (inp or out):
        return False
    # cap extremes
    return all(len(text) <= max_chars for text in (instr, inp, out))

class CentralisedWriter:
    """Streams JSONL + CSV in parallel to stay memory-safe.

    Every accepted row is written to both files as soon as ``write`` is
    called; nothing is accumulated in memory. Rows rejected by
    ``is_valid_row`` are logged and skipped.

    The writer can be used as a context manager so both files are closed
    even when the body raises::

        with CentralisedWriter("out.jsonl", "out.csv") as writer:
            writer.write(row)
    """

    def __init__(self, jsonl_path: str, csv_path: str):
        """Open both output files and write the CSV header.

        Args:
            jsonl_path: Destination of the JSONL stream (opened in binary mode).
            csv_path: Destination of the CSV stream (UTF-8 text).

        Raises:
            OSError: If either file cannot be opened. Any file that was
                already opened is closed before the error propagates.
        """
        self.jsonl_fp = open(jsonl_path, "wb")
        try:
            self.csv_fp = open(csv_path, "w", newline="", encoding="utf-8")
        except Exception:
            # Don't leak the JSONL handle when the CSV file can't be opened.
            self.jsonl_fp.close()
            raise
        try:
            self.csv_wr = csv.DictWriter(self.csv_fp, fieldnames=["instruction","input","output","source","id","task"])
            self.csv_wr.writeheader()
        except Exception:
            # Header write failed: release both handles before re-raising.
            self.close()
            raise

    def __enter__(self):
        """Return the writer itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close both files; exceptions from the ``with`` body propagate."""
        self.close()

    def write(self, row: dict) -> None:
        """Append ``row`` to the JSONL and CSV outputs, or skip it if invalid.

        Args:
            row: Record with an ``"sft"`` mapping plus ``source``, ``id``
                and ``task`` keys. The JSONL line contains the whole row;
                the CSV line contains only the six flat columns.
        """
        if not is_valid_row(row):
            # Fields of a rejected row may be missing or None, so measure
            # them defensively; the warning itself must never raise.
            sft = row.get("sft") or {}
            instr_len, input_len, output_len = (
                len(sft.get(key) or "") for key in ("instruction", "input", "output")
            )
            logger.warning(
                f"[WRITER] Skipping invalid row id={row.get('id')} "
                f"(len instr={instr_len}, input={input_len}, output={output_len})"
            )
            return
        self.jsonl_fp.write(orjson.dumps(row))
        self.jsonl_fp.write(b"\n")
        s = row["sft"]
        self.csv_wr.writerow({
            "instruction": s.get("instruction",""),
            "input": s.get("input",""),
            "output": s.get("output",""),
            "source": row.get("source",""),
            "id": row.get("id",""),
            "task": row.get("task","")
        })

    def close(self) -> None:
        """Close both files; the CSV file is closed even if the JSONL close fails."""
        try:
            self.jsonl_fp.close()
        finally:
            self.csv_fp.close()


# —— RAG (QAC) schema ——

def rag_row(question: str, context: str, answer: str, rid: str):
    """Build one RAG (QAC) record as a plain dict.

    Falsy text arguments (``None`` or ``""``) are stored as the empty
    string; ``rid`` is stored unchanged under ``"id"``.

    Returns:
        dict with keys ``id``, ``question``, ``answer``, ``context``.
    """
    text_fields = (("question", question), ("answer", answer), ("context", context))
    cleaned = {name: (text if text else "") for name, text in text_fields}
    return {"id": rid, **cleaned}


def is_valid_rag_row(row: Dict, max_chars: int = 20000) -> bool:
    """Return True when a RAG (QAC) row passes the basic sanity checks.

    A row is valid when both ``question`` and ``answer`` are non-empty and
    none of question, answer or context is longer than ``max_chars``.
    ``context`` is optional.

    Missing or ``None`` values are treated as empty strings, so a row whose
    ``context`` is ``None`` is judged on its question and answer instead of
    raising ``TypeError`` from ``len(None)``.

    Args:
        row: Record with ``question``, ``answer`` and (optionally)
            ``context`` keys.
        max_chars: Inclusive upper bound on the length of each field.

    Returns:
        bool: True if the row should be kept, False otherwise.
    """
    # ``or ""`` folds None (key present but unset) into the empty string.
    q, a, c = (row.get(key) or "" for key in ("question", "answer", "context"))
    if not (q and a):
        return False
    return all(len(text) <= max_chars for text in (q, a, c))


class RAGWriter:
    """Streams JSONL + CSV for RAG (QAC) format with columns: id, question, answer, context.

    Every accepted row is written to both files as soon as ``write`` is
    called. Rows rejected by ``is_valid_rag_row`` are logged and skipped.

    The writer can be used as a context manager so both files are closed
    even when the body raises::

        with RAGWriter("rag.jsonl", "rag.csv") as writer:
            writer.write(row)
    """

    def __init__(self, jsonl_path: str, csv_path: str):
        """Open both output files and write the CSV header.

        Args:
            jsonl_path: Destination of the JSONL stream (opened in binary mode).
            csv_path: Destination of the CSV stream (UTF-8 text).

        Raises:
            OSError: If either file cannot be opened. Any file that was
                already opened is closed before the error propagates.
        """
        self.jsonl_fp = open(jsonl_path, "wb")
        try:
            self.csv_fp = open(csv_path, "w", newline="", encoding="utf-8")
        except Exception:
            # Don't leak the JSONL handle when the CSV file can't be opened.
            self.jsonl_fp.close()
            raise
        try:
            self.csv_wr = csv.DictWriter(self.csv_fp, fieldnames=["id","question","answer","context"])
            self.csv_wr.writeheader()
        except Exception:
            # Header write failed: release both handles before re-raising.
            self.close()
            raise

    def __enter__(self):
        """Return the writer itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close both files; exceptions from the ``with`` body propagate."""
        self.close()

    def write(self, row: dict) -> None:
        """Append ``row`` to the JSONL and CSV outputs, or skip it if invalid.

        Args:
            row: Record with ``id``, ``question``, ``answer`` and
                ``context`` keys. The JSONL line contains the whole row;
                the CSV line contains only those four columns.
        """
        if not is_valid_rag_row(row):
            # A rejected row may carry None fields (e.g. question=None), so
            # measure them defensively; the warning itself must never raise.
            q_len, a_len, c_len = (
                len(row.get(key) or "") for key in ("question", "answer", "context")
            )
            logger.warning(
                f"[RAG-WRITER] Skipping invalid row id={row.get('id')} "
                f"(len q={q_len}, a={a_len}, c={c_len})"
            )
            return
        self.jsonl_fp.write(orjson.dumps(row))
        self.jsonl_fp.write(b"\n")
        self.csv_wr.writerow({
            "id": row.get("id",""),
            "question": row.get("question",""),
            "answer": row.get("answer",""),
            "context": row.get("context","")
        })

    def close(self) -> None:
        """Close both files; the CSV file is closed even if the JSONL close fails."""
        try:
            self.jsonl_fp.close()
        finally:
            self.csv_fp.close()