Skip to content

Module mongo_episode

Episode

Source code in nbs/mongo_episode.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
class Episode:
    def __init__(
        self, date: str, titre: str, collection_name: str = "episodes"
    ) -> None:
        """Initialise une instance d'Episode.

        Args:
            date (str): La date de l'épisode au format "2024-12-22T09:59:39" conforme à DATE_FORMAT.
            titre (str): Le titre de l'épisode.
            collection_name (str, optional): Le nom de la collection dans la base de données. Défaut: "episodes".

        Notes:
            Si l'épisode existe déjà en base, ses attributs seront chargés.
        """
        DB_HOST, DB_NAME, _ = get_DB_VARS()
        self.collection = get_collection(
            target_db=DB_HOST, client_name=DB_NAME, collection_name=collection_name
        )
        self.date: datetime = Episode.get_date_from_string(date)
        self.titre: str = titre

        if self.exists():
            episode = self.collection.find_one({"titre": self.titre, "date": self.date})
            self.description: Optional[str] = episode.get("description")
            self.url_telechargement: Optional[str] = episode.get("url")
            self.audio_rel_filename: Optional[str] = episode.get("audio_rel_filename")
            self.transcription: Optional[str] = episode.get("transcription")
            self.type: Optional[str] = episode.get("type")
            self.duree: int = episode.get("duree", -1)
            self.masked: bool = episode.get("masked", False)
        else:
            self.description = None
            self.url_telechargement = None
            self.audio_rel_filename = None
            self.transcription = None
            self.type = None
            self.duree = -1  # en secondes
            self.masked = False

    @classmethod
    def from_oid(cls, oid: ObjectId, collection_name: str = "episodes") -> "Episode":
        """Crée un épisode à partir d'un ObjectId dans la base de données.

        Args:
            oid (ObjectId): L'identifiant de l'épisode dans Mongo.
            collection_name (str, optional): Le nom de la collection. Défaut: "episodes".

        Returns:
            Episode: L'instance d'Episode correspondante.
        """
        DB_HOST, DB_NAME, _ = get_DB_VARS()
        collection = get_collection(
            target_db=DB_HOST, client_name=DB_NAME, collection_name=collection_name
        )
        document = collection.find_one({"_id": oid})
        date_doc_str = cls.get_string_from_date(document.get("date"), DATE_FORMAT)
        instance = cls(date=date_doc_str, titre=document.get("titre"))
        return instance

    @classmethod
    def from_date(
        cls, date: datetime, collection_name: str = "episodes"
    ) -> Optional["Episode"]:
        """Crée un épisode à partir d'une date dans la base de données.

        Args:
            date (datetime): La date recherchée.
            collection_name (str, optional): Le nom de la collection. Défaut: "episodes".

        Returns:
            Optional[Episode]: L'instance d'Episode si trouvée, sinon None.
        """
        DB_HOST, DB_NAME, _ = get_DB_VARS()
        collection = get_collection(
            target_db=DB_HOST, client_name=DB_NAME, collection_name=collection_name
        )
        start_date = datetime(date.year, date.month, date.day)
        end_date = datetime(date.year, date.month, date.day, 23, 59, 59)
        document = collection.find_one({"date": {"$gte": start_date, "$lte": end_date}})
        if document:
            date_doc_str = cls.get_string_from_date(document.get("date"), DATE_FORMAT)
            instance = cls(date=date_doc_str, titre=document.get("titre"))
            return instance
        else:
            return None

    def exists(self) -> bool:
        """Vérifie si l'épisode existe dans la base de données.

        Returns:
            bool: True si l'épisode existe, False sinon.
        """
        return (
            self.collection.find_one({"titre": self.titre, "date": self.date})
            is not None
        )

    def keep(self) -> int:
        """Télécharge le fichier audio si nécessaire et conserve l'épisode dans la base de données.

        Returns:
            int: 1 si une nouvelle entrée est créée en base, 0 sinon.
        """
        message_log = f"{Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)} - {self.titre}"
        if not self.exists():
            print(
                f"Episode du {Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)} nouveau: Duree: {self.duree}, Type: {self.type}"
            )
            mongolog("insert", self.collection.name, message_log)
            self.download_audio(verbose=True)
            self.collection.insert_one(
                {
                    "titre": self.titre,
                    "date": self.date,
                    "description": self.description,
                    "url": self.url_telechargement,
                    "audio_rel_filename": self.audio_rel_filename,
                    "transcription": self.transcription,
                    "type": self.type,
                    "duree": self.duree,
                    "masked": self.masked,
                }
            )
            return 1
        else:
            print(
                f"Episode du {Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)} deja existant"
            )
            mongolog("update", self.collection.name, message_log)
            return 0

    def update_date(self, new_date: datetime) -> None:
        """Met à jour la date de l'épisode dans la base de données.

        Args:
            new_date (datetime): La nouvelle date de l'épisode.
        """
        self.collection.update_one(
            {"_id": self.get_oid()}, {"$set": {"date": new_date}}
        )
        message_log = f"{Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)} - {self.titre} -> {Episode.get_string_from_date(new_date, format=LOG_DATE_FORMAT)}"
        self.date = new_date
        mongolog("force_update", self.collection.name, message_log)

    def remove(self) -> None:
        """Supprime l'épisode de la base de données."""
        message_log = f"{Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)} - {self.titre}"
        self.collection.delete_one({"titre": self.titre, "date": self.date})
        mongolog("delete", self.collection.name, message_log)

    def get_oid(self) -> Optional[ObjectId]:
        """Récupère l'identifiant Mongo (_id) de l'épisode.

        Returns:
            Optional[ObjectId]: L'ObjectId de l'épisode s'il existe, sinon None.
        """
        document = self.collection.find_one({"titre": self.titre, "date": self.date})
        if document:
            return document["_id"]
        else:
            return None

    @staticmethod
    def get_date_from_string(date: str, DATE_FORMAT: str = DATE_FORMAT) -> datetime:
        """Convertit une chaîne de caractères en objet datetime.

        Args:
            date (str): La chaîne représentant la date.
            DATE_FORMAT (str, optional): Le format de la date. Défaut est DATE_FORMAT.

        Returns:
            datetime: L'objet datetime correspondant.
        """
        return datetime.strptime(date, DATE_FORMAT)

    @staticmethod
    def get_string_from_date(date: datetime, format: Optional[str] = None) -> str:
        """Convertit un objet datetime en chaîne de caractères.

        Args:
            date (datetime): L'objet datetime.
            format (Optional[str], optional): Le format de sortie. Si None, DATE_FORMAT est utilisé.

        Returns:
            str: La chaîne représentant la date.
        """
        if format is not None:
            return date.strftime(format)
        else:
            return date.strftime(DATE_FORMAT)

    @staticmethod
    def format_duration(seconds: int) -> str:
        """Convertit une durée en secondes au format HH:MM:SS.

        Args:
            seconds (int): La durée en secondes.

        Returns:
            str: La durée formatée en chaîne de caractères.
        """
        if seconds < 0:
            return f"-{Episode.format_duration(-seconds)}"
        hours = seconds // 3600
        minutes = (seconds % 3600) // 60
        seconds = seconds % 60
        return f"{hours:02}:{minutes:02}:{seconds:02}"

    def __str__(self) -> str:
        """Renvoie une représentation textuelle de l'épisode.

        Returns:
            str: Les informations de l'épisode sous forme de chaîne de caractères.
        """
        return (
            f"_oid: {self.get_oid()}\n"
            f"Date: {Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)}\n"
            f"Titre: {self.titre}\n"
            f"Description: {self.description}\n"
            f"URL de téléchargement: {self.url_telechargement}\n"
            f"Fichier audio: {self.audio_rel_filename}\n"
            f"Duree: {self.duree} en secondes ({Episode.format_duration(self.duree)})\n"
            f"Transcription: {self.transcription[:100] if self.transcription else 'No transcription yet available'}..."
        )

    def __repr__(self) -> str:
        """Renvoie une représentation officielle de l'objet.

        Returns:
            str: La représentation de l'objet (équivalente à __str__).
        """
        return self.__str__()

    def download_audio(self, verbose: bool = False) -> None:
        """Télécharge le fichier audio à partir de l'URL de téléchargement et le sauvegarde localement.

        Args:
            verbose (bool, optional): Si True, affiche des messages d'information. Défaut False.
        """
        if self.url_telechargement is None:
            return
        year = str(self.date.year)
        full_audio_path = get_audio_path(AUDIO_PATH, year)
        full_filename = os.path.join(
            full_audio_path, os.path.basename(self.url_telechargement)
        )
        self.audio_rel_filename = os.path.relpath(
            full_filename, get_audio_path(AUDIO_PATH, year="")
        )
        if not os.path.exists(full_filename):
            if verbose:
                print(
                    f"Téléchargement de {self.url_telechargement} vers {full_filename}"
                )
            response = requests.get(self.url_telechargement)
            with open(full_filename, "wb") as file:
                file.write(response.content)
        else:
            if verbose:
                print(f"Le fichier {full_filename} existe déjà. Ignoré.")

    def set_transcription(self, verbose: bool = False, keep_cache: bool = True) -> None:
        """Extrait et sauvegarde la transcription de l'audio en utilisant un modèle Whisper.

        Utilise le cache si disponible ou extrait la transcription de l'audio.

        Args:
            verbose (bool, optional): Si True, affiche des messages d'information. Défaut False.
            keep_cache (bool, optional): Si True, sauvegarde la transcription dans un fichier cache. Défaut True.
        """
        if self.transcription is not None:
            if verbose:
                print("Transcription existe deja")
            return
        mp3_fullfilename = get_audio_path(AUDIO_PATH, year="") + self.audio_rel_filename
        cache_transcription_filename = f"{os.path.splitext(mp3_fullfilename)[0]}.txt"
        if os.path.exists(cache_transcription_filename):
            if verbose:
                print(f"Transcription cachee trouvee: {cache_transcription_filename}")
            with open(cache_transcription_filename, "r") as file:
                self.transcription = file.read()
            self.collection.update_one(
                {"_id": self.get_oid()},
                {"$set": {"transcription": self.transcription}},
            )
            return

        self.transcription = extract_whisper(mp3_fullfilename)
        if keep_cache:
            with open(cache_transcription_filename, "w") as f:
                f.write(self.transcription)
        self.collection.update_one(
            {"_id": self.get_oid()}, {"$set": {"transcription": self.transcription}}
        )

    def to_dict(self) -> Dict[str, Union[str, datetime, int, None, bool]]:
        """Convertit l'épisode en dictionnaire.

        Returns:
            Dict[str, Union[str, datetime, int, None, bool]]: Dictionnaire contenant les informations de l'épisode.
                Les clés sont ['date', 'titre', 'description', 'url_telechargement', 'audio_rel_filename', 'transcription', 'type', 'duree', 'masked'].
        """
        return {
            "date": self.date,
            "titre": self.titre,
            "description": self.description,
            "url_telechargement": self.url_telechargement,
            "audio_rel_filename": self.audio_rel_filename,
            "transcription": self.transcription,
            "type": self.type,
            "duree": self.duree,
            "masked": self.masked,
        }

    def get_all_auteurs(self) -> List[str]:
        """Extrait la liste de tous les auteurs mentionnés dans la transcription.

        Notes:
            Utilise le modèle GPT-4 via Azure LLM pour extraire une liste JSON de noms d'auteurs.

        Returns:
            List[str]: La liste des auteurs détectés.
        """
        if self.transcription is None:
            return []

        llm_structured_output = get_azure_llm("gpt-4o")
        response_schema = {
            "type": "json_schema",
            "json_schema": {
                "name": "AuthorList",
                "schema": {
                    "type": "object",
                    "properties": {
                        "Authors": {
                            "type": "array",
                            "items": {
                                "type": "string",
                                "description": "Une liste des auteurs extraits de la transcription",
                            },
                        }
                    },
                    "required": ["Authors"],
                    "additionalProperties": False,
                },
            },
        }
        response = llm_structured_output.chat(
            messages=[
                ChatMessage(
                    role="system",
                    content="Tu es un assistant utile qui retourne une liste JSON de noms d'auteurs.",
                ),
                ChatMessage(
                    role="user",
                    content=f"Est-ce que tu peux me lister tous les noms d'auteurs dont on parle des oeuvres \
dans cette transcription d'un épisode du masque et la plume \
diffuse le {self.date.strftime('%d %b %Y')}. \
Je veux toujours avoir le prénom et le nom complet de chaque auteur. \
Voici cette transcription : {self.transcription} ",
                ),
            ],
            response_format=response_schema,
        )
        try:
            json_dict = json.loads(response.message.content)
        except json.JSONDecodeError as e:
            print("Error parsing JSON:", e)
            print("Raw response:", response.message.content)
            return []
        return json_dict["Authors"]

