Proyectos de Subversion Moodle

Rev

| Ultima modificación | Ver Log |

Rev Autor Línea Nro. Línea
1 efrain 1
<?php
2
// This file is part of Moodle - http://moodle.org/
3
//
4
// Moodle is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8
//
9
// Moodle is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13
//
14
// You should have received a copy of the GNU General Public License
15
// along with Moodle.  If not, see <http://www.gnu.org/licenses/>.
16
 
17
/**
18
 * Native pgsql class representing moodle database interface.
19
 *
20
 * @package    core_dml
21
 * @copyright  2008 Petr Skoda (http://skodak.org)
22
 * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
23
 */
24
 
25
defined('MOODLE_INTERNAL') || die();
26
 
27
require_once(__DIR__.'/moodle_database.php');
28
require_once(__DIR__.'/moodle_read_slave_trait.php');
29
require_once(__DIR__.'/pgsql_native_moodle_recordset.php');
30
require_once(__DIR__.'/pgsql_native_moodle_temptables.php');
31
 
32
/**
33
 * Native pgsql class representing moodle database interface.
34
 *
35
 * @package    core_dml
36
 * @copyright  2008 Petr Skoda (http://skodak.org)
37
 * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
38
 */
39
class pgsql_native_moodle_database extends moodle_database {
40
    use moodle_read_slave_trait {
41
        select_db_handle as read_slave_select_db_handle;
42
        can_use_readonly as read_slave_can_use_readonly;
43
        query_start as read_slave_query_start;
44
        query_end as read_slave_query_end;
45
    }
46
 
47
    /** @var array $sslmodes */
48
    private static $sslmodes = [
49
        'disable',
50
        'prefer',
51
        'require',
52
        'verify-full'
53
    ];
54
 
55
    /** @var array $serverinfo cache */
56
    private $serverinfo = [];
57
 
58
    /** @var array $dbhcursor keep track of open cursors */
59
    private $dbhcursor = [];
60
 
61
    /** @var resource|PgSql\Connection|null $pgsql database resource */
62
    protected $pgsql     = null;
63
 
64
    protected $last_error_reporting; // To handle pgsql driver default verbosity
65
 
66
    /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
67
    protected $savepointpresent = false;
68
 
69
    /** @var int Number of cursors used (for constructing a unique ID) */
70
    protected $cursorcount = 0;
71
 
72
    /** @var int Default number of rows to fetch at a time when using recordsets with cursors */
73
    const DEFAULT_FETCH_BUFFER_SIZE = 100000;
74
 
75
    /**
76
     * Detects if all needed PHP stuff installed.
77
     * Note: can be used before connect()
78
     * @return mixed true if ok, string if something
79
     */
80
    public function driver_installed() {
81
        if (!extension_loaded('pgsql')) {
82
            return get_string('pgsqlextensionisnotpresentinphp', 'install');
83
        }
84
        return true;
85
    }
86
 
87
    /**
88
     * Returns database family type - describes SQL dialect
89
     * Note: can be used before connect()
90
     * @return string db family name (mysql, postgres, mssql, oracle, etc.)
91
     */
92
    public function get_dbfamily() {
93
        return 'postgres';
94
    }
95
 
96
    /**
97
     * Returns more specific database driver type
98
     * Note: can be used before connect()
99
     * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
100
     */
101
    protected function get_dbtype() {
102
        return 'pgsql';
103
    }
104
 
105
    /**
106
     * Returns general database library name
107
     * Note: can be used before connect()
108
     * @return string db type pdo, native
109
     */
110
    protected function get_dblibrary() {
111
        return 'native';
112
    }
113
 
114
    /**
115
     * Returns localised database type name
116
     * Note: can be used before connect()
117
     * @return string
118
     */
119
    public function get_name() {
120
        return get_string('nativepgsql', 'install');
121
    }
122
 
123
    /**
124
     * Returns localised database configuration help.
125
     * Note: can be used before connect()
126
     * @return string
127
     */
128
    public function get_configuration_help() {
129
        return get_string('nativepgsqlhelp', 'install');
130
    }
131
 
132
    /**
133
     * Connect to db
134
     * @param string $dbhost The database host.
135
     * @param string $dbuser The database username.
136
     * @param string $dbpass The database username's password.
137
     * @param string $dbname The name of the database being connected to.
138
     * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
139
     * @param array $dboptions driver specific options
140
     * @return bool true
141
     * @throws moodle_exception
142
     * @throws dml_connection_exception if error
143
     */
144
    public function raw_connect(string $dbhost, string $dbuser, string $dbpass, string $dbname, $prefix, array $dboptions=null): bool {
145
        if ($prefix == '' and !$this->external) {
146
            //Enforce prefixes for everybody but mysql
147
            throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
148
        }
149
 
150
        $driverstatus = $this->driver_installed();
151
 
152
        if ($driverstatus !== true) {
153
            throw new dml_exception('dbdriverproblem', $driverstatus);
154
        }
155
 
156
        $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
157
 
158
        $pass = addcslashes($this->dbpass, "'\\");
159
 
160
        // Unix socket connections should have lower overhead
161
        if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
162
            $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
163
            if (strpos($this->dboptions['dbsocket'], '/') !== false) {
164
                // A directory was specified as the socket location.
165
                $connection .= " host='".$this->dboptions['dbsocket']."'";
166
            }
167
            if (!empty($this->dboptions['dbport'])) {
168
                // A port as specified, add it to the connection as it's used as part of the socket path.
169
                $connection .= " port ='".$this->dboptions['dbport']."'";
170
            }
171
        } else {
172
            $this->dboptions['dbsocket'] = '';
173
            if (empty($this->dbname)) {
174
                // probably old style socket connection - do not add port
175
                $port = "";
176
            } else if (empty($this->dboptions['dbport'])) {
177
                $port = "port ='5432'";
178
            } else {
179
                $port = "port ='".$this->dboptions['dbport']."'";
180
            }
181
            $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
182
        }
183
 
184
        if (!empty($this->dboptions['connecttimeout'])) {
185
            $connection .= " connect_timeout=".$this->dboptions['connecttimeout'];
186
        }
187
 
188
        if (empty($this->dboptions['dbhandlesoptions'])) {
189
            // ALTER USER and ALTER DATABASE are overridden by these settings.
190
            $options = array('--client_encoding=utf8', '--standard_conforming_strings=on');
191
            // Select schema if specified, otherwise the first one wins.
192
            if (!empty($this->dboptions['dbschema'])) {
193
                $options[] = "-c search_path=" . addcslashes($this->dboptions['dbschema'], "'\\");
194
            }
195
 
196
            $connection .= " options='" . implode(' ', $options) . "'";
197
        }
198
 
199
        if (isset($this->dboptions['ssl'])) {
200
            $sslmode = $this->dboptions['ssl'];
201
            if (!in_array($sslmode, self::$sslmodes, true)) {
202
                throw new moodle_exception('validateerrorlist', 'admin', '', "'dboptions''ssl': $sslmode");
203
            }
204
            $connection .= " sslmode=$sslmode";
205
        }
206
 
207
        ob_start();
208
        // It seems that pg_connect() handles some errors differently.
209
        // For example, name resolution error will raise an exception, and non-existing
210
        // database or wrong credentials will just return false.
211
        // We need to cater for both.
212
        try {
213
            if (empty($this->dboptions['dbpersist'])) {
214
                $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
215
            } else {
216
                $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
217
            }
218
            $dberr = ob_get_contents();
219
        } catch (\Exception $e) {
220
            $dberr = $e->getMessage();
221
        }
222
        ob_end_clean();
223
 
224
        $status = $this->pgsql ? pg_connection_status($this->pgsql) : false;
225
 
226
        if ($status === false or $status === PGSQL_CONNECTION_BAD) {
227
            $this->pgsql = null;
228
            throw new dml_connection_exception($dberr);
229
        }
230
 
231
        if (!empty($this->dboptions['dbpersist'])) {
232
            // There are rare situations (such as PHP out of memory errors) when open cursors may
233
            // not be closed at the end of a connection. When using persistent connections, the
234
            // cursors remain open and 'get in the way' of future connections. To avoid this
235
            // problem, close all cursors here.
236
            $result = pg_query($this->pgsql, 'CLOSE ALL');
237
            if ($result) {
238
                pg_free_result($result);
239
            }
240
        }
241
 
242
        if (!empty($this->dboptions['dbhandlesoptions'])) {
243
            /* We don't trust people who just set the dbhandlesoptions, this code checks up on them.
244
             * These functions do not talk to the server, they use the client library knowledge to determine state.
245
             */
246
            if (!empty($this->dboptions['dbschema'])) {
247
                throw new dml_connection_exception('You cannot specify a schema with dbhandlesoptions, use the database to set it.');
248
            }
249
            if (pg_client_encoding($this->pgsql) != 'UTF8') {
250
                throw new dml_connection_exception('client_encoding = UTF8 not set, it is: ' . pg_client_encoding($this->pgsql));
251
            }
252
            if (pg_escape_string($this->pgsql, '\\') != '\\') {
253
                throw new dml_connection_exception('standard_conforming_strings = on, must be set at the database.');
254
            }
255
        }
256
 
257
        // Connection stabilised and configured, going to instantiate the temptables controller
258
        $this->temptables = new pgsql_native_moodle_temptables($this);
259
 
260
        return true;
261
    }