__init__(date, titre, collection_name='episodes')

Initialise une instance d'Episode.

Parameters:

Name Type Description Default
date str

La date de l'épisode au format "2024-12-22T09:59:39" conforme à DATE_FORMAT.

required
titre str

Le titre de l'épisode.

required
collection_name str

Le nom de la collection dans la base de données. Défaut: "episodes".

'episodes'
Notes

Si l'épisode existe déjà en base, ses attributs seront chargés.

Source code in nbs/mongo_episode.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def __init__(
    self, date: str, titre: str, collection_name: str = "episodes"
) -> None:
    """Initialise une instance d'Episode.

    Args:
        date (str): La date de l'épisode au format "2024-12-22T09:59:39" conforme à DATE_FORMAT.
        titre (str): Le titre de l'épisode.
        collection_name (str, optional): Le nom de la collection dans la base de données. Défaut: "episodes".

    Notes:
        Si l'épisode existe déjà en base, ses attributs seront chargés.
    """
    DB_HOST, DB_NAME, _ = get_DB_VARS()
    self.collection = get_collection(
        target_db=DB_HOST, client_name=DB_NAME, collection_name=collection_name
    )
    self.date: datetime = Episode.get_date_from_string(date)
    self.titre: str = titre

    if self.exists():
        episode = self.collection.find_one({"titre": self.titre, "date": self.date})
        self.description: Optional[str] = episode.get("description")
        self.url_telechargement: Optional[str] = episode.get("url")
        self.audio_rel_filename: Optional[str] = episode.get("audio_rel_filename")
        self.transcription: Optional[str] = episode.get("transcription")
        self.type: Optional[str] = episode.get("type")
        self.duree: int = episode.get("duree", -1)
        self.masked: bool = episode.get("masked", False)
    else:
        self.description = None
        self.url_telechargement = None
        self.audio_rel_filename = None
        self.transcription = None
        self.type = None
        self.duree = -1  # en secondes
        self.masked = False

__repr__()

Renvoie une représentation officielle de l'objet.

Returns:

Name Type Description
str str

La représentation de l'objet (équivalente à str).

Source code in nbs/mongo_episode.py
392
393
394
395
396
397
398
def __repr__(self) -> str:
    """Renvoie une représentation officielle de l'objet.

    Returns:
        str: La représentation de l'objet (équivalente à __str__).
    """
    return self.__str__()

__str__()

Renvoie une représentation textuelle de l'épisode.

Returns:

Name Type Description
str str

Les informations de l'épisode sous forme de chaîne de caractères.

Source code in nbs/mongo_episode.py
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def __str__(self) -> str:
    """Renvoie une représentation textuelle de l'épisode.

    Returns:
        str: Les informations de l'épisode sous forme de chaîne de caractères.
    """
    return (
        f"_oid: {self.get_oid()}\n"
        f"Date: {Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)}\n"
        f"Titre: {self.titre}\n"
        f"Description: {self.description}\n"
        f"URL de téléchargement: {self.url_telechargement}\n"
        f"Fichier audio: {self.audio_rel_filename}\n"
        f"Duree: {self.duree} en secondes ({Episode.format_duration(self.duree)})\n"
        f"Transcription: {self.transcription[:100] if self.transcription else 'No transcription yet available'}..."
    )

download_audio(verbose=False)

Télécharge le fichier audio à partir de l'URL de téléchargement et le sauvegarde localement.

Parameters:

Name Type Description Default
verbose bool

Si True, affiche des messages d'information. Défaut False.

False
Source code in nbs/mongo_episode.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
def download_audio(self, verbose: bool = False) -> None:
    """Télécharge le fichier audio à partir de l'URL de téléchargement et le sauvegarde localement.

    Args:
        verbose (bool, optional): Si True, affiche des messages d'information. Défaut False.
    """
    if self.url_telechargement is None:
        return
    year = str(self.date.year)
    full_audio_path = get_audio_path(AUDIO_PATH, year)
    full_filename = os.path.join(
        full_audio_path, os.path.basename(self.url_telechargement)
    )
    self.audio_rel_filename = os.path.relpath(
        full_filename, get_audio_path(AUDIO_PATH, year="")
    )
    if not os.path.exists(full_filename):
        if verbose:
            print(
                f"Téléchargement de {self.url_telechargement} vers {full_filename}"
            )
        response = requests.get(self.url_telechargement)
        with open(full_filename, "wb") as file:
            file.write(response.content)
    else:
        if verbose:
            print(f"Le fichier {full_filename} existe déjà. Ignoré.")

exists()

Vérifie si l'épisode existe dans la base de données.

Returns:

Name Type Description
bool bool

True si l'épisode existe, False sinon.

Source code in nbs/mongo_episode.py
253
254
255
256
257
258
259
260
261
262
def exists(self) -> bool:
    """Vérifie si l'épisode existe dans la base de données.

    Returns:
        bool: True si l'épisode existe, False sinon.
    """
    return (
        self.collection.find_one({"titre": self.titre, "date": self.date})
        is not None
    )

format_duration(seconds) staticmethod

Convertit une durée en secondes au format HH:MM:SS.

Parameters:

Name Type Description Default
seconds int

La durée en secondes.

required

Returns:

Name Type Description
str str

La durée formatée en chaîne de caractères.

Source code in nbs/mongo_episode.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
@staticmethod
def format_duration(seconds: int) -> str:
    """Convertit une durée en secondes au format HH:MM:SS.

    Args:
        seconds (int): La durée en secondes.

    Returns:
        str: La durée formatée en chaîne de caractères.
    """
    if seconds < 0:
        return f"-{Episode.format_duration(-seconds)}"
    hours = seconds // 3600
    minutes = (seconds % 3600) // 60
    seconds = seconds % 60
    return f"{hours:02}:{minutes:02}:{seconds:02}"

from_date(date, collection_name='episodes') classmethod

Crée un épisode à partir d'une date dans la base de données.

Parameters:

Name Type Description Default
date datetime

La date recherchée.

required
collection_name str

Le nom de la collection. Défaut: "episodes".

'episodes'

Returns:

Type Description
Optional[Episode]

Optional[Episode]: L'instance d'Episode si trouvée, sinon None.

Source code in nbs/mongo_episode.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
@classmethod
def from_date(
    cls, date: datetime, collection_name: str = "episodes"
) -> Optional["Episode"]:
    """Crée un épisode à partir d'une date dans la base de données.

    Args:
        date (datetime): La date recherchée.
        collection_name (str, optional): Le nom de la collection. Défaut: "episodes".

    Returns:
        Optional[Episode]: L'instance d'Episode si trouvée, sinon None.
    """
    DB_HOST, DB_NAME, _ = get_DB_VARS()
    collection = get_collection(
        target_db=DB_HOST, client_name=DB_NAME, collection_name=collection_name
    )
    start_date = datetime(date.year, date.month, date.day)
    end_date = datetime(date.year, date.month, date.day, 23, 59, 59)
    document = collection.find_one({"date": {"$gte": start_date, "$lte": end_date}})
    if document:
        date_doc_str = cls.get_string_from_date(document.get("date"), DATE_FORMAT)
        instance = cls(date=date_doc_str, titre=document.get("titre"))
        return instance
    else:
        return None

from_oid(oid, collection_name='episodes') classmethod

Crée un épisode à partir d'un ObjectId dans la base de données.

Parameters:

Name Type Description Default
oid ObjectId

L'identifiant de l'épisode dans Mongo.

required
collection_name str

Le nom de la collection. Défaut: "episodes".

'episodes'

Returns:

Name Type Description
Episode Episode

L'instance d'Episode correspondante.

Source code in nbs/mongo_episode.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
@classmethod
def from_oid(cls, oid: ObjectId, collection_name: str = "episodes") -> "Episode":
    """Crée un épisode à partir d'un ObjectId dans la base de données.

    Args:
        oid (ObjectId): L'identifiant de l'épisode dans Mongo.
        collection_name (str, optional): Le nom de la collection. Défaut: "episodes".

    Returns:
        Episode: L'instance d'Episode correspondante.
    """
    DB_HOST, DB_NAME, _ = get_DB_VARS()
    collection = get_collection(
        target_db=DB_HOST, client_name=DB_NAME, collection_name=collection_name
    )
    document = collection.find_one({"_id": oid})
    date_doc_str = cls.get_string_from_date(document.get("date"), DATE_FORMAT)
    instance = cls(date=date_doc_str, titre=document.get("titre"))
    return instance

get_all_auteurs()

Extrait la liste de tous les auteurs mentionnés dans la transcription.

Notes

Utilise le modèle GPT-4 via Azure LLM pour extraire une liste JSON de noms d'auteurs.

Returns:

Type Description
List[str]

List[str]: La liste des auteurs détectés.