262
 
263
    /**
264
     * Close database connection and release all resources
265
     * and memory (especially circular memory references).
266
     * Do NOT use connect() again, create a new instance if needed.
267
     */
268
    public function dispose() {
269
        parent::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection
270
        if ($this->pgsql) {
271
            pg_close($this->pgsql);
272
            $this->pgsql = null;
273
        }
274
    }
275
 
276
    /**
277
     * Gets db handle currently used with queries
278
     * @return resource
279
     */
280
    protected function get_db_handle() {
281
        return $this->pgsql;
282
    }
283
 
284
    /**
285
     * Sets db handle to be used with subsequent queries
286
     * @param resource $dbh
287
     * @return void
288
     */
289
    protected function set_db_handle($dbh): void {
290
        $this->pgsql = $dbh;
291
    }
292
 
293
    /**
294
     * Select appropriate db handle - readwrite or readonly
295
     * @param int $type type of query
296
     * @param string $sql
297
     * @return void
298
     */
299
    protected function select_db_handle(int $type, string $sql): void {
300
        $this->read_slave_select_db_handle($type, $sql);
301
 
302
        if (preg_match('/^DECLARE (crs\w*) NO SCROLL CURSOR/', $sql, $match)) {
303
            $cursor = $match[1];
304
            $this->dbhcursor[$cursor] = $this->pgsql;
305
        }
306
        if (preg_match('/^(?:FETCH \d+ FROM|CLOSE) (crs\w*)\b/', $sql, $match)) {
307
            $cursor = $match[1];
308
            $this->pgsql = $this->dbhcursor[$cursor];
309
        }
310
    }
311
 
312
    /**
313
     * Check if The query qualifies for readonly connection execution
314
     * Logging queries are exempt, those are write operations that circumvent
315
     * standard query_start/query_end paths.
316
     * @param int $type type of query
317
     * @param string $sql
318
     * @return bool
319
     */
320
    protected function can_use_readonly(int $type, string $sql): bool {
321
        // ... pg_*lock queries always go to master.
322
        if (preg_match('/\bpg_\w*lock/', $sql)) {
323
            return false;
324
        }
325
 
326
        // ... a nuisance - temptables use this.
327
        if (preg_match('/\bpg_catalog/', $sql) && $this->temptables->get_temptables()) {
328
            return false;
329
        }
330
 
331
        return $this->read_slave_can_use_readonly($type, $sql);
332
 
333
    }
334
 
335
    /**
336
     * Called before each db query.
337
     * @param string $sql
338
     * @param array|null $params An array of parameters.
339
     * @param int $type type of query
340
     * @param mixed $extrainfo driver specific extra information
341
     * @return void
342
     */
343
    protected function query_start($sql, ?array $params, $type, $extrainfo=null) {
344
        $this->read_slave_query_start($sql, $params, $type, $extrainfo);
345
        // pgsql driver tends to send debug to output, we do not need that.
346
        $this->last_error_reporting = error_reporting(0);
347
    }
348
 
349
    /**
350
     * Called immediately after each db query.
351
     * @param mixed db specific result
352
     * @return void
353
     */