Source code in nbs/mongo_episode.py
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
    def get_all_auteurs(self) -> List[str]:
        """Extrait la liste de tous les auteurs mentionnés dans la transcription.

        Notes:
            Utilise le modèle GPT-4 via Azure LLM pour extraire une liste JSON de noms d'auteurs.

        Returns:
            List[str]: La liste des auteurs détectés.
        """
        if self.transcription is None:
            return []

        llm_structured_output = get_azure_llm("gpt-4o")
        response_schema = {
            "type": "json_schema",
            "json_schema": {
                "name": "AuthorList",
                "schema": {
                    "type": "object",
                    "properties": {
                        "Authors": {
                            "type": "array",
                            "items": {
                                "type": "string",
                                "description": "Une liste des auteurs extraits de la transcription",
                            },
                        }
                    },
                    "required": ["Authors"],
                    "additionalProperties": False,
                },
            },
        }
        response = llm_structured_output.chat(
            messages=[
                ChatMessage(
                    role="system",
                    content="Tu es un assistant utile qui retourne une liste JSON de noms d'auteurs.",
                ),
                ChatMessage(
                    role="user",
                    content=f"Est-ce que tu peux me lister tous les noms d'auteurs dont on parle des oeuvres \
dans cette transcription d'un épisode du masque et la plume \
diffuse le {self.date.strftime('%d %b %Y')}. \
Je veux toujours avoir le prénom et le nom complet de chaque auteur. \
Voici cette transcription : {self.transcription} ",
                ),
            ],
            response_format=response_schema,
        )
        try:
            json_dict = json.loads(response.message.content)
        except json.JSONDecodeError as e:
            print("Error parsing JSON:", e)
            print("Raw response:", response.message.content)
            return []
        return json_dict["Authors"]

get_date_from_string(date, DATE_FORMAT=DATE_FORMAT) staticmethod

Convertit une chaîne de caractères en objet datetime.

Parameters:

Name Type Description Default
date str

La chaîne représentant la date.

required
DATE_FORMAT str

Le format de la date. Défaut est DATE_FORMAT.

DATE_FORMAT

Returns:

Name Type Description
datetime datetime

L'objet datetime correspondant.

Source code in nbs/mongo_episode.py
329
330
331
332
333
334
335
336
337
338
339
340
@staticmethod
def get_date_from_string(date: str, DATE_FORMAT: str = DATE_FORMAT) -> datetime:
    """Convertit une chaîne de caractères en objet datetime.

    Args:
        date (str): La chaîne représentant la date.
        DATE_FORMAT (str, optional): Le format de la date. Défaut est DATE_FORMAT.

    Returns:
        datetime: L'objet datetime correspondant.
    """
    return datetime.strptime(date, DATE_FORMAT)

get_oid()

Récupère l'identifiant Mongo (_id) de l'épisode.

Returns:

Type Description
Optional[ObjectId]

Optional[ObjectId]: L'ObjectId de l'épisode s'il existe, sinon None.

Source code in nbs/mongo_episode.py
317
318
319
320
321
322
323
324
325
326
327
def get_oid(self) -> Optional[ObjectId]:
    """Récupère l'identifiant Mongo (_id) de l'épisode.

    Returns:
        Optional[ObjectId]: L'ObjectId de l'épisode s'il existe, sinon None.
    """
    document = self.collection.find_one({"titre": self.titre, "date": self.date})
    if document:
        return document["_id"]
    else:
        return None

get_string_from_date(date, format=None) staticmethod

Convertit un objet datetime en chaîne de caractères.

Parameters:

Name Type Description Default
date datetime

L'objet datetime.

required
format Optional[str]

Le format de sortie. Si None, DATE_FORMAT est utilisé.

None

Returns:

Name Type Description
str str

La chaîne représentant la date.

Source code in nbs/mongo_episode.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
@staticmethod
def get_string_from_date(date: datetime, format: Optional[str] = None) -> str:
    """Convertit un objet datetime en chaîne de caractères.

    Args:
        date (datetime): L'objet datetime.
        format (Optional[str], optional): Le format de sortie. Si None, DATE_FORMAT est utilisé.

    Returns:
        str: La chaîne représentant la date.
    """
    if format is not None:
        return date.strftime(format)
    else:
        return date.strftime(DATE_FORMAT)

keep()

Télécharge le fichier audio si nécessaire et conserve l'épisode dans la base de données.

Returns:

Name Type Description
int int

1 si une nouvelle entrée est créée en base, 0 sinon.

Source code in nbs/mongo_episode.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def keep(self) -> int:
    """Télécharge le fichier audio si nécessaire et conserve l'épisode dans la base de données.

    Returns:
        int: 1 si une nouvelle entrée est créée en base, 0 sinon.
    """
    message_log = f"{Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)} - {self.titre}"
    if not self.exists():
        print(
            f"Episode du {Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)} nouveau: Duree: {self.duree}, Type: {self.type}"
        )
        mongolog("insert", self.collection.name, message_log)
        self.download_audio(verbose=True)
        self.collection.insert_one(
            {
                "titre": self.titre,
                "date": self.date,
                "description": self.description,
                "url": self.url_telechargement,
                "audio_rel_filename": self.audio_rel_filename,
                "transcription": self.transcription,
                "type": self.type,
                "duree": self.duree,
                "masked": self.masked,
            }
        )
        return 1
    else:
        print(
            f"Episode du {Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)} deja existant"
        )
        mongolog("update", self.collection.name, message_log)
        return 0

remove()

Supprime l'épisode de la base de données.

Source code in nbs/mongo_episode.py
311
312
313
314
315
def remove(self) -> None:
    """Supprime l'épisode de la base de données."""
    message_log = f"{Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)} - {self.titre}"
    self.collection.delete_one({"titre": self.titre, "date": self.date})
    mongolog("delete", self.collection.name, message_log)

set_transcription(verbose=False, keep_cache=True)

Extrait et sauvegarde la transcription de l'audio en utilisant un modèle Whisper.

Utilise le cache si disponible ou extrait la transcription de l'audio.

Parameters:

Name Type Description Default
verbose bool

Si True, affiche des messages d'information. Défaut False.

False
keep_cache bool

Si True, sauvegarde la transcription dans un fichier cache. Défaut True.

True
Source code in nbs/mongo_episode.py
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
def set_transcription(self, verbose: bool = False, keep_cache: bool = True) -> None:
    """Extrait et sauvegarde la transcription de l'audio en utilisant un modèle Whisper.

    Utilise le cache si disponible ou extrait la transcription de l'audio.

    Args:
        verbose (bool, optional): Si True, affiche des messages d'information. Défaut False.
        keep_cache (bool, optional): Si True, sauvegarde la transcription dans un fichier cache. Défaut True.
    """
    if self.transcription is not None:
        if verbose:
            print("Transcription existe deja")
        return
    mp3_fullfilename = get_audio_path(AUDIO_PATH, year="") + self.audio_rel_filename
    cache_transcription_filename = f"{os.path.splitext(mp3_fullfilename)[0]}.txt"
    if os.path.exists(cache_transcription_filename):
        if verbose:
            print(f"Transcription cachee trouvee: {cache_transcription_filename}")
        with open(cache_transcription_filename, "r") as file:
            self.transcription = file.read()
        self.collection.update_one(
            {"_id": self.get_oid()},
            {"$set": {"transcription": self.transcription}},
        )
        return

    self.transcription = extract_whisper(mp3_fullfilename)
    if keep_cache:
        with open(cache_transcription_filename, "w") as f:
            f.write(self.transcription)
    self.collection.update_one(
        {"_id": self.get_oid()}, {"$set": {"transcription": self.transcription}}
    )

to_dict()

Convertit l'épisode en dictionnaire.

Returns:

Type Description
Dict[str, Union[str, datetime, int, None, bool]]

Dict[str, Union[str, datetime, int, None, bool]]: Dictionnaire contenant les informations de l'épisode. Les clés sont ['date', 'titre', 'description', 'url_telechargement', 'audio_rel_filename', 'transcription', 'type', 'duree', 'masked'].

Source code in nbs/mongo_episode.py
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
def to_dict(self) -> Dict[str, Union[str, datetime, int, None, bool]]:
    """Convertit l'épisode en dictionnaire.

    Returns:
        Dict[str, Union[str, datetime, int, None, bool]]: Dictionnaire contenant les informations de l'épisode.
            Les clés sont ['date', 'titre', 'description', 'url_telechargement', 'audio_rel_filename', 'transcription', 'type', 'duree', 'masked'].
    """
    return {
        "date": self.date,
        "titre": self.titre,
        "description": self.description,
        "url_telechargement": self.url_telechargement,
        "audio_rel_filename": self.audio_rel_filename,
        "transcription": self.transcription,
        "type": self.type,
        "duree": self.duree,
        "masked": self.masked,
    }

update_date(new_date)

Met à jour la date de l'épisode dans la base de données.

Parameters:

Name Type Description Default
new_date datetime

La nouvelle date de l'épisode.

required
Source code in nbs/mongo_episode.py
298
299
300
301
302
303
304
305
306
307
308
309
def update_date(self, new_date: datetime) -> None:
    """Met à jour la date de l'épisode dans la base de données.

    Args:
        new_date (datetime): La nouvelle date de l'épisode.
    """
    self.collection.update_one(
        {"_id": self.get_oid()}, {"$set": {"date": new_date}}
    )
    message_log = f"{Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)} - {self.titre} -> {Episode.get_string_from_date(new_date, format=LOG_DATE_FORMAT)}"
    self.date = new_date
    mongolog("force_update", self.collection.name, message_log)

Episodes

Classe pour rechercher et gérer la qualité des données des épisodes.

Cette classe permet par exemple de récupérer de nouvelles transcriptions en se connectant à la base de données MongoDB.

Source code in nbs/mongo_episode.py
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
class Episodes:
    """Classe pour rechercher et gérer la qualité des données des épisodes.

    Cette classe permet par exemple de récupérer de nouvelles transcriptions
    en se connectant à la base de données MongoDB.
    """

    def __init__(self, collection_name: str = "episodes") -> None:
        """Initialise une instance du gestionnaire d'épisodes.

        Se connecte à la base de données et charge les épisodes.

        Args:
            collection_name (str): Nom de la collection à utiliser. Par défaut "episodes".
        """
        DB_HOST, DB_NAME, _ = get_DB_VARS()
        self.collection = get_collection(
            target_db=DB_HOST, client_name=DB_NAME, collection_name=collection_name
        )
        # je ne charge plus par defaut tous les episodes c'est inefficace
        self.oid_episodes = []

    # def _load_all_episodes(self) -> List["Episode"]:
    #     """Charge tous les épisodes depuis la base de données.

    #     Returns:
    #         List[Episode]: Liste des épisodes chargés.
    #     """
    #     return self.get_entries()

    def get_entries(
        self, request: Any = "", limit: int = -1, include_masked: bool = False
    ):
        """
        Mets dans self.oid_episodes les oids correspondant à une requête spécifique, triés par date décroissante.
        Si limit est spécifié, seuls les limit premiers résultats sont conservés.
        Args:
            request (Any): Requête MongoDB à exécuter. Exemples:
                {"$or": [{"transcription": ""}, {"transcription": None}]}.
                Par défaut, une requête vide qui retourne tous les épisodes.
            include_masked (bool): Si False (par défaut), exclut les épisodes avec masked=True.
                Si True, inclut tous les épisodes y compris les masqués.
        """
        # Construire la requête finale en combinant request et le filtre masked
        if not include_masked:
            # Ajouter le filtre pour exclure les épisodes masqués
            masked_filter = {
                "$or": [{"masked": {"$ne": True}}, {"masked": {"$exists": False}}]
            }

            if request and request != "":
                # Combiner la requête existante avec le filtre masked
                final_request = {"$and": [request, masked_filter]}
            else:
                # Utiliser uniquement le filtre masked
                final_request = masked_filter
        else:
            # Utiliser la requête telle quelle sans filtrer masked
            final_request = request if request != "" else {}

        if limit == -1:
            results = self.collection.find(final_request, {"_id": 1}).sort({"date": -1})
        else:
            results = (
                self.collection.find(final_request, {"_id": 1})
                .sort({"date": -1})
                .limit(limit)
            )
        self.oid_episodes = [document["_id"] for document in results]

    def len_total_entries(self, include_masked: bool = False) -> int:
        """
        Retourne le nombre total d'épisodes dans la collection.

        Args:
            include_masked (bool): Si False (par défaut), exclut les épisodes masqués du comptage.
                Si True, compte tous les épisodes y compris les masqués.
        """
        if not include_masked:
            # Compter uniquement les épisodes non masqués
            masked_filter = {
                "$or": [{"masked": {"$ne": True}}, {"masked": {"$exists": False}}]
            }
            return self.collection.count_documents(masked_filter)
        else:
            # Compter tous les épisodes
            return self.collection.estimated_document_count()

    def get_missing_transcriptions(self):
        """
        Mets dans self.oid_episodes les oids correspondant aux épisodes sans transcription.
        """
        self.get_entries({"$or": [{"transcription": ""}, {"transcription": None}]})

    def get_transcriptions(self):
        """
        Mets dans self.oid_episodes les oids correspondant aux épisodes qui possèdent une transcription.
        """
        self.get_entries(
            {"$and": [{"transcription": {"$ne": None}}, {"transcription": {"$ne": ""}}]}
        )

    def __getitem__(self, index: int) -> "Episode":
        """Permet l'accès aux épisodes par indexation.

        Args:
            index (int): Position de l'épisode dans la liste.

        Returns:
            Episode: L'épisode à la position donnée.
        """
        return Episode.from_oid(self.oid_episodes[index])

    def __len__(self) -> int:
        """Retourne le nombre total d'épisodes dans oid_episodes.

        Returns:
            int: Nombre d'épisodes.
        """
        return len(self.oid_episodes)

    def __iter__(self) -> Iterator["Episode"]:
        """Permet d'itérer sur les épisodes.

        Returns:
            Iterator[Episode]: Itérateur sur la liste des épisodes.
        """
        return iter(self.oid_episodes)

__getitem__(index)

Permet l'accès aux épisodes par indexation.

Parameters:

Name Type Description Default
index int

Position de l'épisode dans la liste.

required

Returns:

Name Type Description
Episode Episode