354
    protected function query_end($result) {
355
        // reset original debug level
356
        error_reporting($this->last_error_reporting);
357
        try {
358
            $this->read_slave_query_end($result);
359
            if ($this->savepointpresent &&
360
                    !in_array(
361
                        $this->last_type,
362
                        [SQL_QUERY_AUX, SQL_QUERY_AUX_READONLY, SQL_QUERY_SELECT],
363
                        true
364
                    )) {
365
                $res = @pg_query($this->pgsql, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
366
                if ($res) {
367
                    pg_free_result($res);
368
                }
369
            }
370
        } catch (Exception $e) {
371
            if ($this->savepointpresent) {
372
                $res = @pg_query($this->pgsql, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
373
                if ($res) {
374
                    pg_free_result($res);
375
                }
376
            }
377
            throw $e;
378
        }
379
    }
380
 
381
    /**
382
     * Returns database server info array
383
     * @return array Array containing 'description' and 'version' info
384
     */
385
    public function get_server_info(): array {
386
        if (empty($this->serverinfo)) {
387
            $this->query_start('--pg_version()', null, SQL_QUERY_AUX);
388
            $this->serverinfo = pg_version($this->pgsql);
389
            $this->query_end(true);
390
        }
391
        return [
392
            'description' => $this->serverinfo['server'],
393
            'version' => $this->serverinfo['server'],
394
        ];
395
    }
396
 
397
    /**
398
     * Returns supported query parameter types
399
     * @return int bitmask of accepted SQL_PARAMS_*
400
     */
401
    protected function allowed_param_types() {
402
        return SQL_PARAMS_DOLLAR;
403
    }
404
 
405
    /**
406
     * Returns last error reported by database engine.
407
     * @return string error message
408
     */
409
    public function get_last_error() {
410
        return pg_last_error($this->pgsql);
411
    }
412
 
413
    /**
414
     * Return tables in database WITHOUT current prefix.
415
     * @param bool $usecache if true, returns list of cached tables.
416
     * @return array of table names in lowercase and without prefix
417
     */
418
    public function get_tables($usecache=true) {
419
        if ($usecache and $this->tables !== null) {
420
            return $this->tables;
421
        }
422
        $this->tables = array();
423
        $prefix = str_replace('_', '|_', $this->prefix);
424
        $sql = "SELECT c.relname
425
                  FROM pg_catalog.pg_class c
426
                  JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
427
                 WHERE c.relname LIKE '$prefix%' ESCAPE '|'
428
                       AND c.relkind = 'r'
429
                       AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())";
430
        $this->query_start($sql, null, SQL_QUERY_AUX_READONLY);
431
        $result = pg_query($this->pgsql, $sql);
432
        $this->query_end($result);
433
 
434
        if ($result) {
435
            while ($row = pg_fetch_row($result)) {
436
                $tablename = reset($row);
437
                if ($this->prefix !== false && $this->prefix !== '') {
438
                    if (strpos($tablename, $this->prefix) !== 0) {
439
                        continue;
440
                    }
441
                    $tablename = substr($tablename, strlen($this->prefix));
442
                }
443
                $this->tables[$tablename] = $tablename;
444
            }
445
            pg_free_result($result);
446
        }
447
        return $this->tables;
448
    }
449
 
450
    /**
451
     * Constructs 'IN()' or '=' sql fragment
452
     *
453
     * Method overriding {@see moodle_database::get_in_or_equal} to be able to use
454
     * more than 65535 elements in $items array.
455
     *
456
     * @param mixed $items A single value or array of values for the expression.
457
     * @param int $type Parameter bounding type : SQL_PARAMS_QM or SQL_PARAMS_NAMED.
458
     * @param string $prefix Named parameter placeholder prefix (a unique counter value is appended to each parameter name).
459
     * @param bool $equal True means we want to equate to the constructed expression, false means we don't want to equate to it.
460
     * @param mixed $onemptyitems This defines the behavior when the array of items provided is empty. Defaults to false,
461
     *              meaning throw exceptions. Other values will become part of the returned SQL fragment.
462
     * @throws coding_exception | dml_exception
463
     * @return array A list containing the constructed sql fragment and an array of parameters.
464
     */
465
    public function get_in_or_equal($items, $type=SQL_PARAMS_QM, $prefix='param', $equal=true, $onemptyitems=false): array {
466
        // We only interfere if number of items in expression exceeds 16 bit value.
467
        if (!is_array($items) || count($items) < 65535) {
468
            return parent::get_in_or_equal($items, $type, $prefix,  $equal, $onemptyitems);
469
        }
470
 
471
        // Determine the type from the first value. We don't need to be very smart here,
472
        // it is developer's responsibility to make sure that variable type is matching
473
        // field type, if not the case, DB engine will hint. Also mixing types won't work
474
        // here anyway, so we ignore NULL or boolean (unlikely you need 56k values of
475
        // these types only).
476
        $cast = is_string(current($items)) ? '::text' : '::bigint';
477
 
478
        if ($type == SQL_PARAMS_QM) {
479
            if ($equal) {
480
                $sql = 'IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))';
481
            } else {
482
                $sql = 'NOT IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))';
483
            }
484
            $params = array_values($items);
485
        } else if ($type == SQL_PARAMS_NAMED) {
486
            if (empty($prefix)) {
487
                $prefix = 'param';
488
            }
489
            $params = [];
490
            $sql = [];
491
            foreach ($items as $item) {
492
                $param = $prefix.$this->inorequaluniqueindex++;
493
                $params[$param] = $item;
494
                $sql[] = ':'.$param.$cast;
495
            }
496
            if ($equal) {
497
                $sql = 'IN (VALUES ('.implode('),(', $sql).'))';
498
            } else {
499
                $sql = 'NOT IN (VALUES ('.implode('),(', $sql).'))';
500
            }
501
        } else {
502
            throw new dml_exception('typenotimplement');
503
        }
504
        return [$sql, $params];
505
    }
506
 
507
    /**
508
     * Return table indexes - everything lowercased.
509
     * @param string $table The table we want to get indexes from.
510
     * @return array of arrays
511
     */
512
    public function get_indexes($table) {
513
        $indexes = array();
514
        $tablename = $this->prefix.$table;
515
 
516
        $sql = "SELECT i.*
517
                  FROM pg_catalog.pg_indexes i
518
                  JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname
519
                 WHERE i.tablename = '$tablename'
520
                       AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())";
521
 
522
        $this->query_start($sql, null, SQL_QUERY_AUX_READONLY);
523
        $result = pg_query($this->pgsql, $sql);
524
        $this->query_end($result);
525
 
526
        if ($result) {
527
            while ($row = pg_fetch_assoc($result)) {
528
                // The index definition could be generated schema-qualifying the target table name
529
                // for safety, depending on the pgsql version (CVE-2018-1058).
530
                if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON (|'.$row['schemaname'].'\.)'.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
531
                    continue;
532
                }
533
                if ($matches[5] === 'id') {
534
                    continue;
535
                }
536
                $columns = explode(',', $matches[5]);
537
                foreach ($columns as $k=>$column) {
538
                    $column = trim($column);
539
                    if ($pos = strpos($column, ' ')) {
540
                        // index type is separated by space
541
                        $column = substr($column, 0, $pos);
542
                    }
543
                    $columns[$k] = $this->trim_quotes($column);
544
                }
545
                $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
546
                                              'columns'=>$columns);
547
            }
548
            pg_free_result($result);
549
        }
550
        return $indexes;
551
    }
552
 
553
    /**
554
     * Returns detailed information about columns in table.
555
     *
556
     * @param string $table name
557
     * @return database_column_info[] array of database_column_info objects indexed with column names
558
     */
559
    protected function fetch_columns(string $table): array {
560
        $structure = array();
561
 
562
        $tablename = $this->prefix.$table;
563
 
564
        $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef,
565
                       CASE WHEN a.atthasdef THEN pg_catalog.pg_get_expr(d.adbin, d.adrelid) ELSE '' END AS adsrc
566
                  FROM pg_catalog.pg_class c
567
                  JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
568
                  JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
569
                  JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
570
             LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
571
                 WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
572
                       AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())
573
              ORDER BY a.attnum";
574
 
575
        $this->query_start($sql, null, SQL_QUERY_AUX_READONLY);
576
        $result = pg_query($this->pgsql, $sql);
577
        $this->query_end($result);
578
 
579
        if (!$result) {
580
            return array();
581
        }
582
        while ($rawcolumn = pg_fetch_object($result)) {
583
 
584
            $info = new stdClass();
585
            $info->name = $rawcolumn->field;
586
            $matches = null;
587
 
588
            if ($rawcolumn->type === 'varchar') {
589
                $info->type          = 'varchar';
590
                $info->meta_type     = 'C';
591
                $info->max_length    = $rawcolumn->atttypmod - 4;
592
                $info->scale         = null;
593
                $info->not_null      = ($rawcolumn->attnotnull === 't');
594
                $info->has_default   = ($rawcolumn->atthasdef === 't');
595
                if ($info->has_default) {
596
                    $parts = explode('::', $rawcolumn->adsrc);
597
                    if (count($parts) > 1) {
598
                        $info->default_value = reset($parts);
599
                        $info->default_value = trim($info->default_value, "'");
600
                    } else {
601
                        $info->default_value = $rawcolumn->adsrc;
602
                    }
603
                } else {
604
                    $info->default_value = null;
605
                }
606
                $info->primary_key   = false;
607
                $info->binary        = false;
608
                $info->unsigned      = null;
609
                $info->auto_increment= false;
610
                $info->unique        = null;
611
 
612
            } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
613
                $info->type = 'int';
614
                if (strpos($rawcolumn->adsrc ?? '', 'nextval') === 0) {
615
                    $info->primary_key   = true;
616
                    $info->meta_type     = 'R';
617
                    $info->unique        = true;
618
                    $info->auto_increment= true;
619
                    $info->has_default   = false;
620
                } else {
621
                    $info->primary_key   = false;
622
                    $info->meta_type     = 'I';
623
                    $info->unique        = null;
624
                    $info->auto_increment= false;
625
                    $info->has_default   = ($rawcolumn->atthasdef === 't');
626
                }
627
                // Return number of decimals, not bytes here.
628
                if ($matches[1] >= 8) {
629
                    $info->max_length = 18;
630
                } else if ($matches[1] >= 4) {
631
                    $info->max_length = 9;
632
                } else if ($matches[1] >= 2) {
633
                    $info->max_length = 4;
634
                } else if ($matches[1] >= 1) {
635
                    $info->max_length = 2;
636
                } else {
637
                    $info->max_length = 0;
638
                }
639
                $info->scale         = null;
640
                $info->not_null      = ($rawcolumn->attnotnull === 't');
641
                if ($info->has_default) {
642
                    // PG 9.5+ uses ::<TYPE> syntax for some defaults.
643
                    $parts = explode('::', $rawcolumn->adsrc);
644
                    if (count($parts) > 1) {
645
                        $info->default_value = reset($parts);
646
                    } else {
647
                        $info->default_value = $rawcolumn->adsrc;
648
                    }
649
                    $info->default_value = trim($info->default_value, "()'");
650
                } else {
651
                    $info->default_value = null;
652
                }
653
                $info->binary        = false;
654
                $info->unsigned      = false;
655
 
656
            } else if ($rawcolumn->type === 'numeric') {
657
                $info->type = $rawcolumn->type;
658
                $info->meta_type     = 'N';
659
                $info->primary_key   = false;
660
                $info->binary        = false;
661
                $info->unsigned      = null;
662
                $info->auto_increment= false;
663
                $info->unique        = null;
664
                $info->not_null      = ($rawcolumn->attnotnull === 't');
665
                $info->has_default   = ($rawcolumn->atthasdef === 't');
666
                if ($info->has_default) {
667
                    // PG 9.5+ uses ::<TYPE> syntax for some defaults.
668
                    $parts = explode('::', $rawcolumn->adsrc);
669
                    if (count($parts) > 1) {
670
                        $info->default_value = reset($parts);
671
                    } else {
672
                        $info->default_value = $rawcolumn->adsrc;
673
                    }
674
                    $info->default_value = trim($info->default_value, "()'");
675
                } else {
676
                    $info->default_value = null;
677
                }
678
                $info->max_length    = $rawcolumn->atttypmod >> 16;
679
                $info->scale         = ($rawcolumn->atttypmod & 0xFFFF) - 4;
680
 
681
            } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