L'épisode à la position donnée.

Source code in nbs/mongo_episode.py
912
913
914
915
916
917
918
919
920
921
def __getitem__(self, index: int) -> "Episode":
    """Permet l'accès aux épisodes par indexation.

    Args:
        index (int): Position de l'épisode dans la liste.

    Returns:
        Episode: L'épisode à la position donnée.
    """
    return Episode.from_oid(self.oid_episodes[index])

__init__(collection_name='episodes')

Initialise une instance du gestionnaire d'épisodes.

Se connecte à la base de données et charge les épisodes.

Parameters:

Name Type Description Default
collection_name str

Nom de la collection à utiliser. Par défaut "episodes".

'episodes'
Source code in nbs/mongo_episode.py
817
818
819
820
821
822
823
824
825
826
827
828
829
830
def __init__(self, collection_name: str = "episodes") -> None:
    """Initialise une instance du gestionnaire d'épisodes.

    Se connecte à la base de données et charge les épisodes.

    Args:
        collection_name (str): Nom de la collection à utiliser. Par défaut "episodes".
    """
    DB_HOST, DB_NAME, _ = get_DB_VARS()
    self.collection = get_collection(
        target_db=DB_HOST, client_name=DB_NAME, collection_name=collection_name
    )
    # je ne charge plus par defaut tous les episodes c'est inefficace
    self.oid_episodes = []

__iter__()

Permet d'itérer sur les épisodes.

Returns:

Type Description
Iterator[Episode]

Iterator[Episode]: Itérateur sur la liste des épisodes.

Source code in nbs/mongo_episode.py
931
932
933
934
935
936
937
def __iter__(self) -> Iterator["Episode"]:
    """Permet d'itérer sur les épisodes.

    Returns:
        Iterator[Episode]: Itérateur sur la liste des épisodes.
    """
    return iter(self.oid_episodes)

__len__()

Retourne le nombre total d'épisodes dans oid_episodes.

Returns:

Name Type Description
int int

Nombre d'épisodes.

Source code in nbs/mongo_episode.py
923
924
925
926
927
928
929
def __len__(self) -> int:
    """Retourne le nombre total d'épisodes dans oid_episodes.

    Returns:
        int: Nombre d'épisodes.
    """
    return len(self.oid_episodes)

get_entries(request='', limit=-1, include_masked=False)

Mets dans self.oid_episodes les oids correspondant à une requête spécifique, triés par date décroissante. Si limit est spécifié, seuls les limit premiers résultats sont conservés. Args: request (Any): Requête MongoDB à exécuter. Exemples: {"$or": [{"transcription": ""}, {"transcription": None}]}. Par défaut, une requête vide qui retourne tous les épisodes. include_masked (bool): Si False (par défaut), exclut les épisodes avec masked=True. Si True, inclut tous les épisodes y compris les masqués.

Source code in nbs/mongo_episode.py
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
def get_entries(
    self, request: Any = "", limit: int = -1, include_masked: bool = False
):
    """
    Mets dans self.oid_episodes les oids correspondant à une requête spécifique, triés par date décroissante.
    Si limit est spécifié, seuls les limit premiers résultats sont conservés.
    Args:
        request (Any): Requête MongoDB à exécuter. Exemples:
            {"$or": [{"transcription": ""}, {"transcription": None}]}.
            Par défaut, une requête vide qui retourne tous les épisodes.
        include_masked (bool): Si False (par défaut), exclut les épisodes avec masked=True.
            Si True, inclut tous les épisodes y compris les masqués.
    """
    # Construire la requête finale en combinant request et le filtre masked
    if not include_masked:
        # Ajouter le filtre pour exclure les épisodes masqués
        masked_filter = {
            "$or": [{"masked": {"$ne": True}}, {"masked": {"$exists": False}}]
        }

        if request and request != "":
            # Combiner la requête existante avec le filtre masked
            final_request = {"$and": [request, masked_filter]}
        else:
            # Utiliser uniquement le filtre masked
            final_request = masked_filter
    else:
        # Utiliser la requête telle quelle sans filtrer masked
        final_request = request if request != "" else {}

    if limit == -1:
        results = self.collection.find(final_request, {"_id": 1}).sort({"date": -1})
    else:
        results = (
            self.collection.find(final_request, {"_id": 1})
            .sort({"date": -1})
            .limit(limit)
        )
    self.oid_episodes = [document["_id"] for document in results]

get_missing_transcriptions()

Mets dans self.oid_episodes les oids correspondant aux épisodes sans transcription.

Source code in nbs/mongo_episode.py
898
899
900
901
902
def get_missing_transcriptions(self):
    """
    Mets dans self.oid_episodes les oids correspondant aux épisodes sans transcription.
    """
    self.get_entries({"$or": [{"transcription": ""}, {"transcription": None}]})

get_transcriptions()

Mets dans self.oid_episodes les oids correspondant aux épisodes qui possèdent une transcription.

Source code in nbs/mongo_episode.py
904
905
906
907
908
909
910
def get_transcriptions(self):
    """
    Mets dans self.oid_episodes les oids correspondant aux épisodes qui possèdent une transcription.
    """
    self.get_entries(
        {"$and": [{"transcription": {"$ne": None}}, {"transcription": {"$ne": ""}}]}
    )

len_total_entries(include_masked=False)

Retourne le nombre total d'épisodes dans la collection.

Parameters:

Name Type Description Default
include_masked bool

Si False (par défaut), exclut les épisodes masqués du comptage. Si True, compte tous les épisodes y compris les masqués.

False
Source code in nbs/mongo_episode.py
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
def len_total_entries(self, include_masked: bool = False) -> int:
    """
    Retourne le nombre total d'épisodes dans la collection.

    Args:
        include_masked (bool): Si False (par défaut), exclut les épisodes masqués du comptage.
            Si True, compte tous les épisodes y compris les masqués.
    """
    if not include_masked:
        # Compter uniquement les épisodes non masqués
        masked_filter = {
            "$or": [{"masked": {"$ne": True}}, {"masked": {"$exists": False}}]
        }
        return self.collection.count_documents(masked_filter)
    else:
        # Compter tous les épisodes
        return self.collection.estimated_document_count()

RSS_episode

Bases: Episode