682
                $info->type = 'float';
683
                $info->meta_type     = 'N';
684
                $info->primary_key   = false;
685
                $info->binary        = false;
686
                $info->unsigned      = null;
687
                $info->auto_increment= false;
688
                $info->unique        = null;
689
                $info->not_null      = ($rawcolumn->attnotnull === 't');
690
                $info->has_default   = ($rawcolumn->atthasdef === 't');
691
                if ($info->has_default) {
692
                    // PG 9.5+ uses ::<TYPE> syntax for some defaults.
693
                    $parts = explode('::', $rawcolumn->adsrc);
694
                    if (count($parts) > 1) {
695
                        $info->default_value = reset($parts);
696
                    } else {
697
                        $info->default_value = $rawcolumn->adsrc;
698
                    }
699
                    $info->default_value = trim($info->default_value, "()'");
700
                } else {
701
                    $info->default_value = null;
702
                }
703
                // just guess expected number of deciaml places :-(
704
                if ($matches[1] == 8) {
705
                    // total 15 digits
706
                    $info->max_length = 8;
707
                    $info->scale      = 7;
708
                } else {
709
                    // total 6 digits
710
                    $info->max_length = 4;
711
                    $info->scale      = 2;
712
                }
713
 
714
            } else if ($rawcolumn->type === 'text') {
715
                $info->type          = $rawcolumn->type;
716
                $info->meta_type     = 'X';
717
                $info->max_length    = -1;
718
                $info->scale         = null;
719
                $info->not_null      = ($rawcolumn->attnotnull === 't');
720
                $info->has_default   = ($rawcolumn->atthasdef === 't');
721
                if ($info->has_default) {
722
                    $parts = explode('::', $rawcolumn->adsrc);
723
                    if (count($parts) > 1) {
724
                        $info->default_value = reset($parts);
725
                        $info->default_value = trim($info->default_value, "'");
726
                    } else {
727
                        $info->default_value = $rawcolumn->adsrc;
728
                    }
729
                } else {
730
                    $info->default_value = null;
731
                }
732
                $info->primary_key   = false;
733
                $info->binary        = false;
734
                $info->unsigned      = null;
735
                $info->auto_increment= false;
736
                $info->unique        = null;
737
 
738
            } else if ($rawcolumn->type === 'bytea') {
739
                $info->type          = $rawcolumn->type;
740
                $info->meta_type     = 'B';
741
                $info->max_length    = -1;
742
                $info->scale         = null;
743
                $info->not_null      = ($rawcolumn->attnotnull === 't');
744
                $info->has_default   = false;
745
                $info->default_value = null;
746
                $info->primary_key   = false;
747
                $info->binary        = true;
748
                $info->unsigned      = null;
749
                $info->auto_increment= false;
750
                $info->unique        = null;
751
 
752
            }
753
 
754
            $structure[$info->name] = new database_column_info($info);
755
        }
756
 
757
        pg_free_result($result);
758
 
759
        return $structure;
760
    }
761
 
762
    /**
763
     * Normalise values based in RDBMS dependencies (booleans, LOBs...)
764
     *
765
     * @param database_column_info $column column metadata corresponding with the value we are going to normalise
766
     * @param mixed $value value we are going to normalise
767
     * @return mixed the normalised value
768
     */
769
    protected function normalise_value($column, $value) {
770
        $this->detect_objects($value);
771
 
772
        if (is_bool($value)) { // Always, convert boolean to int
773
            $value = (int)$value;
774
 
775
        } else if ($column->meta_type === 'B') {
776
            if (!is_null($value)) {
777
                // standard_conforming_strings must be enabled, otherwise pg_escape_bytea() will double escape
778
                // \ and produce data errors.  This is set on the connection.
779
                $value = pg_escape_bytea($this->pgsql, $value);
780
            }
781
 
782
        } else if ($value === '') {
783
            if ($column->meta_type === 'I' or $column->meta_type === 'F' or $column->meta_type === 'N') {
784
                $value = 0; // prevent '' problems in numeric fields
785
            }
786
        }
787
        return $value;
788
    }
789
 
790
    /**
791
     * Is db in unicode mode?
792
     * @return bool
793
     */
794
    public function setup_is_unicodedb() {
795
        // Get PostgreSQL server_encoding value
796
        $sql = 'SHOW server_encoding';
797
        $this->query_start($sql, null, SQL_QUERY_AUX_READONLY);
798
        $result = pg_query($this->pgsql, $sql);
799
        $this->query_end($result);
800
 
801
        if (!$result) {
802
            return false;
803
        }
804
        $rawcolumn = pg_fetch_object($result);
805
        $encoding = $rawcolumn->server_encoding;
806
        pg_free_result($result);
807
 
808
        return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
809
    }
810
 
811
    /**
812
     * Do NOT use in code, to be used by database_manager only!
813
     * @param string|array $sql query
814
     * @param array|null $tablenames an array of xmldb table names affected by this request.
815
     * @return bool true
816
     * @throws ddl_change_structure_exception A DDL specific exception is thrown for any errors.
817
     */
818
    public function change_database_structure($sql, $tablenames = null) {
819
        $this->get_manager(); // Includes DDL exceptions classes ;-)
820
        if (is_array($sql)) {
821
            $sql = implode("\n;\n", $sql);
822
        }
823
        if (!$this->is_transaction_started()) {
824
            // It is better to do all or nothing, this helps with recovery...
825
            $sql = "BEGIN ISOLATION LEVEL SERIALIZABLE;\n$sql\n; COMMIT";
826
        }
827
 
828
        try {
829
            $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
830
            $result = pg_query($this->pgsql, $sql);
831
            $this->query_end($result);
832
            pg_free_result($result);
833
        } catch (ddl_change_structure_exception $e) {
834
            if (!$this->is_transaction_started()) {
835
                $result = @pg_query($this->pgsql, "ROLLBACK");
836
                @pg_free_result($result);
837
            }
838
            $this->reset_caches($tablenames);
839
            throw $e;
840
        }
841
 
842
        $this->reset_caches($tablenames);
843
        return true;
844
    }