Source code in nbs/mongo_episode.py
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
class RSS_episode(Episode):
    def __init__(self, date: str, titre: str) -> None:
        """
        Initialize an RSS_episode instance.

        Args:
            date (str): The episode date in the format "2024-12-22T09:59:39".
            titre (str): The title of the episode.
        """
        super().__init__(date, titre)

    @classmethod
    def from_feed_entry(cls, feed_entry: FeedParserDict) -> "RSS_episode":
        """
        Create an RSS_episode instance from an RSS feed entry.

        Args:
            feed_entry (FeedParserDict): The entry from the RSS feed.

        Returns:
            RSS_episode: The created RSS_episode instance.
        """
        locale.setlocale(locale.LC_TIME, "en_US.UTF-8")
        date_rss: datetime = datetime.strptime(feed_entry.published, RSS_DATE_FORMAT)
        date_rss_str: str = cls.get_string_from_date(date_rss, DATE_FORMAT)
        inst = cls(
            date=date_rss_str,
            titre=feed_entry.title,
        )
        inst.description = feed_entry.summary

        for link in feed_entry.links:
            if link.type == "audio/mpeg":
                inst.url_telechargement = link.href
                break

        inst.type = cls.set_titre(inst.titre + " " + inst.description)
        inst.duree = cls.get_duree_in_seconds(feed_entry.itunes_duration)  # in seconds

        return inst

    @staticmethod
    def get_duree_in_seconds(duree: str) -> int:
        """
        Convert a duration string into total seconds.

        The duration can be in formats "HH:MM:SS", "HH:MM", or simply seconds.

        Args:
            duree (str): The duration as a string.

        Returns:
            int: The duration expressed in total seconds.
        """
        duree_parts = duree.split(":")
        if len(duree_parts) == 3:
            return (
                int(duree_parts[0]) * 3600
                + int(duree_parts[1]) * 60
                + int(duree_parts[2])
            )
        elif len(duree_parts) == 2:
            return int(duree_parts[0]) * 60 + int(duree_parts[1])
        else:
            return int(duree_parts[0])

    def keep(self) -> int:
        """
        Save the episode to the database if conditions are met.

        The episode is saved if:
            - The duration is greater than RSS_DUREE_MINI_MINUTES * 60 seconds.
            - The type is equal to "livres".

        Returns:
            int: 1 if an entry is created in the database, 0 otherwise.
        """
        if (self.duree > RSS_DUREE_MINI_MINUTES * 60) and (self.type == "livres"):
            return super().keep()
        else:
            print(
                f"Episode du {Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)} ignored: Duree: {self.duree}, Type: {self.type}"
            )
            return 0

    @staticmethod
    def set_titre(description: str) -> str:
        """
        Classify the episode by using a zero-shot classification model from HuggingFace based on the provided description.

        Args:
            description (str): The description combining the title and summary.

        Returns:
            str: The label with the highest score among ["livres", "films", "pièces de théâtre"].
        """
        classifier = pipeline(
            "zero-shot-classification", model="facebook/bart-large-mnli"
        )
        labels = ["livres", "films", "pièces de théâtre"]

        result = classifier(description, labels)
        return result["labels"][0]

__init__(date, titre)

Initialize an RSS_episode instance.

Parameters:

Name Type Description Default
date str

The episode date in the format "2024-12-22T09:59:39".

required
titre str

The title of the episode.

required
Source code in nbs/mongo_episode.py
553
554
555
556
557
558
559
560
561
def __init__(self, date: str, titre: str) -> None:
    """
    Initialize an RSS_episode instance.

    Args:
        date (str): The episode date in the format "2024-12-22T09:59:39".
        titre (str): The title of the episode.
    """
    super().__init__(date, titre)

from_feed_entry(feed_entry) classmethod

Create an RSS_episode instance from an RSS feed entry.

Parameters:

Name Type Description Default
feed_entry FeedParserDict

The entry from the RSS feed.

required

Returns:

Name Type Description
RSS_episode RSS_episode

The created RSS_episode instance.

Source code in nbs/mongo_episode.py
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
@classmethod
def from_feed_entry(cls, feed_entry: FeedParserDict) -> "RSS_episode":
    """
    Create an RSS_episode instance from an RSS feed entry.

    Args:
        feed_entry (FeedParserDict): The entry from the RSS feed.

    Returns:
        RSS_episode: The created RSS_episode instance.
    """
    locale.setlocale(locale.LC_TIME, "en_US.UTF-8")
    date_rss: datetime = datetime.strptime(feed_entry.published, RSS_DATE_FORMAT)
    date_rss_str: str = cls.get_string_from_date(date_rss, DATE_FORMAT)
    inst = cls(
        date=date_rss_str,
        titre=feed_entry.title,
    )
    inst.description = feed_entry.summary

    for link in feed_entry.links:
        if link.type == "audio/mpeg":
            inst.url_telechargement = link.href
            break

    inst.type = cls.set_titre(inst.titre + " " + inst.description)
    inst.duree = cls.get_duree_in_seconds(feed_entry.itunes_duration)  # in seconds

    return inst

get_duree_in_seconds(duree) staticmethod

Convert a duration string into total seconds.

The duration can be in formats "HH:MM:SS", "HH:MM", or simply seconds.

Parameters:

Name Type Description Default
duree str

The duration as a string.

required

Returns:

Name Type Description
int int

The duration expressed in total seconds.

Source code in nbs/mongo_episode.py
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
@staticmethod
def get_duree_in_seconds(duree: str) -> int:
    """
    Convert a duration string into total seconds.

    The duration can be in formats "HH:MM:SS", "HH:MM", or simply seconds.

    Args:
        duree (str): The duration as a string.

    Returns:
        int: The duration expressed in total seconds.
    """
    duree_parts = duree.split(":")
    if len(duree_parts) == 3:
        return (
            int(duree_parts[0]) * 3600
            + int(duree_parts[1]) * 60
            + int(duree_parts[2])
        )
    elif len(duree_parts) == 2:
        return int(duree_parts[0]) * 60 + int(duree_parts[1])
    else:
        return int(duree_parts[0])

keep()

Save the episode to the database if conditions are met.

The episode is saved if
  • The duration is greater than RSS_DUREE_MINI_MINUTES * 60 seconds.
  • The type is equal to "livres".

Returns:

Name Type Description
int int

1 if an entry is created in the database, 0 otherwise.

Source code in nbs/mongo_episode.py
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
def keep(self) -> int:
    """
    Save the episode to the database if conditions are met.

    The episode is saved if:
        - The duration is greater than RSS_DUREE_MINI_MINUTES * 60 seconds.
        - The type is equal to "livres".

    Returns:
        int: 1 if an entry is created in the database, 0 otherwise.
    """
    if (self.duree > RSS_DUREE_MINI_MINUTES * 60) and (self.type == "livres"):
        return super().keep()
    else:
        print(
            f"Episode du {Episode.get_string_from_date(self.date, format=LOG_DATE_FORMAT)} ignored: Duree: {self.duree}, Type: {self.type}"
        )
        return 0

set_titre(description) staticmethod

Classify the episode by using a zero-shot classification model from HuggingFace based on the provided description.

Parameters:

Name Type Description Default
description str

The description combining the title and summary.

required

Returns:

Name Type Description
str str

The label with the highest score among ["livres", "films", "pièces de théâtre"].

Source code in nbs/mongo_episode.py
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
@staticmethod
def set_titre(description: str) -> str:
    """
    Classify the episode by using a zero-shot classification model from HuggingFace based on the provided description.

    Args:
        description (str): The description combining the title and summary.

    Returns:
        str: The label with the highest score among ["livres", "films", "pièces de théâtre"].
    """
    classifier = pipeline(
        "zero-shot-classification", model="facebook/bart-large-mnli"
    )
    labels = ["livres", "films", "pièces de théâtre"]

    result = classifier(description, labels)
    return result["labels"][0]

WEB_episode

Bases: Episode

Représente un épisode web avec ses attributs et méthodes de conversion et récupération des données.