845
 
846
    /**
847
     * Execute general sql query. Should be used only when no other method suitable.
848
     * Do NOT use this to make changes in db structure, use database_manager methods instead!
849
     * @param string $sql query
850
     * @param array $params query parameters
851
     * @return bool true
852
     * @throws dml_exception A DML specific exception is thrown for any errors.
853
     */
854
    public function execute($sql, array $params=null) {
855
        list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
856
 
857
        if (strpos($sql, ';') !== false) {
858
            throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
859
        }
860
 
861
        $this->query_start($sql, $params, SQL_QUERY_UPDATE);
862
        $result = pg_query_params($this->pgsql, $sql, $params);
863
        $this->query_end($result);
864
 
865
        pg_free_result($result);
866
        return true;
867
    }
868
 
869
    /**
870
     * Get a number of records as a moodle_recordset using a SQL statement.
871
     *
872
     * Since this method is a little less readable, use of it should be restricted to
873
     * code where it's possible there might be large datasets being returned.  For known
874
     * small datasets use get_records_sql - it leads to simpler code.
875
     *
876
     * The return type is like:
877
     * @see function get_recordset.
878
     *
879
     * @param string $sql the SQL select query to execute.
880
     * @param array $params array of sql parameters
881
     * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
882
     * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
883
     * @return moodle_recordset instance
884
     * @throws dml_exception A DML specific exception is thrown for any errors.
885
     */
886
    public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
887
 
888
        list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
889
 
890
        if ($limitnum) {
891
            $sql .= " LIMIT $limitnum";
892
        }
893
        if ($limitfrom) {
894
            $sql .= " OFFSET $limitfrom";
895
        }
896
 
897
        list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
898
 
899
        // For any query that doesn't explicitly specify a limit, we must use cursors to stop it
900
        // loading the entire thing (unless the config setting is turned off).
901
        $usecursors = !$limitnum && ($this->get_fetch_buffer_size() > 0);
902
        if ($usecursors) {
903
            // Work out the cursor unique identifer. This is based on a simple count used which
904
            // should be OK because the identifiers only need to be unique within the current
905
            // transaction.
906
            $this->cursorcount++;
907
            $cursorname = 'crs' . $this->cursorcount;
908
 
909
            // Do the query to a cursor.
910
            $sql = 'DECLARE ' . $cursorname . ' NO SCROLL CURSOR WITH HOLD FOR ' . $sql;
911
        } else {
912
            $cursorname = '';
913
        }
914
 
915
        $this->query_start($sql, $params, SQL_QUERY_SELECT);
916
 
917
        $result = pg_query_params($this->pgsql, $sql, $params);
918
 
919
        $this->query_end($result);
920
        if ($usecursors) {
921
            pg_free_result($result);
922
            $result = null;
923
        }
924
 
925
        return new pgsql_native_moodle_recordset($result, $this, $cursorname);
926
    }
927
 
928
    /**
929
     * Gets size of fetch buffer used for recordset queries.
930
     *
931
     * If this returns 0 then cursors will not be used, meaning recordset queries will occupy enough
932
     * memory as needed for the Postgres library to hold the entire query results in memory.
933
     *
934
     * @return int Fetch buffer size or 0 indicating not to use cursors
935
     */
936
    protected function get_fetch_buffer_size() {
937
        if (array_key_exists('fetchbuffersize', $this->dboptions)) {
938
            return (int)$this->dboptions['fetchbuffersize'];
939
        } else {
940
            return self::DEFAULT_FETCH_BUFFER_SIZE;
941
        }
942
    }
943
 
944
    /**
945
     * Retrieves data from cursor. For use by recordset only; do not call directly.
946
     *
947
     * Return value contains the next batch of Postgres data, and a boolean indicating if this is
948
     * definitely the last batch (if false, there may be more)
949
     *
950
     * @param string $cursorname Name of cursor to read from
951
     * @return array Array with 2 elements (next data batch and boolean indicating last batch)
952
     */
953
    public function fetch_from_cursor($cursorname) {
954
        $count = $this->get_fetch_buffer_size();
955
 
956
        $sql = 'FETCH ' . $count . ' FROM ' . $cursorname;
957
 
958
        $this->query_start($sql, [], SQL_QUERY_AUX);
959
        $result = pg_query($this->pgsql, $sql);
960
        $last = pg_num_rows($result) !== $count;
961
 
962
        $this->query_end($result);
963
 
964
        return [$result, $last];
965
    }
966
 
967
    /**
968
     * Closes a cursor. For use by recordset only; do not call directly.
969
     *
970
     * @param string $cursorname Name of cursor to close
971
     * @return bool True if we actually closed one, false if the transaction was cancelled
972
     */
973
    public function close_cursor($cursorname) {
974
        // If the transaction got cancelled, then ignore this request.
975
        $sql = 'CLOSE ' . $cursorname;
976
        $this->query_start($sql, [], SQL_QUERY_AUX);
977
        $result = pg_query($this->pgsql, $sql);
978
        $this->query_end($result);
979
        if ($result) {
980
            pg_free_result($result);
981
        }
982
        return true;
983
    }
984
 
985
    /**
986
     * A faster version of pg_field_type
987
     *
988
     * The pg_field_type function in the php postgres driver internally makes an sql call
989
     * to get the list of field types which it statically caches only for a single request.
990
     * This wraps it in a cache keyed by oid to avoid these DB calls on every request.
991
     *
992
     * @param resource|PgSql\Result $result
993
     * @param int $fieldnumber
994
     * @return string Field type
995
     */
996
    public function pg_field_type($result, int $fieldnumber) {
997
        static $map;
998
        $cache = $this->get_metacache();
999
 
1000
        // Getting the oid doesn't make an internal query.
1001
        $oid = pg_field_type_oid($result, $fieldnumber);
1002
        if (!$map) {
1003
            $map = $cache->get('oid2typname');
1004
        }
1005
        if ($map === false) {
1006
            $map = [];
1007
        }
1008
        if (isset($map[$oid])) {
1009
            return $map[$oid];
1010
        }
1011
        $map[$oid] = pg_field_type($result, $fieldnumber);
1012
        $cache->set('oid2typname', $map);
1013
        return $map[$oid];
1014
    }
1015
 
1016
    /**
1017
     * Get a number of records as an array of objects using a SQL statement.
1018
     *
1019
     * Return value is like:
1020
     * @see function get_records.
1021
     *
1022
     * @param string $sql the SQL select query to execute. The first column of this SELECT statement
1023
     *   must be a unique value (usually the 'id' field), as it will be used as the key of the
1024
     *   returned array.
1025
     * @param array $params array of sql parameters
1026
     * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
1027
     * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
1028
     * @return array of objects, or empty array if no records were found
1029
     * @throws dml_exception A DML specific exception is thrown for any errors.
1030
     */
1031
    public function get_records_sql($sql, array $params = null, $limitfrom = 0, $limitnum = 0) {
1032
        list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
1033
 
1034
        if ($limitnum) {
1035
            $sql .= " LIMIT $limitnum";
1036
        }
1037
        if ($limitfrom) {
1038
            $sql .= " OFFSET $limitfrom";
1039
        }
1040
 
1041
        list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1042
        $this->query_start($sql, $params, SQL_QUERY_SELECT);
1043
        $result = pg_query_params($this->pgsql, $sql, $params);
1044
        $this->query_end($result);
1045
 
1046
        // find out if there are any blobs
1047
        $numfields = pg_num_fields($result);
1048
        $blobs = array();
1049
        for ($i = 0; $i < $numfields; $i++) {
1050
            $type = $this->pg_field_type($result, $i);
1051
            if ($type == 'bytea') {
1052
                $blobs[] = pg_field_name($result, $i);
1053
            }
1054
        }
1055
 
1056
        $return = [];
1057
        while ($row = pg_fetch_assoc($result)) {
1058
            $id = reset($row);
1059
            if ($blobs) {
1060
                foreach ($blobs as $blob) {
1061
                    $row[$blob] = ($row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null);
1062
                }
1063
            }
1064
            if (isset($return[$id])) {
1065
                $colname = key($row);
1066
                debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
1067
            }
1068
            $return[$id] = (object) $row;
1069
        }
1070
 
1071
        return $return;
1072
    }
1073
 
1074
    /**
1075
     * Selects records and return values (first field) as an array using a SQL statement.
1076
     *
1077
     * @param string $sql The SQL query
1078
     * @param array $params array of sql parameters
1079
     * @return array of values
1080
     * @throws dml_exception A DML specific exception is thrown for any errors.
1081
     */
1082
    public function get_fieldset_sql($sql, array $params=null) {
1083
        list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1084
 
1085
        $this->query_start($sql, $params, SQL_QUERY_SELECT);
1086
        $result = pg_query_params($this->pgsql, $sql, $params);
1087
        $this->query_end($result);
1088
 
1089
        $return = pg_fetch_all_columns($result, 0);
1090
 
1091
        if ($this->pg_field_type($result, 0) == 'bytea') {
1092
            foreach ($return as $key => $value) {
1093
                $return[$key] = ($value === null ? $value : pg_unescape_bytea($value));
1094
            }
1095
        }
1096
 
1097
        pg_free_result($result);
1098
 
1099
        return $return;
1100
    }
1101
 
1102
    /**
1103
     * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
1104
     * @param string $table name
1105
     * @param mixed $params data record as object or array
1106
     * @param bool $returnit return it of inserted record
1107
     * @param bool $bulk true means repeated inserts expected
1108
     * @param bool $customsequence true if 'id' included in $params, disables $returnid
1109
     * @return bool|int true or new id
1110
     * @throws dml_exception A DML specific exception is thrown for any errors.
1111
     */
1112
    public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
1113
        if (!is_array($params)) {
1114
            $params = (array)$params;
1115
        }
1116
 
1117
        $returning = "";
1118
 
1119
        if ($customsequence) {
1120
            if (!isset($params['id'])) {
1121
                throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
1122
            }
1123
            $returnid = false;
1124
        } else {
1125
            if ($returnid) {
1126
                $returning = "RETURNING id";
1127
                unset($params['id']);
1128
            } else {
1129
                unset($params['id']);
1130
            }
1131
        }
1132
 
1133
        if (empty($params)) {
1134
            throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
1135
        }
1136
 
1137
        $fields = implode(',', array_keys($params));
1138
        $values = array();
1139
        $i = 1;
1140
        foreach ($params as $value) {
1141
            $this->detect_objects($value);
1142
            $values[] = "\$".$i++;
1143
        }
1144
        $values = implode(',', $values);
1145
 
1146
        $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
1147
        $this->query_start($sql, $params, SQL_QUERY_INSERT);
1148
        $result = pg_query_params($this->pgsql, $sql, $params);
1149
        $this->query_end($result);
1150
 
1151
        if ($returning !== "") {
1152
            $row = pg_fetch_assoc($result);
1153
            $params['id'] = reset($row);
1154
        }
1155
        pg_free_result($result);
1156
 
1157
        if (!$returnid) {
1158
            return true;
1159
        }
1160
 
1161
        return (int)$params['id'];
1162
    }
1163
 
1164
    /**
1165
     * Insert a record into a table and return the "id" field if required.
1166
     *
1167
     * Some conversions and safety checks are carried out. Lobs are supported.
1168
     * If the return ID isn't required, then this just reports success as true/false.
1169
     * $data is an object containing needed data
1170
     * @param string $table The database table to be inserted into
1171
     * @param object|array $dataobject A data object with values for one or more fields in the record
1172
     * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
1173
     * @return bool|int true or new id
1174
     * @throws dml_exception A DML specific exception is thrown for any errors.
1175
     */
1176
    public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
1177
        $dataobject = (array)$dataobject;
1178
 
1179
        $columns = $this->get_columns($table);
1180
        if (empty($columns)) {
1181
            throw new dml_exception('ddltablenotexist', $table);
1182
        }
1183
 
1184
        $cleaned = array();
1185
 
1186
        foreach ($dataobject as $field=>$value) {
1187
            if ($field === 'id') {
1188
                continue;
1189
            }
1190
            if (!isset($columns[$field])) {
1191
                continue;
1192
            }
1193
            $column = $columns[$field];
1194
            $cleaned[$field] = $this->normalise_value($column, $value);
1195
        }
1196
 
1197
        return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
1198
 
1199
    }
1200
 
1201
    /**
1202
     * Insert multiple records into database as fast as possible.
1203
     *
1204
     * Order of inserts is maintained, but the operation is not atomic,
1205
     * use transactions if necessary.
1206
     *
1207
     * This method is intended for inserting of large number of small objects,
1208
     * do not use for huge objects with text or binary fields.
1209
     *
1210
     * @since Moodle 2.7
1211
     *
1212
     * @param string $table  The database table to be inserted into
1213
     * @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
1214
     * @return void does not return new record ids
1215
     *
1216
     * @throws coding_exception if data objects have different structure
1217
     * @throws dml_exception A DML specific exception is thrown for any errors.
1218
     */
1219
    public function insert_records($table, $dataobjects) {
1220
        if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) {
1221
            throw new coding_exception('insert_records() passed non-traversable object');
1222
        }
1223
 
1224
        // PostgreSQL does not seem to have problems with huge queries.
1225
        $chunksize = 500;
1226
        if (!empty($this->dboptions['bulkinsertsize'])) {
1227
            $chunksize = (int)$this->dboptions['bulkinsertsize'];
1228
        }
1229
 
1230
        $columns = $this->get_columns($table, true);
1231
 
1232
        $fields = null;
1233
        $count = 0;
1234
        $chunk = array();
1235
        foreach ($dataobjects as $dataobject) {
1236
            if (!is_array($dataobject) and !is_object($dataobject)) {
1237
                throw new coding_exception('insert_records() passed invalid record object');
1238
            }
1239
            $dataobject = (array)$dataobject;
1240
            if ($fields === null) {
1241
                $fields = array_keys($dataobject);
1242
                $columns = array_intersect_key($columns, $dataobject);
1243
                unset($columns['id']);
1244
            } else if ($fields !== array_keys($dataobject)) {
1245
                throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
1246
            }
1247
 
1248
            $count++;
1249
            $chunk[] = $dataobject;
1250
 
1251
            if ($count === $chunksize) {
1252
                $this->insert_chunk($table, $chunk, $columns);
1253
                $chunk = array();
1254
                $count = 0;
1255
            }
1256
        }
1257
 
1258
        if ($count) {
1259
            $this->insert_chunk($table, $chunk, $columns);
1260
        }
1261
    }
1262
 
1263
    /**
1264
     * Insert records in chunks, strict param types...
1265
     *
1266
     * Note: can be used only from insert_records().
1267
     *
1268
     * @param string $table
1269
     * @param array $chunk
1270
     * @param database_column_info[] $columns
1271
     */
1272
    protected function insert_chunk($table, array $chunk, array $columns) {
1273
        $i = 1;
1274
        $params = array();
1275
        $values = array();
1276
        foreach ($chunk as $dataobject) {
1277
            $vals = array();
1278
            foreach ($columns as $field => $column) {
1279
                $params[] = $this->normalise_value($column, $dataobject[$field]);
1280
                $vals[] = "\$".$i++;
1281
            }
1282
            $values[] = '('.implode(',', $vals).')';
1283
        }
1284
 
1285
        $fieldssql = '('.implode(',', array_keys($columns)).')';
1286
        $valuessql = implode(',', $values);
1287
 
1288
        $sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
1289
        $this->query_start($sql, $params, SQL_QUERY_INSERT);
1290
        $result = pg_query_params($this->pgsql, $sql, $params);
1291
        $this->query_end($result);
1292
        pg_free_result($result);
1293
    }