Source code in nbs/mongo_episode.py
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
class WEB_episode(Episode):
    """Représente un épisode web avec ses attributs et méthodes de conversion et récupération des données."""

    def __init__(self, date: str, titre: str) -> None:
        """Initialise une instance de WEB_episode.

        Args:
            date (str): La date de l'épisode au format "2024-12-22T09:59:39".
            titre (str): Le titre de l'épisode.
        """
        super().__init__(date, titre)

    @staticmethod
    def parse_web_date(
        web_date: str, web_date_format: str = WEB_DATE_FORMAT
    ) -> Optional[datetime]:
        """Convertit une date en français extraite d'une page web en un objet datetime.

        Corrige les abréviations non standard pour certains mois (exemple : "fév." devient "févr.", "juill." devient "juil.").

        Args:
            web_date (str): La chaîne représentant la date en français.
            web_date_format (str, optional): Le format de la date utilisé par la page web. Defaults to WEB_DATE_FORMAT.

        Returns:
            Optional[datetime]: L'objet datetime si la conversion réussit, sinon None.
        """
        locale.setlocale(locale.LC_TIME, "fr_FR.UTF-8")

        def corrige_date(date_str: str) -> str:
            """Corrige les abréviations non standard dans la chaîne de date.

            Args:
                date_str (str): La chaîne de date originale.

            Returns:
                str: La chaîne de date corrigée.
            """
            month_replacements = {
                "fév.": "févr.",
                "juill.": "juil.",
            }
            for fr_month, fr_month_norm in month_replacements.items():
                date_str = date_str.replace(fr_month, fr_month_norm)
            return date_str

        try:
            dt: datetime = datetime.strptime(corrige_date(web_date), web_date_format)
            return dt
        except ValueError as e:
            print(f"Erreur de conversion pour la date '{web_date}': {e}")
            return None

    @staticmethod
    def get_audio_url(url: str) -> Optional[str]:
        """Récupère l'URL du fichier audio (.m4a ou .mp3) à partir de la page d'un épisode.

        Recherche dans une balise <script> contenant la clé "contentUrl".

        Args:
            url (str): L'URL de la page de l'épisode.

        Returns:
            Optional[str]: L'URL du fichier audio si trouvée, sinon None.
        """
        try:
            response: requests.Response = requests.get(url)
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"Erreur lors de la requête HTTP: {e}")
            return None

        soup: BeautifulSoup = BeautifulSoup(response.content, "html.parser")
        script_tag = soup.find("script", string=lambda t: t and "contentUrl" in t)

        if script_tag:
            try:
                json_text: str = script_tag.string  # type: ignore
                json_data: Dict[str, Any] = json.loads(json_text)
                audio_url: Optional[str] = None
                for item in json_data.get("@graph", []):
                    if item.get("@type") == "RadioEpisode":
                        main_entity: Dict[str, Any] = item.get("mainEntity", {})
                        audio_url = main_entity.get("contentUrl")
                        break
                return audio_url
            except (json.JSONDecodeError, KeyError, TypeError) as e:
                print(f"Erreur lors de l'analyse du JSON: {e}")
                return None

        print("Balise <script> contenant 'contentUrl' non trouvée")
        return None

    @classmethod
    def from_webpage_entry(cls, dict_web_episode: Dict[str, Any]) -> "WEB_episode":
        """Crée une instance de WEB_episode à partir d'un dictionnaire représentant une entrée de page web.

        Le dictionnaire doit contenir les clés : 'title', 'url', 'description', 'date', 'duration'.
        La variable DATE_FORMAT et la méthode get_string_from_date doivent être définies ailleurs dans le code.

        Args:
            dict_web_episode (Dict[str, Any]): Dictionnaire contenant les informations de l'épisode.

        Returns:
            WEB_episode: Une instance de WEB_episode initialisée avec les données fournies.
        """
        date_web: Optional[datetime] = cls.parse_web_date(dict_web_episode["date"])
        date_web_str: str = cls.get_string_from_date(
            date_web, DATE_FORMAT
        )  # DATE_FORMAT doit être défini en amont
        inst: WEB_episode = cls(
            date=date_web_str,
            titre=dict_web_episode["title"],
        )
        inst.description = dict_web_episode["description"]
        inst.type = "livres"
        inst.url_telechargement = cls.get_audio_url(dict_web_episode["url"])
        inst.duree = cls.get_duree_in_seconds(dict_web_episode["duration"])
        return inst

    @staticmethod
    def get_duree_in_seconds(duree: str) -> int:
        """Convertit une durée exprimée en minutes ("MM min") en secondes.

        Args:
            duree (str): La durée sous forme de chaîne.

        Returns:
            int: La durée convertie en secondes. Retourne 0 si le format n'est pas correct.
        """
        parts = duree.split(" ")
        if len(parts) == 2:
            return int(parts[0]) * 60
        return 0

__init__(date, titre)

Initialise une instance de WEB_episode.

Parameters:

Name Type Description Default
date str

La date de l'épisode au format "2024-12-22T09:59:39".

required
titre str

Le titre de l'épisode.

required
Source code in nbs/mongo_episode.py
673
674
675
676
677
678
679
680
def __init__(self, date: str, titre: str) -> None:
    """Initialise une instance de WEB_episode.

    Args:
        date (str): La date de l'épisode au format "2024-12-22T09:59:39".
        titre (str): Le titre de l'épisode.
    """
    super().__init__(date, titre)

from_webpage_entry(dict_web_episode) classmethod

Crée une instance de WEB_episode à partir d'un dictionnaire représentant une entrée de page web.

Le dictionnaire doit contenir les clés : 'title', 'url', 'description', 'date', 'duration'. La variable DATE_FORMAT et la méthode get_string_from_date doivent être définies ailleurs dans le code.

Parameters:

Name Type Description Default
dict_web_episode Dict[str, Any]

Dictionnaire contenant les informations de l'épisode.

required

Returns:

Name Type Description
WEB_episode WEB_episode

Une instance de WEB_episode initialisée avec les données fournies.

Source code in nbs/mongo_episode.py
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
@classmethod
def from_webpage_entry(cls, dict_web_episode: Dict[str, Any]) -> "WEB_episode":
    """Crée une instance de WEB_episode à partir d'un dictionnaire représentant une entrée de page web.

    Le dictionnaire doit contenir les clés : 'title', 'url', 'description', 'date', 'duration'.
    La variable DATE_FORMAT et la méthode get_string_from_date doivent être définies ailleurs dans le code.

    Args:
        dict_web_episode (Dict[str, Any]): Dictionnaire contenant les informations de l'épisode.

    Returns:
        WEB_episode: Une instance de WEB_episode initialisée avec les données fournies.
    """
    date_web: Optional[datetime] = cls.parse_web_date(dict_web_episode["date"])
    date_web_str: str = cls.get_string_from_date(
        date_web, DATE_FORMAT
    )  # DATE_FORMAT doit être défini en amont
    inst: WEB_episode = cls(
        date=date_web_str,
        titre=dict_web_episode["title"],
    )
    inst.description = dict_web_episode["description"]
    inst.type = "livres"
    inst.url_telechargement = cls.get_audio_url(dict_web_episode["url"])
    inst.duree = cls.get_duree_in_seconds(dict_web_episode["duration"])
    return inst

get_audio_url(url) staticmethod

Récupère l'URL du fichier audio (.m4a ou .mp3) à partir de la page d'un épisode.

Recherche dans une balise