1294
 
1295
    /**
1296
     * Import a record into a table, id field is required.
1297
     * Safety checks are NOT carried out. Lobs are supported.
1298
     *
1299
     * @param string $table name of database table to be inserted into
1300
     * @param object $dataobject A data object with values for one or more fields in the record
1301
     * @return bool true
1302
     * @throws dml_exception A DML specific exception is thrown for any errors.
1303
     */
1304
    public function import_record($table, $dataobject) {
1305
        $dataobject = (array)$dataobject;
1306
 
1307
        $columns = $this->get_columns($table);
1308
        $cleaned = array();
1309
 
1310
        foreach ($dataobject as $field=>$value) {
1311
            $this->detect_objects($value);
1312
            if (!isset($columns[$field])) {
1313
                continue;
1314
            }
1315
            $column = $columns[$field];
1316
            $cleaned[$field] = $this->normalise_value($column, $value);
1317
        }
1318
 
1319
        return $this->insert_record_raw($table, $cleaned, false, true, true);
1320
    }
1321
 
1322
    /**
1323
     * Update record in database, as fast as possible, no safety checks, lobs not supported.
1324
     * @param string $table name
1325
     * @param stdClass|array $params data record as object or array
1326
     * @param bool true means repeated updates expected
1327
     * @return bool true
1328
     * @throws dml_exception A DML specific exception is thrown for any errors.
1329
     */
1330
    public function update_record_raw($table, $params, $bulk=false) {
1331
        $params = (array)$params;
1332
 
1333
        if (!isset($params['id'])) {
1334
            throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
1335
        }
1336
        $id = $params['id'];
1337
        unset($params['id']);
1338
 
1339
        if (empty($params)) {
1340
            throw new coding_exception('moodle_database::update_record_raw() no fields found.');
1341
        }
1342
 
1343
        $i = 1;
1344
 
1345
        $sets = array();
1346
        foreach ($params as $field=>$value) {
1347
            $this->detect_objects($value);
1348
            $sets[] = "$field = \$".$i++;
1349
        }
1350
 
1351
        $params[] = $id; // last ? in WHERE condition
1352
 
1353
        $sets = implode(',', $sets);
1354
        $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
1355
 
1356
        $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1357
        $result = pg_query_params($this->pgsql, $sql, $params);
1358
        $this->query_end($result);
1359
 
1360
        pg_free_result($result);
1361
        return true;
1362
    }
1363
 
1364
    /**
1365
     * Update a record in a table
1366
     *
1367
     * $dataobject is an object containing needed data
1368
     * Relies on $dataobject having a variable "id" to
1369
     * specify the record to update
1370
     *
1371
     * @param string $table The database table to be checked against.
1372
     * @param stdClass|array $dataobject An object with contents equal to fieldname=>fieldvalue.
1373
     *        Must have an entry for 'id' to map to the table specified.
1374
     * @param bool true means repeated updates expected
1375
     * @return bool true
1376
     * @throws dml_exception A DML specific exception is thrown for any errors.
1377
     */
1378
    public function update_record($table, $dataobject, $bulk=false) {
1379
        $dataobject = (array)$dataobject;
1380
 
1381
        $columns = $this->get_columns($table);
1382
        $cleaned = array();
1383
 
1384
        foreach ($dataobject as $field=>$value) {
1385
            if (!isset($columns[$field])) {
1386
                continue;
1387
            }
1388
            $column = $columns[$field];
1389
            $cleaned[$field] = $this->normalise_value($column, $value);
1390
        }
1391
 
1392
        $this->update_record_raw($table, $cleaned, $bulk);
1393
 
1394
        return true;
1395
    }
1396
 
1397
    /**
1398
     * Set a single field in every table record which match a particular WHERE clause.
1399
     *
1400
     * @param string $table The database table to be checked against.
1401
     * @param string $newfield the field to set.
1402
     * @param string $newvalue the value to set the field to.
1403
     * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
1404
     * @param array $params array of sql parameters
1405
     * @return bool true
1406
     * @throws dml_exception A DML specific exception is thrown for any errors.
1407
     */
1408
    public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
1409
 
1410
        if ($select) {
1411
            $select = "WHERE $select";
1412
        }
1413
        if (is_null($params)) {
1414
            $params = array();
1415
        }
1416
        list($select, $params, $type) = $this->fix_sql_params($select, $params);
1417
        $i = count($params)+1;
1418
 
1419
        // Get column metadata
1420
        $columns = $this->get_columns($table);
1421
        $column = $columns[$newfield];
1422
 
1423
        $normalisedvalue = $this->normalise_value($column, $newvalue);
1424
 
1425
        $newfield = "$newfield = \$" . $i;
1426
        $params[] = $normalisedvalue;
1427
        $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
1428
 
1429
        $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1430
        $result = pg_query_params($this->pgsql, $sql, $params);
1431
        $this->query_end($result);
1432
 
1433
        pg_free_result($result);
1434
 
1435
        return true;
1436
    }
1437
 
1438
    /**
1439
     * Delete one or more records from a table which match a particular WHERE clause, lobs not supported.
1440
     *
1441
     * @param string $table The database table to be checked against.
1442
     * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
1443
     * @param array $params array of sql parameters
1444
     * @return bool true
1445
     * @throws dml_exception A DML specific exception is thrown for any errors.
1446
     */
1447
    public function delete_records_select($table, $select, array $params=null) {
1448
        if ($select) {
1449
            $select = "WHERE $select";
1450
        }
1451
        $sql = "DELETE FROM {$this->prefix}$table $select";
1452
 
1453
        list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1454
 
1455
        $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1456
        $result = pg_query_params($this->pgsql, $sql, $params);
1457
        $this->query_end($result);
1458
 
1459
        pg_free_result($result);
1460
 
1461
        return true;
1462
    }
1463
 
1464
    /**
1465
     * Returns 'LIKE' part of a query.
1466
     *
1467
     * @param string $fieldname usually name of the table column
1468
     * @param string $param usually bound query parameter (?, :named)
1469
     * @param bool $casesensitive use case sensitive search
1470
     * @param bool $accensensitive use accent sensitive search (not all databases support accent insensitive)
1471
     * @param bool $notlike true means "NOT LIKE"
1472
     * @param string $escapechar escape char for '%' and '_'
1473
     * @return string SQL code fragment
1474
     */
1475
    public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
1476
        if (strpos($param, '%') !== false) {
1477
            debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)');
1478
        }
1479
 
1480
        // postgresql does not support accent insensitive text comparisons, sorry
1481
        if ($casesensitive) {
1482
            $LIKE = $notlike ? 'NOT LIKE' : 'LIKE';
1483
        } else {
1484
            $LIKE = $notlike ? 'NOT ILIKE' : 'ILIKE';
1485
        }
1486
        return "$fieldname $LIKE $param ESCAPE '$escapechar'";
1487
    }
1488
 
1489
    public function sql_bitxor($int1, $int2) {
1490
        return '((' . $int1 . ') # (' . $int2 . '))';
1491
    }
1492
 
1493
    /**
1494
     * Return SQL for casting to char of given field/expression
1495
     *
1496
     * @param string $field Table field or SQL expression to be cast
1497
     * @return string
1498
     */
1499
    public function sql_cast_to_char(string $field): string {
1500
        return "CAST({$field} AS VARCHAR)";
1501
    }
1502
 
1503
    public function sql_cast_char2int($fieldname, $text=false) {
1504
        return ' CAST(' . $fieldname . ' AS INT) ';
1505
    }
1506
 
1507
    public function sql_cast_char2real($fieldname, $text=false) {
1508
        return " $fieldname::real ";
1509
    }
1510
 
1511
    public function sql_concat(...$arr) {
1512
        $s = implode(' || ', $arr);
1513
        if ($s === '') {
1514
            return " '' ";
1515
        }
1516
        // Add always empty string element so integer-exclusive concats
1517
        // will work without needing to cast each element explicitly
1518
        return " '' || $s ";
1519
    }
1520
 
1521
    public function sql_concat_join($separator="' '", $elements=array()) {
1522
        for ($n=count($elements)-1; $n > 0 ; $n--) {
1523
            array_splice($elements, $n, 0, $separator);
1524
        }
1525
        $s = implode(' || ', $elements);
1526
        if ($s === '') {
1527
            return " '' ";
1528
        }
1529
        return " $s ";
1530
    }
1531
 
1532
    /**
1533
     * Return SQL for performing group concatenation on given field/expression
1534
     *
1535
     * @param string $field
1536
     * @param string $separator
1537
     * @param string $sort
1538
     * @return string
1539
     */
1540
    public function sql_group_concat(string $field, string $separator = ', ', string $sort = ''): string {
1541
        $fieldsort = $sort ? "ORDER BY {$sort}" : '';
1542
        return "STRING_AGG(" . $this->sql_cast_to_char($field) . ", '{$separator}' {$fieldsort})";
1543
    }
1544
 
1545
    /**
1546
     * Returns the SQL text to be used to order by columns, standardising the return
1547
     * pattern of null values across database types to sort nulls first when ascending
1548
     * and last when descending.
1549
     *
1550
     * @param string $fieldname The name of the field we need to sort by.
1551
     * @param int $sort An order to sort the results in.
1552
     * @return string The piece of SQL code to be used in your statement.
1553
     */
1554
    public function sql_order_by_null(string $fieldname, int $sort = SORT_ASC): string {
1555
        return parent::sql_order_by_null($fieldname, $sort) . ' NULLS ' . ($sort == SORT_ASC ? 'FIRST' : 'LAST');
1556
    }
1557
 
1558
    public function sql_regex_supported() {
1559
        return true;
1560
    }
1561
 
1562
    public function sql_regex($positivematch = true, $casesensitive = false) {
1563
        if ($casesensitive) {
1564
            return $positivematch ? '~' : '!~';
1565
        } else {
1566
            return $positivematch ? '~*' : '!~*';
1567
        }
1568
    }
1569
 
1570
    /**
1571
     * Does this driver support tool_replace?
1572
     *
1573
     * @since Moodle 2.6.1
1574
     * @return bool
1575
     */
1576
    public function replace_all_text_supported() {
1577
        return true;
1578
    }
1579
 
1580
    public function session_lock_supported() {
1581
        return true;
1582
    }
1583
 
1584
    /**
1585
     * Obtain session lock
1586
     * @param int $rowid id of the row with session record
1587
     * @param int $timeout max allowed time to wait for the lock in seconds
1588
     * @return bool success
1589
     */
1590
    public function get_session_lock($rowid, $timeout) {
1591
        // NOTE: there is a potential locking problem for database running
1592
        //       multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
1593
        //       luckily there is not a big chance that they would collide
1594
        if (!$this->session_lock_supported()) {
1595
            return;
1596
        }
1597
 
1598
        parent::get_session_lock($rowid, $timeout);
1599
 
1600
        $timeoutmilli = $timeout * 1000;
1601
 
1602
        $sql = "SET statement_timeout TO $timeoutmilli";
1603
        $this->query_start($sql, null, SQL_QUERY_AUX);
1604
        $result = pg_query($this->pgsql, $sql);
1605
        $this->query_end($result);
1606
 
1607
        if ($result) {
1608
            pg_free_result($result);
1609
        }
1610
 
1611
        $sql = "SELECT pg_advisory_lock($rowid)";
1612
        $this->query_start($sql, null, SQL_QUERY_AUX);
1613
        $start = time();
1614
        $result = pg_query($this->pgsql, $sql);
1615
        $end = time();
1616
        try {
1617
            $this->query_end($result);
1618
        } catch (dml_exception $ex) {
1619
            if ($end - $start >= $timeout) {
1620
                throw new dml_sessionwait_exception();
1621
            } else {
1622
                throw $ex;
1623
            }
1624
        }
1625
 
1626
        if ($result) {
1627
            pg_free_result($result);
1628
        }
1629
 
1630
        $sql = "SET statement_timeout TO DEFAULT";
1631
        $this->query_start($sql, null, SQL_QUERY_AUX);
1632
        $result = pg_query($this->pgsql, $sql);
1633
        $this->query_end($result);
1634
 
1635
        if ($result) {
1636
            pg_free_result($result);
1637
        }
1638
    }
1639
 
1640
    public function release_session_lock($rowid) {
1641
        if (!$this->session_lock_supported()) {
1642
            return;
1643
        }
1644
        if (!$this->used_for_db_sessions) {
1645
            return;
1646
        }
1647
 
1648
        parent::release_session_lock($rowid);
1649
 
1650
        $sql = "SELECT pg_advisory_unlock($rowid)";
1651
        $this->query_start($sql, null, SQL_QUERY_AUX);
1652
        $result = pg_query($this->pgsql, $sql);
1653
        $this->query_end($result);
1654
 
1655
        if ($result) {
1656
            pg_free_result($result);
1657
        }
1658
    }
1659
 
1660
    /**
1661
     * Driver specific start of real database transaction,
1662
     * this can not be used directly in code.
1663
     * @return void
1664
     */
1665
    protected function begin_transaction() {
1666
        $this->savepointpresent = true;
1667
        $sql = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
1668
        $this->query_start($sql, null, SQL_QUERY_AUX);
1669
        $result = pg_query($this->pgsql, $sql);
1670
        $this->query_end($result);
1671
 
1672
        pg_free_result($result);
1673
    }
1674
 
1675
    /**
1676
     * Driver specific commit of real database transaction,
1677
     * this can not be used directly in code.
1678
     * @return void
1679
     */
1680
    protected function commit_transaction() {
1681
        $this->savepointpresent = false;
1682
        $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
1683
        $this->query_start($sql, null, SQL_QUERY_AUX);
1684
        $result = pg_query($this->pgsql, $sql);
1685
        $this->query_end($result);
1686
 
1687
        pg_free_result($result);
1688
    }
1689
 
1690
    /**
1691
     * Driver specific abort of real database transaction,
1692
     * this can not be used directly in code.
1693
     * @return void
1694
     */
1695
    protected function rollback_transaction() {
1696
        $this->savepointpresent = false;
1697
        $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
1698
        $this->query_start($sql, null, SQL_QUERY_AUX);
1699
        $result = pg_query($this->pgsql, $sql);
1700
        $this->query_end($result);
1701
 
1702
        pg_free_result($result);
1703
    }
1704
 
1705
    /**
1706
     * Helper function trimming (whitespace + quotes) any string
1707
     * needed because PG uses to enclose with double quotes some
1708
     * fields in indexes definition and others
1709
     *
1710
     * @param string $str string to apply whitespace + quotes trim
1711
     * @return string trimmed string
1712
     */
1713
    private function trim_quotes($str) {
1714
        return trim(trim($str), "'\"");
1715
    }
1716
 
1717
    /**
1718
     * Postgresql supports full-text search indexes.
1719
     *
1720
     * @return bool
1721
     */
1722
    public function is_fulltext_search_supported() {
1723
        return true;
1724
    }
1725
}